跳转至

第 16 章 数据湖仓一体( Lakehouse )

数据湖仓一体

📚 章节概述

Lakehouse 架构融合数据湖的灵活性与数据仓库的可靠性,在统一存储层上同时支持 BI 分析和 AI/ML 工作负载。

🎯 学习目标

完成本章后,你将能够:

  1. 理解 Lakehouse 架构的演进与核心特性
  2. 掌握 Apache Iceberg 和 Delta Lake 两大开放表格式
  3. 了解元数据管理与 Catalog 方案
  4. 掌握 Time Travel 与数据版本管理
  5. 能够设计生产级的 Lakehouse 架构

目录


1. Lakehouse 架构概述

1.1 数据架构演进

Text Only
第一代:数据仓库 (Data Warehouse)
┌──────────┐    ETL    ┌───────────┐
│ 业务系统  │ ───────→ │  数据仓库   │ → BI报表
└──────────┘           └───────────┘
优点:ACID、Schema约束、查询性能好
缺点:成本高、不支持非结构化数据、扩展性差

第二代:数据湖 (Data Lake)
┌──────────┐   Ingest  ┌──────────┐
│ 业务系统  │ ───────→ │  数据湖    │ → AI/ML
│ 日志/IoT  │          │ (S3/HDFS) │
└──────────┘           └──────────┘
优点:低成本、支持所有数据格式、高扩展性
缺点:无ACID、数据沼泽、查询性能差

第三代:Lakehouse
┌──────────┐   Ingest  ┌──────────────────┐
│ 业务系统  │ ───────→ │   Lakehouse       │ → BI + AI
│ 日志/IoT  │          │ ┌──────────────┐  │
│ 流数据    │          │ │ 开放表格式层  │  │
└──────────┘           │ │(Iceberg/Delta)│  │
                       │ ├──────────────┤  │
                       │ │ 对象存储层    │  │
                       │ │(S3/ADLS/GCS) │  │
                       │ └──────────────┘  │
                       └──────────────────┘
优点:融合优势——ACID+低成本+所有数据+AI/BI统一

1.2 Lakehouse 核心特性

特性 说明
ACID 事务 支持原子性、一致性、隔离性、持久性
Schema Enforcement 写入时强制 schema 约束,防止数据质量问题
Schema Evolution 支持安全的 schema 变更(加列、改名等)
Time Travel 查询历史版本数据、支持回滚
开放格式 基于 Parquet + 元数据层,避免厂商锁定
统一存储 BI 查询和 ML 训练使用同一份数据
支持流批一体 同时支持批处理和流处理

1.3 Lakehouse 典型架构

Text Only
         ┌──────────────────────────────────────────┐
查询引擎  │  Spark    Trino/Presto   Flink   DuckDB  │
         └──────────────────┬───────────────────────┘
         ┌──────────────────▼───────────────────────┐
Catalog  │    Hive Metastore / Iceberg REST Catalog  │
         │    Unity Catalog / AWS Glue Catalog        │
         └──────────────────┬───────────────────────┘
         ┌──────────────────▼───────────────────────┐
表格式    │   Apache Iceberg / Delta Lake / Hudi       │
         └──────────────────┬───────────────────────┘
         ┌──────────────────▼───────────────────────┐
文件格式  │    Apache Parquet / ORC / Avro             │
         └──────────────────┬───────────────────────┘
         ┌──────────────────▼───────────────────────┐
存储层    │    S3 / ADLS / GCS / HDFS / MinIO          │
         └──────────────────────────────────────────┘

2. 开放表格式

2.1 为什么需要表格式

原始 Parquet 文件存在的问题:

  • 数据文件只有列式存储,无事务保证
  • 无法安全地并发读写
  • 没有 schema 演化能力
  • 无法进行高效的小文件合并
  • 无法查询历史版本

表格式 = Parquet 数据文件 + 元数据层(事务日志/快照/Manifest )

2.2 三大开放表格式

维度 Apache Iceberg Delta Lake Apache Hudi
创始方 Netflix → Apache Databricks Uber → Apache
底层格式 Parquet/ORC/Avro Parquet Parquet/ORC
元数据 Manifest + Snapshot Transaction Log ( JSON ) Timeline
社区活跃度 ★★★★★ ★★★★☆ ★★★☆☆
引擎兼容性 最广泛 Spark 为主 Spark/Flink
核心优势 开放标准、多引擎 Databricks 深度集成 Upsert 性能好

3. Apache Iceberg 深度解析

3.1 Iceberg 架构

