RabbitMQ三、springboot整合rabbitmq(消息可靠性、高级特性)

一、springboot整合RabbitMQ(jdk17)(创建两个项目,一个生产者项目,一个消费者项目)

  1. 上面使用原生JAVA操作RabbitMQ较为繁琐,很多的代码都是重复书写的,使用springboot可以简化代码的编写。

生产者项目

在这里插入图片描述

第一步:创建springboot工程,然后引入rabbitmq的依赖

<!-- RabbitMQ起步依赖 -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

第二步:编写配置文件

spring:
  rabbitmq:
   host: 192.168.70.130  # 虚拟机的地址
   port: 5672
   username: admin
   password: admin
   virtual-host: /


#日志格式
logging:
  pattern:
   console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'

第三步:编写RabbitMQ的配置类

@Configuration
public class RabbitmqConfig1 {

    private final String EXCHANGE_NAME = "boot_exchange";
    private final String QUEUE_NAME = "boot_queue";
    private final String ROUTE_NAME = "boot_route";

    //创建交换机
    @Bean(EXCHANGE_NAME)
    public Exchange getExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }
    //创建队列
    @Bean(QUEUE_NAME)
    public Queue getQueue(){
        return new Queue(QUEUE_NAME);
    }
    //交换机和队列绑定
    @Bean
    public Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTE_NAME).noargs();
    }
}

第四步:编写发送消息测试类

//编写发送消息测试类
@SpringBootTest
public class RabbitmqTest {

    // 注入RabbitTemplate工具类
    @Autowired
    private RabbitTemplate rabbitTemplate;


    @Test
    public void testSendMessage(){
        /**
         * 发送消息
         * 参数1:交换机
         * 参数2:路由key
         * 参数3:要发送的消息
         */
        rabbitTemplate.convertAndSend("boot_exchange","boot_route","你好我有一个毛衫");
        System.out.println("发送消息成功");
    }
}

消费者项目

在这里插入图片描述

第一步:创建springboot工程,然后引入rabbitmq的依赖

<!-- RabbitMQ起步依赖 -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

第二步:编写配置文件

spring:
  rabbitmq:
   host: 192.168.70.130  # 虚拟机的地址
   port: 5672
   username: admin
   password: admin
   virtual-host: /


#日志格式
logging:
  pattern:
   console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'

第三步:编写消费者,监听队列

@Component
public class Consumer1 {
    /**
     * 监听队列
     * @param message
     * queues表示监听的队列的名称
     */

    @RabbitListener(queues = "boot_queue")
    public void listener(String message){
        System.out.println("接受到消息 = " + message);
    }
}


4、rabbitmq的消息可靠性

  1. RabbitMQ消息投递的路径为:
    生产者--->交换机--->队列--->消费者

  2. 在RabbitMQ工作的过程中,每个环节消息都可能传递失败,那么RabbitMQ是如何监听消息是否成功投递的呢?

      1. 确认模式(confirm):可以监听消息是否从生产者成功传递到交换机
      1. 退回模式(return):可以监听消息是否从交换机成功传递到队列
      1. 消费者消息确认(Consumer Ack):可以监听消费者是否成功处理消息。

【一】rabbitmq的消息可靠性——确认模式

  1. 确认模式(confirm):可以监听消息是否从生产者成功传递到交换机
  2. 创建一个新的生产者项目,导入mq(上面的第一步操作)依赖进行开发:(也可以在原来的基础上修改信息)
    • 代码组成和上面的生产者项目是一样的,也是三步内容。
第一步:修改配置文件

只是添加了一句代码
在这里插入图片描述

spring:
  rabbitmq:
    host: 192.168.70.130
    port: 5672
    username: admin
    password: admin
    virtual-host: / # 表示使用默认的virtual-host
    #开启确认模式
    publisher-confirm-type: correlated


#????
logging:
  pattern:
    console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
第二步:在生产者的配置类创建交换机和队列(RabbitMQ的配置类)
@Configuration
public class RabbitmqConfig2Confirm {

    public final String EXCHANGE_NAME = "confirm_exchange";
    public final String QUEUE_NAME = "confirm_queue";
    public final String ROUTING_NAME = "confirm_routing";
    
//    创建交换机
    @Bean(EXCHANGE_NAME)
    public Exchange exchange(){
        return ExchangeBuilder
                .topicExchange(EXCHANGE_NAME)
                .durable(true)
                .build();
    }

//    创建队列
    @Bean(QUEUE_NAME)
    public Queue queue(){
        return QueueBuilder
                .durable(QUEUE_NAME)
                .build();
    }
//    创建交换机和队列绑定
    @Bean
    public Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){
        return BindingBuilder
                .bind(queue).
                to(exchange)
                .with(ROUTING_NAME)
                .noargs();
    }
}

