RabbitMQ实现延迟消息发送——实战篇

在项目中,我们经常需要使用消息队列来实现延迟任务,本篇文章就向各位介绍使用RabbitMQ如何实现延迟消息发送,由于是实战篇,所以不会讲太多理论的知识,还不太理解的可以先看看MQ的延迟消息的一个实现原理再来看这篇文章跟着练手哦~

需求背景

我这儿的一个需求背景大概是干部添加完活动后,由管理员进行审批,审批通过后,该活动id连同设置的过期时间会被放入消息队列中,等到活动结束时间到的时候,自动将活动的状态设置为已完成,这里华丽一个活动图,各位参考一下。

ce9bd03466514d6294a1a1de81f7772d.png

需求了解完之后我们就可以开始的写代码啦~(手动微笑)

相关知识点拓展

这里还是简单提一下MQ实现延迟队列的一个方法,一种是用插件,还有一种是使用死信队列,当然本文我们使用的就是通过死信队列来实现的。

当我们的一个正常消息因为设置了过期时间或者被消费者拒绝消费的时候,这条消息就会被放入死信队列中,然后死信队列再进行消费。

然后啰嗦一下,说一下MQ的交换机类型,以及死信交换机一般选用哪种:

1. Direct Exchange(直连交换机)

  • 特点
    • 根据消息的 Routing Key 精确匹配队列的 Binding Key
    • 完全匹配时,消息才会被路由到对应的队列。
  • 适用场景
    • 点对点消息传递,消息需要精确路由到特定队列。
  • 示例
    • 消息的 Routing Key 为 order.created,队列的 Binding Key 也为 order.created,则消息会被路由到该队列。

2. Fanout Exchange(扇出交换机)

  • 特点
    • 将消息广播到所有绑定到该交换机的队列,忽略 Routing Key。
  • 适用场景
    • 广播消息,消息需要发送到多个队列。
  • 示例
    • 消息发送到 Fanout Exchange,所有绑定到该交换机的队列都会收到消息。

3. Topic Exchange(主题交换机)

  • 特点
    • 根据消息的 Routing Key 和队列的 Binding Key 进行模式匹配。
    • Binding Key 支持通配符:
      • *:匹配一个单词。
      • #:匹配零个或多个单词。
  • 适用场景
    • 消息需要根据模式路由到多个队列。
  • 示例
    • 消息的 Routing Key 为 order.created.us,队列的 Binding Key 为 order.created.*,则消息会被路由到该队列。

4. Headers Exchange(头交换机)

  • 特点
    • 根据消息的 Headers(键值对)匹配队列的 Binding Arguments。
    • 忽略 Routing Key。
  • 适用场景
    • 消息需要根据复杂的条件路由到队列。
  • 示例
    • 消息的 Headers 包含 type=order 和 region=us,队列的 Binding Arguments 要求 x-match=all 且 type=order,则消息会被路由到该队列。

5. Default Exchange(默认交换机)

  • 特点
    • RabbitMQ 默认创建的交换机,类型为 Direct Exchange。
    • 每个队列都会自动绑定到默认交换机,Binding Key 为队列名称。
  • 适用场景
    • 默认情况下,消息可以直接发送到队列。

死信交换机适合使用哪种类型?

死信交换机(DLX, Dead Letter Exchange)的类型选择取决于你的业务需求。以下是常见的选择:

1. Direct Exchange

  • 适用场景
    • 死信消息需要精确路由到特定的死信队列。
  • 示例
    • 将死信消息路由到 dlx-queue,用于统一处理所有死信消息。

2. Topic Exchange

  • 适用场景
    • 死信消息需要根据不同的 Routing Key 路由到不同的死信队列。
  • 示例
    • 将死信消息根据业务类型(如 order.deadpayment.dead)路由到不同的死信队列。

