本文是「设计模式实战解读」系列第四篇。系列文章统一按照定义 → 痛点场景 → 模式结构 → 核心实现 → 真实应用 → 常见变种 → 优缺点 → 避坑指南 → FAQ的结构展开每篇聚焦一个模式讲透。一句话定义观察者模式Observer定义对象间一对多的依赖关系——当一个对象状态变化时所有依赖它的对象自动收到通知并更新。也叫发布-订阅模式Publish-Subscribe。归属行为型模式。一、没有观察者时的痛点假设你在做一个流程引擎流程实例状态变更启动、完成、失败后需要做很多事publicclassFlowExecutor{publicvoidonFlowComplete(FlowInstanceinstance){// 1. 更新数据库状态flowDao.updateStatus(instance.getId(),COMPLETED);// 2. 发送通知给用户notificationService.send(instance.getUserId(),流程执行完成);// 3. 记录审计日志auditLogService.log(instance.getId(),COMPLETED,System.currentTimeMillis());// 4. 更新监控指标metricsService.increment(flow.complete.count);metricsService.recordTime(flow.execution.duration,instance.getDuration());// 5. 触发下游流程如果有if(instance.hasDownstream()){downstreamTrigger.trigger(instance.getDownstreamId());}// 6. 清理临时数据tempDataCleaner.clean(instance.getId());// 7. 发送 Webhook 回调webhookService.callback(instance.getCallbackUrl(),instance.toJson());// 还会继续增长...}}问题FlowExecutor 知道太多东西了——它直接依赖了通知、日志、监控、下游触发、Webhook 等七八个服务每加一个后置动作都要改 FlowExecutor——违反开闭原则某个后置动作失败会影响整条链路——notificationService 超时后面的全部阻塞无法灵活配置——某些流程不需要 Webhook 回调但代码里硬编码了单元测试极其困难——要 mock 七八个依赖核心诉求发布者只负责事情发生了不关心谁想知道、“知道了要干什么”。二、模式结构┌────────────────────────────┐ │ Subject (被观察者/发布者) │ ├────────────────────────────┤ │ - observers: ListObserver│ ├────────────────────────────┤ │ addObserver(observer) │ │ removeObserver(observer) │ │ notifyAll(event) │ └──────────┬─────────────────┘ │ 通知 ↓ ┌──────────────────────┐ ┌──────────────────────┐ │ Observer A (观察者) │ │ Observer B (观察者) │ ├──────────────────────┤ ├──────────────────────┤ │ onEvent(event) │ │ onEvent(event) │ └──────────────────────┘ └──────────────────────┘ 发布者和观察者之间是松耦合的—— 发布者不知道具体有哪些观察者只知道观察者实现了统一接口。三、核心实现3.1 手写基础版// 事件定义publicclassFlowEvent{privatefinalStringflowId;privatefinalStringstatus;// STARTED / COMPLETED / FAILEDprivatefinallongtimestamp;privatefinalMapString,Objectcontext;// constructor、getter 省略}// 观察者接口publicinterfaceFlowEventListener{voidonEvent(FlowEventevent);// 是否关心这个事件过滤用defaultbooleansupports(FlowEventevent){returntrue;}}// 事件发布器被观察者publicclassFlowEventPublisher{privatefinalListFlowEventListenerlistenersnewCopyOnWriteArrayList();publicvoidaddListener(FlowEventListenerlistener){listeners.add(listener);}publicvoidremoveListener(FlowEventListenerlistener){listeners.remove(listener);}publicvoidpublish(FlowEventevent){for(FlowEventListenerlistener:listeners){if(listener.supports(event)){try{listener.onEvent(event);}catch(Exceptione){// 单个观察者异常不影响其他观察者log.error(Listener {} failed,listener.getClass().getSimpleName(),e);}}}}}3.2 具体观察者实现// 观察者1更新数据库状态ComponentpublicclassFlowStatusUpdaterimplementsFlowEventListener{OverridepublicvoidonEvent(FlowEventevent){flowDao.updateStatus(event.getFlowId(),event.getStatus());}}// 观察者2发送通知ComponentpublicclassFlowNotificationListenerimplementsFlowEventListener{OverridepublicvoidonEvent(FlowEventevent){if(COMPLETED.equals(event.getStatus())||FAILED.equals(event.getStatus())){notificationService.send(event.getUserId(),buildMessage(event));}}Overridepublicbooleansupports(FlowEventevent){// 只关心完成和失败事件returnCOMPLETED.equals(event.getStatus())||FAILED.equals(event.getStatus());}}// 观察者3监控指标ComponentpublicclassFlowMetricsListenerimplementsFlowEventListener{OverridepublicvoidonEvent(FlowEventevent){metricsService.increment(flow.event.getStatus().toLowerCase().count);}}// 观察者4审计日志ComponentpublicclassFlowAuditListenerimplementsFlowEventListener{OverridepublicvoidonEvent(FlowEventevent){auditLogService.log(event.getFlowId(),event.getStatus(),event.getTimestamp());}}3.3 改造后的 FlowExecutorpublicclassFlowExecutor{privatefinalFlowEventPublishereventPublisher;publicvoidonFlowComplete(FlowInstanceinstance){// 只做一件事发布事件eventPublisher.publish(newFlowEvent(instance.getId(),COMPLETED,System.currentTimeMillis(),instance.getContext()));}}从依赖 7 个服务变成依赖 1 个 EventPublisher——发布者彻底解耦。新增后置逻辑只需加一个 Listener 实现不改 FlowExecutor。3.4 结合 Spring 的事件机制Spring 自带了观察者模式的基础设施// 事件定义publicclassFlowCompletedEventextendsApplicationEvent{privatefinalStringflowId;privatefinalStringstatus;publicFlowCompletedEvent(Objectsource,StringflowId,Stringstatus){super(source);this.flowIdflowId;this.statusstatus;}}// 发布事件一行代码ComponentpublicclassFlowExecutor{AutowiredprivateApplicationEventPublishereventPublisher;publicvoidonFlowComplete(FlowInstanceinstance){eventPublisher.publishEvent(newFlowCompletedEvent(this,instance.getId(),COMPLETED));}}// 监听事件注解驱动零配置注册ComponentpublicclassFlowMetricsListener{EventListenerpublicvoidhandle(FlowCompletedEventevent){metricsService.increment(flow.complete.count);}}ComponentpublicclassFlowNotificationListener{EventListener(condition#event.status FAILED)publicvoidhandleFailed(FlowCompletedEventevent){notificationService.send(event.getFlowId(),流程执行失败);}}// 异步监听不阻塞主流程ComponentpublicclassFlowAuditListener{AsyncEventListenerpublicvoidhandle(FlowCompletedEventevent){auditLogService.log(event.getFlowId(),event.getStatus());}}Spring 事件机制的优势不需要手动注册观察者自动发现 EventListener支持条件过滤conditionSpEL 表达式支持异步执行Async支持事务绑定TransactionalEventListener四、真实应用场景4.1 框架级应用框架/技术被观察者事件观察者SpringApplicationContextContextRefreshedEvent各种 ListenerSpring BootSpringApplicationApplicationStartedEvent自动配置模块Java SwingJButtonActionEventActionListenerDOM (前端)HTMLElementclick/inputaddEventListenerVue.js响应式数据数据变化视图更新函数RedisKey 空间过期/修改事件订阅者RocketMQTopic消息发布Consumer Group4.2 业务场景业务事件谁发布谁订阅干什么用户注册UserService发欢迎邮件、初始化配额、推送到 CRM流程完成FlowExecutor通知用户、记日志、更新指标、触发下游订单支付成功PaymentService更新订单状态、减库存、发短信、记账配置变更NacosClient刷新 Bean、清缓存、重建连接池连接器授权更新OAuthService刷新 Token 缓存、通知使用方、记审计日志节点执行失败NodeExecutor重试调度、告警通知、快照写入4.3 iPaaS 流程引擎中的事件驱动在数环通 iPaaS 引擎中流程执行状态变更是典型的事件驱动场景FlowExecutionEngine发布者 ├── 发布: FlowStartedEvent │ ├── 监控 Listener: 记录开始时间 │ └── 限流 Listener: 检查并发配额 │ ├── 发布: NodeExecutedEvent │ ├── 日志 Listener: 持久化节点执行日志 │ └── 进度 Listener: 更新流程执行进度 │ ├── 发布: FlowCompletedEvent │ ├── 通知 Listener: 回调用户 Webhook │ ├── 指标 Listener: 记录执行时长 │ └── 清理 Listener: 释放临时资源 │ └── 发布: FlowFailedEvent ├── 恢复 Listener: 写入恢复快照 ├── 告警 Listener: 发送失败通知 └── 重试 Listener: 判断是否自动重试引擎本身不关心流程完成后要做什么——它只负责把事件丢出去。所有后置逻辑通过 Listener 独立实现可以按环境按租户灵活配置。五、常见变种5.1 同步 vs 异步观察者// 同步默认publish 方法内逐个调用观察者阻塞直到全部完成publisher.publish(event);// 所有 listener 执行完才返回// 异步每个观察者在独立线程中执行publicclassAsyncEventPublisher{privatefinalExecutorServiceexecutorExecutors.newFixedThreadPool(4);publicvoidpublish(FlowEventevent){for(FlowEventListenerlistener:listeners){executor.submit(()-{try{listener.onEvent(event);}catch(Exceptione){log.error(Async listener failed,e);}});}}}选择依据同步需要保证执行顺序、需要在同一事务内、需要知道是否全部成功异步后置逻辑耗时长、不影响主流程、允许最终一致性5.2 事件总线EventBusGoogle Guava 提供的 EventBus 是观察者模式的工业级实现// 创建事件总线EventBuseventBusnewEventBus(flow-events);// 异步版AsyncEventBusasyncBusnewAsyncEventBus(executorService);// 订阅通过注解publicclassFlowMetricsSubscriber{SubscribepublicvoidonComplete(FlowCompletedEventevent){metricsService.increment(flow.complete);}}// 注册eventBus.register(newFlowMetricsSubscriber());// 发布eventBus.post(newFlowCompletedEvent(flowId));5.3 跨进程观察者消息队列进程内的观察者模式扩展到分布式——就是消息队列进程内观察者 跨进程观察者 ─────────── ─────────── EventPublisher RocketMQ Producer ↓ publish() ↓ send(topic, msg) ListListener Consumer Group ↓ onEvent() ↓ consumeMessage()从架构上看RocketMQ / Kafka / RabbitMQ 本质就是跨进程的观察者模式。区别在于进程内低延迟、强一致、无持久化跨进程高可用、可持久化、支持重试/死信5.4 响应式流Reactive Streams当事件是连续流时如实时数据推送观察者模式演变为响应式编程// Project ReactorFluxFlowEventeventStreamflowEventSource.stream();eventStream.filter(e-FAILED.equals(e.getStatus())).buffer(Duration.ofSeconds(10))// 每10秒批量处理.subscribe(batch-alertService.batchAlert(batch));六、优缺点优点缺点发布者和订阅者松耦合通知顺序不确定同步模式下按注册顺序符合开闭原则加订阅者不改发布者观察者太多时性能下降支持动态注册/注销调试困难事件链路不直观天然适配事件驱动架构可能导致内存泄漏忘了注销可以轻松切换同步/异步循环依赖难以发现A 通知 BB 又通知 A七、避坑指南坑 1观察者异常导致后续观察者不执行// 错误写法publicvoidpublish(Eventevent){for(Listenerl:listeners){l.onEvent(event);// 如果这里抛异常后面的 listener 全部跳过}}// 正确写法publicvoidpublish(Eventevent){for(Listenerl:listeners){try{l.onEvent(event);}catch(Exceptione){log.error(Listener {} error,l.getClass().getSimpleName(),e);// 继续执行下一个}}}坑 2观察者持有发布者引用导致内存泄漏匿名内部类/Lambda 会隐式持有外部类引用。如果发布者生命周期长如全局单例、观察者生命周期短如请求级对象忘了 remove 会导致内存泄漏。防御用弱引用存储观察者或者严格配对 add/remove。坑 3同步观察者阻塞主流程一个慢观察者比如发邮件耗时 3 秒会把整个 publish 调用阻塞 3 秒。防御耗时操作的观察者必须异步化Async或独立线程池。坑 4事件风暴观察者内部又发布了新事件导致递归/循环通知——最终 StackOverflow 或者 CPU 打满。// 危险观察者内部又触发了事件EventListenerpublicvoidhandle(FlowCompletedEventevent){// 处理完成后发布新事件publisher.publishEvent(newAuditEvent(...));// AuditEvent 的监听者又发布...}防御事件链路最多 2-3 层禁止循环发布。可以加深度计数器或者用异步打断循环。坑 5Spring EventListener 在事务提交前执行默认的EventListener是在事务提交前执行的。如果你在 Listener 里读数据库查刚 INSERT 的数据可能查不到。防御用TransactionalEventListener(phase AFTER_COMMIT)确保事务提交后再执行。坑 6观察者顺序依赖如果多个观察者之间有执行顺序要求A 必须在 B 之前不要依赖注册顺序——这不可靠。防御用Order注解Spring显式指定顺序或者把有序逻辑放在同一个观察者内。八、观察者模式 vs 发布-订阅模式这两个术语经常混用但有微妙区别维度观察者模式发布-订阅模式耦合度发布者知道观察者接口发布者和订阅者完全不知道对方中间层无直接通知有EventBus / Broker典型实现Java Observer、DOM EventRocketMQ、Redis PubSub、EventBus通信进程内同步可跨进程异步过滤观察者自己过滤中间层按 Topic 过滤在实际讨论中两者经常等同使用不必过于纠结概念边界。九、常见问题FAQQ观察者模式和回调有什么区别A回调是一对一调用方注册一个回调函数观察者是一对多一个事件通知多个监听者。回调适合完成通知场景观察者适合广播通知场景。Q什么时候应该用消息队列代替进程内观察者A三个信号① 观察者需要跨服务不在同一个 JVM② 观察者需要可靠重试进程内异常就丢了③ 发布速率远大于消费速率需要削峰。满足任一条就应该上 MQ。Q观察者模式会影响性能吗A同步模式下N 个观察者的总耗时 各观察者耗时之和。如果观察者多且部分耗时会显著拖慢 publish。解法① 异步化耗时观察者② 减少不必要的观察者用 supports() 过滤③ 批量通知代替逐条通知。QSpring 的 EventListener 是异步的吗A默认是同步的在 publishEvent 调用线程中执行。要异步需要额外加Async 开启EnableAsync。注意异步后事件的事务绑定和异常处理策略都需要调整。Q如何测试观察者A① 单测观察者本身传入 mock 事件验证行为② 集成测试发布→监听链路发布事件验证观察者的副作用。不要把发布者和所有观察者放在一个测试里——那就变成集成测试了。Q事件溯源Event Sourcing和观察者模式的关系A事件溯源把状态变更记录为不可变的事件序列通过重放事件恢复状态。观察者模式是事件溯源的通知层——事件产生后由观察者消费并产生副作用。两者是互补关系事件溯源管记录观察者管响应。十、小结观察者模式的核心价值把事情发生了和要对此做什么彻底解耦。三个实践要点用 Spring EventListener 而不是手写 Observer——框架帮你管理注册、支持条件过滤和异步每个观察者用 try-catch 包裹——一个失败不能影响其他耗时操作必须异步——否则慢观察者会拖垮整个事件链设计模式不是银弹但观察者模式几乎是所有事件驱动系统的基石。理解它就理解了从 DOM 事件到消息队列到响应式编程的底层统一模型。标签#设计模式 #观察者模式 #Observer #发布订阅 #事件驱动 #EventListener #Spring #EventBus #Java #行为型模式 #解耦 #消息队列 #面向对象 #软件工程
设计模式实战解读(四):观察者模式——事件驱动的解耦利器
本文是「设计模式实战解读」系列第四篇。系列文章统一按照定义 → 痛点场景 → 模式结构 → 核心实现 → 真实应用 → 常见变种 → 优缺点 → 避坑指南 → FAQ的结构展开每篇聚焦一个模式讲透。一句话定义观察者模式Observer定义对象间一对多的依赖关系——当一个对象状态变化时所有依赖它的对象自动收到通知并更新。也叫发布-订阅模式Publish-Subscribe。归属行为型模式。一、没有观察者时的痛点假设你在做一个流程引擎流程实例状态变更启动、完成、失败后需要做很多事publicclassFlowExecutor{publicvoidonFlowComplete(FlowInstanceinstance){// 1. 更新数据库状态flowDao.updateStatus(instance.getId(),COMPLETED);// 2. 发送通知给用户notificationService.send(instance.getUserId(),流程执行完成);// 3. 记录审计日志auditLogService.log(instance.getId(),COMPLETED,System.currentTimeMillis());// 4. 更新监控指标metricsService.increment(flow.complete.count);metricsService.recordTime(flow.execution.duration,instance.getDuration());// 5. 触发下游流程如果有if(instance.hasDownstream()){downstreamTrigger.trigger(instance.getDownstreamId());}// 6. 清理临时数据tempDataCleaner.clean(instance.getId());// 7. 发送 Webhook 回调webhookService.callback(instance.getCallbackUrl(),instance.toJson());// 还会继续增长...}}问题FlowExecutor 知道太多东西了——它直接依赖了通知、日志、监控、下游触发、Webhook 等七八个服务每加一个后置动作都要改 FlowExecutor——违反开闭原则某个后置动作失败会影响整条链路——notificationService 超时后面的全部阻塞无法灵活配置——某些流程不需要 Webhook 回调但代码里硬编码了单元测试极其困难——要 mock 七八个依赖核心诉求发布者只负责事情发生了不关心谁想知道、“知道了要干什么”。二、模式结构┌────────────────────────────┐ │ Subject (被观察者/发布者) │ ├────────────────────────────┤ │ - observers: ListObserver│ ├────────────────────────────┤ │ addObserver(observer) │ │ removeObserver(observer) │ │ notifyAll(event) │ └──────────┬─────────────────┘ │ 通知 ↓ ┌──────────────────────┐ ┌──────────────────────┐ │ Observer A (观察者) │ │ Observer B (观察者) │ ├──────────────────────┤ ├──────────────────────┤ │ onEvent(event) │ │ onEvent(event) │ └──────────────────────┘ └──────────────────────┘ 发布者和观察者之间是松耦合的—— 发布者不知道具体有哪些观察者只知道观察者实现了统一接口。三、核心实现3.1 手写基础版// 事件定义publicclassFlowEvent{privatefinalStringflowId;privatefinalStringstatus;// STARTED / COMPLETED / FAILEDprivatefinallongtimestamp;privatefinalMapString,Objectcontext;// constructor、getter 省略}// 观察者接口publicinterfaceFlowEventListener{voidonEvent(FlowEventevent);// 是否关心这个事件过滤用defaultbooleansupports(FlowEventevent){returntrue;}}// 事件发布器被观察者publicclassFlowEventPublisher{privatefinalListFlowEventListenerlistenersnewCopyOnWriteArrayList();publicvoidaddListener(FlowEventListenerlistener){listeners.add(listener);}publicvoidremoveListener(FlowEventListenerlistener){listeners.remove(listener);}publicvoidpublish(FlowEventevent){for(FlowEventListenerlistener:listeners){if(listener.supports(event)){try{listener.onEvent(event);}catch(Exceptione){// 单个观察者异常不影响其他观察者log.error(Listener {} failed,listener.getClass().getSimpleName(),e);}}}}}3.2 具体观察者实现// 观察者1更新数据库状态ComponentpublicclassFlowStatusUpdaterimplementsFlowEventListener{OverridepublicvoidonEvent(FlowEventevent){flowDao.updateStatus(event.getFlowId(),event.getStatus());}}// 观察者2发送通知ComponentpublicclassFlowNotificationListenerimplementsFlowEventListener{OverridepublicvoidonEvent(FlowEventevent){if(COMPLETED.equals(event.getStatus())||FAILED.equals(event.getStatus())){notificationService.send(event.getUserId(),buildMessage(event));}}Overridepublicbooleansupports(FlowEventevent){// 只关心完成和失败事件returnCOMPLETED.equals(event.getStatus())||FAILED.equals(event.getStatus());}}// 观察者3监控指标ComponentpublicclassFlowMetricsListenerimplementsFlowEventListener{OverridepublicvoidonEvent(FlowEventevent){metricsService.increment(flow.event.getStatus().toLowerCase().count);}}// 观察者4审计日志ComponentpublicclassFlowAuditListenerimplementsFlowEventListener{OverridepublicvoidonEvent(FlowEventevent){auditLogService.log(event.getFlowId(),event.getStatus(),event.getTimestamp());}}3.3 改造后的 FlowExecutorpublicclassFlowExecutor{privatefinalFlowEventPublishereventPublisher;publicvoidonFlowComplete(FlowInstanceinstance){// 只做一件事发布事件eventPublisher.publish(newFlowEvent(instance.getId(),COMPLETED,System.currentTimeMillis(),instance.getContext()));}}从依赖 7 个服务变成依赖 1 个 EventPublisher——发布者彻底解耦。新增后置逻辑只需加一个 Listener 实现不改 FlowExecutor。3.4 结合 Spring 的事件机制Spring 自带了观察者模式的基础设施// 事件定义publicclassFlowCompletedEventextendsApplicationEvent{privatefinalStringflowId;privatefinalStringstatus;publicFlowCompletedEvent(Objectsource,StringflowId,Stringstatus){super(source);this.flowIdflowId;this.statusstatus;}}// 发布事件一行代码ComponentpublicclassFlowExecutor{AutowiredprivateApplicationEventPublishereventPublisher;publicvoidonFlowComplete(FlowInstanceinstance){eventPublisher.publishEvent(newFlowCompletedEvent(this,instance.getId(),COMPLETED));}}// 监听事件注解驱动零配置注册ComponentpublicclassFlowMetricsListener{EventListenerpublicvoidhandle(FlowCompletedEventevent){metricsService.increment(flow.complete.count);}}ComponentpublicclassFlowNotificationListener{EventListener(condition#event.status FAILED)publicvoidhandleFailed(FlowCompletedEventevent){notificationService.send(event.getFlowId(),流程执行失败);}}// 异步监听不阻塞主流程ComponentpublicclassFlowAuditListener{AsyncEventListenerpublicvoidhandle(FlowCompletedEventevent){auditLogService.log(event.getFlowId(),event.getStatus());}}Spring 事件机制的优势不需要手动注册观察者自动发现 EventListener支持条件过滤conditionSpEL 表达式支持异步执行Async支持事务绑定TransactionalEventListener四、真实应用场景4.1 框架级应用框架/技术被观察者事件观察者SpringApplicationContextContextRefreshedEvent各种 ListenerSpring BootSpringApplicationApplicationStartedEvent自动配置模块Java SwingJButtonActionEventActionListenerDOM (前端)HTMLElementclick/inputaddEventListenerVue.js响应式数据数据变化视图更新函数RedisKey 空间过期/修改事件订阅者RocketMQTopic消息发布Consumer Group4.2 业务场景业务事件谁发布谁订阅干什么用户注册UserService发欢迎邮件、初始化配额、推送到 CRM流程完成FlowExecutor通知用户、记日志、更新指标、触发下游订单支付成功PaymentService更新订单状态、减库存、发短信、记账配置变更NacosClient刷新 Bean、清缓存、重建连接池连接器授权更新OAuthService刷新 Token 缓存、通知使用方、记审计日志节点执行失败NodeExecutor重试调度、告警通知、快照写入4.3 iPaaS 流程引擎中的事件驱动在数环通 iPaaS 引擎中流程执行状态变更是典型的事件驱动场景FlowExecutionEngine发布者 ├── 发布: FlowStartedEvent │ ├── 监控 Listener: 记录开始时间 │ └── 限流 Listener: 检查并发配额 │ ├── 发布: NodeExecutedEvent │ ├── 日志 Listener: 持久化节点执行日志 │ └── 进度 Listener: 更新流程执行进度 │ ├── 发布: FlowCompletedEvent │ ├── 通知 Listener: 回调用户 Webhook │ ├── 指标 Listener: 记录执行时长 │ └── 清理 Listener: 释放临时资源 │ └── 发布: FlowFailedEvent ├── 恢复 Listener: 写入恢复快照 ├── 告警 Listener: 发送失败通知 └── 重试 Listener: 判断是否自动重试引擎本身不关心流程完成后要做什么——它只负责把事件丢出去。所有后置逻辑通过 Listener 独立实现可以按环境按租户灵活配置。五、常见变种5.1 同步 vs 异步观察者// 同步默认publish 方法内逐个调用观察者阻塞直到全部完成publisher.publish(event);// 所有 listener 执行完才返回// 异步每个观察者在独立线程中执行publicclassAsyncEventPublisher{privatefinalExecutorServiceexecutorExecutors.newFixedThreadPool(4);publicvoidpublish(FlowEventevent){for(FlowEventListenerlistener:listeners){executor.submit(()-{try{listener.onEvent(event);}catch(Exceptione){log.error(Async listener failed,e);}});}}}选择依据同步需要保证执行顺序、需要在同一事务内、需要知道是否全部成功异步后置逻辑耗时长、不影响主流程、允许最终一致性5.2 事件总线EventBusGoogle Guava 提供的 EventBus 是观察者模式的工业级实现// 创建事件总线EventBuseventBusnewEventBus(flow-events);// 异步版AsyncEventBusasyncBusnewAsyncEventBus(executorService);// 订阅通过注解publicclassFlowMetricsSubscriber{SubscribepublicvoidonComplete(FlowCompletedEventevent){metricsService.increment(flow.complete);}}// 注册eventBus.register(newFlowMetricsSubscriber());// 发布eventBus.post(newFlowCompletedEvent(flowId));5.3 跨进程观察者消息队列进程内的观察者模式扩展到分布式——就是消息队列进程内观察者 跨进程观察者 ─────────── ─────────── EventPublisher RocketMQ Producer ↓ publish() ↓ send(topic, msg) ListListener Consumer Group ↓ onEvent() ↓ consumeMessage()从架构上看RocketMQ / Kafka / RabbitMQ 本质就是跨进程的观察者模式。区别在于进程内低延迟、强一致、无持久化跨进程高可用、可持久化、支持重试/死信5.4 响应式流Reactive Streams当事件是连续流时如实时数据推送观察者模式演变为响应式编程// Project ReactorFluxFlowEventeventStreamflowEventSource.stream();eventStream.filter(e-FAILED.equals(e.getStatus())).buffer(Duration.ofSeconds(10))// 每10秒批量处理.subscribe(batch-alertService.batchAlert(batch));六、优缺点优点缺点发布者和订阅者松耦合通知顺序不确定同步模式下按注册顺序符合开闭原则加订阅者不改发布者观察者太多时性能下降支持动态注册/注销调试困难事件链路不直观天然适配事件驱动架构可能导致内存泄漏忘了注销可以轻松切换同步/异步循环依赖难以发现A 通知 BB 又通知 A七、避坑指南坑 1观察者异常导致后续观察者不执行// 错误写法publicvoidpublish(Eventevent){for(Listenerl:listeners){l.onEvent(event);// 如果这里抛异常后面的 listener 全部跳过}}// 正确写法publicvoidpublish(Eventevent){for(Listenerl:listeners){try{l.onEvent(event);}catch(Exceptione){log.error(Listener {} error,l.getClass().getSimpleName(),e);// 继续执行下一个}}}坑 2观察者持有发布者引用导致内存泄漏匿名内部类/Lambda 会隐式持有外部类引用。如果发布者生命周期长如全局单例、观察者生命周期短如请求级对象忘了 remove 会导致内存泄漏。防御用弱引用存储观察者或者严格配对 add/remove。坑 3同步观察者阻塞主流程一个慢观察者比如发邮件耗时 3 秒会把整个 publish 调用阻塞 3 秒。防御耗时操作的观察者必须异步化Async或独立线程池。坑 4事件风暴观察者内部又发布了新事件导致递归/循环通知——最终 StackOverflow 或者 CPU 打满。// 危险观察者内部又触发了事件EventListenerpublicvoidhandle(FlowCompletedEventevent){// 处理完成后发布新事件publisher.publishEvent(newAuditEvent(...));// AuditEvent 的监听者又发布...}防御事件链路最多 2-3 层禁止循环发布。可以加深度计数器或者用异步打断循环。坑 5Spring EventListener 在事务提交前执行默认的EventListener是在事务提交前执行的。如果你在 Listener 里读数据库查刚 INSERT 的数据可能查不到。防御用TransactionalEventListener(phase AFTER_COMMIT)确保事务提交后再执行。坑 6观察者顺序依赖如果多个观察者之间有执行顺序要求A 必须在 B 之前不要依赖注册顺序——这不可靠。防御用Order注解Spring显式指定顺序或者把有序逻辑放在同一个观察者内。八、观察者模式 vs 发布-订阅模式这两个术语经常混用但有微妙区别维度观察者模式发布-订阅模式耦合度发布者知道观察者接口发布者和订阅者完全不知道对方中间层无直接通知有EventBus / Broker典型实现Java Observer、DOM EventRocketMQ、Redis PubSub、EventBus通信进程内同步可跨进程异步过滤观察者自己过滤中间层按 Topic 过滤在实际讨论中两者经常等同使用不必过于纠结概念边界。九、常见问题FAQQ观察者模式和回调有什么区别A回调是一对一调用方注册一个回调函数观察者是一对多一个事件通知多个监听者。回调适合完成通知场景观察者适合广播通知场景。Q什么时候应该用消息队列代替进程内观察者A三个信号① 观察者需要跨服务不在同一个 JVM② 观察者需要可靠重试进程内异常就丢了③ 发布速率远大于消费速率需要削峰。满足任一条就应该上 MQ。Q观察者模式会影响性能吗A同步模式下N 个观察者的总耗时 各观察者耗时之和。如果观察者多且部分耗时会显著拖慢 publish。解法① 异步化耗时观察者② 减少不必要的观察者用 supports() 过滤③ 批量通知代替逐条通知。QSpring 的 EventListener 是异步的吗A默认是同步的在 publishEvent 调用线程中执行。要异步需要额外加Async 开启EnableAsync。注意异步后事件的事务绑定和异常处理策略都需要调整。Q如何测试观察者A① 单测观察者本身传入 mock 事件验证行为② 集成测试发布→监听链路发布事件验证观察者的副作用。不要把发布者和所有观察者放在一个测试里——那就变成集成测试了。Q事件溯源Event Sourcing和观察者模式的关系A事件溯源把状态变更记录为不可变的事件序列通过重放事件恢复状态。观察者模式是事件溯源的通知层——事件产生后由观察者消费并产生副作用。两者是互补关系事件溯源管记录观察者管响应。十、小结观察者模式的核心价值把事情发生了和要对此做什么彻底解耦。三个实践要点用 Spring EventListener 而不是手写 Observer——框架帮你管理注册、支持条件过滤和异步每个观察者用 try-catch 包裹——一个失败不能影响其他耗时操作必须异步——否则慢观察者会拖垮整个事件链设计模式不是银弹但观察者模式几乎是所有事件驱动系统的基石。理解它就理解了从 DOM 事件到消息队列到响应式编程的底层统一模型。标签#设计模式 #观察者模式 #Observer #发布订阅 #事件驱动 #EventListener #Spring #EventBus #Java #行为型模式 #解耦 #消息队列 #面向对象 #软件工程