Text Only
                    ┌───────────────┐
    Catalog层       │  Catalog      │  存储表的当前元数据指针
                    │ (Hive/REST)   │
                    └───────┬───────┘
                    ┌───────▼───────┐
    元数据层        │  Metadata File │  表的schema、分区、属性
                    │  (.json)      │
                    └───────┬───────┘
                    ┌───────▼───────┐
    快照层          │  Snapshot      │  表在某个时间点的完整视图
                    │               │
                    └───────┬───────┘
                    ┌───────▼───────┐
    Manifest列表    │ Manifest List  │  指向一组Manifest的索引
                    │ (.avro)       │
                    └───────┬───────┘
                ┌───────────┼────────────┐
        ┌───────▼──────┐         ┌───────▼──────┐
        │ Manifest File│         │ Manifest File│  记录数据文件的元信息
        │ (.avro)      │         │ (.avro)      │  (分区值、统计信息)
        └───────┬──────┘         └───────┬──────┘
                │                        │
     ┌──────────┼──────┐      ┌──────────┼──────┐
     │          │      │      │          │      │
   data1     data2   data3  data4     data5   data6
  .parquet  .parquet .parquet .parquet .parquet .parquet

3.2 Iceberg 核心特性

SQL
-- 创建Iceberg表
CREATE TABLE catalog.db.events (
    event_id    BIGINT,
    event_type  STRING,
    user_id     BIGINT,
    event_time  TIMESTAMP,
    properties  MAP<STRING, STRING>
)
USING iceberg
PARTITIONED BY (days(event_time), bucket(16, user_id))
TBLPROPERTIES (
    'write.format.default' = 'parquet',
    'write.parquet.compression-codec' = 'zstd'
);

-- Hidden Partitioning(隐式分区,查询无需感知分区列)
SELECT * FROM catalog.db.events
WHERE event_time >= '2025-01-01'
  AND event_time < '2025-02-01';
-- Iceberg自动进行分区裁剪,用户无需指定分区列

-- Schema Evolution
ALTER TABLE catalog.db.events ADD COLUMN device STRING;
ALTER TABLE catalog.db.events RENAME COLUMN properties TO metadata;
ALTER TABLE catalog.db.events ALTER COLUMN user_id TYPE BIGINT;

-- Partition Evolution(分区演化,无需重写数据)
ALTER TABLE catalog.db.events
  ADD PARTITION FIELD bucket(32, user_id);  -- 从16桶变为32桶

3.3 Iceberg 分区裁剪

Text Only
传统Hive分区: 查询必须指定分区列
  WHERE dt = '2025-01-15'                ✓ 裁剪生效
  WHERE ts > '2025-01-15 10:00:00'       ✗ 全表扫描

Iceberg隐式分区: 自动推导分区值
  WHERE ts > '2025-01-15 10:00:00'       ✓ 自动裁剪days(ts)分区
  无需暴露分区实现细节给用户

4. Delta Lake 深度解析

4.1 Delta Lake 架构

Text Only
Delta Table = Parquet文件 + _delta_log/

table_path/
├── _delta_log/                    # 事务日志
│   ├── 00000000000000000000.json  # 版本0
│   ├── 00000000000000000001.json  # 版本1
│   ├── 00000000000000000002.json  # 版本2
│   └── 00000000000000000010.checkpoint.parquet  # 检查点
├── part-00000-xxx.snappy.parquet  # 数据文件
├── part-00001-xxx.snappy.parquet
└── part-00002-xxx.snappy.parquet

4.2 Delta Lake 核心特性

Python
from delta import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# 写入Delta表
df.write.format("delta").mode("overwrite").save("/data/events")

# MERGE操作(Upsert)
delta_table = DeltaTable.forPath(spark, "/data/events")

delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.event_id = source.event_id"
).whenMatchedUpdate(set={
    "event_type": "source.event_type",
    "properties": "source.properties"
}).whenNotMatchedInsertAll().execute()

# Change Data Feed(CDC)
spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 5) \
    .load("/data/events")

# OPTIMIZE(小文件合并)
delta_table.optimize().executeCompaction()

# Z-ORDER(列级聚类索引)
delta_table.optimize().executeZOrderBy("user_id", "event_time")

4.3 Delta Lake Liquid Clustering (新特性)

SQL
-- Liquid Clustering替代传统分区和Z-ORDER
CREATE TABLE events (
    event_id BIGINT,
    user_id BIGINT,
    event_time TIMESTAMP
) USING delta
CLUSTER BY (user_id, event_time);

-- 自动优化聚类
OPTIMIZE events;

5. Iceberg vs Delta Lake 对比

5.1 详细对比