第三步:编写测试类发生消息:生产者定义确认模式的回调方法(springboot的测试类,能够加载到第二步的配置类)
 @Test
    void testConfirm() {
        //回调确认
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             *
             * @param correlationData 配置信息
             * @param b 是否成功,true 是 ,false 否
             * @param s 失败原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                if (b){
                    System.out.println("发送成功");
                }else{
                    System.out.println("发送失败,原因:"+s);
                }
            }
        });
        //发送消息
          /**
         * 发送消息
         * 参数1:交换机
         * 参数2:路由key
         * 参数3:要发送的消息
         */
         rabbitTemplate.convertAndSend("confirm_exchange","confirm_routing","send message...confirm");
    }

由于rabbitmq的confirm确认模式是确认消息是否从生产者成功传递到交换机的,所以就没必要写消费者进行信息的消费了

  • 当我们执行测试类的时候,先执行rabbitTemplate.convertAndSend(“confirm_exchange”,“confirm_routing”,“send message…confirm”);,无论消息是否成功发送都会调用 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback()方法,如果发送成功则执行if语句的代码,如果发送失败则调用else语句的代码。
    • 根据执行的是if或者else的语句,就能判断消息是否成功传递到交换机了。

【二】rabbitmq的消息可靠性——退回模式

  1. 退回模式(return):可以监听消息是否从交换机成功传递到队列
  2. 创建一个新的生产者项目,导入mq(上面的第一步操作)依赖进行开发:(也可以在原来的基础上修改信息)
    • 代码组成和上面的生产者项目是一样的,也是三步内容。
第一步:修改配置文件

只是添加了一句
在这里插入图片描述

# rabbitmq???
spring:
  rabbitmq:
    host: 192.168.70.130
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    #开启确认模式
    publisher-confirm-type: correlated
    #开始回退模式
    publisher-returns: true


#????
logging:
  pattern:
    console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
第二步:编写配置类(RabbitMQ的配置类)
@Configuration
public class RabbitmqConfig3Return {

    public final String EXCHANGE_NAME = "return_exchange";
    public final String QUEUE_NAME = "return_queue";
    public final String ROUTING_NAME = "return_routing";
//    创建交换机
    @Bean(EXCHANGE_NAME)
    public Exchange exchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

//    创建队列
    @Bean(QUEUE_NAME)
    public Queue queue(){
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

//    创建交换机和队列绑定
    @Bean
    public Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(ROUTING_NAME)
                .noargs();
    }
}

第三步:编写测试类发生消息:生产者定义退回模式的回调方法(springboot的测试类,能够加载到第二步的配置类)
@Test
    void testReturnSendMessage(){
//        调用回退模式的回调方法,只有失败才会回调,成功不会回调哦
// 失败后将失败信息封装到参数中
        rabbitTemplate.setReturnsCallback(returned ->{
            Message message = returned.getMessage();
            System.out.println("消息对象:"+message);
            System.out.println("错误码:"+returned.getReplyCode());
            System.out.println("错误信息:"+returned.getReplyText());
            System.out.println("交换机:"+returned.getExchange());
            System.out.println("路由键:"+returned.getRoutingKey());
        });

//        发送消息
           /**
         * 发送消息
         * 参数1:交换机
         * 参数2:路由key
         * 参数3:要发送的消息
         */
         rabbitTemplate.convertAndSend("return_exchange","return_routing","send message...return");
    }

由于rabbitmq的return回退模式是确认消息是否从交换机成功传递到队列的,还没有传递到消费者,所以就没必要写消费者进行信息的消费了

  • 当我们执行测试类的时候,先执行rabbitTemplate.convertAndSend(“return_exchange”,“return_routing”,“send message…return”);,如果消息成功发送到队列上则不会调用 rabbitTemplate.setReturnsCallback方法,如果发送步成功则调用回调方法rabbitTemplate.setReturnsCallback,
    • 根据运行结果就可以知道在传递消息到队列上的时候哪里发生错误了

在这里插入图片描述

【三】rabbitmq的消息可靠性——消费者消息确认(Consumer Ack)

  1. 在RabbitMQ中,消费者接收到消息后会向队列发送确认签收的消息,只有确认签收的消息才会被移除队列。这种机制称为消费者消息确认(Consumer Acknowledge,简称Ack)
    • 类似快递员派送快递也需要我们签收,否则一直存在于快递公司的系统中。
  2. 消费者消息确认(Consumer Acknowledge,简称Ack)分为自动确认手动确认
    • 自动确认指消息只要被消费者接收到,无论是否成功处理消息,则自动签收,并将消息从队列中移除
    • 但是在实际开发中,收到消息后可能业务处理出现异常,那么消息就会丢失。此时需要设置手动签收,即在业务处理成功后再通知签收消息,如果出现异常,则拒签消息,让消息依然保留在队列当中。

