分页多线程处理大批量数据

1.业务场景

因为需要从一个返利明细表中获取大量的数据,生成返利报告,耗时相对较久,作为后台任务执行。但是后台任务如果不用多线程处理,也会要很长时间才能处理完。

另外考虑到数据量大,不能一次查询所有数据在内存中处理,为了防止内存溢出,分页查询数据,然后分批次多线程处理。

主要思想是采取分治的思想,首先分页查询数据,然后每页数据分成均匀的不同片段,多个线程处理这些片段,一个线程处理一个片段,可以加上等待的同步计数器,让这一页数据全部处理完后再去查询下一页的数据。

2.关键代码

//线程池配置
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10,
            10,
            10L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(200), new ThreadPoolExecutor.CallerRunsPolicy());

   

public String generateReport(String periodType, String monthWid, String quarterWid) {
        int totalNum = 0;
        //计时器
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();

        try {
            //这里省略了一些其他的逻辑,只关注分页查询然后多线程任务处理的逻辑......
            //查询总数量
            totalNum = getReportTotalNum(periodType, monthWid, quarterWid, totalNum);
            int pageIndex = 0;
            int pageSize = 500;
            int pageNum = 1;
            StoreRebateDetailForReportQueryReq req = null;
            while (pageNum <= (totalNum % pageSize == 0 ? (totalNum / pageSize) : (totalNum / pageSize + 1))) {//分页查询,每页500条数据
                pageIndex = pageSize * (pageNum - 1);
                List<StoreRebateDetail> list = storeRebateDetailService.selectListForRebateReport(pageIndex, pageSize);
                int batchNum = list.size();
                //每个线程处理100条                                                                                       
                int perThreadCount = 100;
                LOGGER.info("开始处理第{}页(共{}条)数据", pageNum, batchNum);
                final CountDownLatch cdl = new CountDownLatch((batchNum % perThreadCount) == 0 ? (batchNum / perThreadCount) : (batchNum / perThreadCount + 1)); //计数器
                for (int j = 0; j < batchNum; j++) {
                    //每100条一个线程处理
                    if (j % perThreadCount == 0) {
                        int start = j;
                        int end = (batchNum - j) >= perThreadCount ? (j + perThreadCount) : batchNum;
                        int pageNums = pageNum;
                        poolExecutor.submit(()->{
                            LOGGER.info("第{}页的第{}-{}条数据处理开始", pageNums, start+1, end);
                            //处理比较复杂的业务逻辑(耗时较久)
                            processInsert(list, start, end);
                            LOGGER.info("第{}页的第{}-{}条数据处理结束", pageNums, start+1, end);
                            cdl.countDown();
                        });
                    }
                }
                cdl.await();
                pageNum++;
            }
            stopWatch.stop();
            double totalTimeSeconds = stopWatch.getTotalTimeSeconds();
            result.put("syncStatus", "success");
            result.put("syncMsg", "调度处理完毕,生成" + totalNum + "条数据,执行时间为" + totalTimeSeconds + "秒");
            return SToolUtils.convertResultJSONObj(CommonAbstractService.SUCCESS_STATUS, "处理成功", totalNum, new JSONArray().fluentAdd(result)).toString();
        } catch (Exception e) {
            stopWatch.stop();
            double totalTimeSeconds = stopWatch.getTotalTimeSeconds();
            LOGGER.error("调度处理异常:{}--{}", e.getMessage(), e);
            result.put("syncStatus", "fail");
            result.put("syncMsg", "调度处理完毕,生成" + totalNum + "条数据,执行时间为" + totalTimeSeconds + "秒");
            return SToolUtils.convertResultJSONObj(CommonAbstractService.ERROR_STATUS, "处理异常", 0, new JSONArray().fluentAdd(result)).toString();
        } finally {
            //做业务需要处理的,可以没有
        }
    }

后面改了个通用版,采用接口中的默认方法实现主要公共逻辑,其他几个需要不同实现的方法让子类去实现。

batchProcess方法为主要处理逻辑入口方法,供其子类继承,子类需要传递线程池、每页大小、每个线程处理的条数、查询数据的参数等参数。

