RabbitMQ中死信队列和延迟队列

目录

  • 一、死信队列
    • 1.过期时间代码实现
    • 2.长度限制代码实现
    • 3.测试消息拒收
    • 4.死信队列小结
  • 二、延迟队列
    • 1.代码实现
      • 1.1 生产者
      • 1.2 生产者

一、死信队列

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
什么是死信队列
先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信,自然就有了死信队列;
在这里插入图片描述
消息成为死信的三种情况:

  • 1.队列消息数量到达限制;比如队列最大只能存储10条消息,而发了11条消息,根据先进先出,最先发的消息会进入死信队列。
  • 2.消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
  • 3.原队列存在消息过期设置,消息到达超时时间未被消费;

死信的处理方式
死信的产生既然不可避免,那么就需要从实际的业务角度和场景出发,对这些死信进行后续的处理,常见的处理方式大致有下面几种,
① 丢弃,如果不是很重要,可以选择丢弃
② 记录死信入库,然后做后续的业务分析或处理
③ 通过死信队列,由负责监听死信的应用程序进行处理
综合来看,更常用的做法是第三种,即通过死信队列,将产生的死信通过程序的配置路由到指定的死信队列,然后应用监听死信队列,对接收到的死信做后续的处理
队列绑定死信交换机:
给队列设置参数:

x-dead-letter-exchange 和 x-dead-letter-routing-key

在这里插入图片描述

1.过期时间代码实现

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQConfig {
    //死信交换机
    public static final String EXCHANGE_NAME_DLX = "exchange_dlx6";
    //死信队列
    public static final String QUEUE_NAME_DLX = "queue_dlx6";
    //交换机
    public static final String EXCHANGE_NAME = "test_exchange_dlx6";
    //队列
    public static final String QUEUE_NAME = "test_queue_dlx6";

    // 1 交换机
    @Bean("test_exchange_dlx")
    public Exchange bootExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    // 1 死信交换机
    @Bean("exchange_dlx")
    public Exchange dlxExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME_DLX).durable(true).build();
    }
    //2.Queue 队列
    @Bean("test_queue_dlx")
    public Queue bootQueue(){
        Map<String, Object> args = new HashMap<>();
        // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", EXCHANGE_NAME_DLX);
        // x-dead-letter-routing-key 这里声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key", "dlx.#");
        //设置ttl
        args.put("x-message-ttl",10000);
        //最大长度为10
        args.put("x-max-length",10);
        return QueueBuilder.durable(QUEUE_NAME).withArguments(args).build();
    }
    //2.死信 队列
    @Bean("queue_dlx")
    public Queue dlxQueue(){
        return QueueBuilder.durable(QUEUE_NAME_DLX).build();
    }


    //3. 死信队列和死信交互机绑定关系 Binding
    /*
        1. 知道哪个队列
        2. 知道哪个交换机
        3. routing key
        noargs():表示不指定参数
     */
    @Bean
    public Binding bindQueueExchange(@Qualifier("queue_dlx") Queue queue,
                                     @Qualifier("exchange_dlx") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("dlx.#").noargs();
    }

    //3. 队列和交互机绑定关系 Binding
    @Bean
    public Binding bindQueueExchange1(@Qualifier("test_queue_dlx") Queue queue,
                                     @Qualifier("test_exchange_dlx") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("test.dlx.#").noargs();
    }
    
}
    /**
     * 发送测试死信消息:
     *  1. 过期时间
     *  2. 长度限制
     *  3. 消息拒收
     */
    @Test
    public void testDlx(){
        //1. 测试过期时间,死信消息
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"test.dlx.hello","我是一条消息,我会死吗?");
    }

test开头是正常队列,不是test开头的队列是死信
超过了10秒,没有被消费就进入死信队列
在这里插入图片描述

2.长度限制代码实现

    /**
     * 发送测试死信消息:
     *  1. 过期时间
     *  2. 长度限制
     *  3. 消息拒收
     */
    @Test
    public void testDlx(){
        for (int i = 0; i < 10; i++) {
            //1. 测试过期时间,死信消息
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"test.dlx.hello","我是一条消息,我会死吗?"+i);
        }
    }

test开头是正常队列,不是test开头的队列是死信
发队列最大只能存储10条消息,而发了11条消息,根据先进先出,最先发的消息会进入死信队列。
在这里插入图片描述
隔10s,没有被消费,会进入死信队列
在这里插入图片描述

