[面试题]RabbitMQ

  1. [面试题]Java【基础】
  2. [面试题]Java【虚拟机】
  3. [面试题]Java【并发】
  4. [面试题]Java【集合】
  5. [面试题]MySQL
  6. [面试题]Maven
  7. [面试题]Spring Boot
  8. [面试题]Spring Cloud
  9. [面试题]Spring MVC
  10. [面试题]Spring
  11. [面试题]MyBatis
  12. [面试题]Nginx
  13. [面试题]缓存
  14. [面试题]Redis
  15. [面试题]消息队列
  16. [面试题]Kafka
  17. [面试题]RabbitMQ

RabbitMQ 是什么?

RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。

AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:

  • 1、可靠性(Reliability)

RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。

  • 2、灵活的路由(Flexible Routing)

在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。

  • 3、消息集群(Clustering)

多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。

  • 4、高可用(Highly Available Queues)

队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。

  • 5、多种协议(Multi-protocol)

RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。

  • 6、多语言客户端(Many Clients)

RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。

  • 7、管理界面(Management UI)

RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。

  • 8、跟踪机制(Tracing)

如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。

  • 9、插件机制(Plugin System)

RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。

更详细的,推荐阅读 《消息队列之 RabbitMQ》 。它提供了:

  • 1、RabbitMQ 的介绍
  • 2、RabbitMQ 的概念
  • 3、RabbitMQ 的 Server 安装
  • 4、RabbitMQ 的 Java Client 使用示例
  • 5、RabbitMQ 的集群

RabbitMQ 中的 Broker 是指什么?Cluster 又是指什么?

  • Broker ,是指一个或多个 erlang node 的逻辑分组,且 node 上运行着 RabbitMQ 应用程序。
  • Cluster ,是在 Broker 的基础之上,增加了 node 之间共享元数据的约束。

vhost 是什么?起什么作用?

vhost 可以理解为虚拟 Broker ,即 mini-RabbitMQ server 。其内部均含有独立的 queue、exchange 和 binding 等,但最最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的 vhost 中)。

这个,和 Tomcat、Nginx、Apache 的 vhost 是一样的概念。

什么是元数据?元数据分为哪些类型?包括哪些内容?

在非 Cluster 模式下,元数据主要分为:

  • Queue 元数据(queue 名字和属性等)
  • Exchange 元数据(exchange 名字、类型和属性等)
  • Binding 元数据(存放路由关系的查找表)
  • Vhost 元数据(vhost 范围内针对前三者的名字空间约束和安全属性设置)。

与 Cluster 相关的元数据有哪些?元数据是如何保存的?元数据在 Cluster 中是如何分布的?

在 Cluster 模式下,还包括 Cluster 中 node 位置信息和 node 关系信息。

元数据根据 erlang node 的类型确定是仅保存于 RAM 中,还是同时保存在 RAM 和 disk 上。元数据在 Cluster 中是全 node 分布的。

下图所示为 queue 的元数据在单 node 和 cluster 两种模式下的分布图:
在这里插入图片描述

RAM node 和 Disk node 的区别?

  • RAM node 仅将 fabric(即 queue、exchange 和 binding等 RabbitMQ基础构件)相关元数据保存到内存中,但 Disk node 会在内存和磁盘中均进行存储。
  • RAM node 上唯一会存储到磁盘上的元数据是 Cluster 中使用的 Disk node 的地址。并且要求在 RabbitMQ Cluster 中至少存在一个 Disk node 。

RabbitMQ 概念里的 channel、exchange 和 queue 是什么?

  • queue 具有自己的 erlang 进程;
  • exchange 内部实现为保存 binding 关系的查找表;
  • channel 是实际进行路由工作的实体,即负责按照 routing_key 将 message 投递给 queue 。

由 AMQP 协议描述可知,channel 是真实 TCP 连接之上的虚拟连接,所有 AMQP 命令都是通过 channel 发送的,且每一个 channel 有唯一的 ID 。

  • 一个 channel 只能被单独一个操作系统线程使用,故投递到特定 channel 上的 message 是有顺序的。但一个操作系统线程上允许使用多个 channel 。

  • channel 号为 0 的 channel 用于处理所有对于当前 connection 全局有效的帧,而 1-65535 号 channel 用于处理和特定 channel 相关的帧。

  • AMQP 协议给出的 channel 复用模型如下:channel 复用模型

    • 其中每一个 channel 运行在一个独立的线程上,多线程共享同一个 socket 。

消息基于什么传输?

由于 TCP 连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。RabbitMQ 使用信道的方式来传输数据。信道是建立在真实的 TCP 连接内的虚拟连接,且每条 TCP连接上的信道数量没有限制。

