package container

import (
	"archive/tar"
	"context"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"os"
	"path/filepath"
	"sync"
	"time"

	"connectrpc.com/connect"
	"github.com/klauspost/compress/s2"

	commonv1 "code.icb4dc0.de/buildr/api/generated/common/v1"
	remotev1 "code.icb4dc0.de/buildr/api/generated/remote/v1"
	"code.icb4dc0.de/buildr/api/generated/remote/v1/rpcv1connect"
	"code.icb4dc0.de/buildr/buildr/internal/containers"
	"code.icb4dc0.de/buildr/buildr/internal/errs"
	"code.icb4dc0.de/buildr/buildr/internal/execution"
	"code.icb4dc0.de/buildr/buildr/internal/ioutils"
	"code.icb4dc0.de/buildr/buildr/internal/logging"
	"code.icb4dc0.de/buildr/buildr/internal/rpc"
	"code.icb4dc0.de/buildr/buildr/modules"
	"code.icb4dc0.de/buildr/buildr/modules/plugin"
	"code.icb4dc0.de/buildr/buildr/modules/state"
	"code.icb4dc0.de/buildr/common/protocol"
)

const (
	// defaultDirMode is used for directories this package creates on its own
	// (the module's output directory and missing parents of extracted files).
	defaultDirMode os.FileMode = 0o755
	// ownerExecutableMode is OR-ed into the mode of directories extracted from
	// an archive so the owner can always read, write and traverse them.
	ownerExecutableMode os.FileMode = 0o700
)

// containerTask executes a single module inside a container and relays logs,
// output, state requests and modified files between the container and the host.
type containerTask struct {
	stateStore     state.Store
	moduleWithMeta modules.ModuleWithMeta
	orchestrator   *containers.Orchestrator
	repo           *modules.Repository
	execution.TaskDependencies
	once sync.Once
}

// Execute runs the task at most once for this containerTask.
//
// The error of the run is returned only to the call that actually triggered it.
// Every later call skips the sync.Once body and therefore returns nil.
func (c *containerTask) Execute(ctx context.Context, spec execution.Spec) (err error) {
	c.once.Do(func() {
		err = c.doExecute(ctx, spec)
	})

	return err
}

