- 拉取项目:
https://gitee.com/y_project/RuoYi-Vue
。前后端分离版本 - 新建数据库,字符集选择
utf8mb4
。导入mysql文件。
- 主pom文件中引入依赖
<!-- 分库分表引擎 -->
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId>
<version>5.2.0</version>
</dependency>
- framework模块也引入shardingsphere-jdbc-core-spring-boot-starter
。
<!-- Sharding-JDBC分库分表 -->
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId>
</dependency>
- 在admin模块中新增
sharding.yml
配置文件,详细定义分库分表策略。我以sys_notice和sys_job为例
# 数据源配置
spring:
datasource:
# type: com.alibaba.druid.pool.DruidDataSource
# driverClassName: com.mysql.cj.jdbc.Driver
driver-class-name: org.apache.shardingsphere.driver.ShardingSphereDriver
url: jdbc:shardingsphere:classpath:sharding.yaml
# 多数据源
dynamic:
datasource:
druid:
# 主库数据源
master:
dataSourceClassName: com.alibaba.druid.pool.DruidDataSource
driverClassName: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://172.16.61.16:3306/sharding_demo?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
username: root
password: Www_112379
# 从库数据源
slave:
# 从数据源开关/默认关闭
enabled: false
url:
username:
password:
# 初始连接数
initialSize: 5
# 最小连接池数量
minIdle: 10
# 最大连接池数量
maxActive: 20
# 配置获取连接等待超时的时间
maxWait: 60000
# 配置连接超时时间
connectTimeout: 30000
# 配置网络超时时间
socketTimeout: 60000
# 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
timeBetweenEvictionRunsMillis: 60000
# 配置一个连接在池中最小生存的时间,单位是毫秒
minEvictableIdleTimeMillis: 300000
# 配置一个连接在池中最大生存的时间,单位是毫秒
maxEvictableIdleTimeMillis: 900000
# 配置检测连接是否有效
validationQuery: SELECT 1 FROM DUAL
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
webStatFilter:
enabled: true
statViewServlet:
enabled: true
# 设置白名单,不填则允许所有访问
allow:
url-pattern: /druid/*
# 控制台管理用户名和密码
login-username: ruoyi
login-password: 123456
filter:
stat:
enabled: true
# 慢SQL记录
log-slow-sql: true
slow-sql-millis: 1000
merge-sql: true
wall:
config:
multi-statement-allow: true
- 修改配置文件,支持多数据源
# 数据源配置
dataSources:
sharding:
dataSourceClassName: com.alibaba.druid.pool.DruidDataSource
driverClassName: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://172.16.61.16:3306/sharding_demo?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
username: root
password: Www_112379
# 规则配置
rules:
- !SHARDING
# 表策略配置
tables:
# sys_notice,sys_job 为例
sys_notice:
# 配置数据节点,按年分表
# actualDataNodes: sharding.sys_notice_$->{2023..2024}
# actualDataNodes: sharding.sys_notice_$->{202401..202412}
actualDataNodes: sharding.sys_notice
tableStrategy:
# 使用标准分片策略
standard:
# 配置分片字段,以哪个字段作为分片标准
shardingColumn: create_time
# 分片算法名称,不支持大写字母和下划线,否则启动就会报错
shardingAlgorithmName: time-sharding-altorithm
sys_job:
# 配置数据节点,按年分表
actualDataNodes: sharding.sys_job
tableStrategy:
# 使用标准分片策略
standard:
# 配置分片字段,以哪个字段作为分片标准
shardingColumn: create_time
# 分片算法名称,不支持大写字母和下划线,否则启动就会报错
shardingAlgorithmName: time-sharding-altorithm
bindingTables:
- sys_notice,sys_job
# 分片算法配置
shardingAlgorithms:
# 分片算法名称,不支持大写字母和下划线,否则启动就会报错
time-sharding-altorithm:
# 类型:自定义策略
type: CLASS_BASED
props:
# 分片策略
strategy: standard
# 分片算法类
algorithmClassName: com.ruoyi.framework.sharding.TimeShardingAlgorithm
props:
# 输出SQL
sql-show: true
- 配置DruidProperties为动态数据源,确保需要分表的配置能够正确加载。修改framework下的DruidProperties。
package com.ruoyi.framework.config.properties;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import com.alibaba.druid.pool.DruidDataSource;
/**
 * Druid connection-pool configuration properties.
 *
 * <p>Binds the pool tuning values under {@code spring.dynamic.datasource.druid.*}
 * and applies them onto a {@link DruidDataSource} instance.
 *
 * @author ruoyi
 */
@Configuration
public class DruidProperties
{
    @Value("${spring.dynamic.datasource.druid.initialSize}")
    private int initialSize;

    @Value("${spring.dynamic.datasource.druid.minIdle}")
    private int minIdle;

    @Value("${spring.dynamic.datasource.druid.maxActive}")
    private int maxActive;

    @Value("${spring.dynamic.datasource.druid.maxWait}")
    private int maxWait;

    @Value("${spring.dynamic.datasource.druid.timeBetweenEvictionRunsMillis}")
    private int timeBetweenEvictionRunsMillis;

    @Value("${spring.dynamic.datasource.druid.minEvictableIdleTimeMillis}")
    private int minEvictableIdleTimeMillis;