RabbitMQ 上的一个 queue 中存放的 message 是否有数量限制?

可以认为是无限制,因为限制取决于机器的内存,但是消息过多会导致处理效率的下降。

下面的几个问题,Cluster 相关。

在单 node 系统和多 node 构成的 cluster 系统中声明 queue、exchange ,以及进行 binding 会有什么不同?

  • 当你在单 node 上声明 queue 时,只要该 node 上相关元数据进行了变更,你就会得到 Queue.Declare-ok 回应;而在 cluster 上声明 queue ,则要求 cluster 上的全部 node 都要进行元数据成功更新,才会得到 Queue.Declare-ok 回应。
  • 另外,若 node 类型为 RAM node 则变更的数据仅保存在内存中,若类型为 Disk node 则还要变更保存在磁盘上的数据。

客户端连接到 Cluster 中的任意 node 上是否都能正常工作?

是的。客户端感觉不到有何不同。

若 Cluster 中拥有某个 queue 的 owner node 失效了,且该 queue 被声明具有 durable 属性,是否能够成功从其他 node 上重新声明该 queue ?

  • 不能,在这种情况下,将得到 404 NOT_FOUND 错误。只能等 queue 所属的 node 恢复后才能使用该 queue 。
  • 但若该 queue 本身不具有 durable 属性,则可在其他 node 上重新声明。

Cluster 中 node 的失效会对 consumer 产生什么影响?若是在 cluster 中创建了 mirrored queue ,这时 node 失效会对 consumer 产生什么影响?

  • 若是 consumer 所连接的那个 node 失效(无论该 node 是否为 consumer 所订阅 queue 的 owner node),则 consumer 会在发现 TCP 连接断开时,按标准行为执行重连逻辑,并根据 “Assume Nothing” 原则重建相应的 fabric 即可。
  • 若是失效的 node 为 consumer 订阅 queue 的 owner node,则 consumer 只能通过 Consumer Cancellation Notification 机制来检测与该 queue 订阅关系的终止,否则会出现傻等却没有任何消息来到的问题。

Consumer Cancellation Notification 机制用于什么场景?

用于保证当镜像 queue 中 master 挂掉时,连接到 slave 上的 consumer 可以收到自身 consume 被取消的通知,进而可以重新执行 consume 动作从新选出的 master 出获得消息。

若不采用该机制,连接到 slave 上的 consumer 将不会感知 master 挂掉这个事情,导致后续无法再收到新 master 广播出来的 message 。

另外,因为在镜像 queue 模式下,存在将 message 进行 requeue 的可能,所以实现 consumer 的逻辑时需要能够正确处理出现重复 message 的情况。

能够在地理上分开的不同数据中心使用 RabbitMQ cluster 么?

不能。

  • 第一,你无法控制所创建的 queue 实际分布在 cluster 里的哪个 node 上(一般使用 HAProxy + cluster 模型时都是这样),这可能会导致各种跨地域访问时的常见问题。
  • 第二,Erlang 的 OTP 通信框架对延迟的容忍度有限,这可能会触发各种超时,导致业务疲于处理。
  • 第三,在广域网上的连接失效问题将导致经典的“脑裂”问题,而 RabbitMQ 目前无法处理。(该问题主要是说 Mnesia)

如何确保消息正确地发送至 RabbitMQ?

RabbitMQ 使用发送方确认模式,确保消息正确地发送到 RabbitMQ。

  • 发送方确认模式:将信道设置成 confirm 模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的 ID 。一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一ID)。如果 RabbitMQ 发生内部错误从而导致消息丢失,会发送一条 nack(not acknowledged,未确认)消息。
  • 发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。

具体的代码实现,可以看看 《芋道 Spring Boot 消息队列 RabbitMQ 入门》的「14. 生产者的发送确认」 小节

向不存在的 exchange 发 publish 消息会发生什么?向不存在的 queue 执行 consume 动作会发生什么?

都会收到 Channel.Close 信令告之不存在(内含原因 404 NOT_FOUND)。

什么情况下会出现 blackholed 问题?

blackholed ,对应中文为“黑洞”。

blackholed 问题是指,向 exchange 投递了 message ,而由于各种原因导致该 message 丢失,但发送者却不知道。可导致 blackholed 的情况:

  • 1、向未绑定 queue 的 exchange 发送 message 。
  • 2、exchange 以 binding_key key_A 绑定了 queue queue_A,但向该 exchange 发送 message 使用的 routing_key 却是 key_B 。

如何防止出现 blackholed 问题?

