RocketMQ的核心三流程
RocketMQ服务端由两部分组成NameServer和Broker,NameServer是服务的注册中心,Broker会把自己的地址注册到NameServer,生产者和消费者启动的时候会先从NameServer获取Broker的地址,再去从Broker发送和接受消息。
Producer将消息写入到RocketMQ集群中Broker中具体的Queue。
Comsumer从RocketMQ集群中拉取对应的消息并进行消费确认。
NameServer源码分析
NameServer整体流程
NameServer是整个RocketMQ的“大脑”,它是RocketMQ的服务注册中心,所以RocketMQ需要先启动NameServer再启动Rocket中的Broker。
- NameServer启动
启动监听,等待Broker、Producer、Consumer连接。Broker在启动时向所有NameServer注册,生产者发送消息之前从NameServer获取Broker服务器地址列表,然后根据负载均衡算法从列表中选择一台服务器进行消息消费发送。消费者在订阅某个主题的消息之前从NameServer获取Broker服务器地址列表(有可能是集群),但是消费者选择从Broker中订阅消息,订阅规则由Broker配置决定。 - 路由注册
Broker启动后向所有NameServer发送路由及心跳信息。 - 路由剔除
一处心跳超时的Broker相关路由信息。NameServer与,每台Broker服务保持长连接,并间隔10s检查Broker是否存活,如果检测到Broker宕机,则从路由注册表中将其移除。这样就可以实现RocketMQ的高可用。
NameServer启动流程
启动NameServer
加载KV配置
构建NRS(Netty Remoting Server)通讯,接受路由、心跳信息
构建定时任务(剔除超时的Broker)
加载KV配置
核心解读NamesrvController类中createNamesrvController()
构建NRS通讯接收路由、心跳信息
构建定时任务剔除超时Broker
核心控制器会启动定时任务: 每隔10s扫描一次Broker,移除不活跃的Broker。
Broker每隔30s向NameServer发送一个心跳包,心跳包包含BrokerId,Broker地址,Broker名称,Broker所属集群名称、Broker关联的FilterServer列表。但是如果Broker宕机,NameServer无法收到心跳包,此时NameServer如何来剔除这些失效的Broker呢?NameServer会每隔10s扫描brokerLiveTable状态表,如果BrokerLive的lastUpdateTimestamp的时间戳距当前时间超过120s,则认为Broker失效,移除该Broker,关闭与Broker连接,同时更新topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。
RocketMQ有两个触发点来删除路由信息:
-
NameServer定期扫描brokerLiveTable检测上次心跳包与当前系统的时间差,如果时间超过120s,则需要移除broker。
-
Broker在正常关闭的情况下,会执行unregisterBroker指令这两种方式路由删除的方法都是一样的,都是从相关路由表中删除与该broker相关的信息。
在消费者启动之后,第一步都要从NameServer中获取Topic相关信息