echo-agent 前身为 2025 年 11 月启动的个人助理项目 fubot最初面向长期陪伴型个人智能体围绕认知记忆、上下文延续、用户偏好沉淀、任务闭环与持续自我优化展开。随着真实场景迭代项目逐步形成多入口接入、统一事件模型、消息总线、Agent Loop、多模型抽象、工具调用、MCP 接入、任务调度、权限审批、运行轨迹、长期记忆和受控自演进等能力。目前已支持微信、QQ、CLI、Gateway、Webhook、Cron 等入口服务用户超过 20 万、累计下载超过 50 万是面向长期运行、记忆增强和可持续成长智能体的开源 Agent Runtime。项目地址GitHub - fuyuxiang/echo-agent: Echo Agent 是一个可自托管、长期运行、持续学习的 AI Agent面向个人与团队的私有自动化场景。它可以部署在自有服务器上统一连接模型、工具、记忆、权限与消息入口。内置四层认知记忆、遗忘曲线与矛盾检测机制能够在跨会话任务中持续沉淀上下文并保持长期记忆的质量。针对命令执行、文件操作等高风险行为它提供基于 LLM 的审批与解释机制为关键操作建立可审计、可追溯的安全边界。原生支持 MCP、A2A、多模型路由、任务调度、工具调用和多通道接入覆盖 CLI、Gateway API、微信、Telegram 等入口。它让 Agent 带着长期记忆和可进化技能持续、安全地为你工作。 · GitHub你让 Agent “帮我修复测试失败”。它读了文件、跑了测试、改了代码最后告诉你“已经修复并验证通过”。很多人会以为到这里一次 Agent 处理就结束了。模型已经生成最后一句话用户也看到了结果似乎只剩下把字符串返回给前端。但真实系统里问题经常出现在最后这一段用户看到了回答session 没保存流式消息已经发过一半最终结果又重复发送后台记忆整理拿到旧会话对象把新消息覆盖模型输出里残留think下一轮又被当成历史事实。本篇只讲一个点ResponseStage 不是尾部清理代码而是一次 Agent 处理从“运行中状态”进入“可恢复历史”的提交边界。问题入口如果只看传统文本型 Chatbot处理链路通常很直接response llm.chat(messages) print(response.content)这段代码的隐含前提是最终回答只是一段文本。模型返回字符串系统就可以结束。Agent 不一样。一次 Agent 处理可能已经产生工具调用、工具结果、审批记录、任务状态、记忆候选、技能复盘信号、流式增量和 trace。最终回答只是用户看到的那一层。简单聊天程序Agent 响应阶段模型返回文本后直接发送先清理、合并、保存再发布历史通常只是上下文数组session 是可恢复状态没有后台认知更新可能触发 embedding、记忆整理、技能复盘一次发送就是结束流式输出需要 final 语义为了不停留在抽象层面下面以 echo-agent 的实现为例。它把一次处理拆成三段 PipelineContextStage准备输入InferenceStage完成推理与工具循环ResponseStage负责最后提交。换句话说ResponseStage不重新推理也不重新执行工具。它接过InferenceResult把推理阶段的结果变成稳定 session、用户可见响应和后台整理任务。提交边界ResponseStage.finalize的输入是PipelineContext和InferenceResult输出是ProcessResult。这里要区分两个结果对象数据结构所属阶段主要含义InferenceResult推理阶段模型推理得到了什么是否产生复盘信号ProcessResult响应阶段最终交付了什么是否已经完成出站发布outbound_sent尤其关键。若ResponseStage已经通过stream_publisher.finalize发布最终消息外层_on_inbound就不应该再兜底发送一次。否则用户可能收到重复回答。最终回答不是模型文本的简单转发而是系统确认“本次处理可以对外表达”的状态同步。把书稿里的主流程压缩成伪代码大致是这样async def finalize(ctx, result): text strip_thinking(result.response_text) text merge_intro(ctx.intro_text, text) session ctx.session session.add_message(assistant, text) await sessions.save(session) if memory.has_pending_embeds(): spawn(memory.flush_pending_embeds()) if should_consolidate(session): await consolidation.schedule( session.key, spawn, on_completeclear_memory_snapshot, ) if result.should_review_skills and result.total_tool_calls 0: spawn(background_skill_review(ctx.messages)) if result.should_review_memory and result.total_tool_calls 0: spawn(background_memory_review(ctx.messages, session.key)) sent False if ctx.publish_response and ctx.stream_publisher: sent await ctx.stream_publisher.finalize(text) return ProcessResult(text, sent)这段流程的重点不是函数名而是顺序先得到干净的最终文本再保存主状态再启动派生任务最后完成响应发布语义。主路径响应阶段第一步是清理模型文本。某些模型会把内部推理放在think.../think标签里。对最终用户来说这些内容通常不应该展示对会话历史来说也不应该保存。书稿中的strip_thinking会移除完整的think.../think片段也会清理残留的单独标签。这个清理必须发生在写入 session 之前。否则下一轮上下文会把内部推理当成历史事实模型可能沿着一段本不该暴露的推理继续生成。它也要发生在流式 final 之前。即使前面已经发送过流式增量最终 full text 仍应以清理后的版本为准。_TokenStreamPublisher.finalize会发布最终完整文本并通过 metadata 标记_stream_full_text通道层可以据此更新最终版本。介绍语也在这里合并而不是让模型自己临场发挥。原因很简单介绍语属于产品行为不属于模型推理。系统可以根据通道、语言、配置模板和 Agent 名称生成稳定介绍避免每次首次响应都变成模型自由发挥。随后响应阶段把最终 assistant 消息写入 session并保存session.add_message(assistant, response_text) await sessions.save(session)这是本篇最重要的提交动作。前面的阶段已经把用户消息、带工具调用的 assistant 消息、tool 消息写入会话。现在最终 assistant 回答被追加进去才形成完整闭环用户提出目标Agent 采取行动工具返回观察Agent 给出结论。如果缺少最终 assistant 消息下一轮模型只能看到用户输入和工具过程却看不到本轮结论。长期会话恢复、上下文压缩边界、记忆整理阈值、审计调试和测试断言都会受到影响。用户看到回答不等于系统已经进入可恢复状态只有回答落盘后下一轮对话才真正站在同一段历史上。慢通道为什么保存要发生在后台任务之前因为 embedding 刷新、consolidation、技能复盘、记忆复盘都是派生任务。它们可以失败、延迟或重试但不应该阻止本轮会话历史被保存。否则一个向量服务失败就可能让用户刚看到的回答在系统里丢失。第一类后台任务是 pending embedding。记忆系统如果新增或更新条目在启用向量索引和 embedding 函数时会先进入_pending_embeds队列。ResponseStage检查memory.has_pending_embeds()若存在待处理项就异步触发flush_pending_embeds()。第二类是长期记忆整理。MemoryConsolidator.should_consolidate的规则很直接用session.message_count - session.last_consolidated计算未整理消息数达到阈值就调度 consolidation。这里的last_consolidated不是时间戳而是 session 消息列表里的边界索引。边界之前的消息已经进入长期记忆整理范围边界之后仍是待整理历史。用索引表达整理进度比按时间猜测更可靠。第三类是技能复盘与记忆复盘。InferenceStage在工具调用达到阈值时只产生复盘信号真正执行由ResponseStage调度。当前实现还要求total_tool_calls 0避免普通闲聊触发复盘。这些任务都属于低优先级认知更新。它们改变未来 Agent 能召回什么、知道什么、如何处理类似任务但不应该改变用户刚刚收到的事实承诺。整理边界后台整理最怕的不是慢而是不安全。一个常见错误是把当前内存里的 session 对象直接丢给后台任务。问题在于后台任务真正运行时前台可能已经处理了下一条消息。如果后台拿着旧对象继续保存就可能覆盖新消息或者推进错误的整理边界。echo-agent 的ConsolidationWorker用两个动作解决这个问题调度时用_pending防止同一个 session 重复排队真正运行时重新获取 session 锁并重新加载 session。整理完成后它也不会盲目把last_consolidated推到消息列表末尾。它会向前调整边界避免停在tool消息或带tool_calls的 assistant 消息之后boundary len(session.messages) while boundary session.last_consolidated: msg session.messages[boundary - 1] if msg.get(role) tool: boundary - 1 elif msg.get(role) assistant and msg.get(tool_calls): boundary - 1 else: break这个细节很工程。工具调用消息有结构依赖带tool_calls的 assistant 消息后面应该跟对应的 tool 消息。如果整理边界切在中间后续上下文压缩或历史裁剪就可能制造非法消息结构。所以记忆整理不是“把前 N 条消息摘要掉”。它必须尊重模型消息协议也必须尊重并发更新。consolidation 成功后ResponseStage传给 worker 的on_complete会清理该 session 的记忆快照。下一轮ContextStage再构造上下文时就会重新读取更新后的记忆。否则 Agent 可能长期使用旧 snapshot看不到刚整理出的长期记忆。发布与失败响应阶段最后处理流式发布outbound_sent await ctx.stream_publisher.finalize(response_text)这个 final 事件很重要。流式 token 让用户更早看到生成过程但外部系统仍需要知道什么时候一次交互真正结束。Gateway wait、通道编辑、最终消息合并和外层兜底发布都依赖这个语义。如果finalize已经发送最终完整文本ProcessResult.outbound_sent就会设为真。外层 Agent Loop 看到这个标记后不再重复发布。失败策略也要分层。ResponseStage.finalize没有把所有异常都吞掉。保存 session、调度关键后台任务、流式 finalize 如果抛出异常会交给外层_on_inbound捕获并发布错误响应。后台 review 方法则会捕获异常并记录 warning避免把用户已经得到的回答变成失败响应。生产级 Agent 的可靠性经常不是败在模型不会回答而是败在回答之后状态不一致。生产可用性判断一个响应阶段是否可用不能只看“能不能返回文本”。更可检验的标准是检查项可检验标准文本治理think等内部标记不会进入最终回答和 session状态提交最终 assistant 消息写入 session并在后台任务前保存顺序一致用户消息、工具调用、工具结果、最终回答按协议顺序落盘流式一致final 事件能发布完整文本outbound_sent防止重复发送后台隔离embedding、consolidation、review 不阻塞当前响应并发安全consolidation 重新加锁、重新加载 session不持有旧对象边界安全last_consolidated不切断 tool call 结构失败分层主路径失败进入错误处理后台失败记录并可治理这里的核心判断很简单回答是交付落盘是记录后台整理是学习。三者相关但不能混在同一条阻塞路径里。交付要让用户及时拿到稳定结论记录要让系统下一轮还能恢复同一段历史学习要让 Agent 从任务中沉淀记忆和技能但不能悄悄改写刚刚对用户承诺过的事实。小结ResponseStage 是 Agent Loop 里容易被低估的一层。它不负责让模型更聪明也不负责让工具更强大它负责在一次智能行动之后把文本、状态、后台任务和出站通道收束到一致结局。理解这一层后很多线上问题会变得清楚重复发送不是模型问题而是发布语义不清会话断片不是上下文问题而是最终回答没有提交记忆污染不是记忆系统单点问题而是后台整理缺少边界和证据治理。Agent 的一次处理不在模型生成最后一个字时结束而在系统确认回答可交付、历史可恢复、后台整理可追踪之后才真正进入下一轮。全篇完本文为 echo-agent 设计笔记系列第 10 篇。项目源码已开源至 GitHub。如果你对工业级 Agent 的工程落地感兴趣欢迎加入技术交流群参与日常讨论。下一篇我们将探讨 《给 Agent 加一个规划与反思层》敬请期待。
ResponseStage 设计笔记:回答落盘与后台整理
echo-agent 前身为 2025 年 11 月启动的个人助理项目 fubot最初面向长期陪伴型个人智能体围绕认知记忆、上下文延续、用户偏好沉淀、任务闭环与持续自我优化展开。随着真实场景迭代项目逐步形成多入口接入、统一事件模型、消息总线、Agent Loop、多模型抽象、工具调用、MCP 接入、任务调度、权限审批、运行轨迹、长期记忆和受控自演进等能力。目前已支持微信、QQ、CLI、Gateway、Webhook、Cron 等入口服务用户超过 20 万、累计下载超过 50 万是面向长期运行、记忆增强和可持续成长智能体的开源 Agent Runtime。项目地址GitHub - fuyuxiang/echo-agent: Echo Agent 是一个可自托管、长期运行、持续学习的 AI Agent面向个人与团队的私有自动化场景。它可以部署在自有服务器上统一连接模型、工具、记忆、权限与消息入口。内置四层认知记忆、遗忘曲线与矛盾检测机制能够在跨会话任务中持续沉淀上下文并保持长期记忆的质量。针对命令执行、文件操作等高风险行为它提供基于 LLM 的审批与解释机制为关键操作建立可审计、可追溯的安全边界。原生支持 MCP、A2A、多模型路由、任务调度、工具调用和多通道接入覆盖 CLI、Gateway API、微信、Telegram 等入口。它让 Agent 带着长期记忆和可进化技能持续、安全地为你工作。 · GitHub你让 Agent “帮我修复测试失败”。它读了文件、跑了测试、改了代码最后告诉你“已经修复并验证通过”。很多人会以为到这里一次 Agent 处理就结束了。模型已经生成最后一句话用户也看到了结果似乎只剩下把字符串返回给前端。但真实系统里问题经常出现在最后这一段用户看到了回答session 没保存流式消息已经发过一半最终结果又重复发送后台记忆整理拿到旧会话对象把新消息覆盖模型输出里残留think下一轮又被当成历史事实。本篇只讲一个点ResponseStage 不是尾部清理代码而是一次 Agent 处理从“运行中状态”进入“可恢复历史”的提交边界。问题入口如果只看传统文本型 Chatbot处理链路通常很直接response llm.chat(messages) print(response.content)这段代码的隐含前提是最终回答只是一段文本。模型返回字符串系统就可以结束。Agent 不一样。一次 Agent 处理可能已经产生工具调用、工具结果、审批记录、任务状态、记忆候选、技能复盘信号、流式增量和 trace。最终回答只是用户看到的那一层。简单聊天程序Agent 响应阶段模型返回文本后直接发送先清理、合并、保存再发布历史通常只是上下文数组session 是可恢复状态没有后台认知更新可能触发 embedding、记忆整理、技能复盘一次发送就是结束流式输出需要 final 语义为了不停留在抽象层面下面以 echo-agent 的实现为例。它把一次处理拆成三段 PipelineContextStage准备输入InferenceStage完成推理与工具循环ResponseStage负责最后提交。换句话说ResponseStage不重新推理也不重新执行工具。它接过InferenceResult把推理阶段的结果变成稳定 session、用户可见响应和后台整理任务。提交边界ResponseStage.finalize的输入是PipelineContext和InferenceResult输出是ProcessResult。这里要区分两个结果对象数据结构所属阶段主要含义InferenceResult推理阶段模型推理得到了什么是否产生复盘信号ProcessResult响应阶段最终交付了什么是否已经完成出站发布outbound_sent尤其关键。若ResponseStage已经通过stream_publisher.finalize发布最终消息外层_on_inbound就不应该再兜底发送一次。否则用户可能收到重复回答。最终回答不是模型文本的简单转发而是系统确认“本次处理可以对外表达”的状态同步。把书稿里的主流程压缩成伪代码大致是这样async def finalize(ctx, result): text strip_thinking(result.response_text) text merge_intro(ctx.intro_text, text) session ctx.session session.add_message(assistant, text) await sessions.save(session) if memory.has_pending_embeds(): spawn(memory.flush_pending_embeds()) if should_consolidate(session): await consolidation.schedule( session.key, spawn, on_completeclear_memory_snapshot, ) if result.should_review_skills and result.total_tool_calls 0: spawn(background_skill_review(ctx.messages)) if result.should_review_memory and result.total_tool_calls 0: spawn(background_memory_review(ctx.messages, session.key)) sent False if ctx.publish_response and ctx.stream_publisher: sent await ctx.stream_publisher.finalize(text) return ProcessResult(text, sent)这段流程的重点不是函数名而是顺序先得到干净的最终文本再保存主状态再启动派生任务最后完成响应发布语义。主路径响应阶段第一步是清理模型文本。某些模型会把内部推理放在think.../think标签里。对最终用户来说这些内容通常不应该展示对会话历史来说也不应该保存。书稿中的strip_thinking会移除完整的think.../think片段也会清理残留的单独标签。这个清理必须发生在写入 session 之前。否则下一轮上下文会把内部推理当成历史事实模型可能沿着一段本不该暴露的推理继续生成。它也要发生在流式 final 之前。即使前面已经发送过流式增量最终 full text 仍应以清理后的版本为准。_TokenStreamPublisher.finalize会发布最终完整文本并通过 metadata 标记_stream_full_text通道层可以据此更新最终版本。介绍语也在这里合并而不是让模型自己临场发挥。原因很简单介绍语属于产品行为不属于模型推理。系统可以根据通道、语言、配置模板和 Agent 名称生成稳定介绍避免每次首次响应都变成模型自由发挥。随后响应阶段把最终 assistant 消息写入 session并保存session.add_message(assistant, response_text) await sessions.save(session)这是本篇最重要的提交动作。前面的阶段已经把用户消息、带工具调用的 assistant 消息、tool 消息写入会话。现在最终 assistant 回答被追加进去才形成完整闭环用户提出目标Agent 采取行动工具返回观察Agent 给出结论。如果缺少最终 assistant 消息下一轮模型只能看到用户输入和工具过程却看不到本轮结论。长期会话恢复、上下文压缩边界、记忆整理阈值、审计调试和测试断言都会受到影响。用户看到回答不等于系统已经进入可恢复状态只有回答落盘后下一轮对话才真正站在同一段历史上。慢通道为什么保存要发生在后台任务之前因为 embedding 刷新、consolidation、技能复盘、记忆复盘都是派生任务。它们可以失败、延迟或重试但不应该阻止本轮会话历史被保存。否则一个向量服务失败就可能让用户刚看到的回答在系统里丢失。第一类后台任务是 pending embedding。记忆系统如果新增或更新条目在启用向量索引和 embedding 函数时会先进入_pending_embeds队列。ResponseStage检查memory.has_pending_embeds()若存在待处理项就异步触发flush_pending_embeds()。第二类是长期记忆整理。MemoryConsolidator.should_consolidate的规则很直接用session.message_count - session.last_consolidated计算未整理消息数达到阈值就调度 consolidation。这里的last_consolidated不是时间戳而是 session 消息列表里的边界索引。边界之前的消息已经进入长期记忆整理范围边界之后仍是待整理历史。用索引表达整理进度比按时间猜测更可靠。第三类是技能复盘与记忆复盘。InferenceStage在工具调用达到阈值时只产生复盘信号真正执行由ResponseStage调度。当前实现还要求total_tool_calls 0避免普通闲聊触发复盘。这些任务都属于低优先级认知更新。它们改变未来 Agent 能召回什么、知道什么、如何处理类似任务但不应该改变用户刚刚收到的事实承诺。整理边界后台整理最怕的不是慢而是不安全。一个常见错误是把当前内存里的 session 对象直接丢给后台任务。问题在于后台任务真正运行时前台可能已经处理了下一条消息。如果后台拿着旧对象继续保存就可能覆盖新消息或者推进错误的整理边界。echo-agent 的ConsolidationWorker用两个动作解决这个问题调度时用_pending防止同一个 session 重复排队真正运行时重新获取 session 锁并重新加载 session。整理完成后它也不会盲目把last_consolidated推到消息列表末尾。它会向前调整边界避免停在tool消息或带tool_calls的 assistant 消息之后boundary len(session.messages) while boundary session.last_consolidated: msg session.messages[boundary - 1] if msg.get(role) tool: boundary - 1 elif msg.get(role) assistant and msg.get(tool_calls): boundary - 1 else: break这个细节很工程。工具调用消息有结构依赖带tool_calls的 assistant 消息后面应该跟对应的 tool 消息。如果整理边界切在中间后续上下文压缩或历史裁剪就可能制造非法消息结构。所以记忆整理不是“把前 N 条消息摘要掉”。它必须尊重模型消息协议也必须尊重并发更新。consolidation 成功后ResponseStage传给 worker 的on_complete会清理该 session 的记忆快照。下一轮ContextStage再构造上下文时就会重新读取更新后的记忆。否则 Agent 可能长期使用旧 snapshot看不到刚整理出的长期记忆。发布与失败响应阶段最后处理流式发布outbound_sent await ctx.stream_publisher.finalize(response_text)这个 final 事件很重要。流式 token 让用户更早看到生成过程但外部系统仍需要知道什么时候一次交互真正结束。Gateway wait、通道编辑、最终消息合并和外层兜底发布都依赖这个语义。如果finalize已经发送最终完整文本ProcessResult.outbound_sent就会设为真。外层 Agent Loop 看到这个标记后不再重复发布。失败策略也要分层。ResponseStage.finalize没有把所有异常都吞掉。保存 session、调度关键后台任务、流式 finalize 如果抛出异常会交给外层_on_inbound捕获并发布错误响应。后台 review 方法则会捕获异常并记录 warning避免把用户已经得到的回答变成失败响应。生产级 Agent 的可靠性经常不是败在模型不会回答而是败在回答之后状态不一致。生产可用性判断一个响应阶段是否可用不能只看“能不能返回文本”。更可检验的标准是检查项可检验标准文本治理think等内部标记不会进入最终回答和 session状态提交最终 assistant 消息写入 session并在后台任务前保存顺序一致用户消息、工具调用、工具结果、最终回答按协议顺序落盘流式一致final 事件能发布完整文本outbound_sent防止重复发送后台隔离embedding、consolidation、review 不阻塞当前响应并发安全consolidation 重新加锁、重新加载 session不持有旧对象边界安全last_consolidated不切断 tool call 结构失败分层主路径失败进入错误处理后台失败记录并可治理这里的核心判断很简单回答是交付落盘是记录后台整理是学习。三者相关但不能混在同一条阻塞路径里。交付要让用户及时拿到稳定结论记录要让系统下一轮还能恢复同一段历史学习要让 Agent 从任务中沉淀记忆和技能但不能悄悄改写刚刚对用户承诺过的事实。小结ResponseStage 是 Agent Loop 里容易被低估的一层。它不负责让模型更聪明也不负责让工具更强大它负责在一次智能行动之后把文本、状态、后台任务和出站通道收束到一致结局。理解这一层后很多线上问题会变得清楚重复发送不是模型问题而是发布语义不清会话断片不是上下文问题而是最终回答没有提交记忆污染不是记忆系统单点问题而是后台整理缺少边界和证据治理。Agent 的一次处理不在模型生成最后一个字时结束而在系统确认回答可交付、历史可恢复、后台整理可追踪之后才真正进入下一轮。全篇完本文为 echo-agent 设计笔记系列第 10 篇。项目源码已开源至 GitHub。如果你对工业级 Agent 的工程落地感兴趣欢迎加入技术交流群参与日常讨论。下一篇我们将探讨 《给 Agent 加一个规划与反思层》敬请期待。