    @Value("${spring.dynamic.datasource.druid.maxEvictableIdleTimeMillis}")
    private int maxEvictableIdleTimeMillis;

    @Value("${spring.dynamic.datasource.druid.validationQuery}")
    private String validationQuery;

    @Value("${spring.dynamic.datasource.druid.testWhileIdle}")
    private boolean testWhileIdle;

    @Value("${spring.dynamic.datasource.druid.testOnBorrow}")
    private boolean testOnBorrow;

    @Value("${spring.dynamic.datasource.druid.testOnReturn}")
    private boolean testOnReturn;

    /**
     * Applies the configured pool settings to the given data source.
     *
     * @param datasource a freshly built Druid pool to configure
     * @return the same instance, fully configured
     */
    public DruidDataSource dataSource(DruidDataSource datasource)
    {
        // Pool sizing: initial connection count plus min/max bounds.
        datasource.setInitialSize(initialSize);
        datasource.setMinIdle(minIdle);
        datasource.setMaxActive(maxActive);

        // Maximum time (ms) a caller waits when borrowing a connection.
        datasource.setMaxWait(maxWait);

        // Interval (ms) between idle-connection eviction runs.
        datasource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);

        // Minimum / maximum lifetime (ms) of a pooled connection.
        datasource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
        datasource.setMaxEvictableIdleTimeMillis(maxEvictableIdleTimeMillis);

        // Validation query (e.g. SELECT 1 FROM DUAL). If this is null, the three
        // test* switches below have no effect.
        datasource.setValidationQuery(validationQuery);
        // Recommended true: validates on borrow only when the connection has been
        // idle longer than timeBetweenEvictionRunsMillis, so it is cheap and safe.
        datasource.setTestWhileIdle(testWhileIdle);
        // Validate every borrow — costs performance.
        datasource.setTestOnBorrow(testOnBorrow);
        // Validate every return — costs performance.
        datasource.setTestOnReturn(testOnReturn);
        return datasource;
    }
}
- 在DataSourceType文件中,增加一个SHARDING类型,用于区分分表操作。
package com.ruoyi.common.enums;
/**
 * Data source type keys used by the dynamic-datasource router.
 *
 * @author ruoyi
 */
public enum DataSourceType
{
/**
 * Master (primary) database
 */
MASTER,
/**
 * Slave (read replica) database
 */
SLAVE,
/**
 * ShardingSphere data source for sharded tables.
 * (The original comment here said "slave" — a copy-paste slip.)
 */
SHARDING
}
- 增加ShardingConfig配置
package com.ruoyi.framework.config;
import org.springframework.context.annotation.Configuration;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
/**
 * ShardingSphere table-sharding configuration loader.
 *
 * <p>Reads the raw bytes of {@code sharding.yaml} from the classpath so the
 * ShardingSphere data source can be built from it.
 *
 * @Description: shardingSphere分表配置
 * @Author WangHan
 * @Date 2024/9/19 15:28
 * @Version 1.0
 */
@Configuration
public class ShardingConfig {

    /** Classpath location of the ShardingSphere YAML configuration. */
    private static final String CONFIG_FILE = "sharding.yaml";

    /**
     * Reads the sharding YAML configuration from the classpath.
     *
     * @return the raw bytes of {@code sharding.yaml}
     * @throws IOException if the resource cannot be read
     * @throws NullPointerException if the resource is missing from the classpath
     */
    public static byte[] getShardingYAMLFile() throws IOException {
        // FIX: the stream was previously never closed — it leaked a file handle
        // on every call. try-with-resources guarantees closure.
        try (InputStream inputStream = Objects.requireNonNull(
                ShardingConfig.class.getClassLoader().getResourceAsStream(CONFIG_FILE),
                String.format("Resource `%s` is not found in the classpath.", CONFIG_FILE));
             ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
            byte[] data = new byte[16384];
            int nRead;
            // Copy the resource into the in-memory buffer.
            while ((nRead = inputStream.read(data, 0, data.length)) != -1) {
                buffer.write(data, 0, nRead);
            }
            buffer.flush();
            return buffer.toByteArray();
        }
    }
}
- 修改DruidConfig,增加shardingDataSource的相关配置。
package com.ruoyi.framework.config;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.sql.DataSource;
import org.apache.shardingsphere.driver.api.yaml.YamlShardingSphereDataSourceFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder;
import com.alibaba.druid.spring.boot.autoconfigure.properties.DruidStatProperties;
import com.alibaba.druid.util.Utils;
import com.ruoyi.common.enums.DataSourceType;
import com.ruoyi.common.utils.spring.SpringUtils;
import com.ruoyi.framework.config.properties.DruidProperties;
import com.ruoyi.framework.datasource.DynamicDataSource;
/**
 * Druid multi-datasource configuration.
 *
 * Registers the master/slave Druid pools, the ShardingSphere datasource, and
 * the DynamicDataSource router that switches between them at runtime.
 *
 * @author ruoyi
 */
