文章目录
- 启动seata客户端
- 1.导入依赖
- 2.自动装配
- 发送请求的核心方法
- 客户端开启事务的核心流程
- 服务端分布式事务的处理机制
启动seata客户端
1.导入依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.12.RELEASE</version>
</parent>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<!-- 选择SpringCloud和Alibaba版本的时候,一定要参考官网的建议,否则会有问题 -->
<spring-cloud.version>Hoxton.SR12</spring-cloud.version>
<spring-cloud-alibaba.version>2.2.9.RELEASE</spring-cloud-alibaba.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- nacos服务注册与发现 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--引入seata-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
客户端启动流程图
2.自动装配
自动装配的核心类
在SeataAutoConfiguration我们找到对应注入的类GlobalTransactionScanner,通过名称我们应该推算出,他应该是对应@GlobalTransaction进行扫描,然后注入到容器
我们先看下GlobalTransactionScanner类继承的类
创建的代理类
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
if (!this.doCheckers(bean, beanName)) {
return bean;
} else {
try {
synchronized(PROXYED_SET) {
//如果代理已经存在直接返回bean对象
if (PROXYED_SET.contains(beanName)) {
return bean;
} else {
this.interceptor = null;
//判断是否是TCC模式
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, this.applicationContext)) {
TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), this.applicationContext);
this.interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
ConfigurationCache.addConfigListener("service.disableGlobalTransaction", new ConfigurationChangeListener[]{(ConfigurationChangeListener)this.interceptor});
} else {
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
//判断是否是被注解@globalTransactional或者@GlobalLock代理的类
if (!this.existsAnnotation(new Class[]{serviceInterface}) && !this.existsAnnotation(interfacesIfJdk)) {
return bean;
}
//创建globalTransactionalInterceptor拦截器
if (this.globalTransactionalInterceptor == null) {
this.globalTransactionalInterceptor = new GlobalTransactionalInterceptor(this.failureHandlerHook);
ConfigurationCache.addConfigListener("service.disableGlobalTransaction", new ConfigurationChangeListener[]{(ConfigurationChangeListener)this.globalTransactionalInterceptor});
}
this.interceptor = this.globalTransactionalInterceptor;
}
LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", new Object[]{bean.getClass().getName(), beanName, this.interceptor.getClass().getName()});
if (!AopUtils.isAopProxy(bean)) {
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
//获取拦截器
Advisor[] advisor = this.buildAdvisors(beanName, this.getAdvicesAndAdvisorsForBean((Class)null, (String)null, (TargetSource)null));
Advisor[] var8 = advisor;
int var9 = advisor.length;
for(int var10 = 0; var10 < var9; ++var10) {
Advisor avr = var8[var10];
int pos = this.findAddSeataAdvisorPosition(advised, avr);
advised.addAdvisor(pos, avr);
}
}
PROXYED_SET.add(beanName);
return bean;
}
}
} catch (Exception var14) {
throw new RuntimeException(var14);
}
}
}
事务管理器TM,资源管理器RM初始化实在实例化之后进行这个是在GlobalTransactionScanner
继承的InitializingBean的afterPropertiesSet方法中实现InitializingBean接口的使用
实现InitializingBean接口的bean,在Spring容器初始化并设置所有bean属性后,会调用其afterPropertiesSet()方法。这通常用于在bean的属性全部设置完毕后需要进行的一些自定义初始化工作,例如验证属性或建立资源连接。
public void afterPropertiesSet() {
//判断是否开启事务
if (this.disableGlobalTransaction) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global transaction is disabled.");
}
//添加监听器
ConfigurationCache.addConfigListener("service.disableGlobalTransaction", new ConfigurationChangeListener[]{this});
} else {
//CAS将初始化置换成True
if (this.initialized.compareAndSet(false, true)) {
//初始化客户端
this.initClient();
}
}
}
初始化客户端的代码
private void initClient() {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Initializing Global Transaction Clients ... ");
}
if ("my_test_tx_group".equals(this.txServiceGroup)) {
LOGGER.warn("the default value of seata.tx-service-group: {} has already changed to {} since Seata 1.5, please change your default configuration as soon as possible and we don't recommend you to use default tx-service-group's value provided by seata", "my_test_tx_group", "default_tx_group");
}
if (!StringUtils.isNullOrEmpty(this.applicationId) && !StringUtils.isNullOrEmpty(this.txServiceGroup)) {
//初始化TM客户端
TMClient.init(this.applicationId, this.txServiceGroup, accessKey, secretKey);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", this.applicationId, this.txServiceGroup);
}
//初始化RM客户端
RMClient.init(this.applicationId, this.txServiceGroup);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", this.applicationId, this.txServiceGroup);
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global Transaction Clients are initialized. ");
}
this.registerSpringShutdownHook();
} else {
throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", this.applicationId, this.txServiceGroup));
}
}
TM和RM底层都是用的Netty进行的通讯
TM的初始化
public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {
TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup, accessKey, secretKey);
tmNettyRemotingClient.init();
}
RM的初始化
public static void init(String applicationId, String transactionServiceGroup) {
RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
rmNettyRemotingClient.init();
}
发送请求的核心方法
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
//利用AOP获取代理类
Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
//获取代理的方法
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
//判断此方法是否被@GlobalTranscation修饰
GlobalTransactional globalTransactionalAnnotation = (GlobalTransactional)this.getAnnotation(method, targetClass, GlobalTransactional.class);
//判断此方法是否被注解@GlobalLock修饰
GlobalLock globalLockAnnotation = (GlobalLock)this.getAnnotation(method, targetClass, GlobalLock.class);
boolean localDisable = this.disable || degradeCheck && degradeNum >= degradeCheckAllowTimes;
if (!localDisable) {
if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
AspectTransactional transactional;
if (globalTransactionalAnnotation != null) {
transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(), globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(), globalTransactionalAnnotation.rollbackForClassName(), globalTransactionalAnnotation.noRollbackFor(), globalTransactionalAnnotation.noRollbackForClassName(), globalTransactionalAnnotation.propagation(), globalTransactionalAnnotation.lockRetryInterval(), globalTransactionalAnnotation.lockRetryTimes());
} else {
transactional = this.aspectTransactional;
}
//被@GlobalTranstional修饰的方法进入此方法
return this.handleGlobalTransaction(methodInvocation, transactional);
}
//被@GlobalLock修饰的方法进入此类
if (globalLockAnnotation != null) {
return this.handleGlobalLock(methodInvocation, globalLockAnnotation);
}
}
}
return methodInvocation.proceed();
}
事务的核心方法TransactionalTemplate的excute方法
public Object execute(TransactionalExecutor business) throws Throwable {
//获取事务信息
TransactionInfo txInfo = business.getTransactionInfo();
//如果为空直接抛异常
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
} else {
//创建或者获取全局事务标志xid
GlobalTransaction tx = GlobalTransactionContext.getCurrent();
//获取事务的传播机制
Propagation propagation = txInfo.getPropagation();
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {
Object var6;
switch(propagation) {
case NOT_SUPPORTED:
if (this.existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend();
}
var6 = business.execute();
return var6;
case REQUIRES_NEW:
if (this.existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend();
tx = GlobalTransactionContext.createNew();
}
break;
case SUPPORTS:
if (this.notExistingTransaction(tx)) {
var6 = business.execute();
return var6;
}
case REQUIRED:
break;
case NEVER:
if (this.existingTransaction(tx)) {
throw new TransactionException(String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s", tx.getXid()));
}
var6 = business.execute();
return var6;
case MANDATORY:
if (this.notExistingTransaction(tx)) {
throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
}
break;
default:
throw new TransactionException("Not Supported Propagation:" + propagation);
}
//如果事务的xid为空,则重新创建一个新的
if (tx == null) {
tx = GlobalTransactionContext.createNew();
}
//获取全局锁配置
GlobalLockConfig previousConfig = this.replaceGlobalLockConfig(txInfo);
try {
//开启全局事务
this.beginTransaction(txInfo, tx);
Object rs;
Object ex;
try {
rs = business.execute();
} catch (Throwable var17) {
ex = var17;
this.completeTransactionAfterThrowing(txInfo, tx, var17);
throw var17;
}
this.commitTransaction(tx);
ex = rs;
return ex;
} finally {
this.resumeGlobalLockConfig(previousConfig);
this.triggerAfterCompletion();
this.cleanUp();
}
} finally {
if (suspendedResourcesHolder != null) {
tx.resume(suspendedResourcesHolder);
}
}
}
}
客户端开启事务的核心流程
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws ExecutionException {
try {
//执行开启事务前的操作
this.triggerBeforeBegin();
//开启事务
tx.begin(txInfo.getTimeOut(), txInfo.getName());
//执行开启事务后的操作
this.triggerAfterBegin();
} catch (TransactionException var4) {
throw new ExecutionException(tx, var4, Code.BeginFailure);
}
}
public void begin(int timeout, String name) throws TransactionException {
//判断当前的角色是不是Launcher
if (this.role != GlobalTransactionRole.Launcher) {
//不是判断当前的xid是否为null
this.assertXIDNotNull();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", this.xid);
}
} else {
//不是判断当前的xid是否为null
this.assertXIDNull();
String currentXid = RootContext.getXID();
if (currentXid != null) {
throw new IllegalStateException("Global transaction already exists, can't begin a new global transaction, currentXid = " + currentXid);
} else {
//开启事务,请求后台服务,返回xid
this.xid = this.transactionManager.begin((String)null, (String)null, name, timeout);
//将事务状态置为开启
this.status = GlobalStatus.Begin;
//绑定xid
RootContext.bind(this.xid);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Begin new global transaction [{}]", this.xid);
}
}
}
}
服务端分布式事务的处理机制
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
throws TransactionException {
/**
* 调用core.begin开启事务,并持久化
* 给response设置全局事务编号XID*/
response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
request.getTransactionName(), request.getTimeout()));
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",
rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
}
}
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
//key1:创建全局会话,这里面已经创建了全局事务Id
GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
timeout);
MDC.put(RootContext.MDC_KEY_XID, session.getXid());
// 添加事务生命周期监听器
session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
//2、 开启事务,这里有事务持久话的一个机制
session.begin();
// transaction start event
// 发送事务开启事件
MetricsPublisher.postSessionDoingEvent(session, false);
// 返回全局事务ID
return session.getXid();
}
@Override
public void begin() throws TransactionException {
this.status = GlobalStatus.Begin;
this.beginTime = System.currentTimeMillis();
this.active = true;
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
// 在这里处理
lifecycleListener.onBegin(this);
}
}
private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException {
// 将事务信息写入数据库
if (!transactionStoreManager.writeSession(logOperation, sessionStorable)) {
if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession,
"Fail to store global session");
} else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession,
"Fail to update global session");
} else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession,
"Fail to remove global session");
} else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,
"Fail to store branch session");
} else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,
"Fail to update branch session");
} else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,
"Fail to remove branch session");
} else {
throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,
"Unknown LogOperation:" + logOperation.name());
}
}
}