“RabbitMQ入门指南:从入门到起飞,这一篇就够!打造高效消息通信系统的第一步“。

1.前言

        RabbitMQ是一个开源的消息代理软件,它实现了高级消息队列协议(AMQP)的标准,并用Erlang语言编写。作为消息代理,RabbitMQ接收、存储和转发消息,帮助应用程序之间实现异步通信。它提供了一个强大而灵活的消息传递机制,可以在分布式系统中可靠地传递消息,确保消息的顺序性和可靠性。

        RabbitMQ的核心概念包括生产者、消费者、交换机、队列和绑定。生产者负责发送消息,消费者负责接收消息,交换机负责接收来自生产者的消息并将它们路由到一个或多个队列,队列存储消息直到消费者准备接收它们,而绑定则定义了交换机和队列之间的关系。

        RabbitMQ具有许多特性,包括可靠性、灵活的路由、集群和高可用性、可扩展性、管理界面、多种协议支持和可编程性。它被广泛应用于构建分布式系统中的消息队列、异步任务处理、日志收集、事件驱动架构等场景,是一个强大而受欢迎的消息中间件解决方案。

                1.1 前置知识

1. 同步通信 和 异步通信

        微服务一旦拆分,必然涉及到服务之间的相互调用,目前我们服务之间调用采用的都是基于OpenFeign的调用。这种调用中,调用者发起请求后需要等待服务提供者执行业务返回结果后,才能继续执行后面的业务。也就是说调用者在调用过程中处于阻塞状态,因此我们成这种调用方式为同步调用,也可以叫同步通讯。但在很多场景下,我们可能需要采用异步通讯的方式,为什么呢? 

解读:

  • 同步通讯:就如同打视频电话,双方的交互都是实时的。因此同一时刻你只能跟一个人打视频电话。

  • 异步通讯:就如同发微信聊天,双方的交互不是实时的,你不需要立刻给对方回应。因此你可以多线操作,同时跟多人聊天。

两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发微信可以同时与多个人收发微信,但是往往响应会有延迟。

所以,如果我们的业务需要实时得到服务提供方的响应,则应该选择同步通讯(同步调用)。而如果我们追求更高的效率,并且不需要实时响应,则应该选择异步通讯(异步调用)。

同步通信:服务返回响应后才可以进行后续的操作。

存在的问题:

  • 扩展性差
    • 随着业务规模扩大,产品的功能也在不断完善。每次有新的需求,现有支付逻辑都要跟着变化,代码经常变动,不符合开闭原则(面向修改关闭,面向拓展开放),拓展性不好。

  • 性能下降
    • 我们采用了同步调用,调用者需要等待服务提供者执行完返回结果后,才能继续向下执行,也就是说每次远程调用,调用者都是阻塞等待状态。最终整个业务的响应时长就是每次远程调用的执行时长之和。

  • 级联失败 
    • 由于我们是基于OpenFeign调用交易服务、通知服务。当交易服务、通知服务出现故障时,整个事务都会回滚,交易失败。

      这其实就是同步调用的级联失败问题。

    • 比如:比如说支付成功,短信发送出现问题了,就给我们退款了。

    • 级联失败雪崩:由于一系列问题或错误的积累,最终导致系统或项目崩溃或失败的现象。

异步调用: 只发送通知,发送完就可以结束了,具体你有没有收到,什么时候收到,我不关心。

介绍:

异步调用方式其实就是基于消息通知的方式,一般包含三个角色:

  • 消息发送者:投递消息的人,就是原来的调用方

  • 消息Broker:管理、暂存、转发消息,你可以把它理解成微信服务器

  • 消息接收者:接收和处理消息的人,就是原来的服务提供方

在异步调用中,发送者不再直接同步调用接收者的业务接口,而是发送一条消息投递给消息Broker。然后接收者根据自己的需求从消息Broker那里订阅消息。每当发送方发送消息后,接受者都能获取消息并处理。

这样,发送消息的人和接收消息的人就完全解耦了。

优势:

  • 耦合度更低

  • 性能更好

  • 业务拓展性强

  • 故障隔离,避免级联失败

  • 消峰

    • 消峰的原理就是全部都放在消息队列,里面后续的业务慢慢的取

