Seata AT模式源码解析二(Seata Client端启动流程)

文章目录

  • 初始化TM和RM
  • 数据源代理

由于我们一般都是在springboot中使用的,而与springboot集成的我们一般就先看starter的spring.factories文件,看看它的自动装配
在这里插入图片描述
在这里插入图片描述
这里面主要关注SeataAutoConfiguration和SeataDataSourceAutoConfiguration。

SeataAutoConfiguration

@ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
public class SeataAutoConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);

    /**
     * 默认的失败处理程序
     */
    @Bean(BEAN_NAME_FAILURE_HANDLER)
    @ConditionalOnMissingBean(FailureHandler.class)
    public FailureHandler failureHandler() {
        return new DefaultFailureHandlerImpl();
    }

    /**
     * 全局事务扫描器,用来扫描 GlobalTransactional 和 GlobalLock 注解,生成代理的
     */
    @Bean
    @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
    @ConditionalOnMissingBean(GlobalTransactionScanner.class)
    public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Automatically configure Seata");
        }
        return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
    }
}

该配置类要生效的条件是seata.enabled值为true,该值默认就是为true,所以该配置类默认是生效的。
类中创建了两个bean,一个是FailureHandler,该类是client用来处理全局事务失败的程序。另一个就是核心的bean,全局事务扫描器,用来扫描 GlobalTransactional 和 GlobalLock 注解,当在方法上标注了GlobalTransactional 和 GlobalLock 注解,就会为该类生成代理,在方法的执行前后添加上seata的事务逻辑。
GlobalTransactionScanner继承了AbstractAutoProxyCreator类,而AbstractAutoProxyCreator时aop里面一个把目标对象转换成代理对象的一个后置处理器,用来生成AOP代理的。
生成代理的逻辑就在wrapIfNecessary方法中

@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    try {
        synchronized (PROXYED_SET) {
            // 是否已代理过
            if (PROXYED_SET.contains(beanName)) {
                return bean;
            }
            // 每次生成代理对象时先置空
            interceptor = null;
            // 判断是否是 TCC 模式
            if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                                                     (ConfigurationChangeListener)interceptor);
            } else {
                // 获取bean的原始类
                Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

                // 判断bean中是否有 GlobalTransactional 和 GlobalLock 注解
                if (!existsAnnotation(new Class[]{serviceInterface})
                    && !existsAnnotation(interfacesIfJdk)) {
                    return bean;
                }

                // GlobalTransactionalInterceptor 实现了 MethodInterceptor 接口,
                // 可以看做一个advisor,包含了增强逻辑
                if (globalTransactionalInterceptor == null) {
                    globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                    ConfigurationCache.addConfigListener(
                        ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                        (ConfigurationChangeListener)globalTransactionalInterceptor);
                }
                interceptor = globalTransactionalInterceptor;
            }

            LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
            // 如果bean不是代理对象,则直接调用父类的wrapIfNecessary生成代理对象,
            // 在父类中会调用getAdvicesAndAdvisorsForBean获取到上面定义的interceptor
            if (!AopUtils.isAopProxy(bean)) {
                bean = super.wrapIfNecessary(bean, beanName, cacheKey);
            } else {
                // 如果bean已经是代理对象,比如类里也有@Transaction注解,已经生成了代理,
                // 此时无需再代理一层,只需将增强逻辑(advisor)添加到代理对象中即可

                // 获取bean中的ProxyFactory,因为ProxyFactory中含有可以应用于该bean的所有advisor集合,
                // 这里需要将seata的advisor添加到该集合中
                AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                // 将GlobalTransactionalInterceptor封装为advisor,添加到AdvisedSupport中
                Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                for (Advisor avr : advisor) {
                    advised.addAdvisor(0, avr);
                }
            }
            // 将beanName放到已代理过的集合中
            PROXYED_SET.add(beanName);
            return bean;
        }
    } catch (Exception exx) {
        throw new RuntimeException(exx);
    }
}

当调用被@GlobalTransactional或@GlobalLock注解修饰的方法时,会调到代理对象,而增强逻辑在GlobalTransactionalInterceptor类的invoke方法里。而具体是如何增强的以及事务时如何执行的放在另一篇专门讲解。

初始化TM和RM

GlobalTransactionScanner还实现了InitializingBean接口,所以在初始化阶段还会调用afterPropertiesSet方法

