CountDownLatch实战应用——批量数据多线程协调异步处理(子线程执行事务回滚)

😊 @ 作者: 一恍过去
💖 @ 主页: https://blog.csdn.net/zhuocailing3390
🎊 @ 社区: Java技术栈交流
🎉 @ 主题: CountDownLatch实战应用——批量数据多线程协调异步处理(子线程执行事务回滚)
⏱️ @ 创作时间: 2023年11月26日

在这里插入图片描述

目录

  • 前言
  • 1、概述
  • 2、实现
  • 3、方法说明:
  • 4、代码实例

前言

通过CountDownLatch开启多个子线程,由子线程完成数据的处理,子线程完成数据处理后进行等待,直到所有的子线程完成数据处理后,再判断是否进行回滚,如果需要回滚则所有线程执行回滚操作

如果需要由子线程处理完数据,但是由主线程进行事务提交或者回滚,参考:https://lhz1219.blog.csdn.net/article/details/134630258

1、概述

CountDownLatch是一个同步器工具类,用来协调多个线程之间的同步,能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行,不可重置使用。

2、实现

使用一个计数器进行实现,计数器初始值为线程的数量,当每一个线程完成自己任务后,计数器的值就会减一,当计数器的值为0时,在CountDownLatch上等待的线程就可以恢复执行接下来的任务。

3、方法说明:

  • public void countDown():递减锁存器的计数,如果计数到达零,则释放所有等待的线程。如果当前计数大于零,则将计数减少.
  • public viod await() /boolean await(long timeout,TimeUnit unit) :使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。如果当前计数为零,则此方法立刻返回true值。当线程调用了CountDownLatch对象的该方法后,当前线程会被阻塞,直到下面的情况之一发生才会返回:
    • 如果计数到达零,则该方法返回true值。
    • 如果当前线程,在进入此方法时已经设置了该线程的中断状态;或者在等待时被中断,则抛出InterruptedException,并且清除当前线程的已中断状态。
    • 如果超出了指定的等待时间,则返回值为false。如果该时间小于等于零,则该方法根本不会等待。参数:timeout-要等待的最长时间、unit-timeout 参数的时间单位

4、代码实例

有用到hutool的工具包,pom如下:

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.0.7</version>
        </dependency>

Controller:

@RestController
@RequestMapping("/test")
@Slf4j
public class TestController {

    @Resource
    private CountDownService countDownService;
   

    /**
     * CountDownLatch实现多线程(多个子线程)异步事务处理数据,全部子线程回滚数据
     *
     * @return
     */
    @ApiOperation(value = "测试CountDownLatch", notes = "测试")
    @ApiOperationSupport(order = 5)
    @GetMapping("/countDown/handleDataSonBack")
    public String handleDataSonBack() {
        countDownService.handleDataSonBack();
        return "success";
    }

Sevice:

@Service
@Slf4j
public class CountDownService {
    @Resource
    private TestMapper testMapper;

    @Resource
    private ApplicationContext applicationContext;
    
