全网最细RocketMQ源码一:NameSrv

一、入口

在这里插入图片描述
NameServer的启动源码在NameStartup,现在开始debug之旅

二、createNamesrcController

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
        //PackageConflictDetect.detectFastjson();


        Options options = ServerUtil.buildCommandlineOptions(new Options());
        // 启动时的参数信息 有commandLine 管理了。
        commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
            return null;
        }


        // namesrv 配置。
        final NamesrvConfig namesrvConfig = new NamesrvConfig();
        // netty 服务器配置。
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();

        // namesrv服务器 监听端口 修改为9876
        nettyServerConfig.setListenPort(9876);


        if (commandLine.hasOption('c')) {
            // 读取 -c 选项的值
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                // 读取 config 文件数据 到 properties 内
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);

                // 如果 config 配置文件 内的配置 涉及到 namesrvConfig 或者 nettyServerConfig 的字段,那么进行复写。
                MixAll.properties2Object(properties, namesrvConfig);
                MixAll.properties2Object(properties, nettyServerConfig);
                // 将读取的 配置文件 路径 保存 到 字段。
                namesrvConfig.setConfigStorePath(file);
                System.out.printf("load config properties file OK, %s%n", file);
                in.close();
            }
        }

        if (commandLine.hasOption('p')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
            MixAll.printObjectProperties(console, namesrvConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            System.exit(0);
        }


        // 将启动时 命令行 设置的kv 复写到 namesrvConfig内。
        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

        if (null == namesrvConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }

        // 创建日志对象。
        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
        log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);


        MixAll.printObjectProperties(log, namesrvConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);

        // 创建 控制器
        // 参数1:namesrvConfig
        // 参数2:网络层配置 nettyServerConfig
        final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

        // remember all configs to prevent discard
        controller.getConfiguration().registerConfig(properties);

        return controller;
    }

主要做了几件事:

  1. 创建了NamesrvConfig
  2. 创建了nettyServerConfig
  3. 创建了NamesrvController

NamesrvController详解:

public class NamesrvController {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    private final NamesrvConfig namesrvConfig;
    private final NettyServerConfig nettyServerConfig;

    // 调度线程池,执行定时任务,两件事:1. 检查存活的broker状态  2. 打印配置
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
        "NSScheduledThread"));

    // 管理kv配置。
    private final KVConfigManager kvConfigManager;
    // 管理路由信息的对象,重要。
    private final RouteInfoManager routeInfoManager;

    // 网络层封装对象,重要。
    private RemotingServer remotingServer;

    // ChannelEventListener ,用于监听channel 状态,当channel状态 发生改变时 close idle... 会向 事件队列发起事件,事件最终由 该service处理。
    private BrokerHousekeepingService brokerHousekeepingService;

    // 业务线程池,netty 线程 主要任务是 解析报文 将 报文 解析成 RemotingCommand 对象,然后 就将 该对象 交给 业务 线程池 再继续处理。
    private ExecutorService remotingExecutor;

    private Configuration configuration;
    private FileWatchService fileWatchService;

    // 参数1:namesrvConfig
    // 参数2:网络层配置 nettyServerConfig
    public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
        this.namesrvConfig = namesrvConfig;
        this.nettyServerConfig = nettyServerConfig;

        this.kvConfigManager = new KVConfigManager(this);
        this.routeInfoManager = new RouteInfoManager();

        this.brokerHousekeepingService = new BrokerHousekeepingService(this);

        this.configuration = new Configuration(
            log,
            this.namesrvConfig, this.nettyServerConfig
        );
        this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
    }
 }

start(NamesrvController)

  public static NamesrvController start(final NamesrvController controller) throws Exception {

        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }
        // 初始化方法..
        boolean initResult = controller.initialize();

        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }


        // JVM HOOK ,平滑关闭的逻辑。 当JVM 被关闭时,主动调用 controller.shutdown() 方法,让服务器平滑关机。
        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                controller.shutdown();
                return null;
            }
        }));

        // 启动服务器。
        controller.start();


        return controller;
    }

