Springboot+mybatis-plus+dynamic-datasource+Druid 多数据源事务,分布式事务
文章目录
- Springboot+mybatis-plus+dynamic-datasource+Druid 多数据源事务,分布式事务
- 0.前言
- 1. 基础介绍
- ConnectionFactory
- AbstractRoutingDataSource 动态路由数据源的抽象类
- DynamicLocalTransactionInterceptor 动态的本地事务拦截器
- 3. 使用步骤示例
- 4. 官方源码分析
- 5. 参考资料
0.前言
背景
处理多数据源事务一直是一个复杂而棘手的问题,通常我们有两种主流的解决方法。
第一种是通过Atomikos手动创建多数据源事务,这种方法更适合数据源数量较少,参数配置不复杂,对性能要求不高的项目。然而,这种方法的最大困难在于需要手动配置大量设置,这可能会消耗大量时间。
第二种是通过使用Seata等分布式事务解决方案。这种方法的难点在于需要建立并维护像Seata-server这样的统一管理中心。
今天我们使用Springboot+mybatis-plus+dynamic-datasource+Druid 多数据源 实现分布式事务和本地多数据源事务。
每种解决方案都有其适用的场景,然而在实际操作中,我经常接到如下的问题:
“我为什么在添加了事务注解之后,数据源切换还是失败了?”
“我了解到这涉及到分布式事务,但我并不想使用Seata。我的场景比较简单,有没有不需要依赖第三方的解决方案?”
这些问题突显出在现实工作中,我们可能需要更灵活、更简便的解决方案来处理多数据源事务问题。
1. 基础介绍
自从3.3.0
开始,由seata
的核心贡献者https://github.com/a364176773 贡献了基于connection
代理的方案。
完整代码 https://github.com/baomidou/dynamic-datasource-spring-boot-starter/commit/f0cbad193528296eeb64faa76c79743afbdd811d
建议从3.4.0
版本开始使用,其修复了一个功能,老版本不加@DS
只加@DSTransactional
会报错。
核心的几处代码
@Role(value = BeanDefinition.ROLE_INFRASTRUCTURE)
@ConditionalOnProperty(prefix = DynamicDataSourceProperties.PREFIX, name = "seata", havingValue = "false",
matchIfMissing = true)
@Bean
public Advisor localTransactionAdvisor() {
AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut();
pointcut.setExpression("@annotation(com.baomidou.dynamic.datasource.annotation.DSTransactional)");
return new DefaultPointcutAdvisor(pointcut, new DynamicTransactionAdvisor());
}
我们可以看到通过spring.datasource.dynamic.seata=true
配置来启用条件注解。这个是dynamic-datasource
支持seata事务的开发和入口。
ConnectionFactory
ConnectionFactory
是一个工厂类,主要的作用是管理数据库连接,并提供获取和存储数据库连接的功能。
-
存储每个线程独立的数据库连接:
ConnectionFactory
使用ThreadLocal
为每个线程提供其自己的数据库连接池,这样可以防止在多线程环境中数据库连接的混乱。 -
提供获取数据库连接的方法:
ConnectionFactory
提供getConnection
方法,使得在同一个线程中的多个模块可以共享同一个数据库连接。 -
提供存储数据库连接的方法:
ConnectionFactory
提供putConnection
方法,可以存储新的数据库连接到当前线程的数据库连接池中。 -
提供通知数据库连接的方法:
ConnectionFactory
提供notify
方法,可以对当前线程的所有数据库连接进行统一的操作,比如提交或者回滚事务。
通过这些功能,ConnectionFactory
实现了数据库连接的有效管理,保证了在同一线程中对多个数据库进行操作时,可以共享同一连接,实现事务管理。核心代码如下。大家可以借鉴
package com.baomidou.dynamic.datasource.tx;
import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author funkye
*/
public class ConnectionFactory {
// 使用ThreadLocal来保存与当前线程相关的数据库连接信息,以Map形式存储,Map中的key为数据源名称,value为对应的数据库连接代理类
private static final ThreadLocal<Map<String, ConnectionProxy>> CONNECTION_HOLDER =
new ThreadLocal<Map<String, ConnectionProxy>>() {
@Override
protected Map<String, ConnectionProxy> initialValue() {
return new ConcurrentHashMap<>(8);
}
};
// 存储数据库连接到当前线程的连接池中,如果当前线程的连接池中没有该数据源的连接,则新建一个并放入
public static void putConnection(String ds, ConnectionProxy connection) {
Map<String, ConnectionProxy> concurrentHashMap = CONNECTION_HOLDER.get();
if (!concurrentHashMap.containsKey(ds)) {
try {
connection.setAutoCommit(false);
} catch (SQLException e) {
e.printStackTrace();
}
concurrentHashMap.put(ds, connection);
}
}
// 从当前线程的连接池中获取指定数据源的数据库连接
public static ConnectionProxy getConnection(String ds) {
return CONNECTION_HOLDER.get().get(ds);
}
// 对当前线程的所有数据库连接执行通知操作,根据参数state决定是提交还是回滚,如果在执行过程中发生错误,则在所有连接处理完后抛出
public static void notify(Boolean state) throws Exception {
Exception exception = null;
try {
Map<String, ConnectionProxy> concurrentHashMap = CONNECTION_HOLDER.get();
for (ConnectionProxy connectionProxy : concurrentHashMap.values()) {
try {
connectionProxy.notify(state);
} catch (SQLException e) {
exception = e;
}
}
} finally {
CONNECTION_HOLDER.remove(); //清除当前线程的连接池
if (exception != null) {
throw exception;
}
}
}
}
AbstractRoutingDataSource 动态路由数据源的抽象类
动态路由数据源的抽象类,用于根据不同的业务需要,动态地选择需要使用的数据源。关键的方法是getConnection()
和getConnection(String username, String password)
,这两个方法会根据当前是否存在全局事务来动态地选择获取原始的数据库连接还是数据库连接代理。
public abstract class AbstractRoutingDataSource extends AbstractDataSource {
// 抽象方法,子类需要实现该方法以确定数据源
protected abstract DataSource determineDataSource();
// 抽象方法,子类需要实现该方法以确定默认的数据源名称
protected abstract String getPrimary();
// 获取数据库连接,根据事务上下文中是否有XID来判断是否需要获取代理连接
@Override
public Connection getConnection() throws SQLException {
String xid = TransactionContext.getXID();
if (StringUtils.isEmpty(xid)) {
// 如果没有XID,说明当前不处于全局事务中,直接获取原始连接
return determineDataSource().getConnection();
} else {
// 如果有XID,说明当前处于全局事务中,需要获取代理连接
String ds = DynamicDataSourceContextHolder.peek();
ds = StringUtils.isEmpty(ds) ? getPrimary() : ds;
ConnectionProxy connection = ConnectionFactory.getConnection(ds);
return connection == null ? getConnectionProxy(ds, determineDataSource().getConnection()) : connection;
}
}
// 与上面的方法类似,只不过这个方法可以传入用户名和密码来获取数据库连接
@Override
public Connection getConnection(String username, String password) throws SQLException {
String xid = TransactionContext.getXID();
if (StringUtils.isEmpty(xid)) {
return determineDataSource().getConnection(username, password);
} else {
String ds = DynamicDataSourceContextHolder.peek();
ds = StringUtils.isEmpty(ds) ? getPrimary() : ds;
ConnectionProxy connection = ConnectionFactory.getConnection(ds);
return connection == null ? getConnectionProxy(ds, determineDataSource().getConnection(username, password))
: connection;
}
}
// 创建数据库连接代理,并将代理连接放入连接工厂
private Connection getConnectionProxy(String ds, Connection connection) {
ConnectionProxy connectionProxy = new ConnectionProxy(connection, ds);
ConnectionFactory.putConnection(ds, connectionProxy);
return connectionProxy;
}
// 获取指定类型的代理对象
@Override
@SuppressWarnings("unchecked")
public <T> T unwrap(Class<T> iface) throws SQLException {
if (iface.isInstance(this)) {
return (T) this;
}
return determineDataSource().unwrap(iface);
}
// 判断是否是指定类型的代理对象
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
return (iface.isInstance(this) || determineDataSource().isWrapperFor(iface));
}
}
DynamicLocalTransactionInterceptor 动态的本地事务拦截器
动态的本地事务拦截器。基本思想是在方法调用前后添加事务处理的逻辑。当这个拦截器被应用到某个方法时,那么在调用这个方法时,会首先检查当前是否已经存在事务,如果存在则直接调用原始方法。如果不存在,则会先开启一个新的事务,然后调用原始方法,方法结束后根据方法执行的结果来提交或回滚事务。入口在这,看一眼就懂了。
// 实现MethodInterceptor接口定义拦截器
public class DynamicLocalTransactionInterceptor implements MethodInterceptor {
@Override
// invoke方法会在原方法执行前后进行拦截
public Object invoke(MethodInvocation methodInvocation) throws Throwable {
// 如果当前上下文中已存在事务,则直接调用原方法,不进行拦截处理
if (!StringUtils.isEmpty(TransactionContext.getXID())) {
return methodInvocation.proceed();
}
// 定义一个状态标志,标记事务是否执行成功
boolean state = true;
Object o;
// 开启一个新的事务
LocalTxUtil.startTransaction();
try {
// 调用原始方法
o = methodInvocation.proceed();
} catch (Exception e) {
// 如果原方法执行抛出异常,则标记事务执行失败
state = false;
throw e;
} finally {
// 根据事务执行状态,提交或回滚事务
if (state) {
LocalTxUtil.commit();
} else {
LocalTxUtil.rollback();
}
}
// 返回原方法的执行结果
return o;
}
}
3. 使用步骤示例
官方示例:https://github.com/dynamic-datasource/dynamic-datasource-samples/tree/master/tx-samples/tx-local-sample
完整示例项目 数据库都已准备好,可以直接运行测试。http://localhost:8080/doc.html
示例项目A,B,C分别对应OrderService,ProductService,AccountService。分别是独立的数据库。
用户下单分别调用产品库扣库存,账户库扣余额。
如果库存不足,或用户余额不足都抛出RuntimeException,触发整体回滚。
@Slf4j
@Service
@AllArgsConstructor
public class OrderService {
private final OrderMapper orderMapper;
private final AccountService accountService;
private final ProductService productService;
//@DS("order") 这里不需要,因为order是默认库,如果开启事务的不是默认库则必须加
@DSTransactional //注意这里开启事务
public void placeOrder(PlaceOrderRequest request) {
log.info("=============ORDER START=================");
Long userId = request.getUserId();
Long productId = request.getProductId();
Integer amount = request.getAmount();
log.info("收到下单请求,用户:{}, 商品:{},数量:{}", userId, productId, amount);
log.info("当前 XID: {}", TransactionContext.getXID());
Order order = Order.builder()
.userId(userId)
.productId(productId)
.status(OrderStatus.INIT)
.amount(amount)
.build();
orderMapper.insert(order);
log.info("订单一阶段生成,等待扣库存付款中");
// 扣减库存并计算总价
Double totalPrice = productService.reduceStock(productId, amount);
// 扣减余额
accountService.reduceBalance(userId, totalPrice);
order.setStatus(OrderStatus.SUCCESS);
order.setTotalPrice(totalPrice);
orderMapper.updateById(order);
log.info("订单已成功下单");
log.info("=============ORDER END=================");
}
}
@Slf4j
@Service
@RequiredArgsConstructor
public class ProductService {
private final ProductMapper productMapper;
@DS("product")
public Double reduceStock(Long productId, Integer amount) {
log.info("=============PRODUCT START=================");
log.info("当前 XID: {}", TransactionContext.getXID());
// 检查库存
Product product = productMapper.selectById(productId);
Assert.notNull(product, "商品不存在");
Integer stock = product.getStock();
log.info("商品编号为 {} 的库存为{},订单商品数量为{}", productId, stock, amount);
if (stock < amount) {
log.warn("商品编号为{} 库存不足,当前库存:{}", productId, stock);
throw new RuntimeException("库存不足");
}
log.info("开始扣减商品编号为 {} 库存,单价商品价格为{}", productId, product.getPrice());
// 扣减库存
int currentStock = stock - amount;
product.setStock(currentStock);
productMapper.updateById(product);
double totalPrice = product.getPrice() * amount;
log.info("扣减商品编号为 {} 库存成功,扣减后库存为{}, {} 件商品总价为 {} ", productId, currentStock, amount, totalPrice);
log.info("=============PRODUCT END=================");
return totalPrice;
}
}
@Slf4j
@Service
@RequiredArgsConstructor
public class AccountService {
private final AccountMapper accountMapper;
@DS("account")
public void reduceBalance(Long userId, Double price) {
log.info("=============ACCOUNT START=================");
log.info("当前 XID: {}", TransactionContext.getXID());
Account account = accountMapper.selectById(userId);
Assert.notNull(account, "用户不存在");
Double balance = account.getBalance();
log.info("下单用户{}余额为 {},商品总价为{}", userId, balance, price);
if (balance < price) {
log.warn("用户 {} 余额不足,当前余额:{}", userId, balance);
throw new RuntimeException("余额不足");
}
log.info("开始扣减用户 {} 余额", userId);
double currentBalance = account.getBalance() - price;
account.setBalance(currentBalance);
accountMapper.updateById(account);
log.info("扣减用户 {} 余额成功,扣减后用户账户余额为{}", userId, currentBalance);
log.info("=============ACCOUNT END=================");
}
}
4. 官方源码分析
5. 参考资料
- dynamic-datasource GitHub 仓库 ↗:dynamic-datasource 的官方 GitHub 仓库,包含源代码、文档和示例等资源。