Mybatis-Plus
的批量保存saveBatch
性能分析
目录
- `Mybatis-Plus` 的批量保存`saveBatch` 性能分析
- 背景
- 批量保存的使用方案
- 循环插入
- 使用`PreparedStatement `预编译
- 优点:
- 缺点:
- `Mybatis-Plus `的`saveBatch`
- `Mybatis-Plus`实现真正的批量插入
- 自定义`sql`注入器
- 定义通用`mapper``CommonMapper`
- 将自定义的注入器加载到容器中
- 业务`mapper`
- 测试
- 优化
- 执行性能比较
- `rewriteBatchedStatements` 参数分析
背景
昨天同事问我,mybatis-plus
自动生成的service
里面提供的savebatch
最后生成的批量插入语句是多条insert
,而不是insert...vaues (),()
的语句,这样是不是跟我们使用循环调用没区别,这样的批量插入是不是有性能问题?下面我们就此问题来进行分析一下。
批量保存的使用方案
循环插入
使用 for
循环一条一条的插入,这个方式比较简单直观,灵活,但是这个 对于大型数据集,使用for
循环逐条插入数据可能会导致性能问题,特别是在网络延迟高或数据库负载大的情况下。使用for
循环进行数据插入时,需要注意事务管理,确保数据的一致性和完整性。如果不适当地管理事务,可能会导致数据不一致或丢失。而且每次循环迭代都需要建立和关闭数据库连接,这可能会导致额外的数据库连接开销,影响性能。
使用PreparedStatement
预编译
使用预处理的方式进行批量插入是一种常见的优化方法,它可以显著提高插入操作的性能。
优点:
-
性能提升: 预处理可以减少每次插入操作中的数据库通信次数,从而降低了网络通信的开销,提高了插入操作的效率和性能。
-
减少数据库负载: 将多条数据组合成批量插入的方式可以减少数据库服务器的负载,降低了数据库系统的压力,有助于提高整个系统的性能。
-
减少连接开销: 预处理可以减少每次循环迭代中建立和关闭数据库连接的开销,从而节省了系统资源,提高了连接的复用率。
-
事务管理:可以将多个插入操作放在一个事务中,以确保数据的一致性和完整性,并在发生错误时进行回滚,从而保证数据的安全性。
缺点:
-
内存消耗: 将多条数据组合成批量插入的方式可能会增加内存消耗,特别是在处理大量数据时。因此,需要注意内存的使用情况,以避免内存溢出或性能下降。
-
数据格式转换: 在将数据组合成批量插入时,可能需要进行数据格式转换或数据清洗操作,这可能会增加代码的复杂度和维护成本。
-
可读性降低: 预处理方式可能会使代码结构变得复杂,降低了代码的可读性和可维护性,特别是对于一些初学者或新加入团队的开发人员来说可能会造成困扰
所以由此可见预编译方式性能较好,如果想避免内存问题的话,其实使用分批插入也可以解决这个问题。
Mybatis-Plus
的saveBatch
直接看源码
/**
* 批量插入
*
* @param entityList ignore
* @param batchSize ignore
* @return ignore
*/
@Transactional(rollbackFor = Exception.class)
@Override
public boolean saveBatch(Collection<T> entityList, int batchSize) {
String sqlStatement = getSqlStatement(SqlMethod.INSERT_ONE);
return executeBatch(entityList, batchSize, (sqlSession, entity) -> sqlSession.insert(sqlStatement, entity));
}
/**
* 执行批量操作
*
* @param entityClass 实体类
* @param log 日志对象
* @param list 数据集合
* @param batchSize 批次大小
* @param consumer consumer
* @param <E> T
* @return 操作结果
* @since 3.4.0
*/
public static <E> boolean executeBatch(Class<?> entityClass, Log log, Collection<E> list, int batchSize, BiConsumer<SqlSession, E> consumer) {
Assert.isFalse(batchSize < 1, "batchSize must not be less than one");
return !CollectionUtils.isEmpty(list) && executeBatch(entityClass, log, sqlSession -> {
int size = list.size();
int idxLimit = Math.min(batchSize, size);
int i = 1;
for (E element : list) {
consumer.accept(sqlSession, element);
if (i == idxLimit) {
sqlSession.flushStatements();
idxLimit = Math.min(idxLimit + batchSize, size);
}
i++;
}
});
}
通过代码可以发现2个点,第一个就是批量保存的时候会默认进行分批,每批的大小为1000条数据;第二点就是通过代码
return executeBatch(entityList, batchSize, (sqlSession, entity) -> sqlSession.insert(sqlStatement, entity));
和
for (E element : list) {
consumer.accept(sqlSession, element);
if (i == idxLimit) {
sqlSession.flushStatements();
idxLimit = Math.min(idxLimit + batchSize, size);
}
i++;
}
可以看出插入是循环插入,并没有进行拼接处理。但是这里唯一不同与循环插入的是可以看到这里是通过sqlSession.flushStatements()
将一个个单条插入的insert
语句分批次进行提交,用的是同一个sqlSession
。
这里其实就可以看出来mybatis-plus
的批量插入实际上不是真正意义上的批量插入。那如果想实现真正的批量插入就只能手动拼接脚本吗?其实mybatis-plus
提供了sql
注入器,我们可以自定义方法来满足业务的实际开发需求。官方文档:https://baomidou.com/pages/42ea4a/
Mybatis-Plus
实现真正的批量插入
自定义sql
注入器
/**
* @author leo
* @date 2024年03月13日 15:16
*/
public class BatchSqlInjector extends DefaultSqlInjector {
@Override
public List<AbstractMethod> getMethodList(Class<?> mapperClass, TableInfo tableInfo) {
List<AbstractMethod> methodList = super.getMethodList(mapperClass,tableInfo);
//更新时自动填充的字段,不用插入值
methodList.add(new InsertBatchSomeColumn(i -> i.getFieldFill() != FieldFill.UPDATE));
return methodList;
}
}
定义通用mapper``CommonMapper
/**
* @author leo
* @date 2024年03月13日 16:34
*/
public interface CommonMapper<T> extends BaseMapper<T> {
/**
* 真正的批量插入
* @param entityList
* @return
*/
int insertBatch(List<T> entityList);
}
将自定义的注入器加载到容器中
/**
* @author leo
* @date 2024年03月13日 15:41
*/
@Configuration
public class MybatisPlusConfig {
@Bean
public BatchSqlInjector sqlInjector() {
return new BatchSqlInjector();
}
}
业务mapper
/**
*
* @author leo
* @since 2024-01-11
*/
public interface LlfInfoMapper extends CommonMapper<LlfInfoEntity> {
}
测试
List<LlfInfoEntity> llfInfoEntities = new ArrayList<>();
for (int i = 0; i <= 10; i++) {
LlfInfoEntity llfInfoEntity = new LlfInfoEntity();
llfInfoEntity.setChannelNum(i + "");
llfInfoEntity.setGroupNumber(i+"");
llfInfoEntity.setFlight(i+1);
llfInfoEntity.setIdNumber(i+"sadsadsad");
llfInfoEntities.add(llfInfoEntity);
}
llfInfoMapper.insertBatch(llfInfoEntities);
这里我们看下控制台打印的语句:
很明显,达到了我们的效果。
优化
这里可以看到InsertBatchSomeColumn
方法没有批次的概念,如果没有批次的话,那这里地方可能会有性能问题,你想想如果这个条数无穷大的话,我那这个sql
语句会非常大,不仅会超出mysql
的执行sql
的长度限制,也会造成oom
。那么这里我们就需要自己实现一下批次插入了,不知道大家还有没有印象前面的saveBatch()
方法是怎么实现批次插入的。我们也可以参考一下实现方式。直接上代码
public boolean executeBatch(Collection<LlfInfoEntity> list, int batchSize) {
int size = list.size();
int idxLimit = Math.min(batchSize, size);
int i = 1;
List<LlfInfoEntity> batchList = new ArrayList<>();
for (LlfInfoEntity element : list) {
batchList.add(element);
if (i == idxLimit) {
llfInfoMapper.insertBatchSomeColumn(batchList);
batchList.clear();
idxLimit = Math.min(idxLimit + batchSize, size);
}
i++;
}
return true;
}
测试代码:
List<LlfInfoEntity> llfInfoEntities = new ArrayList<>();
for (int i = 0; i <= 10; i++) {
LlfInfoEntity llfInfoEntity = new LlfInfoEntity();
llfInfoEntity.setChannelNum(i + "");
llfInfoEntity.setGroupNumber(i + "");
llfInfoEntity.setFlight(i + 1);
llfInfoEntity.setIdNumber(i + "sadsadsad");
llfInfoEntities.add(llfInfoEntity);
}
executeBatch(llfInfoEntities,5);
看执行结果:
这里就实现了真正的批量插入了。
执行性能比较
这里我就不去具体展现测试数据了,直接下结论了。
首先最快的肯定是手动拼sql
脚本和mybatis-plus
的方式速度最快,其次是mybatis-plus
的saveBatch
。这里要说下有很多文章都说需要单独配置rewriteBatchedStatements
参数,才会启用saveBatch
的批量插入方式。但是我这边跟进源码进行查看的时候默认值就是true
,所以我猜测可能是版本问题,下面会附上版本以及源码供大家参考。
rewriteBatchedStatements
参数分析
首选我们通过com.baomidou.mybatisplus.extension.toolkit.SqlHelper#executeBatch(java.lang.Class<?>, org.apache.ibatis.logging.Log, java.util.Collection<E>, int, java.util.function.BiConsumer<org.apache.ibatis.session.SqlSession,E>)
l里面的sqlSession.flushStatements();
代码可以跟踪到,mysql
驱动包里面的com.mysql.cj.jdbc.StatementImpl#executeBatch
下面这段代码
@Override
public int[] executeBatch() throws SQLException {
return Util.truncateAndConvertToInt(executeBatchInternal());
}
protected long[] executeBatchInternal() throws SQLException {
JdbcConnection locallyScopedConn = checkClosed();
synchronized (locallyScopedConn.getConnectionMutex()) {
if (locallyScopedConn.isReadOnly()) {
throw SQLError.createSQLException(Messages.getString("Statement.34") + Messages.getString("Statement.35"),
MysqlErrorNumbers.SQL_STATE_ILLEGAL_ARGUMENT, getExceptionInterceptor());
}
implicitlyCloseAllOpenResults();
List<Object> batchedArgs = this.query.getBatchedArgs();
if (batchedArgs == null || batchedArgs.size() == 0) {
return new long[0];
}
// we timeout the entire batch, not individual statements
int individualStatementTimeout = getTimeoutInMillis();
setTimeoutInMillis(0);
CancelQueryTask timeoutTask = null;
try {
resetCancelledState();
statementBegins();
try {
this.retrieveGeneratedKeys = true; // The JDBC spec doesn't forbid this, but doesn't provide for it either...we do..
long[] updateCounts = null;
if (batchedArgs != null) {
int nbrCommands = batchedArgs.size();
this.batchedGeneratedKeys = new ArrayList<>(batchedArgs.size());
boolean multiQueriesEnabled = locallyScopedConn.getPropertySet().getBooleanProperty(PropertyKey.allowMultiQueries).getValue();
if (multiQueriesEnabled || this.rewriteBatchedStatements.getValue() && nbrCommands > 4) {
return executeBatchUsingMultiQueries(multiQueriesEnabled, nbrCommands, individualStatementTimeout);
}
timeoutTask = startQueryTimer(this, individualStatementTimeout);
updateCounts = new long[nbrCommands];
for (int i = 0; i < nbrCommands; i++) {
updateCounts[i] = -3;
}
SQLException sqlEx = null;
int commandIndex = 0;
for (commandIndex = 0; commandIndex < nbrCommands; commandIndex++) {
try {
String sql = (String) batchedArgs.get(commandIndex);
updateCounts[commandIndex] = executeUpdateInternal(sql, true, true);
if (timeoutTask != null) {
// we need to check the cancel state on each iteration to generate timeout exception if needed
checkCancelTimeout();
}
// limit one generated key per OnDuplicateKey statement
getBatchedGeneratedKeys(this.results.getFirstCharOfQuery() == 'I' && containsOnDuplicateKeyInString(sql) ? 1 : 0);
} catch (SQLException ex) {
updateCounts[commandIndex] = EXECUTE_FAILED;
if (this.continueBatchOnError && !(ex instanceof MySQLTimeoutException) && !(ex instanceof MySQLStatementCancelledException)
&& !hasDeadlockOrTimeoutRolledBackTx(ex)) {
sqlEx = ex;
} else {
long[] newUpdateCounts = new long[commandIndex];
if (hasDeadlockOrTimeoutRolledBackTx(ex)) {
for (int i = 0; i < newUpdateCounts.length; i++) {
newUpdateCounts[i] = java.sql.Statement.EXECUTE_FAILED;
}
} else {
System.arraycopy(updateCounts, 0, newUpdateCounts, 0, commandIndex);
}
sqlEx = ex;
break;
//throw SQLError.createBatchUpdateException(ex, newUpdateCounts, getExceptionInterceptor());
}
}
}
if (sqlEx != null) {
throw SQLError.createBatchUpdateException(sqlEx, updateCounts, getExceptionInterceptor());
}
}
if (timeoutTask != null) {
stopQueryTimer(timeoutTask, true, true);
timeoutTask = null;
}
return (updateCounts != null) ? updateCounts : new long[0];
} finally {
this.query.getStatementExecuting().set(false);
}
} finally {
stopQueryTimer(timeoutTask, false, false);
resetCancelledState();
setTimeoutInMillis(individualStatementTimeout);
clearBatch();
}
}
}
我们主要核心看一下这个代码:
if (multiQueriesEnabled || this.rewriteBatchedStatements.getValue() && nbrCommands > 4) {
return executeBatchUsingMultiQueries(multiQueriesEnabled, nbrCommands, individualStatementTimeout);
}
能进入if语句,并执行批处理方法 executeBatchUsingMultiQueryies
的条件如下:
allowMultiQueries = true
rewriteBatchedStatements=true
- 数据总条数 > 4条
PropertyKey.java
中定义了 multiQueriesEnables
和 rewriteBatchedStatements
的枚举值,com.mysql.cj.conf.PropertyKey
如下:
可以看出这个参数都是true。所以我这边默认就是支持批量操作的。
mybatis-plus
版本:3.5.10
mysql-connector-java
版本:8.0.31
Queryies` 的条件如下:
allowMultiQueries = true
rewriteBatchedStatements=true
- 数据总条数 > 4条
PropertyKey.java
中定义了 multiQueriesEnables
和 rewriteBatchedStatements
的枚举值,com.mysql.cj.conf.PropertyKey
如下:
[外链图片转存中…(img-nwh8oV0y-1710751858305)]
[外链图片转存中…(img-AmPKylvo-1710751858305)]
可以看出这个参数都是true。所以我这边默认就是支持批量操作的。
mybatis-plus
版本:3.5.10
mysql-connector-java
版本:8.0.31