● 自动确认:spring.rabbitmq.listener.simple.acknowledge=“none”
● 手动确认:spring.rabbitmq.listener.simple.acknowledge=“manual”

  1. 创建一个新的生产者项目和新的消费者项目,导入mq(上面的第一步操作)依赖进行开发:(也可以在原来的基础上修改信息)
    • 代码组成和上面的生产者项目是一样的,也是三步内容。
生产者项目:第一步:修改配置文件

不用修改

# rabbitmq???
spring:
  rabbitmq:
    host: 192.168.70.130
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    #开启确认模式
    publisher-confirm-type: correlated
    #开始回退模式
    publisher-returns: true


#????
logging:
  pattern:
    console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
生产者项目:第二步:编写配置类(RabbitMQ的配置类)
@Configuration
public class RabbitmqConfig4ACK {

    public final String EXCHANGE_NAME = "ack_exchange";
    public final String QUEUE_NAME = "ack_queue";
    public final String ROUTING_NAME = "ack_routing";
//    创建交换机
    @Bean(EXCHANGE_NAME)
    public Exchange exchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

//    创建队列
    @Bean(QUEUE_NAME)
    public Queue queue(){
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

//    创建交换机和队列绑定
    @Bean
    public Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(ROUTING_NAME)
                .noargs();
    }
}
生产者项目:第三步:编写测试类发生消息:(springboot的测试类,能够加载到第二步的配置类)
 @Test
    void testAck(){
        //        发送消息
        rabbitTemplate.convertAndSend("ack_exchange","ack_routing","send message...ack");
    }
消费者项目(自动确认):第一步:修改配置文件
  • 消费者消息确认——自动确认的配置文件
spring:
  rabbitmq:
    host: 192.168.70.130
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    #开动手动签收
    listener:
      simple:
        acknowledge-mode: none   # 默认就是自动确认
#????
logging:
  pattern:
    console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'

在这里插入图片描述

  • 自动签收模式就是:消息只要被消费者接收到,无论是否成功处理消息,则自动签收,并将消息从队列中移除。当我们拿到消息的时候,业务出现异常了,所以无法正确处理消息,导致消息丢失了。
消费者项目(自动确认):第二步:编写消费者类,监听队列
  • 自动确认的消费者类
@Component
public class AckConsumer {
//    自动签收
    @RabbitListener(queues = "ack_queue")
    public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException, IOException {
//        获取消息
        String s = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println(s);
//        TODO,处理事务
//        故意出错
        int i= 1/0;
    }


}

消费者项目(手动确认):第一步:修改配置文件
  • 消费者消息确认——手动确认的配置文件
spring:
  rabbitmq:
    host: 192.168.70.130
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    #开动手动签收
    listener:
      simple:
        acknowledge-mode: manual  
#????
logging:
  pattern:
    console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
消费者项目(手动确认):第二步:编写消费者类,监听队列
  • 手动确认
@Component
public class AckConsumer {
    //    手动签收
    @RabbitListener(queues = "ack_queue")
    public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException, IOException {
// 消息投递序号,消息每次投递该值都会+1
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
//            int i = 1/0; //模拟处理消息出现bug
            System.out.println("成功接受到消息:"+message);

            // 签收消息
            /**
             * 参数1:消息投递序号
             * 参数2:是否一次可以签收多条消息
             */
            channel.basicAck(deliveryTag,true);
        }catch (Exception e){
            System.out.println("消息消费失败!");
            Thread.sleep(2000);
            // 拒签消息
            /**
             * 参数1:消息投递序号
             * 参数2:是否一次可以拒签多条消息
             * 参数3:拒签后消息是否重回队列
             */
            channel.basicNack(deliveryTag,true,true);
        }
    }
}

在这里插入图片描述

在这里插入图片描述

  • 手动签收模式就是:如果出现异常,则拒签消息,让消息依然保留在队列当中。方便下次请求能够请求到这次因为异常而没有接收到的消息。

【四】RabbitMQ高级特性——消费端限流

在这里插入图片描述

  • 前面说过MQ可以对请求进行“削峰填谷”,即通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。
  • 使用【三】rabbitmq的消息可靠性——消费者消息确认(Consumer Ack)的项目,消费者使用手动确认模式的代码即可(但是要修改配置文件)
第一步:先在生产者项目中,发送多个消息
@Test
    public void testLimitSendBatch() {
        // 发送十条消息
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("ack_exchange", "ack_routing", "这是第"+i+"条消息");
        }
    }
第二步:修改消费者项目的配置文件

最主要就是配置文件的修改:
在这里插入图片描述

spring:
  rabbitmq:
    host: 192.168.70.130
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    #开动手动签收
    listener:
      simple:
        acknowledge-mode: manual  #none是默认的
        prefetch: 5  # 每次消费者从队列拉取的消息数量(限制)

#????
logging:
  pattern:
    console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