3. Fanout Exchange

  • 适用场景
    • 死信消息需要广播到多个死信队列。
  • 示例
    • 将死信消息同时发送到日志队列和报警队列。

推荐选择

  • 大多数情况下,死信交换机使用 Direct Exchange,因为死信消息通常需要精确路由到一个死信队列,用于统一处理。
  • 如果死信消息需要根据不同的条件路由到多个队列,可以使用 Topic Exchange

代码部分

首先,我们需要定义一个死信交换机和死信队列,用来接收来自普通队列的消息。

//    创建死信交换机,处理延迟消息通知
    @Bean("dead_letter_exchange")
    public DirectExchange delayExchange(){
        return new DirectExchange("dead_letter_exchange",true,false);
    }
//    创建死信队列
    public Queue deadLetterQueue(){
        Queue queue = new Queue("dead_letter_queue", true);
        rabbitAdmin.declareQueue(queue);
        log.info("死信队列声明成功:" + queue.getName());
        return queue;    }

然后,我们需要配置一个普通的消息队列和一个普通的交换机,这个消息队列需要设置对应的死信交换机和死信路由,同时我们这个普通队列需要接收一个过期时间,保证一到过期时间消息就会被发送到死信队列当中。

//    创建一个普通队列,接受一个过期时间,出列活动结束后,发送到死信队列
    public Queue normalQueue(Long expireTime){
        Map<String,Object> args = new HashMap<>();
        if (expireTime != null && expireTime > 0) {  // 确保 TTL 是正数
            args.put("x-message-ttl", expireTime);
        }
        // 设置死信交换机
        args.put("x-dead-letter-exchange",deadLetterExchange);
        // 设置死信路由键
        args.put("x-dead-letter-routing-key","dead_letter_routing_key");
        Queue queue = new Queue("normal_queue", true, false, false, args);
        log.info("普通队列声明成功:" + queue.getName());
        return queue;    }
//    创建一个普通交换机,处理活动结束自动设置活动状态为结束
    @Bean("activity_end_exchange")
    public DirectExchange activityEndExchange(){
        return new DirectExchange("activity_end_exchange");
    }

然后我们需要分别将死信交换机和死信队列,普通交换机和普通队列分别进行绑定。

//    将死信队列和死信交换机进行绑定
    public void bindDeadLetterRouting(){
        Queue queue=queueDeclareConfig.deadLetterQueue();
        Binding binding = BindingBuilder.bind(queue)
                .to(deadLetterExchange)
                .with("dead_letter_routing_key");
        rabbitAdmin.declareBinding(binding);
        log.info("死信队列绑定成功,死信队列名称----》" + queue.getName() + ",死信交换机名称----》" + deadLetterExchange.getName());
    }

//    绑定活动结束交换机和普通队列
    public void bindActivityEndRouting(Long expireTime) {
        Queue queue = queueDeclareConfig.normalQueue(expireTime);
        Binding binding = BindingBuilder.bind(queue)
                .to(activityEndExchange)
                .with("activity_end_routing_key");
        rabbitAdmin.declareBinding(binding);
    }

当然,我们还需要配置生产者来发送消息到交换机里面

//活动结束后,发送消息到死信队列,自动设置活动结束状态
    public void sendActivityEndMessage(Long expireTime, Integer activityId) {
        rabbitMQBindRoutingConfig.bindDeadLetterRouting();
        rabbitMQBindRoutingConfig.bindActivityEndRouting(expireTime);
        try {
            // 将消息发送到普通队列,等待消息过期发送到死信交换机
            rabbitTemplate.convertAndSend("activity_end_exchange", "activity_end_routing_key"
                    , activityId
                    , msg -> {
                        msg.getMessageProperties().setExpiration(expireTime.toString());
                        return msg;
                    }
            );
        } catch (Exception e) {
            log.error("发送消息失败------->" + activityId);
            throw new RuntimeException("发送消息失败---->" + activityId);
        }
    }

