Spring多线程事务处理

一、背景

本文主要介绍了spring多线程事务的解决方案,心急的小伙伴可以跳过上面的理论介绍分析部分直接看最终解决方案。

在我们日常的业务活动中,经常会出现大规模的修改插入操作,比如在3.0的活动赛事创建,涉及到十几张表的插入(一张表可能插入一行或者多行数据),由于单线程模型的关系,所有的sql都是串行,即后面的sql必须都要等到前面的sql执行完成才能继续。但是在很多场景下,sql的执行顺序并不影响业务的结果,面对这样的场景,我们很自然的想到了使用异步的方式去处理,可是我们同时又希望整个创建操作是事务性的,即要全部成功,要么全部失败,但是单纯的使用异步线程并不能达到我们理想的效果。

这个时候,我们需要一种多线程下保证事务的解决方案。

代码片段,大量的同步保存操作

    public void much(){
        //业务操作1
        doBusiness1();
        //业务操作2
        doBusiness2();
        //业务操作3
        doBusiness3();
        //业务操作4
        doBusiness4();
        
    }

    private void doBusiness1() {
        //执行sql1
        //执行sql2
        //执行sql3
        //执行sql4
    }

每个业务操作可以是相关联的,也有可能是完全无关的,但如果做成异步的话我们就无法保证事务,怎么去解决这个问题呢?

二、理论先行

1.事务介绍

我们先确定spring事务的本质是什么,spring本身不支持事务,spring实现事务只是对我们原有的业务逻辑做了一层包装,他替我们决定了什么时候开启事务,什么情况下应该向数据库提交,什么时候回滚,及实现我们设置的一些事务参数,包括回滚的条件,传播类型等。

我们所熟知的spring事务有两种主流的解决方式,一种是声明式事务,一种是编程式事务。

先来讲我们最常用的声明式事务。

1.1声明式事务

声明式事务就是我们最常用的@Transactional注解,通常我们只需要在我们想交由spring控制的事务方法上加上注解即可,这个注解有一些重要的参数,由于不是本文重点,就不在此展开。这是一个经典的spring的aop实现,为了弄清楚在加上@Transactional注解后spring到底为我们做了什么,我们可以从两方面入手,一是spring如何给我们生成相应的代理对象,二是这个代理对象为我们做了什么。

事务的开始是由@EnableTransactionManagement 注解产生,这个注解在运行时会导入TransactionManagementConfigurationSelector这个类,这个类本质上是一个ImportSelector,他根据adviceMode将特定的配置类导入进去,分别为AutoProxyRegistrar 后置处理器和ProxyTransactionManagementConfiguration Advisor。

AutoProxyRegistrar 实现了ImportBeanDefinitionRegistrar 重写了registerBeanDefinitions 方法

public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
    boolean candidateFound = false;
    Set<String> annTypes = importingClassMetadata.getAnnotationTypes();
    for (String annType : annTypes) {
        // ...

        AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry);
    }
    // ...
}

@Nullable
public static BeanDefinition registerAutoProxyCreatorIfNecessary(
    BeanDefinitionRegistry registry, @Nullable Object source) {

    return registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source);
}

该方法最终注入了InfrastructureAdvisorAutoProxyCreator。InfrastructureAdvisorAutoProxyCreator这个类就是一个bean的后置处理器,最终的作用就是处理需要的代理对象。

public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) {
if (bean != null) {
    Object cacheKey = getCacheKey(bean.getClass(), beanName);
    if (this.earlyProxyReferences.remove(cacheKey) != bean) {
        return wrapIfNecessary(bean, beanName, cacheKey);
    }
}
return bean;
}


protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    // ...
    // 拿当前bean去匹配容器中的 Advisors,如果找到符合的就生成代理对象
    // Create proxy if we have advice.  
    Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
    if (specificInterceptors != DO_NOT_PROXY) {
        this.advisedBeans.put(cacheKey, Boolean.TRUE);
        Object proxy = createProxy(
            bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
        this.proxyTypes.put(cacheKey, proxy.getClass());
        return proxy;
    }

    this.advisedBeans.put(cacheKey, Boolean.FALSE);
    return bean;
}

ProxyTransactionManagementConfiguration的作用就是来生成具体的Advisor,他注册了三个bean,

  1. 该类主要完成以下几个任务:

  2. 创建TransactionAttributeSource对象:用于解析@Transactional注解并生成事务属性。

  3. 创建TransactionInterceptor对象:用于创建事务通知,将事务属性应用到目标方法,这其实就是一个事务模板,如下所示

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
                                         final InvocationCallback invocation) throws Throwable {
    //TransactionAttributeSource内部保存着当前类某个方法对应的TransactionAttribute---事务属性源
    //可以看做是一个存放TransactionAttribute与method方法映射的池子
    TransactionAttributeSource tas = getTransactionAttributeSource();
    //获取当前事务方法对应的TransactionAttribute
    final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    //定位TransactionManager
    final TransactionManager tm = determineTransactionManager(txAttr);
    .....
    //类型转换为局部事务管理器
    PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

    if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
        //TransactionManager根据TransactionAttribute创建事务后返回
        //TransactionInfo封装了当前事务的信息--包括TransactionStatus
        TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

        Object retVal;
        try {
            //继续执行过滤器链---过滤链最终会调用目标方法
            //因此可以理解为这里是调用目标方法
            retVal = invocation.proceedWithInvocation();
        }
        catch (Throwable ex) {
            //目标方法抛出异常则进行判断是否需要回滚
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        }
        finally {
            //清除当前事务信息
            cleanupTransactionInfo(txInfo);
        }
        ...
        //正常返回,那么就正常提交事务呗(当然还是需要判断TransactionStatus状态先)
        commitTransactionAfterReturning(txInfo);
        return retVal;
    }
        ...

  1. 创建TransactionAdvisor对象:将事务通知和切点(Pointcut)组合成Advisor。

  2. 创建TransactionAttributeSourceAdvisor对象:将事务属性和切点组合成Advisor

