跳转至

07 - DeepSeek R1 架构详解

⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。

内容说明:本文档基于 DeepSeek-R1 论文( arXiv:2501.12948 , 2025 年 1 月发布)和相关技术文档编写,介绍 DeepSeek R1 的真实架构、训练流程和推理特性。代码示例仅用于说明相关技术原理,不代表 DeepSeek R1 的实际实现。

📖 章节概述

本章介绍 DeepSeek R1 的模型架构、训练流程和推理特性。 DeepSeek R1 基于 DeepSeek-V3 基座模型,通过强化学习训练获得强大的推理能力。

🎯 学习目标

完成本章后,你将能够:

  • 理解 DeepSeek R1 的真实架构( MoE + MLA )
  • 掌握 GRPO 强化学习训练流程
  • 了解推理能力如何通过 RL 自然涌现
  • 理解蒸馏技术在小模型推理中的应用

1. DeepSeek R1 概述

1.1 什么是 DeepSeek R1

DeepSeek R1 ( 2025 年 1 月发布)是一个通过强化学习获得推理能力的开源大语言模型:

核心事实: - 架构:基于 DeepSeek-V3 ,采用 MoE (混合专家)+ MLA (多头潜在注意力) - 参数量: 671B 总参数,每 token 激活 37B - 上下文: 128K tokens - 开源协议: MIT License - 注意: R1 是纯文本模型,不是多模态模型

训练流程: 1. DeepSeek-V3-Base 基座模型 2. 冷启动数据 SFT (少量高质量 CoT 数据) 3. GRPO 强化学习训练(推理能力涌现) 4. 拒绝采样 + SFT (改善输出格式与可读性) 5. 二次 RL 训练(对齐人类偏好)

1.2 模型架构(基于 DeepSeek-V3 )

Text Only
DeepSeek R1 真实架构(DeepSeek-V3 基座)
├── Multi-Head Latent Attention (MLA)
│   ├── 低秩键值压缩(减少 KV Cache)
│   ├── 解耦旋转位置编码 (RoPE)
│   └── 推理时显著节省显存
├── DeepSeekMoE
│   ├── 细粒度专家划分(每层 256 个路由专家 + 1 个共享专家)
│   ├── Top-8 路由策略
│   └── 辅助损失平衡负载
└── 训练创新
    ├── GRPO(Group Relative Policy Optimization)
    ├── 无需额外 critic/reward 模型
    ├── 推理能力通过 RL 自然涌现(非硬编码模块)
    └── 置信度估计

2. 模型架构

2.1 混合专家( MoE )架构

DeepSeek-V3 采用细粒度的混合专家架构,每层包含 256 个路由专家和 1 个共享专家,使用 Top-8 路由策略。

Python
import torch
import torch.nn as nn
import torch.nn.functional as F

class MoEExpert(nn.Module):  # 继承nn.Module定义网络层
    """
    混合专家网络中的单个专家
    """
    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()  # super()调用父类方法
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)
        self.activation = nn.GELU()

    def forward(self, x):
        x = self.fc1(x)
        x = self.activation(x)
        x = self.fc2(x)
        return x

