解决RabbitMQ设置TTL过期后不进入死信队列

解决RabbitMQ设置TTL过期后不进入死信队列

  • 问题发现
  • 问题解决
    • 方法一:只监听死信队列,在死信队列里面处理业务逻辑
    • 方法二:改为自动确认模式

问题发现

最近再学习RabbitMQ过程中,看到关于死信队列内容:

来自队列的消息可以是 “死信”,这意味着当以下四个事件中的任何一个发生时,这些消息将被重新发布到 Exchange

  1. 使用 basic.rejectbasic.nackrequeue 参数设置为 false 的使用者否定该消息
  2. 消息由于每条消息的 TTL 而过期
  3. 队列超出了长度限制
  4. 消息返回到 quorum 队列的次数超过了 delivery-limit 的次数。

再模拟TTL过期时遇到的疑惑,特此记录下来,示例代码如下:
先设置为手动应答模式:

#手动应答
spring.rabbitmq.listener.simple.acknowledge-mode = manual

绑定队列,示例代码如下:

@Configuration
public class MQConfig {
    /**
     * 死信队列
     * @return
     */
    @Bean
    public Queue deadQueue(){
        return new Queue("dead_queue");
    }
    /**
     * 死信队列交换机
     * @return
     */
    @Bean
    public DirectExchange deadExchange(){
        return new DirectExchange("dead.exchange");
    }

    /**
     * 死信队列和死信交换机绑定
     * @return
     */
    @Bean
    public Binding deadBinding(){
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");
    }
    
    /**
     * 普通队列
     * @return
     */
    @Bean
    public Queue queue(){
//          方法一  
//        Queue normalQueue = new Queue("normal_queue");
//        normalQueue.addArgument("x-dead-letter-exchange", "dead.exchange"); // 死信队列
//        normalQueue.addArgument("x-dead-letter-routing-key", "dead"); // 死信队列routingKey
//        normalQueue.addArgument("x-message-ttl", 10000); // 死信队列routingKey
//        方法二
        return QueueBuilder.durable("normal_queue")
                .deadLetterExchange("dead.exchange")
                .deadLetterRoutingKey("dead")
                .ttl(10000)
                .build();
    }
    /**
     * 普通交换机
     * @return
     */
    @Bean
    public DirectExchange normalExchange(){
        return new DirectExchange("normal.exchange");
    }

    /**
     * 普通队列和普通交换机绑定
     * @return
     */
    @Bean
    public Binding binding(){
        return BindingBuilder.bind(queue()).to(normalExchange()).with("normal");
    }
}

监听普通队列消费方,示例代码如下:

@Component
@RabbitListener(queues = "normal_queue")
public class MQReceiver {
    private static final Logger log = LoggerFactory.getLogger(MQReceiver.class);

    @RabbitHandler
    public void receive(String msg, Message message, Channel channel) throws IOException, InterruptedException {
        log.info("收到消息:"+msg);
    }
}

监听死信队列消费方,示例代码如下:

@Component
@RabbitListener(queues = "dead_queue")
public class MQReceiver2 {
    private static final Logger log = LoggerFactory.getLogger(MQReceiver2.class);

