消息队列-RabbitMQ

消息队列-RabbitMQ

中间件

中间件就是帮助连接多个系统,能让多个系统紧密协作的技术或者组件。比如:redis、消息队列。

比如在分布式系统中,将整个系统按业务进行拆分。分成不同的子系统,系统A负责往 redis 存数据,系统B从 redis 中取数据。两个系统借助 redis 进行协作。这时 redis 就充当了中间人的角色,连接起了两个系统,这就是中间件的概念。

消息队列

消息队列是一种很常见的中间件。字面意思来看就是存储消息的队列。有三个关键词:存储、消息、队列。就是能够存储任意数据结构的数据,例如字符串、二进制数据、JSON等,且有着队列先进先出的数据结构特点。

消息队列的应用场景(作用):在多个不同的系统、应用之间实现消息的传输,不用考虑应用的编程语言、系统、框架等。比如让 Java 开发的应用发消息,让 python 开发的应用收消息,这样不用把所有代码写到同一个项目里,还能充分发挥不同系统的优势,实现应用的解耦。

消息队列的模型

消息队列主要由五部分组成:消息生产者(Producer)、消息消费者(Consumer)、消息(Message)、消息队列(Queue)。

以收寄快递的场景为例:发件人小明(消息生产者)将一个包裹(消息)交给快递员,快递员将包裹直接送到快递柜(消息队列),等收件人小王(消息消费者)什么时候有时间就去快递柜取快递。

消息队列的优点

异步处理

同步和异步。同步就像是打电话,电话接通后就要停下手头的工作先处理电话回应。异步就像是发邮件,只要把邮件发出去就完事,接着干自己的活,不管对面什么时候才回应。

有了异步处理意味着生产者发送完消息后可以立即转而进行其它任务,无需等待消费者的处理响应,避免了阻塞。如下图所示。

如果是同步的情况下,发布支付成功事件,假设其它服务都在150ms完成返回,这样完成耗时就是50ms+10ms+150ms,足足要210ms。但如果是异步的话,发布支付成功事件就不管了,这样仅需50ms+10ms。

image-20240202162000320

流量削峰

流量削峰是指当前消费者的处理能力有限(例如,AI 应用一次回答时间要隔几秒才能返回响应),而用户的请求量又很大。我们可以将用户的请求存储到消息队列中,利用队列数据结构的特性,按照实际情况,处理完一个请求后再从队列中取出下一个请求。这样就很好的保护了系统,不管请求量多大都能将流量高峰像放到水管的水流一样,以恒定流速稳定的处理。

数据持久化

它能将消息集中存储到硬盘里实现消息持久化,即使服务器宕机重启后数据也不会丢失。

分布式消息队列的优势

消息队列运行在单个应用程序内部,用于实现应用程序内部的异步处理,对于提高应用程序性能非常有用,但它们不具备跨进程或跨网络的能力,此时它仅仅是一般的消息队列。

而分布式消息队列是一种跨网络、跨系统的,可以在多个应用程序之间进行消息的发送和接收。它作用于多个系统构成紧密协作,此时的消息队列就是分布式消息队列。

分布式消息队列的优势

分布式消息队列除了有异步处理、流量削峰、数据持久化的优势还有可扩展性、应用解耦

可扩展性,这是分布式与单机最大的区别。如果一个服务器只能处理1000个用户请求,超出这个数量服务器可能就无法承受。分布式消息队列的节点可以水平扩展,系统可以根据需要动态地添加或移除节点,以应对不断增长的消息流量和用户请求。

应用解耦,一个大系统拆分成不同的子系统,每个子系统负责一部分业务互不干扰,还可以连接不同语言、框架开发的系统,让这些系统能更灵活,这个就是分布式架构的系统。利用消息队列完成消息的传递,衔接起各个子系统构成一个大系统。

消息队列的应用场景

  1. 耗时的场景(异步)
  2. 高并发场景(异步、流量削峰)
  3. 分布式系统协作(应用解耦)
  4. 强稳定性的场景(比如金融业务,持久化、可靠性、流量削峰)

消息队列的缺点

消息队列并非适合所有情况,某些情况下我们应该避免使用消息队列,或者需要认清使用消息队列可能会遇到的问题。

投入成本

使用消息队列意味着你需要学习和掌握一个新的工具,并在你的系统中引入一个额外的中间件。这将会使你的系统变得更复杂,并需要更多的维护工作。若你在公司实施此类解决方案,我们通常会选择由第三方大公司提供的稳定中间件,而这会产生额外的成本。即使你自己部署和维护,也需要额外的资源投入。

消息的顺序性

因为队列数据结构的特性,消息需要按照特定的顺序被消费。

数据的一致性

这是任何分布式系统都要考虑的问题,要保证数据的一致性,防止消息的重复消费,避免同一个系统多次处理同一条消息。分布式锁就是为了解决分布式系统中多个服务器之间的一致性问题。

消息队列的可靠性

开始使用消息队列,就需要承担由此带来的各种可能问题,例如消息丢失。并非消息队列就可以保证消息不会丢失。比如在发送消息的过程中,可能就因为某些原因而失败;或者消息队列宕机情况的发送。

主流消息队列选型

比较常见的MQ实现:

  • ActiveMQ
  • RabbitMQ
  • RocketMQ
  • Kafka

image-20240203214744783

追求可用性:Kafka、 RocketMQ 、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延迟:RabbitMQ、Kafka

RabbitMQ 的使用

RabbitMQ 官网教程:https://www.rabbitmq.com/getstarted.html

在 RabbitMQ 的官网教程中提供了5个不同的Demo示例,每种都有不同语言实现的版本,这里我们选择使用Java教程。

image-20240202173024856

HelloWord

第一种 HelloWorld 快速入门,它是基于最基础的消息队列模型来实现的,是一对一的消息队列模型。包括三个角色:

  • producer:消息生产者,将消息发送到队列queue
  • queue:消息队列,负责接受并缓存消息
  • consumer:订阅队列,处理队列中的消息

image-20240202173314203

示例代码如下

生产者代码

