07 人机协作Human-in-the-loop一、什么是人机协作人机协作Human-in-the-loop简称 HITL是 LangGraph 的人工干预机制允许在图执行过程中暂停等待人工审核、修改或决策后继续执行。核心理念在关键决策点引入人工判断确保 AI 系统的输出符合人类预期。概念说明类比中断Interrupt图执行到某个节点时暂停流程中的审批环节恢复Resume人工干预后继续执行审批通过继续流转审批点Breakpoint需要人工介入的节点审批签字点二、为什么需要人机协作2.1 典型场景# ❌ 无人工审核 - 可能产生风险输出graphStateGraph(State)graph.add_node(generate,generate_content)graph.add_node(publish,publish_content)# 直接发布无审核2.2 人机协作的价值# ✅ 有人工审核 - 关键节点暂停等待审批graphStateGraph(State)graph.add_node(generate,generate_content)graph.add_node(review,review_content)# 中断点graph.add_node(publish,publish_content)# 在 review 节点设置中断graph.compile(interrupt_before[review])场景说明人工干预点内容审核AI 生成内容后人工审核发布前工具调用LLM 决定调用工具前确认执行前关键决策业务逻辑需要人工判断决策点数据修改更新数据库前人工确认写入前敏感操作删除、发送等不可逆操作操作前三、中断机制3.1interrupt_before- 节点执行前中断在指定节点执行之前暂停图的执行 interrupt_before 演示 运行方式python 07_中断_执行前.py fromtypingimportTypedDictfromlanggraph.graphimportStateGraph,START,ENDfromlanggraph.checkpoint.memoryimportMemorySaverclassState(TypedDict):task:strresult:strapproved:booldefgenerate_task(state:State):return{task:生成任务: 处理文档,result:}defreview_task(state:State):# 这个节点执行前会中断print(f审核任务:{state[task]})return{approved:True}defexecute_task(state:State):ifstate.get(approved):return{result:f执行完成:{state[task]}}return{result:未批准跳过执行}builderStateGraph(State)builder.add_node(generate,generate_task)builder.add_node(review,review_task)builder.add_node(execute,execute_task)builder.add_edge(START,generate)builder.add_edge(generate,review)builder.add_edge(review,execute)builder.add_edge(execute,END)# 在 review 节点执行前中断memoryMemorySaver()graphbuilder.compile(checkpointermemory,interrupt_before[review]# 关键配置)config{configurable:{thread_id:task_1}}# 第一次执行 - 会在 review 前暂停resultgraph.invoke({task:,result:,approved:False},config)print(f暂停后的状态:{graph.get_state(config)})# 查看下一个要执行的节点stategraph.get_state(config)print(f下一个节点:{state.next})3.2interrupt_after- 节点执行后中断在指定节点执行之后暂停图的执行 interrupt_after 演示 运行方式python 07_中断_执行后.py fromlanggraph.graphimportStateGraph,START,ENDfromlanggraph.checkpoint.memoryimportMemorySaver# 在 execute 节点执行后中断graphbuilder.compile(checkpointermemory,interrupt_after[execute]# execute 执行后暂停)3.3 对比总结配置暂停时机适用场景interrupt_before[node]节点执行前审批、确认、人工输入interrupt_after[node]节点执行后结果检查、日志记录四、恢复机制4.1 使用None恢复中断后传入None表示继续执行 恢复执行演示 运行方式python 07_恢复执行.py fromlanggraph.typesimportCommand# 假设已经中断在 review 节点# 方式1: 直接传入 None继续执行resultgraph.invoke(None,config)print(f恢复执行结果:{result})# 方式2: 传入 Command 对象携带额外数据resultgraph.invoke(Command(resume{approved:True}),config)4.2 使用Command恢复Command对象可以携带人工输入的数据 Command 恢复演示 运行方式python 07_Command恢复.py fromlanggraph.typesimportCommandfromlanggraph.checkpoint.memoryimportMemorySaverclassState(TypedDict):query:strhuman_input:strresult:strdefask_human(state:State):# 中断点 - 等待人工输入return{}defprocess_with_input(state:State):human_inputstate.get(human_input,)return{result:f处理结果:{state[query]}{human_input}}builderStateGraph(State)builder.add_node(ask,ask_human)builder.add_node(process,process_with_input)builder.add_edge(START,ask)builder.add_edge(ask,process)builder.add_edge(process,END)memoryMemorySaver()graphbuilder.compile(checkpointermemory,interrupt_before[process])config{configurable:{thread_id:input_1}}# 第一次执行 - 中断graph.invoke({query:用户问题,human_input:,result:},config)# 使用 Command 恢复并传入人工数据resultgraph.invoke(Command(resume{human_input:人工补充信息}),config)print(f结果:{result})五、实际应用场景5.1 内容审核流程 内容审核流程示例 图结构 START ──→ generate ──→ [中断:审核] ──→ publish ──→ END ↓ reject ──→ END fromtypingimportTypedDict,Literalfromlanggraph.graphimportStateGraph,START,ENDfromlanggraph.checkpoint.memoryimportMemorySaverclassContentState(TypedDict):topic:strcontent:strreview_status:str# pending / approved / rejectedpublished:booldefgenerate_content(state:ContentState):contentf关于「{state[topic]}」的AI生成内容...return{content:content,review_status:pending}defreview_content(state:ContentState):# 这里会中断等待人工审核# 人工会通过 Command 传入审核结果return{}defpublish_content(state:ContentState):ifstate.get(review_status)approved:return{published:True}return{published:False}defreject_content(state:ContentState):return{published:False,review_status:rejected}builderStateGraph(ContentState)builder.add_node(generate,generate_content)builder.add_node(review,review_content)builder.add_node(publish,publish_content)builder.add_node(reject,reject_content)builder.add_edge(START,generate)builder.add_edge(generate,review)defroute_after_review(state:ContentState):ifstate.get(review_status)approved:returnpublishreturnrejectbuilder.add_conditional_edges(review,route_after_review)builder.add_edge(publish,END)builder.add_edge(reject,END)memoryMemorySaver()content_graphbuilder.compile(checkpointermemory,interrupt_before[review])# 使用示例config{configurable:{thread_id:content_1}}content_graph.invoke({topic:LangGraph教程,content:,review_status:,published:False},config)# 人工审核通过fromlanggraph.typesimportCommand resultcontent_graph.invoke(Command(resume{review_status:approved}),config)print(f发布结果:{result})5.2 工具调用确认 工具调用确认示例 场景LLM 决定调用敏感工具前需要人工确认 图结构 START ──→ llm_decide ──→ [中断:确认] ──→ call_tool ──→ END fromtypingimportTypedDict,Listfromlanggraph.graphimportStateGraph,START,ENDfromlanggraph.checkpoint.memoryimportMemorySaverclassAgentState(TypedDict):messages:List[dict]tool_calls:List[dict]confirmed:boolresult:strdefllm_decide(state:AgentState):# LLM 决定是否调用工具tool_calls[{name:delete_file,args:{path:/tmp/test.txt}}]return{tool_calls:tool_calls}defconfirm_tool_call(state:AgentState):# 中断点 - 等待人工确认return{}defexecute_tool(state:AgentState):ifstate.get(confirmed):toolstate[tool_calls][0]return{result:f已执行:{tool[name]}}return{result:工具调用被拒绝}builderStateGraph(AgentState)builder.add_node(decide,llm_decide)builder.add_node(confirm,confirm_tool_call)builder.add_node(execute,execute_tool)builder.add_edge(START,decide)builder.add_edge(decide,confirm)builder.add_edge(confirm,execute)builder.add_edge(execute,END)memoryMemorySaver()agent_graphbuilder.compile(checkpointermemory,interrupt_before[execute]# 执行前必须确认)5.3 多步骤审批流程 多步骤审批流程示例 场景复杂业务流程需要多个审批环节 图结构 START ──→ draft ──→ [中断:初审] ──→ revise ──→ [中断:终审] ──→ submit ──→ END fromtypingimportTypedDictfromlanggraph.graphimportStateGraph,START,ENDfromlanggraph.checkpoint.memoryimportMemorySaverclassApprovalState(TypedDict):document:strfirst_review:str# pending / approved / rejectedfinal_review:str# pending / approved / rejectedstatus:strdefdraft_document(state:ApprovalState):return{document:初稿内容...,first_review:pending}deffirst_review(state:ApprovalState):# 中断点1 - 初审return{}defrevise_document(state:ApprovalState):ifstate.get(first_review)approved:return{document:修订后内容...,final_review:pending}return{status:初审被拒}deffinal_review(state:ApprovalState):# 中断点2 - 终审return{}defsubmit_document(state:ApprovalState):ifstate.get(final_review)approved:return{status:已提交}return{status:终审被拒}builderStateGraph(ApprovalState)builder.add_node(draft,draft_document)builder.add_node(first_review,first_review)builder.add_node(revise,revise_document)builder.add_node(final_review,final_review)builder.add_node(submit,submit_document)builder.add_edge(START,draft)builder.add_edge(draft,first_review)builder.add_edge(first_review,revise)builder.add_edge(revise,final_review)builder.add_edge(final_review,submit)builder.add_edge(submit,END)memoryMemorySaver()approval_graphbuilder.compile(checkpointermemory,interrupt_before[first_review,final_review]# 两个中断点)六、与持久化结合人机协作依赖持久化机制来保存中断状态 人机协作 持久化 完整示例 运行方式python 07_完整示例.py fromtypingimportTypedDictfromlanggraph.graphimportStateGraph,START,ENDfromlanggraph.checkpoint.memoryimportMemorySaverfromlanggraph.typesimportCommandclassWorkflowState(TypedDict):step:strdata:strhuman_decision:strresult:strdefstep1(state:WorkflowState):return{step:step1,data:步骤1处理的数据}defhuman_input(state:WorkflowState):# 中断点 - 等待人工输入return{}defstep2(state:WorkflowState):decisionstate.get(human_decision,)return{result:f步骤2基于「{decision}」的处理结果}builderStateGraph(WorkflowState)builder.add_node(step1,step1)builder.add_node(human_input,human_input)builder.add_node(step2,step2)builder.add_edge(START,step1)builder.add_edge(step1,human_input)builder.add_edge(human_input,step2)builder.add_edge(step2,END)memoryMemorySaver()graphbuilder.compile(checkpointermemory,interrupt_before[step2])# 第一次执行config{configurable:{thread_id:workflow_1}}result1graph.invoke({step:,data:,human_decision:,result:},config)print(f中断状态:{graph.get_state(config).values})# 模拟长时间后恢复可能是另一个请求# 系统重启后检查点仍然存在result2graph.invoke(Command(resume{human_decision:人工决策: 继续执行}),config)print(f最终结果:{result2})七、最佳实践7.1 设计原则原则说明示例最小中断只在必要节点设置中断敏感操作前明确提示中断时提供清晰的上下文显示待审核内容超时处理设置中断超时机制防止无限等待回退支持允许人工拒绝并回退审批流程7.2 常见问题问题解决方案中断后如何恢复使用graph.invoke(None, config)或Command如何查看中断状态graph.get_state(config)多个中断点如何区分通过state.next判断下一个节点中断数据丢失使用持久化存储如 PostgreSQL八、核心要点总结要点说明中断时机interrupt_before执行前/interrupt_after执行后恢复方式graph.invoke(None, config)或Command(resume...)依赖持久化必须配合 Checkpointer 使用适用场景内容审核、工具确认、审批流程、人工输入状态查看graph.get_state(config)查看中断状态
07 人机协作(Human-in-the-loop)
07 人机协作Human-in-the-loop一、什么是人机协作人机协作Human-in-the-loop简称 HITL是 LangGraph 的人工干预机制允许在图执行过程中暂停等待人工审核、修改或决策后继续执行。核心理念在关键决策点引入人工判断确保 AI 系统的输出符合人类预期。概念说明类比中断Interrupt图执行到某个节点时暂停流程中的审批环节恢复Resume人工干预后继续执行审批通过继续流转审批点Breakpoint需要人工介入的节点审批签字点二、为什么需要人机协作2.1 典型场景# ❌ 无人工审核 - 可能产生风险输出graphStateGraph(State)graph.add_node(generate,generate_content)graph.add_node(publish,publish_content)# 直接发布无审核2.2 人机协作的价值# ✅ 有人工审核 - 关键节点暂停等待审批graphStateGraph(State)graph.add_node(generate,generate_content)graph.add_node(review,review_content)# 中断点graph.add_node(publish,publish_content)# 在 review 节点设置中断graph.compile(interrupt_before[review])场景说明人工干预点内容审核AI 生成内容后人工审核发布前工具调用LLM 决定调用工具前确认执行前关键决策业务逻辑需要人工判断决策点数据修改更新数据库前人工确认写入前敏感操作删除、发送等不可逆操作操作前三、中断机制3.1interrupt_before- 节点执行前中断在指定节点执行之前暂停图的执行 interrupt_before 演示 运行方式python 07_中断_执行前.py fromtypingimportTypedDictfromlanggraph.graphimportStateGraph,START,ENDfromlanggraph.checkpoint.memoryimportMemorySaverclassState(TypedDict):task:strresult:strapproved:booldefgenerate_task(state:State):return{task:生成任务: 处理文档,result:}defreview_task(state:State):# 这个节点执行前会中断print(f审核任务:{state[task]})return{approved:True}defexecute_task(state:State):ifstate.get(approved):return{result:f执行完成:{state[task]}}return{result:未批准跳过执行}builderStateGraph(State)builder.add_node(generate,generate_task)builder.add_node(review,review_task)builder.add_node(execute,execute_task)builder.add_edge(START,generate)builder.add_edge(generate,review)builder.add_edge(review,execute)builder.add_edge(execute,END)# 在 review 节点执行前中断memoryMemorySaver()graphbuilder.compile(checkpointermemory,interrupt_before[review]# 关键配置)config{configurable:{thread_id:task_1}}# 第一次执行 - 会在 review 前暂停resultgraph.invoke({task:,result:,approved:False},config)print(f暂停后的状态:{graph.get_state(config)})# 查看下一个要执行的节点stategraph.get_state(config)print(f下一个节点:{state.next})3.2interrupt_after- 节点执行后中断在指定节点执行之后暂停图的执行 interrupt_after 演示 运行方式python 07_中断_执行后.py fromlanggraph.graphimportStateGraph,START,ENDfromlanggraph.checkpoint.memoryimportMemorySaver# 在 execute 节点执行后中断graphbuilder.compile(checkpointermemory,interrupt_after[execute]# execute 执行后暂停)3.3 对比总结配置暂停时机适用场景interrupt_before[node]节点执行前审批、确认、人工输入interrupt_after[node]节点执行后结果检查、日志记录四、恢复机制4.1 使用None恢复中断后传入None表示继续执行 恢复执行演示 运行方式python 07_恢复执行.py fromlanggraph.typesimportCommand# 假设已经中断在 review 节点# 方式1: 直接传入 None继续执行resultgraph.invoke(None,config)print(f恢复执行结果:{result})# 方式2: 传入 Command 对象携带额外数据resultgraph.invoke(Command(resume{approved:True}),config)4.2 使用Command恢复Command对象可以携带人工输入的数据 Command 恢复演示 运行方式python 07_Command恢复.py fromlanggraph.typesimportCommandfromlanggraph.checkpoint.memoryimportMemorySaverclassState(TypedDict):query:strhuman_input:strresult:strdefask_human(state:State):# 中断点 - 等待人工输入return{}defprocess_with_input(state:State):human_inputstate.get(human_input,)return{result:f处理结果:{state[query]}{human_input}}builderStateGraph(State)builder.add_node(ask,ask_human)builder.add_node(process,process_with_input)builder.add_edge(START,ask)builder.add_edge(ask,process)builder.add_edge(process,END)memoryMemorySaver()graphbuilder.compile(checkpointermemory,interrupt_before[process])config{configurable:{thread_id:input_1}}# 第一次执行 - 中断graph.invoke({query:用户问题,human_input:,result:},config)# 使用 Command 恢复并传入人工数据resultgraph.invoke(Command(resume{human_input:人工补充信息}),config)print(f结果:{result})五、实际应用场景5.1 内容审核流程 内容审核流程示例 图结构 START ──→ generate ──→ [中断:审核] ──→ publish ──→ END ↓ reject ──→ END fromtypingimportTypedDict,Literalfromlanggraph.graphimportStateGraph,START,ENDfromlanggraph.checkpoint.memoryimportMemorySaverclassContentState(TypedDict):topic:strcontent:strreview_status:str# pending / approved / rejectedpublished:booldefgenerate_content(state:ContentState):contentf关于「{state[topic]}」的AI生成内容...return{content:content,review_status:pending}defreview_content(state:ContentState):# 这里会中断等待人工审核# 人工会通过 Command 传入审核结果return{}defpublish_content(state:ContentState):ifstate.get(review_status)approved:return{published:True}return{published:False}defreject_content(state:ContentState):return{published:False,review_status:rejected}builderStateGraph(ContentState)builder.add_node(generate,generate_content)builder.add_node(review,review_content)builder.add_node(publish,publish_content)builder.add_node(reject,reject_content)builder.add_edge(START,generate)builder.add_edge(generate,review)defroute_after_review(state:ContentState):ifstate.get(review_status)approved:returnpublishreturnrejectbuilder.add_conditional_edges(review,route_after_review)builder.add_edge(publish,END)builder.add_edge(reject,END)memoryMemorySaver()content_graphbuilder.compile(checkpointermemory,interrupt_before[review])# 使用示例config{configurable:{thread_id:content_1}}content_graph.invoke({topic:LangGraph教程,content:,review_status:,published:False},config)# 人工审核通过fromlanggraph.typesimportCommand resultcontent_graph.invoke(Command(resume{review_status:approved}),config)print(f发布结果:{result})5.2 工具调用确认 工具调用确认示例 场景LLM 决定调用敏感工具前需要人工确认 图结构 START ──→ llm_decide ──→ [中断:确认] ──→ call_tool ──→ END fromtypingimportTypedDict,Listfromlanggraph.graphimportStateGraph,START,ENDfromlanggraph.checkpoint.memoryimportMemorySaverclassAgentState(TypedDict):messages:List[dict]tool_calls:List[dict]confirmed:boolresult:strdefllm_decide(state:AgentState):# LLM 决定是否调用工具tool_calls[{name:delete_file,args:{path:/tmp/test.txt}}]return{tool_calls:tool_calls}defconfirm_tool_call(state:AgentState):# 中断点 - 等待人工确认return{}defexecute_tool(state:AgentState):ifstate.get(confirmed):toolstate[tool_calls][0]return{result:f已执行:{tool[name]}}return{result:工具调用被拒绝}builderStateGraph(AgentState)builder.add_node(decide,llm_decide)builder.add_node(confirm,confirm_tool_call)builder.add_node(execute,execute_tool)builder.add_edge(START,decide)builder.add_edge(decide,confirm)builder.add_edge(confirm,execute)builder.add_edge(execute,END)memoryMemorySaver()agent_graphbuilder.compile(checkpointermemory,interrupt_before[execute]# 执行前必须确认)5.3 多步骤审批流程 多步骤审批流程示例 场景复杂业务流程需要多个审批环节 图结构 START ──→ draft ──→ [中断:初审] ──→ revise ──→ [中断:终审] ──→ submit ──→ END fromtypingimportTypedDictfromlanggraph.graphimportStateGraph,START,ENDfromlanggraph.checkpoint.memoryimportMemorySaverclassApprovalState(TypedDict):document:strfirst_review:str# pending / approved / rejectedfinal_review:str# pending / approved / rejectedstatus:strdefdraft_document(state:ApprovalState):return{document:初稿内容...,first_review:pending}deffirst_review(state:ApprovalState):# 中断点1 - 初审return{}defrevise_document(state:ApprovalState):ifstate.get(first_review)approved:return{document:修订后内容...,final_review:pending}return{status:初审被拒}deffinal_review(state:ApprovalState):# 中断点2 - 终审return{}defsubmit_document(state:ApprovalState):ifstate.get(final_review)approved:return{status:已提交}return{status:终审被拒}builderStateGraph(ApprovalState)builder.add_node(draft,draft_document)builder.add_node(first_review,first_review)builder.add_node(revise,revise_document)builder.add_node(final_review,final_review)builder.add_node(submit,submit_document)builder.add_edge(START,draft)builder.add_edge(draft,first_review)builder.add_edge(first_review,revise)builder.add_edge(revise,final_review)builder.add_edge(final_review,submit)builder.add_edge(submit,END)memoryMemorySaver()approval_graphbuilder.compile(checkpointermemory,interrupt_before[first_review,final_review]# 两个中断点)六、与持久化结合人机协作依赖持久化机制来保存中断状态 人机协作 持久化 完整示例 运行方式python 07_完整示例.py fromtypingimportTypedDictfromlanggraph.graphimportStateGraph,START,ENDfromlanggraph.checkpoint.memoryimportMemorySaverfromlanggraph.typesimportCommandclassWorkflowState(TypedDict):step:strdata:strhuman_decision:strresult:strdefstep1(state:WorkflowState):return{step:step1,data:步骤1处理的数据}defhuman_input(state:WorkflowState):# 中断点 - 等待人工输入return{}defstep2(state:WorkflowState):decisionstate.get(human_decision,)return{result:f步骤2基于「{decision}」的处理结果}builderStateGraph(WorkflowState)builder.add_node(step1,step1)builder.add_node(human_input,human_input)builder.add_node(step2,step2)builder.add_edge(START,step1)builder.add_edge(step1,human_input)builder.add_edge(human_input,step2)builder.add_edge(step2,END)memoryMemorySaver()graphbuilder.compile(checkpointermemory,interrupt_before[step2])# 第一次执行config{configurable:{thread_id:workflow_1}}result1graph.invoke({step:,data:,human_decision:,result:},config)print(f中断状态:{graph.get_state(config).values})# 模拟长时间后恢复可能是另一个请求# 系统重启后检查点仍然存在result2graph.invoke(Command(resume{human_decision:人工决策: 继续执行}),config)print(f最终结果:{result2})七、最佳实践7.1 设计原则原则说明示例最小中断只在必要节点设置中断敏感操作前明确提示中断时提供清晰的上下文显示待审核内容超时处理设置中断超时机制防止无限等待回退支持允许人工拒绝并回退审批流程7.2 常见问题问题解决方案中断后如何恢复使用graph.invoke(None, config)或Command如何查看中断状态graph.get_state(config)多个中断点如何区分通过state.next判断下一个节点中断数据丢失使用持久化存储如 PostgreSQL八、核心要点总结要点说明中断时机interrupt_before执行前/interrupt_after执行后恢复方式graph.invoke(None, config)或Command(resume...)依赖持久化必须配合 Checkpointer 使用适用场景内容审核、工具确认、审批流程、人工输入状态查看graph.get_state(config)查看中断状态