跳转至

项目 1: RAG 知识库问答系统

⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。

难度: ⭐⭐⭐ 中等 时间: 10-15 小时 涉及知识: RAG 技术、向量数据库、 LangChain 、 API 开发


📖 项目概述

项目背景

随着大语言模型的普及,企业需要一个能够基于内部文档回答用户问题的智能问答系统。传统的搜索引擎只能基于关键词匹配,无法理解用户意图;而大语言模型虽然能理解意图,但缺乏企业内部知识。 RAG ( Retrieval-Augmented Generation ,检索增强生成)技术结合了两者的优势,先从知识库中检索相关文档,然后基于检索结果生成答案。

项目目标

构建一个完整的 RAG 知识库问答系统,能够: - 上传和管理企业文档 - 自动文档切分和向量化 - 基于用户问题检索相关文档 - 生成准确的答案并引用来源 - 支持多轮对话 - 提供友好的 Web 界面

技术栈

  • 后端框架: FastAPI
  • LLM 框架: LangChain
  • 向量数据库: ChromaDB
  • 大模型: OpenAI GPT-4 / 通义千问 / 文心一言
  • 前端框架: Streamlit
  • 文档处理: PyPDF2, Python-docx
  • 文本嵌入: OpenAI Embeddings / HuggingFace Embeddings

🏗️ 项目结构

Text Only
rag-qa-system/
├── app/                      # 应用主目录
│   ├── __init__.py
│   ├── main.py              # FastAPI主应用
│   ├── config.py            # 配置文件
│   ├── document_processor.py # 文档处理模块
│   ├── vector_store.py      # 向量数据库管理
│   ├── rag_chain.py         # RAG链构建
│   └── api/                 # API路由
│       ├── __init__.py
│       ├── documents.py     # 文档管理API
│       └── chat.py          # 聊天API
├── frontend/                 # 前端目录
│   ├── app.py              # Streamlit应用
│   ├── pages/              # 页面组件
│   └── components/         # UI组件
├── data/                    # 数据目录
│   ├── documents/          # 原始文档
│   ├── processed/          # 处理后文档
│   └── vector_store/       # 向量数据库
├── tests/                   # 测试目录
│   ├── test_document_processor.py
│   ├── test_vector_store.py
│   └── test_rag_chain.py
├── utils/                   # 工具函数
│   ├── __init__.py
│   ├── text_splitter.py    # 文本切分工具
│   └── logger.py          # 日志工具
├── requirements.txt         # Python依赖
├── Dockerfile              # Docker配置
├── docker-compose.yml      # Docker Compose配置
└── README.md              # 项目说明

🎯 核心功能

1. 文档管理

  • 文档上传: 支持 PDF 、 Word 、 TXT 、 Markdown 等格式
  • 文档预处理: 自动提取文本、清理格式
  • 文档切分: 智能切分文档为合适大小的文本块
  • 文档索引: 将文本块向量化并存储到向量数据库

2. 向量检索

  • 相似度检索: 基于用户问题检索最相关的文档片段
  • 混合检索: 结合关键词检索和语义检索
  • 重排序: 对检索结果进行重新排序,提高准确性

3. 答案生成

  • 上下文构建: 将检索到的文档片段组合成上下文
  • 答案生成: 基于上下文和问题生成准确答案
  • 来源引用: 标注答案引用的文档来源
  • 置信度评分: 评估答案的可信度

4. 对话管理

  • 多轮对话: 支持连续对话,理解上下文
  • 历史记录: 保存用户对话历史
  • 会话管理: 支持多个独立会话

5. Web 界面

  • 文档管理界面: 上传、查看、删除文档
  • 聊天界面: 用户友好的对话界面
  • 结果展示: 清晰展示答案和来源引用

💻 代码实现

1. 配置文件 (app/config.py)

