启动你的RocketMQ之旅(二)-broket和namesrv启动流程

前言
👏作者简介:我是笑霸final,一名热爱技术的在校学生。
📝个人主页: 笑霸final的主页2
📕系列专栏:java专栏
📧如果文章知识点有错误的地方,请指正!和大家一起学习,一起进步👀
🔥如果感觉博主的文章还不错的话,👍点赞👍 + 👀关注👀 + 🤏收藏🤏

上一章节:启动你的RocketMQ之旅-基本认知

目录

  • 一、Namesrv启动流程
    • 启动类 main0
    • start(controller)方法
    • contoller.initialize()方法
    • contoller.start()
  • 二、broker 启动流程
    • 启动类
    • controller.initialize()创建并初始化Broker
    • controller.start();

一、Namesrv启动流程

Namesrv启动流程如下图

在这里插入图片描述
namesrv启动流程:先是根据命令行参数来创建和初始化 NnamesrvContoller;然后进入Star方法,此方法主要调用了controller.initialize()和controller.start()分别是用来进行初始化和启动NnamesrvContoller,此外还添加了一个jvm关闭钩子的线程,用来释放资源。在controller.initialize()方法中创建了Netty服务器、用于处理网路请求的线程池。还有开启了2个定时任务,一个是每10s执行一次,来检查未激活的broker节点。另一个是10min执行一次,用来打印KV配置。

启动类 main0

在这里插入图片描述
在main0中有两个主要的方法

  • 一个是 createNamesrvController(args) 根据命令行参数创建并初始化RocketMQ Name Server控制器
  • 另一个是 start(NamesrvController) 负责用于执行具体的启动逻辑,如加载配置、初始化Netty服务器、启动定时任务等

start(controller)方法

该方法的主要作用是启动Name Server控制器,并在启动过程中进行必要的初始化。若初始化失败,将执行清理工作并结束进程。同时,为了确保在Java虚拟机(JVM)关闭时能够正确地关闭并释放资源,添加了一个关闭钩子线程。最后,在所有操作完成后返回已启动的NamesrvController实例。下面是关键代码

public static NamesrvController start(final NamesrvController controller) throws Exception {

        // ...非关键代码 ...
    /*
    * 调用NamesrvController的initialize()方法进行初始化操作
    */
        boolean initResult = controller.initialize();
        // ...非关键代码...

    /**
    * 添加JVM关闭钩子线程,在JVM关闭前执行controller的shutdown()方法以确保资源正常释放
    */
        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                controller.shutdown();
                return null;
            }
        }));
        controller.start();// 启动NamesrvControlle
        return controller; // 初始化和启动成功后,返回已启动的NamesrvController实例
    }

代码中的JVM关闭钩子线程是向Java运行时环境(JVM)注册一个关闭钩子(Shutdown Hook)。Runtime.getRuntime().addShutdownHook() 方法本身不会阻塞当前线程,它只是将指定的 Thread 对象(在这里是一个实现了 Runnable 接口的 ShutdownHookThread 实例)添加到 JVM 的关闭序列中。
在这里插入图片描述
● 关闭NettyRemotingServer实例
● 关闭线程池remotingExecutor,这是一个用于执行与网络通信相关的异步任务的线程池
● 关闭定时任务调度器scheduledExecutorService
● 果存在文件监控服务fileWatchService,则关闭该服务

contoller.initialize()方法

主要功能:

  1. 加载KV配置管理器的配置。
  2. 创建并初始化网络通信服务器(NettyRemotingServer)。
  3. 创建用于处理网络请求的线程池。
  4. 注册消息处理器。
  5. 定时任务:每10秒执行一次,检查并扫描未激活的Broker节点(超过120s 就则认为Broker失效,移除该Broker)
  6. 定时任务:每10分钟执行一次,周期性打印所有KV配置信息
  7. 如果启用了TLS加密,创建文件监听服务以监视证书文件的变化,并在证书或密钥文件更改时动态更新服务器的SSL上下文。
    下面是关键代码
