【Seata源码学习 】篇五 注册分支事务

【Seata源码学习 】篇五 分支事务注册

1.远程服务调用绑定XID

回到事务模版方法类TransactionalTemplate中

                beginTransaction(txInfo, tx);

                Object rs;
                try {
                    // Do Your Business
                    // 执行执行拦截器链路
                    rs = business.execute();
                } catch (Throwable ex) {
                    // 3. The needed business exception to rollback.
                    // 如果抛出异常,判断异常是否在指定的范围中(默认为Throwable类及其子类)
                    // 执行异常回滚的前后钩子方法
                    // 如果当前事务的角色是 launcher 也就是 TM ,通过TmNettyRemotingClient  向TC发送一个 GlobalRollbackRequest 同步消息
                    // 并记录TC返回的当前事务状态Status
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }

beginTransaction 开启全局事务我们已经在上一篇看过了,最终会将TC返回到XID绑定到RootContext中,并且TC会将全局事务会话持久化,通常我们使用DB,那么将会往seata数据库的global_table中插入一条数据。

接下来就要执行拦截器链路,最后一个拦截器执行完毕后如果无异常就要执行目标方法。调用目标方法通常会使用到第三方的RPC框架,例如 Fegin,RestTemplate等等

在引入的spring-cloud-seata包下,基于SpringbootStater机制,将会自动装配四个配置类,我们就看最常用的SeataFeignClientAutoConfiguration

image-20231123095810260

public class SeataFeignClientAutoConfiguration {
    public SeataFeignClientAutoConfiguration() {
    }

    @Bean
    @Scope("prototype")
    @ConditionalOnClass(
        name = {"com.netflix.hystrix.HystrixCommand"}
    )
    @ConditionalOnProperty(
        name = {"feign.hystrix.enabled"},
        havingValue = "true"
    )
    Feign.Builder feignHystrixBuilder(BeanFactory beanFactory) {
        return SeataHystrixFeignBuilder.builder(beanFactory);
    }

    @Bean
    @Scope("prototype")
    @ConditionalOnClass(
        name = {"com.alibaba.csp.sentinel.SphU"}
    )
    @ConditionalOnProperty(
        name = {"feign.sentinel.enabled"},
        havingValue = "true"
    )
    Feign.Builder feignSentinelBuilder(BeanFactory beanFactory) {
        return SeataSentinelFeignBuilder.builder(beanFactory);
    }

    @Bean
    @ConditionalOnMissingBean
    @Scope("prototype")
    Feign.Builder feignBuilder(BeanFactory beanFactory) {
        return SeataFeignBuilder.builder(beanFactory);
    }
}

实际上无论是否集成hystrix或者sentinel , 最终构建的都是 SeataFeignClient 对象


 static Feign.Builder builder(BeanFactory beanFactory) {
        return SentinelFeign.builder().retryer(Retryer.NEVER_RETRY).client(new SeataFeignClient(beanFactory));
    }

而SeataFeignClient在发送请求时,会将Request进行修改

com.alibaba.cloud.seata.feign.SeataFeignClient#execute

 public Response execute(Request request, Request.Options options) throws IOException {
        //修改请求信息
        Request modifiedRequest = this.getModifyRequest(request);
        return this.delegate.execute(modifiedRequest, options);
    }
    private Request getModifyRequest(Request request) {
    		//获取开启全局事务时绑定到RootContext中到XID
        String xid = RootContext.getXID();
        if (StringUtils.isEmpty(xid)) {
            return request;
        } else {
        		// 放到请求头中
            Map<String, Collection<String>> headers = new HashMap(16);
            headers.putAll(request.headers());
            List<String> seataXid = new ArrayList();
            seataXid.add(xid);
            headers.put("TX_XID", seataXid);
            // 返回添加了 TX_XID 请求头的新Request
            return Request.create(request.method(), request.url(), headers, request.body(), request.charset());
        }
    }

这里需要注意一点,在发起RPC调用时,需要保证跟开启全局事务的方法在同一个线程中,因为RootContext是将XID存储在ThreadLocal中,每个Thread都绑定了自己的ThreadLocal.ThreadLocalMap,而ThreadLocal.get就是从当前线程中获取ThreadLocalMap,再去获取存储在线程中的变量。

2.数据源代理

image-20231123104326151

public class SeataDataSourceAutoConfiguration {

    /**
     * The bean seataAutoDataSourceProxyCreator.
     */
    @Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
    @ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
    public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {
    		// spring上下文中添加SeataAutoDataSourceProxyCreator
        return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(),
            seataProperties.getExcludesForAutoProxying(), seataProperties.getDataSourceProxyMode());
    }
}

