RocketMq中提供两种消费模式:集群模式和广播模式。
集群模式
集群模式表示同一个消息会被同一个消费组中的消费者消费一次,消息被负载均衡分配到同一个消费者上的多个实例上。
还有另外一种平均的算法是AllocateMessageQueueAveragelyByCircle,也是平均分摊每一条queue,只是以环状轮流分queue的形式,如下图:
需要注意的是,集群模式下,queue都是只允许分配只一个实例,这是由于如果多个实例同时消费一个queue的消息,由于拉取哪些消息是consumer主动控制的,那样会导致同一个消息在不同的实例下被消费多次,所以算法上都是一个queue只分给一个consumer实例,一个consumer实例可以允许同时分到不同的queue。
通过增加consumer实例去分摊queue的消费,可以起到水平扩展的消费能力的作用。而有实例下线的时候,会重新触发负载均衡,这时候原来分配到的queue将分配到其他实例上继续消费。
但是如果consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用了。所以需要控制让queue的总数量大于等于consumer的数量。
广播模式
广播消息表示同一个消息会推送到集群里面所有的消费者,保证消息至少被每个消费者消费一次。
默认情况下是使用集群模式,Broker端会给每一个消费组维护一个统一的offset,这个offset能够保证一个消息在消费组里面只会被消费一次,而广播模式的实现方式,是将本来由Broker保管的offset交个消费者自行保管,而Broker只管往消费者推送消息即可。
注意事项:
01
由于offset由消费者自行保存和记录,Broker只管推送消息,如果消息失败了就不会存在重试的能力。
02
消费者维护的offset是可以在服务重启时,按照上一次消费的进度处理后面没有处理的消息不会影响服务器的性能,但是如果消费者的offset丢失了,消费者的服务可以正常运行但是此时未消费的消息就不能申请了,只能申请后面推送的。
03
offset文件会存放在本地,当然这里面存在很多的坑。
各场景源码分析
广播消息与集群消息作为RocketMQ的两大类型,存在以下几点差异,首先我们通过DefaultMQPushConsumerImpl类的start方法启动消费者。
01
广播模式不支持消息的重试。
private void copySubscription() throws MQClientException {
try {
//...
switch (this.defaultMQPushConsumer.getMessageModel()) {
// 判断消息模式,如果是广播直接跳出不进行重试
case BROADCASTING:
break;
case CLUSTERING:
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
02
广播模式使用本地offset缓存(start方法)。
switch (this.defaultMQPushConsumer.getMessageModel()) {
// 广播模式(本地的Offset文件)
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
// 集群模式 (Broker里面的offset)
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
03
没有所谓的负载均衡(执行rebalanceByTopic方法)。
case BROADCASTING: {
// 根据topic获取MessageQueue集合
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}",
consumerGroup,
topic,
mqSet,
mqSet);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
04
不支持顺序消息:由于关闭消息是所有的消费者都需要去消费。
// 顺序消息更新偏移量
if (processQueue.isLocked()) {
if (!pullRequest.isPreviouslyLocked()) {
long offset = -1L;
try {
offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
} catch (Exception e) {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
return;
}
boolean brokerBusy = offset < pullRequest.getNextOffset();
log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
pullRequest, offset, brokerBusy);
if (brokerBusy) {
log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
pullRequest, offset);
}
pullRequest.setPreviouslyLocked(true);
pullRequest.setNextOffset(offset);
}
} else {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.info("pull message later because not locked in broker, {}", pullRequest);
return;
}
05
并发消息消费时也没有重试(执行processConsumeResult方法时)。
// 单纯打印信息
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;