PHP事件溯源与CQRS模式实现事件溯源是一种数据持久化方式不存储对象的当前状态而是存储所有状态变更的事件。CQRS将读操作和写操作分离到不同的模型。今天说说PHP中事件溯源和CQRS模式的实现。事件溯源的核心是事件存储。所有对数据的修改都以事件的形式追加到事件流中。phpinterface DomainEvent{public function getEventName(): string;public function getOccurredAt(): \DateTimeImmutable;public function getPayload(): array;}abstract class DomainEventBase implements DomainEvent{private \DateTimeImmutable $occurredAt;public function __construct(){$this-occurredAt new \DateTimeImmutable();}public function getOccurredAt(): \DateTimeImmutable{return $this-occurredAt;}}class UserRegistered extends DomainEventBase{public function __construct(public readonly string $userId,public readonly string $name,public readonly string $email) {}public function getEventName(): string { return user.registered; }public function getPayload(): array{return [user_id $this-userId, name $this-name, email $this-email];}}class UserEmailChanged extends DomainEventBase{public function __construct(public readonly string $userId,public readonly string $oldEmail,public readonly string $newEmail) {}public function getEventName(): string { return user.email_changed; }public function getPayload(): array{return [user_id $this-userId, old_email $this-oldEmail, new_email $this-newEmail];}}class EventStore{private PDO $pdo;public function __construct(PDO $pdo){$this-pdo $pdo;$this-initTable();}private function initTable(): void{$this-pdo-exec(CREATE TABLE IF NOT EXISTS event_store (id BIGINT AUTO_INCREMENT PRIMARY KEY,aggregate_id VARCHAR(36) NOT NULL,aggregate_type VARCHAR(100) NOT NULL,event_name VARCHAR(200) NOT NULL,payload JSON NOT NULL,version INT NOT NULL,occurred_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,INDEX idx_aggregate (aggregate_id, aggregate_type, version)) ENGINEInnoDB DEFAULT CHARSETutf8mb4);}public function append(string $aggregateId, string $aggregateType, DomainEvent $event, int $expectedVersion): void{$this-pdo-beginTransaction();try {// 检查版本冲突$stmt $this-pdo-prepare(SELECT MAX(version) FROM event_storeWHERE aggregate_id ? AND aggregate_type ?);$stmt-execute([$aggregateId, $aggregateType]);$currentVersion (int)$stmt-fetchColumn();if ($currentVersion ! $expectedVersion) {throw new \RuntimeException(并发冲突: 期望版本{$expectedVersion}当前版本{$currentVersion});}$newVersion $expectedVersion 1;$stmt $this-pdo-prepare(INSERT INTO event_store (aggregate_id, aggregate_type, event_name, payload, version)VALUES (?, ?, ?, ?, ?));$stmt-execute([$aggregateId,$aggregateType,$event-getEventName(),json_encode($event-getPayload()),$newVersion,]);$this-pdo-commit();} catch (\Exception $e) {$this-pdo-rollBack();throw $e;}}public function getEvents(string $aggregateId, string $aggregateType): array{$stmt $this-pdo-prepare(SELECT * FROM event_storeWHERE aggregate_id ? AND aggregate_type ?ORDER BY version ASC);$stmt-execute([$aggregateId, $aggregateType]);return $stmt-fetchAll();}public function getEventsSince(\DateTimeImmutable $since): array{$stmt $this-pdo-prepare(SELECT * FROM event_storeWHERE occurred_at ?ORDER BY occurred_at ASC);$stmt-execute([$since-format(Y-m-d H:i:s)]);return $stmt-fetchAll();}public function getAggregateIds(string $aggregateType): array{$stmt $this-pdo-prepare(SELECT DISTINCT aggregate_id FROM event_storeWHERE aggregate_type ?);$stmt-execute([$aggregateType]);return $stmt-fetchAll(\PDO::FETCH_COLUMN);}}class Projection{public function project(array $events): array{$state [id null,name ,email ,version 0,];foreach ($events as $event) {$payload json_decode($event[payload], true);$state[version] $event[version];switch ($event[event_name]) {case user.registered:$state[id] $payload[user_id];$state[name] $payload[name];$state[email] $payload[email];break;case user.email_changed:$state[email] $payload[new_email];break;}}return $state;}}$pdo new PDO(mysql:hostlocalhost;dbnametest, root, );$eventStore new EventStore($pdo);// 写入事件$userId user_001;$eventStore-append($userId, user, new UserRegistered($userId, 张三, zhangsantest.com), 0);$eventStore-append($userId, user, new UserEmailChanged($userId, zhangsantest.com, newemailtest.com), 1);// 投影重建$events $eventStore-getEvents($userId, user);$projection new Projection();$userState $projection-project($events);echo 当前状态: . json_encode($userState, JSON_UNESCAPED_UNICODE) . \n;?CQRS将读模型和写模型分离php// 写模型class UserCommandHandler{private EventStore $eventStore;public function __construct(EventStore $eventStore){$this-eventStore $eventStore;}public function handleRegister(string $userId, string $name, string $email): void{$event new UserRegistered($userId, $name, $email);$this-eventStore-append($userId, user, $event, 0);}public function handleChangeEmail(string $userId, string $newEmail): void{$events $this-eventStore-getEvents($userId, user);$projection new Projection();$user $projection-project($events);$lastVersion $user[version];$event new UserEmailChanged($userId, $user[email], $newEmail);$this-eventStore-append($userId, user, $event, $lastVersion);}}// 读模型class UserReadModel{private PDO $pdo;public function __construct(PDO $pdo){$this-pdo $pdo;$this-initTable();}private function initTable(): void{$this-pdo-exec(CREATE TABLE IF NOT EXISTS user_read_model (id VARCHAR(36) PRIMARY KEY,name VARCHAR(100),email VARCHAR(255),version INT DEFAULT 0,updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP) ENGINEInnoDB DEFAULT CHARSETutf8mb4);}public function updateFromEvent(array $event): void{$payload json_decode($event[payload], true);switch ($event[event_name]) {case user.registered:$stmt $this-pdo-prepare(INSERT INTO user_read_model (id, name, email, version)VALUES (?, ?, ?, ?)ON DUPLICATE KEY UPDATE name VALUES(name), email VALUES(email), version VALUES(version));$stmt-execute([$payload[user_id], $payload[name], $payload[email], $event[version]]);break;case user.email_changed:$stmt $this-pdo-prepare(UPDATE user_read_model SET email ?, version ? WHERE id ?);$stmt-execute([$payload[new_email], $event[version], $payload[user_id]]);break;}}public function find(string $userId): ?array{$stmt $this-pdo-prepare(SELECT * FROM user_read_model WHERE id ?);$stmt-execute([$userId]);$result $stmt-fetch(\PDO::FETCH_ASSOC);return $result ?: null;}}echo 事件溯源和CQRS模式已实现\n;echo 所有状态变更通过事件存储读模型从投射表获取数据\n;?事件溯源和CQRS让系统的事件历史完整可追溯读模型可以针对查询需求优化。但事件溯源也带来了最终一致性、事件版本管理等复杂性。适合需要完整审计日志或复杂事件处理的场景。简单的CRUD应用不需要事件溯源传统的关系型数据库更合适。
PHP事件溯源与CQRS模式实现
PHP事件溯源与CQRS模式实现事件溯源是一种数据持久化方式不存储对象的当前状态而是存储所有状态变更的事件。CQRS将读操作和写操作分离到不同的模型。今天说说PHP中事件溯源和CQRS模式的实现。事件溯源的核心是事件存储。所有对数据的修改都以事件的形式追加到事件流中。phpinterface DomainEvent{public function getEventName(): string;public function getOccurredAt(): \DateTimeImmutable;public function getPayload(): array;}abstract class DomainEventBase implements DomainEvent{private \DateTimeImmutable $occurredAt;public function __construct(){$this-occurredAt new \DateTimeImmutable();}public function getOccurredAt(): \DateTimeImmutable{return $this-occurredAt;}}class UserRegistered extends DomainEventBase{public function __construct(public readonly string $userId,public readonly string $name,public readonly string $email) {}public function getEventName(): string { return user.registered; }public function getPayload(): array{return [user_id $this-userId, name $this-name, email $this-email];}}class UserEmailChanged extends DomainEventBase{public function __construct(public readonly string $userId,public readonly string $oldEmail,public readonly string $newEmail) {}public function getEventName(): string { return user.email_changed; }public function getPayload(): array{return [user_id $this-userId, old_email $this-oldEmail, new_email $this-newEmail];}}class EventStore{private PDO $pdo;public function __construct(PDO $pdo){$this-pdo $pdo;$this-initTable();}private function initTable(): void{$this-pdo-exec(CREATE TABLE IF NOT EXISTS event_store (id BIGINT AUTO_INCREMENT PRIMARY KEY,aggregate_id VARCHAR(36) NOT NULL,aggregate_type VARCHAR(100) NOT NULL,event_name VARCHAR(200) NOT NULL,payload JSON NOT NULL,version INT NOT NULL,occurred_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,INDEX idx_aggregate (aggregate_id, aggregate_type, version)) ENGINEInnoDB DEFAULT CHARSETutf8mb4);}public function append(string $aggregateId, string $aggregateType, DomainEvent $event, int $expectedVersion): void{$this-pdo-beginTransaction();try {// 检查版本冲突$stmt $this-pdo-prepare(SELECT MAX(version) FROM event_storeWHERE aggregate_id ? AND aggregate_type ?);$stmt-execute([$aggregateId, $aggregateType]);$currentVersion (int)$stmt-fetchColumn();if ($currentVersion ! $expectedVersion) {throw new \RuntimeException(并发冲突: 期望版本{$expectedVersion}当前版本{$currentVersion});}$newVersion $expectedVersion 1;$stmt $this-pdo-prepare(INSERT INTO event_store (aggregate_id, aggregate_type, event_name, payload, version)VALUES (?, ?, ?, ?, ?));$stmt-execute([$aggregateId,$aggregateType,$event-getEventName(),json_encode($event-getPayload()),$newVersion,]);$this-pdo-commit();} catch (\Exception $e) {$this-pdo-rollBack();throw $e;}}public function getEvents(string $aggregateId, string $aggregateType): array{$stmt $this-pdo-prepare(SELECT * FROM event_storeWHERE aggregate_id ? AND aggregate_type ?ORDER BY version ASC);$stmt-execute([$aggregateId, $aggregateType]);return $stmt-fetchAll();}public function getEventsSince(\DateTimeImmutable $since): array{$stmt $this-pdo-prepare(SELECT * FROM event_storeWHERE occurred_at ?ORDER BY occurred_at ASC);$stmt-execute([$since-format(Y-m-d H:i:s)]);return $stmt-fetchAll();}public function getAggregateIds(string $aggregateType): array{$stmt $this-pdo-prepare(SELECT DISTINCT aggregate_id FROM event_storeWHERE aggregate_type ?);$stmt-execute([$aggregateType]);return $stmt-fetchAll(\PDO::FETCH_COLUMN);}}class Projection{public function project(array $events): array{$state [id null,name ,email ,version 0,];foreach ($events as $event) {$payload json_decode($event[payload], true);$state[version] $event[version];switch ($event[event_name]) {case user.registered:$state[id] $payload[user_id];$state[name] $payload[name];$state[email] $payload[email];break;case user.email_changed:$state[email] $payload[new_email];break;}}return $state;}}$pdo new PDO(mysql:hostlocalhost;dbnametest, root, );$eventStore new EventStore($pdo);// 写入事件$userId user_001;$eventStore-append($userId, user, new UserRegistered($userId, 张三, zhangsantest.com), 0);$eventStore-append($userId, user, new UserEmailChanged($userId, zhangsantest.com, newemailtest.com), 1);// 投影重建$events $eventStore-getEvents($userId, user);$projection new Projection();$userState $projection-project($events);echo 当前状态: . json_encode($userState, JSON_UNESCAPED_UNICODE) . \n;?CQRS将读模型和写模型分离php// 写模型class UserCommandHandler{private EventStore $eventStore;public function __construct(EventStore $eventStore){$this-eventStore $eventStore;}public function handleRegister(string $userId, string $name, string $email): void{$event new UserRegistered($userId, $name, $email);$this-eventStore-append($userId, user, $event, 0);}public function handleChangeEmail(string $userId, string $newEmail): void{$events $this-eventStore-getEvents($userId, user);$projection new Projection();$user $projection-project($events);$lastVersion $user[version];$event new UserEmailChanged($userId, $user[email], $newEmail);$this-eventStore-append($userId, user, $event, $lastVersion);}}// 读模型class UserReadModel{private PDO $pdo;public function __construct(PDO $pdo){$this-pdo $pdo;$this-initTable();}private function initTable(): void{$this-pdo-exec(CREATE TABLE IF NOT EXISTS user_read_model (id VARCHAR(36) PRIMARY KEY,name VARCHAR(100),email VARCHAR(255),version INT DEFAULT 0,updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP) ENGINEInnoDB DEFAULT CHARSETutf8mb4);}public function updateFromEvent(array $event): void{$payload json_decode($event[payload], true);switch ($event[event_name]) {case user.registered:$stmt $this-pdo-prepare(INSERT INTO user_read_model (id, name, email, version)VALUES (?, ?, ?, ?)ON DUPLICATE KEY UPDATE name VALUES(name), email VALUES(email), version VALUES(version));$stmt-execute([$payload[user_id], $payload[name], $payload[email], $event[version]]);break;case user.email_changed:$stmt $this-pdo-prepare(UPDATE user_read_model SET email ?, version ? WHERE id ?);$stmt-execute([$payload[new_email], $event[version], $payload[user_id]]);break;}}public function find(string $userId): ?array{$stmt $this-pdo-prepare(SELECT * FROM user_read_model WHERE id ?);$stmt-execute([$userId]);$result $stmt-fetch(\PDO::FETCH_ASSOC);return $result ?: null;}}echo 事件溯源和CQRS模式已实现\n;echo 所有状态变更通过事件存储读模型从投射表获取数据\n;?事件溯源和CQRS让系统的事件历史完整可追溯读模型可以针对查询需求优化。但事件溯源也带来了最终一致性、事件版本管理等复杂性。适合需要完整审计日志或复杂事件处理的场景。简单的CRUD应用不需要事件溯源传统的关系型数据库更合适。