CyclicBarrier实战应用——批量数据多线程协调异步处理(主线程执行事务回滚)

😊 @ 作者: 一恍过去
💖 @ 主页: https://blog.csdn.net/zhuocailing3390
🎊 @ 社区: Java技术栈交流
🎉 @ 主题: CCyclicBarrier实战应用——批量数据多线程协调异步处理(主线程执行事务回滚)
⏱️ @ 创作时间: 2023年12月03日

在这里插入图片描述

目录

  • 前言
  • 1、概述
  • 2、方法说明:
  • 3、代码实例

前言

通过CyclicBarrierCountDownLatch配合开启多个子线程,由子线程完成数据的处理,最后由主线程进行数据库操作,由主线程进行事务的提交或者回滚;
如果需要由子线程处理完数据,并且由子线程进行事务提交或者回滚,参考:https://lhz1219.blog.csdn.net/article/details/134630794

1、概述

CyclicBarrier是一个同步器工具类,用来协调多个线程之间的同步,通过await()进行阻塞,直到所有的线程都执行await()后,所有的线程再继续执行。

2、方法说明:

  • public viod await() /int await(long timeout,TimeUnit unit) :使当前线程一直等待,除非线程被中断或超出了指定的等待时间。
    当线程会被阻塞,直到下面的情况之一发生才会返回:
    • 如果每执行一次await() 计数加一,直到达到初始值。
    • 如果当前线程,在进入此方法时已经设置了该线程的中断状态;或者在等待时被中断,则抛出InterruptedException,并且清除当前线程的已中断状态。
    • 如果超出了指定的等待时间,则该方法根本不会再进行阻塞。

3、代码实例

有用到hutool的工具包,pom如下:

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.0.7</version>
        </dependency>

Controller:

@RestController
@RequestMapping("/test")
@Slf4j
public class TestController {

    @Resource
    private CyclicService cyclicService;
    
    /**
     * CyclicBarrier实现多线程(多个子线程)异步处理数据,再主线程回归处理
     *
     * @return
     */
    @GetMapping("/cyclic/handleData")
    public String countDownHandleData() {
        cyclicService.handleData();
        return "success";
    }

Sevice:

@Service
@Slf4j
public class CountDownService {
    @Resource
    private TestMapper testMapper;

    @Resource
    private ApplicationContext applicationContext;
    
   /**
     * 实现多线程(多个子线程)异步处理数据,再主线程回归处理
     */
    @Transactional(rollbackFor = Exception.class)
    public void handleData() {
        List<TestEntity> testList = getData();
        AtomicBoolean errorTag = new AtomicBoolean(false);
        long start = System.currentTimeMillis();
        // 使用多线程对list集合进行分批次处理,实际情况可以根据具体耗时来决定
        // 比如:一万条数据,每条单独处理需要200ms,按批次一个线程处理200条数据,分为50个批次,实际情况根据业务来定
        // 需要使用hutool工具类
        List<List<TestEntity>> splitList = CollUtil.split(testList, 200);
        // 设置CyclicBarrier大小,需要比实际子线程+1,业务主线程需要进行阻塞
        CyclicBarrier cyclicBarrier = new CyclicBarrier(splitList.size() + 1);
        // 简单创建一个线程池,这里的线程池可以自定义,为了方便直接使用
        ExecutorService executorService = Executors.newCachedThreadPool();
        splitList.forEach(list -> {
            // 线程处理
            executorService.execute(() -> {
                try {
                    for (TestEntity entity : list) {
                        if (errorTag.get()) {
                            break;
                        }

                        // 对实体类的业务处理,此处模拟业务处理,耗时50ms
                        ThreadUtil.sleep(50);

                        // 模拟数据处理中,出现了异常
                        if (entity.getCount().equals(2000)) {
                            throw new RuntimeException("子线程执行异常");
                        }
                    }
                } catch (Exception e) {
                    log.error("子线程异常:{}", e.getMessage(), e);
                    errorTag.set(true);
                } finally {
                    // 子线程中,业务处理完成后,利用cyclicBarrier的特性,计数器加一操作
                    try {
                        cyclicBarrier.await();
                    } catch (Exception e) {
                        errorTag.set(true);
                    }
                }
                log.info("子线程执行完成");
            });
        });
        executorService.shutdown();
        try {
            // 主线程阻塞,直到子线程执行完成
            cyclicBarrier.await();
            // 可以设置最大阻塞时间,防止线程一直挂起,当子线程时间大于当前时间后会抛出TimeOut异常
            // cyclicBarrier.await(5, TimeUnit.SECONDS);

            // 模拟执行主线程业务逻辑耗时,比如insert、update等
            ThreadUtil.sleep(20);
        } catch (Exception e) {
            errorTag.set(true);
        }
        long end = System.currentTimeMillis();
        log.info("数据处理完成,耗时:{}", (end - start) / 1000);
        // 如果出现异常
        if (errorTag.get()) {
            throw new RuntimeException("异步业务执行出现异常");
        }
        log.info("主线程执行完成");
    }