主要做了几件事:

  1. controller初始化
   public boolean initialize() {
        // 加载本地kv配置
        this.kvConfigManager.load();
        // 创建网络服务器对象
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
        // 创建业务线程池,默认线程数 8
        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

        // 注册协议处理器(缺省协议处理器)
        this.registerProcessor();

        // 定时任务1:每10秒钟检查 broker 存活状态,将idle状态的 broker 移除。
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);


        // 定时任务2:每10分钟 打印一遍 kv 配置。
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);
    }
  1. controller启动
    在这里插入图片描述
    会调用到NettyRemotingServer.start方法
 public void start() {
        // 当向channel pipeline 添加 handler 时 指定了 group 时,网络事件传播到 当前handler时,事件处理 由 分配给 handler 的线程执行。
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
                nettyServerConfig.getServerWorkerThreads(),
                new ThreadFactory() {

                    private AtomicInteger threadIndex = new AtomicInteger(0);

                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                    }
                });

        // 创建共享的 处理器 handler
        prepareSharableHandlers();

        ServerBootstrap childHandler =
                // 配置服务端 启动对象
                // 配置工作组 boss 和 worker 组
                this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                        // 设置服务端 ServerSocketChannel类型
                        .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                        // 设置服务端ch选项
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        .option(ChannelOption.SO_REUSEADDR, true)
                        .option(ChannelOption.SO_KEEPALIVE, false)
                        // 客户端ch选项
                        .childOption(ChannelOption.TCP_NODELAY, true)
                        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                        // 设置服务器端口
                        .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                        //
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                // 初始化 客户端ch pipeline 的逻辑
                                ch.pipeline()
                                        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                                        .addLast(defaultEventExecutorGroup,
                                                encoder,
                                                new NettyDecoder(),
                                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                                connectionManageHandler,
                                                serverHandler
                                        );
                            }
                        });


        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            // 客户端开启 内存池,使用的内存池是  PooledByteBufAllocator.DEFAULT
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        try {
            // 服务器 绑定端口。
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            // 将服务器成功绑定的端口号 赋值给 字段 port。
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }

        // housekeepingService 不为空,则创建 网络异常事件 处理器
        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }

        // 提交定时任务,每一秒 执行一次。
        // 扫描 responseTable 表,将过期的 responseFuture 移除。
        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }



本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/311121.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Java中多线程二

抢占调度模型 概述&#xff1a;优先让优先级高的线程使用 CPU &#xff0c;如果线程的优先级相同&#xff0c;那么随机会选择一个&#xff0c;优先级高的线程获取的 CPU 时间片相对多一些 Thread 类中一些关于线程的方法 方法简述public final int getPriority()返回此线程的优…

五、Java中SpringBoot组件集成接入【slf4j日志文档】

五、Java中SpringBoot组件集成接入【slf4j日志文档】 1.slf4j简介2.maven依赖3.配置4.使用5.展示6.参考文章 1.slf4j简介 SLF4J&#xff08;Simple Logging Facade for Java&#xff09;是一个为Java应用程序提供统一日志接口的日志门面框架。它旨在解决Java应用程序中日志系统…

居中面试问题

前端常问居中面试问题 css文本居中 文本水平居中 <div class"father"><div class"child"><div> <div>子类元素为行内元素&#xff0c;则给父类元素定义text-align:center 如果子元素是块元素&#xff0c;则给子元素定义margin&…

Vue3快速入门

文章目录 1. Vue3简介1.1. 【性能的提升】1.2. 源码的升级】1.3. 【拥抱TypeScript】1.4. 【新的特性】 2. 创建Vue3工程2.1. 【基于 vue-cli 创建】2.2. 【基于 vite 创建】(推荐)2.3. 【一个简单的效果】 3. Vue3核心语法3.1. 【OptionsAPI 与 CompositionAPI】Options API 的…

Linux系统——测试端口连通性方法

目录 一、TCP端口连通性测试 1、ssh 2、telnet&#xff08;可能需要安装&#xff09; 3、curl 4、tcping&#xff08;需要安装&#xff09; 5、nc&#xff08;需要安装&#xff09; 6、nmap&#xff08;需要安装&#xff09; 二、UDP端口连通性测试 1、nc&#xff08;…

85.乐理基础-记号篇-速度记号

内容来源于&#xff1a;三分钟音乐社 上一个内容&#xff1a;85.乐理基础-记号篇-力度记号-CSDN博客 速度记号在下方两个里面已经写过一部分了&#xff0c;这些标记总体上是属于 不变速度 的标记&#xff0c;比如一首乐谱就记了 每分钟60拍&#xff0c;那整首速度就都是不变的…

org.springframework.web.servlet.HandlerInterceptor

过期 1 配置黑名单 2 启动注册拦截 3 浏览器访问拦截

【Spring Cloud】Sentinel流量限流和熔断降级的讲解

&#x1f389;&#x1f389;欢迎来到我的CSDN主页&#xff01;&#x1f389;&#x1f389; &#x1f3c5;我是Java方文山&#xff0c;一个在CSDN分享笔记的博主。&#x1f4da;&#x1f4da; &#x1f31f;推荐给大家我的专栏《Spring Cloud》。&#x1f3af;&#x1f3af; &am…