	/**
     * CountDownLatch实现多线程(多个子线程)异步事务处理数据,全部子线程回滚数据
     *
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    public void handleDataSonBack() {
        List<TestEntity> testList = getData();
        AtomicBoolean errorTag = new AtomicBoolean(false);
        long start = System.currentTimeMillis();
        // 使用多线程对list集合进行分批次处理,实际情况可以根据具体耗时来决定
        // 比如:一万条数据,每条单独处理需要50ms,按批次一个线程处理200条数据,分为50个批次,实际情况根据业务来定
        // 需要使用hutool工具类进行分组
        // 由于开启了事务回滚,异步的线程数量要小于,dataSource中配置的maximum-pool-size数量
        List<List<TestEntity>> splitList = CollUtil.split(testList, 200);
        // 设置countDown大小
        CountDownLatch countDownLatch = new CountDownLatch(splitList.size());

        // 再创建一个CountDownLatch,大小固定为一,用于子线程相互等待,最后确定是否回滚
        CountDownLatch errorCountDown = new CountDownLatch(1);

        // 异步调用其他Service,执行业务处理
        CountDownService bean = applicationContext.getBean(CountDownService.class);
        // 简单创建一个线程池,这里的线程池可以自定义,为了方便直接使用
        ExecutorService executorService = Executors.newCachedThreadPool();
        splitList.forEach(list -> {
            // 线程处理
            executorService.execute(() -> {
                bean.handleDataAsyncSonBack(list, countDownLatch, errorCountDown, errorTag);
            });
        });

        executorService.shutdown();
        try {
            // 主线程阻塞
            countDownLatch.await();
            // 可以设置最大阻塞时间,防止线程一直挂起
            /*boolean await = countDownLatch.await(1, TimeUnit.SECONDS);
            if (!await) {
                // 超过时间子线程都还没有结束,直接都回滚
                errorTag.set(true);
            }*/
            log.info("继续执行主线程");

            // 继续执行后续的操作,比如insert、update等
            TestEntity entity = new TestEntity();
            entity.setId(new Random().nextInt(999999999));
            entity.setCount(1);
            entity.setCommodityCode("handleTestMain");
            entity.setMoney(new Random().nextInt(1000000));
            entity.setUserId("user-handleTestMain");
            testMapper.insert(entity);

        } catch (Exception e) {
            log.error("主线程业务执行异常");
            errorTag.set(true);
        } finally {
            // 主线程业务执行完成后,执行errorCountDown计时器减一,使得所有阻塞的子线程,继续执行进入到异常判断中
            errorCountDown.countDown();
        }

        long end = System.currentTimeMillis();
        log.info("数据处理完成,耗时:{}", (end - start) / 1000);
        // 如果出现异常
        if (errorTag.get()) {
            throw new RuntimeException("异步业务执行出现异常");
        }
        log.info("主线程执行完成");
    }

    /**
     * 子线程异步处理,并且实现回滚
     * 由于开启了事务回滚,异步的线程数量要小于,dataSource中配置的maximum-pool-size数量
     */
    @Transactional(rollbackFor = Exception.class)
    public void handleDataAsyncSonBack(List<TestEntity> list, CountDownLatch countDownLatch, CountDownLatch errorCountDown, AtomicBoolean errorTag) {
        try {
            log.info("开始执行子线程");
            for (TestEntity entity : list) {
                if (errorTag.get()) {
                    break;
                }

                // 对实体类的业务处理,此处模拟业务处理,耗时50ms
                ThreadUtil.sleep(50);
                // 数据库查询操作
                testMapper.insert(entity);

                // 模拟数据处理中,出现了异常
                if (entity.getCount().equals(2000)) {
                    throw new RuntimeException("子线程执行异常");
                }
            }
        } catch (Exception e) {
            log.error("子线程异常:{}", e.getMessage(), e);
            errorTag.set(true);
        } finally {
            // 子线程中,业务处理完成后,利用countDown的特性,计数器减一操作
            countDownLatch.countDown();
        }

        log.info("handleDataAsyncSonBack-业务处理完成从,等待其他子线程");
        // 子阻塞,直到其他子线程完成操作
        try {
            errorCountDown.await();
        } catch (Exception e) {
            errorTag.set(true);
        }
        log.info("handleDataAsyncSonBack-子线程执行完成");
        if (errorTag.get()) {
            // 抛出异常,回滚数据
            throw new RuntimeException("handleDataAsyncSonBack-子线程业务执行异常");
        }
    }

    /**
     * 模拟解析的excel等文件的数据
     */
    private List<TestEntity> getData() {
        List<TestEntity> list = new ArrayList<>();
        // 此处模拟一万条数据
        for (int i = 1; i <= 10000; i++) {
            TestEntity entity = new TestEntity();
            entity.setId(new Random().nextInt(999999999));
            entity.setCount(i);
            entity.setCommodityCode("code-" + i);
            entity.setMoney(new Random().nextInt(1000000));
            entity.setUserId("user-" + i);

            list.add(entity);
        }
        return list;
    }
}

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/190426.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Docker 部署 Nacos(单机),利用 MySQL 数据库存储配置信息