缺点:

  • 完全依赖于Broker的可靠性、安全性和性能

  • 架构复杂,后期维护和调试麻烦

        1.2 不同MQ之间的对比 

消息Broker,目前常见的实现方案就是消息队列(MessageQueue),简称为MQ.

目比较常见的MQ实现:

  • ActiveMQ

  • RabbitMQ

  • RocketMQ

  • Kafka

几种常见MQ的对比:

追求可用性:Kafka、 RocketMQ 、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延迟:RabbitMQ、Kafka

2. RabbitMQ的安装

        2.1 执行Docker命令

docker run -d \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_VHOST=default_vhost \
  -e RABBITMQ_DEFAULT_USER=default_user \
  -e RABBITMQ_DEFAULT_PASS=default_pass \
  --hostname my_rabbitmq \
  --name rabbitmq \
  rabbitmq
  • 15672:RabbitMQ提供的管理控制台的端口

  • 5672:RabbitMQ的消息发送处理接口

参数说明:

  • docker run: 运行 Docker 容器的命令。

  • -d: 在后台运行容器,即以守护进程的方式运行容器。

  • -p 15672:15672 -p 5672:5672: 将容器的端口 15672(RabbitMQ 控制台 Web 界面的端口)和 5672(RabbitMQ 应用访问的端口)映射到主机的对应端口。这样可以通过主机的这些端口来访问 RabbitMQ。

  • -e RABBITMQ_DEFAULT_VHOST=my_vhost: 设置 RabbitMQ 默认的虚拟机名为 my_vhost。虚拟机(vhost)是 RabbitMQ 中用于隔离不同应用程序或服务的逻辑隔离单元。

  • -e RABBITMQ_DEFAULT_USER=admin: 设置 RabbitMQ 默认的用户名为 admin

  • -e RABBITMQ_DEFAULT_PASS=123456: 设置 RabbitMQ 默认的用户密码为 123456。

  • --hostname myRabbit: 指定容器的主机名为 myRabbit。在 RabbitMQ 中,节点名称被用于存储数据,而默认情况下会使用主机名。因此,在此设置了容器的主机名。

  • --name rabbitmq: 设置容器的名称为 rabbitmq

  • rabbitmq: 指定要使用的容器镜像为 rabbitmq,这是 RabbitMQ 官方提供的 Docker 镜像。

综上所述,该命令的作用是以后台方式启动一个 RabbitMQ 容器,配置了默认的虚拟机名、用户名和密码,并将容器的端口映射到主机上,使得可以通过主机访问 RabbitMQ 控制台和应用服务。

        2.2 设置开机自启动

docker update rabbitmq --restart=always

         2.3 启动 rabbitmq_management

docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management

 为什么要启动这个:

这个命令的作用是在名为“rabbitmq”的Docker容器中启用RabbitMQ管理插件。具体来说,它启用了RabbitMQ的管理插件,这个插件提供了一个Web界面,可以通过浏览器来管理RabbitMQ服务器。
通常情况下,启用管理插件是为了方便地监控和管理RabbitMQ服务器,可以通过浏览器访问http://<RabbitMQ服务器的IP地址>:15672来打开RabbitMQ的管理界面。

3. RabbitMQ 核心模块介绍

其中包含几个概念:

  • publisher:生产者,也就是发送消息的一方 (发送给交换机)

  • consumer:消费者,也就是消费消息的一方(和队列进行绑定(监听))

  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理

  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。

    • 交换机只能路由消息,无法存储消息
    • 交换机只会路由消息给与其绑定的队列,因此队列必须与交换机绑定 
  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue,因为RabiitMQ性能很强,单个项目使用会造成巨大的浪费,所以多个项目,实现一套MQ,virtual host就是为了不同交换机产生隔离(和容器概念一样)

4. 数据隔离 

点击Admin选项卡,首先会看到RabbitMQ控制台的用户管理界面:

这里的用户都是RabbitMQ的管理或运维人员。目前只有安装RabbitMQ时添加的itheima这个用户。仔细观察用户表格中的字段,如下:

  • Nameitheima,也就是用户名

  • Tagsadministrator,说明itheima用户是超级管理员,拥有所有权限

  • Can access virtual host/,可以访问的virtual host,这里的/是默认的virtual host

