# 消息中间件 RocketMQ 高级功能和源码分析(四)

消息中间件 RocketMQ 高级功能和源码分析(四)

一、 消息中间件 RocketMQ 源码分析:回顾 NameServer 架构设计。

1、RocketMQ 架构设计

消息中间件的设计思路一般是基于主题订阅发布的机制,消息生产者(Producer)发送某一个主题到消息服务器,消息服务器负责将消息持久化存储,消息消费者(Consumer)订阅该兴趣的主题,消息服务器根据订阅信息(路由信息)将消息推送到消费者(Push模式)或者消费者主动向消息服务器拉去(Pull模式),从而实现消息生产者与消息消费者解耦。为了避免消息服务器的单点故障导致的整个系统瘫痪,通常会部署多台消息服务器共同承担消息的存储。那消息生产者如何知道消息要发送到哪台消息服务器呢?如果某一台消息服务器宕机了,那么消息生产者如何在不重启服务情况下感知呢?

NameServer 就是为了解决以上问题设计的。

在这里插入图片描述

2、Broker 消息服务器

Broker 消息服务器在启动的时向所有 NameServer注册,消息生产者(Producer)在发送消息时之前先从 NameServer 获取 Broker 服务器地址列表,然后根据负载均衡算法从列表中选择一台服务器进行发送。NameServer与每台Broker保持长连接,并间隔30S检测 Broker 是否存活,如果检测到 Broker 宕机,则从路由注册表中删除。但是路由变化不会马上通知消息生产者。这样设计的目的是为了降低 NameServer 实现的复杂度,在消息发送端提供容错机制保证消息发送的可用性。

3、NameServer 的高可用

NameServer 本身的高可用是通过部署多台 NameServer 来实现,但彼此之间不通讯,也就是 NameServer 服务器之间在某一个时刻的数据并不完全相同,但这对消息发送并不会造成任何影响,这也是 NameServer 设计的一个亮点,总之,RocketMQ 设计追求简单高效。

二、 消息中间件 RocketMQ 源码分析:NameServer 启动步骤一。

1、RocketMQ 源码分析: NameServer 启动流程

步骤一:

解析配置文件,填充 NameServerConfig、NettyServerConfig 属性值,并创建 NamesrvController。

步骤二

根据启动属性创建 NamesrvController 实例,并初始化该实例。NameServerController 实例为NameServer 核心控制器。

步骤三

在 JVM 进程关闭之前,先将线程池关闭,及时释放资源。

2、NameServer 启动流程图:

在这里插入图片描述

3、RocketMQ 源码 启动类:org.apache.rocketmq.namesrv.NamesrvStartup.java 源码:

