消息队列-RabbitMQ:死信队列

十五、死信队列

1、死信的概念

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

2、死信的来源

  • 消息 TTL 过期:TTL 是 Time To Live 的缩写,也就是生存时间。

  • 队列达到最大长度:队列满了,无法再添加数据到 mq 中。

  • 消息被拒绝:(basic.reject 或 basic.nack) 并且 requeue=false。

3、死信实战

在这里插入图片描述

死信之 TTL

(1)消费者1

①定义普通交换机、死信交换机与正常队列、死信队列

②声明死信交换机、普通交换机与普通队列

③设置正常队列和死信交换机的联系,使用argument参数

④绑定:普通交换机和正常队列、死信交换机和死信队列

⑤正常接收消息

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

public class Consumer01 {

    //普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //声明死信和普通交换机 类型为 direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        //声明死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        //死信队列绑定:队列、交换机、路由键(routingKey)
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

        //正常队列绑定死信队列信息
        Map<String, Object> params = new HashMap<>();
        //正常队列设置死信交换机 参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //正常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");

        //正常队列
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");

        System.out.println("等待接收消息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer01 接收到消息" + message);
        };
        channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
        });
    }

}

(2)生产者:

①定义普通交换机的名称

②设置消息的TTL,死信消息的设置

③正常发送消息

在这里插入图片描述

public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        //设置消息的 TTL 时间 10s
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
        //该信息是用作演示队列个数限制
        for (int i = 1; i < 11; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
            System.out.println("生产者发送消息:" + message);
        }

    }
}

(3)测试效果1:

先启动消费者1,再启动生产者

在这里插入图片描述

注意:正常交换机的Features里面多了 DLX、DLK

在这里插入图片描述

在这里插入图片描述

(4)正常队列和死信队列中消息数的变化对比

在这里插入图片描述

当正常队列的消息TTL到了之后,还没有被接受,正常队列的消息会被转发到死信交换机,再到死信队列里面,这个过程很快的

在这里插入图片描述

(5)消费者2

接收死信队列的消息即可

在这里插入图片描述

public class Consumer02 {
    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //声明交换机
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        //声明队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

        System.out.println("等待接收死信消息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer02 接收到消息" + message);
        };
        channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
        });
    }
}

(6)测试效果2:

启动消费者2

在这里插入图片描述

在这里插入图片描述

死信之最大长度

①生产者和消费者删除掉TTL的代码

②在消费者1中设置正常队列的长度限制

在这里插入图片描述

//设置正常队列的长度限制,例如发10个,4个则为死信
params.put("x-max-length",6);

注:先把原来生成的队列删除,因为条件参数发生变化了,已不再是同一个了,否则会报错

测试效果:

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

死信之消息被拒

先注释掉最大长度的代码,然后,将消息都接收完,才可以继续发消息,还有删掉之前创建的最长时间TTL和最大长度的队列,以免对测试造成影响。

在这里插入图片描述

public class Consumer01 {

    //普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //声明死信和普通交换机 类型为 direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        //声明死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        //死信队列绑定:队列、交换机、路由键(routingKey)
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
        
        //正常队列绑定死信队列信息
        Map<String, Object> params = new HashMap<>();
        //正常队列设置死信交换机 参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //正常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");
//        //设置正常队列的长度限制,例如发10个,4个则为死信
//        params.put("x-max-length",6);
        
        //正常队列
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");

        System.out.println("等待接收消息........... ");
        
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            if (message.equals("info5")) {
                System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
                //requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
                channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
            } else {
                System.out.println("Consumer01 接收到消息" + message);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }

        };
        //开启手动应答
        channel.basicConsume(normalQueue, false, deliverCallback, consumerTag -> {
        });
    }

}

测试效果:

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

消息队列-RabbitMQ:死信队列 到此完结,笔者归纳、创作不易,大佬们给个3连再起飞吧

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/401287.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Chrono Engine学习总结】4-vehicle-4.3-两个vehicle碰撞测试