对于小型企业而言,出于成本考虑,我们通常只会搭建一套MQ集群,公司内的多个不同项目同时使用。这个时候为了避免互相干扰, 我们会利用virtual host的隔离特性,将不同项目隔离。一般会做两件事情:

  • 给每个项目创建独立的运维账号,将管理权限分离。

  • 给每个项目创建不同的virtual host,将每个项目的数据隔离。

5.SpringAMQP 

        Spring AMQP(Spring for Advanced Message Queuing Protocol)是 Spring 框架的一个模块,用于与 AMQP(Advanced Message Queuing Protocol)兼容的消息中间件进行集成。AMQP 是一种消息协议,用于在分布式应用程序之间传递消息。

        将来我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。由于RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。

        但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系

  • 基于注解的监听器模式,异步接收消息

  • 封装了RabbitTemplate工具,用于发送消息

5. 1 导入依赖

<!--    引入SpringAMQP的依赖,包含RabbitMQ-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

5. 2 控制台新建一个队列

5. 3 消息发送

首先配置MQ的地址,在配置文件中添加配置

spring:
  rabbitmq:
    username: windStop  # RabbitMQ用户名
    host: 8.130.10.216  # RabbitMQ主机地址
    password: 123       # RabbitMQ密码
    port: 5672          # RabbitMQ端口号
    virtual-host: /windStop  # RabbitMQ虚拟主机

然后编写测试类ConsumerApplicationTest,并利用RabbitTemplate 实现消息发送。

RabbitTemplate 是一个RabbitMQ的模板类,用于发送消息到RabbitMQ队列或者交换机。

@SpringBootTest
public class ConsumerApplicationTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void ConsumerTest(){
        // 定义要发送的队列
        String queue = "simple.queue";
        // 定义要发送的信息
        String message = "你好啊, spring AMPQ";
        rabbitTemplate.convertAndSend(queue,message);
    }
}

 5.4  消息接收

 上同,首先配置MQ的地址。

spring:
  rabbitmq:
    username: windStop  # RabbitMQ用户名
    host: 8.130.10.216  # RabbitMQ主机地址
    password: 123       # RabbitMQ密码
    port: 5672          # RabbitMQ端口号
    virtual-host: /windStop  # RabbitMQ虚拟主机
 

listener包中新建一个类SpringRabbitListener

@Component
@Slf4j
public class SpringRabbitListener {

    /**
     * 接收到的消息会以String类型的msg参数传入方法中
     * @param msg
     */
    @RabbitListener(queues = "simple.queue")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
    public void listenSimpleQueueMessage(String msg){
        log.info("Spring 消费者接收到的消息:{}",msg);
    }
} 

5.5  总结一下核心步骤 

Spring AMQP收发消息的步骤如下:

  1. 引入spring-boot-starter-amqp依赖。
  2. 配置RabbitMQ服务端信息,包括用户名、密码、主机地址、端口号等。
  3. 使用RabbitTemplate来发送消息到RabbitMQ服务器。
  4. 使用@RabbitListener注解声明要监听的队列,并编写相应的方法来处理接收到的消息内容。

6.  Work Queues

概念:任务模型。简单来说就是让多个消费者绑定到一个队列中,共同消费队列中的信息。

介绍一下:生产者消费者模型

  • 生产者和消费者之间解耦:生产者和消费者之间通过一个共享的缓冲区(队列)来进行通信,彼此不直接依赖。
  • 异步性:生产者可以持续不断地生成数据,而消费者可以独立地处理这些数据,实现异步处理。
  • 实现多线程并发:生产者和消费者可以在不同的线程中运行,提高系统的吞吐量和效率。

     在RabbitMQ中,生产者将消息发送到队列中,而消费者则从队列中获取消息进行处理,实现了生产者消费者模型的应用。

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。

此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了

存在的问题: 

        发送完消息会被哪个消费者处理呢?还是所有消费者都会处理?怎么分配?

        6.1 入门案例

6.1.1 创建队列

首先,我们在控制台创建一个新的队列,命名为work.queue

6.1.2 定义两个消费者模型
/**
 * 接收到的消息会以String类型的msg参数传入方法中
 * @param msg
 */
@RabbitListener(queues = "work.queue")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
public void listenWorkQueueMessage1(String msg){
    System.out.println("消费者1接收到消息:" + msg + "," + LocalDateTime.now());
}