Python
"""
RAG系统配置文件
"""
import os
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    """应用配置"""

    # API配置
    API_HOST: str = "0.0.0.0"
    API_PORT: int = 8000
    API_PREFIX: str = "/api/v1"

    # LLM配置
    OPENAI_API_KEY: str = os.getenv("OPENAI_API_KEY", "")
    OPENAI_MODEL: str = "gpt-4o"
    OPENAI_TEMPERATURE: float = 0.7

    # Embeddings配置
    EMBEDDING_MODEL: str = "text-embedding-3-small"
    EMBEDDING_DIMENSION: int = 1536

    # 向量数据库配置
    CHROMA_PERSIST_DIR: str = "./data/vector_store"
    CHROMA_COLLECTION_NAME: str = "documents"

    # 文档处理配置
    CHUNK_SIZE: int = 500
    CHUNK_OVERLAP: int = 50
    MAX_DOCUMENTS: int = 1000

    # RAG配置
    TOP_K_RETRIEVAL: int = 3
    MIN_SIMILARITY_SCORE: float = 0.7

    # 文件上传配置
    UPLOAD_DIR: str = "./data/documents"
    MAX_FILE_SIZE: int = 10 * 1024 * 1024  # 10MB
    ALLOWED_EXTENSIONS: set = {".pdf", ".docx", ".txt", ".md"}

    class Config:
        env_file = ".env"
        case_sensitive = True

# 全局配置实例
settings = Settings()

2. 文档处理模块 (app/document_processor.py)

Python
"""
文档处理模块
"""
import os
from pathlib import Path
import PyPDF2
from docx import Document
import chardet