processLongTimeLogic方法为处理时间比较长,需要多线程去执行的逻辑,子类直接覆写这个方法,将复杂的耗时比较长的业务逻辑放在里面就可以了。

queryTotalNum方法为查询总记录数的方法,子类去具体实现查询逻辑,查询数量是为了后续分页处理。

queryDataListByPage方法为分页查询数据的方法,也是子类去实现具体的逻辑,这里的第一个参数list加了泛型处理,<T>为查询数据返回的实体对象类,这样在后续处理的时候就不要去强转类型了。

这样子类只需要关注查询大表的查询逻辑,以及需要处理的具体业务逻辑,而不需要去处理分页和多线程处理的逻辑,这样增加了代码的可读性以及减少了出错的可能性。

public interface BatchProcessService<T> {

    /**
     * 批量处理,分页+多线程处理
     * @param poolExecutor       线程池
     * @param pageSize           每页查询的大小
     * @param perThreadCount     每个线程处理的记录数
     * @param queryTotalNumParam 查询记录总数的参数,必须继承PageReq
     * @param queryDataParam     查询分页列表的参数,必须继承PageReq
     * @param logger             子类的日志对象
     * @param otherParam         其他参数,需要给processLongTimeLogic方法传递的参数
     * @throws InterruptedException
     */
    default int batchProcess(ThreadPoolExecutor poolExecutor, int pageSize, int perThreadCount, Object queryTotalNumParam, PageReq queryDataParam, Logger logger, Map<String, Object> otherParam) throws InterruptedException {
        int pageIndex = 0;
        int pageNum = 1;
        int totalNum = queryTotalNum(queryTotalNumParam);
        if (totalNum == 0) {
            logger.info("需要处理的数据数量为0");
            return 0;
        }
        try {
            while (pageNum <= (totalNum % pageSize == 0 ? (totalNum / pageSize) : (totalNum / pageSize + 1))) {
                pageIndex = pageSize * (pageNum - 1);
                queryDataParam.setPageIndex(pageIndex);
                queryDataParam.setPageRows(pageSize);
                List<T> list = queryDataListByPage(queryDataParam);
                int batchNum = list.size();
                final CountDownLatch cdl = new CountDownLatch((batchNum % perThreadCount) == 0 ? (batchNum / perThreadCount) : (batchNum / perThreadCount + 1)); //计数器
                for (int j = 1; j <= (batchNum % perThreadCount == 0 ? (batchNum / perThreadCount) : (batchNum / perThreadCount + 1)); j++) {
                    //每100条一个线程处理
                    int start = perThreadCount * (j - 1);
                    int end = (batchNum - start) >= perThreadCount ? (start + perThreadCount) : batchNum;
                    int pageNums = pageNum;
                    poolExecutor.submit(() -> {
                        logger.info("第{}页的第{}-{}条数据处理开始", pageNums, start + 1, end);
                        //处理其他长时间的逻辑
                        processLongTimeLogic(list.subList(start, end), otherParam);
                        logger.info("第{}页的第{}-{}条数据处理结束", pageNums, start + 1, end);
                        cdl.countDown();
                    });
                }
                cdl.await();
                pageNum++;
            }
        } catch (Exception e) {
            logger.error("批量处理数据异常", e);
            throw e;
        }
        return totalNum;
    }

    /**
     * 查询记录总数
     *
     * @param queryParam
     * @return
     */
    int queryTotalNum(Object queryParam);

    /**
     * 分页查询数据
     *
     * @param queryDataParam
     * @return
     */
    List<T> queryDataListByPage(PageReq queryDataParam);

    /**
     * 处理长时间业务逻辑
     *
     * @param list  处理的数据列表
     * @param otherParam 其他参数
     */
    void processLongTimeLogic(List<T> list, Map<String, Object> otherParam);
}

PageReq类为分页查询参数的父类,里面包含了分页的一些属性,查询参数的实体继承该类就可以了,其他是自己的业务相关的参数。

import lombok.Getter;
import lombok.Setter;

import java.io.Serializable;

@Getter
@Setter
public class PageReq implements Serializable {

    /**
     * 当前页码
     */
    private Integer pageIndex = 1;

    /**
     * 页大小
     */
    private Integer pageRows = 10;

    public PageReq() {
    }

