NameServer源码解析

1 模块入口代码的功能

        本节介绍入口代码的功能,阅读源码的时候,很多人喜欢根据执行逻辑,先从入口代码看起。NameServer部分入口代码主要完成命令行参数解析,初始化Controller的功能。

1.1 入口函数

首先看一下NameServer的源码目录(见图10-1)。

NamesrvStartup是模块的启动入口,NamesrvController是用来协块各个调模功能的代码。

我们从启动代码开始分析,找到NamesrvStartup.java里的main函数public static void main(String[]args){main0(args);},发现它又把逻辑转到main0这个函数里。

图10-1 NameServer源码目录

1.2 解析命令行参数

main0函数主要完成两个功能,第一个功能是解析命令行参数,我们通过源码来看一看,重点是解析-c和-p参数,如代码清单10-1所示。

代码清单10-1 解析NameServer命令行参数

Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args,
    buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
    System.exit(-1);
    return null;
}
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
    String file = commandLine.getOptionValue('c');
    if (file != null) {
        InputStream in = new BufferedInputStream(new
            FileInputStream(file));
        properties = new Properties();
        properties.load(in);
        MixAll.properties2Object(properties, namesrvConfig);
        MixAll.properties2Object(properties, nettyServerConfig);
        namesrvConfig.setConfigStorePath(file);
        System.out.printf("load config properties file OK, " +
            file + "%n");
        in.close();
    }
}
if (commandLine.hasOption('p')) {
    MixAll.printObjectProperties(null, namesrvConfig);
    MixAll.printObjectProperties(null, nettyServerConfig);
    System.exit(0);
}

 

-c命令行参数用来指定配置文件的位置;-p命令行参数用来打印所有配置项的值。注意,用-p参数打印配置项的值之后程序就退出了,这是一个帮助调试的选项。

1.3 初始化NameServer的Controller

main0函数的另外一个功能是初始化Controller,如代码清单10-2所示。

代码清单10-2 初始化并启动Controller

// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
boolean initResult = controller.initialize();
if (!initResult) {
    controller.shutdown();
    System.exit(-3);
}
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log,
    new Callable<Void>() {
    @Override
    public Void call() throws Exception {
        controller.shutdown();
        return null;
    }
}));
controller.start();

 

根据解析出的配置参数,调用controller.initialize()来初始化,然后调用controller.start()让NameServer开始服务。

还有一个逻辑是注册ShutdownHookThread,当程序退出的时候会调用controller.shutdown来做退出前的清理工作。

2 NameServer的总控逻辑

NameServer的总控逻辑在NamesrvController.java代码中。NameServer是集群的协调者,它只是简单地接收其他角色报上来的状态,然后根据请求返回相应的状态。首先,NameserverController把执行线程池初始化好,如代码清单10-3所示。

代码清单10-3 线程池初始化

this.remotingExecutor =
    Executors.newFixedThreadPool(nettyServerConfig
        .getServerWorkerThreads(), new ThreadFactoryImpl
        ("RemotingExecutorThread_"));
this.registerProcessor();

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
       NamesrvController.this.routeInfoManager.scanNotActiveBroker();
    }
}, 5, 10, TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
       NamesrvController.this.kvConfigManager.printAllPeriodically();
    }
}, 1, 10, TimeUnit.MINUTES);

 

启动了一个默认是8个线程的线程池(private int serverWorkerThreads=8),还有两个定时执行的线程,一个用来扫描失效的Broker(scanNotActiveBroker),另一个用来打印配置信息(printAllPeriodically)。

然后启动负责通信的服务remotingServer,remotingServer监听一些端口,收到Broker、Client等发过来的请求后,根据请求的命令,调用不同的Processor来处理。这些不同的处理逻辑被放到上面初始化的线程池中执行,如代码清单10-4所示。

代码清单10-4 启动通信服务,关联初始化的线程池

this.remotingServer = new NettyRemotingServer(this.nettyServerConfig,
    this.brokerHousekeepingService);
……
if (namesrvConfig.isClusterTest()) {
    this.remotingServer.registerDefaultProcessor(new
            ClusterTestRequestProcessor(this, namesrvConfig
            .getProductEnvName()),
        this.remotingExecutor);
} else {
    this.remotingServer.registerDefaultProcessor(new
        DefaultRequestProcessor(this), this.remotingExecutor);
}

 

