跳转至

03 - 分布式推理

⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。

让多台机器协同工作,突破单机限制

📎 交叉引用:本章侧重张量并行/流水线并行的底层原理与实现。 vLLM/SGLang 框架级分布式推理请参考 LLM 应用/12-推理优化 §12.5 ,分布式训练请参考 深度学习/06-高级主题/09-分布式训练

📖 章节概述

本章将深入探讨分布式推理技术,包括模型并行、数据并行和流水线并行等方法。这些技术可以将大模型分布到多个 GPU 或机器上,突破单机显存和计算限制。

🎯 学习目标

完成本章后,你将能够:

  • 理解分布式推理的基本原理
  • 掌握模型并行、数据并行、流水线并行的实现方法
  • 了解不同并行策略的适用场景
  • 能够设计和实现分布式推理系统

1. 分布式推理概述

1.1 为什么需要分布式推理

显存限制: - 大型模型(如 70B 参数)需要超过 140GB 显存 - 单卡 GPU 通常只有 24-80GB 显存 - 需要将模型分布到多个 GPU

计算需求: - 推理延迟要求高 - 吞吐量需求大 - 需要并行计算加速

1.2 分布式推理类型

Text Only
分布式推理
├── 模型并行 (Model Parallelism)
│   ├── 张量并行 (Tensor Parallelism)
│   └── 流水线并行 (Pipeline Parallelism)
├── 数据并行 (Data Parallelism)
│   ├── 同步数据并行
│   └── 异步数据并行
└── 混合并行 (Hybrid Parallelism)
    ├── 模型并行 + 数据并行
    └── 流水线并行 + 数据并行

2. 模型并行

2.1 张量并行

张量并行将模型的张量(权重、激活值)切分到多个 GPU 上。

原理: - 将大矩阵切分为多个小矩阵 - 每个 GPU 计算一部分 - 通过通信合并结果

Python
import torch
import torch.nn as nn
import torch.distributed as dist

class ColumnParallelLinear(nn.Module):
    """
    列并行线性层(Column Parallel Linear)

    将权重矩阵按列切分(输出维度切分),每个GPU计算输出的不同部分。
    适用于:Transformer的FFN层、Attention输出投影等

    通信方式:all_gather(收集所有GPU的部分输出,拼接成完整输出)
    """
    def __init__(self, in_features, out_features, world_size, rank):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.world_size = world_size
        self.rank = rank

        # 切分输出维度(列切分)
        self.out_per_rank = out_features // world_size

        # 每个GPU只存储部分权重(列切片)
        self.weight = nn.Parameter(
            torch.randn(self.out_per_rank, in_features)
        )
        self.bias = nn.Parameter(torch.randn(self.out_per_rank))

    def forward(self, x):
        # 本地计算:每个GPU计算 Y_local = X @ W_local^T + b_local
        local_output = torch.nn.functional.linear(x, self.weight, self.bias)

        # 列并行:使用 all_gather 收集所有GPU的输出并拼接
        # 每个GPU得到的是输出的不同列(特征维度),需要拼接而非求和
        gathered_outputs = [torch.zeros_like(local_output) for _ in range(self.world_size)]
        dist.all_gather(gathered_outputs, local_output)

        # 按列(特征维度)拼接所有GPU的输出
        output = torch.cat(gathered_outputs, dim=-1)

        return output


class RowParallelLinear(nn.Module):
    """
    行并行线性层(Row Parallel Linear)

    将权重矩阵按行切分(输入维度切分),每个GPU计算部分结果后求和。
    适用于:Transformer的第一层FFN、Attention的QKV投影等

    通信方式:all_reduce(对所有GPU的部分结果求和)
    """
    def __init__(self, in_features, out_features, world_size, rank):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.world_size = world_size
        self.rank = rank

        # 切分输入维度(行切分)
        self.in_per_rank = in_features // world_size

        # 每个GPU只存储部分权重(行切片)
        self.weight = nn.Parameter(
            torch.randn(out_features, self.in_per_rank)
        )
        # 行并行通常不加bias,或者只在rank 0上加
        if rank == 0:
            self.bias = nn.Parameter(torch.randn(out_features))
        else:
            self.register_parameter('bias', None)

    def forward(self, x):
        # 每个GPU只处理输入的部分特征
        x_partition = x[..., self.rank * self.in_per_rank : (self.rank + 1) * self.in_per_rank]

        # 本地计算:每个GPU计算 Y_local = X_local @ W_local^T
        local_output = torch.nn.functional.linear(x_partition, self.weight)

        # 行并行:使用 all_reduce 对所有GPU的结果求和
        # 每个GPU得到的是部分和,需要累加得到最终结果
        output = local_output.clone()
        dist.all_reduce(output, op=dist.ReduceOp.SUM)

        # 只在 rank 0 上添加 bias
        if self.bias is not None:
            output = output + self.bias

        return output


