1.前言
RabbitMQ是一个开源的消息代理软件,它实现了高级消息队列协议(AMQP)的标准,并用Erlang语言编写。作为消息代理,RabbitMQ接收、存储和转发消息,帮助应用程序之间实现异步通信。它提供了一个强大而灵活的消息传递机制,可以在分布式系统中可靠地传递消息,确保消息的顺序性和可靠性。
RabbitMQ的核心概念包括生产者、消费者、交换机、队列和绑定。生产者负责发送消息,消费者负责接收消息,交换机负责接收来自生产者的消息并将它们路由到一个或多个队列,队列存储消息直到消费者准备接收它们,而绑定则定义了交换机和队列之间的关系。
RabbitMQ具有许多特性,包括可靠性、灵活的路由、集群和高可用性、可扩展性、管理界面、多种协议支持和可编程性。它被广泛应用于构建分布式系统中的消息队列、异步任务处理、日志收集、事件驱动架构等场景,是一个强大而受欢迎的消息中间件解决方案。
1.1 前置知识
1. 同步通信 和 异步通信
微服务一旦拆分,必然涉及到服务之间的相互调用,目前我们服务之间调用采用的都是基于OpenFeign的调用。这种调用中,调用者发起请求后需要等待服务提供者执行业务返回结果后,才能继续执行后面的业务。也就是说调用者在调用过程中处于阻塞状态,因此我们成这种调用方式为同步调用,也可以叫同步通讯。但在很多场景下,我们可能需要采用异步通讯的方式,为什么呢?
解读:
-
同步通讯:就如同打视频电话,双方的交互都是实时的。因此同一时刻你只能跟一个人打视频电话。
-
异步通讯:就如同发微信聊天,双方的交互不是实时的,你不需要立刻给对方回应。因此你可以多线操作,同时跟多人聊天。
两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发微信可以同时与多个人收发微信,但是往往响应会有延迟。
所以,如果我们的业务需要实时得到服务提供方的响应,则应该选择同步通讯(同步调用)。而如果我们追求更高的效率,并且不需要实时响应,则应该选择异步通讯(异步调用)。
同步通信:服务返回响应后才可以进行后续的操作。
存在的问题:
- 扩展性差
-
随着业务规模扩大,产品的功能也在不断完善。每次有新的需求,现有支付逻辑都要跟着变化,代码经常变动,不符合开闭原则(面向修改关闭,面向拓展开放),拓展性不好。
-
- 性能下降
-
我们采用了同步调用,调用者需要等待服务提供者执行完返回结果后,才能继续向下执行,也就是说每次远程调用,调用者都是阻塞等待状态。最终整个业务的响应时长就是每次远程调用的执行时长之和。
-
- 级联失败
-
由于我们是基于OpenFeign调用交易服务、通知服务。当交易服务、通知服务出现故障时,整个事务都会回滚,交易失败。
这其实就是同步调用的级联失败问题。
-
比如:比如说支付成功,短信发送出现问题了,就给我们退款了。
-
级联失败雪崩:由于一系列问题或错误的积累,最终导致系统或项目崩溃或失败的现象。
-
异步调用: 只发送通知,发送完就可以结束了,具体你有没有收到,什么时候收到,我不关心。
介绍:
异步调用方式其实就是基于消息通知的方式,一般包含三个角色:
-
消息发送者:投递消息的人,就是原来的调用方
-
消息Broker:管理、暂存、转发消息,你可以把它理解成微信服务器
-
消息接收者:接收和处理消息的人,就是原来的服务提供方
在异步调用中,发送者不再直接同步调用接收者的业务接口,而是发送一条消息投递给消息Broker。然后接收者根据自己的需求从消息Broker那里订阅消息。每当发送方发送消息后,接受者都能获取消息并处理。
这样,发送消息的人和接收消息的人就完全解耦了。
优势:
-
耦合度更低
-
性能更好
-
业务拓展性强
-
故障隔离,避免级联失败
-
消峰
-
消峰的原理就是全部都放在消息队列,里面后续的业务慢慢的取
-
缺点:
-
完全依赖于Broker的可靠性、安全性和性能
-
架构复杂,后期维护和调试麻烦
1.2 不同MQ之间的对比
消息Broker,目前常见的实现方案就是消息队列(MessageQueue),简称为MQ.
目比较常见的MQ实现:
-
ActiveMQ
-
RabbitMQ
-
RocketMQ
-
Kafka
几种常见MQ的对比:
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka
2. RabbitMQ的安装
2.1 执行Docker命令
docker run -d \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_VHOST=default_vhost \
-e RABBITMQ_DEFAULT_USER=default_user \
-e RABBITMQ_DEFAULT_PASS=default_pass \
--hostname my_rabbitmq \
--name rabbitmq \
rabbitmq
-
15672:RabbitMQ提供的管理控制台的端口
-
5672:RabbitMQ的消息发送处理接口
参数说明:
-
docker run
: 运行 Docker 容器的命令。 -
-d
: 在后台运行容器,即以守护进程的方式运行容器。 -
-p 15672:15672 -p 5672:5672
: 将容器的端口 15672(RabbitMQ 控制台 Web 界面的端口)和 5672(RabbitMQ 应用访问的端口)映射到主机的对应端口。这样可以通过主机的这些端口来访问 RabbitMQ。 -
-e RABBITMQ_DEFAULT_VHOST=my_vhost
: 设置 RabbitMQ 默认的虚拟机名为my_vhost
。虚拟机(vhost)是 RabbitMQ 中用于隔离不同应用程序或服务的逻辑隔离单元。 -
-e RABBITMQ_DEFAULT_USER=admin
: 设置 RabbitMQ 默认的用户名为admin
。 -
-e RABBITMQ_DEFAULT_PASS=123456
: 设置 RabbitMQ 默认的用户密码为 123456。 -
--hostname myRabbit
: 指定容器的主机名为myRabbit
。在 RabbitMQ 中,节点名称被用于存储数据,而默认情况下会使用主机名。因此,在此设置了容器的主机名。 -
--name rabbitmq
: 设置容器的名称为rabbitmq
。 -
rabbitmq
: 指定要使用的容器镜像为rabbitmq
,这是 RabbitMQ 官方提供的 Docker 镜像。
综上所述,该命令的作用是以后台方式启动一个 RabbitMQ 容器,配置了默认的虚拟机名、用户名和密码,并将容器的端口映射到主机上,使得可以通过主机访问 RabbitMQ 控制台和应用服务。
2.2 设置开机自启动
docker update rabbitmq --restart=always
2.3 启动 rabbitmq_management
docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management
为什么要启动这个:
这个命令的作用是在名为“rabbitmq”的Docker容器中启用RabbitMQ管理插件。具体来说,它启用了RabbitMQ的管理插件,这个插件提供了一个Web界面,可以通过浏览器来管理RabbitMQ服务器。
通常情况下,启用管理插件是为了方便地监控和管理RabbitMQ服务器,可以通过浏览器访问http://<RabbitMQ服务器的IP地址>:15672来打开RabbitMQ的管理界面。
3. RabbitMQ 核心模块介绍
其中包含几个概念:
-
publisher
:生产者,也就是发送消息的一方 (发送给交换机) -
consumer
:消费者,也就是消费消息的一方(和队列进行绑定(监听)) -
queue
:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理 -
exchange
:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。- 交换机只能路由消息,无法存储消息
- 交换机只会路由消息给与其绑定的队列,因此队列必须与交换机绑定
-
virtual host
:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue,因为RabiitMQ性能很强,单个项目使用会造成巨大的浪费,所以多个项目,实现一套MQ,virtual host就是为了不同交换机产生隔离(和容器概念一样)
4. 数据隔离
点击Admin
选项卡,首先会看到RabbitMQ控制台的用户管理界面:
这里的用户都是RabbitMQ的管理或运维人员。目前只有安装RabbitMQ时添加的itheima
这个用户。仔细观察用户表格中的字段,如下:
-
Name
:itheima
,也就是用户名 -
Tags
:administrator
,说明itheima
用户是超级管理员,拥有所有权限 -
Can access virtual host
:/
,可以访问的virtual host
,这里的/
是默认的virtual host
对于小型企业而言,出于成本考虑,我们通常只会搭建一套MQ集群,公司内的多个不同项目同时使用。这个时候为了避免互相干扰, 我们会利用virtual host
的隔离特性,将不同项目隔离。一般会做两件事情:
-
给每个项目创建独立的运维账号,将管理权限分离。
-
给每个项目创建不同的
virtual host
,将每个项目的数据隔离。
5.SpringAMQP
Spring AMQP(Spring for Advanced Message Queuing Protocol)是 Spring 框架的一个模块,用于与 AMQP(Advanced Message Queuing Protocol)兼容的消息中间件进行集成。AMQP 是一种消息协议,用于在分布式应用程序之间传递消息。
将来我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。由于RabbitMQ
采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ
交互。并且RabbitMQ
官方也提供了各种不同语言的客户端。
但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。
SpringAMQP提供了三个功能:
-
自动声明队列、交换机及其绑定关系
-
基于注解的监听器模式,异步接收消息
-
封装了RabbitTemplate工具,用于发送消息
5. 1 导入依赖
<!-- 引入SpringAMQP的依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
5. 2 控制台新建一个队列
5. 3 消息发送
首先配置MQ的地址,在配置文件中添加配置
spring: rabbitmq: username: windStop # RabbitMQ用户名 host: 8.130.10.216 # RabbitMQ主机地址 password: 123 # RabbitMQ密码 port: 5672 # RabbitMQ端口号 virtual-host: /windStop # RabbitMQ虚拟主机
然后编写测试类ConsumerApplicationTest,并利用RabbitTemplate 实现消息发送。
RabbitTemplate 是一个RabbitMQ的模板类,用于发送消息到RabbitMQ队列或者交换机。
@SpringBootTest public class ConsumerApplicationTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void ConsumerTest(){ // 定义要发送的队列 String queue = "simple.queue"; // 定义要发送的信息 String message = "你好啊, spring AMPQ"; rabbitTemplate.convertAndSend(queue,message); } }
5.4 消息接收
上同,首先配置MQ的地址。
spring:
rabbitmq:
username: windStop # RabbitMQ用户名
host: 8.130.10.216 # RabbitMQ主机地址
password: 123 # RabbitMQ密码
port: 5672 # RabbitMQ端口号
virtual-host: /windStop # RabbitMQ虚拟主机
listener包中新建一个类SpringRabbitListener
@Component @Slf4j public class SpringRabbitListener { /** * 接收到的消息会以String类型的msg参数传入方法中 * @param msg */ @RabbitListener(queues = "simple.queue")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列 public void listenSimpleQueueMessage(String msg){ log.info("Spring 消费者接收到的消息:{}",msg); } }
5.5 总结一下核心步骤
Spring AMQP收发消息的步骤如下:
- 引入
spring-boot-starter-amqp
依赖。 - 配置RabbitMQ服务端信息,包括用户名、密码、主机地址、端口号等。
- 使用
RabbitTemplate
来发送消息到RabbitMQ服务器。 - 使用
@RabbitListener
注解声明要监听的队列,并编写相应的方法来处理接收到的消息内容。
6. Work Queues
概念:任务模型。简单来说就是让多个消费者绑定到一个队列中,共同消费队列中的信息。
介绍一下:生产者消费者模型
- 生产者和消费者之间解耦:生产者和消费者之间通过一个共享的缓冲区(队列)来进行通信,彼此不直接依赖。
- 异步性:生产者可以持续不断地生成数据,而消费者可以独立地处理这些数据,实现异步处理。
- 实现多线程并发:生产者和消费者可以在不同的线程中运行,提高系统的吞吐量和效率。
在RabbitMQ中,生产者将消息发送到队列中,而消费者则从队列中获取消息进行处理,实现了生产者消费者模型的应用。
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。
此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。
存在的问题:
发送完消息会被哪个消费者处理呢?还是所有消费者都会处理?怎么分配?
6.1 入门案例
6.1.1 创建队列
首先,我们在控制台创建一个新的队列,命名为work.queue
:
6.1.2 定义两个消费者模型
/** * 接收到的消息会以String类型的msg参数传入方法中 * @param msg */ @RabbitListener(queues = "work.queue")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列 public void listenWorkQueueMessage1(String msg){ System.out.println("消费者1接收到消息:" + msg + "," + LocalDateTime.now()); } @RabbitListener(queues = "work.queue")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列 public void listenWorkQueueMessage2(String msg){ System.err.println("消费者2接收到消息:" + msg + "," + LocalDateTime.now()); }
6.1.3 生产者发送五十条信息
@Test public void workQueueTest(){ // 定义要发送的队列 String queue = "work.queue"; //循环发送五十条数据 for (int i = 1; i <= 50; i++) { String message = "你好, SpringAMQP" + i; rabbitTemplate.convertAndSend(queue,message); } }
通过输出结果可以分析答案是:
消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。每个都是收到一条。
6.1.4 模拟快慢消费者
/** * 接收到的消息会以String类型的msg参数传入方法中 * @param msg */ @RabbitListener(queues = "work.queue")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列 public void listenWorkQueueMessage1(String msg) throws InterruptedException { System.out.println("消费者1接收到消息:" + msg + "," + LocalDateTime.now()); Thread.sleep(50);//通过睡眠短时间模拟快消费者 } @RabbitListener(queues = "work.queue")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列 public void listenWorkQueueMessage2(String msg) throws InterruptedException { System.err.println("消费者2接收到消息:" + msg + "," + LocalDateTime.now()); Thread.sleep(500);//通过睡眠长时间模拟慢消费者 }
注意到这两消费者,都设置了Thead.sleep
,模拟任务耗时:
-
消费者1 sleep了50毫秒,相当于每秒钟处理20个消息
-
消费者2 sleep了500毫秒,相当于每秒处理2个消息
也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。
6.2 能者多劳 (动态分配权重)
默认情况下,RabbitMO的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没
有考虑到消费者是否已经处理完消息,可能出现消息堆积。
默认情况下:无论你有没有处理完都给你分配。设置为1,就是处理完才给你分配。
分配消费者的预取限制
spring: rabbitmq: listener: simple: prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
这是一个消费者的预取(Prefetch)限制设置。它定义了消费者在从 RabbitMQ 服务器获取消息时一次预取的消息数量。
如果不设置。消费者默认的预取限制将会是无限制的,即一次性获取尽可能多的消息。
-
资源过度占用: 如果消费者一次性获取大量消息,但处理消息的速度较慢,就会导致大量消息堆积在消费者端,消耗大量内存和其他系统资源。这可能导致系统的负载急剧增加,甚至导致系统崩溃。
-
不可控的消费行为: 一次性获取大量消息会导致消费者处理速度不可控,快速消费完部分消息后,可能会因为处理时间长的消息而导致整体处理速度下降。
-
不公平的消息分发: 如果一次性获取大量消息,可能会导致消息在消费者之间分布不均匀,一些消费者可能会快速处理完消息而另一些消费者处理速度较慢,从而导致消息处理效率不高。
-
消息积压和延迟: 一次性获取大量消息可能导致消息积压,影响系统对消息的实时处理能力,也会增加消息的处理延
综上所述,合适的预取限制可以帮助控制系统资源的使用,确保消息的平稳处理,避免系统崩溃和消息处理效率低下的问题。
正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。
7. 交换机类型
在之前的两个测试案例中,都没有交换机,生产者直接发送消息到队列。而一旦引入交换机,消息发送的模式会有很大变化:
可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:
-
Publisher:生产者,不再发送消息到队列中,而是发给交换机。
-
Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
-
Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
-
Consumer:消费者,与以前一样,订阅队列,没有变化。当消费者处理完毕后,队列中存储的数据就会被删除。
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
交换机的类型有四种:
-
Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机。
-
Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列。
-
Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符。
-
Headers:头匹配,基于MQ的消息头匹配,用的较少。
7.1 Fanout交换机
在广播模式下,消息发送流程是这样的:复制成n份,发送给每一个队列。
-
1) 可以有多个队列
-
2) 每个队列都要绑定到Exchange(交换机)
-
3) 生产者发送的消息,只能发送到交换机
-
4) 交换机把消息发送给绑定过的所有队列
-
5) 订阅队列的消费者都能拿到消息
作用解析:分布式架构中,每个模块绑定一个队列,然后对于支付完成后,我们就可以广播给多个队列让他们进行处理,比如:支付后发信息通知,支付后添加积分。
7.1.1 代码实现
1. 创建队列:
2. 创建交换机:
3. 绑定队列和交换机之间的关系:
4. 添加消费者:
@RabbitListener(queues = "fanout.queue1")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列 public void listenFanoutQueueMessage1(String msg){ System.err.println("消费者fanout1 接收到消息:" + msg + "," + LocalDateTime.now()); } @RabbitListener(queues = "fanout.queue2")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列 public void listenFanoutQueueMessage2(String msg) { System.err.println("消费者fanout2 接收到消息:" + msg + "," + LocalDateTime.now()); }
5. 添加生成者:
@Test public void fanoutTest(){ // 定义要发送的队列 String exchangeNane = "windStop.fanout"; // 定义要发送的信息 String message = "大家好啊!"; rabbitTemplate.convertAndSend(exchangeNane,null ,message); }
交换机的作用是什么?
-
接收publisher发送的消息
-
将消息按照规则路由到与之绑定的队列
-
不能缓存消息,路由失败,消息丢失
-
FanoutExchange的会将消息路由到每个绑定的队列
7.2 Direct交换机
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。按照规则进行路由。并且一个队列可以绑定多个规则(路由键)。
在Direct模型下:
- 每一个Queue都与Exchange设置一个BindingKey(路由key)。
- 发布者发送消息时,指定消息的RoutingKey。
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列。
暗号一样,才会给你。
作用解析:分布式架构中,每个模块绑定一个队列,然后对于支付完成后,我们就可以广播给多个队列让他们进行处理,也有些操作不需要发给所有,就需要按照键匹配,支付后发信息通知,支付后添加积分。我取消支付就不需要这两种,但我还需要其他的支付的操作。
7.2.1 代码实现
1. 创建队列
2. 创建交换机
3. 创建交换机和路由之间的关系
3.1 进入交换机
3.2 绑定关系并且指定 BindingKey
因为RabbitMQ官网没有设置同时绑定多个BindingKey,所以要想绑定多个BindingKey就要bind多次。
绑定成功后的页面
4. 创建消费者
/** * 订阅 * @param msg 接收到的内容 */ @RabbitListener(queues = "direct.queue1")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列 public void listenDirectQueueMessage1(String msg){ System.err.println("消费者fanout1 接收到消息:" + msg + "," + LocalDateTime.now()); } @RabbitListener(queues = "direct.queue2")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列 public void listenDirectQueueMessage2(String msg) { System.err.println("消费者fanout2 接收到消息:" + msg + "," + LocalDateTime.now()); }
5. 创建生产者
这个时候就需要指定第二个参数:
@Test public void directTest(){ // 定义要发送的队列 String exchangeNane = "windStop.direct"; // 定义要发送的信息 String message = "红色:震惊,李旭居然是人!"; rabbitTemplate.convertAndSend(exchangeNane,"red",message); }
因为二者都绑定了,red这个路由key,所以direct.queue1和direct.queue2都能收到。
@Test public void directTest2(){ // 定义要发送的队列 String exchangeNane = "windStop.direct"; // 定义要发送的信息 String message = "蓝色:明天就要上课了。"; rabbitTemplate.convertAndSend(exchangeNane,"blue",message); }
因为只有direct.queue1绑定了blue这个路由key,所以只有direct.queue1能收到。
描述下Direct交换机与Fanout交换机的差异?
- Fanout交换机将消息路由给每一个与之绑定的队列
- Direct交换机根据RoutingKey判断路由给哪个队列
- 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
7.3. Topic交换机
7.3.1 .说明
Topic
类型的Exchange
与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。
只不过Topic
类型Exchange
可以让队列在绑定BindingKey
的时候使用通配符!
BindingKey
一般都是有一个或多个单词组成,多个单词之间以.
分割,例如: item.insert
通配符规则:
-
#
:匹配一个或多个词 -
*
:匹配不多不少恰好1个词
举例:
-
item.#
:能够匹配item.spu.insert
或者item.spu
-
item.*
:只能匹配item.spu
假如此时publisher发送的消息使用的RoutingKey
共有四种:
-
china.news
代表有中国的新闻消息; -
china.weather
代表中国的天气消息; -
japan.news
则代表日本新闻 -
japan.weather
代表日本的天气消息;
解释:
-
topic.queue1
:绑定的是china.#
,凡是以china.
开头的routing key
都会被匹配到,包括:-
china.news
-
china.weather
-
-
topic.queue2
:绑定的是#.news
,凡是以.news
结尾的routing key
都会被匹配。包括:-
china.news
-
japan.news
-
接下来,我们就按照上图所示,来演示一下Topic交换机的用法。
首先,在控制台按照图示例子创建队列、交换机,并利用通配符绑定队列和交换机。创建步骤和上述一样,最终结果如下:
7.3.2 创建消费者
/** * 通配符订阅 * @param msg 接收到的内容 */ @RabbitListener(queues = "topic.queue1")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列 public void listenTopicQueueMessage1(String msg){ System.out.println("消费者topic1 接收到消息:" + msg + "," + LocalDateTime.now()); } @RabbitListener(queues = "topic.queue2")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列 public void listenTopicQueueMessage2(String msg) { System.err.println("消费者topic2 接收到消息:" + msg + "," + LocalDateTime.now()); }
7.3.3 创建生产者
@Test public void testTopicExchange() { // 交换机名称 String exchangeName = "windStop.topic"; // 消息 String message = "喜报!孙悟空大战哥斯拉,胜!"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "china.news", message); }
因为二者都匹配了,前者前缀匹配,后者后缀匹配,所以topic.queue1和topic.queue2都能收到。
@Test public void testTopicExchange2() { // 交换机名称 String exchangeName = "windStop.topic"; // 消息 String message = "今天天气真不错,我的心情好极了"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "china.weather", message); }
只有topic.queue1会匹配到。.weacher不符合topic.queue2的后缀要求。
7.3.4 总结
描述下Direct交换机与Topic交换机的差异?
-
Topic交换机接收的消息RoutingKey必须是多个单词,以
.
分割 -
Topic交换机与队列绑定时的bindingKey可以指定通配符
-
#
:代表0个或多个词 -
*
:代表1个词
8. 声明队列和交换机
在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,需要重新创建发布时候的RabiitMQ,又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。在这个过程中是很容易出现错误的。-> 使用可视化面板创建
因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。-> 代码创建
8.1 代码创建的基本API
声明队列和交换机:
SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:
- Queue:用于声明队列,可以用工厂类QueueBuilder构建。
- Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建。
- Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建。
1. 创建队列
2. 创建交换机
我们可以自己创建队列和交换机,不过SpringAMQP还提供了ExchangeBuilder来简化这个过程:
而在绑定队列和交换机时,则需要使用BindingBuilder来创建Binding对象:
3. fanout示例
基于AMQP协议的消息队列系统,通过声明式的配置方式,RabbitMQ客户端会在应用启动时自动创建交换机和队列,并建立它们之间的对应关系,从而为应用程序提供便捷的消息队列支持。
@Configuration
public class FanoutConfig {
// 声明FanoutExchange交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("windStop.fanout");
}
// 声明第 1 个队列
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
//绑定队列 1 和 交换机
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
// 声明第 2 个队列
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
4. direct示例
direct模式由于要绑定多个KEY,会非常麻烦,每一个Key都要编写一个binding:
@Configuration
public class DirectConfig {
/**
* 声明交换机
* @return Direct类型交换机
*/
@Bean
public DirectExchange directExchange(){
return ExchangeBuilder.directExchange("hmall.direct").build();
}
/**
* 第1个队列
*/
@Bean
public Queue directQueue1(){
return new Queue("direct.queue1");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
}
/**
* 第2个队列
*/
@Bean
public Queue directQueue2(){
return new Queue("direct.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
}
}
这种方式,虽然可以实现但是很臃肿,每绑定一个BindingKey就需要多写个路由关系的方法。
8.2 基于注解声明
基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
例如,我们同样声明Direct模式的交换机和队列:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
是不是简单多了。
介绍一下:
-
@RabbitListener(bindings = @QueueBinding(...))
: 这里声明了一个RabbitMQ的消息监听器,通过bindings参数指定了队列绑定的相关配置。 -
value = @Queue(name = "direct.queue1")
: 在这里,我们声明了一个名为"direct.queue1"的队列。这表示我们将会监听这个特定的队列。 -
exchange = @Exchange(name = "windStop.direct", type = ExchangeTypes.DIRECT)
: 这里声明了一个名为"windStop
.direct"的订阅类型的交换机。订阅交换机(Direct Exchange)根据消息的routing key将消息路由到特定的队列。 -
key = {"red", "blue"}
: 这里指定了队列和交换机之间的绑定关系。对于队列"direct.queue1",它将会接收所有routing key为"red"或"blue"的消息。
再试试Topic模式:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}
9. 总结
这段文档内容非常全面地介绍了 RabbitMQ 的核心概念、安装、配置以及各种交换机类型的使用方法。它包括了 RabbitMQ 的前言介绍,不同类型的通信方式,不同 MQ 的对比,以及 RabbitMQ 的安装和核心模块介绍。同时也涵盖了 Spring AMQP 的使用方法和示例,以及 Work Queues、交换机类型(Fanout、Direct、Topic)的详细说明和代码实现。此外祝大家周末愉快!