第三步:重新编写消费者类
@Component
public class ConsumerLimit {
//    手动签收
    @RabbitListener(queues = "limit_queue")
    public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException, IOException {
//        获取消息
        String s = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println(s);
        //        模拟业务处理
        Thread.sleep(3000);

        long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 手动签收
        channel.basicAck(deliveryTag,true);
    }


}

  • 其实就是修改了消费者项目的配置文件,添加一条配置信息,限制消费者消息的拉取速度。
    在这里插入图片描述

【五】RabbitMQ高级特性——利用限流实现不公平分发

  1. 在RabbitMQ中,多个消费者监听同一条队列,则队列默认采用的轮询分发。但是在某种场景下这种策略并不是很好,例如消费者1处理任务的速度非常快,而其他消费者处理速度却很慢。此时如果采用公平分发,则消费者1有很大一部分时间处于空闲状态。此时可以采用不公平分发,即谁处理的快,谁处理的消息多。
  • 在【四】RabbitMQ高级特性——消费端限流的基础上,修改一消费者项目的配置文件,然后在消费者类中多写几个监听消息的方法(或者多写几个消费者类)。
第一步:修改消费者项目的配置文件

最主要就是配置文件的修改:
在这里插入图片描述

spring:
  rabbitmq:
    host: 192.168.70.130
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    #开动手动签收
    listener:
      simple:
        acknowledge-mode: manual  #none是默认的
        prefetch: 1  #  消费端最多拉取1条消息消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发

#????
logging:
  pattern:
    console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'


第二步:修改消费者类,编写多个监听方法
@Component
public class ConsumerUnfair {
//  消费者1
    @RabbitListener(queues = "ack_queue")
    public void listenMessage1(Message message, Channel channel) throws IOException, InterruptedException, IOException {
//        获取消息
        String s = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("消费者1"+s);
        Thread.sleep(3000);

        long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 手动签收
        channel.basicAck(deliveryTag,true);
    }

    //    消费者2
    @RabbitListener(queues = "ack_queue")
    public void listenMessage2(Message message, Channel channel) throws IOException, InterruptedException, IOException {
//        获取消息
        String s = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("消费者2"+s);
        Thread.sleep(1000);

        long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 手动签收
        channel.basicAck(deliveryTag,true);
    }

// .......监听方法
}


  • 最主要的就是消费者项目的配置文件的修改: 配置消费端最多拉取1条消息消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发。

【六】RabbitMQ高级特性——消息存活时间

  1. RabbitMQ可以设置消息的存活时间(Time To Live,简称TTL),当消息到达存活时间后还没有被消费,会被移出队列。RabbitMQ可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间。
  • 使用【三】rabbitmq的消息可靠性——消费者消息确认(Consumer Ack)的项目,消费者使用手动确认模式的代码
第一步:修改生产者项目的配置类

在这里插入图片描述

@Configuration
public class RabbitmqConfig7ttl {

    public final String EXCHANGE_NAME = "ack_exchange";
    public final String QUEUE_NAME = "ack_queue";
    public final String ROUTING_NAME = "ack_routing";
//    创建交换机
    @Bean(EXCHANGE_NAME)
    public Exchange exchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

//    创建队列
    @Bean(QUEUE_NAME)
    public Queue queue(){
        return QueueBuilder
                .durable(QUEUE_NAME)
//                设置队列的超时的时间,单位是毫秒
                .ttl(10000)
                .build();
    }

//    创建交换机和队列绑定
    @Bean
    public Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(ROUTING_NAME)
                .noargs();
    }
}


第二步:修改生产者项目的测试类

设置单条消息存活时间
在这里插入图片描述

 @Test
    public void testTtlSendBatch() {
        // 发送十条消息
        for (int i = 0; i < 100; i++) {
            if (i%5 == 0) {
                //设置消息属性
                MessageProperties messageProperties = new MessageProperties();
                //设置存活时间
                messageProperties.setExpiration("10000");
                // 创建消息对象(可以配置消息的一些配置)
                Message message = new Message(("这是第" + i + "条消息").getBytes(StandardCharsets.UTF_8), messageProperties);
                // 发送消息
                rabbitTemplate.convertAndSend("ack_exchange", "ack_routing", message);
            }else {
                rabbitTemplate.convertAndSend("ack_exchange", "ack_routing", "这是第" + i + "条消息");
            }
        }
    }
    1. 如果设置了单条消息的存活时间,也设置了队列的存活时间,以时间的为准。
    1. 消息过期后,并不会马上移除消息,只有消息消费到队列顶端时,才会移除该消息

【七】RabbitMQ高级特性——优先级队列

  1. 假设在电商系统中有一个订单催付的场景,即客户在一段时间内未付款会给用户推送一条短信提醒,但是系统中分为大型商家和小型商家。比如像苹果,小米这样大商家一年能给我们创造很大的利润,所以在订单量大时,他们的订单必须得到优先处理,此时就需要为不同的消息设置不同的优先级,此时我们要使用优先级队列。
  • 使用【三】rabbitmq的消息可靠性——消费者消息确认(Consumer Ack)的项目,消费者使用手动确认模式的代码