// doExecute performs the actual work of the task:
//
//  1. execute the task's dependencies,
//  2. create the module's output directory (if one is configured),
//  3. return early if the module asks to be skipped,
//  4. create the container, optionally register the plugin payload and start
//     the remote execution stream,
//  5. process messages from the remote executor until a task result arrives.
//
// The named result is required: the deferred cleanups join their own errors
// into it.
//
//nolint:funlen,gocognit,gocyclo // doesn't make sense to split it
func (c *containerTask) doExecute(ctx context.Context, spec execution.Spec) (err error) {
	if err := c.TaskDependencies.Execute(ctx, spec); err != nil {
		return err
	}

	if outDir := c.moduleWithMeta.OutDir(); outDir != "" {
		if err := os.MkdirAll(outDir, defaultDirMode); err != nil {
			return err
		}
	}

	logger := slog.
		Default().
		With(
			slog.String("module_name", c.moduleWithMeta.Name()),
			slog.String("module_type", modules.CategoryName(c.moduleWithMeta.Category())),
		)

	if c.moduleWithMeta.ShouldSkip() {
		logger.Info("Skipping execution as requested")
		return nil
	}

	// Collect the binary names of all dependencies that expose one; unknown
	// dependencies and failing/empty names are silently ignored (best effort).
	extraBinaries := make([]string, 0, len(c.TaskDependencies))
	for _, depid := range c.moduleWithMeta.Dependencies() {
		depMod := c.repo.ModuleByID(depid)
		if depMod == nil {
			continue
		}

		if binNamer, ok := depMod.Unwrap().(modules.BinaryNamer); ok {
			if extraBin, err := binNamer.BinaryName(ctx); err == nil && extraBin != "" {
				extraBinaries = append(extraBinaries, extraBin)
			}
		}
	}

	outputSink, err := logging.NewTaskOutputSink(
		spec.LogsDirectory,
		spec.LogToStdErr,
		modules.LogFileNameFormatter(c.moduleWithMeta),
	)
	if err != nil {
		return err
	}

	defer func() {
		err = errors.Join(err, outputSink.Close())
	}()

	// Without explicit mappings the repository root is mapped to ".".
	// NOTE(review): this writes into the map returned by InputMappings() and
	// would panic if that map were nil — confirm it is always initialized.
	inputMappings := c.moduleWithMeta.InputMappings()
	if len(inputMappings) == 0 {
		inputMappings[spec.RepoRoot] = "."
	}

	containerSpec := c.moduleWithMeta.ContainerSpec()

	buildrContainerSpec := containers.BuildRContainerSpec{
		ID:            c.moduleWithMeta.ID(),
		ModuleName:    c.moduleWithMeta.Name(),
		Image:         containerSpec.Image,
		User:          containerSpec.User,
		Privileged:    containerSpec.Privileged,
		RepoRoot:      spec.RepoRoot,
		Content:       inputMappings,
		BinariesDir:   spec.BinariesDirectory,
		ExtraBinaries: extraBinaries,
		Mounts:        containerSpec.Mounts(),
	}

	logger.Debug("Preparing container")

	con, baseURL, err := c.orchestrator.BuildRContainer(ctx, &buildrContainerSpec)
	if err != nil {
		return fmt.Errorf("failed to create container for task %s/%s: %w", c.moduleWithMeta.Type(), c.moduleWithMeta.Name(), err)
	}

	client := rpc.NewH2cHTTPClient()

	moduleReference := &commonv1.ModuleReference{
		ModuleCategory: c.moduleWithMeta.Category(),
		ModuleType:     c.moduleWithMeta.Type(),
	}

	// Plugin modules have to be registered with the remote side before they
	// can be executed there.
	if pluginModule, ok := c.moduleWithMeta.Unwrap().(plugin.Module); ok {
		payload, err := pluginModule.PluginPayload.Bytes()
		if err != nil {
			return err
		}

		pluginMgrClient := rpcv1connect.NewPluginManagerServiceClient(client, baseURL)

		registerPluginRequest := &remotev1.RegisterPluginModuleRequest{
			ModuleRef:     moduleReference,
			PluginPayload: payload,
		}

		if _, err := pluginMgrClient.RegisterPluginModule(ctx, connect.NewRequest(registerPluginRequest)); err != nil {
			return err
		}
	}

	executorClient := rpcv1connect.NewExecutorServiceClient(client, baseURL)

	logger.Debug("Start remote task execution")

	streamClient := executorClient.ExecuteStream(ctx)

	defer func() {
		err = errors.Join(err, streamClient.CloseRequest())
	}()

	rawModule, err := protocol.Marshal(c.moduleWithMeta.Unwrap())
	if err != nil {
		return fmt.Errorf("failed to marshal task spec as JSON: %w", err)
	}

	rawModule.Category = c.moduleWithMeta.Category()

	startTaskReq := &remotev1.ExecutionClientMessage{
		Envelope: &remotev1.ExecutionClientMessage_StartTask{
			StartTask: &remotev1.StartTaskRequest{
				Buildr: &remotev1.Buildr{
					Repo: &remotev1.Buildr_Repo{
						Root: fmt.Sprintf("/work/%s", c.moduleWithMeta.ID()),
					},
				},
				Reference: &remotev1.TaskReference{
					Id:     c.moduleWithMeta.ID(),
					Name:   c.moduleWithMeta.Name(),
					Module: moduleReference,
				},
				Spec: rawModule,
			},
		},
	}

	err = streamClient.Send(startTaskReq)
	if err != nil {
		return fmt.Errorf("failed to send start task request: %w", err)
	}

	// Message loop: it only ends on a receive error or when the task result
	// arrives. Failures while relaying output or state are logged, not fatal.
	for {
		ev, err := streamClient.Receive()
		if err != nil {
			return fmt.Errorf("failed to receive remote executor event: %w", err)
		}

		switch msg := ev.GetEnvelope().(type) {
		case *remotev1.ExecutionServerMessage_TaskLog:
			c.handleTaskLog(ctx, msg.TaskLog, logger)
		case *remotev1.ExecutionServerMessage_TaskOutput:
			logger.Debug("Handle task output")

			if err = c.handleTaskOutput(outputSink, msg.TaskOutput); err != nil {
				logger.Error("Failed to process task output", slog.String("err", err.Error()))
			}
		case *remotev1.ExecutionServerMessage_TaskResult:
			logger.Debug("Received task result")

			if errMsg := msg.TaskResult.Error; errMsg != "" {
				return fmt.Errorf("failed to execute task: %s", errMsg)
			}

			if msg.TaskResult.ModifiedFilesArchivePath != "" {
				// Tool modules deliver their files into the binaries directory
				// instead of the module's own output directory.
				outDir := c.moduleWithMeta.OutDir()
				if c.moduleWithMeta.Category() == modules.CategoryTool {
					outDir = spec.BinariesDirectory
				}

				if err = c.handleModifiedFiles(ctx, con, msg.TaskResult.ModifiedFilesArchivePath, outDir); err != nil {
					return fmt.Errorf("failed to fetch modified files from execution container: %w", err)
				}
			}

			return nil
		case *remotev1.ExecutionServerMessage_SetState:
			// Requests without a message ID cannot be replied to and are skipped.
			msgIDMeta, ok := ev.Meta.(*remotev1.ExecutionServerMessage_MessageId)
			if !ok {
				continue
			}

			// The reply always carries a Result; its Error field is only
			// filled when storing the state failed.
			result := &remotev1.ExecutionClientMessage_Error{
				Error: new(remotev1.Result),
			}

			respMsg := &remotev1.ExecutionClientMessage{
				Meta: &remotev1.ExecutionClientMessage_RepliesTo{
					RepliesTo: msgIDMeta.MessageId,
				},
				Envelope: result,
			}

			k := state.PlainKey(msg.SetState.Key)
			if err = c.stateStore.Set(ctx, k, msg.SetState.Data); err != nil {
				result.Error.Error = err.Error()
				logger.Error("Failed to set state", slog.String("err", err.Error()), slog.String("key", hex.EncodeToString(k.Bytes())))
			}

			if err = streamClient.Send(respMsg); err != nil {
				logger.Error("Failed to send set status response", errs.Attr(err))
			}
		case *remotev1.ExecutionServerMessage_GetState:
			k := state.PlainKey(msg.GetState.Key)

			// Requests without a message ID cannot be replied to and are skipped.
			msgIDMeta, ok := ev.Meta.(*remotev1.ExecutionServerMessage_MessageId)
			if !ok {
				continue
			}

			respMsg := &remotev1.ExecutionClientMessage{
				Meta: &remotev1.ExecutionClientMessage_RepliesTo{
					RepliesTo: msgIDMeta.MessageId,
				},
			}

			data, _, err := c.stateStore.Get(ctx, k)
			if err == nil {
				respMsg.Envelope = &remotev1.ExecutionClientMessage_GetState{
					GetState: &remotev1.GetStateResponse{
						Key:  msg.GetState.Key,
						Data: data,
					},
				}
			} else {
				respMsg.Envelope = &remotev1.ExecutionClientMessage_Error{
					Error: &remotev1.Result{
						Error: err.Error(),
					},
				}
			}

			if err = streamClient.Send(respMsg); err != nil {
				logger.Error("Failed to send get status response", errs.Attr(err))
			}
		}
	}
}