/*
D:\RocketMQ\rocketmq-master\namesrv\src\main\java\org\apache\rocketmq\namesrv\NamesrvStartup.java

 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.namesrv;

import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.Callable;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.srvutil.ShutdownHookThread;
import org.slf4j.LoggerFactory;

public class NamesrvStartup {

    private static InternalLogger log;
    private static Properties properties = null;
    private static CommandLine commandLine = null;

    public static void main(String[] args) {
        main0(args);
    }

    public static NamesrvController main0(String[] args) {

        try {
            NamesrvController controller = createNamesrvController(args);
            start(controller);
            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }

    public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
        //PackageConflictDetect.detectFastjson();

        Options options = ServerUtil.buildCommandlineOptions(new Options());
        commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
            return null;
        }

        final NamesrvConfig namesrvConfig = new NamesrvConfig();
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        nettyServerConfig.setListenPort(9876);
        if (commandLine.hasOption('c')) {
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);
                MixAll.properties2Object(properties, namesrvConfig);
                MixAll.properties2Object(properties, nettyServerConfig);

                namesrvConfig.setConfigStorePath(file);

                System.out.printf("load config properties file OK, %s%n", file);
                in.close();
            }
        }

        if (commandLine.hasOption('p')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
            MixAll.printObjectProperties(console, namesrvConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            System.exit(0);
        }

        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

        if (null == namesrvConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }

        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

        log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

        MixAll.printObjectProperties(log, namesrvConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);

        final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

        // remember all configs to prevent discard
        controller.getConfiguration().registerConfig(properties);

        return controller;
    }

    public static NamesrvController start(final NamesrvController controller) throws Exception {

        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }

        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }

        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                controller.shutdown();
                return null;
            }
        }));

        controller.start();

        return controller;
    }

    public static void shutdown(final NamesrvController controller) {
        controller.shutdown();
    }

    public static Options buildCommandlineOptions(final Options options) {
        Option opt = new Option("c", "configFile", true, "Name server config properties file");
        opt.setRequired(false);
        options.addOption(opt);

        opt = new Option("p", "printConfigItem", false, "Print all config item");
        opt.setRequired(false);
        options.addOption(opt);

        return options;
    }

    public static Properties getProperties() {
        return properties;
    }
}

4、org.apache.rocketmq.namesrv.NamesrvController.java 源码:

/*
D:\RocketMQ\rocketmq-master\namesrv\src\main\java\org\apache\rocketmq\namesrv\NamesrvController.java

 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.namesrv;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.Configuration;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.namesrv.kvconfig.KVConfigManager;
import org.apache.rocketmq.namesrv.processor.ClusterTestRequestProcessor;
import org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor;
import org.apache.rocketmq.namesrv.routeinfo.BrokerHousekeepingService;
import org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.srvutil.FileWatchService;


public class NamesrvController {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    private final NamesrvConfig namesrvConfig;

    private final NettyServerConfig nettyServerConfig;

    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
        "NSScheduledThread"));
    private final KVConfigManager kvConfigManager;
    private final RouteInfoManager routeInfoManager;

    private RemotingServer remotingServer;

    private BrokerHousekeepingService brokerHousekeepingService;

    private ExecutorService remotingExecutor;

    private Configuration configuration;
    private FileWatchService fileWatchService;

    public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
        this.namesrvConfig = namesrvConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.kvConfigManager = new KVConfigManager(this);
        this.routeInfoManager = new RouteInfoManager();
        this.brokerHousekeepingService = new BrokerHousekeepingService(this);
        this.configuration = new Configuration(
            log,
            this.namesrvConfig, this.nettyServerConfig
        );
        this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
    }

    public boolean initialize() {

        this.kvConfigManager.load();

        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

        this.registerProcessor();

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);

        if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
            // Register a listener to reload SslContext
            try {
                fileWatchService = new FileWatchService(
                    new String[] {
                        TlsSystemConfig.tlsServerCertPath,
                        TlsSystemConfig.tlsServerKeyPath,
                        TlsSystemConfig.tlsServerTrustCertPath
                    },
                    new FileWatchService.Listener() {
                        boolean certChanged, keyChanged = false;
                        @Override
                        public void onChanged(String path) {
                            if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                log.info("The trust certificate changed, reload the ssl context");
                                reloadServerSslContext();
                            }
                            if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                certChanged = true;
                            }
                            if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                keyChanged = true;
                            }
                            if (certChanged && keyChanged) {
                                log.info("The certificate and private key changed, reload the ssl context");
                                certChanged = keyChanged = false;
                                reloadServerSslContext();
                            }
                        }
                        private void reloadServerSslContext() {
                            ((NettyRemotingServer) remotingServer).loadSslContext();
                        }
                    });
            } catch (Exception e) {
                log.warn("FileWatchService created error, can't load the certificate dynamically");
            }
        }

        return true;
    }

    private void registerProcessor() {
        if (namesrvConfig.isClusterTest()) {

            this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
                this.remotingExecutor);
        } else {

            this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
        }
    }

    public void start() throws Exception {
        this.remotingServer.start();

        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
    }

    public void shutdown() {
        this.remotingServer.shutdown();
        this.remotingExecutor.shutdown();
        this.scheduledExecutorService.shutdown();

        if (this.fileWatchService != null) {
            this.fileWatchService.shutdown();
        }
    }

    public NamesrvConfig getNamesrvConfig() {
        return namesrvConfig;
    }

    public NettyServerConfig getNettyServerConfig() {
        return nettyServerConfig;
    }

    public KVConfigManager getKvConfigManager() {
        return kvConfigManager;
    }

    public RouteInfoManager getRouteInfoManager() {
        return routeInfoManager;
    }

    public RemotingServer getRemotingServer() {
        return remotingServer;
    }

    public void setRemotingServer(RemotingServer remotingServer) {
        this.remotingServer = remotingServer;
    }

    public Configuration getConfiguration() {
        return configuration;
    }
}

5、org.apache.rocketmq.common.namesrv.NamesrvConfig.java 源码:

/*
D:\RocketMQ\rocketmq-master\common\src\main\java\org\apache\rocketmq\common\namesrv\NamesrvConfig.java

 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * $Id: NamesrvConfig.java 1839 2013-05-16 02:12:02Z vintagewang@apache.org $
 */
