在多 Agent 基础篇中我们探讨了角色协同、任务拆分的基本模式。本文进一步深入聚焦高阶架构设计、跨服务协作与复杂场景完整落地帮助读者构建生产级别的多 Agent 系统。一、高阶架构从简单协同到生产级系统1.1 三层架构模型成熟的多 Agent 系统通常采用调度层、执行层、专家层三层分离架构__INLINE_┌─────────────────────────────────────────────────────┐│ 调度层 (Dispatcher) ││ 任务解析 → DAG 拆分 → 结果汇总 → 响应输出 │└─────────────────────────────────────────────────────┘│┌──────────────────┼──────────────────┐▼ ▼ ▼┌─────────────┐ ┌─────────────┐ ┌─────────────┐│ 执行Agent │ │ 执行Agent │ │ 执行Agent ││ (Executor) │ │ (Executor) │ │ (Executor) │└─────────────┘ └─────────────┘ └─────────────┘│ │ │└──────────────────┼──────────────────┘▼┌─────────────────────────────────────────────────────┐│ 专家层 (Specialist) ││ 风控专家 · 法律专家 · 技术专家 │└─────────────────────────────────────────────────────┘____INLINE_调度层负责任务编排不参与具体业务逻辑执行层并行处理独立子任务专家层提供垂直领域的深度能力可被执行层按需调用。1.2 调度策略分类策略适用场景核心优势**同步调度**强依赖任务链结果实时可用的简单场景**异步调度**I/O 密集、耗时任务不阻塞主流程提升吞吐量**分层调度**多阶段复杂流程清晰分层易于维护**混合调度**生产环境综合场景兼顾性能与可靠性二、任务编排复杂业务场景的拆分艺术2.1 DAG 依赖管理复杂任务存在天然的数据依赖关系使用有向无环图DAG管理任务依赖____INLINE_javaServicepublic class DagScheduler {public ListLevel buildDag(ListSubTask tasks) {// 识别无依赖任务Level 0ListSubTask level0 tasks.stream().filter(t - t.getDependencies().isEmpty()).collect(Collectors.toList());// 按层级推进MapInteger, ListSubTask levels new HashMap();levels.put(0, level0);for (int i 1; ; i) {ListSubTask levelN tasks.stream().filter(t - t.getDependencies().stream().allMatch(d - levels.get(i-1).contains(d))).filter(t - !levels.containsValue(t)).collect(Collectors.toList());if (levelN.isEmpty()) break;levels.put(i, levelN);}return levels.entrySet().stream().sorted(Map.Entry.comparingByKey()).map(e - new Level(e.getKey(), e.getValue())).collect(Collectors.toList());}}____INLINE_2.2 动态任务编排根据输入特征动态决定执行路径____INLINE_javapublic TaskPlan plan(UserRequest request) {TaskPlan plan new TaskPlan();if (request.isHighRisk()) {// 高风险场景增加风控环节plan.addStage(TaskStage.RISK_EVALUATION);plan.addStage(TaskStage.MANUAL_REVIEW);}if (request.getAmount().compareTo(HIGH_AMOUNT) 0) {// 大额交易多维度复核plan.addParallelTask(income_verify);plan.addParallelTask(asset_verify);}plan.addStage(TaskStage.FINAL_APPROVAL);return plan;}____INLINE_三、跨服务协作分布式多 Agent 实战3.1 服务边界与 Agent 分布在微服务架构下每个服务可部署独立 Agent形成联邦式协作____INLINE_[用户服务] ─────┐Agent │ 消息队列[订单服务] ─────┼─── Kafka ──→ [聚合服务]Agent │ Dispatcher[支付服务] ─────┘Agent____INLINE_3.2 消息总线实现使用消息队列实现跨服务解耦____INLINE_java// 任务发布Servicepublic class TaskPublisher {Autowired private KafkaTemplateString, TaskMessage kafka;public void publishTask(SubTask task) {TaskMessage msg new TaskMessage();msg.setTaskId(task.getId());msg.setPayload(task.getPayload());msg.setCallbackTopic(agent-results);kafka.send(agent-tasks, task.getServiceId(), msg);}}// 结果订阅Servicepublic class ResultCollector {private ConcurrentHashMapString, CountDownLatch latches new ConcurrentHashMap();KafkaListener(topics agent-results)public void onResult(TaskResult result) {collectedResults.put(result.getTaskId(), result);if (latches.containsKey(result.getTaskId())) {latches.get(result.getTaskId()).countDown();}}}____INLINE_3.3 共享状态与一致性跨服务协作时通过 Redis 实现共享上下文____INLINE_javaServicepublic class SharedContextStore {Autowired private StringRedisTemplate redis;public void write(String key, Object value) {redis.opsForValue().set(context: key, JSON.toJSONString(value));redis.opsForValue().set(context: key :ts, System.currentTimeMillis());}public T T read(String key, ClassT clazz) {String json redis.opsForValue().get(context: key);return JSON.parseObject(json, clazz);}}____INLINE_四、实战案例电商订单处理完整实现4.1 业务场景用户提交订单系统自动完成1.库存预占InventoryAgent2.优惠计算CouponAgent3.支付扣款PaymentAgent4.风控审核RiskAgent5.订单创建OrderAgent4.2 核心代码实现____INLINE_javaServiceSlf4jpublic class OrderProcessingService {Autowired private InventoryAgent inventoryAgent;Autowired private CouponAgent couponAgent;Autowired private PaymentAgent paymentAgent;Autowired private RiskAgent riskAgent;Autowired private OrderAgent orderAgent;public OrderResult process(OrderRequest request) {// 第一阶段并行执行无依赖CompletableFutureInventoryResult invFuture CompletableFuture.supplyAsync(() - inventoryAgent.reserve(request.getItems()));CompletableFutureCouponResult couponFuture CompletableFuture.supplyAsync(() - couponAgent.calculate(request.getUserId(), request.getItems()));CompletableFuture.allOf(invFuture, couponFuture).join();InventoryResult invResult invFuture.join();CouponResult couponResult couponFuture.join();if (!invResult.isSuccess()) {return OrderResult.fail(库存不足);}// 第二阶段支付 风控可并行BigDecimal finalAmount couponResult.getFinalAmount();CompletableFuturePaymentResult payFuture CompletableFuture.supplyAsync(() - paymentAgent.deduct(request.getUserId(), finalAmount));CompletableFutureRiskResult riskFuture CompletableFuture.supplyAsync(() - riskAgent.evaluate(request));CompletableFuture.allOf(payFuture, riskFuture).join();PaymentResult payResult payFuture.join();RiskResult riskResult riskFuture.join();if (!payResult.isSuccess()) {// 释放库存inventoryAgent.release(request.getItems());return OrderResult.fail(支付失败);}if (!riskResult.isApproved()) {// 退款paymentAgent.refund(request.getUserId(), finalAmount);return OrderResult.fail(风控拦截);}// 第三阶段创建订单Order order orderAgent.create(request, invResult, couponResult, payResult);return OrderResult.success(order);}}____INLINE_4.3 线程池隔离不同类型的 Agent 使用独立线程池避免资源竞争____INLINE_javaBeanpublic TaskExecutor agentExecutor() {ThreadPoolTaskExecutor executor new ThreadPoolTaskExecutor();executor.setCorePoolSize(20);executor.setMaxPoolSize(100);executor.setQueueCapacity(500);executor.setThreadNamePrefix(agent-);executor.initialize();return executor;}Beanpublic TaskExecutor ioExecutor() {ThreadPoolTaskExecutor executor new ThreadPoolTaskExecutor();executor.setCorePoolSize(50);executor.setMaxPoolSize(200);executor.setQueueCapacity(1000);executor.setThreadNamePrefix(io-);executor.initialize();return executor;}____INLINE_4.4 性能对比指标纯串行多 Agent 并行总耗时~3.5s~1.2s吞吐量提升1x~3x失败隔离无支付失败可回滚库存五、容错与可观测性5.1 错误处理策略____INLINE_javapublic TaskResult executeWithRetry(SubTask task) {int maxAttempts 3;for (int i 0; i maxAttempts; i) {try {return executor.execute(task);} catch (Exception e) {log.warn(Task {} attempt {} failed: {}, task.getId(), i1, e.getMessage());if (i maxAttempts - 1) {// 最终失败返回兜底结果return TaskResult.fallback(task.getId(), getDefaultValue(task));}}}return TaskResult.fallback(task.getId(), getDefaultValue(task));}____INLINE_5.2 监控埋点____INLINE_javaAspectComponentpublic class AgentMetricsAspect {Around(execution(com.example.agent.Agent.execute(..)))public Object measure(ProceedingJoinPoint pjp) throws Throwable {String agentName pjp.getTarget().getClass().getSimpleName();long start System.currentTimeMillis();try {Object result pjp.proceed();metrics.record(agentName, success, System.currentTimeMillis() - start);return result;} catch (Exception e) {metrics.record(agentName, failure, System.currentTimeMillis() - start);throw e;}}}__六、总结多 Agent 高阶实战的核心在于1.架构分层调度层专注编排执行层专注业务专家层提供深度能力2.DAG 驱动通过有向无环图管理任务依赖确保正确执行顺序3.跨服务协作消息队列 共享存储实现服务间解耦与状态同步4.容错兜底重试机制 默认值策略确保系统稳定性5.可观测全链路埋点监控快速定位问题生产级多 Agent 系统不是一蹴而就需要根据业务复杂度逐步演进。从简单场景入手逐步引入分层架构、动态编排、服务协作最终实现高效、可靠、可扩展的智能业务系统。作者洛水石作者洛水石
Java 程序员第 24 阶段:多 Agent 高阶实战,复杂业务场景完整落地实现
在多 Agent 基础篇中我们探讨了角色协同、任务拆分的基本模式。本文进一步深入聚焦高阶架构设计、跨服务协作与复杂场景完整落地帮助读者构建生产级别的多 Agent 系统。一、高阶架构从简单协同到生产级系统1.1 三层架构模型成熟的多 Agent 系统通常采用调度层、执行层、专家层三层分离架构__INLINE_┌─────────────────────────────────────────────────────┐│ 调度层 (Dispatcher) ││ 任务解析 → DAG 拆分 → 结果汇总 → 响应输出 │└─────────────────────────────────────────────────────┘│┌──────────────────┼──────────────────┐▼ ▼ ▼┌─────────────┐ ┌─────────────┐ ┌─────────────┐│ 执行Agent │ │ 执行Agent │ │ 执行Agent ││ (Executor) │ │ (Executor) │ │ (Executor) │└─────────────┘ └─────────────┘ └─────────────┘│ │ │└──────────────────┼──────────────────┘▼┌─────────────────────────────────────────────────────┐│ 专家层 (Specialist) ││ 风控专家 · 法律专家 · 技术专家 │└─────────────────────────────────────────────────────┘____INLINE_调度层负责任务编排不参与具体业务逻辑执行层并行处理独立子任务专家层提供垂直领域的深度能力可被执行层按需调用。1.2 调度策略分类策略适用场景核心优势**同步调度**强依赖任务链结果实时可用的简单场景**异步调度**I/O 密集、耗时任务不阻塞主流程提升吞吐量**分层调度**多阶段复杂流程清晰分层易于维护**混合调度**生产环境综合场景兼顾性能与可靠性二、任务编排复杂业务场景的拆分艺术2.1 DAG 依赖管理复杂任务存在天然的数据依赖关系使用有向无环图DAG管理任务依赖____INLINE_javaServicepublic class DagScheduler {public ListLevel buildDag(ListSubTask tasks) {// 识别无依赖任务Level 0ListSubTask level0 tasks.stream().filter(t - t.getDependencies().isEmpty()).collect(Collectors.toList());// 按层级推进MapInteger, ListSubTask levels new HashMap();levels.put(0, level0);for (int i 1; ; i) {ListSubTask levelN tasks.stream().filter(t - t.getDependencies().stream().allMatch(d - levels.get(i-1).contains(d))).filter(t - !levels.containsValue(t)).collect(Collectors.toList());if (levelN.isEmpty()) break;levels.put(i, levelN);}return levels.entrySet().stream().sorted(Map.Entry.comparingByKey()).map(e - new Level(e.getKey(), e.getValue())).collect(Collectors.toList());}}____INLINE_2.2 动态任务编排根据输入特征动态决定执行路径____INLINE_javapublic TaskPlan plan(UserRequest request) {TaskPlan plan new TaskPlan();if (request.isHighRisk()) {// 高风险场景增加风控环节plan.addStage(TaskStage.RISK_EVALUATION);plan.addStage(TaskStage.MANUAL_REVIEW);}if (request.getAmount().compareTo(HIGH_AMOUNT) 0) {// 大额交易多维度复核plan.addParallelTask(income_verify);plan.addParallelTask(asset_verify);}plan.addStage(TaskStage.FINAL_APPROVAL);return plan;}____INLINE_三、跨服务协作分布式多 Agent 实战3.1 服务边界与 Agent 分布在微服务架构下每个服务可部署独立 Agent形成联邦式协作____INLINE_[用户服务] ─────┐Agent │ 消息队列[订单服务] ─────┼─── Kafka ──→ [聚合服务]Agent │ Dispatcher[支付服务] ─────┘Agent____INLINE_3.2 消息总线实现使用消息队列实现跨服务解耦____INLINE_java// 任务发布Servicepublic class TaskPublisher {Autowired private KafkaTemplateString, TaskMessage kafka;public void publishTask(SubTask task) {TaskMessage msg new TaskMessage();msg.setTaskId(task.getId());msg.setPayload(task.getPayload());msg.setCallbackTopic(agent-results);kafka.send(agent-tasks, task.getServiceId(), msg);}}// 结果订阅Servicepublic class ResultCollector {private ConcurrentHashMapString, CountDownLatch latches new ConcurrentHashMap();KafkaListener(topics agent-results)public void onResult(TaskResult result) {collectedResults.put(result.getTaskId(), result);if (latches.containsKey(result.getTaskId())) {latches.get(result.getTaskId()).countDown();}}}____INLINE_3.3 共享状态与一致性跨服务协作时通过 Redis 实现共享上下文____INLINE_javaServicepublic class SharedContextStore {Autowired private StringRedisTemplate redis;public void write(String key, Object value) {redis.opsForValue().set(context: key, JSON.toJSONString(value));redis.opsForValue().set(context: key :ts, System.currentTimeMillis());}public T T read(String key, ClassT clazz) {String json redis.opsForValue().get(context: key);return JSON.parseObject(json, clazz);}}____INLINE_四、实战案例电商订单处理完整实现4.1 业务场景用户提交订单系统自动完成1.库存预占InventoryAgent2.优惠计算CouponAgent3.支付扣款PaymentAgent4.风控审核RiskAgent5.订单创建OrderAgent4.2 核心代码实现____INLINE_javaServiceSlf4jpublic class OrderProcessingService {Autowired private InventoryAgent inventoryAgent;Autowired private CouponAgent couponAgent;Autowired private PaymentAgent paymentAgent;Autowired private RiskAgent riskAgent;Autowired private OrderAgent orderAgent;public OrderResult process(OrderRequest request) {// 第一阶段并行执行无依赖CompletableFutureInventoryResult invFuture CompletableFuture.supplyAsync(() - inventoryAgent.reserve(request.getItems()));CompletableFutureCouponResult couponFuture CompletableFuture.supplyAsync(() - couponAgent.calculate(request.getUserId(), request.getItems()));CompletableFuture.allOf(invFuture, couponFuture).join();InventoryResult invResult invFuture.join();CouponResult couponResult couponFuture.join();if (!invResult.isSuccess()) {return OrderResult.fail(库存不足);}// 第二阶段支付 风控可并行BigDecimal finalAmount couponResult.getFinalAmount();CompletableFuturePaymentResult payFuture CompletableFuture.supplyAsync(() - paymentAgent.deduct(request.getUserId(), finalAmount));CompletableFutureRiskResult riskFuture CompletableFuture.supplyAsync(() - riskAgent.evaluate(request));CompletableFuture.allOf(payFuture, riskFuture).join();PaymentResult payResult payFuture.join();RiskResult riskResult riskFuture.join();if (!payResult.isSuccess()) {// 释放库存inventoryAgent.release(request.getItems());return OrderResult.fail(支付失败);}if (!riskResult.isApproved()) {// 退款paymentAgent.refund(request.getUserId(), finalAmount);return OrderResult.fail(风控拦截);}// 第三阶段创建订单Order order orderAgent.create(request, invResult, couponResult, payResult);return OrderResult.success(order);}}____INLINE_4.3 线程池隔离不同类型的 Agent 使用独立线程池避免资源竞争____INLINE_javaBeanpublic TaskExecutor agentExecutor() {ThreadPoolTaskExecutor executor new ThreadPoolTaskExecutor();executor.setCorePoolSize(20);executor.setMaxPoolSize(100);executor.setQueueCapacity(500);executor.setThreadNamePrefix(agent-);executor.initialize();return executor;}Beanpublic TaskExecutor ioExecutor() {ThreadPoolTaskExecutor executor new ThreadPoolTaskExecutor();executor.setCorePoolSize(50);executor.setMaxPoolSize(200);executor.setQueueCapacity(1000);executor.setThreadNamePrefix(io-);executor.initialize();return executor;}____INLINE_4.4 性能对比指标纯串行多 Agent 并行总耗时~3.5s~1.2s吞吐量提升1x~3x失败隔离无支付失败可回滚库存五、容错与可观测性5.1 错误处理策略____INLINE_javapublic TaskResult executeWithRetry(SubTask task) {int maxAttempts 3;for (int i 0; i maxAttempts; i) {try {return executor.execute(task);} catch (Exception e) {log.warn(Task {} attempt {} failed: {}, task.getId(), i1, e.getMessage());if (i maxAttempts - 1) {// 最终失败返回兜底结果return TaskResult.fallback(task.getId(), getDefaultValue(task));}}}return TaskResult.fallback(task.getId(), getDefaultValue(task));}____INLINE_5.2 监控埋点____INLINE_javaAspectComponentpublic class AgentMetricsAspect {Around(execution(com.example.agent.Agent.execute(..)))public Object measure(ProceedingJoinPoint pjp) throws Throwable {String agentName pjp.getTarget().getClass().getSimpleName();long start System.currentTimeMillis();try {Object result pjp.proceed();metrics.record(agentName, success, System.currentTimeMillis() - start);return result;} catch (Exception e) {metrics.record(agentName, failure, System.currentTimeMillis() - start);throw e;}}}__六、总结多 Agent 高阶实战的核心在于1.架构分层调度层专注编排执行层专注业务专家层提供深度能力2.DAG 驱动通过有向无环图管理任务依赖确保正确执行顺序3.跨服务协作消息队列 共享存储实现服务间解耦与状态同步4.容错兜底重试机制 默认值策略确保系统稳定性5.可观测全链路埋点监控快速定位问题生产级多 Agent 系统不是一蹴而就需要根据业务复杂度逐步演进。从简单场景入手逐步引入分层架构、动态编排、服务协作最终实现高效、可靠、可扩展的智能业务系统。作者洛水石作者洛水石