第一步:修改生产者项目的配置类

在这里插入图片描述

@Configuration
public class RabbitmqConfig8Priority {

    public final String EXCHANGE_NAME = "priority_exchange";
    public final String QUEUE_NAME = "priority_queue";
    public final String ROUTING_NAME = "priority_routing";
//    创建交换机
    @Bean(EXCHANGE_NAME)
    public Exchange exchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

//    创建队列
    @Bean(QUEUE_NAME)
    public Queue queue(){
        return QueueBuilder
                .durable(QUEUE_NAME)
//                设置队列的优先级,值越大优先级越高,一般不超过10
                .maxPriority(10)
                .build();
    }

//    创建交换机和队列绑定
    @Bean
    public Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(ROUTING_NAME)
                .noargs();
    }
}

第二步:修改生产者项目的测试
 @Test
    public void testPrioritySendBatch() {
        // 发送十条消息
        for (int i = 0; i < 100; i++) {
            if (i%5 == 0) {
                //设置消息属性
                MessageProperties messageProperties = new MessageProperties();
//             设置优先级
                messageProperties.setPriority(9);
                // 创建消息对象(可以配置消息的一些配置)
                Message message = new Message(("这是第" + i + "条消息").getBytes(StandardCharsets.UTF_8), messageProperties);
                // 发送消息
                rabbitTemplate.convertAndSend("priority_exchange", "priority_routing", message);
            }else {
                rabbitTemplate.convertAndSend("priority_exchange", "priority_routing", "这是第" + i + "条消息");
            }
        }
    }
  • 设置了消息的优先级,那么消费者项目在消费消息的时候就会优先消费等级高的消息。

【八】RabbitMQ高级特性——死信队列

  1. 在MQ中,当消息成为死信(Dead message)后,消息中间件可以将其从当前队列发送到另一个队列中,当前队列就是死信队列。而在RabbitMQ中,由于有交换机的概念,实际是将死信发送给了死信交换机(Dead Letter Exchange,简称DLX)。死信交换机和死信队列和普通的没有区别。
    在这里插入图片描述
  2. 消息成为死信的情况:
      1. 队列消息长度到达限制。
      1. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
      1. 消息到达存活时间未被消费。
生产者项目:第一步:修改配置文件
# rabbitmq???
spring:
  rabbitmq:
    host: 192.168.70.130
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    #开启确认模式
    publisher-confirm-type: correlated
    #开始回退模式
    publisher-returns: true


#????
logging:
  pattern:
    console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
生产者项目:第二步:编写配置类(RabbitMQ的配置类)
@Configuration
public class RabbitmqConfig9Dead {

//    死信
    private final String DEAD_EXCHANGE = "dead_exchange";
    private final String DEAD_QUEUE = "dead_queue";
    private final String DEAD_ROUTING = "dead_routing";

    // 死信交换机
    @Bean(DEAD_EXCHANGE)
    public Exchange deadExchange(){
        return ExchangeBuilder
                .topicExchange(DEAD_EXCHANGE)
                .durable(true)
                .build();
    }


    // 死信队列
    @Bean(DEAD_QUEUE)
    public Queue deadQueue(){
        return QueueBuilder
                .durable(DEAD_QUEUE)
                .build();
    }


    // 死信交换机绑定死信队列
    @Bean
    public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange exchange,@Qualifier(DEAD_QUEUE)Queue queue){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(DEAD_ROUTING)
                .noargs();
    }

    // 普通
    private final String NORMAL_EXCHANGE = "normal_exchange";
    private final String NORMAL_QUEUE = "normal_queue";
    private final String NORMAL_ROUTING = "normal_routing";

    // 普通交换机
    @Bean(NORMAL_EXCHANGE)
    public Exchange normalExchange(){
        return ExchangeBuilder
                .topicExchange(NORMAL_EXCHANGE)
                .durable(true)
                .build();
    }


    // 普通队列
    @Bean(NORMAL_QUEUE)
    public Queue normalQueue(){
        return QueueBuilder
                .durable(NORMAL_QUEUE)
                .deadLetterExchange(DEAD_EXCHANGE) // 绑定死信交换机
                .deadLetterRoutingKey(DEAD_ROUTING) // 死信队列路由关键字
                .ttl(10000) // 消息存活10s
                .maxLength(10) // 队列最大长度为10
                .build();
    }


    // 普通交换机绑定普通队列
    @Bean
    public Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchange exchange,@Qualifier(NORMAL_QUEUE)Queue queue){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(NORMAL_ROUTING)
                .noargs();
    }
}