package org.apache.rocketmq.common.namesrv;

import java.io.File;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;

public class NamesrvConfig {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
    private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
    private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
    private String productEnvName = "center";
    private boolean clusterTest = false;
    private boolean orderMessageEnable = false;

    public boolean isOrderMessageEnable() {
        return orderMessageEnable;
    }

    public void setOrderMessageEnable(boolean orderMessageEnable) {
        this.orderMessageEnable = orderMessageEnable;
    }

    public String getRocketmqHome() {
        return rocketmqHome;
    }

    public void setRocketmqHome(String rocketmqHome) {
        this.rocketmqHome = rocketmqHome;
    }

    public String getKvConfigPath() {
        return kvConfigPath;
    }

    public void setKvConfigPath(String kvConfigPath) {
        this.kvConfigPath = kvConfigPath;
    }

    public String getProductEnvName() {
        return productEnvName;
    }

    public void setProductEnvName(String productEnvName) {
        this.productEnvName = productEnvName;
    }

    public boolean isClusterTest() {
        return clusterTest;
    }

    public void setClusterTest(boolean clusterTest) {
        this.clusterTest = clusterTest;
    }

    public String getConfigStorePath() {
        return configStorePath;
    }

    public void setConfigStorePath(final String configStorePath) {
        this.configStorePath = configStorePath;
    }
}

6、org.apache.rocketmq.remoting.netty.NettyServerConfig.java 源码:

/*
D:\RocketMQ\rocketmq-master\remoting\src\main\java\org\apache\rocketmq\remoting\netty\NettyServerConfig.java

 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.remoting.netty;

public class NettyServerConfig implements Cloneable {
    private int listenPort = 8888;
    private int serverWorkerThreads = 8;
    private int serverCallbackExecutorThreads = 0;
    private int serverSelectorThreads = 3;
    private int serverOnewaySemaphoreValue = 256;
    private int serverAsyncSemaphoreValue = 64;
    private int serverChannelMaxIdleTimeSeconds = 120;

    private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
    private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
    private boolean serverPooledByteBufAllocatorEnable = true;

    /**
     * make make install
     *
     *
     * ../glibc-2.10.1/configure \ --prefix=/usr \ --with-headers=/usr/include \
     * --host=x86_64-linux-gnu \ --build=x86_64-pc-linux-gnu \ --without-gd
     */
    private boolean useEpollNativeSelector = false;

    public int getListenPort() {
        return listenPort;
    }

    public void setListenPort(int listenPort) {
        this.listenPort = listenPort;
    }

    public int getServerWorkerThreads() {
        return serverWorkerThreads;
    }

    public void setServerWorkerThreads(int serverWorkerThreads) {
        this.serverWorkerThreads = serverWorkerThreads;
    }

    public int getServerSelectorThreads() {
        return serverSelectorThreads;
    }

    public void setServerSelectorThreads(int serverSelectorThreads) {
        this.serverSelectorThreads = serverSelectorThreads;
    }

    public int getServerOnewaySemaphoreValue() {
        return serverOnewaySemaphoreValue;
    }

    public void setServerOnewaySemaphoreValue(int serverOnewaySemaphoreValue) {
        this.serverOnewaySemaphoreValue = serverOnewaySemaphoreValue;
    }

    public int getServerCallbackExecutorThreads() {
        return serverCallbackExecutorThreads;
    }

    public void setServerCallbackExecutorThreads(int serverCallbackExecutorThreads) {
        this.serverCallbackExecutorThreads = serverCallbackExecutorThreads;
    }

    public int getServerAsyncSemaphoreValue() {
        return serverAsyncSemaphoreValue;
    }

    public void setServerAsyncSemaphoreValue(int serverAsyncSemaphoreValue) {
        this.serverAsyncSemaphoreValue = serverAsyncSemaphoreValue;
    }

    public int getServerChannelMaxIdleTimeSeconds() {
        return serverChannelMaxIdleTimeSeconds;
    }

    public void setServerChannelMaxIdleTimeSeconds(int serverChannelMaxIdleTimeSeconds) {
        this.serverChannelMaxIdleTimeSeconds = serverChannelMaxIdleTimeSeconds;
    }

    public int getServerSocketSndBufSize() {
        return serverSocketSndBufSize;
    }

    public void setServerSocketSndBufSize(int serverSocketSndBufSize) {
        this.serverSocketSndBufSize = serverSocketSndBufSize;
    }

    public int getServerSocketRcvBufSize() {
        return serverSocketRcvBufSize;
    }

    public void setServerSocketRcvBufSize(int serverSocketRcvBufSize) {
        this.serverSocketRcvBufSize = serverSocketRcvBufSize;
    }

    public boolean isServerPooledByteBufAllocatorEnable() {
        return serverPooledByteBufAllocatorEnable;
    }

    public void setServerPooledByteBufAllocatorEnable(boolean serverPooledByteBufAllocatorEnable) {
        this.serverPooledByteBufAllocatorEnable = serverPooledByteBufAllocatorEnable;
    }

    public boolean isUseEpollNativeSelector() {
        return useEpollNativeSelector;
    }

    public void setUseEpollNativeSelector(boolean useEpollNativeSelector) {
        this.useEpollNativeSelector = useEpollNativeSelector;
    }

    @Override
    public Object clone() throws CloneNotSupportedException {
        return (NettyServerConfig) super.clone();
    }
}