由于Chrono的官方教程在一些细节方面解释的并不清楚&#xff0c;自己做了一些尝试&#xff0c;做学习总结。 今天突发奇想&#xff0c;想试一下&#xff0c;是否可以实现两个vehicle的碰撞&#xff1f; 1、两辆vehicle的仿真 官方提供了demo_VEH_TwoCars这个demo&#xff0c…

【更新公告】AirtestIDE更新至1.2.17版本

1. 前言 本次更新为AirtestIDE、Airtest-Selenium库更新。 AirtestIDE更新至1.2.17版本&#xff0c;AirtestIDE内置库Airtest更新为1.3.3.1版本&#xff0c;Poco更新为1.0.94版本&#xff0c;主要支持了selenium4.0以上版本&#xff0c;ADB更换为41版本&#xff0c;Airtest新…

数据驱动决策:掌握高效数据分析的七大步骤

在这个数据驱动的时代&#xff0c;无论是企业决策还是个人发展&#xff0c;数据分析都扮演着至关重要的角色。然而&#xff0c;想要从海量数据中提炼出有价值的信息&#xff0c;并不是一件容易的事情。本文为你详细解读高效数据分析&#xff0c;让你的数据开口说话&#xff0c;…

GIS技术在灾后重建中的空间规划与决策支持

地质灾害是指全球地壳自然地质演化过程中&#xff0c;由于地球内动力、外动力或者人为地质动力作用下导致的自然地质和人类的自然灾害突发事件。由于降水、地震等自然作用下&#xff0c;地质灾害在世界范围内频繁发生。我国除滑坡灾害外&#xff0c;还包括崩塌、泥石流、地面沉…

单体微服务K8S笔记

单体微服务K8S笔记 https://blog.csdn.net/m0_48341969/article/details/126063832思路参考以上博客 //测试 https://gitee.com/yangbuyi/yi项目组织参考以上git 单体&#xff1a; 不特地介绍 微服务&#xff1a; rpc:远程过程调用 拆分&#xff0c;分别部署&#xff0…

Day04-流程控制语句_循环结构(while,do...while,关键字continue,关键字break,循环嵌套)

文章目录 Day04- 循环结构学习目标1 while循环2 do...while循环4 循环语句的区别5 关键字continue6 关键字break7 循环嵌套案例1&#xff1a;打印5行直角三角形案例2&#xff1a;break结束当层循环 Day04- 循环结构 学习目标 理解for语句的格式和执行流程 随机数公式 理解…

Linux中安装Nginx及日常配置使用

高性能的http服务器/反向代理服务器。官方测试支持5万并发&#xff0c;CPU、内存等消耗较低且运行稳定 使用场景 Http服务器。 Nginx可以单独提供Http服务&#xff0c;做为静态网页的服务器。虚拟主机。 可以在一台服务器虚拟出多个网站。反向代理与负载均衡。 Nginx做反向代理…

SQL注入之DNSLog外带注入

一、认识&#xff1a; 什么是dnslog呢&#xff1f; DNS就是域名解析服务&#xff0c;把一个域名转换成对应的IP地址&#xff0c;转换完成之后&#xff0c;DNS服务器就会有一个日志记录本次转换的时间、域名、域名对应的ip、请求方的一些信息&#xff0c;这个日志就叫DNSLog。…

基于 java springboot+layui仓库管理系统

基于 java springbootlayui仓库管理系统设计和实现 博主介绍&#xff1a;5年java开发经验&#xff0c;专注Java开发、定制、远程、文档编写指导等,csdn特邀作者、专注于Java技术领域 作者主页 央顺技术团队 Java毕设项目精品实战案例《1000套》 欢迎点赞 收藏 ⭐留言 文末获取源…

好书推荐| After Effects 2022案例实战全视频教程

After Effects 2022案例实战全视频教程 作者 &#xff1a;王红卫 书号&#xff1a;9787302631958 定价&#xff1a;99元 出版时间&#xff1a;2023年7月 作者介绍 王红卫 拥有多年设计师的经学经验&#xff0c;北京理工大学百事特教师&#xff0c;水木风云工作室创始人&a…

