这是本人学习的总结,主要学习资料如下
- 马士兵教育
- rocketMq官方文档
目录
- 1、Overview
- 1.1、创建MQClientInstance
- 1.1.1、检查
- 1.1.1、MQClientInstance的ID
- 1.2、MQClientInstance.start()
1、Overview
这是发送信息的代码样例,
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
producer.start();
for (int i = 0; i < MESSAGE_COUNT; i++) {
try {
Message msg = new Message(TOPIC, TAG, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
生产者启动最少需要两个信息,group
和nameSrvAddr
。启动的源码则是producer#start()
中。
最终RocketMQ
会创建MQClientInstance
的实例,然后在调用MQClientInstance#start()
完成生产者的启动。
1.1、创建MQClientInstance
1.1.1、检查
代码线索DefaultMQProducer#start() -> DefaultMQProducerImpl#start() -> DefaultMQProducerImpl#checkConfig()
。
创建MQClientInstance
前做前置检查,主要是检查group
的格式,并且不能和系统的group
重命。
1.1.1、MQClientInstance的ID
MQClientInstance
由MQClientManager
进行管理。MQClientManager
整个JVM中只有一个实例,其内部用ConcurrentMap<String, MQClientManager>
管理着所有的MQClientInstance
,其中的String可以看成是每个MQClientInstance
的id,下面通过源码查看id是如何组成的。
代码线索DefaultMQProducer#start() -> DefaultMQProducerImpl#start() -> MQClientManager#getInstance()#getOrCreateMQClientInstance() -> ClientConfig#buildMQClientId()
。
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
if (enableStreamRequestType) {
sb.append("@");
sb.append(RequestType.STREAM);
}
return sb.toString();
}
很明显,每个MQClientInstance
的ID主要是由IP
,instanceName
和unitName
组成,其中instanceName
和unitName
都可以设置。所以如果我们想要创建多个MQClientInstance
使用的话,可以设置不同的instanceName
和unitName
。
1.2、MQClientInstance.start()
启动一些线程池,心跳服务。
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
// NRC start
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}