【RabbitMQ】死信(延迟队列)的使用

目录

一、介绍

1、什么是死信队列(延迟队列)

2、应用场景

3、死信队列(延迟队列)的使用

4、死信消息来源

二、案例实践

1、案例一

2、案例二(消息接收确认 )

3、总结


一、介绍

1、什么是死信队列(延迟队列)

  •         死信,在官网中对应的单词为“Dead Letter”,它是 RabbitMQ 的一种消息机制。
  •         死信队列(Dead Letter Queue)延迟队列(Delay Queue)是两种不同的队列类型,但在实际应用中它们可以结合使用。
  •         死信队列是当消息在队列中因为过期、被拒绝等原因无法正常处理时,会被重新发送到另一个交换机上,这个交换机就是死信交换机。死信队列可以用于实现重试机制、日志审计等特殊应用逻辑。

        延迟队列则是一种特殊的队列类型,它允许将消息延迟指定的时间后才能被消费者消费。这种队列通常用于处理那些需要在特定时间点被处理的任务,例如定时任务、限时优惠等。在RabbitMQ中,可以通过设置消息的TTL(生存时间)来实现延迟队列的功能。当消息在队列中超过了TTL,它就会被移除并被发送到指定的死信交换机,进而被路由到死信队列中。

结合使用死信队列和延迟队列可以实现一些复杂的应用逻辑。

例如:

        可以将某个需要延迟处理的消息发送到延迟队列中,并在消息过期之前将其存储在死信队列中。这样,当消息从延迟队列中移除时,它会被自动发送到死信队列中,然后由消费者消费并执行相应的操作。这种结合使用的方式可以提供更高的灵活性和可靠性,使得系统能够更好地应对各种异常情况。

2、应用场景

死信队列(Dead Letter Queue)和延迟队列(Delay Queue)在以下应用场景中表现优异:

        这些场景都是通过结合使用死信队列和延迟队列来提高系统的可靠性、鲁棒性和灵活性。通过合理地设置死信队列和延迟队列的参数,可以实现各种复杂的业务逻辑和异常处理机制。

  1. 库存解锁服务:例如,当一个商品被锁定且无法被购买时,可以将其放入死信队列中,并在一定时间后重新发送到原始队列进行处理,从而解锁库存。
  2. 定时关单功能:例如,用户在商城下单成功并点击去支付后,如果在指定时间内未支付,系统可以自动将订单放入死信队列中,并在一定时间后重新发送到原始队列进行处理。
  3. 保证订单业务的消息数据不丢失:当消息消费发生异常时,可以将消息投入死信队列中,以便后续等到环境好了之后,再消费死信队列中的消息。
  4. 实现重试机制:当某个消息处理失败时,可以将它放入死信队列中,并在一段时间后重新发送到交换机进行重试。这样可以提高系统的鲁棒性,确保消息能够被正确处理。
  5. 处理定时任务和秒杀活动:使用延迟队列可以将消息延迟到特定的时间后进行处理,例如定时任务、秒杀活动等。这样可以确保在特定的时间点执行相应的操作。
  6. 日志处理和审计:将日志消息发送到死信队列中,可以在日志发生异常时进行记录和审计,以便分析和排查问题。

3、死信队列(延迟队列)的使用

使用死信队列(Dead Letter Queue)和延迟队列(Delay Queue)可以提高系统的可靠性和灵活性,特别是在处理异常情况、重试机制和定时任务等方面。

需要注意的是,在使用死信队列和延迟队列时,需要考虑系统的可用性和性能。如果大量的消息被放入死信队列中,可能会导致系统资源的过度消耗。因此,需要合理配置死信队列和延迟队列的大小和数量,以及监控和管理系统的性能和资源使用情况。

  1. 定义死信队列和延迟队列:在RabbitMQ中,可以在定义队列时指定队列类型为死信队列或延迟队列。例如,使用RabbitMQ的命令行工具或管理界面创建死信队列或延迟队列。
  2. 配置交换机和队列属性:在绑定交换机和队列时,可以设置一些属性来控制消息的路由和消费。例如,可以设置消息的TTL(生存时间)来实现延迟队列的功能,或者设置消息的优先级和延迟时间等属性。
  3. 发送消息到队列:使用RabbitMQ的生产者将消息发送到定义的队列中。如果需要将消息放入延迟队列中,可以在发送消息时设置相应的延迟时间。
  4. 处理消息:消费者从队列中获取消息并进行处理。如果消息无法正常处理,可以将它放入死信队列中。在处理消息时,可以根据需要设置重试机制,以便在消息处理失败时重新发送消息到队列中进行重试。
  5. 监控和管理:使用RabbitMQ的管理界面或命令行工具监控和管理死信队列和延迟队列的状态和消息。可以查看队列的大小、消息的延迟时间、消费者数量等信息,并根据需要进行调整和管理。