@RabbitListener(queues = "work.queue")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
public void listenWorkQueueMessage2(String msg){
    System.err.println("消费者2接收到消息:" + msg + "," + LocalDateTime.now());
}
6.1.3 生产者发送五十条信息            
@Test
public void workQueueTest(){
    // 定义要发送的队列
    String queue = "work.queue";
    //循环发送五十条数据
    for (int i = 1; i <= 50; i++) {
        String message = "你好, SpringAMQP" + i;
        rabbitTemplate.convertAndSend(queue,message);
    }
}

通过输出结果可以分析答案是:

        消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。每个都是收到一条。

6.1.4 模拟快慢消费者
/**
 * 接收到的消息会以String类型的msg参数传入方法中
 * @param msg
 */
@RabbitListener(queues = "work.queue")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
public void listenWorkQueueMessage1(String msg) throws InterruptedException {
    System.out.println("消费者1接收到消息:" + msg + "," + LocalDateTime.now());
    Thread.sleep(50);//通过睡眠短时间模拟快消费者
}

@RabbitListener(queues = "work.queue")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
public void listenWorkQueueMessage2(String msg) throws InterruptedException {
    System.err.println("消费者2接收到消息:" + msg + "," + LocalDateTime.now());
    Thread.sleep(500);//通过睡眠长时间模拟慢消费者
}

注意到这两消费者,都设置了Thead.sleep,模拟任务耗时:

  • 消费者1 sleep了50毫秒,相当于每秒钟处理20个消息

  • 消费者2 sleep了500毫秒,相当于每秒处理2个消息

        也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。 

        6.2 能者多劳 (动态分配权重)

        默认情况下,RabbitMO的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没

有考虑到消费者是否已经处理完消息,可能出现消息堆积。

        默认情况下:无论你有没有处理完都给你分配。设置为1,就是处理完才给你分配。

分配消费者的预取限制

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息 

 这是一个消费者的预取(Prefetch)限制设置。它定义了消费者在从 RabbitMQ 服务器获取消息时一次预取的消息数量。 

 如果不设置。消费者默认的预取限制将会是无限制的,即一次性获取尽可能多的消息。

  1. 资源过度占用: 如果消费者一次性获取大量消息,但处理消息的速度较慢,就会导致大量消息堆积在消费者端,消耗大量内存和其他系统资源。这可能导致系统的负载急剧增加,甚至导致系统崩溃。

  2. 不可控的消费行为: 一次性获取大量消息会导致消费者处理速度不可控,快速消费完部分消息后,可能会因为处理时间长的消息而导致整体处理速度下降。

  3. 不公平的消息分发: 如果一次性获取大量消息,可能会导致消息在消费者之间分布不均匀,一些消费者可能会快速处理完消息而另一些消费者处理速度较慢,从而导致消息处理效率不高。

  4. 消息积压和延迟: 一次性获取大量消息可能导致消息积压,影响系统对消息的实时处理能力,也会增加消息的处理延

综上所述,合适的预取限制可以帮助控制系统资源的使用,确保消息的平稳处理,避免系统崩溃和消息处理效率低下的问题。

正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。

7. 交换机类型 

在之前的两个测试案例中,都没有交换机,生产者直接发送消息到队列。而一旦引入交换机,消息发送的模式会有很大变化:

可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:

  • Publisher:生产者,不再发送消息到队列中,而是发给交换机。

  • Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。

  • Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。

  • Consumer:消费者,与以前一样,订阅队列,没有变化。当消费者处理完毕后,队列中存储的数据就会被删除。
     

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机。

  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列。

  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符。

  • Headers:头匹配,基于MQ的消息头匹配,用的较少。

        7.1 Fanout交换机 

在广播模式下,消息发送流程是这样的:复制成n份,发送给每一个队列。 

  • 1) 可以有多个队列

  • 2) 每个队列都要绑定到Exchange(交换机)

  • 3) 生产者发送的消息,只能发送到交换机

  • 4) 交换机把消息发送给绑定过的所有队列

  • 5) 订阅队列的消费者都能拿到消息

作用解析:分布式架构中,每个模块绑定一个队列,然后对于支付完成后,我们就可以广播给多个队列让他们进行处理,比如:支付后发信息通知,支付后添加积分。 

         7.1.1 代码实现
1. 创建队列:

2. 创建交换机:

3.  绑定队列和交换机之间的关系: 

4. 添加消费者:
@RabbitListener(queues = "fanout.queue1")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
public void listenFanoutQueueMessage1(String msg){
    System.err.println("消费者fanout1 接收到消息:" + msg + "," + LocalDateTime.now());
}

@RabbitListener(queues = "fanout.queue2")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
public void listenFanoutQueueMessage2(String msg) {
    System.err.println("消费者fanout2 接收到消息:" + msg + "," + LocalDateTime.now());
}
5.  添加生成者:
@Test
public void fanoutTest(){
    // 定义要发送的队列
    String exchangeNane = "windStop.fanout";
    // 定义要发送的信息
    String message = "大家好啊!";
    rabbitTemplate.convertAndSend(exchangeNane,null ,message);
}

交换机的作用是什么?

  • 接收publisher发送的消息

  • 将消息按照规则路由到与之绑定的队列

  • 不能缓存消息,路由失败,消息丢失

  • FanoutExchange的会将消息路由到每个绑定的队列 

7.2 Direct交换机 

        在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。按照规则进行路由。并且一个队列可以绑定多个规则(路由键)。

在Direct模型下:

  • 每一个Queue都与Exchange设置一个BindingKey(路由key)。
  • 发布者发送消息时,指定消息的RoutingKey。
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列。

暗号一样,才会给你。 

        作用解析:分布式架构中,每个模块绑定一个队列,然后对于支付完成后,我们就可以广播给多个队列让他们进行处理,也有些操作不需要发给所有,就需要按照键匹配,支付后发信息通知,支付后添加积分。我取消支付就不需要这两种,但我还需要其他的支付的操作。

        7.2.1 代码实现

1. 创建队列

2. 创建交换机

3. 创建交换机和路由之间的关系

         3.1 进入交换机

        3.2 绑定关系并且指定 BindingKey

因为RabbitMQ官网没有设置同时绑定多个BindingKey,所以要想绑定多个BindingKey就要bind多次。

 

绑定成功后的页面

4. 创建消费者

/**
 * 订阅
 * @param msg 接收到的内容
 */
@RabbitListener(queues = "direct.queue1")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
public void listenDirectQueueMessage1(String msg){
    System.err.println("消费者fanout1 接收到消息:" + msg + "," + LocalDateTime.now());
}

@RabbitListener(queues = "direct.queue2")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
public void listenDirectQueueMessage2(String msg) {
    System.err.println("消费者fanout2 接收到消息:" + msg + "," + LocalDateTime.now());
}

5. 创建生产者

这个时候就需要指定第二个参数:

@Test
public void directTest(){
    // 定义要发送的队列
    String exchangeNane = "windStop.direct";
    // 定义要发送的信息
    String message = "红色:震惊,李旭居然是人!";
    rabbitTemplate.convertAndSend(exchangeNane,"red",message);
}

因为二者都绑定了,red这个路由key,所以direct.queue1和direct.queue2都能收到。

@Test
public void directTest2(){
    // 定义要发送的队列
    String exchangeNane = "windStop.direct";
    // 定义要发送的信息
    String message = "蓝色:明天就要上课了。";
    rabbitTemplate.convertAndSend(exchangeNane,"blue",message);
}

 因为只有direct.queue1绑定了blue这个路由key,所以只有direct.queue1能收到。

描述下Direct交换机与Fanout交换机的差异?

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

7.3. Topic交换机

7.3.1 .说明

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。

只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!

BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如item.insert

通配符规则:

  • #:匹配一个或多个词

  • *:匹配不多不少恰好1个词

举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu

  • item.*:只能匹配item.spu

假如此时publisher发送的消息使用的RoutingKey共有四种:

  • china.news 代表有中国的新闻消息;

  • china.weather 代表中国的天气消息;

  • japan.news 则代表日本新闻

  • japan.weather 代表日本的天气消息;

解释:

  • topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:

    • china.news

    • china.weather

  • topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:

    • china.news

    • japan.news

接下来,我们就按照上图所示,来演示一下Topic交换机的用法。

首先,在控制台按照图示例子创建队列、交换机,并利用通配符绑定队列和交换机。创建步骤和上述一样,最终结果如下:

7.3.2 创建消费者 
/**
 * 通配符订阅
 * @param msg 接收到的内容
 */