# 使用示例
# world_size = 4  # 4个GPU
# rank = dist.get_rank()  # 当前GPU的rank
#
# # 列并行:输出维度切分,使用 all_gather
# col_parallel = ColumnParallelLinear(768, 3072, world_size, rank)
#
# # 行并行:输入维度切分,使用 all_reduce
# row_parallel = RowParallelLinear(3072, 768, world_size, rank)

2.1.1 张量并行通信方式对比

并行类型 切分方式 通信操作 输出形状变化
列并行 输出维度切分 all_gather [B, S, out_per_rank] → [B, S, out_features]
行并行 输入维度切分 all_reduce [B, S, out_features] → [B, S, out_features]
Python
# 通信操作对比示例
import torch.distributed as dist

def demonstrate_comm_difference():
    """
    演示 all_gather 和 all_reduce 的区别
    """
    # 假设 4 个 GPU,每个 GPU 计算输出的一部分
    # GPU 0: [1, 1], GPU 1: [2, 2], GPU 2: [3, 3], GPU 3: [4, 4]

    # === all_gather(列并行使用)===
    # 收集所有GPU的结果并拼接
    # 结果: [1, 1, 2, 2, 3, 3, 4, 4] - 拼接成完整输出
    gathered = [torch.zeros(2) for _ in range(4)]
    dist.all_gather(gathered, local_output)
    full_output = torch.cat(gathered, dim=-1)

    # === all_reduce(行并行使用)===
    # 对所有GPU的结果求和
    # 结果: [1+2+3+4, 1+2+3+4] = [10, 10] - 部分和累加
    output = local_output.clone()
    dist.all_reduce(output, op=dist.ReduceOp.SUM)

2.2 流水线并行

流水线并行将模型的层分布到多个 GPU 上,形成流水线。

原理: - GPU1 计算第 1-3 层 - GPU2 计算第 4-6 层 - GPU3 计算第 7-9 层 - 数据像流水一样在 GPU 间流动

Python
import torch
import torch.nn as nn

class PipelineParallelModel(nn.Module):
    """
    流水线并行模型
    """
    def __init__(self, num_stages=4):
        super().__init__()
        self.stages = nn.ModuleList()

        # 创建多个阶段
        for i in range(num_stages):
            stage = nn.Sequential(
                nn.Linear(768, 768),
                nn.ReLU(),
                nn.Linear(768, 768),
                nn.ReLU()
            )
            self.stages.append(stage)

    def forward(self, x, stage_idx):
        """
        在指定阶段执行前向传播

        Args:
            x: 输入张量
            stage_idx: 阶段索引
        """
        return self.stages[stage_idx] (x)

# 流水线执行
def pipeline_execute(model, inputs, num_stages=4):
    """
    执行流水线并行

    Args:
        model: 流水线模型
        inputs: 输入数据
        num_stages: 阶段数量
    """
    # 初始化各阶段输入
    stage_inputs = [None] * num_stages
    stage_inputs[0] = inputs

    # 流水线执行
    outputs = []
    for step in range(num_stages):
        # 执行当前阶段
        for stage_idx in range(num_stages):
            if stage_inputs[stage_idx] is not None:
                output = model(stage_inputs[stage_idx], stage_idx)

                # 传递到下一阶段
                if stage_idx < num_stages - 1:
                    stage_inputs[stage_idx + 1] = output
                else:
                    outputs.append(output)

        # 准备下一批输入
        if step < num_stages - 1:
            stage_inputs[0] = inputs

    return outputs

# 使用示例
# model = PipelineParallelModel(num_stages=4)
# inputs = torch.randn(32, 768)
# outputs = pipeline_execute(model, inputs)

2.3 使用 Hugging Face 的模型并行

Python
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