3.测试消息拒收

在消费者端进行消息拒收
yml

spring:
  rabbitmq:
    host: 192.168.121.140
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    listener:
      simple:
        #表示手动确认
        acknowledge-mode: manual

监听
拒绝签收,不重回队列 requeue=false

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

@Component
public class DlxListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            //1.接收转换消息
            System.out.println(new String(message.getBody()));

            //2. 处理业务逻辑
            System.out.println("处理业务逻辑...");
            int i = 3/0;//出现错误
            //3. 手动签收
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            //e.printStackTrace();
            System.out.println("出现异常,拒绝接受");
            //4.拒绝签收,不重回队列 requeue=false
            channel.basicNack(deliveryTag,true,false);
        }
    }
}

修改生产者测试代码

    /**
     * 发送测试死信消息:
     *  1. 过期时间
     *  2. 长度限制
     *  3. 消息拒收
     */
    @Test
    public void testDlx(){
       rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"test.dlx.hello","我是一条消息,我会死吗?拒绝");

    }

进入了死信队列
在这里插入图片描述

4.死信队列小结

1.死信交换机和死信队列和普通的没有区别
2.当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
3.消息成为死信的三种情况:

  • 队列消息长度(数量)到达限制;
  • 消费者拒接消费消息,并且不重回队列;
  • 原队列存在消息过期设置,消息到达超时时间未被消费;

二、延迟队列

延迟队列存储的对象肯定是对应的延时消息,所谓”延时消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。
场景:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行取消处理。这时就可以使用延时队列将订单信息发送到延时队列。
需求:

  • 1.下单后,30分钟未支付,取消订单,回滚库存。
  • 2.新用户注册成功30分钟后,发送短信问候。

实现方式:
在这里插入图片描述
很可惜,在RabbitMQ中并未提供延迟队列功能。
但是可以使用:TTL+死信队列 组合实现延迟队列的效果。
在这里插入图片描述

1.代码实现

其实和死信队列差不多,加一个ttl时间就可以了

1.1 生产者

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQConfig {
    //死信交换机
    public static final String EXCHANGE_NAME_DLX = "exchange_dlx";
    //死信队列
    public static final String QUEUE_NAME_DLX = "order_que_dlx";
    //交换机
    public static final String EXCHANGE_NAME = "test_exchange_dlx";
    //队列
    public static final String QUEUE_NAME = "order_queue";

    // 1 交换机
    @Bean("test_exchange_dlx")
    public Exchange bootExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    // 1 死信交换机
    @Bean("exchange_dlx")
    public Exchange dlxExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME_DLX).durable(true).build();
    }
    //2.Queue 队列
    @Bean("test_queue_dlx")
    public Queue bootQueue(){
        Map<String, Object> args = new HashMap<>();
        // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", EXCHANGE_NAME_DLX);
        // x-dead-letter-routing-key 这里声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key", "dlx.order.#");
        //设置ttl
        args.put("x-message-ttl",10000);
        return QueueBuilder.durable(QUEUE_NAME).withArguments(args).build();
    }
    //2.死信 队列
    @Bean("queue_dlx")
    public Queue dlxQueue(){
        return QueueBuilder.durable(QUEUE_NAME_DLX).build();
    }


    //3. 死信队列和死信交互机绑定关系 Binding
    /*
        1. 知道哪个队列
        2. 知道哪个交换机
        3. routing key
        noargs():表示不指定参数
     */
    @Bean
    public Binding bindQueueExchange(@Qualifier("queue_dlx") Queue queue,
                                     @Qualifier("exchange_dlx") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("dlx.order.#").noargs();
    }

    //3. 队列和交互机绑定关系 Binding
    @Bean
    public Binding bindQueueExchange1(@Qualifier("test_queue_dlx") Queue queue,
                                     @Qualifier("test_exchange_dlx") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("test.order.#").noargs();
    }

}
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    @Test
    public  void testDelay() throws InterruptedException {
        //1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,
                "test.order.msg","订单信息:id=1,time=2022年03月30日11:41:47");

        //2.打印倒计时10秒
        for (int i = 10; i > 0 ; i--) {
            System.out.println(i+"...");
            Thread.sleep(1000);
        }
    }
}

运行程序创建订单延时队列
在这里插入图片描述

1.2 生产者

OrderListener

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

@Component
public class OrderListener implements ChannelAwareMessageListener {


    @RabbitListener(queues = "order_que_dlx")
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            //1.接收转换消息
            System.out.println(new String(message.getBody()));