@Override
public void afterPropertiesSet() {
    if (disableGlobalTransaction) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Global transaction is disabled.");
        }
        ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                                             (ConfigurationChangeListener)this);
        return;
    }
    // 如果seata客户端还未初始化,则进行初始化
    if (initialized.compareAndSet(false, true)) {
        initClient();
    }
}

这里会对TM和RM进行初始化,本质上都是创建一个netty客户端,然后向tc注册

private void initClient() {
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Initializing Global Transaction Clients ... ");
    }
    if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
        throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
    }
    // 初始化 TM,本质就是创建一个tm的netty客户端,然后向tc注册
    TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
    }
    // 初始化 RM,本质就是创建一个rm的netty客户端,然后向tc注册
    RMClient.init(applicationId, txServiceGroup);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
    }

    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Global Transaction Clients are initialized. ");
    }
    registerSpringShutdownHook();

}

初始化 TM

public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {
    // 获取TM客户端实例
    TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup, accessKey, secretKey);
    // 初始化TM的netty客户端
    tmNettyRemotingClient.init();
}

在获取TM客户端实例时,会创建netty客户端,但还未启动

@Override
public void init() {
    // 注册相关处理器
    registerProcessor();
    if (initialized.compareAndSet(false, true)) {
        // 调用父类初始化方法
        super.init();
    }
}

注册两个处理器,用来处理TC返回给TM的响应

private void registerProcessor() {
    // 1.registry TC response processor
    // 注册Seata-Server返回Response的处理Processor,用于处理由Client主动发起Request,
    // Seata-Server返回的Response。
    // ClientOnResponseProcessor负责把Client发送的Request和Seata-Server
    // 返回的Response对应起来,从而实现Rpc
    ClientOnResponseProcessor onResponseProcessor =
        new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);
    // 2.registry heartbeat message processor
    // 处理Seata-Server返回的心跳消息
    ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}

在父类AbstractNettyRemotingClient中启动TM的netty客户端,netty相关的不是关注的重点,所以不用深入分析。

@Override
public void init() {
    // 定时重新发送 RegisterTMRequest(RM 客户端会发送 RegisterRMRequest)请求尝试连接服务端
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            clientChannelManager.reconnect(getTransactionServiceGroup());
        }
    }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
    if (NettyClientConfig.isEnableClientBatchSendRequest()) {
        mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
                                                          MAX_MERGE_SEND_THREAD,
                                                          KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                                                          new LinkedBlockingQueue<>(),
                                                          new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }
    super.init();
    // 启动netty 客户端
    clientBootstrap.start();
}

初始化RM
初始化过程跟TM一样,下面只贴出相关代码

public static void init(String applicationId, String transactionServiceGroup) {
    // 创建RM的netty客户端
    RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
    // 设置RM进去
    rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
    rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
    // 初始化
    rmNettyRemotingClient.init();
}
@Override
public void init() {
    // 注册处理器
    registerProcessor();
    if (initialized.compareAndSet(false, true)) {
        super.init();

        // Found one or more resources that were registered before initialization
        if (resourceManager != null
            && !resourceManager.getManagedResources().isEmpty()
            && StringUtils.isNotBlank(transactionServiceGroup)) {
            getClientChannelManager().reconnect(transactionServiceGroup);
        }
    }
}
private void registerProcessor() {
    // 1.registry rm client handle branch commit processor
    // 注册Seata-Server发起branchCommit的处理Processor
    RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
    super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);

    // 2.registry rm client handle branch commit processor
    // 注册Seata-Server发起branchRollback的处理Processor
    RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
    super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);

    // 3.registry rm handler undo log processor
    // 注册Seata-Server发起删除undoLog的处理Processor
    RmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);

    // 4.registry TC response processor
    // 注册Seata-Server返回Response的处理Processor,用于处理由Client主动发起Request,
    // Seata-Server返回的Response。
    // ClientOnResponseProcessor负责把Client发送的Request和Seata-Server
    // 返回的Response对应起来,从而实现Rpc
    ClientOnResponseProcessor onResponseProcessor =
        new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);

    // 5.registry heartbeat message processor
    // 处理Seata-Server返回的心跳消息
    ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}