@Configuration
public class DruidConfig
{
/**
 * Master Druid pool, bound from spring.dynamic.datasource.druid.master.*.
 */
@Bean
@ConfigurationProperties("spring.dynamic.datasource.druid.master")
public DataSource masterDataSource(DruidProperties druidProperties)
{
DruidDataSource dataSource = DruidDataSourceBuilder.create().build();
return druidProperties.dataSource(dataSource);
}
/**
 * Slave Druid pool; only created when
 * spring.dynamic.datasource.druid.slave.enabled=true.
 */
@Bean
@ConfigurationProperties("spring.dynamic.datasource.druid.slave")
@ConditionalOnProperty(prefix = "spring.dynamic.datasource.druid.slave", name = "enabled", havingValue = "true")
public DataSource slaveDataSource(DruidProperties druidProperties)
{
DruidDataSource dataSource = DruidDataSourceBuilder.create().build();
return druidProperties.dataSource(dataSource);
}
/**
 * ShardingSphere datasource built from the classpath sharding.yaml
 * (see ShardingConfig.getShardingYAMLFile).
 */
@Bean(name = "shardingDataSource")
public DataSource shardingDataSource() throws Exception
{
DataSource dataSource = YamlShardingSphereDataSourceFactory.createDataSource(ShardingConfig.getShardingYAMLFile());
return dataSource;
}
/**
 * The primary routing datasource: MASTER is the default target, SLAVE and
 * SHARDING are registered only if their beans exist.
 */
@Bean(name = "dynamicDataSource")
@Primary
public DynamicDataSource dataSource(DataSource masterDataSource)
{
Map<Object, Object> targetDataSources = new HashMap<>();
targetDataSources.put(DataSourceType.MASTER.name(), masterDataSource);
setDataSource(targetDataSources, DataSourceType.SLAVE.name(), "slaveDataSource");
setDataSource(targetDataSources, DataSourceType.SHARDING.name(), "shardingDataSource");
return new DynamicDataSource(masterDataSource, targetDataSources);
}
/**
 * Registers an optional datasource bean into the routing map.
 *
 * @param targetDataSources candidate datasource map
 * @param sourceName routing key (DataSourceType name)
 * @param beanName Spring bean name to look up
 */
public void setDataSource(Map<Object, Object> targetDataSources, String sourceName, String beanName)
{
try
{
DataSource dataSource = SpringUtils.getBean(beanName);
targetDataSources.put(sourceName, dataSource);
}
catch (Exception e)
{
// Intentionally ignored (best-effort): the optional bean (slave/sharding)
// may be disabled or absent, in which case it is simply not registered.
}
}
/**
 * Strips the advertisement footer from the Druid stat-view page by rewriting
 * common.js on the fly.
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
@ConditionalOnProperty(name = "spring.dynamic.datasource.druid.statViewServlet.enabled", havingValue = "true")
public FilterRegistrationBean removeDruidFilterRegistrationBean(DruidStatProperties properties)
{
// Read the stat-view servlet settings to locate common.js.
DruidStatProperties.StatViewServlet config = properties.getStatViewServlet();
// Derive the common.js URL pattern from the configured url-pattern.
String pattern = config.getUrlPattern() != null ? config.getUrlPattern() : "/druid/*";
String commonJsPattern = pattern.replaceAll("\\*", "js/common.js");
final String filePath = "support/http/resources/js/common.js";
// Filter that serves a banner-stripped copy of common.js.
Filter filter = new Filter()
{
@Override
public void init(javax.servlet.FilterConfig filterConfig) throws ServletException
{
}
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException
{
chain.doFilter(request, response);
// Reset the body buffer (headers are preserved) before rewriting.
response.resetBuffer();
// Load the bundled common.js from the Druid jar.
String text = Utils.readFromResource(filePath);
// Regex-strip the banner/advertisement fragments.
text = text.replaceAll("<a.*?banner\"></a><br/>", "");
text = text.replaceAll("powered.*?shrek.wang</a>", "");
response.getWriter().write(text);
}
@Override
public void destroy()
{
}
};
FilterRegistrationBean registrationBean = new FilterRegistrationBean();
registrationBean.setFilter(filter);
registrationBean.addUrlPatterns(commonJsPattern);
return registrationBean;
}
}
- 定义在sharding.yaml中指定的TimeShardingAlgorithm算法
- 在framework下创建sharding包
- 创建分表表缓存枚举
package com.ruoyi.framework.sharding;
import java.util.*;
/**
 * Per-logic-table cache of physical sharding tables.
 *
 * Each constant represents one sharded logic table and holds the set of
 * physical table names (e.g. sys_notice_2024) discovered in the database.
 *
 * @Description: 分片表缓存枚举
 * @Author WangHan
 * @Date 2024/9/19 15:16
 * @Version 1.0
 */
public enum ShardingTableCache {
    /**
     * Notice table
     */
    SYS_NOTICE("sys_notice", new HashSet<>()),
    /**
     * Scheduled-job table
     */
    SYS_JOB("sys_job", new HashSet<>());

    /** Logic table name, e.g. sys_notice. */
    private final String logicTableName;

    /** Cached physical table names; mutated only under its own monitor. */
    private final Set<String> resultTableNamesCache;

    /**
     * Lookup index from logic table name to enum constant.
     * FIX: declared final — it is populated once in the static initializer
     * below and must never be reassigned.
     */
    private static final Map<String, ShardingTableCache> valueMap = new HashMap<>();

    static {
        Arrays.stream(ShardingTableCache.values()).forEach(o -> valueMap.put(o.logicTableName, o));
    }

