📖 第 6 章:文本生成¶
学习时间: 8 小时 难度星级:⭐⭐⭐⭐ 前置知识: RNN/LSTM 、注意力机制、 Transformer 基础 学习目标:掌握从 N-gram 到 GPT 的文本生成方法,理解解码策略
📋 目录¶
- 1. 文本生成概述
- 2. N-gram 语言模型生成
- 3. RNN/LSTM 文本生成
- 4. Seq2Seq 模型
- 5. 注意力机制
- 6. 解码策略
- 7. GPT 生成原理
- 8. 实战:古诗生成器
- 9. 关键复盘点
- 10. 练习题
1. 文本生成概述¶
1.1 文本生成的分类¶
# Taxonomy of text-generation tasks: free-form continuation, conditional
# generation (translation / summarization / QA), and attribute-controlled
# generation, each with representative models.
text_generation_types = {
"自由生成": {
"描述": "给定开头,续写后续内容",
"例子": "输入'春天来了' → '春天来了,万物复苏,百花齐放'",
"模型": "GPT、LSTM语言模型",
},
"条件生成": {
"描述": "根据条件生成文本",
"例子": "机器翻译、摘要生成、问答",
"模型": "Seq2Seq、T5、BART",
},
"受控生成": {
"描述": "控制生成文本的属性",
"例子": "控制情感、风格、长度等",
"模型": "CTRL、Plug-and-Play",
},
}
# ASCII diagram of the generation pipeline; the branches under step 4 list
# the decoding strategies covered later in this chapter.
generation_pipeline = """
文本生成Pipeline:
1. 输入处理 → 2. 编码器(可选) → 3. 解码器 → 4. 解码策略 → 5. 输出文本
│
├── 贪心搜索
├── Beam Search
├── Top-K采样
└── Top-P/Nucleus采样
"""
print(generation_pipeline)
1.2 自回归 vs 非自回归¶
自回归(Autoregressive)生成:
y₁ → y₂ → y₃ → ... → yₙ
每一步依赖之前所有输出
优点:质量高 缺点:速度慢(串行)
代表:GPT、传统Seq2Seq
非自回归(Non-autoregressive)生成:
y₁ y₂ y₃ ... yₙ(并行生成)
所有位置同时输出
优点:速度快 缺点:质量较低
代表:NAT、Mask-Predict
2. N-gram 语言模型生成¶
2.1 语言模型基础¶
N-gram 近似:
import random
from collections import defaultdict, Counter
class NGramGenerator:
    """Character-level N-gram language-model text generator.

    Counts (n-1)-character contexts -> next-character frequencies during
    training, then samples from the resulting conditional distributions.
    """

    def __init__(self, n=3):
        self.n = n
        # context tuple (length n-1) -> Counter of next-token frequencies
        self.ngram_counts = defaultdict(Counter)
        self.vocab = set()

    def train(self, texts):
        """Count n-grams from an iterable of strings.

        Each text is padded with (n-1) <BOS> markers and one <EOS> marker so
        that sequence starts and ends are modeled explicitly.
        """
        for text in texts:
            words = ['<BOS>'] * (self.n - 1) + list(text) + ['<EOS>']
            self.vocab.update(text)
            for i in range(len(words) - self.n + 1):
                context = tuple(words[i:i + self.n - 1])
                target = words[i + self.n - 1]
                self.ngram_counts[context][target] += 1

    def _get_next_word_probs(self, context):
        """Return {token: probability} for the given context tuple."""
        # Use .get() so probing an unseen context does not insert an empty
        # Counter into the defaultdict (the original grew without bound).
        counts = self.ngram_counts.get(context)
        if not counts:
            return {}
        total = sum(counts.values())
        return {w: c / total for w, c in counts.items()}

    def generate(self, max_len=50, temperature=1.0, start_text=""):
        """Sample up to ``max_len`` characters, optionally continuing ``start_text``.

        Bug fix: the context is always left-padded with <BOS> so it has
        exactly n-1 tokens.  Previously a ``start_text`` shorter than n-1
        produced a too-short context tuple that never matched any trained
        context, so generation stopped immediately.
        """
        if start_text:
            # Left-pad so the context always has length n-1.
            context = (['<BOS>'] * (self.n - 1) + list(start_text))[-(self.n - 1):]
            result = list(start_text)
        else:
            context = ['<BOS>'] * (self.n - 1)
            result = []
        for _ in range(max_len):
            ctx_tuple = tuple(context[-(self.n - 1):])
            probs = self._get_next_word_probs(ctx_tuple)
            # Stop at an unseen context, or stochastically at <EOS>.
            if not probs or ('<EOS>' in probs and random.random() < probs['<EOS>']):
                break
            # Never emit the <EOS> marker itself.
            probs.pop('<EOS>', None)
            if not probs:
                break
            # Temperature sampling: rescale log-probabilities by 1/T.
            words = list(probs.keys())
            weights = list(probs.values())
            if temperature != 1.0:
                import math  # local import keeps this block self-contained
                weights = [math.exp(math.log(w + 1e-10) / temperature) for w in weights]
                total = sum(weights)
                weights = [w / total for w in weights]
            next_word = random.choices(words, weights=weights, k=1)[0]
            result.append(next_word)
            context.append(next_word)
        return ''.join(result)
