学习RabbitMQ高级特性

目标:

了解熟悉RabbitMQ的高级特性


学习步骤:

高级特性主要分为以下几点, 官网介绍

1、消息可靠性投递 【confirm 确认模式、return 退回模式】
2、Consumer ACK 【acknowledge】
3、消费端限流 【prefetch】
4、TTL过期时间 【time to live】
5、死信队列 【Dead Letter Exchange】
6、延迟队列 【rabbitmq-delayed-message-exchange】
7、优先级队列 【x-max-priority】

在这里插入图片描述


前戏:项目搭建

1、创建两个module,一个为生产者,一个为消费者

分别添加如下依赖【或者将依赖放置在父工程下,两个module作为子工程引用即可】

 <dependencies>
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-amqp</artifactId>
     </dependency>
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-test</artifactId>
     </dependency>
 </dependencies>

2、 配置RabbitMQ 的基本信息 [application.yml]

spring:
  rabbitmq:
    host: 服务器IP 
    port: 5672 # 端口默认为 5672
    username: guest # 默认账号有guest 密码一致
    password: guest
    virtual-host: /

3、编写配置类RabbitMQConfig,注册队列、交换机、以及绑定关系

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    public static final String QUEUE_NAME = "csnz_queue";
    public static final String EXCHANGE_NAME = "csnz_exchange";

    // 1、注册队列
    @Bean("CSNZQueue")
    public Queue getQueue(){
    	// 使用QueueBuilder构建一个队列,设置队列持久化,以及自动删除。
        return QueueBuilder.durable(QUEUE_NAME).autoDelete().build();
    }
    // 2、注册交换机
    @Bean("CSNZExchange")
    public Exchange getExchange(){
    	// 使用ExchangeBuilder构建一个交换机(类型可选,此处为通配符交换机),设置持久化和自动删除
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).autoDelete().durable(true).build();
    }
    // 3、绑定队列和交换机
    @Bean("CSNZBind")
    public Binding bindQueueExchange(@Qualifier("CSNZQueue")Queue queue,@Qualifier("CSNZExchange")Exchange exchange){
    	// 使用BindingBuilder 将刚刚声明的队列和交换机绑定并设置绑定的路由key
        return BindingBuilder.bind(queue).to(exchange).with("csnz.#").noargs();
    }
}

一、消息可靠性投递

RabbitMQ提供了两种模式来控制消息的投递(生产者发送的)可靠性

  • confirm 确认模式
  • return 退回模式
    在这里插入图片描述

因为消息投递过程是从 生产者Broker[exchange -> queue] 再到 消费者

两种模式过程:

只要利用这俩个callback就可以控制消息的 可靠性投递了

demo演示

确认模式:

1、在配置中开启 publisher-confirms 为 true

2、在rabbitTemplate定义 confirmCallBack 回调函数

!](https://img-blog.csdnimg.cn/83be1b4662e8487c8c29cb69465d6650.png)

	/*
        确认模式:
        1、在配置中开启 publisher-confirms为true
        2、在rabbitTemplate定义confirmCallBack回调函数
     */
    @Test
    public void testConfirm(){
        // 定义confirmCallBack回调函数
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            /*
                CorrelationData:相关的配置信息【在convertAndSend重载方法中有包含此信息】
                ack;exchange交换机,是否成功收到了信息。
                cause:失败原因。如果成功接收则此值为null
             */
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if(ack){
                    System.out.println("消息成功发送");
                }else{
                    System.out.println("发送失败原因:" + cause);
                    // 重新发起或其他操作
                }

            }
        });

        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"csnz.testConfirm","测试确认回调模式");
    }

此时,如果发送消息时指定的交换机和路由键都是正确的,即代码块第27行参数正确,则第17行的 ack值为true,执行 消息成功发送到交换机后的逻辑代码
在这里插入图片描述
这里发送失败就是因为我们把交换机的名称写错了,换成正确的交换机名称就好
在这里插入图片描述

回退模式:

当消息发送给Exchange后,Exchange路由到queue失败时才会执行 ReturnCallBack

1、在配置中开启 publisher-returns 为 true

2、设置ReturnCallBack

3、设置Exchange处理消息的模式:

  • 如果消息没路由到queue

    • 1、丢弃消息(默认)即 rabbitTemplate.setMandatory(false);
    • 2、 返回给消息发送方 ReturnCallBack 即 rabbitTemplate.setMandatory(true);

在这里插入图片描述

	@Test
    public void testReturn(){
        // 设置交换机处理失败消息的模式
        rabbitTemplate.setMandatory(true);
        // 设置ReturnCallBack
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            /*
                Message:消息对象
                replyCode:错误码
                replyText:错误信息
                exchange:交换机
                routingKey:路由键
             */
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println(message);
            }
        });

        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"error.testReturn","测试回退模式");
    }

