RabbitMQ(二)

二、高级特性、应用问题以及集群搭建

高级特性

1.消息的可靠性投递

在使用RabbitMQ的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
rabbitMQ整个消息投递的路径为:
producer -> rabbitMQ broker -> exchange -> queue -> consumer

  • confirm确认模式
    confirm确认模式是再producer传递给exchange过程中控制消息的模式,当消息成功的从producer传递到了exchange,那么则会返回一个 confirmCallBack() 回调函数
  • return 退回模式
    return退回模式是指消息从exchange传递给queue过程中消息传递失败,则会返回一个returnCallBack() 回调函数

1.1 confirm确认模式的代码编写:

因为确认模式是producer到exchange,所以代码和配置修改应该写在生产者的模块中。
一步:开启确认模式

新版本的rabbitmq弃用了publish-confirms:true,可以改用
publisher-confirm-type: correlated实现同样的效果

spring:
  rabbitmq:
    password: heima
    username: heima
    port: 5673
    virtual-host: itcast
    host: 1.12.244.105
    #开启确认模式
    publisher-confirm-type: correlated

二步:编写confirmCallBack()函数
回调函数confirm()的返回值在发送消息成功时ack为true,但是我遇到一个问题,就是消息发送成功了,在队列中也能看到,但是返回值ack为false,

clean channel shutdown;

这是因为convertAncSend()方法结束后rabbitMQ的资源也就关闭了,所以就算成功了,回调函数返回值也是false;所以我们在后面强制睡眠200ms,让资源晚点关闭,这样的话得到的ack就是true了

package com.rabbitmq.springboot_mqproducer;

import com.rabbitmq.springboot_mqproducer.rabbitMQconfig.MQConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

@SpringBootTest
class SpringbootMqProducerApplicationTests {

    @Resource
    RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads() throws InterruptedException {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             *
             * @param correlationData 相关的配置信息
             * @param b 消息是否发送成功
             * @param s 消息发送失败原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                System.out.println("confirm方法被执行了");
                System.out.println(b);
                if(b){
                    System.out.println("消息从producer -> exchange成功");
                    System.out.println("失败原因:" + s);
                }else{
                    System.out.println("消息从producer -> exchange失败");
                    System.out.println("失败原因:" + s);
                }
            }
        });
        rabbitTemplate.convertAndSend(MQConfig.EXCHANGE_NAME,"test.hello","测试springboot整合交换机");
        Thread.sleep(200);
    }
}

结果:
在这里插入图片描述

1.2 return回退模式的代码编写

一步:开启回退模式

spring:
  rabbitmq:
    password: heima
    username: heima
    port: 5673
    virtual-host: itcast
    host: 1.12.244.105
    #开启确认模式
    publisher-confirm-type: correlated
    #开启回退模式
    publisher-returns: true

二步:编写returnCallBack()函数
三步:设置exchange处理消息的模式
setMandatory为true,如果消息没有到队列queue,则返回消息给发送方
setMandatory为false,如果消息没有到队列queue,则丢弃消息(默认)

package com.rabbitmq.springboot_mqproducer;

import com.rabbitmq.springboot_mqproducer.rabbitMQconfig.MQConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.propertyeditors.StringArrayPropertyEditor;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

@SpringBootTest
class SpringbootMqProducerApplicationTests {

