Bonus 4 — CDC Deep Dive: Debezium, Logical Replication, Kafka Connect

“CDC means every change in the database can be streamed. It unlocks cache sync, read-model building, audit logs, and data sharing between microservices. Once CDC is set up, you never go back to ‘dual-write’ in application code.”

Tags: database cdc debezium kafka logical-replication
Duration: 4-5 days
Prerequisites: Tuan-07-Backup-PITR-DR (WAL concept) · Tuan-08-Zero-Downtime-Migration
Related: Tuan-Bonus-Outbox-Pattern (SD course)


1. Context

1.1 Vấn đề “dual-write”

graph LR
    App --> DB[(Postgres)]
    App --> Cache[(Redis)]
    App --> Search[(Elasticsearch)]
    App --> Queue[(Kafka)]

App writes to all sinks. Issues:

  • One write succeeds, others fail → inconsistency
  • Latency = slowest sink
  • Retry logic everywhere
  • Hard to add new sink

1.2 CDC solution

graph LR
    App --> DB[(Postgres - source of truth)]
    DB -.WAL/binlog.-> CDC[Debezium]
    CDC --> Kafka[(Kafka)]
    Kafka --> Cache[Cache invalidator]
    Kafka --> Search[ES sync]
    Kafka --> ReadModel[Read model]
    Kafka --> Audit[Audit log]

App writes only to source DB. CDC streams changes. Each consumer subscribes.


2. Postgres Logical Replication

2.1 Setup

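# postgresql.conf (changing wal_level requires a restart)
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10

-- On the source
CREATE PUBLICATION my_pub FOR TABLE orders, users;

-- Only for an external consumer (e.g. a CDC tool) that reads the slot directly.
-- Skip this when using CREATE SUBSCRIPTION below: the subscription creates its
-- own slot by default, and an unused slot retains WAL indefinitely.
SELECT * FROM pg_create_logical_replication_slot('my_slot', 'pgoutput');

-- On a Postgres consumer (target tables must already exist with a matching schema)
CREATE SUBSCRIPTION my_sub
    CONNECTION 'host=source dbname=appdb'
    PUBLICATION my_pub;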

2.2 Plugins

  • pgoutput (built-in) — modern default
  • wal2json — JSON output
  • decoderbufs — protobuf
  • test_decoding — debug only

2.3 What gets replicated

  • INSERT, UPDATE, DELETE
  • TRUNCATE (PG11+)
  • NOT replicated: sequence values (a subscriber's sequences do not advance; resync them before a failover)
  • NOT replicated: DDL (must sync schema manually)

2.4 Slot importance

Replication slot keeps WAL until consumer acks → ensures no data loss. BUT:

  • If consumer down for days → WAL accumulates → primary disk full
  • Monitor: SELECT slot_name, active, pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag FROM pg_replication_slots;
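A minimal sketch of how the rows returned by the monitoring query could be turned into alerts. The `SlotStatus` type, the `slots_needing_attention` helper, and the 5 GiB threshold are assumptions for illustration; the rows are hard-coded here rather than fetched from Postgres.

```python
from dataclasses import dataclass

GIB = 1024**3


@dataclass
class SlotStatus:
    """One row of the pg_replication_slots monitoring query."""
    slot_name: str
    active: bool
    lag_bytes: int


def slots_needing_attention(slots, warn_bytes):
    """Return names of slots that are inactive or retain more WAL than warn_bytes."""
    return [s.slot_name for s in slots if not s.active or s.lag_bytes > warn_bytes]


# Hard-coded sample rows (hypothetical values).
rows = [
    SlotStatus("debezium_slot", True, 64 * 1024**2),  # healthy: active, 64 MiB behind
    SlotStatus("my_slot", False, 12 * GIB),           # inactive and 12 GiB behind
]

flagged = slots_needing_attention(rows, warn_bytes=5 * GIB)
print(flagged)  # ['my_slot']
```

An inactive slot is flagged even when its lag is small, because nothing is consuming it and its retained WAL can only grow.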

3. Debezium

3.1 Overview

Open-source CDC platform. Connects to source DB (Postgres, MySQL, Mongo, etc), writes changes to Kafka.

graph LR
    PG[(Postgres)] -->|logical decode| Debezium[Debezium connector<br/>via Kafka Connect]
    Debezium --> Kafka[(Kafka topic per table)]
    Kafka --> Cons1[Consumer 1]
    Kafka --> Cons2[Consumer 2]

3.2 Setup (Postgres)

// Kafka Connect config
{
    "name": "appdb-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "debezium",
        "database.password": "secret",
        "database.dbname": "appdb",
        "schema.include.list": "public",
        "table.include.list": "public.orders,public.users",
        "plugin.name": "pgoutput",
        "slot.name": "debezium_slot",
        "publication.autocreate.mode": "filtered",
        "topic.prefix": "appdb"
    }
}

Note: Debezium 2.x uses topic.prefix in place of the older database.server.name property, so only topic.prefix is set here.

Topics created: appdb.public.orders, appdb.public.users.
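As a rough sketch, the connector config can be submitted to the Kafka Connect REST API with a `POST /connectors` request. The worker address `http://localhost:8083` and the helper names are assumptions, and the config is abbreviated. The request is only built here, not sent, so the snippet runs without a Connect cluster.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumed address of the Kafka Connect worker


def build_register_request(connect_url, name, config):
    """Build (but do not send) the POST /connectors request."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def expected_topics(config):
    """Derive <topic.prefix>.<schema>.<table> topic names from the config."""
    prefix = config["topic.prefix"]
    tables = config["table.include.list"].split(",")
    return [f"{prefix}.{table.strip()}" for table in tables]


# Abbreviated config: only the keys this sketch needs.
config = {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "topic.prefix": "appdb",
    "table.include.list": "public.orders,public.users",
}

request = build_register_request(CONNECT_URL, "appdb-connector", config)
print(request.get_method(), request.full_url)
print(expected_topics(config))
# To actually submit it: urllib.request.urlopen(request)
```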

3.3 Event format

{
    "schema": {...},
    "payload": {
        "before": {"id": 42, "name": "Alice", "email": "[email protected]"},
        "after": {"id": 42, "name": "Alice", "email": "[email protected]"},
        "source": {"version": "2.3", "ts_ms": 1700000000000, "db": "appdb", "schema": "public", "table": "users", "lsn": 12345},
        "op": "u",  // c=create, u=update, d=delete, r=read (snapshot)
        "ts_ms": 1700000000050
    }
}
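To make the envelope concrete, here is a small sketch of how a consumer might decode an event and work out which columns changed. The helper names and the sample values are made up for illustration; only the `before`, `after`, and `op` fields come from the format above.

```python
import json

OP_NAMES = {"c": "create", "u": "update", "d": "delete", "r": "read"}


def changed_fields(payload):
    """Return the sorted column names whose value differs between before and after."""
    before = payload.get("before") or {}  # null on create and snapshot reads
    after = payload.get("after") or {}    # null on delete
    return sorted(k for k in before.keys() | after.keys() if before.get(k) != after.get(k))


def describe_change(raw):
    """Decode a raw JSON event into (operation name, changed columns)."""
    payload = json.loads(raw)["payload"]
    return OP_NAMES[payload["op"]], changed_fields(payload)


# Hypothetical update event: only the name column changes.
raw_event = json.dumps({
    "payload": {
        "before": {"id": 42, "name": "Alice"},
        "after": {"id": 42, "name": "Alicia"},
        "op": "u",
        "ts_ms": 1700000000050,
    }
})

print(describe_change(raw_event))  # ('update', ['name'])
```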

3.4 Snapshot phase

On first connect, Debezium snapshots existing data, then transitions to streaming.

snapshot.mode:
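- initial (default): snapshot on first start, then stream
- never: skip the snapshot, stream only
- when_needed: snapshot on startup only if no usable stored offset exists
- schema_only: capture table structure but no existing rows, then stream
- always: run a snapshot on every connector start, then stream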

3.5 Transforms

Single Message Transforms (SMT) — modify events in transit:

"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"

Flatten {before, after} to just after (typical consumer pattern).

3.6 Schema evolution

When source schema changes, Debezium handles automatically:

  • New column: appears in events
  • Dropped column: missing from events
  • Type change: handled per type

Schema Registry (Confluent) stores schemas, consumers stay in sync.


4. Outbox Pattern

4.1 Problem

App needs to publish events when DB writes happen. Naive: write DB + publish.

# BAD - dual write
db.insert(order)
kafka.publish("order_created", order)
# If kafka fails after db success → out of sync

4.2 Outbox solution

-- Outbox table in same DB
CREATE TABLE outbox (
    id bigserial PRIMARY KEY,
    aggregate_type text,
    aggregate_id text,
    event_type text,
    payload jsonb,
    created_at timestamptz DEFAULT now()
);
 
-- Atomic insert + outbox
BEGIN;
INSERT INTO orders (...) VALUES (...);
INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
VALUES ('Order', '42', 'OrderCreated', '{"id": 42, ...}');
COMMIT;

CDC reads from outbox table → publish to Kafka. Atomicity guaranteed (single transaction).
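The atomicity claim can be checked with a small experiment. This sketch uses the standard-library `sqlite3` module as a stand-in for Postgres, with simplified tables and a hypothetical `create_order` helper. The second call is forced to fail on the outbox insert, showing that the order row is rolled back with it.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE orders (id INTEGER PRIMARY KEY, total REAL NOT NULL);
CREATE TABLE outbox (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    aggregate_type TEXT,
    aggregate_id TEXT,
    event_type TEXT NOT NULL,
    payload TEXT
);
""")


def create_order(conn, order_id, total, event_type="OrderCreated"):
    """Insert the order and its outbox event in one transaction."""
    with conn:  # commits on success, rolls back if any statement raises
        conn.execute("INSERT INTO orders (id, total) VALUES (?, ?)", (order_id, total))
        conn.execute(
            "INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload) "
            "VALUES (?, ?, ?, ?)",
            ("Order", str(order_id), event_type, json.dumps({"id": order_id, "total": total})),
        )


create_order(conn, 42, 99.5)

try:
    # event_type=None violates NOT NULL, so the outbox insert fails
    # after the order insert has already run inside the transaction.
    create_order(conn, 43, 10.0, event_type=None)
except sqlite3.IntegrityError:
    pass

orders = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
events = conn.execute("SELECT COUNT(*) FROM outbox").fetchone()[0]
print(orders, events)  # 1 1
```

Order 43 never becomes visible: either both rows commit or neither does, which is what lets CDC on the outbox table stand in for a direct publish.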

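4.3 Debezium Outbox SMT

"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.route.by.field": "aggregate_type",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.type": "event_type",
"transforms.outbox.route.topic.replacement": "${routedByValue}.events"

Routes outbox events to a topic based on the aggregate_type field. The two table.field.* settings are needed because the SMT's default column names (aggregateid, type) differ from the outbox table defined in 4.2.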

4.4 Outbox cleanup

Outbox table grows. Cleanup:

DELETE FROM outbox WHERE created_at < now() - interval '7 days';

Or use pg_partman for time-based partitioning.


5. Common CDC Patterns

5.1 Cache invalidation

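def consume_user_events():
    for event in kafka.consume("appdb.public.users"):
        # On delete, "after" is null, so fall back to "before" for the key.
        row = event.payload.after or event.payload.before
        cache.delete(f"user:{row.id}")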

Source of truth: Postgres. Cache eventually consistent.

5.2 Search index sync

def consume_product_events():
    for event in kafka.consume("appdb.public.products"):
        if event.op == 'd':
            elasticsearch.delete(event.payload.before.id)
        else:
            elasticsearch.index(event.payload.after)

5.3 Read model (CQRS)

Build denormalized read DB from events.

5.4 Microservice integration

Service A’s DB changes → CDC → Kafka → Service B consumes. Decouples teams without RPC.

5.5 Audit log

Stream all changes to audit DB.

5.6 Data warehouse

CDC → Kafka → ClickHouse/Snowflake. Real-time analytics.


6. Operational Concerns

6.1 Slot bloat and the PG13+ safety valve

A stopped or slow consumer → WAL accumulates → risk of filling the primary's disk. Monitor!

SELECT slot_name, active, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag_size
FROM pg_replication_slots;

Critical safety valve (PG13+):

max_slot_wal_keep_size = 100GB

When a stalled slot exceeds the threshold, Postgres invalidates the slot (its position is lost) instead of letting the disk fill up. The reader must re-snapshot. Trade-off: accept losing CDC progress rather than having the database stop.

Production setting:

  • Set max_slot_wal_keep_size = 50-100GB (1-2 days of WAL at your write rate)
  • Alert at 50% threshold
  • Have re-snapshot runbook ready
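The sizing rule above is simple arithmetic. A small sketch under assumed numbers (2 GiB of WAL per hour, a 36-hour tolerated outage); the function names are hypothetical.

```python
GIB = 1024**3


def keep_size_for(wal_rate_bytes_per_hour, tolerated_outage_hours):
    """WAL volume a stalled slot accumulates during the tolerated outage."""
    return wal_rate_bytes_per_hour * tolerated_outage_hours


def hours_until_invalidation(retained_bytes, keep_size_bytes, wal_rate_bytes_per_hour):
    """Time left before the slot crosses max_slot_wal_keep_size at the current rate."""
    remaining = max(keep_size_bytes - retained_bytes, 0)
    return remaining / wal_rate_bytes_per_hour


rate = 2 * GIB                  # assumed: 2 GiB of WAL per hour
keep = keep_size_for(rate, 36)  # tolerate a 36-hour consumer outage

print(keep // GIB)                                     # 72
print(hours_until_invalidation(36 * GIB, keep, rate))  # 18.0
```

With these numbers an alert at 50% of the limit (36 GiB retained) fires with about 18 hours left to fix the consumer before the slot is invalidated.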

Before PG13 this mechanism does not exist, so you have to monitor and intervene manually. This is one reason to upgrade to PG13+.

6.2 Replica lag

Logical replication is async. Lag varies by load.

Mitigations:

  • Throttle bulk writes
  • Multiple Debezium connectors (per table)
  • Faster Kafka cluster

6.3 Initial snapshot of large table

100GB table → snapshot takes time. Strategies:

  • snapshot.mode: schema_only if data already exists at sink
  • Incremental snapshot (Debezium 1.6+): chunked, runs alongside streaming
  • Tune rows per chunk via incremental.snapshot.chunk.size (this controls chunk size, not parallelism)

6.4 Recovery from failure

Debezium tracks LSN consumed → resumes from there. Idempotent consumers important (event may be replayed).

6.5 Schema registry

Always use Confluent Schema Registry or equivalent. Avoid “schema in payload” anti-pattern.


7. Alternative Tools

7.1 AWS DMS

Database Migration Service supports CDC. Lower control but managed.

7.2 Maxwell, pg_kafka, AWS DSQL

Specific use cases. Debezium dominant for general CDC.

7.3 Materialize, RisingWave

Streaming SQL DBs that consume CDC events and maintain materialized views with sub-second freshness.

7.4 Estuary Flow, Hevo, Airbyte

Managed CDC + transformation. Cheaper to start than self-hosted Kafka Connect.


8. Anti-patterns

| Pattern | Why bad | Fix |
|---|---|---|
| Dual-write app+kafka | Inconsistency on failure | Outbox + CDC |
| No replication slot monitoring | Disk full surprise | Alert on lag |
| Consumer not idempotent | Duplicates on replay | Idempotency keys |
| Schema changes without coordination | Consumer breaks | Schema registry + backward-compat |
| Single huge connector | SPOF, throughput | Multiple connectors |
| Forgot to recreate slot after DB rebuild | Data loss | Document slot management |
| Use CDC for low-latency sync | Async, lag exists | Different tool if <1s strict |

9. Lab

Day 1: Logical replication

Setup 2 Postgres, replicate users table via publication/subscription.

Day 2: Debezium

docker-compose: Postgres + Kafka + Kafka Connect + Debezium. Stream events.

Day 3: Outbox pattern

Implement order service with outbox. Watch events flow.

Day 4: Cache sync consumer

Build Redis cache invalidator consuming Debezium events.

Day 5: Search index sync

Build ES sync consumer.


10. Schema Registry Deep Dive

10.1 Why schema registry

Without registry:

  • Producer changes schema → Consumer crashes
  • Multiple versions floating
  • No version compatibility check

With registry:

  • Schemas centrally managed
  • Producer + Consumer reference schema by ID
  • Compatibility checks (backward/forward/full)
  • Evolution rules enforced

10.2 Confluent Schema Registry

# Register schema
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [...]}"}' \
    http://localhost:8081/subjects/users-value/versions
 
# Get schema
curl http://localhost:8081/subjects/users-value/versions/latest

Compatibility modes:

  • BACKWARD: new schema reads old data
  • FORWARD: old schema reads new data
  • FULL: both
  • NONE: anything goes

Default: BACKWARD (most common, safe).

10.3 Schema evolution rules

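| Change | BACKWARD | FORWARD | FULL |
|---|---|---|---|
| Add optional field (has a default) | yes | yes | yes |
| Add required field (no default) | no | yes | no |
| Remove field (no default) | yes | no | no |
| Rename field (without aliases) | no | no | no |
| Change to a compatible type | depends | depends | depends |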

→ Most teams: BACKWARD compatibility, never remove or rename fields. Add new optional fields only.
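The compatibility modes can be illustrated with a toy checker. This is a deliberately simplified model, not the Schema Registry's actual algorithm: a schema is a dict mapping field name to "has a default", types and aliases are ignored, and a reader can decode a writer's data only if every field the writer lacks has a default on the reader side.

```python
def can_read(reader, writer):
    """True if every reader field missing from the writer has a default."""
    return all(has_default for name, has_default in reader.items() if name not in writer)


def compatibility(old, new):
    """Evaluate a schema change under the BACKWARD, FORWARD, and FULL modes."""
    backward = can_read(reader=new, writer=old)  # new schema reads old data
    forward = can_read(reader=old, writer=new)   # old schema reads new data
    return {"BACKWARD": backward, "FORWARD": forward, "FULL": backward and forward}


# field name -> has a default value (hypothetical User schema)
v1 = {"id": False, "email": False}
add_optional = {"id": False, "email": False, "nickname": True}
add_required = {"id": False, "email": False, "phone": False}
remove_field = {"id": False}
rename_field = {"id": False, "mail": False}

for label, candidate in [
    ("add optional", add_optional),
    ("add required", add_required),
    ("remove field", remove_field),
    ("rename field", rename_field),
]:
    print(label, compatibility(v1, candidate))
```

In this model only "add optional field" passes all three modes, which matches the advice to evolve schemas by adding optional fields.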

10.4 Apicurio (Red Hat alternative)

OSS, similar features. Apache 2.0.

10.5 AWS Glue Schema Registry

Managed on AWS. Integrates with MSK (Managed Kafka).


11. CDC Use Cases — Real Patterns

11.1 Cache invalidation (real-time)

async def consume_user_events():
    async for event in kafka.consume("appdb.public.users"):
        user_id = event.payload.after.id if event.op != 'd' else event.payload.before.id
        await redis.delete(f"user:{user_id}")
        await redis.delete(f"user_orders:{user_id}")  # invalidate related too

Postgres source of truth. Cache eventually consistent (<5s lag typical).

11.2 Search index sync

async def consume_product_events():
    async for event in kafka.consume("appdb.public.products"):
        if event.op == 'd':
            await elasticsearch.delete(index="products", id=event.payload.before.id)
        else:
            doc = transform_for_es(event.payload.after)
            await elasticsearch.index(index="products", id=doc.id, body=doc)

11.3 Read model build (CQRS)

Build denormalized read DB from events:

async def build_user_dashboard_view(event):
    if event.table == 'orders':
        await update_user_lifetime_stats(event.payload.after.user_id)
    elif event.table == 'reviews':
        await update_product_rating_avg(event.payload.after.product_id)

11.4 Microservice integration

Service A’s DB changes → CDC → Kafka → Service B consumes. Decouples teams without RPC.

graph LR
    SA[Service A: Orders] --> DBA[(Orders DB)]
    DBA -.CDC.-> Kafka
    Kafka --> SB[Service B: Notifications]
    Kafka --> SC[Service C: Analytics]

11.5 Audit log

Stream all changes to audit DB (separate, write-once).

-- Audit DB - append only
CREATE TABLE audit_log (
    id bigserial PRIMARY KEY,
    source_table text,
    operation text,
    before_data jsonb,
    after_data jsonb,
    lsn text,
    occurred_at timestamptz
);
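One possible mapping from a Debezium event to a row of this `audit_log` table is sketched below. The `to_audit_row` helper is hypothetical, and using `source.ts_ms` (the time of the change in the source database) for `occurred_at` is a design assumption; the sample event reuses the field values from section 3.3 with a made-up `before`/`after`.

```python
import json
from datetime import datetime, timezone


def to_audit_row(event):
    """Flatten a Debezium change event into the audit_log columns."""
    payload = event["payload"]
    source = payload["source"]
    occurred = datetime.fromtimestamp(source["ts_ms"] / 1000, tz=timezone.utc)
    return {
        "source_table": f'{source["schema"]}.{source["table"]}',
        "operation": payload["op"],
        "before_data": json.dumps(payload.get("before")),  # 'null' for inserts
        "after_data": json.dumps(payload.get("after")),    # 'null' for deletes
        "lsn": str(source["lsn"]),
        "occurred_at": occurred.isoformat(),
    }


event = {
    "payload": {
        "before": None,
        "after": {"id": 42, "name": "Alice"},
        "source": {"ts_ms": 1700000000000, "schema": "public", "table": "users", "lsn": 12345},
        "op": "c",
    }
}

row = to_audit_row(event)
print(row["source_table"], row["operation"], row["lsn"], row["occurred_at"])
```

Storing the LSN alongside each row gives the audit consumer a natural deduplication key if events are replayed.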

11.6 Data warehouse / lakehouse ingestion

CDC → Kafka → Iceberg/ClickHouse

See Week 14 + Bonus Lakehouse. Real-time analytics pipeline.

11.7 Multi-region sync

Replicate via Kafka → reduces direct DB-to-DB cross-region streaming.


12. Operational Patterns

12.1 Initial snapshot of large table

100GB table → snapshot takes time. Strategies:

A. snapshot.mode: initial (default)

  • Locks table briefly, then streams
  • Acceptable for small/medium tables

B. snapshot.mode: schema_only

  • If data already at sink (from previous load)
  • Just sync schema, then stream new changes

C. snapshot.mode: when_needed

  • Snapshot only if no replication slot offset

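D. Incremental snapshot (Debezium 1.6+)

  • Chunked snapshot, no long lock
  • Resumable
  • Less blocking on source

Incremental snapshots are not a snapshot.mode value. The connector is configured with a signaling table, and a snapshot is triggered by inserting an execute-snapshot signal row into it (the table name below is a placeholder):

{
    "signal.data.collection": "public.debezium_signal",
    "incremental.snapshot.chunk.size": 1024
}

For huge tables (TB+), incremental snapshot is usually the only practical option.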
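The idea behind chunking can be sketched with keyset pagination: read a bounded number of rows ordered by primary key, remember the last key, and continue from there. This illustrates only the chunking and resumability aspects and is not Debezium's implementation, which also has to reconcile each chunk with concurrent streaming changes. The `users` table and `read_in_chunks` helper are assumptions, with `sqlite3` standing in for the source database.

```python
import sqlite3


def read_in_chunks(conn, chunk_size):
    """Yield lists of rows, each at most chunk_size long, ordered by primary key."""
    last_id = 0  # the only state needed to resume after an interruption
    while True:
        rows = conn.execute(
            "SELECT id, name FROM users WHERE id > ? ORDER BY id LIMIT ?",
            (last_id, chunk_size),
        ).fetchall()
        if not rows:
            return
        yield rows
        last_id = rows[-1][0]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO users (id, name) VALUES (?, ?)",
    [(i, f"user{i}") for i in range(1, 11)],
)

chunks = list(read_in_chunks(conn, chunk_size=4))
print([len(chunk) for chunk in chunks])  # [4, 4, 2]
```

Each query is short and index-driven, so no long-running transaction or table-wide lock is needed, which is the property that matters for large tables.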

12.2 Recovery from failure

Debezium tracks LSN consumed → resumes from there. Idempotent consumers important (event may be replayed).

Patterns:

  • Use database upsert (INSERT ON CONFLICT) on consumer
  • Use idempotency key (event_id) in consumer’s local store
  • Use Kafka’s exactly-once semantics (transactions)
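A minimal sketch of the upsert pattern, using `sqlite3` as a stand-in for the consumer's database (its `ON CONFLICT` syntax mirrors Postgres and needs SQLite 3.24+). The `users_view` table, the `apply_event` helper, and the choice to store the last applied LSN per row are assumptions for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users_view (id INTEGER PRIMARY KEY, email TEXT, last_lsn INTEGER NOT NULL)"
)


def apply_event(conn, user_id, email, lsn):
    """Upsert the row, ignoring events that are not newer than what is stored."""
    with conn:
        conn.execute(
            """
            INSERT INTO users_view (id, email, last_lsn) VALUES (?, ?, ?)
            ON CONFLICT(id) DO UPDATE
                SET email = excluded.email, last_lsn = excluded.last_lsn
                WHERE excluded.last_lsn > users_view.last_lsn
            """,
            (user_id, email, lsn),
        )


apply_event(conn, 42, "a@example.com", 100)
apply_event(conn, 42, "b@example.com", 200)
apply_event(conn, 42, "a@example.com", 100)  # replay of the first event after a restart

state = conn.execute("SELECT email, last_lsn FROM users_view WHERE id = 42").fetchone()
print(state)  # ('b@example.com', 200)
```

The LSN guard makes replays harmless: applying the same event twice, or an old event after a newer one, leaves the row unchanged.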

12.3 Topic partitioning

Topics partitioned for parallel consumption. Key consideration:

Topic: appdb.public.orders
Partition key: order ID hash
→ All events for order X go to same partition
→ Consumer sees events in order for any single order

Lose ordering across orders. Acceptable for most cases.

For strict global order: 1 partition (no parallelism).
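The per-key ordering guarantee can be demonstrated with a small simulation. `zlib.crc32` is used here as a stand-in hash (Kafka's default partitioner uses a different hash function), and the event list and helper names are made up.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 3


def partition_for(key, num_partitions):
    """Map a message key to a partition (stand-in for Kafka's partitioner)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Events in the order they were committed in the source database.
events = [
    ("order-1", "created"),
    ("order-2", "created"),
    ("order-1", "paid"),
    ("order-2", "cancelled"),
    ("order-1", "shipped"),
]

partitions = defaultdict(list)
for key, change in events:
    partitions[partition_for(key, NUM_PARTITIONS)].append((key, change))


def per_key_sequence(partitions, key):
    """Changes for one key, in the order its partition delivers them."""
    partition = partitions[partition_for(key, NUM_PARTITIONS)]
    return [change for k, change in partition if k == key]


print(per_key_sequence(partitions, "order-1"))  # ['created', 'paid', 'shipped']
print(per_key_sequence(partitions, "order-2"))  # ['created', 'cancelled']
```

Because the partition is a pure function of the key, all changes to one order land in the same partition in commit order; only the interleaving between different orders is lost.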

12.4 Tombstones (Kafka)

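After a DELETE event, Debezium by default also emits a record with the same key and a null value (controlled by tombstones.on.delete): a “tombstone” in Kafka.

On a compacted topic, Kafka keeps only the latest record per key, and the tombstone lets it eventually drop the deleted key entirely.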
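A toy model of compaction, assuming a log of `(key, value)` pairs where a `None` value is a tombstone. Real Kafka compacts in the background and keeps tombstones for `delete.retention.ms` before removing them, which this sketch ignores; the sample records are made up.

```python
def compact(log):
    """Keep the latest value per key and drop keys whose latest record is a tombstone."""
    latest = {}
    for key, value in log:
        latest[key] = value  # later records overwrite earlier ones
    return {key: value for key, value in latest.items() if value is not None}


log = [
    (1, {"name": "Alice"}),
    (2, {"name": "Bob"}),
    (1, {"name": "Alicia"}),  # update to key 1
    (2, {"op": "d"}),         # delete event for key 2
    (2, None),                # tombstone for key 2
]

print(compact(log))  # {1: {'name': 'Alicia'}}
```

Without the tombstone, the delete event itself would remain as the latest record for key 2 and the key would never leave the compacted topic.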

12.5 Backpressure

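If a consumer is slow, the backlog stays in Kafka. Depending on retention settings, either broker disk usage grows or unread events expire before the consumer reaches them.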

Mitigate:

  • Scale consumers (more parallelism, more partitions)
  • Increase Kafka retention
  • Filter at Debezium SMT to reduce volume
  • Drop non-essential events

13. Anti-patterns

| Pattern | Why bad | Fix |
|---|---|---|
| Dual-write app+kafka | Inconsistency on failure | Outbox + CDC |
| No replication slot monitoring | Disk full surprise | max_slot_wal_keep_size + alert on lag |
| Consumer not idempotent | Duplicates on replay | Idempotency keys, upsert |
| Schema changes without coordination | Consumer breaks | Schema registry + backward-compat |
| Single huge connector | SPOF, throughput limit | Multiple connectors per table or shard |
| Forgot to recreate slot after DB rebuild | Data loss | Document slot management |
| Use CDC for strict synchronous sync | Async, lag exists | Different tool if <1s strict needed |
| No DLQ for failed events | Stuck pipeline | Dead letter queue config |
| Snapshot blocking | Production impact | Incremental snapshot |
| Skip Schema Registry | Producer/consumer drift | Always use registry |

14. Lab — 7 days

Day 1: Logical replication

Setup 2 Postgres, replicate users table via publication/subscription.

Day 2: Debezium

docker-compose: Postgres + Kafka + Kafka Connect + Debezium. Stream events. Watch topics.

Day 3: Outbox pattern

Implement order service with outbox. Watch events flow.

Day 4: Schema Registry

Setup Confluent Schema Registry. Configure Debezium to use. Test schema evolution (add column, observe).

Day 5: Cache sync consumer

Build Redis cache invalidator consuming Debezium events.

Day 6: Search index sync

Build ES sync consumer. Test inserts/updates/deletes flow.

Day 7: Operational scenarios

  • Stop consumer, observe slot lag growth
  • Test max_slot_wal_keep_size invalidation
  • Test incremental snapshot

15. Self-check

  1. What is the dual-write problem? How does CDC solve it?
  2. Outbox pattern: where does the atomicity come from?
  3. max_slot_wal_keep_size (PG13+): what is its role?
  4. Debezium event format: what values can the op field take?
  5. Snapshot modes: name 4 modes and a use case for each.
  6. Incremental snapshot: why does it matter?
  7. What are the Schema Registry compatibility modes?
  8. What is a tombstone in a Kafka topic?
  9. How does topic partitioning affect ordering?
  10. Idempotent consumer: why is it needed?

16. Next

Tuan-Bonus-Distributed-SQL-Engineering

Updated: 2026-05-16