RabbitMQ 死信队列应用

1. 概念

死信队列(Dead Letter Queue)是在消息队列系统中的一种特殊队列,用于存储无法被消费的消息。消息可能会因为多种原因变成“死信”,例如消息过期、消息被拒绝、消息队列长度超过限制等。当消息变成“死信”时,它们会被路由到死信队列中,以便进行进一步处理或分析。 死信队列能够帮助系统进行消息跟踪、监控和处理异常情况,是消息队列系统中的重要组成部分。

2. 应用场景

死信队列在消息队列系统中有多种应用场景,包括但不限于以下几个方面:

  • 延迟消息处理:实现延迟消息投递,例如实现消息的定时投递、消息重试机制等。

  • 任务调度:用于实现任务调度系统,例如延迟执行任务、失败重试任务等。

  • 异常处理:处理消息消费失败或超时的情况,对异常消息进行统一处理。

  • 业务流程控制:实现业务流程中的状态控制和超时处理,例如订单超时取消、支付超时处理等。

  • 监控和统计:对异常消息进行统计和分析,用于系统性能监控和问题排查。

这些应用场景展示了死信队列的灵活性和实用性,在实际系统开发中具有广泛的应用价值。

3. 造成消息进入死信队列的原因

消息成为死信的原因有以下几种:

  • 消息被拒绝(basic.reject或basic.nack),并且requeue标志被设置为false。若参数requeue为true,则表示还可以将此跳消息重新塞回普通队列,若为false则消息被拒绝后直接进入死信队列。

  • 消息过期。在生产者设置生产时设置,若消费者未在过期时间内消费消息,则消息被转发到死信队列中。("x-message-ttl")

  • 队列达到最大长度。当普通队列中消息堆积数量长度达到了maxLength,则会将新接收的消息转发到死信队列中去,从而避免消息丢失。

4. 死信队列工作流程图

5. 代码示例

5.1 引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.7.15</version>
</dependency>

5.2 RabbitMQ配置

@Configuration
public class RabbitConfig {

    /**
     * 死信队列消息模型构建----------------------------------------------------------------------------------
     **/
    // 创建普通队列
    @Bean
    public Queue basicQueue() {
        Map<String, Object> params = new HashMap<>(8);
        // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
        params.put("x-dead-letter-exchange", Exchange.DEMO_DEAD_LETTER_EXCHANGE);
        // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
        params.put("x-dead-letter-routing-key", RoutingKey.DEMO_DEAD_ROUTING_KEY);
        // 注意这里是毫秒单位,这里我们给10秒
        params.put("x-message-ttl", 10*1000);
        return new Queue(MyQueue.DEMO_CONSUMER_QUEUE, true, false, false, params);
    }

    //创建“基本消息模型”的基本交换机,面向生产者
    @Bean
    public TopicExchange basicExchange() {
        //创建并返回基本交换机实例
        return new TopicExchange(Exchange.DEMO_BASIC_NORMAL_EXCHANGE, true, false);
    }

    //创建“基本消息模型”的基本绑定(基本交换机+基本路由),面向生产者
    @Bean
    public Binding basicBinding() {
        //创建并返回基本消息模型中的基本绑定(注意这里是正常交换机跟死信队列绑定在一定,不叫死信路由)
        return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(RoutingKey.DEMO_ROUTING_KEY);
    }

    // 创建死信交换机
    @Bean
    public TopicExchange deadLetterExchange() {
        //创建并返回死信交换机实例
        return new TopicExchange(Exchange.DEMO_DEAD_LETTER_EXCHANGE, true, false);
    }

    // 创建第二个中转站
    // 创建死信队列
    @Bean
    public Queue deadLetterQueue() {
        return new Queue(MyQueue.DEMO_DEAD_LETTER_QUEUE, true);
    }

    // 创建死信路由及其绑定
    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(RoutingKey.DEMO_DEAD_ROUTING_KEY);
    }


    public static class Exchange {
        public static final String DEMO_BASIC_NORMAL_EXCHANGE = "demo.basic.exchange";

        public static final String DEMO_DEAD_LETTER_EXCHANGE = "demo.dead.letter.exchange";
    }

    public static class RoutingKey {
        //交换机与报表队列绑定的RoutingKey
        public static final String DEMO_ROUTING_KEY = "demo.basic.routing.key";

        public static final String DEMO_DEAD_ROUTING_KEY = "demo.dead.routing.key";
    }

    /**
     * 队列名称
     * @author peng.zhang
     * @date 2024/01/30
     */
    public static class MyQueue {
        //报表队列名称
        public static final String DEMO_CONSUMER_QUEUE = "demo.basic.queue";

        //死信队列名称
        public static final String DEMO_DEAD_LETTER_QUEUE = "demo.dead.letter.queue";
    }
}