@Override
public void init() {
    // 定时重新发送 RegisterTMRequest(RM 客户端会发送 RegisterRMRequest)请求尝试连接服务端
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            clientChannelManager.reconnect(getTransactionServiceGroup());
        }
    }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
    if (NettyClientConfig.isEnableClientBatchSendRequest()) {
        mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
                                                          MAX_MERGE_SEND_THREAD,
                                                          KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                                                          new LinkedBlockingQueue<>(),
                                                          new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }
    super.init();
    // 启动netty 客户端
    clientBootstrap.start();
}

总结来说初始化TM和RM做的事就是分别注册几个处理器以及启动各自的Netty客户端。

数据源代理

SeataDataSourceAutoConfiguration

@ConditionalOnBean(DataSource.class)
@ConditionalOnExpression("${seata.enable:true} && ${seata.enableAutoDataSourceProxy:true} && ${seata.enable-auto-data-source-proxy:true}")
public class SeataDataSourceAutoConfiguration {

    /**
     * The bean seataDataSourceBeanPostProcessor.
     */
    @Bean(BEAN_NAME_SEATA_DATA_SOURCE_BEAN_POST_PROCESSOR)
    @ConditionalOnMissingBean(SeataDataSourceBeanPostProcessor.class)
    public SeataDataSourceBeanPostProcessor seataDataSourceBeanPostProcessor(SeataProperties seataProperties) {
        return new SeataDataSourceBeanPostProcessor(seataProperties.getExcludesForAutoProxying(), seataProperties.getDataSourceProxyMode());
    }

    /**
     * 负责为Spring中的所有DataSource生成代理对象,从而拦截SQL的执行,在SQL执行前后实现seata的逻辑
     */
    @Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
    @ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
    public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {
        return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(),
            seataProperties.getExcludesForAutoProxying(), seataProperties.getDataSourceProxyMode());
    }
}

该配置类要生效的条件是${seata.enable:true} && ${seata.enableAutoDataSourceProxy:true} && ${seata.enable-auto-data-source-proxy:true}这几个配置都为true,但是我在配置文件中都设置为true后也没生效,不知道哪里问题,所以我换一种方式,在启动类上添加@EnableAutoDataSourceProxy注解。

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(AutoDataSourceProxyRegistrar.class)
@Documented
public @interface EnableAutoDataSourceProxy {
    /**
     * Whether use JDK proxy instead of CGLIB proxy
     *
     * @return useJdkProxy
     */
    boolean useJdkProxy() default false;

    /**
     * Specifies which datasource bean are not eligible for auto-proxying
     *
     * @return excludes
     */
    String[] excludes() default {};

    /**
     * Data source proxy mode, AT or XA
     *
     * @return dataSourceProxyMode
     */
    String dataSourceProxyMode() default "AT";
}

该注解上导入了另一个类AutoDataSourceProxyRegistrar

public class AutoDataSourceProxyRegistrar implements ImportBeanDefinitionRegistrar {
    private static final String ATTRIBUTE_KEY_USE_JDK_PROXY = "useJdkProxy";
    private static final String ATTRIBUTE_KEY_EXCLUDES = "excludes";
    private static final String ATTRIBUTE_KEY_DATA_SOURCE_PROXY_MODE = "dataSourceProxyMode";

    public static final String BEAN_NAME_SEATA_DATA_SOURCE_BEAN_POST_PROCESSOR = "seataDataSourceBeanPostProcessor";
    public static final String BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR = "seataAutoDataSourceProxyCreator";

    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        Map<String, Object> annotationAttributes = importingClassMetadata.getAnnotationAttributes(EnableAutoDataSourceProxy.class.getName());

        boolean useJdkProxy = Boolean.parseBoolean(annotationAttributes.get(ATTRIBUTE_KEY_USE_JDK_PROXY).toString());
        String[] excludes = (String[]) annotationAttributes.get(ATTRIBUTE_KEY_EXCLUDES);
        String dataSourceProxyMode = (String) annotationAttributes.get(ATTRIBUTE_KEY_DATA_SOURCE_PROXY_MODE);

