【Seata源码学习 】篇四 TM事务管理器是如何开启全局事务

TM发送 单个或批量 消息

以发送GlobalBeginRequest消息为例

TM在执行拦截器链路前将向TC发送GlobalBeginRequest 消息

io.seata.tm.api.DefaultGlobalTransaction#begin(int, java.lang.String)

   @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
        GlobalBeginRequest request = new GlobalBeginRequest();
        request.setTransactionName(name);
        request.setTimeout(timeout);
        GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
        if (response.getResultCode() == ResultCode.Failed) {
            throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
        }
        return response.getXid();
    }

注意 消息TYPE_CODE 为 MessageType.TYPE_GLOBAL_BEGIN 值为 1

package io.seata.core.protocol.transaction;

import io.seata.core.protocol.MessageType;
import io.seata.core.rpc.RpcContext;

/**
 * The type Global begin request.
 *
 * @author slievrly
 */
public class GlobalBeginRequest extends AbstractTransactionRequestToTC {

    private int timeout = 60000;

    private String transactionName;

    /**
     * Gets timeout.
     *
     * @return the timeout
     */
    public int getTimeout() {
        return timeout;
    }

    /**
     * Sets timeout.
     *
     * @param timeout the timeout
     */
    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    /**
     * Gets transaction name.
     *
     * @return the transaction name
     */
    public String getTransactionName() {
        return transactionName;
    }

    /**
     * Sets transaction name.
     *
     * @param transactionName the transaction name
     */
    public void setTransactionName(String transactionName) {
        this.transactionName = transactionName;
    }

    @Override
    public short getTypeCode() {
        return MessageType.TYPE_GLOBAL_BEGIN;
    }


    @Override
    public AbstractTransactionResponse handle(RpcContext rpcContext) {
        return handler.handle(this, rpcContext);
    }

    @Override
    public String toString() {
        StringBuilder result = new StringBuilder();
        result.append("timeout=");
        result.append(timeout);
        result.append(",");
        result.append("transactionName=");
        result.append(transactionName);

        return result.toString();
    }
}

io.seata.tm.DefaultTransactionManager#syncCall

    private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
        try {
            return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
        } catch (TimeoutException toe) {
            throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
        }
    }

io.seata.core.rpc.netty.AbstractNettyRemotingClient#sendSyncRequest(java.lang.Object)

@Override
    public Object sendSyncRequest(Object msg) throws TimeoutException {
        // 获取seata服务端地址
        String serverAddress = loadBalance(getTransactionServiceGroup(), msg);
        // TM RPC服务调用默认超时时间30s
        long timeoutMillis = this.getRpcRequestTimeout();
        // 将GlobalBeginRequest 封装为 RpcMessage
        RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);

        // send batch message
        // put message into basketMap, @see MergedSendRunnable
        // 是否开启seata客户端批量发送消息 1.5默认关闭
        if (this.isEnableClientBatchSendRequest()) {

            // send batch message is sync request, needs to create messageFuture and put it in futures.
            // 批量发送消息需要将消息封装为 MessageFuture 对象 并添加到 futures Map集合中
            MessageFuture messageFuture = new MessageFuture();
            messageFuture.setRequestMessage(rpcMessage);
            messageFuture.setTimeout(timeoutMillis);
            futures.put(rpcMessage.getId(), messageFuture);

            // put message into basketMap
            // 获取当前服务端地址对应的消息队列
            BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
                key -> new LinkedBlockingQueue<>());
            // 将当前消息添加到队列中 一般不会添加失败 LinkedBlockingQueue 是无界队列
            if (!basket.offer(rpcMessage)) {
                LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",
                        serverAddress, rpcMessage);
                return null;
            }
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("offer message: {}", rpcMessage.getBody());
            }
            // 如果当前没有在发送队列消息 给mergeLock对象上锁成功 则唤醒所有等待发送消息的线程
            // isSending 被volatile 修饰 保证可见性和有序性 但是不保证原子性
            if (!isSending) {
                synchronized (mergeLock) {
                    mergeLock.notifyAll();
                }
            }

            try {
                // MessageFuture 封装了 CompletableFuture 对象,此时会超时阻塞当前线程,超时时间30秒
                // 等待 CompletableFuture.complete 完成获取结果
                return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (Exception exx) {
                // 如果有异常抛出
                LOGGER.error("wait response error:{},ip:{},request:{}",
                    exx.getMessage(), serverAddress, rpcMessage.getBody());
                if (exx instanceof TimeoutException) {
                    throw (TimeoutException) exx;
                } else {
                    throw new RuntimeException(exx);
                }
            }

        } else {
            // 如果没有开启客户端批量发送消息 先获取channel
            Channel channel = clientChannelManager.acquireChannel(serverAddress);
            // 同步发送消息 并将RPCMessage 封装为 MessageFuture,并设置超时时间 放入 futures Map集合中
            // 由父类AbstractNettyRemoting的周期线程每隔3秒检查一次消息是否超时
            // 发送消息时会添加 ChannelFutureListener 监听器,如果消息成功,则调用 CompletableFuture.complete 设置结果,
            // 并将当前消息id对应的MessageFuture 从futures 中移除
            return super.sendSync(channel, rpcMessage, timeoutMillis);
        }

    }

单个发送消息

如果是发送单个消息,则直接调用AbstractNettyRemoting.sendSync 向TC端发送消息

io.seata.core.rpc.netty.AbstractNettyRemoting#sendSync

protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
        if (timeoutMillis <= 0) {
            throw new FrameworkException("timeout should more than 0ms");
        }
        if (channel == null) {
            LOGGER.warn("sendSync nothing, caused by null channel.");
            return null;
        }

        MessageFuture messageFuture = new MessageFuture();
        messageFuture.setRequestMessage(rpcMessage);
        // 设置超时时间 用于检测是否超时 System.currentTimeMillis() - start > timeout
        messageFuture.setTimeout(timeoutMillis);
        futures.put(rpcMessage.getId(), messageFuture);

        channelWritableCheck(channel, rpcMessage.getBody());

        String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
        doBeforeRpcHooks(remoteAddr, rpcMessage);

        channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess()) {
                //根据消息id从futures中移除,不再进行消息超时检测
                MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
                if (messageFuture1 != null) {
                    //设置结果
                    messageFuture1.setResultMessage(future.cause());
                }
                //销毁连接
                destroyChannel(future.channel());
            }
        });

        try {
            // 超时阻塞等待获取结果
            Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
            doAfterRpcHooks(remoteAddr, rpcMessage, result);
            return result;
        } catch (Exception exx) {
            LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(),
                rpcMessage.getBody());
            if (exx instanceof TimeoutException) {
                throw (TimeoutException) exx;
            } else {
                throw new RuntimeException(exx);
            }
        }
    }

messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS) 会阻塞等待30s获取消息,超时则抛出异常 TimeoutException

批量发送消息

 // send batch message is sync request, needs to create messageFuture and put it in futures.
            // 批量发送消息需要将消息封装为 MessageFuture 对象 并添加到 futures Map集合中
            MessageFuture messageFuture = new MessageFuture();
            messageFuture.setRequestMessage(rpcMessage);
            messageFuture.setTimeout(timeoutMillis);
            futures.put(rpcMessage.getId(), messageFuture);

            // put message into basketMap
            // 获取当前服务端地址对应的消息队列
            BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
                key -> new LinkedBlockingQueue<>());
            // 将当前消息添加到队列中 一般不会添加失败 LinkedBlockingQueue 是无界队列
            if (!basket.offer(rpcMessage)) {
                LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",
                        serverAddress, rpcMessage);
                return null;
            }
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("offer message: {}", rpcMessage.getBody());
            }
            // 如果当前没有在发送队列消息 给mergeLock对象上锁成功 则唤醒所有等待发送消息的线程
            // isSending 被volatile 修饰 保证可见性和有序性 但是不保证原子性
            if (!isSending) {
                synchronized (mergeLock) {
                    mergeLock.notifyAll();
                }
            }

            try {
                // MessageFuture 封装了 CompletableFuture 对象,此时会超时阻塞当前线程,超时时间30秒
                // 等待 CompletableFuture.complete 完成获取结果
                return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (Exception exx) {
                // 如果有异常抛出
                LOGGER.error("wait response error:{},ip:{},request:{}",
                    exx.getMessage(), serverAddress, rpcMessage.getBody());
                if (exx instanceof TimeoutException) {
                    throw (TimeoutException) exx;
                } else {
                    throw new RuntimeException(exx);
                }
            }

如果批量发送消息,则会将消息放到basketMap 集合中,AbstractNettyRemotingClient会在初始化时,启动最大和核心都是一的单线程池线程池,提交MergedSendRunnable 任务,死循环不断遍历basketMap,获取等待发送的消息队列,最终由io.seata.core.rpc.netty.AbstractNettyRemoting#sendAsync 发送异步消息。需要注意的是,不管是发送同步消息还是异步消息,TM开启事务所属的线程都会因messageFuture.get 超时阻塞,只不过发送和获取返回消息都变成了异步。

public void run() {
            // 死循环
            while (true) {
                //先上锁
                synchronized (mergeLock) {
                    // 等待 1s 并释放当前锁
                    try {
                        mergeLock.wait(MAX_MERGE_SEND_MILLS);
                    } catch (InterruptedException e) {
                    }
                }
                isSending = true;
                // 遍历Map集合
                basketMap.forEach((address, basket) -> {
                    if (basket.isEmpty()) {
                        return;
                    }

                    MergedWarpMessage mergeMessage = new MergedWarpMessage();
                    // 弹出同一个seata服务器地址等待发送的所有消息,合并在一块发送
                    while (!basket.isEmpty()) {
                        RpcMessage msg = basket.poll();
                        // 获取消息体 与消息id 封装为 MergedWarpMessage
                        mergeMessage.msgs.add((AbstractMessage) msg.getBody());
                        mergeMessage.msgIds.add(msg.getId());
                    }
                    if (mergeMessage.msgIds.size() > 1) {
                        printMergeMessageLog(mergeMessage);
                    }
                    Channel sendChannel = null;
                    try {
                        // send batch message is sync request, but there is no need to get the return value.
                        // Since the messageFuture has been created before the message is placed in basketMap,
                        // the return value will be obtained in ClientOnResponseProcessor.
                        // 发送批量消息不会在此处阻塞等待消息的返回  将会采用异步的方式 由 ClientOnResponseProcessor 消息处理器获取返回消息
                        sendChannel = clientChannelManager.acquireChannel(address);
                        AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);
                    } catch (FrameworkException e) {
                        if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {
                            destroyChannel(address, sendChannel);
                        }
                        // fast fail
                        for (Integer msgId : mergeMessage.msgIds) {
                            MessageFuture messageFuture = futures.remove(msgId);
                            if (messageFuture != null) {
                                messageFuture.setResultMessage(
                                    new RuntimeException(String.format("%s is unreachable", address), e));
                            }
                        }
                        LOGGER.error("client merge call failed: {}", e.getMessage(), e);
                    }
                });
                isSending = false;
            }
        }

接受异步消息由TM初始化时添加的ClientOnResponseProcessor 进行处理,将会遍历所有合并的消息,根据消息ID将其从futures中移除,并调用 future.setResultMessage 设置结果,此时TM发送消息时的阻塞状态将会被唤醒。

io.seata.core.rpc.processor.client.ClientOnResponseProcessor#process

  @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        //如果当前消息属于合并发送的消息
        if (rpcMessage.getBody() instanceof MergeResultMessage) {
            //获取消息体与消息ID,并将消息id对应的MessageFuture从futures中移除,不再进行超时检测
            MergeResultMessage results = (MergeResultMessage) rpcMessage.getBody();
            MergedWarpMessage mergeMessage = (MergedWarpMessage) mergeMsgMap.remove(rpcMessage.getId());
            //遍历所有的消息
            for (int i = 0; i < mergeMessage.msgs.size(); i++) {
                int msgId = mergeMessage.msgIds.get(i);
                MessageFuture future = futures.remove(msgId);
                if (future == null) {
                    LOGGER.error("msg: {} is not found in futures, result message: {}", msgId,results.getMsgs()[i]);
                } else {
                    //在此时设置消息结果 结束阻塞等待
                    future.setResultMessage(results.getMsgs()[i]);
                }
            }
            // 与合并消息的处理是一致的
        } else if (rpcMessage.getBody() instanceof BatchResultMessage) {
            try {
                BatchResultMessage batchResultMessage = (BatchResultMessage) rpcMessage.getBody();
                for (int i = 0; i < batchResultMessage.getMsgIds().size(); i++) {
                    int msgId = batchResultMessage.getMsgIds().get(i);
                    MessageFuture future = futures.remove(msgId);
                    if (future == null) {
                        LOGGER.error("msg: {} is not found in futures, result message: {}", msgId, batchResultMessage.getResultMessages().get(i));
                    } else {
                        future.setResultMessage(batchResultMessage.getResultMessages().get(i));
                    }
                }
            } finally {
                // In order to be compatible with the old version, in the batch sending of version 1.5.0,
                // batch messages will also be placed in the local cache of mergeMsgMap,
                // but version 1.5.0 no longer needs to obtain batch messages from mergeMsgMap
                mergeMsgMap.clear();
            }
        } else {
            MessageFuture messageFuture = futures.remove(rpcMessage.getId());
            if (messageFuture != null) {
                messageFuture.setResultMessage(rpcMessage.getBody());
            } else {
                if (rpcMessage.getBody() instanceof AbstractResultMessage) {
                    if (transactionMessageHandler != null) {
                        transactionMessageHandler.onResponse((AbstractResultMessage) rpcMessage.getBody(), null);
                    }
                }
            }
        }
    }

TC 处理 GlobalBeginRequest 消息

NettyChannel消息处理

io.seata.core.rpc.netty.NettyRemotingServer#init

 @Override
    public void init() {
        // registry processor
        registerProcessor();
        if (initialized.compareAndSet(false, true)) {
            super.init();
        }
    }

io.seata.core.rpc.netty.NettyRemotingServer#registerProcessor

private void registerProcessor() {
        // 1. registry on request message processor
        ServerOnRequestProcessor onRequestProcessor =
            new ServerOnRequestProcessor(this, getHandler());
        ShutdownHook.getInstance().addDisposable(onRequestProcessor);
        super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
        //处理GlobalBeginRequest消息  ServerOnRequestProcessor
        super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
        // 2. registry on response message processor
        ServerOnResponseProcessor onResponseProcessor =
            new ServerOnResponseProcessor(getHandler(), getFutures());
        super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor);
        super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor);
        // 3. registry rm message processor
        RegRmProcessor regRmProcessor = new RegRmProcessor(this);
        super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
        // 4. registry tm message processor
        RegTmProcessor regTmProcessor = new RegTmProcessor(this);
        super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
        // 5. registry heartbeat message processor
        ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
    }	

进一步跟进查看具体如何处理

io.seata.core.rpc.processor.server.ServerOnRequestProcessor#process

    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        // channel是否已经注册
        if (ChannelManager.isRegistered(ctx.channel())) {
            onRequestMessage(ctx, rpcMessage);
        } else {
            try {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel());
                }
                ctx.disconnect();
                ctx.close();
            } catch (Exception exx) {
                LOGGER.error(exx.getMessage());
            }
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));
            }
        }
    }

进入io.seata.core.rpc.processor.server.ServerOnRequestProcessor#onRequestMessage

private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
        Object message = rpcMessage.getBody();
        RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("server received:{},clientIp:{},vgroup:{}", message,
                NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
        } else {
            try {
                BatchLogHandler.INSTANCE.getLogQueue()
                    .put(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:"
                        + rpcContext.getTransactionServiceGroup());
            } catch (InterruptedException e) {
                LOGGER.error("put message to logQueue error: {}", e.getMessage(), e);
            }
        }
        //GlobalBeginRequest 继承 AbstractTransactionRequest 继承 AbstractMessage
        if (!(message instanceof AbstractMessage)) {
            return;
        }
        // 合并消息处理
        // the batch send request message
        if (message instanceof MergedWarpMessage) {
            //是否开启了TC批量响应 默认false  rpcContext 的 version 不为空并且大于等于 1.5.0
            // 如果满足 则使用 MergedWarpMessage 来处理请求消息
            if (NettyServerConfig.isEnableTcServerBatchSendResponse() && StringUtils.isNotBlank(rpcContext.getVersion())
                && Version.isAboveOrEqualVersion150(rpcContext.getVersion())) {
                List<AbstractMessage> msgs = ((MergedWarpMessage)message).msgs;
                List<Integer> msgIds = ((MergedWarpMessage)message).msgIds;
                //遍历处理
                for (int i = 0; i < msgs.size(); i++) {
                    AbstractMessage msg = msgs.get(i);
                    int msgId = msgIds.get(i);
                    //是否开启并发处理消息 默认关闭
                    if (PARALLEL_REQUEST_HANDLE) {
                        CompletableFuture.runAsync(
                            () -> handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext));
                    } else {
                        //单个消息处理
                        handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext);
                    }
                }
            } else {
                // results 响应结果集 如果开启了并发处理消息 需要保证线程安全
                // completableFutures 并发处理消息
                List<AbstractResultMessage> results = new CopyOnWriteArrayList<>();
                List<CompletableFuture<Void>> completableFutures = null;
                for (int i = 0; i < ((MergedWarpMessage)message).msgs.size(); i++) {
                    // 默认关闭 没有开启并发处理消息 如果开启了 则使用 completableFutures 来并发处理消息
                    if (PARALLEL_REQUEST_HANDLE) {
                        if (completableFutures == null) {
                            completableFutures = new ArrayList<>();
                        }
                        int finalI = i;
                        // 并发异步处理消息,并将结果添加到results中
                        completableFutures.add(CompletableFuture.runAsync(() -> {
                            results.add(finalI, handleRequestsByMergedWarpMessage(
                                ((MergedWarpMessage)message).msgs.get(finalI), rpcContext));
                        }));
                    } else {
                        // 处理消息并按顺序添加到results集合中
                        results.add(i,
                            handleRequestsByMergedWarpMessage(((MergedWarpMessage)message).msgs.get(i), rpcContext));
                    }
                }
                if (CollectionUtils.isNotEmpty(completableFutures)) {
                    try {
                        CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).get();
                    } catch (InterruptedException | ExecutionException e) {
                        LOGGER.error("handle request error: {}", e.getMessage(), e);
                    }
                }
                MergeResultMessage resultMessage = new MergeResultMessage();
                resultMessage.setMsgs(results.toArray(new AbstractResultMessage[0]));
                remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);
            }
        } else {
            // the single send request message
            final AbstractMessage msg = (AbstractMessage) message;
            AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
            remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
        }
    }

io.seata.core.rpc.processor.server.ServerOnRequestProcessor#handleRequestsByMergedWarpMessage

private AbstractResultMessage handleRequestsByMergedWarpMessage(AbstractMessage subMessage, RpcContext rpcContext) {
    return transactionMessageHandler.onRequest(subMessage, rpcContext);
}

io.seata.server.coordinator.DefaultCoordinator#onRequest

public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
        if (!(request instanceof AbstractTransactionRequestToTC)) {
            throw new IllegalArgumentException();
        }
        AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
        transactionRequest.setTCInboundHandler(this);

        return transactionRequest.handle(context);
    }

io.seata.core.protocol.transaction.GlobalBeginRequest#handle

public AbstractTransactionResponse handle(RpcContext rpcContext) {
        return handler.handle(this, rpcContext);
    }

io.seata.server.AbstractTCInboundHandler#handle(io.seata.core.protocol.transaction.GlobalBeginRequest, io.seata.core.rpc.RpcContext)

  public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
        GlobalBeginResponse response = new GlobalBeginResponse();
        exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() {
            @Override
            public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException {
                try {
                    //真正处理beging消息
                    doGlobalBegin(request, response, rpcContext);
                } catch (StoreException e) {
                    throw new TransactionException(TransactionExceptionCode.FailedStore,
                        String.format("begin global request failed. xid=%s, msg=%s", response.getXid(), e.getMessage()),
                        e);
                }
            }
        }, request, response);
        return response;
    }

获取全局事务XID

io.seata.server.coordinator.DefaultCoordinator#doGlobalBegin

    protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
            throws TransactionException {
        // 通过DefaultCore根据应用ID,事务分组名称,超时时间 创建并开启一个新的事务 返回全局事务XID 放入GlobalBeginResponse中
        response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
                request.getTransactionName(), request.getTimeout()));
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",
                    rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
        }
    }

全局事务会话持久化

io.seata.server.coordinator.DefaultCore#begin

    @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
        // 创建全局事务对象 由XID.generateXID 生成全局事务XID
        GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
            timeout);
        // 将xid放进ThreadLocal中
        MDC.put(RootContext.MDC_KEY_XID, session.getXid());
//        SessionHolder.getRootSessionManager() 根据SPI机制去查找
//        SessionHolder 在 io.seata.server.Server.start 启动时初始化,获取当前配置的 会话持久机制模式  ,调用  SessionHolder.init(sessionStoreMode)进行初始化
//        此时我们通过 SessionHolder.getRootSessionManager() 将使用seata的SPI机制去 META-INF/services/ 与  META-INF/seata/ 目录下查找
//        文件名为 io.seata.server.session.SessionManager 的文件 在根据 @LoadLevel注解的name值加载需要的对象
//         例如 如果我们此时的session持久化模式为DB,那么  SessionHolder.getRootSessionManager() 将加载返回 DataBaseSessionManager 对象
        session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

        //开启事务 标记GlobalSession 的事务状态为 1 ,并记录开启时间与激活状态  调用SessionLifecycleListener监听器的onBegin方法
        session.begin();

        // transaction start event
        //发送事务开启事件
        MetricsPublisher.postSessionDoingEvent(session, false);

        //返回XID
        return session.getXid();
    }

Seata的SPI机制会根据 EnhancedServiceLoader.load(类, 名称) 方法参数一的类的全限定类名,从META-INF/services/ 与 META-INF/seata/ 路径下去匹配,然后根据匹配到的类的全限定类名,定位到具体的类,再根据参数二名称与@LoadLevel注解的name值进行匹配 ,确定要加载的对象,如下所示

image-20231121152441877

加载 DataBaseSessionManager 对象后,添加其到session的生命监听器列表中,在执行session.begin方法时,调用监听器的onBegin方法,进而由父类AbstractSessionManager 执行 onBegin方法,并调用DataBaseSessionManager重写的addGlobalSession方法

io.seata.server.storage.db.session.DataBaseSessionManager#addGlobalSession

    @Override
    public void addGlobalSession(GlobalSession session) throws TransactionException {
        if (StringUtils.isBlank(taskName)) {
            // 将全局事务会话信息写入数据库
            boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);
            if (!ret) {
                throw new StoreException("addGlobalSession failed.");
            }
        } else {
            boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);
            if (!ret) {
                throw new StoreException("addGlobalSession failed.");
            }
        }
    }

io.seata.server.storage.db.store.DataBaseTransactionStoreManager#writeSession

    public boolean writeSession(LogOperation logOperation, SessionStorable session) {
        if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
            // logStore 封装了 DataSource   根据不同类型的数据源生成相应的insert语句 持久化到数据库中
            return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
        } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
            return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
        } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
            return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
        } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
            return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
        } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
            return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
        } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
            return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
        } else {
            throw new StoreException("Unknown LogOperation:" + logOperation.name());
        }
    }

io.seata.server.storage.db.store.LogStoreDataBaseDAO#insertGlobalTransactionDO

public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {
        // 生成SQL插入 global_table (默认) 表中,持久化全局事务会话
        String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);
        Connection conn = null;
        PreparedStatement ps = null;
        try {
            int index = 1;
            conn = logStoreDataSource.getConnection();
            //自动提交
            conn.setAutoCommit(true);
            ps = conn.prepareStatement(sql);
            ps.setString(index++, globalTransactionDO.getXid());
            ps.setLong(index++, globalTransactionDO.getTransactionId());
            ps.setInt(index++, globalTransactionDO.getStatus());
            ps.setString(index++, globalTransactionDO.getApplicationId());
            ps.setString(index++, globalTransactionDO.getTransactionServiceGroup());
            String transactionName = globalTransactionDO.getTransactionName();
            transactionName = transactionName.length() > transactionNameColumnSize ?
                transactionName.substring(0, transactionNameColumnSize) :
                transactionName;
            ps.setString(index++, transactionName);
            ps.setInt(index++, globalTransactionDO.getTimeout());
            ps.setLong(index++, globalTransactionDO.getBeginTime());
            ps.setString(index++, globalTransactionDO.getApplicationData());
            return ps.executeUpdate() > 0;
        } catch (SQLException e) {
            throw new StoreException(e);
        } finally {
            IOUtil.close(ps, conn);
        }
    }

最终生成的SQL如下所示

insert into global_table(xid, transaction_id, status, application_id, transaction_service_group, transaction_name,
                         timeout, begin_time, application_data, gmt_create, gmt_modified)
values (全局事务XID, 事务id, 事务状态,应用id, 事务分组, 事务名称, 超时时间, 开始时间, 应用数据, now(), now())

数据库中的数据

image-20231121214708246

返回GlobalBeginResponse消息

在创建全局事务并开启后,拿到XID后封装到GlobalBeginResponse中,最终由remotingServer.sendAsyncResponse将GlobalBeginResponse消息返回给TM

TM处理 GlobalBeginResponse 消息

在seata服务端返回 GlobalBeginResponse 消息 后, TM还是由 io.seata.core.rpc.processor.client.ClientOnResponseProcessor#process 处理接收到的消息,通过调用future.setResultMessage 设置消息结果,并恢复阻塞的TM发送GlobalBeginRequest消息的线程,将结果返回

io.seata.tm.api.DefaultGlobalTransaction#begin(int, java.lang.String)

public void begin(int timeout, String name) throws TransactionException {
        if (role != GlobalTransactionRole.Launcher) {
            assertXIDNotNull();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNull();
        String currentXid = RootContext.getXID();
        if (currentXid != null) {
            throw new IllegalStateException("Global transaction already exists," +
                " can't begin a new global transaction, currentXid = " + currentXid);
        }
        // 获取到seata服务端返回到XID
        xid = transactionManager.begin(null, null, name, timeout);
        status = GlobalStatus.Begin;
        //将xid绑定到RootContext中
        RootContext.bind(xid);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Begin new global transaction [{}]", xid);
        }
    }

获取到XID后,将XID绑定到RootContext中,至此,全局事务的开启过程也就结束了。

总结流程图

TM开启全局事务过程

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/185468.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

网络安全工程师究竟是什么?怎么入门?

首先啊骚年们我们必须先了解网络安全这个行业究竟是干啥的。 是打ctf的&#xff1f;一个个都像韩商言吴白那么帅刷刷敲几个代码就能轻易夺旗&#xff1f; 还是像十大黑客之一的米特尼克一样闯入了“北美空中防务指挥系统”的计算机主机内&#xff0c;还在被通缉逃跑期间控制了…

鸿蒙原生应用/元服务开发-AGC分发如何上架HarmonyOS应用

一、上架整体流程 二、上架HarmonyOS应用 获取到HarmonyOS应用软件包后&#xff0c;开发者可将应用提交至AGC申请上架。上架成功后&#xff0c;用户即可在华为应用市场搜索获取开发者的HarmonyOS应用。 配置应用信息 1.登录AppGallery Connect&#xff0c;选择“我的应用”。…

最重要的BI测试-适用于任何BI和分析平台

为什么 BI 测试是答案 相信你的数据可视化是成功执行商业智能 (BI) 和分析项目的关键因素。我敢肯定&#xff0c;你遇到过以下情况&#xff1a;业务主管或业务用户反馈说他们的分析看起来不对&#xff0c;他们的 KPI 看起来有问题&#xff0c;或者速度太慢而无法使用。要问自己…

Spring框架学习 -- Bean的生命周期和作用域

目录 前言 案例 案例分析 作用域的定义 Bean对象的6种作用域 Singleton prototype 设置作用域 ​编辑延迟初始化 Spring的执行流程 Bean的生命周期 前言 我们可以类比一下普通变量的生命周期和作用域, 大多数变量的生命周期和作用域都被限定在了花括号内 {}, 除…

贝锐花生壳:无需公网IP、简单3步,远程访问群晖NAS

面对NAS远程访问难题&#xff0c;贝锐花生壳一招搞定&#xff01;并且无需公网IP、简单3步&#xff0c;即可实现固定域名远程访问NAS。 步骤1&#xff1a; 目前&#xff0c;群晖NAS已在套件中心内置花生壳客户端。 浏览器进入群晖NAS的DSM管理界面&#xff0c;点击【套件中心】…

SSM大学生社团信息管理系统-99953,(免费领取源码)计算机毕业设计选题开题+程序定制+论文书写+答辩ppt书写 包售后 全流程

SSM大学生社团信息管理系统APP 摘 要 随着科学技术的飞速发展&#xff0c;社会的方方面面、各行各业都在努力与现代的先进技术接轨&#xff0c;通过科技手段来提高自身的优势&#xff0c;高校当然也不能排除在外。大学生社团信息管理系统APP是以实际运用为开发背景&#xff0c…

[Python程序打包: 使用PyInstaller制作单文件exe以及打包GUI程序详解]

文章目录 概要Python 程序打包—使用 Pyinstaller 打包 exePython程序打包—使用Pyinstaller打包GUI程序Python程序打包—使用 Pyinstaller 设置 exe 图标小结 概要 使用PyInstaller工具将Python程序打包成可执行&#xff08;EXE&#xff09;文件。将Python程序打包成EXE的好处…

unittest指南——不拼花哨,只拼实用

&#x1f4e2;专注于分享软件测试干货内容&#xff0c;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01;&#x1f4e2;交流讨论&#xff1a;欢迎加入我们一起学习&#xff01;&#x1f4e2;资源分享&#xff1a;耗时200小时精选的「软件测试」资…

看完就会,从抓包到接口测试的全过程解析【1500字保姆级教程】

一、为什么抓包 1、从功能测试角度 通过抓包查看隐藏字段 Web 表单中会有很多隐藏的字段&#xff0c;这些隐藏字段一般都有一些特殊的用途&#xff0c;比如收集用户的数据&#xff0c;预防 CRSF 攻击&#xff0c;防网络爬虫&#xff0c;以及一些其他用途。这些隐藏字段在界面…

从裸机启动开始运行一个C++程序(十四)

前序文章请看&#xff1a; 从裸机启动开始运行一个C程序&#xff08;十三&#xff09; 从裸机启动开始运行一个C程序&#xff08;十二&#xff09; 从裸机启动开始运行一个C程序&#xff08;十一&#xff09; 从裸机启动开始运行一个C程序&#xff08;十&#xff09; 从裸机启动…

socket can中是如何根据 结构体can_bittiming_const中的字段 计算bitrate的?

在 SocketCAN 中&#xff0c;can_bittiming_const 结构体用于表示 CAN 总线的定时参数&#xff0c;包括位率&#xff08;bitrate&#xff09;的计算。can_bittiming_const 包含了许多与位率相关的参数&#xff0c;其中一些参数用于计算实际的位率。 下面是一些与位率计算相关的…

js实现数组的上下移动

思路&#xff1a;上移表示index索引位置减去1&#xff0c;下移表示index索引位置增加1。使用数组的splice方法实现。例如上移&#xff1a;splice(元素当前索引位置&#xff0c;1(删除1个元素)&#xff0c;‘元素当前索引位置 - 1’)。

ruoyi 若依框架采用第三方登录

在项目中&#xff0c;前后端分离的若依项目&#xff0c;需要通过统一认证&#xff0c;或者是第三方协带认证信息跳转到本系统的指定页面。需要前后端都做相应的改造&#xff0c;由于第一次实现时已过了很久&#xff0c;再次重写时&#xff0c;发现还是搞了很长时间&#xff0c;…

任意分圆环下的 RLWE:如何产生正确的噪声分布

参考文献&#xff1a; [Con09] Conrad K. The different ideal[J]. Expository papers/Lecture notes. Available at: http://www.math.uconn.edu/∼kconrad/blurbs/gradnumthy/different.pdf, 2009.[LPR10] Lyubashevsky V, Peikert C, Regev O. On ideal lattices and learn…

友思特分享 | Neuro-T:零代码自动深度学习训练平台

来源&#xff1a;友思特 智能感知 友思特分享 | Neuro-T&#xff1a;零代码自动深度学习训练平台 欢迎关注虹科&#xff0c;为您提供最新资讯&#xff01; 工业自动化、智能化浪潮涌进&#xff0c;视觉技术在其中扮演了至关重要的角色。在汽车、制造业、医药、芯片、食品等行业…

日本服务器访问速度和带宽有没有直接关系?

​  对于许多网站和应用程序来说&#xff0c;服务器的访问速度是至关重要的。用户希望能够快速加载页面、上传和下载文件&#xff0c;而这些都与服务器的带宽有关。那么&#xff0c;日本服务器的访问速度和带宽之间是否存在直接关系呢? 我们需要了解什么是带宽。带宽是指网络…

NVMe-oF E-JBOF设计解析:WD RapidFlex网卡、OpenFlex Data24

OpenFlex Data24 NVMe-oF Storage Platform WD的SN840 NVMeSSD新品并没有太吸引我注意&#xff0c;因为它还是PCIe 3.0接口的&#xff0c;要知道Intel的PCIe 4.0 SSD都已经推出了。 但上面这个NVMe-oF&#xff08;NVMe over Fabric&#xff09;EBOF&#xff08;区别于普通JBO…

中国上市公司漂绿程度及其同构指数(多种测算方法,2012-2022年)

数据简介&#xff1a;20 世纪 90 年代开始&#xff0c;国际上关于绿色市场和绿色管理的学术文献日渐丰富&#xff0c;众多企业积极响应碳排放政策的号召&#xff0c;但其中有多少企业是实实在在的进行碳减排技术创新&#xff0c;又有多少企业打着绿色低碳行为的口号来吸引眼球、…

浏览器缓存、本地存储、Cookie、Session、Token

目录 前端通信&#xff08;渲染、http、缓存、异步、跨域&#xff09; HTTP与HTTPS&#xff0c;HTTP版本、状态码 请求头&#xff0c;响应头 缓存 强制缓存&#xff1a;Cache-Control:max-age&#xff08;HTTP1.1&#xff09;>Expires&#xff08;1.0&#xff09; js、…

【Spring篇】JDK动态代理

目录 什么是代理&#xff1f; 代理模式 动态代理 Java中常用的代理模式 问题来了&#xff0c;如何动态生成代理类&#xff1f; 动态代理底层实现 什么是代理&#xff1f; 顾名思义&#xff0c;代替某个对象去处理一些问题&#xff0c;谓之代理&#xff0c;那么何为动态&a…