7、NameServer 启动流程 :步骤一:

解析配置文件,填充 NameServerConfig、NettyServerConfig 属性值,并创建 NamesrvController。

8、 代码:NamesrvController#createNamesrvController


//创建NamesrvConfig
final NamesrvConfig namesrvConfig = new NamesrvConfig();
//创建NettyServerConfig
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
//设置启动端口号
nettyServerConfig.setListenPort(9876);
//解析启动-c参数
if (commandLine.hasOption('c')) {
    String file = commandLine.getOptionValue('c');
    if (file != null) {
        InputStream in = new BufferedInputStream(new FileInputStream(file));
        properties = new Properties();
        properties.load(in);
        MixAll.properties2Object(properties, namesrvConfig);
        MixAll.properties2Object(properties, nettyServerConfig);

        namesrvConfig.setConfigStorePath(file);

        System.out.printf("load config properties file OK, %s%n", file);
        in.close();
    }
}
//解析启动-p参数
if (commandLine.hasOption('p')) {
    InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
    MixAll.printObjectProperties(console, namesrvConfig);
    MixAll.printObjectProperties(console, nettyServerConfig);
    System.exit(0);
}
//将启动参数填充到namesrvConfig,nettyServerConfig
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

//创建NameServerController
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

9、 NamesrvConfig 属性


private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
private String productEnvName = "center";
private boolean clusterTest = false;
private boolean orderMessageEnable = false;

10、 rocketmqHome: rocketmq 主目录

kvConfig: NameServer 存储KV配置属性的持久化路径

configStorePath: nameServer 默认配置文件路径

orderMessageEnable: 是否支持顺序消息

11、NettyServerConfig 属性


private int listenPort = 8888;
private int serverWorkerThreads = 8;
private int serverCallbackExecutorThreads = 0;
private int serverSelectorThreads = 3;
private int serverOnewaySemaphoreValue = 256;
private int serverAsyncSemaphoreValue = 64;
private int serverChannelMaxIdleTimeSeconds = 120;
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
private boolean serverPooledByteBufAllocatorEnable = true;
private boolean useEpollNativeSelector = false;

listenPort: NameServer监听端口,该值默认会被初始化为9876
serverWorkerThreads: Netty业务线程池线程个数
serverCallbackExecutorThreads: Netty public任务线程池线程个数,Netty网络设计,根据业务类型会创建不同的线程池,比如处理消息发送、消息消费、心跳检测等。如果该业务类型未注册线程池,则由public线程池执行。
serverSelectorThreads: IO线程池个数,主要是NameServer、Broker端解析请求、返回相应的线程个数,这类线程主要是处理网路请求的,解析请求包,然后转发到各个业务线程池完成具体的操作,然后将结果返回给调用方;
serverOnewaySemaphoreValue: send oneway消息请求并发读(Broker端参数);
serverAsyncSemaphoreValue: 异步消息发送最大并发度;
serverChannelMaxIdleTimeSeconds : 网络连接最大的空闲时间,默认120s。
serverSocketSndBufSize: 网络socket发送缓冲区大小。
serverSocketRcvBufSize: 网络接收端缓存区大小。
serverPooledByteBufAllocatorEnable: ByteBuffer是否开启缓存;
useEpollNativeSelector: 是否启用Epoll IO模型。

三、 消息中间件 RocketMQ 源码分析:NameServer 启动步骤二。

1、NameServer 启动流程 :步骤二

根据启动属性创建 NamesrvController 实例,并初始化该实例。NameServerController 实例为 NameServer 核心控制器