维度 Apache Iceberg Delta Lake
元数据 多级 Manifest 树,高效裁剪 JSON 事务日志,需定期 Checkpoint
引擎支持 Spark/Flink/Trino/Presto/Dremio/StarRocks Spark 为主, Trino/Flink 逐步支持
隐式分区 ✅ 原生支持 ❌ 需显式分区列
分区演化 ✅ 无需重写数据 ❌ 需重写
Merge 性能 一般( Copy-on-Write 为主) 优秀(支持 Merge-on-Read )
行级更新 Position Delete / Equality Delete Deletion Vector
Catalog REST/Hive/Glue/Nessie Unity Catalog/Hive
社区治理 Apache 基金会(厂商中立) Databricks 主导(已开源)
生态集成 AWS/GCP/阿里云广泛支持 Databricks 生态最佳

5.2 选型建议

Text Only
选Apache Iceberg:
├── 多引擎环境(Spark + Trino + Flink混用)
├── 需要厂商中立的开放标准
├── 需要隐式分区和分区演化
├── 大规模表(百万级文件)的元数据性能
└── 非Databricks平台

选Delta Lake:
├── 已使用Databricks平台
├── 需要优秀的Merge/Upsert性能
├── CDC场景较多
├── Spark为主要引擎
└── 需要Liquid Clustering等最新特性

6. 元数据管理

6.1 Catalog 的作用

Text Only
Catalog = 表的注册中心
├── 存储表的位置信息
├── 管理表的schema
├── 提供表级访问控制
└── 支持多引擎发现和访问同一张表

6.2 常见 Catalog 方案

Catalog 类型 适用场景
Hive Metastore 传统 与 Hadoop 生态兼容
AWS Glue Catalog 托管 AWS 环境
Iceberg REST Catalog 开放标准 多引擎、多云环境
Unity Catalog Databricks Databricks 统一治理
Polaris Catalog Snowflake 开源 Iceberg 原生 Catalog
Nessie 开源 类 Git 版本管理 Catalog

6.3 数据治理集成

Text Only
Lakehouse数据治理:
┌─────────────────────────────────────────┐
│  访问控制:  行级过滤 / 列级脱敏           │
│  数据血缘:  OpenLineage / Marquez        │
│  数据发现:  DataHub / OpenMetadata        │
│  数据质量:  Great Expectations / dbt test │
│  合规审计:  操作日志 + Time Travel        │
└─────────────────────────────────────────┘

7. Time Travel 与数据版本管理

7.1 Iceberg Time Travel

SQL
-- 查询历史快照
SELECT * FROM catalog.db.events
  FOR SYSTEM_TIME AS OF '2025-06-01 00:00:00';

-- 按快照ID查询
SELECT * FROM catalog.db.events
  FOR SYSTEM_VERSION AS OF 1234567890;

-- 查看快照历史
SELECT * FROM catalog.db.events.snapshots;

-- 查看历史记录
SELECT * FROM catalog.db.events.history;

-- 回滚到指定快照
CALL catalog.system.rollback_to_snapshot('db.events', 1234567890);

-- 设置快照过期策略
ALTER TABLE catalog.db.events
  SET TBLPROPERTIES ('history.expire.max-snapshot-age-ms' = '604800000');  -- 7天

-- 手动清理过期快照
CALL catalog.system.expire_snapshots('db.events', TIMESTAMP '2025-06-01 00:00:00');

7.2 Delta Lake Time Travel

Python
# 按版本查询
df = spark.read.format("delta").option("versionAsOf", 5).load("/data/events")

# 按时间戳查询
df = spark.read.format("delta") \
    .option("timestampAsOf", "2025-06-01") \
    .load("/data/events")

# 回滚
delta_table = DeltaTable.forPath(spark, "/data/events")
delta_table.restoreToVersion(5)
# 或
delta_table.restoreToTimestamp("2025-06-01")

# 查看历史
delta_table.history().show()

# VACUUM清理旧版本文件(默认保留7天)
delta_table.vacuum(168)  # 小时数

7.3 Time Travel 应用场景

场景 说明
数据审计 查看数据在某个时间点的状态
误操作回滚 恢复到错误操作之前的版本
可重复分析 对固定版本数据运行分析
增量处理 读取两个快照之间的变更
A/B 测试 对同一数据集的不同版本运行对比

8. 与 Spark 集成实战

8.1 Spark + Iceberg

Python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("IcebergDemo") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "s3://my-bucket/warehouse") \
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0") \
    .getOrCreate()

# 创建表
spark.sql("""
    CREATE TABLE local.db.events (
        event_id BIGINT,
        user_id BIGINT,
        event_type STRING,
        event_time TIMESTAMP
    ) USING iceberg
    PARTITIONED BY (days(event_time))
""")

# 写入数据
events_df.writeTo("local.db.events").append()

