hermes源码学习6--工具运行时

hermes源码学习6--工具运行时 Hermes 工具是自注册函数按 toolset工具集分组并通过中央注册表/调度系统执行。主要文件tools/registry.pymodel_tools.pytoolsets.pytools/terminal_tool.pytools/environments/*工具注册模型每个工具模块在导入时调用registry.register(...)。model_tools.py负责导入/发现工具模块并构建供模型使用的 schema 列表。registry.register()的工作原理tools/中的每个工具文件在模块级别调用registry.register()来声明自身。函数签名如下registry.register( nameterminal, # 唯一工具名称用于 API schema toolsetterminal, # 该工具所属的 toolset schema{...}, # OpenAI function-calling schema描述、参数 handlerhandle_terminal, # 工具被调用时执行的函数 check_fncheck_terminal, # 可选返回 True/False 表示是否可用 requires_env[SOME_VAR], # 可选所需的环境变量用于 UI 显示 is_asyncFalse, # handler 是否为异步协程 descriptionRun commands, # 人类可读的描述 emoji, # 用于 spinner/进度显示的 emoji )每次调用都会创建一个ToolEntry以工具名称为键存储在单例ToolRegistry._tools字典中。若不同 toolset 之间出现名称冲突会记录警告后注册的条目覆盖前者。发现机制discover_builtin_tools()当model_tools.py被导入时会调用tools/registry.py中的discover_builtin_tools()。该函数使用 AST 解析扫描所有tools/*.py文件找出包含顶层registry.register()调用的模块然后导入它们# tools/registry.py简化版 def discover_builtin_tools(tools_dirNone): tools_path Path(tools_dir) if tools_dir else Path(__file__).parent for path in sorted(tools_path.glob(*.py)): if path.name in {__init__.py, registry.py, mcp_tool.py}: continue if _module_registers_tools(path): # AST 检查顶层 registry.register() importlib.import_module(ftools.{path.stem})这种自动发现机制意味着新工具文件会被自动识别——无需手动维护列表。AST 检查只匹配顶层的registry.register()调用不匹配函数内部的调用因此tools/中的辅助模块不会被导入。每次导入都会触发模块的registry.register()调用。可选工具中的错误例如图像生成工具缺少fal_client会被捕获并记录——不会阻止其他工具加载。核心工具发现完成后还会发现 MCP 工具和插件工具MCP 工具—tools.mcp_tool.discover_mcp_tools()读取 MCP 服务器配置并注册来自外部服务器的工具。插件工具—hermes_cli.plugins.discover_plugins()加载用户/项目/pip 插件这些插件可能注册额外的工具。工具可用性检查check_fn每个工具可以选择性地提供一个check_fn——一个可调用对象在工具可用时返回True否则返回False。典型的检查包括API 密钥是否存在— 例如lambda: bool(os.environ.get(SERP_API_KEY))用于网络搜索服务是否运行— 例如检查 Honcho 服务器是否已配置二进制文件是否已安装— 例如验证浏览器工具的playwright是否可用当registry.get_definitions()为模型构建 schema 列表时会运行每个工具的check_fn()# 简化自 registry.py if entry.check_fn: try: available bool(entry.check_fn()) except Exception: available False # 异常 不可用 if not available: continue # 完全跳过该工具关键行为检查结果按调用缓存——若多个工具共享同一个check_fn只运行一次。check_fn()中的异常被视为不可用故障安全。is_toolset_available()方法检查某个 toolset 的check_fn是否通过用于 UI 显示和 toolset 解析。Toolset 解析Toolset 是工具的命名集合。Hermes 通过以下方式解析它们显式启用/禁用的 toolset 列表平台预设hermes-cli、hermes-telegram等动态 MCP toolset精选的特殊用途集合如hermes-acpget_tool_definitions()如何过滤工具主入口点为model_tools.get_tool_definitions(enabled_toolsets, disabled_toolsets, quiet_mode)若提供了enabled_toolsets— 仅包含这些 toolset 中的工具。每个 toolset 名称通过resolve_toolset()解析将复合 toolset 展开为单个工具名称。若提供了disabled_toolsets— 从所有 toolset 开始减去已禁用的。若两者均未提供— 包含所有已知 toolset。注册表过滤— 解析后的工具名称集合传递给registry.get_definitions()后者应用check_fn过滤并返回 OpenAI 格式的 schema。动态 schema 修补— 过滤后execute_code和browser_navigate的 schema 会被动态调整仅引用实际通过过滤的工具防止模型幻觉出不可用的工具。旧版 toolset 名称带有_tools后缀的旧版 toolset 名称例如web_tools、terminal_tools通过_LEGACY_TOOLSET_MAP映射到其现代工具名称以保持向后兼容性。调度运行时工具通过中央注册表调度但部分 agent 级别的工具如 memory/todo/session-search 处理由 agent 循环直接处理。调度流程模型 tool_call → handler 执行当模型返回tool_call时流程如下模型响应包含 tool_call ↓ run_agent.py agent 循环 ↓ model_tools.handle_function_call(name, args, task_id, user_task) ↓ [Agent 循环工具] → 由 agent 循环直接处理todo、memory、session_search、delegate_task ↓ [插件 pre-hook] → invoke_hook(pre_tool_call, ...) ↓ registry.dispatch(name, args, **kwargs) ↓ 按名称查找 ToolEntry ↓ [异步 handler] → 通过 _run_async() 桥接 [同步 handler] → 直接调用 ↓ 返回结果字符串或 JSON 错误 ↓ [插件 post-hook] → invoke_hook(post_tool_call, ...)错误包装所有工具执行在两个层级进行错误处理registry.dispatch()— 捕获 handler 抛出的任何异常并以 JSON 形式返回{error: Tool execution failed: ExceptionType: message}。handle_function_call()— 将整个调度包裹在次级 try/except 中返回{error: Error executing tool_name: message}。这确保模型始终收到格式正确的 JSON 字符串而不会遇到未处理的异常。Agent 循环工具以下四个工具在注册表调度之前被拦截因为它们需要 agent 级别的状态TodoStore、MemoryStore 等todo— 规划/任务跟踪memory— 持久化 memory 写入session_search— 跨会话召回delegate_task— 生成子 agent 会话这些工具的 schema 仍在注册表中注册供get_tool_definitions使用但若调度以某种方式直接到达它们其 handler 会返回一个存根错误。异步桥接当工具 handler 为异步时_run_async()将其桥接到同步调度路径CLI 路径无运行中的事件循环— 使用持久化事件循环以保持缓存的异步客户端存活Gateway 路径有运行中的事件循环— 使用asyncio.run()启动一个一次性线程工作线程并行工具— 使用存储在线程本地存储中的每线程持久化循环DANGEROUS_PATTERNS 审批流程终端工具集成了定义在tools/approval.py中的危险命令审批系统模式检测—DANGEROUS_PATTERNS是一个(regex, description)元组列表涵盖破坏性操作递归删除rm -rf文件系统格式化mkfs、ddSQL 破坏性操作DROP TABLE、不带WHERE的DELETE FROM系统配置覆写 /etc/服务操控systemctl stop远程代码执行curl | shFork bomb、进程终止等检测— 在执行任何终端命令之前detect_dangerous_command(command)会对所有模式进行检查。审批提示— 若发现匹配CLI 模式— 交互式提示要求用户批准、拒绝或永久允许Gateway 模式— 异步审批回调将请求发送至消息平台智能审批— 可选地辅助 LLM 可自动批准匹配模式但风险较低的命令例如rm -rf node_modules/是安全的但匹配递归删除模式会话状态— 审批按会话跟踪。一旦在某个会话中批准了递归删除后续的rm -rf命令不会再次提示。永久允许列表— 永久允许选项会将该模式写入config.yaml的command_allowlist跨会话持久化。终端/运行时环境终端系统支持多种后端localdockersshsingularitymodaldaytona还支持按任务的 cwd 覆盖后台进程管理PTY 模式危险命令的审批回调并发工具调用可以顺序执行也可以并发执行具体取决于工具组合和交互需求。工具注册源码# ------------------------------------------------------------------ # Registration # ------------------------------------------------------------------ def register( self, name: str, toolset: str, schema: dict, handler: Callable, check_fn: Callable None, requires_env: list None, is_async: bool False, description: str , emoji: str , max_result_size_chars: int | float | None None, dynamic_schema_overrides: Callable None, override: bool False, ): Register a tool. Called at module-import time by each tool file. overrideTrue is an explicit opt-in for plugins that intend to replace an existing built-in tool implementation (e.g. swap the default browser tool for a headed-Chrome CDP backend). Without it, registrations that would shadow an existing tool from a different toolset are rejected to prevent accidental overwrites. with self._lock: existing self._tools.get(name) if existing and existing.toolset ! toolset: # Allow MCP-to-MCP overwrites (legitimate: server refresh, # or two MCP servers with overlapping tool names). both_mcp ( existing.toolset.startswith(mcp-) and toolset.startswith(mcp-) ) if both_mcp: logger.debug( Tool %s: MCP toolset %s overwriting MCP toolset %s, name, toolset, existing.toolset, ) elif override: # Explicit plugin opt-in: replace the existing tool. # Logged at INFO so the override is auditable in agent.log. logger.info( Tool %s: toolset %s overriding existing toolset %s (overrideTrue opt-in), name, toolset, existing.toolset, ) else: # Reject shadowing — prevent plugins/MCP from overwriting # built-in tools or vice versa. logger.error( Tool registration REJECTED: %s (toolset %s) would shadow existing tool from toolset %s. Pass overrideTrue to register() if the replacement is intentional, or deregister the existing tool first., name, toolset, existing.toolset, ) return self._tools[name] ToolEntry( namename, toolsettoolset, schemaschema, handlerhandler, check_fncheck_fn, requires_envrequires_env or [], is_asyncis_async, descriptiondescription or schema.get(description, ), emojiemoji, max_result_size_charsmax_result_size_chars, dynamic_schema_overridesdynamic_schema_overrides, ) if check_fn and toolset not in self._toolset_checks: self._toolset_checks[toolset] check_fn self._generation 1工具发现源码def _module_registers_tools(module_path: Path) - bool: Return True when the module contains a top-level registry.register(...) call. Only inspects module-body statements so that helper modules which happen to call registry.register() inside a function are not picked up. try: source module_path.read_text(encodingutf-8) tree ast.parse(source, filenamestr(module_path)) except (OSError, SyntaxError): return False return any(_is_registry_register_call(stmt) for stmt in tree.body) def discover_builtin_tools(tools_dir: Optional[Path] None) - List[str]: Import built-in self-registering tool modules and return their module names. tools_path Path(tools_dir) if tools_dir is not None else Path(__file__).resolve().parent module_names [ ftools.{path.stem} for path in sorted(tools_path.glob(*.py)) if path.name not in {__init__.py, registry.py, mcp_tool.py} and _module_registers_tools(path) ] imported: List[str] [] for mod_name in module_names: try: importlib.import_module(mod_name) imported.append(mod_name) except Exception as e: logger.warning(Could not import tool module %s: %s, mod_name, e) return imported工具执行源码# ------------------------------------------------------------------ # Dispatch # ------------------------------------------------------------------ def dispatch(self, name: str, args: dict, **kwargs) - str: Execute a tool handler by name. * Async handlers are bridged automatically via _run_async(). * All exceptions are caught and returned as {error: ...} for consistent error format. entry self.get_entry(name) if not entry: return json.dumps({error: fUnknown tool: {name}}) try: if entry.is_async: from model_tools import _run_async return _run_async(entry.handler(args, **kwargs)) return entry.handler(args, **kwargs) except Exception as e: logger.exception(Tool %s dispatch error: %s, name, e) # Route through the sanitizer so framing tokens / CDATA / fences # in exception strings dont reach the model as structural noise. # See model_tools._sanitize_tool_error for rationale. raw fTool execution failed: {type(e).__name__}: {e} try: from model_tools import _sanitize_tool_error sanitized _sanitize_tool_error(raw) except Exception: sanitized raw # defensive: never let the sanitizer block error propagation return json.dumps({error: sanitized}) def _run_async(coro): Run an async coroutine from a sync context. If the current thread already has a running event loop (e.g., inside the gateways async stack or Atroposs event loop), we spin up a disposable thread so asyncio.run() can create its own loop without conflicting. For the common CLI path (no running loop), we use a persistent event loop so that cached async clients (httpx / AsyncOpenAI) remain bound to a live loop and dont trigger Event loop is closed on GC. When called from a worker thread (parallel tool execution), we use a per-thread persistent loop to avoid both contention with the main threads shared loop AND the Event loop is closed errors caused by asyncio.run()s create-and-destroy lifecycle. This is the single source of truth for sync-async bridging in tool handlers. Each handler is self-protecting via this function. try: loop asyncio.get_running_loop() except RuntimeError: loop None if loop and loop.is_running(): # Inside an async context (gateway, RL env) — run in a fresh thread # with its own event loop we own a reference to, so on timeout we # can cancel the task inside that loop (ThreadPoolExecutor.cancel() # only works on not-yet-started futures — its a no-op on a running # worker, which previously leaked the thread on every 300 s timeout). import concurrent.futures worker_loop: Optional[asyncio.AbstractEventLoop] None loop_ready threading.Event() def _run_in_worker(): nonlocal worker_loop worker_loop asyncio.new_event_loop() loop_ready.set() try: asyncio.set_event_loop(worker_loop) return worker_loop.run_until_complete(coro) finally: try: # Cancel anything still pending (e.g. task cancelled # externally via call_soon_threadsafe on timeout). pending asyncio.all_tasks(worker_loop) for t in pending: t.cancel() if pending: worker_loop.run_until_complete( asyncio.gather(*pending, return_exceptionsTrue) ) except Exception: pass worker_loop.close() pool concurrent.futures.ThreadPoolExecutor(max_workers1) future pool.submit(_run_in_worker) try: return future.result(timeout300) except concurrent.futures.TimeoutError: # Cancel the coroutine inside its own loop so the worker thread # can wind down instead of running forever. if loop_ready.wait(timeout1.0) and worker_loop is not None: try: for t in asyncio.all_tasks(worker_loop): worker_loop.call_soon_threadsafe(t.cancel) except RuntimeError: # Loop already closed — nothing to cancel. pass raise finally: # waitFalse: dont block the caller on a stuck coroutine. Weve # already requested cancellation above; the worker will exit # once the coroutine observes it (usually at the next await). pool.shutdown(waitFalse) # If were on a worker thread (e.g., parallel tool execution in # delegate_task), use a per-thread persistent loop. This avoids # contention with the main threads shared loop while keeping cached # httpx/AsyncOpenAI clients bound to a live loop for the threads # lifetime — preventing Event loop is closed on GC cleanup. if threading.current_thread() is not threading.main_thread(): worker_loop _get_worker_loop() return worker_loop.run_until_complete(coro) tool_loop _get_tool_loop() return tool_loop.run_until_complete(coro)