此时,如果发送消息时指定的交换机和路由键发生找不到的情况,即代码块第20行参数错误
情况一:没写第四行设置交换机处理失败消息的模式为true,则不会执行第15行的代码,因为消息失败默认是丢弃模式

在这里插入图片描述
在这里插入图片描述
因为默认此值就是false
在这里插入图片描述
情况二:设置交换机处理失败消息的模式为true,则会执行第15行的代码块,消息发送失败时,消息会通过回调返回,此时就可以查看消息发送失败的具体原因
在这里插入图片描述

二、Consumer Ack

Ack为Acknowledge,顾名思义,指的是消费者收到消息后的确认模式。

分为三种模式

  • 自动确认:acknowledge=“none”(默认)
    • 当消息一旦被consumer接收了,则自动确认收到,并移除消息缓存中的信息
  • 手动确认:acknowledge=“manual”
    • 手动ACK
    • 手动NACK
  • 根据异常情况判断是否确认:acknowledge=“auto”

实际业务情况:

一般不会使用 自动确认模式,因为收到消息后,很可能在进行业务处理时出现异常,造成数据丢失。真就啪一下没了。
一般都是使用 手动确认模式

  • 即在业务处理成功后,调用 channel.basicAck() 进行手动确认,会发送给 broker 一个应答,代表消息处理成功
  • 如果在进行业务处理时发生异常,则调用 channel.basicNack() 方法【如果设置了重回队列,broker 就会将没有成功处理的消息重新发送。否则将该消息从队列中剔除】。

demo演示

自动确认模式:

1、定义一个监听器:AckListener 实现 MessageListener 接口

2、在onMessage方法上绑定要监听的队列

@Component
public class AckListener implements MessageListener { 
    @Override
    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    public void onMessage(Message message) throws Exception {
        System.out.println(new String(message.getBody()));
    }
}

测试
在这里插入图片描述

手动确认模式:

1、设置手动接收:acknowledge-mode: manual

2、定义一个监听器:AckListener 实现 ChannelAwareMessageListener 接口 :(因为此接口才有返回channel参数)

3、在onMessage方法上绑定要监听的队列

4、消息成功处理:调用 channel.basicAck() 接收

5、消息处理失败:调用channel.basicNack()拒绝接收,让broker重新发送给consumer

application.yml配置文件

# 配置RabbitMQ 的基本信息 IP 端口 username pass
spring:
  rabbitmq:
    host: 服务器IP
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual

监听器:AckListener

@Component
public class AckListener implements ChannelAwareMessageListener { // 自动接收确认实现他即可 MessageListener

    @Override
    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    public void onMessage(Message message, Channel channel) throws Exception {
        Thread.sleep(1000); // 模拟业务时间
        // 传递标签:该字段为MQ server 用于消息确认的标记
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // 1、接收转换消息
            System.out.println(new String(message.getBody()));
            // 2、处理业务逻辑
            Thread.sleep(1000); // 模拟业务时间
            // 3、没问题的话进行手动接收:basicAck(long deliveryTag, boolean multiple)
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            // 4、有问题的话拒接接收:basicNack(long deliveryTag, boolean multiple, boolean requeue)
            // ->requeue:是否重回队列,如果true,则消息重新回到queue,broker会重新发送该消息给消费端
            channel.basicNack(deliveryTag,true,true);
        }
    }
}

测试正常情况下的手动接收代码:

在这里插入图片描述

测试异常情况下的手动接收代码(在处理业务逻辑时加上错误即可):可以看见消息一直被重新入队进行消费

在这里插入图片描述

三、消费端限流

###

限流机制

  • 设置手动接收:acknowledge-mode: manual
  • 设置消费端每次消费消息的条数
    • prefetch = 1
    • 表示消费端每次从MQ拉取一条消息来消费,直至手动确认消费完毕后,才会继续拉取下一条数据
  • 监听器类实现 ChannelAwareMessageListener 接口
    • 消息成功处理:调用 channel.basicAck() 接收
    • 处理失败:调用channel.basicNack()拒绝接收,让broker重新发送给consumer

demo演示

监听器类LimitListener

@Component
public class LimitListener implements ChannelAwareMessageListener { // 自动接收确认实现他即可 MessageListener

    @Override
    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    public void onMessage(Message message, Channel channel) throws Exception {
        Thread.sleep(1000); // 模拟业务时间
        // 传递标签:该字段为MQ server 用于消息确认的标记
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // 1、接收转换消息
            System.out.println(new String(message.getBody()));
            // 2、处理业务逻辑
            Thread.sleep(1000); // 模拟业务时间
//            int i = 1/0;
            // 3、没问题的话进行手动接收:basicAck(long deliveryTag, boolean multiple)
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            // 4、有问题的话拒接接收:basicNack(long deliveryTag, boolean multiple, boolean requeue)
            // ->requeue:是否重回队列,如果true,则消息重新回到queue,broker会重新发送该消息给消费端
            channel.basicNack(deliveryTag,true,true);
        }
    }
}