SeataAutoDataSourceProxyCreator 继承了 AbstractAutoProxyCreator

    private Object[] buildAdvisors(String dataSourceProxyMode) {
    		// 使用 SeataAutoDataSourceProxyAdvice 对目标方法进行拦截
        Advice advice = new SeataAutoDataSourceProxyAdvice(dataSourceProxyMode);
        return new Object[]{new DefaultIntroductionAdvisor(advice)};
    }

    @Override
    protected Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName, TargetSource customTargetSource) {
        return advisors;
    }

    @Override
    protected boolean shouldSkip(Class<?> beanClass, String beanName) {
    //排除指定列表的bean
        if (excludes.contains(beanClass.getName())) {
            return true;
        }
        //对数据源对象进行拦截
        return SeataProxy.class.isAssignableFrom(beanClass);
    }
    
    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        // we only care DataSource bean
        if (!(bean instanceof DataSource)) {
            return bean;
        }

        // when this bean is just a simple DataSource, not SeataDataSourceProxy
        if (!(bean instanceof SeataDataSourceProxy)) {
            Object enhancer = super.wrapIfNecessary(bean, beanName, cacheKey);
            // this mean this bean is either excluded by user or had been proxy before
            if (bean == enhancer) {
                return bean;
            }
            // else, build proxy,  put <origin, proxy> to holder and return enhancer
            DataSource origin = (DataSource) bean;
            // 创建代理类 AT模式为 DataSourceProxy  XA模式为 DataSourceProxyXA
            SeataDataSourceProxy proxy = buildProxy(origin, dataSourceProxyMode);
            // 将 DataSource 与  SeataDataSourceProxy 映射缓存
            DataSourceProxyHolder.put(origin, proxy);
            return enhancer;
        }

io.seata.spring.annotation.datasource.SeataAutoDataSourceProxyAdvice#invoke

    public Object invoke(MethodInvocation invocation) throws Throwable {
        // check whether current context is expected
        if (!inExpectedContext()) {
            return invocation.proceed();
        }

        Method method = invocation.getMethod();
        String name = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();

        Method declared;
        try {
            declared = DataSource.class.getDeclaredMethod(name, parameterTypes);
        } catch (NoSuchMethodException e) {
            // mean this method is not declared by DataSource
            return invocation.proceed();
        }

        // switch invoke instance to its proxy
        DataSource origin = (DataSource) invocation.getThis();
        SeataDataSourceProxy proxy = DataSourceProxyHolder.get(origin);
        Object[] args = invocation.getArguments();
        // 方法由 SeataDataSourceProxy 代理类执行
        return declared.invoke(proxy, args);
    }

获取Connection对象

假设当前为seata的全局事务模式为AT,当原始数据源调用getConnection方法获取连接时,实际上调用的是DataSourceProxy.getConnection方法,拿到的是ConnectionProxy对象

    @Override
    public ConnectionProxy getConnection() throws SQLException {
        Connection targetConnection = targetDataSource.getConnection();
        return new ConnectionProxy(this, targetConnection);
    }

    @Override
    public ConnectionProxy getConnection(String username, String password) throws SQLException {
        Connection targetConnection = targetDataSource.getConnection(username, password);
        return new ConnectionProxy(this, targetConnection);
    }

获取Statement对象

AbstractConnectionProxy

 public Statement createStatement() throws SQLException {
        Statement targetStatement = getTargetConnection().createStatement();
        return new StatementProxy(this, targetStatement);
    }
    
    
    
    @Override
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        String dbType = getDbType();
        // support oracle 10.2+
        PreparedStatement targetPreparedStatement = null;
        if (BranchType.AT == RootContext.getBranchType()) {
            List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);
            if (sqlRecognizers != null && sqlRecognizers.size() == 1) {
                SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {
                    TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),
                            sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());
                    String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];
                    tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);
                    targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray);
                }
            }
        }
        if (targetPreparedStatement == null) {
            targetPreparedStatement = getTargetConnection().prepareStatement(sql);
        }
        return new PreparedStatementProxy(this, targetPreparedStatement, sql);
    }

而获取的Statement 与 PreparedStatement 分别是 StatementProxy 与 PreparedStatementProxy,而两者都是由ExecuteTemplate来执行sql语句,假设RM此时要执行的业务是更新操作

   public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                     StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {
        // 如果当前即不需要全局锁 并且事务类型不为AT模式,那么直接执行原生SQL
        if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
            // Just work as original statement
            return statementCallback.execute(statementProxy.getTargetStatement(), args);
        }
        // 获取数据源类型
        String dbType = statementProxy.getConnectionProxy().getDbType();
        if (CollectionUtils.isEmpty(sqlRecognizers)) {
            sqlRecognizers = SQLVisitorFactory.get(
                    statementProxy.getTargetSQL(),
                    dbType);
        }
        Executor<T> executor;
        if (CollectionUtils.isEmpty(sqlRecognizers)) {
            executor = new PlainExecutor<>(statementProxy, statementCallback);
        } else {
            if (sqlRecognizers.size() == 1) {
                SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                // 区分当前执行的SQL语句类型
                switch (sqlRecognizer.getSQLType()) {
                    case INSERT:
                        executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                                    new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                                    new Object[]{statementProxy, statementCallback, sqlRecognizer});
                        break;
                    // 假设当前执行的是UPDATAE语句
                    case UPDATE:
                        executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case DELETE:
                        executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case SELECT_FOR_UPDATE:
                        executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case INSERT_ON_DUPLICATE_UPDATE:
                        switch (dbType) {
                            case JdbcConstants.MYSQL:
                            case JdbcConstants.MARIADB:
                                executor =
                                    new MySQLInsertOrUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
                                break;
                            default:
                                throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");
                        }
                        break;
                    default:
                        executor = new PlainExecutor<>(statementProxy, statementCallback);
                        break;
                }
            } else {
                executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
            }
        }
        T rs;
        try {
            // 执行器执行SQL语句
            rs = executor.execute(args);
        } catch (Throwable ex) {
            if (!(ex instanceof SQLException)) {
                // Turn other exception into SQLException
                ex = new SQLException(ex);
            }
            throw (SQLException) ex;
        }
        return rs;
    }