public boolean initialize() {

        //。。。加载KV配置管理器的配置。
        /**
         *  创建一个新的NettyRemotingServer实例,用于处理网络通信,
         *  传入nettyServerConfig和brokerHousekeepingService参数
         */
        this.remotingServer 
        = new NettyRemotingServer(
            this.nettyServerConfig, 
            this.brokerHousekeepingService
        );
        //。。。。创建固定大小的线程池,用于执行网络通信任务
        this.registerProcessor();// 注册处理器

         /**
         *  定时任务:每10秒执行一次,检查并扫描未激活的Broker节点
         */
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);

        // 。。。。定时任务:每10分钟执行一次,周期性打印所有KV配置信息。。。

        //。。。如果启用了TLS加密,创建文件监听服务以监视证书文件的变化,
        //并在证书或密钥文件更改时动态更新服务器的SSL上下文。。。

        return true;
    }

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

contoller.start()

主要完成了基于Netty框架的RocketMQ服务端启动流程,包括设置线程模型、网络参数、处理器链路以及定时任务等关键步骤。

1.初始化了一个默认的事件执行器组,负责处理网络IO任务。

 this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {....});

2、通过ServerBootstrap进行Netty服务端的核心配置,根据操作系统选择Epoll或Nio通道类型、设置Socket选项、指定监听端口以及构建子Channel的处理器Pipeline。

ServerBootstrap childHandler = this.serverBootstrap.group(...)
// 根据操作系统选择Epoll或Nio通道类型
    .channel(useEpoll() ? EpollServerSocketChannel.class 
             : NioServerSocketChannel.class) 
//允许最多有1024个未完成连接请求排队等待处理
    .option(ChannelOption.SO_BACKLOG, 1024)
    .option(...) // 设置Socket参数,如监听队列大小、地址重用等
    .childOption(...) // 设置子Channel选项,如TCP_NODELAY、发送/接收缓冲区大小等
    .localAddress(
        new InetSocketAddress(this.nettyServerConfig.getListenPort()))
    .childHandler(...); // 初始化子Channel处理器链路

3.使用Java Timer类安排了一个定期任务,每隔固定时间(此处为每秒)调用scanResponseTable()方法来扫描响应表(所有正在进行的请求。)
● 会将那行超时的请求释放出来。

his.timer.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
        try {
            NettyRemotingServer.this.scanResponseTable();
        } catch (Throwable e) {
            log.error("scanResponseTable exception", e);
        }
    }
}, 1000 * 3, 1000);

二、broker 启动流程

启动流程如下图

broker启动流程:在启动方法中调用了creatbrokerContoller()来创建brokerContoller,其中调用了controller.creatBrokerController加载broker相关信息后,创建一序列的创建线程池和定时任务。然后就是执行contoller.start()来启动broker相关服务,同时还创建了一个定时任务来向namesrv发送心跳包,使用了CountDownLatch来并发向所有namesrv发送心跳。心跳包的格式封装了请求头和请求体,请求头包括broker地址、brokerId、broker名字、集群名称、高可用(High Availability, HA)服务器地址、压缩标志、CRC32校验码;请求体包括 主题配置信息、滤服务器列表在这里插入图片描述

启动类