class MoELayer(nn.Module):
    """
    混合专家层
    """
    def __init__(self, input_dim, hidden_dim, output_dim, num_experts=8, top_k=2):
        super().__init__()
        self.num_experts = num_experts
        self.top_k = top_k

        # 门控网络
        self.gate = nn.Linear(input_dim, num_experts)

        # 专家网络
        self.experts = nn.ModuleList([
            MoEExpert(input_dim, hidden_dim, output_dim)
            for _ in range(num_experts)
        ])

    def forward(self, x):
        """
        前向传播

        Args:
            x: 输入张量 [batch_size, seq_len, input_dim]
        """
        batch_size, seq_len, input_dim = x.shape

        # 门控选择
        gate_logits = self.gate(x)  # [batch_size, seq_len, num_experts]
        gate_probs = F.softmax(gate_logits, dim=-1)  # F.xxx PyTorch函数式API

        # 选择top-k专家
        top_k_probs, top_k_indices = torch.topk(gate_probs, self.top_k, dim=-1)

        # 归一化top-k概率
        top_k_probs = top_k_probs / top_k_probs.sum(dim=-1, keepdim=True)

        # 初始化输出(注意使用output_dim,即专家输出维度)
        output_dim = self.experts[0].fc2.out_features
        output = torch.zeros(batch_size, seq_len, output_dim, device=x.device)

        # 应用选定的专家
        for k in range(self.top_k):
            # 获取当前k的专家索引和概率
            expert_idx = top_k_indices[:, :, k]  # [batch_size, seq_len]
            expert_prob = top_k_probs[:, :, k:k+1]  # [batch_size, seq_len, 1]

            # 对每个专家计算
            for expert_id in range(self.num_experts):
                # 找到使用该expert_id的样本
                mask = (expert_idx == expert_id)

                if mask.any():  # any()任一为True则返回True
                    # 获取使用该专家的输入
                    expert_input = x[mask]

                    # 通过专家网络
                    expert_output = self.experts[expert_id](expert_input)

                    # 加权累加到输出
                    output[mask] += expert_output * expert_prob[mask]

        return output

# 使用示例
# moe_layer = MoELayer(input_dim=768, hidden_dim=2048, output_dim=768, num_experts=8, top_k=2)
# x = torch.randn(32, 128, 768)
# output = moe_layer(x)

2.2 多头潜在注意力( MLA )

MLA ( Multi-Head Latent Attention )是 DeepSeek-V3 的核心创新之一,通过低秩键值压缩显著减少 KV Cache 的显存占用。

Python
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

class MultiHeadLatentAttention(nn.Module):
    """
    多头潜在注意力(MLA)

    MLA 通过低秩键值压缩减少 KV Cache 的显存占用
    """
    def __init__(self, embed_dim, num_heads, latent_dim=64):
        super().__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads
        self.latent_dim = latent_dim

        # Q 的投影
        self.q_proj = nn.Linear(embed_dim, embed_dim)

        # KV 的低秩压缩(下投影共享,上投影分开)
        self.kv_down_proj = nn.Linear(embed_dim, latent_dim)
        self.k_up_proj = nn.Linear(latent_dim, embed_dim)  # K 的上投影
        self.v_up_proj = nn.Linear(latent_dim, embed_dim)  # V 的上投影

        # 输出投影
        self.out_proj = nn.Linear(embed_dim, embed_dim)

    def forward(self, x, mask=None):
        """
        前向传播

        Args:
            x: 输入张量 [batch_size, seq_len, embed_dim]
            mask: 注意力掩码 [batch_size, seq_len]
        """
        batch_size, seq_len, _ = x.shape

        # 计算 Q
        Q = self.q_proj(x).view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)  # 重塑张量形状

        # 计算 KV 的低秩表示(共享下投影,分开上投影)
        # MLA低秩压缩:先下投影到latent_dim节省KV Cache,再上投影恢复原始维度
        kv_compressed = self.kv_down_proj(x)  # [batch_size, seq_len, latent_dim]
        k = self.k_up_proj(kv_compressed)  # [batch_size, seq_len, embed_dim]
        v = self.v_up_proj(kv_compressed)  # [batch_size, seq_len, embed_dim]

        # view将(B,S,d_model)拆为(B,S,heads,d_k),transpose(1,2)得(B,heads,S,d_k)多头并行格式
        K = k.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
        V = v.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)

        # 计算注意力分数
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.head_dim)

        # 应用掩码
        if mask is not None:
            scores = scores.masked_fill(mask.unsqueeze(1).unsqueeze(1) == 0, float('-inf'))  # unsqueeze增加一个维度

        # 计算注意力权重
        attn_weights = F.softmax(scores, dim=-1)

        # 计算输出
        output = torch.matmul(attn_weights, V)
        output = output.transpose(1, 2).contiguous().view(batch_size, seq_len, self.embed_dim)

        # 输出投影
        output = self.out_proj(output)

        return output

