目录
- 1.前置知识
- 1.1.什么是 MQ?它有什么作用?
- 1.2.什么是消费者生产者模型?
- 1.3.AMQP 是什么?
- 2.RabbitMQ 入门
- 2.1.什么是 RabbitMQ?有什么特点?
- 2.2.RabbitMQ 的核心概念有哪些?
- 2.2.1.生产者 (Producer)、消费者 (Consumer) 和消息中间件的服务节点 (Broker)
- 2.2.3.队列 (Queue)
- 2.2.4.交换器 (Exchange)、路由键 (RoutingKey)、绑定 (Binding)
- 2.3.✨交换机 (Exchange) 有哪些类型?不同类型的交换机什么应用场景?
- 2.4.✨RabbitMQ 的工作原理是什么?
- 2.5.RabbitMQ 中的消息怎么传输?
- 3.RabbitMQ 进阶
- 3.1.当消息的生产者将消息发送出去之后,如何确定消息到底有没有正确地到达服务器?
- 3.2.请介绍一下 RabbitMQ 中的事务机制。
- 3.3.使用 RabbitMQ 事务机制可能会引发什么问题?如何解决?
- 3.4.RabbitMQ 中的发布确认机制是指什么?
- 3.5.✨发布确认机制有哪些实现方式?
- 3.6.什么是死信交换机和死信队列?死信队列有哪些应用场景?
- 3.7.什么是延迟队列?它有哪些应用场景?
- 3.8.如何设置死信队列?
- 3.9.什么是优先级队列?它有哪些应用场景?
- 3.10.如何设置优先级队列?
- 3.11.RabbitMQ 中的幂等性是指什么?如何保证?
- 3.12.RabbitMQ 中的消息持久化是指什么?
- 3.13.如何解决消息队列的延时以及过期失效问题?
- 3.14.✨如何保证 RabbitMQ 中消息的顺序性和可靠性?
- 3.15.RabbitMQ 有哪些工作模式?
- 4.RabbitMQ 高阶
- 4.1.RabbitMQ 中的集群是指什么?如何搭建?
- 4.2.如何保证 RabbitMQ 的高可用性?
- 4.3.什么是惰性队列是?它有哪些应用场景?
- 4.4.RabbitMQ 中的镜像队列是指什么?有哪些应用场景?
参考文章:
RabbitMQ——入门介绍
《RabbitMQ 实战指南》朱忠华著
1.前置知识
1.1.什么是 MQ?它有什么作用?
(1)MQ 指的是 Message Queue(消息队列),其本质上是个队列(FIFO 先进先出),它是一种在分布式系统中用于异步通信的技术。消息队列可以让不同的组件(例如应用程序、进程、服务等)通过发送和接收消息来进行通信,这些消息被保存在队列中,直到被接收方处理。
(2)MQ 的作用如下:
- 应用解耦:通过使用消息队列,不同的组件可以通过发送和接收消息来进行通信,这使得它们之间的耦合度更低。这意味着当一个组件发生变化时,其他组件不需要进行大规模的更改,从而使系统更加灵活。
- 异步通信:使用消息队列可以实现异步通信,即发送方不需要等待接收方处理完消息才能继续执行。这使得系统可以更高效地利用资源,处理大量请求。
- 可靠性:消息队列通常提供多种机制来确保消息的可靠性,例如确认机制、持久化、重试等。这些机制可以保证即使在出现故障或网络异常的情况下,消息也不会丢失或重复发送。
- 扩展性:通过使用消息队列,系统可以更容易地扩展,因为不同的组件可以独立地处理消息,而不需要进行紧密的协调。这使得系统可以更容易地处理大量请求和并发访问。
- 顺序保证:在大多数使用场景下,数据处理的顺序很重要,大部分消息中间件支持一定程度上的顺序性。
- 缓解流量高峰:消息队列可以缓解流量高峰的问题,因为发送方可以将请求发送到消息队列中,而不是直接发送给接收方。这可以使接收方更容易处理请求,而不会因为突然的高峰导致系统崩溃。
1.2.什么是消费者生产者模型?
(1)消费者生产者模型 (Producer-Consumer Model) 是一种并发编程模型,通常用于解决生产者和消费者之间的数据共享和同步问题。在该模型中,生产者负责生成数据并将其添加到共享数据结构中,而消费者则负责从共享数据结构中获取数据并进行处理。
(2)具体来说,生产者将数据放入共享缓冲区中,消费者从共享缓冲区中取出数据进行处理。如果共享缓冲区已满,则生产者需要等待,直到有空间可用。同样,如果共享缓冲区为空,则消费者需要等待,直到有数据可用。这个模型通过同步机制来实现生产者和消费者之间的协作,以确保数据的正确性和一致性。
(3)生产者消费者模型常用于多线程编程、进程间通信、消息队列等场景中。例如,在消息队列中,生产者将消息发送到队列中,而消费者从队列中获取消息进行处理。这种模型能够有效地解耦生产者和消费者,提高系统的可扩展性和可靠性。
1.3.AMQP 是什么?
(1)RabbitMQ 是 AMQP 协议的 Erlang 的实现(RabbitMQ 也支持 STOMP2、 MQTT3 等协议)。AMQP 的模型架构和 RabbitMQ 的模型架构是一样的。RabbitMQ 中的交换器、交换器类型、队列、绑定、路由键等都是遵循的 AMQP 协议中相应的概念。
(2)AMQP 协议本身包括三层:
- Module Layer:位于协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑。例如,客户端可以使用
Queue.Declare
命令声明一个队列或者使用Basic.Consume
订阅消费一个队列中的消息。 - Session Layer:位于中间层,主要负责将客户端的命令发送给服务器,再将服务端的应答返回给客户端,主要为客户端与服务器之间的通信提供可靠性同步机制和错误处理。
- Transport Layer:位于最底层,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示等。
2.RabbitMQ 入门
2.1.什么是 RabbitMQ?有什么特点?
(1)RabbitMQ 是一个开源的消息中间件,它实现了高度灵活的消息队列模式。它是使用 Erlang
语言开发的,基于AMQP(Advanced Message Queuing Protocol,高级消息队列协议),并且提供了许多客户端库,包括Java、Python、Ruby等,使得开发者可以方便地与其进行交互。
(2)RabbitMQ 的具体特点可以概括为以下几点:
- 可靠性:RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认及发布确认等。
- 灵活的路由:在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能,RabbitMQ 己经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个交换器绑定在一起, 可以通过插件机制来实现自己的交换器。
- 扩展性:多个 RabbitMQ 节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中节点。
- 高可用性:队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队列仍然可用。
- 多种协议 RabbitMQ 除了原生支持 AMQP 协议,还支持 STOMP、MQTT 等多种消息中间件协议。
- 多语言客户端:RabbitMQ 几乎支持所有常用语言,比如 Java、Python、Ruby、PHP、C#、JavaScript 等。
- 管理界面:RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息、集群中的节点等。
- 插件机制: RabbitMQ 提供了许多插件 以实现从多方面进行扩展,当然也可以编写自己的插件。
2.2.RabbitMQ 的核心概念有哪些?
RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。可以把消息传递的过程想象成:当你将一个包裹送到邮局,邮局会暂存并最终将邮件通过邮递员送到收件人的手上, RabbitMQ 就好比由邮局、邮箱和邮递员组成的一个系统。从计算机术语层面来说,RabbitMQ 模型更像是一种交换机模型。RabbitMQ 的整体模型架构如下图所示:
2.2.1.生产者 (Producer)、消费者 (Consumer) 和消息中间件的服务节点 (Broker)
(1)Producer:生产者,就是投递消息的一方。生产者创建消息,然后发布到 RabbitMQ 中。消息一般可以包含 2 个部分:消息体和标签。消息体也可以称之为 payload
,在实际应用中,消息体般是一个带有业务逻辑结构的数据,比如一个 JSON 字符串。当然可以进一步对这个消息体进行序列化操作。消息的标签用来表述这条消息,比如一个交换器的名称和一个路由键。生产者把消息交由 RabbitMQ,RabbitMQ 之后会根据标签把消息发送给感兴趣的消费者。
(2)Consumer:消费者,就是接收消息的一方。消费者连接到 RabbitMQ 服务器,并订阅到队列上,当消费者消费一条消息时,只是消费消息的消息体 (payload) 在消息路由的过程中,消息的标签会丢弃,存入到队列中的消息只有消息体,消费者也只会消费到消息体,也就不知道消息的生产者是谁,当然消费者也不需要知道。
(3)Broker:消息中间件的服务节点。对 RabbitMQ 来说, 一个 RabbitMQ Broker 简单地看作 RabbitMQ 服务节点或者 RabbitMQ 服务实例。大多数情况下也可以将一个 RabbitMQ Broker 看作一台 RabbitMQ 服务器。
2.2.3.队列 (Queue)
Queue:队列,是 RabbitMQ 的内部对象,用于存储消息。RabbitMQ 的生产者生产消息井最终技递到队列中,消费者可以从队列中获取消息并消费。多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin ,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息井处理,如下图所示:
2.2.4.交换器 (Exchange)、路由键 (RoutingKey)、绑定 (Binding)
(1)Exchange:交换机,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个由交换机类型决定,如下图所示:
(2)RoutingKey:路由键。生产者将消息发给交换器的时候, 一般会指定一个 RoutingKey
,用来指定这个消息的路由规则,而这个 RoutingKey 需要与交换器类型和绑定键 (BindingKey
) 合使用才能最终生效。在交换器类型和绑定键 (BindingKey) 固定的情况下,生产者可以在发送消息给交换器时,通过指定 RoutingKey 来决定消息流向哪里。
(3)Binding:绑定。RabbitMQ 中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一 绑定键 BindingKey
,这样RabbitMQ 就知道如何正确地将消息路由到队列了,如下图所示:
生产者将消息发送给交换器时, 需要一个 RoutingKey
,当 BindingKey
和 RoutingKey
相匹时(后面将两者和合称为路由键), 消息会被路由到对应的队列中。在绑定多个队列到同一个交换器的时候,这些绑定允许使用相同的 BindingKey。BindingKey 并不是在所有的情况下都生效,它依赖于交换器类型。
通过下面的比喻来理解交换器、路由键、绑定这几个概念:交换器相当于投递包裹的邮箱,
RoutingKey
相当于填写在包裹上的地址,BindingKey
相当于包裹目的地,当填写在包裹上的地址和实际想要投递的地址相匹配时,那么这个包裹就会被正确投递到目的地,最后这个目的地的"主人"一一队列可以保留这个裹。如果填写目的地址出错,邮递员不能正确投递到目的地,包裹可能会回退给寄件人,也有可能被丢弃。
2.3.✨交换机 (Exchange) 有哪些类型?不同类型的交换机什么应用场景?
(1)交换机 (Exchange) 是一种用于将消息路由到一个或多个队列的组件。生产者发送消息到交换机,交换机根据特定的路由规则将消息路由到一个或多个队列,然后消费者从队列中获取消息进行处理。
(2)交换机必须确切知道如何处理收到的消息,是应该把这些消息放到特定队列,还是说把他们到许多队列中,还是说应该丢弃它们。这就的由交换机的类型来决定。RabbitMQ 支持以下四种类型的交换机:
Fanout Exchange
:把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。Direct Exchange
:把消息路由到那些BindingKey
和RoutingKey
完全匹配的队列中。Topic Exchange
:在匹配规则上进行了扩展,它与 direct 类型的交换器相似,也是将消息路由到RoutingKey
和BindingKey
相匹配的队列中,但这里的匹配规则有些不同,它约定:根据消息的 Routing Key 和通配符将消息路由到一个或多个队列。RoutingKey
为一个点号"."
分隔的字符串(被点号"."
分隔开的每一段独立的字符串称为一个单词),例如 “com.rabbitmq.client”、“java.util.concurrent”、“com.hidden.client”;BindingKey
和RoutingKey
一样也是点号"."
分隔的字符串;BindingKey
中可以存在两种特殊字符串 “*” 和 “#”,用于做模糊匹配,其中"*"
用于匹配一个单词,"#"
用于匹配多规格单词(可以是零个);
Headers Exchange
:该类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中headers 属性
进行匹配。在绑定队列和交换器时制定一组键值对,当发送消息到交换器时,RabbitMQ 会获取到该消息的 headers(也是一个键值对的形式),对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。
(3)不同类型的交换机适用于不同的应用场景。例如:
Fanout Exchange
适用于将消息广播给所有绑定到该交换机上的队列;Direct Exchange
适用于将消息路由到特定的队列;Topic Exchange
适用于模糊匹配 Routing Key;Headers Exchange
适用于根据消息的 Headers 属性进行路由,但实际上很少使用;
2.4.✨RabbitMQ 的工作原理是什么?
(1)我们将 RabbitMQ 的工作原理分为生成者发送消息和消费者接收消息这两部分:
- 在最初状态下,生产者发送消息的时候:
- 生产者连接到
RabbitMQ Broker
,建立一个连接 (Connection),开启一个信道 (Channel); - 生产者声明一个交换器,并设置相关属性,比如交换机类型、是否持久化等;
- 生产者声明一个队列,并设置相关属性,比如是否排他、是否持久化、是否自动删除等;
- 生产者通过路由键将交换器和队列绑定起来;
- 生产者发送消息至
RabbitMQ Broker
,其中包含路由键、交换器等信息。 - 相应的交换器根据接收到的路由键以及自身类型,来查找相匹配的队列:
- 如果找到 ,则将从生产者发送过来的消息存入相应的队列中。
- 如果没有找到 ,则根据生产者配置的属性选择丢弃还是回退给生产者。
- 关闭信道,关闭连接。
- 生产者连接到
- 消费者接收消息的过程:
- 消费者连接到
RabbitMQ Broker
,建立一个连接 (Connection) ,开启一个信道 (Channel); - 消费者向
RabbitMQ Broker
请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作; - 等待
RabbitMQ Broker
回应并投递相应队列中的消息, 消费者接收消息; - 消费者确认 (ack) 接收到的消息;
- RabbitMQ 从队列中删除相应己经被确认的消息;
- 关闭信道,关闭连接。
- 消费者连接到
(2)下面介绍两个新的概念:Connection
和 Channel
。我们知道无论是生者还是消费者,都需要和 RabbitMQ Broker
建立连接,这个连接就是一条 TCP 连接,也就是 Connection。一旦 TCP 连接建立起来,客户端紧接着可以创建一个 AMQP 信道 (Channel) ,每个信道都会被指派一个唯一的 ID。信道是建立在 Connection 之上的虚拟连接, RabbitMQ 处理的每条 AMQP 指令都是通过信道完成的。
(3)我们完全可以直接使用 Connection 就能完成信道的工作,为什么还要引入信道呢?试想这样一个场景,一个应用程序中有很多个线程需要从 RabbitMQ 中消费消息,或者生产消息,那么必然需要建立很多个 Connection
,也就是许多个 TCP 连接。然而对于操作系统而言,建立和销毁 TCP 连接是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现。 RabbitMQ 采用类似 NIO (Non-blocking I/O) 的做法,选择 TCP 连接复用,不仅可以减少性能开销,同时也便于管理。
2.5.RabbitMQ 中的消息怎么传输?
为了避免 TCP 连接的开销和并发数受限的性能瓶颈,RabbitMQ 采用了信道 (Channel) 的方式来传输数据。信道是生产者、消费者与 RabbitMQ 通信的通道,它是建立在 TCP 连接上的虚拟连接,而且每个 TCP 连接可以拥有多个信道,没有数量限制。因此,RabbitMQ 可以在一条 TCP 连接上建立成百上千个信道,以实现多线程处理。这样,多个线程可以共享同一条 TCP 连接,而每个信道在 RabbitMQ 中都有唯一的 ID,从而确保信道的私有性。每个信道对应一个线程的使用,这种方式使得 RabbitMQ 能够高效地利用系统资源。
3.RabbitMQ 进阶
3.1.当消息的生产者将消息发送出去之后,如何确定消息到底有没有正确地到达服务器?
默认情况下发送消息的操作是不会返回任何信息给生产者的,也就是默认情况下生产者是不知道消息有没有正确地到达服务器。如果在消息到达服务器之前已经丢失,持久化操作也解决不了这个问题,因为消息根本没有到达 RabbitMQ 针对这个问题,提供了两种解决方式:
- 通过事务机制实现;
- 通过发布确认机制实现。
3.2.请介绍一下 RabbitMQ 中的事务机制。
(1)RabbitMQ 支持事务机制,通过事务机制可以保证消息的原子性、一致性和持久性。在 RabbitMQ 中,事务机制通过 AMQP (Advanced Message Queuing Protocol) 事务模型来实现。RabbitMQ 客户端中与事务机制相关的方法有以下三个:
channel.txSelect
:用于将当前的信道设置成事务模式;channel.txCommit
:用于提交事务;channel.txRollback
:用于事务回滚。
(2)在通过 channel.txSelect 方法开启事务之后,我们便可以发布消息给 RabbitMQ 了:
- 如果事务提交成功,则消息一定到达了 RabbitMQ 中;
- 如果在事务提交执行之前由于 RabbitMQ 异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行
channel.txRollback
方法来实现事务回滚。
(3)注意这里的 RabbitMQ 中的事务机制与大多数数据库中的事务概念并不相同,需要注意区分。关键示例代码如下所示:
channel.txSelect();
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN,
"transaction messages".getBytes());
channel.txCommit();
上面代码对应的 AMQP 协议流转过程如下图所示:
3.3.使用 RabbitMQ 事务机制可能会引发什么问题?如何解决?
(1)事务确实能够解决消息发送方和 RabbitMQ 之间消息确认的问题,只有消息成功被 RabbitMQ 接收,事务才能提交成功,否则便可在捕获异常之后进行事务回滚,与此同时可以进行消息重发。
(2)但是使用事务机制会“吸干” RabbitMQ 的性能,因为事务机制在一条消息发送之后会使发送端阻塞,以等待 RabbitMQ 的回应,之后才能继续发送下一条消息。那么有没有更好的方法既能保证消息发送方确认消息已经正确送达,又能基本上不带来性能上的损失呢?从 AMQP 协议层面来看并没有更好的办法,但是 RabbitMQ 提供了一个改进方案,即发送方确认机制。
3.4.RabbitMQ 中的发布确认机制是指什么?
(1)在 RabbitMQ 中,发布确认 (Publish/Confirm) 指的是生产者发送消息到 RabbitMQ 服务器后,等待服务器确认已经接收到消息的过程。具体来说,当生产者通过 RabbitMQ 的 AMQP 协议发送消息到服务器时,可以指定一个回调函数,用于在服务器确认接收到消息时进行处理。如果服务器成功接收到消息,就会调用该回调函数进行通知。如果服务器无法接收到消息,则会触发异常处理。
(2)生产者将信道设置成 confirm(确认)模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个确认 (Basic.Ack) 给生产者(包含消息的唯一 ID),这就使得生产者知晓消息已经正确到达了目的地了。如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出。RabbitMQ 回传给生产者的确认消息中的 deliveryTag
包含了确认消息的序号,此外 RabbitMQ 也可以设置 channel.basicAck
方法中的 multiple
参数,表示到这个序号之前的所有消息都已经得到了处理,可以参考下图。
如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack (Basic.Nack)命令,生产者应用程序同样可以在回调方法中处理该 nack 命令。生产者通过调用 channel.confirmSelect
方法(即 Confirm.Select 命令)将信道设置为 confirm 模式,之后 RabbitMQ 会返回 Confirm.Select-Ok
命令表示同意生产者将当前信道设置为 confirm 模式。所有被发送的后续消息都被 ack 或者 nack 一次,不会出现一条消
息既被 ack 又被 nack 的情况,并且 RabbitMQ 也并没有对消息被 confirm 的快慢做任何保证。
3.5.✨发布确认机制有哪些实现方式?
(1)在 RabbitMQ 中,单独确认、批量确认和异步确认模式是发布确认机制的不同实现方式。
- 单独确认模式:指每次发送一条消息后,即等待 RabbitMQ 服务器返回该消息的确认信息。单独确认模式适用于需要及时获取每条消息的确认结果并进行相应处理的场景。但由于每次发送消息都要等待确认,可能会对性能有所影响。具体步骤如下:
- 将 Channel 设置为确认模式 (confirm mode),即通过调用
channel.confirmSelect()
方法启用确认模式。 - 发布消息到 RabbitMQ 服务器。
- 等待 RabbitMQ 服务器返回该消息的确认信息,可以通过调用
channel.waitForConfirms()
方法来等待确认。该方法会阻塞当前线程,直到收到对应消息的确认信息或超时。
- 将 Channel 设置为确认模式 (confirm mode),即通过调用
- 批量确认模式:指在发送多个消息后,等待 RabbitMQ 服务器一次性返回多条确认信息。批量确认模式通过一次性返回多条确认信息,提高了确认的效率。适用于需要批量发送消息并等待确认的场景。具体步骤如下:
- 将 Channel 设置为确认模式。
- 发布多条消息到 RabbitMQ 服务器。
- 使用
channel.waitForConfirms()
方法等待 RabbitMQ 服务器返回全部消息的确认信息。
- 异步确认模式:指通过添加一个确认监听器 (confirm listener) 来异步处理确认结果。使用异步确认模式,可以在消息发送的过程中继续进行其他操作,不需要阻塞等待确认的结果。一旦收到确认信息,即可在监听器中进行相应的处理。具体步骤如下:
- 将 Channel 设置为确认模式;
- 发布消息到 RabbitMQ 服务器。
- 注册一个确认监听器,在
channel.addConfirmListener()
方法中可以添加ConfirmListener
这个回调接口,这个 ConfirmListener 接口包含两个方法:handleAck
和handleNack
,分别用来处理 RabbitMQ 回传的Basic.Ack
和Basic.Nack
。在这两个方法中都包含有一个参数deliveryTag
(在发布确认模式下用来标记消息的唯一有序序号)。我们需要为每一个信道维护一个 “unconfirm” 的消息序号集合,每发送一条消息,集合中的元素加 1。每当调用 ConfirmListener 中的 handleAck 方法时,“unconfirm”集合中删掉相应的一条(multiple 设置为 false)或者多条(multiple 设置为 true)记录。从程序运行效率上来看,这个 “unconfirm” 集合最好采用有序集合 SortedSet 的存储结构。
(2)总结:
- 单独确认模式适用于需要及时知道每条消息的确认结果的场景;
- 批量确认模式适用于需要批量发送消息并等待确认的场景;
- 异步确认模式适用于可以异步处理确认结果的场景,不需要阻塞等待确认。
注意:事务机制和发布确认机制两者是互斥的,不能共存!
3.6.什么是死信交换机和死信队列?死信队列有哪些应用场景?
(1)当消息在一个队列中变成死信 (dead message) 之后,它能被重新被发送到另一个交换器中,这个交换器就是死信交换机 (Dead-Letter-Exchange, DLX)
,绑定 DLX 的队列就称之为死信队列 (Dead-Letter Queue, DLQ)
。死信队列是指用于处理无法被消费者正常处理的消息的特殊队列。消息变成死信一般是由于以下几种情况:
- 消息被拒绝 (Basic.Reject/Basic.Nack),井且设置 requeue 参数为 false;
- 消息过期,即消息在队列中的生存时间超过设置 TTL(Time-To-Live,存活时间)值;
- 队列达到最大长度;
(2)DLX 是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中存在死信时,RabbitMQ 就会自动地将这个消息新发布到设置的 DLX ,进而被路由到另一个队列,即死信队列。通过在 channel.queueDeclare
方法中设置 x-dead-letter-exchange
参数来为这个队列添加 DLX。
(3)通过使用死信队列,可以使得无法被正常处理的消息得到有效的处理,避免了因为消息被“遗弃”而造成的资源浪费,具体应用场景如下:
- 消息重试:如果一个消息在队列中被消费者拒绝或者超时,可以将该消息发送到死信队列中,以便重新处理该消息。通过设置消息的 TTL(Time-To-Live,存活时间)和重试次数等参数,可以实现自动重试机制。
- 延迟消息:通过将消息发送到带有 TTL 参数的队列中,可以实现延迟消息的功能。当消息的 TTL时间到达后,消息会被自动发送到死信队列中,从而实现延迟消息的处理。
- 分流:通过将不同的消息路由到不同的队列中,并且在队列中设置不同的 TTL 时间,可以实现消息的分流功能。当消息的 TTL 时间到达后,可以根据不同的死信路由键将消息发送到不同的死信队列中,从而实现消息的分流处理。
- 日志收集:通过将不同的日志级别路由到不同的队列中,并且在队列中设置不同的 TTL 时间,可以实现日志的收集功能。当日志的 TTL 时间到达后,可以将日志发送到死信队列中,并且通过指定的死信路由键将日志路由到指定的日志处理系统中。
- 队列监控:通过设置死信交换机和死信队列来监控队列中的异常消息,并且通过将异常消息发送到死信队列中来进行异常处理。通过监控死信队列中的消息,可以及时发现队列中的问题,并且进行处理。
3.7.什么是延迟队列?它有哪些应用场景?
(1)延迟队列指用来存放需要在指定时间被处理的消息的队列。在 AMQP 协议中,或者 RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过 DLX 和 TTL 模拟出延迟队列的功能。具体来说,当一个消息被发送到延迟队列时,可以设置消息的 TTL 参数,指定该消息需要被延迟的时间。RabbitMQ 会在消息的 TTL 时间到达后,将该消息发送到一个特定的交换机,然后根据指定的路由键将该消息路由到对应的队列中,从而实现消息的延迟投递。
(2)延迟队列在实际应用中有很多的应用场景,例如:
- 订单超时处理:可以将订单过期时间作为消息的 TTL 参数,当订单过期时,该订单会被自动发送到死信队列中进行处理。
- 短信验证码发送:可以将短信验证码的过期时间作为消息的 TTL 参数,当验证码过期时,该消息会被自动发送到死信队列中进行处理。
- 消息重试:可以将消息发送到延迟队列中,设置合适的 TTL 时间和重试次数,实现消息重试的功能。
- 购物车过期处理:可以将购物车的过期时间作为消息的 TTL 参数,当购物车过期时,该消息会被自动发送到死信队列中进行处理。
- 定时任务调度:可以使用延迟队列来实现定时任务调度功能,将任务的执行时间作为消息的 TTL 参数,当任务到达执行时间时,该消息会被自动发送到死信队列中进行处理。
3.8.如何设置死信队列?
(1)在 RabbitMQ 中设置死信队列需要以下几个步骤:
- 创建死信队列及其绑定:首先,需要创建一个用于存储死信消息的队列,并将其绑定到一个交换机上。可以使用
queue.declare
和exchange.declare
方法进行创建和声明。 - 设置原始队列的参数:接下来,需要设置原始队列(即希望成为死信的队列)的参数,使其成为一个具有死信功能的队列。可以通过调用
queue.declare
方法,并设置x-dead-letter-exchange
和x-dead-letter-routing-key
参数来指定死信队列的交换机和路由键。x-dead-letter-exchange
:指定死信消息发送到的交换机。x-dead-letter-routing-key
:指定死信消息的路由键。
- 将消息发送到原始队列:现在,当消息被发送到原始队列时,如果满足一定的条件,例如消息过期、消息被拒绝等,该消息将被标记为死信,并被发送到死信队列。
- 处理死信消息:最后,应用程序可以从死信队列中接收和处理死信消息。通过订阅死信队列,可以消费这些死信消息,并根据实际需求做出相应的处理,例如记录日志、重新投递等。
(2)需要注意的是,在设置死信队列时还可以配置其他参数,如消息过期时间、最大重试次数等,以满足具体的业务需求。
3.9.什么是优先级队列?它有哪些应用场景?
(1)优先级队列是指一种可以根据消息的优先级进行排序和处理的队列,可以通过设置队列的 x-max-priority
参数来实现。具体来说,当一个消息被发送到优先级队列时,可以为该消息设置一个优先级参数。RabbitMQ 会根据消息的优先级对消息进行排序,并优先处理优先级高的消息。如果两个消息的优先级相同,则按照先进先出的原则进行处理。
(2)优先级队列在实际应用中有很多的应用场景,例如:
- 任务调度:可以使用优先级队列来实现任务调度功能,将不同优先级的任务放入不同的队列中,并按照优先级顺序处理任务。
- 日志处理:可以使用优先级队列来处理日志信息,将重要的日志信息放入优先级高的队列中,以保证这些信息能够及时被处理。
- 数据库更新:可以使用优先级队列来处理数据库更新操作,将高优先级的更新操作放在队列的前面,以保证这些操作能够优先被处理。
(3)总之,优先级队列是 RabbitMQ 中非常有用的一种特性,可以帮助我们优化消息的处理顺序,提高消息的实时性和重要性,适用于很多不同的业务场景。
3.10.如何设置优先级队列?
(1)核心步骤如下:
- 创建队列时设置优先级参数:在创建队列时,可以通过设置
x-max-priority
参数来指定队列的最大优先级。 - 发布消息时设置优先级属性:在消息的属性中,设置 priority 属性来指定消息的优先级。可以使用 basic.publish 方法,并将 priority 设置为所需的优先级数值,范围从 0(最低优先级)到队列最大优先级数值(最高优先级)。
(2)示例代码如下:
Map<String, Object> params = new HashMap();
params.put("x-max-priority", 10);
channel.queueDeclare("hello", true, false, false, params);
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();
注意:消费者需要等待消息已经发送到队列中才去消费(即不能消费地太快,否则优先级队列来不及对消息进行排序),这样才有机会对消息进行排序。
3.11.RabbitMQ 中的幂等性是指什么?如何保证?
(1)在消息队列系统中,幂等性是指同一条消息被处理多次时,处理的结果保持一致。
举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等。
(2)在 RabbitMQ 中,消息的幂等性可以通过以下方式来保证:
- 消息 ID:可以在发送消息时为每条消息生成一个唯一的 ID,然后在消息被消费时,通过比较消息 ID 和已处理消息的 ID 来判断消息是否已经被处理过。
- 业务 ID:可以使用消息中的业务 ID 来判断消息是否已经被处理过。在处理消息时,可以将消息的业务 ID 和处理结果存储到数据库或缓存中,如果下次收到相同的业务 ID 的消息,则可以从数据库或缓存中获取之前的处理结果,避免重复处理。
- 版本号:可以在消息中加入版本号,当消息被消费时,可以比较消息的版本号和已处理消息的版本号来判断消息是否已经被处理过。如果版本号相同,则表示消息已经被处理过,可以直接忽略。
(3)在 RabbitMQ 中保证消息的幂等性非常重要,因为在消息队列中,同一条消息可能会被多次消费。如果消息的处理结果不保持一致,可能会导致数据不一致或业务错误。因此,需要在实现消费者时考虑消息的幂等性,保证消息的处理结果是可预期的。
3.12.RabbitMQ 中的消息持久化是指什么?
(1)在 RabbitMQ 中,消息持久化是指将消息存储到磁盘上,以确保即使在 RabbitMQ 服务器意外崩溃或重启时,也能保证消息的安全性和可靠性。与之相对的是非持久化的消息,这种消息只会被存储在内存中,如果 RabbitMQ 服务器意外崩溃或重启,这些消息将会丢失。
(2)要将消息持久化,需要同时对队列和消息进行持久化设置。在创建队列时,可以设置 durable 参数为 true,表示该队列是持久化的。例如,以下代码创建了一个名为 test_queue 的持久化队列:
channel.queueDeclare('test_queue', durable=True)
对于要发送的消息,也需要设置持久化属性。例如,以下代码创建了一个名为 message 的持久化消息:
channel.basicPublish("exchange", 'test_queue', "Hello World!", MessageProperties.PERSISTENT_TEXT_PLAIN)
需要注意的是,将消息设置为持久化会对性能产生一定的影响,因为需要将消息写入磁盘。在生产环境中,需要权衡可靠性和性能,选择合适的持久化策略。
3.13.如何解决消息队列的延时以及过期失效问题?
要解决消息队列的延时和过期失效问题,可以采取以下方法:
- 延时队列 (Delayed Queue):一种常见的方法是使用延时队列来处理延时消息。将要延时处理的消息发送到延时队列中,并设置消息的延时时间。在延时队列中,可以使用定时任务或者定时器来检查是否有过期的消息需要被处理,一旦到达延时时间,将消息转移至正常处理队列。
- TTL (Time To Live) 设置:在发送消息时,可以设置消息的过期时间。消息在队列中等待时,会根据设定的过期时间进行判断,一旦超过过期时间还未被消费者处理,消息会被标记为过期失效并被丢弃。RabbitMQ 支持对整个队列设置 TTL,也可以对每条消息单独设置 TTL。
- 定时任务:在消费者端使用定时任务来处理延时和过期消息。消费者可以定期检查队列中是否有到达处理时间的消息,一旦检测到延时或过期消息,即进行相应的处理。
- 死信队列 (Dead Letter Queue):可以为每个队列设置一个死信队列,用于接收处理失败或过期的消息。当消息无法被正常消费或过期失效时,将消息发送到死信队列中,进行后续的处理和记录。
- 定期清理:定期清理队列中的过期消息。可以使用定时任务或者定时脚本来清理过期的消息,避免堆积过多的无效消息。
3.14.✨如何保证 RabbitMQ 中消息的顺序性和可靠性?
(1)RabbitMQ 本身并不直接支持严格的消息顺序性,因为它是一个并发处理消息的分布式系统,多个消费者可以同时处理不同的消息。但是可以通过以下方法帮助实现近似的消息顺序性:
- 单一消费者:将消息队列配置为只有一个消费者(单消费者模式),这样消息将按照发送的顺序依次被消费。但这个方法会降低系统的吞吐量和并发性能。
- 多个队列:可以将消息按照顺序分发到不同的队列中,每个队列对应一个独立的消费者。消费者按照队列的顺序消费消息,从而实现消息的近似顺序性。
- 消息分组:可以为消息设置一个可排序的标识(例如消息 ID 或时间戳),然后在消费者端根据消息的标识将消息进行排序和处理。这需要在应用层面进行一些额外的处理,以确保消息按照指定的顺序进行消费。
- 手动确认:使用消息的手动确认模式 (manual acknowledge),消费者在处理完一条消息后手动发送确认操作给 RabbitMQ。这样可以确保消费者顺序地处理消息,并按照处理完成的顺序进行确认。需要注意的是,这种方式需要消费者处理完一条消息后再处理下一条消息,会降低处理速度。
- 使用有序队列插件:RabbitMQ 社区提供了一个有序队列插件 (rabbitmq-queue-master),该插件可以确保分布式环境中的消息顺序性。该插件会将队列中的消息路由到单独的节点进行处理,保证了消息的顺序性,但也会引入额外的复杂性和效率损耗。
(2)为了保证 RabbitMQ 中消息的可靠性,可以采取以下措施:
- 持久化:通过将消息和队列都设置为持久化,可以确保在 RabbitMQ 重启或崩溃后,消息仍然可靠地保存在磁盘上。在生产者发送消息时,将消息的投递模式设置为
persistent
,同时在队列声明时将durable
参数设置为true
。 - 手动确认:消费者可以使用手动确认模式 (manual ack) 来确认消息的处理状态。当消费者成功处理一条消息后,在代码中发送确认消息给RabbitMQ,表示消息已经被处理。只有在确认之后,RabbitMQ 才会将消息从队列中删除。这样可以保证消息被成功处理后才被删除,避免消息丢失。
- 限制消息重试次数:在消费者处理消息发生错误或异常时,可以设置最大重试次数。当消息达到最大重试次数后,可以进行特殊处理,如记录日志、发送警报或将消息丢弃等。
- 备份与复制:RabbitMQ 支持镜像队列 (mirrored queues) 的功能,通过将队列在多个节点之间进行备份或复制,可以提高消息的可靠性和容错性。如果某个节点发生故障,备份节点可以接管消息的处理。
- 事务机制:RabbitMQ 支持事务,可以将一组操作包装在事务中,要么全部成功执行,要么全部回滚。但是,事务会带来一定的开销,影响性能,因此在需要保证递交的可靠性时才使用事务。
- 心跳机制:通过配置心跳间隔,生产者和消费者可以定期向 RabbitMQ 服务器发送心跳,以确保连接的活性。如果长时间没有收到心跳,可能是网络故障或服务器宕机,此时可尝试重新连接或进行相应的处理。
3.15.RabbitMQ 有哪些工作模式?
(1)RabbitMQ 支持多种工作模式,包括以下几种常用的模式:
- 简单模式 (Simple Mode):也称为点对点模式 (Point-to-Point),是最简单的模式。一个生产者发送消息到一个队列,一个消费者从队列中接收并处理消息。消息只会被一个消费者接收和处理,适用于单个消费者场景。
- 发布/订阅模式 (Publish/Subscribe Mode):也称为广播模式 (Broadcast),一个生产者发送消息到一个交换机,交换机将消息广播给多个队列。每个队列有一个对应的消费者进行消息的接收和处理。适用于多个消费者同时接收同一份消息的场景。
- 工作队列模式 (Work Queue Mode):也称为任务队列模式 (Task Queue),一个生产者发送消息到一个队列,多个消费者并行地从队列中接收和处理消息。消息会被竞争性地分发给多个消费者,每个消息只会被一个消费者接收和处理。适用于任务分发和负载均衡的场景。
- 主题模式 (Topic Mode):消息通过路由键 (Routing Key) 的模式匹配来进行订阅和路由。一个生产者发送消息到一个交换机,交换机根据消息的路由键将消息路由到相关的队列。消费者可以使用通配符匹配路由键,选择性地接收和处理消息。适用于灵活的消息订阅和过滤的场景。
- 头部模式 (Header Mode):消息通过消息头的匹配来进行订阅和路由。生产者发送带有特定消息头的消息到一个交换机,交换机根据消息头的匹配将消息路由到相关的队列。适用于更复杂的消息匹配和路由场景。
(2)不同的工作模式适用于不同的业务场景和需求,可以根据实际情况选择合适的工作模式来实现消息的传输和处理。
4.RabbitMQ 高阶
4.1.RabbitMQ 中的集群是指什么?如何搭建?
(1)在 RabbitMQ 中,集群是指将多个 RabbitMQ 服务器连接在一起,共同处理消息队列。集群可以提高系统的可用性和可扩展性,避免单点故障和瓶颈问题,支持水平扩展。
(2)要搭建 RabbitMQ 集群,可以按照以下步骤:
- 在每个节点上安装 RabbitMQ,假设有 3 三个节点,即 3 台机器,这里分别将它们命名为 node1、node2、node3,,其中 node1 为主节点,并设它们的 IP 地址分别 192.168.88.16、192.168.88.17、192.168.88.18;
- 在每个节点上配置 Erlang Cookie。Erlang Cookie 是一个用于节点间认证的字符串,确保只有具有相同 Erlang Cookie 的节点才能连接在一起。可以通过修改 /var/lib/rabbitmq/.erlang.cookie 文件来设置 Erlang Cookie,确保各节点的 Erlang Cookie 相同。
- 启动 RabbitMQ 服务,顺带启动 Erlang 虚拟机和 RabbitMQ 应用服务(在三台节点上分别执行以下命令)
rabbitmq-server -detached
- 在 node2 依次执行以下命令:
rabbitmqctl stop_app
# rabbitmqctl stop 会将 Erlang 虚拟机关闭,rabbitmqctl stop_app 只关闭 RabbitMQ 服务
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
# 只启动应用服务
rabbitmqctl start_app
- 在 node3 依次执行以下命令:
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node2
rabbitmqctl start_app
- 查看集群状态
# 在任意一个节点上都可查看
rabbitmqctl cluster_status
- 重新设置用户
# 创建账号
rabbitmqctl add_user admin 123
# 设置用户角色
rabbitmqctl set_user_tags admin administrator
# 设置用户权限
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
登录 Web 管理页面便可以查看该集群的相关信息
- 解除集群节点(在 node2 和 node3 上分别执行)
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
rabbitmqctl cluster_status
# node1 上执行
rabbitmqctl forget_cluster_node rabbit@node2
4.2.如何保证 RabbitMQ 的高可用性?
(1)要保证 RabbitMQ 的高可用性,可以采取以下措施:
- 集群配置:将多个 RabbitMQ 节点组成一个集群。集群中的节点可以互相复制和同步消息,实现数据的冗余存储和备份。当一个节点发生故障时,其他节点可以接管消息的处理。通过集群配置,可以提高系统的可用性和容错性。
- 镜像队列:使用镜像队列 (Mirrored Queue) 将队列数据复制到多个节点上,确保消息的可靠性和持久化。当主节点发生故障时,备份节点可以接管消息的处理,保证服务不中断。镜像队列可用于实现主从架构或者多主之间的数据冗余存储。
- 负载均衡:在高并发的情况下,将请求均匀地分发到不同的 RabbitMQ 节点,避免某个节点负载过重。可以通过负载均衡器、反向代理或者DNS轮询等方式实现负载均衡。
- 心跳机制:配置心跳检测机制,及时监测节点和连接的活跃状态。通过定期发送心跳信号,能够检测节点是否正常运行。当节点或连接不可用时,可以及时进行故障处理和重连。
- 故障转移:当某个节点或连接发生故障时,需要进行故障转移处理,快速切换到备份节点或建立新的连接。可以通过使用自动故障转移工具(如 Pacemaker)或编写自定义的监控脚本来实现。
- 持久化配置:将 RabbitMQ 的配置文件和数据进行持久化存储,确保在节点重启或崩溃后能够恢复配置和数据。可以将配置文件和数据存储在可靠的存储介质上,如磁盘或网络文件系统。
- 监控和报警:设置监控系统来监测 RabbitMQ 的各项指标和运行状态,如节点运行情况、队列长度、消费者数量等。及时发现异常情况并触发报警,以便及时进行故障处理和调优。
(2)综合应用以上措施,能够提高 RabbitMQ 的高可用性,保证系统在节点故障或网络问题等情况下的稳定运行。需要根据实际情况和需求选择合适的策略和配置方式。
4.3.什么是惰性队列是?它有哪些应用场景?
(1)RabbitMQ 中的惰性队列 (Lazy Queue) 是指一种可以延迟加载消息的队列。与普通的队列不同,惰性队列的消息在被消费之前不会被完全加载到内存中,而是在消费者开始消费时才会被加载。具体来说,当一个消息被发送到惰性队列时,该消息会被存储到磁盘中,而不是存储在内存中。当消费者开始消费该队列时,RabbitMQ 会逐个从磁盘中读取消息,并将消息加载到内存中进行处理。这种方式可以避免队列过大导致内存溢出的问题,同时也能够提高队列的性能和扩展性。
(2)队列具备两种模式:default
和 lazy
。默认的为 default
模式,在 3.6.0 之前的版本无须做任何变更。lazy
模式即为惰性队列的模式,可以通过调用 channel.queueDeclare
方法的时候在参数中设置,也可以通过 Policy
的方式设置,如果一个队列同时使用这两种方式设置,那么 Policy
的方式具备更高的优先级。如果要通过声明的方式改变已有队列的模式,那么只能先删除队列,然后再重新声明一个新的。在队列声明的时候可以通过 x-queue-mode
参数来设置队列的模式,取值为 default
和 lazy
。
(3)惰性队列在实际应用中有很多的应用场景,例如:
- 高吞吐量:可以使用惰性队列来处理高吞吐量的消息,因为惰性队列可以将消息存储在磁盘中,不会对内存造成过大的压力。
- 消息存储:可以使用惰性队列来存储大量的消息数据,以避免内存溢出的问题。
- 数据分析:可以使用惰性队列来存储大量的数据,例如用户行为数据、网站访问日志等,以便后续进行数据分析和处理。
4.4.RabbitMQ 中的镜像队列是指什么?有哪些应用场景?
(1)RabbitMQ 中的镜像队列 (Mirrored Queue) 是一种特殊的队列配置,通过将队列在多个节点之间进行备份或复制,实现数据的冗余存储,提高消息的可靠性和容灾能力。在镜像队列中,主节点 (Master) 上的消息会被同步地复制到一个或多个备份节点 (Slave),从而保持备份队列与主队列的一致性。当主节点发生故障或不可用时,备份节点可以接管消息的处理,确保消息的持久化和可靠性。
(2)镜像队列可以用于以下应用场景:
- 提高可用性:通过将队列在多个节点之间进行镜像备份,当某个节点发生故障时,备份节点可以快速接管消息的处理,保证系统的高可用性。
- 容灾备份:通过在不同的数据中心或服务器之间进行消息的镜像复制,保证消息的冗余存储,避免数据丢失。在主节点发生灾难性故障时,可以迅速切换到备份节点上继续消息处理。
- 提高读取吞吐量:通过将镜像队列部署在多个节点上,可以实现消费者在多个节点上并行地读取消息,提高整体的消费吞吐量。
需要注意的是,使用镜像队列会增加系统的资源消耗和网络传输开销。因此,在配置镜像队列时需要根据实际情况考虑节点数量、网络带宽和延迟,以及对数据一致性和可用性的要求,综合平衡资源和性能。此外,镜像队列需要使用 RabbitMQ 的集群功能进行配置和管理。
(3)需要注意的是,镜像队列会增加系统的负载,因为它会将消息复制到多个节点。因此,在配置镜像队列时需要考虑系统的性能和可扩展性。另外,镜像队列还需要在集群中进行配置,以确保消息在多个节点之间同步。