基于 Python + LangChain + React 的 AI 流式对话与历史存储实战

基于 Python + LangChain + React 的 AI 流式对话与历史存储实战 前言在大语言模型LLM飞速发展的今天将 AI 对话能力集成到自己的应用中已经成为非常普遍的需求。然而一个生产级别的 AI 聊天应用不仅仅是调用 API 那么简单——流式输出让用户体验更自然对话历史持久化确保数据不丢失多会话管理让用户可以在不同话题间自由切换。本文将基于一个完整的实战项目带你从零搭建一个包含以下功能的 AI 聊天系统后端Python Flask LangChain对接通义千问Qwen大模型支持 SSE 流式输出前端React Zustand 状态管理实现类似 DeepSeek 的对话界面数据库MySQL 存储对话历史支持多会话、重命名、置顶等管理功能一、项目架构概览├── test-project/ # 后端Python Flask │ ├── server.py # 主服务所有 API 端点 │ ├── chat_qwen.py # 通义千问模型封装 │ ├── chat_history.py # 对话历史数据库操作 │ ├── rag_service.py # RAG 知识检索服务 │ ├── config.py # 数据库配置 │ ├── init_chat_history.sql # 建表 SQL │ └── .env # 环境变量API Key │ └── my-react-app/ # 前端React ├── src/view/ │ ├── pages/chat/ # 聊天页面组件 │ │ ├── Chat.jsx │ │ ├── Chat.module.scss │ │ └── components/ │ │ ├── ChatSidebar.jsx # 侧边栏对话列表 │ │ ├── ChatMessages.jsx # 消息气泡区域 │ │ └── ChatInput.jsx # 输入框 │ ├── store/chatStore.js # Zustand 状态管理 │ └── services/chatService.js # API 服务SSE REST ├── package.json └── server.js # Node.js 辅助服务二、数据库设计2.1 建表 SQL我们需要两张表chat_conversations对话表和chat_messages消息表。-- 聊天历史相关表 -- 在 pytosql 数据库中执行 CREATE TABLE IF NOT EXISTS chat_conversations ( id VARCHAR(36) PRIMARY KEY COMMENT 会话UUID, title VARCHAR(200) DEFAULT 新对话 COMMENT 会话标题, use_rag TINYINT DEFAULT 0 COMMENT 是否使用RAG, pinned TINYINT DEFAULT 0 COMMENT 是否置顶, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS chat_messages ( id INT AUTO_INCREMENT PRIMARY KEY, conversation_id VARCHAR(36) NOT NULL COMMENT 关联会话ID, role ENUM(user, assistant, system) NOT NULL COMMENT 消息角色, content TEXT NOT NULL COMMENT 消息内容, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (conversation_id) REFERENCES chat_conversations(id) ON DELETE CASCADE, INDEX idx_msg_conv (conversation_id) );2.2 表结构说明字段说明chat_conversations.idUUID 格式的主键与前端会话 ID 一一对应chat_conversations.title对话标题取用户第一条消息的前 20 个字符自动生成chat_conversations.pinned置顶标记置顶的对话排在列表最前面chat_messages.role消息角色user用户、assistantAI、system系统chat_messages.conversation_id外键关联对话表设置ON DELETE CASCADE实现级联删除为什么用 UUID 而不是自增 IDUUID 在分布式场景下更安全且前端可以先生成 ID 再与后端同步避免 ID 冲突。三、后端实现3.1 环境准备pip install flask flask-cors langchain-openai langchain-community pymysql python-dotenv faiss-cpu3.2 数据库配置config.pyDB_CONFIG { host: localhost, port: 3306, user: root, password: your_password, database: pytosql, charset: utf8mb4, }3.3 对话历史管理模块chat_history.py这个模块封装了所有与对话和消息相关的数据库操作import uuid import pymysql from config import DB_CONFIG def _get_conn(): return pymysql.connect(**DB_CONFIG) def create_conversation(use_ragFalse): 创建新对话返回对话 ID conv_id str(uuid.uuid4()) conn _get_conn() try: cursor conn.cursor() cursor.execute( INSERT INTO chat_conversations (id, use_rag) VALUES (%s, %s), (conv_id, 1 if use_rag else 0) ) conn.commit() return conv_id finally: conn.close() def list_conversations(): 获取所有对话列表置顶优先再按更新时间倒序 conn _get_conn() try: cursor conn.cursor() cursor.execute( SELECT id, title, use_rag, pinned, created_at, updated_at FROM chat_conversations ORDER BY pinned DESC, updated_at DESC ) rows cursor.fetchall() return [ { id: row[0], title: row[1], useRag: bool(row[2]), pinned: bool(row[3]), createdAt: str(row[4]) if row[4] else , updatedAt: str(row[5]) if row[5] else , } for row in rows ] finally: conn.close() def get_messages(conv_id): 获取某个对话的所有消息 conn _get_conn() try: cursor conn.cursor() cursor.execute( SELECT id, role, content, created_at FROM chat_messages WHERE conversation_id %s ORDER BY id, (conv_id,) ) rows cursor.fetchall() return [ {id: row[0], role: row[1], content: row[2], createdAt: str(row[3]) if row[3] else } for row in rows ] finally: conn.close() def save_message(conv_id, role, content): 保存一条消息并更新对话时间 conn _get_conn() try: cursor conn.cursor() cursor.execute( INSERT INTO chat_messages (conversation_id, role, content) VALUES (%s, %s, %s), (conv_id, role, content) ) cursor.execute( UPDATE chat_conversations SET updated_at NOW() WHERE id %s, (conv_id,) ) conn.commit() finally: conn.close() def update_title(conv_id, title): 更新对话标题 conn _get_conn() try: cursor conn.cursor() cursor.execute( UPDATE chat_conversations SET title %s WHERE id %s, (title, conv_id) ) conn.commit() finally: conn.close() def toggle_pin(conv_id): 切换置顶状态 conn _get_conn() try: cursor conn.cursor() cursor.execute( UPDATE chat_conversations SET pinned IF(pinned 1, 0, 1) WHERE id %s, (conv_id,) ) conn.commit() finally: conn.close() def delete_conversation(conv_id): 删除对话及其所有消息级联删除 conn _get_conn() try: cursor conn.cursor() cursor.execute(DELETE FROM chat_conversations WHERE id %s, (conv_id,)) conn.commit() finally: conn.close() def get_or_create_conversation(conv_idNone, use_ragFalse): 获取已有对话或创建新对话 if conv_id: conn _get_conn() try: cursor conn.cursor() cursor.execute( SELECT id FROM chat_conversations WHERE id %s, (conv_id,) ) if cursor.fetchone(): return conv_id finally: conn.close() return create_conversation(use_raguse_rag)3.4 Flask 主服务server.py3.4.1 基础配置与模型初始化import os import uuid import json from dotenv import load_dotenv from flask import Flask, request, jsonify, send_from_directory, Response, stream_with_context from flask_cors import CORS from langchain_openai import ChatOpenAI from langchain_core.messages import HumanMessage, AIMessage, SystemMessage from chat_history import ( create_conversation, list_conversations, get_messages, save_message, update_title, delete_conversation, get_or_create_conversation, toggle_pin ) load_dotenv() app Flask(__name__, static_folderstatic) CORS(app) # 允许跨域请求 SYSTEM_PROMPT 你是一个有用的AI助手请用中文回答问题。 def get_chat_model(): 创建通义千问聊天模型 return ChatOpenAI( modelqwen-plus, openai_api_keyos.getenv(DASHSCOPE_API_KEY), openai_api_basehttps://dashscope.aliyuncs.com/compatible-mode/v1, )关键点通义千问提供了 OpenAI 兼容的 API 接口所以可以直接使用 LangChain 的ChatOpenAI类只需修改openai_api_base指向阿里云的 DashScope 端点即可。3.4.2 对话管理 API# 创建新对话 app.route(/api/conversations, methods[POST]) def new_conversation(): data request.json or {} use_rag data.get(use_rag, False) try: conv_id create_conversation(use_raguse_rag) return jsonify({code: 200, data: {id: conv_id}}) except Exception as e: return jsonify({code: 500, msg: str(e)}) # 获取所有对话列表 app.route(/api/conversations, methods[GET]) def get_conversations(): try: return jsonify({code: 200, data: list_conversations()}) except Exception as e: return jsonify({code: 500, msg: str(e)}) # 获取某个对话的消息列表 app.route(/api/conversations/conv_id/messages, methods[GET]) def get_conv_messages(conv_id): try: return jsonify({code: 200, data: get_messages(conv_id)}) except Exception as e: return jsonify({code: 500, msg: str(e)}) # 删除对话 app.route(/api/conversations/conv_id, methods[DELETE]) def remove_conversation(conv_id): try: delete_conversation(conv_id) return jsonify({code: 200, msg: 删除成功}) except Exception as e: return jsonify({code: 500, msg: str(e)}) # 重命名对话 app.route(/api/conversations/conv_id/rename, methods[PUT]) def rename_conversation(conv_id): data request.json title data.get(title, ).strip() if not title: return jsonify({code: 400, msg: 标题不能为空}) try: update_title(conv_id, title) return jsonify({code: 200, msg: 重命名成功}) except Exception as e: return jsonify({code: 500, msg: str(e)}) # 切换置顶 app.route(/api/conversations/conv_id/pin, methods[PUT]) def pin_conversation(conv_id): try: toggle_pin(conv_id) return jsonify({code: 200, msg: 操作成功}) except Exception as e: return jsonify({code: 500, msg: str(e)})3.4.3 SSE 流式聊天接口核心这是整个后端最核心的部分实现了 Server-Sent EventsSSE流式输出app.route(/api/chat/stream, methods[POST]) def chat_stream(): data request.json user_message data.get(message, ).strip() session_id data.get(session_id) use_rag data.get(use_rag, False) if not user_message: return jsonify({error: 消息不能为空}), 400 def generate(): full_reply conv_id None try: # 获取或创建对话 conv_id get_or_create_conversation(session_id, use_raguse_rag) # 保存用户消息到数据库 save_message(conv_id, user, user_message) # 加载已有消息历史 existing get_messages(conv_id) # 第一轮对话自动生成标题 if len(existing) 1: title user_message[:20] (... if len(user_message) 20 else ) update_title(conv_id, title) # 构建 LangChain 消息历史 history [SystemMessage(contentSYSTEM_PROMPT)] for msg in existing: if msg[role] user: history.append(HumanMessage(contentmsg[content])) elif msg[role] assistant: history.append(AIMessage(contentmsg[content])) history.append(HumanMessage(contentuser_message)) # 调用 LLM 流式生成 llm get_chat_model() for chunk in llm.stream(history): token chunk.content if token: full_reply token # 注意必须 yield bytes 类型 yield ( fdata: {json.dumps({token: token}, ensure_asciiFalse)}\n\n ).encode(utf-8) # 保存 AI 回复到数据库 if full_reply: save_message(conv_id, assistant, full_reply) # 发送完成信号 yield ( fdata: {json.dumps({done: True, session_id: conv_id}, ensure_asciiFalse)}\n\n ).encode(utf-8) except Exception as e: import traceback traceback.print_exc() yield ( fdata: {json.dumps({error: str(e)}, ensure_asciiFalse)}\n\n ).encode(utf-8) resp Response( stream_with_context(generate()), mimetypetext/event-stream, ) resp.headers[Cache-Control] no-cache resp.headers[X-Accel-Buffering] no resp.headers[Connection] keep-alive return respSSE 实现的几个关键踩坑点问题原因解决方案AssertionError: applications must write bytesWerkzeug 要求生成器 yield bytes 而非 str所有yield后加.encode(utf-8)ERR_INCOMPLETE_CHUNKED_ENCODING响应流被提前截断数据库操作放在generate()内部错误通过 SSE 传回前端前端收不到数据direct_passthrough与stream_with_context冲突去掉direct_passthrough使用标准的Response3.4.4 启动服务if __name__ __main__: app.run(debugTrue, port5000, use_reloaderFalse)注意use_reloaderFalse很重要。Flask 的 reloader 会在独立进程中重启应用导致 SSE 流式连接中断。四、前端实现4.1 安装依赖npm install antd ant-design/x^1.0.5 react-markdown zustand react-router-dom sass注意ant-design/x要安装 v1.x 版本v2.x 需要 antd v6。4.2 状态管理chatStore.js使用 Zustand 管理全局聊天状态搭配persist中间件持久化关键配置import { create } from zustand; import { persist } from zustand/middleware; export const useChatStore create( persist( (set, get) ({ conversations: [], // 对话列表 currentConversationId: null, // 当前对话 ID messages: [], // 当前对话的消息 isStreaming: false, // 是否正在流式输出 streamingContent: , // 流式输出中的内容 useRag: false, // RAG 模式开关 sidebarCollapsed: false, // 侧边栏折叠状态 // 切换对话 selectConversation: (id) set({ currentConversationId: id, messages: [], streamingContent: , isStreaming: false, }), // 流式输出控制 startStreaming: () set({ isStreaming: true, streamingContent: }), appendStreamContent: (token) set((state) ({ streamingContent: state.streamingContent token, })), finishStreaming: () set((state) ({ isStreaming: false, streamingContent: , messages: [...state.messages, { id: Date.now(), role: assistant, content: state.streamingContent, createdAt: new Date().toISOString(), }], })), cancelStreaming: () set((state) { const partial state.streamingContent; return partial ? { isStreaming: false, streamingContent: , messages: [...state.messages, { id: Date.now(), role: assistant, content: partial, createdAt: new Date().toISOString() }] } : { isStreaming: false, streamingContent: }; }), toggleSidebar: () set((s) ({ sidebarCollapsed: !s.sidebarCollapsed })), }), { name: chat-storage, // 只持久化配置项不持久化消息消息从后端数据库加载 partialize: (state) ({ currentConversationId: state.currentConversationId, useRag: state.useRag, }), } ) );4.3 API 服务层chatService.js封装了所有与后端交互的逻辑包括 SSE 流式请求const BASE_URL process.env.REACT_APP_CHAT_API_URL || http://localhost:5000; export class ChatService { // REST 接口封装 async getConversations() { /* GET /api/conversations */ } async getMessages(id) { /* GET /api/conversations/:id/messages */ } async createConversation(useRag false) { /* POST /api/conversations */ } async deleteConversation(id) { /* DELETE /api/conversations/:id */ } async renameConversation(id, title) { /* PUT /api/conversations/:id/rename */ } async togglePin(id) { /* PUT /api/conversations/:id/pin */ } /** * SSE 流式聊天 —— 核心方法 * 使用 fetch ReadableStream 手动解析 SSE 数据 */ streamChat(message, sessionId, useRag, callbacks) { const { onToken, onDone, onError } callbacks; const controller new AbortController(); fetch(${BASE_URL}/api/chat/stream, { method: POST, headers: { Content-Type: application/json }, body: JSON.stringify({ message, session_id: sessionId || null, use_rag: useRag, }), signal: controller.signal, }) .then(async (response) { if (!response.ok) throw new Error(HTTP ${response.status}); const reader response.body.getReader(); const decoder new TextDecoder(); let buffer ; while (true) { const { done, value } await reader.read(); if (done) break; buffer decoder.decode(value, { stream: true }); // 按 SSE 双换行分割数据帧 const parts buffer.split(\n\n); buffer parts.pop(); // 保留不完整的尾部数据 for (const part of parts) { for (const line of part.split(\n)) { if (!line.startsWith(data: )) continue; const raw line.slice(6); try { const parsed JSON.parse(raw); if (parsed.error) { onError(new Error(parsed.error)); return; } if (parsed.token) onToken(parsed.token); if (parsed.done) { onDone(parsed.session_id); return; } } catch { /* 忽略解析失败的行 */ } } } } onDone(); }) .catch((err) { if (err.name ! AbortError) onError(err); }); return controller; // 返回控制器可用于中断请求 } } export const chatService new ChatService();为什么不使用EventSourceEventSource只支持 GET 请求而我们的流式接口是 POST所以使用fetchReadableStream手动解析 SSE 格式。这也是业界通用做法。4.4 聊天页面组件4.4.1 主页面Chat.jsximport React, { useEffect } from react; import { useChatStore } from ../../store/chatStore; import { chatService } from ../../services/chatService; import ChatSidebar from ./components/ChatSidebar; import ChatMessages from ./components/ChatMessages; import ChatInput from ./components/ChatInput; import styles from ./Chat.module.scss; export default function Chat() { const { currentConversationId, sidebarCollapsed, setMessages, toggleSidebar } useChatStore(); // 切换对话时从数据库加载历史消息 useEffect(() { if (!currentConversationId) { setMessages([]); return; } chatService.getMessages(currentConversationId) .then((msgs) setMessages(msgs.map((m) ({ id: m.id, role: m.role, content: m.content, createdAt: m.createdAt, })))) .catch(() setMessages([])); }, [currentConversationId]); return ( div className{styles.chatPage} {!sidebarCollapsed ChatSidebar /} {sidebarCollapsed ( div className{styles.collapsedBar} onClick{toggleSidebar}展开侧栏/div )} div className{styles.mainArea} ChatMessages / ChatInput / /div /div ); }4.4.2 侧边栏ChatSidebar.jsx侧边栏包含对话列表、新建对话按钮、搜索弹窗、重命名/置顶/删除菜单// 对话项组件 —— 悬停显示操作菜单 function ConversationItem({ conv, isActive, onSelect, onRefresh }) { const [renaming, setRenaming] useState(false); const menuItems { items: [ { key: rename, icon: EditOutlined /, label: 重命名 }, { key: pin, icon: conv.pinned ? PushpinFilled / : PushpinOutlined /, label: conv.pinned ? 取消置顶 : 置顶 }, { type: divider }, { key: delete, icon: DeleteOutlined /, label: 删除, danger: true }, ], onClick: ({ key, domEvent }) { domEvent.stopPropagation(); // 处理各操作... }, }; return ( div className{${styles.conversationItem} ${isActive ? styles.active : }} onClick{() !renaming onSelect(conv.id)} {conv.pinned PushpinFilled style{{ color: #1677ff, fontSize: 12 }} /} {renaming ? ( Input sizesmall value{newTitle} onPressEnter{handleRename} onBlur{() setRenaming(false)} autoFocus / ) : ( span className{styles.conversationTitle}{conv.title}/span )} Dropdown menu{menuItems} trigger{[click]} EllipsisOutlined className{styles.ellipsisIcon} onClick{(e) e.stopPropagation()} / /Dropdown /div ); }对话列表按时间自动分组今天 / 昨天 / 7天内 / 30天内 / 更早置顶对话始终在最前面。4.4.3 消息展示ChatMessages.jsxexport default function ChatMessages() { const { messages, isStreaming, streamingContent } useChatStore(); // 无消息时显示欢迎页 if (messages.length 0 !isStreaming !streamingContent) { return ( div className{styles.welcome} RobotOutlined className{styles.icon} / h2AI Chat 助手/h2 p点击「开启新对话」或直接输入消息开始聊天/p /div ); } return ( div className{styles.messagesArea} div className{styles.messageList} {/* 已完成的消息 */} {messages.map((msg) ( div key{msg.id} className{${styles.messageItem} ${styles[msg.role]}} Avatar icon{msg.role user ? UserOutlined / : RobotOutlined /} / div className{${styles.messageContent} ${styles[msg.role]}} {msg.role assistant ? ReactMarkdown{msg.content}/ReactMarkdown : msg.content} /div /div ))} {/* 流式输出中的消息带闪烁光标 */} {isStreaming streamingContent ( div className{${styles.messageItem} ${styles.assistant}} Avatar icon{RobotOutlined /} / div className{${styles.messageContent} ${styles.assistant}} ReactMarkdown{streamingContent}/ReactMarkdown span className{styles.streamingCursor} / /div /div )} {/* 思考中状态 */} {isStreaming !streamingContent div思考中.../div} /div /div ); }4.4.4 输入框ChatInput.jsxexport default function ChatInput() { const [inputValue, setInputValue] useState(); const abortRef useRef(null); const handleSend useCallback(() { const text inputValue.trim(); if (!text || isStreaming) return; // 更新 UI setInputValue(); addMessage({ id: Date.now(), role: user, content: text, createdAt: new Date().toISOString() }); startStreaming(); // 发起 SSE 流式请求 abortRef.current chatService.streamChat( text, currentConversationId, useRag, { onToken: (token) appendStreamContent(token), onDone: (sessionId) { finishStreaming(); if (sessionId) setCurrentConversationId(sessionId); // 刷新侧边栏对话列表 chatService.getConversations() .then(list useChatStore.getState().setConversations(list)); }, onError: (err) { message.error(err.message || 请求失败); cancelStreaming(); }, } ); }, [inputValue, isStreaming, currentConversationId, useRag]); // Enter 发送ShiftEnter 换行 const handleKeyDown (e) { if (e.key Enter !e.shiftKey) { e.preventDefault(); handleSend(); } }; return ( div className{styles.inputArea} textarea value{inputValue} onChange{handleChange} onKeyDown{handleKeyDown} placeholder输入消息Enter 发送ShiftEnter 换行... / {isStreaming ? Button danger icon{StopOutlined /} onClick{handleStop} / : Button typeprimary icon{SendOutlined /} onClick{handleSend} disabled{!inputValue.trim()} / } /div ); }4.5 路由配置聊天页面使用独立路由不嵌套在 Layout 内因为它有自己全屏的侧边栏布局const router createBrowserRouter([ { path: /, element: Layout /, children: [ { path: /, element: Home / }, { path: /count, element: Count / }, // ... 其他页面 ] }, { path: /chat, element: Chat / // 独立路由全屏布局 } ]);五、完整数据流整个系统的数据流如下用户输入消息 ↓ ChatInput.handleSend() ↓ chatService.streamChat() ──── HTTP POST ────→ Flask /api/chat/stream ↓ ↓ onToken(token) ←──── SSE data ────── llm.stream(history) 逐 token 输出 ↓ ↓ appendStreamContent() save_message() 保存到 MySQL ↓ ChatMessages 实时渲染带闪烁光标 ↓ onDone() → finishStreaming() ↓ messages 数组更新流式光标消失六、启动与测试6.1 初始化数据库6.2 启动后端cd test-project python server.py # 聊天机器人服务启动: http://127.0.0.1:50006.3 启动前端cd my-react-app npm start # 访问 http://localhost:3000/chat6.4 验证功能输入消息并发送 → AI 流式逐 token 回答点击「开启新对话」→ 创建空白对话侧边栏按时间分组展示历史对话悬停对话项 → 显示菜单重命名 / 置顶 / 删除刷新页面 → 历史消息从数据库恢复七、总结本文实现了一个完整的 AI 流式聊天系统涵盖了以下技术要点SSE 流式输出Flask LangChain 的llm.stream()实现逐 token 推送前端fetch ReadableStream手动解析 SSE 数据帧对话持久化MySQL 存储对话和消息支持多会话管理、历史恢复前端状态管理Zustand 的persist中间件持久化用户配置消息数据从后端按需加载对话管理重命名、置顶、删除、时间分组提供完整的会话管理体验这个架构可以进一步扩展添加用户认证、接入更多 LLM 模型、支持图片/文件上传、实现 RAG 知识库检索等。希望本文能为你的 AI 应用开发提供参考。注意⚠本文已分享完整流程及主要代码完整代码可后台私信联系