Rabbitmq延迟消息

目录

  • 一、延迟消息
    • 1.基于死信实现延迟消息
      • 1.1 消息的TTL(Time To Live)
      • 1.2 死信交换机 Dead Letter Exchanges
      • 1.3 代码实现
    • 2.基于延迟插件实现延迟消息
      • 2.1 插件安装
      • 2.2 代码实现
    • 3.基于延迟插件封装消息

一、延迟消息

延迟消息有两种实现方案:
1,基于死信队列
2,集成延迟插件

1.基于死信实现延迟消息

使用RabbitMQ来实现延迟消息必须先了解RabbitMQ的两个概念:
消息的TTL(存活时间)和死信交换机Exchange,通过这两者的组合来实现延迟队列

1.1 消息的TTL(Time To Live)

消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。
如何设置TTL:
我们创建一个队列queue.temp,在Arguments 中添加x-message-ttl 为5000 (单位是毫秒),那所在压在这个队列的消息在5秒后会消失。

1.2 死信交换机 Dead Letter Exchanges

一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。
(1) 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。
(2)上面的消息的TTL到了,消息过期了。
(3)队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。
Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
在这里插入图片描述
我们现在可以测试一下延迟队列。
(1)创建死信队列
(2)创建交换机
(3)建立交换器与队列之间的绑定
(4)创建队列

1.3 代码实现

在service-mq 中添加配置类

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DeadLetterMqConfig {
    // 声明一些变量

       public static final String exchange_dead = "exchange.dead";
    public static final String routing_dead_1 = "routing.dead.1";
    public static final String routing_dead_2 = "routing.dead.2";
    public static final String queue_dead_1 = "queue.dead.1";
    public static final String queue_dead_2 = "queue.dead.2";

    // 定义交换机
    @Bean
    public DirectExchange exchange(){
        return new DirectExchange(exchange_dead,true,false,null);
    }

    @Bean
    public Queue queue1(){
        // 设置如果队列一 出现问题,则通过参数转到exchange_dead,routing_dead_2 上!
        HashMap<String, Object> map = new HashMap<>();
        // 参数绑定 此处的key 固定值,不能随意写
        map.put("x-dead-letter-exchange",exchange_dead);
        map.put("x-dead-letter-routing-key",routing_dead_2);
        // 设置延迟时间
        map.put("x-message-ttl ", 10 * 1000);
        // 队列名称,是否持久化,是否独享、排外的【true:只可以在本次连接中访问】,是否自动删除,队列的其他属性参数
        return new Queue(queue_dead_1,true,false,false,map);
    }

    @Bean
    public Binding binding(){
        // 将队列一 通过routing_dead_1 key 绑定到exchange_dead 交换机上
        return BindingBuilder.bind(queue1()).to(exchange()).with(routing_dead_1);
    }

    // 这个队列二就是一个普通队列
    @Bean
    public Queue queue2(){
        return new Queue(queue_dead_2,true,false,false,null);
    }

    // 设置队列二的绑定规则
    @Bean
    public Binding binding2(){
        // 将队列二通过routing_dead_2 key 绑定到exchange_dead交换机上!
        return BindingBuilder.bind(queue2()).to(exchange()).with(routing_dead_2);
    }
}

配置发送消息

@RestController
@RequestMapping("/mq")
@Slf4j
public class MqController {

   @Autowired
   private RabbitTemplate rabbitTemplate;

   @Autowired
   private RabbitService rabbitService;

 @GetMapping("sendDeadLettle")
   public Result sendDeadLettle() {
      SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     this.rabbitTemplate.convertAndSend(DeadLetterMqConfig.exchange_dead, DeadLetterMqConfig.routing_dead_1, "ok");
      System.out.println(sdf.format(new Date()) + " Delay sent.");
      return Result.ok();
   }
}

消息接收方

@Component
public class DeadLetterReceiver {


