117 lines
2.5 KiB
Go
117 lines
2.5 KiB
Go
package reaper
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
corev1 "k8s.io/api/core/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/labels"
|
|
"k8s.io/apimachinery/pkg/watch"
|
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
|
)
|
|
|
|
type Kreaper struct {
|
|
labelSelector labels.Selector
|
|
Client client.WithWatch
|
|
Lifetime time.Duration
|
|
Target Target
|
|
TargetNamespace string
|
|
}
|
|
|
|
func (k Kreaper) Kill(ctx context.Context) (err error) {
|
|
var (
|
|
logger = zap.L()
|
|
podList corev1.PodList
|
|
)
|
|
|
|
if k.labelSelector, err = k.Target.Selector(); err != nil {
|
|
return err
|
|
}
|
|
|
|
opts := []client.ListOption{
|
|
client.MatchingLabelsSelector{Selector: k.labelSelector},
|
|
client.InNamespace(k.TargetNamespace),
|
|
}
|
|
|
|
if err = k.Client.List(ctx, &podList, opts...); err != nil {
|
|
logger.Error("failed to list", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
if len(podList.Items) < 1 {
|
|
logger.Warn("No pod targets found")
|
|
return nil
|
|
}
|
|
|
|
for i := range podList.Items {
|
|
logger.Info("Found pod", zap.String("pod_name", podList.Items[i].Name))
|
|
}
|
|
|
|
watcher, err := k.Client.Watch(ctx, &podList, opts...)
|
|
if err != nil {
|
|
logger.Error("failed to setup watch", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
defer watcher.Stop()
|
|
done, err := k.startPodWatcher(ctx, podList, opts)
|
|
|
|
select {
|
|
case <-time.After(k.Lifetime):
|
|
logger.Info("Reached end of lifetime force delete all matching pods")
|
|
if err := k.forceDeleteAll(ctx); err != nil {
|
|
logger.Error("Failed to delete all pods", zap.Error(err))
|
|
return err
|
|
}
|
|
return nil
|
|
case <-done:
|
|
logger.Info("All pods deleted")
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (k *Kreaper) startPodWatcher(ctx context.Context, podList corev1.PodList, opts []client.ListOption) (<-chan struct{}, error) {
|
|
logger := zap.L()
|
|
watcher, err := k.Client.Watch(ctx, &podList, opts...)
|
|
if err != nil {
|
|
logger.Error("failed to setup watch", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
|
|
done := make(chan struct{})
|
|
|
|
go func() {
|
|
defer watcher.Stop()
|
|
|
|
for ev := range watcher.ResultChan() {
|
|
if ev.Type != watch.Deleted {
|
|
continue
|
|
}
|
|
|
|
if err := k.Client.List(ctx, &podList, opts...); err != nil {
|
|
continue
|
|
}
|
|
|
|
if len(podList.Items) == 0 {
|
|
close(done)
|
|
break
|
|
}
|
|
}
|
|
}()
|
|
|
|
return done, nil
|
|
}
|
|
|
|
func (k *Kreaper) forceDeleteAll(ctx context.Context) error {
|
|
return k.Client.DeleteAllOf(
|
|
ctx,
|
|
new(corev1.Pod),
|
|
client.InNamespace(k.TargetNamespace),
|
|
client.PropagationPolicy(metav1.DeletePropagationForeground),
|
|
client.MatchingLabelsSelector{
|
|
Selector: k.labelSelector,
|
|
},
|
|
)
|
|
}
|