362 lines
10 KiB
Go
362 lines
10 KiB
Go
package container
|
|
|
|
import (
|
|
"archive/tar"
|
|
"context"
|
|
"encoding/hex"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
|
|
"connectrpc.com/connect"
|
|
|
|
commonv1 "code.icb4dc0.de/buildr/api/generated/common/v1"
|
|
remotev1 "code.icb4dc0.de/buildr/api/generated/remote/v1"
|
|
"code.icb4dc0.de/buildr/common/protocol"
|
|
|
|
"code.icb4dc0.de/buildr/buildr/modules/plugin"
|
|
|
|
"code.icb4dc0.de/buildr/buildr/internal/rpc"
|
|
|
|
"code.icb4dc0.de/buildr/buildr/internal/errs"
|
|
"code.icb4dc0.de/buildr/buildr/modules/state"
|
|
|
|
"github.com/klauspost/compress/s2"
|
|
|
|
"code.icb4dc0.de/buildr/api/generated/remote/v1/rpcv1connect"
|
|
|
|
"code.icb4dc0.de/buildr/buildr/internal/containers"
|
|
"code.icb4dc0.de/buildr/buildr/internal/execution"
|
|
"code.icb4dc0.de/buildr/buildr/internal/ioutils"
|
|
"code.icb4dc0.de/buildr/buildr/internal/logging"
|
|
"code.icb4dc0.de/buildr/buildr/modules"
|
|
)
|
|
|
|
const (
|
|
defaultDirMode os.FileMode = 0o755
|
|
ownerExecutableMode os.FileMode = 0o700
|
|
)
|
|
|
|
type containerTask struct {
|
|
stateStore state.Store
|
|
moduleWithMeta modules.ModuleWithMeta
|
|
orchestrator *containers.Orchestrator
|
|
repo *modules.Repository
|
|
execution.TaskDependencies
|
|
once sync.Once
|
|
}
|
|
|
|
func (c *containerTask) Execute(ctx context.Context, spec execution.Spec) (err error) {
|
|
c.once.Do(func() {
|
|
err = c.doExecute(ctx, spec)
|
|
})
|
|
|
|
return err
|
|
}
|
|
|
|
//nolint:funlen,gocognit,gocyclo // doesn't make sense to split it
|
|
func (c *containerTask) doExecute(ctx context.Context, spec execution.Spec) (err error) {
|
|
if err := c.TaskDependencies.Execute(ctx, spec); err != nil {
|
|
return err
|
|
}
|
|
|
|
if outDir := c.moduleWithMeta.OutDir(); outDir != "" {
|
|
if err := os.MkdirAll(c.moduleWithMeta.OutDir(), defaultDirMode); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
logger := slog.
|
|
Default().
|
|
With(
|
|
slog.String("module_name", c.moduleWithMeta.Name()),
|
|
slog.String("module_type", modules.CategoryName(c.moduleWithMeta.Category())),
|
|
)
|
|
|
|
if c.moduleWithMeta.ShouldSkip() {
|
|
logger.Info("Skipping execution as requested")
|
|
return nil
|
|
}
|
|
|
|
extraBinaries := make([]string, 0, len(c.TaskDependencies))
|
|
|
|
for _, depid := range c.moduleWithMeta.Dependencies() {
|
|
depMod := c.repo.ModuleByID(depid)
|
|
if depMod == nil {
|
|
continue
|
|
}
|
|
if binNamer, ok := depMod.Unwrap().(modules.BinaryNamer); ok {
|
|
if extraBin, err := binNamer.BinaryName(ctx); err == nil && extraBin != "" {
|
|
extraBinaries = append(extraBinaries, extraBin)
|
|
}
|
|
}
|
|
}
|
|
|
|
outputSink, err := logging.NewTaskOutputSink(
|
|
spec.LogsDirectory,
|
|
spec.LogToStdErr,
|
|
modules.LogFileNameFormatter(c.moduleWithMeta),
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer func() {
|
|
err = errors.Join(err, outputSink.Close())
|
|
}()
|
|
|
|
inputMappings := c.moduleWithMeta.InputMappings()
|
|
|
|
if len(inputMappings) == 0 {
|
|
inputMappings[spec.RepoRoot] = "."
|
|
}
|
|
|
|
containerSpec := c.moduleWithMeta.ContainerSpec()
|
|
buildrContainerSpec := containers.BuildRContainerSpec{
|
|
ID: c.moduleWithMeta.ID(),
|
|
ModuleName: c.moduleWithMeta.Name(),
|
|
Image: containerSpec.Image,
|
|
User: containerSpec.User,
|
|
Privileged: containerSpec.Privileged,
|
|
RepoRoot: spec.RepoRoot,
|
|
Content: inputMappings,
|
|
BinariesDir: spec.BinariesDirectory,
|
|
ExtraBinaries: extraBinaries,
|
|
Mounts: containerSpec.Mounts(),
|
|
}
|
|
|
|
logger.Debug("Preparing container")
|
|
con, baseURL, err := c.orchestrator.BuildRContainer(ctx, &buildrContainerSpec)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create container for task %s/%s: %w", c.moduleWithMeta.Type(), c.moduleWithMeta.Name(), err)
|
|
}
|
|
|
|
client := rpc.NewH2cHTTPClient()
|
|
moduleReference := &commonv1.ModuleReference{
|
|
ModuleCategory: c.moduleWithMeta.Category(),
|
|
ModuleType: c.moduleWithMeta.Type(),
|
|
}
|
|
|
|
if pluginModule, ok := c.moduleWithMeta.Unwrap().(plugin.Module); ok {
|
|
payload, err := pluginModule.PluginPayload.Bytes()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
pluginMgrClient := rpcv1connect.NewPluginManagerServiceClient(client, baseURL)
|
|
registerPluginRequest := &remotev1.RegisterPluginModuleRequest{
|
|
ModuleRef: moduleReference,
|
|
PluginPayload: payload,
|
|
}
|
|
if _, err := pluginMgrClient.RegisterPluginModule(ctx, connect.NewRequest(registerPluginRequest)); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
executorClient := rpcv1connect.NewExecutorServiceClient(client, baseURL)
|
|
|
|
logger.Debug("Start remote task execution")
|
|
streamClient := executorClient.ExecuteStream(ctx)
|
|
|
|
defer func() {
|
|
err = errors.Join(err, streamClient.CloseRequest())
|
|
}()
|
|
|
|
rawModule, err := protocol.Marshal(c.moduleWithMeta.Unwrap())
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal task spec as JSON: %w", err)
|
|
}
|
|
|
|
rawModule.Category = c.moduleWithMeta.Category()
|
|
|
|
startTaskReq := &remotev1.ExecutionClientMessage{
|
|
Envelope: &remotev1.ExecutionClientMessage_StartTask{
|
|
StartTask: &remotev1.StartTaskRequest{
|
|
Buildr: &remotev1.Buildr{
|
|
Repo: &remotev1.Buildr_Repo{
|
|
Root: fmt.Sprintf("/work/%s", c.moduleWithMeta.ID()),
|
|
},
|
|
},
|
|
Reference: &remotev1.TaskReference{
|
|
Id: c.moduleWithMeta.ID(),
|
|
Name: c.moduleWithMeta.Name(),
|
|
Module: moduleReference,
|
|
},
|
|
Spec: rawModule,
|
|
},
|
|
},
|
|
}
|
|
|
|
err = streamClient.Send(startTaskReq)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to send start task request: %w", err)
|
|
}
|
|
|
|
for {
|
|
ev, err := streamClient.Receive()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to receive remote executor event: %w", err)
|
|
}
|
|
|
|
switch msg := ev.GetEnvelope().(type) {
|
|
case *remotev1.ExecutionServerMessage_TaskLog:
|
|
c.handleTaskLog(ctx, msg.TaskLog, logger)
|
|
case *remotev1.ExecutionServerMessage_TaskOutput:
|
|
logger.Debug("Handle task output")
|
|
if err = c.handleTaskOutput(outputSink, msg.TaskOutput); err != nil {
|
|
logger.Error("Failed to process task output", slog.String("err", err.Error()))
|
|
}
|
|
case *remotev1.ExecutionServerMessage_TaskResult:
|
|
logger.Debug("Received task result")
|
|
if errMsg := msg.TaskResult.Error; errMsg != "" {
|
|
return fmt.Errorf("failed to execute task: %s", msg.TaskResult.Error)
|
|
}
|
|
|
|
if msg.TaskResult.ModifiedFilesArchivePath != "" {
|
|
outDir := c.moduleWithMeta.OutDir()
|
|
if c.moduleWithMeta.Category() == modules.CategoryTool {
|
|
outDir = spec.BinariesDirectory
|
|
}
|
|
if err = c.handleModifiedFiles(ctx, con, msg.TaskResult.ModifiedFilesArchivePath, outDir); err != nil {
|
|
return fmt.Errorf("failed to fetch modified files from execution container: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
case *remotev1.ExecutionServerMessage_SetState:
|
|
msgIDMeta, ok := ev.Meta.(*remotev1.ExecutionServerMessage_MessageId)
|
|
if !ok {
|
|
continue
|
|
}
|
|
result := &remotev1.ExecutionClientMessage_Error{
|
|
Error: new(remotev1.Result),
|
|
}
|
|
respMsg := &remotev1.ExecutionClientMessage{
|
|
Meta: &remotev1.ExecutionClientMessage_RepliesTo{
|
|
RepliesTo: msgIDMeta.MessageId,
|
|
},
|
|
Envelope: result,
|
|
}
|
|
|
|
k := state.PlainKey(msg.SetState.Key)
|
|
if err = c.stateStore.Set(ctx, k, msg.SetState.Data); err != nil {
|
|
result.Error.Error = err.Error()
|
|
logger.Error("Failed to set state", slog.String("err", err.Error()), slog.String("key", hex.EncodeToString(k.Bytes())))
|
|
}
|
|
|
|
if err = streamClient.Send(respMsg); err != nil {
|
|
logger.Error("Failed to send set status response", errs.Attr(err))
|
|
}
|
|
case *remotev1.ExecutionServerMessage_GetState:
|
|
k := state.PlainKey(msg.GetState.Key)
|
|
msgIDMeta, ok := ev.Meta.(*remotev1.ExecutionServerMessage_MessageId)
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
respMsg := &remotev1.ExecutionClientMessage{
|
|
Meta: &remotev1.ExecutionClientMessage_RepliesTo{
|
|
RepliesTo: msgIDMeta.MessageId,
|
|
},
|
|
}
|
|
|
|
data, _, err := c.stateStore.Get(ctx, k)
|
|
if err == nil {
|
|
respMsg.Envelope = &remotev1.ExecutionClientMessage_GetState{
|
|
GetState: &remotev1.GetStateResponse{
|
|
Key: msg.GetState.Key,
|
|
Data: data,
|
|
},
|
|
}
|
|
} else {
|
|
respMsg.Envelope = &remotev1.ExecutionClientMessage_Error{
|
|
Error: &remotev1.Result{
|
|
Error: err.Error(),
|
|
},
|
|
}
|
|
}
|
|
|
|
if err = streamClient.Send(respMsg); err != nil {
|
|
logger.Error("Failed to send get status response", errs.Attr(err))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *containerTask) handleTaskOutput(sink logging.TaskOutputSink, req *remotev1.TaskOutput) (err error) {
|
|
//nolint:exhaustive // not necessary here
|
|
switch req.Source {
|
|
case remotev1.TaskOutputSource_TASK_OUTPUT_SOURCE_STDOUT:
|
|
writer := sink.StdOut()
|
|
_, err = writer.Write(req.Payload)
|
|
if syncer, ok := writer.(interface{ Sync() error }); ok {
|
|
err = errors.Join(err, syncer.Sync())
|
|
}
|
|
case remotev1.TaskOutputSource_TASK_OUTPUT_SOURCE_STDERR:
|
|
writer := sink.StdErr()
|
|
_, err = writer.Write(req.Payload)
|
|
if syncer, ok := writer.(interface{ Sync() error }); ok {
|
|
err = errors.Join(err, syncer.Sync())
|
|
}
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (c *containerTask) handleTaskLog(ctx context.Context, taskLog *remotev1.TaskLog, logger *slog.Logger) {
|
|
handler := logger.Handler()
|
|
level := slog.Level(taskLog.Level)
|
|
if !handler.Enabled(ctx, level) {
|
|
return
|
|
}
|
|
|
|
rec := slog.NewRecord(time.UnixMicro(taskLog.Time), level, taskLog.Message, 0)
|
|
for i := range taskLog.Attributes {
|
|
attr := taskLog.Attributes[i]
|
|
rec.AddAttrs(slog.String(attr.GetKey(), attr.GetValue()))
|
|
}
|
|
_ = logger.Handler().Handle(ctx, rec)
|
|
}
|
|
|
|
func (c *containerTask) handleModifiedFiles(ctx context.Context, con containers.Container, modifiedFilesPath, outDir string) error {
|
|
return con.CopyFileFromContainer(ctx, modifiedFilesPath, func(_ *tar.Header, reader io.Reader) (err error) {
|
|
deflateReader := s2.NewReader(reader)
|
|
tarReader := tar.NewReader(deflateReader)
|
|
|
|
var header *tar.Header
|
|
for header, err = tarReader.Next(); err == nil; header, err = tarReader.Next() {
|
|
if header.Typeflag == tar.TypeDir {
|
|
if err := os.MkdirAll(filepath.Join(outDir, header.Name), os.FileMode(header.Mode)|ownerExecutableMode); err != nil {
|
|
return err
|
|
}
|
|
continue
|
|
}
|
|
|
|
if header.Typeflag != tar.TypeReg {
|
|
continue
|
|
}
|
|
|
|
var outFile *os.File
|
|
outFile, err = os.OpenFile(filepath.Join(outDir, header.Name), os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(header.Mode))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = ioutils.CopyWithPooledBuffer(outFile, tarReader)
|
|
if err := errors.Join(err, outFile.Close()); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if errors.Is(err, io.EOF) {
|
|
return nil
|
|
}
|
|
|
|
return err
|
|
})
|
|
}
|