在这里插入图片描述
creatBrokerContoller()方法
● 首先初始化网络通信相关的参数、解析命令行输入、加载配置文件、验证并设置必要的Broker相关配置;初始化Netty网络参数时 ,如果系统属性未设置Socket发送和接收缓冲区大小,则使用默认值131072。并且初始化了客户端和服务端。
● 然后创建并初始化BrokerController实例(controller.initialize()😉。同时,该方法还会注册一个JVM关闭钩子,确保在JVM关闭时能正确地关闭BrokerController释放资源

controller.initialize()创建并初始化Broker

在这里插入图片描述
● 先加载Broker中的topic配置、消费者偏移量管理器、订阅组管理器和消费者过滤器管理器的相关数据。然后会进入第一个if【主要用于初始化MessageStore实例及其相关组件】
● 接着 会加载存储在磁盘上的消息数据 然后进入第二个if语句
可以看出来 只要有一个加载失败,broker就会初始化失败。
第一个if
● 创建了 DefaultMessageStore对象
● 判断是否集群模式(DLedger模式)
● 创建brokerStats 用于统计和记录Broker的各种运行状态信息。
● 加载插件。
第二个if

  • 有一序列的创建线程池。根据Broker配置创建了多个线程池,分别用于处理发送消息、拉取消息、回复消息查询消息、管理Broker操作、管理客户端连接、心跳检测以及事务处理等各种任务。在这里插入图片描述
  • 接着就是创建一系列定时任务。将在指定的时间间隔内周期性地执行一些重要功能,如记录Broker统计信息、持久化消费者偏移量和过滤器信息、定期检查保护Broker状态以及打印水印信息等。在这里插入图片描述
  • 根据Broker配置更新NameServer地址列表,并针对DLedgerCommitLog的不同启用状态执行相应的在这里插入图片描述
  • 最后是关于stl的一些配置在这里插入图片描述
    上述流程成功后,就最终返回true 回到createBrokerController() 方法,然后执行controller.start();

controller.start();

1、启动broker的相关服务
2、向NameServer发送心跳包(定时发送)在这里插入图片描述
3.1.registerBrokerAll发送心跳包

public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
      
        // ******其他代码
        if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.brokerConfig.getRegisterBrokerTimeoutMills())) {
            //发送心跳包
            doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
        }
    }

doRegisterBrokerAll

private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
        TopicConfigSerializeWrapper topicConfigWrapper) {
        //注册
        List<RegisterBrokerResult> registerBrokerResultList 
    = this.brokerOuterAPI.registerBrokerAll(
            this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.getHAServerAddr(),
            topicConfigWrapper,
            this.filterServerManager.buildNewFilterServerList(),
            oneway,
            this.brokerConfig.getRegisterBrokerTimeoutMills(),
            this.brokerConfig.isCompressedRegister());
        //更新
        if (registerBrokerResultList.size() > 0) {
            RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
            if (registerBrokerResult != null) {
                if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
                    this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
                }

                this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());

                if (checkOrderConfig) {
                    this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
                }
            }
        }
    }

可见心跳包 由this.brokerOuterAPI.registerBrokerAll(…)发送
进入此方法发现在这里插入图片描述在这里插入图片描述
心跳包的

  • 请求头包含:broker地址、brokerId、broker名字、集群名称、高可用(High Availability, HA)服务器地址、压缩标志、CRC32校验码。
  • 请求体包含:主题配置信息、滤服务器列表

高可用(High Availability, HA)服务器地址
● 如果Broker支持主从切换或集群模式,此地址可能是备用Broker或其他参与HA机制的节点地
滤服务器列表
● 是 Broker 关联的过滤服务节点列表,用于过滤或处理特定类型的消息。如Tag过滤或SQL92表达式过滤。这部分信息是Broker在向NameServer注册时,告知NameServer哪些服务器可以提供消息过滤功能。

然后 继续往下看代码在这里插入图片描述
发令枪CountDownLatch :主要作用是在多线程环境中实现一种一次性屏障(barrier),允许一个或多个线程等待其他线程完成一组操作后再继续执行。
● 功能是:并发地向多个Name Server注册Broker,并且只有当所有注册任务都完成时,主线程或其他关心注册结果的线程才能继续执行。
● registerBroker()是具体的注册方法

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/934132.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

vue3-canvas实现在图片上框选标记(放大,缩小,移动,删除)