    ShardingTableCache(String logicTableName, Set<String> resultTableNamesCache) {
        this.logicTableName = logicTableName;
        this.resultTableNamesCache = resultTableNamesCache;
    }

    /**
     * @param value logic table name
     * @return matching constant, or null when the table is not sharded
     */
    public static ShardingTableCache of(String value) {
        return valueMap.get(value);
    }

    public String logicTableName() {
        return logicTableName;
    }

    public Set<String> resultTableNamesCache() {
        return resultTableNamesCache;
    }

    /**
     * Atomically replaces the cached physical table names and pushes the new
     * actualDataNodes into the ShardingSphere rule configuration.
     *
     * @param tableNameList freshly read physical table names
     */
    public void atomicUpdateCacheAndActualDataNodes(List<String> tableNameList) {
        synchronized (resultTableNamesCache) {
            // Drop the stale cache ...
            resultTableNamesCache.clear();
            // ... install the new table names ...
            resultTableNamesCache.addAll(tableNameList);
            // ... and propagate them to the sharding rule's actualDataNodes.
            ShardingAlgorithmTool.actualDataNodesRefresh(logicTableName, tableNameList);
        }
    }

    /**
     * @return all sharded logic table names
     */
    public static Set<String> logicTableNames() {
        return valueMap.keySet();
    }

    @Override
    public String toString() {
        return "ShardingTableCache{" +
                "logicTableName='" + logicTableName + '\'' +
                ", resultTableNamesCache=" + resultTableNamesCache +
                '}';
    }
}
- 配置启动时读取分表进行缓存
package com.ruoyi.framework.sharding;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
/**
 * Loads the already-existing physical sharding tables into the cache as soon
 * as the application has started.
 *
 * @Description: 项目启动后,读取已有分表,进行缓存
 * @Author WangHan
 * @Date 2024/9/19 18:46
 * @Version 1.0
 */
@Order(value = 1) // lower order values run earlier
@Component
public class ShardingTablesLoadRunner implements CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(ShardingTablesLoadRunner.class);

    @Override
    public void run(String... args) {
        // Populate every per-table cache from the tables already in the database.
        ShardingAlgorithmTool.tableNameCacheReloadAll();
        log.info(">>>>>>>>>> 【ShardingTablesLoadRunner】缓存已有分表成功 <<<<<<<<<<");
    }
}
- 定义分片算法,按年分片
package com.ruoyi.framework.sharding;
import com.google.common.collect.Range;
import org.apache.shardingsphere.sharding.api.sharding.ShardingAutoTableAlgorithm;
import org.apache.shardingsphere.sharding.api.sharding.standard.PreciseShardingValue;
import org.apache.shardingsphere.sharding.api.sharding.standard.RangeShardingValue;
import org.apache.shardingsphere.sharding.api.sharding.standard.StandardShardingAlgorithm;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
 * Standard sharding algorithm that routes rows to yearly tables,
 * e.g. sys_notice rows with create_time in 2024 go to sys_notice_2024.
 *
 * Fixes relative to the original:
 * - no shared static SimpleDateFormat (not thread-safe) — per-call instances;
 * - range sharding steps by whole years instead of minute-by-minute;
 * - unbounded ranges no longer throw before the table-name fallback runs;
 * - min/max year is extracted from the 4-digit table suffix (the previous
 *   replace("_","")-based parse could not parse names like sys_notice_2024).
 *
 * @Description: 分片算法,按年分片
 * @Author WangHan
 * @Date 2024/9/19 15:11
 * @Version 1.0
 */
public class TimeShardingAlgorithm implements StandardShardingAlgorithm<Date>, ShardingAutoTableAlgorithm {

    private static final Logger log = LoggerFactory.getLogger(TimeShardingAlgorithm.class);

    /** Table-name suffix format (year granularity). */
    private static final DateTimeFormatter TABLE_SHARD_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy");

    /** Full timestamp format used when converting the precise sharding value. */
    private static final DateTimeFormatter YYYY_MM_DD_HH_MM_SS = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Separator between logic table and year suffix, e.g. "_" in sys_notice_2024. */
    private final String TABLE_SPLIT_SYMBOL = "_";

    /** Properties handed over by ShardingSphere at init time. */
    private Properties props;

    /** Number of physical tables currently known (ShardingAutoTableAlgorithm contract). */
    private int autoTablesAmount;

    /**
     * Precise sharding (INSERT / equality lookup).
     *
     * @param tableNames all physical tables configured for the sharded database
     * @param preciseShardingValue sharding value: logic table, column and the value parsed from SQL
     * @return physical table name, e.g. sys_notice_2024
     */
    @Override
    public String doSharding(Collection<String> tableNames, PreciseShardingValue<Date> preciseShardingValue) {
        String logicTableName = preciseShardingValue.getLogicTableName();
        log.info("精确分片,节点配置表名:{}", tableNames);
        // FIX: SimpleDateFormat is not thread-safe, so a fresh instance is used
        // per invocation instead of a shared static one.
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date date = preciseShardingValue.getValue();
        String string = format.format(date);
        LocalDateTime dateTime = LocalDateTime.parse(string, YYYY_MM_DD_HH_MM_SS);
        String resultTableName = logicTableName + TABLE_SPLIT_SYMBOL + dateTime.format(TABLE_SHARD_TIME_FORMATTER);
        initTableNamesIfNeeded(logicTableName, tableNames);
        return getShardingTableAndCreate(logicTableName, resultTableName, tableNames);
    }

