支付宝:多线程事务怎么回滚?说用 @Transactional 可以回去等通知了!

1,最近有一个大数据量插入的操作入库的业务场景,需要先做一些其他修改操作,然后在执行插入操作,由于插入数据可能会很多,用到多线程去拆分数据并行处理来提高响应时间,如果有一个线程执行失败,则全部回滚。

2,在spring中可以使用@Transactional注解去控制事务,使出现异常时会进行回滚,在多线程中,这个注解则不会生效,如果主线程需要先执行一些修改数据库的操作,当子线程在进行处理出现异常时,主线程修改的数据则不会回滚,导致数据错误。

3,下面用一个简单示例演示多线程事务。

公用的类和方法

/**
 * 平均拆分list方法.
 * @param source
 * @param n
 * @param <T>
 * @return
 */
public static <T> List<List<T>> averageAssign(List<T> source,int n){
    List<List<T>> result=new ArrayList<List<T>>();
    int remaider=source.size()%n; 
    int number=source.size()/n; 
    int offset=0;//偏移量
    for(int i=0;i<n;i++){
        List<T> value=null;
        if(remaider>0){
            value=source.subList(i*number+offset, (i+1)*number+offset+1);
            remaider--;
            offset++;
        }else{
            value=source.subList(i*number+offset, (i+1)*number+offset);
        }
        result.add(value);
    }
    return result;
}
/**  线程池配置
 * @version V1.0
 */
public class ExecutorConfig {
    private static int maxPoolSize = Runtime.getRuntime().availableProcessors();
    private volatile static ExecutorService executorService;
    public static ExecutorService getThreadPool() {
        if (executorService == null){
            synchronized (ExecutorConfig.class){
                if (executorService == null){
                    executorService =  newThreadPool();
                }
            }
        }
        return executorService;
    }

    private static  ExecutorService newThreadPool(){
        int queueSize = 500;
        int corePool = Math.min(5, maxPoolSize);
        return new ThreadPoolExecutor(corePool, maxPoolSize, 10000L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(queueSize),new ThreadPoolExecutor.AbortPolicy());
    }
    private ExecutorConfig(){}
}
/** 获取sqlSession
 * @author 86182
 * @version V1.0
 */
@Component
public class SqlContext {
    @Resource
    private SqlSessionTemplate sqlSessionTemplate;

    public SqlSession getSqlSession(){
        SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();
        return sqlSessionFactory.openSession();
    }
}

示例事务不成功操作

  /**
 * 测试多线程事务.
 * @param employeeDOList
 */
@Override
@Transactional
public void saveThread(List<EmployeeDO> employeeDOList) {
    try {
        //先做删除操作,如果子线程出现异常,此操作不会回滚
        this.getBaseMapper().delete(null);
        //获取线程池
        ExecutorService service = ExecutorConfig.getThreadPool();
        //拆分数据,拆分5份
        List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);
        //执行的线程
        Thread []threadArray = new Thread[lists.size()];
        //监控子线程执行完毕,再执行主线程,要不然会导致主线程关闭,子线程也会随着关闭
        CountDownLatch countDownLatch = new CountDownLatch(lists.size());
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        for (int i =0;i<lists.size();i++){
            if (i==lists.size()-1){
                atomicBoolean.set(false);
            }
            List<EmployeeDO> list  = lists.get(i);
            threadArray[i] =  new Thread(() -> {
                try {
                 //最后一个线程抛出异常
                    if (!atomicBoolean.get()){
                        throw new ServiceException("001","出现异常");
                    }
                    //批量添加,mybatisPlus中自带的batch方法
                    this.saveBatch(list);
                }finally {
                    countDownLatch.countDown();
                }

            });
        }
        for (int i = 0; i <lists.size(); i++){
            service.execute(threadArray[i]);
        }
        //当子线程执行完毕时,主线程再往下执行
        countDownLatch.await();
        System.out.println("添加完毕");
    }catch (Exception e){
        log.info("error",e);
        throw new ServiceException("002","出现异常");
    }finally {
         connection.close();
     }
}

数据库中存在一条数据:

图片

图片

//测试用例
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { ThreadTest01.class, MainApplication.class})
public class ThreadTest01 {

    @Resource
    private EmployeeBO employeeBO;