    public PageReq(Integer pageIndex, Integer pageRows) {
        this.pageIndex = pageIndex;
        this.pageRows = pageRows;
    }

}

3.测试效果

原来跑一个月的数据需要40多分钟,后面通过这样处理后,采用5个线程跑,时间缩短至8分钟左右,相当于差不多时间缩短到原来的1/5。

image-20240320124945462

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/481403.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

LaTeX论文汇报ppt模板

在 LaTeX 的 beamer 类中&#xff0c;您可以使用不同的主题和模板来创建适合论文汇报的演示文稿。以下是一个使用了比较正式的 Madrid 主题的模板&#xff0c;您可以基于这个模板进行定制和扩展&#xff0c;以满足您论文汇报的需求。当需要在ppt输入中文的时候需要将第一行中的…

北京中科富海低温科技有限公司确认出席2024第三届中国氢能国际峰会

会议背景 随着全球对清洁能源的迫切需求&#xff0c;氢能能源转型、工业应用、交通运输等方面具有广阔前景&#xff0c;氢能也成为应对气候变化的重要解决方案。根据德勤的报告显示&#xff0c;到2050年&#xff0c;绿色氢能将有1.4万亿美元市场。氢能产业的各环节的关键技术突…

大数据技术在工厂生产数字转型中的应用与价值

hello宝子们...我们是艾斯视觉擅长ui设计和前端开发10年经验&#xff01;希望我的分享能帮助到您&#xff01;如需帮助可以评论关注私信我们一起探讨&#xff01;致敬感谢感恩&#xff01; 随着大数据技术的快速发展&#xff0c;越来越多的企业开始关注并应用大数据技术&#x…

C语言:自定义类型(结构体)

目录 一、结构的特殊声明二、结构的自引用三、结构体内存对齐1.对齐规则2.为什么存在内存对齐(1)平台原因 (移植原因)&#xff1a;(2)性能原因&#xff1a; 3.修改默认对齐数 四、结构体传参五、结构体实现位段1.什么是位段2.位段的内存分配3.位段的跨平台问题4.位段使用的注意…

tftp使用

下载 sudo apt-get install tftpd-hpa 创建文件夹 mkdir /home/ljl/work/tftpd mkdir /home/ljl/tftpd chmod 777 tftpd/编辑 sudo vim /etc/default/tftpd-hpa //服务器端 sudo apt-get install tftp-hpa //客户端编辑权限 sudo vi /etc/default/tftpd-hpa 内容&#xff1…

jenkins构建完成后部署到本机,无法读取容器外文件夹

项目背景&#xff1a; Dockerjenkins 构建完成后&#xff0c;要把打包的dist文件夹内容移动到网站目录 /www/wwwroot/xxxxxx 文件夹下&#xff1b;但是获取不到jenkins容器外的文件夹。 解决办法&#xff1a; 在容器中&#xff0c;添加挂载/映射本机目录&#xff0c;把网站…

两直线交点算法 C

求两直线交点算法 有中间交点 则CD在AB异侧 A B A C A B A D \nobreak AB \times AC \newline AB \times AD ABACABAD 异号 叉乘后相乘小于零 等于零的几种情况 A B C与AB共线 D与AB共线 求交点&#xff0c;可由面积比例用叉乘计算 C E C D S A B C S A B C D . \frac…

yarn的使用与安装

文章目录 1.安装方式一&#xff1a;全局安装yarn2.安装方式二&#xff1a;通过开启corepack安装3.其他部分yarn命令4.Yarn镜像配置5.Pnpm使用方法同yarn无区别,可按照以上yarn的安装以及使用方式来使用pnmp 1.安装方式一&#xff1a;全局安装yarn 全局安装yarn npm i yarn -g…

视频记录历史播放位置效果

简介 每次打开页面视频从上一次的播放位置开始播放 利用lodash库做节流 代码 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-sca…

自动驾驶---Motion Planning之轨迹Path优化

1 背景 在之前的几篇文章中,不管是通过构建SL图《自动驾驶---Motion Planning之Path Boundary》,ST图《自动驾驶---Motion Planning之Speed Boundary》,又或者是构建SLT图《自动驾驶---Motion Planning之构建SLT Driving Corridor》,最终我们都是为了得到boundary的信息。 …