消费端:application.yml配置文件

# 配置RabbitMQ 的基本信息 IP 端口 username pass
spring:
  rabbitmq:
    host: 服务器IP
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual
      direct:
        prefetch: 1

生产者测试代码

循环发送数据

@Test
    public void testSend(){
        // 设置交换机处理失败消息的模式
        rabbitTemplate.setMandatory(true);
        // 设置ReturnCallBack
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            /*
                Message:消息对象
                replyCode:错误码
                replyText:错误信息
                exchange:交换机
                routingKey:路由键
             */
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println(replyCode);
                System.out.println(replyText);
                System.out.println(message);
            }
        });
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"csnz.testSend","测试限流-手动接收:"+i);
        }
    }

消息发送完毕后,观察MQ工作台

在这里插入图片描述

消费者测试代码

    @Test
    public void testLimitAck(){
        System.out.println("执行 限流Ack模式");
        while(true){

        }
    }

观察消费端控制台打印

可以看到消息是一条一条消费的(设置了业务时间为2秒),这就是所谓的限流消费。

在这里插入图片描述

四、TTL

TTL 称为 time to live,也就是存活时间,也称过期时间

它的用处是当消息到达存活时间后,如果还没有被消费掉,则会自动清除。此用途也经常被用来做订单的延时付款。

  • 可以对消息设置过期时间使用参数:expiration,单位:毫秒,且当该消息在队列头部时,才会单独判断这一消息是否过期
  • 可以对整个队列所有消息设置过期时间时间一到,队列内全部消息清空使用参数:x-message-ttl,单位:毫秒)
  • 如果两者都进行了设置,则以时间短的为主。(因为 RabbitMQ 是按照消息的过期时间来进行消息的清理的)

且RabbitMQ不保证消息会在精确的TTL时间后立即被删除。这是因为RabbitMQ使用一种基于时间戳的方式来检查消息的过期时间,并且该方式是有一定的误差的

问题一:如果一个rabbitmq队列中同时设置了A消息过期时间,以及队列总体消息过期时间,且A消息设置的过期时间比较短,那么是A先过期还是消息总体一起过期,A消息的过期时间是否会被设置的总体过期时间所覆盖

答:
A 消息会先过期,而不是队列中所有消息一起过期。因为 RabbitMQ 是按照消息的过期时间来进行消息的清理的。当 A 消息过期时,它会被从队列中删除,而不受队列总体消息过期时间的影响。队列总体消息过期时间只会影响那些没有设置过期时间的消息。因此,A 消息的过期时间不会被设置的总体过期时间所覆盖。

问题二:如果一个rabbitmq队列中同时设置了A消息过期时间,以及队列总体消息过期时间,且A消息设置的过期时间比较长,那么A会不会比队列中的其他消息后过期

答:
A 消息会比队列中的其他消息后过期。因为 RabbitMQ 是按照消息的过期时间来进行消息的清理的。当队列中的消息的过期时间早于 A 消息的过期时间时,这些消息会先被删除,而 A 消息会继续存在于队列中,直到其过期时间到达后才会被删除。因此,A 消息会比队列中的其他消息后过期。

问题三:rabbitmq设置队列过期时间 和 设置队列消息过期时间的区别

答:
设置队列过期时间,是指在队列空闲一段时间之后(即没有消费者消费该队列,也没有新消息进入该队列),队列会自动被删除。这个过期时间是应用于整个队列的,而不是具体某一条消息。

设置队列消息过期时间,是指在每一条消息入队时,设置消息的过期时间,当消息在队列中等待时间超过其过期时间时,该消息会被自动删除。这个过期时间是应用于具体某一条消息的,而不是整个队列。

设置队列消息过期使用x-message-ttl参数,而设置队列过期使用x-expires参数
在实际应用中,根据不同的需求,我们可以选择设置队列过期时间或设置队列消息过期时间。如果我们希望在一段时间内没有消费者消费该队列时,自动删除该队列,那么可以设置队列过期时间。如果我们希望在一条消息在队列中存活的时间超过一定时间后自动被删除,那么可以设置队列消息过期时间。

demo演示

1、单独对一个消息设置过期时间,用到MessagePostProcessor后置处理器

    @Test
    public void testOneTtl(){
        // 消息后置处理器,可以设置一些参数
        MessagePostProcessor processor = new MessagePostProcessor(){
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // 设置message的信息
                message.getMessageProperties().setExpiration("5000");
                return message;
            }
        };
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"csnz.testTTL","测试TTL过期:",processor);
    }