5.3 消息生产者

@RestController
@RequestMapping("/test")
@Slf4j
public class TestController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送消息到死信队列
     */
    @PostMapping("/testDeadQueue")
    public String testDeadQueue() {
        // 设置生产者到交换机的确认回调
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("correlationData:{}, ack:{}, cause:{}", JSON.toJSONString(correlationData), ack, cause);
        });
        // 设置消息未被队列接收时的返回回调
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, ex, routing) -> {
            log.info("message:{}, replyCode:{}, replyText:{}, exchange:{}, routingKey:{}", JSON.toJSONString(message),
                    replyCode, replyText, ex, routing);
        });
        // 生成关联数据并发送消息到交换机
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        // 消息内容
        String messageBody = StrUtil.format("this message send at {}", DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"));
        rabbitTemplate.convertAndSend(RabbitConfig.Exchange.DEMO_BASIC_NORMAL_EXCHANGE, RabbitConfig.RoutingKey.DEMO_ROUTING_KEY, messageBody, correlationData);
        log.info(">>>>>{}, 发送消息:{}", DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"), messageBody);
        return "OK";
    }

}

5.4 消息消费者

@Component
@Slf4j
public class DeadLetterConsumer {
    /**
     * 监听 DEMO_CONSUMER_QUEUE 并处理传入的消息。
     * 为测试目的抛出 IOException 以模拟异常。
     *
     * @param messageBody 消息负载
     * @param headers     消息头
     * @param channel     用于消息确认的通道
     * @throws IOException 如果抛出异常
     */
    @RabbitListener(queues = RabbitConfig.MyQueue.DEMO_CONSUMER_QUEUE)
    @RabbitHandler
    public void testBasicQueueAndThrowsException(@Payload String messageBody, @Headers Map<String, Object> headers, Channel channel) throws IOException {
        /**
         * Delivery Tag 用来标识信道中投递的消息。RabbitMQ 推送消息给 Consumer 时,会附带一个 Delivery Tag,
         * 以便 Consumer 可以在消息确认时告诉 RabbitMQ 到底是哪条消息被确认了。
         * RabbitMQ 保证在每个信道中,每条消息的 Delivery Tag 从 1 开始递增。
         */
        Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);

        log.info(">>>>>{} 普通队列消费, tag = {}, 消息内容:{}", DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"), tag, messageBody);
        /**
         *  multiple 取值为 false 时,表示通知 RabbitMQ 当前消息被确认
         *  如果为 true,则额外将比第一个参数指定的 delivery tag 小的消息一并确认
         */
        // ACK,确认一条消息已经被消费
//        channel.basicAck(deliveryTag, false);

        // 对应的业务操作。。。。。
        // doBusiness();

        // 模拟消息拒绝
        channel.basicNack(tag, false, false);
    }

    /**
     * 处理业务逻辑
     */
    private void doBusiness() {
        System.out.println("here do some business code");
    }


