feat(task): completed container task prototype

This commit is contained in:
Peter 2023-04-18 18:06:13 +02:00
parent ead4832124
commit 0e0207ee64
Signed by: prskr
GPG key ID: C1DB5D2E8DB512F9
38 changed files with 544 additions and 202 deletions

View file

@ -2,6 +2,12 @@ task "script" "buf_generate" {
inline = [
"buf generate"
]
out_dir = buildr.repo.root
container {
image = "docker.io/bufbuild/buf"
}
}
task "script" "go_generate" {
@ -16,10 +22,6 @@ task "script" "go_generate" {
}
task "script" "go_test" {
container {
image = "docker.io/golang:1.20"
}
inline = [
"gotestsum --junitfile results.xml --format pkgname-and-test-fails -- -race -shuffle=on ./..."
]

View file

@ -1,4 +1 @@
{
"gitea_token": "iVL60jOT+Q5tRB+Z:AAAAAAAAAAA=:hHe1bYr+0mGiZkUgEfvsjCXwF+nAQWPq7OA0maawVPBQpEr9O0flCMP6aEn9LcFNYyhInrQdAu0=",
"github_token": "NQilBUAEKWlgRAjB:AAAAAAAAAAA=:rxA7R6ES2DQsQts9BzXxYRKnpSYoiDHNrnxy1aWuxlq6ljK9dW304S8NJWmUf1GOL7K5sC92nHE="
}
{"gitea_token":"iVL60jOT+Q5tRB+Z:AAAAAAAAAAA=:hHe1bYr+0mGiZkUgEfvsjCXwF+nAQWPq7OA0maawVPBQpEr9O0flCMP6aEn9LcFNYyhInrQdAu0=","github_token":"NQilBUAEKWlgRAjB:AAAAAAAAAAA=:rxA7R6ES2DQsQts9BzXxYRKnpSYoiDHNrnxy1aWuxlq6ljK9dW304S8NJWmUf1GOL7K5sC92nHE="}

View file

@ -32,10 +32,7 @@ message TaskResult {
string error = 1;
bytes stdout = 2;
bytes stderr = 3;
}
message TaskCreatedFile {
string paths = 1;
string modified_files_archive_path = 4;
}
message TaskLog {
@ -60,8 +57,7 @@ message ExecuteTaskResponse {
bytes replies_to = 1;
oneof envelope {
TaskResult task_result = 11;
TaskCreatedFile created_file = 12;
TaskLog task_log = 13;
TaskLog task_log = 12;
}
}

View file

@ -7,10 +7,10 @@ managed:
except:
- buf.build/googleapis/googleapis
plugins:
- name: go
- plugin: buf.build/protocolbuffers/go:v1.30.0
out: ./internal/generated/
opt: paths=source_relative
- name: go-grpc
- plugin: buf.build/grpc/go:v1.3.0
out: ./internal/generated/
opt:
- paths=source_relative

4
go.mod
View file

@ -5,6 +5,7 @@ go 1.20
require (
github.com/docker/docker v23.0.1+incompatible
github.com/docker/go-connections v0.4.0
github.com/fsnotify/fsnotify v1.6.0
github.com/google/go-containerregistry v0.14.0
github.com/google/go-github/v50 v50.1.0
github.com/google/uuid v1.3.0
@ -13,6 +14,7 @@ require (
github.com/hanwen/go-fuse/v2 v2.2.0
github.com/hashicorp/hcl/v2 v2.16.2
github.com/jinzhu/copier v0.3.5
github.com/klauspost/compress v1.16.5
github.com/opencontainers/image-spec v1.1.0-rc2
github.com/spf13/cobra v1.6.1
github.com/spf13/pflag v1.0.5
@ -40,14 +42,12 @@ require (
github.com/docker/distribution v2.8.1+incompatible // indirect
github.com/docker/docker-credential-helpers v0.7.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/klauspost/compress v1.16.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect

4
go.sum
View file

@ -211,8 +211,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4=
github.com/klauspost/compress v1.16.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI=
github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=

17
internal/archive/s2.go Normal file
View file

@ -0,0 +1,17 @@
package archive
import (
"errors"
"io"
"github.com/klauspost/compress/s2"
)
func WriteTarS2Encoded(tar Tar, writer io.Writer) (err error) {
s2Writer := s2.NewWriter(writer)
defer func() {
err = errors.Join(err, s2Writer.Close())
}()
return tar.Write(s2Writer)
}

View file

@ -9,10 +9,7 @@ import (
"path/filepath"
)
var (
ErrNotADirectory = errors.New("path is not a directory")
ErrEmptyArchive = errors.New("no files were added to the archive")
)
var ErrEmptyArchive = errors.New("no files were added to the archive")
type Tar struct {
root *archiveNode
@ -31,6 +28,14 @@ func (t *Tar) Write(writer io.Writer) error {
return tarWriter.Close()
}
func (t *Tar) Remove(path string) bool {
if t.root == nil {
return false
}
return t.root.remove(path)
}
func (t *Tar) Add(srcPath, destPath string) error {
if t.root == nil {
t.root = newNode()

View file

@ -18,15 +18,38 @@ type fileToTar struct {
targetFileName string
}
type filesToTar []fileToTar
func (files *filesToTar) Add(toAdd fileToTar) {
ref := *files
ref = append(ref, toAdd)
*files = ref
}
func (files *filesToTar) Remove(toRemove string) bool {
ref := *files
for i := range ref {
if ref[i].targetFileName == toRemove {
ref = append(ref[:i], ref[i+1:]...)
*files = ref
return true
}
}
return false
}
func newNode() *archiveNode {
return &archiveNode{
children: make(map[string]*archiveNode),
files: new(filesToTar),
}
}
type archiveNode struct {
children map[string]*archiveNode
files []fileToTar
files *filesToTar
}
func (n *archiveNode) writeToTar(writer *tar.Writer, parent string) error {
@ -51,8 +74,7 @@ func (n *archiveNode) writeToTar(writer *tar.Writer, parent string) error {
}
}
for i := range n.files {
fileSpec := n.files[i]
for _, fileSpec := range *n.files {
info, err := os.Stat(fileSpec.sourcePath)
if err != nil {
return err
@ -86,6 +108,33 @@ func (n *archiveNode) writeToTar(writer *tar.Writer, parent string) error {
return nil
}
func (n *archiveNode) remove(path string) bool {
if path == "" {
return false
}
split := strings.Split(path, "/")
last := split[len(split)-1]
current := n
for _, segment := range split[:len(split)-1] {
if child, ok := current.children[segment]; ok {
current = child
continue
} else {
return false
}
}
if _, ok := current.children[last]; ok {
delete(current.children, last)
return true
} else {
return current.files.Remove(last)
}
}
func (n *archiveNode) addFile(sourcePath, targetPath string) {
dirPath, fileName := filepath.Split(targetPath)
current := n
@ -98,13 +147,14 @@ func (n *archiveNode) addFile(sourcePath, targetPath string) {
} else {
newNode := &archiveNode{
children: make(map[string]*archiveNode),
files: new(filesToTar),
}
current.children[segment] = newNode
current = newNode
}
}
current.files = append(current.files, fileToTar{
current.files.Add(fileToTar{
sourcePath: sourcePath,
targetFileName: fileName,
})

View file

@ -1,7 +1,9 @@
package containers
import (
"archive/tar"
"context"
"io"
"github.com/docker/docker/api/types"
"github.com/docker/go-connections/nat"
@ -14,14 +16,22 @@ type Shutdowner interface {
type Inspector interface {
ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error)
}
type ShutdownerInspector interface {
Shutdowner
Inspector
type ContainerFileCopier interface {
CopyFromContainer(ctx context.Context, containerID, srcPath string) (io.ReadCloser, types.ContainerPathStat, error)
}
type Client interface {
Inspector
ContainerFileCopier
}
type CopyContainerFileHandler func(header *tar.Header, reader io.Reader) error
type Container interface {
ID() string
NetworkID() string
MappedPort(ctx context.Context, port nat.Port) (hostIp, hostPort string, err error)
CopyFileFromContainer(ctx context.Context, path string, f CopyContainerFileHandler) error
Shutdown(ctx context.Context) error
}

View file

@ -1,6 +1,7 @@
package containers
import (
"archive/tar"
"context"
"errors"
"fmt"
@ -19,12 +20,34 @@ type containerRef struct {
inspectOnce sync.Once
info types.ContainerJSON
shutdowner Shutdowner
inspector Inspector
client Client
resourceName string
containerId string
networkId string
}
func (c *containerRef) CopyFileFromContainer(ctx context.Context, path string, f CopyContainerFileHandler) (err error) {
reader, _, err := c.client.CopyFromContainer(ctx, c.containerId, path)
if err != nil {
return err
}
defer func() {
err = errors.Join(err, reader.Close())
}()
tarReader := tar.NewReader(reader)
var header *tar.Header
for header, err = tarReader.Next(); err == nil; header, err = tarReader.Next() {
if err := f(header, tarReader); err != nil {
return err
}
}
return nil
}
func (c *containerRef) NetworkID() string {
return c.networkId
}
@ -35,7 +58,7 @@ func (c *containerRef) ID() string {
func (c *containerRef) MappedPort(ctx context.Context, port nat.Port) (hostIp, hostPort string, err error) {
c.inspectOnce.Do(func() {
c.info, err = c.inspector.ContainerInspect(ctx, c.containerId)
c.info, err = c.client.ContainerInspect(ctx, c.containerId)
})
if err != nil {

View file

@ -31,6 +31,7 @@ type BuildRContainerSpec struct {
Image string
RepoRoot string
ToolsDir string
User string
Content map[string]string
}
@ -45,7 +46,7 @@ func (o *Orchestrator) BuildRContainer(ctx context.Context, spec BuildRContainer
cli := (*client.Client)(o)
con := &containerRef{
shutdowner: o,
inspector: cli,
client: cli,
resourceName: fmt.Sprintf("build-%s-%s", spec.ModuleName, spec.ID),
}
@ -59,6 +60,7 @@ func (o *Orchestrator) BuildRContainer(ctx context.Context, spec BuildRContainer
conSpec := ContainerSpec{
Image: spec.Image,
User: spec.User,
ExposedPorts: []string{"3000/tcp"},
Env: map[string]string{
"BUILDR_GRPC_SERVE_ADDRESS": "0.0.0.0:3000",
@ -77,6 +79,12 @@ func (o *Orchestrator) BuildRContainer(ctx context.Context, spec BuildRContainer
return nil, nil, err
}
if spec.Content == nil {
spec.Content = map[string]string{
spec.RepoRoot: ".",
}
}
spec.Content[spec.ToolsDir] = "/opt/buildr/tools/"
if buildrBin, err := os.Executable(); err != nil {
@ -212,6 +220,10 @@ func (o *Orchestrator) createContainer(ctx context.Context, spec ContainerSpec,
Cmd: spec.Cmd,
}
if spec.User != "" {
containerConfig.User = spec.User
}
for k, v := range spec.Env {
containerConfig.Env = append(containerConfig.Env, k+"="+v)
}

View file

@ -27,6 +27,7 @@ func (c *ContainerPlatform) Platform() specsv1.Platform {
type ContainerSpec struct {
ContainerPlatform
Image string
User string
WorkingDir string
ExposedPorts []string
Env map[string]string

View file

@ -9,8 +9,8 @@ import (
)
type TaskProvider interface {
CanProvide(m modules.Module) bool
Create(m modules.Module) (Task, error)
CanProvide(m modules.ModuleWithMeta) bool
Create(m modules.ModuleWithMeta) (Task, error)
}
type Task interface {

View file

@ -1 +0,0 @@
package container

View file

@ -20,22 +20,17 @@ type taskProvider struct {
orchestrator *containers.Orchestrator
}
func (taskProvider) CanProvide(m modules.Module) bool {
if specer, ok := m.(modules.ContainerSpecer); ok && specer.Spec() != nil {
return true
}
return false
func (taskProvider) CanProvide(m modules.ModuleWithMeta) bool {
return m.ContainerSpec() != nil
}
func (p taskProvider) Create(m modules.Module) (execution.Task, error) {
if specer, ok := m.(modules.ContainerSpecer); !ok || specer.Spec() == nil {
return nil, fmt.Errorf("module %v doesn't have a container spec - should be handled differently", m.ModuleName())
func (p taskProvider) Create(m modules.ModuleWithMeta) (execution.Task, error) {
if m.ContainerSpec() == nil {
return nil, fmt.Errorf("module %v doesn't have a container spec - should be handled differently", m.Name())
} else {
return &containerTask{
module: m,
orchestrator: p.orchestrator,
specer: specer,
moduleWithMeta: m,
orchestrator: p.orchestrator,
}, nil
}
}

View file

@ -1,14 +1,17 @@
package container
import (
"archive/tar"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"sync"
"github.com/klauspost/compress/s2"
"golang.org/x/exp/slog"
"code.icb4dc0.de/buildr/buildr/internal/containers"
@ -20,10 +23,9 @@ import (
type containerTask struct {
execution.TaskDependencies
once sync.Once
orchestrator *containers.Orchestrator
module modules.Module
specer modules.ContainerSpecer
once sync.Once
orchestrator *containers.Orchestrator
moduleWithMeta modules.ModuleWithMeta
}
func (c *containerTask) Execute(ctx context.Context, b buildr.Buildr) (err error) {
@ -39,19 +41,25 @@ func (c *containerTask) doExecute(ctx context.Context, b buildr.Buildr) (err err
return err
}
if err := os.MkdirAll(c.moduleWithMeta.OutDir(), 0o755); err != nil {
return err
}
logger := slog.
Default().
With(
slog.String("module_name", c.module.ModuleName()),
slog.String("module_type", c.module.Category().String()),
slog.String("module_name", c.moduleWithMeta.Name()),
slog.String("module_type", c.moduleWithMeta.Category().String()),
)
containerSpec := c.moduleWithMeta.ContainerSpec()
spec := containers.BuildRContainerSpec{
ID: c.module.ID(),
ModuleName: c.module.ModuleName(),
Image: c.specer.Spec().Image,
ID: c.moduleWithMeta.ID(),
ModuleName: c.moduleWithMeta.Name(),
Image: containerSpec.Image,
User: containerSpec.User,
RepoRoot: b.Repo.Root,
Content: c.module.InputMappings(),
Content: c.moduleWithMeta.InputMappings(),
ToolsDir: b.Config.ToolsDirectory,
}
@ -71,9 +79,7 @@ func (c *containerTask) doExecute(ctx context.Context, b buildr.Buildr) (err err
return err
}
c.specer.ClearSpec()
rawModule, err := json.Marshal(c.module)
rawModule, err := json.Marshal(c.moduleWithMeta.Unwrap())
if err != nil {
return err
}
@ -83,13 +89,13 @@ func (c *containerTask) doExecute(ctx context.Context, b buildr.Buildr) (err err
StartTask: &rpcv1.StartTaskRequest{
Buildr: &rpcv1.Buildr{
Repo: &rpcv1.Buildr_Repo{
Root: fmt.Sprintf("/work/%s", c.module.ID()),
Root: fmt.Sprintf("/work/%s", c.moduleWithMeta.ID()),
},
},
Reference: &rpcv1.ModuleReference{
TaskId: c.module.ID(),
ModuleType: c.module.Category().String(),
ModuleName: c.module.Type(),
TaskId: c.moduleWithMeta.ID(),
ModuleType: c.moduleWithMeta.Category().String(),
ModuleName: c.moduleWithMeta.Type(),
},
RawTask: rawModule,
},
@ -119,6 +125,12 @@ func (c *containerTask) doExecute(ctx context.Context, b buildr.Buildr) (err err
return fmt.Errorf("failed to execute task: %s", msg.TaskResult.Error)
}
if msg.TaskResult.ModifiedFilesArchivePath != "" {
if err := c.handleModifiedFiles(ctx, con, msg.TaskResult.ModifiedFilesArchivePath, c.moduleWithMeta.OutDir()); err != nil {
return err
}
}
return nil
}
}
@ -148,7 +160,7 @@ func (c *containerTask) handleTaskOutput(result *rpcv1.TaskResult, b buildr.Buil
return nil
}
logFileName := fmt.Sprintf("%s_%s.log", c.module.Category(), c.module.ModuleName())
logFileName := fmt.Sprintf("%s_%s.log", c.moduleWithMeta.Category(), c.moduleWithMeta.Name())
f, err := os.Create(filepath.Join(b.Config.Logging.LogsDirectory, logFileName))
if err != nil {
return err
@ -171,3 +183,40 @@ func (c *containerTask) handleTaskOutput(result *rpcv1.TaskResult, b buildr.Buil
return nil
}
func (c *containerTask) handleModifiedFiles(ctx context.Context, con containers.Container, modifiedFilesPath, outDir string) error {
return con.CopyFileFromContainer(ctx, modifiedFilesPath, func(_ *tar.Header, reader io.Reader) (err error) {
s2Reader := s2.NewReader(reader)
tarReader := tar.NewReader(s2Reader)
var header *tar.Header
for header, err = tarReader.Next(); err == nil; header, err = tarReader.Next() {
if header.Typeflag == tar.TypeDir {
if err := os.MkdirAll(filepath.Join(outDir, header.Name), os.FileMode(header.Mode|0o700)); err != nil {
return err
}
continue
}
if header.Typeflag != tar.TypeReg {
continue
}
outFile, err := os.OpenFile(filepath.Join(outDir, header.Name), os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(header.Mode))
if err != nil {
return err
}
_, err = io.Copy(outFile, tarReader)
if err = errors.Join(err, outFile.Close()); err != nil {
return err
}
}
if errors.Is(err, io.EOF) {
return nil
}
return err
})
}

View file

@ -35,7 +35,7 @@ func WithLoggerFactory(factory func() *slog.Logger) defaultExecutionContextOptio
func NewDefaultExecutionContext(
ctx context.Context,
mod modules.Module,
mod modules.ModuleWithMeta,
buildr buildr.Buildr,
opts ...DefaultExecutionContextOption,
) (DefaultExecutionContext, error) {
@ -52,7 +52,7 @@ func NewDefaultExecutionContext(
if execCtx.buildr.Config.Logging.LogToStdErr {
execCtx.taskOutputWriter = newBufferedTaskOutputWriter(os.Stderr)
} else {
logFileName := fmt.Sprintf("%s_%s.log", mod.Category(), mod.ModuleName())
logFileName := fmt.Sprintf("%s_%s.log", mod.Category(), mod.Name())
if f, err := os.Create(filepath.Join(buildr.Config.Logging.LogsDirectory, logFileName)); err != nil {
return DefaultExecutionContext{}, err
} else {
@ -66,7 +66,7 @@ func NewDefaultExecutionContext(
type DefaultExecutionContext struct {
context.Context
taskOutputWriter io.WriteCloser
mod modules.Module
mod modules.ModuleWithMeta
buildr buildr.Buildr
loggerFactory func() *slog.Logger
@ -77,7 +77,7 @@ func (d DefaultExecutionContext) WorkingDir() string {
}
func (d DefaultExecutionContext) OutDir() string {
return filepath.Join(d.buildr.Config.OutDirectory, d.mod.Category().String(), d.mod.ModuleName())
return filepath.Join(d.buildr.Config.OutDirectory, d.mod.Category().String(), d.mod.Name())
}
func (d DefaultExecutionContext) ToolsDir() string {

View file

@ -62,7 +62,7 @@ func (f *TaskFactory) TaskForModule(id string, repo *modules.Repository) (Task,
}
if t == nil {
return nil, fmt.Errorf("no provider available to handle module %s", module.ModuleName())
return nil, fmt.Errorf("no provider available to handle module %s", module.Name())
}
f.knownTasks[id] = t

View file

@ -15,14 +15,13 @@ func Provider() execution.TaskProvider {
type taskProvider struct{}
func (taskProvider) CanProvide(m modules.Module) bool {
specer, ok := m.(modules.ContainerSpecer)
return !ok || specer.Spec() == nil
func (taskProvider) CanProvide(m modules.ModuleWithMeta) bool {
return m.ContainerSpec() == nil
}
func (taskProvider) Create(m modules.Module) (execution.Task, error) {
if specer, ok := m.(modules.ContainerSpecer); ok && specer.Spec() != nil {
return nil, fmt.Errorf("module %v has a container spec - should be handled differently", m.ModuleName())
func (taskProvider) Create(m modules.ModuleWithMeta) (execution.Task, error) {
if m.ContainerSpec() != nil {
return nil, fmt.Errorf("module %v has a container spec - should be handled differently", m.Name())
} else {
return &localTask{
module: m,

View file

@ -21,7 +21,7 @@ var _ execution.Task = (*localTask)(nil)
type localTask struct {
execution.TaskDependencies
once sync.Once
module modules.Module
module modules.ModuleWithMeta
}
func (t *localTask) Execute(ctx context.Context, b buildr.Buildr) (err error) {
@ -43,6 +43,10 @@ func (t *localTask) doExecute(ctx context.Context, b buildr.Buildr) error {
}
}()
if err := os.MkdirAll(t.module.OutDir(), 0o755); err != nil {
return err
}
if mappings := t.module.InputMappings(); len(mappings) > 0 {
return t.executeIsolated(ctx, b)
}
@ -64,7 +68,7 @@ func (t *localTask) executeIsolated(ctx context.Context, b buildr.Buildr) error
_ = os.RemoveAll(workDir)
}()
outDir := t.module.GetOutputDir()
outDir := t.module.OutDir()
mappings := t.module.InputMappings()
layers := make(storage.Layers, 0, len(mappings))
@ -119,14 +123,14 @@ func (t *localTask) executeIsolated(ctx context.Context, b buildr.Buildr) error
return grp.Wait()
}
func (t *localTask) executionContextFor(ctx context.Context, b buildr.Buildr, m modules.Module) (execution.DefaultExecutionContext, error) {
func (t *localTask) executionContextFor(ctx context.Context, b buildr.Buildr, m modules.ModuleWithMeta) (execution.DefaultExecutionContext, error) {
return execution.NewDefaultExecutionContext(
ctx,
m,
b,
execution.WithLoggerFactory(func() *slog.Logger {
return slog.Default().With(
slog.String("module_name", m.ModuleName()),
slog.String("module_name", m.Name()),
slog.String("module_type", m.Category().String()),
)
}),

View file

@ -77,11 +77,13 @@ func initBlock(body *hclsyntax.Body, moduleType modules.ModuleCategory, moduleNa
},
}
body.Attributes["out_dir"] = &hclsyntax.Attribute{
Name: "out_dir",
Expr: &hclsyntax.LiteralValueExpr{
Val: cty.StringVal(filepath.Join(outDir, moduleType.String(), moduleName)),
},
if _, ok := body.Attributes["out_dir"]; !ok {
body.Attributes["out_dir"] = &hclsyntax.Attribute{
Name: "out_dir",
Expr: &hclsyntax.LiteralValueExpr{
Val: cty.StringVal(filepath.Join(outDir, moduleType.String(), moduleName)),
},
}
}
return body

View file

@ -94,8 +94,8 @@ func (s RawSpec) buildParsingInventory(
return blockGroupParsingSpecs, nil
}
func (s RawSpec) parseBlocksToModules(specs []blockParsingSpec, registry *modules.TypeRegistry, evalCtx *hcl.EvalContext) ([]modules.Module, error) {
parsedModules := make([]modules.Module, 0, len(specs))
func (s RawSpec) parseBlocksToModules(specs []blockParsingSpec, registry *modules.TypeRegistry, evalCtx *hcl.EvalContext) ([]modules.ModuleWithMeta, error) {
parsedModules := make([]modules.ModuleWithMeta, 0, len(specs))
for i := range specs {
spec := specs[i]

View file

@ -4,16 +4,21 @@ import (
"context"
"errors"
"io"
"os"
"golang.org/x/exp/slog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"code.icb4dc0.de/buildr/buildr/internal/archive"
rpcv1 "code.icb4dc0.de/buildr/buildr/internal/generated/rpc/v1"
"code.icb4dc0.de/buildr/buildr/modules"
)
var _ rpcv1.ExecutorServiceServer = (*ExecutorServiceServer)(nil)
var (
_ rpcv1.ExecutorServiceServer = (*ExecutorServiceServer)(nil)
ErrExecutionCompleted = errors.New("execution completed")
)
func NewExecutorServiceServer(registry *modules.TypeRegistry) rpcv1.ExecutorServiceServer {
return &ExecutorServiceServer{
@ -45,12 +50,19 @@ func (e *ExecutorServiceServer) ExecuteTask(server rpcv1.ExecutorService_Execute
case *rpcv1.ExecuteTaskRequest_StartTask:
t := unwrapped.StartTask
watcher, err := newFSWatcher(logger, t.Buildr.Repo.Root)
if err != nil {
logger.Error("Failed to start FS watcher", slog.String("err", err.Error()))
return status.Error(codes.Internal, err.Error())
}
mod, err := e.registry.CreateFromJSON(modules.ModuleCategory(t.GetReference().GetModuleType()), t.GetReference().GetModuleName(), t.GetRawTask())
if err != nil {
logger.Error("Failed to unmarshal module from JSON", slog.String("err", err.Error()))
return status.Error(codes.NotFound, err.Error())
} else {
logger.Info("Executing module")
go watcher.Watch(ctx)
executionContext := NewContainerExecutionContext(ctx, logger, t.Buildr.Repo.Root)
err = mod.Execute(executionContext)
@ -62,6 +74,12 @@ func (e *ExecutorServiceServer) ExecuteTask(server rpcv1.ExecutorService_Execute
result.Error = err.Error()
}
cancel(ErrExecutionCompleted)
if err := addModifiedFilesToResult(result, logger, watcher); err != nil {
return err
}
_ = server.Send(&rpcv1.ExecuteTaskResponse{
Envelope: &rpcv1.ExecuteTaskResponse_TaskResult{
TaskResult: result,
@ -73,3 +91,27 @@ func (e *ExecutorServiceServer) ExecuteTask(server rpcv1.ExecutorService_Execute
return nil
}
func addModifiedFilesToResult(result *rpcv1.TaskResult, logger *slog.Logger, watcher *fsWatcher) error {
tmpFile, err := os.CreateTemp(os.TempDir(), "buildr-modified-files-*.tar.s2")
if err != nil {
logger.Error("Failed to create temporary file", slog.String("err", err.Error()))
return status.Error(codes.Internal, err.Error())
}
if err := watcher.WriteTo(tmpFile); err != nil {
if errors.Is(err, archive.ErrEmptyArchive) {
return nil
}
logger.Error("Failed to compress modified files", slog.String("err", err.Error()))
return status.Error(codes.Internal, err.Error())
}
if err := tmpFile.Close(); err != nil {
logger.Error("Failed to close temporary file", slog.String("err", err.Error()))
return status.Error(codes.Internal, err.Error())
}
result.ModifiedFilesArchivePath = tmpFile.Name()
return nil
}

View file

@ -0,0 +1,116 @@
package v1
import (
"context"
"io"
"io/fs"
"path/filepath"
"github.com/fsnotify/fsnotify"
"golang.org/x/exp/slog"
"code.icb4dc0.de/buildr/buildr/internal/archive"
)
func newFSWatcher(logger *slog.Logger, workingDir string) (*fsWatcher, error) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
}
err = filepath.WalkDir(workingDir, func(path string, d fs.DirEntry, err error) error {
if d.IsDir() {
return watcher.Add(path)
}
return nil
})
if err != nil {
return nil, err
}
return &fsWatcher{
logger: logger,
watcher: watcher,
workingDir: workingDir,
}, nil
}
type fsWatcher struct {
logger *slog.Logger
watcher *fsnotify.Watcher
archive archive.Tar
workingDir string
}
func (w *fsWatcher) WriteTo(writer io.Writer) error {
return archive.WriteTarS2Encoded(w.archive, writer)
}
func (w *fsWatcher) Watch(ctx context.Context) {
defer func() {
if err := w.watcher.Close(); err != nil {
w.logger.Error("Failed to close watcher", slog.String("err", err.Error()))
}
}()
for {
select {
case <-ctx.Done():
w.logger.Debug("Watcher is closing")
return
case err, ok := <-w.watcher.Errors:
if !ok {
return
}
w.logger.Error("Error occurred watching file changes", slog.String("err", err.Error()))
case event, ok := <-w.watcher.Events:
if !ok {
return
}
switch event.Op {
case fsnotify.Create, fsnotify.Write, fsnotify.Chmod:
relativeToWorkingDir, err := filepath.Rel(w.workingDir, event.Name)
if err != nil {
w.logger.Error(
"Failed to make path relative to working directory",
slog.String("path", event.Name),
slog.String("err", err.Error()),
)
continue
}
if err := w.archive.Add(event.Name, relativeToWorkingDir); err != nil {
w.logger.Error(
"Failed to add file to archive",
slog.String("path", relativeToWorkingDir),
slog.String("err", err.Error()),
)
continue
}
case fsnotify.Rename:
w.logger.Warn("Received rename event - ignored for now")
case fsnotify.Remove:
relativeToWorkingDir, err := filepath.Rel(w.workingDir, event.Name)
if err != nil {
w.logger.Error(
"Failed to make path relative to working directory",
slog.String("path", event.Name),
slog.String("err", err.Error()),
)
continue
}
if ok := w.archive.Remove(relativeToWorkingDir); !ok {
w.logger.Warn(
"Removed file is not part of archive - ignored for now",
slog.String("path", relativeToWorkingDir),
)
continue
}
}
}
}
}

View file

@ -4,6 +4,7 @@ import (
"context"
"io"
"github.com/hashicorp/hcl/v2"
"golang.org/x/exp/slog"
)
@ -17,30 +18,32 @@ type ExecutionContext interface {
Logger() *slog.Logger
}
type Module interface {
type ModuleWithMeta interface {
Module
Unwrap() Module
ID() string
Category() ModuleCategory
Type() string
ModuleName() string
Name() string
Dependencies() []string
GetOutputDir() string
InputMappings() map[string]string
Execute(ctx ExecutionContext) error
OutDir() string
ContainerSpec() *ContainerSpec
UnmarshalHCL(body hcl.Body, ctx *hcl.EvalContext) hcl.Diagnostics
}
type ContainerSpecer interface {
Spec() *ContainerSpec
ClearSpec()
type Module interface {
Execute(ctx ExecutionContext) error
Category() ModuleCategory
Type() string
}
type ModuleFactory interface {
Create() Module
Create() ModuleWithMeta
}
var _ ModuleFactory = (*ModuleFactoryFunc)(nil)
type ModuleFactoryFunc func() Module
type ModuleFactoryFunc func() ModuleWithMeta
func (f ModuleFactoryFunc) Create() Module {
func (f ModuleFactoryFunc) Create() ModuleWithMeta {
return f()
}

View file

@ -1,29 +1,80 @@
package modules
type BaseModule struct {
Id string `hcl:"id,optional"`
OutputDir string `hcl:"out_dir,optional"`
Name string `hcl:"name,optional"`
import (
"encoding/json"
"reflect"
"github.com/hashicorp/hcl/v2"
"github.com/hashicorp/hcl/v2/gohcl"
)
var (
_ ModuleWithMeta = (*Metadata[Module])(nil)
_ json.Unmarshaler = (*Metadata[Module])(nil)
)
type ContainerSpec struct {
Image string `hcl:"image"`
User string `hcl:"user,optional"`
}
type Metadata[T Module] struct {
Module T `hcl:",remain"`
Id string `hcl:"id"`
OutputDir string `hcl:"out_dir"`
ModuleName string `hcl:"name"`
Deps []string `hcl:"depends_on,optional"`
InputMapping map[string]string `hcl:"input_mapping,optional"`
Container *ContainerSpec `hcl:"container,block"`
}
func (m BaseModule) ModuleName() string {
return m.Name
func (m *Metadata[T]) UnmarshalJSON(bytes []byte) error {
if reflect.ValueOf(m.Module).Kind() != reflect.Pointer {
return json.Unmarshal(bytes, &m.Module)
}
return json.Unmarshal(bytes, m.Module)
}
func (m BaseModule) ID() string {
return m.Id
func (m *Metadata[T]) UnmarshalHCL(body hcl.Body, ctx *hcl.EvalContext) hcl.Diagnostics {
return gohcl.DecodeBody(body, ctx, m)
}
func (m BaseModule) Dependencies() []string {
return m.Deps
func (m *Metadata[T]) Execute(ctx ExecutionContext) error {
return m.Module.Execute(ctx)
}
func (m BaseModule) InputMappings() map[string]string {
func (m *Metadata[T]) Category() ModuleCategory {
return m.Module.Category()
}
func (m *Metadata[T]) Type() string {
return m.Module.Type()
}
func (m *Metadata[T]) Unwrap() Module {
return m.Module
}
func (m *Metadata[T]) InputMappings() map[string]string {
return m.InputMapping
}
func (m BaseModule) GetOutputDir() string {
func (m *Metadata[T]) OutDir() string {
return m.OutputDir
}
func (m *Metadata[T]) Dependencies() []string {
return m.Deps
}
func (m *Metadata[T]) ID() string {
return m.Id
}
func (m *Metadata[T]) Name() string {
return m.ModuleName
}
func (m *Metadata[T]) ContainerSpec() *ContainerSpec {
return m.Container
}

View file

@ -15,14 +15,13 @@ const defaultArgsLength = 6
var _ modules.Module = (*GoBuild)(nil)
type GoBuild struct {
modules.ContainerBaseModule `hcl:",remain"`
Binary string `hcl:"binary"`
Main string `hcl:"main"`
GoOS string `hcl:"goos"`
GoArch string `hcl:"goarch"`
Flags []string `hcl:"flags,optional"`
LdFlags []string `hcl:"ldflags,optional"`
Env map[string]string `hcl:"environment,optional"`
Binary string `hcl:"binary"`
Main string `hcl:"main"`
GoOS string `hcl:"goos"`
GoArch string `hcl:"goarch"`
Flags []string `hcl:"flags,optional"`
LdFlags []string `hcl:"ldflags,optional"`
Env map[string]string `hcl:"environment,optional"`
}
func (g GoBuild) Type() string {

View file

@ -5,7 +5,7 @@ import (
)
var Registration = modules.RegistrationFunc(func(registry *modules.TypeRegistry) {
registry.RegisterModule(modules.ModuleTypeBuild, "go_build", modules.ModuleFactoryFunc(func() modules.Module {
return new(GoBuild)
registry.RegisterModule(modules.ModuleFactoryFunc(func() modules.ModuleWithMeta {
return new(modules.Metadata[GoBuild])
}))
})

View file

@ -1,20 +0,0 @@
package modules
var _ ContainerSpecer = (*ContainerBaseModule)(nil)
type ContainerBaseModule struct {
BaseModule `hcl:",remain"`
Container *ContainerSpec `hcl:"container,block"`
}
func (c *ContainerBaseModule) ClearSpec() {
c.Container = nil
}
func (c *ContainerBaseModule) Spec() *ContainerSpec {
return c.Container
}
type ContainerSpec struct {
Image string `hcl:"image"`
}

View file

@ -29,7 +29,6 @@ var (
)
type ContainerImage struct {
modules.BaseModule `hcl:",remain"`
BaseImage string `hcl:"base_image"`
ImageName string `hcl:"image_name"`
Tags []string `hcl:"tags"`
@ -41,19 +40,20 @@ type ContainerImage struct {
PublishToRegistry bool `hcl:"publish_to_registry,optional"`
RegistryCredentials []RegistryAuth `hcl:"registry_auth,block"`
cache sync.Map
cache *sync.Map
publishers []Publisher
}
func (o *ContainerImage) Type() string {
func (o ContainerImage) Type() string {
return "container_image"
}
func (o *ContainerImage) Category() modules.ModuleCategory {
func (o ContainerImage) Category() modules.ModuleCategory {
return modules.ModuleTypePackage
}
func (o *ContainerImage) Execute(ctx modules.ExecutionContext) error {
func (o ContainerImage) Execute(ctx modules.ExecutionContext) error {
o.cache = new(sync.Map)
logger := ctx.Logger()
if err := o.initPublishers(logger); err != nil {
@ -162,7 +162,7 @@ func (o *ContainerImage) Execute(ctx modules.ExecutionContext) error {
return nil
}
func (o *ContainerImage) initPublishers(logger *slog.Logger) error {
func (o ContainerImage) initPublishers(logger *slog.Logger) error {
if o.PublishToDaemon {
cli, err := client.NewClientWithOpts(
client.WithHostFromEnv(),
@ -188,7 +188,7 @@ func (o *ContainerImage) initPublishers(logger *slog.Logger) error {
return nil
}
func (o *ContainerImage) buildImage(ctx modules.ExecutionContext, base v1.Image, platform *v1.Platform) (v1.Image, error) {
func (o ContainerImage) buildImage(ctx modules.ExecutionContext, base v1.Image, platform *v1.Platform) (v1.Image, error) {
// Layers should be typed to match the underlying image, since some
// registries reject mixed-type layers.
var layerMediaType types.MediaType
@ -237,7 +237,7 @@ func (o *ContainerImage) buildImage(ctx modules.ExecutionContext, base v1.Image,
Layer: layer,
History: v1.History{
Author: "buildr",
CreatedBy: fmt.Sprintf("oci_image %s", o.Name),
CreatedBy: fmt.Sprintf("oci_image"),
Created: v1.Time{Time: defaultCreationTime},
},
})
@ -266,7 +266,7 @@ func (o *ContainerImage) buildImage(ctx modules.ExecutionContext, base v1.Image,
return mutate.ConfigFile(withContent, cfg)
}
func (o *ContainerImage) tarFiles(outWriter io.Writer, cwd string) error {
func (o ContainerImage) tarFiles(outWriter io.Writer, cwd string) error {
tw := tar.NewWriter(outWriter)
defer tw.Close()
@ -278,7 +278,7 @@ func (o *ContainerImage) tarFiles(outWriter io.Writer, cwd string) error {
return tree.writeToTar(tw, "/")
}
func (o *ContainerImage) platformMatches(platform *v1.Platform) error {
func (o ContainerImage) platformMatches(platform *v1.Platform) error {
parsed, err := v1.ParsePlatform(o.Platform)
if err != nil {
return err
@ -291,7 +291,7 @@ func (o *ContainerImage) platformMatches(platform *v1.Platform) error {
return fmt.Errorf("platform %s does not satisfy %s", platform.String(), o.Platform)
}
func (o *ContainerImage) getBaseImage(ctx context.Context, logger *slog.Logger, imageRef string) (name.Reference, imageOrIndex, error) {
func (o ContainerImage) getBaseImage(ctx context.Context, logger *slog.Logger, imageRef string) (name.Reference, imageOrIndex, error) {
ref, err := name.ParseReference(imageRef, name.WithDefaultRegistry("docker.io"))
if err != nil {
return nil, nil, fmt.Errorf("parsing base image (%q): %w", imageRef, err)

View file

@ -6,7 +6,7 @@ import (
)
var Registration = modules.RegistrationFunc(func(registry *modules.TypeRegistry) {
registry.RegisterModule(modules.ModuleTypePackage, "container_image", modules.ModuleFactoryFunc(func() modules.Module {
return new(ociimg.ContainerImage)
registry.RegisterModule(modules.ModuleFactoryFunc(func() modules.ModuleWithMeta {
return new(modules.Metadata[ociimg.ContainerImage])
}))
})

View file

@ -6,7 +6,6 @@ import (
"fmt"
"github.com/hashicorp/hcl/v2"
"github.com/hashicorp/hcl/v2/gohcl"
"code.icb4dc0.de/buildr/buildr/internal/logging"
)
@ -58,24 +57,25 @@ func (r *TypeRegistry) CreateFromHCL(
moduleType string,
body hcl.Body,
hclCtx *hcl.EvalContext,
) (Module, error) {
) (ModuleWithMeta, error) {
s := specOf(moduleCategory, moduleType)
if m, ok := r.registrations[s]; ok {
module := m.Create()
diags := gohcl.DecodeBody(body, hclCtx, module)
if diags.HasErrors() {
meta := m.Create()
if diags := meta.UnmarshalHCL(body, hclCtx); diags.HasErrors() {
logging.Diagnostics(diags, nil)
return nil, diags
}
return module, nil
return meta, nil
} else {
return nil, fmt.Errorf("%w: %s/%s", ErrNoSuchModule, moduleCategory, moduleType)
}
}
func (r *TypeRegistry) RegisterModule(moduleType ModuleCategory, moduleName string, factory ModuleFactory) {
s := specOf(moduleType, moduleName)
func (r *TypeRegistry) RegisterModule(factory ModuleFactory) {
m := factory.Create()
s := specOf(m.Category(), m.Type())
if _, ok := r.registrations[s]; !ok {
r.registrations[s] = factory
}

View file

@ -5,39 +5,39 @@ import "code.icb4dc0.de/buildr/buildr/modules/buildr"
type Repository struct {
Buildr buildr.Buildr
modulesById map[string]Module
moduleByName map[moduleSpec]Module
modulesById map[string]ModuleWithMeta
moduleByName map[moduleSpec]ModuleWithMeta
}
func (s *Repository) RegisterModules(modules ...Module) {
func (s *Repository) RegisterModules(modules ...ModuleWithMeta) {
if s.moduleByName == nil {
s.moduleByName = make(map[moduleSpec]Module, len(modules))
s.moduleByName = make(map[moduleSpec]ModuleWithMeta, len(modules))
}
if s.modulesById == nil {
s.modulesById = make(map[string]Module, len(modules))
s.modulesById = make(map[string]ModuleWithMeta, len(modules))
}
for i := range modules {
module := modules[i]
s.moduleByName[specOf(module.Category(), module.ModuleName())] = module
s.moduleByName[specOf(module.Category(), module.Name())] = module
s.modulesById[module.ID()] = module
}
}
func (s *Repository) Tools() map[string]Module {
func (s *Repository) Tools() map[string]ModuleWithMeta {
return s.ModulesByType(ModuleTypeTool)
}
func (s *Repository) Tasks() map[string]Module {
func (s *Repository) Tasks() map[string]ModuleWithMeta {
return s.ModulesByType(ModuleTypeTask)
}
func (s *Repository) Builds() map[string]Module {
func (s *Repository) Builds() map[string]ModuleWithMeta {
return s.ModulesByType(ModuleTypeBuild)
}
func (s *Repository) ModuleById(id string) Module {
func (s *Repository) ModuleById(id string) ModuleWithMeta {
module, ok := s.modulesById[id]
if !ok {
return nil
@ -46,7 +46,7 @@ func (s *Repository) ModuleById(id string) Module {
return module
}
func (s *Repository) Module(moduleType ModuleCategory, moduleName string) Module {
func (s *Repository) Module(moduleType ModuleCategory, moduleName string) ModuleWithMeta {
module, ok := s.moduleByName[specOf(moduleType, moduleName)]
if !ok {
return nil
@ -55,8 +55,8 @@ func (s *Repository) Module(moduleType ModuleCategory, moduleName string) Module
return module
}
func (s *Repository) ModulesByType(moduleType ModuleCategory) map[string]Module {
out := make(map[string]Module, len(s.moduleByName))
func (s *Repository) ModulesByType(moduleType ModuleCategory) map[string]ModuleWithMeta {
out := make(map[string]ModuleWithMeta, len(s.moduleByName))
for spec, module := range s.moduleByName {
if spec.TypeName == moduleType {
out[spec.ModuleName] = module

View file

@ -5,7 +5,7 @@ import (
)
var Registration = modules.RegistrationFunc(func(registry *modules.TypeRegistry) {
registry.RegisterModule("task", "script", modules.ModuleFactoryFunc(func() modules.Module {
return new(ScriptTask)
registry.RegisterModule(modules.ModuleFactoryFunc(func() modules.ModuleWithMeta {
return new(modules.Metadata[ScriptTask])
}))
})

View file

@ -17,12 +17,11 @@ import (
var _ modules.Module = (*ScriptTask)(nil)
type ScriptTask struct {
modules.ContainerBaseModule `hcl:",remain"`
Shell string `hcl:"shell,optional"`
Inline []string `hcl:"inline,optional"`
Script string `hcl:"script,optional"`
Env map[string]string `hcl:"environment,optional"`
ContinueOnError bool `hcl:"continue_on_error,optional"`
Shell string `hcl:"shell,optional"`
Inline []string `hcl:"inline,optional"`
Script string `hcl:"script,optional"`
Env map[string]string `hcl:"environment,optional"`
ContinueOnError bool `hcl:"continue_on_error,optional"`
}
func (s ScriptTask) Type() string {
@ -33,10 +32,6 @@ func (s ScriptTask) Category() modules.ModuleCategory {
return modules.ModuleTypeTask
}
func (s ScriptTask) ModuleName() string {
return s.Name
}
func (s ScriptTask) Execute(ctx modules.ExecutionContext) (err error) {
logger := ctx.Logger()

View file

@ -13,12 +13,11 @@ import (
var _ modules.Module = (*GoTool)(nil)
type GoTool struct {
modules.BaseModule `hcl:",remain"`
BinaryName string `hcl:"binary_name"`
Repository string `hcl:"repository"`
Version string `hcl:"version"`
State State `hcl:"state,optional"`
BuildArgs []string `hcl:"build_args,optional"`
BinaryName string `hcl:"binary_name"`
Repository string `hcl:"repository"`
Version string `hcl:"version"`
State State `hcl:"state,optional"`
BuildArgs []string `hcl:"build_args,optional"`
}
func (g GoTool) Type() string {
@ -33,10 +32,6 @@ func (g GoTool) Category() modules.ModuleCategory {
return modules.ModuleTypeTool
}
func (g GoTool) ModuleName() string {
return g.Name
}
func (g GoTool) Execute(ctx modules.ExecutionContext) (err error) {
var (
logger = ctx.Logger().With(slog.String("tool_name", g.BinaryName))

View file

@ -5,7 +5,7 @@ import (
)
var Registration = modules.RegistrationFunc(func(registry *modules.TypeRegistry) {
registry.RegisterModule("tool", "go_tool", modules.ModuleFactoryFunc(func() modules.Module {
return new(GoTool)
registry.RegisterModule(modules.ModuleFactoryFunc(func() modules.ModuleWithMeta {
return new(modules.Metadata[GoTool])
}))
})