commit 866e9908a86cf52f23a6651112609cff30385772
Author: Peter Kurfer <peter.kurfer@rwe.com>
Date:   Fri Feb 14 08:44:34 2025 +0100

    initial commit

diff --git a/.editorconfig b/.editorconfig
new file mode 100644
index 0000000..2af8321
--- /dev/null
+++ b/.editorconfig
@@ -0,0 +1,27 @@
+root = true
+
+[*]
+charset = utf-8
+end_of_line = lf
+indent_size = 4
+tab_width = 4
+indent_style = space
+insert_final_newline = false
+max_line_length = 120
+trim_trailing_whitespace = true
+
+[*.go]
+indent_style = tab
+ij_smart_tabs = true
+ij_go_GROUP_CURRENT_PROJECT_IMPORTS = true
+ij_go_group_stdlib_imports = true
+ij_go_import_sorting = goimports
+ij_go_local_group_mode = project
+ij_go_move_all_imports_in_one_declaration = true
+ij_go_move_all_stdlib_imports_in_one_group = true
+ij_go_remove_redundant_import_aliases = true
+
+[*.{yml,yaml}]
+indent_size = 2
+tab_width = 2
+insert_final_newline = true
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..8c2dd28
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+out/
+.idea/
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..48824fd
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,26 @@
+FROM docker.io/golang:1.24-alpine AS builder
+
+ENV CGO_ENABLED=0
+
+WORKDIR /src
+
+RUN \
+    --mount=type=cache,target=/go/pkg/mod \
+    --mount=type=bind,source=./go.mod,target=./go.mod,rw=false \
+    --mount=type=bind,source=./go.sum,target=./go.sum,rw=false \
+    go mod download
+
+COPY . ./
+
+RUN --mount=type=cache,target=/root/.cache/go-build \
+    --mount=type=cache,target=/go/pkg/mod \
+    mkdir -p ./out && \
+    go build -o ./out/pg_v_man -trimpath -ldflags '-s -w' main.go
+
+FROM gcr.io/distroless/static:nonroot
+
+WORKDIR /app
+
+COPY --from=builder /src/out/pg_v_man ./
+
+ENTRYPOINT ["/app/pg_v_man"]
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..6079194
--- /dev/null
+++ b/README.md
@@ -0,0 +1,14 @@
+# pg_v_man
+
+## Getting started
+
+To get started, simply start the Docker Compose stack:
+
+```bash
+docker compose up
+```
+
+You can then open the [RabbitMQ management UI](http://localhost:15672) and watch messages coming in in the `letterbox` queue whenever you manipulate any data in the database.
+The database port is by default 5432 (as always) and credentials can be found in the [compose.yml](./compose.yml).
+
+Right now no pgAdmin/cloud-beaver or anything are part of the stack, feel free to use whatever Postgres tool you prefer :).
\ No newline at end of file
diff --git a/assets/db/01_init.sql b/assets/db/01_init.sql
new file mode 100644
index 0000000..28ffb1b
--- /dev/null
+++ b/assets/db/01_init.sql
@@ -0,0 +1,59 @@
+CREATE PUBLICATION v_man_1 FOR ALL TABLES;
+
+create extension pgcrypto;
+
+create table users (
+    id bigint primary key generated always as identity,
+    username text not null unique,
+    email text not null unique,
+    password_hash text not null
+);
+
+create table categories (
+    id bigint primary key generated always as identity,
+    name text not null unique
+);
+
+create table lists (
+    id bigint primary key generated always as identity,
+    user_id bigint not null references users (id),
+    name text not null
+);
+
+create table tasks (
+    id bigint primary key generated always as identity,
+    list_id bigint not null references lists (id),
+    category_id bigint references categories (id),
+    title text not null,
+    description text,
+    due_date date,
+    priority int,
+    completed boolean default false
+);
+
+INSERT INTO
+    users (username, email, password_hash)
+VALUES
+    (
+        'ted.tester',
+        'ted.tester@example.com',
+        crypt ('password', gen_salt ('bf'))
+    );
+
+INSERT INTO
+    categories (name)
+VALUES
+    ('Groceries'),
+    ('Work'),
+    ('Personal'),
+    ('Other');
+
+INSERT INTO
+    public.lists (user_id, name)
+VALUES
+    (1, 'Groceries');
+
+INSERT INTO
+    public.tasks (list_id, category_id, title)
+VALUES
+    (1, 1, 'Orange Juice');
diff --git a/assets/rabbitmq/definitions.json b/assets/rabbitmq/definitions.json
new file mode 100644
index 0000000..26ed99e
--- /dev/null
+++ b/assets/rabbitmq/definitions.json
@@ -0,0 +1,80 @@
+{
+    "rabbit_version": "4.0.5",
+    "rabbitmq_version": "4.0.5",
+    "product_name": "RabbitMQ",
+    "product_version": "4.0.5",
+    "rabbitmq_definition_format": "cluster",
+    "original_cluster_name": "rabbit@553990bf8169",
+    "explanation": "Definitions of cluster 'rabbit@553990bf8169'",
+    "users": [
+        {
+            "name": "v_man",
+            "password_hash": "ASzCoAwdiANYvE0ySlYTw76+1u6Vda24cyafLJfSb8eiVmKp",
+            "hashing_algorithm": "rabbit_password_hashing_sha256",
+            "tags": ["administrator"],
+            "limits": {}
+        }
+    ],
+    "vhosts": [
+        {
+            "name": "/",
+            "description": "Default virtual host",
+            "metadata": {
+                "description": "Default virtual host",
+                "tags": [],
+                "default_queue_type": "classic"
+            },
+            "tags": [],
+            "default_queue_type": "classic"
+        }
+    ],
+    "permissions": [
+        {
+            "user": "v_man",
+            "vhost": "/",
+            "configure": ".*",
+            "write": ".*",
+            "read": ".*"
+        }
+    ],
+    "topic_permissions": [],
+    "parameters": [],
+    "global_parameters": [
+        { "name": "cluster_tags", "value": [] },
+        {
+            "name": "internal_cluster_id",
+            "value": "rabbitmq-cluster-id-zBQKaZR5QrD8CTz1RhYHag"
+        }
+    ],
+    "policies": [],
+    "queues": [
+        {
+            "name": "letterbox",
+            "vhost": "/",
+            "durable": true,
+            "auto_delete": false,
+            "arguments": { "x-max-length": 1000, "x-queue-type": "classic" }
+        }
+    ],
+    "exchanges": [
+        {
+            "name": "pg_v_man",
+            "vhost": "/",
+            "type": "topic",
+            "durable": true,
+            "auto_delete": false,
+            "internal": false,
+            "arguments": {}
+        }
+    ],
+    "bindings": [
+        {
+            "source": "pg_v_man",
+            "vhost": "/",
+            "destination": "letterbox",
+            "destination_type": "queue",
+            "routing_key": "*",
+            "arguments": {}
+        }
+    ]
+}
diff --git a/assets/rabbitmq/enabled_plugins b/assets/rabbitmq/enabled_plugins
new file mode 100644
index 0000000..318ea04
--- /dev/null
+++ b/assets/rabbitmq/enabled_plugins
@@ -0,0 +1 @@
+[rabbitmq_management,rabbitmq_prometheus].
\ No newline at end of file
diff --git a/assets/rabbitmq/rabbitmq.conf b/assets/rabbitmq/rabbitmq.conf
new file mode 100644
index 0000000..c3b0b46
--- /dev/null
+++ b/assets/rabbitmq/rabbitmq.conf
@@ -0,0 +1,7 @@
+default_user = $(RABBITMQ_DEFAULT_USER)
+default_pass = $(RABBITMQ_DEFAULT_PASS)
+
+definitions.import_backend = local_filesystem
+definitions.local.path = /etc/rabbitmq/definitions.json
+
+log.console = true
\ No newline at end of file
diff --git a/cmd/app.go b/cmd/app.go
new file mode 100644
index 0000000..c4c6b1e
--- /dev/null
+++ b/cmd/app.go
@@ -0,0 +1,59 @@
+package cmd
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"log/slog"
+	"os"
+
+	"github.com/alecthomas/kong"
+
+	"code.icb4dc0.de/prskr/pg_v_man/infrastructure/config"
+	"code.icb4dc0.de/prskr/pg_v_man/infrastructure/db"
+	"code.icb4dc0.de/prskr/pg_v_man/infrastructure/rabbitmq"
+)
+
+func RunApp(ctx context.Context) error {
+	var app App
+
+	return kong.Parse(
+		&app,
+		kong.Name("replication-emitter"),
+		kong.BindTo(ctx, (*context.Context)(nil)),
+	).Run()
+}
+
+type App struct {
+	Logging  config.Logging  `embed:"" prefix:"logging."`
+	DB       config.DB       `embed:"" prefix:"db."`
+	RabbitMQ config.RabbitMQ `embed:"" prefix:"rabbitmq."`
+}
+
+func (a *App) Run(ctx context.Context) (err error) {
+	publisher, err := rabbitmq.NewPublishingEventConsumer(ctx, a.RabbitMQ)
+	if err != nil {
+		return fmt.Errorf("could not create publishing event consumer: %w", err)
+	}
+
+	replClient, err := db.NewReplicationClient(ctx, a.DB, publisher)
+	if err != nil {
+		return fmt.Errorf("could not create replication client: %w", err)
+	}
+
+	defer func() {
+		err = errors.Join(err, replClient.Close(context.Background()))
+	}()
+
+	return replClient.Receive(ctx)
+}
+
+func (a *App) AfterApply(kongCtx *kong.Context) error {
+	defaultLogger := slog.New(slog.NewJSONHandler(os.Stderr, a.Logging.Options()))
+
+	slog.SetDefault(defaultLogger)
+
+	kongCtx.Bind(defaultLogger)
+
+	return nil
+}
diff --git a/compose.yml b/compose.yml
new file mode 100644
index 0000000..2b75366
--- /dev/null
+++ b/compose.yml
@@ -0,0 +1,70 @@
+services:
+  pg_v_man:
+    build:
+      context: .
+      dockerfile: Dockerfile
+    environment:
+      DB_CONNECTION_STRING: postgresql://postgres:postgres@postgres:5432/postgres?replication=database
+      DB_PUBLICATION: v_man_1
+      RABBITMQ_CONNECTION_STRING: amqp://v_man:ies6ohF8@rabbitmq:5672/
+    restart: always
+    depends_on:
+      - rabbitmq
+      - postgres
+
+  postgres:
+    image: postgres:17.3
+    command:
+      - "postgres"
+      - "-c"
+      - "wal_level=logical"
+    ports:
+      - target: 5432
+        published: 5432
+        protocol: tcp
+    environment:
+      POSTGRES_USER: postgres
+      POSTGRES_PASSWORD: postgres
+      POSTGRES_DB: postgres
+    volumes:
+      - type: bind
+        source: ./assets/db
+        target: /docker-entrypoint-initdb.d
+      - type: volume
+        source: postgres-data
+        target: /var/lib/postgresql/data
+    healthcheck:
+      test: ["CMD-SHELL", "pg_isready"]
+      interval: 10s
+      timeout: 5s
+      retries: 5
+
+  rabbitmq:
+    image: rabbitmq:4-management-alpine
+    ports:
+      - target: 5672
+        published: 5672
+        protocol: tcp
+      - target: 15672
+        published: 15672
+        protocol: tcp
+    environment:
+      RABBITMQ_DEFAULT_USER: v_man
+      RABBITMQ_DEFAULT_PASS: ies6ohF8
+      RABBITMQ_DEFAULT_VHOST: /
+    volumes:
+      - type: bind
+        source: ./assets/rabbitmq
+        target: /etc/rabbitmq
+      - type: volume
+        source: rabbitmq-data
+        target: /var/lib/rabbitmq
+    healthcheck:
+      test: rabbitmq-diagnostics -q ping
+      interval: 30s
+      timeout: 25s
+      retries: 3
+
+volumes:
+  rabbitmq-data:
+  postgres-data:
diff --git a/core/domain/repl_event.go b/core/domain/repl_event.go
new file mode 100644
index 0000000..72ed41b
--- /dev/null
+++ b/core/domain/repl_event.go
@@ -0,0 +1,44 @@
+package domain
+
+type EventType string
+
+func (e EventType) String() string {
+	return string(e)
+}
+
+const (
+	EventTypeInsert   EventType = "INSERT"
+	EventTypeUpdate   EventType = "UPDATE"
+	EventTypeDelete   EventType = "DELETE"
+	EventTypeTruncate EventType = "TRUNCATE"
+)
+
+func NewValues() *Values {
+	return &Values{
+		Key:  make(map[string]any),
+		Data: make(map[string]any),
+	}
+}
+
+func (v *Values) AddValue(partOfKey bool, key string, value any) {
+	if partOfKey {
+		v.Key[key] = value
+	} else {
+		v.Data[key] = value
+	}
+}
+
+type Values struct {
+	Key  map[string]any
+	Data map[string]any
+}
+
+type ReplicationEvent struct {
+	EventType     EventType `json:"eventType"`
+	TransactionId uint32    `json:"transactionId"`
+	DBName        string    `json:"dbName"`
+	Namespace     string    `json:"namespace"`
+	Relation      string    `json:"relation"`
+	NewValues     *Values   `json:"newValues,omitempty"`
+	OldValues     *Values   `json:"oldValues,omitempty"`
+}
diff --git a/core/ports/repl_event_consumer.go b/core/ports/repl_event_consumer.go
new file mode 100644
index 0000000..ab34414
--- /dev/null
+++ b/core/ports/repl_event_consumer.go
@@ -0,0 +1,11 @@
+package ports
+
+import (
+	"context"
+
+	"code.icb4dc0.de/prskr/pg_v_man/core/domain"
+)
+
+type ReplicationEventConsumer interface {
+	OnDataChange(ctx context.Context, ev domain.ReplicationEvent) error
+}
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..d560fcd
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,22 @@
+module code.icb4dc0.de/prskr/pg_v_man
+
+go 1.24
+
+toolchain go1.24.0
+
+require (
+	github.com/alecthomas/kong v0.9.0
+	github.com/google/uuid v1.6.0
+	github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9
+	github.com/jackc/pgx/v5 v5.6.0
+	github.com/wagslane/go-rabbitmq v0.13.0
+)
+
+require (
+	github.com/jackc/pgio v1.0.0 // indirect
+	github.com/jackc/pgpassfile v1.0.0 // indirect
+	github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
+	github.com/rabbitmq/amqp091-go v1.10.0 // indirect
+	golang.org/x/crypto v0.24.0 // indirect
+	golang.org/x/text v0.16.0 // indirect
+)
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..e866489
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,48 @@
+github.com/alecthomas/assert/v2 v2.6.0 h1:o3WJwILtexrEUk3cUVal3oiQY2tfgr/FHWiz/v2n4FU=
+github.com/alecthomas/assert/v2 v2.6.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k=
+github.com/alecthomas/kong v0.9.0 h1:G5diXxc85KvoV2f0ZRVuMsi45IrBgx9zDNGNj165aPA=
+github.com/alecthomas/kong v0.9.0/go.mod h1:Y47y5gKfHp1hDc7CH7OeXgLIpp+Q2m1Ni0L5s3bI8Os=
+github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc=
+github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
+github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM=
+github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg=
+github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE=
+github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8=
+github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9 h1:86CQbMauoZdLS0HDLcEHYo6rErjiCBjVvcxGsioIn7s=
+github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9/go.mod h1:SO15KF4QqfUM5UhsG9roXre5qeAQLC1rm8a8Gjpgg5k=
+github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
+github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
+github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
+github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
+github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY=
+github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw=
+github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
+github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
+github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
+github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
+github.com/wagslane/go-rabbitmq v0.13.0 h1:u2JfKbwi3cbxCExKV34RrhKBZjW2HoRwyPTA8pERyrs=
+github.com/wagslane/go-rabbitmq v0.13.0/go.mod h1:1sUJ53rrW2AIA7LEp8ymmmebHqqq8ksH/gXIfUP0I0s=
+go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
+go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
+golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI=
+golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM=
+golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
+golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
+golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
+golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/infrastructure/config/db.go b/infrastructure/config/db.go
new file mode 100644
index 0000000..1259ecd
--- /dev/null
+++ b/infrastructure/config/db.go
@@ -0,0 +1,7 @@
+package config
+
+type DB struct {
+	ConnectionString string `name:"connection-string" env:"DB_CONNECTION_STRING" help:"Database connection string"`
+	Publication      string `name:"publication" env:"DB_PUBLICATION" help:"Database publication name" default:"goplication"`
+	SlotName         string `name:"slot-name" env:"DB_REPLICATION_SLOT_NAME" help:"Database replication slot name" default:"pg_v_man"`
+}
diff --git a/infrastructure/config/logging.go b/infrastructure/config/logging.go
new file mode 100644
index 0000000..a6600e7
--- /dev/null
+++ b/infrastructure/config/logging.go
@@ -0,0 +1,15 @@
+package config
+
+import "log/slog"
+
+type Logging struct {
+	AddSource bool       `env:"LOG_ADD_SOURCE" name:"add-source" default:"false"`
+	Level     slog.Level `env:"LOG_LEVEL" name:"level" default:"info" help:"Log level to apply"`
+}
+
+func (l Logging) Options() *slog.HandlerOptions {
+	return &slog.HandlerOptions{
+		Level:     l.Level,
+		AddSource: l.AddSource,
+	}
+}
diff --git a/infrastructure/config/rabbitmq.go b/infrastructure/config/rabbitmq.go
new file mode 100644
index 0000000..7c941f0
--- /dev/null
+++ b/infrastructure/config/rabbitmq.go
@@ -0,0 +1,11 @@
+package config
+
+type RabbitMQ struct {
+	ConnectionString string `name:"connection-string" env:"RABBITMQ_CONNECTION_STRING" help:"RabbitMQ connection string"`
+	Exchange         struct {
+		Name    string `name:"name" env:"RABBITMQ_EXCHANGE_NAME" help:"RabbitMQ exchange name" default:"pg_v_man"`
+		Kind    string `name:"kind" env:"RABBITMQ_EXCHANGE_KIND" help:"RabbitMQ exchange kind" default:"topic"`
+		Durable bool   `name:"durable" env:"RABBITMQ_EXCHANGE_DURABLE" help:"RabbitMQ exchange durable" default:"true"`
+	} `embed:"" prefix:"exchange."`
+	RoutingKey string `name:"routing-key" env:"RABBITMQ_ROUTING_KEY" help:"RabbitMQ routing key" default:"pg_v_man"`
+}
diff --git a/infrastructure/db/replication_client.go b/infrastructure/db/replication_client.go
new file mode 100644
index 0000000..4c25179
--- /dev/null
+++ b/infrastructure/db/replication_client.go
@@ -0,0 +1,385 @@
+package db
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"log"
+	"log/slog"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/jackc/pglogrepl"
+	"github.com/jackc/pgx/v5/pgconn"
+	"github.com/jackc/pgx/v5/pgproto3"
+	"github.com/jackc/pgx/v5/pgtype"
+
+	"code.icb4dc0.de/prskr/pg_v_man/core/domain"
+	"code.icb4dc0.de/prskr/pg_v_man/core/ports"
+	"code.icb4dc0.de/prskr/pg_v_man/infrastructure/config"
+)
+
+const (
+	outputPlugin           = "pgoutput"
+	defaultConsumerTimeout = 200 * time.Millisecond
+)
+
+func NewReplicationClient(ctx context.Context, cfg config.DB, consumer ports.ReplicationEventConsumer) (client *ReplicationClient, err error) {
+	conn, err := pgconn.Connect(ctx, cfg.ConnectionString)
+	if err != nil {
+		return nil, fmt.Errorf("could not connect to database: %w", err)
+	}
+
+	defer func() {
+		if err != nil {
+			err = errors.Join(err, conn.Close(context.Background()))
+		}
+	}()
+
+	client = &ReplicationClient{
+		Conn:            conn,
+		Consumer:        consumer,
+		ConsumerTimeout: defaultConsumerTimeout,
+		typeMap:         pgtype.NewMap(),
+		relations:       make(map[uint32]*pglogrepl.RelationMessageV2),
+	}
+
+	client.sysident, err = pglogrepl.IdentifySystem(ctx, conn)
+	if err != nil {
+		return nil, fmt.Errorf("could not identify system: %w", err)
+	}
+
+	slog.Info(
+		"System identity",
+		slog.String("systemId", client.sysident.SystemID),
+		slog.Int("timeline", int(client.sysident.Timeline)),
+		slog.String("xlogpos", client.sysident.XLogPos.String()),
+		slog.String("dbname", client.sysident.DBName),
+	)
+
+	_, err = pglogrepl.CreateReplicationSlot(ctx, conn, cfg.SlotName, outputPlugin, pglogrepl.CreateReplicationSlotOptions{Temporary: true})
+	if err != nil {
+		return nil, fmt.Errorf("could not create replication slot: %w", err)
+	}
+
+	slog.Info("Replication slot created", slog.String("slotName", cfg.SlotName))
+
+	pluginArguments := []string{
+		"proto_version '2'",
+		fmt.Sprintf("publication_names '%s'", cfg.Publication),
+		"messages 'true'",
+		"streaming 'true'",
+	}
+
+	err = pglogrepl.StartReplication(ctx, conn, cfg.SlotName, client.sysident.XLogPos, pglogrepl.StartReplicationOptions{PluginArgs: pluginArguments})
+	if err != nil {
+		return nil, fmt.Errorf("could not start replication: %w", err)
+	}
+
+	slog.Info("Replication started", slog.String("slotName", cfg.SlotName))
+
+	return client, nil
+}
+
+type ReplicationClient struct {
+	ConsumerTimeout time.Duration
+	Conn            *pgconn.PgConn
+	Consumer        ports.ReplicationEventConsumer
+	sysident        pglogrepl.IdentifySystemResult
+	typeMap         *pgtype.Map
+	relations       map[uint32]*pglogrepl.RelationMessageV2
+	latestXid       uint32
+	inStream        bool
+}
+
+func (c *ReplicationClient) Receive(ctx context.Context) (err error) {
+	clientXLogPos := c.sysident.XLogPos
+	standbyMessageTimeout := time.Second * 10
+	nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)
+
+	for ctx.Err() == nil {
+		if time.Now().After(nextStandbyMessageDeadline) {
+			err = pglogrepl.SendStandbyStatusUpdate(context.Background(), c.Conn, pglogrepl.StandbyStatusUpdate{WALWritePosition: clientXLogPos})
+			if err != nil {
+				return fmt.Errorf("SendStandbyStatusUpdate failed: %w", err)
+			}
+
+			slog.Debug("Sent Standby status message", slog.String("xlogpos", clientXLogPos.String()))
+			nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
+		}
+
+		receiveCtx, stop := context.WithDeadline(ctx, nextStandbyMessageDeadline)
+		rawMsg, err := c.Conn.ReceiveMessage(receiveCtx)
+		stop()
+
+		if err != nil {
+			if pgconn.Timeout(err) {
+				continue
+			}
+
+			return fmt.Errorf("could not receive message: %w", err)
+		}
+
+		if errMsg, ok := rawMsg.(*pgproto3.ErrorResponse); ok {
+			slog.Error("Error response", slog.String("message", errMsg.Message))
+			continue
+		}
+
+		msg, ok := rawMsg.(*pgproto3.CopyData)
+		if !ok {
+			slog.Warn("Received unexpected message", slog.String("message", fmt.Sprintf("%T", rawMsg)))
+			continue
+		}
+
+		switch msg.Data[0] {
+		case pglogrepl.PrimaryKeepaliveMessageByteID:
+			pkm, err := pglogrepl.ParsePrimaryKeepaliveMessage(msg.Data[1:])
+			if err != nil {
+				return fmt.Errorf("ParsePrimaryKeepaliveMessage failed: %w", err)
+			}
+
+			slog.Debug(
+				"Primary Keepalive Message",
+				slog.String("ServerWALEnd", pkm.ServerWALEnd.String()),
+				slog.Time("ServerTime", pkm.ServerTime),
+				slog.Bool("ReplyRequested", pkm.ReplyRequested),
+			)
+
+			if pkm.ServerWALEnd > clientXLogPos {
+				clientXLogPos = pkm.ServerWALEnd
+			}
+			if pkm.ReplyRequested {
+				nextStandbyMessageDeadline = time.Time{}
+			}
+
+		case pglogrepl.XLogDataByteID:
+			xld, err := pglogrepl.ParseXLogData(msg.Data[1:])
+			if err != nil {
+				return fmt.Errorf("ParseXLogData failed: %w", err)
+			}
+
+			slog.Debug(
+				"Received log data",
+				slog.String("WALStart", xld.WALStart.String()),
+				slog.String("ServerWALEnd", xld.ServerWALEnd.String()),
+				slog.Time("ServerTime", xld.ServerTime),
+			)
+
+			processCtx, stop := context.WithTimeout(ctx, c.ConsumerTimeout)
+			err = c.handleReceivedLog(processCtx, xld.WALData)
+			stop()
+
+			if err != nil {
+				return err
+			}
+
+			if xld.WALStart > clientXLogPos {
+				clientXLogPos = xld.WALStart
+			}
+		}
+
+	}
+
+	return nil
+}
+
+func (c *ReplicationClient) Close(ctx context.Context) error {
+	return c.Conn.Close(ctx)
+}
+
+func (c *ReplicationClient) handleReceivedLog(ctx context.Context, walData []byte) error {
+	logicalMsg, err := pglogrepl.ParseV2(walData, c.inStream)
+	if err != nil {
+		return fmt.Errorf("parse logical replication message: %w", err)
+	}
+
+	slog.Debug("Receive a logical replication message", slog.String("type", logicalMsg.Type().String()))
+
+	switch logicalMsg := logicalMsg.(type) {
+	case *pglogrepl.RelationMessageV2:
+		c.relations[logicalMsg.RelationID] = logicalMsg
+
+	case *pglogrepl.BeginMessage:
+		// Indicates the beginning of a group of changes in a transaction. This is only sent for committed transactions. You won't get any events from rolled back transactions.
+		slog.Debug("Begin message", slog.Int("xid", int(logicalMsg.Xid)))
+		c.latestXid = logicalMsg.Xid
+	case *pglogrepl.CommitMessage:
+		slog.Debug("Commit message", slog.String("commit_lsn", logicalMsg.CommitLSN.String()))
+	case *pglogrepl.InsertMessageV2:
+		rel, ok := c.relations[logicalMsg.RelationID]
+		if !ok {
+			return fmt.Errorf("unknown relation ID %d", logicalMsg.RelationID)
+		}
+
+		slog.Debug("Received insert message",
+			slog.Uint64("xid", uint64(c.latestXid)),
+			slog.String("namespace", rel.Namespace),
+			slog.String("relation", rel.RelationName),
+			slog.Bool("in_stream", c.inStream),
+		)
+
+		ev := domain.ReplicationEvent{
+			EventType:     domain.EventTypeInsert,
+			TransactionId: c.latestXid,
+			DBName:        c.sysident.DBName,
+			Namespace:     rel.Namespace,
+			Relation:      rel.RelationName,
+			NewValues:     c.tupleToMap(rel, logicalMsg.Tuple, false),
+		}
+
+		return c.Consumer.OnDataChange(ctx, ev)
+
+	case *pglogrepl.UpdateMessageV2:
+		rel, ok := c.relations[logicalMsg.RelationID]
+		if !ok {
+			return fmt.Errorf("unknown relation ID %d", logicalMsg.RelationID)
+		}
+
+		slog.Debug(
+			"Received update message",
+			slog.Uint64("xid", uint64(c.latestXid)),
+			slog.String("namespace", rel.Namespace),
+			slog.String("relation", rel.RelationName),
+			slog.Bool("in_stream", c.inStream),
+		)
+
+		ev := domain.ReplicationEvent{
+			EventType:     domain.EventTypeUpdate,
+			TransactionId: c.latestXid,
+			DBName:        c.sysident.DBName,
+			Namespace:     rel.Namespace,
+			Relation:      rel.RelationName,
+			NewValues:     c.tupleToMap(rel, logicalMsg.NewTuple, false),
+			OldValues:     c.tupleToMap(rel, logicalMsg.OldTuple, false),
+		}
+
+		return c.Consumer.OnDataChange(ctx, ev)
+	case *pglogrepl.DeleteMessageV2:
+		rel, ok := c.relations[logicalMsg.RelationID]
+		if !ok {
+			return fmt.Errorf("unknown relation ID %d", logicalMsg.RelationID)
+		}
+
+		slog.Debug(
+			"Received deletion message",
+			slog.Uint64("xid", uint64(c.latestXid)),
+			slog.String("namespace", rel.Namespace),
+			slog.String("relation", rel.RelationName),
+			slog.Bool("in_stream", c.inStream),
+		)
+
+		ev := domain.ReplicationEvent{
+			EventType:     domain.EventTypeDelete,
+			TransactionId: c.latestXid,
+			DBName:        c.sysident.DBName,
+			Namespace:     rel.Namespace,
+			Relation:      rel.RelationName,
+			OldValues:     c.tupleToMap(rel, logicalMsg.OldTuple, true),
+		}
+
+		return c.Consumer.OnDataChange(ctx, ev)
+	case *pglogrepl.TruncateMessageV2:
+		for _, relId := range logicalMsg.RelationIDs {
+			rel, ok := c.relations[relId]
+			if !ok {
+				return fmt.Errorf("unknown relation ID %d", relId)
+			}
+
+			slog.Debug(
+				"Received truncation message",
+				slog.Uint64("xid", uint64(c.latestXid)),
+				slog.String("namespace", rel.Namespace),
+				slog.String("relation", rel.RelationName), slog.Int("xid", int(logicalMsg.Xid)),
+				slog.Bool("in_stream", c.inStream),
+			)
+
+			ev := domain.ReplicationEvent{
+				EventType:     domain.EventTypeTruncate,
+				TransactionId: c.latestXid,
+				DBName:        c.sysident.DBName,
+				Namespace:     rel.Namespace,
+				Relation:      rel.RelationName,
+			}
+
+			if err := c.Consumer.OnDataChange(ctx, ev); err != nil {
+				return err
+			}
+		}
+	case *pglogrepl.TypeMessageV2:
+	case *pglogrepl.OriginMessage:
+	case *pglogrepl.LogicalDecodingMessageV2:
+		slog.Debug(
+			"Logical decoding message",
+			slog.String("prefix", logicalMsg.Prefix),
+			slog.Any("content", logicalMsg.Content),
+			slog.Int("xid", int(logicalMsg.Xid)),
+		)
+	case *pglogrepl.StreamStartMessageV2:
+		c.inStream = true
+		slog.Debug("Stream start message",
+			slog.Int("xid", int(logicalMsg.Xid)),
+			slog.Uint64("first_segment", uint64(logicalMsg.FirstSegment)),
+		)
+
+	case *pglogrepl.StreamStopMessageV2:
+		c.inStream = false
+		slog.Debug("Stream stop message")
+	case *pglogrepl.StreamCommitMessageV2:
+		slog.Debug("Stream commit message", slog.Int("xid", int(logicalMsg.Xid)))
+	case *pglogrepl.StreamAbortMessageV2:
+		slog.Debug("Stream abort message", slog.Int("xid", int(logicalMsg.Xid)))
+	default:
+		slog.Warn("Unknown message type", slog.String("type", fmt.Sprintf("%T", logicalMsg)))
+	}
+
+	return nil
+}
+
+func (c *ReplicationClient) tupleToMap(relation *pglogrepl.RelationMessageV2, data *pglogrepl.TupleData, onlyKey bool) *domain.Values {
+	if data == nil {
+		return nil
+	}
+	values := domain.NewValues()
+	for idx, col := range data.Columns {
+		isKey := relation.Columns[idx].Flags&1 > 0
+		if onlyKey && !isKey {
+			continue
+		}
+
+		colName := relation.Columns[idx].Name
+		switch col.DataType {
+		case pglogrepl.TupleDataTypeNull:
+			values.AddValue(isKey, colName, nil)
+		case pglogrepl.TupleDataTypeToast: // unchanged toast
+			// This TOAST value was not changed. TOAST values are not stored in the tuple, and logical replication doesn't want to spend a disk read to fetch its value for you.
+		case pglogrepl.TupleDataTypeBinary:
+			values.AddValue(isKey, colName, col.Data)
+		case pglogrepl.TupleDataTypeText:
+			val, err := c.decodeTextColumnData(col.Data, relation.Columns[idx].DataType)
+			if err != nil {
+				log.Fatalln("error decoding column data:", err)
+			}
+			values.AddValue(isKey, colName, val)
+		}
+	}
+
+	return values
+}
+
+func (c *ReplicationClient) decodeTextColumnData(data []byte, dataType uint32) (any, error) {
+	if dt, ok := c.typeMap.TypeForOID(dataType); ok {
+		decoded, err := dt.Codec.DecodeValue(c.typeMap, dataType, pgtype.TextFormatCode, data)
+		if err != nil {
+			return nil, err
+		}
+
+		switch dataType {
+		case pgtype.UUIDOID:
+			raw := decoded.([16]byte)
+			return uuid.FromBytes(raw[:])
+		default:
+			return decoded, nil
+		}
+	}
+	return string(data), nil
+}
diff --git a/infrastructure/rabbitmq/publisher.go b/infrastructure/rabbitmq/publisher.go
new file mode 100644
index 0000000..595c347
--- /dev/null
+++ b/infrastructure/rabbitmq/publisher.go
@@ -0,0 +1,89 @@
+package rabbitmq
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"net"
+	"strconv"
+	"strings"
+
+	"github.com/wagslane/go-rabbitmq"
+
+	"code.icb4dc0.de/prskr/pg_v_man/core/domain"
+	"code.icb4dc0.de/prskr/pg_v_man/core/ports"
+	"code.icb4dc0.de/prskr/pg_v_man/infrastructure/config"
+)
+
+var _ ports.ReplicationEventConsumer = (*PublishingEventConsumer)(nil)
+
+func NewPublishingEventConsumer(ctx context.Context, cfg config.RabbitMQ) (consumer *PublishingEventConsumer, err error) {
+	var dialer net.Dialer
+	conn, err := rabbitmq.NewConn(cfg.ConnectionString, rabbitmq.WithConnectionOptionsLogging, func(options *rabbitmq.ConnectionOptions) {
+		options.Config.Dial = func(network, addr string) (net.Conn, error) {
+			return dialer.DialContext(ctx, network, addr)
+		}
+	})
+	if err != nil {
+		return nil, fmt.Errorf("could not create RabbitMQ connection: %w", err)
+	}
+
+	defer func() {
+		if err != nil {
+			err = errors.Join(err, conn.Close())
+		}
+	}()
+
+	publisherOptions := []func(*rabbitmq.PublisherOptions){
+		rabbitmq.WithPublisherOptionsLogging,
+		rabbitmq.WithPublisherOptionsExchangeName(cfg.Exchange.Name),
+		rabbitmq.WithPublisherOptionsExchangeKind(cfg.Exchange.Kind),
+		rabbitmq.WithPublisherOptionsExchangeDeclare,
+	}
+
+	if cfg.Exchange.Durable {
+		publisherOptions = append(publisherOptions, rabbitmq.WithPublisherOptionsExchangeDurable)
+	}
+
+	publisher, err := rabbitmq.NewPublisher(
+		conn,
+		publisherOptions...,
+	)
+	if err != nil {
+		return nil, fmt.Errorf("could not create RabbitMQ publisher: %w", err)
+	}
+
+	return &PublishingEventConsumer{Conn: conn, Publisher: publisher, Cfg: cfg}, nil
+}
+
+type PublishingEventConsumer struct {
+	Conn      *rabbitmq.Conn
+	Publisher *rabbitmq.Publisher
+	Cfg       config.RabbitMQ
+}
+
+func (p PublishingEventConsumer) OnDataChange(ctx context.Context, ev domain.ReplicationEvent) error {
+	data, err := json.Marshal(ev)
+	if err != nil {
+		return fmt.Errorf("could not marshal event: %w", err)
+	}
+	return p.Publisher.PublishWithContext(
+		ctx, data, []string{p.Cfg.RoutingKey, strings.Join([]string{ev.DBName, ev.Namespace, ev.Relation}, ".")},
+		rabbitmq.WithPublishOptionsContentType("application/json"),
+		rabbitmq.WithPublishOptionsExchange(p.Cfg.Exchange.Name),
+		rabbitmq.WithPublishOptionsCorrelationID(strconv.Itoa(int(ev.TransactionId))),
+		rabbitmq.WithPublishOptionsType(ev.EventType.String()),
+		rabbitmq.WithPublishOptionsHeaders(rabbitmq.Table{
+			"db":        ev.DBName,
+			"namespace": ev.Namespace,
+			"relation":  ev.Relation,
+		}),
+	)
+}
+
+func (p PublishingEventConsumer) Close() error {
+	p.Publisher.Close()
+
+	return p.Conn.Close()
+}
diff --git a/main.go b/main.go
new file mode 100644
index 0000000..04b95e0
--- /dev/null
+++ b/main.go
@@ -0,0 +1,22 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"os/signal"
+
+	"code.icb4dc0.de/prskr/pg_v_man/cmd"
+)
+
+func main() {
+	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
+
+	err := cmd.RunApp(ctx)
+	stop()
+
+	if err != nil {
+		fmt.Printf("Error: %v\n", err)
+		os.Exit(1)
+	}
+}