@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {

    @Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(
        TransactionAttributeSource transactionAttributeSource, TransactionInterceptor transactionInterceptor) {

        BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
        advisor.setTransactionAttributeSource(transactionAttributeSource);
        advisor.setAdvice(transactionInterceptor);
        if (this.enableTx != null) {
            advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
        }
        return advisor;
    }

    @Bean
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public TransactionAttributeSource transactionAttributeSource() {
        // TransactionAttributeSource 是一个接口,具体注入的是 Annotationxxxx
        return new AnnotationTransactionAttributeSource();
    }

    @Bean
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource) {
        TransactionInterceptor interceptor = new TransactionInterceptor();
        interceptor.setTransactionAttributeSource(transactionAttributeSource);
        if (this.txManager != null) {
            interceptor.setTransactionManager(this.txManager);
        }
        return interceptor;
    }
}

@Nullable
private TransactionAttributeSource transactionAttributeSource;

private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {
    @Override
    @Nullable
    protected TransactionAttributeSource getTransactionAttributeSource() {
        return transactionAttributeSource;
    }
};


/**
 * Set the transaction attribute source which is used to find transaction
 * attributes. This should usually be identical to the source reference
 * set on the transaction interceptor itself.
 * @see TransactionInterceptor#setTransactionAttributeSource
 */
public void setTransactionAttributeSource(TransactionAttributeSource transactionAttributeSource) {
this.transactionAttributeSource = transactionAttributeSource;
}

/**
 * Set the {@link ClassFilter} to use for this pointcut.
 * Default is {@link ClassFilter#TRUE}.
 */
public void setClassFilter(ClassFilter classFilter) {
    this.pointcut.setClassFilter(classFilter);
}

@Override
public Pointcut getPointcut() {
    return this.pointcut;
}

可以见到里面已经包含了pointcut,这就能将我们需要被增加的事务方法找出。

ProxyTransactionManagementConfiguration负责将需要包装的bean和方法找出并包装成advisor,InfrastructureAdvisorAutoProxyCreator根据advisor生成相应的代理对象。

小结:InfrastructureAdvisorAutoProxyCreator遍历容器中的bean,尝试去自动代理,匹配的工作就交由advisor中的point,如果匹配成功就为其创建代理对象,这个代理对象中放入了TransactionInterceptor拦截器

,等到相关方法调用时,调用的是代理对象的方法,然后通过责任链模式通过TransactionInterceptor处理,以此来进行事务的操作。

声明式事务的介绍先到这里,接下来我们来介绍下编程式事务。

1.2编程式事务

编程式事务的核心就是将spring为我们做好的那些步骤拆出来,交由开发者去控制事务何时开启、提交、回滚,他的运行本质和声明式事务并没有两样。

模板如下

public class TransactionMain {
    public static void main(String[] args) throws ClassNotFoundException, SQLException {
        test();
    }
 
    private static void test() {
        DataSource dataSource = getDS();
        JdbcTransactionManager jtm = new JdbcTransactionManager(dataSource);
        //JdbcTransactionManager根据TransactionDefinition信息来进行一些连接属性的设置
        //包括隔离级别和传播行为等
        DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition();
        //开启一个新事务---此时autocommit已经被设置为了false,并且当前没有事务,这里创建的是一个新事务
        TransactionStatus ts = jtm.getTransaction(transactionDef);
        //进行业务逻辑操作
        try {
            update(dataSource);
            jtm.commit(ts);
        }catch (Exception e){
            jtm.rollback(ts);
            System.out.println("发生异常,我已回滚");
        }
    }
    
    private static void update(DataSource dataSource) throws Exception {
        JdbcTemplate jt = new JdbcTemplate();
        jt.setDataSource(dataSource);
        jt.update("UPDATE Department SET Dname=\"大忽悠\" WHERE id=6");
        throw new Exception("我是来捣乱的");
    }
 
}

三、方案探索

1.直接使用多线程

我们在开启代码中事务,并在业务逻辑中直接使用多线程,是否能保证事务?

  @Transactional
    public void testDirect() {
        new Thread(()->{
            Per per = new Per();
            per.setName("t1");
            perService.save(per);
        }).start();
        new Thread(()->{
            Per per1 = new Per();
            per1.setName("t2");
            perService.save(per1);
            throw new RuntimeException("Exception test");
        }).start();
    }

显然,这种方式并不能保证事务,哪怕加上了事务注解,因为子线程抛出的异常并不能在主线程中捕获,也不能被其他线程感知到。

2.事务模板中使用多线程

package com.user.util;
 
import lombok.RequiredArgsConstructor;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
 