        //register seataDataSourceBeanPostProcessor bean def
        if (!registry.containsBeanDefinition(BEAN_NAME_SEATA_DATA_SOURCE_BEAN_POST_PROCESSOR)) {
            AbstractBeanDefinition beanDefinition = BeanDefinitionBuilder
                .genericBeanDefinition(SeataDataSourceBeanPostProcessor.class)
                .addConstructorArgValue(excludes)
                .addConstructorArgValue(dataSourceProxyMode)
                .getBeanDefinition();
            registry.registerBeanDefinition(BEAN_NAME_SEATA_DATA_SOURCE_BEAN_POST_PROCESSOR, beanDefinition);
        }

        //register seataAutoDataSourceProxyCreator bean def
        if (!registry.containsBeanDefinition(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)) {
            AbstractBeanDefinition beanDefinition = BeanDefinitionBuilder
                .genericBeanDefinition(SeataAutoDataSourceProxyCreator.class)
                .addConstructorArgValue(useJdkProxy)
                .addConstructorArgValue(excludes)
                .addConstructorArgValue(dataSourceProxyMode)
                .getBeanDefinition();
            registry.registerBeanDefinition(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR, beanDefinition);
        }
    }
}

AutoDataSourceProxyRegistrar实现了ImportBeanDefinitionRegistrar接口,这样我们就知道该类额外注册了BeanDefinition。通过源码可知,该类注册了两个bean,分别是SeataDataSourceBeanPostProcessor和SeataAutoDataSourceProxyCreator。

SeataDataSourceBeanPostProcessor
该类是一个BeanPostProcessor,主要就是用来生成数据源代理的

public class SeataDataSourceBeanPostProcessor implements BeanPostProcessor {

    private static final Logger LOGGER = LoggerFactory.getLogger(SeataDataSourceBeanPostProcessor.class);

    private final List<String> excludes;
    private final BranchType dataSourceProxyMode;

    public SeataDataSourceBeanPostProcessor(String[] excludes, String dataSourceProxyMode) {
        this.excludes = Arrays.asList(excludes);
        this.dataSourceProxyMode = BranchType.XA.name().equalsIgnoreCase(dataSourceProxyMode) ? BranchType.XA : BranchType.AT;
    }

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) {
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof DataSource) {
            //When not in the excludes, put and init proxy.
            if (!excludes.contains(bean.getClass().getName())) {
                // 这里只是生成代理,并不返回代理,返回的还是真实数据源,
                // 毕竟不是每个sql都需要代理,在需要使用代理的时候再取出来
                DataSourceProxyHolder.get().putDataSource((DataSource) bean, dataSourceProxyMode);
            }

            // 如果是代理数据源,则返回真实数据源
            if (bean instanceof SeataDataSourceProxy) {
                LOGGER.info("Unwrap the bean of the data source," +
                    " and return the original data source to replace the data source proxy.");
                return ((SeataDataSourceProxy) bean).getTargetDataSource();
            }
        }
        return bean;
    }
}

DataSourceProxyHolder是用来存放代理数据源的,如果当前bean是DataSource,则会为该DataSource生成一个代理DataSource。在putDataSource方法中,会进行数据源代理类的创建,当然,该方法除了创建数据源代理,获取数据源代理也是调用这个方法。

public SeataDataSourceProxy putDataSource(DataSource dataSource, BranchType dataSourceProxyMode) {
    DataSource originalDataSource;
    // 如果已经是代理数据源并且事务模式也跟想要的一样,则直接返回了
    if (dataSource instanceof SeataDataSourceProxy) {
        SeataDataSourceProxy dataSourceProxy = (SeataDataSourceProxy) dataSource;

        // 就是想要的代理类就直接返回了
        if (dataSourceProxyMode == dataSourceProxy.getBranchType()) {
            return (SeataDataSourceProxy) dataSource;
        }

        // 获取原数据源,下面根据该数据源创建或获取数据源代理类
        originalDataSource = dataSourceProxy.getTargetDataSource();
    } else {
        originalDataSource = dataSource;
    }
    // 从缓存中获取真实数据源对应的代理
    SeataDataSourceProxy dsProxy = dataSourceProxyMap.get(originalDataSource);
    if (dsProxy == null) {
        synchronized (dataSourceProxyMap) {
            dsProxy = dataSourceProxyMap.get(originalDataSource);
            if (dsProxy == null) {
                // 没获取到就根据事务模式和真实数据源创建一个代理
                dsProxy = createDsProxyByMode(dataSourceProxyMode, originalDataSource);
                // 放进缓存
                dataSourceProxyMap.put(originalDataSource, dsProxy);
            }
        }
    }
    return dsProxy;
}

