前言:本文是博主网络自行收集的一些RabbitMQ相关八股文,还在准备暑期实习,后续应该会持续更新......
参考:三天吃透RabbitMQ面试八股文_牛客网
目录
RabbitMQ概述
什么是 RabbitMQ?
说一说RabbitMQ中的AMQP
为什么要用RabbitMQ?有什么好处?场景是什么?
RabbitMQ 中有哪些重要的角色?
RabbitMQ的优缺点
RabbitMQ的组件和构造
工作模式
RabbitMQ的工作模式
👉 简单模式(Hello World)
👉 工作队列模式(Work queues)
👉 订阅模式( Pub/Sub)
👉 路由模式(Routing )
👉 通配符模式(Topics)
路由
消息是如何路由的?
Exchange四种类型
👉 Direct
👉 fanout
👉 topic
👉 headers
消息分发
消息基于什么传输?
为什么需要信道?为什么不是TCP直接通信?
消息属性和有效载荷(消息主体)
消息分发机制
说说消息pull模式
怎么设置消息的过期时间?
消息丢失
如何确保消息不丢失?
👉 生产者确认机制
👉 路由不可达消息
👉 消费者手动消息确认
👉 持久化
什么是死信队列?
Rabbitmq如何实现延迟队列
重复消费
消息重复消费怎么处理?
消息积压
如何解决消息积压的问题
如何处理消息堆积情况?几千万条数据在MQ里积压了七八个小时
由于消息积压导致过期被清理了怎么办
消费端怎么进行限流?
可靠性
MQ 消息可靠性怎么保证?
如何确保消息正确地发送至RabbitMQ? 如何确保消息接收方消费了消息?
如何保证消息的有序性?
分布式
镜像队列
RabbitMQ的集群
镜像集群模式
集群的优势劣势
RabbitMQ概述
什么是 RabbitMQ?
-
RabbitMQ是一个由Erlang开发的,在AMQP(高级消息队列协议)基础上完成的消息队列。
-
消息队列用于应用间的异步协作
-
最大的特点就是消费并不需要确保提供方存在,实现了服务之间的高度解耦。
说一说RabbitMQ中的AMQP
AMQP,Advanced Message Queuing Protocol,高级消息队列协议
它是一个提供统一消息服务的应用层二进制协议,为面向消息的中间件设计。它是基于TCP/IP协议构造的协议
基于此协议的客户端与消息中间件可传递消息,并不受不同平台、开发语言和操作系统的影响,即可跨平台传输。
下面是AMQP模型的简要示意图:
AMQP的主要处理流程
生产者是将消息发送到Exchange,Exchange根据路由规则Routing Key将消息路由到不同的Queue上,如果Queue上有消费者监听,则消费者可以获得消息。
生产者在生产消息的时候是不知道消费者的状态的,消费者在消费消息时也是不知道消息是从哪个生产者来的,即生产者与消费者之间的完全解耦的。
为什么要用RabbitMQ?有什么好处?场景是什么?
-
异步处理:对于一些不需要立即生效的操作,可以拆分出来,异步执行,使用消息队列实现,性能高
-
流量消峰:订单系统使用消息队列做缓冲,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作
-
应用解耦:比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成
RabbitMQ 中有哪些重要的角色?
RabbitMQ 中重要的角色有:生产者、消费者和代理。
-
生产者:消息的创建者,负责创建和推送数据到消息服务器;
-
消费者:消息的接收方,用于处理数据和确认消息;
-
代理:就是 RabbitMQ 消息队列本身,用于扮演“快递”的角色,本身不生产消息,只是扮演“快递”的角色。
RabbitMQ的优缺点
优点:
-
有管理界面,方便使用;
-
功能丰富,支持消息持久化、消息确认机制、多种消息分发机制
-
可靠性高;
缺点:
-
使用erlang实现,不利于二次开发和维护;
-
性能较kafka差,持久化消息和ACK确认的情况下生产和消费消息单机吞吐量大约在1~2w左右,kafka单机吞吐量在十万级别。
RabbitMQ的组件和构造
-
生产者Publisher:生产消息,就是投递消息的一方。消息一般包含两个部分:消息体(payload)和标签(Label)
-
消费者Consumer:消费消息,也就是接收消息的一方。消费者连接到RabbitMQ服务器,并订阅到队列上。消费消息时只消费消息体,丢弃标签。
-
Broker服务节点:表示消息队列服务器实体。一般情况下一个Broker可以看做一个RabbitMQ服务器。
-
Queue:消息队列,用来存放消息。一个消息可投入一个或多个队列,多个消费者可以订阅同一队列,这时队列中的消息会被平摊(轮询)给多个消费者进行处理。
-
Exchange:交换器,接受生产者发送的消息,根据路由键将消息路由到绑定的队列上。
-
Routing Key: 路由关键字,用于指定这个消息的路由规则,需要与交换器类型和绑定键(Binding Key)联合使用才能最终生效。
-
Binding:绑定,通过绑定将交换器和队列关联起来,一般会指定一个BindingKey,通过BindingKey,交换器就知道将消息路由给哪个队列了。
-
Connection :网络连接,比如一个TCP连接,用于连接到具体broker
-
Channel: 信道,AMQP 命令都是在信道中进行的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接,一个TCP连接可以用多个信道。客户端可以建立多个channel,每个channel表示一个会话任务。
-
Message:消息,由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
-
Virtual host:虚拟主机,用于逻辑隔离,表示一批独立的交换器、消息队列和相关对象。一个Virtual host可以有若干个Exchange和Queue,同一个Virtual host不能有同名的Exchange或Queue。最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段
工作模式
RabbitMQ的工作模式
https://blog.csdn.net/qq_33406883/article/details/124513956
RabbitMQ 提供了 6 种工作模式:
-
简单模式
-
work queues
-
Publish/Subscribe 发布与订阅模式
-
Routing路由模式
-
Topics 主题模式
-
RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。
👉 简单模式(Hello World)
-
P:生产者,也就是要发送消息的程序
-
C:消费者:消息的接收者,会一直等待消息到来
-
queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息
👉 工作队列模式(Work queues)
-
Work Queues:与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。消费者之间对于同一个消息的关系是
竞争
的关系。 -
应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
👉 订阅模式( Pub/Sub)
在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:
-
P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
-
C:消费者,消息的接收者,会一直等待消息到来
-
Queue:消息队列,接收消息、缓存消息
Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
👉 路由模式(Routing )
队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息
👉 通配符模式(Topics)
Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert
路由
消息是如何路由的?
-
消息提供方 ——> 路由 ——> 一至多个队列
-
消息发布到交换器(Exchange)时,消息将拥有一个路由键(routing key),在消息创建时设定
-
通过队列路由键,可以把队列绑定到交换器上
-
消息到达交换器后,RabbitMQ会将消息的路由键与队列的路由键进行匹配(针对不同的交换器类型有不同的路由规则),常见的交换机类型有四种:direct、fanout、topic、headers
Exchange四种类型
Exchange分发消息时,根据类型的不同分发策略不同。
目前共四种类型:direct、fanout、topic、headers
-
direct:
Routing Key==Binding Key
, 消息中的路由键 Routing Key 如果和 Binding 中的 Binding Key 完全匹配,交换器就将消息发到对应的队列中。是基于完全匹配、单播的模式 -
fanout:把所有发送到fanout交换器的消息,路由到所有绑定该交换器的队列 Queue 中,fanout 类型转发消息是最快的
-
topic:通过模糊匹配的方式对消息进行路由,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上
-
headers:不依赖于routing key与binding key的匹配规则,而是根据发送消息内容中的headers属性进行匹配;除此之外 headers 交换器和 direct 交换器完全一致,但性能差很多(目前几乎用不到了)
👉 Direct
direct交换机会将消息路由到binding key 和 routing key完全匹配的队列中。它是完全匹配、单播的模式。
👉 fanout
所有发到 fanout 类型交换机的消息都会路由到所有与该交换机绑定的队列上去。fanout 类型转发消息是最快的。
👉 topic
topic交换机使用routing key和binding key进行模糊匹配,匹配成功则将消息发送到相应的队列。routing key和binding key都是句点号“. ”分隔的字符串
-
routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开,这些单词可以是任意单词,比如说:
"stock.usd.nyse"
单词列表最多不能超过 255 个字节 -
替换符:binding key中可以存在两种特殊字符“*”与“##”,其中
*
用于匹配一个单词,##
用于匹配多个单词 -
当一个队列绑定键是
#
,那么这个队列将接收所有数据,就有点像 fanout,如果队列绑定键当中没有#
和*
出现,那么该队列绑定类型就是 direct
👉 headers
headers交换机是根据发送的消息内容中的 headers属性 进行路由的。
-
在绑定Queue与Exchange时,指定一组键值对;
-
当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;
-
如果完全匹配则消息会路由到该Queue
-
否则不会路由到该Queue。
消息分发
消息基于什么传输?
-
由于TCP连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。RabbitMQ 使用 信道 的方式来传输数据
-
信道是建立在真实的TCP连接内的虚拟连接,且每条TCP连接上的信道数量没有限制
为什么需要信道?为什么不是TCP直接通信?
-
TCP的创建和销毁开销大,创建需要三次握手,销毁需要四次分手
-
如果不使用信道,那么引用程序就会使用TCP的方式连接到rabbitmq,高峰时每秒成千上万条连接会造成资源的巨大浪费(一条tcp消耗资源,成千上万的tcp会非常消耗资源),而且操作系统每秒处理TCP连接数量也是有限的,必定会造成性能瓶颈
-
信道的原理是一条线程一条信道,多条线程多条信道共同使用一条TCP连接。一条TCP连接可以容纳无限的信道,及时每秒造成成千上万的请求也不会造成性能瓶颈
消息属性和有效载荷(消息主体)
AMQP模型中的消息 (Message)对象是带有 属性(Attributes) 的。有些属性非常常见,例如:
-
Content type
: 内容类型 -
Content encoding
: 内容编码 -
Routing Key
: 路由键 -
Delivery mode
: 投递方式(持久化 or 非持久化) -
Message priority
: 消息优先权 -
Message publishing timestamp
: 消息发布的时间戳 -
Expiration period
: 消息的有效期 -
Publisher application id
: 发布应用的id
有些属性是被 AMQP代理所使用的,比如 Routing Key
,但是大多数是对给接收消息的消费者使用的,有些属性是可选为做消息头的。它们与HTTP协议的 X-headers
很相似,比如 Content type
、Content encoding
。
AMQP 的 消息除属性外,还含有一个消息体,即消息实际携带的数据,它对AMQP代理不透明。broker 不会检查或修改消息体,但是消息可以只包含属性而不携带消息体。
消息分发机制
主要有三种分发机制:轮训分发、不公平分发、预值分发
-
轮训分发
RabbitMQ默认采用的轮训分发,当消费者有多个,且处理速度不相等(例如一个快一个慢)的时候不适用
-
不公平分发
通过设置参数 channel.basicQos(1)
实现不公平分发策略,能者多劳
-
预值分发
当消息被消费者接收后,但是没有确认,此时这里就存在一个未确认的消息缓冲区,用于存储非被确认的消息,该缓存区的大小是没有限制的。
通过使用basic.qos
方法设置“预取计数”值定义通道上允许的未确认消息的最大数量
说说消息pull模式
pull模式主要是通过channel.basicGet方法来获取消息
// 从消息队列中获取消息
GetResponse response = channel.basicGet(QUEUE_NAME, false);
System.out.println(new String(response.getBody()));
// ACK应答
channel.basicAck(response.getEnvelope().getDeliveryTag(),false);
怎么设置消息的过期时间?
过期时间就是TTL,TTL 全称 Time To Live(存活时间/过期时间)。
当消息到达存活时间后,还没有被消费,会被自动清除。
RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。
设置过期时间有两个办法:
-
在生产端发送消息时,给消息设置过期时间,单位毫秒(ms)
// 设置"tyson"消息时间为3000毫秒
Message msg = new Message("tyson".getBytes(), mp);
msg.getMessageProperties().setExpiration("3000");
-
在消息队列创建队列时,指定队列的TTL,从消息入队列开始计算,超过该时间的消息将会被移除。
消息丢失
如何确保消息不丢失?
消息丢失场景:生产者生产消息到RabbitMQ Server消息丢失、RabbitMQ Server存储的消息丢失和RabbitMQ Server到消费者消息丢失。
消息丢失从三个方面来解决:生产者确认机制、消费者手动确认消息和持久化。
👉 生产者确认机制
生产者发送消息到队列,无法确保发送的消息成功的到达server。
解决方法:
-
事务机制。在一条消息发送之后会使发送端阻塞,等待RabbitMQ的回应,之后才能继续发送下一条消息。性能差。
-
开启生产者确认机制(confirm),只要消息成功发送到交换机之后,RabbitMQ就会发送一个ack给生产者(即使消息没有Queue接收,也会发送ack)。如果消息没有成功发送到交换机,就会发送一条nack消息,提示发送失败。异步,性能好。
👉 路由不可达消息
生产者确认机制只确保消息正确到达交换机,对于从交换机路由到Queue失败的消息,会被丢弃掉,导致消息丢失。
对于不可路由的消息,有两种处理方式:Return消息机制和备份交换机。
-
Return消息机制,提供了回调函数 ReturnCallback,当消息从交换机路由到Queue失败才会回调这个方法
-
备份交换机,alternate-exchange 是一个普通的exchange,当你发送消息到对应的exchange时,没有匹配到queue,就会自动转移到备份交换机对应的queue,这样消息就不会丢失。
👉 消费者手动消息确认
有可能消费者收到消息还没来得及处理MQ服务就宕机了,导致消息丢失。因为消息者默认采用自动ack,一旦消费者收到消息后会通知MQ Server这条消息已经处理好了,MQ 就会移除这条消息。
解决方法:
-
消费者设置为手动确认消息,即消费者处理完逻辑之后再给broker回复ack,表示消息已经成功消费,可以从broker中删除。
-
当消息者消费失败的时候,给broker回复nack,根据配置决定重新入队还是从broker移除,或者进入死信队列。只要没收到消费者的 ack,broker 就会一直保存着这条消息,但不会 requeue,也不会分配给其他 消费者。
👉 持久化
如果RabbitMQ服务异常导致重启,将会导致消息丢失。
RabbitMQ 提供了持久化的机制,将内存中的消息持久化到硬盘上,即使重启RabbitMQ,消息也不会丢失。
消息持久化需要满足以下条件
-
消息设置持久化。发布消息前,设置投递模式
delivery mode
为2,表示消息需要持久化。 -
Queue队列设置持久化。
-
交换机设置持久化。
持久化消息消费和恢复流程
-
当发布一条消息到交换机上时,RabbitMQ 会先把消息写入持久化日志文件,然后才向生产者发送响应。
-
一旦消费者从持久队列中消费了一条持久化消息并且做了确认,RabbitMQ会在持久化日志中把这条消息标记为等待垃圾收集,从而移除这条消息。
-
如果持久化消息在被消费之前RabbitMQ重启,服务器会自动重建交换机和队列(以及绑定),并重新加载持久化日志中的消息到相应的队列或者交换机上,保证消息不会丢失。
什么是死信队列?
消费失败的消息存放的队列。
消息消费失败的原因:
-
消息被拒绝并且消息没有重新入队(requeue=false)
-
消息超时未消费
-
达到最大队列长度
当普通队列中有死信时,RabbitMQ 就会自动的将这个消息重新发布到设置的死信交换机去,然后被路由到死信队列。可以监听死信队列中的消息做相应的处理。
Rabbitmq如何实现延迟队列
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
需求:
-
下单后,30分钟未支付,取消订单,回滚库存。
-
新用户注册成功7天后,发送短信问候。
在RabbitMQ中并未提供延迟队列功能。
但是可以使用:TTL+死信队列 组合实现延迟队列的效果。
重复消费
消息重复消费怎么处理?
消息重复的原因有两个:1.生产时消息重复,2.消费时消息重复。
-
生产者发送消息给MQ,在MQ确认的时候出现了网络波动,生产者没有收到确认,这时候生产者就会重新发送这条消息,导致MQ会接收到重复消息。
-
消费者消费成功后,给MQ确认的时候出现了网络波动,MQ没有接收到确认,为了保证消息不丢失,MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息。
由于重复消息是由于网络原因造成的,无法避免。
解决方法
发送消息时让每个消息携带一个全局的唯一ID,在消费消息时先判断消息是否已经被消费过,保证消息消费逻辑的幂等性。
例如引入数据库或者redis:
-
数据库唯一主键去重:主键是不能冲突的,重复的数据无法插入
-
引入Redis解决重复消费问题
-
利用Redis,首先系统生成全局唯一的 id,用set操作放入Redis中
-
如订单信息id,消费后存储在Redis中,如果下次再来,先查看Redis中是否存在
-
如果存在,即此消息已经被消费过(后续不做消费处理)
-
如果不存在,即未消费,此时再将此id存入Redis中,进行后续的逻辑操作
-
综上,消费过程为:
-
消费者获取到消息后先根据id去查询 redis/db 是否存在该消息
-
如果不存在,则正常消费,消费完毕后写入redis/db
-
如果存在,则证明消息被消费过,直接丢弃
消息积压
如何解决消息积压的问题
导致消息积压突然增加,最粗粒度的原因,只有两种:
-
要么是发送变快了
-
要么是消费变慢了。
要解决积压的问题:
-
可以通过 扩容消费端的实例数来提升总体的消费能力。
-
如果短时间内没有足够的服务器资源进行扩容,那么就将系统降级,通过关闭一些不重要的业务,减少发送方发送的数据量,最低限度让系统还能正常运转,服务一些重要业务。
如何处理消息堆积情况?几千万条数据在MQ里积压了七八个小时
https://www.cnblogs.com/mengchunchen/p/10025139.html
几千万条数据在MQ里积压了七八个小时,最简单的方法可以让他恢复消费速度,然后等待几个小时消费完毕。
一般这个时候,只能操作临时紧急扩容了,具体操作步骤和思路如下:
先修复consumer的问题,确保其恢复消费速度,然后将现有cnosumer都停掉
新建一个topic,partition是原来的10倍,临时建立好原先10倍或者20倍的queue数量
然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的10倍数量的queue
接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数据
这种做法相当于是临时将queue资源和consumer资源扩大10倍,以正常的10倍速度来消费数据
等快速消费完积压数据之后,得恢复原先部署架构,重新用原先的consumer机器来消费消息
由于消息积压导致过期被清理了怎么办
https://www.cnblogs.com/mengchunchen/p/10025139.html
rabbitmq是可以设置过期时间的,就是TTL,如果消息在queue中积压超过一定的时间就会被rabbitmq给清理掉,这个数据就没了。
不是说数据会大量积压在mq里,而是大量的数据会直接搞丢。
我们可以采取一个方案,就是批量重导。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入mq里面去,把白天丢的数据给他补回来。
假设1万个订单积压在mq里面,没有处理,其中1000个订单都丢了,你只能手动写程序把那1000个订单给查出来,手动发到mq里去再补一次
消费端怎么进行限流?
当 RabbitMQ 服务器积压大量消息时,队列里的消息会大量涌入消费端,可能导致消费端服务器崩溃。这种情况下需要对消费端限流。
Spring RabbitMQ 提供参数 prefetch 可以设置单个请求处理的消息个数。如果消费者同时处理的消息到达最大值的时候,则该消费者会阻塞,不会消费新的消息,直到有消息 ack 才会消费新的消息 开启消费端限流:
##在单个请求中处理的消息个数,unack的最大数量
spring.rabbitmq.listener.simple.prefetch=2
原生 RabbitMQ 还提供 prefetchSize 和 global 两个参数。Spring RabbitMQ没有这两个参数。
// 单条消息大小限制,0代表不限制
// global:限制限流功能是channel级别的还是consumer级别。
// 当设置为false,consumer级别,限流功能生效
// 设置为true没有了限流功能,因为channel级别尚未实现。
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
可靠性
MQ 消息可靠性怎么保证?
使用一个消息队列,其实就分为三大块:生产者、中间件、消费者
要保证消息,就是保证三个环节都不能丢失数据
消息丢失从三个方面来解决:生产者确认机制、消费者手动确认消息和消息持久化
-
消息生产阶段:生产者会不会丢消息,取决于生产者对于异常情况的处理是否合理。从消息被生产出来,然后提交给 MQ 的过程中,只要能正常收到 ( MQ 中间件) 的 ack 确认响应,就表示发送成功,所以只要处理好返回值和异常,如果返回异常则进行消息重发,那么这个阶段是不会出现消息丢失的。
-
消息存储阶段:RabbitMQ 或 Kafka 这类专业的队列中间件,在使用时是部署一个集群,生产者在发布消息时,队列中间件通常会写「多个节点」,也就是有多个副本,这样一来,即便其中一个节点挂了,也能保证集群的数据不丢失。
-
消息消费阶段:消费者接收消息+消息处理之后,才回复 ack 的话,那么消息阶段的消息不会丢失。不能收到消息就回 ack,否则可能消息处理中途挂掉了,消息就丢失了。
如何确保消息正确地发送至RabbitMQ? 如何确保消息接收方消费了消息?
-
发送方确认模式(confirm)
-
将信道设置成confirm模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的ID。
-
一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一ID)。
-
如果RabbitMQ发生内部错误从而导致消息丢失,会发送一条nack(not acknowledged,未确认)消息。发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。
-
接收方确认机制
-
消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ才能安全地把消息从队列中删除。
-
这里并没有用到超时机制,RabbitMQ仅通过Consumer的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ给了Consumer足够长的时间来处理消息。保证数据的最终一致性
-
下面罗列几种特殊情况:
-
如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ会认为消息没有被分发,然后重新分发给下一个订阅的消费者。(可能存在消息重复消费的隐患,需要去重)
-
如果消费者接收到消息却没有确认消息,连接也未断开,则RabbitMQ认为该消费者繁忙,将不会给该消费者分发更多的消息。
-
如何保证消息的有序性?
-
一对一
-
拆分queue,使得一个queue只对应一个消费者
-
由于MQ一般都能保证内部队列是先进先出的,所以把需要保持先后顺序的一组消息使用某种算法都分配到同一个消息队列中。然后只用一个消费者单线程去消费该队列,这样就能保证消费者是按照顺序进行消费的了
-
但是消费者的吞吐量会出现瓶颈。如果多个消费者同时消费一个队列,还是可能会出现顺序错乱的情况,这就相当于是多线程消费了
-
重试机制
-
对于多线程的消费同一个队列的情况,可以使用重试机制;
-
比如有一个微博业务场景的操作,发微博、写评论、删除微博,这三个异步操作,如果一个消费者先执行了写评论的操作,但是这时微博都还没发,写评论一定是失败的,等一段时间。等另一个消费者,先执行发微博的操作后,再执行,就可以成功
分布式
镜像队列
当MQ发生故障时,会导致服务不可用。
引入RabbitMQ的镜像队列机制,将queue镜像到集群中其他的节点之上。
如果集群中的一个节点失效了,能自动地切换到镜像中的另一个节点以保证服务的可用性。
通常每一个镜像队列都包含一个master和多个slave,分别对应于不同的节点。
发送到镜像队列的所有消息总是被直接发送到master和所有的slave之上。
除了publish外所有动作都只会向master发送,然后由master将命令执行的结果广播给slave,从镜像队列中的消费操作实际上是在master上执行的。
RabbitMQ的集群
镜像集群模式
你创建的queue,无论元数据还是queue里的消息都会存在于多个实例上,然后每次你写消息到queue的时候,都会自动把消息到多个实例的queue里进行消息同步。
集群的优势劣势
优势:
-
任何一个机器宕机了,没事儿,别的机器都可以用。
劣势:
-
性能开销大,消息同步所有机器,导致网络带宽压力和消耗很重
-
没有扩展性,如果某个Queue负载很重,加机器,新增的机器也包含了这个Queue的所有数据,并没有办法线性扩展Queue