    /**
     * Range sharding (SELECT / UPDATE / DELETE with a range on the sharding column).
     *
     * @param tableNames all physical tables configured for the sharded database
     * @param rangeShardingValue the value range parsed from SQL
     * @return physical table names covering the range
     */
    @Override
    public Collection<String> doSharding(Collection<String> tableNames, RangeShardingValue<Date> rangeShardingValue) {
        String logicTableName = rangeShardingValue.getLogicTableName();
        log.info(">>>>>>>>>> 【INFO】范围分片,节点配置表名:{}", tableNames);
        initTableNamesIfNeeded(logicTableName, tableNames);

        Range<Date> valueRange = rangeShardingValue.getValueRange();
        boolean hasLowerBound = valueRange.hasLowerBound();
        boolean hasUpperBound = valueRange.hasUpperBound();

        // FIX: per-call formatter (SimpleDateFormat is not thread-safe).
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        // FIX: endpoints are only read when the bound exists; previously an
        // unbounded range threw from lowerEndpoint()/upperEndpoint() before the
        // getLower/UpperEndpoint fallback could ever run.
        LocalDateTime min = hasLowerBound
                ? parseEndpoint(dateFormat, valueRange.lowerEndpoint(), " 00:00:00")
                : getLowerEndpoint(tableNames);
        LocalDateTime max = hasUpperBound
                ? parseEndpoint(dateFormat, valueRange.upperEndpoint(), " 23:59:59")
                : getUpperEndpoint(tableNames);

        // One table per calendar year touched by [min, max]. FIX: stepping whole
        // years from Jan 1 is O(years); the previous minute-by-minute loop did
        // ~525k iterations per year of range for the same resulting set.
        Set<String> resultTableNames = new LinkedHashSet<>();
        resultTableNames.add(logicTableName);
        if (!min.isAfter(max)) {
            LocalDateTime cursor = LocalDateTime.of(min.getYear(), 1, 1, 0, 0);
            while (!cursor.isAfter(max)) {
                resultTableNames.add(logicTableName + TABLE_SPLIT_SYMBOL + cursor.format(TABLE_SHARD_TIME_FORMATTER));
                cursor = cursor.plusYears(1);
            }
        }
        // Queries/updates/deletes never create new tables.
        return resultTableNames;
    }

    /**
     * Lazily replaces the single configured node with the full physical table
     * list read from the database (a size of 1 means only the logic table from
     * actualDataNodes is present, i.e. not yet initialized).
     */
    private void initTableNamesIfNeeded(String logicTableName, Collection<String> tableNames) {
        if (tableNames.size() == 1) {
            List<String> allTableNameBySchema = ShardingAlgorithmTool.getAllTableNameBySchema(logicTableName);
            tableNames.clear();
            tableNames.addAll(allTableNameBySchema);
            autoTablesAmount = allTableNameBySchema.size();
        }
    }

    /**
     * Parses a range endpoint. The endpoint may already be a full
     * "yyyy-MM-dd HH:mm:ss" timestamp; a bare "yyyy-MM-dd" date is completed
     * with the given time-of-day suffix (start or end of day).
     */
    private LocalDateTime parseEndpoint(SimpleDateFormat dateFormat, Object endpoint, String timeSuffix) {
        Date parsed;
        try {
            parsed = dateFormat.parse(String.valueOf(endpoint));
        } catch (ParseException e) {
            try {
                parsed = dateFormat.parse(String.valueOf(endpoint) + timeSuffix);
            } catch (ParseException ex) {
                throw new RuntimeException(ex);
            }
        }
        return parsed.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime();
    }

    @Override
    public void init(Properties props) {
        this.props = props;
    }

    @Override
    public int getAutoTablesAmount() {
        return autoTablesAmount;
    }

    @Override
    public String getType() {
        return "CLASS_BASED";
    }

    public Properties getProps() {
        return props;
    }

    /**
     * Checks that each computed table exists, creating missing ones.
     *
     * @param logicTableName logic table
     * @param resultTableNames computed physical names, e.g. mall_order_2022
     * @param availableTargetNames table names known to exist
     * @return physical table names confirmed to exist in the database
     */
    public Set<String> getShardingTablesAndCreate(String logicTableName, Collection<String> resultTableNames, Collection<String> availableTargetNames) {
        return resultTableNames.stream().map(o -> getShardingTableAndCreate(logicTableName, o, availableTargetNames)).collect(Collectors.toSet());
    }

    /**
     * Checks that the computed table exists, creating it if missing.
     *
     * @param logicTableName logic table
     * @param resultTableName computed physical name, e.g. mall_order_2023
     * @return a table name confirmed to exist (falls back to the logic table)
     */
    private String getShardingTableAndCreate(String logicTableName, String resultTableName, Collection<String> availableTargetNames) {
        // Known table: route straight to it.
        if (availableTargetNames.contains(resultTableName)) {
            return resultTableName;
        }
        // Unknown table: try to create it on the fly.
        boolean isSuccess = ShardingAlgorithmTool.createShardingTable(logicTableName, resultTableName);
        if (isSuccess) {
            availableTargetNames.add(resultTableName);
            autoTablesAmount++;
            return resultTableName;
        }
        // Creation refused/failed: fall back to the (empty) logic table.
        return logicTableName;
    }