没有特别好的办法,只能在具体实践中通过各种方式保证相关 fabric 的存在。另外,如果在执行 Basic.Publish 时设置 mandatory=true ,则在遇到可能出现 blackholed 情况时,服务器会通过返回 Basic.Return 告之当前 message 无法被正确投递(内含原因 312 NO_ROUTE)。

具体的代码实现,可以看看 《芋道 Spring Boot 消息队列 RabbitMQ 入门》的「14.3 ReturnCallback」 小节

routing_key 和 binding_key 的最大长度是多少?

255 字节。

消息怎么路由?

从概念上来说,消息路由必须有三部分:交换器、路由、绑定。

  • 生产者把消息发布到交换器上;
  • 绑定

决定了消息如何从路由器路由到特定的队列;

如果一个路由绑定了两个队列,那么发送给该路由时,这两个队列都会增加一条消息。

  • 消息最终到达

队列,并被消费者接收。

详细来说,就是:

  • 消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。
  • 通过队列路由键,可以把队列绑定到交换器上。
  • 消息到达交换器后,RabbitMQ 会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则)。如果能够匹配到队列,则消息会投递到相应队列中;如果不能匹配到任何队列,消息将进入 “黑洞”。

常用的交换器主要分为一下三种:

  • direct:如果路由键完全匹配,消息就被投递到相应的队列。
  • fanout:如果交换器收到消息,将会广播到所有绑定的队列上。
  • topic:可以使来自不同源头的消息能够到达同一个队列。 使用 topic 交换器时,可以使用通配符,比如:“*” 匹配特定位置的任意文本, “.” 把路由键分为了几部分,“#” 匹配所有规则等。特别注意:发往 topic 交换器的消息不能随意的设置选择键(routing_key),必须是由 “.” 隔开的一系列的标识符组成。

具体的代码实现,可以看看 《芋道 Spring Boot 消息队列 RabbitMQ 入门》的「3. 快速入门」 小节

如何确保消息接收方消费了消息?

RabbitMQ 使用接收方消息确认机制,确保消息接收方消费了消息。

  • 接收方消息确认机制:消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ 才能安全地把消息从队列中删除。

这里并没有用到超时机制,RabbitMQ 仅通过 Consumer 的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ 给了 Consumer 足够长的时间来处理消息。

下面罗列几种特殊情况:

  • 如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ 会认为消息没有被分发,然后重新分发给下一个订阅的消费者。(可能存在消息重复消费的隐患,需要根据 bizId 去重)
  • 如果消费者接收到消息却没有确认消息,连接也未断开,则 RabbitMQ 认为该消费者繁忙,将不会给该消费者分发更多的消息。

具体的代码实现,可以看看 《芋道 Spring Boot 消息队列 RabbitMQ 入门》的「13. 消费者的消息确认」 小节。

如何避免消息重复投递或重复消费?

在消息生产时,MQ 内部针对每条生产者发送的消息生成一个 inner-msg-id ,作为去重和幂等的依据(消息投递失败并重传),避免重复的消息进入队列

在消息消费时,要求消息体中必须要有一个 bizId(对于同一业务全局唯一,如支付 ID、订单 ID、帖子 ID 等)作为去重和幂等的依据,避免同一条消息被重复消费。

消息如何分发?

若该队列至少有一个消费者订阅,消息将以循环(round-robin)的方式发送给消费者。每条消息只会分发给一个订阅的消费者(前提是消费者能够正常处理消息并进行确认)。

RabbitMQ 有几种消费模式?

RabbitMQ 有 pull 和 push 两种消费模式。具体的使用,可参见 《RabbitMQ 之 Consumer 消费模式(Push & Pull)》 文章。

为什么不应该对所有的 message 都使用持久化机制?

首先,必然导致性能的下降,因为写磁盘比写 RAM 慢的多,message 的吞吐量可能有 10 倍的差距。

其次,message 的持久化机制用在 RabbitMQ 的内置 Cluster 方案时会出现“坑爹”问题。矛盾点在于:

  • 若 message 设置了 persistent 属性,但 queue 未设置 durable 属性,那么当该 queue 的 owner node 出现异常后,在未重建该 queue 前,发往该 queue 的 message 将被 blackholed 。
  • 若 message 设置了 persistent 属性,同时 queue 也设置了 durable 属性,那么当 queue 的 owner node 异常且无法重启的情况下,则该 queue 无法在其他 node 上重建,只能等待其 owner node 重启后,才能恢复该 queue 的使用,而在这段时间内发送给该 queue 的 message 将被 blackholed 。