双图版本&#xff08;模板对比&#xff09; 业务描述&#xff1a;模板与图片对比&#xff0c;只操作模板框选的位置进行色差对比&#xff0c;传框选坐标位置给后端&#xff0c;返回对比结果显示 draw.js文件&#xff1a; 新增了 createUuid&#xff0c;和求取两个数组差集的方…

python编程Day13-异常介绍捕获异常抛出异常

异常 介绍 1, 程序在运行时, 如果Python解释器遇到到一个错误, 则会停 止程序的执行, 并且提示一些错误信息, 这就是异常. 2, 程序停止执行并且提示错误信息这个动作, 通常称之为: 抛出 (raise) 异常 # f open(aaaa.txt) # FileNotFoundError: [Errno 2] No such file or dire…

计网(王道的总结)-数据链路层-网络层-传输层

由于时间有限&#xff0c;把每个王道的章节最后一节放在一起&#xff0c;分别看看复习知识点。 3.6.4 IEEE 802.11 无线局域网 重点&#xff1a; 3.7 广域网 真题考频&#xff1a;极低 3.8以太网交换机 4.1网络层的功能 4.2.1IPv4分组 最重要的&#xff1a; TTL&#xff1a;…

【优选算法篇】:揭开二分查找算法的神秘面纱--数据海洋中的精准定位器

✨感谢您阅读本篇文章&#xff0c;文章内容是个人学习笔记的整理&#xff0c;如果哪里有误的话还请您指正噢✨ ✨ 个人主页&#xff1a;余辉zmh–CSDN博客 ✨ 文章所属专栏&#xff1a;c篇–CSDN博客 文章目录 一.二分查找算法二.算法模板模板一模板二模板三 三.例题演练1.x的平…

PlantUML——类图

背景 类图是UML模型中的静态视图&#xff0c;其主要作用包括&#xff1a; 描述系统的结构化设计&#xff0c;显示出类、接口以及它们之间的静态结构和关系。简化对系统的理解&#xff0c;是系统分析与设计阶段的重要产物&#xff0c;也是系统编码和测试的重要模型依据。 在U…

来也RPA程序异常处理

1、程序异常模块怎么弄&#xff1a;连接第一个流程块后&#xff0c;连接第二个流程块就是虚线异常块。这是编辑器固定的功能。 2、异常模块做什么&#xff1f;系统会自动把异常文本&#xff0c;通输入参数 $BlockInput 传入异常流程块。 然后&#xff0c;这个异常文本&#xf…

电子应用设计方案-43:智能手机充电器系统方案设计

智能手机充电器系统方案设计 一、引言 随着智能手机的广泛应用&#xff0c;对充电器的性能、效率和安全性提出了更高的要求。本方案旨在设计一款高效、安全、兼容多种快充协议的智能手机充电器。 二、系统概述 1. 系统目标 - 提供快速、稳定、安全的充电功能。 - 兼容主流的智…

Java Agent(一)、 初步认识Instrumentation

目录 1、什么是Instrumentation&#xff1f; 2、底层机制 2.1、工作流程 2.2、Instrumentation API 3、加载Java Agent 3.1、静态Agent示例 3.1.1、定义一个agent 3.1.2、配置 MANIFEST.MF 3.1.3、定义main测试类 3.1.4、启动参数添加-javaagent 3.2、动态Agent示例…

关于SpringBoot项目创建后构建总是失败的问题

第一个问题&#xff1a;IDEA创建项目总是失败 原因&#xff1a;创建项目的时候默认使用的是https://start.spring.io&#xff0c;这个是一个外国网站&#xff0c;众所周知的就是国内访问总是出现不稳定的现象&#xff0c;这就是导致项目创建失败的最终原因。 解决方法&#x…

Java-自动拆箱/装箱/缓存/效率

为什么基本类型需要包装类&#xff1f; 泛型与集合支持问题&#xff1a;基本数据类型在使用上虽然方便、简单且高效&#xff0c;但像泛型以及集合元素的存储等场景并不支持基本数据类型&#xff0c;而包装类可以解决这个问题&#xff0c;使其能更好地融入到一些需要对象类型的…

