别再死磕理论了!用Python+PyTorch实战MAPPO,搞定多智能体协同控制(附完整代码)

别再死磕理论了!用Python+PyTorch实战MAPPO,搞定多智能体协同控制(附完整代码)

用Python+PyTorch实战MAPPO:从零构建多智能体协同控制方案

在强化学习领域,多智能体系统正成为解决复杂协同任务的关键技术。许多开发者虽然理解MARL(多智能体强化学习)的基础概念,却在将理论转化为可运行代码时遇到障碍。本文将绕过繁琐的数学推导,带您用PyTorch一步步实现MAPPO(Multi-Agent Proximal Policy Optimization),并适配自定义环境。

1. 环境配置与项目初始化

首先创建一个干净的Python 3.8环境(建议使用conda管理依赖):

```bash
conda create -n mappo python=3.8
conda activate mappo
pip install torch==1.12.0+cu113 -f https://download.pytorch.org/whl/torch_stable.html
pip install gym==0.21.0 numpy==1.21.6 wandb==0.13.5
```

项目目录结构应保持模块化设计:

```
mappo_project/
├── envs/               # 自定义环境
│   ├── __init__.py
│   └── grid_world.py   # 示例网格环境
├── agents/             # 智能体相关代码
│   ├── networks.py     # 神经网络结构
│   └── mappo.py        # 核心算法实现
├── configs/            # 参数配置
│   └── default.yaml
└── train.py            # 主训练脚本
```

2. 构建自定义多智能体环境

我们以网格世界为例,创建一个简单的协同导航环境。在envs/grid_world.py中定义:

```python
import gym
from gym import spaces
import numpy as np


class MultiAgentGridWorld(gym.Env):
    def __init__(self, grid_size=5, n_agents=2):
        self.grid_size = grid_size
        self.n_agents = n_agents
        self.observation_space = spaces.Dict({
            f"agent_{i}": spaces.Box(0, grid_size, (2,))
            for i in range(n_agents)
        })
        self.action_space = spaces.Dict({
            f"agent_{i}": spaces.Discrete(4)
            for i in range(n_agents)
        })

    def reset(self):
        self.agent_pos = np.random.randint(0, self.grid_size, (self.n_agents, 2))
        self.target_pos = np.random.randint(0, self.grid_size, (2,))
        return self._get_obs()

    def _get_obs(self):
        return {f"agent_{i}": self.agent_pos[i] for i in range(self.n_agents)}

    def step(self, actions):
        # 处理每个智能体的移动
        for i, act in enumerate(actions.values()):
            if act == 0:  # 上
                self.agent_pos[i][1] = min(self.agent_pos[i][1] + 1, self.grid_size - 1)
            elif act == 1:  # 右
                self.agent_pos[i][0] = min(self.agent_pos[i][0] + 1, self.grid_size - 1)
            # 其他动作类似...

        # 计算共享奖励
        distances = [np.linalg.norm(pos - self.target_pos) for pos in self.agent_pos]
        reward = -np.mean(distances)
        done = all(d < 1.0 for d in distances)  # 所有智能体都接近目标
        return self._get_obs(), {"__all__": reward}, {"__all__": done}, {}
```

3. 
MAPPO核心架构实现

在agents/networks.py中定义策略网络和价值网络:

```python
import torch
import torch.nn as nn


class PolicyNetwork(nn.Module):
    def __init__(self, obs_dim, action_dim, hidden_size=64):
        super().__init__()
        self.fc = nn.Sequential(
            nn.Linear(obs_dim, hidden_size),
            nn.Tanh(),
            nn.Linear(hidden_size, hidden_size),
            nn.Tanh(),
            nn.Linear(hidden_size, action_dim)
        )

    def forward(self, x):
        return torch.softmax(self.fc(x), dim=-1)


class ValueNetwork(nn.Module):
    def __init__(self, obs_dim, hidden_size=64):
        super().__init__()
        self.fc = nn.Sequential(
            nn.Linear(obs_dim, hidden_size),
            nn.Tanh(),
            nn.Linear(hidden_size, hidden_size),
            nn.Tanh(),
            nn.Linear(hidden_size, 1)
        )

    def forward(self, x):
        return self.fc(x)
```

在agents/mappo.py中实现核心算法:

```python
import torch
import torch.optim as optim
from torch.distributions import Categorical


class MAPPO:
    def __init__(self, env, device="cuda"):
        self.env = env
        self.device = device
        # 为每个智能体初始化网络
        self.agents = {}
        for agent_id in env.observation_space.spaces.keys():
            obs_dim = env.observation_space[agent_id].shape[0]
            act_dim = env.action_space[agent_id].n
            self.agents[agent_id] = {
                "policy": PolicyNetwork(obs_dim, act_dim).to(device),
                "value": ValueNetwork(obs_dim).to(device),
                "optimizer": optim.Adam([
                    {"params": PolicyNetwork(obs_dim, act_dim).parameters()},
                    {"params": ValueNetwork(obs_dim).parameters()}
                ], lr=3e-4)
            }

    def compute_returns(self, rewards, gamma=0.99):
        returns = []
        R = 0
        for r in reversed(rewards):
            R = r + gamma * R
            returns.insert(0, R)
        return torch.tensor(returns, device=self.device)

    def update(self, samples):
        for agent_id, data in samples.items():
            # 标准化回报
            returns = self.compute_returns(data["rewards"])
            returns = (returns - returns.mean()) / (returns.std() + 1e-8)

            # 计算策略损失
            old_probs = data["old_probs"]
            actions = data["actions"]
            states = data["states"]
            current_probs = self.agents[agent_id]["policy"](states)
            dist = Categorical(current_probs)
            entropy = dist.entropy().mean()

            ratio = (current_probs.gather(1, actions) / old_probs).squeeze()
            surr1 = ratio * returns
            surr2 = torch.clamp(ratio, 0.8, 1.2) * returns
            policy_loss = -torch.min(surr1, surr2).mean() - 0.01 * entropy

            # 计算价值损失
            values = self.agents[agent_id]["value"](states).squeeze()
            value_loss = (returns - values).pow(2).mean()

            # 更新参数
            self.agents[agent_id]["optimizer"].zero_grad()
            (policy_loss + value_loss).backward()
            self.agents[agent_id]["optimizer"].step()
```

4. 训练流程与参数调优

在train.py中实现主训练循环:

```python
import yaml
from envs.grid_world import MultiAgentGridWorld
from agents.mappo import MAPPO
import wandb


def train():
    # 初始化环境与算法
    env = MultiAgentGridWorld(grid_size=5, n_agents=2)
    agent = MAPPO(env)

    # 训练参数
    n_episodes = 1000
    max_steps = 100
    batch_size = 32

    for ep in range(n_episodes):
        obs = env.reset()
        episode_reward = 0
        samples = {agent_id: {"states": [], "actions": [], "rewards": [], "old_probs": []}
                   for agent_id in obs.keys()}

        for step in range(max_steps):
            actions = {}
            for agent_id, ob in obs.items():
                state = torch.FloatTensor(ob).unsqueeze(0).to(agent.device)
                probs = agent.agents[agent_id]["policy"](state)
                dist = torch.distributions.Categorical(probs)
                action = dist.sample()

                samples[agent_id]["states"].append(state)
                samples[agent_id]["actions"].append(action.unsqueeze(0))
                samples[agent_id]["old_probs"].append(probs.gather(1, action.unsqueeze(0)))
                actions[agent_id] = action.item()

            next_obs, rewards, dones, _ = env.step(actions)
            for agent_id in obs.keys():
                samples[agent_id]["rewards"].append(rewards["__all__"])
            episode_reward += rewards["__all__"]
            obs = next_obs

            if dones["__all__"]:
                break

        # 更新策略
        agent.update(samples)

        # 记录训练过程
        wandb.log({"episode_reward": episode_reward, "episode": ep})
        print(f"Episode {ep}, Reward: {episode_reward:.2f}")
```

关键参数调优建议:

| 参数 | 推荐值 | 调整方向 | 影响说明 |
| --- | --- | --- | --- |
| 学习率 | 3e-4 | ±1数量级 | 过高导致不稳定,过低收敛慢 |
| GAE λ | 0.95 | 0.9-1.0 | 权衡偏差与方差 |
| 折扣因子 γ | 0.99 | 0.9-0.999 | 影响未来奖励权重 |
| PPO clip ε | 0.2 | 0.1-0.3 | 控制策略更新幅度 |
| 批量大小 | 32-256 | 2的幂次 | 影响梯度估计质量 |

5. 可视化与调试技巧

使用WandB监控训练过程:

```python
wandb.init(project="mappo-gridworld")
wandb.config.update({
    "n_agents": 2,
    "grid_size": 5,
    "learning_rate": 3e-4,
    "gamma": 0.99,
    "clip_epsilon": 0.2
})
```

常见问题排查指南:

- 奖励不增长
  - 检查奖励函数设计是否合理
  - 尝试减小学习率
  - 增加熵系数鼓励探索
- 训练不稳定
  - 确保状态归一化
  - 检查梯度裁剪是否生效
  - 增大批量大小
- 智能体行为异常
  - 可视化决策轨迹
  - 检查动作空间定义
  - 验证策略网络输出分布

对于更复杂的任务,可以考虑以下扩展:

- 在策略网络中使用RNN处理部分可观测性
- 实现集中式critic网络
- 添加课程学习逐步提高任务难度