92 lines
2.4 KiB
Go
92 lines
2.4 KiB
Go
package v1
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
|
|
"connectrpc.com/connect"
|
|
|
|
remotev1 "code.icb4dc0.de/buildr/api/generated/remote/v1"
|
|
"code.icb4dc0.de/buildr/api/generated/remote/v1/rpcv1connect"
|
|
|
|
"code.icb4dc0.de/buildr/buildr/internal/errs"
|
|
"code.icb4dc0.de/buildr/buildr/modules"
|
|
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
var (
|
|
_ rpcv1connect.ExecutorServiceHandler = (*ExecutorServiceServer)(nil)
|
|
ErrExecutionCompleted = errors.New("execution completed")
|
|
)
|
|
|
|
func NewExecutorServiceServer(registry *modules.TypeRegistry) rpcv1connect.ExecutorServiceHandler {
|
|
return &ExecutorServiceServer{
|
|
registry: registry,
|
|
}
|
|
}
|
|
|
|
type ExecutorServiceServer struct {
|
|
registry *modules.TypeRegistry
|
|
}
|
|
|
|
func (e *ExecutorServiceServer) ExecuteStream(ctx context.Context, stream *connect.BidiStream[remotev1.ExecutionClientMessage, remotev1.ExecutionServerMessage]) (err error) {
|
|
ctx, cancel := context.WithCancelCause(ctx)
|
|
defer cancel(errors.New("stream closed"))
|
|
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
err = errors.Join(err, fmt.Errorf("module exeuction paniced: %v", r))
|
|
}
|
|
}()
|
|
|
|
logger := slog.New(NewGrpcExecutorHandler(stream))
|
|
requestClient := NewRequestResponseClient(logger, stream)
|
|
|
|
request, err := stream.Receive()
|
|
if err != nil {
|
|
if errors.Is(err, io.EOF) {
|
|
return nil
|
|
} else {
|
|
return err
|
|
}
|
|
}
|
|
|
|
unwrapped, ok := request.GetEnvelope().(*remotev1.ExecutionClientMessage_StartTask)
|
|
if !ok {
|
|
return status.Error(codes.InvalidArgument, "expected StartTask as first message in stream")
|
|
}
|
|
|
|
executor := RemoteTaskExecutor{
|
|
logger: logger,
|
|
registry: e.registry,
|
|
requestClient: requestClient,
|
|
sender: stream,
|
|
}
|
|
|
|
clientErrs := requestClient.DispatchMessages(stream)
|
|
resp, execErrs := executor.Execute(ctx, unwrapped.StartTask)
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return status.Error(codes.Canceled, ctx.Err().Error())
|
|
case e := <-clientErrs:
|
|
logger.Error("Error occurred while handling client - server communication", errs.Attr(e))
|
|
case e := <-execErrs:
|
|
logger.Error("Error occurred while executing module", errs.Attr(e))
|
|
return status.Error(codes.Internal, e.Error())
|
|
case r := <-resp:
|
|
_ = stream.Send(&remotev1.ExecutionServerMessage{
|
|
Envelope: &remotev1.ExecutionServerMessage_TaskResult{
|
|
TaskResult: r,
|
|
},
|
|
})
|
|
}
|
|
|
|
return nil
|
|
}
|