深入RabbitMQ世界:探索3种队列、4种交换机、7大工作模式及常见概念

文章目录

  • 文章导图
  • RabbitMQ架构及相关概念
    • 四大核心概念
    • 名词解读
  • 七大工作模式及四大交换机类型
    • 0、前置了解-默认交换机DirectExchange
    • 1、简单模式(Simple Queue)-默认DirectExchange
    • 2、 工作队列模式(Work Queues)-默认DirectExchange
    • 3、发布/订阅模式(Publish/Subscribe)-FanoutExchange
    • 4、路由模式(Routing)-自定义DirectExchange
    • 5、主题模式(Topics)-TopicExchange
    • 总结
  • 三种队列类型
    • 普通队列
    • 死信队列(Dead Letter Queue, DLQ)
      • 定义
      • 触发条件
      • 应用场景
      • 配置
    • 延迟队列(Delayed Queue)
      • 定义
      • 实现方式
      • 应用场景
    • 两者区别
    • 代码实战
      • 1. 延迟队列:TTL+DLX死信队列
        • 配置步骤
      • 2. 延迟队列:RabbitMQ延迟消息插件
        • 配置步骤
      • 3、死信队列: basic.reject或basic.nack
        • 1. 引入依赖
        • 2. 配置交换机、队列和死信队列
        • 3. 生产者发送消息
        • 4. 消费者监听并拒绝消息
        • 5. 消费者监听死信队列
        • 总结

RabbitMQ系列文章
深入RabbitMQ世界:探索3种队列、4种交换机、7大工作模式及常见概念
TODO:RabbitMQ可靠性
TODO:RabbtiMQ顺序性
TODO:RabbitMQ常见问题整理

文章导图

在这里插入图片描述

RabbitMQ架构及相关概念

在这里插入图片描述

四大核心概念

生产者

产生数据发送消息的程序是生产者。

交换机

交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息 推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推 送到多个队列,亦或者是把消息丢弃,这个得由交换机类型决定 。

队列

队列是RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存 储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可 以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式 。

消费者

消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费 者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者

名词解读

  • Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
  • Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类 似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务 时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
  • Connection:publisher/consumer 和 broker 之间的 TCP 连接 Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接, 如果应用程序支持多线程,通常每个thread创建单独的
  • channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。 Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
  • Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发 消息到queue中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  • Queue:消息最终被送到这里等待 consumer 取走 Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。
  • Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

七大工作模式及四大交换机类型

网上查了很多资料有的说是五种,有的说是四种,可以看到在RabbitMQ在官网提到的共有7种工作模式:https://www.rabbitmq.com/tutorials

第6种是RPC调用,这个我们正常肯定不用这个实现RPC,而第7种是confirm 确认模式,可以用于保证生产者消息发送的可靠性,这个我后面会再专门介绍。
现在我们主要讲前5种工作模式,实际上总结来说5种又可以总结为是3种,其实第1、2、4根据他们都是Direct交换机可以归结为一种,下文我会详细讲解一下。

在这里插入图片描述

0、前置了解-默认交换机DirectExchange

RabbitMQ有一个自带的交换机,也被称为AMQP default exchange。当消息发送到RabbitMQ时,如果没有指定交换机,就会被发送到默认交换机。默认交换机的类型为direct类型,路由键与队列名相同

如果消息的路由键和某个队列的名称一致,那么消息就会被发送到这个队列中。如果消息的路由键和任何一个队列的名称都不一致,那么消息会被丢弃。 默认交换机可以通过设置routing_key来指定消息的目的地,例如:

//  将消息发送到名称为test_queue的队列中,空字符串代表默认交换机
channel.basic_publish(exchange="", routing_key="test_queue", body="Hello, RabbitMQ!")

但是,建议应用程序在发送消息时显式地指定交换机,以避免不必要的麻烦或错误。默认交换机只是一个简单的机制,不应被用于复杂的应用程序。

1、简单模式(Simple Queue)-默认DirectExchange

这个和别的几种模式对比看着没有X,这个其实用了默认的交换机,我们需要提供一个生产者一个队列以及一个消费者。消息传播图如下:

在这里插入图片描述

//Config
@Bean
Queue queue1() {
    return new Queue("simpleQueue");
}

// 生产者
@Autowired
private RabbitTemplate rabbitTemplate;

public void sendSimpleMessage(String message) {
    rabbitTemplate.convertAndSend("simpleQueue", message);
}

// 消费者
@RabbitListener(queues = "simpleQueue")
public void receiveSimpleMessage(String message) {
    System.out.println("Received: " + message);
}

这个时候使用的其实是默认的直连交换机(DirectExchange),DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上,例如消息队列名为 “simpleQueue”,则 routingkey 为 “simpleQueue” 的消息会被该消息队列接收。
具体可以看下源码发送convertAndSend

/** Alias for amq.direct default exchange. */
private static final String DEFAULT_EXCHANGE = "";

private static final String DEFAULT_ROUTING_KEY = "";

private volatile String exchange = DEFAULT_EXCHANGE;
private volatile String routingKey = DEFAULT_ROUTING_KEY;

@Override
public void convertAndSend(String routingKey, final Object object) throws AmqpException {
	//可以发现这个this.exchange就是DEFAULT_EXCHANGE = ""
    convertAndSend(this.exchange, routingKey, object, (CorrelationData) null);
}

2、 工作队列模式(Work Queues)-默认DirectExchange

这种情况是这样的:

在这里插入图片描述

一个生产者,也是一个默认的交换机(DirectExchange),一个队列,两个消费者。
一个队列对应了多个消费者,默认情况下,由队列对消息进行平均分配,消息会被分到不同的消费者手中。消费者可以配置各自的并发能力,进而提高消息的消费能力,也可以配置手动 ack,来决定是否要消费某一条消息。

和第一种对比主要体现在有多个消费者进行消费,主要优势在于可以通过增加消费者来提高处理能力。

//Config
@Bean
Queue queue1() {
    return new Queue("workQueue");
}

// Producer
@Autowired
private RabbitTemplate rabbitTemplate;

public void sendWorkMessage(String message) {
    rabbitTemplate.convertAndSend("workQueue", message);
}

// Consumer
@RabbitListener(queues = "workQueue")
public void receiveWorkMessage(String message) {
    System.out.println("Received: " + message);
    // Simulate work
    Thread.sleep(1000);
}

3、发布/订阅模式(Publish/Subscribe)-FanoutExchange

FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,在这种策略中,routingkey 将不起任何作用,FanoutExchange 配置方式如下:

在这里插入图片描述

在这里首先创建 FanoutExchange,参数含义与创建 DirectExchange 参数含义一致,然后创建两个 Queue,再将这两个 Queue 都绑定到 FanoutExchange 上。

//Config
@Bean
public FanoutExchange fanoutExchange() {
    return new FanoutExchange("fanoutExchange");
}

@Bean
public Queue fanoutQueue1() {
    return new Queue("fanoutQueue1");
}

@Bean
public Queue fanoutQueue2() {
    return new Queue("fanoutQueue2");
}

@Bean
public Binding binding1(FanoutExchange fanoutExchange, Queue fanoutQueue1) {
    return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}

@Bean
public Binding binding2(FanoutExchange fanoutExchange, Queue fanoutQueue2) {
    return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}

接下来创建两个消费者,两个消费者分别消费两个消息队列中的消息,然后在单元测试中发送消息,如下:

// Producer
@Autowired
private RabbitTemplate rabbitTemplate;

public void sendFanoutMessage(String message) {
    rabbitTemplate.convertAndSend("fanoutExchange", null, message);
}

// Consumer
@RabbitListener(queues = "fanoutQueue1")
public void receiveFanoutMessage1(String message) {
    System.out.println("Queue1 Received: " + message);
}

@RabbitListener(queues = "fanoutQueue2")
public void receiveFanoutMessage2(String message) {
    System.out.println("Queue2 Received: " + message);
}

注意这里发送消息时不需要 routingkey,指定 exchange 即可,routingkey 可以直接传一个 null

4、路由模式(Routing)-自定义DirectExchange

DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上,例如消息队列名为 “directQueue1”,则 routingkey 为 “directQueue1” 的消息会被该消息队列接收。

在这里插入图片描述

// Config
@Bean
public DirectExchange directExchange() {
    return new DirectExchange("directExchange");
}

@Bean
public Queue directQueue1() {
    return new Queue("directQueue1");
}

@Bean
public Queue directQueue2() {
    return new Queue("directQueue2");
}

@Bean
public Binding directBinding1(DirectExchange directExchange, Queue directQueue1) {
    return BindingBuilder.bind(directQueue1).to(directExchange).with("info");
}

@Bean
public Binding directBinding2(DirectExchange directExchange, Queue directQueue2) {
    return BindingBuilder.bind(directQueue2).to(directExchange).with("error");
}

可以发现我们可以根据routingKey控制发送到哪个队列上,这个本质上和我们前面2种模式都是一样的,采用的都是DirectExchange,只不过前面2种的交换机DirectExchange是""默认值,现在我们这里是指定了自己的DirectExchange

// Producer
@Autowired
private RabbitTemplate rabbitTemplate;

public void sendDirectMessage(String message, String routingKey) {
    rabbitTemplate.convertAndSend("directExchange", routingKey, message);
}

// Consumer
@RabbitListener(queues = "directQueue1")
public void receiveDirectMessage1(String message) {
    System.out.println("Queue1 Received: " + message);
}

@RabbitListener(queues = "directQueue2")
public void receiveDirectMessage2(String message) {
    System.out.println("Queue2 Received: " + message);
}

特别注意:如果vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃。

5、主题模式(Topics)-TopicExchange

在这里插入图片描述

在 RabbitMQ 的主题模式(Topics)中,消息通过带有路由键的主题交换机(TopicExchange)进行路由。消息的路由键是一个点分隔的字符串,消费者可以使用绑定键(带有通配符)来订阅感兴趣的消息。

  • 队列 topicQueue1 使用绑定键 *.orange.*,匹配任意第一个和第三个单词,以 orange 为第二个单词的消息。
  • 队列 topicQueue2 使用绑定键 *.*.rabbit,匹配任意前两个单词,以 rabbit 为第三个单词的消息。
// Config
@Bean
public TopicExchange topicExchange() {
    return new TopicExchange("topicExchange");
}

@Bean
public Queue topicQueue1() {
    return new Queue("topicQueue1");
}

@Bean
public Queue topicQueue2() {
    return new Queue("topicQueue2");
}

@Bean
public Binding topicBinding1(TopicExchange topicExchange, Queue topicQueue1) {
    return BindingBuilder.bind(topicQueue1).to(topicExchange).with("*.orange.*");
}

@Bean
public Binding topicBinding2(TopicExchange topicExchange, Queue topicQueue2) {
    return BindingBuilder.bind(topicQueue2).to(topicExchange).with("*.*.rabbit");
}

  • topicQueue1topicQueue2 接收匹配其绑定键的消息。
  • 灵活路由: 主题模式允许通过复杂的路由键实现灵活的消息路由。
  • 使用场景: 适用于需要按模式匹配路由消息的场景,比如日志分级、区域性数据分发等。
// Producer
@Autowired
private RabbitTemplate rabbitTemplate;

public void sendTopicMessage(String message, String routingKey) {
    rabbitTemplate.convertAndSend("topicExchange", routingKey, message);
}

// Consumer
@RabbitListener(queues = "topicQueue1")
public void receiveTopicMessage1(String message) {
    System.out.println("Queue1 Received: " + message);
}

@RabbitListener(queues = "topicQueue2")
public void receiveTopicMessage2(String message) {
    System.out.println("Queue2 Received: " + message);
}

总结

看了上面的5个例子,其实本质上我们可以根据Exchange交换机类型归结为3种工作模式Direct、Fanout、Topic

  • Direct:定向,把消息交给符合指定routing key 的队列 (第1、2、4其实都是这种交换机)
  • Fanout:广播,将消息交给所有绑定到交换机的队列 第**(第3种模式)**
  • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列**(第5种模式)**