这里生产者的代码可以根据你的业务逻辑具体进行更改~

消费者逻辑也需要进行编写一下

//    使用MQ延迟队列,活动结束,修改活动状态
    @RabbitListener(queues = "dead_letter_queue")
    public void updatePlaceOccupyStatus(Message message, Channel channel){
        try {
            String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
            Integer activityId = Integer.parseInt(messageBody);
            ActivityInfo activityInfo = baseMapper.selectById(activityId);
            LambdaUpdateWrapper<ActivityInfo> wrapper = new LambdaUpdateWrapper<>();
            wrapper.eq(ActivityInfo::getActivityId,activityId)
                    .set(ActivityInfo::getProgress,StatusConstant.FINISH);
            if(baseMapper.update(activityInfo,wrapper)>0){
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        } catch (Exception e) {
            log.error("处理消息时发生错误:" + e.getMessage());
            try {
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            } catch (IOException ioException) {
                ioException.printStackTrace();
            }
        }

消费者这边需要注意的是如果你选择的提交类型不是自动提交的话,在处理完消息之后需要手动ack一下消息,不然消费的消息不会被认为已经消费,从而导致消息积压,也会在之后的消费中重复进行消费,因此你需要告诉生产者这条消息已经被消费了。

当然,如果在消费的过程中出现了什么问题,可以设置以下这行代码:

419e93610db14c019fcfb49ad0d23703.png

basicNack方法接收三个参数:
deliveryTag: 消息的标识符。
multiple: 是否对多个消息进行否定确认。
requeue: 是否将消息重新放入队列。 

可以根据你的需求进行设定~

然后的然后,我们需要再application.yml当中进行配置相关信息:

rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
#    确认消息发送到交换机上
    publisher-confirm-type: correlated
    #    消息发送到队列确认,失败回调
    publisher-returns: true
    listener:
      direct:
        acknowledge-mode: manual
        retry:
          enabled: true
#          重试的时间间隔为1s
          initial-interval: 1000ms
#          最大重试3次
          max-attempts: 3
#          最大的重试时间间隔为2s
          max-interval: 2000ms
#          每次重试时间间隔为1s,每次重试时间间隔倍数
          multiplier: 1.0
          #重试次数超过上面的设置之后是否丢弃(false不丢弃时需要写相应代码将该消息加入死信队列)
        default-requeue-rejected: false
      simple:
        default-requeue-rejected: false
        acknowledge-mode: manual
#        最小消费者数量
        concurrency: 1
#        最大消费者数量
        max-concurrency: 10
        retry:
          enabled: true
          initial-interval: 1000ms
          max-attempts: 3
          max-interval: 2000ms
          multiplier: 1.0

上面给出了一个比较全的配置,你可以根据你的需求进行选择,但是需要注意的是default-requeue-rejected: false这一行配置一定要先配置,不然你的消息在普通队列中过期了,是不会发送到死信队列当中进行消费的~

到这儿,基本上所有的代码都写的差不多了,当然我们还需要再rabbitmq控制平台上分别建一个普通交换机和一个死信交换机,一个普通队列和一个私信队列,然后分别绑定就可以了。

注意的是,普通交换机也需要在平台上配置一次死信队列和死信路由:

5f1c7325f13a4910bd2d28e8b62a7f60.png

1d9202ed97554cc4ab1601d6f839b0ef.png

到这儿,如果没有什么问题的话基本上已经可以直接运行了,所以我的这篇文章到这儿基本上也已经结束了,如果你有什么问题,可以评论区留言,我们相互学习~

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/955686.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【PCL】Segmentation 模块—— 欧几里得聚类提取(Euclidean Cluster Extraction)

1、简介 PCL 的 Euclidean Cluster Extraction&#xff08;欧几里得聚类提取&#xff09; 是一种基于欧几里得距离的点云聚类算法。它的目标是将点云数据分割成多个独立的簇&#xff08;clusters&#xff09;&#xff0c;每个簇代表一个独立的物体或结构。该算法通过计算点与点…

ElasticSearch上

安装ElasticSearch Lucene&#xff1a;Java语言的搜索引擎类库&#xff0c;易扩展&#xff1b;高性能&#xff08;基于倒排索引&#xff09;Elasticsearch基于Lucene&#xff0c;支持分布式&#xff0c;可水平扩展&#xff1b;提供Restful接口&#xff0c;可被任何语言调用Ela…

GitLab:添加SSH密钥之前,您不能通过SSH来拉取或推送项目代码

1、查看服务器是否配置过 [rootkingbal-ecs-7612 ~]# cd .ssh/ [rootkingbal-ecs-7612 .ssh]# ls authorized_keys id_ed25519 id_ed25519.pub id_rsa id_rsa.pub2、创建密钥 $ ssh-keygen -t rsa -C kingbalkingbal.com # -C 后写你的邮箱 一路回车 3、复制密钥 [rootk…

《目标检测数据集下载地址》

一、引言 在计算机视觉的广袤领域中&#xff0c;目标检测宛如一颗璀璨的明星&#xff0c;占据着举足轻重的地位。它宛如赋予计算机一双锐利的 “眼睛”&#xff0c;使其能够精准识别图像或视频中的各类目标&#xff0c;并确定其位置&#xff0c;以边界框的形式清晰呈现。这项技…

Kibana 控制台中提供语义、向量和混合搜索

作者&#xff1a;来自 Elastic Mark_Laney 想要将常规 Elasticsearch 查询与新的 AI 搜索功能结合起来吗&#xff1f;那么&#xff0c;你不需要连接到某个第三方的大型语言模型&#xff08;LLM&#xff09;吗&#xff1f;不。你可以使用 Elastic 的 ELSER 模型来改进现有搜索&a…

Golang Gin系列-3:Gin Framework的项目结构

在Gin教程的第3篇&#xff0c;我们将讨论如何设置你的项目。这不仅仅是把文件扔得到处都是&#xff0c;而是要对所有东西的位置做出明智的选择。相信我&#xff0c;这些东西很重要。如果你做得对&#xff0c;你的项目会更容易处理。当你以后不再为了找东西或添加新功能而绞尽脑…

程序设计:排版、检验报告的上下标解决几种办法

【啰嗦两句】 本文重点在于提供几个针对排版文档、各种检验报告系统等程序设计时&#xff0c;遇到的上下标录入、绘制展示等问题的应对办法&#xff0c;但是准确地说&#xff0c;并没有非常优秀的方案。 【上下标难题】 一般的行业或许对上下标并没有严格要求&#xff0c;多数…

TCP 重传演进:TCP RACK Timer 能替代 RTO 吗

本文的建议适用于想改变 TCP 行为的新协议设计&#xff0c;还是那句话&#xff0c;不要抄 TCP 做 yet another TCP。 RTO 一直是 TCP 传输过程所要尽量避免的&#xff0c;因为它会将状态带入 Loss 进而 Go-Back-N&#xff0c;这是一个昂贵的操作。But 在 Fast-Retransmit 被引…

PCL 新增自定义点类型【2025最新版】

目录 一、自定义点类型1、前言2、定义方法3、代码示例二、合并现有类型三、点云按时间渲染1、CloudCompare渲染2、PCL渲染博客长期更新,本文最近更新时间为:2025年1月18日。 一、自定义点类型 1、前言 PCL库自身定义了很多点云类型,但是在使用的时候时如果要使用自己定义的…

Python操作Excel——openpyxl使用笔记(5)

5 其他操作 5.1 合并单元格 有些Excel表格存在合并多个单元格的情况&#xff0c;此时可以使用工作表的merge_cells函数&#xff0c;例如合并第1~2行和1~2列&#xff1a; import openpyxl from openpyxl.comments import Comment wb openpyxl.load_workbook(./test.xlsx) w…

Linux简介和环境搭建

Linux 介绍和环境搭建 1、发行版本 Linux 操作系统有多个主流发行版本&#xff0c;每个版本根据不同的目标、特点和使用场景为用户提供了不同的功能和体验。 Ubuntu • 特点&#xff1a;Ubuntu 是最为人熟知的 Linux 发行版之一&#xff0c;强调易用性和用户友好性&#xff…

LabVIEW时域近场天线测试

随着通信技术的飞速发展&#xff0c;特别是在5G及未来通信技术中&#xff0c;天线性能的测试需求日益增加。对于短脉冲天线和宽带天线的时域特性测试&#xff0c;传统的频域测试方法已无法满足其需求。时域测试方法在这些应用中具有明显优势&#xff0c;可以提供更快速和精准的…

SQL Server查询计划操作符——查询计划相关操作符(4)

7.3. 查询计划相关操作符 28)Declare:该操作符在查询计划中分配一个本地变量。该操作符是一个语言元素。该操作符具体如图7.2-28所示。 图 7.2-28 查询计划操作符Declare示例 29)Delete:该操作符从一个对象中删除满足其参数列中可选谓词的数据行。该操作符具体如图7.2-29…