    /**
     * Extracts the 4-digit year suffix of a physical table name, e.g. 2024 from
     * sys_notice_2024. Returns null for names without such a suffix (including
     * the logic table itself).
     */
    private Integer extractShardYear(String tableName) {
        int idx = tableName.lastIndexOf(TABLE_SPLIT_SYMBOL);
        if (idx < 0 || tableName.length() - idx - 1 != 4) {
            return null;
        }
        String suffix = tableName.substring(idx + 1);
        for (int i = 0; i < suffix.length(); i++) {
            if (!Character.isDigit(suffix.charAt(i))) {
                return null;
            }
        }
        return Integer.valueOf(suffix);
    }

    /**
     * Smallest shard boundary (Jan 1 of the earliest year among the tables).
     *
     * @param tableNames physical table names
     * @return smallest shard start time
     */
    private LocalDateTime getLowerEndpoint(Collection<String> tableNames) {
        // FIX: parse the year suffix directly; the previous
        // replace("_","") + "01-01-01 00:00:00" string could not be parsed.
        Optional<LocalDateTime> optional = tableNames.stream()
                .map(this::extractShardYear)
                .filter(Objects::nonNull)
                .map(year -> LocalDateTime.of(year, 1, 1, 0, 0))
                .min(Comparator.naturalOrder());
        if (optional.isPresent()) {
            return optional.get();
        }
        log.error(">>>>>>>>>> 【ERROR】获取数据最小分表失败,请稍后重试,tableName:{}", tableNames);
        throw new IllegalArgumentException("获取数据最小分表失败,请稍后重试");
    }

    /**
     * Largest shard boundary (Jan 1 of the latest year among the tables).
     *
     * @param tableNames physical table names
     * @return largest shard start time
     */
    private LocalDateTime getUpperEndpoint(Collection<String> tableNames) {
        Optional<LocalDateTime> optional = tableNames.stream()
                .map(this::extractShardYear)
                .filter(Objects::nonNull)
                .map(year -> LocalDateTime.of(year, 1, 1, 0, 0))
                .max(Comparator.naturalOrder());
        if (optional.isPresent()) {
            return optional.get();
        }
        log.error(">>>>>>>>>> 【ERROR】获取数据最大分表失败,请稍后重试,tableName:{}", tableNames);
        throw new IllegalArgumentException("获取数据最大分表失败,请稍后重试");
    }
}
- 定义分片算法工具类
package com.ruoyi.framework.sharding;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.framework.config.ShardingConfig;
import org.apache.shardingsphere.driver.api.yaml.YamlShardingSphereDataSourceFactory;
import org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
import org.apache.shardingsphere.sharding.api.config.rule.ShardingTableRuleConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.lang.reflect.Field;
import java.sql.*;
import java.time.YearMonth;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;
/**
 * Helper for the sharding algorithm: reads physical table names from the
 * database, refreshes actualDataNodes, and creates missing yearly tables.
 *
 * @Description: 分片算法工具类
 * @Author WangHan
 * @Date 2024/9/19 15:15
 * @Version 1.0
 */
@Component
public class ShardingAlgorithmTool implements InitializingBean {

    private static final Logger log = LoggerFactory.getLogger(ShardingAlgorithmTool.class);

    /** Separator between logic table and year suffix, e.g. "_" in sys_notice_2024. */
    private static final String TABLE_SPLIT_SYMBOL = "_";

    // Master JDBC settings injected by Spring; copied into the static fields
    // below so the static helpers can open plain JDBC connections.
    @Value("${spring.dynamic.datasource.druid.master.url}")
    private String MASTER_URL;
    @Value("${spring.dynamic.datasource.druid.master.username}")
    private String MASTER_USERNAME;
    @Value("${spring.dynamic.datasource.druid.master.password}")
    private String MASTER_PASSWORD;

    private static String DATASOURCE_URL;
    private static String DATASOURCE_USERNAME;
    private static String DATASOURCE_PASSWORD;

    @Override
    public void afterPropertiesSet() throws Exception {
        DATASOURCE_URL = MASTER_URL;
        DATASOURCE_USERNAME = MASTER_USERNAME;
        DATASOURCE_PASSWORD = MASTER_PASSWORD;
    }

    /**
     * Extracts the private ContextManager from a ShardingSphereDataSource via
     * reflection (the field has no public accessor in 5.x).
     *
     * @param dataSource the ShardingSphere data source
     * @return its context manager
     */
    public static ContextManager getContextManager(final ShardingSphereDataSource dataSource) {
        try {
            Field field = ShardingSphereDataSource.class.getDeclaredField("contextManager");
            field.setAccessible(true);
            return (ContextManager) field.get(dataSource);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            // FIX: collapsed the two identical catch blocks into one multi-catch.
            throw new RuntimeException(e);
        }
    }

    /**
     * Reloads the table-name cache for every sharded logic table.
     */
    public static void tableNameCacheReloadAll() {
        Arrays.stream(ShardingTableCache.values()).forEach(ShardingAlgorithmTool::tableNameCacheReload);
    }

