目录
一、创建数据库
二、配置consumer-service
1.pom.xml
2.application.properties
3.启动类
4.其他代码
三、配置provider-service
1.pom.xml
2.application.properties
3.启动类
4.其他代码
四、分布式事务问题演示与解决办法
(一)分布式事务问题演示
(二)解决办法:使用@GlobalTransactional全局事务注解
五、SQL回滚日志——beforeImage和afterImage
六、@GlobalLock
(一)数据库脏写
(二)GlobalLock是如何预防数据库脏写的
七、面试重点:Seata分布式事务流程
1.一阶段
2.二阶段-回滚
3.二阶段-提交
一、创建数据库
每个库都创建下面的表:
-- 分布式事务回滚使用, sql在源码文件夹的 script\client\at\db 目录下
CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(128) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';
CREATE TABLE `user` (
`id` int(11) NOT NULL,
`name` varchar(45) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
二、配置consumer-service
1.pom.xml
添加依赖
<!-- seata -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
<!-- mybatis plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.1</version>
</dependency>
<!-- 连接MySQL数据库 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.30</version>
</dependency>
2.application.properties
添加下面的配置
#数据库
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://192.168.157.102:3306/seata-user?useSSL=false
spring.datasource.username=root
spring.datasource.password=123456
# 用于在分布式中区分是哪个服务,应该保持唯一
seata.application-id=${spring.application.name}
# 获取配置
seata.config.type=nacos
seata.config.nacos.server-addr=192.168.157.102:8848
seata.config.nacos.namespace=seata
seata.config.nacos.group=SEATA_GROUP
seata.config.nacos.dataId=seata-server
seata.config.nacos.username=nacos
seata.config.nacos.password=nacos
# 获取服务
seata.registry.type=nacos
seata.registry.nacos.application=seata-server
seata.registry.nacos.server-addr=192.168.157.102:8848
seata.registry.nacos.namespace=seata
seata.registry.nacos.group=SEATA_GROUP
seata.registry.nacos.username=nacos
seata.registry.nacos.password=nacos
3.启动类
@SpringBootApplication
@EnableFeignClients // 开启openFeign注解扫描
@MapperScan("com.javatest.mapper")
// @EnableAutoDataSourceProxy是 Seata 提供的一个注解,用于启用数据源代理功能。
// 它的主要目的是为数据源创建一个代理对象,以便在分布式事务场景中进行拦截和增强。
@EnableAutoDataSourceProxy
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class,args);
}
}
4.其他代码
User实体类
@TableName("user")
public class User {
@TableId
private Integer id;
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
}
UserMapper
public interface UserMapper extends BaseMapper<User> {
}
UserService
@Service
public class UserServiceImpl {
@Autowired
private UserMapper userMapper;
@Autowired
private ProviderService providerService;
public void saveUser(User user) {
userMapper.insert(user); // 操作第一个数据库
// 发起远程调用,操作第二个数据库
providerService.createUser(user);
}
}
ProvicerService 中,新增:
@FeignClient(name = "provider-service")
public interface ProviderService {
@GetMapping("/saveu") // /saveu这个接口名字必须与provider-service的controller中的一致
String createUser(@SpringQueryMap User user);
}
新增 SeataController:
@RestController
public class SeataController {
@Autowired
private UserServiceImpl userService;
@GetMapping("/saveUser")
public String save(User user) {
userService.saveUser(user);
return "success-----------";
}
}
三、配置provider-service
1.pom.xml
添加依赖
<!-- seata -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
<!-- mybatis plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.1</version>
</dependency>
<!-- 连接MySQL数据库 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.30</version>
</dependency>
2.application.properties
添加下面的配置,注意provider-service使用的是seata-user2数据库
#数据库
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://192.168.157.102:3306/seata-user2?useSSL=false
spring.datasource.username=root
spring.datasource.password=123456
# 用于在分布式中区分是哪个服务,应该保持唯一
seata.application-id=${spring.application.name}
# 获取配置
seata.config.type=nacos
seata.config.nacos.server-addr=192.168.157.102:8848
seata.config.nacos.namespace=seata
seata.config.nacos.group=SEATA_GROUP
seata.config.nacos.dataId=seata-server
seata.config.nacos.username=nacos
seata.config.nacos.password=nacos
# 获取服务
seata.registry.type=nacos
seata.registry.nacos.application=seata-server
seata.registry.nacos.server-addr=192.168.157.102:8848
seata.registry.nacos.namespace=seata
seata.registry.nacos.group=SEATA_GROUP
seata.registry.nacos.username=nacos
seata.registry.nacos.password=nacos
3.启动类
@SpringBootApplication
@EnableAutoDataSourceProxy
@MapperScan("com.javatest.mapper")
public class ProviderApplication {
public static void main(String[] args) {
SpringApplication.run(ProviderApplication.class,args);
}
}
4.其他代码
User实体类
@TableName("user")
public class User {
@TableId
private Integer id;
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
}
UserMapper
public interface UserMapper extends BaseMapper<User> {
}
UserService
@Service
public class UserServiceImpl {
@Autowired
private UserMapper userMapper;
public void saveUser(User user) {
userMapper.insert(user); // 操作第二个数据库
}
}
在ProviderController中,新增 :
@Autowired
private UserServiceImpl userService;
@GetMapping("/saveu")
public String createUser(User user) {
userService.saveUser(user);
return "success-----------";
}
四、分布式事务问题演示与解决办法
(一)分布式事务问题演示
按照上面配置,我们在Consumer-service的saveUser方法中制作一个错误:
@Service
public class UserServiceImpl {
@Autowired
private UserMapper userMapper;
@Autowired
private ProviderService providerService;
public void saveUser(User user) {
userMapper.insert(user); // 操作第一个数据库
// 发起远程调用,操作第二个数据库
providerService.createUser(user);
int i = 1 / 0;
}
}
启动项目consumer-service和provider-service,浏览器输入:
http://localhost:8888/saveUser?id=1&name=张三
运行后控制台报错,2张表的数据都存入成功:
当添加@Transactional后,第一个数据库不会写入,第二个数据库会成功写入,因为第一个数据库会因为代码未执行成功,被@Transactional拦截回滚,而第二个数据库没有添加@Transactional,所以不会被回滚,数据会被写入。
清空2个数据库,重启consumer-service服务,浏览器再次输入:
http://localhost:8888/saveUser?id=1&name=张三
结果:
这种现象就是分布式事务问题,导致的数据不一致。
注意:
- 从Spring Boot 1.0 开始,Spring Boot 默认启用了事务管理,因此通常不需要显式添加 @EnableTransactionManagement。
- 只有在自定义事务管理器、非 Spring Boot 项目或禁用自动配置时,才需要显式添加 @EnableTransactionManagement。
- 如果不确定事务管理是否启用,可以通过日志或测试方法验证。
(二)解决办法:使用@GlobalTransactional全局事务注解
@Service
public class UserServiceImpl {
@Autowired
private UserMapper userMapper;
@Autowired
private ProviderService providerService;
// @Transactional
@GlobalTransactional // 开启全局事务
public void saveUser(User user) {
userMapper.insert(user); // 操作第一个数据库
// 发起远程调用,操作第二个数据库
providerService.createUser(user);
int i = 1 / 0;
}
}
清理数据库,开启@GlobalTransactional,重启consumer-service,然后再次访问代码仍然报错,但是数据库中没有数据,证明分布式事务生效。
也可以在@GlobalTransactional后面添加下面的代码
@GlobalTransactional(rollbackFor = RuntimeException.class)
@GlobalTransactional不一定非要放在Service中,放在Controller上也可以。
五、SQL回滚日志——beforeImage和afterImage
@GlobalTransactional的原理就是sql执行前后的beforeImage和afterImage,日志文件就在undo_log表中,但是因为已经回滚完毕,undo_log表中的日志数据会被删除,下面演示undo_log表中的日志数据:
将rollback_info存储的内容复制出来,到JSON在线解析格式化验证 - JSON.cn这个网站格式化:
如果你使用的是Navicat,右键这个字段内容,另存为json再格式化
这段json日志就是分支回滚日志,最外层是大的事务,大事务里面有2个小的本地事务,每一个小的本地事务都称为分支事务。
beforeImage:sql执行之前,没有数据
afterImage:sql执行之后,有数据
Seata会根据这个日志进行提交和回滚。
六、@GlobalLock
(一)数据库脏写
@Service
public class UserServiceImpl {
@Autowired
private UserMapper userMapper;
@Autowired
private ProviderService providerService;
// @Transactional
// 开启全局事务
@GlobalTransactional(rollbackFor = RuntimeException.class)
public void saveUser(User user) {
// userMapper.insert(user); // 操作第一个数据库
userMapper.updateById(user);
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
}
// 发起远程调用,操作第二个数据库
providerService.createUser(user);
int i = 1 / 0;
}
public void updateById(User user) {
userMapper.updateById(user); // 更新第一个数据库
}
}
@RestController
public class SeataController {
@Autowired
private UserServiceImpl userService;
@GetMapping("/saveUser")
public String save(User user) {
userService.saveUser(user);
return "success-----------";
}
@GetMapping("/updateUser")
public String update(User user) {
userService.updateById(user);
return "success-----------";
}
}
将seata-user数据库的user表添加数据,seata-user2数据库表中没有数据
重启consumer-service先访问/saveUser修改表中的值,数据修改成功
http://localhost:8888/saveUser?id=100&name=小乔
在20s之内,访问/updateUser,数据修改成功
http://localhost:8888/updateUser?id=100&name=周瑜
此时,后台会一直尝试回滚,并且一直提示回滚失败
将表中的数据手动改回小乔才能回滚成功,这种情况只能手动修改:
回滚完成后,数据会回到最开始的张三:
(二)GlobalLock是如何预防数据库脏写的
GlobalLock:全局锁,必须配合 @Transactional 和 update 语句,才能使用
作用:提交数据前,先找 seata 获取全局锁,如果加锁失败就报异常,比如:
@Service
public class UserServiceImpl {
@Autowired
private UserMapper userMapper;
@Autowired
private ProviderService providerService;
// 开启全局事务
@GlobalTransactional(rollbackFor = RuntimeException.class)
public void saveUser(User user) {
// userMapper.insert(user); // 操作第一个数据库
userMapper.updateById(user);
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
}
// 发起远程调用,操作第二个数据库
providerService.createUser(user);
int i = 1 / 0;
}
@GlobalLock
@Transactional
public void updateById(User user) {
userMapper.updateById(user); // 更新第一个数据库
}
}
浏览器分别访问:
http://localhost:8888/saveUser?id=100&name=小乔http://localhost:8888/updateUser?id=100&name=周瑜http://localhost:8888/saveUser?id=100&name=小乔
控制台报错:获取锁失败,脏数据未写入
这里获取锁报错是因为,saveUser方法上,使用了@GlobalTransactional 注解,该注解会自动在seata上添加一条全局锁信息,比如:
这些数据过一段时间会自动清除。
执行updateById方法时,因为有@GlobalLock注解,所以也会向seata发送请求,给id=100的数据加全局锁,但是全局锁已经存在了,所以该条数据不会被修改。
七、面试重点:Seata分布式事务流程
流程图:
Seata中有3个角色:
- TC(Transaction Coordinator)——事务协调者:驱动全局事务提交或回滚,其实就是 Seata服务端。
- TM(Transaction Manager)——事务管理器:定义全局事务的范围、开始全局事务、提交或回滚全局事务,其实就是Seata客户端。
- RM(Resource Manager)——资源管理器:管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
TM 和 RM 都在应用程序中
seata 分布式事务分为两个阶段,以下面的 sql 为例:
update user set name = '张三' where name = '小乔';
1.一阶段
- 通过GlobalTransactional注册全局事务,会得到一个XID(全局事务ID)
- 解析SQL:得到SQL的类型(UPDATE),表名(user),条件(name = '哈哈')等相关的信息(元数据)。
- 保存原镜像beforeImage:根据解析得到的条件信息,生成查询语句,定位数据,比如:
select id, name from user where name = '哈哈';
- 执行业务SQL:更新这条记录的name为 '亚瑟'。
- 保存新镜像afterImage:根据前镜像的结果,通过主键定位数据,比如:
select id, name, since from product where id = 1;
- 插入回滚日志:把新老镜像数据以及业务SQL相关的信息组成一条回滚日志记录,插入到UNDO_LOG表中。
- 本地事务提交之前,向TC注册分支事务:申请 user 表中,主键值等于1的记录的全局锁 。
- 本地事务提交:业务数据的更新和前面步骤中生成的UNDO_LOG一并提交。
- 将本地事务提交的结果上报给TC
- 调用,执行另一个事务,并传递XID(放到 Request Header 中,TX_XID)
2.二阶段-回滚
- 收到TC的分支回滚请求,开启一个本地事务,执行如下操作:
- 通过XID和Branch ID(分支事务ID)查找到相应的UNDO_LOG记录。
- 数据校验:拿UNDO_LOG 中的afterImage与当前数据进行比较,如果有不同,说明数据被当前全局事务之外的动作做了修改。
- 根据UNDO_LOG中的beforeImage和业务SQL的相关信息生成并执行回滚的语句:
- 提交本地事务。并把本地事务的执行结果(即分支事务回滚的结果)上报给TC。
3.二阶段-提交
- 收到TC的分支提交请求,把请求放入一个异步任务的队列中,马上返回提交成功的结果给 TC。
- 异步任务阶段的分支提交请求将异步和批量地删除相应UNDO_LOG记录。