这里提一下,交换机还有一种类型,Headers:头匹配,基于MQ的消息头匹配,不过这种用的非常少,可以忽略!

不难发现,这三种类型本质上是告诉交换机应该把消息发送给哪些那些队列的,三种类别对应着三种判断角度

  • direct —— 消息发送时都附带一个字段叫routing_key,direct 模式的交换机就会直接把该字段理解成队列名,找到对应的队列并发送;
  • fanout —— 相当于广播,不作任何选择,发送给所有连接的队列;
  • topic —— 交换机会把routing_key理解成一个主题,恰好,队列绑定交换机时也可以以缩略形式指定主题,所以找到匹配主题的队列就发送;

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

在这里插入图片描述

三种队列类型

普通队列

我们平常发送的正常都是普通队列,比如上面5种工作模式说的都是普通队列,就不多说了

死信队列(Dead Letter Queue, DLQ)

特别注意:

  • 队列和消息都有个TTL生存时间,队列的TTL到达后队列会自动删除,消息不会进入死信队列;
  • 消息的生存时间到达后会进入死信队列。消息的生存时间可以在队列设置所有消息的TTL,也可以对某个消息单独设置TTL。

定义

死信队列是用于处理无法被消费者正确处理的消息的队列。当消息在原始队列中无法被消费时,会被转移到死信队列中。

触发条件

消息会变成死信并进入死信队列的几种情况:

  1. 消息被消费者拒绝(通过basic.rejectbasic.nack),并且requeue=false
  2. 消息在队列中超过了TTL(Time To Live)时间。
  3. 队列达到最大长度限制,无法再接收新消息。

在这里插入图片描述

应用场景

  • 处理无法被消费的消息,避免消息堆积影响其他消息的消费。
  • 记录和监控消息处理错误,方便进行后续处理

配置

  • 通过设置 x-dead-letter-exchangex-dead-letter-routing-key 将消息路由到死信队列。
  • 在原始队列中设置死信交换机和死信队列的相关参数

延迟队列(Delayed Queue)

定义

延迟队列是一种特殊的队列,消息在发送到队列后,需要等待一段时间后才能被消费。

实现方式

  1. 通过死信队列实现延迟任务

    把死信队列就当成延迟队列,具体来说是这样:

    假如一条消息需要延迟 30 分钟执行,我们就设置这条消息的有效期为 30 分钟,同时为这条消息配置死信交换机和死信 routing_key,并且不为这个消息队列设置消费者,那么 30 分钟后,这条消息由于没有被消费者消费而进入死信队列,此时我们有一个消费者就在“蹲点”这个死信队列,消息一进入死信队列,就立马被消费了。

    • 将消息发送到一个没有消费者的队列,设置消息的TTL。
    • 消息过期后进入死信队列,再由死信队列的消费者处理。
  2. 通过RabbitMQ延迟插件

    • 使用RabbitMQ的延迟插件(rabbitmq_delayed_message_exchange 插件),消息在延迟一段时间后再投递到目标队列中进行消费。

应用场景

  • 订单超时未支付自动取消。
  • 用户注册后未登录的提醒。
  • 预定会议前的通知

两者区别

使用TTL和死信队列实现延迟插件其实是会有一些问题的:

  • 问题一:当我们的业务比较复杂的时候, 需要针对不同的业务消息类型设置不同的过期时间策略, 必然我们也需要为不同的队列消息的过期时间创建很多的Queue的Bean对象, 当业务复杂到一定程度时, 这种方式维护成本过高;
  • 问题二:就是队列的先进先出原则导致的问题,当先进入队列的消息的过期时间比后进入消息中的过期时间长的时候,消息是串行被消费的,所以必然是等到先进入队列的消息的过期时间结束, 后进入队列的消息的过期时间才会被监听,然而实际上这个消息早就过期了,这就导致了本来过期时间为3秒的消息,实际上过了13秒才会被处理,这在实际应用场景中肯定是不被允许的;

延迟交换机插件可以在一定程度上解决上述两种问题。

