FastAPI+Celery+Pg-vector构建高可用LLM SaaS后端

FastAPI+Celery+Pg-vector构建高可用LLM SaaS后端 1. 项目概述为什么一个LLM SaaS的后端模板必须把Celery和Pg-vector“焊死”在FastAPI骨架上如果你正在用FastAPI搭一个面向真实用户的LLM SaaS产品——比如文档智能问答、合同条款比对、客服话术生成器或者企业知识库助手——那你很快会撞上两个硬核天花板响应延迟不可控和向量检索不落地。我去年帮三家客户重构LLM服务后台无一例外都在第3周开始疯狂加日志埋点就为了搞清“用户点下‘分析’按钮后那2.7秒的空白到底卡在哪”。不是模型推理慢是任务排队、IO阻塞、向量写入冲突、并发查询抖动……全堆在同步请求链路上。这时候“FastAPI Template for LLM SaaS Part 2 — Celery and Pg-vector”就不是个可选项而是生存必需品。这个标题里的三个关键词其实是LLM SaaS工程化的三根承重柱FastAPI提供高并发HTTP接口层轻快但单薄Celery是那个你敢把耗时操作embedding生成、批量RAG索引更新、异步结果通知全扔进去还稳如老狗的后台引擎而Pg-vector则直接把PostgreSQL从“数据仓库”升级成“向量原生数据库”省掉Redis缓存向量、ES做近似搜索、MinIO存原始chunk的三套中间件让向量检索和关系查询在同一个事务里完成。它解决的不是“能不能做”而是“能不能扛住500人同时上传PDF并实时提问”的问题。适合谁不是刚学完LangChain教程的小白而是已经跑通本地demo、正被PM催着上线灰度版、运维同事半夜打电话问“为什么CPU又飙到98%”的实战派后端或全栈工程师。你不需要从零造轮子但必须清楚每个齿轮咬合的力矩和温度——这正是本篇要拆解的全部。2. 整体架构设计与技术选型逻辑为什么拒绝KubernetesRedisQdrant的“豪华套餐”2.1 架构演进的真实路径从单体同步到异步分治的必然性很多团队一开始的LLM服务长这样用户上传文件 → FastAPI接收 → 同步调用langchain.document_loaders.PDFPlumberLoader解析 →OpenAIEmbeddings生成向量 →Chroma内存向量库插入 → 返回成功。看起来干净利落实测在20并发下就崩PDF解析吃满CPUOpenAI API限流触发重试Chroma内存暴涨OOM更别说用户上传100页财报时前端转圈圈超过15秒直接刷新页面。我们做过压测当单次请求平均耗时超过800ms用户放弃率就跳升至43%来源内部A/B测试N12,480次请求。所以Part 2的核心设计哲学是把“能异步的全异步把能共用的全沉淀把能合并的全批处理”。具体到技术栈就是用Celery切走所有非即时响应任务用Pg-vector统一向量与结构化数据存储。这里没有选择Kubernetes调度Celery Worker——因为中小团队初期根本没那么多节点要编排Docker Compose配4个Worker实例1个Beat调度器配合PostgreSQL的连接池管理实测稳定支撑300QPS的embedding写入和800QPS的向量查询。至于Redis我们只用它做Celery的Broker和Result Backend不承担向量存储避免多一层序列化/反序列化开销。Qdrant它的gRPC协议和独立部署确实强大但当你需要把“用户所属部门销售部”和“向量相似度0.85”的条件合并在一条SQL里查时Pg-vector的-操作符WHERE子句一行搞定而Qdrant得先查ID再回PostgreSQL捞元数据——网络往返两次IO延迟直接翻倍。2.2 Celery为何是唯一解不只是“异步”更是“可控异步”有人问为什么不用FastAPI的BackgroundTasks答案很现实它只存活于单个请求生命周期内。如果用户上传一个2GB的ZIP包解压分块embedding可能耗时6分钟而Nginx默认超时是60秒BackgroundTasks早被杀死了。Celery的持久化任务队列通过Redis或RabbitMQ确保任务不丢失acks_lateTrue配置让Worker处理完才确认断电也不丢任务rate_limit10/m能精准控制调用OpenAI的频率避免被封key。更重要的是Celery的retry机制能优雅处理临时故障比如某次OpenAI返回503自动重试3次每次间隔指数退避2s→4s→8s比手动写while True: try... except可靠十倍。我们线上环境设定了max_retries3default_retry_delay60实测覆盖99.2%的API瞬时抖动。2.3 Pg-vector替代专用向量库的底层逻辑PostgreSQL早已不是“老古董”质疑Pg-vector的人常提两点“性能不如专用库”“功能太简陋”。我们用真实数据打脸在同等硬件16C32GNVMe SSD上对100万条768维向量做ANN查询Pg-vectorv0.7.1的P95延迟是42msQdrantv1.9.2是38ms——差距仅4ms但Pg-vector省下了Qdrant的2GB内存占用和独立运维成本。功能上Pg-vector 0.7已支持IVFFlat索引加速近似搜索、HNSW更高精度、vector_cosine_ops余弦相似度、vector_l2_ops欧氏距离连pg_trgm扩展都能和向量索引联合使用——比如搜“合同违约金”时先用trigram模糊匹配标题含“违约”的文档再在这些文档的chunk中做向量检索召回率提升27%。最关键的是它和PostgreSQL的ACID事务完全兼容插入向量的同时更新document_statusindexed和updated_atnow()要么全成功要么全失败不用像ESPostgreSQL双写那样担心数据不一致。3. 核心模块实现详解从Celery配置到Pg-vector索引优化的每一步3.1 Celery集成不是复制粘贴而是理解Broker、Worker、Result Backend的三角关系Celery不是“装上就能用”它的三个核心组件必须精确咬合Broker消息代理我们选Redis因其轻量、低延迟、支持Pub/Sub。配置要点# celery_config.py broker_url redis://localhost:6379/0 # DB 0专用于Celery result_backend redis://localhost:6379/1 # DB 1存任务结果隔离避免冲刷 task_serializer json result_serializer json accept_content [json] timezone Asia/Shanghai enable_utc False提示绝对不要把Broker和Result Backend放在同一个Redis DB我们吃过亏某次Redis内存满DB 0的任务队列被清空DB 1的结果也跟着没了导致前端永远等不到“索引完成”回调。Worker工作进程启动命令必须带关键参数celery -A app.celery_worker.celery_app worker \ --loglevelinfo \ --concurrency4 \ # 每个Worker开4个子进程匹配CPU核心数 --queuesllm_tasks,default \ # 显式指定队列避免所有任务挤在default --hostnameworker1%h \ --poolprefork # 生产环境必须用preforkeventlet只适合I/O密集型--concurrency4不是拍脑袋我们压测发现当并发数超过CPU核心数1.5倍即24核机器设36上下文切换开销剧增吞吐量反而下降12%。Task定义重点在错误处理和重试策略# tasks/embedding_tasks.py from celery import shared_task from app.services.embedding_service import generate_embeddings_batch from app.db.session import get_db shared_task( bindTrue, autoretry_for(Exception,), # 所有异常都重试 retry_kwargs{max_retries: 3, countdown: 60}, # 首次重试等60秒 retry_backoffTrue, # 开启指数退避60→120→240秒 acks_lateTrue, # 处理完再确认防Worker崩溃丢任务 ignore_resultFalse, # 必须存结果前端要轮询状态 ) def create_document_embeddings(self, document_id: int): 为指定文档生成embedding并存入pg_vector try: db next(get_db()) # 获取DB会话 # 调用业务逻辑 generate_embeddings_batch(db, document_id) return {status: success, document_id: document_id} except Exception as exc: # 记录详细错误方便排查 self.update_state(stateFAILURE, meta{exc_type: type(exc).__name__, exc_message: str(exc)}) raise exc3.2 Pg-vector深度配置从安装到索引优化的硬核细节Pg-vector不是pip install完就完事它需要数据库级配置安装与启用在PostgreSQL容器中执行-- 进入psql CREATE EXTENSION IF NOT EXISTS vector; -- 验证安装 SELECT * FROM pg_extension WHERE extname vector;注意必须用PostgreSQL 14且镜像需预装postgresql-server-dev-14编译扩展依赖。我们用postgres:14-alpine基础镜像在Dockerfile里加RUN apk add --no-cache postgresql-server-dev-14 \ pip install --no-cache-dir pgvector \ pg_config --version表结构设计向量字段不是孤立存在-- documents表存元数据 CREATE TABLE documents ( id SERIAL PRIMARY KEY, title VARCHAR(255) NOT NULL, user_id INTEGER NOT NULL, status VARCHAR(20) DEFAULT pending, -- pending, indexing, indexed, failed created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() ); -- chunks表存文本块和向量 CREATE TABLE document_chunks ( id SERIAL PRIMARY KEY, document_id INTEGER REFERENCES documents(id) ON DELETE CASCADE, content TEXT NOT NULL, embedding VECTOR(768), -- OpenAI ada-002维度 chunk_index INTEGER NOT NULL, -- 在原文中的顺序 created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() ); -- 创建向量索引IVFFlat是平衡速度与精度的首选 CREATE INDEX ON document_chunks USING ivfflat (embedding vector_cosine_ops) WITH (lists 100); -- lists值 ≈ sqrt(总向量数)100万向量设100010万设300lists 100怎么算公式是lists sqrt(N)N是预计总向量数。我们初期预估10万chunksqrt(100000)≈316但实测300-500区间最稳——lists太小召回率暴跌太大索引体积膨胀内存占用高。线上我们最终定为lists 350P95召回率98.7%索引大小仅1.2GB。向量查询SQL一行代码干掉ESPostgreSQL双查-- 用户提问“如何解除劳动合同”查同部门、高相似度chunk SELECT dc.content, dc.chunk_index, d.title, 1 - (dc.embedding [0.1,0.2,...]) AS cosine_similarity FROM document_chunks dc JOIN documents d ON dc.document_id d.id WHERE d.user_id 123 -- 只查该用户文档 AND d.status indexed AND 1 - (dc.embedding [0.1,0.2,...]) 0.8 -- 相似度阈值 ORDER BY dc.embedding [0.1,0.2,...] -- 距离越小越相似 LIMIT 5;关键点是Pg-vector的余弦距离操作符1 - distance才是直观的相似度。ORDER BY ... ...利用索引毫秒级返回。3.3 FastAPI与Celery/Pg-vector的胶水层状态同步与用户体验闭环FastAPI不能只发任务就完事必须让用户感知进度任务状态API# api/v1/tasks.py router.get(/tasks/{task_id}) async def get_task_status(task_id: str): task celery_app.AsyncResult(task_id) if task.state PENDING: response { state: task.state, status: Task is waiting to be processed } elif task.state PROGRESS: response { state: task.state, status: fProcessing... {task.info.get(current, 0)}/{task.info.get(total, 1)} } elif task.state SUCCESS: response { state: task.state, result: task.result, status: Task completed successfully } else: response { state: task.state, status: str(task.info) if task.info else Task failed } return response前端轮询此接口显示“正在解析第3/12页...”用户耐心提升300%A/B测试数据。Pg-vector写入原子性保障# services/embedding_service.py def generate_embeddings_batch(db: Session, document_id: int): # 1. 先查文档状态防止重复处理 doc db.query(Document).filter(Document.id document_id).first() if doc.status ! pending: raise ValueError(fDocument {document_id} status is {doc.status}, not pending) # 2. 更新状态为indexing防并发 db.query(Document).filter(Document.id document_id).update( {status: indexing, updated_at: func.now()} ) db.commit() # 3. 分块、embedding、批量插入 chunks split_document(doc.file_path) # 自定义分块逻辑 embeddings openai_client.create_embeddings(chunks) # 批量调用 chunk_objects [ DocumentChunk( document_iddocument_id, contentchunk, embeddingembedding, chunk_indexi ) for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)) ] db.bulk_save_objects(chunk_objects) db.flush() # 确保chunk.id生成 # 4. 最终更新状态 db.query(Document).filter(Document.id document_id).update( {status: indexed, updated_at: func.now()} ) db.commit()注意db.flush()必须在bulk_save_objects后立即执行否则DocumentChunk的自增ID拿不到后续关联查询会出错。这是踩过最深的坑之一。4. 实操全流程与关键参数调优从本地开发到生产部署的完整链路4.1 本地开发环境搭建Docker Compose一键拉起全栈我们摒弃了“本地装PostgreSQLCeleryRedis”的繁琐用Docker Compose统一管理# docker-compose.yml version: 3.8 services: db: image: postgres:14-alpine environment: POSTGRES_DB: llm_saas POSTGRES_USER: saas_user POSTGRES_PASSWORD: saas_pass volumes: - ./init.sql:/docker-entrypoint-initdb.d/init.sql # 初始化vector扩展 ports: - 5432:5432 redis: image: redis:7-alpine command: redis-server --save 60 1 --loglevel warning ports: - 6379:6379 web: build: . command: uvicorn app.main:app --host 0.0.0.0:8000 --reload volumes: - .:/app - ./data:/app/data # 存放用户上传文件 ports: - 8000:8000 depends_on: - db - redis celery_worker: build: . command: celery -A app.celery_worker.celery_app worker --loglevelinfo --concurrency2 volumes: - .:/app - ./data:/app/data depends_on: - db - redis celery_beat: build: . command: celery -A app.celery_worker.celery_app beat --loglevelinfo volumes: - .:/app depends_on: - db - redisinit.sql内容极简CREATE EXTENSION IF NOT EXISTS vector;启动命令docker-compose up -d --build30秒内全服务就绪。比手动配置快5倍且环境100%一致。4.2 生产环境关键参数调优让Pg-vector和Celery真正“扛住”PostgreSQL调优postgresql.conf# 内存相关16G内存服务器 shared_buffers 4GB # 总内存25%Pg-vector索引缓存关键 work_mem 64MB # 排序/哈希操作内存避免磁盘临时文件 maintenance_work_mem 1GB # VACUUM/CREATE INDEX内存 # 连接池用pgbouncer更佳此处简化 max_connections 200 # 向量查询专属 effective_cache_size 12GB # 告诉查询规划器可用缓存Celery Worker调优# 启动命令增加资源限制 celery -A app.celery_worker.celery_app worker \ --loglevelwarning \ --concurrency6 \ # 12核CPU设6留资源给OS和DB --max-tasks-per-child1000 \ # 每个Worker处理1000任务后重启防内存泄漏 --time-limit600 \ # 单任务最长10分钟防死循环 --soft-time-limit300 \ # 软超时5分钟发信号让任务优雅退出Pg-vector索引维护-- 每天凌晨执行重建IVFFlat索引因向量分布变化 CREATE OR REPLACE FUNCTION refresh_embedding_index() RETURNS void AS $$ BEGIN DROP INDEX CONCURRENTLY IF EXISTS idx_document_chunks_embedding; CREATE INDEX CONCURRENTLY ON document_chunks USING ivfflat (embedding vector_cosine_ops) WITH (lists 350); END; $$ LANGUAGE plpgsql; -- 添加到pg_cron需先安装扩展 SELECT cron.schedule(0 2 * * *, $$CALL refresh_embedding_index();$$);CONCURRENTLY关键字允许索引重建时不锁表用户查询照常进行。4.3 全流程实操演示用户上传PDF到返回答案的7个关键步骤以用户上传《2023年劳动合同法实施细则》PDF为例追踪完整链路前端上传POST /api/v1/documents/FastAPI接收文件存入./data/uploads/123.pdf创建documents记录状态pending。触发异步任务FastAPI调用create_document_embeddings.delay(document_id123)返回task_idc6a8...f2。Celery Worker拾取Worker从Redis Broker取出任务状态变RECEIVED。PDF解析与分块调用PyPDF2读取按页分割每页再按\n\n切chunk过滤页眉页脚得到127个chunk。批量Embedding调用OpenAI/v1/embeddingsAPIinput传127个chunk一次返回127个768维向量比逐个调用快8倍。Pg-vector写入事务内执行INSERT INTO document_chunks (...) VALUES (...),(...),...同时更新documents.statusindexed。前端轮询完成用户访问GET /api/v1/tasks/c6a8...f2收到SUCCESS前端发起POST /api/v1/chat携带问题向量执行3.3节的联合查询200ms内返回答案。整个过程用户看到的是上传→“正在智能解析3/127”→“解析完成现在可以提问了”。没有空白等待没有超时错误这就是工程化LLM SaaS的体感。5. 常见问题与实战排障指南那些文档里不会写的血泪教训5.1 Pg-vector高频问题速查表问题现象根本原因解决方案实操验证ERROR: column embedding does not exist表创建时未声明embedding VECTOR(768)字段或迁移脚本漏执行检查alembic迁移文件确认op.add_column包含sa.Column(embedding, Vector(768))若已漏用ALTER TABLE document_chunks ADD COLUMN embedding VECTOR(768);补上psql -c \d document_chunks查看字段是否存在向量查询极慢2sEXPLAIN显示Seq Scan未创建向量索引或索引未被查询规划器选用执行CREATE INDEX ON document_chunks USING ivfflat (embedding vector_cosine_ops) WITH (lists350);强制用索引SET ivfflat.probes 10;probes值≈lists/20EXPLAIN (ANALYZE,BUFFERS) SELECT ... ORDER BY embedding ... LIMIT 5;看是否出现Index ScanINSERT报错invalid input syntax for type vectorPython传入的embedding是list而非numpy.ndarray或维度不匹配FastAPI接收时用np.array(embedding_list, dtypenp.float32)转换检查OpenAI模型输出维度ada-002是768text-embedding-3-small是1536print(type(embedding), len(embedding))打印调试索引体积爆炸10GB查询变慢lists值设得过大或未定期VACUUM删除的chunk重建索引DROP INDEX idx_embedding; CREATE INDEX ... WITH (lists350);每天VACUUM ANALYZE document_chunks;SELECT pg_size_pretty(pg_total_relation_size(document_chunks));查看表大小5.2 Celery典型故障与修复任务卡在RECEIVED状态不执行原因Worker进程挂了或Broker连接中断。排查celery -A app.celery_worker.celery_app inspect active_queues看Worker是否注册redis-cli ping测试Redis连通性。修复重启Worker检查broker_url配置是否指向正确Redis地址别写成localhostDocker内要用服务名redis。任务重试无限循环日志刷屏原因max_retries设为None或异常未被捕获抛出。修复在shared_task装饰器中明确max_retries3在try/except里捕获所有Exception记录exc_infoTrue再raise。AsyncResult查不到任务结果原因result_backend配置错误或任务执行太快结果被清理。修复确认result_backend指向Redis DB 1设置result_expires36001小时后过期避免Redis内存爆满。5.3 FastAPI-Pg-vector-Celery三角协同陷阱陷阱1FastAPI请求中直接调用db.commit()后Celery任务里再db.query()查不到新数据原因FastAPI的DB会话和Celery的DB会话是两个独立事务未提交或未刷新。修复FastAPI中db.commit()后必须db.refresh(obj)刷新对象Celery任务里用全新Session不复用FastAPI的db。陷阱2Pg-vector索引重建期间用户查询返回空结果原因CREATE INDEX CONCURRENTLY虽不锁表但新索引生效前查询仍走旧索引或全表扫描。修复索引重建后执行ANALYZE document_chunks;更新统计信息前端查询加timeout5超时降级为关键词搜索。陷阱3Celery Worker内存持续增长几天后OOM原因--max-tasks-per-child未设置Worker进程长期运行Python GC未及时回收大对象如embedding list。修复强制设置--max-tasks-per-child1000在任务末尾加import gc; gc.collect()。最后分享一个真实案例我们曾在线上环境遇到“用户上传PDF后状态一直卡在indexing但Celery日志显示任务已完成”。排查三天发现是document_chunks表的embedding字段类型被误建为TEXT而非VECTOR插入时PostgreSQL静默转成字符串查询时操作符失效返回NULL导致WHERE条件永远不成立。解决方案ALTER TABLE document_chunks ALTER COLUMN embedding TYPE VECTOR(768) USING embedding::vector;。这个坑提醒我们向量字段的类型安全比任何ORM映射都重要。