    @Resource
    RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads() throws InterruptedException {
        //编写confirm回调函数
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             *
             * @param correlationData 相关的配置信息
             * @param b 消息是否发送成功
             * @param s 消息发送失败原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                System.out.println("confirm方法被执行了");
                System.out.println(b);
                if(b){
                    System.out.println("消息从producer -> exchange成功");
                    System.out.println("失败原因:" + s);
                }else{
                    //消息发送失败,需要做一些处理
                    System.out.println("消息从producer -> exchange失败");
                    System.out.println("失败原因:" + s);
                }
            }
        });
        //编写return回调函数
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                System.out.println("return回退模式回调函数执行了");
                System.out.println("消息:"+returnedMessage.getMessage());
                System.out.println("exchange:"+returnedMessage.getExchange());
                System.out.println("replyCode:"+returnedMessage.getReplyCode());
                System.out.println("replyText:"+returnedMessage.getReplyText());
                System.out.println("routingKey:"+returnedMessage.getRoutingKey());

            }
        });
        //设置回退模式中,exchange处理消息的方式
        /*
        当将mandatory设置为false(默认值),如果RabbitMQ无法将消息路由,消息将会被静默丢弃,生产者不会收到通知。
        当设置mandatory为true时,意味着消息被视为"mandatory",如果在发布消息时RabbitMQ无法将消息路由到任何队列(例如由于没有匹配的队列与指定的路由键),则代理将通过调用ReturnListener回调的returnedMessage方法将消息返回给生产者(发布者)。生产者可以根据需要适当地处理这个返回的消息,例如记录日志或执行某些恢复操作。
         */
        rabbitTemplate.setMandatory(true);
        //TODO 这里把routingKey写错,是为了让交换机找不到queue,从而触发returnCallBack()函数
        rabbitTemplate.convertAndSend(MQConfig.EXCHANGE_NAME,"testtttt.hello","测试springboot整合交换机");
        Thread.sleep(200);
    }

}

消息的可靠投递小结:

  • 设置配置publisher-confirm-type: correlated开启确认模式
  • 使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true, 则发送成功,如果为false,则发送失败,需要处理。
  • 设置ConnectionFactory的publisher-returns="true"开肩退回模式。
  • 使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。
  • 在RabbitMQ中也提供了事务机制,但是性能较差,此处不做讲解。
    使用channel下列方法,完成事务控制:
    txSelect(),用于将当前channel设置成transaction模式
    txCommit(),用于提交事务
    txRollback(),用于回滚事务

2.Consumer Ack

ack指Acknowledge,确认。表示消费端收到消息后的确认方式。
有三种确认方式:

  • 自动确认:acknowledge=“none”
  • 手动确认:acknowledge=“manual”
  • 根据异常情况确认:acknowledge=“auto”(这种方式很麻烦,不做讲解)

其中自动确认是指,当消息一旦被Consumer接收到, 则自动确认收到,并将相应message从RabbitMQ的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck() 手动签收,如果出现异常,则调用channel.basicNack() 方法,让其自动重新发送消息。

代码编写:
发送消息的生产者端代码不用变,只需要能够发送消息就行
消费者端:
一步:编写yml配置文件

spring:
  rabbitmq:
    username: heima
    password: heima
    virtual-host: itcast
    host: 1.12.244.105
    port: 5673
    #设置消息为手动签收
    listener:
      simple:
        acknowledge-mode: manual #消费者端确认模式:none自动确认 manual手动确认 auto通过抛出异常的类型,来做响应的处理
        concurrency: 1 #当前监听的数量
        max-concurrency: 5 #最大监听数量
        retry:
          enabled: true #是否支持重试
          max-attempts: 4 #最大重试次数,默认为3

二步:编写消费者代码
消费者端创建一个listener并实现ChannelAwareMessageListener接口(其实也可以不实现该接口,只要 @RabbitListener 标记的方法,或者 @RabbitListener 标记的类+ @RabbitHandler 标记的方法的参数列表有[com.rabbitmq.client.Channel]和[org.springframework.amqp.core.Message]两个参数,都可以)

package com.rabbit.springboot_mqconsumer;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;

/**
 * @author Watching
 * * @date 2023/7/19
 * * Describe:
 */
@Component
public class RabbitMQListener implements ChannelAwareMessageListener {
//    @RabbitListener(queues = {"boot_topic_queue"})//填写队列名称,可以以字符串数组的方式监听多个队列
//    public void listener(Message message){
//        System.out.println("message:"+message);
//    }

