From 7d9e518f8685fb07ce8097f103464fbe02dc7738 Mon Sep 17 00:00:00 2001 From: Peter Kurfer Date: Sun, 5 Jan 2025 11:42:15 +0100 Subject: [PATCH] refactor(db): extract Supabase migrations from release artifact --- .golangci.yml | 7 ++ .husky.toml | 4 +- api/v1alpha1/core_types.go | 17 +--- api/v1alpha1/zz_generated.deepcopy.go | 30 +----- cmd/control_plane.go | 4 +- cmd/main.go | 1 + .../bases/supabase.k8s.icb4dc0.de_cores.yaml | 26 +---- internal/controller/core_db_controller.go | 3 +- .../controller/dashboard_controller_test.go | 2 +- internal/controller/utils.go | 13 --- internal/db/migrator.go | 2 +- internal/errx/closing.go | 18 ++++ .../v1alpha1/apigateway_webhook_test.go | 1 - .../webhook/v1alpha1/core_webhook_test.go | 1 - .../v1alpha1/core_webhook_validator.go | 1 + magefiles/generate.go | 99 ++++++++++++------- magefiles/github.go | 9 +- magefiles/migrate.go | 54 ---------- test/e2e/e2e_test.go | 2 +- test/utils/utils.go | 4 +- 20 files changed, 113 insertions(+), 185 deletions(-) create mode 100644 internal/errx/closing.go delete mode 100644 magefiles/migrate.go diff --git a/.golangci.yml b/.golangci.yml index 1219d30..64275d2 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -64,3 +64,10 @@ linters-settings: alias: $1$2 - pkg: "k8s.io/apimachinery/pkg/apis/meta/v1" alias: metav1 + +severity: + default-severity: error + rules: + - linters: + - godox + severity: info diff --git a/.husky.toml b/.husky.toml index 6b70cba..5ea6988 100644 --- a/.husky.toml +++ b/.husky.toml @@ -3,9 +3,7 @@ # git hook pre commit pre-commit = [ "go mod tidy -go=1.23", - "go run mage.go FetchImageMeta", - "go run mage.go CRDs", - "go run mage.go CRDDocs", + "go run mage.go GenerateAll", "husky lint-staged", # "golangci-lint run", ] diff --git a/api/v1alpha1/core_types.go b/api/v1alpha1/core_types.go index 4061a68..358abc2 100644 --- a/api/v1alpha1/core_types.go +++ b/api/v1alpha1/core_types.go @@ -25,7 +25,6 @@ import ( "slices" "strconv" "strings" - "time" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -381,7 +380,7 @@ type CoreSpec struct { Auth *AuthSpec `json:"auth,omitempty"` } -type MigrationStatus map[string]int64 +type MigrationStatus map[string]metav1.Time func (s MigrationStatus) IsApplied(name string) bool { _, ok := s[name] @@ -389,7 +388,7 @@ func (s MigrationStatus) IsApplied(name string) bool { } func (s MigrationStatus) Record(name string) { - s[name] = time.Now().UTC().UnixMilli() + s[name] = metav1.Now() } type DatabaseStatus struct { @@ -399,19 +398,9 @@ type DatabaseStatus struct { type CoreConditionType string -type CoreCondition struct { - Type CoreConditionType `json:"type"` - Status corev1.ConditionStatus `json:"status"` - LastProbeTime metav1.Time `json:"lastProbeTime,omitempty"` - LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"` - Reason string `json:"reason,omitempty"` - Message string `json:"message,omitempty"` -} - // CoreStatus defines the observed state of Core. type CoreStatus struct { - Database DatabaseStatus `json:"database,omitempty"` - Conditions []CoreCondition `json:"conditions,omitempty"` + Database DatabaseStatus `json:"database,omitempty"` } // +kubebuilder:object:root=true diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 7192356..785aefa 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -21,7 +21,7 @@ limitations under the License. package v1alpha1 import ( - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" runtime "k8s.io/apimachinery/pkg/runtime" ) @@ -319,23 +319,6 @@ func (in *Core) DeepCopyObject() runtime.Object { return nil } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *CoreCondition) DeepCopyInto(out *CoreCondition) { - *out = *in - in.LastProbeTime.DeepCopyInto(&out.LastProbeTime) - in.LastTransitionTime.DeepCopyInto(&out.LastTransitionTime) -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CoreCondition. -func (in *CoreCondition) DeepCopy() *CoreCondition { - if in == nil { - return nil - } - out := new(CoreCondition) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *CoreList) DeepCopyInto(out *CoreList) { *out = *in @@ -399,13 +382,6 @@ func (in *CoreSpec) DeepCopy() *CoreSpec { func (in *CoreStatus) DeepCopyInto(out *CoreStatus) { *out = *in in.Database.DeepCopyInto(&out.Database) - if in.Conditions != nil { - in, out := &in.Conditions, &out.Conditions - *out = make([]CoreCondition, len(*in)) - for i := range *in { - (*in)[i].DeepCopyInto(&(*out)[i]) - } - } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CoreStatus. @@ -631,7 +607,7 @@ func (in *DatabaseStatus) DeepCopyInto(out *DatabaseStatus) { in, out := &in.AppliedMigrations, &out.AppliedMigrations *out = make(MigrationStatus, len(*in)) for key, val := range *in { - (*out)[key] = val + (*out)[key] = *val.DeepCopy() } } if in.Roles != nil { @@ -806,7 +782,7 @@ func (in MigrationStatus) DeepCopyInto(out *MigrationStatus) { in := &in *out = make(MigrationStatus, len(*in)) for key, val := range *in { - (*out)[key] = val + (*out)[key] = *val.DeepCopy() } } } diff --git a/cmd/control_plane.go b/cmd/control_plane.go index a00100b..1adfd0f 100644 --- a/cmd/control_plane.go +++ b/cmd/control_plane.go @@ -73,7 +73,8 @@ func (p controlPlane) Run(ctx context.Context, cache cache.SnapshotCache) (err e // gRPC golang library sets a very small upper bound for the number gRPC/h2 // streams over a single TCP connection. If a proxy multiplexes requests over // a single connection to the management server, then it might lead to - // availability problems. Keepalive timeouts based on connection_keepalive parameter https://www.envoyproxy.io/docs/envoy/latest/configuration/overview/examples#dynamic + // availability problems. Keepalive timeouts based on connection_keepalive parameter + // https://www.envoyproxy.io/docs/envoy/latest/configuration/overview/examples#dynamic grpcOptions := append(make([]grpc.ServerOption, 0, 4), grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams), @@ -139,6 +140,7 @@ func (p controlPlane) Run(ctx context.Context, cache cache.SnapshotCache) (err e return err } +//nolint:unparam // signature required by kong func (p controlPlane) AfterApply(kongctx *kong.Context) error { kongctx.BindTo(cache.NewSnapshotCache(false, cache.IDHash{}, nil), (*cache.SnapshotCache)(nil)) return nil diff --git a/cmd/main.go b/cmd/main.go index 3c76731..b298498 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -59,6 +59,7 @@ type app struct { } `embed:"" prefix:"logging."` } +//nolint:unparam // signature required by kong func (a app) AfterApply(kongctx *kong.Context) error { opts := zap.Options{ Development: a.Logging.Development, diff --git a/config/crd/bases/supabase.k8s.icb4dc0.de_cores.yaml b/config/crd/bases/supabase.k8s.icb4dc0.de_cores.yaml index 918d21f..57e6c35 100644 --- a/config/crd/bases/supabase.k8s.icb4dc0.de_cores.yaml +++ b/config/crd/bases/supabase.k8s.icb4dc0.de_cores.yaml @@ -1779,34 +1779,12 @@ spec: status: description: CoreStatus defines the observed state of Core. properties: - conditions: - items: - properties: - lastProbeTime: - format: date-time - type: string - lastTransitionTime: - format: date-time - type: string - message: - type: string - reason: - type: string - status: - type: string - type: - type: string - required: - - status - - type - type: object - type: array database: properties: appliedMigrations: additionalProperties: - format: int64 - type: integer + format: date-time + type: string type: object roles: additionalProperties: diff --git a/internal/controller/core_db_controller.go b/internal/controller/core_db_controller.go index f5ea369..050a2d8 100644 --- a/internal/controller/core_db_controller.go +++ b/internal/controller/core_db_controller.go @@ -36,6 +36,7 @@ import ( supabasev1alpha1 "code.icb4dc0.de/prskr/supabase-operator/api/v1alpha1" "code.icb4dc0.de/prskr/supabase-operator/assets/migrations" "code.icb4dc0.de/prskr/supabase-operator/internal/db" + "code.icb4dc0.de/prskr/supabase-operator/internal/errx" "code.icb4dc0.de/prskr/supabase-operator/internal/meta" "code.icb4dc0.de/prskr/supabase-operator/internal/supabase" ) @@ -69,7 +70,7 @@ func (r *CoreDbReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res return ctrl.Result{RequeueAfter: 30 * time.Second}, nil } - defer CloseCtx(ctx, conn, &err) + defer errx.CloseCtx(ctx, conn, &err) logger.Info("Connected to database, checking for outstanding migrations") if err := r.applyMissingMigrations(ctx, conn, &core); err != nil { diff --git a/internal/controller/dashboard_controller_test.go b/internal/controller/dashboard_controller_test.go index 885d91b..97bc3b9 100644 --- a/internal/controller/dashboard_controller_test.go +++ b/internal/controller/dashboard_controller_test.go @@ -68,7 +68,7 @@ var _ = Describe("Dashboard Controller", func() { }) It("should successfully reconcile the resource", func() { By("Reconciling the created resource") - controllerReconciler := &DashboardReconciler{ + controllerReconciler := &DashboardPGMetaReconciler{ Client: k8sClient, Scheme: k8sClient.Scheme(), } diff --git a/internal/controller/utils.go b/internal/controller/utils.go index 529042e..cdcd1d1 100644 --- a/internal/controller/utils.go +++ b/internal/controller/utils.go @@ -3,8 +3,6 @@ package controller import ( "context" "crypto/sha256" - "errors" - "io" "maps" "reflect" @@ -20,17 +18,6 @@ import ( "code.icb4dc0.de/prskr/supabase-operator/api" ) -func Close(closer io.Closer, err *error) { - *err = errors.Join(*err, closer.Close()) -} - -func CloseCtx(ctx context.Context, closable interface { - Close(ctx context.Context) error -}, err *error, -) { - *err = errors.Join(*err, closable.Close(ctx)) -} - func ptrOf[T any](val T) *T { return &val } diff --git a/internal/db/migrator.go b/internal/db/migrator.go index 980d83d..a42c478 100644 --- a/internal/db/migrator.go +++ b/internal/db/migrator.go @@ -5,10 +5,10 @@ import ( "errors" "iter" - "code.icb4dc0.de/prskr/supabase-operator/assets/migrations" "github.com/jackc/pgx/v5" supabasev1alpha1 "code.icb4dc0.de/prskr/supabase-operator/api/v1alpha1" + "code.icb4dc0.de/prskr/supabase-operator/assets/migrations" ) type Migrator struct { diff --git a/internal/errx/closing.go b/internal/errx/closing.go new file mode 100644 index 0000000..0fa7b7f --- /dev/null +++ b/internal/errx/closing.go @@ -0,0 +1,18 @@ +package errx + +import ( + "context" + "errors" + "io" +) + +func Close(closer io.Closer, err *error) { + *err = errors.Join(*err, closer.Close()) +} + +func CloseCtx(ctx context.Context, closable interface { + Close(ctx context.Context) error +}, err *error, +) { + *err = errors.Join(*err, closable.Close(ctx)) +} diff --git a/internal/webhook/v1alpha1/apigateway_webhook_test.go b/internal/webhook/v1alpha1/apigateway_webhook_test.go index 1cd960e..2867512 100644 --- a/internal/webhook/v1alpha1/apigateway_webhook_test.go +++ b/internal/webhook/v1alpha1/apigateway_webhook_test.go @@ -83,5 +83,4 @@ var _ = Describe("APIGateway Webhook", func() { // Expect(validator.ValidateUpdate(ctx, oldObj, obj)).To(BeNil()) // }) }) - }) diff --git a/internal/webhook/v1alpha1/core_webhook_test.go b/internal/webhook/v1alpha1/core_webhook_test.go index a3fcf95..8a77d64 100644 --- a/internal/webhook/v1alpha1/core_webhook_test.go +++ b/internal/webhook/v1alpha1/core_webhook_test.go @@ -83,5 +83,4 @@ var _ = Describe("Core Webhook", func() { // Expect(validator.ValidateUpdate(ctx, oldObj, obj)).To(BeNil()) // }) }) - }) diff --git a/internal/webhook/v1alpha1/core_webhook_validator.go b/internal/webhook/v1alpha1/core_webhook_validator.go index a7e664e..e609e14 100644 --- a/internal/webhook/v1alpha1/core_webhook_validator.go +++ b/internal/webhook/v1alpha1/core_webhook_validator.go @@ -104,6 +104,7 @@ func (v *CoreCustomValidator) ValidateDelete(ctx context.Context, obj runtime.Ob return warns, nil } +//nolint:unparam // keep signature for later func (v *CoreCustomValidator) validateDb( ctx context.Context, core *supabasev1alpha1.Core, diff --git a/magefiles/generate.go b/magefiles/generate.go index 266e7f6..868bca8 100644 --- a/magefiles/generate.go +++ b/magefiles/generate.go @@ -1,10 +1,12 @@ package main import ( + "archive/tar" + "compress/gzip" "context" "errors" "fmt" - "io/fs" + "io" "log/slog" "net/http" "os" @@ -13,8 +15,9 @@ import ( "strings" "github.com/magefile/mage/mg" - "github.com/magefile/mage/sh" "gopkg.in/yaml.v3" + + "code.icb4dc0.de/prskr/supabase-operator/internal/errx" ) const ( @@ -50,7 +53,7 @@ func CRDDocs() error { ) } -func FetchImageMeta(ctx context.Context) error { +func FetchImageMeta(ctx context.Context) (err error) { req, err := http.NewRequestWithContext(ctx, http.MethodGet, composeFileUrl, nil) if err != nil { return err @@ -61,7 +64,7 @@ func FetchImageMeta(ctx context.Context) error { return err } - defer resp.Body.Close() + defer errx.Close(resp.Body, &err) var composeFile struct { Services map[string]struct { @@ -78,7 +81,7 @@ func FetchImageMeta(ctx context.Context) error { return err } - defer f.Close() + defer errx.Close(f, &err) type imageRef struct { Repository string @@ -139,55 +142,75 @@ func FetchImageMeta(ctx context.Context) error { return RunTool(tools[Gofumpt], "-l", "-w", f.Name()) } -func FetchMigrations(ctx context.Context) error { +func FetchMigrations(ctx context.Context) (err error) { latestRelease, err := latestReleaseVersion(ctx, "supabase", "postgres") if err != nil { return err } - checkoutDir, err := os.MkdirTemp(os.TempDir(), "supabase-*") + releaseArtifactURL := fmt.Sprintf("https://github.com/supabase/postgres/archive/refs/tags/%s.tar.gz", latestRelease) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, releaseArtifactURL, nil) if err != nil { return err } - repoFS := os.DirFS(checkoutDir) - - defer os.RemoveAll(checkoutDir) - - if err := Git("clone", "--filter=blob:none", "--no-checkout", "https://github.com/supabase/postgres", checkoutDir); err != nil { + resp, err := http.DefaultClient.Do(req) + if err != nil { return err } - if err := Git("-C", checkoutDir, "sparse-checkout", "set", "--cone", "migrations"); err != nil { + defer errx.Close(resp.Body, &err) + + gzipReader, err := gzip.NewReader(resp.Body) + if err != nil { return err } - if err := Git("-C", checkoutDir, "checkout", latestRelease); err != nil { - return err + defer errx.Close(gzipReader, &err) + + migrationsDirPath := path.Join(fmt.Sprintf("postgres-%s", latestRelease), ".", "migrations", "db") + "/" + tarReader := tar.NewReader(gzipReader) + + var header *tar.Header + + for header, err = tarReader.Next(); err == nil; header, err = tarReader.Next() { + fileInfo := header.FileInfo() + if fileInfo.IsDir() || path.Ext(fileInfo.Name()) != ".sql" { + continue + } + + fileName := header.Name + if strings.HasPrefix(fileName, migrationsDirPath) { + fileName = strings.TrimPrefix(fileName, migrationsDirPath) + + dir, _ := path.Split(fileName) + outDir := filepath.Join(workingDir, "assets", "migrations", filepath.FromSlash(dir)) + if err := os.MkdirAll(outDir, 0o750); err != nil { + return err + } + + slog.Info("Copying file", slog.String("file", fileName)) + outFile, err := os.Create(filepath.Join(workingDir, "assets", "migrations", filepath.FromSlash(fileName))) + if err != nil { + return err + } + + if _, err := io.Copy(outFile, tarReader); err != nil { + return err + } + + if err := outFile.Close(); err != nil { + return err + } + + } else { + slog.Debug("skipping file", slog.String("file", fileName)) + } } - migrationsDirPath := path.Join(".", "migrations", "db") - return fs.WalkDir(repoFS, migrationsDirPath, func(filePath string, d fs.DirEntry, err error) error { - if err != nil { - return err - } + if errors.Is(err, io.EOF) { + return nil + } - if d.IsDir() || filepath.Ext(filePath) != ".sql" { - return nil - } - - fileName, err := filepath.Rel(migrationsDirPath, filePath) - if err != nil { - return err - } - - dir, _ := filepath.Split(fileName) - - if err := os.MkdirAll(filepath.Join(workingDir, "assets", "migrations", dir), 0o750); err != nil { - return err - } - - slog.Info("Copying migration file", slog.String("file", fileName)) - return sh.Copy(filepath.Join(workingDir, "assets", "migrations", fileName), filepath.Join(checkoutDir, filepath.FromSlash(filePath))) - }) + return err } diff --git a/magefiles/github.go b/magefiles/github.go index c145079..c6d5198 100644 --- a/magefiles/github.go +++ b/magefiles/github.go @@ -5,10 +5,13 @@ import ( "encoding/json" "fmt" "net/http" + + "code.icb4dc0.de/prskr/supabase-operator/internal/errx" ) -func latestReleaseVersion(ctx context.Context, owner, repo string) (string, error) { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("https://api.github.com/repos/%s/%s/releases/latest", owner, repo), nil) +func latestReleaseVersion(ctx context.Context, owner, repo string) (tagName string, err error) { + releaseURL := fmt.Sprintf("https://api.github.com/repos/%s/%s/releases/latest", owner, repo) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, releaseURL, nil) if err != nil { return "", err } @@ -18,7 +21,7 @@ func latestReleaseVersion(ctx context.Context, owner, repo string) (string, erro return "", err } - defer resp.Body.Close() + defer errx.Close(resp.Body, &err) if resp.StatusCode < 200 || resp.StatusCode >= 300 { return "", fmt.Errorf("failed to retrieve latest release: %s", resp.Status) diff --git a/magefiles/migrate.go b/magefiles/migrate.go deleted file mode 100644 index 690bf43..0000000 --- a/magefiles/migrate.go +++ /dev/null @@ -1,54 +0,0 @@ -package main - -import ( - "context" - "errors" - "log/slog" - "os" - - "github.com/jackc/pgx/v5" - - "code.icb4dc0.de/prskr/supabase-operator/assets/migrations" -) - -func Migrate(ctx context.Context) error { - dsn := os.Getenv("DATABASE_URL") - if dsn == "" { - return errors.New("DATABASE_URL is required") - } - - conn, err := pgx.Connect(ctx, dsn) - if err != nil { - return err - } - - defer conn.Close(ctx) - - for s, err := range migrations.InitScripts() { - if err != nil { - return err - } - - slog.Info("Running init script", slog.String("file", s.FileName)) - - _, err = conn.Exec(ctx, s.Content) - if err != nil { - return err - } - } - - for s, err := range migrations.MigrationScripts() { - if err != nil { - return err - } - - slog.Info("Running migration script", slog.String("file", s.FileName)) - - _, err = conn.Exec(ctx, s.Content) - if err != nil { - return err - } - } - - return nil -} diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index f33bc35..5efc501 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -316,7 +316,7 @@ func serviceAccountToken() (string, error) { // Parse the JSON output to extract the token var token tokenRequest - err = json.Unmarshal([]byte(output), &token) + err = json.Unmarshal(output, &token) g.Expect(err).NotTo(HaveOccurred()) out = token.Status.Token diff --git a/test/utils/utils.go b/test/utils/utils.go index ba78110..528c6d7 100644 --- a/test/utils/utils.go +++ b/test/utils/utils.go @@ -92,7 +92,7 @@ func IsPrometheusCRDsInstalled() bool { if err != nil { return false } - crdList := GetNonEmptyLines(string(output)) + crdList := GetNonEmptyLines(output) for _, crd := range prometheusCRDs { for _, line := range crdList { if strings.Contains(line, crd) { @@ -153,7 +153,7 @@ func IsCertManagerCRDsInstalled() bool { } // Check if any of the Cert Manager CRDs are present - crdList := GetNonEmptyLines(string(output)) + crdList := GetNonEmptyLines(output) for _, crd := range certManagerCRDs { for _, line := range crdList { if strings.Contains(line, crd) {