diff --git a/Tiltfile b/Tiltfile index 91f534d..8df7741 100644 --- a/Tiltfile +++ b/Tiltfile @@ -72,7 +72,6 @@ k8s_resource( objects=["dashboard-sample:Dashboard:supabase-demo"], extra_pod_selectors={"app.kubernetes.io/component": "dashboard", "app.kubernetes.io/name": "studio"}, discovery_strategy="selectors-only", - # port_forwards=[3000], new_name='Dashboard', resource_deps=[ 'supabase-controller-manager' diff --git a/api/v1alpha1/apigateway_types.go b/api/v1alpha1/apigateway_types.go index 177ed52..567b6fe 100644 --- a/api/v1alpha1/apigateway_types.go +++ b/api/v1alpha1/apigateway_types.go @@ -171,8 +171,7 @@ type APIGatewaySpec struct { } type EnvoyStatus struct { - ConfigVersion string `json:"configVersion,omitempty"` - ResourceHash []byte `json:"resourceHash,omitempty"` + ResourceHash []byte `json:"resourceHash,omitempty"` } // APIGatewayStatus defines the observed state of APIGateway. @@ -185,7 +184,7 @@ type APIGatewayStatus struct { // +kubebuilder:subresource:status // APIGateway is the Schema for the apigateways API. -// +kubebuilder:printcolumn:name="EnvoyConfigVersion",type=string,JSONPath=`.status.envoy.configVersion` +// +kubebuilder:printcolumn:name="EnvoyConfigVersion",type=string,JSONPath=`.status.envoy.resourceHash` type APIGateway struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` diff --git a/cmd/control_plane.go b/cmd/control_plane.go index bf936d5..89232eb 100644 --- a/cmd/control_plane.go +++ b/cmd/control_plane.go @@ -25,6 +25,7 @@ import ( "strings" "time" + corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" clusterservice "github.com/envoyproxy/go-control-plane/envoy/service/cluster/v3" discoverygrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" endpointservice "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3" @@ -33,11 +34,11 @@ import ( runtimeservice "github.com/envoyproxy/go-control-plane/envoy/service/runtime/v3" secretservice "github.com/envoyproxy/go-control-plane/envoy/service/secret/v3" cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + "github.com/envoyproxy/go-control-plane/pkg/log" "github.com/envoyproxy/go-control-plane/pkg/server/v3" "google.golang.org/grpc/credentials" "github.com/go-logr/logr" - "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" "google.golang.org/grpc" grpchealth "google.golang.org/grpc/health" "google.golang.org/grpc/health/grpc_health_v1" @@ -79,7 +80,7 @@ type controlPlane struct { Namespace string `name:"namespace" env:"CONTROL_PLANE_NAMESPACE" default:"" required:"" help:"Namespace where the controller is running, ideally set via downward API"` } -func (cp *controlPlane) Run(ctx context.Context) error { +func (cp *controlPlane) Run(ctx context.Context, logger logr.Logger) error { var tlsOpts []func(*tls.Config) // if the enable-http2 flag is false (the default), http/2 should be disabled @@ -133,14 +134,15 @@ func (cp *controlPlane) Run(ctx context.Context) error { return fmt.Errorf("unable to start control plane: %w", err) } - envoySnapshotCache := cachev3.NewSnapshotCache(false, cachev3.IDHash{}, nil) + cacheLoggerInst := cacheLogger(logger.WithName("envoy-snapshot-cache")) + envoySnapshotCache := cachev3.NewSnapshotCache(true, cachev3.IDHash{}, cacheLoggerInst) serverCert, err := cp.ensureControlPlaneTlsCert(ctx, bootstrapClient) if err != nil { return fmt.Errorf("failed to ensure control plane TLS cert: %w", err) } - envoySrv, err := cp.envoyServer(ctx, envoySnapshotCache, serverCert) + envoySrv, err := cp.envoyServer(ctx, logger, envoySnapshotCache, serverCert) if err != nil { return err } @@ -188,6 +190,7 @@ func (cp *controlPlane) AfterApply() (err error) { func (cp *controlPlane) envoyServer( ctx context.Context, + logger logr.Logger, cache cachev3.SnapshotCache, serverCert tls.Certificate, ) (runnable mgr.Runnable, err error) { @@ -197,11 +200,8 @@ func (cp *controlPlane) envoyServer( grpcKeepaliveMinTime = 30 * time.Second grpcMaxConcurrentStreams = 1000000 ) - - var ( - logger = ctrl.Log.WithName("control-plane") - srv = server.NewServer(ctx, cache, nil) - ) + srv := server.NewServer(ctx, cache, xdsServerCallbacks(logger)) + logger = logger.WithName("control-plane") // gRPC golang library sets a very small upper bound for the number gRPC/h2 // streams over a single TCP connection. If a proxy multiplexes requests over @@ -214,10 +214,6 @@ func (cp *controlPlane) envoyServer( return nil, fmt.Errorf("failed to create TLS config: %w", err) } - loggingOpts := []logging.Option{ - logging.WithLogOnEvents(logging.StartCall, logging.FinishCall), - } - grpcOptions := append(make([]grpc.ServerOption, 0, 4), grpc.Creds(credentials.NewTLS(tlsCfg)), grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams), @@ -229,12 +225,6 @@ func (cp *controlPlane) envoyServer( MinTime: grpcKeepaliveMinTime, PermitWithoutStream: true, }), - grpc.ChainUnaryInterceptor( - logging.UnaryServerInterceptor(InterceptorLogger(ctrl.Log), loggingOpts...), - ), - grpc.ChainStreamInterceptor( - logging.StreamServerInterceptor(InterceptorLogger(ctrl.Log), loggingOpts...), - ), ) grpcServer := grpc.NewServer(grpcOptions...) @@ -268,7 +258,10 @@ func (cp *controlPlane) envoyServer( }), nil } -func (cp *controlPlane) ensureControlPlaneTlsCert(ctx context.Context, k8sClient client.Client) (tls.Certificate, error) { +func (cp *controlPlane) ensureControlPlaneTlsCert( + ctx context.Context, + k8sClient client.Client, +) (tls.Certificate, error) { var ( controlPlaneServerCert = &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ @@ -347,10 +340,86 @@ func (cp *controlPlane) tlsConfig(serverCert tls.Certificate) (*tls.Config, erro return tlsCfg, nil } -// InterceptorLogger adapts slog logger to interceptor logger. -// This code is simple enough to be copied and not imported. -func InterceptorLogger(l logr.Logger) logging.Logger { - return logging.LoggerFunc(func(ctx context.Context, lvl logging.Level, msg string, fields ...any) { - l.Info(msg, fields...) - }) +func xdsServerCallbacks(logger logr.Logger) server.Callbacks { + return server.CallbackFuncs{ + StreamOpenFunc: func(ctx context.Context, streamId int64, nodeId string) error { + logger.Info("Stream opened", "stream-id", streamId, "node-id", nodeId) + return nil + }, + StreamClosedFunc: func(streamId int64, node *corev3.Node) { + logger.Info("Stream closed", "stream-id", streamId, + "node.id", node.Id, + "node.cluster", node.Cluster, + ) + }, + StreamRequestFunc: func(streamId int64, request *discoverygrpc.DiscoveryRequest) error { + logger.Info("Stream request", + "stream-id", streamId, + "request.node.id", request.Node.Id, + "request.node.cluster", request.Node.Cluster, + "request.version", request.VersionInfo, + "request.error", request.ErrorDetail, + ) + return nil + }, + StreamResponseFunc: func( + ctx context.Context, + streamId int64, + request *discoverygrpc.DiscoveryRequest, + response *discoverygrpc.DiscoveryResponse, + ) { + logger.Info("Stream delta response", + "stream-id", streamId, + "request.node.id", request.Node.Id, + "request.node.cluster", request.Node.Cluster, + ) + }, + DeltaStreamOpenFunc: func(ctx context.Context, streamId int64, nodeId string) error { + logger.Info("Delta stream opened", "stream-id", streamId, "node-id", nodeId) + return nil + }, + + DeltaStreamClosedFunc: func(streamId int64, node *corev3.Node) { + logger.Info("Delta stream closed", + "stream-id", streamId, + "node.id", node.Id, + "node.cluster", node.Cluster, + ) + }, + StreamDeltaRequestFunc: func(i int64, request *discoverygrpc.DeltaDiscoveryRequest) error { + logger.Info("Stream delta request", + "stream-id", i, + "request.node.id", request.Node.Id, + "request.node.cluster", request.Node.Cluster, + "request.error", request.ErrorDetail, + ) + return nil + }, + StreamDeltaResponseFunc: func( + i int64, + request *discoverygrpc.DeltaDiscoveryRequest, + response *discoverygrpc.DeltaDiscoveryResponse, + ) { + logger.Info("Stream delta response", + "stream-id", i, + "request.node", request.Node, + "response.resources", response.Resources, + ) + }, + } +} + +func cacheLogger(logger logr.Logger) log.Logger { + wrapper := func(delegate func(msg string, keysAndValues ...any)) func(string, ...any) { + return func(s string, i ...any) { + delegate(fmt.Sprintf(s, i...)) + } + } + + return log.LoggerFuncs{ + DebugFunc: nil, // enable for debug info + InfoFunc: wrapper(logger.Info), + WarnFunc: wrapper(logger.Info), + ErrorFunc: wrapper(logger.Info), + } } diff --git a/config/crd/bases/supabase.k8s.icb4dc0.de_apigateways.yaml b/config/crd/bases/supabase.k8s.icb4dc0.de_apigateways.yaml index c5e5cc0..4003876 100644 --- a/config/crd/bases/supabase.k8s.icb4dc0.de_apigateways.yaml +++ b/config/crd/bases/supabase.k8s.icb4dc0.de_apigateways.yaml @@ -15,7 +15,7 @@ spec: scope: Namespaced versions: - additionalPrinterColumns: - - jsonPath: .status.envoy.configVersion + - jsonPath: .status.envoy.resourceHash name: EnvoyConfigVersion type: string name: v1alpha1 @@ -2826,8 +2826,6 @@ spec: properties: envoy: properties: - configVersion: - type: string resourceHash: format: byte type: string diff --git a/internal/controller/apigateway_controller.go b/internal/controller/apigateway_controller.go index 14c7911..95f4d40 100644 --- a/internal/controller/apigateway_controller.go +++ b/internal/controller/apigateway_controller.go @@ -419,8 +419,6 @@ func (r *APIGatewayReconciler) reconileEnvoyDeployment( const ( configVolumeName = "config" controlPlaneTlsVolumeName = "cp-tls" - dashboardTlsVolumeName = "dashboard-tls" - apiTlsVolumeName = "api-tls" ) envoyDeployment := &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ @@ -448,131 +446,6 @@ func (r *APIGatewayReconciler) reconileEnvoyDeployment( envoyDeployment.Spec.Replicas = envoySpec.WorkloadTemplate.ReplicaCount() - configVolumeProjectionSources := []corev1.VolumeProjection{ - { - ConfigMap: &corev1.ConfigMapProjection{ - LocalObjectReference: corev1.LocalObjectReference{ - Name: supabase.ServiceConfig.Envoy.ObjectName(gateway), - }, - Items: []corev1.KeyToPath{ - { - Key: "config.yaml", - Path: "config.yaml", - }, - }, - }, - }, - { - Secret: &corev1.SecretProjection{ - LocalObjectReference: corev1.LocalObjectReference{ - Name: gateway.Spec.ApiEndpoint.JWKSSelector.Name, - }, - Items: []corev1.KeyToPath{{ - Key: gateway.Spec.ApiEndpoint.JWKSSelector.Key, - Path: "jwks.json", - }}, - }, - }, - { - Secret: &corev1.SecretProjection{ - LocalObjectReference: corev1.LocalObjectReference{ - Name: serviceCfg.ControlPlaneClientCertSecretName(gateway), - }, - Items: []corev1.KeyToPath{ - { - Key: "ca.crt", - Path: "certs/cp/ca.crt", - }, - { - Key: "tls.crt", - Path: "certs/cp/tls.crt", - }, - { - Key: "tls.key", - Path: "certs/cp/tls.key", - }, - }, - }, - }, - } - - if oauth2Spec := gateway.Spec.DashboardEndpoint.OAuth2(); oauth2Spec != nil { - configVolumeProjectionSources = append(configVolumeProjectionSources, corev1.VolumeProjection{ - Secret: &corev1.SecretProjection{ - LocalObjectReference: corev1.LocalObjectReference{ - Name: oauth2Spec.ClientSecretRef.Name, - }, - Items: []corev1.KeyToPath{{ - Key: oauth2Spec.ClientSecretRef.Key, - Path: serviceCfg.Defaults.OAuth2ClientSecretKey, - }}, - }, - }) - } - - volumeMounts := []corev1.VolumeMount{ - { - Name: configVolumeName, - ReadOnly: true, - MountPath: "/etc/envoy", - }, - } - - volumes := []corev1.Volume{ - { - Name: configVolumeName, - VolumeSource: corev1.VolumeSource{ - Projected: &corev1.ProjectedVolumeSource{ - Sources: configVolumeProjectionSources, - }, - }, - }, - { - Name: controlPlaneTlsVolumeName, - VolumeSource: corev1.VolumeSource{ - Secret: &corev1.SecretVolumeSource{ - SecretName: serviceCfg.ControlPlaneClientCertSecretName(gateway), - }, - }, - }, - } - - if tlsSpec := gateway.Spec.ApiEndpoint.TLSSpec(); tlsSpec != nil { - volumes = append(volumes, corev1.Volume{ - Name: apiTlsVolumeName, - VolumeSource: corev1.VolumeSource{ - Secret: &corev1.SecretVolumeSource{ - SecretName: tlsSpec.Cert.SecretName, - }, - }, - }) - - volumeMounts = append(volumeMounts, corev1.VolumeMount{ - Name: dashboardTlsVolumeName, - ReadOnly: true, - MountPath: "/etc/envoy/certs/api", - SubPath: "certs/api", - }) - } - - if tlsSpec := gateway.Spec.DashboardEndpoint.TLSSpec(); tlsSpec != nil { - volumes = append(volumes, corev1.Volume{ - Name: dashboardTlsVolumeName, - VolumeSource: corev1.VolumeSource{ - Secret: &corev1.SecretVolumeSource{ - SecretName: tlsSpec.Cert.SecretName, - }, - }, - }) - - volumeMounts = append(volumeMounts, corev1.VolumeMount{ - Name: dashboardTlsVolumeName, - ReadOnly: true, - MountPath: "/etc/envoy/certs/dashboard", - SubPath: "certs/dashboard", - }) - } - envoyDeployment.Spec.Template = corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ @@ -632,11 +505,78 @@ func (r *APIGatewayReconciler) reconileEnvoyDeployment( }, SecurityContext: envoySpec.WorkloadTemplate.ContainerSecurityContext(serviceCfg.Defaults.UID, serviceCfg.Defaults.GID), Resources: envoySpec.WorkloadTemplate.Resources(), - VolumeMounts: envoySpec.WorkloadTemplate.AdditionalVolumeMounts(volumeMounts...), + VolumeMounts: envoySpec.WorkloadTemplate.AdditionalVolumeMounts(corev1.VolumeMount{ + Name: configVolumeName, + ReadOnly: true, + MountPath: "/etc/envoy", + }), }, }, SecurityContext: envoySpec.WorkloadTemplate.PodSecurityContext(), - Volumes: volumes, + Volumes: []corev1.Volume{ + { + Name: configVolumeName, + VolumeSource: corev1.VolumeSource{ + Projected: &corev1.ProjectedVolumeSource{ + Sources: []corev1.VolumeProjection{ + { + ConfigMap: &corev1.ConfigMapProjection{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: supabase.ServiceConfig.Envoy.ObjectName(gateway), + }, + Items: []corev1.KeyToPath{ + { + Key: "config.yaml", + Path: "config.yaml", + }, + }, + }, + }, + { + Secret: &corev1.SecretProjection{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: gateway.Spec.ApiEndpoint.JWKSSelector.Name, + }, + Items: []corev1.KeyToPath{{ + Key: gateway.Spec.ApiEndpoint.JWKSSelector.Key, + Path: "jwks.json", + }}, + }, + }, + { + Secret: &corev1.SecretProjection{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: serviceCfg.ControlPlaneClientCertSecretName(gateway), + }, + Items: []corev1.KeyToPath{ + { + Key: "ca.crt", + Path: "certs/cp/ca.crt", + }, + { + Key: "tls.crt", + Path: "certs/cp/tls.crt", + }, + { + Key: "tls.key", + Path: "certs/cp/tls.key", + }, + }, + }, + }, + }, + }, + }, + }, + { + Name: controlPlaneTlsVolumeName, + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: serviceCfg.ControlPlaneClientCertSecretName(gateway), + }, + }, + }, + }, }, } diff --git a/internal/controller/templates/envoy_control_plane_config.yaml.tmpl b/internal/controller/templates/envoy_control_plane_config.yaml.tmpl index b1d2581..4a797a8 100644 --- a/internal/controller/templates/envoy_control_plane_config.yaml.tmpl +++ b/internal/controller/templates/envoy_control_plane_config.yaml.tmpl @@ -17,7 +17,7 @@ static_resources: clusters: - name: {{ .ControlPlane.Name }} type: STRICT_DNS - connect_timeout: 1s + connect_timeout: 5s load_assignment: cluster_name: {{ .ControlPlane.Name }} endpoints: diff --git a/internal/controlplane/apigateway_controller.go b/internal/controlplane/apigateway_controller.go index 5021f3c..9f2145b 100644 --- a/internal/controlplane/apigateway_controller.go +++ b/internal/controlplane/apigateway_controller.go @@ -17,7 +17,6 @@ limitations under the License. package controlplane import ( - "bytes" "context" "fmt" "strconv" @@ -87,23 +86,19 @@ func (r *APIGatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) } services.UpsertEndpointSlices(endpointSliceList.Items...) - instance := fmt.Sprintf("%s:%s", gateway.Spec.Envoy.NodeName, gateway.Namespace) + var ( + instance = fmt.Sprintf("%s:%s", gateway.Spec.Envoy.NodeName, gateway.Namespace) + snapshotVersion = strconv.FormatInt(time.Now().UTC().UnixMilli(), 10) + ) - logger.Info("Computing Envoy snapshot for current service targets", "version", gateway.Status.Envoy.ConfigVersion) - snapshot, snapshotHash, err := services.snapshot(ctx, instance, gateway.Status.Envoy.ConfigVersion) + logger.Info("Computing Envoy snapshot for current service targets", "version", snapshotVersion) + snapshot, snapshotHash, err := services.snapshot(ctx, instance, snapshotVersion) if err != nil { return ctrl.Result{}, fmt.Errorf("failed to prepare snapshot: %w", err) } - if !r.initialReconciliation.CompareAndSwap(false, true) && bytes.Equal(gateway.Status.Envoy.ResourceHash, snapshotHash) { - logger.Info("No changes detected, skipping update") - return ctrl.Result{}, nil - } - - logger.Info("Updating service targets") - _, err = controllerutil.CreateOrUpdate(ctx, r.Client, &gateway, func() error { + opResult, err := controllerutil.CreateOrUpdate(ctx, r.Client, &gateway, func() error { gateway.Status.ServiceTargets = services.Targets() - gateway.Status.Envoy.ConfigVersion = strconv.FormatInt(time.Now().UTC().UnixMilli(), 10) gateway.Status.Envoy.ResourceHash = snapshotHash return nil @@ -112,11 +107,27 @@ func (r *APIGatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, err } - logger.Info("Propagating Envoy snapshot", "version", gateway.Status.Envoy.ConfigVersion) + if opResult == controllerutil.OperationResultNone { + logger.Info("No changes detected in APIGateway object") + return ctrl.Result{}, nil + } + + logger.Info("Propagating Envoy snapshot", "version", snapshotVersion) if err := r.Cache.SetSnapshot(ctx, instance, snapshot); err != nil { return ctrl.Result{}, fmt.Errorf("failed to propagate snapshot: %w", err) } + if info := r.Cache.GetStatusInfo(instance); info != nil { + node := info.GetNode() + logger = logger.WithValues( + "node.id", node.Id, + "node.cluster", node.Cluster, + "node.num_watches", info.GetNumWatches(), + ) + } + + logger.Info("Envoy snapshot propagated successfully") + return ctrl.Result{}, nil } diff --git a/internal/controlplane/auth_filters.go b/internal/controlplane/auth_filters.go index 7305080..e3d9b31 100644 --- a/internal/controlplane/auth_filters.go +++ b/internal/controlplane/auth_filters.go @@ -148,6 +148,12 @@ func RBACAllowAllConfig() *rbacv3.RBAC { } func RBACRequireAuthConfig() *rbacv3.RBAC { + /* + Identifier: &rbacv3cfg.Principal_SourcedMetadata{ + SourcedMetadata: &rbacv3cfg.SourcedMetadata{ + MetadataSource: rbacv3cfg.MetadataSource_DYNAMIC, + MetadataMatcher: &matcherv3.MetadataMatcher{ + */ return &rbacv3.RBAC{ Rules: &rbacv3cfg.RBAC{ Action: rbacv3cfg.RBAC_ALLOW, diff --git a/internal/controlplane/snapshot.go b/internal/controlplane/snapshot.go index 93b0da7..1133f68 100644 --- a/internal/controlplane/snapshot.go +++ b/internal/controlplane/snapshot.go @@ -46,7 +46,6 @@ import ( discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/log" supabasev1alpha1 "code.icb4dc0.de/prskr/supabase-operator/api/v1alpha1" "code.icb4dc0.de/prskr/supabase-operator/internal/supabase" @@ -130,8 +129,6 @@ func (s EnvoyServices) Targets() map[string][]string { } func (s *EnvoyServices) snapshot(ctx context.Context, instance, version string) (snapshot *cache.Snapshot, snapshotHash []byte, err error) { - logger := log.FromContext(ctx) - listeners := []*listenerv3.Listener{{ Name: apilistenerName, Address: &corev3.Address{ @@ -160,14 +157,12 @@ func (s *EnvoyServices) snapshot(ctx context.Context, instance, version string) }} if studioListener := s.studioListener(); studioListener != nil { - logger.Info("Adding studio listener") listeners = append(listeners, studioListener) } routes := []types.Resource{s.apiRouteConfiguration(instance)} if studioRouteCfg := s.studioRoute(instance); studioRouteCfg != nil { - logger.Info("Adding studio route") routes = append(routes, studioRouteCfg) } @@ -184,7 +179,6 @@ func (s *EnvoyServices) snapshot(ctx context.Context, instance, version string) if oauth2TokenEndpointCluster, err := s.oauth2TokenEndpointCluster(); err != nil { return nil, nil, err } else { - logger.Info("Adding OAuth2 token endpoint cluster") clusters = append(clusters, oauth2TokenEndpointCluster) } }