
08. Next-Generation AI Agents: From Tool Use to Autonomous Execution

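⚠️ Timeliness note: this chapter covers frontier models, pricing, and leaderboards, all of which change quickly between releases. Treat the original papers, official release pages, and API documentation as authoritative.

📌 Scope note: this chapter focuses on frontier research and products for the new generation of agents (Manus, Claude Code, Operator, and others). For hands-on agent coding (writing a framework by hand, building multi-agent systems), see AI Agent 开发实战 (AI Agent Development in Practice).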

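Contents

  1. The Evolution of AI Agents
  2. Manus: A Breakthrough in General-Purpose AI Agents
  3. Claude Code and Claude Cowork (including 3.4 Claude Skills: Reusable Agent Skills)
  4. OpenAI Operator and Computer Use
  5. AI Agent Architecture Design
  6. Multi-Agent Collaboration Systems
  7. Agent Safety and Controllability
  8. Hands-On Project: Building an Autonomous Research Agent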

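1. The Evolution of AI Agents

1.1 The Paradigm Shift from LLM to Agent

AI agents mark a fundamental shift from passive response to active execution: rather than only producing text, the system plans, calls tools, and acts on its environment.

Limitations of a traditional LLM: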

Text Only
┌─────────────────────────────────────────────────────────┐
│               Traditional LLM interaction               │
├─────────────────────────────────────────────────────────┤
│  User input → LLM inference → text output               │
│                                                         │
│  Limitations:                                           │
│  • Cannot take real-world actions                       │
│  • No memory across conversations                       │
│  • Cannot access external systems                       │
│  • Each exchange is stateless                           │
└─────────────────────────────────────────────────────────┘

Core capabilities of an AI agent:

Text Only
┌─────────────────────────────────────────────────────────┐
│                  AI Agent architecture                  │
├─────────────────────────────────────────────────────────┤
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐ │
│  │ Perception  │ →  │ Reasoning   │ →  │ Action      │ │
│  │ layer       │    │ layer       │    │ layer       │ │
│  └─────────────┘    └─────────────┘    └─────────────┘ │
│         ↑                                    ↓         │
│         └─────────── Memory layer ───────────┘         │
└─────────────────────────────────────────────────────────┘

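1.2 Agent Capability Levels

| Level | Name | Capability | Representative systems |
|---|---|---|---|
| L1 | Simple tool calling | Calls predefined APIs | GPT-4o-mini + Function Calling |
| L2 | Multi-step planning | Decomposes a task and executes the steps | AutoGPT, LangChain Agents |
| L3 | Environment awareness | Understands and operates a digital environment | Claude Computer Use |
| L4 | Autonomous decision-making | Adjusts goals dynamically while executing | Manus, Operator |
| L5 | General-purpose agent | Adapts and learns across domains | Future goal |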

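1.3 ReAct and the Core Agent Paradigm

ReAct (Reasoning + Acting) is the foundational pattern behind most agents: the model alternates between a reasoning step, a tool call, and reading the tool's result.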

Python
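from typing import Dict


class ReActAgent:
    """
    ReAct: Synergizing Reasoning and Acting in Language Models

    Core idea: interleave reasoning steps with actions.
    Prompt building and response parsing (_build_react_prompt, _parse_thought,
    _parse_action, is_complete, generate_answer) are left to the reader.
    """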
    def __init__(self, llm, tools, max_iterations=10):
        self.llm = llm
        self.tools = {tool.name: tool for tool in tools}
        self.max_iterations = max_iterations
        self.memory = []

    def run(self, query: str) -> str:
        """Run the ReAct loop."""
        self.memory.append(f"Task: {query}")

        for i in range(self.max_iterations):
            # 1. Thought
            thought = self.think()

            # 2. Action
            action = self.act(thought)

            # 3. Observation
            observation = self.execute_action(action)

            # 4. Update memory
            self.memory.extend([
                f"Thought {i+1}: {thought}",
                f"Action {i+1}: {action}",
                f"Observation {i+1}: {observation}"
            ])

            # 5. Check for completion
            if self.is_complete(observation):
                return self.generate_answer()

        return "Max iterations reached"

    def think(self) -> str:
        """Generate the next reasoning step."""
        prompt = self._build_react_prompt()
        response = self.llm.generate(prompt)
        return self._parse_thought(response)

    def act(self, thought: str) -> Dict:
        """Choose an action based on the thought."""
        prompt = f"""Based on the thought: {thought}

Available tools: {list(self.tools.keys())}

Choose the next action in format:
Action: [tool_name]
Action Input: [input]"""

        response = self.llm.generate(prompt)
        return self._parse_action(response)

    def execute_action(self, action: Dict) -> str:
        """Run the selected tool."""
        tool_name = action['tool']
        tool_input = action['input']

        if tool_name not in self.tools:
            return f"Error: Tool {tool_name} not found"

        try:  # return tool failures as observations instead of crashing the loop
            result = self.tools[tool_name].run(tool_input)
            return str(result)
        except Exception as e:
            return f"Error: {str(e)}"
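The class above leaves its prompt-building and parsing helpers undefined, so it cannot run as written. The following is a minimal, self-contained sketch of the same Thought / Action / Observation loop. The `scripted_llm` function is a hypothetical stand-in for a real model, the `calculator` tool is a toy, and the `Action:` / `Action Input:` / `Final Answer:` text format is an assumption based on the prompt shown above.

```python
import re
from typing import Callable, Dict, List


def scripted_llm(transcript: List[str]) -> str:
    """Hypothetical stand-in for an LLM: answers once it has seen an observation."""
    if any(line.startswith("Observation:") for line in transcript):
        return "Thought: I have the result.\nFinal Answer: " + transcript[-1].split(": ", 1)[1]
    return "Thought: I should compute this.\nAction: calculator\nAction Input: 6 * 7"


def calculator(expression: str) -> str:
    """Toy tool: multiplies two integers written as 'a * b'."""
    left, right = (int(part) for part in expression.split("*"))
    return str(left * right)


def react(task: str, llm: Callable[[List[str]], str],
          tools: Dict[str, Callable[[str], str]], max_iterations: int = 5) -> str:
    transcript = [f"Task: {task}"]
    for _ in range(max_iterations):
        reply = llm(transcript)          # Thought (+ Action or Final Answer)
        transcript.append(reply)
        final = re.search(r"Final Answer:\s*(.*)", reply)
        if final:
            return final.group(1).strip()
        action = re.search(r"Action:\s*(\S+)\s*\nAction Input:\s*(.*)", reply)
        if not action:
            transcript.append("Observation: could not parse an action")
            continue
        name, tool_input = action.group(1), action.group(2).strip()
        tool = tools.get(name)
        observation = tool(tool_input) if tool else f"unknown tool {name}"
        transcript.append(f"Observation: {observation}")
    return "Max iterations reached"


answer = react("What is 6 * 7?", scripted_llm, {"calculator": calculator})
print(answer)
```

The loop ends either when the model emits a final answer or when the iteration budget runs out, which is the same safeguard `max_iterations` provides in the class above.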

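2. Manus: A Breakthrough in General-Purpose AI Agents

2.1 Manus Overview

Manus (Latin for "hand") was released in March 2025 by the Chinese team behind Monica.im. It was presented as one of the first general-purpose AI agents able to carry out complex tasks autonomously from start to finish.

Key advances: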

Text Only
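┌────────────────────────────────────────────────────────────┐
│                    Manus core features                     │
├────────────────────────────────────────────────────────────┤
│                                                            │
│  1. End-to-end task execution                              │
│     • A closed loop from requirements to deliverables      │
│     • Handles multi-step, cross-platform tasks             │
│                                                            │
│  2. Multi-agent collaboration                              │
│     • Planning agent + execution agent + validation agent  │
│     • Works like a software engineering team               │
│                                                            │
│  3. Virtual-machine isolation                              │
│     • Actions run inside a secure sandbox                  │
│     • Code execution, file operations, web browsing        │
│                                                            │
│  4. Adaptive learning                                      │
│     • Learns from execution history                        │
│     • Task quality improves over time                      │
│                                                            │
└────────────────────────────────────────────────────────────┘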

2.2 Manus Architecture

Python
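import time
from typing import List


class ManusAgent:
    """
    Sketch of a Manus-style general-purpose agent (inferred from public information;
    TaskPlan, TaskResult, ExecutionEngine, etc. are illustrative placeholders).

    Core components:
    1. Task Planner
    2. Execution Engine
    3. Tool Integration layer
    4. Memory System
    5. Validator
    """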

    def __init__(self):
        self.planner = TaskPlanner(llm="claude-3-5-sonnet")
        self.executor = ExecutionEngine()
        self.tool_registry = ToolRegistry()
        self.memory = HierarchicalMemory()
        self.validator = OutputValidator()

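    def execute_task(self, user_request: str) -> TaskResult:
        """
        End-to-end task execution.

        Args:
            user_request: the user's natural-language request

        Returns:
            TaskResult: the output plus intermediate steps and logs
        """
        # Phase 1: understand the task and plan
        task_plan = self.planner.create_plan(user_request)

        # Phase 2: execution loop. A queue is used so that a revised plan
        # actually changes which steps run next.
        execution_log = []
        pending = list(task_plan.steps)
        while pending:
            step = pending.pop(0)
            result = self._execute_step(step)
            execution_log.append(result)

            # Replan dynamically if needed
            # (assumes replan() returns only the steps still to be run)
            if result.needs_replanning:
                task_plan = self.planner.replan(task_plan, execution_log)
                pending = list(task_plan.steps)

            # Update memory
            self.memory.add_execution_step(step, result)

        # Phase 3: validate and deliver the result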
        final_output = self._assemble_output(execution_log)
        validation = self.validator.validate(final_output, user_request)

        return TaskResult(
            output=final_output,
            execution_log=execution_log,
            validation_score=validation.score,
            artifacts=self._collect_artifacts()
        )

    def _execute_step(self, step: ExecutionStep) -> StepResult:
        """Dispatch a single step to the handler for its type."""
        if step.type == "web_browsing":
            return self._browse_web(step.params)
        elif step.type == "code_execution":
            return self._execute_code(step.params)
        elif step.type == "file_operation":
            return self._operate_file(step.params)
        elif step.type == "api_call":
            return self._call_api(step.params)
        elif step.type == "human_confirmation":
            return self._request_confirmation(step.params)
        else:
            raise ValueError(f"Unknown step type: {step.type}")

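class TaskPlanner:
    """Task planner: breaks a complex request into executable steps."""

    def __init__(self, llm):
        # LLM client exposing generate(prompt, format=...) (assumed interface)
        self.llm = llm

    def create_plan(self, request: str) -> TaskPlan:
        """
        Create a task execution plan.

        Uses chain-of-thought prompting to decompose the task.
        """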
        planning_prompt = f"""You are an expert task planner. Break down the following request into clear, executable steps.

Request: {request}

For each step, specify:
1. Step type (web_browsing, code_execution, file_operation, api_call, human_confirmation)
2. Required parameters
3. Expected output
4. Dependencies on previous steps

Output the plan in structured JSON format."""

        plan_json = self.llm.generate(planning_prompt, format="json")
        return TaskPlan.from_json(plan_json)

    def replan(self, current_plan: TaskPlan, execution_log: List[StepResult]) -> TaskPlan:
        """Adjust the plan based on execution feedback."""
        context = self._build_replanning_context(current_plan, execution_log)

        replan_prompt = f"""The current plan needs adjustment based on execution results.

Current Plan: {current_plan.to_json()}
Execution Log: {execution_log}

Please provide an updated plan that:
1. Addresses the issues encountered
2. Incorporates new information discovered
3. Optimizes remaining steps

Output the revised plan."""

        new_plan_json = self.llm.generate(replan_prompt, format="json")
        return TaskPlan.from_json(new_plan_json)

class HierarchicalMemory:
    """Hierarchical memory system."""

    def __init__(self):
        self.working_memory = []  # context for the current task
        self.episodic_memory = []  # experience from past tasks
        self.semantic_memory = {}  # knowledge base

    def add_execution_step(self, step: ExecutionStep, result: StepResult):
        """Append an executed step to working memory."""
        self.working_memory.append({
            "step": step,
            "result": result,
            "timestamp": time.time()
        })

    def get_relevant_context(self, current_step: ExecutionStep) -> str:
        """Retrieve context relevant to the current step."""
        # Related steps from working memory
        relevant_working = self._retrieve_from_working_memory(current_step)

        # Similar cases from past episodes
        similar_episodes = self._retrieve_from_episodic_memory(current_step)

        # Related knowledge
        relevant_knowledge = self._retrieve_from_semantic_memory(current_step)

        return self._format_context(relevant_working, similar_episodes, relevant_knowledge)
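The planning prompt in `TaskPlanner` asks the model for each step's type and its dependencies on earlier steps, but nothing above checks or uses them. As a hedged sketch, the helper below validates a JSON plan and derives an execution order from the dependencies. The plan schema (`steps`, `id`, `type`, `depends_on`) is an assumption made for illustration; the source does not specify one.

```python
import json
from graphlib import TopologicalSorter  # standard library, Python 3.9+

ALLOWED_STEP_TYPES = {"web_browsing", "code_execution", "file_operation",
                      "api_call", "human_confirmation"}


def order_plan(plan_json: str) -> list[str]:
    """Validate a JSON plan and return step ids in a dependency-respecting order."""
    steps = json.loads(plan_json)["steps"]
    by_id = {step["id"]: step for step in steps}
    for step in steps:
        if step["type"] not in ALLOWED_STEP_TYPES:
            raise ValueError(f"Unknown step type: {step['type']}")
        for dep in step.get("depends_on", []):
            if dep not in by_id:
                raise ValueError(f"Step {step['id']} depends on missing step {dep}")
    graph = {step["id"]: set(step.get("depends_on", [])) for step in steps}
    # static_order() raises graphlib.CycleError if the dependencies are circular.
    return list(TopologicalSorter(graph).static_order())


example = json.dumps({"steps": [
    {"id": "report", "type": "file_operation", "depends_on": ["fetch", "chart"]},
    {"id": "chart", "type": "code_execution", "depends_on": ["fetch"]},
    {"id": "fetch", "type": "web_browsing"},
]})
print(order_plan(example))
```

Validating the plan before execution means a malformed or cyclic plan fails fast, instead of surfacing halfway through a long task.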

2.3 Manus Use Cases

Python
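# Examples: how Manus works through complex tasks

# Scenario 1: stock analysis report
def stock_analysis_example():
    """
    User request: "Analyze Tesla's stock performance over the past month and
                   produce an investment report covering technicals and fundamentals"
    """

    manus = ManusAgent()

    result = manus.execute_task("""
    Analyze Tesla (TSLA) stock performance over the past month, including:
    1. Price trend and technical indicators
    2. Impact of recent news and events
    3. Comparison of financial data
    4. An investment analysis report in PDF format
    """)

    # Manus's execution flow:
    # 1. Search for Tesla price data
    # 2. Compute technical indicators (MA, RSI, MACD, etc.)
    # 3. Search for related news
    # 4. Look up financial statements
    # 5. Write Python code to generate charts
    # 6. Write the analysis report
    # 7. Convert to PDF
    # 8. Deliver the final report

    return result.artifacts['report.pdf']

# Scenario 2: building a website
def website_building_example():
    """
    User request: "Create a personal blog website for me"
    """

    manus = ManusAgent()

    result = manus.execute_task("""
    Create a modern personal blog website with:
    1. Responsive design
    2. Markdown article support
    3. An About page and a contact form
    4. Deployment to Vercel
    """)

    # Manus's execution flow:
    # 1. Design the site architecture
    # 2. Create a Next.js project
    # 3. Write the component code
    # 4. Configure styles and theme
    # 5. Implement Markdown rendering
    # 6. Add the contact form
    # 7. Test and debug
    # 8. Deploy to Vercel
    # 9. Return the access link

    return result.artifacts['deployment_url']

# Scenario 3: data processing pipeline
def data_pipeline_example():
    """
    User request: "Merge sales data from several sources and build a dashboard"
    """

    manus = ManusAgent()

    result = manus.execute_task("""
    Build a sales data integration pipeline:
    1. Read offline sales data from CSV files
    2. Fetch online sales data through an API
    3. Clean and standardize the data format
    4. Compute key metrics (revenue, growth rate, etc.)
    5. Create an interactive dashboard
    """)

    return result.artifacts['dashboard.html']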

2.4 Manus Technical Highlights

Text Only
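┌─────────────────────────────────────────────────────────────┐
│                 Manus technical highlights                  │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1. Asynchronous and parallel execution                     │
│     ┌─────────────────────────────────────────────────┐    │
│     │  Task A ──┐                                     │    │
│     │           ├──→ [parallel engine] → merge results│    │
│     │  Task B ──┘                                     │    │
│     └─────────────────────────────────────────────────┘    │
│                                                             │
│  2. Intelligent error recovery                              │
│     • Detects execution failures automatically              │
│     • Diagnoses the cause and picks a recovery strategy     │
│     • Supports retry, alternatives, human handoff           │
│                                                             │
│  3. Multimodal input                                        │
│     • Understands text, images, and documents               │
│     • Extracts information from screenshots                 │
│     • Produces visual output                                │
│                                                             │
│  4. Continuous learning                                     │
│     • Records successful execution patterns                 │
│     • Builds a task-to-strategy mapping                     │
│     • Personalizes execution style                          │
│                                                             │
└─────────────────────────────────────────────────────────────┘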
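Items 1 and 2 in the diagram (parallel execution and error recovery) combine naturally. The sketch below runs two independent sub-tasks concurrently with `asyncio.gather` and wraps each in a retry-then-fallback policy. It illustrates the general technique only; the task names and the fallback strategy are hypothetical, and nothing here is taken from Manus's actual implementation.

```python
import asyncio
from typing import Awaitable, Callable


async def run_with_recovery(name: str, primary: Callable[[], Awaitable[str]],
                            fallback: Callable[[], Awaitable[str]],
                            retries: int = 2) -> str:
    """Try the primary strategy up to `retries` times, then switch to the fallback."""
    for attempt in range(1, retries + 1):
        try:
            return await primary()
        except Exception as exc:  # broad on purpose: any failure triggers recovery
            print(f"{name}: attempt {attempt} failed ({exc})")
    return await fallback()


async def fetch_prices() -> str:
    await asyncio.sleep(0.01)  # simulated I/O
    return "prices"


async def fetch_news_primary() -> str:
    raise TimeoutError("news API timed out")


async def fetch_news_fallback() -> str:
    return "news (from cache)"


async def main() -> list[str]:
    # Independent sub-tasks run concurrently; gather() preserves argument order.
    return await asyncio.gather(
        run_with_recovery("prices", fetch_prices, fetch_prices),
        run_with_recovery("news", fetch_news_primary, fetch_news_fallback),
    )


results = asyncio.run(main())
print(results)
```

A production agent would typically add backoff between retries and escalate to a human when the fallback also fails, matching the "human handoff" option in the diagram.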

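3. Claude Code and Claude Cowork

3.1 Claude Code: An AI Coding Assistant

Claude Code is Anthropic's agentic coding tool. It was released as a research preview in February 2025 and is among the most capable code agents available.

Core capabilities: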

Python
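class ClaudeCode:
    """
    Conceptual sketch of a Claude Code-style agent (illustrative; not Anthropic's
    actual implementation).

    Features:
    1. Deep code understanding
    2. A safe code execution environment
    3. Version control integration
    4. Context-aware editing
    """

    def __init__(self, project_path: str, llm=None):
        self.llm = llm  # LLM client used by _plan_actions (assumed interface)
        self.project = ProjectContext(project_path)
        self.code_index = CodeIndex(project_path)
        self.tool_executor = SecureToolExecutor()
        self.git_integration = GitIntegration(project_path)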

    def process_request(self, user_input: str) -> CodeAction:
        """Handle a programming request from the user."""

        # 1. Understand the intent
        intent = self._understand_intent(user_input)

        # 2. Gather context
        context = self._gather_context(intent)

        # 3. Plan actions
        plan = self._plan_actions(intent, context)

        # 4. Execute and verify
        results = []
        for action in plan:
            result = self._execute_action(action)
            results.append(result)

            # Show the diff and ask the user to confirm
            if action.requires_confirmation:
                self._show_diff_and_confirm(result)

        return self._summarize_results(results)

    def _gather_context(self, intent: Intent) -> Context:
        """Collect the context needed for the request."""
        context = Context()

        # Retrieve the relevant files
        if intent.target_files:
            for file_path in intent.target_files:
                context.add_file(self.project.read_file(file_path))
        else:
            # Fall back to semantic search over the code
            relevant_files = self.code_index.semantic_search(intent.description)
            for file in relevant_files[:5]:  # top 5 results
                context.add_file(file)

        # Project structure
        context.project_structure = self.project.get_structure()

        # Dependency information
        context.dependencies = self.project.get_dependencies()

        # Git status
        context.git_status = self.git_integration.get_status()

        return context

    def _plan_actions(self, intent: Intent, context: Context) -> List[CodeAction]:
        """Plan the code changes."""

        planning_prompt = f"""You are an expert software engineer.

Task: {intent.description}

Context:
{context.to_prompt()}

Plan the necessary code changes. For each change, specify:
1. File path
2. Change type (create, modify, delete, rename)
3. Detailed description of changes
4. Dependencies on other changes

Output as a structured action plan."""

        plan_response = self.llm.generate(planning_prompt, format="json")
        return self._parse_action_plan(plan_response)

class CodeIndex:
    """Code index with semantic search."""

    def __init__(self, project_path: str, embedder=None):
        self.project_path = project_path
        # The embedder must exist before _build_index() runs.
        # CodeEmbedder is a hypothetical default, named here for illustration.
        self.embedder = embedder or CodeEmbedder()
        self.index = self._build_index()

    def _build_index(self) -> VectorStore:
        """Build the vector index over code chunks."""
        code_chunks = []

        for file_path in self._get_code_files():
            chunks = self._parse_and_chunk(file_path)
            for chunk in chunks:
                embedding = self.embedder.encode(chunk.content)
                code_chunks.append({
                    "embedding": embedding,
                    "content": chunk.content,
                    "metadata": {
                        "file": file_path,
                        "line_start": chunk.line_start,
                        "line_end": chunk.line_end,
                        "type": chunk.type  # function, class, etc.
                    }
                })

        return VectorStore.from_documents(code_chunks)

    def semantic_search(self, query: str, top_k: int = 10) -> List[CodeChunk]:
        """Return the code chunks most similar to the query."""
        query_embedding = self.embedder.encode(query)
        results = self.index.similarity_search(query_embedding, k=top_k)
        return [CodeChunk.from_result(r) for r in results]
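`_parse_and_chunk` is referenced in `CodeIndex` but never defined. One plausible way to produce chunks with `line_start`, `line_end`, and `type` metadata for Python files is to walk the module's syntax tree with the standard `ast` module. This is a sketch under that assumption; the `Chunk` dataclass is a hypothetical stand-in for the `CodeChunk` type used above, and only top-level functions and classes are extracted.

```python
import ast
from dataclasses import dataclass


@dataclass
class Chunk:
    name: str
    kind: str        # "function" or "class"
    line_start: int
    line_end: int
    content: str


def chunk_python_source(source: str) -> list[Chunk]:
    """Split a module into one chunk per top-level function or class."""
    lines = source.splitlines()
    chunks = []
    for node in ast.parse(source).body:
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            kind = "function"
        elif isinstance(node, ast.ClassDef):
            kind = "class"
        else:
            continue  # imports, assignments, etc. are skipped in this sketch
        # lineno / end_lineno are 1-based and inclusive (Python 3.8+).
        text = "\n".join(lines[node.lineno - 1:node.end_lineno])
        chunks.append(Chunk(node.name, kind, node.lineno, node.end_lineno, text))
    return chunks


SAMPLE = '''import os

def load(path):
    return open(path).read()

class Parser:
    def parse(self, text):
        return text.split()
'''

for chunk in chunk_python_source(SAMPLE):
    print(chunk.kind, chunk.name, chunk.line_start, chunk.line_end)
```

Chunking on syntactic boundaries keeps each embedded unit meaningful on its own, which tends to retrieve better than fixed-size line windows.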

3.2 Claude Cowork: An Agent for Non-Technical Users

Claude Cowork is Anthropic's general-purpose agent product, aimed at a broader audience of non-technical users.

Python
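class ClaudeCowork:
    """
    Claude Cowork: a general-purpose AI assistant (conceptual sketch).

    Target users: non-technical staff
    Core scenarios: everyday office work, research, content creation
    """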

    def __init__(self):
        self.browser = BrowserController()
        self.file_manager = FileManager()
        self.document_processor = DocumentProcessor()
        self.calendar = CalendarIntegration()
        self.email = EmailIntegration()

    def assist(self, request: UserRequest) -> AssistanceResult:
        """Handle a user assistance request."""

        # Classify the request type
        request_type = self._classify_request(request)

        if request_type == "research":
            return self._handle_research(request)
        elif request_type == "document_creation":
            return self._handle_document_creation(request)
        elif request_type == "data_analysis":
            return self._handle_data_analysis(request)
        elif request_type == "scheduling":
            return self._handle_scheduling(request)
        elif request_type == "communication":
            return self._handle_communication(request)
        else:
            return self._handle_general(request)

    def _handle_research(self, request: UserRequest) -> ResearchResult:
        """Handle a research request."""

        # 1. Identify the research topic
        topic = request.extract_topic()

        # 2. Collect information from multiple sources
        sources = []

        # Web search
        search_results = self.browser.search(topic)
        for result in search_results[:10]:
            page_content = self.browser.read_page(result.url)
            sources.append({
                "url": result.url,
                "title": result.title,
                "content": page_content
            })

        # 3. Synthesize and analyze
        analysis = self._synthesize_information(sources, topic)

        # 4. Generate the report
        report = self._generate_research_report(analysis)

        return ResearchResult(
            summary=analysis.summary,
            key_findings=analysis.findings,
            sources=sources,
            report=report
        )

    def _handle_document_creation(self, request: UserRequest) -> DocumentResult:
        """Handle a document-writing request."""

        # Parse the document requirements
        doc_spec = self._parse_document_requirements(request)

        # Collect reference material
        references = []
        if request.has_attachments:
            for attachment in request.attachments:
                content = self.document_processor.read(attachment)
                references.append(content)

        # Generate the outline
        outline = self._generate_outline(doc_spec, references)

        # Write the content section by section
        sections = []
        for section in outline.sections:
            content = self._write_section(section, references, sections)
            sections.append({
                "heading": section.heading,
                "content": content
            })

        # Format and export
        document = self._format_document(sections, doc_spec.format)

        return DocumentResult(
            document=document,
            outline=outline,
            # Whitespace-separated words; use len(s["content"]) for character counts (e.g. Chinese text)
            word_count=sum(len(s["content"].split()) for s in sections)
        )
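The `assist` method above depends on a `_classify_request` helper that is not shown, and it routes with an if/elif chain. A minimal sketch of the same routing idea, using keyword classification and a dispatch table, follows. The keyword lists and handler functions are hypothetical; a real product would more likely classify with the model itself.

```python
from typing import Callable

KEYWORDS = {
    "research": ("research", "investigate", "find out"),
    "scheduling": ("schedule", "meeting", "calendar"),
    "communication": ("email", "reply", "message"),
}


def classify_request(text: str) -> str:
    """Return the first category whose keywords appear in the text, else 'general'."""
    lowered = text.lower()
    for category, words in KEYWORDS.items():
        if any(word in lowered for word in words):
            return category
    return "general"


def route(text: str, handlers: dict[str, Callable[[str], str]]) -> str:
    """Dispatch to the handler for the category, falling back to 'general'."""
    handler = handlers.get(classify_request(text), handlers["general"])
    return handler(text)


handlers = {
    "research": lambda t: f"research: {t}",
    "scheduling": lambda t: f"scheduling: {t}",
    "general": lambda t: f"general: {t}",
}
print(route("Schedule a meeting with Dana on Friday", handlers))
print(route("Reply to the email from finance", handlers))
```

The second request is classified as `communication`, but no handler is registered for it, so it falls through to the general handler. That mirrors the `else` branch in `assist`.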

3.3 Core Challenges for Code Agents

Python
class CodeAgentChallenges:
    """Key challenges for code agents, with mitigation strategies."""

    @staticmethod  # callable without an instance
    def challenge_1_context_window():
        """
        Challenge 1: context limits on large codebases

        Mitigations:
        1. Smart code retrieval
        2. Hierarchical summaries
        3. Incremental indexing
        """

        # Smart retrieval example
        def intelligent_retrieval(project, query):
            # Tier 1: file-level retrieval
            relevant_files = file_level_search(project, query)

            # Tier 2: function/class-level retrieval
            code_chunks = []
            for file in relevant_files:
                chunks = chunk_code(file)
                scored_chunks = rank_by_relevance(chunks, query)
                code_chunks.extend(scored_chunks[:3])

            # Tier 3: expand along code relations (callers, callees, imports)
            extended_context = expand_by_relations(code_chunks)

            return extended_context

    @staticmethod
    def challenge_2_code_execution_safety():
        """
        Challenge 2: safe code execution

        Mitigations:
        1. Sandboxed environment
        2. Resource limits
        3. Permission control
        """

        class SecureExecutor:
            def __init__(self):
                self.sandbox = DockerSandbox()
                self.resource_limits = {
                    "cpu": "1 core",
                    "memory": "512MB",
                    "timeout": 30,  # seconds
                    "network": False
                }

            def execute(self, code: str) -> ExecutionResult:
                # Static analysis before running anything
                if self._contains_dangerous_operations(code):
                    return ExecutionResult.error("Dangerous operations detected")

                # Run inside the sandbox
                return self.sandbox.run(code, limits=self.resource_limits)

    @staticmethod
    def challenge_3_long_horizon_planning():
        """
        Challenge 3: long-horizon planning and execution

        Mitigations:
        1. Hierarchical planning
        2. Milestone checks
        3. Dynamic adjustment
        """

        class HierarchicalPlanner:
            def plan(self, goal: str) -> Plan:
                # High-level plan
                high_level = self._create_high_level_plan(goal)

                # Refine phase by phase
                detailed_plan = Plan()
                for phase in high_level.phases:
                    sub_tasks = self._decompose(phase)
                    detailed_plan.add_phase(phase, sub_tasks)

                return detailed_plan

            def execute_with_checkpoints(self, plan: Plan):
                for phase in plan.phases:
                    for task in phase.tasks:
                        result = self.execute_task(task)

                        # Checkpoint validation
                        if not self._validate_checkpoint(result):
                            self._handle_deviation(phase, result)
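`SecureExecutor` calls `_contains_dangerous_operations` without showing what such a check might look like. A simple version for Python code can be written with the standard `ast` module, as sketched below. The blocklists are illustrative assumptions, not a vetted policy, and a static check like this is easy to evade, so it only works as a cheap first filter in front of the sandbox.

```python
import ast

BLOCKED_MODULES = {"os", "subprocess", "shutil", "socket"}
BLOCKED_CALLS = {"eval", "exec", "__import__", "open"}


def find_dangerous_operations(code: str) -> list[str]:
    """Return human-readable findings; an empty list means nothing was flagged."""
    findings = []
    for node in ast.walk(ast.parse(code)):
        if isinstance(node, ast.Import):
            names = [alias.name.split(".")[0] for alias in node.names]
        elif isinstance(node, ast.ImportFrom):
            names = [(node.module or "").split(".")[0]]
        else:
            names = []
        findings += [f"import of {n}" for n in names if n in BLOCKED_MODULES]
        # Only direct calls by name are caught; attribute calls are not.
        if (isinstance(node, ast.Call) and isinstance(node.func, ast.Name)
                and node.func.id in BLOCKED_CALLS):
            findings.append(f"call to {node.func.id}()")
    return findings


print(find_dangerous_operations("import os\nos.remove('x')"))
print(find_dangerous_operations("print(sum(range(10)))"))
```

The sandbox and its resource limits remain the real security boundary; the static pass mainly saves a container start for obviously unwanted code.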

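3.4 Claude Skills: Reusable Agent Skills

Skills are a capability-reuse mechanism that Anthropic introduced in 2025. The core idea, as presented here, is to package a workflow the agent has already completed successfully (its steps, tool-call sequence, and context template) as a reusable "skill" that can be triggered directly when a similar task comes up, instead of reasoning from scratch. In Anthropic's product, a skill is packaged as a folder of instructions, scripts, and resources that Claude loads when it is relevant to the task.

This addresses a core pain point of agents: reasoning from scratch on every similar task wastes tokens and produces inconsistent results.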

Text Only
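A traditional agent handling the same kind of task:

Run 1: user request → agent reasons from scratch → multi-step trial and error → done (slow, unstable)
Run 2: similar request → agent reasons from scratch → multi-step trial and error → done (repeated work)
Run 3: similar request → agent reasons from scratch → ...

With Skills:

Run 1: user request → agent reasons → done → 💾 saved as a Skill
Run 2: similar request → matches an existing Skill → executes directly ✅ (fast, stable)
Run 3: similar request → matches an existing Skill → executes directly ✅

Core structure of a Skill (conceptual implementation):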

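Python
"""
Claude Skills: a conceptual implementation

Skill = a reusable agent workflow template
Each Skill contains: trigger conditions, execution steps, required tools, a context template

⚠️ Note: Skills is a product-level feature from Anthropic. The code below illustrates the
design idea only and is not an official API. For real usage, see Anthropic's documentation.
"""

from dataclasses import dataclass, field
from datetime import datetime
import json

@dataclass  # generates __init__ and other methods automatically
class Skill:
    """Agent skill definition."""
    name: str                         # skill name
    description: str                  # skill description (used for matching)
    trigger_patterns: list[str]       # trigger conditions (keywords / intent phrases)
    steps: list[dict]                 # execution step templates
    required_tools: list[str]         # required tools
    context_template: str = ""        # context template with placeholders (optional)
    created_at: str = field(default_factory=lambda: datetime.now().isoformat())
    success_count: int = 0            # number of successful runs
    avg_tokens: int = 0               # average token usage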
@dataclass
class SkillExecutionResult:
    """Result of running a skill."""
    skill_name: str
    success: bool
    output: str
    tokens_used: int
    steps_completed: int

class SkillRegistry:
    """Registers and manages skills."""

    def __init__(self):
        self.skills: dict[str, Skill] = {}

    def register(self, skill: Skill):
        """Register a skill."""
        self.skills[skill.name] = skill
        print(f"💾 Skill registered: {skill.name}")

    def match(self, user_request: str) -> Skill | None:
        """Return the skill that best matches the request, or None.

        Simplified implementation: keyword matching. A real product would
        more likely use embedding similarity.
        """
        best_match = None
        best_score = 0

        for skill in self.skills.values():
            score = sum(
                1 for pattern in skill.trigger_patterns
                if pattern.lower() in user_request.lower()
            )
            if score > best_score:
                best_score = score
                best_match = skill

        return best_match if best_score > 0 else None

    def save(self, filepath: str):
        """Persist all skills to a JSON file."""
        data = {}
        for name, skill in self.skills.items():
            data[name] = {
                "name": skill.name,
                "description": skill.description,
                "trigger_patterns": skill.trigger_patterns,
                "steps": skill.steps,
                "required_tools": skill.required_tools,
                "context_template": skill.context_template,
                "success_count": skill.success_count,
            }
        with open(filepath, "w", encoding="utf-8") as f:  # the with block closes the file
            json.dump(data, f, ensure_ascii=False, indent=2)

class SkillableAgent:
    """An agent that supports Skills."""

    def __init__(self, name: str, skill_registry: SkillRegistry):
        self.name = name
        self.skills = skill_registry

    def run(self, user_request: str) -> str:
        """Handle a request: prefer a matching skill, otherwise reason from scratch."""

        # 1. Try to match an existing skill
        matched_skill = self.skills.match(user_request)

        if matched_skill:
            print(f"⚡ Matched skill: {matched_skill.name} ({matched_skill.success_count} successful runs so far)")
            return self._execute_skill(matched_skill, user_request)

        # 2. No match: reason from scratch
        print("🧠 No matching skill, reasoning from scratch...")
        result = self._reason_from_scratch(user_request)

        # 3. After success, suggest saving the workflow as a skill (could be automated)
        print("💡 Tip: this workflow can be saved as a skill for reuse")
        return result

    def _execute_skill(self, skill: Skill, user_request: str) -> str:
        """Execute the steps in the skill template."""
        output_parts = []

        for i, step in enumerate(skill.steps):  # enumerate yields index and element
            action = step["action"]
            # Substitute the actual input for the template placeholder
            if "{user_input}" in action:
                action = action.replace("{user_input}", user_request)

            print(f"  Step {i+1}: {step.get('description', action)}")
            # Real execution is omitted; this sketch only records the step description
            output_parts.append(f"[{step.get('description', '')}] done")

        skill.success_count += 1
        return "\n".join(output_parts)

    def _reason_from_scratch(self, user_request: str) -> str:
        """Reason from scratch (the regular agent flow)."""
        return f"[Reasoned from scratch] {user_request}"

    def learn_skill(self, name: str, description: str, trigger_patterns: list[str],
                    steps: list[dict], required_tools: list[str] | None = None):
        """Learn a new skill from a successful run."""
        skill = Skill(
            name=name,
            description=description,
            trigger_patterns=trigger_patterns,
            steps=steps,
            required_tools=required_tools or [],
            context_template="",
        )
        self.skills.register(skill)

# === Usage example ===

registry = SkillRegistry()

# Predefine a few skills
registry.register(Skill(
    name="code review",
    description="Review code thoroughly for security, performance, and readability",
    trigger_patterns=["code review", "review", "check the code", "code quality"],
    steps=[
        {"description": "Read the target code file", "action": "read_file({user_input})"},
        {"description": "Check for security vulnerabilities", "action": "analyze_security"},
        {"description": "Check for performance problems", "action": "analyze_performance"},
        {"description": "Check code style", "action": "check_style"},
        {"description": "Generate the review report", "action": "generate_report"},
    ],
    required_tools=["read_file", "code_analyzer", "report_generator"],
    context_template="",
))

registry.register(Skill(
    name="weekly report",
    description="Generate a weekly report from this week's work records",
    trigger_patterns=["weekly report", "weekly summary"],
    steps=[
        {"description": "Collect this week's Git commits", "action": "git_log_this_week"},
        {"description": "Collect this week's meeting notes", "action": "get_meeting_notes"},
        {"description": "Collect this week's task status", "action": "get_task_status"},
        {"description": "Summarize and generate the report", "action": "generate_weekly_report"},
    ],
    required_tools=["git", "calendar", "task_tracker", "document_writer"],
    context_template="",
))

# Use the agent
agent = SkillableAgent("Claude", registry)

# A matching skill exists: execute it directly
print("--- Request 1: matching skill ---")
result = agent.run("Please do a code review of this PR")
print(f"Result: {result}\n")

# No matching skill: reason from scratch
print("--- Request 2: no matching skill ---")
result = agent.run("Help me analyze the outliers in this dataset")
print(f"Result: {result}\n")

# Learn a new skill from the successful run
agent.learn_skill(
    name="outlier analysis",
    description="Analyze outliers in a dataset and generate a report",
    trigger_patterns=["outlier", "anomalous data"],
    steps=[
        {"description": "Load the dataset", "action": "load_dataset({user_input})"},
        {"description": "Descriptive statistics", "action": "descriptive_stats"},
        {"description": "Outlier detection (IQR + Z-score)", "action": "detect_outliers"},
        {"description": "Visualize the outlier distribution", "action": "plot_outliers"},
        {"description": "Generate the analysis report", "action": "generate_report"},
    ],
)

# Ask again: the newly learned skill now matches
print("--- Request 3: matches the newly learned skill ---")
result = agent.run("This batch of data has a lot of outliers, please analyze it")
print(f"Result: {result}")
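The `match` docstring above notes that a real product would likely use embedding similarity rather than keyword matching. The sketch below shows the shape of that approach with a deliberately crude stand-in: word-count vectors compared by cosine similarity. The `embed` function, the skill descriptions, and the 0.2 threshold are all assumptions for illustration; a real system would call an embedding model and tune the threshold on data.

```python
import math
import re
from collections import Counter


def embed(text: str) -> Counter:
    """Toy 'embedding': lowercase word counts. A real system would call an embedding model."""
    return Counter(re.findall(r"[a-z]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[token] * b[token] for token in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def match_skill(request: str, descriptions: dict[str, str], threshold: float = 0.2):
    """Return the best-scoring skill name, or None if nothing clears the threshold."""
    query = embed(request)
    scored = {name: cosine(query, embed(text)) for name, text in descriptions.items()}
    best = max(scored, key=scored.get)
    return best if scored[best] >= threshold else None


SKILLS = {
    "code_review": "review code for security performance and readability",
    "weekly_report": "generate a weekly report from this week's work records",
}
print(match_skill("please review the security of this code", SKILLS))
print(match_skill("book a flight to Tokyo", SKILLS))
```

The threshold matters as much as the similarity measure: without it, every request would be forced onto the nearest skill, even when none is relevant.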

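Skills vs. Traditional Prompts: Key Differences

| Dimension | Traditional prompt | Skills |
|---|---|---|
| Execution | Reasons from scratch every time | Matches a template, then follows its steps |
| Stability | May differ on every run | Highly consistent |
| Speed | Needs multiple rounds of reasoning | Skips reasoning and executes directly |
| Token usage | Higher (reasoning repeated on every run) | Lower (less reasoning overhead) |
| Evolution | Depends on prompt tuning | Learns automatically from successful runs |
| Product form | Claude chat | Claude Skills panel |

📌 Why Skills matter as a product: Skills take an agent from "a novice every time" to "more proficient with use". This mirrors how people acquire skills, moving from deliberate thinking (System 2) to automatic execution (System 1). Claude's Skills panel lets users view, edit, and share skills, turning the agent's capabilities into a manageable asset.

📖 Cross-references: agent design pattern basics (Prompt Chaining, Routing, etc.) → AI Agent 开发实战/01-Agent 基础与架构 §4; how an agent memory system persists skills → AI Agent 开发实战/12-Agent 记忆系统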


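4. OpenAI Operator and Computer Use

4.1 OpenAI Operator

Operator is an AI agent that OpenAI released in January 2025 as a research preview. It operates a graphical interface the way a person would: looking at the screen, clicking, typing, and scrolling.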

Python
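import time


class OpenAIOperator:
    """
    Conceptual sketch of an Operator-style agent (illustrative; not OpenAI's
    actual implementation).

    Core capabilities:
    1. Perceive screen content visually
    2. Understand GUI elements
    3. Perform mouse and keyboard actions
    4. Execute multi-step tasks
    """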

    def __init__(self):
        self.vision_model = GPT4Vision()
        self.action_model = ActionPredictionModel()
        self.browser = BrowserAutomation()
        self.os_interface = OSInterface()

    def execute_task(self, task_description: str) -> TaskResult:
        """
        Execute a task that requires operating a computer interface.

        Example tasks:
        - "Search Amazon for Bluetooth headphones and sort by rating"
        - "Find last week's meeting invitation in Gmail and add it to the calendar"
        """

        max_steps = 50
        step = 0
        state_history = []

        while step < max_steps:
            # 1. Capture the current screen
            screenshot = self.os_interface.capture_screen()

            # 2. Visual understanding
            ui_elements = self.vision_model.analyze_ui(screenshot)

            # 3. Decide the next action
            action = self.action_model.predict(
                task=task_description,
                current_state=screenshot,
                ui_elements=ui_elements,
                history=state_history
            )

            # 4. Perform the action
            if action.type == "click":
                self.os_interface.click(action.target)
            elif action.type == "type":
                self.os_interface.type(action.text)
            elif action.type == "scroll":
                self.os_interface.scroll(action.direction, action.amount)
            elif action.type == "key":
                self.os_interface.press_key(action.key)
            elif action.type == "complete":
                return TaskResult.success(action.result)

            # 5. Record the state
            state_history.append({
                "step": step,
                "screenshot": screenshot,
                "action": action,
                "ui_elements": ui_elements
            })

            step += 1
            time.sleep(0.5)  # give the interface time to respond

        return TaskResult.incomplete(state_history)

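class ComputerUseAgent:
    """
    General-purpose computer-use agent.

    Can operate:
    - Browsers
    - Desktop applications
    - The file system
    - The command line
    """

    def __init__(self):
        self.tools = {
            "screenshot": ScreenshotTool(),
            "mouse": MouseControlTool(),
            "keyboard": KeyboardTool(),
            "shell": ShellTool(),
            "browser": BrowserTool()
        }
        # Used by _observe(); VisionModel is a hypothetical wrapper around a vision-language model
        self.vision_model = VisionModel()
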
    def run(self, instruction: str) -> ExecutionTrace:
        """执行需要计算机操作的指令"""

        trace = ExecutionTrace()

        while not self._is_complete():
            # 感知环境
            observation = self._observe()
            trace.add_observation(observation)

            # 推理下一步
            thought = self._reason(instruction, trace)
            trace.add_thought(thought)

            # 选择行动
            action = self._select_action(thought)
            trace.add_action(action)

            # 执行
            result = self._execute(action)
            trace.add_result(result)

        return trace

    def _observe(self) -> Observation:
        """观察当前计算机状态"""
        screenshot = self.tools["screenshot"].capture()

        # 使用视觉模型分析
        analysis = self.vision_model.describe(
            screenshot,
            prompt="Describe the current computer state, including open applications,
                   visible UI elements, and any relevant text content."
        )

        return Observation(
            screenshot=screenshot,
            description=analysis.description,
            ui_elements=analysis.elements
        )
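The observe, reason, act loop used by both agents above can be exercised without a real desktop. The following is a minimal sketch under stated assumptions: `CounterEnv`, `Trace`, `run_agent`, and `count_to_three` are hypothetical names introduced only for illustration, and the "screen" is reduced to an integer counter. It mainly shows why a step budget matters when the policy never reports completion.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Tuple


@dataclass
class Trace:
    """Records every (observation, action) pair for later inspection."""
    steps: List[Tuple[str, str]] = field(default_factory=list)
    completed: bool = False


class CounterEnv:
    """Toy stand-in for a desktop: the 'screen' is just an integer counter."""

    def __init__(self) -> None:
        self.value = 0

    def observe(self) -> str:
        return f"counter={self.value}"

    def apply(self, action: str) -> None:
        if action == "increment":
            self.value += 1
        else:
            raise ValueError(f"Unsupported action: {action}")


def run_agent(env: CounterEnv, policy: Callable[[str], str], max_steps: int = 10) -> Trace:
    """Observe -> decide -> act until the policy says 'complete' or the budget runs out."""
    trace = Trace()
    for _ in range(max_steps):
        observation = env.observe()
        action = policy(observation)
        trace.steps.append((observation, action))
        if action == "complete":
            trace.completed = True
            break
        env.apply(action)
    return trace


def count_to_three(observation: str) -> str:
    """Hypothetical policy: increment until the counter reads 3."""
    value = int(observation.split("=")[1])
    return "complete" if value >= 3 else "increment"


trace = run_agent(CounterEnv(), count_to_three)
print(trace.completed, len(trace.steps))
```

A policy that never returns `"complete"` simply exhausts `max_steps` and yields a trace with `completed=False`, which the caller can treat as an incomplete task.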

4.2 Computer Use: Technical Implementation

Python
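class ComputerUseImplementation:
    """Technical implementation details of a Computer Use agent"""

    @staticmethod
    def visual_grounding():
        """
        Visual grounding: map a natural-language instruction to screen coordinates
        """

        class VisualGroundingModel:
            """
            UI element localization based on a vision-language model
            """

            def __init__(self):
                self.model = CLIP()  # or a similar vision-language model
                # OCRDetector and IconDetector are placeholder names for the
                # detectors that locate_element() relies on
                self.ocr = OCRDetector()
                self.icon_detector = IconDetector()

            def locate_element(self, screenshot: Image, description: str) -> Optional[BoundingBox]:
                """
                Locate the described element in a screenshot

                Args:
                    screenshot: the screen capture
                    description: element description, e.g. "search button"

                Returns:
                    BoundingBox: element position (x1, y1, x2, y2),
                    or None when no candidate region is found
                    (Optional requires `from typing import Optional`)
                """
                # Step 1: detect clickable text with OCR
                text_regions = self.ocr.detect(screenshot)

                # Step 2: detect icons with an icon-detection model
                icon_regions = self.icon_detector.detect(screenshot)

                # Step 3: rank all candidates by vision-language similarity
                all_regions = text_regions + icon_regions

                best_match = None
                best_score = float("-inf")  # similarity scores may be negative

                for region in all_regions:
                    # Crop the candidate region
                    crop = screenshot.crop(region.bbox)

                    # Score how well the crop matches the description
                    score = self.model.similarity(crop, description)

                    if score > best_score:
                        best_score = score
                        best_match = region

                return best_match.bbox if best_match else None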

    @staticmethod
    def action_space_design():
        """
        Action space design
        """

        # Actions the agent is allowed to execute
        ACTION_SPACE = {
            # Mouse actions
            "mouse_move": {
                "description": "Move the mouse to the given coordinates",
                "parameters": {"x": int, "y": int}
            },
            "mouse_click": {
                "description": "Click at the given position",
                "parameters": {"x": int, "y": int, "button": ["left", "right"]}
            },
            "mouse_drag": {
                "description": "Drag from one point to another",
                "parameters": {"start": (int, int), "end": (int, int)}
            },
            "mouse_scroll": {
                "description": "Scroll",
                "parameters": {"direction": ["up", "down"], "amount": int}
            },

            # Keyboard actions
            "key_press": {
                "description": "Press a key",
                "parameters": {"key": str}
            },
            "type_text": {
                "description": "Type text",
                "parameters": {"text": str}
            },
            "hotkey": {
                "description": "Press a key combination",
                "parameters": {"keys": List[str]}
            },

            # System actions
            "screenshot": {
                "description": "Capture the screen",
                "parameters": {}
            },
            "wait": {
                "description": "Wait for the UI to respond",
                "parameters": {"seconds": float}
            },

            # Task control
            "complete": {
                "description": "Mark the task as completed",
                "parameters": {"result": str}
            },
            "fail": {
                "description": "Mark the task as failed",
                "parameters": {"reason": str}
            }
        }

        return ACTION_SPACE

    @staticmethod
    def safety_mechanisms():
        """
        Safety mechanisms for Computer Use
        """

        class SafetyGuardrails:
            def __init__(self):
                # Actions that are always rejected
                self.restricted_actions = [
                    "delete_system_files",
                    "modify_system_settings",
                    "access_sensitive_data"
                ]

                # Actions that need explicit user confirmation
                self.confirmation_required = [
                    "make_purchase",
                    "send_email",
                    "delete_files",
                    "modify_permissions"
                ]

            def check_action(self, action: Action) -> SafetyResult:
                """Check whether an action is safe to run"""

                # Reject anything on the deny list
                if action.type in self.restricted_actions:
                    return SafetyResult.reject("Action not allowed")

                # Ask the user before running sensitive actions
                if action.type in self.confirmation_required:
                    return SafetyResult.require_confirmation(
                        f"This action will {action.description}. Proceed?"
                    )

                # Flag anomalous patterns for review
                if self._detect_suspicious_pattern(action):
                    return SafetyResult.require_review("Suspicious pattern detected")

                return SafetyResult.allow()

            def _detect_suspicious_pattern(self, action: Action) -> bool:
                """Detect suspicious action patterns"""
                # Placeholder: concrete checks (rate limits, unusual targets, ...)
                # go here. Until then, nothing is flagged.
                return False
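An action space like the one above is only useful if model output is checked against it before execution. The sketch below is one possible way to do that, not the method of any particular product: `validate_action` is a hypothetical helper, and the reduced `ACTION_SPACE` here flattens the `"parameters"` entries into a parameter-to-type map for brevity.

```python
from typing import Any, Dict, List

# Reduced, hypothetical version of the action space: parameter name -> expected
# type, or a list of allowed literal values.
ACTION_SPACE: Dict[str, Dict[str, Any]] = {
    "mouse_click": {"x": int, "y": int, "button": ["left", "right"]},
    "type_text": {"text": str},
    "wait": {"seconds": float},
}


def validate_action(name: str, params: Dict[str, Any]) -> List[str]:
    """Return a sorted list of problems; an empty list means the action is well-formed."""
    if name not in ACTION_SPACE:
        return [f"unknown action: {name}"]

    schema = ACTION_SPACE[name]
    errors = []
    for key in schema.keys() - params.keys():
        errors.append(f"missing parameter: {key}")
    for key in params.keys() - schema.keys():
        errors.append(f"unexpected parameter: {key}")

    for key in schema.keys() & params.keys():
        expected, value = schema[key], params[key]
        if isinstance(expected, list):
            # Enumerated parameter: the value must be one of the listed literals
            if value not in expected:
                errors.append(f"{key} must be one of {expected}")
        elif expected is float:
            # Accept ints where a float is expected, but never bools
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                errors.append(f"{key} must be a number")
        elif isinstance(value, bool) or not isinstance(value, expected):
            errors.append(f"{key} must be {expected.__name__}")
    return sorted(errors)


print(validate_action("mouse_click", {"x": 10, "y": "20", "button": "middle"}))
```

Rejecting malformed actions before they reach the mouse or keyboard layer keeps the safety guardrails simple, since they can then assume every action has a known type and well-typed parameters.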

4.3 Browser Automation

Python
class BrowserAutomation:
    """Browser automation implementation"""

    def __init__(self):
        self.driver = PlaywrightDriver()
        self.page = None

    async def navigate(self, url: str):
        """Navigate to the given URL"""
        self.page = await self.driver.new_page()
        await self.page.goto(url)

    async def search(self, query: str, engine: str = "google"):
        """Run a query on a search engine"""
        if engine != "google":
            # Only Google is wired up here; fail early instead of waiting on a missing page
            raise ValueError(f"Unsupported search engine: {engine}")

        await self.navigate("https://www.google.com")
        # Selectors on live sites change; [name="q"] matches an input or a textarea
        await self.page.fill('[name="q"]', query)
        await self.page.press('[name="q"]', "Enter")

        # Wait for the results to load
        await self.page.wait_for_selector("#search")

    async def fill_form(self, form_data: Dict[str, str]):
        """Fill in a form"""
        for field, value in form_data.items():
            # Try several selector strategies in turn
            selectors = [
                f'input[name="{field}"]',
                f'input[id="{field}"]',
                f'input[placeholder*="{field}"]',
                f'textarea[name="{field}"]'
            ]

            for selector in selectors:
                try:
                    # Short timeout (Playwright, in milliseconds) so a
                    # non-matching selector fails quickly
                    await self.page.fill(selector, value, timeout=1000)
                    break
                except Exception:
                    # No match or not fillable; try the next strategy
                    continue

    async def extract_data(self, extraction_rules: List[Rule]) -> List[Dict]:
        """Extract data from the page according to the given rules"""
        results = []

        for rule in extraction_rules:
            elements = await self.page.query_selector_all(rule.selector)

            for element in elements:
                item = {}
                for field, field_rule in rule.fields.items():
                    value = await element.eval_on_selector(
                        field_rule.selector,
                        "el => el.textContent"
                    )
                    item[field] = value.strip() if value else None

                results.append(item)

        return results

    async def monitor_changes(self, selector: str, callback):
        """Watch a page element for DOM changes"""
        # The selector is passed as an argument rather than interpolated into
        # the script, so quotes in it cannot break the JavaScript. Mutations
        # accumulate in window.__pageChanges; `callback` is not invoked here,
        # so the caller has to poll that array.
        await self.page.evaluate(
            """(selector) => {
                new MutationObserver((mutations) => {
                    window.__pageChanges = window.__pageChanges || [];
                    window.__pageChanges.push(mutations);
                }).observe(
                    document.querySelector(selector),
                    { childList: true, subtree: true }
                );
            }""",
            selector
        )

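The selector-fallback idea in `fill_form` can be tested without launching a browser. This is a minimal sketch under stated assumptions: `FakePage` is a hypothetical stand-in that only mimics an async `fill` method, and `fill_with_fallback` and `candidate_selectors` are illustrative names rather than part of Playwright. Unlike the method above, it also reports which fields could not be filled instead of failing silently.

```python
import asyncio
from typing import Dict, List, Optional


class FakePage:
    """Stand-in for a browser page; only selectors listed in `known` can be filled."""

    def __init__(self, known: List[str]) -> None:
        self.known = set(known)
        self.filled: Dict[str, str] = {}

    async def fill(self, selector: str, value: str) -> None:
        if selector not in self.known:
            raise LookupError(f"no element matches {selector}")
        self.filled[selector] = value


def candidate_selectors(field: str) -> List[str]:
    """Selector strategies, tried in this order."""
    return [
        f'input[name="{field}"]',
        f'input[id="{field}"]',
        f'input[placeholder*="{field}"]',
        f'textarea[name="{field}"]',
    ]


async def fill_with_fallback(page: FakePage, field: str, value: str) -> Optional[str]:
    """Try each selector in turn; return the one that worked, or None."""
    for selector in candidate_selectors(field):
        try:
            await page.fill(selector, value)
            return selector
        except LookupError:
            continue
    return None


async def fill_form(page: FakePage, form_data: Dict[str, str]) -> List[str]:
    """Fill every field and return the names of fields that could not be filled."""
    missing = []
    for field, value in form_data.items():
        if await fill_with_fallback(page, field, value) is None:
            missing.append(field)
    return missing


page = FakePage(['input[id="email"]', 'textarea[name="bio"]'])
unfilled = asyncio.run(fill_form(page, {"email": "a@example.com", "bio": "hi", "phone": "123"}))
print(unfilled)
```

Returning the unfilled fields gives the agent something to reason about on the next step, for example by falling back to visual grounding for those inputs.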
5. AI Agent Architecture Design

5.1 Layered Agent Architecture

Text Only
┌─────────────────────────────────────────────────────────────────────────┐
│                      Layered AI Agent Architecture                      │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │                        Application Layer                         │   │
│  │  • Task definition  • User interaction  • Result presentation    │   │
│  └──────────────────────────────────────────────────────────────────┘   │
│                                  ↓                                      │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │                          Planning Layer                          │   │
│  │  • Goal decomposition  • Strategy selection  • Dynamic replanning│   │
│  └──────────────────────────────────────────────────────────────────┘   │
│                                  ↓                                      │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │                         Reasoning Layer                          │   │
│  │  • Logical reasoning  • Causal analysis  • Hypothesis testing    │   │
│  └──────────────────────────────────────────────────────────────────┘   │
│                                  ↓                                      │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │                         Execution Layer                          │   │
│  │  • Tool calls  • API requests  • Code execution                  │   │
│  └──────────────────────────────────────────────────────────────────┘   │
│                                  ↓                                      │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │                         Perception Layer                         │   │
│  │  • Text understanding  • Image recognition  • Speech processing  │   │
│  └──────────────────────────────────────────────────────────────────┘   │
│                                  ↓                                      │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │                           Memory Layer                           │   │
│  │  • Working memory  • Long-term memory  • Knowledge retrieval     │   │
│  └──────────────────────────────────────────────────────────────────┘   │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

5.2 Core Component Implementation

Python
class AgentCoreArchitecture:
    """Core agent architecture"""

    class PerceptionModule:
        """Perception module"""

        def __init__(self):
            self.text_processor = TextProcessor()
            self.vision_processor = VisionProcessor()
            self.audio_processor = AudioProcessor()

        def process(self, input_data: Union[str, Image, Audio]) -> Perception:
            """Single entry point for all perception inputs"""
            if isinstance(input_data, str):
                return self._process_text(input_data)
            elif isinstance(input_data, Image):
                return self._process_image(input_data)
            elif isinstance(input_data, Audio):
                return self._process_audio(input_data)
            else:
                raise ValueError(f"Unsupported input type: {type(input_data)}")

        def _process_text(self, text: str) -> TextPerception:
            """Text perception"""
            # Intent classification
            intent = self.text_processor.classify_intent(text)

            # Entity extraction
            entities = self.text_processor.extract_entities(text)

            # Sentiment analysis
            sentiment = self.text_processor.analyze_sentiment(text)

            return TextPerception(
                raw_text=text,
                intent=intent,
                entities=entities,
                sentiment=sentiment
            )

        def _process_image(self, image: Image) -> Perception:
            """Image perception (left unimplemented in this outline)"""
            raise NotImplementedError

        def _process_audio(self, audio: Audio) -> Perception:
            """Audio perception (left unimplemented in this outline)"""
            raise NotImplementedError

    class MemoryModule:
        """Memory module"""

        def __init__(self, vector_store: VectorStore):
            self.working_memory = WorkingMemory(capacity=7)  # Miller's law: about 7 ± 2 items
            self.episodic_memory = EpisodicMemory(vector_store)
            self.semantic_memory = SemanticMemory(vector_store)
            self.procedural_memory = ProceduralMemory()

        def store(self, experience: Experience):
            """Store an experience"""
            # Working memory (short-term)
            self.working_memory.add(experience)

            # Episodic memory (long-term), only for significant experiences
            if experience.is_significant:
                self.episodic_memory.store(experience)

            # Distill knowledge into semantic memory
            knowledge = self._extract_knowledge(experience)
            self.semantic_memory.store(knowledge)

            # Update procedural memory (skills)
            if experience.type == "skill_execution":
                self.procedural_memory.update(experience)

        def retrieve(self, query: str, context: Context) -> RetrievedInfo:
            """Retrieve relevant information"""
            # Query every memory store
            working_results = self.working_memory.search(query)
            episodic_results = self.episodic_memory.search(query, top_k=5)
            semantic_results = self.semantic_memory.search(query, top_k=5)
            procedural_results = self.procedural_memory.match(context)

            # Merge the result lists into a single ranking
            fused = self._fusion_rank(
                working_results,
                episodic_results,
                semantic_results,
                procedural_results
            )

            return fused

    class PlanningModule:
        """Planning module"""

        def __init__(self, llm):
            self.llm = llm
            self.plan_library = PlanLibrary()

        def create_plan(self, goal: Goal, context: Context) -> Plan:
            """Create an execution plan"""

            # Reuse a library plan when a similar one has a success rate above 80%
            similar_plan = self.plan_library.find_similar(goal)
            if similar_plan and similar_plan.success_rate > 0.8:
                return self._adapt_plan(similar_plan, context)

            # Otherwise generate a new plan
            return self._generate_plan(goal, context)

        def _generate_plan(self, goal: Goal, context: Context) -> Plan:
            """Generate a plan with the LLM"""

            prompt = f"""Create a step-by-step plan to achieve the following goal:

Goal: {goal.description}

Context:
{context.to_prompt()}

Available tools: {context.available_tools}

Generate a detailed plan with:
1. Sequential steps
2. Each step's expected outcome
3. Dependencies between steps
4. Potential failure points and alternatives

Output as structured JSON."""

            plan_json = self.llm.generate(prompt, format="json")
            return Plan.from_json(plan_json)

        def replan(self, current_plan: Plan, failure: Failure) -> Plan:
            """Replan after a failure"""

            prompt = f"""The current plan failed. Create a revised plan.

Original Plan: {current_plan.to_json()}
Failure: {failure.description}
Current State: {failure.state}

Provide a new plan that addresses the failure."""

            new_plan_json = self.llm.generate(prompt, format="json")
            return Plan.from_json(new_plan_json)

    class ToolModule:
        """Tool module"""

        def __init__(self, llm):
            self.llm = llm  # used by _select_tool to choose a tool
            self.tools: Dict[str, Tool] = {}
            self.tool_descriptions = []

        def register(self, tool: Tool):
            """Register a tool"""
            self.tools[tool.name] = tool
            self.tool_descriptions.append(tool.get_description())

        def select_and_execute(self, intent: Intent, context: Context) -> ToolResult:
            """Select a suitable tool and run it"""

            # Tool selection
            tool_name = self._select_tool(intent, context)
            if tool_name not in self.tools:
                # The LLM may name a tool that was never registered
                return ToolResult.failure(f"Unknown tool: {tool_name}")
            tool = self.tools[tool_name]

            # Parameter extraction
            parameters = self._extract_parameters(intent, tool)

            # Execution
            try:
                # **parameters unpacks the dict into keyword arguments
                result = tool.execute(**parameters)
                return ToolResult.success(result)
            except Exception as e:
                return ToolResult.failure(str(e))

        def _select_tool(self, intent: Intent, context: Context) -> str:
            """Select a tool based on the intent"""

            prompt = f"""Given the user intent and available tools, select the most appropriate tool.

Intent: {intent.description}
Available Tools:
{self._format_tool_descriptions()}

Select the best tool and explain why."""

            selection = self.llm.generate(prompt)
            return self._parse_tool_selection(selection)
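`MemoryModule.retrieve` above calls `_fusion_rank` without showing how the four result lists are merged. One common choice is reciprocal rank fusion (RRF); using it here is an assumption, not something the source specifies. The sketch below works on plain lists of item ids, and the names `reciprocal_rank_fusion`, `working`, `episodic`, and `semantic` are illustrative only.

```python
from collections import defaultdict
from typing import Dict, List, Sequence


def reciprocal_rank_fusion(rankings: Sequence[Sequence[str]], k: int = 60) -> List[str]:
    """Fuse several ranked lists of item ids into one list.

    Each item scores sum(1 / (k + rank)) over the lists it appears in,
    with rank starting at 1. Ties are broken by item id for determinism.
    """
    scores: Dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, item in enumerate(ranking, start=1):
            scores[item] += 1.0 / (k + rank)
    return sorted(scores, key=lambda item: (-scores[item], item))


# Hypothetical results from three memory stores, best match first
working = ["note_b"]
episodic = ["note_a", "note_b", "note_c"]
semantic = ["note_b", "note_d", "note_a"]

fused = reciprocal_rank_fusion([working, episodic, semantic])
print(fused)  # ['note_b', 'note_a', 'note_d', 'note_c']
```

RRF needs only ranks, not comparable scores, which is convenient when the memory stores use different similarity measures. The constant `k = 60` is the value conventionally used with RRF and can be tuned.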

5.3 Agent Communication Protocol

Python
import asyncio
import time
from typing import Dict, List, Optional


class AgentCommunicationProtocol:
    """Communication protocol for multi-agent systems"""

    class Message:
        """Message format exchanged between agents"""

        def __init__(
            self,
            sender: str,
            receiver: str,
            message_type: str,
            content: Dict,
            conversation_id: str,
            timestamp: Optional[float] = None
        ):
            self.sender = sender
            self.receiver = receiver
            self.message_type = message_type  # request, response, inform, delegate
            self.content = content
            self.conversation_id = conversation_id
            self.timestamp = time.time() if timestamp is None else timestamp

    class CommunicationBus:
        """Agent communication bus"""

        # "Message" is quoted in annotations because a nested class body
        # cannot see names defined in the enclosing class body.

        def __init__(self):
            self.channels: Dict[str, asyncio.Queue] = {}
            self.subscribers: Dict[str, List[str]] = {}

        def register_agent(self, agent_id: str):
            """Register an agent on the bus"""
            self.channels[agent_id] = asyncio.Queue()

        async def send(self, message: "Message"):
            """Send a message to its receiver"""
            if message.receiver not in self.channels:
                raise ValueError(f"Unknown receiver: {message.receiver}")

            await self.channels[message.receiver].put(message)

        async def receive(self, agent_id: str, timeout: Optional[float] = None) -> Optional["Message"]:
            """Receive the next message, or None if the timeout expires"""
            if agent_id not in self.channels:
                raise ValueError(f"Unknown agent: {agent_id}")

            try:
                return await asyncio.wait_for(
                    self.channels[agent_id].get(),
                    timeout=timeout
                )
            except asyncio.TimeoutError:
                return None

        def subscribe(self, agent_id: str, topic: str):
            """Subscribe an agent to a topic"""
            if topic not in self.subscribers:
                self.subscribers[topic] = []
            self.subscribers[topic].append(agent_id)

        async def publish(self, topic: str, message: "Message"):
            """Publish a message to every subscriber of a topic"""
            for subscriber in self.subscribers.get(topic, []):
                # Message defines no copy() method, so rebuild it with a new receiver
                fields = {**vars(message), "receiver": subscriber}
                await self.send(type(message)(**fields))
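To see the publish/subscribe flow end to end, here is a compact, runnable variant of the bus. It is a sketch under stated assumptions rather than the chapter's implementation: `Bus` and this frozen-dataclass `Message` are simplified, hypothetical versions (the topic is carried on the message itself), and `dataclasses.replace` is used to retarget a copy for each subscriber.

```python
import asyncio
from dataclasses import dataclass, replace
from typing import Dict, List, Optional


@dataclass(frozen=True)
class Message:
    sender: str
    receiver: str
    topic: str
    content: str


class Bus:
    """One asyncio.Queue inbox per agent, plus topic subscriptions."""

    def __init__(self) -> None:
        self.inboxes: Dict[str, asyncio.Queue] = {}
        self.subscribers: Dict[str, List[str]] = {}

    def register(self, agent_id: str) -> None:
        self.inboxes[agent_id] = asyncio.Queue()

    def subscribe(self, agent_id: str, topic: str) -> None:
        self.subscribers.setdefault(topic, []).append(agent_id)

    async def publish(self, message: Message) -> int:
        """Deliver a retargeted copy to each subscriber; return the delivery count."""
        targets = self.subscribers.get(message.topic, [])
        for agent_id in targets:
            await self.inboxes[agent_id].put(replace(message, receiver=agent_id))
        return len(targets)

    async def receive(self, agent_id: str, timeout: float) -> Optional[Message]:
        """Return the next message, or None if nothing arrives within `timeout` seconds."""
        try:
            return await asyncio.wait_for(self.inboxes[agent_id].get(), timeout)
        except asyncio.TimeoutError:
            return None


async def demo():
    bus = Bus()
    for agent_id in ("analyst", "writer", "reviewer"):
        bus.register(agent_id)
    bus.subscribe("analyst", "findings")
    bus.subscribe("writer", "findings")

    delivered = await bus.publish(Message("researcher", "*", "findings", "3 papers found"))
    to_writer = await bus.receive("writer", timeout=0.1)
    to_reviewer = await bus.receive("reviewer", timeout=0.1)  # not subscribed
    return delivered, to_writer, to_reviewer


delivered, to_writer, to_reviewer = asyncio.run(demo())
print(delivered, to_writer.receiver, to_reviewer)  # 2 writer None
```

Each subscriber receives its own copy with `receiver` set to its id, so downstream code can rely on `message.receiver` matching the inbox it came from.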

6. Multi-Agent Collaboration Systems

6.1 Collaboration Patterns

Text Only
┌─────────────────────────────────────────────────────────────────┐
│               Multi-Agent Collaboration Patterns                │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  1. Hierarchical                                                │
│                                                                 │
│         ┌───────────┐                                           │
│         │  Manager  │                                           │
│         └─────┬─────┘                                           │
│       ┌───────┼───────┐                                         │
│       ↓       ↓       ↓                                         │
│    ┌────┐  ┌────┐  ┌────┐                                       │
│    │ A1 │  │ A2 │  │ A3 │                                       │
│    └────┘  └────┘  └────┘                                       │
│                                                                 │
│  2. Peer-to-peer                                                │
│                                                                 │
│         ┌─────┐                                                 │
│         │  A1 │←────────→┌─────┐                                │
│         └──┬──┘          │  A2 │                                │
│            ↓             └──┬──┘                                │
│         ┌─────┐             ↓                                   │
│         │  A3 │←────────→┌─────┐                                │
│         └─────┘          │  A4 │                                │
│                          └─────┘                                │
│                                                                 │
│  3. Market-based                                                │
│                                                                 │
│     ┌─────────────────────────────────────┐                    │
│     │         Task auction market         │                    │
│     │  Task X ──→ [Bid: A1:$5, A2:$8] ──→ A1 wins              │
│     │  Task Y ──→ [Bid: A2:$3, A3:$7] ──→ A2 wins              │
│     └─────────────────────────────────────┘                    │
│                                                                 │
│  4. Pipeline                                                    │
│                                                                 │
│    Input → [A1:extract] → [A2:analyze] → [A3:generate] → Output │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
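The market-based pattern in the diagram reduces to a lowest-bid auction per task. The following minimal sketch reproduces the two auctions shown above (Task X and Task Y) plus a task nobody bids on. `run_auction` and the task names are hypothetical, and treating a bid as a plain cost is a simplifying assumption.

```python
from typing import Dict, Optional, Tuple

Bids = Dict[str, float]  # agent id -> quoted cost


def run_auction(bids_by_task: Dict[str, Bids]) -> Dict[str, Optional[Tuple[str, float]]]:
    """Award each task to the lowest bidder; a task with no bids stays unassigned (None)."""
    awards: Dict[str, Optional[Tuple[str, float]]] = {}
    for task, bids in bids_by_task.items():
        if not bids:
            awards[task] = None
            continue
        # Break cost ties by agent id so the outcome is deterministic
        winner = min(bids, key=lambda agent: (bids[agent], agent))
        awards[task] = (winner, bids[winner])
    return awards


awards = run_auction({
    "task_x": {"A1": 5.0, "A2": 8.0},
    "task_y": {"A2": 3.0, "A3": 7.0},
    "task_z": {},
})
print(awards)
# {'task_x': ('A1', 5.0), 'task_y': ('A2', 3.0), 'task_z': None}
```

Keeping unassigned tasks in the result, rather than dropping them, lets the coordinator decide whether to retry, re-decompose, or escalate to a human.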

6.2 Multi-Agent System Implementation

Python
class MultiAgentSystem:
    """Multi-agent collaboration system"""

    def __init__(self):
        self.agents: Dict[str, Agent] = {}
        self.coordinator = Coordinator()
        self.communication_bus = CommunicationBus()

    def register_agent(self, agent: Agent):
        """Register an agent"""
        self.agents[agent.id] = agent
        self.communication_bus.register_agent(agent.id)
        agent.set_communication_bus(self.communication_bus)

    async def execute_collaborative_task(
        self,
        task: ComplexTask,
        collaboration_mode: str = "hierarchical"
    ) -> CollaborativeResult:
        """Execute a collaborative task"""

        if collaboration_mode == "hierarchical":
            return await self._hierarchical_execution(task)
        elif collaboration_mode == "peer":
            return await self._peer_execution(task)
        elif collaboration_mode == "market":
            return await self._market_execution(task)
        else:
            raise ValueError(f"Unknown collaboration mode: {collaboration_mode}")

    async def _hierarchical_execution(self, task: ComplexTask) -> CollaborativeResult:
        """Hierarchical execution"""

        # 1. Select the manager agent
        manager = self.coordinator.select_manager(task)

        # 2. The manager decomposes the task
        subtasks = manager.decompose_task(task)
        subtasks_by_id = {subtask.id: subtask for subtask in subtasks}

        # 3. Assign subtasks (maps subtask id -> agent id)
        assignments = self.coordinator.assign_subtasks(subtasks, self.agents)

        # 4. Execute in parallel; look each subtask up by id before dispatching
        results = await asyncio.gather(*[
            self.agents[agent_id].execute(subtasks_by_id[subtask_id])
            for subtask_id, agent_id in assignments.items()
        ])

        # 5. The manager integrates the results
        final_result = manager.integrate_results(results)

        return CollaborativeResult(
            output=final_result,
            execution_trace=results,
            coordination_log=self.coordinator.get_log()
        )

    async def _market_execution(self, task: ComplexTask) -> CollaborativeResult:
        """Market-based execution (auction)"""

        # 1. Decompose the task
        subtasks = self.coordinator.decompose_task(task)

        # 2. Allocate by auction
        assignments = {}
        all_bids = {}  # subtask id -> list of (agent_id, cost)
        for subtask in subtasks:
            # Collect bids from every agent that can handle the subtask
            bids = []
            for agent_id, agent in self.agents.items():
                if agent.can_handle(subtask):
                    cost = agent.estimate_cost(subtask)
                    bids.append((agent_id, cost))
            all_bids[subtask.id] = bids

            # The lowest bid wins
            if bids:
                winner = min(bids, key=lambda bid: bid[1])
                assignments[subtask.id] = winner[0]

        # 3. Execute
        results = await self._execute_assignments(assignments)

        return CollaborativeResult(
            output=self._aggregate_results(results),
            assignments=assignments,
            bids=all_bids
        )

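class Coordinator:
    """Coordinator: handles task assignment and conflict resolution"""

    def __init__(self, llm=None):
        self.llm = llm  # LLM client used by decompose_task; set it before calling that method
        self.task_queue = PriorityQueue()
        self.agent_status = {}

    def decompose_task(self, task: ComplexTask) -> List[SubTask]:
        """Break a complex task down into subtasks"""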

        prompt = f"""Decompose the following complex task into manageable subtasks:

Task: {task.description}
Requirements: {task.requirements}

Provide a list of subtasks with:
1. Subtask description
2. Dependencies on other subtasks
3. Estimated complexity
4. Required capabilities

Output as JSON."""

        decomposition = self.llm.generate(prompt, format="json")
        return [SubTask.from_dict(s) for s in decomposition["subtasks"]]

    def assign_subtasks(
        self,
        subtasks: List[SubTask],
        agents: Dict[str, Agent]
    ) -> Dict[str, str]:
        """Assign each subtask to a suitable agent (subtask id -> agent id)"""

        assignments = {}

        for subtask in subtasks:
            # Score every available agent against the subtask
            scores = {}
            for agent_id, agent in agents.items():
                if agent.is_available():
                    score = self._compute_match_score(agent, subtask)
                    scores[agent_id] = score

            # Pick the best match
            if scores:
                best_agent = max(scores, key=scores.get)
                assignments[subtask.id] = best_agent

        return assignments

    def _compute_match_score(self, agent: Agent, subtask: SubTask) -> float:
        """Compute how well an agent matches a subtask (weights sum to 1.0)"""

        score = 0.0

        # Capability match (weight 0.4); a subtask with no stated
        # requirements counts as fully matched
        required = set(subtask.required_capabilities)
        if required:
            capability_match = len(set(agent.capabilities) & required) / len(required)
        else:
            capability_match = 1.0
        score += capability_match * 0.4

        # Past performance on this subtask type (weight 0.3)
        if subtask.type in agent.performance_history:
            score += agent.performance_history[subtask.type] * 0.3

        # Current load (weight 0.2); skip when capacity is zero
        if agent.max_capacity > 0:
            load_factor = 1.0 - (agent.current_load / agent.max_capacity)
            score += load_factor * 0.2

        # Communication overhead (weight 0.1): as written this is a flat bonus
        # for subtasks with dependencies and does not distinguish between agents
        if subtask.dependencies:
            score += 0.1

        return score

    def resolve_conflict(self, conflict: Conflict) -> Resolution:
        """Resolve a conflict between agents"""

        if conflict.type == "resource_contention":
            # Resource contention: resolve by priority or time slicing
            return self._resolve_resource_conflict(conflict)

        elif conflict.type == "goal_conflict":
            # Goal conflict: negotiate or escalate to a higher-level arbiter
            return self._resolve_goal_conflict(conflict)

        elif conflict.type == "communication_failure":
            # Communication failure: retry or use an alternative route
            return self._resolve_communication_failure(conflict)

        else:
            raise ValueError(f"Unknown conflict type: {conflict.type}")
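A worked example makes the weighting in `_compute_match_score` concrete. The sketch below restates the same formula (0.4 capability, 0.3 history, 0.2 spare capacity, 0.1 dependency bonus) on two made-up agents. `AgentProfile` and `SubTaskSpec` are hypothetical stand-ins for the `Agent` and `SubTask` types, and all numbers are illustrative.

```python
from dataclasses import dataclass, field
from typing import Dict, Set


@dataclass
class AgentProfile:
    capabilities: Set[str]
    performance_history: Dict[str, float] = field(default_factory=dict)  # task type -> score in [0, 1]
    current_load: int = 0
    max_capacity: int = 1


@dataclass
class SubTaskSpec:
    type: str
    required_capabilities: Set[str]
    has_dependencies: bool = False


def match_score(agent: AgentProfile, subtask: SubTaskSpec) -> float:
    """Weighted score: 0.4 capability + 0.3 history + 0.2 spare capacity + 0.1 dependency bonus."""
    required = subtask.required_capabilities
    capability = len(agent.capabilities & required) / len(required) if required else 1.0
    history = agent.performance_history.get(subtask.type, 0.0)
    spare = 1.0 - agent.current_load / agent.max_capacity if agent.max_capacity > 0 else 0.0
    bonus = 0.1 if subtask.has_dependencies else 0.0
    return 0.4 * capability + 0.3 * history + 0.2 * spare + bonus


subtask = SubTaskSpec("analysis", {"statistics", "python"}, has_dependencies=True)
generalist = AgentProfile({"python", "writing"}, {"analysis": 0.5}, current_load=1, max_capacity=4)
specialist = AgentProfile({"python", "statistics"}, {"analysis": 0.9}, current_load=3, max_capacity=4)

scores = {
    "generalist": match_score(generalist, subtask),
    "specialist": match_score(specialist, subtask),
}
best = max(scores, key=scores.get)
print(best, {name: round(value, 2) for name, value in scores.items()})
# specialist {'generalist': 0.6, 'specialist': 0.82}
```

The specialist wins despite being busier: full capability coverage (0.40) and a strong track record (0.27) outweigh the generalist's larger spare capacity (0.15 versus 0.05).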

6.3 Agent Team Example

Python
class ResearchTeam:
    """Research team: an example of multi-agent collaboration"""

    def __init__(self):
        self.system = MultiAgentSystem()

        # Create the specialist agents
        self.planner = PlannerAgent("planner")
        self.researcher = ResearchAgent("researcher")
        self.analyst = AnalystAgent("analyst")
        self.writer = WriterAgent("writer")
        self.reviewer = ReviewerAgent("reviewer")

        # Register them with the system
        for agent in [self.planner, self.researcher, self.analyst, self.writer, self.reviewer]:
            self.system.register_agent(agent)

    async def conduct_research(self, topic: str) -> ResearchReport:
        """Run the collaborative research workflow"""

        # Phase 1: planning
        plan = await self.planner.create_research_plan(topic)

        # Phase 2: information gathering (in parallel)
        search_tasks = [
            self.researcher.search_academic(topic),
            self.researcher.search_news(topic),
            self.researcher.search_web(topic)
        ]
        search_results = await asyncio.gather(*search_tasks)

        # gather returns one list per search; flatten them into a single list of sources
        sources = [item for result in search_results for item in (result or [])]

        # Phase 3: analysis
        analysis = await self.analyst.analyze(sources)

        # Phase 4: writing
        draft = await self.writer.write_report(analysis)

        # Phase 5: review and revision
        review = await self.reviewer.review(draft)
        final_report = await self.writer.revise(draft, review)

        return final_report

class PlannerAgent(Agent):
    """Planning agent"""

    async def create_research_plan(self, topic: str) -> ResearchPlan:
        """Create a research plan"""

        # Determine the research scope
        scope = self._determine_scope(topic)

        # Identify the key questions
        key_questions = self._identify_questions(topic, scope)

        # Plan one research step per question
        steps = []
        for question in key_questions:
            steps.append(ResearchStep(
                question=question,
                sources=self._identify_sources(question),
                methods=self._select_methods(question)
            ))

        return ResearchPlan(
            topic=topic,
            scope=scope,
            steps=steps,
            timeline=self._estimate_timeline(steps)
        )

class ResearchAgent(Agent):
    """Research agent"""

    async def search_academic(self, topic: str) -> List[Paper]:
        """Search academic papers"""
        # e.g. the arXiv API or another scholarly search service
        # (Google Scholar has no official API)
        return []  # placeholder: no backend wired up

    async def search_news(self, topic: str) -> List[NewsArticle]:
        """Search news"""
        # e.g. a news API
        return []  # placeholder: no backend wired up

    async def search_web(self, topic: str) -> List[WebPage]:
        """Search the web"""
        # e.g. a search-engine API
        return []  # placeholder: no backend wired up

class AnalystAgent(Agent):
    """Analysis agent"""

    async def analyze(self, sources: List[Source]) -> Analysis:
        """Analyze the collected information"""

        # Integrate the information
        integrated = self._integrate_information(sources)

        # Identify patterns
        patterns = self._identify_patterns(integrated)

        # Analyze trends
        trends = self._analyze_trends(integrated)

        # Identify gaps
        gaps = self._identify_gaps(integrated)

        return Analysis(
            summary=integrated,
            patterns=patterns,
            trends=trends,
            gaps=gaps
        )
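Phase 2 of `conduct_research` fans out three searches with `asyncio.gather`. In practice one backend may fail while the others succeed, so the sketch below shows one way to keep partial results using `return_exceptions=True`. The three search coroutines are simulated stand-ins with made-up return values, and `collect_sources` is a hypothetical helper.

```python
import asyncio
from typing import List, Tuple


async def search_academic(topic: str) -> List[str]:
    await asyncio.sleep(0.01)  # simulated network latency
    return [f"paper about {topic}"]


async def search_news(topic: str) -> List[str]:
    await asyncio.sleep(0.01)
    raise ConnectionError("news backend unavailable")  # simulated outage


async def search_web(topic: str) -> List[str]:
    await asyncio.sleep(0.01)
    return [f"blog post about {topic}", f"forum thread about {topic}"]


async def collect_sources(topic: str) -> Tuple[List[str], List[str]]:
    """Run all searches concurrently; keep partial results if one backend fails."""
    searches = [search_academic(topic), search_news(topic), search_web(topic)]
    # return_exceptions=True puts the exception in the result slot instead of raising
    outcomes = await asyncio.gather(*searches, return_exceptions=True)

    sources: List[str] = []
    failures: List[str] = []
    for outcome in outcomes:  # results come back in the same order as `searches`
        if isinstance(outcome, Exception):
            failures.append(str(outcome))
        else:
            sources.extend(outcome)
    return sources, failures


sources, failures = asyncio.run(collect_sources("agents"))
print(sources, failures)
```

The analyst can then proceed with the sources that did arrive, while the failure list is available for logging or for a retry by the coordinator.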

7. Agent Safety and Controllability

7.1 Security Architecture

Text Only
┌─────────────────────────────────────────────────────────────────────────┐
│                       Agent Security Architecture                       │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │                    Application-Layer Security                    │   │
│  │  • Input validation  • Output filtering  • Sensitive-data checks │   │
│  └──────────────────────────────────────────────────────────────────┘   │
│                                  ↓                                      │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │                     Behavior-Layer Security                      │   │
│  │  • Action review  • Permission control  • Anomaly detection      │   │
│  └──────────────────────────────────────────────────────────────────┘   │
│                                  ↓                                      │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │                     Execution-Layer Security                     │   │
│  │  • Sandbox isolation  • Resource limits  • Audit logging         │   │
│  └──────────────────────────────────────────────────────────────────┘   │
│                                  ↓                                      │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │                     Infrastructure Security                      │   │
│  │  • Network security  • Data encryption  • Access control         │   │
│  └──────────────────────────────────────────────────────────────────┘   │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘
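The layered diagram above can be read as a chain of checks where a request must pass every layer and the first refusal stops processing. The following is a minimal sketch of that idea, with assumptions stated plainly: the three check functions, the `Request` fields, the permission set, and the 5-second CPU budget are all hypothetical, and the infrastructure layer is omitted because it normally lives outside application code.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional, Tuple


@dataclass
class Request:
    user_input: str
    action: str
    cpu_seconds: float


# A check returns None to pass, or a short reason string to block
Check = Callable[[Request], Optional[str]]


def application_layer(req: Request) -> Optional[str]:
    if "ignore previous instructions" in req.user_input.lower():
        return "possible prompt injection"
    return None


def behavior_layer(req: Request) -> Optional[str]:
    allowed = {"read_file", "write_file"}  # hypothetical permission set
    return None if req.action in allowed else f"action not permitted: {req.action}"


def execution_layer(req: Request) -> Optional[str]:
    return "cpu budget exceeded" if req.cpu_seconds > 5.0 else None


LAYERS: List[Tuple[str, Check]] = [
    ("application", application_layer),
    ("behavior", behavior_layer),
    ("execution", execution_layer),
]


def evaluate(req: Request) -> Tuple[bool, List[str]]:
    """Run layers top-down; stop at the first block and return the audit trail."""
    audit: List[str] = []
    for name, check in LAYERS:
        reason = check(req)
        if reason is not None:
            audit.append(f"{name}: blocked ({reason})")
            return False, audit
        audit.append(f"{name}: passed")
    return True, audit


allowed, audit = evaluate(Request("summarize notes.txt", "delete_file", 1.0))
print(allowed, audit)
```

Returning the audit trail alongside the decision mirrors the "audit logging" item in the execution layer: every decision records which layers were consulted and why a request was stopped.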

7.2 Safety Mechanism Implementation

Python
class AgentSafetyFramework:
    """Agent安全框架"""

    class InputGuardrails:
        """输入安全护栏"""

        def __init__(self):
            self.forbidden_patterns = [
                r"ignore previous instructions",
                r"disregard safety",
                r"bypass security",
                r"execute.*system.*command",
            ]
            self.sensitive_keywords = [
                "password", "secret", "key", "token", "credential"
            ]

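        def validate(self, user_input: str) -> ValidationResult:
            """Validate user input before it reaches the model."""

            # Pattern matching only catches known phrasings; treat it as a first
            # filter, not as a complete defense against prompt injection.
            for pattern in self.forbidden_patterns:
                if re.search(pattern, user_input, re.IGNORECASE):
                    return ValidationResult.reject(
                        "Potential prompt injection detected"
                    )

            # Requests that mention credentials need explicit confirmation
            if self._is_sensitive_info_request(user_input):
                return ValidationResult.require_confirmation(
                    "This request may involve sensitive information"
                )

            # Content safety check; _check_content_safety is a placeholder for a
            # moderation model or API that returns a score in [0, 1]
            safety_score = self._check_content_safety(user_input)
            if safety_score < 0.5:
                return ValidationResult.reject("Content violates safety policy")

            return ValidationResult.allow()

        def _is_sensitive_info_request(self, text: str) -> bool:
            """Return True if the text mentions a sensitive keyword as a whole word."""
            # Word boundaries keep "key" from matching "keyboard" or "monkey";
            # the optional "s" still catches plurals such as "tokens"
            return any(
                re.search(rf"\b{re.escape(keyword)}s?\b", text, re.IGNORECASE)
                for keyword in self.sensitive_keywords
            )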

    class ActionGuardrails:
        """行动安全护栏"""

        def __init__(self):
            self.risk_levels = {
                "read_file": "low",
                "write_file": "medium",
                "delete_file": "high",
                "execute_code": "high",
                "network_request": "medium",
                "database_query": "medium",
                "send_email": "high",
                "make_payment": "critical"
            }

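        def evaluate(self, action: Action) -> SafetyDecision:
            """Evaluate an action against its configured risk level."""

            risk_level = self.risk_levels.get(action.type, "unknown")

            # Fail closed: critical and unrecognized action types both need a human
            if risk_level in ("critical", "unknown"):
                return SafetyDecision.require_human_approval(
                    f"Action '{action.type}' (risk: {risk_level}) requires explicit approval"
                )

            elif risk_level == "high":
                # Ask for confirmation on anomalous patterns
                if self._detect_anomaly(action):
                    return SafetyDecision.require_confirmation()

                # Ask for confirmation when the action is outside its normal scope
                if not self._is_within_normal_scope(action):
                    return SafetyDecision.require_confirmation()

            elif risk_level == "medium":
                # Log but allow
                self._log_action(action)

            return SafetyDecision.allow()

        def _detect_anomaly(self, action: Action) -> bool:
            """Detect anomalous action patterns, e.g. many deletions in a short time."""
            # Placeholder: plug in real detection logic here; until then nothing is flagged
            return False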

    class OutputGuardrails:
        """Output guardrails."""

        def filter(self, output: str, context: Context) -> FilteredOutput:
            """Filter unsafe output."""

            # Redact sensitive data before anything else sees the text
            if self._contains_sensitive_data(output):
                output = self._redact_sensitive_data(output)

            # Content safety filter
            if self._contains_harmful_content(output):
                return FilteredOutput.block("Harmful content detected")

            # Factual accuracy check (only for contexts that request it)
            if context.requires_fact_checking:
                accuracy = self._check_factual_accuracy(output)
                if accuracy < 0.8:
                    output = self._add_disclaimer(output)

            return FilteredOutput.allow(output)

    class SandboxedExecution:
        """沙箱执行环境"""

        def __init__(self):
            self.docker_client = docker.from_env()

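        def execute(self, code: str, timeout: int = 30) -> ExecutionResult:
            """Run code inside a throwaway Docker container."""

            # Pass the command as a list so the code reaches python as one argv entry;
            # interpolating it into a quoted string breaks on any single quote in the code.
            container = self.docker_client.containers.run(
                "python:3.9-slim",
                command=["python", "-c", code],
                detach=True,
                mem_limit="512m",
                cpu_quota=100000,  # 1 CPU at the default 100 ms period
                network_mode="none",  # no network access
                read_only=True,  # read-only root filesystem
                security_opt=["no-new-privileges"]
            )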

            try:
                result = container.wait(timeout=timeout)
                logs = container.logs().decode("utf-8")

                return ExecutionResult(
                    success=result["StatusCode"] == 0,
                    output=logs,
                    exit_code=result["StatusCode"]
                )
            finally:
                container.remove(force=True)

class HumanInTheLoop:
    """Human-in-the-loop approval mechanism."""

    def __init__(self, approval_modes: Dict[str, str]):
        """
        Args:
            approval_modes: Approval mode per risk level
                - "auto": execute automatically
                - "confirm": wait for a confirmation
                - "approve": wait for an explicit approval
        """
        self.approval_modes = approval_modes

    async def request_approval(
        self,
        action: Action,
        context: Context
    ) -> ApprovalResult:
        """Request human approval for an action."""

        risk_level = self._assess_risk(action)
        # Risk levels without a configured mode default to the strictest one
        mode = self.approval_modes.get(risk_level, "approve")

        if mode == "auto":
            return ApprovalResult.approved()

        elif mode == "confirm":
            # Send a notification and wait for confirmation
            notification = self._create_notification(action, context)
            response = await self._send_and_wait(notification, timeout=60)

            if response and response.confirmed:
                return ApprovalResult.approved()
            else:
                return ApprovalResult.denied("Not confirmed")

        elif mode == "approve":
            # Explicit approval required
            request = self._create_approval_request(action, context)
            response = await self._send_and_wait(request, timeout=300)

            if response and response.approved:
                return ApprovalResult.approved(response.conditions)
            else:
                return ApprovalResult.denied(response.reason if response else "Timeout")

        # Unrecognized mode: deny rather than fall through and return None
        return ApprovalResult.denied(f"Unknown approval mode: {mode}")
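
`_detect_anomaly` in `ActionGuardrails` is left as a stub, with "many deletions in a short time" given as the example pattern. One way to fill it is a sliding-window counter per action type. This is a sketch, not the tutorial's implementation: the class name `RateAnomalyDetector` and the default limits (5 events per 60 seconds) are assumptions.

```python
import time
from collections import defaultdict, deque
from typing import Deque, Dict, Optional


class RateAnomalyDetector:
    """Flag an action type once it occurs more than max_events times per window."""

    def __init__(self, max_events: int = 5, window_seconds: float = 60.0):
        self.max_events = max_events
        self.window_seconds = window_seconds
        self._events: Dict[str, Deque[float]] = defaultdict(deque)

    def record_and_check(self, action_type: str, now: Optional[float] = None) -> bool:
        """Record one occurrence; return True if the rate limit is now exceeded."""
        # `now` can be injected for testing; otherwise use a monotonic clock
        now = time.monotonic() if now is None else now
        events = self._events[action_type]
        events.append(now)

        # Drop timestamps that have fallen out of the window
        while events and now - events[0] > self.window_seconds:
            events.popleft()

        return len(events) > self.max_events
```

Inside `_detect_anomaly`, a call such as `self.detector.record_and_check(action.type)` would return `True` once a burst exceeds the limit, which routes the action to `require_confirmation()`.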

7.3 Explainability and Auditing

Python
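import time
from datetime import datetime
from typing import Dict, List, Tuple

# DecisionTrace, DecisionStep, AgentInteraction and AuditRecord are assumed to be
# defined elsewhere in the project.


class AgentExplainability:
    """Agent explainability framework."""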

    class DecisionTracer:
        """决策追踪器"""

        def __init__(self):
            self.trace = DecisionTrace()

        def log_decision(
            self,
            decision_point: str,
            context: Dict,
            reasoning: str,
            decision: str,
            alternatives: List[str]
        ):
            """记录决策过程"""

            self.trace.add_step(DecisionStep(
                timestamp=time.time(),
                decision_point=decision_point,
                context=context,
                reasoning=reasoning,
                decision=decision,
                alternatives_considered=alternatives,
                confidence=self._calculate_confidence(reasoning)
            ))

        def generate_explanation(self, detail_level: str = "summary") -> str:
            """生成决策解释"""

            if detail_level == "summary":
                return self._generate_summary()
            elif detail_level == "detailed":
                return self._generate_detailed_explanation()
            elif detail_level == "technical":
                return self._generate_technical_explanation()
            else:
                raise ValueError(f"Unknown detail level: {detail_level}")

    class AuditLogger:
        """审计日志系统"""

        def __init__(self, storage_backend):
            self.storage = storage_backend

        def log_interaction(self, interaction: AgentInteraction):
            """记录Agent交互"""

            audit_record = {
                "timestamp": interaction.timestamp,
                "session_id": interaction.session_id,
                "user_id": interaction.user_id,
                "input": self._sanitize(interaction.input),
                "actions_taken": [
                    {
                        "action": action.type,
                        "parameters": self._sanitize(action.parameters),
                        "result": action.result,
                        "timestamp": action.timestamp
                    }
                    for action in interaction.actions
                ],
                "output": self._sanitize(interaction.output),
                "safety_checks": [
                    {
                        "check_type": check.type,
                        "result": check.result,
                        "timestamp": check.timestamp
                    }
                    for check in interaction.safety_checks
                ]
            }

            self.storage.store(audit_record)

        def query_history(
            self,
            user_id: str = None,
            time_range: Tuple[datetime, datetime] = None,
            action_type: str = None
        ) -> List[AuditRecord]:
            """Query audit history by user, time range, and/or action type."""

            # The filter uses MongoDB-style operators and dotted paths; this assumes
            # the storage backend understands that query syntax.
            filters = {}
            if user_id:
                filters["user_id"] = user_id
            if time_range:
                filters["timestamp"] = {"$gte": time_range[0], "$lte": time_range[1]}
            if action_type:
                filters["actions_taken.action"] = action_type

            return self.storage.query(filters)
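
`AuditLogger` calls `self._sanitize(...)` on inputs, parameters, and outputs without showing it. A minimal sketch of what such a helper might do follows. The key list and the token pattern are assumptions for illustration, and a real deployment would need rules matched to its own secret formats.

```python
import re
from typing import Any

# Dictionary keys whose values are always redacted (assumed list)
SENSITIVE_KEYS = {"password", "secret", "token", "api_key", "credential"}

# Assumed pattern: strings shaped like "sk-..." / "pk-..." API keys
TOKEN_PATTERN = re.compile(r"\b(?:sk|pk)-[A-Za-z0-9]{16,}\b")

REDACTED = "[REDACTED]"


def sanitize(value: Any) -> Any:
    """Return a copy of value with sensitive fields and key-like strings redacted."""
    if isinstance(value, dict):
        return {
            key: REDACTED if str(key).lower() in SENSITIVE_KEYS else sanitize(item)
            for key, item in value.items()
        }
    if isinstance(value, (list, tuple)):
        return [sanitize(item) for item in value]
    if isinstance(value, str):
        return TOKEN_PATTERN.sub(REDACTED, value)
    return value
```

Redacting before the record is stored means the audit trail itself never becomes a place where credentials accumulate.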

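8. Hands-On Project: Building an Autonomous Research Agent

8.1 Project Overview

Build an AI Agent that can complete a research task on its own. It needs to:

  1. Understand the research topic
  2. Search for and collect information
  3. Analyze and synthesize what it finds
  4. Generate a research report

8.2 Full Implementation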

Python
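# autonomous_research_agent.py
# Skeleton implementation: helpers that are called but not defined here
# (for example _identify_red_flags, _build_arguments, _generate_outline,
# _write_section, _format_references) still have to be supplied.

# The data classes are defined at the bottom of the file but referenced in
# annotations above them; deferring annotation evaluation avoids a NameError.
from __future__ import annotations

import asyncio
from typing import List, Dict, Optional
from dataclasses import dataclass
from datetime import datetime
import json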

@dataclass
class ResearchConfig:
    """研究Agent配置"""
    max_search_results: int = 10
    max_analysis_depth: int = 3
    output_format: str = "markdown"
    include_citations: bool = True
    fact_check: bool = True

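class AutonomousResearchAgent:
    """
    Autonomous research agent.

    Capabilities:
    1. Topic analysis and question generation
    2. Multi-source information retrieval
    3. Source credibility assessment
    4. Synthesis and report generation
    """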

    def __init__(self, llm_client, search_client, config: ResearchConfig = None):
        self.llm = llm_client
        self.search = search_client
        self.config = config or ResearchConfig()

        # 子模块
        self.topic_analyzer = TopicAnalyzer(llm_client)
        self.information_retriever = InformationRetriever(search_client)
        self.credibility_assessor = CredibilityAssessor()
        self.synthesizer = InformationSynthesizer(llm_client)
        self.report_generator = ReportGenerator(llm_client)

    async def conduct_research(self, topic: str) -> ResearchReport:
        """
        Run the full research pipeline.

        Args:
            topic: Research topic

        Returns:
            ResearchReport: The generated report
        """
        print(f"🔬 Starting research on: {topic}")

        # Phase 1: topic analysis
        print("📊 Phase 1: analyzing the topic...")
        research_questions = await self.topic_analyzer.analyze(topic)
        print(f"   Generated {len(research_questions)} research questions")

        # Phase 2: information retrieval
        print("🔍 Phase 2: retrieving information...")
        all_sources = []
        for question in research_questions:
            sources = await self.information_retriever.retrieve(
                question,
                max_results=self.config.max_search_results
            )
            all_sources.extend(sources)
        print(f"   Collected {len(all_sources)} sources")

        # Phase 3: credibility assessment
        print("✅ Phase 3: assessing source credibility...")
        assessed_sources = []
        for source in all_sources:
            assessment = self.credibility_assessor.assess(source)
            if assessment.score > 0.6:  # drop low-credibility sources
                assessed_sources.append((source, assessment))
        # Most credible first
        assessed_sources.sort(key=lambda x: x[1].score, reverse=True)
        print(f"   Kept {len(assessed_sources)} high-credibility sources")

        # Phase 4: synthesis
        print("🧩 Phase 4: synthesizing information...")
        synthesis = await self.synthesizer.synthesize(
            topic,
            assessed_sources,
            depth=self.config.max_analysis_depth
        )
        print("   Synthesis complete")

        # Phase 5: report generation
        print("📝 Phase 5: generating the report...")
        report = await self.report_generator.generate(
            topic=topic,
            synthesis=synthesis,
            sources=assessed_sources,
            config=self.config
        )
        print("✨ Research complete!")

        return report

class TopicAnalyzer:
    """主题分析器"""

    def __init__(self, llm):
        self.llm = llm

    async def analyze(self, topic: str) -> List[ResearchQuestion]:
        """分析主题并生成研究问题"""

        prompt = f"""Analyze the following research topic and generate specific research questions.

Topic: {topic}

Generate 5-7 specific research questions that:
1. Cover different aspects of the topic
2. Are answerable through research
3. Build upon each other logically
4. Range from factual to analytical

Output as JSON list with fields: question, type (factual/analytical/comparative), priority (1-5)"""

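        # generate(..., format="json") is the client interface this tutorial assumes
        response = await self.llm.generate(prompt, format="json")
        questions = json.loads(response)  # raises json.JSONDecodeError on malformed output

        # Each item must carry exactly the ResearchQuestion fields; extra keys raise TypeError
        return [ResearchQuestion(**q) for q in questions]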

class InformationRetriever:
    """信息检索器"""

    def __init__(self, search_client):
        self.search = search_client

    async def retrieve(
        self,
        query: ResearchQuestion,
        max_results: int = 10
    ) -> List[InformationSource]:
        """检索相关信息"""

        sources = []

        # 网页搜索
        web_results = await self.search.web_search(
            query.question,
            num_results=max_results // 2
        )
        for result in web_results:
            content = await self._fetch_content(result.url)
            sources.append(InformationSource(
                type="web",
                title=result.title,
                url=result.url,
                content=content,
                timestamp=datetime.now()
            ))

        # 学术搜索
        academic_results = await self.search.academic_search(
            query.question,
            num_results=max_results // 2
        )
        for result in academic_results:
            sources.append(InformationSource(
                type="academic",
                title=result.title,
                url=result.url,
                content=result.abstract,
                authors=result.authors,
                publication_date=result.date,
                timestamp=datetime.now()
            ))

        return sources

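    async def _fetch_content(self, url: str) -> str:
        """Fetch a page and return its visible text ("" on failure)."""
        # Imported locally so the rest of the module loads without these packages
        import aiohttp
        from bs4 import BeautifulSoup

        try:
            timeout = aiohttp.ClientTimeout(total=15)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(url) as resp:
                    resp.raise_for_status()
                    html = await resp.text()
        except (aiohttp.ClientError, asyncio.TimeoutError):
            # An unreachable page becomes an empty source instead of aborting the run
            return ""

        return BeautifulSoup(html, "html.parser").get_text(" ", strip=True)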

class CredibilityAssessor:
    """可信度评估器"""

    def assess(self, source: InformationSource) -> CredibilityAssessment:
        """评估信息源可信度"""

        scores = {
            "domain_authority": self._assess_domain(source),
            "content_quality": self._assess_content(source),
            "recency": self._assess_recency(source),
            "citations": self._assess_citations(source)
        }

        # 加权平均
        weights = {
            "domain_authority": 0.3,
            "content_quality": 0.3,
            "recency": 0.2,
            "citations": 0.2
        }

        overall_score = sum(scores[k] * weights[k] for k in scores)

        return CredibilityAssessment(
            score=overall_score,
            breakdown=scores,
            flags=self._identify_red_flags(source)
        )

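    def _assess_domain(self, source: InformationSource) -> float:
        """Score domain authority from the URL's hostname."""
        from urllib.parse import urlparse

        trusted_domains = {
            ".edu": 0.9,
            ".gov": 0.9,
            "wikipedia.org": 0.7,
            "arxiv.org": 0.85,
            "ieee.org": 0.85
        }

        # Compare against the hostname only: a substring test on the whole URL
        # would also accept e.g. "https://evil.example/?ref=arxiv.org"
        host = (urlparse(source.url).hostname or "").lower()
        for domain, score in trusted_domains.items():
            bare = domain.lstrip(".")
            if host == bare or host.endswith("." + bare):
                return score

        return 0.5  # default for unlisted domains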

    def _assess_content(self, source: InformationSource) -> float:
        """Score content quality from length, structure, and citations (rough heuristic)."""
        # A failed fetch or a missing abstract can leave the content empty
        content = source.content or ""

        score = 0.5

        # Length check
        if len(content) > 1000:
            score += 0.1

        # Structure check
        if any(marker in content for marker in ["##", "###", "Introduction", "Conclusion"]):
            score += 0.1

        # Citation check
        if "http" in content or "Source:" in content:
            score += 0.1

        return min(score, 1.0)

    def _assess_recency(self, source: InformationSource) -> float:
        """评估时效性"""
        if not source.publication_date:
            return 0.5

        age_days = (datetime.now() - source.publication_date).days

        if age_days < 30:
            return 1.0
        elif age_days < 365:
            return 0.8
        elif age_days < 365 * 3:
            return 0.6
        else:
            return 0.4

    def _assess_citations(self, source: InformationSource) -> float:
        """Score how well the source cites its own claims."""
        if source.type == "academic":
            return 0.8  # academic sources start from a higher default

        # Count citation markers in the content
        content = source.content or ""
        citation_markers = ["[1]", "[2]", "Source:", "According to"]
        citation_count = sum(1 for marker in citation_markers if marker in content)

        return min(0.5 + citation_count * 0.1, 1.0)

class InformationSynthesizer:
    """信息综合器"""

    def __init__(self, llm):
        self.llm = llm

    async def synthesize(
        self,
        topic: str,
        sources: List[tuple],
        depth: int = 3
    ) -> Synthesis:
        """综合多个信息源"""

        # 提取关键信息
        key_points = await self._extract_key_points(sources)

        # 识别共识与分歧
        consensus, disagreements = self._identify_agreements_and_conflicts(key_points)

        # 构建论证结构
        arguments = await self._build_arguments(key_points, depth)

        # 识别知识缺口
        gaps = self._identify_knowledge_gaps(topic, key_points)

        return Synthesis(
            topic=topic,
            key_findings=key_points,
            consensus_areas=consensus,
            contested_areas=disagreements,
            argument_structure=arguments,
            knowledge_gaps=gaps
        )

    async def _extract_key_points(
        self,
        sources: List[tuple]
    ) -> List[KeyPoint]:
        """从来源中提取关键信息点"""

        all_content = "\n\n".join([
            f"Source {i+1} (credibility: {assessment.score}): {source.content}"
            for i, (source, assessment) in enumerate(sources[:5])  # Top 5 sources
        ])

        prompt = f"""Extract key information points from the following sources.

Sources:
{all_content}

Extract 10-15 key points. For each point:
1. State the fact/claim clearly
2. Note which sources support it
3. Assess confidence level (high/medium/low)

Output as JSON list."""

        response = await self.llm.generate(prompt, format="json")
        points = json.loads(response)

        return [KeyPoint(**p) for p in points]

class ReportGenerator:
    """报告生成器"""

    def __init__(self, llm):
        self.llm = llm

    async def generate(
        self,
        topic: str,
        synthesis: Synthesis,
        sources: List[tuple],
        config: ResearchConfig
    ) -> ResearchReport:
        """生成研究报告"""

        # 生成大纲
        outline = await self._generate_outline(topic, synthesis)

        # 逐节生成
        sections = []
        for section in outline.sections:
            content = await self._write_section(section, synthesis, config)
            sections.append(ReportSection(
                title=section.title,
                content=content,
                level=section.level
            ))

        # 添加引用
        if config.include_citations:
            references = self._format_references(sources)
        else:
            references = []

        # 组装报告
        report_content = self._assemble_report(sections, references, config)

        return ResearchReport(
            title=f"Research Report: {topic}",
            content=report_content,
            sections=sections,
            sources_used=len(sources),
            generation_date=datetime.now(),
            metadata={
                "topic": topic,
                "config": config,
                "synthesis_summary": synthesis.summary()
            }
        )

    def _assemble_report(
        self,
        sections: List[ReportSection],
        references: List[str],
        config: ResearchConfig
    ) -> str:
        """组装最终报告"""

        if config.output_format == "markdown":
            return self._to_markdown(sections, references)
        elif config.output_format == "html":
            return self._to_html(sections, references)
        else:
            return self._to_text(sections, references)

    def _to_markdown(
        self,
        sections: List[ReportSection],
        references: List[str]
    ) -> str:
        """生成Markdown格式报告"""

        lines = []

        # Document title; using the first section's title here would print that
        # heading twice, because the loop below renders every section
        lines.append("# Research Report")
        lines.append(f"\n*Generated on {datetime.now().strftime('%Y-%m-%d')}*\n")

        # 内容
        for section in sections:
            prefix = "#" * section.level
            lines.append(f"\n{prefix} {section.title}\n")
            lines.append(section.content)

        # 引用
        if references:
            lines.append("\n## References\n")
            for i, ref in enumerate(references, 1):
                lines.append(f"{i}. {ref}")

        return "\n".join(lines)

# Data class definitions
@dataclass
class ResearchQuestion:
    question: str
    type: str
    priority: int

@dataclass
class InformationSource:
    type: str
    title: str
    url: str
    content: str
    timestamp: datetime
    authors: Optional[List[str]] = None
    publication_date: Optional[datetime] = None

@dataclass
class CredibilityAssessment:
    score: float
    breakdown: Dict[str, float]
    flags: List[str]

@dataclass
class KeyPoint:
    statement: str
    supporting_sources: List[int]
    confidence: str

@dataclass
class Synthesis:
    topic: str
    key_findings: List[KeyPoint]
    consensus_areas: List[str]
    contested_areas: List[str]
    argument_structure: Dict
    knowledge_gaps: List[str]

    def summary(self) -> str:
        return f"Synthesis of {len(self.key_findings)} key points on '{self.topic}'"

@dataclass
class ReportSection:
    title: str
    content: str
    level: int

@dataclass
class ResearchReport:
    title: str
    content: str
    sections: List[ReportSection]
    sources_used: int
    generation_date: datetime
    metadata: Dict

# Usage example
async def main():
    """Entry point."""

    # Initialize the components (requires real LLM and search clients)
    # llm_client = ...
    # search_client = ...

    # Create the agent
    # agent = AutonomousResearchAgent(llm_client, search_client)

    # Run the research
    # report = await agent.conduct_research("Large Language Models in Healthcare")

    # Save the report
    # with open("research_report.md", "w", encoding="utf-8") as f:
    #     f.write(report.content)

    print("Autonomous research agent framework loaded")
    print("Usage:")
    print("  agent = AutonomousResearchAgent(llm_client, search_client)")
    print("  report = await agent.conduct_research('your research topic')")

if __name__ == "__main__":
    asyncio.run(main())  # start an event loop and run the top-level coroutine
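
The `0.6` cutoff in `conduct_research` interacts with the weights in `CredibilityAssessor` in a way that is easy to miss. The sketch below re-computes the weighted average for three hypothetical sources, using the weights and per-heuristic values from the code above. The three score dictionaries are illustrative inputs, not measured data.

```python
# Weights and threshold copied from CredibilityAssessor / conduct_research
WEIGHTS = {
    "domain_authority": 0.3,
    "content_quality": 0.3,
    "recency": 0.2,
    "citations": 0.2,
}
THRESHOLD = 0.6


def overall(scores: dict) -> float:
    """Weighted average, same formula as CredibilityAssessor.assess."""
    return sum(scores[name] * WEIGHTS[name] for name in WEIGHTS)


# Web page on an unlisted domain, no publication date, no quality signals:
# every heuristic returns its 0.5 default
baseline_web = {"domain_authority": 0.5, "content_quality": 0.5,
                "recency": 0.5, "citations": 0.5}

# Same page hitting every content (0.8) and citation (0.9) signal
best_web = {"domain_authority": 0.5, "content_quality": 0.8,
            "recency": 0.5, "citations": 0.9}

# arXiv paper under a year old with a short abstract
arxiv_paper = {"domain_authority": 0.85, "content_quality": 0.5,
               "recency": 0.8, "citations": 0.8}

for name, scores in [("baseline_web", baseline_web),
                     ("best_web", best_web),
                     ("arxiv_paper", arxiv_paper)]:
    score = overall(scores)
    print(f"{name}: {score:.3f} kept={score > THRESHOLD}")
```

Under these heuristics, a web page on an unlisted domain tops out at about 0.67 and scores 0.50 with no signals, so most ordinary web results are filtered out while listed academic domains pass easily. `retrieve` never sets `publication_date` for web results, which is why their recency stays at the 0.5 default.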

8.3 Running and Extending

Bash
# Install dependencies
pip install aiohttp beautifulsoup4 python-dotenv

# Configure API keys
export OPENAI_API_KEY="your-key"
export SERPER_API_KEY="your-search-key"

# Run the agent
python autonomous_research_agent.py
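
Before wiring in real clients, it can help to fail fast when a key is missing. A small pre-flight check is sketched below; the function name `missing_keys` is hypothetical, and the two variable names are the ones exported above.

```python
import os
from typing import Iterable, List, Mapping

# Environment variables exported in the shell snippet above
REQUIRED_KEYS = ("OPENAI_API_KEY", "SERPER_API_KEY")


def missing_keys(
    env: Mapping[str, str],
    required: Iterable[str] = REQUIRED_KEYS,
) -> List[str]:
    """Return the names of required variables that are unset or blank."""
    return [name for name in required if not env.get(name, "").strip()]


def preflight() -> None:
    """Exit with a readable message if any API key is missing."""
    missing = missing_keys(os.environ)
    if missing:
        raise SystemExit(f"Missing environment variables: {', '.join(missing)}")
```

Calling `preflight()` at the top of `main()` turns a confusing authentication error deep inside a client library into a one-line message at startup.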

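Directions for extension:

  1. Add more data sources (databases, APIs, and so on)
  2. Support multiple languages
  3. Generate reports with visualizations
  4. Integrate a fact-checking API
  5. Support collaborative research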
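
For the first direction, adding data sources, one option is to hide each backend behind a common async interface and query them concurrently. The following is a sketch under assumptions: the `DataSource` protocol, the `StaticSource` stand-in, and the record shape (`title`, `url`) are all hypothetical and not part of the agent code above.

```python
import asyncio
from typing import Dict, List, Protocol


class DataSource(Protocol):
    """Anything that can answer a query with a list of records."""

    async def search(self, query: str, limit: int) -> List[Dict[str, str]]: ...


class StaticSource:
    """Stand-in for a real backend (database, REST API, ...)."""

    def __init__(self, records: List[Dict[str, str]]):
        self._records = records

    async def search(self, query: str, limit: int) -> List[Dict[str, str]]:
        await asyncio.sleep(0)  # yield control, as a real network call would
        hits = [r for r in self._records if query.lower() in r["title"].lower()]
        return hits[:limit]


async def search_all(
    sources: List[DataSource], query: str, limit: int = 5
) -> List[Dict[str, str]]:
    """Query every source concurrently and merge results, de-duplicated by URL."""
    batches = await asyncio.gather(*(s.search(query, limit) for s in sources))

    seen, merged = set(), []
    for batch in batches:  # gather preserves the order of `sources`
        for record in batch:
            if record["url"] not in seen:
                seen.add(record["url"])
                merged.append(record)
    return merged
```

De-duplicating by URL matters here because the research loop issues one search per question, and overlapping questions tend to return the same pages.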


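Summary

The new generation of AI Agents is evolving from simple tool calling toward autonomous execution of complex tasks. Key trends:

  1. End-to-end autonomy: from Manus to Operator, Agents can carry a task independently from understanding the request to delivering the result
  2. Multimodal perception: integrating vision, audio, and other modalities lets Agents operate real-world interfaces
  3. Multi-Agent collaboration: complex tasks call for several specialized Agents working together (Subagent/orchestrator pattern)
  4. Skill reuse and evolution: mechanisms such as Claude Skills move Agents from "reasoning from scratch every time" to "getting more proficient with use"
  5. Safety and controllability: as Agent capabilities grow, safety guardrails and human-in-the-loop mechanisms become essential

Going forward, AI Agents are positioned to become intelligent partners in work and daily life, substantially raising productivity and creativity, provided that human oversight and value alignment are maintained.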


References

Papers

  • ReAct: Synergizing Reasoning and Acting in Language Models (Yao et al., 2022)
  • Reflexion: Language Agents with Verbal Reinforcement Learning (Shinn et al., 2023)
  • Voyager: An Open-Ended Embodied Agent with Large Language Models (Wang et al., 2023)


Document version: 1.0 Author: AI Learning Team


Last updated: 2026-02-12 Applies to: LLM Learning Tutorial v2026