实现流式输出:Server-Sent Events (SSE) 与 Fetch API

实现流式输出:Server-Sent Events (SSE) 与 Fetch API 流式输出是智能体前端的“生命线”这篇文章把两种主流方案讲透附带可运行的 React 代码流式输出对于智能体AI Agent来说几乎是必备功能。用户问一个问题如果界面转圈七八秒才突然跳出一大段文字体验非常焦虑。但如果是文字一个字一个字往外蹦用户就会觉得“它在思考它在打字”心理等待时间缩短一半以上。大模型生成回答的过程本身就是逐字生成的等待全部生成完再一次性返回浪费了用户宝贵的等待时间。流式输出的做法是后端每生成一个 chunk 就立刻推给前端前端收到一个 chunk 就立刻渲染出来。2026 年几乎所有生产级智能体都已经标配流式输出。这篇文章我会把Server-Sent Events (SSE)和Fetch API 流式响应两种方案从头到尾讲清楚包括原理、代码实现、踩坑经验和性能优化。文中有完整的 React TypeScript 代码你可以直接复制到项目里用。一、两种流式方案对比下面这张流程图可以让你快速理解两种方案的本质区别再来看一个序列图展示 SSE 模式下前后端的交互时序SSE (EventSource)是浏览器原生 API使用极其简单自动处理重连。缺点是无法自定义请求头比如不能携带 Bearer Token只能通过 URL 参数传递认证信息并且是单向的服务端→客户端。Fetch API ReadableStream更灵活可以携带任意请求头支持用户主动打断生成AbortController支持更细粒度的错误处理。缺点是实现稍复杂需要手动解析数据帧和处理重连逻辑。在实际项目中两者我都会用内部测试或简单场景用 SSE 快速验证需要 Token 认证、需要用户打断生成、或需要发送复杂请求体时用 Fetch 流式方案。二、方案一SSE (EventSource) 实现流式输出2.1 后端实现Python FastAPISSE 协议要求服务端设置Content-Type: text/event-stream并且每次发送的数据格式为data: {json}\n\n。每条消息以两个换行符结束。fromfastapiimportFastAPIfromfastapi.responsesimportStreamingResponseimportasyncioimportjson appFastAPI()asyncdefgenerate_stream(prompt:str):# 模拟调用 LLM 并逐字返回# 实际项目中你会调用 OpenAI/Claude/DeepSeek 的 streamTrue 接口full_responsef你问的是{prompt}让我想想……forcharinfull_response:yieldfdata:{json.dumps({type:text,content:char})}\n\nawaitasyncio.sleep(0.05)# 发送结束事件yieldfevent: done\ndata: {{}}\n\napp.get(/api/agent/stream)asyncdefagent_stream(prompt:str):returnStreamingResponse(generate_stream(prompt),media_typetext/event-stream)关键点每个data:行必须以\n\n结尾浏览器才能识别为一条完整消息。可以自定义event:字段来区分消息类型例如event: thought用于显示思考过程前端可以分别监听。结束标记用event: done通知前端关闭连接。2.2 前端 React HookuseSSE由于EventSource不支持自定义请求头认证信息只能通过 URL 参数传递例如/api/stream?tokenxxx。// hooks/useSSE.tsimport{useState,useRef,useCallback}fromreact;interfaceStreamMessage{type?:text|thought|tool_call|done|error;content?:string;[key:string]:any;}exportfunctionuseSSE(){const[isStreaming,setIsStreaming]useState(false);consteventSourceRefuseRefEventSource|null(null);conststartStreamuseCallback((prompt:string,onMessage:(msg:StreamMessage)void,onDone?:()void,onError?:(err:Event)void){if(eventSourceRef.current){eventSourceRef.current.close();}// 如果有 token通过 URL 参数传递不推荐仅用于简单场景consttokenlocalStorage.getItem(token);consturl/api/agent/stream?prompt${encodeURIComponent(prompt)}${token?token${token}:};consteventSourcenewEventSource(url);eventSourceRef.currenteventSource;setIsStreaming(true);// 监听默认的 message 事件无 event 字段的数据eventSource.onmessage(event){try{constdataJSON.parse(event.data);if(data.typedone){eventSource.close();setIsStreaming(false);onDone?.();}else{onMessage(data);}}catch(e){console.error(解析 SSE 消息失败,e);}};// 监听自定义事件后端通过 event: xxx 标识eventSource.addEventListener(thought,(event:any){try{constdataJSON.parse(event.data);onMessage({type:thought,content:data.content});}catch(e){}});eventSource.onerror(error){console.error(SSE 连接错误,error);eventSource.close();setIsStreaming(false);onError?.(error);};},[]);conststopStreamuseCallback((){if(eventSourceRef.current){eventSourceRef.current.close();setIsStreaming(false);}},[]);return{isStreaming,startStream,stopStream};}2.3 在 React 组件中使用// components/ChatWithSSE.tsx import { useState } from react; import { useSSE } from /hooks/useSSE; export function ChatWithSSE() { const [input, setInput] useState(); const [answer, setAnswer] useState(); const { isStreaming, startStream, stopStream } useSSE(); const handleSend () { if (!input.trim() || isStreaming) return; setAnswer(); startStream( input, (msg) { if (msg.type text) { setAnswer(prev prev (msg.content || )); } }, () console.log(完成), (err) console.error(错误, err) ); setInput(); }; return ( div classNamep-4 div classNameborder rounded-lg p-4 min-h-[200px] mb-4 {answer || (isStreaming 正在思考...)} /div div classNameflex gap-2 input value{input} onChange{(e) setInput(e.target.value)} classNameflex-1 border rounded px-2 py-1 disabled{isStreaming} / button onClick{handleSend} disabled{isStreaming}发送/button {isStreaming button onClick{stopStream}停止/button} /div /div ); }三、方案二Fetch API ReadableStream 实现流式输出3.1 后端实现支持认证和更灵活的请求后端同样需要返回Transfer-Encoding: chunked但不需要特殊的text/event-stream类型普通application/json也可以只要数据是分块发送的。fromfastapiimportFastAPIfromfastapi.responsesimportStreamingResponseimportasyncioimportjson appFastAPI()asyncdefgenerate_chunks(prompt:str):# 模拟逐字生成fullf你问{prompt}。答案是……forchinfull:yieldjson.dumps({content:ch})\nawaitasyncio.sleep(0.05)yieldjson.dumps({done:True})\napp.post(/api/agent/chat)asyncdefagent_chat(request:dict):promptrequest.get(message,)returnStreamingResponse(generate_chunks(prompt),media_typeapplication/x-ndjson# 或者 text/plain)这里使用了NDJSON (Newline Delimited JSON)格式每个 chunk 是一个 JSON 字符串以换行符分隔解析非常方便。3.2 前端 React HookuseStreamingChat这个 Hook 支持请求头认证、用户主动打断、错误重试等功能。// hooks/useStreamingChat.tsimport{useState,useRef,useCallback}fromreact;exportfunctionuseStreamingChat(){const[answer,setAnswer]useState();const[isLoading,setIsLoading]useState(false);constabortControllerRefuseRefAbortController|null(null);constsendMessageuseCallback(async(message:string,onChunk?:(chunk:string)void){// 取消之前的请求if(abortControllerRef.current){abortControllerRef.current.abort();}constcontrollernewAbortController();abortControllerRef.currentcontroller;setAnswer();setIsLoading(true);try{constresponseawaitfetch(/api/agent/chat,{method:POST,headers:{Content-Type:application/json,Authorization:Bearer${localStorage.getItem(token)},},body:JSON.stringify({message}),signal:controller.signal,});if(!response.ok)thrownewError(HTTP${response.status});if(!response.body)thrownewError(No response body);constreaderresponse.body.getReader();constdecodernewTextDecoder(utf-8);letbuffer;while(true){const{done,value}awaitreader.read();if(done)break;bufferdecoder.decode(value,{stream:true});constlinesbuffer.split(\n);bufferlines.pop()||;for(constlineoflines){if(line.trim()){try{constdataJSON.parse(line);if(data.content){constchunkdata.content;setAnswer(prevprevchunk);onChunk?.(chunk);}if(data.done){// 完成}}catch(e){console.warn(JSON 解析失败,line);}}}}}catch(error:any){if(error.nameAbortError){console.log(请求已取消);}else{console.error(流式请求失败,error);setAnswer(抱歉网络出错了请稍后重试。);}}finally{setIsLoading(false);abortControllerRef.currentnull;}},[]);conststopGenerationuseCallback((){if(abortControllerRef.current){abortControllerRef.current.abort();}},[]);return{answer,isLoading,sendMessage,stopGeneration};}3.3 React 组件中使用带打断功能// components/ChatWithFetch.tsx import { useState } from react; import { useStreamingChat } from /hooks/useStreamingChat; export function ChatWithFetch() { const [input, setInput] useState(); const { answer, isLoading, sendMessage, stopGeneration } useStreamingChat(); const handleSend async () { if (!input.trim() || isLoading) return; await sendMessage(input); setInput(); }; return ( div classNameflex flex-col h-screen p-4 div classNameflex-1 overflow-auto border rounded p-4 mb-4 {answer || (isLoading 正在接收回答...)} /div div classNameflex gap-2 input value{input} onChange{(e) setInput(e.target.value)} classNameflex-1 border rounded px-3 py-2 disabled{isLoading} onKeyDown{(e) e.key Enter handleSend()} / button onClick{handleSend} disabled{isLoading}发送/button {isLoading button onClick{stopGeneration}停止/button} /div /div ); }四、错误处理与重连机制SSE 原生支持自动重连间隔约 3 秒但fetch方案需要手动实现。我们可以封装一个带指数退避的重连逻辑classRobustStreamClient{privateretryCount0;privatemaxRetries5;privatebaseDelay1000;privateisActivefalse;asyncconnect(url:string,onMessage:(data:any)void){this.isActivetrue;while(this.retryCountthis.maxRetriesthis.isActive){try{awaitthis._doConnect(url,onMessage);this.retryCount0;// 成功后重置break;}catch(err){this.retryCount;constdelaythis.baseDelay*Math.pow(2,this.retryCount-1);console.log(${delay}ms 后重试...);awaitthis.sleep(delay);}}}privateasync_doConnect(url:string,onMessage:(data:any)void){constresponseawaitfetch(url);constreaderresponse.body!.getReader();// ... 读取流调用 onMessage}privatesleep(ms:number){returnnewPromise(resolvesetTimeout(resolve,ms));}disconnect(){this.isActivefalse;}}另外对于网络超时如移动端信号差可以在前端设置一个定时器如果 10 秒内没有收到任何数据主动中断并提示用户。五、性能优化与用户体验细节5.1 节流渲染流式输出时高频调用setAnswer会导致组件频繁重绘。React 18 的自动批处理已经能合并大部分更新但如果仍然感觉卡顿可以使用useDeferredValue或手动节流const[answer,setAnswer]useState();constdeferredAnsweruseDeferredValue(answer);// 渲染时使用 deferredAnswer避免高频渲染阻塞用户输入或者使用throttleconstthrottledAppenduseCallback(throttle((chunk:string){setAnswer(prevprevchunk);},50),[]);5.2 显示“正在输入”指示器在第一个 chunk 到达之前显示一个闪烁光标或三个点动画提升等待体验。{isLoading answer.length 0 ( div classNametyping-indicator.../div )}5.3 移动端适配使用-webkit-overflow-scrolling: touch让滚动更流畅。在页面可见性变化时暂停渲染document.visibilityState节省流量。useEffect((){consthandleVisibilityChange(){if(document.hidden){// 暂停流式更新或降低频率}};document.addEventListener(visibilitychange,handleVisibilityChange);return()document.removeEventListener(visibilitychange,handleVisibilityChange);},[]);5.4 自动滚动到最新消息配合消息列表的滚动容器每次收到新 chunk 时滚动到底部前提是用户未主动上滚。constmessagesEndRefuseRefHTMLDivElement(null);useEffect((){messagesEndRef.current?.scrollIntoView({behavior:smooth});},[answer]);六、生产环境注意事项6.1 Nginx 代理配置如果前端通过 Nginx 反向代理后端必须关闭缓冲否则 SSE 会被缓存到一定大小才发送失去流式效果。location /api/agent/ { proxy_pass http://backend; proxy_buffering off; proxy_cache off; proxy_set_header X-Accel-Buffering no; proxy_http_version 1.1; chunked_transfer_encoding off; }6.2 浏览器兼容性SSE 在所有现代浏览器Chrome、Firefox、Safari、Edge中均受支持。fetch流式读取需要 ReadableStream同样被广泛支持IE 除外。移动端表现良好。6.3 大流量下的资源管理每个 SSE 连接都会占用一个文件描述符和内存。当并发连接数很高时例如上千需要考虑使用 HTTP/2 多路复用或者改用 WebSocket 消息队列。对于大多数企业内部智能体SSE 完全够用。6.4 安全与认证SSE 方案由于无法添加自定义请求头推荐在 URL 中使用短期有效的 token并配合HttpOnlyCookie 做辅助认证。Fetch 方案可以直接在Authorization头中携带 Bearer Token更安全。七、总结流式输出是智能体前端体验的“基本盘”。用户不会因为你用了多先进的 Agent 框架而赞叹但会因为回答是一个字一个字蹦出来而觉得“好快”。两种方案的选择场景推荐方案快速原型、内部工具、不需要用户认证SSE (EventSource)生产环境、需要 Token 认证、用户可打断生成Fetch ReadableStream本文提供了完整的后端FastAPI和前端React TypeScript代码示例以及错误处理、重连、性能优化等生产级细节。你可以根据自己的项目需求直接复制代码进行修改。最后送上一张完整的时序图帮助你从整体上理解流式交互的全过程