【LangChain】流式处理实战:实时响应优化

【LangChain】流式处理实战:实时响应优化 【LangChain】流式处理实战实时响应优化开篇为什么你的Agent需要实时感想象这样一个场景你走进一家餐厅点完菜后服务员就消失了。你坐在那儿干等15分钟完全不知道厨房是在备菜、炒菜还是已经把你的订单搞丢了。这种信息黑洞让人焦虑对吧这就是大多数AI Agent给用户的感觉——输入问题后屏幕一片空白只有一个小转圈在暗示我还在活着。流式处理Streaming就是解决这个问题的银弹。它让用户实时看到Agent的思考过程模型正在生成什么内容、调用了什么工具、进度如何。就像餐厅里的开放式厨房你能看到厨师切菜、炒菜的全过程等待也变得可以忍受了。LangChain的新架构提供了三种核心流式模式配合Token级输出和自定义进度反馈能让你的Agent体验从黑盒等待跃升到透明协作。一、三种stream_mode选择你的信息粒度LangChain的流式系统通过stream_mode参数控制信息输出的粒度。理解这三种模式的区别是掌握流式处理的第一步。1.updates模式Agent步骤追踪这是最适合监控Agent执行流程的模式。它会在每个Agent步骤完成后推送状态更新。fromlangchain.agentsimportcreate_agentdefget_weather(city:str)-str:returnfIts always sunny in{city}!agentcreate_agent(modelgpt-5-nano,tools[get_weather],)# 使用 updates 模式流式追踪forchunkinagent.stream({messages:[{role:user,content:What is the weather in SF?}]},stream_modeupdates,versionv2,# 推荐使用v2统一格式):ifchunk[type]updates:forstep,datainchunk[data].items():print(f 步骤:{step})print(f 内容:{data[messages][-1].content_blocks})输出示例 步骤: model 内容: [{type: tool_call, name: get_weather, args: {city: San Francisco}}] 步骤: tools 内容: [{type: text, text: Its always sunny in San Francisco!}] 步骤: model 内容: [{type: text, text: The weather in San Francisco is sunny!}]适用场景✅ 需要展示Agent正在做什么的进度条✅ 调试Agent执行流程✅ 记录每个步骤的输入输出用于审计2.messages模式Token级实时输出这是最接近ChatGPT用户体验的模式。它流式返回LLM生成的每个Token配合元数据让你知道消息来自哪个节点。forchunkinagent.stream({messages:[{role:user,content:What is the weather in SF?}]},stream_modemessages,versionv2,):ifchunk[type]messages:token,metadatachunk[data]# 实时打印每个tokeniftoken.content:print(token.content,end,flushTrue)# 追踪工具调用片段iftoken.tool_call_chunks:print(f\n 工具调用:{token.tool_call_chunks})关键特性返回的是(token, metadata)元组metadata[langgraph_node]告诉你消息来自哪个节点token.tool_call_chunks展示工具调用的增量JSON构建过程输出示例工具调用片段 工具调用: [{name: get_weather, args: , id: call_xxx}] 工具调用: [{name: None, args: {, id: None}] 工具调用: [{name: None, args: city, id: None}] 工具调用: [{name: None, args: :, id: None}] 工具调用: [{name: None, args: San, id: None}] 工具调用: [{name: None, args: Francisco, id: None}]看着工具参数像打字机一样逐字出现用户能直观感受到模型正在思考如何调用工具。3.custom模式自定义进度信号这是最有创造力的模式。通过get_stream_writer()你可以在工具或节点内部发送任意自定义数据到前端。fromlanggraph.configimportget_stream_writerdefget_weather(city:str)-str:获取城市天气带进度反馈writerget_stream_writer()# 发送自定义进度事件writer({status:searching,city:city,progress:0})# ... 执行搜索 ...writer({status:analyzing,progress:50})# ... 分析数据 ...writer({status:complete,progress:100,result:sunny})returnfIts always sunny in{city}!# 流式接收自定义事件forchunkinagent.stream({messages:[{role:user,content:What is the weather in SF?}]},stream_modecustom,versionv2,):ifchunk[type]custom:print(f 进度更新:{chunk[data]})输出示例 进度更新: {status: searching, city: San Francisco, progress: 0} 进度更新: {status: analyzing, progress: 50} 进度更新: {status: complete, progress: 100, result: sunny}实战技巧配合前端进度条组件你可以实现正在连接天气服务…、正在解析数据…等精细化反馈。二、组合模式多维度实时反馈单一模式往往不够用。LangChain允许你同时启用多种模式获得全方位的流式信息。forchunkinagent.stream({messages:[{role:user,content:What is the weather in SF?}]},stream_mode[updates,messages,custom],# 三管齐下versionv2,):# 统一格式每个chunk都有 type, ns, datastream_typechunk[type]datachunk[data]ifstream_typemessages:token,metadatadataprint(token.content,end)# 实时打字机效果elifstream_typeupdates:fornode_name,state_updateindata.items():print(f\n 节点{node_name}完成)elifstream_typecustom:print(f\n 自定义事件:{data})v2格式的统一结构{type:updates|messages|custom|...,ns:(),# 命名空间用于子Agentdata:...,# 实际数据类型取决于mode}最佳实践开发调试[updates, debug]—— 看流程看细节生产UI[messages, custom]—— 打字机效果进度条复杂Agent[updates, messages, custom]—— 全方位监控三、思考/推理Token流式展示现代模型如Claude 3.7 Sonnet、o1系列具备推理能力——它们会在回答前进行内部思考。LangChain支持将这些思维过程实时展示给用户。fromlangchain_anthropicimportChatAnthropicfromlangchain.messagesimportAIMessageChunk# 启用推理模式modelChatAnthropic(model_nameclaude-sonnet-4-6,thinking{type:enabled,budget_tokens:5000},)agentcreate_agent(modelmodel,tools[get_weather])forchunkinagent.stream({messages:[{role:user,content:What is the weather in SF?}]},stream_modemessages,):ifnotisinstance(token:chunk[data][0],AIMessageChunk):continue# 分离推理内容和正式回答content_blockstoken.content_blocks# 提取推理块reasoning[bforbincontent_blocksifb[type]reasoning]text[bforbincontent_blocksifb[type]text]ifreasoning:print(f [思考中]{reasoning[0][reasoning]},end)iftext:print(f{text[0][text]},end)输出效果 [思考中] 用户询问旧金山天气我需要调用天气工具 [思考中] 城市参数应该是San Francisco 旧金山天气晴朗气温22°C...UI设计建议用灰色/斜体展示推理内容与正式回答区分提供显示/隐藏思考过程的开关推理Token通常消耗额外成本注意预算控制四、工具调用进度实时反馈工具执行往往是耗时最长的环节。用户需要知道哪个工具正在运行进度如何是否卡住了基础方案工具执行状态defanalyze_data(query:str)-str:分析数据带详细进度反馈writerget_stream_writer()# 阶段1数据获取writer({tool:analyze_data,phase:fetching,message:正在从数据库获取原始数据...,progress:10})time.sleep(1)# 阶段2数据清洗writer({tool:analyze_data,phase:cleaning,message:清洗异常值和缺失数据...,progress:40})time.sleep(1)# 阶段3分析计算writer({tool:analyze_data,phase:analyzing,message:执行统计分析和模式识别...,progress:70})time.sleep(1)# 完成writer({tool:analyze_data,phase:complete,message:分析完成,progress:100})return分析结果数据趋势呈上升态势高级方案多工具协调展示当Agent并行调用多个工具时你需要区分不同工具的执行状态fromtypingimportAnydef_render_tool_progress(chunk:dict)-None:渲染工具执行进度ifchunk[type]custom:datachunk[data]# 工具进度条渲染iftoolindataandprogressindata:bar█*(data[progress]//5)░*(20-data[progress]//5)print(f\r{data[tool]}: [{bar}]{data[progress]}% -{data[message]},end)# 工具完成ifdata.get(phase)complete:print(f\n✅{data[tool]}执行完成)# 流式循环中调用forchunkinagent.stream(input_data,stream_mode[messages,custom]):_render_tool_progress(chunk)# ... 其他处理前端集成思路使用stream_mode[messages, custom]为每个工具调用创建独立的进度卡片通过tool_call_id关联进度更新与具体工具实例五、多Agent系统的流式溯源在多Agent架构中主Agent调用子Agent你需要知道每条消息来自哪个Agent。fromlangchain.agentsimportcreate_agent# 创建子Agent天气专家weather_agentcreate_agent(modelgpt-5-nano,tools[get_weather],nameweather_specialist,# 关键给Agent命名)# 主Agent调用子Agentdefcall_weather_agent(query:str)-str:resultweather_agent.invoke({messages:[{role:user,content:query}]})returnresult[messages][-1].text supervisorcreate_agent(modelgpt-5-nano,tools[call_weather_agent],namesupervisor,)# 流式追踪启用子图流式current_agentNoneforchunkinsupervisor.stream({messages:[{role:user,content:北京天气如何}]},stream_mode[messages,updates],subgraphsTrue,# 关键启用子Agent流式versionv2,):ifchunk[type]messages:token,metadatachunk[data]# 通过 lc_agent_name 识别消息来源agent_namemetadata.get(lc_agent_name,unknown)ifagent_name!current_agent:print(f\n [{agent_name}]:)current_agentagent_nameiftoken.content:print(token.content,end)输出示例 [supervisor]: 我需要查询天气信息让我调用天气专家。 [weather_specialist]: 正在获取北京天气数据... 北京今天晴朗气温25°C。 [supervisor]: 根据天气专家的信息北京今天天气很好适合出门关键配置给每个Agent设置name参数流式时启用subgraphsTrue从metadata[lc_agent_name]读取来源六、实战模式构建生产级流式UI完整示例带进度条的Agentimportasynciofromlangchain.agentsimportcreate_agentfromlanggraph.configimportget_stream_writerfromlangchain.messagesimportAIMessageChunk# 1. 定义带进度反馈的工具defresearch_topic(topic:str)-str:研究主题实时反馈进度writerget_stream_writer()steps[(搜索相关资料,20),(阅读关键文章,40),(提取核心观点,60),(整理研究结论,80),(生成最终报告,100)]forstep_name,progressinsteps:writer({type:progress,tool:research_topic,step:step_name,progress:progress,topic:topic})time.sleep(0.5)# 模拟耗时操作returnf关于{topic}的研究完成这是一个热门话题近期有显著增长趋势。# 2. 创建Agentagentcreate_agent(modelgpt-5-nano,tools[research_topic],)# 3. 流式处理函数asyncdefstream_with_ui(input_text:str):生产级流式处理分离不同类型的更新# 状态管理current_toolNonemessage_bufferasyncforchunkinagent.astream({messages:[{role:user,content:input_text}]},stream_mode[messages,updates,custom],versionv2,):stream_typechunk[type]datachunk[data]# 处理Token流打字机效果ifstream_typemessages:token,metadatadataifisinstance(token,AIMessageChunk):iftoken.text:message_buffertoken.text# 实时更新UI文本yield{type:text,content:token.text}# 工具调用片段iftoken.tool_call_chunks:yield{type:tool_start,data:token.tool_call_chunks}# 处理状态更新elifstream_typeupdates:fornode_name,updateindata.items():ifnode_nametools:yield{type:tool_complete,data:update}elifnode_namemodel:yield{type:model_complete,data:update}# 处理自定义进度elifstream_typecustom:ifdata.get(type)progress:yield{type:progress,tool:data[tool],step:data[step],progress:data[progress]}# 4. 前端消费示例伪代码 async for event in stream_with_ui(研究一下AI Agent发展趋势): if event[type] text: append_to_chat(event[content]) # 打字机效果 elif event[type] progress: update_progress_bar( toolevent[tool], progressevent[progress], labelevent[step] ) elif event[type] tool_complete: show_tool_result(event[data]) 总结流式处理最佳实践清单场景推荐模式关键配置简单对话UImessages实时打字机效果工具执行监控updatescustom步骤追踪进度条多Agent协作messagessubgraphsTrueAgent命名溯源调试开发[updates, messages, debug]全方位信息生产部署[messages, custom]平衡性能与体验核心要点回顾始终使用versionv2—— 统一格式简化处理逻辑合理选择粒度—— 不是所有场景都需要Token级流式善用custom模式—— 它是构建高级UI的秘密武器给Agent命名—— 多Agent场景下溯源的关键分离关注点—— 用chunk[type]路由不同类型的更新流式处理不仅仅是技术优化更是用户体验的革命。当你的Agent开始边想边说用户从焦虑等待转为参与观察信任感自然建立。下一篇我们将深入探讨中间件开发学习如何在Agent执行链路中插入自定义逻辑实现日志、监控、安全护栏等高级功能。参考资源LangChain Streaming官方文档LangGraph Streaming指南关注公众号【dev派】发送 “agent” 获取全部源码和模板