UPDATE语句使用的是UpdateExecutor执行器,而UpdateExecutor的类关系图如下

image-20231126101103175

由父类BaseTransactionalExecutor 调用execute方法,将RootContext中的XID绑定到StatementProxy中,并标识是否需要全局锁

   public T execute(Object... args) throws Throwable {
        String xid = RootContext.getXID();
        if (xid != null) {
            statementProxy.getConnectionProxy().bind(xid);
        }

        statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
        return doExecute(args);
    }

关闭自动提交事务并执行sql

接着调用子类AbstractDMLBaseExecutor的doExecute方法,主要是关闭自动提交,然后执行 executeAutoCommitFalse 方法

    public T doExecute(Object... args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        // connectionProxy.getAutoCommit() 会判断原始数据源对象是否是自动提交事务 (JDBC默认提交事务)
        // 但在seata中 即使原始数据源是自动化提交,在 executeAutoCommitTrue 方法中也会关闭自动提交事务
        if (connectionProxy.getAutoCommit()) {
            // 将自动提交设置为false,再调用executeAutoCommitFalse
            return executeAutoCommitTrue(args);
        } else {
            return executeAutoCommitFalse(args);
        }
    }

io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#executeAutoCommitFalse

    protected T executeAutoCommitFalse(Object[] args) throws Exception {
        // 要求当前数据源是MYSQL, 且不能是多主键
        if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
            throw new NotSupportYetException("multi pk only support mysql!");
        }
        // 封装前镜像数据 TableRecords 记录了每一个字段对应的值,且获取的是当前最新的数据
        TableRecords beforeImage = beforeImage();
        // 执行sql语句
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        int updateCount = statementProxy.getUpdateCount();
        //修改成功
        if (updateCount > 0) {
            //封装后镜像数据
            TableRecords afterImage = afterImage(beforeImage);
            //将前后镜像打包成回滚日志 其中包括全局锁信息
            prepareUndoLog(beforeImage, afterImage);
        }
        return result;
    }

博主使用的seata版本是1.5.2,此版本不支持多主键.

为了保证前镜像获取的数据是最新的,创建SQL查询语句时会拼接 FOR UPDATE ,使用当前读的方式。成功修改数据后,会将修改后的数据也构建成镜像数据,最后由前后镜像一起组成回滚日志。

提交事务

由于我们获取的数据源连接对象时ConnectionProxy,因此提交事务时也时此对象进行提交

io.seata.rm.datasource.ConnectionProxy#commit

    public void commit() throws SQLException {
        try {
            //由lockRetryPolicy手动提交事务 支持全局锁获取失败重试机制
            lockRetryPolicy.execute(() -> {
                doCommit();
                return null;
            });
        } catch (SQLException e) {
            if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
                // 如果出现SQLException 异常 回滚本地事务,向TC报告,并移除RootContext中的全局事务XID
                rollback();
            }
            throw e;
        } catch (Exception e) {
            throw new SQLException(e);
        }
    }

io.seata.rm.datasource.ConnectionProxy#doCommit

    private void doCommit() throws SQLException {
        //全局事务XID不为空
        if (context.inGlobalTransaction()) {
            processGlobalTransactionCommit();
        //要求全局锁
        } else if (context.isGlobalLockRequire()) {
            processLocalCommitWithGlobalLocks();
        } else {
            //都不是直接提交
            targetConnection.commit();
        }
    }

io.seata.rm.datasource.ConnectionProxy#processGlobalTransactionCommit

    public void commit() throws SQLException {
        try {
            //由lockRetryPolicy手动提交事务 支持全局锁获取失败重试机制
            // 如果是获取全局锁失败,将会重试
            lockRetryPolicy.execute(() -> {
                doCommit();
                return null;
            });
        } catch (SQLException e) {
            if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
                // 如果出现SQLException 异常 回滚本地事务,向TC报告,并移除RootContext中的全局事务XID
                rollback();
            }
            throw e;
        } catch (Exception e) {
            throw new SQLException(e);
        }
    }

由lockRetryPolicy手动提交事务,而如果出现全局锁冲突,将会在线程休眠后重试

io.seata.rm.datasource.ConnectionProxy.LockRetryPolicy#execute

        protected <T> T doRetryOnLockConflict(Callable<T> callable) throws Exception {
            LockRetryController lockRetryController = new LockRetryController();
            while (true) {
                try {
                    return callable.call();
                } catch (LockConflictException lockConflict) {
                    //先回滚本地事务
                    onException(lockConflict);
                    // AbstractDMLBaseExecutor#executeAutoCommitTrue the local lock is released
                    if (connection.getContext().isAutoCommitChanged()
                        && lockConflict.getCode() == TransactionExceptionCode.LockKeyConflictFailFast) {
                        lockConflict.setCode(TransactionExceptionCode.LockKeyConflict);
                    }
                    //休眠10秒收重试
                    lockRetryController.sleep(lockConflict);
                } catch (Exception e) {
                    onException(e);
                    throw e;
                }
            }
        }

