
PPO Algorithm Implementation in Detail

Proximal Policy Optimization | Reinforcement Learning Algorithm

Algorithm Overview

PPO (Proximal Policy Optimization) is a policy-gradient algorithm proposed by OpenAI in 2017. By limiting how much the policy is allowed to change at each update, it keeps training stable and improves convergence, while remaining simple to implement with first-order optimization alone.
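In more formal terms, this constraint is expressed by the clipped surrogate objective from the original PPO paper:

```latex
L^{\mathrm{CLIP}}(\theta)
  = \mathbb{E}_t\!\left[
      \min\!\Big( r_t(\theta)\,\hat{A}_t,\;
                  \operatorname{clip}\!\big(r_t(\theta),\, 1-\epsilon,\, 1+\epsilon\big)\,\hat{A}_t
      \Big)
    \right],
\qquad
r_t(\theta) = \frac{\pi_\theta(a_t \mid s_t)}{\pi_{\theta_{\mathrm{old}}}(a_t \mid s_t)}
```

Here \(\hat{A}_t\) is the advantage estimate at timestep \(t\) and \(\epsilon\) is the clip ratio (`clip_ratio = 0.2` in the code below).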

Core Features

  - A clipped surrogate objective that bounds how far each update can move the policy
  - First-order optimization only, making it much simpler to implement than trust-region methods such as TRPO
  - Good sample efficiency for an on-policy method, since each batch of collected data can be reused for several update epochs

PPO Core Implementation

import torch
import torch.nn.functional as F
from torch.distributions import Categorical

class PPOAgent:
    def __init__(self, state_dim, action_dim, lr=0.0003, gamma=0.99, clip_ratio=0.2):
        # Initialize the policy network and a separate value network
        self.policy = self._build_network(state_dim, action_dim)
        self.value_net = self._build_network(state_dim, 1)
        self.optimizer = torch.optim.Adam(
            list(self.policy.parameters()) + list(self.value_net.parameters()), lr=lr)
        self.gamma = gamma
        self.clip_ratio = clip_ratio

        # Buffers for the collected trajectory
        self.states = []
        self.actions = []
        self.rewards = []
        self.values = []
        self.log_probs = []
        self.dones = []

    def _build_network(self, input_dim, output_dim):
        return torch.nn.Sequential(
            torch.nn.Linear(input_dim, 256),
            torch.nn.ReLU(),
            torch.nn.Linear(256, output_dim)
        )

    def select_action(self, state):
        # Sample an action from the current policy and estimate the state value
        state = torch.FloatTensor(state)
        with torch.no_grad():
            logits = self.policy(state)
            dist = Categorical(logits=logits)
            action = dist.sample()
            log_prob = dist.log_prob(action)
            value = self.value_net(state).squeeze(-1)
        return action.item(), log_prob.item(), value.item()

    def compute_gae(self, rewards, values, dones, gamma=0.99, lambda_gae=0.95):
        # Generalized Advantage Estimation, computed backwards over the trajectory
        values = values + [0]  # bootstrap value after the final step
        advantages = []
        gae = 0

        for i in reversed(range(len(rewards))):
            # A terminal step cuts off both the bootstrap and the running GAE sum
            mask = 0.0 if dones[i] else 1.0
            delta = rewards[i] + gamma * values[i + 1] * mask - values[i]
            gae = delta + gamma * lambda_gae * mask * gae
            advantages.insert(0, gae)

        return torch.FloatTensor(advantages)

    def update_policy(self):
        # Compute GAE advantages and value-function targets
        advantages = self.compute_gae(self.rewards, self.values, self.dones)
        returns = advantages + torch.FloatTensor(self.values)

        # Normalize advantages
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        states = torch.FloatTensor(self.states)
        actions = torch.LongTensor(self.actions)
        old_log_probs = torch.FloatTensor(self.log_probs)

        # Re-evaluate the stored actions under the current policy
        dist = Categorical(logits=self.policy(states))
        new_log_probs = dist.log_prob(actions)

        # Probability ratio between the new and the old policy
        ratio = torch.exp(new_log_probs - old_log_probs)

        # PPO clipped surrogate objective: limit the size of the policy update
        surr1 = ratio * advantages
        surr2 = torch.clamp(ratio, 1 - self.clip_ratio, 1 + self.clip_ratio) * advantages
        policy_loss = -torch.min(surr1, surr2).mean()

        # Value-function loss
        value_loss = F.mse_loss(self.value_net(states).squeeze(-1), returns)
        loss = policy_loss + 0.5 * value_loss

        # Optimize
        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(self.policy.parameters(), 0.5)
        self.optimizer.step()

        # Clear the stored trajectory
        self._clear_trajectory()

    def _clear_trajectory(self):
        for buffer in (self.states, self.actions, self.rewards,
                       self.values, self.log_probs, self.dones):
            buffer.clear()
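As a quick illustration of what the GAE recursion produces, here is a standalone sketch in plain Python (no torch); the rewards, values, and done flags are made-up numbers chosen only for illustration:

```python
# Standalone sketch of Generalized Advantage Estimation (GAE),
# computed backwards through a three-step trajectory.
def gae(rewards, values, dones, gamma=0.99, lam=0.95):
    values = values + [0.0]  # bootstrap value after the last step
    advantages = []
    running = 0.0
    for i in reversed(range(len(rewards))):
        mask = 0.0 if dones[i] else 1.0  # a terminal step cuts the recursion
        delta = rewards[i] + gamma * values[i + 1] * mask - values[i]
        running = delta + gamma * lam * mask * running
        advantages.insert(0, running)
    return advantages

rewards = [1.0, 0.0, 1.0]
values = [0.5, 0.4, 0.3]
dones = [False, False, True]  # episode terminates at the last step
adv = gae(rewards, values, dones)
print([round(a, 4) for a in adv])
```

Note how the terminal flag zeroes out both the bootstrapped next value and the running sum, so the last advantage is just the final TD error, while earlier advantages accumulate discounted TD errors.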

Algorithm Workflow

  1. Data collection: interact with the environment using the current policy and record states, actions, rewards, and the rest of the trajectory data
  2. Advantage estimation: compute the advantage for each timestep with GAE
  3. Policy update: optimize the policy network with the clipped objective, bounding the size of the policy change
  4. Iterate: repeat the steps above until the policy converges or reaches the target performance
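The four steps can be sketched end to end on a toy one-step "bandit" environment. Everything here (the `step` function, the single dummy state, the hyperparameters) is a hypothetical minimal setup for illustration, not the `PPOAgent` class from the article:

```python
import torch
from torch.distributions import Categorical

torch.manual_seed(0)

# Toy one-step environment: action 1 pays reward 1, action 0 pays nothing.
def step(action):
    return 1.0 if action == 1 else 0.0

policy = torch.nn.Linear(1, 2)  # tiny policy network over 2 actions
optimizer = torch.optim.Adam(policy.parameters(), lr=0.05)
clip_ratio = 0.2
state = torch.ones(1)  # single dummy state

for iteration in range(50):
    # 1. Data collection with the current (old) policy
    with torch.no_grad():
        dist = Categorical(logits=policy(state))
        actions = dist.sample((64,))
        old_log_probs = dist.log_prob(actions)
    rewards = torch.tensor([step(a.item()) for a in actions])

    # 2. Advantage estimation (one-step episodes: reward minus mean baseline)
    advantages = rewards - rewards.mean()

    # 3. Policy update with the clipped surrogate objective
    new_dist = Categorical(logits=policy(state))
    ratio = torch.exp(new_dist.log_prob(actions) - old_log_probs)
    surr1 = ratio * advantages
    surr2 = torch.clamp(ratio, 1 - clip_ratio, 1 + clip_ratio) * advantages
    loss = -torch.min(surr1, surr2).mean()
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    # 4. Repeat until the policy prefers the rewarding action

probs = Categorical(logits=policy(state)).probs
print(probs)
```

Because each iteration collects fresh data, the ratio starts at 1 for every update, and clipping only activates if a single update tries to move the policy too far, which is exactly the stabilizing behavior described in the overview.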