复用类(3):在组合与继承之间选择、protected关键字、向上转型

1 在组合与继承之间选择 组合和继承都允许在新的类中放置子对象&#xff0c;组合是显式地这样做&#xff0c;而继承则是隐式地做。你或许想知道二者之间的区别何在&#xff0c;以及怎样在二者之间做出选择。 组合技术通常用于想在新类中使用现有类的功能而非它的接口这种情形。…

Java-数据结构-二叉树习题(1)

对于二叉树的学习&#xff0c;主要的还是得多多练习~毕竟二叉树属于新的知识&#xff0c;并且也并不是线性结构&#xff0c;再加上经常使用递归的方法解决二叉树的问题&#xff0c;所以代码的具体流程还是无法看到的&#xff0c;只能通过画图想象&#xff0c;所以还是必须多加练…

彩色图像面积计算一般方法及MATLAB实现

一、引言 在数字图像处理中&#xff0c;经常需要获取感兴趣区域的面积属性&#xff0c;下面给出图像处理的一般步骤。 1.读入的彩色图像 2.将彩色图像转化为灰度图像 3.灰度图像转化为二值图像 4.区域标记 5.对每个区域的面积进行计算和显示 二、程序代码 %面积计算 cle…

计算机网络 (41)文件传送协议

前言 一、文件传送协议&#xff08;FTP&#xff09; 概述&#xff1a; FTP&#xff08;File Transfer Protocol&#xff09;是互联网上使用得最广泛的文件传送协议。FTP提供交互式的访问&#xff0c;允许客户指明文件的类型与格式&#xff08;如指明是否使用ASCII码&#xff0…

vscode的安装与使用

下载 地址&#xff1a;https://code.visualstudio.com/ 安装 修改安装路径&#xff08;不要有中文&#xff09; 点击下一步&#xff0c;创建桌面快捷方式&#xff0c;等待安装 安装中文插件 可以根据自己的需要安装python和Jupyter插件

用Cursor生成一个企业官网前端页面(生成腾讯、阿里官网静态页面)

用Cursor生成一个企业官网前端页面 第一版&#xff1a; <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><…

简要认识Web技术三剑客:HTMLCSSJavaScript

目录 一、web标准二、什么是HTML三、什么是CSS四、什么是JavaScript 黑马JAVAWeb飞书在线讲义地址&#xff1a; https://heuqqdmbyk.feishu.cn/wiki/LYVswfK4eigRIhkW0pvcqgH9nWd 一、web标准 Web标准也称网页标准&#xff0c;由一系列的标准组成&#xff0c;大部分由W3C&…