buildr/internal/execution/container/task.go
Peter 34c431790e
Some checks failed
continuous-integration/drone/push Build is failing
continuous-integration/drone/tag Build is failing
refactor: use connect-go instead of regular Google gRPC
- support binary name for plugins
- register plugins for container jobs
2023-09-12 18:43:34 +02:00

362 lines
10 KiB
Go

package container
import (
"archive/tar"
"context"
"encoding/hex"
"errors"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
"sync"
"time"
"connectrpc.com/connect"
commonv1 "code.icb4dc0.de/buildr/api/generated/common/v1"
remotev1 "code.icb4dc0.de/buildr/api/generated/remote/v1"
"code.icb4dc0.de/buildr/common/protocol"
"code.icb4dc0.de/buildr/buildr/modules/plugin"
"code.icb4dc0.de/buildr/buildr/internal/rpc"
"code.icb4dc0.de/buildr/buildr/internal/errs"
"code.icb4dc0.de/buildr/buildr/modules/state"
"github.com/klauspost/compress/s2"
"code.icb4dc0.de/buildr/api/generated/remote/v1/rpcv1connect"
"code.icb4dc0.de/buildr/buildr/internal/containers"
"code.icb4dc0.de/buildr/buildr/internal/execution"
"code.icb4dc0.de/buildr/buildr/internal/ioutils"
"code.icb4dc0.de/buildr/buildr/internal/logging"
"code.icb4dc0.de/buildr/buildr/modules"
)
const (
defaultDirMode os.FileMode = 0o755
ownerExecutableMode os.FileMode = 0o700
)
type containerTask struct {
stateStore state.Store
moduleWithMeta modules.ModuleWithMeta
orchestrator *containers.Orchestrator
repo *modules.Repository
execution.TaskDependencies
once sync.Once
}
func (c *containerTask) Execute(ctx context.Context, spec execution.Spec) (err error) {
c.once.Do(func() {
err = c.doExecute(ctx, spec)
})
return err
}
//nolint:funlen,gocognit,gocyclo // doesn't make sense to split it
func (c *containerTask) doExecute(ctx context.Context, spec execution.Spec) (err error) {
if err := c.TaskDependencies.Execute(ctx, spec); err != nil {
return err
}
if outDir := c.moduleWithMeta.OutDir(); outDir != "" {
if err := os.MkdirAll(c.moduleWithMeta.OutDir(), defaultDirMode); err != nil {
return err
}
}
logger := slog.
Default().
With(
slog.String("module_name", c.moduleWithMeta.Name()),
slog.String("module_type", modules.CategoryName(c.moduleWithMeta.Category())),
)
if c.moduleWithMeta.ShouldSkip() {
logger.Info("Skipping execution as requested")
return nil
}
extraBinaries := make([]string, 0, len(c.TaskDependencies))
for _, depid := range c.moduleWithMeta.Dependencies() {
depMod := c.repo.ModuleByID(depid)
if depMod == nil {
continue
}
if binNamer, ok := depMod.Unwrap().(modules.BinaryNamer); ok {
if extraBin, err := binNamer.BinaryName(ctx); err == nil && extraBin != "" {
extraBinaries = append(extraBinaries, extraBin)
}
}
}
outputSink, err := logging.NewTaskOutputSink(
spec.LogsDirectory,
spec.LogToStdErr,
modules.LogFileNameFormatter(c.moduleWithMeta),
)
if err != nil {
return err
}
defer func() {
err = errors.Join(err, outputSink.Close())
}()
inputMappings := c.moduleWithMeta.InputMappings()
if len(inputMappings) == 0 {
inputMappings[spec.RepoRoot] = "."
}
containerSpec := c.moduleWithMeta.ContainerSpec()
buildrContainerSpec := containers.BuildRContainerSpec{
ID: c.moduleWithMeta.ID(),
ModuleName: c.moduleWithMeta.Name(),
Image: containerSpec.Image,
User: containerSpec.User,
Privileged: containerSpec.Privileged,
RepoRoot: spec.RepoRoot,
Content: inputMappings,
BinariesDir: spec.BinariesDirectory,
ExtraBinaries: extraBinaries,
Mounts: containerSpec.Mounts(),
}
logger.Debug("Preparing container")
con, baseURL, err := c.orchestrator.BuildRContainer(ctx, &buildrContainerSpec)
if err != nil {
return fmt.Errorf("failed to create container for task %s/%s: %w", c.moduleWithMeta.Type(), c.moduleWithMeta.Name(), err)
}
client := rpc.NewH2cHTTPClient()
moduleReference := &commonv1.ModuleReference{
ModuleCategory: c.moduleWithMeta.Category(),
ModuleType: c.moduleWithMeta.Type(),
}
if pluginModule, ok := c.moduleWithMeta.Unwrap().(plugin.Module); ok {
payload, err := pluginModule.PluginPayload.Bytes()
if err != nil {
return err
}
pluginMgrClient := rpcv1connect.NewPluginManagerServiceClient(client, baseURL)
registerPluginRequest := &remotev1.RegisterPluginModuleRequest{
ModuleRef: moduleReference,
PluginPayload: payload,
}
if _, err := pluginMgrClient.RegisterPluginModule(ctx, connect.NewRequest(registerPluginRequest)); err != nil {
return err
}
}
executorClient := rpcv1connect.NewExecutorServiceClient(client, baseURL)
logger.Debug("Start remote task execution")
streamClient := executorClient.ExecuteStream(ctx)
defer func() {
err = errors.Join(err, streamClient.CloseRequest())
}()
rawModule, err := protocol.Marshal(c.moduleWithMeta.Unwrap())
if err != nil {
return fmt.Errorf("failed to marshal task spec as JSON: %w", err)
}
rawModule.Category = c.moduleWithMeta.Category()
startTaskReq := &remotev1.ExecutionClientMessage{
Envelope: &remotev1.ExecutionClientMessage_StartTask{
StartTask: &remotev1.StartTaskRequest{
Buildr: &remotev1.Buildr{
Repo: &remotev1.Buildr_Repo{
Root: fmt.Sprintf("/work/%s", c.moduleWithMeta.ID()),
},
},
Reference: &remotev1.TaskReference{
Id: c.moduleWithMeta.ID(),
Name: c.moduleWithMeta.Name(),
Module: moduleReference,
},
Spec: rawModule,
},
},
}
err = streamClient.Send(startTaskReq)
if err != nil {
return fmt.Errorf("failed to send start task request: %w", err)
}
for {
ev, err := streamClient.Receive()
if err != nil {
return fmt.Errorf("failed to receive remote executor event: %w", err)
}
switch msg := ev.GetEnvelope().(type) {
case *remotev1.ExecutionServerMessage_TaskLog:
c.handleTaskLog(ctx, msg.TaskLog, logger)
case *remotev1.ExecutionServerMessage_TaskOutput:
logger.Debug("Handle task output")
if err = c.handleTaskOutput(outputSink, msg.TaskOutput); err != nil {
logger.Error("Failed to process task output", slog.String("err", err.Error()))
}
case *remotev1.ExecutionServerMessage_TaskResult:
logger.Debug("Received task result")
if errMsg := msg.TaskResult.Error; errMsg != "" {
return fmt.Errorf("failed to execute task: %s", msg.TaskResult.Error)
}
if msg.TaskResult.ModifiedFilesArchivePath != "" {
outDir := c.moduleWithMeta.OutDir()
if c.moduleWithMeta.Category() == modules.CategoryTool {
outDir = spec.BinariesDirectory
}
if err = c.handleModifiedFiles(ctx, con, msg.TaskResult.ModifiedFilesArchivePath, outDir); err != nil {
return fmt.Errorf("failed to fetch modified files from execution container: %w", err)
}
}
return nil
case *remotev1.ExecutionServerMessage_SetState:
msgIDMeta, ok := ev.Meta.(*remotev1.ExecutionServerMessage_MessageId)
if !ok {
continue
}
result := &remotev1.ExecutionClientMessage_Error{
Error: new(remotev1.Result),
}
respMsg := &remotev1.ExecutionClientMessage{
Meta: &remotev1.ExecutionClientMessage_RepliesTo{
RepliesTo: msgIDMeta.MessageId,
},
Envelope: result,
}
k := state.PlainKey(msg.SetState.Key)
if err = c.stateStore.Set(ctx, k, msg.SetState.Data); err != nil {
result.Error.Error = err.Error()
logger.Error("Failed to set state", slog.String("err", err.Error()), slog.String("key", hex.EncodeToString(k.Bytes())))
}
if err = streamClient.Send(respMsg); err != nil {
logger.Error("Failed to send set status response", errs.Attr(err))
}
case *remotev1.ExecutionServerMessage_GetState:
k := state.PlainKey(msg.GetState.Key)
msgIDMeta, ok := ev.Meta.(*remotev1.ExecutionServerMessage_MessageId)
if !ok {
continue
}
respMsg := &remotev1.ExecutionClientMessage{
Meta: &remotev1.ExecutionClientMessage_RepliesTo{
RepliesTo: msgIDMeta.MessageId,
},
}
data, _, err := c.stateStore.Get(ctx, k)
if err == nil {
respMsg.Envelope = &remotev1.ExecutionClientMessage_GetState{
GetState: &remotev1.GetStateResponse{
Key: msg.GetState.Key,
Data: data,
},
}
} else {
respMsg.Envelope = &remotev1.ExecutionClientMessage_Error{
Error: &remotev1.Result{
Error: err.Error(),
},
}
}
if err = streamClient.Send(respMsg); err != nil {
logger.Error("Failed to send get status response", errs.Attr(err))
}
}
}
}
func (c *containerTask) handleTaskOutput(sink logging.TaskOutputSink, req *remotev1.TaskOutput) (err error) {
//nolint:exhaustive // not necessary here
switch req.Source {
case remotev1.TaskOutputSource_TASK_OUTPUT_SOURCE_STDOUT:
writer := sink.StdOut()
_, err = writer.Write(req.Payload)
if syncer, ok := writer.(interface{ Sync() error }); ok {
err = errors.Join(err, syncer.Sync())
}
case remotev1.TaskOutputSource_TASK_OUTPUT_SOURCE_STDERR:
writer := sink.StdErr()
_, err = writer.Write(req.Payload)
if syncer, ok := writer.(interface{ Sync() error }); ok {
err = errors.Join(err, syncer.Sync())
}
}
return err
}
func (c *containerTask) handleTaskLog(ctx context.Context, taskLog *remotev1.TaskLog, logger *slog.Logger) {
handler := logger.Handler()
level := slog.Level(taskLog.Level)
if !handler.Enabled(ctx, level) {
return
}
rec := slog.NewRecord(time.UnixMicro(taskLog.Time), level, taskLog.Message, 0)
for i := range taskLog.Attributes {
attr := taskLog.Attributes[i]
rec.AddAttrs(slog.String(attr.GetKey(), attr.GetValue()))
}
_ = logger.Handler().Handle(ctx, rec)
}
func (c *containerTask) handleModifiedFiles(ctx context.Context, con containers.Container, modifiedFilesPath, outDir string) error {
return con.CopyFileFromContainer(ctx, modifiedFilesPath, func(_ *tar.Header, reader io.Reader) (err error) {
deflateReader := s2.NewReader(reader)
tarReader := tar.NewReader(deflateReader)
var header *tar.Header
for header, err = tarReader.Next(); err == nil; header, err = tarReader.Next() {
if header.Typeflag == tar.TypeDir {
if err := os.MkdirAll(filepath.Join(outDir, header.Name), os.FileMode(header.Mode)|ownerExecutableMode); err != nil {
return err
}
continue
}
if header.Typeflag != tar.TypeReg {
continue
}
var outFile *os.File
outFile, err = os.OpenFile(filepath.Join(outDir, header.Name), os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(header.Mode))
if err != nil {
return err
}
_, err = ioutils.CopyWithPooledBuffer(outFile, tarReader)
if err := errors.Join(err, outFile.Close()); err != nil {
return err
}
}
if errors.Is(err, io.EOF) {
return nil
}
return err
})
}