RAG 召回质量治理:用 Go 构建可调试的切片、检索与重排链路

RAG 召回质量治理:用 Go 构建可调试的切片、检索与重排链路 RAG 召回质量治理用 Go 构建可调试的切片、检索与重排链路一、检索结果看似很多答案却总是不准RAG 落地的第一道坑很多团队做企业知识库问答时第一版 RAG 通常很快就能跑起来。文档丢进向量库用户问题转成 Embedding再取 TopK 拼进 Prompt最后让大模型回答。演示时效果还行真到业务部门试用问题就开始变得扎心。用户问“离职交接需要哪些材料”系统召回了一堆 HR 制度。看上去都有点关系答案却漏掉了最后的审批附件。用户问“合同超过 50 万怎么走审批”召回结果里有合同模板也有采购制度但关键的额度阈值藏在另一份财务流程里。更麻烦的是大模型会把这些残缺上下文缝起来回答得很像那么回事。这就是 RAG 的典型落地陷阱召回“有相关性”不等于召回“可回答”。如果只盯着向量相似度系统很容易变成一个会说话的模糊搜索框。见证奇迹的时刻不是上线当天而是业务同事拿着错误答案来问你“这玩意儿到底信不信得过”。RAG 质量治理要解决的不是单点算法问题而是一条链路问题。文档切片会影响语义完整性。Query 改写会影响召回方向。粗召回会影响候选范围。重排会影响最终证据顺序。Prompt 组装会影响模型是否忠实于上下文。任何一段偷懒最后都会反映到答案质量上。本文用 Go 设计一条可调试的 RAG 召回链路。重点放在三个生产问题上如何切片才不把知识切碎如何召回才不只看表面相似如何重排才让证据真正可用。示例代码会实现文档切片、混合召回、交叉编码器重排的接口骨架以及必要的超时、错误处理和可观测性埋点。二、从文档到证据链RAG 召回链路的底层机制RAG 的核心不是“把向量库接上大模型”。更准确地说RAG 是把用户问题映射到一组可信证据再让大模型基于证据生成答案。向量库只是其中一个组件。一条生产级链路通常会包含六个阶段。flowchart TD A[原始文档] -- B[结构化解析] B -- C[语义切片与重叠窗口] C -- D[Embedding 写入向量索引] E[用户问题] -- F[Query 规范化与改写] F -- G[向量召回 关键词召回] G -- H[候选片段去重与过滤] H -- I[重排模型计算证据优先级] I -- J[Prompt 证据组装] J -- K[LLM 生成答案] K -- L[引用与质量日志]第一阶段是结构化解析。PDF、Word、Markdown、网页和表格的结构完全不同。如果把它们都粗暴转成纯文本很容易丢掉标题层级、表格字段和章节边界。RAG 对结构很敏感。因为很多答案不是藏在某一句里而是依赖标题、列表和表格上下文。第二阶段是语义切片。切片太短会把一个完整规则拆散。切片太长会让 Embedding 表达变得浑浊。常见做法是按标题、段落和 token 长度综合切分并保留一定 overlap。Overlap 的作用不是凑字数而是避免跨段信息被截断。第三阶段是索引构建。Embedding 模型会把文本映射为高维向量。向量检索通常使用近似最近邻算法例如 HNSW。它的优势是快代价是结果不是严格精确。工程上要接受这个权衡。要想提高召回质量不能只调一个 TopK。第四阶段是 Query 处理。用户问题往往很短甚至有口语化省略。比如“这个怎么报销”没有明确对象直接向量化会很飘。Query 改写可以补全意图也可以生成多个检索子问题。但改写不能太自由否则会把用户原意带偏。第五阶段是混合召回。向量检索擅长语义相似关键词检索擅长精确术语。企业制度里经常有编号、金额、产品名和字段名。单靠向量召回很容易漏掉这些硬条件。更稳的方案是向量召回加 BM25再统一去重。第六阶段是重排。粗召回拿到的是候选集合不是最终证据。重排模型会同时看 Query 和 Chunk判断“这段内容是否真的能回答问题”。这一步通常比向量相似度更准但成本更高所以只对 TopN 候选做。这里有个很现实的原则RAG 的每一步都要能被日志还原。用户问了什么改写成了什么召回了哪些片段相似度是多少重排分是多少最终用了哪些证据。没有这些记录RAG 调优就会变成玄学。技术人不能靠拍脑袋修水管得先看哪里漏。三、用 Go 实现可观测的召回与重排骨架下面的代码不是玩具版“向量库查一下”。它把 RAG 链路拆成可替换接口并在关键路径上处理超时、错误和调试信息。生产环境可以把VectorStore接到 Milvus、pgvector、Qdrant 或 Elasticsearch dense_vector把KeywordStore接到 Elasticsearch 或 OpenSearch。package rag import ( context errors fmt sort strings time ) type Chunk struct { ID string DocID string Title string Content string Metadata map[string]string } type Candidate struct { Chunk Chunk VectorScore float64 BM25Score float64 RerankScore float64 Source string } type VectorStore interface { Search(ctx context.Context, query string, topK int) ([]Candidate, error) } type KeywordStore interface { Search(ctx context.Context, query string, topK int) ([]Candidate, error) } type Reranker interface { Score(ctx context.Context, query string, chunks []Chunk) ([]float64, error) } type Logger interface { Info(msg string, fields map[string]any) Error(msg string, fields map[string]any) } type Retriever struct { vector VectorStore keyword KeywordStore reranker Reranker logger Logger vectorTopK int keywordTopK int finalTopK int } func NewRetriever(v VectorStore, k KeywordStore, r Reranker, l Logger) *Retriever { return Retriever{ vector: v, keyword: k, reranker: r, logger: l, vectorTopK: 30, keywordTopK: 20, finalTopK: 8, } } func (r *Retriever) Retrieve(ctx context.Context, query string) ([]Candidate, error) { query normalizeQuery(query) if query { return nil, errors.New(empty query) } start : time.Now() searchCtx, cancel : context.WithTimeout(ctx, 1500*time.Millisecond) defer cancel() vectorCh : make(chan result, 1) keywordCh : make(chan result, 1) go func() { items, err : r.vector.Search(searchCtx, query, r.vectorTopK) vectorCh - result{items: items, err: err, source: vector} }() go func() { items, err : r.keyword.Search(searchCtx, query, r.keywordTopK) keywordCh - result{items: items, err: err, source: keyword} }() merged : make([]Candidate, 0, r.vectorTopKr.keywordTopK) for i : 0; i 2; i { select { case res : -vectorCh: if res.err ! nil { r.logger.Error(vector search failed, map[string]any{error: res.err.Error()}) continue } merged append(merged, markSource(res.items, res.source)...) case res : -keywordCh: if res.err ! nil { r.logger.Error(keyword search failed, map[string]any{error: res.err.Error()}) continue } merged append(merged, markSource(res.items, res.source)...) case -searchCtx.Done(): return nil, fmt.Errorf(search timeout: %w, searchCtx.Err()) } } merged dedupeCandidates(merged) if len(merged) 0 { return nil, errors.New(no retrieval candidates) } final, err : r.rerank(ctx, query, merged) if err ! nil { return nil, err } r.logger.Info(rag retrieve finished, map[string]any{ query: query, candidates: len(merged), final: len(final), latency_ms: time.Since(start).Milliseconds(), top_chunk: final[0].Chunk.ID, top_score: final[0].RerankScore, }) return final, nil } type result struct { items []Candidate err error source string } func (r *Retriever) rerank(ctx context.Context, query string, candidates []Candidate) ([]Candidate, error) { limit : 24 if len(candidates) limit { limit len(candidates) } // 粗排先融合向量分和关键词分避免把明显无关的片段送进重排模型浪费成本。 sort.Slice(candidates, func(i, j int) bool { return hybridScore(candidates[i]) hybridScore(candidates[j]) }) shortlist : candidates[:limit] chunks : make([]Chunk, 0, len(shortlist)) for _, item : range shortlist { chunks append(chunks, item.Chunk) } rerankCtx, cancel : context.WithTimeout(ctx, 2*time.Second) defer cancel() scores, err : r.reranker.Score(rerankCtx, query, chunks) if err ! nil { return nil, fmt.Errorf(rerank failed: %w, err) } if len(scores) ! len(shortlist) { return nil, fmt.Errorf(rerank score size mismatch, got%d want%d, len(scores), len(shortlist)) } for i : range shortlist { shortlist[i].RerankScore scores[i] } sort.Slice(shortlist, func(i, j int) bool { return shortlist[i].RerankScore shortlist[j].RerankScore }) if len(shortlist) r.finalTopK { shortlist shortlist[:r.finalTopK] } return shortlist, nil } func normalizeQuery(query string) string { query strings.TrimSpace(query) query strings.Join(strings.Fields(query), ) return query } func markSource(items []Candidate, source string) []Candidate { for i : range items { if items[i].Source { items[i].Source source } } return items } func dedupeCandidates(items []Candidate) []Candidate { seen : make(map[string]Candidate, len(items)) for _, item : range items { old, ok : seen[item.Chunk.ID] if !ok || hybridScore(item) hybridScore(old) { seen[item.Chunk.ID] item } } out : make([]Candidate, 0, len(seen)) for _, item : range seen { out append(out, item) } return out } func hybridScore(c Candidate) float64 { // 两类分数通常不在同一分布。生产环境应做分位数归一化。 return 0.65*c.VectorScore 0.35*c.BM25Score }这段代码有几个设计点值得单独说。第一向量召回和关键词召回并发执行但共享同一个检索超时。这样可以避免某个索引慢查询拖住整体请求。RAG 的上游通常是在线问答检索阶段不能无限等。第二召回失败不是立刻全链路失败。比如关键词索引临时异常向量召回还能返回候选。这里要记录错误但不要因为一个召回器失败就让用户完全不可用。反过来如果两个召回器都没有候选就应该明确返回错误而不是把空上下文交给大模型瞎编。第三重排前先做粗排截断。交叉编码器重排质量更好但成本更高。把 200 个候选全送进去是预算不嫌多的写法。更稳的方式是用融合分数截到 20 到 30 个再做精排。第四日志里必须记录候选数量、最终片段、耗时和分数。后续做离线评测时这些字段能直接还原一次召回过程。没有这层记录线上用户说“答错了”你只能打开控制台干瞪眼。切片也要单独治理。下面是一个按段落切片的简化实现。它会保留标题并使用 overlap 避免上下文断裂。func SplitDocument(docID, title, text string, maxChars, overlap int) ([]Chunk, error) { if maxChars 0 || overlap 0 || overlap maxChars { return nil, errors.New(invalid split config) } paragraphs : strings.Split(text, \n) var chunks []Chunk var buf strings.Builder index : 0 flush : func() { content : strings.TrimSpace(buf.String()) if content { return } chunks append(chunks, Chunk{ ID: fmt.Sprintf(%s_%04d, docID, index), DocID: docID, Title: title, Content: title \n content, Metadata: map[string]string{ splitter: paragraph_with_overlap, }, }) index if overlap 0 len(content) overlap { buf.Reset() buf.WriteString(content[len(content)-overlap:]) buf.WriteString(\n) } else { buf.Reset() } } for _, p : range paragraphs { p strings.TrimSpace(p) if p { continue } if buf.Len()len(p) maxChars { flush() } buf.WriteString(p) buf.WriteString(\n) } flush() if len(chunks) 0 { return nil, errors.New(document produced no chunks) } return chunks, nil }这个切片器不复杂但体现了一个原则宁可规则清楚也不要一上来就搞玄学。生产里可以继续增强比如识别 Markdown 标题、表格、编号列表和代码块。关键是每个 chunk 都要保留可追溯信息。否则答案引用无法定位用户也无法核验。四、调参、成本与质量之间的架构权衡RAG 召回质量治理最难的地方不是写代码而是做取舍。第一个取舍是 chunk 大小。chunk 越小语义越聚焦检索更容易命中细节。但上下文可能不完整模型看到的证据会断。chunk 越大上下文更完整但向量表达会变得平均容易召回一段“什么都沾点”的内容。经验上制度类文档可以从 500 到 1000 中文字符开始试技术文档可以按标题层级和代码块结构动态切。第二个取舍是 TopK。TopK 太小召回漏掉关键证据。TopK 太大重排成本和 Prompt 成本都会上升。更糟糕的是过多弱相关证据会干扰大模型让它在一堆半相关内容里自由发挥。线上不要只看命中率还要看最终答案引用的证据是否真的支持结论。第三个取舍是 Query 改写。改写能提升召回但也可能引入幻觉。比如用户问“报销上限是多少”改写器如果擅自补成“差旅报销上限”就会把问题带偏。比较稳的方案是保留原始 Query同时生成 1 到 3 个检索子问题。最终日志里必须记录改写结果方便回放。第四个取舍是重排模型。重排模型越强召回排序越准但延迟和成本越高。对于低频知识库可以用云端 reranker。对于高频客服可以用轻量本地模型或者只在粗排分数接近时触发重排。别为了理论最优把每次问答都堆成成本怪物。第五个取舍是可观测性成本。记录完整召回链路会占用存储也可能涉及敏感文本。企业知识库尤其要注意权限和脱敏。日志里可以存 chunk ID、分数、文档标题和 hash。原文内容是否落日志要看合规要求。能不裸奔就别裸奔数据安全不是出事后才想起来的装饰品。还有一个禁用场景要明确如果业务需要严格事实一致性但知识源本身没有结构化、没有版本控制、没有权限边界先别急着上 RAG。先治理知识库。RAG 只能放大知识库的可用性不能把一堆过期制度自动炼成真理。五、总结RAG 生产质量的核心不是“接一个向量库”而是把文档切片、混合召回、候选去重、重排排序和证据引用串成一条可观察、可回放、可调参的链路。落地时可以按三步推进。第一步先做好结构化切片。保留标题、段落、表格和文档来源。每个 chunk 都必须能追溯到原文位置。第二步引入混合召回。向量召回负责语义相似关键词召回负责术语和编号。两者合并后去重再用粗排控制候选规模。第三步加上重排和质量日志。重排负责把“看起来相关”的片段变成“真正能回答”的证据。质量日志负责让每次错误都能复盘而不是靠感觉改参数。RAG 系统最终拼的不是谁的 Prompt 更花哨而是谁能把证据链治理清楚。答案错了不可怕可怕的是不知道为什么错。把召回链路做成可调试系统后续无论换 Embedding、换向量库还是换重排模型都有明确抓手。这才是大模型应用落地里真正划算的 ROI。