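Message Middleware RocketMQ: Advanced Features and Source Code Analysis (Part 5)
I. RocketMQ Source Code Analysis: NameServer Routing Metadata
1. NameServer route management in RocketMQ
The main job of the NameServer is to provide message producers and message consumers with routing information about Topics. To do so, the NameServer has to store the basic routing data and manage the Broker nodes, which includes route registration and route deletion.
2. NameServer routing metadata in RocketMQ
1) Code: RouteInfoManager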
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
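2) NameServer routing entity diagram in RocketMQ:
3) Notes:
topicQueueTable: message queue routing information per Topic; message sending is load-balanced according to this routing table.
brokerAddrTable: basic Broker information, including the brokerName, the name of the cluster it belongs to, and the master and slave Broker addresses.
clusterAddrTable: Broker cluster information; stores the names of all Brokers in each cluster.
brokerLiveTable: Broker status information; the NameServer replaces this entry every time it receives a heartbeat packet.
filterServerTable: the list of FilterServers on each Broker, used for class-mode message filtering.
RocketMQ is built on a publish-subscribe model. A Topic has multiple message queues; by default a Broker creates 4 read queues and 4 write queues for each topic. Multiple Brokers form a cluster, and Brokers that share the same brokerName form a Master-Slave group, in which brokerId 0 denotes the Master and any brokerId greater than 0 denotes a Slave. The lastUpdateTimestamp field in BrokerLiveInfo stores the time at which the last heartbeat packet from that Broker was received.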
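The role of lastUpdateTimestamp and the brokerId convention can be illustrated with a minimal sketch. It is not the real RouteInfoManager: the class BrokerLivenessSketch, the reduced LiveInfo type, the method names, and the timeout passed in the example are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Simplified stand-in for the NameServer liveness table; not RocketMQ's actual code. */
class BrokerLivenessSketch {

    /** Hypothetical reduced form of BrokerLiveInfo: only the heartbeat time is kept. */
    static final class LiveInfo {
        final long lastUpdateTimestamp;

        LiveInfo(long lastUpdateTimestamp) {
            this.lastUpdateTimestamp = lastUpdateTimestamp;
        }
    }

    private final Map<String, LiveInfo> brokerLiveTable = new HashMap<>();

    /** Every heartbeat replaces the stored entry for that broker address. */
    void onHeartbeat(String brokerAddr, long nowMillis) {
        brokerLiveTable.put(brokerAddr, new LiveInfo(nowMillis));
    }

    /** Drops brokers whose last heartbeat is older than timeoutMillis; returns the number removed. */
    int removeExpired(long nowMillis, long timeoutMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, LiveInfo>> it = brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, LiveInfo> entry = it.next();
            if (nowMillis - entry.getValue().lastUpdateTimestamp > timeoutMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    boolean isAlive(String brokerAddr) {
        return brokerLiveTable.containsKey(brokerAddr);
    }

    /** brokerId 0 denotes the Master; any id greater than 0 denotes a Slave. */
    static boolean isMaster(long brokerId) {
        return brokerId == 0L;
    }

    public static void main(String[] args) {
        BrokerLivenessSketch sketch = new BrokerLivenessSketch();
        sketch.onHeartbeat("192.168.0.1:10911", 0L);        // old heartbeat
        sketch.onHeartbeat("192.168.0.2:10911", 100_000L);  // recent heartbeat
        // The 120-second timeout here is an illustrative value chosen for this example.
        int removed = sketch.removeExpired(130_000L, 120_000L);
        System.out.println("removed=" + removed);
        System.out.println("second broker alive=" + sketch.isAlive("192.168.0.2:10911"));
    }
}
```

Passing the current time in as a parameter, rather than reading the system clock inside the methods, keeps the sketch deterministic and easy to check by hand.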
4) Entity data instance diagram
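One way to picture such an instance is to populate simplified versions of the tables by hand. The sketch below assumes a hypothetical cluster "c1" with two Master-Slave groups ("broker-a" and "broker-b") and one topic; the class RouteTablesSketch, the reduced QueueData and BrokerData types, the helper methods, and all addresses are invented for illustration and are not RocketMQ's own classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hand-built instance of three routing tables; a sketch, not RouteInfoManager itself. */
class RouteTablesSketch {

    /** Hypothetical reduced QueueData: the queues one broker group offers for a topic. */
    static final class QueueData {
        final String brokerName;
        final int readQueueNums;
        final int writeQueueNums;

        QueueData(String brokerName, int readQueueNums, int writeQueueNums) {
            this.brokerName = brokerName;
            this.readQueueNums = readQueueNums;
            this.writeQueueNums = writeQueueNums;
        }
    }

    /** Hypothetical reduced BrokerData: cluster, broker name, and brokerId -> address. */
    static final class BrokerData {
        final String cluster;
        final String brokerName;
        final Map<Long, String> brokerAddrs = new HashMap<>();

        BrokerData(String cluster, String brokerName) {
            this.cluster = cluster;
            this.brokerName = brokerName;
        }
    }

    final Map<String, List<QueueData>> topicQueueTable = new HashMap<>();
    final Map<String, BrokerData> brokerAddrTable = new HashMap<>();
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();

    /** Records a broker under its cluster and under its brokerName group. */
    void registerBroker(String cluster, String brokerName, long brokerId, String addr) {
        clusterAddrTable.computeIfAbsent(cluster, k -> new HashSet<>()).add(brokerName);
        brokerAddrTable.computeIfAbsent(brokerName, k -> new BrokerData(cluster, brokerName))
                .brokerAddrs.put(brokerId, addr);
    }

    /** Records the queues that one broker group serves for a topic. */
    void registerTopic(String topic, String brokerName, int readQueues, int writeQueues) {
        topicQueueTable.computeIfAbsent(topic, k -> new ArrayList<>())
                .add(new QueueData(brokerName, readQueues, writeQueues));
    }

    /** Sum of write queues over all broker groups: the set a producer can balance across. */
    int totalWriteQueues(String topic) {
        List<QueueData> queues = topicQueueTable.get(topic);
        if (queues == null) {
            return 0;
        }
        int total = 0;
        for (QueueData qd : queues) {
            total += qd.writeQueueNums;
        }
        return total;
    }

    /** Address of the Master (brokerId 0) in a broker group, or null if unknown. */
    String masterAddr(String brokerName) {
        BrokerData bd = brokerAddrTable.get(brokerName);
        return bd == null ? null : bd.brokerAddrs.get(0L);
    }

    public static void main(String[] args) {
        RouteTablesSketch r = new RouteTablesSketch();
        r.registerBroker("c1", "broker-a", 0L, "192.168.0.1:10911");
        r.registerBroker("c1", "broker-a", 1L, "192.168.0.2:10911");
        r.registerBroker("c1", "broker-b", 0L, "192.168.0.3:10911");
        r.registerBroker("c1", "broker-b", 1L, "192.168.0.4:10911");
        r.registerTopic("topic1", "broker-a", 4, 4);
        r.registerTopic("topic1", "broker-b", 4, 4);
        System.out.println("brokers in c1: " + r.clusterAddrTable.get("c1"));
        System.out.println("write queues for topic1: " + r.totalWriteQueues("topic1"));
        System.out.println("master of broker-a: " + r.masterAddr("broker-a"));
    }
}
```

Keying brokerAddrTable by brokerName rather than by address is what lets a Master and its Slaves appear as one entry, distinguished only by brokerId.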
3. org.apache.rocketmq.broker.BrokerStartup.java
Source:
/*
D:\RocketMQ\rocketmq-master\broker\src\main\java\org\apache\rocketmq\broker\BrokerStartup.java
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.NettySystemConfig;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_ENABLE;
public class BrokerStartup {
public static Properties properties = null;
public static CommandLine commandLine = null;
public static String configFile = null;
public static InternalLogger log;
public static void main(String[] args) {
start(createBrokerController(args));
}
public static BrokerController start(BrokerController controller) {
try {
controller.start();
String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
+ controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
if (null != controller.getBrokerConfig().getNamesrvAddr()) {
tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
}
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
public static void shutdown(final BrokerController controller) {
if (null != controller) {
controller.shutdown();
}
}
public static BrokerController createBrokerController(String[] args) {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
NettySystemConfig.socketSndbufSize = 131072;
}
if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
NettySystemConfig.socketRcvbufSize = 131072;
}
try {
//PackageConflictDetect.detectFastjson();
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
new PosixParser());
if (null == commandLine) {
System.exit(-1);
}
final BrokerConfig brokerConfig = new BrokerConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
nettyServerConfig.setListenPort(10911);
final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
}
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
configFile = file;
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
properties2SystemEnv(properties);
MixAll.properties2Object(properties, brokerConfig);
MixAll.properties2Object(properties, nettyServerConfig);
MixAll.properties2Object(properties, nettyClientConfig);
MixAll.properties2Object(properties, messageStoreConfig);
BrokerPathConfigHelper.setBrokerConfigPath(file);
in.close();
}
}
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
if (null == brokerConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
String namesrvAddr = brokerConfig.getNamesrvAddr();
if (null != namesrvAddr) {
try {
String[] addrArray = namesrvAddr.split(";");
for (String addr : addrArray) {
RemotingUtil.string2SocketAddress(addr);
}
} catch (Exception e) {
System.out.printf(
"The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
namesrvAddr);
System.exit(-3);
}
}
switch (messageStoreConfig.getBrokerRole()) {
case ASYNC_MASTER:
case SYNC_MASTER:
brokerConfig.setBrokerId(MixAll.MASTER_ID);
break;
case SLAVE:
if (brokerConfig.getBrokerId() <= 0) {
System.out.printf("Slave's brokerId must be > 0");
System.exit(-3);
}
break;
default:
break;
}
messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
MixAll.printObjectProperties(console, brokerConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
MixAll.printObjectProperties(console, nettyClientConfig);
MixAll.printObjectProperties(console, messageStoreConfig);
System.exit(0);
} else if (commandLine.hasOption('m')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
MixAll.printObjectProperties(console, brokerConfig, true);
MixAll.printObjectProperties(console, nettyServerConfig, true);
MixAll.printObjectProperties(console, nettyClientConfig, true);
MixAll.printObjectProperties(console, messageStoreConfig, true);
System.exit(0);
}
log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
MixAll.printObjectProperties(log, brokerConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
MixAll.printObjectProperties(log, nettyClientConfig);
MixAll.printObjectProperties(log, messageStoreConfig);
final BrokerController controller = new BrokerController(
brokerConfig,
nettyServerConfig,
nettyClientConfig,
messageStoreConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
private AtomicInteger shutdownTimes = new AtomicInteger(0);
@Override
public void run() {
synchronized (this) {
log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
if (!this.hasShutdown) {
this.hasShutdown = true;
long beginTime = System.currentTimeMillis();
controller.shutdown();
long consumingTimeTotal = System.currentTimeMillis() - beginTime;
log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
}
}
}
}, "ShutdownHook"));
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
private static void properties2SystemEnv(Properties properties) {
if (properties == null) {
return;
}
String rmqAddressServerDomain = properties.getProperty("rmqAddressServerDomain", MixAll.WS_DOMAIN_NAME);
String rmqAddressServerSubGroup = properties.getProperty("rmqAddressServerSubGroup", MixAll.WS_DOMAIN_SUBGROUP);
System.setProperty("rocketmq.namesrv.domain", rmqAddressServerDomain);
System.setProperty("rocketmq.namesrv.domain.subgroup", rmqAddressServerSubGroup);
}
private static Options buildCommandlineOptions(final Options options) {
Option opt = new Option("c", "configFile", true, "Broker config properties file");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("p", "printConfigItem", false, "Print all config item");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("m", "printImportantConfig", false, "Print important config item");
opt.setRequired(false);
options.addOption(opt);
return options;
}
}
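A few of the rules enforced in createBrokerController can be restated in isolation: master roles are forced to the master brokerId, a slave must be configured with a brokerId greater than 0, the HA listen port is the listen port plus one, and the name server address list is split on ";". The following is a minimal sketch of just those rules, assuming a hypothetical class BrokerStartupRulesSketch with its own Role enum and MASTER_ID constant; it does not call any RocketMQ API, and it throws an exception where the listing calls System.exit.

```java
/** Restates a few checks from the startup listing; a sketch, not part of RocketMQ. */
class BrokerStartupRulesSketch {

    /** Stand-in for MixAll.MASTER_ID; brokerId 0 denotes the Master. */
    static final long MASTER_ID = 0L;

    /** Stand-in for the broker roles handled by the switch in the listing. */
    enum Role { ASYNC_MASTER, SYNC_MASTER, SLAVE }

    /** Masters always get MASTER_ID; a slave keeps its configured id, which must be > 0. */
    static long resolveBrokerId(Role role, long configuredId) {
        switch (role) {
            case ASYNC_MASTER:
            case SYNC_MASTER:
                return MASTER_ID;
            case SLAVE:
                if (configuredId <= 0) {
                    throw new IllegalArgumentException("Slave's brokerId must be > 0");
                }
                return configuredId;
            default:
                return configuredId;
        }
    }

    /** The HA listen port is derived from the broker listen port. */
    static int haListenPort(int listenPort) {
        return listenPort + 1;
    }

    /** The name server address list uses ';' as its separator. */
    static String[] splitNamesrvAddr(String namesrvAddr) {
        return namesrvAddr.split(";");
    }

    public static void main(String[] args) {
        System.out.println("master id: " + resolveBrokerId(Role.SYNC_MASTER, 5L));
        System.out.println("slave id: " + resolveBrokerId(Role.SLAVE, 1L));
        System.out.println("HA port: " + haListenPort(10911));
        for (String addr : splitNamesrvAddr("127.0.0.1:9876;192.168.0.1:9876")) {
            System.out.println("name server: " + addr);
        }
    }
}
```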
4. org.apache.rocketmq.broker.BrokerController.java
Source:
/*
D:\RocketMQ\rocketmq-master\broker\src\main\java\org\apache\rocketmq\broker\BrokerController.java
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ConsumerManager;
import org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ProducerManager;
import org.apache.rocketmq.broker.client.net.Broker2Client;
import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager;
import org.apache.rocketmq.broker.dledger.DLedgerRoleChangeHandler;
import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filtersrv.FilterServerManager;
import org.apache.rocketmq.broker.latency.BrokerFastFailure;
import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
import org.apache.rocketmq.broker.longpolling.NotifyMessageArrivingListener;
import org.apache.rocketmq.broker.longpolling.PullRequestHoldService;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.broker.plugin.MessageStoreFactory;
import org.apache.rocketmq.broker.plugin.MessageStorePluginContext;
import org.apache.rocketmq.broker.processor.AdminBrokerProcessor;
import org.apache.rocketmq.broker.processor.ClientManageProcessor;
import org.apache.rocketmq.broker.processor.ConsumerManageProcessor;
import org.apache.rocketmq.broker.processor.EndTransactionProcessor;
import org.apache.rocketmq.broker.processor.PullMessageProcessor;
import org.apache.rocketmq.broker.processor.QueryMessageProcessor;
import org.apache.rocketmq.broker.processor.SendMessageProcessor;
import org.apache.rocketmq.broker.slave.SlaveSynchronize;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl;
import org.apache.rocketmq.broker.util.ServiceProvider;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.Configuration;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.stats.MomentStatsItem;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.srvutil.FileWatchService;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageArrivingListener;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
import org.apache.rocketmq.store.stats.BrokerStats;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
public class BrokerController {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger LOG_PROTECTION = InternalLoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
private static final InternalLogger LOG_WATER_MARK = InternalLoggerFactory.getLogger(LoggerName.WATER_MARK_LOGGER_NAME);
private final BrokerConfig brokerConfig;
private final NettyServerConfig nettyServerConfig;
private final NettyClientConfig nettyClientConfig;
private final MessageStoreConfig messageStoreConfig;
private final ConsumerOffsetManager consumerOffsetManager;
private final ConsumerManager consumerManager;
private final ConsumerFilterManager consumerFilterManager;
private final ProducerManager producerManager;
private final ClientHousekeepingService clientHousekeepingService;
private final PullMessageProcessor pullMessageProcessor;
private final PullRequestHoldService pullRequestHoldService;
private final MessageArrivingListener messageArrivingListener;
private final Broker2Client broker2Client;
private final SubscriptionGroupManager subscriptionGroupManager;
private final ConsumerIdsChangeListener consumerIdsChangeListener;
private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager();
private final BrokerOuterAPI brokerOuterAPI;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"BrokerControllerScheduledThread"));
private final SlaveSynchronize slaveSynchronize;
private final BlockingQueue<Runnable> sendThreadPoolQueue;
private final BlockingQueue<Runnable> pullThreadPoolQueue;
private final BlockingQueue<Runnable> queryThreadPoolQueue;
private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
private final BlockingQueue<Runnable> heartbeatThreadPoolQueue;
private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
private final BlockingQueue<Runnable> endTransactionThreadPoolQueue;
private final FilterServerManager filterServerManager;
private final BrokerStatsManager brokerStatsManager;
private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
private final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
private MessageStore messageStore;
private RemotingServer remotingServer;
private RemotingServer fastRemotingServer;
private TopicConfigManager topicConfigManager;
private ExecutorService sendMessageExecutor;
private ExecutorService pullMessageExecutor;
private ExecutorService queryMessageExecutor;
private ExecutorService adminBrokerExecutor;
private ExecutorService clientManageExecutor;
private ExecutorService heartbeatExecutor;
private ExecutorService consumerManageExecutor;
private ExecutorService endTransactionExecutor;
private boolean updateMasterHAServerAddrPeriodically = false;
private BrokerStats brokerStats;
private InetSocketAddress storeHost;
private BrokerFastFailure brokerFastFailure;
private Configuration configuration;
private FileWatchService fileWatchService;
private TransactionalMessageCheckService transactionalMessageCheckService;
private TransactionalMessageService transactionalMessageService;
private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener;
private Future<?> slaveSyncFuture;
public BrokerController(
final BrokerConfig brokerConfig,
final NettyServerConfig nettyServerConfig,
final NettyClientConfig nettyClientConfig,
final MessageStoreConfig messageStoreConfig
) {
this.brokerConfig = brokerConfig;
this.nettyServerConfig = nettyServerConfig;
this.nettyClientConfig = nettyClientConfig;
this.messageStoreConfig = messageStoreConfig;
this.consumerOffsetManager = new ConsumerOffsetManager(this);
this.topicConfigManager = new TopicConfigManager(this);
this.pullMessageProcessor = new PullMessageProcessor(this);
this.pullRequestHoldService = new PullRequestHoldService(this);
this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
this.consumerFilterManager = new ConsumerFilterManager(this);
this.producerManager = new ProducerManager();
this.clientHousekeepingService = new ClientHousekeepingService(this);
this.broker2Client = new Broker2Client(this);
this.subscriptionGroupManager = new SubscriptionGroupManager(this);
this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
this.filterServerManager = new FilterServerManager(this);
this.slaveSynchronize = new SlaveSynchronize(this);
this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
this.brokerFastFailure = new BrokerFastFailure(this);
this.configuration = new Configuration(
log,
BrokerPathConfigHelper.getBrokerConfigPath(),
this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
);
}
public BrokerConfig getBrokerConfig() {
return brokerConfig;
}
public NettyServerConfig getNettyServerConfig() {
return nettyServerConfig;
}
public BlockingQueue<Runnable> getPullThreadPoolQueue() {
return pullThreadPoolQueue;
}
public BlockingQueue<Runnable> getQueryThreadPoolQueue() {
return queryThreadPoolQueue;
}
public boolean initialize() throws CloneNotSupportedException {
boolean result = this.topicConfigManager.load();
result = result && this.consumerOffsetManager.load();
result = result && this.subscriptionGroupManager.load();
result = result && this.consumerFilterManager.load();
if (result) {
try {
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
//load plugin
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
this.messageStore = MessageStoreFactory.build(context, this.messageStore);
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
} catch (IOException e) {
result = false;
log.error("Failed to initialize", e);
}
}
result = result && this.messageStore.load();
if (result) {
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getSendMessageThreadPoolNums(),
this.brokerConfig.getSendMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.sendThreadPoolQueue,
new ThreadFactoryImpl("SendMessageThread_"));
this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getPullMessageThreadPoolNums(),
this.brokerConfig.getPullMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_"));
this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getQueryMessageThreadPoolNums(),
this.brokerConfig.getQueryMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.queryThreadPoolQueue,
new ThreadFactoryImpl("QueryMessageThread_"));
this.adminBrokerExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
"AdminBrokerThread_"));
this.clientManageExecutor = new ThreadPoolExecutor(
this.brokerConfig.getClientManageThreadPoolNums(),
this.brokerConfig.getClientManageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.clientManagerThreadPoolQueue,
new ThreadFactoryImpl("ClientManageThread_"));
this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getHeartbeatThreadPoolNums(),
this.brokerConfig.getHeartbeatThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.heartbeatThreadPoolQueue,
new ThreadFactoryImpl("HeartbeatThread_", true));
this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getEndTransactionThreadPoolNums(),
this.brokerConfig.getEndTransactionThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.endTransactionThreadPoolQueue,
new ThreadFactoryImpl("EndTransactionThread_"));
this.consumerManageExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
"ConsumerManageThread_"));
this.registerProcessor();
final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();
final long period = 1000 * 60 * 60 * 24;
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.getBrokerStats().record();
} catch (Throwable e) {
log.error("schedule record error.", e);
}
}
}, initialDelay, period, TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerFilterManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumer filter error.", e);
}
}
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.protectBroker();
} catch (Throwable e) {
log.error("protectBroker error.", e);
}
}
}, 3, 3, TimeUnit.MINUTES);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printWaterMark();
} catch (Throwable e) {
log.error("printWaterMark error.", e);
}
}
}, 10, 1, TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
} catch (Throwable e) {
log.error("schedule dispatchBehindBytes error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
if (this.brokerConfig.getNamesrvAddr() != null) {
this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
} catch (Throwable e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true;
}
} else {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printMasterAndSlaveDiff();
} catch (Throwable e) {
log.error("schedule printMasterAndSlaveDiff error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
}
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged && keyChanged) {
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
}
}
private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
((NettyRemotingServer) fastRemotingServer).loadSslContext();
}
});
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}
initialTransaction();
initialAcl();
initialRpcHooks();
}
return result;
}
private void initialTransaction() {
this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
if (null == this.transactionalMessageService) {
this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
}
this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
if (null == this.transactionalMessageCheckListener) {
this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
}
this.transactionalMessageCheckListener.setBrokerController(this);
this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
}
private void initialAcl() {
if (!this.brokerConfig.isAclEnable()) {
log.info("The broker dose not enable acl");
return;
}
List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
if (accessValidators == null || accessValidators.isEmpty()) {
log.info("The broker dose not load the AccessValidator");
return;
}
for (AccessValidator accessValidator: accessValidators) {
final AccessValidator validator = accessValidator;
this.registerServerRPCHook(new RPCHook() {
@Override
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
//Do not catch the exception
validator.validate(validator.parse(request, remoteAddr));
}
@Override
public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
}
});
}
}
private void initialRpcHooks() {
List<RPCHook> rpcHooks = ServiceProvider.load(ServiceProvider.RPC_HOOK_ID, RPCHook.class);
if (rpcHooks == null || rpcHooks.isEmpty()) {
return;
}
for (RPCHook rpcHook: rpcHooks) {
this.registerServerRPCHook(rpcHook);
}
}
public void registerProcessor() {
/**
* SendMessageProcessor
*/
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
sendProcessor.registerSendMessageHook(sendMessageHookList);
sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
/**
* PullMessageProcessor
*/
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
/**
* QueryMessageProcessor
*/
NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
/**
* ClientManageProcessor
*/
ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
/**
* ConsumerManageProcessor
*/
ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
/**
* EndTransactionProcessor
*/
this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
/**
* Default
*/
AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
}
public BrokerStats getBrokerStats() {
return brokerStats;
}
public void setBrokerStats(BrokerStats brokerStats) {
this.brokerStats = brokerStats;
}
public void protectBroker() {
if (this.brokerConfig.isDisableConsumeIfConsumerReadSlowly()) {
final Iterator<Map.Entry<String, MomentStatsItem>> it = this.brokerStatsManager.getMomentStatsItemSetFallSize().getStatsItemTable().entrySet().iterator();
while (it.hasNext()) {
final Map.Entry<String, MomentStatsItem> next = it.next();
final long fallBehindBytes = next.getValue().getValue().get();
if (fallBehindBytes > this.brokerConfig.getConsumerFallbehindThreshold()) {
final String[] split = next.getValue().getStatsKey().split("@");
final String group = split[2];
LOG_PROTECTION.info("[PROTECT_BROKER] the consumer[{}] consume slowly, {} bytes, disable it", group, fallBehindBytes);
this.subscriptionGroupManager.disableConsume(group);
}
}
}
}
public long headSlowTimeMills(BlockingQueue<Runnable> q) {
long slowTimeMills = 0;
final Runnable peek = q.peek();
if (peek != null) {
RequestTask rt = BrokerFastFailure.castRunnable(peek);
slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();
}
if (slowTimeMills < 0) {
slowTimeMills = 0;
}
return slowTimeMills;
}
public long headSlowTimeMills4SendThreadPoolQueue() {
return this.headSlowTimeMills(this.sendThreadPoolQueue);
}
public long headSlowTimeMills4PullThreadPoolQueue() {
return this.headSlowTimeMills(this.pullThreadPoolQueue);
}
public long headSlowTimeMills4QueryThreadPoolQueue() {
return this.headSlowTimeMills(this.queryThreadPoolQueue);
}
public long headSlowTimeMills4EndTransactionThreadPoolQueue() {
return this.headSlowTimeMills(this.endTransactionThreadPoolQueue);
}
public void printWaterMark() {
LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue());
LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue());
LOG_WATER_MARK.info("[WATERMARK] Query Queue Size: {} SlowTimeMills: {}", this.queryThreadPoolQueue.size(), headSlowTimeMills4QueryThreadPoolQueue());
LOG_WATER_MARK.info("[WATERMARK] Transaction Queue Size: {} SlowTimeMills: {}", this.endTransactionThreadPoolQueue.size(), headSlowTimeMills4EndTransactionThreadPoolQueue());
}
public MessageStore getMessageStore() {
return messageStore;
}
public void setMessageStore(MessageStore messageStore) {
this.messageStore = messageStore;
}
private void printMasterAndSlaveDiff() {
long diff = this.messageStore.slaveFallBehindMuch();
// XXX: warn and notify me
log.info("Slave fall behind master: {} bytes", diff);
}
public Broker2Client getBroker2Client() {
return broker2Client;
}
public ConsumerManager getConsumerManager() {
return consumerManager;
}
public ConsumerFilterManager getConsumerFilterManager() {
return consumerFilterManager;
}
public ConsumerOffsetManager getConsumerOffsetManager() {
return consumerOffsetManager;
}
public MessageStoreConfig getMessageStoreConfig() {
return messageStoreConfig;
}
public ProducerManager getProducerManager() {
return producerManager;
}
public void setFastRemotingServer(RemotingServer fastRemotingServer) {
this.fastRemotingServer = fastRemotingServer;
}
public PullMessageProcessor getPullMessageProcessor() {
return pullMessageProcessor;
}
public PullRequestHoldService getPullRequestHoldService() {
return pullRequestHoldService;
}
public SubscriptionGroupManager getSubscriptionGroupManager() {
return subscriptionGroupManager;
}
public void shutdown() {
if (this.brokerStatsManager != null) {
this.brokerStatsManager.shutdown();
}
if (this.clientHousekeepingService != null) {
this.clientHousekeepingService.shutdown();
}
if (this.pullRequestHoldService != null) {
this.pullRequestHoldService.shutdown();
}
if (this.remotingServer != null) {
this.remotingServer.shutdown();
}
if (this.fastRemotingServer != null) {
this.fastRemotingServer.shutdown();
}
if (this.fileWatchService != null) {
this.fileWatchService.shutdown();
}
if (this.messageStore != null) {
this.messageStore.shutdown();
}
this.scheduledExecutorService.shutdown();
try {
this.scheduledExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
this.unregisterBrokerAll();
if (this.sendMessageExecutor != null) {
this.sendMessageExecutor.shutdown();
}
if (this.pullMessageExecutor != null) {
this.pullMessageExecutor.shutdown();
}
if (this.adminBrokerExecutor != null) {
this.adminBrokerExecutor.shutdown();
}
if (this.brokerOuterAPI != null) {
this.brokerOuterAPI.shutdown();
}
this.consumerOffsetManager.persist();
if (this.filterServerManager != null) {
this.filterServerManager.shutdown();
}
if (this.brokerFastFailure != null) {
this.brokerFastFailure.shutdown();
}
if (this.consumerFilterManager != null) {
this.consumerFilterManager.persist();
}
if (this.clientManageExecutor != null) {
this.clientManageExecutor.shutdown();
}
if (this.queryMessageExecutor != null) {
this.queryMessageExecutor.shutdown();
}
if (this.consumerManageExecutor != null) {
this.consumerManageExecutor.shutdown();
}
if (this.fileWatchService != null) {
this.fileWatchService.shutdown();
}
if (this.transactionalMessageCheckService != null) {
this.transactionalMessageCheckService.shutdown(false);
}
if (this.endTransactionExecutor != null) {
this.endTransactionExecutor.shutdown();
}
}
private void unregisterBrokerAll() {
this.brokerOuterAPI.unregisterBrokerAll(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId());
}
public String getBrokerAddr() {
return this.brokerConfig.getBrokerIP1() + ":" + this.nettyServerConfig.getListenPort();
}
public void start() throws Exception {
if (this.messageStore != null) {
this.messageStore.start();
}
if (this.remotingServer != null) {
this.remotingServer.start();
}
if (this.fastRemotingServer != null) {
this.fastRemotingServer.start();
}
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
if (this.brokerOuterAPI != null) {
this.brokerOuterAPI.start();
}
if (this.pullRequestHoldService != null) {
this.pullRequestHoldService.start();
}
if (this.clientHousekeepingService != null) {
this.clientHousekeepingService.start();
}
if (this.filterServerManager != null) {
this.filterServerManager.start();
}
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
startProcessorByHa(messageStoreConfig.getBrokerRole());
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
}
this.registerBrokerAll(true, false, true);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
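// Initial delay is 10 s; the period is registerNameServerPeriod clamped to the range [10 s, 60 s].
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);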
if (this.brokerStatsManager != null) {
this.brokerStatsManager.start();
}
if (this.brokerFastFailure != null) {
this.brokerFastFailure.start();
}
}
public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
TopicConfig registerTopicConfig = topicConfig;
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
registerTopicConfig =
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
this.brokerConfig.getBrokerPermission());
}
ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
topicConfigTable.put(topicConfig.getTopicName(), registerTopicConfig);
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
topicConfigSerializeWrapper.setDataVersion(dataVersion);
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
doRegisterBrokerAll(true, false, topicConfigSerializeWrapper);
}
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
TopicConfig tmp =
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
this.brokerConfig.getBrokerPermission());
topicConfigTable.put(topicConfig.getTopicName(), tmp);
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills())) {
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
}
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
TopicConfigSerializeWrapper topicConfigWrapper) {
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.getHAServerAddr(),
topicConfigWrapper,
this.filterServerManager.buildNewFilterServerList(),
oneway,
this.brokerConfig.getRegisterBrokerTimeoutMills(),
this.brokerConfig.isCompressedRegister());
if (registerBrokerResultList.size() > 0) {
RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
if (registerBrokerResult != null) {
if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
}
this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
if (checkOrderConfig) {
this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
}
}
}
}
private boolean needRegister(final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final int timeoutMills) {
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills);
boolean needRegister = false;
for (Boolean changed : changeList) {
if (changed) {
needRegister = true;
break;
}
}
return needRegister;
}
public TopicConfigManager getTopicConfigManager() {
return topicConfigManager;
}
public void setTopicConfigManager(TopicConfigManager topicConfigManager) {
this.topicConfigManager = topicConfigManager;
}
public String getHAServerAddr() {
return this.brokerConfig.getBrokerIP2() + ":" + this.messageStoreConfig.getHaListenPort();
}
public RebalanceLockManager getRebalanceLockManager() {
return rebalanceLockManager;
}
public SlaveSynchronize getSlaveSynchronize() {
return slaveSynchronize;
}
public ExecutorService getPullMessageExecutor() {
return pullMessageExecutor;
}
public void setPullMessageExecutor(ExecutorService pullMessageExecutor) {
this.pullMessageExecutor = pullMessageExecutor;
}
public BlockingQueue<Runnable> getSendThreadPoolQueue() {
return sendThreadPoolQueue;
}
public FilterServerManager getFilterServerManager() {
return filterServerManager;
}
public BrokerStatsManager getBrokerStatsManager() {
return brokerStatsManager;
}
public List<SendMessageHook> getSendMessageHookList() {
return sendMessageHookList;
}
public void registerSendMessageHook(final SendMessageHook hook) {
this.sendMessageHookList.add(hook);
log.info("register SendMessageHook Hook, {}", hook.hookName());
}
public List<ConsumeMessageHook> getConsumeMessageHookList() {
return consumeMessageHookList;
}
public void registerConsumeMessageHook(final ConsumeMessageHook hook) {
this.consumeMessageHookList.add(hook);
log.info("register ConsumeMessageHook Hook, {}", hook.hookName());
}
public void registerServerRPCHook(RPCHook rpcHook) {
getRemotingServer().registerRPCHook(rpcHook);
this.fastRemotingServer.registerRPCHook(rpcHook);
}
public RemotingServer getRemotingServer() {
return remotingServer;
}
public void setRemotingServer(RemotingServer remotingServer) {
this.remotingServer = remotingServer;
}
public void registerClientRPCHook(RPCHook rpcHook) {
this.getBrokerOuterAPI().registerRPCHook(rpcHook);
}
public BrokerOuterAPI getBrokerOuterAPI() {
return brokerOuterAPI;
}
public InetSocketAddress getStoreHost() {
return storeHost;
}
public void setStoreHost(InetSocketAddress storeHost) {
this.storeHost = storeHost;
}
public Configuration getConfiguration() {
return this.configuration;
}
public BlockingQueue<Runnable> getHeartbeatThreadPoolQueue() {
return heartbeatThreadPoolQueue;
}
public TransactionalMessageCheckService getTransactionalMessageCheckService() {
return transactionalMessageCheckService;
}
public void setTransactionalMessageCheckService(
TransactionalMessageCheckService transactionalMessageCheckService) {
this.transactionalMessageCheckService = transactionalMessageCheckService;
}
public TransactionalMessageService getTransactionalMessageService() {
return transactionalMessageService;
}
public void setTransactionalMessageService(TransactionalMessageService transactionalMessageService) {
this.transactionalMessageService = transactionalMessageService;
}
public AbstractTransactionalMessageCheckListener getTransactionalMessageCheckListener() {
return transactionalMessageCheckListener;
}
public void setTransactionalMessageCheckListener(
AbstractTransactionalMessageCheckListener transactionalMessageCheckListener) {
this.transactionalMessageCheckListener = transactionalMessageCheckListener;
}
public BlockingQueue<Runnable> getEndTransactionThreadPoolQueue() {
return endTransactionThreadPoolQueue;
}
private void handleSlaveSynchronize(BrokerRole role) {
if (role == BrokerRole.SLAVE) {
if (null != slaveSyncFuture) {
slaveSyncFuture.cancel(false);
}
this.slaveSynchronize.setMasterAddr(null);
slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.slaveSynchronize.syncAll();
}
catch (Throwable e) {
log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
}
}
}, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
} else {
//handle the slave synchronise
if (null != slaveSyncFuture) {
slaveSyncFuture.cancel(false);
}
this.slaveSynchronize.setMasterAddr(null);
}
}
public void changeToSlave(int brokerId) {
log.info("Begin to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);
//change the role
brokerConfig.setBrokerId(brokerId == 0 ? 1 : brokerId); //TO DO check
messageStoreConfig.setBrokerRole(BrokerRole.SLAVE);
//handle the scheduled service
try {
this.messageStore.handleScheduleMessageService(BrokerRole.SLAVE);
} catch (Throwable t) {
log.error("[MONITOR] handleScheduleMessageService failed when changing to slave", t);
}
//handle the transactional service
try {
this.shutdownProcessorByHa();
} catch (Throwable t) {
log.error("[MONITOR] shutdownProcessorByHa failed when changing to slave", t);
}
//handle the slave synchronise
handleSlaveSynchronize(BrokerRole.SLAVE);
try {
this.registerBrokerAll(true, true, brokerConfig.isForceRegister());
} catch (Throwable ignored) {
}
log.info("Finish to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);
}
public void changeToMaster(BrokerRole role) {
if (role == BrokerRole.SLAVE) {
return;
}
log.info("Begin to change to master brokerName={}", brokerConfig.getBrokerName());
//handle the slave synchronise
handleSlaveSynchronize(role);
//handle the scheduled service
try {
this.messageStore.handleScheduleMessageService(role);
} catch (Throwable t) {
log.error("[MONITOR] handleScheduleMessageService failed when changing to master", t);
}
//handle the transactional service
try {
this.startProcessorByHa(BrokerRole.SYNC_MASTER);
} catch (Throwable t) {
log.error("[MONITOR] startProcessorByHa failed when changing to master", t);
}
//if the operations above are totally successful, we change to master
brokerConfig.setBrokerId(0); //TO DO check
messageStoreConfig.setBrokerRole(role);
try {
this.registerBrokerAll(true, true, brokerConfig.isForceRegister());
} catch (Throwable ignored) {
}
log.info("Finish to change to master brokerName={}", brokerConfig.getBrokerName());
}
private void startProcessorByHa(BrokerRole role) {
if (BrokerRole.SLAVE != role) {
if (this.transactionalMessageCheckService != null) {
this.transactionalMessageCheckService.start();
}
}
}
private void shutdownProcessorByHa() {
if (this.transactionalMessageCheckService != null) {
this.transactionalMessageCheckService.shutdown(true);
}
}
}
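The registration logic in BrokerController.start() and registerBrokerAll() above can be summarized with a small standalone sketch. It only mirrors two expressions visible in the listing: the clamping of registerNameServerPeriod to between 10,000 and 60,000 milliseconds, and the "forceRegister or any NameServer reports a change" decision. The class name BrokerRegisterSketch and the helper names effectivePeriodMillis and shouldRegister are hypothetical and do not exist in RocketMQ; the null check is an added safety assumption that the original loop does not have.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration class; not part of RocketMQ.
public class BrokerRegisterSketch {
    static final long MIN_PERIOD_MILLIS = 10_000L;
    static final long MAX_PERIOD_MILLIS = 60_000L;

    // Same shape as Math.max(10000, Math.min(period, 60000)) in BrokerController.start().
    static long effectivePeriodMillis(long configuredMillis) {
        return Math.max(MIN_PERIOD_MILLIS, Math.min(configuredMillis, MAX_PERIOD_MILLIS));
    }

    // Same decision as "forceRegister || needRegister(...)" in registerBrokerAll():
    // register when forced, or when at least one NameServer reports changed data.
    static boolean shouldRegister(boolean forceRegister, List<Boolean> changedPerNameServer) {
        if (forceRegister) {
            return true;
        }
        for (Boolean changed : changedPerNameServer) {
            // Null guard is an assumption added here for safety.
            if (changed != null && changed) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(effectivePeriodMillis(30_000L));   // prints 30000 (the default period)
        System.out.println(effectivePeriodMillis(1_000L));    // prints 10000 (raised to the minimum)
        System.out.println(effectivePeriodMillis(120_000L));  // prints 60000 (capped at the maximum)
        System.out.println(shouldRegister(false, Arrays.asList(false, true))); // prints true
    }
}
```

Because forceRegister defaults to true in BrokerConfig, the periodic task in the listing registers on every tick unless that flag is turned off, in which case the per-NameServer change check decides.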
5. org.apache.rocketmq.common.BrokerConfig.java
Source code:
/*
D:\RocketMQ\rocketmq-master\common\src\main\java\org\apache\rocketmq\common\BrokerConfig.java
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
public class BrokerConfig {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
@ImportantField
private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
@ImportantField
private String brokerIP1 = RemotingUtil.getLocalAddress();
private String brokerIP2 = RemotingUtil.getLocalAddress();
@ImportantField
private String brokerName = localHostName();
@ImportantField
private String brokerClusterName = "DefaultCluster";
@ImportantField
private long brokerId = MixAll.MASTER_ID;
private int brokerPermission = PermName.PERM_READ | PermName.PERM_WRITE;
private int defaultTopicQueueNums = 8;
@ImportantField
private boolean autoCreateTopicEnable = true;
private boolean clusterTopicEnable = true;
private boolean brokerTopicEnable = true;
@ImportantField
private boolean autoCreateSubscriptionGroup = true;
private String messageStorePlugIn = "";
@ImportantField
private String msgTraceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
@ImportantField
private boolean traceTopicEnable = false;
/**
* thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default
* value is 1.
*/
private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4;
private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
private int queryMessageThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors();
private int adminBrokerThreadPoolNums = 16;
private int clientManageThreadPoolNums = 32;
private int consumerManageThreadPoolNums = 32;
private int heartbeatThreadPoolNums = Math.min(32, Runtime.getRuntime().availableProcessors());
/**
* Thread numbers for EndTransactionProcessor
*/
private int endTransactionThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors() * 2;
private int flushConsumerOffsetInterval = 1000 * 5;
private int flushConsumerOffsetHistoryInterval = 1000 * 60;
@ImportantField
private boolean rejectTransactionMessage = false;
@ImportantField
private boolean fetchNamesrvAddrByAddressServer = false;
private int sendThreadPoolQueueCapacity = 10000;
private int pullThreadPoolQueueCapacity = 100000;
private int queryThreadPoolQueueCapacity = 20000;
private int clientManagerThreadPoolQueueCapacity = 1000000;
private int consumerManagerThreadPoolQueueCapacity = 1000000;
private int heartbeatThreadPoolQueueCapacity = 50000;
private int endTransactionPoolQueueCapacity = 100000;
private int filterServerNums = 0;
private boolean longPollingEnable = true;
private long shortPollingTimeMills = 1000;
private boolean notifyConsumerIdsChangedEnable = true;
private boolean highSpeedMode = false;
private boolean commercialEnable = true;
private int commercialTimerCount = 1;
private int commercialTransCount = 1;
private int commercialBigCount = 1;
private int commercialBaseCount = 1;
private boolean transferMsgByHeap = true;
private int maxDelayTime = 40;
private String regionId = MixAll.DEFAULT_TRACE_REGION_ID;
private int registerBrokerTimeoutMills = 6000;
private boolean slaveReadEnable = false;
private boolean disableConsumeIfConsumerReadSlowly = false;
private long consumerFallbehindThreshold = 1024L * 1024 * 1024 * 16;
private boolean brokerFastFailureEnable = true;
private long waitTimeMillsInSendQueue = 200;
private long waitTimeMillsInPullQueue = 5 * 1000;
private long waitTimeMillsInHeartbeatQueue = 31 * 1000;
private long waitTimeMillsInTransactionQueue = 3 * 1000;
private long startAcceptSendRequestTimeStamp = 0L;
private boolean traceOn = true;
// Switch of filter bit map calculation.
// If switch on:
// 1. Calculate filter bit map when construct queue.
// 2. Filter bit map will be saved to consume queue extend file if allowed.
private boolean enableCalcFilterBitMap = false;
// Expect num of consumers will use filter.
private int expectConsumerNumUseFilter = 32;
// Error rate of bloom filter, 1~100.
private int maxErrorRateOfBloomFilter = 20;
// How long to keep filter data after it is marked dead before cleaning it. Default: 24h
private long filterDataCleanTimeSpan = 24 * 3600 * 1000;
// whether do filter when retry.
private boolean filterSupportRetry = false;
private boolean enablePropertyFilter = false;
private boolean compressedRegister = false;
private boolean forceRegister = true;
/**
* This configurable item defines the interval at which the broker registers its topics with the name server.
* Allowed values are between 10,000 and 60,000 milliseconds.
*/
private int registerNameServerPeriod = 1000 * 30;
/**
* The minimum age of a transactional message before its first check; a message can be checked only after it
* has exceeded this interval.
*/
@ImportantField
private long transactionTimeOut = 6 * 1000;
/**
* The maximum number of times a message is checked; if this value is exceeded, the message is discarded.
*/
@ImportantField
private int transactionCheckMax = 15;
/**
* Transaction message check interval.
*/
@ImportantField
private long transactionCheckInterval = 60 * 1000;
/**
* Acl feature switch
*/
@ImportantField
private boolean aclEnable = false;
public static String localHostName() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
log.error("Failed to obtain the host name", e);
}
return "DEFAULT_BROKER";
}
public boolean isTraceOn() {
return traceOn;
}
public void setTraceOn(final boolean traceOn) {
this.traceOn = traceOn;
}
public long getStartAcceptSendRequestTimeStamp() {
return startAcceptSendRequestTimeStamp;
}
public void setStartAcceptSendRequestTimeStamp(final long startAcceptSendRequestTimeStamp) {
this.startAcceptSendRequestTimeStamp = startAcceptSendRequestTimeStamp;
}
public long getWaitTimeMillsInSendQueue() {
return waitTimeMillsInSendQueue;
}
public void setWaitTimeMillsInSendQueue(final long waitTimeMillsInSendQueue) {
this.waitTimeMillsInSendQueue = waitTimeMillsInSendQueue;
}
public long getConsumerFallbehindThreshold() {
return consumerFallbehindThreshold;
}
public void setConsumerFallbehindThreshold(final long consumerFallbehindThreshold) {
this.consumerFallbehindThreshold = consumerFallbehindThreshold;
}
public boolean isBrokerFastFailureEnable() {
return brokerFastFailureEnable;
}
public void setBrokerFastFailureEnable(final boolean brokerFastFailureEnable) {
this.brokerFastFailureEnable = brokerFastFailureEnable;
}
public long getWaitTimeMillsInPullQueue() {
return waitTimeMillsInPullQueue;
}
public void setWaitTimeMillsInPullQueue(final long waitTimeMillsInPullQueue) {
this.waitTimeMillsInPullQueue = waitTimeMillsInPullQueue;
}
public boolean isDisableConsumeIfConsumerReadSlowly() {
return disableConsumeIfConsumerReadSlowly;
}
public void setDisableConsumeIfConsumerReadSlowly(final boolean disableConsumeIfConsumerReadSlowly) {
this.disableConsumeIfConsumerReadSlowly = disableConsumeIfConsumerReadSlowly;
}
public boolean isSlaveReadEnable() {
return slaveReadEnable;
}
public void setSlaveReadEnable(final boolean slaveReadEnable) {
this.slaveReadEnable = slaveReadEnable;
}
public int getRegisterBrokerTimeoutMills() {
return registerBrokerTimeoutMills;
}
public void setRegisterBrokerTimeoutMills(final int registerBrokerTimeoutMills) {
this.registerBrokerTimeoutMills = registerBrokerTimeoutMills;
}
public String getRegionId() {
return regionId;
}
public void setRegionId(final String regionId) {
this.regionId = regionId;
}
public boolean isTransferMsgByHeap() {
return transferMsgByHeap;
}
public void setTransferMsgByHeap(final boolean transferMsgByHeap) {
this.transferMsgByHeap = transferMsgByHeap;
}
public String getMessageStorePlugIn() {
return messageStorePlugIn;
}
public void setMessageStorePlugIn(String messageStorePlugIn) {
this.messageStorePlugIn = messageStorePlugIn;
}
public boolean isHighSpeedMode() {
return highSpeedMode;
}
public void setHighSpeedMode(final boolean highSpeedMode) {
this.highSpeedMode = highSpeedMode;
}
public String getRocketmqHome() {
return rocketmqHome;
}
public void setRocketmqHome(String rocketmqHome) {
this.rocketmqHome = rocketmqHome;
}
public String getBrokerName() {
return brokerName;
}
public void setBrokerName(String brokerName) {
this.brokerName = brokerName;
}
public int getBrokerPermission() {
return brokerPermission;
}
public void setBrokerPermission(int brokerPermission) {
this.brokerPermission = brokerPermission;
}
public int getDefaultTopicQueueNums() {
return defaultTopicQueueNums;
}
public void setDefaultTopicQueueNums(int defaultTopicQueueNums) {
this.defaultTopicQueueNums = defaultTopicQueueNums;
}
public boolean isAutoCreateTopicEnable() {
return autoCreateTopicEnable;
}
public void setAutoCreateTopicEnable(boolean autoCreateTopic) {
this.autoCreateTopicEnable = autoCreateTopic;
}
public String getBrokerClusterName() {
return brokerClusterName;
}
public void setBrokerClusterName(String brokerClusterName) {
this.brokerClusterName = brokerClusterName;
}
public String getBrokerIP1() {
return brokerIP1;
}
public void setBrokerIP1(String brokerIP1) {
this.brokerIP1 = brokerIP1;
}
public String getBrokerIP2() {
return brokerIP2;
}
public void setBrokerIP2(String brokerIP2) {
this.brokerIP2 = brokerIP2;
}
public int getSendMessageThreadPoolNums() {
return sendMessageThreadPoolNums;
}
public void setSendMessageThreadPoolNums(int sendMessageThreadPoolNums) {
this.sendMessageThreadPoolNums = sendMessageThreadPoolNums;
}
public int getPullMessageThreadPoolNums() {
return pullMessageThreadPoolNums;
}
public void setPullMessageThreadPoolNums(int pullMessageThreadPoolNums) {
this.pullMessageThreadPoolNums = pullMessageThreadPoolNums;
}
public int getQueryMessageThreadPoolNums() {
return queryMessageThreadPoolNums;
}
public void setQueryMessageThreadPoolNums(final int queryMessageThreadPoolNums) {
this.queryMessageThreadPoolNums = queryMessageThreadPoolNums;
}
public int getAdminBrokerThreadPoolNums() {
return adminBrokerThreadPoolNums;
}
public void setAdminBrokerThreadPoolNums(int adminBrokerThreadPoolNums) {
this.adminBrokerThreadPoolNums = adminBrokerThreadPoolNums;
}
public int getFlushConsumerOffsetInterval() {
return flushConsumerOffsetInterval;
}
public void setFlushConsumerOffsetInterval(int flushConsumerOffsetInterval) {
this.flushConsumerOffsetInterval = flushConsumerOffsetInterval;
}
public int getFlushConsumerOffsetHistoryInterval() {
return flushConsumerOffsetHistoryInterval;
}
public void setFlushConsumerOffsetHistoryInterval(int flushConsumerOffsetHistoryInterval) {
this.flushConsumerOffsetHistoryInterval = flushConsumerOffsetHistoryInterval;
}
public boolean isClusterTopicEnable() {
return clusterTopicEnable;
}
public void setClusterTopicEnable(boolean clusterTopicEnable) {
this.clusterTopicEnable = clusterTopicEnable;
}
public String getNamesrvAddr() {
return namesrvAddr;
}
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
public long getBrokerId() {
return brokerId;
}
public void setBrokerId(long brokerId) {
this.brokerId = brokerId;
}
public boolean isAutoCreateSubscriptionGroup() {
return autoCreateSubscriptionGroup;
}
public void setAutoCreateSubscriptionGroup(boolean autoCreateSubscriptionGroup) {
this.autoCreateSubscriptionGroup = autoCreateSubscriptionGroup;
}
public boolean isRejectTransactionMessage() {
return rejectTransactionMessage;
}
public void setRejectTransactionMessage(boolean rejectTransactionMessage) {
this.rejectTransactionMessage = rejectTransactionMessage;
}
public boolean isFetchNamesrvAddrByAddressServer() {
return fetchNamesrvAddrByAddressServer;
}
public void setFetchNamesrvAddrByAddressServer(boolean fetchNamesrvAddrByAddressServer) {
this.fetchNamesrvAddrByAddressServer = fetchNamesrvAddrByAddressServer;
}
public int getSendThreadPoolQueueCapacity() {
return sendThreadPoolQueueCapacity;
}
public void setSendThreadPoolQueueCapacity(int sendThreadPoolQueueCapacity) {
this.sendThreadPoolQueueCapacity = sendThreadPoolQueueCapacity;
}
public int getPullThreadPoolQueueCapacity() {
return pullThreadPoolQueueCapacity;
}
public void setPullThreadPoolQueueCapacity(int pullThreadPoolQueueCapacity) {
this.pullThreadPoolQueueCapacity = pullThreadPoolQueueCapacity;
}
public int getQueryThreadPoolQueueCapacity() {
return queryThreadPoolQueueCapacity;
}
public void setQueryThreadPoolQueueCapacity(final int queryThreadPoolQueueCapacity) {
this.queryThreadPoolQueueCapacity = queryThreadPoolQueueCapacity;
}
public boolean isBrokerTopicEnable() {
return brokerTopicEnable;
}
public void setBrokerTopicEnable(boolean brokerTopicEnable) {
this.brokerTopicEnable = brokerTopicEnable;
}
public int getFilterServerNums() {
return filterServerNums;
}
public void setFilterServerNums(int filterServerNums) {
this.filterServerNums = filterServerNums;
}
public boolean isLongPollingEnable() {
return longPollingEnable;
}
public void setLongPollingEnable(boolean longPollingEnable) {
this.longPollingEnable = longPollingEnable;
}
public boolean isNotifyConsumerIdsChangedEnable() {
return notifyConsumerIdsChangedEnable;
}
public void setNotifyConsumerIdsChangedEnable(boolean notifyConsumerIdsChangedEnable) {
this.notifyConsumerIdsChangedEnable = notifyConsumerIdsChangedEnable;
}
public long getShortPollingTimeMills() {
return shortPollingTimeMills;
}
public void setShortPollingTimeMills(long shortPollingTimeMills) {
this.shortPollingTimeMills = shortPollingTimeMills;
}
public int getClientManageThreadPoolNums() {
return clientManageThreadPoolNums;
}
public void setClientManageThreadPoolNums(int clientManageThreadPoolNums) {
this.clientManageThreadPoolNums = clientManageThreadPoolNums;
}
public boolean isCommercialEnable() {
return commercialEnable;
}
public void setCommercialEnable(final boolean commercialEnable) {
this.commercialEnable = commercialEnable;
}
public int getCommercialTimerCount() {
return commercialTimerCount;
}
public void setCommercialTimerCount(final int commercialTimerCount) {
this.commercialTimerCount = commercialTimerCount;
}
public int getCommercialTransCount() {
return commercialTransCount;
}
public void setCommercialTransCount(final int commercialTransCount) {
this.commercialTransCount = commercialTransCount;
}
public int getCommercialBigCount() {
return commercialBigCount;
}
public void setCommercialBigCount(final int commercialBigCount) {
this.commercialBigCount = commercialBigCount;
}
public int getMaxDelayTime() {
return maxDelayTime;
}
public void setMaxDelayTime(final int maxDelayTime) {
this.maxDelayTime = maxDelayTime;
}
public int getClientManagerThreadPoolQueueCapacity() {
return clientManagerThreadPoolQueueCapacity;
}
public void setClientManagerThreadPoolQueueCapacity(int clientManagerThreadPoolQueueCapacity) {
this.clientManagerThreadPoolQueueCapacity = clientManagerThreadPoolQueueCapacity;
}
public int getConsumerManagerThreadPoolQueueCapacity() {
return consumerManagerThreadPoolQueueCapacity;
}
public void setConsumerManagerThreadPoolQueueCapacity(int consumerManagerThreadPoolQueueCapacity) {
this.consumerManagerThreadPoolQueueCapacity = consumerManagerThreadPoolQueueCapacity;
}
public int getConsumerManageThreadPoolNums() {
return consumerManageThreadPoolNums;
}
public void setConsumerManageThreadPoolNums(int consumerManageThreadPoolNums) {
this.consumerManageThreadPoolNums = consumerManageThreadPoolNums;
}
public int getCommercialBaseCount() {
return commercialBaseCount;
}
public void setCommercialBaseCount(int commercialBaseCount) {
this.commercialBaseCount = commercialBaseCount;
}
public boolean isEnableCalcFilterBitMap() {
return enableCalcFilterBitMap;
}
public void setEnableCalcFilterBitMap(boolean enableCalcFilterBitMap) {
this.enableCalcFilterBitMap = enableCalcFilterBitMap;
}
public int getExpectConsumerNumUseFilter() {
return expectConsumerNumUseFilter;
}
public void setExpectConsumerNumUseFilter(int expectConsumerNumUseFilter) {
this.expectConsumerNumUseFilter = expectConsumerNumUseFilter;
}
public int getMaxErrorRateOfBloomFilter() {
return maxErrorRateOfBloomFilter;
}
public void setMaxErrorRateOfBloomFilter(int maxErrorRateOfBloomFilter) {
this.maxErrorRateOfBloomFilter = maxErrorRateOfBloomFilter;
}
public long getFilterDataCleanTimeSpan() {
return filterDataCleanTimeSpan;
}
public void setFilterDataCleanTimeSpan(long filterDataCleanTimeSpan) {
this.filterDataCleanTimeSpan = filterDataCleanTimeSpan;
}
public boolean isFilterSupportRetry() {
return filterSupportRetry;
}
public void setFilterSupportRetry(boolean filterSupportRetry) {
this.filterSupportRetry = filterSupportRetry;
}
public boolean isEnablePropertyFilter() {
return enablePropertyFilter;
}
public void setEnablePropertyFilter(boolean enablePropertyFilter) {
this.enablePropertyFilter = enablePropertyFilter;
}
public boolean isCompressedRegister() {
return compressedRegister;
}
public void setCompressedRegister(boolean compressedRegister) {
this.compressedRegister = compressedRegister;
}
public boolean isForceRegister() {
return forceRegister;
}
public void setForceRegister(boolean forceRegister) {
this.forceRegister = forceRegister;
}
public int getHeartbeatThreadPoolQueueCapacity() {
return heartbeatThreadPoolQueueCapacity;
}
public void setHeartbeatThreadPoolQueueCapacity(int heartbeatThreadPoolQueueCapacity) {
this.heartbeatThreadPoolQueueCapacity = heartbeatThreadPoolQueueCapacity;
}
public int getHeartbeatThreadPoolNums() {
return heartbeatThreadPoolNums;
}
public void setHeartbeatThreadPoolNums(int heartbeatThreadPoolNums) {
this.heartbeatThreadPoolNums = heartbeatThreadPoolNums;
}
public long getWaitTimeMillsInHeartbeatQueue() {
return waitTimeMillsInHeartbeatQueue;
}
public void setWaitTimeMillsInHeartbeatQueue(long waitTimeMillsInHeartbeatQueue) {
this.waitTimeMillsInHeartbeatQueue = waitTimeMillsInHeartbeatQueue;
}
public int getRegisterNameServerPeriod() {
return registerNameServerPeriod;
}
public void setRegisterNameServerPeriod(int registerNameServerPeriod) {
this.registerNameServerPeriod = registerNameServerPeriod;
}
public long getTransactionTimeOut() {
return transactionTimeOut;
}
public void setTransactionTimeOut(long transactionTimeOut) {
this.transactionTimeOut = transactionTimeOut;
}
public int getTransactionCheckMax() {
return transactionCheckMax;
}
public void setTransactionCheckMax(int transactionCheckMax) {
this.transactionCheckMax = transactionCheckMax;
}
public long getTransactionCheckInterval() {
return transactionCheckInterval;
}
public void setTransactionCheckInterval(long transactionCheckInterval) {
this.transactionCheckInterval = transactionCheckInterval;
}
public int getEndTransactionThreadPoolNums() {
return endTransactionThreadPoolNums;
}
public void setEndTransactionThreadPoolNums(int endTransactionThreadPoolNums) {
this.endTransactionThreadPoolNums = endTransactionThreadPoolNums;
}
public int getEndTransactionPoolQueueCapacity() {
return endTransactionPoolQueueCapacity;
}
public void setEndTransactionPoolQueueCapacity(int endTransactionPoolQueueCapacity) {
this.endTransactionPoolQueueCapacity = endTransactionPoolQueueCapacity;
}
public long getWaitTimeMillsInTransactionQueue() {
return waitTimeMillsInTransactionQueue;
}
public void setWaitTimeMillsInTransactionQueue(long waitTimeMillsInTransactionQueue) {
this.waitTimeMillsInTransactionQueue = waitTimeMillsInTransactionQueue;
}
public String getMsgTraceTopicName() {
return msgTraceTopicName;
}
public void setMsgTraceTopicName(String msgTraceTopicName) {
this.msgTraceTopicName = msgTraceTopicName;
}
public boolean isTraceTopicEnable() {
return traceTopicEnable;
}
public void setTraceTopicEnable(boolean traceTopicEnable) {
this.traceTopicEnable = traceTopicEnable;
}
public boolean isAclEnable() {
return aclEnable;
}
public void setAclEnable(boolean aclEnable) {
this.aclEnable = aclEnable;
}
}
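The long run of getter/setter pairs above follows the JavaBean convention, which is what lets a broker properties file be mapped onto the config object by name. The following is a minimal sketch of that idea and not RocketMQ's own code: `PropertyBinder`, `bind`, and `DemoBrokerConfig` are hypothetical names introduced for illustration, and only `String`, `int`, `long`, and `boolean` setters are handled.

```java
import java.lang.reflect.Method;
import java.util.Properties;

// Hypothetical helper: copies matching properties onto a bean through its setters.
class PropertyBinder {
    static void bind(Properties props, Object target) {
        for (Method m : target.getClass().getMethods()) {
            String name = m.getName();
            if (!name.startsWith("set") || name.length() <= 3 || m.getParameterCount() != 1) {
                continue;
            }
            // Derive the property key from the setter name: setBrokerName -> brokerName
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String raw = props.getProperty(key);
            if (raw == null) {
                continue; // no override supplied, keep the field's default
            }
            Class<?> type = m.getParameterTypes()[0];
            Object value;
            if (type == String.class) {
                value = raw;
            } else if (type == int.class) {
                value = Integer.parseInt(raw);
            } else if (type == long.class) {
                value = Long.parseLong(raw);
            } else if (type == boolean.class) {
                value = Boolean.parseBoolean(raw);
            } else {
                continue; // parameter type not supported by this sketch
            }
            try {
                m.invoke(target, value);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot apply property " + key, e);
            }
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("brokerName", "broker-a");
        props.setProperty("brokerId", "1");
        DemoBrokerConfig config = new DemoBrokerConfig();
        bind(props, config);
        System.out.println(config.getBrokerName() + " / " + config.getBrokerId()
            + " / " + config.isAutoCreateTopicEnable());
    }
}

// Hypothetical, trimmed-down stand-in for a bean-style config class.
class DemoBrokerConfig {
    private String brokerName = "DEFAULT_BROKER";
    private long brokerId = 0L;
    private boolean autoCreateTopicEnable = true;

    public String getBrokerName() { return brokerName; }
    public void setBrokerName(String brokerName) { this.brokerName = brokerName; }
    public long getBrokerId() { return brokerId; }
    public void setBrokerId(long brokerId) { this.brokerId = brokerId; }
    public boolean isAutoCreateTopicEnable() { return autoCreateTopicEnable; }
    public void setAutoCreateTopicEnable(boolean enable) { this.autoCreateTopicEnable = enable; }
}
```

Properties without a matching setter are ignored, and fields without a matching property keep the defaults declared in the class, which is why every field in such a config class carries an initial value.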
6. org.apache.rocketmq.remoting.netty.NettyServerConfig.java
Source code:
/*
D:\RocketMQ\rocketmq-master\remoting\src\main\java\org\apache\rocketmq\remoting\netty\NettyServerConfig.java
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.netty;
public class NettyServerConfig implements Cloneable {
private int listenPort = 8888;
private int serverWorkerThreads = 8;
private int serverCallbackExecutorThreads = 0;
private int serverSelectorThreads = 3;
private int serverOnewaySemaphoreValue = 256;
private int serverAsyncSemaphoreValue = 64;
private int serverChannelMaxIdleTimeSeconds = 120;
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
private boolean serverPooledByteBufAllocatorEnable = true;
/**
* make make install
*
*
* ../glibc-2.10.1/configure \ --prefix=/usr \ --with-headers=/usr/include \
* --host=x86_64-linux-gnu \ --build=x86_64-pc-linux-gnu \ --without-gd
*/
private boolean useEpollNativeSelector = false;
public int getListenPort() {
return listenPort;
}
public void setListenPort(int listenPort) {
this.listenPort = listenPort;
}
public int getServerWorkerThreads() {
return serverWorkerThreads;
}
public void setServerWorkerThreads(int serverWorkerThreads) {
this.serverWorkerThreads = serverWorkerThreads;
}
public int getServerSelectorThreads() {
return serverSelectorThreads;
}
public void setServerSelectorThreads(int serverSelectorThreads) {
this.serverSelectorThreads = serverSelectorThreads;
}
public int getServerOnewaySemaphoreValue() {
return serverOnewaySemaphoreValue;
}
public void setServerOnewaySemaphoreValue(int serverOnewaySemaphoreValue) {
this.serverOnewaySemaphoreValue = serverOnewaySemaphoreValue;
}
public int getServerCallbackExecutorThreads() {
return serverCallbackExecutorThreads;
}
public void setServerCallbackExecutorThreads(int serverCallbackExecutorThreads) {
this.serverCallbackExecutorThreads = serverCallbackExecutorThreads;
}
public int getServerAsyncSemaphoreValue() {
return serverAsyncSemaphoreValue;
}
public void setServerAsyncSemaphoreValue(int serverAsyncSemaphoreValue) {
this.serverAsyncSemaphoreValue = serverAsyncSemaphoreValue;
}
public int getServerChannelMaxIdleTimeSeconds() {
return serverChannelMaxIdleTimeSeconds;
}
public void setServerChannelMaxIdleTimeSeconds(int serverChannelMaxIdleTimeSeconds) {
this.serverChannelMaxIdleTimeSeconds = serverChannelMaxIdleTimeSeconds;
}
public int getServerSocketSndBufSize() {
return serverSocketSndBufSize;
}
public void setServerSocketSndBufSize(int serverSocketSndBufSize) {
this.serverSocketSndBufSize = serverSocketSndBufSize;
}
public int getServerSocketRcvBufSize() {
return serverSocketRcvBufSize;
}
public void setServerSocketRcvBufSize(int serverSocketRcvBufSize) {
this.serverSocketRcvBufSize = serverSocketRcvBufSize;
}
public boolean isServerPooledByteBufAllocatorEnable() {
return serverPooledByteBufAllocatorEnable;
}
public void setServerPooledByteBufAllocatorEnable(boolean serverPooledByteBufAllocatorEnable) {
this.serverPooledByteBufAllocatorEnable = serverPooledByteBufAllocatorEnable;
}
public boolean isUseEpollNativeSelector() {
return useEpollNativeSelector;
}
public void setUseEpollNativeSelector(boolean useEpollNativeSelector) {
this.useEpollNativeSelector = useEpollNativeSelector;
}
@Override
public Object clone() throws CloneNotSupportedException {
return (NettyServerConfig) super.clone();
}
}
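NettyServerConfig implements Cloneable and delegates to `super.clone()`. Because every field in the class is a primitive, the shallow copy produced by `Object.clone()` is already fully independent of the original. The sketch below illustrates this with a hypothetical `DemoServerConfig` that carries only two of the fields; unlike the source above, it uses a covariant return type and swallows the checked exception, which is a design choice of this example rather than something the original does.

```java
// Entry point first so the file can be run directly.
class CloneDemo {
    public static void main(String[] args) {
        DemoServerConfig base = new DemoServerConfig();
        DemoServerConfig copy = base.clone();
        copy.setListenPort(9876); // changing the copy leaves the original untouched
        System.out.println(base.getListenPort() + " vs " + copy.getListenPort());
    }
}

// Hypothetical stand-in with two of the fields shown above.
class DemoServerConfig implements Cloneable {
    private int listenPort = 8888;
    private int serverWorkerThreads = 8;

    public int getListenPort() { return listenPort; }
    public void setListenPort(int listenPort) { this.listenPort = listenPort; }
    public int getServerWorkerThreads() { return serverWorkerThreads; }
    public void setServerWorkerThreads(int n) { this.serverWorkerThreads = n; }

    @Override
    public DemoServerConfig clone() {
        try {
            // Object.clone() copies field by field; for primitives that is a full copy.
            return (DemoServerConfig) super.clone();
        } catch (CloneNotSupportedException e) {
            throw new AssertionError(e); // cannot happen: the class implements Cloneable
        }
    }
}
```

If a reference-typed field (for example a list) were ever added to such a class, the shallow copy would share it with the original and `clone()` would need to copy it explicitly.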
7. org.apache.rocketmq.remoting.netty.NettyClientConfig.java
Source code:
/*
D:\RocketMQ\rocketmq-master\remoting\src\main\java\org\apache\rocketmq\remoting\netty\NettyClientConfig.java
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.netty;
public class NettyClientConfig {
/**
* Worker thread number
*/
private int clientWorkerThreads = 4;
private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
private int clientOnewaySemaphoreValue = NettySystemConfig.CLIENT_ONEWAY_SEMAPHORE_VALUE;
private int clientAsyncSemaphoreValue = NettySystemConfig.CLIENT_ASYNC_SEMAPHORE_VALUE;
private int connectTimeoutMillis = 3000;
private long channelNotActiveInterval = 1000 * 60;
/**
* IdleStateEvent will be triggered when neither read nor write was performed for
* the specified period of this time. Specify {@code 0} to disable
*/
private int clientChannelMaxIdleTimeSeconds = 120;
private int clientSocketSndBufSize = NettySystemConfig.socketSndbufSize;
private int clientSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
private boolean clientPooledByteBufAllocatorEnable = false;
private boolean clientCloseSocketIfTimeout = false;
private boolean useTLS;
public boolean isClientCloseSocketIfTimeout() {
return clientCloseSocketIfTimeout;
}
public void setClientCloseSocketIfTimeout(final boolean clientCloseSocketIfTimeout) {
this.clientCloseSocketIfTimeout = clientCloseSocketIfTimeout;
}
public int getClientWorkerThreads() {
return clientWorkerThreads;
}
public void setClientWorkerThreads(int clientWorkerThreads) {
this.clientWorkerThreads = clientWorkerThreads;
}
public int getClientOnewaySemaphoreValue() {
return clientOnewaySemaphoreValue;
}
public void setClientOnewaySemaphoreValue(int clientOnewaySemaphoreValue) {
this.clientOnewaySemaphoreValue = clientOnewaySemaphoreValue;
}
public int getConnectTimeoutMillis() {
return connectTimeoutMillis;
}
public void setConnectTimeoutMillis(int connectTimeoutMillis) {
this.connectTimeoutMillis = connectTimeoutMillis;
}
public int getClientCallbackExecutorThreads() {
return clientCallbackExecutorThreads;
}
public void setClientCallbackExecutorThreads(int clientCallbackExecutorThreads) {
this.clientCallbackExecutorThreads = clientCallbackExecutorThreads;
}
public long getChannelNotActiveInterval() {
return channelNotActiveInterval;
}
public void setChannelNotActiveInterval(long channelNotActiveInterval) {
this.channelNotActiveInterval = channelNotActiveInterval;
}
public int getClientAsyncSemaphoreValue() {
return clientAsyncSemaphoreValue;
}
public void setClientAsyncSemaphoreValue(int clientAsyncSemaphoreValue) {
this.clientAsyncSemaphoreValue = clientAsyncSemaphoreValue;
}
public int getClientChannelMaxIdleTimeSeconds() {
return clientChannelMaxIdleTimeSeconds;
}
public void setClientChannelMaxIdleTimeSeconds(int clientChannelMaxIdleTimeSeconds) {
this.clientChannelMaxIdleTimeSeconds = clientChannelMaxIdleTimeSeconds;
}
public int getClientSocketSndBufSize() {
return clientSocketSndBufSize;
}
public void setClientSocketSndBufSize(int clientSocketSndBufSize) {
this.clientSocketSndBufSize = clientSocketSndBufSize;
}
public int getClientSocketRcvBufSize() {
return clientSocketRcvBufSize;
}
public void setClientSocketRcvBufSize(int clientSocketRcvBufSize) {
this.clientSocketRcvBufSize = clientSocketRcvBufSize;
}
public boolean isClientPooledByteBufAllocatorEnable() {
return clientPooledByteBufAllocatorEnable;
}
public void setClientPooledByteBufAllocatorEnable(boolean clientPooledByteBufAllocatorEnable) {
this.clientPooledByteBufAllocatorEnable = clientPooledByteBufAllocatorEnable;
}
public boolean isUseTLS() {
return useTLS;
}
public void setUseTLS(boolean useTLS) {
this.useTLS = useTLS;
}
}
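The fields `clientOnewaySemaphoreValue` and `clientAsyncSemaphoreValue` suggest a cap on the number of in-flight one-way and asynchronous requests. The sketch below shows how such a value could bound concurrency, assuming it is used as the permit count of a `java.util.concurrent.Semaphore`. `OnewayLimiter` and `trySend` are hypothetical names and are not part of the RocketMQ API.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical limiter: at most `semaphoreValue` sends may be in flight at once.
class OnewayLimiter {
    private final Semaphore permits;

    OnewayLimiter(int semaphoreValue) {
        this.permits = new Semaphore(semaphoreValue, true); // fair: waiters served in order
    }

    // Runs `send` if a permit becomes free within the timeout; returns false otherwise.
    boolean trySend(Runnable send, long timeoutMillis) {
        boolean acquired;
        try {
            acquired = permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
        if (!acquired) {
            return false;
        }
        try {
            send.run();
            return true;
        } finally {
            permits.release(); // always hand the permit back
        }
    }

    int availablePermits() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        OnewayLimiter limiter = new OnewayLimiter(1);
        boolean[] nested = new boolean[1];
        // The nested send runs while the only permit is held, so it is rejected.
        boolean outer = limiter.trySend(() -> nested[0] = limiter.trySend(() -> { }, 0), 0);
        System.out.println("outer=" + outer + ", nested=" + nested[0]);
    }
}
```

Releasing the permit in a `finally` block keeps the limit accurate even when a send throws, so a failing request cannot permanently shrink the available capacity.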
8. org.apache.rocketmq.store.config.MessageStoreConfig.java
Source code:
/*
D:\RocketMQ\rocketmq-master\store\src\main\java\org\apache\rocketmq\store\config\MessageStoreConfig.java
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store.config;
import java.io.File;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.store.ConsumeQueue;
public class MessageStoreConfig {
//The root directory in which the log data is kept
@ImportantField
private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
//The directory in which the commitlog is kept
@ImportantField
private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
+ File.separator + "commitlog";
// CommitLog file size,default is 1G
private int mapedFileSizeCommitLog = 1024 * 1024 * 1024;
// ConsumeQueue file size, default holds 300,000 entries
private int mapedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE;
// enable consume queue ext
private boolean enableConsumeQueueExt = false;
// ConsumeQueue extend file size, 48M
private int mappedFileSizeConsumeQueueExt = 48 * 1024 * 1024;
// Bit count of filter bit map.
// this will be set by pipe of calculate filter bit map.
private int bitMapLengthConsumeQueueExt = 64;
// CommitLog flush interval
// flush data to disk
@ImportantField
private int flushIntervalCommitLog = 500;
// Only used if TransientStorePool enabled
// flush data to FileChannel
@ImportantField
private int commitIntervalCommitLog = 200;
/**
* introduced since 4.0.x. Determine whether to use mutex reentrantLock when putting message.<br/>
* By default it is set to false indicating using spin lock when putting message.
*/
private boolean useReentrantLockWhenPutMessage = false;
// Whether to flush on a timed schedule; default is real-time flushing
@ImportantField
private boolean flushCommitLogTimed = false;
// ConsumeQueue flush interval
private int flushIntervalConsumeQueue = 1000;
// Resource reclaim interval
private int cleanResourceInterval = 10000;
// CommitLog removal interval
private int deleteCommitLogFilesInterval = 100;
// ConsumeQueue removal interval
private int deleteConsumeQueueFilesInterval = 100;
private int destroyMapedFileIntervalForcibly = 1000 * 120;
private int redeleteHangedFileInterval = 1000 * 120;
// When to delete,default is at 4 am
@ImportantField
private String deleteWhen = "04";
private int diskMaxUsedSpaceRatio = 75;
// The number of hours to keep a log file before deleting it (in hours)
@ImportantField
private int fileReservedTime = 72;
// Flow control for ConsumeQueue
private int putMsgIndexHightWater = 600000;
// The maximum size of a single message, default is 4M
private int maxMessageSize = 1024 * 1024 * 4;
// Whether check the CRC32 of the records consumed.
// This ensures no on-the-wire or on-disk corruption to the messages occurred.
// This check adds some overhead,so it may be disabled in cases seeking extreme performance.
private boolean checkCRCOnRecover = true;
// How many pages are to be flushed when flush CommitLog
private int flushCommitLogLeastPages = 4;
// How many pages are to be committed when commit data to file
private int commitCommitLogLeastPages = 4;
// Flush page size when the disk is in warming state
private int flushLeastPagesWhenWarmMapedFile = 1024 / 4 * 16;
// How many pages are to be flushed when flush ConsumeQueue
private int flushConsumeQueueLeastPages = 2;
private int flushCommitLogThoroughInterval = 1000 * 10;
private int commitCommitLogThoroughInterval = 200;
private int flushConsumeQueueThoroughInterval = 1000 * 60;
@ImportantField
private int maxTransferBytesOnMessageInMemory = 1024 * 256;
@ImportantField
private int maxTransferCountOnMessageInMemory = 32;
@ImportantField
private int maxTransferBytesOnMessageInDisk = 1024 * 64;
@ImportantField
private int maxTransferCountOnMessageInDisk = 8;
@ImportantField
private int accessMessageInMemoryMaxRatio = 40;
@ImportantField
private boolean messageIndexEnable = true;
private int maxHashSlotNum = 5000000;
private int maxIndexNum = 5000000 * 4;
private int maxMsgsNumBatch = 64;
@ImportantField
private boolean messageIndexSafe = false;
private int haListenPort = 10912;
private int haSendHeartbeatInterval = 1000 * 5;
private int haHousekeepingInterval = 1000 * 20;
private int haTransferBatchSize = 1024 * 32;
@ImportantField
private String haMasterAddress = null;
private int haSlaveFallbehindMax = 1024 * 1024 * 256;
@ImportantField
private BrokerRole brokerRole = BrokerRole.ASYNC_MASTER;
@ImportantField
private FlushDiskType flushDiskType = FlushDiskType.ASYNC_FLUSH;
private int syncFlushTimeout = 1000 * 5;
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
private long flushDelayOffsetInterval = 1000 * 10;
@ImportantField
private boolean cleanFileForciblyEnable = true;
private boolean warmMapedFileEnable = false;
private boolean offsetCheckInSlave = false;
private boolean debugLockEnable = false;
private boolean duplicationEnable = false;
private boolean diskFallRecorded = true;
private long osPageCacheBusyTimeOutMills = 1000;
private int defaultQueryMaxNum = 32;
@ImportantField
private boolean transientStorePoolEnable = false;
private int transientStorePoolSize = 5;
private boolean fastFailIfNoBufferInStorePool = false;
private boolean enableDLegerCommitLog = false;
private String dLegerGroup;
private String dLegerPeers;
private String dLegerSelfId;
public boolean isDebugLockEnable() {
return debugLockEnable;
}
public void setDebugLockEnable(final boolean debugLockEnable) {
this.debugLockEnable = debugLockEnable;
}
public boolean isDuplicationEnable() {
return duplicationEnable;
}
public void setDuplicationEnable(final boolean duplicationEnable) {
this.duplicationEnable = duplicationEnable;
}
public long getOsPageCacheBusyTimeOutMills() {
return osPageCacheBusyTimeOutMills;
}
public void setOsPageCacheBusyTimeOutMills(final long osPageCacheBusyTimeOutMills) {
this.osPageCacheBusyTimeOutMills = osPageCacheBusyTimeOutMills;
}
public boolean isDiskFallRecorded() {
return diskFallRecorded;
}
public void setDiskFallRecorded(final boolean diskFallRecorded) {
this.diskFallRecorded = diskFallRecorded;
}
public boolean isWarmMapedFileEnable() {
return warmMapedFileEnable;
}
public void setWarmMapedFileEnable(boolean warmMapedFileEnable) {
this.warmMapedFileEnable = warmMapedFileEnable;
}
public int getMapedFileSizeCommitLog() {
return mapedFileSizeCommitLog;
}
public void setMapedFileSizeCommitLog(int mapedFileSizeCommitLog) {
this.mapedFileSizeCommitLog = mapedFileSizeCommitLog;
}
public int getMapedFileSizeConsumeQueue() {
int factor = (int) Math.ceil(this.mapedFileSizeConsumeQueue / (ConsumeQueue.CQ_STORE_UNIT_SIZE * 1.0));
return (int) (factor * ConsumeQueue.CQ_STORE_UNIT_SIZE);
}
public void setMapedFileSizeConsumeQueue(int mapedFileSizeConsumeQueue) {
this.mapedFileSizeConsumeQueue = mapedFileSizeConsumeQueue;
}
public boolean isEnableConsumeQueueExt() {
return enableConsumeQueueExt;
}
public void setEnableConsumeQueueExt(boolean enableConsumeQueueExt) {
this.enableConsumeQueueExt = enableConsumeQueueExt;
}
public int getMappedFileSizeConsumeQueueExt() {
return mappedFileSizeConsumeQueueExt;
}
public void setMappedFileSizeConsumeQueueExt(int mappedFileSizeConsumeQueueExt) {
this.mappedFileSizeConsumeQueueExt = mappedFileSizeConsumeQueueExt;
}
public int getBitMapLengthConsumeQueueExt() {
return bitMapLengthConsumeQueueExt;
}
public void setBitMapLengthConsumeQueueExt(int bitMapLengthConsumeQueueExt) {
this.bitMapLengthConsumeQueueExt = bitMapLengthConsumeQueueExt;
}
public int getFlushIntervalCommitLog() {
return flushIntervalCommitLog;
}
public void setFlushIntervalCommitLog(int flushIntervalCommitLog) {
this.flushIntervalCommitLog = flushIntervalCommitLog;
}
public int getFlushIntervalConsumeQueue() {
return flushIntervalConsumeQueue;
}
public void setFlushIntervalConsumeQueue(int flushIntervalConsumeQueue) {
this.flushIntervalConsumeQueue = flushIntervalConsumeQueue;
}
public int getPutMsgIndexHightWater() {
return putMsgIndexHightWater;
}
public void setPutMsgIndexHightWater(int putMsgIndexHightWater) {
this.putMsgIndexHightWater = putMsgIndexHightWater;
}
public int getCleanResourceInterval() {
return cleanResourceInterval;
}
public void setCleanResourceInterval(int cleanResourceInterval) {
this.cleanResourceInterval = cleanResourceInterval;
}
public int getMaxMessageSize() {
return maxMessageSize;
}
public void setMaxMessageSize(int maxMessageSize) {
this.maxMessageSize = maxMessageSize;
}
public boolean isCheckCRCOnRecover() {
return checkCRCOnRecover;
}
public boolean getCheckCRCOnRecover() {
return checkCRCOnRecover;
}
public void setCheckCRCOnRecover(boolean checkCRCOnRecover) {
this.checkCRCOnRecover = checkCRCOnRecover;
}
public String getStorePathCommitLog() {
return storePathCommitLog;
}
public void setStorePathCommitLog(String storePathCommitLog) {
this.storePathCommitLog = storePathCommitLog;
}
public String getDeleteWhen() {
return deleteWhen;
}
public void setDeleteWhen(String deleteWhen) {
this.deleteWhen = deleteWhen;
}
public int getDiskMaxUsedSpaceRatio() {
if (this.diskMaxUsedSpaceRatio < 10)
return 10;
if (this.diskMaxUsedSpaceRatio > 95)
return 95;
return diskMaxUsedSpaceRatio;
}
public void setDiskMaxUsedSpaceRatio(int diskMaxUsedSpaceRatio) {
this.diskMaxUsedSpaceRatio = diskMaxUsedSpaceRatio;
}
public int getDeleteCommitLogFilesInterval() {
return deleteCommitLogFilesInterval;
}
public void setDeleteCommitLogFilesInterval(int deleteCommitLogFilesInterval) {
this.deleteCommitLogFilesInterval = deleteCommitLogFilesInterval;
}
public int getDeleteConsumeQueueFilesInterval() {
return deleteConsumeQueueFilesInterval;
}
public void setDeleteConsumeQueueFilesInterval(int deleteConsumeQueueFilesInterval) {
this.deleteConsumeQueueFilesInterval = deleteConsumeQueueFilesInterval;
}
public int getMaxTransferBytesOnMessageInMemory() {
return maxTransferBytesOnMessageInMemory;
}
public void setMaxTransferBytesOnMessageInMemory(int maxTransferBytesOnMessageInMemory) {
this.maxTransferBytesOnMessageInMemory = maxTransferBytesOnMessageInMemory;
}
public int getMaxTransferCountOnMessageInMemory() {
return maxTransferCountOnMessageInMemory;
}
public void setMaxTransferCountOnMessageInMemory(int maxTransferCountOnMessageInMemory) {
this.maxTransferCountOnMessageInMemory = maxTransferCountOnMessageInMemory;
}
public int getMaxTransferBytesOnMessageInDisk() {
return maxTransferBytesOnMessageInDisk;
}
public void setMaxTransferBytesOnMessageInDisk(int maxTransferBytesOnMessageInDisk) {
this.maxTransferBytesOnMessageInDisk = maxTransferBytesOnMessageInDisk;
}
public int getMaxTransferCountOnMessageInDisk() {
return maxTransferCountOnMessageInDisk;
}
public void setMaxTransferCountOnMessageInDisk(int maxTransferCountOnMessageInDisk) {
this.maxTransferCountOnMessageInDisk = maxTransferCountOnMessageInDisk;
}
public int getFlushCommitLogLeastPages() {
return flushCommitLogLeastPages;
}
public void setFlushCommitLogLeastPages(int flushCommitLogLeastPages) {
this.flushCommitLogLeastPages = flushCommitLogLeastPages;
}
public int getFlushConsumeQueueLeastPages() {
return flushConsumeQueueLeastPages;
}
public void setFlushConsumeQueueLeastPages(int flushConsumeQueueLeastPages) {
this.flushConsumeQueueLeastPages = flushConsumeQueueLeastPages;
}
public int getFlushCommitLogThoroughInterval() {
return flushCommitLogThoroughInterval;
}
public void setFlushCommitLogThoroughInterval(int flushCommitLogThoroughInterval) {
this.flushCommitLogThoroughInterval = flushCommitLogThoroughInterval;
}
public int getFlushConsumeQueueThoroughInterval() {
return flushConsumeQueueThoroughInterval;
}
public void setFlushConsumeQueueThoroughInterval(int flushConsumeQueueThoroughInterval) {
this.flushConsumeQueueThoroughInterval = flushConsumeQueueThoroughInterval;
}
public int getDestroyMapedFileIntervalForcibly() {
return destroyMapedFileIntervalForcibly;
}
public void setDestroyMapedFileIntervalForcibly(int destroyMapedFileIntervalForcibly) {
this.destroyMapedFileIntervalForcibly = destroyMapedFileIntervalForcibly;
}
public int getFileReservedTime() {
return fileReservedTime;
}
public void setFileReservedTime(int fileReservedTime) {
this.fileReservedTime = fileReservedTime;
}
public int getRedeleteHangedFileInterval() {
return redeleteHangedFileInterval;
}
public void setRedeleteHangedFileInterval(int redeleteHangedFileInterval) {
this.redeleteHangedFileInterval = redeleteHangedFileInterval;
}
public int getAccessMessageInMemoryMaxRatio() {
return accessMessageInMemoryMaxRatio;
}
public void setAccessMessageInMemoryMaxRatio(int accessMessageInMemoryMaxRatio) {
this.accessMessageInMemoryMaxRatio = accessMessageInMemoryMaxRatio;
}
public boolean isMessageIndexEnable() {
return messageIndexEnable;
}
public void setMessageIndexEnable(boolean messageIndexEnable) {
this.messageIndexEnable = messageIndexEnable;
}
public int getMaxHashSlotNum() {
return maxHashSlotNum;
}
public void setMaxHashSlotNum(int maxHashSlotNum) {
this.maxHashSlotNum = maxHashSlotNum;
}
public int getMaxIndexNum() {
return maxIndexNum;
}
public void setMaxIndexNum(int maxIndexNum) {
this.maxIndexNum = maxIndexNum;
}
public int getMaxMsgsNumBatch() {
return maxMsgsNumBatch;
}
public void setMaxMsgsNumBatch(int maxMsgsNumBatch) {
this.maxMsgsNumBatch = maxMsgsNumBatch;
}
public int getHaListenPort() {
return haListenPort;
}
public void setHaListenPort(int haListenPort) {
this.haListenPort = haListenPort;
}
public int getHaSendHeartbeatInterval() {
return haSendHeartbeatInterval;
}
public void setHaSendHeartbeatInterval(int haSendHeartbeatInterval) {
this.haSendHeartbeatInterval = haSendHeartbeatInterval;
}
public int getHaHousekeepingInterval() {
return haHousekeepingInterval;
}
public void setHaHousekeepingInterval(int haHousekeepingInterval) {
this.haHousekeepingInterval = haHousekeepingInterval;
}
public BrokerRole getBrokerRole() {
return brokerRole;
}
public void setBrokerRole(BrokerRole brokerRole) {
this.brokerRole = brokerRole;
}
public void setBrokerRole(String brokerRole) {
this.brokerRole = BrokerRole.valueOf(brokerRole);
}
public int getHaTransferBatchSize() {
return haTransferBatchSize;
}
public void setHaTransferBatchSize(int haTransferBatchSize) {
this.haTransferBatchSize = haTransferBatchSize;
}
public int getHaSlaveFallbehindMax() {
return haSlaveFallbehindMax;
}
public void setHaSlaveFallbehindMax(int haSlaveFallbehindMax) {
this.haSlaveFallbehindMax = haSlaveFallbehindMax;
}
public FlushDiskType getFlushDiskType() {
return flushDiskType;
}
public void setFlushDiskType(FlushDiskType flushDiskType) {
this.flushDiskType = flushDiskType;
}
public void setFlushDiskType(String type) {
this.flushDiskType = FlushDiskType.valueOf(type);
}
public int getSyncFlushTimeout() {
return syncFlushTimeout;
}
public void setSyncFlushTimeout(int syncFlushTimeout) {
this.syncFlushTimeout = syncFlushTimeout;
}
public String getHaMasterAddress() {
return haMasterAddress;
}
public void setHaMasterAddress(String haMasterAddress) {
this.haMasterAddress = haMasterAddress;
}
public String getMessageDelayLevel() {
return messageDelayLevel;
}
public void setMessageDelayLevel(String messageDelayLevel) {
this.messageDelayLevel = messageDelayLevel;
}
public long getFlushDelayOffsetInterval() {
return flushDelayOffsetInterval;
}
public void setFlushDelayOffsetInterval(long flushDelayOffsetInterval) {
this.flushDelayOffsetInterval = flushDelayOffsetInterval;
}
public boolean isCleanFileForciblyEnable() {
return cleanFileForciblyEnable;
}
public void setCleanFileForciblyEnable(boolean cleanFileForciblyEnable) {
this.cleanFileForciblyEnable = cleanFileForciblyEnable;
}
public boolean isMessageIndexSafe() {
return messageIndexSafe;
}
public void setMessageIndexSafe(boolean messageIndexSafe) {
this.messageIndexSafe = messageIndexSafe;
}
public boolean isFlushCommitLogTimed() {
return flushCommitLogTimed;
}
public void setFlushCommitLogTimed(boolean flushCommitLogTimed) {
this.flushCommitLogTimed = flushCommitLogTimed;
}
public String getStorePathRootDir() {
return storePathRootDir;
}
public void setStorePathRootDir(String storePathRootDir) {
this.storePathRootDir = storePathRootDir;
}
public int getFlushLeastPagesWhenWarmMapedFile() {
return flushLeastPagesWhenWarmMapedFile;
}
public void setFlushLeastPagesWhenWarmMapedFile(int flushLeastPagesWhenWarmMapedFile) {
this.flushLeastPagesWhenWarmMapedFile = flushLeastPagesWhenWarmMapedFile;
}
public boolean isOffsetCheckInSlave() {
return offsetCheckInSlave;
}
public void setOffsetCheckInSlave(boolean offsetCheckInSlave) {
this.offsetCheckInSlave = offsetCheckInSlave;
}
public int getDefaultQueryMaxNum() {
return defaultQueryMaxNum;
}
public void setDefaultQueryMaxNum(int defaultQueryMaxNum) {
this.defaultQueryMaxNum = defaultQueryMaxNum;
}
/**
* Enable transient commitLog store pool only if transientStorePoolEnable is true, the FlushDiskType is
* ASYNC_FLUSH and the broker role is not SLAVE
*
* @return <tt>true</tt> or <tt>false</tt>
*/
public boolean isTransientStorePoolEnable() {
return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType()
&& BrokerRole.SLAVE != getBrokerRole();
}
public void setTransientStorePoolEnable(final boolean transientStorePoolEnable) {
this.transientStorePoolEnable = transientStorePoolEnable;
}
public int getTransientStorePoolSize() {
return transientStorePoolSize;
}
public void setTransientStorePoolSize(final int transientStorePoolSize) {
this.transientStorePoolSize = transientStorePoolSize;
}
public int getCommitIntervalCommitLog() {
return commitIntervalCommitLog;
}
public void setCommitIntervalCommitLog(final int commitIntervalCommitLog) {
this.commitIntervalCommitLog = commitIntervalCommitLog;
}
public boolean isFastFailIfNoBufferInStorePool() {
return fastFailIfNoBufferInStorePool;
}
public void setFastFailIfNoBufferInStorePool(final boolean fastFailIfNoBufferInStorePool) {
this.fastFailIfNoBufferInStorePool = fastFailIfNoBufferInStorePool;
}
public boolean isUseReentrantLockWhenPutMessage() {
return useReentrantLockWhenPutMessage;
}
public void setUseReentrantLockWhenPutMessage(final boolean useReentrantLockWhenPutMessage) {
this.useReentrantLockWhenPutMessage = useReentrantLockWhenPutMessage;
}
public int getCommitCommitLogLeastPages() {
return commitCommitLogLeastPages;
}
public void setCommitCommitLogLeastPages(final int commitCommitLogLeastPages) {
this.commitCommitLogLeastPages = commitCommitLogLeastPages;
}
public int getCommitCommitLogThoroughInterval() {
return commitCommitLogThoroughInterval;
}
public void setCommitCommitLogThoroughInterval(final int commitCommitLogThoroughInterval) {
this.commitCommitLogThoroughInterval = commitCommitLogThoroughInterval;
}
public String getdLegerGroup() {
return dLegerGroup;
}
public void setdLegerGroup(String dLegerGroup) {
this.dLegerGroup = dLegerGroup;
}
public String getdLegerPeers() {
return dLegerPeers;
}
public void setdLegerPeers(String dLegerPeers) {
this.dLegerPeers = dLegerPeers;
}
public String getdLegerSelfId() {
return dLegerSelfId;
}
public void setdLegerSelfId(String dLegerSelfId) {
this.dLegerSelfId = dLegerSelfId;
}
public boolean isEnableDLegerCommitLog() {
return enableDLegerCommitLog;
}
public void setEnableDLegerCommitLog(boolean enableDLegerCommitLog) {
this.enableDLegerCommitLog = enableDLegerCommitLog;
}
}
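II. RocketMQ Message Middleware Source Code Analysis: Route Registration, Sending Heartbeats
1. Route registration diagram:
2. Route registration: sending heartbeats
RocketMQ implements route registration through the heartbeat between Broker and NameServer. On startup, a Broker sends a heartbeat to every NameServer in the cluster, and then repeats it every 30s by default. When a NameServer receives a heartbeat, it updates the lastUpdateTimestamp of the corresponding BrokerLiveInfo in the brokerLiveTable cache. Each NameServer also scans brokerLiveTable every 10s; if no heartbeat has arrived from a Broker for 120s, the NameServer removes that Broker's routing information and closes the socket connection.
3. Code: BrokerController#start
// Register the Broker with every NameServer
this.registerBrokerAll(true, false, true);
// Report Broker information to the NameServers periodically (30s by default, clamped to 10s-60s; first run after 10s)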
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)),
TimeUnit.MILLISECONDS);
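The period passed to scheduleAtFixedRate above is not used as configured: it is clamped to the range 10s to 60s. The following minimal sketch isolates only that arithmetic. HeartbeatPeriod and effectivePeriodMillis are hypothetical names introduced for illustration, not part of RocketMQ.

```java
// Illustrative sketch; HeartbeatPeriod and effectivePeriodMillis are hypothetical names.
class HeartbeatPeriod {
    // Same expression as in the snippet above: Math.max(10000, Math.min(configured, 60000)).
    static long effectivePeriodMillis(long configuredMillis) {
        return Math.max(10_000L, Math.min(configuredMillis, 60_000L));
    }

    public static void main(String[] args) {
        System.out.println(effectivePeriodMillis(30_000L));  // inside the range, used as configured
        System.out.println(effectivePeriodMillis(1_000L));   // raised to the 10s floor
        System.out.println(effectivePeriodMillis(300_000L)); // lowered to the 60s ceiling
    }
}
```

A consequence of the clamp is that a misconfigured period can never push the heartbeat interval beyond 60s, which stays well inside the 120s expiry window used by the NameServer.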
4. Code: BrokerOuterAPI#registerBrokerAll
// Get the NameServer address list
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
// Proceed only if at least one NameServer is configured
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
// Build the request header
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);
// Build the request body
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// Register with each NameServer separately
RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
if (result != null) {
registerBrokerResultList.add(result);
}
log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}
try {
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}
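The registration above fans out to all NameServers in parallel and waits on a CountDownLatch, so that one slow or failing NameServer does not block the others. A minimal, self-contained sketch of that pattern follows. RegisterFanOut, registerAll and the registerOne function are hypothetical stand-ins for illustration (a String result replaces RegisterBrokerResult), and the pool size of 4 is an arbitrary assumption.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Illustrative sketch only; all names here are hypothetical, not RocketMQ APIs.
class RegisterFanOut {
    // registerOne stands in for a single registration call; it may return null or throw.
    static List<String> registerAll(List<String> nameServers,
                                    Function<String, String> registerOne,
                                    long timeoutMillis) {
        List<String> results = new CopyOnWriteArrayList<>();
        if (nameServers == null || nameServers.isEmpty()) {
            return results;
        }
        ExecutorService executor = Executors.newFixedThreadPool(Math.min(4, nameServers.size()));
        CountDownLatch latch = new CountDownLatch(nameServers.size());
        try {
            for (String addr : nameServers) {
                executor.execute(() -> {
                    try {
                        String result = registerOne.apply(addr);
                        if (result != null) {
                            results.add(result);
                        }
                    } catch (RuntimeException e) {
                        // A failing NameServer is skipped; the others are still registered.
                    } finally {
                        // Always count down, otherwise the caller would wait for the full timeout.
                        latch.countDown();
                    }
                });
            }
            // Wait until every task has finished or the timeout elapses.
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return results;
    }
}
```

Counting down in the finally block is the important design choice: the latch reaches zero even when a registration throws, so the caller only waits for the timeout when a NameServer is genuinely unresponsive.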
5. Code: BrokerOuterAPI#registerBroker
if (oneway) {
try {
this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
} catch (RemotingTooMuchRequestException e) {
// Ignore
}
return null;
}
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
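III. RocketMQ Message Middleware Source Code Analysis: Route Registration, Handling the Request
1. Diagram of request handling during route registration:
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor
The network processing class parses the request type; if the type is REGISTER_BROKER, the request is forwarded to RouteInfoManager#registerBroker.
2. Code: DefaultRequestProcessor#processRequest
// The request is a Broker registration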
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
// Older Brokers: register without FilterServer information
return this.registerBroker(ctx, request);
}
3. Code: DefaultRequestProcessor#registerBroker
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
topicConfigWrapper,
null,
ctx.channel()
);
4. Code: RouteInfoManager#registerBroker
Maintaining the routing information
// Acquire the write lock
this.lock.writeLock().lockInterruptibly();
// Maintain clusterAddrTable
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
5. Maintaining brokerAddrTable
// Maintain brokerAddrTable
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
// First registration: create the BrokerData
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
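// Remove any stale entry that maps the same address to a different brokerId (for example after a Slave becomes Master), then record the current mapping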
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> item = it.next();
if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
it.remove();
}
}
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
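The loop above guarantees that one address appears under at most one brokerId before the new mapping is stored. A minimal sketch of just that step, operating on a plain map, is shown below. BrokerAddrUpdate and putBrokerAddr are hypothetical names used for illustration, and the return value plays the role of the `null == oldAddr` part of registerFirst.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Illustrative sketch; BrokerAddrUpdate and putBrokerAddr are hypothetical names.
class BrokerAddrUpdate {
    // Returns true when brokerId had no address before this call.
    static boolean putBrokerAddr(Map<Long, String> brokerAddrs, long brokerId, String brokerAddr) {
        Iterator<Map.Entry<Long, String>> it = brokerAddrs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, String> item = it.next();
            // Same address registered under another id: drop the stale entry.
            if (brokerAddr != null && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                it.remove();
            }
        }
        String oldAddr = brokerAddrs.put(brokerId, brokerAddr);
        return oldAddr == null;
    }

    public static void main(String[] args) {
        Map<Long, String> addrs = new HashMap<>();
        addrs.put(1L, "10.0.0.1:10911"); // currently registered as a Slave
        boolean first = putBrokerAddr(addrs, 0L, "10.0.0.1:10911"); // re-registers as Master
        System.out.println(first + " " + addrs);
    }
}
```

Removing through the iterator avoids a ConcurrentModificationException while the map is being traversed.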
6. Maintaining topicQueueTable
// Maintain topicQueueTable
if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) ||
registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
7. Code: RouteInfoManager#createAndUpdateQueueData
private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
// Create the QueueData
QueueData queueData = new QueueData();
queueData.setBrokerName(brokerName);
queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
queueData.setReadQueueNums(topicConfig.getReadQueueNums());
queueData.setPerm(topicConfig.getPerm());
queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());
// Get the queue list for this topic from topicQueueTable
List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
// No list exists for this topic yet: create one and add queueData to it
if (null == queueDataList) {
queueDataList = new LinkedList<QueueData>();
queueDataList.add(queueData);
this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
} else {
// Determine whether queueData is a new entry
boolean addNewOne = true;
Iterator<QueueData> it = queueDataList.iterator();
while (it.hasNext()) {
QueueData qd = it.next();
// An entry for this Broker already exists: keep it if unchanged, otherwise remove it so the new one replaces it
if (qd.getBrokerName().equals(brokerName)) {
if (qd.equals(queueData)) {
addNewOne = false;
} else {
log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd,
queueData);
it.remove();
}
}
}
// Add queueData when it is new or replaces a changed entry
if (addNewOne) {
queueDataList.add(queueData);
}
}
}
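The method above is an update-or-add over the per-topic list: an unchanged entry for the same Broker is kept, a changed one is replaced, and an unknown Broker is appended. The sketch below reproduces that logic on simplified types. QueueTableSketch, QueueInfo and upsert are hypothetical names, and QueueInfo carries only a subset of the QueueData fields.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Illustrative sketch; QueueTableSketch, QueueInfo and upsert are hypothetical names.
class QueueTableSketch {
    static final class QueueInfo {
        final String brokerName;
        final int readQueueNums;
        final int writeQueueNums;

        QueueInfo(String brokerName, int readQueueNums, int writeQueueNums) {
            this.brokerName = brokerName;
            this.readQueueNums = readQueueNums;
            this.writeQueueNums = writeQueueNums;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof QueueInfo)) return false;
            QueueInfo other = (QueueInfo) o;
            return brokerName.equals(other.brokerName)
                && readQueueNums == other.readQueueNums
                && writeQueueNums == other.writeQueueNums;
        }

        @Override
        public int hashCode() {
            return Objects.hash(brokerName, readQueueNums, writeQueueNums);
        }
    }

    final Map<String, List<QueueInfo>> topicQueueTable = new HashMap<>();

    void upsert(String topic, QueueInfo incoming) {
        List<QueueInfo> list = topicQueueTable.computeIfAbsent(topic, k -> new ArrayList<>());
        boolean addNewOne = true;
        Iterator<QueueInfo> it = list.iterator();
        while (it.hasNext()) {
            QueueInfo existing = it.next();
            if (existing.brokerName.equals(incoming.brokerName)) {
                if (existing.equals(incoming)) {
                    addNewOne = false; // unchanged, keep the existing entry
                } else {
                    it.remove();       // changed, drop the old entry
                }
            }
        }
        if (addNewOne) {
            list.add(incoming);
        }
    }
}
```

Because equality covers every field, any change to the queue counts is treated as a topic change and leaves exactly one entry per Broker in the list.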
8. Maintaining brokerLiveTable
// Maintain brokerLiveTable
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
9. Maintaining filterServerTable
// Maintain filterServerTable
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
// For a Slave, return the Master's address and HA address in the result
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
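IV. RocketMQ Message Middleware Source Code Analysis: Route Deletion
1. Route deletion
A Broker sends a heartbeat to the NameServer every 30s. The heartbeat carries the BrokerId, the Broker address, the Broker name, the name of the cluster the Broker belongs to, and the list of FilterServers associated with the Broker. If a Broker goes down, however, the NameServer stops receiving heartbeats, so how does it evict the failed Broker? The NameServer scans the brokerLiveTable status table every 10s. If the lastUpdateTimestamp of a BrokerLiveInfo is more than 120s behind the current time, the Broker is considered failed: the NameServer removes it, closes the connection to it, and updates topicQueueTable, brokerAddrTable, brokerLiveTable and filterServerTable.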
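2. RocketMQ removes routing information at two trigger points:
- The NameServer periodically scans brokerLiveTable and compares the time of the last heartbeat with the current system time; if the difference exceeds 120s, the Broker is removed.
- When a Broker shuts down normally, it issues the unregisterBroker command.
Both paths delete routes in the same way: every entry related to that Broker is removed from the routing tables.
3. Code: NamesrvController#initialize
// Scan for inactive Brokers every 10s (first run after 5s)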
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
4. Code: RouteInfoManager#scanNotActiveBroker
public void scanNotActiveBroker() {
// Get an iterator over brokerLiveTable
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
// Iterate over brokerLiveTable
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
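// The last heartbeat is more than 120s older than the current time
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
// Close the connection
RemotingUtil.closeChannel(next.getValue().getChannel());
// Remove the Broker from brokerLiveTable
it.remove();
// Update the other routing tables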
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}
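The expiry check above is easier to reason about when the current time is passed in as a parameter. The following minimal sketch applies the same comparison to a map from Broker address to last heartbeat time. LivenessScan and evictExpired are hypothetical names, and the 120s constant is taken from the text above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Illustrative sketch; LivenessScan and evictExpired are hypothetical names.
class LivenessScan {
    static final long EXPIRED_MILLIS = 120_000L; // 120s, as described in the text

    // Removes expired entries from lastUpdate and returns the evicted Broker addresses.
    static List<String> evictExpired(Map<String, Long> lastUpdate, long nowMillis) {
        List<String> evicted = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastUpdate.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> next = it.next();
            // Same comparison as the source: (last + expiry) < now.
            if (next.getValue() + EXPIRED_MILLIS < nowMillis) {
                evicted.add(next.getKey());
                it.remove();
            }
        }
        return evicted;
    }

    public static void main(String[] args) {
        Map<String, Long> table = new HashMap<>();
        table.put("10.0.0.1:10911", 0L);
        table.put("10.0.0.2:10911", 100_000L);
        System.out.println(evictExpired(table, 120_001L));
    }
}
```

Note that the comparison is strict: a Broker whose last heartbeat is exactly 120s old is not yet evicted. Combined with the 10s scan interval, a dead Broker can therefore remain in the tables for a little over 120s.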
5. Code: RouteInfoManager#onChannelDestroy
// Acquire the write lock, then remove the Broker address from brokerLiveTable and filterServerTable
this.lock.writeLock().lockInterruptibly();
this.brokerLiveTable.remove(brokerAddrFound);
this.filterServerTable.remove(brokerAddrFound);
6. Maintaining brokerAddrTable
// Maintain brokerAddrTable
String brokerNameFound = null;
boolean removeBrokerName = false;
Iterator<Entry<String, BrokerData>> itBrokerAddrTable =this.brokerAddrTable.entrySet().iterator();
// Iterate over brokerAddrTable
while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
BrokerData brokerData = itBrokerAddrTable.next().getValue();
// Iterate over the addresses of this Broker
Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> entry = it.next();
Long brokerId = entry.getKey();
String brokerAddr = entry.getValue();
// Remove the entry whose address matches the destroyed channel
if (brokerAddr.equals(brokerAddrFound)) {
brokerNameFound = brokerData.getBrokerName();
it.remove();
log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
brokerId, brokerAddr);
break;
}
}
// If this brokerName has no addresses left, remove the brokerName entry from brokerAddrTable
if (brokerData.getBrokerAddrs().isEmpty()) {
removeBrokerName = true;
itBrokerAddrTable.remove();
log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
brokerData.getBrokerName());
}
}
7. Maintaining clusterAddrTable
// Maintain clusterAddrTable
if (brokerNameFound != null && removeBrokerName) {
Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
// Iterate over clusterAddrTable
while (it.hasNext()) {
Entry<String, Set<String>> entry = it.next();
// Cluster name
String clusterName = entry.getKey();
// Set of brokerNames in the cluster
Set<String> brokerNames = entry.getValue();
// Remove brokerNameFound from brokerNames
boolean removed = brokerNames.remove(brokerNameFound);
if (removed) {
log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
brokerNameFound, clusterName);
if (brokerNames.isEmpty()) {
log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
clusterName);
// If the cluster no longer contains any Broker, remove the cluster
it.remove();
}
break;
}
}
}
8. Maintaining topicQueueTable
// Maintain topicQueueTable
if (removeBrokerName) {
// Iterate over topicQueueTable
Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
this.topicQueueTable.entrySet().iterator();
while (itTopicQueueTable.hasNext()) {
Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
// Topic name
String topic = entry.getKey();
// Queue list of the topic
List<QueueData> queueDataList = entry.getValue();
// Iterate over the queues of this topic
Iterator<QueueData> itQueueData = queueDataList.iterator();
while (itQueueData.hasNext()) {
// Remove the queue data that belongs to the inactive Broker
QueueData queueData = itQueueData.next();
if (queueData.getBrokerName().equals(brokerNameFound)) {
itQueueData.remove();
log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
topic, queueData);
}
}
// If the topic has no queues left, remove the topic
if (queueDataList.isEmpty()) {
itTopicQueueTable.remove();
log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
topic);
}
}
}
9. Releasing the write lock
// Release the write lock
finally {
this.lock.writeLock().unlock();
}
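Taken together, steps 5 to 9 form a cascade: the address is removed from its Broker entry, and only when that entry becomes empty are the cluster and topic tables cleaned up, all under the write lock. The sketch below shows the cascade on simplified tables. RouteCleanup, register and removeBroker are hypothetical names, and topicQueueTable here stores only Broker names instead of QueueData.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative sketch; RouteCleanup and its methods are hypothetical names.
class RouteCleanup {
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();
    final Map<String, List<String>> topicQueueTable = new HashMap<>(); // topic -> brokerNames
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    void register(String cluster, String brokerName, long brokerId, String addr, String topic) {
        lock.writeLock().lock();
        try {
            clusterAddrTable.computeIfAbsent(cluster, k -> new HashSet<>()).add(brokerName);
            brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, addr);
            List<String> brokers = topicQueueTable.computeIfAbsent(topic, k -> new ArrayList<>());
            if (!brokers.contains(brokerName)) {
                brokers.add(brokerName);
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    void removeBroker(String brokerAddr) {
        lock.writeLock().lock();
        try {
            String brokerNameFound = null;
            boolean removeBrokerName = false;
            Iterator<Map.Entry<String, Map<Long, String>>> it = brokerAddrTable.entrySet().iterator();
            while (it.hasNext() && brokerNameFound == null) {
                Map.Entry<String, Map<Long, String>> entry = it.next();
                if (entry.getValue().values().remove(brokerAddr)) {
                    brokerNameFound = entry.getKey();
                    if (entry.getValue().isEmpty()) {
                        removeBrokerName = true; // last address gone, drop the brokerName
                        it.remove();
                    }
                }
            }
            if (removeBrokerName) {
                final String name = brokerNameFound;
                clusterAddrTable.values().forEach(names -> names.remove(name));
                clusterAddrTable.values().removeIf(Set::isEmpty);
                topicQueueTable.values().forEach(brokers -> brokers.remove(name));
                topicQueueTable.values().removeIf(List::isEmpty);
            }
        } finally {
            lock.writeLock().unlock(); // always release, mirroring the finally block above
        }
    }
}
```

As in the source, losing a Slave address leaves the cluster and topic tables untouched as long as the Broker name still has another address.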
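V. RocketMQ Message Middleware Source Code Analysis: Route Discovery and Summary
1. Route discovery
Route discovery in RocketMQ is not real-time. When the routing of a Topic changes, the NameServer does not push the change to clients; instead, clients periodically pull the latest route for the topic.
2. Code: DefaultRequestProcessor#getRouteInfoByTopic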
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
// Call RouteInfoManager to fill the List<QueueData>, List<BrokerData> and filterServer fields of TopicRouteData from the routing tables topicQueueTable, brokerAddrTable and filterServerTable
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
// If routing information is found for the topic and ordered messages are enabled, fill in the ordered-message configuration from the NameServer KVConfig
if (topicRouteData != null) {
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
requestHeader.getTopic());
topicRouteData.setOrderTopicConf(orderTopicConf);
}
byte[] content = topicRouteData.encode();
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
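The lookup behind this handler can be pictured as a join of two tables: the topic's queue list names the Brokers, and brokerAddrTable supplies their addresses. The sketch below illustrates that idea on simplified types and is not the actual pickupTopicRouteData implementation. RouteLookup and pickup are hypothetical names, and a null result corresponds to the TOPIC_NOT_EXIST branch above.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch; RouteLookup and pickup are hypothetical names.
class RouteLookup {
    // Returns brokerName -> (brokerId -> address) for the topic, or null if nothing is found.
    static Map<String, Map<Long, String>> pickup(String topic,
                                                 Map<String, List<String>> topicQueueTable,
                                                 Map<String, Map<Long, String>> brokerAddrTable) {
        List<String> brokerNames = topicQueueTable.get(topic);
        if (brokerNames == null || brokerNames.isEmpty()) {
            return null;
        }
        Map<String, Map<Long, String>> route = new LinkedHashMap<>();
        for (String brokerName : brokerNames) {
            Map<Long, String> addrs = brokerAddrTable.get(brokerName);
            if (addrs != null) {
                // Copy so that callers cannot modify the routing table through the result.
                route.put(brokerName, new HashMap<>(addrs));
            }
        }
        return route.isEmpty() ? null : route;
    }
}
```

Since clients pull this data periodically, a route they hold can be stale for up to one polling interval after a Broker has been removed.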
3. NameServer summary