生产者项目:第三步:编写测试类发生消息:(springboot的测试类,能够加载到第二步的配置类)
@Test
public void testDlx(){
  // 存活时间过期后变成死信
  //     rabbitTemplate.convertAndSend("normal_exchange","normal_routing","测试死信");
  
  // 超过队列长度后变成死信
  //     for (int i = 0; i < 20; i++) {
  //       rabbitTemplate.convertAndSend("normal_exchange","normal_routing","测试死信");
  //     }
  
  // 消息拒签但不返回原队列后变成死信
  rabbitTemplate.convertAndSend("normal_exchange","normal_routing","测试死信");
}
消费者项目(手动确认):第一步:修改配置文件
  • 消费者消息确认——手动确认的配置文件
spring:
  rabbitmq:
    host: 192.168.70.130
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    #开动手动签收
    listener:
      simple:
        acknowledge-mode: manual  
#????
logging:
  pattern:
    console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'
消费者项目(手动确认):第二步:编写消费者类,监听队列
  • 手动确认
@Component
public class ConsumerDead {

    @RabbitListener(queues = "normal_queue")
    public void listenMessage1(Message message, Channel channel) throws IOException, InterruptedException, IOException {
//        获取消息
        String s = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("消费者1"+s);
        Thread.sleep(500);

        long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 拒绝签收
        channel.basicNack(deliveryTag,true,false);
    }

  • 死信队列小结
      1. 死信交换机和死信队列和普通的没有区别
      1. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
      1. 消息成为死信的三种情况:
        1. 队列消息长度到达限制;
        1. 消费者拒接消费消息,并且不重回队列;
        1. 原队列存在消息过期设置,消息到达超时时间未被消费;

【九】RabbitMQ高级特性——延迟队列

  1. 延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
    • 例如:
        1. 下单后,30分钟未支付,取消订单,回滚库存。
        1. 新用户注册成功7天后,发送短信问候。
        • 实现方式:
            1. 定时器
            1. 延迟队列
              在这里插入图片描述
  • RabbitMQ中并未提供延迟队列功能,我们可以使用死信队列实现延迟队列的效果。
    在这里插入图片描述
    1. 延迟队列 指消息进入队列后,可以被延迟一定时间,再进行消费。
    1. RabbitMQ没有提供延迟队列功能,但是可以使用 : TTL + DLX 来实现延迟队列效果。
      在这里插入图片描述
第一步:创建springboot项目并添加依赖
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
</dependency>

第二步:编写配置文件
spring:
  rabbitmq:
    host: 192.168.70.130
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    #开动手动签收
    listener:
      simple:
        acknowledge-mode: manual
# ????
logging:
  pattern:
    console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'


第三步:编写配置类
@Configuration
public class RabbitMQConfig {
    private final String DEAD_EXCHANGE = "order_expire_exchange";
    private final String DEAD_QUEUE = "order_expire_queue";
    private final String DEAD_ROUTING = "order_expire_routing";


    private final String ORDER_EXCHANGE = "order_exchange";
    private final String ORDER_QUEUE = "order_queue";
    private final String ORDER_ROUTING = "order_routing";




    // 死信交换机
    @Bean(DEAD_EXCHANGE)
    public Exchange deadExchange(){
        return ExchangeBuilder
                .topicExchange(DEAD_EXCHANGE)
                .durable(true)
                .build();
    }


    // 死信队列
    @Bean(DEAD_QUEUE)
    public Queue deadQueue(){
        return QueueBuilder
                .durable(DEAD_QUEUE)
                .build();
    }


    // 死信交换机绑定死信队列
    @Bean
    public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange exchange, @Qualifier(DEAD_QUEUE)Queue queue){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(DEAD_ROUTING)
                .noargs();
    }


    // 普通交换机
    @Bean(ORDER_EXCHANGE)
    public Exchange normalExchange(){
        return ExchangeBuilder
                .topicExchange(ORDER_EXCHANGE)
                .durable(true)
                .build();
    }


    // 普通队列
    @Bean(ORDER_QUEUE)
    public Queue normalQueue(){
        return QueueBuilder
                .durable(ORDER_QUEUE)
                .deadLetterExchange(DEAD_EXCHANGE) // 绑定死信交换机
                .deadLetterRoutingKey(DEAD_ROUTING) // 死信队列路由关键字
                .ttl(10000) // 消息存活10s(模拟30min超时)
                .build();
    }


