前言:在最近的实际开发的过程中,遇到了在多数据源的情况下要保证原子性的问题,这个问题当时遇到了也是思考了一段时间,后来通过搜集大量资料与学习,最后是采用了分布式事务来解决这个问题,在讲解之前,在我往期的博客提前搭好了一个SpringBoot整合MyBatis搭建MySQL多数据源的教程,本篇博客我是在原有的这个项目的基础之上进行的改造,主要新增了几个配置项进行实现的,大家可以先简单的看一下这篇文章!
文章链接:【万字长文】SpringBoot整合MyBatis搭建MySQL多数据源完整教程(提供Gitee源码)
在文章的最后我也把整合好的完整代码都进行了提供!
目录
一、什么是Atomikos
二、什么是XA
三、项目整体结构截图
四、导入pom依赖
五、配置mybatis-config.xml文件
六、修改yml文件
七、配置类
7.1、Atomikos配置类
7.2、DynamicSqlSessionTemplate动态切换数据源配置类
7.3、Druid配置属性
7.4、DruidConfig多数据源核心配置类
7.5、MyBatis配置类
八、运行测试
九、Gitee源码
十、总结
一、什么是Atomikos
Atomikos是一个开源的分布式事务管理器,它可以为分布式系统提供事务管理的解决方案。Atomikos的主要作用有:
1、支持分布式事务,确保跨多个数据源的事务一致性在分布式系统中,一个业务操作可能涉及多个数据库或服务。Atomikos可以协调多个数据源,要么一起提交事务,要么一起回滚,从而保证分布式环境下数据的一致性。
2、支持各种数据库和事务APIAtomikos支持JTA规范,可以通过JTA接口与应用程序集成。同时它对各种数据库如MySQL、PostgreSQL都提供了支持,还支持REST事务等。
3、确保事务的ACID特性Atomikos通过两阶段提交协议,可以确保分布式事务满足原子性、一致性、隔离性的特性,防止分布式事务状态不一致。
4、高可用和故障转移Atomikos本身可以通过设置主备模式来提供高可用性,同时还可以与负载均衡器集成实现故障转移和高可用。
5、管理和监控平台Atomikos自带了管理控制台和日志监控功能,可以方便查看事务信息、统计数据、运行情况等。
总之,Atomikos是一个非常强大和成熟的分布式事务管理器,它为构建健壮可靠的分布式系统提供了关键的事务保证。在分布式场景下,Atomikos可以说是事务处理的不二之选!
二、什么是XA
XA(eXtendedArchitecture)是一种分布式事务处理的标准规范和架构。
主要特征:
1、支持在多个数据库之间进行分布式事务管理。
2、事务处理符合ACID特性。
3、通过事务管理器(TransactionManager)调度事务。
4、通过XA数据源(XADataSource)抽象数据库的事务行为。
5、通过两阶段提交协议(2PC)保证分布式事务的一致性。
其关键角色包括:
1、事务管理器(TM):协调多个数据库的事务,管理全局事务。
2、应用程序(AP):通过XA接口与TM交互,驱动全局事务。
3、资源管理器(RM):数据库的一个抽象,对事务进行持久化保证。
4、XA数据源:实现XA接口,封装了RM,使数据库与TM进行交互。
XA的实现(如Atomikos)通过两阶段提交,能够很好地解决分布式事务的一致性问题,是实现分布式事务的重要方式之一。
三、项目整体结构截图
仅仅实在原有搭建好的多数据源的框架上进行了改造!
四、导入pom依赖
<!-- atomikos分布式事务 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
<version>2.7.14</version>
</dependency>
五、配置mybatis-config.xml文件
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration
PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
<!-- 全局参数 -->
<settings>
<!-- 使全局的映射器启用或禁用缓存 -->
<setting name="cacheEnabled" value="true" />
<!-- 允许JDBC 支持自动生成主键 -->
<setting name="useGeneratedKeys" value="true" />
<!-- 配置默认的执行器.SIMPLE就是普通执行器;REUSE执行器会重用预处理语句(prepared statements);BATCH执行器将重用语句并执行批量更新 -->
<setting name="defaultExecutorType" value="SIMPLE" />
<!-- 指定 MyBatis 所用日志的具体实现 -->
<setting name="logImpl" value="SLF4J" />
<!-- 使用驼峰命名法转换字段 -->
<!-- <setting name="mapUnderscoreToCamelCase" value="true"/> -->
</settings>
</configuration>
这是目录结构,在resource目录下新建一个mybatis目录。
六、修改yml文件
主要修改的是MyBatis的配置,其他不动!
# MyBatis配置
mybatis:
# 搜索指定包别名
typeAliasesPackage: com.example.**.domain
# 配置mapper的扫描,找到所有的mapper.xml映射文件
mapperLocations: classpath:mapper/*/*.xml
# 加载全局的配置文件
configLocation: classpath:mybatis/mybatis-config.xml
七、配置类
我先简单的梳理一下5个配置类的作用和它们之间的联系。
1、DruidProperties这是Druid数据库连接池的配置属性类,里面定义了各个数据源的url、username等信息。
2、AtomikosConfig这是Atomikos事务管理器的配置类,创建了Atomikos的事务管理器AtomikosJtaTransactionManager。
3、DruidConfig该类利用DruidProperties创建了主从数据源masterDataSource和slaveDataSource,并用AtomikosDataSourceBean封装了它们。
4、MyBatisConfig这是MyBatis的配置类,它利用masterDataSource和slaveDataSource分别创建了两个SqlSessionFactory。
5、DynamicSqlSessionTemplate该类继承Spring的SqlSessionTemplate,实现了一个动态切换数据源的SqlSessionTemplate。它内部通过一个ThreadLocal持有当前数据源,在执行sql时,会根据数据源切换到对应的SqlSessionFactory。这样就实现了基于不同数据源的动态切换。
总结一下,DruidProperties和AtomikosConfig提供了基础配置,DruidConfig创建了Atomikos数据源,MyBatisConfig创建了SqlSessionFactory,DynamicSqlSessionTemplate实现动态切换数据源和SqlSessionFactory,从而达到事务管理+动态多数据源的效果。
7.1、Atomikos配置类
1、这个Bean创建了UserTransactionImp,它实现了JTA的UserTransaction接口,用于Begin/Commit/Rollback事务。
@Bean(name = "userTransaction")
public UserTransaction userTransaction() throws Throwable
{
UserTransactionImp userTransactionImp = new UserTransactionImp();
userTransactionImp.setTransactionTimeout(10000);
return userTransactionImp;
}
2、这个Bean创建了Atomikos的事务管理器UserTransactionManager,它实现了JTA的TransactionManager接口。这个是核心的事务管理器。
@Bean(name = "atomikosTransactionManager", initMethod = "init", destroyMethod = "close")
public TransactionManager atomikosTransactionManager() throws Throwable
{
UserTransactionManager userTransactionManager = new UserTransactionManager();
userTransactionManager.setForceShutdown(false);
return userTransactionManager;
}
3、这个Bean创建了JtaTransactionManager,它实现了Spring的PlatformTransactionManager接口。它依赖userTransaction和atomikosTransactionManager两个Bean。JtaTransactionManager作为桥接,把Atomikos的JTA事务管理机制桥接到Spring体系中,让Spring能够通过声明式事务(例如@Transactional))来使用Atomikos进行分布式事务管理。
@Bean(name = "transactionManager")
@DependsOn({ "userTransaction", "atomikosTransactionManager" })
public PlatformTransactionManager transactionManager() throws Throwable
{
UserTransaction userTransaction = userTransaction();
TransactionManager atomikosTransactionManager = atomikosTransactionManager();
return new JtaTransactionManager(userTransaction, atomikosTransactionManager);
}
完整代码:
package com.example.multiple.config;
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.jta.JtaTransactionManager;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;
/**
* JTA 事务配置
*
*/
@Configuration
public class AtomikosConfig
{
@Bean(name = "userTransaction")
public UserTransaction userTransaction() throws Throwable
{
UserTransactionImp userTransactionImp = new UserTransactionImp();
userTransactionImp.setTransactionTimeout(10000);
return userTransactionImp;
}
@Bean(name = "atomikosTransactionManager", initMethod = "init", destroyMethod = "close")
public TransactionManager atomikosTransactionManager() throws Throwable
{
UserTransactionManager userTransactionManager = new UserTransactionManager();
userTransactionManager.setForceShutdown(false);
return userTransactionManager;
}
@Bean(name = "transactionManager")
@DependsOn({ "userTransaction", "atomikosTransactionManager" })
public PlatformTransactionManager transactionManager() throws Throwable
{
UserTransaction userTransaction = userTransaction();
TransactionManager atomikosTransactionManager = atomikosTransactionManager();
return new JtaTransactionManager(userTransaction, atomikosTransactionManager);
}
}
7.2、DynamicSqlSessionTemplate动态切换数据源配置类
这个DynamicSqlSessionTemplate类是一个自定义的SqlSessionTemplate,它的作用是动态切换MyBatis的SqlSessionFactory,从而实现动态数据源切换。
主要逻辑是通过一个Interceptor拦截SqlSession方法调用,在调用前从Context中获取当前数据源,然后使用对应数据源的SqlSessionFactory创建SqlSession。
这么做的好处是不同数据源的MyBatis操作可以通过同一个SqlSessionTemplate实例处理,避免代码里出现多个模板。
其和Atomikos的关系在于:Atomikos要管理多个数据源时,需要给每个数据源配置独立的SqlSessionFactory。如果不使用动态模板,就需要在代码里维护多个SqlSessionTemplate。而DynamicSqlSessionTemplate可以根据当前数据源动态切换SqlSessionFactory,所以只需要配置一个DynamicSqlSessionTemplate实例即可。
在Atomikos多数据源的场景下,可以避免代码里出现多个SqlSessionTemplate。
所以,DynamicSqlSessionTemplate配合Atomikos使用,可以简化代码:
1、为每个数据源配置独立SqlSessionFactory。
2、使用DynamicSqlSessionTemplate+动态数据源Context统一管理。
3、不同数据源的数据库操作通过同一个模板即可执行。
完整代码:
package com.example.multiple.config.datasource;
import org.apache.ibatis.exceptions.PersistenceException;
import org.apache.ibatis.executor.BatchResult;
import org.apache.ibatis.session.*;
import org.mybatis.spring.MyBatisExceptionTranslator;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.dao.support.PersistenceExceptionTranslator;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.sql.Connection;
import java.util.List;
import java.util.Map;
import static java.lang.reflect.Proxy.newProxyInstance;
import static org.apache.ibatis.reflection.ExceptionUtil.unwrapThrowable;
import static org.mybatis.spring.SqlSessionUtils.*;
/**
* 自定义SqlSessionTemplate,动态切换数据源
*/
public class DynamicSqlSessionTemplate extends SqlSessionTemplate
{
private final SqlSessionFactory sqlSessionFactory;
private final ExecutorType executorType;
private final SqlSession sqlSessionProxy;
private final PersistenceExceptionTranslator exceptionTranslator;
private Map<Object, SqlSessionFactory> targetSqlSessionFactorys;
private SqlSessionFactory defaultTargetSqlSessionFactory;
public DynamicSqlSessionTemplate(SqlSessionFactory sqlSessionFactory)
{
this(sqlSessionFactory, sqlSessionFactory.getConfiguration().getDefaultExecutorType());
}
public DynamicSqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType)
{
this(sqlSessionFactory, executorType, new MyBatisExceptionTranslator(
sqlSessionFactory.getConfiguration().getEnvironment().getDataSource(), true));
}
public DynamicSqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType,
PersistenceExceptionTranslator exceptionTranslator)
{
super(sqlSessionFactory, executorType, exceptionTranslator);
this.sqlSessionFactory = sqlSessionFactory;
this.executorType = executorType;
this.exceptionTranslator = exceptionTranslator;
this.sqlSessionProxy = (SqlSession) newProxyInstance(SqlSessionFactory.class.getClassLoader(),
new Class[] { SqlSession.class }, new SqlSessionInterceptor());
this.defaultTargetSqlSessionFactory = sqlSessionFactory;
}
public void setTargetSqlSessionFactorys(Map<Object, SqlSessionFactory> targetSqlSessionFactorys)
{
this.targetSqlSessionFactorys = targetSqlSessionFactorys;
}
public void setDefaultTargetSqlSessionFactory(SqlSessionFactory defaultTargetSqlSessionFactory)
{
this.defaultTargetSqlSessionFactory = defaultTargetSqlSessionFactory;
}
@Override
public SqlSessionFactory getSqlSessionFactory()
{
SqlSessionFactory targetSqlSessionFactory = targetSqlSessionFactorys
.get(DynamicDataSourceContextHolder.getDataSourceType());
if (targetSqlSessionFactory != null)
{
return targetSqlSessionFactory;
}
else if (defaultTargetSqlSessionFactory != null)
{
return defaultTargetSqlSessionFactory;
}
return this.sqlSessionFactory;
}
@Override
public Configuration getConfiguration()
{
return this.getSqlSessionFactory().getConfiguration();
}
public ExecutorType getExecutorType()
{
return this.executorType;
}
public PersistenceExceptionTranslator getPersistenceExceptionTranslator()
{
return this.exceptionTranslator;
}
/**
* {@inheritDoc}
*/
public <T> T selectOne(String statement)
{
return this.sqlSessionProxy.<T> selectOne(statement);
}
/**
* {@inheritDoc}
*/
public <T> T selectOne(String statement, Object parameter)
{
return this.sqlSessionProxy.<T> selectOne(statement, parameter);
}
/**
* {@inheritDoc}
*/
public <K, V> Map<K, V> selectMap(String statement, String mapKey)
{
return this.sqlSessionProxy.<K, V> selectMap(statement, mapKey);
}
/**
* {@inheritDoc}
*/
public <K, V> Map<K, V> selectMap(String statement, Object parameter, String mapKey)
{
return this.sqlSessionProxy.<K, V> selectMap(statement, parameter, mapKey);
}
/**
* {@inheritDoc}
*/
public <K, V> Map<K, V> selectMap(String statement, Object parameter, String mapKey, RowBounds rowBounds)
{
return this.sqlSessionProxy.<K, V> selectMap(statement, parameter, mapKey, rowBounds);
}
/**
* {@inheritDoc}
*/
public <E> List<E> selectList(String statement)
{
return this.sqlSessionProxy.<E> selectList(statement);
}
/**
* {@inheritDoc}
*/
public <E> List<E> selectList(String statement, Object parameter)
{
return this.sqlSessionProxy.<E> selectList(statement, parameter);
}
/**
* {@inheritDoc}
*/
public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds)
{
return this.sqlSessionProxy.<E> selectList(statement, parameter, rowBounds);
}
/**
* {@inheritDoc}
*/
@SuppressWarnings("rawtypes")
public void select(String statement, ResultHandler handler)
{
this.sqlSessionProxy.select(statement, handler);
}
/**
* {@inheritDoc}
*/
@SuppressWarnings("rawtypes")
public void select(String statement, Object parameter, ResultHandler handler)
{
this.sqlSessionProxy.select(statement, parameter, handler);
}
/**
* {@inheritDoc}
*/
@SuppressWarnings("rawtypes")
public void select(String statement, Object parameter, RowBounds rowBounds, ResultHandler handler)
{
this.sqlSessionProxy.select(statement, parameter, rowBounds, handler);
}
/**
* {@inheritDoc}
*/
public int insert(String statement)
{
return this.sqlSessionProxy.insert(statement);
}
/**
* {@inheritDoc}
*/
public int insert(String statement, Object parameter)
{
return this.sqlSessionProxy.insert(statement, parameter);
}
/**
* {@inheritDoc}
*/
public int update(String statement)
{
return this.sqlSessionProxy.update(statement);
}
/**
* {@inheritDoc}
*/
public int update(String statement, Object parameter)
{
return this.sqlSessionProxy.update(statement, parameter);
}
/**
* {@inheritDoc}
*/
public int delete(String statement)
{
return this.sqlSessionProxy.delete(statement);
}
/**
* {@inheritDoc}
*/
public int delete(String statement, Object parameter)
{
return this.sqlSessionProxy.delete(statement, parameter);
}
/**
* {@inheritDoc}
*/
public <T> T getMapper(Class<T> type)
{
return getConfiguration().getMapper(type, this);
}
/**
* {@inheritDoc}
*/
public void commit()
{
throw new UnsupportedOperationException("Manual commit is not allowed over a Spring managed SqlSession");
}
/**
* {@inheritDoc}
*/
public void commit(boolean force)
{
throw new UnsupportedOperationException("Manual commit is not allowed over a Spring managed SqlSession");
}
/**
* {@inheritDoc}
*/
public void rollback()
{
throw new UnsupportedOperationException("Manual rollback is not allowed over a Spring managed SqlSession");
}
/**
* {@inheritDoc}
*/
public void rollback(boolean force)
{
throw new UnsupportedOperationException("Manual rollback is not allowed over a Spring managed SqlSession");
}
/**
* {@inheritDoc}
*/
public void close()
{
throw new UnsupportedOperationException("Manual close is not allowed over a Spring managed SqlSession");
}
/**
* {@inheritDoc}
*/
public void clearCache()
{
this.sqlSessionProxy.clearCache();
}
/**
* {@inheritDoc}
*/
public Connection getConnection()
{
return this.sqlSessionProxy.getConnection();
}
/**
* {@inheritDoc}
*
* @since 1.0.2
*/
public List<BatchResult> flushStatements()
{
return this.sqlSessionProxy.flushStatements();
}
/**
* Proxy needed to route MyBatis method calls to the proper SqlSession got from Spring's Transaction Manager It also
* unwraps exceptions thrown by {@code Method#invoke(Object, Object...)} to pass a {@code PersistenceException} to
* the {@code PersistenceExceptionTranslator}.
*/
private class SqlSessionInterceptor implements InvocationHandler
{
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
{
final SqlSession sqlSession = getSqlSession(DynamicSqlSessionTemplate.this.getSqlSessionFactory(),
DynamicSqlSessionTemplate.this.executorType, DynamicSqlSessionTemplate.this.exceptionTranslator);
try
{
Object result = method.invoke(sqlSession, args);
if (!isSqlSessionTransactional(sqlSession, DynamicSqlSessionTemplate.this.getSqlSessionFactory()))
{
sqlSession.commit(true);
}
return result;
}
catch (Throwable t)
{
Throwable unwrapped = unwrapThrowable(t);
if (DynamicSqlSessionTemplate.this.exceptionTranslator != null
&& unwrapped instanceof PersistenceException)
{
Throwable translated = DynamicSqlSessionTemplate.this.exceptionTranslator
.translateExceptionIfPossible((PersistenceException) unwrapped);
if (translated != null)
{
unwrapped = translated;
}
}
throw unwrapped;
}
finally
{
closeSqlSession(sqlSession, DynamicSqlSessionTemplate.this.getSqlSessionFactory());
}
}
}
}
7.3、Druid配置属性
都是从yml文件当中读取一些参数,具体的注释代码上都标注了,这边就不多作讲解了。
完整代码:
package com.example.multiple.config.properties;
import com.alibaba.druid.pool.DruidDataSource;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
/**
* druid 配置属性
*
*/
@Configuration
public class DruidProperties
{
@Value("${spring.datasource.druid.initialSize}")
private int initialSize;
@Value("${spring.datasource.druid.minIdle}")
private int minIdle;
@Value("${spring.datasource.druid.maxActive}")
private int maxActive;
@Value("${spring.datasource.druid.maxWait}")
private int maxWait;
@Value("${spring.datasource.druid.connectTimeout}")
private int connectTimeout;
@Value("${spring.datasource.druid.socketTimeout}")
private int socketTimeout;
@Value("${spring.datasource.druid.timeBetweenEvictionRunsMillis}")
private int timeBetweenEvictionRunsMillis;
@Value("${spring.datasource.druid.minEvictableIdleTimeMillis}")
private int minEvictableIdleTimeMillis;
@Value("${spring.datasource.druid.maxEvictableIdleTimeMillis}")
private int maxEvictableIdleTimeMillis;
@Value("${spring.datasource.druid.validationQuery}")
private String validationQuery;
@Value("${spring.datasource.druid.testWhileIdle}")
private boolean testWhileIdle;
@Value("${spring.datasource.druid.testOnBorrow}")
private boolean testOnBorrow;
@Value("${spring.datasource.druid.testOnReturn}")
private boolean testOnReturn;
public DruidDataSource dataSource(DruidDataSource datasource)
{
/** 配置初始化大小、最小、最大 */
datasource.setInitialSize(initialSize);
datasource.setMaxActive(maxActive);
datasource.setMinIdle(minIdle);
/** 配置获取连接等待超时的时间 */
datasource.setMaxWait(maxWait);
/** 配置驱动连接超时时间,检测数据库建立连接的超时时间,单位是毫秒 */
datasource.setConnectTimeout(connectTimeout);
/** 配置网络超时时间,等待数据库操作完成的网络超时时间,单位是毫秒 */
datasource.setSocketTimeout(socketTimeout);
/** 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 */
datasource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
/** 配置一个连接在池中最小、最大生存的时间,单位是毫秒 */
datasource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
datasource.setMaxEvictableIdleTimeMillis(maxEvictableIdleTimeMillis);
/**
* 用来检测连接是否有效的sql,要求是一个查询语句,常用select 'x'。如果validationQuery为null,testOnBorrow、testOnReturn、testWhileIdle都不会起作用。
*/
datasource.setValidationQuery(validationQuery);
/** 建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效。 */
datasource.setTestWhileIdle(testWhileIdle);
/** 申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。 */
datasource.setTestOnBorrow(testOnBorrow);
/** 归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。 */
datasource.setTestOnReturn(testOnReturn);
return datasource;
}
public int getInitialSize()
{
return initialSize;
}
public void setInitialSize(int initialSize)
{
this.initialSize = initialSize;
}
public int getMinIdle()
{
return minIdle;
}
public void setMinIdle(int minIdle)
{
this.minIdle = minIdle;
}
public int getMaxActive()
{
return maxActive;
}
public void setMaxActive(int maxActive)
{
this.maxActive = maxActive;
}
public int getMaxWait()
{
return maxWait;
}
public void setMaxWait(int maxWait)
{
this.maxWait = maxWait;
}
public int getConnectTimeout()
{
return connectTimeout;
}
public void setConnectTimeout(int connectTimeout)
{
this.connectTimeout = connectTimeout;
}
public int getSocketTimeout()
{
return socketTimeout;
}
public void setSocketTimeout(int socketTimeout)
{
this.socketTimeout = socketTimeout;
}
public int getTimeBetweenEvictionRunsMillis()
{
return timeBetweenEvictionRunsMillis;
}
public void setTimeBetweenEvictionRunsMillis(int timeBetweenEvictionRunsMillis)
{
this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;
}
public int getMinEvictableIdleTimeMillis()
{
return minEvictableIdleTimeMillis;
}
public void setMinEvictableIdleTimeMillis(int minEvictableIdleTimeMillis)
{
this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis;
}
public int getMaxEvictableIdleTimeMillis()
{
return maxEvictableIdleTimeMillis;
}
public void setMaxEvictableIdleTimeMillis(int maxEvictableIdleTimeMillis)
{
this.maxEvictableIdleTimeMillis = maxEvictableIdleTimeMillis;
}
public String getValidationQuery()
{
return validationQuery;
}
public void setValidationQuery(String validationQuery)
{
this.validationQuery = validationQuery;
}
public boolean isTestWhileIdle()
{
return testWhileIdle;
}
public void setTestWhileIdle(boolean testWhileIdle)
{
this.testWhileIdle = testWhileIdle;
}
public boolean isTestOnBorrow()
{
return testOnBorrow;
}
public void setTestOnBorrow(boolean testOnBorrow)
{
this.testOnBorrow = testOnBorrow;
}
public boolean isTestOnReturn()
{
return testOnReturn;
}
public void setTestOnReturn(boolean testOnReturn)
{
this.testOnReturn = testOnReturn;
}
}
7.4、DruidConfig多数据源核心配置类
对比之前搭建好的多数据源项目,主要变更如下:
1、构建Druid数据源属性。
protected Properties build(Environment env, String prefix)
{
Properties prop = new Properties();
prop.put("url", env.getProperty(prefix + "url"));
prop.put("username", env.getProperty(prefix + "username"));
prop.put("password", env.getProperty(prefix + "password"));
prop.put("initialSize", druidProperties.getInitialSize());
prop.put("minIdle", druidProperties.getMinIdle());
prop.put("maxActive", druidProperties.getMaxActive());
prop.put("maxWait", druidProperties.getMaxWait());
prop.put("timeBetweenEvictionRunsMillis", druidProperties.getTimeBetweenEvictionRunsMillis());
prop.put("minEvictableIdleTimeMillis", druidProperties.getMinEvictableIdleTimeMillis());
prop.put("maxEvictableIdleTimeMillis", druidProperties.getMaxEvictableIdleTimeMillis());
prop.put("validationQuery", druidProperties.getValidationQuery());
prop.put("testWhileIdle", druidProperties.isTestWhileIdle());
prop.put("testOnBorrow", druidProperties.isTestOnBorrow());
prop.put("testOnReturn", druidProperties.isTestOnReturn());
return prop;
}
2、创建AtomikosDataSourceBean的数据源实例。
具体逻辑:
1、调用build()方法构建Druid数据源的属性配置。
2、创建AtomikosDataSourceBean对象ds。
3、设置XA数据源全类名为DruidXADataSource,即使用Druid作为连接池。
4、添加连接池限制配置,如最大连接数、最小连接数等。
5、设置uniqueResourceName,即数据源名称。
6、将Druid属性设置为XA属性。
7、返回构建好的AtomikosDataSourceBean实例。
AtomikosDataSourceBean是Atomikos提供的XA数据源实现,它封装了一个普通连接池的数据源,具备分布式事务的功能。这里使用Druid作为底层连接池,通过Atomikos进行XA事务管理。
protected DataSource getDataSource(Environment env, String prefix, String dataSourceName)
{
Properties prop = build(env, prefix);
AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
// 添加连接池限制
ds.setMaxPoolSize(50);
ds.setMinPoolSize(5);
ds.setBorrowConnectionTimeout(60);
ds.setUniqueResourceName(dataSourceName);
ds.setXaProperties(prop);
return ds;
}
3、创建主从数据源Bean。
@DependsOn注解表示该Bean依赖transactionManager Bean。
@ConfigurationProperties注解加载配置前缀为"spring.datasource.druid.master"的配置。
getDataSource方法使用DruidDataSourceFactory创建Druid数据源,并配置参数.
@Bean
@DependsOn({"transactionManager"})
@ConfigurationProperties("spring.datasource.druid.master")
public DataSource masterDataSource(Environment env)
{
String prefix = "spring.datasource.druid.master.";
return getDataSource(env, prefix, MASTER);
}
@Bean
@ConfigurationProperties("spring.datasource.druid.slave")
@DependsOn({"transactionManager"})
@ConditionalOnProperty(prefix = "spring.datasource.druid.slave", name = "enabled", havingValue = "true")
public DataSource slaveDataSource(Environment env)
{
String prefix = "spring.datasource.druid.slave.";
return getDataSource(env, prefix, SLAVE);
}
完整代码:
package com.example.multiple.config;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import javax.servlet.*;
import javax.sql.DataSource;
import com.alibaba.druid.spring.boot.autoconfigure.properties.DruidStatProperties;
import com.alibaba.druid.util.Utils;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import com.example.multiple.config.datasource.DynamicDataSource;
import com.example.multiple.enums.DataSourceType;
import com.example.multiple.config.properties.DruidProperties;
import com.example.multiple.utils.SpringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Primary;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder;
import org.springframework.core.env.Environment;
@Configuration
public class DruidConfig
{
public static final String MASTER = DataSourceType.MASTER.name();
public static final String SLAVE = DataSourceType.SLAVE.name();
@Autowired
private DruidProperties druidProperties;
@Bean
@DependsOn({"transactionManager"})
@ConfigurationProperties("spring.datasource.druid.master")
public DataSource masterDataSource(Environment env)
{
String prefix = "spring.datasource.druid.master.";
return getDataSource(env, prefix, MASTER);
}
@Bean
@ConfigurationProperties("spring.datasource.druid.slave")
@DependsOn({"transactionManager"})
@ConditionalOnProperty(prefix = "spring.datasource.druid.slave", name = "enabled", havingValue = "true")
public DataSource slaveDataSource(Environment env)
{
String prefix = "spring.datasource.druid.slave.";
return getDataSource(env, prefix, SLAVE);
}
protected DataSource getDataSource(Environment env, String prefix, String dataSourceName)
{
Properties prop = build(env, prefix);
AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
// 添加连接池限制
ds.setMaxPoolSize(50);
ds.setMinPoolSize(5);
ds.setBorrowConnectionTimeout(60);
ds.setUniqueResourceName(dataSourceName);
ds.setXaProperties(prop);
return ds;
}
protected Properties build(Environment env, String prefix)
{
Properties prop = new Properties();
prop.put("url", env.getProperty(prefix + "url"));
prop.put("username", env.getProperty(prefix + "username"));
prop.put("password", env.getProperty(prefix + "password"));
prop.put("initialSize", druidProperties.getInitialSize());
prop.put("minIdle", druidProperties.getMinIdle());
prop.put("maxActive", druidProperties.getMaxActive());
prop.put("maxWait", druidProperties.getMaxWait());
prop.put("timeBetweenEvictionRunsMillis", druidProperties.getTimeBetweenEvictionRunsMillis());
prop.put("minEvictableIdleTimeMillis", druidProperties.getMinEvictableIdleTimeMillis());
prop.put("maxEvictableIdleTimeMillis", druidProperties.getMaxEvictableIdleTimeMillis());
prop.put("validationQuery", druidProperties.getValidationQuery());
prop.put("testWhileIdle", druidProperties.isTestWhileIdle());
prop.put("testOnBorrow", druidProperties.isTestOnBorrow());
prop.put("testOnReturn", druidProperties.isTestOnReturn());
return prop;
}
@Bean(name = "dynamicDataSource")
@Primary
public DynamicDataSource dataSource(DataSource masterDataSource)
{
Map<Object, Object> targetDataSources = new HashMap<>();
targetDataSources.put(MASTER, masterDataSource);
setDataSource(targetDataSources, SLAVE, "slaveDataSource");
return new DynamicDataSource(masterDataSource, targetDataSources);
}
/**
* 设置数据源
*
* @param targetDataSources 备选数据源集合
* @param sourceName 数据源名称
* @param beanName bean名称
*/
public void setDataSource(Map<Object, Object> targetDataSources, String sourceName, String beanName)
{
try
{
DataSource dataSource = SpringUtils.getBean(beanName);
targetDataSources.put(sourceName, dataSource);
}
catch (Exception e)
{
}
}
}
7.5、MyBatis配置类
1、常量-设置Mapper接口的扫描路径。
static final String DEFAULT_RESOURCE_PATTERN = "**/*.class";
2、扫描指定的包路径,获取包下面所有的类,从而自动设置MyBatis的类型别名。
具体逻辑:
1、通过ResourcePatternResolver扫描指定的包路径下的所有class文件资源。
2、对每个class资源,使用MetadataReader读取类信息,拿到完整类名。
3、通过Class.forName获取类,再调用getPackage()、getName()获取包名。
4、将扫描到的所有包名存入List中。
5、使用Set对包名进行去重,避免重复。
6、将包名数组JOIN成字符串,赋值给typeAliasesPackage。
7、 如果扫描不到任何包,则抛出异常。
这样就可以扫描指定包路径下的所有类,自动收集它们的包名,并设置到MyBatis的typeAliasesPackage中。
MyBatis就会自动将这些包名下面的类注册为类型别名,我们在Mapper映射文件中可以直接使用类名,而不需要写完整的全限定类名。这种方式可以大大简化配置,也更灵活,当新增子包或者类时,不需要手动维护typeAliases配置。
public static String setTypeAliasesPackage(String typeAliasesPackage)
{
ResourcePatternResolver resolver = (ResourcePatternResolver) new PathMatchingResourcePatternResolver();
MetadataReaderFactory metadataReaderFactory = new CachingMetadataReaderFactory(resolver);
List<String> allResult = new ArrayList<String>();
try
{
for (String aliasesPackage : typeAliasesPackage.split(","))
{
List<String> result = new ArrayList<String>();
aliasesPackage = ResourcePatternResolver.CLASSPATH_ALL_URL_PREFIX
+ ClassUtils.convertClassNameToResourcePath(aliasesPackage.trim()) + "/" + DEFAULT_RESOURCE_PATTERN;
Resource[] resources = resolver.getResources(aliasesPackage);
if (resources != null && resources.length > 0)
{
MetadataReader metadataReader = null;
for (Resource resource : resources)
{
if (resource.isReadable())
{
metadataReader = metadataReaderFactory.getMetadataReader(resource);
try
{
result.add(Class.forName(metadataReader.getClassMetadata().getClassName()).getPackage().getName());
}
catch (ClassNotFoundException e)
{
e.printStackTrace();
}
}
}
}
if (result.size() > 0)
{
HashSet<String> hashResult = new HashSet<String>(result);
allResult.addAll(hashResult);
}
}
if (allResult.size() > 0)
{
typeAliasesPackage = String.join(",", (String[]) allResult.toArray(new String[0]));
}
else
{
throw new RuntimeException("mybatis typeAliasesPackage 路径扫描错误,参数typeAliasesPackage:" + typeAliasesPackage + "未找到任何包");
}
}
catch (IOException e)
{
e.printStackTrace();
}
return typeAliasesPackage;
}
2、用来解析MyBatis的Mapper接口位置,将字符串位置转换为Resource资源对象。
具体逻辑:
1、创建PathMatchingResourcePatternResolver对象,它可以匹配ANT样式的资源路径。
2、按照给定的mapperLocations数组,逐个解析字符串路径。
3、使用getResources()获取路径对应的Resource资源数组。
4、将所有解析到的Resource资源添加到List中。
5、最后转换为Resource数组返回。
这样做的好处是,我们可以在MyBatis配置文件中,使用类似“classpath*:com/my/mappers/**/*.xml”这样的通配符路径。
public Resource[] resolveMapperLocations(String[] mapperLocations)
{
ResourcePatternResolver resourceResolver = new PathMatchingResourcePatternResolver();
List<Resource> resources = new ArrayList<Resource>();
if (mapperLocations != null)
{
for (String mapperLocation : mapperLocations)
{
try
{
Resource[] mappers = resourceResolver.getResources(mapperLocation);
resources.addAll(Arrays.asList(mappers));
}
catch (IOException e)
{
// ignore
}
}
}
return resources.toArray(new Resource[resources.size()]);
}
3、创建SqlSessionFactory。
具体逻辑:
1、从Environment环境变量中获取相关配置:
typeAliasesPackage:类型别名包
mapperLocations:Mapper接口位置
configLocation:MyBatis全局配置文件
2、对typeAliasesPackage进行扫描解析,转换为包名数组,用于自动设置别名。
3、添加SpringBootVFS,整合MyBatis的VFS扩展接口。
4、创建SqlSessionFactoryBean实例。
5、设置数据源DataSource。
6、设置typeAliasesPackage。
7、解析mapperLocations为Resource数组。
8、设置MyBatis全局配置文件location。
9、调用getObject()方法获取SqlSessionFactory实例。
public SqlSessionFactory createSqlSessionFactory(Environment env, DataSource dataSource) throws Exception
{
String typeAliasesPackage = env.getProperty("mybatis.typeAliasesPackage");
String mapperLocations = env.getProperty("mybatis.mapperLocations");
String configLocation = env.getProperty("mybatis.configLocation");
typeAliasesPackage = setTypeAliasesPackage(typeAliasesPackage);
VFS.addImplClass(SpringBootVFS.class);
final SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
sessionFactory.setDataSource(dataSource);
sessionFactory.setTypeAliasesPackage(typeAliasesPackage);
sessionFactory.setMapperLocations(resolveMapperLocations(StringUtils.split(mapperLocations, ",")));
sessionFactory.setConfigLocation(new DefaultResourceLoader().getResource(configLocation));
return sessionFactory.getObject();
}
4、通过@Qualifier注入DruidConfig配置中的AtomikosDataSourceBean的主从数据源实例,从而创建主从数据源对应的SqlSessionFactory实例。
@Bean(name = "sqlSessionFactoryMaster")
public SqlSessionFactory sqlSessionFactoryMaster(Environment env, @Qualifier("masterDataSource") DataSource dataSource) throws Exception
{
return createSqlSessionFactory(env, dataSource);
}
@Bean(name = "sqlSessionFactorySlave")
public SqlSessionFactory sqlSessionFactorySlave(Environment env, @Qualifier("slaveDataSource") DataSource dataSource) throws Exception
{
return createSqlSessionFactory(env, dataSource);
}
5、创建DynamicSqlSessionTemplate的Bean,它可以支持动态切换数据源。
具体逻辑:
1、接收主从两个SqlSessionFactory。
2、创建一个HashMap,key是数据源名称,value是SqlSessionFactory。
3、新建DynamicSqlSessionTemplate实例,传入主库的SqlSessionFactory。
4、通过setTargetSqlSessionFactorys设置多个SqlSessionFactory。
这样就创建了一个动态的SqlSessionTemplate,它包含一个主SqlSessionFactory和一个从SqlSessionFactory。
@Bean(name = "sqlSessionTemplate")
public DynamicSqlSessionTemplate sqlSessionTemplate(@Qualifier("sqlSessionFactoryMaster") SqlSessionFactory factoryMaster,
@Qualifier("sqlSessionFactorySlave") SqlSessionFactory factorySlave) throws Exception
{
Map<Object, SqlSessionFactory> sqlSessionFactoryMap = new HashMap<>();
sqlSessionFactoryMap.put(DruidConfig.MASTER, factoryMaster);
sqlSessionFactoryMap.put(DruidConfig.SLAVE, factorySlave);
DynamicSqlSessionTemplate customSqlSessionTemplate = new DynamicSqlSessionTemplate(factoryMaster);
customSqlSessionTemplate.setTargetSqlSessionFactorys(sqlSessionFactoryMap);
return customSqlSessionTemplate;
}
完整代码:
package com.example.multiple.config;
import com.example.multiple.config.datasource.DynamicSqlSessionTemplate;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.io.VFS;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.boot.autoconfigure.SpringBootVFS;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.core.io.DefaultResourceLoader;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.core.type.classreading.CachingMetadataReaderFactory;
import org.springframework.core.type.classreading.MetadataReader;
import org.springframework.core.type.classreading.MetadataReaderFactory;
import org.springframework.util.ClassUtils;
import javax.sql.DataSource;
import java.io.IOException;
import java.util.*;
/**
* Mybatis支持*匹配扫描包
*
* @author ruoyi
*/
@Configuration
public class MyBatisConfig
{
static final String DEFAULT_RESOURCE_PATTERN = "**/*.class";
public static String setTypeAliasesPackage(String typeAliasesPackage)
{
ResourcePatternResolver resolver = (ResourcePatternResolver) new PathMatchingResourcePatternResolver();
MetadataReaderFactory metadataReaderFactory = new CachingMetadataReaderFactory(resolver);
List<String> allResult = new ArrayList<String>();
try
{
for (String aliasesPackage : typeAliasesPackage.split(","))
{
List<String> result = new ArrayList<String>();
aliasesPackage = ResourcePatternResolver.CLASSPATH_ALL_URL_PREFIX
+ ClassUtils.convertClassNameToResourcePath(aliasesPackage.trim()) + "/" + DEFAULT_RESOURCE_PATTERN;
Resource[] resources = resolver.getResources(aliasesPackage);
if (resources != null && resources.length > 0)
{
MetadataReader metadataReader = null;
for (Resource resource : resources)
{
if (resource.isReadable())
{
metadataReader = metadataReaderFactory.getMetadataReader(resource);
try
{
result.add(Class.forName(metadataReader.getClassMetadata().getClassName()).getPackage().getName());
}
catch (ClassNotFoundException e)
{
e.printStackTrace();
}
}
}
}
if (result.size() > 0)
{
HashSet<String> hashResult = new HashSet<String>(result);
allResult.addAll(hashResult);
}
}
if (allResult.size() > 0)
{
typeAliasesPackage = String.join(",", (String[]) allResult.toArray(new String[0]));
}
else
{
throw new RuntimeException("mybatis typeAliasesPackage 路径扫描错误,参数typeAliasesPackage:" + typeAliasesPackage + "未找到任何包");
}
}
catch (IOException e)
{
e.printStackTrace();
}
return typeAliasesPackage;
}
public Resource[] resolveMapperLocations(String[] mapperLocations)
{
ResourcePatternResolver resourceResolver = new PathMatchingResourcePatternResolver();
List<Resource> resources = new ArrayList<Resource>();
if (mapperLocations != null)
{
for (String mapperLocation : mapperLocations)
{
try
{
Resource[] mappers = resourceResolver.getResources(mapperLocation);
resources.addAll(Arrays.asList(mappers));
}
catch (IOException e)
{
// ignore
}
}
}
return resources.toArray(new Resource[resources.size()]);
}
public SqlSessionFactory createSqlSessionFactory(Environment env, DataSource dataSource) throws Exception
{
String typeAliasesPackage = env.getProperty("mybatis.typeAliasesPackage");
String mapperLocations = env.getProperty("mybatis.mapperLocations");
String configLocation = env.getProperty("mybatis.configLocation");
typeAliasesPackage = setTypeAliasesPackage(typeAliasesPackage);
VFS.addImplClass(SpringBootVFS.class);
final SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
sessionFactory.setDataSource(dataSource);
sessionFactory.setTypeAliasesPackage(typeAliasesPackage);
sessionFactory.setMapperLocations(resolveMapperLocations(StringUtils.split(mapperLocations, ",")));
sessionFactory.setConfigLocation(new DefaultResourceLoader().getResource(configLocation));
return sessionFactory.getObject();
}
@Bean(name = "sqlSessionFactoryMaster")
public SqlSessionFactory sqlSessionFactoryMaster(Environment env, @Qualifier("masterDataSource") DataSource dataSource) throws Exception
{
return createSqlSessionFactory(env, dataSource);
}
@Bean(name = "sqlSessionFactorySlave")
public SqlSessionFactory sqlSessionFactorySlave(Environment env, @Qualifier("slaveDataSource") DataSource dataSource) throws Exception
{
return createSqlSessionFactory(env, dataSource);
}
@Bean(name = "sqlSessionTemplate")
public DynamicSqlSessionTemplate sqlSessionTemplate(@Qualifier("sqlSessionFactoryMaster") SqlSessionFactory factoryMaster,
@Qualifier("sqlSessionFactorySlave") SqlSessionFactory factorySlave) throws Exception
{
Map<Object, SqlSessionFactory> sqlSessionFactoryMap = new HashMap<>();
sqlSessionFactoryMap.put(DruidConfig.MASTER, factoryMaster);
sqlSessionFactoryMap.put(DruidConfig.SLAVE, factorySlave);
DynamicSqlSessionTemplate customSqlSessionTemplate = new DynamicSqlSessionTemplate(factoryMaster);
customSqlSessionTemplate.setTargetSqlSessionFactorys(sqlSessionFactoryMap);
return customSqlSessionTemplate;
}
}
八、运行测试
我简单写了个服务类,在插入的最后手动抛出一个zero的异常。
@Transactional(rollbackFor = Exception.class)
public void handle(){
Log log = new Log();
log.setContent("主数据源");
masterMapper.insert(log);
Logger logger = new Logger();
logger.setContent("从数据库源");
slaveMapper.insert(logger);
int a = 1/0;
}
运行结果如下:
数据库情况:
都进行了回滚,没有问题。
九、Gitee源码
源码地址:SpringBoot整合Atomikos实现多数据源分布式事务
十、总结
以上就是我目前对于SpringBoot整合Atomikos实现多数据源分布式事务的操作过程和个人理解,如有问题,欢迎评论区讨论!