import javax.sql.DataSource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
 

   @Component
   @RequiredArgsConstructor
   public class MultiplyThreadTransactionManager {
   /**
 
    * 如果是多数据源的情况下,需要指定具体是哪一个数据源
      */
      private final DataSource dataSource;
 
  
      public void runAsyncButWaitUntilAllDown(List<Runnable> tasks, Executor executor) {
      if(executor==null){
          throw new IllegalArgumentException("线程池不能为空");
      }
      DataSourceTransactionManager transactionManager = getTransactionManager();
      //是否发生了异常
      AtomicBoolean ex=new AtomicBoolean();
 
      List<CompletableFuture> taskFutureList=new ArrayList<>(tasks.size());
      List<TransactionStatus> transactionStatusList=new ArrayList<>(tasks.size());
 
      tasks.forEach(task->{
          taskFutureList.add(CompletableFuture.runAsync(
                  () -> {
                      try{
                          //1.开启新事务
                          transactionStatusList.add(openNewTransaction(transactionManager));
                          //2.异步任务执行
                          task.run();
                      }catch (Throwable throwable){
                          //打印异常
                          throwable.printStackTrace();
                          //其中某个异步任务执行出现了异常,进行标记
                          ex.set(Boolean.TRUE);
                          //其他任务还没执行的不需要执行了
                          taskFutureList.forEach(completableFuture -> completableFuture.cancel(true));
                      }
                  }
                  , executor)
          );
      });
 
      try {
          //阻塞直到所有任务全部执行结束---如果有任务被取消,这里会抛出异常滴,需要捕获
          CompletableFuture.allOf(taskFutureList.toArray(new CompletableFuture[]{})).get();
      } catch (InterruptedException | ExecutionException e) {
          e.printStackTrace();
      }
 
      //发生了异常则进行回滚操作,否则提交
      if(ex.get()){
          System.out.println("发生异常,全部事务回滚");
          transactionStatusList.forEach(transactionManager::rollback);
      }else {
          System.out.println("全部事务正常提交");
          transactionStatusList.forEach(transactionManager::commit);
      }
      }
 
   private TransactionStatus openNewTransaction(DataSourceTransactionManager transactionManager) {
       //JdbcTransactionManager根据TransactionDefinition信息来进行一些连接属性的设置
       //包括隔离级别和传播行为等
       DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition();
       //开启一个新事务---此时autocommit已经被设置为了false,并且当前没有事务,这里创建的是一个新事务
       return transactionManager.getTransaction(transactionDef);
   }
 
   private DataSourceTransactionManager getTransactionManager() {
       return new DataSourceTransactionManager(dataSource);
   }
   }

测试

public void test(){
    List<Runnable> tasks=new ArrayList<>();
 
    tasks.add(()->{
            Per per = new Per();
            per.setName("t1");
            perService.save(per);
    });
    
    tasks.add(()->{
            Per per = new Per();
            per.setName("t2");
            perService.save(per);
    });
    
    multiplyThreadTransactionManager.runAsyncButWaitUntilAllDown(tasks, Executors.newCachedThreadPool());
 
}

执行结果

java.lang.IllegalStateException: No value for key [HikariDataSource (HikariPool-1)] bound to thread
	at org.springframework.transaction.support.TransactionSynchronizationManager.unbindResource(TransactionSynchronizationManager.java:198) ~[spring-tx-5.3.10.jar:5.3.10]
	at org.springframework.jdbc.datasource.DataSourceTransactionManager.doCleanupAfterCompletion(DataSourceTransactionManager.java:371) ~[spring-jdbc-5.3.10.jar:5.3.10]
	at org.springframework.transaction.support.AbstractPlatformTransactionManager.cleanupAfterCompletion(AbstractPlatformTransactionManager.java:992) ~[spring-tx-5.3.10.jar:5.3.10]
	at org.springframework.transaction.suppoAbstractPlatformTransactionrt.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager

结果报了这个错,这个错误信息室找不到绑定在线程上的key为HikariDataSource的资源,因为事务资源都是绑定在线程上的,当事务提交或者回滚时,他需要寻找绑定在当前线程上的资源,如果找不到,就会报错。

原理剖析:

首先我们找到绑定线程资源的关键方法org.springframework.transaction.support.TransactionSynchronizationManager#bindResource

	/**
	 * Bind the given resource for the given key to the current thread.
	 * @param key the key to bind the value to (usually the resource factory)
	 * @param value the value to bind (usually the active resource object)
	 * @throws IllegalStateException if there is already a value bound to the thread
	 * @see ResourceTransactionManager#getResourceFactory()
	 */
	public static void bindResource(Object key, Object value) throws IllegalStateException {
		Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
		Assert.notNull(value, "Value must not be null");
		Map<Object, Object> map = resources.get();
		// set ThreadLocal Map if none found
		if (map == null) {
			map = new HashMap<>();
			resources.set(map);
		}
		Object oldValue = map.put(actualKey, value);
		// Transparently suppress a ResourceHolder that was marked as void...
		if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
			oldValue = null;
		}
		if (oldValue != null) {
			throw new IllegalStateException(
					"Already value [" + oldValue + "] for key [" + actualKey + "] bound to thread");
		}
	}

根据debug会发现,spring在开启事务时会自动为我们调用这个方法,绑定key为HikariDataSource,value为ConnectionHolder到threadlocal中。第二次sql执行时会绑定key为DefaultSqlSessionFactory,value为DefaultSqlSessionFactory。