网络编程知识整理

目录 1.1 引言 1.2 分层 1.3 TCP/IP的分层 1.4 互联网的地址 1.5 域名服务 1.6 封装 1.7 分用 1.8 端口号 1.1 引言 很多不同的厂家生产各种型号的计算机&#xff0c;它们运行完全不同的操作系统&#xff0c;但 T C P / I P协议族允许它们互相进行通信。这一点很让人感…

⭐北邮复试刷题105. 从前序与中序遍历序列构造二叉树__递归分治 (力扣每日一题)

105. 从前序与中序遍历序列构造二叉树 给定两个整数数组 preorder 和 inorder &#xff0c;其中 preorder 是二叉树的先序遍历&#xff0c; inorder 是同一棵树的中序遍历&#xff0c;请构造二叉树并返回其根节点。 示例 1: 输入: preorder [3,9,20,15,7], inorder [9,3,15,…

论文精读--word2vec

word2vec从大量文本语料中以无监督方式学习语义知识&#xff0c;是用来生成词向量的工具 把文本分散嵌入到另一个离散空间&#xff0c;称作分布式表示&#xff0c;又称为词嵌入&#xff08;word embedding&#xff09;或词向量 Abstract We propose two novel model architec…

Go 中的 init 如何用?它的常见应用场景有哪些呢?

嗨&#xff0c;大家好&#xff01;我是波罗学。本文是系列文章 Go 技巧第十六篇&#xff0c;系列文章查看&#xff1a;Go 语言技巧。 Go 中有一个特别的 init() 函数&#xff0c;它主要用于包的初始化。init() 函数在包被引入后会被自动执行。如果在 main 包中&#xff0c;它也…

四、分类算法 - 随机森林

目录 1、集成学习方法 2、随机森林 3、随机森林原理 4、API 5、总结 sklearn转换器和估算器KNN算法模型选择和调优朴素贝叶斯算法决策树随机森林 1、集成学习方法 2、随机森林 3、随机森林原理 4、API 5、总结

Kubernetes 卷存储 NFS | nfs搭建配置 原理介绍 nfs作为存储卷使用

1、NFS介绍 NFS&#xff08;Network File System&#xff09;是一种分布式文件系统协议&#xff0c;允许客户端远程访问服务器上的文件&#xff0c;实现数据共享。它整合多个存储设备为统一文件系统&#xff0c;方便数据存储和管理&#xff0c;支持负载均衡和故障转移&#xf…

[设计模式Java实现附plantuml源码~行为型]协调多个对象之间的交互——中介者模式

前言&#xff1a; 为什么之前写过Golang 版的设计模式&#xff0c;还在重新写Java 版&#xff1f; 答&#xff1a;因为对于我而言&#xff0c;当然也希望对正在学习的大伙有帮助。Java作为一门纯面向对象的语言&#xff0c;更适合用于学习设计模式。 为什么类图要附上uml 因为很…

DataX - 全量数据同步工具

前言 今天是2024-2-21&#xff0c;农历正月十二&#xff0c;相信今天开始是新的阶段&#xff0c;尽管它不是新的周一、某月一日、某年第一天&#xff0c;尽管我是一个很讲究仪式感的人。新年刚过去 12 天&#xff0c;再过 3 天就开学咯&#xff0c;开学之后我的大学时光就进入了…

内网穿透——NPS突然无法连接

温馨提示 &#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f32d;&#x1f32d;&#x1f32d;&#x1f32d;&#x1f32d;&#x1f32d;&#x1f32d;❤️❤️❤️❤️❤️❤️❤️&#x1f968;&#x1f968;&#x1f9…

Go语言中的TLS加密:深入crypto/tls库的实战指南

Go语言中的TLS加密&#xff1a;深入crypto/tls库的实战指南 引言crypto/tls库的核心组件TLS配置&#xff1a;tls.Config证书加载与管理TLS握手过程及其实现 构建安全的服务端创建TLS加密的HTTP服务器配置TLS属性常见的安全设置和最佳实践 开发TLS客户端应用编写使用TLS的客户端…