数据工程实战项目¶
📚 项目概述¶
本章提供 5 个完整的实战项目,帮助你将理论知识转化为实际能力。每个项目都包含完整的技术栈、实现步骤、代码示例和部署指南。
🎯 项目列表¶
项目 1 :电商数据仓库¶
项目概述¶
构建一个完整的电商数据仓库,使用现代数据工程技术栈。
技术栈¶
- 数据源: MySQL 、 PostgreSQL
- 数据集成: Airflow 、 Kafka
- 数据处理: Spark 、 Flink
- 数据仓库: Snowflake
- 数据湖: Delta Lake
- 数据质量: Great Expectations
- 数据目录: DataHub
- 数据可视化: Tableau
实现步骤¶
- 需求分析
Python
# 业务需求
requirements = {
'sales_analysis': {
'description': '销售数据分析',
'metrics': ['total_sales', 'average_order_value', 'conversion_rate'],
'dimensions': ['product', 'category', 'customer', 'time']
},
'customer_analysis': {
'description': '客户分析',
'metrics': ['customer_lifetime_value', 'retention_rate', 'churn_rate'],
'dimensions': ['customer_segment', 'acquisition_channel', 'time']
},
'inventory_analysis': {
'description': '库存分析',
'metrics': ['inventory_turnover', 'stockout_rate', 'reorder_point'],
'dimensions': ['product', 'warehouse', 'time']
}
}
- 数据建模
SQL
-- 事实表
CREATE TABLE dwd_sales_fact (
sale_id BIGINT PRIMARY KEY,
product_id BIGINT,
customer_id BIGINT,
time_id BIGINT,
store_id BIGINT,
quantity INT,
amount DECIMAL(10,2),
discount DECIMAL(10,2),
tax DECIMAL(10,2),
net_amount DECIMAL(10,2)
) PARTITIONED BY (YEAR(sale_date), MONTH(sale_date));
-- 维度表
CREATE TABLE dwd_product_dim (
product_id BIGINT PRIMARY KEY,
product_name VARCHAR(200),
category_id BIGINT,
brand_id BIGINT,
price DECIMAL(10,2),
cost DECIMAL(10,2),
margin DECIMAL(10,2)
);
CREATE TABLE dwd_customer_dim (
customer_id BIGINT PRIMARY KEY,
customer_name VARCHAR(200),
email VARCHAR(100),
phone VARCHAR(20),
city VARCHAR(100),
country VARCHAR(100),
customer_segment VARCHAR(50),
acquisition_channel VARCHAR(50)
);
- ETL 流程
Python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data_team',
'start_date': datetime(2024, 1, 1),
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'ecommerce_warehouse',
default_args=default_args,
description='Ecommerce data warehouse ETL',
schedule='@daily', # 使用新参数名(Airflow 2.x)
catchup=False,
)
def extract_sales():
"""抽取销售数据"""
print("Extracting sales data...")
# 实际的数据抽取逻辑
return 'sales_data.csv'
def transform_sales():
"""转换销售数据"""
print("Transforming sales data...")
# 实际的数据转换逻辑
return 'transformed_sales.csv'
def load_sales():
"""加载销售数据"""
print("Loading sales data...")
# 实际的数据加载逻辑
return 'load_complete'
extract_task = PythonOperator(
task_id='extract',
python_callable=extract_sales,
dag=dag
)
transform_task = PythonOperator(
task_id='transform',
python_callable=transform_sales,
dag=dag
)
load_task = PythonOperator(
task_id='load',
python_callable=load_sales,
dag=dag
)
extract_task >> transform_task >> load_task
部署指南¶
Bash
# 1. 克隆项目
git clone https://github.com/yourorg/ecommerce-dw.git
cd ecommerce-dw
# 2. 配置环境
cp config/airflow.cfg.example config/airflow.cfg
cp config/spark.conf.example config/spark.conf
# 3. 启动Airflow
airflow db init
airflow webserver
airflow scheduler
# 4. 验证部署
# 访问Airflow UI: http://localhost:8080
优化建议¶
- 性能优化
- 使用分区表
- 创建索引
-
优化查询
-
成本优化
- 使用压缩
- 优化存储
-
合理配置资源
-
质量优化
- 实施数据质量检查
- 建立数据质量监控
- 定期数据质量审计
项目 2 :实时数据处理平台¶
项目概述¶
构建一个实时数据处理平台,支持实时数据分析和决策。
技术栈¶
- 消息队列: Kafka
- 流处理: Flink
- 存储: Redis 、 Elasticsearch
- 可视化: Grafana
实现步骤¶
- 数据采集
Python
from kafka import KafkaProducer
import json
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# 发送实时数据
producer.send('sales', value={
'product_id': 1,
'quantity': 10,
'amount': 100.0,
'timestamp': datetime.now().isoformat()
})
producer.flush()
- 流处理
Python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
t_env.execute_sql("""
CREATE TABLE sales (
product_id INT,
quantity INT,
amount DECIMAL(10,2),
timestamp TIMESTAMP(3),
WATERMARK FOR timestamp AS timestamp - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'sales',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
)
""")
result = t_env.sql_query("""
SELECT
product_id,
COUNT(*) as order_count,
SUM(quantity) as total_quantity,
SUM(amount) as total_amount
FROM sales
GROUP BY
product_id,
TUMBLE(timestamp, INTERVAL '1' MINUTE)
""")
result.execute().print()
部署指南¶
Bash
# 1. 启动Kafka
docker-compose up -d kafka zookeeper
# 2. 启动Flink
docker-compose up -d flink-jobmanager flink-taskmanager
# 3. 提交作业
flink run -c com.example.SalesProcessor target/sales-processor.jar
# 4. 验证部署
# 访问Flink UI: http://localhost:8081
项目 3 :数据质量监控平台¶
项目概述¶
构建一个数据质量监控平台,实时监控数据质量。
技术栈¶
- 数据质量: Great Expectations
- 监控: Prometheus 、 Grafana
- 告警: AlertManager
- 存储: PostgreSQL
实现步骤¶
- 数据质量检查
Python
import great_expectations as gx
import pandas as pd
# 创建 Data Context
context = gx.get_context()
# 读取数据并创建 Data Source
df = pd.read_csv('sales.csv')
data_source = context.data_sources.add_pandas(name="sales_source")
data_asset = data_source.add_dataframe_asset(name="sales_asset")
batch_definition = data_asset.add_batch_definition_whole_dataframe("sales_batch")
# 创建 Expectation Suite
suite = context.suites.add(
gx.ExpectationSuite(name="sales_suite")
)
# 添加 Expectations
suite.add_expectation(
gx.expectations.ExpectColumnToExist(column="product_id")
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeBetween(
column='quantity',
min_value=1,
max_value=1000
)
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeBetween(
column='amount',
min_value=0,
max_value=100000
)
)
# 创建 Validation Definition 并验证
validation_definition = context.validation_definitions.add(
gx.ValidationDefinition(
name="sales_validation",
data=batch_definition,
suite=suite
)
)
result = validation_definition.run(batch_parameters={"dataframe": df})
# 输出结果
print(f"Validation success: {result.success}")
for exp_result in result.results:
print(f" {exp_result.expectation_config.type}: {exp_result.success}")
部署指南¶
Bash
# 1. 安装Great Expectations
pip install great_expectations
# 2. 初始化项目(GX 1.x 使用 Python API 配置,无需 CLI init)
# 参考 08-数据质量.md 中的 gx.get_context() 方式
# 3. 运行验证:通过 Python 脚本执行
python run_validation.py
# 4. 查看报告
# 访问: http://localhost:8080
项目 4 :数据血缘平台¶
项目概述¶
构建一个数据血缘平台,追踪数据流向和依赖关系。
技术栈¶
- 数据目录: DataHub
- 元数据: PostgreSQL
- 可视化: Neo4j
实现步骤¶
- 数据血缘捕获
Python
# 导入 DataHub REST 发射器,用于向 DataHub 服务端推送元数据
from datahub.emitter.rest_emitter import DatahubRestEmitter
# 导入元数据变更提案包装器,封装元数据变更请求
from datahub.emitter.mcp import MetadataChangeProposalWrapper
# 导入各类元数据 Schema 定义类
from datahub.metadata.schema_classes import (
DatasetPropertiesClass, # 数据集属性(名称、描述等)
UpstreamLineageClass, # 上游血缘关系
UpstreamClass, # 上游数据集
DatasetLineageTypeClass, # 血缘类型(转换、复制等)
)
# 创建 DataHub Emitter
emitter = DatahubRestEmitter(
gms_server="http://localhost:8080"
)
# 定义数据集 URN
dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:hive,sales_data,PROD)"
# 发送数据集属性
dataset_properties = MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=DatasetPropertiesClass(
name="Sales Data",
description="Daily sales data",
customProperties={"team": "data_engineering"}
)
)
emitter.emit(dataset_properties)
# 发送血缘关系(上游依赖),描述数据集之间的转换关系
upstream_urn = "urn:li:dataset:(urn:li:dataPlatform:mysql,raw_orders,PROD)"
lineage = MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=UpstreamLineageClass(
upstreams=[
UpstreamClass(
dataset=upstream_urn,
type=DatasetLineageTypeClass.TRANSFORMED
)
]
)
)
emitter.emit(lineage)
print(f"DataHub metadata emitted for: {dataset_urn}")
部署指南¶
Bash
# 1. 启动DataHub
docker-compose up -d datahub
# 2. 访问DataHub UI
# http://localhost:9002
# 3. 配置数据源
# 添加数据库、数据湖等数据源
项目 5 :实时推荐系统¶
项目概述¶
构建一个实时推荐系统,基于用户行为实时推荐商品。
技术栈¶
- 实时处理: Flink
- 特征存储: Redis
- 模型服务: TensorFlow Serving
- API: FastAPI
实现步骤¶
- 实时特征计算
Python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
t_env.execute_sql("""
CREATE TABLE user_events (
user_id BIGINT,
product_id BIGINT,
event_type STRING,
timestamp TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user_events',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
)
""")
# 计算用户特征
user_features = t_env.sql_query("""
SELECT
user_id,
COUNT(*) as event_count,
COUNT(DISTINCT product_id) as product_diversity,
AVG(CASE WHEN event_type = 'view' THEN 1 ELSE 0 END) as view_rate,
AVG(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) as purchase_rate
FROM user_events
GROUP BY user_id,
TUMBLE(timestamp, INTERVAL '1' HOUR)
""")
user_features.execute().print()
部署指南¶
Bash
# 1. 启动Redis
docker-compose up -d redis
# 2. 启动TensorFlow Serving
docker run -p 8501:8501 \
-v /path/to/model:/models/model \
tensorflow/serving
# 3. 启动Flink
docker-compose up -d flink
# 4. 启动API服务
uvicorn api:app --host 0.0.0.0 --port 8000
📊 项目对比¶
| 项目 | 技术栈 | 难度 | 耗时 | 适用场景 |
|---|---|---|---|---|
| 电商数据仓库 | Airflow, Spark, Snowflake | ⭐⭐⭐ | 4 周 | 传统数据仓库 |
| 实时数据处理平台 | Kafka, Flink, Redis | ⭐⭐⭐⭐ | 4 周 | 实时数据处理 |
| 数据质量监控平台 | Great Expectations, Prometheus | ⭐⭐ | 2 周 | 数据质量监控 |
| 数据血缘平台 | DataHub, Neo4j | ⭐⭐⭐ | 3 周 | 数据治理 |
| 实时推荐系统 | Flink, Redis, TensorFlow | ⭐⭐⭐⭐ | 4 周 | 实时推荐 |
🎯 学习建议¶
- 循序渐进
- 从简单项目开始
- 逐步增加复杂度
-
理解每个技术点
-
动手实践
- 亲自完成每个项目
- 不要只看不做
-
记录遇到的问题
-
扩展优化
- 在基础项目上扩展
- 尝试不同的实现方式
-
优化性能和成本
-
总结反思
- 项目完成后总结
- 记录经验教训
- 分享给他人
📚 参考资料¶
- Apache Airflow 文档: HTTPS://airflow.apache.org/docs/
- Apache Spark 文档: HTTPS://spark.apache.org/docs/
- Apache Flink 文档: HTTPS://flink.apache.org/docs/
- Great Expectations 文档: HTTPS://docs.greatexpectations.io/
- DataHub 文档: HTTPS://datahubproject.io/docs/
🎯 总结¶
通过完成这 5 个实战项目,你将:
- 掌握数据仓库的构建方法
- 理解实时数据处理技术
- 具备数据质量监控能力
- 掌握数据血缘管理
- 具备实时推荐系统开发能力
每个项目都是完整的、可运行的,可以直接部署到生产环境。建议按照项目顺序逐步完成,每个项目完成后进行总结和反思,为下一个项目做好准备。