所以,是否要对 message 进行持久化,需要综合考虑性能需要,以及可能遇到的问题。若想达到 100,000 条/秒以上的消息吞吐量(单 RabbitMQ 服务器),则要么使用其他的方式来确保 message 的可靠 delivery ,要么使用非常快速的存储系统以支持全持久化(例如使用 SSD)。另外一种处理原则是:仅对关键消息作持久化处理(根据业务重要程度),且应该保证关键消息的量不会导致性能瓶颈。

RabbitMQ 允许发送的 message 最大可达多大?

根据 AMQP 协议规定,消息体的大小由 64-bit 的值来指定,所以你就可以知道到底能发多大的数据了。

为什么说保证 message 被可靠持久化的条件是 queue 和 exchange 具有 durable 属性,同时 message 具有 persistent 属性才行?

binding 关系可以表示为 exchange–binding–queue 。从文档中我们知道,若要求投递的 message 能够不丢失,要求 message 本身设置 persistent 属性,同时要求 exchange 和 queue 都设置 durable 属性。

  • 其实这问题可以这么想,若 exchange 或 queue 未设置 durable 属性,则在其 crash 之后就会无法恢复,那么即使 message 设置了 persistent 属性,仍然存在 message 虽然能恢复但却无处容身的问题。
  • 同理,若 message 本身未设置 persistent 属性,则 message 的持久化更无从谈起。

如何确保消息不丢失?

消息持久化的前提是:将交换器/队列的 durable 属性设置为 true ,表示交换器/队列是持久交换器/队列,在服务器崩溃或重启之后不需要重新创建交换器/队列(交换器/队列会自动创建)。

如果消息想要从 RabbitMQ 崩溃中恢复,那么消息必须:

  • 在消息发布前,通过把它的 “投递模式” 选项设置为2(持久)来把消息标记成持久化
  • 将消息发送到持久交换器
  • 消息到达持久队列

RabbitMQ 确保持久性消息能从服务器重启中恢复的方式是,将它们写入磁盘上的一个持久化日志文件。

  • 当发布一条持久性消息到持久交换器上时,RabbitMQ 会在消息提交到日志文件后才发送响应(如果消息路由到了非持久队列,它会自动从持久化日志中移除)。
  • 一旦消费者从持久队列中消费了一条持久化消息,RabbitMQ 会在持久化日志中把这条消息标记为等待垃圾收集。如果持久化消息在被消费之前 RabbitMQ 重启,那么 RabbitMQ 会自动重建交换器和队列(以及绑定),并重播持久化日志文件中的消息到合适的队列或者交换器上。

什么是死信队列?

DLX,Dead-Letter-Exchange。利用 DLX ,当消息在一个队列中变成死信(dead message)之后,它能被重新 publish 到另一个 Exchange ,这个 Exchange 就是DLX。消息变成死信一向有一下几种情况:

  • 消息被拒绝(basic.reject / basic.nack)并且 requeue=false 。
  • 消息 TTL 过期(参考:RabbitMQ之TTL(Time-To-Live 过期时间))。
  • 队列达到最大长度。

详细的,可以看看 《RabbitMQ 之死信队列》 文章。

“dead letter”queue 的用途?

当消息被 RabbitMQ server 投递到 consumer 后,但 consumer 却通过 Basic.Reject 进行了拒绝时(同时设置 requeue=false),那么该消息会被放入 “dead letter” queue 中。该 queue 可用于排查 message 被 reject 或 undeliver 的原因。

Basic.Reject 的用法是什么?

该信令可用于 consumer 对收到的 message 进行 reject 。

  • 若在该信令中设置 requeue=true ,则当 RabbitMQ server 收到该拒绝信令后,会将该 message 重新发送到下一个处于 consume 状态的 consumer 处(理论上仍可能将该消息发送给当前 consumer)。
  • 若设置 requeue=false ,则 RabbitMQ server 在收到拒绝信令后,将直接将该 message 从 queue 中移除。

另外一种移除 queue 中 message 的小技巧是,consumer 回复 Basic.Ack 但不对获取到的 message 做任何处理。而 Basic.Nack是对 Basic.Reject 的扩展,以支持一次拒绝多条 message 的能力。

RabbitMQ 中的 cluster、mirrored queue,以及 warrens 机制分别用于解决什么问题?

1)cluster

  • cluster 是为了解决当 cluster 中的任意 node 失效后,producer 和 consumer 均可以通过其他 node 继续工作,即提高了可用性;另外可以通过增加 node 数量增加 cluster 的消息吞吐量的目的。
  • cluster 本身不负责 message 的可靠性问题(该问题由 producer 通过各种机制自行解决);cluster 无法解决跨数据中心的问题(即脑裂问题)。
  • 另外,在cluster 前使用 HAProxy 可以解决 node 的选择问题,即业务无需知道 cluster 中多个 node 的 ip 地址。可以利用 HAProxy 进行失效 node 的探测,可以作负载均衡。下图为 HAProxy + cluster 的模型:
    在这里插入图片描述

