Python事件溯源模式

Python事件溯源模式 Python事件溯源模式实战 ——事件溯源将应用程序状态的所有变更存储为不可变的事件序列而非直接存储当前状态[1] 事件溯源核心概念传统CRUD只保存当前状态丢失了变更历史。事件溯源保存每次变更事件· 事件(Event)已发生的不可变事实是状态的唯一来源· 聚合根(Aggregate Root)通过重放事件重建业务对象· 事件存储(Event Store)追加式日志只插入不修改· 投影(Projection)从事件流构建的读模型[2] 事件定义与基础架构事件是溯源模式的核心数据结构。from dataclasses import dataclass, fieldfrom typing import Dict, Any, List, Optionalfrom datetime import datetimeimport uuiddataclassclass Event:领域事件表示已发生的业务事实一旦创建不可修改event_id: str field(default_factorylambda: uuid.uuid4().hex)aggregate_id: str # 所属聚合IDevent_type: str # 事件类型data: Dict[str, Any] field(default_factorydict) # 事件数据version: int 1 # 版本号(乐观锁)timestamp: datetime field(default_factorydatetime.utcnow)[3] 聚合根事件重放恢复状态聚合根通过apply方法逐步应用事件重建当前状态。class OrderAggregate:订单聚合根从事件流重建订单状态def __init__(self):self.id: Optional[str] Noneself.status: str newself.items: List[dict] []self.total_amount: float 0.0self._version: int 0 # 当前版本号self._changes: List[Event] [] # 未提交的新事件def apply(self, event: Event) - None:应用事件到聚合状态事件重放的核心方法if event.event_type OrderCreated:self.id event.aggregate_idself.items list(event.data.get(items, []))self.total_amount event.data.get(total, 0.0)self.status pendingelif event.event_type OrderShipped:self.status shippedelif event.event_type OrderCancelled:self.status cancelledself._version 1def create_order(self, items: List[dict], total: float) - None:创建订单的领域行为验证并产生事件if not items:raise ValueError(订单必须包含商品)event Event(aggregate_idself.id or uuid.uuid4().hex,event_typeOrderCreated,data{items: items, total: total},)self._changes.append(event) # 先暂存到未提交列表self.apply(event) # 立即应用到当前状态def ship(self) - None:发货状态必须为pending才能发货if self.status ! pending:raise ValueError(f无法发货当前状态: {self.status})event Event(aggregate_idself.id,event_typeOrderShipped,data{},)self._changes.append(event)self.apply(event)def get_uncommitted_changes(self) - List[Event]:获取未持久化的新事件return list(self._changes)[4] 事件存储追加式日志事件存储只追加不修改支持事件溯源和审计。from collections import defaultdictclass EventStore:事件存储追加式日志事件的唯一真实来源def __init__(self):# 按聚合ID存储事件流self._streams: Dict[str, List[Event]] defaultdict(list)# 全局事件日志用于投影重建self._global: List[Event] []def append(self, aggregate_id: str, events: List[Event], expected_version: int):追加事件到聚合流乐观锁检查current_version len(self._streams[aggregate_id])if current_version ! expected_version:raise ValueError(f并发冲突: 期望版本 {expected_version}当前版本 {current_version})self._streams[aggregate_id].extend(events)self._global.extend(events)def get_events(self, aggregate_id: str) - List[Event]:获取指定聚合的事件流return list(self._streams.get(aggregate_id, []))def get_all_events(self) - List[Event]:获取所有事件按时间排序return sorted(self._global, keylambda e: e.timestamp)[5] 聚合重建函数从事件存储中读取事件流重建聚合根到最新状态。def rebuild_aggregate(event_store: EventStore, aggregate_id: str) - OrderAggregate:从事件存储重建聚合根aggregate OrderAggregate()events event_store.get_events(aggregate_id)for event in events:aggregate.apply(event) # 重放每个事件return aggregate # 返回最新状态[6] 快照机制性能优化当事件流过长时每次重建都重放所有事件会导致性能下降。class SnapshotStore:快照存储保存聚合的某个时间点的状态快照def __init__(self):self._snapshots: Dict[str, tuple] {} # aggregate_id - (version, state)def save_snapshot(self, aggregate_id: str, version: int, state: dict):保存快照记录某版本时的完整状态self._snapshots[aggregate_id] (version, state)def get_snapshot(self, aggregate_id: str) - Optional[tuple]:获取快照(版本号, 状态)return self._snapshots.get(aggregate_id)def rebuild_with_snapshot(event_store: EventStore, snap_store: SnapshotStore,aggregate_id: str) - OrderAggregate:带快照的聚合重建从快照版本开始只重放快照之后的事件aggregate OrderAggregate()snapshot snap_store.get_snapshot(aggregate_id)start_version 0if snapshot:# 从快照恢复状态version, state snapshotaggregate.__dict__.update(state)start_version version# 只重放快照之后的新事件events event_store.get_events(aggregate_id)for event in events[start_version:]:aggregate.apply(event)return aggregate[7] 事件升级(Upcasting)随着业务演进事件结构可能变化。升级机制兼容旧格式事件。def upcast_old_event(raw: dict) - Event:将旧格式事件升级到新版本if raw.get(event_type) OrderCreated and raw.get(version, 1) 1:# V1的data是items列表V2增加了total字段data raw[data]if total not in data:data[total] sum(item[price] * item[qty] for item in data.get(items, []))raw[version] 2return Event(**raw)[8] 投影/读模型构建投影从事件流构建查询专用的读模型。class OrderProjection:订单投影从事件流构建读模型def __init__(self):self._orders: Dict[str, dict] {}def project(self, event: Event):处理事件更新读模型if event.event_type OrderCreated:self._orders[event.aggregate_id] {id: event.aggregate_id,items: event.data[items],total: event.data[total],status: pending,}elif event.event_type OrderShipped:if event.aggregate_id in self._orders:self._orders[event.aggregate_id][status] shippeddef rebuild_from_history(self, event_store: EventStore):从全部历史事件重建读模型for event in event_store.get_all_events():self.project(event)if __name__ __main__:store EventStore()snap_store SnapshotStore()# 创建订单order OrderAggregate()order.id ORD-001order.create_order([{sku: A, qty: 1, price: 100}], 100.0)order.ship()# 持久化事件store.append(order.id, order.get_uncommitted_changes(), 0)# 快照snap_store.save_snapshot(order.id, 2, {id: order.id, status: order.status})# 重建验证restored rebuild_with_snapshot(store, snap_store, ORD-001)print(f重建状态: {restored.status})