多Agent协同实战我是如何设计智能协作系统的前言最近在做一个复杂的AI工作流系统需要多个Agent协同完成任务。一开始简单地让Agent直接通信结果出现了状态不一致的问题。后来引入了消息路由和状态管理机制系统稳定性提升了80%。这篇文章分享我的设计经验。一、底层原理1.1 核心机制多Agent协同的关键是消息路由和状态一致性graph TD A[任务输入] -- B[任务分发器] B -- C[Agent 1] B -- D[Agent 2] B -- E[Agent 3] C -- F[消息总线] D -- F E -- F F -- G[状态管理器] G -- H{状态一致?} H --|是| I[任务完成] H --|否| J[冲突解决] J -- F关键组件组件功能作用消息总线消息路由统一通信状态管理器状态同步一致性保障任务分发器任务分配负载均衡冲突解决器冲突处理协调矛盾1.2 与同类方案的对比架构扩展性一致性复杂度中心化低高低去中心化高中高混合架构高高中二、快速上手from langchain.agents import AgentExecutor from langchain.llms import OpenAI # 创建多个Agent agent1 create_agent(分析专家) agent2 create_agent(执行专家) agent3 create_agent(总结专家) # 简单的协同流程 def simple_workflow(task): # Agent1分析任务 analysis agent1.run(task) # Agent2执行任务 result agent2.run(analysis) # Agent3总结结果 summary agent3.run(result) return summary三、核心 API / 深水区3.1 核心方法速查方法功能适用场景AgentExecutor()Agent执行器单Agent执行ChatOpenAI()对话模型多轮对话Memory()记忆管理状态保持EventEmitter()事件发布消息通知StateMachine()状态机流程控制3.2 生产级配置from langchain.chains import LLMChain from langchain.prompts import PromptTemplate from langchain.chat_models import ChatOpenAI class Coordinator: def __init__(self): self.agents {} self.state {} self.message_bus MessageBus() def register_agent(self, name, agent): self.agents[name] agent def dispatch(self, task): # 分析任务类型 analysis self.analyze_task(task) # 分配给合适的Agent agent_name self.select_agent(analysis) # 执行任务 result self.agents[agent_name].run(task) # 更新状态 self.update_state(agent_name, result) return result def analyze_task(self, task): prompt PromptTemplate( input_variables[task], template分析任务类型{task} ) chain LLMChain(llmChatOpenAI(), promptprompt) return chain.run(task)3.3 高级定制# 消息总线实现 class MessageBus: def __init__(self): self.subscribers {} def subscribe(self, topic, callback): if topic not in self.subscribers: self.subscribers[topic] [] self.subscribers[topic].append(callback) def publish(self, topic, message): if topic in self.subscribers: for callback in self.subscribers[topic]: callback(message)四、实战演练场景多Agent代码审查流程def code_review_workflow(code): # 第一步语法检查 syntax_agent AgentExecutor.from_agent_and_tools( agentsyntax_agent, tools[check_syntax] ) syntax_result syntax_agent.run(code) # 第二步安全检查 security_agent AgentExecutor.from_agent_and_tools( agentsecurity_agent, tools[check_security] ) security_result security_agent.run(code) # 第三步性能检查 performance_agent AgentExecutor.from_agent_and_tools( agentperformance_agent, tools[check_performance] ) performance_result performance_agent.run(code) # 汇总结果 summary f 语法检查{syntax_result} 安全检查{security_result} 性能检查{performance_result} return summary五、避坑指南与最佳实践 技巧使用状态机管理流程from transitions import Machine class WorkflowStateMachine: states [pending, analyzing, executing, reviewing, completed] def __init__(self): self.machine Machine( modelself, statesWorkflowStateMachine.states, initialpending ) # 定义状态转换 self.machine.add_transition( triggerstart, sourcepending, destanalyzing ) self.machine.add_transition( triggeranalyze_done, sourceanalyzing, destexecuting )⚠️ 警告避免状态不一致# 错误示例无状态同步 def badWorkflow(task): agent1_result agent1.run(task) agent2_result agent2.run(task) # 不知道agent1的结果 # 可能导致冲突 return combine(agent1_result, agent2_result) # 正确做法共享状态 def goodWorkflow(task): shared_state {} agent1_result agent1.run(task, stateshared_state) shared_state[agent1_result] agent1_result agent2_result agent2.run(task, stateshared_state) # 知道agent1的结果 return combine(agent1_result, agent2_result)✅ 推荐使用事件驱动架构class EventDrivenCoordinator: def __init__(self): self.message_bus MessageBus() self.message_bus.subscribe(task.completed, self.on_task_completed) def on_task_completed(self, message): task_id message[task_id] result message[result] # 触发下一步 self.trigger_next(task_id, result)六、综合实战演示from langchain.agents import initialize_agent, AgentType from langchain.tools import Tool from langchain.chat_models import ChatOpenAI from collections import defaultdict class MultiAgentSystem: def __init__(self): self.llm ChatOpenAI(model_namegpt-4) self.agents {} self.state defaultdict(dict) self.message_bus MessageBus() # 注册消息处理器 self.message_bus.subscribe(agent.completed, self.handle_completion) def create_agent(self, name, tools, description): agent initialize_agent( tools, self.llm, agentAgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION, verboseTrue ) self.agents[name] { agent: agent, description: description } return agent def dispatch(self, task, context): # 分析任务选择Agent agent_name self.select_agent(task) # 执行任务 result self.agents[agent_name][agent].run(task) # 发布完成事件 self.message_bus.publish(agent.completed, { agent: agent_name, task: task, result: result, context: context }) return result def select_agent(self, task): prompt f 任务{task} 可用Agent{[(name, info[description]) for name, info in self.agents.items()]} 请选择最合适的Agent result self.llm.predict(prompt) return result.strip() def handle_completion(self, message): agent_name message[agent] result message[result] context message[context] # 更新状态 self.state[context[task_id]][agent_name] result # 检查是否完成 if self.is_complete(context[task_id]): self.finalize(context[task_id]) def is_complete(self, task_id): # 检查所有必要Agent是否完成 required_agents [analyzer, executor, reviewer] state self.state[task_id] for agent in required_agents: if agent not in state: return False return True def finalize(self, task_id): state self.state[task_id] summary f 任务完成总结 分析结果{state[analyzer]} 执行结果{state[executor]} 审查结果{state[reviewer]} print(summary) # 使用示例 system MultiAgentSystem() # 创建Agent system.create_agent( nameanalyzer, tools[], description任务分析专家 ) system.create_agent( nameexecutor, tools[], description任务执行专家 ) system.create_agent( namereviewer, tools[], description结果审查专家 ) # 提交任务 system.dispatch( task分析并执行代码优化任务, context{task_id: task_001} )七、总结多Agent协同的核心是消息路由和状态管理。关键要点使用消息总线统一通信维护全局状态保证一致性使用状态机管理流程实现事件驱动架构核心收获好的架构设计是多Agent协同成功的关键。
多Agent协同实战:我是如何设计智能协作系统的
多Agent协同实战我是如何设计智能协作系统的前言最近在做一个复杂的AI工作流系统需要多个Agent协同完成任务。一开始简单地让Agent直接通信结果出现了状态不一致的问题。后来引入了消息路由和状态管理机制系统稳定性提升了80%。这篇文章分享我的设计经验。一、底层原理1.1 核心机制多Agent协同的关键是消息路由和状态一致性graph TD A[任务输入] -- B[任务分发器] B -- C[Agent 1] B -- D[Agent 2] B -- E[Agent 3] C -- F[消息总线] D -- F E -- F F -- G[状态管理器] G -- H{状态一致?} H --|是| I[任务完成] H --|否| J[冲突解决] J -- F关键组件组件功能作用消息总线消息路由统一通信状态管理器状态同步一致性保障任务分发器任务分配负载均衡冲突解决器冲突处理协调矛盾1.2 与同类方案的对比架构扩展性一致性复杂度中心化低高低去中心化高中高混合架构高高中二、快速上手from langchain.agents import AgentExecutor from langchain.llms import OpenAI # 创建多个Agent agent1 create_agent(分析专家) agent2 create_agent(执行专家) agent3 create_agent(总结专家) # 简单的协同流程 def simple_workflow(task): # Agent1分析任务 analysis agent1.run(task) # Agent2执行任务 result agent2.run(analysis) # Agent3总结结果 summary agent3.run(result) return summary三、核心 API / 深水区3.1 核心方法速查方法功能适用场景AgentExecutor()Agent执行器单Agent执行ChatOpenAI()对话模型多轮对话Memory()记忆管理状态保持EventEmitter()事件发布消息通知StateMachine()状态机流程控制3.2 生产级配置from langchain.chains import LLMChain from langchain.prompts import PromptTemplate from langchain.chat_models import ChatOpenAI class Coordinator: def __init__(self): self.agents {} self.state {} self.message_bus MessageBus() def register_agent(self, name, agent): self.agents[name] agent def dispatch(self, task): # 分析任务类型 analysis self.analyze_task(task) # 分配给合适的Agent agent_name self.select_agent(analysis) # 执行任务 result self.agents[agent_name].run(task) # 更新状态 self.update_state(agent_name, result) return result def analyze_task(self, task): prompt PromptTemplate( input_variables[task], template分析任务类型{task} ) chain LLMChain(llmChatOpenAI(), promptprompt) return chain.run(task)3.3 高级定制# 消息总线实现 class MessageBus: def __init__(self): self.subscribers {} def subscribe(self, topic, callback): if topic not in self.subscribers: self.subscribers[topic] [] self.subscribers[topic].append(callback) def publish(self, topic, message): if topic in self.subscribers: for callback in self.subscribers[topic]: callback(message)四、实战演练场景多Agent代码审查流程def code_review_workflow(code): # 第一步语法检查 syntax_agent AgentExecutor.from_agent_and_tools( agentsyntax_agent, tools[check_syntax] ) syntax_result syntax_agent.run(code) # 第二步安全检查 security_agent AgentExecutor.from_agent_and_tools( agentsecurity_agent, tools[check_security] ) security_result security_agent.run(code) # 第三步性能检查 performance_agent AgentExecutor.from_agent_and_tools( agentperformance_agent, tools[check_performance] ) performance_result performance_agent.run(code) # 汇总结果 summary f 语法检查{syntax_result} 安全检查{security_result} 性能检查{performance_result} return summary五、避坑指南与最佳实践 技巧使用状态机管理流程from transitions import Machine class WorkflowStateMachine: states [pending, analyzing, executing, reviewing, completed] def __init__(self): self.machine Machine( modelself, statesWorkflowStateMachine.states, initialpending ) # 定义状态转换 self.machine.add_transition( triggerstart, sourcepending, destanalyzing ) self.machine.add_transition( triggeranalyze_done, sourceanalyzing, destexecuting )⚠️ 警告避免状态不一致# 错误示例无状态同步 def badWorkflow(task): agent1_result agent1.run(task) agent2_result agent2.run(task) # 不知道agent1的结果 # 可能导致冲突 return combine(agent1_result, agent2_result) # 正确做法共享状态 def goodWorkflow(task): shared_state {} agent1_result agent1.run(task, stateshared_state) shared_state[agent1_result] agent1_result agent2_result agent2.run(task, stateshared_state) # 知道agent1的结果 return combine(agent1_result, agent2_result)✅ 推荐使用事件驱动架构class EventDrivenCoordinator: def __init__(self): self.message_bus MessageBus() self.message_bus.subscribe(task.completed, self.on_task_completed) def on_task_completed(self, message): task_id message[task_id] result message[result] # 触发下一步 self.trigger_next(task_id, result)六、综合实战演示from langchain.agents import initialize_agent, AgentType from langchain.tools import Tool from langchain.chat_models import ChatOpenAI from collections import defaultdict class MultiAgentSystem: def __init__(self): self.llm ChatOpenAI(model_namegpt-4) self.agents {} self.state defaultdict(dict) self.message_bus MessageBus() # 注册消息处理器 self.message_bus.subscribe(agent.completed, self.handle_completion) def create_agent(self, name, tools, description): agent initialize_agent( tools, self.llm, agentAgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION, verboseTrue ) self.agents[name] { agent: agent, description: description } return agent def dispatch(self, task, context): # 分析任务选择Agent agent_name self.select_agent(task) # 执行任务 result self.agents[agent_name][agent].run(task) # 发布完成事件 self.message_bus.publish(agent.completed, { agent: agent_name, task: task, result: result, context: context }) return result def select_agent(self, task): prompt f 任务{task} 可用Agent{[(name, info[description]) for name, info in self.agents.items()]} 请选择最合适的Agent result self.llm.predict(prompt) return result.strip() def handle_completion(self, message): agent_name message[agent] result message[result] context message[context] # 更新状态 self.state[context[task_id]][agent_name] result # 检查是否完成 if self.is_complete(context[task_id]): self.finalize(context[task_id]) def is_complete(self, task_id): # 检查所有必要Agent是否完成 required_agents [analyzer, executor, reviewer] state self.state[task_id] for agent in required_agents: if agent not in state: return False return True def finalize(self, task_id): state self.state[task_id] summary f 任务完成总结 分析结果{state[analyzer]} 执行结果{state[executor]} 审查结果{state[reviewer]} print(summary) # 使用示例 system MultiAgentSystem() # 创建Agent system.create_agent( nameanalyzer, tools[], description任务分析专家 ) system.create_agent( nameexecutor, tools[], description任务执行专家 ) system.create_agent( namereviewer, tools[], description结果审查专家 ) # 提交任务 system.dispatch( task分析并执行代码优化任务, context{task_id: task_001} )七、总结多Agent协同的核心是消息路由和状态管理。关键要点使用消息总线统一通信维护全局状态保证一致性使用状态机管理流程实现事件驱动架构核心收获好的架构设计是多Agent协同成功的关键。