⚙️ 核心组件详解¶
⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。
学习时间: 6 小时 | 难度:⭐⭐⭐ 中等 | 前置知识: 01-系统设计方法论
🎯 本章目标¶
- 深入理解负载均衡的原理与算法选型
- 掌握缓存策略与淘汰算法
- 理解 CDN 的工作原理
- 对比主流消息队列的特性与适用场景
- 建立数据库选型的决策框架
- 掌握分布式 ID 生成方案
- 实现常见限流算法
📋 目录¶
1. 负载均衡¶
1.1 概述¶
负载均衡( Load Balancer )将流量分发到多个后端服务器,实现: - 高可用:一台服务器挂了,流量自动转移到其他服务器 - 水平扩展:通过增加服务器提升整体处理能力 - 故障隔离:自动将故障服务器从池中摘除
┌──────────────┐
│ Client │
└──────┬───────┘
│
┌──────▼───────┐
│ Load │
│ Balancer │
└──┬───┬───┬──┘
│ │ │
┌────────┘ │ └────────┐
│ │ │
┌─────▼──┐ ┌────▼───┐ ┌─────▼──┐
│Server 1│ │Server 2│ │Server 3│
└────────┘ └────────┘ └────────┘
1.2 L4 vs L7 负载均衡¶
| 特性 | L4 (传输层) | L7 (应用层) |
|---|---|---|
| 工作层 | TCP/UDP | HTTP/HTTPS |
| 依据 | IP 、端口 | URL 、 Header 、 Cookie |
| 性能 | 高(不解析内容) | 较低(需解析 HTTP ) |
| 灵活性 | 低 | 高(可按路径/域名分发) |
| SSL 终止 | 不支持 | 支持 |
| 会话保持 | 基于 IP | 基于 Cookie |
| 典型产品 | LVS 、 F5 、 AWS NLB | Nginx 、 HAProxy 、 AWS ALB |
| 适用场景 | 高吞吐、简单路由 | Web 应用、微服务 |
L4负载均衡(基于IP和端口):
Client → [LB看到: srcIP:port → dstIP:port] → Server
只关注传输层信息,不解析应用层内容
L7负载均衡(基于应用层内容):
Client → [LB看到: GET /api/users HTTP/1.1
Host: example.com
Cookie: session=abc]
可以根据URL路径、域名、Header来路由
例如:
/api/* → API Server集群
/static/* → 静态资源服务器
/ws/* → WebSocket服务器
1.3 负载均衡算法¶
1.3.1 轮询( Round Robin )¶
优势:实现简单,请求数量层面近似公平 劣势:不考虑服务器性能差异和当前负载 适用:服务器配置相同、请求处理时间均匀
class RoundRobinBalancer:
def __init__(self, servers):
self.servers = servers
self.index = 0
def get_server(self):
server = self.servers[self.index]
self.index = (self.index + 1) % len(self.servers)
return server
1.3.2 加权轮询( Weighted Round Robin )¶
权重配置:
Server A: weight=5(高性能)
Server B: weight=3(中性能)
Server C: weight=2(低性能)
请求分配比例:A:B:C = 5:3:2
10个请求中:A处理5个,B处理3个,C处理2个
优势:考虑了服务器性能差异 劣势:权重需要手动配置,不能动态调整 适用:服务器配置不同的集群
class WeightedRoundRobinBalancer:
def __init__(self, servers_with_weights):
"""servers_with_weights: [('A', 5), ('B', 3), ('C', 2)]"""
self.servers = servers_with_weights
self.current_weights = [0] * len(servers_with_weights)
self.total_weight = sum(w for _, w in servers_with_weights)
def get_server(self):
# Nginx smooth weighted round robin
max_weight = -1
max_index = 0
for i, (server, weight) in enumerate(self.servers): # enumerate同时获取索引和值
self.current_weights[i] += weight
if self.current_weights[i] > max_weight:
max_weight = self.current_weights[i]
max_index = i
self.current_weights[max_index] -= self.total_weight
return self.servers[max_index][0]
1.3.3 最小连接数( Least Connections )¶
优势:动态感知服务器负载 劣势:需要维护连接计数,慢启动问题 适用:请求处理时间差异大的场景(如视频转码)
import heapq
from collections import defaultdict # defaultdict带默认值的字典,避免KeyError
class LeastConnectionsBalancer:
def __init__(self, servers):
self.connections = {s: 0 for s in servers}
def get_server(self):
server = min(self.connections, key=self.connections.get)
self.connections[server] += 1
return server
def release_server(self, server):
self.connections[server] -= 1
1.3.4 一致性哈希( Consistent Hashing )¶
一致性哈希是分布式系统中最重要的算法之一,用于将请求/数据映射到特定服务器。
哈希环(0 ~ 2^32-1):
0
╱ ╲
╱ ╲
S1 ╲
╱ S2
│ │
│ Hash Ring │
│ │
╲ ╱
S3 ╱
╲ ╱
╲ ╱
S4(虚拟)
Key的映射过程:
1. hash(key) → 得到哈希值
2. 在环上顺时针找到最近的Server
3. key1 → S1, key2 → S2, key3 → S3
解决的问题:普通哈希( hash % N )在增删节点时需要大量数据迁移
虚拟节点:解决数据倾斜问题
import hashlib
import bisect
class ConsistentHash:
def __init__(self, nodes=None, replicas=150):
"""replicas: 每个真实节点的虚拟节点数"""
self.replicas = replicas
self.ring = {} # hash_value -> node
self.sorted_keys = [] # 排序的hash值列表
if nodes:
for node in nodes:
self.add_node(node)
def _hash(self, key):
return int(hashlib.md5(key.encode()).hexdigest(), 16)
def add_node(self, node):
for i in range(self.replicas):
virtual_key = f"{node}:v{i}"
hash_val = self._hash(virtual_key)
self.ring[hash_val] = node
bisect.insort(self.sorted_keys, hash_val)
def remove_node(self, node):
for i in range(self.replicas):
virtual_key = f"{node}:v{i}"
hash_val = self._hash(virtual_key)
del self.ring[hash_val]
self.sorted_keys.remove(hash_val)
def get_node(self, key):
if not self.ring:
return None
hash_val = self._hash(key)
idx = bisect.bisect_right(self.sorted_keys, hash_val)
if idx == len(self.sorted_keys):
idx = 0
return self.ring[self.sorted_keys[idx]]
1.3.5 算法对比总结¶
| 算法 | 复杂度 | 状态 | 公平性 | 适用场景 |
|---|---|---|---|---|
| 轮询 | O(1) | 无状态 | 请求数量近似平均 | 服务器配置相同、请求耗时接近 |
| 加权轮询 | O(N) | 无状态 | 按权重公平 | 配置不同 |
| 最小连接 | O(N) | 有状态 | 动态公平 | 长连接/处理时间不均 |
| 一致性哈希 | O(logN) | 有状态 | 基于哈希 | 有状态服务/缓存 |
| IP 哈希 | O(1) | 无状态 | 基于 IP | 会话保持 |
| 随机 | O(1) | 无状态 | 概率公平 | 简单场景 |
2. 缓存¶
2.1 缓存的价值¶
无缓存: 有缓存:
Client → Server → Database Client → Server → Cache(命中) → 返回
→ Cache(未命中) → DB → 存入Cache
延迟:~100ms 延迟:~1ms(命中) / ~100ms(未命中)
缓存的核心价值(示意):
- 命中时响应时间常可从几十到数百毫秒降到亚毫秒到数毫秒
- 热点读请求可显著从数据库转移出去
- 整体吞吐量通常随命中率提升而改善
2.2 缓存策略¶
Cache Aside (旁路缓存)— 最常见¶
读流程: 写流程:
│ │
├── 1. 查缓存 ├── 1. 更新数据库
│ ├── 命中 → 返回 ├── 2. 删除缓存
│ └── 未命中 └── 完成
│ ├── 2. 查数据库
│ ├── 3. 写入缓存
│ └── 4. 返回
优势:实现简单,适合读多写少 劣势:首次访问一定未命中;短暂不一致窗口 适用:大部分 Web 应用场景
class CacheAsideService:
def __init__(self, cache, db):
self.cache = cache
self.db = db
def get(self, key):
# 1. 查缓存
value = self.cache.get(key)
if value is not None:
return value # 命中
# 2. 未命中,查数据库
value = self.db.get(key)
if value is not None:
# 3. 写入缓存
self.cache.set(key, value, ttl=300)
return value
def update(self, key, value):
# 1. 更新数据库
self.db.update(key, value)
# 2. 删除缓存(而不是更新缓存!)
self.cache.delete(key)
💡 为什么很多系统选择删除缓存而不是更新缓存? 因为并发更新下,删除缓存通常更容易保证实现简单且减少脏数据覆盖;但它也不是银弹,遇到高并发写场景仍可能需要延迟双删、订阅 binlog/CDC 或缩短 TTL 来收敛不一致窗口。
Write Through (写穿透缓存)¶
读流程: 写流程:
│ │
├── 1. 查缓存 ├── 1. 写缓存
│ ├── 命中 → 返回 └── 2. 缓存同步写数据库
│ └── 未命中 (作为整体操作)
│ ├── 2. 缓存从DB加载
│ └── 3. 返回
优势:同步路径更容易管理缓存与底库的一致性 劣势:写延迟更高,不常用数据也可能被提前写入缓存 适用:希望由缓存层统一封装写路径的场景
Write Behind (写回缓存)¶
读流程: 写流程:
│ │
├── 1. 查缓存 ├── 1. 只写缓存(快速返回)
│ ├── 命中 → 返回 └── 2. 异步批量写入数据库
│ └── 未命中
│ ├── 2. 缓存从DB加载
│ └── 3. 返回
优势:写性能极高 劣势:可能丢数据(缓存挂了还没来得及写 DB ) 适用:写密集型场景(如日志、计数器)
Read Through (读穿透缓存)¶
读流程:
│
├── 1. 查缓存
│ ├── 命中 → 返回
│ └── 未命中
│ ├── 2. 缓存自动从DB加载
│ └── 3. 返回
│
│ 与Cache Aside的区别:
│ Cache Aside: 应用程序负责加载
│ Read Through: 缓存组件负责加载
策略对比总结¶
| 策略 | 读性能 | 写性能 | 一致性 | 复杂度 | 适用场景 |
|---|---|---|---|---|---|
| Cache Aside | 高 | 中 | 最终一致(常见) | 低 | 通用场景 |
| Write Through | 高 | 低 | 一致性更易管理,但仍取决于具体实现 | 中 | 封装式缓存层 |
| Write Behind | 高 | 极高 | 弱 / 最终一致 | 高 | 写密集、可容忍异步落库 |
| Read Through | 高 | 中 | 最终一致(常见) | 中 | 读密集型 |
2.3 缓存淘汰策略¶
当缓存空间满时,需要淘汰旧数据:
| 策略 | 全称 | 原理 | 优势 | 劣势 |
|---|---|---|---|---|
| LRU | Least Recently Used | 淘汰最近最少使用的 | 实现简单,效果好 | 偶尔访问会污染 |
| LFU | Least Frequently Used | 淘汰使用频率最低的 | 适合有热点的场景 | 新数据难以被保留 |
| TTL | Time To Live | 按过期时间淘汰 | 简单高效 | 可能淘汰热数据 |
| FIFO | First In First Out | 淘汰最先进入的 | 最简单 | 不考虑使用频率 |
| Random | 随机淘汰 | 随机选择淘汰 | 无额外开销 | 可能淘汰热数据 |
LRU 实现( HashMap + 双向链表):
from collections import OrderedDict
class LRUCache:
def __init__(self, capacity: int):
self.capacity = capacity
self.cache = OrderedDict()
def get(self, key):
if key not in self.cache:
return None
self.cache.move_to_end(key) # 移到末尾(最近使用)
return self.cache[key]
def put(self, key, value):
if key in self.cache:
self.cache.move_to_end(key)
self.cache[key] = value
if len(self.cache) > self.capacity:
self.cache.popitem(last=False) # 移除最久未使用
// Java实现
public class LRUCache<K, V> extends LinkedHashMap<K, V> { // extends继承;implements实现接口
private final int capacity;
public LRUCache(int capacity) {
super(capacity, 0.75f, true); // true = 按访问顺序
this.capacity = capacity;
}
@Override // @Override重写父类方法
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
return size() > capacity;
}
}
2.4 缓存常见问题¶
| 问题 | 描述 | 解决方案 |
|---|---|---|
| 缓存穿透 | 查询不存在的数据,每次都打到 DB | 布隆过滤器 / 缓存空值 |
| 缓存击穿 | 热点 key 过期,大量请求打到 DB | 互斥锁 / 热点 key 不过期 |
| 缓存雪崩 | 大量 key 同时过期, DB 瞬间压力暴增 | 过期时间加随机值 / 多级缓存 |
| 缓存不一致 | 缓存和 DB 数据不一致 | 延迟双删 / 订阅 binlog |
缓存穿透: 解决1:布隆过滤器
请求 → 缓存(miss) → DB(miss) 请求 → 布隆过滤器 → 不存在 → 直接返回
请求 → 缓存(miss) → DB(miss) → 可能存在 → 查缓存/DB
(恶意攻击或爬虫)
解决2:缓存空值
请求 → 缓存(miss) → DB(miss) → 缓存空值(TTL较短)
再次请求 → 缓存(hit, 空值) → 返回空
3. CDN¶
3.1 CDN 工作原理¶
CDN ( Content Delivery Network )将内容缓存到离用户最近的边缘节点。
无CDN: 有CDN:
用户(北京) 用户(北京)
│ │
│ 200ms │ 5ms
│ ▼
│ ┌────────────┐
│ │ CDN边缘节点 │ ← 北京节点
│ │ (缓存命中) │
│ └────────────┘
▼ │ (缓存未命中时)
┌────────────┐ │ 100ms
│ 源站服务器 │ ← 广州 ▼
└────────────┘ ┌────────────┐
│ 源站服务器 │ ← 广州
└────────────┘
3.2 CDN 的类型¶
| 类型 | 缓存内容 | 示例 |
|---|---|---|
| Pull CDN | CDN 自动从源站拉取 | 普通网站图片 |
| Push CDN | 主动推送到 CDN | 大型文件、视频 |
3.3 CDN 适用场景¶
| 适合 CDN | 不适合 CDN |
|---|---|
| 静态资源(图片/JS/CSS ) | 动态 API 响应 |
| 视频流 | 个性化内容 |
| 大文件下载 | 实时数据 |
| 高流量网站 | 低流量内部系统 |
3.4 CDN 关键配置¶
| 配置 | 说明 | 建议值 |
|---|---|---|
| TTL | 缓存过期时间 | 静态资源: 1 天~1 年 |
| Cache Key | 缓存键 | URL + 必要参数 |
| 回源策略 | 缓存未命中时的源站请求 | 负载均衡到多个源站 |
| HTTPS | 是否支持 HTTPS | 必须支持 |
| 压缩 | Gzip/Brotli | 开启 |
4. 消息队列¶
4.1 为什么需要消息队列¶
同步调用(紧耦合): 异步调用(松耦合):
订单服务 → 库存服务 订单服务 → MQ → 库存服务
→ 积分服务 → 积分服务
→ 通知服务 → 通知服务
问题: 优势:
- 任一下游挂了整个失败 - 下游挂了不影响上游
- 响应时间是所有下游之和 - 上游快速返回
- 增加新下游需要改上游代码 - 新增消费者无需改上游
消息队列三大价值:
| 价值 | 说明 | 示例 |
|---|---|---|
| 解耦 | 上下游互不依赖 | 订单→库存→积分 |
| 异步 | 快速返回,后台处理 | 上传视频→异步转码 |
| 削峰 | 缓冲突发流量 | 秒杀请求排队处理 |
4.2 Kafka vs RabbitMQ vs RocketMQ¶
| 特性 | Kafka | RabbitMQ | RocketMQ |
|---|---|---|---|
| 定位 | 事件流与日志管道 | 通用消息代理 | 兼顾事务/顺序/延迟消息的 MQ |
| 开发语言 | Scala/Java | Erlang | Java |
| 吞吐特征 | 批量写入和顺序读场景表现突出 | 中高吞吐,路由能力强 | 中高吞吐,业务特性较丰富 |
| 延迟特征 | 低毫秒到几十毫秒,受 batch/acks/刷盘影响 | 低毫秒级常见,强依赖消息模式与持久化配置 | 低毫秒到几十毫秒,依赖刷盘与主从策略 |
| 消息模型 | Pull 为主,消费者自管 offset | Push/Pull 均可 | Pull 为主 |
| 消息保留 | 按时间或大小保留日志 | 以队列/确认语义为主 | 持久化并支持回查 |
| 顺序性 | Partition 内有序 | 依赖队列与消费者模型 | 队列级顺序能力较强 |
| 事务消息 | 支持,但使用约束较多 | 可实现 | 原生支持较成熟 |
| 延迟消息 | 需结合定时/插件/业务层实现 | 插件或业务层实现 | 原生支持较成熟 |
| 消息回溯 | 支持(offset 回放) | 通常不以回放为强项 | 支持 |
| 运维复杂度 | 中,现代版本推荐 KRaft 模式 | 中 | 中 |
选型建议:
选择Kafka:
✅ 大数据场景(日志收集、数据管道)
✅ 高吞吐量需求(100K+ msg/s)
✅ 需要消息回溯和流处理
✅ 事件溯源架构
选择RabbitMQ:
✅ 企业级应用集成
✅ 复杂路由需求(Exchange模式)
✅ 对延迟敏感(μs级)
✅ 团队熟悉Erlang生态
选择RocketMQ:
✅ 国内电商/金融场景
✅ 需要事务消息
✅ 需要延迟消息
✅ 阿里云生态
4.3 消息队列核心概念¶
Producer(生产者)
│
│ 发送消息
▼
┌────────────────────────────┐
│ Broker(消息代理) │
│ ┌──────┐ ┌──────┐ ┌──────┐│
│ │Topic1│ │Topic2│ │Topic3││
│ │ │ │ │ │ ││
│ │ P0 │ │ P0 │ │ P0 ││ ← Partition(分区)
│ │ P1 │ │ P1 │ │ P1 ││
│ │ P2 │ │ │ │ ││
│ └──────┘ └──────┘ └──────┘│
└────────────────────────────┘
│
│ 消费消息
▼
Consumer Group(消费者组)
Consumer 1 Consumer 2 Consumer 3
4.4 消息可靠性保证¶
| 保证级别 | 说明 | 代价 |
|---|---|---|
| At Most Once | 最多一次,可能丢失 | 实现开销通常最低,吞吐往往更高 |
| At Least Once | 至少一次,可能重复 | 需要幂等设计与重试管理,整体成本中等 |
| Exactly Once | 精确一次,不丢不重 | 语义最强,但实现复杂度和运行成本通常更高 |
5. 数据库选型¶
5.1 数据库类型全景¶
数据库选型决策树:
数据有明确的Schema吗?
╱ ╲
是 否
╱ ╲
需要复杂查询/事务? 数据是什么形态?
╱ ╲ ╱ │ ╲
是 否 KV 文档 图关系
│ │ │ │ │
┌────▼────┐ ┌────▼────┐ ┌──▼──┐ ┌▼───┐ ┌▼──────┐
│关系型数据库│ │列族数据库│ │Redis│ │Mongo││Neo4j │
│ MySQL │ │HBase │ │ │ │DB │ │ │
│ PgSQL │ │Cassandra│ └─────┘ └────┘ └──────┘
└─────────┘ └─────────┘
5.2 四类数据库对比¶
| 类型 | 代表 | 数据模型 | 优势 | 劣势 | 适用场景 |
|---|---|---|---|---|---|
| 关系型 | MySQL, PostgreSQL | 表(行+列) | ACID 事务、 SQL 灵活 | 扩展性差、 Schema 固定 | 事务型业务 |
| 文档型 | MongoDB, CouchDB | JSON 文档 | Schema 灵活、开发快 | 不适合复杂 Join | 内容管理、配置 |
| 列族型 | HBase, Cassandra | 列族 | 高写入、大数据量 | 查询模式受限 | 时序数据、日志 |
| 图数据库 | Neo4j, JanusGraph | 节点+边 | 关系查询极快 | 通用查询慢 | 社交网络、推荐 |
5.3 关系型数据库选型¶
| 特性 | MySQL | PostgreSQL |
|---|---|---|
| 并发性能 | 高(简单查询) | 高(复杂查询) |
| JSON 支持 | 5.7+ 基本支持 | 原生优秀支持 |
| 全文搜索 | 支持 | 原生强大 |
| 地理信息 | 基本 | PostGIS 强大 |
| 复制 | 主从 | 主从+逻辑复制 |
| 生态 | 互联网业务场景成熟 | 复杂类型、扩展能力强 |
| 适用 | 通用互联网业务、简单写路径 | 复杂查询、分析扩展、地理/JSON 等场景 |
5.4 NoSQL 选型指南¶
高性能缓存 → Redis (KV, 内存)
→ Memcached (纯缓存)
灵活Schema文档 → MongoDB (通用文档)
→ CouchDB (离线优先)
大规模时序/日志 → Cassandra (AP, 高写入)
→ HBase (CP, Hadoop生态)
→ InfluxDB (时序专用)
社交图谱关系 → Neo4j (原生图)
→ JanusGraph (分布式图)
全文搜索 → Elasticsearch (搜索+分析)
→ Solr (传统搜索)
6. 分布式 ID¶
6.1 为什么需要分布式 ID¶
分库分表后,数据库自增 ID 不再全局唯一,需要全局唯一的 ID 生成方案。
分布式 ID 的要求: - 全局唯一 - 趋势递增(有利于 B+树索引) - 高可用 - 高性能
6.2 方案对比¶
| 方案 | 唯一性 | 有序性 | 可用性 | 性能 | 长度 | 适用场景 |
|---|---|---|---|---|---|---|
| UUID | ✅ | ❌ 无序 | ✅ | ✅ 本地生成 | 128bit(长) | 对顺序无要求 |
| 数据库自增 | ✅ | ✅ | ❌ 单点 | ❌ | 64bit | 小规模 |
| Snowflake | ✅ | ✅ 趋势递增 | ✅ | ✅ 本地生成 | 64bit | 通用方案 |
| Leaf(美团) | ✅ | ✅ | ✅ | ✅ | 64bit | 大厂方案 |
6.3 Snowflake 算法详解¶
Snowflake ID结构(64bit):
0 | 00000000 00000000 00000000 00000000 00000000 0 | 00000 | 00000 | 000000000000
│ │ │ │ │ │
│ │ │ │ │ └── 序列号(12bit)
│ │ │ │ │ 每ms最多4096个
│ │ │ │ │
│ │ │ │ └── 机器ID(5bit)
│ │ │ │ 最多32台
│ │ │ │
│ │ │ └── 数据中心ID(5bit)
│ │ │ 最多32个
│ │ │
│ └── 时间戳(41bit) │
│ 从起始时间(epoch)开始的毫秒数 │
│ 可用 2^41 ms ≈ 69年 │
│ │
└── 符号位(1bit),固定为0
import time
import threading
class SnowflakeGenerator:
# 起始时间戳(示例使用 2020-01-01 00:00:00 UTC)
EPOCH = 1577836800000
# 各部分位数
WORKER_ID_BITS = 5
DATACENTER_ID_BITS = 5
SEQUENCE_BITS = 12
# 最大值
MAX_WORKER_ID = (1 << WORKER_ID_BITS) - 1 # 31
MAX_DATACENTER_ID = (1 << DATACENTER_ID_BITS) - 1 # 31
MAX_SEQUENCE = (1 << SEQUENCE_BITS) - 1 # 4095
# 位移量
WORKER_ID_SHIFT = SEQUENCE_BITS # 12
DATACENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS # 17
TIMESTAMP_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATACENTER_ID_BITS # 22
def __init__(self, datacenter_id, worker_id):
if datacenter_id > self.MAX_DATACENTER_ID or datacenter_id < 0:
raise ValueError(f"Datacenter ID must be between 0 and {self.MAX_DATACENTER_ID}")
if worker_id > self.MAX_WORKER_ID or worker_id < 0:
raise ValueError(f"Worker ID must be between 0 and {self.MAX_WORKER_ID}")
self.datacenter_id = datacenter_id
self.worker_id = worker_id
self.sequence = 0
self.last_timestamp = -1
self.lock = threading.Lock()
def _current_millis(self):
return int(time.time() * 1000)
def next_id(self):
with self.lock:
timestamp = self._current_millis()
if timestamp == self.last_timestamp:
self.sequence = (self.sequence + 1) & self.MAX_SEQUENCE
if self.sequence == 0:
# 序列号用完,等待下一毫秒
while timestamp <= self.last_timestamp:
timestamp = self._current_millis()
else:
self.sequence = 0
if timestamp < self.last_timestamp:
raise Exception("Clock moved backwards!")
self.last_timestamp = timestamp
return (
((timestamp - self.EPOCH) << self.TIMESTAMP_SHIFT) |
(self.datacenter_id << self.DATACENTER_ID_SHIFT) |
(self.worker_id << self.WORKER_ID_SHIFT) |
self.sequence
)
# 使用示例
generator = SnowflakeGenerator(datacenter_id=1, worker_id=1)
id1 = generator.next_id()
id2 = generator.next_id()
7. 限流算法¶
7.1 为什么需要限流¶
正常流量: 突发流量/攻击:
|||| |||||||||||||||||||||
↓↓↓↓ ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
┌──────┐ ┌──────┐
│Server│ ← OK │Server│ ← 崩溃!
└──────┘ └──────┘
加上限流器:
|||||||||||||||||||||
↓
┌──────────┐
│ Rate │ → 超出限制的请求被拒绝(429)
│ Limiter │
└────┬─────┘
↓ (只放行合理量的请求)
||||
↓↓↓↓
┌──────┐
│Server│ ← 安全
└──────┘
7.2 令牌桶算法( Token Bucket )¶
原理:
- 以固定速率往桶里放令牌
- 请求来时取一个令牌
- 桶满了令牌就丢弃
- 没有令牌的请求被拒绝
┌─────────────────┐
│ Token Bucket │
│ ○ ○ ○ ○ ○ │ ← 最多capacity个令牌
│ ○ ○ ○ ○ │
│ │
└────────┬─────────┘
│ ↑
取令牌 │ │ 定期放令牌
↓ │
┌────────┐
│ Request│
└────────┘
特点:
- 允许突发流量(桶里有存量令牌)
- 稳定的平均速率
import time
import threading
class TokenBucket:
def __init__(self, rate, capacity):
"""
rate: 每秒生成令牌速率
capacity: 桶的最大容量
"""
self.rate = rate
self.capacity = capacity
self.tokens = capacity # 初始满桶
self.last_time = time.time()
self.lock = threading.Lock()
def allow_request(self):
with self.lock:
now = time.time()
# 补充令牌
elapsed = now - self.last_time
self.tokens = min(
self.capacity,
self.tokens + elapsed * self.rate
)
self.last_time = now
# 尝试消耗一个令牌
if self.tokens >= 1:
self.tokens -= 1
return True
return False
# 使用:每秒10个请求,允许突发20个
limiter = TokenBucket(rate=10, capacity=20)
# 模拟请求
for i in range(25):
if limiter.allow_request():
print(f"Request {i}: Allowed")
else:
print(f"Request {i}: Rejected")
// Go实现
package ratelimit
import (
"sync"
"time"
)
type TokenBucket struct {
rate float64
capacity float64
tokens float64
lastTime time.Time
mu sync.Mutex
}
func NewTokenBucket(rate, capacity float64) *TokenBucket {
return &TokenBucket{
rate: rate,
capacity: capacity,
tokens: capacity,
lastTime: time.Now(),
}
}
func (tb *TokenBucket) Allow() bool {
tb.mu.Lock()
defer tb.mu.Unlock() // defer延迟执行,函数返回前调用
now := time.Now()
elapsed := now.Sub(tb.lastTime).Seconds()
tb.tokens += elapsed * tb.rate
if tb.tokens > tb.capacity {
tb.tokens = tb.capacity
}
tb.lastTime = now
if tb.tokens >= 1 {
tb.tokens -= 1
return true
}
return false
}
7.3 漏桶算法( Leaky Bucket )¶
原理:
- 请求进入桶中排队
- 以固定速率处理请求
- 桶满了新请求被丢弃
请求进入
│
▼
┌─────────────┐
│ ○ ○ ○ ○ │ ← 队列(有最大容量)
│ ○ ○ ○ │
│ ○ ○ │
└──────┬──────┘
│
╔══╧══╗
║ 漏口 ║ ← 固定速率流出
╚══╤══╝
│
▼
○ ○ ○ ← 匀速处理
特点:
- 输出速率恒定(匀速)
- 不允许突发流量
- 适合对处理速率有严格要求的场景
import time
import threading
from collections import deque
class LeakyBucket:
def __init__(self, rate, capacity):
"""
rate: 每秒处理请求数(漏出速率)
capacity: 桶的最大容量
"""
self.rate = rate
self.capacity = capacity
self.water = 0 # 当前水量(排队请求数)
self.last_time = time.time()
self.lock = threading.Lock()
def allow_request(self):
with self.lock:
now = time.time()
elapsed = now - self.last_time
# 漏掉一些水
self.water = max(0, self.water - elapsed * self.rate)
self.last_time = now
# 尝试加水
if self.water < self.capacity:
self.water += 1
return True
return False
7.4 滑动窗口算法( Sliding Window )¶
固定窗口的问题:
窗口1: [00:00 ~ 01:00] 99个请求
窗口2: [01:00 ~ 02:00] 99个请求
限制: 100个/分钟
但是在 [00:30 ~ 01:30] 这个时间段内有 198个请求!
超出了限制!
滑动窗口解决方案:
┌────────────────────────────────┐
│ 滑动窗口(1分钟) │
│ [当前时间 - 60s, 当前时间] │
└────────────────────────────────┘
每次请求来时,只计算最近60秒内的请求数
import time
import threading # 线程池/多线程:并发执行任务
from collections import deque
class SlidingWindowLog:
"""滑动窗口日志算法 - 精确但内存开销大"""
def __init__(self, max_requests, window_seconds):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.requests = deque() # 存储每个请求的时间戳
self.lock = threading.Lock()
def allow_request(self):
with self.lock:
now = time.time()
# 清除窗口外的请求
while self.requests and self.requests[0] < now - self.window_seconds:
self.requests.popleft()
if len(self.requests) < self.max_requests:
self.requests.append(now)
return True
return False
class SlidingWindowCounter:
"""滑动窗口计数器 - 近似但内存省"""
def __init__(self, max_requests, window_seconds):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.current_count = 0
self.previous_count = 0
self.current_start = time.time()
self.lock = threading.Lock()
def allow_request(self):
with self.lock:
now = time.time()
elapsed = now - self.current_start
if elapsed >= self.window_seconds:
self.previous_count = self.current_count
self.current_count = 0
self.current_start = now
elapsed = 0
# 加权计算
weight = 1 - (elapsed / self.window_seconds)
estimated_count = self.previous_count * weight + self.current_count
if estimated_count < self.max_requests:
self.current_count += 1
return True
return False
7.5 限流算法对比¶
| 算法 | 突发流量 | 速率平滑 | 内存 | 精确度 | 适用场景 |
|---|---|---|---|---|---|
| 令牌桶 | ✅允许 | 中 | O(1) | 高 | API 限流的常见选择 |
| 漏桶 | ❌不允许 | ✅匀速 | O(1) | 高 | 请求处理平滑化 |
| 固定窗口 | 有边界问题 | 低 | O(1) | 低 | 简单统计 |
| 滑动窗口日志 | 无边界问题 | 高 | O(N) | 最高 | 精确限流 |
| 滑动窗口计数 | 无边界问题 | 高 | O(1) | 较高 | 通用限流 |
8. 练习与延伸阅读¶
8.1 练习题¶
- 负载均衡:实现一个支持权重动态调整的负载均衡器
- 缓存:设计一个支持 TTL 的 LRU 缓存(同时支持按时间和按容量淘汰)
- 消息队列选型:为以下场景选择合适的消息队列并说明理由:
- 用户行为日志收集系统
- 电商订单处理系统
- 即时通讯消息分发系统
- 分布式 ID:分析 Snowflake 算法的时钟回拨问题,设计解决方案
- 限流:实现一个分布式限流器(基于 Redis )
8.2 延伸阅读¶
- 《 DDIA 》第 5 章 — 复制
- 《 DDIA 》第 6 章 — 分区
- Nginx 负载均衡官方文档
- Redis 官方文档 — Caching patterns
- Kafka 设计文档
- Google Guava RateLimiter 源码分析
📝 本章小结¶
| 组件 | 核心要点 | 面试常见问题 |
|---|---|---|
| 负载均衡 | L4/L7 选型、算法选择 | 一致性哈希原理 |
| 缓存 | Cache Aside 常见、 LRU/LFU 要结合 workload | 缓存穿透/击穿/雪崩 |
| CDN | Pull/Push 、适合静态资源 | CDN 如何更新缓存 |
| 消息队列 | 选型看吞吐、顺序、事务和运维 | 消息丢失/重复如何处理 |
| 数据库 | 按数据特征和查询模式选型 | 关系型 vs 文档型场景 |
| 分布式 ID | Snowflake 是常见通用方案之一 | 时钟回拨问题 |
| 限流 | 令牌桶常见,但并非唯一答案 | 手写限流算法 |
← 上一章:系统设计方法论 | 下一章:数据存储设计 →
⚠️ 核验说明(2026-04-03):本页已完成逐段人工复核,重点修正了缓存一致性、消息队列选型、数据库生态和 Snowflake 示例中的过强口径,并补充了这些组件都依赖工作负载与实现细节的前提。涉及具体中间件版本、基准性能或厂商功能时,请以官方文档和压测结果为准。
最后更新日期: 2026-04-03