# Merge Into
spark.sql("""
    MERGE INTO local.db.events t
    USING updates s ON t.event_id = s.event_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")

# 维护操作
spark.sql("CALL local.system.rewrite_data_files('db.events')")  # 小文件合并
spark.sql("CALL local.system.expire_snapshots('db.events', TIMESTAMP '2025-06-01')")

8.2 Spark Structured Streaming + Iceberg

Python
# 流式写入Iceberg表
streaming_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "events") \
    .load()

parsed_df = streaming_df.selectExpr(
    "CAST(key AS STRING) as event_id",
    "CAST(value AS STRING) as raw_data",
    "timestamp as event_time"
)

parsed_df.writeStream \
    .format("iceberg") \
    .outputMode("append") \
    .option("checkpointLocation", "s3://my-bucket/checkpoints/events") \
    .toTable("local.db.events")

# 流式读取Iceberg表(增量消费)
spark.readStream \
    .format("iceberg") \
    .option("stream-from-timestamp", "2025-06-01T00:00:00") \
    .load("local.db.events")

8.3 性能优化

SQL
-- 1. 小文件合并(Compaction)
CALL local.system.rewrite_data_files(
    table => 'db.events',
    options => map('target-file-size-bytes', '134217728')  -- 128MB
);

-- 2. 排序优化
CALL local.system.rewrite_data_files(
    table => 'db.events',
    strategy => 'sort',
    sort_order => 'user_id ASC NULLS LAST, event_time DESC'
);

-- 3. 统计信息收集
ALTER TABLE local.db.events
  SET TBLPROPERTIES ('write.metadata.metrics.default' = 'full');

-- 4. Bloom Filter
ALTER TABLE local.db.events
  SET TBLPROPERTIES ('write.parquet.bloom-filter-enabled.column.event_id' = 'true');

9. Lakehouse 最佳实践

9.1 分层设计( Medallion 架构)

Text Only
┌────────────────────────────────────────────────────┐
│                 Medallion Architecture               │
│                                                      │
│  ┌──────────┐   ┌──────────┐   ┌──────────┐        │
│  │  Bronze   │ → │  Silver   │ → │  Gold     │       │
│  │  (原始层)  │   │  (清洗层)  │   │  (聚合层)  │       │
│  │          │   │          │   │          │        │
│  │ 原始数据  │   │ 清洗去重   │   │ 业务指标   │        │
│  │ 全量保留  │   │ Schema标准 │   │ 聚合报表   │        │
│  │ Append   │   │ 数据质量   │   │ ML特征    │        │
│  └──────────┘   └──────────┘   └──────────┘        │
└────────────────────────────────────────────────────┘

9.2 核心设计原则

  1. 选择合适的表格式:多引擎选 Iceberg , Databricks 选 Delta
  2. 合理分区策略:避免过度分区(每个分区至少 128MB )
  3. 定期维护:小文件合并、快照过期、孤儿文件清理
  4. 统一 Catalog:所有引擎通过同一 Catalog 访问数据
  5. 数据质量前移:在 Bronze→Silver 阶段做好数据验证

10. 面试题精选

Q1: Lakehouse 相比数据湖的核心优势

  1. ACID 事务:保证数据一致性,支持并发读写
  2. Schema Enforcement:写入时校验,防止数据质量问题
  3. Time Travel:查询历史版本,支持回滚
  4. 统一存储: BI 分析和 ML 训练使用同一份数据,消除数据孤岛
  5. 高性能:列裁剪、分区裁剪、统计信息等优化

Q2: Iceberg 的隐式分区有什么优势

  • 用户查询时不需要知道分区列,引擎自动裁剪
  • 分区演化不需要重写数据
  • 避免了 Hive 分区导致的"查询必须带分区条件"的限制
  • 支持基于时间的year/month/day/hourbucket/truncate等转换

Q3: Delta Lake 的事务日志如何工作

  • 每次写入操作生成一个JSON 日志文件(版本号递增)
  • 日志记录: AddFile 、 RemoveFile 、 Metadata 变更等操作
  • 读取时重放日志构建当前表状态
  • Checkpoint(每 10 个版本):将日志压缩为 Parquet 格式,加速读取
  • 并发写入通过乐观并发控制解决冲突

Q4: Medallion 架构的三层分别做什么

层次 数据特征 处理逻辑
Bronze 原始、未处理 1:1 复制源数据,保留全部字段
Silver 清洗、标准化 去重、类型转换、数据质量验证
Gold 聚合、业务化 指标计算、维度建模、特征工程

Q5: 如何处理 Lakehouse 中的小文件问题

  1. 定期运行rewrite_data_files( Iceberg )或OPTIMIZE( Delta )
  2. 设置合理的目标文件大小(推荐 128MB-256MB )
  3. 流式写入使用较大的触发间隔或上游攒批
  4. 配置自动优化( Auto-compaction )

11. 推荐资源


下一步学习:结合05-数据湖技术理解底层存储,参考17-dbt 数据转换实现 Lakehouse 上的数据转换流水线。