并行消费和顺序消费
ConsumeMessageService是一个通用的消费服务接口,它包含两个实现类org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService和
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService,这两个实现类分别用于并发消费和顺序消费
核心方法
- start()方法和shudown()方法分别在启动和关闭服务时使用
- updateCorePoolSize():更新消费线程池的核心线程数
- incCorePoolSize():增加一个消费线程池的核心线程数
- decCorePoolSize():减少一个消费者线程池的核心线程数
- getCorePoolSize():获取消费线程池的核心线程数
- consumeMessageDirectly():如果一个消息已经被消费过了,但是还项再消费一次,就需要实现这个方法
- submitConsumeRequest():将消息封装成线程池任务,提交给消费服务,消费服务再将消息传递给业务消费进行处理
1.ConsumeMessageService消息消费分发。ConsumeMessageService服务通过
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest接口接收消息消费任务后,将消息按照固定条数封装成多个ConsumeRequest任务对象,并发送到消费线程池,等待分发给业务消费;ConsumeMessageOrderlyService先将Pull的全部消息放在一个本地队列中然后提交一个ConsumeRequest到消费者线程池
ConsumeMessageConcurrentlyService
ConsumeMessageOrderlyService
2.消费消息。消费的主要逻辑再ConsumeMessageService接口的两个实现类中,以并发消费为例.
消费消息主要分为消费前预处理、消费回调、消费结构统计、消费结果处理4个步骤
第一步:消费执行前进行预处理。执行消费前的hook和重试消息预处理。消费前的hook可以理解为消费前的消息预处理(比如消息格式校验)。如果拉取的消息来自重试队列,则将Topic重置为原来的Topic,而不用重试Topc名
第二步:消费回调。首先设置消息开始消费时间为当前时间,再将消息列表转为不可修改的List,
然后通过status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
方法将消息传递给用户编写的业务消费代码进行处理
第三步:消费结果统计和执行消费后的hook.客户端原声支持基本消费指标统计,比如消费耗时;消费后的hook和消费前的hook要一一对应,
用户可以用消费后的hook统计与自身业务相关的指标
第四步:消费结果处理。包含消费指标统计、消费重试处理和消费位点处理。消费指标主要是对消费成功和失败的TPS的统计;消费重试处理主要将消费重试次数+1;消费位点处理主要根据消费结果更新消费位点记录
RocketMQ是一个消息队列,FIFO先进先出规则如何再消费失败时保证消息的顺序呢?
可以从消费任务实现类ConsumeRequest和本地缓存队列ProcessQueue的设计来看主要差异
并发消费
顺序消费
顺序消息的ConsumeRequest中并没有保存需要消费的消息,再顺序消费时通过调用ProcessQueue.takeMessage()
方法获取需要消费的消息,而且消费也是同步进行的。
takeMessages()方法实现
msgTreeMap:是一个TreeMap<Long, MessageExt>类型,key是物理位点值,value是消息对象,该容器是ProcessQueue用来缓存本地顺序消息的,保存的数据是按照key(就是物理位点值)顺序排列的
consumingMssgOrderlyTreeMap:是一个TreeMap<Long,MessagExt>类型,key是消息物理位点值,value是消息对象,保存当前正在处理的顺序消息集合,是msgTreeMap的一个子集,保存的数据是按照key(就是物理位点值)顺序排列的
batchSize:一次从本地缓存中获取多少条消息回调给用户消费。顺序消息是如何通过ProcessQueue.takeMesages()
获取消息给业务代码消费的呢?
从msgTreeMap中获取batchSize数量的消息放入consumingMsgOrderlyTreeMap中,并返回给用户消费,
由于当前的MessageQueue是被Synchronized锁住的,并且获取的消费消息也是按照消费位点顺序排列的,
所以消费时用户能按照物理位点顺序消费消息
如果消费失败,又是怎么保证顺序的呢?来看processConsumeResult()实现
RocketMQ支持自动提交offset和手动提交offset两种方式。以自动提交offset为例,手动提交与其完全一致,先看入参
msg:当前处理的一批消息
status:消费结果的状态。目前支持SUCCESS和SUSPEND_CURRENT_QUEUE_A_MOMENT两种状态
消费成功后,程序会执行commit()方法提交当前位点,统计消费成功的TPS。消费失败后,程序会统计消费失败的TPS,通过执行makeMessageToCOnsumeAgin()方法,删除消费失败的消息,通过定时任务将消费失败的消息在延迟一定时间后,重新提交到消费线程池
makeMessagToConsumeAgin()方法将消息从consumingMsgOrderlyTreeMap中删除再重新放入本地缓存度列msgTreeMap中,等待下次被重新消费
submitConsumeRequestLater()方法会执行一个定时任务,延迟一定时间后重新将消费请求发送到消费线程池中,以供下一轮消费
做完这两个操作后,试想以下,消费线程在下一次消费时会发生什么事情?如果是从msgTreeMap中获取一批消息,
那么返回的消息又是那些呢?消息物理位点最小的,也就是之前未成功消费的消息,如果顺序消息消费失败,会再次投递给消费者消费,
直到消费成功,以此来保证顺序性