class DocumentProcessor:
    """文档处理器"""

    def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50):
        """
        初始化文档处理器

        Args:
            chunk_size: 文本块大小
            chunk_overlap: 文本块重叠大小
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def load_document(self, file_path: str) -> str:
        """
        加载文档内容

        Args:
            file_path: 文档路径

        Returns:
            文档文本内容
        """
        file_path = Path(file_path)
        extension = file_path.suffix.lower()

        if extension == ".pdf":
            return self._load_pdf(file_path)
        elif extension == ".docx":
            return self._load_docx(file_path)
        elif extension == ".txt":
            return self._load_txt(file_path)
        elif extension == ".md":
            return self._load_md(file_path)
        else:
            raise ValueError(f"不支持的文件格式: {extension}")

    def _load_pdf(self, file_path: Path) -> str:
        """加载PDF文档"""
        text = ""
        with open(file_path, "rb") as file:  # with自动管理文件关闭
            pdf_reader = PyPDF2.PdfReader(file)
            for page in pdf_reader.pages:
                text += page.extract_text() + "\n"
        return text

    def _load_docx(self, file_path: Path) -> str:
        """加载Word文档"""
        doc = Document(file_path)
        text = ""
        for paragraph in doc.paragraphs:
            text += paragraph.text + "\n"
        return text

    def _load_txt(self, file_path: Path) -> str:
        """加载TXT文档"""
        # 检测文件编码
        with open(file_path, "rb") as file:
            raw_data = file.read()
            result = chardet.detect(raw_data)
            encoding = result["encoding"]

        with open(file_path, "r", encoding=encoding) as file:
            return file.read()

    def _load_md(self, file_path: Path) -> str:
        """加载Markdown文档"""
        with open(file_path, "r", encoding="utf-8") as file:
            return file.read()

    def split_text(self, text: str) -> list[str]:
        """
        切分文本为块

        Args:
            text: 原始文本

        Returns:
            文本块列表
        """
        chunks = []
        start = 0

        while start < len(text):
            end = start + self.chunk_size

            # 如果不是最后一块,尝试在句子边界切分
            if end < len(text):
                # 查找最近的句号、问号、感叹号
                for sep in ["。", "!", "?", ".", "!", "?", "\n"]:
                    last_sep = text.rfind(sep, start, end)
                    if last_sep != -1:
                        end = last_sep + 1
                        break

            chunk = text[start:end].strip()
            if chunk:
                chunks.append(chunk)

            start = end - self.chunk_overlap

        return chunks

    def process_document(self, file_path: str) -> list[str]:
        """
        处理文档:加载和切分

        Args:
            file_path: 文档路径

        Returns:
            文本块列表
        """
        # 加载文档
        text = self.load_document(file_path)

        # 清理文本
        text = self._clean_text(text)

        # 切分文本
        chunks = self.split_text(text)

        return chunks

    def _clean_text(self, text: str) -> str:
        """清理文本"""
        # 移除多余空行
        text = "\n".join(line.strip() for line in text.split("\n") if line.strip())  # 链式调用:strip去除空白
        return text

3. 向量数据库管理 (app/vector_store.py)

Python
"""
向量数据库管理模块
"""
import os
from typing import Any
from pathlib import Path
import chromadb
from chromadb.config import Settings as ChromaSettings
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.documents import Document

class VectorStoreManager:
    """向量数据库管理器"""

    def __init__(
        self,
        persist_directory: str = "./data/vector_store",
        collection_name: str = "documents",
        embedding_model: str = "text-embedding-3-small"
    ):
        """
        初始化向量数据库管理器

        Args:
            persist_directory: 持久化目录
            collection_name: 集合名称
            embedding_model: 嵌入模型
        """
        self.persist_directory = persist_directory
        self.collection_name = collection_name
        self.embedding_model = embedding_model

        # 创建持久化目录
        Path(persist_directory).mkdir(parents=True, exist_ok=True)

        # 初始化嵌入模型
        self.embeddings = OpenAIEmbeddings(
            model=embedding_model,
            openai_api_key=os.getenv("OPENAI_API_KEY")
        )

        # 初始化向量数据库
        self._init_vector_store()

    def _init_vector_store(self):
        """初始化向量数据库"""
        self.vector_store = Chroma(
            collection_name=self.collection_name,
            embedding_function=self.embeddings,
            persist_directory=self.persist_directory
        )

    def add_documents(
        self,
        chunks: list[str],
        metadata: list[dict[str, Any]] | None = None
    ) -> list[str]:
        """
        添加文档到向量数据库

        Args:
            chunks: 文本块列表
            metadata: 元数据列表

        Returns:
            文档ID列表
        """
        # 创建Document对象
        documents = []
        for i, chunk in enumerate(chunks):  # enumerate同时获取索引和元素
            doc_metadata = metadata[i] if metadata and i < len(metadata) else {}  # 三元表达式:metadata存在且索引有效时取值,否则用空字典
            doc = Document(
                page_content=chunk,
                metadata=doc_metadata
            )
            documents.append(doc)

        # 添加到向量数据库
        ids = self.vector_store.add_documents(documents)

        return ids

    def similarity_search(
        self,
        query: str,
        k: int = 3,
        filter: dict[str, Any] | None = None
    ) -> list[Document]:
        """
        相似度搜索

        Args:
            query: 查询文本
            k: 返回结果数量
            filter: 过滤条件

        Returns:
            相关文档列表
        """
        results = self.vector_store.similarity_search(
            query=query,
            k=k,
            filter=filter
        )
        return results

    def similarity_search_with_score(
        self,
        query: str,
        k: int = 3,
        filter: dict[str, Any] | None = None
    ) -> list[tuple[Document, float]]:
        """
        带分数的相似度搜索

        Args:
            query: 查询文本
            k: 返回结果数量
            filter: 过滤条件

        Returns:
            (文档, 相似度分数)列表
        """
        results = self.vector_store.similarity_search_with_score(
            query=query,
            k=k,
            filter=filter
        )
        return results

    def delete_collection(self):
        """删除集合"""
        # ChromaDB不支持直接删除集合,需要重新初始化
        import shutil
        if Path(self.persist_directory).exists():
            shutil.rmtree(self.persist_directory)
        self._init_vector_store()

    def get_collection_stats(self) -> dict[str, Any]:
        """获取集合统计信息"""
        collection = self.vector_store._collection
        count = collection.count()
        return {
            "document_count": count,
            "collection_name": self.collection_name
        }

4. RAG 链构建 (app/rag_chain.py)

Python
"""
RAG链构建模块
"""
from typing import Any
from langchain_openai import ChatOpenAI
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.prompts import ChatPromptTemplate
from app.vector_store import VectorStoreManager
from app.config import settings

class RAGChain:
    """RAG链"""

    def __init__(self, vector_store: VectorStoreManager):
        """
        初始化RAG链

        Args:
            vector_store: 向量数据库管理器
        """
        self.vector_store = vector_store

        # 初始化LLM
        self.llm = ChatOpenAI(
            model=settings.OPENAI_MODEL,
            temperature=settings.OPENAI_TEMPERATURE,
            openai_api_key=settings.OPENAI_API_KEY
        )

        # 创建检索器
        self.retriever = vector_store.vector_store.as_retriever(
            search_kwargs={"k": settings.TOP_K_RETRIEVAL}
        )

        # 创建RAG链
        self.qa_chain = self._create_qa_chain()

    def _create_qa_chain(self):
        """创建问答链(LCEL方式,替代已废弃的 RetrievalQA)"""

        # 定义提示模板
        system_prompt = """你是一个智能助手,请基于以下已知信息回答用户的问题。
