跳转至

06 - 分层强化学习与多目标强化学习

⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。

学习时间: 4-5 小时 重要性: ⭐⭐⭐⭐ 解决复杂长程任务与多目标权衡 前置知识: MDP 基础、 DQN/PPO 、策略梯度


🎯 学习目标

完成本章后,你将能够: - 理解分层强化学习( HRL )的 Options 框架 - 掌握Feudal NetworksHAM的核心思想 - 了解目标条件 RL ( Goal-Conditioned RL )与 HER - 掌握多目标 RL的帕累托优化方法 - 理解自博弈( Self-Play )的原理与应用


Part I: 分层强化学习( Hierarchical RL )

本部分聚焦“任务分解与时序抽象”,从选项策略到高低层协同,解决长程决策难题。

1. 为什么需要分层

1.1 问题:长程稀疏奖励

标准 RL 在面对长期规划任务时效率极低:

Text Only
做一顿饭的RL分解:
├── 底层动作空间(每个时间步)
│   ├── 左手移动(-10cm, 0, 5cm)
│   ├── 右手旋转(5°, -3°, 0°)
│   ├── 手指施力(2N)
│   └── ... 连续高维动作
├── 总步数: ~10000步
├── 奖励: 只在做完饭时 +1
└── 问题: 在10000步中随机探索几乎不可能找到正确序列

分层分解:
├── 高层: 选择子任务("打开冰箱" → "拿出材料" → "切菜" → "炒菜")
├── 中层: 选择子目标("手移到冰箱把手" → "抓住" → "拉开")
└── 底层: 生成关节动作
每一层的决策空间和时间尺度更小 → 更容易学习

1.2 分层 RL 的核心优势

优势 说明
时间抽象 高层策略以更长的时间尺度决策
状态抽象 高层只关注任务相关的宏观特征
可复用性 底层技能可跨任务复用
探索效率 在子目标空间而非原始动作空间探索

2. Options 框架

2.1 定义

Sutton, Precup, Singh (1999) 提出的 Options 框架是分层 RL 的理论基础。

一个Option \(\omega = (I_\omega, \pi_\omega, \beta_\omega)\) 由三部分组成: - \(I_\omega \subseteq S\)初始化集合(在什么状态下可以启动这个 option ) - \(\pi_\omega: S \times A \to [0,1]\)内部策略( option 执行什么动作) - \(\beta_\omega: S \to [0,1]\)终止条件( option 什么时候结束)

直觉:一个 Option 就是一个"技能"。比如"走到门口"是一个 option ,它有自己的策略(怎么走),有触发条件(我离门不太远时可用),有终止条件(到门口了就结束)。

2.2 Semi-MDP (SMDP)

引入 Options 后,高层策略在时间扩展的框架中决策:

\[Q_\Omega(s, \omega) = \mathbb{E}\left[ r_1 + \gamma r_2 + \cdots + \gamma^{k-1} r_k + \gamma^k \max_{\omega'} Q_\Omega(s', \omega') \right]\]

其中 \(k\) 是 option 执行的持续时间。

2.3 代码实现

Python
import numpy as np
from typing import Callable, Optional, Tuple
from dataclasses import dataclass

@dataclass  # @dataclass自动生成__init__等方法
class Option:
    """
    Options框架中的一个Option(技能/子策略)

    参考: Sutton, Precup, Singh (1999)
    "Between MDPs and Semi-MDPs: A Framework for Temporal Abstraction in RL"
    """
    name: str                                      # option名称
    initiation_set: Callable[[np.ndarray], bool]  # 初始化条件 I(s) → bool
    policy: Callable[[np.ndarray], int]           # 内部策略 π(s) → a
    termination: Callable[[np.ndarray], float]    # 终止概率 β(s) → [0,1]

class OptionsFramework:
    """
    Options框架的SMDP Q-Learning实现

    高层策略: 选择Option(使用Q-Learning over options)
    底层策略: 每个Option的内部策略
    """

    def __init__(self, n_states: int, options: list, gamma: float = 0.99, lr: float = 0.1):
        """
        参数:
            n_states: 状态数量(离散状态空间)
            options: Option列表
            gamma: 折扣因子
            lr: Q值学习率
        """
        self.options = options
        self.n_options = len(options)
        self.gamma = gamma
        self.lr = lr

        # 高层Q值: Q(s, option)
        self.Q = np.zeros((n_states, self.n_options))

    def select_option(self, state: int, epsilon: float = 0.1) -> int:
        """
        高层策略: ε-greedy选择option
        只能从当前状态可用的options中选择
        """
        # 筛选可用options
        available = [
            i for i, opt in enumerate(self.options)  # enumerate同时获取索引和元素
            if opt.initiation_set(state)
        ]

        if not available:
            raise ValueError(f"状态 {state} 没有可用option")

        if np.random.random() < epsilon:
            return np.random.choice(available)

        # 在可用options中选Q值最大的
        q_vals = [(i, self.Q[state, i]) for i in available]
        return max(q_vals, key=lambda x: x[1])[0]  # lambda匿名函数

    def execute_option(self, env, state: int, option_idx: int) -> Tuple[int, float, int]:
        """
        执行一个option直到终止

        返回:
            next_state: option终止时的状态
            total_reward: 累积折扣奖励
            steps: 执行的步数
        """
        option = self.options[option_idx]
        total_reward = 0.0
        steps = 0
        current_state = state

        while True:
            # 使用option的内部策略选择动作
            action = option.policy(current_state)
            next_state, reward, done, _ = env.step(action)

            total_reward += (self.gamma ** steps) * reward
            steps += 1
            current_state = next_state

            # 检查终止条件
            if done or np.random.random() < option.termination(current_state):
                break

        return current_state, total_reward, steps

    def update_q(self, state: int, option_idx: int, reward: float,
                 next_state: int, steps: int):
        """
        SMDP Q-Learning更新

        Q(s, ω) ← Q(s, ω) + α[R + γ^k max_ω' Q(s', ω') - Q(s, ω)]
        """
        best_next = np.max(self.Q[next_state])
        target = reward + (self.gamma ** steps) * best_next
        self.Q[state, option_idx] += self.lr * (target - self.Q[state, option_idx])

# === 使用示例 ===
def create_navigation_options():
    """创建导航任务的options"""

    # Option 1: 向右走到边界
    go_right = Option(
        name="向右走",
        initiation_set=lambda s: s % 10 < 9,       # 不在右边界时可用
        policy=lambda s: 1,                          # 动作1=向右
        termination=lambda s: 1.0 if s % 10 == 9 else 0.0  # 到右边界就终止
    )

    # Option 2: 向下走到边界
    go_down = Option(
        name="向下走",
        initiation_set=lambda s: s // 10 < 9,       # 不在下边界时可用
        policy=lambda s: 2,                          # 动作2=向下
        termination=lambda s: 1.0 if s // 10 == 9 else 0.0  # 到下边界就终止
    )

    # Option 3: 走到目标区域
    go_to_goal = Option(
        name="走到目标",
        initiation_set=lambda s: True,               # 任何状态可用
        policy=lambda s: 1 if s % 10 < 9 else 2,    # 先右再下
        termination=lambda s: 1.0 if s == 99 else 0.1  # 接近目标时有概率终止
    )

    return [go_right, go_down, go_to_goal]

3. Option-Critic 架构

3.1 端到端学习 Options

Bacon, Harb, Precup (2017) 提出了 Option-Critic ,可以端到端学习options 的策略和终止条件(而非手动设计)。

Text Only
Option-Critic 架构:
┌─────────────────────────────────────┐
│ 高层: 策略 over Options (π_Ω)      │
│   选择哪个option来执行              │
├─────────────────────────────────────┤
│ 中间: Option内部策略 (π_ω)          │
│   每个option里如何选择动作           │
├─────────────────────────────────────┤
│ 终止: 终止函数 (β_ω)               │
│   option什么时候结束                │
└─────────────────────────────────────┘
三个组件都通过梯度下降端到端学习

3.2 代码实现

Python
import torch
import torch.nn as nn
import torch.nn.functional as F

class OptionCritic(nn.Module):  # 继承nn.Module定义网络层
    """
    Option-Critic 架构

    端到端学习:
    1. 高层策略: 选择option(基于Q_Ω)
    2. option内部策略: 产生动作
    3. 终止函数: 决定option何时结束

    参考: Bacon et al., "The Option-Critic Architecture" (AAAI 2017)
    """

    def __init__(self, state_dim: int, n_actions: int, n_options: int = 4):
        """
        参数:
            state_dim: 状态向量维度
            n_actions: 原子动作数量
            n_options: option数量
        """
        super().__init__()  # super()调用父类方法
        self.n_options = n_options
        self.n_actions = n_actions

        # 共享特征提取器
        self.features = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU()
        )

        # Q_Ω(s, ω): 高层Q值(选择option)
        self.q_omega = nn.Linear(128, n_options)

        # π_ω(a|s): 每个option的内部策略
        # n_options个独立的动作分布
        self.option_policies = nn.ModuleList([
            nn.Linear(128, n_actions) for _ in range(n_options)
        ])

        # β_ω(s): 每个option的终止概率
        self.terminations = nn.Linear(128, n_options)

    def get_features(self, state: torch.Tensor) -> torch.Tensor:
        """提取共享特征"""
        return self.features(state)

    def get_q_omega(self, state: torch.Tensor) -> torch.Tensor:
        """计算所有option的Q值"""
        feat = self.get_features(state)
        return self.q_omega(feat)  # (batch, n_options)

    def get_option_policy(self, state: torch.Tensor, option: int) -> torch.Tensor:
        """获取某个option的动作概率分布"""
        feat = self.get_features(state)
        logits = self.option_policies[option](feat)
        return F.softmax(logits, dim=-1)   # (batch, n_actions)  # F.xxx PyTorch函数式API

    def get_termination(self, state: torch.Tensor, option: int) -> torch.Tensor:
        """获取某个option的终止概率"""
        feat = self.get_features(state)
        term_probs = torch.sigmoid(self.terminations(feat))  # (batch, n_options)
        return term_probs[:, option]  # (batch,)

    def select_option(self, state: torch.Tensor, epsilon: float = 0.1) -> int:
        """ε-greedy选择option"""
        if torch.rand(1).item() < epsilon:  # 将单元素张量转为Python数值
            return torch.randint(self.n_options, (1,)).item()
        q_values = self.get_q_omega(state)
        return q_values.argmax(dim=-1).item()

    def select_action(self, state: torch.Tensor, option: int) -> int:
        """根据当前option的策略选择动作"""
        action_probs = self.get_option_policy(state, option)
        return torch.multinomial(action_probs, 1).item()

class OptionCriticTrainer:
    """Option-Critic训练器"""

    def __init__(self, model: OptionCritic, lr: float = 1e-3, gamma: float = 0.99,
                 termination_reg: float = 0.01):
        """
        参数:
            model: OptionCritic模型
            lr: 学习率
            gamma: 折扣因子
            termination_reg: 终止正则化系数
                (鼓励options不要太快终止,促进时间抽象)
        """
        self.model = model
        self.gamma = gamma
        self.termination_reg = termination_reg
        self.optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    def update(self, state, option, action, reward, next_state, done):
        """
        一步更新

        三个梯度同时更新:
        1. Q_Ω: 高层Q值 (类似DQN)
        2. π_ω: option策略 (策略梯度)
        3. β_ω: 终止函数 (优势函数驱动)
        """
        state_t = torch.FloatTensor(state).unsqueeze(0)  # unsqueeze增加一个维度
        next_state_t = torch.FloatTensor(next_state).unsqueeze(0)

        # --- Q_Ω 更新 ---
        q_omega = self.model.get_q_omega(state_t)[0, option]

        with torch.no_grad():  # 禁用梯度计算,节省内存
            # 下一状态的option选择考虑终止
            next_q = self.model.get_q_omega(next_state_t)
            beta_next = self.model.get_termination(next_state_t, option)
            # 如果终止→选新option(max Q),否则继续当前option
            target = reward + self.gamma * (1 - done) * (
                beta_next * next_q.max() + (1 - beta_next) * next_q[0, option]
            )

        q_loss = F.mse_loss(q_omega, target)

        # --- π_ω 策略梯度 ---
        action_probs = self.model.get_option_policy(state_t, option)
        log_prob = torch.log(action_probs[0, action] + 1e-8)

        with torch.no_grad():
            advantage = reward + self.gamma * (1 - done) * next_q.max() - q_omega

        policy_loss = -log_prob * advantage.detach()  # 分离计算图,不参与梯度计算

        # --- β_ω 终止梯度 ---
        beta = self.model.get_termination(state_t, option)
        with torch.no_grad():
            # 终止的优势: 终止后选最优option vs 继续当前option
            term_advantage = next_q.max() - next_q[0, option]

        # 只有当终止有优势时才鼓励终止 + 正则化鼓励不要过早终止
        termination_loss = beta * (term_advantage - self.termination_reg)

        # 总损失
        total_loss = q_loss + policy_loss + termination_loss

        self.optimizer.zero_grad()  # 清零梯度
        total_loss.backward()  # 反向传播计算梯度
        self.optimizer.step()  # 更新参数

        return {
            'q_loss': q_loss.item(),
            'policy_loss': policy_loss.item(),
            'termination_loss': termination_loss.item()
        }

4. 目标条件强化学习( Goal-Conditioned RL )

4.1 核心思想

在标准 RL 中,目标是固定的。Goal-Conditioned RL 将目标 \(g\) 作为额外输入,使策略可以泛化到不同目标:

\[\pi(a|s, g) \quad \text{—— 策略同时依赖状态和目标}\]

4.2 Hindsight Experience Replay (HER)

Andrychowicz et al. (OpenAI, 2017) 提出的 HER 是 Goal-Conditioned RL 的关键突破。

核心洞察:即使智能体没能到达预定目标,它到达的位置本身也可以作为一个"事后目标"来学习。

Text Only
HER的直觉:
┌────────────────────────────────────────┐
│ 原始经验:                              │
│   目标: 到达A点                        │
│   实际: 到达了B点                      │
│   奖励: -1 (失败)                      │
│                                        │
│ HER重新标记:                           │
│   目标: 到达B点 (把实际到达的作为目标)  │
│   实际: 到达了B点                      │
│   奖励: 0 (成功!)                      │
│                                        │
│ → 从"失败"中也能学到有用的信息         │
└────────────────────────────────────────┘

4.3 代码实现

Python
import numpy as np
from collections import deque
from typing import Dict, List, Tuple

class HindsightExperienceReplay:
    """
    Hindsight Experience Replay (HER)

    将失败的轨迹重新标记为成功经验,
    解决稀疏奖励+目标条件RL的样本效率问题。

    参考: Andrychowicz et al., "Hindsight Experience Replay" (2017)
    """

    def __init__(
        self,
        buffer_size: int = 100000,
        n_sampled_goal: int = 4,
        strategy: str = 'future'
    ):
        """
        参数:
            buffer_size: 经验池大小
            n_sampled_goal: 每个transition重新标记的目标数
            strategy: HER目标采样策略
                - 'future': 从同轨迹的未来状态中采样(最常用)
                - 'final': 使用轨迹最后到达的状态
                - 'random': 随机采样
                - 'episode': 从同一episode中随机采样
        """
        self.buffer = deque(maxlen=buffer_size)
        self.n_sampled_goal = n_sampled_goal
        self.strategy = strategy

    def store_episode(self, episode: List[Dict]):
        """
        存储一个episode并进行HER重新标记

        参数:
            episode: transition列表,每个包含:
                {state, action, reward, next_state, goal, achieved_goal, done}
        """
        # 1. 存储原始经验
        for transition in episode:
            self.buffer.append(transition)

        # 2. HER重标记: 为每个transition生成额外的成功经验
        for idx, transition in enumerate(episode):
            sampled_goals = self._sample_goals(episode, idx)

            for new_goal in sampled_goals:
                # 用新目标重新计算奖励
                new_reward = self._compute_reward(
                    transition['achieved_goal'], new_goal
                )

                # 创建重标记的transition
                her_transition = {
                    'state': transition['state'],
                    'action': transition['action'],
                    'reward': new_reward,
                    'next_state': transition['next_state'],
                    'goal': new_goal,
                    'achieved_goal': transition['achieved_goal'],
                    'done': new_reward == 0  # 达到新目标=成功
                }
                self.buffer.append(her_transition)

    def _sample_goals(self, episode: List[Dict], current_idx: int) -> List[np.ndarray]:
        """根据策略采样替代目标"""
        goals = []

        for _ in range(self.n_sampled_goal):
            if self.strategy == 'future':
                # 从当前时间步之后的状态中采样(最有效的策略)
                if current_idx < len(episode) - 1:
                    future_idx = np.random.randint(current_idx + 1, len(episode))
                    goals.append(episode[future_idx]['achieved_goal'])
            elif self.strategy == 'final':
                # 使用episode最终到达的状态
                goals.append(episode[-1]['achieved_goal'])  # [-1]负索引取最后元素
            elif self.strategy == 'episode':
                # 从同episode中随机采样
                rand_idx = np.random.randint(0, len(episode))
                goals.append(episode[rand_idx]['achieved_goal'])

        return goals

    def _compute_reward(self, achieved_goal: np.ndarray, desired_goal: np.ndarray,
                        threshold: float = 0.05) -> float:
        """
        稀疏奖励函数:到达目标 → 0,否则 → -1
        """
        dist = np.linalg.norm(achieved_goal - desired_goal)  # np.linalg线性代数运算
        return 0.0 if dist < threshold else -1.0

    def sample(self, batch_size: int = 256) -> Dict:
        """从buffer中随机采样一个batch"""
        indices = np.random.randint(0, len(self.buffer), size=batch_size)
        batch = [self.buffer[i] for i in indices]

        return {
            'states': np.array([t['state'] for t in batch]),  # np.array创建NumPy数组
            'actions': np.array([t['action'] for t in batch]),
            'rewards': np.array([t['reward'] for t in batch]),
            'next_states': np.array([t['next_state'] for t in batch]),
            'goals': np.array([t['goal'] for t in batch]),
            'dones': np.array([t['done'] for t in batch])
        }

Part II: 多目标强化学习( Multi-Objective RL )

本部分聚焦“多目标权衡”,介绍向量化奖励、偏好建模与 Pareto 最优求解思路。

5. 多目标优化基础

5.1 问题定义

在多目标 RL 中,有多个奖励函数需要同时优化:

\[\max_\pi \mathbf{J}(\pi) = \left( J_1(\pi), J_2(\pi), \ldots, J_m(\pi) \right)\]

帕累托最优:如果没有其他策略在所有目标上都不差且至少一个目标更好,则称 \(\pi\) 是帕累托最优的。

Text Only
双目标示例: 自动驾驶
├── 目标1: 最短到达时间 (快)
├── 目标2: 最少碰撞风险 (安全)
└── 帕累托前沿:

    安全 ↑
    │    ●  非常安全但很慢
    │   ● ●
    │  ●   ● ← 帕累托前沿
    │ ●     ●    (所有帕累托最优策略)
    │●       ●
    │         ● 很快但不太安全
    └──────────── →  速度

    前沿上的每个点都是"不可能在不牺牲一个目标的情况下改进另一个目标"

5.2 方法分类

方法 描述 优势 劣势
线性加权 \(R = w_1 R_1 + w_2 R_2\) 简单 只能找到凸前沿
约束法 优化一个目标,约束其余 可找非凸前沿 需设阈值
帕累托法 同时优化所有目标 找到整个前沿 计算昂贵
偏好条件 策略以偏好为额外输入 一个模型适应所有偏好 训练复杂

5.3 线性加权方法

Python
import numpy as np
import torch
import torch.nn as nn

class LinearScalarization:
    """
    线性加权法: 最简单的多目标RL方法

    将多个目标的标量化: R = Σ w_i × R_i
    通过改变权重w遍历帕累托前沿

    局限: 只能找到帕累托前沿的凸包部分
    """

    def __init__(self, n_objectives: int):
        self.n_objectives = n_objectives

    def scalarize(self, rewards: np.ndarray, weights: np.ndarray) -> float:
        """
        将多目标奖励向量标量化

        参数:
            rewards: 多目标奖励向量 (n_objectives,)
            weights: 权重向量 (n_objectives,), sum=1
        """
        assert len(rewards) == len(weights) == self.n_objectives  # assert断言
        assert abs(sum(weights) - 1.0) < 1e-6, "权重必须归一化"
        return np.dot(rewards, weights)  # np.dot矩阵/向量点乘

    def sweep_pareto_front(self, n_points: int = 11):
        """
        通过遍历不同权重组合来近似帕累托前沿 (2目标)
        """
        if self.n_objectives != 2:
            raise ValueError("帕累托前沿遍历只支持2目标")

        weight_sets = []
        for i in range(n_points):
            w1 = i / (n_points - 1)
            w2 = 1 - w1
            weight_sets.append(np.array([w1, w2]))

        return weight_sets  # 每组权重训练一个策略

5.4 帕累托条件策略网络

Python
class ParetoConditionedPolicy(nn.Module):
    """
    帕累托条件策略网络

    将偏好权重作为额外输入,使一个策略网络
    能够在不同权重下表现出不同的帕累托最优行为。

    训练时随机采样权重 → 模型学会适应不同偏好
    推理时指定权重 → 输出对应的策略

    参考: Abels et al., "Dynamic Weights in Multi-Objective DRL" (ICML 2019)
    """

    def __init__(self, state_dim: int, action_dim: int, n_objectives: int):
        super().__init__()

        # 输入 = state + preference weights
        input_dim = state_dim + n_objectives

        self.network = nn.Sequential(
            nn.Linear(input_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, action_dim)
        )

        self.n_objectives = n_objectives

    def forward(self, state: torch.Tensor, preference: torch.Tensor) -> torch.Tensor:
        """
        根据状态和偏好权重输出动作

        参数:
            state: (batch, state_dim)
            preference: (batch, n_objectives),权重向量
        """
        # 拼接状态和偏好
        x = torch.cat([state, preference], dim=-1)  # torch.cat沿已有维度拼接张量
        return self.network(x)

    def train_step(self, batch, optimizer):
        """
        训练一步: 随机采样偏好权重
        """
        states = batch['states']
        batch_size = states.shape[0]

        # 随机采样偏好权重(Dirichlet分布保证和为1)
        preferences = torch.distributions.Dirichlet(
            torch.ones(self.n_objectives)
        ).sample((batch_size,))

        # 计算标量化的奖励
        multi_rewards = batch['multi_rewards']  # (batch, n_objectives)
        scalar_rewards = (multi_rewards * preferences).sum(dim=-1)

        # 正常的策略梯度/DQN更新...
        actions = self.forward(states, preferences)
        loss = -scalar_rewards.mean()  # 简化

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        return loss.item()

6. 自博弈( Self-Play )

6.1 核心思想

Self-Play 是让智能体与自身的历史版本或当前版本对弈来提升能力。

经典成功案例: - AlphaGo / AlphaZero: 围棋、象棋、将棋 - OpenAI Five: Dota 2 - Cicero: 外交策略游戏

6.2 分类

Text Only
Self-Play方法:
├── 朴素自博弈(Naive Self-Play)
│   └── 永远与最新版本对弈
│       缺点: 可能不稳定(策略循环)
├── 虚拟自博弈(Fictitious Self-Play)
│   └── 对手是历史策略的均匀混合
│       优势: 收敛到纳什均衡
├── 人口自博弈(Population-Based Self-Play)
│   └── 维护一个策略群体,多样化对手
│       优势: 避免过拟合特定对手
└── PFSP(优先虚拟自博弈)
    └── 根据胜率优先选择弱势对手
        优势: 加速训练,减少灾难性遗忘

6.3 代码框架

Python
from collections import deque
import numpy as np
import copy

class SelfPlayTrainer:
    """
    Self-Play训练框架

    维护一个历史策略池,智能体与历史版本对弈提升
    """

    def __init__(self, agent, pool_size: int = 20, save_interval: int = 1000):
        """
        参数:
            agent: RL智能体
            pool_size: 历史策略池大小
            save_interval: 每多少步保存一次快照
        """
        self.agent = agent
        self.opponent_pool = deque(maxlen=pool_size)
        self.save_interval = save_interval
        self.step_count = 0

        # 保存初始版本
        self.opponent_pool.append(copy.deepcopy(agent))

    def select_opponent(self, strategy: str = 'pfsp') -> object:
        """
        选择对手

        参数:
            strategy: 对手选择策略
                - 'latest': 最新版本
                - 'uniform': 均匀随机
                - 'pfsp': 优先选择弱势对手
        """
        if strategy == 'latest':
            return self.opponent_pool[-1]
        elif strategy == 'uniform':
            return np.random.choice(list(self.opponent_pool))
        elif strategy == 'pfsp':
            return self._pfsp_select()

    def _pfsp_select(self) -> object:
        """
        优先虚拟自博弈(PFSP)选择
        优先选择当前agent胜率较低的对手
        → 在薄弱环节上更多训练
        """
        if len(self.opponent_pool) <= 1:
            return self.opponent_pool[0]

        # 计算对每个历史对手的胜率
        # 胜率低的对手权重更高(优先挑战弱点)
        win_rates = np.array([
            self._estimate_win_rate(opponent)
            for opponent in self.opponent_pool
        ])

        # 转化为采样权重(胜率越低 → 权重越高)
        weights = (1 - win_rates) ** 2  # 二次权重,强调弱势匹配
        weights = weights / weights.sum()

        idx = np.random.choice(len(self.opponent_pool), p=weights)
        return self.opponent_pool[idx]

    def _estimate_win_rate(self, opponent, n_games: int = 10) -> float:
        """估计对某个对手的胜率(简化版)"""
        # 实际中会维护一个胜率统计表
        return 0.5  # 简化

    def train_step(self):
        """一步训练"""
        # 选择对手
        opponent = self.select_opponent('pfsp')

        # 对弈并训练(具体实现取决于环境)
        # result = play_game(self.agent, opponent)
        # self.agent.learn(result)

        self.step_count += 1

        # 定期保存快照
        if self.step_count % self.save_interval == 0:
            snapshot = copy.deepcopy(self.agent)
            self.opponent_pool.append(snapshot)

7. 面试要点

7.1 高频问题

  1. 什么是 Options 框架?它如何实现时间抽象?
  2. Option = (初始化集合, 内部策略, 终止条件)
  3. 高层策略选择 option ,底层策略执行→时间尺度分离

  4. Option-Critic 相比手工设计 Options 的优势?

  5. 端到端学习,无需人工设计,可自动发现有意义的子目标和技能

  6. HER 如何解决稀疏奖励问题?

  7. 将失败经验重新标记为"事后成功"→大幅增加正样本

  8. 帕累托最优策略一定是线性加权能找到的吗?

  9. 不一定,线性加权只能找到帕累托前沿的凸包部分

  10. Self-Play 为什么有效? PFSP 相比 Naive Self-Play 的优势?

  11. Self-Play 将自身作为不断提升的对手→ 自举式进步
  12. PFSP 优先挑战弱点→更均衡→避免策略循环

📌 关键要点总结

  1. 分层 RL通过 Options 框架实现时间抽象,解决长程规划问题
  2. Option-Critic可以端到端学习 option 的策略和终止条件
  3. HER通过事后重标记将失败转化为学习信号
  4. 多目标 RL需要考虑帕累托最优,线性加权只是最简单的方法
  5. Self-Play是竞争性环境中的强大训练方法, PFSP 是工程实践中的常用策略

📚 延伸阅读

  • PPO 算法
  • 模仿学习
  • 安全强化学习
  • 论文: Sutton et al., "Between MDPs and Semi-MDPs" (1999)
  • 论文: Bacon et al., "The Option-Critic Architecture" (2017)
  • 论文: Andrychowicz et al., "Hindsight Experience Replay" (2017)