文章目录
- 🔊博主介绍
- 🥤本文内容
- 分布式事务介绍
- 分布式事务解决方案
- 1. 2PC(Two Phase Commit)方案
- 2. JTA/XA规范实现
- 3. Seata AT模式实现
- 4. TCC实现
- 使用hmily实现TCC
- Spring Cloud Alibaba项目中整合Seata来实现分布式事务管理
- 1. **启动Seata Server**:
- 2. **整合Seata到Spring Cloud微服务**:
- 分库分表策略
- 分库分表后的分布式事务处理方案
- shardingsphere的XA实现
- shardingsphere的AT实现
- 📢文章总结
- 📥博主目标
🔊博主介绍
🌟我是廖志伟,一名Java开发工程师、Java领域优质创作者、CSDN博客专家、51CTO专家博主、阿里云专家博主、清华大学出版社签约作者、产品软文专业写手、技术文章评审老师、问卷调查设计师、个人社区创始人、开源项目贡献者。🌎跑过十五公里、🚀徒步爬过衡山、🔥有过三个月减肥20斤的经历、是个喜欢躺平的狠人。
📕拥有多年一线研发和团队管理经验,研究过主流框架的底层源码(Spring、SpringBoot、Spring MVC、SpringCould、Mybatis、Dubbo、Zookeeper),消息中间件底层架构原理(RabbitMQ、RockerMQ、Kafka)、Redis缓存、MySQL关系型数据库、 ElasticSearch全文搜索、MongoDB非关系型数据库、Apache ShardingSphere分库分表读写分离、设计模式、领域驱动DDD、Kubernetes容器编排等。
📙有过从0到1的项目高并发项目开发与管理经验,对JVM调优、MySQL调优、Redis调优 、ElasticSearch调优、消息中间件调优、系统架构调优都有着比较全面的实战经验。
📘有过云端搭建服务器环境,自动化部署CI/CD,弹性伸缩扩容服务器(最高200台),了解过秒级部署(阿里云的ACK和华为云的云容器引擎CCE)流程,能独立开发和部署整个后端服务,有过分库分表的实战经验。
🎥经过多年在CSDN创作上千篇文章的经验积累,我已经拥有了不错的写作技巧,与清华大学出版社签下了四本书籍的合约,并将陆续在明年出版。这些书籍包括了基础篇、进阶篇、架构篇的📌《Java项目实战—深入理解大型互联网企业通用技术》📌,以及📚《解密程序员的思维密码–沟通、演讲、思考的实践》📚。具体出版计划会根据实际情况进行调整,希望各位读者朋友能够多多支持!
文章目录
- 🔊博主介绍
- 🥤本文内容
- 分布式事务介绍
- 分布式事务解决方案
- 1. 2PC(Two Phase Commit)方案
- 2. JTA/XA规范实现
- 3. Seata AT模式实现
- 4. TCC实现
- 使用hmily实现TCC
- Spring Cloud Alibaba项目中整合Seata来实现分布式事务管理
- 1. **启动Seata Server**:
- 2. **整合Seata到Spring Cloud微服务**:
- 分库分表策略
- 分库分表后的分布式事务处理方案
- shardingsphere的XA实现
- shardingsphere的AT实现
- 📢文章总结
- 📥博主目标
🌾阅读前,快速浏览目录和章节概览可帮助了解文章结构、内容和作者的重点。了解自己希望从中获得什么样的知识或经验是非常重要的。建议在阅读时做笔记、思考问题、自我提问,以加深理解和吸收知识。
💡在这个美好的时刻,本人不再啰嗦废话,现在毫不拖延地进入文章所要讨论的主题。接下来,我将为大家呈现正文内容。
🥤本文内容
分布式事务介绍
首先,事务被定义为访问和可能更新数据库中数据项的一个程序执行单元,它必须具备原子性、一致性、隔离性和持久性这四个关键特性,通常合称为ACID特性。
原子性意味着事务内的所有操作作为一个不可分割的整体执行,要么全部完成,要么全部不执行。
一致性确保事务使得数据库从一个有效状态转换到另一个有效状态,中间状态对外部不可见。
隔离性确保并发执行的事务间互不影响,每笔事务好像在单独执行一样,隔离性可以通过不同的隔离级别来实现,如读未提交、读已提交、可重复读和串行化。
持久性则保证一旦事务成功提交,其对数据库的更改将会永久保存,不受后续操作或系统故障的影响。
本地事务是指在一个单一数据库中执行的事务,数据库系统直接支持其ACID特性。在Java编程中,可以通过java.sql.Connection
对象来控制事务的开启、提交和回滚。
随着互联网行业的快速发展,企业往往采用数据库拆分和服务化(SOA)策略,这就引出了分布式事务的概念。分布式事务涉及多个数据库资源服务器上的操作,要求这些服务器上的数据变更要么全部成功,要么全部失败,以保持整体数据一致性。
典型的分布式事务场景包括:
- 跨库事务:一个业务功能可能需要在多个数据库之间进行操作。
- 分库分表:为应对大数据量,数据库常常会被水平拆分,这时开发者需要借助数据库中间件来处理复杂的SQL操作,并保证分布式环境下事务的正确执行。
- 服务化:在微服务架构中,一个业务流程可能跨越多个独立的服务,每个服务各自操作数据库,这些跨服务的数据库操作必须保证事务的一致性。
分布式事务的实现面临诸多挑战,比如如何在多个数据库间维持ACID特性和性能优化。针对这些问题,X/Open组织制定了DTP(分布式事务处理)模型和XA规范。DTP模型包含五个核心组件:应用程序(AP)、资源管理器(RM)、事务管理器(TM)、通信资源管理器(CRM)和通信协议(CP)。XA规范则定义了RM和TM之间的交互接口,并优化了两阶段提交协议(2PC)。
2PC在分布式事务中用来确保多个RM的协同提交或回滚,但存在同步阻塞、单点故障和数据不一致的风险。为了解决这些问题,提出了三阶段提交协议(3PC),增加了超时机制和更细致的准备阶段(CanCommit、PreCommit、DoCommit),从而减少了阻塞时间,增强了容错能力。
不过,无论是2PC还是3PC,都不能完全避免分布式环境下的数据一致性问题。例如,单点故障可能导致参与者阻塞,而网络问题可能会造成部分参与者未能接收到协调者的指令,进而引发数据不一致。尽管3PC相比2PC有所改进,但依然存在一定的风险。
分布式事务解决方案
分布式事务解决方案的几种常见实践,主要包括2PC(两阶段提交)协议、JTA/XA规范的实现、Seata AT模式以及RocketMQ事务消息的设计与实现,并探讨了它们的优缺点以及适用场景。
1. 2PC(Two Phase Commit)方案
-
2PC是一种经典的分布式事务解决方案,它将事务提交过程分为两个阶段:准备阶段和提交阶段。在准备阶段,事务管理器(TM)通知所有资源管理器(RM,如数据库服务器)准备提交事务分支,RM根据自身状况决定是否可以提交,并给予响应。如果所有RM均成功准备,TM将在第二阶段通知所有RM进行提交;否则,指示所有RM回滚事务。
-
2PC存在的问题包括:
- 同步阻塞问题:所有参与者在第二阶段需等待协调者TM的指令,降低了并发性能,尤其在高并发场景下,可能出现严重的性能瓶颈。
- 单点故障:协调者TM的失效会导致所有参与者RM阻塞,无法推进事务。
- 数据不一致:在网络故障或TM故障的情况下,可能会造成部分参与者收到提交指令而部分参与者未收到,导致数据不一致。
举个例子:
创建一个XA规范的项目引入依赖:
<!-- mysql jdbc 实现了XA规范-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.39</version>
</dependency>
创建一个Java类
import com.mysql.jdbc.jdbc2.optional.MysqlXAConnection; //导入MysqlXAConnection类
import com.mysql.jdbc.jdbc2.optional.MysqlXid; //导入MysqlXid类
import javax.sql.XAConnection; //导入XAConnection接口
import javax.transaction.xa.XAException; //导入XAException异常类
import javax.transaction.xa.XAResource; //导入XAResource接口
import javax.transaction.xa.Xid; //导入Xid接口
import java.sql.Connection; //导入Connection类
import java.sql.DriverManager; //导入DriverManager类
import java.sql.PreparedStatement; //导入PreparedStatement类
import java.sql.ResultSet; //导入ResultSet类
import java.sql.SQLException; //导入SQLException异常类
public class MysqlXADemo {
public static void main(String[] args) throws SQLException {
//true表示打印XA语句,,用于调试
boolean logXaCommands = true;
// 获得资源管理器操作接口实例 RM1
Connection conn1 = DriverManager.getConnection
("jdbc:mysql://localhost:3306/db_user", "root", "root"); //创建数据库连接conn1
XAConnection xaConn1 = new MysqlXAConnection(
(com.mysql.jdbc.Connection) conn1, logXaCommands); //创建MysqlXAConnection连接
XAResource rm1 = xaConn1.getXAResource(); //获取xaConn1的XAResource实例
// 获得资源管理器操作接口实例 RM2
Connection conn2 = DriverManager.getConnection
("jdbc:mysql://localhost:3306/db_account", "root", "root"); //创建数据库连接conn2
XAConnection xaConn2 = new MysqlXAConnection(
(com.mysql.jdbc.Connection) conn2, logXaCommands); //创建MysqlXAConnection连接
XAResource rm2 = xaConn2.getXAResource(); //获取xaConn2的XAResource实例
// AP请求TM执行一个分布式事务,TM生成全局事务id
byte[] gtrid = "g12345".getBytes(); //定义全局事务id
int formatId = 1; //定义事务id的格式
try {
// ==============分别执行RM1和RM2上的事务分支====================
// TM生成rm1上的事务分支id
byte[] bqual1 = "b00001".getBytes(); //定义rm1事务分支id
Xid xid1 = new MysqlXid(gtrid, bqual1, formatId); //创建MysqlXid对象
// 执行rm1上的事务分支
rm1.start(xid1, XAResource.TMNOFLAGS);//One of TMNOFLAGS, TMJOIN, or TMRESUME.
PreparedStatement ps1 = conn1.prepareStatement(
"INSERT into user(name) VALUES ('Fox')"); //创建PreparedStatement对象
ps1.execute(); //执行SQL语句
rm1.end(xid1, XAResource.TMSUCCESS); //结束rm1事务分支
// TM生成rm2上的事务分支id
byte[] bqual2 = "b00002".getBytes(); //定义rm2事务分支id
Xid xid2 = new MysqlXid(gtrid, bqual2, formatId); //创建MysqlXid对象
// 执行rm2上的事务分支
rm2.start(xid2, XAResource.TMNOFLAGS); //开始rm2事务分支
PreparedStatement ps2 = conn2.prepareStatement(
"INSERT into account(user_id,money) VALUES (1,10000000)"); //创建PreparedStatement对象
ps2.execute(); //执行SQL语句
rm2.end(xid2, XAResource.TMSUCCESS); //结束rm2事务分支
// ===================两阶段提交================================
// phase1:询问所有的RM 准备提交事务分支(预提交)
int rm1_prepare = rm1.prepare(xid1); //询问rm1是否准备提交事务分支
int rm2_prepare = rm2.prepare(xid2); //询问rm2是否准备提交事务分支
// phase2:提交所有事务分支
boolean onePhase = false; //定义是否为一阶段提交
//TM判断有2个事务分支,所以不能优化为一阶段提交
if (rm1_prepare == XAResource.XA_OK
&& rm2_prepare == XAResource.XA_OK) {
//所有事务分支都prepare成功,提交所有事务分支
rm1.commit(xid1, onePhase); //提交rm1事务分支
rm2.commit(xid2, onePhase); //提交rm2事务分支
} else {
//如果有事务分支没有成功,则回滚
rm1.rollback(xid1); //回滚rm1事务分支
rm2.rollback(xid2); //回滚rm2事务分支
}
} catch (XAException e) {
// 如果出现异常,也要进行回滚
e.printStackTrace(); //打印异常信息
}
}
}
2. JTA/XA规范实现
- JTA(Java Transaction API)和XA规范是Java领域用于处理分布式事务的标准,提供了跨多个数据库或其他事务资源的事务管理能力。Atomikos等开源框架实现了JTA规范,其中TransactionEssentials是免费的开源产品,ExtremeTransactions则是商业版本,它们都提供了事务管理器、用户事务接口等相关实现,并且对实现了XADataSource接口的数据库连接池进行封装,以支持分布式事务。
举个例子:
JTA是对XA规范的Java封装,所以需要引入相关依赖:
<!--JTA规范扩展包 -->
<dependency>
<groupId>javax.transaction</groupId>
<artifactId>jta</artifactId>
<version>1.1</version>
</dependency>
<!-- atomikos JTA/XA全局事务 -->
<dependency>
<groupId>com.atomikos</groupId>
<artifactId>transactions-jdbc</artifactId>
<version>4.0.6</version>
</dependency>
创建一个Java类:
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import javax.transaction.SystemException;
import javax.transaction.UserTransaction;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;
public class AtomikosDemo {
// 创建 AtomikosDataSourceBean
private static AtomikosDataSourceBean createAtomikosDataSourceBean(String dbName) {
// 连接池的基本属性
Properties p = new Properties();
p.setProperty("url", "jdbc:mysql://localhost:3306/" + dbName);
p.setProperty("user", "root");
p.setProperty("password", "root");
// 使用 AtomikosDataSourceBean 封装 com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
// 设置唯一的资源名称
ds.setUniqueResourceName(dbName);
ds.setXaDataSourceClassName("com.mysql.jdbc.jdbc2.optional.MysqlXADataSource");
ds.setXaProperties(p);
return ds;
}
public static void main(String[] args) {
// 创建两个 AtomikosDataSourceBean 对象,分别表示两个数据库
AtomikosDataSourceBean ds1 = createAtomikosDataSourceBean("db_user"); // 创建数据库1的AtomikosDataSourceBean对象
AtomikosDataSourceBean ds2 = createAtomikosDataSourceBean("db_account"); // 创建数据库2的AtomikosDataSourceBean对象
Connection conn1 = null;
Connection conn2 = null;
PreparedStatement ps1 = null;
PreparedStatement ps2 = null;
UserTransaction userTransaction = new UserTransactionImp();
try {
// 开始事务
userTransaction.begin();
// 在 db1 上执行 SQL
conn1 = ds1.getConnection(); // 获取数据库1的连接
ps1 = conn1.prepareStatement("INSERT into user(name) VALUES (?)", Statement.RETURN_GENERATED_KEYS); // 创建预编译的SQL语句
ps1.setString(1, "Fox"); // 设置参数
ps1.executeUpdate(); // 执行更新操作
ResultSet generatedKeys = ps1.getGeneratedKeys(); // 获取自动生成的键
int userId = -1;
while (generatedKeys.next()) {
// 获取自动生成的userId
userId = generatedKeys.getInt(1);
}
// 模拟异常,直接进入 catch 代码块,两个数据库都不会提交
// int i = 1 / 0;
// 在 db2 上执行 SQL
conn2 = ds2.getConnection(); // 获取数据库2的连接
ps2 = conn2.prepareStatement("INSERT into account(user_id, money) VALUES (?, ?)"); // 创建预编译的SQL语句
ps2.setInt(1, userId); // 设置参数
ps2.setDouble(2, 10000000); // 设置参数
ps2.executeUpdate(); // 执行更新操作
// 两阶段提交
userTransaction.commit(); // 提交事务
} catch (Exception e) {
try {
e.printStackTrace();
userTransaction.rollback(); // 回滚事务
} catch (SystemException e1) {
e1.printStackTrace();
}
} finally {
try {
ps1.close();
ps2.close();
conn1.close();
conn2.close();
ds1.close();
ds2.close();
} catch (Exception ignore) {
}
}
}
}
要么全部成功要么全部失败
3. Seata AT模式实现
-
Seata是一个开源的分布式事务框架,其AT模式改进了传统的2PC,实现了对业务代码的无侵入性。AT模式下,事务管理器(TC)与资源管理器(RM)协作,通过生成全局事务ID(XID)将多个微服务的本地事务绑定为一个全局事务,并通过维护回滚日志的方式来实现在分布式环境下的事务原子性和一致性。
-
Seata的优点包括:
- 应用层基于SQL解析自动补偿,减少业务侵入;
- 独立部署的事务协调器TC负责事务注册和回滚;
- 引入全局锁来增强隔离性。
- 适合并发不高的场景,例如定时任务可以结合AT模式使用。
-
存在的问题:
- 性能损耗大,每次SQL操作涉及到多次远程通信,以及undolog的写入和删除操作;
- 全局锁的引入虽然提高了隔离性,但也可能导致热点数据的并发性能下降,增加死锁风险;
- 回滚锁释放时间较长,尤其是在事务回滚时。
一条Update的SQL,则需要全局事务xid获取(与TC通讯)、before image(解析SQL,查询一次数据库)、after image(查询一次数据库)、insert undo log(写一次数据库)、before commit(与TC通讯,判断锁冲突),这些操作都需要一次远程通讯RPC,而且是同步的。另外undo log写入时blob字段的插入性能也是不高的。每条写SQL都会增加这么多开销,粗略估计会增加5倍响应时间。
- RocketMQ事务消息实现
- RocketMQ通过其事务消息功能解决了Producer端消息发送与本地事务执行的原子性问题。Producer发送事务消息后,若本地事务成功,向MQServer发送commit消息,否则发送rollback消息。MQServer根据Producer端的事务执行结果决定是否投递消息给消费者,同时引入事务回查机制来处理Producer故障或超时等情况,保证事务最终一致性。
4. TCC实现
账户A所在的服务需要实现三个方法:try、confirm、cancel方法,账户B所在的服务也需要实现三个方法:try、confirm、cancel方法。
这就存在几种情况了:
这种情况会出现,假设让账户A给账户B转30元,但是账户B先执行了try方法先添加了30,这个时候账户里面有30元了,立马进行消费,消费了5元,这时账户B只有25元了,然后账户A开始执行try方法进行判断,余额够不够30元,假设只有10元,不够,需要执行第二阶段:账户B执行cancel方法,就会发现25-30成为了-5元了,这就不合理了。
这种情况可以保证账户A转账成功之后才会去添加30,第一阶段:二个都执行try方法,可以保证账户A至少转账了,第二阶段:账户B成功收到账户A转账的钱。但是如果账户A余额只有10元不够30元,这个时候账户A和B都会执行cancel操作,账户B执行cancel是个空的,没关系,但是账户A执行了cancel方法,反而添加了30元,这就不对了,只要不满足就加钱,这时不合理的。
这种方案怎么解决上述的问题呢?就是判断一下账号A的try本地事务是否提交,在本地记录一个事务日志表,如果try方法执行了本地事务,记录一条事务记录,等到需要执行cancel方法的时候查看有没有记录,没有记录不执行,有记录就执行cancel的本地事务,这样就可以解决上述问题。
使用hmily实现TCC
添加依赖
<properties>
<!--Spring cloud -->
<spring-cloud.version>Greenwich.SR3</spring-cloud.version>
<spring-cloud-alibaba.version>2.1.1.RELEASE</spring-cloud-alibaba.version>
<hmily-springcloud.version>2.0.4-RELEASE</hmily-springcloud.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>hmily-springcloud</artifactId>
<version>${hmily-springcloud.version}</version>
</dependency>
<!--Spring Cloud 相关依赖-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--Spring Cloud Alibaba 相关依赖-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
yml中添加配置:
org:
dromara:
hmily :
serializer : kryo
recoverDelayTime : 30
retryMax : 30
scheduledDelay : 30
scheduledThreadMax : 10
repositorySupport : db
started: true
hmilyDbConfig :
driverClassName : com.mysql.jdbc.Driver
url : jdbc:mysql://localhost:3306/hmily?serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=UTF-8
username : root
password : root
创建数据库hmily就可以了,表自己创建
假设这是账户A的业务
import com.liao.bank1.common.TransactionEnum;
import com.liao.bank1.fegin.Bank2FeignClient;
import com.liao.bank1.mapper.AccountMapper;
import com.liao.bank1.service.AccountService;
import lombok.extern.slf4j.Slf4j;
import org.dromara.hmily.annotation.Hmily;
import org.dromara.hmily.core.concurrent.threadlocal.HmilyTransactionContextLocal;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
@Slf4j
public class AccountServiceImpl implements AccountService {
@Autowired
AccountMapper accountMapper;
@Autowired
Bank2FeignClient bank2FeignClient;
/**
* try方法执行逻辑:
* 1.try幂等校验
* 2.try悬挂处理
* 3.检查余额是否足够扣减
* 4.扣减金额
* @param from_accountNo 转出账户
* @param to_accountNo 转入账户
* @param amount 转账的金额
*/
@Transactional
//只要标记@Hmily就是try方法,在注解中指定confirm、cancel两个方法的名字
@Hmily(confirmMethod = "commit", cancelMethod = "rollback")
@Override
public void transfer(String from_accountNo, String to_accountNo, Double amount) {
//获取全局事务id
String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
log.info("bank1 try begin 开始执行...xid:{}", transId);
//幂等判断 判断local_transaction_log表中是否有try日志记录,如果有则不再执行
if (accountMapper.isExistTransactionLogByType(transId, TransactionEnum.TRY.getValue()) > 0) {
log.info("bank1 try 已经执行,无需重复执行,xid:{}", transId);
return;
}
//try悬挂处理,如果cancel、confirm有一个已经执行了,try不再执行
if (accountMapper.isExistTransactionLogByType(transId, TransactionEnum.CONFIRM.getValue()) > 0
|| accountMapper.isExistTransactionLogByType(transId, TransactionEnum.CANCEL.getValue()) > 0) {
log.info("bank1 try悬挂处理 cancel或confirm已经执行,不允许执行try,xid:{}", transId);
return;
}
//扣减金额
if (accountMapper.subtractAccountBalance(from_accountNo, amount) <= 0) {
//扣减失败
throw new RuntimeException("bank1 try 扣减金额失败,xid:"+transId);
}
//插入try执行记录,用于幂等判断
accountMapper.addTransactionLog(transId, TransactionEnum.TRY.getValue());
//转账,远程调用bank2
if (!bank2FeignClient.transferTo(to_accountNo,amount)) {
throw new RuntimeException("bank1 远程调用bank2微服务失败,xid:" + transId);
}
if (amount == 20) {
throw new RuntimeException("人为制造异常,xid:" + transId);
}
log.info("bank1 try end 结束执行...xid:{}", transId);
}
@Transactional
public void commit(String from_accountNo, String to_accountNo, Double amount) {
//获取全局事务id
String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
log.info("bank1 confirm begin 开始执行...xid:{},accountNo:{},amount:{}", transId, from_accountNo, amount);
}
/**
* cancel方法执行逻辑: 1.cancel幂等校验 2.cancel空回滚处理 3.增加可用余额
* @param from_accountNo
* @param to_accountNo
* @param amount
*/
@Transactional
public void rollback(String from_accountNo, String to_accountNo, Double amount) {
//获取全局事务id
String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
log.info("bank1 cancel begin 开始执行...xid:{}", transId);
// cancel幂等校验
if (accountMapper.isExistTransactionLogByType(transId, TransactionEnum.CANCEL.getValue()) > 0) {
log.info("bank1 cancel 已经执行,无需重复执行,xid:{}", transId);
return;
}
//cancel空回滚处理,如果try没有执行,cancel不允许执行
if (accountMapper.isExistTransactionLogByType(transId, TransactionEnum.TRY.getValue()) <= 0) {
log.info("bank1 空回滚处理,try没有执行,不允许cancel执行,xid:{}", transId);
return;
}
// 增加可用余额
accountMapper.addAccountBalance(from_accountNo, amount);
//插入一条cancel的执行记录
accountMapper.addTransactionLog(transId, TransactionEnum.CANCEL.getValue());
log.info("bank1 cancel end 结束执行...xid:{}", transId);
}
}
账户B的业务:
import com.liao.bank2.common.TransactionEnum;
import com.liao.bank2.mapper.AccountMapper;
import com.liao.bank2.service.AccountService;
import lombok.extern.slf4j.Slf4j;
import org.dromara.hmily.annotation.Hmily;
import org.dromara.hmily.core.concurrent.threadlocal.HmilyTransactionContextLocal;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
@Slf4j
public class AccountServiceImpl implements AccountService {
@Autowired
AccountMapper accountMapper;
@Override
@Hmily(confirmMethod="confirmMethod", cancelMethod="cancelMethod")
public void transferTo(String accountNo, Double amount) {
//获取全局事务id
String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
log.info("bank2 try begin 开始执行...xid:{}",transId);
}
/**
* confirm方法:
* 1.confirm幂等校验
* 2.正式增加金额
* @param accountNo
* @param amount
*/
@Transactional
public void confirmMethod(String accountNo, Double amount){
//获取全局事务id
String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
log.info("bank2 confirm begin 开始执行...xid:{}",transId);
if(accountMapper.isExistTransactionLogByType(transId,TransactionEnum.CONFIRM.getValue())>0){
log.info("bank2 confirm 已经执行,无需重复执行...xid:{}",transId);
return ;
}
//增加金额
accountMapper.addAccountBalance(accountNo,amount);
//增加一条confirm日志,用于幂等校验
accountMapper.addTransactionLog(transId,TransactionEnum.CONFIRM.getValue());
log.info("bank2 confirm end 结束执行...xid:{}",transId);
}
/**
* @param accountNo
* @param amount
*/
public void cancelMethod(String accountNo, Double amount){
//获取全局事务id
String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
log.info("bank2 cancel begin 开始执行...xid:{}",transId);
}
}
创建本地事务日志表
CREATE TABLE `local_transaction_log` (
`tx_no` varchar(64) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
`type` int(11) NOT NULL COMMENT '1:try,2:confirm, 3:cancel',
`create_time` datetime(0) NULL DEFAULT NULL,
PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
再举个例子,账户里100元,事务1需要扣减60元,事务2需要扣减70元,同时去操作都符合条件,余额100嘛,大于扣减的金额,这时候就有问题了,二个事务只能成功一个才可以,这时候可以将账户余额冻结一部分,只看可用的余额,同样的本地事务日志也需要添加冻结金额字段进行判断。
Spring Cloud Alibaba项目中整合Seata来实现分布式事务管理
关于如何在Spring Cloud Alibaba项目中整合Seata来实现分布式事务管理的具体步骤和要点,包括Seata Server的启动、Seata在Spring Cloud微服务架构中的集成以及柔性事务的不同解决方案。以下是你需要知道的关键步骤和概念:
1. 启动Seata Server:
-
准备环境:首先指定Nacos作为Seata Server的配置中心和注册中心,确保客户端和服务端的Nacos配置组(group)一致。
-
将Seata Server的配置同步至Nacos:获取/seata/script/config-center/config.txt,修改并同步相关配置文件,确保事务分组(service.vgroup_mapping)配置正确,与客户端配置(
spring.cloud.alibaba.seata.tx-service-group
)一致。
-
启动Seata Server:通过执行相应的启动脚本启动Seata Server服务,默认监听8091端口,并在注册中心(Nacos)中查看Seata Server是否注册成功。
2. 整合Seata到Spring Cloud微服务:
- 依赖导入:确保项目的
pom.xml
文件中包含Spring Cloud Alibaba Seata Starter及相关依赖。
添加依赖
<!-- seata-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>${seata.version}</version>
</dependency>
- 在每个微服务对应的数据库中添加undolog表
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
微服务需要使用seata DataSourceProxy代理自己的数据源
import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import javax.sql.DataSource;
/**
* 需要用到分布式事务的微服务都需要使用seata DataSourceProxy代理自己的数据源
*/
@Configuration
@MapperScan("com.liao.datasource.mapper")
public class MybatisConfig {
/**
* 从配置文件获取属性构造datasource,注意前缀,这里用的是druid,根据自己情况配置,
* 原生datasource前缀取"spring.datasource"
* @return
*/
@Bean
@ConfigurationProperties(prefix = "spring.datasource.druid")
public DataSource druidDataSource() {
DruidDataSource druidDataSource = new DruidDataSource();
return druidDataSource;
}
/**
* 构造datasource代理对象,替换原来的datasource
* @param druidDataSource
* @return
*/
@Primary
@Bean("dataSource")
public DataSourceProxy dataSourceProxy(DataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
@Bean(name = "sqlSessionFactory")
public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
//设置代理数据源
factoryBean.setDataSource(dataSourceProxy);
ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
factoryBean.setMapperLocations(resolver.getResources("classpath*:mybatis/**/*-mapper.xml"));
org.apache.ibatis.session.Configuration configuration=new org.apache.ibatis.session.Configuration();
//使用jdbc的getGeneratedKeys获取数据库自增主键值
configuration.setUseGeneratedKeys(true);
//使用列别名替换列名
configuration.setUseColumnLabel(true);
//自动使用驼峰命名属性映射字段,如userId ---> user_id
configuration.setMapUnderscoreToCamelCase(true);
factoryBean.setConfiguration(configuration);
return factoryBean.getObject();
}
}
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
@SpringBootApplication(scanBasePackages = "com.liao",exclude = DataSourceAutoConfiguration.class)
public class AccountServiceApplication {
public static void main(String[] args) {
SpringApplication.run(AccountServiceApplication.class, args);
}
}
- 配置事务分组:在
application.yml
或bootstrap.yml
中配置Seata的服务事务分组,确保与Seata Server中的配置相同。将registry.conf文件拷贝到resources目录下,指定注册中心和配置中心都是nacos。
registry.conf文件
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"
nacos {
serverAddr = "127.0.0.1:8848"
namespace = ""
cluster = "default"
group = "SEATA_GROUP"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
type = "nacos"
nacos {
serverAddr = "127.0.0.1:8848"
namespace = "54433b62-df64-40f1-9527-c907219fc17f"
group = "SEATA_GROUP"
}
}
在yml中指定事务分组(和配置中心的service.vgroup_mapping 配置一一对应)
spring:
application:
name: account-service
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
alibaba:
seata:
tx-service-group:
my_test_tx_group # seata 服务事务分组
spring cloud alibaba 2.1.4 之后支持yml中配置seata属性,可以用来替换registry.conf文件,配置支持实现在seata-spring-boot-starter.jar中,也可以引入依赖:
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.4.0</version>
</dependency>
如果使用上述依赖,那么spring-cloud-starter-alibaba-seata就需要注释。
application.yml文件
#spring cloud alibaba 2.1.4 之后支持yml中配置seata属性,可以用来替换registry.conf文件
seata:
# seata 服务分组,要与服务端nacos-config.txt中service.vgroup_mapping的后缀对应
tx-service-group: my_test_tx_group
registry:
# 指定nacos作为注册中心
type: nacos
nacos:
server-addr: 127.0.0.1:8848
namespace: ""
group: SEATA_GROUP
config:
# 指定nacos作为配置中心
type: nacos
nacos:
server-addr: 127.0.0.1:8848
namespace: "54433b62-df64-40f1-9527-c907219fc17f"
group: SEATA_GROUP
- 使用
@GlobalTransactional
注解:在微服务的事务发起者的方法上添加@GlobalTransactional
注解,以便将涉及多个微服务的业务逻辑包装在一个分布式事务中。
import com.tuling.datasource.entity.Order;
import com.tuling.datasource.entity.OrderStatus;
import com.tuling.datasource.mapper.OrderMapper;
import com.tuling.order.feign.AccountFeignService;
import com.tuling.order.feign.StorageFeignService;
import com.tuling.order.service.OrderService;
import com.tuling.order.vo.OrderVo;
import io.seata.core.context.RootContext;
import io.seata.core.exception.TransactionException;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private AccountFeignService accountFeignService;
@Autowired
private StorageFeignService storageFeignService;
@Override
//@Transactional
@GlobalTransactional(name="createOrder")
public Order saveOrder(OrderVo orderVo) {
log.info("=============用户下单=================");
log.info("当前 XID: {}", RootContext.getXID());
// 保存订单
Order order = new Order();
order.setUserId(orderVo.getUserId());
order.setCommodityCode(orderVo.getCommodityCode());
order.setCount(orderVo.getCount());
order.setMoney(orderVo.getMoney());
order.setStatus(OrderStatus.INIT.getValue());
Integer saveOrderRecord = orderMapper.insert(order);
log.info("保存订单{}", saveOrderRecord > 0 ? "成功" : "失败");
//扣减库存 fegin 怎么通知
storageFeignService.deduct(orderVo.getCommodityCode(), orderVo.getCount());
//扣减余额 服务降级 throw
Boolean debit= accountFeignService.debit(orderVo.getUserId(), orderVo.getMoney());
// if(!debit){
// // 解决 feign整合sentinel降级导致SeaTa失效的处理
// throw new RuntimeException("账户服务异常降级了");
// }
//更新订单
Integer updateOrderRecord = orderMapper.updateOrderStatus(order.getId(),OrderStatus.SUCCESS.getValue());
log.info("更新订单id:{} {}", order.getId(), updateOrderRecord > 0 ? "成功" : "失败");
return order;
}
}
具体到一个用户下单的业务场景,该场景涉及仓储服务、订单服务和账务服务,每个服务都要在Seata的管理下参与到全局事务中去。
-
Seata AT模式实现原理:
- AT模式通过解析业务SQL并在本地事务中生成Undo Log,保证事务回滚时能恢复原始数据状态。
- 成功提交时,Seata会异步删除Undo Log;事务失败时,根据XID和分支事务信息回滚相应分支。
-
Seata的优势与问题:
- 优势:非侵入式的事务管理,无需大幅修改业务代码;独立部署的事务协调器TC增强了系统的扩展性;全局锁机制提升了事务的隔离性。
- 问题:性能损耗较大,包括多个远程通信和数据库操作;全局锁可能带来并发性能下降,特别是对热点数据;回滚时,解锁过程较慢;可能发生死锁,尽管有重试机制,但仍会影响性能。
总结起来,要在Spring Cloud Alibaba项目中使用Seata,你需要配置Seata Server、正确地在微服务中集成Seata客户端,然后在需要分布式事务的业务逻辑中使用@GlobalTransactional
注解。同时,还需注意Seata在提高分布式事务一致性的同时,可能带来的性能挑战和并发控制问题。
分库分表策略
如何设计并实施分库分表策略以应对大规模数据库的压力,并结合Spring Cloud Alibaba生态中的Seata组件处理分布式事务。以下是内容的核心梳理:
-
分库分表背景与必要性:
- 当单表数据量过大(如超过500万条记录或达到2GB大小),或者查询性能显著下降时,应考虑采用分库分表策略以分散存储压力、提高查询效率和并发处理能力。
-
分库分表策略设计:
- 文档以电商系统中的订单表为例,介绍了分库分表的基本策略:
- 分库策略:将订单表和订单详情表从业务库中分离出来,放在两个独立的数据库中,以减轻网络压力。
- 分表策略:每个数据库中进一步划分两个表,通过某种分片规则(如取模或范围分片)均衡地将数据分布到总共四个表中。
- 文档以电商系统中的订单表为例,介绍了分库分表的基本策略:
-
Seata整合与分库分表实践:
- Seata通过配置文件或YAML方式指定分库分表规则,例如:
oms_order
表和oms_order_item
表各自按照不同的字段(如id
或order_id
)进行分片,确定数据库和表的位置。- 使用
inline
分片策略,设定具体的分片算法表达式。 - 设置数据源和事务管理相关的配置,包括数据源的实际节点列表、数据库策略、表策略以及主键生成策略(如Snowflake算法)。
- 关联事务的表需在配置中明确指定为绑定表以处理分布式事务。
- Seata通过配置文件或YAML方式指定分库分表规则,例如:
-
分布式事务处理:
- 强调了在分库分表场景下,分布式事务的处理至关重要,特别是要解决事务的一致性和数据完整性问题。
- Seata AT模式在此场景下能够提供事务上下文管理和分布式事务的自动补偿机制,确保多个微服务间的操作要么全部成功,要么全部回滚。
-
主键生成策略:
- 强调了主键生成策略在分库分表中尤为重要,Seata支持Snowflake算法,同时也提示开发者可以根据业务需求自定义主键生成策略。
-
常见问题与注意事项:
- 介绍了二阶段提交协议(2PC)在分库分表场景下可能出现的问题,如同步阻塞、单点故障、数据不一致性等。
- 强调了在分表策略设计中要注意避免Groovy脚本在计算时产生浮点数,推荐使用
intdiv
方法代替除法运算。
分库分表后的分布式事务处理方案
在实施分库分表后的分布式事务处理方案,包括主键生成策略的设计、分片策略的定制、ShardingSphere框架的扩展点,以及如何在分库分表场景下实现XA事务和BASE柔性事务。以下是文档核心内容的解读:
-
主键生成策略
- 在分布式环境中,设计主键生成策略时要考虑全局唯一性、高性能、高可用和趋势递增等特点。
- 数据库自增主键在分布式环境下可能导致冲突,可以通过集群模式(不同节点设置不同的起始值和步长)解决,但不利于扩展和高并发。
- ShardingSphere提供两种内置的分布式主键生成策略:UUID(简单、无需网络、全局唯一,但字符串形式无序、长度较长,对MySQL性能有较大影响)和SnowFlake(趋势递增、全局唯一、无网络开销,但依赖于机器时钟,时钟回拨可能导致冲突)。
- 可以基于Redis扩展自定义的分布式主键生成器,例如OrderByRedisKeyGenerator。
-
分片策略定制
- 示例中展示了一种按年份分片后再按模数取余平均分配数据的分片策略。在分库分表配置中,通过
OrderPreciseShardingAlgorithm
实现类,按照订单表oms_order
的ID逻辑进行分片,实际表名包含年份和按特定算法计算出的分区号。
- 示例中展示了一种按年份分片后再按模数取余平均分配数据的分片策略。在分库分表配置中,通过
-
ShardingSphere扩展点
- ShardingSphere通过SPI机制提供了丰富的扩展点,允许用户根据实际需求定制分片策略,例如Inline、Mod、HashMod、FixedInterval、MutableInterval等多种分片算法。
-
分布式事务处理方式
- 在分库分表场景下,通常选择两阶段提交(XA事务)或BASE柔性事务来处理分布式事务。
- XA事务遵循严格的ACID特性,通过两阶段提交协议来保证一致性,但存在同步阻塞、单点故障和数据不一致等问题。示例代码中展示了如何在MySQL中使用XA事务并通过Java代码进行事务协调。
- BASE柔性事务(如Seata AT模式)追求最终一致性,牺牲强一致性以换取更好的性能和可用性。Seata通过AT模式简化了XA的资源锁定要求,降低了性能损失。
-
分库分表场景下的分布式事务实战
- 在分库分表后,涉及多张表的事务处理变得更加复杂,例如在订单服务中,插入订单表和订单详情表必须保证原子性。
- ShardingSphere通过集成Seata或其他分布式事务框架,支持XA事务和Seata AT事务。开发者可通过SPI扩展点自定义事务管理器,并在应用中配置相关属性和注解以启用分布式事务支持。
-
问题与挑战
- 分布式事务处理面临的主要挑战包括:如何在分库分表后保持事务一致性、如何在高并发场景下维持高效性能、如何处理数据迁移与扩展、如何平衡读写隔离和资源锁定带来的性能影响。
shardingsphere的XA实现
引入依赖:
<!-- 使用XA事务时,需要引入此模块 -->
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>sharding-transaction-xa-core</artifactId>
<version>${shardingsphere.version}</version>
</dependency>
在MyBatis中打开@EnableTransactionManagement注解,然后注入PlatformTransactionManager对象。在业务方法上打开@Transactional注解和@ShardingTransactionType(TransactionType.XA) 注解。
shardingsphere的AT实现
首先需要提前部署好Seata,采用nacos配置中心,把Seata的一大堆配置项都提前导入到nacos中。使用时,需要引入依赖以下依赖,并且把XA事务的依赖去掉.
<!-- 使用BASE事务时,需要引入此模块 -->
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>sharding-transaction-base-seata-at</artifactId>
<version>${sharding-sphere.version}</version>
</dependency>
在resource下添加seata.conf文件。文件内容:
client {
application.id = example ## 应用唯一id
transaction.service.group = my_test_tx_group ## 所属事务组
}
在每个数据分片建立undo_log表
CREATE TABLE IF NOT EXISTS `undo_log`
(
`id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT 'increment id',
`branch_id` BIGINT(20) NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(100) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as
serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME NOT NULL COMMENT 'modify datetime',
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';
在MyBatis中打开@EnableTransactionManagement注解。然后注入PlatformTransactionManager对象。然后在业务方法上打开@Transactional注解和 @ShardingTransactionType(TransactionType.BASE) 注解。
使用上就是一个注解就搞定,比较麻烦的是配置那块的内容。
Sharding本身支持本地事务。
📢文章总结
对本篇文章进行总结:
🔔以上就是今天要讲的内容,阅读结束后,反思和总结所学内容,并尝试应用到现实中,有助于深化理解和应用知识。与朋友或同事分享所读内容,讨论细节并获得反馈,也有助于加深对知识的理解和吸收。
🔔如果您需要转载或者搬运这篇文章的话,非常欢迎您私信我哦~
🚀🎉希望各位读者大大多多支持用心写文章的博主,现在时代变了,🚀🎉 信息爆炸,酒香也怕巷子深🔥,博主真的需要大家的帮助才能在这片海洋中继续发光发热🎨,所以,🏃💨赶紧动动你的小手,点波关注❤️,点波赞👍,点波收藏⭐,甚至点波评论✍️,都是对博主最好的支持和鼓励!
- 💂 博客主页: 我是廖志伟
- 👉开源项目:java_wxid
- 🌥 哔哩哔哩:我是廖志伟
- 🎏个人社区:幕后大佬
- 🔖个人微信号:
SeniorRD
- 🎉微信号二维码:
📥博主目标
- 🍋程序开发这条路不能停,停下来容易被淘汰掉,吃不了自律的苦,就要受平庸的罪,持续的能力才能带来持续的自信。我本是一个很普通的程序员,放在人堆里,除了与生俱来的盛世美颜,就剩180的大高个了,就是我这样的一个人,默默写博文也有好多年了。
- 📺有句老话说的好,牛逼之前都是傻逼式的坚持,希望自己可以通过大量的作品、时间的积累、个人魅力、运气、时机,可以打造属于自己的技术影响力。
- 💥内心起伏不定,我时而激动,时而沉思。我希望自己能成为一个综合性人才,具备技术、业务和管理方面的精湛技能。我想成为产品架构路线的总设计师,团队的指挥者,技术团队的中流砥柱,企业战略和资本规划的实战专家。
- 🎉这个目标的实现需要不懈的努力和持续的成长,但我必须努力追求。因为我知道,只有成为这样的人才,我才能在职业生涯中不断前进并为企业的发展带来真正的价值。在这个不断变化的时代,我们必须随时准备好迎接挑战,不断学习和探索新的领域,才能不断地向前推进。我坚信,只要我不断努力,我一定会达到自己的目标。
🔔有需要对自己进行综合性评估,进行职业方向规划,我可以让技术大牛帮你模拟面试、针对性的指导、传授面试技巧、简历优化、进行技术问题答疑等服务。
可访问:https://java_wxid.gitee.io/tojson/