io.seata.rm.datasource.ConnectionProxy#processGlobalTransactionCommit

private void processGlobalTransactionCommit() throws SQLException {
        try {
            //在回滚日志已经准备好的情况下 向TC注册分支事务,并将分支事务ID保存到ConnectionContext中
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
        try {
            // 将回滚日志刷新到数据库中 等待和本地事务一起提交
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            //本地事务提交
            targetConnection.commit();
        } catch (Throwable ex) {
            LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
            report(false);
            throw new SQLException(ex);
        }
        if (IS_REPORT_SUCCESS_ENABLE) {
            report(true);
        }
        context.reset();
    }

Undo回滚日志将和本地事务一起提交,但在提交前,需要先注册分支事务ID

io.seata.rm.AbstractResourceManager#branchRegister

    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
        try {
            BranchRegisterRequest request = new BranchRegisterRequest();
            request.setXid(xid);
            request.setLockKey(lockKeys);
            request.setResourceId(resourceId);
            request.setBranchType(branchType);
            request.setApplicationData(applicationData);

            BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
            if (response.getResultCode() == ResultCode.Failed) {
                throw new RmTransactionException(response.getTransactionExceptionCode(), String.format("Response[ %s ]", response.getMsg()));
            }
            return response.getBranchId();
        } catch (TimeoutException toe) {
            throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);
        } catch (RuntimeException rex) {
            throw new RmTransactionException(TransactionExceptionCode.BranchRegisterFailed, "Runtime", rex);
        }
    }

注册分支事务需要将当前全局事务的XID,全局锁,资源信息,分支事务类型以及应用数据,一起构建成BranchRegisterRequest消息发送给TC,具体如下数据所示

{
  "branchType": "AT",
  "lockKey": "product_table:1",
  "resourceId": "jdbc:mysql://localhost:3306/seata-test",
  "typeCode": 11,
  "xid": "xxx.xx.xx.xxx:8091:1378562425205329921"
}

3.TC处理 BranchRegisterRequest 消息

TC在处理 BranchRegisterRequest消息 依然也是 ServerOnRequestProcessor,最终交由AbstractTCInboundHandle进行处理

io.seata.server.AbstractTCInboundHandler#handle(io.seata.core.protocol.transaction.BranchRegisterRequest, io.seata.core.rpc.RpcContext)

 public BranchRegisterResponse handle(BranchRegisterRequest request, final RpcContext rpcContext) {
        BranchRegisterResponse response = new BranchRegisterResponse();
        exceptionHandleTemplate(new AbstractCallback<BranchRegisterRequest, BranchRegisterResponse>() {
            @Override
            public void execute(BranchRegisterRequest request, BranchRegisterResponse response)
                throws TransactionException {
                try {
                    //真正处理分支注册
                    doBranchRegister(request, response, rpcContext);
                } catch (StoreException e) {
                    throw new TransactionException(TransactionExceptionCode.FailedStore, String
                        .format("branch register request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()), e);
                }
            }
        }, request, response);
        return response;
    }

io.seata.server.coordinator.DefaultCoordinator#doBranchRegister

 @Override
    protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response,
                                    RpcContext rpcContext) throws TransactionException {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        // 获取分支ID
        response.setBranchId(
                core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(),
                        request.getXid(), request.getApplicationData(), request.getLockKey()));
    }

io.seata.server.coordinator.DefaultCore#branchRegister

 @Override
    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
                               String applicationData, String lockKeys) throws TransactionException {
        // 当前事务类型为AT模式,返回 ATCore 在调用 branchRegister 方法
        return getCore(branchType).branchRegister(branchType, resourceId, clientId, xid,
            applicationData, lockKeys);
    }

io.seata.server.coordinator.AbstractCore#branchRegister

 public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
                               String applicationData, String lockKeys) throws TransactionException {
        GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
        return SessionHolder.lockAndExecute(globalSession, () -> {
            // 全局事务当前必须是激活且是beging状态 否则抛出异常
            globalSessionStatusCheck(globalSession);
            globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
            //注册分支事务回话(将分支事务信息绑定上全局事务XID)
            BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
                    applicationData, lockKeys, clientId);
            MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));
            //加全局锁
            branchSessionLock(globalSession, branchSession);
            try {
                //将分支事务会话添加到全局事务会话到集合中,并向监听器添加onAddBranch事件,将分支事务会话持久到数据库中
                globalSession.addBranch(branchSession);
            } catch (RuntimeException ex) {
                //如果出现异常,从数据库中删除分支事务信息
                branchSessionUnlock(branchSession);
                throw new BranchTransactionException(FailedToAddBranch, String
                        .format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),
                                branchSession.getBranchId()), ex);
            }
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}",
                        globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys);
            }
            return branchSession.getBranchId();
        });
    }

首先会判断当前全局事务的状态,接着创建一个新的BranchSession对象,生成branchId,并绑定上XID;接着就是尝试加全局锁,加锁失败会抛出异常;然后再将分支事务会话添加到全局事务会话到集合中,并向监听器添加onAddBranch事件,将分支事务会话持久到数据库中,并返回分支事务ID。如果此过程出现异常,则会删除分支事务信息。

判断全局事务状态

            // 全局事务当前必须是激活且是beging状态 否则抛出异常
            globalSessionStatusCheck(globalSession);

io.seata.server.coordinator.AbstractCore#globalSessionStatusCheck

protected void globalSessionStatusCheck(GlobalSession globalSession) throws GlobalTransactionException {
        //全局事务必须是激活状态
        if (!globalSession.isActive()) {
            throw new GlobalTransactionException(GlobalTransactionNotActive, String.format(
                "Could not register branch into global session xid = %s status = %s, cause by globalSession not active",
                globalSession.getXid(), globalSession.getStatus()));
        }
        //全局事务必须是begin状态
        if (globalSession.getStatus() != GlobalStatus.Begin) {
            throw new GlobalTransactionException(GlobalTransactionStatusInvalid, String
                    .format("Could not register branch into global session xid = %s status = %s while expecting %s",
                            globalSession.getXid(), globalSession.getStatus(), GlobalStatus.Begin));
        }
    }

全局事务当前必须是激活且是beging状态 否则抛出异常

注册分支事务

BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
                    applicationData, lockKeys, clientId);

io.seata.server.session.SessionHelper#newBranchByGlobal(io.seata.server.session.GlobalSession, io.seata.core.model.BranchType, java.lang.String, java.lang.String, java.lang.String, java.lang.String)

 public static BranchSession newBranchByGlobal(GlobalSession globalSession, BranchType branchType, String resourceId,
            String applicationData, String lockKeys, String clientId) {
        //创建新的BranchSession对象
        BranchSession branchSession = new BranchSession();
        //分别绑定XID,事务ID,新生成的分支事务id 并设置事务类型与资源信息,锁信息,RM客户端信息(服务名和id)
        branchSession.setXid(globalSession.getXid());
        branchSession.setTransactionId(globalSession.getTransactionId());
        branchSession.setBranchId(UUIDGenerator.generateUUID());
        branchSession.setBranchType(branchType);
        branchSession.setResourceId(resourceId);
        branchSession.setLockKey(lockKeys);
        branchSession.setClientId(clientId);
        branchSession.setApplicationData(applicationData);

        return branchSession;
    }

最终的分支信息如图所示

image-20231126172153705

添加全局锁

//加全局锁
            branchSessionLock(globalSession, branchSession);

io.seata.server.transaction.at.ATCore#branchSessionLock

 @Override
    protected void branchSessionLock(GlobalSession globalSession, BranchSession branchSession)
        throws TransactionException {
        String applicationData = branchSession.getApplicationData();
        boolean autoCommit = true;
        boolean skipCheckLock = false;
        //applicationData为空 往下走
        if (StringUtils.isNotBlank(applicationData)) {
            if (objectMapper == null) {
                objectMapper = new ObjectMapper();
            }
            try {
                Map<String, Object> data = objectMapper.readValue(applicationData, HashMap.class);
                Object clientAutoCommit = data.get(AUTO_COMMIT);
                if (clientAutoCommit != null && !(boolean)clientAutoCommit) {
                    autoCommit = (boolean)clientAutoCommit;
                }
                Object clientSkipCheckLock = data.get(SKIP_CHECK_LOCK);
                if (clientSkipCheckLock instanceof Boolean) {
                    skipCheckLock = (boolean)clientSkipCheckLock;
                }
            } catch (IOException e) {
                LOGGER.error("failed to get application data: {}", e.getMessage(), e);
            }
        }
        try {
        		//加全局锁
            if (!branchSession.lock(autoCommit, skipCheckLock)) {
                throw new BranchTransactionException(LockKeyConflict,
                    String.format("Global lock acquire failed xid = %s branchId = %s", globalSession.getXid(),
                        branchSession.getBranchId()));
            }
        } catch (StoreException e) {
            if (e.getCause() instanceof BranchTransactionException) {
                throw new BranchTransactionException(((BranchTransactionException)e.getCause()).getCode(),
                    String.format("Global lock acquire failed xid = %s branchId = %s", globalSession.getXid(),
                        branchSession.getBranchId()));
            }
            throw e;
        }
    }

io.seata.server.session.BranchSession#lock(boolean, boolean)

public boolean lock(boolean autoCommit, boolean skipCheckLock) throws TransactionException {
        if (this.getBranchType().equals(BranchType.AT)) {
            //获取锁资源管理器执行加速操作
            return LockerManagerFactory.getLockManager().acquireLock(this, autoCommit, skipCheckLock);
        }
        return true;
    }

无论是哪种锁资源管理器,其调用acquireLock都是走的父类AbstractLockManager的方法