# 使用示例
# mla = MultiHeadLatentAttention(embed_dim=768, num_heads=12, latent_dim=64)
# x = torch.randn(32, 128, 768)
# output = mla(x)

2.3 旋转位置编码( RoPE )

Python
import torch
import torch.nn as nn
import math

class RotaryPositionalEmbedding(nn.Module):
    """
    旋转位置编码(RoPE)
    """
    def __init__(self, embed_dim, max_seq_len=8192):
        super().__init__()
        self.embed_dim = embed_dim
        self.max_seq_len = max_seq_len

        # 计算频率
        inv_freq = 1.0 / (10000 ** (torch.arange(0, embed_dim, 2).float() / embed_dim))
        self.register_buffer('inv_freq', inv_freq)

    def forward(self, x, seq_len=None):
        """
        计算旋转位置编码

        Args:
            x: 输入张量 [batch_size, seq_len, embed_dim]
            seq_len: 序列长度
        """
        if seq_len is None:
            seq_len = x.size(1)

        # 生成位置索引
        positions = torch.arange(seq_len, device=x.device).float()

        # 计算角度
        # RoPE角度计算:unsqueeze(-1)和unsqueeze(0)将1D向量广播相乘得(S,d/2)矩阵
        angles = positions.unsqueeze(-1) * self.inv_freq.unsqueeze(0)  # [seq_len, embed_dim/2]

        # 创建旋转矩阵
        # 两次unsqueeze(0)扩展为(1,1,S,d/2)以广播匹配(B,heads,S,d_k)的Q/K张量
        angles = angles.unsqueeze(0).unsqueeze(0)  # [1, 1, seq_len, embed_dim/2]
        cos_angles = torch.cos(angles)
        sin_angles = torch.sin(angles)

        return cos_angles, sin_angles

def apply_rotary_pos_emb(q, k, cos_angles, sin_angles):
    """
    应用旋转位置编码到Q和K

    Args:
        q: 查询张量 [batch_size, num_heads, seq_len, head_dim]
        k: 键张量 [batch_size, num_heads, seq_len, head_dim]
        cos_angles: 余弦角度 [1, 1, seq_len, head_dim/2]
        sin_angles: 正弦角度 [1, 1, seq_len, head_dim/2]
    """
    # 分离实部和虚部
    q_real, q_imag = q[..., ::2], q[..., 1::2]
    k_real, k_imag = k[..., ::2], k[..., 1::2]

    # 应用旋转
    q_rot_real = q_real * cos_angles - q_imag * sin_angles
    q_rot_imag = q_real * sin_angles + q_imag * cos_angles
    k_rot_real = k_real * cos_angles - k_imag * sin_angles
    k_rot_imag = k_real * sin_angles + k_imag * cos_angles

    # 合并
    q_rot = torch.stack([q_rot_real, q_rot_imag], dim=-1).flatten(-2)  # torch.stack沿新维度拼接张量
    k_rot = torch.stack([k_rot_real, k_rot_imag], dim=-1).flatten(-2)

    return q_rot, k_rot

# 使用示例
# rope = RotaryPositionalEmbedding(embed_dim=768)
# x = torch.randn(32, 128, 768)
# cos_angles, sin_angles = rope(x)

3. GRPO 强化学习训练

3.1 GRPO 算法原理

GRPO ( Group Relative Policy Optimization )是 DeepSeek R1 使用的强化学习算法,它是对传统 PPO ( Proximal Policy Optimization )的改进。

核心特点: - 无需 Value Model: GRPO 去掉了 PPO 中的价值模型,减少了计算开销 - Group Sampling:对每个问题采样多个输出,计算平均奖励作为基线 - 相对优势:使用相对于组内平均奖励的优势,而不是绝对优势