2、代码:NamesrvController#initialize

public boolean initialize() {
	//加载KV配置
    this.kvConfigManager.load();
	//创建NettyServer网络处理对象
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
	//开启定时任务:每隔10s扫描一次Broker,移除不活跃的Broker
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
    this.registerProcessor();
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);
	//开启定时任务:每隔10min打印一次KV配置
	this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);
    return true;
}

四、 消息中间件 RocketMQ 源码分析:NameServer 启动步骤三。

1、NameServer 启动流程 :步骤三

在 JVM 进程关闭之前,先将线程池关闭,及时释放资源。

2、代码:NamesrvStartup#start

//注册JVM钩子函数代码
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
    @Override
    public Void call() throws Exception {
        //释放资源
        controller.shutdown();
        return null;
    }
}));

# 消息中间件 RocketMQ 高级功能和源码分析(三)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/724263.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

CSS3基本语法

文章目录 一、CSS引入方式二、选择器1、标签选择器2、类选择器3、id选择器4、通配符选择器 三、字体操作1、字体大小2、字体粗细3、字体样式&#xff08;是否倾斜&#xff09;4、字体修改常见字体系列 修改字体系列语法 四、文本操作1、文本缩进2、文本水平对齐方式3、文本修饰…

汽车IVI中控开发入门及进阶(二十七):车载摄像头vehicle camera

前言: 在车载IVI、智能座舱系统中,有一个重要的应用场景就是视频。视频应用又可分为三种,一种是直接解码U盘、SD卡里面的视频文件进行播放,一种是手机投屏,就是把手机投屏软件已视频方式投屏到显示屏上显示,另外一种就是对视频采集设备(主要就是摄像头Camera)的视频源…

智能温室大棚在无土栽培中的应用

在全球范围内&#xff0c;农业面临着前所未有的挑战&#xff0c;包括气候变化、土地资源短缺、水资源匮乏等。为了应对这些问题&#xff0c;智能温室大棚成为了一种创新的解决方案。无土栽培作为现代农业的前沿技术&#xff0c;在智能温室的支持下&#xff0c;展现出了巨大的潜…

QT/基于TCP的服务端实现

代码 widget.cpp #include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget),p(new QTcpServer(this))//给服务器指针申请空间 {ui->setupUi(this); }Widget::~Widget() {delete ui; }void W…

室内灰尘对老人小孩危害不容忽视,资深家政推荐除灰尘空气净化器

正所谓“病从口入&#xff0c;尘从窗入”&#xff0c;室内灰尘问题不容小觑。尤其是对老人和小孩来说&#xff0c;灰尘中的有害物质更是威胁健康的重要因素。近期天气炎热&#xff0c;家家户户每天都会开窗通风&#xff0c;然而这也带来了灰尘和毛絮的问题。即使每天打扫&#…

使用asyncua模块的call_method方法调用OPC UA的Server端方法报错:asyncio.exceptions.TimeoutError

使用asyncua模块的call_method方法调用OPC UA的Server端方法报错&#xff1a;asyncio.exceptions.TimeoutError 报错信息如下&#xff1a; Traceback (most recent call last): asyncio.run(main()) File “D:\miniconda3\envs\py31013\lib\asyncio\runners.py”, line 44, in…

PID控制算法学习笔记分享

目录 一、参数设定 二、PID计算公式 三、位置式PID代码实现 四、增量式PID代码实现 五、两种控制算法的优缺点 六、PID算法的改进 一、参数设定 比例系数&#xff08;kp&#xff09;&#xff1a;P项的作用是根据当前误差的大小来产生一个控制量。它直接与误差成正比&#…

Codeforces Round 953 (Div. 2) A - C 题解

因为有事只做了A-C&#xff0c;都比较简单&#xff0c;全是很简单的思维&#xff0c;明天有空还会添加上D&#xff0c;如果有人需要可以明天常来看看&#xff01; 进入正题&#xff1a; A. Alice and Books 题意&#xff1a;给你n个数字&#xff0c;将这些数字分到两堆里&am…

【C++初阶路】--- 类和对象(中)

