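1. Introduction
1.1 Business requirements
When we covered registration and login earlier, we skipped over one important technical point: transactions that span multiple data sources.
Under our business requirements, the monitor service may need to operate on both the monitoring-center database and the enterprise-center databases, and we want those operations to run in a single transaction. In addition, the enterprise center has several databases, so we need a mechanism that switches between them automatically.
1.2 Choosing a multi-data-source transaction technology
Switching among several databases can be done by using AbstractRoutingDataSource as the data source, but supporting transactions on top of it is not so simple. For multi-data-source transactions, the first thing that comes to mind is distributed transactions, and Alibaba's seata framework handles them well. A distributed transaction is one whose work may be spread across different servers: every server has to finish its part before anything is committed, and if any server fails, all of them roll back together.
Our requirement, however, is only a single server operating on several data sources. Bringing in seata for that would add a separate transaction-coordinator server, which clearly raises operational cost. So we plan to implement multi-data-source transactions with atomikos.
1.3 About atomikos
Atomikos is a lightweight distributed transaction manager for Java that conforms to the XA and JTA (Java Transaction API) specifications. Some posts say atomikos performs poorly because it holds row locks. But ask honestly: does our business really write to the same row concurrently?
Of course, a later major chapter may also cover a seata-based approach. For the current stage of learning, though, we will go with Atomikos.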
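To make the "all commit or all roll back" rule concrete, here is a minimal two-phase-commit sketch in plain Java. It is only a toy model of what an XA transaction manager coordinates; `TwoPhaseResource`, `RecordingResource`, and `TwoPhaseCoordinator` are hypothetical names and do not come from Atomikos or JTA.

```java
import java.util.List;

/** Hypothetical stand-in for one transactional resource (one database). */
interface TwoPhaseResource {
    boolean prepare();   // phase 1: vote yes (true) or no (false)
    void commit();       // phase 2 when every participant voted yes
    void rollback();     // phase 2 when any participant voted no
}

/** In-memory resource that only records what happened to it. */
final class RecordingResource implements TwoPhaseResource {
    private final boolean voteYes;
    String state = "ACTIVE";

    RecordingResource(boolean voteYes) { this.voteYes = voteYes; }

    @Override public boolean prepare() {
        state = voteYes ? "PREPARED" : "FAILED";
        return voteYes;
    }
    @Override public void commit() { state = "COMMITTED"; }
    @Override public void rollback() { state = "ROLLED_BACK"; }
}

/** Toy coordinator: commit only if every participant prepared successfully. */
final class TwoPhaseCoordinator {
    static boolean complete(List<? extends TwoPhaseResource> resources) {
        boolean allPrepared = true;
        for (TwoPhaseResource r : resources) {
            if (!r.prepare()) {
                allPrepared = false;
                break;
            }
        }
        // The same decision is applied to every participant.
        for (TwoPhaseResource r : resources) {
            if (allPrepared) {
                r.commit();
            } else {
                r.rollback();
            }
        }
        return allPrepared;
    }
}
```

A real transaction manager also writes a recovery log so that the decision survives a crash between the two phases; that part is left out here.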
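2. The challenges
After hastily wiring in atomikos, we find that inside a method annotated with @Transactional, an AbstractRoutingDataSource used as the dynamic data source cannot switch databases. To find out why, we have to read the source code.
First, we configure an AbstractRoutingDataSource the way most online tutorials do, then step through it in a debugger to see why it cannot switch databases.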
@Slf4j
@Configuration
public class DataSourceConfig {
@Bean
public DataSource busyDataSource(){
MultiDataSource multiDataSource = new MultiDataSource();
multiDataSource.init();
return multiDataSource;
}
public class MultiDataSource extends AbstractRoutingDataSource{
public static ThreadLocal<String> curKey = new ThreadLocal<>();
public void init(){
HikariDataSource dataSourceMonitor = DataSourceBuilder.create().type(HikariDataSource.class)
.driverClassName("com.mysql.cj.jdbc.Driver")
.url("jdbc:mysql://192.168.0.64:3306/study2024-class007-monitor?useUnicode=true&characterEncoding=utf-8")
.username("dbMgr")
.password("???@7")
.build();
HikariDataSource dataSource1 = DataSourceBuilder.create().type(HikariDataSource.class)
.driverClassName("com.mysql.cj.jdbc.Driver")
.url("jdbc:mysql://192.168.0.64:3306/study2024-class007-busy001?useUnicode=true&characterEncoding=utf-8")
.username("dbMgr")
.password("???@7")
.build();
HikariDataSource dataSource2 = DataSourceBuilder.create().type(HikariDataSource.class)
.driverClassName("com.mysql.cj.jdbc.Driver")
.url("jdbc:mysql://192.168.0.64:3306/study2024-class007-busy002?useUnicode=true&characterEncoding=utf-8")
.username("dbMgr")
.password("???@7")
.build();
Map<Object, Object> targetDataSources = new HashMap<>();
targetDataSources.put("monitor", dataSourceMonitor);
targetDataSources.put("busy001", dataSource1);
targetDataSources.put("busy002", dataSource2);
setTargetDataSources(targetDataSources);
setDefaultTargetDataSource(dataSourceMonitor);
curKey.set("monitor");
}
@Override
protected Object determineCurrentLookupKey() {
return curKey.get();
}
public static void changeDb(String pKey){
curKey.set(pKey);
}
}
}
We create a table named test_table and write a simple piece of test logic:
@RequiredArgsConstructor
@Service
public class TestCurdServiceImpl implements ITestCurdService {
private final IGenMonitorTestDbService mGenMonitorTestDbService;
private final IGenBusyTestDbService mGenBusyTestDbService;
@Transactional(rollbackFor = Exception.class)
@Override
public void mpSave(TestEntityDto pTestDto) {
GenMonitorTestEntity genMonitorTestEntity = DbDtoEntityUtil.createFromDto(pTestDto,GenMonitorTestEntity.class);
GenBusyTestEntity genBusyTestEntity = DbDtoEntityUtil.createFromDto(pTestDto,GenBusyTestEntity.class);
genBusyTestEntity.setEnpId(0L);
genBusyTestEntity.setEnpCode("sys");
genMonitorTestEntity.setEnpId(0L);
genMonitorTestEntity.setEnpCode("sys");
DataSourceConfig.MultiDataSource.changeDb("monitor");
mGenMonitorTestDbService.save(genMonitorTestEntity);
DataSourceConfig.MultiDataSource.changeDb("busy001");
mGenBusyTestDbService.save(genBusyTestEntity);
DataSourceConfig.MultiDataSource.changeDb("busy002");
mGenBusyTestDbService.save(genBusyTestEntity);
}
}
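In Spring Boot, @EnableTransactionManagement pulls in TransactionManagementConfigurationSelector via @Import, and after a series of steps ProxyTransactionManagementConfiguration ends up registering a TransactionInterceptor bean. Its base class, TransactionAspectSupport, holds the core of Spring's AOP-based transaction support; the method that intercepts our @Transactional methods is invokeWithinTransaction. Tracing further, we find that doBegin in DataSourceTransactionManager obtains a connection from the DataSource and turns off auto-commit. The connection it obtains is stored in a ConnectionHolder.
Every MyBatis statement ultimately goes through prepareStatement in MyBatis's SimpleExecutor, which calls transaction.getConnection() before each statement. When that executor is instantiated, the transaction it is given is a SpringManagedTransaction, whose getConnection looks like this: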
public Connection getConnection() throws SQLException {
    if (this.connection == null) {
        openConnection();
    }
    return this.connection;
}
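As we can see, once a connection has been obtained it is never fetched again. doBegin already obtained one when the transaction started, so SQL executed inside the annotated method keeps reusing that connection and never asks the routing data source again.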
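The effect of that null check can be reproduced without Spring or MyBatis. The sketch below is a toy model under stated assumptions: `ToyRoutingSource`, `CachingTransaction`, and `CachedConnectionDemo` are hypothetical classes, and a "connection" is just a string naming the database it points to.

```java
import java.util.Map;

/** Toy routing source: hands out the "connection" for the current thread's key. */
final class ToyRoutingSource {
    static final ThreadLocal<String> KEY = ThreadLocal.withInitial(() -> "monitor");
    private final Map<String, String> targets = Map.of(
            "monitor", "conn->monitor",
            "busy001", "conn->busy001");

    String getConnection() {
        return targets.get(KEY.get());
    }
}

/** Mirrors the null check above: the first connection is kept for the whole transaction. */
final class CachingTransaction {
    private final ToyRoutingSource source;
    private String connection;

    CachingTransaction(ToyRoutingSource source) {
        this.source = source;
    }

    String getConnection() {
        if (connection == null) {
            connection = source.getConnection();
        }
        return connection;
    }
}

final class CachedConnectionDemo {
    /** Returns the connection seen by the second statement after the key was switched. */
    static String secondStatementConnection() {
        CachingTransaction tx = new CachingTransaction(new ToyRoutingSource());
        try {
            ToyRoutingSource.KEY.set("monitor");
            tx.getConnection();                   // first statement binds the monitor connection
            ToyRoutingSource.KEY.set("busy001");  // the attempted database switch
            return tx.getConnection();            // the cached connection is returned again
        } finally {
            ToyRoutingSource.KEY.remove();
        }
    }
}
```

Changing the key has no effect here because the routing source is only consulted on the first call, which is the behavior observed in the debugger.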
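We also notice that when SimpleExecutor is created, MyBatis uses the information held in its Configuration class to decide which transaction factory, and therefore which Transaction implementation, to use.
So if we write our own Transaction implementation that handles the multi-data-source case, the problem should be solved.
There is a second problem: creating beans the traditional way with the @Bean annotation does not let us create beans dynamically, in bulk, from configuration. We also want this code to live in the company's shared repository and be referenced by business projects. For that we need Spring's programmatic bean registration.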
3. Implementation
3.1 pom dependencies
<dependency>
    <groupId>com.atomikos</groupId>
    <artifactId>transactions-spring-boot3-starter</artifactId>
    <version>${atomikos.version}</version>
</dependency>
<!-- the version property belongs in the pom's properties section -->
<properties>
    <atomikos.version>6.0.0</atomikos.version>
</properties>
3.2 MultiDataSourceTransaction
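As mentioned above, the reason we cannot switch databases lies in how the MyBatis Transaction is configured. We write a Transaction implementation specifically for AbstractRoutingDataSource, together with a factory that creates it: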
@Slf4j
public class MultiDataSourceTransaction implements Transaction {
private ZfRoutingDataSource mZfRoutingDataSource;
private Map<String,MultiDataSourceConnectionInfo> connectionMap;
public MultiDataSourceTransaction(DataSource pDataSource) {
if(pDataSource instanceof ZfRoutingDataSource zfRoutingDataSource){
mZfRoutingDataSource = zfRoutingDataSource;
connectionMap = new ConcurrentHashMap<String,MultiDataSourceConnectionInfo>();
}else{
throw new ServiceException("The DataSource passed in must be a ZfRoutingDataSource");
}
}
@Override
public Connection getConnection() throws SQLException {
String curKey = mZfRoutingDataSource.curKey();
MultiDataSourceConnectionInfo multiDataSourceConnectionInfo = connectionMap.get(curKey);
if(null == multiDataSourceConnectionInfo) {
multiDataSourceConnectionInfo = new MultiDataSourceConnectionInfo();
DataSource targetDataSource = mZfRoutingDataSource.getTargetDataSource();
Connection connection = DataSourceUtils.getConnection(targetDataSource);
multiDataSourceConnectionInfo.setDataSource(targetDataSource);
multiDataSourceConnectionInfo.setConnection(connection);
multiDataSourceConnectionInfo.setAutoCommit(connection.getAutoCommit());
multiDataSourceConnectionInfo.setConnectionTransactional(DataSourceUtils.isConnectionTransactional(connection, targetDataSource));
connectionMap.put(curKey, multiDataSourceConnectionInfo);
}
return multiDataSourceConnectionInfo.getConnection();
}
@Override
public void commit() throws SQLException {
for(MultiDataSourceConnectionInfo multiDataSourceConnectionInfo : connectionMap.values()) {
if(!multiDataSourceConnectionInfo.isConnectionTransactional() && !multiDataSourceConnectionInfo.isAutoCommit()){
Connection targetConnection = multiDataSourceConnectionInfo.getConnection();
targetConnection.commit();
}
}
}
@Override
public void rollback() throws SQLException {
for(MultiDataSourceConnectionInfo multiDataSourceConnectionInfo : connectionMap.values()) {
if(!multiDataSourceConnectionInfo.isConnectionTransactional() && !multiDataSourceConnectionInfo.isAutoCommit()){
Connection targetConnection = multiDataSourceConnectionInfo.getConnection();
targetConnection.rollback();
}
}
}
@Override
public void close() throws SQLException {
for(MultiDataSourceConnectionInfo multiDataSourceConnectionInfo : connectionMap.values()) {
DataSourceUtils.releaseConnection(multiDataSourceConnectionInfo.getConnection(), multiDataSourceConnectionInfo.getDataSource());
}
connectionMap.clear();
}
@Override
public Integer getTimeout() throws SQLException {
var holder = (ConnectionHolder) TransactionSynchronizationManager.getResource(mZfRoutingDataSource);
if (holder != null && holder.hasTimeout()) {
return holder.getTimeToLiveInSeconds();
}
return null;
}
}
public class MultiDataSourceTransactionFactory extends SpringManagedTransactionFactory {
@Override
public Transaction newTransaction(DataSource dataSource, TransactionIsolationLevel level, boolean autoCommit) {
return new MultiDataSourceTransaction(dataSource);
}
}
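The core idea of MultiDataSourceTransaction, one cached connection per routing key instead of one per transaction, can be modeled with the JDK alone. This is a simplified sketch, not the class above: `ToyConnection` and `PerKeyTransaction` are hypothetical, no JDBC is involved, and the checks on isConnectionTransactional and isAutoCommit are left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Toy connection that only records how the transaction ended for it. */
final class ToyConnection {
    final String db;
    String outcome = "OPEN";

    ToyConnection(String db) {
        this.db = db;
    }
}

/** Keeps one connection per routing key, in the spirit of connectionMap. */
final class PerKeyTransaction {
    private final Supplier<String> currentKey;
    private final Map<String, ToyConnection> connections = new LinkedHashMap<>();

    PerKeyTransaction(Supplier<String> currentKey) {
        this.currentKey = currentKey;
    }

    ToyConnection getConnection() {
        // Open a connection the first time a key is seen, reuse it afterwards.
        return connections.computeIfAbsent(currentKey.get(), ToyConnection::new);
    }

    void commit() {
        connections.values().forEach(c -> c.outcome = "COMMITTED");
    }

    void rollback() {
        connections.values().forEach(c -> c.outcome = "ROLLED_BACK");
    }

    /** Keys of every database touched so far, in first-use order. */
    List<String> touchedDatabases() {
        return new ArrayList<>(connections.keySet());
    }
}
```

Note that looping over connections and committing them one by one is not atomic on its own. In this article's setup, the all-or-nothing guarantee across databases is expected to come from the XA data sources enlisted with Atomikos, not from this loop.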
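3.3 Mapper code structure
First, let's pin down the requirement. There are two kinds of databases: the monitoring-center database, which has only one instance, and the enterprise-center databases, of which there are several instances.
During development we can keep the monitoring-center mappers and the enterprise-center mappers in separate packages. That way the monitoring center can be configured as a single data source and the enterprise center as a multi-data-source group, each with its own SqlSessionFactory: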
@MapperScans(value = {
    @MapperScan(value = {"indi.zhifa.study2024.nbr.monitor.gen.monitor.**.mapper"}, sqlSessionFactoryRef = "sqlSessionFactory_monitor"),
    @MapperScan(value = {"indi.zhifa.study2024.nbr.monitor.gen.busy.**.mapper"}, sqlSessionFactoryRef = "sqlSessionFactory_busy"),
})
3.4 RoutingDataSource
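For the multi-data-source side, we take into account that some projects may have several different groups of routed data sources, for example the common order center, coupon center, and product-definition center. So we write our own memo, a registry that keeps one routing data source per group.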
public class ZfRoutingDataSource extends AbstractRoutingDataSource {
ThreadLocal<String> curDbKey = new ThreadLocal<String>();
@Override
protected Object determineCurrentLookupKey() {
return curDbKey.get();
}
public String curKey(){
return curDbKey.get();
}
public void set(String pKey) {
curDbKey.set(pKey);
}
public void clear() {
curDbKey.remove();
}
public DataSource getTargetDataSource() {
return determineTargetDataSource();
}
}
public class RoutingDataSourceMemo {
private Map<String,ZfRoutingDataSource> mRoutingDataSource;
public RoutingDataSourceMemo() {
mRoutingDataSource = new ConcurrentHashMap<String,ZfRoutingDataSource>();
}
public ZfRoutingDataSource createRoutingDataSource(String pKey, Map<Object, Object> pDataSourceMap, DataSource pPrimaryDataSource) {
ZfRoutingDataSource zfRoutingDataSource = new ZfRoutingDataSource();
zfRoutingDataSource.setTargetDataSources(pDataSourceMap);
zfRoutingDataSource.setDefaultTargetDataSource(pPrimaryDataSource);
zfRoutingDataSource.initialize();
mRoutingDataSource.put(pKey, zfRoutingDataSource);
return zfRoutingDataSource;
}
public ZfRoutingDataSource get(String pKey) {
return Optional.<ZfRoutingDataSource>ofNullable(mRoutingDataSource.get(pKey)).
orElseThrow(()->new ServiceException("No data source found for key "+pKey));
}
}
@Configuration
public class RoutingDataSourceMemoConfigure {
@Bean
RoutingDataSourceMemo routingDataSourceMemo(){
return new RoutingDataSourceMemo();
}
}
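Because the routing key lives in a ThreadLocal, a key that is set but never cleared can leak into the next task that runs on the same pooled thread. One way to guard against that is to scope the key with try/finally. The helper below is a sketch only: `DbKeyScope` is a hypothetical class that keeps its own ThreadLocal rather than calling ZfRoutingDataSource, so it can run on the JDK alone.

```java
import java.util.function.Supplier;

/** Hypothetical helper that scopes the routing key to one block of work. */
final class DbKeyScope {
    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    /** The key in effect for the current thread, or null when none is set. */
    static String current() {
        return CURRENT.get();
    }

    /** Runs the action with the given key, then restores whatever key was set before. */
    static <T> T with(String key, Supplier<T> action) {
        String previous = CURRENT.get();
        CURRENT.set(key);
        try {
            return action.get();
        } finally {
            if (previous == null) {
                CURRENT.remove();   // nothing to restore: do not leak the key to later tasks
            } else {
                CURRENT.set(previous);
            }
        }
    }
}
```

In the real project the same pattern would presumably wrap calls to set(...) and clear() on the ZfRoutingDataSource obtained from RoutingDataSourceMemo, for instance inside an aspect or a small utility method.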
3.5 Bean registration
3.5.1 Common interfaces
ImportBeanDefinitionRegistrar
This interface is generally used together with @Import.
It is typically used to import bean definitions and declares two method overloads:
void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry,
        BeanNameGenerator importBeanNameGenerator);
void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry);
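An implementation can read the annotations on the importing class and, based on their attributes, register bean definitions in bulk, including the definition of a post-processor that will register further beans later.
A class that implements this interface may also implement the following four Aware interfaces to obtain key objects from the Spring container: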
- EnvironmentAware
- BeanFactoryAware
- BeanClassLoaderAware
- ResourceLoaderAware
BeanDefinitionRegistryPostProcessor
This interface is generally used to register the actual bean definitions in bulk. It also exposes two methods:
void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException;
@Override
default void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
}
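The first method registers bean definitions with the container; the second receives the bean factory itself and can, for example, register bean instances directly.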
FactoryBean
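This interface lets the construction of a bean be encapsulated and deferred. Many beans can only be instantiated after other beans exist, so creation is delegated to a factory whose getObject is called when the product is actually needed. Strictly speaking, Spring tracks beans through their bean definitions; a FactoryBean is itself a bean, and what the container exposes under its name is the object returned by getObject.
Its core method is: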
T getObject() throws Exception;
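The split between "register a recipe now" and "build the object later" can be illustrated with a toy registry. This is not Spring's API: `ToyLazyRegistry` is a hypothetical class, and `Supplier` stands in for the role a FactoryBean's getObject plays.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Toy registry: recipes are registered first, objects are built on first lookup. */
final class ToyLazyRegistry {
    private final Map<String, Supplier<?>> definitions = new HashMap<>();
    private final Map<String, Object> singletons = new HashMap<>();

    /** Comparable to registering a bean definition: nothing is instantiated yet. */
    void register(String name, Supplier<?> factory) {
        definitions.put(name, factory);
    }

    /** Comparable to a bean lookup: runs the factory once and caches the product. */
    Object get(String name) {
        Object existing = singletons.get(name);
        if (existing != null) {
            return existing;
        }
        Supplier<?> factory = definitions.get(name);
        if (factory == null) {
            throw new IllegalArgumentException("No definition named " + name);
        }
        // The factory may call get(...) for its own dependencies before returning.
        Object created = factory.get();
        singletons.put(name, created);
        return created;
    }

    boolean isCreated(String name) {
        return singletons.containsKey(name);
    }
}
```

Because a factory can look up other names while it runs, a session factory recipe can depend on a data source recipe without either being built at registration time, which is the property the following sections rely on.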
3.5.2 MultiDataSourceConfigurerRegister
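Now we start writing the code for this chapter's requirement. This registrar registers with the container a post-processor that in turn registers the data sources and SqlSessionFactory beans in bulk.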
@Slf4j
public class MultiDataSourceConfigurerRegister implements ImportBeanDefinitionRegistrar,
EnvironmentAware, ResourceLoaderAware, BeanFactoryAware {
private Environment mEnvironment;
private ResourceLoader mResourceLoader;
private BeanFactory mBeanFactory;
public MultiDataSourceConfigurerRegister() {
}
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata,
BeanDefinitionRegistry registry,
BeanNameGenerator importBeanNameGenerator) {
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(MultiDataSourceConfigurer.class);
builder.addConstructorArgValue(mBeanFactory);
builder.addConstructorArgValue(mResourceLoader);
builder.addConstructorArgValue(mEnvironment);
builder.addConstructorArgReference("routingDataSourceMemo");
registry.registerBeanDefinition(MultiDataSourceConfigurer.class.getName(),builder.getBeanDefinition());
}
@Override
public void setEnvironment(Environment pEnvironment) {
mEnvironment = pEnvironment;
}
@Override
public void setResourceLoader(ResourceLoader pResourceLoader) {
mResourceLoader = pResourceLoader;
}
@Override
public void setBeanFactory(BeanFactory pBeanFactory) throws BeansException {
mBeanFactory = pBeanFactory;
}
}
3.5.3 MultiDataSourceConfigurer
This class registers the FactoryBean definitions for the DataSource and SqlSessionFactory beans.
@Slf4j
public class MultiDataSourceConfigurer implements BeanDefinitionRegistryPostProcessor, InitializingBean, ApplicationContextAware {
private BeanFactory mBeanFactory;
private ResourceLoader mResourceLoader;
private Environment mEnvironment;
private ApplicationContext mApplicationContext;
static final String DATASOURCE_PREFIX = "datasource_";
static final String SQL_SESSION_FACTORY_PREFIX = "sqlSessionFactory_";
static final String SQL_SESSION_TEMPLATE_PREFIX = "sqlSessionTemplate_";
RoutingDataSourceMemo mRoutingDataSourceMemo;
public MultiDataSourceConfigurer(BeanFactory pBeanFactory,
ResourceLoader pResourceLoader,
Environment pEnvironment,
RoutingDataSourceMemo pRoutingDataSourceMemo) {
mBeanFactory = pBeanFactory;
mResourceLoader = pResourceLoader;
mEnvironment = pEnvironment;
mRoutingDataSourceMemo = pRoutingDataSourceMemo;
}
@Override
public void afterPropertiesSet() throws Exception {
}
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
String profile = mEnvironment.getProperty("spring.profiles.active");
String url = mEnvironment.getProperty("spring.datasource.url");
String userName = mEnvironment.getProperty("spring.datasource.username");
String password = mEnvironment.getProperty("spring.datasource.password");
SingleConnectionDataSource singleConnectionDataSource = new SingleConnectionDataSource(
url, userName,password,false);
List<Map<String,Object>> res;
try {
JdbcTemplate jdbcTemplate = new JdbcTemplate(singleConnectionDataSource);
res = jdbcTemplate.queryForList("select * from sys_db where profile = ?",profile);
} finally {
// Close the bootstrap connection once the configuration rows have been read.
singleConnectionDataSource.destroy();
}
List<SysDbEntity> sysDbEntityList = res.stream().map(m-> BeanUtil.toBean(m,SysDbEntity.class)).collect(Collectors.toList());
Map<String,List<SysDbEntity>> sysDbEntityListMap = sysDbEntityList.stream().collect(Collectors.groupingBy(SysDbEntity::getDatasourceGroup));
for(Map.Entry<String,List<SysDbEntity>> entry : sysDbEntityListMap.entrySet()){
String datasourceGroup = entry.getKey();
List<SysDbEntity> list = entry.getValue();
SysDbEntity first = list.get(0);
switch (first.getDatasourceType()){
case 0 ->{
AbstractBeanDefinition singleDataSourceBeanDef = genSingleDataSourceFactorBean(first);
registry.registerBeanDefinition(DATASOURCE_PREFIX+datasourceGroup,singleDataSourceBeanDef);
AbstractBeanDefinition singleSqlSessionFactorBean = genSqlSessionFactorBean(EDataSourceType.SINGLE,datasourceGroup);
registry.registerBeanDefinition(SQL_SESSION_FACTORY_PREFIX+datasourceGroup,singleSqlSessionFactorBean);
}
case 1 ->{
AbstractBeanDefinition multiDataSourceBeanDef = genMultiDataSourceFactorBean(datasourceGroup,list,mRoutingDataSourceMemo);
registry.registerBeanDefinition(DATASOURCE_PREFIX+datasourceGroup,multiDataSourceBeanDef);
AbstractBeanDefinition multiSqlSessionFactorBean = genSqlSessionFactorBean(EDataSourceType.MULTI,datasourceGroup);
registry.registerBeanDefinition(SQL_SESSION_FACTORY_PREFIX+datasourceGroup,multiSqlSessionFactorBean);
}
}
}
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
mApplicationContext = applicationContext;
}
public AbstractBeanDefinition genSingleDataSourceFactorBean(SysDbEntity pSysDbEntity){
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(SingleDataSourceFactorBean.class);
builder.addConstructorArgValue(pSysDbEntity);
return builder.getBeanDefinition();
}
public AbstractBeanDefinition genMultiDataSourceFactorBean(String pKey,List<SysDbEntity> pSysDbEntityList,RoutingDataSourceMemo pRoutingDataSourceMemo){
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(MultiDataSourceFactorBean.class);
builder.addConstructorArgValue(pKey);
builder.addConstructorArgValue(pSysDbEntityList);
builder.addConstructorArgValue(pRoutingDataSourceMemo);
return builder.getBeanDefinition();
}
public AbstractBeanDefinition genSqlSessionFactorBean(EDataSourceType pDataSourceType, String pDataSourceName){
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(SqlSessionFactorBean.class);
builder.addConstructorArgValue(pDataSourceType);
builder.addConstructorArgReference(DATASOURCE_PREFIX + pDataSourceName);
ObjectProvider<MybatisPlusProperties> mybatisPlusPropertiesProvider = mBeanFactory.getBeanProvider(MybatisPlusProperties.class);
builder.addConstructorArgValue(mybatisPlusPropertiesProvider);
builder.addConstructorArgValue(mResourceLoader);
builder.addConstructorArgValue(mApplicationContext);
ObjectProvider<Interceptor> interceptorsProvider = mBeanFactory.getBeanProvider(Interceptor.class);
builder.addConstructorArgValue(interceptorsProvider);
ObjectProvider<TypeHandler> typeHandlerProvider = mBeanFactory.getBeanProvider(TypeHandler.class);
builder.addConstructorArgValue(typeHandlerProvider);
ObjectProvider<LanguageDriver> languageDriverProvider = mBeanFactory.getBeanProvider(LanguageDriver.class);
builder.addConstructorArgValue(languageDriverProvider);
ObjectProvider<DatabaseIdProvider> databaseIdProviderProvider = mBeanFactory.getBeanProvider(DatabaseIdProvider.class);
builder.addConstructorArgValue(databaseIdProviderProvider);
ResolvableType configurationCustomizerTargetType = ResolvableType.forClassWithGenerics(List.class, ConfigurationCustomizer.class);
ObjectProvider<List<ConfigurationCustomizer>> configurationCustomizerListProvider = mBeanFactory.getBeanProvider(configurationCustomizerTargetType);
builder.addConstructorArgValue(configurationCustomizerListProvider);
ResolvableType sqlSessionFactoryBeanCustomizerTargetType = ResolvableType.forClassWithGenerics(List.class, SqlSessionFactoryBeanCustomizer.class);
ObjectProvider<List<SqlSessionFactoryBeanCustomizer>> sqlSessionFactoryBeanCustomizerListProvider = mBeanFactory.getBeanProvider(sqlSessionFactoryBeanCustomizerTargetType);
builder.addConstructorArgValue(sqlSessionFactoryBeanCustomizerListProvider);
ResolvableType mybatisPlusPropertiesCustomizerTargetType = ResolvableType.forClassWithGenerics(List.class, MybatisPlusPropertiesCustomizer.class);
ObjectProvider<List<MybatisPlusPropertiesCustomizer>> mybatisPlusPropertiesCustomizerProvider = mBeanFactory.getBeanProvider(mybatisPlusPropertiesCustomizerTargetType);
builder.addConstructorArgValue(mybatisPlusPropertiesCustomizerProvider);
// Explicitly add a dependency on the configuration class
//builder.addDependsOn("mybatisPlusInterceptor");
return builder.getBeanDefinition();
}
}
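The grouping step in postProcessBeanDefinitionRegistry decides which bean names end up in the container, and those names must match the sqlSessionFactoryRef values used in @MapperScan. The sketch below replays that naming logic on in-memory rows. `DbRow` and `BeanNamePlanner` are hypothetical; `DbRow` is a trimmed-down stand-in for SysDbEntity, and the sample rows in the usage note are invented for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, trimmed-down view of one sys_db row. */
final class DbRow {
    final String datasourceGroup;
    final int datasourceType;   // 0 = single data source, 1 = routed group
    final String dbKey;

    DbRow(String datasourceGroup, int datasourceType, String dbKey) {
        this.datasourceGroup = datasourceGroup;
        this.datasourceType = datasourceType;
        this.dbKey = dbKey;
    }
}

final class BeanNamePlanner {
    static final String DATASOURCE_PREFIX = "datasource_";
    static final String SQL_SESSION_FACTORY_PREFIX = "sqlSessionFactory_";

    /** Returns bean name -> short description, in registration order. */
    static Map<String, String> plan(List<DbRow> rows) {
        Map<String, List<DbRow>> byGroup = new LinkedHashMap<>();
        for (DbRow row : rows) {
            byGroup.computeIfAbsent(row.datasourceGroup, g -> new ArrayList<>()).add(row);
        }
        Map<String, String> beans = new LinkedHashMap<>();
        byGroup.forEach((group, list) -> {
            // As in the configurer, the first row of a group decides the group's type.
            boolean routed = list.get(0).datasourceType == 1;
            String kind = routed ? "routing over " + list.size() + " databases" : "single database";
            beans.put(DATASOURCE_PREFIX + group, kind);
            beans.put(SQL_SESSION_FACTORY_PREFIX + group, "uses " + DATASOURCE_PREFIX + group);
        });
        return beans;
    }
}
```

With one row in group "monitor" (type 0) and two rows in group "busy" (type 1), the plan contains datasource_monitor, sqlSessionFactory_monitor, datasource_busy, and sqlSessionFactory_busy, which lines up with the two @MapperScan entries shown in section 3.3.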
3.5.4 FactoryBeans for the DataSource
public class SingleDataSourceFactorBean implements FactoryBean<DataSource> {
private final SysDbEntity mSysDbEntity;
public SingleDataSourceFactorBean(SysDbEntity pSysDbEntity) {
mSysDbEntity = pSysDbEntity;
}
@Override
public DataSource getObject() throws Exception {
MysqlXADataSource dataSource = DataSourceBuilder.create().type(MysqlXADataSource.class)
.url(mSysDbEntity.getDbUrl())
.username(mSysDbEntity.getDbUser())
.password(mSysDbEntity.getDbPasswd())
.build();
AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
atomikosDataSourceBean.setUniqueResourceName(mSysDbEntity.getDbKey());
atomikosDataSourceBean.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");
atomikosDataSourceBean.setXaDataSource(dataSource);
Optional.ofNullable(mSysDbEntity.getMinIdle()).ifPresent(atomikosDataSourceBean::setMinPoolSize);
Optional.ofNullable(mSysDbEntity.getMaxPoolSize()).ifPresent(atomikosDataSourceBean::setMaxPoolSize);
return atomikosDataSourceBean;
}
@Override
public Class<?> getObjectType() {
return DataSource.class;
}
}
public class MultiDataSourceFactorBean implements FactoryBean<DataSource> {
private final String mKey;
private final List<SysDbEntity> mSysDbEntityList;
private final RoutingDataSourceMemo mRoutingDataSourceMemo;
public MultiDataSourceFactorBean(String pKey, List<SysDbEntity> pSysDbEntityList, RoutingDataSourceMemo pRoutingDataSourceMemo) {
mKey = pKey;
mSysDbEntityList = pSysDbEntityList;
mRoutingDataSourceMemo = pRoutingDataSourceMemo;
}
@Override
public DataSource getObject() throws Exception {
Map<Object, Object> dataSourceMap = new HashMap<>();
DataSource primaryDataSource = null;
for (SysDbEntity sysDbEntity : mSysDbEntityList) {
MysqlXADataSource dataSource = DataSourceBuilder.create().type(MysqlXADataSource.class)
.url(sysDbEntity.getDbUrl())
.username(sysDbEntity.getDbUser())
.password(sysDbEntity.getDbPasswd())
.build();
AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
atomikosDataSourceBean.setUniqueResourceName(sysDbEntity.getDbKey());
atomikosDataSourceBean.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");
atomikosDataSourceBean.setXaDataSource(dataSource);
Optional.ofNullable(sysDbEntity.getMinIdle()).ifPresent(atomikosDataSourceBean::setMinPoolSize);
Optional.ofNullable(sysDbEntity.getMaxPoolSize()).ifPresent(atomikosDataSourceBean::setMaxPoolSize);
dataSourceMap.put(sysDbEntity.getDbKey(), atomikosDataSourceBean);
if(Boolean.TRUE.equals(sysDbEntity.getPrimary())){
primaryDataSource = atomikosDataSourceBean;
}
}
ZfRoutingDataSource dataSourceRouter = mRoutingDataSourceMemo.createRoutingDataSource(mKey,dataSourceMap,primaryDataSource);
return dataSourceRouter;
}
@Override
public Class<?> getObjectType() {
return DataSource.class;
}
}
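3.5.5 FactoryBean for the SqlSessionFactory
This class deserves special attention. mybatis-plus implements its extensions to MyBatis through the SqlSessionFactory, so simply creating a MybatisSqlSessionFactoryBean with new and calling getObject would leave all mybatis-plus features unusable. The code below is copied from mybatis-plus's MybatisPlusAutoConfiguration and then modified.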
public class SqlSessionFactorBean implements FactoryBean<SqlSessionFactory> {
private final DataSource mDataSource;
private final ObjectProvider<MybatisPlusProperties> mPropertiesProvider;
private final ResourceLoader mResourceLoader;
private final ApplicationContext mApplicationContext;
private final EDataSourceType mDataSourceType;
/* fields used by mybatis-plus */
private final ObjectProvider<Interceptor> mInterceptorsProvider;
private final ObjectProvider<TypeHandler> mTypeHandlersProvider;
private final ObjectProvider<LanguageDriver> mLanguageDriversProvider;
private final ObjectProvider<DatabaseIdProvider> mDatabaseIdProviderProvider;
private final ObjectProvider<List<ConfigurationCustomizer>> mConfigurationCustomizersProvider;
private final ObjectProvider<List<SqlSessionFactoryBeanCustomizer>> mSqlSessionFactoryBeanCustomizersProvider;
private final ObjectProvider<List<MybatisPlusPropertiesCustomizer>> mMybatisPlusPropertiesCustomizersProvider;
private Interceptor[] mInterceptors;
private TypeHandler[] mTypeHandlers;
private LanguageDriver[] mLanguageDrivers;
private DatabaseIdProvider mDatabaseIdProvider;
private List<ConfigurationCustomizer> mConfigurationCustomizers;
private List<SqlSessionFactoryBeanCustomizer> mSqlSessionFactoryBeanCustomizers;
private List<MybatisPlusPropertiesCustomizer> mMybatisPlusPropertiesCustomizers;
private MybatisPlusProperties mMybatisPlusProperties;
public SqlSessionFactorBean(EDataSourceType pDataSourceType,
DataSource pDataSource,
ObjectProvider<MybatisPlusProperties> propertiesProvider,
ResourceLoader resourceLoader,
ApplicationContext applicationContext,
ObjectProvider<Interceptor> interceptorsProvider,
ObjectProvider<TypeHandler> typeHandlersProvider,
ObjectProvider<LanguageDriver> languageDriversProvider,
ObjectProvider<DatabaseIdProvider> databaseIdProvider,
ObjectProvider<List<ConfigurationCustomizer>> configurationCustomizersProvider,
ObjectProvider<List<SqlSessionFactoryBeanCustomizer>> sqlSessionFactoryBeanCustomizers,
ObjectProvider<List<MybatisPlusPropertiesCustomizer>> mybatisPlusPropertiesCustomizerProvider) {
mDataSourceType = pDataSourceType;
mDataSource = pDataSource;
this.mPropertiesProvider = propertiesProvider;
this.mInterceptorsProvider = interceptorsProvider;
this.mTypeHandlersProvider = typeHandlersProvider;
this.mLanguageDriversProvider = languageDriversProvider;
this.mResourceLoader = resourceLoader;
this.mDatabaseIdProviderProvider = databaseIdProvider;
this.mConfigurationCustomizersProvider = configurationCustomizersProvider;
this.mSqlSessionFactoryBeanCustomizersProvider = sqlSessionFactoryBeanCustomizers;
this.mMybatisPlusPropertiesCustomizersProvider = mybatisPlusPropertiesCustomizerProvider;
this.mApplicationContext = applicationContext;
}
@Override
public SqlSessionFactory getObject() throws Exception {
getMpBeans();
switch (mDataSourceType){
case MULTI -> {
MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();
MultiDataSourceTransactionFactory transactionFactory = new MultiDataSourceTransactionFactory();
sqlSessionFactory(bean,mDataSource);
bean.setTransactionFactory(transactionFactory);
return bean.getObject();
}
case SINGLE -> {
MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();
sqlSessionFactory(bean,mDataSource);
return bean.getObject();
}
}
return null;
}
@Override
public Class<?> getObjectType() {
return null;
}
protected void getMpBeans(){
mInterceptors = mInterceptorsProvider.stream().toArray(Interceptor[]::new);
mTypeHandlers = mTypeHandlersProvider.stream().toArray(TypeHandler[]::new);
mLanguageDrivers = mLanguageDriversProvider.stream().toArray(LanguageDriver[]::new);
mDatabaseIdProvider = mDatabaseIdProviderProvider.getIfAvailable();
mConfigurationCustomizers = mConfigurationCustomizersProvider.getIfAvailable();
mSqlSessionFactoryBeanCustomizers = mSqlSessionFactoryBeanCustomizersProvider.getIfAvailable();
mMybatisPlusPropertiesCustomizers = mMybatisPlusPropertiesCustomizersProvider.getIfAvailable();
mMybatisPlusProperties = mPropertiesProvider.getIfAvailable();
}
public MybatisSqlSessionFactoryBean sqlSessionFactory(MybatisSqlSessionFactoryBean pMybatisSqlSessionFactoryBean ,DataSource dataSource) throws Exception {
MybatisSqlSessionFactoryBean factory = pMybatisSqlSessionFactoryBean;
factory.setDataSource(dataSource);
factory.setVfs(SpringBootVFS.class);
if (StringUtils.hasText(this.mMybatisPlusProperties.getConfigLocation())) {
factory.setConfigLocation(this.mResourceLoader.getResource(this.mMybatisPlusProperties.getConfigLocation()));
}
applyConfiguration(factory);
if (this.mMybatisPlusProperties.getConfigurationProperties() != null) {
factory.setConfigurationProperties(this.mMybatisPlusProperties.getConfigurationProperties());
}
if (!ObjectUtils.isEmpty(this.mInterceptors)) {
factory.setPlugins(this.mInterceptors);
}
if (this.mDatabaseIdProvider != null) {
factory.setDatabaseIdProvider(this.mDatabaseIdProvider);
}
if (StringUtils.hasLength(this.mMybatisPlusProperties.getTypeAliasesPackage())) {
factory.setTypeAliasesPackage(this.mMybatisPlusProperties.getTypeAliasesPackage());
}
if (this.mMybatisPlusProperties.getTypeAliasesSuperType() != null) {
factory.setTypeAliasesSuperType(this.mMybatisPlusProperties.getTypeAliasesSuperType());
}
if (StringUtils.hasLength(this.mMybatisPlusProperties.getTypeHandlersPackage())) {
factory.setTypeHandlersPackage(this.mMybatisPlusProperties.getTypeHandlersPackage());
}
if (!ObjectUtils.isEmpty(this.mTypeHandlers)) {
factory.setTypeHandlers(this.mTypeHandlers);
}
if (!ObjectUtils.isEmpty(this.mMybatisPlusProperties.resolveMapperLocations())) {
factory.setMapperLocations(this.mMybatisPlusProperties.resolveMapperLocations());
}
Class<? extends LanguageDriver> defaultLanguageDriver = this.mMybatisPlusProperties.getDefaultScriptingLanguageDriver();
if (!ObjectUtils.isEmpty(this.mLanguageDrivers)) {
factory.setScriptingLanguageDrivers(this.mLanguageDrivers);
}
Optional.ofNullable(defaultLanguageDriver).ifPresent(factory::setDefaultScriptingLanguageDriver);
applySqlSessionFactoryBeanCustomizers(factory);
GlobalConfig globalConfig = this.mMybatisPlusProperties.getGlobalConfig();
this.getBeanThen(MetaObjectHandler.class, globalConfig::setMetaObjectHandler);
this.getBeanThen(AnnotationHandler.class, globalConfig::setAnnotationHandler);
this.getBeanThen(PostInitTableInfoHandler.class, globalConfig::setPostInitTableInfoHandler);
this.getBeansThen(IKeyGenerator.class, i -> globalConfig.getDbConfig().setKeyGenerators(i));
this.getBeanThen(ISqlInjector.class, globalConfig::setSqlInjector);
this.getBeanThen(IdentifierGenerator.class, globalConfig::setIdentifierGenerator);
factory.setGlobalConfig(globalConfig);
return factory;
}
private <T> void getBeanThen(Class<T> clazz, Consumer<T> consumer) {
if (this.mApplicationContext.getBeanNamesForType(clazz, false, false).length > 0) {
consumer.accept(this.mApplicationContext.getBean(clazz));
}
}
private <T> void getBeansThen(Class<T> clazz, Consumer<List<T>> consumer) {
if (this.mApplicationContext.getBeanNamesForType(clazz, false, false).length > 0) {
final Map<String, T> beansOfType = this.mApplicationContext.getBeansOfType(clazz);
List<T> clazzList = new ArrayList<>();
beansOfType.forEach((k, v) -> clazzList.add(v));
consumer.accept(clazzList);
}
}
private void applyConfiguration(MybatisSqlSessionFactoryBean factory) {
MybatisPlusProperties.CoreConfiguration coreConfiguration = this.mMybatisPlusProperties.getConfiguration();
MybatisConfiguration configuration = null;
if (coreConfiguration != null || !StringUtils.hasText(this.mMybatisPlusProperties.getConfigLocation())) {
configuration = new MybatisConfiguration();
}
if (configuration != null && coreConfiguration != null) {
coreConfiguration.applyTo(configuration);
}
if (configuration != null && !CollectionUtils.isEmpty(this.mConfigurationCustomizers)) {
for (ConfigurationCustomizer customizer : this.mConfigurationCustomizers) {
customizer.customize(configuration);
}
}
factory.setConfiguration(configuration);
}
private void applySqlSessionFactoryBeanCustomizers(MybatisSqlSessionFactoryBean factory) {
if (!CollectionUtils.isEmpty(this.mSqlSessionFactoryBeanCustomizers)) {
for (SqlSessionFactoryBeanCustomizer customizer : this.mSqlSessionFactoryBeanCustomizers) {
customizer.customize(factory);
}
}
}
}
4. Usage
Add the following annotations to the main class, and multi-data-source transactions are ready to use:
@EnableZfMultiDataSource
@MapperScans(value = {
@MapperScan(value = {"indi.zhifa.study2024.nbr.monitor.gen.monitor.**.mapper"}, sqlSessionFactoryRef = "sqlSessionFactory_monitor"),
@MapperScan(value = {"indi.zhifa.study2024.nbr.monitor.gen.busy.**.mapper"}, sqlSessionFactoryRef = "sqlSessionFactory_busy"),
}
)
For the complete code, please see my Gitee repository.