package com.gdit.rabbitmq.mq_java.helloWorld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

// 定义一个名为 SingleProducer 的类,用于实现消息发送功能
public class SingleProducer {

    // 定义消息队列名称
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {

        // 创建连接工厂对象,用于创建到 RabbitMQ 服务器的连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("43.138.*.*"); // 设置 RabbitMQ 所在服务器主机名,本地就localhost
        factory.setUsername("***"); // 指定连接用户名
        factory.setPassword("***"); // 指定连接密码

        // 使用连接工厂创建一个新的连接,用于和 RabbitMQ 服务器进行交互
        try (Connection connection = factory.newConnection();
             // 通过已建立的连接创建一个的通道
             Channel channel = connection.createChannel()) {
            // 在通道上声明(创建)队列 参数一:队列名称 参数二:是否持久化 参数三:是否为独占队列 参数四:无人使用是否自动删除
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 要发送的消息内容
            String message = "Hello World!";
            // 使用 channel.basicPublish 方法将消息发送到指定队列 参数一:交换机名称 参数二:路由键(队列名称) 参数三:其它 参数四:消息内容
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

声明(创建)队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

参数一 queue:队列名称 (注意,同名称的消息队列,只能用同样的参数创建一次)

参数二 durable:消息队列重启后,消息是否丢失(消息持久化)

参数三 exclusive :是否只允许当前这个创建队列的连接操作队列(一般为false)

参数四 autoDelete:没人使用队列后是否自动删除该队列

运行生产者代码,查看 IDEA 和 RabbitMQ 控制台

image-20240202175631652

image-20240202175748783

可以看到 IDEA 控制台发送成功,RabbitMQ 控制台多了个 hello 队列,队列内消息总数 1 条,已准备 1 条

消息发送之后,我们根据官方文档编写消费者代码去消费消息

消费者代码

package com.gdit.rabbitmq.mq_java.helloWorld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

public class SingleConsumer {

    // 定义消息队列名称
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {

        // 创建连接工厂对象,用于创建到 RabbitMQ 服务器的连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("43.**"); // 设置连接属性
        factory.setUsername("***");
        factory.setPassword("***-1594");
        // 使用连接工厂创建一个新的连接,用于和 RabbitMQ 服务器进行交互
        Connection connection = factory.newConnection();
        // 通过已建立的连接创建一个的通道
        Channel channel = connection.createChannel();

        // 在通道上声明(创建)队列,在该通道上声明要监听的队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // 处理接收的消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // 将消息体转换为字符串
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'"); // 打印消息
        };
        // 在通道上开始消费队列中的消息,接收的消息会传递给 deliverCallback 来处理,会持续阻塞
        // 参数1:队列名称 参数2:是否自动确认 参数3:消息接收回调函数
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

    }

}

运行消费者代码,查看 IDEA 和 RabbitMQ 控制台

image-20240202181059940

image-20240202180939686

可以看到 IDEA 控制台接收消息成功,RabbitMQ 控制台hello队列内消息总数 0 条,已准备 0 条。说明消费成功了

Work Queues

第二种 Work Queues,它是一对多的消息队列模型,就是一个生产者给一个队列发送消息,可以有多个消费者从这个队列取消息。也是包括三个角色:

  • producer:消息生产者,将消息发送到队列queue
  • queue:消息队列,负责接受并缓存消息
  • consumer:订阅队列,处理队列中的消息

image-20240202195903748

适用场景:多个机器同时去队列接受并处理任务(尤其是每个机器的处理能力有限)。假设当前只有一个消费者,由于其性能有限,无法处理队列中的所有任务。此时我们通过增加机器的方式,让多台机器监听同一队列,处理队列内的消息。就是处理并发

公平分配

在这个模型中,官网有提到一个公平分配。意思是生产者发送多条消息后,这些消息会按照轮询方式将消息分配给消费者,就是一人一条轮着来。这样做的方式可以做到公平分配,但每台机器处理一条消息的时间不同。比如机器A处理一条消息需要10s,而机器B处理一条消息只需要5s。如果还按照公平分配那机器B会因为机器A被拖慢速度。这里我们可以通过控制单个消费者处理任务积压数channel.basicQos(prefetchCount);一次只处理一条消息,也就是说,在消费者未确认前,RabbitMQ 不会向该消费者发送更多的消息。而先完成消费确认可以从消息队列中取下一条消息。这样就不用等了。不过它只对手动确认消息生效(后面讲),如果自动确认消息那它收到消息就确认了还会造成公平分配的情况发生。

image-20240202205420928

示例代码如下:

消息生产者代码

package com.gdit.rabbitmq.mq_java.work_queues;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.util.Scanner;

public class MultiProducer {

    // 定义消息队列名称
    private static final String TASK_QUEUE_NAME = "multi_queue";

    public static void main(String[] argv) throws Exception {

        // 创建连接工厂对象,用于创建到 RabbitMQ 服务器的连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("43.**"); // 设置连接属性
        factory.setUsername("***");
        factory.setPassword("***-1594");

        // 使用连接工厂创建一个新的连接,用于和 RabbitMQ 服务器进行交互
        try (Connection connection = factory.newConnection();
             // 通过已建立的连接创建一个的通道
             Channel channel = connection.createChannel()) {
            // 声明(创建)队列
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

            // 使用循环,每当在控制台输入一行文本,就将其作为消息发送
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String message = scanner.nextLine();
                // 发布消息到队列,设置消息持久化 MessageProperties.PERSISTENT_TEXT_PLAIN
                channel.basicPublish("", TASK_QUEUE_NAME,
                        MessageProperties.PERSISTENT_TEXT_PLAIN,
                        message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }

}

消息消费者代码

消费者1

package com.gdit.rabbitmq.mq_java.work_queues;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

public class MultiConsumer1 {

    private static final String TASK_QUEUE_NAME = "multi_queue";

    public static void main(String[] argv) throws Exception {
        // 创建连接工厂对象,用于创建到 RabbitMQ 服务器的连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("43.**"); // 设置连接属性
        factory.setUsername("***");
        factory.setPassword("***-1594");
        // 使用连接工厂创建一个新的连接,用于和 RabbitMQ 服务器进行交互
        final Connection connection = factory.newConnection();

        // 通过已建立的连接创建一个的通道
        final Channel channel = connection.createChannel();

        // 在通道上声明(创建)队列,在该通道上声明要监听的队列
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // 控制单个消费者的处理任务挤压数,每次消费者最多同时处理 1 个任务
        channel.basicQos(1);

        // 处理接收的消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // 将消息体转换为字符串
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);

            try {
                // 打印接受的消息
                System.out.println(" [消费者 1] Received '" + message + "'");
                // 消息确认
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                Thread.sleep(10000); // 休眠10s,模拟业务处理
            } catch (InterruptedException e) {
                e.printStackTrace();
                // 发生异常后,拒绝确认消息,发送失败消息,并不重新投递该消息
                channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
            } finally {
                System.out.println(" [x] Done");
                // 消息确认
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        // 在通道上开始消费队列中的消息,接收的消息会传递给 deliverCallback 来处理,会持续阻塞
        // 参数1:队列名称 参数2:是否自动确认 参数3:消息接收回调函数
        channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> {
        });

    }
}

消费者2

// 消费者2代码与消费者1代码基本一致,只是消费者2休眠5s模拟业务处理
...
try {
    // 打印接受的消息
    System.out.println(" [消费者 1] Received '" + message + "'");
    // 消息确认
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    Thread.sleep(5000); // 休眠10s,模拟业务处理
} 
...

测试运行

测试未设置 channel.basicQos(1);

// 控制单个消费者的处理任务挤压数,每次消费者最多同时处理 1 个任务
// channel.basicQos(1);

image-20240202211839057

可以看到,未设置 channel.basicQos(1);消息公平分配给了两个消费者,没考虑不同消费者的处理能力

测试设置 channel.basicQos(1);

// 控制单个消费者的处理任务挤压数,每次消费者最多同时处理 1 个任务
channel.basicQos(1);

image-20240202212224307

可以看到,设置 channel.basicQos(1);后,消费者2处理完一条消息后就去拿下一条了,不用等消费者1处理完才能拿。

消息确认机制

为了保证消息成功被消费,rabbitmq 提供了消息确认机制,当消费者接收到消息后要给一个反馈:

  • ack:消费成功
  • nack:消费失败
  • reject:拒绝

消息确认机制可以设置为自动确认,不过不建议自动确认。设置为 true 后消息收到立马就确认了,此时工作可能还未完成,建议设置成 false,根据实际情况手动进行确认。

// 参数1:队列名称 参数2:是否自动确认 参数3:消息接收回调函数
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> {});

消息确认操作

// 指定确认某条消息 参数2:表示是否批量确认,即是否需要确认所有历史消息知道这条消息为止
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

// 指定某条消息消费失败 参数2:表示是否批量确认 参数3:是否重新入队,可用于重试
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);

// 指定拒绝某条消息 参数2:是否重新入队,可用于重试
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false); 