2)Mirrored queue

  • Mirrored queue 是为了解决使用 cluster 时所创建的 queue 的完整信息仅存在于单一 node 上的问题,从另一个角度增加可用性。
  • 若想正确使用该功能,需要保证:1)consumer 需要支持 Consumer Cancellation Notification 机制;2)consumer 必须能够正确处理重复 message 。

3)Warrens

Warrens 是为了解决 cluster 中 message 可能被 blackholed 的问题,即不能接受 producer 不停 republish message 但 RabbitMQ server 无回应的情况。

Warrens 有两种构成方式:

  • 一种模型,是两台独立的 RabbitMQ server + HAProxy ,其中两个 server 的状态分别为 active 和 hot-standby 。该模型的特点为:两台 server 之间无任何数据共享和协议交互,两台 server 可以基于不同的 RabbitMQ 版本。如下图所示:
    在这里插入图片描述

  • 另一种模型,为两台共享存储的 RabbitMQ server + keepalived ,其中两个 server 的状态分别为 active 和 cold-standby 。该模型的特点为:两台 server 基于共享存储可以做到完全恢复,要求必须基于完全相同的 RabbitMQ 版本。如下图所示:
    在这里插入图片描述

Warrens 模型存在的问题:

  • 对于第一种模型,虽然理论上讲不会丢失消息,但若在该模型上使用持久化机制,就会出现这样一种情况,即若作为 active 的 server 异常后,持久化在该 server 上的消息将暂时无法被 consume ,因为此时该 queue 将无法在作为 hot-standby 的 server 上被重建,所以,只能等到异常的 active server 恢复后,才能从其上的 queue 中获取相应的 message 进行处理。而对于业务来说,需要具有:a.感知 AMQP 连接断开后重建各种 fabric 的能力;b.感知 active server 恢复的能力;c.切换回 active server 的时机控制,以及切回后,针对 message 先后顺序产生的变化进行处理的能力。
  • 对于第二种模型,因为是基于共享存储的模式,所以导致 active server 异常的条件,可能同样会导致 cold-standby server 异常;另外,在该模型下,要求 active 和 cold-standby 的 server 必须具有相同的 node 名和 UID ,否则将产生访问权限问题;最后,由于该模型是冷备方案,故无法保证 cold-standby server 能在你要求的时限内成功启动。

RabbitMQ 如何实现高可用?

这个问题,和 「RabbitMQ 中的 cluster、mirrored queue,以及 warrens 机制分别用于解决什么问题?」 会比较类似。

RabbitMQ 的高可用,是基于主从做高可用性的。它有三种模式:

  • 单机模式
  • 普通集群模式
  • 镜像集群模式

1)单机模式

单机模式,就是启动单个 RabbitMQ 节点,一般用于本地开发或者测试环境。实际生产环境下,基本不会使用。

普通集群模式(无高可用性)

这种方式,就是上面问题的 cluster 。

普通集群模式,意思就是在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。

  • 你创建的 queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。
  • 你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。

这种方式确实很麻烦,也不怎么好,没做到所谓的分布式,就是个普通集群。因为这导致你要么消费者每次随机连接一个实例然后拉取数据,要么固定连接那个 queue 所在实例消费数据,前者有数据拉取的开销,后者导致单实例性能瓶颈。

而且如果那个放 queue 的实例宕机了,会导致接下来其他实例就无法从那个实例拉取,如果你开启了消息持久化,让 RabbitMQ 落地存储消息的话,消息不一定会丢,得等这个实例恢复了,然后才可以继续从这个 queue 拉取数据。

所以这个事儿就比较尴尬了,这就没有什么所谓的高可用性,这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。

镜像集群模式(高可用性)

艿艿:请教了下胖友,他们采用这种方式。这种方式,就是上面问题的 Mirrored queue 。

这种模式,才是所谓的 RabbitMQ 的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。

那么如何开启这个镜像集群模式呢?其实很简单,RabbitMQ 有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。

这样的话,好处在于,你任何一个机器宕机了,没事儿,其它机器(节点)还包含了这个 queue 的完整数据,别的 consumer 都可以到其它节点上去消费数据。坏处在于,第一,这个性能开销也太大了吧,消息需要同步到所有机器上,导致网络带宽压力和消耗很重!第二,这么玩儿,不是分布式的,就没有扩展性可言了,如果某个 queue 负载很重,你加机器,新增的机器也包含了这个 queue 的所有数据,并没有办法线性扩展你的 queue。你想,如果这个 queue 的数据量很大,大到这个机器上的容量无法容纳了,此时该怎么办呢?

