/* Copyright 2024 Peter Kurfer. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package controlplane import ( "context" "errors" "fmt" "slices" "strconv" "sync" "time" corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" listenerv3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" router "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3" hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "google.golang.org/protobuf/types/known/anypb" discoveryv1 "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/watch" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" "code.icb4dc0.de/prskr/supabase-operator/internal/meta" "code.icb4dc0.de/prskr/supabase-operator/internal/supabase" ) var ( ErrUnexpectedObject = errors.New("unexpected object") ErrNoEnvoyClusterLabel = errors.New("no Envoy cluster label set") supabaseServices = []string{ supabase.ServiceConfig.Postgrest.Name, supabase.ServiceConfig.Auth.Name, supabase.ServiceConfig.PGMeta.Name, } ) type EndpointsController struct { lock sync.Mutex Client client.WithWatch Cache cache.SnapshotCache envoyClusters map[string]*envoyClusterServices } func (c *EndpointsController) Run(ctx context.Context) error { var ( logger = ctrl.Log.WithName("endpoints-controller") endpointSlices discoveryv1.EndpointSliceList ) selector := labels.NewSelector() partOfRequirement, err := labels.NewRequirement(meta.WellKnownLabel.PartOf, selection.Equals, []string{"supabase"}) if err != nil { return fmt.Errorf("preparing watcher selectors: %w", err) } nameRequirement, err := labels.NewRequirement(meta.WellKnownLabel.Name, selection.In, supabaseServices) if err != nil { return fmt.Errorf("preparing watcher selectors: %w", err) } envoyClusterRequirement, err := labels.NewRequirement(meta.SupabaseLabel.EnvoyCluster, selection.Exists, nil) if err != nil { return fmt.Errorf("preparing watcher selectors: %w", err) } selector.Add(*partOfRequirement, *nameRequirement, *envoyClusterRequirement) watcher, err := c.Client.Watch( ctx, &endpointSlices, client.MatchingLabelsSelector{ Selector: selector.Add(*partOfRequirement, *nameRequirement, *envoyClusterRequirement), }, ) if err != nil { return err } defer watcher.Stop() for { select { case ev, more := <-watcher.ResultChan(): if !more { return nil } eventLogger := logger.WithValues("event_type", ev.Type) switch ev.Type { case watch.Added, watch.Modified: eps, ok := ev.Object.(*discoveryv1.EndpointSlice) if !ok { logger.Error(fmt.Errorf("%w: %T", ErrUnexpectedObject, ev.Object), "expected EndpointSlice but got a different object type") continue } if err := c.handleModificationEvent(log.IntoContext(ctx, eventLogger), eps); err != nil { logger.Error(err, "error occurred during event handling") } } case <-ctx.Done(): return ctx.Err() } } } func (c *EndpointsController) handleModificationEvent(ctx context.Context, epSlice *discoveryv1.EndpointSlice) error { c.lock.Lock() defer c.lock.Unlock() var ( logger = log.FromContext(ctx) instanceKey string svc *envoyClusterServices ) logger.Info("Observed endpoint slice", "name", epSlice.Name) if c.envoyClusters == nil { c.envoyClusters = make(map[string]*envoyClusterServices) } envoyNodeName, ok := epSlice.Labels[meta.SupabaseLabel.EnvoyCluster] if !ok { return fmt.Errorf("%w: at object %s", ErrNoEnvoyClusterLabel, epSlice.Name) } instanceKey = fmt.Sprintf("%s:%s", envoyNodeName, epSlice.Namespace) if svc, ok = c.envoyClusters[instanceKey]; !ok { svc = new(envoyClusterServices) } svc.UpsertEndpoints(epSlice) c.envoyClusters[instanceKey] = svc return c.updateSnapshot(ctx, instanceKey) } func (c *EndpointsController) updateSnapshot(ctx context.Context, instance string) error { latestVersion := strconv.FormatInt(time.Now().UTC().UnixMilli(), 10) snapshot, err := c.envoyClusters[instance].snapshot(instance, latestVersion) if err != nil { return err } return c.Cache.SetSnapshot(ctx, instance, snapshot) } type envoyClusterServices struct { Postgrest *PostgrestCluster GoTrue *GoTrueCluster PGMeta *PGMetaCluster Studio *StudioCluster } func (s *envoyClusterServices) UpsertEndpoints(eps *discoveryv1.EndpointSlice) { switch eps.Labels[meta.WellKnownLabel.Name] { case supabase.ServiceConfig.Postgrest.Name: if s.Postgrest == nil { s.Postgrest = new(PostgrestCluster) } s.Postgrest.AddOrUpdateEndpoints(eps) case supabase.ServiceConfig.Auth.Name: if s.GoTrue == nil { s.GoTrue = new(GoTrueCluster) } s.GoTrue.AddOrUpdateEndpoints(eps) case supabase.ServiceConfig.PGMeta.Name: if s.PGMeta == nil { s.PGMeta = new(PGMetaCluster) } s.PGMeta.AddOrUpdateEndpoints(eps) } } func (s *envoyClusterServices) snapshot(instance, version string) (*cache.Snapshot, error) { const ( apiRouteName = "supabase" studioRouteName = "supabas-studio" vHostName = "supabase" listenerName = "supabase" ) apiConnectionManager := &hcm.HttpConnectionManager{ CodecType: hcm.HttpConnectionManager_AUTO, StatPrefix: "http", RouteSpecifier: &hcm.HttpConnectionManager_Rds{ Rds: &hcm.Rds{ ConfigSource: &corev3.ConfigSource{ ResourceApiVersion: resource.DefaultAPIVersion, ConfigSourceSpecifier: &corev3.ConfigSource_ApiConfigSource{ ApiConfigSource: &corev3.ApiConfigSource{ TransportApiVersion: resource.DefaultAPIVersion, ApiType: corev3.ApiConfigSource_GRPC, SetNodeOnFirstMessageOnly: true, GrpcServices: []*corev3.GrpcService{{ TargetSpecifier: &corev3.GrpcService_EnvoyGrpc_{ EnvoyGrpc: &corev3.GrpcService_EnvoyGrpc{ClusterName: "supabase-control-plane"}, }, }}, }, }, }, RouteConfigName: apiRouteName, }, }, HttpFilters: []*hcm.HttpFilter{ { Name: FilterNameJwtAuthn, ConfigType: &hcm.HttpFilter_TypedConfig{TypedConfig: MustAny(JWTFilterConfig())}, }, { Name: FilterNameCORS, ConfigType: &hcm.HttpFilter_TypedConfig{TypedConfig: MustAny(Cors())}, }, { Name: FilterNameHttpRouter, ConfigType: &hcm.HttpFilter_TypedConfig{TypedConfig: MustAny(new(router.Router))}, }, }, } studioConnetionManager := &hcm.HttpConnectionManager{ CodecType: hcm.HttpConnectionManager_AUTO, StatPrefix: "http", RouteSpecifier: &hcm.HttpConnectionManager_Rds{ Rds: &hcm.Rds{ ConfigSource: &corev3.ConfigSource{ ResourceApiVersion: resource.DefaultAPIVersion, ConfigSourceSpecifier: &corev3.ConfigSource_ApiConfigSource{ ApiConfigSource: &corev3.ApiConfigSource{ TransportApiVersion: resource.DefaultAPIVersion, ApiType: corev3.ApiConfigSource_GRPC, SetNodeOnFirstMessageOnly: true, GrpcServices: []*corev3.GrpcService{{ TargetSpecifier: &corev3.GrpcService_EnvoyGrpc_{ EnvoyGrpc: &corev3.GrpcService_EnvoyGrpc{ClusterName: "supabase-control-plane"}, }, }}, }, }, }, RouteConfigName: studioRouteName, }, }, HttpFilters: []*hcm.HttpFilter{ { Name: FilterNameHttpRouter, ConfigType: &hcm.HttpFilter_TypedConfig{TypedConfig: MustAny(new(router.Router))}, }, }, } apiRouteCfg := &route.RouteConfiguration{ Name: apiRouteName, VirtualHosts: []*route.VirtualHost{{ Name: "supabase", Domains: []string{"*"}, TypedPerFilterConfig: map[string]*anypb.Any{ FilterNameJwtAuthn: MustAny(JWTPerRouteConfig()), FilterNameRBAC: MustAny(RBACPerRoute(RBACRequireAuthConfig())), }, Routes: slices.Concat( s.Postgrest.Routes(instance), s.GoTrue.Routes(instance), s.PGMeta.Routes(instance), ), }}, TypedPerFilterConfig: map[string]*anypb.Any{ FilterNameCORS: MustAny(CorsPolicy()), }, } // TODO add studio route config listeners := []*listenerv3.Listener{{ Name: listenerName, Address: &corev3.Address{ Address: &corev3.Address_SocketAddress{ SocketAddress: &corev3.SocketAddress{ Protocol: corev3.SocketAddress_TCP, Address: "0.0.0.0", PortSpecifier: &corev3.SocketAddress_PortValue{ PortValue: 8000, }, }, }, }, FilterChains: []*listenerv3.FilterChain{ { Filters: []*listenerv3.Filter{ { Name: FilterNameHttpConnectionManager, ConfigType: &listenerv3.Filter_TypedConfig{ TypedConfig: MustAny(apiConnectionManager), }, }, }, }, }, }} if s.Studio != nil { listeners = append(listeners, &listenerv3.Listener{ Name: "studio", Address: &corev3.Address{ Address: &corev3.Address_SocketAddress{ SocketAddress: &corev3.SocketAddress{ Protocol: corev3.SocketAddress_TCP, Address: "0.0.0.0", PortSpecifier: &corev3.SocketAddress_PortValue{ PortValue: 3000, }, }, }, }, FilterChains: []*listenerv3.FilterChain{ { Filters: []*listenerv3.Filter{ { Name: FilterNameHttpConnectionManager, ConfigType: &listenerv3.Filter_TypedConfig{ TypedConfig: MustAny(studioConnetionManager), }, }, }, }, }, }) } rawSnapshot := map[resource.Type][]types.Resource{ resource.ClusterType: castResources( slices.Concat( s.Postgrest.Cluster(instance), s.GoTrue.Cluster(instance), s.PGMeta.Cluster(instance), )...), resource.RouteType: {apiRouteCfg}, resource.ListenerType: castResources(listeners...), } snapshot, err := cache.NewSnapshot( version, rawSnapshot, ) if err != nil { return nil, err } if err := snapshot.Consistent(); err != nil { return nil, err } return snapshot, nil } func castResources[T types.Resource](from ...T) []types.Resource { result := make([]types.Resource, len(from)) for idx := range from { result[idx] = from[idx] } return result }