            //2. 处理业务逻辑
            System.out.println("处理业务逻辑...");
            System.out.println("根据订单id查询其状态...");
            System.out.println("判断状态是否为支付成功");
            System.out.println("取消订单,回滚库存....");
            //3. 手动签收
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            //e.printStackTrace();
            System.out.println("出现异常,拒绝接受");
            //4.拒绝签收,不重回队列 requeue=false
            channel.basicNack(deliveryTag,true,false);
        }
    }
}

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/4631.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Makefile第三课:C语言的编译

目录C语言的编译前言1.Fundamental Compiling2.Comiling C2.1 Preprocessing2.2 Generating Assembly Language2.3 Source File to Object File2.4 Single Source to Executable2.5 Multiple Sources to Executable3.Creating a Static Library4.Creating a Shared Library5.总…

贯穿设计模式第二话--开闭职责原则

&#x1f973;&#x1f973;&#x1f973; 茫茫人海千千万万&#xff0c;感谢这一刻你看到了我的文章&#xff0c;感谢观赏&#xff0c;大家好呀&#xff0c;我是最爱吃鱼罐头&#xff0c;大家可以叫鱼罐头呦~&#x1f973;&#x1f973;&#x1f973; 从今天开始&#xff0c;将…

java基础学习——字符流

1.为什么会出现字符流&#xff1a; 由于字节流 操作中文不是特别的方便&#xff0c;所以java就提供字符流 字符流字节流编码表 用字节流复制文本文件时&#xff0c;文本文件也会有中文&#xff0c;但是没有问题&#xff0c;原因是最终底层操作会自动进行字节拼接成中文&#xf…

摄影入门 | 相机的基本原理

一、获取图像——小孔成像实验 小孔成像实验中&#xff0c;点燃蜡烛&#xff0c;会在小孔另一面的白纸上看到一个倒立的烛焰。 此现象可以用来解释物理学原理&#xff1a;光在同种均匀介质中&#xff0c;在不受引力作用干扰的情况下沿直线传播。 这样&#xff0c;我们就用一种…

从零开始搭建游戏服务器 第一节 创建一个简单的服务器架构

目录引言技术选型正文创建基础架构IDEA创建项目添加Netty监听端口编写客户端进行测试总结引言 由于现在java web太卷了&#xff0c;所以各位同行可以考虑换一个赛道&#xff0c;做游戏还是很开心的。 本篇教程给新人用于学习游戏服务器的基本知识&#xff0c;给新人们一些学习…

伪静态技术

网址和纯静态一样&#xff0c;只是&#xff0c;其内部仍旧需要查询数据库&#xff1a;如果有一个页面&#xff0c;希望这个页面利于seo,但是有不适合使用真静态&#xff0c;比如csdn论坛帖子&#xff0c;可以考虑使用伪静态&#xff0c;即: 形式上是一个静态地址&#xff0c;比…

Chapter7.2:MATLAB在频率法中的应用及频率法稳定性分析

该系列博客主要讲述Matlab软件在自动控制方面的应用&#xff0c;如无自动控制理论基础&#xff0c;请先学习自动控制系列博文&#xff0c;该系列博客不再详细讲解自动控制理论知识。 自动控制理论基础相关链接&#xff1a;https://blog.csdn.net/qq_39032096/category_10287468…

PyTorch 深度学习实战 | DIEN 模拟兴趣演化的序列网络

01、实例&#xff1a;DIEN 模拟兴趣演化的序列网络深度兴趣演化网络(Deep Interest Evolution Network,DIEN)是阿里巴巴团队在2018年推出的另一力作,比DIN 多了一个Evolution,即演化的概念。在DIEN 模型结构上比DIN 复杂许多,但大家丝毫不用担心,我们将DIEN 拆解开来详细地说明…

Intel 处理器 macOS降级到Big Sur

1 创建可引导的 macOS 安装器 将移动硬盘作安装 Mac 操作系统的启动磁盘。 创建可引导安装器需要满足的条件 移动硬盘&#xff08;格式化为 Mac OS 扩展格式&#xff09;&#xff0c;至少有 14GB 可用空间已下载 macOS Big Sur的安装器 2 下载 macOS macOS Big Sur安装器会…

【GPT4】微软 GPT-4 测试报告(1)总体介绍

