1.业务场景
因为需要从一个返利明细表中获取大量的数据,生成返利报告,耗时相对较久,作为后台任务执行。但是后台任务如果不用多线程处理,也会要很长时间才能处理完。
另外考虑到数据量大,不能一次查询所有数据在内存中处理,为了防止内存溢出,分页查询数据,然后分批次多线程处理。
主要思想是采取分治的思想,首先分页查询数据,然后每页数据分成均匀的不同片段,多个线程处理这些片段,一个线程处理一个片段,可以加上等待的同步计数器,让这一页数据全部处理完后再去查询下一页的数据。
2.关键代码
//线程池配置
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10,
10,
10L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(200), new ThreadPoolExecutor.CallerRunsPolicy());
public String generateReport(String periodType, String monthWid, String quarterWid) {
int totalNum = 0;
//计时器
StopWatch stopWatch = new StopWatch();
stopWatch.start();
try {
//这里省略了一些其他的逻辑,只关注分页查询然后多线程任务处理的逻辑......
//查询总数量
totalNum = getReportTotalNum(periodType, monthWid, quarterWid, totalNum);
int pageIndex = 0;
int pageSize = 500;
int pageNum = 1;
StoreRebateDetailForReportQueryReq req = null;
while (pageNum <= (totalNum % pageSize == 0 ? (totalNum / pageSize) : (totalNum / pageSize + 1))) {、
//分页查询,每页500条数据
pageIndex = pageSize * (pageNum - 1);
List<StoreRebateDetail> list = storeRebateDetailService.selectListForRebateReport(pageIndex, pageSize);
int batchNum = list.size();
//每个线程处理100条
int perThreadCount = 100;
LOGGER.info("开始处理第{}页(共{}条)数据", pageNum, batchNum);
final CountDownLatch cdl = new CountDownLatch((batchNum % perThreadCount) == 0 ? (batchNum / perThreadCount) : (batchNum / perThreadCount + 1)); //计数器
for (int j = 0; j < batchNum; j++) {
//每100条一个线程处理
if (j % perThreadCount == 0) {
int start = j;
int end = (batchNum - j) >= perThreadCount ? (j + perThreadCount) : batchNum;
int pageNums = pageNum;
poolExecutor.submit(()->{
LOGGER.info("第{}页的第{}-{}条数据处理开始", pageNums, start+1, end);
//处理比较复杂的业务逻辑(耗时较久)
processInsert(list, start, end);
LOGGER.info("第{}页的第{}-{}条数据处理结束", pageNums, start+1, end);
cdl.countDown();
});
}
}
cdl.await();
pageNum++;
}
stopWatch.stop();
double totalTimeSeconds = stopWatch.getTotalTimeSeconds();
result.put("syncStatus", "success");
result.put("syncMsg", "调度处理完毕,生成" + totalNum + "条数据,执行时间为" + totalTimeSeconds + "秒");
return SToolUtils.convertResultJSONObj(CommonAbstractService.SUCCESS_STATUS, "处理成功", totalNum, new JSONArray().fluentAdd(result)).toString();
} catch (Exception e) {
stopWatch.stop();
double totalTimeSeconds = stopWatch.getTotalTimeSeconds();
LOGGER.error("调度处理异常:{}--{}", e.getMessage(), e);
result.put("syncStatus", "fail");
result.put("syncMsg", "调度处理完毕,生成" + totalNum + "条数据,执行时间为" + totalTimeSeconds + "秒");
return SToolUtils.convertResultJSONObj(CommonAbstractService.ERROR_STATUS, "处理异常", 0, new JSONArray().fluentAdd(result)).toString();
} finally {
//做业务需要处理的,可以没有
}
}
后面改了个通用版,采用接口中的默认方法实现主要公共逻辑,其他几个需要不同实现的方法让子类去实现。
batchProcess
方法为主要处理逻辑入口方法,供其子类继承,子类需要传递线程池、每页大小、每个线程处理的条数、查询数据的参数等参数。
processLongTimeLogic
方法为处理时间比较长,需要多线程去执行的逻辑,子类直接覆写这个方法,将复杂的耗时比较长的业务逻辑放在里面就可以了。
queryTotalNum
方法为查询总记录数的方法,子类去具体实现查询逻辑,查询数量是为了后续分页处理。
queryDataListByPage
方法为分页查询数据的方法,也是子类去实现具体的逻辑,这里的第一个参数list
加了泛型处理,<T>
为查询数据返回的实体对象类,这样在后续处理的时候就不要去强转类型了。
这样子类只需要关注查询大表的查询逻辑,以及需要处理的具体业务逻辑,而不需要去处理分页和多线程处理的逻辑,这样增加了代码的可读性以及减少了出错的可能性。
public interface BatchProcessService<T> {
/**
* 批量处理,分页+多线程处理
* @param poolExecutor 线程池
* @param pageSize 每页查询的大小
* @param perThreadCount 每个线程处理的记录数
* @param queryTotalNumParam 查询记录总数的参数,必须继承PageReq
* @param queryDataParam 查询分页列表的参数,必须继承PageReq
* @param logger 子类的日志对象
* @param otherParam 其他参数,需要给processLongTimeLogic方法传递的参数
* @throws InterruptedException
*/
default int batchProcess(ThreadPoolExecutor poolExecutor, int pageSize, int perThreadCount, Object queryTotalNumParam, PageReq queryDataParam, Logger logger, Map<String, Object> otherParam) throws InterruptedException {
int pageIndex = 0;
int pageNum = 1;
int totalNum = queryTotalNum(queryTotalNumParam);
if (totalNum == 0) {
logger.info("需要处理的数据数量为0");
return 0;
}
try {
while (pageNum <= (totalNum % pageSize == 0 ? (totalNum / pageSize) : (totalNum / pageSize + 1))) {
pageIndex = pageSize * (pageNum - 1);
queryDataParam.setPageIndex(pageIndex);
queryDataParam.setPageRows(pageSize);
List<T> list = queryDataListByPage(queryDataParam);
int batchNum = list.size();
final CountDownLatch cdl = new CountDownLatch((batchNum % perThreadCount) == 0 ? (batchNum / perThreadCount) : (batchNum / perThreadCount + 1)); //计数器
for (int j = 1; j <= (batchNum % perThreadCount == 0 ? (batchNum / perThreadCount) : (batchNum / perThreadCount + 1)); j++) {
//每100条一个线程处理
int start = perThreadCount * (j - 1);
int end = (batchNum - start) >= perThreadCount ? (start + perThreadCount) : batchNum;
int pageNums = pageNum;
poolExecutor.submit(() -> {
logger.info("第{}页的第{}-{}条数据处理开始", pageNums, start + 1, end);
//处理其他长时间的逻辑
processLongTimeLogic(list.subList(start, end), otherParam);
logger.info("第{}页的第{}-{}条数据处理结束", pageNums, start + 1, end);
cdl.countDown();
});
}
cdl.await();
pageNum++;
}
} catch (Exception e) {
logger.error("批量处理数据异常", e);
throw e;
}
return totalNum;
}
/**
* 查询记录总数
*
* @param queryParam
* @return
*/
int queryTotalNum(Object queryParam);
/**
* 分页查询数据
*
* @param queryDataParam
* @return
*/
List<T> queryDataListByPage(PageReq queryDataParam);
/**
* 处理长时间业务逻辑
*
* @param list 处理的数据列表
* @param otherParam 其他参数
*/
void processLongTimeLogic(List<T> list, Map<String, Object> otherParam);
}
PageReq
类为分页查询参数的父类,里面包含了分页的一些属性,查询参数的实体继承该类就可以了,其他是自己的业务相关的参数。
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
@Getter
@Setter
public class PageReq implements Serializable {
/**
* 当前页码
*/
private Integer pageIndex = 1;
/**
* 页大小
*/
private Integer pageRows = 10;
public PageReq() {
}
public PageReq(Integer pageIndex, Integer pageRows) {
this.pageIndex = pageIndex;
this.pageRows = pageRows;
}
}
3.测试效果
原来跑一个月的数据需要40多分钟,后面通过这样处理后,采用5个线程跑,时间缩短至8分钟左右,相当于差不多时间缩短到原来的1/5。