【Seata源码学习 】 篇二 TM与RM初始化过程
1.GlobalTransactionScanner 初始化
GlobalTransactionScanner 实现了InitializingBean 接口,在初始化后将执行自定义的初始化方法
io.seata.spring.annotation.GlobalTransactionScanner#afterPropertiesSet
@Override
public void afterPropertiesSet() {
//是否禁用了全局事务
if (disableGlobalTransaction) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global transaction is disabled.");
}
return;
}
//初始化客户端
initClient();
}
io.seata.spring.annotation.GlobalTransactionScanner#initClient
private void initClient() {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Initializing Global Transaction Clients ... ");
}
if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) {
LOGGER.warn("the default value of seata.tx-service-group: {} has already changed to {} since Seata 1.5, " +
"please change your default configuration as soon as possible " +
"and we don't recommend you to use default tx-service-group's value provided by seata",
DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP);
}
if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
}
//init TM
//初始化事务管理器
TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
}
//init RM
//初始化资源管理器
RMClient.init(applicationId, txServiceGroup);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global Transaction Clients are initialized. ");
}
//注册应用上下文关闭回调方法
registerSpringShutdownHook();
}
2. 初始化事务管理器 TM
流程图
实例化 TmNettyRemotingClient
io.seata.tm.TMClient#init(java.lang.String, java.lang.String, java.lang.String, java.lang.String)
public static void init(String applicationId, String transactionServiceGroup) {
//TM进行netty网络通信的客户端
// applicationId 当前应用id transactionServiceGroup 事务分组名称
TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
tmNettyRemotingClient.init();
}
首先看下获取实例的方法
io.seata.core.rpc.netty.TmNettyRemotingClient#getInstance(java.lang.String, java.lang.String)
public static TmNettyRemotingClient getInstance(String applicationId, String transactionServiceGroup) {
TmNettyRemotingClient tmNettyRemotingClient = getInstance();
tmNettyRemotingClient.setApplicationId(applicationId);
tmNettyRemotingClient.setTransactionServiceGroup(transactionServiceGroup);
return tmNettyRemotingClient;
}
io.seata.core.rpc.netty.TmNettyRemotingClient#getInstance()
public static TmNettyRemotingClient getInstance() {
//双检锁,保证只有一个实例
if (instance == null) {
synchronized (TmNettyRemotingClient.class) {
if (instance == null) {
//netty的配置
NettyClientConfig nettyClientConfig = new NettyClientConfig();
//消息处理线程池
//核心线程和最大线程都是16 没有非核心线程
//有界的阻塞队列 容量为200
final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),
KEEP_ALIVE_TIME, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),
new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(),
nettyClientConfig.getClientWorkerThreads()),
RejectedPolicies.runsOldestTaskPolicy());
//创建实例
instance = new TmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
}
}
}
return instance;
}
io.seata.core.rpc.netty.TmNettyRemotingClient#TmNettyRemotingClient
private TmNettyRemotingClient(NettyClientConfig nettyClientConfig,
EventExecutorGroup eventExecutorGroup,
ThreadPoolExecutor messageExecutor) {
//调用父类构造器
super(nettyClientConfig, eventExecutorGroup, messageExecutor, NettyPoolKey.TransactionRole.TMROLE);
//基于SPI机制加载鉴权签名组件 AuthSigner
this.signer = EnhancedServiceLoader.load(AuthSigner.class);
// set enableClientBatchSendRequest
// 是否开启了批量发送请求对配置。默认 false
this.enableClientBatchSendRequest = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.ENABLE_TM_CLIENT_BATCH_SEND_REQUEST,
DefaultValues.DEFAULT_ENABLE_TM_CLIENT_BATCH_SEND_REQUEST);
//监听配置是否有变化
ConfigurationCache.addConfigListener(ConfigurationKeys.ENABLE_TM_CLIENT_BATCH_SEND_REQUEST, new ConfigurationChangeListener() {
@Override
public void onChangeEvent(ConfigurationChangeEvent event) {
String dataId = event.getDataId();
String newValue = event.getNewValue();
if (ConfigurationKeys.ENABLE_TM_CLIENT_BATCH_SEND_REQUEST.equals(dataId) && StringUtils.isNotBlank(newValue)) {
enableClientBatchSendRequest = Boolean.parseBoolean(newValue);
}
}
});
}
实例化 AbstractNettyRemotingClient
io.seata.core.rpc.netty.AbstractNettyRemotingClient#AbstractNettyRemotingClient
public AbstractNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup,
ThreadPoolExecutor messageExecutor, NettyPoolKey.TransactionRole transactionRole) {
//调用父类构造器 用于处理消息的线程池
super(messageExecutor);
//当前事务角色
this.transactionRole = transactionRole;
//创建 NettyClientBootstrap 实例
clientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, transactionRole);
//消息处理器
clientBootstrap.setChannelHandlers(new ClientHandler());
//channel管理器
clientChannelManager = new NettyClientChannelManager(
new NettyPoolableFactory(this, clientBootstrap), getPoolKeyFunction(), nettyClientConfig);
}
实例化 AbstractNettyRemoting
io.seata.core.rpc.netty.AbstractNettyRemoting#AbstractNettyRemoting
public AbstractNettyRemoting(ThreadPoolExecutor messageExecutor) {
//设置处理消息的线程池
this.messageExecutor = messageExecutor;
}
初始化 TmNettyRemotingClient
回到
io.seata.tm.TMClient#init(java.lang.String, java.lang.String, java.lang.String, java.lang.String)
创建完成 TmNettyRemotingClient 实例后,调用init方法
public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {
TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup, accessKey, secretKey);
tmNettyRemotingClient.init();
}
@Override
public void init() {
// registry processor
//注册请求处理器
registerProcessor();
if (initialized.compareAndSet(false, true)) {
//调用父类初始化方法
// 1. 定时重连
// 2. 超时检测
super.init();
//如果事务分组不为空
if (io.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) {
//通过channel管理器建立链接
getClientChannelManager().reconnect(transactionServiceGroup);
}
}
}
io.seata.core.rpc.netty.TmNettyRemotingClient#registerProcessor
private void registerProcessor() {
//根据不同的消息类型,使用不同的消息处理器
//两个处理器 一种对消息进行处理
//还有一种是处理心跳
// 1.registry TC response processor
ClientOnResponseProcessor onResponseProcessor =
new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
//注册就是将 消息处理器与线程池封装成一对pair,然后在进一步封装成map,map对key为消息类型,value为封装对pair
super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null);
// 2.registry heartbeat message processor
ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}
@Override
public void registerProcessor(int requestCode, RemotingProcessor processor, ExecutorService executor) {
Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);
this.processorTable.put(requestCode, pair);
}
初始化 AbstractNettyRemotingClient
io.seata.core.rpc.netty.AbstractNettyRemotingClient#init
@Override
public void init() {
//周期线程池 第一次在60秒后通过连接管理器重新建立链接,之后每10秒重新建立一次链接
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
clientChannelManager.reconnect(getTransactionServiceGroup());
}
}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
if (this.isEnableClientBatchSendRequest()) {
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
}
//启动一个周期线程池,每3秒检查一次请求是否超时
super.init();
//启动netty客户端
clientBootstrap.start();
}
io.seata.core.rpc.netty.NettyClientBootstrap#start
启动过程中一共设置了4个消息处理器
- IdleStateHandler 处理心跳
- ProtocolV1Decoder 消息解码
- ProtocolV1Encoder 消息编码
- ClientHandler 消息处理
@Override
public void start() {
if (this.defaultEventExecutorGroup == null) {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),
new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()),
nettyClientConfig.getClientWorkerThreads()));
}
this.bootstrap.group(this.eventLoopGroupWorker).channel(
nettyClientConfig.getClientChannelClazz()).option(
ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true).option(
ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).option(
ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()).option(ChannelOption.SO_RCVBUF,
nettyClientConfig.getClientSocketRcvBufSize());
if (nettyClientConfig.enableNative()) {
if (PlatformDependent.isOsx()) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("client run on macOS");
}
} else {
bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
.option(EpollChannelOption.TCP_QUICKACK, true);
}
}
bootstrap.handler(
new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(
new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
nettyClientConfig.getChannelMaxWriteIdleSeconds(),
nettyClientConfig.getChannelMaxAllIdleSeconds()))
.addLast(new ProtocolV1Decoder())
.addLast(new ProtocolV1Encoder());
if (channelHandlers != null) {
addChannelPipelineLast(ch, channelHandlers);
}
}
});
if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {
LOGGER.info("NettyClientBootstrap has started");
}
}
io.seata.core.rpc.netty.AbstractNettyRemotingClient.ClientHandler
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof RpcMessage)) {
return;
}
processMessage(ctx, (RpcMessage) msg);
}
io.seata.core.rpc.netty.AbstractNettyRemoting#processMessage
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
}
Object body = rpcMessage.getBody();
//顶层接口 MessageTypeAware
if (body instanceof MessageTypeAware) {
MessageTypeAware messageTypeAware = (MessageTypeAware) body;
//根据消息的类型获取不同的RemotingProcessor进行处理
final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
if (pair != null) {
//如果线程池不为空,使用线程池执行 前面封装pair时,线程池都是null
if (pair.getSecond() != null) {
try {
pair.getSecond().execute(() -> {
try {
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
} finally {
MDC.clear();
}
});
} catch (RejectedExecutionException e) {
LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
"thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
if (allowDumpStack) {
String name = ManagementFactory.getRuntimeMXBean().getName();
String pid = name.split("@")[0];
long idx = System.currentTimeMillis();
try {
String jstackFile = idx + ".log";
LOGGER.info("jstack command will dump to " + jstackFile);
Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));
} catch (IOException exx) {
LOGGER.error(exx.getMessage());
}
allowDumpStack = false;
}
}
} else {
//如果消息处理器对应的线程池是空的,则直接处理
try {
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
}
}
} else {
LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
}
} else {
LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
}
}
初始化 AbstractNettyRemoting
io.seata.core.rpc.netty.AbstractNettyRemoting#init
public void init() {
//每3秒检查一次请求是否超时
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {
MessageFuture future = entry.getValue();
if (future.isTimeout()) {
futures.remove(entry.getKey());
RpcMessage rpcMessage = future.getRequestMessage();
future.setResultMessage(new TimeoutException(String
.format("msgId: %s ,msgType: %s ,msg: %s ,request timeout", rpcMessage.getId(), String.valueOf(rpcMessage.getMessageType()), rpcMessage.getBody().toString())));
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());
}
}
}
nowMills = System.currentTimeMillis();
}
}, TIMEOUT_CHECK_INTERVAL, TIMEOUT_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
}
TM客户端与 TC 建立连接
io.seata.core.rpc.netty.NettyClientChannelManager#reconnect
void reconnect(String transactionServiceGroup) {
List<String> availList = null;
try {
//根据事务分组名称找seata服务端地址列表 默认根据File配置映射关系查找
//tx-service-group 事务分组名
//vgroup-mapping.事务分组名=分组seata服务列表名
//seata.service.grouplist.分组seata服务列表名=seata服务地址
availList = getAvailServerList(transactionServiceGroup);
} catch (Exception e) {
LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
return;
}
if (CollectionUtils.isEmpty(availList)) {
RegistryService registryService = RegistryFactory.getInstance();
String clusterName = registryService.getServiceGroup(transactionServiceGroup);
//如果找不到任何seata server 服务配置列表,抛出异常
if (StringUtils.isBlank(clusterName)) {
LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct",
ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX,
transactionServiceGroup);
return;
}
if (!(registryService instanceof FileRegistryServiceImpl)) {
LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName);
}
return;
}
Set<String> channelAddress = new HashSet<>(availList.size());
try {
for (String serverAddress : availList) {
try {
//与所有seata server建立长连接
acquireChannel(serverAddress);
channelAddress.add(serverAddress);
} catch (Exception e) {
LOGGER.error("{} can not connect to {} cause:{}", FrameworkErrorCode.NetConnect.getErrCode(),
serverAddress, e.getMessage(), e);
}
}
} finally {
if (CollectionUtils.isNotEmpty(channelAddress)) {
List<InetSocketAddress> aliveAddress = new ArrayList<>(channelAddress.size());
for (String address : channelAddress) {
String[] array = address.split(":");
aliveAddress.add(new InetSocketAddress(array[0], Integer.parseInt(array[1])));
}
RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, aliveAddress);
} else {
RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, Collections.emptyList());
}
}
}
3.初始化资源管理器 RM
io.seata.rm.RMClient#init
public static void init(String applicationId, String transactionServiceGroup) {
//获取RmNettyRemotingClient实例
RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
//设置资源管理器
rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
//设置资源事务管理器
rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
//初始化RmNettyRemotingClient
rmNettyRemotingClient.init();
}
实例化 RmNettyRemotingClient
io.seata.core.rpc.netty.RmNettyRemotingClient#getInstance(java.lang.String, java.lang.String)
public static RmNettyRemotingClient getInstance(String applicationId, String transactionServiceGroup) {
//获取实例,并创建消息处理线程池
RmNettyRemotingClient rmNettyRemotingClient = getInstance();
//设置应用id
rmNettyRemotingClient.setApplicationId(applicationId);
//设置事务分组名
rmNettyRemotingClient.setTransactionServiceGroup(transactionServiceGroup);
return rmNettyRemotingClient;
}
io.seata.core.rpc.netty.RmNettyRemotingClient#getInstance()
public static RmNettyRemotingClient getInstance() {
//双检锁创建实例 保证单例
if (instance == null) {
synchronized (RmNettyRemotingClient.class) {
if (instance == null) {
//netty客户端配置
NettyClientConfig nettyClientConfig = new NettyClientConfig();
//消息处理线程池
final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),
KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),
new NamedThreadFactory(nettyClientConfig.getRmDispatchThreadPrefix(),
nettyClientConfig.getClientWorkerThreads()), new ThreadPoolExecutor.CallerRunsPolicy());
instance = new RmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
}
}
}
return instance;
}
RmNettyRemotingClient 及其父类的实例化过程都与TM是一致的,我们可以看下继承关系图
真正有区别的地方在于TM客户端初始化的过程与RM客户端初始化的过程
初始化 RmNettyRemotingClient
//获取RmNettyRemotingClient实例
RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
//设置资源管理器
rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
//设置资源事务管理器
rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
//初始化RmNettyRemotingClient
rmNettyRemotingClient.init();
io.seata.core.rpc.netty.RmNettyRemotingClient#init
public void init() {
// 注册消息处理器
registerProcessor();
if (initialized.compareAndSet(false, true)) {
super.init();
// Found one or more resources that were registered before initialization
// 与TC建立连接前 会先判断资源是否存在
if (resourceManager != null
&& !resourceManager.getManagedResources().isEmpty()
&& StringUtils.isNotBlank(transactionServiceGroup)) {
getClientChannelManager().reconnect(transactionServiceGroup);
}
}
}
io.seata.core.rpc.netty.RmNettyRemotingClient#registerProcessor
private void registerProcessor() {
//注册类五种不同的消息处理器
// 1.registry rm client handle branch commit processor
// 分支事务提交
RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);
// 2.registry rm client handle branch rollback processor
// 分支事务回滚
RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);
// 3.registry rm handler undo log processor
// 回滚日志处理
RmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());
super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);
// 4.registry TC response processor
// TC响应处理
ClientOnResponseProcessor onResponseProcessor =
new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null);
// 5.registry heartbeat message processor
// 心跳信息处理
ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}
以上就是TM和RM实例化的过程,至于不同的消息处理器的实现我们放到后面去看
4. TM和RM初始化总结
两者其实过程是一致的,TM客户端对象TMClient主要是实例并初始化TmNettyRemotingClient,RM客户端对象RMClient主要是实例并初始化RmNettyRemotingClient。两者类继承关系如下所示
NettyRemotingClient 对象主要是在初始化方法中消息处理器并与TC服务端建立长连接,TM与RM注册的消息处理器是不同的,并且RM在与TC建立连接前会先判断数据库资源是否存在。TmNettyRemotingClient与RmNettyRemotingClient都将共同的方法放到抽象父类 AbstractNettyRemotingClient 中 。
父类 AbstractNettyRemotingClient 封装了原生的Netty信息,用于创建Netty客户端对象,并在初始化方法中启动一个周期线程去定期重新发起连接请求
AbstractNettyRemotingClient 的父类 AbstractNettyRemoting 主要是在执行初始化方法时启动 一个周期线程池,每隔3秒检测一次发送的消息集合中是否有消息超时,默认的超时时间为30秒
io.seata.common.DefaultValues#DEFAULT_RPC_RM_REQUEST_TIMEOUT
long DEFAULT_RPC_RM_REQUEST_TIMEOUT = Duration.ofSeconds(30).toMillis();
我们可以通过设置 transport.rpcRmRequestTimeout (毫秒)去改变这个默认的值