    @RabbitListener(queues = DeadLetterMqConfig.queue_dead_2)
    public void getMessage(String msg, Message message, Channel channel) throws IOException {
        //时间格式化
        SimpleDateFormat simpleDateFormat=new SimpleDateFormat("yyyy-MM-dd  HH:mm:ss");

        System.out.println("消息接收的时间:\t"+simpleDateFormat.format(new Date()));

        System.out.println("消息的内容"+msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);

    }
}

在这里插入图片描述

2.基于延迟插件实现延迟消息

2.1 插件安装

Rabbitmq实现了一个插件x-delay-message来实现延时队列

  1. 首先我们将刚下载下来的rabbitmq_delayed_message_exchange-3.9.0.ez文件上传到RabbitMQ所在服务器,下载地址:https://www.rabbitmq.com/community-plugins.html
  2. 切换到插件所在目录,执行 docker cp rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins 命令,将刚插件拷贝到容器内plugins目录下
  3. 执行 docker exec -it rabbitmq /bin/bash 命令进入到容器内部,并 cd plugins 进入plugins目录
  4. 执行 ls -l|grep delay 命令查看插件是否copy成功
  5. 在容器内plugins目录下,执行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 命令启用插件
  6. exit命令退出RabbitMQ容器内部,然后执行 docker restart rabbitmq 命令重启RabbitMQ容器

2.2 代码实现

配置队列

@Configuration
public class DelayedMqConfig {

    public static final String exchange_delay = "exchange.delay";
    public static final String routing_delay = "routing.delay";
    public static final String queue_delay_1 = "queue.delay.1";

     @Bean
    public Queue delayQeue1() {
        // 第一个参数是创建的queue的名字,第二个参数是是否支持持久化
        return new Queue(queue_delay_1, true);
    }

    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(exchange_delay, "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding delayBbinding1() {
        return BindingBuilder.bind(delayQeue1()).to(delayExchange()).with(routing_delay).noargs();
    }
}

发送消息

@GetMapping("sendelay")
public Result sendDelay() {
   SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
   this.rabbitTemplate.convertAndSend(DelayedMqConfig.exchange_delay, DelayedMqConfig.routing_delay, sdf.format(new Date()), new MessagePostProcessor() {
      @Override
      public Message postProcessMessage(Message message) throws AmqpException {
         message.getMessageProperties().setDelay(10 * 1000);
         System.out.println(sdf.format(new Date()) + " Delay sent.");
         return message;
      }
   });
   return Result.ok();
}

接收消息

@Component
public class DelayReceiver {

    @RabbitListener(queues = DelayedMqConfig.queue_delay_1)
    public void get(String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("Receive queue_delay_1: " + sdf.format(new Date()) + " Delay rece." + msg);
    }

}

3.基于延迟插件封装消息

/**
 * 封装发送延迟消息方法
 * @param exchange
 * @param routingKey
 * @param msg
 * @param delayTime
 * @return
 */
public Boolean sendDelayMsg(String exchange,String routingKey, Object msg, int delayTime){
    //  将发送的消息 赋值到 自定义的实体类
    GmallCorrelationData gmallCorrelationData = new GmallCorrelationData();
    //  声明一个correlationId的变量
    String correlationId = UUID.randomUUID().toString().replaceAll("-","");
    gmallCorrelationData.setId(correlationId);
    gmallCorrelationData.setExchange(exchange);
    gmallCorrelationData.setRoutingKey(routingKey);
    gmallCorrelationData.setMessage(msg);
    gmallCorrelationData.setDelayTime(delayTime);
    gmallCorrelationData.setDelay(true);

    //  将数据存到缓存
    this.redisTemplate.opsForValue().set(correlationId,JSON.toJSONString(gmallCorrelationData),10,TimeUnit.MINUTES);

    //  发送消息
    this.rabbitTemplate.convertAndSend(exchange,routingKey,msg,message -> {
        //  设置延迟时间
        message.getMessageProperties().setDelay(delayTime*1000);
        return message;
    },gmallCorrelationData);

    //  默认返回
    return true;
}