remotingServer是基于Netty封装的一个网络通信服务,要了解remoting-Server需要先对Netty有个基本的认知,后面会单独介绍。

3 核心业务逻辑处理

NameServer的核心业务逻辑,在DefaultRequestProcessor.java中可以一目了然地看出。网络通信服务模块收到请求后,就调用这个Processor来处理,如代码清单10-5所示。

代码清单10-5 根据请求码调用相应的处理逻辑

switch (request.getCode()) {
    case RequestCode.PUT_KV_CONFIG:
        return this.putKVConfig(ctx, request);
    case RequestCode.GET_KV_CONFIG:
        return this.getKVConfig(ctx, request);
    case RequestCode.DELETE_KV_CONFIG:
        return this.deleteKVConfig(ctx, request);
    case RequestCode.REGISTER_BROKER:
        Version brokerVersion = MQVersion.value2Version(request
            .getVersion());
        if (brokerVersion.ordinal() >= MQVersion.Version
            .V3_0_11.ordinal()) {
            return this.registerBrokerWithFilterServer(ctx, request);
        } else {
            return this.registerBroker(ctx, request);
        }
    case RequestCode.UNREGISTER_BROKER:
        return this.unregisterBroker(ctx, request);
    case RequestCode.GET_ROUTEINTO_BY_TOPIC:
        return this.getRouteInfoByTopic(ctx, request);
    case RequestCode.GET_BROKER_CLUSTER_INFO:
        return this.getBrokerClusterInfo(ctx, request);
    case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
        return this.wipeWritePermOfBroker(ctx, request);
    case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
        return getAllTopicListFromNameserver(ctx, request);
    case RequestCode.DELETE_TOPIC_IN_NAMESRV:
        return deleteTopicInNamesrv(ctx, request);
    case RequestCode.GET_KVLIST_BY_NAMESPACE:
        return this.getKVListByNamespace(ctx, request);
    case RequestCode.GET_TOPICS_BY_CLUSTER:
        return this.getTopicsByCluster(ctx, request);
    case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
        return this.getSystemTopicListFromNs(ctx, request);
    case RequestCode.GET_UNIT_TOPIC_LIST:
        return this.getUnitTopicList(ctx, request);
    case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
        return this.getHasUnitSubTopicList(ctx, request);
    case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
        return this.getHasUnitSubUnUnitTopicList(ctx, request);
    case RequestCode.UPDATE_NAMESRV_CONFIG:
        return this.updateConfig(ctx, request);
    case RequestCode.GET_NAMESRV_CONFIG:
        return this.getConfig(ctx, request);
    default:
        break;
}

 

逻辑主体是个switch语句,根据RequestCode调用不同的函数来处理,从RequestCode可以了解到NameServer的主要功能,比如:REGISTER_BROKER是在集群中新加入一个Broker机器;GET_ROUTEINTO_BY_TOPIC是请求获取一个Topic的路由信息;WIPE_WRITE_PERM_OF_BROKER是删除一个Broker的写权限。

4 集群状态存储

NameServer作为集群的协调者,需要保存和维护集群的各种元数据,这是通过RouteInfoManager类来实现的,如代码清单10-6所示。

代码清单10-6 RouteInfoManager的存储结构

private final HashMap<String/* topic */, List<QueueData>> topicQueue-Table;
private final HashMap<String/* brokerName */, BrokerData> brokerAddr-Table;
private final HashMap<String/* clusterName */, Set<String/* brokerName
*/>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo>
    brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter
Server */> filterServerTable;
public RouteInfoManager() {
    this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
    this.brokerAddrTable = new HashMap<String, BrokerData>(128);
    this.clusterAddrTable = new HashMap<String, Set<String>>(32);
    this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
    this.filterServerTable = new HashMap<String, List<String>>(256);
}

 

每个结构存储着一类集群信息,具体含义在第5章有介绍。了解RocketMQ各个角色的功能后,对每个结构的处理逻辑就好理解了。下面重点看一下控制访问这些结构的锁机制。

锁分为互斥锁、读写锁;也可分为可重入锁、不可重入锁。在NameServer的场景中,读取操作多,更改操作少,所以选择读写锁能大大提高效率。对于如何选择可重入和不可重入锁,重点看函数间的调用关系,比如多次获取锁的示例代码,如果这个lock是不可重入的,代码无法正常执行,如代码清单10-7所示。