    @RabbitHandler
    public void receive(String msg, Message message, Channel channel) throws IOException {
        log.info("死信队列收到消息:{}",msg);
        // 参数一:当前消息标签,参数二:true该条消息已经之前所有未消费设置为已消费,false只确认当前消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

发送方,向普通队列发送消息,示例代码如下:

@Component
public class MQSender {
    private static final Logger log = LoggerFactory.getLogger(MQSender.class);
    @Autowired
    private RabbitTemplate template;

    public void send() throws UnsupportedEncodingException {
        String msg = "hello world";
        log.info("发送消息:"+msg);
        template.convertAndSend("normal.exchange", "normal", msg);
    }
}

执行结果如图:

在这里插入图片描述
时间到了后,死信队列长时间未收到消息,消息一直在普通队列中,如图所示:

在这里插入图片描述

然后开始百度,网上很多都说什么配置不对啥的,还有说队列的预取值太大导致的问题(扯犊子呢),反正就是没有找到一个合理的解释。

然后吃了个饭回来,发现RabbitMQ报了一个长时间未收到消息确认的错误(大概意思就是说ACK消息确认超时时间为18000毫秒也就是30分钟),原来RabbitMQ一直在等待消息确认,所以一直被持有,当普通队列挂了(重启后),被释放,进入死信队列。

PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more

在这里插入图片描述
这下知道为什么不进入死信队列的原因了。新的问题又来了,如果我手动确认或者拒绝了,那不就达不到TTL过期的效果了吗?

问题解决

方法一:只监听死信队列,在死信队列里面处理业务逻辑

这个方法是参考众多文章比较常见的一个做法,但是个人感觉与我理解的TTL有偏差(应该是在普通队列中处理超时的一种补偿机制,如果只监听死信队列,那就完全不需要在配置时普通队列里面定义死信队列,虽然这种做法可以解决业务问题),另一方面官方也有提到:

消息可以在写入套接字之后过期,但在到达消费者之前过期。

示例代码如下:

@Configuration
public class MQConfig {
    /**
     * 死信队列
     * @return
     */
    @Bean
    public Queue deadQueue(){
        return new Queue("dead_queue");
    }
    /**
     * 死信队列交换机
     * @return
     */
    @Bean
    public DirectExchange deadExchange(){
        return new DirectExchange("dead.exchange");
    }

    /**
     * 死信队列和死信交换机绑定
     * @return
     */
    @Bean
    public Binding deadBinding(){
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");
    }
    
    /**
     * 普通队列
     * @return
     */
    @Bean
    public Queue queue(){
//          方法一  
//        Queue normalQueue = new Queue("normal_queue");
//        normalQueue.addArgument("x-dead-letter-exchange", "dead.exchange"); // 死信队列
//        normalQueue.addArgument("x-dead-letter-routing-key", "dead"); // 死信队列routingKey
//        normalQueue.addArgument("x-message-ttl", 10000); // 死信队列routingKey
//        方法二
        return QueueBuilder.durable("normal_queue")
                .deadLetterExchange("dead.exchange")
                .deadLetterRoutingKey("dead")
                .ttl(10000)
                .build();
    }
    /**
     * 普通交换机
     * @return
     */
    @Bean
    public DirectExchange normalExchange(){
        return new DirectExchange("normal.exchange");
    }

    /**
     * 普通队列和普通交换机绑定
     * @return
     */
    @Bean
    public Binding binding(){
        return BindingBuilder.bind(queue()).to(normalExchange()).with("normal");
    }
    
}

消费方只监听死信队列:

@Component
@RabbitListener(queues = "dead_queue")
public class MQReceiver2 {
    private static final Logger log = LoggerFactory.getLogger(MQReceiver2.class);

    @RabbitHandler
    public void receive(String msg, Message message, Channel channel) throws IOException {
        log.info("死信队列收到消息:{}",msg);
        // 伪代码:判断订单状态,1支付成功,2支付超时
//        if(order.state == 1){
//            // 参数一:当前消息标签,参数二:true该条消息已经之前所有未消费设置为已消费,false只确认当前消息
//            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//        }else{
//            // todo 修改订单状态
//            channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
//        }
    }
}

发送方代码如下:

@Component
public class MQSender {
    private static final Logger log = LoggerFactory.getLogger(MQSender.class);
    @Autowired
    private RabbitTemplate template;

    public void send() throws UnsupportedEncodingException {
        String msg = "hello world";
        log.info("发送消息:"+msg);
        template.convertAndSend("normal.exchange", "normal", msg);
    }
}

调用send()方法,执行结果如图:

在这里插入图片描述
可以看到从发送时间到进入死信队列时间正好间隔10s。

方法二:改为自动确认模式

经过思考后,既然手动确认走不通,那不如试一试自动模式,我们在普通队列里面,模拟业务出现异常情况(如果只是单纯模拟业务超时,不会进入死信队列,直接就确认消费了)。

我们先把手动确认的配置删除或者修改为自动确认,示例代码如下:

#spring.rabbitmq.listener.simple.acknowledge-mode = auto

发送方代码和配置的代码就不重复展示了(参考之前示例),消费方示例代码如下:

@Component
@RabbitListener(queues = "normal_queue")
public class MQReceiver {
    private static final Logger log = LoggerFactory.getLogger(MQReceiver.class);
    @RabbitHandler
    public void receive(String msg) throws IOException, InterruptedException {
        log.info("收到消息:"+msg);
        throw new RuntimeException();
    }
}
@Component
@RabbitListener(queues = "dead_queue")
public class MQReceiver2 {
    private static final Logger log = LoggerFactory.getLogger(MQReceiver2.class);

    @RabbitHandler
    public void receive(String msg) throws IOException {
        log.info("死信队列收到消息:{}",msg);
        // 伪代码:判断订单状态,1支付成功,2支付超时
//        if(order.state == 1){
//            // 参数一:当前消息标签,参数二:true该条消息已经之前所有未消费设置为已消费,false只确认当前消息
//            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//        }else{
//            // todo 修改订单状态
//            channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
//        }
    }
}

调用send()方法,执行结果如图:

在这里插入图片描述
我们可以看到第一次进入普通队列时间和最后一次报错进入死信队列的时间,正好间隔10s。但是这中间会重复发起N次,不合理是时长,可能会导致资源消耗过高,但这又属于另外一个问题了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/880946.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

秋招八股总结

transformer 损失函数 交叉熵的原理 公式 xi是true_label,yi是神经网络预测为正确的概率 对比学习loss 对比学习损失函数 InfoNEC Loss(bge中也用的这个) SimCSE的主要思想:同一句话通过不同的drop out作为正例&#xff0…

【计网】数据链路层:概述之位置|地位|链路|数据链路|帧

✨ Blog’s 主页: 白乐天_ξ( ✿>◡❛) 🌈 个人Motto:他强任他强,清风拂山岗! 💫 欢迎来到我的学习笔记! ① ② ③ ④ ⑤ ⑥ ⑦ ⑧ ⑨ ⑩ 1. 在OSI体系结构中的位置 1. 位置:数…

Parallels Desktop 20(Mac虚拟机) v20.0.0 for Mac 最新破解版(支持M系列)

Parallels Desktop 20 for Mac 正式发布,完全支持 macOS Sequoia 和 Windows 11 24H2,并且在企业版中引入了全新的管理门户。 据介绍,新版本针对 Windows、macOS 和 Linux 虚拟机进行了大量更新,最大的亮点是全新推出的 Parallels…

稳联Profinet转Canopen网关携手伺服,高效提升生产效率

在当今的工业生产领域,追求高效、精准和可靠的生产方式是企业不断努力的方向。稳联技术Profinet转Canopen(WL-ABC3033)网关与伺服系统的携手合作,为提高生产效率带来了新的机遇和突破。 实现无缝通信,优化生产流程稳联…

B站前端错误监控实践

前言 从23年开始,我们团队开始前端错误监控方向的开发。经历了一些列的迭代和发展,从监控SDK、上报、数据治理、看板集成、APM自研可视化初步完成了一条完整且适合B站前端监控。 截止目前(2024.08.01),前端监控在B站85%以上的业务线&#xf…

在基准测试和规划测试中选Flat还是Ramp-up?

Flat测试和Ramp-up测试是各有优势的,下面我们就通过介绍几种实用的性能测试策略来分析这两种加压策略的着重方向。 基准测试 基准测试是一种测量和评估软件性能指标的活动,通过基准测试建立一个已知的性能水平(称为基准线)&…

服务发现和代理实例的自动更新

☞ 返回总目录 1.服务发现的两种方式 StartFindService 方法 这是一个在后台启动的连续 “FindService” 活动,当服务实例的可用性发生变化时,会通过回调通知调用者。 它返回一个FindServiceHandle,可通过调用StopFindService来停止正在进行…

初学者蒙语学习,使用什么翻译软件学习更快?

为了加快蒙古语的学习,初学者应该从基础语法和词汇入手,利用语言学习应用进行系统学习,并通过音频和视频材料提高听力。语言交换和参加课程可以提供实践机会,而使用闪卡和文化沉浸有助于记忆词汇和理解语言背景。定期复习和设定学…

常用的k8s容器网络模式有哪些?

常用的k8s容器网络模式包括Bridge模式、Host模式、Overlay模式、Flannel模式、CNI(ContainerNetworkInterface)模式。K8s的容器网络模式多种多样,每种模式都有其特点和适用场景。Bridge模式适用于简单的容器通信场景;Host模式适用…

微服务保护之熔断降级

在微服务架构中,服务之间的调用是通过网络进行的,网络的不确定性和依赖服务的不可控性,可能导致某个服务出现异常或性能问题,进而引发整个系统的故障,这被称为 微服务雪崩。为了防止这种情况发生,常用的一些…

Debian项目实战——环境搭建篇

Debian系统安装 准备工作 1、系统镜像:根据自己的需要选择合适的版本格式:x86 / arm 架构 | 最好下载离线安装版本 | 清华镜像源 2、制作工具:balenaEtcher 3、系统媒介:16G以上U盘最佳 烧录镜像 打开balenaEtcher进行烧录&am…

克隆GitHub仓库中的一个文件夹

要只克隆GitHub仓库中的一个文件夹&#xff0c;你可以使用 git sparse-checkout 功能。以下是具体步骤&#xff1a; 克隆仓库&#xff08;使用 --no-checkout 选项&#xff0c;避免下载所有内容&#xff09;&#xff1a; git clone --no-checkout <仓库地址> 进入克隆的…

Active Directory 实验室设置第一部分- AD林安装

在之前的文章中&#xff0c;已经讨论了活动目录的基本知识。在这篇文章中&#xff0c;我们将讨论如何设置和配置环境&#xff0c;以便我们可以使用它来执行各种攻击方案和检测。我们将讨论如何通过GUI和CLI方式完成。 # 1、Active Directory 设置 让我们从活动目录实验室设置…

foc原理odrive驱动板的使用,以及功能介绍

文章目录 驱动板引脚&#xff1a;编码器的安装&#xff1a;电机参数编码器设置 odrive控制控制指令设置模式设置输入模式其他指令 调PID调试准则先调整内环&#xff0c;再调整外环在位置模式下调试结论 使用的灯哥开源的odrive驱动板&#xff0c;外接编码器 驱动板引脚&#xf…

泳池软管检测系统源码分享

泳池软管检测检测系统源码分享 [一条龙教学YOLOV8标注好的数据集一键训练_70全套改进创新点发刊_Web前端展示] 1.研究背景与意义 项目参考AAAI Association for the Advancement of Artificial Intelligence 项目来源AACV Association for the Advancement of Computer Vis…

【数学分析笔记】第3章第2节 连续函数(4)

3. 函数极限与连续函数 3.2 连续函数 3.2.9 反函数的连续性定理 【定理3.2.2】【反函数连续性定理】设 y f ( x ) yf(x) yf(x)在闭区间 [ a , b ] [a,b] [a,b]上连续且严格单调增加&#xff0c;设 f ( a ) α , f ( b ) β f(a)\alpha,f(b)\beta f(a)α,f(b)β&#xff0…

web基础之RCE

简介&#xff1a;RCE称为远程代码执行漏洞&#xff1b;是互联网的一种安全漏洞&#xff1b;攻击者可以直接向后台服务器远程注入操作系统命令&#xff1b;从而操控后台系统&#xff1b;也是CTF比较常考的一个方面 1、eval执行 &#xff08;1&#xff09;分析后端代码&#xf…

[数据集][目标检测]无人机识别检测数据集VOC+YOLO格式6986张1类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;6986 标注数量(xml文件个数)&#xff1a;6986 标注数量(txt文件个数)&#xff1a;6986 标注…

物流管理系统小程序的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;用户管理&#xff0c;员工管理&#xff0c;部门管理&#xff0c;物品分类管理&#xff0c;物流公司管理&#xff0c;物流信息管理&#xff0c;配送信息管理 微信端账号功能包括&#xff1a;系统首页&a…

Mybatis 快速入门(maven)

文章目录 需求建表新建了数据库但是navicat界面没有显示 新建maven项目 注意导入依赖 总结 黑马学习笔记 需求 建表 注意&#xff1a;设置字符集 减少出错 drop database mybatis; create database mybatis charset utf8; use mybatis;drop table if exists tb_user;create…