欢迎关注【youcans的AGI学习笔记】原创作品&#xff0c;火热更新中 微软 GPT-4 测试报告&#xff08;1&#xff09;总体介绍 微软 GPT-4 测试报告&#xff08;2&#xff09;多模态与跨学科能力 微软 GPT-4 测试报告&#xff08;3&#xff09;GPT4 的编程能力 【GPT4】微软 GPT-…

Thingsboard使用gateway网关

简介&#xff1a; 本次是想测试一下thingsboard网关的使用&#xff0c;实现通过网关mqttthingsboardemqx 实现间接设备创建和数据传输 前期准备&#xff1a; thingsboard平台 thingsboard网关 emqx平台 MQTTX工具 详细过程&#xff1a; 1&#xff1a;thingsboard平台搭建 …

字节、阿里等大厂年薪50w+的测试都什么水平?

各位做测试的朋友&#xff0c;但凡经历过几次面试&#xff0c;那么你一定曾被问到过以下问题&#xff1a; 1、在Linux环境下&#xff0c;怎么执行web自动化测试&#xff1f; 2、Shell如何&#xff0c;Docker熟悉吗&#xff1f; 3、全链路的压测实操过吗&#xff0c;如何推进与开…

FasterNet实战:使用FasterNet实现图像分类任务(一)

文章目录摘要安装包安装timm安装 grad-cam数据增强Cutout和MixupEMA项目结构计算mean和std生成数据集摘要 论文翻译&#xff1a;https://wanghao.blog.csdn.net/article/details/129485972?spm1001.2014.3001.5502 官方源码&#xff1a; https://github.com/JierunChen/Faste…

VR实景导航,解决最后几十米的导航问题

你是否跟朋友有过这样的经历&#xff1a;“哎&#xff0c;你说的那个餐厅在哪呀&#xff1f;”&#xff0c;“这家商场好复杂&#xff0c;怎么转啊”&#xff0c;“你在医院哪一层&#xff1f;我怎么找不到你呀&#xff01;”等等。在建筑内部&#xff0c;我们的地图导航并不是…

在U盘上运行的 Windows

版本&#xff1a;5.6.1平台&#xff1a;Win x64语言&#xff1a;简体中文,繁体中文,英文更新时间&#xff1a;2023-03-04 下载地址&#xff1a;https://dl.luobotou.org/wtga5610.zip 解压后运行WTGA文件夹中的wintogo.exe启动程序。从5.5版本开始不再支持32位系统、Win7系统…

HTTP协议加强

HTTP协议加强 Date: January 19, 2023 Sum: HTTP请求、响应、请求方法、响应状态代码 HTTP协议简介 什么是通信 通信&#xff0c;就是信息的传递和交换。 通信三要素&#xff1a;通信的主体、通信的内容、通信的方式 现实生活中的通信 案例&#xff1a;张三要把自己考上传…

美颜SDK技术原理、技术应用、代码分析

随着社交媒体的普及&#xff0c;人们对于自己的外貌越来越重视。为了满足用户对于美颜需求&#xff0c;各大科技公司纷纷推出了美颜SDK技术&#xff0c;使得用户可以在拍照和视频中实现美颜效果。本文将对美颜SDK技术进行详细分析。 一、美颜SDK技术的原理 美颜SDK技术是一种基…

精简指令集结构(Reduced Instruction Set Computer,RISC)

ARM内核采用精简指令集结构&#xff08;Reduced Instruction Set Computer&#xff0c;RISC&#xff09;体系结构。RISC技术产生于20世纪70年代&#xff0c;其设计目标是创建一种能以每个时钟周期执行一条指令的速度很快的计算机。RISC的设计重点在于降低由硬件执行的指令复杂度…

Kubeadm生成的k8s证书内容说明以及延长证书过期时间

Kubeadm生成的k8s证书内容说明Kubeadm生成的k8s证书内容说明&#xff1a;证书分组Kubernetes 集群根证书由此根证书签发的证书有:kube-apiserver 代理根证书(客户端证书)etcd 集群根证书etcd server 持有的服务端证书peer 集群中节点互相通信使用的客户端证书pod 中定义 Livene…

函数的定义与使用及七段数码管绘制

函数的定义 函数是一段代码的表示 函数是一段具有特定功能的、可重用的语句组 函数是一种功能的抽象&#xff0c;一般函数表达特定功能 两个作用&#xff1a;降低编程难度 和 代码复用 求一个阶乘 fact就是 函数名 n就是参数 return就是输出部分即返回值 而函数的调用就是…