[python]FastAPI + 自建SSE 踩坑全记录

[python]FastAPI + 自建SSE 踩坑全记录 1、什么是SSE服务一种服务端向客户端主动推送消息的协议适合用于服务端完成异步任务后主动向客户端推送消息。SSE 的优点浏览器原生支持 EventSource实现简单适合服务端单向推送不需要 WebSocket 那样的握手和协议控制2、技术背景后端使用pythonFastAPI前端使用vue3、后端实现代码services层sse_manage.py提供SSE的创建和底层功能。# app/services/sse_manager.py# 简单的 SSE 管理器支持多个客户端连接,用于向前端主动发送事件通知例如数据更新完成等importasyncioimportjsonimportloggingfromtypingimportDict,Setfromstarlette.responsesimportStreamingResponse loggerlogging.getLogger(__name__)classSSEManager:简单的 SSE 管理器支持多个客户端连接def__init__(self):# Dict[str, Set[asyncio.Queue]]: 事件类型 - 订阅该事件的客户端队列集合# Dict字典Set集合asyncio.Queue异步队列self._clients:Dict[str,Set[asyncio.Queue]]{}self.shutdown_eventasyncio.Event()# 用于优雅关闭asyncdefsubscribe(self,event_type:str)-asyncio.Queue: 订阅指定事件类型返回一个 asyncio.Queue 用于接收事件消息 - event_type: 事件类型字符串例如 data_update - 返回值: asyncio.Queue 对象客户端可以从中异步获取事件消息 - 注意调用方需要负责调用 unsubscribe 来取消订阅并清理资源 - 示例用法 queue await sse_manager.subscribe(data_update) while True: message await queue.get() # 处理消息例如发送给前端 queueasyncio.Queue()# 每个订阅者拥有一个独立的消息队列self._clients.setdefault(event_type,set()).add(queue)returnqueueasyncdefunsubscribe(self,event_type:str,queue:asyncio.Queue): 取消订阅指定事件类型移除对应的 asyncio.Queue - event_type: 事件类型字符串例如 data_update - queue: 之前 subscribe 返回的 asyncio.Queue 对象 - 注意调用方需要确保传入正确的 queue 对象否则可能无法正确取消订阅 ifevent_typeinself._clients:# discard方法会安全地移除元素如果元素不存在也不会抛出异常self._clients[event_type].discard(queue)asyncdefsend_event(self,event_type:str,data:dict): 向所有订阅了指定事件类型的客户端发送事件消息 - event_type: 事件类型字符串例如 data_update - data: 要发送的数据以字典形式提供会被转换为 JSON 字符串发送给客户端 - 注意如果没有订阅该事件类型的客户端则不会发送任何消息 ifevent_typenotinself._clients:return# dumps方法将Python对象转换为JSON字符串event:和data:是SSE协议的格式要求\n\n表示消息结束messagefevent:{event_type}\ndata:{json.dumps(data)}\n\nforqueueinself._clients[event_type]:awaitqueue.put(message)asyncdefshutdown(self):print(SSE shutdown 开始...)self.shutdown_event.set()# 通知所有 SSE 任务退出# 全局单例sse_managerSSEManager()routes层see.py提供用于前端订阅的接口同时会作为客户端长期运行维持SSE。# app/api/routes/sse.pyimportasynciofromfastapiimportAPIRouterfromstarlette.responsesimportStreamingResponsefromapp.services.sse_managerimportsse_manager routerAPIRouter(prefix/sse,tags[实时推送])router.get(/subscribe/{event_type})asyncdefsubscribe(event_type:str): 订阅指定事件类型的 SSE 流前端可以通过 EventSource 连接到这个接口来接收实时事件推送 - event_type: 事件类型字符串例如 data_update前端可以根据这个事件类型来区分不同的事件流 - 返回值: StreamingResponse 对象内容类型为 text/event-stream符合 SSE 协议要求 - 注意前端需要使用 EventSource 来连接这个接口例如 const eventSource new EventSource(/api/sse/subscribe/data_update); eventSource.onmessage (event) { const data JSON.parse(event.data); console.log(Received data update event:, data); }; queueawaitsse_manager.subscribe(event_type)asyncdefevent_generator():try:whileTrue:done,pendingawaitasyncio.wait([asyncio.create_task(queue.get()),asyncio.create_task(sse_manager.shutdown_event.wait())],timeout10,# 心跳间隔return_whenasyncio.FIRST_COMPLETED)# shutdown_event 触发 → 退出ifsse_manager.shutdown_event.is_set():breakifnotdone:yieldevent: heartbeat\ndata: {}\n\ncontinue# queue.get() 返回messagedone.pop().result()ifmessageisNone:breakyieldmessageawaitasyncio.sleep(0.1)exceptasyncio.CancelledError:# 不再抛出直接忽略让连接自然关闭# await sse_manager.unsubscribe(event_type, queue)passfinally:awaitsse_manager.unsubscribe(event_type,queue)# StreamingResponse 用于创建一个流式响应event_generator 是一个异步生成器函数负责从队列中获取消息并发送给前端# 流式响应是一种特殊的HTTP响应允许服务器持续发送数据给客户端而不需要等待所有数据准备好后一次性发送这对于实时推送非常有用returnStreamingResponse(event_generator(),media_typetext/event-stream,headers{Cache-Control:no-cache,Connection:keep-alive,Access-Control-Allow-Origin:*,# 允许前端跨域X-Accel-Buffering:no,})4、前端实现代码composables层useSSE.ts提供前端的持续消息接收服务。exportfunctionuseSSE(eventType:string,callback:(data:any)void){leteventSource:EventSource|nullnullletlastHeartbeatDate.now()constcreateConnection(){eventSourcenewEventSource(${import.meta.env.VITE_API_BASE_URL}/sse/subscribe/${eventType})// 正常业务事件eventSource.addEventListener(eventType,(event){constdataJSON.parse(event.data)callback(data)})// 心跳事件eventSource.addEventListener(heartbeat,(){lastHeartbeatDate.now()})// 服务器关闭事件eventSource.addEventListener(server_shutdown,(){console.log(服务器即将关闭SSE 连接主动断开)eventSource?.close()})// 出错时自动重连排除正常关闭eventSource.onerror(){if(eventSource?.readyStateEventSource.CLOSED)returneventSource?.close()setTimeout(createConnection,3000)}}createConnection()// 心跳超时检测关键setInterval((){if(Date.now()-lastHeartbeat15000){console.log(心跳超时服务器可能已关闭主动断开 SSE)eventSource?.close()}},5000)window.addEventListener(beforeunload,()eventSource?.close())returneventSource}前段使用SSE的方法在APP.vue下配置如下订阅task_completed消息并实时弹出弹窗提示。script setup langtsimport{useSSE}from/composables/useSSEimport{ElNotification}fromelement-plusimport{onMounted,onBeforeUnmount}fromvueletsse:EventSource|nullnull// 监听任务完成事件onMounted((){sseuseSSE(task_completed,(data){ElNotification({title:任务完成,message:data.message||操作已成功,type:data.type||success,duration:5000,})})})onBeforeUnmount((){sse?.close()})/script5、该方案的弊端该方案可以顺利实现SSE服务前后端消息推送功能但是会导致后端服务无法正常关闭。在开发中一般使用如下命令启动python后端服务器uvicorn app.main:app--reload该指令让后端服务可以随着后端文件修改按下ctrls后自动重启后端更新程序同时还可以ctrlc中止程序。但是由于在routes层接口配置了while true的客户端连接这会导致uvicorn一直等待连接的关闭而卡住除非到达超时时间触发uvicorn 的强制关闭。可以通过如下的方式配置超时参数来减少超时等待延迟uvicorn app.main:app--reload--timeout-graceful-shutdown5但是超时时间到达后由于SSE连接被强制关闭会导致后端出现一大片报错。参考如下(strategy-env)PS E:\2025\机器学习\Strategy-Forge\backenduvicorn app.main:app--reload--timeout-graceful-shutdown30INFO: Willwatchforchangesinthese directories:[E:\\2025\\机器学习\\Strategy-Forge\\backend]INFO: Uvicorn running on http://127.0.0.1:8000(Press CTRLC to quit)INFO: Started reloader process[5872]using StatReload INFO: Started server process[20604]INFO: Waitingforapplication startup. API 文档http://127.0.0.1:8000/docs INFO: Application startup complete. INFO:127.0.0.1:53078 -GET /sse/subscribe/task_completed HTTP/1.1200OK INFO: Shutting down INFO: Waitingforconnections to close.(CTRLC to force quit)ERROR: Cancel1running task(s),timeoutgracefulshutdownexceeded INFO: Waitingforapplication shutdown. SSEshutdown开始... INFO: Applicationshutdowncomplete. INFO: Finished server process[20604]ERROR: ExceptioninASGI application Traceback(most recent call last): FileE:\Anaconda\envs\strategy-env\Lib\site-packages\uvicorn\protocols\http\h11_impl.py, line415,inrun_asgi resultawait app(# type: ignore[func-returns-value]^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ self.scope, self.receive, self.send ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^)^ FileE:\Anaconda\envs\strategy-env\Lib\site-packages\uvicorn\middleware\proxy_headers.py, line63,in__call__returnawait self.app(scope, receive, send)^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ FileE:\Anaconda\envs\strategy-env\Lib\site-packages\fastapi\applications.py, line1159,in__call__ await super().__call__(scope, receive, send)FileE:\Anaconda\envs\strategy-env\Lib\site-packages\starlette\applications.py, line90,in__call__ await self.middleware_stack(scope, receive, send)FileE:\Anaconda\envs\strategy-env\Lib\site-packages\starlette\middleware\errors.py, line164,in__call__ await self.app(scope, receive, _send)FileE:\Anaconda\envs\strategy-env\Lib\site-packages\starlette\middleware\cors.py, line96,in__call__ await self.simple_response(scope, receive, send,request_headersheaders)FileE:\Anaconda\envs\strategy-env\Lib\site-packages\starlette\middleware\cors.py, line154,insimple_response await self.app(scope, receive, send)FileE:\Anaconda\envs\strategy-env\Lib\site-packages\starlette\middleware\exceptions.py, line63,in__call__ await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)FileE:\Anaconda\envs\strategy-env\Lib\site-packages\starlette\_exception_handler.py, line42,inwrapped_app await app(scope, receive, sender)FileE:\Anaconda\envs\strategy-env\Lib\site-packages\fastapi\middleware\asyncexitstack.py, line18,in__call__ await self.app(scope, receive, send)FileE:\Anaconda\envs\strategy-env\Lib\site-packages\starlette\routing.py, line660,in__call__ await self.middleware_stack(scope, receive, send)FileE:\Anaconda\envs\strategy-env\Lib\site-packages\starlette\routing.py, line680,inapp await route.handle(scope, receive, send)FileE:\Anaconda\envs\strategy-env\Lib\site-packages\starlette\routing.py, line276,inhandle await self.app(scope, receive, send)FileE:\Anaconda\envs\strategy-env\Lib\site-packages\fastapi\routing.py, line134,inapp await wrap_app_handling_exceptions(app, request)(scope, receive, send)FileE:\Anaconda\envs\strategy-env\Lib\site-packages\starlette\_exception_handler.py, line42,inwrapped_app await app(scope, receive, sender)FileE:\Anaconda\envs\strategy-env\Lib\site-packages\fastapi\routing.py, line121,inapp await response(scope, receive, send)FileE:\Anaconda\envs\strategy-env\Lib\site-packages\starlette\responses.py, line274,in__call__ async with anyio.create_task_group()as task_group: ~~~~~~~~~~~~~~~~~~~~~~~^^ FileE:\Anaconda\envs\strategy-env\Lib\site-packages\anyio\_backends\_asyncio.py, line803,in__aexit__ raise exc_val FileE:\Anaconda\envs\strategy-env\Lib\site-packages\starlette\responses.py, line281,in__call__ await wrap(partial(self.listen_for_disconnect, receive))FileE:\Anaconda\envs\strategy-env\Lib\site-packages\starlette\responses.py, line277,inwrap await func()FileE:\Anaconda\envs\strategy-env\Lib\site-packages\starlette\responses.py, line244,inlisten_for_disconnect messageawait receive()^^^^^^^^^^^^^^^ FileE:\Anaconda\envs\strategy-env\Lib\site-packages\uvicorn\protocols\http\h11_impl.py, line536,inreceive await self.message_event.wait()FileE:\Anaconda\envs\strategy-env\Lib\asyncio\locks.py, line213,inwaitawait fut asyncio.exceptions.CancelledError: Task cancelled,timeoutgracefulshutdownexceeded INFO: Stopping reloader process[5872]一开始我的解决方式是在程序的生命周期结束时主动触发SSE关闭。即在main.py中配置app的生命周期如下asynccontextmanagerasyncdeflifespan(app:FastAPI):print( API 文档http://127.0.0.1:8000/docs)yield# 该任务本意用来在控制面板ctrlc关闭后端的时候主动关闭sse服务# 但是由于unicorn关闭会先于shutdown触发因此总是会导致sse先被异常关闭掉awaitsse_manager.shutdown()上述方法中yield前的程序会在后端开始生命周期时正式运行前执行yield后的程序则会在生命周期结束时执行即sse_manager.shutdown()。事实证明该方案是不可行的从上面的报错例子中也可以看到先触发了ERROR才打印出了sse_manager.shutdown()内部的print这表示按下ctrlc后程序等待超时触发了强制关闭导致了报错后才触发了我的方法这为时已晚。由于上述框架中生命周期的设置想要在FastAPI中正常关闭我的SSE似乎是不可能的了我也尝试过通过心跳机制让前端发送断开连接但是此时后端已经在“死亡的路上”依然无法正常关闭。6、应该如何实现优雅的后端的消息推送最终只能遗憾地判断我的SSE无法和FastAPI兼容虽然功能得以实现但是服务端关闭时的报错让人难以接收。最终查询发现其实FastAPI有提供SSE服务居然完全不需要我自己写吗框架官方提供的SSE应该和它的生命周期是可以兼容的应该考虑使用该方案来实现功能的同时又能优雅地关闭服务。同时还可以考虑用WebSocket协议来实现后端消息的推送作为全双工的协议其应当可以轻松实现主动断开连接防止重复等待。7、补充FastAPI提供的SSE文档只是对于SSE格式数据的封装并没有提供SSE服务依然无法改善我的问题。后续有两个方案可以改进我的问题1、仅临时通讯时使用SSE例如前端触发事件后开启SSE通讯等待超时或者SSE正常返回完成后主动关闭SSE这样就不会出现服务端关闭时SSE客户端持续等待的问题2、改为客户端主动也想服务端发送心跳包超时则主动关闭该修改仍然需要服务端等待一段时间。3、换用WebSocket协议来构建长时间的前后端相互通讯通道websocket似乎也会有类似问题仍然需要测试。