一、入口
NameServer的启动源码在NameStartup,现在开始debug之旅
二、createNamesrcController
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();
Options options = ServerUtil.buildCommandlineOptions(new Options());
// 启动时的参数信息 有commandLine 管理了。
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
// namesrv 配置。
final NamesrvConfig namesrvConfig = new NamesrvConfig();
// netty 服务器配置。
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
// namesrv服务器 监听端口 修改为9876
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
// 读取 -c 选项的值
String file = commandLine.getOptionValue('c');
if (file != null) {
// 读取 config 文件数据 到 properties 内
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
// 如果 config 配置文件 内的配置 涉及到 namesrvConfig 或者 nettyServerConfig 的字段,那么进行复写。
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
// 将读取的 配置文件 路径 保存 到 字段。
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}
// 将启动时 命令行 设置的kv 复写到 namesrvConfig内。
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
// 创建日志对象。
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
// 创建 控制器
// 参数1:namesrvConfig
// 参数2:网络层配置 nettyServerConfig
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
return controller;
}
主要做了几件事:
- 创建了NamesrvConfig
- 创建了nettyServerConfig
- 创建了NamesrvController
NamesrvController详解:
public class NamesrvController {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final NamesrvConfig namesrvConfig;
private final NettyServerConfig nettyServerConfig;
// 调度线程池,执行定时任务,两件事:1. 检查存活的broker状态 2. 打印配置
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"NSScheduledThread"));
// 管理kv配置。
private final KVConfigManager kvConfigManager;
// 管理路由信息的对象,重要。
private final RouteInfoManager routeInfoManager;
// 网络层封装对象,重要。
private RemotingServer remotingServer;
// ChannelEventListener ,用于监听channel 状态,当channel状态 发生改变时 close idle... 会向 事件队列发起事件,事件最终由 该service处理。
private BrokerHousekeepingService brokerHousekeepingService;
// 业务线程池,netty 线程 主要任务是 解析报文 将 报文 解析成 RemotingCommand 对象,然后 就将 该对象 交给 业务 线程池 再继续处理。
private ExecutorService remotingExecutor;
private Configuration configuration;
private FileWatchService fileWatchService;
// 参数1:namesrvConfig
// 参数2:网络层配置 nettyServerConfig
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
this.namesrvConfig = namesrvConfig;
this.nettyServerConfig = nettyServerConfig;
this.kvConfigManager = new KVConfigManager(this);
this.routeInfoManager = new RouteInfoManager();
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
this.configuration = new Configuration(
log,
this.namesrvConfig, this.nettyServerConfig
);
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}
}
start(NamesrvController)
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
// 初始化方法..
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// JVM HOOK ,平滑关闭的逻辑。 当JVM 被关闭时,主动调用 controller.shutdown() 方法,让服务器平滑关机。
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
// 启动服务器。
controller.start();
return controller;
}
主要做了几件事:
- controller初始化
public boolean initialize() {
// 加载本地kv配置
this.kvConfigManager.load();
// 创建网络服务器对象
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// 创建业务线程池,默认线程数 8
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// 注册协议处理器(缺省协议处理器)
this.registerProcessor();
// 定时任务1:每10秒钟检查 broker 存活状态,将idle状态的 broker 移除。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// 定时任务2:每10分钟 打印一遍 kv 配置。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
}
- controller启动
会调用到NettyRemotingServer.start方法
public void start() {
// 当向channel pipeline 添加 handler 时 指定了 group 时,网络事件传播到 当前handler时,事件处理 由 分配给 handler 的线程执行。
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
// 创建共享的 处理器 handler
prepareSharableHandlers();
ServerBootstrap childHandler =
// 配置服务端 启动对象
// 配置工作组 boss 和 worker 组
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
// 设置服务端 ServerSocketChannel类型
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
// 设置服务端ch选项
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
// 客户端ch选项
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
// 设置服务器端口
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
//
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// 初始化 客户端ch pipeline 的逻辑
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
// 客户端开启 内存池,使用的内存池是 PooledByteBufAllocator.DEFAULT
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
// 服务器 绑定端口。
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
// 将服务器成功绑定的端口号 赋值给 字段 port。
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
// housekeepingService 不为空,则创建 网络异常事件 处理器
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
// 提交定时任务,每一秒 执行一次。
// 扫描 responseTable 表,将过期的 responseFuture 移除。
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}