Temporal_Workflow_Engine深度实战

Temporal_Workflow_Engine深度实战 Temporal Workflow Engine深度实战:构建可靠的分布式工作流系统摘要:在微服务架构下,跨服务编排、长时运行任务和故障恢复是开发者的核心痛点。Temporal 作为新一代工作流引擎,用代码定义工作流、自动处理重试与持久化,让分布式事务变得像写本地程序一样简单。本文从原理到生产级实战,带你彻底掌握 Temporal。为什么需要工作流引擎?在传统微服务架构中,编排多个服务协作完成一个业务流程,往往需要大量胶水代码:# ❌ 传统方式:手动编排,脆弱且难以维护asyncdefprocess_order(order_id:str):try:order=awaitorder_service.get(order_id)payment=awaitpayment_service.charge(order.amount)awaitinventory_service.reserve(order.items)awaitshipping_service.schedule(order.address)awaitnotification_service.send(order.user_id,"订单已确认")exceptPaymentError:awaitinventory_service.release(order.items)raiseexceptShippingError:awaitpayment_service.refund(payment.id)awaitinventory_service.release(order.items)raise这段代码存在以下问题:无重试机制:网络抖动直接失败无持久化:进程崩溃后状态丢失补偿逻辑复杂:每一步失败都需要手动回滚无法查询状态:订单进行到哪一步了?Temporal 的出现彻底改变了这一局面。Temporal 核心概念Workflow(工作流)Workflow 是 Temporal 的核心抽象,它是一段确定性的代码,定义了业务流程的步骤:fromtemporalioimportworkflow@workflow.defnclassOrderWorkflow:@workflow.runasyncdefrun(self,order_id:str)-OrderResult:# 1. 获取订单信息order=awaitworkflow.execute_activity(get_order,order_id,start_to_close_timeout=timedelta(seconds=30),)# 2. 扣款payment=awaitworkflow.execute_activity(charge_payment,PaymentInput(order_id=order.id,amount=order.amount),start_to_close_timeout=timedelta(seconds=30),retry_policy=RetryPolicy(maximum_attempts=3,initial_interval=timedelta(seconds=1),backoff_coefficient=2.0,),)# 3. 预留库存awaitworkflow.execute_activity(reserve_inventory,order.items,start_to_close_timeout=timedelta(seconds=30),)# 4. 安排发货awaitworkflow.execute_activity(schedule_shipping,ShippingInput(order.address,order.items),start_to_close_timeout=timedelta(seconds=60),)returnOrderResult(status="completed",order_id=order_id)Activity(活动)Activity 是工作流中执行实际操作的单元——调用 API、读写数据库、发送消息等:fromtemporalioimportactivity@activity.defnasyncdefcharge_payment(input:PaymentInput)-PaymentResult:"""调用支付网关扣款"""asyncwithaiohttp.ClientSession()assession:resp=awaitsession.post("https://payment-gateway.com/charge",json={"order_id":input.order_id,"amount":input.amount},)data=awaitresp.json()returnPaymentResult(payment_id=data["id"],status=data["status"],)关键区别:Workflow vs Activity特性WorkflowActivity确定性✅ 必须确定性❌ 可以有副作用重试自动(基于Workflow重放)自动(基于RetryPolicy)持久化自动持久化状态不持久化超时Workflow Execution TimeoutStart-to-Close / Schedule-to-Close幂等性自动保证需要开发者实现实战:构建电商订单系统Step 1: 项目结构order-system/ ├── worker.py # Worker 进程 ├── workflows/ │ ├── __init__.py │ └── order_workflow.py # Workflow 定义 ├── activities/ │ ├── __init__.py │ ├── order_activities.py │ ├── payment_activities.py │ ├── inventory_activities.py │ └── shipping_activities.py ├── models/ │ ├── __init__.py │ └── schemas.py # 数据模型 ├── client.py # 启动 Workflow 的客户端 └── requirements.txtStep 2: 定义数据模型# models/schemas.pyfromdataclassesimportdataclass,fieldfromtypingimportListfromenumimportEnumclassOrderStatus(Enum):PENDING="pending"PAID="paid"INVENTORY_RESERVED="inventory_reserved"SHIPPED="shipped"COMPLETED="completed"FAILED="failed"REFUNDED="refunded"@dataclassclassOrderItem:product_id:strquantity:intprice:float@dataclassclassOrder:id:struser_id:stritems:List[OrderItem]=field(default_factory=list)total_amount:float=0.0status:str="pending"@dataclassclassPaymentResult:payment_id:strstatus:strerror:str=""@dataclassclassOrderResult:order_id:strstatus:strpayment_id:str=""tracking_number:str=""error:str=""Step 3: 实现 Activities# activities/payment_activities.pyfromtemporalioimportactivityfrommodels.schemasimportPaymentResultimportaiohttpimportasyncio@activity.defnasyncdefcharge_payment(order_id:str,amount:float,idempotency_key:str)-PaymentResult:"""扣款 —— 注意幂等性设计"""activity.logger.info(f"Charging payment for order{order_id}, amount={amount}")asyncwithaiohttp.ClientSession()assession:try:resp=awaitsession.post("https://payment-gateway.example.com/v1/charges",json={"order_id":order_id,"amount":amount,"currency":"CNY","idempotency_key":idempotency_key,},timeout=aiohttp.ClientTimeout(total=25),)ifresp.status==200:data=awaitresp.json(