4、死信消息来源

  •  消息 TTL 过期
  • 队列满了,无法再次添加数据
  • 消息被拒绝(reject 或 nack),并且 requeue =false

二、案例实践

1、案例一

生产者

在生产者的Config里面添加队列 。

 //    =============死信队列===============
    // 定义QueueA Bean,返回QueueA实例
    @Bean
    public Queue QueueA() {
        Map<String, Object> config = new HashMap<>();
        //message在该队列queue的存活时间最大为5秒
        config.put("x-message-ttl", 5000);
        //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)
        config.put("x-dead-letter-exchange", "deadExchange");
        //x-dead-letter-routing-key参数是给这个DLX指定路由键
        config.put("x-dead-letter-routing-key", "deadQueue");
        // 返回QueueA实例
        return new Queue("QueueA", true, true, false, config);
    }

    // 定义DirectExchangeA Bean,返回DirectExchangeA实例
    @Bean
    public DirectExchange DirectExchangeA() {
        // 返回DirectExchangeA实例
        return new DirectExchange("DirectExchangeA");
    }

    // 定义bindingA Bean,将QueueA绑定到DirectExchangeA,并设置路由键为A.A
    @Bean
    public Binding bindingA() {
        // 将QueueA绑定到DirectExchangeA
        return BindingBuilder
                // 设置路由键为A.A
                .bind(QueueA())
                .to(DirectExchangeA())
                .with("A.A");
    }

    // 定义QueueB Bean,返回QueueB实例
    @Bean
    public Queue QueueB() {
        // 返回QueueB实例
        return new Queue("QueueB");
    }

    // 定义DirectExchangeB Bean,返回DirectExchangeB实例
    @Bean
    public DirectExchange DirectExchangeB() {
        // 返回DirectExchangeB实例
        return new DirectExchange("DirectExchangeB");
    }

    // 定义bindingB Bean,将QueueB绑定到DirectExchangeB,并设置路由键为B.B
    @Bean
    public Binding bindingB() {
        // 将QueueB绑定到DirectExchangeB
        return BindingBuilder
                // 设置路由键为B.B
                .bind(QueueB())
                .to(DirectExchangeB())
                .with("B.B");
    }

消费者

在消费者里面添加QueueAQueueB 

QueueA

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "QueueA")
public class ReceiverQA {
 
        // 接收directExchange01交换机中Queue02队列消息的方法
        @RabbitHandler
        public void Queue02(String msg) {
            log.warn("QueueA,接收到信息:" + msg);
        }
 
}

QueueB

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "QueueB")
public class ReceiverQB {

    // 接收directExchange01交换机中Queue02队列消息的方法
    @RabbitHandler
    public void Queue02(String msg) {
        log.warn("QueueB,接收到信息:" + msg);
    }

}

编写Controller层

    @RequestMapping("send5")
    public String send5() {
        rabbitTemplate.convertAndSend("DirectExchangeA", "A.A", "Hello,A");
        return "🐉";
    }

结果:

2、案例二(消息接收确认 )

  • 如果某个服务忘记 ACK 了,则 RabbitMQ 不会再发送数据给它,因为 RabbitMQ 认为该服务的处理能力有限

  • ACK 机制还可以起到限流作用,比如在接收到某条消息时休眠几秒钟

  • 消息确认模式有:

    • AcknowledgeMode.NONE:自动确认

    • AcknowledgeMode.AUTO:根据情况确认

    • AcknowledgeMode.MANUAL:手动确认

配置文件
        默认情况下消息消费者是自动 ack (确认)消息的,如果要手动 ack(确认)则需要修改确认模式为 manual

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual

在消费者里的更改ReceiverQA 

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "QueueA")
public class ReceiverQA {