返回观察rabbitMQ控制台,是已经进了1条数据了

在这里插入图片描述

让我们等他个5秒,看他拜拜了没

在这里插入图片描述

2、对整个队列所有消息设置过期时间, 首先需要修改我们之前声明队列的地方(生产者),给它加点参数

在这里新增了一个args参数,作用是声明队列消息TTL以及过期时间

    // 1、注册队列
    @Bean("CSNZQueue")
    public Queue getQueue(){
        // 设置过期时间参数
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-message-ttl",5000);
        return QueueBuilder.durable(QUEUE_NAME).withArguments(args).autoDelete().build();
    }

其他的就不需要修改了,直接写测试

    @Test
    public void testTtl(){
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"csnz.testTTL","测试TTL过期:"+i);
        }
    }

返回观察rabbitMQ控制台,是已经进了10条数据了

在这里插入图片描述

让我们等他个5秒,看他拜拜了没

在这里插入图片描述

官网还告诉我们:The original expiry time of a message is preserved if it is requeued (for example due to the use of an AMQP method that features a requeue parameter, or due to a channel closure).

我知道你们英语四级没过🤭,把翻译也丢过来了:

如果消息被重新排队(例如,由于使用了具有requeue参数的AMQP方法,或者由于通道关闭),则保留消息的原始到期时间。

也就是说如果 rabbitMQ设置A消息过期时间为10秒 此时5秒后A消息因为某种原因被重新排队 那么A消息的剩余过期时间会被重置为10秒,而不是5秒,因为消息过期时间是从消息第一次被发送到队列开始计算的,而不是从消息第一次被消费开始计算的。

五、死信队列(弥补RabbitMQ3.0以前支持的immediate参数的功能)

概述:在所有MQ产品里,此队列都叫死信队列,在RabbitMQ中也不例外,但是它又有点特殊,因为只有RabbitMQ才有交换机的概念,所有在RabbitMQ中又称死信队列为 DLX(Dead Letter Exchange 死信交换机),

当消息称为死信息后,可以被重新发送到另一个交换机,此时另一个交换机就称之为死信交换机(DLX)。

DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性

在这里插入图片描述

消息变成死信的三种情况:

  • 1、队列消息长度到达上限,导致消息被丢弃
  • 2、消费者拒绝接收消息,并且不让消息重新入队
  • 3、消息设置的TTL已经到达 超时时间 而仍未被消费

来自官方提示:Note that expiration of a queue will not dead letter the messages in it.
请注意,队列到期不会使其中的消息成为死信。(是队列到期,而不是队列消息到期)

如何将队列绑定至 死信交换机?

To set the dead letter exchange for a queue, specify the optional x-dead-letter-exchange argument when declaring the queue. The value must be an exchange name in the same virtual host:
要为队列设置死信交换,请在声明队列时指定可选的x-dead-letter-exchange参数。该值必须是同一虚拟主机中的exchange名称
You may also specify a routing key to be used when dead-lettering messages. If this is not set, the message’s own routing keys will be used.
您还可以指定在死信消息时使用的路由关键字。如果没有设置,将使用消息自己的路由关键字 args.put(“x-dead-letter-routing-key”, “some-routing-key”);

队列设置参数:

  • 1、x-dead-letter-exchange
  • 2、x-dead-letter-routing-key

注意:只有当原来的队列绑定了死信交换机后,原队列发生消息变成死信消息,此消息才会被死信交换机重新路由到死信队列

demo演示

1、再创建一套DLX队列和交换机

	@Bean("DLXQueue")
    public Queue DLXQueue(){
        return QueueBuilder.durable(DLX_QUEUE_NAME).autoDelete().build();
    }
    @Bean("DLXExchange")
    public Exchange DLXExchange(){
        return ExchangeBuilder.topicExchange(DLX_EXCHANGE_NAME).autoDelete().durable(true).build();
    }
    @Bean
    public Binding bindDLX(){
        return BindingBuilder.bind(DLXQueue()).to(DLXExchange()).with("DLX.#").noargs();
    }

2、原有的队列添加参数,让他绑定DLX交换机和DLX路由键

    @Bean("CSNZQueue")
    public Queue getQueue(){
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange",DLX_EXCHANGE_NAME);
        args.put("x-dead-letter-routing-key","DLX.#");
        return QueueBuilder.durable(QUEUE_NAME).withArguments(args).autoDelete().build();
    }

3、发送一条没设置过期时间的信息和一条设置10秒过期时间的信息

    @Test
    public void testDLX(){
        // 消息后置处理器,可以设置一些参数
        MessagePostProcessor processor = new MessagePostProcessor(){
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // 设置message的信息
                message.getMessageProperties().setExpiration("10000");
                return message;
            }
        };
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"csnz.testDLX","测试DLX",  processor);
    }

