文章目录
- 一、消息消费实现
- 二、消息消费过程
- 1、消息拉取
- 2、消息消费
- 1)提交消费请求
- 2)消费消息
一、消息消费实现
消息消费有2种实现,分别为:并发消费实现(ConsumeMessageConcurrentlyService)和顺序消费实现(ConsumeMessageOrderlyService)。本次以并发消费实现为切入进行探讨消息的消费流程。
二、消息消费过程
1、消息拉取
1)在消息服务PullMessageService中完成将消息从远程服务器拉取到本地,具体实现由DefaultMQPushConsumerImpl#pullMessage方法完成
//org.apache.rocketmq.client.impl.consumer.PullMessageService#pullMessage
private void pullMessage(final PullRequest pullRequest) {
//从MQClientInstance中获取内部实现类MQConsumerInner
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
//强转换成PUSH消息消费服务,然后消费消息
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
}
}
2)DefaultMQPushConsumerImpl#pullMessage方法中定义了回调实现,在成功拉取消息后,先将消息放到processQueue中,然后再提交消费请求(DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest
)异步完成消息消费。
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage 回调部分代码
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
// ......
//从服务器拉取到消息后回调 PullCallBack 回调方法后,先将消息放入到 ProccessQueue中,
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
// 然后把消息提交到消费线程池中执行,
// 也就是调用 ConsumeMessageService#submitConsumeRequest 开始进入到消息消费的事件中来
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
// ......
}
}
}
// ......
};
2、消息消费
1)提交消费请求
pullMessage方法中回调提交消息消费(submitConsumeRequest),进入消息并发消费实现(ConsumeMessageConcurrentlyService),其实现代码如下:
// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest方法
@Override
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
//异步线程池中执行
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
//提交异常,延迟5S再提交
this.submitConsumeRequestLater(consumeRequest);
}
} else {
//超过最大数量,分批
for (int total = 0; total < msgs.size(); ) {
List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
for (int i = 0; i < consumeBatchSize; i++, total++) {
if (total < msgs.size()) {
msgThis.add(msgs.get(total));
} else {
break;
}
}
ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
for (; total < msgs.size(); total++) {
msgThis.add(msgs.get(total));
}
this.submitConsumeRequestLater(consumeRequest);
}
}
}
}
submitConsumeRequest方法中,
- 先获取单次消费消息最大条数(consumeBatchSize,默认1条)
- 如果本次提交消息条数小于等于单次消费消息最大条数,则直接创建ConsumeRequest并提交到线程池(consumeExecutor)中执行
- 如果超过单次消费消息最大条数,则按consumeBatchSize分割分批提交
2)消费消息
ConsumeMessageConcurrentlyService中创建消息消费请求线程ConsumeRequest,然后提交到线程池。
// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run
@Override
public void run() {
//在进行消息重新负载时如果该消息队列被分配给消费组内其他消费者,drop设置为true,阻止消费者消费不属于自己的消息队列
if (this.processQueue.isDropped()) {
log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
return;
}
//类名.this:一般用于内部类需要使用其外部类的实例对象时候使用 ClassName.this 代表其外部类对象,直接写this则代表内部类本身对象
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
//恢复重试消息主题名
// RocketMQ将消息存入 commitlog 文件时,如果发现消息的延时级别 delayTimeLevel 大于0会
//首先将重试主题存人在消息的属性中,然后设置主题名称为 SCHEDULE TOPIC ,以便时间到后重新参与消息消费
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
ConsumeMessageContext consumeMessageContext = null;
//执行钩子
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setProps(new HashMap<String, String>());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
}
}
//内部消息监听器消费消息
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue), e);
hasException = true;
}
long consumeRT = System.currentTimeMillis() - beginTimestamp;
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
returnType = ConsumeReturnType.FAILED;
} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
}
if (null == status) {
log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
//后置钩子
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
//同步消息消费状态和offset
if (!processQueue.isDropped()) {
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
}
ConsumeRequest线程中,执行步骤如下
- 判断消费的队列是否dropped,如果为true,则停止直接终止该消费请求
- 恢复重试消息的topic和namespace
- 如果存在钩子函数,则执行前置钩子函数
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext)
- 调用消息监听器消费消息
listener.consumeMessage
(io.openmessaging.rocketmq.consumer.PushConsumerImpl.MessageListenerImpl) - 如果存在后置钩子,则执行后置钩子函数
- 消息消费结果处理