    /**
     * 监听死信队列并处理消息。
     *
     * @param data    消息内容
     * @param tag     消息标签
     * @param channel 通道
     */
    @RabbitListener(queues = RabbitConfig.MyQueue.DEMO_DEAD_LETTER_QUEUE)
    @RabbitHandler
    public void fromDeadLetter(@Payload String data, @Header(AmqpHeaders.DELIVERY_TAG) long tag, Channel channel) {
        log.info(">>>>>{} 死信队列消费, tag = {}, 消息内容:{}", DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"), tag, data);
        // 对应的业务操作。。。。。
    }
}

5.5 YML配置

spring:
  rabbitmq:
    username: rabbitmq
    password: rabbitmq
    port: 5672
    host: 127.0.0.1
    #publisher-confirm-type参数有三个可选值:
    #SIMPLE:会触发回调方法,相当于单个确认(发一条确认一条)。
    #CORRELATED:消息从生产者发送到交换机后触发回调方法。
    #NONE(默认):关闭发布确认模式。
    publisher-confirm-type: correlated
    template:
      receive-timeout: 1800000
      reply-timeout: 1800000
      retry:
        enabled: false
    listener:
      direct:
        retry:
          enabled: true
        default-requeue-rejected: false
      simple:
        retry:
          # 是否开启消费者重试(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息)
          enabled: true
          # 最大重试次数
          max-attempts: 1
          # 重试间隔时间(单位毫秒)
          initial-interval: 10000
          # 重试最大时间间隔(单位毫秒)
          max-interval: 300000
          # 应用于前一重试间隔的乘法器
          multiplier: 5
        default-requeue-rejected: false

5.6 控制台输出

从控制台可以看出,消息被拒绝后,大概10秒后死信队列消息被消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/363283.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

(申请积分专用)我的世界(MC)整合包开服教程,Pokehaan Craft 2整合包服务器搭建教程

Minecraft整合包服务器搭建教程&#xff0c;宝可梦/神奇宝贝整合包&#xff08;Pokehaan Craft 2&#xff09;开服教程。 其他整合包也可以参考此教程。要看这个整合包的游戏截图可以翻到文章最底下。 什么是整合包 Minecraft的整合包是一种包含了多个模组&#xff08;mod&a…

面向对象设计的七大设计原则

在我们探讨如何创建健壮且可维护的面向对象系统时&#xff0c;有一些原则可以为我们提供指导。这些原则可以帮助我们理解如何最好地组织我们的类和对象&#xff0c;以实现高效、模块化和可扩展的设计。在本篇文章中&#xff0c;我们将探讨这些原则&#xff0c;以及如何在我们的…

Pyth 预言机: 它们如何影响Hover?

所有链上借贷市场都使用一种称为“oracle&#xff08;预言机&#xff09;”的服务&#xff0c;为dApp提供代币定价。Oracle是一个数据系统&#xff0c;将链下信息&#xff08;例如KuCoin上的BTC/USDT价格&#xff09;传递到链上合约。从那里&#xff0c;应用程序可以支付一小笔…

MAVEN(1)

分模块开发与设计 分模块开发意义 将原始模块按照功能拆分成若干个子模块&#xff0c;方便模块间相互调用&#xff0c;接口共享 步骤示例 这里以之前开发的SpringMVC_ssm中的domain模块为例 第一步、创建Maven模块 父项改为none&#xff0c;文件存储位置需要做出相应调整 …

MySQL基础(三)-学习笔记

一.innodb引擎&#xff1a; 1). 表空间&#xff1a;表空间是InnoDB存储引擎逻辑结构的最高层&#xff0c;启用了参数 innodb_file_per_table(在 8.0版本中默认开启) &#xff0c;则每张表都会有一个表空间&#xff08;xxx.ibd&#xff09;&#xff0c;一个mysql实例可以对应多个…

Revisiting image pyramid structure for high resolution salient object detection

accv2022的技术&#xff0c;在我测评的数据集上确实要明显好于basnet&#xff0c;rembg等一众方法。 1.Introduction 使用LR数据集训练的方法通过调整输入尺寸可以在HR图像上产生不错的结果。本文主要关注仅使用LR数据集进行训练以产生高质量的HR预测。HR的有效感受野ERFs和LR…

QT学习日记 | 信号与槽

目录 前言 一、初始信号与槽 1、信号与槽的本质 2、信号与槽的使用 3、内置信号、内置槽函数与自定义信号、自定义槽函数 &#xff08;1&#xff09;文档查询 &#xff08;2&#xff09;自定义信号与内置槽函数的使用 4、信号与槽函数关联关系 5、带参数的信号与槽函数…

计算机毕业设计 | springboot 多功能商城 购物网站(附源码)

1&#xff0c; 概述 国家大力推进信息化建设的大背景下&#xff0c;城市网络基础设施和信息化应用水平得到了极大的提高和提高。特别是在经济发达的沿海地区&#xff0c;商业和服务业也比较发达&#xff0c;公众接受新事物的能力和消费水平也比较高。开展商贸流通产业的信息化…

Java和JavaScript的区别与联系

引言 Java是一种由Sun Microsystems&#xff08;现在是Oracle公司&#xff09;开发的面向对象编程语言&#xff0c;最初于1995年发布。Java被设计为一种跨平台的语言&#xff0c;可以在多个操作系统上运行&#xff0c;这是其广泛应用的重要原因之一。Java具有丰富的标准库和第三…

常见分类网络的结构

VGG16 图片来自这里 MobilenetV3 small和large版本参数,图片来着这里 Resnet 图片来自这里

AutoDL使用conda运行pytorch、dgl

环境配置要是出现兼容问题还是挺繁琐的。所以这里记录下成功的配置情况。 conda create --name Test python3.9 # 构建一个虚拟环境 conda init bash && source /root/.bashrc # 更新bashrc中的环境变量 conda activate Test # 切换到该虚拟环境 pip install torch…

windows安装oracle之后怎么连接使用

目录 1.打开SQl Developer 2.选择JDK 3.登录 4.创建表空间,用户 安装oracle的详细教程 WINDOWS安装Oracle11.2.0.4-CSDN博客 1.打开SQl Developer 找到 SQl Developer 2.选择JDK 根据你安装的oracle版本,因为我的oracle是安装的32位的,所以这里jdk也要选择32位 选择到ja…

私募证券基金动态-23年12月报

成交量&#xff1a;12月日均7,696.93亿元 2023年12月A股两市日均成交7,696.93亿元&#xff0c;环比下降12.39%、同比下降2.26%。12月整体21个交易日&#xff0c;无单日交易日成交金额过万亿&#xff0c;单日交易日最低成交金额为6,122.84亿元&#xff08;12月25日&#xff09;…

【Linux】进程通信——共享内存+消息队列+信号量

欢迎来到Cefler的博客&#x1f601; &#x1f54c;博客主页&#xff1a;折纸花满衣 &#x1f3e0;个人专栏&#xff1a;题目解析 &#x1f30e;推荐文章&#xff1a;【LeetCode】winter vacation training 目录 &#x1f449;&#x1f3fb;共享内存&#x1f449;&#x1f3fb;关…

测试用例的设计(超详细)

&#x1f345; 视频学习&#xff1a;文末有免费的配套视频可观看 &#x1f345; 关注公众号&#xff1a;互联网杂货铺&#xff0c;回复1 &#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;薪资嘎嘎涨 1. 测试用例的概念 软件测试人员向被测试系统提供的一…

MySQL窗口函数--lead()函数

lead()函数&#xff1a; 查询当前行向下偏移n行对应的结果 该函数有三个参数&#xff1a;第一个为待查询的参数列名&#xff0c;第二个为向下偏移的位数&#xff0c;第三个参数为超出最下面边界的默认值。 如下代码&#xff1a; 查询向下偏移 2 位的年龄 SELECT user_id,user…

学习Android的第一天

目录 什么是 Android&#xff1f; Android 官网 Android 应用程序 Android 开发环境搭建 Android 平台架构 Android 应用程序组件 附件组件 Android 第一个程序 HelloWorld 什么是 Android&#xff1f; Android&#xff08;发音为[ˈnˌdrɔɪd]&#xff0c;非官方中文…

Linux 驱动开发基础知识——总线设备驱动模型(八)

个人名片&#xff1a; &#x1f981;作者简介&#xff1a;学生 &#x1f42f;个人主页&#xff1a;妄北y &#x1f427;个人QQ&#xff1a;2061314755 &#x1f43b;个人邮箱&#xff1a;2061314755qq.com &#x1f989;个人WeChat&#xff1a;Vir2021GKBS &#x1f43c;本文由…

SAP SD出库单部分开票后无法继续开票

SAP SD出库单部分开票后无法继续开票。 凭证 80051268 没有包含任何带有未清数量的项目 消息编号 VF171 诊断 凭证80051268不包含可以转换到开票类型中的项目。 系统响应 系统拒绝任何后续处理。 步骤 请检查选择的销售和分销凭证。 除了修改VBUP的相关字段&#xff0c;还有…

服务器未启动而端口进程仍在运行如何查看并杀死

首先登录服务器然后查看当前监听的端口&#xff1a; sudo netstat -tuln比如这里的8080&#xff0c;我们此时并未启动服务器&#xff0c;但是它却正在运行&#xff0c;这会导致服务器刚启动就秒挂。如果没有日志的话会让人有点疑惑&#xff0c;这种情况可能是之前运行了该进程…