【初始RabbitMQ】死信队列的实现

死信的概念

死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息 进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有 后续的处理,就变成了死信,有死信自然就有了死信队列

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

死信的来源

死信的来源主要是有以下几个部分:

  1. 消息 TTL 过期
  2. 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
  3. 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false

死信的实战

代码架构图

消息TTL过期

生产者代码:

public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");
        //设置消息TTL的时间
        /**
         * 设置AMQP(高级消息队列协议)消息的属性
         * AMQP.BasicProperties: 这是AMQP协议中用于定义消息属性的类。
         * new AMQP.BasicProperties().builder(): 这里创建了一个新的AMQP.BasicProperties对象,并通过调用其builder()方法来开始构建该对象的属性。
         * .expiration("10000"): 在构建过程中,通过.expiration()方法设置了消息的expiration属性。expiration属性用于定义消息的TTL,即消息在队列中等待消费的时间。如果在这段时间内消息没有被消费,它将被自动丢弃。
         * 在这里,expiration的值被设置为"10000",这意味着消息的TTL是10,000毫秒,也就是10秒。
         * .build(): 最后,通过调用.build()方法来结束构建过程并返回完全配置的AMQP.BasicProperties对象。
         */
        AMQP.BasicProperties properties =
                new AMQP.BasicProperties().builder().expiration("10000").build();
        for (int i = 1; i < 11; i++) {
            String message = "info"+i;
            /**
             * 发送一个消息
             * 1.发送到那个交换机
             * 2.路由的KEY值是哪个 本次是队列的名称
             * 3.其他参数信息
             * 4.发送消息的消息体
             */
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,
                    message.getBytes(StandardCharsets.UTF_8));
        }
    }
}

消费者1的代码:

public class Consumer01 {
    //普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        //声明死信和普通交换机 类型为direct
        channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");
        channel.exchangeDeclare(DEAD_EXCHANGE,"direct");
        //声明死信队列
        String deadQueue = "dead_queue";
        /*
         *生成一个队列
         * 1.队列名称
         * 2.队列里面的信息是否持久化(磁盘)默认情况时在内存
         * 3.该队列是否只供一个消费者进行消费 是否消费共享 true是允许
         * 4.是否自动删除 最后一个消费者断开连接之后 该队列是否自动删除 true自动删除 false不自动删除
         * 5.其他参数 延迟消息等
         */
        channel.queueDeclare(deadQueue,false,false,false,null);
        //死信队列绑定死信交换机与routingKey
        channel.queueBind(deadQueue,DEAD_EXCHANGE,"lisi");
        //正常队列绑定死信队列信息
        Map<String, Object> params = new HashMap<>();
        //正常队列设置死信交换机 参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //正常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
        System.out.println("C1等待接收消息......");
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String message = new String(delivery.getBody(),"UTF-8");
            System.out.println("C1 接收到消息"+message);
        };
        /**
         * 消费者信息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true自动应答 false手动应答
         * 3.消费者成功才能更改消费的回调
         * 4.消费者取消消费回调
         */
        channel.basicConsume(normalQueue,true,deliverCallback,consumerTag->{
        });
    }
}

此时启动生产者观看RabbitMQ的变化

消费者2的代码(用于消耗死信队列中的信息): 

public class Consumer02 {
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
        System.out.println("等待接收死信队列消息.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer02 接收死信队列的消息" + message);
        };
        channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
        });
    }
}

运行结果: 

队列达到了最大长度

消息生产者代码去掉 TTL 属性:

public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        //该信息是用作演示队列个数限制
        for (int i = 1; i < 11; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());
            System.out.println("生产者发送消息:" + message);
        }
    }
}

C1 消费者修改以下代码:

 C2 消费者代码不变(启动 C2 消费者),运行结果如下:

如果 报错请登录rabbitMQ管理平台将此队列删除:

 消息被拒

消息生产者代码同上生产者一致

C1 消费者的代码:

public class Consumer01 {
    //普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        //声明死信和普通交换机 类型为direct
        channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");
        channel.exchangeDeclare(DEAD_EXCHANGE,"direct");
        //声明死信队列
        String deadQueue = "dead-queue";
        /*
         *生成一个队列
         * 1.队列名称
         * 2.队列里面的信息是否持久化(磁盘)默认情况时在内存
         * 3.该队列是否只供一个消费者进行消费 是否消费共享 true是允许
         * 4.是否自动删除 最后一个消费者断开连接之后 该队列是否自动删除 true自动删除 false不自动删除
         * 5.其他参数 延迟消息等
         */
        channel.queueDeclare(deadQueue,false,false,false,null);
        //死信队列绑定死信交换机与routingKey
        channel.queueBind(deadQueue,DEAD_EXCHANGE,"lisi");
        //正常队列绑定死信队列信息
        Map<String, Object> params = new HashMap<>();
        //正常队列设置死信交换机 参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //正常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
        System.out.println("C1等待接收消息......");
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String message = new String(delivery.getBody(), "UTF-8");
            if(message.equals("info5")){
                System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
                //requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
                channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
            }else {
                System.out.println("Consumer01 接收到消息"+message);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }

        };
        /**
         * 消费者信息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true自动应答 false手动应答
         * 3.消费者成功才能更改消费的回调
         * 4.消费者取消消费回调
         */
        channel.basicConsume(normalQueue,false,deliverCallback,consumerTag->{
        });
    }
}

C2 消费者代码不变

启动消费者 1 然后再启动消费者 2 ,运行结果如下:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/399175.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【机器学习】数据清洗之处理重复点

&#x1f388;个人主页&#xff1a;豌豆射手^ &#x1f389;欢迎 &#x1f44d;点赞✍评论⭐收藏 &#x1f917;收录专栏&#xff1a;机器学习 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共同学习、交流进…

77、Spring、Spring Boot和Spring Cloud的关系

77、Spring、Spring Boot和Spring Cloud的关系 随着 Spring、Spring Boot 和 Spring Cloud 的不断发展&#xff0c;越来越多的开发者加入 Spring 的大军中。对于初学者而言&#xff0c;可能不太了解 Spring、Spring Boot 和 Spring Cloud 这些概念以及它们之间的关系&#xff…

北京高考数学填空题真题练一练(2014-2023)

距离2024年高考还有不到四个月的时间&#xff0c;今天我们来看看北京市的高考数学题真题。最近几年&#xff0c;只有北京、天津、上海三个直辖市的高考题是自主命题&#xff0c;其他省份全部是使用教育部统一命题的试卷。而且北京、天津、上海的数学现在也不再区分文理卷了&…

kali linux出现添加源无法更新的问题:更新时显示签名无效和没有数字签名

kali linux更新源时显示签名无效和没有数字签名 一、出现显示签名无效和没有数字签名二、 解决办法三、几种开源镜像站 一、出现显示签名无效和没有数字签名 原因&#xff1a;因为没有下载签名&#xff0c;所以显示签名无效和没有数字签名 二、 解决办法 wget archive.kali.o…

国际阿里云,想要使用怎么解决支付问题

在国内我们很多时候都需要用到国际阿里云&#xff0c;在国际阿里云需要使用就需要支付&#xff0c;自己办理visa卡比较麻烦&#xff0c;那么我们可以使用虚拟卡&#xff0c;虚拟卡办理快速简单 真实测评使用Fomepay的5347支持国际阿里云的支付&#xff0c;秒下卡&#xff0c;不…

华为---RSTP(二)---RSTP基本配置示例

目录 1. 示例要求 2. 网络拓扑图 3. 配置命令 4. 测试终端连通性 5. RSTP基本配置 5.1 启用STP 5.2 修改生成树协议模式为RSTP 5.3 配置根交换机和次根交换机 5.4 设置边缘端口 6. 指定端口切换为备份端口 7. 测试验证网络 1. 示例要求 为防止网络出现环路&#xf…

Spring Cloud部署篇1——Jar包部署至CentOS云服务器

一、项目介绍 系统模块 com.mingink |--mingink-api // 接口模块 | └──mingink-api-system // 系统接口 |--mingink-common // 通用模块 | └──mingink-common-core // 系统接口 |--mingink-gateway…

jmeter 命令行启动 动态参数化

[Jmeter命令行参数] 一、在linux中&#xff0c;使用非gui的方式执行jmeter。若需更改参数&#xff0c;必须先编辑jmx文件&#xff0c;找到对应的变量进行修改&#xff0c;比较麻烦。因此&#xff0c;可以参数化一些常用的变量&#xff0c;直接在Jmeter命令行进行设置 二、参数…

使用 npm/yarn 等命令的时候会,为什么会发生 Error: certificate has expired

缘起 昨天&#xff0c;我写了一篇文章&#xff0c;介绍如何使用项目模板&#xff0c;构建一个 Electron 项目的脚手架&#xff0c;我发现我自己在本地无法运行成功&#xff0c;出现了错误。 ✖ Failed to install modules: ["electron-forge/plugin-vite^7.2.0",&qu…

[office] excel2016怎么求最大值和最小值 #职场发展#知识分享