    // 接收directExchange01交换机中Queue02队列消息的方法
    @RabbitHandler
    public void QueueA(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        log.warn("QueueA,接收到信息:" + msg);
        channel.basicAck(tag, false);
    }

}
  • 需要注意的 basicAck 方法需要传递两个参数

    • deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel

    • multiple:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息

3、总结

  • 持久化

    • exchange要持久化

    • queue要持久化

    • message要持久化

  • 消息确认

    • 启动消费返回(@ReturnList注解,生产者就可以知道哪些消息没有发出去)

    • 生产者和Server(broker)之间的消息确认

    • 消费者和Server(broker)之间的消息确认

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/349197.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【c语言】扫雷

前言&#xff1a; 扫雷是一款经典的单人益智游戏&#xff0c;它的目标是在一个方格矩阵中找出所有的地雷&#xff0c;而不触碰到任何一颗地雷。在计算机编程领域&#xff0c;扫雷也是一个非常受欢迎的项目&#xff0c;因为它涉及到许多重要的编程概念&#xff0c;如数组、循环…

基于卡尔曼滤波的平面轨迹优化

文章目录 概要卡尔曼滤波代码主函数代码CMakeLists.txt概要 在进行目标跟踪时,算法实时测量得到的目标平面位置,是具有误差的,连续观测,所形成的轨迹如下图所示,需要对其进行噪声滤除。这篇博客将使用卡尔曼滤波,对轨迹进行优化。 优化的结果为黄色线。 卡尔曼滤波代码…

带【科技感】的Echarts 图表

Echarts脚本在线地址 https://cdn.jsdelivr.net/npm/echarts5.4.3/dist/echarts.min.js 引入Echarts 脚本后粘贴代码 vue2 代码&#xff1a; <template><div><div ref"col-2-row-2" class"col-2-row-2"></div></div> <…

PHP - Yii2 异步队列

1. 前言使用场景 在 PHP Yii2 中&#xff0c;队列是一种特殊的数据结构&#xff0c;用于处理和管理后台任务。队列允许我们将耗时的任务&#xff08;如发送电子邮件、push通知等&#xff09;放入队列中&#xff0c;然后在后台异步执行。这样可以避免在处理大量请求时阻塞主应用…

sklearn 学习-混淆矩阵 Confusion matrix

混淆矩阵Confusion matrix&#xff1a;也称为误差矩阵&#xff0c;通过计算得出矩阵的结果用来表示分类器的精度。其每一列代表预测值&#xff0c;每一行代表的是实际的类别。 from sklearn.metrics import confusion_matrixy_true [2, 0, 2, 2, 0, 1] y_pred [0, 0, 2, 2, 0…

ICMP协议详解

ICMP&#xff08;Internet Control Message Protocol&#xff09;协议是一个网络层协议。 一个新搭建好的网络&#xff0c;往往需要先进行一个简单的测试&#xff0c;来验证网络是否畅通&#xff1b;但是IP协议并不提供可靠传输。如果丢包了&#xff0c;IP协议并不能通知传输层…

【Kafka】开发实战和Springboot集成kafka

目录 消息的发送与接收生产者消费者 SpringBoot 集成kafka服务端参数配置 消息的发送与接收 生产者 生产者主要的对象有&#xff1a; KafkaProducer &#xff0c; ProducerRecord 。 其中 KafkaProducer 是用于发送消息的类&#xff0c; ProducerRecord 类用于封装Kafka的消息…

2023中国高速公路信息化发展盘点

文章目录 前言一、政策规范(一)《加快建设交通强国五年行动计划(2023—2027年)》(二)《关于推进公路数字化转型 加快智慧公路建设发展的意见》(三)《公路工程设施支持自动驾驶技术指南》(四)《贵州省智慧高速公路建设指南(试行)》(五)江苏省《智慧公路车路协同路…

监听元素宽高变化---new ResizeObserver

参考&#xff1a;ResizeObserver API详解-CSDN博客 有的时候需要监听某个元素的宽高变化&#xff0c;这个时候可以使用JS的 resizeObserver 钩子函数。 用于监视元素的大小变化。它可以观察一个或多个 DOM 元素&#xff0c;以便在元素的大小或形状发生变化时触发回调函数。R…

VsCode提高生产力的插件推荐-持续更新中

别名路径跳转 自定义配置// 文件名别名跳转 "alias-skip.mappings": { "~/": "/src", "views": "/src/views", "assets": "/src/assets", "network": "/src/network", "comm…

