如果你的AI应用已经有了一定规模的用户你一定被API账单惊到过。GPT-4o、Claude 3.5这些模型的能力固然强但Token费用叠加起来很快就成为显著的成本中心。成本工程不是用更便宜的模型降质量而是在保持质量的前提下智能地分配计算资源。本文分享一套系统性的成本优化方法。## 先建立成本追踪基线在优化之前必须先知道你在哪里花了钱。pythonfrom openai import AsyncOpenAIfrom dataclasses import dataclass, fieldfrom datetime import datetimeimport asynciodataclassclass LLMCallRecord: timestamp: str model: str prompt_tokens: int completion_tokens: int total_tokens: int cost_usd: float endpoint: str # 哪个功能/接口调用的 latency_ms: int # GPT-4o定价2026 PRICING { gpt-4o: {input: 0.0000025, output: 0.000010}, gpt-4o-mini: {input: 0.00000015, output: 0.0000006}, gpt-4o-mini-realtime: {input: 0.0000006, output: 0.0000024}, } classmethod def from_response(cls, response, endpoint: str, latency_ms: int) - LLMCallRecord: model response.model usage response.usage pricing cls.PRICING.get(model, {input: 0, output: 0}) cost ( usage.prompt_tokens * pricing[input] usage.completion_tokens * pricing[output] ) return cls( timestampdatetime.utcnow().isoformat(), modelmodel, prompt_tokensusage.prompt_tokens, completion_tokensusage.completion_tokens, total_tokensusage.total_tokens, cost_usdcost, endpointendpoint, latency_mslatency_ms )class CostTracker: def __init__(self, db): self.db db async def record(self, record: LLMCallRecord): await self.db.insert(llm_calls, record.__dict__) async def get_cost_breakdown(self, days: int 7) - dict: 按功能模块分析成本 rows await self.db.query(f SELECT endpoint, model, COUNT(*) as call_count, SUM(total_tokens) as total_tokens, SUM(cost_usd) as total_cost, AVG(latency_ms) as avg_latency FROM llm_calls WHERE timestamp datetime(now, -{days} days) GROUP BY endpoint, model ORDER BY total_cost DESC ) return { breakdown: rows, total_cost: sum(r[total_cost] for r in rows), top_cost_endpoints: rows[:5] }有了这个追踪你会清楚地看到哪些功能是成本大户然后有针对性地优化。## 策略一模型路由不是所有请求都需要最强的模型。根据任务复杂度自动选择合适的模型pythonclass SmartModelRouter: 根据任务复杂度自动选择模型 # 成本从低到高 MODEL_TIERS { nano: gpt-4o-mini, # 简单任务 standard: gpt-4o, # 中等任务 premium: o1, # 复杂推理 } async def route(self, task: LLMTask) - str: complexity await self._assess_complexity(task) if complexity low: return self.MODEL_TIERS[nano] elif complexity medium: return self.MODEL_TIERS[standard] else: return self.MODEL_TIERS[premium] async def _assess_complexity(self, task: LLMTask) - str: # 规则判断快速无成本 # 分类任务 → 简单 if task.type in [classification, sentiment, entity_extraction]: return low # 短文本处理 → 简单 if len(task.input) 500 and task.expected_output_length 200: return low # 包含推理要求 → 复杂 reasoning_keywords [分析, 推断, 原因, 为什么, 比较, 权衡] if any(kw in task.input for kw in reasoning_keywords): return medium # 代码生成 → 中等除非很简单 if task.type code_generation: return medium if len(task.input) 300 else high # 默认中等 return medium### 模型路由的成本对比GPT-4o-mini vs GPT-4o的成本差距约为17-20倍。如果你有60%的请求可以用mini处理总体成本可以降低50%以上。## 策略二提示词压缩更短的提示词意味着更低的输入Token成本。pythonclass PromptCompressor: def compress_context(self, messages: list[dict], target_token_budget: int) - list[dict]: 压缩对话历史到目标Token数 current_tokens self._count_tokens(messages) if current_tokens target_token_budget: return messages # 保留第一条系统消息 最近N条消息 result [] if messages[0][role] system: result.append(messages[0]) historical messages[1:] else: historical messages # 先保留最近的消息 recent [] recent_tokens 0 for msg in reversed(historical): tokens self._count_tokens([msg]) if recent_tokens tokens target_token_budget * 0.7: recent.insert(0, msg) recent_tokens tokens else: break # 中间部分压缩成摘要 if len(recent) len(historical): omitted historical[:len(historical) - len(recent)] summary self._summarize_messages(omitted) result.append({ role: system, content: f[对话摘要] {summary} }) result.extend(recent) return result def remove_redundancy(self, prompt: str) - str: 移除提示词中的冗余内容 # 移除多余的空行 prompt re.sub(r\n{3,}, \n\n, prompt) # 移除重复的说明用于测试生产环境慎用 # prompt self._deduplicate_instructions(prompt) # 压缩示例few-shot中的冗长示例 prompt self._compress_examples(prompt) return prompt.strip() def _compress_examples(self, prompt: str) - str: 压缩few-shot示例只保留关键部分 # 找到示例块并压缩具体实现依赖你的Prompt格式 return prompt # 实际实现时根据你的格式处理### 动态Few-shot选择不要在所有请求里都附上所有示例。根据当前请求的特征动态选择最相关的1-3个示例pythonclass DynamicFewShot: def __init__(self, examples: list[dict], embedder): self.examples examples self.embedder embedder # 预计算示例的向量 self.example_vectors embedder.encode([e[input] for e in examples]) def select(self, query: str, n: int 3) - list[dict]: 选择最相关的n个示例 query_vector self.embedder.encode([query])[0] # 计算相似度 similarities np.dot(self.example_vectors, query_vector) top_indices np.argsort(similarities)[-n:][::-1] return [self.examples[i] for i in top_indices]## 策略三缓存缓存是最直接的成本节省方式完全相同的请求不重复调用LLM。pythonimport hashlibimport jsonclass SemanticCache: 语义缓存相似的问题复用答案 def __init__(self, vector_store, threshold: float 0.95): self.store vector_store self.threshold threshold # 相似度阈值 async def get(self, query: str) - str | None: 检查缓存 # 计算查询向量 query_vector await self.embed(query) # 查找相似的缓存条目 results await self.store.search(query_vector, top_k1) if results and results[0].score self.threshold: # 命中缓存 return results[0].cached_response return None async def set(self, query: str, response: str, ttl: int 3600): 写入缓存 query_vector await self.embed(query) await self.store.insert({ query: query, query_vector: query_vector, response: response, created_at: datetime.utcnow().isoformat(), ttl: ttl }) async def embed(self, text: str) - list[float]: 生成文本向量用便宜的嵌入模型 response await self.openai.embeddings.create( modeltext-embedding-3-small, # 最便宜的嵌入模型 inputtext ) return response.data[0].embeddingclass ExactCache: 精确缓存完全相同的请求直接返回 def __init__(self, redis_client): self.redis redis_client def _make_key(self, messages: list[dict], model: str, **params) - str: content json.dumps({ messages: messages, model: model, **params }, sort_keysTrue) return llm: hashlib.md5(content.encode()).hexdigest() async def get(self, messages, model, **params) - dict | None: key self._make_key(messages, model, **params) cached await self.redis.get(key) if cached: return json.loads(cached) return None async def set(self, messages, model, response, ttl3600, **params): key self._make_key(messages, model, **params) await self.redis.setex(key, ttl, json.dumps(response))## 策略四批量处理如果你有大量独立的LLM任务比如批量分析文章、处理数据用批处理API可以省50%费用pythonasync def batch_classify_articles(articles: list[str]) - list[str]: 批量分类文章使用OpenAI Batch API省50%费用 import json # 1. 创建批处理请求文件 requests [] for i, article in enumerate(articles): requests.append({ custom_id: farticle_{i}, method: POST, url: /v1/chat/completions, body: { model: gpt-4o-mini, messages: [ {role: system, content: 将文章分类为技术/产品/市场/其他}, {role: user, content: article[:500]} ], max_tokens: 20 } }) # 写入JSONL文件 batch_file batch_input.jsonl with open(batch_file, w) as f: for req in requests: f.write(json.dumps(req, ensure_asciiFalse) \n) # 2. 上传并创建批处理作业 client AsyncOpenAI() with open(batch_file, rb) as f: uploaded await client.files.create(filef, purposebatch) batch await client.batches.create( input_file_iduploaded.id, endpoint/v1/chat/completions, completion_window24h ) # 3. 等待完成批处理通常在几分钟到几小时内完成 while batch.status not in [completed, failed, cancelled]: await asyncio.sleep(60) batch await client.batches.retrieve(batch.id) # 4. 获取结果 if batch.status completed: results_file await client.files.content(batch.output_file_id) results {} for line in results_file.text.split(\n): if line: result json.loads(line) results[result[custom_id]] ( result[response][body][choices][0][message][content] ) return [results.get(farticle_{i}, 未知) for i in range(len(articles))] return [分类失败] * len(articles)## 成本优化的预期收益综合以上策略一个典型AI应用的成本优化空间| 策略 | 预期节省 | 实施难度 ||------|---------|---------|| 模型路由 | 30-50% | 中 || 语义缓存 | 10-30% | 中 || 提示词压缩 | 10-20% | 低 || 批量处理 | 50%批量任务 | 低 || 精确缓存 | 5-15% | 低 |关键建议先建基线找出Top 3的成本大户针对性优化而不是全面铺开。通常20%的接口贡献了80%的成本精准打击效果最好。—本文关键词LLM成本优化、Token缓存、模型路由、批量处理、AI成本工程
AI应用成本工程:把LLM调用费用降低50%的完整指南
如果你的AI应用已经有了一定规模的用户你一定被API账单惊到过。GPT-4o、Claude 3.5这些模型的能力固然强但Token费用叠加起来很快就成为显著的成本中心。成本工程不是用更便宜的模型降质量而是在保持质量的前提下智能地分配计算资源。本文分享一套系统性的成本优化方法。## 先建立成本追踪基线在优化之前必须先知道你在哪里花了钱。pythonfrom openai import AsyncOpenAIfrom dataclasses import dataclass, fieldfrom datetime import datetimeimport asynciodataclassclass LLMCallRecord: timestamp: str model: str prompt_tokens: int completion_tokens: int total_tokens: int cost_usd: float endpoint: str # 哪个功能/接口调用的 latency_ms: int # GPT-4o定价2026 PRICING { gpt-4o: {input: 0.0000025, output: 0.000010}, gpt-4o-mini: {input: 0.00000015, output: 0.0000006}, gpt-4o-mini-realtime: {input: 0.0000006, output: 0.0000024}, } classmethod def from_response(cls, response, endpoint: str, latency_ms: int) - LLMCallRecord: model response.model usage response.usage pricing cls.PRICING.get(model, {input: 0, output: 0}) cost ( usage.prompt_tokens * pricing[input] usage.completion_tokens * pricing[output] ) return cls( timestampdatetime.utcnow().isoformat(), modelmodel, prompt_tokensusage.prompt_tokens, completion_tokensusage.completion_tokens, total_tokensusage.total_tokens, cost_usdcost, endpointendpoint, latency_mslatency_ms )class CostTracker: def __init__(self, db): self.db db async def record(self, record: LLMCallRecord): await self.db.insert(llm_calls, record.__dict__) async def get_cost_breakdown(self, days: int 7) - dict: 按功能模块分析成本 rows await self.db.query(f SELECT endpoint, model, COUNT(*) as call_count, SUM(total_tokens) as total_tokens, SUM(cost_usd) as total_cost, AVG(latency_ms) as avg_latency FROM llm_calls WHERE timestamp datetime(now, -{days} days) GROUP BY endpoint, model ORDER BY total_cost DESC ) return { breakdown: rows, total_cost: sum(r[total_cost] for r in rows), top_cost_endpoints: rows[:5] }有了这个追踪你会清楚地看到哪些功能是成本大户然后有针对性地优化。## 策略一模型路由不是所有请求都需要最强的模型。根据任务复杂度自动选择合适的模型pythonclass SmartModelRouter: 根据任务复杂度自动选择模型 # 成本从低到高 MODEL_TIERS { nano: gpt-4o-mini, # 简单任务 standard: gpt-4o, # 中等任务 premium: o1, # 复杂推理 } async def route(self, task: LLMTask) - str: complexity await self._assess_complexity(task) if complexity low: return self.MODEL_TIERS[nano] elif complexity medium: return self.MODEL_TIERS[standard] else: return self.MODEL_TIERS[premium] async def _assess_complexity(self, task: LLMTask) - str: # 规则判断快速无成本 # 分类任务 → 简单 if task.type in [classification, sentiment, entity_extraction]: return low # 短文本处理 → 简单 if len(task.input) 500 and task.expected_output_length 200: return low # 包含推理要求 → 复杂 reasoning_keywords [分析, 推断, 原因, 为什么, 比较, 权衡] if any(kw in task.input for kw in reasoning_keywords): return medium # 代码生成 → 中等除非很简单 if task.type code_generation: return medium if len(task.input) 300 else high # 默认中等 return medium### 模型路由的成本对比GPT-4o-mini vs GPT-4o的成本差距约为17-20倍。如果你有60%的请求可以用mini处理总体成本可以降低50%以上。## 策略二提示词压缩更短的提示词意味着更低的输入Token成本。pythonclass PromptCompressor: def compress_context(self, messages: list[dict], target_token_budget: int) - list[dict]: 压缩对话历史到目标Token数 current_tokens self._count_tokens(messages) if current_tokens target_token_budget: return messages # 保留第一条系统消息 最近N条消息 result [] if messages[0][role] system: result.append(messages[0]) historical messages[1:] else: historical messages # 先保留最近的消息 recent [] recent_tokens 0 for msg in reversed(historical): tokens self._count_tokens([msg]) if recent_tokens tokens target_token_budget * 0.7: recent.insert(0, msg) recent_tokens tokens else: break # 中间部分压缩成摘要 if len(recent) len(historical): omitted historical[:len(historical) - len(recent)] summary self._summarize_messages(omitted) result.append({ role: system, content: f[对话摘要] {summary} }) result.extend(recent) return result def remove_redundancy(self, prompt: str) - str: 移除提示词中的冗余内容 # 移除多余的空行 prompt re.sub(r\n{3,}, \n\n, prompt) # 移除重复的说明用于测试生产环境慎用 # prompt self._deduplicate_instructions(prompt) # 压缩示例few-shot中的冗长示例 prompt self._compress_examples(prompt) return prompt.strip() def _compress_examples(self, prompt: str) - str: 压缩few-shot示例只保留关键部分 # 找到示例块并压缩具体实现依赖你的Prompt格式 return prompt # 实际实现时根据你的格式处理### 动态Few-shot选择不要在所有请求里都附上所有示例。根据当前请求的特征动态选择最相关的1-3个示例pythonclass DynamicFewShot: def __init__(self, examples: list[dict], embedder): self.examples examples self.embedder embedder # 预计算示例的向量 self.example_vectors embedder.encode([e[input] for e in examples]) def select(self, query: str, n: int 3) - list[dict]: 选择最相关的n个示例 query_vector self.embedder.encode([query])[0] # 计算相似度 similarities np.dot(self.example_vectors, query_vector) top_indices np.argsort(similarities)[-n:][::-1] return [self.examples[i] for i in top_indices]## 策略三缓存缓存是最直接的成本节省方式完全相同的请求不重复调用LLM。pythonimport hashlibimport jsonclass SemanticCache: 语义缓存相似的问题复用答案 def __init__(self, vector_store, threshold: float 0.95): self.store vector_store self.threshold threshold # 相似度阈值 async def get(self, query: str) - str | None: 检查缓存 # 计算查询向量 query_vector await self.embed(query) # 查找相似的缓存条目 results await self.store.search(query_vector, top_k1) if results and results[0].score self.threshold: # 命中缓存 return results[0].cached_response return None async def set(self, query: str, response: str, ttl: int 3600): 写入缓存 query_vector await self.embed(query) await self.store.insert({ query: query, query_vector: query_vector, response: response, created_at: datetime.utcnow().isoformat(), ttl: ttl }) async def embed(self, text: str) - list[float]: 生成文本向量用便宜的嵌入模型 response await self.openai.embeddings.create( modeltext-embedding-3-small, # 最便宜的嵌入模型 inputtext ) return response.data[0].embeddingclass ExactCache: 精确缓存完全相同的请求直接返回 def __init__(self, redis_client): self.redis redis_client def _make_key(self, messages: list[dict], model: str, **params) - str: content json.dumps({ messages: messages, model: model, **params }, sort_keysTrue) return llm: hashlib.md5(content.encode()).hexdigest() async def get(self, messages, model, **params) - dict | None: key self._make_key(messages, model, **params) cached await self.redis.get(key) if cached: return json.loads(cached) return None async def set(self, messages, model, response, ttl3600, **params): key self._make_key(messages, model, **params) await self.redis.setex(key, ttl, json.dumps(response))## 策略四批量处理如果你有大量独立的LLM任务比如批量分析文章、处理数据用批处理API可以省50%费用pythonasync def batch_classify_articles(articles: list[str]) - list[str]: 批量分类文章使用OpenAI Batch API省50%费用 import json # 1. 创建批处理请求文件 requests [] for i, article in enumerate(articles): requests.append({ custom_id: farticle_{i}, method: POST, url: /v1/chat/completions, body: { model: gpt-4o-mini, messages: [ {role: system, content: 将文章分类为技术/产品/市场/其他}, {role: user, content: article[:500]} ], max_tokens: 20 } }) # 写入JSONL文件 batch_file batch_input.jsonl with open(batch_file, w) as f: for req in requests: f.write(json.dumps(req, ensure_asciiFalse) \n) # 2. 上传并创建批处理作业 client AsyncOpenAI() with open(batch_file, rb) as f: uploaded await client.files.create(filef, purposebatch) batch await client.batches.create( input_file_iduploaded.id, endpoint/v1/chat/completions, completion_window24h ) # 3. 等待完成批处理通常在几分钟到几小时内完成 while batch.status not in [completed, failed, cancelled]: await asyncio.sleep(60) batch await client.batches.retrieve(batch.id) # 4. 获取结果 if batch.status completed: results_file await client.files.content(batch.output_file_id) results {} for line in results_file.text.split(\n): if line: result json.loads(line) results[result[custom_id]] ( result[response][body][choices][0][message][content] ) return [results.get(farticle_{i}, 未知) for i in range(len(articles))] return [分类失败] * len(articles)## 成本优化的预期收益综合以上策略一个典型AI应用的成本优化空间| 策略 | 预期节省 | 实施难度 ||------|---------|---------|| 模型路由 | 30-50% | 中 || 语义缓存 | 10-30% | 中 || 提示词压缩 | 10-20% | 低 || 批量处理 | 50%批量任务 | 低 || 精确缓存 | 5-15% | 低 |关键建议先建基线找出Top 3的成本大户针对性优化而不是全面铺开。通常20%的接口贡献了80%的成本精准打击效果最好。—本文关键词LLM成本优化、Token缓存、模型路由、批量处理、AI成本工程