牛客题霸-SQL进阶篇(刷题记录二)

本文基于前段时间学习总结的 MySQL 相关的查询语法&#xff0c;在牛客网找了相应的 MySQL 题目进行练习&#xff0c;以便加强对于 MySQL 查询语法的理解和应用。 由于涉及到的数据库表较多&#xff0c;因此本文不再展示&#xff0c;只提供 MySQL 代码与示例输出。 部分题目因…

【隐私计算实训营003详解隐私计算框架及技术要点】

1. 隐语架构一览 1.1 隐语架构 隐语架构通常指的是一种面向隐私保护计算的软件框架或解决方案&#xff0c;它采用了密码学、可信执行环境&#xff08;TEE&#xff09;、多方安全计算&#xff08;MPC&#xff09;等多种隐私保护技术来实现在数据加密状态下进行计算&#xff0c;…

【计算机网络篇】数据链路层(2)封装成帧和透明传输

文章目录 &#x1f95a;封装成帧和透明传输&#x1f388;封装成帧&#x1f388;透明传输&#x1f5d2;️面向字节的物理链路使用字节填充的方法实现透明传输。&#x1f5d2;️面向比特的物理链路使用比特填充的方法实现透明传输。 &#x1f6f8;练习 &#x1f95a;封装成帧和透…

河北盟盾:高性能钢结构防火涂料,安全守护新力量

在现代化建设的浪潮中&#xff0c;防火安全日益成为各行业关注的焦点。河北盟盾防火材料有限公司以其卓越的产品质量和稳定性能&#xff0c;赢得了市场的广泛认可。公司始终坚持以科技为先导&#xff0c;以创新为动力&#xff0c;不断推出高品质、高性能的防火涂料产品。 公司的…

通讯录的动态实现

文章目录 通讯录的动态实现模块化编程通讯录的框架构建功能的具体实现初始化通讯录添加联系人删除联系人查找联系人修改联系人打印通讯录排序通讯录检查容量并扩容加载通讯录保留通讯录销毁通讯录 完整代码总结 通讯录的动态实现 模块化编程 分文件 不同模块放在不同的文件下 …

一招鲜吃遍天!CleanMyMac X苹果电脑Mac管家让你的Mac倍儿爽

一招鲜吃遍天&#xff01;CleanMyMac X 苹果电脑Mac管家让你的 Mac 倍儿爽 &#xff0c; 轻松清理、优化、保护你的 Apple 设备&#xff0c;体验前所未有的流畅&#xff0c;在当今数字化时代&#xff0c;我们的生活离不开各类电子设备&#xff0c;尤其是苹果电脑 Mac。 然而&am…

CISP 4.2备考之《安全支撑技术》知识点总结

文章目录 第一节 密码技术第二节 标识和身份鉴别技术第三节 访问控制技术 第一节 密码技术 密码学发展阶段&#xff1a;古典、近代、现代和公钥密码学及特点。 密码系统组成&#xff1a;明文、加密、密钥、解密、密文。 柯克霍夫原则&#xff1a;密钥保密&#xff0c;算法公开…

【Node.js】npx

概述 npx 可以使用户在不安装全局包的情况下&#xff0c;运行已安装在本地项目中的包或者远程仓库中的包。 高版本npm会自带npx命令。 它可以直接运行 node_modules/.bin 下的 exe 可执行文件。而不像之前&#xff0c;我们需要在 scripts 里面配置&#xff0c;然后 npm run …

利用Scala与Apache HttpClient实现网络音频流的抓取

概述 在当今数字化时代&#xff0c;网络数据的抓取和处理已成为许多应用程序和服务的重要组成部分。本文将介绍如何利用Scala编程语言结合Apache HttpClient工具库实现网络音频流的抓取。通过本文&#xff0c;读者将学习如何利用强大的Scala语言和Apache HttpClient库来抓取网…

npm i安装依赖报错,但是cnpm i 却安装成功

问题描述&#xff1a;在a项目中npm i 安装依赖时发生以上报错&#xff0c;但是cnpm i 却成功&#xff0c;而且在其他项目中npm i 安装其他项目依赖也能成功.... 解决办法&#xff1a;删除项目中package-lock.json文件后再npm i 即可