// handleTaskOutput writes the payload of req to the sink's stdout or stderr
// writer, depending on the source of the output. Output from any other source
// is ignored and nil is returned.
func (c *containerTask) handleTaskOutput(sink logging.TaskOutputSink, req *remotev1.TaskOutput) (err error) {
	//nolint:exhaustive // not necessary here
	switch req.Source {
	case remotev1.TaskOutputSource_TASK_OUTPUT_SOURCE_STDOUT:
		return writeAndSync(sink.StdOut(), req.Payload)
	case remotev1.TaskOutputSource_TASK_OUTPUT_SOURCE_STDERR:
		return writeAndSync(sink.StdErr(), req.Payload)
	}

	return nil
}

// writeAndSync writes payload to w and, if w offers a Sync method, syncs it
// afterwards. The errors of both steps are joined.
func writeAndSync(w io.Writer, payload []byte) error {
	_, err := w.Write(payload)

	if syncer, ok := w.(interface{ Sync() error }); ok {
		err = errors.Join(err, syncer.Sync())
	}

	return err
}

// handleTaskLog replays a log entry received from the remote executor through
// the handler of logger, keeping the entry's own timestamp, level, message and
// attributes. Entries whose level is not enabled are dropped.
func (c *containerTask) handleTaskLog(ctx context.Context, taskLog *remotev1.TaskLog, logger *slog.Logger) {
	handler := logger.Handler()
	level := slog.Level(taskLog.Level)

	if !handler.Enabled(ctx, level) {
		return
	}

	// The program counter is 0 because the record did not originate in this process.
	rec := slog.NewRecord(time.UnixMicro(taskLog.Time), level, taskLog.Message, 0)

	for i := range taskLog.Attributes {
		attr := taskLog.Attributes[i]
		rec.AddAttrs(slog.String(attr.GetKey(), attr.GetValue()))
	}

	// Best effort: there is nowhere sensible to report a failing log handler.
	_ = handler.Handle(ctx, rec)
}