既然讲到了事务资源的绑定时机,下面就顺便讲一下这两种资源在何时释放。我们再回顾一下事务的执行流程及机制。spring处理事务的原理就是基于aop,每个需要实现事务的方法都要通过TransactionInterceptor这个拦截器,通过这个拦截器去实现事务增强。

	@Override
	@Nullable
	public Object invoke(MethodInvocation invocation) throws Throwable {
		// Work out the target class: may be {@code null}.
		// The TransactionAttributeSource should be passed the target class
		// as well as the method, which may be from an interface.
		Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

		// Adapt to TransactionAspectSupport's invokeWithinTransaction...
		//重点,这里注册了一个回调,最后会调回下面
       //父类实现
        return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
			@Override
			@Nullable
			public Object proceedWithInvocation() throws Throwable {
                //原始方法
				return invocation.proceed();
			}
			@Override
			public Object getTarget() {
				return invocation.getThis();
			}
			@Override
			public Object[] getArguments() {
				return invocation.getArguments();
			}
		});
	}

他最终会交给他的父类模板类org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction实现,在来看一下这个类为我们处理了什么,直接看重点


	/**
	 * General delegate for around-advice-based subclasses, delegating to several other template
	 * methods on this class. Able to handle {@link CallbackPreferringPlatformTransactionManager}
	 * as well as regular {@link PlatformTransactionManager} implementations and
	 * {@link ReactiveTransactionManager} implementations for reactive return types.
	 * @param method the Method being invoked
	 * @param targetClass the target class that we're invoking the method on
	 * @param invocation the callback to use for proceeding with the target invocation
	 * @return the return value of the method, if any
	 * @throws Throwable propagated from the target invocation
	 */
	@Nullable
	protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
			final InvocationCallback invocation) throws Throwable {

		// If the transaction attribute is null, the method is non-transactional.
        //说人话就是获取事务资源,装配事务管理器
		TransactionAttributeSource tas = getTransactionAttributeSource();
		final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
		final TransactionManager tm = determineTransactionManager(txAttr);

	

		PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
		final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

		if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
			// Standard transaction demarcation with getTransaction and commit/rollback calls.
            //事务相关信息,包括传播级别,什么异常下回滚等
			TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

			Object retVal;
			try {
				// This is an around advice: Invoke the next interceptor in the chain.
				// This will normally result in a target object being invoked.
                //就是他的子类注册的回调,真正的业务逻辑
				retVal = invocation.proceedWithInvocation();
			}
			catch (Throwable ex) {
				// target invocation exception
                //回滚
				completeTransactionAfterThrowing(txInfo, ex);
				throw ex;
			}
			finally {
                //清理事务信息
				cleanupTransactionInfo(txInfo);
			}

			if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
				// Set rollback-only in case of Vavr failure matching our rollback rules...
				TransactionStatus status = txInfo.getTransactionStatus();
				if (status != null && txAttr != null) {
					retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
				}
			}
            //提交
			commitTransactionAfterReturning(txInfo);
			return retVal;
		}

        ...
		
	}

继续往下,观察commitTransactionAfterReturning(txInfo)的实现,发现这一步是由事务管理器完成的。

	protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
		if (txInfo != null && txInfo.getTransactionStatus() != null) {
			if (logger.isTraceEnabled()) {
				logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
			}
			txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
		}
	}

进而可以推断,rollback操作也是同样的道理,有兴趣的小伙伴可以自己debug一下,继续走下去,观察事务管理器为我们做了什么。

@Override
	public final void commit(TransactionStatus status) throws TransactionException {
		if (status.isCompleted()) {
			throw new IllegalTransactionStateException(
					"Transaction is already completed - do not call commit or rollback more than once per transaction");
		}

		DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
		if (defStatus.isLocalRollbackOnly()) {
			if (defStatus.isDebug()) {
				logger.debug("Transactional code has requested rollback");
			}
			processRollback(defStatus, false);
			return;
		}

		if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
			if (defStatus.isDebug()) {
				logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
			}
			processRollback(defStatus, true);
			return;
		}

		processCommit(defStatus);
	}

到此,spring的整个事务流程就已经非常清晰了,我们想要实现多事务管理的方法也找到了,难就是去控制事务的资源。只要我们拿到了相应的事务资源,然后在创建自己的事务管理器控制事务何时提交或者回滚,这样我们就可以实现一个多线程同时提交回滚,类似于二阶段提交的操作,来达到多线程事务的统一。

3.多线程事务管理器

不多说,直接上代码看最终版本

package com.controller;


import lombok.Builder;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.CollectionUtils;