如何使用 RabbitMQ 实现 RPC

基于 RabbitMQ reply_to 特性,可以很轻易使用 RabbitMQ 实现 RPC 功能。

具体的代码实现,可以看看 《芋道 Spring Boot 消息队列 RabbitMQ 入门》的「15. RPC 远程调用」小节。

使用 RabbitMQ 实现 RPC 有什么好处?

  • 1、将客户端和服务器解耦:客户端只是发布一个请求到 MQ 并消费这个请求的响应。并不关心具体由谁来处理这个请求,MQ 另一端的请求的消费者可以随意替换成任何可以处理请求的服务器,并不影响到客户端。

相当于 RPC 的注册发现功能,交给 RabbitMQ 来实现了。

  • 2、减轻服务器的压力:传统的 RPC 模式中如果客户端和请求过多,服务器的压力会过大。由 MQ 作为中间件的话,过多的请求而是被 MQ 消化掉,服务器可以控制消费请求的频次,并不会影响到服务器。
  • 3、服务器的横向扩展更加容易:如果服务器的处理能力不能满足请求的频次,只需要增加服务器来消费 MQ 的消息即可,MQ会帮我们实现消息消费的负载均衡。
  • 4、可以看出 RabbitMQ 对于 RPC 模式的支持也是比较友好地,amq.rabbitmq.reply-to, reply_to, correlation_id 这些特性都说明了这一点,再加上 spring-rabbit 的实现,可以让我们很简单的使用消息队列模式的 RPC 调用。

例如说:rabbitmq-jsonrpc 的实现。

当然,虽然有这些优点,实际场景下,我们并不会这么做。

为什么 heavy RPC 的使用场景下不建议采用 disk node ?

heavy RPC 是指在业务逻辑中高频调用 RabbitMQ 提供的 RPC 机制,导致不断创建、销毁 reply queue ,进而造成 disk node 的性能问题(因为会针对元数据不断写盘)。所以在使用 RPC 机制时需要考虑自身的业务场景,一般来说不建议。

RabbitMQ 是否会弄丢数据?

艿艿:这个问题,基本是我们前面看到的几个问题的总结合并。

弄丢消息的几种情况

生产者弄丢了数据?

生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。

方案一:事务功能

🚀 此时可以选择用 RabbitMQ 提供的【事务功能】,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。代码如下:

// 开启事务 
channel.txSelect try { 
// 这里发送消息 
} catch (Exception e) {
 channel.txRollback 
// 这里再次重发这条消息 
} 
// 提交事务 
channel.txCommit
  • 但是问题是,RabbitMQ 事务机制(同步)一搞,基本上吞吐量会下来,因为太耗性能。

具体的代码实现,可以看看 《芋道 Spring Boot 消息队列 RabbitMQ 入门》的「12. 事务消息」小节。

方案二:confirm 功能。

🚀 所以一般来说,如果你要确保说写 RabbitMQ 的消息别丢,可以开启 【confirm 模式】,在生产者那里设置开启 confirm 模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。

具体的代码实现,可以看看 《芋道 Spring Boot 消息队列 RabbitMQ 入门》的「14. 生产者的发送确认」小节。

对比总结

🚀 事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。

所以一般在生产者这块避免数据丢失,都是用 confirm 机制的。

不过 confirm 功能,也可能存在丢消息的情况。举个例子,如果回调到 nack 接口,此时 JVM 挂掉了,那么此消息就丢失了。(这个是艿艿的猜想,还在找胖友探讨中。关于这块,欢迎星球讨论。)

Broker 弄丢了数据

就是 Broker 自己弄丢了数据,这个你必须开启 Broker 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 Broker 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,Broker 还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。

设置持久化有两个步骤:

  • 创建 queue 的时候将其设置为持久化
  • 第二个是发送消息的时候将消息的 deliveryMode 设置为 2

必须要同时设置这两个持久化才行,Broker 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。

注意,哪怕是你给 Broker 开启了持久化机制,也有一种可能,就是这个消息写到了 Broker 中,但是还没来得及持久化到磁盘上,结果不巧,此时 Broker 挂了,就会导致内存里的一点点数据丢失。

所以,持久化可以跟生产者那边的 confirm 机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者 ack 了,所以哪怕是在持久化到磁盘之前,Broker 挂了,数据丢了,生产者收不到 ack,你也是可以自己重发的。

