package v1 import ( "context" "errors" "fmt" "io" "log/slog" "connectrpc.com/connect" remotev1 "code.icb4dc0.de/buildr/api/generated/remote/v1" "code.icb4dc0.de/buildr/api/generated/remote/v1/rpcv1connect" "code.icb4dc0.de/buildr/buildr/internal/errs" "code.icb4dc0.de/buildr/buildr/modules" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) var ( _ rpcv1connect.ExecutorServiceHandler = (*ExecutorServiceServer)(nil) ErrExecutionCompleted = errors.New("execution completed") ) func NewExecutorServiceServer(registry *modules.TypeRegistry) rpcv1connect.ExecutorServiceHandler { return &ExecutorServiceServer{ registry: registry, } } type ExecutorServiceServer struct { registry *modules.TypeRegistry } func (e *ExecutorServiceServer) ExecuteStream(ctx context.Context, stream *connect.BidiStream[remotev1.ExecutionClientMessage, remotev1.ExecutionServerMessage]) (err error) { ctx, cancel := context.WithCancelCause(ctx) defer cancel(errors.New("stream closed")) defer func() { if r := recover(); r != nil { err = errors.Join(err, fmt.Errorf("module exeuction paniced: %v", r)) } }() logger := slog.New(NewGrpcExecutorHandler(stream)) requestClient := NewRequestResponseClient(logger, stream) request, err := stream.Receive() if err != nil { if errors.Is(err, io.EOF) { return nil } else { return err } } unwrapped, ok := request.GetEnvelope().(*remotev1.ExecutionClientMessage_StartTask) if !ok { return status.Error(codes.InvalidArgument, "expected StartTask as first message in stream") } executor := RemoteTaskExecutor{ logger: logger, registry: e.registry, requestClient: requestClient, sender: stream, } clientErrs := requestClient.DispatchMessages(stream) resp, execErrs := executor.Execute(ctx, unwrapped.StartTask) select { case <-ctx.Done(): return status.Error(codes.Canceled, ctx.Err().Error()) case e := <-clientErrs: logger.Error("Error occurred while handling client - server communication", errs.Attr(e)) case e := <-execErrs: logger.Error("Error occurred while executing module", errs.Attr(e)) return status.Error(codes.Internal, e.Error()) case r := <-resp: _ = stream.Send(&remotev1.ExecutionServerMessage{ Envelope: &remotev1.ExecutionServerMessage_TaskResult{ TaskResult: r, }, }) } return nil }