buildr/internal/rpc/v1/executor_server.go
Peter 34c431790e
Some checks failed
continuous-integration/drone/push Build is failing
continuous-integration/drone/tag Build is failing
refactor: use connect-go instead of regular Google gRPC
- support binary name for plugins
- register plugins for container jobs
2023-09-12 18:43:34 +02:00

92 lines
2.4 KiB
Go

package v1
import (
"context"
"errors"
"fmt"
"io"
"log/slog"
"connectrpc.com/connect"
remotev1 "code.icb4dc0.de/buildr/api/generated/remote/v1"
"code.icb4dc0.de/buildr/api/generated/remote/v1/rpcv1connect"
"code.icb4dc0.de/buildr/buildr/internal/errs"
"code.icb4dc0.de/buildr/buildr/modules"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
var (
_ rpcv1connect.ExecutorServiceHandler = (*ExecutorServiceServer)(nil)
ErrExecutionCompleted = errors.New("execution completed")
)
func NewExecutorServiceServer(registry *modules.TypeRegistry) rpcv1connect.ExecutorServiceHandler {
return &ExecutorServiceServer{
registry: registry,
}
}
type ExecutorServiceServer struct {
registry *modules.TypeRegistry
}
func (e *ExecutorServiceServer) ExecuteStream(ctx context.Context, stream *connect.BidiStream[remotev1.ExecutionClientMessage, remotev1.ExecutionServerMessage]) (err error) {
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(errors.New("stream closed"))
defer func() {
if r := recover(); r != nil {
err = errors.Join(err, fmt.Errorf("module exeuction paniced: %v", r))
}
}()
logger := slog.New(NewGrpcExecutorHandler(stream))
requestClient := NewRequestResponseClient(logger, stream)
request, err := stream.Receive()
if err != nil {
if errors.Is(err, io.EOF) {
return nil
} else {
return err
}
}
unwrapped, ok := request.GetEnvelope().(*remotev1.ExecutionClientMessage_StartTask)
if !ok {
return status.Error(codes.InvalidArgument, "expected StartTask as first message in stream")
}
executor := RemoteTaskExecutor{
logger: logger,
registry: e.registry,
requestClient: requestClient,
sender: stream,
}
clientErrs := requestClient.DispatchMessages(stream)
resp, execErrs := executor.Execute(ctx, unwrapped.StartTask)
select {
case <-ctx.Done():
return status.Error(codes.Canceled, ctx.Err().Error())
case e := <-clientErrs:
logger.Error("Error occurred while handling client - server communication", errs.Attr(e))
case e := <-execErrs:
logger.Error("Error occurred while executing module", errs.Attr(e))
return status.Error(codes.Internal, e.Error())
case r := <-resp:
_ = stream.Send(&remotev1.ExecutionServerMessage{
Envelope: &remotev1.ExecutionServerMessage_TaskResult{
TaskResult: r,
},
})
}
return nil
}