代码清单10-7 多次获取锁示例

Lock lock = new Lock();
public void outer() {
    lock.lock();
    inner();
    lock.unlock();
}
public void inner() {
    lock.lock();
    //do something lock.unlock(); }
}

 

RouteInfoManager中使用的是可重入的读写锁(private final ReadWriteLock lock=new ReentrantReadWriteLock()),我们以deleteTopic函数为例,看一下锁的使用方式,如代码清单10-8所示。

代码清单10-8 锁的使用方式

public void deleteTopic(final String topic) {
    try {
        try {
            this.lock.writeLock().lockInterruptibly();
            this.topicQueueTable.remove(topic);
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("deleteTopic Exception", e);
    }
}

 

 

首先锁的获取和执行逻辑要放到一个try{}里,然后在finally{}中释放。这是一种典型的使用方式,我们可以参考这种方式实现自己的代码。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/170408.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

长期暴露于空气污染与精神障碍存在因果关系 |UK Biobank周报(11.2)

郑老师统计课程&#xff0c;欢迎点击报名&#xff1a;Nhanes公共数据库挖掘 课程 英国生物银行&#xff08;UK Biobank&#xff0c;UKB&#xff09;是英国迄今以来规模最大的有关致病或预防疾病的基因和环境因子的信息资源库。目的是探求一些特定基因、生活方式和健康状况之间的…

BUUCTF [BJDCTF2020]鸡你太美 1

BUUCTF:https://buuoj.cn/challenges 题目描述&#xff1a; 得到的 flag 请包上 flag{} 提交。来源&#xff1a; https://github.com/BjdsecCA/BJDCTF2020 密文&#xff1a; 下载附件&#xff0c;解压得到两个.gif图片。 第一个gif图片&#xff1a; 第二个gif图片无法打开。…

ROS navigation栅格地图原点位置如何确定?

背景 利用ros进行导航时&#xff0c;生成一张栅格地图&#xff0c;包含gridMap.pgm和gridMap.yaml。现在想要将栅格地图及轨迹在其他应用上显示&#xff0c;需要确定地图的坐标系原点。 gridMap.yaml格式 image: gridMap.pgm  #文件名 resolution: 0.20000  #地图分辨率 …

第十五章---I/O(输入/输出)

15.1输入输出流 流是一组有序的数据序列&#xff0c;根据操作的类型&#xff0c;可分为输入流和输出流两种。I/O(Input/Output,(输出)流提供了一条通道程序&#xff0c;可以使用这条通道把源中的字节序列送到目的地。虽然 I/O 流疆盘文件存取有关&#xff0c;但是程序的源和目…

启动dubbo消费端过程提示No provider available for the service的问题定位与解决

文/朱季谦 某次在启动dubbo消费端时&#xff0c;发现无法从zookeeper注册中心获取到所依赖的消费者API&#xff0c;启动日志一直出现这样的异常提示 Failed to check the status of the service com.fte.zhu.api.testService. No provider available for the service com.fte…

一道简单的积分题目

题目如下图&#xff1a; 解法1&#xff1a; 解法2&#xff1a; 解法3&#xff1a; 错误做法&#xff1a; 在 x ∈ ( 0 , ∞ ) 上有 ln ⁡ x < x &#xff0c;令 f ( x ) ln ⁡ x 1 x 2 &#xff0c; g ( x ) &#xff1d; x 1 x 2 ∴ f ( x ) < g ( x ) &#x…

中贝通信-603220 三季报分析(20231120)

中贝通信-603220 基本情况 公司名称&#xff1a;中贝通信集团股份有限公司 A股简称&#xff1a;中贝通信 成立日期&#xff1a;1999-12-29 上市日期&#xff1a;2018-11-15 所属行业&#xff1a;软件和信息技术服务业 周期性&#xff1a;1 主营业务&#xff1a;通信网络技术服务…

车载毫米波雷达行业发展1——概述

1.1 毫米波雷达定义及产品演进 1.1.1 毫米波雷达定义 毫米波雷达(mmWave Radar)是指工作在毫米波波段的雷达&#xff0c;其频域介于 30&#xff5e;300GHz&#xff0c;波长1~10mm。毫米波雷达稳定性高&#xff0c;抗干扰能力强&#xff0c;可穿透雾、烟、灰尘环境&#xff0…

Microsoft Visual Studio 2019下载及安装流程记录

第一周任务&#xff1a; 1.笔记本上安装vc2019的环境 2.再把OpenCV安装上 3.根据网上的教程&#xff0c;试着写几个opencv的程序 一、安装Visual Studio 2019社区版 首先先完成安装vc2019的环境&#xff0c; 因为&#xff1a; Microsoft Visual C是用于C编程的工具集合&am…

ATTCK 十大免费 工具和资源

01 eBook: Getting Started with ATT&CK 这本免费电子书将有关威胁情报、检测和分析、对手模拟和红队以及评估和工程的博客文章中的内容汇集到一个方便的软件包中。 02 CALDERA CALDERA是一个网络安全平台&#xff0c;旨在轻松自动化对手仿真&#xff0c;协助手动红队并自…

【鸿蒙最新全套教程】<HarmonyOS第一课>1、运行Hello World

下载与安装DevEco Studio 在HarmonyOS应用开发学习之前&#xff0c;需要进行一些准备工作&#xff0c;首先需要完成开发工具DevEco Studio的下载与安装以及环境配置。 进入DevEco Studio下载官网&#xff0c;单击“立即下载”进入下载页面。 DevEco Studio提供了Windows版本和…

【推荐】智元兔AI:一款集写作、问答、绘画于一体的全能工具!

在当今技术飞速发展的时代&#xff0c;越来越多的领域开始应用人工智能&#xff08;Artificial Intelligence&#xff0c;简称AI&#xff09;。其中&#xff0c;AI写作工具备受瞩目&#xff0c;备受推崇。在众多的选择中&#xff0c;智元兔AI是一款在笔者使用过程中非常有帮助的…

使ros1和ros2的bag一直互通

很多文章都是先source ros1 然后source ros2,再play bag source /opt/ros/noetic/setup.bash source /opt/ros/foxy/setup.bash ros2 bag play -s rosbag_v2 kitti_raw00.bag 但实测会出问题: 为使ros1和ros2的bag一直互通 sudo apt update sudo apt install ros-foxy-ro…

在使用tomcat运行项目时,遇到端口80被占用的情况问题解决

问题描述&#xff1a;Failed to initialize end point associated with ProtocolHandler ["http-bio-80"] java.net.BindException: Address already in use: NET_Bind <null>:80 在学习springmvc的时候&#xff0c;跟着黑马视频进行学习&#xff0c;结果&…

Mybatis-Plus 自定义SQL注入器,实现真正的批量插入![MyBatis-Plus系列]

导读 Hi,大家好,我是悟纤。过着爱谁谁的生活,活出不设限的人生。 在使用MyBatis-Plus时,dao层都会去继承BaseMapper接口,这样就可以用BaseMapper接口所有的方法CRUD。 在Mybatis-Plus中调用updateById方法进行数据更新默认情况下是不能更新空值字段的。

python基础练习题库实验9

题目1 编写一个程序来生成一个平方数列表。 例如&#xff1a; 代码 def generate_square_numbers(num):square_list []for i in range(num):square_list.append(i ** 2)return square_listnum_squares int(input("How many square numbers to generate? "))sq…

shell 脚本变量

目录 什么是 shell shell 的两种面向对象 shell 脚本概述 脚本 &#xff08;本质为程序&#xff09; 脚本的组成 执行脚本 例题 脚本构成 脚本执行逻辑及执行方式 脚本的常见错误 编写 shell 脚本 执行脚本文件的方式 重定向 重定向操作 shell 变量的作用及类型…

Shell判断:模式匹配:case(一)

一、前言 shell编程中if和case都是用来做流控的。 二、case语法结构 case 变量 in 模式1&#xff09; 命令序列1 ;; 模式2&#xff09; 命令序列2 ;; 模式3&#xff09; 命令序列3 ;; *) 无匹配…

电脑游戏录屏软件,记录游戏高光时刻

电脑游戏录制是游戏爱好者分享游戏乐趣、技巧和成就的绝佳方式&#xff0c;此时&#xff0c;一款好用的录屏软件就显得尤为重要。本文将为大家介绍三款电脑游戏录屏软件&#xff0c;通过对这三款软件的分步骤详细介绍&#xff0c;让大家更加了解它们的特点及使用方法。 电脑游戏…