import javax.sql.DataSource;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 多线程事务管理
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class MultiplyThreadTransactionManager {
    /**
     * 如果是多数据源的情况下,需要指定具体是哪一个数据源
     */
    private final DataSource dataSource;

    private final static ThreadLocal<Boolean> immediatelyCommitFlag = new ThreadLocal<>();

    private final static ThreadLocal<List<TransactionStatus>> transactionStatusListThreadLocal = new ThreadLocal<>();

    private final static ThreadLocal<List<TransactionResource>> transactionResourcesthreadLocal = new ThreadLocal<>();

    private final static ThreadLocal<Map<Object, Object>> mainNativeResourceThreadLocal = new ThreadLocal<>();


    /**
     * 多线程下事务执行
     *
     * @param tasks             任务列表
     * @param immediatelyCommit 是否需要立即提交
     */
    public List<CompletableFuture> runAsyncButWaitUntilAllDown(List<Runnable> tasks,  Boolean immediatelyCommit) {

        Executor executor = Executors.newCachedThreadPool();

        DataSourceTransactionManager transactionManager = getTransactionManager();
        //是否发生了异常
        AtomicBoolean ex = new AtomicBoolean();

        List<CompletableFuture> taskFutureList = new CopyOnWriteArrayList<>();
        List<TransactionStatus> transactionStatusList = new CopyOnWriteArrayList<>();
        List<TransactionResource> transactionResources = new CopyOnWriteArrayList<>();

        //记录原生主事务资源
        //这一步可能在原生sql执行前,也可能在原生sql执行后,所以这个资源可能不够充分,需要在下面继续处理
        //如果返回的是原资源集合的引用,下面一步可以不用
        Map<Object, Object> resourceMap = TransactionSynchronizationManager.getResourceMap();
        if (!CollectionUtils.isEmpty(resourceMap)) {
            mainNativeResourceThreadLocal.set(new HashMap<>(resourceMap));
        }
        Map<String, String> copyOfContextMap = MDC.getCopyOfContextMap();
        Executor finalExecutor = executor;
        AtomicInteger atomicInteger = new AtomicInteger(0);
        tasks.forEach(task -> {
            taskFutureList.add(CompletableFuture.runAsync(
                    () -> {
                        log.info("任务开始");
                        try {
                            //1.开启新事务
                            TransactionStatus transactionStatus = openNewTransaction(transactionManager);
                            log.info("开启新事务 successfully");
                            transactionStatusList.add(transactionStatus);
                            atomicInteger.incrementAndGet();
                            System.out.println("atomicInteger.get()"+atomicInteger.incrementAndGet());
                            System.out.println(transactionStatus);
                            //2.异步任务执行
                            task.run();
                            log.info("异步任务执行 successfully");
                            //3.继续事务资源复制,因为在sql执行是会产生新的资源对象
                            transactionResources.add(TransactionResource.copyTransactionResource());

                        } catch (Throwable throwable) {
                            log.info("任务执行异常"+throwable.getMessage());
                            log.error("任务执行异常",throwable);
                            //其中某个异步任务执行出现了异常,进行标记
                            ex.set(Boolean.TRUE);
                            //其他任务还没执行的不需要执行了
                            taskFutureList.forEach(completableFuture -> completableFuture.cancel(true));
                        }
                    }
                    , finalExecutor)
            );
        });

        try {
            //阻塞直到所有任务全部执行结束---如果有任务被取消,这里会抛出异常滴,需要捕获
            CompletableFuture.allOf(taskFutureList.toArray(new CompletableFuture[]{})).get();
        } catch (InterruptedException | ExecutionException e) {
            log.info("任务被取消");
            log.error("任务被取消",e);

        }

        //发生了异常则进行回滚操作,否则提交
        if (ex.get()) {
            log.info("发生异常,全部事务回滚");
            for (int i = 0; i < transactionStatusList.size(); i++) {
                transactionResources.get(i).autoWiredTransactionResource();
                Map<Object, Object> rollBackResourceMap = TransactionSynchronizationManager.getResourceMap();
                log.info("回滚前事务资源size{},本身{}",rollBackResourceMap.size(),rollBackResourceMap);
                transactionManager.rollback(transactionStatusList.get(i));
                transactionResources.get(i).removeTransactionResource();
            }
        } else {
            if (immediatelyCommit) {
                log.info("全部事务正常提交");
                for (int i = 0; i < transactionStatusList.size(); i++) {
                    //transactionResources.get(i).autoWiredTransactionResource();
                    Map<Object, Object> commitResourceMap = TransactionSynchronizationManager.getResourceMap();
                    log.info("提交前事务资源size{},本身{}",commitResourceMap.size(),commitResourceMap);
                    transactionManager.commit(transactionStatusList.get(i));
                    transactionResources.get(i).removeTransactionResource();
                }
            } else {
                //缓存全部待提交数据
                immediatelyCommitFlag.set(immediatelyCommit);
                transactionResourcesthreadLocal.set(transactionResources);
                transactionStatusListThreadLocal.set(transactionStatusList);
            }
        }

        //交还给主事务
        if (immediatelyCommit) {
            mainTransactionResourceBack(!ex.get());
        }

        return taskFutureList;
    }

    public void multiplyThreadTransactionCommit() {
        try {
            Boolean immediatelyCommit = immediatelyCommitFlag.get();
            if (immediatelyCommit) {
                throw new IllegalStateException("immediatelyCommit cant call multiplyThreadTransactionCommit");
            }
            //提交
            //获取存储的事务资源和状态
            List<TransactionResource> transactionResources = transactionResourcesthreadLocal.get();
            List<TransactionStatus> transactionStatusList = transactionStatusListThreadLocal.get();
            if (CollectionUtils.isEmpty(transactionResources) || CollectionUtils.isEmpty(transactionStatusList)) {
                throw new IllegalStateException("transactionResources or transactionStatusList is empty");
            }
            //重新提交
            DataSourceTransactionManager transactionManager = getTransactionManager();
            log.info("全部事务正常提交");
            for (int i = 0; i < transactionStatusList.size(); i++) {
                transactionResources.get(i).autoWiredTransactionResource();
                Map<Object, Object> commitResourceMap = TransactionSynchronizationManager.getResourceMap();
                log.info("提交前事务资源size{},本身{}",commitResourceMap.size(),commitResourceMap);
                transactionManager.commit(transactionStatusList.get(i));
                transactionResources.get(i).removeTransactionResource();
            }
        } catch (Exception e) {
            mainTransactionResourceBack(false);
            log.error("multiplyThreadTransactionCommit fail", e);
        } finally {
            transactionResourcesthreadLocal.remove();
            transactionStatusListThreadLocal.remove();
            immediatelyCommitFlag.remove();
        }

        //交还给主事务
        mainTransactionResourceBack(true);
    }

    //主线程事务资源返还
    public void mainTransactionResourceBack(Boolean subTransactionSuccess) {

        if (CollectionUtils.isEmpty(mainNativeResourceThreadLocal.get())) {
            //清除数据
            mainNativeResourceThreadLocal.remove();
            return;
        }
        Map<Object, Object> nativeResource = new HashMap<>(mainNativeResourceThreadLocal.get());
        Map<Object, Object> resourceMap = TransactionSynchronizationManager.getResourceMap();
        log.info("当前线程资事务源size{}--------------------------------{}",resourceMap.size(), resourceMap);
        log.info("原生线程事务资源size{}--------------------------------{}",nativeResource.size(), nativeResource);
        //已经被绑定的资源不能重复绑定
        if (!CollectionUtils.isEmpty(resourceMap)) {
            for (Object o : resourceMap.keySet()) {
                if (nativeResource.containsKey(o)) {
                    nativeResource.remove(o);
                }
            }
        }

        nativeResource.forEach((k,v)->{
            if (!(k instanceof DataSource)){
                log.info("nativeResource 没有 DataSource");
            }
        });
        //交还不能绑定factory
        nativeResource.forEach((k,v)->{
            if (k instanceof DataSource){
                TransactionSynchronizationManager.bindResource(k,v);
            }
        });
        Map<Object, Object> finResource = TransactionSynchronizationManager.getResourceMap();
        log.info("主线程最终事务源size{}--------------------------------{}",finResource.size(), finResource);
        //防止未激活事务
        if (!TransactionSynchronizationManager.isSynchronizationActive()) {
            TransactionSynchronizationManager.initSynchronization();
        }
        //清除数据
        mainNativeResourceThreadLocal.remove();
        if (!subTransactionSuccess) {
            throw new RuntimeException("子事务失败,需要回滚");
        }
    }

    private TransactionStatus openNewTransaction(DataSourceTransactionManager transactionManager) {
        //JdbcTransactionManager根据TransactionDefinition信息来进行一些连接属性的设置
        //包括隔离级别和传播行为等
        DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition();
        //开启一个新事务---此时autocommit已经被设置为了false,并且当前没有事务,这里创建的是一个新事务
        return transactionManager.getTransaction(transactionDef);
    }

    private DataSourceTransactionManager getTransactionManager() {
        return new DataSourceTransactionManager(dataSource);
    }

    /**
     * 保存当前事务资源,用于线程间的事务资源COPY操作
     */
    @Builder
    private static class TransactionResource {
        //事务结束后默认会移除集合中的DataSource作为key关联的资源记录
        private Map<Object, Object> resources = new HashMap<>();

        //下面五个属性会在事务结束后被自动清理,无需我们手动清理
        private Set<TransactionSynchronization> synchronizations = new HashSet<>();

        private String currentTransactionName;

        private Boolean currentTransactionReadOnly;

        private Integer currentTransactionIsolationLevel;

        private Boolean actualTransactionActive;

        public static TransactionResource copyTransactionResource() {
            return TransactionResource.builder()
                    //返回的是不可变集合,这里为了更加灵活,copy出一个集合过来
                    .resources(new HashMap<>(TransactionSynchronizationManager.getResourceMap()))
                    //如果需要注册事务监听者,这里记得修改--我们这里不需要,就采用默认负责--spring事务内部默认也是这个值
                    .synchronizations(new LinkedHashSet<>())
                    .currentTransactionName(TransactionSynchronizationManager.getCurrentTransactionName())
                    .currentTransactionReadOnly(TransactionSynchronizationManager.isCurrentTransactionReadOnly())
                    .currentTransactionIsolationLevel(TransactionSynchronizationManager.getCurrentTransactionIsolationLevel())
                    .actualTransactionActive(TransactionSynchronizationManager.isActualTransactionActive())
                    .build();
        }

        //装配事务资源,为提交/回滚做储备
        public void autoWiredTransactionResource() {
            //获取当前线程事务资源
            Map<Object, Object> resourceMap = TransactionSynchronizationManager.getResourceMap();
            for (Object o : resourceMap.keySet()) {
                if (resourceMap.containsKey(o)) {
                    //移除重复事务资源key,避免绑定报错
                    resources.remove(o);
                }

            }
            boolean synchronizationActive = TransactionSynchronizationManager.isSynchronizationActive();
            //绑定事务资源,注意 绑定是绑定到当前主线程上,记得最后释放交换主线程,再由主线程收回原有事务自选
            resources.forEach(TransactionSynchronizationManager::bindResource);
            //如果需要注册事务监听者,这里记得修改--我们这里不需要,就采用默认负责--spring事务内部默认也是这个值
            //避免重复激活或者事务未激活
            if (!synchronizationActive) {
                TransactionSynchronizationManager.initSynchronization();
            }

            TransactionSynchronizationManager.setActualTransactionActive(actualTransactionActive);
            TransactionSynchronizationManager.setCurrentTransactionName(currentTransactionName);
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(currentTransactionIsolationLevel);
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(currentTransactionReadOnly);
        }

        public void removeTransactionResource() {
            Map<Object, Object> resourceMap = new HashMap<>(TransactionSynchronizationManager.getResourceMap());

            //事务结束后默认会移除集合中的DataSource作为key关联的资源记录
            //DataSource如果重复移除,unbindResource时会因为不存在此key关联的事务资源而报错
            resources.keySet().forEach(key -> {
                if (resourceMap.containsKey(key)) {
                    TransactionSynchronizationManager.unbindResource(key);
                }
            });
        }
    }
}

