feat(task): stream output from tasks on the fly

This commit is contained in:
Peter 2023-04-18 21:30:46 +02:00
parent 0e0207ee64
commit 3e617cedbd
Signed by: prskr
GPG key ID: C1DB5D2E8DB512F9
13 changed files with 297 additions and 113 deletions

View file

@ -1,16 +1,40 @@
# Remote execution
Currently remote execution only applies to execution of modules in container.
Theoretically it could also be used e.g. to execute modules on other machines via SCP/SSH.
## Sequence diagram overview
The following sequence diagram illustrates the remote protocol:
```mermaid
sequenceDiagram
client ->>+ executor: StartTaskRequest
client ->> client: listen for executor events
executor ->> executor: Start FS watcher
executor ->> executor: Unmarshal task and execute
loop during execution
executor -->> client: TaskLog - stream logs
executor -->> client: TaskCreatedFile - notify about created file
loop stream task output & logs
executor -->> client: TaskLog
executor -->> client: TaskOutput
end
executor -->>- client: TaskResult - as soon as execution completes
```
```
## Execution steps for container tasks
The overall execution looks like this:
1. create container network
1. create container
1. copy buildr binary, tools and content into container
- if input mapping is configured, content is copied as it would be mapped
- if no input mapping is configured complete repository is copied
1. start container with buildr as entrypoint (gRPC server)
1. connect to gRPC server
1. send `StartTaskRequest` specifying what to do - although it's called **TaskRequest** it could be any module
1. server starts a FS watcher to monitor modified files
1. buildr CLI listens for logs and output from executed task
1. as soon as the execution completes, modified files are collected into a .tar.s2 file and error message + location of the archive are sent as result to buildr CLI
1. CLI copies archive back and extracts modified files to output directory

View file

@ -30,9 +30,18 @@ message StartTaskRequest {
message TaskResult {
string error = 1;
bytes stdout = 2;
bytes stderr = 3;
string modified_files_archive_path = 4;
string modified_files_archive_path = 2;
}
enum TaskOutputSource {
TASK_OUTPUT_SOURCE_UNKNOWN = 0;
TASK_OUTPUT_SOURCE_STDOUT = 1;
TASK_OUTPUT_SOURCE_STDERR = 2;
}
message TaskOutput {
TaskOutputSource source = 1;
bytes payload = 2;
}
message TaskLog {
@ -58,6 +67,7 @@ message ExecuteTaskResponse {
oneof envelope {
TaskResult task_result = 11;
TaskLog task_log = 12;
TaskOutput task_output = 13;
}
}

View file

@ -17,6 +17,7 @@ import (
"code.icb4dc0.de/buildr/buildr/internal/containers"
"code.icb4dc0.de/buildr/buildr/internal/execution"
rpcv1 "code.icb4dc0.de/buildr/buildr/internal/generated/rpc/v1"
"code.icb4dc0.de/buildr/buildr/internal/logging"
"code.icb4dc0.de/buildr/buildr/modules"
"code.icb4dc0.de/buildr/buildr/modules/buildr"
)
@ -63,6 +64,15 @@ func (c *containerTask) doExecute(ctx context.Context, b buildr.Buildr) (err err
ToolsDir: b.Config.ToolsDirectory,
}
outputSink, err := logging.NewTaskOutputSink(b.Config.Logging, modules.LogFileNameFormatter(c.moduleWithMeta))
if err != nil {
return err
}
defer func() {
err = errors.Join(err, outputSink.Close())
}()
con, grpcConn, err := c.orchestrator.BuildRContainer(ctx, spec)
if err != nil {
return err
@ -116,11 +126,11 @@ func (c *containerTask) doExecute(ctx context.Context, b buildr.Buildr) (err err
switch msg := ev.GetEnvelope().(type) {
case *rpcv1.ExecuteTaskResponse_TaskLog:
c.handleTaskLog(ctx, msg.TaskLog, logger)
case *rpcv1.ExecuteTaskResponse_TaskResult:
if err := c.handleTaskOutput(msg.TaskResult, b); err != nil {
slog.Error("Failed to write task output", slog.String("err", err.Error()))
case *rpcv1.ExecuteTaskResponse_TaskOutput:
if err := c.handleTaskOutput(outputSink, msg.TaskOutput); err != nil {
logger.Error("Failed to process task output", slog.String("err", err.Error()))
}
case *rpcv1.ExecuteTaskResponse_TaskResult:
if errMsg := msg.TaskResult.Error; errMsg != "" {
return fmt.Errorf("failed to execute task: %s", msg.TaskResult.Error)
}
@ -136,6 +146,25 @@ func (c *containerTask) doExecute(ctx context.Context, b buildr.Buildr) (err err
}
}
func (c *containerTask) handleTaskOutput(sink logging.TaskOutputSink, req *rpcv1.TaskOutput) (err error) {
switch req.Source {
case rpcv1.TaskOutputSource_TASK_OUTPUT_SOURCE_STDOUT:
writer := sink.StdOut()
_, err = writer.Write(req.Payload)
if syncer, ok := writer.(interface{ Sync() error }); ok {
err = errors.Join(err, syncer.Sync())
}
case rpcv1.TaskOutputSource_TASK_OUTPUT_SOURCE_STDERR:
writer := sink.StdErr()
_, err = writer.Write(req.Payload)
if syncer, ok := writer.(interface{ Sync() error }); ok {
err = errors.Join(err, syncer.Sync())
}
}
return err
}
func (c *containerTask) handleTaskLog(ctx context.Context, taskLog *rpcv1.TaskLog, logger *slog.Logger) {
rec := slog.NewRecord(taskLog.Time.AsTime().Local(), slog.Level(taskLog.Level), taskLog.Message, 0)
for i := range taskLog.Attributes {
@ -145,45 +174,6 @@ func (c *containerTask) handleTaskLog(ctx context.Context, taskLog *rpcv1.TaskLo
_ = logger.Handler().Handle(ctx, rec)
}
func (c *containerTask) handleTaskOutput(result *rpcv1.TaskResult, b buildr.Buildr) (err error) {
if b.Config.Logging.LogToStdErr {
if errOut := result.Stderr; len(errOut) > 0 {
if _, err = os.Stderr.Write(errOut); err != nil {
return err
}
}
if out := result.Stdout; len(out) > 0 {
if _, err = os.Stderr.Write(out); err != nil {
return err
}
}
return nil
}
logFileName := fmt.Sprintf("%s_%s.log", c.moduleWithMeta.Category(), c.moduleWithMeta.Name())
f, err := os.Create(filepath.Join(b.Config.Logging.LogsDirectory, logFileName))
if err != nil {
return err
}
defer func() {
err = errors.Join(err, f.Close())
}()
if errOut := result.Stderr; len(errOut) > 0 {
if _, err = f.Write(errOut); err != nil {
return err
}
}
if out := result.Stdout; len(out) > 0 {
if _, err = f.Write(out); err != nil {
return err
}
}
return nil
}
func (c *containerTask) handleModifiedFiles(ctx context.Context, con containers.Container, modifiedFilesPath, outDir string) error {
return con.CopyFileFromContainer(ctx, modifiedFilesPath, func(_ *tar.Header, reader io.Reader) (err error) {
s2Reader := s2.NewReader(reader)

View file

@ -1,14 +1,11 @@
package execution
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"code.icb4dc0.de/buildr/buildr/internal/logging"
"code.icb4dc0.de/buildr/buildr/modules"
"code.icb4dc0.de/buildr/buildr/modules/buildr"
@ -49,25 +46,21 @@ func NewDefaultExecutionContext(
opts[i].ApplyToDefaultExecutionContext(&execCtx)
}
if execCtx.buildr.Config.Logging.LogToStdErr {
execCtx.taskOutputWriter = newBufferedTaskOutputWriter(os.Stderr)
} else {
logFileName := fmt.Sprintf("%s_%s.log", mod.Category(), mod.Name())
if f, err := os.Create(filepath.Join(buildr.Config.Logging.LogsDirectory, logFileName)); err != nil {
return DefaultExecutionContext{}, err
} else {
execCtx.taskOutputWriter = f
}
outputSink, err := logging.NewTaskOutputSink(buildr.Config.Logging, modules.LogFileNameFormatter(mod))
if err != nil {
return DefaultExecutionContext{}, err
}
execCtx.outputSink = outputSink
return execCtx, nil
}
type DefaultExecutionContext struct {
context.Context
taskOutputWriter io.WriteCloser
mod modules.ModuleWithMeta
buildr buildr.Buildr
outputSink logging.TaskOutputSink
mod modules.ModuleWithMeta
buildr buildr.Buildr
loggerFactory func() *slog.Logger
}
@ -85,11 +78,11 @@ func (d DefaultExecutionContext) ToolsDir() string {
}
func (d DefaultExecutionContext) StdOut() io.Writer {
return d.taskOutputWriter
return d.outputSink.StdOut()
}
func (d DefaultExecutionContext) StdErr() io.Writer {
return d.taskOutputWriter
return d.outputSink.StdErr()
}
func (d DefaultExecutionContext) Buildr() buildr.Buildr {
@ -104,27 +97,5 @@ func (d DefaultExecutionContext) Logger() *slog.Logger {
}
func (d DefaultExecutionContext) Close() error {
return errors.Join(d.taskOutputWriter.Close())
}
func newBufferedTaskOutputWriter(out io.Writer) *bufferedTaskOutputWriter {
return &bufferedTaskOutputWriter{
out: out,
buf: bytes.NewBuffer(nil),
}
}
type bufferedTaskOutputWriter struct {
out io.Writer
buf *bytes.Buffer
}
func (w *bufferedTaskOutputWriter) Write(p []byte) (n int, err error) {
return w.buf.Write(p)
}
func (w *bufferedTaskOutputWriter) Close() error {
_, err := w.out.Write(w.buf.Bytes())
return err
return d.outputSink.Close()
}

9
internal/logging/api.go Normal file
View file

@ -0,0 +1,9 @@
package logging
import "io"
type TaskOutputSink interface {
StdOut() io.Writer
StdErr() io.Writer
Close() error
}

View file

@ -0,0 +1,16 @@
package logging
import (
"io"
"os"
"sync"
)
var stdErrLock sync.Mutex
func AcquireStdErr(transaction func(stdErr io.Writer) error) error {
stdErrLock.Lock()
defer stdErrLock.Unlock()
return transaction(os.Stderr)
}

View file

@ -0,0 +1,105 @@
package logging
import (
"errors"
"io"
"os"
"path/filepath"
"code.icb4dc0.de/buildr/buildr/modules/buildr"
)
type OutputSource string
const (
OutputSourceStdout OutputSource = "stdout"
OutputSourceStderr OutputSource = "stderr"
)
type LogFileNameFormatter interface {
LogFileName(src OutputSource) string
}
type LogFileNameFormatterFunc func(src OutputSource) string
func (f LogFileNameFormatterFunc) LogFileName(src OutputSource) string {
return f(src)
}
func NewTaskOutputSink(loggingConfig buildr.Logging, formatter LogFileNameFormatter) (TaskOutputSink, error) {
stdOutLogFile, err := logFile(loggingConfig, formatter.LogFileName(OutputSourceStdout))
if err != nil {
return nil, err
}
stdErrLogFile, err := logFile(loggingConfig, formatter.LogFileName(OutputSourceStderr))
if err != nil {
return nil, err
}
return fileOutputSink{
logToStdErr: loggingConfig.LogToStdErr,
stdOut: stdOutLogFile,
stdErr: stdErrLogFile,
}, nil
}
var _ TaskOutputSink = (*fileOutputSink)(nil)
type file interface {
Name() string
io.ReadWriter
io.Seeker
io.Closer
}
type fileOutputSink struct {
logToStdErr bool
stdOut file
stdErr file
}
func (f fileOutputSink) StdOut() io.Writer {
return f.stdOut
}
func (f fileOutputSink) StdErr() io.Writer {
return f.stdErr
}
func (f fileOutputSink) Close() error {
if f.logToStdErr {
return errors.Join(
f.copyToStdErrAndClose(f.stdOut),
f.copyToStdErrAndClose(f.stdErr),
)
}
return errors.Join(
f.stdOut.Close(),
f.stdErr.Close(),
)
}
func (f fileOutputSink) copyToStdErrAndClose(src file) error {
return AcquireStdErr(func(stdErr io.Writer) error {
if _, err := src.Seek(0, 0); err != nil {
return err
}
if _, err := io.Copy(stdErr, src); err != nil {
return err
}
return errors.Join(src.Close(), os.Remove(src.Name()))
})
}
func logFile(loggingConfig buildr.Logging, logFileName string) (*os.File, error) {
logDir := loggingConfig.LogsDirectory
if loggingConfig.LogToStdErr {
logDir = os.TempDir()
}
return os.Create(filepath.Join(logDir, logFileName))
}

9
internal/rpc/v1/api.go Normal file
View file

@ -0,0 +1,9 @@
package v1
import (
"google.golang.org/protobuf/proto"
)
type StreamSender[T proto.Message] interface {
Send(msg T) error
}

View file

@ -1,7 +1,6 @@
package v1
import (
"bytes"
"context"
"io"
@ -17,22 +16,29 @@ func NewContainerExecutionContext(
ctx context.Context,
logger *slog.Logger,
workingDir string,
sender StreamSender[*rpcv1.ExecuteTaskResponse],
) *ContainerExecutionContext {
return &ContainerExecutionContext{
Context: ctx,
logger: logger,
workingDir: workingDir,
stdOutBuf: bytes.NewBuffer(nil),
stdErrBuf: bytes.NewBuffer(nil),
stdOutWriter: newTaskOutputWriter(
rpcv1.TaskOutputSource_TASK_OUTPUT_SOURCE_STDOUT,
sender,
),
stdErrWriter: newTaskOutputWriter(
rpcv1.TaskOutputSource_TASK_OUTPUT_SOURCE_STDERR,
sender,
),
}
}
type ContainerExecutionContext struct {
context.Context
logger *slog.Logger
workingDir string
stdOutBuf *bytes.Buffer
stdErrBuf *bytes.Buffer
logger *slog.Logger
workingDir string
stdOutWriter io.Writer
stdErrWriter io.Writer
}
func (c ContainerExecutionContext) WorkingDir() string {
@ -48,18 +54,13 @@ func (c ContainerExecutionContext) ToolsDir() string {
}
func (c ContainerExecutionContext) StdOut() io.Writer {
return c.stdOutBuf
return c.stdOutWriter
}
func (c ContainerExecutionContext) StdErr() io.Writer {
return c.stdErrBuf
return c.stdErrWriter
}
func (c ContainerExecutionContext) Logger() *slog.Logger {
return c.logger
}
func (c ContainerExecutionContext) WriteOutput(result *rpcv1.TaskResult) {
result.Stdout = c.stdOutBuf.Bytes()
result.Stderr = c.stdErrBuf.Bytes()
}

View file

@ -64,12 +64,10 @@ func (e *ExecutorServiceServer) ExecuteTask(server rpcv1.ExecutorService_Execute
logger.Info("Executing module")
go watcher.Watch(ctx)
executionContext := NewContainerExecutionContext(ctx, logger, t.Buildr.Repo.Root)
executionContext := NewContainerExecutionContext(ctx, logger, t.Buildr.Repo.Root, server)
err = mod.Execute(executionContext)
result := new(rpcv1.TaskResult)
executionContext.WriteOutput(result)
if err != nil {
result.Error = err.Error()
}

View file

@ -11,15 +11,15 @@ import (
var _ slog.Handler = (*GrpcExecutorHandler)(nil)
func NewGrpcExecutorHandler(server rpcv1.ExecutorService_ExecuteTaskServer) *GrpcExecutorHandler {
func NewGrpcExecutorHandler(sender StreamSender[*rpcv1.ExecuteTaskResponse]) *GrpcExecutorHandler {
return &GrpcExecutorHandler{
Level: slog.LevelInfo,
server: server,
sender: sender,
}
}
type GrpcExecutorHandler struct {
server rpcv1.ExecutorService_ExecuteTaskServer
sender StreamSender[*rpcv1.ExecuteTaskResponse]
Level slog.Level
group string
attributes []slog.Attr
@ -50,7 +50,7 @@ func (g GrpcExecutorHandler) Handle(_ context.Context, record slog.Record) error
},
}
return g.server.Send(&resp)
return g.sender.Send(&resp)
}
func (g GrpcExecutorHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
@ -59,7 +59,7 @@ func (g GrpcExecutorHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
copy(all[len(g.attributes):], attrs)
return GrpcExecutorHandler{
server: g.server,
sender: g.sender,
Level: g.Level,
group: g.group,
attributes: all,

View file

@ -0,0 +1,38 @@
package v1
import (
"io"
rpcv1 "code.icb4dc0.de/buildr/buildr/internal/generated/rpc/v1"
)
var _ io.Writer = (*taskOutputWriter)(nil)
func newTaskOutputWriter(source rpcv1.TaskOutputSource, sender StreamSender[*rpcv1.ExecuteTaskResponse]) taskOutputWriter {
return taskOutputWriter{
source: source,
sender: sender,
}
}
type taskOutputWriter struct {
source rpcv1.TaskOutputSource
sender StreamSender[*rpcv1.ExecuteTaskResponse]
}
func (t taskOutputWriter) Write(p []byte) (n int, err error) {
err = t.sender.Send(&rpcv1.ExecuteTaskResponse{
Envelope: &rpcv1.ExecuteTaskResponse_TaskOutput{
TaskOutput: &rpcv1.TaskOutput{
Source: t.source,
Payload: p,
},
},
})
if err != nil {
return 0, err
}
return len(p), nil
}

View file

@ -0,0 +1,13 @@
package modules
import (
"fmt"
"code.icb4dc0.de/buildr/buildr/internal/logging"
)
func LogFileNameFormatter(mod ModuleWithMeta) logging.LogFileNameFormatter {
return logging.LogFileNameFormatterFunc(func(src logging.OutputSource) string {
return fmt.Sprintf("%s_%s.%s.log", mod.Category(), mod.Name(), src)
})
}