def model_parallel_inference(model_name="meta-llama/Llama-2-7b-hf"):
    """
    使用Hugging Face的模型并行

    Args:
        model_name: 模型名称或路径
    """
    # 加载分词器
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    # 加载模型,自动分布到多个GPU
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        device_map="auto",  # 自动设备映射
        torch_dtype=torch.float16
    )

    # 查看模型分布
    print("模型设备分布:")
    for name, param in model.named_parameters():
        if param.device != torch.device('cpu'):
            print(f"  {name}: {param.device}")

    # 推理
    prompt = "请介绍一下人工智能的发展历程"
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)

    with torch.no_grad():  # 禁用梯度计算,节省内存
        outputs = model.generate(
            **inputs,
            max_length=200,
            do_sample=True,
            temperature=0.7
        )

    result = tokenizer.decode(outputs[0], skip_special_tokens=True)
    print(result)

    return model, tokenizer

# 使用示例
# model, tokenizer = model_parallel_inference()

3. 数据并行

3.1 同步数据并行

同步数据并行在多个 GPU 上复制模型,每个 GPU 处理不同的数据批次。

Python
import os
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

def setup_distributed():
    """
    初始化分布式环境
    """
    # 从环境变量获取分布式配置
    rank = int(os.environ.get('RANK', 0))
    world_size = int(os.environ.get('WORLD_SIZE', 1))
    local_rank = int(os.environ.get('LOCAL_RANK', 0))

    # 初始化进程组
    dist.init_process_group(
        backend='nccl',
        rank=rank,
        world_size=world_size
    )

    # 设置当前设备
    torch.cuda.set_device(local_rank)

    return rank, world_size, local_rank

def data_parallel_inference(model, dataloader):
    """
    数据并行推理

    Args:
        model: 要并行化的模型
        dataloader: 数据加载器
    """
    # 初始化分布式环境
    rank, world_size, local_rank = setup_distributed()

    # 将模型移动到当前GPU
    model = model.to(local_rank)

    # 包装为DDP模型
    model = DDP(model, device_ids=[local_rank])

    # 推理
    model.eval()  # eval()评估模式
    all_outputs = []

    with torch.no_grad():
        for batch in dataloader:
            inputs, targets = batch
            inputs = inputs.to(local_rank)

            outputs = model(inputs)
            all_outputs.append(outputs.cpu())

    # 收集所有GPU的结果
    if rank == 0:
        outputs = torch.cat(all_outputs, dim=0)  # torch.cat沿已有维度拼接张量
        return outputs
    else:
        return None

# 使用示例(需要使用torchrun启动)
# torchrun --nproc_per_node=4 your_script.py

3.2 异步数据并行

异步数据并行允许 GPU 独立更新模型参数,无需等待其他 GPU 。

Python
import copy
import torch
import torch.nn as nn

class AsyncDataParallel(nn.Module):
    """
    异步数据并行
    """
    def __init__(self, model, device_ids):
        super().__init__()
        self.device_ids = device_ids
        self.models = nn.ModuleList([
            copy.deepcopy(model).to(device) for device in device_ids  # 移至GPU/CPU
        ])

    def forward(self, inputs):
        """
        异步前向传播
        """
        # 将输入分发到各个设备
        inputs_chunks = inputs.chunk(len(self.device_ids))

        # 异步执行
        outputs = []
        for model, input_chunk in zip(self.models, inputs_chunks):  # zip按位置配对
            output = model(input_chunk.to(model.device))
            outputs.append(output.cpu())

        # 合并结果
        return torch.cat(outputs, dim=0)

# 使用示例
# model = YourModel()
# parallel_model = AsyncDataParallel(model, device_ids=[0, 1, 2, 3])
# outputs = parallel_model(inputs)

⚠️ 重要说明:上述 AsyncDataParallel 类名虽含"Async",但实际是同步数据并行的演示。代码中的 torch.cuda.current_stream().synchronize() 会阻塞等待所有 GPU 完成。真正的异步数据并行(如 Hogwild!)需要共享内存参数和无锁梯度更新,在深度学习中较少使用。

4. 混合并行

4.1 模型并行 + 数据并行

Python
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

class HybridParallelModel(nn.Module):
    """
    混合并行模型(模型并行 + 数据并行)
    """
    def __init__(self, model_parallel_size, data_parallel_size):
        super().__init__()
        self.model_parallel_size = model_parallel_size
        self.data_parallel_size = data_parallel_size

        # 创建模型并行的子模型
        self.sub_models = nn.ModuleList()
        for i in range(model_parallel_size):
            sub_model = self._create_sub_model(i, model_parallel_size)
            self.sub_models.append(sub_model)

    def _create_sub_model(self, idx, total):
        """
        创建子模型
        """
        # 这里简化处理,实际需要根据模型架构设计
        return nn.Sequential(
            nn.Linear(768, 768),
            nn.ReLU(),
            nn.Linear(768, 768)
        )

    def forward(self, x):
        """
        前向传播
        """
        # 模型并行:每个子模型处理部分数据
        outputs = []
        for sub_model in self.sub_models:
            output = sub_model(x)
            outputs.append(output)

        # 合并结果
        output = torch.cat(outputs, dim=-1)

        return output