消费端弄丢了数据?

RabbitMQ 如果丢失了数据,主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,RabbitMQ 认为你都消费了,这数据就丢了。

这个时候得用 RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动 ack,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。

RabbitMQ 如何保证消息的顺序性?

和 Kafka 与 RocketMQ 不同,Kafka 不存在类似类似 Topic 的概念,而是真正的一条一条队列,并且每个队列可以被多个 Consumer 拉取消息。这个,是非常大的一个差异。

🚀 来看看 RabbitMQ 顺序错乱的场景:

一个 queue,多个 consumer。比如,生产者向 RabbitMQ 里发送了三条数据,顺序依次是 data1/data2/data3,压入的是 RabbitMQ 的一个内存队列。有三个消费者分别从 MQ 中消费这三条数据中的一条,结果消费者 2 先执行完操作,把 data2 存入数据库,然后是 data1/data3。这不明显乱了。 也就是说,乱序消费的问题。

乱序

🚀 解决方案:

  • 方案一,拆分多个 queue,每个 queue 一个 consumer,就是多一些 queue 而已,确实是麻烦点。

这个方式,有点模仿 Kafka 和 RocketMQ 中 Topic 的概念。例如说,原先一个 queue 叫 “xxx” ,那么多个 queue ,我们可以叫 “xxx-01”、“xxx-02” 等,相同前缀,不同后缀。

  • 方案二,或者就一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。

这种方式,就是讲一个 queue 里的,相同的“key” 交给同一个 worker 来执行。因为 RabbitMQ 是可以单条消息来 ack ,所以还是比较方便的。这一点,也是和 RocketMQ 和 Kafka 不同的地方。

解决乱序

实际上,我们会发现上述的两个方案,前提都是一个 queue 只能启动一个 consumer 对应。

具体的代码实现,可以看看 《芋道 Spring Boot 消息队列 RabbitMQ 入门》的「11. 顺序消息」小节

参考与推荐如下文章:

  • 《RabbitMQ 面试专题》
  • 《如何保证消息队列的高可用?》
  • 《RabbitMQ 面试要点》
  • 《如何保证消息的可靠性传输?(如何处理消息丢失的问题)》

欢迎关注我们人工智能在新媒体领域应用的公众号。
nicehoe好锄头

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/727046.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

科普文章:怎么远程监控电脑屏幕?三种监控电脑屏幕的方法

远程监控公司电脑屏幕是一项重要的管理手段,它不仅有助于提升工作效率,还能确保公司信息安全和合规性。随着远程办公的普及,这一需求变得日益重要。下面我将详细介绍几种实现远程监控公司电脑屏幕的方法,以及实施过程中需要注意的…

网络安全 DVWA通关指南 SQL Injection(SQL注入)

DVWA SQL Injection 文章目录 DVWA SQL InjectionLowMediumHighImpossible SQL注入漏洞基本原理 Web应用程序对用户输入的数据校验处理不严或者根本没有校验,致使用户可以拼接执行SQL命令。 可能导致数据泄露或数据破坏,缺乏可审计性,甚至导致…

机器学习案例|使用机器学习轻松预测信用卡坏账风险,极大程度降低损失

01、案例说明 对于模型的参数,除了使用系统的设定值之外,可以进行再进一步的优化而得到更好的结果。RM提供了几种参数优化的方法,能够让整体模型的效率提高。而其使用的概念,仍然是使用计算机强大的计算能力,对于不同…

01 Shell 编程规范与变量

目录 1.1 Shell脚本概述 1.1.1 Shell的作用 1.1.2 编写第一个Shell脚本 1.1.3 重定向与管道操作 1. 重定向操作 1. 重定向输出 2. 重定向输入 3. 错误重定向 2. 管代操作 1.2 Shell变量的作用、类型 1.2.1 自定义变量 1. 定义新的变量 2. 查看和引用变量的值 3. 变量赋值的特…

Django使用django-apscheduler实现定时任务

定时任务可以在后台定时执行指定的代码,避免了很多人为操作。下面是在Django项目中如何使用定时任务的具体操作流程。 我在这里使用的 django-apscheduler库来实现定时任务。 一、安装 django-apscheduler pip install django-apscheduler二、在项目的setting.py…

java.io.eofexception:ssl peer shut down incorrectly

可能是因为 1)https设置 2)超时设置 FeignConfig.java package zwf.service;import java.io.IOException; import java.io.InputStream; import java.security.KeyStore;import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocketFactory;import org.apac…

PXE高效批量网络装机(补充) 实验部分

