基于Agora与AssemblyAI构建高精度实时语音转录机器人

基于Agora与AssemblyAI构建高精度实时语音转录机器人 1. 项目概述构建一个高精度、低延迟的实时转录机器人在构建实时语音交互应用时一个常见的需求是如何在不干扰主会话的情况下高质量地记录并转写会议或直播中每位参与者的发言传统的方案要么精度不够要么延迟太高要么无法区分说话人。今天我将分享一个我们团队在实际项目中打磨出来的解决方案利用 Agora 的 Python Server SDK 和 AssemblyAI 的 Universal-3 Pro 流式 API构建一个“静默观察者”式的实时转录机器人。这个机器人的核心工作流程非常清晰它像一个隐形的会议秘书以“观众”身份加入 Agora 的音视频频道订阅并接收每个参会者的原始音频流PCM 格式然后实时地将这些音频流推送给 AssemblyAI 进行转写。AssemblyAI 的 Universal-3 Pro 模型不仅转写准确率高还自带说话人分离Speaker Diarization功能能自动区分“谁在什么时候说了什么”。最终结构清晰的逐句转录文本会实时输出你可以轻松地将它们送入数据库、大语言模型或触发其他业务流程。为什么选择这个技术栈关键在于“对齐”与“专精”。Agora Server SDK 让服务器端程序能直接获取纯净的、未经压缩的原始 PCM 音频帧这恰好是 AssemblyAI 流式 API 所期望的输入格式。这种“端到端”的原始数据对接避免了不必要的音频编解码和格式转换是实现超低延迟转录的基石。相比之下许多内置或通用的语音转文本方案在实时性、准确率和多语言支持上往往难以兼顾。2. 核心架构与选型逻辑解析2.1 为什么是 Agora AssemblyAI 的组合在决定技术栈时我们对比了多种方案。最终选择 Agora Python Server SDK 与 AssemblyAI Universal-3 Pro Streaming 的组合是基于以下几个核心考量音频数据管道的纯净性Agora 的subscribe_all_audio配合set_playback_audio_frame_before_mixing_parameters方法可以直接获取到每个用户的、未经混音的原始 PCM 音频流。这意味着机器人听到的是每个参会者独立的“干声”没有房间混响也没有与其他说话人的声音混合。这对于后续的语音识别和说话人分离至关重要能极大提升准确率。协议与格式的天然对齐AssemblyAI 的流式 WebSocket API 接收的是单声道、16kHz 采样率、16位深的 PCM 数据。而 Agora SDK 可以精确地按此规格输出音频帧。这种“开箱即用”的兼容性省去了我们使用 FFmpeg 等工具进行实时转码的复杂性和额外延迟。每一帧从 Agora 接收后几乎可以直接塞进 WebSocket 发送出去。性能指标的显著优势根据官方基准测试和我们自己的实测AssemblyAI Universal-3 Pro 在关键指标上表现突出指标AssemblyAI Universal-3 Pro典型内置/通用 STT 方案P50 延迟~307 毫秒~600-900 毫秒词错误率 (WER)8.9%14%-18%实时说话人分离✅ 支持❌ 通常不支持或非实时支持语言99 种有限通常20种对于需要实时字幕、会议纪要或内容审核的场景低于 500 毫秒的延迟意味着字幕与语音几乎是同步的。而低于 10% 的词错误率使得转录结果基本无需大量人工修正即可使用。实时说话人分离更是锦上添花它自动为每段话打上了“说话人 ID”的标签让后续的分析和处理如区分不同客户的发言变得非常简单。2.2 系统组件与数据流整个系统的运行依赖于几个核心组件的高效协作Agora 服务端连接作为机器人本体在服务端初始化 Agora 服务并以“观众”角色加入目标频道。它不发布任何音视频流只进行订阅。音频帧捕获与分发器监听频道内的用户加入/离开事件。每当有新用户加入就为其创建一个独立的音频帧捕获流和一个对应的 AssemblyAI WebSocket 连接。这是一个典型的一对一映射关系。AssemblyAI 流式转录引擎每个用户的音频流通过一个独立的、持久的 WebSocket 连接推送给 AssemblyAI。该服务实时处理音频流并回传包含说话人标签、标点符号和逐句结束标记的 JSON 格式转录结果。结果处理器接收并解析 AssemblyAI 返回的Turn事件代表一个完整的说话轮次。这个节点是系统的输出口你可以在这里编写逻辑将结构化的转录文本发送到任何你需要的地方——写入数据库、推送给 LLM 生成摘要、或触发业务工作流。注意这里采用“每个用户一个 WebSocket 连接”的设计而非将所有用户音频混音后用一个连接处理。这样做虽然连接数增多但保证了每个音频流的独立性和纯净度使得 AssemblyAI 的说话人分离模型能发挥最佳效果也避免了混音可能带来的语音质量下降和说话人混淆问题。3. 环境准备与项目初始化3.1 前置条件与账号配置在开始写代码之前你需要准备好以下三样东西这就像厨师下锅前的“备菜”环节Python 环境确保你的系统安装了 Python 3.9 或更高版本。我推荐使用pyenv或conda来管理 Python 版本为这个项目创建一个独立的虚拟环境避免依赖冲突。Agora 开发者账号前往 Agora 控制台注册并创建一个项目。创建成功后你会获得两个关键凭证App ID项目的唯一标识符。App Certificate用于生成安全令牌Token的密钥。请务必像保管密码一样保管它不要泄露到客户端代码中。AssemblyAI 账号前往 AssemblyAI 官网注册并在控制台获取你的API Key。Universal-3 Pro 模型是他们的旗舰产品你需要确保账户有足够的额度或已开通相应套餐。3.2 项目骨架搭建与依赖安装拿到所有“食材”后我们开始搭建厨房。首先从 GitHub 克隆项目模板这是一个已经搭好基础框架的起点能让我们避开很多初始配置的坑。# 克隆项目仓库到本地 git clone https://github.com/kelseyefoster/voice-agent-agora-universal-3-pro # 进入项目目录 cd voice-agent-agora-universal-3-pro # 安装所有必需的 Python 依赖包 pip install -r requirements.txt接下来是关键的一步配置环境变量。项目根目录下有一个.env.example文件我们需要复制它并填入自己的凭证。# 复制环境变量示例文件 cp .env.example .env然后用文本编辑器打开新创建的.env文件内容如下你需要替换掉your_xxx部分# .env 文件内容 AGORA_APP_IDyour_agora_app_id AGORA_APP_CERTyour_agora_certificate ASSEMBLYAI_API_KEYyour_assemblyai_api_key AGORA_CHANNELmy-channel AGORA_BOT_UID9999AGORA_APP_ID和AGORA_APP_CERT填入从 Agora 控制台获取的值。ASSEMBLYAI_API_KEY填入从 AssemblyAI 控制台获取的 API 密钥。AGORA_CHANNEL你的机器人将要加入的频道名称。频道是 Agora 中音视频会话的逻辑隔离单元所有加入同一频道名的用户才能互通。AGORA_BOT_UID为你的机器人指定一个数字用户 ID。确保这个 ID 在频道内是唯一的不要与其他真实用户冲突。通常可以使用一个较大的数字如 9999。实操心得我强烈建议在开发阶段使用临时的频道名如test-channel-123和固定的 Bot UID。将.env文件添加到.gitignore中确保密钥不会意外提交到代码仓库。对于团队协作可以使用类似dotenv的方式在部署时注入这些变量。4. 核心代码实现与逐行解析完成环境配置后我们深入到bot.py的核心代码中。我将分模块拆解并解释每一段代码的意图和关键细节。4.1 初始化 Agora 服务并加入频道机器人的第一步是“进入房间”。我们使用 Agora 的 Python Server SDK 来初始化一个服务实例并以“观众”身份加入指定的频道。import asyncio import json import os from dotenv import load_dotenv import websockets from agora.rtc.agora_service import AgoraService, AgoraServiceConfig from agora.rtc.rtc_connection import RTCConnConfig from agora.rtc.agora_base import ClientRoleType, ChannelProfileType, AudioScenarioType # 加载环境变量 load_dotenv() async def main(): # 从环境变量读取配置 AGORA_APP_ID os.getenv(AGORA_APP_ID) AGORA_APP_CERT os.getenv(AGORA_APP_CERT) ASSEMBLYAI_API_KEY os.getenv(ASSEMBLYAI_API_KEY) AGORA_CHANNEL os.getenv(AGORA_CHANNEL, default-channel) AGORA_BOT_UID int(os.getenv(AGORA_BOT_UID, 9999)) # 1. 配置并初始化 Agora 服务 cfg AgoraServiceConfig() cfg.appid AGORA_APP_ID cfg.enable_audio_processor True # 启用音频处理模块 cfg.audio_scenario AudioScenarioType.AUDIO_SCENARIO_CHORUS # 适用于语音场景 service AgoraService() service.initialize(cfg) # 2. 创建连接配置以观众角色加入直播频道 conn_cfg RTCConnConfig( client_role_typeClientRoleType.CLIENT_ROLE_AUDIENCE, channel_profileChannelProfileType.CHANNEL_PROFILE_LIVE_BROADCASTING, ) connection service.create_rtc_connection(conn_cfg) # 3. 生成 Token 并连接频道 (生产环境需使用动态Token此处为演示简化) # 注意在安全的生产环境中Token应在服务端动态生成此处直接使用无Token连接仅用于测试。 token None # 对于未开启鉴权的项目可暂时为None。生产环境务必使用下面章节的方法生成。 connection.connect(token, AGORA_CHANNEL, str(AGORA_BOT_UID)) print(fBot (UID:{AGORA_BOT_UID}) joined channel: {AGORA_CHANNEL})关键点解析ClientRoleType.CLIENT_ROLE_AUDIENCE这是机器人的关键身份。设置为“观众”意味着它只接收流不发送流因此不会占用上行带宽也不会被其他用户看到或听到。ChannelProfileType.CHANNEL_PROFILE_LIVE_BROADCASTING频道模式设为“直播”。在这种模式下观众角色是明确被支持的且通信模式更符合我们“一对多收听”的场景。AudioScenarioType.AUDIO_SCENARIO_CHORUS音频场景设置为“合唱”。这个场景针对多人语音交谈进行了优化能提供更清晰的语音质量非常适合会议转录。token在测试或未开启频道鉴权的项目中可以暂时为None。但在任何生产环境中都必须使用有效的、有时效性的 Token 来连接这是安全保障的第一道门。我们会在后面章节详细讲如何生成 Token。4.2 配置音频输出与订阅所有音频成功加入频道后我们需要告诉 Agora SDK“请把所有人的原始音频以我指定的格式送给我。”# 4. 获取本地用户即机器人自身的频道对象 agora_channel connection.get_local_user() # 5. 关键步骤在订阅音频前设置期望的原始音频帧参数 # 这步至关重要它避免了SDK内部的二次重采样减少延迟和性能开销。 agora_channel.set_playback_audio_frame_before_mixing_parameters( num_of_channels1, # 单声道AssemblyAI要求 sample_rate16000, # 16kHz采样率AssemblyAI要求 ) # 6. 订阅频道内所有用户的音频流 agora_channel.subscribe_all_audio() print(Subscribed to all audio in the channel.)为什么要在订阅前设置参数set_playback_audio_frame_before_mixing_parameters这个方法的名字很长但作用很明确它在你调用subscribe_all_audio之前预先定义了你希望收到的“原始音频帧”的格式。如果你在订阅后才设置SDK 可能已经以默认格式例如 48kHz 立体声开始向你传递数据了这时你再修改格式SDK 内部就需要进行实时重采样这会增加不必要的 CPU 消耗和微小的延迟。提前声明“我要 16kHz 单声道”SDK 就会在音频处理流水线的最初环节直接按此格式输出效率最高。执行完这一步后你就可以通过agora_channel.get_audio_frames(uid)这个异步生成器按帧获取指定用户纯净的 PCM 数据了。每一帧都将是 16 位深度、小端序、16kHz 采样率的单声道 PCM与 AssemblyAI 的输入要求严丝合缝。4.3 为每位用户建立独立的 AssemblyAI 转录流这是系统的核心桥梁。我们为频道内的每个用户创建一个独立的任务该任务负责两件事1) 从 Agora 拉取该用户的音频帧2) 通过一个专属的 WebSocket 连接推送给 AssemblyAI 并接收转录结果。# AssemblyAI 流式 WebSocket 端点 AAI_WS_URL ( wss://streaming.assemblyai.com/v3/ws f?sample_rate16000 speech_modelu3-rt-pro # 指定使用 Universal-3 Pro 实时模型 format_turnstrue # 请求以“说话轮次”的格式返回结果便于处理 ) async def stream_participant(agora_channel, uid: int, api_key: str): 处理单个用户的音频流转录 headers {Authorization: api_key} try: async with websockets.connect(AAI_WS_URL, additional_headersheaders) as ws: # 连接建立后AssemblyAI 会发送一个 Session Begins 消息 session_start json.loads(await ws.recv()) print(f[UID:{uid}] Transcription session started: {session_start[id]}) async def send_audio(): 从Agora读取音频帧并发送到AssemblyAI async for frame in agora_channel.get_audio_frames(uid): # frame.data 就是符合格式的 PCM 字节数据 if ws.open: await ws.send(frame.data) else: break async def recv_transcripts(): 接收并处理AssemblyAI返回的转录消息 async for message in ws: event json.loads(message) # 我们关注 Turn 类型的事件且一个完整的说话轮次结束时会有 end_of_turn 标记 if event[type] Turn and event.get(end_of_turn): speaker event.get(speaker, Unknown) transcript_text event[transcript] # 打印到控制台这里就是你的输出点 print(f[UID:{uid}, Speaker:{speaker}] {transcript_text}) # 在此处可以添加自定义逻辑存入数据库、调用LLM、触发Webhook等 # 并行执行发送和接收任务 await asyncio.gather(send_audio(), recv_transcripts()) except websockets.exceptions.ConnectionClosedOK: print(f[UID:{uid}] WebSocket connection closed normally.) except Exception as e: print(f[UID:{uid}] Error in transcription stream: {e})代码逻辑深度解读WebSocket 连接参数speech_modelu3-rt-pro明确指定使用 Universal-3 Pro 实时模型这是高准确率和低延迟的保证。format_turnstrue参数让 AssemblyAI 以“轮次”为单位返回结果每个Turn事件代表一个相对完整的说话片段通常以自然停顿为界并且end_of_turn为True时表示这个片段结束。这种结构比原始的流式字词word-by-word输出更易于后续处理。双任务并行asyncio.gather(send_audio(), recv_transcripts())是异步编程的经典模式。send_audio是一个永不停止的循环直到连接断开不断从 Agora 拉取音频帧并发送recv_transcripts也是一个永不停止的循环持续监听 WebSocket 返回的消息。这两个 IO 密集型任务被asyncio高效地调度互不阻塞。错误处理WebSocket 连接可能因为网络问题、服务端重启或主动终止而关闭。用try...except包裹连接和循环可以保证当一个用户的转录流出现故障时不会导致整个机器人崩溃只会影响该用户。4.4 动态管理用户加入与离开一个真实的频道用户是动态进出的。我们需要监听这些事件并相应地创建或销毁转录流。# 用于存储活跃用户及其转录任务的字典 active_streams: dict[int, asyncio.Task] {} def on_user_joined(uid: int): 当新用户加入频道时调用 print(fUser joined: {uid}) if uid AGORA_BOT_UID: return # 忽略机器人自己 # 为这个新用户创建一个独立的转录任务 task asyncio.create_task(stream_participant(agora_channel, uid, ASSEMBLYAI_API_KEY)) active_streams[uid] task def on_user_left(uid: int, reason: int): 当用户离开频道时调用 print(fUser left: {uid}, reason: {reason}) if uid in active_streams: # 取消该用户的转录任务 active_streams[uid].cancel() try: # 等待任务被正式取消避免资源泄露 asyncio.create_task(active_streams[uid]) except asyncio.CancelledError: pass del active_streams[uid] # 向Agora连接注册事件回调函数 connection.register_observer_callback(on_user_joined, on_user_joined) connection.register_observer_callback(on_user_offline, on_user_left) # 注意SDK中可能叫on_user_offline注意事项过滤自身在on_user_joined中一定要判断加入的用户 UID 是否等于机器人自身的AGORA_BOT_UID。如果不过滤机器人会尝试订阅自己的音频流这通常会导致错误或无意义的数据。资源清理当用户离开时除了从字典中移除任务引用更重要的是调用task.cancel()来通知异步框架终止这个任务。这会让stream_participant函数中的websockets.connect上下文管理器正常退出关闭 WebSocket 连接避免连接泄露。回调函数名不同版本的 Agora SDK 中用户离开事件的回调函数名可能略有差异如on_user_offline或on_user_left。需要查阅你所用版本的 SDK 文档来确定准确的名称。4.5 优雅地关闭与清理当我们需要停止机器人时例如收到终止信号不能直接退出必须进行优雅的清理关闭所有连接取消所有任务。import signal shutdown_event asyncio.Event() def signal_handler(): print(\nShutdown signal received.) shutdown_event.set() # 注册信号处理如CtrlC loop asyncio.get_running_loop() for sig in (signal.SIGINT, signal.SIGTERM): loop.add_signal_handler(sig, signal_handler) print(Bot is running. Press CtrlC to stop.) try: # 主循环等待关机事件 await shutdown_event.wait() finally: print(Shutting down...) # 1. 取消所有用户的转录任务 for uid, task in active_streams.items(): task.cancel() if active_streams: await asyncio.gather(*active_streams.values(), return_exceptionsTrue) # 2. 断开Agora连接 connection.disconnect() # 3. 释放Agora服务资源 service.release() print(Bot shutdown complete.)优雅关闭的重要性直接杀死进程可能导致 AssemblyAI 那边的 WebSocket 连接没有发送终止信号服务器端可能认为连接异常中断未处理完的音频数据可能无法生成最终的转录片段。虽然我们的简单示例没有在关闭时向 AssemblyAI 发送Terminate消息如原始代码片段所示但在生产环境中为每个 WebSocket 连接发送一个{type: Terminate}的 JSON 消息是一个好习惯这能通知 AssemblyAI 最终化处理并返回一个包含总处理时长等信息的Termination事件。5. 生产环境关键配置与安全实践将代码从本地测试推向生产环境有几个关键步骤不能忽视它们关系到系统的稳定性和安全性。5.1 安全地生成与使用 Agora Token在测试时我们可能使用了未开启鉴权的 App ID。在生产中必须开启频道鉴权并为每次连接动态生成 Token。Token 是一种临时的、可撤销的访问凭证包含了 App ID、频道名、用户 ID、过期时间等信息并使用 App Certificate 进行签名。# 安装 Token 生成工具包 # pip install agora-token-builder from agora_token_builder import RtcTokenBuilder, Role_Subscriber import time def generate_bot_token(app_id: str, app_certificate: str, channel_name: str, uid: int, expire_in_seconds: int 3600) - str: 生成用于机器人加入频道的 Token。 参数: app_id: Agora 项目的 App ID。 app_certificate: Agora 项目的 App Certificate。 channel_name: 要加入的频道名称。 uid: 机器人的用户 ID。 expire_in_seconds: Token 的有效期默认1小时。根据安全要求调整。 返回: 生成的 Token 字符串。 # 计算 Token 过期的时间戳当前时间 有效期 expire_time int(time.time()) expire_in_seconds # 注意对于观众角色我们使用 Role_Subscriber token RtcTokenBuilder.buildTokenWithUid( app_idapp_id, app_certificateapp_certificate, channel_namechannel_name, uiduid, roleRole_Subscriber, # 关键订阅者角色对应 CLIENT_ROLE_AUDIENCE privilege_expired_tsexpire_time ) return token # 在 main 函数中替换掉 token None token generate_bot_token(AGORA_APP_ID, AGORA_APP_CERT, AGORA_CHANNEL, AGORA_BOT_UID) connection.connect(token, AGORA_CHANNEL, str(AGORA_BOT_UID))安全要点Token 必须在服务端生成绝对不要将App Certificate存放在客户端如网页、移动端 App代码中。Token 的生成必须在你的服务器后端进行。设置合理的有效期对于长时间运行的机器人Token 有效期可以设置得较长如 24 小时但同时你需要在代码中实现 Token 的轮换机制在 Token 过期前重新生成并调用connection.renew_token()方法更新连接。使用正确的角色Role_Subscriber对应的是CLIENT_ROLE_AUDIENCE确保 Token 的权限与连接配置一致。5.2 处理网络波动与重连生产环境的网络是不稳定的。WebSocket 连接可能断开Agora 连接也可能出现闪断。健壮的代码必须具备重连能力。async def resilient_stream_participant(agora_channel, uid: int, api_key: str, max_retries5): 具备重连能力的用户转录流处理函数 retry_count 0 base_delay 1 # 初始重连延迟1秒 while retry_count max_retries: try: await stream_participant(agora_channel, uid, api_key) # 调用之前的核心函数 # 如果 stream_participant 正常退出如用户离开则跳出循环 break except (websockets.exceptions.ConnectionClosedError, ConnectionResetError) as e: retry_count 1 if retry_count max_retries: print(f[UID:{uid}] Max retries ({max_retries}) reached. Giving up.) break delay base_delay * (2 ** (retry_count - 1)) # 指数退避 print(f[UID:{uid}] Connection lost. Retrying ({retry_count}/{max_retries}) in {delay}s...) await asyncio.sleep(delay) except asyncio.CancelledError: # 任务被外部取消如用户离开正常退出 print(f[UID:{uid}] Transcription task cancelled.) break except Exception as e: print(f[UID:{uid}] Unexpected error: {e}. Stopping transcription for this user.) break指数退避策略这是网络编程中标准的重连策略。每次重连失败后等待时间翻倍1s, 2s, 4s...避免在服务短暂故障时发起海量重连请求给服务器造成压力。5.3 性能优化与资源管理当频道内有数十甚至上百个用户时为每个人维持一个 WebSocket 连接和两个异步任务可能会消耗大量内存和网络资源。连接池管理考虑实现一个 AssemblyAI WebSocket 连接池。对于说话不频繁的场景如大型讲座可以设计一种机制在用户静默一段时间后暂停其音频流推送但保持 WebSocket 连接存活以复用减少频繁建连的开销。这需要更精细的状态管理。异步任务限制使用asyncio.Semaphore来限制同时运行的最大转录任务数量防止突发大量用户加入时耗尽系统资源。日志与监控集成像structlog或loguru这样的日志库为不同级别INFO, WARNING, ERROR和不同用户UID输出结构化日志。同时可以定期向监控系统如 Prometheus上报活跃连接数、音频帧处理延迟、转录错误率等指标。6. 扩展应用场景与下游集成转录文本的实时输出只是一个开始。stream_participant函数中打印转录结果的那一行print(f[UID:{uid}, Speaker:{speaker}] {transcript_text})是整个系统的价值溢出点。你可以在这里插入任何业务逻辑。6.1 实时会议纪要生成将转录文本实时存入数据库如 PostgreSQL, MongoDB并附上时间戳、说话人、频道 ID 等元数据。这样会议一结束一份完整的、带说话人标签的逐字稿就自动生成了。# 示例使用异步数据库驱动存入 PostgreSQL import asyncpg async def save_to_database(uid, speaker, transcript, channel, turn_start, turn_end): conn await asyncpg.connect(DATABASE_URL) await conn.execute( INSERT INTO meeting_transcripts (channel_id, user_id, speaker_label, content, start_time, end_time) VALUES ($1, $2, $3, $4, $5, $6) , channel, uid, speaker, transcript, turn_start, turn_end) await conn.close() # 在收到 Turn 事件时调用 # event 中包含 start, end 时间戳毫秒 await save_to_database(uid, speaker, transcript_text, AGORA_CHANNEL, event[start], event[end])6.2 驱动实时 AI 助手与互动将每个用户说完的一句话即一个end_of_turn为 True 的片段实时发送给大语言模型如通过 OpenAI GPT, Claude, 或本地部署的 Llama 接口可以实现强大的实时交互功能。实时问答在培训场景中学员提问AI 助手实时解析问题并从知识库中生成答案再由主持人或 TTS 播报出来。实时摘要在长时间会议中每 5 分钟或每当一个议题结束时将期间的对话文本发送给 LLM生成阶段性摘要并投射到共享屏幕上。实时翻译将转录的英文文本实时发送给翻译 API并输出目标语言的字幕。合规与安全监控实时分析转录文本检测是否包含敏感词、违规内容或特定关键词并即时告警。# 示例调用 OpenAI API 进行实时摘要 import openai openai.api_key os.getenv(OPENAI_API_KEY) async def generate_summary(transcript_segment): response await openai.ChatCompletion.acreate( modelgpt-3.5-turbo, messages[ {role: system, content: 你是一个会议助理请用一句话总结以下发言的核心内容。}, {role: user, content: transcript_segment} ], max_tokens100 ) return response.choices[0].message.content # 在收到 Turn 事件后调用 summary await generate_summary(transcript_text) print(f实时摘要: {summary})6.3 触发外部工作流将转录结果作为触发器通过 Webhook 推送给其他系统可以无缝集成到现有的工作流中。客服系统将客户与客服的对话实时转录并推送给 CRM自动生成工单或更新客户信息。协作工具将团队站会内容实时转录并发送到 Slack 或钉钉的特定频道形成异步记录。内容生产将直播或播客的音频实时转录为文字稿并自动发布到内容管理系统CMS的草稿箱。# 示例通过 Webhook 发送转录结果 import aiohttp async def post_webhook(webhook_url, data): async with aiohttp.ClientSession() as session: async with session.post(webhook_url, jsondata) as resp: if resp.status ! 200: print(fWebhook failed: {resp.status}) # 在收到 Turn 事件后调用 webhook_data { event: transcription_turn, channel: AGORA_CHANNEL, uid: uid, speaker: speaker, text: transcript_text, timestamp: time.time() } await post_webhook(https://your-workflow-server/webhook, webhook_data)7. 常见问题排查与调试技巧在实际部署和运行中你可能会遇到一些问题。这里记录了一些我们踩过的坑和解决方法。7.1 音频相关问题问题机器人加入频道后收不到任何音频或者转录结果一直是空的。检查用户角色确认机器人的client_role_type是CLIENT_ROLE_AUDIENCE观众。如果是CLIENT_ROLE_BROADCASTER主播但在频道中没有发布音频流可能会被其他用户忽略。检查频道模式确认channel_profile是CHANNEL_PROFILE_LIVE_BROADCASTING。在通信模式CHANNEL_PROFILE_COMMUNICATION下所有用户默认都是主播且没有明确的观众角色订阅逻辑可能不同。验证真实用户是否在发言确保频道内有其他用户并且他们的麦克风权限已开启正在正常说话。可以通过 Agora 控制台的“水晶球”监控工具查看频道状态。检查音频帧回调在stream_participant函数的send_audio循环内添加调试日志打印len(frame.data)。正常情况下每个帧的数据长度应该是固定的对于 16kHz 单声道一帧 20ms 的音频数据是 640 字节。如果一直是 0 或没有进入循环说明没有收到音频帧。问题转录结果乱码或全是无意义的单词。确认音频格式这是最常见的原因。双重检查set_playback_audio_frame_before_mixing_parameters的参数是否为sample_rate16000和num_of_channels1。任何不匹配都会导致 AssemblyAI 接收到错误格式的音频从而产生垃圾输出。检查音频数据内容写一小段调试代码将接收到的前几帧 PCM 数据保存为.raw文件然后用 Audacity 等音频软件以“16-bit PCM, 单声道16000Hz”的格式导入播放听一下是否是正常的语音。如果不是说明音频流源头或处理链路有问题。7.2 网络与连接问题问题WebSocket 连接频繁断开。检查防火墙和网络策略确保运行机器人的服务器可以访问wss://streaming.assemblyai.com:443。有些企业网络会限制出站连接。检查 AssemblyAI API Key 配额和状态登录 AssemblyAI 控制台确认 API Key 有效且未超出用量限制。过期的 Key 会导致连接被拒绝。实施重连机制如前文所述务必实现带有指数退避的重连逻辑。网络瞬时抖动是常态。问题高并发下部分用户的转录流延迟变高或不稳定。监控系统资源使用top,htop或asyncpg等工具监控机器人进程的 CPU 和内存使用情况。每个 WebSocket 连接和异步任务都有开销。如果用户数很多如 50可能需要升级服务器配置。优化异步循环确保你没有在异步任务中执行阻塞性的 CPU 密集型操作如复杂的同步计算。如果需要进行大量计算考虑使用asyncio.to_thread或run_in_executor将其放到单独的线程池中执行避免阻塞事件循环。分批处理如果所有用户同时说话瞬间的音频帧处理和网络发送压力会很大。可以考虑在send_audio循环中增加轻微的asyncio.sleep(0.001)来平滑发送速率但这会增加整体延迟需要权衡。7.3 部署与运维问题问题在 Docker 容器中运行失败提示 Agora SDK 相关错误。依赖库与系统环境Agora Python Server SDK 可能依赖某些特定的系统库如音频处理库。确保你的 Docker 镜像基于一个完整的 Linux 发行版如ubuntu:22.04并在 Dockerfile 中安装必要的系统包。参考 Agora 官方文档的安装要求。权限问题某些 SDK 可能需要访问/dev下的设备或具有特定的 Linux 能力capabilities。检查容器是否以特权模式运行或者是否缺少必要的挂载。问题如何监控机器人的健康状态心跳与健康检查为机器人程序添加一个简单的 HTTP 健康检查端点例如使用aiohttp创建一个简单的 web server返回当前活跃用户数、任务状态等。这样容器编排平台如 Kubernetes可以通过探针检查其存活状态。结构化日志将所有日志输出为 JSON 格式并包含uid,channel,session_id,error_type等字段。这样可以通过 ELKElasticsearch, Logstash, Kibana或 Loki 等日志聚合系统进行集中查询和告警。自定义指标使用prometheus_client库暴露一些指标如transcription_active_connections,audio_frame_receive_latency_ms,transcription_error_count。这些指标可以被 Prometheus 抓取并在 Grafana 中绘制成仪表盘直观展示系统运行状态。