计算机毕业设计Python中华古诗词知识图谱可视化 古诗词智能问答系统 古诗词数据分析 古诗词情感分析模型 自然语言处理NLP 机器学习 深度学习

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…

微服务网关SpringCloudGateway、Kong比较

网关产品 1. Spring Cloud Gateway 基本信息 Spring Cloud Gateway是Spring Cloud生态系统中的一个组件&#xff0c;基于Spring 5、Project Reactor和Spring Boot 2构建。它旨在为微服务架构提供一种简单而有效的API网关解决方案。 功能特点 路由功能强大&#xff1a;使用Rou…

实现基于分布式的LAMP架构+NFS实时同步到备份服务器

概述 项目计划用WordPress搭建一个博客系统, 为了性能更好,两个服务器都对外提供WordPress博客系统服务, 数据放在MySQL服务器, 有些上传的图片发送到NFS服务器上&#xff0c;并且把NFS数据实时同步到一个备份服务器上。 服务名称IP地址DNS10.0.0.200WEB110.0.0.201W…

【NVIDIA orin nx 安装ultralytics yolov11】

注意:不同用户安装的python可能会在不同的路径,因此不同的pip管理会导致安装的 torch和torchvision会在不同的路径下 记得区分用户来运行yolo 一、确认系统 JetPack 版本 此处使用5.1.1 1、查看JetPack 版本 jtop二、安装 ultralytics、pytorch、torchvision、onnxruntime…

Linux系统挂载exfat格式U盘教程,触觉智能RK3562开发板演示

本文介绍Linux系统&#xff08;Ubuntu/Debian通用&#xff09;挂载exfat格式U盘的方法&#xff0c;触觉智能RK3562开发板演示&#xff0c;搭载4核A53处理器&#xff0c;主频高达2.0GHz&#xff1b;内置独立1Tops算力NPU&#xff0c;可应用于物联网网关、平板电脑、智能家居、教…

【Vulkan入门】08-CreateRenderPass

目录 先叨叨git信息关键代码TestPipeLine::CreateRenderPass() 先叨叨 上篇已经为Pipeline编写好了程序&#xff08;Shader&#xff09;。接下来要为Pipeline创建RenderPass。 关于RenderPass&#xff0c;在【Vulkan入门】06-Pipeline介绍中已经作了简单的介绍。这里再详细说一…

从 HTTP 到 HTTPS 再到 HSTS

近些年&#xff0c;随着域名劫持、信息泄漏等网络安全事件的频繁发生&#xff0c;网站安全也变得越来越重要&#xff0c;也促成了网络传输协议从 HTTP 到 HTTPS 再到 HSTS 的转变。 HTTP HTTP&#xff08;超文本传输协议&#xff09; 是一种用于分布式、协作式和超媒体信息系…

01-Chromedriver下载与配置(mac)

下载地址&#xff1a; 这里我用的最后一个&#xff0c;根据自己chrome浏览器选择相应的版本号即可 ChromeDriver官网下载地址&#xff1a;https://sites.google.com/chromium.org/driver/downloads ChromeDriver官网最新版下载地址&#xff1a;https://googlechromelabs.git…

面试技术点之安卓篇

一、基础 二、高级 三、组件 Android中SurfaceView和TextureView有什么区别&#xff1f; 参考 Android中SurfaceView和TextureView有什么区别&#xff1f; 四、三方框架 五、系统源码 六、性能优化

架构13-持久化存储

零、文章目录 架构13-持久化存储 1、Kubernetes 存储设计 &#xff08;1&#xff09;存储设计考量 **设计哲学&#xff1a;**Kubernetes 遵循用户通过资源和声明式 API 描述意图&#xff0c;Kubernetes 根据意图完成具体操作。**复杂性&#xff1a;**描述用户的存储意图本身…