前言
消息消费者 MQConsumer 即消息的消费方,主要负责消息消息生产者 MQ Producer 投递的消息。它的源码架构如下图,以常用的消费者实现类 DefaultMQPushConsumer 视角分析消费者的源码架构,介绍消费者核心数据结构。
DefaultMQPushConsumer 内部依赖了 DefaultMQPushConsumerImpl 核心实现类,DefaultMQPushConsumerImpl 组件串联了消费者其他组件,如:RebalanceImpl 重平衡组件、OffsetStore偏移量存储组件、MessageListener消息监听组件、ConsumeMessageService 消息消费组件、PullAPIWrapper 消息拉取API组件,并实现了消费者常用功能:发送心跳、拉取消息、消费消息、提交偏移量、与远程broker和NameSrv网络通信。
这里只记录了消费者核心数据结构,细节功能实现,比如:发送心跳、注册消费者实例、长轮询拉取消息、并发消费消息、顺序消费消息,在后续其他文章中详细介绍。
源码版本:4.9.3
源码架构图
核心数据结构
DefaultMQPushConsumerImpl 默认消费者实现组件
// 默认消费者实现组件
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
// 重要:重平衡组件
private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
// 网络通信客户端组件,重要
private MQClientInstance mQClientFactory;
// 拉取消息API组件,重要
private PullAPIWrapper pullAPIWrapper;
// 消息监听组件
private MessageListener messageListenerInner;
// 偏移量存储组件
private OffsetStore offsetStore;
// 消息消费服务组件
private ConsumeMessageService consumeMessageService;
}
PullAPIWrapper 拉取消息API组件
// 拉取消息API组件
public class PullAPIWrapper {
// 拉取消息broker节点映射表,key为MessageQueue,value为brokerId
private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
// 网络通信客户端组件
private final MQClientInstance mQClientFactory;
}
RebalanceImpl 重平衡组件
// 重平衡组件实现
public abstract class RebalanceImpl {
protected static final InternalLogger log = ClientLogger.getLog();
// 远程消息队列和处理队列映射表,key:MessageQueue,value:ProcessQueue
protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
// topic 和 消息队列映射表,key:topic,value:MessageQueue集合
protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =
new ConcurrentHashMap<String, Set<MessageQueue>>();
// 内部订阅关系映射表,key:topic,value:SubscriptionData
protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
new ConcurrentHashMap<String, SubscriptionData>();
// 负载均衡策略, 临近机房、同机房、轮询、一致性hash
protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
}
RemoteBrokerOffsetStore 偏移量存储组件
public class RemoteBrokerOffsetStore implements OffsetStore {
// 消费偏移量映射表,key为MessageQueue,value为offset
private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>();
}
ConsumeMessageService 消息消费服务接口
public interface ConsumeMessageService {
// 启动
void start();
// 关闭
void shutdown(long awaitTerminateMillis);
// 更新核心线程池大小
void updateCorePoolSize(int corePoolSize);
// 增加核心线程池大小
void incCorePoolSize();
// 减少核心线程池大小
void decCorePoolSize();
// 获取核心线程池大小
int getCorePoolSize();
// 消费消息
ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
void submitConsumeRequest(
// 拉取到的消息
final List<MessageExt> msgs,
// 处理队列
final ProcessQueue processQueue,
// 拉取请求中的队列
final MessageQueue messageQueue,
// 是否需要派发到消费者
final boolean dispathToConsume);
}