如果已知信息不足以回答问题,请明确说明。
请用中文回答,并在回答中引用相关信息的来源。

{context}"""

        prompt = ChatPromptTemplate.from_messages([
            ("system", system_prompt),
            ("human", "{input}")
        ])

        # 创建问答链
        question_answer_chain = create_stuff_documents_chain(self.llm, prompt)
        qa_chain = create_retrieval_chain(self.retriever, question_answer_chain)

        return qa_chain

    def query(self, question: str) -> dict[str, Any]:
        """
        查询RAG系统

        Args:
            question: 用户问题

        Returns:
            包含答案和来源的字典
        """
        # 执行查询
        result = self.qa_chain.invoke({"input": question})

        # 提取答案和来源
        answer = result["answer"]
        source_documents = result["context"]

        # 提取来源信息
        sources = []
        for doc in source_documents:
            source_info = {
                "content": doc.page_content,
                "metadata": doc.metadata
            }
            sources.append(source_info)

        return {
            "answer": answer,
            "sources": sources,
            "question": question
        }

    def query_with_score(self, question: str) -> dict[str, Any]:
        """
        带相似度分数的查询

        Args:
            question: 用户问题

        Returns:
            包含答案、来源和分数的字典
        """
        # 检索相关文档
        docs_with_scores = self.vector_store.similarity_search_with_score(
            query=question,
            k=settings.TOP_K_RETRIEVAL
        )

        # 过滤低相似度文档
        filtered_docs = [
            (doc, score) for doc, score in docs_with_scores
            if score <= (1 - settings.MIN_SIMILARITY_SCORE)
        ]

        if not filtered_docs:
            return {
                "answer": "抱歉,我没有找到相关信息来回答您的问题。",
                "sources": [],
                "question": question,
                "confidence": 0.0
            }

        # 构建上下文
        context = "\n\n".join([doc.page_content for doc, _ in filtered_docs])

        # 生成答案
        prompt = f"""基于以下信息回答问题:

{context}

问题:{question}