    // 普通交换机绑定普通队列
    @Bean
    public Binding bindNormalQueue(@Qualifier(ORDER_EXCHANGE) Exchange exchange,@Qualifier(ORDER_QUEUE)Queue queue){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(ORDER_ROUTING)
                .noargs();
    }
}
第四步:创建控制器,完成下单功能
@RestController
public class OrderController {
    //注入MQ
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/addOrder")
    public String addOrder(){

        //生成订单号
        String orderNumber = "2030061812251234";
        //在service层完成订单逻辑

        //将订单号发送到订单mq,30分钟过期进入死信队列,死信队列消费查询订单支付状态,做对应处理
        rabbitTemplate.convertAndSend("order_exchange","order_routing",orderNumber);

        return "下单成功! 您的订单号为 :"+orderNumber;
    }
}
第五步:创建消费者,监听消息
@Component
public class ListenerOrder {
    //监听订单过期队列
    @RabbitListener(queues = "order_expire_queue")
    public void orderListener(String orderId){
        System.out.println("orderId = " + orderId);
        //根据订单id查询订单状态是否支付

        /**
         * 监听死信队列的类,回去30min超时订单号,根据订单号查询订单的支付状态
         * 支付:走下一步流程
         * 未支付:关闭订单,库存回滚
         */
    }
}
手动签收模式的结果
  • 在手动签收模式的时候,当我们启动项目,访问订单功能时,立刻生成了一个队列消息
    在这里插入图片描述
  • 然后因为是手动签收模式,所以在消息的存活时间过去了之后,成为了死信消息,所以被消息被拒收了,但是还存在队列中。
    在这里插入图片描述
自动签收模式的结果
spring:
  rabbitmq:
    host: 192.168.70.130
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    #开动自动签收
    listener:
      simple:
        acknowledge-mode: none   # 默认的
# ????
logging:
  pattern:
    console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'

  • 在自动签收模式的时候,当我们启动项目,访问订单功能时,立刻生成了一个队列消息
    在这里插入图片描述
  • 因为是自动签收的,所以消息过了存活时间之后就没了(自动确认指消息只要被消费者接收到,无论是否成功处理消息,则自动签收,并将消息从队列中移除)
    在这里插入图片描述

RabbitMQ一、RabbitMQ的介绍与安装(docker)

RabbitMQ二、RabbitMQ的六种模式

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/671856.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

十_信号3-可重入函数

如上图所示链表&#xff0c;在插入节点的时候捕获到了信号&#xff0c;并且该信号的自定义处理方式中也调用了插入节点的函数。 在main函数中&#xff0c;使用insert向链表中插入一个节点node1&#xff0c;在执行insert的时&#xff0c;刚让头节点指向node1以后(如上图序号1)&…

④单细胞学习-cellchat细胞间通讯

目录 1&#xff0c;原理基础 流程 受体配体概念 方法比较 计算原理 2&#xff0c;数据 3&#xff0c;代码运行 1&#xff0c;原理基础 原文学习Inference and analysis of cell-cell communication using CellChat - PMC (nih.gov) GitHub - sqjin/CellChat: R toolk…

算法-找出N个数组的共同元素

一、代码与执行结果 财经新闻是大众了解金融事件的重要渠道&#xff0c;现有N位编辑&#xff0c;分别对K篇新闻进行专业的编辑与排版。需要您找出被这N位编辑共同编辑过的新闻&#xff0c;并根据这些新闻ID升序排列返回一个数组。 import random# 查找编辑共同处理的新闻id def…

测试基础09:缺陷(bug)生命周期和缺陷(bug)管理规范

课程大纲 1、缺陷&#xff08;bug&#xff09;生命周期 2、缺陷&#xff08;bug&#xff09;提交规范 2.1 宗旨 简洁、清晰、可视化&#xff0c;减少沟通成本。 2.2 bug格式和内容 ① 标题&#xff1a;一级功能-二级功能-三级功能_&#xff08;一句话描述bug&#xff1a;&…

eNsp——两台电脑通过一根网线直连通信

一、拓扑结构 二、电脑配置 ip和子网掩码&#xff0c;配置两台电脑处于同一网段 三、测试 四、应用 传文件等操作&#xff0c;可以在一台电脑上配置FTP服务器

含情脉脉的进程

冯诺依曼体系结构 一个计算机在工作的时候是怎样的呢&#xff1f; 我们所认识的计算机都是由一个个的硬件组件组成&#xff1a; 输入设备&#xff1a;键盘、鼠标、摄像头、话筒、磁盘、网卡 中央处理器&#xff08;CPU&#xff09;&#xff1a;运算器、控制器 输出设备&#x…

Java多线程(04)—— 保证线程安全的方法与线程安全的集合类

一、CAS 与原子类 1. CAS CAS&#xff08;compare and swap&#xff09;&#xff0c;是一条 cpu 指令&#xff0c;其含义为&#xff1a;CAS(M, A, B); M 表示内存&#xff0c;A 和 B 分别表示一个寄存器&#xff1b;如果 M 的值和 A 的值相同&#xff0c;则把 M 和 B 的值交…

我成功创建了一个Electron应用程序

1.创建electron项目命令&#xff1a; yarn create quick-start/electron electron-memo 2选择&#xff1a;√ Select a framework: vue √ Add TypeScript? ... No √ Add Electron updater plugin? ... Yes √ Enable Electron download mirror proxy? ... Yes 3.命令&a…

渲染100为什么是高性价比网渲平台?渲染100邀请码1a12

