接了个活帮客户做批量商品描述生成大概 5 万个 SKU每个生成一段 200 字的产品描述。听起来很简单但实际上线之后一路踩坑这篇把主要坑点都记下来。最初的方案有多天真我写的第一版代码大概是这样importasynciofromopenaiimportAsyncOpenAI clientAsyncOpenAI(api_keyxxx)asyncdefgenerate_description(product:dict)-str:responseawaitclient.chat.completions.create(modelgpt-4o-mini,messages[{role:user,content:f为这个产品生成描述{product}}])returnresponse.choices[0].message.contentasyncdefmain():productsload_products()# 5万条tasks[generate_description(p)forpinproducts]resultsawaitasyncio.gather(*tasks)# 一次性并发 5 万个你大概能猜到发生了什么asyncio.gather同时跑 5 万个请求rate limit 报错轰炸程序崩了还丢了进度。正确的并发控制问题一并发数控制API 都有 rate limit通常是 RPM每分钟请求数和 TPM每分钟 token 数两个维度。要在不触限的情况下尽量跑满。importasynciofromopenaiimportAsyncOpenAIfromasyncioimportSemaphoreimporttime clientAsyncOpenAI(api_keyyour_api_key,base_urlhttps://api.ofox.ai/v1)classRateLimiter: 令牌桶限流器 def__init__(self,max_requests:int,time_window:float60.0):self.max_requestsmax_requests self.time_windowtime_window self.requests[]self._lockasyncio.Lock()asyncdefacquire(self):asyncwithself._lock:nowtime.time()# 清理时间窗口外的记录self.requests[tfortinself.requestsifnow-tself.time_window]iflen(self.requests)self.max_requests:# 需要等待oldestself.requests[0]wait_timeself.time_window-(now-oldest)0.1awaitasyncio.sleep(wait_time)# 重新清理nowtime.time()self.requests[tfortinself.requestsifnow-tself.time_window]self.requests.append(now)# 并发 semaphore 控制同时在飞的请求数MAX_CONCURRENT20# 根据你的 API 限制调整semaphoreSemaphore(MAX_CONCURRENT)# 每分钟最多 500 个请求rate_limiterRateLimiter(max_requests500,time_window60.0)asyncdefgenerate_description(product:dict)-dict:asyncwithsemaphore:awaitrate_limiter.acquire()try:responseawaitclient.chat.completions.create(modelgpt-4o-mini,messages[{role:system,content:你是专业的电商文案写手生成简洁有吸引力的产品描述},{role:user,content:f产品名称{product[name]}\n规格{product[specs]}\n目标用户{product[target]}\n\n请生成一段150-200字的产品描述。}],temperature0.7,max_tokens400)return{id:product[id],description:response.choices[0].message.content,tokens_used:response.usage.total_tokens,status:success}exceptExceptionase:return{id:product[id],description:None,error:str(e),status:error}重试机制Rate limit 被触发了要重试但不能无脑重试要指数退避importrandomfromopenaiimportRateLimitError,APITimeoutError,APIConnectionErrorasyncdefgenerate_with_retry(product:dict,max_retries:int5)-dict: 带指数退避的重试 forattemptinrange(max_retries):try:resultawaitgenerate_description(product)returnresultexceptRateLimitErrorase:ifattemptmax_retries-1:raise# 指数退避 随机抖动避免雷群效应wait_time(2**attempt)random.uniform(0,1)print(fRate limit等待{wait_time:.1f}s 后重试 (attempt{attempt1}/{max_retries}))awaitasyncio.sleep(wait_time)except(APITimeoutError,APIConnectionError)ase:ifattemptmax_retries-1:raisewait_time2**attemptprint(f网络错误等待{wait_time}s 后重试)awaitasyncio.sleep(wait_time)exceptExceptionase:# 其他错误不重试直接返回失败return{id:product[id],description:None,error:str(e),status:error}断点续传5 万条数据跑到一半出错了前功尽弃是不能接受的。要做断点续传importjsonimportosfrompathlibimportPathclassProgressTracker:def__init__(self,checkpoint_file:str):self.checkpoint_filecheckpoint_file self.completedset()self.results{}# 加载已有进度ifos.path.exists(checkpoint_file):withopen(checkpoint_file,r,encodingutf-8)asf:datajson.load(f)self.completedset(data.get(completed,[]))self.resultsdata.get(results,{})print(f从断点恢复已完成{len(self.completed)}条)defmark_done(self,product_id:str,result:dict):self.completed.add(product_id)self.results[product_id]resultdefsave(self):withopen(self.checkpoint_file,w,encodingutf-8)asf:json.dump({completed:list(self.completed),results:self.results},f,ensure_asciiFalse)defis_done(self,product_id:str)-bool:returnproduct_idinself.completedasyncdefbatch_generate(products:list,checkpoint_file:strprogress.json):trackerProgressTracker(checkpoint_file)# 过滤掉已完成的pending[pforpinproductsifnottracker.is_done(p[id])]print(f待处理{len(pending)}条已完成{len(tracker.completed)}条)# 分批处理batch_size100save_interval50# 每完成 50 条保存一次进度completed_count0asyncdefprocess_one(product):nonlocalcompleted_count resultawaitgenerate_with_retry(product)tracker.mark_done(product[id],result)completed_count1ifcompleted_count%save_interval0:tracker.save()print(f进度{len(tracker.completed)}/{len(products)})returnresultforiinrange(0,len(pending),batch_size):batchpending[i:ibatch_size]tasks[process_one(p)forpinbatch]awaitasyncio.gather(*tasks)# 最后保存一次tracker.save()returntracker.results质量一致性问题这个踩得比较深。批量生成完之后我发现有些描述质量很差太短、太通用、跟竞品一模一样的感觉、有时候还有明显的语法问题。问题根源temperature 设置不当 prompt 不够具体。# 烂 promptmessages[{role:user,content:f为{product_name}生成产品描述}]# 好 promptmessages[{role:system,content:你是专业的电商文案写手。生成产品描述时 1. 开头用一句话点出核心卖点 2. 中间展开2-3个功能特点用具体数据而非形容词 3. 结尾强调使用场景或目标用户 4. 语言简洁不用独特、优质这类废话词 5. 不超过200字},{role:user,content:f产品{product[name]}类别{product[category]}核心参数{product[specs]}目标用户{product[target_audience]}竞品差异点{product[differentiator]}生成产品描述}]给的信息越具体生成的内容越有区分度越不会千篇一律。质量校验批量生成完不能直接用要做基础校验defvalidate_description(text:str,product_name:str)-tuple[bool,str]: 基础质量校验 iflen(text)80:returnFalse,描述太短iflen(text)300:returnFalse,描述太长# 检查有没有提到产品名ifproduct_name.lower()notintext.lower()andlen(product_name)4:returnFalse,未提及产品名# 检查有没有明显的语言问题简单的bad_patterns[生成失败,抱歉,对不起,无法生成,作为AI]forpatterninbad_patterns:ifpatternintext:returnFalse,f包含异常内容{pattern}returnTrue,通过校验不通过的自动重新生成一次二次不通过的标记出来人工处理。成本监控跑 5 万条不便宜要实时监控成本避免超预算classCostTracker:# 价格表美元/1M tokensPRICES{gpt-4o-mini:{input:0.15,output:0.6},gpt-4o:{input:2.5,output:10.0},gemini-2.0-flash:{input:0.1,output:0.4},}def__init__(self,model:str,budget_usd:float):self.modelmodel self.budgetbudget_usd self.total_input_tokens0self.total_output_tokens0defrecord(self,input_tokens:int,output_tokens:int):self.total_input_tokensinput_tokens self.total_output_tokensoutput_tokens current_costself.estimate_cost()ifcurrent_costself.budget*0.9:raiseException(f即将超出预算当前花费 ${current_cost:.2f}预算 ${self.budget:.2f})defestimate_cost(self)-float:priceself.PRICES.get(self.model,{input:1.0,output:3.0})input_costself.total_input_tokens/1_000_000*price[input]output_costself.total_output_tokens/1_000_000*price[output]returninput_costoutput_costdefsummary(self)-str:costself.estimate_cost()return(f总 tokens输入{self.total_input_tokens:,}输出{self.total_output_tokens:,}\nf预估成本${cost:.4f})实际跑 5 万条的结果用gpt-4o-mini并发 20每分钟约 480 请求5 万条大概跑了 1 小时 50 分钟。成本平均每条约 350 tokens输入 250 输出 1005 万条合计 1750 万 tokens大约 $3.2。质量不通过率约 2.3%这 1150 条重跑了一次通过率提到 99.1%剩下 0.9% 人工处理。整体来说比预期好主要是 prompt 认真写了之后质量稳定性提升很多。
用 AI API 做内容生成工具:踩坑实录
接了个活帮客户做批量商品描述生成大概 5 万个 SKU每个生成一段 200 字的产品描述。听起来很简单但实际上线之后一路踩坑这篇把主要坑点都记下来。最初的方案有多天真我写的第一版代码大概是这样importasynciofromopenaiimportAsyncOpenAI clientAsyncOpenAI(api_keyxxx)asyncdefgenerate_description(product:dict)-str:responseawaitclient.chat.completions.create(modelgpt-4o-mini,messages[{role:user,content:f为这个产品生成描述{product}}])returnresponse.choices[0].message.contentasyncdefmain():productsload_products()# 5万条tasks[generate_description(p)forpinproducts]resultsawaitasyncio.gather(*tasks)# 一次性并发 5 万个你大概能猜到发生了什么asyncio.gather同时跑 5 万个请求rate limit 报错轰炸程序崩了还丢了进度。正确的并发控制问题一并发数控制API 都有 rate limit通常是 RPM每分钟请求数和 TPM每分钟 token 数两个维度。要在不触限的情况下尽量跑满。importasynciofromopenaiimportAsyncOpenAIfromasyncioimportSemaphoreimporttime clientAsyncOpenAI(api_keyyour_api_key,base_urlhttps://api.ofox.ai/v1)classRateLimiter: 令牌桶限流器 def__init__(self,max_requests:int,time_window:float60.0):self.max_requestsmax_requests self.time_windowtime_window self.requests[]self._lockasyncio.Lock()asyncdefacquire(self):asyncwithself._lock:nowtime.time()# 清理时间窗口外的记录self.requests[tfortinself.requestsifnow-tself.time_window]iflen(self.requests)self.max_requests:# 需要等待oldestself.requests[0]wait_timeself.time_window-(now-oldest)0.1awaitasyncio.sleep(wait_time)# 重新清理nowtime.time()self.requests[tfortinself.requestsifnow-tself.time_window]self.requests.append(now)# 并发 semaphore 控制同时在飞的请求数MAX_CONCURRENT20# 根据你的 API 限制调整semaphoreSemaphore(MAX_CONCURRENT)# 每分钟最多 500 个请求rate_limiterRateLimiter(max_requests500,time_window60.0)asyncdefgenerate_description(product:dict)-dict:asyncwithsemaphore:awaitrate_limiter.acquire()try:responseawaitclient.chat.completions.create(modelgpt-4o-mini,messages[{role:system,content:你是专业的电商文案写手生成简洁有吸引力的产品描述},{role:user,content:f产品名称{product[name]}\n规格{product[specs]}\n目标用户{product[target]}\n\n请生成一段150-200字的产品描述。}],temperature0.7,max_tokens400)return{id:product[id],description:response.choices[0].message.content,tokens_used:response.usage.total_tokens,status:success}exceptExceptionase:return{id:product[id],description:None,error:str(e),status:error}重试机制Rate limit 被触发了要重试但不能无脑重试要指数退避importrandomfromopenaiimportRateLimitError,APITimeoutError,APIConnectionErrorasyncdefgenerate_with_retry(product:dict,max_retries:int5)-dict: 带指数退避的重试 forattemptinrange(max_retries):try:resultawaitgenerate_description(product)returnresultexceptRateLimitErrorase:ifattemptmax_retries-1:raise# 指数退避 随机抖动避免雷群效应wait_time(2**attempt)random.uniform(0,1)print(fRate limit等待{wait_time:.1f}s 后重试 (attempt{attempt1}/{max_retries}))awaitasyncio.sleep(wait_time)except(APITimeoutError,APIConnectionError)ase:ifattemptmax_retries-1:raisewait_time2**attemptprint(f网络错误等待{wait_time}s 后重试)awaitasyncio.sleep(wait_time)exceptExceptionase:# 其他错误不重试直接返回失败return{id:product[id],description:None,error:str(e),status:error}断点续传5 万条数据跑到一半出错了前功尽弃是不能接受的。要做断点续传importjsonimportosfrompathlibimportPathclassProgressTracker:def__init__(self,checkpoint_file:str):self.checkpoint_filecheckpoint_file self.completedset()self.results{}# 加载已有进度ifos.path.exists(checkpoint_file):withopen(checkpoint_file,r,encodingutf-8)asf:datajson.load(f)self.completedset(data.get(completed,[]))self.resultsdata.get(results,{})print(f从断点恢复已完成{len(self.completed)}条)defmark_done(self,product_id:str,result:dict):self.completed.add(product_id)self.results[product_id]resultdefsave(self):withopen(self.checkpoint_file,w,encodingutf-8)asf:json.dump({completed:list(self.completed),results:self.results},f,ensure_asciiFalse)defis_done(self,product_id:str)-bool:returnproduct_idinself.completedasyncdefbatch_generate(products:list,checkpoint_file:strprogress.json):trackerProgressTracker(checkpoint_file)# 过滤掉已完成的pending[pforpinproductsifnottracker.is_done(p[id])]print(f待处理{len(pending)}条已完成{len(tracker.completed)}条)# 分批处理batch_size100save_interval50# 每完成 50 条保存一次进度completed_count0asyncdefprocess_one(product):nonlocalcompleted_count resultawaitgenerate_with_retry(product)tracker.mark_done(product[id],result)completed_count1ifcompleted_count%save_interval0:tracker.save()print(f进度{len(tracker.completed)}/{len(products)})returnresultforiinrange(0,len(pending),batch_size):batchpending[i:ibatch_size]tasks[process_one(p)forpinbatch]awaitasyncio.gather(*tasks)# 最后保存一次tracker.save()returntracker.results质量一致性问题这个踩得比较深。批量生成完之后我发现有些描述质量很差太短、太通用、跟竞品一模一样的感觉、有时候还有明显的语法问题。问题根源temperature 设置不当 prompt 不够具体。# 烂 promptmessages[{role:user,content:f为{product_name}生成产品描述}]# 好 promptmessages[{role:system,content:你是专业的电商文案写手。生成产品描述时 1. 开头用一句话点出核心卖点 2. 中间展开2-3个功能特点用具体数据而非形容词 3. 结尾强调使用场景或目标用户 4. 语言简洁不用独特、优质这类废话词 5. 不超过200字},{role:user,content:f产品{product[name]}类别{product[category]}核心参数{product[specs]}目标用户{product[target_audience]}竞品差异点{product[differentiator]}生成产品描述}]给的信息越具体生成的内容越有区分度越不会千篇一律。质量校验批量生成完不能直接用要做基础校验defvalidate_description(text:str,product_name:str)-tuple[bool,str]: 基础质量校验 iflen(text)80:returnFalse,描述太短iflen(text)300:returnFalse,描述太长# 检查有没有提到产品名ifproduct_name.lower()notintext.lower()andlen(product_name)4:returnFalse,未提及产品名# 检查有没有明显的语言问题简单的bad_patterns[生成失败,抱歉,对不起,无法生成,作为AI]forpatterninbad_patterns:ifpatternintext:returnFalse,f包含异常内容{pattern}returnTrue,通过校验不通过的自动重新生成一次二次不通过的标记出来人工处理。成本监控跑 5 万条不便宜要实时监控成本避免超预算classCostTracker:# 价格表美元/1M tokensPRICES{gpt-4o-mini:{input:0.15,output:0.6},gpt-4o:{input:2.5,output:10.0},gemini-2.0-flash:{input:0.1,output:0.4},}def__init__(self,model:str,budget_usd:float):self.modelmodel self.budgetbudget_usd self.total_input_tokens0self.total_output_tokens0defrecord(self,input_tokens:int,output_tokens:int):self.total_input_tokensinput_tokens self.total_output_tokensoutput_tokens current_costself.estimate_cost()ifcurrent_costself.budget*0.9:raiseException(f即将超出预算当前花费 ${current_cost:.2f}预算 ${self.budget:.2f})defestimate_cost(self)-float:priceself.PRICES.get(self.model,{input:1.0,output:3.0})input_costself.total_input_tokens/1_000_000*price[input]output_costself.total_output_tokens/1_000_000*price[output]returninput_costoutput_costdefsummary(self)-str:costself.estimate_cost()return(f总 tokens输入{self.total_input_tokens:,}输出{self.total_output_tokens:,}\nf预估成本${cost:.4f})实际跑 5 万条的结果用gpt-4o-mini并发 20每分钟约 480 请求5 万条大概跑了 1 小时 50 分钟。成本平均每条约 350 tokens输入 250 输出 1005 万条合计 1750 万 tokens大约 $3.2。质量不通过率约 2.3%这 1150 条重跑了一次通过率提到 99.1%剩下 0.9% 人工处理。整体来说比预期好主要是 prompt 认真写了之后质量稳定性提升很多。