def hybrid_parallel_training(model, dataloader):
    """
    混合并行训练
    """
    # 初始化分布式环境
    rank = dist.get_rank()
    world_size = dist.get_world_size()

    # 计算模型并行和数据并行的大小
    model_parallel_size = 2  # 例如
    data_parallel_size = world_size // model_parallel_size

    # 创建混合并行模型
    model = HybridParallelModel(model_parallel_size, data_parallel_size)

    # 数据并行包装
    model = DDP(model)

    # 训练循环
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()

    for epoch in range(10):
        for batch in dataloader:
            inputs, targets = batch

            # 前向传播
            outputs = model(inputs)
            loss = criterion(outputs, targets)

            # 反向传播
            optimizer.zero_grad()  # 清零梯度
            loss.backward()  # 反向传播计算梯度
            optimizer.step()  # 更新参数

        print(f"Epoch {epoch+1} completed")

# 使用示例
# hybrid_parallel_training(model, dataloader)

4.2 流水线并行 + 数据并行

Python
import torch
import torch.nn as nn
import torch.distributed as dist

class PipelineDataParallel(nn.Module):
    """
    流水线并行 + 数据并行
    """
    def __init__(self, num_pipeline_stages, num_data_parallel):
        super().__init__()
        self.num_pipeline_stages = num_pipeline_stages
        self.num_data_parallel = num_data_parallel

        # 创建流水线阶段
        self.pipeline_stages = nn.ModuleList()
        for i in range(num_pipeline_stages):
            stage = self._create_stage(i, num_pipeline_stages)
            self.pipeline_stages.append(stage)

    def _create_stage(self, idx, total):
        """
        创建流水线阶段
        """
        return nn.Sequential(
            nn.Linear(768, 768),
            nn.ReLU(),
            nn.Linear(768, 768)
        )

    def forward(self, x, pipeline_idx, data_idx):
        """
        前向传播

        Args:
            x: 输入张量
            pipeline_idx: 流水线索引
            data_idx: 数据并行索引
        """
        # 执行流水线阶段
        output = x
        for stage in self.pipeline_stages:
            output = stage(output)

        return output

def pipeline_data_parallel_execute(model, dataloader):
    """
    执行流水线并行 + 数据并行
    """
    rank = dist.get_rank()
    world_size = dist.get_world_size()

    # 计算流水线并行和数据并行的大小
    num_pipeline_stages = 4  # 例如
    num_data_parallel = world_size // num_pipeline_stages

    # 计算当前rank的角色
    pipeline_idx = rank // num_data_parallel
    data_idx = rank % num_data_parallel

    # 执行推理
    model.eval()
    all_outputs = []

    with torch.no_grad():
        for batch in dataloader:
            inputs, targets = batch

            # 只处理当前数据并行的数据
            batch_size = inputs.size(0)
            chunk_size = batch_size // num_data_parallel
            start_idx = data_idx * chunk_size
            end_idx = start_idx + chunk_size

            chunk_inputs = inputs[start_idx:end_idx]

            # 执行流水线
            outputs = model(chunk_inputs, pipeline_idx, data_idx)
            all_outputs.append(outputs.cpu())

    # 收集结果
    outputs = torch.cat(all_outputs, dim=0)
    return outputs

# 使用示例
# model = PipelineDataParallel(num_pipeline_stages=4, num_data_parallel=2)
# outputs = pipeline_data_parallel_execute(model, dataloader)

5. 使用 vLLM 进行分布式推理

5.1 vLLM 简介

vLLM 是一个高性能的大模型推理库,支持高效的分布式推理。

Python
from vllm import LLM, SamplingParams

