文章目录HumanInTheLoopMiddleware — 人工审批代码示例HumanInTheLoopMiddleware — 人工审批作用在 Agent 执行特定工具前暂停等待人工审批支持批准、编辑或拒绝。前提条件需要配置checkpointer检查点器来维护中断状态。构造参数参数类型说明interrupt_ondict工具名到审批配置的映射。True表示需要审批False表示不需要字典可指定allowed_decisions代码示例fromlangchain.agentsimportcreate_agentfromlangchain.agents.middlewareimportHumanInTheLoopMiddlewarefromlanggraph.checkpoint.memoryimportInMemorySaverfromsrc.utils.configimportConfigfromrichimportprintasrprint modelConfig.get_default_model()defyour_read_email_tool(email_id:str)-str:Mock function to read an email by its ID.returnfEmail content for ID:{email_id}defyour_send_email_tool(recipient:str,subject:str,body:str)-str:Mock function to send an email.returnfEmail sent to{recipient}with subject {subject}agentcreate_agent(modelmodel,tools[your_read_email_tool,your_send_email_tool],checkpointerInMemorySaver(),middleware[HumanInTheLoopMiddleware(interrupt_on{your_send_email_tool:{allowed_decisions:[approve,edit,reject],},your_read_email_tool:True,}),],)config{configurable:{thread_id:demo-thread-1}}forchunkinagent.stream({messages:[{role:user,content:请读取邮件ID为123的邮件}]},configconfig,):rprint(chunk)# 运行时当 Agent 尝试调用 send_email 时会中断# 人工可以通过 Command 对象审批/修改/拒绝代码示例importjsonimportsysfromlangchain.agentsimportcreate_agentfromlangchain.agents.middlewareimportHumanInTheLoopMiddlewarefromlanggraph.checkpoint.memoryimportInMemorySaverfromlanggraph.typesimportCommandfromsrc.utils.configimportConfig modelConfig.get_default_model()defyour_read_email_tool(email_id:str)-str:读取指定ID的邮件内容emails{123:【邮件 #123】发件人: 王经理 | 主题: 项目进展 | 明天下午3点开会请提前准备Q2报告。,456:【邮件 #456】发件人: HR | 主题: 年假通知 | 您的年假还剩5天请于12月底前使用完毕。,}returnemails.get(email_id,f[系统] 未找到 ID{email_id}的邮件)defyour_send_email_tool(recipient:str,subject:str,body:str)-str:发送邮件returnf✅ 邮件已发送 | 收件人:{recipient}| 主题:{subject}defyour_delete_email_tool(email_id:str)-str:删除指定ID的邮件returnf️ 邮件 ID{email_id}已删除# # 创建 agent# checkpointerInMemorySaver()agentcreate_agent(modelmodel,tools[your_read_email_tool,your_send_email_tool,your_delete_email_tool],checkpointercheckpointer,middleware[HumanInTheLoopMiddleware(interrupt_on{your_send_email_tool:{allowed_decisions:[approve,edit,reject],},your_delete_email_tool:{allowed_decisions:[approve,reject],},your_read_email_tool:True,}),],)# # 安全 input — 兼容管道输入非 TTY 时 EOF 返回默认值# def_safe_input(prompt:str,default:str)-str:安全的 input 封装管道模式下 EOF 不会崩溃返回默认值ifnotsys.stdin.isatty():try:valinput(prompt)returnvalifval.strip()elsedefaultexceptEOFError:returndefaultreturninput(prompt).strip()ordefault# # 交互式 HITL 流程# defrun_with_hitl(user_message:str,thread_id:str): 执行 agent 并在遇到中断时通过终端交互收集人工决策。 config{configurable:{thread_id:thread_id}}input_data:dict|Command{messages:[{role:user,content:user_message}]}whileTrue:interrupt_dataNoneforchunkinagent.stream(input_data,configconfig):if__interrupt__inchunk:interrupt_datachunk[__interrupt__][0].valueelse:_print_chunk(chunk)ifinterrupt_dataisNone:break# 没有中断流程结束# ---------- 收集人工决策 ----------decisions_collect_decisions(interrupt_data)input_dataCommand(resume{decisions:decisions})def_print_chunk(chunk:dict):打印 stream 输出 — 区分真实工具执行和中间件注入的人工决策消息fornode_name,node_outputinchunk.items():ifnode_outputisNone:continuemessagesnode_output.get(messages,[])formsginmessages:msg_typetype(msg).__name__ contentgetattr(msg,content,)namegetattr(msg,name,None)tool_callsgetattr(msg,tool_calls,None)statusgetattr(msg,status,None)# --- AIMessage ---ifmsg_typeAIMessageandtool_calls:# 中间件输出的 AIMessage 是修改后的副本已在上轮打印过跳过ifnode_nameHumanInTheLoopMiddleware.after_model:continuefortcintool_calls:print(f LLM 计划调用 →{tc[name]}({_fmt_args(tc[args])}))elifmsg_typeAIMessageandcontent:print(f LLM 回复 →{content})# --- ToolMessage ---elifmsg_typeToolMessage:ifnode_nameHumanInTheLoopMiddleware.after_model:# 中间件注入的人工决策消息 — 工具并未真正执行ifstatuserror:print(f 人工拒绝 [{name}] →{content[:120]})else:print(f ✅ 人工批准 [{name}] → 即将执行)else:# 真实工具执行结果icon❌ifstatuserrorelseprint(f{icon}工具执行 [{name}] →{content[:120]})# --- HumanMessage ---elifmsg_typeHumanMessage:print(f 用户 →{content})def_collect_decisions(interrupt_data:dict)-list[dict]:在终端中交互式收集人工决策requestsinterrupt_data[action_requests]configsinterrupt_data[review_configs]_print_separator(f 需要人工审批 — 共{len(requests)}个工具调用)decisions[]fori,(req,cfg)inenumerate(zip(requests,configs)):tool_namereq[name]tool_argsreq[args]allowedcfg[allowed_decisions]print(f\n 工具 [{i1}/{len(requests)}]:{tool_name})print(f 参数:{_fmt_args(tool_args)})print(f 可选:{, .join(allowed)})decision_prompt_decision(tool_name,tool_args,allowed)decisions.append(decision)print(f ✅ 已记录 →{_describe_decision(decision)})_print_separator()returndecisionsdef_prompt_decision(tool_name:str,tool_args:dict,allowed:list[str])-dict:提示用户输入决策whileTrue:choice_safe_input( 请输入操作: ).strip().lower()ifchoicenotinallowed:print(f ⚠️ 无效操作可选:{, .join(allowed)})continueifchoiceapprove:return{type:approve}ifchoiceedit:ifeditnotinallowed:print( ⚠️ 该工具不支持 edit)continueprint(f 请输入新参数 JSON (回车保留原值):)print(f 原参数:{_fmt_args(tool_args)})new_json_safe_input( → )new_args_parse_edit_args(tool_args,new_json)return{type:edit,edited_action:{name:tool_name,args:new_args}}ifchoicereject:reason_safe_input( 拒绝理由 (可留空): )decision{type:reject}ifreason:decision[message]reasonreturndecisionifchoicerespond:msg_safe_input( 回复内容: ).strip()ifnotmsg:print( ⚠️ respond 必须填写回复内容)continuereturn{type:respond,message:msg}def_parse_edit_args(original:dict,new_json:str)-dict:解析编辑后的参数 — 接受 JSON 或 keyvalue 格式ifnotnew_json:returndict(original)new_jsonnew_json.strip()# 尝试 JSON 格式: {recipient: 李四}ifnew_json.startswith({):try:mergeddict(original)merged.update(json.loads(new_json))returnmergedexceptjson.JSONDecodeError:print(f ⚠️ JSON 解析失败保留原值)returndict(original)# keyvalue 格式: recipient李四resultdict(original)forpartinnew_json.split(,):partpart.strip()ifinpart:key,_,valuepart.partition()key,valuekey.strip(),value.strip()ifkeyinresult:result[key]_coerce_type(value,result[key])returnresultdef_coerce_type(value_str:str,original):尝试将字符串还原为原始类型ifisinstance(original,bool):returnvalue_str.lower()in(true,yes,1)ifisinstance(original,int):try:returnint(value_str)exceptValueError:returnvalue_strifisinstance(original,float):try:returnfloat(value_str)exceptValueError:returnvalue_strreturnvalue_strdef_describe_decision(d:dict)-str:人类可读的决策摘要td[type]iftapprove:return批准 ✅iftedit:returnf编辑 →{_fmt_args(d[edited_action][args])}iftreject:msgd.get(message,)returnf拒绝 ❌{msg}iftrespond:returnf人工回复 {d.get(message,)}returntdef_fmt_args(args:dict)-str:returnjson.dumps(args,ensure_asciiFalse)def_print_separator(title:str):w55iftitle:padmax(0,(w-len(title)-2)//2)print(\n─*padf{title}─*pad)else:print(─*w)# # 交互式演示入口# if__name____main__:print(*55)print( Human-in-the-Loop 交互式演示)ifnotsys.stdin.isatty():print( (管道模式 — 从 stdin 读取决策))print(*55)# ---- 场景1读邮件 ----print(\n 场景1: 用户请求读取邮件)print(-*55)run_with_hitl(帮我读取邮件ID为123的邮件,thread_idthread-1)# ---- 场景2发邮件 ----print(\n 场景2: 用户请求发送邮件)print(-*55)run_with_hitl(帮我给张三发一封邮件主题: 请假内容: 明天请假一天,thread_idthread-2)# ---- 场景3删邮件 ----print(\n️ 场景3: 用户请求删除邮件)print(-*55)run_with_hitl(帮我删除邮件ID为456的邮件,thread_idthread-3)print(\n*55)print( ✅ 演示结束感谢配合)print(*55)
langchain 内置中间件详解 -HumanInTheLoopMiddleware — 人工审批
文章目录HumanInTheLoopMiddleware — 人工审批代码示例HumanInTheLoopMiddleware — 人工审批作用在 Agent 执行特定工具前暂停等待人工审批支持批准、编辑或拒绝。前提条件需要配置checkpointer检查点器来维护中断状态。构造参数参数类型说明interrupt_ondict工具名到审批配置的映射。True表示需要审批False表示不需要字典可指定allowed_decisions代码示例fromlangchain.agentsimportcreate_agentfromlangchain.agents.middlewareimportHumanInTheLoopMiddlewarefromlanggraph.checkpoint.memoryimportInMemorySaverfromsrc.utils.configimportConfigfromrichimportprintasrprint modelConfig.get_default_model()defyour_read_email_tool(email_id:str)-str:Mock function to read an email by its ID.returnfEmail content for ID:{email_id}defyour_send_email_tool(recipient:str,subject:str,body:str)-str:Mock function to send an email.returnfEmail sent to{recipient}with subject {subject}agentcreate_agent(modelmodel,tools[your_read_email_tool,your_send_email_tool],checkpointerInMemorySaver(),middleware[HumanInTheLoopMiddleware(interrupt_on{your_send_email_tool:{allowed_decisions:[approve,edit,reject],},your_read_email_tool:True,}),],)config{configurable:{thread_id:demo-thread-1}}forchunkinagent.stream({messages:[{role:user,content:请读取邮件ID为123的邮件}]},configconfig,):rprint(chunk)# 运行时当 Agent 尝试调用 send_email 时会中断# 人工可以通过 Command 对象审批/修改/拒绝代码示例importjsonimportsysfromlangchain.agentsimportcreate_agentfromlangchain.agents.middlewareimportHumanInTheLoopMiddlewarefromlanggraph.checkpoint.memoryimportInMemorySaverfromlanggraph.typesimportCommandfromsrc.utils.configimportConfig modelConfig.get_default_model()defyour_read_email_tool(email_id:str)-str:读取指定ID的邮件内容emails{123:【邮件 #123】发件人: 王经理 | 主题: 项目进展 | 明天下午3点开会请提前准备Q2报告。,456:【邮件 #456】发件人: HR | 主题: 年假通知 | 您的年假还剩5天请于12月底前使用完毕。,}returnemails.get(email_id,f[系统] 未找到 ID{email_id}的邮件)defyour_send_email_tool(recipient:str,subject:str,body:str)-str:发送邮件returnf✅ 邮件已发送 | 收件人:{recipient}| 主题:{subject}defyour_delete_email_tool(email_id:str)-str:删除指定ID的邮件returnf️ 邮件 ID{email_id}已删除# # 创建 agent# checkpointerInMemorySaver()agentcreate_agent(modelmodel,tools[your_read_email_tool,your_send_email_tool,your_delete_email_tool],checkpointercheckpointer,middleware[HumanInTheLoopMiddleware(interrupt_on{your_send_email_tool:{allowed_decisions:[approve,edit,reject],},your_delete_email_tool:{allowed_decisions:[approve,reject],},your_read_email_tool:True,}),],)# # 安全 input — 兼容管道输入非 TTY 时 EOF 返回默认值# def_safe_input(prompt:str,default:str)-str:安全的 input 封装管道模式下 EOF 不会崩溃返回默认值ifnotsys.stdin.isatty():try:valinput(prompt)returnvalifval.strip()elsedefaultexceptEOFError:returndefaultreturninput(prompt).strip()ordefault# # 交互式 HITL 流程# defrun_with_hitl(user_message:str,thread_id:str): 执行 agent 并在遇到中断时通过终端交互收集人工决策。 config{configurable:{thread_id:thread_id}}input_data:dict|Command{messages:[{role:user,content:user_message}]}whileTrue:interrupt_dataNoneforchunkinagent.stream(input_data,configconfig):if__interrupt__inchunk:interrupt_datachunk[__interrupt__][0].valueelse:_print_chunk(chunk)ifinterrupt_dataisNone:break# 没有中断流程结束# ---------- 收集人工决策 ----------decisions_collect_decisions(interrupt_data)input_dataCommand(resume{decisions:decisions})def_print_chunk(chunk:dict):打印 stream 输出 — 区分真实工具执行和中间件注入的人工决策消息fornode_name,node_outputinchunk.items():ifnode_outputisNone:continuemessagesnode_output.get(messages,[])formsginmessages:msg_typetype(msg).__name__ contentgetattr(msg,content,)namegetattr(msg,name,None)tool_callsgetattr(msg,tool_calls,None)statusgetattr(msg,status,None)# --- AIMessage ---ifmsg_typeAIMessageandtool_calls:# 中间件输出的 AIMessage 是修改后的副本已在上轮打印过跳过ifnode_nameHumanInTheLoopMiddleware.after_model:continuefortcintool_calls:print(f LLM 计划调用 →{tc[name]}({_fmt_args(tc[args])}))elifmsg_typeAIMessageandcontent:print(f LLM 回复 →{content})# --- ToolMessage ---elifmsg_typeToolMessage:ifnode_nameHumanInTheLoopMiddleware.after_model:# 中间件注入的人工决策消息 — 工具并未真正执行ifstatuserror:print(f 人工拒绝 [{name}] →{content[:120]})else:print(f ✅ 人工批准 [{name}] → 即将执行)else:# 真实工具执行结果icon❌ifstatuserrorelseprint(f{icon}工具执行 [{name}] →{content[:120]})# --- HumanMessage ---elifmsg_typeHumanMessage:print(f 用户 →{content})def_collect_decisions(interrupt_data:dict)-list[dict]:在终端中交互式收集人工决策requestsinterrupt_data[action_requests]configsinterrupt_data[review_configs]_print_separator(f 需要人工审批 — 共{len(requests)}个工具调用)decisions[]fori,(req,cfg)inenumerate(zip(requests,configs)):tool_namereq[name]tool_argsreq[args]allowedcfg[allowed_decisions]print(f\n 工具 [{i1}/{len(requests)}]:{tool_name})print(f 参数:{_fmt_args(tool_args)})print(f 可选:{, .join(allowed)})decision_prompt_decision(tool_name,tool_args,allowed)decisions.append(decision)print(f ✅ 已记录 →{_describe_decision(decision)})_print_separator()returndecisionsdef_prompt_decision(tool_name:str,tool_args:dict,allowed:list[str])-dict:提示用户输入决策whileTrue:choice_safe_input( 请输入操作: ).strip().lower()ifchoicenotinallowed:print(f ⚠️ 无效操作可选:{, .join(allowed)})continueifchoiceapprove:return{type:approve}ifchoiceedit:ifeditnotinallowed:print( ⚠️ 该工具不支持 edit)continueprint(f 请输入新参数 JSON (回车保留原值):)print(f 原参数:{_fmt_args(tool_args)})new_json_safe_input( → )new_args_parse_edit_args(tool_args,new_json)return{type:edit,edited_action:{name:tool_name,args:new_args}}ifchoicereject:reason_safe_input( 拒绝理由 (可留空): )decision{type:reject}ifreason:decision[message]reasonreturndecisionifchoicerespond:msg_safe_input( 回复内容: ).strip()ifnotmsg:print( ⚠️ respond 必须填写回复内容)continuereturn{type:respond,message:msg}def_parse_edit_args(original:dict,new_json:str)-dict:解析编辑后的参数 — 接受 JSON 或 keyvalue 格式ifnotnew_json:returndict(original)new_jsonnew_json.strip()# 尝试 JSON 格式: {recipient: 李四}ifnew_json.startswith({):try:mergeddict(original)merged.update(json.loads(new_json))returnmergedexceptjson.JSONDecodeError:print(f ⚠️ JSON 解析失败保留原值)returndict(original)# keyvalue 格式: recipient李四resultdict(original)forpartinnew_json.split(,):partpart.strip()ifinpart:key,_,valuepart.partition()key,valuekey.strip(),value.strip()ifkeyinresult:result[key]_coerce_type(value,result[key])returnresultdef_coerce_type(value_str:str,original):尝试将字符串还原为原始类型ifisinstance(original,bool):returnvalue_str.lower()in(true,yes,1)ifisinstance(original,int):try:returnint(value_str)exceptValueError:returnvalue_strifisinstance(original,float):try:returnfloat(value_str)exceptValueError:returnvalue_strreturnvalue_strdef_describe_decision(d:dict)-str:人类可读的决策摘要td[type]iftapprove:return批准 ✅iftedit:returnf编辑 →{_fmt_args(d[edited_action][args])}iftreject:msgd.get(message,)returnf拒绝 ❌{msg}iftrespond:returnf人工回复 {d.get(message,)}returntdef_fmt_args(args:dict)-str:returnjson.dumps(args,ensure_asciiFalse)def_print_separator(title:str):w55iftitle:padmax(0,(w-len(title)-2)//2)print(\n─*padf{title}─*pad)else:print(─*w)# # 交互式演示入口# if__name____main__:print(*55)print( Human-in-the-Loop 交互式演示)ifnotsys.stdin.isatty():print( (管道模式 — 从 stdin 读取决策))print(*55)# ---- 场景1读邮件 ----print(\n 场景1: 用户请求读取邮件)print(-*55)run_with_hitl(帮我读取邮件ID为123的邮件,thread_idthread-1)# ---- 场景2发邮件 ----print(\n 场景2: 用户请求发送邮件)print(-*55)run_with_hitl(帮我给张三发一封邮件主题: 请假内容: 明天请假一天,thread_idthread-2)# ---- 场景3删邮件 ----print(\n️ 场景3: 用户请求删除邮件)print(-*55)run_with_hitl(帮我删除邮件ID为456的邮件,thread_idthread-3)print(\n*55)print( ✅ 演示结束感谢配合)print(*55)