# Fit a tri-gram model on a handful of classical poems and sample from it.
poems = [
    "春眠不觉晓处处闻啼鸟",
    "白日依山尽黄河入海流",
    "床前明月光疑是地上霜",
    "举头望明月低头思故乡",
    "锄禾日当午汗滴禾下土",
    "谁知盘中餐粒粒皆辛苦",
    "红豆生南国春来发几枝",
    "愿君多采撷此物最相思",
    "春风又绿江南岸明月何时照我还",
    "两岸猿声啼不住轻舟已过万重山",
]
gen = NGramGenerator(n=3)
gen.train(poems)
print("N-gram生成示例:")
for sample_no in range(1, 6):
    sample = gen.generate(max_len=20, temperature=0.8, start_text="春")
    print(f" {sample_no}. {sample}")
3. RNN/LSTM 文本生成¶
3.1 字符级 LSTM 文本生成¶
import torch
import torch.nn as nn
import numpy as np
class CharLSTMGenerator(nn.Module):
    """Character-level LSTM language model with autoregressive sampling."""

    def __init__(self, vocab_size, embedding_dim=64, hidden_dim=256,
                 num_layers=2, dropout=0.3):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # Inter-layer dropout is only defined for stacked LSTMs.
        self.lstm = nn.LSTM(
            embedding_dim, hidden_dim,
            num_layers=num_layers,
            dropout=dropout if num_layers > 1 else 0,
            batch_first=True,
        )
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_dim, vocab_size)

    def forward(self, x, hidden=None):
        """Map token ids (batch, seq_len) to logits (batch, seq_len, vocab)."""
        emb = self.embedding(x)
        out, hidden = self.lstm(emb, hidden)
        return self.fc(self.dropout(out)), hidden

    def generate(self, start_char_idx, char2idx, idx2char,
                 max_len=100, temperature=1.0, device='cpu'):
        """Autoregressively sample up to ``max_len`` characters from the model.

        Raises ValueError for a non-positive temperature and KeyError when
        the start index is not in the vocabulary.
        """
        if temperature <= 0:
            raise ValueError("temperature 必须大于 0")
        if start_char_idx not in idx2char:
            raise KeyError("start_char_idx 不在词表中")
        self.eval()
        current = start_char_idx
        pieces = [idx2char[current]]
        state = None
        with torch.no_grad():  # inference only: no gradients needed
            for _ in range(max_len):
                step_in = torch.LongTensor([[current]]).to(device)
                step_logits, state = self(step_in, state)
                # Temperature-scaled categorical sampling over the last step.
                dist = torch.softmax(step_logits[0, -1] / temperature, dim=0)
                current = torch.multinomial(dist, 1).item()
                token = idx2char[current]
                if token == '<EOS>':
                    break
                pieces.append(token)
        return ''.join(pieces)
# Toy corpus: two classical poems, modeled at the character level.
corpus = "春眠不觉晓处处闻啼鸟夜来风雨声花落知多少白日依山尽黄河入海流欲穷千里目更上一层楼"
chars = sorted(set(corpus))
char2idx = {'<PAD>': 0, '<BOS>': 1, '<EOS>': 2}
char2idx.update({c: i + 3 for i, c in enumerate(chars)})
idx2char = {i: c for c, i in char2idx.items()}
vocab_size = len(char2idx)

def create_sequences(text, seq_len=10):
    """Build (input, target) windows where the target is shifted one char right."""
    ids = [char2idx[ch] for ch in text]
    starts = range(len(ids) - seq_len)
    xs = [ids[s:s + seq_len] for s in starts]
    ys = [ids[s + 1:s + seq_len + 1] for s in starts]
    return torch.LongTensor(xs), torch.LongTensor(ys)

X, Y = create_sequences(corpus, seq_len=5)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
if X.numel() == 0 or Y.numel() == 0:
    raise ValueError("语料过短,无法构造训练序列")