excel2016怎么求最大值和最小值 excel求最大值最小值步骤&#xff1a; 1、鼠标左键双击计算机桌面Excel2016程序图标&#xff0c;将其打开运行。在打开的Excel2016程序窗口&#xff0c;点击“打开其他工作簿”选项&#xff0c;打开需要进行编辑的Excel工作表。如图所示; 2、在打…

HTML的特殊字符

HTML的特殊字符 有些特殊的字符在 html 文件中是不能直接表示的&#xff0c;例如: 空格&#xff0c;小于号(<)&#xff0c;大于号(>)&#xff0c;按位与(&)。 空格 示例代码&#xff1a; 运行结果&#xff1a; 由于html 标签就是用 < > 表示的&#xff0…

【RT-DETR有效改进】 多维度注意力机制 | TripletAttention三重立体特征选择模块

一、本文介绍 本文给大家带来的改进是Triplet Attention三重注意力机制。这个机制,它通过三个不同的视角来分析输入的数据,就好比三个人从不同的角度来观察同一幅画,然后共同决定哪些部分最值得注意。三重注意力机制的主要思想是在网络中引入了一种新的注意力模块,这个模块…

JAVA工程师面试专题-JVM篇

目录 一、运行时数据区 1、说一下JVM的主要组成部分及其作用? 2、说一下 JVM 运行时数据区 ? 3、说一下堆栈的区别 4、成员变量、局部变量、类变量分别存储在什么地方? 5、类常量池、运行时常量池、字符串常量池有什么区别? 6、JVM为什么使用元空间替换永久代 二、…

在前后端分离项目中如何设置统一返回格式

目录 一、步骤一 二、步骤二 在前后端分离的项目中&#xff0c;为了方便前后端交互&#xff0c;后端往往需要给前端返回固定的数据格式&#xff0c;但不同的实体类返回格式不同&#xff0c;所以在真实开发中&#xff0c;我们将所有API接口设置返回统一的格式。基本上包括的有…

EventLoop整合与TimerWheel联合调试(整合二)

目录 概要 tcp_cli.cc tcp_srv.cc server.hpp 测试结果 第二次整合 概要 本主要是将以下模块进行整合测试 Poller模块与Channel模块整合-CSDN博客 时间轮设计-CSDN博客 timerfd的认识与基本使用-CSDN博客 整合基于的理念 tcp_cli.cc #include "../source/server.h…

ThreadLocal内存泄漏?

ThreadLocal内存泄漏&#xff1f; 在分析ThreadLocal导致的内存泄露前&#xff0c;需要普及了解一下内存泄露、强引用与弱引用以及GC回收机制&#xff0c;这样才能更好的分析为什么ThreadLocal会导致内存泄露呢&#xff1f;更重要的是知道该如何避免这样情况发生&#xff0c;增…

Facebook与数字创新:引领社交媒体的数字化革命

在当今数字化时代&#xff0c;社交媒体已经成为了人们日常生活中不可或缺的一部分。而在众多社交媒体平台中&#xff0c;Facebook作为领头羊&#xff0c;一直致力于推动数字创新&#xff0c;引领着社交媒体的数字化革命。本文将探讨Facebook在数字创新方面的表现&#xff0c;以…

第100讲:MHA+Atlas实现MySQL主从复制读写分离分布式集群

文章目录 1.Atlas读写分离简介2.搭建MHA高可用MySQL主从复制集群3.部署配置Atlas读写分离中间件3.1.安装Atlas读写分离中间件3.2.配置读写分离3.3.启动Atlas读写分离 4.读写分离集群测试5.生产环境中创建一个用户通过Atlas使用6.Atlas通过管理接口实现在线管理7.Atlas自动分表 …

ETW Bypass

1.NET 程序集 像 Java 是由 JVM 托管的&#xff0c;.NET 程序集(比如C_Sharp.exe) 都是由 CLR 托管的 硬盘加载 从硬盘中读取加载到内存 通过三个接口可以启动 CLR 来对 .NET 程序集 进行硬盘加载 Program.cs&#xff1a; x64.cpp&#xff1a; Copy #include <iostream…

【已解决】PPT无法复制内容怎么办?

想要复制PPT文件里的内容&#xff0c;却发现复制不了&#xff0c;怎么办&#xff1f; 这种情况&#xff0c;一般是PPT文件被设置了以“只读方式”打开&#xff0c;“只读方式”下的PPT无法进行编辑更改&#xff0c;也无法进行复制粘贴的操作。 想要解决这个问题&#xff0c;我…