    /**
     * 使用ChannelAwareMessageListener监听器接口中的onMessage()方法来充当消费者,如果上面注释的方法与当前方法同时存在,一条消息只会被消费一次。不会被两个方法都消费
     *
     * @param message
     * @param channel
     * @throws Exception Consumer ACK机制:
     *                   1.设置手动签收。acknowledge= "manual”
     *                   2.让监听器类实现ChannelAwareMessageListener接口
     *                   3.如果消息成功处理,则调用channel的basicAck()签收
     *                   4.如果消息处理失败,则调用channel的basicNack( )拒绝签收,broker重新发送给consumer
     */
    @RabbitListener(queues = "boot_topic_queue" )
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try{
            //1.接收消息
            System.out.println("message:" + message);
            System.out.println("channel:" + channel);
            //2.处理业务逻辑
            System.out.println("模拟处理业务逻辑......");
            //3.手动签收
        /*
        void basicAck(long deliveryTag, boolean multiple) throws IOException;
        deliveryTag:
            当消费者接收到一条消息后,RabbitMQ 会为该消息分配一个唯一的 DeliveryTag。这个 DeliveryTag 是一个64位的长整型数值,并且只在该 Channel 内唯一,即相同 Channel 下的 DeliveryTag 不会重复。
        multiple:
            当 multiple 设置为 false 时,表示只确认当前指定的 deliveryTag 对应的一条消息。也就是说,只确认指定的单个消息已经成功被处理或处理失败。
            当 multiple 设置为 true 时,表示确认当前指定的 deliveryTag 及其之前所有未确认的消息(在同一个 Channel 下)。也就是说,会一次性确认多条消息的处理状态,将 deliveryTag 小于或等于指定 deliveryTag 的所有消息都确认处理了。
            这种批量确认的机制有助于提高消息的处理效率,特别是当消费者处理多条消息时,可以通过一次性确认多条消息的方式来减少网络开销和消费者端的负担。
            在使用 channel.basicAck(deliveryTag, multiple) 和 channel.basicNack(deliveryTag, multiple, requeue) 方法时,可以根据实际场景来选择是单条确认还是批量确认,以满足不同的业务需求。
         */
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
            System.out.println("完成手动签收");
        }catch(Exception e){
            //4.出现异常,拒绝签收
            /*
            deliveryTag:一个唯一标识消息的64位长整型数值,用于确认消息的消费状态。
            multiple:一个布尔类型的参数,用于决定是否批量处理多条消息。若设置为 true,则会否定当前指定 deliveryTag 及其之前的所有未确认消息;若设置为 false,则只否定当前指定 deliveryTag 对应的一条消息。
            requeue:一个布尔类型的参数,表示是否将消息重新放回队列。若设置为 true,则消息会被重新入队列,RabbitMQ 会再次将它发送给消费者;若设置为 false,则消息会被直接丢弃,不会重新放回队列。
             */
            System.out.println("代码逻辑出现异常,拒收");
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true);
        }
    }
}

只需要两步,就可以实现Consumer ack,下面我们来测试一下:
首先是正常运行的代码的结果:(业务逻辑代码无异常)
在这里插入图片描述
生产者端是用的前面测试boot整合的代码
在这里插入图片描述
然后我们来测试业务逻辑代码出错的情况,我们在业务逻辑代码处添加一个除数不能为0的异常
在这里插入图片描述
再次运行代码,一直在重试,一直再报错
在这里插入图片描述

消息的可靠性总结

1.持久化:

  • exchange要持久化
  • queue要持久化
  • message要持久化

