步骤
- 获取需要进行批量更新的大集合,对大集合进行拆分操作,分成N个小集合1 ~ N 。
- 为每个分段集合开启一个线程,进行数据处理;
把操作封装工具类,如下:
@Component
public class SyncBatchExecUtil {
public <T> void exec(List<T> list, int spiltSize, Consumer<List<T>> consumer){
//拆分
List<List<T>> execList = spilt(list, spiltSize);
//创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(execList.size());
//所以异步
List< CompletableFuture<Void>> allCompletable = new ArrayList<>();
for (List<T> execData : execList) {
CompletableFuture<Void> execCompletable = CompletableFuture.runAsync(() -> {
consumer.accept(execData);
}, executorService);
allCompletable.add(execCompletable);
}
//等待执行结束
CompletableFuture<Void> allOf = CompletableFuture.allOf(allCompletable.toArray(new CompletableFuture[0]));
allOf.join(); // 等待所有任务完成
System.out.println();
// 关闭线程池
executorService.shutdown();
}
/**
* @param needSpiltList 需要拆分的集合
* @param spiltSize 每个集合大小
* @return 拆分后都集合
*/
public static <T> List<List<T>> spilt(List<T> needSpiltList, int spiltSize){
//防御性编程
if (CollectionUtils.isEmpty(needSpiltList) || spiltSize <=0){
return new ArrayList<>();
}
//结果
List<List<T>> result = new ArrayList<>();
//集合大小
int size = needSpiltList.size();
//最多分一批次
if (size<=spiltSize){
result.add(needSpiltList);
return result;
}
//出最后一个,每批次大小
int preSize = size / spiltSize;
//最后一个批次大小
int lastSize = size % spiltSize;
//开始标记
int beginIndex = 0;
//结束标记
int endIndex = 0;
for (int i = 0; i <= preSize; i++) {
endIndex = beginIndex + spiltSize;
//最后一批,用最后一批的
if (i == preSize && lastSize != 0){
endIndex = beginIndex + lastSize;
}
result.add(needSpiltList.subList(beginIndex,endIndex));
//移动游标
beginIndex = beginIndex + spiltSize;
}
return result;
}
}
具体使用:
@Autowired
private UtilService utilService ;
@Autowired
private SyncBatchExecUtil syncBatchExecUtil;
@Test
public void exec(){
//要处理的数据
List<Demo> list = new ArrayList<>();
//直接执行
syncBatchExecUtil.exec(list,5,(data)->{
utilService .saveBatch(data);
});
}