Bonus 4 — CDC Deep Dive: Debezium, Logical Replication, Kafka Connect
“CDC = every change in the DB becomes a stream. It unlocks cache sync, read-model builds, audit logs and data sharing between microservices. Once CDC is set up, you never go back to ‘dual-write’ in app code.”
Tags: database cdc debezium kafka logical-replication
Duration: 4-5 days
Prerequisites: Tuan-07-Backup-PITR-DR (WAL concept) · Tuan-08-Zero-Downtime-Migration
Related: Tuan-Bonus-Outbox-Pattern (SD course)
1. Context
1.1 The “dual-write” problem
graph LR
    App --> DB[(Postgres)]
    App --> Cache[(Redis)]
    App --> Search[(Elasticsearch)]
    App --> Queue[(Kafka)]
App writes to all sinks. Issues:
- One write succeeds, others fail → inconsistency
- Latency = slowest sink
- Retry logic everywhere
- Hard to add new sink
1.2 CDC solution
graph LR
    App --> DB[(Postgres - source of truth)]
    DB -.WAL/binlog.-> CDC[Debezium]
    CDC --> Kafka[(Kafka)]
    Kafka --> Cache[Cache invalidator]
    Kafka --> Search[ES sync]
    Kafka --> ReadModel[Read model]
    Kafka --> Audit[Audit log]
App writes only to source DB. CDC streams changes. Each consumer subscribes.
2. Postgres Logical Replication
2.1 Setup
# postgresql.conf
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
-- On source
CREATE PUBLICATION my_pub FOR TABLE orders, users;
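-- Optional: create the replication slot up front (CREATE SUBSCRIPTION otherwise creates its own)
SELECT * FROM pg_create_logical_replication_slot('my_slot', 'pgoutput');
-- On the subscriber (another Postgres; external consumers such as Debezium read the slot directly).
-- WITH (...) reuses my_slot; otherwise a second slot is created and my_slot sits unused, pinning WAL.
CREATE SUBSCRIPTION my_sub
CONNECTION 'host=source dbname=appdb'
PUBLICATION my_pub
WITH (slot_name = 'my_slot', create_slot = false);
2.2 Plugins
- pgoutput (built-in, PG10+): modern default
- wal2json: JSON output
- decoderbufs: Protobuf output
- test_decoding: debug only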
2.3 What gets replicated
- INSERT, UPDATE, DELETE
- TRUNCATE (PG11+)
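- NOT replicated: DDL (must sync schema manually)
- NOT replicated: sequence state (serial/identity values arrive as column data, but the subscriber's sequences stay at their start value)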
2.4 Slot importance
Replication slot keeps WAL until consumer acks → ensures no data loss. BUT:
- If consumer down for days → WAL accumulates → primary disk full
- Monitor:
SELECT slot_name, active, pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag FROM pg_replication_slots;
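pg_wal_lsn_diff simply returns the byte distance between two LSNs. Since an LSN is printed as two hex numbers `X/Y` (upper and lower 32 bits), a monitoring script that already fetched `pg_current_wal_lsn()` and `confirmed_flush_lsn` as text can reproduce the arithmetic itself. A minimal sketch; the function names and the 10 GiB alert threshold are illustrative assumptions, not recommendations from Postgres.

```python
def parse_lsn(lsn: str) -> int:
    """Convert a textual pg_lsn such as '16/B374D848' into an absolute WAL byte position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def slot_lag_bytes(current_lsn: str, confirmed_flush_lsn: str) -> int:
    """Same arithmetic as pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)."""
    return parse_lsn(current_lsn) - parse_lsn(confirmed_flush_lsn)


# Illustrative threshold (assumption): alert once a slot pins more than 10 GiB of WAL.
ALERT_BYTES = 10 * 1024**3


def should_alert(current_lsn: str, confirmed_flush_lsn: str) -> bool:
    return slot_lag_bytes(current_lsn, confirmed_flush_lsn) > ALERT_BYTES


print(slot_lag_bytes("16/B374D848", "16/B3740000"))
```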
3. Debezium
3.1 Overview
Open-source CDC platform. Connects to source DB (Postgres, MySQL, Mongo, etc), writes changes to Kafka.
graph LR
    PG[(Postgres)] -->|logical decode| Debezium[Debezium connector<br/>via Kafka Connect]
    Debezium --> Kafka[(Kafka topic per table)]
    Kafka --> Cons1[Consumer 1]
    Kafka --> Cons2[Consumer 2]
3.2 Setup (Postgres)
// Kafka Connect config (Debezium 2.x: topic.prefix replaces the old database.server.name)
{
"name": "appdb-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.password": "secret",
"database.dbname": "appdb",
"schema.include.list": "public",
"table.include.list": "public.orders,public.users",
"plugin.name": "pgoutput",
"slot.name": "debezium_slot",
"publication.autocreate.mode": "filtered",
"topic.prefix": "appdb"
}
}
Topics created: appdb.public.orders, appdb.public.users.
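A config like this is normally submitted to Kafka Connect's REST API (`POST /connectors`, port 8083 by default). The sketch below prepares that request with the standard library and derives the topic names the connector should create. The localhost URL is an assumption, and `expected_topics` treats include-list entries as literal names, whereas Debezium interprets them as regular expressions.

```python
import json
import urllib.request

CONFIG = {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "secret",
    "database.dbname": "appdb",
    "table.include.list": "public.orders,public.users",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "topic.prefix": "appdb",
}


def build_connector_request(connect_url: str, name: str, config: dict) -> urllib.request.Request:
    """Prepare (without sending) a POST /connectors call to the Kafka Connect REST API."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        f"{connect_url.rstrip('/')}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def expected_topics(config: dict) -> list:
    """Debezium names data topics <topic.prefix>.<schema>.<table>.

    Assumes table.include.list holds literal names (Debezium actually treats them as regexes).
    """
    prefix = config["topic.prefix"]
    return [f"{prefix}.{t.strip()}" for t in config["table.include.list"].split(",")]


req = build_connector_request("http://localhost:8083", "appdb-connector", CONFIG)
print(req.get_method(), req.full_url)
print(expected_topics(CONFIG))
# urllib.request.urlopen(req) would submit it to a running Kafka Connect worker.
```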
3.3 Event format
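{
"schema": {...},
"payload": {
"before": {"id": 42, "name": "Alice", "email": "alice@old.example.com"},
"after": {"id": 42, "name": "Alice", "email": "alice@new.example.com"},
"source": {"version": "2.3", "ts_ms": 1700000000000, "db": "appdb", "schema": "public", "table": "users", "lsn": 12345},
"op": "u", // c=create, u=update, d=delete, r=read (snapshot)
"ts_ms": 1700000000050
}
}
The full before image shown here requires REPLICA IDENTITY FULL on the table; with the default replica identity, Postgres logs at most the old primary-key values, so before is null or key-only.
3.4 Snapshot phase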
On first connect, Debezium snapshots existing data, then transitions to streaming.
snapshot.mode:
- initial (default) — snapshot then stream
- never — stream only
- when_needed — snapshot if no replication slot offset
- schema_only — schema not data
- always — always snapshot first
3.5 Transforms
Single Message Transforms (SMT) modify events in transit:
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
Flatten {before, after} to just after (typical consumer pattern).
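To see what the unwrap transform hands to consumers, here is a rough pure-Python approximation of the flattening. It is not the SMT's implementation: the real ExtractNewRecordState has its own configurable handling of delete events and tombstones, which this sketch reduces to a single hypothetical `rewrite_deletes` flag. The `__deleted` marker mirrors the SMT's rewrite behaviour as we understand it; verify field names and defaults against your Debezium version.

```python
from typing import Optional


def unwrap(event: Optional[dict], rewrite_deletes: bool = False) -> Optional[dict]:
    """Flatten a Debezium change event into the row state a consumer cares about.

    Returns None when the event would be dropped.
    """
    if event is None:  # tombstone: null Kafka value, nothing to flatten
        return None
    payload = event.get("payload", event)  # works with or without the schema/payload wrapper
    op = payload["op"]
    if op in ("c", "u", "r"):  # create, update, snapshot read: keep the new state
        return dict(payload["after"])
    if op == "d" and rewrite_deletes:
        row = dict(payload["before"] or {})  # last known state of the deleted row
        row["__deleted"] = "true"  # marker field modelled on the SMT's rewrite option
        return row
    return None  # deletes (when not rewritten) and any other op are dropped


update = {"payload": {"op": "u", "before": {"id": 42, "email": "old"}, "after": {"id": 42, "email": "new"}}}
print(unwrap(update))
```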
3.6 Schema evolution
When source schema changes, Debezium handles automatically:
- New column: appears in events
- Dropped column: missing from events
- Type change: handled per type
Schema Registry (Confluent) stores schemas, consumers stay in sync.
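Because added columns simply appear and dropped columns simply vanish from events, consumers stay robust if they read rows tolerantly: take the fields they know, default the missing ones, ignore the rest. A minimal sketch with a hypothetical `User` dataclass (field names are illustrative). It only covers added or dropped columns; type changes still need coordination, e.g. through the Schema Registry.

```python
from dataclasses import dataclass, fields


@dataclass
class User:
    id: int
    name: str
    email: str = ""  # default used if the column is later dropped upstream
    phone: str = ""  # hypothetical column that older events may not contain


def to_user(row: dict) -> User:
    """Keep known fields, ignore unknown (newly added) columns, default missing ones.

    Fields without a default (id, name) still raise TypeError if they disappear.
    """
    known = {f.name for f in fields(User)}
    return User(**{k: v for k, v in row.items() if k in known})


print(to_user({"id": 42, "name": "Alice", "email": "a@example.com", "loyalty_tier": "gold"}))
```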
4. Outbox Pattern
4.1 Problem
App needs to publish events when DB writes happen. Naive: write DB + publish.
# BAD - dual write
db.insert(order)
kafka.publish("order_created", order)
# If Kafka fails after the DB write succeeds → out of sync
4.2 Outbox solution
-- Outbox table in same DB
CREATE TABLE outbox (
id bigserial PRIMARY KEY,
aggregate_type text,
aggregate_id text,
event_type text,
payload jsonb,
created_at timestamptz DEFAULT now()
);
-- Atomic insert + outbox
BEGIN;
INSERT INTO orders (...) VALUES (...);
INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
VALUES ('Order', '42', 'OrderCreated', '{"id": 42, ...}');
COMMIT;
CDC reads from the outbox table → publishes to Kafka. Atomicity is guaranteed (single transaction).
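The atomicity argument can be checked locally. This sketch swaps Postgres for Python's built-in sqlite3 so it runs anywhere (a convenience assumption: only the single-transaction behaviour matters here, not the SQL dialect). It simulates a crash between the two inserts and shows that neither row survives.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE orders (id INTEGER PRIMARY KEY, amount INTEGER NOT NULL);
    CREATE TABLE outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        aggregate_type TEXT,
        aggregate_id TEXT,
        event_type TEXT,
        payload TEXT,
        created_at TEXT DEFAULT CURRENT_TIMESTAMP
    );
    """
)


def create_order(conn: sqlite3.Connection, order_id: int, amount: int, crash: bool = False) -> None:
    """Write the order row and its outbox event in one transaction."""
    with conn:  # commit if the block succeeds, roll back if it raises
        conn.execute("INSERT INTO orders (id, amount) VALUES (?, ?)", (order_id, amount))
        if crash:
            raise RuntimeError("simulated crash between the two inserts")
        conn.execute(
            "INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload) VALUES (?, ?, ?, ?)",
            ("Order", str(order_id), "OrderCreated", json.dumps({"id": order_id, "amount": amount})),
        )


create_order(conn, 42, 100)
try:
    create_order(conn, 43, 200, crash=True)
except RuntimeError:
    pass

# Order 43 left no trace in either table: no order without event, no event without order.
print(conn.execute("SELECT id FROM orders").fetchall())
print(conn.execute("SELECT aggregate_id, event_type FROM outbox").fetchall())
```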
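4.3 Debezium Outbox SMT
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.route.by.field": "aggregate_type",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.route.topic.replacement": "${routedByValue}.events"
Routes outbox events to a topic derived from the aggregate_type value (Order → Order.events), keyed by aggregate_id. Both column names are set explicitly because the SMT's defaults are aggregatetype and aggregateid.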
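To make the routing concrete, the function below mimics what the EventRouter does with one outbox row: substitute the route-by value into the topic template, key the record by the aggregate id, and use the payload column as the message value. It is a hand-written approximation for illustration, not Debezium's code; the real SMT also deals with event ids, extra header or envelope fields, and non-insert events.

```python
import json


def route_outbox_row(
    row: dict,
    route_by_field: str = "aggregate_type",
    topic_replacement: str = "${routedByValue}.events",
    key_field: str = "aggregate_id",
) -> tuple:
    """Approximate how one inserted outbox row becomes a Kafka record: (topic, key, value)."""
    topic = topic_replacement.replace("${routedByValue}", row[route_by_field])
    value = row["payload"]
    if isinstance(value, str):  # jsonb may be delivered as a JSON-encoded string
        value = json.loads(value)
    return topic, row[key_field], value


outbox_row = {
    "id": 1,
    "aggregate_type": "Order",
    "aggregate_id": "42",
    "event_type": "OrderCreated",
    "payload": '{"id": 42}',
}
print(route_outbox_row(outbox_row))
```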
4.4 Outbox cleanup
Outbox table grows. Cleanup:
DELETE FROM outbox WHERE created_at < now() - interval '7 days';
Or use pg_partman for time-based partitioning.
5. Common CDC Patterns
5.1 Cache invalidation
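def consume_user_events():
    for event in kafka.consume("appdb.public.users"):
        # after is null for deletes, so fall back to before
        row = event.payload.before if event.op == 'd' else event.payload.after
        cache.delete(f"user:{row.id}")
Source of truth: Postgres. Cache is eventually consistent.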
5.2 Search index sync
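def consume_product_events():
    for event in kafka.consume("appdb.public.products"):
        if event.op == 'd':
            elasticsearch.delete(event.payload.before.id)
        else:
            # use the primary key as document id so updates overwrite instead of duplicating
            elasticsearch.index(id=event.payload.after.id, document=event.payload.after)
5.3 Read model (CQRS)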
Build denormalized read DB from events.
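A minimal in-memory sketch of such a read model: per-user order count and lifetime spend, maintained from order change events. Updates and deletes are applied as deltas (retract the old row, add the new one), which needs the full before image, i.e. REPLICA IDENTITY FULL on Postgres. The field names `user_id` and `amount` and the payload shape are assumptions for illustration; as written it is not idempotent, so a replayed event would be counted twice.

```python
from collections import defaultdict


class UserOrderStats:
    """Denormalized per-user view: number of orders and lifetime spend."""

    def __init__(self) -> None:
        self.count = defaultdict(int)
        self.total = defaultdict(int)

    def _add(self, row: dict) -> None:
        self.count[row["user_id"]] += 1
        self.total[row["user_id"]] += row["amount"]

    def _remove(self, row: dict) -> None:
        self.count[row["user_id"]] -= 1
        self.total[row["user_id"]] -= row["amount"]

    def apply(self, payload: dict) -> None:
        op = payload["op"]
        if op in ("c", "r"):
            self._add(payload["after"])
        elif op == "u":
            # Retract the old row, then add the new one: handles changed amount AND changed user_id.
            self._remove(payload["before"])
            self._add(payload["after"])
        elif op == "d":
            self._remove(payload["before"])


stats = UserOrderStats()
for event in [
    {"op": "c", "after": {"id": 1, "user_id": 7, "amount": 100}},
    {"op": "c", "after": {"id": 2, "user_id": 7, "amount": 50}},
    {"op": "u", "before": {"id": 2, "user_id": 7, "amount": 50}, "after": {"id": 2, "user_id": 7, "amount": 70}},
    {"op": "d", "before": {"id": 1, "user_id": 7, "amount": 100}, "after": None},
]:
    stats.apply(event)
print(stats.count[7], stats.total[7])
```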
5.4 Microservice integration
Service A’s DB changes → CDC → Kafka → Service B consumes. Decouples teams without RPC.
5.5 Audit log
Stream all changes to audit DB.
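One way to do this is to map every Debezium payload onto a row of the append-only audit_log table defined in 11.5. The sketch assumes the payload shape from 3.3, uses source.ts_ms (the change time in the source database) as occurred_at, and labels snapshot reads as "SNAPSHOT", which is our own naming choice.

```python
import json
from datetime import datetime, timezone

OPERATIONS = {"c": "INSERT", "u": "UPDATE", "d": "DELETE", "r": "SNAPSHOT"}


def to_audit_row(payload: dict) -> tuple:
    """Map a Debezium change payload to the audit_log columns
    (source_table, operation, before_data, after_data, lsn, occurred_at)."""
    source = payload["source"]
    before, after = payload.get("before"), payload.get("after")
    return (
        f'{source["schema"]}.{source["table"]}',
        OPERATIONS.get(payload["op"], payload["op"]),
        json.dumps(before) if before is not None else None,  # stored as jsonb
        json.dumps(after) if after is not None else None,
        str(source["lsn"]),  # audit_log.lsn is a text column
        datetime.fromtimestamp(source["ts_ms"] / 1000, tz=timezone.utc),
    )


example = {
    "before": {"id": 42, "email": "alice@old.example.com"},
    "after": {"id": 42, "email": "alice@new.example.com"},
    "source": {"db": "appdb", "schema": "public", "table": "users", "lsn": 12345, "ts_ms": 1700000000000},
    "op": "u",
}
print(to_audit_row(example))
```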
5.6 Data warehouse
CDC → Kafka → ClickHouse/Snowflake. Real-time analytics.
6. Operational Concerns
6.1 Slot bloat and the PG13+ safety valve
A stopped or slow consumer → WAL accumulates → risk of filling the primary's disk. Monitor!
SELECT slot_name, active, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag_size
FROM pg_replication_slots;
Critical safety valve (PG13+):
max_slot_wal_keep_size = 100GB
When a stalled slot exceeds this threshold, Postgres invalidates the slot (its wal_status becomes lost) instead of letting the disk fill up. The reader must then re-snapshot. Trade-off: losing CDC progress vs. the database stopping.
Production setting:
- Set max_slot_wal_keep_size to cover 1-2 days of WAL at your write rate (e.g. 50-100GB)
- Alert at 50% of that threshold
- Have re-snapshot runbook ready
Before PG13 there is no such mechanism, so you have to monitor and intervene manually. This is one reason to upgrade to PG13+.
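Sizing max_slot_wal_keep_size from "1-2 days of WAL at your write rate" is simple arithmetic once you know the rate. A minimal sketch, assuming you sample `pg_current_wal_lsn()` twice a known number of seconds apart; the function names and the 50% alert fraction are illustrative. Note that memory units in postgresql.conf are base-1024, so `1GB` there means 1024**3 bytes.

```python
import math


def parse_lsn(lsn: str) -> int:
    """Textual pg_lsn 'X/Y' (upper/lower 32 bits in hex) → absolute byte position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def wal_rate_bytes_per_sec(lsn_start: str, lsn_end: str, seconds: float) -> float:
    """Average WAL write rate between two samples of pg_current_wal_lsn()."""
    return (parse_lsn(lsn_end) - parse_lsn(lsn_start)) / seconds


def suggest_max_slot_wal_keep_size(rate_bytes_per_sec: float, hours: float = 24) -> str:
    """WAL produced in `hours`, rounded up to whole postgresql.conf GB (1024**3 bytes)."""
    gb = math.ceil(rate_bytes_per_sec * hours * 3600 / 1024**3)
    return f"{gb}GB"


def alert_threshold_bytes(keep_size_gb: int, fraction: float = 0.5) -> int:
    """Retained-WAL size at which to alert, as a fraction of the configured limit."""
    return int(keep_size_gb * 1024**3 * fraction)


rate = wal_rate_bytes_per_sec("10/0", "10/3C00000", seconds=60)  # 60 MiB in 60 s
print(rate, suggest_max_slot_wal_keep_size(rate, 24), suggest_max_slot_wal_keep_size(rate, 48))
```

At 1 MiB/s this gives 85GB for one day and 169GB for two, so the 50-100GB range above corresponds to roughly half a day to a day of WAL at that rate.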
6.2 Replica lag
Logical replication is async. Lag varies by load.
Mitigations:
- Throttle bulk writes
- Multiple Debezium connectors (per table)
- Faster Kafka cluster
6.3 Initial snapshot of large table
100GB table → snapshot takes time. Strategies:
- snapshot.mode: schema_only if data already exists at the sink
- Incremental snapshot (Debezium 1.6+): chunked
- Tune the chunk size via incremental.snapshot.chunk.size
6.4 Recovery from failure
Debezium tracks LSN consumed → resumes from there. Idempotent consumers important (event may be replayed).
6.5 Schema registry
Always use Confluent Schema Registry or equivalent. Avoid “schema in payload” anti-pattern.
7. Alternative Tools
7.1 AWS DMS
Database Migration Service supports CDC. Lower control but managed.
7.2 Maxwell, pg_kafka, AWS DSQL
Specific use cases. Debezium dominant for general CDC.
7.3 Materialize, RisingWave
Streaming SQL DBs that consume CDC events and maintain materialized views with sub-second freshness.
7.4 Estuary Flow, Hevo, Airbyte
Managed CDC + transformation. Cheaper to start than self-hosted Kafka Connect.
8. Anti-patterns
| Pattern | Why bad | Fix |
|---|---|---|
| Dual-write app+kafka | Inconsistency on failure | Outbox + CDC |
| No replication slot monitoring | Disk full surprise | Alert on lag |
| Consumer not idempotent | Duplicates on replay | Idempotency keys |
| Schema changes without coordination | Consumer breaks | Schema registry + backward-compat |
| Single huge connector | SPOF, throughput | Multiple connectors |
| Forgot to recreate slot after DB rebuild | Data loss | Document slot management |
| Use CDC for low-latency sync | Async, lag exists | Different tool if <1s strict |
9. Lab
Day 1: Logical replication
Setup 2 Postgres, replicate users table via publication/subscription.
Day 2: Debezium
docker-compose: Postgres + Kafka + Kafka Connect + Debezium. Stream events.
Day 3: Outbox pattern
Implement order service with outbox. Watch events flow.
Day 4: Cache sync consumer
Build Redis cache invalidator consuming Debezium events.
Day 5: Search index sync
Build ES sync consumer.
10. Schema Registry Deep Dive
10.1 Why schema registry
Without registry:
- Producer changes schema → Consumer crashes
- Multiple versions floating
- No version compatibility check
With registry:
- Schemas centrally managed
- Producer + Consumer reference schema by ID
- Compatibility checks (backward/forward/full)
- Evolution rules enforced
10.2 Confluent Schema Registry
# Register schema
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [...]}"}' \
http://localhost:8081/subjects/users-value/versions
# Get schema
curl http://localhost:8081/subjects/users-value/versions/latest
Compatibility modes:
- BACKWARD: new schema reads old data
- FORWARD: old schema reads new data
- FULL: both
- NONE: anything goes
Default: BACKWARD (most common, safe).
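The same registry calls can be prepared from Python. The detail the curl example hides is that the Avro schema is sent as a JSON string nested inside the JSON body. The sketch also builds the `PUT /config/<subject>` call that overrides the compatibility mode for one subject. The registry URL and the two-field User schema are hypothetical.

```python
import json
import urllib.request

SR_CONTENT_TYPE = "application/vnd.schemaregistry.v1+json"


def register_schema_request(registry_url: str, subject: str, schema: dict) -> urllib.request.Request:
    """POST /subjects/<subject>/versions; the Avro schema goes in as a JSON *string*."""
    body = json.dumps({"schema": json.dumps(schema)}).encode("utf-8")
    return urllib.request.Request(
        f"{registry_url.rstrip('/')}/subjects/{subject}/versions",
        data=body, headers={"Content-Type": SR_CONTENT_TYPE}, method="POST",
    )


def set_compatibility_request(registry_url: str, subject: str, level: str) -> urllib.request.Request:
    """PUT /config/<subject> to override the compatibility mode for one subject."""
    body = json.dumps({"compatibility": level}).encode("utf-8")
    return urllib.request.Request(
        f"{registry_url.rstrip('/')}/config/{subject}",
        data=body, headers={"Content-Type": SR_CONTENT_TYPE}, method="PUT",
    )


# Hypothetical schema: the field list is illustrative only.
user_schema = {
    "type": "record",
    "name": "User",
    "fields": [{"name": "id", "type": "long"}, {"name": "email", "type": "string"}],
}
reg = register_schema_request("http://localhost:8081", "users-value", user_schema)
cfg = set_compatibility_request("http://localhost:8081", "users-value", "BACKWARD")
# urllib.request.urlopen(reg) / urlopen(cfg) would send them to a running registry.
```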
10.3 Schema evolution rules
| Change | BACKWARD | FORWARD | FULL |
|---|---|---|---|
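| Add optional field (with default) | ✓ | ✓ | ✓ |
| Add required field (no default) | ✗ | ✓ | ✗ |
| Remove optional field (has default) | ✓ | ✓ | ✓ |
| Remove required field (no default) | ✓ | ✗ | ✗ |
| Rename field | ✗ | ✗ | ✗ |
| Change type | depends (type promotion rules) | depends | depends |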
→ Most teams: BACKWARD compatibility, never remove or rename fields. Add new optional fields only.
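These rules follow from Avro's reader/writer resolution: a reader can decode writer data if every reader field is present in the writer or has a default, while writer fields the reader doesn't know are skipped. A simplified checker for flat record schemas, as a sketch; it ignores type promotions, aliases and nested types, and Confluent's real checker is more complete.

```python
from typing import Dict, List


def can_read(reader: List[dict], writer: List[dict]) -> bool:
    """Simplified Avro record resolution between two flat record schemas.

    Every reader field must exist in the writer with the same type, or carry a default.
    Writer fields unknown to the reader are skipped.
    """
    writer_types = {f["name"]: f["type"] for f in writer}
    for field in reader:
        if field["name"] in writer_types:
            if writer_types[field["name"]] != field["type"]:
                return False
        elif "default" not in field:
            return False
    return True


def compatibility(old: List[dict], new: List[dict]) -> Dict[str, bool]:
    backward = can_read(reader=new, writer=old)  # consumers on the new schema read old data
    forward = can_read(reader=old, writer=new)   # consumers on the old schema read new data
    return {"BACKWARD": backward, "FORWARD": forward, "FULL": backward and forward}


base = [{"name": "id", "type": "long"}, {"name": "email", "type": "string"}]
optional_phone = {"name": "phone", "type": ["null", "string"], "default": None}
print(compatibility(base, base + [optional_phone]))  # add optional field
print(compatibility(base, base + [{"name": "phone", "type": "string"}]))  # add required field
```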
10.4 Apicurio (Red Hat alternative)
OSS, similar features. Apache 2.0.
10.5 AWS Glue Schema Registry
Managed on AWS. Integrates with MSK (Managed Kafka).
11. CDC Use Cases — Real Patterns
11.1 Cache invalidation (real-time)
async def consume_user_events():
    async for event in kafka.consume("appdb.public.users"):
        user_id = event.payload.after.id if event.op != 'd' else event.payload.before.id
        await redis.delete(f"user:{user_id}")
        await redis.delete(f"user_orders:{user_id}")  # invalidate related keys too
Postgres is the source of truth. The cache is eventually consistent (typically <5s lag).
11.2 Search index sync
async def consume_product_events():
    async for event in kafka.consume("appdb.public.products"):
        if event.op == 'd':
            await elasticsearch.delete(index="products", id=event.payload.before.id)
        else:
            doc = transform_for_es(event.payload.after)
            await elasticsearch.index(index="products", id=doc.id, body=doc)
11.3 Read model build (CQRS)
Build denormalized read DB from events:
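async def build_user_dashboard_view(event):
    table = event.payload.source.table  # Debezium carries the table name in the source block
    row = event.payload.before if event.op == 'd' else event.payload.after  # after is null on delete
    if table == 'orders':
        await update_user_lifetime_stats(row.user_id)
    elif table == 'reviews':
        await update_product_rating_avg(row.product_id)
11.4 Microservice integration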
Service A’s DB changes → CDC → Kafka → Service B consumes. Decouples teams without RPC.
graph LR
    SA[Service A: Orders] --> DBA[(Orders DB)]
    DBA -.CDC.-> Kafka
    Kafka --> SB[Service B: Notifications]
    Kafka --> SC[Service C: Analytics]
11.5 Audit log
Stream all changes to audit DB (separate, write-once).
-- Audit DB - append only
CREATE TABLE audit_log (
id bigserial PRIMARY KEY,
source_table text,
operation text,
before_data jsonb,
after_data jsonb,
lsn text,
occurred_at timestamptz
);
11.6 Data warehouse / lakehouse ingestion
CDC → Kafka → Iceberg/ClickHouse
See Week 14 and the Lakehouse bonus. Real-time analytics pipeline.
11.7 Multi-region sync
Replicate via Kafka → reduces direct DB-to-DB cross-region streaming.
12. Operational Patterns
12.1 Initial snapshot of large table
100GB table → snapshot takes time. Strategies:
A. snapshot.mode: initial (default)
- Locks table briefly, then streams
- Acceptable for small/medium tables
B. snapshot.mode: schema_only
- If data already at sink (from previous load)
- Just sync schema, then stream new changes
C. snapshot.mode: when_needed
- Snapshot only if no replication slot offset
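D. Incremental snapshot (Debezium 1.6+)
- Chunked snapshot, no long lock
- Resumable
- Less blocking on source
{
"signal.data.collection": "public.debezium_signal",
"incremental.snapshot.chunk.size": 1024
}
Incremental snapshot is not a snapshot.mode value: it is triggered at runtime by inserting an execute-snapshot signal (type 'execute-snapshot', data '{"data-collections": ["public.orders"], "type": "incremental"}') into the signal table. The signal table name above is illustrative, and the table must itself be captured by the connector. For huge tables (TB+), incremental snapshot is usually the only viable option.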
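The "chunked, resumable" part boils down to keyset pagination with a persisted checkpoint. A sketch using sqlite3 as a stand-in source (an assumption for runnability), with a hypothetical orders table keyed by a positive integer id. It shows only the chunking half: Debezium additionally reconciles each chunk with changes streamed while it was being read (a watermark technique), which is omitted here.

```python
import sqlite3
from typing import Iterator, List, Tuple


def read_in_chunks(conn: sqlite3.Connection, chunk_size: int, after_id: int = 0) -> Iterator[Tuple[int, List[tuple]]]:
    """Yield (last_id, rows) for the orders table in primary-key order, chunk_size rows at a time.

    Persisting last_id after each chunk makes the read resumable: pass it back as after_id.
    """
    last_id = after_id
    while True:
        rows = conn.execute(
            "SELECT id, amount FROM orders WHERE id > ? ORDER BY id LIMIT ?",
            (last_id, chunk_size),
        ).fetchall()
        if not rows:
            return
        last_id = rows[-1][0]
        yield last_id, rows


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, amount INTEGER)")
conn.executemany("INSERT INTO orders VALUES (?, ?)", [(i, i * 10) for i in range(1, 11)])

for checkpoint, rows in read_in_chunks(conn, chunk_size=4):
    print(checkpoint, len(rows))  # a real reader would persist checkpoint here
```

Because each query seeks by `id > last_id` instead of using OFFSET, every chunk costs the same regardless of how far into the table it is, and a restart only repeats the chunk in flight.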
12.2 Recovery from failure
Debezium tracks LSN consumed → resumes from there. Idempotent consumers important (event may be replayed).
Patterns:
- Use database upsert (INSERT ON CONFLICT) on consumer
- Use idempotency key (event_id) in consumer’s local store
- Use Kafka’s exactly-once semantics (transactions)
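The first pattern, upsert on the consumer, can be made replay-safe by storing the source LSN with each row and only moving forward. The sketch uses sqlite3 so it runs anywhere; Postgres supports the same `INSERT ... ON CONFLICT ... DO UPDATE ... WHERE` form. The users_replica table and its columns are hypothetical, and LSNs are compared as the numbers Debezium puts in source.lsn.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users_replica (id INTEGER PRIMARY KEY, email TEXT, source_lsn INTEGER NOT NULL)"
)

# Upsert that only moves forward: a replayed or older event (LSN not greater) changes nothing.
UPSERT = """
INSERT INTO users_replica (id, email, source_lsn) VALUES (:id, :email, :lsn)
ON CONFLICT(id) DO UPDATE SET email = excluded.email, source_lsn = excluded.source_lsn
WHERE excluded.source_lsn > users_replica.source_lsn
"""


def apply_event(conn: sqlite3.Connection, payload: dict) -> None:
    """Apply one Debezium change so that processing it twice has the same effect as once."""
    with conn:
        if payload["op"] == "d":
            # Deleting an already-deleted row is a no-op, so deletes are naturally idempotent.
            conn.execute("DELETE FROM users_replica WHERE id = ?", (payload["before"]["id"],))
        else:
            row = payload["after"]
            conn.execute(UPSERT, {"id": row["id"], "email": row["email"], "lsn": payload["source"]["lsn"]})


create = {"op": "c", "after": {"id": 42, "email": "a@example.com"}, "source": {"lsn": 100}}
update = {"op": "u", "after": {"id": 42, "email": "b@example.com"}, "source": {"lsn": 200}}
for event in (create, update, create, update):  # the last two simulate a replay after a restart
    apply_event(conn, event)
print(conn.execute("SELECT id, email, source_lsn FROM users_replica").fetchall())
```

One gap remains: a replayed insert that arrives after the row's delete would resurrect it. Stricter designs keep a soft-deleted row with its LSN instead of removing it.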
12.3 Topic partitioning
Topics partitioned for parallel consumption. Key consideration:
Topic: appdb.public.orders
Partition key: order ID hash
→ All events for order X go to same partition
→ Consumer sees events in order for any single order
Lose ordering across orders. Acceptable for most cases.
For strict global order: 1 partition (no parallelism).
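Why keying preserves per-order ordering: the producer maps each key to a partition deterministically and appends in arrival order, so all events of one key land in one partition in their original sequence. A toy sketch; the hash is a stand-in (`zlib.crc32`), since Kafka's default partitioner uses murmur2, and the order keys are hypothetical.

```python
import zlib
from collections import defaultdict
from typing import Dict, List, Tuple


def partition_for(key: str, num_partitions: int) -> int:
    """Deterministic key → partition mapping (CRC32 stand-in for Kafka's murmur2)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def assign(events: List[Tuple[str, str]], num_partitions: int) -> Dict[int, List[Tuple[str, str]]]:
    """Append (key, change) events to partitions in arrival order, like a producer would."""
    partitions: Dict[int, List[Tuple[str, str]]] = defaultdict(list)
    for key, change in events:
        partitions[partition_for(key, num_partitions)].append((key, change))
    return partitions


events = [
    ("order-1", "created"), ("order-2", "created"), ("order-1", "paid"),
    ("order-2", "cancelled"), ("order-1", "shipped"),
]
parts = assign(events, num_partitions=6)
for p, evs in sorted(parts.items()):
    print(p, evs)
```

Events for different orders may sit in different partitions and be consumed in any relative order, which is exactly the "lose ordering across orders" trade-off.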
12.4 Tombstones (Kafka)
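By default (tombstones.on.delete=true), Debezium follows each delete event with a null-value message for the same key: a “tombstone” in Kafka.
With log compaction (cleanup.policy=compact), Kafka keeps only the latest record per key; when that latest record is a tombstone, compaction removes the key entirely, and the tombstone itself is purged after delete.retention.ms.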
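The end state of a compacted topic can be simulated in a few lines: replay the log, keep the last value per key, and drop keys whose last value is a tombstone. This is a model of the outcome only; the real broker compacts segments in the background and keeps tombstones for delete.retention.ms before removing them. The sample events are hypothetical.

```python
from typing import Dict, List, Optional, Tuple


def compact(log: List[Tuple[str, Optional[dict]]]) -> Dict[str, dict]:
    """End state of a compacted topic once compaction has run and tombstones have expired.

    Only the latest record per key survives; a None value (tombstone) removes the key.
    """
    latest: Dict[str, Optional[dict]] = {}
    for key, value in log:
        latest[key] = value  # later records overwrite earlier ones for the same key
    return {key: value for key, value in latest.items() if value is not None}


log = [
    ("42", {"op": "c", "after": {"id": 42, "email": "a@example.com"}}),
    ("43", {"op": "c", "after": {"id": 43, "email": "c@example.com"}}),
    ("42", {"op": "u", "after": {"id": 42, "email": "b@example.com"}}),
    ("43", {"op": "d", "before": {"id": 43}, "after": None}),  # delete event
    ("43", None),  # tombstone that follows it
]
print(compact(log))
```

Without the trailing tombstone, key 43 would survive compaction holding its delete event forever; the tombstone is what lets the key disappear.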
12.5 Backpressure
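If a consumer is slow, Kafka keeps buffering, but only up to the topic's retention (retention.ms / retention.bytes); a consumer that lags beyond that loses the expired events.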
Mitigate:
- Scale consumers (more parallelism, more partitions)
- Increase Kafka retention
- Filter at Debezium SMT to reduce volume
- Drop non-essential events
13. Anti-patterns
| Pattern | Why bad | Fix |
|---|---|---|
| Dual-write app+kafka | Inconsistency on failure | Outbox + CDC |
| No replication slot monitoring | Disk full surprise | max_slot_wal_keep_size + alert on lag |
| Consumer not idempotent | Duplicates on replay | Idempotency keys, upsert |
| Schema changes without coordination | Consumer breaks | Schema registry + backward-compat |
| Single huge connector | SPOF, throughput limit | Multiple connectors per table or shard |
| Forgot to recreate slot after DB rebuild | Data loss | Document slot management |
| Use CDC for strict synchronous sync | Async, lag exists | Different tool if <1s strict needed |
| No DLQ for failed events | Stuck pipeline | Dead letter queue config |
| Snapshot blocking | Production impact | Incremental snapshot |
| Skip Schema Registry | Producer/consumer drift | Always use registry |
14. Lab — 7 days
Day 1: Logical replication
Setup 2 Postgres, replicate users table via publication/subscription.
Day 2: Debezium
docker-compose: Postgres + Kafka + Kafka Connect + Debezium. Stream events. Watch topics.
Day 3: Outbox pattern
Implement order service with outbox. Watch events flow.
Day 4: Schema Registry
Setup Confluent Schema Registry. Configure Debezium to use. Test schema evolution (add column, observe).
Day 5: Cache sync consumer
Build Redis cache invalidator consuming Debezium events.
Day 6: Search index sync
Build ES sync consumer. Test inserts/updates/deletes flow.
Day 7: Operational scenarios
- Stop consumer, observe slot lag growth
- Test max_slot_wal_keep_size invalidation
- Test incremental snapshot
15. Self-check
- What is the dual-write problem? How does CDC solve it?
- Outbox pattern: where does the atomicity come from?
- max_slot_wal_keep_size (PG13+): what role does it play?
- Debezium event format: what values can the op field take?
- Snapshot modes: 4 modes and their use cases?
- Incremental snapshot: why does it matter?
- Schema Registry compatibility modes?
- What is a tombstone in a Kafka topic?
- How does topic partitioning affect ordering?
- Idempotent consumer: why is it needed?
16. Next
Tuan-Bonus-Distributed-SQL-Engineering
Updated: 2026-05-16