    /**
     *   测试多线程事务.
     * @throws InterruptedException
     */
    @Test
    public  void MoreThreadTest2() throws InterruptedException {
        int size = 10;
        List<EmployeeDO> employeeDOList = new ArrayList<>(size);
        for (int i = 0; i<size;i++){
            EmployeeDO employeeDO = new EmployeeDO();
            employeeDO.setEmployeeName("lol"+i);
            employeeDO.setAge(18);
            employeeDO.setGender(1);
            employeeDO.setIdNumber(i+"XX");
            employeeDO.setCreatTime(Calendar.getInstance().getTime());
            employeeDOList.add(employeeDO);
        }
        try {
            employeeBO.saveThread(employeeDOList);
            System.out.println("添加成功");
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

测试结果:

图片

图片

图片

图片

可以发现子线程组执行时,有一个线程执行失败,其他线程也会抛出异常,但是主线程中执行的删除操作,没有回滚,@Transactional注解没有生效。

使用sqlSession控制手动提交事务

 @Resource
  SqlContext sqlContext;
 /**
 * 测试多线程事务.
 * @param employeeDOList
 */
@Override
public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {
    // 获取数据库连接,获取会话(内部自有事务)
    SqlSession sqlSession = sqlContext.getSqlSession();
    Connection connection = sqlSession.getConnection();
    try {
        // 设置手动提交
        connection.setAutoCommit(false);
        //获取mapper
        EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);
        //先做删除操作
        employeeMapper.delete(null);
        //获取执行器
        ExecutorService service = ExecutorConfig.getThreadPool();
        List<Callable<Integer>> callableList  = new ArrayList<>();
        //拆分list
        List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        for (int i =0;i<lists.size();i++){
            if (i==lists.size()-1){
                atomicBoolean.set(false);
            }
            List<EmployeeDO> list  = lists.get(i);
            //使用返回结果的callable去执行,
            Callable<Integer> callable = () -> {
                //让最后一个线程抛出异常
                if (!atomicBoolean.get()){
                    throw new ServiceException("001","出现异常");
                }
              return employeeMapper.saveBatch(list);
            };
            callableList.add(callable);
        }
        //执行子线程
       List<Future<Integer>> futures = service.invokeAll(callableList);
        for (Future<Integer> future:futures) {
        //如果有一个执行不成功,则全部回滚
            if (future.get()<=0){
                connection.rollback();
                 return;
            }
        }
        connection.commit();
        System.out.println("添加完毕");
    }catch (Exception e){
        connection.rollback();
        log.info("error",e);
        throw new ServiceException("002","出现异常");
    }finally {
         connection.close();
     }
}
// sql
<insert id="saveBatch" parameterType="List">
 INSERT INTO
 employee (employee_id,age,employee_name,birth_date,gender,id_number,creat_time,update_time,status)
 values
     <foreach collection="list" item="item" index="index" separator=",">
     (
     #{item.employeeId},
     #{item.age},
     #{item.employeeName},
     #{item.birthDate},
     #{item.gender},
     #{item.idNumber},
     #{item.creatTime},
     #{item.updateTime},
     #{item.status}
         )
     </foreach>
 </insert>

数据库中一条数据:

图片

图片

测试结果:抛出异常,

图片

图片

删除操作的数据回滚了,数据库中的数据依旧存在,说明事务成功了。

图片

图片

成功操作示例:

 @Resource
SqlContext sqlContext;
/**
 * 测试多线程事务.
 * @param employeeDOList
 */
@Override
public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {
    // 获取数据库连接,获取会话(内部自有事务)
    SqlSession sqlSession = sqlContext.getSqlSession();
    Connection connection = sqlSession.getConnection();
    try {
        // 设置手动提交
        connection.setAutoCommit(false);
        EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);
        //先做删除操作
        employeeMapper.delete(null);
        ExecutorService service = ExecutorConfig.getThreadPool();
        List<Callable<Integer>> callableList  = new ArrayList<>();
        List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);
        for (int i =0;i<lists.size();i++){
            List<EmployeeDO> list  = lists.get(i);
            Callable<Integer> callable = () -> employeeMapper.saveBatch(list);
            callableList.add(callable);
        }
        //执行子线程
       List<Future<Integer>> futures = service.invokeAll(callableList);
        for (Future<Integer> future:futures) {
            if (future.get()<=0){
                connection.rollback();
                 return;
            }
        }
        connection.commit();
        System.out.println("添加完毕");
    }catch (Exception e){
        connection.rollback();
        log.info("error",e);
        throw new ServiceException("002","出现异常");
       // throw new ServiceException(ExceptionCodeEnum.EMPLOYEE_SAVE_OR_UPDATE_ERROR);
    }
}

