This repository has been archived on 2023-11-25. You can view files and clone it, but cannot push or open issues or pull requests.
kreaper/reaper/reaper.go
2022-04-13 21:00:32 +02:00

118 lines
2.5 KiB
Go

package reaper
import (
"context"
"time"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/watch"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// Kreaper holds what Kill needs to wait for a set of pods to go away and to
// force delete them once Lifetime has elapsed.
type Kreaper struct {
// labelSelector is filled in by Kill from Target.Selector() and is read
// again by forceDeleteAll.
labelSelector labels.Selector
// Client is used to list, watch and delete the matching pods.
Client client.WithWatch
// Lifetime is how long Kill waits for the pods to be deleted before it
// force deletes them.
Lifetime time.Duration
// Target supplies the label selector (via its Selector method) that
// identifies the pods.
Target Target
// TargetNamespace is the namespace the list, watch and delete calls are
// restricted to.
TargetNamespace string
}
// Kill waits for every pod matched by the target selector in TargetNamespace
// to be deleted, and force deletes whatever is left once Lifetime has elapsed.
//
// It returns nil when no pods match, when all pods disappear before the
// deadline, or when the forced deletion succeeds. It returns an error when the
// selector cannot be built, when listing or watching pods fails, when the
// forced deletion fails, or when ctx is done before either outcome.
func (k Kreaper) Kill(ctx context.Context) (err error) {
	var (
		logger  = zap.L()
		podList corev1.PodList
	)

	// k is a copy (value receiver): the selector stored here is visible to
	// the helpers called below, not to the caller's Kreaper value.
	if k.labelSelector, err = k.Target.Selector(); err != nil {
		return err
	}

	opts := []client.ListOption{
		client.MatchingLabelsSelector{Selector: k.labelSelector},
		client.InNamespace(k.TargetNamespace),
	}
	if err = k.Client.List(ctx, &podList, opts...); err != nil {
		logger.Error("failed to list", zap.Error(err))
		return err
	}
	if len(podList.Items) < 1 {
		logger.Warn("No pod targets found")
		return nil
	}
	for i := range podList.Items {
		logger.Info("Found pod", zap.String("pod_name", podList.Items[i].Name))
	}

	// startPodWatcher opens the watch and logs setup failures itself. Without
	// this check a failed setup would yield a nil channel and the select below
	// would silently sit out the whole lifetime.
	done, err := k.startPodWatcher(ctx, podList, opts)
	if err != nil {
		return err
	}

	lifetime := time.NewTimer(k.Lifetime)
	defer lifetime.Stop()

	select {
	case <-lifetime.C:
		logger.Info("Reached end of lifetime force delete all matching pods")
		if err = k.forceDeleteAll(ctx); err != nil {
			logger.Error("Failed to delete all pods", zap.Error(err))
			return err
		}
		return nil
	case <-done:
		logger.Info("All pods deleted")
		return nil
	case <-ctx.Done():
		// Nothing useful can be done with a finished context; report it
		// instead of waiting for the lifetime to run out.
		logger.Warn("context done before pods were deleted", zap.Error(ctx.Err()))
		return ctx.Err()
	}
}
// startPodWatcher watches the pods selected by opts and reports when none of
// them are left.
//
// podList is only used to tell the client which kind of object to watch. The
// returned channel is closed once a List with opts comes back empty; that is
// checked once right after the watch is established and again after every
// Deleted event. The channel is never closed if the watch's result channel
// ends first, so callers must pair it with their own deadline.
//
// A non-nil error means the watch could not be established; the returned
// channel is nil in that case.
func (k *Kreaper) startPodWatcher(ctx context.Context, podList corev1.PodList, opts []client.ListOption) (<-chan struct{}, error) {
	logger := zap.L()

	watcher, err := k.Client.Watch(ctx, &podList, opts...)
	if err != nil {
		logger.Error("failed to setup watch", zap.Error(err))
		return nil, err
	}

	// noneLeft reports whether a fresh List with opts returns no pods. A list
	// failure is logged and treated as "pods may remain".
	noneLeft := func() bool {
		// Use a new list each time so items from an earlier call cannot linger.
		var remaining corev1.PodList
		if err := k.Client.List(ctx, &remaining, opts...); err != nil {
			logger.Warn("failed to list pods while watching", zap.Error(err))
			return false
		}
		return len(remaining.Items) == 0
	}

	done := make(chan struct{})
	go func() {
		defer watcher.Stop()

		// Pods removed between the caller's List and the Watch above never
		// produce a Deleted event here, so check once before waiting.
		if noneLeft() {
			close(done)
			return
		}
		for ev := range watcher.ResultChan() {
			if ev.Type == watch.Deleted && noneLeft() {
				close(done)
				return
			}
		}
	}()
	return done, nil
}
// forceDeleteAll issues a single DeleteAllOf for pods in TargetNamespace that
// match the stored label selector, requesting foreground propagation.
// It returns whatever error the client reports.
func (k *Kreaper) forceDeleteAll(ctx context.Context) error {
	deleteOpts := []client.DeleteAllOfOption{
		client.InNamespace(k.TargetNamespace),
		client.PropagationPolicy(metav1.DeletePropagationForeground),
		client.MatchingLabelsSelector{Selector: k.labelSelector},
	}
	return k.Client.DeleteAllOf(ctx, &corev1.Pod{}, deleteOpts...)
}