DefaultMQPushConsumer消息链路
-
DefaultMQpushConsumer#start方法调用DefaultMQpushConsumerImpl#start方法,接着内部调用MQClientInstance#start方法,接着调用RebalanceService#start方法。
-
RebalanceService#start方法开启一个线程,执行本类中的runnable#run方法。run方法中调用MQClientInstance#doRebalance方法,这个方法阻塞20秒,循环调用。
-
MQClientInstance#doRebalance方法中接着调用DefaultMQpushConsumerImpl#doRebalance方法。
-
DefaultMQpushConsumerImpl#doRebalance中接着调用RebalanceImpl#doRebalance方法。
-
RebalanceImpl#doRebalance方法接着调用本类方法rebalanceByTopic,根据负载均衡分配策略获取当前消费者组对应的messageQueues,接着调用本类方法updateProcessQueueTableInRebalance。
-
RebalanceImpl#updateProcessQueueTableInRebalance方法中。①内部属性processQueueTable维护messageQueue与processQueue的关系。②调用broker获取messagQueue的消息处理偏移量。每个队列封装参数为PullRequest(里面包含消费者组名称,下一个消息的偏移量,messageQueue,processQueue),调用本地方法dispatchPullRequest传入参数PullRequests。
-
RebalanceImpl#dispatchPullRequest方法。这是一个抽象方法,实现在RebanlancePushImpl中。在RebanlancePushImpl#dispatchPullRequest中,for循环遍历PullRequests,调用DefaultMQpushConsumerImpl#executePullRequestImmediately(pullRequest)方法执行。
-
DefaultMQpushConsumerImpl#executePullRequestImmediately方法中调用MQClientInstance的属性对象pullMessageService的方法executePullRequestImmediately,将pullReuqest存到到pullMessageServcie的内部属性队列pullMessageQueue中,这是一个LinkedBlockingQueue队列。
-
PullMessageServcie#start方法,开启线程执行本类中的run方法。run方法中①通过take获取pullMessageQueue中的pullRequest,②调用本地的pullMessage方法进行处理。
10. PullMessageServcie#pullMessage方法,通过MQClientInstance获取到consumer,我这里用的是DefaultMQpushConsumerImpl,调用DefaultMQpushConsumerImpl#pullMessage(pullRequest)方法。
11. DefaultMQpushConsumerImpl#pullMessage方法。①获取pullRequest中的messageQueue的主题关联的订阅者。②定义回调方法PullCallback。回到方法中,如果从broker拉取消息成功,会将消息放入到processQueue的msgTreeMap中③调用pullAPIWrapper#pullKernelImpl方法拉取消息。
12. PullAPIWrapper#pullKernelImpl方法,调用MQClientInstance的属性类MQClientAPIImpl的pullMessage方法。
13. MQClientAPIImpl#pullMessage方法。调用remotingCLient发送netty消息到broker。拿到消息后,调用步骤11中定义的PullCallback方法。
14. DefaultMQpushConsumerImpl#PullCallback方法,将消息封装到ConsumeRequest,提交到MessageListenerConcurrently的任务线程池中consumerExecutor。
15. ConsumeRequest本身实现runnable接口,实现run方法。ConsumeRequest是MessageListenerConcurrently的内部类。①调用消费者的监听器,将消息传入。②根据监听器类处理响应status,处理响应结果。
DefaultMQPushConsumer消息处理类
消息处理类ConsumeMessageService。
DefaultMQPushConsumer–>DefaultMQPushConsumerImpl#start(),根据注册的监听器类型实例化ConsumeMessageService。
(1) 注册的监听器是:MessageListenerOrderly,对应实例化的消息处理类:ConsumeMessageOrderlyService
(2) 注册的监听器是:MessageListenerConcurrently,对应实例化的消息处理类: ConsumeMessageConcurrentlyService
ConsumeMessageConcurrentlyService:
属性中包含了消费者配置的监听类messageListener,defaultMQPushConsumer,defaultMQPushConsumerImpl,consumeRequestQueue(请求阻塞队列)
。初始化了三个线程池。
(1)消息处理线程池consumerExecutor,线程池的核心线程,最大线程通过参数配置,setConsumeThreadMin,setConsumeThreadMax,主要处理正常接收到的消息。
(2) 还有两个定时执行线程池,一个用来执行推迟消费的消息,一个用来定期清理超时消息(15分钟)。
DefaultMQPushConsumer消息偏移量
消费者有两种模式,集群模式和广播模式。
广播模式
广播模式,偏移量的处理类offsiteStore的实现类LocalFileOffsetStore
。广播模式的偏移量是保存到本地。可以通过参数【rocketmq.client.localOffsetStoreDir
】进行配置。
OffsetStore的load方法读取上面的文件,如果读取失败或者文件内容为空,就会读取备份文件,路径是上面的文件名后面加.bak。这个load方法读取这个json文件,然后把内容读取到LocalFileOffsetStore类的offsetTable(ConcurrentHashMap)这个数据结构。
在拉取消息的时候,首先会封装PullRequest请求,PullRequest中nextOffset参数需要从offsetTable中获取。
MQClientInstance的start方法中会开启一个定时任务,默认时间是5秒,每5秒执行一次持久化,将offsetTable持久化到 本地文件。写文件时有以下几步:
1. 首先把内容写入到offsets.json.tmp文件。
2. offsets.json内容备份到offsets.json.bak。
3. 删除offsets.json文件。
4. 把offsets.json.tmp改名为offsets.json。
总结:广播模式下,偏移量保存在消费者本地服务器。
集群模式
集群模式,偏移量的处理类offsiteStore的实现类RemoteBrokerOffsetStore
。
集群模式下,偏移量是从Broker端获取,所以客户端RemoteBrokerOffsetStore中的load方法没有内容,是一个空方法。
集群模式下,偏移量保存在Broker服务器上。默认路径在/home/${user}/store/config/consumerOffset.json
。
集群模式下,消费者端也会定时持久化offsetStore,不过集群模式下这个方法会上报消费点到Broker服务上。Broker服务接收到请求会保存到本地的offsetTable本地缓存中,Broker服务启动也会开启一个定时任务,默认每5秒持久化offsetTable到磁盘文件。
RebalanceImpl的updateProcessQueueTableInRebalance在拉取消息的时候。都需要先计算nextOffset,这个计算方法是一个抽象方法,由其子类实现。我们就看看push里面的实现RebalancePushImpl里面的实现。
- 首先获取DefaultMQPushConsumer里面的配置consumerFromWhere
- 获取对应的offsetStore。我们用的集群模式,这里是RemoteBrokerOffsetStore
- 通过不同的类型,走不通的逻辑获取偏移量。我这里配置的是CONSUME_FROM_MAX_OFFSET,是通过RemoteBrokerOffsetStore的readOffset获取偏移量。参数类型是ReadOffsetType.READ_FROM_STORE
@Override
public long computePullFromWhereWithException(MessageQueue mq) throws MQClientException {
long result = -1;
// 1. 获取DefaultMQPushConsumer里面的配置consumerFromWhere。
final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getCons