X, Y = X.to(device), Y.to(device)

# Minimal runnable training loop; a real project would also need a validation
# set, checkpoints, and a much larger corpus.
model = CharLSTMGenerator(vocab_size=vocab_size, embedding_dim=32, hidden_dim=128, num_layers=2)
model = model.to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss()
model.train()
for epoch in range(1, 101):
    logits, _ = model(X)
    # CrossEntropyLoss expects flat (N, C) logits against (N,) targets.
    loss = criterion(logits.view(-1, vocab_size), Y.view(-1))
    optimizer.zero_grad()
    loss.backward()
    nn.utils.clip_grad_norm_(model.parameters(), 5.0)  # guard against exploding gradients
    optimizer.step()
    if epoch % 25 == 0:
        print(f"Epoch {epoch}, Loss: {loss.item():.4f}")

# Sample at several temperatures to show the determinism/diversity trade-off.
print("\nLSTM生成文本:")
start_idx = char2idx.get('春', 1)
for temp in [0.5, 0.8, 1.0, 1.5]:
    text = model.generate(start_idx, char2idx, idx2char, max_len=20, temperature=temp, device=device)
    print(f" 温度={temp}: {text}")
4. Seq2Seq 模型¶
4.1 Encoder-Decoder 架构¶
Seq2Seq架构 (以翻译为例):
Encoder Decoder
┌───┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐
│RNN│→│RNN│→│RNN│→│RNN│───c───→│RNN│→│RNN│→│RNN│
└───┘ └───┘ └───┘ └───┘ └───┘ └───┘ └───┘
↑ ↑ ↑ ↑ context ↑ ↑ ↑
我 爱 中 国 vector I love China
↓ ↓ ↓
I love China
4.2 PyTorch 实现 Seq2Seq¶
class Encoder(nn.Module):
    """GRU encoder: token ids -> (per-step hidden states, final hidden state)."""

    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers=1, dropout=0.3):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.rnn = nn.GRU(embedding_dim, hidden_dim, num_layers=num_layers,
                          batch_first=True, dropout=dropout if num_layers > 1 else 0)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """x: (batch, src_len) -> outputs (batch, src_len, hidden), hidden (layers, batch, hidden)."""
        emb = self.dropout(self.embedding(x))
        return self.rnn(emb)
class Decoder(nn.Module):
    """GRU decoder: token ids plus previous state -> vocabulary logits."""

    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers=1, dropout=0.3):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.rnn = nn.GRU(embedding_dim, hidden_dim, num_layers=num_layers,
                          batch_first=True, dropout=dropout if num_layers > 1 else 0)
        self.fc = nn.Linear(hidden_dim, vocab_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, hidden):
        """x: (batch, trg_len) ids; hidden: previous GRU state. Returns (logits, new state)."""
        emb = self.dropout(self.embedding(x))
        out, hidden = self.rnn(emb, hidden)
        return self.fc(out), hidden