前面的话 默认你已经懂 Docker、docker-compose Nacos版本&#xff1a;v2.2.3 MySQL 版本&#xff1a;8.2.0 一、下载 打开 Nacos 官网 官网地址&#xff1a;官网 点击手册 左侧 Nacos Docker 克隆项目到本地 # 克隆项目&#xff0c;如果提示连接不到 github 请自行解决 …

【数据结构】树的概念以及二叉树

目录 1 树概念及结构 1.1 树的概念 1.3 树的存储 2 二叉树的概念及结构 2.1 概念 2.2 特殊的二叉树 2.3 二叉树的性质 2.4 二叉树的存储结构 1 树概念及结构 1.1 树的概念 树是一种非线性的数据结构&#xff0c;它是由n&#xff08;n>0&#xff09;个有限结点组…

「Verilog学习笔记」非整数倍数据位宽转换24to128

专栏前言 本专栏的内容主要是记录本人学习Verilog过程中的一些知识点&#xff0c;刷题网站用的是牛客网 要实现24bit数据至128bit数据的位宽转换&#xff0c;必须要用寄存器将先到达的数据进行缓存。24bit数据至128bit数据&#xff0c;相当于5个输入数据第6个输入数据的拼接成一…

AR眼镜双目光波导/主板硬件方案

AR(增强现实)技术的发展离不开光学元件&#xff0c;而在其中&#xff0c;光波导和Micro OLED被视为AR眼镜光学方案的黄金搭档。光学元件在AR行业中扮演着核心角色&#xff0c;其成本高昂且直接影响用户体验的亮度、清晰度和大小等因素。AR眼镜的硬件成本中&#xff0c;光机部分…

测试工程师必学看系列之Jmeter_性能测试:性能测试的流程和术语

性能测试的流程 一、准备工作 1、系统基础功能验证 一般情况下&#xff0c;只有在系统基础功能测试验证完成、系统趋于稳定的情况下&#xff0c;才会进行性能测试&#xff0c;否则性能测试是无意义的。2、测试团队组建 根据该项目的具体情况&#xff0c;组建一个几人的性能测试…

【刷题宝典NO.5】

有效的括号 https://leetcode.cn/problems/valid-parentheses/ 给定一个只包括 (&#xff0c;)&#xff0c;{&#xff0c;}&#xff0c;[&#xff0c;] 的字符串 s &#xff0c;判断字符串是否有效。 有效字符串需满足&#xff1a; 左括号必须用相同类型的右括号闭合。左括号必…

Hugging Face宣布最受欢迎的AI机构,开源模型ChatGLM-6B广受认可

近日&#xff0c;Hugging Face作为开源AI社区的代表&#xff0c;总结了社区最欢迎的前15个公司和机构&#xff0c;几乎囊括了全部国内外风头正盛的AI科技机构&#xff0c;Stability AI、Meta AI、Runway占据排名前三&#xff0c;大众熟知的OpenAI、谷歌、微软也榜上有名。 其中…

C语言—冒泡排序

方法一&#xff08;不使用函数解决&#xff09; #define _CRT_SECURE_NO_WARNINGS 1#include<stdio.h> int main() {int arr[]{15,52,23,0,5,6,45,8,9,10};int i0;int j0;for ( i 0; i < 9; i){int flag1; //flag判断数组元素是否有序&#xff0c;这里先假设…

如何在Ubuntu系统上安装Git

