feat(storage): finish initial basic implementation
- support both s3 & file storage backends - support imgproxy to scale images - manually tested with MinIO & local storage - fixed service discovery issue in APIGatey reconciler not detecting service changes - refactored defaults and env variable code to make it manageable again - add repo link to docs
This commit is contained in:
parent
604525de38
commit
0014927ca9
46 changed files with 16170 additions and 606 deletions
|
@ -35,8 +35,10 @@ import (
|
|||
"sigs.k8s.io/controller-runtime/pkg/builder"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
|
||||
"sigs.k8s.io/controller-runtime/pkg/handler"
|
||||
"sigs.k8s.io/controller-runtime/pkg/log"
|
||||
"sigs.k8s.io/controller-runtime/pkg/predicate"
|
||||
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
||||
|
||||
supabasev1alpha1 "code.icb4dc0.de/prskr/supabase-operator/api/v1alpha1"
|
||||
"code.icb4dc0.de/prskr/supabase-operator/internal/meta"
|
||||
|
@ -129,12 +131,23 @@ func (r *APIGatewayReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Ma
|
|||
return fmt.Errorf("constructor selector for watching secrets: %w", err)
|
||||
}
|
||||
|
||||
apiGatewayTargetSelector, err := predicate.LabelSelectorPredicate(metav1.LabelSelector{
|
||||
MatchExpressions: []metav1.LabelSelectorRequirement{{
|
||||
Key: meta.SupabaseLabel.ApiGatewayTarget,
|
||||
Operator: metav1.LabelSelectorOpExists,
|
||||
}},
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to build selector for watching API target services: %w", err)
|
||||
}
|
||||
|
||||
return ctrl.NewControllerManagedBy(mgr).
|
||||
For(&supabasev1alpha1.APIGateway{}).
|
||||
Named("apigateway").
|
||||
Owns(new(corev1.ConfigMap)).
|
||||
Owns(new(appsv1.Deployment)).
|
||||
Owns(new(corev1.Service)).
|
||||
// watch JWKS secret
|
||||
Watches(
|
||||
new(corev1.Secret),
|
||||
FieldSelectorEventHandler[*supabasev1alpha1.APIGateway, *supabasev1alpha1.APIGatewayList](r.Client,
|
||||
|
@ -145,9 +158,49 @@ func (r *APIGatewayReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Ma
|
|||
reloadSelector,
|
||||
),
|
||||
).
|
||||
Watches(
|
||||
new(corev1.Service),
|
||||
r.apiTargetServiceEventHandler(),
|
||||
builder.WithPredicates(apiGatewayTargetSelector),
|
||||
).
|
||||
Complete(r)
|
||||
}
|
||||
|
||||
func (r *APIGatewayReconciler) apiTargetServiceEventHandler() handler.TypedEventHandler[client.Object, reconcile.Request] {
|
||||
return handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []reconcile.Request {
|
||||
var (
|
||||
list supabasev1alpha1.APIGatewayList
|
||||
logger = log.FromContext(ctx, "object", obj.GetName(), "namespace", obj.GetNamespace())
|
||||
)
|
||||
|
||||
targetName, ok := obj.GetLabels()[meta.SupabaseLabel.ApiGatewayTarget]
|
||||
if !ok {
|
||||
logger.Info("Service is not APIGateway target")
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := r.Client.List(ctx, &list, client.InNamespace(obj.GetNamespace())); err != nil {
|
||||
logger.Error(err, "Failed to list Services to map updates to APIGateway reconciliation requests")
|
||||
return nil
|
||||
}
|
||||
|
||||
if targetName != "" {
|
||||
for gw := range list.Iter() {
|
||||
if gw.Name == targetName {
|
||||
return []reconcile.Request{{NamespacedName: client.ObjectKeyFromObject(gw)}}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
requests := make([]reconcile.Request, 0, len(list.Items))
|
||||
for gw := range list.Iter() {
|
||||
requests = append(requests, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(gw)})
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (r *APIGatewayReconciler) reconcileJwksSecret(
|
||||
ctx context.Context,
|
||||
gateway *supabasev1alpha1.APIGateway,
|
||||
|
|
|
@ -19,7 +19,7 @@ package controller
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"hash/fnv"
|
||||
"maps"
|
||||
"net/url"
|
||||
"time"
|
||||
|
@ -173,7 +173,7 @@ func (r *CoreDbReconciler) ensureDbRolesSecrets(
|
|||
core.Status.Database.Roles = make(map[string][]byte)
|
||||
}
|
||||
|
||||
hash := sha256.New()
|
||||
hash := fnv.New64a()
|
||||
|
||||
for secretName, role := range roles {
|
||||
secretLogger := logger.WithValues("secret_name", secretName, "role_name", role.String())
|
||||
|
|
|
@ -71,7 +71,7 @@ func (r *CoreAuthReconciler) Reconcile(ctx context.Context, req ctrl.Request) (r
|
|||
|
||||
// SetupWithManager sets up the controller with the Manager.
|
||||
func (r *CoreAuthReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
||||
// TODO watch changes in DB credentials secret
|
||||
// TODO watch changes in DB credentials secret & JWT secret
|
||||
return ctrl.NewControllerManagedBy(mgr).
|
||||
For(new(supabasev1alpha1.Core)).
|
||||
Owns(new(appsv1.Deployment)).
|
||||
|
|
|
@ -91,10 +91,7 @@ func (r *CorePostgrestReconiler) reconilePostgrestDeployment(
|
|||
postgrestDeployment = &appsv1.Deployment{
|
||||
ObjectMeta: serviceCfg.ObjectMeta(core),
|
||||
}
|
||||
postgrestSpec = core.Spec.Postgrest
|
||||
)
|
||||
|
||||
var (
|
||||
postgrestSpec = core.Spec.Postgrest
|
||||
anonRole = ValueOrFallback(postgrestSpec.AnonRole, serviceCfg.Defaults.AnonRole)
|
||||
postgrestSchemas = ValueOrFallback(postgrestSpec.Schemas, serviceCfg.Defaults.Schemas)
|
||||
jwtSecretHash string
|
||||
|
@ -178,12 +175,12 @@ func (r *CorePostgrestReconiler) reconilePostgrestDeployment(
|
|||
Env: postgrestSpec.WorkloadTemplate.MergeEnv(postgrestEnv),
|
||||
Ports: []corev1.ContainerPort{
|
||||
{
|
||||
Name: "rest",
|
||||
Name: serviceCfg.Defaults.ServerPortName,
|
||||
ContainerPort: serviceCfg.Defaults.ServerPort,
|
||||
Protocol: corev1.ProtocolTCP,
|
||||
},
|
||||
{
|
||||
Name: "admin",
|
||||
Name: serviceCfg.Defaults.AdminPortName,
|
||||
ContainerPort: serviceCfg.Defaults.AdminPort,
|
||||
Protocol: corev1.ProtocolTCP,
|
||||
},
|
||||
|
@ -234,13 +231,16 @@ func (r *CorePostgrestReconiler) reconcilePostgrestService(
|
|||
ctx context.Context,
|
||||
core *supabasev1alpha1.Core,
|
||||
) error {
|
||||
postgrestService := &corev1.Service{
|
||||
ObjectMeta: supabase.ServiceConfig.Postgrest.ObjectMeta(core),
|
||||
}
|
||||
var (
|
||||
serviceCfg = supabase.ServiceConfig.Postgrest
|
||||
postgrestService = &corev1.Service{
|
||||
ObjectMeta: supabase.ServiceConfig.Postgrest.ObjectMeta(core),
|
||||
}
|
||||
)
|
||||
|
||||
_, err := controllerutil.CreateOrUpdate(ctx, r.Client, postgrestService, func() error {
|
||||
postgrestService.Labels = core.Spec.Postgrest.WorkloadTemplate.MergeLabels(
|
||||
objectLabels(core, supabase.ServiceConfig.Postgrest.Name, "core", supabase.Images.Postgrest.Tag),
|
||||
objectLabels(core, serviceCfg.Name, "core", supabase.Images.Postgrest.Tag),
|
||||
core.Labels,
|
||||
)
|
||||
|
||||
|
@ -249,14 +249,14 @@ func (r *CorePostgrestReconiler) reconcilePostgrestService(
|
|||
}
|
||||
|
||||
postgrestService.Spec = corev1.ServiceSpec{
|
||||
Selector: selectorLabels(core, supabase.ServiceConfig.Postgrest.Name),
|
||||
Selector: selectorLabels(core, serviceCfg.Name),
|
||||
Ports: []corev1.ServicePort{
|
||||
{
|
||||
Name: "rest",
|
||||
Name: serviceCfg.Defaults.ServerPortName,
|
||||
Protocol: corev1.ProtocolTCP,
|
||||
AppProtocol: ptrOf("http"),
|
||||
Port: 3000,
|
||||
TargetPort: intstr.IntOrString{IntVal: 3000},
|
||||
Port: serviceCfg.Defaults.ServerPort,
|
||||
TargetPort: intstr.IntOrString{IntVal: serviceCfg.Defaults.ServerPort},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
309
internal/controller/storage_api_controller.go
Normal file
309
internal/controller/storage_api_controller.go
Normal file
|
@ -0,0 +1,309 @@
|
|||
/*
|
||||
Copyright 2025 Peter Kurfer.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"slices"
|
||||
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
|
||||
"sigs.k8s.io/controller-runtime/pkg/log"
|
||||
|
||||
supabasev1alpha1 "code.icb4dc0.de/prskr/supabase-operator/api/v1alpha1"
|
||||
"code.icb4dc0.de/prskr/supabase-operator/internal/meta"
|
||||
"code.icb4dc0.de/prskr/supabase-operator/internal/supabase"
|
||||
)
|
||||
|
||||
// StorageApiReconciler reconciles a Storage object
|
||||
type StorageApiReconciler struct {
|
||||
client.Client
|
||||
Scheme *runtime.Scheme
|
||||
}
|
||||
|
||||
// Reconcile is part of the main kubernetes reconciliation loop which aims to
|
||||
// move the current state of the cluster closer to the desired state.
|
||||
//
|
||||
// For more details, check Reconcile and its Result here:
|
||||
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.4/pkg/reconcile
|
||||
func (r *StorageApiReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
|
||||
var (
|
||||
storage supabasev1alpha1.Storage
|
||||
logger = log.FromContext(ctx)
|
||||
)
|
||||
|
||||
if err := r.Get(ctx, req.NamespacedName, &storage); err != nil {
|
||||
if client.IgnoreNotFound(err) != nil {
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
logger.Info("Reconciling Storage API")
|
||||
|
||||
if err := r.reconcileStorageApiDeployment(ctx, &storage); err != nil {
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
if err := r.reconcileStorageApiService(ctx, &storage); err != nil {
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
// SetupWithManager sets up the controller with the Manager.
|
||||
func (r *StorageApiReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
||||
return ctrl.NewControllerManagedBy(mgr).
|
||||
For(&supabasev1alpha1.Storage{}).
|
||||
Named("storage-api").
|
||||
Owns(new(corev1.Secret)).
|
||||
Owns(new(appsv1.Deployment)).
|
||||
Owns(new(corev1.Service)).
|
||||
Complete(r)
|
||||
}
|
||||
|
||||
func (r *StorageApiReconciler) reconcileStorageApiDeployment(
|
||||
ctx context.Context,
|
||||
storage *supabasev1alpha1.Storage,
|
||||
) error {
|
||||
var (
|
||||
serviceCfg = supabase.ServiceConfig.Storage
|
||||
apiSpec = storage.Spec.Api
|
||||
storageApiDeployment = &appsv1.Deployment{
|
||||
ObjectMeta: serviceCfg.ObjectMeta(storage),
|
||||
}
|
||||
|
||||
jwtSecret = &corev1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: apiSpec.JwtAuth.SecretName,
|
||||
Namespace: storage.Namespace,
|
||||
},
|
||||
}
|
||||
|
||||
s3ProtocolSecret = &corev1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: apiSpec.S3Protocol.CredentialsSecretRef.SecretName,
|
||||
Namespace: storage.Namespace,
|
||||
},
|
||||
}
|
||||
|
||||
jwtStateHash, s3ProtoCredentialsStateHash string
|
||||
)
|
||||
|
||||
if err := r.Get(ctx, client.ObjectKeyFromObject(jwtSecret), jwtSecret); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
jwtStateHash = base64.StdEncoding.EncodeToString(HashBytes(
|
||||
jwtSecret.Data[apiSpec.JwtAuth.SecretKey],
|
||||
jwtSecret.Data[apiSpec.JwtAuth.JwksKey],
|
||||
))
|
||||
|
||||
if err := r.Get(ctx, client.ObjectKeyFromObject(s3ProtocolSecret), s3ProtocolSecret); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s3ProtoCredentialsStateHash = base64.StdEncoding.EncodeToString(HashBytes(
|
||||
s3ProtocolSecret.Data[apiSpec.S3Protocol.CredentialsSecretRef.AccessKeyIdKey],
|
||||
s3ProtocolSecret.Data[apiSpec.S3Protocol.CredentialsSecretRef.AccessSecretKeyKey],
|
||||
))
|
||||
|
||||
_, err := controllerutil.CreateOrUpdate(ctx, r.Client, storageApiDeployment, func() error {
|
||||
storageApiDeployment.Labels = apiSpec.WorkloadTemplate.MergeLabels(
|
||||
objectLabels(storage, serviceCfg.Name, "storage", supabase.Images.Storage.Tag),
|
||||
storage.Labels,
|
||||
)
|
||||
|
||||
storagApiEnv := []corev1.EnvVar{
|
||||
{
|
||||
Name: "DB_USERNAME",
|
||||
ValueFrom: &corev1.EnvVarSource{
|
||||
SecretKeyRef: &corev1.SecretKeySelector{
|
||||
LocalObjectReference: corev1.LocalObjectReference{
|
||||
Name: apiSpec.DBSpec.DBCredentialsRef.SecretName,
|
||||
},
|
||||
Key: apiSpec.DBSpec.DBCredentialsRef.UsernameKey,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "DB_PASSWORD",
|
||||
ValueFrom: &corev1.EnvVarSource{
|
||||
SecretKeyRef: &corev1.SecretKeySelector{
|
||||
LocalObjectReference: corev1.LocalObjectReference{
|
||||
Name: apiSpec.DBSpec.DBCredentialsRef.SecretName,
|
||||
},
|
||||
Key: apiSpec.DBSpec.DBCredentialsRef.PasswordKey,
|
||||
},
|
||||
},
|
||||
},
|
||||
serviceCfg.EnvKeys.DatabaseDSN.Var(fmt.Sprintf("postgres://$(DB_USERNAME):$(DB_PASSWORD)@%s:%d/%s", apiSpec.DBSpec.Host, apiSpec.DBSpec.Port, apiSpec.DBSpec.DBName)),
|
||||
serviceCfg.EnvKeys.ServiceKey.Var(apiSpec.JwtAuth.ServiceKeySelector()),
|
||||
serviceCfg.EnvKeys.JwtSecret.Var(apiSpec.JwtAuth.SecretKeySelector()),
|
||||
serviceCfg.EnvKeys.JwtJwks.Var(apiSpec.JwtAuth.JwksKeySelector()),
|
||||
serviceCfg.EnvKeys.S3ProtocolPrefix.Var(),
|
||||
serviceCfg.EnvKeys.S3ProtocolAllowForwardedHeader.Var(apiSpec.S3Protocol.AllowForwardedHeader),
|
||||
serviceCfg.EnvKeys.S3ProtocolAccessKeyId.Var(apiSpec.S3Protocol.CredentialsSecretRef.AccessKeyIdSelector()),
|
||||
serviceCfg.EnvKeys.S3ProtocolAccessKeySecret.Var(apiSpec.S3Protocol.CredentialsSecretRef.AccessSecretKeySelector()),
|
||||
serviceCfg.EnvKeys.TusUrlPath.Var(),
|
||||
serviceCfg.EnvKeys.FileSizeLimit.Var(apiSpec.FileSizeLimit),
|
||||
serviceCfg.EnvKeys.UploadFileSizeLimit.Var(apiSpec.FileSizeLimit),
|
||||
serviceCfg.EnvKeys.UploadFileSizeLimitStandard.Var(apiSpec.FileSizeLimit),
|
||||
serviceCfg.EnvKeys.AnonKey.Var(apiSpec.JwtAuth.AnonKeySelector()),
|
||||
// TODO: https://github.com/supabase/storage-api/issues/55
|
||||
serviceCfg.EnvKeys.FileStorageRegion.Var(),
|
||||
}
|
||||
|
||||
if storage.Spec.ImageProxy != nil && storage.Spec.ImageProxy.Enable {
|
||||
storagApiEnv = append(storagApiEnv, serviceCfg.EnvKeys.ImgProxyURL.Var(fmt.Sprintf("http://%s.%s.svc:%d", supabase.ServiceConfig.ImgProxy.ObjectName(storage), storage.Namespace, supabase.ServiceConfig.ImgProxy.Defaults.ApiPort)))
|
||||
}
|
||||
|
||||
if storageApiDeployment.CreationTimestamp.IsZero() {
|
||||
storageApiDeployment.Spec.Selector = &metav1.LabelSelector{
|
||||
MatchLabels: selectorLabels(storage, serviceCfg.Name),
|
||||
}
|
||||
}
|
||||
|
||||
storageApiDeployment.Spec.Replicas = apiSpec.WorkloadTemplate.ReplicaCount()
|
||||
|
||||
storageApiDeployment.Spec.Template = corev1.PodTemplateSpec{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Annotations: map[string]string{
|
||||
fmt.Sprintf("%s/%s", supabasev1alpha1.GroupVersion.Group, "jwt-hash"): jwtStateHash,
|
||||
fmt.Sprintf("%s/%s", supabasev1alpha1.GroupVersion.Group, "s3-credentials-hash"): s3ProtoCredentialsStateHash,
|
||||
},
|
||||
Labels: objectLabels(storage, serviceCfg.Name, "storage", supabase.Images.Storage.Tag),
|
||||
},
|
||||
Spec: corev1.PodSpec{
|
||||
ImagePullSecrets: apiSpec.WorkloadTemplate.PullSecrets(),
|
||||
Containers: []corev1.Container{{
|
||||
Name: "supabase-storage",
|
||||
Image: apiSpec.WorkloadTemplate.Image(supabase.Images.Storage.String()),
|
||||
ImagePullPolicy: apiSpec.WorkloadTemplate.ImagePullPolicy(),
|
||||
Env: apiSpec.WorkloadTemplate.MergeEnv(append(storagApiEnv, slices.Concat(apiSpec.FileBackend.Env(), apiSpec.S3Backend.Env())...)),
|
||||
Ports: []corev1.ContainerPort{{
|
||||
Name: serviceCfg.Defaults.ApiPortName,
|
||||
ContainerPort: serviceCfg.Defaults.ApiPort,
|
||||
Protocol: corev1.ProtocolTCP,
|
||||
}},
|
||||
SecurityContext: apiSpec.WorkloadTemplate.ContainerSecurityContext(serviceCfg.Defaults.UID, serviceCfg.Defaults.GID),
|
||||
Resources: apiSpec.WorkloadTemplate.Resources(),
|
||||
VolumeMounts: apiSpec.WorkloadTemplate.AdditionalVolumeMounts(
|
||||
corev1.VolumeMount{
|
||||
Name: "tmp",
|
||||
MountPath: "/tmp",
|
||||
},
|
||||
),
|
||||
ReadinessProbe: &corev1.Probe{
|
||||
InitialDelaySeconds: 5,
|
||||
PeriodSeconds: 3,
|
||||
TimeoutSeconds: 1,
|
||||
SuccessThreshold: 2,
|
||||
ProbeHandler: corev1.ProbeHandler{
|
||||
HTTPGet: &corev1.HTTPGetAction{
|
||||
Path: "/status",
|
||||
Port: intstr.IntOrString{IntVal: serviceCfg.Defaults.ApiPort},
|
||||
},
|
||||
},
|
||||
},
|
||||
LivenessProbe: &corev1.Probe{
|
||||
InitialDelaySeconds: 10,
|
||||
PeriodSeconds: 5,
|
||||
TimeoutSeconds: 3,
|
||||
ProbeHandler: corev1.ProbeHandler{
|
||||
HTTPGet: &corev1.HTTPGetAction{
|
||||
Path: "/status",
|
||||
Port: intstr.IntOrString{IntVal: serviceCfg.Defaults.ApiPort},
|
||||
},
|
||||
},
|
||||
},
|
||||
}},
|
||||
SecurityContext: apiSpec.WorkloadTemplate.PodSecurityContext(),
|
||||
Volumes: apiSpec.WorkloadTemplate.Volumes(
|
||||
corev1.Volume{
|
||||
Name: "tmp",
|
||||
VolumeSource: corev1.VolumeSource{
|
||||
EmptyDir: apiSpec.UploadTemp.VolumeSource(),
|
||||
},
|
||||
},
|
||||
),
|
||||
},
|
||||
}
|
||||
|
||||
if err := controllerutil.SetControllerReference(storage, storageApiDeployment, r.Scheme); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *StorageApiReconciler) reconcileStorageApiService(
|
||||
ctx context.Context,
|
||||
storage *supabasev1alpha1.Storage,
|
||||
) error {
|
||||
var (
|
||||
serviceCfg = supabase.ServiceConfig.Storage
|
||||
storageApiService = &corev1.Service{
|
||||
ObjectMeta: supabase.ServiceConfig.Storage.ObjectMeta(storage),
|
||||
}
|
||||
)
|
||||
|
||||
_, err := controllerutil.CreateOrUpdate(ctx, r.Client, storageApiService, func() error {
|
||||
storageApiService.Labels = storage.Spec.Api.WorkloadTemplate.MergeLabels(
|
||||
objectLabels(storage, serviceCfg.Name, "storage", supabase.Images.Storage.Tag),
|
||||
storage.Labels,
|
||||
)
|
||||
|
||||
if _, ok := storageApiService.Labels[meta.SupabaseLabel.ApiGatewayTarget]; !ok {
|
||||
storageApiService.Labels[meta.SupabaseLabel.ApiGatewayTarget] = ""
|
||||
}
|
||||
|
||||
storageApiService.Spec = corev1.ServiceSpec{
|
||||
Selector: selectorLabels(storage, serviceCfg.Name),
|
||||
Ports: []corev1.ServicePort{
|
||||
{
|
||||
Name: serviceCfg.Defaults.ApiPortName,
|
||||
Protocol: corev1.ProtocolTCP,
|
||||
AppProtocol: ptrOf("http"),
|
||||
Port: serviceCfg.Defaults.ApiPort,
|
||||
TargetPort: intstr.IntOrString{IntVal: serviceCfg.Defaults.ApiPort},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
if err := controllerutil.SetControllerReference(storage, storageApiService, r.Scheme); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
|
@ -1,55 +0,0 @@
|
|||
/*
|
||||
Copyright 2025 Peter Kurfer.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/log"
|
||||
|
||||
supabasev1alpha1 "code.icb4dc0.de/prskr/supabase-operator/api/v1alpha1"
|
||||
)
|
||||
|
||||
// StorageReconciler reconciles a Storage object
|
||||
type StorageReconciler struct {
|
||||
client.Client
|
||||
Scheme *runtime.Scheme
|
||||
}
|
||||
|
||||
// Reconcile is part of the main kubernetes reconciliation loop which aims to
|
||||
// move the current state of the cluster closer to the desired state.
|
||||
//
|
||||
// For more details, check Reconcile and its Result here:
|
||||
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.4/pkg/reconcile
|
||||
func (r *StorageReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
|
||||
_ = log.FromContext(ctx)
|
||||
|
||||
// TODO(user): your logic here
|
||||
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
// SetupWithManager sets up the controller with the Manager.
|
||||
func (r *StorageReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
||||
return ctrl.NewControllerManagedBy(mgr).
|
||||
For(&supabasev1alpha1.Storage{}).
|
||||
Named("storage").
|
||||
Complete(r)
|
||||
}
|
218
internal/controller/storage_imgproxy_controller.go
Normal file
218
internal/controller/storage_imgproxy_controller.go
Normal file
|
@ -0,0 +1,218 @@
|
|||
/*
|
||||
Copyright 2025 Peter Kurfer.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
|
||||
"sigs.k8s.io/controller-runtime/pkg/log"
|
||||
|
||||
supabasev1alpha1 "code.icb4dc0.de/prskr/supabase-operator/api/v1alpha1"
|
||||
"code.icb4dc0.de/prskr/supabase-operator/internal/supabase"
|
||||
)
|
||||
|
||||
type StorageImgProxyReconciler struct {
|
||||
client.Client
|
||||
Scheme *runtime.Scheme
|
||||
}
|
||||
|
||||
// Reconcile is part of the main kubernetes reconciliation loop which aims to
|
||||
// move the current state of the cluster closer to the desired state.
|
||||
//
|
||||
// For more details, check Reconcile and its Result here:
|
||||
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.4/pkg/reconcile
|
||||
func (r *StorageImgProxyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
|
||||
var (
|
||||
storage supabasev1alpha1.Storage
|
||||
logger = log.FromContext(ctx)
|
||||
)
|
||||
|
||||
if err := r.Get(ctx, req.NamespacedName, &storage); err != nil {
|
||||
if client.IgnoreNotFound(err) != nil {
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
logger.Info("Reconciling Storage API")
|
||||
|
||||
if storage.Spec.ImageProxy == nil || !storage.Spec.ImageProxy.Enable {
|
||||
logger.Info("ImgProxy is not enabled - skipping")
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
if err := r.reconcileImgProxyDeployment(ctx, &storage); err != nil {
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
if err := r.reconcileImgProxyService(ctx, &storage); err != nil {
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
// SetupWithManager sets up the controller with the Manager.
|
||||
func (r *StorageImgProxyReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
||||
return ctrl.NewControllerManagedBy(mgr).
|
||||
For(&supabasev1alpha1.Storage{}).
|
||||
Named("storage-imgproxy").
|
||||
Owns(new(appsv1.Deployment)).
|
||||
Owns(new(corev1.Service)).
|
||||
Complete(r)
|
||||
}
|
||||
|
||||
func (r *StorageImgProxyReconciler) reconcileImgProxyDeployment(
|
||||
ctx context.Context,
|
||||
storage *supabasev1alpha1.Storage,
|
||||
) error {
|
||||
var (
|
||||
serviceCfg = supabase.ServiceConfig.ImgProxy
|
||||
imgProxySpec = storage.Spec.ImageProxy
|
||||
imgProxyDeployment = &appsv1.Deployment{
|
||||
ObjectMeta: serviceCfg.ObjectMeta(storage),
|
||||
}
|
||||
)
|
||||
|
||||
_, err := controllerutil.CreateOrUpdate(ctx, r.Client, imgProxyDeployment, func() error {
|
||||
imgProxyDeployment.Labels = imgProxySpec.WorkloadTemplate.MergeLabels(
|
||||
objectLabels(storage, serviceCfg.Name, "storage", supabase.Images.ImgProxy.Tag),
|
||||
storage.Labels,
|
||||
)
|
||||
|
||||
imgProxyEnv := []corev1.EnvVar{
|
||||
serviceCfg.EnvKeys.Bind.Var(),
|
||||
serviceCfg.EnvKeys.UseETag.Var(),
|
||||
serviceCfg.EnvKeys.EnableWebPDetection.Var(imgProxySpec.EnabledWebPDetection),
|
||||
}
|
||||
|
||||
if storage.Spec.Api.FileBackend != nil {
|
||||
imgProxyEnv = append(imgProxyEnv, serviceCfg.EnvKeys.LocalFileSystemRoot.Var(storage.Spec.Api.FileBackend.Path))
|
||||
}
|
||||
|
||||
if imgProxyDeployment.CreationTimestamp.IsZero() {
|
||||
imgProxyDeployment.Spec.Selector = &metav1.LabelSelector{
|
||||
MatchLabels: selectorLabels(storage, serviceCfg.Name),
|
||||
}
|
||||
}
|
||||
|
||||
imgProxyDeployment.Spec.Replicas = imgProxySpec.WorkloadTemplate.ReplicaCount()
|
||||
|
||||
imgProxyDeployment.Spec.Template = corev1.PodTemplateSpec{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Labels: objectLabels(storage, serviceCfg.Name, "storage", supabase.Images.ImgProxy.Tag),
|
||||
},
|
||||
Spec: corev1.PodSpec{
|
||||
ImagePullSecrets: imgProxySpec.WorkloadTemplate.PullSecrets(),
|
||||
Containers: []corev1.Container{{
|
||||
Name: "supabase-imgproxy",
|
||||
Image: imgProxySpec.WorkloadTemplate.Image(supabase.Images.ImgProxy.String()),
|
||||
ImagePullPolicy: imgProxySpec.WorkloadTemplate.ImagePullPolicy(),
|
||||
Env: imgProxySpec.WorkloadTemplate.MergeEnv(imgProxyEnv),
|
||||
Ports: []corev1.ContainerPort{{
|
||||
Name: serviceCfg.Defaults.ApiPortName,
|
||||
ContainerPort: serviceCfg.Defaults.ApiPort,
|
||||
Protocol: corev1.ProtocolTCP,
|
||||
}},
|
||||
SecurityContext: imgProxySpec.WorkloadTemplate.ContainerSecurityContext(serviceCfg.Defaults.UID, serviceCfg.Defaults.GID),
|
||||
Resources: imgProxySpec.WorkloadTemplate.Resources(),
|
||||
VolumeMounts: imgProxySpec.WorkloadTemplate.AdditionalVolumeMounts(),
|
||||
ReadinessProbe: &corev1.Probe{
|
||||
InitialDelaySeconds: 5,
|
||||
PeriodSeconds: 3,
|
||||
TimeoutSeconds: 1,
|
||||
SuccessThreshold: 2,
|
||||
ProbeHandler: corev1.ProbeHandler{
|
||||
Exec: &corev1.ExecAction{
|
||||
Command: []string{"imgproxy", "health"},
|
||||
},
|
||||
},
|
||||
},
|
||||
LivenessProbe: &corev1.Probe{
|
||||
InitialDelaySeconds: 10,
|
||||
PeriodSeconds: 5,
|
||||
TimeoutSeconds: 3,
|
||||
ProbeHandler: corev1.ProbeHandler{
|
||||
Exec: &corev1.ExecAction{
|
||||
Command: []string{"imgproxy", "health"},
|
||||
},
|
||||
},
|
||||
},
|
||||
}},
|
||||
SecurityContext: imgProxySpec.WorkloadTemplate.PodSecurityContext(),
|
||||
Volumes: imgProxySpec.WorkloadTemplate.Volumes(),
|
||||
},
|
||||
}
|
||||
|
||||
if err := controllerutil.SetControllerReference(storage, imgProxyDeployment, r.Scheme); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *StorageImgProxyReconciler) reconcileImgProxyService(
|
||||
ctx context.Context,
|
||||
storage *supabasev1alpha1.Storage,
|
||||
) error {
|
||||
var (
|
||||
serviceCfg = supabase.ServiceConfig.ImgProxy
|
||||
imgProxyService = &corev1.Service{
|
||||
ObjectMeta: supabase.ServiceConfig.Storage.ObjectMeta(storage),
|
||||
}
|
||||
)
|
||||
|
||||
_, err := controllerutil.CreateOrPatch(ctx, r.Client, imgProxyService, func() error {
|
||||
imgProxyService.Labels = storage.Spec.Api.WorkloadTemplate.MergeLabels(
|
||||
objectLabels(storage, serviceCfg.Name, "storage", supabase.Images.ImgProxy.Tag),
|
||||
storage.Labels,
|
||||
)
|
||||
|
||||
imgProxyService.Spec = corev1.ServiceSpec{
|
||||
Selector: selectorLabels(storage, serviceCfg.Name),
|
||||
Ports: []corev1.ServicePort{
|
||||
{
|
||||
Name: serviceCfg.Defaults.ApiPortName,
|
||||
Protocol: corev1.ProtocolTCP,
|
||||
AppProtocol: ptrOf("http"),
|
||||
Port: serviceCfg.Defaults.ApiPort,
|
||||
TargetPort: intstr.IntOrString{IntVal: serviceCfg.Defaults.ApiPort},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
if err := controllerutil.SetControllerReference(storage, imgProxyService, r.Scheme); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
147
internal/controller/storage_s3_creds_controller.go
Normal file
147
internal/controller/storage_s3_creds_controller.go
Normal file
|
@ -0,0 +1,147 @@
|
|||
/*
|
||||
Copyright 2025 Peter Kurfer.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"maps"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
|
||||
"sigs.k8s.io/controller-runtime/pkg/log"
|
||||
|
||||
supabasev1alpha1 "code.icb4dc0.de/prskr/supabase-operator/api/v1alpha1"
|
||||
"code.icb4dc0.de/prskr/supabase-operator/internal/meta"
|
||||
"code.icb4dc0.de/prskr/supabase-operator/internal/pw"
|
||||
)
|
||||
|
||||
type StorageS3CredentialsReconciler struct {
|
||||
client.Client
|
||||
Scheme *runtime.Scheme
|
||||
}
|
||||
|
||||
func (r *StorageS3CredentialsReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) {
|
||||
var (
|
||||
storage supabasev1alpha1.Storage
|
||||
logger = log.FromContext(ctx)
|
||||
)
|
||||
|
||||
if err := r.Get(ctx, req.NamespacedName, &storage); err != nil {
|
||||
if client.IgnoreNotFound(err) == nil {
|
||||
logger.Info("Storage instance does not exist")
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
if err := r.reconcileS3ProtoSecret(ctx, &storage); err != nil {
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
if storage.Spec.Api.S3Backend != nil {
|
||||
if err := r.reconcileS3StorageSecret(ctx, &storage); err != nil {
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
}
|
||||
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
// SetupWithManager sets up the controller with the Manager.
|
||||
func (r *StorageS3CredentialsReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
||||
return ctrl.NewControllerManagedBy(mgr).
|
||||
For(new(supabasev1alpha1.Storage)).
|
||||
Owns(new(corev1.Secret)).
|
||||
Named("storage-s3-creds").
|
||||
Complete(r)
|
||||
}
|
||||
|
||||
func (r *StorageS3CredentialsReconciler) reconcileS3StorageSecret(
|
||||
ctx context.Context,
|
||||
storage *supabasev1alpha1.Storage,
|
||||
) error {
|
||||
if storage.Spec.Api.S3Backend.CredentialsSecretRef == nil {
|
||||
return errors.New("S3 storage credentials secret is empty")
|
||||
}
|
||||
|
||||
s3CredsSecret := &corev1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: storage.Spec.Api.S3Backend.CredentialsSecretRef.SecretName,
|
||||
Namespace: storage.Namespace,
|
||||
},
|
||||
}
|
||||
|
||||
if err := r.Get(ctx, client.ObjectKeyFromObject(s3CredsSecret), s3CredsSecret); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := controllerutil.SetControllerReference(storage, s3CredsSecret, r.Scheme); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return r.Update(ctx, s3CredsSecret)
|
||||
}
|
||||
|
||||
func (r *StorageS3CredentialsReconciler) reconcileS3ProtoSecret(
|
||||
ctx context.Context,
|
||||
storage *supabasev1alpha1.Storage,
|
||||
) error {
|
||||
const (
|
||||
acccessKeyIdAndSecret = 2
|
||||
)
|
||||
s3ProtoSecret := corev1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: storage.Spec.Api.S3Protocol.CredentialsSecretRef.SecretName,
|
||||
Namespace: storage.Namespace,
|
||||
},
|
||||
}
|
||||
|
||||
_, err := controllerutil.CreateOrUpdate(ctx, r.Client, &s3ProtoSecret, func() error {
|
||||
s3ProtoSecret.Labels = maps.Clone(storage.Labels)
|
||||
if s3ProtoSecret.Labels == nil {
|
||||
s3ProtoSecret.Labels = make(map[string]string)
|
||||
}
|
||||
|
||||
s3ProtoSecret.Labels[meta.SupabaseLabel.Reload] = ""
|
||||
|
||||
if err := controllerutil.SetControllerReference(storage, &s3ProtoSecret, r.Scheme); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if s3ProtoSecret.Data == nil {
|
||||
s3ProtoSecret.Data = make(map[string][]byte, acccessKeyIdAndSecret)
|
||||
}
|
||||
|
||||
if _, ok := s3ProtoSecret.Data[storage.Spec.Api.S3Protocol.CredentialsSecretRef.AccessKeyIdKey]; !ok {
|
||||
s3ProtoSecret.Data[storage.Spec.Api.S3Protocol.CredentialsSecretRef.AccessKeyIdKey] = pw.GeneratePW(32, nil)
|
||||
}
|
||||
|
||||
if _, ok := s3ProtoSecret.Data[storage.Spec.Api.S3Protocol.CredentialsSecretRef.AccessSecretKeyKey]; !ok {
|
||||
s3ProtoSecret.Data[storage.Spec.Api.S3Protocol.CredentialsSecretRef.AccessSecretKeyKey] = pw.GeneratePW(64, nil)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
|
@ -18,7 +18,7 @@ package controller
|
|||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"hash/fnv"
|
||||
"maps"
|
||||
"reflect"
|
||||
|
||||
|
@ -93,7 +93,7 @@ func ValueOrFallback[T any](value, fallback T) T {
|
|||
}
|
||||
|
||||
func HashStrings(vals ...string) []byte {
|
||||
h := sha256.New()
|
||||
h := fnv.New64a()
|
||||
|
||||
for _, v := range vals {
|
||||
h.Write([]byte(v))
|
||||
|
@ -103,7 +103,7 @@ func HashStrings(vals ...string) []byte {
|
|||
}
|
||||
|
||||
func HashBytes(vals ...[]byte) []byte {
|
||||
h := sha256.New()
|
||||
h := fnv.New64a()
|
||||
|
||||
for _, v := range vals {
|
||||
h.Write(v)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue