前言:
👏作者简介:我是笑霸final,一名热爱技术的在校学生。
📝个人主页: 笑霸final的主页2
📕系列专栏:java专栏
📧如果文章知识点有错误的地方,请指正!和大家一起学习,一起进步👀
🔥如果感觉博主的文章还不错的话,👍点赞👍 + 👀关注👀 + 🤏收藏🤏
上一章节:启动你的RocketMQ之旅-基本认知
目录
- 一、Namesrv启动流程
- 启动类 main0
- start(controller)方法
- contoller.initialize()方法
- contoller.start()
- 二、broker 启动流程
- 启动类
- controller.initialize()创建并初始化Broker
- controller.start();
一、Namesrv启动流程
Namesrv启动流程如下图
namesrv启动流程:先是根据命令行参数来创建和初始化 NnamesrvContoller;然后进入Star方法,此方法主要调用了controller.initialize()和controller.start()分别是用来进行初始化和启动NnamesrvContoller,此外还添加了一个jvm关闭钩子的线程,用来释放资源。在controller.initialize()方法中创建了Netty服务器、用于处理网路请求的线程池。还有开启了2个定时任务,一个是每10s执行一次,来检查未激活的broker节点。另一个是10min执行一次,用来打印KV配置。
启动类 main0
在main0中有两个主要的方法
- 一个是 createNamesrvController(args) 根据命令行参数创建并初始化RocketMQ Name Server控制器
- 另一个是 start(NamesrvController) 负责用于执行具体的启动逻辑,如加载配置、初始化Netty服务器、启动定时任务等
start(controller)方法
该方法的主要作用是启动Name Server控制器,并在启动过程中进行必要的初始化。若初始化失败,将执行清理工作并结束进程。同时,为了确保在Java虚拟机(JVM)关闭时能够正确地关闭并释放资源,添加了一个关闭钩子线程。最后,在所有操作完成后返回已启动的NamesrvController实例。下面是关键代码
public static NamesrvController start(final NamesrvController controller) throws Exception {
// ...非关键代码 ...
/*
* 调用NamesrvController的initialize()方法进行初始化操作
*/
boolean initResult = controller.initialize();
// ...非关键代码...
/**
* 添加JVM关闭钩子线程,在JVM关闭前执行controller的shutdown()方法以确保资源正常释放
*/
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
controller.start();// 启动NamesrvControlle
return controller; // 初始化和启动成功后,返回已启动的NamesrvController实例
}
代码中的JVM关闭钩子线程是向Java运行时环境(JVM)注册一个关闭钩子(Shutdown Hook)。Runtime.getRuntime().addShutdownHook() 方法本身不会阻塞当前线程,它只是将指定的 Thread 对象(在这里是一个实现了 Runnable 接口的 ShutdownHookThread 实例)添加到 JVM 的关闭序列中。
● 关闭NettyRemotingServer实例
● 关闭线程池remotingExecutor,这是一个用于执行与网络通信相关的异步任务的线程池
● 关闭定时任务调度器scheduledExecutorService
● 果存在文件监控服务fileWatchService,则关闭该服务
contoller.initialize()方法
主要功能:
- 加载KV配置管理器的配置。
- 创建并初始化网络通信服务器(NettyRemotingServer)。
- 创建用于处理网络请求的线程池。
- 注册消息处理器。
- 定时任务:每10秒执行一次,检查并扫描未激活的Broker节点(超过120s 就则认为Broker失效,移除该Broker)
- 定时任务:每10分钟执行一次,周期性打印所有KV配置信息
- 如果启用了TLS加密,创建文件监听服务以监视证书文件的变化,并在证书或密钥文件更改时动态更新服务器的SSL上下文。
下面是关键代码
public boolean initialize() {
//。。。加载KV配置管理器的配置。
/**
* 创建一个新的NettyRemotingServer实例,用于处理网络通信,
* 传入nettyServerConfig和brokerHousekeepingService参数
*/
this.remotingServer
= new NettyRemotingServer(
this.nettyServerConfig,
this.brokerHousekeepingService
);
//。。。。创建固定大小的线程池,用于执行网络通信任务
this.registerProcessor();// 注册处理器
/**
* 定时任务:每10秒执行一次,检查并扫描未激活的Broker节点
*/
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// 。。。。定时任务:每10分钟执行一次,周期性打印所有KV配置信息。。。
//。。。如果启用了TLS加密,创建文件监听服务以监视证书文件的变化,
//并在证书或密钥文件更改时动态更新服务器的SSL上下文。。。
return true;
}
contoller.start()
主要完成了基于Netty框架的RocketMQ服务端启动流程,包括设置线程模型、网络参数、处理器链路以及定时任务等关键步骤。
1.初始化了一个默认的事件执行器组,负责处理网络IO任务。
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {....});
2、通过ServerBootstrap进行Netty服务端的核心配置,根据操作系统选择Epoll或Nio通道类型、设置Socket选项、指定监听端口以及构建子Channel的处理器Pipeline。
ServerBootstrap childHandler = this.serverBootstrap.group(...)
// 根据操作系统选择Epoll或Nio通道类型
.channel(useEpoll() ? EpollServerSocketChannel.class
: NioServerSocketChannel.class)
//允许最多有1024个未完成连接请求排队等待处理
.option(ChannelOption.SO_BACKLOG, 1024)
.option(...) // 设置Socket参数,如监听队列大小、地址重用等
.childOption(...) // 设置子Channel选项,如TCP_NODELAY、发送/接收缓冲区大小等
.localAddress(
new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(...); // 初始化子Channel处理器链路
3.使用Java Timer类安排了一个定期任务,每隔固定时间(此处为每秒)调用scanResponseTable()方法来扫描响应表(所有正在进行的请求。)
● 会将那行超时的请求释放出来。
his.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
二、broker 启动流程
启动流程如下图
broker启动流程:在启动方法中调用了creatbrokerContoller()来创建brokerContoller,其中调用了controller.creatBrokerController加载broker相关信息后,创建一序列的创建线程池和定时任务。然后就是执行contoller.start()来启动broker相关服务,同时还创建了一个定时任务来向namesrv发送心跳包,使用了CountDownLatch来并发向所有namesrv发送心跳。心跳包的格式封装了请求头和请求体,请求头包括broker地址、brokerId、broker名字、集群名称、高可用(High Availability, HA)服务器地址、压缩标志、CRC32校验码;请求体包括 主题配置信息、滤服务器列表
启动类
creatBrokerContoller()方法
● 首先初始化网络通信相关的参数、解析命令行输入、加载配置文件、验证并设置必要的Broker相关配置;初始化Netty网络参数时 ,如果系统属性未设置Socket发送和接收缓冲区大小,则使用默认值131072。并且初始化了客户端和服务端。
● 然后创建并初始化BrokerController实例(controller.initialize()😉。同时,该方法还会注册一个JVM关闭钩子,确保在JVM关闭时能正确地关闭BrokerController释放资源
controller.initialize()创建并初始化Broker
● 先加载Broker中的topic配置、消费者偏移量管理器、订阅组管理器和消费者过滤器管理器的相关数据。然后会进入第一个if【主要用于初始化MessageStore实例及其相关组件】
● 接着 会加载存储在磁盘上的消息数据 然后进入第二个if语句
可以看出来 只要有一个加载失败,broker就会初始化失败。
第一个if
● 创建了 DefaultMessageStore对象
● 判断是否集群模式(DLedger模式)
● 创建brokerStats 用于统计和记录Broker的各种运行状态信息。
● 加载插件。
第二个if
- 有一序列的创建线程池。根据Broker配置创建了多个线程池,分别用于处理发送消息、拉取消息、回复消息查询消息、管理Broker操作、管理客户端连接、心跳检测以及事务处理等各种任务。
- 接着就是创建一系列定时任务。将在指定的时间间隔内周期性地执行一些重要功能,如记录Broker统计信息、持久化消费者偏移量和过滤器信息、定期检查保护Broker状态以及打印水印信息等。
- 根据Broker配置更新NameServer地址列表,并针对DLedgerCommitLog的不同启用状态执行相应的
- 最后是关于stl的一些配置
上述流程成功后,就最终返回true 回到createBrokerController() 方法,然后执行controller.start();
controller.start();
1、启动broker的相关服务
2、向NameServer发送心跳包(定时发送)
3.1.registerBrokerAll发送心跳包
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
// ******其他代码
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills())) {
//发送心跳包
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
}
doRegisterBrokerAll
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
TopicConfigSerializeWrapper topicConfigWrapper) {
//注册
List<RegisterBrokerResult> registerBrokerResultList
= this.brokerOuterAPI.registerBrokerAll(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.getHAServerAddr(),
topicConfigWrapper,
this.filterServerManager.buildNewFilterServerList(),
oneway,
this.brokerConfig.getRegisterBrokerTimeoutMills(),
this.brokerConfig.isCompressedRegister());
//更新
if (registerBrokerResultList.size() > 0) {
RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
if (registerBrokerResult != null) {
if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
}
this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
if (checkOrderConfig) {
this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
}
}
}
}
可见心跳包 由this.brokerOuterAPI.registerBrokerAll(…)发送
进入此方法发现
心跳包的
- 请求头包含:broker地址、brokerId、broker名字、集群名称、高可用(High Availability, HA)服务器地址、压缩标志、CRC32校验码。
- 请求体包含:主题配置信息、滤服务器列表
高可用(High Availability, HA)服务器地址
● 如果Broker支持主从切换或集群模式,此地址可能是备用Broker或其他参与HA机制的节点地
滤服务器列表
● 是 Broker 关联的过滤服务节点列表,用于过滤或处理特定类型的消息。如Tag过滤或SQL92表达式过滤。这部分信息是Broker在向NameServer注册时,告知NameServer哪些服务器可以提供消息过滤功能。
然后 继续往下看代码
发令枪CountDownLatch :主要作用是在多线程环境中实现一种一次性屏障(barrier),允许一个或多个线程等待其他线程完成一组操作后再继续执行。
● 功能是:并发地向多个Name Server注册Broker,并且只有当所有注册任务都完成时,主线程或其他关心注册结果的线程才能继续执行。
● registerBroker()是具体的注册方法