AI 工作流引擎设计:从提示词编排到多步骤任务自动化

AI 工作流引擎设计:从提示词编排到多步骤任务自动化 AI 工作流引擎设计从提示词编排到多步骤任务自动化一、碎片化提示词与不可复现的 AI 调用工作流管理的缺失在 AI 应用开发中一个日益突出的痛点是AI 调用逻辑散落在代码各处缺乏统一的管理和编排。典型表现为提示词硬编码在业务代码中修改需要重新部署多步骤 AI 任务的中间结果没有持久化失败后只能从头开始不同开发者对同一任务写了不同的提示词版本效果差异巨大且无法对比。更严重的是当 AI 任务涉及多个步骤的串联如先提取关键信息再生成摘要最后翻译为目标语言步骤之间的数据传递和错误处理变得极其脆弱。某个步骤的输出格式稍有偏差后续步骤就会崩溃。而传统的 try-catch 只能处理异常情况无法处理模型输出了格式正确但语义错误的内容这种更隐蔽的问题。AI 工作流引擎的核心目标就是解决这些问题将 AI 调用从碎片化的代码片段升级为可编排、可观测、可复现的结构化流程。二、工作流引擎的核心模型DAG 有向无环图与步骤编排2.1 工作流的 DAG 模型AI 工作流本质上是一个有向无环图DAG每个节点代表一个处理步骤边代表数据流向。DAG 结构天然支持步骤的并行执行和依赖管理。graph TD A[输入节点: 原始文本] -- B[步骤1: 实体提取] A -- C[步骤2: 情感分析] B -- D[步骤3: 关系推理] C -- D D -- E[步骤4: 报告生成] E -- F[输出节点: 结构化报告] G[校验节点] -.-|格式校验| B G -.-|格式校验| C G -.-|语义校验| D style A fill:#e8f5e9 style F fill:#e8f5e9 style G fill:#fff3e02.2 步骤的生命周期与状态机每个工作流步骤有明确的生命周期Pending - Running - Succeeded / Failed / Skipped。状态转换必须持久化确保工作流可以从中断点恢复。stateDiagram-v2 [*] -- Pending Pending -- Running: 开始执行 Running -- Succeeded: 输出校验通过 Running -- Failed: 执行异常或校验失败 Running -- Skipped: 前置条件不满足 Failed -- Running: 重试 Failed -- Skipped: 超过最大重试次数 Succeeded -- [*] Skipped -- [*]三、生产级工作流引擎代码实现3.1 工作流定义与步骤抽象// 工作流步骤的输入输出类型约束 interface StepContext { inputs: Recordstring, unknown; outputs: Recordstring, unknown; metadata: { stepId: string; attempt: number; startTime: number; }; } // 步骤定义每个步骤必须声明输入输出和校验逻辑 interface StepDefinition { id: string; name: string; // 依赖的步骤 ID 列表所有依赖完成后才执行 dependsOn: string[]; // 输入校验在执行前验证上游数据是否符合预期 validateInput: (ctx: StepContext) Promiseboolean; // 核心执行逻辑 execute: (ctx: StepContext) PromiseRecordstring, unknown; // 输出校验在执行后验证本步骤输出是否可靠 validateOutput: (output: Recordstring, unknown) Promiseboolean; // 重试策略 retry: { maxAttempts: number; backoffMs: number; retryOn: (timeout | validation | api_error)[]; }; } // 工作流定义 interface WorkflowDefinition { id: string; name: string; version: string; steps: StepDefinition[]; // 全局超时防止工作流无限运行 timeoutMs: number; }3.2 工作流执行引擎// 步骤执行状态 interface StepState { stepId: string; status: pending | running | succeeded | failed | skipped; outputs: Recordstring, unknown; error?: string; attempts: number; startedAt?: number; completedAt?: number; } class WorkflowEngine { private stateStore: Mapstring, StepState; private globalOutputs: Recordstring, unknown; constructor() { this.stateStore new Map(); this.globalOutputs {}; } // 拓扑排序确定步骤执行顺序确保依赖关系正确 private topologicalSort(steps: StepDefinition[]): StepDefinition[] { const inDegree new Mapstring, number(); const adjacency new Mapstring, string[](); steps.forEach((step) { inDegree.set(step.id, step.dependsOn.length); step.dependsOn.forEach((dep) { if (!adjacency.has(dep)) adjacency.set(dep, []); adjacency.get(dep)!.push(step.id); }); }); const queue: string[] []; inDegree.forEach((degree, id) { if (degree 0) queue.push(id); }); const sorted: StepDefinition[] []; const stepMap new Map(steps.map((s) [s.id, s])); while (queue.length 0) { const current queue.shift()!; const step stepMap.get(current); if (step) sorted.push(step); (adjacency.get(current) || []).forEach((next) { const newDegree inDegree.get(next)! - 1; inDegree.set(next, newDegree); if (newDegree 0) queue.push(next); }); } // 检测循环依赖 if (sorted.length ! steps.length) { throw new Error(工作流存在循环依赖无法执行); } return sorted; } async execute(workflow: WorkflowDefinition, initialInputs: Recordstring, unknown): PromiseRecordstring, unknown { const sortedSteps this.topologicalSort(workflow.steps); const workflowStartTime Date.now(); // 初始化所有步骤状态 sortedSteps.forEach((step) { this.stateStore.set(step.id, { stepId: step.id, status: pending, outputs: {}, attempts: 0, }); }); for (const step of sortedSteps) { // 全局超时检查 if (Date.now() - workflowStartTime workflow.timeoutMs) { throw new Error(工作流执行超时${workflow.timeoutMs}ms); } // 检查依赖步骤是否全部成功 const depsAllSucceeded step.dependsOn.every( (depId) this.stateStore.get(depId)?.status succeeded ); if (!depsAllSucceeded) { this.updateState(step.id, { status: skipped, error: 前置步骤未成功完成 }); continue; } // 收集上游输出作为当前步骤的输入 const stepInputs: Recordstring, unknown { ...initialInputs }; step.dependsOn.forEach((depId) { const depState this.stateStore.get(depId); if (depState) Object.assign(stepInputs, depState.outputs); }); const ctx: StepContext { inputs: stepInputs, outputs: {}, metadata: { stepId: step.id, attempt: 0, startTime: Date.now(), }, }; // 执行步骤含重试逻辑 await this.executeStepWithRetry(step, ctx); } // 收集所有成功步骤的输出 this.stateStore.forEach((state) { if (state.status succeeded) { Object.assign(this.globalOutputs, state.outputs); } }); return this.globalOutputs; } private async executeStepWithRetry(step: StepDefinition, ctx: StepContext): Promisevoid { const { maxAttempts, backoffMs, retryOn } step.retry; let lastError: Error | null null; for (let attempt 1; attempt maxAttempts; attempt) { ctx.metadata.attempt attempt; try { // 输入校验在执行前拦截不合规数据 const inputValid await step.validateInput(ctx); if (!inputValid) { this.updateState(step.id, { status: failed, error: 步骤输入校验失败第${attempt}次尝试, attempts: attempt, }); if (!retryOn.includes(validation)) break; lastError new Error(输入校验失败); continue; } this.updateState(step.id, { status: running, startedAt: Date.now() }); const outputs await step.execute(ctx); // 输出校验确保模型输出符合产品预期 const outputValid await step.validateOutput(outputs); if (!outputValid) { this.updateState(step.id, { status: failed, error: 步骤输出校验失败第${attempt}次尝试, attempts: attempt, }); if (!retryOn.includes(validation)) break; lastError new Error(输出校验失败); continue; } this.updateState(step.id, { status: succeeded, outputs, completedAt: Date.now(), attempts: attempt, }); return; } catch (err) { lastError err instanceof Error ? err : new Error(String(err)); this.updateState(step.id, { status: failed, error: lastError.message, attempts: attempt, }); // 判断是否应该重试 const isRetryable this.isRetryableError(lastError, retryOn); if (!isRetryable || attempt maxAttempts) break; // 指数退避等待 await new Promise((r) setTimeout(r, backoffMs * Math.pow(2, attempt - 1))); } } // 所有重试用尽标记为跳过而非失败让后续步骤可以继续 this.updateState(step.id, { status: skipped, error: 步骤在${maxAttempts}次尝试后仍失败: ${lastError?.message}, }); } private isRetryableError( error: Error, retryOn: (timeout | validation | api_error)[] ): boolean { if (error.name AbortError retryOn.includes(timeout)) return true; if (error.message.includes(校验失败) retryOn.includes(validation)) return true; if (error.message.includes(API) retryOn.includes(api_error)) return true; return false; } private updateState(stepId: string, partial: PartialStepState): void { const current this.stateStore.get(stepId)!; this.stateStore.set(stepId, { ...current, ...partial, stepId }); } }3.3 具体工作流示例文档分析流水线const documentAnalysisWorkflow: WorkflowDefinition { id: doc-analysis-v1, name: 文档分析流水线, version: 1.0.0, timeoutMs: 120000, // 全局2分钟超时 steps: [ { id: extract-entities, name: 实体提取, dependsOn: [], validateInput: async (ctx) typeof ctx.inputs.rawText string ctx.inputs.rawText.length 0, execute: async (ctx) { const text ctx.inputs.rawText as string; const res await fetch(/api/ai/extract, { method: POST, body: JSON.stringify({ text, task: entity_extraction }), }); const data await res.json(); return { entities: data.entities }; }, validateOutput: async (output) Array.isArray(output.entities), retry: { maxAttempts: 3, backoffMs: 1000, retryOn: [api_error, timeout] }, }, { id: sentiment-analysis, name: 情感分析, dependsOn: [], validateInput: async (ctx) typeof ctx.inputs.rawText string, execute: async (ctx) { const text ctx.inputs.rawText as string; const res await fetch(/api/ai/analyze, { method: POST, body: JSON.stringify({ text, task: sentiment }), }); const data await res.json(); return { sentiment: data.sentiment, confidence: data.confidence }; }, validateOutput: async (output) typeof output.sentiment string typeof output.confidence number, retry: { maxAttempts: 2, backoffMs: 500, retryOn: [api_error] }, }, { id: generate-report, name: 报告生成, dependsOn: [extract-entities, sentiment-analysis], validateInput: async (ctx) Array.isArray(ctx.inputs.entities) typeof ctx.inputs.sentiment string, execute: async (ctx) { const res await fetch(/api/ai/generate, { method: POST, body: JSON.stringify({ entities: ctx.inputs.entities, sentiment: ctx.inputs.sentiment, template: analysis_report, }), }); const data await res.json(); return { report: data.content }; }, validateOutput: async (output) typeof output.report string output.report.length 100, retry: { maxAttempts: 2, backoffMs: 1000, retryOn: [api_error, validation] }, }, ], };四、工作流引擎的架构权衡与适用边界4.1 DAG 模型的表达力限制DAG 无法表达循环和条件分支。例如如果情感分析结果为负面则增加一轮人工审核这种条件路由需要引入条件节点或子工作流。但每增加一层抽象引擎的复杂度就指数级增长。对于简单的条件逻辑在步骤的execute方法中用 if-else 处理更直接只有当条件分支成为通用模式时才值得抽象为引擎级特性。4.2 状态持久化的性能代价将每个步骤的状态写入数据库可以保证工作流可恢复但也增加了延迟。对于执行时间在秒级以内的短工作流状态持久化的开销可能占总执行时间的 30% 以上。建议根据工作流执行时长选择策略短工作流用内存状态长工作流用数据库持久化。4.3 提示词版本管理的复杂度工作流中的提示词如果硬编码在代码中每次修改都需要重新部署。将提示词外部化如存入数据库或配置中心可以解决部署问题但引入了提示词与代码的版本同步难题。一个折中方案是将提示词模板与工作流定义一起版本化管理每次修改都生成新的工作流版本。4.4 适用场景场景推荐程度原因多步骤文档/数据处理推荐步骤间依赖明确DAG 模型天然适配内容生成流水线推荐可并行执行独立步骤提升吞吐简单单步 AI 调用不推荐引擎开销大于收益直接调用更简单需要人工审批的流程谨慎需要扩展状态机支持挂起/恢复实时对话场景不推荐工作流延迟过高不适合毫秒级响应五、总结AI 工作流引擎通过 DAG 模型将碎片化的 AI 调用编排为结构化的处理流程解决了提示词散落、步骤不可复现、错误处理脆弱三大痛点。拓扑排序保证了步骤执行顺序的正确性输入输出校验在步骤边界建立了质量屏障重试与降级策略提升了工作流的整体鲁棒性。落地路线建议第一步从两到三步的简单流水线开始如提取生成验证 DAG 编排和校验机制的有效性第二步引入状态持久化支持长时间运行的工作流中断恢复第三步将提示词模板外部化实现提示词与代码的独立迭代第四步当条件分支成为通用需求时再评估是否引入条件节点或子工作流抽象。核心原则是工作流引擎的复杂度应该与业务复杂度匹配过度设计比设计不足更危险。