验证

    @Transactional
    public String test(Integer par) {
        log.info("get(" + par + ")");
        if (par == 3 || par == 5 || par == 6) {
            Per per2 = new Per();
            per2.setName("t3");
            per2.setGrou(Thread.currentThread().getName());
            perService.save(per2);
        }


        List<Runnable> list = new ArrayList<>();
        list.add(() -> {
            Per per = new Per();
            per.setName("t1");
            per.setGrou(Thread.currentThread().getName());
            log.info("任务开始save");
            perService.save(per);
            log.info("任务完成save");
            if (par == 1) {
                throw new RuntimeException();
            }
        });
        list.add(() -> {
            Per per1 = new Per();
            per1.setName("t2");
            per1.setGrou(Thread.currentThread().getName());
            log.info("任务开始save");
            perService.save(per1);
            log.info("任务完成save");
            if (par == 2) {
                throw new RuntimeException();
            }

        });

        log.info("runAsyncButWaitUntilAllDown start");
        multiplyThreadTransactionManager.runAsyncButWaitUntilAllDown(list, false);

        if (par == 4 || par == 5 || par == 6) {
            Per per3 = new Per();
            per3.setName("t4");
            per3.setGrou(Thread.currentThread().getName());
            perService.save(per3);
            if (par == 6) {
                throw new RuntimeException();
            }
        }
        log.info("multiplyThreadTransactionCommit start");
        multiplyThreadTransactionManager.multiplyThreadTransactionCommit();

        return "ss";
    }