XA模式就创建DataSourceProxyXA,其他模式创建DataSourceProx。

private SeataDataSourceProxy createDsProxyByMode(BranchType mode, DataSource originDs) {
    return BranchType.XA == mode ? new DataSourceProxyXA(originDs) : new DataSourceProxy(originDs);
}

SeataAutoDataSourceProxyCreator
上面为每个数据源生成了seata的代理对象,但是该代理对象并不能通过AOP切入,所以还是需要一个AOP代理对象。SeataAutoDataSourceProxyCreator也是继承了AbstractAutoProxyCreator类,以前解析过AOP源码,可以知道继承该类就可以对指定的bean生成AOP代理 。

public class SeataAutoDataSourceProxyCreator extends AbstractAutoProxyCreator {
    private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoDataSourceProxyCreator.class);
    private final List<String> excludes;
    private final Advisor advisor;

    public SeataAutoDataSourceProxyCreator(boolean useJdkProxy, String[] excludes, String dataSourceProxyMode) {
        this.excludes = Arrays.asList(excludes);
        this.advisor = new DefaultIntroductionAdvisor(new SeataAutoDataSourceProxyAdvice(dataSourceProxyMode));
        setProxyTargetClass(!useJdkProxy);
    }

    @Override
    protected Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName, TargetSource customTargetSource) throws BeansException {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Auto proxy of [{}]", beanName);
        }
        return new Object[]{advisor};
    }

    @Override
    protected boolean shouldSkip(Class<?> beanClass, String beanName) {
        // 这个类只对DataSource生成代理
        return !DataSource.class.isAssignableFrom(beanClass) ||
            SeataProxy.class.isAssignableFrom(beanClass) ||
            excludes.contains(beanClass.getName());
    }
}

从shouldSkip方法可知,只会对DataSource生成代理,而它添加的增强逻辑在SeataAutoDataSourceProxyAdvice内。

/**
 * 对DataSource进行增强,代理DataSource中的方法
 *
 * @author xingfudeshi@gmail.com
 */
public class SeataAutoDataSourceProxyAdvice implements MethodInterceptor, IntroductionInfo {

    private final BranchType dataSourceProxyMode;
    private final Class<? extends SeataDataSourceProxy> dataSourceProxyClazz;

    public SeataAutoDataSourceProxyAdvice(String dataSourceProxyMode) {
        if (BranchType.AT.name().equalsIgnoreCase(dataSourceProxyMode)) {
            this.dataSourceProxyMode = BranchType.AT;
            this.dataSourceProxyClazz = DataSourceProxy.class;
        } else if (BranchType.XA.name().equalsIgnoreCase(dataSourceProxyMode)) {
            this.dataSourceProxyMode = BranchType.XA;
            this.dataSourceProxyClazz = DataSourceProxyXA.class;
        } else {
            throw new IllegalArgumentException("Unknown dataSourceProxyMode: " + dataSourceProxyMode);
        }

        //Set the default branch type in the RootContext.
        RootContext.setDefaultBranchType(this.dataSourceProxyMode);
    }

    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        // 如果不是在@GlobalLock方法或事务模式跟当前的不匹配,则直接调用原方法
        if (!RootContext.requireGlobalLock() && dataSourceProxyMode != RootContext.getBranchType()) {
            return invocation.proceed();
        }

        Method method = invocation.getMethod();
        Object[] args = invocation.getArguments();
        Method m = BeanUtils.findDeclaredMethod(dataSourceProxyClazz, method.getName(), method.getParameterTypes());
        if (m != null && DataSource.class.isAssignableFrom(method.getDeclaringClass())) {
            // 获取seata创建的代理数据源,调用代理数据源的方法
            SeataDataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis(), dataSourceProxyMode);
            return m.invoke(dataSourceProxy, args);
        } else {
            return invocation.proceed();
        }
    }

    @Override
    public Class<?>[] getInterfaces() {
        return new Class[]{SeataProxy.class};
    }
}

当调用DataSource的方法时,就会通过AOP代理对象调用到SeataDataSourceProxy实现类的方法,即seata的代理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/23983.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

破解极域(4):万能密码法(可以获取到原密码)

