原理
TM两阶段:
阶段1:TM向TC申请全局事务,netty客户端发起了一次记录xid的请求
阶段2:TC协调之后,决定执行RM是否提交或者回滚。
spring公共组件部分
1、SeataAutoConfiguration类
利用springboot自动装配机制从spring.factories文件加载自动配置类SeataAutoConfiguration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=io.seata.spring.boot.autoconfigure.SeataAutoConfiguration
@ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
@AutoConfigureAfter({SeataCoreAutoConfiguration.class})
public class SeataAutoConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);
@Bean(BEAN_NAME_FAILURE_HANDLER)
@ConditionalOnMissingBean(FailureHandler.class)
public FailureHandler failureHandler() {
return new DefaultFailureHandlerImpl();
}
@Bean
@DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
@ConditionalOnMissingBean(GlobalTransactionScanner.class)
public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler,
ConfigurableListableBeanFactory beanFactory,
@Autowired(required = false) List<ScannerChecker> scannerCheckers) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Automatically configure Seata");
}
// set bean factory
GlobalTransactionScanner.setBeanFactory(beanFactory);
// add checkers
// '/META-INF/services/io.seata.spring.annotation.ScannerChecker'
GlobalTransactionScanner.addScannerCheckers(EnhancedServiceLoader.loadAll(ScannerChecker.class));
// spring beans
GlobalTransactionScanner.addScannerCheckers(scannerCheckers);
// add scannable packages
GlobalTransactionScanner.addScannablePackages(seataProperties.getScanPackages());
// add excludeBeanNames
GlobalTransactionScanner.addScannerExcludeBeanNames(seataProperties.getExcludesForScanning());
// create global transaction scanner
return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
}
}
2、 GlobalTransactionScanner类
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean
初始化TM、RM的netty客户端,提供aop能力
2-1、初始化TM、RM的netty客户端
io.seata.spring.annotation.GlobalTransactionScanner#afterPropertiesSet
spring-bean初始化赋值回调,这里初始化TM、RM的netty客户端,为后续发起请求做准备。
@Override
public void afterPropertiesSet() {
if (disableGlobalTransaction) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global transaction is disabled.");
}
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)this);
return;
}
if (initialized.compareAndSet(false, true)) {
initClient();
}
}
private void initClient() {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Initializing Global Transaction Clients ... ");
}
if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
}
//init TM
TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
}
//init RM
RMClient.init(applicationId, txServiceGroup);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global Transaction Clients are initialized. ");
}
registerSpringShutdownHook();
}
2-2、aop代理实现
io.seata.spring.annotation.GlobalTransactionScanner#wrapIfNecessary,递归调用适配代理,Seata重写了这个方法,TCC代理、AT代理使用的还是spring那一套代理区分方式(jdk还是cglib)
/**
* The following will be scanned, and added corresponding interceptor:
*
* TM:
* @see io.seata.spring.annotation.GlobalTransactional // TM annotation
* Corresponding interceptor:
* @see io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalTransaction(MethodInvocation, AspectTransactional) // TM handler
*
* GlobalLock:
* @see io.seata.spring.annotation.GlobalLock // GlobalLock annotation
* Corresponding interceptor:
* @see io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalLock(MethodInvocation, GlobalLock) // GlobalLock handler
*
* TCC mode:
* @see io.seata.rm.tcc.api.LocalTCC // TCC annotation on interface
* @see io.seata.rm.tcc.api.TwoPhaseBusinessAction // TCC annotation on try method
* @see io.seata.rm.tcc.remoting.RemotingParser // Remote TCC service parser
* Corresponding interceptor:
* @see io.seata.spring.tcc.TccActionInterceptor // the interceptor of TCC mode
*/
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
// do checkers
if (!doCheckers(bean, beanName)) {
return bean;
}
try {
synchronized (PROXYED_SET) {
if (PROXYED_SET.contains(beanName)) {
return bean;
}
interceptor = null;
//check TCC proxy
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
// init tcc fence clean task if enable useTccFence
TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)interceptor);
} else {
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}
if (globalTransactionalInterceptor == null) {
globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
ConfigurationCache.addConfigListener(
ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)globalTransactionalInterceptor);
}
interceptor = globalTransactionalInterceptor;
}
LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
if (!AopUtils.isAopProxy(bean)) {
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
int pos;
for (Advisor avr : advisor) {
// Find the position based on the advisor's order, and add to advisors by pos
pos = findAddSeataAdvisorPosition(advised, avr);
advised.addAdvisor(pos, avr);
}
}
PROXYED_SET.add(beanName);
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}
3、GlobalTransactionalInterceptor类