【26年面试题总结】构建生产级 Agent 系统:三个值得深挖的面试题

【26年面试题总结】构建生产级 Agent 系统:三个值得深挖的面试题 最近面试被问到几个很有意思的问题,整理下来发现它们恰好构成了一条主线:如何把一个 Demo 级的 Agent 做成生产系统。从用户交互、到节点执行、再到状态持久化,每一环都有工程上的权衡。这篇文章挑三个题展开:规划 Agent 完成后,如何让用户在中途编辑大纲?(HITL SSE)LangGraph 的搜索节点怎么并发执行?Session 初始化时,如何优化 checkpoint 的加载速度?一、Human-in-the-loop SSE:规划 Agent 的人工干预场景一个 Research Agent 的典型流程是:用户提问 → Planner Agent 生成大纲 → Search Agent 执行搜索 → Writer Agent 生成报告问题来了:Planner 生成的大纲可能不完全符合用户预期,用户想在搜索开始前手动补充几条。也就是说,流程要从:Planner → Search变成:Planner → [用户编辑] → Search这里有两个技术难点:图怎么暂停?Search 节点不能直接跑,得等用户输入SSE 怎么处理?SSE 是单向流,服务端推给客户端,用户的编辑怎么传回来?LangGraph 的 interrupt 机制LangGraph 原生支持暂停/恢复,核心是interruptcheckpointer:from langgraph.types import interrupt, Commandfrom langgraph.checkpoint.postgres import PostgresSaverdef plan_review_node(state): # 把当前大纲抛给外部,图在这里暂停并写 checkpoint user_edit interrupt({ type: outline_review, outline: state[outline], }) # 恢复执行时,user_edit 就是外部传回的内容 return {outline: user_edit}graph builder.compile(checkpointerPostgresSaver(...))interrupt的本质是抛出一个特殊异常,LangGraph 捕获后把当前 state 持久化到 checkpointer,并把 interrupt 的 payload 作为本次stream()的最后一个事件返回。恢复时用Command(resume...):graph.stream( Command(resumeedited_outline), config{configurable: {thread_id: xxx}})**关键点:thread_id**是串联两次请求的唯一纽带,checkpointer 靠它找到上次暂停的位置。SSE 层的协议设计SSE 是单向的(服务端 → 客户端),所以用户编辑这个动作不能走 SSE,必须用一个新的 HTTP 请求。完整流程:┌──────────┐ ┌──────────┐│ Frontend │ │ Backend │└────┬─────┘ └────┬─────┘ │ │ │ POST /run { query, thread_id } │ ├────────────────────────────────────────│ │ │ │ SSE: eventnode_update (planner done) │ │────────────────────────────────────────┤ │ │ │ SSE: eventinterrupt (outline) │ │────────────────────────────────────────┤ │ SSE: [DONE] ─ 关闭连接 │ │ │ │ [用户编辑大纲...] │ │ │ │ POST /resume { thread_id, outline } │ ├────────────────────────────────────────│ │ │ │ SSE: eventnode_update (search 1) │ │────────────────────────────────────────┤ │ SSE: eventnode_update (search 2) │ │────────────────────────────────────────┤ │ SSE: eventdone │ │────────────────────────────────────────┤后端实现from fastapi import FastAPIfrom fastapi.responses import StreamingResponseimport jsonapp FastAPI()asyncdef stream_graph(inputs, thread_id): config {configurable: {thread_id: thread_id}} asyncfor event in graph.astream(inputs, config, stream_modeupdates): # 普通节点更新 yieldfevent: node_update\ndata: {json.dumps(event)}\n\n # 检查是否触发了 interrupt if__interrupt__in event: payload event[__interrupt__][0].value yieldfevent: interrupt\ndata: {json.dumps(payload)}\n\n return# 主动结束 SSE,等待用户输入app.post(/run)asyncdef run(body: dict): return StreamingResponse( stream_graph({query: body[query]}, body[thread_id]), media_typetext/event-stream, )app.post(/resume)asyncdef resume(body: dict): return StreamingResponse( stream_graph( Command(resumebody[outline]), body[thread_id], ), media_typetext/event-stream, )前端实现const threadId crypto.randomUUID();asyncfunction run(query) {const resp await fetch(/run, { method: POST, body: JSON.stringify({ query, thread_id: threadId }), });await handleSSE(resp);}asyncfunction handleSSE(resp) {const reader resp.body.getReader();const decoder new TextDecoder();let buffer ;while (true) { const { done, value } await reader.read(); if (done) break; buffer decoder.decode(value); const events buffer.split(\n\n); buffer events.pop(); for (const raw of events) { const { event, data } parseSSE(raw); if (event interrupt) { // 切换到编辑态,等待用户提交 const edited await showOutlineEditor(JSON.parse(data)); await resume(edited); return; } if (event node_update) { renderUpdate(JSON.parse(data)); } } }}asyncfunction resume(outline) {const resp await fetch(/resume, { method: POST, body: JSON.stringify({ thread_id: threadId, outline }), });await handleSSE(resp);}几个容易踩的坑thread_id 必须前端生成并持久化,否则 resume 时找不到暂停位置SSE 遇到interrupt要主动关闭,不要挂起连接等用户——用户可能编辑十分钟,长连接不划算Checkpointer 必须开启,interrupt依赖它持久化 state前端要做幂等处理,用户可能刷新页面,这时需要用thread_id重新拉取当前 state二、LangGraph 节点并发:三种模式问题Planner 生成了 5 个搜索 query,一个一个串行搜?那一个请求得跑几十秒。LangGraph 支持并发,但方式不止一种,得看场景选。模式 1:静态 Fan-out(编译期决定)图结构里直接定义多条并行边,适合分支数固定的场景:builder.add_edge(planner, search_web)builder.add_edge(planner, search_arxiv)builder.add_edge(planner, search_internal)builder.add_edge(search_web, aggregator)builder.add_edge(search_arxiv, aggregator)builder.add_edge(search_internal, aggregator)LangGraph 会自动并行跑三个 search 节点,全部完成后进 aggregator。State 合并要注意:三个节点同时写results,必须用 reducer:from typing import Annotatedimport operatorclass State(TypedDict): results: Annotated[list, operator.add] # 并发写自动合并模式 2:Send API 动态派发(运行期决定)Planner 生成几个 query 是动态的,这时用Send:from langgraph.types import Senddef dispatch_searches(state): # 根据 planner 产出的 queries 动态派发 return [ Send(search_node, {query: q, topic: state[topic]}) for q in state[queries] ]builder.add_conditional_edges( planner, dispatch_searches, [search_node],)builder.add_edge(search_node, aggregator)每个Send都会启动一份search_node的独立执行,各自的 state 互不干扰,最后通过 reducer 合并到主 state。这是处理N 个搜索****query最优雅的方式,N 可以是 1 也可以是 100。模式 3:节点内部并发(手动控制)如果你不想暴露 N 个节点,只想让一个节点内部并发调多个 API:import asyncioasync def search_node(state): queries state[queries] results await asyncio.gather( *[call_search_api(q) for q in queries] ) return {results: results}适用场景:并发粒度很细(比如 50 个 query),不想让图结构膨胀不需要每个 query 单独 checkpoint(Send 模式每个子任务都会 checkpoint)三种模式怎么选?维度静态 Fan-outSend API节点内并发分支数编译期固定运行时动态运行时动态可观测性高(每个节点独立)高低(黑盒)Checkpoint 粒度每节点每子任务整个节点失败重试节点级子任务级需自己实现适用场景固定几路搜索N 个同类子任务细粒度 I/O 并发经验法则:Query 数固定 → 静态 Fan-outQuery 数动态、需要独立重试 → Send API纯 I/O 批量调用、不关心单个失败 → 节点内asyncio.gather三、Checkpoint 加载优化问题Session 越多,checkpoint 表越大。用户打开一个历史对话,如果加载要 2 秒,体验就崩了。怎么优化?先看 LangGraph checkpoint 表的大致结构(以 Postgres 为例):CREATE TABLE checkpoints ( thread_id TEXTNOTNULL, checkpoint_ns TEXTNOTNULLDEFAULT, checkpoint_id TEXTNOTNULL, parent_checkpoint_id TEXT, checkpoint JSONB NOTNULL, -- 完整 state metadata JSONB NOTNULL, PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id));加载最新 checkpoint 的查询:SELECT checkpoint, metadata FROM checkpoints WHERE thread_id $1 ORDER BY checkpoint_id DESC LIMIT 1;优化手段1、索引:B-tree 复合索引默认主键已经覆盖(thread_id, checkpoint_ns, checkpoint_id),这是 B-tree 复合索引。查询按thread_id过滤 按checkpoint_id排序,最左前缀匹配主键,可以直接命中索引且免排序。这里有个小知识:PG 的 B-tree 叶子节点按序存储,ORDER BY checkpoint_id DESC LIMIT 1等价于沿着索引反向读第一个,O(log N) 定位 O(1) 读取。2、懒加载:分离元数据和 State Blob如果 state 很大(比如包含完整聊天历史 搜索结果),每次都反序列化整个 JSONB 很慢。拆成两张表:-- 热表:只存元数据,小而快CREATETABLE checkpoint_meta ( thread_id TEXT, checkpoint_id TEXT, blob_ref UUID, created_at TIMESTAMPTZ, PRIMARY KEY (thread_id, checkpoint_id));-- 冷表:存大 blob,按需加载CREATETABLE checkpoint_blobs ( blob_ref UUID PRIMARY KEY, data BYTEA -- msgpack zstd 压缩);Session 初始化时先查 meta 表拿到blob_ref,blob 可以懒加载(比如只在用户真正滚动查看历史时才拉)。3、压缩:msgpack zstdJSONB 的存储有点冗余。改用msgpack序列化 zstd压缩,一般能压到 1/5 到 1/10:import msgpack, zstandard as zstddef serialize(state): return zstd.compress(msgpack.packb(state))def deserialize(data): return msgpack.unpackb(zstd.decompress(data))小 state 可能得不偿失(压缩开销 I/O 节省),建议 10KB****才压缩。4、缓存层:Redis 挡在 PG 前面热 session 放 Redis,TTL 30 分钟:async def load_checkpoint(thread_id): # L1: Redis cached await redis.get(fckpt:{thread_id}) if cached: return deserialize(cached) # L2: Postgres row await pg.fetchrow( SELECT checkpoint FROM checkpoints WHERE thread_id$1 ORDER BY checkpoint_id DESC LIMIT 1, thread_id, ) if row: await redis.setex(fckpt:{thread_id}, 1800, serialize(row)) return row注意缓存一致性:每次写 checkpoint 后要同步更新或删除 Redis key。5、连接池 异步驱动同步psycopg2每次建连接要几十毫秒。换asyncpg 连接池:pool await asyncpg.create_pool( dsnDSN, min_size10, max_size50,)生产环境再套一层pgbouncer(transaction pooling 模式),共享连接数。6、冷热分离 归档90 天前的 checkpoint 迁到归档表,保持热表小:-- 分区表按月划分CREATE TABLE checkpoints_2026_04 PARTITION OF checkpointsFOR VALUES FROM (2026-04-01) TO (2026-05-01);PG 的分区表配合pg_partman可以自动滚动,老分区 detach 后归档到 S3。优化效果的一般经验值优化项加载延迟降幅复合索引基线(没有就是几秒全表扫)懒加载 blob30-50%压缩I/O 减少,网络/磁盘慢时效果明显Redis 缓存热路径降到个位数毫秒连接池节省 10-50ms 连接建立开销优先级建议:索引是底线 → 连接池次之 → 命中率高的 session 上 Redis → state 真的很大才搞懒加载和压缩。不要过度工程。总结回头看这三个问题,其实对应 Agent 系统的三个核心能力:HITL** SSE**:让 Agent 能和人协作,不是黑盒跑完节点并发:让 Agent 跑得快,不是串行等死Checkpoint****优化:让 Agent 能记住事,不是每次从零开始一个生产级 Agent 系统,这三块缺一不可。框架(LangGraph)帮你解决了一部分,但协议设计、数据库优化、前后端协同这些事情,还得自己思考。面试如果能把这三块串起来讲,基本能让面试官知道你真的做过东西,而不是只跑过 demo。学AI大模型的正确顺序千万不要搞错了2026年AI风口已来各行各业的AI渗透肉眼可见超多公司要么转型做AI相关产品要么高薪挖AI技术人才机遇直接摆在眼前有往AI方向发展或者本身有后端编程基础的朋友直接冲AI大模型应用开发转岗超合适就算暂时不打算转岗了解大模型、RAG、Prompt、Agent这些热门概念能上手做简单项目也绝对是求职加分王给大家整理了超全最新的AI大模型应用开发学习清单和资料手把手帮你快速入门学习路线:✅大模型基础认知—大模型核心原理、发展历程、主流模型GPT、文心一言等特点解析✅核心技术模块—RAG检索增强生成、Prompt工程实战、Agent智能体开发逻辑✅开发基础能力—Python进阶、API接口调用、大模型开发框架LangChain等实操✅应用场景开发—智能问答系统、企业知识库、AIGC内容生成工具、行业定制化大模型应用✅项目落地流程—需求拆解、技术选型、模型调优、测试上线、运维迭代✅面试求职冲刺—岗位JD解析、简历AI项目包装、高频面试题汇总、模拟面经以上6大模块看似清晰好上手实则每个部分都有扎实的核心内容需要吃透我把大模型的学习全流程已经整理好了抓住AI时代风口轻松解锁职业新可能希望大家都能把握机遇实现薪资/职业跃迁这份完整版的大模型 AI 学习资料已经上传CSDN朋友们如果需要可以微信扫描下方CSDN官方认证二维码免费领取【保证100%免费】