测试结果:

图片

图片

数据库中数据:

删除的删除了,添加的添加成功了,测试成功。

图片

最后说一句(求关注!别白嫖!)

如果这篇文章对您有所帮助,或者有所启发的话,求一键三连:点赞、转发、在看。

关注公众号:woniuxgg,在公众号中回复:笔记  就可以获得蜗牛为你精心准备的java实战语雀笔记,回复面试、开发手册、有超赞的粉丝福利!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/340664.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Android】TypedArray的使用

介绍 看电池电量组件BatteryMeterView的时候看到的。 Array是个数组&#xff0c;所有TypedArray也是个容器&#xff0c;基本是用于自定义View里面的&#xff08;至少我目前见过的全部都在自定义View里面&#xff09;。 使用 1.自定义View public class RoundSeekbarView e…

vivado 预设文件、IP设置(_P)、用户参数、以太网时钟处理、GT位置限制、当前可识别板的IP列表

了解预设文件 预设文件有助于在特定配置中自定义IP核心。PS7、axi_emc和当linear_flash或DDR3_SDRAM 界面是在Vivado IP集成商的Board选项卡中选择的。预设文件使用XML格式。preset_file是为特定的Board文件定义的&#xff0c;可以是用于将预设应用于多个IP。 <ip_presets…

模拟器单窗口ip有问题?试试关闭IPV6来解决

目前应该不止雷电9有这个问题了&#xff0c;最早是看到无忧群里在说有这个问题&#xff0c;后面发现很多其他的ip软件也有同样的问题&#xff0c;很多人都遇到&#xff0c;所以做个图文教程在这里&#xff0c;没出问题的也可以设置一下&#xff0c;目前ipv6也还没普及&#xff…

数据库(银行数据库表构建)

题目&#xff1a; 通过所提供的E-R图和数据库模型图完成库表的创建&#xff0c;并插入适量的数据.要求必须使用SQL命令进行构建。 表1 UserInfo **建表** CREATE TABLE USERINFO (customerID INT AUTO_INCREMENT COMMENT 客户编号,customerName CHAR(50) CHARACTER SET utf8mb…

Unity -简单键鼠事件和虚拟轴

简单键鼠事件 — “Test_03” KeyTest 键鼠事件每帧都要监听&#xff0c;要放在Update()中处理 public class KeyTest : MonoBehaviour {// Start is called before the first frame updatevoid Start(){}// Update is called once per framevoid Update(){// 【鼠标点击事件…

【论文代码】基于隐蔽带宽的汽车控制网路鲁棒认证-到达时间间隔通道的Java实现(一)

文章目录 一、USBtin 基类1.1 CANSender 类1.1.1 SimpleSender类 1.2 CANReceiver类1.2.1 SimpleReceiver类 1.3 Noise_node类 二、CANMessageListener 接口2.1 IAT_Monitor2.2 BasicListener2.3 DLC_Monitor 三、IATBitConverter 抽象类3.1 OneBitConverter类3.2 TwoBitConver…

Bit Extraction and Bootstrapping for BGV/BFV

参考文献&#xff1a; [GHS12] Gentry C, Halevi S, Smart N P. Better bootstrapping in fully homomorphic encryption[C]//International Workshop on Public Key Cryptography. Berlin, Heidelberg: Springer Berlin Heidelberg, 2012: 1-16.[AP13] Alperin-Sheriff J, Pe…

常见的嵌入式面试问题解答!

1.关键字static的作用是什么&#xff1f;为什么static变量只初始化一次&#xff1f; ​1&#xff09;修饰局部变量&#xff1a;使得变量变成静态变量&#xff0c;存储在静态区&#xff0c;存储在静态区的数据周期和程序相同&#xff0c; 在main函数开始前初始化&#xff0c;在…

【车载开发系列】AutoSar当中的诊断会话控制

