
04 - Cloud Inference Services

⚠️ Timeliness note: this chapter touches on cutting-edge models, pricing, leaderboards, and similar information that can change quickly between versions; always defer to the original papers, official release pages, and API documentation.

Deploy models to the cloud and serve scalable inference

📎 Cross-references: for wrapping inference services with FastAPI, see LLM 学习/03-推理服务部署 §5; for general LLM deployment cost and monitoring, see LLM 应用/11-大模型部署 §11.4. This chapter focuses on cloud-platform-specific features (SageMaker, GCP Vertex AI).

📖 Chapter Overview

This chapter takes a close look at deploying and optimizing cloud inference services, covering cloud deployment, API service design, and cost optimization. Together, these techniques help you build high-performance, low-cost inference services in the cloud.

🎯 Learning Objectives

After completing this chapter, you will be able to:

  • Understand the architecture of cloud inference services
  • Deploy models to the cloud
  • Design and implement inference APIs
  • Optimize the cost of a cloud inference service

1. Overview of Cloud Inference Services

1.1 Advantages of Cloud Inference

Scalability:

  • Scale in and out automatically with demand
  • Support high-concurrency access
  • Adjust resource configurations flexibly

Cost efficiency:

  • Pay only for what you use
  • No upfront hardware purchases
  • Lower operations and maintenance costs

High performance:

  • Access to the latest GPU hardware
  • Optimized networking and storage
  • Professional operations support

1.2 Cloud Inference Architecture

Text Only
Cloud inference service architecture
├── Load balancing layer
│   ├── API gateway
│   └── Load balancer
├── Inference serving layer
│   ├── Model servers
│   ├── Batch processing service
│   └── Cache service
├── Model management layer
│   ├── Model repository
│   ├── Model version control
│   └── Model monitoring
└── Monitoring and logging
    ├── Performance monitoring
    ├── Log collection
    └── Alerting system

2. Cloud Deployment

2.1 Using Hugging Face Inference Endpoints

Python
from huggingface_hub import InferenceClient

# Initialize the inference client
client = InferenceClient(
    model="meta-llama/Llama-2-7b-hf",
    token="your_api_token"
)

# Run inference
prompt = "Briefly introduce the history of artificial intelligence"
output = client.text_generation(
    prompt,
    max_new_tokens=200,
    temperature=0.7,
    top_p=0.95
)

print(output)

2.2 Using AWS SageMaker

Python
import boto3
import sagemaker
from sagemaker.huggingface import HuggingFaceModel

def deploy_to_sagemaker(model_name, instance_type="ml.g4dn.xlarge"):
    """
    Deploy a Hugging Face Hub model to AWS SageMaker

    Args:
        model_name: model ID on the Hugging Face Hub
        instance_type: SageMaker instance type
    """
    # Create a SageMaker session
    session = sagemaker.Session()
    role = sagemaker.get_execution_role()

    # Create the HuggingFace model. For Hub models, pass the model ID through
    # the HF_MODEL_ID environment variable; model_data is only for model
    # artifacts already packaged in S3.
    # Note: the version numbers must match a SageMaker-supported container image.
    # Supported versions: https://docs.aws.amazon.com/sagemaker/latest/dg-ecr-paths/sagemaker-algo-docker-registry-paths.html
    huggingface_model = HuggingFaceModel(
        env={"HF_MODEL_ID": model_name, "HF_TASK": "text-generation"},
        role=role,
        transformers_version="4.37",
        pytorch_version="2.1",
        py_version="py310",
        model_server_workers=1
    )

    # Deploy the model (endpoint names may not contain "/")
    predictor = huggingface_model.deploy(
        initial_instance_count=1,
        instance_type=instance_type,
        endpoint_name=f"{model_name.split('/')[-1]}-endpoint"
    )

    return predictor

# Example usage
# predictor = deploy_to_sagemaker("meta-llama/Llama-2-7b-hf")

# Inference
# response = predictor.predict({
#     "inputs": "Briefly introduce the history of artificial intelligence",
#     "parameters": {
#         "max_new_tokens": 200,
#         "temperature": 0.7
#     }
# })
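
When calling the endpoint from application code that does not hold the `predictor` object (for example, a separate backend service), the low-level `sagemaker-runtime` client can be used instead. A minimal sketch; the endpoint name is an assumption matching the deploy call above:

Python
import json
import boto3

# Low-level runtime client; works from any process with AWS credentials
runtime = boto3.client("sagemaker-runtime")

response = runtime.invoke_endpoint(
    EndpointName="Llama-2-7b-hf-endpoint",  # assumed name from the deploy step above
    ContentType="application/json",
    Body=json.dumps({
        "inputs": "Briefly introduce the history of artificial intelligence",
        "parameters": {"max_new_tokens": 200, "temperature": 0.7}
    })
)

result = json.loads(response["Body"].read())
print(result)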

2.3 Using Google Cloud Vertex AI

Python
from google.cloud import aiplatform

def deploy_to_gcp(model_name, project_id, region="us-central1"):
    """
    Deploy a model to Google Cloud Vertex AI

    Args:
        model_name: model name
        project_id: GCP project ID
        region: region
    """
    # Initialize the Vertex AI SDK
    aiplatform.init(project=project_id, location=region)

    # Upload the model artifacts
    model = aiplatform.Model.upload(
        display_name=model_name,
        artifact_uri=f"gs://your-bucket/{model_name}",
        serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/pytorch-gpu.1-13:latest"
    )

    # Deploy to an endpoint
    endpoint = model.deploy(
        machine_type="n1-standard-4",
        accelerator_type="NVIDIA_TESLA_T4",
        accelerator_count=1,
        min_replica_count=1,
        max_replica_count=3
    )

    return endpoint

# Example usage
# endpoint = deploy_to_gcp("meta-llama/Llama-2-7b-hf", "your-project-id")
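
Once deployed, the same SDK can query the endpoint. A minimal sketch; the exact instance schema depends on the serving container chosen above, so treat the payload keys as assumptions:

Python
# Query the deployed endpoint; the request schema depends on the serving container
prediction = endpoint.predict(
    instances=[{"inputs": "Briefly introduce the history of artificial intelligence"}]
)
print(prediction.predictions)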

3. API Service Design

3.1 Building an Inference Service with FastAPI

Python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch

app = FastAPI(title="LLM Inference API")

# Load the model
model_name = "meta-llama/Llama-2-7b-hf"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype=torch.float16,
    device_map="auto"
)

class InferenceRequest(BaseModel):  # BaseModel: Pydantic model for request validation
    prompt: str
    max_tokens: int = 200
    temperature: float = 0.7
    top_p: float = 0.95

class InferenceResponse(BaseModel):
    text: str
    tokens_generated: int

@app.post("/generate", response_model=InferenceResponse)
async def generate_text(request: InferenceRequest):  # async defines an asynchronous endpoint
    """
    Generate text
    """
    try:  # try/except catches and reports inference errors
        # Encode the input
        inputs = tokenizer(request.prompt, return_tensors="pt").to(model.device)

        # Generate text (model.generate blocks the event loop; production
        # services usually offload it to a worker thread or request queue)
        with torch.no_grad():  # disable gradient tracking to save memory
            outputs = model.generate(
                **inputs,
                max_new_tokens=request.max_tokens,
                temperature=request.temperature,
                top_p=request.top_p,
                do_sample=True
            )

        # Decode the output
        generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
        tokens_generated = outputs.shape[1] - inputs["input_ids"].shape[1]

        return InferenceResponse(
            text=generated_text,
            tokens_generated=tokens_generated
        )

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health_check():
    """
    Health check
    """
    return {"status": "healthy"}

@app.get("/model/info")
async def model_info():
    """
    Model information
    """
    return {
        "model_name": model_name,
        "device": str(model.device),
        "parameters": model.num_parameters()
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
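
To exercise the service from another process, any HTTP client works. A minimal sketch with the requests library, assuming the server above is running locally on port 8000:

Python
import requests

# Call the /generate route defined above
resp = requests.post(
    "http://localhost:8000/generate",
    json={
        "prompt": "Briefly introduce the history of artificial intelligence",
        "max_tokens": 100,
        "temperature": 0.7
    }
)
resp.raise_for_status()
print(resp.json()["text"])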

3.2 Batch Inference Service

Python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch

app = FastAPI(title="Batch Inference API")

# Load the model
model_name = "meta-llama/Llama-2-7b-hf"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype=torch.float16,
    device_map="auto"
)

# Llama tokenizers ship without a padding token; reuse EOS so batch padding works
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

class BatchInferenceRequest(BaseModel):
    prompts: list[str]
    max_tokens: int = 200
    temperature: float = 0.7
    top_p: float = 0.95

class BatchInferenceResponse(BaseModel):
    texts: list[str]
    tokens_generated: list[int]

@app.post("/batch-generate", response_model=BatchInferenceResponse)
async def batch_generate_text(request: BatchInferenceRequest):
    """
    Generate text for a batch of prompts
    """
    try:
        # Encode the inputs as one padded batch
        inputs = tokenizer(
            request.prompts,
            padding=True,
            truncation=True,
            return_tensors="pt"
        ).to(model.device)

        # Generate text
        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=request.max_tokens,
                temperature=request.temperature,
                top_p=request.top_p,
                do_sample=True,
                pad_token_id=tokenizer.eos_token_id
            )

        # Decode the outputs
        generated_texts = []
        tokens_generated = []

        for i, output in enumerate(outputs):  # enumerate yields index and element together
            generated_text = tokenizer.decode(output, skip_special_tokens=True)
            generated_texts.append(generated_text)
            tokens_generated.append(output.shape[0] - inputs["input_ids"][i].shape[0])

        return BatchInferenceResponse(
            texts=generated_texts,
            tokens_generated=tokens_generated
        )

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

3.3 Streaming Inference Service

Python
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
import json

app = FastAPI(title="Streaming Inference API")

# Load the model
model_name = "meta-llama/Llama-2-7b-hf"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype=torch.float16,
    device_map="auto"
)

class StreamingRequest(BaseModel):
    prompt: str
    max_tokens: int = 200
    temperature: float = 0.7
    top_p: float = 0.95

async def stream_generator(request: StreamingRequest):
    """
    Streaming generator: emits one token per step as newline-delimited JSON
    """
    try:
        # Encode the input
        inputs = tokenizer(request.prompt, return_tensors="pt").to(model.device)
        input_ids = inputs["input_ids"]
        attention_mask = inputs["attention_mask"]

        generated_tokens = []
        text = ""
        with torch.no_grad():
            for _ in range(request.max_tokens):
                outputs = model.generate(
                    input_ids=input_ids,
                    attention_mask=attention_mask,
                    max_new_tokens=1,
                    temperature=request.temperature,
                    top_p=request.top_p,
                    do_sample=True
                )

                # The newly generated token sits at the last position; keep shape [1, 1]
                new_token = outputs[:, -1:]
                generated_tokens.append(new_token)

                # Extend the inputs and attention mask for the next step
                input_ids = torch.cat([input_ids, new_token], dim=1)  # torch.cat concatenates along an existing dim
                attention_mask = torch.cat(
                    [attention_mask, attention_mask.new_ones((1, 1))], dim=1
                )

                # Decode everything generated so far
                text = tokenizer.decode(
                    torch.cat(generated_tokens, dim=1)[0], skip_special_tokens=True
                )

                # Emit one newline-delimited JSON chunk
                yield json.dumps({"text": text, "done": False}) + "\n"  # yield turns the function into a generator

                # Stop early if the model emitted the end-of-sequence token
                if new_token.item() == tokenizer.eos_token_id:
                    break

        # Send the completion signal
        yield json.dumps({"text": text, "done": True}) + "\n"

    except Exception as e:
        yield json.dumps({"error": str(e)}) + "\n"

@app.post("/stream-generate")
async def stream_generate_text(request: StreamingRequest):
    """
    Stream generated text to the client
    """
    return StreamingResponse(
        stream_generator(request),
        media_type="text/event-stream"
    )

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
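
Note that the loop above re-runs generate() once per token, re-processing the growing prompt each time, which is simple but wasteful. transformers provides TextIteratorStreamer for exactly this case. Below is a hedged sketch of an alternative generator (stream_generator_v2 is a hypothetical name) that runs a single generate() call in a background thread and streams decoded pieces as they are produced:

Python
from threading import Thread
from transformers import TextIteratorStreamer

async def stream_generator_v2(request: StreamingRequest):
    """
    Stream tokens from a single generate() call via TextIteratorStreamer
    """
    inputs = tokenizer(request.prompt, return_tensors="pt").to(model.device)
    streamer = TextIteratorStreamer(
        tokenizer, skip_prompt=True, skip_special_tokens=True
    )

    # generate() blocks, so run it in a background thread and consume
    # decoded text pieces from the streamer as they become available
    thread = Thread(
        target=model.generate,
        kwargs=dict(
            **inputs,
            streamer=streamer,
            max_new_tokens=request.max_tokens,
            temperature=request.temperature,
            top_p=request.top_p,
            do_sample=True
        )
    )
    thread.start()

    text = ""
    for piece in streamer:
        text += piece
        yield json.dumps({"text": text, "done": False}) + "\n"
    yield json.dumps({"text": text, "done": True}) + "\n"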

4. Cost Optimization

4.1 Instance Selection Strategy

Python
def calculate_cost(instance_type, hours, region="us-east-1"):
    """
    Estimate the cost of a cloud instance

    Args:
        instance_type: instance type
        hours: hours of usage
        region: region
    """
    # AWS on-demand pricing in USD/hour (illustrative; check current rates)
    pricing = {
        "ml.g4dn.xlarge": 0.526,   # 1x T4 GPU
        "ml.g4dn.2xlarge": 0.752,  # 1x T4 GPU
        "ml.g5.xlarge": 1.006,     # 1x A10G GPU
        "ml.g5.2xlarge": 1.341,    # 1x A10G GPU
        "ml.p3.2xlarge": 3.06,     # 1x V100 GPU
        "ml.p3.8xlarge": 12.24,    # 4x V100 GPU
    }

    hourly_cost = pricing.get(instance_type, 0)
    total_cost = hourly_cost * hours

    return {
        "instance_type": instance_type,
        "hourly_cost": hourly_cost,
        "hours": hours,
        "total_cost": total_cost
    }

# Example usage
# cost = calculate_cost("ml.g4dn.xlarge", 100)
# print(f"Total cost: ${cost['total_cost']:.2f}")
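
Hourly price alone can be misleading: a cheaper instance that generates fewer tokens per second may cost more per token served. The small helper below folds measured throughput into the comparison; the throughput figures in the example are made-up placeholders you would replace with your own benchmarks:

Python
def cost_per_million_tokens(hourly_cost, tokens_per_second):
    """
    Convert an hourly instance price into cost per one million generated tokens

    Args:
        hourly_cost: instance price in USD/hour
        tokens_per_second: measured generation throughput on that instance
    """
    tokens_per_hour = tokens_per_second * 3600
    return hourly_cost / tokens_per_hour * 1_000_000

# Example usage (throughputs are placeholders, not benchmarks)
# print(cost_per_million_tokens(0.526, 25))  # ml.g4dn.xlarge at an assumed 25 tok/s
# print(cost_per_million_tokens(1.006, 60))  # ml.g5.xlarge at an assumed 60 tok/s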

4.2 Auto-scaling

Python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import boto3

app = FastAPI(title="Auto-scaling Inference Service")

# AWS clients (placeholders here; a real deployment would use them to change capacity)
autoscaling = boto3.client('autoscaling')
ec2 = boto3.client('ec2')

class ScalingPolicy:
    """
    Auto-scaling policy
    """
    def __init__(self, min_instances=1, max_instances=5,
                 scale_up_threshold=0.8, scale_down_threshold=0.3):
        self.min_instances = min_instances
        self.max_instances = max_instances
        self.scale_up_threshold = scale_up_threshold
        self.scale_down_threshold = scale_down_threshold

    def should_scale_up(self, current_load, current_instances):
        """
        Decide whether to scale out
        """
        return (current_load > self.scale_up_threshold and
                current_instances < self.max_instances)

    def should_scale_down(self, current_load, current_instances):
        """
        Decide whether to scale in
        """
        return (current_load < self.scale_down_threshold and
                current_instances > self.min_instances)

# Global state (current_load would be fed from real metrics in production)
scaling_policy = ScalingPolicy()
current_instances = 1
current_load = 0.0

@app.get("/metrics")
async def get_metrics():
    """
    Report scaling metrics
    """
    return {
        "current_instances": current_instances,
        "current_load": current_load
    }

@app.post("/scale")
async def scale_instances():
    """
    Scale the instance count up or down
    """
    global current_instances

    if scaling_policy.should_scale_up(current_load, current_instances):
        # Scale out
        new_instances = min(current_instances + 1, scaling_policy.max_instances)
        current_instances = new_instances
        return {"action": "scale_up", "new_instances": new_instances}

    elif scaling_policy.should_scale_down(current_load, current_instances):
        # Scale in
        new_instances = max(current_instances - 1, scaling_policy.min_instances)
        current_instances = new_instances
        return {"action": "scale_down", "new_instances": new_instances}

    else:
        return {"action": "no_change", "current_instances": current_instances}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
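
The service above only simulates scaling decisions in-process. For a managed SageMaker endpoint, the same policy is usually expressed with the Application Auto Scaling API instead of hand-rolled logic. A hedged sketch; the endpoint name, variant name, and target value are assumptions to adapt:

Python
import boto3

aas = boto3.client("application-autoscaling")

# Resource IDs follow the endpoint/<name>/variant/<variant> convention
resource_id = "endpoint/Llama-2-7b-hf-endpoint/variant/AllTraffic"

# Register the endpoint variant as a scalable target
aas.register_scalable_target(
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    MinCapacity=1,
    MaxCapacity=5
)

# Target-tracking policy: hold invocations per instance near the target value
aas.put_scaling_policy(
    PolicyName="llama-invocations-tracking",
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    PolicyType="TargetTrackingScaling",
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue": 100.0,  # invocations per instance (tune per workload)
        "PredefinedMetricSpecification": {
            "PredefinedMetricType": "SageMakerVariantInvocationsPerInstance"
        },
        "ScaleInCooldown": 300,
        "ScaleOutCooldown": 60
    }
)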

4.3 Caching Strategy

Python
from fastapi import FastAPI
from pydantic import BaseModel
import hashlib
import json
from datetime import datetime, timedelta

app = FastAPI(title="Cached Inference Service")

class CacheEntry:
    """
    Cache entry
    """
    def __init__(self, text: str, ttl: int = 3600):
        self.text = text
        self.created_at = datetime.now()
        self.ttl = ttl

    def is_expired(self):
        """
        Check whether the entry has expired
        """
        return datetime.now() - self.created_at > timedelta(seconds=self.ttl)

class InferenceCache:
    """
    In-process inference cache
    """
    def __init__(self, max_size=1000):
        self.cache = {}
        self.max_size = max_size

    def _generate_key(self, prompt: str, **kwargs) -> str:  # **kwargs collects arbitrary keyword arguments
        """
        Generate a cache key
        """
        data = {"prompt": prompt, **kwargs}
        data_str = json.dumps(data, sort_keys=True)
        return hashlib.md5(data_str.encode()).hexdigest()

    def get(self, prompt: str, **kwargs) -> str | None:
        """
        Read from the cache
        """
        key = self._generate_key(prompt, **kwargs)
        entry = self.cache.get(key)

        if entry and not entry.is_expired():
            return entry.text

        return None

    def set(self, prompt: str, text: str, **kwargs):
        """
        Write to the cache
        """
        key = self._generate_key(prompt, **kwargs)

        # If the cache is full, evict the oldest entry
        if len(self.cache) >= self.max_size:
            oldest_key = min(self.cache.keys(),
                           key=lambda k: self.cache[k].created_at)  # lambda: anonymous function
            del self.cache[oldest_key]

        self.cache[key] = CacheEntry(text)

    def clear(self):
        """
        Clear the cache
        """
        self.cache.clear()

# Global cache
inference_cache = InferenceCache()

class InferenceRequest(BaseModel):
    prompt: str
    max_tokens: int = 200
    temperature: float = 0.7
    use_cache: bool = True

@app.post("/generate")
async def generate_with_cache(request: InferenceRequest):
    """
    Inference with caching
    """
    # Check the cache first
    if request.use_cache:
        cached_result = inference_cache.get(
            request.prompt,
            max_tokens=request.max_tokens,
            temperature=request.temperature
        )

        if cached_result:
            return {"text": cached_result, "from_cache": True}

    # Run inference (stubbed here for brevity)
    generated_text = f"Generated text for: {request.prompt}"

    # Cache the result
    if request.use_cache:
        inference_cache.set(
            request.prompt,
            generated_text,
            max_tokens=request.max_tokens,
            temperature=request.temperature
        )

    return {"text": generated_text, "from_cache": False}

@app.post("/cache/clear")
async def clear_cache():
    """
    Clear the cache
    """
    inference_cache.clear()
    return {"status": "cache_cleared"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
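
One caveat: an in-process dictionary cache vanishes on restart and is not shared between replicas behind a load balancer. The sketch below keeps the same get/set interface but backs it with Redis, assuming a reachable Redis server and the redis-py package:

Python
import hashlib
import json

import redis

class RedisInferenceCache:
    """
    Shared inference cache backed by Redis, with TTL-based expiry
    """
    def __init__(self, host="localhost", port=6379, ttl=3600):
        self.client = redis.Redis(host=host, port=port, decode_responses=True)
        self.ttl = ttl

    def _generate_key(self, prompt: str, **kwargs) -> str:
        data_str = json.dumps({"prompt": prompt, **kwargs}, sort_keys=True)
        return "inference:" + hashlib.md5(data_str.encode()).hexdigest()

    def get(self, prompt: str, **kwargs) -> str | None:
        return self.client.get(self._generate_key(prompt, **kwargs))

    def set(self, prompt: str, text: str, **kwargs):
        # SETEX stores the value with a time-to-live, so Redis handles expiry
        self.client.setex(self._generate_key(prompt, **kwargs), self.ttl, text)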

5. Monitoring and Logging

5.1 Performance Monitoring

Python
from fastapi import FastAPI, Request
import time
import psutil
import torch

app = FastAPI(title="Monitored Inference Service")

# Performance metrics
metrics = {
    "total_requests": 0,
    "total_tokens": 0,
    "total_time": 0.0,
    "gpu_memory_used": 0.0,
    "cpu_usage": 0.0
}

@app.middleware("http")
async def monitor_requests(request: Request, call_next):
    """
    Track every request
    """
    start_time = time.time()

    # Handle the request
    response = await call_next(request)  # await suspends until the async call completes

    # Update the metrics
    process_time = time.time() - start_time
    metrics["total_requests"] += 1
    metrics["total_time"] += process_time

    # Attach a response header
    response.headers["X-Process-Time"] = str(process_time)

    return response

@app.get("/metrics")
async def get_metrics():
    """
    Report performance metrics
    """
    # GPU memory usage
    if torch.cuda.is_available():
        gpu_memory_used = torch.cuda.memory_allocated() / 1e9
        metrics["gpu_memory_used"] = gpu_memory_used

    # CPU utilization
    metrics["cpu_usage"] = psutil.cpu_percent()

    # Average response time
    avg_time = metrics["total_time"] / metrics["total_requests"] if metrics["total_requests"] > 0 else 0

    return {
        **metrics,
        "avg_response_time": avg_time
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
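
The in-memory metrics dictionary resets on every restart and cannot be aggregated across replicas. A common next step is exporting to Prometheus; the sketch below uses the prometheus_client package, and the metric names are illustrative conventions, not required ones:

Python
import time

from fastapi import FastAPI, Request
from prometheus_client import Counter, Histogram, make_asgi_app

app = FastAPI(title="Prometheus-instrumented Inference Service")

# Metric names here are illustrative, not mandated by Prometheus
REQUEST_COUNT = Counter(
    "inference_requests_total", "Total inference requests", ["path", "status"]
)
REQUEST_LATENCY = Histogram(
    "inference_request_seconds", "Request latency in seconds", ["path"]
)

@app.middleware("http")
async def record_metrics(request: Request, call_next):
    start = time.time()
    response = await call_next(request)
    REQUEST_LATENCY.labels(path=request.url.path).observe(time.time() - start)
    REQUEST_COUNT.labels(
        path=request.url.path, status=str(response.status_code)
    ).inc()
    return response

# Expose metrics in Prometheus text format at /metrics
app.mount("/metrics", make_asgi_app())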

5.2 Log Collection

Python
from fastapi import FastAPI, Request
import logging
from datetime import datetime
import json

app = FastAPI(title="Logging Inference Service")

# Configure logging to a file and to stdout
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('inference.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

@app.middleware("http")
async def log_requests(request: Request, call_next):
    """
    Log every request and response
    """
    start_time = datetime.now()

    # Log request information
    log_data = {
        "timestamp": start_time.isoformat(),
        "method": request.method,
        "url": str(request.url),
        "client": request.client.host if request.client else None
    }

    logger.info(f"Request: {json.dumps(log_data)}")

    # Handle the request
    response = await call_next(request)

    # Log response information
    end_time = datetime.now()
    process_time = (end_time - start_time).total_seconds()

    log_data.update({
        "status_code": response.status_code,
        "process_time": process_time
    })

    logger.info(f"Response: {json.dumps(log_data)}")

    return response

@app.get("/")
async def root():
    """
    Root path
    """
    logger.info("Root endpoint accessed")
    return {"message": "Inference Service"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

6. Exercises

Basic Exercises

  1. Implement a simple inference API
Python
# Exercise: implement a simple inference API
from fastapi import FastAPI
app = FastAPI()

@app.post("/generate")
async def generate(prompt: str):
    # your code here
    pass
  2. Implement a caching mechanism
Python
# Exercise: implement a simple cache class
class SimpleCache:
    def __init__(self):
        # your code here
        pass

    def get(self, key):
        # your code here
        pass

    def set(self, key, value):
        # your code here
        pass

Advanced Exercises

  1. Implement auto-scaling
Python
# Exercise: implement load-based auto-scaling
class AutoScaler:
    def __init__(self, min_instances, max_instances):
        # your code here
        pass

    def check_and_scale(self, current_load):
        # your code here
        pass
  2. Implement a cost calculator
Python
# Exercise: implement a cloud inference cost calculator
class CostCalculator:
    def __init__(self, pricing):
        # your code here
        pass

    def calculate(self, instance_type, hours):
        # your code here
        pass

Project Exercise

  1. Build a complete cloud inference service
  2. Support multiple inference modes
  3. Implement auto-scaling
  4. Integrate monitoring and logging

7. Best Practices

✅ Recommended Practices

  1. Choose the right cloud service
     • Pick AWS, GCP, or Azure based on your requirements
     • Weigh cost, performance, and availability
     • Use managed services to simplify deployment

  2. Optimize cost
     • Use reserved instances
     • Implement auto-scaling
     • Use caching to cut redundant computation

  3. Monitor and alert
     • Track key metrics
     • Set alert thresholds
     • Respond to incidents promptly

❌ Practices to Avoid

  1. Over-provisioning
     • Don't default to oversized instances
     • Size to actual demand
     • Re-evaluate and optimize regularly

  2. Ignoring security
     • Encrypt traffic with HTTPS
     • Implement authentication and authorization
     • Protect API keys

  3. Flying blind
     • Don't skip performance monitoring
     • Record key metrics
     • Set up alerting

8. Summary

This chapter covered the core pieces of a cloud inference service:

  • Cloud deployment: deploying to AWS, GCP, and other platforms
  • API services: building inference services with FastAPI
  • Cost optimization: instance selection, auto-scaling, and caching
  • Monitoring and logging: performance monitoring and log collection

Building a high-quality cloud inference service means balancing performance, cost, and reliability.

9. Next Steps

Continue with 05-边缘部署 (Edge Deployment) to learn how to deploy models to edge devices.