修改retrySendMsg方法 – 添加判断是否属于延迟消息

//  判断是否属于延迟消息
if (gmallCorrelationData.isDelay()){
    //  属于延迟消息
    this.rabbitTemplate.convertAndSend(gmallCorrelationData.getExchange(),gmallCorrelationData.getRoutingKey(),gmallCorrelationData.getMessage(),message -> {
        //  设置延迟时间
        message.getMessageProperties().setDelay(gmallCorrelationData.getDelayTime()*1000);
        return message;
    },gmallCorrelationData);
}else {
    //  调用发送消息方法 表示发送普通消息  发送消息的时候,不能调用 new RabbitService().sendMsg() 这个方法
    this.rabbitTemplate.convertAndSend(gmallCorrelationData.getExchange(),gmallCorrelationData.getRoutingKey(),gmallCorrelationData.getMessage(),gmallCorrelationData);
}

利用封装好的工具类 测试发送延迟消息

//  基于延迟插件的延迟消息
@GetMapping("sendDelay")
public Result sendDelay(){
    //  声明一个时间对象
    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    System.out.println("发送时间:"+simpleDateFormat.format(new Date()));
    this.rabbitService.sendDelayMsg(DelayedMqConfig.exchange_delay,DelayedMqConfig.routing_delay,"iuok",3);
    return Result.ok();
}

重试了4次,所以我们需要保证幂等性
在这里插入图片描述
结果会 回发送三次,也被消费三次!
如何保证消息幂等性?
1.使用数据方式
2.使用redis setnx 命令解决 — 推荐

