1. 项目概述与核心挑战在构建企业级应用时我们经常会遇到一个典型的场景一个核心的主数据库Master用于存储全局配置、用户信息等基础数据同时还有N个业务或租户专属的应用数据库App DB。业务逻辑常常需要在一个请求链路中同时操作主库和某个应用库的数据。比如一个电商后台在处理订单时需要先从主库读取用户信息再向该用户所属租户的应用库写入订单详情。这种架构带来了两个非常棘手的技术难题第一如何优雅、动态地管理这多个数据源并在运行时根据业务上下文如租户ID进行无缝切换第二更关键的是当一次业务操作涉及对多个数据库的写操作时如何保证跨库的数据一致性即事务的原子性这两个问题不解决系统的稳定性和数据可靠性就无从谈起。今天我就结合自己在一个大型SaaS平台项目中的实战经验来深度拆解SpringBoot环境下多数据源与事务管理的解决方案。这不是一个简单的配置教程而是会深入到Spring框架的设计原理剖析现有方案的局限性并最终给出一个在生产环境中验证过的、支持动态数据源与自定义跨库事务的完整实现方案。无论你是正在面临类似架构挑战的开发者还是希望深入理解Spring事务机制的技术爱好者相信这篇长文都能给你带来实实在在的收获。2. 数据源动态切换的核心原理与方案选型要解决多数据源问题我们首先得理解Spring框架为我们提供的“脚手架”。Spring JDBC抽象层中有一个关键类AbstractRoutingDataSource。这个类的设计非常巧妙它本身并不直接管理连接而是作为一个“路由员”根据当前的“查找键”lookup key来决定将数据请求路由到哪个具体的DataSource上。2.1 AbstractRoutingDataSource 工作机制深度解析让我们翻开Spring的源码看看这个类是怎么工作的。它内部维护了两个核心的MaptargetDataSources 一个MapObject, DataSource存储了所有备选的数据源其中Key就是我们说的lookup key比如“master”、“tenant_001”Value就是具体的数据库连接池如Druid、HikariCP实例。defaultTargetDataSource 默认的数据源当找不到对应的lookup key时就会使用它。在Spring容器初始化Bean调用afterPropertiesSet()方法时AbstractRoutingDataSource会做一件重要的事它将targetDataSources和defaultTargetDataSource解析resolve并复制到另外两个final字段——resolvedDataSources和resolvedDefaultDataSource中。这里就是第一个关键点这个复制动作在容器启动阶段完成后resolvedDataSources就固定下来了。这意味着后续你无法再向这个Map里动态添加或删除数据源。这对于需要根据租户动态创建数据源的应用来说是一个致命的限制。那么路由是如何发生的呢核心在于那个抽象方法determineCurrentLookupKey()。每次应用通过DataSource.getConnection()请求连接时AbstractRoutingDataSource的determineTargetDataSource()方法会先调用determineCurrentLookupKey()获取当前的key然后用这个key去resolvedDataSources里找到对应的DataSource最后返回这个具体数据源的连接。所以实现动态切换的思路就清晰了我们继承AbstractRoutingDataSource重写determineCurrentLookupKey()方法让它从当前线程的上下文比如ThreadLocal中返回一个标识符。这样只要我们在执行DAO操作前通过AOP等手段设置好这个上下文标识符就能实现数据源的动态切换。2.2 两种管理模式的权衡配置文件 vs. 数据库表基于上述原理业界通常有两种实现数据源管理的模式选择哪种取决于你的业务场景。2.2.1 基于配置文件的静态方案这是最简单、最常见的入门方案。你在application.yml里预先定义好所有数据源的连接信息。spring: datasource: druid: master: url: jdbc:mysql://localhost:3306/master_db username: root password: 123456 tenant_001: url: jdbc:mysql://192.168.1.101:3306/tenant_001_db username: app_user password: pass_001 tenant_002: url: jdbc:mysql://192.168.1.102:3306/tenant_002_db username: app_user password: pass_002然后在Configuration类中将这些配置注入为Bean并组装到我们自定义的DynamicDataSource中。Configuration public class DataSourceConfig { Bean ConfigurationProperties(spring.datasource.druid.master) public DataSource masterDataSource() { return DruidDataSourceBuilder.create().build(); } // ... 类似地定义 tenant_001, tenant_002 等Bean Bean Primary // 标记为主要数据源Spring事务管理等默认会使用它 public DataSource dynamicDataSource(DataSource masterDataSource, DataSource tenant001DataSource, ...) { MapObject, Object targetDataSources new HashMap(); targetDataSources.put(master, masterDataSource); targetDataSources.put(tenant_001, tenant001DataSource); // ... DynamicDataSource ds new DynamicDataSource(); ds.setDefaultTargetDataSource(masterDataSource); ds.setTargetDataSources(targetDataSources); return ds; } }方案优点实现简单配置直观。启动时即加载性能好。方案缺点与局限无法动态增删正如前文所述数据源在启动后就被固化无法应对运行时新增租户新数据源的场景。配置冗长如果租户数量成百上千配置文件将变得难以维护。不够灵活数据源配置变更如密码修改、地址迁移需要重启应用。因此这种方案仅适用于数据源数量固定且极少变化的场景。2.2.2 基于数据库表的动态方案对于SaaS或多租户平台数据源往往是随着租户的注册而动态创建和管理的。这时我们就需要将数据源的配置信息存储到数据库通常是主库的一张表中。我们需要设计一张数据源配置表例如sys_datasource其核心字段包括数据源唯一标识ds_key 如tenant_001、JDBC URL、用户名、密码、驱动类名、连接池参数初始大小、最大连接数等、状态启用/禁用等。应用启动时从这张表中加载所有启用的数据源配置并使用DataSourceBuilder动态创建DataSource实例放入一个我们自己管理的容器例如一个ConcurrentHashMap中。同时我们需要自定义一个DynamicDataSource类它不再完全依赖父类的resolvedDataSources而是重写determineTargetDataSource()方法使其从我们自管理的容器中根据lookup key获取数据源。方案优点高度动态化可以在运行时通过管理界面增、删、改、启用、禁用数据源无需重启应用。集中管理所有数据源配置一目了然便于维护和审计。扩展性强可以轻松集成监控、保活等高级功能。方案缺点实现复杂需要自行处理数据源的加载、缓存、刷新和销毁生命周期。启动依赖应用启动依赖于主库的可连接性因为需要从主库读取配置。连接池管理需要谨慎处理动态创建的连接池避免内存泄漏。实操心得动态数据源的缓存与保活直接从数据库表实时查询配置来创建连接池的性能是不可接受的。我们必须在内存中做一层缓存。通常的做法是启动时全量加载到本地缓存Map中同时开启一个定时任务比如每5分钟同步数据库中的变更如状态切换、配置更新。对于配置更新需要优雅地重建连接池先创建新的连接池验证连接有效后再替换缓存中的旧实例并异步关闭旧池避免服务中断。此外还可以定时对缓存中的连接池执行一次简单的查询如SELECT 1进行保活防止数据库端因空闲超时断开连接。3. 基于数据库表的动态数据源完整实现接下来我们深入细节实现一个基于数据库表管理的、生产可用的动态数据源方案。3.1 核心接口与类设计首先定义一个数据源管理器的接口这是对我们自定义数据源容器行为的抽象。public interface DataSourceManager { /** 添加或更新一个数据源 */ void put(String dataSourceKey, DataSource dataSource); /** 获取一个数据源 */ DataSource get(String dataSourceKey); /** 判断是否存在 */ boolean containsKey(String dataSourceKey); /** 移除一个数据源会关闭其连接池 */ void remove(String dataSourceKey); /** 获取所有数据源Key */ SetString keySet(); }然后实现我们自己的DynamicDataSource。它需要继承AbstractRoutingDataSource以实现路由能力同时实现DataSourceManager接口以提供动态管理能力。public class DynamicDataSource extends AbstractRoutingDataSource implements DataSourceManager, InitializingBean { // 本地缓存存储所有动态数据源。Key: ds_key, Value: DataSource private final ConcurrentHashMapString, DataSource dataSourceMap new ConcurrentHashMap(); // 默认数据源主库通常从配置文件注入 private DataSource defaultDataSource; Override protected Object determineCurrentLookupKey() { // 从线程上下文中获取当前数据源键 return DataSourceContextHolder.get(); } Override public DataSource determineTargetDataSource() { Object lookupKey determineCurrentLookupKey(); if (lookupKey null) { return defaultDataSource; } // 优先从我们自己的map中查找 DataSource ds dataSourceMap.get(lookupKey.toString()); if (ds ! null) { return ds; } // 如果没找到回退到默认数据源或抛出异常根据业务决定 logger.warn(DataSource key [{}] not found, using default., lookupKey); return defaultDataSource; } // 实现DataSourceManager接口的方法 Override public void put(String key, DataSource dataSource) { dataSourceMap.put(key, dataSource); // 注意这里并没有更新父类的resolvedDataSources因为父类的机制是固定的。 // 我们的路由逻辑已经重写所以只需要更新自己的map即可。 } Override public DataSource get(String key) { return dataSourceMap.get(key); } // ... 其他接口方法实现 Override public void afterPropertiesSet() { // 父类初始化设置默认数据源这个targetDataSources可以传空因为我们重写了determineTargetDataSource super.setDefaultTargetDataSource(defaultDataSource); super.setTargetDataSources(Collections.emptyMap()); // 传空Map避免父类初始化无关数据源 super.afterPropertiesSet(); // 启动后从数据库加载所有数据源配置到dataSourceMap this.loadDataSourcesFromDatabase(); } private void loadDataSourcesFromDatabase() { // 1. 从主库defaultDataSource查询数据源配置表 // 2. 遍历配置使用DataSourceBuilder.create().build() 创建DataSource实例 // 3. 调用 put(key, dataSource) 放入缓存 } }这里的关键点在于我们完全重写了determineTargetDataSource()方法绕过了父类对resolvedDataSources的依赖转而查询我们自己的dataSourceMap。父类的afterPropertiesSet()方法仍然需要调用以完成一些基本的初始化但我们传入一个空的targetDataSources因为具体的数据源由我们自己管理。3.2 线程上下文与AOP切面设计数据源键lookup key需要与当前执行线程绑定。我们使用ThreadLocal来实现一个简单的上下文持有器。public class DataSourceContextHolder { private static final ThreadLocalString CONTEXT_HOLDER new ThreadLocal(); public static void set(String dataSourceKey) { CONTEXT_HOLDER.set(dataSourceKey); } public static String get() { return CONTEXT.HOLDER.get(); } public static void clear() { CONTEXT_HOLDER.remove(); } }为了让业务代码无感知地切换数据源我们使用Spring AOP。定义一个注解SwitchDataSource。Target({ElementType.METHOD, ElementType.TYPE}) Retention(RetentionPolicy.RUNTIME) Documented public interface SwitchDataSource { String value(); // 数据源键 }然后编写切面在方法执行前设置上下文执行后清理。Aspect Component Order(1) // 顺序要设置在事务切面之前因为连接获取需要在事务开启之前确定 public class DataSourceAspect { Around(annotation(switchDataSource)) public Object around(ProceedingJoinPoint point, SwitchDataSource switchDataSource) throws Throwable { String previousKey DataSourceContextHolder.get(); String currentKey switchDataSource.value(); try { DataSourceContextHolder.set(currentKey); return point.proceed(); } finally { // 恢复之前的数据源键如果之前没有则为null会清除 if (previousKey ! null) { DataSourceContextHolder.set(previousKey); } else { DataSourceContextHolder.clear(); } } } }业务使用示例Service public class OrderService { Autowired private OrderMapper orderMapper; // 操作应用库 Autowired private UserMapper userMapper; // 操作主库 // 该方法内所有数据库操作默认使用主库 public void processOrder(Long userId, Order order) { User user userMapper.selectById(userId); // 从主库查用户 // 切换到该用户所属租户的应用库 switchToTenantDb(user.getTenantId()); orderMapper.insert(order); // 插入到应用库 } SwitchDataSource(${tenant.id}) // 假设通过某种方式解析出tenantId private void switchToTenantDb(String tenantId) { // 方法体可以为空AOP会处理切换 // 或者更常见的做法是将SwitchDataSource直接标在Mapper方法或Service方法上 } }注意事项AOP的陷阱执行顺序数据源切换切面Order(1)必须在事务管理切面Transactional 通常Order为Integer.MAX_VALUE或默认之前执行。因为Spring事务管理器在开启事务时会从DataSource获取一个连接并绑定到当前线程。如果先开启了事务获取了默认数据源的连接再切换数据源就无效了。确保你的切面Order值小于事务切面。嵌套调用在同一个类内部的方法调用this.method()是不会被AOP拦截的这是Spring AOP基于代理的局限性。因此SwitchDataSource注解最好标注在Controller调用的入口Service方法上或者使用AspectJ编译时织入来规避此问题。上下文清理务必在finally块中清理ThreadLocal否则可能导致内存泄漏和后续请求数据源错乱。4. 多数据源下的分布式事务困境与Spring事务本质数据源切换问题解决了但更大的挑战随之而来事务一致性。假设我们在一个方法里先向主库插入一条日志再向应用库插入订单。如果订单插入失败我们期望日志插入也回滚。但在默认的Spring声明式事务Transactional下这是做不到的。4.1 为什么Transactional在多数据源下失效默认的DataSourceTransactionManager是与一个特定的DataSource绑定的。当你在一个Transactional方法内即使通过AOP切换了数据源事务管理器在方法开始时获取的连接来自默认数据源已经被绑定到当前事务。后续切换数据源后DAO操作从新的数据源获取了另一个连接这个连接并不在最初开启的事务管理范围之内。因此你实际上是在用两个独立的事务操作两个数据库它们之间没有任何原子性保证。一种常见的错误尝试是使用Propagation.REQUIRES_NEWSwitchDataSource(master) Transactional public void outerMethod() { masterDao.insert(); // 操作主库事务A innerMethod(); // 调用内层方法 // 如果这里抛出异常... } SwitchDataSource(tenant_001) Transactional(propagation Propagation.REQUIRES_NEW) public void innerMethod() { tenantDao.insert(); // 操作应用库事务B }这会导致innerMethod启动一个全新的、独立的事务B。如果outerMethod在innerMethod成功后抛出异常事务A回滚但事务B已经提交数据不一致就此产生。4.2 重新审视Spring事务的本质要解决这个问题我们需要拨开云雾理解Spring事务管理器的本质。我们常说的“Spring事务”其实是对JDBC事务或JPA等资源本地事务的一种封装和增强管理。事务的起点begin在JDBC中并没有一个明确的begin命令。事务的开启实际上是通过connection.setAutoCommit(false)来实现的。Spring事务管理器在获取连接后会将其自动提交设置为false。提交commit与回滚rollback这是JDBC连接对象的核心方法Spring在事务成功或失败时调用它们。挂起suspend与恢复resume这是Spring为了处理事务传播行为如REQUIRES_NEW而创造的概念。挂起并不是真的把数据库事务挂起而是将当前线程绑定的连接ConnectionHolder从事务同步管理器TransactionSynchronizationManager中解绑并暂存起来。恢复则是重新绑定。关闭close在Spring中事务结束后的close操作通常是将连接归还给连接池而不是物理关闭。理解了这些我们就明白要想实现跨库事务核心在于让多个数据库连接参与到同一个“事务协调”过程中并在最终决定提交或回滚时让所有连接保持一致。在单体应用、同一个JVM内我们可以尝试实现一个“应用层的事务协调器”这就是下面要讲的自定义多数据源事务管理。5. 自定义多数据源事务管理器实现方案我们的目标是在一个MultiTransaction注解标记的方法内无论操作多少个数据源这些操作要么全部成功要么全部回滚。思路是拦截所有获取数据库连接的操作返回一个被我们“包装”过的连接这个连接的commit和rollback方法被我们控制。然后在一个统一的切面里管理所有这些包装连接的最终提交与回滚。5.1 整体架构与核心组件这个方案涉及几个核心组件ConnectionProxy连接代理包装真实的JDBCConnection重写其commit和close方法使其不立即生效而是由我们统一调度。TransactionHolder事务持有器绑定到当前线程用于管理本次“全局事务”涉及的所有ConnectionProxy以及事务的嵌套栈信息。MultiTransactionManager多事务管理器核心协调器负责开启、注册连接、提交、回滚等全局操作。MultiTransactionAspect事务切面处理MultiTransaction注解串联整个流程。5.2 详细实现步骤5.2.1 定义多事务注解Target(ElementType.METHOD) Retention(RetentionPolicy.RUNTIME) Documented public interface MultiTransaction { /** 指定使用的事务管理器Bean名称 */ String transactionManager() default multiTransactionManager; /** 事务隔离级别默认跟随数据库 */ Isolation isolation() default Isolation.DEFAULT; /** 事务的默认数据源事务起点 */ String defaultDataSource() default master; /** 是否只读事务 */ boolean readOnly() default false; }5.2.2 实现ConnectionProxy这是实现事务控制的关键。我们创建一个代理连接它持有真实的连接但屏蔽了其原生的提交和关闭行为。public class ConnectionProxy implements Connection { private final Connection originalConnection; // 标记该连接是否已被当前全局事务管理 private volatile boolean managed false; // 记录这个连接上执行了多少次“获取”操作用于引用计数 private final AtomicInteger referenceCount new AtomicInteger(0); public ConnectionProxy(Connection originalConnection) { this.originalConnection originalConnection; } Override public void commit() throws SQLException { // 关键屏蔽原生提交。真正的提交由MultiTransactionManager统一处理。 // do nothing logger.debug(Commit operation is intercepted and suppressed on proxy connection.); } Override public void rollback() throws SQLException { // 同样屏蔽原生回滚。但这里有个细节如果业务代码主动调用rollback我们可能需要处理。 // 一种策略是记录一个标记在全局回滚时使用。 // 为了简化我们先屏蔽所有回滚由管理器控制。 logger.debug(Rollback operation is intercepted and suppressed on proxy connection.); } Override public void close() throws SQLException { // 关键屏蔽原生关闭。我们通过引用计数来控制何时真正关闭连接。 int count referenceCount.decrementAndGet(); if (count 0) { // 当所有使用方都“关闭”后由事务管理器在全局提交或回滚后统一真实关闭。 // 这里只是标记实际关闭逻辑在MultiTransactionManager中。 logger.debug(All references released, connection marked for later real close.); } } public void realCommit() throws SQLException { if (!originalConnection.getAutoCommit()) { originalConnection.commit(); } } public void realRollback() throws SQLException { if (!originalConnection.getAutoCommit()) { originalConnection.rollback(); } } public void realClose() throws SQLException { // 真正关闭前确保连接处于自动提交模式避免连接池拿到一个未提交事务的连接。 if (!originalConnection.getAutoCommit()) { originalConnection.setAutoCommit(true); } originalConnection.close(); } public Connection getOriginalConnection() { return originalConnection; } // ... 需要实现Connection接口的所有其他方法大部分直接委托给originalConnection // 这是一个繁琐但必要的工作可以使用动态代理如JDK Proxy简化但这里为了清晰展示原理使用静态包装。 }5.2.3 实现TransactionHolder这个类持有当前线程的全局事务状态。public class TransactionHolder { // 全局事务ID private final String globalTransactionId UUID.randomUUID().toString(); // 事务隔离级别 private final Isolation isolation; // 是否只读 private final boolean readOnly; // 存储数据源键与ConnectionProxy的映射 private final MapString, ConnectionProxy connectionMap new ConcurrentHashMap(); // 事务执行栈用于支持嵌套虽然多数据源嵌套事务场景复杂这里先提供基础支持 private final DequeString transactionStack new ArrayDeque(); // 数据源键栈与事务栈对应 private final DequeString dataSourceKeyStack new ArrayDeque(); public TransactionHolder(Isolation isolation, boolean readOnly) { this.isolation isolation; this.readOnly readOnly; // 初始压入一个空栈帧代表最外层事务的起点 transactionStack.push(ROOT_ globalTransactionId); dataSourceKeyStack.push(null); // 最外层可能还没有指定数据源 } public void registerConnection(String dataSourceKey, ConnectionProxy connection) { connectionMap.putIfAbsent(dataSourceKey, connection); } public ConnectionProxy getConnection(String dataSourceKey) { return connectionMap.get(dataSourceKey); } public CollectionConnectionProxy getAllConnections() { return connectionMap.values(); } // ... getters and setters }5.2.4 改造DynamicDataSource以支持连接代理我们需要修改之前DynamicDataSource的getConnection()方法使其在全局事务开启时返回ConnectionProxy。public class DynamicDataSource extends AbstractRoutingDataSource implements DataSourceManager { // ... 之前的字段和方法 Override public Connection getConnection() throws SQLException { return getConnection(null, null); } Override public Connection getConnection(String username, String password) throws SQLException { // 1. 判断当前线程是否处于自定义的全局事务中 TransactionHolder holder MultiTransactionContext.getCurrentTransactionHolder(); if (holder null) { // 不在全局事务中直接返回原始连接 return determineTargetDataSource().getConnection(); } // 2. 在全局事务中获取当前数据源键 String currentLookupKey determineCurrentLookupKey(); if (currentLookupKey null) { currentLookupKey default; // 或使用注解指定的defaultDataSource } // 3. 从TransactionHolder中获取已注册的代理连接 ConnectionProxy proxy holder.getConnection(currentLookupKey); if (proxy ! null) { // 增加引用计数模拟多次getConnection proxy.incrementReference(); return proxy; } // 4. 如果还没有为该数据源创建代理连接则创建并注册 DataSource targetDs determineTargetDataSource(); Connection originalConn targetDs.getConnection(); // 设置连接属性关闭自动提交设置隔离级别 if (originalConn.getAutoCommit()) { originalConn.setAutoCommit(false); } // 根据holder中的隔离级别设置连接隔离级别如果非DEFAULT // ... proxy new ConnectionProxy(originalConn); holder.registerConnection(currentLookupKey, proxy); proxy.incrementReference(); // 初始引用计数为1 return proxy; } }5.2.5 实现MultiTransactionManager与切面这是最复杂的部分负责事务边界的控制。Component(multiTransactionManager) public class MultiTransactionManager { // 用于将TransactionHolder绑定到线程 private static final ThreadLocalTransactionHolder CURRENT_HOLDER new ThreadLocal(); public void begin(String defaultDataSourceKey, Isolation isolation, boolean readOnly) { if (CURRENT_HOLDER.get() ! null) { // 处理嵌套事务这里简化处理暂不支持真正的嵌套。可以抛出异常或合并处理。 throw new IllegalTransactionStateException(Nested multi-transactions are not fully supported.); } TransactionHolder holder new TransactionHolder(isolation, readOnly); CURRENT_HOLDER.set(holder); // 设置初始数据源上下文如果指定了 if (defaultDataSourceKey ! null) { DataSourceContextHolder.set(defaultDataSourceKey); } } public void commit() { TransactionHolder holder CURRENT_HOLDER.get(); if (holder null) { throw new TransactionException(No transaction in progress); } try { for (ConnectionProxy conn : holder.getAllConnections()) { conn.realCommit(); // 统一提交所有代理连接 } } catch (SQLException e) { throw new TransactionException(Failed to commit multi-transaction, e); } finally { closeAllConnections(holder); cleanup(); } } public void rollback() { TransactionHolder holder CURRENT_HOLDER.get(); if (holder null) { throw new TransactionException(No transaction in progress); } try { for (ConnectionProxy conn : holder.getAllConnections()) { conn.realRollback(); // 统一回滚所有代理连接 } } catch (SQLException e) { logger.error(Failed to rollback connection, e); // 回滚失败也要继续清理 } finally { closeAllConnections(holder); cleanup(); } } private void closeAllConnections(TransactionHolder holder) { for (ConnectionProxy conn : holder.getAllConnections()) { try { conn.realClose(); } catch (SQLException e) { logger.warn(Failed to close connection, e); } } } private void cleanup() { CURRENT_HOLDER.remove(); DataSourceContextHolder.clear(); // 清理数据源上下文 } public static TransactionHolder getCurrentTransactionHolder() { return CURRENT_HOLDER.get(); } }最后编写AOP切面来驱动整个流程Aspect Component Order(0) // 优先级最高需要在数据源切换切面之前执行这里需要仔细设计顺序。实际上事务开始需要先确定数据源。 public class MultiTransactionAspect { Autowired private MultiTransactionManager transactionManager; Around(annotation(multiTx)) public Object handleMultiTransaction(ProceedingJoinPoint pjp, MultiTransaction multiTx) throws Throwable { // 1. 获取注解属性 Isolation isolation multiTx.isolation(); boolean readOnly multiTx.readOnly(); String defaultDs multiTx.defaultDataSource(); // 2. 保存旧的数据源上下文 String previousDataSourceKey DataSourceContextHolder.get(); // 3. 开启全局事务 transactionManager.begin(defaultDs, isolation, readOnly); // 设置注解指定的默认数据源 DataSourceContextHolder.set(defaultDs); try { // 4. 执行业务方法 Object result pjp.proceed(); // 5. 提交事务 transactionManager.commit(); return result; } catch (Throwable e) { // 6. 回滚事务 transactionManager.rollback(); throw e; } finally { // 7. 恢复旧的数据源上下文 if (previousDataSourceKey ! null) { DataSourceContextHolder.set(previousDataSourceKey); } else { DataSourceContextHolder.clear(); } } } }5.3 方案局限性、注意事项与排查技巧这个自定义方案实现了在单体应用内跨多个数据源的事务协调但它并非银弹有显著的局限性非标准分布式事务这本质上是“最佳努力一阶段提交”Best-Efforts 1PC。它没有两阶段提交2PC的“准备”阶段无法保证在所有极端情况下如提交过程中某个数据库崩溃的强一致性。它依赖于所有数据源在commit时都能成功。性能开销每个参与事务的连接在整个事务期间都被占用直到全局结束。长事务会导致数据库连接持有时间过长影响系统吞吐量。连接泄露风险必须极其小心地处理ConnectionProxy的引用计数和真实关闭逻辑否则会导致数据库连接池耗尽。嵌套事务支持复杂上述示例简化了嵌套事务。在真实场景中需要更精细地管理事务栈和连接的生命周期。ORM框架兼容性此方案直接操作JDBC连接。如果使用MyBatis它通常能很好地工作。但如果使用JPA (Hibernate)情况会复杂得多因为Hibernate有自己的连接管理和事务同步机制需要更深入的集成。常见问题排查表问题现象可能原因排查步骤与解决方案数据源切换无效始终使用默认库1. AOP顺序问题事务切面先于数据源切面执行。2. 方法内部调用导致AOP失效。3.ThreadLocal上下文未正确清理污染了后续请求。1. 检查Order注解确保数据源切面(DataSourceAspect)的order值小于事务切面(MultiTransactionAspect)。2. 将SwitchDataSource注解移到被外部调用的方法上或使用Autowired注入自身代理对象进行内部调用。3. 确保切面的finally块中正确恢复了上下文。自定义事务MultiTransaction内操作未回滚1.ConnectionProxy的commit/rollback方法未被正确拦截。2. 业务代码捕获了异常未抛出。3. 参与事务的某个操作未使用代理连接如直接new了一个连接。1. 调试确认返回的连接是ConnectionProxy类型并检查其方法是否被重写。2. 确保业务方法抛出的异常是RuntimeException或注解中rollbackFor指定的异常。3. 确保所有数据库操作都通过Spring管理的DataSource/SqlSessionTemplate等获取连接。报错“Connection is closed”或连接泄露1.ConnectionProxy的close()和realClose()逻辑有误。2. 引用计数未正确管理导致连接过早被真实关闭。3. 全局事务结束后未清理ThreadLocal。1. 检查close()方法是否只减少引用计数realClose()是否在全局提交/回滚后调用。2. 在getConnection和close方法中加入详细日志跟踪引用计数变化。3. 确保MultiTransactionManager的cleanup()方法在finally块中被调用。与Spring原生Transactional混用出错两种事务管理器冲突连接管理混乱。强烈不建议混用。在需要使用跨库事务的服务层统一使用MultiTransaction。对于单库操作可以使用原生Transactional但要注意两者不要嵌套。最好在架构上明确分层。6. 总结与更高维度的思考通过上述长篇的剖析与实现我们完成了一个从动态数据源管理到自定义跨库事务的完整解决方案。这个方案适用于传统的单体SpringBoot应用在面对“一主多从”或“一主多租户”数据库架构时提供了一种可行的数据隔离与一致性保障思路。然而我们必须清醒地认识到这个方案是有边界的。它的核心局限在于“同一个JVM进程”。一旦你的应用走向微服务化服务A操作数据库A服务B操作数据库B它们之间的数据一致性就无法通过这种应用层事务管理器来保证了。这时你就需要引入真正的分布式事务解决方案例如基于Seata的AT模式、TCC模式或者基于消息队列的最终一致性方案如本地消息表、RocketMQ事务消息等。技术选型的建议如果数据源数量固定且少优先考虑Spring内置的AbstractRoutingDataSourceTransactional(propagationPropagation.REQUIRES_NEW)并接受可能的数据不一致风险通过业务补偿如对账、冲正来解决。如果数据源动态增删且对一致性要求不是极端严格例如操作失败可接受人工干预修复本文的自定义动态数据源应用层事务方案是一个不错的选择。如果涉及跨服务、跨数据库的强一致性要求不要犹豫直接上成熟的分布式事务中间件如Seata。虽然复杂度高但它提供了标准化的解决方案和更强的保障。最后分享一个我在实际项目中踩过的坑在实现ConnectionProxy时最初我忽略了连接池的autoCommit状态。事务开始时将连接设为autoCommitfalse但在realClose时没有设回true导致连接归还给连接池后下一个使用者拿到的是一个处于事务中的连接引发了诡异的“只读”或“锁等待”问题。所以对连接状态的任何修改都必须确保在归还前恢复到连接池预期的默认状态这是一个至关重要的细节。
SpringBoot多数据源动态切换与跨库事务一致性实战方案
1. 项目概述与核心挑战在构建企业级应用时我们经常会遇到一个典型的场景一个核心的主数据库Master用于存储全局配置、用户信息等基础数据同时还有N个业务或租户专属的应用数据库App DB。业务逻辑常常需要在一个请求链路中同时操作主库和某个应用库的数据。比如一个电商后台在处理订单时需要先从主库读取用户信息再向该用户所属租户的应用库写入订单详情。这种架构带来了两个非常棘手的技术难题第一如何优雅、动态地管理这多个数据源并在运行时根据业务上下文如租户ID进行无缝切换第二更关键的是当一次业务操作涉及对多个数据库的写操作时如何保证跨库的数据一致性即事务的原子性这两个问题不解决系统的稳定性和数据可靠性就无从谈起。今天我就结合自己在一个大型SaaS平台项目中的实战经验来深度拆解SpringBoot环境下多数据源与事务管理的解决方案。这不是一个简单的配置教程而是会深入到Spring框架的设计原理剖析现有方案的局限性并最终给出一个在生产环境中验证过的、支持动态数据源与自定义跨库事务的完整实现方案。无论你是正在面临类似架构挑战的开发者还是希望深入理解Spring事务机制的技术爱好者相信这篇长文都能给你带来实实在在的收获。2. 数据源动态切换的核心原理与方案选型要解决多数据源问题我们首先得理解Spring框架为我们提供的“脚手架”。Spring JDBC抽象层中有一个关键类AbstractRoutingDataSource。这个类的设计非常巧妙它本身并不直接管理连接而是作为一个“路由员”根据当前的“查找键”lookup key来决定将数据请求路由到哪个具体的DataSource上。2.1 AbstractRoutingDataSource 工作机制深度解析让我们翻开Spring的源码看看这个类是怎么工作的。它内部维护了两个核心的MaptargetDataSources 一个MapObject, DataSource存储了所有备选的数据源其中Key就是我们说的lookup key比如“master”、“tenant_001”Value就是具体的数据库连接池如Druid、HikariCP实例。defaultTargetDataSource 默认的数据源当找不到对应的lookup key时就会使用它。在Spring容器初始化Bean调用afterPropertiesSet()方法时AbstractRoutingDataSource会做一件重要的事它将targetDataSources和defaultTargetDataSource解析resolve并复制到另外两个final字段——resolvedDataSources和resolvedDefaultDataSource中。这里就是第一个关键点这个复制动作在容器启动阶段完成后resolvedDataSources就固定下来了。这意味着后续你无法再向这个Map里动态添加或删除数据源。这对于需要根据租户动态创建数据源的应用来说是一个致命的限制。那么路由是如何发生的呢核心在于那个抽象方法determineCurrentLookupKey()。每次应用通过DataSource.getConnection()请求连接时AbstractRoutingDataSource的determineTargetDataSource()方法会先调用determineCurrentLookupKey()获取当前的key然后用这个key去resolvedDataSources里找到对应的DataSource最后返回这个具体数据源的连接。所以实现动态切换的思路就清晰了我们继承AbstractRoutingDataSource重写determineCurrentLookupKey()方法让它从当前线程的上下文比如ThreadLocal中返回一个标识符。这样只要我们在执行DAO操作前通过AOP等手段设置好这个上下文标识符就能实现数据源的动态切换。2.2 两种管理模式的权衡配置文件 vs. 数据库表基于上述原理业界通常有两种实现数据源管理的模式选择哪种取决于你的业务场景。2.2.1 基于配置文件的静态方案这是最简单、最常见的入门方案。你在application.yml里预先定义好所有数据源的连接信息。spring: datasource: druid: master: url: jdbc:mysql://localhost:3306/master_db username: root password: 123456 tenant_001: url: jdbc:mysql://192.168.1.101:3306/tenant_001_db username: app_user password: pass_001 tenant_002: url: jdbc:mysql://192.168.1.102:3306/tenant_002_db username: app_user password: pass_002然后在Configuration类中将这些配置注入为Bean并组装到我们自定义的DynamicDataSource中。Configuration public class DataSourceConfig { Bean ConfigurationProperties(spring.datasource.druid.master) public DataSource masterDataSource() { return DruidDataSourceBuilder.create().build(); } // ... 类似地定义 tenant_001, tenant_002 等Bean Bean Primary // 标记为主要数据源Spring事务管理等默认会使用它 public DataSource dynamicDataSource(DataSource masterDataSource, DataSource tenant001DataSource, ...) { MapObject, Object targetDataSources new HashMap(); targetDataSources.put(master, masterDataSource); targetDataSources.put(tenant_001, tenant001DataSource); // ... DynamicDataSource ds new DynamicDataSource(); ds.setDefaultTargetDataSource(masterDataSource); ds.setTargetDataSources(targetDataSources); return ds; } }方案优点实现简单配置直观。启动时即加载性能好。方案缺点与局限无法动态增删正如前文所述数据源在启动后就被固化无法应对运行时新增租户新数据源的场景。配置冗长如果租户数量成百上千配置文件将变得难以维护。不够灵活数据源配置变更如密码修改、地址迁移需要重启应用。因此这种方案仅适用于数据源数量固定且极少变化的场景。2.2.2 基于数据库表的动态方案对于SaaS或多租户平台数据源往往是随着租户的注册而动态创建和管理的。这时我们就需要将数据源的配置信息存储到数据库通常是主库的一张表中。我们需要设计一张数据源配置表例如sys_datasource其核心字段包括数据源唯一标识ds_key 如tenant_001、JDBC URL、用户名、密码、驱动类名、连接池参数初始大小、最大连接数等、状态启用/禁用等。应用启动时从这张表中加载所有启用的数据源配置并使用DataSourceBuilder动态创建DataSource实例放入一个我们自己管理的容器例如一个ConcurrentHashMap中。同时我们需要自定义一个DynamicDataSource类它不再完全依赖父类的resolvedDataSources而是重写determineTargetDataSource()方法使其从我们自管理的容器中根据lookup key获取数据源。方案优点高度动态化可以在运行时通过管理界面增、删、改、启用、禁用数据源无需重启应用。集中管理所有数据源配置一目了然便于维护和审计。扩展性强可以轻松集成监控、保活等高级功能。方案缺点实现复杂需要自行处理数据源的加载、缓存、刷新和销毁生命周期。启动依赖应用启动依赖于主库的可连接性因为需要从主库读取配置。连接池管理需要谨慎处理动态创建的连接池避免内存泄漏。实操心得动态数据源的缓存与保活直接从数据库表实时查询配置来创建连接池的性能是不可接受的。我们必须在内存中做一层缓存。通常的做法是启动时全量加载到本地缓存Map中同时开启一个定时任务比如每5分钟同步数据库中的变更如状态切换、配置更新。对于配置更新需要优雅地重建连接池先创建新的连接池验证连接有效后再替换缓存中的旧实例并异步关闭旧池避免服务中断。此外还可以定时对缓存中的连接池执行一次简单的查询如SELECT 1进行保活防止数据库端因空闲超时断开连接。3. 基于数据库表的动态数据源完整实现接下来我们深入细节实现一个基于数据库表管理的、生产可用的动态数据源方案。3.1 核心接口与类设计首先定义一个数据源管理器的接口这是对我们自定义数据源容器行为的抽象。public interface DataSourceManager { /** 添加或更新一个数据源 */ void put(String dataSourceKey, DataSource dataSource); /** 获取一个数据源 */ DataSource get(String dataSourceKey); /** 判断是否存在 */ boolean containsKey(String dataSourceKey); /** 移除一个数据源会关闭其连接池 */ void remove(String dataSourceKey); /** 获取所有数据源Key */ SetString keySet(); }然后实现我们自己的DynamicDataSource。它需要继承AbstractRoutingDataSource以实现路由能力同时实现DataSourceManager接口以提供动态管理能力。public class DynamicDataSource extends AbstractRoutingDataSource implements DataSourceManager, InitializingBean { // 本地缓存存储所有动态数据源。Key: ds_key, Value: DataSource private final ConcurrentHashMapString, DataSource dataSourceMap new ConcurrentHashMap(); // 默认数据源主库通常从配置文件注入 private DataSource defaultDataSource; Override protected Object determineCurrentLookupKey() { // 从线程上下文中获取当前数据源键 return DataSourceContextHolder.get(); } Override public DataSource determineTargetDataSource() { Object lookupKey determineCurrentLookupKey(); if (lookupKey null) { return defaultDataSource; } // 优先从我们自己的map中查找 DataSource ds dataSourceMap.get(lookupKey.toString()); if (ds ! null) { return ds; } // 如果没找到回退到默认数据源或抛出异常根据业务决定 logger.warn(DataSource key [{}] not found, using default., lookupKey); return defaultDataSource; } // 实现DataSourceManager接口的方法 Override public void put(String key, DataSource dataSource) { dataSourceMap.put(key, dataSource); // 注意这里并没有更新父类的resolvedDataSources因为父类的机制是固定的。 // 我们的路由逻辑已经重写所以只需要更新自己的map即可。 } Override public DataSource get(String key) { return dataSourceMap.get(key); } // ... 其他接口方法实现 Override public void afterPropertiesSet() { // 父类初始化设置默认数据源这个targetDataSources可以传空因为我们重写了determineTargetDataSource super.setDefaultTargetDataSource(defaultDataSource); super.setTargetDataSources(Collections.emptyMap()); // 传空Map避免父类初始化无关数据源 super.afterPropertiesSet(); // 启动后从数据库加载所有数据源配置到dataSourceMap this.loadDataSourcesFromDatabase(); } private void loadDataSourcesFromDatabase() { // 1. 从主库defaultDataSource查询数据源配置表 // 2. 遍历配置使用DataSourceBuilder.create().build() 创建DataSource实例 // 3. 调用 put(key, dataSource) 放入缓存 } }这里的关键点在于我们完全重写了determineTargetDataSource()方法绕过了父类对resolvedDataSources的依赖转而查询我们自己的dataSourceMap。父类的afterPropertiesSet()方法仍然需要调用以完成一些基本的初始化但我们传入一个空的targetDataSources因为具体的数据源由我们自己管理。3.2 线程上下文与AOP切面设计数据源键lookup key需要与当前执行线程绑定。我们使用ThreadLocal来实现一个简单的上下文持有器。public class DataSourceContextHolder { private static final ThreadLocalString CONTEXT_HOLDER new ThreadLocal(); public static void set(String dataSourceKey) { CONTEXT_HOLDER.set(dataSourceKey); } public static String get() { return CONTEXT.HOLDER.get(); } public static void clear() { CONTEXT_HOLDER.remove(); } }为了让业务代码无感知地切换数据源我们使用Spring AOP。定义一个注解SwitchDataSource。Target({ElementType.METHOD, ElementType.TYPE}) Retention(RetentionPolicy.RUNTIME) Documented public interface SwitchDataSource { String value(); // 数据源键 }然后编写切面在方法执行前设置上下文执行后清理。Aspect Component Order(1) // 顺序要设置在事务切面之前因为连接获取需要在事务开启之前确定 public class DataSourceAspect { Around(annotation(switchDataSource)) public Object around(ProceedingJoinPoint point, SwitchDataSource switchDataSource) throws Throwable { String previousKey DataSourceContextHolder.get(); String currentKey switchDataSource.value(); try { DataSourceContextHolder.set(currentKey); return point.proceed(); } finally { // 恢复之前的数据源键如果之前没有则为null会清除 if (previousKey ! null) { DataSourceContextHolder.set(previousKey); } else { DataSourceContextHolder.clear(); } } } }业务使用示例Service public class OrderService { Autowired private OrderMapper orderMapper; // 操作应用库 Autowired private UserMapper userMapper; // 操作主库 // 该方法内所有数据库操作默认使用主库 public void processOrder(Long userId, Order order) { User user userMapper.selectById(userId); // 从主库查用户 // 切换到该用户所属租户的应用库 switchToTenantDb(user.getTenantId()); orderMapper.insert(order); // 插入到应用库 } SwitchDataSource(${tenant.id}) // 假设通过某种方式解析出tenantId private void switchToTenantDb(String tenantId) { // 方法体可以为空AOP会处理切换 // 或者更常见的做法是将SwitchDataSource直接标在Mapper方法或Service方法上 } }注意事项AOP的陷阱执行顺序数据源切换切面Order(1)必须在事务管理切面Transactional 通常Order为Integer.MAX_VALUE或默认之前执行。因为Spring事务管理器在开启事务时会从DataSource获取一个连接并绑定到当前线程。如果先开启了事务获取了默认数据源的连接再切换数据源就无效了。确保你的切面Order值小于事务切面。嵌套调用在同一个类内部的方法调用this.method()是不会被AOP拦截的这是Spring AOP基于代理的局限性。因此SwitchDataSource注解最好标注在Controller调用的入口Service方法上或者使用AspectJ编译时织入来规避此问题。上下文清理务必在finally块中清理ThreadLocal否则可能导致内存泄漏和后续请求数据源错乱。4. 多数据源下的分布式事务困境与Spring事务本质数据源切换问题解决了但更大的挑战随之而来事务一致性。假设我们在一个方法里先向主库插入一条日志再向应用库插入订单。如果订单插入失败我们期望日志插入也回滚。但在默认的Spring声明式事务Transactional下这是做不到的。4.1 为什么Transactional在多数据源下失效默认的DataSourceTransactionManager是与一个特定的DataSource绑定的。当你在一个Transactional方法内即使通过AOP切换了数据源事务管理器在方法开始时获取的连接来自默认数据源已经被绑定到当前事务。后续切换数据源后DAO操作从新的数据源获取了另一个连接这个连接并不在最初开启的事务管理范围之内。因此你实际上是在用两个独立的事务操作两个数据库它们之间没有任何原子性保证。一种常见的错误尝试是使用Propagation.REQUIRES_NEWSwitchDataSource(master) Transactional public void outerMethod() { masterDao.insert(); // 操作主库事务A innerMethod(); // 调用内层方法 // 如果这里抛出异常... } SwitchDataSource(tenant_001) Transactional(propagation Propagation.REQUIRES_NEW) public void innerMethod() { tenantDao.insert(); // 操作应用库事务B }这会导致innerMethod启动一个全新的、独立的事务B。如果outerMethod在innerMethod成功后抛出异常事务A回滚但事务B已经提交数据不一致就此产生。4.2 重新审视Spring事务的本质要解决这个问题我们需要拨开云雾理解Spring事务管理器的本质。我们常说的“Spring事务”其实是对JDBC事务或JPA等资源本地事务的一种封装和增强管理。事务的起点begin在JDBC中并没有一个明确的begin命令。事务的开启实际上是通过connection.setAutoCommit(false)来实现的。Spring事务管理器在获取连接后会将其自动提交设置为false。提交commit与回滚rollback这是JDBC连接对象的核心方法Spring在事务成功或失败时调用它们。挂起suspend与恢复resume这是Spring为了处理事务传播行为如REQUIRES_NEW而创造的概念。挂起并不是真的把数据库事务挂起而是将当前线程绑定的连接ConnectionHolder从事务同步管理器TransactionSynchronizationManager中解绑并暂存起来。恢复则是重新绑定。关闭close在Spring中事务结束后的close操作通常是将连接归还给连接池而不是物理关闭。理解了这些我们就明白要想实现跨库事务核心在于让多个数据库连接参与到同一个“事务协调”过程中并在最终决定提交或回滚时让所有连接保持一致。在单体应用、同一个JVM内我们可以尝试实现一个“应用层的事务协调器”这就是下面要讲的自定义多数据源事务管理。5. 自定义多数据源事务管理器实现方案我们的目标是在一个MultiTransaction注解标记的方法内无论操作多少个数据源这些操作要么全部成功要么全部回滚。思路是拦截所有获取数据库连接的操作返回一个被我们“包装”过的连接这个连接的commit和rollback方法被我们控制。然后在一个统一的切面里管理所有这些包装连接的最终提交与回滚。5.1 整体架构与核心组件这个方案涉及几个核心组件ConnectionProxy连接代理包装真实的JDBCConnection重写其commit和close方法使其不立即生效而是由我们统一调度。TransactionHolder事务持有器绑定到当前线程用于管理本次“全局事务”涉及的所有ConnectionProxy以及事务的嵌套栈信息。MultiTransactionManager多事务管理器核心协调器负责开启、注册连接、提交、回滚等全局操作。MultiTransactionAspect事务切面处理MultiTransaction注解串联整个流程。5.2 详细实现步骤5.2.1 定义多事务注解Target(ElementType.METHOD) Retention(RetentionPolicy.RUNTIME) Documented public interface MultiTransaction { /** 指定使用的事务管理器Bean名称 */ String transactionManager() default multiTransactionManager; /** 事务隔离级别默认跟随数据库 */ Isolation isolation() default Isolation.DEFAULT; /** 事务的默认数据源事务起点 */ String defaultDataSource() default master; /** 是否只读事务 */ boolean readOnly() default false; }5.2.2 实现ConnectionProxy这是实现事务控制的关键。我们创建一个代理连接它持有真实的连接但屏蔽了其原生的提交和关闭行为。public class ConnectionProxy implements Connection { private final Connection originalConnection; // 标记该连接是否已被当前全局事务管理 private volatile boolean managed false; // 记录这个连接上执行了多少次“获取”操作用于引用计数 private final AtomicInteger referenceCount new AtomicInteger(0); public ConnectionProxy(Connection originalConnection) { this.originalConnection originalConnection; } Override public void commit() throws SQLException { // 关键屏蔽原生提交。真正的提交由MultiTransactionManager统一处理。 // do nothing logger.debug(Commit operation is intercepted and suppressed on proxy connection.); } Override public void rollback() throws SQLException { // 同样屏蔽原生回滚。但这里有个细节如果业务代码主动调用rollback我们可能需要处理。 // 一种策略是记录一个标记在全局回滚时使用。 // 为了简化我们先屏蔽所有回滚由管理器控制。 logger.debug(Rollback operation is intercepted and suppressed on proxy connection.); } Override public void close() throws SQLException { // 关键屏蔽原生关闭。我们通过引用计数来控制何时真正关闭连接。 int count referenceCount.decrementAndGet(); if (count 0) { // 当所有使用方都“关闭”后由事务管理器在全局提交或回滚后统一真实关闭。 // 这里只是标记实际关闭逻辑在MultiTransactionManager中。 logger.debug(All references released, connection marked for later real close.); } } public void realCommit() throws SQLException { if (!originalConnection.getAutoCommit()) { originalConnection.commit(); } } public void realRollback() throws SQLException { if (!originalConnection.getAutoCommit()) { originalConnection.rollback(); } } public void realClose() throws SQLException { // 真正关闭前确保连接处于自动提交模式避免连接池拿到一个未提交事务的连接。 if (!originalConnection.getAutoCommit()) { originalConnection.setAutoCommit(true); } originalConnection.close(); } public Connection getOriginalConnection() { return originalConnection; } // ... 需要实现Connection接口的所有其他方法大部分直接委托给originalConnection // 这是一个繁琐但必要的工作可以使用动态代理如JDK Proxy简化但这里为了清晰展示原理使用静态包装。 }5.2.3 实现TransactionHolder这个类持有当前线程的全局事务状态。public class TransactionHolder { // 全局事务ID private final String globalTransactionId UUID.randomUUID().toString(); // 事务隔离级别 private final Isolation isolation; // 是否只读 private final boolean readOnly; // 存储数据源键与ConnectionProxy的映射 private final MapString, ConnectionProxy connectionMap new ConcurrentHashMap(); // 事务执行栈用于支持嵌套虽然多数据源嵌套事务场景复杂这里先提供基础支持 private final DequeString transactionStack new ArrayDeque(); // 数据源键栈与事务栈对应 private final DequeString dataSourceKeyStack new ArrayDeque(); public TransactionHolder(Isolation isolation, boolean readOnly) { this.isolation isolation; this.readOnly readOnly; // 初始压入一个空栈帧代表最外层事务的起点 transactionStack.push(ROOT_ globalTransactionId); dataSourceKeyStack.push(null); // 最外层可能还没有指定数据源 } public void registerConnection(String dataSourceKey, ConnectionProxy connection) { connectionMap.putIfAbsent(dataSourceKey, connection); } public ConnectionProxy getConnection(String dataSourceKey) { return connectionMap.get(dataSourceKey); } public CollectionConnectionProxy getAllConnections() { return connectionMap.values(); } // ... getters and setters }5.2.4 改造DynamicDataSource以支持连接代理我们需要修改之前DynamicDataSource的getConnection()方法使其在全局事务开启时返回ConnectionProxy。public class DynamicDataSource extends AbstractRoutingDataSource implements DataSourceManager { // ... 之前的字段和方法 Override public Connection getConnection() throws SQLException { return getConnection(null, null); } Override public Connection getConnection(String username, String password) throws SQLException { // 1. 判断当前线程是否处于自定义的全局事务中 TransactionHolder holder MultiTransactionContext.getCurrentTransactionHolder(); if (holder null) { // 不在全局事务中直接返回原始连接 return determineTargetDataSource().getConnection(); } // 2. 在全局事务中获取当前数据源键 String currentLookupKey determineCurrentLookupKey(); if (currentLookupKey null) { currentLookupKey default; // 或使用注解指定的defaultDataSource } // 3. 从TransactionHolder中获取已注册的代理连接 ConnectionProxy proxy holder.getConnection(currentLookupKey); if (proxy ! null) { // 增加引用计数模拟多次getConnection proxy.incrementReference(); return proxy; } // 4. 如果还没有为该数据源创建代理连接则创建并注册 DataSource targetDs determineTargetDataSource(); Connection originalConn targetDs.getConnection(); // 设置连接属性关闭自动提交设置隔离级别 if (originalConn.getAutoCommit()) { originalConn.setAutoCommit(false); } // 根据holder中的隔离级别设置连接隔离级别如果非DEFAULT // ... proxy new ConnectionProxy(originalConn); holder.registerConnection(currentLookupKey, proxy); proxy.incrementReference(); // 初始引用计数为1 return proxy; } }5.2.5 实现MultiTransactionManager与切面这是最复杂的部分负责事务边界的控制。Component(multiTransactionManager) public class MultiTransactionManager { // 用于将TransactionHolder绑定到线程 private static final ThreadLocalTransactionHolder CURRENT_HOLDER new ThreadLocal(); public void begin(String defaultDataSourceKey, Isolation isolation, boolean readOnly) { if (CURRENT_HOLDER.get() ! null) { // 处理嵌套事务这里简化处理暂不支持真正的嵌套。可以抛出异常或合并处理。 throw new IllegalTransactionStateException(Nested multi-transactions are not fully supported.); } TransactionHolder holder new TransactionHolder(isolation, readOnly); CURRENT_HOLDER.set(holder); // 设置初始数据源上下文如果指定了 if (defaultDataSourceKey ! null) { DataSourceContextHolder.set(defaultDataSourceKey); } } public void commit() { TransactionHolder holder CURRENT_HOLDER.get(); if (holder null) { throw new TransactionException(No transaction in progress); } try { for (ConnectionProxy conn : holder.getAllConnections()) { conn.realCommit(); // 统一提交所有代理连接 } } catch (SQLException e) { throw new TransactionException(Failed to commit multi-transaction, e); } finally { closeAllConnections(holder); cleanup(); } } public void rollback() { TransactionHolder holder CURRENT_HOLDER.get(); if (holder null) { throw new TransactionException(No transaction in progress); } try { for (ConnectionProxy conn : holder.getAllConnections()) { conn.realRollback(); // 统一回滚所有代理连接 } } catch (SQLException e) { logger.error(Failed to rollback connection, e); // 回滚失败也要继续清理 } finally { closeAllConnections(holder); cleanup(); } } private void closeAllConnections(TransactionHolder holder) { for (ConnectionProxy conn : holder.getAllConnections()) { try { conn.realClose(); } catch (SQLException e) { logger.warn(Failed to close connection, e); } } } private void cleanup() { CURRENT_HOLDER.remove(); DataSourceContextHolder.clear(); // 清理数据源上下文 } public static TransactionHolder getCurrentTransactionHolder() { return CURRENT_HOLDER.get(); } }最后编写AOP切面来驱动整个流程Aspect Component Order(0) // 优先级最高需要在数据源切换切面之前执行这里需要仔细设计顺序。实际上事务开始需要先确定数据源。 public class MultiTransactionAspect { Autowired private MultiTransactionManager transactionManager; Around(annotation(multiTx)) public Object handleMultiTransaction(ProceedingJoinPoint pjp, MultiTransaction multiTx) throws Throwable { // 1. 获取注解属性 Isolation isolation multiTx.isolation(); boolean readOnly multiTx.readOnly(); String defaultDs multiTx.defaultDataSource(); // 2. 保存旧的数据源上下文 String previousDataSourceKey DataSourceContextHolder.get(); // 3. 开启全局事务 transactionManager.begin(defaultDs, isolation, readOnly); // 设置注解指定的默认数据源 DataSourceContextHolder.set(defaultDs); try { // 4. 执行业务方法 Object result pjp.proceed(); // 5. 提交事务 transactionManager.commit(); return result; } catch (Throwable e) { // 6. 回滚事务 transactionManager.rollback(); throw e; } finally { // 7. 恢复旧的数据源上下文 if (previousDataSourceKey ! null) { DataSourceContextHolder.set(previousDataSourceKey); } else { DataSourceContextHolder.clear(); } } } }5.3 方案局限性、注意事项与排查技巧这个自定义方案实现了在单体应用内跨多个数据源的事务协调但它并非银弹有显著的局限性非标准分布式事务这本质上是“最佳努力一阶段提交”Best-Efforts 1PC。它没有两阶段提交2PC的“准备”阶段无法保证在所有极端情况下如提交过程中某个数据库崩溃的强一致性。它依赖于所有数据源在commit时都能成功。性能开销每个参与事务的连接在整个事务期间都被占用直到全局结束。长事务会导致数据库连接持有时间过长影响系统吞吐量。连接泄露风险必须极其小心地处理ConnectionProxy的引用计数和真实关闭逻辑否则会导致数据库连接池耗尽。嵌套事务支持复杂上述示例简化了嵌套事务。在真实场景中需要更精细地管理事务栈和连接的生命周期。ORM框架兼容性此方案直接操作JDBC连接。如果使用MyBatis它通常能很好地工作。但如果使用JPA (Hibernate)情况会复杂得多因为Hibernate有自己的连接管理和事务同步机制需要更深入的集成。常见问题排查表问题现象可能原因排查步骤与解决方案数据源切换无效始终使用默认库1. AOP顺序问题事务切面先于数据源切面执行。2. 方法内部调用导致AOP失效。3.ThreadLocal上下文未正确清理污染了后续请求。1. 检查Order注解确保数据源切面(DataSourceAspect)的order值小于事务切面(MultiTransactionAspect)。2. 将SwitchDataSource注解移到被外部调用的方法上或使用Autowired注入自身代理对象进行内部调用。3. 确保切面的finally块中正确恢复了上下文。自定义事务MultiTransaction内操作未回滚1.ConnectionProxy的commit/rollback方法未被正确拦截。2. 业务代码捕获了异常未抛出。3. 参与事务的某个操作未使用代理连接如直接new了一个连接。1. 调试确认返回的连接是ConnectionProxy类型并检查其方法是否被重写。2. 确保业务方法抛出的异常是RuntimeException或注解中rollbackFor指定的异常。3. 确保所有数据库操作都通过Spring管理的DataSource/SqlSessionTemplate等获取连接。报错“Connection is closed”或连接泄露1.ConnectionProxy的close()和realClose()逻辑有误。2. 引用计数未正确管理导致连接过早被真实关闭。3. 全局事务结束后未清理ThreadLocal。1. 检查close()方法是否只减少引用计数realClose()是否在全局提交/回滚后调用。2. 在getConnection和close方法中加入详细日志跟踪引用计数变化。3. 确保MultiTransactionManager的cleanup()方法在finally块中被调用。与Spring原生Transactional混用出错两种事务管理器冲突连接管理混乱。强烈不建议混用。在需要使用跨库事务的服务层统一使用MultiTransaction。对于单库操作可以使用原生Transactional但要注意两者不要嵌套。最好在架构上明确分层。6. 总结与更高维度的思考通过上述长篇的剖析与实现我们完成了一个从动态数据源管理到自定义跨库事务的完整解决方案。这个方案适用于传统的单体SpringBoot应用在面对“一主多从”或“一主多租户”数据库架构时提供了一种可行的数据隔离与一致性保障思路。然而我们必须清醒地认识到这个方案是有边界的。它的核心局限在于“同一个JVM进程”。一旦你的应用走向微服务化服务A操作数据库A服务B操作数据库B它们之间的数据一致性就无法通过这种应用层事务管理器来保证了。这时你就需要引入真正的分布式事务解决方案例如基于Seata的AT模式、TCC模式或者基于消息队列的最终一致性方案如本地消息表、RocketMQ事务消息等。技术选型的建议如果数据源数量固定且少优先考虑Spring内置的AbstractRoutingDataSourceTransactional(propagationPropagation.REQUIRES_NEW)并接受可能的数据不一致风险通过业务补偿如对账、冲正来解决。如果数据源动态增删且对一致性要求不是极端严格例如操作失败可接受人工干预修复本文的自定义动态数据源应用层事务方案是一个不错的选择。如果涉及跨服务、跨数据库的强一致性要求不要犹豫直接上成熟的分布式事务中间件如Seata。虽然复杂度高但它提供了标准化的解决方案和更强的保障。最后分享一个我在实际项目中踩过的坑在实现ConnectionProxy时最初我忽略了连接池的autoCommit状态。事务开始时将连接设为autoCommitfalse但在realClose时没有设回true导致连接归还给连接池后下一个使用者拿到的是一个处于事务中的连接引发了诡异的“只读”或“锁等待”问题。所以对连接状态的任何修改都必须确保在归还前恢复到连接池预期的默认状态这是一个至关重要的细节。