大模型应用与产品化¶
⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。
模型部署架构¶
1.1 部署模式对比¶
Text Only
大模型部署模式
═══════════════════════════════════════════════════════════════════
1. 本地部署 (On-Premise)
├── 适用:数据隐私要求高、网络受限
├── 硬件:A100/H100 GPU服务器
├── 软件:vLLM, TensorRT-LLM, llama.cpp
└── 成本:高(硬件采购+运维)
2. 云端API (Cloud API)
├── 适用:快速启动、弹性需求
├── 提供商:OpenAI, Anthropic, Google, 阿里云
├── 模式:按token计费
└── 成本:按使用量,无前期投入
3. 混合部署 (Hybrid)
├── 适用:敏感数据本地处理,通用任务用API
├── 架构:小模型本地 + 大模型云端
└── 成本:平衡灵活性和隐私
4. 边缘部署 (Edge)
├── 适用:低延迟、离线场景
├── 硬件:手机、嵌入式设备
├── 技术:模型量化、蒸馏、MobileLLM
└── 成本:设备成本,无云端费用
═══════════════════════════════════════════════════════════════════
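混合部署的核心是一个路由层:敏感请求留在本地小模型,通用请求转发云端大模型。下面给出一个极简路由示意(敏感词规则、`local_model` 接口与云端模型名均为示例假设,实际应接入专门的分类器或 DLP 组件):
Python
# 混合部署路由示意(假设性示例):敏感请求走本地,其余走 OpenAI 兼容云端 API
import re

SENSITIVE_PATTERNS = [r"身份证", r"病历", r"工资"]  # 示例规则,生产环境应使用专门分类器

def is_sensitive(text: str) -> bool:
    return any(re.search(p, text) for p in SENSITIVE_PATTERNS)

def route(prompt: str, local_model, cloud_client) -> str:
    if is_sensitive(prompt):
        # 敏感数据不出域:本地小模型处理(local_model.generate 为假设接口)
        return local_model.generate(prompt)
    # 通用任务用云端大模型(OpenAI 兼容接口)
    resp = cloud_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
    )
    return resp.choices[0].message.content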
1.2 分布式推理架构¶
Python
# 分布式推理架构示例
class DistributedInference:
    """分布式大模型推理系统"""

    def __init__(self, model_path, world_size):
        self.world_size = world_size
        self.model = self._load_distributed_model(model_path)

    def _load_distributed_model(self, model_path):
        """加载分布式模型(张量并行 + 流水线并行)"""
        # 张量并行:将单层参数分割到多个GPU
        # 流水线并行:将不同层分配到不同GPU
        from accelerate import init_empty_weights, load_checkpoint_and_dispatch
        from transformers import AutoConfig, AutoModelForCausalLM

        config = AutoConfig.from_pretrained(model_path)
        with init_empty_weights():
            model = AutoModelForCausalLM.from_config(config)
        # 自动分配层到设备
        model = load_checkpoint_and_dispatch(
            model,
            model_path,
            device_map="auto",  # 自动决定层分配
            no_split_module_classes=["LlamaDecoderLayer"]
        )
        return model

    def tensor_parallel_forward(self, input_ids):
        """
        张量并行前向传播
        示例:8个GPU,每层attention分成8份
        """
        # 每个GPU计算部分注意力头
        # All-Reduce聚合结果
        pass

    def pipeline_parallel_forward(self, input_ids):
        """
        流水线并行前向传播
        示例:4个GPU,每个负责6层(共24层)
        """
        # GPU0处理层0-5,传递给GPU1
        # GPU1处理层6-11,传递给GPU2
        # ...
        pass
class ModelShardingStrategy:
    """模型分片策略选择"""

    @staticmethod  # @staticmethod无需实例即可调用
    def auto_device_map(model_config, gpu_memory):
        """
        自动生成设备映射(贪心按显存装箱)
        注:层间通信开销与负载均衡此处未建模
        """
        # 考虑因素:
        # 1. 每GPU显存限制
        # 2. 层间通信开销
        # 3. 负载均衡
        device_map = {}
        current_device = 0
        current_memory = 0
        for layer_name, layer_size in model_config.layers.items():
            if current_memory + layer_size > gpu_memory:
                current_device += 1
                current_memory = 0
            device_map[layer_name] = current_device
            current_memory += layer_size
        return device_map
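auto_device_map 的用法示意如下(用 SimpleNamespace 模拟一个带 layers 字典的配置对象,层名与大小均为假设值,单位 GB):
Python
# 用法示意(假设性示例)
from types import SimpleNamespace

config = SimpleNamespace(layers={
    "embed": 2.0, "layer.0": 1.5, "layer.1": 1.5, "lm_head": 2.0  # 单位:GB
})
print(ModelShardingStrategy.auto_device_map(config, gpu_memory=4.0))
# 输出:{'embed': 0, 'layer.0': 0, 'layer.1': 1, 'lm_head': 1}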
1.3 服务架构设计¶
Text Only
LLM服务架构
═══════════════════════════════════════════════════════════════════
┌─────────────────────────────────────────────────────────────────┐
│                           负载均衡器                            │
│                        (Nginx / AWS ALB)                        │
└────────────────────────────────┬────────────────────────────────┘
                                 │
           ┌─────────────────────┼─────────────────────┐
           ▼                     ▼                     ▼
   ┌───────────────┐     ┌───────────────┐     ┌───────────────┐
   │  推理实例 1   │     │  推理实例 2   │     │  推理实例 N   │
   │    (vLLM)     │     │    (vLLM)     │     │    (vLLM)     │
   │   GPU: A100   │     │   GPU: A100   │     │   GPU: A100   │
   └───────┬───────┘     └───────┬───────┘     └───────┬───────┘
           │                     │                     │
           └─────────────────────┼─────────────────────┘
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                             缓存层                              │
│   ┌─────────────┐     ┌─────────────┐     ┌─────────────┐       │
│   │  提示缓存   │     │   KV缓存    │     │  结果缓存   │       │
│   │   (Redis)   │     │  (GPU内存)  │     │   (Redis)   │       │
│   └─────────────┘     └─────────────┘     └─────────────┘       │
└────────────────────────────────┬────────────────────────────────┘
                                 │
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                            队列系统                             │
│                       (Redis / RabbitMQ)                        │
│   - 请求排队                                                    │
│   - 优先级调度                                                  │
│   - 流式响应                                                    │
└─────────────────────────────────────────────────────────────────┘
═══════════════════════════════════════════════════════════════════
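队列系统中的"请求排队 + 优先级调度"可以用标准库 asyncio.PriorityQueue 写出一个最小示意(优先级约定与字段均为示例假设,数字越小优先级越高):
Python
# 优先级请求队列的最小示意(假设性示例)
import asyncio
import itertools

queue = asyncio.PriorityQueue()
seq = itertools.count()  # 同优先级按到达顺序出队,并避免 dict 之间比较报错

async def submit(priority: int, payload: dict):
    await queue.put((priority, next(seq), payload))

async def worker(infer):
    while True:
        priority, _, payload = await queue.get()
        try:
            await infer(payload)  # 调用推理后端(假设为协程)
        finally:
            queue.task_done()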
📌 交叉引用:推理优化的系统性讲解(含 KV-Cache 原理、批处理优化、编译优化及面试要点)请参考 LLM 应用/12-推理优化.md,本节侧重推理优化在产品化部署中的实践。
推理优化技术¶
2.1 量化 (Quantization)¶
Python
import torch
from transformers import AutoModelForCausalLM, BitsAndBytesConfig


class QuantizationTechniques:
    """模型量化技术实现"""

    @staticmethod
    def int8_quantization(model_path: str):
        """
        INT8量化:将FP16/FP32权重压缩到8位
        减少50%内存,轻微精度损失

        Args:
            model_path: HuggingFace模型名称或本地路径
        """
        quantization_config = BitsAndBytesConfig(
            load_in_8bit=True,
            llm_int8_threshold=6.0,
            llm_int8_has_fp16_weight=False
        )
        model = AutoModelForCausalLM.from_pretrained(
            model_path,
            quantization_config=quantization_config,
            device_map="auto"
        )
        return model

    @staticmethod
    def int4_quantization(model_path: str):
        """
        INT4量化:更激进的压缩
        减少75%内存,适合消费级GPU

        Args:
            model_path: HuggingFace模型名称或本地路径
        """
        quantization_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=torch.float16,
            bnb_4bit_use_double_quant=True,  # 嵌套量化
            bnb_4bit_quant_type="nf4"        # Normal Float 4
        )
        model = AutoModelForCausalLM.from_pretrained(
            model_path,
            quantization_config=quantization_config,
            device_map="auto"
        )
        return model

    @staticmethod
    def awq_quantization(model_path: str):
        """
        AWQ (Activation-aware Weight Quantization)
        保护对激活影响大的权重,精度更好

        Args:
            model_path: 已量化的AWQ模型名称或本地路径
        """
        from awq import AutoAWQForCausalLM
        model = AutoAWQForCausalLM.from_quantized(
            model_path,
            quant_config={"zero_point": True, "q_group_size": 128}
        )
        return model

    @staticmethod
    def gptq_quantization(model_path: str, calibration_data):
        """
        GPTQ:逐层量化,使用Hessian矩阵信息
        需要校准数据,但精度损失最小

        Args:
            model_path: HuggingFace模型名称或本地路径
            calibration_data: 用于GPTQ量化的校准数据
        """
        from auto_gptq import AutoGPTQForCausalLM, BaseQuantizeConfig
        quantize_config = BaseQuantizeConfig(
            bits=4,
            group_size=128,
            desc_act=True  # 激活重排
        )
        model = AutoGPTQForCausalLM.from_pretrained(
            model_path,
            quantize_config=quantize_config
        )
        # 量化需要校准数据
        model.quantize(calibration_data)
        return model
# 量化效果对比
"""
量化方法对比
═══════════════════════════════════════════════════════════════════
方法          精度     内存节省    速度提升    适用场景
─────────────────────────────────────────────────────────────────
FP16          100%     基准        基准        训练、高精度推理
INT8          ~99%     50%         10-20%      通用推理
INT4 (bnb)    ~95%     75%         20-30%      消费级GPU
AWQ           ~97%     75%         20-30%      精度敏感场景
GPTQ          ~96%     75%         20-30%      离线量化部署
GGUF          ~95%     75%         -           CPU推理
═══════════════════════════════════════════════════════════════════
"""
2.2 推理引擎对比¶
Text Only
推理引擎对比
═══════════════════════════════════════════════════════════════════
vLLM
├── 核心技术:PagedAttention(分页注意力)
├── 特点:高吞吐、低延迟、连续批处理
├── 支持:HuggingFace模型、OpenAI兼容API
├── 适用:生产环境、高并发服务
└── 性能:比HuggingFace Transformers高10-20倍
TensorRT-LLM (NVIDIA)
├── 核心技术:图优化、内核融合、量化
├── 特点:极致性能、NVIDIA GPU优化
├── 支持:FP8/INT8/INT4量化
├── 适用:NVIDIA数据中心GPU
└── 性能:业界领先,但构建复杂
llama.cpp
├── 核心技术:GGUF格式、CPU优化、量化
├── 特点:跨平台、无GPU也能跑
├── 支持:ARM/x86、Mac/Windows/Linux
├── 适用:边缘设备、本地部署
└── 性能:单CPU线程慢,但支持多线程
TGI (HuggingFace)
├── 核心技术:Rust实现、Safetensors
├── 特点:生产就绪、多GPU支持
├── 支持:FlashAttention、PagedAttention
├── 适用:企业部署
└── 性能:接近vLLM
DeepSpeed-Inference
├── 核心技术:ZeRO、内核注入
├── 特点:超大模型支持、张量并行
├── 支持:多GPU、多节点
├── 适用:研究、超大模型
└── 性能:扩展性好
═══════════════════════════════════════════════════════════════════
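以 vLLM 为例,离线批量推理的最小用法如下(模型名仅为示例,需要本地可访问的权重):
Python
# vLLM 最小离线推理示意(模型名为示例)
from vllm import LLM, SamplingParams

llm = LLM(model="Qwen/Qwen2.5-7B-Instruct", gpu_memory_utilization=0.9)
params = SamplingParams(temperature=0.7, top_p=0.9, max_tokens=256)
outputs = llm.generate(["介绍一下PagedAttention的思想。"], params)
print(outputs[0].outputs[0].text)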
2.3 批处理与调度优化¶
Python
class ContinuousBatching:
    """连续批处理:动态组合不同请求的生成步骤"""

    def __init__(self, model, max_batch_size=16):
        self.model = model
        self.max_batch_size = max_batch_size
        self.waiting_queue = []
        self.running_batch = []

    def schedule(self):
        """
        调度策略:
        1. 尽可能填满batch
        2. 优先处理快完成的序列(释放slot)
        3. 考虑优先级和超时
        """
        # 尝试将等待中的请求加入运行batch
        while (len(self.running_batch) < self.max_batch_size and
               self.waiting_queue):
            request = self.waiting_queue.pop(0)
            # 检查是否能加入(KV缓存空间)
            if self._can_fit(request):
                self.running_batch.append(request)
            else:
                # 放回队首,等待下一轮调度,避免请求被静默丢弃
                self.waiting_queue.insert(0, request)
                break
        # 执行一步生成
        if self.running_batch:
            self._step()

    def _step(self):
        """执行一步生成,所有序列并行处理"""
        # 准备输入:不同长度的序列需要padding
        input_ids = []
        position_ids = []
        for request in self.running_batch:
            input_ids.append(request.get_next_input())
            position_ids.append(request.current_position)
        # 批处理前向传播
        outputs = self.model.batch_forward(
            input_ids,
            position_ids,
            attention_mask=self._create_attention_mask()
        )
        # 更新每个请求的状态;先收集已完成的,避免边遍历边删除
        finished = []
        for request, output in zip(self.running_batch, outputs):  # zip并行遍历请求与输出
            request.update(output)
            if request.is_finished():
                finished.append(request)
        for request in finished:
            self._finish_request(request)
            self.running_batch.remove(request)
import random


class SpeculativeDecoding:
    """投机解码:用小模型草稿+大模型验证,加速生成"""

    def __init__(self, draft_model, target_model, gamma=5):
        self.draft_model = draft_model    # 小模型(如7B)
        self.target_model = target_model  # 大模型(如70B)
        self.gamma = gamma                # 每次草稿生成token数

    def generate(self, prompt, max_tokens):
        """
        投机解码流程:
        1. 小模型快速生成gamma个token(草稿)
        2. 大模型并行验证所有草稿token
        3. 接受匹配的token,从第一个不匹配处重新生成
        """
        tokens = prompt.copy()
        while len(tokens) < max_tokens:
            prefix = tokens.copy()  # 本轮验证的固定前缀,避免与追加操作混淆
            # 步骤1:草稿模型生成
            draft_tokens = self._draft_generate(prefix, self.gamma)
            # 步骤2:目标模型并行验证(一次前向得到所有位置的logits)
            logits = self.target_model.get_logits(prefix + draft_tokens)
            # 步骤3:逐个接受/拒绝
            accepted = 0
            for i, draft_token in enumerate(draft_tokens):
                # 计算两个模型在该位置给草稿token的概率
                draft_prob = self._get_token_prob(
                    self.draft_model, prefix + draft_tokens[:i], draft_token)
                target_prob = self._get_token_prob(
                    self.target_model, prefix + draft_tokens[:i], draft_token)
                # 接受概率 = min(1, target_prob / draft_prob)
                accept_prob = min(1.0, target_prob / draft_prob)
                if random.random() < accept_prob:
                    tokens.append(draft_token)
                    accepted += 1
                else:
                    # 拒绝:从目标分布采样新token,结束本轮
                    new_token = self._sample_from_target(
                        logits[len(prefix) + i - 1])
                    tokens.append(new_token)
                    break
            if accepted == len(draft_tokens):
                # 全部接受,用最后一个位置的logits额外采样一个
                tokens.append(self._sample_from_target(logits[-1]))  # [-1]负索引取最后一个元素
        return tokens

    def _draft_generate(self, tokens, num_tokens):
        """草稿模型快速生成"""
        draft = []
        current = tokens.copy()
        for _ in range(num_tokens):
            next_token = self.draft_model.generate_next(current)
            draft.append(next_token)
            current.append(next_token)
        return draft
模型服务化¶
3.1 API 服务封装¶
Python
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import List, Optional, AsyncGenerator
import asyncio  # Python标准异步库
import json
import time
import uuid

from vllm import LLM, SamplingParams

app = FastAPI(title="LLM Inference Service")


class ChatCompletionRequest(BaseModel):  # Pydantic BaseModel:自动数据验证和序列化
    model: str
    messages: List[dict]
    temperature: Optional[float] = 0.7  # Optional表示值可以为None
    max_tokens: Optional[int] = 512
    stream: Optional[bool] = False
    top_p: Optional[float] = 1.0


class ChatCompletionResponse(BaseModel):
    id: str
    object: str
    created: int
    model: str
    choices: List[dict]
    usage: dict


# 全局模型实例
model = None


@app.on_event("startup")
async def load_model():  # async def定义协程函数
    """启动时加载模型"""
    global model
    model = LLM(
        model="meta-llama/Llama-2-7b-chat-hf",
        tensor_parallel_size=1,
        gpu_memory_utilization=0.9
    )
    print("Model loaded successfully")


@app.post("/v1/chat/completions")
async def chat_completion(request: ChatCompletionRequest):
    """OpenAI兼容的Chat Completion API"""
    try:  # try/except捕获异常,防止程序崩溃
        # 格式化消息为prompt
        prompt = format_messages(request.messages)
        # 采样参数
        sampling_params = SamplingParams(
            temperature=request.temperature,
            top_p=request.top_p,
            max_tokens=request.max_tokens
        )
        if request.stream:
            # 流式响应
            return StreamingResponse(
                stream_generate(prompt, sampling_params),
                media_type="text/event-stream"
            )
        # 非流式响应
        outputs = model.generate(prompt, sampling_params)
        return ChatCompletionResponse(
            id=f"chatcmpl-{uuid.uuid4()}",
            object="chat.completion",
            created=int(time.time()),
            model=request.model,
            choices=[{
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": outputs[0].outputs[0].text
                },
                "finish_reason": "stop"
            }],
            usage={
                "prompt_tokens": len(outputs[0].prompt_token_ids),
                "completion_tokens": len(outputs[0].outputs[0].token_ids),
                "total_tokens": len(outputs[0].prompt_token_ids) +
                                len(outputs[0].outputs[0].token_ids)
            }
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


async def stream_generate(prompt: str, sampling_params) -> AsyncGenerator[str, None]:
    """
    流式生成,模拟OpenAI的SSE格式
    注:vLLM的离线LLM接口不直接支持流式;生产中应使用
    vllm.AsyncLLMEngine(或其自带的OpenAI兼容server),此处仅为示意
    """
    stream = model.generate(prompt, sampling_params, stream=True)  # 示意接口
    for output in stream:
        delta_text = output.outputs[0].text
        chunk = {
            "id": f"chatcmpl-{uuid.uuid4()}",
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": "llama-2-7b",
            "choices": [{
                "index": 0,
                "delta": {"content": delta_text},
                "finish_reason": None
            }]
        }
        yield f"data: {json.dumps(chunk)}\n\n"  # yield产出值,函数变为生成器
    yield "data: [DONE]\n\n"


def format_messages(messages: List[dict]) -> str:
    """将OpenAI格式的消息转换为Llama-2 chat模板的prompt"""
    formatted = ""
    for msg in messages:
        role = msg['role']
        content = msg['content']
        if role == 'system':
            formatted += f"[INST] <<SYS>>\n{content}\n<</SYS>>\n\n"
        elif role == 'user':
            formatted += f"{content} [/INST]"
        elif role == 'assistant':
            formatted += f" {content} </s><s>[INST] "
    return formatted
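服务启动后,可以直接用官方 openai SDK 以 OpenAI 兼容方式调用(base_url 指向本地服务,api_key 为占位,示意如下):
Python
# 用官方 openai SDK 调用上面的自建服务
from openai import OpenAI

client = OpenAI(base_url="http://localhost:8000/v1", api_key="EMPTY")
resp = client.chat.completions.create(
    model="llama-2-7b",
    messages=[{"role": "user", "content": "你好,介绍一下你自己"}],
)
print(resp.choices[0].message.content)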
3.2 缓存策略¶
Python
from typing import List, Optional


class LLMCache:
    """多级缓存系统"""

    def __init__(self, redis_host='localhost'):
        # L1: GPU内存中的KV缓存(由推理引擎管理)
        # L2: Redis缓存(提示模板、常见查询)
        import redis
        self.redis = redis.Redis(host=redis_host, port=6379, db=0)
        # L3: 本地内存缓存(热点数据)
        self.local_cache = {}
        # 语义缓存依赖的向量库,假定由外部注入
        self.vector_db = None

    def get_cached_response(self, prompt_hash: str) -> Optional[str]:  # Optional表示值可以为None
        """获取缓存的响应"""
        # 先查本地缓存
        if prompt_hash in self.local_cache:
            return self.local_cache[prompt_hash]
        # 再查Redis
        cached = self.redis.get(f"llm:response:{prompt_hash}")
        if cached:
            response = cached.decode('utf-8')
            # 回填本地缓存
            self.local_cache[prompt_hash] = response
            return response
        return None

    def cache_response(self, prompt_hash: str, response: str, ttl=3600):
        """缓存响应"""
        # 写入Redis(带过期时间)
        self.redis.setex(f"llm:response:{prompt_hash}", ttl, response)
        # 更新本地缓存
        self.local_cache[prompt_hash] = response

    def semantic_cache(self, prompt: str, embedding_model) -> Optional[str]:
        """语义缓存:基于嵌入相似度查找相似查询"""
        # 计算当前prompt的嵌入
        current_embedding = embedding_model.encode(prompt)
        # 在向量数据库中搜索相似prompt
        similar_prompts = self.vector_db.search(
            current_embedding,
            top_k=1,
            threshold=0.95  # 相似度阈值
        )
        if similar_prompts:
            # 返回相似prompt的缓存结果
            return similar_prompts[0]['response']
        return None


class PrefixCache:
    """
    前缀缓存:复用共享前缀的KV缓存

    应用场景:
    - 多轮对话的系统prompt
    - RAG的固定上下文
    - Few-shot示例
    """

    def __init__(self):
        self.prefix_cache = {}  # prefix_hash -> KV cache

    def get_prefix_cache(self, prefix_tokens: List[int]):
        """获取前缀的KV缓存"""
        prefix_hash = hash(tuple(prefix_tokens))
        return self.prefix_cache.get(prefix_hash)

    def store_prefix_cache(self, prefix_tokens: List[int], kv_cache):
        """存储前缀的KV缓存"""
        prefix_hash = hash(tuple(prefix_tokens))
        self.prefix_cache[prefix_hash] = kv_cache
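注意 prompt_hash 应使用跨进程稳定的哈希(如 SHA-256),Python 内建 hash 受随机种子影响、不适合做缓存键。LLMCache 的用法示意如下(其中 llm.generate 为任意生成接口,示例假设):
Python
# LLMCache 用法示意(llm 为任意生成接口,假设性示例)
import hashlib

cache = LLMCache(redis_host="localhost")
prompt = "什么是KV缓存?"
key = hashlib.sha256(prompt.encode("utf-8")).hexdigest()

answer = cache.get_cached_response(key)
if answer is None:
    answer = llm.generate(prompt)  # 未命中:实际调用模型
    cache.cache_response(key, answer, ttl=3600)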
应用开发模式¶
4.1 RAG 应用架构¶
Python
import json
from typing import List


class RAGApplication:
    """完整的RAG应用实现"""

    def __init__(self):
        # 文档处理
        self.document_processor = DocumentProcessor()
        # 向量数据库
        self.vector_store = ChromaVectorStore()
        # 重排序器
        self.reranker = CrossEncoderReranker()
        # LLM
        self.llm = LLMClient()

    def ingest_documents(self, documents: List[str]):
        """文档入库流程"""
        for doc in documents:
            # 1. 分块
            chunks = self.document_processor.chunk(doc)
            # 2. 生成嵌入
            embeddings = self.document_processor.embed(chunks)
            # 3. 存入向量库
            self.vector_store.add(chunks, embeddings)

    def query(self, question: str) -> dict:
        """RAG查询流程"""
        # 1. 查询重写(可选)
        rewritten_query = self._rewrite_query(question)
        # 2. 检索(向量库相似度搜索)
        query_embedding = self.document_processor.embed([rewritten_query])[0]
        retrieved_docs = self.vector_store.search(query_embedding, top_k=20)
        # 3. 重排序
        reranked_docs = self.reranker.rerank(question, retrieved_docs, top_k=5)
        # 4. 构建prompt
        context = "\n".join([doc['content'] for doc in reranked_docs])
        prompt = f"""基于以下上下文回答问题:

{context}

问题:{question}

回答:"""
        # 5. 生成回答
        answer = self.llm.generate(prompt)
        return {
            'answer': answer,
            'sources': reranked_docs,
            'context': context
        }

    def _rewrite_query(self, question: str) -> str:
        """查询重写占位:可接LLM改写,这里直接返回原问题"""
        return question


class AgentApplication:
    """Agent应用架构"""

    def __init__(self):
        self.llm = LLMClient()
        self.tools = {
            'search': SearchTool(),
            'calculator': CalculatorTool(),
            'code_executor': CodeExecutorTool()
        }
        self.memory = ConversationMemory()

    def run(self, user_input: str) -> str:
        """Agent执行循环"""
        self.memory.add_user_message(user_input)
        max_iterations = 10
        for _ in range(max_iterations):
            # 规划下一步
            action = self._plan_next_action()
            if action['type'] == 'respond':
                # 直接回答
                response = self.llm.generate(self.memory.get_context())
                self.memory.add_assistant_message(response)
                return response
            elif action['type'] == 'tool_use':
                # 使用工具
                tool_name = action['tool']
                tool_input = action['input']
                # 执行工具
                observation = self.tools[tool_name].execute(tool_input)
                # 记录观察结果
                self.memory.add_observation(tool_name, observation)
        # 达到迭代上限仍未回答,返回兜底提示
        return "抱歉,本次任务超出最大迭代次数,请换个方式提问。"

    def _plan_next_action(self) -> dict:
        """决定下一步行动"""
        prompt = f"""基于对话历史,决定下一步:

{self.memory.get_context()}

可用工具:{list(self.tools.keys())}

决定:
- 如果需要使用工具,返回 {{"type": "tool_use", "tool": "工具名", "input": "输入"}}
- 如果可以回答,返回 {{"type": "respond"}}
"""
        response = self.llm.generate(prompt)
        return json.loads(response)  # json.loads将JSON字符串→Python对象
4.2 前端集成模式¶
JavaScript
// React组件:流式聊天界面
import React, { useState } from 'react';

function ChatInterface() {
  const [messages, setMessages] = useState([]);
  const [input, setInput] = useState('');
  const [isStreaming, setIsStreaming] = useState(false);

  const sendMessage = async () => {
    if (!input.trim()) return;

    const userMessage = { role: 'user', content: input };
    setMessages(prev => [...prev, userMessage]);
    setInput('');
    setIsStreaming(true);

    // 发起流式请求,按SSE格式逐块读取响应体
    const response = await fetch('/v1/chat/completions', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        model: 'llama-2-7b',
        messages: [...messages, userMessage],
        stream: true
      })
    });

    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let assistantMessage = { role: 'assistant', content: '' };
    setMessages(prev => [...prev, assistantMessage]);

    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      const chunk = decoder.decode(value);
      const lines = chunk.split('\n');

      for (const line of lines) {
        if (line.startsWith('data: ')) {
          const data = line.slice(6);
          if (data === '[DONE]') {
            setIsStreaming(false);
            return;
          }
          try {
            const parsed = JSON.parse(data);
            const content = parsed.choices[0].delta.content || '';
            assistantMessage.content += content;
            // 用新内容替换最后一条消息,触发重新渲染
            setMessages(prev => [
              ...prev.slice(0, -1),
              { ...assistantMessage }
            ]);
          } catch (e) {
            console.error('Parse error:', e);
          }
        }
      }
    }
    // 流提前结束(未收到[DONE])也要复位状态
    setIsStreaming(false);
  };

  return (
    <div className="chat-container">
      <div className="messages">
        {messages.map((msg, idx) => (
          <div key={idx} className={`message ${msg.role}`}>
            {msg.content}
          </div>
        ))}
        {isStreaming && <div className="typing-indicator">...</div>}
      </div>
      <div className="input-area">
        <input
          value={input}
          onChange={(e) => setInput(e.target.value)}
          onKeyDown={(e) => e.key === 'Enter' && sendMessage()}
          placeholder="输入消息..."
          disabled={isStreaming}
        />
        <button onClick={sendMessage} disabled={isStreaming}>
          发送
        </button>
      </div>
    </div>
  );
}
产品化实践¶
5.1 监控与可观测性¶
Python
class LLMMonitoring:
    """LLM应用监控"""

    def __init__(self):
        self.metrics = {
            'request_count': 0,
            'latency_p50': [],
            'latency_p99': [],
            'token_throughput': 0,
            'error_rate': 0
        }

    def log_request(self, request_data, response_data, latency):
        """记录请求指标"""
        # 延迟
        self.metrics['latency_p50'].append(latency)
        # Token统计
        input_tokens = response_data['usage']['prompt_tokens']
        output_tokens = response_data['usage']['completion_tokens']
        # 成本估算
        cost = self._estimate_cost(input_tokens, output_tokens)
        # 发送到监控系统(如Prometheus)
        self._send_to_prometheus({
            'latency': latency,
            'input_tokens': input_tokens,
            'output_tokens': output_tokens,
            'cost': cost
        })

    def log_quality(self, prompt, response, user_feedback):
        """记录质量指标"""
        # 用户满意度
        satisfaction = user_feedback.get('rating', 0)
        # 自动质量评估
        perplexity = self._compute_perplexity(response)
        diversity = self._compute_diversity(response)
        # 安全检测
        safety_score = self._safety_check(response)
        self._send_to_logging({
            'prompt': prompt,
            'response': response,
            'satisfaction': satisfaction,
            'perplexity': perplexity,
            'diversity': diversity,
            'safety_score': safety_score
        })


class LLMTracing:
    """LLM调用链路追踪"""

    def __init__(self, vector_store, reranker, llm):
        # 依赖组件由外部注入
        self.vector_store = vector_store
        self.reranker = reranker
        self.llm = llm
        from opentelemetry import trace
        self.tracer = trace.get_tracer(__name__)

    def trace_rag_pipeline(self, query):
        """追踪RAG完整链路"""
        with self.tracer.start_as_current_span("rag_query") as span:
            span.set_attribute("query", query)
            # 检索阶段
            with self.tracer.start_span("retrieval") as retrieval_span:
                docs = self.vector_store.search(query)
                retrieval_span.set_attribute("num_docs", len(docs))
                retrieval_span.set_attribute("retrieval_latency", 0.1)  # 示例值,实际应实测
            # 重排序阶段
            with self.tracer.start_span("reranking") as rerank_span:
                reranked = self.reranker.rerank(query, docs)
                rerank_span.set_attribute("rerank_latency", 0.05)  # 示例值
            # 生成阶段
            with self.tracer.start_span("generation") as gen_span:
                response = self.llm.generate(query, reranked)
                gen_span.set_attribute("output_tokens", len(response))
                gen_span.set_attribute("generation_latency", 2.0)  # 示例值
            return response
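_send_to_prometheus 可以基于官方 prometheus_client 库实现,下面是一个极简示意(指标名与端口均为示例假设):
Python
# _send_to_prometheus 的一种极简实现示意(指标名为示例)
from prometheus_client import Counter, Histogram, start_http_server

REQUEST_LATENCY = Histogram("llm_request_latency_seconds", "端到端请求延迟")
TOKENS_TOTAL = Counter("llm_tokens_total", "累计token数", ["direction"])

start_http_server(9100)  # 暴露 /metrics 供 Prometheus 抓取

def send_to_prometheus(data: dict):
    REQUEST_LATENCY.observe(data["latency"])
    TOKENS_TOTAL.labels(direction="input").inc(data["input_tokens"])
    TOKENS_TOTAL.labels(direction="output").inc(data["output_tokens"])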
5.2 A/B 测试与实验¶
Python
import hashlib


class LLMExperiment:
    """LLM实验框架"""

    def __init__(self):
        self.variants = {
            'control': ModelVariant('gpt-4o-mini'),
            'treatment': ModelVariant('gpt-4o')
        }
        self.traffic_split = 0.5  # 50/50分流

    def route_request(self, user_id: str, request: dict) -> str:
        """根据用户ID决定使用哪个变体(保证一致性)"""
        # 用稳定哈希分桶:内建hash跨进程不一致,不适合做分流
        bucket = int(hashlib.md5(user_id.encode()).hexdigest(), 16) % 100
        if bucket < self.traffic_split * 100:
            variant = 'control'
        else:
            variant = 'treatment'
        # 记录实验数据
        self._log_exposure(user_id, variant)
        return self.variants[variant].process(request)

    def evaluate(self):
        """评估实验结果"""
        results = {}
        for variant_name, variant in self.variants.items():
            metrics = variant.get_metrics()
            results[variant_name] = {
                'sample_size': metrics['count'],
                'mean_latency': metrics['latency_mean'],
                'user_satisfaction': metrics['satisfaction_mean'],
                'cost_per_request': metrics['cost_mean']
            }
        # 统计显著性检验(双样本t检验)
        from scipy import stats
        control = self.variants['control'].get_metric_values('satisfaction')
        treatment = self.variants['treatment'].get_metric_values('satisfaction')
        t_stat, p_value = stats.ttest_ind(control, treatment)
        return {
            'results': results,
            'statistical_significance': p_value < 0.05,
            'p_value': p_value
        }
成本与性能权衡¶
6.1 成本模型¶
Text Only
LLM成本分析
═══════════════════════════════════════════════════════════════════
云端API成本(每1M tokens,2025年参考价格,以官方价目页为准)
─────────────────────────────────────────────────────────────────
模型                 输入价格    输出价格    备注
GPT-4o               $2.5        $10         高质量多模态
GPT-4o-mini          $0.15       $0.6        性价比之选(替代gpt-3.5-turbo)
o3-mini              $1.1        $4.4        推理模型
Claude Sonnet 4      $3          $15         强推理能力
Claude 3.5 Haiku     $0.8        $4          快速响应
Gemini 2.0 Flash     $0.1        $0.4        超高性价比

自建成本(月度估算)
─────────────────────────────────────────────────────────────────
配置                 硬件成本    运营成本    适用场景
1x A100 80GB         $2,000      $500        开发测试
8x A100 80GB         $16,000     $2,000      小规模生产
8x H100 80GB         $32,000     $4,000      大规模服务(新建集群多选H100)

成本优化策略:
1. 缓存:减少重复请求的API调用
2. 模型路由:简单任务用小模型
3. 批量处理:提高吞吐降低单位成本
4. 量化:降低推理硬件要求
═══════════════════════════════════════════════════════════════════
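"自建 vs API"可以用收支平衡点粗算辅助决策:沿用上表数字(8x A100 按 3 年摊销),月输出 token 量超过约 244M 时自建开始划算。以下为极简估算脚本(忽略输入 token、利用率与人力成本,仅为示意):
Python
# 粗略收支平衡估算示意:自建月成本 vs API按量成本(数字沿用上表)
API_COST_PER_1M_OUT = 10.0  # GPT-4o 输出价,$/1M tokens

# 8x A100:硬件按3年(36个月)摊销 + 月运营成本 ≈ $2444/月
SELF_HOST_MONTHLY = 16000 / 36 + 2000

breakeven_m = SELF_HOST_MONTHLY / API_COST_PER_1M_OUT
print(f"月输出量超过约 {breakeven_m:.0f}M tokens 时,自建开始划算")
# ≈ 244M tokens/月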
6.2 性能优化决策树¶
Text Only
性能优化决策
═══════════════════════════════════════════════════════════════════
延迟太高?
│
├── 首token延迟高(TTFT)
│   ├── 模型太大加载慢
│   │   └──▶ 模型量化 / 使用更小模型
│   └── 批处理不足
│       └──▶ 启用continuous batching
│
├── 生成速度慢(TPOT)
│   ├── 内存带宽瓶颈
│   │   └──▶ 量化 / 投机解码
│   └── 计算瓶颈
│       └──▶ 张量并行 / 使用更快GPU
│
└── 端到端延迟高
    ├── 网络延迟
    │   └──▶ 边缘部署 / CDN
    └── 队列延迟
        └──▶ 扩容 / 负载均衡优化

吞吐量不足?
│
├── GPU利用率低
│   └──▶ 增大batch size / continuous batching
│
└── GPU已满载
    └──▶ 水平扩容 / 模型并行
═══════════════════════════════════════════════════════════════════
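按上面的决策树排查前,先实测 TTFT 与 TPOT。下面是针对流式 OpenAI 兼容接口的一个测量示意(base_url、模型名为示例假设,以 chunk 数近似 token 数):
Python
# 测量 TTFT / TPOT 的示意脚本(流式接口)
import time
from openai import OpenAI

client = OpenAI(base_url="http://localhost:8000/v1", api_key="EMPTY")

start = time.perf_counter()
first_token_at = None
n_tokens = 0
stream = client.chat.completions.create(
    model="llama-2-7b",
    messages=[{"role": "user", "content": "写一段200字的介绍"}],
    stream=True,
)
for chunk in stream:
    if chunk.choices and chunk.choices[0].delta.content:
        if first_token_at is None:
            first_token_at = time.perf_counter()  # 首个内容chunk到达
        n_tokens += 1  # 以chunk数近似token数

ttft = first_token_at - start
tpot = (time.perf_counter() - first_token_at) / max(n_tokens - 1, 1)
print(f"TTFT={ttft*1000:.0f}ms, TPOT={tpot*1000:.1f}ms/token")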
下一步¶
完成本章节学习后,你已经系统掌握了大模型从原理到应用的全链路知识。
建议的实践方向:

1. 动手部署:使用 vLLM 部署一个开源模型
2. 构建应用:开发一个 RAG 或 Agent 应用
3. 性能优化:尝试量化和推理优化技术
4. 持续学习:关注最新论文和开源项目
推荐资源:

- vLLM Documentation
- HuggingFace Inference API
- LangChain Production Guide
恭喜你完成了 LLM 系统学习的全部内容!
最后更新日期: 2026-02-12
适用版本: LLM 学习教程 v2026