特性死信队列延迟队列
定义处理无法被消费的消息消息在指定时间后才被消费
触发条件消息被拒绝、消息过期、队列满消息设置了TTL或使用延迟插件
应用场景处理消费失败的消息,避免队列堵塞订单超时取消、提醒通知等延迟处理场景
实现方式配置死信交换机和死信队列使用TTL和死信队列或延迟插件
消息处理进入死信队列后进行特殊处理延迟一段时间后再投递到目标队列

代码实战

1. 延迟队列:TTL+DLX死信队列

配置步骤

1、引入依赖

pom.xml中引入Spring Boot和RabbitMQ的依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、配置交换机和队列

在Spring Boot的配置类中,配置一个普通队列、一个死信队列、一个普通交换机和一个死信交换机:

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    // 普通交换机
    @Bean
    public DirectExchange normalExchange() {
        return new DirectExchange("normal_exchange");
    }

    // 死信交换机
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange("dead_letter_exchange");
    }

    // 普通队列并绑定到普通交换机
    @Bean
    public Queue normalQueue() {
        return QueueBuilder.durable("normal_queue")
                .withArgument("x-dead-letter-exchange", "dead_letter_exchange")
                .withArgument("x-dead-letter-routing-key", "dead_letter_routing_key")
                .build();
    }

    @Bean
    public Binding normalBinding(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("normal_routing_key");
    }

    // 死信队列并绑定到死信交换机
    @Bean
    public Queue deadLetterQueue() {
        return new Queue("dead_letter_queue");
    }

    @Bean
    public Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("dead_letter_routing_key");
    }
}

3、生产者发送消息

在生产者发送消息时,可以指定消息的TTL(Time-To-Live)。TTL到期后,消息会被转发到死信队列:

  • 创建了一个匿名内部类实现了MessagePostProcessor接口,并重写了postProcessMessage()方法。在该方法中,设置了消息的延迟时间为传进来的delay延迟时间
java复制代码import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/send")
    public String send(@RequestParam String message, @RequestParam int delay) {
        rabbitTemplate.convertAndSend("normal_exchange", "normal_routing_key", message, msg -> {
			//创建了一个匿名内部类实现了MessagePostProcessor接口,并重写了postProcessMessage()方法。在该方法中,设置了消息的延迟时间为传进来的delay延迟时间
            msg.getMessageProperties().setExpiration(String.valueOf(delay));
            return msg;
        });
        return "Message sent with delay: " + delay;
    }
}

4、消费者监听死信队列

消费者监听死信队列,接收到消息后处理:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

    @RabbitListener(queues = "dead_letter_queue")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

2. 延迟队列:RabbitMQ延迟消息插件

RabbitMQ有一个插件 rabbitmq-delayed-message-exchange 可以直接支持延迟消息队列。

配置步骤

1、安装RabbitMQ延迟消息插件

首先,确保RabbitMQ服务器上已安装rabbitmq-delayed-message-exchange插件。

rabbitmq-plugins enable rabbitmq_delayed_message_exchange/21、**配置交换机和队列**

2、在Spring Boot中配置使用延迟消息交换机:

  • 使用CustomExchange类创建一个自定义交换机对象。CustomExchange是Spring AMQP库提供的一个类,用于创建自定义的交换机。构造方法的参数依次为交换机的名称、类型、是否持久化、是否自动删除和属性。
  • 交换机的名称为DELAYED_EXCHANGE,类型为"x-delayed-message",持久化为true,自动删除为false,属性为之前创建的HashMap对象。
java复制代码import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitDelayedConfig {

    @Bean
    public CustomExchange delayedExchange() {
        return new CustomExchange("delayed_exchange", "x-delayed-message", true, false, Map.of("x-delayed-type", "direct"));
    }

    @Bean
    public Queue delayedQueue() {
        return new Queue("delayed_queue");
    }

    @Bean
    public Binding delayedBinding(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") CustomExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("delayed_routing_key").noargs();
    }
}

3、生产者发送消息

生产者在发送消息时,可以设置延迟时间:

java复制代码import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;