破解极域&#xff08;4&#xff09;&#xff1a;万能密码法 1.思路2.实现2.1 获得密码2.2 解除控制2.3 特别注意 3.视频展示 今天来分享下破解极域的第4种方法——万能密码法 1.思路 首先&#xff0c;我们要知道的是&#xff0c;极域这个东西它有一个万能密码&#xff0c;万能…

如何检查Linux硬盘大小、类型和硬件详细信息?

在Linux系统中&#xff0c;了解硬盘的大小、类型和硬件详细信息对于系统管理和故障排除非常重要。本文将详细介绍如何使用命令行工具来检查Linux硬盘的大小、类型和硬件详细信息。 1. 检查硬盘大小 要检查Linux硬盘的大小&#xff0c;可以使用lsblk命令。该命令显示了系统中所…

我用AI帮我唱了首“基尼太美”,颠覆了我的认知!太牛逼了

目录 前言 AI唱"基尼太美"是什么感觉 使用so-vits-svc打造自己专属歌手 1.声音素材整理 2.训练模型 3.让AI唱歌​编辑 AI歌手背后的技术 AI歌手会成为主流吗 写到最后 大家好&#xff0c;我是大侠&#xff0c;AI领域的专业博主 前言 在5月份&#xff0c;孙…

vue2_模版语法

目录 模版语法 react用jsx语法编译后的null作用 插值表达式{{}} v-bind和{{}} 关于国内谷歌自带翻译停用如何解决&#xff08;额外&#xff09; 会一点的插值表达式&#xff0c;也有限制 模版语法 更接近原生js的写法jsx语法 jsx是react提出的&#xff1b;后很多前端框架…

说说你对slot的理解?slot使用场景有哪些?

vue的slot的理解&#xff1f;slot使用场景有哪些&#xff1f; 定义 在Vue.js中&#xff0c;slot&#xff08;插槽&#xff09;是一种用于组件之间内容分发的机制。它允许你在父组件中编写子组件的内容&#xff0c;从而增加了组件的灵活性和可重用性。 Slot 艺名插槽&#xf…

汇编寄存器之内存访问

1.内存中字的存储: 在CPU中用一个16位寄存器来存储一个字, 高8位存高字节,低8位存低字节 如AX寄存器存在一个字,那么AH存高字节,AL存低字节 在内存中存储字时是用两个连续的字节来存储字的, 这个字的低字节存在低单元,高字节存在高单元. 如下表示: 内存单元编号 单元中…

【微博-计算Cell子控件的frame Objective-C语言】

一、计算Cell子控件的frame 1.来,看一下,刚才我们已经做到把这个模型设置给自定义的cell了吧, 那么,在这个自定义Cell里面呢,我们是不是要开始设置数据了, 设置数据,我们,设置数据,其实很简单,就是把我们这里边的每一个控件,对应的值,从模型里面取出来,给了它,…

【独立版】智慧城市同城V4_2.2.7全开源全插件VUE版,修复房产信息组件商户发布二手房房源信息未和商户关联的问题

源码介绍 【独立版】智慧城市同城V4 查看更多关于 智慧城市同城V4 的文章 _2.2.7全开源全插件VUE版&#xff0c;修复房产信息组件商户发布二手房房源信息未和商户关联的问题&#xff01; 智慧城市同城是一套专注于多城市生活服务同城技术解决方案,全面覆盖同堿信息、商家联盟、…

【Linux】搭建SFTP文件服务器

一、协议介绍1.1 FTP 协议1.11 特点1.12 基本工作原理 1.2 SFTP协议1.21 特点1.22 基本工作原理 1.3 ssh协议1.31 特点1.32 基本工作原理 1.4 其他常见文件传输协议 二、搭建Linux的SFTP文件服务器三、连接测试3.1 电脑连接3.2 手机连接 一、协议介绍 1.1 FTP 协议 1.11 特点…

AI落地:高效学习指南

高效学习中有一个共识&#xff1a;学习最小可用知识&#xff0c;然后立马开始实践&#xff0c;做中学&#xff0c;不断获得反馈&#xff0c;不断在实践中改进。 现实生活中&#xff0c;如果我们想实现这种高效学习&#xff0c;基本上只能找一个老师1对1指导&#xff0c;费用贵…

【开发者指南】如何在MyEclipse中使用 XML编辑器