Publish/Subscribe

发布订阅模型

image-20240202221454684

可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:

  • Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给交换机

  • Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:

    • Fanout:扇出,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
  • Consumer:消费者,与以前一样,订阅队列,没有变化

  • Queue:消息队列也与以前一样,接收消息、缓存消息。

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

发布订阅模型根据交换机类型不同分为三种:Fanout Exchange、Direct Exchange、Topic Exchange。类似于广播、单播、组播

image-20240202221956540

Fanout

fanout模型会将消息交给所有绑定到交换机的队列,它与 work queue 不同的是,fanout会把同一条消息发给所有与之绑定的队列里。而 work queue 则是多个消费者从一个队列里取不同的消息。

image-20240203143658191

在广播模式下,消息发送流程是这样的:

  • 1) 可以有多个队列
  • 2) 每个队列都要绑定到Exchange(交换机)
  • 3) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
  • 4) 交换机把消息发送给绑定过的所有队列
  • 5) 订阅队列的消费者都能拿到消息

接下来 demo 的流程如下:

  1. 创建一个消息生产者类;
  2. 声明交换机名称 fanout_exchange,消息生产者向交换机发送消息;
  3. 创建一个消息消费者类;
  4. 并创建两个队列与交换机 fanout_exchange 绑定,同时接收消息

demo 示例代码如下:

消息生产者代码

package com.gdit.rabbitmq.mq_java.publish_subscribe.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.Scanner;

public class FanoutProducer {

    // 定义交换机名称
    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] argv) throws Exception {

        // 创建连接工厂对象,用于创建到 RabbitMQ 服务器的连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("43.**"); // 设置连接属性
        factory.setUsername("***");
        factory.setPassword("***-1594");

        // 使用连接工厂创建一个新的连接,用于和 RabbitMQ 服务器进行交互
        try (Connection connection = factory.newConnection();
             // 通过已建立的连接创建一个的通道
             Channel channel = connection.createChannel()) {
            // 声明(创建)交换机 参数1:交换机名称 参数2:交换机类型
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            // 使用循环,每当在控制台输入一行文本,就将其作为消息发送
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String message = scanner.nextLine();
                // 发布消息到指定交换机,不指定路由键
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }

}

与之前不同的是消息生产者没有队列的出现了,而是出现了交换机。声明交换机并将消息发送至交换机。

消息消费者代码:

package com.gdit.rabbitmq.mq_java.publish_subscribe.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class FanoutConsumer {

    // 定义交换机名称
    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws Exception {
        // 创建连接工厂对象,用于创建到 RabbitMQ 服务器的连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("43.**"); // 设置连接属性
        factory.setUsername("***");
        factory.setPassword("***-1594");
        // 使用连接工厂创建一个新的连接,用于和 RabbitMQ 服务器进行交互
        final Connection connection = factory.newConnection();

        // 创建两个通道
        final Channel channel1 = connection.createChannel();
        final Channel channel2 = connection.createChannel();
        // 声明(创建)交换机
        channel1.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 创建队列1
        String queueName1 = "xiaowang_queue";
        channel1.queueDeclare(queueName1, true, false, false, null); // 声明(创建队列)
        channel1.queueBind(queueName1, EXCHANGE_NAME, ""); // 绑定队列与交换机
        // 创建队列2
        String queueName2 = "xiaoli_queue";
        channel2.queueDeclare(queueName2, true, false, false, null); // 声明(创建队列)
        channel2.queueBind(queueName2, EXCHANGE_NAME, ""); // 绑定队列与交换机

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // 处理队列1接收的消息
        DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
            // 将消息体转换为字符串
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            // 打印接收的消息
            System.out.println(" [小王] Received '" + message + "'");
        };
        // 开始消费队列1中的消息,
        channel1.basicConsume(queueName1, true, deliverCallback1, consumerTag -> {});

        // 处理队列2接收的消息
        DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
            // 将消息体转换为字符串
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            // 打印接收的消息
            System.out.println(" [小李] Received '" + message + "'");
        };
        // 开始消费队列1中的消息,
        channel2.basicConsume(queueName2, true, deliverCallback2, consumerTag -> {});

    }

}

在消息消费者的代码中通过创建两个通道来处理两个队列,并新增了交换机,声明交换机后将两个队列都绑定到一个交换机上。

测试运行

image-20240202225842978

可以看到,fanout类型的交换机,消息生产者将消息发给 fanout_exchange 交换机后,与之绑定的两个队列都收到了消息。

Direct

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方再向 Exchange 发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

接下来 demo 的流程如下:

  1. 创建一个消息生产者类;
  2. 声明交换机名称 direct_exchange,消息生产者向交换机发送消息并指定路由键;
  3. 创建一个消息消费者类;
  4. 并创建两个队列与交换机 direct_exchange 绑定,并绑定路由键,接收消息

demo 示例代码如下:

消息生产者代码:

package com.gdit.rabbitmq.mq_java.publish_subscribe.direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.Scanner;

public class DirectProducer {

    // 定义交换机名称
    private static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] argv) throws Exception {

        // 创建连接工厂对象,用于创建到 RabbitMQ 服务器的连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("43.**"); // 设置连接属性
        factory.setUsername("***");
        factory.setPassword("***-1594");

        // 使用连接工厂创建一个新的连接,用于和 RabbitMQ 服务器进行交互
        try (Connection connection = factory.newConnection();
             // 通过已建立的连接创建一个的通道
             Channel channel = connection.createChannel()) {
            // 声明(创建)交换机 参数1:交换机名称 参数2:交换机类型
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            // 使用循环,每当在控制台输入一行文本,就将其作为消息发送
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String userInput = scanner.nextLine(); // 获取控制台输入
                String[] strings = userInput.split(" "); // 以空格划分为数组
                String message = strings[0]; // 第一个是消息
                String routingKey = strings[1]; // 第二个是路由键
                // 发布消息到指定交换机,根据输入指定路由键
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }

}

为方便测试,此处获取控制台输入以空格区分,前半部分为消息,后半部分为路由键

消息消费者代码

package com.gdit.rabbitmq.mq_java.publish_subscribe.direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

public class DirectConsumer {

    // 定义交换机名称
    private static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) throws Exception {
        // 创建连接工厂对象,用于创建到 RabbitMQ 服务器的连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("43.**"); // 设置连接属性
        factory.setUsername("***");
        factory.setPassword("***-1594");
        // 使用连接工厂创建一个新的连接,用于和 RabbitMQ 服务器进行交互
        final Connection connection = factory.newConnection();

        // 创建两个通道
        final Channel channel1 = connection.createChannel();
        final Channel channel2 = connection.createChannel();
        // 声明(创建)交换机
        channel1.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 创建队列1
        String queueName1 = "tom_queue";
        channel1.queueDeclare(queueName1, true, false, false, null); // 声明(创建队列)
        channel1.queueBind(queueName1, EXCHANGE_NAME, "blue"); // 绑定队列与交换机
        channel1.queueBind(queueName1, EXCHANGE_NAME, "yellow"); // 绑定队列与交换机
        // 创建队列2
        String queueName2 = "jerry_queue";
        channel2.queueDeclare(queueName2, true, false, false, null); // 声明(创建队列)
        channel2.queueBind(queueName2, EXCHANGE_NAME, "red"); // 绑定队列与交换机
        channel2.queueBind(queueName2, EXCHANGE_NAME, "yellow"); // 绑定队列与交换机

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // 处理队列1接收的消息
        DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
            // 将消息体转换为字符串
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            // 打印接收的消息
            System.out.println(" [汤姆] Received '" + message + "'");
        };
        // 开始消费队列1中的消息,
        channel1.basicConsume(queueName1, true, deliverCallback1, consumerTag -> {});

        // 处理队列2接收的消息
        DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
            // 将消息体转换为字符串
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            // 打印接收的消息
            System.out.println(" [杰瑞] Received '" + message + "'");
        };
        // 开始消费队列1中的消息,
        channel2.basicConsume(queueName2, true, deliverCallback2, consumerTag -> {});

    }

}

在消息消费者代码中,我们在绑定队列交换机时指定了路由键。注意 路由键可以指定多个

测试运行

image-20240203150802471

可以看到,消息生产者第一条消息指定了路由键 yellow,在消息消费者的两个队列都绑定了 yellow 路由键,两条队列都收到了。第二条消息指定了路由键 red,只有 jerry_queue 队列收到了消息。第三条消息指定了路由键 blue,只有 tom_queue 队列收到了消息。所有 Direct 交换机需要指定路由键完成消息的发送与接收。

Topic

image-20240203153302186

Topic类型的交换机与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

Routingkey一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如:item.insert

通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词

举例:

item.#`:能够匹配`item.spu.insert` 或者 `item.spu
item.*`:能匹配`item.spu` `item.abc`

示例代码:

消息生产者代码

package com.gdit.rabbitmq.mq_java.publish_subscribe.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.Scanner;

public class TopicProducer {

    // 定义交换机名称
    private static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] argv) throws Exception {

        // 创建连接工厂对象,用于创建到 RabbitMQ 服务器的连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("43.**"); // 设置连接属性
        factory.setUsername("***");
        factory.setPassword("***-1594");

        // 使用连接工厂创建一个新的连接,用于和 RabbitMQ 服务器进行交互
        try (Connection connection = factory.newConnection();
             // 通过已建立的连接创建一个的通道
             Channel channel = connection.createChannel()) {
            // 声明(创建)交换机 参数1:交换机名称 参数2:交换机类型
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            // 使用循环,每当在控制台输入一行文本,就将其作为消息发送
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String userInput = scanner.nextLine(); // 获取控制台输入
                String[] strings = userInput.split(" "); // 以空格划分为数组
                String message = strings[0]; // 第一个是消息
                String routingKey = strings[1]; // 第二个是路由键
                // 发布消息到指定交换机,根据输入指定路由键
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }

}

声明了交换机类型为 topic

消息消费者代码

package com.gdit.rabbitmq.mq_java.publish_subscribe.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

public class TopicConsumer {

    // 定义交换机名称
    private static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws Exception {
        // 创建连接工厂对象,用于创建到 RabbitMQ 服务器的连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("43.**"); // 设置连接属性
        factory.setUsername("***");
        factory.setPassword("***-1594");
        // 使用连接工厂创建一个新的连接,用于和 RabbitMQ 服务器进行交互
        final Connection connection = factory.newConnection();

        // 创建两个通道
        final Channel channel1 = connection.createChannel();
        final Channel channel2 = connection.createChannel();
        // 声明(创建)交换机
        channel1.exchangeDeclare(EXCHANGE_NAME, "topic");
        // 创建队列1
        String queueName1 = "ironMan_queue";
        channel1.queueDeclare(queueName1, true, false, false, null); // 声明(创建队列)
        channel1.queueBind(queueName1, EXCHANGE_NAME, "*.复联.#"); // 绑定队列与交换机
        // 创建队列2
        String queueName2 = "batMan_queue";
        channel2.queueDeclare(queueName2, true, false, false, null); // 声明(创建队列)
        channel2.queueBind(queueName2, EXCHANGE_NAME, "#.正义.#"); // 绑定队列与交换机

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // 处理队列1接收的消息
        DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
            // 将消息体转换为字符串
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            // 打印接收的消息
            System.out.println(" [钢铁侠] Received '" + message + "'");
        };
        // 开始消费队列1中的消息,
        channel1.basicConsume(queueName1, true, deliverCallback1, consumerTag -> {});

        // 处理队列2接收的消息
        DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
            // 将消息体转换为字符串
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            // 打印接收的消息
            System.out.println(" [蝙蝠侠] Received '" + message + "'");
        };
        // 开始消费队列1中的消息,
        channel2.basicConsume(queueName2, true, deliverCallback2, consumerTag -> {});

    }

}

在消息消费者代码中,创建了两个队列,钢铁侠队列的路由键为:*.复联.#,蝙蝠侠的路由键为:#.正义.#。意味着钢铁侠队列需要第一个词+复联+任意个词进行匹配,蝙蝠侠队列需要任意词+正义+任意进行匹配

测试运行

image-20240203154325369

消息过期机制

消息过期机制(Time-to-Live)就是可以给每条消息指定一个有效期,一段时间内未被消费者处理就过期了。这种机制允许系统白动清理和丢弃那些长时间未被消费的消息,以避免消息队列中积累过多的过期消息,从而保持系统的效率和可靠性。

消息过期机制的应用场景:

订单超时取消:用户提交订单后 5 分钟或者 15 分钟还没进行支付操作,那么该订单就以及失效了。因此消息过期机制非常适合这种过期场景的处理。通过设置合适的过期时间,可以确保即使清理无效的消息,提高系统的效率和准确性。

官方文档中提供了消息过期机制的两种方式,官方文档:https://www.rabbitmq.com/ttl.html。一种是给队列所有消息设置过期时间,另一种是给某条具体消息设置过期时间。

给队列所有消息设置过期时间

也就是说,无论什么时候进入这个队列的消息,在设定过期时间后特定的时间点都会过期失效。是针对整个队列而言的。

image-20240203162906613

给某条具体消息设置过期时间

针对特定消息设置一个独立的过期时间。这样,在到达指定时间后,这条消息就会过期失效。

image-20240203162923001

示例代码1:给队列所有消息设置过期时间,

以 helloword 的代码为例设置过期时间。

消息生产者代码

package com.gdit.rabbitmq.mq_java.ttl;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;

// 定义一个名为 SingleProducer 的类,用于实现消息发送功能
public class TtlProducer {

    // 定义消息队列名称
    private final static String QUEUE_NAME = "ttl_queue";

    public static void main(String[] argv) throws Exception {

        // 创建连接工厂对象,用于创建到 RabbitMQ 服务器的连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("43.**"); // 设置 RabbitMQ 所在服务器主机名,本地就localhost
        factory.setUsername("***"); // 指定连接用户名
        factory.setPassword("***"); // 指定连接密码

        // 使用连接工厂创建一个新的连接,用于和 RabbitMQ 服务器进行交互
        try (Connection connection = factory.newConnection();
             // 通过已建立的连接创建一个的通道
             Channel channel = connection.createChannel()) {
            /*Map<String, Object> args = new HashMap<>();
            args.put("x-message-ttl", 60000); // 设定队列内消息过期时间 60s
            // 在通道上声明(创建)队列 参数一:队列名称 参数二:是否持久化 参数三:是否为独占队列 参数四:无人使用是否自动删除 参数五:配置参数
            channel.queueDeclare(QUEUE_NAME, false, false, false, args);*/
            // 要发送的消息内容
            String message = "Hello World!";
            // 使用 channel.basicPublish 方法将消息发送到指定队列 参数一:交换机名称 参数二:路由键(队列名称) 参数三:其它 参数四:消息内容
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

这次我们在消息消费者一处声明队列就行了,不用生产者和消费者都声明

消息消费者代码

package com.gdit.rabbitmq.mq_java.ttl;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class TtlConsumer {

    // 定义消息队列名称
    private final static String QUEUE_NAME = "ttl_queue";

    public static void main(String[] argv) throws Exception {

        // 创建连接工厂对象,用于创建到 RabbitMQ 服务器的连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("43.**"); // 设置连接属性
        factory.setUsername("***");
        factory.setPassword("***-1594");
        // 使用连接工厂创建一个新的连接,用于和 RabbitMQ 服务器进行交互
        Connection connection = factory.newConnection();
        // 通过已建立的连接创建一个的通道
        Channel channel = connection.createChannel();
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 60000); // 设定队列内消息过期时间 60s
        // 在通道上声明(创建)队列 参数一:队列名称 参数二:是否持久化 参数三:是否为独占队列 参数四:无人使用是否自动删除 参数五:配置参数
        channel.queueDeclare(QUEUE_NAME, false, false, false, args);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // 处理接收的消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // 将消息体转换为字符串
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'"); // 打印消息
        };
        // 在通道上开始消费队列中的消息,接收的消息会传递给 deliverCallback 来处理,会持续阻塞
        // 参数1:队列名称 参数2:是否自动确认 参数3:消息接收回调函数
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });

    }

}

可以看到,我们在声明队列时设定了过期时间。注意:要将自动确认设置成 false,否则消费者收到消息就立马确认了。

测试运行

image-20240203164340661

可以看到,消息生产者发送了一条消息,消息消费者也收到了,我们再来看 rabbitmq 控制台。

image-20240203164435404

控制台显示 ttl_queue 队列当前消息总数1,未确认1。因为我们把自动确认关闭了,也没在代码里写确认。但会发现过了 60s 这条消息还在,并没有过期。

为什么?因为消费者确实收到了消息,只是还没确认。就像送快递时快递员打电话给你,你接到了电话只是还没签收,这个快递肯定不能丢。要是快递员打电话联系不上,就会认为这个快递没人处理取消了。我们把消息消费者代码关掉再看控制台。

image-20240203164840286

此时消息就被取消了。

示例代码2:给指定消息设置过期时间

// 要发送的消息内容
String message = "Hello World!";
// 给指定消息设置过期时间
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
    .expiration("10000") // 10s
    .build();
// 使用 channel.basicPublish 方法将消息发送到指定队列 参数一:交换机名称 参数二:路由键(队列名称) 参数三:其它 参数四:消息内容
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

死信队列

死信(Dead Letter):指过期的消息、被拒收的消息、处理失败的消息还有消息队列已满后加入队列的消息的统称。

死信队列:专门处理死信的队列,实际就是一个普通的队列,但被专门用来接收并处理死信消息。

死信交换机:用于将死信消息转发到死信队列的交换机,也可以设置路由绑定来缺点消息的路由规则。其实也是一个普通的交换机,只是专门用来转发死信消息的的。

死信队列主要用于处理以下情况下的死信消息。根据官方文档https://www.rabbitmq.com/dlx.html的说明,有以下三种情况:

  • 消息被拒绝:当消费者使用basic.rejectbasic.nack拒绝消息,并将requeue参数设置为 false,意味着不将消息重新放回队列,这时消息就成为了死信
  • 消息过期:当消息的过期时间设置,并且消息在队列中等待时间超过了设定的过期时间
    该消息就会变成死信。
  • 队列长度限制:当队列达到了设置的最大长度限制,新进入的消息将无法被存储,而被直接丢弃。这些无法进入队列的消息被视为死信。

**使用:**根据官网的提示,死信交换机跟死信队列跟普通的一样,正常创建就行。在声明普通队列时将死信交换机作为参数填入,还可以指定死信交换机转发的路由键,意思是死信转发到指定的死信交换机。

image-20240203174619142

示例代码:

示例代码的思路:创建一个死信消息消费者代码,像普通消息消费者一样正常绑定交换机和队列,并处理消息,只是名称换了。在消息生产者代码处正常发送消息,在消息消费者处声明队列时把指定死信队列的参数的Map对象,然后正常接收消息,消息消费者收到消息后把这条消息拒绝。拒接后就会把消息转发到死信消息消费者那里。

死信消息消费者代码

package com.gdit.rabbitmq.mq_java.dead_letter;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

public class DlxConsumer {

    // 定义死信交换机名称
    private static final String DLX_EXCHANGE_NAME = "dlx_direct_exchange";

    public static void main(String[] args) throws Exception {
        // 创建连接工厂对象,用于创建到 RabbitMQ 服务器的连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("43.**"); // 设置连接属性
        factory.setUsername("***");
        factory.setPassword("***-1594");
        // 使用连接工厂创建一个新的连接,用于和 RabbitMQ 服务器进行交互
        final Connection connection = factory.newConnection();

        // 创建两个通道
        final Channel channe = connection.createChannel();

        // 声明(创建)死信交换机 参数1:交换机名称 参数2:交换机类型
        channe.exchangeDeclare(DLX_EXCHANGE_NAME, "direct");

        // 创建死信队列
        String queueName = "dlx_queue";
        channe.queueDeclare(queueName, true, false, false, null); // 声明(创建队列)
        channe.queueBind(queueName, DLX_EXCHANGE_NAME, "dead"); // 绑定队列与交换机

        // 处理死信队列接收的消息
        DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
            // 将消息体转换为字符串
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            // 打印接收的消息
            System.out.println(" [死信队列] Received '" + message + "'");
        };
        // 开始消费队列中的消息,
        channe.basicConsume(queueName, true, deliverCallback1, consumerTag -> {});

    }
}

就跟创建普通消息消费者代码一样,只是名称换了

消息生产者代码

package com.gdit.rabbitmq.mq_java.dead_letter;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class WorkProducer {
    // 定义工作交换机名称
    private static final String WORK_EXCHANGE_NAME = "work_direct_exchange";

    public static void main(String[] argv) throws Exception {

        // 创建连接工厂对象,用于创建到 RabbitMQ 服务器的连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("43.**"); // 设置连接属性
        factory.setUsername("***");
        factory.setPassword("***-1594");

        // 使用连接工厂创建一个新的连接,用于和 RabbitMQ 服务器进行交互
        try (Connection connection = factory.newConnection();
             // 通过已建立的连接创建一个的通道
             Channel channel = connection.createChannel()) {
            // 声明(创建)交换机 参数1:交换机名称 参数2:交换机类型
            channel.exchangeDeclare(WORK_EXCHANGE_NAME, "direct");

            // 使用循环,每当在控制台输入一行文本,就将其作为消息发送
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String message = scanner.nextLine(); // 获取控制台输入
                // 发布消息到指定交换机,并指定路由键
                channel.basicPublish(WORK_EXCHANGE_NAME, "work", null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }

}

消息消费者代码

package com.gdit.rabbitmq.mq_java.dead_letter;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class WorkConsumer {
    // 定义死信交换机名称
    private static final String DLX_EXCHANGE_NAME = "dlx_direct_exchange";
    // 定义工作交换机名称
    private static final String WORK_EXCHANGE_NAME = "work_direct_exchange";

    public static void main(String[] args) throws Exception {
        // 创建连接工厂对象,用于创建到 RabbitMQ 服务器的连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("43.**"); // 设置连接属性
        factory.setUsername("***");
        factory.setPassword("***-1594");
        // 使用连接工厂创建一个新的连接,用于和 RabbitMQ 服务器进行交互
        final Connection connection = factory.newConnection();

        // 创建两个通道
        final Channel channel1 = connection.createChannel();

        // 声明(创建)交换机
        channel1.exchangeDeclare(WORK_EXCHANGE_NAME, "direct");

        // 创建用于指定死信队列的参数的Map对象
        Map<String, Object> args1 = new HashMap<>();
        args1.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
        args1.put("x-dead-letter-routing-key", "dead");

        // 创建队列,并将死信队列的参数map对象传入
        String queueName = "cat_queue";
        channel1.queueDeclare(queueName, true, false, false, args1); // 声明(创建队列)
        channel1.queueBind(queueName, WORK_EXCHANGE_NAME, "work"); // 绑定队列与交换机

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // 处理队列接收的消息
        DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
            // 将消息体转换为字符串
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            // 打印接收的消息
            System.out.println(" [猫] Received '" + message + "'");
            // 拒绝消息
            channel1.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
        };
        // 开始消费队列中的消息,
        channel1.basicConsume(queueName, false, deliverCallback1, consumerTag -> {});

    }

}

测试运行

image-20240203182022301

可以看到,消息生产者发送消息后,消息消费者收到了,只是在代码处我们拒绝了。然后消息就被死信交换机送到了死信队列,被死信消息消费者收到了就行处理。

Spring Boot整合RabbitMQ的使用

Spring Boot 天然支持集成RabbitMQ,并提供了封装好的框架。类似于JDBC和 MyBatis 的关系。使用这种方式的优点是简单易用,只需要进行相应的配置即可直接使用。缺点是封装得非常好,如果你没有学习过相关文档,可能不知道该如何使用。刚刚我们讲的官方客户端,大家很快就能创建出生产者和消费者。但是,如果你使用 Spring Boot,如果没有看过官方文档,你知道该如何使用吗?你知道如何配置,如何使用特定的语法来发送消息吗?另外一个点是,封装的框架可能不够灵活。这里的不够灵活并不是指它不能实现某些功能,而是指只有 Spring Boot 官方给你封装了的功能,你才能使用。使用别人框架的一个缺点就是,如果框架没有封装某个功能,你可能无法使用,受到了框架的限制。这是一个双刃剑。所以,选择哪种方式取决于具体场景。

这里提供几个博客供大家参考:

【有道云笔记】03异步通信 https://note.youdao.com/s/PHp3ltNC

【RabbitMQ】SpringBoot整合RabbitMQ实现延迟队列、TTL、DLX死信队列

【SpringBoot】 整合RabbitMQ 消息单独以及批量的TTL

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/369362.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ReactNative实现一个圆环进度条

我们直接看效果,如下图 我们在直接上代码 /*** 圆形进度条*/ import React, {useState, useEffect} from react; import Svg, {Circle,G,LinearGradient,Stop,Defs,Text, } from react-native-svg; import {View, StyleSheet} from react-native;// 渐变色 const CircleProgr…

Android学习之路(29) Gradle初探

前言: 大家回想一下自己第一次接触Gradle是什么时候&#xff1f; 相信大家也都是和我一样&#xff0c;在我们打开第一个AS项目的时候&#xff0c; 发现有很多带gradle字样的文件&#xff1a;setting.gradle, build.gradle,gradle.warpper,以及在gradle文件中各种配置&#xff…

基于LLM的文档搜索引擎开发【Ray+LangChain】

Ray 是一个非常强大的 ML 编排框架&#xff0c;但强大的功能伴随着大量的文档。 事实上120兆字节。 我们如何才能使该文档更易于访问&#xff1f; 答案&#xff1a;使其可搜索&#xff01; 过去&#xff0c;创建自己的高质量搜索结果很困难。 但通过使用 LangChain&#xff0c…

Open CASCADE学习|拓扑变换

目录 平移变换 旋转变换 组合变换 通用变换 平移变换 TopoDS_Shape out;gp_Trsf theTransformation;gp_Vec theVectorOfTranslation(0., 0.125 / 2, 0.);theTransformation.SetTranslation(theVectorOfTranslation);BRepBuilderAPI_Transform myBRepTransformation(out, th…

EAK厚膜功率电阻成功在eVTOL大量使用

eVTOL操作的特点是更高的放电曲线&#xff0c;特别是在起飞和着陆期间。 “传统上&#xff0c;电池要么被设计成提供大量能量&#xff0c;要么被设计成高功率&#xff0c;”Cuberg创始人兼首席执行官Richard Wang说。“对于eVTOL电池来说&#xff0c;在能量和功率之间保持良好…

Acwing---826.单链表

单链表 1.题目2.基本思想3.代码实现 1.题目 实现一个单链表&#xff0c;链表初始为空&#xff0c;支持三种操作&#xff1a; 向链表头插入一个数&#xff1b;删除第 k k k 个插入的数后面的数&#xff1b;在第 k k k 个插入的数后插入一个数。现在要对该链表进行 M M M 次…

中科大计网学习记录笔记(五):协议层次和服务模型

前言&#xff1a; 学习视频&#xff1a;中科大郑烇、杨坚全套《计算机网络&#xff08;自顶向下方法 第7版&#xff0c;James F.Kurose&#xff0c;Keith W.Ross&#xff09;》课程 该视频是B站非常著名的计网学习视频&#xff0c;但相信很多朋友和我一样在听完前面的部分发现信…

如何计算两个指定日期相差几年几月几日

一、题目要求 假定给出两个日期&#xff0c;让你计算两个日期之间相差多少年&#xff0c;多少月&#xff0c;多少天&#xff0c;应该如何操作呢&#xff1f; 本文提供网页、ChatGPT法、VBA法和Python法等四种不同的解法。 二、解决办法 1. 网页计算法 这种方法是利用网站给…

69.请描述Spring MVC的工作流程?描述一下 DispatcherServlet 的工作流程?

69.请描述Spring MVC的工作流程&#xff1f;描述一下 DispatcherServlet 的工作流程&#xff1f; 核心架构的具体流程步骤如下&#xff1a; 首先用户发送请求——>DispatcherServlet&#xff0c;前端控制器收到请求后自己不进行处理&#xff0c;而是委托给其他的解析器进行…

day30 window对象——BOM、定时器setTimeout

目录 JavaScript的组成BOM定时器——延时函数两种定时器对比&#xff1a;执行的次数 JavaScript的组成 ECMAScript: 规定了js基础语法核心知识。比如&#xff1a;变量、分支语句、循环语句、对象等等 Web APIs : DOM 文档对象模型&#xff0c; 定义了一套操作HTML文档的APIBOM…

【Iot】什么是串口?什么是串口通信?串口通信(串口通讯)原理,常见的串口通信方式有哪些?

串口通信原理 1. 串口2. 串口通信4. 波特率与比特率5. 帧格式3. 串口通讯的通讯协议3.1. RS2323.2. RS485 总结 1. 串口 串行接口简称串口&#xff0c;也称串行通信接口或串行通讯接口&#xff08;通常指COM接口&#xff09;&#xff0c;是采用串行通信方式的扩展接口。 串口可…

CICD注册和使用gitlab-runner常见问题

1、现象 fatal: unable to access https://github.com/homebrew/brew/: 2、解决 git config --global --unset http.proxy git config --global --unset https.proxy 查看gitlab-runner是否成功&#xff1a; userusers-MacBook-Pro ~ % gitlab-runner -h 查看gitlab-run…

Vue.js设计与实现(霍春阳)

Vue.js设计与实现 (霍春阳) 电子版获取链接&#xff1a;Vue.js设计与实现(霍春阳) 编辑推荐 适读人群 &#xff1a;1.对Vue.js 2/3具有上手经验&#xff0c;且希望进一步理解Vue.js框架设计原理的开发人员&#xff1b; 2.没有使用过Vue.js&#xff0c;但对Vue.js框架设计感兴趣…

Loki使用指南

转载至我的博客 https://www.infrastack.cn &#xff0c;公众号&#xff1a;架构成长指南 与其他日志系统相比&#xff0c; Loki 的使用方式是有一定差异性的&#xff0c;需要用不同的思维方式。本文分享一下这些差异以及我们应该如何使用 作为 Loki 用户或操作人员&#xff0…

Leetcode—37. 解数独【困难】

2024每日刷题&#xff08;111&#xff09; Leetcode—37. 解数独 实现代码 class Solution { public:bool isValid(vector<vector<char>>& board, int row, int col, char c) {for(int i 0; i < 9; i) {if(board[row][i] c || board[i][col] c || boar…

最新GPT4.0使用教程,AI绘画,GPT语音对话使用,DALL-E3文生图

一、前言 ChatGPT3.5、GPT4.0、GPT语音对话、Midjourney绘画&#xff0c;文档对话总结DALL-E3文生图&#xff0c;相信对大家应该不感到陌生吧&#xff1f;简单来说&#xff0c;GPT-4技术比之前的GPT-3.5相对来说更加智能&#xff0c;会根据用户的要求生成多种内容甚至也可以和…

界面控件DevExpress ASP.NET Spreadsheet组件 - 轻松集成电子表格功能!(一)

DevExpress ASP. NET Spreadsheet组件允许您轻松地将电子表格功能合并到任意ASP. NET应用程序&#xff0c;它可以加载、转换和保存工作簿到XLS-XLSx二进制文件格式&#xff0c;还可以导出和导入XLSX、CSV和TXT文件。 P.S&#xff1a;DevExpress ASP.NET Web Forms Controls拥有…

STM32--SPI通信协议(1)SPI基础知识总结

前言 I2C (Inter-Integrated Circuit)和SPI (Serial Peripheral Interface)是两种常见的串行通信协议&#xff0c;用于连接集成电路芯片之间的通信&#xff0c;选择I2C或SPI取决于具体的应用需求。如果需要较高的传输速度和简单的接口&#xff0c;可以选择SPI。如果需要连接多…

开关电源学习之Buck电路

一、引言 观察上方的电路&#xff0c;当开关闭合到A点时&#xff0c;电流流过电感线圈&#xff0c;形成阻碍电流流过的磁场&#xff0c;即产生相反的电动势&#xff1b;电感L被充磁&#xff0c;流经电感的电流线性增加&#xff0c;在电感未饱和前&#xff0c;电流线性增加&…

零基础Vue框架上手;git,node,yarn安装

项目搭建环境&#xff1a; git安装&#xff1a;Git - 安装 Git (git-scm.com)&#xff08;官网&#xff09; 下载路径&#xff1a;Git - Downloading Package (git-scm.com)&#xff1b;根据自己电脑下载相对应的安装包 ​ 点next ​ 点next&#xff0c;点到最后安装就行。…