2.生产方确认Confirm(在后续文章中会讲解如何在回调函数中进行具体的处理
3.消费方确认Ack
4. Broker高可用(集群搭建

3.消费端限流

在A系统中,每秒最多只能处理1000条请求,如果在一秒钟只能瞬间有5000条请求打入A系统,那么A系统就会崩溃,所以我们在A系统中加入一个MQ中间件,让5000个请求先发送到MQ,然后A系统再分批次的从MQ中拉取1000条请求,这样A系统就避免了崩溃的情况。
这也是我们常说的MQ的削峰功能
在这里插入图片描述
设置MQ消费限流很简单,只需要设置两个属性:

  • 确认模式设置为手动确认(在上面的Ack我们已经讲过)
  • 设置prefetch属性,prefetch = n,n就是每次从MQ中获取消息的数量
    在这里插入图片描述
    其余的消费端代码和生产者端代码不用修改。
    当设置了消费端限流后,如果从MQ中取出1条消息,消费者端没有进行确认,那么消费者端将不会再从MQ中取消息,直到消息被确认。

4.TTL

TTL全称Time To Live(存活时间/过期时间)。
当消息到达存活时间后,还没有被消费,会被自动清除。
RabbitMQ可以对消息设置过期时间,也可以对整个队列Queue设置过期时间。
举一个例子:
生活中我们在购买商品的时候会下订单,系统会提示我们要在30分钟之内付款,否则订单将会被取消。
在这里插入图片描述

Ⅰ、先在控制台模拟上面的情况

①创建一个交换机
在这里插入图片描述
②创建一个队列
在这里插入图片描述
③进入交换机exchange_ttl和队列queue_ttl进行绑定
在这里插入图片描述
④消息的发布
在这里插入图片描述
⑤在消息队列中查看
将鼠标放上ttl,就可以看到设置的时间,等时间一过,这条消息就会被自动清除。
在这里插入图片描述

Ⅱ、代码实现队列过期,和消息过期

①创建交换机,队列,以及绑定关系

package com.rabbitmq.springboot_mqproducer.rabbitMQconfig;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author Watching
 * * @date 2023/7/18
 * * Describe:
 */
@Configuration
public class MQConfig {

    public static final String QUEUE_TTL_NAME = "queue_ttl";
    public static final String EXCHANGE_TTL_NAME = "exchange_ttl";



    /*
创建队列,测试ttl特性
 */
    @Bean("test_queue_ttl")
    public Queue ttlQueue() {
        Map<String,Object> arguments = new HashMap<>();
        arguments.put("x-message-ttl",10000);//消息过期的时间
        arguments.put("x-expires",100000);//队列过期的时间
        //设置队列的ttl时间
        return QueueBuilder.durable(QUEUE_TTL_NAME).withArguments(arguments).build();//参数的属性可以在控制台上查看
    }

    /*
创建一个交换机测试队列ttl特性
 */
    @Bean("test_exchange_ttl")
    public Exchange ttlExchange() {
        return ExchangeBuilder.topicExchange(EXCHANGE_TTL_NAME).durable(true).build();
    }

    /*
    绑定ttl交换机和队列
     */
    @Bean
    public Binding ttlBinding(@Qualifier("test_exchange_ttl") Exchange exchange, @Qualifier("test_queue_ttl") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("ttl.#").noargs();
    }
}

在创建队列时,我们指定了x-message-ttl,使队列中的所有消息都是一个固定的时间过期
我们还可以在发送消息时,指定每条消息的过期时间。
只需要在发送方法convertAndSend()方法中添加一个消息后处理参数即可


    /*
    MessagePostProcessor 是 Spring AMQP 中的一个接口,用于对消息进行后处理。
    通过实现该接口,你可以在发送消息之前对消息进行一些自定义处理,例如添加自定义的消息头、修改消息内容等。
     */
    MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            //1.设置消息属性
            message.getMessageProperties().setExpiration("5000");//5000ms过期
            //2.返回该消息
            return message;
        }
    };
    @Test
    void testSend() throws InterruptedException {

        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend(MQConfig.EXCHANGE_TTL_NAME, "ttl.hello", "测试ttl"+i,messagePostProcessor);
        }
        Thread.sleep(200);
    }

小细节:
①当队列设置了x-expires和x-messgae-ttl,消息过期时间以短的为准
②当队列设置了x-messgae-ttl,且发送消息时通过消息后处理也设置了过期时间,那么消息过期时间也以短的为准。
③当十条消息中只有一条消息设置了过期时间,这条消息过期后,只有处于队列顶端,即即将被消费时,才会对这条消息是否过期做判断。

5.死信队列

5.1 概念

