物流建设网站总结报告2345浏览器
AT 模式
概述
Seata AT 模式是一种非侵入式的分布式事务解决方案,Seata 在内部做了对数据库操作的代理层,我们使用 Seata AT 模式时,实际上用的是 Seata 自带的数据源代理 DataSourceProxy,Seata 在这层代理中加入了很多逻辑,比如插入回滚 undo_log 日志,检查全局锁等。
为什么要检查全局锁呢,这是由于 Seata AT 模式的事务隔离是建立在分支事务的本地隔离级别基础之上的,在数据库本地隔离级别读已提交或以上的前提下,Seata 设计了由事务协调器维护的全局写排他锁,来保证事务间的写隔离,同时,将全局事务默认定义在读未提交的隔离级别上。
Seata 的事务是一个全局事务,它包含了若干个分支本地事务,在全局事务执行过程中(全局事务还没执行完),某个本地事务提交了,如果 Seata 没有采取任何措施,则会导致已提交的本地事务被读取,造成脏读,如果数据在全局事务提交前已提交的本地事务被修改,则会造成脏写。
一阶段:业务数据和回滚日志在同一个本地事务提交,释放本地锁和连接资源。
二阶段:提交异步化,非常快速地完成。回滚通过一阶段的回滚日志反向补偿。
Seata AT模式隔离级别解读
适用场景
- 基于支持本地 ACID 事务的关系型数据库。
- Java 应用,通过 JDBC 访问数据库。
原理
一阶段
1、解析 SQL。得到 SQL 的类型(UPDATE)、表(product)、条件(where name = ‘TXC’)等相关的信息。
2、查询前镜像:根据解析得到的条件信息,生成查询语句,定位数据。
select id, name, since from product where name = 'TXC';
得到前镜像:
id | name | since |
---|---|---|
1 | TXC | 2014 |
3、执行业务 SQL。更新这条记录的 name 为 ‘GTS’。
4、查询后镜像:根据前镜像的结果,通过 主键 定位数据。
得到后镜像:
id | name | since |
---|---|---|
1 | GTS | 2014 |
5、插入回滚日志:把前后镜像数据以及业务 SQL 相关的信息组成一条回滚日志记录,插入到 UNDO_LOG
表中。
{"branchId": 641789253,"undoItems": [{"afterImage": {"rows": [{"fields": [{"name": "id","type": 4,"value": 1}, {"name": "name","type": 12,"value": "GTS"}, {"name": "since","type": 12,"value": "2014"}]}],"tableName": "product"},"beforeImage": {"rows": [{"fields": [{"name": "id","type": 4,"value": 1}, {"name": "name","type": 12,"value": "TXC"}, {"name": "since","type": 12,"value": "2014"}]}],"tableName": "product"},"sqlType": "UPDATE"}],"xid": "xid:xxx"
}
6、提交前,向 TC 注册分支:申请 product
表中,主键值等于 1 的记录的 全局锁 。
7、本地事务提交:业务数据的更新和前面步骤中生成的 UNDO LOG 一并提交。
8、将本地事务提交的结果上报给 TC。
二阶段
回滚
1、收到 TC 的分支回滚请求,开启一个本地事务,执行如下操作。
2、通过 XID 和 Branch ID 查找到相应的 UNDO LOG 记录。
3、数据校验:拿 UNDO LOG 中的后镜与当前数据进行比较,如果有不同,说明数据被当前全局事务之外的动作做了修改。这种情况,需要根据配置策略来做处理,详细的说明在另外的文档中介绍。
4、根据 UNDO LOG 中的前镜像和业务 SQL 的相关信息生成并执行回滚的语句:
update product set name = 'TXC' where id = 1;
5、提交本地事务。并把本地事务的执行结果(即分支事务回滚的结果)上报给 TC。
提交
1、收到 TC 的分支提交请求,把请求放入一个异步任务的队列中,马上返回提交成功的结果给 TC。
2、异步任务阶段的分支提交请求将异步和批量地删除相应 UNDO LOG 记录。
回滚日志表如下:
CREATE TABLE `undo_log` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`branch_id` bigint(20) NOT NULL,`xid` varchar(100) NOT NULL,`context` varchar(128) NOT NULL,`rollback_info` longblob NOT NULL,`log_status` int(11) NOT NULL,`log_created` datetime NOT NULL,`log_modified` datetime NOT NULL,PRIMARY KEY (`id`),UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
写隔离
一阶段本地事务提交之前,需要确保先拿到全局锁,这样才能提交本地事务。
尝试获取全局锁如果超时,则放弃尝试,然后回滚本地事务并释放本地锁。
两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000。tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的 全局锁 ,本地提交释放本地锁。 tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的 全局锁 ,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待 全局锁 。
tx1 二阶段全局提交,释放 全局锁 。tx2 拿到 全局锁 提交本地事务。
如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。此时,如果 tx2 仍在等待该数据的 全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的 全局锁 等锁超时,放弃 全局锁 并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。
因为整个过程 全局锁 在 tx1 结束前一直是被 tx1 持有的,所以不会发生 脏写 的问题。
读隔离
在数据库本地事务隔离级别 读已提交(Read Committed) 或以上的基础上,Seata(AT 模式)的默认全局隔离级别是 读未提交(Read Uncommitted) 。
SELECT FOR UPDATE 语句的执行会申请 全局锁 ,如果 全局锁 被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询是被 block 住的,直到 全局锁 拿到,即读取的相关数据是 已提交 的,才返回。
出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句。
使用
假设 Bussiness 服务调用 Stock 服务和 Order 服务。
1、配置 Bussiness 服务。
-
配置 GlobalTransactionScanner 的applicationId 和 txServiceGroup(事务分组)。
-
使用 @GlobalTransactional 注解
-
可以使用 RootContext.getXID() 方法获取到 XID
-
按需修改 file.conf、registry.conf 文件的内容
@Configuration
public class SeataAutoConfig {@Beanpublic GlobalTransactionScanner globalTransactionScanner() {// 指定 applicationId、txServiceGroupreturn new GlobalTransactionScanner("dubbo-gts-seata-example", "my_test_tx_group");}
}
@Service
public class BusinessServiceImpl implements BusinessService {@Reference(version = "1.0.0")private StockDubboService stockDubboService;@Reference(version = "1.0.0")private OrderDubboService orderDubboService;private boolean flag;@Override@GlobalTransactional(timeoutMills = 300000, name = "dubbo-gts-seata-example")public ObjectResponse handleBusiness(BusinessDTO businessDTO) {System.out.println("开始全局事务,XID = " + RootContext.getXID());ObjectResponse<Object> objectResponse = new ObjectResponse<>();//1、扣减库存CommodityDTO commodityDTO = new CommodityDTO();commodityDTO.setCommodityCode(businessDTO.getCommodityCode());commodityDTO.setCount(businessDTO.getCount());ObjectResponse stockResponse = stockDubboService.decreaseStock(commodityDTO);//2、创建订单OrderDTO orderDTO = new OrderDTO();orderDTO.setUserId(businessDTO.getUserId());orderDTO.setCommodityCode(businessDTO.getCommodityCode());orderDTO.setOrderCount(businessDTO.getCount());orderDTO.setOrderAmount(businessDTO.getAmount());ObjectResponse<OrderDTO> response = orderDubboService.createOrder(orderDTO);//打开注释测试事务发生异常后,全局回滚功能// if (!flag) {// throw new RuntimeException("测试抛异常后,分布式事务回滚!");// }if (stockResponse.getStatus() != 200 || response.getStatus() != 200) {throw new DefaultException(RspStatusEnum.FAIL);}objectResponse.setStatus(RspStatusEnum.SUCCESS.getCode());objectResponse.setMessage(RspStatusEnum.SUCCESS.getMessage());objectResponse.setData(response.getData());return objectResponse;}
}
2、 配置 Stock 服务。
- 配置 DataSource、DataSourceProxy、SqlSessionFactory、GlobalTransactionScanner
- 可以使用 RootContext.getXID() 方法获取到 XID
- 按需修改 file.conf、registry.conf 文件的内容
- 对应数据库配置 UNDO_LOG 表
@Configuration
public class SeataAutoConfig {@Autowiredprivate DataSourceProperties dataSourceProperties;@Bean@Primarypublic DruidDataSource druidDataSource() {DruidDataSource druidDataSource = new DruidDataSource();druidDataSource.setUrl(dataSourceProperties.getUrl());druidDataSource.setUsername(dataSourceProperties.getUsername());druidDataSource.setPassword(dataSourceProperties.getPassword());druidDataSource.setDriverClassName(dataSourceProperties.getDriverClassName());druidDataSource.setInitialSize(0);druidDataSource.setMaxActive(180);druidDataSource.setMaxWait(60000);druidDataSource.setMinIdle(0);druidDataSource.setValidationQuery("Select 1 from DUAL");druidDataSource.setTestOnBorrow(false);druidDataSource.setTestOnReturn(false);druidDataSource.setTestWhileIdle(true);druidDataSource.setTimeBetweenEvictionRunsMillis(60000);druidDataSource.setMinEvictableIdleTimeMillis(25200000);druidDataSource.setRemoveAbandoned(true);druidDataSource.setRemoveAbandonedTimeout(1800);druidDataSource.setLogAbandoned(true);return druidDataSource;}@Beanpublic DataSourceProxy dataSourceProxy(DruidDataSource druidDataSource) {return new DataSourceProxy(druidDataSource);}@Beanpublic SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy) throws Exception {SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();factoryBean.setDataSource(dataSourceProxy);factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath*:/mapper/*.xml"));factoryBean.setTransactionFactory(new JdbcTransactionFactory());return factoryBean.getObject();}@Beanpublic GlobalTransactionScanner globalTransactionScanner() {return new GlobalTransactionScanner("stock-gts-seata-example", "my_test_tx_group");}
}
3、配置 Order 服务
- 配置 DataSource、DataSourceProxy、SqlSessionFactory、GlobalTransactionScanner
- 可以使用 RootContext.getXID() 方法获取到 XID
- 按需修改 file.conf、registry.conf 文件的内容
- 对应数据库配置 UNDO_LOG 表
@Configuration
public class SeataAutoConfig {@Autowiredprivate DataSourceProperties dataSourceProperties;@Bean@Primarypublic DruidDataSource druidDataSource() {DruidDataSource druidDataSource = new DruidDataSource();druidDataSource.setUrl(dataSourceProperties.getUrl());druidDataSource.setUsername(dataSourceProperties.getUsername());druidDataSource.setPassword(dataSourceProperties.getPassword());druidDataSource.setDriverClassName(dataSourceProperties.getDriverClassName());druidDataSource.setInitialSize(0);druidDataSource.setMaxActive(180);druidDataSource.setMaxWait(60000);druidDataSource.setMinIdle(0);druidDataSource.setValidationQuery("Select 1 from DUAL");druidDataSource.setTestOnBorrow(false);druidDataSource.setTestOnReturn(false);druidDataSource.setTestWhileIdle(true);druidDataSource.setTimeBetweenEvictionRunsMillis(60000);druidDataSource.setMinEvictableIdleTimeMillis(25200000);druidDataSource.setRemoveAbandoned(true);druidDataSource.setRemoveAbandonedTimeout(1800);druidDataSource.setLogAbandoned(true);return druidDataSource;}@Beanpublic DataSourceProxy dataSourceProxy(DruidDataSource druidDataSource) {return new DataSourceProxy(druidDataSource);}@Beanpublic SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy) throws Exception {SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();factoryBean.setDataSource(dataSourceProxy);factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath*:/mapper/*.xml"));factoryBean.setTransactionFactory(new JdbcTransactionFactory());return factoryBean.getObject();}@Beanpublic GlobalTransactionScanner globalTransactionScanner() {return new GlobalTransactionScanner("order-gts-seata-example", "my_test_tx_group");}
}