前言:对于同一个系统,不同的租户需要自己独立分隔的数据库(每个数据库的表结构可以是相同的),同时也要支持跨数据源的查询;并且支持分布式事务,如果这里不使用分库分表插件,需要怎样实现?本文采用MyBatis-Plus下的dynamic-datasource 进行实现。
MyBatis-Plus的dynamic-datasource 官网;
开始整合:
1 spring-cloud 整合多数据源:
1.1 maven pom jar包,如果启动发生问题则需要排除版本jar 包冲突的问题:
<!-- mybatis 多数据源切换依赖jar -->
<!-- https://mvnrepository.com/artifact/com.baomidou/dynamic-datasource-spring-boot-starter -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<version>3.5.1</version>
</dependency>
<!-- mybatis plus 方便后续业务开发 -->
<!-- https://mvnrepository.com/artifact/com.baomidou/mybatis-plus-boot-starter -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.1.1</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-extension</artifactId>
<version>3.4.2</version>
</dependency>
<!-- mysql的连接器 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!-- mysql的分页插件便于业务查询分页处理 -->
<dependency>
<groupId>com.github.pagehelper</groupId>
<artifactId>pagehelper</artifactId>
<version>4.1.6</version>
<exclusions>
<exclusion>
<groupId>com.github.jsqlparser</groupId>
<artifactId>jsqlparser</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.github.jsqlparser</groupId>
<artifactId>jsqlparser</artifactId>
<version>3.1</version>
</dependency>
<!-- druid的连接池 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.8</version>
</dependency>
<!-- xxl job 定时任务 便于后续定时任务的扩展,如果不需要可以手动去除 -->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.0</version>
</dependency>
<!-- web jar 便于hppt 通信 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- feign 接口 便于服务之间的通信,如果不需要可以手动去除 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<!-- lombok 自动生成get set 等方法便于开发 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
1.2 bootstrap.yml 文件配置数据源信息:
server:
port: 8010
spring:
autoconfigure:
# 排除原有的连接池装配
exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
# dynamic-datasource-spring-boot-starter 动态数据源的配置内容
datasource:
type: com.alibaba.druid.pool.DruidDataSource
dynamic:
primary: master #设置默认的数据源或者数据源组,默认值即为master
strict: false #严格匹配数据源,默认false. true未匹配到指定数据源时抛异常,false使用默认数据源
datasource:
master:
url: jdbc:mysql://localhost:3306/master
username: root
password: 123
driver-class-name: com.mysql.jdbc.Driver # 3.2.0开始支持SPI可省略此配置
slave1:
url: jdbc:mysql://localhost:3306/slaveone
username: root
password: 123
driver-class-name: com.mysql.jdbc.Driver
slave2:
url: jdbc:mysql://localhost:3306/slavetwo?useUnicode=true&characterEncoding=UTF-8&useAffectedRows=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true
username: root
password: 123
driver-class-name: com.mysql.jdbc.Driver
# druid 公共配置 参数: https://www.jianshu.com/p/f8b720737b20
druid:
# 连接池初始化大小
initialSize: 5
# 最小空闲连接数
minIdle: 5
# 最大连接数
maxActive: 30
# 配置获取连接等待超时的时间
maxWait: 60000
# 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
timeBetweenEvictionRunsMillis: 60000
# 配置一个连接在池中最小生存的时间,单位是毫秒
minEvictableIdleTimeMillis: 300000
# 配置一个连接在池中最大生存的时间,单位是毫秒
maxEvictableIdleTimeMillis: 900000
# 配置检测连接是否有效
validationQuery: SELECT 'x'
# 建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,
# 如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效
testWhileIdle: true
# 申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能
testOnBorrow: false
# 归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能
testOnReturn: false
# 是否缓存preparedStatement,也就是PSCache。PSCache对支持游标的数据库性能提升巨大,
# 比如说oracle。在mysql下建议关闭。
poolPreparedStatements: true
# 要启用PSCache,必须配置大于0,当大于0时,poolPreparedStatements自动触发修改为true
maxPoolPreparedStatementPerConnectionSize: 20
# 属性类型是字符串,通过别名的方式配置扩展插件,监控统计用的filter:stat;
# 日志用的filter:log4j防御sql注入的filter:wall
filters: stat,wall,slf4j,config
# 公用监控数据
useGlobalDataSourceStat: true
# 慢日志输出
stat:
log-slow-sql: true
merge-sql: true
# 10 秒
slow-sql-millis: 10000
# mybatis mapper 包扫描路径
mybatis-plus:
mapper-locations: classpath*:mapper/**/*.xml
# xxl job 任务地址
xxl:
job:
admin:
addresses: http://localhost:8080/xxl-job-admin
accessToken: lgx123456
executor:
appname: dev-bluegrass-dynamic-service
logpath:
logretentiondays: 10
ip: localhost
port: 9087
1.3 启动类增加路径扫描:
package org.lgx.bluegrass.bluegrassdynamic;
import org.lgx.bluegrass.api.constant.BaseConstant;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.ConfigurableApplicationContext;
@SpringBootApplication
// Feign 路径扫描
@EnableFeignClients("xxxx")
// mappper 包路径扫描
@MapperScan(basePackages ={"xxx.bluegrassdynamic.mapper"})
public class BluegrassDynamicApplication {
public static ConfigurableApplicationContext applicationContext;
public static void main(String[] args) {
applicationContext = SpringApplication.run(BluegrassDynamicApplication.class, args);
}
}
2 多数据源主动切换:
考虑方法:虽然使用@Ds 可以在类中进行切换,但是需要在每个类或者方法上增加改注解,如果增加错误或者漏加,就会切错数据库或者默认访问到主库中;那么是否可以有一个切面或者拦截器,可以在http 请求进入到方法之前,收到完成一次切换;
2.1 定义拦截器根据当前登录的用户的便签完成数据源的切换:
import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.servlet.handler.HandlerInterceptorAdapter;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@Slf4j
@Component
public class HttpRequestDynamic extends HandlerInterceptorAdapter {
final static ThreadLocal<Boolean> threadLocal = new ThreadLocal<>();
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
if (!(handler instanceof HandlerMethod)) {
// 不是 httpreuqest 请求直接放行
return true;
}
// 当前用户
String userDb = request.getHeader("userDb");
if (StringUtils.hasText(userDb)){
// 获取用户所属数据源
String dbName = getDbName(userDb);
if (StringUtils.hasText(dbName)){
// 切换数据源
DynamicDataSourceContextHolder.push(dbName);
threadLocal.set(true);
}
}
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
// 在方法执行完毕或者执行报错后,移除数据源
if (null != threadLocal.get() && threadLocal.get()){
DynamicDataSourceContextHolder.clear();
}
threadLocal.remove();
}
private String getDbName(String userDb) {
// 此处根据用户便签动态映射配置的数据库
return userDb;
}
}
2.2 将拦截器交由spring 管理:
import org.lgx.bluegrass.bluegrassdynamic.interceptor.HttpRequestDynamic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
@Configuration
public class WebConfiguration implements WebMvcConfigurer {
@Autowired
private HttpRequestDynamic httpRequestDynamic;
/**
* 拦截器配置
*
* @param registry 注册类
*/
@Override
public void addInterceptors(InterceptorRegistry registry) {
// 除了请求报错和请求静态资源外拦截一切http 请求
registry.addInterceptor(httpRequestDynamic).addPathPatterns("/**")
.excludePathPatterns(
"/file/get/*",
"/image/view/*",
"/**/error"
);
}
}
2.3 此处扩展,对xxl job 定时任务进行aop拦截:
import com.xxl.job.core.context.XxlJobHelper;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.stereotype.Component;
@Aspect
@Component
public class XxlJobHandlerMonitor {
@Pointcut("@annotation(com.xxl.job.core.handler.annotation.XxlJob)")
public void xxlJobCut() {
}
@Before(value = "xxlJobCut()")
public void permissionCheck(JoinPoint joinPoint) {
}
@Around(value = "xxlJobCut()")
public Object aroundMethod(ProceedingJoinPoint joinPoint) throws Throwable {
String param = XxlJobHelper.getJobParam();
// 解析参数
// 根据参数所带的目标表数据库,可以进行
System.out.println("\"123\" = " + "123");
return joinPoint.proceed();
}
}
2.4 此处扩展,对feign 进行拦截(可以将user的便签信息设置到header中,同各个服务的拦截器协同作用):
方式1 在feign 的http 请求发送之前,构建header:
import feign.RequestInterceptor;
import feign.RequestTemplate;
import org.lgx.bluegrass.api.constant.Constant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
/**
* @Description
* @Date 2021/4/16 15:39
* @Author lgx
* @Version 1.0
*/
@Configuration
public class FeignConfiguration implements RequestInterceptor {
private Logger logger = LoggerFactory.getLogger(FeignConfiguration.class);
@Override
public void apply(RequestTemplate requestTemplate) {
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder
.getRequestAttributes();
if (attributes != null) {
String authorization = attributes.getRequest().getHeader(Constant.X_AUTHORIZATION);
if (null != authorization) {
requestTemplate.header(Constant.X_AUTHORIZATION, authorization);
}
String dbName = attributes.getRequest().getHeader(Constant.dbName);
if (null != dbName) {
requestTemplate.header(Constant.dbName, dbName);
}
String userDb = attributes.getRequest().getHeader(Constant.userDb);
if (null != userDb) {
requestTemplate.header(Constant.userDb, userDb);
}
}
}
}
Constant.java:
public class Constant{
public static final String BASIC_AUTH_TOKEN_PREFIX = "Basic";
public static final String X_AUTHORIZATION = "X-Authorization";
public static final String dbName = "dbName";
public static final String userDb = "userDb";
}
方式二,feign 方法中增加header 参数:
feign 接口定义:
import org.lgx.bluegrass.api.service.impl.SyncFeignDynamicServiceFallBack;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
@FeignClient(name = "dynamic", url = "http://localhost:8010", fallback = SyncFeignDynamicServiceFallBack.class)
public interface SyncFeignDynamicService {
// @RequestHeader("userDb") 将用户所属数据库信息放入header 请求头中
@RequestMapping("/user/getUserById")
Object getUserById(@RequestParam("userId") Integer userId,@RequestHeader("userDb") String userDb) ;
}
controller测试方法,配合拦截器使用在进入该方法之前已经完成数据源切换:
@RequestMapping("/getUserById")
public User getUserById(@RequestParam("userId") Integer userId) {
return userService.getUserById(userId);
}
3 业务中跨数据源使用 :
3.1 增加bean 的获取类,通过改工具获取spring 容器的bean:
import org.lgx.bluegrass.bluegrassdynamic.BluegrassDynamicApplication;
import org.springframework.stereotype.Component;
@Component
public class ApplicationContextUtils {
public static <T> T getBean(String beanName){
return (T) BluegrassDynamicApplication.applicationContext.getBean(beanName);
}
public static <T> T getBean(Class classz){
return (T) BluegrassDynamicApplication.applicationContext.getBean(classz);
}
}
BluegrassDynamicApplication.applicationContext 对应启动类,在类启动后已经对其进行赋值:
3.2 service 中业务调用:
首先需要明确自己需要跨哪几个库进行数据的增删改查;然后在创建方法后,有几种选择可以完成数据源的切换;
(1)可以在方法上增加改数据源的标识:
(2)也可以手动切换数据库:
//切换数据源
* DynamicDataSourceContextHolder.push("配置的数据库名称");
* //中间的业务操作。。。
* //可以最后选择清理掉此数据源
* DynamicDataSourceContextHolder.clear();
手动切换请务必保证push和clear 的成对性;
3.2 跨表操作数据要想保证各个数据库数据的一致性,需要使用@DSTransactional 对方法进行标记,开启分布式事务;
3.3 切换数据源查询需要注意数据源切换失效的几个事项:
- 同一个方法中进行方法this调用,因为使用的不是spring 加强的bean ,此时即使后面方法使用@DS 进行标记也会失败,此时要么从spring 容器中获取需要的bean,或者将方法移入其它的类;可以通过本文中已经放入的工具类完成 bean 的获取:ApplicationContextUtils.getBean(类.getClass());
- 在跨数据源进行数据的修改时需要使用 @DSTransactional 而不是 sprin原生的@Transactional 注解,进行分布式事务的保证;
- 因为@DSTransactional 实际上也是采用的aop 模式进行的加强,所有原有对@Transactional 事务失效的方式@DSTransactional 也会失效;
4 总结:
通过引入多数据源dynamic-datasource ,druid ,mysql-connector,pagehelper,mybatis-plus 完成对spring-cloud 的多数据源整合,通过使用@DS 和 @DSTransactional 保证数据源的切换和分布式事务的支持;
5 参考:
5.1 mybatis-plus 多数据源;
5.2 多数源配置其一(dynamic-datasource);
5.3 mybatis-plus 多数据源,源码解析;