深圳工业元宇宙赋能新型工业化,推动工业制造业数字化转型发展

在当今数字化时代&#xff0c;工业制造业正面临着巨大的变革。随着技术的不断进步&#xff0c;工业元宇宙的概念逐渐成为推动工业制造业数字化转型的重要力量。深圳作为中国的高科技之都&#xff0c;在这方面走在了前列&#xff0c;积极探索工业元宇宙的应用&#xff0c;赋能新…

引领未来:云原生在产品、架构与商业模式中的创新与应用

文章目录 一、云原生产品创新二、云原生架构设计三、云原生商业模式变革《云原生落地 产品、架构与商业模式》适读人群编辑推荐内容简介目录 随着云计算技术的不断发展&#xff0c;云原生已经成为企业数字化转型的重要方向。接下来将从产品、架构和商业模式三个方面&#xff0c…

【洛谷】P1135奇怪的电梯(DFS)

这题利用 dfs 解决&#xff0c;编程实现比较简单。 具体来说&#xff0c;每层楼有两种可能&#xff0c;上楼或下楼&#xff0c;因此可以形成一个以 a 楼为根的二叉树&#xff0c;因此只需一个 for 循环遍历某个父节点的两个子节点&#xff0c;之后递归就行。 易错点&#xff…

浅出深入-机器学习

文章目录 一、K近邻算法1.1 先画一个散列图1.2 使用K最近算法建模拟合数据1.3 进行预测1.4 K最近邻算法处理多元分类问题1.5 K最近邻算法用于回归分析1.6 K最近邻算法项目实战-酒的分类1.6.1 对数据进行分析1.6.2 生成训练数据集和测试数据集1.6.3 使用K最近邻算法对数据进行建…

手把手教你用plotly绘制excel中常见的8种图表

目录&#xff1a; 0. 准备工作 1. 柱状图 2. 条形图 3. 折线图 4. 面积图 5. 饼图与圆环图 6. 散点图 7. 气泡图 8. 极坐标(雷达图) 0. 准备工作 我这边是在jupyterlab中演示的plotly图表&#xff0c;如果只安装plotly是无法正常显示图表的&#xff08;会显示为空白…

Mac怎么录屏?简单易懂,关键技巧分享!

随着时代的变迁&#xff0c;人们对mac电脑的使用需求也越来越多样化。其中&#xff0c;屏幕录制成为了很多用户的常用需求&#xff0c;比如录制教程、游戏视频、会议记录等。可是很多用户不知道mac怎么录屏。本文将为你详细介绍两种mac录屏的方法&#xff0c;让大家轻松学会如何…

Internet Download Manager 6.42.3 (IDM) 中文破解免激活绿色版

Internet Download Manager 6.42.3中文破解版&#xff0c;全球最佳下载利器。Internet Download Manager (简称IDM) 是一款Windows 平台功能强大的多线程下载工具&#xff0c;国外非常受欢迎。支持断点续传&#xff0c;支持嗅探视频音频&#xff0c;接管所有浏览器&#xff0c;…

【并发编程】AQS——详细解释公平锁,非公平锁,独占锁,什么是可重入以及condition

目录 1、公平 2.非公平 3.独占锁 4.可重入 5.condition 1、公平 第一步&#xff1a;获取状态的 state 的值。如果 state0 即代表锁没有被其它线程占用&#xff0c;执行第二步。如果 state!0 则代表锁正在被其它线程占用&#xff0c;执行第三步。 第二步&#xff1a;判断队列…

ICSpector:一款功能强大的微软开源工业PLC安全取证框架

关于ICSpector ICSpector是一款功能强大的开源工业PLC安全取证框架&#xff0c;该工具由微软的研究人员负责开发和维护&#xff0c;可以帮助广大研究人员轻松分析工业PLC元数据和项目文件。 ICSpector提供了方便的方式来扫描PLC并识别ICS环境中的可疑痕迹&#xff0c;可以用于…

Spring与Web环境集成

1. Spring与Web环境集成 1.1 ApplicationContext应用上下文获取方式 应用上下文对象是通过new ClasspathXmlApplicationContext(spring配置文件) 方式获取的&#xff0c;但是每次从容器中获得Bean时都要编写new ClasspathXmlApplicationContext(spring配置文件) &#xff0c;这…