searcherside/infrastructure/db/migrate/db_revision_rw.go

145 lines
3.1 KiB
Go
Raw Permalink Normal View History

package migrate
import (
"context"
"errors"
"time"
"code.icb4dc0.de/prskr/searcherside/core/ports"
"code.icb4dc0.de/prskr/searcherside/internal/ent"
"code.icb4dc0.de/prskr/searcherside/internal/ent/migration"
"ariga.io/atlas/sql/migrate"
"github.com/jackc/pgx/v5/pgconn"
"modernc.org/sqlite"
)
var _ ports.RevisionReadWriter = (*DBRevisionRW)(nil)
func NewDBRevisionRW(client *ent.Client) ports.RevisionReadWriter {
return &DBRevisionRW{
client: client,
}
}
type DBRevisionRW struct {
client *ent.Client
}
func (rw DBRevisionRW) Client() *ent.Client {
return rw.client
}
func (DBRevisionRW) Ident() *migrate.TableIdent {
return &migrate.TableIdent{
Name: migration.Table,
}
}
func (rw DBRevisionRW) ReadRevisions(ctx context.Context) ([]*migrate.Revision, error) {
allMigrations, err := rw.client.Migration.
Query().
All(ctx)
if err != nil && !ignoreError(err) {
return nil, err
}
revs := make([]*migrate.Revision, len(allMigrations))
for idx := range allMigrations {
revs[idx] = revisionOf(allMigrations[idx])
}
return revs, nil
}
func (rw DBRevisionRW) ReadRevision(ctx context.Context, s string) (*migrate.Revision, error) {
m, err := rw.client.Migration.
Query().
Where(migration.VersionEQ(s)).
Only(ctx)
if err != nil {
if ignoreError(err) {
return nil, migrate.ErrRevisionNotExist
}
return nil, err
}
return revisionOf(m), nil
}
func (rw DBRevisionRW) WriteRevision(ctx context.Context, revision *migrate.Revision) error {
err := rw.client.Migration.Create().
SetVersion(revision.Version).
SetDescription(revision.Description).
SetType(uint(revision.Type)).
SetApplied(revision.Applied).
SetTotal(revision.Total).
SetExecutedAt(revision.ExecutedAt).
SetExecutionTime(int64(revision.ExecutionTime)).
SetError(revision.Error).
SetErrorStmt(revision.ErrorStmt).
SetHash(revision.Hash).
SetPartialHashes(revision.PartialHashes).
SetOperatorVersion(revision.OperatorVersion).
OnConflictColumns(migration.FieldVersion).
UpdateNewValues().
Exec(ctx)
if ignoreError(err) {
return nil
}
return err
}
func (rw DBRevisionRW) DeleteRevision(ctx context.Context, s string) error {
_, err := rw.client.Migration.Delete().Where(migration.VersionEQ(s)).Exec(ctx)
return err
}
func revisionOf(m *ent.Migration) *migrate.Revision {
if m == nil {
return new(migrate.Revision)
}
return &migrate.Revision{
Version: m.Version,
Description: m.Description,
Type: migrate.RevisionType(m.Type),
Applied: m.Applied,
Total: m.Total,
ExecutedAt: m.ExecutedAt,
ExecutionTime: time.Duration(m.ExecutionTime),
Error: m.Error,
ErrorStmt: m.ErrorStmt,
Hash: m.Hash,
PartialHashes: m.PartialHashes,
OperatorVersion: m.OperatorVersion,
}
}
func ignoreError(err error) bool {
if ent.IsNotFound(err) {
return true
}
var postgresErr *pgconn.PgError
if errors.As(err, &postgresErr) {
switch postgresErr.Code {
case "42P01":
return true
}
}
// DB specific errors
var sqliteErr *sqlite.Error
if errors.As(err, &sqliteErr) {
if sqliteErr.Code() == 1 {
return true
}
}
return false
}