死信队列,英文缩写: DLX ,Dead Letter Exchange (死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。(死信队列为什么英文翻译过来使死信交换机呢?因为交换机概念只有在RabbitMQ中才有,其它MQ中间件只有队列概念,所以习惯叫死信队列,而RabbitMQ中存在交换机概念,所以叫死信交换机。)
在这里插入图片描述
在这里我们需要理解的问题有:
①消息什么时候成为死信?

  • 队列长度达到限制,比如队列最多容纳10条消息,当第11条消息进入时,这条消息就成为了死信消息。
  • 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
  • 原队列存在消息过期设置,消息到达超时时间却并未被消费

以上三种,满足一条即为死信消息

②队列如何绑定死信交换机?
队列设置参数:x-dead-letter-exchangex-dead-letter-routing-key
x-dead-letter-exchange:死信交换机的名称
x-dead-letter-routing-key:消息发送时指定的routingKey

在这里插入图片描述

5.2 代码实现死信队列

创建死信队列:

  • 1.声明正常的队列(test_queue_dLx)和交换机(test_exchange_dlx)
  • 2.声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
  • 3.正常队列绑定死信交换机,正常队列绑定死信队列不需要创建Binding Bean,只需要在正常队列创建时设置参数就可以
    – 设置两个参数:
    x-dead-letter-exchange:死信交换机名称
    x-dead-letter-routing-key:发送给死信交换机的routingkey

设置正常队列中的消息的过期时间x-message-ttl
设置正常队列的长度限制x-max-length

package com.rabbitmq.springboot_mqproducer.rabbitMQconfig;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author Watching
 * * @date 2023/7/18
 * * Describe:
 */
@Configuration
public class MQConfig {

    /**
     * 测试死信队列
     */
    /*
    创建普通交换机和普通队列
     */
    @Bean("test_exchange_dlx")
    public Exchange testDlxExchange() {
        return ExchangeBuilder.topicExchange("test_exchange_dlx").durable(true).build();
    }

    @Bean("test_queue_dlx")
    public Queue testDlxQueue() {
        Map<String,Object> map = new HashMap<>();
        //x-dead-letter-exchange:死信交换机名称
        map.put("x-dead-letter-exchange","exchange_dlx");
        //x-dead-letter-routing-key:发送给死信交换机的routingkey
        map.put("x-dead-letter-routing-key","dlx.hehe");//这个routingkey只需要满足死信交换机的路由规则就可以
        //设置正常队列中的消息的过期时间ttl
        map.put("x-message-ttl",10000);
        //设置正常队列的长度限制max-length
        map.put("x-max_length",10);
        return QueueBuilder.durable("test_queue_dlx").withArguments(map).build();
    }

    @Bean
    public Binding binding1(@Qualifier("test_exchange_dlx") Exchange exchange,@Qualifier("test_queue_dlx")Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("test.dlx.#").noargs();
    }

    /*
    创建死信交换机和死信队列
     */
    @Bean("exchange_dlx")
    public Exchange dlxExchange() {
        return ExchangeBuilder.topicExchange("exchange_dlx").durable(true).build();
    }

    @Bean("queue_dlx")
    public Queue dlxQueue() {
        return QueueBuilder.durable("queue_dlx").build();
    }
    @Bean
    public Binding binding2(@Qualifier("exchange_dlx") Exchange exchange,@Qualifier("queue_dlx")Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("dlx.#").noargs();
    }

    /*
    绑定普通队列和死信交换机,并不需要写一个Binding,只需要在普通队列中添加参数就行
     */
}

发送消息测试死信消息:
1.过期时间
2.长度限制
3.消息拒收

    @Test
    void testDlx() {
        //1.过期时间
//        rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.hello","测试消息超出过期时间变成死信");
        //2.超出队列消息数量限制
//        for (int i = 0; i < 20; i++) {
//            rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.hello", "测试消息超出队列数量限制变成死信");
//        }
        //3.消费端拒收
        rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.hello","测试消息被拒收变成死信");
    }

死信队列小结:
1.死信交换机,死信队列和普通交换机,普通队列没有区别.
2.当消息成为死信后,如果该队列绑定了死信交换机,则消息会被重新路由到死信队列中
3.消息成为死信的三种情况

  • 消息在队列中到达超时时间并未被消费
  • 消息在消费者端被拒收,且设置了不重回队列
  • 队列长度存在限制,消息数量超出了限制

6.延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
需求:
1.下单后,30分钟未支付,取消订单,回滚库存。
2.新用户注册成功7天后,发送短信问候。
实现方式:
1.定时器
2.延迟队列
订单系统将订单放入延迟队列种,30分钟后取出,去库存系统中判断订单是否已经支付,再进行后续的支付或者未支付操作
在这里插入图片描述
但是!
RabbitMQ官方没有提供延迟队列,所以我们需要使用ttl+死信队列构成延迟队列
普通队列设置为30min中过期,过期后消息路由到死信队列,库存系统从死信队列中取消息,这样就形成了一个延迟队列
在这里插入图片描述

代码实现延迟队列

1.定义正常交换机(order_exchange)和队列(order_queue),同时绑定
2.定义死信交换机(order_exchange_dlx) 和队列(order_queue_dlx),同时绑定
3.绑定正常队列和死信交换机,设置正常队列过期时间为10秒

    /**
     * 测试延迟队列
     */
    /*
    1.定义正常交换机(order_exchange)和队列(order_queue)
     */
    @Bean("orderQueue")
    public Queue orderQueue(){
        //3.正常队列绑定死信交换机
        Map<String,Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange","order_exchange_dlx");
        map.put("x-dead-letter-routing-key","dlx.order.hehe");
        //设置正常队列的消息过期时间
        map.put("x-message-ttl",10000);
        return QueueBuilder.durable("order_queue").withArguments(map).build();
    }
    @Bean("orderExchange")
    public Exchange orderExchange(){
        return ExchangeBuilder.topicExchange("order_exchange").build();
    }
    @Bean
    public Binding orderBinding(@Qualifier("orderQueue")Queue queue,@Qualifier("orderExchange")Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
    }

    /*
    2.定义死信交换机(order_exchange_dlx) 和队列(order_queue_dlx)
     */
    @Bean("orderQueueDlx")
    public Queue orderQueueDlx(){
        return QueueBuilder.durable("order_queue_dlx").build();
    }
    @Bean("orderExchangeDlx")
    public Exchange orderExchangeDlx(){
        return ExchangeBuilder.topicExchange("order_exchange_dlx").build();
    }
    @Bean
    public Binding orderBindingDlx(@Qualifier("orderQueueDlx")Queue queue,@Qualifier("orderExchangeDlx")Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("dlx.order.#").noargs();
    }

4.创建生产者发送消息

    /**
     * 测试延迟队列
     */
    @Test
    void testDelay() throws InterruptedException {
        rabbitTemplate.convertAndSend("order_exchange","order.test","测试延迟队列");
        for (int i = 10;i > 0;i--){
            System.out.println(i+"...");
            Thread.sleep(1000);
        }
    }

5.创建消费者

package com.rabbit.springboot_mqconsumer;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

/**
 * @author Watching
 * * @date 2023/8/2
 * * Describe:
 */
@Component
public class OrderListener implements ChannelAwareMessageListener {

    @RabbitListener(queues = "order_queue_dlx")//监听死信队列
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try{
            //1.接收message
            System.out.println("message:"+message);
            //2.处理业务逻辑
            System.out.println("处理业务逻辑");
            System.out.println("根据订单id在数据库中查询订单状态");
            System.out.println("判断订单是否支付成功");
            System.out.println("未支付,回滚库存,取消订单");
            //3.手动签收
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
        }catch (Exception e){
            //4.业务出错,拒绝签收
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true);//业务出错,拒签后要将这条消息重新放回死信队列
        }
    }
}