io.seata.server.lock.AbstractLockManager#acquireLock(io.seata.server.session.BranchSession, boolean, boolean)

 @Override
    public boolean acquireLock(BranchSession branchSession, boolean autoCommit, boolean skipCheckLock) throws TransactionException {
        if (branchSession == null) {
            throw new IllegalArgumentException("branchSession can't be null for memory/file locker.");
        }
        // lockKey通常是由 表名 + 主键组成
        String lockKey = branchSession.getLockKey();
        if (StringUtils.isNullOrEmpty(lockKey)) {
            // no lock
            return true;
        }
        // get locks of branch
        // 使用 ; 分割lockKey 获取RowLock集合,RowLock封装了 XID,分支事务ID,表民,主键等信息
        List<RowLock> locks = collectRowLocks(branchSession);
        if (CollectionUtils.isEmpty(locks)) {
            // no lock
            return true;
        }
        // 获取枷锁操作对象,调用acquireLock方法
        return getLocker(branchSession).acquireLock(locks, autoCommit, skipCheckLock);
    }

我们就看 DataBaseLocker 是怎么加锁的

    public boolean acquireLock(List<RowLock> locks, boolean autoCommit, boolean skipCheckLock) {
        // 首先RowLock集合对象不能为空
        if (CollectionUtils.isEmpty(locks)) {
            // no lock
            return true;
        }
        try {
            return lockStore.acquireLock(convertToLockDO(locks), autoCommit, skipCheckLock);
        } catch (StoreException e) {
            throw e;
        } catch (Exception t) {
            LOGGER.error("AcquireLock error, locks:{}", CollectionUtils.toString(locks), t);
            return false;
        }
    }

io.seata.server.storage.db.lock.LockStoreDataBaseDAO#acquireLock(java.util.List<io.seata.core.store.LockDO>, boolean, boolean)

 public boolean acquireLock(List<LockDO> lockDOs, boolean autoCommit, boolean skipCheckLock) {
        Connection conn = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        Set<String> dbExistedRowKeys = new HashSet<>();
        boolean originalAutoCommit = true;
        if (lockDOs.size() > 1) {
            lockDOs = lockDOs.stream().filter(LambdaUtils.distinctByKey(LockDO::getRowKey)).collect(Collectors.toList());
        }
        try {
            conn = lockStoreDataSource.getConnection();
            if (originalAutoCommit = conn.getAutoCommit()) {
                conn.setAutoCommit(false);
            }
            List<LockDO> unrepeatedLockDOs = lockDOs;

            //check lock
            if (!skipCheckLock) {

                boolean canLock = true;
                //query
                // select * from lock_table where row_key in ( ? ) order by status desc
                // RowKey 由数据库资源信息 + 表名 + 主键组成
                String checkLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getCheckLockableSql(lockTable, lockDOs.size());
                ps = conn.prepareStatement(checkLockSQL);
                for (int i = 0; i < lockDOs.size(); i++) {
                    ps.setString(i + 1, lockDOs.get(i).getRowKey());
                }
                rs = ps.executeQuery();
                String currentXID = lockDOs.get(0).getXid();
                boolean failFast = false;
                // 如果在lock_tabel中根据  RowKey 找到了记录,需要判断当前xid是否与数据库中的xid是否一致
                // 如果不一致 加速失败
                while (rs.next()) {
                    String dbXID = rs.getString(ServerTableColumnsName.LOCK_TABLE_XID);
                    if (!StringUtils.equals(dbXID, currentXID)) {
                        if (LOGGER.isInfoEnabled()) {
                            String dbPk = rs.getString(ServerTableColumnsName.LOCK_TABLE_PK);
                            String dbTableName = rs.getString(ServerTableColumnsName.LOCK_TABLE_TABLE_NAME);
                            long dbBranchId = rs.getLong(ServerTableColumnsName.LOCK_TABLE_BRANCH_ID);
                            LOGGER.info("Global lock on [{}:{}] is holding by xid {} branchId {}", dbTableName, dbPk, dbXID, dbBranchId);
                        }
                        if (!autoCommit) {
                            int status = rs.getInt(ServerTableColumnsName.LOCK_TABLE_STATUS);
                            if (status == LockStatus.Rollbacking.getCode()) {
                                failFast = true;
                            }
                        }
                        canLock = false;
                        break;
                    }
                    // 将已经存在的全局锁缓冲起来
                    dbExistedRowKeys.add(rs.getString(ServerTableColumnsName.LOCK_TABLE_ROW_KEY));
                }
                if (!canLock) {
                    // 加锁失败 回滚保存分支事务信息的sql
                    conn.rollback();
                    if (failFast) {
                        throw new StoreException(new BranchTransactionException(LockKeyConflictFailFast));
                    }
                    return false;
                }
                // If the lock has been exists in db, remove it from the lockDOs
                if (CollectionUtils.isNotEmpty(dbExistedRowKeys)) {
                    unrepeatedLockDOs = lockDOs.stream().filter(lockDO -> !dbExistedRowKeys.contains(lockDO.getRowKey()))
                            .collect(Collectors.toList());
                }
                if (CollectionUtils.isEmpty(unrepeatedLockDOs)) {
                    conn.rollback();
                    return true;
                }
            }

            // lock
            if (unrepeatedLockDOs.size() == 1) {
                LockDO lockDO = unrepeatedLockDOs.get(0);
                // 生成sql语句,将全局锁信息插入到lock_table表中
                if (!doAcquireLock(conn, lockDO)) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("Global lock acquire failed, xid {} branchId {} pk {}", lockDO.getXid(), lockDO.getBranchId(), lockDO.getPk());
                    }
                    conn.rollback();
                    return false;
                }
            } else {
                if (!doAcquireLocks(conn, unrepeatedLockDOs)) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("Global lock batch acquire failed, xid {} branchId {} pks {}", unrepeatedLockDOs.get(0).getXid(),
                            unrepeatedLockDOs.get(0).getBranchId(), unrepeatedLockDOs.stream().map(lockDO -> lockDO.getPk()).collect(Collectors.toList()));
                    }
                    conn.rollback();
                    return false;
                }
            }
            conn.commit();
            return true;
        } catch (SQLException e) {
            throw new StoreException(e);
        } finally {
            IOUtil.close(rs, ps);
            if (conn != null) {
                try {
                    if (originalAutoCommit) {
                        conn.setAutoCommit(true);
                    }
                    conn.close();
                } catch (SQLException e) {
                }
            }
        }
    }

注册全局锁之前会根据RowKey查询全局锁信息是否已经存在,生成的查询语句如下

select * from lock_table where row_key in ( ? ) order by status desc

RowKey如下所示

jdbc:mysql://localhost:3306/seata-test^^^product_table^^^1

如果locktable表中已经有数据了,则比较数据中XID与当前当前的xid是否一致,如果不一致说明已经被其他全局事务加了锁,则标记当前加锁失败,并抛出异常;如果一致,则将当前的RowKey缓冲到set集合中。最后调用LockStoreDataBaseDAO.doAcquireLock方法将锁信息保存到数据库lock_table表中。

持久化分支信息

成功添加上全局锁后,就要将分支事务会话添加到全局事务会话到集合中,并向监听器添加onAddBranch事件,将分支事务会话持久到数据库中

globalSession.addBranch(branchSession);

io.seata.server.session.GlobalSession#addBranch

    public void addBranch(BranchSession branchSession) throws TransactionException {
        for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
            // 分支事务持久化
            lifecycleListener.onAddBranch(this, branchSession);
        }
        // 将分支事务标记为已注册状态
        branchSession.setStatus(BranchStatus.Registered);
        // 将分支事务添加到GlobalSession到List集合中
        add(branchSession);
    }

返回分支事务ID

 return branchSession.getBranchId();

最后携带刚生成的分支事务ID返回BranchRegisterResponse消息给到RM

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/200812.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【java】图书管理系统

完整代码链接&#xff1a;https://gitee.com/zeng-xuehui/Java_repository/tree/master/test_11_27_1/src我们在写这个系统时&#xff0c;首先需要搭建框架&#xff0c;再实现业务逻辑&#xff1b;图书管理系统是用户通过各种功能对图书进行操作的一个系统&#xff1b;我们需要…

Google play开发者账号付款资料暂停的原因及解决方案

相信大多数Google play开发者都收到过这封邮件 邮件内容的大致意思是“由于可疑的活动&#xff0c;我们暂停了你的付款资料。” “要恢复您的帐户&#xff0c;请转到您的帐户并执行所要求的操作。” 这是触发了谷歌的付款风控机制&#xff0c;根据开发者们的反馈&#xff0c;账…

Making Reconstruction-based Method Great Again for Video Anomaly Detection

Making Reconstruction-based Method Great Again for Video Anomaly Detection 文章信息&#xff1a; 发表于ICDM 2022&#xff08;CCF B会议&#xff09; 原文地址&#xff1a;https://arxiv.org/abs/2301.12048 代码地址&#xff1a;https://github.com/wyzjack/MRMGA4VAD…

深入理解 Vue 中的指针操作(二)

文章目录 ☘️引言☘️基本用法&#x1f342;v-for指令&#x1f342;v-model指令&#x1f331;v-model适用表单控件 ☘️结论 ☘️引言 Vue.js 是一款非常流行且功能强大的前端框架&#xff0c;它以其响应式的数据绑定和组件化的开发方式赢得了众多开发者的喜爱。而在 Vue.js …

泗博Modbus转Profinet网关TS-180对水表流量的监控应用

应用场景&#xff1a; 陕西某工程技术有限公司在一民生工程项目中&#xff0c;需要对公园直饮水进行净化保证其水质。直饮水净化装置需根据用水量不定期的维护&#xff0c;通过统计各个净化装置净化的直饮水的流量&#xff0c;来实现提前维护目的。 应用痛点&#xff1a; 项目…

二叉树(判断是否为单值二叉树)

题目&#xff08;力扣&#xff09;&#xff1a; 判断二叉树上每个节点的值是否相同&#xff0c;就需要让root节点分别与左节点和右节点分别比较是否相同。 注意&#xff1a;root等于空时&#xff0c;直接可以返回true&#xff1b; 首先&#xff0c;先判断他的特殊情况&#x…

C++ day38 动态规划 斐波那契数列 爬楼梯 使用最小花费爬楼梯

题目1&#xff1a;509 斐波那契数列 题目链接&#xff1a;斐波那契数列 对题目的理解 斐波那契数列由0和1开始&#xff0c;后面每一项数字(F(n))都是前两个数字的和&#xff0c;给一个n&#xff0c;计算F(n)&#xff0c;&#xff08;0<n<30&#xff09; 动规五部曲 …

“Install Js dependencies failed“JS SDK安装失败【Bug已解决-鸿蒙开发】