请用中文回答。"""

        response = self.llm.invoke(prompt)

        # 提取来源信息
        sources = []
        for doc, score in filtered_docs:
            source_info = {
                "content": doc.page_content,
                "metadata": doc.metadata,
                "similarity_score": float(1 - score)
            }
            sources.append(source_info)

        # 计算平均置信度
        avg_confidence = sum(1 - score for _, score in filtered_docs) / len(filtered_docs)

        return {
            "answer": response.content,
            "sources": sources,
            "question": question,
            "confidence": avg_confidence
        }

5. FastAPI 主应用 (app/main.py)

Python
"""
FastAPI主应用
"""
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pathlib import Path
import shutil
import uvicorn

from app.config import settings
from app.document_processor import DocumentProcessor
from app.vector_store import VectorStoreManager
from app.rag_chain import RAGChain
from app.api import documents, chat

# 创建FastAPI应用
app = FastAPI(
    title="RAG知识库问答系统",
    description="基于RAG技术的智能问答系统",
    version="1.0.0"
)

# 配置CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 初始化组件
document_processor = DocumentProcessor(
    chunk_size=settings.CHUNK_SIZE,
    chunk_overlap=settings.CHUNK_OVERLAP
)
vector_store_manager = VectorStoreManager(
    persist_directory=settings.CHROMA_PERSIST_DIR,
    collection_name=settings.CHROMA_COLLECTION_NAME,
    embedding_model=settings.EMBEDDING_MODEL
)
rag_chain = RAGChain(vector_store_manager)

# 创建上传目录
Path(settings.UPLOAD_DIR).mkdir(parents=True, exist_ok=True)

# 注册路由
app.include_router(
    documents.router,
    prefix=settings.API_PREFIX,
    tags=["documents"]
)
app.include_router(
    chat.router,
    prefix=settings.API_PREFIX,
    tags=["chat"]
)

@app.get("/")
async def root():  # async def定义协程函数
    """根路径"""
    return {
        "message": "RAG知识库问答系统",
        "version": "1.0.0",
        "docs": "/docs"
    }

@app.get("/health")
async def health_check():
    """健康检查"""
    return {"status": "healthy"}

if __name__ == "__main__":
    uvicorn.run(
        "app.main:app",
        host=settings.API_HOST,
        port=settings.API_PORT,
        reload=True
    )

6. 文档管理 API (app/API/documents.py)

Python
"""
文档管理API
"""
from fastapi import APIRouter, UploadFile, File, HTTPException
from fastapi.responses import JSONResponse
from pathlib import Path
import shutil
import uuid

from app.config import settings
from app.document_processor import DocumentProcessor
from app.vector_store import VectorStoreManager

router = APIRouter()

# 初始化组件
document_processor = DocumentProcessor(
    chunk_size=settings.CHUNK_SIZE,
    chunk_overlap=settings.CHUNK_OVERLAP
)
vector_store_manager = VectorStoreManager(
    persist_directory=settings.CHROMA_PERSIST_DIR,
    collection_name=settings.CHROMA_COLLECTION_NAME,
    embedding_model=settings.EMBEDDING_MODEL
)

@router.post("/documents/upload")
async def upload_document(file: UploadFile = File(...)):
    """
    上传文档

    Args:
        file: 上传的文件

    Returns:
        上传结果
    """
    # 检查文件扩展名
    file_extension = Path(file.filename).suffix.lower()
    if file_extension not in settings.ALLOWED_EXTENSIONS:
        raise HTTPException(
            status_code=400,
            detail=f"不支持的文件格式: {file_extension}"
        )

    # 检查文件大小
    content = await file.read()  # await等待异步操作完成
    if len(content) > settings.MAX_FILE_SIZE:
        raise HTTPException(
            status_code=400,
            detail=f"文件大小超过限制: {settings.MAX_FILE_SIZE} bytes"
        )

    # 生成唯一文件名
    file_id = str(uuid.uuid4())
    file_path = Path(settings.UPLOAD_DIR) / f"{file_id}{file_extension}"

    # 保存文件
    with open(file_path, "wb") as f:
        f.write(content)

    try:  # try/except捕获异常,防止程序崩溃
        # 处理文档
        chunks = document_processor.process_document(str(file_path))

        # 准备元数据
        metadata = []
        for i, chunk in enumerate(chunks):
            metadata.append({
                "file_id": file_id,
                "file_name": file.filename,
                "chunk_index": i,
                "total_chunks": len(chunks)
            })

        # 添加到向量数据库
        ids = vector_store_manager.add_documents(chunks, metadata)

        return JSONResponse(
            status_code=200,
            content={
                "message": "文档上传成功",
                "file_id": file_id,
                "file_name": file.filename,
                "chunks_count": len(chunks),
                "document_ids": ids
            }
        )
    except Exception as e:
        # 删除文件
        if file_path.exists():
            file_path.unlink()
        raise HTTPException(status_code=500, detail=str(e))

@router.get("/documents/stats")
async def get_documents_stats():
    """
    获取文档统计信息

    Returns:
        统计信息
    """
    stats = vector_store_manager.get_collection_stats()
    return stats

@router.delete("/documents")
async def delete_all_documents():
    """
    删除所有文档

    Returns:
        删除结果
    """
    vector_store_manager.delete_collection()
    return {"message": "所有文档已删除"}

7. 聊天 API (app/API/chat.py)

Python
"""
聊天API
"""
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

from app.config import settings
from app.rag_chain import RAGChain
from app.vector_store import VectorStoreManager

router = APIRouter()

# 初始化组件
vector_store_manager = VectorStoreManager(
    persist_directory=settings.CHROMA_PERSIST_DIR,
    collection_name=settings.CHROMA_COLLECTION_NAME,
    embedding_model=settings.EMBEDDING_MODEL
)
rag_chain = RAGChain(vector_store_manager)

class ChatRequest(BaseModel):  # Pydantic BaseModel:自动数据验证和序列化
    """聊天请求"""
    question: str
    use_score: bool = False

class ChatResponse(BaseModel):
    """聊天响应"""
    answer: str
    sources: list[dict]
    question: str
    confidence: float | None = None

@router.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """
    聊天接口

    Args:
        request: 聊天请求

    Returns:
        聊天响应
    """
    try:
        if request.use_score:
            result = rag_chain.query_with_score(request.question)
        else:
            result = rag_chain.query(request.question)

        return result
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

8. Streamlit 前端 (frontend/app.py)

Python
"""
Streamlit前端应用
"""
import streamlit as st
import requests

# 配置页面
st.set_page_config(
    page_title="RAG知识库问答系统",
    page_icon="🤖",
    layout="wide"
)

# API配置
API_BASE_URL = "http://localhost:8000/api/v1"

def main():
    """主函数"""
    st.title("🤖 RAG知识库问答系统")

    # 侧边栏
    with st.sidebar:
        st.header("文档管理")

        # 上传文档
        uploaded_file = st.file_uploader(
            "上传文档",
            type=["pdf", "docx", "txt", "md"],
            help="支持PDF、Word、TXT、Markdown格式"
        )

        if uploaded_file:
            if st.button("上传"):
                with st.spinner("正在上传和处理文档..."):
                    try:
                        files = {"file": uploaded_file}
                        response = requests.post(
                            f"{API_BASE_URL}/documents/upload",
                            files=files
                        )

                        if response.status_code == 200:
                            st.success("文档上传成功!")
                            st.json(response.json())
                        else:
                            st.error(f"上传失败: {response.text}")
                    except Exception as e:
                        st.error(f"上传失败: {str(e)}")

        st.divider()

        # 文档统计
        st.subheader("文档统计")
        if st.button("刷新统计"):
            try:
                response = requests.get(f"{API_BASE_URL}/documents/stats")
                if response.status_code == 200:
                    stats = response.json()
                    st.json(stats)
            except Exception as e:
                st.error(f"获取统计失败: {str(e)}")

        st.divider()

        # 删除所有文档
        st.subheader("危险操作")
        confirm_delete = st.checkbox("我确认要删除所有文档(不可恢复)")
        if st.button("删除所有文档", type="primary", disabled=not confirm_delete):
            try:
                response = requests.delete(f"{API_BASE_URL}/documents")
                if response.status_code == 200:
                    st.success("所有文档已删除")
            except Exception as e:
                st.error(f"删除失败: {str(e)}")

    # 主区域 - 聊天界面
    st.header("智能问答")

    # 初始化聊天历史
    if "messages" not in st.session_state:
        st.session_state.messages = []

    # 显示聊天历史
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

    # 用户输入
    if prompt := st.chat_input("请输入您的问题..."):  # 海象运算符:=:赋值并判断——获取用户输入的同时检查是否非空
        # 显示用户消息
        st.session_state.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.markdown(prompt)

        # 获取AI回复
        with st.chat_message("assistant"):
            with st.spinner("正在思考..."):
                try:
                    response = requests.post(
                        f"{API_BASE_URL}/chat",
                        json={"question": prompt, "use_score": True}
                    )

                    if response.status_code == 200:
                        result = response.json()

                        # 显示答案
                        st.markdown(result["answer"])

                        # 显示来源
                        if result["sources"]:
                            st.divider()
                            st.subheader("📚 参考来源")
                            for i, source in enumerate(result["sources"], 1):
                                with st.expander(f"来源 {i}"):
                                    st.write(f"**相似度**: {source.get('similarity_score', 'N/A'):.2%}")
                                    st.write(f"**文件名**: {source['metadata'].get('file_name', 'N/A')}")
                                    st.write(f"**内容片段**:")
                                    st.write(source["content"])

                        # 显示置信度
                        if "confidence" in result:
                            st.divider()
                            st.write(f"**置信度**: {result['confidence']:.2%}")

                        # 保存到历史
                        st.session_state.messages.append({
                            "role": "assistant",
                            "content": result["answer"]
                        })
                    else:
                        st.error(f"请求失败: {response.text}")
                except Exception as e:
                    st.error(f"请求失败: {str(e)}")

if __name__ == "__main__":
    main()

9. 依赖文件 (requirements.txt)

Text Only
fastapi==0.115.0
uvicorn[standard]==0.32.0
python-multipart==0.0.12
pydantic==2.10.0
pydantic-settings==2.6.0
langchain==0.3.7
openai==1.55.0
chromadb==0.5.20
pypdf2==3.0.1
python-docx==1.1.0
chardet==5.2.0
streamlit==1.40.0
requests==2.32.0
python-dotenv==1.0.1

10. 环境变量文件 (.env.example)

Text Only
# OpenAI API配置
OPENAI_API_KEY=your_openai_api_key_here

# 可选:使用其他LLM
# OPENAI_BASE_URL=https://yz520gzy.top/v1
# OPENAI_MODEL=deepseek-chat

🚀 部署说明

1. 本地部署

步骤 1: 克隆项目

Bash
git clone https://github.com/yourusername/rag-qa-system.git
cd rag-qa-system

步骤 2: 创建虚拟环境

Bash
# 创建虚拟环境
python -m venv venv

# 激活虚拟环境
# Windows
venv\Scripts\activate
# Linux/Mac
source venv/bin/activate

步骤 3: 安装依赖

Bash
pip install -r requirements.txt

步骤 4: 配置环境变量

Bash
# 复制环境变量模板
cp .env.example .env

# 编辑.env文件,填入你的API密钥
# OPENAI_API_KEY=your_api_key_here

步骤 5: 启动后端服务

Bash
# 启动FastAPI服务
python -m app.main

# 或使用uvicorn
uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload

步骤 6: 启动前端服务

Bash
# 新开一个终端
cd frontend
streamlit run app.py

步骤 7: 访问应用

2. Docker 部署

步骤 1: 构建镜像

Bash
docker build -t rag-qa-system .

步骤 2: 运行容器

Bash
docker run -d \
  --name rag-qa \
  -p 8000:8000 \
  -p 8501:8501 \
  -e OPENAI_API_KEY=your_api_key_here \
  rag-qa-system

步骤 3: 使用 Docker Compose

Bash
docker-compose up -d

3. 云部署

部署到阿里云

  1. 购买 ECS 实例
  2. 安装 Docker
  3. 上传代码
  4. 运行 Docker 容器

部署到腾讯云

  1. 购买 CVM 实例
  2. 安装 Docker
  3. 上传代码
  4. 运行 Docker 容器

部署到 AWS

  1. 使用 EC2 服务
  2. 使用 Elastic Beanstalk
  3. 或使用 AWS App Runner

🔧 扩展方向

1. 功能扩展

  • 多模态支持: 支持图片、视频等多媒体文档
  • 多语言支持: 支持中英文等多语言问答
  • 用户管理: 添加用户认证和权限管理
  • 文档版本控制: 支持文档版本管理
  • 高级检索: 支持布尔查询、范围查询等

2. 性能优化

  • 缓存机制: 添加 Redis 缓存常用查询
  • 异步处理: 使用异步 IO 提高性能
  • 负载均衡: 使用 Nginx 做负载均衡
  • 数据库优化: 优化向量数据库配置

3. 用户体验

  • 实时反馈: 显示处理进度
  • 导出功能: 支持导出对话记录
  • 分享功能: 支持分享问答结果
  • 移动端适配: 优化移动端体验

4. 企业功能

  • API 限流: 添加 API 访问限流
  • 日志审计: 完整的操作日志
  • 监控告警: 系统监控和告警
  • 数据备份: 自动数据备份

📚 学习收获

完成本项目后,你将掌握:

  • RAG 技术原理: 理解检索增强生成的核心思想
  • 向量数据库: 掌握 ChromaDB 等向量数据库的使用
  • LangChain 框架: 熟练使用 LangChain 构建 LLM 应用
  • API 开发: 掌握 FastAPI 开发 RESTful API
  • 前端开发: 使用 Streamlit 快速构建 Web 应用
  • 文档处理: 掌握各种文档格式的处理方法
  • 文本嵌入: 理解文本嵌入和相似度计算
  • 系统设计: 学习完整的系统架构设计

🎉 开始学习

现在你已经了解了整个 RAG 系统的实现,开始动手构建你自己的知识库问答系统吧!

推荐学习顺序: 1. 先运行后端 API ,测试基本功能 2. 然后运行前端界面,体验完整流程 3. 修改代码,添加新功能 4. 部署到云服务器,分享给他人使用

祝你学习顺利! 💪