延迟队列小结:
1.延迟队列指消息进入队列后,可以被延迟一定时间,再进行消费。
2. RabbitMQ没有提供延迟队列功能,但是可以使用: TTL + DLX来实现延迟队列效果。

应用问题

1.消息补偿

消息补偿机制

2.幂等性保障

幂等性保障

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/55856.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

如何在Visual Studio Code中用Mocha对TypeScript进行测试

目录 使用TypeScript编写测试用例 在Visual Studio Code中使用调试器在线调试代码 首先&#xff0c;本文不是一篇介绍有关TypeScript、JavaScript或其它编程语言数据结构和算法的文章。如果你正在准备一场面试&#xff0c;或者学习某一个课程&#xff0c;互联网上可以找到许多…

【啥都生】分类项目中的模型搭建代码解析

def build_model(cfg):if isinstance(cfg, list):modules [eval(cfg_.pop("type"))(**cfg_) for cfg_ in cfg]return Sequential(*modules)else:return eval(cfg.pop("type"))(**cfg)b站up啥都生维护的分类项目 这段代码的功能是完成模型搭建&#xff0c;…

Web后端基本设计思想

JavaWeb应用的后端一般基于MVC和三层架构思想实现。 MVC是一种设计模式&#xff0c;用于开发用户界面和交互式应用程序。M即Model&#xff0c;业务模型&#xff0c;负责处理应用程序的业务逻辑和数据&#xff1b;V即View&#xff0c;视图&#xff0c;负责给用户展示界面和数据&…