@RabbitListener(queues = "topic.queue1")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
public void listenTopicQueueMessage1(String msg){
    System.out.println("消费者topic1 接收到消息:" + msg + "," + LocalDateTime.now());
}

@RabbitListener(queues = "topic.queue2")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
public void listenTopicQueueMessage2(String msg) {
    System.err.println("消费者topic2 接收到消息:" + msg + "," + LocalDateTime.now());
}
7.3.3 创建生产者
@Test
public void testTopicExchange() {
    // 交换机名称
    String exchangeName = "windStop.topic";
    // 消息
    String message = "喜报!孙悟空大战哥斯拉,胜!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

 因为二者都匹配了,前者前缀匹配,后者后缀匹配,所以topic.queue1和topic.queue2都能收到。

@Test
public void testTopicExchange2() {
    // 交换机名称
    String exchangeName = "windStop.topic";
    // 消息
    String message = "今天天气真不错,我的心情好极了";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.weather", message);
}

只有topic.queue1会匹配到。.weacher不符合topic.queue2的后缀要求。  

7.3.4 总结

描述下Direct交换机与Topic交换机的差异?

  • Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割

  • Topic交换机与队列绑定时的bindingKey可以指定通配符

  • #:代表0个或多个词

  • *:代表1个词

8. 声明队列和交换机

        在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,需要重新创建发布时候的RabiitMQ,又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。在这个过程中是很容易出现错误的。-> 使用可视化面板创建 

因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。-> 代码创建

         8.1 代码创建的基本API

声明队列和交换机:
SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:

  1. Queue:用于声明队列,可以用工厂类QueueBuilder构建。
  2. Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建。
  3. Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建。
 1. 创建队列

2. 创建交换机

我们可以自己创建队列和交换机,不过SpringAMQP还提供了ExchangeBuilder来简化这个过程:

而在绑定队列和交换机时,则需要使用BindingBuilder来创建Binding对象:

        3. fanout示例

        基于AMQP协议的消息队列系统,通过声明式的配置方式,RabbitMQ客户端会在应用启动时自动创建交换机和队列,并建立它们之间的对应关系,从而为应用程序提供便捷的消息队列支持。

@Configuration
public class FanoutConfig {
    // 声明FanoutExchange交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("windStop.fanout");
    }
    // 声明第 1 个队列
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }
    //绑定队列 1 和 交换机
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    // 声明第 2 个队列
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }

}
       4. direct示例

direct模式由于要绑定多个KEY,会非常麻烦,每一个Key都要编写一个binding:

@Configuration
public class DirectConfig {

    /**
     * 声明交换机
     * @return Direct类型交换机
     */
    @Bean
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange("hmall.direct").build();
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue directQueue1(){
        return new Queue("direct.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
    }
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue directQueue2(){
        return new Queue("direct.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
    }
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
    }
}

这种方式,虽然可以实现但是很臃肿,每绑定一个BindingKey就需要多写个路由关系的方法。

8.2 基于注解声明

基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。

例如,我们同样声明Direct模式的交换机和队列:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

是不是简单多了。

介绍一下:

  1. @RabbitListener(bindings = @QueueBinding(...)): 这里声明了一个RabbitMQ的消息监听器,通过bindings参数指定了队列绑定的相关配置。

  2. value = @Queue(name = "direct.queue1"): 在这里,我们声明了一个名为"direct.queue1"的队列。这表示我们将会监听这个特定的队列。

  3. exchange = @Exchange(name = "windStop.direct", type = ExchangeTypes.DIRECT): 这里声明了一个名为"windStop.direct"的订阅类型的交换机。订阅交换机(Direct Exchange)根据消息的routing key将消息路由到特定的队列。

