目录:
1.讲述事务的一些基础概念。
2.讲述事务的生命周期源码
3.配置事务,以及事务注解的源码
1.前言
具体事务中Spring是怎么管理事务,怎么去管理、创建、销毁等操作的呢?这一次来分解一下。
2.事务概述(复习)
2.1 什么是事务
事务是逻辑上的一组操作,要么都执行,要么都不执行。
2.2 事务的 ACID 特性
- 原子性(Atomicity):事务是最小的执行单位,不允许分割。事务的原子性确保动作要么全部完成,要么完全不起作用。
- 一致性(Consistency):执行事务前后,数据保持一致,例如转账业务中,无论事务是否成功,转账者和收款人的总额应该是不变的。
- 隔离性(Isolation):并发访问数据库时,一个用户的事务不被其他事务所干扰,各并发事务之间数据库是独立的。
- 持久性(Durability):一个事务被提交之后。它对数据库中数据的改变是持久的。
2.3 Spring 支持事务的两种方式
- 编程式事务:在代码中硬编码(不推荐使用) : 通过 TransactionTemplate 的 execute 或者 TransactionManager 的 commit 和 rollback 手动管理事务,实际应用中很少使用,但是对于你理解 Spring 事务管理原理有帮助。
- 声明式事务:在 XML 配置文件中配置或者直接基于注解(推荐使用) : 实际是通过 AOP 实现(基于@Transactional 的全注解方式使用最多)
2.4 事务属性介绍
事务属性包含了 5 个方面
- 隔离级别
- 传播行为
- 回滚规则
- 是否只读
- 事务超时
2.4.1 隔离级别
隔离级别 | 脏读 | 不可重复读 | 幻读 |
---|---|---|---|
Read Uncommitted | √ | √ | √ |
Read Committed | × | √ | √ |
Repeatable Read | × | × | √ |
Serializable | × | × | × |
Read Uncommitted
(读取未提交内容):一个事务可以看到其他事务已执行但是未提交的结果。本隔离级别很少用于实际应用,因为它的性能也不比其他级别好多少,并且存在脏读问题。Read Committed
(读取已提交内容):一个事务只能看到其他事务已执行并已提交的结果(Oracle、SQL Server PostgreSql
默认隔离级别)。这种隔离级别支持不可重复读,因为同一事务的其他实例在该实例处理期间可能会有新的commit
,所以同一select
可能返回不同结果。Repeatable Read
(可重读):同一事务的多个实例在并发读取数据时,会看到同样的数据行(MySQL
的默认事务隔离级别)。InnoDB
和Falcon
存储引擎通过多版本并发控制(MVCC
)机制解决了不可重复读问题,存在幻读问题。Serializable
(可串行化):最高的隔离级别,它通过强制事务排序,使之不可能相互冲突,从而解决幻读问题。它是在每个读的数据行上加上共享锁。在这个级别,可能导致大量的超时现象和锁竞争。
TransactionDefinition是Spring默认事务类,事务会涉及到事物移植、隔离级别、是否只读、事务有效时限等。这些属性就是通过下面的接口定义的。
org.springframework.transaction.TransactionDefinition
public interface TransactionDefinition {
//事物移植策略(传播级别)7种
int PROPAGATION_REQUIRED = 0;// 表示当前方法必须运行在事务中。如果调用其方法处于事务中,那么改方法直接使用该事物,否则启动新的事物。
int PROPAGATION_SUPPORTS = 1;
int PROPAGATION_MANDATORY = 2;
int PROPAGATION_REQUIRES_NEW = 3;
int PROPAGATION_NOT_SUPPORTED = 4;
int PROPAGATION_NEVER = 5;
int PROPAGATION_NESTED = 6;
//隔离级别
int ISOLATION_DEFAULT = -1;// 直接使用底层持久化框架使用的隔离级别
int ISOLATION_READ_UNCOMMITTED = 1;
int ISOLATION_READ_COMMITTED = 2;
int ISOLATION_REPEATABLE_READ = 4;
int ISOLATION_SERIALIZABLE = 8;
int TIMEOUT_DEFAULT = -1;
// 返回事务的传播行为,默认值为 REQUIRED。
default int getPropagationBehavior() {
return PROPAGATION_REQUIRED;
}
//返回事务的隔离级别,默认值是 ISOLATION_DEFAULT
default int getIsolationLevel() {
return ISOLATION_DEFAULT;
}
// 返回事务的超时时间,默认值为-1。如果超过该时间限制但事务还没有完成,则自动回滚事务。
int getTimeout();
// 返回是否为只读事务,默认值为 false
boolean isReadOnly();
@Nullable
String getName();
}
2.4.2 传播行为:
传播级别 | 含义 |
---|---|
PROPAGATION_REQUIRED | 支持当前事务,如果当前没有事务,则新建一个事务 |
PROPAGATION_SUPPORTS | 支持当前事务,如果当前没有事务,则以非事务进行 |
PROPAGATION_MANDATORY | 支持当前事务,如果当前没有事务,则抛异常 |
PROPAGATION_REQUIRES_NEW | 新建事务,如果当前存在事务,则把当前事务挂起 |
PROPAGATION_NESTED | 如果当前存在事务,则在嵌套事务内执行。如果没有,则进行与PROPAGATION_REQUIRED类似操作 |
PROPAGATION_NOT_SUPPORTED | 以非事务进行,如果当前存在事务,则挂起事务,执行当前逻辑,结束后恢复上下文的事务 |
PROPAGATION_NEVER | 以非事务进行,如果当前存在事务,则抛异常 |
2.4.3 事务回滚规则
默认情况下,只有遇到运行时异常 RuntimeException 的子类才会回滚,Error 也会导致事务回滚。
可以指定要回滚的异常类型
@Transactional(rollbackFor= BusinessException.class)
介绍一下 @Transactional 的作用范围
- 方法(只对 Public 方法生效)
- 类(整个类的 Public 方法都生效)
- 接口(不推荐)
2.4.4 是否只读
-
设置事务是否为只读可以提高性能,因为只读事务不需要写入操作的事务支持。
-
可以通过
@Transactional
注解的readOnly
属性设置,表明事务是否为只读事务。
2.4.5 事务超时
timeout 指定事务的超时时间,默认为-1(永远不会超时回滚),指定后超过时间会自动回滚。
注意事项:同类方法自调用会让 AOP 的代理模式失效。因此事务会失效。
@Transactional(
isolation = Isolation.READ_COMMITTED,
propagation = Propagation.REQUIRED,
timeout = 30,
rollbackFor = {SQLException.class},
readOnly = true
)
public void transactionalMethod() {
// 事务处理逻辑
}
3.声明式事务实现源码分析
3.1PlatformTransactionManager 源码分解
PlatformTransactionManager事务管理器
他继承了 TransactionManager顶层接口,Spring 事务抽象的关键是事务策略的概念。它负责管理事务的创建、提交和回滚等操作。事务管理是通过PlatformTransactionManager和TransactionManager接口来实现的。
我们现在使用的是 spring-boot-starter-jdbc 依赖,框架会默认注入 DataSourceTransactionManager实例
事务策略由接口定义 org.springframework.transaction.PlatformTransactionManager
public interface PlatformTransactionManager extends TransactionManager {
//用于获取一个新的事务或者加入一个现有的事务。该方法接受一个TransactionDefinition对象,用于指定事务的属性,如隔离级别、传播行为等。
TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException;
//用于提交一个事务。该方法接受一个TransactionStatus对象,用于表示需要提交的事务。
void commit(TransactionStatus status) throws TransactionException;
//用于回滚一个事务。该方法接受一个TransactionStatus对象,用于表示需要回滚的事务。
void rollback(TransactionStatus status) throws TransactionException;
}
-
PlatformTransactionManager: 事务管理器
-
TransactionDefinition: 事务的一些基础信息,如超时时间、隔离级别、传播属性等
-
TransactionStatus : 事务的一些状态信息,如是否是一个新的事务、是否已被标记为回滚
-
TransactionInfo :整个事务信息保存类
-
DataSourceTransactionObject:获取 DataSourceTransactionObject,它是DataSource事务对象,表示ConnectionHolder。
上面源码有一个interface类 TransactionStatus
接口为事务代码提供了一种简单的方法来控制事务执行和查询事务状态。
public interface TransactionStatus extends TransactionExecution, SavepointManager, Flushable {
boolean hasSavepoint(); //是否存在保存点
@Override
void flush();
boolean isNewTransaction();//是否是新创建的事务
void setRollbackOnly();//控制事务执行的行为。
boolean isRollbackOnly(); // 是否只回滚
boolean isCompleted();//事务是否为已完成,即已提交或已回滚。
//org.springframework.transaction.SavepointManager
Object createSavepoint() throws TransactionException;//创建一个新的保存点。
void rollbackToSavepoint(Object savepoint) throws TransactionException;//回滚到给定的保存点。调用此方法回滚到给定的保存点之后,不会自动释放保存点,可以通过调用releaseSavepoint方法释放保存点。
void releaseSavepoint(Object savepoint) throws TransactionException;//显式释放给定的保存点。(大多数事务管理器将在事务完成时自动释放保存点)
}
PlatformTransactionManager 实现类
Spring框架提供了多种PlatformTransactionManager的实现类,用于支持不同的事务管理机制,如JDBC、JPA、Hibernate等。常用的PlatformTransactionManager实现类包括:
-
DataSourceTransactionManager:用于管理基于JDBC的事务。
-
JpaTransactionManager:用于管理基于JPA的事务。
-
HibernateTransactionManager:用于管理基于Hibernate的事务。
-
JtaTransactionManager:用于管理基于JTA的分布式事务。
总的来说 事务管理其实就是:
- spring通过
PlatformTransactionManager
接口定义了事务管理器的标准。这个接口有多个实现,包括常用的DataSourceTransactionManager、JpaTransactionManager、HibernateTransactionManager
等,每个都专门用于不同的持久化技术。 - 事务管理器的主要职责是开始、提交或回滚事务。当使用声明式事务管理时,开发者只需要配置相应的事务管理器,而不必亲自编写事务管理的代码。
代理机制:
Spring
通过代理机制为事务管理提供支持。它使用AOP
来在方法调用前后添加额外的逻辑,即切面。在事务管理中,这个额外的逻辑包括开启、提交或回滚事务。- 当使用声明式事务管理时,
Spring
会动态创建一个代理对象,该代理对象包装了目标对象(拥有业务逻辑的对象)。在方法调用时,代理对象会在执行前后添加事务管理的逻辑。
这里注意:
事务的隔离级别是数据库本身的事务功能,我们只是基于对数据库的Connection,对数据操作做封装,而事务的传播属性则是Spring自己为我们提供的功能,数据库事务没有事务的传播属性这一说法。
Spring就是利用保存点功能实现了事务的嵌套功能。
1.org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction
用于获取一个新的事务或者加入一个现有的事务。该方法接受一个TransactionDefinition对象,用于指定事务的属性,如隔离级别、传播行为等。
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
// TransactionDefinition 存在直接返回 否则创建一个 TransactionDefinition 对象实例提供默认的事务定义数据。
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
//获取 DataSourceTransactionObject,它是DataSource事务对象,表示ConnectionHolder。由DataSourceTransactionManager 用作事务对象。
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
//如果已存在事务,根据不同的事务传播方式处理获取事务
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(def, transaction, debugEnabled);
}
// 判断事务是否有效
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
// 必须存在事务
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
// 事务传播方式为required或required_new或nested(嵌套),创建一个新的事务状态
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
// 创建新的事务状态对象
return startTransaction(def, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {//其他事务传播方式,返回一个事务对象为null的事务状态对象
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + def);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
2.org.springframework.transaction.support.AbstractPlatformTransactionManager#doGetTransaction
org.springframework.jdbc.datasource.DataSourceTransactionManager#doGetTransaction
//从当前线程中获取绑定的ConnectionHolder,可能为null,如果为null,则会在下一个开启事务的过程中,
//从dataSource中获取一个Connection,封装成ConnectionHolder,然后再绑定到当前线程,然后new 一个DataSourceTransactionObject
@Override
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
//从事务同步管理器中根据DataSource获取数据库连接资源
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
每次执行doGetTransaction方法,即会创建一个DataSourceTransactionObject对象txObject,并从事务同步管理器中根据DataSource获取数据库连接持有对象ConnectionHolder,然后存入txObject中。**事务同步管理类持有一个ThreadLocal级别的resources对象,存储DataSource和ConnectionHolder的映射关系。**因此返回的txObject中持有的ConnectionHolder可能有值,也可能为空。而不同的事务传播方式下,事务管理的处理根据txObejct中是否存在事务有不同的处理方式。
注意:
关于关注事务传播方式的实现,很多人对事务传播方式都是一知半解,只是因为没有了解源码的实现。 事务传播方式的实现分为两种情况,事务不存在和事务已经存在。isExistingTransaction方法判断事务是否存在,默认在AbstractPlatformTransactionManager抽象类中返回false,而在DataSourceTransactionManager实现中,则根据是否有数据库连接来决定。
protected boolean isExistingTransaction(Object transaction) throws TransactionException {
return false;
}
//否有数据库连接
@Override
protected boolean isExistingTransaction(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}
3.org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
// 创建事务
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 新建事务状态
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//事务初始化
doBegin(transaction, definition);
//准备同步操作
prepareSynchronization(status, definition);
return status;
}
//开启事务模板方法,事务信息绑定到当前线程
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
// 新建事务对象
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
//如果ConnectionHolder是否为null,也就是判断事务对象没有数据库连接持有器
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
//从连接池中获取连接,并封装到当前事务对象中,并且设置基本属性
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
// 把数据库连接包装成一个ConnectionHolder对象 然后设置到txObject对象中去
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
// 设置数据库连接与事务同步
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
// 获取JDBC连接
con = txObject.getConnectionHolder().getConnection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
// 隔离级别
txObject.setPreviousIsolationLevel(previousIsolationLevel);
// 是否只读
txObject.setReadOnly(definition.isReadOnly());
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
//关闭自动提交 JDBC通过设置自动提交为false,开启一个新的事务
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
// 如果设置事务只读属性,执行Statement设置只读
prepareTransactionalConnection(con, definition);
// 激活事务状态
txObject.getConnectionHolder().setTransactionActive(true);
// 设置超时时间
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
//如果是新增的ConnectionHolder,则绑定DataSource和数据库连接持有者的映射关系,这里主要就是
// 绑定数据源和连接到同步管理器上,把数据源作为key,数据库连接作为value 设置到线程变量中
if (txObject.isNewConnectionHolder()) {
// 绑定连接到当前线程 ThreadLocalMap ****
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
// 释放数据库连接
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
4.org.springframework.transaction.support.AbstractPlatformTransactionManager#handleExistingTransaction
// 为现有事务创建 TransactionStatus
// 回顾: 上面讲过:TransactionStatus`接口为事务代码提供了一种简单的方法来 “控制事务执行和查询事务状态”
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
//以非事务进行,如果当前存在事务,则抛异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
//以非事务进行,如果当前存在事务,则挂起事务,执行当前逻辑,结束后恢复上下文的事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
Object suspendedResources = suspend(transaction);//挂起事务
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
//新建事务,如果当前存在事务,则把当前事务挂起
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
return startTransaction(definition, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
//如果当前存在事务,则在嵌套事务内执行。如果没有,则进行与 PROPAGATION_REQUIRED(支持当前事务,如果当前没有事务,则新建一个事务) 类似操作
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
// 使用保存点支持嵌套事务
if (useSavepointForNestedTransaction()) {
// 嵌套事务设置保存点,通常由JDBC3.0支持的savepoint API完成,然后将保存点记录在事务状态中。
// (保存点(savepoint)是事务过程中的一个逻辑点,我们可以把事务回退到这个点,而不必回退整个事务。)
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
status.createAndHoldSavepoint();
return status;
}
else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
return startTransaction(definition, transaction, debugEnabled, null);
}
}
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
//只适用于JTA事务
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
5.org.springframework.transaction.support.AbstractPlatformTransactionManager#suspend
事务挂起操作基本上其实就是重置了TransactionSynchronizationManager中对于事务的状态,没有改变数据库或者链接的一些信息,修改完成之后重新创建了SuspendedResourcesHolder,把原事务的信息和状态返回,然后记录在新的事物状态对象中,形成一种链式结构。
// 事务挂起操作
@Nullable
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
suspendedResources = doSuspend(transaction);
}
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
}
catch (RuntimeException | Error ex) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
}
else if (transaction != null) {
// Transaction active but no synchronization active.
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
}
else {
// Neither transaction nor synchronization active.
return null;
}
}
到这里基本上获取事务就完成了,下面开始提交事务
6.org.springframework.transaction.support.AbstractPlatformTransactionManager#commit
@Override
public final void commit(TransactionStatus status) throws TransactionException {
//判断事务是否为已完成,即已提交或已回滚。
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
// 如果事务状态设置了回滚标识,则执行回滚
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
processRollback(defStatus, false);
return;
}
// 设置全局回滚标识为true,则执行回滚
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");//全局事务被标记为仅回滚
}
processRollback(defStatus, true);//commit方法并不是一定提交事务,也可能回滚
return;
}
// 提交事务
processCommit(defStatus);
}
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
prepareForCommit(status);
// 提交前提示
triggerBeforeCommit(status);
// 完成前提示
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
if (status.hasSavepoint()) {//存在保存点,即嵌套事务,则释放保存点
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
}
else if (status.isNewTransaction()) {//如果事务是由当前事务状态开启的,即事务传播的第一层,执行事务提交
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
unexpectedRollback = status.isGlobalRollbackOnly();
// 执行事务提交
doCommit(status);
}
else if (isFailEarlyOnGlobalRollbackOnly()) {//其他则不做任何操作
unexpectedRollback = status.isGlobalRollbackOnly();
}
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// 回滚完成提示
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
//未知状态完成提示
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException | Error ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
// 事务提交完成提示
triggerAfterCommit(status);
}
finally {
// 操作完成完成提示
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
// 完成后清理
cleanupAfterCompletion(status);
}
}
7.org.springframework.transaction.support.AbstractPlatformTransactionManager#rollback
@Override
public final void rollback(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
processRollback(defStatus, false);
}
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;
try {
triggerBeforeCompletion(status);
if (status.hasSavepoint()) {//存在保存点(嵌套事务),则回滚到保存点
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
status.rollbackToHeldSavepoint();// 回滚保存点
}
else if (status.isNewTransaction()) {//如果事务是由当前事务状态开启的,则执行回滚操作
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
// 回滚事务
doRollback(status);
}
else {//其他情况下,如果事务状态设置了回滚标识,则设置事务对象的状态也为回滚,否则不做任何操作
// Participating in larger transaction
if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
doSetRollbackOnly(status);
}
else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
// Unexpected rollback only matters here if we're asked to fail early
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
}
catch (RuntimeException | Error ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
}
finally {
cleanupAfterCompletion(status);
}
}
3.2 EnableTransactionManagement
3.2.1 案例
import lombok.Data;
import org.apache.ibatis.datasource.pooled.PooledDataSource;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
/**
* @author cf
* @description: 配置类
*/
@Configuration
@Data
@ConfigurationProperties(prefix = "spring.datasource")
@EnableTransactionManagement // 开启事务
public class MyDataSourceConfig1 {
private String url;
private String username;
private String password;
private String driver;
@Bean
public PooledDataSource dataSource(){
PooledDataSource source = new PooledDataSource();
source.setUrl(url);
source.setDriver(driver);
source.setPassword(password);
source.setUsername(username);
return source;
}
//注册事务管理器在容器中
@Bean
public PlatformTransactionManager transactionManager(){
return new DataSourceTransactionManager(dataSource());
}
}
@EnableTransactionManagement
EnableTransactionManagement注解是开启事务
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({TransactionManagementConfigurationSelector.class})
public @interface EnableTransactionManagement {
boolean proxyTargetClass() default false;
AdviceMode mode() default AdviceMode.PROXY;
int order() default Integer.MAX_VALUE;
}
TransactionManagementConfigurationSelector源码分解
org.springframework.transaction.annotation.TransactionManagementConfigurationSelector
@Override
protected String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY://这个adviceMode就是我们在使用@EnableTransactionManagement注解时内部的model()方法的返回值
return new String[] {AutoProxyRegistrar.class.getName(),
ProxyTransactionManagementConfiguration.class.getName()};
case ASPECTJ:
return new String[] {determineTransactionAspectClass()};
default:
return null;
}
}
private String determineTransactionAspectClass() {
return (ClassUtils.isPresent("javax.transaction.Transactional", getClass().getClassLoader()) ?
TransactionManagementConfigUtils.JTA_TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME :
TransactionManagementConfigUtils.TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME);
}
EnableTransactionManagement会利用 TransactionManagementConfigurationSelector给容器中会导入两个组件Bean AutoProxyRegistrar、 ProxyTransactionManagementConfiguration
1.AutoProxyRegistrar
AutoProxyRegistrar 给容器中注册一个 InfrastructureAdvisorAutoProxyCreator 组件,利用后置处理器机制在对象创建以后,包装对象,返回一个代理对象(增强器),代理对象执行方法利用拦截器链进行调用.
它会检测导入者类上的某个注解是否带有属性mode
和proxyTargetClass
,如果检测到这些属性,在mode
为PROXY
时,它会向容器注册一个自动代理创建器auto proxy creator
。
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
boolean candidateFound = false;
Set<String> annTypes = importingClassMetadata.getAnnotationTypes();
Iterator var5 = annTypes.iterator();
//遍历导入者类上使用的所有注解,找到第一个带有属性 mode/proxyTargetClass属性的注解
while(var5.hasNext()) {
String annType = (String)var5.next();
AnnotationAttributes candidate = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
//找到第一个带有属性mode/proxyTargetClass属性的注解
if (candidate != null) {
Object mode = candidate.get("mode");
Object proxyTargetClass = candidate.get("proxyTargetClass");
if (mode != null && proxyTargetClass != null && AdviceMode.class == mode.getClass() && Boolean.class == proxyTargetClass.getClass()) {
candidateFound = true;
//在mode属性值为PROXY时
if (mode == AdviceMode.PROXY) {
//调用registerAutoProxyCreatorIfNecessary向容器注册一个自动代理创建器auto proxy creator,向Spring容器中添加这个bean:InfrastructureAdvisorAutoProxyCreator
AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry);
if ((Boolean)proxyTargetClass) {
AopConfigUtils.forceAutoProxyCreatorToUseClassProxying(registry);
return;
}
}
}
}
}
if (!candidateFound && this.logger.isInfoEnabled()) {
String name = this.getClass().getSimpleName();
}
}
registerAutoProxyCreatorIfNecessary
@Nullable
public static BeanDefinition registerAutoProxyCreatorIfNecessary(
BeanDefinitionRegistry registry, @Nullable Object source) {
return registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source);
}
这里注册了一个InfrastructureAdvisorAutoProxyCreator的Bean,这个类是AbstractAdvisorAutoProxyCreator的子类,实现了SmartInstantiationAwareBeanPostProcessor接口,标识这是一个后置处理器
InfrastructureAdvisorAutoProxyCreator 的作用是什么呢?
先看继承关系:
整个Spring AOP寻找切面、通知的过程就是此方法InstantiationAwareBeanPostProcessor#postProcessBeforeInstantiation的功劳,而生成AOP代理对象就是BeanPostProcessor#postProcessAfterInitialization的功劳。而这些寻找切面、生成代理对象的功能其实是抽象父类AbstractAutoProxyCreator的功能。因此,我们的InfrastructureAdvisorAutoProxyCreator具备了寻找切面、通知以及生成代理对象的功能了。
2.ProxyTransactionManagementConfiguration
ProxyTransactionManagementConfiguration 给容器中注册事务增强器
@Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(
TransactionAttributeSource transactionAttributeSource, TransactionInterceptor transactionInterceptor) {
// 事务增强器
BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
//内部维护了事务相关的属性源,想事务增强器中注入 属性解析器
advisor.setTransactionAttributeSource(transactionAttributeSource);
//注入执行事务时的拦截器,后续会依赖这个拦截器来开启、提交/回滚事务,当调用拥有事务的方法时,最终会调用到此拦截器内部的invoke方法
advisor.setAdvice(transactionInterceptor);
if (this.enableTx != null) {
advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
}
return advisor;
}
BeanFactoryTransactionAttributeSourceAdvisor作用
先看类继承图:
可以看到它属于Advisor类型。在这里够构建了一个Advisor类型的Bean。
在@EnableTransactionManagement注解中已经向spring容器中导入了一个Advisor了。因此,对于在寻找事务切面的过程而言,事务的性能更好一点。因为,它省去了遍历所有bean的过程,在使用@EnableTransactionManagement注解时已经自动为我们导入了一个类型为Advisor的bean。
AnnotationTransactionAttributeSource
事务增强器要用事务注解的信息,AnnotationTransactionAttributeSource解析事务注解。
该事务属性源是一个全局的属性源,记录着Spring容器中所有的事务方法的事务信息,供事务管理器TransactionManager使用。
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionAttributeSource transactionAttributeSource() {
return new AnnotationTransactionAttributeSource();
}
static {
ClassLoader classLoader = AnnotationTransactionAttributeSource.class.getClassLoader();
//如果javax.transaction.Transactional类存在或者可以被加载,则表示要支持该类型的事务
jta12Present = ClassUtils.isPresent("javax.transaction.Transactional", classLoader);
//如果javax.ejb.TransactionAttribute类存在或者可以被加载,则表示要支持该类型的事务
ejb3Present = ClassUtils.isPresent("javax.ejb.TransactionAttribute", classLoader);
}
public AnnotationTransactionAttributeSource(boolean publicMethodsOnly) {
//设置注解解析器
this.publicMethodsOnly = publicMethodsOnly;
if (jta12Present || ejb3Present) {
this.annotationParsers = new LinkedHashSet<>(4);
//添加Spring的事务org.springframework.transaction.annotation.Transactional注解解析器
this.annotationParsers.add(new SpringTransactionAnnotationParser());
if (jta12Present) {//支持javax.transaction.Transactional注解的解析器
this.annotationParsers.add(new JtaTransactionAnnotationParser());
}
if (ejb3Present) {//支持javax.ejb.TransactionAttribute注解的解析器
this.annotationParsers.add(new Ejb3TransactionAnnotationParser());
}
}
else {
//添加Spring的事务org.springframework.transaction.annotation.Transactional注解解析器
this.annotationParsers = Collections.singleton(new SpringTransactionAnnotationParser());
}
}
这个是我们关心的SpringTransactionAnnotationParser
事务拦截器TransactionInterceptor保存了事务属性信息,事务管理器,并且实现了 MethodInterceptor,在目标方法执行的时候执行拦截器链(事务拦截器)
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource) {
TransactionInterceptor interceptor = new TransactionInterceptor();
interceptor.setTransactionAttributeSource(transactionAttributeSource);
if (this.txManager != null) {
interceptor.setTransactionManager(this.txManager);
}
return interceptor;
}
TransactionInterceptor#TransactionAspectSupport
在事务的调用链路中,就是在ProxyTransactionManagementConfiguration内部定义的transactionInterceptor,在此链路中会根据当前方法的事务隔离机制来做一些额外的处理,不过主线流程就是开启事务、提交/回滚事务。
(所有调用拥有事务特性的方法都会走到这里)
@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// 在这里去调用:TransactionAspectSupport的invokeWithinTransaction 进行添加事务支持
return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
@Override
@Nullable
public Object proceedWithInvocation() throws Throwable {
return invocation.proceed();
}
@Override
public Object getTarget() {
return invocation.getThis();
}
@Override
public Object[] getArguments() {
return invocation.getArguments();
}
});
}
org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction
具体执行事务的方法
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// 获取事务属性源对象(属性解析器 org.springframework.transaction.annotation.ProxyTransactionManagementConfiguration#transactionInterceptor 注入的)
TransactionAttributeSource tas = getTransactionAttributeSource();
// 通过事务属性源对象获取到当前方法的事务属性信息
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// 获取配置的事务管理器对象(TransactionManager 事务管理器)
final TransactionManager tm = determineTransactionManager(txAttr);
// 响应式事务处理 目前暂不涉及
if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
boolean isSuspendingFunction = KotlinDetector.isSuspendingFunction(method);
boolean hasSuspendingFlowReturnType = isSuspendingFunction &&
COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName());
if (isSuspendingFunction && !(invocation instanceof CoroutinesInvocationCallback)) {
throw new IllegalStateException("Coroutines invocation not supported: " + method);
}
CoroutinesInvocationCallback corInv = (isSuspendingFunction ? (CoroutinesInvocationCallback) invocation : null);
ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
Class<?> reactiveType =
(isSuspendingFunction ? (hasSuspendingFlowReturnType ? Flux.class : Mono.class) : method.getReturnType());
ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(reactiveType);
if (adapter == null) {
throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
method.getReturnType());
}
return new ReactiveTransactionSupport(adapter);
});
InvocationCallback callback = invocation;
if (corInv != null) {
callback = () -> CoroutinesUtils.invokeSuspendingFunction(method, corInv.getTarget(), corInv.getArguments());
}
Object result = txSupport.invokeWithinTransaction(method, targetClass, callback, txAttr, (ReactiveTransactionManager) tm);
if (corInv != null) {
Publisher<?> pr = (Publisher<?>) result;
return (hasSuspendingFlowReturnType ? KotlinDelegate.asFlow(pr) :
KotlinDelegate.awaitSingleOrNull(pr, corInv.getContinuation()));
}
return result;
}// 响应式事务处理 目前暂不涉及
// TransactionManager 转换为 PlatformTransactionManager
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
// 获取连接点的唯一标识 类名+方法名
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
// 声明式事务处理
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// 创建事务信息TransactionInfo
TransactionInfo txInfo =createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
// 执行被增强方法,调用具体的处理逻辑 提交回滚等操作 ***
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// 异常回滚 会获取事务管理器,执行回滚操作 ***
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
//清除事务信息,恢复线程私有的老的事务信息
cleanupTransactionInfo(txInfo);
}
if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null && txAttr != null) {
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}
//执行commit 会获取事务管理器,执行事务提交操作 ***
commitTransactionAfterReturning(txInfo);
return retVal;
}
else {
.......
}
}
总的来说:
Spring 是通过 TransactionInterceptor(事务拦截器) 来自动实现事务拦截的。
TransactionInterceptor 会调用父类的 TransactionAspectSupport#invokeWithinTransaction() 方法来执行事务方法
上面提到的InfrastructureAdvisorAutoProxyCreator后置处理器,它会在代理对象执行目标方法的时候获取拦截器链,而这个拦截器链就是TransactionInterceptor,这就把它们联系起来了
构造方法传入的PlatformTransactionManager(事务管理器)、TransactionAttributeSource(属性解析器),但是在调用其实没有调用此构造方法,而是调用的无参方法,然后在调用set方法注入这两个属性
/**
* 每个被 @Transactional 修饰的方法都会走一遍 transaction interceptor,然后新增一个事务节点。
* 每个事务节点执行前都会判断是否需要新建事务、开启事务。
**/
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
DataSourceTransactionManager源码分解
DataSourceTransactionManager:事务管理器,底层事务处理实现都是通过它来实现的
项目启动的时候会初始化我们自定义的 DataSourceTransactionManager
在这里就把我们的DataSource 传进来
创建一个新的DataSourceTransactionManager实例。必须设置DataSource才能使用它。
public DataSourceTransactionManager() {
setNestedTransactionAllowed(true);
}
该方法的作用是设置数据源。
public void setDataSource(@Nullable DataSource dataSource) {
//首先判断传入的dataSource是否为TransactionAwareDataSourceProxy的实例。如果是,说明需要对目标数据源执行事务操作。
if (dataSource instanceof TransactionAwareDataSourceProxy) {
//通过调用getTargetDataSource()获取目标数据源,并将其赋值给当前对象的dataSource属性。
this.dataSource = ((TransactionAwareDataSourceProxy) dataSource).getTargetDataSource();
}
else {
//如果传入的dataSource不是TransactionAwareDataSourceProxy的实例,则直接将其赋值给当前对象的dataSource属性。
this.dataSource = dataSource;
}
}
@EnableTransactionManagement 注解总结来说:
1.通过@import引入了TransactionManagementConfigurationSelector类,它的selectImports方法导入了另外两个类:AutoProxyRegistrar和ProxyTransactionManagementConfiguration
2.AutoProxyRegistrar类分析
方法registerBeanDefinitions中,引入了其他类,通过AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry)引入InfrastructureAdvisorAutoProxyCreator,它继承了AbstractAutoProxyCreator,是一个后置处理器类
3.ProxyTransactionManagementConfiguration 是一个添加了@Configuration注解的配置类(注册bean)
注册事务增强器(注入属性解析器、事务拦截器)
属性解析器:AnnotationTransactionAttributeSource,内部持有了一个解析器集合Set annotationParsers具体使用的是SpringTransactionAnnotationParser解析器,用来解析@Transactional的事务属性
事务拦截器:TransactionInterceptor实现了MethodInterceptor接口,该拦截会在产生代理对象之前和aop增强合并,最终一起影响到代理对象,TransactionInterceptor的invoke方法中invokeWithinTransaction会触发原有业务逻辑调用(增强事务)
@Transactional 源码
@Transactional 的方式来自动管理事务,底层是通过 Spring AOP 来实现的。
Spring 是通过 TransactionInterceptor
来自动实现事务拦截的。
TransactionInterceptor 会调用父类的 TransactionAspectSupport#invokeWithinTransaction()
方法来执行事务方法。
org.springframework.transaction.interceptor.TransactionInterceptor
org.springframework.transaction.interceptor.TransactionAspectSupport
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
......
}
4 扩展
4.1 org.springframework.transaction.interceptor.AbstractFallbackTransactionAttributeSource#computeTransactionAttribute
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
->
org.springframework.transaction.interceptor.CompositeTransactionAttributeSource#getTransactionAttribute
->
org.springframework.transaction.interceptor.AbstractFallbackTransactionAttributeSource#getTransactionAttribute
->
org.springframework.transaction.interceptor.AbstractFallbackTransactionAttributeSource#computeTransactionAttribute
//获取对应事务属性
@Nullable
protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
// Don't allow non-public methods, as configured.
if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
return null;
}
//method代表接口中的方法、specificMethod代表实现类的方法
Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);
//1.查看方法中是否存在事务
TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
if (txAttr != null) {
return txAttr;
}
txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}
//如果存在接口,则在接口中查找
if (specificMethod != method) {
//3.查找接口方法
txAttr = findTransactionAttribute(method);
if (txAttr != null) {
return txAttr;
}
//4.到接口类中寻找
txAttr = findTransactionAttribute(method.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}
}
return null;
}
从上面的代码可以看出来如果一个方法上用了@Transactional,类上和接口上也用了,以方法上的为主,其次才是类,最后才到接口。findTransactionAttribute方法
4.2 org.springframework.jdbc.datasource.DataSourceTransactionManager.DataSourceTransactionObject
调用doGetTransaction创建事务对象DataSourceTransactionObject。它继承JdbcTransactionObjectSupport实现的接口是SavepointManager,为当前事务提供创建保存点、回退到保存点、释放保存点继续执行的支持,而具体的创建保存点这些操作会交给ConnectionHolder完成的。
private static class DataSourceTransactionObject extends JdbcTransactionObjectSupport {
private boolean newConnectionHolder;// 持有的ConnectionHolder是否是新创建的
private boolean mustRestoreAutoCommit; // 事务结束时是否需要重置连接为自动提交
public void setConnectionHolder(@Nullable ConnectionHolder connectionHolder, boolean newConnectionHolder) {
super.setConnectionHolder(connectionHolder);
this.newConnectionHolder = newConnectionHolder;
}
.....
}
---------------------------------------------------------
public abstract class JdbcTransactionObjectSupport implements SavepointManager, SmartTransactionObject {
private static final Log logger = LogFactory.getLog(JdbcTransactionObjectSupport.class);
@Nullable
private ConnectionHolder connectionHolder;// 持有数据库连接
@Nullable
private Integer previousIsolationLevel;//之前的事务隔离级别,用于当事务退出时,还原Connection的事务隔离级别
private boolean readOnly = false;
private boolean savepointAllowed = false;//是否允许使用保存点
.........
}
---------------------------------------------------------
public class ConnectionHolder extends ResourceHolderSupport {
/**
* Prefix for savepoint names.
*/
public static final String SAVEPOINT_NAME_PREFIX = "SAVEPOINT_";
@Nullable
private ConnectionHandle connectionHandle;
@Nullable
private Connection currentConnection; // 当前数据库连接
private boolean transactionActive = false; // 当前事务状态
@Nullable
private Boolean savepointsSupported; // 是否支持保存点
private int savepointCounter = 0; // 当前连接的事务的保存点总数
.........
}