3ds Max建模教程:模拟布料拖拽撕裂和用剑撕裂两种效果

推荐&#xff1a; NSDT场景编辑器 助你快速搭建可二次开发的3D应用场景 1. 拖拽撕布 步骤 1 打开 3ds Max。 打开 3ds Max 步骤 2 在透视视口中创建平面。保持其长度 后座和宽度后座为 100。 创建平面 步骤 3 转到助手>假人并在 飞机的两侧。 助手>假人 步骤 4 选…

基础实验篇 | PX4控制器的外部通信

PX4控制器的外部通信 01 实验名称及目的 PX4控制器的外部通信&#xff1a;在进行硬件在环仿真时&#xff0c;我们常常需要向设计的Simulink控制器中发送数据&#xff08;传感器数据、故障触发、控制指令、参数调整等&#xff09;&#xff0c;同时接收一些感兴趣的数据。RflySi…

ELK 企业级日志分析系统(一)

目录 一、ELK 简介 1.1 组件说明 1.2 为什么要使用ELK 1.3 完整日志系统的基本特征 1.4 ELK工作原理 二、Elasticsearch的介绍 2.1 Elasticsearch的核心: 三、Logstash 3.1 Logstash简介 四、Kibana 五、部署ELK日志分析系统 5.1 服务器配置 5.2 ELK Elasticse…

在PHP8中检测数据类型-PHP8知识详解

在PHP 8中&#xff0c;可以使用多种方法来检测数据类型。以下是常用的四种方法&#xff1a;使用 gettype() 函数、使用 is_* 系列函数、使用 get_debug_type() 函数、使用 get_class() 函数。 一、使用 gettype() 函数 gettype() 函数返回给定变量的数据类型。例如&#xff1a…

机器学习-New Optimization

机器学习(New Optimization) 前言&#xff1a; 学习资料 videopptblog 下面的PPT里面有一些符号错误&#xff0c;但是我还是按照PPT的内容编写公式&#xff0c;自己知道符号表示什么含义就好了 New Optimization 机器学习(New Optimization)NotationOn-line VS Off-line常用优…

Html5播放器按钮在移动端变小的问题解决方法

Html5播放器按钮在移动端变小的问题解决方法 用手机浏览器打开酷播云视频&#xff0c;有时会出现播放器按钮太小的情况&#xff0c;此时只需在<head>中加入下面这段代码即可解决&#xff1a; <meta name"viewport" content"widthdevice-width, initia…

Python入门二

目录&#xff1a; python封装与property装饰器python继承与类型检查python多态与superpython 模块与包错误与异常Debug 调试与分析python类型注解python数据类dataclasspython内置装饰器python装饰器学生信息管理系统 1.python封装与property装饰器 封装的概念 封装&#x…

如何压缩高清PDF文件大小?将PDF文件压缩到最小的三个方法