@SneakyThrows
@RabbitListener(queues = DelayedMqConfig.queue_delay_1)
public void getMsg2(String msg,Message message,Channel channel){

    //  使用setnx 命令来解决 msgKey = delay:iuok
    String msgKey = "delay:"+msg;
    Boolean result = this.redisTemplate.opsForValue().setIfAbsent(msgKey, "0", 10, TimeUnit.MINUTES);
    //  result = true : 说明执行成功,redis 里面没有这个key ,第一次创建, 第一次消费。
    //  result = false : 说明执行失败,redis 里面有这个key
    //  不能: 那么就表示这个消息只能被消费一次!  那么第一次消费成功或失败,我们确定不了!  --- 只能被消费一次!
        //        if (result){
        //            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        //            System.out.println("接收时间:"+simpleDateFormat.format(new Date()));
        //            System.out.println("接收的消息:"+msg);
        //            //  手动确认消息
        //            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        //        } else {
        //          //    不能消费!
        //        }
    //  能: 保证消息被消费成功    第二次消费,可以进来,但是要判断上一个消费者,是否将消息消费了。如果消费了,则直接返回,如果没有消费成功,我消费。
    //  在设置key 的时候给了一个默认值 0 ,如果消费成功,则将key的值 改为1
    if (!result){
        //  获取缓存key对应的数据
        String status = (String) this.redisTemplate.opsForValue().get(msgKey);
        if ("1".equals(status)){
            //  手动确认
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            return;
        } else {
            //  说明第一个消费者没有消费成功,所以消费并确认
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            System.out.println("接收时间:"+simpleDateFormat.format(new Date()));
            System.out.println("接收的消息:"+msg);
            //  修改redis 中的数据
            this.redisTemplate.opsForValue().set(msgKey,"1");
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            return;
        }
    }
    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    System.out.println("接收时间:"+simpleDateFormat.format(new Date()));
    System.out.println("接收的消息:"+msg);

    //  修改redis 中的数据
    this.redisTemplate.opsForValue().set(msgKey,"1");
    //  手动确认消息
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/74005.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

12-数据结构-数组、矩阵、广义表

数组、矩阵、广义表 目录 数组、矩阵、广义表 一、数组 二.矩阵 三、广义表 一、数组 这一章节理解基本概念即可。数组要看清其实下标是多少&#xff0c;并且二维数组&#xff0c;存取数据&#xff0c;要先看清楚是按照行存还是按列存&#xff0c;按行则是正常一行一行的去读…

汇编知识点之磁盘文件存取技术

1.文件代号式磁盘存取 &#xff08;1&#xff09;两个重要的表 (2)简要说明&#xff1a; 文件代号式存取方式将有关文件的各种信息都包括在DOS中。 在处理指定文件时必须使用一个完整的路径名&#xff0c;一旦文件的路径名被送入操作系统&#xff0c;就被赋予一个简单的文件…

第二章:CSS基础进阶-part2:CSS过渡与动画

文章目录 CSS3 过渡动画一、transition属性二、transform属性-2D变换2.1 tanslate &#xff1a; 移动2.2 rotate-旋转2.3 scale-变形2.4 skew-斜切2.5 transform-origin: 变换中心点设置 三、CSS3关键帧动画四、CSS3-3D变换4.1 perspective 定义3D元素距视图距离4.2 transform-…

微服务系列(2)--注册中心

在博文&#xff1a;微服务系列(1)里我们提到过注册中心的概念&#xff0c;简单来说微服务注册中心是一个用于存储和管理微服务实例信息的组件&#xff0c;它提供了服务注册、服务发现、服务健康检查等功能&#xff0c;以确保微服务之间的稳定通信。在微服务架构中&#xff0c;各…

工程监测振弦采集仪采集到的数据如何进行分析和处理

工程监测振弦采集仪采集到的数据如何进行分析和处理 振弦采集仪是一个用于测量和记录物体振动的设备。它通过测量物体表面的振动来提取振动信号数据&#xff0c;然后将其转换为数字信号&#xff0c;以便进行分析和处理。在实际应用中&#xff0c;振弦采集仪是广泛应用于机械、建…

C#软件外包开发流程

C# 是一种由微软开发的多范式编程语言&#xff0c;常用于开发各种类型的应用程序&#xff0c;从桌面应用程序到移动应用程序和Web应用程序。下面和大家分享 C# 编程学习流程&#xff0c;希望对大家有所帮助。北京木奇移动技术有限公司&#xff0c;专业的软件外包开发公司&#…

深度学习笔记(kaggle课程《Intro to Deep Learning》)

一、什么是深度学习&#xff1f; 深度学习是一种机器学习方法&#xff0c;通过构建和训练深层神经网络来处理和理解数据。它模仿人脑神经系统的工作方式&#xff0c;通过多层次的神经网络结构来学习和提取数据的特征。深度学习在图像识别、语音识别、自然语言处理等领域取得了…

通过代码实现窗口界面布局的方法

在QWidget窗口中添加相关事件resizeEvent()函数并编写相关功能代码&#xff1a; void Widget::resizeEvent(QResizeEvent *event) {QSize szui->plainTextEdit->size();ui->plainTextEdit->move(5,5);ui->pabpic->move(5,sz.height()5);ui->plainTextEd…

上传excel文件

文件上传&#xff0c;其实就是用el-upload组件来实现上传&#xff0c;只是换了样式&#xff0c;和图片上传一样 <el-form-item label"选择文件"><el-input placeholder"请选择文件" v-model"form.file" disabled style"width: 45…

Android T 窗口层级其二 —— 层级结构树的构建(更新中)

如何通过dump中的内容找到对应的代码&#xff1f; 我们dump窗口层级发现会有很多信息&#xff0c;adb shell dumpsys activity containers 这里我们以其中的DefaultTaskDisplayArea为例 在源码的framework目录下查找该字符串&#xff0c;找到对应的代码就可以通过打印堆栈或者…

vue.draggable浅尝

介绍 Vue.Draggable是一款基于Sortable.js实现的vue拖拽插件。支持移动设备、拖拽和选择文本、智能滚动&#xff0c;可以在不同列表间拖拽、不依赖jQuery为基础、vue 2过渡动画兼容、支持撤销操作&#xff0c;总之是一款非常优秀的vue拖拽组件。本篇将介绍如何搭建环境及简单的…

Python爬虫之解决浏览器等待与代理隧道问题

作为专业爬虫程序员&#xff0c;我们往往需要应对一些限制性挑战&#xff0c;比如浏览器等待和使用代理隧道。在Python爬虫开发中&#xff0c;这些问题可能会导致我们的爬虫受阻。本文将为你分享解决这些问题的方案&#xff0c;帮助你顺利应对浏览器等待和代理隧道的挑战&#…

jeecg-boot批量导入问题注意事项

现象&#xff1a; 由于批量导入数据速度很快&#xff0c; 因为数据库中的create time字段的时间可能一样&#xff0c;并且jeecg框架自带的是根据生成时间排序&#xff0c; 因此在前端翻页查询的时候&#xff0c;数据每次排序可能会不一样&#xff0c; 会出现第一页已经出现过一…

分类预测 | MATLAB实现BO-BiGRU贝叶斯优化双向门控循环单元多输入分类预测

分类预测 | MATLAB实现BO-BiGRU贝叶斯优化双向门控循环单元多输入分类预测 目录 分类预测 | MATLAB实现BO-BiGRU贝叶斯优化双向门控循环单元多输入分类预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 1.Matlab实现BO-BiGRU贝叶斯优化双向门控循环单元多特征分…

dirsearch_暴力扫描网页结构

python3 dirsearch 暴力扫描网页结构&#xff08;包括网页中的目录和文件&#xff09; 下载地址&#xff1a;https://gitee.com/xiaozhu2022/dirsearch/repository/archive/master.zip 下载解压后&#xff0c;在dirsearch.py文件窗口&#xff0c;打开终端&#xff08;任务栏…

Linux命名管道进程通信

文章目录 前言一、什么是命名管道通信二、创建方式三、代码示例四、文件进程通信总结 前言 命名管道 是实现进程间通信的强大工具&#xff0c;它提供了一种简单而有效的方式&#xff0c;允许不同进程之间进行可靠的数据交换。不仅可以在同一主机上的不相关进程间进行通信&…

2023年国赛数学建模思路 - 复盘:光照强度计算的优化模型

文章目录 0 赛题思路1 问题要求2 假设约定3 符号约定4 建立模型5 模型求解6 实现代码 建模资料 0 赛题思路 &#xff08;赛题出来以后第一时间在CSDN分享&#xff09; https://blog.csdn.net/dc_sinor?typeblog 1 问题要求 现在已知一个教室长为15米&#xff0c;宽为12米&…

【JavaScript】match用法 | 正则匹配

match正则匹配 var e "www.apple.com:baidu.com" var match e.match(/com/g) console.log("match: "match);> "match: com,com"match返回值问题 match的返回值是一个数组 数组的第0个元素是与整个正则表达式匹配的结果 数组的第1个元素是…

【新品发布】ChatWork企业知识库系统源码

系统简介 基于前后端分离架构以及Vue3、uni-app、ThinkPHP6.x、PostgreSQL、pgvector技术栈开发&#xff0c;包含PC端、H5端。 ChatWork支持问答式和文档式知识库&#xff0c;能够导入txt、doc、docx、pdf、md等多种格式文档。 导入数据完成向量化训练后&#xff0c;用户提问…

查看单元测试用例覆盖率新姿势:IDEA 集成 JaCoCo

1、什么是 IDEA IDEA 全称 IntelliJ IDEA&#xff0c;是 Java 编程语言开发的集成环境。IntelliJ 在业界被公认为最好的 Java 开发工具&#xff0c;尤其在智能代码助手、代码自动提示、重构、JavaEE 支持、各类版本工具(git、SVN 等)、JUnit、CVS 整合、代码分析、 创新的 GUI…