目录 一、this指针1.1 this指针的引出1.2 this指针的特性1.3. C语言和C实现Stack的对比 二、类的6个默认成员函数三、构造函数3.1 概念3.2 特性 一、this指针 1.1 this指针的引出 如下定义一个日期类Date class Date { public://void InitDate(Date* const this, int year …

APP自动化测试-Appium常见操作之详讲

一、基本操作 1、点击操作 示例&#xff1a;element.click() 针对元素进行点击操作 2、初始化&#xff1a;输入中文的处理 说明&#xff1a;如果连接的是虚拟机&#xff08;真机无需加这两个参数&#xff0c;加上可能会影响手工输入&#xff09;&#xff0c;在初始化配置中…

Java--Arrays类

1.数组的工具java.util.Arrays 2.由于数组对象本身并没有什么方法可以供我们调用&#xff0c;但API中提供了一个工具类Arrays供我们使用&#xff0c;从而可以对数据对象进行一些基本的操作。 3.查看JDK帮助文档 4.Arrays类中的方法都是static修饰静态的静态方法&…

minSdkVersion、targetSdkVersion、compileSdkVersion三者的作用解析

minSDK和targetSDK&#xff0c;这两者相当于一个区间。你能够用到targetSDK中最新的API和最酷的新功能&#xff0c;但又需要向后(向下)兼容到minSDK&#xff0c;保证这个区间内的设备都能够正常的执行你的APP。换句话说&#xff0c;想使用Android刚刚推出的新特性&#xff0c;但…

6 PXE高效批量网络装机

6.1部署PXE远程安装服务 在大规模的Liunx应用环境中&#xff0c;如web群集&#xff0c;分布式计算等&#xff0c;服务器往往并不配备光驱设备&#xff0c;在这种情况下&#xff0c;如何为数十乃至上百台服务器裸机快速安装系统呢&#xff1f;传统的USB光驱&#xff0c;移动硬盘…

零编程数据可视化展示:十个简易案例!

数据可视化是呈现数据内在价值的最终手段。数据可视化实例利用各种图表和图形设计手段&#xff0c;合乎逻辑地展示复杂而不直观的数据。为了让用户直观清楚地了解他们想要的数据及其比较关系&#xff0c;数据可视化实例的呈现至关重要。即时设计整理了10个数据可视化实例&#…

17-C语言中的变量生命周期——自动存储期、青苔存储期、自定义存储期

17-C语言中的变量生命周期——自动存储期、青苔存储期、自定义存储期 文章目录 17-C语言中的变量生命周期——自动存储期、青苔存储期、自定义存储期一、自动存储期示例 二、静态存储期2.1 示例 三、自定义存储期3.1 如何申请内存3.2 如何释放内存3.3 如何清空内存3.4 示例 概念…

【学习DayNa】信息系统开发整理

✍&#x1f3fb;记录学习过程中的输出&#xff0c;坚持每天学习一点点~ ❤️希望能给大家提供帮助~欢迎点赞&#x1f44d;&#x1f3fb;收藏⭐评论✍&#x1f3fb;指点&#x1f64f; 结构化方法 结构是指系统内各个组成要素之间的相互联系、相互作用的框架。结构化开发方法就是…

需求工程师的基本职责(合集)

需求工程师的基本职责1 职责&#xff1a; 1、负责用户需求调研、用户需求分析&#xff0c;明确用户需求分析&#xff0c;明确用户功能需求、业务需求&#xff0c;转换成软件需求说明。 2、收集、分析、整理、提炼系统需求&#xff0c;能够对业务流程提出优化建议并写成系统功能…

音视频入门基础:H.264专题(1)——H.264官方文档下载

音视频入门基础&#xff1a;H.264专题系列文章&#xff1a; 音视频入门基础&#xff1a;H.264专题&#xff08;1&#xff09;——H.264官方文档下载 音视频入门基础&#xff1a;H.264专题&#xff08;2&#xff09;——使用FFmpeg命令生成H.264裸流文件 音视频入门基础&…

SpringBoot整合Minio(支持公有及私有bucket)

&#x1f60a; 作者&#xff1a; 一恍过去 &#x1f496; 主页&#xff1a; https://blog.csdn.net/zhuocailing3390 &#x1f38a; 社区&#xff1a; Java技术栈交流 &#x1f389; 主题&#xff1a; SpringBoot整合Minio(支持公有及私有bucket) ⏱️ 创作时间&#xff1…

第6章 设备驱动程序(4)

目录 6.5 块设备操作 6.5.5 请求结构 6.5.6 BIO 6.5.7 提交请求 6.5.8 I/O调度 6.5.9 ioctl实现 本专栏文章将有70篇左右&#xff0c;欢迎关注&#xff0c;查看后续文章。 6.5 块设备操作 6.5.5 请求结构 struct request { //放在请求队列上&#xff0…