Python
import torch
import torch.nn as nn
import torch.nn.functional as F

class GRPOPolicy(nn.Module):
    """
    GRPO 策略网络
    """
    def __init__(self, config):
        super().__init__()
        self.config = config

        # 词嵌入
        self.embeddings = nn.Embedding(config.vocab_size, config.hidden_size)

        # Transformer层
        # 注意:PyTorch 2.1+ 中 TransformerEncoderLayer 的 dropout 和 activation 参数
        # 推荐通过 activation 参数传入 'gelu' 或使用自定义激活函数
        # 如需更灵活的配置,建议使用 nn.TransformerEncoderLayer.from_config() 方法
        self.layers = nn.ModuleList([
            nn.TransformerEncoderLayer(
                d_model=config.hidden_size,
                nhead=config.num_attention_heads,
                dim_feedforward=config.intermediate_size,
                dropout=0.1,  # PyTorch 2.0+ 仍支持此参数
                activation='gelu',
                batch_first=True  # PyTorch 1.12+ 推荐使用 batch_first=True
            )
            for _ in range(config.num_hidden_layers)
        ])

        # 最终层归一化
        self.ln_f = nn.LayerNorm(config.hidden_size)

        # 语言模型头
        self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)

    def forward(self, input_ids, attention_mask=None):
        """
        前向传播
        """
        batch_size, seq_len = input_ids.shape

        # 词嵌入
        hidden_states = self.embeddings(input_ids)

        # 通过Transformer层
        for layer in self.layers:
            hidden_states = layer(
                hidden_states.transpose(0, 1),
                src_key_padding_mask=~attention_mask.bool() if attention_mask is not None else None
            ).transpose(0, 1)

        # 最终层归一化
        hidden_states = self.ln_f(hidden_states)

        # 语言模型头
        logits = self.lm_head(hidden_states)

        return logits

