RocketMQ高性能背后的核心原理
1.消息主从复制
如果Broker以一个集群的方式部署,会有一个master节点和多个Slave节点,消息需要从master复制到slave上,而消息复制的方式分为同步复制和异步复制。
同步复制:
同步复制是等Master和Slave都写入消息成功后才反馈给客户端写入成功的状态.在同步复制下,如果Master节点故障,Slave上有全部的数据备份,这样容易恢复数据,但是同步复制会增大数据写入的延迟,降低系统的吞吐量
异步复制:
只要Master写入消息成功,就反馈给客户端写入成功的状态,然后再异步地将消息复制给Slave节点。在异步复制下,系统拥有较低地延迟和较高地吞吐量,但是如果master节点故障,而有些数据没有完成复制,就会造成数据丢失
配置方式:
消息复制方式是通过Broker配置文件里地brokerRole参数进行设置的,这个参数可以被设置成ASYNC_MASTER,SYNC_MASTER,SLAVE三个值中的一个
2.负载均衡–重点
Producer
Prodicer发送消息时,默认会轮询目标Topic下的所有MessageQueue,并采用递增取模的方式往不同的MessageQueue上发送消息,已达到让消息平均落在不同的queue上的目的,而由于MessageQueue是分布在不同的Broker上的,所以消息也会发送到不同的Broker上,见上图,同时生产者在发送消息时,可以指定一个MessageQueueSelector,通过这个对象来将消息发送到自己指定的MessageQueue上,这样可以保证消息局部有序
Consumer
Consumer也是以MessageQueue为单位来进行负载均衡的,分为集群模式和广播模式
集群模式
在集群消费模式下,每条消息只需要投递到订阅的这个Topic的Consumer Group下的
一个实例即可,RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要
明确指定拉取哪一条MessageQueue,没当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例。每次分配时,都会将MessageQueue和消费者ID进行排序后,再用不同的分配算法进行分配,内置的分配的算法共有六种,分别对应AllocateMessageQueueStrategy下的六种实现类,可以在consumer中直接set来指定,默认情况下使用的时最简单的平均分配策略
-
AllocateMachineRoomNearby
将统计放的Consumer和Broker有限分配在一起。这个策略可以通过一个matchineRoomResolve对象来定制Consumer和Broker的机房解析规则。然后还需要引入另外一个分配策略来对统计放的Broker和Consumer进行分配一般也就用简单的平均分配策略或者轮询分配策略(但是比较鸡肋,直接给属性指定机房更好) -
AllocateMessageQueueAveragely
平均分配,将所有MessageQueue平均分给每一个消费者
-
AllocateMessageQueueAveragelyByCircle
轮询分配。轮流地给一个消费者分配一个MessageQueue -
AllocateMessageQueueByConfig
不分配,直接指定一个MessageQueue,类似于广播模式,直接指定所有队列 -
AllocateMessageQueueByMachineRoom
按逻辑机房地概念进行分配。又是对BrokerName和COnsumerId定制化地配置 -
ALlocateMessageQueueConsistenHash
这个一致性哈希策略只需要指定一个虚拟节点数,使用一个Hash环地算法,
虚拟节点是为了让Hash数据在环上分布更为均匀
广播模式
官博模式下,每一条消息都会投递给订阅了Topic的所有消费者实例,所以也就没有消息分配这一说,而在实现上,即使在Consumer分配Queue时,所有Consumer都分到所有的Queue.广播模式实现的关键是将消费者的消费偏移量不再保存到Broker当中
3.消息重试
首先对于广播模式下的消息,是不存在消息重试的机制的,即消息消费失败后,会再重新进行发送,而只是继续消费新的消息,而对于普通的消息,当消费者消费失败后,可以通过设置返回状态达到消息重试的结果
如何让消息进行重试?
集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置。
可以有三种配置方式:
1.返回Action.ReconsumeLater (推荐)
2.返回null
3.抛出异常
如果希望消费失败后不重试,可以直接返回.CommitMessage
重试消息如何处理?
重试的消息会进入一个"%RETRY" + ConsumerGroup的队列中,然后RocketMQ默认允许每条消息最多重试16次,每重试间隔时间如图,随着重试次数的递增,重发间隔时间也是递增的,注:消费者实例要避免只有一个,否则重试次数是没有意义的
重试次数
如果消息重试16次后仍然失败,消息将不再投递,转为进入死信队列。另外一条消息无论重试多少次,这些重试消息的MessageId始终都是一样的。这个重试次数,RocketMQ可以进行定制,例如通过consumer.setMaxReconsumeTimes(20)// 将重试次数设置为20次,当定制的重试次数超过16次后,消息的重试时间间隔均为2小时
MessageId
在老版本的RocketMQ中,一条消息无论重试多少次,这些重试消息的MessgeId始终都是一样的但是在4.9.1版本中,每次重试MessageId都会重建
配置覆盖
消息最大重试次数的设置对相同GroupID下的所有Consumer实例有效,并且最后启动的Consuemr会覆盖之前启动的Consumer配置
4.死信队列
当一条消息消费失败,RocketMQ就会自动进行消息重试,而如果消息超过最大重试次数,RocketMQ就会认为这个消息有问题,但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,而会将其发送到这个消费者组对应的一种特殊队列:死信队列
死信队列的名称是"%DLQ%+ConsumerGroup"
死信队列的特征:
1.一个死信队列对应一个ConsumerGroup,而不是对应某个消费者实例
2.如果一个ConsumerGroup没有产生死信队列,RocketMQ就不会为其创建相应的队列
3.一个死信队列中的消息不会再被消费者正常消费
4.死信队列的有效期跟正常消息相同,默认3天,对应broker.conf中的fileReservedTime属性,超过这个最长时间的消息都会被删除,而不管消息是否被消费过,通常,一条消息进入了死信队列,意味着消息再消费处理的过程中出现了比较严重的错误,并且无法自行恢复,此时,一般需要人工去查看死信队列中的消息,对错误原因进行排查,然后对死信消息进行处理,比如转发到正常的Topic重新进行消费或者丢弃
注:默认创建出来的死信队列,它里面的消息是无法读取的,在控制台和消费者中都无法读取,这是因为这些默认的死信队列,它们的权限perm被设置成了2:禁读(4,禁写,6:可读可写)需要手动将死信队列的权限配置改成6,才能被消费
5.消息幂等
幂等概念
在MQ系统中,对于消息幂等有三种实现语义
1.at most once 最多一次:每条消息最多只会被消费一次
2.at least once 至少一次:每条消息至少会被消费一次
3.exactly once 刚刚好一次:每条消息都追确定地消费一次
这三种语义都有它使用的业务场景。其中,at most once是最好保证的,
RocketMQ中可以直接使用异步发送,sendOneWay等方式就可以保证,
而at least once这个语义,RocketMQ也有同步发送、事务消息等很多方式能够保证,
而这个exactly once是MQ种最理想也是最难保证的一种语义,需要有非常惊喜的设计才行,RocketMQ只能保证at least once,保证不了exactly once,所以,使用RocketMQ时,需要由业务系统自行保证消息的幂等性,但是,对于Exactly once语义,阿里云上的商业版RocketMQ是有明确支持的,实现方式未开源
消息幂等的必要性。
在互联网应用种,由器在网络不稳定的情况下,消息队列RocketMQ的消息有可能会出现重复,这个重复简单可以概括为以下情况:
1.发送时消息重复
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且MessageId也相同的消息
2.投递时消息重复。
消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断,为了保证消息至少被消费一次,消息队列RocketMQ的服务端将在网络恢复后再次投递之前已被处理过的消息,消费者后续会受到两条内容相同并且MessageId也相同的消息
3.负载均衡时消息重复(包括但不限于网络抖动、Broker重启亿级订阅应用重启)
当RocketMQ的Broker或客户端重启,扩容,缩容时,会触发Rebalance,此时消费
者可能会受到重复消息
处理方式
在RocketMQ是无法保证每个消息制备投递一次,所以要在业务上自行来保证消息消费的幂等性。而要处理这个问题,RocketMQ的每条消息都有一个唯一的MessageId,这个参数在多次投递的过程种是不会改变的,所以业务上可以用这个MessageId来作为判断幂等的关键依据。但是,这个MessageId是无法保证全局唯一的,也会有冲突的情况,所以在一些对幂等要求严格的场景,最好是使用业务上唯一的标识比较靠谱,例如订单id,而这个业务标识可以使用Message的key来进行传递