有兴趣的小伙伴可以做进一步测试。

在本公司项目组中利用这个机制优化现有的业务,性能提升了约70%。

比起网上常见的多线程事务管理器,主要做了如下增强

1.支持在已存在事务下运行。

在很多场景下,我们可能会遇到多线程事务外还存在其他事物的场景下,我们需要支持兼容多种事务环境。

2.支持自定义提交时机。

有时候我们不希望事务立马提交,希望他能够和外围事务保持一致,这时候可以将runAsyncButWaitUntilAllDown的immediatelyCommit参数写成false,并手动调用multiplyThreadTransactionCommit方法去主动提交。

我们需要注意的地方,任何事都是有舍有得的,耗时的显著降低是因为利用了更多的资源,比如线程资源和数据库连接资源,尤其是数据库连接资源,更是额外宝贵,我们一定要合理评估我们的每一项决策是否有意义,风险和回报是否成正比。

还有一点需要注意,在极高并发的情况下,多线程事务容易造成死锁,因为当主事务开启的情况下,他要为他下面的子线程事务开启连接,当连接不够时就容易造成循环等待。一个比较好的做法是提前获得所有连接,并设置一个合理的超时时间

如果有小伙伴遇到了其他相关疑问,或者使用此代码发现了问题,欢迎留言讨论,共同进步。

文章转载自:doFix

原文链接:https://www.cnblogs.com/fix200/p/18066537

体验地址:引迈 - JNPF快速开发平台_低代码开发平台_零代码开发平台_流程设计器_表单引擎_工作流引擎_软件架构

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/453466.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

使用DateUtil工具类偏移日期

使用DateUtil工具类偏移日期 一、依赖二、源码三、示例代码 一、依赖 <!--工具依赖--><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.16</version></dependency>二、源码 …

Python常用图片数据方法

文章目录 1. 常用图片数据类型2. 图片的显示2.1 plt.imshow()2.2 使用 turtle 来绘制图片 3.图片ndarray数据的常用切片操作使用 cv2 来读取图片打印数据R G B 通道的获取BGR 转成 RGBcv2 不支持中文路径的解决方法 4 PIL.Image 转成 QImage 或 QPixmap 1. 常用图片数据类型 使…

Android基础开发-通讯录的添加和查询

案例&#xff1a;往手机通讯录添加信息&#xff0c;输入姓名和手机号。 保存的手机的表&#xff1a;一共有两个&#xff0c;一个是主表&#xff0c;提供一个联系人id&#xff0c;另外是辅表&#xff0c;提供id对应的手机号和姓名。 普通操作&#xff1a;一个表一个表的添加 …

【黑马程序员】python函数

文章目录 函数什么是函数为什么学习函数函数定义函数的传入参数函数的返回值返回值基础None返回值 函数说明文档函数的嵌套调用定义代码示例 全局变量和局部变量全局变量global变量局部变量 函数综合案例 函数 什么是函数 组织好的&#xff0c;可重复使用的、用来实现特定功能…

【每日八股】Java基础经典面试题2

前言&#xff1a;哈喽大家好&#xff0c;我是黑洞晓威&#xff0c;25届毕业生&#xff0c;正在为即将到来的秋招做准备。本篇将记录学习过程中经常出现的知识点以及自己学习薄弱的地方进行总结&#x1f970;。 本篇文章记录的Java基础面试题&#xff0c;适合在学Java基础的小白…

设计模式系列之-策略模式(优化过多代码if…else)

首先解释下什么策略模式 如下图&#xff1a; 简而言之&#xff1a;算法的使用与算法的实现分离开来 想象有一个开关按钮&#xff0c;每次按下去都可以切换不同的灯光模式&#xff08;例如&#xff1a;强光、柔光、闪烁&#xff09;&#xff0c;这里的每种灯光模式就是一个策略…

程序人生——Java中基本类型使用建议

目录 引出Java中基本类型使用建议建议21&#xff1a;用偶判断&#xff0c;不用奇判断建议22&#xff1a;用整数类型处理货币建议23&#xff1a;不要让类型默默转换建议24&#xff1a;边界、边界、还是边界建议25&#xff1a;不要让四舍五入亏了一方 建议26&#xff1a;提防包装…