    /**
     * Reloads the table-name cache for one logic table.
     *
     * @param logicTable logic table entry, e.g. SYS_NOTICE
     */
    public static void tableNameCacheReload(ShardingTableCache logicTable) {
        // Read every physical table for this logic table from the database.
        List<String> tableNameList = getAllTableNameBySchema(logicTable.logicTableName());
        // FIX: atomicUpdateCacheAndActualDataNodes already clears/refills the
        // cache and refreshes actualDataNodes inside its synchronized block.
        // The previous code repeated all three steps unsynchronized afterwards,
        // rebuilding the ShardingSphere data source a second time for nothing.
        logicTable.atomicUpdateCacheAndActualDataNodes(tableNameList);
    }

    /**
     * Lists the logic table itself plus every physical shard whose name matches
     * logicTableName_yyyy.
     *
     * @param logicTableName logic table
     * @return table names (logic table first)
     */
    public static List<String> getAllTableNameBySchema(String logicTableName) {
        List<String> tableNames = new ArrayList<>();
        if (StringUtils.isEmpty(DATASOURCE_URL) || StringUtils.isEmpty(DATASOURCE_USERNAME) || StringUtils.isEmpty(DATASOURCE_PASSWORD)) {
            // FIX(security): the password is no longer written to the log.
            log.error(">>>>>>>>>> 【ERROR】数据库连接配置有误,请稍后重试,URL:{}, username:{}", DATASOURCE_URL, DATASOURCE_USERNAME);
            throw new IllegalArgumentException("数据库连接配置有误,请稍后重试");
        }
        try (Connection conn = DriverManager.getConnection(DATASOURCE_URL, DATASOURCE_USERNAME, DATASOURCE_PASSWORD);
             Statement st = conn.createStatement()) {
            try (ResultSet rs = st.executeQuery("show TABLES like '" + logicTableName + TABLE_SPLIT_SYMBOL + "%'")) {
                // The logic table itself is always a valid routing target.
                tableNames.add(logicTableName);
                while (rs.next()) {
                    String tableName = rs.getString(1);
                    // Only accept the yearly shard pattern, e.g. ^(sys_notice_\d{4})$
                    if (tableName != null && tableName.matches(String.format("^(%s\\d{4})$", logicTableName + TABLE_SPLIT_SYMBOL))) {
                        tableNames.add(tableName);
                    }
                }
            }
        } catch (SQLException e) {
            log.error(">>>>>>>>>> 【ERROR】数据库连接失败,请稍后重试,原因:{}", e.getMessage(), e);
            throw new IllegalArgumentException("数据库连接失败,请稍后重试");
        }
        return tableNames;
    }

    /**
     * Pushes an updated actualDataNodes string into the sharding rule config.
     *
     * NOTE(review): this builds a brand-new ShardingSphereDataSource from the
     * YAML file and alters THAT instance's rules; the datasource the
     * application actually uses (the "shardingDataSource" Spring bean) is a
     * different instance. Confirm the refresh reaches the live datasource,
     * e.g. by fetching the bean instead of creating a new one here.
     *
     * @param logicTableName logic table name
     * @param tableNamesCache physical table names
     */
    public static void actualDataNodesRefresh(String logicTableName, List<String> tableNamesCache) {
        try {
            // All shards live in the data source named "sharding" (see sharding.yaml).
            String dbName = "sharding";
            log.info(">>>>>>>>>> 【INFO】更新分表配置,logicTableName:{},tableNamesCache:{}", logicTableName, tableNamesCache);
            // e.g. "sharding.sys_notice,sharding.sys_notice_2024"
            String newActualDataNodes = tableNamesCache.stream().map(o -> String.format("%s.%s", dbName, o)).collect(Collectors.joining(","));
            ShardingSphereDataSource shardingSphereDataSource = (ShardingSphereDataSource) YamlShardingSphereDataSourceFactory.createDataSource(ShardingConfig.getShardingYAMLFile());
            updateShardRuleActualDataNodes(shardingSphereDataSource, logicTableName, newActualDataNodes);
        } catch (Exception e) {
            log.error("初始化 动态表单失败,原因:{}", e.getMessage(), e);
        }
    }