@RestController
public class DelayedProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendDelayed")
    public String sendDelayed(@RequestParam String message, @RequestParam int delay) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-delay", delay);
        rabbitTemplate.convertAndSend("delayed_exchange", "delayed_routing_key", message, msg -> {
            msg.getMessageProperties().getHeaders().putAll(headers);
            return msg;
        });
        return "Delayed message sent with delay: " + delay;
    }
}

4、消费者监听延迟队列

与TTL+DLX方法相同,消费者直接监听队列接收消息:

java复制代码import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DelayedConsumer {

    @RabbitListener(queues = "delayed_queue")
    public void receiveDelayedMessage(String message) {
        System.out.println("Received delayed message: " + message);
    }
}

3、死信队列: basic.reject或basic.nack

死信队列有3种情况: 这里就举常见的手动ack的情况拒绝消息实现死信队列

要在Spring Boot中使用RabbitMQ实现死信队列(Dead Letter Queue,DLQ),并处理消息被消费者拒绝的情况(通过basic.rejectbasic.nack并且requeue=false),可以按照以下步骤来实现。

1. 引入依赖

首先,在pom.xml中引入Spring Boot和RabbitMQ的依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. 配置交换机、队列和死信队列

接下来,在Spring Boot的配置类中配置一个普通队列、一个死信队列、一个普通交换机和一个死信交换机。

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    // 普通交换机
    @Bean
    public DirectExchange normalExchange() {
        return new DirectExchange("normal_exchange");
    }

    // 死信交换机
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange("dead_letter_exchange");
    }

    // 普通队列并绑定到普通交换机
    @Bean
    public Queue normalQueue() {
        return QueueBuilder.durable("normal_queue")
                .withArgument("x-dead-letter-exchange", "dead_letter_exchange") // 设置死信交换机
                .withArgument("x-dead-letter-routing-key", "dead_letter_routing_key") // 设置死信RoutingKey
                .build();
    }

    @Bean
    public Binding normalBinding(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("normal_routing_key");
    }

    // 死信队列并绑定到死信交换机
    @Bean
    public Queue deadLetterQueue() {
        return new Queue("dead_letter_queue");
    }

    @Bean
    public Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("dead_letter_routing_key");
    }
}
3. 生产者发送消息

在生产者中发送消息到普通队列:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/send")
    public String send(@RequestParam String message) {
        rabbitTemplate.convertAndSend("normal_exchange", "normal_routing_key", message);
        return "Message sent: " + message;
    }
}
4. 消费者监听并拒绝消息

注意这里的前提是要开启手动ack:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
        acknowledge-mode: manual  # 手动ack

消费者监听普通队列并有条件地拒绝消息,将消息转发到死信队列:

  • 当发送的消息内容为"reject"时,该消息会被拒绝并转发到死信队列。
  • 当发送其他内容的消息时,消息会被正常消费。
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;

@Component
public class Consumer {

    @RabbitListener(queues = "normal_queue")
    public void receiveMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        try {
            String msg = new String(message.getBody());
            System.out.println("Received message: " + msg);

            // 根据某些条件判断是否拒绝消息
            if ("reject".equals(msg)) {
                // 拒绝消息,并且不重新入队(requeue=false)
                channel.basicReject(tag, false);
                System.out.println("Message rejected: " + msg);
            } else {
                // 消费成功,确认消息
                channel.basicAck(tag, false);
            }
        } catch (Exception e) {
            // 异常情况也可以使用basicNack将消息拒绝,并且不重新入队
            channel.basicNack(tag, false, false);
        }
    }
}
5. 消费者监听死信队列

最后,消费者监听死信队列,处理被拒绝的消息:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DeadLetterConsumer {

    @RabbitListener(queues = "dead_letter_queue")
    public void receiveDeadLetterMessage(String message) {
        System.out.println("Received dead letter message: " + message);
    }
}
总结
  • 配置普通队列和死信队列,并通过设置x-dead-letter-exchangex-dead-letter-routing-key来实现消息被拒绝后的处理。
  • 消费者可以根据业务逻辑通过basic.rejectbasic.nack拒绝消息,并指定不重新入队(requeue=false),从而将消息转发到死信队列。
  • 死信队列中的消息可以被另一个消费者监听和处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/873464.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

