生产者已经被启动了,接下来我们就得研究研究如何去发送消息给Broker。
前面分析Producer的启动逻辑,启动完成之后就是发消息,接下来我们就来分析Producer的send消息过程,同时发消息过程中存在一些问题以及解决方法也得考虑。
查看源码我们发现,Producer提供了很多方法发送消息,总结后可以通过不同的角度来看待,比如:以消费类型分为普通消息与事务消息,以发送方式分为同步发送,异步发送和单向发送,以消费范围分为单个消息与批量消息,今天我们主要分析的是发送方式,当然其中批量消息与单个消息基本类似就是一次发送的数量的大小而已,事务消息是RocketMQ的特性后期单独分析。
消息发送方式
单向消息
单向消息简单理解就是单方面发送不需要等任何回应,就相当于一个人在做演讲不需要了解听众是否认真接收。
// 自定义消息发送到那个队列的逻辑
public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
throws MQClientException, RemotingException, InterruptedException {
// 根据namespace设置topic
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.sendOneway(msg, selector, arg);
}
// 指定队列
public void sendOneway(Message msg,
MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
// 根据namespace设置topic
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.sendOneway(msg, queueWithNamespace(mq));
}
// 单纯发送msg
public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
// 根据namespace设置topic
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.sendOneway(msg);
}
MessageQueueSelector该接口表示发送的Queue,我们可以自定义或者使用默认的,默认提供三种方式,一种为不断新增,轮询方式,一种随机选择一个队列,还有一种未实现返回null,同时我们可以自定义实现该逻辑比如:后面我们要分析的同一订单我们需要发送到同一个队列。
该方法内部调用sendDefaultImpl方法设置消息发送模式为ONEWAY,当我们需要实现日志的记录时可以使用这种方式,可以存在遗漏但是效率要快。
同步消息
同步消息存在返回值,必须你确定发送到了才会结束,简单理解打电话一个说话一个人回答。
// 同步发送消息,该方法会阻塞直到消息发送成功
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg);
}
// 同步发送消息,指定超时时间
public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg, timeout);
}
// 同步发送消息,指定发送队列
public SendResult send(Message msg, MessageQueue mq)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq));
}
// 同步发送消息,指定发送队列,超时时间
public SendResult send(Message msg, MessageQueue mq, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq), timeout);
}
该方法内部调用sendDefaultImpl方法设置消息发送模式为SYNC,当我们需要强一致性,肯定效率上来说就得降低了,必须等待响应结果。
异步消息
异步消息结合单向发送与同步发送,在发送消息后立即返回的同时异步接收传递过去的消息状态,简单理解就是相当于发送微信,我发过去了,可能别人还在看,看完了就会回复你。
// 异步消息,并注册回调。
public void send(Message msg,
SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.send(msg, sendCallback);
}
// 异步发送消息,并注册回调,指定超时时间
public void send(Message msg, SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
}
// 异步发送消息,并注册回调,指定队列
public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq), sendCallback);
}
// 异步发送消息,并注册回调,指定队列,指定超时时间
public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq), sendCallback, timeout);
}
该方法内部调用sendDefaultImpl方法设置消息发送模式为ASYNC,使用线程池异步的执行sendDefaultImpl方法,发送之前会计算超时时间,如果时间超时则不发送,直接执行回调函数onException方法。
消息发送过程
通过上面的任何发送方法都会到DefaultMQProducerImpl类的sendDefaultImpl()方法处理。
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
// 检查消息
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
// 获取topic
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
// 重试次数(可以配置)
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
// 进行重试(在循环中,发送消息,包含消息重试的逻辑)
for (; times < timesTotal; times++) {
// 这里表示lastBrokerName不为空一定是之前发送的一次,重试不会切换broker
String lastBrokerName = null == mq ? null : mq.getBrokerName();
// 找到MessageQueue
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) {
//Reset topic with namespace during resend.
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) {
callTimeout = true;
break;
}
// 发送消息
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
// 同步消息才会有重试
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
}
// ...
}
if (sendResult != null) {
return sendResult;
}
// ...
throw mqClientException;
}
validateNameServerSetting();
throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
主要包括:消息验证,查找路由,选择消息队列(可故障转移机制),消息发送。
消息验证
调用checkMessage方法主要验证topic和内容。
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {
if (null == msg) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
}
// topic
Validators.checkTopic(msg.getTopic());
Validators.isNotAllowedSendTopic(msg.getTopic());
// body
if (null == msg.getBody()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
}
// 长度不等于0
if (0 == msg.getBody().length) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
}
// maxMessageSize 默认大小4M
if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
"the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
}
}
检查topic:检查topic是否为空,检查长度是否大于127,检查是否只包含数字和字母,检查是否与系统预留主题相同。
检查消息体:不能为null,长度不能等于0,消息长度不能大于maxMessageSize(默认大小4M)。
查找路由
根据topic查询对应的路由信息即broker。
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
// nameServer的缓存(只要Producer启动了,不存在nameServer也可以发消息)
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
从路由缓存topicPublishInfoTable里面获取,如果topicPublishInfoTable没有就需要到nameServer查询并更新到本地缓存路由表当中,如果可用就使用,没有则使用默认的topic( “TBW102” )去NameServer里面去并更新到本地缓存路由。
注意:当NameServer启动了在本地缓存中topicPublishInfoTable里面就存在值了所以如果此时NameServer宕机了也不会影响发送消息。
选择消息队列
选择一个可用的消息队列。
// lastBrokerName上次使用的broker
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
//延迟故障转移,默认值是false,可以进行配置。
if (this.sendLatencyFaultEnable) {
// 开启是为了实现消息的高可用。
// 第一优先选择延迟慢的broker更快发送成功,第二点不会选择上次判断有问题的broker
try {
int index = tpInfo.getSendWhichQueue().incrementAndGet();
// 轮询获取队列
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// 如果broker可以直接返回
if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
return mq;
}
// 如果没获取无故障的mq,则在faultItemTable取出一个最好的mq
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
// 获取写队列数量
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
// 大于0则选择
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
}
return mq;
} else {
// 小于0就移除faultItemTable
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
// 如果上面抛出异常或未选择出就轮询选择
return tpInfo.selectOneMessageQueue();
}
//如果进到这里lastBrokerName不为空,那么表示上一次向这个Broker发送消息是失败的,这时就尽量不要再往这个Broker发送消息了。
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
判断是否开启延迟故障转移,如果没有开启直接轮询去获取Broker,如果开启了首先去轮询找无故障的mq,如果没有找到就从faultItemTable取出一个最好的mq,而faultItemTable是从发消息时候根据每个broker对消息的处理情况进行存入的,取到了直接返回,如果还是没有取到就直接去轮询获取一个与lastBrokerName不一致的Broker。
消息发送
根据你发送方法不同,单向的不会重试而同步默认重试3次(但是可以配置,次数=配置参数+1)和异步会重试2次(但不是这个方法里面实现)。注意发生RemotingException、MQClientException、以及部分MQBrokerException异常时也会进行重试,但是如果InterruptedException或超时则直接终止。
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,final CommunicationMode communicationMode,
final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
// 查询broker,生产者只会往Master节点发,所以取出来也只会是Master节点
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
SendMessageContext context = null;
if (brokerAddr != null) {
// 判断VIP通道
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
byte[] prevBody = msg.getBody();
try {
//for MessageBatch,ID has been set in the generating process
// 不是批量消息就确定唯一的unipId
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
boolean topicWithNamespace = false;
// 将nameserver设置为实例Id
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
topicWithNamespace = true;
}
int sysFlag = 0;
boolean msgBodyCompressed = false;
// 尝试压缩消息,如果是批量不做压缩,如果小于4k不做压缩
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
sysFlag |= compressType.getCompressionFlag();
msgBodyCompressed = true;
}
// 事务消息标志
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
//...省略
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
// ...省略
// 判断是否为重试消息,设置相关属性
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
SendResult sendResult = null;
switch (communicationMode) {
// 异步消息
case ASYNC:
Message tmpMessage = msg;
boolean messageCloned = false;
// 还原压缩消息
if (msgBodyCompressed) {
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
msg.setBody(prevBody);
}
// 取出topic
if (topicWithNamespace) {
if (!messageCloned) {
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
}
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
// 判断是否超时
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
// 发生异步消息(此处设置重试次数为2)
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,mq.getBrokerName(),tmpMessage,requestHeader,
timeout - costTimeAsync,communicationMode,sendCallback,
topicPublishInfo,this.mQClientFactory,
// 默认重试次数2
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,this);
break;
// 单向消息
case ONEWAY:
// 同步消息
case SYNC:
// 检查超时
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,mq.getBrokerName(),msg,requestHeader,
timeout - costTimeSync,communicationMode,context,this);
break;
default:
assert false;
break;
}
// 钩子函数
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}
return sendResult;
// 合并源码异常
} catch (RemotingException | MQBrokerException | InterruptedException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} finally {
// 前面消息进行压缩,topic进行合并这里需要还原供客户端查看
msg.setBody(prevBody);
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
调用findBrokerAddressInPublish方法从brokerAddrTable中查找broker的Master主节点地址,如果没找到就去远程拉取然后再更新本地缓存。
判断是否开启VIP通道,生成唯一的uniqId(前提不是批量消息),对消息进行压缩(前提其一不是批量消息,其二大小得大于4K)。
设置请求头信息SendMessageRequestHeader,并执行发送消息,不管执行成功都会将压缩的消息与topic进行还原,供客户端查看。