4、观察RabbitMQ控制台情况

在这里插入图片描述

发现一开始 原队列中有两条信息,过了十秒后其中设置了过期时间的那条信息转移到了DLX队列,所以最后是各自一条信息

在这里插入图片描述

5、测试队列设置整体消息过期时间,是否会发生信息转移情况

队列设置参数:

  • x-message-ttl
    @Bean("CSNZQueue")
    public Queue getQueue(){
        // 设置过期时间参数
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-message-ttl",5000);
        args.put("x-dead-letter-exchange",DLX_EXCHANGE_NAME);
        args.put("x-dead-letter-routing-key","DLX.#");
        return QueueBuilder.durable(QUEUE_NAME).withArguments(args).autoDelete().build();
    }

观察控制台,发现数据照样转移

在这里插入图片描述

6、再试试 队列消息长度到达上限,导致消息被丢弃变成死信的情况

队列设置参数:

  • x-max-length
  @Bean("CSNZQueue")
  public Queue getQueue(){
      // 设置过期时间参数
      HashMap<String, Object> args = new HashMap<>();
      args.put("x-max-length",5);
      args.put("x-dead-letter-exchange",DLX_EXCHANGE_NAME);
      args.put("x-dead-letter-routing-key","DLX.#");
      return QueueBuilder.durable(QUEUE_NAME).withArguments(args).autoDelete().build();
  }

直接测试10条数据,只有5条在原队列,另外5条去了死信队列

在这里插入图片描述

7、测试 设置队列到期时间,是不是跟官网讲的一样 => 队列到期不会使其中的消息成为死信

队列设置参数:

  • x-expires

修改原队列,设置队列的过期时间,我这设置队列的过期时间为20秒!以及配置死信队列,让原队列绑定死信交换机

	@Bean("CSNZQueue")
    public Queue getQueue(){
        // 设置过期时间参数
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-expires",20000);
        args.put("x-dead-letter-exchange",DLX_EXCHANGE_NAME);
        args.put("x-dead-letter-routing-key","DLX.#");
        return QueueBuilder.durable(QUEUE_NAME).withArguments(args).autoDelete().build();
    }
	// 2、注册交换机
    @Bean("CSNZExchange")
    public Exchange getExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).autoDelete().durable(true).build();
	//  return ExchangeBuilder.directExchange(EXCHANGE_NAME).autoDelete().durable(true).build();
    }
    // 3、绑定队列和交换机
    @Bean("CSNZBind")
    public Binding bindQueueExchange(@Qualifier("CSNZQueue")Queue queue,@Qualifier("CSNZExchange")Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("Delay.#").noargs();
    }

    /*
        以下声明 死信交换机
    */
    @Bean("DLXQueue")
    public Queue DLXQueue(){
        return QueueBuilder.durable(DLX_QUEUE_NAME).autoDelete().build();
    }
    @Bean("DLXExchange")
    public Exchange DLXExchange(){
        return ExchangeBuilder.topicExchange(DLX_EXCHANGE_NAME).autoDelete().durable(true).build();
    }
    @Bean
    public Binding bindDLX(){
        return BindingBuilder.bind(DLXQueue()).to(DLXExchange()).with("DLX.#").noargs();
    }

编写一个测试方法 => 测试队列过期是否其中的信息会成为死信

    @Test
    public void testDelayMessageWithExpires(){
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"Delay.testDelay","测试队列过期是否其中的信息会成为死信",message -> {
            return message;
        });
    }

运行测试方法,结果如下,原队列中有一条我们刚刚录入的信息

在这里插入图片描述

等待二十秒,看是否此信息会从原队列 去往死信队列

在这里插入图片描述

发现原队列确实过期自动删除了,但是原队列中的信息并没有去往死信队列,证明官网没骗人😁

队列到期不会使其中的消息成为死信。(是队列到期,而不是队列消息到期)

六、延迟队列

所谓延迟队列,就是消息进入队列之后不会立即被消费掉,而是等到指定时间后,才会被消费

相关场景:某多多下单后,30分钟内客户未支付,则自动取消此订单,库存回滚。

在这里插入图片描述
在这里插入图片描述

但是RabbitMQ中并没有提供延迟队列这一功能,有两种方式可以实现:

  • 1、靠 TTL + 死信队列 组合实现延迟队列的效果。

    • 缺点:两个交换机、两个队列,且RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,但是如果第一个消息设置的过期时间很长,而后续消息设置的过期时间很短,则会导致后续的消息一直不会被过期处理(不会按时过期)
      在这里插入图片描述
  • 2、使用 rabbitmq-delayed-message-exchange 插件来实现(RabbitMQ 3.5.7及以上的版本、Erlang/OPT 18.0及以上)

    • 优点: 只需要一个交换机、一个队列,且不会出现只判断第一个消息的情况,会根据过期时间优先处理。
      在这里插入图片描述

