深度揭秘 Claude Code 核心技术:AsyncGenerator 工作流

深度揭秘 Claude Code 核心技术:AsyncGenerator 工作流 如何用 AsyncGenerator 实现流式响应如何管理对话状态和历史如何实现权限追踪和预算控制⚠️声明源码基于开源项目 claude-code部分内部实现可能有所不同。️ 核心架构QueryEngine 类类设计概览export class QueryEngine { private config: QueryEngineConfig // 引擎配置 private mutableMessages: Message[] // 对话消息历史 private abortController: AbortController // 中断控制器 private permissionDenials: SDKPermissionDenial[] // 权限拒绝记录 private totalUsage: NonNullableUsage // 累计使用量 private discoveredSkillNames new Setstring() private loadedNestedMemoryPaths new Setstring() async *submitMessage(prompt, options?) { // 核心处理逻辑 } }设计思想每个 QueryEngine 实例对应一个会话内部维护完整的对话状态。 submitMessage核心处理流程这是整个系统最核心的方法采用AsyncGenerator模式实现流式处理async *submitMessage( prompt: string | ContentBlockParam[], options?: { uuid?: string; isMeta?: boolean }, ): AsyncGeneratorSDKMessage, void, unknown {完整流程图┌─────────────────────────────────────────────────────────────┐ │ submitMessage 完整流程 │ ├─────────────────────────────────────────────────────────────┤ │ 1. 初始化配置 │ │ - 解构工具、命令、模型参数 │ │ - 设置工作目录 │ │ - 包装权限检查器追踪拒绝 │ ├─────────────────────────────────────────────────────────────┤ │ 2. 构建系统提示词 │ │ - fetchSystemPromptParts() │ │ - 加载内存机制提示词 │ │ - 合并自定义提示词 │ ├─────────────────────────────────────────────────────────────┤ │ 3. 处理用户输入 │ │ - processUserInput() 处理 slash 命令 │ │ - 返回 shouldQuery 决定是否调用 API │ ├─────────────────────────────────────────────────────────────┤ │ 4. 会话持久化 │ │ - 立即写入 transcript防 kill 丢失 │ │ - fire-and-forget 优化性能 │ ├─────────────────────────────────────────────────────────────┤ │ 5. 查询循环 │ │ - for await (message of query()) │ │ - 处理各种消息类型 │ ├─────────────────────────────────────────────────────────────┤ │ 6. 预算与错误检查 │ │ - maxTurns / maxBudgetUsd / taskBudget │ ├─────────────────────────────────────────────────────────────┤ │ 7. 返回结果 │ │ - 提取文本、累积使用量、返回 result │ └─────────────────────────────────────────────────────────────┘ 第一阶段初始化与配置1.1 配置解构const { cwd, commands, tools, mcpClients, thinkingConfig, maxTurns, maxBudgetUsd, taskBudget, canUseTool, customSystemPrompt, appendSystemPrompt, // ... } this.config1.2 权限包装器const wrappedCanUseTool: CanUseToolFn async ( tool, input, toolUseContext, assistantMessage, toolUseID, forceDecision, ) { const result await canUseTool(...) // 追踪权限拒绝 if (result.behavior ! allow) { this.permissionDenials.push({ tool_name: tool.name, tool_use_id: toolUseID, tool_input: input, }) } return result }设计亮点在调用原始权限检查器的同时自动记录所有被拒绝的调用用于最终报告。1.3 模型与思考配置const initialMainLoopModel userSpecifiedModel ? parseUserSpecifiedModel(userSpecifiedModel) : getMainLoopModel() const initialThinkingConfig: ThinkingConfig thinkingConfig ? thinkingConfig : shouldEnableThinkingByDefault() ! false ? { type: adaptive } : { type: disabled } 第二阶段系统提示词构建核心调用const { defaultSystemPrompt, userContext, systemContext } await fetchSystemPromptParts({ tools, mainLoopModel: initialMainLoopModel, additionalWorkingDirectories: Array.from( initialAppState.toolPermissionContext.additionalWorkingDirectories.keys() ), mcpClients, customSystemPrompt, }) // 组装最终提示词 const systemPrompt asSystemPrompt([ ...(customPrompt ! undefined ? [customPrompt] : defaultSystemPrompt), ...(memoryMechanicsPrompt ? [memoryMechanicsPrompt] : []), ...(appendSystemPrompt ? [appendSystemPrompt] : []), ])⚙️ 第三阶段用户输入处理processUserInput 的职责const { messages: messagesFromUserInput, // 处理后的消息 shouldQuery, // 是否需要调用 LLM allowedTools, // 允许的工具列表 model: modelFromUserInput, // 可能被 slash 命令修改的模型 resultText, // 本地命令的输出结果 } await processUserInput({ input: prompt, mode: prompt, // ... })关键设计shouldQuerytrue需要调用 LLM API 获取响应false本地命令已处理完成直接返回结果例如用户输入/help这是本地命令不需要调用 LLM。 第四阶段会话持久化关键设计// 在 API 调用前就先写入 transcript if (persistSession messagesFromUserInput.length 0) { const transcriptPromise recordTranscript(messages) if (isBareMode()) { // 脚本模式fire-and-forget void transcriptPromise } else { // 交互模式等待写入完成 await transcriptPromise if (isEnvTruthy(process.env.CLAUDE_CODE_EAGER_FLUSH)) { await flushSessionStorage() } } }设计意图即使在 API 响应回来之前进程被 kill如用户点击 Stop--resume也能恢复会话这是 Claude Code 能实现「可恢复会话」的核心保障。 第五阶段查询循环调用 query() 生成器for await (const message of query({ messages, systemPrompt, userContext, systemContext, canUseTool: wrappedCanUseTool, toolUseContext: processUserInputContext, fallbackModel, maxTurns, taskBudget, })) { // 处理各种消息类型 }消息类型分发switch (message.type) { case tombstone: // 删除消息控制信号 break case assistant: // AI 响应 this.mutableMessages.push(message) yield* normalizeMessage(message) break case progress: // 进度通知 this.mutableMessages.push(message) yield* normalizeMessage(message) break case user: turnCount break case stream_event: // 流式事件 if (message.event.type message_start) { // 重置当前消息使用统计 currentMessageUsage EMPTY_USAGE } if (message.event.type message_delta) { // 更新使用量 stop_reason currentMessageUsage updateUsage(currentMessageUsage, message.event.usage) } if (message.event.type message_stop) { // 累积到总使用量 this.totalUsage accumulateUsage(this.totalUsage, currentMessageUsage) } break case attachment: // 附件structured_output, max_turns_reached break case system: // 系统消息compact_boundary, api_error break case tool_use_summary: // 工具调用摘要 yield { type: tool_use_summary, ... } break }️ 第六阶段预算与错误检查三重保护机制// 1. USD 预算检查 if (maxBudgetUsd ! undefined getTotalCost() maxBudgetUsd) { yield { type: result, subtype: error_max_budget_usd, ... } return } // 2. 最大轮次检查 if (message.attachment?.type max_turns_reached) { yield { type: result, subtype: error_max_turns, ... } return } // 3. 结构化输出重试次数检查 if (jsonSchema callsThisQuery maxRetries) { yield { type: result, subtype: error_max_structured_output_retries, ... } return } 第七阶段返回结果// 提取文本结果 let textResult if (result.type assistant) { const lastContent last(result.message.content) if (lastContent?.type text) { textResult lastContent.text } } // 返回成功结果 yield { type: result, subtype: success, result: textResult, duration_ms: Date.now() - startTime, duration_api_ms: getTotalAPIDuration(), num_turns: turnCount, total_cost_usd: getTotalCost(), usage: this.totalUsage, permission_denials: this.permissionDenials, // ... } 核心设计亮点1. AsyncGenerator 模式async *submitMessage(...) { // 初始化 // ... // 边处理边返回 for await (const message of query(...)) { yield* normalizeMessage(message) } // 最终结果 yield { type: result, ... } }优势流式响应用户体验更好内存效率高不需要等完整响应天然支持中断2. 权限追踪// 包装 canUseTool自动记录拒绝 const wrappedCanUseTool async (...) { const result await canUseTool(...) if (result.behavior ! allow) { this.permissionDenials.push(...) } return result }最终返回给调用者包含完整的权限拒绝记录。3. 会话预持久化// 在 API 调用前就写入 if (persistSession) { await recordTranscript(messages) // 用户消息已到达 } // ... 然后才开始 API 调用 for await (const message of query(...)) { // 处理响应 }这确保了即使 API 从未返回会话也可恢复。4. 历史压缩边界if (message.subtype compact_boundary) { // 释放压缩前的消息节省内存 const boundaryIdx this.mutableMessages.length - 1 if (boundaryIdx 0) { this.mutableMessages.splice(0, boundaryIdx) } }通过compact_boundary消息触发历史压缩防止内存无限增长。 数据流总结用户输入 ↓ processUserInput() → shouldQuery? ↓ ┌──────────────────────────────────────────┐ │ YES → query() 调用 LLM │ │ NO → 本地命令执行直接返回结果 │ └──────────────────────────────────────────┘ ↓ for await message of query(): ↓ ├→ assistant → normalizeMessage → yield ├→ progress → normalizeMessage → yield ├→ stream_event → 更新使用量 ├→ system (compact_boundary) → 压缩历史 └→ ...其他类型 ↓ 预算/轮次检查 ↓ 返回 result 总结QueryEngine 展示了构建生产级 AI 对话系统的完整范式特性实现方式价值流式处理AsyncGenerator实时响应、内存高效权限追踪包装 canUseTool透明记录、可审计会话恢复预持久化 transcript可中断、可恢复预算控制多重检查机制成本可控历史压缩compact_boundary长会话内存不爆炸错误处理分类重试、结构化输出鲁棒性