XML编辑器包括高级XML编辑功能。通过本文&#xff0c;你将了解其编辑功能和网页XML编辑&#xff0c;一起来看看吧~ 1. Web XML编辑器 MyEclipse Web XML编辑器包括高级XML编辑功能&#xff0c;如: 语法高亮显示标签和属性内容辅助实时验证(在您输入时)文档内容的源视图、设计…

160743-62-4,DMG PEG2000,1,2-二肉豆蔻酰-rac-甘油-3-甲氧基聚乙二醇2000

DMG PEG2000&#xff0c;DMG-mPEG2000&#xff0c;1,2-二肉豆蔻酰-rac-甘油-3-甲氧基聚乙二醇2000 Product structure&#xff1a; Product specifications&#xff1a; 1.CAS No&#xff1a;160743-62-4 2.Molecular formula&#xff1a; C34H66O 3.Molecular weight&#xff…

Java内部类(成员内部类、静态嵌套类、方法内部类、匿名内部类)

文章目录 一、内部类的共性二、为什么需要内部类三、静态内部类&#xff08;静态嵌套类&#xff09;四、成员内部类五、局部内部类&#xff08;方法内部类&#xff09;六、匿名内部类 Java 类中不仅可以定义变量和方法&#xff0c;还可以定义类&#xff0c;这样定义在类内部的类…

挂耳式耳机品牌排行榜,看看谁被推荐上榜

下班路上就想放空自己刷会儿视频&#xff0c;但是马路、地铁还有公交上都会有嘈杂的声音影响&#xff0c;如果佩戴入耳式耳机放大声音不仅会过度屏蔽外界&#xff0c;同时还会损伤我们的耳朵&#xff0c;所以新近流行的开放式耳机很好的解决了这些问题&#xff0c;但也有很多小…

【Linux】深入了解冯诺依曼体系结构与操作系统

目录 导读 &#x1f31e;专栏导读 &#x1f31e;冯诺依曼 &#x1f31e;冯诺依曼体系结构 &#x1f31b;木桶效应 &#x1f31e;操作系统(Operator System) &#x1f31b;概念 &#x1f31b;设计OS的目的 &#x1f31b;系统调用和库函数概念 导读 六一儿童节快到了&…

杜绝开源依赖风险,许可证扫描让高效合规「两不误」

目录 开源许可证及其常见类型 开源许可证扫描是软件研发过程中&#xff0c;不可或缺的工具 极狐GitLab 开源许可证扫描的优势与应用 Step 1&#xff1a;启用及设置许可证策略 Step 2&#xff1a;自动创建策略文件存放项目 Step 3&#xff1a;查看许可证合规情况 Step 4&…

实训可视化项目小结 --- 开启Python初始之旅

Python初试感悟 语言之间是相通的&#xff0c;大多数时候&#xff0c;百分之八十的问题&#xff0c;都可以用常用的容器以及内置函数来辅助解决。之前从未认真接触过Python&#xff0c;但此次学校实训要求使用Python做一个可视化&#xff0c;东西不难&#xff0c;我个人负责爬…

Hadoop学习---8、Hadoop数据压缩

1、Hadoop数据压缩 1.1 概述 1、压缩的好处和坏处 &#xff08;1&#xff09;优点&#xff1a;减少磁盘IO、减少磁盘储存空间 &#xff08;2&#xff09;缺点&#xff1a;增加CPU开销 2、压缩原则 &#xff08;1&#xff09;运算密集型的Job&#xff0c;少用压缩 &#xff08…

【2023 · CANN训练营第一季】应用开发(初级)第四章——模型推理

AscendCL运行资源管理 申请运行管理资源时&#xff0c;需按顺序依次申请: Device、Context、Stream&#xff0c;然后根据实际需求调用aclrtGetRunMode接口获取软件栈的运行模型(当同一个应用既支持在Host运行&#xff0c;也支持在Device运行时&#xff0c;在编程时需要就需要根…

Shap-E:3D资产的生成式AI大模型

OpenAI 刚刚发布了 Shap-E&#xff0c;这是一种基于文本提示和图像创建 3D 资产的生成模型&#xff0c;能够生成带纹理的网格和神经辐射场 &#xff0c;从而实现各种 3D 输出。 推荐&#xff1a;用 NSDT设计器 快速搭建可编程3D场景。 在本教程中&#xff0c;我们将引导你在 Go…