SSeEmitter的基本使用和介绍

SSeEmitter的基本使用和介绍 目录一、SseEmitter基本方法1、构造2、发送数据3、生命周期回调清理资源4、结束连接5、其他二、SseEmitter的应用场景优点1、配合Consumer,在业务进行中返回消息/进度。2、AI 大模型流式对话打字机效果3、实时消息与系统通知三、SseEmitter和HTTP的对比通信模式数据流向:响应格式连接状态断线重连:适用场景四、SseEmitter的缺点2、仅支持文本传输不支持二进制3、完全不兼容 IE 浏览器4、连接管理与内存泄漏风险五、注意事项1、生产环境的“隐形杀手”代理服务器Nginx缓存2、. 线程池的“大坑”千万不要用默认线程池3、 分布式部署的连接管理六、关于前端的技术选型1. 浏览器原生 EventSource2. fetch API ReadableStream3. 第三方库 microsoft/fetch-event-source 强烈推荐核心差异对比与最终选型建议首先了解一下SseEmitter的基本方法一、SseEmitter基本方法1、构造1new SseEmitter /new SseEmitter(Long timeout)单位为毫秒创建一个 SSE 连接对象。默认超时时间是 30 秒。例如传入60000L表示60秒0L表示永不超时适合长订阅场景。2完整三参数构造(Spring 5.3)new SseEmitter(Long timeout ,Long writeTimeout,TimeZone timeZone);timeout:连接总超时时间毫秒writeTimeout:单次写数据超时(毫秒) |||| 即调用send()发送一条消息时最长允许阻塞等待时间 |||| 默认5000毫秒timeZone:时间时区(一般传TimeZone.getDefault());2、发送数据1emitter.send(Object data)2emitter.send(SseEmitter.event()...)发送带有丰富元数据的 SSE 事件。这是最常用的方式可以链式调用以下属性.name(事件名)指定事件类型前端可以通过 addEventListener(事件名) 单独监听。.data(数据对象)实际要传输的业务数据。.id(消息ID)消息的唯一标识用于断线重连时的消息追踪。.reconnectTime(毫秒数)建议客户端在连接断开后隔多久尝试重连。.comment(注释)发送注释通常用于心跳保活防止代理服务器断开空闲连接。示例// 1、最简单的纯文本推送只包含 data emitter.send(SseEmitter.event().data(这是一条简单的系统通知)); // 2、带事件名的业务推送包含 event 和 data emitter.send(SseEmitter.event() .name(APPROVAL_SUCCESS) .data(你的请假申请已被老板批准)); // 3、完整的防丢包推送包含 id, retry, event, data emitter.send(SseEmitter.event() .id(event_20260601_001) // 消息唯一ID用于断线续传 .reconnectTime(3000L) // 断线后3秒重连 .name(NEW_MESSAGE) // 事件名称 .data(new ChatMessage(你好这是一条新消息))); // 实际数据对象 //4、 发送一个空注释作为心跳保活 emitter.send(SseEmitter.event().comment(ping));3send(Object data, MediaType mediaType)mediaType的选用MediaType.TEXT_PLAIN 对应 HTTP 的 text/plain。 MediaType.APPLICATION_JSON对应 HTTP 的 application/json。 MediaType.APPLICATION_XML对应 HTTP 的 application/xml。 MediaType.APPLICATION_OCTET_STREAM对应 HTTP 的 application/octet-stream。3、生命周期回调清理资源emitter.onCompletion(Runnable)连接正常关闭调用了 complete()时触发。emitter.onTimeout(Runnable)连接超过设定时间未发送数据时触发。emitter.onError(ConsumerThrowable)连接发生异常如客户端强行关闭时触发。可以用lambda表达式进行构造参数例如1emitter.onCompletion(() - emitters.remove(emitterId));//在SSE连接关闭后清理连接。2emitter.onCompletion(() - { // 记录业务日志 log.info(用户 {} 的实时任务推送已正常结束, userId); // 更新数据库状态 taskService.updateStatus(userId, FINISHED); });//SSE连接关闭后记录该次任务完成。4、结束连接(1)emitter.complete()主动正常关闭 SSE 连接。(2)emitter.completeWithError(Throwable)主动以异常状态关闭连接。(3)emitter.isComplete() / emitter.isExpired()判断连接是否已结束或已过期在群发消息遍历集合时可以用来过滤掉失效的连接。可以使用Atomic原子类保证结束连接的原子性例如private final AtomicBoolean isClosed new AtomicBoolean(false); public void safeComplete() { // compareAndSet(false, true) 只有在当前值为 false 时才会将其设为 true并返回 true if (isClosed.compareAndSet(false, true)) { emitter.complete(); } }注意SSESseEmitter的断线重连是无限次的浏览器会一直尝试重连直到你手动关闭页面或前端代码显式调用 .close() 方法为止。5、其他setTimeout(Long timeout):单位毫秒动态修改超时时间。二、SseEmitter的应用场景优点了解一下它的应用场景第一个示例展示了怎么使用1、配合Consumer,在业务进行中返回消息/进度。流程后端在控制层创建SseEmitter和Consumer回调将Consumer传递给服务层在服务层需要的时候进行回调传递给前端。示例SseEmitter emitternew SseEmitter(); ConsumerEvent eventCallback event - { try { emitter.send(SseEmitter.event() .name(event.getEventType()) .data(event)); } catch (IOException e) { log.error(发送 SSE 事件失败, e); emitters.remove(emitterId); }服务层如何使用对应用层传参的时候将eventCallback一并传入在应用层只需要调用accept(Event event)即可回调传递消息给前端。这里的event自己封装一下内容优点1跨层传递回调函数将推送逻辑从 Controller 传递到 Engine,再传递到 Executor。以此闭包捕获 SseEmitter - Consumer 在定义时捕获了所在方法的局部变量(emitter)。2解耦业务逻辑与推送逻辑 - 执行引擎不需要知道 SSE,只需要调用回调。2、AI 大模型流式对话打字机效果场景描述当用户与大模型对话时AI 的回复不是一下子全部返回的而是流式输出。为什么用 SseEmitter大模型生成文本需要时间后端每生成一段文字就立刻通过 emitter.send() 推给前端。这种极低的延迟感提升了用户体验对比WebSocket减少了运维成本降低了开发复杂度。3、实时消息与系统通知场景描述站内信/消息提醒右上角的小红点当有新订单、新评论或系统公告时无需刷新页面立刻弹出提示。审批流通知你的请假申请被老板通过了页面立刻弹出“审批通过”的提示框。为什么用 SseEmitter替代了传统的“前端每x秒轮询一次接口”的方式。SSE 只有在真正有消息时才推送极大地节省了服务器的 CPU 和网络带宽资源。轮询(1)高频的短连接会让服务器的 Web 容器频繁地创建和销毁线程来处理请求。在高并发下这会导致严重的 CPU 上下文切换甚至把服务器的线程池耗尽。(2)每一次轮询客户端和服务器都要重新经历一次完整的“握手”流程建立 TCP 连接、TLS 加密协商、传输完整的 HTTP 请求头。这些头部信息往往比实际的响应内容还要大造成了极大的带宽浪费。SSE(1)虽然连接一直挂着但 SSE 连接在挂起状态下几乎不消耗 CPU。现代服务器维护成千上万个空闲的长连接占用的仅仅是极少的内存而不是宝贵的 CPU 时间片。(2)只需要最开始建立连接时握手一次。三、SseEmitter和HTTP的对比与同样常用的通信模式HTTP做对比也可以和WebSocket对比一下通信模式HTTP:短连接客户端发一次请求服务端回一次响应连接立刻断开。SseEmitter:长连接建立一次 HTTP 连接后服务端可以源源不断地向客户端推送数据。数据流向:HTTP:单向拉取只能由客户端主动发起服务端无法主动给客户端发消息。SseEmitter:单向推送服务端可以主动向客户端推送数据客户端发数据需另开普通 HTTP 请求。响应格式HTTP:完整的 HTML、JSON 或文件等响应体一次性返回。SseEmitter:固定为 text/event-stream 格式的纯文本流数据以 data: ... 的形式持续流出。连接状态HTTP:无状态、无连接每次请求都是独立的事务服务器不保留连接状态。SseEmitter:持久连接依赖 HTTP 的 Connection: keep-alive连接会一直保持直到业务结束或超时。断线重连:HTTP:无如果请求失败需要前端手动重新发起请求。SseEmitter:浏览器原生支持前端 EventSource 会自动在断线后尝试重连无需写额外代码。适用场景HTTP:网页浏览、表单提交、常规的增删改查接口。SseEmitter:AI 打字机流式输出、实时任务进度条、股票行情刷新、系统消息通知。双向实时交互选WebSocket。四、SseEmitter的缺点这之后讲一讲SSE的缺点和注意事项1、单向通信SSE 只能由服务器向客户端单向推送数据。如果客户端需要频繁向服务器发送消息比如实时聊天、在线协作编辑SSE 就无法胜任必须依赖 WebSocket 或额外的 HTTP 请求。2、仅支持文本传输不支持二进制SSE 只能推送 UTF-8 文本。如果你需要推送图片流、文件流或视频帧必须先将二进制数据做 Base64 编码这会导致数据体积膨胀约 33%严重消耗带宽并增加编解码的 CPU 开销。3、完全不兼容 IE 浏览器4、连接管理与内存泄漏风险SseEmitter 会在服务器内存中维持长连接。如果客户端异常断开如直接关闭浏览器、断网而服务器端没有通过 onCompletion、onTimeout 和 onError 等回调及时清理资源这些“僵尸连接”会一直占用内存最终可能导致服务器内存溢出。五、注意事项1、生产环境的“隐形杀手”代理服务器Nginx缓存这是新手使用 SSE 最容易遇到的坑。在本地开发时一切正常但一部署到测试或生产环境通常前面挂着 Nginx流式输出就会失效变成“卡住很久然后一次性吐出所有数据”。原因Nginx 等反向代理默认会开启缓冲Buffering它会傻傻地等后端把数据攒够了一大块才一次性发给前端。解决方案必须在 Nginx 的配置中针对 SSE 的接口路径关闭缓冲。location /api/sse { proxy_pass http://backend; proxy_http_version 1.1; proxy_set_header Connection ; proxy_buffering off; # 核心配置关闭代理缓冲实现真正的实时流 proxy_cache off; # 关闭缓存 }2、. 线程池的“大坑”千万不要用默认线程池在代码中经常会用异步线程去执行耗时任务并推送 SSE。不要直接使用CompletableFuture.runAsync(...)或 Spring 默认的线程池。原因CompletableFuture 默认使用的是 ForkJoinPool.commonPool()这个全局共享线程池的线程数非常少通常等于 CPU 核心数 - 1。如果高并发下有几个 SSE 长连接任务占用了线程会导致整个应用的其他异步任务全部卡死。解决方案为 SSE 业务自定义一个有界线程池明确指定线程数、队列大小和拒绝策略做到资源隔离。3、 分布式部署的连接管理用 MapString, SseEmitter 来存储连接这在单机部署时完全没问题。多台服务器一旦需要集群部署多台服务器用户的 SSE 连接可能建立在 A 机器上但触发推送的业务消息可能由 B 机器处理。此时 B 机器拿着emitterId在自己的内存 Map 里是找不到对应连接的。解决方案在分布式场景下需要引入 Redis 等中间件。当业务触发推送时先将消息发布到 Redis 的特定频道Pub/Sub或消息队列如 Kafka/RocketMQ各个服务节点订阅消息后判断目标用户是否连接在本机上如果是再进行本地的 emitter.send() 推送。六、关于前端的技术选型1. 浏览器原生EventSource这是最传统、最轻量级的方案浏览器内置了完整的 SSE 协议解析能力。核心特点零依赖、开箱即用、浏览器自动处理断线重连。代码示例const eventSource new EventSource(/api/sse-connection); // 监听普通消息后端未指定 event name 时 eventSource.onmessage (event) { console.log(收到数据:, event.data); }; // 监听指定事件名的消息对应后端 .name(xxx) eventSource.addEventListener(APPROVAL_SUCCESS, (event) { console.log(审批通过:, event.data); }); // 监听连接错误 eventSource.onerror (error) { console.error(连接发生错误:, error); // 浏览器会自动尝试重连这里通常只需要记录日志 };缺点 仅支持 GET 请求无法在请求体Body中传递复杂参数。 无法自定义请求头除了 Cookie需开启 withCredentials无法在 Header 中携带 Token 等鉴权信息。适用场景简单的实时通知、股票行情、运维大屏等不需要携带复杂鉴权信息或请求参数的公开或 Cookie 鉴权接口。2. fetch API ReadableStream这是现代浏览器提供的原生流式请求方案完全抛弃了 SSE 的协议封装由前端手动解析流数据。特点极高的自由度支持 POST、自定义 Header、完美配合 AbortController 中断请求。代码示例const controller new AbortController(); fetch(/api/sse-connection, { method: POST, // 支持 POST headers: { Content-Type: application/json, Authorization: Bearer YOUR_TOKEN // 支持自定义 Header }, body: JSON.stringify({ prompt: 你好 }), signal: controller.signal // 支持随时中断 }) .then(response { const reader response.body.getReader(); const decoder new TextDecoder(utf-8); let buffer ; // 循环读取流数据 function readStream() { reader.read().then(({ done, value }) { if (done) { console.log(流传输结束); return; } // 将二进制流解码为文本 buffer decoder.decode(value, { stream: true }); // 手动按 \n\n 分割并解析 SSE 格式 (data: xxx) const lines buffer.split(\n\n); buffer lines.pop(); // 保留最后一个可能不完整的片段 lines.forEach(line { if (line.startsWith(data:)) { const data line.replace(data:, ).trim(); console.log(解析出的数据:, data); } }); readStream(); // 继续读取下一块 }); } readStream(); }); // 随时可以调用 controller.abort() 来中断连接例如用户点击“停止生成”缺点 实现极其繁琐需要手动处理 data:、event:、id: 的解析还要自己写断线重连和心跳保活逻辑。适用场景后端返回的不是标准 SSE 格式或者你需要完全掌控请求的每一个细节如自定义超时、极其复杂的重试策略。3. 第三方库microsoft/fetch-event-source 强烈推荐这是微软官方维护的一个轻量级库仅 2KB 左右它完美结合了前两者的优点底层基于 fetch上层封装了标准的 SSE 协议解析和自动重连核心特点支持 POST 和自定义 Header、自动处理 SSE 协议解析、内置可控的自动重连机制。代码示例import { fetchEventSource } from microsoft/fetch-event-source; const ctrl new AbortController(); fetchEventSource(/api/sse-connection, { method: POST, headers: { Content-Type: application/json, Authorization: Bearer YOUR_TOKEN }, body: JSON.stringify({ prompt: 你好 }), signal: ctrl.signal, openWhenHidden: true, // 页面隐藏时保持连接解决移动端切后台断开的问题 onmessage(event) { if (event.event APPROVAL_SUCCESS) { console.log(审批通过:, event.data); } else { console.log(普通消息:, event.data); } }, onerror(err) { // 精细控制重连策略 if (err.status 401 || err.status 403) { throw err; // 鉴权失败直接停止重连 } // 其他错误默认会自动重连 } });适用场景AI 大模型流式对话、需要 Token 鉴权的复杂业务推送、任何需要 POST 请求的 SSE 场景。核心差异对比与最终选型建议表格维度原生 EventSourcefetch ReadableStreammicrosoft/fetch-event-source支持 POST 请求仅支持 GET支持支持自定义 Header不支持支持支持SSE 协议解析浏览器原生自动解析需手动解析库自动解析自动重连浏览器原生无脑重连需手动实现内置可精细控制主动中断请求️ 仅支持 close()AbortControllerAbortController代码复杂度极低几行代码极高需处理流和协议低封装完善外部依赖零依赖零依赖极小依赖 (~2KB)选型总结如果是极其简单、无需鉴权或仅 Cookie 鉴权的 GET 请求直接用原生EventSource。如果是AI 对话、需要 POST 传参、需要 Header 带 Token 鉴权的生产级项目选microsoft/fetch-event-source。除非你有极其特殊的流处理需求比如后端返回的不是标准 SSE 格式否则不建议直接使用裸 fetch ReadableStream。