    /**
     * 模拟解析的excel等文件的数据
     */
    private List<TestEntity> getData() {
        List<TestEntity> list = new ArrayList<>();
        // 此处模拟一万条数据
        for (int i = 1; i <= 10000; i++) {
            TestEntity entity = new TestEntity();
            entity.setId(new Random().nextInt(999999999));
            entity.setCount(i);
            entity.setCommodityCode("code-" + i);
            entity.setMoney(new Random().nextInt(1000000));
            entity.setUserId("user-" + i);

            list.add(entity);
        }
        return list;
    }
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/212773.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

多级缓存自用

1.什么是多级缓存 传统的缓存策略一般是请求到达Tomcat后,先查询Redis,如果未命中则查询数据库,如图: 存在下面的问题: •请求要经过Tomcat处理,Tomcat的性能成为整个系统的瓶颈 •Redis缓存失效时,会对数据库产生冲击 多级缓存就是充分利用请求处理的每个环节,添加缓…

MDK提示:在多字节的目标代码中,没有此Unicode 字符可以映射到的字符

MDK警告提示在多字节的目标代码中&#xff0c;没有此Unicode 字符可以映射到的字符 警告提示&#xff1a; 在写MDK的工程代码时&#xff0c;发现代码中引入的头文件前方出现一些红色的叉叉&#xff0c;但是编译工程并不报错&#xff0c;功能也能正常执行的&#xff0c;只是提…

使用VC++设计程序实现K近邻中值滤波器(KNNMF)、最小均方差滤波器、矢量中值滤波算法进行滤波

VC实现若干种图像滤波技术2 获取源工程可访问gitee可在此工程的基础上进行学习。 该工程的其他文章&#xff1a; 01- 一元熵值、二维熵值 02- 图像平移变换&#xff0c;图像缩放、图像裁剪、图像对角线镜像以及图像的旋转 03-邻域平均平滑算法、中值滤波算法、K近邻均值滤波器 …

大学程序员的养生之道

呀哈喽&#xff0c;我是结衣。 今天给大家带来的是大学程序员的养生之道&#xff01; 作为一名大学生还没有深刻的感受到未来的恐怖&#xff0c;但每当我看到这些对程序员的评价还是不禁感慨。 不要让自己的学习之路变成这样啊&#xff01;程序员的职业发展&#xff1a;某编程语…

微机原理——并行接口8255学习1

目录 并行接口特点 可编程并行接口芯片8255 8255端口地址 8255的三种工作方式 8255的两种命令&#xff08;方式命令和C端口命令&#xff09; 由用户扩展的并行接口8255的应用 声光报警器接口设计 步进电机控制接口设计 PA端口实现跑马灯 PB端口实现按键输入 并行接口特…

Python第三方库版本管理(管理虚拟环境)

序言 最近使用python发现会有使用不同项目时需要的三方包依赖版本不同&#xff0c;如果各个项目相互切换&#xff0c;那么会经常需要更新版本。比如numpy当前版本时1.26.2&#xff0c;需要它小于版本1.21&#xff0c;有没有像Java一样通过Maven依赖管理中的版本控制去管理这些…

Linguistic Steganalysis in Few-Shot Scenario论文阅读笔记

TIFS期刊 A类期刊 新知识点 Introduction Linguistic Steganalysis in Few-Shot Scenario模型是个预训练方法。 评估了四种文本加密分析方法&#xff0c;TS-CSW、TS-RNN、Zou、SeSy&#xff0c;用于分析和训练的样本都由VAE-Stego生产(编码方式使用AC编码)。 实验是对比在少样…

JavaWeb(二)

一、SQL简介 结构化查询语言&#xff0c;一门操作关系型数据库的编程语言。英文&#xff1a;Structured Query Language&#xff0c;简称 SQL。 二、Mysql和Oracle关于区分大小写 MySQL在Windows下都不区分大小写。 oracle中分为两种情况&#xff0c;单纯的sql语句不区分大小…

深入理解网络非阻塞 I/O:NIO

&#x1f52d; 嗨&#xff0c;您好 &#x1f44b; 我是 vnjohn&#xff0c;在互联网企业担任 Java 开发&#xff0c;CSDN 优质创作者 &#x1f4d6; 推荐专栏&#xff1a;Spring、MySQL、Nacos、Java&#xff0c;后续其他专栏会持续优化更新迭代 &#x1f332;文章所在专栏&…

非标设计之螺纹螺丝选型二

目录 一、螺丝的表面处理工艺&#xff1a;镀锌工艺&#xff1a;渗锌工艺&#xff1a;热浸锌工艺&#xff1a;达克罗工艺&#xff1a;镀镍工艺&#xff1a;氧化&#xff08;发黑&#xff09;工艺&#xff1a;电泳黑工艺&#xff1a;不锈钢螺钉&#xff1a; 二、按照颜色分工艺&a…

掌握视频剪辑技巧,轻松自定义视频速率,打造个性化出彩视频

你是否曾经因为视频节奏平淡而缺乏吸引力而苦恼&#xff1f;现在&#xff0c;我们为你推荐一款视频批量剪辑工具&#xff0c;让你轻松自定义视频速率&#xff0c;实现出彩个性化视频。 首先第一步&#xff0c;我们要打开好简单批量智剪&#xff0c;并登录账号。 第二步&#x…

三十五、Seata的基本架构、部署TC服务、微服务集成Seata

目录 一、基本架构 1、Seata事务中的三个重要角色 2、四种不同的分布式事务解决方案&#xff1a; 二、TC的部署 三、微服务集成Seata 1、引入Seata相关依赖 2、配置yml文件 3、启动服务 一、基本架构 Seata是 2019 年 1 月份蚂蚁金服和阿里巴巴共同开源的分布式事务解决…

centos7 yum安装mysql5.7

1.获取源 wget http://dev.mysql.com/get/mysql57-community-release-el7-11.noarch.rpm 2.安装源 yum -y install mysql57-community-release-el7-11.noarch.rpm 3.安装mysql yum -y install mysql-server 4.如果出现下面错误&#xff0c;没有错误就忽略 使用以下命令解决…

如何在Rocky Linux中安装nmon

一、环境基础 [rootlocalhost nmon16d]# cat /etc/redhat-release Rocky Linux release 9.2 (Blue Onyx) [rootlocalhost nmon16d]# uname -r 5.14.0-284.11.1.el9_2.x86_64 [rootlocalhost nmon16d]# 二、安装步骤 在Rocky Linux和AlmaLinux等基于RHEL 的发行版上&#xff…

把握生成式AI新机遇,亚马逊云科技助力下一位独角兽

文章目录 前言亚马逊云科技生成式AI创业热潮向应用与工具链集中生成式AI初创生而全球化 赛道更细分、布局更广阔后记 前言 DoNews11月20日消息&#xff0c;当一项新技术出现&#xff0c;并成为行业主流甚至是变革的“敲门砖”时&#xff0c;企业应该如何应对&#xff1f; 202…

Zookeeper 安装与部署

Zookeeper官网 目录 1 配置文件参数解读2 Zookeeper 单点安装3 Zookeeper 分布式安装 1 配置文件参数解读 Zookeeper 中的配置文件 zoo.cfg 中参数含义解读如下&#xff1a; &#xff08;1&#xff09;tickTime 2000&#xff1a;通信心跳数&#xff0c;Zookeeper 服务器与客户…

Shutdown Signal: channel error; protocol method: #method<channel.close>

完整异常信息&#xff1a; Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code404, reply-textNOT_FOUND - no exchange fanoutExchange in vhost /, class-id60, method-id40) 意思是找不到名字是 fanoutExchange 的虚拟机 就是虚拟机…

增强现实技术革新零售业:提升购物体验的未来技术

增强现实&#xff08;AR&#xff09;技术正在改变零售业的面貌&#xff0c;为消费者提供了全新的购物体验。本文将探讨AR技术在零售行业中的应用&#xff0c;以及它如何改变传统的购物方式。 首先&#xff0c;AR技术允许消费者在现实世界中查看虚拟的产品展示。在服装和家具行业…

基于51单片机的交通灯_紧急开关+黄灯倒计时+可调时间

51单片机交通灯_紧急开关黄灯倒计时可调时间 开题报告系统硬件设计主控制器选择系统硬件结构图时钟及复位电路指示灯及倒计时模块 倒计时模块&#xff1a;程序软件主流程框架main函数 设计报告资料清单资料下载链接 基于51单片机交通灯_紧急开关黄灯倒计时可调时间 仿真图prote…

【DPDK】Trace Library

概述 跟踪是一种用于了解运行中的软件系统中发生了什么的技术。用于跟踪的软件被称为跟踪器&#xff0c;在概念上类似于磁带记录器。记录时&#xff0c;放置在软件源代码中的特定检测点会生成保存在巨大磁带上的事件&#xff1a;跟踪文件。稍后可以在跟踪查看器中打开跟踪文件…