简单介绍 Git是一个开源的分布式版本控制系统&#xff0c;用于敏捷高效地处理任何或小或大的项目。Git是Linus Torvalds为了帮助管理Linux内核开发而开发的一个开放源码的版本控制软件。Git 与常用的版本控制工具CVS&#xff0c;Subversion 等不同&#xff0c;它采用了分布式版…

2018年8月28日 Go生态洞察:Go 2草案设计初探

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

代理模式-C语言实现

UML图&#xff1a; 代码实现&#xff1a; #include <stdio.h>// 抽象主题接口 typedef struct {void (*request)(void*); } Subject;// 具体主题类 typedef struct {void (*request)(void*); } RealSubject;void RealSubject_request(void* obj) {printf("RealSubj…

计算机中vcomp140.dll丢失的解决方法,一键修复vcomp140.dll缺失问题

vcomp140.dll是Visual C 2015 Redistributable的一个组件&#xff0c;它是运行一些基于Visual Studio开发的软件所必需的。当你在运行某些程序时&#xff0c;可能会遇到“找不到vcomp140.dll”的错误提示&#xff0c;这通常是由于系统缺少这个组件导致的。本文将介绍vcomp140.d…

windows运行Pangolin应用填坑心得——如何在window应用轻量级opengl软件Pangolin库显示3D界面及窗口

目录 0、前言1、最有效的安装打开方式准备工作安装git安装vcpkg&#xff08;1&#xff09;下载&#xff08;2&#xff09;安装&#xff08;3&#xff09;集成至vs 安装cmake 安装pangolin 2、应用实例c工程&#xff08;1&#xff09;vs创建新工程&#xff08;2&#xff09;新工…

2018年8月24日 Go生态洞察:Go 1.11的发布及其新特性

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

安装最新版WebStorm来开发JavaScript应用程序

安装最新版WebStorm来开发JavaScript应用程序 Install the Latest Version of JetBrains WebStorm to Develop JavaScript Applications By JacksonML 2023-11-25 1. 系统要求 WebStorm是个跨平台集成开发环境&#xff08;IDE&#xff09;。按照JetBrains官网对WebStorm软件…

PTA-7-55 判断指定字符串是否合法

题目&#xff1a; 输入一个字符串&#xff0c;判断指定字符串是否合法&#xff0c;要求字符串由7个字符组成&#xff0c;并且第一位必须是大写字母&#xff0c;2-4为必须是小写字母&#xff0c;后3为必须是数字字符&#xff0c;要求使用正则表达式来实现。 根据题目要求&#x…

【Python爬虫实战项目】ip代理池项目原理及代码解析

视频讲解链接&#xff1a;https://www.bilibili.com/video/BV1e8411r7xX/ 代码链接&#xff1a;https://github.com/w-x-x-w/Spider-Project 大家好&#xff0c;这一季我们来介绍一个Python爬虫实战项目-ip代理池项目&#xff0c;这一集我们会首先介绍ip代理池的工作原理流程&a…

成为AI产品经理——模型评估(混淆矩阵)

一、混淆矩阵 1.混淆矩阵的介绍 混淆矩阵有两个定义positive&#xff08;正例&#xff09;和negative&#xff08;反例&#xff09;。分别代表模型结果的好和坏。 下图就是一个分类问题的混淆矩阵。横行代表真实的情况&#xff0c;而竖行代表预测的结果。 为了便于理解&…

QT网络协议知识体系(一)

//获取主机的名称和ip地址 //获取主机的所有信息

Android安卓设置跳转默认应用商店为Google Play 链接跳转到谷歌商店临时解决方法

手机链接默认不跳转 Google Play 因为大部分安卓厂商系统都根据了自己的需求进行了修改,就成为了系统级导流,想要彻底解除可刷写国际版等原生系统即可恢复 解决方法 使用冻结软件(例如 爱玩机手机助手(root)等应用)对 应用商城 进行临时冻结,如需保证正常使用解除冻结状态即可…