class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper with scheduled teacher forcing."""

    def __init__(self, encoder, decoder, device='cpu'):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """Return logits of shape (batch, trg_len, trg_vocab).

        Position 0 of the output is left as zeros: decoding starts from the
        <BOS> token trg[:, 0] and fills positions 1..trg_len-1.
        """
        batch, steps = trg.size(0), trg.size(1)
        vocab = self.decoder.fc.out_features
        outputs = torch.zeros(batch, steps, vocab).to(self.device)
        # Encode the source; only the final hidden state seeds the decoder.
        _, hidden = self.encoder(src)
        dec_input = trg[:, 0:1]  # <BOS>
        for t in range(1, steps):
            step_logits, hidden = self.decoder(dec_input, hidden)
            outputs[:, t:t+1, :] = step_logits
            # Teacher forcing: feed the gold token with the given probability,
            # otherwise feed the model's own greedy prediction.
            use_truth = random.random() < teacher_forcing_ratio
            dec_input = trg[:, t:t+1] if use_truth else step_logits.argmax(dim=-1)
        return outputs
# Assemble a small Seq2Seq model and report its parameter count.
src_vocab_size = 100
trg_vocab_size = 100
embed_dim, hidden_dim = 64, 128
encoder = Encoder(src_vocab_size, embed_dim, hidden_dim)
decoder = Decoder(trg_vocab_size, embed_dim, hidden_dim)
seq2seq = Seq2Seq(encoder, decoder)
param_count = sum(p.numel() for p in seq2seq.parameters())
print(f"Seq2Seq模型参数量: {param_count:,}")
5. 注意力机制¶
5.1 注意力机制在生成中的作用¶
问题:Encoder将整个输入压缩到一个固定长度的context vector,信息损失大。
解决:Attention让Decoder每一步都能"查看"Encoder的所有输出。
Encoder输出
h₁ h₂ h₃ h₄
│ │ │ │
├───┼───┼───┤
│ Attention │ ← 计算每个位置的权重
├───┼───┼───┤
│ │ │ │
α₁ α₂ α₃ α₄ 权重
│ │ │ │
└───┴───┴───┘
│
context ← 加权求和
│
Decoder sₜ
5.2 实现 Attention¶
class BahdanauAttention(nn.Module):
    """Additive (Bahdanau) attention."""

    def __init__(self, hidden_dim):
        super().__init__()
        self.W_query = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.W_key = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.v = nn.Linear(hidden_dim, 1, bias=False)

    def forward(self, query, keys):
        """query: (batch, 1, hidden) decoder state; keys: (batch, src_len, hidden).

        Returns (context (batch, 1, hidden), weights (batch, src_len)).
        """
        # score(s_t, h_i) = v^T tanh(W_q s_t + W_k h_i), broadcast over src_len.
        energy = torch.tanh(self.W_query(query) + self.W_key(keys))
        scores = self.v(energy)  # (batch, src_len, 1)
        weights = torch.softmax(scores, dim=1)
        # Weighted sum of encoder outputs -> context vector.
        context = (weights * keys).sum(dim=1, keepdim=True)
        return context, weights.squeeze(-1)
class LuongAttention(nn.Module):
    """Multiplicative (Luong-style) attention with 'dot' and 'general' scores.

    Robustness fix: an unsupported ``method`` used to slip through
    ``__init__`` and only surface as a confusing NameError inside
    ``forward``; it now raises ValueError immediately.
    """

    SUPPORTED_METHODS = ('dot', 'general')

    def __init__(self, hidden_dim, method='dot'):
        super().__init__()
        if method not in self.SUPPORTED_METHODS:
            raise ValueError(f"未知的注意力方法: {method!r},可选: {self.SUPPORTED_METHODS}")
        self.method = method
        if method == 'general':
            self.W = nn.Linear(hidden_dim, hidden_dim, bias=False)

    def forward(self, query, keys):
        """
        query: (batch, 1, hidden_dim)
        keys: (batch, src_len, hidden_dim)

        Returns (context (batch, 1, hidden_dim), weights (batch, src_len)).
        """
        if self.method == 'dot':
            # score = q · k
            scores = torch.bmm(query, keys.transpose(1, 2))
        else:  # 'general': bilinear score with W applied to the query
            scores = torch.bmm(self.W(query), keys.transpose(1, 2))
        attention_weights = torch.softmax(scores, dim=-1)
        context = torch.bmm(attention_weights, keys)
        return context, attention_weights.squeeze(1)
# Quick smoke test of both attention flavors on random tensors.
batch_size, src_len, hidden_dim = 2, 10, 64
query = torch.randn(batch_size, 1, hidden_dim)
keys = torch.randn(batch_size, src_len, hidden_dim)

bahdanau = BahdanauAttention(hidden_dim)
context, weights = bahdanau(query, keys)
print("Bahdanau Attention:")
print(f" Context: {context.shape}")
print(f" Weights: {weights.shape}")
print(f" 权重和: {weights.sum(dim=1)}")

luong = LuongAttention(hidden_dim, method='dot')
context, weights = luong(query, keys)
print("\nLuong Attention:")
print(f" Context: {context.shape}")
print(f" 权重分布: {weights[0].data}")
6. 解码策略¶
6.1 各种解码策略对比¶
从概率角度看,解码是在近似求解 y* = argmax_y P(y | x),即在给定输入条件下寻找概率最高的输出序列:
但不同策略优化的目标并不完全一样:
- 贪心 / Beam Search 更接近“找高概率序列”;
- Top-k / Top-p 采样 是先截断候选集合,再按概率分布随机采样,更强调多样性;
- 开放式生成(对话、写作)通常没有唯一标准答案,所以“序列概率最高”不一定等于“用户最喜欢”。
import torch
import torch.nn.functional as F
class DecodingStrategies:
    """A collection of decoding strategies for text generation."""

    @staticmethod
    def greedy_decode(logits):
        """Greedy decoding: always pick the highest-probability token."""
        return logits.argmax(dim=-1)

    @staticmethod
    def temperature_sampling(logits, temperature=1.0):
        """Sample from softmax(logits / temperature)."""
        if temperature <= 0:
            raise ValueError("temperature 必须大于 0")
        scaled_logits = logits / temperature
        probs = F.softmax(scaled_logits, dim=-1)
        return torch.multinomial(probs, 1).squeeze(-1)

    @staticmethod
    def top_k_sampling(logits, k=10, temperature=1.0):
        """Top-K sampling: sample only from the k highest-probability tokens."""
        if temperature <= 0:
            raise ValueError("temperature 必须大于 0")
        vocab_size = logits.size(-1)
        if k < 1 or k > vocab_size:
            raise ValueError(f"k 必须位于 [1, {vocab_size}]")
        scaled_logits = logits / temperature
        top_k_values, top_k_indices = torch.topk(scaled_logits, k, dim=-1)
        top_k_probs = F.softmax(top_k_values, dim=-1)
        sampled_idx = torch.multinomial(top_k_probs, 1)
        # Map the position within the top-k subset back to a vocabulary id.
        return top_k_indices.gather(-1, sampled_idx).squeeze(-1)

    @staticmethod
    def top_p_sampling(logits, p=0.9, temperature=1.0):
        """Top-P (nucleus) sampling: sample from the smallest token set whose
        cumulative probability first reaches p."""
        if temperature <= 0:
            raise ValueError("temperature 必须大于 0")
        if not 0 < p <= 1:
            raise ValueError("p 必须位于 (0, 1] 区间")
        # Division already allocates a fresh tensor (the original's .clone()
        # was redundant), so in-place masking below cannot touch the caller's
        # logits.
        scaled_logits = logits / temperature
        sorted_logits, sorted_indices = torch.sort(scaled_logits, descending=True)
        sorted_probs = F.softmax(sorted_logits, dim=-1)
        cumulative_probs = torch.cumsum(sorted_probs, dim=-1)
        # Mark positions past the nucleus; shift right by one so the token
        # that crosses the threshold is itself kept.
        sorted_indices_to_remove = cumulative_probs > p
        sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
        sorted_indices_to_remove[..., 0] = False
        # Scatter the mask back into vocabulary order, then mask logits out.
        indices_to_remove = sorted_indices_to_remove.scatter(
            -1, sorted_indices, sorted_indices_to_remove
        )
        scaled_logits[indices_to_remove] = float('-inf')
        probs = F.softmax(scaled_logits, dim=-1)
        return torch.multinomial(probs, 1).squeeze(-1)

    @staticmethod
    def beam_search(model, start_token, beam_width=5, max_len=50, eos_token_id=2, length_penalty=0.7):
        """Teaching-oriented beam search with length normalization.

        Returns a ``(sequence, raw_log_prob)`` pair.  Bug fix: the original
        returned a 3-tuple when a finished hypothesis existed but a 2-tuple
        otherwise; both paths now return the same shape.
        """
        if beam_width < 1 or max_len < 1:
            raise ValueError("beam_width 和 max_len 都必须大于 0")

        def normalized_score(log_prob_sum, seq_len):
            # GNMT-style length penalty; alpha=0 degenerates to the raw
            # log-probability sum.
            return log_prob_sum / (((5 + seq_len) / 6) ** length_penalty)

        # Each live beam is (sequence, raw log-prob sum, recurrent state).
        beams = [(torch.LongTensor([[start_token]]), 0.0, None)]
        finished = []  # (sequence, raw score, normalized score)
        for _ in range(max_len):
            all_candidates = []
            for seq, score, hidden in beams:
                with torch.no_grad():
                    logits, new_hidden = model(seq[:, -1:], hidden)
                log_probs = F.log_softmax(logits[0, -1], dim=-1)
                top_k = torch.topk(log_probs, beam_width)
                for i in range(beam_width):
                    token = top_k.indices[i].item()
                    token_score = top_k.values[i].item()
                    new_seq = torch.cat([seq, torch.LongTensor([[token]])], dim=1)
                    new_score = score + token_score
                    norm_score = normalized_score(new_score, new_seq.size(1))
                    if token == eos_token_id:
                        finished.append((new_seq, new_score, norm_score))
                    else:
                        all_candidates.append((new_seq, new_score, norm_score, new_hidden))
            if not all_candidates:
                break
            # Keep the beam_width best candidates by normalized score.
            all_candidates.sort(key=lambda x: x[2], reverse=True)
            beams = [(seq, raw, hidden) for seq, raw, _, hidden in all_candidates[:beam_width]]
        if finished:
            finished.sort(key=lambda x: x[2], reverse=True)
            best_seq, best_raw, _ = finished[0]
            return best_seq, best_raw
        return max(
            ((seq, score, normalized_score(score, seq.size(1))) for seq, score, _ in beams),
            key=lambda x: x[2]
        )[:2]
# Compare decoding strategies on one random logits vector.
torch.manual_seed(42)
vocab_size = 100
logits = torch.randn(1, vocab_size)
ds = DecodingStrategies()
print("不同解码策略的结果:")
print(f" 贪心: token={ds.greedy_decode(logits).item()}")
for t in (0.5, 1.0, 1.5):
    print(f" 温度={t}: token={ds.temperature_sampling(logits, t).item()}")
print(f" Top-K(k=5): token={ds.top_k_sampling(logits, k=5).item()}")
print(f" Top-P(p=0.9): token={ds.top_p_sampling(logits, p=0.9).item()}")
print("\n策略对比:")
strategies_comparison = """
┌────────────┬──────────┬──────────┬──────────────────────┐
│ 策略 │ 多样性 │ 质量 │ 适用场景 │
├────────────┼──────────┼──────────┼──────────────────────┤
│ 贪心 │ 低 │ 中 │ 机器翻译、纠错 │
│ Beam Search │ 低-中 │ 高 │ 机器翻译 │
│ 温度采样 │ 可调 │ 可调 │ 创意写作 │
│ Top-K │ 中 │ 中-高 │ 对话系统 │
│ Top-P │ 中-高 │ 高 │ 对话/开放式生成常见 │
└────────────┴──────────┴──────────┴──────────────────────┘
"""
print(strategies_comparison)
说明:
top-k/top-p都是先过滤候选 token,再从剩余分布中采样;它们不是“排序算法”。beam search也不是永远优于采样,而是在翻译、摘要这类目标更明确的任务里常更稳定。
7. GPT 生成原理¶
7.1 GPT 架构¶
GPT (Generative Pre-trained Transformer):
Token Embedding + Position Embedding
│
▼
┌─────────────────────┐
│ Transformer Decoder │ × N layers
│ ┌─────────────────┐ │
│ │ Masked Self-Attn│ │ 只能看到左边的token
│ └─────────────────┘ │
│ ┌─────────────────┐ │
│ │ Feed-Forward │ │
│ └─────────────────┘ │
└─────────────────────┘
│
▼
Linear + Softmax
│
▼
下一个token的概率分布
7.2 简化版 GPT 实现¶
import torch
import torch.nn as nn
import math
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product self-attention.

    Robustness fix: ``d_model`` must be divisible by ``num_heads``,
    otherwise the head split silently drops dimensions; this is now
    validated up front.
    """

    def __init__(self, d_model, num_heads, dropout=0.1):
        super().__init__()
        if d_model % num_heads != 0:
            raise ValueError(f"d_model ({d_model}) 必须能被 num_heads ({num_heads}) 整除")
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """x: (batch, seq_len, d_model); mask broadcastable to
        (batch, heads, seq_len, seq_len), where 0 marks blocked positions."""
        batch_size, seq_len, _ = x.size()
        # Split d_model into num_heads × d_k: view to (B, S, H, d_k), then
        # transpose to (B, H, S, d_k) so all heads attend in parallel.
        Q = self.W_q(x).view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)
        K = self.W_k(x).view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)
        V = self.W_v(x).view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)
        # Scaled dot-product scores: (B, H, S, S).
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, float('-inf'))
        attention = torch.softmax(scores, dim=-1)
        attention = self.dropout(attention)
        context = torch.matmul(attention, V)
        # Merge heads back to (B, S, d_model).
        context = context.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)
        return self.W_o(context)
