目录
1、RocketMQ源码分析
1-1、读源码前的思考
1-2、RocketMQ整体架构及连通性
1-3、RocketMQ核心组件及整体流程
1-4、NameServer源码分析
1-4-1、RocketMQ核心组件及整体流程
1-4-2、NameServer启动流程概要
1-4-3、Broker启动流程概要
1-4-4、Topic路由注册、剔除机制
1-4-5、路由剔除机制编辑
1-4-6、NameServer的存储
1-4-7、客户端启动核心流程
1-4-8、客户端连接建立的时机
2、RocketMQ源码分析之Producer
2-1、Producer启动流程
2-2、Producer消息发送流程
2-2-1、验证消息
2-2-2、查找路由
2-2-3、选择队列
2-2-4、消息发送
2-2-5、选择队列源码深入分析
3、RocketMQ源码分析之Store
3-1、读Store源码前的思考
3-2、RocketMQ对比 Kafka
3-3、CommitLog之Message格式
3-4、Store架构设计之消息发送
3-4-1、Store层处理消息的入口
3-5、Store架构设计之消息消费
4、源码分析之堆外内存
4-1、开启条件及限制
4-2、TransientStorePool概要设计
4-3、与消息发送流程串联
4-4、两种方式的对比
5、源码分析之ConsumeQueue
5-1、消息发送时数据在ConsumeQueue的落地
5-2、入口:ReputMessageService.doReput(独立线程)
5-3、异步刷盘
6、源码分析之消息消费
6-1、Rebalance(针对集群消费模式)
6-2、触发时机
6-3、举例
6-4、源码解读
6-5、消费者源码解读
6-6、并发消费
6-7、顺序消费
6-8、消费中常见问题
6-8-1、重复消息
6-8-2、消费卡死
6-8-3、启动之后较长时间才消费
7、RocketMQ其他功能
7-1、消息轨迹
7-1-1、配置
7-1-2、保存消息轨迹的Topic定义
7-1-3、案例
7-1-4、关键属性
7-1-5、源码解读
7-2、权限控制
7-2-1、配置
7-2-2、案例实战
8、常见问题
8-1、MQ百万消息积压处理
8-2、消费幂等
8-2-1、消息重复的场景
8-2-2、什么是幂等性?
8-2-3、MVCC
8-2-4、去重表
9、RocketMQ性能优化
9-1、JVM层面
9-2、操作系统层面
9-2-1、基本参数
9-2-2、NIC
1、RocketMQ源码分析
1-1、读源码前的思考
- NameServer、Broker、Producer、Consumer的连通性
- Producer、Consumer连接的建立时机,有何关系?
- NameServer存储哪些信息,如何存储?
- Topic的持久化存储是在NameServer中还是在Broker?
1-2、RocketMQ整体架构及连通性
下图是一个典型的双主双从的架构,包括生产者和消费者。
NameServer集群中它们相互之间是不通讯。
- 生产者同一时间,与NameServer集群中其中一台建立长连接。
- 生产者与Broker之间的Master保持长连接。
- 消费者同一时间,与NameServer集群中其中一台建立长连接。
- 消费者与所有Broker建立长连接。
1-3、RocketMQ核心组件及整体流程
RocketMQ的源码的看起来很多,但是如果按照组件来划分的话,核心只要几个。如下图:
NameServer
- 命名服务,更新和路由发现 broker服务。
- NameServer 要作用是为消息生产者、消息消费者提供关于主题 Topic 的路由信息,NameServer除了要存储路由的基础信息,还要能够管理 Broker节点,包括路由注册、路由删除等功能。
Producer和Consumer
- java版本的mq客户端实现,包括生产者和消费者。
Broker
- 它能接收producer和consumer的请求,并调用store层服务对消息进行处理。HA服务的基本单元,支持同步双写,异步双写等模式。
Store
- 存储层实现,同时包括了索引服务,高可用HA服务实现。
Netty Remoting Server与Netty Remoting Client
- 基于netty的底层通信实现,所有服务间的交互都基于此模块。也区分服务端和客户端。
1-4、NameServer源码分析
1-4-1、RocketMQ核心组件及整体流程
1-4-2、NameServer启动流程概要
启动流程
- 从源码的启动可知,NameServer单独启动。
- 入口类:NamesrvController
- 核心方法:NamesrvController 类中main()->main0-> createNamesrvController->start() -> initialize()
步骤一
- 解析配置文件,填充NameServerConfig、NettyServerConfig属性值,并创建NamesrvController
- NamesrvController类中createNamesrvController方法
这里可以看出,如果你要查看各种参数,直接在启动个参数中送入 -p 就可以打印这个NameServer的所有的参数信息
同理,在启动日志中一定可以找到所有的参数:
步骤二
根据启动属性创建NamesrvController实例,并初始化该实例。NameServerController实例为NameServer核心控制器。
核心控制器会启动定时任务:
- 每隔10s扫描一次Broker,移除不活跃的Broker
- 每隔10min打印一次KV配置
NamesrvController 类中initialize()
步骤三
在JVM进程关闭之前,先将线程池关闭,及时释放资源最后:
1-4-3、Broker启动流程概要
入口类: BrokerController
核心方法:Broker向NameServer发送消息
1-4-4、Topic路由注册、剔除机制
路由注册与发现(读写锁,保证消息发送时的高并发)
消息发送时会获取路由信息,同时Broker会定时更新路由信息,所以路由表:
- 生产者发送消息时需要频繁的获取。对表进行读。
- Broker定时(30s)会更新一个路由表。对表进行写。
为了提高消息发送时的高并发(同时线程安全),这里维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。
写锁
Broker每隔30s向NameServer报告自己还活着(包含很多信息)这里使用的是写锁。因为数据最终要写入NameServer的内存(使用的HashMap进行保存)读锁
生产者发送消息时,需要从向NameServer获取路由信息,这里使用读锁。MQClientAPIImplDefaultRequestProcessor. getRouteInfoByTopic()
RouteInfoManager. pickupTopicRouteData()
设计亮点
- 因为Broker每隔30s向NameServer发送一个心跳包,这个操作每次都会更新Broker的状态,但同时生产者发送消息时也需要Broker的状态,要进行频繁的读取操作。所以这个地方就有一个矛盾,Broker的状态会被经常性的更新,同时也会被更加频繁的读取。这里如何提高并发,尤其是生产者进行消息发送时的并发,所以这里使用了读写锁机制(针对读多写少的场景)。
- NameServer每收到一个心跳包,将更新brokerLiveTable中关于Broker的状态信息以及路由表( topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable)。更新上述路由表使用了锁粒度较少的读写锁,允许多个消息发送者(Producer)并发读保证消息发送时的高并发。但同一时刻 NameServer只处理一个 Broker心跳包,多个心跳包请求串行执行。这也是读写锁经典使用场景,更多关于读写锁的信息,可以参考《并发编程》。
1-4-5、路由剔除机制
Broker每隔30s向NameServer发送一个心跳包,心跳包包含BrokerId,Broker地址,Broker名称,Broker所属集群名称、Broker关联的FilterServer列表。但是如果Broker宕机,NameServer无法收到心跳包,此时NameServer如何来剔除这些失效的Broker呢?NameServer会每隔10s扫描brokerLiveTable状态表,如果BrokerLive的lastUpdateTimestamp的时间戳距当前时间超过120s,则认为Broker失效,移除该Broker,关闭与Broker连接,同时更新topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。
RocketMQ有两个触发点来删除路由信息:
- NameServer定期扫描brokerLiveTable检测上次心跳包与当前系统的时间差,如果时间超过120s,则需要移除broker。
- Broker在正常关闭的情况下,会执行unregisterBroker指令
这两种方式路由删除的方法都是一样的,都是从相关路由表中删除与该broker相关的信息。
1-4-6、NameServer的存储
可知NameServer存储以下信息:
- topicQueueTable:Topic消息队列路由信息,消息发送时根据路由表进行负载均衡
- brokerAddrTable:Broker基础信息,包括brokerName、所属集群名称、主备Broker地址
- clusterAddrTable:Broker集群信息,存储集群中所有Broker名称
- brokerLiveTable:Broker状态信息,NameServer每次收到心跳包是会替换该信息
- filterServerTable:Broker上的FilterServer列表,用于类模式消息过滤。
- NameServer的实现基于内存,NameServer并不会持久化路由信息,持久化的重任是交给Broker来完成。
1-4-7、客户端启动核心流程
DefaultMQProducer是MQProducer的唯一默认实现,其实现MQProducer接口的时候还继承了ClientConfig类 (客户端配置类),可以配置如sendMsgTimeout超时时间,producerGroup 生产者组最大消息容量和是否启用压缩等。
RocketMQ中消息发送者、消息消费者都属于”客户端“
- 每一个客户端就是一个MQClientInstance,每一个ClientConfig对应一个示例。
- 故不同的生产者、消费端,如果引用同一个客户端配置(ClientConfig),则它们共享一个MQClientInstance实例。
- MQClientInstance不是对外应用类,也就是说用户不需要自己实例化使用他。并且,MQClientInstance的实例化并不是直接new后使用,而是通过MQClientManager这个类型。MQClientManager是个单例类,使用饿汉模式设计保证线程安全。他的作用是提供MQClientInstance实例,RocketMQ认为,MQClientInstance的实例是可以复用的实例,只要client相关特征参数相同,就会复用一个MQClientInstance实例,我们可以看看源码
ClientConfig里的相关参数一致,这些Client会复用一个MQClientInstance,使用的时候需要注意,你的程序里的Client和MQClientInstance的对应关系。下面我们来看看RocketMQ对参数配置如何的Client复用一个MQClientInstance,简单来说就是IP@instanceName@unitName,这么一个串,其中IP我不解释了,InstanceName可以设置,Producer和Consucer都有相应的API设置,如果不设置使用缺省值即Client的PID,unitName不设置缺省为null,当然你可以DefaultMQProducer#setUnitName()这个方法设置这个值,这个方法是继承自ClientConfig类的。
1-4-8、客户端连接建立的时机
注:客户端(MQClientInstance)中连接的建立时机为按需创建,也就是在需要与对端进行数据交互时才建立的。
2、RocketMQ源码分析之Producer
2-1、Producer启动流程
DefaultMQProducerImpl中的start()方法是生产者启动的核心方法
核心三个方法、检查、获取MQ ClientInstance实例、启动。
2-2、Producer消息发送流程
主题也是三个步骤:
- 验证消息
- 查找路由
- 选择队列
- 消息发送
DefaultMQProducerImpl中的sendDefaultImpl()是生产者消息发送的核心方法
2-2-1、验证消息
2-2-2、查找路由
2-2-3、选择队列
2-2-4、消息发送
2-2-5、选择队列源码深入分析
默认选择队列策略
采用了最简单的轮询算法,这种算法有个很好的特性就是,保证每一个Queue队列的消息投递数量尽可能均匀。这种算法只要消息投递过程中没有发生重试的话,基本上可以保证每一个Queue队列的消息投递数量尽可能均匀。
当然如果投递中发生问题,比如第一次投递就失败,那么很大的可能性是集群状态下的一台Broker挂了,所以在重试发送中进行规避。这样设置也是比较合理的。
这里地方有一个注意的地方就是计数器使用了线程的ThreadLocal因为本身消息的生产就可以多线程进行,所以当然要基于线程的上下文来计数递增。
选择队列策略增强版(故障延迟机制)
- 默认的投递方式比较简单,但是也暴露了一个问题,就是有些Queue队列可能由于自身数量积压等原因,可能在投递的过程比较长,对于这样的Queue队列会影响后续投递的效果。
- 基于这种现象,RocketMQ在每发送一个MQ消息后,都会统计一下消息投递的时间延迟,根据这个时间延迟,可以知道往哪些Queue队列投递的速度快。
在这种场景下,会优先使用消息投递延迟最小的策略,如果没有生效,再使用Queue队列轮询的方式。
具体的话实现使用了一个策略类:
- 统计一下消息投递的时间延迟:org.apache.rocketmq.client.latency.MQFaultStrategy#updateFaultItem的实现
- 记录的地方还是“消息发送流程”中核心方法中DefaultMQProducerImpl中的sendDefaultImpl()是生产者消息发送的核心方法
这里的操作大概如下:
- 根据消息发送时长(currentLatency),计算broker不可用时长(duration),即如果消息发送时间越久,mq会认为broker不可用的时长越久,broker不可用时长是个经验值,如果传入isolation为true,表示默认当前发送时长为30000L,即broker不可用时长为600000L
- 调用latencyFaultTolerance.updateFaultItem更新broker异常容错信息。
这个方法最终会往一个ConcurrentHashMap表中写每台broker的延时、key是brokerName,value是currentLatency(延时)
其关键点在于设置startTimestamp(意味broker预计可用的时间),这里使用的阿里的经验值
假如:消息发送时长为100毫秒,则mq预计broker的不可用时长为0毫秒
假如:消息发送时长为600毫秒,则mq预计broker的不可用时长为30000毫秒
假如:消息发送时长为4000毫秒,则mq预计broker的不可用时长为18000毫秒
broker的预计恢复正常时间为:当前时间+不可用时长,即System.currentTimeMillis() + notAvailableDuration
- updateFaultItem的实现,一个broker对应一个faultItem,faultItem内容包含broker名称、消息发送时长、broker恢复正常的时间startTimestamp。
- 其关键点在于设置startTimestamp(意味broker预计可用的时间),什么意思呢,假设某次消息发送时长为4000毫秒,则mq预计broker的不可用时长为18000L(根据latencyMax数组,notAvailableDuration数组对应关系得到),则broker的预计恢复正常时间为:当前时间+不可用时长,即System.currentTimeMillis() + notAvailableDuration
- 因此LatencyFaultToleranceImpl#isAvailable判断broker是否预计可用的实现也很清晰了,只要当前时间>startTimestamp,即表示该broker正常了(逻辑意义上的正常,预计broker会在这个时间点后恢复正常)
整体实现思路:
- 在消息发送失败,mq根据消息发送耗时来预测该broker不可用的时长,并将broker名称,及”预计恢复时长“,存储于ConcurrentHashMap faultItemTable中
- 在开启消息容错后,选择消息队列时,会根据当前时间与FaultItem中该broker的预计恢复时间做比较,若(System.currentTimeMillis() - startTimestamp) >= 0,则预计该broker恢复正常,选择该broker的消息队列
- 若所有的broker都预计不可用,随机选择一个不可用的broker,从路由信息中选择下一个消息队列,重置其brokerName,queueId,进行消息发送
选择队列策略的对比
- 在默认队列选择机制下,会随机选择一个MessageQueue,若发送失败,轮询队列重新进行重试发送(屏蔽单次发送中不可用的broker),同步模式下默认失败时重试发送2次,但它的问题就是消息发送很大可能再次失败,引发再次重复失败,带来不必要的性能损耗。
- 在开启故障延迟机制后,消息队列选择时,会在一段时间内过滤掉RocketMQ认为不可用的broker,以此来避免不断向宕机的broker发送消息,从而实现消息发送高可用。
- 这两个策略没有绝对的好与坏,King老师个人认为,如果工作中选择,应该是看网络环境和服务器的环境。
- 如果是网络和服务器环境比较好,那么我推荐默认策略,毕竟重试的次数和几率比较小。
- 如果是网络和服务器环境压力比较大,推荐使用故障延迟机制。
3、RocketMQ源码分析之Store
3-1、读Store源码前的思考
- 当topic数量增多到100+时,kafka的单个broker的TPS降低了1个数量级,而RocketMQ在海量topic的场景下,依然保持较高的TPS?kafka如果是topic比较少tps会有上万的TPS,但是topic比较多久会下降到1~2千的TPS。但是 Rocket在topic比较多,就算上万个topic也能保持很高的TPS
- CommitLog的”随机读”对性能的影响?我们前面知道,RocketMQ是基于文件存储,所有消息的本体都保存在Commitlog上,消息的生产是顺序写,效率很高,但是消费的时候是基于主题的,一个主题的消息随机分布式在Commitlog上,所以这个是是随机读,这个对RocketMQ有什么影响。
3-2、RocketMQ对比 Kafka
- Kafka 中文件的布局是以 Topic/partition ,每一个分区一个物理文件夹,在分区文件级别实现文件顺序写,如果一个Kafka集群中拥有成百上千个主题,每一个主题拥有上百个分区,消息在高并发写入时,其IO操作就会显得零散(消息分散的落盘策略会导致磁盘IO竞争激烈成为瓶颈),其操作相当于随机IO,即 Kafka 在消息写入时的IO性能会随着 topic 、分区数量的增长,其写入性能会先上升,然后下降。
- 而 RocketMQ在消息写入时追求极致的顺序写,所有的消息不分主题一律顺序写入 commitlog 文件,并不会随着 topic 和 分区数量的增加而影响其顺序性。
- 在消息发送端,消费端共存的场景下,随着Topic数的增加Kafka吞吐量会急剧下降,而RocketMQ则表现稳定。因此Kafka适合Topic和消费端都比较少的业务场景,而RocketMQ更适合多Topic,多消费端的业务场景。
3-3、CommitLog之Message格式
3-4、Store架构设计之消息发送
整个存储设计层次非常清晰,大致的层次如下图:
- 业务层,也可以称之为网络层,就是收到消息之后,一般交给SendMessageProcessor来分配(交给哪个业务来处理)
- DefaultMessageStore,这个是存储层最核心的入口。
- 另外还有一个重要的是CommitLog.
以上就是三个核心类。
3-4-1、Store层处理消息的入口
这个存储层处理消息的过程就是一次RPC请求,所以我们找入口。当然还是由Broker启动
这里还是类似之前讲过的功能号的概念
1、SendMessageProcessor.processRequest
RocketMQ使用Netty处理网络,框架收到请求的处理就会进入processRequest
2、DefaultMessageStore.processRequest
3、CommitLog. asyncPutMessage
3.1、存储到MappedFileQueue的MappedFile
3.2、同步刷盘:GroupCommitService(独立的线程)
刷盘是在commitlog的构造方法中就启动了独立的线程处理3.3、异步刷盘:CommitRealTimeService/FlushCommitLogService(独立的线程)
3-5、Store架构设计之消息消费
RocketMQ中,所有的队列存储一个文件(commitlog)中,所以rocketmq是顺序写io,随机读。每次读消息时先读逻辑队列consumQue中的元数据,再从commitlog中找到消息体。增加了开销。那么在RocketMQ中是怎么优化的?
- 本身无论是Commitlog文件还是Consumequeue文件,都通过MMAP内存映射。
- 本身存储Commitlog采用写时复制的容器处理,实现读写分离,所以很大程度上可以提高一些效率。
4、源码分析之堆外内存
我们根据之前了解可以,一般情况下RocketMQ是通过MMAP内存映射,生产时消息写入内存映射文件,然后消费的时候再读。但是RocketMQ还提供了一种机制。我们来看下。
- TransientStorePool,短暂的存储池(堆外内存)。RocketMQ单独创建一个ByteBuffer内存缓存池,用来临时存储数据,数据先写入该内存映射中,然后由commit线程定时将数据从该内存复制到与目标物理文件对应的内存映射中。
- RocketMQ引入该机制主要的原因是提供一种内存锁定,将当前堆外内存一直锁定在内存中,避免被进程将内存交换到磁盘。同时因为是堆外内存,这么设计可以避免频繁的GC
4-1、开启条件及限制
1、开启位置 broker中的配置文件:
2、在DefaultMessageStore. DefaultMessageStore()构造方法中,也可以看到还有其他限制(开启堆外内存缓冲区,必须是异步刷盘+主节点)
4-2、TransientStorePool概要设计
这个地方的设计有点类似于连接池的设计,首先,构造方法中init方法用于构造堆外内存缓冲值,默认构造5个。
borrowBufferf()借用堆外内存池ByteBuffer
在创建MappedFile时就会进行设置。要注意,这里就会把堆外内存通过returnBuffer()赋给writeBuffer
4-3、与消息发送流程串联
有了上面的知识,我们就可以确定,在MappedFile中,如果writeBuffer不为null,要么就一定开启了堆外内存缓冲!!!再结合消息的发送流程。
数据到了存储存,最终会调用MappedFile的appendMessagesInner()进行消息的存储。
按照上图的流程,消息发送就有两条线。
- 走传统的MMAP内存映射,数据写mappedByteBuffer,然后通过 flush刷盘
- 走堆外内存缓冲区,数据先写writeBuffer,再通过commit提交到FileChannel中,最后再flush刷盘
以上两种方式,处理的都是基于bytebuffer的实现,所以都通过 put方法可以写入内存。
所以对应前面讲的刷盘,你会发现为什么异步刷盘线程有两个。一个是针对的MMAP刷盘,一个是针对的堆外内存缓冲的提交刷盘。所以了堆外内存缓冲区一定是要异步、Commit的是针对堆外内存缓冲的提交。Flush的是针对MMAP的内存映射的处理。
在CommitRealTimeService中最后调用到MappedFile的 commit0方法写入:具体的如下:
4-4、两种方式的对比
- 默认方式,Mmap+PageCache的方式,读写消息都走的是pageCache,这样子读写都在pagecache里面不可避免会有锁的问题,在并发的读写操作情况下,会出现缺页中断降低,内存加锁,污染页的回写(脏页面)。
- 堆外缓冲区,DirectByteBuffer(堆外内存)+PageCache的两层架构方式,这样子可以实现读写消息分离,写入消息时候写到的是DirectByteBuffer——堆外内存中,读消息走的是PageCache(对于,DirectByteBuffer是两步刷盘,一步是刷到PageCache,还有一步是刷到磁盘文件中),带来的好处就是,避免了内存操作的很多容易堵的地方,降低了时延,比如说缺页中断降低,内存加锁,污染页的回写。
所以使用堆外缓冲区的方式相对来说会比较好,但是肯定的是,需要消耗一定的内存,如果服务器内存吃紧就不推荐这种模式,同时的话,堆外缓冲区的话也需要配合异步刷盘才能使用。
5、源码分析之ConsumeQueue
5-1、消息发送时数据在ConsumeQueue的落地
- 连续发送5条消息,消息是不定长,首先所有信息先放入 Commitlog中,每一条消息放入Commitlog的时候都需要上锁,确保顺序的写入。
- 当Commitlog写成功了之后。数据再同步到ConsunmeQueue中。
- 并且数据一条条分发,这个是一个典型的轮训。
- Queue Offset 代表一个Queue中的第几条消息
- Logic Offset就是Queue Offset*20 因为每一条ConsumeQueue中的消息长度都是20.
- Physical Offset,这个是在 Commitlog中每一条消息偏移量。
这种设计非常的巧妙:
- 查找消息的时候,可以直按根据队列的消息序号,计算出索引的全局位置(比如序号2,就知道偏移量是20),然后直接读取这条索引,再根据索引中记录的消息的全局位置,找到消息、这里面比较耗时两个操作就是分别找到索引和消息所在文件,这两次查找是差不多的,都可以抽象成
- 因为每个索引文件或者消息文件的长度的是固定的,对于每一组文件,都维护了一个由小到大有序的文件数组。查找文件的时候,直接通过计算即可获取文件在数组中的序号:
- 文件在数组中的序号=(全局位置-第一个文件的文件名)/文件固定大小
- 在通过序号在数组中获取数据的时间复杂度是0(1),二次查找文件的时间复杂度也是是:0(1)+0(1) =0 (1),所以消费时查找数据的时间复杂度也是O(1)。
5-2、入口:ReputMessageService.doReput(独立线程)
DefaultMessageStore. start()
ReputMessageService.run()
5-3、异步刷盘
6、源码分析之消息消费
6-1、Rebalance(针对集群消费模式)
(1)消费Group下的所有消费者
(2)Topic的所有Queue队列
(3)Queue分配策略
6-2、触发时机
(1)消费者启动
(2)消费者加入或者退出消费组
(3)定时触发Rebalance(10s)
6-3、举例
- 假设,一个topic中有4个队列,有一个Producer往4个队列中发数据,在集群消费中,在一个消费者分组中如果只有一个消费者。那么这个消费者肯定会消费4个队列,不然就会漏数据。
- 如果加入了一个Consumer2,这个时候就会触发一个Rebalance(Consumer增加了触发),这个2个消费者平均消费4个队列。
如果再加入了一个Consumer3,这个是否平均分不了,一般的处理,默认情况下,Consumer1消费两个,其他的消费一个。
如果再加入了一个Consumer4,刚好一对一,所以每个 Consumer消费一个队列。
如果再加入了一个Consumer5,消费者数据大于队列,那么Consumer5就消费不了数据,除非队列增加了,或者是说Consumer减少了才行。
所以当你启动多个消费者,如果消费者数量大于queue的数量,也只能有queue数量的消费者消费(就跟在软件公司内部找女朋友一样,狼多肉少)
蛋糕都被吃完了,你没得吃了。这个其实就是消费并发度。消费并发度决定因素是queue的数量。
6-4、源码解读
这里讲到的是基于推模式的消费,也就是我们常用的消费模式。
DefaultMQPushConsumerImpl.start()方法
还是要进入MQClientInstance.start()方法
在MQClientInstance.start()方法,有一个线程RebalanceService 就是锁Rebalance。具体实现RebalanceService来做的,是我们来看下
RebalanceService
这里有一个针对MessageQueue的排序。
- 为什么这么设计。如果同一个分组的多个客户端,分布在不同的机器上(消费者的机器上),每台客户端都单独算,并且算出来的效果是一致的。
- 总体消费就是让每一个 Consumer有同样的一个MessageQueue的视图,因为每个消费者的视图是一致的,那么在每个客户端算负载,算出来的结果当然就是一致的。这样就能保障之前的负载均衡的算出之前的效果。
- 对于 Consumer1和Consumer2,经过统一的排序,在Consumer1客户端也好,还是Consumer2的客户端也好,算出来的结果是一致的。
- Consumer1消费 queue1和queue2。Consumer2消费queue3和queue4。
- 对比Kafka,在消费的时候依赖Zookeeper,broker变动还要走选举之类,如果选不出或者比较卡,这个是否会导致负载不正常,负载不成功就不能正常的工作。
- 而RocketMQ的这种方式简单,并且高可用。强一致性必定要牺牲高可用性,RocketMQ设计上更多偏向高可用。
6-5、消费者源码解读
我们知道,在消费的时候有两种模式,一个是并发消费,另外一种是顺序消费。
因为消费者的代码非常复杂,没有必要全部读懂。所以采取了一种偏向于大家都能听懂的高可用方式(牺牲读源码的全面性)读两个流程。
6-6、并发消费
三个角色:消费者Consumer、 Borker、NameServer
NameServer主要记录了Borker上有哪些Topic。
在消费者启动之后,第一步都要从NameServer中获取Topic相关信息。
- 这一步设计到组件之间的交互,RocketMQ使用功能号来设计的。
- GET_ROUTEINFO_BY_TOPIC
- 我在idea上使用ctrl+H 查找功能。
很快就定位这段代码:
消费者拿到topic相关信息之后,第2步需要知道Topic中有哪些queue,并且消费的时候还跟消费者分组相关。所以这里就需要根据group获取相关信息。(这里有定时触发<默认10s一次>,同时在消费者启动的时候也会主动触发一次)
功能号:GET_CONSUMER_LIST_BY_GROUP当我们拿到了消费者Group下的所有信息之后,这个就可以做分配,可以分配到比如自己这台消费者的应该要消费哪些主机上的哪些队列。
这个地方教DoRebalance,同时这个DoRebalacne之前已经细讲(具体这里不细讲)确定了消费者的group、topic、还有queue之后,还需要知道从哪个位置开始消费。于是还需要获取Queue的Offset。功能号:QUERY_CONSUMER_OFFSET
调用的地方RemoteBrokerOffsetStore类中fetchConsumeOffsetFromBroker
确定了消费者的group、topic、还有queue和需要获取Queue的Offset,就要正式开始拉取消息了。送入的信息:topic、queueid、offset,还有maxnum(每次拉取多少条消息),
suspendtimeout 长轮询,Consumer拉消息请求在Broker挂起最长时间,单位毫秒 默认值 20000 。功能号:PULL_MESSAGE
拉到消息后,消费者就要进行消息的消费了。消费完了之后,要更新offset,这个时候也要发起调用。功能号:UPDATE_CONSUMER_OFFSET
这个地方要注意有两种方式:
1、 定时,默认5s提交
2、 前面步骤的拉取消息时会带入参数:commitoffset,这个时候也会更新。最后的话,消费者关闭的话,也会调用功能号:UNREGISTER_CLIENT。当然,生产者和和Broker之间还有心跳机制,这里就不多说了。
6-7、顺序消费
顺序消费的主体步骤和并发消费差不多,主要的差别就是有一个加锁和解锁的过程。
- 只要确定了是拉哪个queue。这个地方要加锁,加锁的目的就可以达到顺序性。在一个queue中消息是顺序的,当一个消费者确定了一个queue进行消费时,使用一个分布式锁机制,是不是就可以确定这个消费者的顺序性。
加锁Queue,LOCK_BATCH_MQ
同时发现,这个地方也有一个定时执行,20s,这个是周期性的去续锁。因为在broker端,这把的锁的时间也有一定的失效的,(默认60s),如果超过这个时间,这把锁就释放了。
Broker端针对这个的实现就是一个ReentrantLock而已。
解锁Queue,UNLOCK_BATCH_MQ
6-8、消费中常见问题
6-8-1、重复消息
RocketMQ生产也好,消费也好,有重试机制、重发队列等等,所以在网络情况不太好的情况下, RocketMQ避免不了消息的重复。
6-8-2、消费卡死
- 之前我讲到了消费的流程中,尤其是针对顺序消息,我们感觉上会有卡死的现象,由于顺序消息中需要到Broker中加锁,如果消费者某一个挂了,那么在Broker层是维护了60s的时间才能释放锁,所以在这段时间只能,消费者是消费不了的,在等待锁。
- 另外如果还有Broker层面也挂了,如果是主从机构,获取锁都是走的Master节点,如果Master节点挂了,走Slave消费,但是slave节点上没有锁,所以顺序消息如果发生了这样的情况,也是会有卡死的现象。
6-8-3、启动之后较长时间才消费
- 在并发消费的时候,当我们启动了非常多的消费者,维护了非常多的topic的时候、或者queue比较多的时候,你可以看到消费的流程的交互是比较多的(5~6步),要启动多线程,也要做相当多的事情,所以你会感觉要启动较长的时间才能消费。
- 还有顺序消费的时候,如果是之前的消费者挂了,这个锁要60秒才会释放,也会导致下一个消费者启动的时候需要等60s才能消费。
7、RocketMQ其他功能
7-1、消息轨迹
源码目录下也有说明文档:
rocketmq-all-4.8.0-source-release\docs\cn\msg_trace\user_guide.md
7-1-1、配置
Broker端服务器开启配置:traceTopicEnable=true
RocketMQ集群中每一个Broker节点均用于存储Client端收集并发送过来的消息轨迹数据。因此,对于RocketMQ集群中的Broker节点数量并无要求和限制。
对于消息轨迹数据量较大的场景,可以在RocketMQ集群中选择其中一个Broker节点专用于存储消息轨迹,使得用户普通的消息数据与消息轨迹数据的物理IO完全隔离,互不影响。在该模式下,RockeMQ集群中至少有两个Broker节点,其中一个Broker节点定义为存储消息轨迹数据的服务端。
7-1-2、保存消息轨迹的Topic定义
RocketMQ的消息轨迹特性支持两种存储轨迹数据的方式:
系统级的TraceTopic
- 在默认情况下,消息轨迹数据是存储于系统级的TraceTopic中(其名称为:RMQ_SYS_TRACE_TOPIC,队列个数为1)。该Topic在Broker节点启动时,会自动创建出来(如上所叙,需要在Broker端的配置文件中将traceTopicEnable的开关变量设置为true)。
用户自定义的TraceTopic
- 如果用户不准备将消息轨迹的数据存储于系统级的默认TraceTopic,也可以自己定义并创建用户级的Topic来保存轨迹(即为创建普通的Topic用于保存消息轨迹数据)。自定义的话需要在Client客户端的处理的时候自定义的TraceTopic。具体见案例。一般推荐使用系统及的TraceTopic。
7-1-3、案例
代码在example目录下,tracemessage目录下。
RocketMQ在消息审核消费时采用对原来接口增加一个开关参数(enableMsgTrace)来实现消息轨迹是否开启;并新增一个自定义参数(customizedTraceTopic)来实现用户存储消息轨迹数据至自己创建的用户级Topic。对于定义轨迹的主题,需要先创建这个主题才能收到消息,RocketMQ不会自动创建该主题。
7-1-4、关键属性
7-1-5、源码解读
代码实现的核心类是AsyncTraceDispatcher
org.apache.rocketmq.client.trace.AsyncTraceDispatcher记录消息的轨迹主要是集中在消息发送前后、消息消费前后,可以通过RokcetMQ的Hook机制。通过如下两个接口来定义钩子函数
SendMessageHook
ConsumeMessageHook
通过实行上述两个接口,可以实现在消息发送、消息消费前后记录消息轨迹,为了不明显增加消息发送与消息消费的时延,记录消息轨迹最好使用异步发送模式。
核心步骤如下:
- 遍历收集的消息轨迹数据
- 获取存储消息轨迹的Topic
- 对TraceContext进行编码,这里是消息轨迹的传输数据。
- 将编码后的数据发送到Broker服务器。
7-2、权限控制
在RocketMQ4.4.0版本升级中加入了ACL权限管控,ACL是access control list的简称,俗称访问控制列表。访问控制,基本上会涉及到用户、资源、权限、角色等概念。
用户的概念,即支持用户名、密码。
资源:需要保护的对象,消息发送涉及的Topic、消息消费涉及的消费组,应该进行保护,故可以抽象成资源。
权限:针对资源,能进行的操作。 角色:RocketMQ中,只定义两种角色:是否是管理员。
7-2-1、配置
- acl默认的配置文件名:plain_acl.yml,需要放在${ROCKETMQ_HOME}/store/config目录下
- 需要使用acl必须在服务端开启此功能,在Broker的配置文件中配置,aclEnable = true开启此功能
- RocketMQ 的权限控制存储的默认实现是基于yml配置文件。用户可以动态修改权限控制定义的属性,而不需重新启动Broker服务节点,因为 Broker 端有文件监听机制,每隔 500ms 监听、处理、加载文件的变更内容。
- 如果ACL与高可用部署(Master/Slave架构)同时启用,那么需要在Broker Master节点的${ROCKETMQ_HOME}/store/conf/plain_acl.yml配置文件中 设置全局白名单信息,即为将Slave节点的ip地址设置至Master节点plain_acl.yml配置文件的全局白名单中。
- plain_acl.yml文件中相关的参数含义及使用
字段
取值
含义
globalWhiteRemoteAddresses
*;192.168.*.*;192.168.0.1
全局IP白名单
accessKey
字符串
Access Key 用户名
secretKey
字符串
Secret Key 密码
whiteRemoteAddress
*;192.168.*.*;192.168.0.1
用户IP白名单
admin
true;false
是否管理员账户
defaultTopicPerm
DENY;PUB;SUB;PUB|SUB
默认的Topic权限
defaultGroupPerm
DENY;PUB;SUB;PUB|SUB
默认的ConsumerGroup权限
topicPerms
topic=权限
各个Topic的权限
groupPerms
group=权限
各个ConsumerGroup的权限
DENY 拒绝
ANY PUB 或者 SUB 权限
PUB 发送权限
SUB 订阅权限
7-2-2、案例实战
在Broker的配置文件中配置,aclEnable = true开启此功能
代码得加入一个返回RPCHook的方法
如果权限受限制:
加入权限对应用户:
发送成功。
很多时候像控制台配置了后会报错。
把全局的白名单打开。
8、常见问题
8-1、MQ百万消息积压处理
- 发生了线上故障,几千万条数据在MQ里积压很久。是修复consumer的问题,让他恢复消费速度,然后等待几个小时消费完毕?这是个解决方案。不过有时候我们还会进行临时紧急扩容。
- 一个消费者一秒是1000条,一秒3个消费者是3000条,一分钟是18万条。1000多万条,所以如果积压了几百万到上千万的数据,即使消费者恢复了,也需要大概1小时的时间才能恢复过来。
- 一般如果消息不重要的话就在consume上直接释放掉。
- 如果topic的messageQueue设置的比较多,比如设置了20个,consume实例只有4个,那么每个consume实例对应5个messageQueue,这个时候可以申请临时加机器,增加consume实例为20个,达到快速消费的目的。
- 如果messageQueue设置的比较少,比如只设置了4个,那么这个时候就不能通过加consume机器来解决了,这时候就需要修改消费者代码了,不再消费者消费,而是把要消费的消息放到mq的另一个topic中,这个topic设置20个messageQueue,对应20个consume实例,进行消费
8-2、消费幂等
8-2-1、消息重复的场景
生产者
- 重试机制导致的问题,消息成功发送到MQ中,但MQ因网络原因未能成功返回,导致重试机制重试机制重复发送到MQ
消费端
- 手动提交offset未完成
- 消费者成功消费完消息,未返回consume_commit时,系统重启|系统宕机,
- MQ重新发送消息到同消息组其他消费者机器,导致消息重复
8-2-2、什么是幂等性?
对于消息接收端的情况,幂等的含义是采用同样的输入多次调用处理函数,得到同样的结果。例如,一个SQL操作:update stat_table set count= 10 where id =1
这个操作多次执行,id等于1的记录中的 count字段的值都为10,这个操作就是幂等的,我们不用担心这个操作被重复。
再来看另外一个SQL操作:update stat_table set count= count +1 where id= 1;
这样的SQL操作就不是幂等的,一旦重复,结果就会产生变化。
8-2-3、MVCC
多版本并发控制,乐观锁的一种实现,在生产者发送消息时进行数据更新时需要带上数据的版本号,消费者去更新时需要去比较持有数据的版本号,版本号不一致的操作无法成功。例如博客点赞次数自动+1的接口:
public boolean addCount(Long id, Long version);
update blogTable set count= count+1,version=version+1 where id=321 and version=123
每一个version只有一次执行成功的机会,一旦失败了生产者必须重新获取数据的最新版本号再次发起更新。
8-2-4、去重表
利用数据库表单的特性来实现幂等,常用的一个思路是在表上构建唯一性索引,保证某一类数据一旦执行完毕,后续同样的请求不再重复处理了(利用一张日志表来记录已经处理成功的消息的ID,如果新到的消息ID已经在日志表中,那么就不再处理这条消息。)
利用RocketMQ的key值
以电商平台为例子,电商平台上的订单id就是最适合的token。当用户下单时,会经历多个环节,比如生成订单,减库存,减优惠券等等。每一个环节执行时都先检测一下该订单id是否已经执行过这一步骤,对未执行的请求,执行操作并缓存结果,而对已经执行过的id,则直接返回之前的执行结果,不做任何操作。这样可以在最大程度上避免操作的重复执行问题,缓存起来的执行结果也能用于事务的控制等。
9、RocketMQ性能优化
9-1、JVM层面
监控暂停
- rocketmq-console这个是官方提供了一个WEB项目,可以查看rocketmq数据和执行一些操作。但是这个监控界面又没有权限控制,并且还有一些消耗性能的查询操作,如果要提高性能,建议这个可以暂停。
- 一般的公司在运维方面会有专门的监控组件,如zabbix会做统一处理。
- 或者是简单的shell命令
- 监控的方式有很多,比如简单点的,我们可以写一个shell脚本,监控执行rocketmqJava进程的存活状态,如果rocketmq crash了,发送告警。
消除偏向锁
- 大家了解,在JDK1.8 sync有偏向锁,但是在RocketMQ都是多线程的执行,所以竞争比较激烈,建议把偏向锁取消,以免没有必要的开销。
- -XX:-UseBiasedLocking: 禁用偏向锁
垃圾回收
- RocketMQ推荐使用G1垃圾回收器。
- -Xms8g -Xmx8g -Xmn4g:这个就是很关键的一块参数了,也是重点需要调整的,就是默认的堆大小是8g内存,新生代是4g内存。
- 如果是内存比较大,比如有48g的内存,所以这里完全可以给他们翻几倍,比如给堆内存20g,其中新生代给10g,甚至可以更多些,当然要留一些内存给操作系统来用
- -XX:+UseG1GC -XX:G1HeapRegionSize=16m:这几个参数也是至关重要的,这是选用了G1垃圾回收器来做分代回收,对新生代和老年代都是用G1来回收。这里把G1的region大小设置为了16m,这个因为机器内存比较多,所以region大小可以调大一些给到16m,不然用2m的region, 会导致region数量过多。
- -XX:G1ReservePercent=25:这个参数是说,在G1管理的老年代里预留25%的空闲内存,保证新生代对象晋升到老年代的时候有足够空间,避免老年代内存都满了,新生代有对象要进入老年代没有充足内存了。默认值是10%,略微偏少,这里RocketMQ给调大了一些
- -XX:initiatingHeapOccupancyPercent= :30:这个参数是说,当堆内存的使用率达到30%之后就会自动启动G1的并发垃圾回收,开始尝试回收一些垃圾对象。默认值是45%,这里调低了一些,也就是提高了GC的频率,但是避免了垃圾对象过多,一次垃圾回收耗时过长的问题
- -XX:-OmitStackTraceInFastThrow:这个参数是说,有时候JVM会抛弃-些异常堆栈信息,因此这个参数设置之后,就是禁用这个特性,要把完整的异常堆栈信息打印出来。
- -XX:+AIwaysPreTouch:这个参数的意思是我们刚开始指定JVM用多少内存,不会真正分配给他,会在实际需要使用的时候再分配给他,所以使用这个参数之后,就是强制让JVM启动的时候直接分配我们指定的内存,不要等到使用内存的时候再分配
- -XX:MaxDirectMemorySize=15g:这是说RocketMQ里大量用了NIO中的direct buffer,这里限定了direct buffer最多申请多少,如果你机器内存比较大,可以适当调大这个值,不了解direct buffer是什么,可以自己查看JVM三期。
- -XX:-UseLargePages:这个参数的意思是禁用大内存页,某些情况下会导致内存浪费或实例无法启动。默认启动。
9-2、操作系统层面
9-2-1、基本参数
vm.overcommit_memory=1
- 是否允许内存的过量分配
- 当为0的时候,当用户申请内存的时候,内核会去检查是否有这么大的内存空间
- 当为1的时候,内核始终认为,有足够大的内存空间,直到它用完了为止
- 当为2的时候,内核禁止任何形式的过量分配内存
vm.swappiness=10
- swappiness=0 仅在内存不足的情况下,当剩余空闲内存低于vm.min_free_kbytes limit时,使用交换空间
- swappiness=1 内核版本3.5及以上、Red Hat内核版本2.6.32-303及以上,进行最少量的交换,而不禁用交换
- swappiness=10 当系统存在足够内存时,推荐设置为该值以提高性能
- swappiness=60 默认值
- swappiness=100 内核将积极的使用交换空间
vm.max_max_count=655360
- 定义了一个进程能拥有的最多的内存区域,默认为65536
- ulimit=1000000
- limits.conf 设置用户能打开的最大文件数.
1、查看当前大小
ulimit -a2、临时修改
ulimit -n 10000003、永久修改
vim /etc/security/limits.conf
9-2-2、NIC
一个请求到RocketMQ的应用,一般会经过网卡、内核空间、用户空间。
网卡
网络接口控制器(英语:network interface controller,NIC)
因 Ring Buffer 写满导致丢包的情况很多。当业务流量过大且出现网卡丢包的时候,建议调整Ring Buffer的大小,这个大小的设置一定程度上是可以缓解丢包的状况。