入口
这里分析源码用的入口是:
org.apache.rocketmq.example.quickstart
package org.apache.rocketmq.example.quickstart;
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
/*
* Instantiate with a producer group name.
*/
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
/*
* Specify name server addresses.
* <p/>
*
* Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
* <pre>
* {@code
* producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
* }
* </pre>
*/
producer.setNamesrvAddr("127.0.0.1:9876");
/*
* Launch the instance.
*/
producer.start();
for (int i = 0; i < 1000; i++) {
try {
/*
* Create a message instance, specifying topic, tag and message body.
*/
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
/*
* Call send message to deliver message to one of brokers.
*/
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
/*
* Shut down once the producer instance is not longer in use.
*/
producer.shutdown();
}
}
在发送消息的时候用的是DefaultMQProducer
DefaultMQProducer
这个就是我们业务层经常使用的对象,用来发送消息
public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Wrapping internal implementations for virtually all methods presented in this class.
*/
// 生产者实现类对象
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
private final InternalLogger log = ClientLogger.getLog();
/**
* Producer group conceptually aggregates all producer instances of exactly same role, which is particularly
* important when transactional messages are involved. </p>
*
* For non-transactional messages, it does not matter as long as it's unique per process. </p>
*
* See {@linktourl http://rocketmq.apache.org/docs/core-concept/} for more discussion.
*/
// 生产者组(发送事务消息,broker端进行事务回查时,可以选择 当前 生产者组下的任意一个生产者 进行 事务回查)
private String producerGroup;
/**
* Just for testing or demo program
*/
// TBW102 :broker写死的主题队列信息,当发送消息指定的topic在 nm 未找到路由信息,则使用 该TBW102 作为 模板 去创建 主题发布信息。
private String createTopicKey = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC;
/**
* Number of queues to create per default topic.
*/
// 默认broker每个主题 创建的 队列数量
private volatile int defaultTopicQueueNums = 4;
/**
* Timeout for sending messages.
*/
// 发送消息超时限制 默认:3s
private int sendMsgTimeout = 3000;
/**
* Compress message body threshold, namely, message body larger than 4k will be compressed on default.
*/
// 压缩阈值,当msg body 超过 “4k” 后,选择使用压缩。
private int compressMsgBodyOverHowmuch = 1024 * 4;
/**
* Maximum number of retry to perform internally before claiming sending failure in synchronous mode. </p>
*
* This may potentially cause message duplication which is up to application developers to resolve.
*/
// 同步发送失败之后,重试发送次数:2 再加上第一次发送 =》 3
private int retryTimesWhenSendFailed = 2;
/**
* Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
*
* This may potentially cause message duplication which is up to application developers to resolve.
*/
// 异步发送失败之后,重试发送次数:2
private int retryTimesWhenSendAsyncFailed = 2;
/**
* Indicate whether to retry another broker on sending failure internally.
*/
// 消息未存储成功,是否选择其它broker节点进行消息重试? 一般需要设置为true。
private boolean retryAnotherBrokerWhenNotStoreOK = false;
/**
* Maximum allowed message size in bytes.
*/
// 消息体最大限制,默认值:4mb
private int maxMessageSize = 1024 * 1024 * 4; // 4M
其中MQProducer是生产者接口,定义生产者发送消息的一些规范
ClientConfig是客户端配置类,继承ClientConfig说白了就是共享一些通用的客户端配置属性和功能
-
构造方法
-
start方法
最终还是调用的是defaultMQProducderImpl.start方法 -
send方法
最终调用的还是defaultMQProducderImpl.send方法
DefaultMQProducerImpl详解
public class DefaultMQProducerImpl implements MQProducerInner {
private final InternalLogger log = ClientLogger.getLog();
// 生成invokeID,无实际业务意义,打印日志使用
private final Random random = new Random();
// 生产者门面对象,在这里主要当做 config 使用。
private final DefaultMQProducer defaultMQProducer;
// 主题发布信息映射表
// key:主题
// value:主题的发布信息
private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
new ConcurrentHashMap<String, TopicPublishInfo>();
// 发送消息的钩子,留给用户扩展框架使用的。
private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
// rpc Hook 最终会传递给 NettyRemotingClient。 留给用户扩展框架使用的。
private final RPCHook rpcHook;
// 异步发送消息,异步任务线程池使用的队列
private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
// 缺省的异步送法消息线程池
private final ExecutorService defaultAsyncSenderExecutor;
// 定时任务,执行:RequestFutureTable.scanExpiredRequest();
private final Timer timer = new Timer("RequestHouseKeepingService", true);
protected BlockingQueue<Runnable> checkRequestQueue;
protected ExecutorService checkExecutor;
// 状态
private ServiceState serviceState = ServiceState.CREATE_JUST;
// 客户端实例对象,生产者启动后 需要注册到该客户端对象内。(观察者模式)
private MQClientInstance mQClientFactory;
// 注意和 SendMessageHook 区别,它可以抛异常,控制消息 是否发送。
private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();
// zip 压缩算法 压缩级别,默认:5
private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
// 选择队列容错策略
private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
// 异步消息发送线程池,如果指定的话,就不在选择使用defaultAsyncSenderExecutor这个线程池了..
private ExecutorService asyncSenderExecutor;
-
构造方法
-
start方法
/**
* 正常路径:startFactory => true
*/
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
// 修改状态为 启动失败,后面启动成功后会修改...
this.serviceState = ServiceState.START_FAILED;
// 判断生产者组名,不能是空,也不能是 “DEFAULT_PRODUCER”
this.checkConfig();
// 条件成立:说明当前生产者 不是 内部生产者 (什么是内部生产者? 处理消息回退这种情况使用的生产者 )
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
// 修改生产者实例名称为:当前进程的 PID
this.defaultMQProducer.changeInstanceNameToPID();
}
// todo 获取当前进程的RocketMQ 客户端实例对象
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// 将生产者自己注册到 RocketMQ 客户端实例内 (观察者模式)
// todo 将生产者自己注册到rockemt 客户端实例
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// 添加一个主题发布信息
// key:TBW102 value:空对象
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory) {
// 启动RocketMQ 客户端实例对象 入口:
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
// 设置生产者实例状态为:运行中
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
// 强制RocketMQ 客户端实例 向 已知的broker节点 发送一次心跳。(讲 客户端定时任务时 再聊..)
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// request 发送的消息 需要 消费者 回执一条消息。
// 怎么实现的呢?
// 生产者 msg 加了一些信息 关联ID 客户端ID ,发送到 broker 之后
// 消费者 从 broker 拿到 这条 消息,检查msg 类型 发现 是一个 需要回执的消息
// 处理完消息 之后,根据 msg 关联ID 和 客户端ID 生成一条消息 (封装响应给 生产者的结果) 发送到 broker
// Broker 拿到这条消息之后,它知道这是一条 回执消息,根据 客户端ID 找到 ch ,将消息 推送给 生产者。
// 生产者 这边 拿到 回执消息之后,读取出来 关联ID 找到 对应的 RequestFuture ,将阻塞的线程 唤醒。
// 类似于 生产者 和 消费者 之间 进行了 一次 RPC ,只不过 中间 由 broker 代理完成的。
// 定时任务 处理 回执太慢的情况..
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
RequestFutureTable.scanExpiredRequest();
} catch (Throwable e) {
log.error("scan RequestFutureTable exception", e);
}
}
}, 1000 * 3, 1000);
}
- send方法
// 同步发送
// 参数1:msg
// 参数2:发送模式 (同步)
// 参数3:回调 同步发送时,不需要提供这个参数。 只有异步时才需要!
// 参数4:发送超时时间(默认 3秒 )
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 确定生产者状态 是 运行中,否则抛异常。
this.makeSureStateOK();
// 判断消息规格
Validators.checkMessage(msg, this.defaultMQProducer);
// 生成一个 调用ID,打日志使用
final long invokeID = random.nextLong();
// 发送的初始时间
long beginTimestampFirst = System.currentTimeMillis();
// 本轮发送开始时间
long beginTimestampPrev = beginTimestampFirst;
// 本轮发送结束时间
long endTimestamp = beginTimestampFirst;
// 获取当前消息 主题的发布信息,需要依赖它里面的 MessageQueues 信息,选择 一个队列 后面去发送消息使用。
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
// 是否超时
boolean callTimeout = false;
// 选中的队列
MessageQueue mq = null;
// 异常
Exception exception = null;
// 发送结果
SendResult sendResult = null;
// timesTotal 发送总尝试次数,同步模式发送时:1 + "2" = 3 ,异步情况 重试次数: 1
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
// 当前第几次发送
int times = 0;
// 下标值 代表 发送的第几次, 值 代表 这次发送 选择的 brokerName
String[] brokersSent = new String[timesTotal];
// 循环发送,什么时候跳出循环? 1. 发送成功 2. 发送尝试次数 达到上限
for (; times < timesTotal; times++) {
// 上次发送时的 brokerName ,第一次发送时 lastBrokerName 值为 null,其它情况 就是 上次发送时的 BrokerName
String lastBrokerName = null == mq ? null : mq.getBrokerName();
// 从 主题发布信息中 选择一个 队列
// 参数1:主题发布信息
// 参数:上次发送失败的BrokerName
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
// 条件成立:说明已经选择出来一个 可以 发送的 MessageQueue
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
// 记录 本轮发送的开始时间
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) {
//Reset topic with namespace during resend.
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
// 计算执行到这里时的耗时,如果已经超过 timeout 限制,则直接不发送消息了,跳出循环。
long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) {
callTimeout = true;
break;
}
// 参数1:msg
// 参数2:mq,选择的队列
// 参数3:发送模式 (可选 同步 或者 异步 或者 单向)
// 参数4:异步发送时 需要传递一个 回调处理对象,同步 或者 单向时 这里为null
// 参数5:主题发布信息
// 参数6:计算出一个剩余的超时限制
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
// 本轮发送结束时间
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
// 异步 或者 单向,直接返回null
// 异步:返回值由sendCallback和回调线程处理。
// 单向:服务器不返回任何数据
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
// 条件成立:说明 服务端broker 存储失败..
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
// 消息未存储成功,是否选择其它broker节点进行消息重试? 一般需要设置为true。
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
// 重试
continue;
}
}
// 正常 从这里返回
return sendResult;
default:
break;
}
} catch (RemotingException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQClientException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQBrokerException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
switch (e.getResponseCode()) {
case ResponseCode.TOPIC_NOT_EXIST:
case ResponseCode.SERVICE_NOT_AVAILABLE:
case ResponseCode.SYSTEM_ERROR:
case ResponseCode.NO_PERMISSION:
case ResponseCode.NO_BUYER_ID:
case ResponseCode.NOT_IN_CURRENT_UNIT:
continue;
default:
if (sendResult != null) {
return sendResult;
}
throw e;
}
} catch (InterruptedException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
log.warn("sendKernelImpl exception", e);
log.warn(msg.toString());
throw e;
}
} else {
break;
}
}
// 执行到这里 会有几种情况? 3种情况
// 1. for循环尝试发送了 几次 都没能发送成功
// 2. sendKernelImpl方法异常,此时 sendResult 是null
// 3. 发送超时
if (sendResult != null) {// 1. for循环尝试发送了 几次 都没能发送成功
return sendResult;
}
// 2. sendKernelImpl方法异常,此时 sendResult 是null
// 3. 发送超时
String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
times,
System.currentTimeMillis() - beginTimestampFirst,
msg.getTopic(),
Arrays.toString(brokersSent));
info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
MQClientException mqClientException = new MQClientException(info, exception);
if (callTimeout) { // 3. 发送超时
throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
}
// 2. sendKernelImpl方法异常,此时 sendResult 是null
if (exception instanceof MQBrokerException) {
mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
} else if (exception instanceof RemotingConnectException) {
mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
} else if (exception instanceof RemotingTimeoutException) {
mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
} else if (exception instanceof MQClientException) {
mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
}
throw mqClientException;
}
validateNameServerSetting();
// 未找到当前主题的路由数据异常...没办法发送消息。
throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
总结一下做了几件事:
-
获取当前消息 主题的发布信息
-
从主题发布信息中,选择一个队列
选的时候,用的算法就是一个自增值%队列数目 -
调用sendKernelImpl
// 参数1:msg
// 参数2:mq,选择的队列
// 参数3:发送模式 (可选 同步 或者 异步 或者 单向)
// 参数4:异步发送时 需要传递一个 回调处理对象,同步 或者 单向时 这里为null
// 参数5:主题发布信息
// 参数6:计算出一个剩余的超时限制
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 开始时间
long beginStartTime = System.currentTimeMillis();
// 获取指定brokerName的主机地址 master 节点 addr
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
// 执行sendMessageHook 时 使用的 context
SendMessageContext context = null;
if (brokerAddr != null) {
// 如果客户端开启了 走 VIP 通道的话,broker地址的 端口 将是 VIP 通道的端口号。
// broker 启动的时候,会绑定两个服务器端口,一个是 普通端口 一个是 VIP 端口。(服务器端 根据 不同端口 创建 的NioSocketChannel 提供 线程
// 资源 不是同一组。)
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
// 获取消息体
byte[] prevBody = msg.getBody();
try {
//for MessageBatch,ID has been set in the generating process
if (!(msg instanceof MessageBatch)) {
/**
* msgId 由 前缀 + 内容 组成:
* 前缀
* ip 地址,进程号,classLoader 的 hashcode
* 内容
* 时间差(当前时间减去当月一日),计数器
*/
// 给消息生成唯一ID,即在 msg.properties.put("UNIQ_KEY", "msgId")
// 服务器 broker 会给 消息 按照 UNIQ_KEY 建立一个 hash索引。
MessageClientIDSetter.setUniqID(msg);
}
boolean topicWithNamespace = false;
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
topicWithNamespace = true;
}
int sysFlag = 0;
boolean msgBodyCompressed = false;
// 返回true 说明消息 已经压缩了
if (this.tryToCompressMessage(msg)) {
// 设置标记位 ,表明此条消息 被压缩过。
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
msgBodyCompressed = true;
}
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
// 用户扩展点,用户可以注册 CheckForbiddenHook 控制消息发送。
if (hasCheckForbiddenHook()) {
CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
checkForbiddenContext.setCommunicationMode(communicationMode);
checkForbiddenContext.setBrokerAddr(brokerAddr);
checkForbiddenContext.setMessage(msg);
checkForbiddenContext.setMq(mq);
checkForbiddenContext.setUnitMode(this.isUnitMode());
this.executeCheckForbiddenHook(checkForbiddenContext);
}
// 用户扩展点,执行 msgHook.before 方法。(比如实现 监控埋点...)
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
context.setNamespace(this.defaultMQProducer.getNamespace());
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
}
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
this.executeSendMessageHookBefore(context);
}
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
// 生产者组
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
// 消息主题
requestHeader.setTopic(msg.getTopic());
// 缺省主题:TBW102
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
// 生产者主题队列数:4
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
// 选中的消息队列 队列ID
requestHeader.setQueueId(mq.getQueueId());
// 系统标记变量
requestHeader.setSysFlag(sysFlag);
// 消息创建时间
requestHeader.setBornTimestamp(System.currentTimeMillis());
// 消息 flag
requestHeader.setFlag(msg.getFlag());
// 消息properties
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
// 消息重试的逻辑...以后讲消费时 再说。
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
// 发送结果
SendResult sendResult = null;
switch (communicationMode) {
case ASYNC:
Message tmpMessage = msg;
boolean messageCloned = false;
if (msgBodyCompressed) {
//If msg body was compressed, msgbody should be reset using prevBody.
//Clone new message using commpressed message body and recover origin massage.
//Fix bug:https://github.com/apache/rocketmq-externals/issues/66
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
msg.setBody(prevBody);
}
if (topicWithNamespace) {
if (!messageCloned) {
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
}
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
tmpMessage,
requestHeader,
timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
case ONEWAY:
case SYNC:
// 单向 和 同步 都是走这里!
// 当前耗时,如果已经大于 timeout 限制,则抛异常,不再发送消息了...
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
// 获取API对象,调用它的 发送方法,完成发送!
// 参数1:broker地址
// 参数2:brokerName
// 参数3:消息
// 参数4:SendMessageRequestHeader
// 参数5:剩余的超时限制
// 参数6:发送模式
// 参数7:context
// 参数8:生产者对象
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}
return sendResult;
} catch (RemotingException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (MQBrokerException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (InterruptedException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} finally {
msg.setBody(prevBody);
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
这里做了几件事:
-
获取指定brokerName主机地址master节点地址,在MQClientInstance里面存放了broker 映射表,具体内容如下:
通过brokerName就知道了broker节点地址 -
创建了requestHeader,带了很多发送message的元数据
-
调用mQClientFactory.getMQClientAPIImpl().sendMessage()发送消息
MQClientInstance 详解
在sendMessage的过程中一直用到这个类,我们来讲讲这个类是做什么用
public class MQClientInstance {
private final static long LOCK_TIMEOUT_MILLIS = 3000;
private final InternalLogger log = ClientLogger.getLog();
// 客户端配置
private final ClientConfig clientConfig;
// 索引值,一般是0,因为 客户端实例 一般都是一个进程 只有一个。
private final int instanceIndex;
// 客户端ID,ip@pid
private final String clientId;
// 客户端启动时间
private final long bootTimestamp = System.currentTimeMillis();
// 生产者 消费者 映射表,key:组名 value:生产者 或者 消费者
private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
// 客户端网络层(netty)配置
private final NettyClientConfig nettyClientConfig;
// 核心的一个API实现,它几乎包含了 所有 服务端的API,它的作用就是将 MQ业务层的数据 转换为 网络层 RemotingCommand 对象,
// 然后使用内部的 NettyRemotingClient 的invoke 系列方法 完成 网络IO。
private final MQClientAPIImpl mQClientAPIImpl;
private final MQAdminImpl mQAdminImpl;
// 客户端本地路由数据
// key:主题名称 value:主题路由数据
private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
private final Lock lockNamesrv = new ReentrantLock();
private final Lock lockHeartbeat = new ReentrantLock();
// broker 物理节点映射表
// key: brokerName 逻辑层面的东西
// value: map<long /* brokerId 0 的节点是 master,其它的是slave*/, String /* addr ip:port*/>
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
new ConcurrentHashMap<String, HashMap<Long, String>>();
// broker 物理节点版本映射表
// key:brokerName 逻辑层面的东西
// value:map<String /* addr 物理节点地址*/, Integer /* 版本号 */>
private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
new ConcurrentHashMap<String, HashMap<String, Integer>>();
// 单线程的调度线程池,用于执行定时任务
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "MQClientFactoryScheduledThread");
}
});
// 客户端协议处理器,用于处理 IO 事件
private final ClientRemotingProcessor clientRemotingProcessor;
// 拉消息服务
private final PullMessageService pullMessageService;
// 消费者负载均衡服务
// todo 消费者负载均衡 源头
// 1、内部线程调用rebalanceService 2、查询 consumerTable中所有的消费者,然后调用消费者的doReblance 3、最后调用消费者的rebalanceImpl.doRebalance
private final RebalanceService rebalanceService;
// 内部生产者实例,用于处理 消费端 消息回退。
private final DefaultMQProducer defaultMQProducer;
private final ConsumerStatsManager consumerStatsManager;
// 心跳次数统计
private final AtomicLong sendHeartbeatTimesTotal = new AtomicLong(0);
// RocketMQ 客户端状态
private ServiceState serviceState = ServiceState.CREATE_JUST;
通过属性其实也看的出来,这个类做的几件事:
- 缓存生产者、消费者等数据:
其中DefaultMQProducerImpl就是MQProducerInner的实现类
3. 缓存客户端路由数据:ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable
-
缓存borker元数据:ConcurrentMap<String/* Broker Name /, HashMap<Long/ brokerId /, String/ address */>> brokerAddrTable
-
客户端统一rpc:MQClientAPIImpl mQClientAPIImpl,使用他来发送消息的
MQClientAPIImpl
客户端与server通信的实现,通过名字就能看得出来
public class MQClientAPIImpl {
private final static InternalLogger log = ClientLogger.getLog();
private static boolean sendSmartMsg =
Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true"));
static {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
}
// 客户端网络层对象,管理 客户端 与服务器 之间 连接 NioSocketChannel 对象。
// 通过 它 提供的 invoke 系列方法,客户端可以与服务端进行远程调用。
// 服务器 也可以 直接 调用 客户端
private final RemotingClient remotingClient;
private final TopAddressing topAddressing;
private final ClientRemotingProcessor clientRemotingProcessor;
private String nameSrvAddr = null;
还是继续之前的sendMessage流程:
public SendResult sendMessage(
final String addr,
final String brokerName,
final Message msg,
final SendMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final MQClientInstance instance,
final int retryTimesWhenSendFailed,
final SendMessageContext context,
final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
RemotingCommand request = null;
String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
if (isReply) {
if (sendSmartMsg) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
} else {
// 在封装RemotingCommand, 很有意思的东西,后面学习网络通信的可以再研究
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
}
} else {
if (sendSmartMsg || msg instanceof MessageBatch) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
}
}
// 将消息的消息体 放到 网络传输层的body中。
request.setBody(msg.getBody());
switch (communicationMode) {
case ONEWAY:
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
return null;
case ASYNC:
final AtomicInteger times = new AtomicInteger();
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, context, producer);
return null;
case SYNC:
// 当前耗时,如果已经超过 超时限制,则异常
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeSync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
// 进行同步调用,将消息传递到 broker ,broker完成存储后 或者其他 情况 都会返回。
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
default:
assert false;
break;
}
return null;
}
这里会使用remotingClient.invokeSync(客户端网络通信)
最终会使用netty 将request writeAndFlush