buildr/internal/containers/orchestrator.go

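// Package containers provides the Docker-based container orchestration used
// by buildr to run container jobs in isolated containers.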
package containers

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"os"
	"path"
	"path/filepath"
	"runtime"
	"time"

	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/network"
	"github.com/docker/docker/client"
	"github.com/docker/go-connections/nat"

	"code.icb4dc0.de/buildr/buildr/internal/archive"
	"code.icb4dc0.de/buildr/buildr/internal/ignore"
)

// defaultGRPCDialTimeout limits how long to wait when dialing a container's
// gRPC endpoint.
const defaultGRPCDialTimeout = 10 * time.Second

var (
	_ Shutdowner = (*Orchestrator)(nil)

	// architectureMapping normalizes the architecture names reported by the
	// Docker daemon to Go's GOARCH values.
	architectureMapping = map[string]string{
		"amd64":   "amd64",
		"x86_64":  "amd64",
		"aarch64": "arm64",
	}
)
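
// NewOrchestrator creates an Orchestrator backed by the given Docker client.
// It fails early if the Docker host's OS or architecture does not match the
// running buildr binary, because that binary is later copied into every
// container the orchestrator starts.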
func NewOrchestrator(ctx context.Context, cli *client.Client, ignorer *ignore.Ignorer) (*Orchestrator, error) {
	hostInfo, err := cli.Info(ctx)
	if err != nil {
		return nil, err
	}

	if runtime.GOOS != hostInfo.OSType {
		return nil, fmt.Errorf("buildr is running on %s, but Docker is running on %s", runtime.GOOS, hostInfo.OSType)
	}

	if runtime.GOARCH != architectureMapping[hostInfo.Architecture] {
		return nil, fmt.Errorf("buildr is compiled for %s, but Docker is running on %s", runtime.GOARCH, hostInfo.Architecture)
	}

	o := Orchestrator{
		client:           cli,
		ignorer:          ignorer,
		dockerHostOSType: hostInfo.OSType,
		dockerHostArch:   architectureMapping[hostInfo.Architecture],
	}

	return &o, nil
}
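
// Orchestrator manages the lifecycle of buildr containers on a single Docker
// host: pulling images, creating per-container networks, provisioning files
// and tearing the resources down again.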
type Orchestrator struct {
	client           *client.Client
	ignorer          *ignore.Ignorer
	dockerHostOSType string
	dockerHostArch   string
}
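
// BuildRContainer pulls the spec's image (if necessary), creates a dedicated
// network and container, copies the buildr executable plus any extra plugin
// binaries into it, starts the container and waits until its gRPC health
// endpoint reports ready. It returns a handle to the running container and
// the base URL under which the container's port 3000/tcp is reachable on the
// host.
//
// A minimal usage sketch with hypothetical values (fields such as RepoRoot
// and BinariesDir are omitted):
//
//	spec := &BuildRContainerSpec{
//		ID:         "job-1",
//		ModuleName: "build",
//		Image:      "docker.io/library/alpine:3.18",
//		Content:    map[string]string{},
//	}
//	con, baseURL, err := o.BuildRContainer(ctx, spec)
//	if err != nil {
//		// handle error
//	}
//	defer o.ShutdownContainer(context.Background(), con)
//	_ = baseURL // e.g. "http://localhost:49153"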
func (o *Orchestrator) BuildRContainer(ctx context.Context, spec *BuildRContainerSpec) (con Container, baseUrl string, err error) {
	conRef := &containerRef{
		shutdowner:   o,
		client:       o.client,
		resourceName: fmt.Sprintf("buildr-%s-%s", spec.ModuleName, spec.ID),
	}

	containerSetupCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	if err := o.pullImage(containerSetupCtx, spec.Image); err != nil {
		return nil, "", err
	}

	if err := o.createContainerNetwork(containerSetupCtx, conRef); err != nil {
		return nil, "", err
	}

	buildrExecutablePath, err := os.Executable()
	if err != nil {
		return nil, "", fmt.Errorf("failed to determine buildr executable: %w", err)
	}

	_, buildrExecutableName := filepath.Split(buildrExecutablePath)
	containerRepoRoot, conSpec := spec.containerSpec(buildrExecutableName)

	if err := o.createContainer(containerSetupCtx, conSpec, conRef); err != nil {
		return nil, "", err
	}

	// Stage the buildr executable and all extra plugin binaries for copying
	// into the container's binary directory.
	for i := range spec.ExtraBinaries {
		extraBin := spec.ExtraBinaries[i]
		spec.Content[path.Join(spec.BinariesDir, extraBin)] = path.Join("/", "opt", "buildr", "bin", extraBin)
	}

	spec.Content[buildrExecutablePath] = path.Join("/", "opt", "buildr", "bin", buildrExecutableName)

	if err := o.copyFilesToContainer(containerSetupCtx, spec.RepoRoot, conRef, spec.Content, containerRepoRoot); err != nil {
		return nil, "", err
	}

	if err := o.client.ContainerStart(containerSetupCtx, conRef.containerID, types.ContainerStartOptions{}); err != nil {
		return nil, "", err
	}

	if status, err := o.client.ContainerInspect(containerSetupCtx, conRef.containerID); err != nil {
		return nil, "", err
	} else if !status.State.Running {
		return nil, "", fmt.Errorf("container %s is not running: %d - %s", conRef.containerID, status.State.ExitCode, status.State.Error)
	}

	_, port, err := conRef.MappedPort(containerSetupCtx, "3000/tcp")
	if err != nil {
		return nil, "", err
	}

	baseUrl = fmt.Sprintf("http://localhost:%s", port)

	prober := NewContainerProbe(
		slog.Default(),
		conRef.containerID,
		NewGRPCHealthProbe(baseUrl),
		o.client,
	)

	if err = prober.WaitReady(containerSetupCtx); err != nil {
		return nil, "", fmt.Errorf("error occurred while waiting for container to be ready: %w", err)
	}

	return conRef, baseUrl, nil
}
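
// ShutdownContainer stops and removes the container and then removes its
// dedicated network. "Not found" errors are ignored so that shutting down an
// already (partially) removed container is not an error.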
func (o *Orchestrator) ShutdownContainer(ctx context.Context, con Container) error {
	ignoreNotFound := func(err error) error {
		if client.IsErrNotFound(err) {
			return nil
		}
		return err
	}

	return errors.Join(
		ignoreNotFound(o.client.ContainerStop(ctx, con.ID(), container.StopOptions{})),
		ignoreNotFound(o.client.ContainerRemove(ctx, con.ID(), types.ContainerRemoveOptions{})),
		ignoreNotFound(o.client.NetworkRemove(ctx, con.NetworkID())),
	)
}
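
// pullImage pulls the given image unless it is already available locally.
// The pull's progress stream is drained so the call only returns once the
// pull has completed.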
func (o *Orchestrator) pullImage(ctx context.Context, image string) error {
	_, _, err := o.client.ImageInspectWithRaw(ctx, image)

	var shouldPullImage bool
	if err != nil {
		if client.IsErrNotFound(err) {
			shouldPullImage = true
		} else {
			return err
		}
	}

	if !shouldPullImage {
		return nil
	}

	slog.Info("Pulling image", slog.String("image_name", image))

	rc, err := o.client.ImagePull(ctx, image, types.ImagePullOptions{})
	if err != nil {
		return err
	}

	_, err = io.Copy(io.Discard, rc)

	return errors.Join(err, rc.Close())
}
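
// createContainerNetwork ensures a dedicated network named after the
// container's resource name exists and records its ID on the containerRef,
// reusing the network if it already exists.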
func (o *Orchestrator) createContainerNetwork(ctx context.Context, conRef *containerRef) (err error) {
	var shouldCreateNetwork bool
	res, err := o.client.NetworkInspect(ctx, conRef.resourceName, types.NetworkInspectOptions{})
	if err != nil {
		if client.IsErrNotFound(err) {
			shouldCreateNetwork = true
		} else {
			return err
		}
	}

	if !shouldCreateNetwork {
		conRef.networkID = res.ID
		return nil
	}

	createOpts := types.NetworkCreate{
		CheckDuplicate: true,
	}

	if resp, err := o.client.NetworkCreate(ctx, conRef.resourceName, createOpts); err != nil {
		return err
	} else {
		conRef.networkID = resp.ID
		return nil
	}
}
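
// createContainer creates (but does not start) the container described by
// spec, attaches it to the previously created network and records the new
// container's ID on the containerRef.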
func (o *Orchestrator) createContainer(ctx context.Context, spec *ContainerSpec, conRef *containerRef) error {
	ports, binding, err := nat.ParsePortSpecs(spec.ExposedPorts)
	if err != nil {
		return err
	}

	containerConfig := container.Config{
		Image:        spec.Image,
		WorkingDir:   spec.WorkDir(),
		ExposedPorts: ports,
		Entrypoint:   spec.Entrypoint,
		Cmd:          spec.Cmd,
	}

	if spec.User != "" {
		containerConfig.User = spec.User
	}

	for k, v := range spec.Env {
		containerConfig.Env = append(containerConfig.Env, k+"="+v)
	}

	hostConfig := container.HostConfig{
		AutoRemove:   spec.AutoRemove,
		Privileged:   spec.Privileged,
		CapAdd:       spec.Capabilities.Add,
		CapDrop:      spec.Capabilities.Drop,
		PortBindings: binding,
		Mounts:       spec.Mounts,
	}

	networkConfig := network.NetworkingConfig{
		EndpointsConfig: map[string]*network.EndpointSettings{
			conRef.resourceName: {
				NetworkID: conRef.networkID,
			},
		},
	}

	resp, err := o.client.ContainerCreate(ctx, &containerConfig, &hostConfig, &networkConfig, nil, conRef.resourceName)
	if err != nil {
		return err
	}

	conRef.containerID = resp.ID

	return nil
}
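
// copyFilesToContainer packs the content mapping (source path -> destination
// path; relative sources are resolved against repoRoot, relative destinations
// against targetDir) into a tar archive and copies it into the container's
// filesystem root.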
func (o *Orchestrator) copyFilesToContainer(
	ctx context.Context,
	repoRoot string,
	conRef *containerRef,
	content map[string]string,
	targetDir string,
) (err error) {
	copyOpts := types.CopyToContainerOptions{}

	tarTmpFile, err := os.CreateTemp(os.TempDir(), "buildr-container-*")
	if err != nil {
		return fmt.Errorf("failed to create temp file: %w", err)
	}

	// The return value is named so that cleanup errors are propagated to the
	// caller instead of being silently dropped.
	defer func(tmpFileName string) {
		err = errors.Join(err, tarTmpFile.Close(), os.Remove(tmpFileName))
	}(tarTmpFile.Name())

	contentArchive := archive.Tar{Ignorer: o.ignorer}

	for src, dest := range content {
		if !path.IsAbs(src) {
			src = path.Join(repoRoot, src)
		}
		if !path.IsAbs(dest) {
			dest = path.Join(targetDir, dest)
		}
		if err := contentArchive.Add(src, dest); err != nil {
			return err
		}
	}

	if err := contentArchive.Write(tarTmpFile); err != nil {
		return err
	}

	if _, err := tarTmpFile.Seek(0, 0); err != nil {
		return fmt.Errorf("failed to reset file offset: %w", err)
	}

	if err := o.client.CopyToContainer(ctx, conRef.containerID, "/", tarTmpFile, copyOpts); err != nil {
		return fmt.Errorf("failed to copy files to container: %w", err)
	}

	return nil
}