【SAP-PP】生产订单导入问题--完成日期向前推了一天

问题描述&#xff1a; 在执行BAPI_PRODORD_CREATE生产订单导入的时候&#xff0c;发现填写入模板中的基本完成日期是12月31日&#xff0c;但是到具体工单时变成了12月30日 截图说明&#xff1a; 感觉很神奇&#xff0c;咋一看&#xff0c;真的是日期提前了一天&#xff0c;de…

线性回归实例

1、线性回归&#xff08;linear Regression&#xff09;和逻辑回归&#xff08;logistic Regression&#xff09;的区别 线性回归主要是用来拟合数据&#xff0c;逻辑回归主要是用来区分数据&#xff0c;找到决策边界。 线性回归的代价函数常用平方误差函数&#xff0c;逻辑回…

函数指针和回调函数

文章目录 一.函数指针1.什么是函数指针2.函数指针的形式3.函数指针的用途。1.调用函数2.作为参数进行传递 二.函数指针数组三.回调函数 一.函数指针 1.什么是函数指针 函数指针是指向函数的指针。在C语言和C中&#xff0c;函数指针可以用来存储函数的地址&#xff0c;并且可以…

Kotlin程序设计(二)面向对象

Kotlin程序设计中级篇 我们在前面已经学习了Kotlin程序设计的基础篇&#xff0c;本章我们将继续介绍更多Kotlin特性&#xff0c;以及面向对象编程。 函数 其实函数我们在一开始就在使用了&#xff1a; fun main() {println("Hello World") }我们程序的入口点就是…

恒创科技:解决Windows服务器磁盘空间不足的问题

​  服务器硬盘的大小是决定空间是否充足的主要因素。但在日常使用中&#xff0c;服务器和网站备份会消耗大量存储空间&#xff0c;如果维护不当&#xff0c;最终将耗尽您的容量。同样&#xff0c;日志文件、临时文件和数据库可以在硬盘驱动器上或回收站中无休止地建立。当您…

windows10使用Shift+Win+S快捷键来截图

有时候电脑上没有打开微信QQ等软件&#xff0c;但是想使用一下截图&#xff0c;就很麻烦&#xff0c;还好windows10开始已经支持快捷键截图了&#xff1a; 打开截图工具并获取屏幕截图 - Microsoft 支持 快捷键&#xff1a;ShiftWinS 使用教程&#xff1a; 顺带说下系统这个自…

Mingw32编译opencv库

文章目录 1. 准备工作2. 编译cmake构建程序mingw32-make编译 3. 安装4. 安装完的结果 注意&#xff1a; mingw32-make编译的库和MSVC编译的库不兼容&#xff0c;MSVC和mingw-make生成的动态库使用的是不同的ABI&#xff08;Application Binary Interface&#xff09;&#xff0…

货拉拉智能监控实践:如何解决多云架构下的故障应急问题?

一分钟精华速览 在月活超千万的大规模业务背景下&#xff0c;货拉拉遭遇了多云环境下的监控碎片化、规划无序等问题。为了应对这些挑战&#xff0c;货拉拉开发了一站式监控平台——Monitor。该平台的部署有效地实现了对核心应用的监控和报警全覆盖&#xff0c;显著提高了应急响…

【算法Hot100系列】组合总和

💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学习,不断总结,共同进步,活到老学到老导航 檀越剑指大厂系列:全面总结 jav…

【算法】七夕祭

题目 七夕节因牛郎织女的传说而被扣上了「情人节」的帽子。 于是 TYVJ 今年举办了一次线下七夕祭。 Vani 同学今年成功邀请到了 cl 同学陪他来共度七夕&#xff0c;于是他们决定去 TYVJ 七夕祭游玩。 TYVJ 七夕祭和 11 区的夏祭的形式很像。 矩形的祭典会场由 N 排 M 列共…

作业--day45

定时播放 #include "mywidget.h" #include "ui_mywidget.h"MyWidget::MyWidget(QWidget *parent) :QWidget(parent),ui(new Ui::MyWidget) {ui->setupUi(this);ui->bg_lab->setPixmap(QPixmap(":/pictrue/shanChuan.jpg"));ui->bg_…

装饰器模式

装饰器模式 什么是装饰器模式: 是一种结构型设计模型&#xff0c;允许在不改变对象结构的情况下&#xff0c;动态的将新的功能添加到对象上&#xff0c; 核心思想&#xff1a; 通过组合而非继承的方式来实现功能的扩展。他可以在运行时动态的添加活移除对象的功能&#xff…