demo演示

1、靠 TTL + 死信队列 组合实现延迟队列的效果

监听器监听死信队列

@Component
public class DelayListener implements ChannelAwareMessageListener { // 自动接收确认实现他即可 MessageListener

    @Override
    @RabbitListener(queues = RabbitMQConfig.DLX_QUEUE_NAME)
    public void onMessage(Message message, Channel channel) throws Exception {
        // 传递标签:该字段为MQ server 用于消息确认的标记
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // 1、接收转换消息
            System.out.println(new String(message.getBody()));
            // 2、处理业务逻辑
            Thread.sleep(1000); // 模拟业务时间
            // 3、没问题的话进行手动接收:basicAck(long deliveryTag, boolean multiple)
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            // 4、有问题的话拒接接收:basicNack(long deliveryTag, boolean multiple, boolean requeue)
            // ->requeue:是否重回队列,如果true,则消息重新回到queue,broker会重新发送该消息给消费端
            channel.basicNack(deliveryTag,true,true);
        }
    }
}

注册正常队列、死信队列,并让正常队列绑定死信交换机,且设置正常队列的过期时间为10秒

@Configuration
public class RabbitMQConfig {
    public static final String QUEUE_NAME = "csnz_queue";
    public static final String EXCHANGE_NAME = "csnz_exchange";
    public static final String DLX_QUEUE_NAME = "DLX_QUEUE";
    public static final String DLX_EXCHANGE_NAME = "DLX_EXCHANGE";
    // 1、注册队列
    @Bean("CSNZQueue")
    public Queue getQueue(){
        // 设置过期时间参数
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-message-ttl",10000);
        args.put("x-max-length",5);
        args.put("x-dead-letter-exchange",DLX_EXCHANGE_NAME);
        args.put("x-dead-letter-routing-key","DLX.#");
        return QueueBuilder.durable(QUEUE_NAME).withArguments(args).autoDelete().build();
    }
    // 2、注册交换机
    @Bean("CSNZExchange")
    public Exchange getExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).autoDelete().durable(true).build();
    }
    // 3、绑定队列和交换机
    @Bean("CSNZBind")
    public Binding bindQueueExchange(@Qualifier("CSNZQueue")Queue queue,@Qualifier("CSNZExchange")Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("csnz.#").noargs();
    }

    /*
        以下声明 死信交换机
     */
    @Bean("DLXQueue")
    public Queue DLXQueue(){
        return QueueBuilder.durable(DLX_QUEUE_NAME).autoDelete().build();
    }
    @Bean("DLXExchange")
    public Exchange DLXExchange(){
        return ExchangeBuilder.topicExchange(DLX_EXCHANGE_NAME).autoDelete().durable(true).build();
    }
    @Bean
    public Binding bindDLX(){
        return BindingBuilder.bind(DLXQueue()).to(DLXExchange()).with("DLX.#").noargs();
    }
}

编写测试类

生产者端:

    @Test
    public void testDelay() throws InterruptedException {
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"csnz.testDelay","测试延迟队列");
        for (int i = 0; i < 15; i++) {
            System.out.println(i);
            Thread.sleep(1000);
        }
    }

消费者端:

    @Test
    public void testDelay(){
        System.out.println("Delay模式");
        while(true){

        }
    }

运行即可发现,发送消息过了10秒后,消息会到死信队列中,此时再在死信队列中执行逻辑代码即可实现延迟队列功能。

2、使用 rabbitmq-delayed-message-exchange 插件来实现(RabbitMQ 3.5.7及以上的版本、Erlang/OPT 18.0及以上)

RabbitMQ延迟消息插件新增了一种新的交换器类型,消息通过这种交换器路由就可以实现延迟发送

插件官网下载:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases(记得适配版本号)
下载完成后将其解压在plugins文件夹下
在这里插入图片描述

运行cmd切换到rabbitMQ的sbin目录下执行:rabbitmq-plugins enable rabbitmq_delayed_message_exchange

在这里插入图片描述
重启一下rabbitMQ服务
使用管理员运行CMD

在这里插入图片描述
去RabbitMQ的控制台看看,插件有没有加载成功
在这里插入图片描述

注册队列,交换机

    @Bean("DelayQueue")
    public Queue DelayQueue(){
        return QueueBuilder.durable(DELAY_QUEUE_NAME).autoDelete().build();
    }
    @Bean("DelayExchange")
    public CustomExchange DelayExchange(){
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-delayed-type","direct");
        return new CustomExchange(DELAY_EXCHANGE_NAME," x-delayed-message",true,true,args);
    }
    @Bean
    public Binding bindDelay(){
        return BindingBuilder.bind(DelayQueue()).to(DelayExchange()).with("Delay.#").noargs();
    }