class GRPOTrainer:
    """
    GRPO 训练器
    """
    def __init__(self, policy_model, reference_model, reward_model, config):
        self.policy_model = policy_model
        self.reference_model = reference_model
        self.reward_model = reward_model
        self.config = config

        self.optimizer = torch.optim.AdamW(
            policy_model.parameters(),
            lr=config.learning_rate,
            betas=(0.9, 0.999)
        )

    def compute_advantages(self, rewards):
        """
        计算优势(相对于组内平均奖励)

        Args:
            rewards: 奖励张量 [batch_size, group_size]
        """
        # 计算组内平均奖励
        mean_rewards = rewards.mean(dim=-1, keepdim=True)  # [batch_size, 1]

        # 计算优势(相对于平均)
        advantages = rewards - mean_rewards  # [batch_size, group_size]

        # 归一化优势
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        return advantages

    def compute_policy_loss(self, policy_logits, ref_logits, advantages, attention_mask):
        """
        计算策略损失

        Args:
            policy_logits: 策略模型的输出 [batch_size, seq_len, vocab_size]
            ref_logits: 参考模型的输出 [batch_size, seq_len, vocab_size]
            advantages: 优势 [batch_size, group_size]
            attention_mask: 注意力掩码 [batch_size, seq_len]
        """
        # 计算策略概率
        policy_log_probs = F.log_softmax(policy_logits, dim=-1)
        ref_log_probs = F.log_softmax(ref_logits, dim=-1)

        # 计算重要性比率
        ratio = torch.exp(policy_log_probs - ref_log_probs)

        # 应用 GRPO 裁剪
        clipped_ratio = torch.clamp(ratio, 1 - self.config.clip_ratio, 1 + self.config.clip_ratio)

        # 计算损失
        policy_loss = -torch.min(
            ratio * advantages.unsqueeze(-1).unsqueeze(-1),
            clipped_ratio * advantages.unsqueeze(-1).unsqueeze(-1)
        )

        # 应用掩码
        if attention_mask is not None:
            policy_loss = policy_loss * attention_mask.unsqueeze(-1)

        policy_loss = policy_loss.sum() / attention_mask.sum()

        return policy_loss

    def train_step(self, questions, group_size=16):
        """
        训练步骤

        Args:
            questions: 问题列表
            group_size: 每个问题的采样组大小
        """
        total_loss = 0
        num_batches = 0

        for question in questions:
            # 对每个问题采样多个输出
            outputs = []
            rewards = []

            for _ in range(group_size):
                # 从策略模型采样输出
                output = self.sample_output(question)
                outputs.append(output)

                # 计算奖励
                reward = self.reward_model(question, output)
                rewards.append(reward)

            # 转换为张量
            rewards = torch.tensor(rewards)

            # 计算优势
            advantages = self.compute_advantages(rewards)

            # 计算策略损失(简化:使用平均优势加权的负对数概率)
            # 实际 GRPO 会对每个 sample 计算 policy/ref log-prob 并裁剪
            avg_advantage = advantages.mean()
            policy_loss = -avg_advantage  # 简化的策略梯度

            # 反向传播
            self.optimizer.zero_grad()  # 清零梯度
            policy_loss.backward()  # 反向传播计算梯度
            self.optimizer.step()  # 更新参数

            total_loss += policy_loss.item()  # 将单元素张量转为Python数值
            num_batches += 1

        return total_loss / num_batches

    def sample_output(self, question):
        """
        从策略模型采样输出

        Args:
            question: 输入问题
        """
        # 这里简化实现,实际需要更复杂的采样逻辑
        input_ids = self.tokenize(question)
        attention_mask = torch.ones_like(input_ids)

        with torch.no_grad():  # 禁用梯度计算,节省内存
            logits = self.policy_model(input_ids, attention_mask)

        # 采样输出
        output_ids = torch.multinomial(F.softmax(logits[:, -1, :], dim=-1), num_samples=1)

        return output_ids

    def tokenize(self, text):
        """
        简化的分词函数
        """
        # 这里简化实现,实际需要使用真实的 tokenizer
        return torch.randint(0, 100000, (1, 128))

# 使用示例
# config = type('Config', (), {
#     'vocab_size': 100000,
#     'hidden_size': 768,
#     'num_hidden_layers': 12,
#     'num_attention_heads': 12,
#     'intermediate_size': 2048,
#     'learning_rate': 3e-6,
#     'clip_ratio': 10.0
# })()
#
# policy_model = GRPOPolicy(config)
# reference_model = GRPOPolicy(config)
# reward_model = lambda q, o: torch.randn(1)  # 简化的奖励函数
#
# trainer = GRPOTrainer(policy_model, reference_model, reward_model, config)
# loss = trainer.train_step(["What is 2+2?"], group_size=16)

3.2 推理能力的涌现

DeepSeek R1 的推理能力是通过 GRPO 强化学习训练自然涌现的,而不是通过硬编码的推理模块。

涌现过程: 1. 初始阶段:模型开始生成简单的推理步骤 2. 中间阶段:推理链逐渐变长,模型开始自我反思 3. 后期阶段:推理能力稳定,能够解决复杂问题

关键发现: - 模型自发地生成了思维链( Chain of Thought ) - 模型能够自我反思和修正错误 - 推理能力随着训练时间的增加而增强

Python
class RewardModel:
    """
    奖励模型(基于规则的奖励)
    """
    def __init__(self):
        self.rules = {
            'format': self.check_format,
            'correctness': self.check_correctness,
            'consistency': self.check_consistency
        }

    def __call__(self, question, output):  # __call__使实例可像函数一样调用
        """
        计算总奖励
        """
        total_reward = 0

        for rule_name, rule_func in self.rules.items():
            reward = rule_func(question, output)
            total_reward += reward

        return total_reward

    def check_format(self, question, output):
        """
        检查输出格式
        """
        # 检查是否有思维链标记
        if '<think>' in output and '</think>' in output:
            return 1.0

        return 0.0

    def check_correctness(self, question, output):
        """
        检查答案正确性
        """
        # 这里简化实现,实际需要更复杂的逻辑
        # 例如,对于数学问题,可以检查最终答案是否正确
        return torch.randn(1).item()

    def check_consistency(self, question, output):
        """
        检查推理一致性
        """
        # 检查推理步骤是否合理
        # 这里简化实现
        return torch.randn(1).item()