然后把防火墙、安全机制全都给关闭掉,不要让它们干扰后续的实验: 然后安装那几个需要用到的软件包: 如果重启了系统vsftpd是不能自动启动起来的,如果想让该服务每次开机都自动的启动起来,可以执行下图中的命令&#xf…

关系数据理论

什么是关系数据理论:用来评判数据库逻辑设计“好坏程度”的标准;二是如果逻辑设计中存在“不好”的关系模式,如何将其修改为“好”的关系模式。 函数依赖:举个例子:学生表中,一个学生的学生号确定了,学生的…

Arduino平台软硬件原理及使用——无源蜂鸣器模块的使用

文章目录 一、蜂鸣器发声原理 二、无源蜂鸣器与有源蜂鸣器的区分 三、无源蜂鸣器模块在Arduino中的使用 一、蜂鸣器发声原理 上图为常见的不同封装及规格的蜂鸣器。 同蜜蜂、知了等昆虫发声原理一样,蜂鸣器同样靠振动来发出声音; 如上图为无源蜂鸣器的内…

whiteboard - 笔记

1 drawio draw.io GitHub - jgraph/drawio: draw.io is a JavaScript, client-side editor for general diagramming. 2 demo 可以将XML数据保存到服务器上的data目录。需要在服务器端创建一个接收和处理POST请求的脚本,该脚本将接收到的SVG数据保存到指定的文件中。下面是…

subversion

subversion Install # CentOS安装Subversion yum install subversion mkdir /var/svn/ systemctl restart svnserve# Docker安装Subversion(参考:https://github.com/garethflowers/docker-svn-server) docker run \--name my-svn-server \…

《C++ Primer》导学系列:第 6 章 - 函数

6.1 函数基础 6.1.1 基本概念 函数是C程序的基本组成单元,用于将代码组织成可以复用的模块。函数通过函数名进行调用,并且可以接受参数和返回值。函数的定义包括函数头和函数体,其中函数头描述了函数的接口,函数体包含了具体的实…

RabbitMQ 开发指南

连接RabbitMQ 连接方式一: 也可以选择使用URI的方式来实现 连接方式二: Connection接口被用来创建一个Channel,在创建之后,Channel可以用来发送或者接收消息。 Channel channel conn.createChannel();使用交换器和队列 声明…

基于Java的留守儿童爱心网站

你好呀,我是计算机学姐码农小野!如果有相关需求,可以私信联系我。 开发语言:Java 数据库:MySQL 技术:B/S结构,SpringBoot框架 工具:MyEclipse,Navicat,To…

全球森林碳通量(2001-2023年)数据集

简介 全球森林碳通量(2001-2023) 森林碳净通量表示 2001-2023 年间森林与大气之间的碳净交换量,计算方法是模型期内森林排放的碳与森林清除(或封存)的碳之间的平衡(兆克 CO2 排放量/公顷)。碳净…

【PB案例学习笔记】-20制作一个超链接按钮

写在前面 这是PB案例学习笔记系列文章的第19篇,该系列文章适合具有一定PB基础的读者。 通过一个个由浅入深的编程实战案例学习,提高编程技巧,以保证小伙伴们能应付公司的各种开发需求。 文章中设计到的源码,小凡都上传到了gite…

【机器学习】基于稀疏识别方法的洛伦兹混沌系统预测

1. 引言 1.1. DNN模型的来由 从数据中识别非线性动态学意味着什么? 假设我们有时间序列数据,这些数据来自一个(非线性)动态学系统。 识别一个系统意味着基于数据推断该系统的控制方程。换句话说,就是找到动态系统方…

生成式AI时代,数据存储管理与成本如何不失控?

无数据,不AI。 由生成式AI掀起的这一次人工智能浪潮,对企业的产品、服务乃至商业模式都有着颠覆性的影响。因此,在多云、大数据、生成式AI等多元技术的驱动下,数据要素变得愈发重要的同时,企业对于数据存储的需求也在…

LabVIEW开发扫描隧道显微镜

扫描隧道显微镜利用量子隧穿效应,通过一个极细的探针在样品表面上进行扫描,测量隧穿电流的变化,以得到样品表面的原子级别图像。探针与样品之间的距离非常小(约1纳米),隧穿电流对距离变化极其敏感&#xff…

全能AI客户端:ChatGPT Web Midjourney Proxy,AI绘画+GPT4o对话

这绝对是目前最全能的 AI 客户端,ui 界面集成 ChatGPT AI 对话、Midjourney AI 画图、Suno AI 音乐等等市面主流的 AI 功能,只需绑定一个 API key 即可使用全部 AI 功能,Midjourney 甚至比官方好用几倍! 项目简介 ChatGPT Web Mi…