# Training a Cart with PPO
With the classic CartPole task as the running example, training a cart with PPO comes down to three ingredients: the Actor-Critic architecture, the clipped objective, and GAE advantage estimation. By reusing each batch of data for several update epochs, the policy is updated stably until the cart learns to balance the pole (or to complete a navigation task). The sections below give a complete, runnable walkthrough covering the theory, the environment, the code, training, and tuning.

## 1. Core principles of PPO

PPO (Proximal Policy Optimization) is a policy-gradient algorithm built on the Actor-Critic architecture. Its core idea is to limit the size of each policy update so that training does not oscillate.

- **Actor (policy network)**: takes the state as input and outputs a probability distribution over actions (discrete or continuous) that drives the cart.
- **Critic (value network)**: takes the state as input and outputs the state value $V(s)$, which evaluates how good the current state is.
- **Clipped objective (PPO-Clip)**: the importance-sampling ratio is
  $$r_t(\theta) = \frac{\pi_\theta(a_t \mid s_t)}{\pi_{\theta_{\text{old}}}(a_t \mid s_t)}$$
  and the clipped loss is
  $$L^{\text{CLIP}}(\theta) = \mathbb{E}_t\left[\min\big(r_t(\theta) A_t,\ \operatorname{clip}(r_t(\theta),\, 1-\epsilon,\, 1+\epsilon)\, A_t\big)\right]$$
  where $\epsilon$ is usually 0.2, which prevents excessively large policy updates.
- **Advantage function (GAE)**:
  $$A_t = \sum_{k=0}^{\infty} (\gamma\lambda)^k \delta_{t+k}, \qquad \delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$$
  which balances bias against variance.

## 2. Choosing and setting up the environment

### 1. Classic cart environment: CartPole-v1

- State space: 4-dimensional (cart position, cart velocity, pole angle, pole angular velocity).
- Action space: 2 discrete actions (push left, push right).
- Reward: +1 per step; the episode ends when the pole falls over or the cart leaves the track. The target is a cumulative reward of at least 475.

Install the dependencies:

```bash
pip install gymnasium torch numpy matplotlib
```

### 2. Custom / ROS cart environment (optional)

- Build a TurtleBot3 in Gazebo + ROS and define the observations (LiDAR / camera images), actions (linear / angular velocity), and reward function (obstacle avoidance + progress).
- Alternatively, use MetaDrive for autonomous-driving simulation, where the action space is continuous (steering + throttle).

## 3. Complete PPO training code (PyTorch)

### 1. Network definitions (Actor / Critic)

```python
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import gymnasium as gym
import numpy as np
from collections import deque
import matplotlib.pyplot as plt

# Device configuration
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# Actor network: outputs action probabilities
class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=64):
        super(Actor, self).__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return F.softmax(self.fc3(x), dim=-1)


# Critic network: outputs the state value
class Critic(nn.Module):
    def __init__(self, state_dim, hidden_dim=64):
        super(Critic, self).__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)
```

### 2. PPO agent

```python
class PPO:
    def __init__(self, state_dim, action_dim, lr_actor=3e-4, lr_critic=1e-3,
                 gamma=0.99, lmbda=0.95, eps_clip=0.2, epochs=10):
        # Network initialization
        self.actor = Actor(state_dim, action_dim).to(device)
        self.critic = Critic(state_dim).to(device)
        self.optimizer_actor = optim.Adam(self.actor.parameters(), lr=lr_actor)
        self.optimizer_critic = optim.Adam(self.critic.parameters(), lr=lr_critic)
        # PPO hyperparameters
        self.gamma = gamma        # discount factor
        self.lmbda = lmbda        # GAE parameter
        self.eps_clip = eps_clip  # clip coefficient
        self.epochs = epochs      # update epochs per batch of data
        # Experience buffer
        self.memory = []

    def store(self, state, action, reward, log_prob, done):
        self.memory.append((state, action, reward, log_prob, done))

    # Select an action (training / testing)
    def select_action(self, state, training=True):
        state = torch.FloatTensor(state).unsqueeze(0).to(device)
        probs = self.actor(state)
        dist = torch.distributions.Categorical(probs)
        action = dist.sample()
        log_prob = dist.log_prob(action)
        if training:
            return action.item(), log_prob.item()
        else:
            return torch.argmax(probs).item()  # greedy action at test time

    # Compute GAE advantages
    def compute_gae(self, rewards, dones, values):
        advantages = []
        advantage = 0.0
        next_value = 0.0
        for t in reversed(range(len(rewards))):
            delta = rewards[t] + self.gamma * next_value * (1 - dones[t]) - values[t]
            advantage = delta + self.gamma * self.lmbda * (1 - dones[t]) * advantage
            advantages.insert(0, advantage)
            next_value = values[t]
        # Advantage normalization
        advantages = torch.FloatTensor(advantages).to(device)
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
        return advantages

    # Core PPO update
    def update(self):
        # Unpack the stored experience
        states = torch.FloatTensor(np.array([s for s, a, r, lp, d in self.memory])).to(device)
        actions = torch.LongTensor([a for s, a, r, lp, d in self.memory]).to(device)
        rewards = torch.FloatTensor([r for s, a, r, lp, d in self.memory]).to(device)
        old_log_probs = torch.FloatTensor([lp for s, a, r, lp, d in self.memory]).to(device)
        dones = torch.FloatTensor([d for s, a, r, lp, d in self.memory]).to(device)

        # Compute values and advantages (GAE runs on plain NumPy arrays)
        values = self.critic(states).squeeze()
        advantages = self.compute_gae(rewards.cpu().numpy(),
                                      dones.cpu().numpy(),
                                      values.detach().cpu().numpy())
        returns = advantages + values.detach()  # TD targets for the Critic

        # Multiple update epochs on the same batch
        for _ in range(self.epochs):
            # Probabilities under the new policy
            new_probs = self.actor(states)
            new_dist = torch.distributions.Categorical(new_probs)
            new_log_probs = new_dist.log_prob(actions)
            # Importance-sampling ratio
            ratio = torch.exp(new_log_probs - old_log_probs)
            # Clipped surrogate loss
            surr1 = ratio * advantages
            surr2 = torch.clamp(ratio, 1 - self.eps_clip, 1 + self.eps_clip) * advantages
            loss_actor = -torch.min(surr1, surr2).mean()
            # Critic loss
            loss_critic = F.mse_loss(self.critic(states).squeeze(), returns)
            # Backpropagation
            self.optimizer_actor.zero_grad()
            self.optimizer_critic.zero_grad()
            loss_actor.backward()
            loss_critic.backward()
            self.optimizer_actor.step()
            self.optimizer_critic.step()

        # Clear the experience buffer
        self.memory = []
```
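To make the GAE recursion in `compute_gae` concrete, here is a small standalone sanity check (not part of the original code) that walks a hand-written 4-step trajectory through the same backward loop using plain NumPy; all numbers are made up purely for illustration.

```python
import numpy as np

# Toy 4-step trajectory: rewards, done flags, and Critic value estimates.
rewards = np.array([1.0, 1.0, 1.0, 1.0])
dones   = np.array([0.0, 0.0, 0.0, 1.0])   # episode ends at the last step
values  = np.array([5.0, 4.5, 3.8, 2.0])   # V(s_t) from the Critic
gamma, lmbda = 0.99, 0.95

advantages = np.zeros_like(rewards)
advantage, next_value = 0.0, 0.0
for t in reversed(range(len(rewards))):
    # delta_t = r_t + gamma * V(s_{t+1}) * (1 - done_t) - V(s_t)
    delta = rewards[t] + gamma * next_value * (1.0 - dones[t]) - values[t]
    # A_t = delta_t + gamma * lambda * (1 - done_t) * A_{t+1}
    advantage = delta + gamma * lmbda * (1.0 - dones[t]) * advantage
    advantages[t] = advantage
    next_value = values[t]

print(advantages)                 # raw (unnormalized) advantages
print(advantages + values)        # the corresponding Critic targets
```

Running this by hand shows how the done flag at the last step cuts off bootstrapping, which is exactly what the `(1 - dones[t])` factor does inside the agent.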
### 3. Training main loop

```python
def train_ppo():
    # Environment initialization
    env = gym.make("CartPole-v1")
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n

    ppo = PPO(state_dim, action_dim)
    max_episodes = 1000
    max_steps = 500
    reward_history = []
    avg_reward = deque(maxlen=100)

    for episode in range(max_episodes):
        state, _ = env.reset()
        total_reward = 0
        done = False

        for step in range(max_steps):
            # Select an action with the current (old) policy
            action, log_prob = ppo.select_action(state)
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            # Store the transition
            ppo.store(state, action, reward, log_prob, done)

            total_reward += reward
            state = next_state
            if done:
                break

        # Update the policy on the collected episode
        ppo.update()

        # Log rewards
        avg_reward.append(total_reward)
        reward_history.append(total_reward)
        print(f"Episode {episode + 1}, Total Reward: {total_reward}, "
              f"Avg Reward: {np.mean(avg_reward):.2f}")

        # Convergence criterion: 100-episode average reward >= 475
        if np.mean(avg_reward) >= 475:
            print(f"Training finished: episode {episode + 1} reached the convergence criterion")
            torch.save(ppo.actor.state_dict(), "cartpole_ppo_actor.pth")
            torch.save(ppo.critic.state_dict(), "cartpole_ppo_critic.pth")
            break

    # Plot the reward curve
    plt.plot(reward_history)
    plt.xlabel("Episode")
    plt.ylabel("Total Reward")
    plt.title("PPO Training on CartPole-v1")
    plt.show()


if __name__ == "__main__":
    train_ppo()
```

## 4. Training workflow and key steps

- **Environment interaction**: in each episode, sample a trajectory with the old policy and store (s, a, r, log_prob, done).
- **GAE computation**: use the Critic's value estimates to compute the per-step advantage $A_t$ and normalize it.
- **Multi-epoch updates**: train on the same batch of data for `epochs` rounds; the clipped loss limits how far the policy can move.
- **Convergence check**: the average reward over the last 100 episodes reaches 475 or more (CartPole's maximum score is 500).

## 5. Key hyperparameters and tuning

| Parameter | Meaning | Recommended value | Tuning direction |
| --- | --- | --- | --- |
| lr_actor | Actor learning rate | 3e-4 | Increase if convergence is slow; decrease if training oscillates |
| lr_critic | Critic learning rate | 1e-3 | Usually larger than the Actor's learning rate |
| gamma | Discount factor | 0.99 | Increase for tasks with long-term dependencies |
| lmbda | GAE parameter | 0.95 | Trades off bias against variance |
| eps_clip | Clip coefficient | 0.2 | Decrease if oscillating; increase if converging slowly |
| epochs | Update epochs per batch | 10 | How many times each batch of data is reused |

## 6. Testing and deployment

```python
def test_ppo():
    env = gym.make("CartPole-v1", render_mode="human")
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n

    ppo = PPO(state_dim, action_dim)
    # Load the trained models
    ppo.actor.load_state_dict(torch.load("cartpole_ppo_actor.pth"))
    ppo.critic.load_state_dict(torch.load("cartpole_ppo_critic.pth"))

    for episode in range(10):
        state, _ = env.reset()
        total_reward = 0
        done = False
        while not done:
            # Greedy action at test time
            action = ppo.select_action(state, training=False)
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            total_reward += reward
            state = next_state
        print(f"Test Episode {episode + 1}, Reward: {total_reward}")

    env.close()


if __name__ == "__main__":
    test_ppo()
```

## 7. Extending to a real cart / ROS

- **State space**: replace the CartPole observation with LiDAR scans, camera images, and odometry (e.g. 2D/3D position and velocity).
- **Action space**: continuous actions (linear velocity v, angular velocity w); the Actor then outputs the mean and standard deviation of a Gaussian distribution (a minimal sketch of such a policy head is given at the end of this article).
- **Reward function**: positive terms such as reaching the goal (+100), making progress each step (+1), and avoiding obstacles (+5); negative terms such as collision (-200), timeout (-50), and deviating from the path (-10).
- **Environment bridge**: use openai_ros or a custom Gym environment to connect ROS with the PPO agent.

## 8. Common problems and fixes

- **Training oscillates**: reduce lr_actor, reduce eps_clip, or reduce the number of update epochs per batch.
- **Slow convergence**: increase the learning rates, tune gamma / lmbda, or collect more experience per update (a larger batch).
- **Policy degradation**: make sure the advantages are normalized, the clipped loss is implemented correctly, and the Critic's value estimates are accurate.
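As noted in section 7, moving to a real cart with continuous actions (linear velocity v, angular velocity w) means replacing the Categorical Actor with a Gaussian policy head. The sketch below shows one minimal way to do that; the class name `GaussianActor`, the 24-beam LiDAR observation size, and the two-dimensional (v, w) action layout are illustrative assumptions, not part of the original article. The rest of the PPO update would stay the same, except that log-probabilities come from a Normal distribution instead of a Categorical one.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F


# Sketch of a Gaussian policy head for continuous actions (e.g. v and w).
class GaussianActor(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=64):
        super().__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.mu_head = nn.Linear(hidden_dim, action_dim)      # action means
        self.log_std = nn.Parameter(torch.zeros(action_dim))  # state-independent log std

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        mu = torch.tanh(self.mu_head(x))          # squash means into [-1, 1]
        std = self.log_std.exp().expand_as(mu)
        return torch.distributions.Normal(mu, std)


# Usage sketch: sample an action and its log-prob for the PPO buffer.
if __name__ == "__main__":
    actor = GaussianActor(state_dim=24, action_dim=2)  # e.g. 24 LiDAR beams -> (v, w)
    state = torch.randn(1, 24)
    dist = actor(state)
    action = dist.sample()
    log_prob = dist.log_prob(action).sum(dim=-1)       # sum over action dimensions
    print(action, log_prob)
```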