探索EasyCVR与AI技术深度融合:视频汇聚平台的新增长点

随着5G、AI、边缘计算、物联网&#xff08;IoT&#xff09;、云计算等技术的快速发展&#xff0c;万物互联已经从概念逐渐转变为现实&#xff0c;AIoT&#xff08;物联网人工智能&#xff09;的新时代正在加速到来。在这一背景下&#xff0c;视频技术作为信息传输和交互的重要手…

简单实用的php全新实物商城系统

免费开源电商系统,提供灵活的扩展特性、高度自动化与智能化、创新的管理模式和强大的自定义模块,让电商用户零成本拥有安全、高效、专业的移动商城。 代码是全新实物商城系统源码版。 代码下载

c++进阶——unordered的封装

嗨喽大家好呀&#xff0c;今天阿鑫给大家带来的是c进阶——unordered的封装&#xff0c;好久不见啦&#xff0c;下面让我们进入本节博客的内容吧&#xff01; c进阶——unordered的封装 unordered系列的基本架构unordered系列迭代器的封装unordered不支持修改keyoperator[]的…

macos系统内置php文件列表 系统自带php卸载方法

在macos系统中, 自带已经安装了php, 根据不同的macos版本php的版本号可能不同, 我们可以通过 which php 命令来查看mac自带的默认php安装路径, 不过注意这个只是php的执行文件路径. 系统自带php文件列表 一下就是macos默认安装的php文件列表. macos 10.15内置PHP文件列表配置…

软件工程-图书管理系统的概要设计

软件概要设计说明书 目录 软件概要设计说明书 一、引言 1.1 编写目的 1.2 背景 1.3 定义 1.3.1特定对象 1.3.2专业术语 1.4 参考资料 二、总体设计 2.1 需求规定 2.1.1信息要求 2.1.2功能要求 2.2 运行环境 2.3 基本概要设计和处理流程 2.4 体系结构设计 2.5 模…

基于微信小程序在线订餐系统

微信小程序在线订餐系统 摘要 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统的实施在技术上已逐步成熟。本文介绍了微信小程序在线订餐系统的开发全过程。通过分析微信小程序在线订餐系统管理的不足&#xff0c;创建了一个计算机管理微信小程序在线订…

【Python基础】Python函数

本文收录于 《Python编程入门》专栏&#xff0c;从零基础开始&#xff0c;分享一些Python编程基础知识&#xff0c;欢迎关注&#xff0c;谢谢&#xff01; 文章目录 一、前言二、函数的定义与调用三、函数参数3.1 位置参数3.2 默认参数3.3 可变数量参数&#xff08;或不定长参数…

Kubernetes 简介及部署方法

目录 一、Kubernetes简介 1 应用部署方式演变 1.2 容器编排应用 1.3 kubernetes 简介 1.4 K8S的设计架构 1.4.1 K8S各个组件用途 1.4.2 K8S 各组件之间的调用关系 1.4.3 K8S 的 常用名词感念 1.4.4 k8S的分层架构 二 K8S集群环境搭建 2.1 k8s中容器的管理方式 2.2 …

网络安全知识:什么是访问控制列表 (ACL)?

访问控制列表 (ACL) 是网络安全和管理的基础。它们在确定谁或什么可以访问网络内的特定资源方面发挥着重要作用。 本文深入探讨了 ACL 的复杂性&#xff0c;探索了其类型、组件、应用程序和最佳实践。我们还将比较不同操作系统的 ACL&#xff0c;并讨论它们在网络架构中的战略…

生信圆桌x生信分析平台:助力生物信息学研究的综合工具

介绍 少走弯路,高效分析;了解生信云,访问 【生信圆桌x生信专用云服务器】 : www.tebteb.cc 生物信息学的迅速发展催生了众多生信分析平台&#xff0c;这些平台通过集成各种生物信息学工具和算法&#xff0c;极大地简化了数据处理和分析流程&#xff0c;使研究人员能够更高效地…