【车载开发系列】AutoSar当中的诊断会话控制 【车载开发系列】AutoSar当中的诊断会话控制 【车载开发系列】AutoSar当中的诊断会话控制一. 什么是诊断会话控制服务二. 会话模式分类三. 会话的接口1&#xff09;获取当前会话状态2&#xff09;设置会话状态3&#xff09;返回默认…

分布式锁4 :数据库DB实现分布式锁的悲观锁和乐观锁,unique实现方式

一 方案1 使用悲观锁解决冲突 1.1 使用悲观锁原理 1.1.1 使用悲观锁的原理 1.悲观锁&#xff1a;在select的时候就会加锁&#xff0c;采用先加锁后处理的模式&#xff0c;虽然保证了数据处理的安全性&#xff0c;但也会阻塞其他线程的写操作。在读取数据时锁住那几行&…

UVT音乐证书考试时间确定,学习氛围渐浓

美国职业资格与人才管理中心&#xff08;UVT&#xff09;音乐证书考试时间正式确定&#xff0c;学习氛围逐渐浓厚。众多热爱音乐的从业者和学生开始积极备考&#xff0c;希望通过这一考试获得音乐领域的宝贵证书。音乐证书被认为是音乐人才展示个人专业水平的重要机会&#xff…

re:从0开始的HTML学习之路 2. HTML的标准结构说明

1. <DOCTYPE html> 文档声明&#xff0c;用于告诉浏览器&#xff0c;当前HTML文档采用的是什么版本。 必须写在当前HTML文档的首行&#xff08;可执行代码的首行&#xff09; HTML4的此标签与HTML5不同。 2. <html lang“en”> 根标签&#xff0c;整个HTML文档中…

虚拟机设置固定IP地址以及访问外网

一、虚拟机固定IP地址设置 1、IP地址查看命令 &#xff08;1&#xff09;ip a [rootlocalhost ~]# ip a • inet 192.168.93.129/24这表示该网络接口&#xff08;ens33&#xff09;被分配了一个IPv4地址是192.168.93.129&#xff0c;并且其子网掩码为 24位&#xff08;即/24…

Java多线程并发篇----第二十六篇

系列文章目录 文章目录 系列文章目录前言一、什么是 Executors 框架?二、什么是阻塞队列?阻塞队列的实现原理是什么?如何使用阻塞队列来实现生产者-消费者模型?三、什么是 Callable 和 Future?前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分…

MySQL---多表等级查询综合练习

创建emp表 CREATE TABLE emp( empno INT(4) NOT NULL COMMENT 员工编号, ename VARCHAR(10) COMMENT 员工名字, job VARCHAR(10) COMMENT 职位, mgr INT(4) COMMENT 上司, hiredate DATE COMMENT 入职时间, sal INT(7) COMMENT 基本工资, comm INT(7) COMMENT 补贴, deptno INT…

大模型理论基础3

模型架构 模型概括 先把语言模型看成黑盒&#xff0c;以便于了解整体功能后拆分成&#xff1a;分词、模型架构 分词 首先要知道&#xff1a;语言模型 p 是建立在词元&#xff08;token&#xff09;序列的上的一个概率分布输出&#xff0c;其中每个词元来自某个词汇表V&#…

解决github无法访问的问题(修改hosts)

1.先ping github.com看是否能ping通 不能ping通的话&#xff0c;找到github最新的ip地址&#xff0c;修改hosts文件&#xff08;C:\Windows\System32\drivers\etc&#xff09; 找最新的ip地址的办法&#xff1a; a.cmd中ping时返回的 b.点击ipaddress.com查询网站链接 修改host…

微信小程序入门,学习全局配置与页面配置

目录 一、微信小程序 二、微信小程序的全局配置 三、微信小程序的页面配置 四、全局配置与页面配置的区别 一、微信小程序 微信小程序是一种基于微信平台的应用程序&#xff0c;它可以在微信内部直接运行&#xff0c;无需下载安装。微信小程序具有以下特点和优势&#xff…

数据结构与算法:图

文章目录 图1) 概念有向 vs 无向度权路径环图的连通性 2) 图的表示3) Java 表示4) DFS5) BFS6) 拓扑排序7) 最短路径DijkstraBellman-FordFloyd-Warshall 8) 最小生成树PrimKruskal 图 1) 概念 图是由顶点&#xff08;vertex&#xff09;和边&#xff08;edge&#xff09;组成…