# 使用示例
# reward_model = RewardModel()
# question = "What is 2+2?"
# output = "<think>Let me think about this...\n2+2=4</think>\nThe answer is 4."
# reward = reward_model(question, output)

4. 完整的 DeepSeek R1 模型

Python
import torch
import torch.nn as nn
from transformers import PreTrainedModel, PretrainedConfig

class DeepSeekR1Config(PretrainedConfig):
    """
    DeepSeek R1配置
    """
    def __init__(
        self,
        vocab_size=100000,
        max_position_embeddings=128000,
        hidden_size=768,
        num_hidden_layers=32,
        num_attention_heads=12,
        intermediate_size=2048,
        num_experts=256,
        top_k_experts=8,
        latent_dim=64,
        **kwargs  # *args接收任意位置参数,**kwargs接收任意关键字参数
    ):
        super().__init__(**kwargs)
        self.vocab_size = vocab_size
        self.max_position_embeddings = max_position_embeddings
        self.hidden_size = hidden_size
        self.num_hidden_layers = num_hidden_layers
        self.num_attention_heads = num_attention_heads
        self.intermediate_size = intermediate_size
        self.num_experts = num_experts
        self.top_k_experts = top_k_experts
        self.latent_dim = latent_dim

class DeepSeekR1Model(PreTrainedModel):
    """
    DeepSeek R1模型(基于DeepSeek-V3架构)

    注意:这是一个简化的实现,用于说明架构原理。
    实际的 DeepSeek R1 使用更复杂的实现和优化。
    """
    config_class = DeepSeekR1Config

    def __init__(self, config):
        super().__init__(config)
        self.config = config

        # 词嵌入
        self.embeddings = nn.Embedding(config.vocab_size, config.hidden_size)

        # 旋转位置编码
        self.rope = RotaryPositionalEmbedding(config.hidden_size, config.max_position_embeddings)

        # Transformer层
        self.layers = nn.ModuleList([
            DeepSeekR1Layer(config)
            for _ in range(config.num_hidden_layers)
        ])

        # 最终层归一化
        self.ln_f = nn.LayerNorm(config.hidden_size)

        # 语言模型头
        self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)

        # 初始化权重
        self.apply(self._init_weights)

    def _init_weights(self, module):
        """
        初始化权重
        """
        if isinstance(module, nn.Linear):  # isinstance检查类型
            nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            nn.init.normal_(module.weight, mean=0.0, std=0.02)
        elif isinstance(module, nn.LayerNorm):
            nn.init.ones_(module.weight)
            nn.init.zeros_(module.bias)

    def forward(self, input_ids, attention_mask=None):
        """
        前向传播
        """
        batch_size, seq_len = input_ids.shape

        # 词嵌入
        hidden_states = self.embeddings(input_ids)

        # 位置编码
        cos_angles, sin_angles = self.rope(hidden_states, seq_len)

        # 通过Transformer层
        all_hidden_states = []
        for layer in self.layers:
            hidden_states = layer(hidden_states, cos_angles, sin_angles, attention_mask)
            all_hidden_states.append(hidden_states)

        # 最终层归一化
        hidden_states = self.ln_f(hidden_states)

        # 语言模型头
        logits = self.lm_head(hidden_states)

        return {
            "logits": logits,
            "hidden_states": all_hidden_states
        }