如何使div居中?CSS居中终极指南

前言 长期以来&#xff0c;如何在父元素中居中对齐一个元素&#xff0c;一直是一个让人头疼的问题&#xff0c;随着 CSS 的发展&#xff0c;越来越多的工具可以用来解决这个难题&#xff0c;五花八门的招式一大堆&#xff0c;这篇博客&#xff0c;旨在帮助你理解不同的居中方法…

EVO进行轨迹评估

EVO进行轨迹评估 文章目录 EVO进行轨迹评估1 前言1.1 轨迹对齐1.2 尺度变换1.3 绝对轨迹误差ATE和相对轨迹误差RTE1.4 绝对姿态误差APE和相对姿态误差RPE 2 安装evo2.1 evo安装2.2 相关报错2.2.1 版本不兼容问题2.2.2 解决PATH警告 2.3 测试 3 evo指令3.1 evo_traj3.2 evo_ape3…

Spring Boot3.x 启动自动执行sql脚本

1 引言 某些项目在首次启动时&#xff0c;需要先手动创建数据库表&#xff0c;然后再手动写入初始数据才能正常使用。为了省去这个手动操作过程&#xff0c;我们可以使用Spring Boot启动时执行sql脚本的配置&#xff0c;全自动完成这个过程。 2 配置 具体配置如下&#xff1…

Python 调用手机摄像头

Python 调用手机摄像头 在手机上安装软件 这里以安卓手机作为演示&#xff0c;ISO也是差不多的 软件下载地址 注意&#xff1a;要想在电脑上查看手机摄像头拍摄的内容的在一个局域网里面(没有 WIFI 可以使用 热点 ) 安装完打开IP摄像头服务器 点击分享查看IP 查看局域网的I…

[Linux]:进程(下)

✨✨ 欢迎大家来到贝蒂大讲堂✨✨ &#x1f388;&#x1f388;养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; 所属专栏&#xff1a;Linux学习 贝蒂的主页&#xff1a;Betty’s blog 1. 进程终止 1.1 进程退出的场景 进程退出只有以下三种情况&#xff1a; 代…

flume 使用 exec 采集容器日志,转储磁盘

flume 使用 exec 采集容器日志&#xff0c;转储磁盘 在该场景下&#xff0c;docker 服务为superset&#xff0c;flume 的sources 选择 exec &#xff0c; sinks选择 file roll 。 任务配置 具体配置文件如下&#xff1a; #simple.conf: A single-node Flume configuration#…

Shader 渲染路径

实际的游戏开发中&#xff0c;场景中的光源肯定是更多、更复杂的&#xff0c;如果只有一个平行光的处理&#xff0c;完全不能满足需求。处理更多的光源&#xff0c;我们就需要了解Unity底层是如何处理这些光源的。 1、渲染路径是什么 渲染路径&#xff08;Rendering Path&…

Apache POl的使用(导出报表)

介绍 Apache POl是一个处理Miscrosoft Office各种文件格式的开源项目。简单来说就是&#xff0c;我们可以使用 PO! 在 Java 程序中对Miscrosoft Office各种文件进行读写操作。一般情况下&#xff0c;POI都是用于操作 Excel 文件。 Apache POl的应用场景: 银行网银系统导出交…

中秋之美——html5+css+js制作中秋网页

中秋之美——html5cssjs制作中秋网页 一、前言二、功能展示三、系统实现四、其它五、源码下载 一、前言 八月十五&#xff0c;秋已过半&#xff0c;是为中秋。 “但愿人长久&#xff0c;千里共婵娟”&#xff0c;中秋时节&#xff0c;气温已凉未寒&#xff0c;天高气爽&#x…

助贷行业的三大严峻挑战:贷款中介公司转型债务重组业务

大家是否察觉到一种趋势&#xff1f;现如今&#xff0c;众多贷款辅助服务机构与专注于债务再构的公司之间形成了紧密的“联动”。有的选择将获取的贷款需求转介给债务重组方&#xff0c;有的则直接下场&#xff0c;动用自身资本参与债务重组业务。这一现象背后&#xff0c;究竟…