From 866e9908a86cf52f23a6651112609cff30385772 Mon Sep 17 00:00:00 2001 From: Peter Kurfer <peter.kurfer@rwe.com> Date: Fri, 14 Feb 2025 08:44:34 +0100 Subject: [PATCH] initial commit --- .editorconfig | 27 ++ .gitignore | 2 + Dockerfile | 26 ++ README.md | 14 + assets/db/01_init.sql | 59 ++++ assets/rabbitmq/definitions.json | 80 +++++ assets/rabbitmq/enabled_plugins | 1 + assets/rabbitmq/rabbitmq.conf | 7 + cmd/app.go | 59 ++++ compose.yml | 70 +++++ core/domain/repl_event.go | 44 +++ core/ports/repl_event_consumer.go | 11 + go.mod | 22 ++ go.sum | 48 +++ infrastructure/config/db.go | 7 + infrastructure/config/logging.go | 15 + infrastructure/config/rabbitmq.go | 11 + infrastructure/db/replication_client.go | 385 ++++++++++++++++++++++++ infrastructure/rabbitmq/publisher.go | 89 ++++++ main.go | 22 ++ 20 files changed, 999 insertions(+) create mode 100644 .editorconfig create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 README.md create mode 100644 assets/db/01_init.sql create mode 100644 assets/rabbitmq/definitions.json create mode 100644 assets/rabbitmq/enabled_plugins create mode 100644 assets/rabbitmq/rabbitmq.conf create mode 100644 cmd/app.go create mode 100644 compose.yml create mode 100644 core/domain/repl_event.go create mode 100644 core/ports/repl_event_consumer.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 infrastructure/config/db.go create mode 100644 infrastructure/config/logging.go create mode 100644 infrastructure/config/rabbitmq.go create mode 100644 infrastructure/db/replication_client.go create mode 100644 infrastructure/rabbitmq/publisher.go create mode 100644 main.go diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..2af8321 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,27 @@ +root = true + +[*] +charset = utf-8 +end_of_line = lf +indent_size = 4 +tab_width = 4 +indent_style = space +insert_final_newline = false +max_line_length = 120 +trim_trailing_whitespace = true + +[*.go] +indent_style = tab +ij_smart_tabs = true +ij_go_GROUP_CURRENT_PROJECT_IMPORTS = true +ij_go_group_stdlib_imports = true +ij_go_import_sorting = goimports +ij_go_local_group_mode = project +ij_go_move_all_imports_in_one_declaration = true +ij_go_move_all_stdlib_imports_in_one_group = true +ij_go_remove_redundant_import_aliases = true + +[*.{yml,yaml}] +indent_size = 2 +tab_width = 2 +insert_final_newline = true diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8c2dd28 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +out/ +.idea/ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..48824fd --- /dev/null +++ b/Dockerfile @@ -0,0 +1,26 @@ +FROM docker.io/golang:1.24-alpine AS builder + +ENV CGO_ENABLED=0 + +WORKDIR /src + +RUN \ + --mount=type=cache,target=/go/pkg/mod \ + --mount=type=bind,source=./go.mod,target=./go.mod,rw=false \ + --mount=type=bind,source=./go.sum,target=./go.sum,rw=false \ + go mod download + +COPY . 
./
+
+RUN --mount=type=cache,target=/root/.cache/go-build \
+    --mount=type=cache,target=/go/pkg/mod \
+    mkdir -p ./out && \
+    go build -o ./out/pg_v_man -trimpath -ldflags '-s -w' main.go
+
+FROM gcr.io/distroless/static:nonroot
+
+WORKDIR /app
+
+COPY --from=builder /src/out/pg_v_man ./
+
+ENTRYPOINT ["/app/pg_v_man"]
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..6079194
--- /dev/null
+++ b/README.md
@@ -0,0 +1,14 @@
+# pg_v_man
+
+## Getting started
+
+To get started, simply start the Docker Compose stack:
+
+```bash
+docker compose up
+```
+
+You can then open the [RabbitMQ management UI](http://localhost:15672) and watch messages arrive in the `letterbox` queue whenever you manipulate data in the database.
+The database listens on the default port 5432; the credentials can be found in [compose.yml](./compose.yml).
+
+Right now, no pgAdmin, CloudBeaver, or similar tool is part of the stack, so feel free to use whatever Postgres tool you prefer :).
\ No newline at end of file
diff --git a/assets/db/01_init.sql b/assets/db/01_init.sql
new file mode 100644
index 0000000..28ffb1b
--- /dev/null
+++ b/assets/db/01_init.sql
@@ -0,0 +1,59 @@
+CREATE PUBLICATION v_man_1 FOR ALL TABLES;
+
+create extension pgcrypto;
+
+create table users (
+    id bigint primary key generated always as identity,
+    username text not null unique,
+    email text not null unique,
+    password_hash text not null
+);
+
+create table categories (
+    id bigint primary key generated always as identity,
+    name text not null unique
+);
+
+create table lists (
+    id bigint primary key generated always as identity,
+    user_id bigint not null references users (id),
+    name text not null
+);
+
+create table tasks (
+    id bigint primary key generated always as identity,
+    list_id bigint not null references lists (id),
+    category_id bigint references categories (id),
+    title text not null,
+    description text,
+    due_date date,
+    priority int,
+    completed boolean default false
+);
+
+INSERT INTO
+    users (username, email, password_hash)
+VALUES
+    (
+        'ted.tester',
+        'ted.tester@example.com',
+        crypt ('password', gen_salt ('bf'))
+    );
+
+INSERT INTO
+    categories (name)
+VALUES
+    ('Groceries'),
+    ('Work'),
+    ('Personal'),
+    ('Other');
+
+INSERT INTO
+    public.lists (user_id, name)
+VALUES
+    (1, 'Groceries');
+
+INSERT INTO
+    public.tasks (list_id, category_id, title)
+VALUES
+    (1, 1, 'Orange Juice');
diff --git a/assets/rabbitmq/definitions.json b/assets/rabbitmq/definitions.json
new file mode 100644
index 0000000..26ed99e
--- /dev/null
+++ b/assets/rabbitmq/definitions.json
@@ -0,0 +1,80 @@
+{
+  "rabbit_version": "4.0.5",
+  "rabbitmq_version": "4.0.5",
+  "product_name": "RabbitMQ",
+  "product_version": "4.0.5",
+  "rabbitmq_definition_format": "cluster",
+  "original_cluster_name": "rabbit@553990bf8169",
+  "explanation": "Definitions of cluster 'rabbit@553990bf8169'",
+  "users": [
+    {
+      "name": "v_man",
+      "password_hash": "ASzCoAwdiANYvE0ySlYTw76+1u6Vda24cyafLJfSb8eiVmKp",
+      "hashing_algorithm": "rabbit_password_hashing_sha256",
+      "tags": ["administrator"],
+      "limits": {}
+    }
+  ],
+  "vhosts": [
+    {
+      "name": "/",
+      "description": "Default virtual host",
+      "metadata": {
+        "description": "Default virtual host",
+        "tags": [],
+        "default_queue_type": "classic"
+      },
+      "tags": [],
+      "default_queue_type": "classic"
+    }
+  ],
+  "permissions": [
+    {
+      "user": "v_man",
+      "vhost": "/",
+      "configure": ".*",
+      "write": ".*",
+      "read": ".*"
+    }
+  ],
+  "topic_permissions": [],
+  "parameters": [],
"global_parameters": [ + { "name": "cluster_tags", "value": [] }, + { + "name": "internal_cluster_id", + "value": "rabbitmq-cluster-id-zBQKaZR5QrD8CTz1RhYHag" + } + ], + "policies": [], + "queues": [ + { + "name": "letterbox", + "vhost": "/", + "durable": true, + "auto_delete": false, + "arguments": { "x-max-length": 1000, "x-queue-type": "classic" } + } + ], + "exchanges": [ + { + "name": "pg_v_man", + "vhost": "/", + "type": "topic", + "durable": true, + "auto_delete": false, + "internal": false, + "arguments": {} + } + ], + "bindings": [ + { + "source": "pg_v_man", + "vhost": "/", + "destination": "letterbox", + "destination_type": "queue", + "routing_key": "*", + "arguments": {} + } + ] +} diff --git a/assets/rabbitmq/enabled_plugins b/assets/rabbitmq/enabled_plugins new file mode 100644 index 0000000..318ea04 --- /dev/null +++ b/assets/rabbitmq/enabled_plugins @@ -0,0 +1 @@ +[rabbitmq_management,rabbitmq_prometheus]. \ No newline at end of file diff --git a/assets/rabbitmq/rabbitmq.conf b/assets/rabbitmq/rabbitmq.conf new file mode 100644 index 0000000..c3b0b46 --- /dev/null +++ b/assets/rabbitmq/rabbitmq.conf @@ -0,0 +1,7 @@ +default_user = $(RABBITMQ_DEFAULT_USER) +default_pass = $(RABBITMQ_DEFAULT_PASS) + +definitions.import_backend = local_filesystem +definitions.local.path = /etc/rabbitmq/definitions.json + +log.console = true \ No newline at end of file diff --git a/cmd/app.go b/cmd/app.go new file mode 100644 index 0000000..c4c6b1e --- /dev/null +++ b/cmd/app.go @@ -0,0 +1,59 @@ +package cmd + +import ( + "context" + "errors" + "fmt" + "log/slog" + "os" + + "github.com/alecthomas/kong" + + "code.icb4dc0.de/prskr/pg_v_man/infrastructure/config" + "code.icb4dc0.de/prskr/pg_v_man/infrastructure/db" + "code.icb4dc0.de/prskr/pg_v_man/infrastructure/rabbitmq" +) + +func RunApp(ctx context.Context) error { + var app App + + return kong.Parse( + &app, + kong.Name("replication-emitter"), + kong.BindTo(ctx, (*context.Context)(nil)), + ).Run() +} + +type App struct { + Logging config.Logging `embed:"" prefix:"logging."` + DB config.DB `embed:"" prefix:"db."` + RabbitMQ config.RabbitMQ `embed:"" prefix:"rabbitmq."` +} + +func (a *App) Run(ctx context.Context) (err error) { + publisher, err := rabbitmq.NewPublishingEventConsumer(ctx, a.RabbitMQ) + if err != nil { + return fmt.Errorf("could not create publishing event consumer: %w", err) + } + + replClient, err := db.NewReplicationClient(ctx, a.DB, publisher) + if err != nil { + return fmt.Errorf("could not create replication client: %w", err) + } + + defer func() { + err = errors.Join(err, replClient.Close(context.Background())) + }() + + return replClient.Receive(ctx) +} + +func (a *App) AfterApply(kongCtx *kong.Context) error { + defaultLogger := slog.New(slog.NewJSONHandler(os.Stderr, a.Logging.Options())) + + slog.SetDefault(defaultLogger) + + kongCtx.Bind(defaultLogger) + + return nil +} diff --git a/compose.yml b/compose.yml new file mode 100644 index 0000000..2b75366 --- /dev/null +++ b/compose.yml @@ -0,0 +1,70 @@ +services: + pg_v_man: + build: + context: . 
+ dockerfile: Dockerfile + environment: + DB_CONNECTION_STRING: postgresql://postgres:postgres@postgres:5432/postgres?replication=database + DB_PUBLICATION: v_man_1 + RABBITMQ_CONNECTION_STRING: amqp://v_man:ies6ohF8@rabbitmq:5672/ + restart: always + depends_on: + - rabbitmq + - postgres + + postgres: + image: postgres:17.3 + command: + - "postgres" + - "-c" + - "wal_level=logical" + ports: + - target: 5432 + published: 5432 + protocol: tcp + environment: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + POSTGRES_DB: postgres + volumes: + - type: bind + source: ./assets/db + target: /docker-entrypoint-initdb.d + - type: volume + source: postgres-data + target: /var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready"] + interval: 10s + timeout: 5s + retries: 5 + + rabbitmq: + image: rabbitmq:4-management-alpine + ports: + - target: 5672 + published: 5672 + protocol: tcp + - target: 15672 + published: 15672 + protocol: tcp + environment: + RABBITMQ_DEFAULT_USER: v_man + RABBITMQ_DEFAULT_PASS: ies6ohF8 + RABBITMQ_DEFAULT_VHOST: / + volumes: + - type: bind + source: ./assets/rabbitmq + target: /etc/rabbitmq + - type: volume + source: rabbitmq-data + target: /var/lib/rabbitmq + healthcheck: + test: rabbitmq-diagnostics -q ping + interval: 30s + timeout: 25s + retries: 3 + +volumes: + rabbitmq-data: + postgres-data: diff --git a/core/domain/repl_event.go b/core/domain/repl_event.go new file mode 100644 index 0000000..72ed41b --- /dev/null +++ b/core/domain/repl_event.go @@ -0,0 +1,44 @@ +package domain + +type EventType string + +func (e EventType) String() string { + return string(e) +} + +const ( + EventTypeInsert EventType = "INSERT" + EventTypeUpdate EventType = "UPDATE" + EventTypeDelete EventType = "DELETE" + EventTypeTruncate EventType = "TRUNCATE" +) + +func NewValues() *Values { + return &Values{ + Key: make(map[string]any), + Data: make(map[string]any), + } +} + +func (v *Values) AddValue(partOfKey bool, key string, value any) { + if partOfKey { + v.Key[key] = value + } else { + v.Data[key] = value + } +} + +type Values struct { + Key map[string]any + Data map[string]any +} + +type ReplicationEvent struct { + EventType EventType `json:"eventType"` + TransactionId uint32 `json:"transactionId"` + DBName string `json:"dbName"` + Namespace string `json:"namespace"` + Relation string `json:"relation"` + NewValues *Values `json:"newValues,omitempty"` + OldValues *Values `json:"oldValues,omitempty"` +} diff --git a/core/ports/repl_event_consumer.go b/core/ports/repl_event_consumer.go new file mode 100644 index 0000000..ab34414 --- /dev/null +++ b/core/ports/repl_event_consumer.go @@ -0,0 +1,11 @@ +package ports + +import ( + "context" + + "code.icb4dc0.de/prskr/pg_v_man/core/domain" +) + +type ReplicationEventConsumer interface { + OnDataChange(ctx context.Context, ev domain.ReplicationEvent) error +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..d560fcd --- /dev/null +++ b/go.mod @@ -0,0 +1,22 @@ +module code.icb4dc0.de/prskr/pg_v_man + +go 1.24 + +toolchain go1.24.0 + +require ( + github.com/alecthomas/kong v0.9.0 + github.com/google/uuid v1.6.0 + github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9 + github.com/jackc/pgx/v5 v5.6.0 + github.com/wagslane/go-rabbitmq v0.13.0 +) + +require ( + github.com/jackc/pgio v1.0.0 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/rabbitmq/amqp091-go v1.10.0 // indirect + golang.org/x/crypto 
v0.24.0 // indirect + golang.org/x/text v0.16.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..e866489 --- /dev/null +++ b/go.sum @@ -0,0 +1,48 @@ +github.com/alecthomas/assert/v2 v2.6.0 h1:o3WJwILtexrEUk3cUVal3oiQY2tfgr/FHWiz/v2n4FU= +github.com/alecthomas/assert/v2 v2.6.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k= +github.com/alecthomas/kong v0.9.0 h1:G5diXxc85KvoV2f0ZRVuMsi45IrBgx9zDNGNj165aPA= +github.com/alecthomas/kong v0.9.0/go.mod h1:Y47y5gKfHp1hDc7CH7OeXgLIpp+Q2m1Ni0L5s3bI8Os= +github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc= +github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= +github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= +github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= +github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8= +github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9 h1:86CQbMauoZdLS0HDLcEHYo6rErjiCBjVvcxGsioIn7s= +github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9/go.mod h1:SO15KF4QqfUM5UhsG9roXre5qeAQLC1rm8a8Gjpgg5k= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY= +github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw= +github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/wagslane/go-rabbitmq v0.13.0 h1:u2JfKbwi3cbxCExKV34RrhKBZjW2HoRwyPTA8pERyrs= +github.com/wagslane/go-rabbitmq v0.13.0/go.mod h1:1sUJ53rrW2AIA7LEp8ymmmebHqqq8ksH/gXIfUP0I0s= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak 
v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= +golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/infrastructure/config/db.go b/infrastructure/config/db.go new file mode 100644 index 0000000..1259ecd --- /dev/null +++ b/infrastructure/config/db.go @@ -0,0 +1,7 @@ +package config + +type DB struct { + ConnectionString string `name:"connection-string" env:"DB_CONNECTION_STRING" help:"Database connection string"` + Publication string `name:"publication" env:"DB_PUBLICATION" help:"Database publication name" default:"goplication"` + SlotName string `name:"slot-name" env:"DB_REPLICATION_SLOT_NAME" help:"Database replication slot name" default:"pg_v_man"` +} diff --git a/infrastructure/config/logging.go b/infrastructure/config/logging.go new file mode 100644 index 0000000..a6600e7 --- /dev/null +++ b/infrastructure/config/logging.go @@ -0,0 +1,15 @@ +package config + +import "log/slog" + +type Logging struct { + AddSource bool `env:"LOG_ADD_SOURCE" name:"add-source" default:"false"` + Level slog.Level `env:"LOG_LEVEL" name:"level" default:"info" help:"Log level to apply"` +} + +func (l Logging) Options() *slog.HandlerOptions { + return &slog.HandlerOptions{ + Level: l.Level, + AddSource: l.AddSource, + } +} diff --git a/infrastructure/config/rabbitmq.go b/infrastructure/config/rabbitmq.go new file mode 100644 index 0000000..7c941f0 --- /dev/null +++ b/infrastructure/config/rabbitmq.go @@ -0,0 +1,11 @@ +package config + +type RabbitMQ struct { + ConnectionString string `name:"connection-string" env:"RABBITMQ_CONNECTION_STRING" help:"RabbitMQ connection string"` + Exchange struct { + Name string `name:"name" env:"RABBITMQ_EXCHANGE_NAME" help:"RabbitMQ exchange name" default:"pg_v_man"` + Kind string `name:"kind" env:"RABBITMQ_EXCHANGE_KIND" help:"RabbitMQ exchange kind" default:"topic"` + Durable bool `name:"durable" env:"RABBITMQ_EXCHANGE_DURABLE" help:"RabbitMQ exchange durable" default:"true"` + } `embed:"" prefix:"exchange."` + RoutingKey string `name:"routing-key" env:"RABBITMQ_ROUTING_KEY" help:"RabbitMQ routing key" default:"pg_v_man"` +} diff --git a/infrastructure/db/replication_client.go b/infrastructure/db/replication_client.go new file mode 100644 index 0000000..4c25179 --- /dev/null +++ b/infrastructure/db/replication_client.go @@ -0,0 +1,385 @@ +package db + +import ( + "context" + "errors" + "fmt" + "log" + "log/slog" + "time" + + "github.com/google/uuid" + "github.com/jackc/pglogrepl" + "github.com/jackc/pgx/v5/pgconn" + "github.com/jackc/pgx/v5/pgproto3" + "github.com/jackc/pgx/v5/pgtype" + + "code.icb4dc0.de/prskr/pg_v_man/core/domain" + "code.icb4dc0.de/prskr/pg_v_man/core/ports" + "code.icb4dc0.de/prskr/pg_v_man/infrastructure/config" +) + +const ( + outputPlugin = 
"pgoutput" + defaultConsumerTimeout = 200 * time.Millisecond +) + +func NewReplicationClient(ctx context.Context, cfg config.DB, consumer ports.ReplicationEventConsumer) (client *ReplicationClient, err error) { + conn, err := pgconn.Connect(ctx, cfg.ConnectionString) + if err != nil { + return nil, fmt.Errorf("could not connect to database: %w", err) + } + + defer func() { + if err != nil { + err = errors.Join(err, conn.Close(context.Background())) + } + }() + + client = &ReplicationClient{ + Conn: conn, + Consumer: consumer, + ConsumerTimeout: defaultConsumerTimeout, + typeMap: pgtype.NewMap(), + relations: make(map[uint32]*pglogrepl.RelationMessageV2), + } + + client.sysident, err = pglogrepl.IdentifySystem(ctx, conn) + if err != nil { + return nil, fmt.Errorf("could not identify system: %w", err) + } + + slog.Info( + "System identity", + slog.String("systemId", client.sysident.SystemID), + slog.Int("timeline", int(client.sysident.Timeline)), + slog.String("xlogpos", client.sysident.XLogPos.String()), + slog.String("dbname", client.sysident.DBName), + ) + + _, err = pglogrepl.CreateReplicationSlot(ctx, conn, cfg.SlotName, outputPlugin, pglogrepl.CreateReplicationSlotOptions{Temporary: true}) + if err != nil { + return nil, fmt.Errorf("could not create replication slot: %w", err) + } + + slog.Info("Replication slot created", slog.String("slotName", cfg.SlotName)) + + pluginArguments := []string{ + "proto_version '2'", + fmt.Sprintf("publication_names '%s'", cfg.Publication), + "messages 'true'", + "streaming 'true'", + } + + err = pglogrepl.StartReplication(ctx, conn, cfg.SlotName, client.sysident.XLogPos, pglogrepl.StartReplicationOptions{PluginArgs: pluginArguments}) + if err != nil { + return nil, fmt.Errorf("could not start replication: %w", err) + } + + slog.Info("Replication started", slog.String("slotName", cfg.SlotName)) + + return client, nil +} + +type ReplicationClient struct { + ConsumerTimeout time.Duration + Conn *pgconn.PgConn + Consumer ports.ReplicationEventConsumer + sysident pglogrepl.IdentifySystemResult + typeMap *pgtype.Map + relations map[uint32]*pglogrepl.RelationMessageV2 + latestXid uint32 + inStream bool +} + +func (c *ReplicationClient) Receive(ctx context.Context) (err error) { + clientXLogPos := c.sysident.XLogPos + standbyMessageTimeout := time.Second * 10 + nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout) + + for ctx.Err() == nil { + if time.Now().After(nextStandbyMessageDeadline) { + err = pglogrepl.SendStandbyStatusUpdate(context.Background(), c.Conn, pglogrepl.StandbyStatusUpdate{WALWritePosition: clientXLogPos}) + if err != nil { + return fmt.Errorf("SendStandbyStatusUpdate failed: %w", err) + } + + slog.Debug("Sent Standby status message", slog.String("xlogpos", clientXLogPos.String())) + nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) + } + + receiveCtx, stop := context.WithDeadline(ctx, nextStandbyMessageDeadline) + rawMsg, err := c.Conn.ReceiveMessage(receiveCtx) + stop() + + if err != nil { + if pgconn.Timeout(err) { + continue + } + + return fmt.Errorf("could not receive message: %w", err) + } + + if errMsg, ok := rawMsg.(*pgproto3.ErrorResponse); ok { + slog.Error("Error response", slog.String("message", errMsg.Message)) + continue + } + + msg, ok := rawMsg.(*pgproto3.CopyData) + if !ok { + slog.Warn("Received unexpected message", slog.String("message", fmt.Sprintf("%T", rawMsg))) + continue + } + + switch msg.Data[0] { + case pglogrepl.PrimaryKeepaliveMessageByteID: + pkm, err := 
pglogrepl.ParsePrimaryKeepaliveMessage(msg.Data[1:]) + if err != nil { + return fmt.Errorf("ParsePrimaryKeepaliveMessage failed: %w", err) + } + + slog.Debug( + "Primary Keepalive Message", + slog.String("ServerWALEnd", pkm.ServerWALEnd.String()), + slog.Time("ServerTime", pkm.ServerTime), + slog.Bool("ReplyRequested", pkm.ReplyRequested), + ) + + if pkm.ServerWALEnd > clientXLogPos { + clientXLogPos = pkm.ServerWALEnd + } + if pkm.ReplyRequested { + nextStandbyMessageDeadline = time.Time{} + } + + case pglogrepl.XLogDataByteID: + xld, err := pglogrepl.ParseXLogData(msg.Data[1:]) + if err != nil { + return fmt.Errorf("ParseXLogData failed: %w", err) + } + + slog.Debug( + "Received log data", + slog.String("WALStart", xld.WALStart.String()), + slog.String("ServerWALEnd", xld.ServerWALEnd.String()), + slog.Time("ServerTime", xld.ServerTime), + ) + + processCtx, stop := context.WithTimeout(ctx, c.ConsumerTimeout) + err = c.handleReceivedLog(processCtx, xld.WALData) + stop() + + if err != nil { + return err + } + + if xld.WALStart > clientXLogPos { + clientXLogPos = xld.WALStart + } + } + + } + + return nil +} + +func (c *ReplicationClient) Close(ctx context.Context) error { + return c.Conn.Close(ctx) +} + +func (c *ReplicationClient) handleReceivedLog(ctx context.Context, walData []byte) error { + logicalMsg, err := pglogrepl.ParseV2(walData, c.inStream) + if err != nil { + return fmt.Errorf("parse logical replication message: %w", err) + } + + slog.Debug("Receive a logical replication message", slog.String("type", logicalMsg.Type().String())) + + switch logicalMsg := logicalMsg.(type) { + case *pglogrepl.RelationMessageV2: + c.relations[logicalMsg.RelationID] = logicalMsg + + case *pglogrepl.BeginMessage: + // Indicates the beginning of a group of changes in a transaction. This is only sent for committed transactions. You won't get any events from rolled back transactions. 
+ slog.Debug("Begin message", slog.Int("xid", int(logicalMsg.Xid))) + c.latestXid = logicalMsg.Xid + case *pglogrepl.CommitMessage: + slog.Debug("Commit message", slog.String("commit_lsn", logicalMsg.CommitLSN.String())) + case *pglogrepl.InsertMessageV2: + rel, ok := c.relations[logicalMsg.RelationID] + if !ok { + return fmt.Errorf("unknown relation ID %d", logicalMsg.RelationID) + } + + slog.Debug("Received insert message", + slog.Uint64("xid", uint64(c.latestXid)), + slog.String("namespace", rel.Namespace), + slog.String("relation", rel.RelationName), + slog.Bool("in_stream", c.inStream), + ) + + ev := domain.ReplicationEvent{ + EventType: domain.EventTypeInsert, + TransactionId: c.latestXid, + DBName: c.sysident.DBName, + Namespace: rel.Namespace, + Relation: rel.RelationName, + NewValues: c.tupleToMap(rel, logicalMsg.Tuple, false), + } + + return c.Consumer.OnDataChange(ctx, ev) + + case *pglogrepl.UpdateMessageV2: + rel, ok := c.relations[logicalMsg.RelationID] + if !ok { + return fmt.Errorf("unknown relation ID %d", logicalMsg.RelationID) + } + + slog.Debug( + "Received update message", + slog.Uint64("xid", uint64(c.latestXid)), + slog.String("namespace", rel.Namespace), + slog.String("relation", rel.RelationName), + slog.Bool("in_stream", c.inStream), + ) + + ev := domain.ReplicationEvent{ + EventType: domain.EventTypeUpdate, + TransactionId: c.latestXid, + DBName: c.sysident.DBName, + Namespace: rel.Namespace, + Relation: rel.RelationName, + NewValues: c.tupleToMap(rel, logicalMsg.NewTuple, false), + OldValues: c.tupleToMap(rel, logicalMsg.OldTuple, false), + } + + return c.Consumer.OnDataChange(ctx, ev) + case *pglogrepl.DeleteMessageV2: + rel, ok := c.relations[logicalMsg.RelationID] + if !ok { + return fmt.Errorf("unknown relation ID %d", logicalMsg.RelationID) + } + + slog.Debug( + "Received deletion message", + slog.Uint64("xid", uint64(c.latestXid)), + slog.String("namespace", rel.Namespace), + slog.String("relation", rel.RelationName), + slog.Bool("in_stream", c.inStream), + ) + + ev := domain.ReplicationEvent{ + EventType: domain.EventTypeDelete, + TransactionId: c.latestXid, + DBName: c.sysident.DBName, + Namespace: rel.Namespace, + Relation: rel.RelationName, + OldValues: c.tupleToMap(rel, logicalMsg.OldTuple, true), + } + + return c.Consumer.OnDataChange(ctx, ev) + case *pglogrepl.TruncateMessageV2: + for _, relId := range logicalMsg.RelationIDs { + rel, ok := c.relations[relId] + if !ok { + return fmt.Errorf("unknown relation ID %d", relId) + } + + slog.Debug( + "Received truncation message", + slog.Uint64("xid", uint64(c.latestXid)), + slog.String("namespace", rel.Namespace), + slog.String("relation", rel.RelationName), slog.Int("xid", int(logicalMsg.Xid)), + slog.Bool("in_stream", c.inStream), + ) + + ev := domain.ReplicationEvent{ + EventType: domain.EventTypeTruncate, + TransactionId: c.latestXid, + DBName: c.sysident.DBName, + Namespace: rel.Namespace, + Relation: rel.RelationName, + } + + if err := c.Consumer.OnDataChange(ctx, ev); err != nil { + return err + } + } + case *pglogrepl.TypeMessageV2: + case *pglogrepl.OriginMessage: + case *pglogrepl.LogicalDecodingMessageV2: + slog.Debug( + "Logical decoding message", + slog.String("prefix", logicalMsg.Prefix), + slog.Any("content", logicalMsg.Content), + slog.Int("xid", int(logicalMsg.Xid)), + ) + case *pglogrepl.StreamStartMessageV2: + c.inStream = true + slog.Debug("Stream start message", + slog.Int("xid", int(logicalMsg.Xid)), + slog.Uint64("first_segment", uint64(logicalMsg.FirstSegment)), + ) + + case 
*pglogrepl.StreamStopMessageV2: + c.inStream = false + slog.Debug("Stream stop message") + case *pglogrepl.StreamCommitMessageV2: + slog.Debug("Stream commit message", slog.Int("xid", int(logicalMsg.Xid))) + case *pglogrepl.StreamAbortMessageV2: + slog.Debug("Stream abort message", slog.Int("xid", int(logicalMsg.Xid))) + default: + slog.Warn("Unknown message type", slog.String("type", fmt.Sprintf("%T", logicalMsg))) + } + + return nil +} + +func (c *ReplicationClient) tupleToMap(relation *pglogrepl.RelationMessageV2, data *pglogrepl.TupleData, onlyKey bool) *domain.Values { + if data == nil { + return nil + } + values := domain.NewValues() + for idx, col := range data.Columns { + isKey := relation.Columns[idx].Flags&1 > 0 + if onlyKey && !isKey { + continue + } + + colName := relation.Columns[idx].Name + switch col.DataType { + case pglogrepl.TupleDataTypeNull: + values.AddValue(isKey, colName, nil) + case pglogrepl.TupleDataTypeToast: // unchanged toast + // This TOAST value was not changed. TOAST values are not stored in the tuple, and logical replication doesn't want to spend a disk read to fetch its value for you. + case pglogrepl.TupleDataTypeBinary: + values.AddValue(isKey, colName, col.Data) + case pglogrepl.TupleDataTypeText: + val, err := c.decodeTextColumnData(col.Data, relation.Columns[idx].DataType) + if err != nil { + log.Fatalln("error decoding column data:", err) + } + values.AddValue(isKey, colName, val) + } + } + + return values +} + +func (c *ReplicationClient) decodeTextColumnData(data []byte, dataType uint32) (any, error) { + if dt, ok := c.typeMap.TypeForOID(dataType); ok { + decoded, err := dt.Codec.DecodeValue(c.typeMap, dataType, pgtype.TextFormatCode, data) + if err != nil { + return nil, err + } + + switch dataType { + case pgtype.UUIDOID: + raw := decoded.([16]byte) + return uuid.FromBytes(raw[:]) + default: + return decoded, nil + } + } + return string(data), nil +} diff --git a/infrastructure/rabbitmq/publisher.go b/infrastructure/rabbitmq/publisher.go new file mode 100644 index 0000000..595c347 --- /dev/null +++ b/infrastructure/rabbitmq/publisher.go @@ -0,0 +1,89 @@ +package rabbitmq + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net" + "strconv" + "strings" + + "github.com/wagslane/go-rabbitmq" + + "code.icb4dc0.de/prskr/pg_v_man/core/domain" + "code.icb4dc0.de/prskr/pg_v_man/core/ports" + "code.icb4dc0.de/prskr/pg_v_man/infrastructure/config" +) + +var _ ports.ReplicationEventConsumer = (*PublishingEventConsumer)(nil) + +func NewPublishingEventConsumer(ctx context.Context, cfg config.RabbitMQ) (consumer *PublishingEventConsumer, err error) { + var dialer net.Dialer + conn, err := rabbitmq.NewConn(cfg.ConnectionString, rabbitmq.WithConnectionOptionsLogging, func(options *rabbitmq.ConnectionOptions) { + options.Config.Dial = func(network, addr string) (net.Conn, error) { + return dialer.DialContext(ctx, network, addr) + } + }) + if err != nil { + return nil, fmt.Errorf("could not create RabbitMQ connection: %w", err) + } + + defer func() { + if err != nil { + err = errors.Join(err, conn.Close()) + } + }() + + publisherOptions := []func(*rabbitmq.PublisherOptions){ + rabbitmq.WithPublisherOptionsLogging, + rabbitmq.WithPublisherOptionsExchangeName(cfg.Exchange.Name), + rabbitmq.WithPublisherOptionsExchangeKind(cfg.Exchange.Kind), + rabbitmq.WithPublisherOptionsExchangeDeclare, + } + + if cfg.Exchange.Durable { + publisherOptions = append(publisherOptions, rabbitmq.WithPublisherOptionsExchangeDurable) + } + + publisher, err := 
rabbitmq.NewPublisher( + conn, + publisherOptions..., + ) + if err != nil { + return nil, fmt.Errorf("could not create RabbitMQ publisher: %w", err) + } + + return &PublishingEventConsumer{Conn: conn, Publisher: publisher, Cfg: cfg}, nil +} + +type PublishingEventConsumer struct { + Conn *rabbitmq.Conn + Publisher *rabbitmq.Publisher + Cfg config.RabbitMQ +} + +func (p PublishingEventConsumer) OnDataChange(ctx context.Context, ev domain.ReplicationEvent) error { + data, err := json.Marshal(ev) + if err != nil { + return fmt.Errorf("could not marshal event: %w", err) + } + return p.Publisher.PublishWithContext( + ctx, data, []string{p.Cfg.RoutingKey, strings.Join([]string{ev.DBName, ev.Namespace, ev.Relation}, ".")}, + rabbitmq.WithPublishOptionsContentType("application/json"), + rabbitmq.WithPublishOptionsExchange(p.Cfg.Exchange.Name), + rabbitmq.WithPublishOptionsCorrelationID(strconv.Itoa(int(ev.TransactionId))), + rabbitmq.WithPublishOptionsType(ev.EventType.String()), + rabbitmq.WithPublishOptionsHeaders(rabbitmq.Table{ + "db": ev.DBName, + "namespace": ev.Namespace, + "relation": ev.Relation, + }), + ) +} + +func (p PublishingEventConsumer) Close() error { + p.Publisher.Close() + + return p.Conn.Close() +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..04b95e0 --- /dev/null +++ b/main.go @@ -0,0 +1,22 @@ +package main + +import ( + "context" + "fmt" + "os" + "os/signal" + + "code.icb4dc0.de/prskr/pg_v_man/cmd" +) + +func main() { + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) + + err := cmd.RunApp(ctx) + stop() + + if err != nil { + fmt.Printf("Error: %v\n", err) + os.Exit(1) + } +}
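For reference, the payload that ends up in the `letterbox` queue is simply the JSON-marshalled `ReplicationEvent` from `core/domain/repl_event.go`. The following standalone sketch is not part of the patch itself; it builds an event the way the replication client would for the seeded `public.tasks` row (only a subset of the columns, and the transaction id and values are made up for illustration) and prints the JSON the publisher would send:

```go
package main

import (
	"encoding/json"
	"fmt"

	"code.icb4dc0.de/prskr/pg_v_man/core/domain"
)

func main() {
	// Columns roughly as the replication client would map them for
	// INSERT INTO public.tasks (list_id, category_id, title) VALUES (1, 1, 'Orange Juice');
	// the identity column "id" is part of the replica identity key.
	values := domain.NewValues()
	values.AddValue(true, "id", 1)
	values.AddValue(false, "list_id", 1)
	values.AddValue(false, "category_id", 1)
	values.AddValue(false, "title", "Orange Juice")

	ev := domain.ReplicationEvent{
		EventType:     domain.EventTypeInsert,
		TransactionId: 742, // illustrative xid; in reality taken from the Begin message
		DBName:        "postgres",
		Namespace:     "public",
		Relation:      "tasks",
		NewValues:     values,
	}

	payload, err := json.MarshalIndent(ev, "", "  ")
	if err != nil {
		panic(err)
	}

	fmt.Println(string(payload))
}
```

Note that `Values` carries no JSON struct tags, so its maps serialize under the capitalized keys `Key` and `Data`, while the event fields use the camelCase names from their tags (`eventType`, `transactionId`, and so on). The publisher sends each event with two routing keys, the configured `pg_v_man` key and `<db>.<namespace>.<relation>`; with the `*` topic binding from `definitions.json`, only the single-word `pg_v_man` key actually matches the `letterbox` queue.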