文章目录 项目场景:问题描述原因分析:解决方案:解决措施1解决方案2:其他解决方案解决方案3:此Bug解决方案总结项目场景: 在下载JS SDK时,出现下载失败的情况,并显示“Install Js dependencies failed”。 在使用版本为DevEco Studio 3.0.0.601 Beta1进行低代码开发时…

Echarts-使用渐变色填充

垂直方向的渐变 color: {type: linear,// x0,y1,柱子的颜色在垂直方向渐变x: 0,y: 1,colorStops: [// 0%处的颜色{offset: 0,color: #12c2e9,},// 50%处的颜色{offset: 0.5,color: #c471ed,},// 100%处的颜色{offset: 1,color: #f64f59,},],global: false // 缺省为 false} 水…

力扣347. 前 K 个高频元素(java,最小堆,快速排序法)

Problem: 347. 前 K 个高频元素 文章目录 前言题目描述思路解题方法复杂度Code 前言 对于求取Top K一般有如下两种题型&#xff1a; 1.针对静态数据&#xff08;查询TopK操作&#xff09; 2.针对动态数据&#xff08;包括添加数据操作和查询TOPK操作&#xff09; 一般解决思路…

第22章 NIO编程

在本章中需要掌握NIO中的缓冲区的作用&#xff0c;并理解缓冲区中的数据处理模型&#xff0c;掌握Channel的作用&#xff0c;并结合缓冲区实现数据I/O操作&#xff0c;理解文件锁的作用&#xff0c;并且掌握字符编码处理支持类的使用&#xff0c;掌握Reactor设计模型&#xff0…

Electron+Ts+Vue+Vite桌面应用系列:sqlite增删改查操作篇

文章目录 1️⃣ sqlite应用1.1 sqlite数据结构1.2 初始化数据库1.3 初始化实体类1.4 操作数据类1.5 页面调用 优质资源分享 作者&#xff1a;xcLeigh 文章地址&#xff1a;https://blog.csdn.net/weixin_43151418 ElectronTsVueVite桌面应用系列 &#xff1a;这个系列包括了从桌…

【设计模式-2.2】创建型——简单工厂和工厂模式

说明&#xff1a;本文介绍设计模式中&#xff0c;创建型设计模式中的工厂模式&#xff1b; 飞机大战 创建型设计模式&#xff0c;关注于对象的创建&#xff0c;本文介绍的简单工厂和工厂模式同样也是。举一个游戏例子&#xff0c;如飞机大战游戏中&#xff0c;屏幕中敌人类型…

【开题报告】海洋多源数据质量控制应用服务的WebServer设计与实现

开 题 报 告 内 容 论文选题的意义、主要研究内容和文献资料调研情况 一、选题意义 在当今世界研究自然环境的大背景下&#xff0c;计算机技术与各学科、各领域的综合应用逐渐增多。作为地球上最广阔的水体&#xff0c;同时也是地球上决定气候发展的主要的因素之一&#xff0…

Unity学习笔记11

一、视频播放功能 1.如何让视频在游戏场景中播放&#xff1f; 在Assets目录下添加一个渲染器纹理&#xff0c;步骤&#xff1a;新建→渲染器纹理 首先在创建一个平面&#xff0c;想让视频在平面上显示。在平面上添加一个组件 Video Player 然后将视频文件拖拽到视频剪辑位置上…

自动化测试 —— requests和selenium模块!

一、requests基于POST请求 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 #1.requests的GET与POST用法的区别&#xff1a; GET请求: (HTTP默认的请求方法就是GET) * 没有请求体 * 数据必须在1K之内&#xff01; * GET请求数据会暴露在浏览器…

【MyBatis】MyBatis操作数据库

目录 一&#xff0c;准备工作 1.1 创建工程 1.2 准备数据 1.3 数据库连接字符串 1.4 创建持久层接口UserInfoMapper 1.5 单元测试 二&#xff0c;注解的基础操作 2.1 打印日志 2.2 参数传递 2.3 增&#xff08;Insert&#xff09; 2.4 删&#xff08;Delete&#x…

pytest分布式执行(pytest-xdist)

前言 平常我们手工测试用例非常多时&#xff0c;比如有1千条用例&#xff0c;假设每个用例执行需要1分钟。如果一个测试人员执行需要1000分钟才能执行完&#xff0c;当项目非常紧急的时候&#xff0c;我们会用测试人力成本换取时间成本&#xff0c;这个时候多找个小伙伴把任务…

如何使用群晖Synology Office结合内网穿透实现多人远程编辑文件协同办公

使用群晖Synology Office提升生产力&#xff1a;多人同时编辑一个文件 文章目录 使用群晖Synology Office提升生产力&#xff1a;多人同时编辑一个文件本教程解决的问题是&#xff1a;1. 本地环境配置2. 制作本地分享链接3. 制作公网访问链接4. 公网ip地址访问您的分享相册5. 制…

react-route-dom 实现简单的嵌套路由

最终效果 点击 to test1 点击to test2 > to test21 点击to test2 > to test22 代码如下 path: "page",element: <父组件 />,children: [{ path: "test1", element: <Test1 /> },{path: "test2",element: <Test2 />…