refactor: implement control plane as controller-runtime manager

This commit is contained in:
Peter 2025-01-20 17:06:41 +01:00
parent a5c170a478
commit 3104f50c58
Signed by: prskr
GPG key ID: F56BED6903BC5E37
67 changed files with 3693 additions and 261 deletions

View file

@ -1,5 +1,5 @@
/*
Copyright 2024 Peter Kurfer.
Copyright 2025 Peter Kurfer.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -18,12 +18,11 @@ package main
import (
"context"
"errors"
"crypto/tls"
"fmt"
"net"
"time"
"github.com/alecthomas/kong"
clusterservice "github.com/envoyproxy/go-control-plane/envoy/service/cluster/v3"
discoverygrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
endpointservice "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3"
@ -31,7 +30,7 @@ import (
routeservice "github.com/envoyproxy/go-control-plane/envoy/service/route/v3"
runtimeservice "github.com/envoyproxy/go-control-plane/envoy/service/runtime/v3"
secretservice "github.com/envoyproxy/go-control-plane/envoy/service/secret/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/v3"
"google.golang.org/grpc"
grpchealth "google.golang.org/grpc/health"
@ -39,16 +38,103 @@ import (
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/reflection"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
mgr "sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"code.icb4dc0.de/prskr/supabase-operator/internal/controlplane"
)
// controlPlane declares the command line flags of the control plane command.
// Each field's struct tag carries the flag name, its default value and the
// help text shown to the user (presumably consumed by kong, which this file
// imports - confirm against the CLI wiring).
//
//nolint:lll // flag declaration with struct tags is as long as it is
type controlPlane struct {
ListenAddr string `name:"listen-address" default:":18000" help:"The address the control plane binds to."`
ListenAddr string `name:"listen-address" default:":18000" help:"The address the control plane binds to."`
MetricsAddr string `name:"metrics-bind-address" default:"0" help:"The address the metrics endpoint binds to. Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service."`
EnableLeaderElection bool `name:"leader-elect" default:"false" help:"Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager."`
ProbeAddr string `name:"health-probe-bind-address" default:":8081" help:"The address the probe endpoint binds to."`
SecureMetrics bool `name:"metrics-secure" default:"true" help:"If set, the metrics endpoint is served securely via HTTPS. Use --metrics-secure=false to use HTTP instead."`
EnableHTTP2 bool `name:"enable-http2" default:"false" help:"If set, HTTP/2 will be enabled for the metrics and webhook servers"`
}
func (p controlPlane) Run(ctx context.Context, cache cache.SnapshotCache) (err error) {
// Run builds a controller-runtime manager for the control plane, registers the
// Envoy xDS gRPC server and the APIGateway reconciler with it, and then blocks
// inside the manager until ctx is cancelled or the manager fails.
//
// Every setup or runtime failure is returned wrapped with a short description
// of the step that failed; nil is returned when the manager stops cleanly.
func (cp controlPlane) Run(ctx context.Context) error {
	var tlsOpts []func(*tls.Config)

	// If the enable-http2 flag is false (the default), HTTP/2 is disabled to
	// avoid exposure to the HTTP/2 Stream Cancellation and Rapid Reset CVEs.
	// For more information see:
	// - https://github.com/advisories/GHSA-qppj-fm5r-hxr3
	// - https://github.com/advisories/GHSA-4374-p667-p6c8
	disableHTTP2 := func(c *tls.Config) {
		setupLog.Info("disabling http/2")
		c.NextProtos = []string{"http/1.1"}
	}

	if !cp.EnableHTTP2 {
		tlsOpts = append(tlsOpts, disableHTTP2)
	}

	// Metrics endpoint is enabled in 'config/default/kustomization.yaml'. The Metrics options configure the server.
	// More info:
	// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.1/pkg/metrics/server
	// - https://book.kubebuilder.io/reference/metrics.html
	metricsServerOptions := metricsserver.Options{
		BindAddress:   cp.MetricsAddr,
		SecureServing: cp.SecureMetrics,
		TLSOpts:       tlsOpts,
	}

	if cp.SecureMetrics {
		// FilterProvider is used to protect the metrics endpoint with authn/authz.
		// These configurations ensure that only authorized users and service accounts
		// can access the metrics endpoint. The RBAC are configured in 'config/rbac/kustomization.yaml'. More info:
		// https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.1/pkg/metrics/filters#WithAuthenticationAndAuthorization
		metricsServerOptions.FilterProvider = filters.WithAuthenticationAndAuthorization
	}

	// The local is deliberately not called "mgr": that identifier is the import
	// alias of the controller-runtime manager package used elsewhere in this file.
	ctrlMgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		Scheme:                 scheme,
		Metrics:                metricsServerOptions,
		HealthProbeBindAddress: cp.ProbeAddr,
		LeaderElection:         cp.EnableLeaderElection,
		// Hand the caller's context to the manager as its base context.
		BaseContext:      func() context.Context { return ctx },
		LeaderElectionID: "30f6fafb.k8s.icb4dc0.de",
		// NOTE(review): releasing the lease on cancel is only safe if the process
		// ends right after the manager stops - Run returns immediately after
		// Start, confirm the caller does not do further work afterwards.
		LeaderElectionReleaseOnCancel: true,
	})
	if err != nil {
		return fmt.Errorf("unable to start control plane: %w", err)
	}

	// The snapshot cache is shared: the reconciler holds it in its Cache field
	// and the Envoy server is constructed on top of it.
	envoySnapshotCache := cachev3.NewSnapshotCache(false, cachev3.IDHash{}, nil)

	envoySrv, err := cp.envoyServer(ctx, envoySnapshotCache)
	if err != nil {
		return err
	}

	if err := ctrlMgr.Add(envoySrv); err != nil {
		return fmt.Errorf("failed to add envoy server to manager: %w", err)
	}

	apiGatewayReconciler := &controlplane.APIGatewayReconciler{
		Client: ctrlMgr.GetClient(),
		Scheme: ctrlMgr.GetScheme(),
		Cache:  envoySnapshotCache,
	}
	if err := apiGatewayReconciler.SetupWithManager(ctrlMgr); err != nil {
		return fmt.Errorf("unable to create controller APIGateway: %w", err)
	}

	setupLog.Info("starting manager")
	if err := ctrlMgr.Start(ctx); err != nil {
		return fmt.Errorf("problem running manager: %w", err)
	}

	return nil
}
func (cp controlPlane) envoyServer(
ctx context.Context,
cache cachev3.SnapshotCache,
) (runnable mgr.Runnable, err error) {
const (
grpcKeepaliveTime = 30 * time.Second
grpcKeepaliveTimeout = 5 * time.Second
@ -56,19 +142,10 @@ func (p controlPlane) Run(ctx context.Context, cache cache.SnapshotCache) (err e
grpcMaxConcurrentStreams = 1000000
)
logger := ctrl.Log.WithName("control-plane")
clientOpts := client.Options{
Scheme: scheme,
}
logger.Info("Creating client")
watcherClient, err := client.NewWithWatch(ctrl.GetConfigOrDie(), clientOpts)
if err != nil {
return err
}
srv := server.NewServer(ctx, cache, nil)
var (
logger = ctrl.Log.WithName("control-plane")
srv = server.NewServer(ctx, cache, nil)
)
// gRPC golang library sets a very small upper bound for the number gRPC/h2
// streams over a single TCP connection. If a proxy multiplexes requests over
@ -89,13 +166,14 @@ func (p controlPlane) Run(ctx context.Context, cache cache.SnapshotCache) (err e
)
grpcServer := grpc.NewServer(grpcOptions...)
logger.Info("Opening listener", "addr", p.ListenAddr)
lis, err := net.Listen("tcp", p.ListenAddr)
logger.Info("Opening listener", "addr", cp.ListenAddr)
lis, err := net.Listen("tcp", cp.ListenAddr)
if err != nil {
return fmt.Errorf("opening listener: %w", err)
return nil, fmt.Errorf("opening listener: %w", err)
}
logger.Info("Preparing health endpoints")
healthService := grpchealth.NewServer()
healthService.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
@ -109,39 +187,11 @@ func (p controlPlane) Run(ctx context.Context, cache cache.SnapshotCache) (err e
runtimeservice.RegisterRuntimeDiscoveryServiceServer(grpcServer, srv)
grpc_health_v1.RegisterHealthServer(grpcServer, healthService)
// discoverygrpc.AggregatedDiscoveryService_ServiceDesc.ServiceName
endpointsController := controlplane.EndpointsController{
Client: watcherClient,
Cache: cache,
}
errOut := make(chan error)
go func(errOut chan<- error) {
logger.Info("Starting gRPC server")
errOut <- grpcServer.Serve(lis)
}(errOut)
go func(errOut chan<- error) {
logger.Info("Staring endpoints controller")
errOut <- endpointsController.Run(ctx)
}(errOut)
go func(errOut chan error) {
for out := range errOut {
err = errors.Join(err, out)
}
}(errOut)
<-ctx.Done()
grpcServer.Stop()
return err
}
//nolint:unparam // signature required by kong
func (p controlPlane) AfterApply(kongctx *kong.Context) error {
kongctx.BindTo(cache.NewSnapshotCache(false, cache.IDHash{}, nil), (*cache.SnapshotCache)(nil))
return nil
return mgr.RunnableFunc(func(ctx context.Context) error {
go func(ctx context.Context) {
<-ctx.Done()
grpcServer.GracefulStop()
}(ctx)
return grpcServer.Serve(lis)
}), nil
}

View file

@ -87,23 +87,14 @@ func (m manager) Run(ctx context.Context) error {
}
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
Metrics: metricsServerOptions,
WebhookServer: webhookServer,
HealthProbeBindAddress: m.ProbeAddr,
LeaderElection: m.EnableLeaderElection,
LeaderElectionID: "05f9463f.k8s.icb4dc0.de",
// LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily
// when the Manager ends. This requires the binary to immediately end when the
// Manager is stopped, otherwise, this setting is unsafe. Setting this significantly
// speeds up voluntary leader transitions as the new leader don't have to wait
// LeaseDuration time first.
//
// In the default scaffold provided, the program ends immediately after
// the manager stops, so would be fine to enable this option. However,
// if you are doing or is intended to do any operation such as perform cleanups
// after the manager stops then its usage might be unsafe.
// LeaderElectionReleaseOnCancel: true,
Scheme: scheme,
Metrics: metricsServerOptions,
WebhookServer: webhookServer,
HealthProbeBindAddress: m.ProbeAddr,
LeaderElection: m.EnableLeaderElection,
BaseContext: func() context.Context { return ctx },
LeaderElectionID: "05f9463f.k8s.icb4dc0.de",
LeaderElectionReleaseOnCancel: true,
})
if err != nil {
return fmt.Errorf("unable to start manager: %w", err)