大模型网关实战:用 Go 把超时、重试与熔断讲清楚

大模型网关实战:用 Go 把超时、重试与熔断讲清楚 大模型网关实战用 Go 把超时、重试与熔断讲清楚一、深度引言与场景痛点大模型应用上线后最容易被低估的组件不是 Prompt 模板也不是向量库而是模型调用网关。很多团队一开始会把模型调用写成几行 HTTP 请求。业务服务拿到用户输入拼好 JSON直接打到模型供应商。开发环境看起来没有问题演示环境也能跑。到了生产环境问题就开始排队上门。第一个问题是延迟不稳定。普通接口的 P99 可能是几百毫秒大模型接口的 P99 经常会飘到十几秒。第二个问题是失败形态复杂。它可能返回 429也可能返回 5xx还可能建立连接成功后迟迟没有首 token。第三个问题是成本不可控。一次无脑重试可能就多烧一次 token 预算。这时如果还把大模型调用散落在各个业务服务里排查会很痛苦。每个服务都有自己的超时配置每个开发同学都有自己的重试姿势。最后线上出了问题大家围着日志猜。见证奇迹的时刻往往不是模型变聪明而是账单先变胖。更务实的做法是在业务和模型供应商之间加一层大模型网关。它不负责“让模型更会聊天”而是负责把不稳定的外部调用变成可治理的内部能力。核心工作包括超时控制、重试策略、熔断降级、并发隔离、指标采集和日志追踪。本文用 Go 设计一个生产可用的大模型网关核心骨架。重点不是堆概念而是把工程里真正会踩坑的地方掰开讲清楚。代码会覆盖请求级超时、带抖动的指数退避、基于错误率的熔断、并发限流和基础可观测性。示例不会假装一个网关就能解决所有问题边界和代价也会单独分析。二、底层机制与原理深度剖析大模型调用本质上是长尾延迟明显的外部网络 I/O。普通 RPC 的治理经验可以复用一部分但不能照搬。普通 HTTP 接口通常关注连接超时、读写超时和状态码。大模型接口还要额外关注流式返回、首 token 延迟、上下文长度、模型排队时间和供应商限流。很多请求不是立刻失败而是慢慢拖住连接。拖住的连接多了本地 goroutine、文件描述符、连接池和内存都会被占住。一个合理的大模型网关至少要把调用链拆成五层。flowchart TD A[业务服务] -- B[请求校验与预算计算] B -- C[并发隔离与队列控制] C -- D[超时与重试调度器] D -- E[熔断器与降级策略] E -- F[模型供应商适配器] F -- G[(LLM API)] D -- H[指标与结构化日志] E -- H F -- H第一层是请求校验与预算计算。这里要检查模型名、最大 token、租户配额和调用场景。不要等请求打到供应商后才发现参数非法。那不是优雅那是把钱扔出去听个响。第二层是并发隔离。大模型调用慢慢请求会占住执行资源。如果没有并发上限突发流量会把所有 goroutine 堆满。Go 的 goroutine 很轻但不是不要钱。每个 goroutine 都有栈、调度成本和关联对象。外部 API 卡住时本地资源会跟着卡住。第三层是超时与重试调度。超时不能只写一个全局http.Client.Timeout。它要区分排队等待、连接建立、首 token 等待和整体请求期限。重试也不能对所有错误都重试。429、502、503、网络临时错误可以谨慎重试参数错误、鉴权失败、上下文超限不该重试。第四层是熔断与降级。供应商持续异常时继续重试只会扩大故障面。熔断器会在错误率超过阈值后进入打开状态短时间内快速失败避免业务线程被拖死。半开状态再放少量探测请求确认恢复后再关闭熔断。第五层是可观测性。没有指标的网关只能靠感觉运维。至少要采集请求数、错误数、重试次数、熔断状态、P95/P99 延迟、首 token 延迟和供应商维度的状态码分布。这里有一个容易忽略的底层点Go 的context.Context是超时传播的主线但它不会自动帮你取消所有资源。你必须把ctx传给 HTTP 请求、队列等待、重试 sleep 和下游处理逻辑。否则用户请求早就取消了后台 goroutine 还在替它等模型响应。写到这里就很现实了账单不会因为你忘了取消上下文而心软。三、生产级代码实现与最佳实践下面的代码实现一个精简但可落地的网关核心。为控制篇幅示例聚焦同步非流式调用。流式调用可以复用同样的超时、熔断和并发隔离思想但要额外处理首 token 超时和客户端断开。package llmgateway import ( bytes context encoding/json errors fmt io math math/rand net net/http sync time ) var ( ErrCircuitOpen errors.New(llm provider circuit is open) ErrOverloaded errors.New(llm gateway is overloaded) ) type Request struct { TenantID string json:tenant_id Model string json:model Prompt string json:prompt MaxTokens int json:max_tokens } type Response struct { Text string json:text RequestID string json:request_id } type Metrics interface { Inc(name string, labels map[string]string) Observe(name string, value float64, labels map[string]string) } type noopMetrics struct{} func (noopMetrics) Inc(string, map[string]string) {} func (noopMetrics) Observe(string, float64, map[string]string) {} type Gateway struct { endpoint string apiKey string client *http.Client limiter chan struct{} breaker *CircuitBreaker metrics Metrics maxRetries int } func NewGateway(endpoint, apiKey string, maxConcurrent int, metrics Metrics) *Gateway { if metrics nil { metrics noopMetrics{} } transport : http.Transport{ Proxy: http.ProxyFromEnvironment, DialContext: (net.Dialer{ Timeout: 2 * time.Second, KeepAlive: 30 * time.Second, }).DialContext, MaxIdleConns: 200, MaxIdleConnsPerHost: 50, IdleConnTimeout: 90 * time.Second, TLSHandshakeTimeout: 2 * time.Second, } return Gateway{ endpoint: endpoint, apiKey: apiKey, client: http.Client{ Transport: transport, // 整体超时仍由外层 context 控制避免流式场景被固定 Timeout 误杀。 }, limiter: make(chan struct{}, maxConcurrent), breaker: NewCircuitBreaker(20, 0.5, 30*time.Second), metrics: metrics, maxRetries: 2, } } func (g *Gateway) Complete(ctx context.Context, req Request) (Response, error) { start : time.Now() labels : map[string]string{model: req.Model} g.metrics.Inc(llm_requests_total, labels) if err : validate(req); err ! nil { g.metrics.Inc(llm_rejected_total, map[string]string{reason: invalid}) return Response{}, err } if !g.breaker.Allow() { g.metrics.Inc(llm_rejected_total, map[string]string{reason: circuit_open}) return Response{}, ErrCircuitOpen } select { case g.limiter - struct{}{}: defer func() { -g.limiter }() case -ctx.Done(): return Response{}, ctx.Err() default: g.metrics.Inc(llm_rejected_total, map[string]string{reason: overloaded}) return Response{}, ErrOverloaded } var lastErr error for attempt : 0; attempt g.maxRetries; attempt { resp, err : g.callProvider(ctx, req) if err nil { g.breaker.Record(true) g.metrics.Observe(llm_latency_seconds, time.Since(start).Seconds(), labels) if attempt 0 { g.metrics.Inc(llm_retry_success_total, labels) } return resp, nil } lastErr err if !isRetryable(err) || attempt g.maxRetries { break } g.metrics.Inc(llm_retries_total, labels) if err : sleepWithBackoff(ctx, attempt); err ! nil { return Response{}, err } } g.breaker.Record(false) g.metrics.Inc(llm_errors_total, labels) g.metrics.Observe(llm_latency_seconds, time.Since(start).Seconds(), labels) return Response{}, lastErr } func (g *Gateway) callProvider(ctx context.Context, req Request) (Response, error) { body, err : json.Marshal(req) if err ! nil { return Response{}, fmt.Errorf(marshal llm request: %w, err) } callCtx, cancel : context.WithTimeout(ctx, 18*time.Second) defer cancel() httpReq, err : http.NewRequestWithContext(callCtx, http.MethodPost, g.endpoint, bytes.NewReader(body)) if err ! nil { return Response{}, fmt.Errorf(build provider request: %w, err) } httpReq.Header.Set(Authorization, Bearer g.apiKey) httpReq.Header.Set(Content-Type, application/json) httpResp, err : g.client.Do(httpReq) if err ! nil { return Response{}, fmt.Errorf(provider network error: %w, err) } defer httpResp.Body.Close() limited : io.LimitReader(httpResp.Body, 420) payload, err : io.ReadAll(limited) if err ! nil { return Response{}, fmt.Errorf(read provider response: %w, err) } if httpResp.StatusCode 500 || httpResp.StatusCode http.StatusTooManyRequests { return Response{}, retryableStatusError{code: httpResp.StatusCode, body: string(payload)} } if httpResp.StatusCode 400 { return Response{}, fmt.Errorf(provider rejected request, status%d, body%s, httpResp.StatusCode, string(payload)) } var out Response if err : json.Unmarshal(payload, out); err ! nil { return Response{}, fmt.Errorf(decode provider response: %w, err) } return out, nil } func validate(req Request) error { if req.TenantID || req.Model || req.Prompt { return errors.New(tenant_id, model and prompt are required) } if req.MaxTokens 0 || req.MaxTokens 4096 { return errors.New(max_tokens is out of allowed range) } return nil } type retryableStatusError struct { code int body string } func (e retryableStatusError) Error() string { return fmt.Sprintf(retryable provider status%d body%s, e.code, e.body) } func isRetryable(err error) bool { var statusErr retryableStatusError if errors.As(err, statusErr) { return true } var netErr net.Error if errors.As(err, netErr) { return netErr.Timeout() } return errors.Is(err, context.DeadlineExceeded) } func sleepWithBackoff(ctx context.Context, attempt int) error { base : 200 * time.Millisecond maxDelay : 2 * time.Second delay : time.Duration(float64(base) * math.Pow(2, float64(attempt))) if delay maxDelay { delay maxDelay } jitter : time.Duration(rand.Int63n(int64(delay / 2))) timer : time.NewTimer(delay/2 jitter) defer timer.Stop() select { case -timer.C: return nil case -ctx.Done(): return ctx.Err() } } type CircuitBreaker struct { mu sync.Mutex window int threshold float64 openFor time.Duration openedAt time.Time results []bool halfOpenTry bool } func NewCircuitBreaker(window int, threshold float64, openFor time.Duration) *CircuitBreaker { return CircuitBreaker{ window: window, threshold: threshold, openFor: openFor, results: make([]bool, 0, window), } } func (b *CircuitBreaker) Allow() bool { b.mu.Lock() defer b.mu.Unlock() if b.openedAt.IsZero() { return true } if time.Since(b.openedAt) b.openFor { return false } if b.halfOpenTry { return false } b.halfOpenTry true return true } func (b *CircuitBreaker) Record(success bool) { b.mu.Lock() defer b.mu.Unlock() if !b.openedAt.IsZero() { b.halfOpenTry false if success { b.openedAt time.Time{} b.results b.results[:0] return } b.openedAt time.Now() return } b.results append(b.results, success) if len(b.results) b.window { b.results b.results[1:] } if len(b.results) b.window { return } failures : 0 for _, ok : range b.results { if !ok { failures } } if float64(failures)/float64(len(b.results)) b.threshold { b.openedAt time.Now() } }这段代码里有几个关键点。第一网关没有把http.Client.Timeout写死。原因是大模型调用常见流式场景。固定 Timeout 会把长生成任务误杀。更稳妥的方式是把请求级截止时间放在context.WithTimeout中再针对流式接口单独实现首 token 超时。第二并发控制使用带缓冲 channel。它不是最高级的限流器但可读性和稳定性都不错。生产环境可以按租户、模型和供应商拆多个 limiter。这样一个大客户的异常流量不会拖垮所有人。第三重试只处理可恢复错误。不能对所有 4xx 重试。比如上下文超长、模型不存在、鉴权失败重试只是在重复犯错。重试还必须带抖动否则同一批请求会在同一时间再次打到供应商形成重试风暴。第四熔断器使用滑动窗口思想。示例里的实现偏小但状态机是完整的。关闭状态正常放行。打开状态快速失败。到达冷却时间后进入半开探测。探测成功则恢复失败则继续打开。第五响应体使用io.LimitReader。这是很多网关代码会漏掉的细节。外部服务返回异常大响应时如果无上限读取内存会被拖住。这个问题不高级但很实在。四、边界分析与架构权衡大模型网关不是银弹。它能把外部调用治理起来但也会引入新的复杂度。首先是延迟代价。网关多了一跳网络调用也多了一层序列化和指标采集。对于内部服务来说这通常是可接受的。因为大模型调用本身已经是秒级延迟多出的几毫秒不是主要矛盾。但如果是极低延迟场景比如边缘设备本地模型调用集中式网关可能反而不划算。其次是状态复杂度。熔断器、限流器和租户配额都带状态。单实例状态容易实现但多实例部署后就会出现一致性问题。比如 A 实例已经熔断B 实例还在继续放行。要不要把熔断状态放到 Redis这要看业务规模。早期不要上来就分布式熔断先用本地熔断把故障面压住。等流量足够大再考虑状态同步。第三是重试成本。大模型调用的重试不是免费午餐。一次重试可能意味着重复消耗输入 token也可能让用户等待更久。对于生成类任务重试后结果还可能不一致。因此重试次数要少重试条件要窄并且要把 retry attempt 写进日志和指标。否则线上排查时账单涨了没人知道是谁动的手。第四是降级体验。熔断后快速失败很理性但用户不关心你的架构是否优雅。生产系统要给出可接受的降级方案。比如摘要场景可以切小模型客服场景可以返回固定兜底话术代码生成场景可以提示稍后重试。降级不是为了装饰架构图而是为了让业务还能喘气。第五是供应商差异。不同模型 API 的错误码、限流策略、超时表现和流式协议都不一样。网关适配层必须隔离这些差异。业务侧不应该知道某个供应商用什么错误码表示额度不足。否则供应商一换业务代码就像拆旧水管哪里都可能漏。还有一个常见误区是把网关做成“万能平台”。今天加 Prompt 管理明天加评测后天加知识库编排。功能越堆越多核心稳定性反而没人管。大模型网关第一优先级应该是可靠调用、成本控制和可观测性。其他能力可以接入但不要喧宾夺主。适合使用集中式大模型网关的场景包括多个业务系统共用模型能力、需要统一审计和成本统计、需要按租户限流、模型供应商可能切换、调用失败会影响核心流程。禁用或谨慎使用的场景包括单一小应用低频调用、本地模型无网络瓶颈、业务强依赖毫秒级响应以及团队暂时没有维护基础设施的能力。五、总结大模型应用能不能稳定落地很多时候不取决于 Prompt 写得多漂亮而取决于外部调用有没有被工程化治理。一个生产可用的大模型网关至少要做好五件事请求预算前置校验、并发隔离、超时传播、有限重试、熔断降级。再往上才是多供应商路由、成本报表、Prompt 版本管理和评测闭环。落地路线可以按三步走。第一步先收口调用入口。把散落在业务里的模型 HTTP 请求迁移到统一 SDK 或网关。此时不要急着做复杂策略先让日志和指标统一起来。第二步加上超时、重试和并发隔离。重点观察 P95/P99 延迟、429 比例、重试成功率和请求取消率。指标稳定后再按租户和模型拆限流桶。第三步引入熔断和降级。先从供应商级熔断开始再逐步扩展到模型级、租户级。熔断事件必须进入告警系统并能关联到请求 ID、模型名和供应商。最后记住一个朴素原则大模型网关不是为了让架构图更漂亮而是为了让系统在模型变慢、供应商限流、网络抖动和预算吃紧时还能保持可解释、可控制、可恢复。其核心价值应当体现在长期的系统稳定性指标中而非临时的故障排查里。