94 lines
2 KiB
Go
94 lines
2 KiB
Go
package containers
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"github.com/docker/docker/client"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
// containerAPICallingThreshold is the pause inserted between consecutive
// container inspect calls so the container API is not overloaded.
const containerAPICallingThreshold time.Duration = 100 * time.Millisecond
|
|
|
|
func NewContainerProbe(logger *slog.Logger, containerID string, probe Probe, inspector Inspector) *ContainerProber {
|
|
return &ContainerProber{
|
|
containerID: containerID,
|
|
probe: probe,
|
|
inspector: inspector,
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
type ContainerProber struct {
|
|
probe Probe
|
|
inspector Inspector
|
|
logger *slog.Logger
|
|
containerID string
|
|
}
|
|
|
|
func (p *ContainerProber) WaitReady(ctx context.Context) error {
|
|
waitCtx, cancel := context.WithCancel(ctx)
|
|
|
|
monitorErr := p.inspectMonitor(ctx)
|
|
probeReady, probeErr := p.probe.Execute(waitCtx)
|
|
|
|
defer cancel()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case err := <-monitorErr:
|
|
if err != nil {
|
|
return err
|
|
}
|
|
case ready := <-probeReady:
|
|
if ready {
|
|
return nil
|
|
}
|
|
case err := <-probeErr:
|
|
if respStatus, ok := status.FromError(err); ok {
|
|
if respStatus.Code() == codes.Unavailable {
|
|
continue
|
|
}
|
|
}
|
|
if err != nil {
|
|
p.logger.Error("Error occurred while probing container", slog.String("err", err.Error()))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *ContainerProber) inspectMonitor(ctx context.Context) (errs chan error) {
|
|
errs = make(chan error)
|
|
go func() {
|
|
defer close(errs)
|
|
for ctx.Err() == nil {
|
|
containerInfo, err := p.inspector.ContainerInspect(ctx, p.containerID)
|
|
if err != nil {
|
|
if client.IsErrNotFound(err) {
|
|
errs <- fmt.Errorf("container %s does not exist", p.containerID)
|
|
return
|
|
} else if errors.Is(err, context.Canceled) {
|
|
return
|
|
}
|
|
p.logger.Error("Error occurred while inspecting container", slog.String("err", err.Error()))
|
|
}
|
|
|
|
if !containerInfo.State.Running {
|
|
errs <- fmt.Errorf("container %s is not running", p.containerID)
|
|
return
|
|
}
|
|
|
|
// sleep for a bit to not overload container API
|
|
time.Sleep(containerAPICallingThreshold)
|
|
}
|
|
}()
|
|
|
|
return errs
|
|
}
|