class DeepSeekR1Layer(nn.Module):
    """
    DeepSeek R1层
    """
    def __init__(self, config):
        super().__init__()
        self.config = config

        # 多头潜在注意力(MLA)
        self.attention = MultiHeadLatentAttention(
            config.hidden_size,
            config.num_attention_heads,
            config.latent_dim
        )

        # 前馈网络(MoE)
        self.moe = MoELayer(
            config.hidden_size,
            config.intermediate_size,
            config.hidden_size,
            config.num_experts,
            config.top_k_experts
        )

        # 层归一化
        self.norm1 = nn.LayerNorm(config.hidden_size)
        self.norm2 = nn.LayerNorm(config.hidden_size)

    def forward(self, x, cos_angles, sin_angles, attention_mask=None):
        """
        前向传播
        """
        # 注意力
        residual = x
        x = self.norm1(x)
        attn_output = self.attention(x, attention_mask)
        x = residual + attn_output

        # 前馈网络(MoE)
        residual = x
        x = self.norm2(x)
        moe_output = self.moe(x)
        x = residual + moe_output

        return x

# 使用示例
# config = DeepSeekR1Config(
#     vocab_size=100000,
#     hidden_size=768,
#     num_hidden_layers=12,
#     num_attention_heads=12
# )
# model = DeepSeekR1Model(config)
# input_ids = torch.randint(0, 100000, (32, 128))
# outputs = model(input_ids)

5. 练习题

基础练习

  1. 实现简单的 MoE 层
Python
# 练习: 实现一个简单的MoE层
class SimpleMoE(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_experts):
        # 你的代码
        pass

    def forward(self, x):
        # 你的代码
        pass
  1. 实现多头潜在注意力
Python
# 练习: 实现一个简单的MLA
class SimpleMLA(nn.Module):
    def __init__(self, embed_dim, num_heads, latent_dim):
        # 你的代码
        pass

    def forward(self, x):
        # 你的代码
        pass

进阶练习

  1. 实现 GRPO 训练器
Python
# 练习: 实现一个GRPO训练器
class GRPOTrainer:
    def __init__(self, policy_model, reference_model, reward_model):
        # 你的代码
        pass

    def compute_advantages(self, rewards):
        # 你的代码
        pass

    def train_step(self, questions, group_size):
        # 你的代码
        pass
  1. 实现奖励模型
Python
# 练习: 实现一个奖励模型
class RewardModel:
    def __init__(self):
        # 你的代码
        pass

    def __call__(self, question, output):
        # 你的代码
        pass

项目练习

  1. 创建完整的推理优化模型
  2. 实现 MoE 架构
  3. 添加 MLA 注意力
  4. 实现 GRPO 训练

6. 最佳实践

✅ 推荐做法

  1. 理解架构原理
  2. 深入理解 MoE 机制
  3. 掌握 MLA 注意力
  4. 理解 GRPO 算法

  5. 优化推理性能

  6. 使用批处理
  7. 优化注意力计算
  8. 减少内存访问

  9. 监控训练过程

  10. 追踪奖励变化
  11. 监控推理质量
  12. 评估模型性能

❌ 避免做法

  1. 盲目使用复杂架构
  2. 根据任务选择架构
  3. 考虑计算资源
  4. 评估实际收益

  5. 忽略训练稳定性

  6. 监控梯度
  7. 调整学习率
  8. 使用合适的裁剪

  9. 缺乏实验验证

  10. 在多个任务上测试
  11. 对比不同算法
  12. 记录实验结果

7. 总结

本章介绍了 DeepSeek R1 的核心技术:

  • 混合专家( MoE ): 高效的专家网络
  • 多头潜在注意力( MLA ): 低秩键值压缩
  • 旋转位置编码: RoPE 位置编码
  • GRPO 强化学习: 推理能力自然涌现
  • 奖励模型: 基于规则的奖励

这些技术共同构成了 DeepSeek R1 的高效推理能力。

8. 下一步

继续学习08-推理优化技术,深入了解推理优化的具体技术。