class TransformerBlock(nn.Module):
    """Pre-norm Transformer block: self-attention + feed-forward, both residual."""

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.attention = MultiHeadAttention(d_model, num_heads, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.ff = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.GELU(),
            nn.Linear(d_ff, d_model),
            nn.Dropout(dropout),
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Apply pre-norm attention and feed-forward sublayers with residuals."""
        x = x + self.dropout(self.attention(self.norm1(x), mask))
        x = x + self.ff(self.norm2(x))
        return x
class MiniGPT(nn.Module):
    """A minimal GPT: token + position embeddings, pre-norm Transformer
    blocks, and a weight-tied language-model head."""

    def __init__(self, vocab_size, d_model=256, num_heads=4,
                 num_layers=4, d_ff=512, max_len=512, dropout=0.1):
        super().__init__()
        self.max_len = max_len  # positional capacity; also the generation context window
        self.token_embedding = nn.Embedding(vocab_size, d_model)
        self.position_embedding = nn.Embedding(max_len, d_model)
        self.dropout = nn.Dropout(dropout)
        self.blocks = nn.ModuleList([
            TransformerBlock(d_model, num_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])
        self.norm = nn.LayerNorm(d_model)
        self.head = nn.Linear(d_model, vocab_size, bias=False)
        # Weight tying: the output projection shares the token embedding matrix.
        self.head.weight = self.token_embedding.weight

    def forward(self, x):
        """x: (batch, seq_len) token ids -> logits (batch, seq_len, vocab)."""
        batch_size, seq_len = x.size()
        # Causal mask: (1, 1, S, S) lower-triangular, broadcast over batch and
        # heads, so each position only attends to itself and earlier tokens.
        mask = torch.tril(torch.ones(seq_len, seq_len)).unsqueeze(0).unsqueeze(0)
        mask = mask.to(x.device)
        positions = torch.arange(seq_len, device=x.device).unsqueeze(0)
        x = self.token_embedding(x) + self.position_embedding(positions)
        x = self.dropout(x)
        for block in self.blocks:
            x = block(x, mask)
        x = self.norm(x)
        return self.head(x)

    @torch.no_grad()
    def generate(self, start_tokens, max_new_tokens=50, temperature=1.0, top_k=None):
        """Autoregressively sample ``max_new_tokens`` tokens.

        Fixes vs. the original:
        * temperature is validated (consistent with the other samplers in
          this chapter);
        * the input is cropped to the last ``max_len`` tokens before each
          forward pass — previously a context longer than ``max_len`` made
          the position embedding index out of range.
        """
        if temperature <= 0:
            raise ValueError("temperature 必须大于 0")
        self.eval()
        tokens = start_tokens.clone()
        for _ in range(max_new_tokens):
            context = tokens[:, -self.max_len:]  # crop to positional capacity
            logits = self(context)
            next_logits = logits[:, -1, :] / temperature
            if top_k is not None:
                v, _ = torch.topk(next_logits, min(top_k, next_logits.size(-1)))
                # Mask out everything below the k-th largest logit.
                next_logits[next_logits < v[:, [-1]]] = float('-inf')
            probs = torch.softmax(next_logits, dim=-1)
            next_token = torch.multinomial(probs, 1)
            tokens = torch.cat([tokens, next_token], dim=1)
        return tokens
# Instantiate a small MiniGPT and report its size and layout.
mini_gpt = MiniGPT(
    vocab_size=5000,
    d_model=128,
    num_heads=4,
    num_layers=4,
    d_ff=256,
)
total_params = sum(param.numel() for param in mini_gpt.parameters())
print(f"MiniGPT参数量: {total_params:,}")
print(f"模型结构: {mini_gpt}")
8. 实战:古诗生成器¶
"""
实战项目:基于LSTM的古诗生成器
"""
class PoemGenerator:
    """Rule-based acrostic poem generator over a tiny poem corpus."""

    def __init__(self):
        self.poems = [
            "白日依山尽,黄河入海流。欲穷千里目,更上一层楼。",
            "春眠不觉晓,处处闻啼鸟。夜来风雨声,花落知多少。",
            "床前明月光,疑是地上霜。举头望明月,低头思故乡。",
            "锄禾日当午,汗滴禾下土。谁知盘中餐,粒粒皆辛苦。",
            "红豆生南国,春来发几枝。愿君多采撷,此物最相思。",
        ]
        self._build_vocab()

    def _build_vocab(self):
        """Collect the character set and build char <-> index mappings."""
        self.chars = set()
        for poem in self.poems:
            self.chars.update(poem)
        self.char2idx = {'<PAD>': 0, '<BOS>': 1, '<EOS>': 2}
        for c in sorted(self.chars):
            self.char2idx[c] = len(self.char2idx)
        self.idx2char = {i: c for c, i in self.char2idx.items()}
        self.vocab_size = len(self.char2idx)

    def generate_acrostic(self, head_chars):
        """Generate a naive acrostic poem: each line starts with one head char.

        Fixes vs. the original: the ``random`` import and the candidate-pool
        construction are hoisted out of the loops instead of being repeated
        per line / per character.
        """
        # Candidate pool excludes punctuation; build it once, not per line.
        poem_chars = list(self.chars - {',', '。', '?', '!'})
        lines = []
        for i, head in enumerate(head_chars):
            # Pad each line to 5 characters with randomly chosen corpus chars.
            line = head + ''.join(random.choice(poem_chars) for _ in range(4))
            # Alternate comma / full stop, matching classic couplet punctuation.
            line += ',' if i % 2 == 0 else '。'
            lines.append(line)
        return '\n'.join(lines)
# Demo: build the generator and print an acrostic for "春夏秋冬".
pg = PoemGenerator()
print("藏头诗(AI生成):")
acrostic = pg.generate_acrostic("春夏秋冬")
print(acrostic)
9. 关键复盘点¶
🔑 学习复盘重点
考点 1 : Beam Search 的原理和优缺点¶
✅ 标准答案要点:
原理:
- 每步保留beam_width个最优候选路径
- 每步扩展所有候选,保留top-beam_width个
- 最终选择得分(长度归一化后)最高的路径
优点:
- 比贪心搜索质量更高
- 能探索多条路径
缺点:
- 生成文本缺乏多样性、趋于通用
- 不适合开放域对话
- 计算量随beam_width线性增加
考点 2 : Top-K 和 Top-P 采样的区别¶
✅ 标准答案要点:
Top-K:
- 固定从概率最高的K个token中采样
- 问题:K固定不灵活,概率分布尖锐时K太大,平坦时K太小
Top-P (Nucleus):
- 从累积概率达到p的最小集合中采样
- 自适应:概率集中时候选少,分散时候选多
- 在开放式生成里很常见,但具体是否启用以及 `top_p` 取值取决于系统目标
实际使用:通常Top-P + Temperature组合
考点 3 :如何解决文本生成中的重复问题¶
✅ 标准答案要点:
1. 重复惩罚(Repetition Penalty)
2. N-gram blocking(禁止重复n-gram)
3. Temperature调高增加多样性
4. Top-K/Top-P采样
5. 对比搜索(Contrastive Search)
6. 频率惩罚 + 存在惩罚(部分推理服务会提供)
10. 练习题¶
📝 基础题¶
- 解释自回归和非自回归生成的区别及各自的优缺点。
答案:自回归( AR ):逐 token 依次生成,每步以之前所有 token 为条件。优点——生成质量高,能建模 token 间依赖;缺点——串行解码速度慢。非自回归( NAR ):一次性并行生成所有 token 。优点——解码速度快(可快 10-100 倍);缺点——无法建模 token 间依赖,易出现重复或不连贯,质量通常低于 AR 。改进方法包括迭代式 NAR (多次 refinement )和半自回归(分组并行)。
- 比较贪心搜索、 Beam Search 和 Top-P 采样。
答案:贪心搜索:每步选概率最高的 token ,速度最快但容易重复、单调。Beam Search:维护 beam_size 个候选序列,每步扩展保留 Top-K ,比贪心更全局但倾向生成安全无趣的文本,多样性差。Top-P ( nucleus )采样:从累积概率达到 P 的最小 token 集合中随机采样,生成多样自然但可能不连贯。适用场景: Beam Search 适合翻译、摘要等忠实性任务; Top-P 适合对话、创意写作;贪心适合简单/确定性任务。
💻 编程题¶
- 实现一个完整的字符级 LSTM 古诗生成器。
- 实现 Beam Search 解码算法。
- 实现一个简化版的 GPT 模型并训练。
🔬 思考题¶
- 为什么大语言模型生成时通常使用 Top-P 而不是 Beam Search ?
答案:在很多面向用户的开放式生成系统里, Top-P 比 Beam Search 更常见,但这不是绝对规则。常见原因包括:①多样性需求:对话、写作等任务需要多样化回复,而 Beam Search 往往更容易收敛到高频、保守的表达;②开放式生成无唯一标准答案: Beam Search 更贴近“最大化序列概率”的目标,这在翻译等任务里更合适;③计算效率: Beam Search 需维护多候选,对长序列开销更大;④偏好对齐:在不少人工偏好研究和产品实践里,适度随机性更容易产生自然回复;⑤灵活可控: Top-P 可配合 temperature 调节确定性与创造性。若任务更强调忠实性、稳定性或可验证性, Beam Search 仍可能是更合适的选择。
✅ 自我检查清单¶
□ 我理解自回归生成的原理
□ 我能实现Seq2Seq模型
□ 我理解Attention机制在生成中的作用
□ 我能实现Top-K和Top-P采样
□ 我理解GPT的架构和生成过程
□ 我完成了古诗生成器实战
□ 我完成了至少3道练习题
📚 延伸阅读¶
- Attention Is All You Need
- GPT-2 论文: Language Models are Unsupervised Multitask Learners
- The Curious Case of Neural Text Degeneration — Top-P 采样原论文
- Hugging Face 文本生成教程
下一篇:07-机器翻译 — 从统计翻译到神经机器翻译
⚠️ 核验说明(2026-04-03):已再次人工复核字符级 LSTM 与解码策略示例,并把解码策略比较补上“任务目标不同、默认策略会变”的前提,避免把产品经验写成固定结论。
最后更新日期: 2026-04-03