第 06 章 实时数据处理¶
📚 章节概述¶
本章将深入讲解实时数据处理,包括 Kafka 、 Flink 、 Spark Streaming 等。通过本章学习,你将能够设计和实现实时数据处理系统。
🎯 学习目标¶
完成本章后,你将能够:
- 理解实时数据处理的核心概念
- 掌握 Kafka 消息队列的使用
- 了解 Flink 流处理框架
- 掌握 Spark Streaming
- 能够设计和实现实时数据处理系统
6.1 实时数据处理概述¶
6.1.1 什么是实时数据处理¶
实时数据处理是对数据流进行实时处理和分析的技术。
实时处理特点¶
- 低延迟
- 毫秒级响应
- 实时决策
-
快速反馈
-
高吞吐
- 处理大量数据
- 水平扩展
-
弹性伸缩
-
持续处理
- 7x24 运行
- 数据流式处理
- 无缝扩展
6.1.2 实时 vs 批处理¶
| 特性 | 实时处理 | 批处理 |
|---|---|---|
| 延迟 | 毫秒级 | 分钟/小时级 |
| 吞吐 | 高 | 中 |
| 成本 | 高 | 低 |
| 复杂度 | 高 | 中 |
6.2 Kafka 消息队列¶
6.2.1 Kafka 概述¶
Apache Kafka是分布式流处理平台,用于构建实时数据管道和流应用。
核心概念¶
- Topic (主题)
- 数据流分类
- 类似数据库表
-
支持多分区
-
Partition (分区)
- Topic 的分片
- 提高并行度
-
保证顺序
-
Producer (生产者)
- 发送消息
- 支持多种协议
-
高吞吐写入
-
Consumer (消费者)
- 接收消息
- 支持消费组
- 自动负载均衡
6.2.2 Kafka 使用¶
Python
from kafka import KafkaProducer, KafkaConsumer
import json
# 生产者
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8') # lambda匿名函数:简洁的单行函数
)
# 发送消息
producer.send('sales', value={'product_id': 1, 'quantity': 10, 'amount': 100.0})
producer.flush()
# 消费者
consumer = KafkaConsumer(
'sales',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')), # json.loads将JSON字符串转为Python对象
auto_offset_reset='earliest'
)
# 接收消息
for message in consumer:
print(f"Received: {message.value}")
# 处理消息
process_message(message.value)
6.3 Flink 流处理¶
6.3.1 Flink 概述¶
Apache Flink是分布式流处理引擎,支持有界和无界数据流。
核心特性¶
- 流批一体
- 统一 API
- 流批切换
-
相同语义
-
状态管理
- 精确一次
- 状态后端
-
容错机制
-
窗口操作
- 时间窗口
- 计数窗口
- 会话窗口
6.3.2 Flink 使用¶
Python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.expressions import col
# 创建执行环境
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
# 定义数据源
t_env.execute_sql("""
CREATE TABLE sales (
product_id INT,
quantity INT,
amount DECIMAL(10,2),
sale_time TIMESTAMP(3),
WATERMARK FOR sale_time AS sale_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'sales',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'sales-group',
'format' = 'json'
)
""")
# 实时聚合
result = t_env.sql_query("""
SELECT
product_id,
COUNT(*) as order_count,
SUM(quantity) as total_quantity,
SUM(amount) as total_amount
FROM sales
GROUP BY
product_id,
TUMBLE(sale_time, INTERVAL '1' MINUTE)
""")
# 定义输出表
t_env.execute_sql("""
CREATE TABLE sales_aggregate (
product_id INT,
order_count BIGINT,
total_quantity BIGINT,
total_amount DECIMAL(10,2)
) WITH (
'connector' = 'print'
)
""")
# 将结果写入输出表
result.execute_insert('sales_aggregate').wait()
6.4 Spark Streaming¶
6.4.1 Spark Streaming 概述¶
Spark Streaming是 Spark 的流处理组件,支持微批处理。
核心特性¶
- 微批处理
- 批次处理
- 低延迟
-
高吞吐
-
Exactly-Once
- 精确一次
- 事务支持
-
容错机制
-
统一 API
- 流批一体
- 相同操作
- 易于使用
6.4.2 Spark Streaming 使用¶
Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col
from pyspark.sql.types import StructType, IntegerType, TimestampType, DecimalType
# 创建SparkSession
spark = SparkSession.builder \
.appName("SparkStreamingExample") \
.getOrCreate()
# 读取Kafka数据
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "sales") \
.load()
# 解析数据
from pyspark.sql.functions import from_json, to_json
schema = StructType().add("product_id", IntegerType()) \
.add("quantity", IntegerType()) \
.add("amount", DecimalType(10,2)) \
.add("sale_time", TimestampType())
df = df.select(
from_json(col("value").cast("string"), schema).alias("data")
).select(
col("data.product_id"),
col("data.quantity"),
col("data.amount"),
col("data.sale_time")
)
# 实时聚合
from pyspark.sql.functions import sum as _sum, count as _count
aggregated_df = df.groupBy(
window(col("sale_time"), "1 minute")
).agg(
_count("*").alias("order_count"),
_sum("quantity").alias("total_quantity"),
_sum("amount").alias("total_amount")
)
# 输出结果
query = aggregated_df \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
# 等待流结束
query.awaitTermination(timeout=300)
6.5 练习题¶
基础题¶
- 选择题
-
实时处理的核心特点不包括什么?
- A. 低延迟
- B. 高成本
- C. 高吞吐
- D. 持续处理
-
简答题
- 解释实时处理 vs 批处理的区别。
- 说明 Kafka 的核心概念。
进阶题¶
- 实践题
- 搭建一个 Kafka 集群。
- 实现 Flink 流处理。
-
实现 Spark Streaming 。
-
设计题
- 设计一个实时数据处理架构。
- 设计一个实时推荐系统。
答案¶
1. 选择题答案¶
- B (实时处理的核心特点不包括高成本)
2. 简答题答案¶
实时处理 vs 批处理的区别: - 实时处理:毫秒级延迟、高吞吐、高成本 - 批处理:分钟/小时级延迟、中吞吐、低成本
Kafka 的核心概念: - Topic 、 Partition 、 Producer 、 Consumer
3. 实践题答案¶
参见 6.2-6.4 节的示例。
4. 设计题答案¶
参见 6.1-6.4 节的架构设计。
6.6 面试准备¶
大厂面试题¶
字节跳动¶
- 解释实时处理的核心概念。
- Kafka 的分区机制是什么?
- Flink 的窗口操作有哪些?
- 如何设计实时数据处理系统?
腾讯¶
- Flink 和 Spark Streaming 的区别是什么?
- 如何实现 Exactly-Once ?
- 如何设计实时数据管道?
- 如何处理实时数据延迟?
阿里云¶
- 实时处理的最佳实践是什么?
- 如何设计实时数据质量?
- 如何设计实时数据监控?
- 如何设计实时数据容灾?
📚 参考资料¶
- Kafka 文档: HTTPS://kafka.apache.org/documentation/
- Flink 文档: HTTPS://flink.apache.org/docs/
- Spark Streaming 文档: HTTPS://spark.apache.org/docs/latest/streaming/
- 《 Stream Processing with Apache Flink 》
🎯 本章小结¶
本章深入讲解了实时数据处理,包括:
- 实时数据处理的核心概念
- Kafka 消息队列
- Flink 流处理
- Spark Streaming
通过本章学习,你掌握了实时数据处理的核心技术,能够设计和实现实时数据处理系统。下一章将深入学习流处理框架。