  4. key = {"red", "blue"}: 这里指定了队列和交换机之间的绑定关系。对于队列"direct.queue1",它将会接收所有routing key为"red"或"blue"的消息。

再试试Topic模式:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
    key = "china.#"
))
public void listenTopicQueue1(String msg){
    System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
    key = "#.news"
))
public void listenTopicQueue2(String msg){
    System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

9. 总结

        这段文档内容非常全面地介绍了 RabbitMQ 的核心概念、安装、配置以及各种交换机类型的使用方法。它包括了 RabbitMQ 的前言介绍,不同类型的通信方式,不同 MQ 的对比,以及 RabbitMQ 的安装和核心模块介绍。同时也涵盖了 Spring AMQP 的使用方法和示例,以及 Work Queues、交换机类型(Fanout、Direct、Topic)的详细说明和代码实现。此外祝大家周末愉快!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/696897.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

设计模式 —— 观察者模式

设计模式 —— 观察者模式 什么是观察者模式观察者模式定义观察者模式的角色观察者模式的使用场景观察者模式的实现 被观察者&#xff08;Subject&#xff09;观察者&#xff08;Observer&#xff09;通知&#xff08;notify&#xff09;更新显示&#xff08;update&#xff09…

44【Aseprite 作图】樱花丸子——拆解

1 枝干 2 花朵&#xff1a;其实只要形状差不多都行&#xff0c;有三个颜色&#xff0c;中间花蕊颜色深一点&#xff0c;中间花蕊外的颜色偏白&#xff1b;不透明度也可以改一下&#xff0c;就变成不同颜色 3 丸子 最外层的颜色最深&#xff0c;中间稍浅&#xff0c;加一些高光…

Jmeter分布式、测试报告、并发数计算、插件添加方式、常用图表

Jmeter分布式 应用场景 当单个测试机无法模拟用户要求的业务场景时&#xff0c;可以使用多台测试机进行模拟&#xff0c;就是Jmeter的分布 式测试。 Jmeter分布式执行原理 Jmeter分布测试时&#xff0c;选择其中一台作为控制机&#xff08;Controller&#xff09;&#xff0c…

第十二届蓝桥杯单片机国赛练习代码

文章目录 前言一、问题重述二、主函数总结 前言 第十五蓝桥杯国赛落幕已有十天&#xff0c;是时候总结一下&#xff0c;这个专栏也将结束。虽然并没有取得预期的结果&#xff0c;但故事结尾并不总是美满的。下面是赛前练习的第十二届国赛的代码。 一、问题重述 二、主函数 完整…

JavaScript前端技术入门教程

引言 在前端开发的广阔天地中&#xff0c;JavaScript无疑是最耀眼的一颗明星。它赋予了网页动态交互的能力&#xff0c;让网页从静态的文本和图片展示&#xff0c;进化为可以与用户进行实时交互的丰富应用。本文将带您走进JavaScript的世界&#xff0c;为您提供一个入门级的教…

文件的基础必备知识(初学者入门)

1. 为什么使用文件 2. 什么是文件 3. 二进制文件和文本文件 4. 文件的打开和关闭 1.为什么使用文件 我们写的程序数据是存储在电脑内存中&#xff0c;如果程序退出&#xff0c;内存回收&#xff0c;数据就丢失&#xff0c;等程序再次运行时&#xff0c;上次的数据已经消失。面…

C++~~期末复习题目讲解---lijiajia版本

目录 1.类和对象 &#xff08;3&#xff09;创建对象的个数 &#xff08;3&#xff09;全局变量&#xff0c;局部变量 &#xff08;4&#xff09;构造函数的执行次数 &#xff08;5&#xff09;静态动态析构和构造顺序 &#xff08;6&#xff09;初始化顺序和声明顺序 &a…

MySQL数据库的基础:逻辑集合数据库与表的基础操作

本篇会加入个人的所谓鱼式疯言 ❤️❤️❤️鱼式疯言:❤️❤️❤️此疯言非彼疯言 而是理解过并总结出来通俗易懂的大白话, 小编会尽可能的在每个概念后插入鱼式疯言,帮助大家理解的. &#x1f92d;&#x1f92d;&#x1f92d;可能说的不是那么严谨.但小编初心是能让更多人能接…

简单聊聊大数据解决方案

个人名片 &#x1f393;作者简介&#xff1a;java领域优质创作者 &#x1f310;个人主页&#xff1a;码农阿豪 &#x1f4de;工作室&#xff1a;新空间代码工作室&#xff08;提供各种软件服务&#xff09; &#x1f48c;个人邮箱&#xff1a;[2435024119qq.com] &#x1f4f1…

Spring5

文章目录 1. Spring 是什么&#xff1f;2. IoC3. Spring Demo4. IoC 创建对象的方式 / DI 方式注入的默认参数在哪里设定? 5. Spring 配置tx:annotation-driven 用于启用基于注解的事务管理 6. Bean的作用域7. 在Spring中有三种自动装配的方式1. 在xml中显式的配置2. 在java中…

node的安装

node是前端开发环境&#xff0c;所以运行前端程序需要安装和配置node 1. 下载安装node 去node官网选择你需要的版本进行下载 Node.js — Download Node.js (nodejs.org) ​ 下载到本地后一路点击next傻瓜式安装&#xff0c;安装成功后测试是否安装成功 node -v 显示node版…

电路防护-贴片陶瓷气体放电管

贴片陶瓷气体放电管 GDT工作原理GDT主要特性参数典型电路压敏电阻与 TVS 管的区别 GDT工作原理 陶瓷气体放电管是一种电子器件&#xff0c;其工作原理基于气体放电现象。这种管子的内部填充了一种特定的气体&#xff0c;通常是氖气或氩气。当管子两端施加足够的电压时&#xf…

刚刚❗️德勤2025校招暑期实习测评笔试SHL测评题库已发(答案)

&#x1f4e3;德勤 2024暑期实习测评已发&#xff0c;正在申请的小伙伴看过来哦&#x1f440; ㊙️本次暑期实习优先考虑2025年本科及以上学历的毕业生&#xff0c;此次只有“审计及鉴定”“税务与商务咨询”两个部门开放了岗位~ ⚠️测评注意事项&#xff1a; &#x1f44…

USB转I2C转SPI芯片CH341

CH340与CH341区别 CH340主要用于将USB转换为各种串口&#xff0c;CH340H和CH340S可以实现USB转并口。 CH341和340的不同之处在于CH341提供I2C和SPI接口&#xff0c;方便连接到I2C或SPI总线操作相关的器件。 CH341主要有6种封装。见表1. CH341T SSOP-20封装和丝印 USB 总线转接…

大模型基础——从零实现一个Transformer(2)

大模型基础——从零实现一个Transformer(1) 一、引言 上一章主要实现了一下Transformer里面的BPE算法和 Embedding模块定义 本章主要讲一下 Transformer里面的位置编码以及多头注意力 二、位置编码 2.1正弦位置编码(Sinusoidal Position Encoding) 其中&#xff1a; pos&…

【JVM】从编译后的指令集来再次理解++i和i++的执行顺序

JVM为什么要选用基于栈的指令集架构 与基于寄存器的指令集架构相比&#xff0c;基于栈的指令集架构不依赖于硬件&#xff0c;因此可移植性更好&#xff0c;跨平台性更好因为栈结构的特性&#xff0c;永远都是先处理栈顶的第一条指令&#xff0c;因此大部分指令都是零地址指令&…

SpringMVC[从零开始]

SpringMVC SpringMVC简介 1.1什么是MVC MVC是一种软件架构的思想&#xff0c;将软件按照模型、视图、控制器来划分 M:Model&#xff0c;模型层&#xff0c;指工程中的JavaBean&#xff0c;作用是处理数据 JavaBean分为两类&#xff1a; 一类称为实体类Bean&#xff1a;专…

对猫毛过敏?怎么有效的缓解过敏症状,宠物空气净化器有用吗?

猫过敏是一种常见的过敏反应&#xff0c;由猫的皮屑、唾液或尿液中的蛋白质引起。这些蛋白质被称为过敏原&#xff0c;它们可以通过空气传播&#xff0c;被人体吸入后&#xff0c;会触发免疫系统的过度反应。猫过敏是宠物过敏中最常见的类型之一&#xff0c;对许多人来说&#…

【Java】static 修饰变量

static 一种java内置关键字&#xff0c;静态关键字&#xff0c;可以修饰成员变量、成员方法。 static 成员变量 1.static 成员变量2.类变量图解3.类变量的访问4.类变量的内存原理5.类变量的应用 1.static 成员变量 成员变量按照有无static修饰&#xff0c;可以分为 类变量…

Python学习打卡:day02

day2 笔记来源于&#xff1a;黑马程序员python教程&#xff0c;8天python从入门到精通&#xff0c;学python看这套就够了 8、字符串的三种定义方式 字符串在Python中有多种定义形式 单引号定义法&#xff1a; name 黑马程序员双引号定义法&#xff1a; name "黑马程序…