Unity类银河恶魔城学习记录8-5 p81 Blackhole duration源代码

Alex教程每一P的教程原代码加上我自己的理解初步理解写的注释&#xff0c;可供学习Alex教程的人参考 此代码仅为较上一P有所改变的代码、 【Unity教程】从0编程制作类银河恶魔城游戏_哔哩哔哩_bilibili Blackhole_Skill_Controller.cs using System.Collections; using Syste…

UL1642标准_锂聚合物电池亚马逊测试报告

UL1642标准_锂聚合物电池亚马逊测试报告 什么是锂聚合物电池UL1642标准&#xff1f; UL1642 认证要求涵盖旨在用于技术人员可更换或用户可更换应用的锂离子电池。UL1642 认证要求是为了避免锂离子电池在产品中工作时发生火灾或爆炸的风险。 锂聚合物电池 UL是Underwriters L…

Devin:首位人工智能软件工程师的介绍

博主猫头虎的技术世界 &#x1f31f; 欢迎来到猫头虎的博客 — 探索技术的无限可能&#xff01; 专栏链接&#xff1a; &#x1f517; 精选专栏&#xff1a; 《面试题大全》 — 面试准备的宝典&#xff01;《IDEA开发秘籍》 — 提升你的IDEA技能&#xff01;《100天精通鸿蒙》 …

2024春招看了上百份程序员简历,这个工具写的简历最好!(附模板)

你们在制作简历时&#xff0c;是不是基本只关注两件事&#xff1a;简历模板&#xff0c;还有基本信息的填写。 当你再次坐下来更新你的简历时&#xff0c;可能会发现自己不自觉地选择了那个“看起来最好看的模板”&#xff0c;填写基本信息&#xff0c;却没有深入思考如何使简历…

DataWhale公开课笔记2:Diffusion Model和Transformer Diffusion

Stable Diffusion和AIGC AIGC是什么 AIGC的全称叫做AI generated content&#xff0c;AlGC (Al-Generated Content&#xff0c;人工智能生产内容)&#xff0c;是利用AI自动生产内容的生产方式。 在传统的内容创作领域中&#xff0c;专业生成内容&#xff08;PGC&#xff09;…

Python数值方法在工程和科学问题解决中的应用

&#x1f482; 个人网站:【 海拥】【神级代码资源网站】【办公神器】&#x1f91f; 基于Web端打造的&#xff1a;&#x1f449;轻量化工具创作平台&#x1f485; 想寻找共同学习交流的小伙伴&#xff0c;请点击【全栈技术交流群】 随着计算机技术的不断发展&#xff0c;Python作…

UI设计中的图标的分类,功能性图标

图标的分类 既然知道了图标的作用和重要性&#xff0c;那么接下来&#xff0c;就要进一步了解在工作中我们要设计哪些图标。图标可以划分成三种大类:功能性图标、装饰性图标、启动图标。 功能性图标 功能图标是具有指代意义且具有功能标识的图形&#xff0c;它不仅是一种图形&a…

代码随想录算法训练营第day41|背包理论基础、416. 分割等和子集

目录 a.背包理论基础——01背包 1.二维数组的01背包表示 2.一维滚动数组表示 b. 416. 分割等和子集 - 力扣&#xff08;LeetCode&#xff09; a.背包理论基础——01背包 背包问题分类&#xff1a; 对于面试的话&#xff0c;其实掌握01背包&#xff0c;和完全背包&#xff…

Sharding sphere分库分表

需要物理自己实现分表分库&#xff0c;然后通过配置文件配置。 配置文件&#xff1a; 需要配置多个数据源&#xff0c;主从表的关系【默认主表修改&#xff0c;从表读取】&#xff0c;定义分库的策略【比如User id】和分表【表Id】的策略 分库和分表策略&#xff1a;分库策略…

浅谈LockBit勒索病毒

在数字时代&#xff0c;随着科技的飞速发展&#xff0c;网络安全问题愈发凸显。恶意软件和勒索软件等网络威胁正不断演变&#xff0c;其中一款备受关注的勒索软件就是LockBit。 LockBit是一种高度复杂且具有破坏性的勒索软件。与传统的勒索软件相比&#xff0c;LockBit在其攻击…

外包干了5天,技术明显退步。。。。。

先说一下自己的情况&#xff0c;本科生&#xff0c;19年通过校招进入南京某软件公司&#xff0c;干了接近2年的功能测试&#xff0c;今年年初&#xff0c;感觉自己不能够在这样下去了&#xff0c;长时间呆在一个舒适的环境会让一个人堕落!而我已经在一个企业干了2年的功能测试&…

国产AI视频技术迎来新成员Etna,4K 60帧 15秒超高清视频

国内AI视频技术领域传来好消息&#xff0c;一款名为Etna的AI视频生成工具引起了业界的广泛关注。这款由七火山公司开发的技术&#xff0c;号称能够实现15秒4K 60帧的超高清视频生成&#xff0c;让人眼前一亮&#xff01; &#x1f680; 国产技术的崛起 Etna的问世&#xff0c;…

【软考高项】【论文专题】- 5 - 论文写作思路梳理

目录 一、软考论文考什么&#xff1f; 二、我在项目中做什么&#xff1f; 三、项目做什么&#xff1f; 四、 项目小白常见雷区 五、如何写的不像是模版&#xff1f; 一、软考论文考什么&#xff1f; 《考试大纲》指出&#xff1a;根据试卷上给出的论文题目&#xff0c;选择…