PDF格式是一种非常常用的文档格式&#xff0c;但是有时候我们需要将PDF文件压缩为更小的大小以便于传输和存储。在本文中&#xff0c;我们将介绍三种PDF压缩的方法&#xff0c;包括在线PDF压缩、利用软件PDF压缩以及使用WPS缩小pdf。 首先&#xff0c;在线PDF压缩是最常用的方…

《cuda c编程权威指南》04 - 使用块和线程索引映射矩阵索引

目录 1. 解决的问题 2. 分析 3. 方法 4. 代码示例 1. 解决的问题 利用块和线程索引&#xff0c;从全局内存中访问指定的数据。 2. 分析 通常情况下&#xff0c;矩阵是用行优先的方法在全局内存中线性存储的。如下。 8列6行矩阵&#xff08;nx,ny&#xff09;&#xff08;…

JVM内存结构

JVM内存结构 5个部分 程序计数器 Java 虚拟机栈 本地方法栈 堆 方法区 JDK 1.8 同 JDK 1.7 比&#xff0c;最大的差别就是&#xff1a;元数据区取代了永久代。元空间的本质和永久代类似&#xff0c;都是对 JVM 规范中方法区的实现。不过元空间与永久代之间最大的区别在于…

获取 NGINX QUIC+HTTP/3 预览版的二进制包

原文作者&#xff1a;Robert Haynes of F5 原文链接&#xff1a;获取 NGINX QUICHTTP/3 预览版的二进制包 转载来源&#xff1a;NGINX 官方网站 我们很高兴宣布&#xff0c;NGINX QUICHTTP/3 预览版现提供以下两个发行版的预构建二进制包&#xff1a; Red Hat Enterprise Linux…

java实现5种不同的验证码图片,包括中文、算式等,并返回前端

导入以下依赖 <!--图片验证码--><dependency><groupId>com.github.whvcse</groupId><artifactId>easy-captcha</artifactId><version>1.6.2</version></dependency> 编写controller package com.anXin.user.controlle…

牛客网Verilog刷题——VL42

牛客网Verilog刷题——VL42 题目答案 题目 请设计一个可以实现任意小数分频的时钟分频器&#xff0c;比如说8.7分频的时钟信号&#xff0c;注意rst为低电平复位。提示&#xff1a;其实本质上是一个简单的数学问题&#xff0c;即如何使用最小公倍数得到时钟周期的分别频比。设小…

级联选择框

文章目录 实现级联选择框效果图实现前端工具版本添加依赖main.js导入依赖级联选择框样式 后端数据库设计 实现级联选择框 效果图 实现 前端 工具版本 node.js v16.6.0vue3 级联选择框使用 Element-Plus 实现 添加依赖 在 package.json 添加依赖&#xff0c;并 npm i 导入…

高通滤波器,低通滤波器

1.高通滤波器是根据像素与邻近像素的亮度差值来提升该像素的亮度。 import cv2 import numpy as np from scipy import ndimagekernel_3_3 np.array([[-1,-1,-1],[-1,8,-1],[-1,-1,-1]]) print(kernel_3_3) kernel_5_5 np.array([[-1,-1,-1,-1,-1],[-1,1,2,1,-1],[-1,2,4,2,-…

iOS——锁与死锁问题

iOS中的锁 什么是锁锁的分类互斥锁1. synchronized2. NSLock3. pthread 递归锁1. NSRecursiveLock2. pthread 信号量Semaphore1. dispatch_semaphore_t2. pthread 条件锁1. NSCodition2. NSCoditionLock3. POSIX Conditions 分布式锁NSDistributedLock 读写锁1. dispatch_barri…

C++设计模式之责任链设计模式

C责任链设计模式 什么是责任链设计模式 责任链设计模式是一种行为型设计模式&#xff0c;它允许多个处理请求的对象串联起来&#xff0c;形成一个处理请求的链。每个对象都有机会处理请求&#xff0c;如果该对象不能处理请求&#xff0c;则将请求传递给链中的下一个对象。 该…