分布式事务解决方案之 Seata(二):Seata AT 模式

分布式事务解决方案之 Seata(二):Seata AT 模式 前言配套代码仓库https://github.com/iweidujiang/spring-cloud-alibaba-lab 对应模块 15-seata-atSeata Server 部署见 14-seata-deploy。通过上一篇文章对分布式事务解决方案的介绍我们已经对两阶段提交、TCC及基于MQ的最终一致性有所了解了。Seata提供了AT、TCC、SAGA和XA事务模式他是一站式的分布式解决方案。本文将先介绍Seata的AT模式他是基于两阶段提交的演变。Seata AT 模式是一种非侵入式的分布式事务解决方案在 AT 模式下我们只需关注自己的业务 SQL业务 SQL作为一阶段Seata 框架会自动生成事务的二阶段提交和回滚操作。Seata 在内部做了对数据库操作的代理层我们使用 Seata AT 模式时实际上用的是 Seata 自带的数据源代理DataSourceProxySeata 在这层代理中加入了很多逻辑比如插入回滚undo_log日志检查全局锁等。Seata AT 模式整体机制前面说过AT模式是两阶段提交协议的演变其实现机制为一阶段业务数据和回滚日志记录在同一个本地事务中提交释放本地锁和连接资源。二阶段提交异步化非常快速地完成。回滚通过一阶段的回滚日志进行反向补偿。在一阶段中Seata 会拦截业务 SQL首先解析 SQL 语义找到要更新的业务数据在数据被更新前保存下来放到undo_log表然后执行业务SQL更新数据更新之后再次保存数据redo最后生成行锁这些操作都在本地数据库事务内完成这样保证了一阶段的原子性。相对一阶段二阶段比较简单负责整体的回滚和提交如果在一阶段中的事务全部执行通过那么执行全局提交如果之前的一阶段中有本地事务没有通过那么就执行全局回滚回滚用到的就是一阶段记录的undo_log通过回滚记录生成反向更新SQL并执行以完成分支的回滚。Seata 术语TC (Transaction Coordinator) - 事务协调者维护全局和分支事务的状态驱动全局事务提交或回滚。TM (Transaction Manager) - 事务管理器定义全局事务的范围开始全局事务、提交或回滚全局事务。RM (Resource Manager) - 资源管理器管理分支事务处理的资源与TC交谈以注册分支事务和报告分支事务的状态并驱动分支事务提交或回滚。当然事务完成后会释放所有资源和删除所有日志。undo_log表稍后我们会演示观察记录。实战演示 Seata AT 模式解决分布式事务问题案例提供两个服务seata-order-service和seata-ware-service订单服务实现创建订单业务业务包括扣减库存和新增订单。扣减库存是通过OpenFeign进行远程调用仓库服务通过操作数据库seata-ware的表t_ware进行库存量减一操作执行update语句而创建订单则是操作另一个数据库seata-order的表t_order执行insert语句。也就是说这两个服务操作了两个数据库有可能会产生分布式事务的问题。分布式事务问题的产生先看两个服务分别执行 SQL 操作的代码。仓库服务DAOMapper public interface WareMapper extends BaseMapperWare { Update(update t_ware set stockstock-1 where sku_id#{skuId}) void deductStock(Long skuId); }ServiceService Slf4j public class WareServiceImpl extends ServiceImplWareMapper, Ware implements WareService { Autowired private WareMapper wareMapper; Override public void deductStock(Long skuId) { log.info(开始扣减库存skuId{}, skuId); wareMapper.deductStock(skuId); } }ControllerRestController RequestMapping(/ware) public class WareController { Autowired private WareService wareService; GetMapping(/deduct) public void deductStock(RequestParam Long skuId) { wareService.deductStock(skuId); } }订单服务DAOMapper public interface OrderMapper extends BaseMapperOrder { }新增订单的insert语句直接使用Mybatis-Plus提供的默认实现FeignClientFeignClient(seata-ware-service) public interface WareFeignClient { GetMapping(/ware/deduct) void deductStock(RequestParam(value skuId) Long skuId); }ServiceService Slf4j public class OrderServiceImpl extends ServiceImplOrderMapper, Order implements OrderService { Autowired private OrderMapper orderMapper; Autowired private WareFeignClient wareFeignClient; Override Transactional(rollbackFor Exception.class) public void createOrder(Order order) { log.info(开始扣减库存skuId{}, order.getSkuId()); // 扣减库存 wareFeignClient.deductStock(order.getSkuId()); log.info(扣减库存完成skuId{}, order.getSkuId()); // 订单号 String orderSn IdWorker.getTimeId(); order.setOrderSn(orderSn); order.setCreateTime(new Date()); log.info(开始创建订单:{}, order); log.error(此处添加异常order.getId()此时为null模拟分布式事务出现{}, order.getId().toString()); // 创建订单 orderMapper.insert(order); log.info(创建订单完成); } }Service 中先远程调用执行减库存然后在插入订单之前模拟一个异常出现order.getId().toString()此时还未执行insertorder.getId()为null所以此处会出现异常因此下面的insert语句就不会继续执行了而前面的减库存操作却已经执行成功库存减了订单未增加这样就出现了分布式事务的问题。用Spring的Transactional注解看一下能否解决此问题即看一下数据库的数据是否一致。数据库数据初始状态调用创建订单接口http://localhost:8007/order/create按照我们预先设置的异常该接口出现异常了我们来看一下数据库数据的变化从数据库中的数据可以看到即使我们在业务接口上加了Transactional(rollbackFor Exception.class)注解也对分布式事务没有办法解决数据最终还是不一致因为库存扣减了订单却没有相应的增加。使用 Seata 的 AT 模式解决分布式事务问题从前面的案例我们已经得知Spring的Transactional并不能解决分布式事务的问题我们就以Seata提供的方案来处理。Seata解决分布式事务的默认模式就是AT模式。1引入 Seata 依赖dependency groupIdcom.alibaba.cloud/groupId artifactIdspring-cloud-starter-alibaba-seata/artifactId /dependency2涉及到分布式事务的服务数据库均新建undo_log表CREATE TABLE undo_log ( id bigint(20) NOT NULL AUTO_INCREMENT, branch_id bigint(20) NOT NULL, xid varchar(100) NOT NULL, context varchar(128) NOT NULL, rollback_info longblob NOT NULL, log_status int(11) NOT NULL, log_created datetime NOT NULL, log_modified datetime NOT NULL, PRIMARY KEY (id), UNIQUE KEY ux_undo_log (xid,branch_id) ) ENGINEInnoDB AUTO_INCREMENT1 DEFAULT CHARSETutf8;3在两个微服务的application.yml配置文件分别加入Seata的配置seata: tx-service-group: default_tx_group service: vgroup-mapping: default_tx_group: default registry: type: nacos nacos: server-addr: 127.0.0.1:8848 namespace: group: SEATA_GROUP config: type: nacos nacos: />seata.service.vgroupMapping.事务分组名称该配置项的值为 TC 集群名称根据上图可以看到此处的值应为default。seata.registry.xx注册中心这里选择的是Nacos。seata.config.xx配置中心这里也是Nacos。4在 TM 端使用GlobalTransactional开启全局事务Override GlobalTransactional //Transactional(rollbackFor Exception.class) public void createOrder(Order order) { log.info(开始扣减库存skuId{}, order.getSkuId()); // 扣减库存 wareFeignClient.deductStock(order.getSkuId()); log.info(扣减库存完成skuId{}, order.getSkuId()); // 订单号 String orderSn IdWorker.getTimeId(); order.setOrderSn(orderSn); order.setCreateTime(new Date()); log.info(开始创建订单:{}, order); log.error(此处添加异常order.getId()此时为null模拟分布式事务出现{}, order.getId().toString()); // 创建订单 orderMapper.insert(order); log.info(创建订单完成); }好了经过以上几步我们先恢复数据库数据的值为初始值然后再次测试。数据已恢复至初始值再次执行接口发现执行完成以后并没有达到想要的事务回滚的效果通过服务日志看到一直再打印如下日志transaction [127.0.0.1:8091:18317606214187586] current status is [RollbackRetrying]Seata Server端也有日志此时看一下undo_log表种种迹象都在说该事务在尝试回滚but就是一直回滚不成功再看一下微服务的日志可以看到有这样一个提示reason:[Branch session rollback failed and try again later xid 127.0.0.1:8091:18317606214181627 branchId 18317606214181630 Class cannot be created (missing no-arg constructor): java.time.LocalDateTime这是 Seata 的一个 Bug详细的 Issue 见https://github.com/seata/seata/issues/3620该 bug 在1.4.2版本提供了 SPI 扩展接口可以自定义一个序列化类具体做法是1新建一个专门序列化java.time.LocalDateTime类型的类package io.github.iweidujiang.lab15.common.seata; import com.fasterxml.jackson.databind.JsonDeserializer; import com.fasterxml.jackson.databind.JsonSerializer; import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer; import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer; import io.seata.rm.datasource.undo.parser.spi.JacksonSerializer; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; /** * seata LocalDateTime 序列化扩展点 * * 博客https://chendapeng.cn - 行百里者半九十凡事善始善终吾将上下而求索 * 公众号行百里er * * author 行百里者 * date 2022-09-02 21:17 */ public class LocalDateTimeJacksonSerializer implements JacksonSerializerLocalDateTime { public static final String NORM_DATETIME_MS_PATTERN yyyy-MM-dd HH:mm:ss.SSS; Override public ClassLocalDateTime type() { return LocalDateTime.class; } Override public JsonSerializerLocalDateTime ser() { return new LocalDateTimeSerializer(DateTimeFormatter.ofPattern(NORM_DATETIME_MS_PATTERN)); } Override public JsonDeserializer? extends LocalDateTime deser() { return new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern(NORM_DATETIME_MS_PATTERN)); } }2在resources目录下新建META-INF/seata文件夹并在其下新增io.seata.rm.datasource.undo.parser.spi.JacksonSerializer文件文件内容为io.github.iweidujiang.lab15.common.seata.LocalDateTimeJacksonSerializer两个微服务均要如此做。然后我们再来调用一下http://localhost:8007/order/create调用完成后2022-09-08 14:28:56.551 INFO 3992 --- [nio-8008-exec-1] c.c.s.s.service.impl.WareServiceImpl : 开始扣减库存skuId10086 2022-09-08 14:28:56.576 INFO 3992 --- [nio-8008-exec-1] i.s.c.rpc.netty.RmNettyRemotingClient : will register resourceId:jdbc:mysql://127.0.0.1:3306/seata-ware 2022-09-08 14:28:56.584 INFO 3992 --- [ctor_RMROLE_1_1] io.seata.rm.AbstractRMHandler : the rm client received response msg [version1.4.2,extraDatanull,identifiedtrue,resultCodenull,msgnull] from tc server. 2022-09-08 14:28:56.787 INFO 3992 --- [nio-8008-exec-1] i.s.r.d.u.parser.JacksonUndoLogParser : jackson undo log parser load [io.github.iweidujiang.lab15.common.seata.LocalDateTimeJacksonSerializer]. 2022-09-08 14:29:57.071 INFO 3992 --- [h_RMROLE_1_1_16] i.s.c.r.p.c.RmBranchRollbackProcessor : rm handle branch rollback process:xid127.0.0.1:8091:18318220201103576,branchId18318220201103579,branchTypeAT,resourceIdjdbc:mysql://127.0.0.1:3306/seata-ware,applicationDatanull 2022-09-08 14:29:57.075 INFO 3992 --- [h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacking: 127.0.0.1:8091:18318220201103576 18318220201103579 jdbc:mysql://127.0.0.1:3306/seata-ware 2022-09-08 14:29:57.187 INFO 3992 --- [h_RMROLE_1_1_16] i.s.r.d.undo.AbstractUndoLogManager : xid 127.0.0.1:8091:18318220201103576 branch 18318220201103579, undo_log deleted with GlobalFinished 2022-09-08 14:29:57.189 INFO 3992 --- [h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacked result: PhaseTwo_Rollbacked再次查看数据数据一致库存没有减订单没有增。AT 模式工作机制分析以上面的案例来分析 AT 模式的工作机制。库存表seata-ware.t_waremysql describe t_ware; ----------------------------------------------------------- | Field | Type | Null | Key | Default | Extra | ----------------------------------------------------------- | id | bigint | NO | PRI | NULL | auto_increment | | sku_id | bigint | YES | | NULL | | | stock | int | YES | | NULL | | | create_time | datetime | YES | | NULL | | | update_time | datetime | YES | | NULL | | -----------------------------------------------------------AT 分支事务的业务逻辑是Update(update t_ware set stockstock-1, update_timenow() where sku_id#{skuId}) void deductStock(Long skuId);具体的 SQL 执行语句update t_ware set stockstock-1,update_timenow() where sku_id10086执行一阶段该阶段的执行过程1解析 SQL得到 SQL 的类型UPDATE表product条件where name ‘TXC’等相关的信息。2查询前镜像根据解析得到的条件信息生成查询语句定位数据。select id,sku_id,stock,create_time,update_time from t_ware where sku_id10086得到执行前的镜像idsku_idstockcreate_timeupdate_time11008610002022-09-01 17:14:162022-09-01 17:14:163执行业务 SQL更新这条记录的 stock 为 999stockstock-1。4查询后镜像根据前镜像的结果通过主键定位数据。select id,sku_id,stock,create_time,update_time from t_ware where id1得到执行后的镜像idsku_idstockcreate_timeupdate_timeid100869992022-09-01 17:14:162022-09-08 14:28:495插入回滚日志表把前后镜像数据以及业务 SQL 相关的信息组成一条回滚日志记录插入到undo_log表中。{ class: io.seata.rm.datasource.undo.BranchUndoLog, xid: 127.0.0.1:8091:18318220201103576, branchId: 18318220201103579, sqlUndoLogs: [ java.util.ArrayList, [ { class: io.seata.rm.datasource.undo.SQLUndoLog, sqlType: UPDATE, tableName: t_ware, beforeImage: { class: io.seata.rm.datasource.sql.struct.TableRecords, tableName: t_ware, rows: [ java.util.ArrayList, [ { class: io.seata.rm.datasource.sql.struct.Row, fields: [ java.util.ArrayList, [ { class: io.seata.rm.datasource.sql.struct.Field, name: id, keyType: PRIMARY_KEY, type: -5, value: [ java.lang.Long, 1 ] }, { class: io.seata.rm.datasource.sql.struct.Field, name: stock, keyType: NULL, type: 4, value: 1000 }, { class: io.seata.rm.datasource.sql.struct.Field, name: update_time, keyType: NULL, type: 93, value: [ java.time.LocalDateTime, 2022-09-01 17:14:16.000 ] } ] ] } ] ] }, afterImage: { class: io.seata.rm.datasource.sql.struct.TableRecords, tableName: t_ware, rows: [ java.util.ArrayList, [ { class: io.seata.rm.datasource.sql.struct.Row, fields: [ java.util.ArrayList, [ { class: io.seata.rm.datasource.sql.struct.Field, name: id, keyType: PRIMARY_KEY, type: -5, value: [ java.lang.Long, 1 ] }, { class: io.seata.rm.datasource.sql.struct.Field, name: stock, keyType: NULL, type: 4, value: 999 }, { class: io.seata.rm.datasource.sql.struct.Field, name: update_time, keyType: NULL, type: 93, value: [ java.time.LocalDateTime, 2022-09-08 14:28:49.000 ] } ] ] } ] ] } } ] ] }6提交前向 TC 注册分支申请t_ware表中主键值等于 1 的记录的全局锁。7本地事务提交业务数据的更新和前面步骤中生成的 UNDO LOG 一并提交。8将本地事务提交的结果上报给 TC。执行二阶段-回滚1收到 TC 的分支回滚请求开启一个本地事务执行如下操作2通过XID和Branch ID查找到相应的 UNDO LOG 记录3数据校验拿 UNDO LOG 中的后镜与当前数据进行比较如果有不同说明数据被当前全局事务之外的动作做了修改4根据 UNDO LOG 中的前镜像和业务 SQL 的相关信息生成并执行回滚的语句update t_ware set stock 1000, update_time2022-09-01 17:14:16 where id 1;5提交本地事务。并把本地事务的执行结果即分支事务回滚的结果上报给 TC。执行二阶段-提交1收到 TC 的分支提交请求把请求放入一个异步任务的队列中马上返回提交成功的结果给 TC2异步任务阶段的分支提交请求将异步和批量地删除相应 UNDO LOG 记录。执行完成后undo_log表相应的记录被删除小结使用 Seata 解决分布式事务问题时默认开启的就是 AT 模式该模式是一种无侵入的分布式事务解决方案具体实现机制为一阶段Seata 会拦截业务 SQL首先解析 SQL 语义找到业务 SQL要更新的业务数据在业务数据被更新前将其保存成before image然后执行业务 SQL更新业务数据在业务数据更新之后再将其保存成after image最后生成行锁。以上操作全部在一个数据库事务内完成这样保证了一阶段操作的原子性。二阶段分为提交和回滚两种情况提交的情况因为业务 SQL在一阶段已经提交至数据库 所以 Seata 只需将一阶段保存的快照数据和行锁删掉完成数据清理即可。回滚的情况Seata 需要回滚一阶段已经执行的业务 SQL还原业务数据。回滚方式就是用before image还原业务数据但在还原前要首先要校验脏写对比数据库当前业务数据和after image如果两份数据完全一致就说明没有脏写可以还原业务数据如果不一致就说明有脏写出现脏写就需要转人工处理。关于出现脏写的现象可以模拟出来比如当执行完业务 SQL 后手动再去修改一次数据库中的值这样 after image 中的值和数据库中的值就不一样了这就出现了脏写的现象。从以上实现机制可以看出不管是提交还是回滚均有Seata完成我们只需要安心写我们的业务SQL即可这就是所谓的无侵入。先赞后看养成习惯。举手之劳赞有余香。本文创作于 2022-09-09 。