
Large Model Applications and Productization

⚠️ Timeliness note: this chapter references frontier models, prices, and leaderboards, all of which can change quickly across versions; treat the original papers, official release pages, and API documentation as authoritative.

Table of Contents

  1. Model Deployment Architectures
  2. Inference Optimization Techniques
  3. Model Serving
  4. Application Development Patterns
  5. Productization Practices
  6. Cost/Performance Trade-offs

Model Deployment Architectures

1.1 Deployment Modes Compared

Text Only
Large Model Deployment Modes
═══════════════════════════════════════════════════════════════════

1. On-Premise
├── Fits: strict data privacy, restricted networks
├── Hardware: A100/H100 GPU servers
├── Software: vLLM, TensorRT-LLM, llama.cpp
└── Cost: high (hardware purchase + operations)

2. Cloud API
├── Fits: fast launch, elastic demand
├── Providers: OpenAI, Anthropic, Google, Alibaba Cloud
├── Pricing: pay per token
└── Cost: usage-based, no upfront investment

3. Hybrid
├── Fits: sensitive data handled locally, general tasks via API
├── Architecture: small model local + large model in the cloud
└── Cost: balances flexibility and privacy

4. Edge
├── Fits: low-latency and offline scenarios
├── Hardware: phones, embedded devices
├── Techniques: quantization, distillation, MobileLLM
└── Cost: device cost only, no cloud fees

═══════════════════════════════════════════════════════════════════
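The hybrid mode above can be sketched as a simple router: a cheap heuristic decides whether a request stays on the local small model or goes to the cloud API. This is a minimal sketch — the complexity heuristic, the 0.5 threshold, and the two backend callables are illustrative assumptions, not a fixed interface.

```python
# Hybrid routing sketch: easy prompts go to the local small model,
# everything else goes to the cloud API.

def estimate_complexity(prompt: str) -> float:
    """Crude complexity score: long prompts and code/proof markers score higher."""
    score = min(len(prompt) / 2000, 1.0)
    if any(marker in prompt for marker in ("```", "prove", "step by step")):
        score += 0.5
    return score

def route(prompt: str, local_generate, cloud_generate, threshold: float = 0.5) -> str:
    """Dispatch to the local backend for easy prompts, the cloud backend otherwise."""
    backend = local_generate if estimate_complexity(prompt) < threshold else cloud_generate
    return backend(prompt)

if __name__ == "__main__":
    local = lambda p: "[local] " + p[:20]
    cloud = lambda p: "[cloud] " + p[:20]
    print(route("hi", local, cloud))                  # short prompt, local model
    print(route("prove this " * 300, local, cloud))   # long/complex prompt, cloud API
```

In production the heuristic is often itself a tiny classifier, but the dispatch structure stays the same.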

1.2 Distributed Inference Architecture

Python
# Distributed inference architecture example

import torch
from transformers import AutoConfig, AutoModelForCausalLM
from accelerate import init_empty_weights, load_checkpoint_and_dispatch

class DistributedInference:
    """
    Distributed large-model inference system.
    """

    def __init__(self, model_path, world_size):
        self.world_size = world_size
        self.model = self._load_distributed_model(model_path)

    def _load_distributed_model(self, model_path):
        """
        Load a distributed model (tensor parallelism + pipeline parallelism).
        """
        # Tensor parallelism: split each layer's parameters across GPUs
        # Pipeline parallelism: assign different layers to different GPUs

        config = AutoConfig.from_pretrained(model_path)
        with init_empty_weights():
            model = AutoModelForCausalLM.from_config(config)

        # Automatically map layers to devices
        model = load_checkpoint_and_dispatch(
            model,
            model_path,
            device_map="auto",  # let accelerate decide the layer placement
            no_split_module_classes=["LlamaDecoderLayer"]
        )

        return model

    def tensor_parallel_forward(self, input_ids):
        """
        Tensor-parallel forward pass.

        Example: 8 GPUs, each layer's attention split 8 ways.
        """
        # Each GPU computes a subset of the attention heads
        # An all-reduce aggregates the results
        pass

    def pipeline_parallel_forward(self, input_ids):
        """
        Pipeline-parallel forward pass.

        Example: 4 GPUs, each owning 6 of 24 layers.
        """
        # GPU0 runs layers 0-5, hands off to GPU1
        # GPU1 runs layers 6-11, hands off to GPU2
        # ...
        pass

class ModelShardingStrategy:
    """
    Choosing a model sharding strategy.
    """

    @staticmethod
    def auto_device_map(model_config, gpu_memory):
        """
        Generate a device map with a greedy first-fit heuristic.
        """
        # Factors to consider:
        # 1. Per-GPU memory limit
        # 2. Inter-layer communication cost
        # 3. Load balancing

        device_map = {}
        current_device = 0
        current_memory = 0

        for layer_name, layer_size in model_config.layers.items():
            if current_memory + layer_size > gpu_memory:
                current_device += 1
                current_memory = 0

            device_map[layer_name] = current_device
            current_memory += layer_size

        return device_map

1.3 Service Architecture Design

Text Only
LLM Service Architecture
═══════════════════════════════════════════════════════════════════

┌─────────────────────────────────────────────────────────────────┐
│                        Load Balancer                            │
│                     (Nginx / AWS ALB)                           │
└─────────────────────────────────────────────────────────────────┘
        ┌─────────────────────┼─────────────────────┐
        ▼                     ▼                     ▼
┌───────────────┐    ┌───────────────┐    ┌───────────────┐
│  Inference 1  │    │  Inference 2  │    │  Inference N  │
│  (vLLM)       │    │  (vLLM)       │    │  (vLLM)       │
│  GPU: A100    │    │  GPU: A100    │    │  GPU: A100    │
└───────┬───────┘    └───────┬───────┘    └───────┬───────┘
        │                    │                    │
        └────────────────────┼────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│                         Cache Layer                             │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐          │
│  │ Prompt cache │  │   KV cache   │  │ Result cache │          │
│  │   (Redis)    │  │ (GPU memory) │  │   (Redis)    │          │
│  └──────────────┘  └──────────────┘  └──────────────┘          │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│                         Queue System                            │
│                    (Redis / RabbitMQ)                           │
│  - Request queuing                                              │
│  - Priority scheduling                                          │
│  - Streaming responses                                          │
└─────────────────────────────────────────────────────────────────┘

═══════════════════════════════════════════════════════════════════

📌 Cross-reference: for a systematic treatment of inference optimization (KV-cache internals, batching, compiler optimizations, and interview points), see LLM 应用/12-推理优化.md; this section focuses on how those optimizations are applied in productized deployments.

Inference Optimization Techniques

2.1 Quantization

Python
import torch
from transformers import AutoModelForCausalLM, BitsAndBytesConfig

class QuantizationTechniques:
    """
    Model quantization techniques.
    """

    @staticmethod
    def int8_quantization(model_path: str):
        """
        INT8 quantization: compress FP16/FP32 weights to 8 bits.
        Cuts memory ~50% with a small accuracy loss.

        Args:
            model_path: HuggingFace model name or local path
        """
        quantization_config = BitsAndBytesConfig(
            load_in_8bit=True,
            llm_int8_threshold=6.0,
            llm_int8_has_fp16_weight=False
        )

        model = AutoModelForCausalLM.from_pretrained(
            model_path,
            quantization_config=quantization_config,
            device_map="auto"
        )

        return model

    @staticmethod
    def int4_quantization(model_path: str):
        """
        INT4 quantization: more aggressive compression.
        Cuts memory ~75%; suitable for consumer GPUs.

        Args:
            model_path: HuggingFace model name or local path
        """
        quantization_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=torch.float16,
            bnb_4bit_use_double_quant=True,  # nested (double) quantization
            bnb_4bit_quant_type="nf4"  # NormalFloat4
        )

        model = AutoModelForCausalLM.from_pretrained(
            model_path,
            quantization_config=quantization_config,
            device_map="auto"
        )

        return model

    @staticmethod
    def awq_quantization(model_path: str):
        """
        AWQ (Activation-aware Weight Quantization):
        protects the weights that most affect activations, so accuracy holds up better.

        Args:
            model_path: HuggingFace model name or local path
        """
        from awq import AutoAWQForCausalLM

        model = AutoAWQForCausalLM.from_quantized(
            model_path,
            quant_config={"zero_point": True, "q_group_size": 128}
        )

        return model

    @staticmethod
    def gptq_quantization(model_path: str, calibration_data):
        """
        GPTQ: layer-by-layer quantization using Hessian information.
        Needs calibration data, but the accuracy loss is smallest.

        Args:
            model_path: HuggingFace model name or local path
            calibration_data: calibration samples for GPTQ
        """
        from auto_gptq import AutoGPTQForCausalLM

        model = AutoGPTQForCausalLM.from_pretrained(
            model_path,
            quantize_config={
                'bits': 4,
                'group_size': 128,
                'desc_act': True  # activation reordering
            }
        )

        # Quantization requires calibration data
        model.quantize(calibration_data)

        return model

# Comparing quantization methods
"""
Quantization Methods Compared
═══════════════════════════════════════════════════════════════════

Method      Accuracy  Memory saved  Speedup    Use case
─────────────────────────────────────────────────────────────────
FP16        100%      baseline      baseline   training, high-accuracy inference
INT8        ~99%      50%           10-20%     general inference
INT4 (bnb)  ~95%      75%           20-30%     consumer GPUs
AWQ         ~97%      75%           20-30%     accuracy-sensitive workloads
GPTQ        ~96%      75%           20-30%     offline quantized deployment
GGUF        ~95%      75%           -          CPU inference

═══════════════════════════════════════════════════════════════════
"""
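A quick way to pick a row from this table is to estimate the serving memory a quantization level implies. A minimal sketch, assuming weight memory ≈ parameter count × bytes per weight, plus an assumed ~20% overhead for activations and KV cache (a ballpark, not a specification):

```python
# Rough VRAM estimate per quantization level.
# 1B parameters at 1 byte/weight is ~1 GB of weights.

BYTES_PER_WEIGHT = {"fp16": 2.0, "int8": 1.0, "int4": 0.5}

def estimate_vram_gb(num_params_b: float, dtype: str, overhead: float = 0.2) -> float:
    """Approximate GB of VRAM for `num_params_b` billion parameters."""
    weights_gb = num_params_b * BYTES_PER_WEIGHT[dtype]
    return round(weights_gb * (1 + overhead), 1)  # add activation/KV-cache headroom

for dtype in ("fp16", "int8", "int4"):
    print(f"7B model @ {dtype}: ~{estimate_vram_gb(7, dtype)} GB")
```

This is why a 7B model fits on a 24 GB consumer GPU at FP16 but needs INT4 to fit on an 8 GB card.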

2.2 Inference Engines Compared

Text Only
Inference Engines Compared
═══════════════════════════════════════════════════════════════════

vLLM
├── Core technique: PagedAttention
├── Traits: high throughput, low latency, continuous batching
├── Supports: HuggingFace models, OpenAI-compatible API
├── Fits: production, high-concurrency serving
└── Performance: 10-20x over HuggingFace Transformers

TensorRT-LLM (NVIDIA)
├── Core techniques: graph optimization, kernel fusion, quantization
├── Traits: peak performance, tuned for NVIDIA GPUs
├── Supports: FP8/INT8/INT4 quantization
├── Fits: NVIDIA data-center GPUs
└── Performance: industry-leading, but builds are complex

llama.cpp
├── Core techniques: GGUF format, CPU optimization, quantization
├── Traits: cross-platform, runs without a GPU
├── Supports: ARM/x86, Mac/Windows/Linux
├── Fits: edge devices, local deployment
└── Performance: slow on a single CPU thread, but multi-threaded

TGI (HuggingFace)
├── Core techniques: Rust implementation, Safetensors
├── Traits: production-ready, multi-GPU support
├── Supports: FlashAttention, PagedAttention
├── Fits: enterprise deployment
└── Performance: close to vLLM

DeepSpeed-Inference
├── Core techniques: ZeRO, kernel injection
├── Traits: very-large-model support, tensor parallelism
├── Supports: multi-GPU, multi-node
├── Fits: research, very large models
└── Performance: scales well

═══════════════════════════════════════════════════════════════════

2.3 Batching and Scheduling Optimization

Python
import random

class ContinuousBatching:
    """
    Continuous batching: interleave generation steps from different requests.
    """
    def __init__(self, model, max_batch_size=16):
        self.model = model
        self.max_batch_size = max_batch_size
        self.waiting_queue = []
        self.running_batch = []

    def schedule(self):
        """
        Scheduling policy:
        1. Fill the batch as full as possible
        2. Prefer sequences close to finishing (frees slots)
        3. Honor priorities and timeouts
        """
        # Try to move waiting requests into the running batch
        while (len(self.running_batch) < self.max_batch_size and
               self.waiting_queue):

            request = self.waiting_queue.pop(0)

            # Check that it fits (KV-cache space)
            if self._can_fit(request):
                self.running_batch.append(request)

        # Run one generation step
        if self.running_batch:
            self._step()

    def _step(self):
        """
        Run one generation step; all sequences advance in parallel.
        """
        # Prepare inputs: sequences of different lengths need padding
        input_ids = []
        position_ids = []

        for request in self.running_batch:
            input_ids.append(request.get_next_input())
            position_ids.append(request.current_position)

        # Batched forward pass
        outputs = self.model.batch_forward(
            input_ids,
            position_ids,
            attention_mask=self._create_attention_mask()
        )

        # Update each request and rebuild the batch; popping while
        # iterating would skip the element after each removal
        still_running = []
        for request, output in zip(self.running_batch, outputs):
            request.update(output)

            if request.is_finished():
                self._finish_request(request)
            else:
                still_running.append(request)

        self.running_batch = still_running

class SpeculativeDecoding:
    """
    Speculative decoding: a small model drafts, the large model verifies.
    """
    def __init__(self, draft_model, target_model, gamma=5):
        self.draft_model = draft_model  # small model (e.g. 7B)
        self.target_model = target_model  # large model (e.g. 70B)
        self.gamma = gamma  # draft tokens per round

    def generate(self, prompt, max_tokens):
        """
        Speculative decoding loop:
        1. The draft model quickly proposes gamma tokens
        2. The target model verifies all drafts in one parallel pass
        3. Accept the matching prefix; resample at the first mismatch
        """
        tokens = prompt.copy()

        while len(tokens) < max_tokens:
            # Step 1: draft generation
            draft_tokens = self._draft_generate(tokens, self.gamma)

            # Step 2: parallel verification by the target model
            all_tokens = tokens + draft_tokens
            logits = self.target_model.get_logits(all_tokens)

            # Step 3: accept/reject
            accepted = 0
            for i, draft_token in enumerate(draft_tokens):
                # Acceptance probability
                draft_prob = self._get_token_prob(
                    self.draft_model,
                    tokens + draft_tokens[:i],
                    draft_token
                )
                target_prob = self._get_token_prob(
                    self.target_model,
                    tokens + draft_tokens[:i],
                    draft_token
                )

                # accept_prob = min(1, target_prob / draft_prob);
                # guard against a zero draft probability
                accept_prob = min(1.0, target_prob / max(draft_prob, 1e-10))

                if random.random() < accept_prob:
                    tokens.append(draft_token)
                    accepted += 1
                else:
                    # Sample a replacement token from the target distribution
                    new_token = self._sample_from_target(
                        logits[len(tokens) + i]
                    )
                    tokens.append(new_token)
                    break

            if accepted == len(draft_tokens):
                # All accepted: sample one bonus token from the target model
                tokens.append(self._sample_from_target(logits[-1]))

        return tokens

    def _draft_generate(self, tokens, num_tokens):
        """Fast generation with the draft model."""
        draft = []
        current = tokens.copy()

        for _ in range(num_tokens):
            next_token = self.draft_model.generate_next(current)
            draft.append(next_token)
            current.append(next_token)

        return draft

Model Serving

3.1 Wrapping an API Service

Python
import json
import time
import uuid
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import List, Optional, AsyncGenerator
from vllm import LLM, SamplingParams

app = FastAPI(title="LLM Inference Service")

class ChatCompletionRequest(BaseModel):  # Pydantic BaseModel: automatic validation and serialization
    model: str
    messages: List[dict]
    temperature: Optional[float] = 0.7
    max_tokens: Optional[int] = 512
    stream: Optional[bool] = False
    top_p: Optional[float] = 1.0

class ChatCompletionResponse(BaseModel):
    id: str
    object: str
    created: int
    model: str
    choices: List[dict]
    usage: dict

# Global model instance
model = None

@app.on_event("startup")
async def load_model():
    """Load the model at startup."""
    global model

    model = LLM(
        model="meta-llama/Llama-2-7b-chat-hf",
        tensor_parallel_size=1,
        gpu_memory_utilization=0.9
    )

    print("Model loaded successfully")

@app.post("/v1/chat/completions")
async def chat_completion(request: ChatCompletionRequest):
    """
    OpenAI-compatible Chat Completion API.
    """
    try:
        # Format the messages into a prompt
        prompt = format_messages(request.messages)

        # Sampling parameters
        sampling_params = SamplingParams(
            temperature=request.temperature,
            top_p=request.top_p,
            max_tokens=request.max_tokens
        )

        if request.stream:
            # Streaming response
            return StreamingResponse(
                stream_generate(prompt, sampling_params),
                media_type="text/event-stream"
            )
        else:
            # Non-streaming response
            outputs = model.generate(prompt, sampling_params)

            response = ChatCompletionResponse(
                id=f"chatcmpl-{uuid.uuid4()}",
                object="chat.completion",
                created=int(time.time()),
                model=request.model,
                choices=[{
                    "index": 0,
                    "message": {
                        "role": "assistant",
                        "content": outputs[0].outputs[0].text
                    },
                    "finish_reason": "stop"
                }],
                usage={
                    "prompt_tokens": len(outputs[0].prompt_token_ids),
                    "completion_tokens": len(outputs[0].outputs[0].token_ids),
                    "total_tokens": len(outputs[0].prompt_token_ids) +
                                   len(outputs[0].outputs[0].token_ids)
                }
            )

            return response

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

async def stream_generate(prompt: str, sampling_params) -> AsyncGenerator[str, None]:
    """
    Streaming generation in OpenAI's SSE format.

    Simplified for illustration: vLLM's offline LLM class does not expose a
    `stream=True` flag; a production server would drive vLLM's async engine
    (or its built-in OpenAI-compatible server) to yield tokens incrementally.
    """
    stream = model.generate(prompt, sampling_params, stream=True)

    for output in stream:
        delta_text = output.outputs[0].text

        chunk = {
            "id": f"chatcmpl-{uuid.uuid4()}",
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": "llama-2-7b",
            "choices": [{
                "index": 0,
                "delta": {"content": delta_text},
                "finish_reason": None
            }]
        }

        yield f"data: {json.dumps(chunk)}\n\n"

    yield "data: [DONE]\n\n"

def format_messages(messages: List[dict]) -> str:
    """
    Convert OpenAI-style messages into a Llama-2 chat prompt.
    """
    formatted = ""
    for msg in messages:
        role = msg['role']
        content = msg['content']

        if role == 'system':
            formatted += f"[INST] <<SYS>>\n{content}\n<</SYS>>\n\n"
        elif role == 'user':
            formatted += f"{content} [/INST]"
        elif role == 'assistant':
            formatted += f" {content} </s><s>[INST] "

    return formatted

3.2 Caching Strategies

Python
from typing import List, Optional

class LLMCache:
    """
    Multi-level cache.
    """
    def __init__(self, redis_host='localhost'):
        # L1: KV cache in GPU memory (managed by the inference engine)

        # L2: Redis cache (prompt templates, common queries)
        import redis
        self.redis = redis.Redis(host=redis_host, port=6379, db=0)

        # L3: in-process cache (hot entries)
        self.local_cache = {}

        # Vector index backing the semantic cache (see semantic_cache below)
        self.vector_db = None

    def get_cached_response(self, prompt_hash: str) -> Optional[str]:
        """
        Look up a cached response.
        """
        # Check the local cache first
        if prompt_hash in self.local_cache:
            return self.local_cache[prompt_hash]

        # Then check Redis
        cached = self.redis.get(f"llm:response:{prompt_hash}")
        if cached:
            response = cached.decode('utf-8')
            # Backfill the local cache
            self.local_cache[prompt_hash] = response
            return response

        return None

    def cache_response(self, prompt_hash: str, response: str, ttl=3600):
        """
        Cache a response.
        """
        # Write to Redis with a TTL
        self.redis.setex(
            f"llm:response:{prompt_hash}",
            ttl,
            response
        )

        # Update the local cache
        self.local_cache[prompt_hash] = response

    def semantic_cache(self, prompt: str, embedding_model) -> Optional[str]:
        """
        Semantic cache: find similar past queries by embedding similarity.
        """
        # Embed the current prompt
        current_embedding = embedding_model.encode(prompt)

        # Search the vector database for similar prompts
        similar_prompts = self.vector_db.search(
            current_embedding,
            top_k=1,
            threshold=0.95  # similarity threshold
        )

        if similar_prompts:
            # Return the cached result of the most similar prompt
            return similar_prompts[0]['response']

        return None

class PrefixCache:
    """
    Prefix cache: reuse the KV cache of shared prefixes.

    Typical uses:
    - System prompts in multi-turn chat
    - Fixed context in RAG
    - Few-shot examples
    """
    def __init__(self):
        self.prefix_cache = {}  # prefix_hash -> KV cache

    def get_prefix_cache(self, prefix_tokens: List[int]):
        """
        Fetch the KV cache for a prefix.
        """
        prefix_hash = hash(tuple(prefix_tokens))
        return self.prefix_cache.get(prefix_hash)

    def store_prefix_cache(self, prefix_tokens: List[int], kv_cache):
        """
        Store the KV cache for a prefix.
        """
        prefix_hash = hash(tuple(prefix_tokens))
        self.prefix_cache[prefix_hash] = kv_cache
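The response cache above keys entries on a `prompt_hash`. One caveat worth making explicit: Python's built-in `hash()` is salted per process, so such keys would not survive a restart or be shared across workers. A sketch of a stable key using `hashlib`; the whitespace normalization and the exact set of fields hashed are assumed policy choices:

```python
# Deterministic cache key over everything that affects the model's output.
import hashlib

def prompt_key(model: str, prompt: str, temperature: float) -> str:
    """Stable SHA-256 key, safe across processes and restarts."""
    normalized = " ".join(prompt.split())  # collapse whitespace variants
    payload = f"{model}|{temperature}|{normalized}".encode("utf-8")
    return hashlib.sha256(payload).hexdigest()

k1 = prompt_key("llama-2-7b", "hello   world", 0.0)
k2 = prompt_key("llama-2-7b", "hello world", 0.0)
assert k1 == k2  # whitespace-insensitive and deterministic
```

For sampling temperatures above zero, exact-match caching is only safe if serving a cached answer for a repeated prompt is acceptable product behavior.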

Application Development Patterns

4.1 RAG Application Architecture

Python
import json
from typing import List

class RAGApplication:
    """
    A complete RAG application.
    """
    def __init__(self):
        # Document processing
        self.document_processor = DocumentProcessor()

        # Vector database
        self.vector_store = ChromaVectorStore()

        # Reranker
        self.reranker = CrossEncoderReranker()

        # LLM
        self.llm = LLMClient()

    def ingest_documents(self, documents: List[str]):
        """
        Document ingestion pipeline.
        """
        for doc in documents:
            # 1. Chunk
            chunks = self.document_processor.chunk(doc)

            # 2. Embed
            embeddings = self.document_processor.embed(chunks)

            # 3. Store in the vector database
            self.vector_store.add(chunks, embeddings)

    def query(self, question: str) -> dict:
        """
        RAG query pipeline.
        """
        # 1. Query rewriting (optional)
        rewritten_query = self._rewrite_query(question)

        # 2. Retrieval
        query_embedding = self.document_processor.embed([rewritten_query])[0]
        retrieved_docs = self.vector_store.search(query_embedding, top_k=20)

        # 3. Reranking
        reranked_docs = self.reranker.rerank(question, retrieved_docs, top_k=5)

        # 4. Build the prompt
        context = "\n".join([doc['content'] for doc in reranked_docs])
        prompt = f"""Answer the question based on the following context:

{context}

Question: {question}

Answer:"""

        # 5. Generate the answer
        answer = self.llm.generate(prompt)

        return {
            'answer': answer,
            'sources': reranked_docs,
            'context': context
        }

class AgentApplication:
    """
    Agent application architecture.
    """
    def __init__(self):
        self.llm = LLMClient()
        self.tools = {
            'search': SearchTool(),
            'calculator': CalculatorTool(),
            'code_executor': CodeExecutorTool()
        }
        self.memory = ConversationMemory()

    def run(self, user_input: str) -> str:
        """
        Agent execution loop.
        """
        self.memory.add_user_message(user_input)

        max_iterations = 10
        for _ in range(max_iterations):
            # Plan the next step
            action = self._plan_next_action()

            if action['type'] == 'respond':
                # Answer directly
                response = self.llm.generate(
                    self.memory.get_context()
                )
                self.memory.add_assistant_message(response)
                return response

            elif action['type'] == 'tool_use':
                # Use a tool
                tool_name = action['tool']
                tool_input = action['input']

                # Execute the tool
                observation = self.tools[tool_name].execute(tool_input)

                # Record the observation
                self.memory.add_observation(tool_name, observation)

    def _plan_next_action(self) -> dict:
        """
        Decide the next action.
        """
        prompt = f"""Based on the conversation history, decide the next step:

{self.memory.get_context()}

Available tools: {list(self.tools.keys())}

Decision:
- To use a tool, return {{"type": "tool_use", "tool": "<tool name>", "input": "<input>"}}
- If you can answer directly, return {{"type": "respond"}}
"""

        response = self.llm.generate(prompt)
        return json.loads(response)  # parse the JSON string into a Python dict

4.2 Frontend Integration Patterns

JavaScript
// React component: streaming chat interface
import React, { useState } from 'react';

function ChatInterface() {
  const [messages, setMessages] = useState([]);
  const [input, setInput] = useState('');
  const [isStreaming, setIsStreaming] = useState(false);

  const sendMessage = async () => {
    if (!input.trim()) return;

    const userMessage = { role: 'user', content: input };
    setMessages(prev => [...prev, userMessage]);
    setInput('');
    setIsStreaming(true);

    // Stream the SSE-formatted response body via fetch
    const response = await fetch('/v1/chat/completions', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        model: 'llama-2-7b',
        messages: [...messages, userMessage],
        stream: true
      })
    });

    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let assistantMessage = { role: 'assistant', content: '' };

    setMessages(prev => [...prev, assistantMessage]);

    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      const chunk = decoder.decode(value);
      const lines = chunk.split('\n');

      for (const line of lines) {
        if (line.startsWith('data: ')) {
          const data = line.slice(6);
          if (data === '[DONE]') {
            setIsStreaming(false);
            return;
          }

          try {
            const parsed = JSON.parse(data);
            const content = parsed.choices[0].delta.content || '';

            assistantMessage.content += content;
            setMessages(prev => [
              ...prev.slice(0, -1),
              { ...assistantMessage }
            ]);
          } catch (e) {
            console.error('Parse error:', e);
          }
        }
      }
    }
  };

  return (
    <div className="chat-container">
      <div className="messages">
        {messages.map((msg, idx) => (
          <div key={idx} className={`message ${msg.role}`}>
            {msg.content}
          </div>
        ))}
        {isStreaming && <div className="typing-indicator">...</div>}
      </div>

      <div className="input-area">
        <input
          value={input}
          onChange={(e) => setInput(e.target.value)}
          onKeyPress={(e) => e.key === 'Enter' && sendMessage()}
          placeholder="Type a message..."
          disabled={isStreaming}
        />
        <button onClick={sendMessage} disabled={isStreaming}>
          Send
        </button>
      </div>
    </div>
  );
}

Productization Practices

5.1 Monitoring and Observability

Python
class LLMMonitoring:
    """
    Monitoring for LLM applications.
    """
    def __init__(self):
        self.metrics = {
            'request_count': 0,
            'latency_p50': [],
            'latency_p99': [],
            'token_throughput': 0,
            'error_rate': 0
        }

    def log_request(self, request_data, response_data, latency):
        """
        Record per-request metrics.
        """
        # Latency
        self.metrics['latency_p50'].append(latency)

        # Token counts
        input_tokens = response_data['usage']['prompt_tokens']
        output_tokens = response_data['usage']['completion_tokens']

        # Cost estimate
        cost = self._estimate_cost(input_tokens, output_tokens)

        # Ship to the monitoring system (e.g. Prometheus)
        self._send_to_prometheus({
            'latency': latency,
            'input_tokens': input_tokens,
            'output_tokens': output_tokens,
            'cost': cost
        })

    def log_quality(self, prompt, response, user_feedback):
        """
        Record quality metrics.
        """
        # User satisfaction
        satisfaction = user_feedback.get('rating', 0)

        # Automated quality checks
        perplexity = self._compute_perplexity(response)
        diversity = self._compute_diversity(response)

        # Safety screening
        safety_score = self._safety_check(response)

        self._send_to_logging({
            'prompt': prompt,
            'response': response,
            'satisfaction': satisfaction,
            'perplexity': perplexity,
            'diversity': diversity,
            'safety_score': safety_score
        })

class LLMTracing:
    """
    Trace LLM call chains.
    """
    def __init__(self):
        from opentelemetry import trace
        self.tracer = trace.get_tracer(__name__)

    def trace_rag_pipeline(self, query):
        """
        Trace the full RAG pipeline.
        """
        with self.tracer.start_as_current_span("rag_query") as span:
            span.set_attribute("query", query)

            # Retrieval stage
            with self.tracer.start_span("retrieval") as retrieval_span:
                docs = self.vector_store.search(query)
                retrieval_span.set_attribute("num_docs", len(docs))
                retrieval_span.set_attribute("retrieval_latency", 0.1)

            # Reranking stage
            with self.tracer.start_span("reranking") as rerank_span:
                reranked = self.reranker.rerank(query, docs)
                rerank_span.set_attribute("rerank_latency", 0.05)

            # Generation stage
            with self.tracer.start_span("generation") as gen_span:
                response = self.llm.generate(query, reranked)
                gen_span.set_attribute("output_tokens", len(response))
                gen_span.set_attribute("generation_latency", 2.0)

            return response

5.2 A/B Testing and Experimentation

Python
class LLMExperiment:
    """
    Experiment framework for LLMs.
    """
    def __init__(self):
        self.variants = {
            'control': ModelVariant('gpt-4o-mini'),
            'treatment': ModelVariant('gpt-4o')
        }
        self.traffic_split = 0.5  # 50/50 split

    def route_request(self, user_id: str, request: dict) -> str:
        """
        Pick a variant by user ID (keeps assignment consistent per user).
        """
        # Hash the user ID into a bucket; note that Python's hash() is
        # salted per process, so production systems should use a stable
        # hash (e.g. hashlib) to keep assignments consistent across restarts
        bucket = hash(user_id) % 100

        if bucket < self.traffic_split * 100:
            variant = 'control'
        else:
            variant = 'treatment'

        # Log the exposure
        self._log_exposure(user_id, variant)

        return self.variants[variant].process(request)

    def evaluate(self, metric_fn):
        """
        Evaluate experiment results.
        """
        results = {}

        for variant_name, variant in self.variants.items():
            metrics = variant.get_metrics()
            results[variant_name] = {
                'sample_size': metrics['count'],
                'mean_latency': metrics['latency_mean'],
                'user_satisfaction': metrics['satisfaction_mean'],
                'cost_per_request': metrics['cost_mean']
            }

        # Statistical significance test
        from scipy import stats
        control = self.variants['control'].get_metric_values('satisfaction')
        treatment = self.variants['treatment'].get_metric_values('satisfaction')

        t_stat, p_value = stats.ttest_ind(control, treatment)

        return {
            'results': results,
            'statistical_significance': p_value < 0.05,
            'p_value': p_value
        }

Cost/Performance Trade-offs

6.1 Cost Model

Text Only
LLM Cost Analysis
═══════════════════════════════════════════════════════════════════

Cloud API cost (per 1M tokens, 2025 reference prices)
─────────────────────────────────────────────────────────────────
Model              Input       Output     Notes
GPT-4o             $2.5        $10        high-quality multimodal
GPT-4o-mini        $0.15       $0.6       best value (replaces gpt-3.5-turbo)
o3-mini            $1.1        $4.4       reasoning model
Claude Sonnet 4    $3          $15        strongest reasoning
Claude 3.5 Haiku   $0.8        $4         fast responses
Gemini 2.0 Flash   $0.1        $0.4       extremely cost-effective

Self-hosting cost (monthly estimate)
─────────────────────────────────────────────────────────────────
Setup              Hardware    Operations  Use case
1x A100 80GB       $2,000      $500        dev/testing
8x A100 80GB       $16,000     $2,000      small-scale production
8x H100 80GB       $32,000     $4,000      large-scale serving (H100 has replaced older A100 clusters)

Cost optimization strategies:
1. Caching: avoid repeated API calls for repeated requests
2. Model routing: send simple tasks to small models
3. Batching: raise throughput to lower unit cost
4. Quantization: lower the hardware bar for inference

═══════════════════════════════════════════════════════════════════
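The API prices and self-hosting figures above support a back-of-envelope break-even check. A minimal sketch; the 36-month amortization window and all dollar figures are illustrative assumptions taken from the tables above:

```python
# Break-even comparison: pay-per-token API vs. a self-hosted cluster.

def api_monthly_cost(m_tokens_in: float, m_tokens_out: float,
                     price_in: float, price_out: float) -> float:
    """Monthly API cost in USD, token volumes in millions."""
    return m_tokens_in * price_in + m_tokens_out * price_out

def self_host_monthly_cost(hardware_cost: float, ops_cost: float,
                           amortize_months: int = 36) -> float:
    """Hardware amortized over `amortize_months`, plus monthly operations."""
    return hardware_cost / amortize_months + ops_cost

# 2,000M input + 500M output tokens/month at GPT-4o-mini prices
api = api_monthly_cost(2000, 500, 0.15, 0.60)
# 8x A100 setup from the table above
hosted = self_host_monthly_cost(16000, 2000)

print(f"API: ${api:.0f}/mo, self-hosted: ${hosted:.0f}/mo")
```

At this volume the API is still cheaper; self-hosting pays off only once monthly token volume grows several-fold, which is why caching and model routing are usually tried first.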

6.2 Performance Optimization Decision Tree

Text Only
Performance Optimization Decisions
═══════════════════════════════════════════════════════════════════

Latency too high?
    ├── High time to first token (TTFT)
    │       ├── Model too large, slow to load
    │       │       └──▶ quantize / use a smaller model
    │       └── Insufficient batching
    │               └──▶ enable continuous batching
    ├── Slow generation (TPOT, time per output token)
    │       ├── Memory-bandwidth bound
    │       │       └──▶ quantization / speculative decoding
    │       └── Compute bound
    │               └──▶ tensor parallelism / faster GPUs
    └── High end-to-end latency
            ├── Network latency
            │       └──▶ edge deployment / CDN
            └── Queueing delay
                    └──▶ scale out / tune load balancing

Throughput too low?
    ├── Low GPU utilization
    │       └──▶ larger batch size / continuous batching
    └── GPUs saturated
            └──▶ horizontal scaling / model parallelism

═══════════════════════════════════════════════════════════════════
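The two latency metrics at the root of this tree, TTFT and TPOT, can be measured directly off any token stream. A minimal sketch, with `fake_stream` standing in for a real streaming API response:

```python
# Measure TTFT (time to first token) and TPOT (time per output token)
# from any token iterator.
import time

def measure_latency(token_stream):
    """Return (ttft_seconds, tpot_seconds) for a non-empty token iterator."""
    start = time.perf_counter()
    first = None
    count = 0
    for _ in token_stream:
        now = time.perf_counter()
        if first is None:
            first = now  # timestamp of the first token
        count += 1
    ttft = first - start
    tpot = (now - first) / max(count - 1, 1)  # average gap between tokens
    return ttft, tpot

def fake_stream(n=5, delay=0.01):
    """Stand-in for a streaming response: n tokens, `delay` seconds apart."""
    for i in range(n):
        time.sleep(delay)
        yield f"tok{i}"

ttft, tpot = measure_latency(fake_stream())
print(f"TTFT={ttft*1000:.1f}ms, TPOT={tpot*1000:.1f}ms")
```

TTFT is dominated by prompt processing (prefill), TPOT by the decode loop, which is why the tree prescribes different fixes for each.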

Next Steps

Having finished this chapter, you have covered the full pipeline from large-model fundamentals to production applications.

Suggested practice:

1. Hands-on deployment: serve an open-source model with vLLM
2. Build an application: develop a RAG or Agent app
3. Performance tuning: try quantization and inference-optimization techniques
4. Keep learning: follow new papers and open-source projects

Recommended resources:

- vLLM Documentation
- HuggingFace Inference API
- LangChain Production Guide

Congratulations on completing the entire LLM curriculum!


Last updated: 2026-02-12  Applies to: LLM 学习教程 v2026