生产者代码

    @Test
    public void testDelayMessage(){
        rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME,"Delay.testDelay","测试延迟插件队列",message -> {
            // setHeader 为延时时间 不填及为及时发送
            message.getMessageProperties().setDelay(5000);
            return message;
        });
    }

最终实现的效果就是发送的消息得等到5秒后才进入延迟队列,此时一步到位
此时需要注意的是,延迟队列插件的实现方式是通过在消息发布时设置消息的过期时间来实现的,因此在发送消息时,其实是MQ自动将消息的过期时间设置为当前时间加上延迟时间。

七、优先级队列(3.5.0以上版本)

人如其名,优先级队列即优先级比较高,优先被消费

一般通过x-max-priority 参数设置优先级队列的最大值

官方推荐参数设置:支持有限数量的优先级:255。建议使用1到10之间的值,数字越大表示优先级越高,没有设置优先级的消息被视为优先级为0

队列需要设置为优先级队列,消息需要设置优先级(在MQ出现消息堆积情况下、及消费速度小于生产速度,优先级才有意义

交换机设置最大优先级参数x-max-priority

    @Bean("DelayExchange")
    public Exchange DelayExchange(){
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-delayed-type","topic");
        args.put("x-max-priority", 10);
        return new CustomExchange(DELAY_EXCHANGE_NAME," x-delayed-message",true,true,args);
    }

测试发送的消息设置优先级setPriority 级别 ,这里可以去掉设置优先级,然后多发几条,最后发现有设置优先级的消息最先被消费

    @Test
    public void testFirstMessage(){
        rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME,"Delay.testFirst","测试延迟插件队列",message -> {
            // setPriority 为设置此条数据的优先级
            message.getMessageProperties().setPriority(5);
            return message;
        });
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/23878.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

没有权限merge到源git仓库?一招教你如何解决。

在git上贡献项目的时候&#xff0c;一般步骤是&#xff0c;clone源项目到本地&#xff0c;切出一个新的分支&#xff0c;然后在新分支上开发&#xff0c;最后push到远程&#xff0c;然后提出mr。但是对于一些非开源的项目&#xff0c;可能会出现&#xff1a; 这就是说明没有权…

【C++】布隆过滤器

文章目录 布隆过滤器的引入布隆过滤器的概念如何选择哈希函数个数和布隆过滤器长度布隆过滤器的实现布隆过滤器的优缺点 布隆过滤器的引入 我们在使用新闻客户端看新闻时&#xff0c;它会给我们不停地推荐新的内容&#xff0c;它每次推荐时要去重&#xff0c;去掉那些已经看过…

【高级语言程序设计(一)】第 8 章:结构体类型和自定义类型

目录 前言 一、结构体类型定义 &#xff08;1&#xff09;结构体类型定义的一般形式 &#xff08;2&#xff09;结构体类型定义的说明 二、结构体类型变量 &#xff08;1&#xff09;结构体类型变量的定义和初始化 ① 先定义结构体类型、后定义结构体类型的变量&#xf…

84.Rem和max-width如何工作

max-width 首先我们先看普通的width是什么样的效果&#xff01; 首先给个测试的div <div class"test">TEST</div>● 然后CSS给定一个宽度 .test {width: 1000px;background-color: red;padding: 100px; }如上图&#xff0c;不管你的浏览器窗口如何改变…

HTMLCSS中的树形结构图

我们可以只使用 html 和 css 创建树视图(可折叠列表) &#xff0c;而不需要 JavaScript。可访问性软件将看到树形视图作为列表嵌套在披露窗口小部件中&#xff0c;并且自动支持标准键盘交互。 1、HTML 我们就从简单嵌套列表的 html 开始: <ul><li>Giant planets&…

Hbase操作

(1) 启动 启动顺序&#xff1a;Hadoop--zookeeper—hbase 主进程&#xff1a;HMaster 从进程&#xff1a;HRegionServer 确认进程是否正常 (2) 进入终端 [rootmaster ~]# hbase shell (3) 查看状态 命令&#xff1a;status 表示有3台机器&#xff0c;0台down掉&…

位操作集锦

位操作集锦 异或运算两两交换数据签名检测两个数是否拥有不同的符号&#xff0c;即一个正数&#xff0c;一个负数寻找只出现一次的一个数字1寻找只出现两次的一个数字寻找只出现一次的一个数字2寻找只出现一次的两个数字 与和位移运算判断奇偶数二进制数中1的个数二进制数中最右…

MFC 给对话框添加图片背景

在windows开发当中做界面的主要技术之一就是使用MFC&#xff0c;通常我们看到的QQ,360,暴风影音这些漂亮的界面都可以用MFC来实现。今天我们来说一下如何用MFC美化对话框&#xff0c;默认情况下&#xff0c;对话框的背景如下&#xff1a; 那么&#xff0c;我们如何将它的背景变…

C++服务器框架开发3——协程与线程的简单理解/并发与并行

该专栏记录了在学习一个开发项目的过程中遇到的疑惑和问题。 其教学视频见&#xff1a;[C高级教程]从零开始开发服务器框架(sylar) 上一篇&#xff1a;C服务器框架开发2——头文件memory/typedef C服务器框架开发3——协程与线程的简单理解/并发与并行 目前进度协程与线程的简…

json-server的基本使用

1、mock是什么&#xff1f; mockjs 作用&#xff1a;生成随机数据&#xff0c;拦截 Ajax 请求 目的&#xff1a;很多时候前端开发页面的过程中&#xff0c;后端的接口并没有写好&#xff0c;这个时候需要前端自己定义接口及接口的返回数据的结构体&#xff0c;这个时候就需要…

ReactRouterDom-v5v6用法与异同

本文作者系360奇舞团前端开发工程师 简介&#xff1a; React Router Dom是React.js中用于实现路由功能的常用库。在React应用中&#xff0c;路由可以帮助我们管理页面之间的导航和状态&#xff0c;并实现动态加载组件。本文将深入探讨React Router Dom的两个主要版本&#xff1…

【微电网】含风、光、储联合发电的微电网优化调度研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

Jupyter程序安装和使用指南【操作示例】

Jupyter Notebook(简称Jupyter)是一个交互式编辑器&#xff0c;它支持运行40多种编程语言&#xff0c;便于创建和共享文档。Jupyter本质上是一个Web应用程序&#xff0c;与其他编辑器相比&#xff0c;它具有小巧、灵活、支持实时代码、方便图表展示等优点。下面分别为大家演示如…

辅助生成: 低延迟文本生成的新方向

大型语言模型如今风靡一时&#xff0c;许多公司投入大量资源来扩展它们规模并解锁新功能。然而&#xff0c;作为注意力持续时间不断缩短的人类&#xff0c;我们并不喜欢大模型缓慢的响应时间。由于延迟对于良好的用户体验至关重要&#xff0c;人们通常使用较小的模型来完成任务…

EnjoyVIID部署

1、下载 git clone https://gitee.com/tsingeye/EnjoyVIID.git 2、导入数据库 创建表enjoyviid 导入数据库(修改数据库文件里的编码) EnjoyVIID/sql/tsingeye-viid.sql 3、修改配置 vim EnjoyVIID/tsingeye-admin/src/main/resources/application-dev.yml 修改数据库连接、re…

接口测试--apipost接口断言详解

在做接口测试的时候&#xff0c;会对接口进行断言&#xff0c;一个完整的接口测试&#xff0c;包括&#xff1a;请求->获取响应正文->断言。 一、apipost如何进行断言 apipost的断言设置实在后执行脚本中进行编写的。apipost本身提供了11中断言&#xff1a; apt.asser…

Linux-0.11 kernel目录进程管理asm.s详解

Linux-0.11 kernel目录进程管理asm.s详解 模块简介 该模块和CPU异常处理相关&#xff0c;在代码结构上asm.s和traps.c强相关。 CPU探测到异常时&#xff0c;主要分为两种处理方式&#xff0c;一种是有错误码&#xff0c;另一种是没有错误码&#xff0c;对应的方法就是error_c…

URP自定义屏幕后处理

回到目录 大家好&#xff0c;我是阿赵。这次来说一下URP渲染管线里面怎样使用后处理效果&#xff0c;还有怎样去自定义后处理效果。 一、使用URP自带的后处理效果 要使用URP自带的后处理效果&#xff0c;方法很简单&#xff0c;和Unity内置渲染管线的PostProcessing后处理很…

任务7 课程信息管理系统

系列文章 任务7 课程信息管理系统 已知课程的信息包括&#xff1a;课程编号&#xff0c;课程名称&#xff0c;课程性质&#xff08;必修、选修&#xff09;&#xff0c;课时&#xff0c;学分&#xff0c;考核方式&#xff08;考试、考查课&#xff09;&#xff0c;开课学期&a…

Ubuntu22.04安装MySQL8

在 Ubuntu 22.04 上安装 MySQL 8&#xff0c;可以按照以下步骤进行&#xff1a; 安装MySQL需要在root用户下 sudo su -更新软件包列表&#xff1a; sudo apt update安装 MySQL 8&#xff1a; sudo apt install mysql-server安装过程中会提示设置 MySQL root 用户的密码。 确认…