Case Study: Design a Modern Data Lakehouse

“In 2015 you had to choose: data warehouse (expensive, structured, ACID) or data lake (cheap, flexible, no ACID). In 2024 you no longer have to choose — the Lakehouse gives you both. Iceberg + Trino + Flink on S3 at 1/10 the cost of Snowflake, with stronger schema evolution than Hive.”

Tags: system-design lakehouse iceberg delta-lake hudi kappa flink data-engineering case-study bonus Student: Hieu (Backend Dev → Architect) Prerequisite: Tuan-07-Database-Sharding-Replication · Tuan-08-Message-Queue · Case-Design-Ad-Click-Event-Aggregation Related: Tuan-Bonus-Outbox-Pattern · Tuan-Bonus-Consistency-Models-Isolation · Case-Design-Metrics-Monitoring-Alerting


Context & Why — Why a Lakehouse?

Analogy: Traditional library vs General warehouse vs Logistics center

Hieu, picture three models for storing data:

Data Warehouse (Snowflake, Redshift, BigQuery) — the five-star library:

  • Books are carefully cataloged and indexed, with ACID guarantees
  • Extremely fast lookups
  • Expensive: $40-100/TB/month
  • Rigid: the schema must be predefined (ETL first)
  • Vendor lock-in: proprietary data format

Data Lake (S3 + Parquet/JSON) — the general warehouse:

  • Dump everything in, any format goes (CSV, JSON, Parquet, video)
  • Cheap: $20-25/TB/month (S3 Standard)
  • Flexible: store raw data, transform later (ELT)
  • Problem: no ACID → concurrent writes corrupt data, every file has a slightly different schema, no efficient update/delete

Lakehouse (Iceberg + S3 + Trino/Spark/Flink) — the modern logistics center:

  • Storage as cheap as a Data Lake (Parquet on S3)
  • ACID + transactions like a Warehouse
  • Strong schema evolution
  • Time travel (read the data as it was at an earlier point in time)
  • Multi-engine query (Spark + Flink + Trino + DuckDB all read the same table)
  • Open: Apache Iceberg, Apache Hudi, Delta Lake — no vendor lock-in

Why does a backend dev need to understand the Lakehouse?

| Reason | Explanation |
|---|---|
| Every production app needs analytics | Data from the DB has to land somewhere for dashboards, ML, BI |
| CDC from the Outbox flows into the Lakehouse | Understanding Iceberg = understanding the data's destination |
| Cost matters at scale | At TB → PB scale, Snowflake can run $400K/year |
| Real-time analytics | Kappa architecture with Flink → Iceberg = sub-minute analytics |
| ML feature store | Modern feature stores (Feast, Tecton) are built on Iceberg |
| Compliance & audit | Time travel + lineage = GDPR/SOX-ready |

Why doesn't Alex Xu cover the Lakehouse?

Alex Xu vol 1+2 covers Ad-Click Aggregation (Kappa architecture) but never mentions Iceberg/Delta Lake — this is a 2020+ evolution. By 2024-2026, essentially every new data infrastructure is built on the Lakehouse pattern. Closing that gap is what brings this vault up to the state of the art.



Step 1 — Understand the Problem & Establish Design Scope

1.1 Clarifying Questions

| Question | Assumed answer | Implication |
|---|---|---|
| Volume? | 10 TB/day raw (1B events × 10KB) | Needs distributed processing |
| Latency requirements? | Real-time (< 1 min) + batch (hourly/daily) | Hybrid — Kappa with batch fallback |
| Source data? | Postgres CDC + Kafka events + S3 raw files + 3rd-party APIs | Multi-source ingestion |
| Query patterns? | Ad-hoc SQL (analysts) + ML training + dashboards (real-time) | Multi-engine: Trino + Spark + Flink |
| Update/delete needed? | Yes (GDPR right-to-be-forgotten, late-arriving data) | Needs ACID + UPSERT support |
| Schema stability? | Evolves over time (add columns, change types) | Needs schema evolution |
| Compliance? | GDPR, SOC 2, audit trail | Time travel, immutable history |
| Multi-region? | Yes, replicate across US + EU | Needs a geo-replication strategy |

1.2 Functional Requirements

  • FR1 — Ingest: Multi-source data ingestion (CDC, streaming events, batch files, API pulls)
  • FR2 — Store: Reliable, ACID-compliant storage at PB scale with cost-effective object storage
  • FR3 — Query: Multi-engine SQL access (analytics, BI, ML)
  • FR4 — Stream: Real-time queries with end-to-end latency < 1 minute
  • FR5 — Update/Delete: Support row-level UPDATE/DELETE (GDPR compliance)
  • FR6 — Time travel: Query data as-of any point in past N days
  • FR7 — Schema evolution: Add/rename/drop columns without rewriting data
  • FR8 — Lineage: Track data provenance (where each row came from, when transformed)

1.3 Non-functional Requirements

| Requirement | Target |
|---|---|
| Scale | 100 TB warm + 10 PB cold |
| Query latency | P95 < 10s for ad-hoc; sub-second for cached |
| Streaming freshness | < 1 min from event to queryable |
| Cost | < $0.01 per GB-month at warm tier |
| Availability | 99.9% (multi-AZ object storage) |
| Durability | 11 nines (S3 Standard) |
| Concurrent writers | 50+ Flink jobs writing to the same table without conflicts |

1.4 Out of Scope

  • Real-time OLTP (use Postgres/Cassandra for that)
  • Sub-second analytics (use ClickHouse/Druid for those)
  • Graph queries (use Neo4j/Neptune)

Step 2 — High-Level Design

2.1 Architecture Evolution

2.1.1 Lambda Architecture (Legacy, ~2010)

                    ┌─────────────────────┐
                    │   Source events     │
                    └──────────┬──────────┘
                               │
              ┌────────────────┴────────────────┐
              ▼                                  ▼
   ┌──────────────────┐              ┌──────────────────┐
   │   Batch layer    │              │   Speed layer    │
   │   (Hadoop, S3)   │              │   (Storm, Flink) │
   │   Hourly/daily   │              │   Real-time      │
   │   Source of truth│              │   Approximate    │
   └──────────┬───────┘              └─────────┬────────┘
              │                                 │
              ▼                                 ▼
        Batch view                       Real-time view
              │                                 │
              └────────────────┬────────────────┘
                               ▼
                    ┌──────────────────┐
                    │   Serving layer  │
                    │   (Cassandra,    │
                    │    Druid, etc.)  │
                    └──────────────────┘

Problems with Lambda:

  • Code duplication: the computation logic has to be written twice (batch + speed)
  • Operational complexity: 2 pipelines, 2 systems, 2 teams
  • Inconsistency: the batch and speed views frequently disagree
  • Debugging hell: where is the bug, batch or speed?

2.1.2 Kappa Architecture (2014, Jay Kreps)

“Why use 2 pipelines when 1 works?” — Jay Kreps, Questioning the Lambda Architecture (2014)

   ┌─────────────────────┐
   │   Source events     │
   └──────────┬──────────┘
              │
              ▼
   ┌───────────────────┐
   │  Kafka (immutable │
   │   event log)      │
   └────────┬──────────┘
            │
            ▼
   ┌──────────────────┐
   │  Stream processor│  ← Single source of truth
   │  (Flink)         │
   └────────┬──────────┘
            │
            ▼
   ┌──────────────────┐
   │  Serving layer   │
   └──────────────────┘

Advantages:

  • 1 pipeline, 1 codebase
  • Reprocessing: replay Kafka from offset 0
  • Consistency: there is only ever one view

Problem: Kafka only retains data for a short window (7-30 days) → reprocessing data older than that gets complicated.

2.1.3 Lakehouse Architecture (2020+)

                ┌─────────────────────┐
                │   Source events     │
                └──────────┬──────────┘
                           │
           ┌───────────────┴───────────────┐
           ▼                                ▼
   ┌──────────────┐                ┌──────────────┐
   │   CDC        │                │   Streaming  │
   │   (Debezium) │                │   (Kafka)    │
   └──────┬───────┘                └──────┬───────┘
          │                                │
          └───────────┬────────────────────┘
                      ▼
            ┌──────────────────────────────────┐
            │    Bronze Layer (Raw)             │
            │    Iceberg/Delta on S3            │
            │    Append-only, full fidelity     │
            └──────────────────┬───────────────┘
                               │
                               ▼
            ┌──────────────────────────────────┐
            │    Silver Layer (Cleansed)        │
            │    Iceberg, deduplicated, typed   │
            └──────────────────┬───────────────┘
                               │
                               ▼
            ┌──────────────────────────────────┐
            │    Gold Layer (Aggregated)        │
            │    Business metrics, ML features  │
            └─────────────────────────┬────────┘
                                      │
              ┌────────────┬──────────┴───────┬──────────┐
              ▼            ▼                  ▼          ▼
         ┌────────┐  ┌──────────┐      ┌─────────┐  ┌────────┐
         │ Trino  │  │  Spark   │      │  Flink  │  │ DuckDB │
         │ (BI)   │  │  (ML)    │      │(Stream) │  │(Local) │
         └────────┘  └──────────┘      └─────────┘  └────────┘

Advantages:

  • Single storage layer: every query engine reads the same tables
  • Bronze/Silver/Gold pattern (Databricks medallion): clear data-quality tiers
  • ACID on S3: Iceberg/Delta make concurrent writers safe
  • Streaming + batch unified: Flink writes to the same Iceberg table that Spark reads in batch
  • Cost: S3 + open formats ≈ 1/5 the cost of Snowflake

2.2 Components Overview

| Component | Role | Choice |
|---|---|---|
| Storage | Object store for data files | AWS S3 / GCS / Azure Blob |
| File format | Columnar storage | Apache Parquet (default) |
| Table format | ACID + metadata layer | Apache Iceberg / Delta Lake / Hudi |
| Catalog | Table registry | AWS Glue / Hive Metastore / Polaris / Unity |
| Streaming ingest | Real-time CDC + events | Apache Flink / Spark Streaming |
| Batch processing | ETL jobs | Apache Spark |
| Query engine | SQL access | Trino / Presto / DuckDB |
| Orchestration | Workflow scheduling | Airflow / Dagster / Prefect |
| Lineage | Data provenance | OpenLineage |
| Quality | Data tests | Great Expectations / Soda |

2.3 Why Iceberg vs Delta vs Hudi?

| Feature | Apache Iceberg | Delta Lake | Apache Hudi |
|---|---|---|---|
| Origin | Netflix (2017) | Databricks (2019) | Uber (2017) |
| Governance | Apache, vendor-neutral | Linux Foundation (Databricks-led) | Apache |
| ACID | ✅ | ✅ | ✅ |
| Schema evolution | ✅ Strongest (full) | ✅ Good | ✅ Limited |
| Time travel | ✅ Branch + tag | ✅ Version | ✅ Commit timeline |
| Hidden partitioning | ✅ Best | ⚠️ Manual | ⚠️ Manual |
| Multi-engine | ✅ Best (Spark, Flink, Trino, Snowflake) | ⚠️ Spark best | ⚠️ Spark + Hudi best |
| Streaming upsert | Good | Good | ✅ Best (Hudi MoR) |
| Vendor lock | None | Some (Databricks features) | None |
| Adoption 2024 | Rising fast (Snowflake, Apple, Netflix, LinkedIn) | Mature (Databricks) | Specialized |

Recommendation for 2024-2026:

  • Iceberg for greenfield builds + multi-engine flexibility
  • Delta if you are already in the Databricks ecosystem
  • Hudi for heavy upsert workloads (CDC-heavy)

2.4 High-Level Architecture Diagram

flowchart TB
    subgraph Sources["Data Sources"]
        PG[(Postgres)]
        Kafka[(Kafka<br/>events)]
        S3RAW[(S3 raw files)]
        API[3rd-party APIs]
    end

    subgraph Ingest["Ingestion Layer"]
        Deb[Debezium CDC]
        FlinkCDC[Flink CDC]
        Airbyte[Airbyte]
    end

    subgraph Lake["Lakehouse Storage (S3 + Iceberg)"]
        Bronze[(Bronze<br/>raw, append-only)]
        Silver[(Silver<br/>cleaned, typed)]
        Gold[(Gold<br/>aggregated, ML features)]
    end

    subgraph Catalog["Catalog Layer"]
        Glue[AWS Glue<br/>Iceberg REST]
    end

    subgraph Compute["Compute Engines"]
        Spark[Apache Spark<br/>batch ETL + ML]
        Flink[Apache Flink<br/>streaming]
        Trino[Trino<br/>ad-hoc SQL]
        DuckDB[DuckDB<br/>local analyst]
    end

    subgraph Consumers["Consumers"]
        BI[BI Tools<br/>Tableau, Superset]
        ML[ML Platform<br/>SageMaker, Feast]
        APP[Applications]
    end

    PG --> Deb --> Bronze
    Kafka --> FlinkCDC --> Bronze
    S3RAW --> Spark --> Bronze
    API --> Airbyte --> Bronze

    Bronze --> Spark --> Silver
    Silver --> Spark --> Gold
    Silver --> Flink --> Gold

    Catalog -.metadata.-> Bronze
    Catalog -.metadata.-> Silver
    Catalog -.metadata.-> Gold

    Gold --> Trino --> BI
    Gold --> Spark --> ML
    Gold --> DuckDB --> APP

    style Bronze fill:#cd7f32,color:#fff
    style Silver fill:#c0c0c0,color:#000
    style Gold fill:#ffd700,color:#000

Step 3 — Deep Dive

3.1 Iceberg Internals

3.1.1 Table Layout on S3

s3://bucket/warehouse/db/table/
├── metadata/
│   ├── v1.metadata.json          ← table metadata at version 1
│   ├── v2.metadata.json          ← version 2 after first commit
│   ├── snap-12345.avro           ← snapshot manifest list
│   └── 67890-m0.avro             ← manifest file
└── data/
    ├── month=2026-01/
    │   ├── 00000-1-abc.parquet
    │   └── 00001-2-def.parquet
    └── month=2026-02/
        └── 00000-3-ghi.parquet

3 levels of metadata:

  1. Table metadata (v*.metadata.json): schema, partition spec, snapshots list
  2. Manifest list (snap-*.avro): list of manifests for a snapshot
  3. Manifest file (*-m0.avro): list of data files + statistics (min/max per column)

Why 3 levels:

  • Snapshot isolation: a reader at snapshot N never sees writes made in N+1
  • Pruning: manifests carry per-column statistics → a query can skip files that cannot match the WHERE clause
  • Atomic commit: swapping the pointer in the table metadata is an atomic version switch
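
To make the pruning idea concrete, here is a toy Python sketch (not Iceberg's actual code) of how a planner can skip data files using the per-column min/max statistics that manifests carry; the file names and stats are invented for illustration.

# Toy illustration of manifest-based file pruning (not real Iceberg code).
# Each entry mimics the min/max column statistics stored in a manifest file.
manifest_entries = [
    {"path": "data/file-001.parquet", "min": {"amount": 10},  "max": {"amount": 90}},
    {"path": "data/file-002.parquet", "min": {"amount": 100}, "max": {"amount": 500}},
    {"path": "data/file-003.parquet", "min": {"amount": 600}, "max": {"amount": 900}},
]

def prune(entries, column, lower, upper):
    """Keep only files whose [min, max] range can overlap the predicate range."""
    return [
        e["path"]
        for e in entries
        if e["max"][column] >= lower and e["min"][column] <= upper
    ]

# WHERE amount BETWEEN 200 AND 550 -> only file-002 can contain matches
print(prune(manifest_entries, "amount", 200, 550))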

3.1.2 Schema Evolution

Iceberg's unique strength: add/rename/drop columns without rewriting data files.

-- Add column (default NULL for old rows)
ALTER TABLE events ADD COLUMN device_id STRING;
 
-- Rename column (just metadata change, files unchanged)
ALTER TABLE events RENAME COLUMN ts TO event_time;
 
-- Change column type (with promotion rules)
ALTER TABLE events ALTER COLUMN value TYPE BIGINT;  -- int → bigint OK
 
-- Drop column (logical, doesn't delete data)
ALTER TABLE events DROP COLUMN deprecated_field;

How it works: Iceberg tracks columns by field ID (an integer), not by name. Schema evolution is just an update to the field ID ↔ name mapping.
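
A toy sketch of why this works (assumed names, not Iceberg internals): data files store values keyed by field ID, and readers resolve those IDs through the current schema's ID-to-name mapping, so a rename touches only the mapping.

# Toy model of Iceberg's field-ID based schema (illustration only).
schema_v1 = {1: "id", 2: "ts", 3: "value"}          # field_id -> column name
schema_v2 = dict(schema_v1)
schema_v2[2] = "event_time"                          # RENAME COLUMN ts TO event_time
schema_v2[4] = "device_id"                           # ADD COLUMN device_id (new field ID)

# A data file written under schema v1 stores values keyed by field ID,
# so it stays valid under schema v2 without being rewritten.
row_in_old_file = {1: "evt-1", 2: "2026-05-01T10:30:00", 3: 42}

def project(row, schema):
    """Resolve a stored row against the current schema; missing fields are NULL."""
    return {name: row.get(fid) for fid, name in schema.items()}

print(project(row_in_old_file, schema_v2))
# {'id': 'evt-1', 'event_time': '2026-05-01T10:30:00', 'value': 42, 'device_id': None}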

3.1.3 Hidden Partitioning

The problem with Hive partitioning:

-- Hive: the query has to name the partition columns explicitly
SELECT * FROM events
WHERE year=2026 AND month=05 AND day=01
  AND event_time = '2026-05-01 10:30:00';
-- If the user forgets year/month/day → full table scan

Iceberg hidden partitioning:

-- Define a partition transform derived from event_time (Spark SQL with Iceberg extensions)
ALTER TABLE events
  ADD PARTITION FIELD days(event_time);
 
-- The user only filters on event_time → Iceberg AUTOMATICALLY prunes partitions
SELECT * FROM events
WHERE event_time = '2026-05-01 10:30:00';
-- Iceberg applies the days() transform → prunes down to a single partition

Partition transforms hỗ trợ:

  • year(ts), month(ts), day(ts), hour(ts)
  • bucket(N, col) — hash bucketing
  • truncate(W, col) — string/numeric truncation
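
A small Python sketch of what these transforms do conceptually; it is a simplification, and the real bucket transform uses a Murmur3 hash rather than Python's built-in hash().

from datetime import datetime

def day_transform(ts: datetime) -> str:
    """day(ts): every row from the same calendar day lands in one partition."""
    return ts.strftime("%Y-%m-%d")

def bucket_transform(n: int, value) -> int:
    """bucket(N, col): hash the value into one of N buckets (Iceberg uses Murmur3)."""
    return hash(value) % n

def truncate_transform(width: int, value: str) -> str:
    """truncate(W, col): keep only the first W characters."""
    return value[:width]

ts = datetime(2026, 5, 1, 10, 30)
print(day_transform(ts))                   # 2026-05-01
print(bucket_transform(1024, "user-42"))   # a bucket in [0, 1023], stable within one process
print(truncate_transform(3, "VN-HCM"))     # VN-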

3.1.4 Time Travel

-- Query as of a specific snapshot (Trino syntax; the version is a snapshot ID)
SELECT * FROM events FOR VERSION AS OF 12345;
 
-- Query as of a timestamp (most recent snapshot at or before it)
SELECT * FROM events FOR TIMESTAMP AS OF TIMESTAMP '2026-04-01 00:00:00';
 
-- Spark SQL equivalent for a snapshot ID
SELECT * FROM events VERSION AS OF 12345;

Use cases:

  • GDPR audit: “Show me what user X’s record looked like on 2025-12-15”
  • Debugging: “The pipeline produced bad output. Compare yesterday's data with today's”
  • Reproducible ML: “Train the model on the exact training set from last week”
  • Rollback: bad ETL run → revert to an older snapshot

3.1.5 Branching & Tagging (Iceberg 1.x)

Like Git for data tables:

-- Create branch for experiments
ALTER TABLE events CREATE BRANCH experiments;
 
-- Write to branch (other readers don't see)
INSERT INTO events.branch_experiments VALUES (...);
 
-- Tag a version
ALTER TABLE events CREATE TAG release_v1 AS OF VERSION 100;
 
-- Rollback main branch to a tag
ALTER TABLE events SET CURRENT SNAPSHOT TO TAG release_v1;

3.2 ACID Implementation

The problem: S3 has no transactions. Concurrent writers on the same table → corruption.

Iceberg solution: Optimistic Concurrency Control:

1. Reader gets current snapshot (e.g., snapshot 100)
2. Writer A starts transaction, prepares new files
3. Writer B starts transaction, prepares different new files
4. Writer A commits: atomically swap table pointer 100 → 101
5. Writer B tries commit: detect conflict (pointer is now 101, not 100)
6. Writer B retries: re-read snapshot 101, append on top, swap 101 → 102

Atomic swap: Iceberg relies on an atomic CAS (compare-and-swap) against the catalog. Catalog backends:

  • Hive Metastore: CAS via DB transaction
  • AWS Glue: CAS via DynamoDB conditional write
  • REST catalog (Polaris): HTTP commit endpoint với etag
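
The commit loop can be sketched in a few lines of Python, under the simplifying assumption of a single compare-and-swap primitive on the catalog (real Iceberg also validates which files actually conflict before retrying):

import random
import time

class Catalog:
    """Toy catalog exposing one atomic compare-and-swap on the table pointer."""
    def __init__(self):
        self.current_snapshot = 100

    def compare_and_swap(self, expected: int, new: int) -> bool:
        if self.current_snapshot != expected:
            return False                       # someone else committed first
        self.current_snapshot = new
        return True

def commit_with_retry(catalog: Catalog, max_retries: int = 5) -> int:
    for attempt in range(max_retries):
        base = catalog.current_snapshot                      # 1. read current snapshot
        new_snapshot = base + 1                              # 2. prepare metadata on top of it
        if catalog.compare_and_swap(base, new_snapshot):     # 3. atomic pointer swap
            return new_snapshot
        time.sleep(random.uniform(0, 0.1) * (attempt + 1))   # 4. back off and retry
    raise RuntimeError("commit failed after retries")

catalog = Catalog()
print(commit_with_retry(catalog))   # 101
print(commit_with_retry(catalog))   # 102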

Conflict types:

  • Append-append: Compatible (both add files), rare conflict
  • Append-overwrite: Conflict (overwrite changes existing files)
  • Schema evolution + write: Conflict (schema mismatch)

3.3 Streaming Ingestion (Flink → Iceberg)

Pattern: Flink consumes Kafka → writes to Iceberg with exactly-once semantics.

// Flink Iceberg sink
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000);  // Checkpoint every 60s
 
DataStream<Event> events = env.addSource(kafkaSource);
 
FlinkSink.forRowData(events.map(event -> toRowData(event)))
    .table(icebergTable)
    .tableSchema(schema)
    .writeParallelism(4)
    .upsert(true)  // Use UPSERT for CDC-like workload
    .equalityFieldColumns(Arrays.asList("id"))  // Primary key
    .append();
 
env.execute("Stream to Iceberg");

Exactly-once with Iceberg:

  • Flink checkpoint → flush writes to S3 → commit the Iceberg snapshot atomically
  • If Flink crashes in between: data files exist on S3 but the commit never happened → orphan files (cleaned up later)
  • Recovery: Flink resumes from the checkpoint and redoes the write (idempotent, since Iceberg dedups by file hash)

Latency: the checkpoint interval is the freshness. With a 60s checkpoint, data is queryable within 60s + commit time (~5s).

3.4 Multi-engine Query

The magic of an open table format: many engines read the same table.

# Spark
spark.table("warehouse.events").filter("event_time > '2026-01-01'").show()
 
# Trino
trino> SELECT count(*) FROM warehouse.events WHERE event_time > DATE '2026-01-01';
 
# Flink SQL
SELECT count(*) FROM events WHERE event_time > '2026-01-01';
 
# DuckDB (local laptop)
SELECT count(*) FROM iceberg_scan('s3://bucket/warehouse/events')
WHERE event_time > '2026-01-01';
 
# Snowflake (Iceberg external table)
SELECT count(*) FROM iceberg_events WHERE event_time > '2026-01-01';

Why this matters: no data migration when you switch engines. Spark for heavy ETL, Trino for analyst BI, Flink for streaming, DuckDB for local dev.

3.5 Compaction & Maintenance

Iceberg accumulates small files (every micro-batch creates at least one file). You have to compact periodically:

-- Spark procedure: compact small files into larger ones
CALL system.rewrite_data_files(
  table => 'warehouse.events',
  options => map('target-file-size-bytes', '536870912')  -- 512 MB
);
 
-- Expire old snapshots (release storage)
CALL system.expire_snapshots(
  table => 'warehouse.events',
  older_than => DATE '2026-04-01'
);
 
-- Remove orphan files (data files not referenced)
CALL system.remove_orphan_files(
  table => 'warehouse.events',
  older_than => DATE '2026-04-01'
);
 
-- Rewrite manifests (consolidate)
CALL system.rewrite_manifests('warehouse.events');

Best practice:

  • Compact daily (off-peak)
  • Expire snapshots > 7-30 days
  • Orphan cleanup weekly
  • Monitor file count: > 100K files = degradation
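
One way to schedule this cadence is a small Airflow DAG that submits the maintenance procedures as Spark SQL; a rough sketch assuming Airflow 2.x, the catalog name lake from the quickstart later in this note, and a run_spark_sql() helper you would replace with your own submission mechanism (spark-submit, Livy, Spark Connect):

"""Daily Iceberg maintenance DAG (sketch, not a drop-in production job)."""
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

def run_spark_sql(sql: str) -> None:
    # Placeholder: submit the statement to your Spark cluster here.
    print(f"would run: {sql}")

def compact() -> None:
    run_spark_sql(
        "CALL lake.system.rewrite_data_files(table => 'warehouse.events', "
        "options => map('target-file-size-bytes', '536870912'))"
    )

def expire_snapshots() -> None:
    cutoff = (datetime.utcnow() - timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S")
    run_spark_sql(
        "CALL lake.system.expire_snapshots(table => 'warehouse.events', "
        f"older_than => TIMESTAMP '{cutoff}')"
    )

with DAG(
    dag_id="iceberg_daily_maintenance",
    schedule_interval="0 2 * * *",          # off-peak, 02:00 daily
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:
    compact_task = PythonOperator(task_id="compact_small_files", python_callable=compact)
    expire_task = PythonOperator(task_id="expire_old_snapshots", python_callable=expire_snapshots)
    compact_task >> expire_task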

3.6 CDC Integration with the Outbox

Linking to Tuan-Bonus-Outbox-Pattern:

Postgres (outbox table)
    │ logical replication
    ▼
Debezium → Kafka topic "events.Order"
    │
    ▼
Flink CDC → Iceberg Bronze layer
    │ batch ETL
    ▼
Iceberg Silver (cleansed, deduplicated)
    │
    ▼
Iceberg Gold (aggregated metrics, ML features)

Bronze layer schema (mirrors Postgres + CDC metadata):

CREATE TABLE bronze.orders (
    -- Original columns
    id STRING,
    customer_id STRING,
    total INT,
    status STRING,
    created_at TIMESTAMP,
 
    -- CDC metadata
    op CHAR(1),                    -- 'c'=create, 'u'=update, 'd'=delete
    ts_ms BIGINT,                  -- Kafka timestamp
    source STRUCT<...>,            -- Debezium source info
    txn_id STRING,                 -- Postgres transaction ID
    lsn STRING                     -- WAL log sequence number
)
PARTITIONED BY (days(created_at))   -- the day transform needs a timestamp column (ts_ms is a BIGINT)
TBLPROPERTIES (
  'format-version' = '2',
  'write.delete.mode' = 'merge-on-read'
);
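
Promoting this Bronze table into Silver is typically a MERGE keyed on the primary key that applies the newest CDC op per id. A PySpark sketch, assuming the Iceberg Spark extensions are configured, a catalog named lake, and an existing silver.orders table with the business columns above:

from pyspark.sql import SparkSession

# Assumes a SparkSession already configured with the Iceberg catalog "lake",
# as in the Spark + Iceberg quickstart later in this note.
spark = SparkSession.builder.getOrCreate()

spark.sql("""
    MERGE INTO lake.silver.orders AS t
    USING (
        -- keep only the latest CDC event per order id in the processed window
        SELECT * FROM (
            SELECT *,
                   ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts_ms DESC) AS rn
            FROM lake.bronze.orders
            WHERE created_at >= date_sub(current_date(), 1)
        ) ranked WHERE rn = 1
    ) AS s
    ON t.id = s.id
    WHEN MATCHED AND s.op = 'd' THEN DELETE
    WHEN MATCHED THEN UPDATE SET
        t.customer_id = s.customer_id,
        t.total       = s.total,
        t.status      = s.status,
        t.created_at  = s.created_at
    WHEN NOT MATCHED AND s.op != 'd' THEN INSERT
        (id, customer_id, total, status, created_at)
        VALUES (s.id, s.customer_id, s.total, s.status, s.created_at)
""")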

3.7 Feature Store Integration

Modern ML feature stores (Feast, Tecton) are built on top of Iceberg:

# Feast feature definition
from datetime import timedelta
 
from feast import Entity, FeatureView, Field
from feast.types import Int64, Float32
# IcebergSource is assumed to be provided by the offline-store plugin in use
 
driver = Entity(name="driver_id", join_keys=["driver_id"])
 
driver_stats = FeatureView(
    name="driver_stats",
    entities=[driver],
    schema=[
        Field(name="trips_today", dtype=Int64),
        Field(name="avg_rating", dtype=Float32),
    ],
    source=IcebergSource(
        table="warehouse.gold.driver_stats",
        timestamp_field="event_timestamp",
    ),
    ttl=timedelta(days=1),
)

Benefits:

  • Training-serving consistency: model training reads Iceberg history, serving reads Redis (online materialization). Same source of truth.
  • Time travel for backfill: “What features did driver X have on 2026-01-15?” — answered with Iceberg time travel
  • Lineage: track which features were used in which model

Step 4 — Operations & Trade-offs

4.1 Cost Comparison

Scenario: 100 TB warm data + 10 PB cold, query 1 TB/day.

| Component | Snowflake | Lakehouse (S3 + Iceberg + Trino) |
|---|---|---|
| Storage, warm 100 TB/month | ~$4,000 ($40/TB) | ~$2,300 ($23/TB, S3 Standard) |
| Storage, cold 10 PB/month | Not viable economically | ~$41,000 ($4/TB, S3 Glacier IR) |
| Compute (queries) | $50K/month (warehouse credits) | $15K/month (EC2 spot for Trino) |
| Compute (streaming) | Snowpipe (N/A for streaming) | $5K/month (Flink on EKS) |
| Total | $54,000+/month | ~$63,260/month, but with 100x the cold storage |

Caveat:

  • Snowflake includes auto-tuning and support
  • A Lakehouse needs an engineering team to run it
  • The break-even point depends on team size: a small team (< 10 engineers) → Snowflake; a larger team with dedicated data engineers → Lakehouse

4.2 Query Performance

| Query type | Snowflake | Lakehouse + Trino | Lakehouse + Spark |
|---|---|---|---|
| Point lookup (1 row) | 50ms | 200ms | 5s (Spark startup overhead) |
| Aggregation, 1 GB scan | 1s | 2s | 8s |
| Full table scan, 1 TB | 30s | 45s | 60s |
| ML training (full read) | Not optimal | OK | Best |
| Concurrent users | High (warehouse) | Medium | Low |

Trino best for: Interactive BI, ad-hoc. Spark best for: ETL, ML training. Flink best for: Streaming ingestion.

4.3 When NOT to use Lakehouse

  • ❌ Sub-second analytics: use ClickHouse, Apache Druid, Pinot
  • ❌ OLTP: use Postgres, Cassandra
  • ❌ Small data (< 1 TB): PostgreSQL with TimescaleDB is sufficient
  • ❌ Single team, no data engineer: use Snowflake/BigQuery managed services
  • ❌ Heavy mutation (> 50% of writes are UPDATEs): a Lakehouse works, but needs tuning (Hudi MoR)


Estimation

Storage capacity

Daily ingestion:

  • 1B events/day × 5KB/event = 5 TB/day raw
  • After Parquet compression (~5x): 1 TB/day
  • Bronze + Silver + Gold (~3x amplification): 3 TB/day

Yearly: 365 × 3 TB = ~1.1 PB warm + cold

S3 cost (us-east-1):

  • Standard: ~$25K/month for 1.1 PB
  • Glacier IR: ~$4.4K/month for 1.1 PB
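
The arithmetic above as a quick back-of-envelope script; the unit prices are the assumed S3 list prices used in this note, and TB/PB are decimal units to match the round numbers:

# Back-of-envelope storage & cost estimate (assumed S3 prices per TB-month).
events_per_day = 1_000_000_000
event_size_kb = 5
compression_ratio = 5          # raw -> Parquet
layer_amplification = 3        # Bronze + Silver + Gold

raw_tb_per_day = events_per_day * event_size_kb / 1e9                          # 5 TB/day
stored_tb_per_day = raw_tb_per_day / compression_ratio * layer_amplification   # 3 TB/day
yearly_tb = stored_tb_per_day * 365                                            # ~1.1 PB

s3_standard_per_tb = 23        # USD per TB-month
glacier_ir_per_tb = 4          # USD per TB-month
print(f"stored/day: {stored_tb_per_day:.1f} TB, per year: {yearly_tb/1000:.2f} PB")
print(f"S3 Standard: ${yearly_tb * s3_standard_per_tb:,.0f}/month")   # ~ $25K
print(f"Glacier IR:  ${yearly_tb * glacier_ir_per_tb:,.0f}/month")    # ~ $4.4K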

Streaming throughput

Flink Iceberg sink benchmark (4 parallelism, m5.xlarge):

  • ~50K records/s sustained
  • ~5MB/s output to S3 per task
  • Checkpoint interval 60s → ~3M records per checkpoint

Query throughput

Trino cluster (10 worker nodes, m5.4xlarge):

  • ~50 concurrent queries
  • P95 latency 5-10s for 100GB scan
  • Cost: ~$400/day

Compaction overhead

  • 1 TB/day ingestion = ~10K small files/day
  • Daily compaction merges them into ~1K target files (1GB each)
  • Compute cost: ~2 hours of m5.4xlarge × 5 nodes = $20/day

Security First

Encryption

  • At rest: S3 SSE-KMS (AES-256), customer-managed keys
  • In transit: TLS 1.3 for all connections (Trino, Spark, Flink)

Access Control

Multi-layer:

  1. AWS IAM: control bucket-level access
  2. Lake Formation / Glue: column-level + row-level policies
  3. Iceberg ACLs (via REST catalog like Polaris)
  4. Engine-level: Trino access controls, Spark SQL grants
-- Lake Formation column- and row-level access (illustrative pseudo-SQL;
-- real Lake Formation policies are defined via the console/API)
GRANT SELECT (id, total, created_at) ON warehouse.gold.orders TO 'analyst-role';
-- Note: customer_email NOT granted to analysts
 
-- Row-level filter
CREATE FILTER vn_only ON warehouse.gold.orders
  USING country = 'VN';
GRANT SELECT WITH FILTER vn_only ON warehouse.gold.orders TO 'vn-team';

PII Handling

  • Bronze layer: full PII (encrypted)
  • Silver layer: pseudonymized (hash email, mask card)
  • Gold layer: aggregated only (no PII)
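
A sketch of the Bronze-to-Silver pseudonymization step in PySpark; the lake.bronze.users table, its email and card_number columns, and the salt value are hypothetical placeholders:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

SALT = "load-from-secrets-manager"   # example only; never hard-code a real salt

bronze = spark.table("lake.bronze.users")   # hypothetical Bronze table with PII

silver = (
    bronze
    # pseudonymize: salted SHA-256 so the same email always maps to the same token
    .withColumn("email_hash", F.sha2(F.concat(F.lit(SALT), F.col("email")), 256))
    # mask: keep only the last 4 digits of the card number
    .withColumn("card_masked", F.concat(F.lit("****"), F.substring("card_number", -4, 4)))
    .drop("email", "card_number")
)

silver.writeTo("lake.silver.users").createOrReplace()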

GDPR Right-to-be-forgotten

-- Iceberg V2 supports row-level DELETE
DELETE FROM warehouse.silver.users
WHERE user_id = 'gdpr-request-user-123';
 
-- Required: rewrite affected files via compaction
CALL system.rewrite_data_files('warehouse.silver.users');
 
-- Or use copy-on-write mode:
-- write.delete.mode = 'copy-on-write'

Audit Trail

Iceberg snapshot history = audit log:

SELECT * FROM warehouse.events.snapshots ORDER BY committed_at DESC;
-- Shows every commit: who, when, what files, what operation

DevOps

Docker Compose: Local Lakehouse Stack

version: "3.8"
 
services:
  minio:
    image: minio/minio
    command: server /data --console-address ":9001"
    environment:
      MINIO_ROOT_USER: admin
      MINIO_ROOT_PASSWORD: password
    ports:
      - "9000:9000"
      - "9001:9001"
    volumes:
      - minio-data:/data
 
  rest-catalog:
    image: tabulario/iceberg-rest:1.5.0
    environment:
      AWS_ACCESS_KEY_ID: admin
      AWS_SECRET_ACCESS_KEY: password
      AWS_REGION: us-east-1
      CATALOG_WAREHOUSE: s3://warehouse/
      CATALOG_IO__IMPL: org.apache.iceberg.aws.s3.S3FileIO
      CATALOG_S3_ENDPOINT: http://minio:9000
    ports:
      - "8181:8181"
 
  trino:
    image: trinodb/trino:435
    ports:
      - "8080:8080"
    volumes:
      - ./trino-iceberg.properties:/etc/trino/catalog/iceberg.properties
 
  spark:
    image: tabulario/spark-iceberg:3.5.0
    environment:
      AWS_ACCESS_KEY_ID: admin
      AWS_SECRET_ACCESS_KEY: password
    ports:
      - "8888:8888"  # Jupyter
 
volumes:
  minio-data:
# trino-iceberg.properties
connector.name=iceberg
iceberg.catalog.type=rest
iceberg.rest-catalog.uri=http://rest-catalog:8181
iceberg.rest-catalog.warehouse=s3://warehouse/
fs.native-s3.enabled=true
s3.endpoint=http://minio:9000
s3.region=us-east-1
s3.path-style-access=true
s3.aws-access-key=admin
s3.aws-secret-key=password

Monitoring

Key metrics:

  • Iceberg table size (bytes, file count)
  • Snapshot age (oldest unexpired)
  • Streaming lag (Flink checkpoint to Iceberg commit)
  • Compaction backlog (files > target size)
  • Query P95 latency
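
Metrics such as iceberg_small_files_count used in the alerts below do not exist out of the box; one possible approach (a sketch, assuming the prometheus_client package and a Pushgateway reachable at pushgateway:9091) is a periodic job that reads Iceberg's metadata tables and pushes gauges:

from pyspark.sql import SparkSession
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway

spark = SparkSession.builder.getOrCreate()
registry = CollectorRegistry()

small_files = Gauge("iceberg_small_files_count", "Data files below 128MB",
                    ["table"], registry=registry)
snapshots = Gauge("iceberg_snapshot_count", "Unexpired snapshots",
                  ["table"], registry=registry)

table = "lake.demo.orders"
# Iceberg exposes metadata as queryable tables: <table>.files and <table>.snapshots
small = spark.sql(
    f"SELECT count(*) AS c FROM {table}.files WHERE file_size_in_bytes < 134217728"
).first()["c"]
snaps = spark.sql(f"SELECT count(*) AS c FROM {table}.snapshots").first()["c"]

small_files.labels(table=table).set(small)
snapshots.labels(table=table).set(snaps)
push_to_gateway("pushgateway:9091", job="iceberg_metrics", registry=registry)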

Prometheus alerts:

- alert: IcebergCompactionBacklog
  expr: iceberg_small_files_count > 100000
  for: 30m
  annotations:
    summary: "{{ $labels.table }} has {{ $value }} small files"
 
- alert: FlinkCheckpointFailing
  expr: rate(flink_jobmanager_checkpoint_failed[5m]) > 0
  for: 5m
  annotations:
    summary: "Flink checkpoint failing for {{ $labels.job }}"
 
- alert: IcebergSnapshotsTooMany
  expr: iceberg_snapshot_count > 1000
  annotations:
    summary: "Need to expire snapshots"

Disaster Recovery

RPO/RTO:

  • S3 cross-region replication (RPO ~15 min)
  • Catalog backup hourly (Glue → S3 backup bucket)
  • RTO: ~30 min for catalog restore + cluster spin-up

Time travel as backup:

-- Bad ETL deployed at 10:00, found at 10:30
-- Restore table to 09:55 snapshot
CALL system.rollback_to_timestamp('events', TIMESTAMP '2026-05-01 09:55:00');

Code Implementation

Spark + Iceberg Read/Write

"""
Spark + Iceberg quickstart
Run inside the spark container, e.g. the Jupyter notebook on port 8888 or: docker exec -it spark pyspark
"""
 
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from datetime import datetime
 
spark = SparkSession.builder \
    .appName("LakehouseDemo") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.lake", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.lake.type", "rest") \
    .config("spark.sql.catalog.lake.uri", "http://rest-catalog:8181") \
    .getOrCreate()
 
# Create table
spark.sql("""
    CREATE TABLE IF NOT EXISTS lake.demo.orders (
        id STRING,
        customer_id STRING,
        total INT,
        status STRING,
        created_at TIMESTAMP
    )
    USING iceberg
    PARTITIONED BY (days(created_at))
    TBLPROPERTIES (
        'format-version' = '2',
        'write.delete.mode' = 'merge-on-read'
    )
""")
 
# Insert
spark.sql("""
    INSERT INTO lake.demo.orders VALUES
    ('ord-1', 'cust-1', 500000, 'completed', TIMESTAMP '2026-05-01 10:00:00'),
    ('ord-2', 'cust-2', 300000, 'pending', TIMESTAMP '2026-05-01 10:05:00')
""")
 
# Query
spark.sql("SELECT * FROM lake.demo.orders").show()
 
# Update (merge-on-read)
spark.sql("""
    UPDATE lake.demo.orders
    SET status = 'completed'
    WHERE id = 'ord-2'
""")
 
# Time travel
snapshot_id = spark.sql("SELECT snapshot_id FROM lake.demo.orders.snapshots ORDER BY committed_at LIMIT 1").first()[0]
spark.sql(f"SELECT * FROM lake.demo.orders VERSION AS OF {snapshot_id}").show()
 
# Schema evolution
spark.sql("ALTER TABLE lake.demo.orders ADD COLUMN payment_method STRING")
 
# Compaction
spark.sql("""
    CALL lake.system.rewrite_data_files(
        table => 'demo.orders',
        options => map('target-file-size-bytes', '536870912')
    )
""")
// Flink Iceberg streaming sink
public class StreamToIceberg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);
        env.getCheckpointConfig().setCheckpointStorage("s3://bucket/checkpoints");
 
        // Kafka source
        KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("kafka:9092")
            .setTopics("events.Order")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();
 
        DataStream<String> stream = env.fromSource(source,
            WatermarkStrategy.noWatermarks(), "Kafka");
 
        // Parse JSON to Iceberg row
        DataStream<RowData> rows = stream.map(json -> parseToRow(json));
 
        // Iceberg sink
        TableLoader tableLoader = TableLoader.fromCatalog(
            CatalogLoader.rest("rest", new Configuration(), Map.of(
                "uri", "http://rest-catalog:8181",
                "warehouse", "s3://warehouse/"
            )),
            TableIdentifier.of("demo", "orders_streaming")
        );
 
        FlinkSink.forRowData(rows)
            .tableLoader(tableLoader)
            .upsert(true)
            .equalityFieldColumns(Arrays.asList("id"))
            .writeParallelism(4)
            .append();
 
        env.execute("Stream Orders to Iceberg");
    }
}

Trino Query

-- Trino query Iceberg
USE iceberg.demo;
 
-- Show tables
SHOW TABLES;
 
-- Aggregation
SELECT
    DATE_TRUNC('day', created_at) AS day,
    COUNT(*) AS order_count,
    SUM(total) AS total_revenue
FROM orders
WHERE created_at > DATE '2026-01-01'
GROUP BY 1
ORDER BY 1 DESC;
 
-- Time travel
SELECT * FROM orders FOR TIMESTAMP AS OF TIMESTAMP '2026-04-01 00:00:00';
 
-- Snapshot history
SELECT * FROM "orders$snapshots" ORDER BY committed_at DESC LIMIT 10;
 
-- File statistics
SELECT
    file_path,
    file_size_in_bytes,
    record_count
FROM "orders$files"
ORDER BY file_size_in_bytes DESC
LIMIT 20;

System Design Diagrams

Bronze-Silver-Gold Medallion

flowchart LR
    subgraph Bronze["Bronze Layer"]
        B1["raw_orders<br/>append-only<br/>full CDC payload"]
        B2["raw_clickstream<br/>append-only<br/>schema-less"]
    end

    subgraph Silver["Silver Layer"]
        S1["orders<br/>deduplicated<br/>typed schema<br/>validated"]
        S2["sessions<br/>sessionized<br/>enriched with user"]
    end

    subgraph Gold["Gold Layer"]
        G1["daily_revenue<br/>aggregated by day"]
        G2["user_features<br/>ML training set"]
        G3["funnel_metrics<br/>BI dashboards"]
    end

    B1 -->|"clean + dedupe"| S1
    B2 -->|"sessionize"| S2

    S1 -->|"daily rollup"| G1
    S1 -->|"feature engineering"| G2
    S1 --> G3
    S2 --> G2
    S2 --> G3

    style Bronze fill:#cd7f32,color:#fff
    style Silver fill:#c0c0c0,color:#000
    style Gold fill:#ffd700,color:#000

Iceberg Snapshot Tree

flowchart TD
    Meta[("Table metadata<br/>v3.metadata.json")]

    Meta --> Snap1[("Snapshot 100<br/>commit: 2026-01-01")]
    Meta --> Snap2[("Snapshot 101<br/>commit: 2026-04-01")]
    Meta --> Snap3[("Snapshot 102<br/>commit: 2026-05-01<br/>**current**")]

    Snap1 --> M1[("Manifest list<br/>snap-100.avro")]
    Snap2 --> M2[("Manifest list<br/>snap-101.avro")]
    Snap3 --> M3[("Manifest list<br/>snap-102.avro")]

    M3 --> MF1[("Manifest file<br/>file-1-m0.avro")]
    M3 --> MF2[("Manifest file<br/>file-2-m1.avro")]

    MF1 --> D1[("data/2026-05-01/<br/>file-001.parquet")]
    MF1 --> D2[("data/2026-05-01/<br/>file-002.parquet")]
    MF2 --> D3[("data/2026-05-02/<br/>file-003.parquet")]

    style Snap3 fill:#4caf50,color:#fff

Streaming + Batch Unified

sequenceDiagram
    participant K as Kafka
    participant F as Flink
    participant Ice as Iceberg
    participant S as Spark Batch
    participant T as Trino

    Note over F: Streaming ingest
    K->>F: Event 1
    K->>F: Event 2
    K->>F: Event 3

    Note over F: Checkpoint at 60s
    F->>Ice: Write data files
    F->>Ice: Commit snapshot N

    Note over S: Hourly batch ETL (read snapshot N)
    S->>Ice: SELECT * FROM bronze<br/>(snapshot N)
    S->>Ice: WRITE TO silver<br/>(commit snapshot N+1)

    Note over T: Ad-hoc query (read latest snapshot N+1)
    T->>Ice: SELECT * FROM silver
    Ice-->>T: Data from snapshot N+1

Aha Moments & Pitfalls

Aha Moments

#1: Iceberg/Delta/Hudi are not databases; they are a metadata layer on top of S3. The data files are still plain Parquet. The magic is that Iceberg tracks ACID + snapshots + statistics through manifest files. That is why you can switch from Iceberg to Delta (rebuild only the metadata, never move the data).

#2: Open table formats = the end of vendor lock-in. The same dataset can be queried with Spark, Flink, Trino, DuckDB, Snowflake (Iceberg external table) — no conversion ETL needed.

#3: Schema evolution is free. Iceberg tracks columns by field ID (an integer), not by name. Renaming a column = updating a mapping, not rewriting TBs of data.

#4: Hidden partitioning > Hive partitioning. The user filters event_time = '2026-05-01' → Iceberg auto-prunes partitions; nobody needs to know the partition scheme.

#5: Time travel is free disaster recovery. Bad ETL deployed → roll the table back to the snapshot from an hour earlier, zero data loss.

#6: Lambda architecture is the past. Kappa with Flink + Iceberg gives the same result with half the codebase and operational overhead.

#7: The Bronze-Silver-Gold pattern is a huge simplification. Each layer has a clear quality tier; easy to debug, easy to roll back.

#8: The Lakehouse is most cost-effective above ~100 TB. Below that, managed Snowflake/BigQuery can be cheaper (no data engineers needed).

Pitfalls

Pitfall 1: Forgetting to compact small files

Wrong: streaming ingest every 60s → 1 file per checkpoint → 1,440 files/day → 525K files/year → slow queries. Right: a daily compaction job, with a target file size of 256MB-1GB.

Pitfall 2: Not expiring snapshots

Wrong: every commit creates a snapshot; after a year there are 100K+ snapshots → a huge metadata file → slow queries. Right: expire snapshots older than 7-30 days.

Pitfall 3: Partitioning by user_id (high cardinality)

Wrong: 100M users → 100M partitions → metadata explosion. Right: partition by date, or by bucket(N, user_id) with a sensible N (e.g., 1024).

Pitfall 4: Holding Postgres and the Lakehouse to the same SLA

Wrong: believing the Lakehouse can replace OLTP; Iceberg latency is at least a few seconds. Right: OLTP → Postgres/Cassandra. The Lakehouse is for analytics + ML, not a primary store.

Pitfall 5: GDPR deletes that never rewrite files

Wrong: in merge-on-read mode, DELETE only writes delete markers → the original files still contain the PII. Right: periodically run rewrite_data_files() to materialize the deletes and prove the PII is gone.

Pitfall 6: Not testing multi-engine compatibility

Wrong: Spark writes + Trino reads → version mismatch → errors. Right: test the integration matrix, mind the Iceberg spec version (v1 vs v2), and stick to recent stable releases.

Pitfall 7: Not monitoring catalog health

Wrong: the Glue catalog quota is 100K tables; the cluster scales to 200K → every commit fails. Right: monitor catalog metrics and plan for a REST catalog (Polaris) at larger scale.

Pitfall 8: Streaming and batch writers conflicting

Wrong: Flink streaming writes + Spark batch writes to the same table → a conflict on every commit. Right: split the tables (Flink → bronze_streaming, Spark → bronze_batch), or use Hudi MoR for UPSERT-heavy loads.

Pitfall 9: Underestimating metadata cost

Wrong: 1B small files / 100K manifests → a 100MB manifest list → every query reads 100MB of metadata. Right: compact and rewrite manifests periodically; target < 10K files per partition.

Pitfall 10: Not managing S3 cost

Wrong: Glacier IR for cold + S3 Standard for warm, but no lifecycle policy. Right: S3 Intelligent-Tiering for automatic tiering, or an explicit lifecycle: warm → IA after 30 days → Glacier after 90 days (see the boto3 sketch below).
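
A boto3 sketch of the explicit lifecycle described above; the bucket name and prefix are placeholders:

import boto3

s3 = boto3.client("s3")
s3.put_bucket_lifecycle_configuration(
    Bucket="my-lakehouse-bucket",                    # placeholder bucket name
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "tier-warehouse-data",
                "Filter": {"Prefix": "warehouse/"},  # only table data/metadata paths
                "Status": "Enabled",
                "Transitions": [
                    {"Days": 30, "StorageClass": "STANDARD_IA"},
                    {"Days": 90, "StorageClass": "GLACIER_IR"},
                ],
            }
        ]
    },
)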


| Topic | Relationship |
|---|---|
| Tuan-08-Message-Queue | Kafka is the source for streaming ingestion |
| Tuan-Bonus-Outbox-Pattern | Outbox + Debezium → Iceberg Bronze layer |
| Tuan-Bonus-Consistency-Models-Isolation | Iceberg uses snapshot isolation; ACID on S3 |
| Case-Design-Ad-Click-Event-Aggregation | Kappa architecture with Flink → can migrate to an Iceberg sink |
| Case-Design-Metrics-Monitoring-Alerting | Specialized time-series store vs Lakehouse |
| Tuan-13-Monitoring-Observability | Monitoring the lakehouse: file count, snapshot age, compaction backlog |
| Tuan-15-Data-Security-Encryption | S3 SSE-KMS, column-level encryption |



Roadmap reference: a bonus case study after finishing Phase 4 and the 18 Alex Xu Vol 1+2 case studies.