1. 项目概述从单点智能到协同智能的跃迁最近在做一个挺有意思的探索想和大家聊聊怎么用Python和Claude API搭建一个多智能体AI流水线。这玩意儿听起来有点玄乎但说白了就是让几个AI“小人儿”各司其职像工厂流水线一样协同完成一个复杂任务。比如你扔给它一篇冗长的市场报告它能自动分解任务一个智能体负责总结核心观点一个负责提取关键数据并生成图表建议另一个则根据前两者的输出草拟一份给高管的执行摘要。整个过程自动流转你只需要在开头给个指令最后收个整合好的成果就行。我之所以折腾这个是因为在实际工作中尤其是处理分析、创作、代码审查这类多步骤任务时单一AI模型的“一次性问答”模式越来越显得力不从心。它要么容易在长上下文中丢失重点要么无法兼顾深度分析和创造性生成。而多智能体架构通过角色划分、任务分解和流程编排恰恰能弥补这些短板。Claude模型在长文本理解、逻辑推理和指令遵循上的优异表现让它成为构建这类智能体的理想“大脑”。Python则提供了无与伦比的灵活性和丰富的生态库用来做流程的“骨架”和“神经系统”再合适不过。这套流水线适合谁呢如果你经常需要处理重复性的、但又有一定复杂度的文本分析、内容生成或数据处理任务并且希望将AI从“聊天伙伴”升级为“自动化助手”那么这个思路会给你带来不少启发。即使你不是开发者理解其设计理念也能更好地规划如何利用现有工具比如Zapier、Make等来实现类似的多步骤自动化。接下来我就把自己搭建过程中的核心设计、踩过的坑以及具体的实现代码毫无保留地拆解给你看。2. 核心架构设计与智能体角色定义2.1 为什么选择“流水线”与“智能体”模式在开始敲代码之前得先想清楚架构。为什么是“流水线(Pipeline)”和“智能体(Agent)”这源于对复杂任务本质的洞察。一个复杂任务比如“分析某季度财报并生成投资建议”可以自然地分解为多个子任务信息提取、数据计算、优劣势分析、风险提示、报告撰写等。这些子任务环环相扣后一个任务的输入依赖于前一个任务的输出。传统的单次API调用需要你把所有指令和上下文塞进一个prompt里不仅容易超出token限制而且模型要同时扮演分析师、会计师、文秘等多个角色效果容易打折扣出现“精神分裂”——前面还在算数后面就开始抒情。而流水线模式让每个步骤由一个专门的“智能体”负责。这个智能体并非一个独立的AI模型而是一个**“角色定义Role 专属指令Instruction Claude API调用”的组合体**。每个智能体只专注于一件事并且拥有清晰的前置输入和后置输出规范。这样做有几个显著优势专注与优化每个智能体可以配备高度定制化的系统提示词System Prompt使其在该特定任务上表现更专业、更稳定。可追溯与可调试任务在哪个环节出了问题可以快速定位。是分析员的提取不准还是撰写员的格式不对一目了然。灵活与可扩展新的处理环节可以像乐高积木一样插入流水线。例如在分析之后、撰写之前可以加入一个“事实核查”智能体。成本与性能优化对于简单的分类、提取任务或许可以使用更小、更快的模型对于需要深度推理的总结、创作任务则使用能力更强的模型。在同一个流水线内混合使用不同模型成为可能虽然本项目初期统一使用Claude。2.2 智能体角色规划实例内容创作流水线理论说多了有点空我们以一个具体的“技术博文创作辅助流水线”为例来定义三个核心智能体。假设我们的目标是输入一个技术概念比如“Python的装饰器”自动生成一篇结构清晰的博文草稿。我设计了以下三个智能体大纲架构师 (Outline Architect)职责接收核心主题生成一份详细的、逻辑连贯的博文大纲。输入用户提供的核心主题如“Python装饰器详解”。输出一个包含H1, H2, H3标题、每个章节核心论点及所需代码示例说明的Markdown格式大纲。核心指令设计要点强调逻辑性、循序渐进从概念到应用、考虑读者认知曲线、明确标注出需要代码示例的位置。内容填充员 (Content Writer)职责根据大纲中的每一个H2章节展开撰写详细的文字内容。输入大纲中指定的某个章节标题、核心论点及上下文如前一个章节的内容摘要。输出该章节的完整段落文字包括概念解释、类比说明、论点阐述等。核心指令设计要点文风需与技术博客匹配亲切、易懂、略带趣味严格遵循提供的论点使用恰当的过渡句避免重复大纲中的标题文字。代码生成与审查员 (Code Generator Reviewer)职责为大纲中标记需要代码的章节生成准确、简洁、注释良好的示例代码并在所有内容生成后通篇检查代码与文字描述的匹配度。输入需要代码的章节描述、以及相关的文字内容。输出符合Python PEP 8规范的代码块以及可选的代码逻辑解释。最终审查报告。核心指令设计要点代码必须可运行、注释需解释“为什么”这么做而不仅仅是“做了什么”优先使用标准库审查时需检查代码是否解决了文字中描述的问题。注意角色定义是成功的关键。指令Instruction要写得像一份给新员工的“岗位说明书”越具体、越可操作越好。避免使用“生成好的内容”这种模糊表述而是用“采用技术博客常见的口语化风格用‘我们’代替‘笔者’每段不超过5行”这样的明确要求。2.3 流水线编排模式串行 vs. 并行 vs. 有条件分支智能体定义好了如何组织它们这里有几种常见模式串行流水线 (Sequential Pipeline)最简单直接。A - B - C。大纲架构师先工作它的输出完整地传递给内容填充员填充员完成所有章节后再交给代码生成员。优点是逻辑简单状态管理方便。缺点是耗时长且后置环节无法给前置环节反馈。并行处理 (Parallel Processing)适用于子任务间独立性高的场景。例如内容填充员可以同时撰写多个章节如果API调用配额允许。这能极大缩短总耗时。需要引入异步编程和结果聚合机制。有条件分支 (Conditional Branching)根据中间结果决定下一步走向。例如大纲架构师生成大纲后由一个“评审员”智能体判断大纲质量。若合格则进入内容填充若不合格则返回给架构师重写或通知用户。这使流水线具备了初步的“决策”能力。在我的实现中我选择了**“主串行辅并行”**的混合模式。即大纲生成必须首先串行完成。然后基于大纲的各个独立章节可以并行调用多个“内容填充员”实例来同时撰写需注意Claude API的速率限制。最后代码生成与审查再串行进行。这样在保证核心逻辑顺序的同时尽可能提升了效率。3. 技术实现用Python构建流水线骨架3.1 环境准备与依赖管理工欲善其事必先利其器。我们首先需要一个干净的Python环境。我强烈推荐使用conda或venv创建虚拟环境避免包冲突。# 使用 conda 创建环境 conda create -n ai-pipeline python3.10 conda activate ai-pipeline # 或使用 venv python -m venv ai-pipeline source ai-pipeline/bin/activate # Linux/Mac # ai-pipeline\Scripts\activate # Windows接下来是安装核心依赖。我们主要需要两个库anthropic官方Claude SDK和pydantic用于数据验证和设置管理。另外我会用python-dotenv管理API密钥用loguru或内置的logging模块来记录流水线运行日志这对调试至关重要。pip install anthropic pydantic python-dotenv loguru项目目录结构我这样组织multi_agent_pipeline/ ├── .env # 存储ANTHROPIC_API_KEY ├── config.py # 配置类Pydantic模型 ├── agents/ # 智能体模块目录 │ ├── __init__.py │ ├── base_agent.py # 智能体基类 │ ├── outline_architect.py │ ├── content_writer.py │ └── code_reviewer.py ├── pipeline/ # 流水线编排模块 │ ├── __init__.py │ └── sequential_pipeline.py ├── models/ # 数据模型定义输入输出结构 │ ├── __init__.py │ └── schemas.py ├── utils/ # 工具函数如日志、API错误处理 │ ├── __init__.py │ └── logger.py └── main.py # 主程序入口3.2 智能体基类与Claude客户端封装所有智能体都有共同的行为接收输入、调用Claude API、解析输出。因此抽象一个基类BaseAgent是明智的选择。这个基类负责初始化Claude客户端、提供通用的调用方法、以及处理错误和日志。首先在.env文件中设置你的API密钥ANTHROPIC_API_KEYyour_anthropic_api_key_here然后在config.py中我用pydantic的BaseSettings来管理配置它能自动从环境变量加载值非常方便安全。# config.py from pydantic_settings import BaseSettings from pydantic import Field class Settings(BaseSettings): anthropic_api_key: str Field(..., envANTHROPIC_API_KEY) anthropic_model: str Field(defaultclaude-3-sonnet-20240229) # 可根据需要切换模型如haiku, opus max_tokens: int Field(default4000) temperature: float Field(default0.7) # 控制创造性分析任务可调低如0.2创作任务可调高如0.8-1.0 class Config: env_file .env settings Settings()接着创建智能体基类# agents/base_agent.py import logging from abc import ABC, abstractmethod from typing import Any, Dict, Optional import anthropic from config import settings class BaseAgent(ABC): 所有智能体的抽象基类 def __init__(self, name: str, system_prompt: str): self.name name self.system_prompt system_prompt self.client anthropic.Anthropic(api_keysettings.anthropic_api_key) self.logger logging.getLogger(self.name) async def call_claude(self, user_prompt: str, **kwargs) - str: 调用Claude API的通用方法。使用异步以支持未来并行化。 try: message await self.client.messages.create( modelkwargs.get(model, settings.anthropic_model), max_tokenskwargs.get(max_tokens, settings.max_tokens), temperaturekwargs.get(temperature, settings.temperature), systemself.system_prompt, messages[{role: user, content: user_prompt}] ) response_text message.content[0].text self.logger.info(fAgent {self.name} 调用成功消耗token: 输入约{message.usage.input_tokens}, 输出约{message.usage.output_tokens}) return response_text except anthropic.APIError as e: self.logger.error(fAgent {self.name} API调用失败: {e}) # 这里可以加入重试逻辑 raise except Exception as e: self.logger.error(fAgent {self.name} 发生未知错误: {e}) raise abstractmethod async def execute(self, input_data: Any) - Any: 每个智能体必须实现的核心执行逻辑 pass实操心得在基类中统一进行API调用和错误处理避免了在每个具体智能体中重复编写模板代码。使用异步async/await是为未来实现并行调用预留的接口。即使你现在用同步方式调用这个结构也是好的。另外务必记录每次调用的token消耗这对成本监控和优化至关重要。3.3 具体智能体实现以大纲架构师为例有了基类实现具体智能体就变得非常清晰。我们来实现OutlineArchitect。首先在models/schemas.py中定义输入输出数据的结构这能让我们对数据流更有把握。# models/schemas.py from pydantic import BaseModel from typing import List, Optional class OutlineSection(BaseModel): 大纲中单个章节的模型 level: int # 1 for H1, 2 for H2, 3 for H3 title: str key_points: List[str] # 该章节需要阐述的核心论点 needs_code_example: bool False code_example_description: Optional[str] None # 对所需代码的简要描述 class BlogOutline(BaseModel): 完整的博文大纲模型 title: str # H1 标题 introduction: str # 引言部分概要 sections: List[OutlineSection] # 所有章节 conclusion: str # 结论部分概要然后实现智能体本身# agents/outline_architect.py import json from agents.base_agent import BaseAgent from models.schemas import BlogOutline import logging class OutlineArchitect(BaseAgent): def __init__(self): # 精心设计的系统提示词这是智能体的“灵魂” system_prompt 你是一位资深的科技博客编辑和架构师。你的任务是根据用户提供的核心主题生成一份详细、逻辑严谨、适合初学者到中级开发者的博文大纲。 请严格按照以下要求输出 1. 输出必须是一个完整的JSON对象能被Python的json.loads解析。 2. JSON结构必须包含title字符串博文主标题、introduction字符串引言段落概要、sections数组、conclusion字符串结论概要。 3. sections数组中的每个元素是一个对象包含level整数2代表H23代表H3、title字符串、key_points字符串数组该节的核心论点、needs_code_example布尔值、code_example_description字符串可选如果需要代码则描述代码功能。 4. 大纲应遵循“总-分-总”结构概念引入 - 原理剖析 - 实战应用 - 常见问题 - 总结展望。 5. 确保章节间有逻辑递进关系标题清晰具体避免“概述”、“其他”等模糊标题。 6. 对于涉及编程的概念在相应的章节明确标记needs_code_example为true并简要描述示例要展示什么。 super().__init__(nameOutlineArchitect, system_promptsystem_prompt) async def execute(self, topic: str) - BlogOutline: 执行大纲生成任务 user_prompt f请为以下技术主题创作一篇博文大纲{topic} self.logger.info(f开始生成大纲主题: {topic}) raw_response await self.call_claude(user_prompt) # 尝试解析Claude返回的JSON try: # 有时Claude的回复会包含一些解释性文字我们需要提取JSON部分 # 一个简单的策略查找第一个{和最后一个} json_start raw_response.find({) json_end raw_response.rfind(}) 1 if json_start ! -1 and json_end ! 0: json_str raw_response[json_start:json_end] outline_dict json.loads(json_str) else: # 如果没有找到大括号尝试直接解析整个响应风险较高 self.logger.warning(响应中未找到明显的JSON边界尝试直接解析。) outline_dict json.loads(raw_response) # 使用Pydantic模型验证和转换数据 blog_outline BlogOutline(**outline_dict) self.logger.info(f大纲生成成功包含 {len(blog_outline.sections)} 个主要章节。) return blog_outline except json.JSONDecodeError as e: self.logger.error(f解析Claude响应为JSON失败。原始响应:\n{raw_response}\n错误: {e}) # 可以在这里加入fallback逻辑例如尝试用文本解析或者抛出特定异常由流水线处理 raise ValueError(f大纲架构师返回了无效的JSON格式: {e})这个实现的关键点在于系统提示词System Prompt的精确性它明确规定了输出格式JSON、结构字段、内容逻辑和风格要求。这相当于给Claude戴上了“大纲架构师”的帽子。结构化输出解析我们要求Claude返回JSON并编写了健壮的代码来提取和解析它。使用Pydantic模型进行验证能第一时间发现数据结构问题避免错误在流水线中传递。清晰的执行接口execute方法接收一个字符串主题返回一个BlogOutline对象。输入输出明确便于流水线编排。按照同样的模式我们可以实现ContentWriter和CodeReviewer。ContentWriter的execute方法接收一个OutlineSection和可能的上下文返回一个字符串章节内容。CodeReviewer的execute方法接收完整的草稿文字代码返回一个修订建议或直接返回修订后的版本。4. 流水线编排与任务调度4.1 构建串行流水线控制器智能体是工人流水线控制器就是工头。它负责把任务按顺序传递给正确的智能体并管理它们之间的数据传递。我们先实现一个基础的串行流水线。# pipeline/sequential_pipeline.py import asyncio from typing import List, Any, Dict import logging from models.schemas import BlogOutline class SequentialPipeline: 一个简单的串行流水线执行器 def __init__(self): self.logger logging.getLogger(SequentialPipeline) self.tasks [] # 存储要执行的任务序列智能体输入 def add_task(self, agent, agent_input): 向流水线添加一个任务 self.tasks.append({agent: agent, input: agent_input}) async def run(self) - List[Any]: 顺序执行所有任务返回每个任务的结果列表 results [] context {} # 用于在任务间传递额外上下文信息 for i, task in enumerate(self.tasks): agent task[agent] agent_input task[input] self.logger.info(f开始执行任务 {i1}/{len(self.tasks)}: {agent.name}) # 这里可以设计更复杂的输入准备逻辑例如将上一个结果和context合并作为输入 actual_input self._prepare_input(agent_input, context, results) try: result await agent.execute(actual_input) results.append(result) # 更新上下文例如将当前结果的关键信息存入context供后续任务使用 self._update_context(context, agent.name, result) self.logger.info(f任务 {i1} 执行成功。) except Exception as e: self.logger.error(f任务 {i1} 执行失败流水线中止。错误: {e}) # 可以选择重试、跳过或完全中止 raise PipelineExecutionError(f任务 {agent.name} 失败) from e return results def _prepare_input(self, agent_input, context, previous_results): 根据智能体类型和上下文准备输入数据。这是一个可扩展的钩子。 # 简单实现直接返回agent_input。复杂情况下可以整合previous_results和context。 return agent_input def _update_context(self, context, agent_name, result): 更新共享上下文。 # 例如存储大纲的标题供内容撰写员在生成引言时使用 if agent_name OutlineArchitect and isinstance(result, BlogOutline): context[blog_title] result.title context[main_sections] [s.title for s in result.sections if s.level 2] # 可以根据需要添加更多上下文更新逻辑4.2 实现并行执行同时撰写多个章节串行执行时内容填充员需要等上一个章节写完才能写下一个效率低下。由于各章节相对独立我们可以并行执行。修改我们的流水线控制器使其支持对一组可并行任务进行并发处理。我们需要引入一个ParallelProcessor或者扩展SequentialPipeline。这里我选择创建一个新的HybridPipeline类来演示混合模式。# pipeline/hybrid_pipeline.py import asyncio from typing import List, Any, Dict, Coroutine import logging class HybridPipeline: 支持串行和并行任务的混合流水线 def __init__(self): self.logger logging.getLogger(HybridPipeline) self.stages [] # 每个stage是一个字典{type: sequential/parallel, tasks: [...]} def add_sequential_stage(self, tasks: List[Dict]): 添加一个串行阶段tasks是一组按顺序执行的任务 self.stages.append({type: sequential, tasks: tasks}) def add_parallel_stage(self, tasks: List[Dict]): 添加一个并行阶段tasks是一组可以并发执行的任务 self.stages.append({type: parallel, tasks: tasks}) async def run(self) - List[Any]: 执行流水线所有阶段 all_results [] global_context {} for stage_index, stage in enumerate(self.stages): self.logger.info(f进入流水线阶段 {stage_index1}: 类型{stage[type]}, 任务数{len(stage[tasks])}) stage_results [] if stage[type] sequential: # 串行执行该阶段内的任务 for task in stage[tasks]: result await self._execute_task(task, global_context, all_results) stage_results.append(result) elif stage[type] parallel: # 并行执行该阶段内的所有任务 tasks_coroutines [] for task in stage[tasks]: # 创建任务协程但不立即执行 coro self._execute_task(task, global_context, all_results) tasks_coroutines.append(coro) # 使用asyncio.gather并发执行 stage_results await asyncio.gather(*tasks_coroutines, return_exceptionsTrue) # 处理并行任务中的异常 for i, result in enumerate(stage_results): if isinstance(result, Exception): self.logger.error(f并行任务 {i} 执行失败: {result}) # 可以选择是终止整个流水线还是记录错误并继续 # 这里我们选择抛出异常 raise PipelineExecutionError(f并行阶段任务失败: {result}) all_results.extend(stage_results) # 更新全局上下文例如基于本阶段的结果 self._update_global_context(global_context, stage_results, stage[type]) return all_results async def _execute_task(self, task: Dict, context: Dict, previous_results: List[Any]) - Any: 执行单个任务 agent task[agent] agent_input task.get(input) prepared_input self._prepare_task_input(agent_input, context, previous_results, agent.name) self.logger.debug(f执行任务: {agent.name}, 输入已准备。) return await agent.execute(prepared_input) def _prepare_task_input(self, raw_input, context, previous_results, agent_name): 准备任务输入数据可根据智能体类型定制 # 这是一个简化版。实际应用中这里会有复杂的逻辑。 # 例如对于ContentWriterraw_input可能是一个(section_id, outline)元组 # 我们需要从outline和context中构造出完整的prompt。 if agent_name ContentWriter and isinstance(raw_input, tuple): section_id, full_outline raw_input section full_outline.sections[section_id] # 构造一个包含章节信息、前后章节标题等上下文的prompt prev_section_title full_outline.sections[section_id-1].title if section_id 0 else None next_section_title full_outline.sections[section_id1].title if section_id len(full_outline.sections)-1 else None prompt f 请撰写博客章节{section.title}。 这是博文《{context.get(blog_title, )}》的一部分。 核心论点{, .join(section.key_points)}。 {上一个章节是关于 prev_section_title if prev_section_title else } {下一个章节将讨论 next_section_title if next_section_title else } 请写出详细、易懂的内容约300-500字。 return prompt # 默认情况返回原始输入 return raw_input def _update_global_context(self, context, stage_results, stage_type): 根据阶段结果更新全局上下文 # 示例如果刚完成大纲阶段将大纲标题存入上下文 if stage_type sequential and stage_results: from models.schemas import BlogOutline for result in stage_results: if isinstance(result, BlogOutline): context[blog_title] result.title context[full_outline] result break现在我们可以在主程序里这样使用混合流水线# main.py 示例片段 import asyncio from agents.outline_architect import OutlineArchitect from agents.content_writer import ContentWriter from agents.code_reviewer import CodeReviewer from pipeline.hybrid_pipeline import HybridPipeline from models.schemas import BlogOutline import logging logging.basicConfig(levellogging.INFO) async def main(): topic Python中的异步编程asyncio入门详解 # 1. 初始化智能体 outline_agent OutlineArchitect() # 创建多个内容撰写员实例或复用同一个但注意API速率限制 writer_agent_1 ContentWriter(nameWriter_Section1) writer_agent_2 ContentWriter(nameWriter_Section2) # ... 可以创建更多 code_agent CodeReviewer() # 2. 初始化流水线 pipeline HybridPipeline() # 3. 构建任务阶段 # 阶段1串行 - 生成大纲 pipeline.add_sequential_stage([ {agent: outline_agent, input: topic} ]) # 阶段2并行 - 撰写多个核心章节假设我们并行写前两个H2章节 # 注意这里需要等阶段1完成后才能获得outline结果。所以input是一个占位符或函数。 # 更优雅的做法是使用“任务工厂”或lambda这里为清晰起见稍后在实际运行前动态赋值。 # 阶段3串行 - 代码生成与审查需要所有章节内容 pipeline.add_sequential_stage([ {agent: code_agent, input: None} # input将在运行时填充 ]) # 4. 运行流水线需要更精细的输入传递控制此处为概念展示 # 实际实现中需要在pipeline.run()内部根据上一阶段的结果动态构造下一阶段的输入。 # 这要求流水线设计更高级的数据流管理可能涉及回调或更复杂的上下文对象。 # 一个简化的运行逻辑伪代码思路 results [] # 运行阶段1 stage1_results await pipeline.run_stage(0) outline stage1_results[0] # 获取大纲 # 准备阶段2的并行任务输入 parallel_tasks [] for i, section in enumerate(outline.sections[:2]): # 假设并行写前两章 task {agent: ContentWriter(namefWriter_{i}), input: (i, outline)} parallel_tasks.append(task) pipeline.replace_stage_tasks(1, parallel_tasks) # 替换阶段2的任务列表 # 运行阶段2 stage2_results await pipeline.run_stage(1) all_content stage1_results stage2_results # 准备阶段3的输入合并所有生成的内容 full_draft combine_draft(outline, all_content) pipeline.replace_stage_tasks(2, [{agent: code_agent, input: full_draft}]) # 运行阶段3 stage3_results await pipeline.run_stage(2) final_result stage3_results[0] print(流水线执行完成) print(final_result) if __name__ __main__: asyncio.run(main())这段代码展示了混合流水线的概念但真实的实现需要更完善的数据流管理和错误处理机制。核心思想是将任务编排的“控制逻辑”与智能体的“执行逻辑”分离。流水线控制器负责调度和传递数据智能体只关心如何完成自己的本职工作。5. 错误处理、日志与性能优化5.1 健壮性设计错误处理与重试机制在生产环境中网络波动、API限流、模型偶尔的“胡言乱语”都会发生。一个健壮的流水线必须能妥善处理这些异常。API调用重试在BaseAgent.call_claude方法中可以增加重试逻辑针对网络超时、速率限制429错误等进行指数退避重试。# agents/base_agent.py (补充) import time from anthropic import APIError, RateLimitError, APITimeoutError async def call_claude_with_retry(self, user_prompt: str, max_retries: int 3, **kwargs) - str: 带重试机制的API调用 last_exception None for attempt in range(max_retries): try: return await self.call_claude(user_prompt, **kwargs) except (RateLimitError, APITimeoutError) as e: last_exception e wait_time (2 ** attempt) 1 # 指数退避 self.logger.warning(fAttempt {attempt1} failed with {type(e).__name__}. Retrying in {wait_time}s...) await asyncio.sleep(wait_time) except APIError as e: # 对于其他API错误如认证失败、无效请求通常重试无益 self.logger.error(fAPI Error (non-retryable): {e}) raise # 所有重试都失败 self.logger.error(fAll {max_retries} retry attempts failed. Last error: {last_exception}) raise last_exception智能体级错误处理每个智能体的execute方法应该捕获可能出现的业务逻辑错误如JSON解析失败并转换为有意义的异常类型或者返回一个表示失败的特定对象如None或一个包含错误信息的Result对象由流水线控制器决定如何应对跳过、重试、终止。流水线级错误处理流水线控制器需要捕获智能体抛出的异常并决定整个流水线的命运。是继续执行下一个不依赖此结果的任务还是完全停止这取决于业务逻辑。可以在HybridPipeline._execute_task中包裹try-except将异常作为结果返回然后在run方法中统一检查。5.2 可观测性结构化日志与监控清晰的日志是调试和监控的命脉。我推荐使用loguru或配置Python标准logging模块输出结构化的JSON日志方便接入ELK等日志系统。# utils/logger.py import sys import json from loguru import logger import datetime def setup_logger(): 配置日志格式和输出 # 移除默认配置 logger.remove() # 控制台输出开发时用 logger.add( sys.stderr, formatgreen{time:YYYY-MM-DD HH:mm:ss}/green | level{level: 8}/level | cyan{name}/cyan:cyan{function}/cyan:cyan{line}/cyan - level{message}/level, levelINFO ) # 文件输出JSON格式便于分析 logger.add( logs/pipeline_{time:YYYY-MM-DD}.log, formatlambda record: json.dumps({ time: record[time].isoformat(), level: record[level].name, agent: record[extra].get(agent_name, system), pipeline_id: record[extra].get(pipeline_id, default), message: record[message], task: record[extra].get(task, {}), }), levelDEBUG, rotation1 day, retention30 days, serializeTrue # 输出为JSON字符串 ) return logger # 在智能体中使用 class BaseAgent: def __init__(self, name, system_prompt): self.name name # ... 其他初始化 self.logger logger.bind(agent_namename)除了日志还可以记录每次API调用的耗时、token使用量并推送到监控仪表盘如Grafana以便实时了解流水线健康度和成本。5.3 性能与成本优化策略随着流水线复杂化性能和成本成为关键考量。模型选型不是所有任务都需要最强的claude-3-opus。claude-3-haiku速度更快、成本更低非常适合分类、简单提取、格式化等轻量任务。可以在BaseAgent或具体智能体初始化时指定模型。例如OutlineArchitect和ContentWriter用sonnet而一个只做关键词提取的智能体可以用haiku。缓存对于输入相同或相似的任务结果可以缓存。例如如果流水线经常处理相似主题大纲可能差别不大。可以使用functools.lru_cache内存缓存或外部缓存如Redis分布式缓存来存储智能体的输入输出对。注意需谨慎评估缓存的有效性避免因细微的prompt差异导致返回不恰当的结果。异步与并发如前所述利用asyncio.gather并发执行独立任务。但要密切关注Claude API的并发请求限制Rate Limits避免触发429错误。需要实现一个简单的信号量asyncio.Semaphore来控制最大并发数。# 在HybridPipeline中控制并发 class HybridPipeline: def __init__(self, max_concurrent_tasks: int 5): # 限制并发数 self.semaphore asyncio.Semaphore(max_concurrent_tasks) # ... async def _execute_task(self, task: Dict, ...) - Any: async with self.semaphore: # 控制并发 # ... 实际执行任务 return await agent.execute(prepared_input)Token使用优化精简Prompt系统提示词和用户提示词都要力求简洁准确移除不必要的客气话和重复描述。上下文管理传递给Claude的上下文如之前生成的内容不宜过长。可以设计一个“总结器”智能体将长上下文总结成要点再传递给下一个环节。输出限制合理设置max_tokens避免为简短回答分配过多额度。6. 扩展思路与高级应用场景基础的多智能体流水线搭建完成后你可以根据需求进行无限扩展。这里分享几个进阶思路动态智能体路由不是所有任务都走固定流程。可以引入一个“调度员”智能体根据初始任务描述或中间结果动态决定调用哪个智能体、以什么顺序执行。这更接近AutoGPT或CrewAI的理念。人类在环Human-in-the-loop在关键节点引入人工审核。例如大纲生成后将结果发送到Slack或生成一个预览网页等待人工确认后再进入下一阶段。流水线可以暂停并等待外部事件webhook回调。工具调用Function Calling集成让智能体不仅能生成文本还能调用外部工具。例如让“数据提取”智能体在分析报告时调用一个内部函数来查询数据库获取最新数据让“代码生成”智能体直接调用GitHub API创建PR。这需要利用Claude的Tool Use功能。长期记忆与知识库为智能体配备向量数据库如Chroma, Pinecone使其在执行任务时能参考过往的历史记录或公司内部文档生成更精准、个性化的内容。应用于特定垂直领域客户支持智能体1分类判断工单类型 - 智能体2检索从知识库找解决方案 - 智能体3润色生成回复草稿 - 智能体4审核检查语气和准确性。内部数据分析智能体1解析理解自然语言查询 - 智能体2转换将其转为SQL - 智能体3执行查询数据库 - 智能体4可视化生成图表描述和解读。游戏设计智能体1设定生成世界观梗概 - 智能体2角色设计主要人物 - 智能体3叙事编写关键剧情 - 智能体4关卡描述场景和挑战。搭建多智能体系统的过程本质上是在用代码定义一套协同工作的“思维模式”。它不会取代人类的创造力而是将人类从重复、繁琐的步骤中解放出来让我们能更专注于更高层次的策略和决策。从用一个智能体生成一段文案到用一组智能体管理一个内容项目这中间的效率提升和可能性拓展才是这项技术最迷人的地方。
Python与Claude API构建多智能体AI流水线:从架构设计到工程实践
1. 项目概述从单点智能到协同智能的跃迁最近在做一个挺有意思的探索想和大家聊聊怎么用Python和Claude API搭建一个多智能体AI流水线。这玩意儿听起来有点玄乎但说白了就是让几个AI“小人儿”各司其职像工厂流水线一样协同完成一个复杂任务。比如你扔给它一篇冗长的市场报告它能自动分解任务一个智能体负责总结核心观点一个负责提取关键数据并生成图表建议另一个则根据前两者的输出草拟一份给高管的执行摘要。整个过程自动流转你只需要在开头给个指令最后收个整合好的成果就行。我之所以折腾这个是因为在实际工作中尤其是处理分析、创作、代码审查这类多步骤任务时单一AI模型的“一次性问答”模式越来越显得力不从心。它要么容易在长上下文中丢失重点要么无法兼顾深度分析和创造性生成。而多智能体架构通过角色划分、任务分解和流程编排恰恰能弥补这些短板。Claude模型在长文本理解、逻辑推理和指令遵循上的优异表现让它成为构建这类智能体的理想“大脑”。Python则提供了无与伦比的灵活性和丰富的生态库用来做流程的“骨架”和“神经系统”再合适不过。这套流水线适合谁呢如果你经常需要处理重复性的、但又有一定复杂度的文本分析、内容生成或数据处理任务并且希望将AI从“聊天伙伴”升级为“自动化助手”那么这个思路会给你带来不少启发。即使你不是开发者理解其设计理念也能更好地规划如何利用现有工具比如Zapier、Make等来实现类似的多步骤自动化。接下来我就把自己搭建过程中的核心设计、踩过的坑以及具体的实现代码毫无保留地拆解给你看。2. 核心架构设计与智能体角色定义2.1 为什么选择“流水线”与“智能体”模式在开始敲代码之前得先想清楚架构。为什么是“流水线(Pipeline)”和“智能体(Agent)”这源于对复杂任务本质的洞察。一个复杂任务比如“分析某季度财报并生成投资建议”可以自然地分解为多个子任务信息提取、数据计算、优劣势分析、风险提示、报告撰写等。这些子任务环环相扣后一个任务的输入依赖于前一个任务的输出。传统的单次API调用需要你把所有指令和上下文塞进一个prompt里不仅容易超出token限制而且模型要同时扮演分析师、会计师、文秘等多个角色效果容易打折扣出现“精神分裂”——前面还在算数后面就开始抒情。而流水线模式让每个步骤由一个专门的“智能体”负责。这个智能体并非一个独立的AI模型而是一个**“角色定义Role 专属指令Instruction Claude API调用”的组合体**。每个智能体只专注于一件事并且拥有清晰的前置输入和后置输出规范。这样做有几个显著优势专注与优化每个智能体可以配备高度定制化的系统提示词System Prompt使其在该特定任务上表现更专业、更稳定。可追溯与可调试任务在哪个环节出了问题可以快速定位。是分析员的提取不准还是撰写员的格式不对一目了然。灵活与可扩展新的处理环节可以像乐高积木一样插入流水线。例如在分析之后、撰写之前可以加入一个“事实核查”智能体。成本与性能优化对于简单的分类、提取任务或许可以使用更小、更快的模型对于需要深度推理的总结、创作任务则使用能力更强的模型。在同一个流水线内混合使用不同模型成为可能虽然本项目初期统一使用Claude。2.2 智能体角色规划实例内容创作流水线理论说多了有点空我们以一个具体的“技术博文创作辅助流水线”为例来定义三个核心智能体。假设我们的目标是输入一个技术概念比如“Python的装饰器”自动生成一篇结构清晰的博文草稿。我设计了以下三个智能体大纲架构师 (Outline Architect)职责接收核心主题生成一份详细的、逻辑连贯的博文大纲。输入用户提供的核心主题如“Python装饰器详解”。输出一个包含H1, H2, H3标题、每个章节核心论点及所需代码示例说明的Markdown格式大纲。核心指令设计要点强调逻辑性、循序渐进从概念到应用、考虑读者认知曲线、明确标注出需要代码示例的位置。内容填充员 (Content Writer)职责根据大纲中的每一个H2章节展开撰写详细的文字内容。输入大纲中指定的某个章节标题、核心论点及上下文如前一个章节的内容摘要。输出该章节的完整段落文字包括概念解释、类比说明、论点阐述等。核心指令设计要点文风需与技术博客匹配亲切、易懂、略带趣味严格遵循提供的论点使用恰当的过渡句避免重复大纲中的标题文字。代码生成与审查员 (Code Generator Reviewer)职责为大纲中标记需要代码的章节生成准确、简洁、注释良好的示例代码并在所有内容生成后通篇检查代码与文字描述的匹配度。输入需要代码的章节描述、以及相关的文字内容。输出符合Python PEP 8规范的代码块以及可选的代码逻辑解释。最终审查报告。核心指令设计要点代码必须可运行、注释需解释“为什么”这么做而不仅仅是“做了什么”优先使用标准库审查时需检查代码是否解决了文字中描述的问题。注意角色定义是成功的关键。指令Instruction要写得像一份给新员工的“岗位说明书”越具体、越可操作越好。避免使用“生成好的内容”这种模糊表述而是用“采用技术博客常见的口语化风格用‘我们’代替‘笔者’每段不超过5行”这样的明确要求。2.3 流水线编排模式串行 vs. 并行 vs. 有条件分支智能体定义好了如何组织它们这里有几种常见模式串行流水线 (Sequential Pipeline)最简单直接。A - B - C。大纲架构师先工作它的输出完整地传递给内容填充员填充员完成所有章节后再交给代码生成员。优点是逻辑简单状态管理方便。缺点是耗时长且后置环节无法给前置环节反馈。并行处理 (Parallel Processing)适用于子任务间独立性高的场景。例如内容填充员可以同时撰写多个章节如果API调用配额允许。这能极大缩短总耗时。需要引入异步编程和结果聚合机制。有条件分支 (Conditional Branching)根据中间结果决定下一步走向。例如大纲架构师生成大纲后由一个“评审员”智能体判断大纲质量。若合格则进入内容填充若不合格则返回给架构师重写或通知用户。这使流水线具备了初步的“决策”能力。在我的实现中我选择了**“主串行辅并行”**的混合模式。即大纲生成必须首先串行完成。然后基于大纲的各个独立章节可以并行调用多个“内容填充员”实例来同时撰写需注意Claude API的速率限制。最后代码生成与审查再串行进行。这样在保证核心逻辑顺序的同时尽可能提升了效率。3. 技术实现用Python构建流水线骨架3.1 环境准备与依赖管理工欲善其事必先利其器。我们首先需要一个干净的Python环境。我强烈推荐使用conda或venv创建虚拟环境避免包冲突。# 使用 conda 创建环境 conda create -n ai-pipeline python3.10 conda activate ai-pipeline # 或使用 venv python -m venv ai-pipeline source ai-pipeline/bin/activate # Linux/Mac # ai-pipeline\Scripts\activate # Windows接下来是安装核心依赖。我们主要需要两个库anthropic官方Claude SDK和pydantic用于数据验证和设置管理。另外我会用python-dotenv管理API密钥用loguru或内置的logging模块来记录流水线运行日志这对调试至关重要。pip install anthropic pydantic python-dotenv loguru项目目录结构我这样组织multi_agent_pipeline/ ├── .env # 存储ANTHROPIC_API_KEY ├── config.py # 配置类Pydantic模型 ├── agents/ # 智能体模块目录 │ ├── __init__.py │ ├── base_agent.py # 智能体基类 │ ├── outline_architect.py │ ├── content_writer.py │ └── code_reviewer.py ├── pipeline/ # 流水线编排模块 │ ├── __init__.py │ └── sequential_pipeline.py ├── models/ # 数据模型定义输入输出结构 │ ├── __init__.py │ └── schemas.py ├── utils/ # 工具函数如日志、API错误处理 │ ├── __init__.py │ └── logger.py └── main.py # 主程序入口3.2 智能体基类与Claude客户端封装所有智能体都有共同的行为接收输入、调用Claude API、解析输出。因此抽象一个基类BaseAgent是明智的选择。这个基类负责初始化Claude客户端、提供通用的调用方法、以及处理错误和日志。首先在.env文件中设置你的API密钥ANTHROPIC_API_KEYyour_anthropic_api_key_here然后在config.py中我用pydantic的BaseSettings来管理配置它能自动从环境变量加载值非常方便安全。# config.py from pydantic_settings import BaseSettings from pydantic import Field class Settings(BaseSettings): anthropic_api_key: str Field(..., envANTHROPIC_API_KEY) anthropic_model: str Field(defaultclaude-3-sonnet-20240229) # 可根据需要切换模型如haiku, opus max_tokens: int Field(default4000) temperature: float Field(default0.7) # 控制创造性分析任务可调低如0.2创作任务可调高如0.8-1.0 class Config: env_file .env settings Settings()接着创建智能体基类# agents/base_agent.py import logging from abc import ABC, abstractmethod from typing import Any, Dict, Optional import anthropic from config import settings class BaseAgent(ABC): 所有智能体的抽象基类 def __init__(self, name: str, system_prompt: str): self.name name self.system_prompt system_prompt self.client anthropic.Anthropic(api_keysettings.anthropic_api_key) self.logger logging.getLogger(self.name) async def call_claude(self, user_prompt: str, **kwargs) - str: 调用Claude API的通用方法。使用异步以支持未来并行化。 try: message await self.client.messages.create( modelkwargs.get(model, settings.anthropic_model), max_tokenskwargs.get(max_tokens, settings.max_tokens), temperaturekwargs.get(temperature, settings.temperature), systemself.system_prompt, messages[{role: user, content: user_prompt}] ) response_text message.content[0].text self.logger.info(fAgent {self.name} 调用成功消耗token: 输入约{message.usage.input_tokens}, 输出约{message.usage.output_tokens}) return response_text except anthropic.APIError as e: self.logger.error(fAgent {self.name} API调用失败: {e}) # 这里可以加入重试逻辑 raise except Exception as e: self.logger.error(fAgent {self.name} 发生未知错误: {e}) raise abstractmethod async def execute(self, input_data: Any) - Any: 每个智能体必须实现的核心执行逻辑 pass实操心得在基类中统一进行API调用和错误处理避免了在每个具体智能体中重复编写模板代码。使用异步async/await是为未来实现并行调用预留的接口。即使你现在用同步方式调用这个结构也是好的。另外务必记录每次调用的token消耗这对成本监控和优化至关重要。3.3 具体智能体实现以大纲架构师为例有了基类实现具体智能体就变得非常清晰。我们来实现OutlineArchitect。首先在models/schemas.py中定义输入输出数据的结构这能让我们对数据流更有把握。# models/schemas.py from pydantic import BaseModel from typing import List, Optional class OutlineSection(BaseModel): 大纲中单个章节的模型 level: int # 1 for H1, 2 for H2, 3 for H3 title: str key_points: List[str] # 该章节需要阐述的核心论点 needs_code_example: bool False code_example_description: Optional[str] None # 对所需代码的简要描述 class BlogOutline(BaseModel): 完整的博文大纲模型 title: str # H1 标题 introduction: str # 引言部分概要 sections: List[OutlineSection] # 所有章节 conclusion: str # 结论部分概要然后实现智能体本身# agents/outline_architect.py import json from agents.base_agent import BaseAgent from models.schemas import BlogOutline import logging class OutlineArchitect(BaseAgent): def __init__(self): # 精心设计的系统提示词这是智能体的“灵魂” system_prompt 你是一位资深的科技博客编辑和架构师。你的任务是根据用户提供的核心主题生成一份详细、逻辑严谨、适合初学者到中级开发者的博文大纲。 请严格按照以下要求输出 1. 输出必须是一个完整的JSON对象能被Python的json.loads解析。 2. JSON结构必须包含title字符串博文主标题、introduction字符串引言段落概要、sections数组、conclusion字符串结论概要。 3. sections数组中的每个元素是一个对象包含level整数2代表H23代表H3、title字符串、key_points字符串数组该节的核心论点、needs_code_example布尔值、code_example_description字符串可选如果需要代码则描述代码功能。 4. 大纲应遵循“总-分-总”结构概念引入 - 原理剖析 - 实战应用 - 常见问题 - 总结展望。 5. 确保章节间有逻辑递进关系标题清晰具体避免“概述”、“其他”等模糊标题。 6. 对于涉及编程的概念在相应的章节明确标记needs_code_example为true并简要描述示例要展示什么。 super().__init__(nameOutlineArchitect, system_promptsystem_prompt) async def execute(self, topic: str) - BlogOutline: 执行大纲生成任务 user_prompt f请为以下技术主题创作一篇博文大纲{topic} self.logger.info(f开始生成大纲主题: {topic}) raw_response await self.call_claude(user_prompt) # 尝试解析Claude返回的JSON try: # 有时Claude的回复会包含一些解释性文字我们需要提取JSON部分 # 一个简单的策略查找第一个{和最后一个} json_start raw_response.find({) json_end raw_response.rfind(}) 1 if json_start ! -1 and json_end ! 0: json_str raw_response[json_start:json_end] outline_dict json.loads(json_str) else: # 如果没有找到大括号尝试直接解析整个响应风险较高 self.logger.warning(响应中未找到明显的JSON边界尝试直接解析。) outline_dict json.loads(raw_response) # 使用Pydantic模型验证和转换数据 blog_outline BlogOutline(**outline_dict) self.logger.info(f大纲生成成功包含 {len(blog_outline.sections)} 个主要章节。) return blog_outline except json.JSONDecodeError as e: self.logger.error(f解析Claude响应为JSON失败。原始响应:\n{raw_response}\n错误: {e}) # 可以在这里加入fallback逻辑例如尝试用文本解析或者抛出特定异常由流水线处理 raise ValueError(f大纲架构师返回了无效的JSON格式: {e})这个实现的关键点在于系统提示词System Prompt的精确性它明确规定了输出格式JSON、结构字段、内容逻辑和风格要求。这相当于给Claude戴上了“大纲架构师”的帽子。结构化输出解析我们要求Claude返回JSON并编写了健壮的代码来提取和解析它。使用Pydantic模型进行验证能第一时间发现数据结构问题避免错误在流水线中传递。清晰的执行接口execute方法接收一个字符串主题返回一个BlogOutline对象。输入输出明确便于流水线编排。按照同样的模式我们可以实现ContentWriter和CodeReviewer。ContentWriter的execute方法接收一个OutlineSection和可能的上下文返回一个字符串章节内容。CodeReviewer的execute方法接收完整的草稿文字代码返回一个修订建议或直接返回修订后的版本。4. 流水线编排与任务调度4.1 构建串行流水线控制器智能体是工人流水线控制器就是工头。它负责把任务按顺序传递给正确的智能体并管理它们之间的数据传递。我们先实现一个基础的串行流水线。# pipeline/sequential_pipeline.py import asyncio from typing import List, Any, Dict import logging from models.schemas import BlogOutline class SequentialPipeline: 一个简单的串行流水线执行器 def __init__(self): self.logger logging.getLogger(SequentialPipeline) self.tasks [] # 存储要执行的任务序列智能体输入 def add_task(self, agent, agent_input): 向流水线添加一个任务 self.tasks.append({agent: agent, input: agent_input}) async def run(self) - List[Any]: 顺序执行所有任务返回每个任务的结果列表 results [] context {} # 用于在任务间传递额外上下文信息 for i, task in enumerate(self.tasks): agent task[agent] agent_input task[input] self.logger.info(f开始执行任务 {i1}/{len(self.tasks)}: {agent.name}) # 这里可以设计更复杂的输入准备逻辑例如将上一个结果和context合并作为输入 actual_input self._prepare_input(agent_input, context, results) try: result await agent.execute(actual_input) results.append(result) # 更新上下文例如将当前结果的关键信息存入context供后续任务使用 self._update_context(context, agent.name, result) self.logger.info(f任务 {i1} 执行成功。) except Exception as e: self.logger.error(f任务 {i1} 执行失败流水线中止。错误: {e}) # 可以选择重试、跳过或完全中止 raise PipelineExecutionError(f任务 {agent.name} 失败) from e return results def _prepare_input(self, agent_input, context, previous_results): 根据智能体类型和上下文准备输入数据。这是一个可扩展的钩子。 # 简单实现直接返回agent_input。复杂情况下可以整合previous_results和context。 return agent_input def _update_context(self, context, agent_name, result): 更新共享上下文。 # 例如存储大纲的标题供内容撰写员在生成引言时使用 if agent_name OutlineArchitect and isinstance(result, BlogOutline): context[blog_title] result.title context[main_sections] [s.title for s in result.sections if s.level 2] # 可以根据需要添加更多上下文更新逻辑4.2 实现并行执行同时撰写多个章节串行执行时内容填充员需要等上一个章节写完才能写下一个效率低下。由于各章节相对独立我们可以并行执行。修改我们的流水线控制器使其支持对一组可并行任务进行并发处理。我们需要引入一个ParallelProcessor或者扩展SequentialPipeline。这里我选择创建一个新的HybridPipeline类来演示混合模式。# pipeline/hybrid_pipeline.py import asyncio from typing import List, Any, Dict, Coroutine import logging class HybridPipeline: 支持串行和并行任务的混合流水线 def __init__(self): self.logger logging.getLogger(HybridPipeline) self.stages [] # 每个stage是一个字典{type: sequential/parallel, tasks: [...]} def add_sequential_stage(self, tasks: List[Dict]): 添加一个串行阶段tasks是一组按顺序执行的任务 self.stages.append({type: sequential, tasks: tasks}) def add_parallel_stage(self, tasks: List[Dict]): 添加一个并行阶段tasks是一组可以并发执行的任务 self.stages.append({type: parallel, tasks: tasks}) async def run(self) - List[Any]: 执行流水线所有阶段 all_results [] global_context {} for stage_index, stage in enumerate(self.stages): self.logger.info(f进入流水线阶段 {stage_index1}: 类型{stage[type]}, 任务数{len(stage[tasks])}) stage_results [] if stage[type] sequential: # 串行执行该阶段内的任务 for task in stage[tasks]: result await self._execute_task(task, global_context, all_results) stage_results.append(result) elif stage[type] parallel: # 并行执行该阶段内的所有任务 tasks_coroutines [] for task in stage[tasks]: # 创建任务协程但不立即执行 coro self._execute_task(task, global_context, all_results) tasks_coroutines.append(coro) # 使用asyncio.gather并发执行 stage_results await asyncio.gather(*tasks_coroutines, return_exceptionsTrue) # 处理并行任务中的异常 for i, result in enumerate(stage_results): if isinstance(result, Exception): self.logger.error(f并行任务 {i} 执行失败: {result}) # 可以选择是终止整个流水线还是记录错误并继续 # 这里我们选择抛出异常 raise PipelineExecutionError(f并行阶段任务失败: {result}) all_results.extend(stage_results) # 更新全局上下文例如基于本阶段的结果 self._update_global_context(global_context, stage_results, stage[type]) return all_results async def _execute_task(self, task: Dict, context: Dict, previous_results: List[Any]) - Any: 执行单个任务 agent task[agent] agent_input task.get(input) prepared_input self._prepare_task_input(agent_input, context, previous_results, agent.name) self.logger.debug(f执行任务: {agent.name}, 输入已准备。) return await agent.execute(prepared_input) def _prepare_task_input(self, raw_input, context, previous_results, agent_name): 准备任务输入数据可根据智能体类型定制 # 这是一个简化版。实际应用中这里会有复杂的逻辑。 # 例如对于ContentWriterraw_input可能是一个(section_id, outline)元组 # 我们需要从outline和context中构造出完整的prompt。 if agent_name ContentWriter and isinstance(raw_input, tuple): section_id, full_outline raw_input section full_outline.sections[section_id] # 构造一个包含章节信息、前后章节标题等上下文的prompt prev_section_title full_outline.sections[section_id-1].title if section_id 0 else None next_section_title full_outline.sections[section_id1].title if section_id len(full_outline.sections)-1 else None prompt f 请撰写博客章节{section.title}。 这是博文《{context.get(blog_title, )}》的一部分。 核心论点{, .join(section.key_points)}。 {上一个章节是关于 prev_section_title if prev_section_title else } {下一个章节将讨论 next_section_title if next_section_title else } 请写出详细、易懂的内容约300-500字。 return prompt # 默认情况返回原始输入 return raw_input def _update_global_context(self, context, stage_results, stage_type): 根据阶段结果更新全局上下文 # 示例如果刚完成大纲阶段将大纲标题存入上下文 if stage_type sequential and stage_results: from models.schemas import BlogOutline for result in stage_results: if isinstance(result, BlogOutline): context[blog_title] result.title context[full_outline] result break现在我们可以在主程序里这样使用混合流水线# main.py 示例片段 import asyncio from agents.outline_architect import OutlineArchitect from agents.content_writer import ContentWriter from agents.code_reviewer import CodeReviewer from pipeline.hybrid_pipeline import HybridPipeline from models.schemas import BlogOutline import logging logging.basicConfig(levellogging.INFO) async def main(): topic Python中的异步编程asyncio入门详解 # 1. 初始化智能体 outline_agent OutlineArchitect() # 创建多个内容撰写员实例或复用同一个但注意API速率限制 writer_agent_1 ContentWriter(nameWriter_Section1) writer_agent_2 ContentWriter(nameWriter_Section2) # ... 可以创建更多 code_agent CodeReviewer() # 2. 初始化流水线 pipeline HybridPipeline() # 3. 构建任务阶段 # 阶段1串行 - 生成大纲 pipeline.add_sequential_stage([ {agent: outline_agent, input: topic} ]) # 阶段2并行 - 撰写多个核心章节假设我们并行写前两个H2章节 # 注意这里需要等阶段1完成后才能获得outline结果。所以input是一个占位符或函数。 # 更优雅的做法是使用“任务工厂”或lambda这里为清晰起见稍后在实际运行前动态赋值。 # 阶段3串行 - 代码生成与审查需要所有章节内容 pipeline.add_sequential_stage([ {agent: code_agent, input: None} # input将在运行时填充 ]) # 4. 运行流水线需要更精细的输入传递控制此处为概念展示 # 实际实现中需要在pipeline.run()内部根据上一阶段的结果动态构造下一阶段的输入。 # 这要求流水线设计更高级的数据流管理可能涉及回调或更复杂的上下文对象。 # 一个简化的运行逻辑伪代码思路 results [] # 运行阶段1 stage1_results await pipeline.run_stage(0) outline stage1_results[0] # 获取大纲 # 准备阶段2的并行任务输入 parallel_tasks [] for i, section in enumerate(outline.sections[:2]): # 假设并行写前两章 task {agent: ContentWriter(namefWriter_{i}), input: (i, outline)} parallel_tasks.append(task) pipeline.replace_stage_tasks(1, parallel_tasks) # 替换阶段2的任务列表 # 运行阶段2 stage2_results await pipeline.run_stage(1) all_content stage1_results stage2_results # 准备阶段3的输入合并所有生成的内容 full_draft combine_draft(outline, all_content) pipeline.replace_stage_tasks(2, [{agent: code_agent, input: full_draft}]) # 运行阶段3 stage3_results await pipeline.run_stage(2) final_result stage3_results[0] print(流水线执行完成) print(final_result) if __name__ __main__: asyncio.run(main())这段代码展示了混合流水线的概念但真实的实现需要更完善的数据流管理和错误处理机制。核心思想是将任务编排的“控制逻辑”与智能体的“执行逻辑”分离。流水线控制器负责调度和传递数据智能体只关心如何完成自己的本职工作。5. 错误处理、日志与性能优化5.1 健壮性设计错误处理与重试机制在生产环境中网络波动、API限流、模型偶尔的“胡言乱语”都会发生。一个健壮的流水线必须能妥善处理这些异常。API调用重试在BaseAgent.call_claude方法中可以增加重试逻辑针对网络超时、速率限制429错误等进行指数退避重试。# agents/base_agent.py (补充) import time from anthropic import APIError, RateLimitError, APITimeoutError async def call_claude_with_retry(self, user_prompt: str, max_retries: int 3, **kwargs) - str: 带重试机制的API调用 last_exception None for attempt in range(max_retries): try: return await self.call_claude(user_prompt, **kwargs) except (RateLimitError, APITimeoutError) as e: last_exception e wait_time (2 ** attempt) 1 # 指数退避 self.logger.warning(fAttempt {attempt1} failed with {type(e).__name__}. Retrying in {wait_time}s...) await asyncio.sleep(wait_time) except APIError as e: # 对于其他API错误如认证失败、无效请求通常重试无益 self.logger.error(fAPI Error (non-retryable): {e}) raise # 所有重试都失败 self.logger.error(fAll {max_retries} retry attempts failed. Last error: {last_exception}) raise last_exception智能体级错误处理每个智能体的execute方法应该捕获可能出现的业务逻辑错误如JSON解析失败并转换为有意义的异常类型或者返回一个表示失败的特定对象如None或一个包含错误信息的Result对象由流水线控制器决定如何应对跳过、重试、终止。流水线级错误处理流水线控制器需要捕获智能体抛出的异常并决定整个流水线的命运。是继续执行下一个不依赖此结果的任务还是完全停止这取决于业务逻辑。可以在HybridPipeline._execute_task中包裹try-except将异常作为结果返回然后在run方法中统一检查。5.2 可观测性结构化日志与监控清晰的日志是调试和监控的命脉。我推荐使用loguru或配置Python标准logging模块输出结构化的JSON日志方便接入ELK等日志系统。# utils/logger.py import sys import json from loguru import logger import datetime def setup_logger(): 配置日志格式和输出 # 移除默认配置 logger.remove() # 控制台输出开发时用 logger.add( sys.stderr, formatgreen{time:YYYY-MM-DD HH:mm:ss}/green | level{level: 8}/level | cyan{name}/cyan:cyan{function}/cyan:cyan{line}/cyan - level{message}/level, levelINFO ) # 文件输出JSON格式便于分析 logger.add( logs/pipeline_{time:YYYY-MM-DD}.log, formatlambda record: json.dumps({ time: record[time].isoformat(), level: record[level].name, agent: record[extra].get(agent_name, system), pipeline_id: record[extra].get(pipeline_id, default), message: record[message], task: record[extra].get(task, {}), }), levelDEBUG, rotation1 day, retention30 days, serializeTrue # 输出为JSON字符串 ) return logger # 在智能体中使用 class BaseAgent: def __init__(self, name, system_prompt): self.name name # ... 其他初始化 self.logger logger.bind(agent_namename)除了日志还可以记录每次API调用的耗时、token使用量并推送到监控仪表盘如Grafana以便实时了解流水线健康度和成本。5.3 性能与成本优化策略随着流水线复杂化性能和成本成为关键考量。模型选型不是所有任务都需要最强的claude-3-opus。claude-3-haiku速度更快、成本更低非常适合分类、简单提取、格式化等轻量任务。可以在BaseAgent或具体智能体初始化时指定模型。例如OutlineArchitect和ContentWriter用sonnet而一个只做关键词提取的智能体可以用haiku。缓存对于输入相同或相似的任务结果可以缓存。例如如果流水线经常处理相似主题大纲可能差别不大。可以使用functools.lru_cache内存缓存或外部缓存如Redis分布式缓存来存储智能体的输入输出对。注意需谨慎评估缓存的有效性避免因细微的prompt差异导致返回不恰当的结果。异步与并发如前所述利用asyncio.gather并发执行独立任务。但要密切关注Claude API的并发请求限制Rate Limits避免触发429错误。需要实现一个简单的信号量asyncio.Semaphore来控制最大并发数。# 在HybridPipeline中控制并发 class HybridPipeline: def __init__(self, max_concurrent_tasks: int 5): # 限制并发数 self.semaphore asyncio.Semaphore(max_concurrent_tasks) # ... async def _execute_task(self, task: Dict, ...) - Any: async with self.semaphore: # 控制并发 # ... 实际执行任务 return await agent.execute(prepared_input)Token使用优化精简Prompt系统提示词和用户提示词都要力求简洁准确移除不必要的客气话和重复描述。上下文管理传递给Claude的上下文如之前生成的内容不宜过长。可以设计一个“总结器”智能体将长上下文总结成要点再传递给下一个环节。输出限制合理设置max_tokens避免为简短回答分配过多额度。6. 扩展思路与高级应用场景基础的多智能体流水线搭建完成后你可以根据需求进行无限扩展。这里分享几个进阶思路动态智能体路由不是所有任务都走固定流程。可以引入一个“调度员”智能体根据初始任务描述或中间结果动态决定调用哪个智能体、以什么顺序执行。这更接近AutoGPT或CrewAI的理念。人类在环Human-in-the-loop在关键节点引入人工审核。例如大纲生成后将结果发送到Slack或生成一个预览网页等待人工确认后再进入下一阶段。流水线可以暂停并等待外部事件webhook回调。工具调用Function Calling集成让智能体不仅能生成文本还能调用外部工具。例如让“数据提取”智能体在分析报告时调用一个内部函数来查询数据库获取最新数据让“代码生成”智能体直接调用GitHub API创建PR。这需要利用Claude的Tool Use功能。长期记忆与知识库为智能体配备向量数据库如Chroma, Pinecone使其在执行任务时能参考过往的历史记录或公司内部文档生成更精准、个性化的内容。应用于特定垂直领域客户支持智能体1分类判断工单类型 - 智能体2检索从知识库找解决方案 - 智能体3润色生成回复草稿 - 智能体4审核检查语气和准确性。内部数据分析智能体1解析理解自然语言查询 - 智能体2转换将其转为SQL - 智能体3执行查询数据库 - 智能体4可视化生成图表描述和解读。游戏设计智能体1设定生成世界观梗概 - 智能体2角色设计主要人物 - 智能体3叙事编写关键剧情 - 智能体4关卡描述场景和挑战。搭建多智能体系统的过程本质上是在用代码定义一套协同工作的“思维模式”。它不会取代人类的创造力而是将人类从重复、繁琐的步骤中解放出来让我们能更专注于更高层次的策略和决策。从用一个智能体生成一段文案到用一组智能体管理一个内容项目这中间的效率提升和可能性拓展才是这项技术最迷人的地方。