def vllm_distributed_inference(model_name="meta-llama/Llama-2-7b-hf"):
    """
    使用vLLM进行分布式推理

    Args:
        model_name: 模型名称或路径
    """
    # 初始化LLM(自动使用所有可用GPU)
    llm = LLM(
        model=model_name,
        tensor_parallel_size=4,  # 张量并行大小
        gpu_memory_utilization=0.9,  # GPU内存利用率
        max_model_len=4096  # 最大序列长度
    )

    # 设置采样参数
    sampling_params = SamplingParams(
        temperature=0.7,
        top_p=0.95,
        max_tokens=200
    )

    # 准备提示
    prompts = [
        "请介绍一下人工智能的发展历程",
        "什么是机器学习?",
        "深度学习的应用有哪些?"
    ]

    # 推理
    outputs = llm.generate(prompts, sampling_params)

    # 打印结果
    for output in outputs:
        print(f"提示: {output.prompt}")
        print(f"输出: {output.outputs[0].text}\n")

    return llm

# 使用示例
# llm = vllm_distributed_inference()

5.2 vLLM 高级配置

Python
from vllm import LLM, SamplingParams

def advanced_vllm_inference(model_name="meta-llama/Llama-2-7b-hf"):
    """
    高级vLLM配置
    """
    llm = LLM(
        model=model_name,

        # 张量并行配置
        tensor_parallel_size=4,

        # 内存配置
        gpu_memory_utilization=0.9,
        swap_space=4,  # 交换空间(GB)

        # 模型配置
        max_model_len=8192,
        trust_remote_code=True,

        # 量化配置
        quantization="awq",  # 或 "gptq", "squeezellm"

        # 性能优化
        enforce_eager=False,  # 使用CUDA图优化
        disable_log_stats=False,

        # 其他配置
        dtype="float16",  # 或 "bfloat16"
        seed=42
    )

    # 批量推理
    prompts = ["提示1", "提示2", "提示3"]
    sampling_params = SamplingParams(
        temperature=0.7,
        top_p=0.95,
        max_tokens=200,
        n=1,  # 每个提示生成的数量
        best_of=1,  # 采样候选数
        presence_penalty=0.0,
        frequency_penalty=0.0
    )

    outputs = llm.generate(prompts, sampling_params)

    return outputs

# 使用示例
# outputs = advanced_vllm_inference()

6. 练习题

基础练习

  1. 实现简单的张量并行
Python
# 练习: 实现一个简单的张量并行线性层
class SimpleTensorParallel(nn.Module):
    def __init__(self, in_features, out_features, world_size):
        # 你的代码
        pass

    def forward(self, x):
        # 你的代码
        pass
  1. 实现简单的流水线并行
Python
# 练习: 实现一个简单的流水线执行器
class SimplePipeline:
    def __init__(self, stages):
        # 你的代码
        pass

    def execute(self, inputs):
        # 你的代码
        pass

进阶练习

  1. 实现混合并行策略
Python
# 练习: 实现模型并行 + 数据并行的混合策略
class HybridParallel:
    def __init__(self, model, mp_size, dp_size):
        # 你的代码
        pass

    def forward(self, x):
        # 你的代码
        pass
  1. 实现负载均衡
Python
# 练习: 实现分布式推理的负载均衡
class LoadBalancer:
    def __init__(self, devices):
        # 你的代码
        pass

    def assign_task(self, task):
        # 你的代码
        pass

项目练习

  1. 创建分布式推理框架
  2. 支持多种并行策略
  3. 自动负载均衡
  4. 性能监控和优化

7. 最佳实践

✅ 推荐做法

  1. 选择合适的并行策略
  2. 小模型:数据并行
  3. 大模型:模型并行
  4. 超大模型:混合并行

  5. 优化通信开销

  6. 减少 GPU 间通信
  7. 使用高效的通信后端
  8. 批量传输数据

  9. 监控性能

  10. 监控 GPU 利用率
  11. 监控通信开销
  12. 分析瓶颈

❌ 避免做法

  1. 过度并行化
  2. 不要使用太多 GPU
  3. 考虑通信开销
  4. 评估实际收益

  5. 忽略负载均衡

  6. 确保各 GPU 负载均衡
  7. 避免某些 GPU 空闲
  8. 动态调整任务分配

  9. 不充分的测试

  10. 在小规模上测试
  11. 验证正确性
  12. 逐步扩展规模

8. 总结

本章介绍了分布式推理的核心技术:

  • 张量并行: 切分张量到多个 GPU
  • 流水线并行: 切分层到多个 GPU
  • 数据并行: 复制模型到多个 GPU
  • 混合并行: 组合多种并行策略

选择合适的并行策略需要考虑模型大小、硬件配置和应用场景。

9. 下一步

继续学习04-云端推理服务,了解如何将模型部署到云端并提供服务。