// handleModifiedFiles copies the archive at modifiedFilesPath out of the
// container and unpacks it into outDir. The copied file is read as an
// s2-compressed tar stream. Only directories and regular files are extracted;
// all other entry types are skipped.
//
// Entries whose name would resolve to a location outside of outDir are
// rejected with an error instead of being written.
func (c *containerTask) handleModifiedFiles(ctx context.Context, con containers.Container, modifiedFilesPath, outDir string) error {
	return con.CopyFileFromContainer(ctx, modifiedFilesPath, func(_ *tar.Header, reader io.Reader) error {
		tarReader := tar.NewReader(s2.NewReader(reader))

		for {
			header, err := tarReader.Next()
			if errors.Is(err, io.EOF) {
				// Regular end of the archive.
				return nil
			}

			if err != nil {
				return err
			}

			if err := extractArchiveEntry(tarReader, header, outDir); err != nil {
				return err
			}
		}
	})
}

// extractArchiveEntry materializes the current entry of tarReader below outDir.
// Entries that are neither directories nor regular files are ignored.
func extractArchiveEntry(tarReader *tar.Reader, header *tar.Header, outDir string) error {
	if header.Typeflag != tar.TypeDir && header.Typeflag != tar.TypeReg {
		return nil
	}

	target, err := resolveArchiveTarget(outDir, header.Name)
	if err != nil {
		return err
	}

	if header.Typeflag == tar.TypeDir {
		return os.MkdirAll(target, os.FileMode(header.Mode)|ownerExecutableMode)
	}

	// Archives do not necessarily contain an entry for every parent directory.
	if err := os.MkdirAll(filepath.Dir(target), defaultDirMode); err != nil {
		return err
	}

	outFile, err := os.OpenFile(target, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(header.Mode))
	if err != nil {
		return err
	}

	_, copyErr := ioutils.CopyWithPooledBuffer(outFile, tarReader)

	// Always close the file, and report a close error as well: it may be the
	// only indication that the written data did not reach the disk.
	return errors.Join(copyErr, outFile.Close())
}

// resolveArchiveTarget joins name onto outDir and verifies that the result
// still lies within outDir, guarding against archive entries such as
// "../../etc/passwd".
//
// The check is purely lexical; symbolic links that already exist below outDir
// are not resolved.
func resolveArchiveTarget(outDir, name string) (string, error) {
	target := filepath.Join(outDir, name)

	rel, err := filepath.Rel(outDir, target)
	if err != nil || !filepath.IsLocal(rel) {
		return "", fmt.Errorf("archive entry %q escapes output directory %q", name, outDir)
	}

	return target, nil
}