# claw-code source deep dive: practicing sessions without calling an LLM. How does `QueryEnginePort` get the state machine, stop conditions, and audit fields right?

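## Scope

This article is based on `src/query_engine.py` in the repository, together with `UsageSummary` and `PermissionDenial` in `src/models.py`, `src/transcript.py`, `src/session_store.py`, and the call sites in `src/runtime.py` and `src/main.py`. The subject is `QueryEnginePort` from the Python port layer. Its `submit_message` issues no external LLM HTTP requests. The output consists of locally formatted summary lines or JSON, and exists for practicing session state, stop semantics, streaming event shapes, and persistence.

## 1. Why you can still practice sessions "without a model"

In a real production harness the LLM is only one step of a turn. Around it sit cross-cutting concerns: quotas, turn limits, permission denials, whether the transcript has been flushed to disk, when compaction fires, and whether structured output is parseable. If all of this is bound to `openai.chat.completions` on day one, debugging runs into three problems:

- You cannot tell whether the prompt is broken or the state machine went out of bounds.
- You cannot tell whether a tool was really denied or the network timed out.
- Unit tests cannot reproduce reliably, because they depend on an API key, rate limits, and model randomness.

`QueryEnginePort` deliberately makes "one user input → a structured `TurnResult` plus optional streaming events" purely local, unit-testable, and CLI-drivable, which decouples the session skeleton from model inference. When a real model is wired in, the part to replace is the segment that produces the `output` string, not the `TurnResult` / `stop_reason` / `persist_session` contract.

## 2. Core types: config, per-turn result, the engine itself

### 2.1 `QueryEngineConfig`: the state machine's "knobs"

```python
@dataclass(frozen=True)
class QueryEngineConfig:
    max_turns: int = 8
    max_budget_tokens: int = 2000
    compact_after_turns: int = 12
    structured_output: bool = False
    structured_retry_limit: int = 2
```

Behavioral meaning of each field:

- `max_turns`: session depth gate, measured by the number of entries in `mutable_messages`. Once the limit is reached, new prompts are refused (see the early return below).
- `max_budget_tokens`: cumulative usage gate. Note that the implementation uses the word-count approximation in `UsageSummary.add_turn`, not a real tokenizer.
- `compact_after_turns`: compaction/truncation threshold. Once exceeded, only the tail window of `mutable_messages` and `TranscriptStore.entries` is kept.
- `structured_output`: whether the output is multi-line text or a JSON string, for easier downstream parsing.
- `structured_retry_limit`: number of retries when JSON serialization fails (a defensive branch).

Together these fields define finite-state behavior that runs end to end without a model.

### 2.2 `TurnResult`: the per-turn external contract (including the stop reason)

```python
@dataclass(frozen=True)
class TurnResult:
    prompt: str
    output: str
    matched_commands: tuple[str, ...]
    matched_tools: tuple[str, ...]
    permission_denials: tuple[PermissionDenial, ...]
    usage: UsageSummary
    stop_reason: str
```

Takeaway: `stop_reason` is a first-class citizen for auditing and orchestration. It is returned alongside `output`, so callers (the CLI, `PortRuntime`, a future UI) can branch on it, for example by blocking further submissions on `max_turns_reached`.

### 2.3 Mutable state held by `QueryEnginePort`

```python
@dataclass
class QueryEnginePort:
    manifest: PortManifest
    config: QueryEngineConfig = field(default_factory=QueryEngineConfig)
    session_id: str = field(default_factory=lambda: uuid4().hex)
    mutable_messages: list[str] = field(default_factory=list)
    permission_denials: list[PermissionDenial] = field(default_factory=list)
    total_usage: UsageSummary = field(default_factory=UsageSummary)
    transcript_store: TranscriptStore = field(default_factory=TranscriptStore)
```

- `manifest`: bound to the workspace `PortManifest`. `render_summary()` folds it into the report, so the session and the "port surface" are displayed from the same source.
- `session_id`: stable key that links to the persisted file (see `session_store`).
- `mutable_messages`: the main carrier for turn counting and compaction. One entry is appended for each successfully processed prompt.
- `permission_denials`: denial records accumulated across turns (an audit field). `render_summary()` prints `len(self.permission_denials)`.
- `total_usage`: the "pseudo-token" count accumulated across turns.
- `transcript_store`: appended in lockstep with `mutable_messages`, but its flush semantics are tracked separately (`TranscriptStore.flushed`).

## 3. State machine: the two paths of `submit_message`

### 3.1 Path A: `max_turns` already exhausted (return immediately without modifying state)

```python
def submit_message(
    self,
    prompt: str,
    matched_commands: tuple[str, ...] = (),
    matched_tools: tuple[str, ...] = (),
    denied_tools: tuple[PermissionDenial, ...] = (),
) -> TurnResult:
    # Hard stop: refuse the prompt before touching any engine state.
    if len(self.mutable_messages) >= self.config.max_turns:
        output = f'Max turns reached before processing prompt: {prompt}'
        return TurnResult(
            prompt=prompt,
            output=output,
            matched_commands=matched_commands,
            matched_tools=matched_tools,
            permission_denials=denied_tools,
            usage=self.total_usage,
            stop_reason='max_turns_reached',
        )
```

Design notes:

- The condition is `>= max_turns`: once there are already `max_turns` entries of history, a new turn is no longer enqueued. This is equivalent to a session depth of "at most `max_turns` user messages".
- The early return does not `append` to `mutable_messages`, does not update `total_usage`, and does not `extend` `permission_denials`. The `TurnResult` still carries the `matched_*` and `denied_tools` passed in for this call, so a UI can show "what this turn was trying to do", but the engine's internal state is frozen at the boundary.

Audit detail: on this path the returned `permission_denials` is the `denied_tools` argument, while the instance field `self.permission_denials` is not extended. If you need a cumulative denial count that includes out-of-bounds turns, merge it at the call site. This is a small inconsistency in the port layer that is worth keeping in mind while reading.

### 3.2 Path B: normal processing (generate output → meter → write state → compact)

```python
    # Continues inside submit_message, after the max_turns early return.
    summary_lines = [
        f'Prompt: {prompt}',
        f'Matched commands: {", ".join(matched_commands) if matched_commands else "none"}',
        f'Matched tools: {", ".join(matched_tools) if matched_tools else "none"}',
        f'Permission denials: {len(denied_tools)}',
    ]
    output = self._format_output(summary_lines)
    projected_usage = self.total_usage.add_turn(prompt, output)
    stop_reason = 'completed'
    # Soft stop: the budget is checked after this turn has been counted.
    if projected_usage.input_tokens + projected_usage.output_tokens > self.config.max_budget_tokens:
        stop_reason = 'max_budget_reached'
    self.mutable_messages.append(prompt)
    self.transcript_store.append(prompt)
    self.permission_denials.extend(denied_tools)
    self.total_usage = projected_usage
    self.compact_messages_if_needed()
    return TurnResult(
        prompt=prompt,
        output=output,
        matched_commands=matched_commands,
        matched_tools=matched_tools,
        permission_denials=denied_tools,
        usage=self.total_usage,
        stop_reason=stop_reason,
    )
```

Design notes:

- Stand-in for "model output": `summary_lines` writes the `matched_commands`, `matched_tools`, and denial count (all of which should already have been computed by upstream routing and permission logic) into a human-readable summary. When a model is wired in later, this segment can be swapped for a real completion while the same fields are kept for auditing.
- Stop condition `max_budget_reached`: it is evaluated only after this turn has been counted into `projected_usage`, and the turn is still recorded (`append`, `extend`, `total_usage` update). This differs from the `max_turns` path. The budget is a soft cap: the over-budget turn still enters the history, but `stop_reason` marks the budget as exhausted, so the product layer can prompt the user to start a new session or clear context before the next message.
- `permission_denials`: this turn's `denied_tools` are accumulated into the instance list, giving a cross-turn denial audit. `render_summary` can read the total count.
- `compact_messages_if_needed`: called after the writes, so the history cannot grow without bound.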
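The difference between the hard stop (path A) and the soft stop (path B) is easiest to see with a toy engine. The sketch below is a minimal illustration, not the repository's code: `MiniEngine`, its field names, and the tiny limits are all hypothetical, and token counting is reduced to a word count on the prompt only.

```python
from dataclasses import dataclass, field


@dataclass
class MiniEngine:
    """Hypothetical stand-in for QueryEnginePort; only the two stop paths are modeled."""
    max_turns: int = 2
    max_budget_tokens: int = 5
    messages: list[str] = field(default_factory=list)
    used_tokens: int = 0

    def submit(self, prompt: str) -> str:
        # Path A (hard stop): return before touching any state.
        if len(self.messages) >= self.max_turns:
            return "max_turns_reached"
        # Path B: count this turn first, then decide the stop reason.
        projected = self.used_tokens + len(prompt.split())
        stop_reason = "completed"
        if projected > self.max_budget_tokens:
            stop_reason = "max_budget_reached"
        # Soft stop: the turn is recorded even when it goes over budget.
        self.messages.append(prompt)
        self.used_tokens = projected
        return stop_reason


engine = MiniEngine()
reasons = [engine.submit(p) for p in ("one two three", "four five six", "seven")]
print(reasons, len(engine.messages), engine.used_tokens)
```

The second prompt pushes the total over the budget yet is still stored, while the third prompt is rejected and leaves both `messages` and `used_tokens` unchanged.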
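## 4. Usage model: `UsageSummary` and "fake tokens"

```python
@dataclass(frozen=True)
class UsageSummary:
    input_tokens: int = 0
    output_tokens: int = 0

    def add_turn(self, prompt: str, output: str) -> 'UsageSummary':
        # Whitespace-separated word counts stand in for real token counts.
        return UsageSummary(
            input_tokens=self.input_tokens + len(prompt.split()),
            output_tokens=self.output_tokens + len(output.split()),
        )
```

Takeaway: the fields are named `input_tokens` / `output_tokens`, but the implementation counts words with `split()`. This is common in porting and teaching code: first get the "accumulate → threshold → `stop_reason`" pipeline working, then swap in a real tokenizer or the usage returned by the API. The unit test `test_bootstrap_session_tracks_turn_state` only asserts that `usage.input_tokens` is at least 1, which is consistent with this approximation.

Budget check:

```python
stop_reason = 'completed'
if projected_usage.input_tokens + projected_usage.output_tokens > self.config.max_budget_tokens:
    stop_reason = 'max_budget_reached'
```

That is, the sum of input-side and output-side pseudo-tokens is compared against `max_budget_tokens`.

## 5. Transcript and compaction: `TranscriptStore` and `compact_messages_if_needed`

```python
@dataclass
class TranscriptStore:
    entries: list[str] = field(default_factory=list)
    flushed: bool = False

    def append(self, entry: str) -> None:
        self.entries.append(entry)
        self.flushed = False

    def compact(self, keep_last: int = 10) -> None:
        if len(self.entries) > keep_last:
            self.entries[:] = self.entries[-keep_last:]

    def replay(self) -> tuple[str, ...]:
        return tuple(self.entries)

    def flush(self) -> None:
        self.flushed = True
```

```python
def compact_messages_if_needed(self) -> None:
    if len(self.mutable_messages) > self.config.compact_after_turns:
        self.mutable_messages[:] = self.mutable_messages[-self.config.compact_after_turns :]
    self.transcript_store.compact(self.config.compact_after_turns)
```

Takeaways:

- Double buffering: `mutable_messages` and `transcript_store.entries` are appended in lockstep inside `submit_message`, and compaction truncates both with the same `compact_after_turns`.
- `flushed` semantics: every `append` sets it to `False`, and `flush()` sets it to `True`. `persist_session` calls `flush_transcript()` before writing to disk, which lines up with the "has this been persisted" narrative.
- Compaction is plain truncation, not a summarization model, consistent with "Compaction: interface first" in `02_inventory.md`.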
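To make the tail-window truncation and the `flushed` flag concrete, here is a small self-contained sketch. `MiniTranscript` is a hypothetical miniature written for this illustration. It mirrors the behavior described above but is not the repository's `TranscriptStore`, and setting `flushed = True` directly is a stand-in for calling `flush()`.

```python
from dataclasses import dataclass, field


@dataclass
class MiniTranscript:
    """Hypothetical miniature of TranscriptStore, for illustration only."""
    entries: list[str] = field(default_factory=list)
    flushed: bool = False

    def append(self, entry: str) -> None:
        self.entries.append(entry)
        self.flushed = False  # any new entry invalidates the flushed flag

    def compact(self, keep_last: int) -> None:
        if len(self.entries) > keep_last:
            # Slice assignment mutates the list in place, so references held
            # elsewhere keep pointing at the same (now shorter) list.
            self.entries[:] = self.entries[-keep_last:]


store = MiniTranscript()
alias = store.entries          # an outside reference to the same list
for i in range(5):
    store.append(f"turn-{i}")
store.flushed = True           # stand-in for flush()
store.compact(keep_last=3)     # keeps turn-2, turn-3, turn-4
store.append("turn-5")         # resets flushed to False
```

Using `entries[:] = ...` rather than rebinding `entries` is a deliberate choice in this sketch: any caller that captured the list earlier observes the truncation instead of silently holding stale history.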
## 6. Audit fields: permission denials, streaming events, summary report

### 6.1 Permission denials: per-turn tuple vs. instance accumulation

- Per turn: `TurnResult.permission_denials = denied_tools` (a snapshot passed in by the caller).
- Accumulated: `self.permission_denials.extend(denied_tools)` (path B only).

`render_summary()` exposes the accumulated length:

```python
f'Session id: {self.session_id}',
f'Conversation turns stored: {len(self.mutable_messages)}',
f'Permission denials tracked: {len(self.permission_denials)}',
f'Usage totals: in={self.total_usage.input_tokens} out={self.total_usage.output_tokens}',
f'Max turns: {self.config.max_turns}',
f'Max budget tokens: {self.config.max_budget_tokens}',
f'Transcript flushed: {self.transcript_store.flushed}',
```

### 6.2 Streaming API shape (still no network)

`stream_submit_message` uses a generator that yields `dict` objects, mimicking the stages of common SSE/streaming protocols:

```python
def stream_submit_message(
    self,
    prompt: str,
    matched_commands: tuple[str, ...] = (),
    matched_tools: tuple[str, ...] = (),
    denied_tools: tuple[PermissionDenial, ...] = (),
):
    yield {'type': 'message_start', 'session_id': self.session_id, 'prompt': prompt}
    if matched_commands:
        yield {'type': 'command_match', 'commands': matched_commands}
    if matched_tools:
        yield {'type': 'tool_match', 'tools': matched_tools}
    if denied_tools:
        yield {'type': 'permission_denial', 'denials': [denial.tool_name for denial in denied_tools]}
    # The actual state transition happens here, in the non-streaming path.
    result = self.submit_message(prompt, matched_commands, matched_tools, denied_tools)
    yield {'type': 'message_delta', 'text': result.output}
    yield {
        'type': 'message_stop',
        'usage': {'input_tokens': result.usage.input_tokens, 'output_tokens': result.usage.output_tokens},
        'stop_reason': result.stop_reason,
        'transcript_size': len(self.transcript_store.entries),
    }
```

Takeaways:

- A frontend or middle layer can integrate against the event types alone, without caring whether tokens are really being generated as a stream underneath.
- `PortRuntime.bootstrap_session` collects `tuple(engine.stream_submit_message(...))` into `RuntimeSession.stream_events`, which makes it easy to debug the whole sequence in a Markdown report.

### 6.3 `render_summary`: session + workspace + inventory

`render_summary` pulls in `build_command_backlog()` / `build_tool_backlog()` and the `manifest`, putting one session's self-inspection and the port inventory in a single report. It is suited to human reading via `python3 -m src.main summary`. For the test, see `test_query_engine_summary_mentions_workspace`.
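A consumer of the event stream only needs to dispatch on the `type` key. The following sketch folds a stream into a single dictionary. `fake_stream` and `fold_events` are hypothetical helpers invented for this example, and the sample values (`"demo"`, `"BashTool"`, the usage numbers) are made up. Only the event shapes follow `stream_submit_message` as shown above.

```python
from typing import Any, Iterable, Iterator


def fake_stream() -> Iterator[dict[str, Any]]:
    """Hypothetical event source with the same shapes as stream_submit_message."""
    yield {"type": "message_start", "session_id": "demo", "prompt": "review tools"}
    yield {"type": "tool_match", "tools": ("BashTool",)}
    yield {"type": "permission_denial", "denials": ["BashTool"]}
    yield {"type": "message_delta", "text": "Prompt: review tools"}
    yield {
        "type": "message_stop",
        "usage": {"input_tokens": 2, "output_tokens": 3},
        "stop_reason": "completed",
        "transcript_size": 1,
    }


def fold_events(events: Iterable[dict[str, Any]]) -> dict[str, Any]:
    """Collect text, denied tool names, and the stop reason from an event stream."""
    text_parts: list[str] = []
    denials: list[str] = []
    stop_reason = None
    for event in events:
        kind = event["type"]
        if kind == "message_delta":
            text_parts.append(event["text"])
        elif kind == "permission_denial":
            denials.extend(event["denials"])
        elif kind == "message_stop":
            stop_reason = event["stop_reason"]
        # Unknown or uninteresting event types are ignored on purpose.
    return {"text": "".join(text_parts), "denials": denials, "stop_reason": stop_reason}


folded = fold_events(fake_stream())
```

Because the consumer ignores event types it does not recognize, new stages could be added to the stream later without breaking it.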
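## 7. Persistence and restore: `persist_session` / `from_saved_session`

```python
def persist_session(self) -> str:
    self.flush_transcript()
    path = save_session(
        StoredSession(
            session_id=self.session_id,
            messages=tuple(self.mutable_messages),
            input_tokens=self.total_usage.input_tokens,
            output_tokens=self.total_usage.output_tokens,
        )
    )
    return str(path)
```

```python
@classmethod
def from_saved_session(cls, session_id: str) -> 'QueryEnginePort':
    stored = load_session(session_id)
    transcript = TranscriptStore(entries=list(stored.messages), flushed=True)
    return cls(
        manifest=build_port_manifest(),
        session_id=stored.session_id,
        mutable_messages=list(stored.messages),
        total_usage=UsageSummary(stored.input_tokens, stored.output_tokens),
        transcript_store=transcript,
    )
```

Takeaways:

- The on-disk JSON (default directory `.port_sessions/`) stores only `session_id`, `messages`, and the cumulative input/output counts. It does not store the `permission_denials` list. If production needs a complete audit trail, extend `StoredSession` with more fields or keep a separate log.
- `from_saved_session` does not restore `self.permission_denials` (the new instance starts with an empty list). This is a gap, or simplification, in the current port layer.

## 8. Hand-off with `PortRuntime`: who routes, who owns the turn

`PortRuntime.bootstrap_session` performs `route_prompt`, the registry shim, and `_infer_permission_denials` on the outside, then feeds the matches and denials into `QueryEnginePort`:

- Routing and permission inference → `runtime`
- Session depth, budget, transcript, persistence → `QueryEnginePort`

The separation of responsibilities is clear. `QueryEnginePort` does not implement "guess the tool from natural language". It only implements "session semantics given structured input". That is precisely the precondition for practicing without a model: routing can be done with rules or keywords (`route_prompt`) instead of a model.

`run_turn_loop` calls `submit_message` repeatedly with the same set of matches. It is meant for stress-testing `max_turns` / budget / `structured_output` (see `test_turn_loop_cli_runs`), not for simulating how intent changes across real multi-turn conversations.

## 9. CLI entry points at a glance

| Command | Effect |
| --- | --- |
| `python3 -m src.main summary` | `QueryEnginePort(manifest).render_summary()` |
| `python3 -m src.main turn-loop ...` | `PortRuntime.run_turn_loop`; prints each turn's `output` and `stop_reason` |
| `python3 -m src.main flush-transcript <prompt>` | `from_workspace` → `submit_message` → `persist_session`; prints the path and `flushed` |
| `python3 -m src.main load-session <id>` | Reads the `StoredSession` JSON directly |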
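The persistence section observes that denial records are not written to disk. One possible way to close that gap is sketched below. Everything here is an assumption made for illustration: `AuditedSession`, `save`, `load`, and the `denied_tools` field are hypothetical names, not part of `session_store`, and the repository's real file layout may differ.

```python
import json
import tempfile
from dataclasses import asdict, dataclass
from pathlib import Path


@dataclass(frozen=True)
class AuditedSession:
    """Hypothetical extension of StoredSession that also keeps denied tool names."""
    session_id: str
    messages: tuple[str, ...]
    input_tokens: int
    output_tokens: int
    denied_tools: tuple[str, ...] = ()


def save(session: AuditedSession, directory: Path) -> Path:
    directory.mkdir(parents=True, exist_ok=True)
    path = directory / f"{session.session_id}.json"
    path.write_text(json.dumps(asdict(session), indent=2))
    return path


def load(session_id: str, directory: Path) -> AuditedSession:
    data = json.loads((directory / f"{session_id}.json").read_text())
    # JSON has no tuple type, so lists are converted back explicitly.
    # .get() keeps files written before the new field existed loadable.
    return AuditedSession(
        session_id=data["session_id"],
        messages=tuple(data["messages"]),
        input_tokens=data["input_tokens"],
        output_tokens=data["output_tokens"],
        denied_tools=tuple(data.get("denied_tools", ())),
    )


with tempfile.TemporaryDirectory() as tmp:
    original = AuditedSession("demo", ("hello",), 1, 4, ("BashTool",))
    save(original, Path(tmp))
    restored = load("demo", Path(tmp))
```

Giving the new field a default value is a small design choice that keeps older session files readable after the schema grows.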
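## 10. Summary: how `QueryEnginePort` gets the three things right

- State machine: the length of `mutable_messages` together with `max_turns` defines the hard stop (out-of-bounds turns are not written). `total_usage` together with `max_budget_tokens` defines the soft stop (out-of-bounds turns are still written, but `stop_reason` marks them).
- Stop conditions: always read from `TurnResult.stop_reason` (`max_turns_reached` / `max_budget_reached` / `completed`), so upper layers can handle them uniformly.
- Audit fields: this turn's `matched_*` and `denied_tools` go into both the output and the `TurnResult`. Denials accumulate in `permission_denials`. Streaming events expose `permission_denial` and the final `message_stop`. The transcript's `flushed` flag and `transcript_size` are observable. Persistence saves the messages and the approximate usage.

When an LLM is wired in later, keep `TurnResult` and the event shapes of `stream_submit_message`, replace `summary_lines` / `_format_output` with real generation logic, and write the real usage back into `UsageSummary`. That allows the engine to evolve without major changes to the surrounding code.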
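The closing advice (swap the output-producing segment, keep the contract) can be sketched with an injected generator function. This is a minimal illustration under stated assumptions: `MiniTurnResult`, `run_turn`, `local_summary`, and `fake_model` are hypothetical names, the real `TurnResult` has more fields, and `fake_model` merely uppercases its input instead of calling any model API.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class MiniTurnResult:
    """Hypothetical, trimmed-down version of TurnResult."""
    prompt: str
    output: str
    stop_reason: str


def local_summary(prompt: str) -> str:
    # Local stand-in, analogous to the summary_lines formatting step.
    return f"Prompt: {prompt}"


def fake_model(prompt: str) -> str:
    # Placeholder for a real completion call; no network is involved.
    return prompt.upper()


def run_turn(prompt: str, generate: Callable[[str], str] = local_summary) -> MiniTurnResult:
    # Only the output-producing step is pluggable; the result shape is fixed.
    output = generate(prompt)
    return MiniTurnResult(prompt=prompt, output=output, stop_reason="completed")


offline = run_turn("list tools")
with_model = run_turn("list tools", generate=fake_model)
```

Both calls return the same result type with the same `stop_reason` handling, so callers that branch on the contract do not need to know which generator was used.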