    /**
     * Rebuilds the sharding rule configuration with the new actualDataNodes for
     * the given logic table and applies it through the context manager.
     */
    private static void updateShardRuleActualDataNodes(ShardingSphereDataSource dataSource, String logicTableName, String newActualDataNodes) {
        ContextManager contextManager = getContextManager(dataSource);
        String schemaName = "logic_db";
        Collection<RuleConfiguration> newRuleConfigList = new LinkedList<>();
        Collection<RuleConfiguration> oldRuleConfigList = contextManager.getMetaDataContexts()
                .getMetaData()
                .getGlobalRuleMetaData()
                .getConfigurations();
        for (RuleConfiguration oldRuleConfig : oldRuleConfigList) {
            if (oldRuleConfig instanceof ShardingRuleConfiguration) {
                ShardingRuleConfiguration oldAlgorithmConfig = (ShardingRuleConfiguration) oldRuleConfig;
                ShardingRuleConfiguration newAlgorithmConfig = new ShardingRuleConfiguration();
                // Rebuild the table rule list, swapping in the new actualDataNodes
                // for the matching logic table and keeping all other rules as-is.
                Collection<ShardingTableRuleConfiguration> newTableRuleConfigList = new LinkedList<>();
                Collection<ShardingTableRuleConfiguration> oldTableRuleConfigList = oldAlgorithmConfig.getTables();
                oldTableRuleConfigList.forEach(oldTableRuleConfig -> {
                    if (logicTableName.equals(oldTableRuleConfig.getLogicTable())) {
                        ShardingTableRuleConfiguration newTableRuleConfig = new ShardingTableRuleConfiguration(oldTableRuleConfig.getLogicTable(), newActualDataNodes);
                        newTableRuleConfig.setTableShardingStrategy(oldTableRuleConfig.getTableShardingStrategy());
                        newTableRuleConfig.setDatabaseShardingStrategy(oldTableRuleConfig.getDatabaseShardingStrategy());
                        newTableRuleConfig.setKeyGenerateStrategy(oldTableRuleConfig.getKeyGenerateStrategy());
                        newTableRuleConfigList.add(newTableRuleConfig);
                    } else {
                        newTableRuleConfigList.add(oldTableRuleConfig);
                    }
                });
                newAlgorithmConfig.setTables(newTableRuleConfigList);
                // Carry over every other part of the old sharding configuration.
                newAlgorithmConfig.setAutoTables(oldAlgorithmConfig.getAutoTables());
                newAlgorithmConfig.setBindingTableGroups(oldAlgorithmConfig.getBindingTableGroups());
                newAlgorithmConfig.setDefaultDatabaseShardingStrategy(oldAlgorithmConfig.getDefaultDatabaseShardingStrategy());
                newAlgorithmConfig.setDefaultTableShardingStrategy(oldAlgorithmConfig.getDefaultTableShardingStrategy());
                newAlgorithmConfig.setDefaultKeyGenerateStrategy(oldAlgorithmConfig.getDefaultKeyGenerateStrategy());
                newAlgorithmConfig.setDefaultShardingColumn(oldAlgorithmConfig.getDefaultShardingColumn());
                newAlgorithmConfig.setShardingAlgorithms(oldAlgorithmConfig.getShardingAlgorithms());
                newAlgorithmConfig.setKeyGenerators(oldAlgorithmConfig.getKeyGenerators());
                newRuleConfigList.add(newAlgorithmConfig);
            }
        }
        contextManager.alterRuleConfiguration(schemaName, newRuleConfigList);
    }

    /**
     * Creates a yearly shard table if it does not exist.
     *
     * @param logicTableName logic table, e.g. sys_notice
     * @param resultTableName physical table, e.g. sys_notice_2024
     * @return true when the table exists or was created; false when the shard
     *         belongs to a future year (future tables are not pre-created)
     */
    public static boolean createShardingTable(String logicTableName, String resultTableName) {
        // Suffix is a 4-digit year; append "01" so it parses as a YearMonth.
        String year = resultTableName.replace(logicTableName + TABLE_SPLIT_SYMBOL, "");
        YearMonth shardingMonth = YearMonth.parse(year + "01", DateTimeFormatter.ofPattern("yyyyMM"));
        if (shardingMonth.isAfter(YearMonth.now())) {
            return false;
        }
        // Serialize per logic table so concurrent inserts don't race on CREATE.
        synchronized (logicTableName.intern()) {
            executeSql(Collections.singletonList("CREATE TABLE IF NOT EXISTS `" + resultTableName + "` LIKE `" + logicTableName + "`;"));
        }
        return true;
    }

    /**
     * Executes the given statements in one transaction on the master database.
     *
     * @param sqlList SQL statements
     */
    private static void executeSql(List<String> sqlList) {
        if (StringUtils.isEmpty(DATASOURCE_URL) || StringUtils.isEmpty(DATASOURCE_USERNAME) || StringUtils.isEmpty(DATASOURCE_PASSWORD)) {
            // FIX(security): the password is no longer written to the log.
            log.error(">>>>>>>>>> 【ERROR】数据库连接配置有误,请稍后重试,URL:{}, username:{}", DATASOURCE_URL, DATASOURCE_USERNAME);
            throw new IllegalArgumentException("数据库连接配置有误,请稍后重试");
        }
        try (Connection conn = DriverManager.getConnection(DATASOURCE_URL, DATASOURCE_USERNAME, DATASOURCE_PASSWORD)) {
            try (Statement st = conn.createStatement()) {
                conn.setAutoCommit(false);
                for (String sql : sqlList) {
                    st.execute(sql);
                }
                // FIX: auto-commit was disabled above but the transaction was
                // never committed; MySQL DDL auto-commits, but DML would be lost.
                conn.commit();
            } catch (Exception e) {
                conn.rollback();
                log.error(">>>>>>>>>> 【ERROR】数据表创建执行失败,请稍后重试,原因:{}", e.getMessage(), e);
                throw new IllegalArgumentException("数据表创建执行失败,请稍后重试");
            }
        } catch (SQLException e) {
            log.error(">>>>>>>>>> 【ERROR】数据库连接失败,请稍后重试,原因:{}", e.getMessage(), e);
            throw new IllegalArgumentException("数据库连接失败,请稍后重试");
        }
    }
}
使用
在需要使用的serviceImpl和mapper接口上声明,以启用分表功能。未声明的接口将默认使用master数据源。
放行通知公告接口
示例Demo:
https://gitee.com/wwh_work/sharding