市面上主流的网渲平台有很多&#xff0c;如渲染100、瑞云、炫云、渲云等&#xff0c;这些平台各有特色和优势&#xff0c;也都声称自己性价比高&#xff0c;以渲染100为例&#xff0c;我们来介绍下它的优势有哪些。 1、渲染100对新用户很友好&#xff0c;注册填邀请码1a12有3…

09.责任链模式

09. 责任链模式 什么是责任链设计模式&#xff1f; 责任链设计模式&#xff08;Chain of Responsibility Pattern&#xff09;是一种行为设计模式&#xff0c;它允许将请求沿着处理者对象组成的链进行传递&#xff0c;直到有一个处理者对象能够处理该请求为止。这种模式的目的…

go语言linux安装

下载&#xff1a;https://go.dev/dl/ 命令行使用 wget https://dl.google.com/go/go1.19.3.linux-amd64.tar.gz解压下载的压缩包&#xff0c;linux建议放在/opt目录下 我放在/home/ihan/go_sdk下 sudo tar -C /home/ihan/go_sdk -xzf go1.19.3.linux-amd64.tar.gz 这里的参数…

STM32作业实现(一)串口通信

目录 STM32作业设计 STM32作业实现(一)串口通信 STM32作业实现(二)串口控制led STM32作业实现(三)串口控制有源蜂鸣器 STM32作业实现(四)光敏传感器 STM32作业实现(五)温湿度传感器dht11 STM32作业实现(六)闪存保存数据 STM32作业实现(七)OLED显示数据 STM32作业实现(八)触摸按…

谷歌发布文生视频模型——Veo,可生成超过一分钟高质量1080p视频

前期我们介绍过OpenAI的文生视频大模型-Sora 模型&#xff0c;其模型一经发布&#xff0c;便得到了大家疯狂的追捧。而Google最近也发布了自己的文生视频大模型Veo&#xff0c;势必要与OpenAI进行一个正面交锋。 Veo 是Google迄今为止最强大的视频生成模型。它可以生成超过一分…

学习小心意——python创建类与对象

在python中&#xff0c;类表示具有相同属性和方法的对象的集合&#xff0c;一般而言都是先定义类再创建类的实例&#xff0c;然后再通过类的实例去访问类的属性和方法 定义类 类中可以定义为数据成员和成员函数。数据成员用于描述对象特征&#xff08;相当于看人的面貌&#…

针对大模型的上下文注入攻击

大型语言模型&#xff08;LLMs&#xff09;的开发和部署取得了显著进展。例如ChatGPT和Llama-2这样的LLMs&#xff0c;利用庞大的数据集和Transformer架构&#xff0c;能够产生连贯性、上下文准确性甚至具有创造性的文本。LLMs最初和本质上是为静态场景设计的&#xff0c;即输入…

【文档智能】符合人类阅读顺序的文档模型-LayoutReader原理及权重开源

引言 阅读顺序检测旨在捕获人类读者能够自然理解的单词序列。现有的OCR引擎通常按照从上到下、从左到右的方式排列识别到的文本行&#xff0c;但这并不适用于某些文档类型&#xff0c;如多栏模板、表格等。LayoutReader模型使用seq2seq模型捕获文本和布局信息&#xff0c;用于…

libcef.dll丢失的解决方法-多种libcef.dll亲测有效解决方法分享

libcef.dll是Chromium Embedded Framework (CEF)的核心动态链接库&#xff0c;它为开发者提供了一个将Chromium浏览器嵌入到本地桌面应用程序中的解决方案。这个库使得开发者能够利用Chromium的强大功能&#xff0c;如HTML5、CSS3、JavaScript等&#xff0c;来创建跨平台的应用…

Llama(一):Mac M1芯片运行Llama3

目录 安装Ollama for Mac 下载Llama 3模型 运行Llama3 试用Llama3 在命令行中使用Llama3 背景 本地环境&#xff1a;Mac M1,16GB内存 安装Ollama for Mac 官方地址 https://ollama.com/download/Ollama-darwin.zip 链接: 百度网盘 提取码: 8wqx 下载Llama 3模型 oll…

jmeter性能优化之tomcat配置与基础调优

一、 修改tomcat初始和最大堆内存 进入到/usr/local/tomcat7-8083/bin目录下&#xff0c;编辑catalina.sh文件&#xff0c;&#xff0c;默认堆内存是600m&#xff0c;初始堆内存和最大堆内存保持一致&#xff0c; 可以更改到本机内存的70%&#xff0c;对于Linux系统&#xff0…

《平渊》· 柒 —— 大道至简?真传一句话,假传万卷书!

《平渊》 柒 "真传一句话, 假传万卷书" 对于 "大道至简"&#xff0c;不少专家可能会说出一大堆乱七八糟的名词, 比如这样&#xff1a; 所谓 "大道" 即支撑天地运转的 "系统自动力"&#xff0c;更具体地来说&#xff0c;即是天地人以…