RabbitMQ 延时消息实现

1. 实现方式

1. 设置队列过期时间:延迟队列消息过期 + 死信队列,所有消息过期时间一致
2. 设置消息的过期时间:此种方式下有缺陷,MQ只会判断队列第一条消息是否过期,会导致消息的阻塞
   需要额外安装 `rabbitmq_delayed_message_exchange` 插件才能解决此问题
  • 导入Spring 集成RabbitMQ MAEVN
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.2.5.RELEASE</version>
</dependency>

2. 设置队列过期时间:延迟队列消息过期 + 死信队列

推送消息至延迟队列 -> 消息过期自动推送到死信队列 -> 消费死信队列

2.1. MQ配置信息

2.1.1. 自定义队列配置

…/bootstrap.yml

# rabbitmq自定义配置
rabbitmq:
  ttlExchange: medical_dev_ttl_topic_change
  ttlKey: dev_ttl
  ttlQueue: medical.dev.ttl.topic.queue
  delayExpireTime: 600
  ttlQueueSize: 10000
  deadExchange: medical_dev_dead_topic_change
  deadKey: dev_dead
  deadQueue: medical.dev.dead.topic.queue
2.1.2. 读取自定义MQ配置信息
/**
 * amqp配置文件
 */
@Data
@Component
@ConfigurationProperties(prefix = "rabbitmq")
public class MyConfigProperties {

    /**
     * 延迟队列
     */
    public String ttlExchange;
    public String ttlKey;
    public String ttlQueue;
    private Integer delayExpireTime;
    public Integer ttlQueueSize;

    /**
     * 死信队列
     */
    public String deadExchange;
    public String deadKey;
    public String deadQueue;

}

2.2. 配置文件自动生成队列

2.2.1. 延迟队列
import com.awsa.site.mq.MyConfigProperties;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;
import java.util.HashMap;

/**
 * 延迟队列配置文件
 * 
 * @author mingAn.xie
 */
@Configuration
public class RabbitMQConfigTTL {

    @Resource
    MyConfigProperties myConfigProperties;

    // 1: 声明交换机
    @Bean
    public TopicExchange ttlTopicExchange(){
        return new TopicExchange(myConfigProperties.getTtlExchange());
    }

    // 2: 声明队列
    // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
    // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
    // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
    @Bean
    public Queue ttlTopicduanxinQueue(){
        HashMap<String, Object> args = new HashMap<>();
        // 给队列设置消息过期时间:毫秒值
        args.put("x-message-ttl", mqConfigProperties.getDelayExpireTime() * 1000);
        // 设置队列最大长度
        args.put("x-max-length", myConfigProperties.getTtlQueueSize());
        // 设置死信队列交换机名称
        // 当消息在一个队列中变成死信后,它能就发送到另一个交换机中,这个交换机就是DLX,绑定DLX的队列被称之为死信队列
        // 编程死信队列的原因:消息被拒绝,消息过期,队列达到最大长度
        args.put("x-dead-letter-exchange", myConfigProperties.getDeadExchange());
        // 设置死信队列路由key
        args.put("x-dead-letter-routing-key", myConfigProperties.getDeadKey());
        return new Queue(myConfigProperties.getTtlQueue(), true, false, false, args);
    }

    // 3: 绑定对用关系
    @Bean
    public Binding ttlTopicsmsBinding(){
        return BindingBuilder.bind(ttlTopicduanxinQueue()).to(ttlTopicExchange()).with(myConfigProperties.getTtlKey());
    }

}
2.2.2. 死信队列

import com.awsa.site.mq.MyConfigProperties;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

/**
 * 死信队列配置文件
 * 
 * @author mingAn.xie
 */
@Configuration
public class RabbitMQConfigDead {

    @Resource
    MyConfigProperties myConfigProperties;

    // 1: 声明交换机
    @Bean
    public TopicExchange deadTopicExchange(){
        return new TopicExchange(myConfigProperties.getDeadExchange());
    }

    // 2: 声明队列
    // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
    // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
    // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
    @Bean
    public Queue deadTopicduanxinQueue(){
        return new Queue(myConfigProperties.getDeadQueue(), true);
    }

    // 3: 绑定对用关系
    @Bean
    public Binding deadTopicsmsBinding(){
        return BindingBuilder.bind(deadTopicduanxinQueue()).to(deadTopicExchange()).with(myConfigProperties.getDeadKey());
    }

}

2.3. 生产者推送消息

import com.awsa.site.mq.MyConfigProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * RabbitMQ生产者推送消息类
 * 
 * @author xiemingan
 */
@Component
@Slf4j
public class RabbitmqProducer {
    @Resource
    private RabbitTemplate rabbitTemplate;

    @Resource
    private MyConfigProperties myConfigProperties;

    /**
     * @param pushMessage 推送消息体
     */
    public void pushTtlMessage(String pushMessage) {
		// 推送消息至交换机,并指定路由key
        rabbitTemplate.convertAndSend(myConfigProperties.getTtlExchange(), myConfigProperties.getTtlKey(), pushMessage);
        log.info("MQ消息推送队列, exchange: {}, key: {}, message: {}", myConfigProperties.getTtlExchange(), myConfigProperties.getTtlKey(), pushMessage);
    }

}

2.4. 消费者处理消息

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

/**
 * @author mingAn.xie
 */
@Log4j2
@Component
public class RabbitmqConsumer {

    /**
     * 消费死信队列
     * @param message 消息体
     */
    @RabbitListener(queues = "${rabbitmq.deadQueue}")
    public void pushMessages(Message message) {

        String body = new String(message.getBody()).trim();
        if (StringUtils.isEmpty(body)){
            return;
        }
        log.info("MQ消息消费, RabbitmqConsumer.pushMessages() : {}", body);
    }

}

3. 设置消息的过期时间

设置交换机类型为 x-delayed-type,推送消息至交换机,直连队列消费

3.1. 安装插件 rabbitmq_delayed_message_exchange

前言:这里默认使用环境为 Liunx 系统 Docker 安装 RabbitMQ

具体可以参考这篇文章:Docker 安装 RabbitMQ 挂载配置文件

安装插件版本需要与RabbitMQ版本一致,否则可能会导致安装失败,可先进入RabbitMQ容器中查看其他插件版本

插件各版本地址: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

  • 这里以最新版本 v3.13.0 举例
# 下载插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez

# 将插件复制进容器中: rabbitmq_xxxxxx
docker cp rabbitmq_delayed_message_exchange-3.13.0.ez rabbitmq_xxxxxx:/plugins

# 进入容器: rabbitmq_xxxxxx
docker exec -it rabbitmq_xxxxxx bash
cd plugins

# 查询插件列表, 此处可看到插件的版本
rabbitmq-plugins list

# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • 交换机类型中出现 x-delayed-type 表示安装成功

3.2. MQ配置信息

3.2.1. 自定义队列配置

…/bootstrap.yml

#mq队列自定义配置
rabbitmq:
  saveTaskTtlExchange: ey240001_pro_save_task_ttl_topic_exchange
  saveTaskTtlKey: ey240001_pro_save_task_ttl
  saveTaskTtlQueue: ey240001.pro.save.task.ttl.topic.queue
  saveTaskTtlQueueSize: 10000
3.2.2. 读取自定义MQ配置信息
/**
 * amqp配置文件
 *
 * @author mingAn.xie
 */
@Data
@Component
@ConfigurationProperties(prefix = "rabbitmq")
public class MyConfigProperties {

    /**
     * 任务待办生成延时队列
     */
    public String saveTaskTtlExchange;
    public String saveTaskTtlKey;
    public String saveTaskTtlQueue;
    public Integer saveTaskTtlQueueSize;

}

3.3. 配置文件生成 x-delayed-type 交换机

import com.awsa.site.mq.MyConfigProperties;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

/**
 * x-delayed-type 交换机延迟队列配置
 * 
 * @author mingAn.xie
 */
@Configuration
public class RabbitMQConfigSaveTaskTtl {

    @Resource
    MyConfigProperties myConfigProperties;

    // 1: 声明交换机
    @Bean
    public CustomExchange saveTaskTopicExchange() {
        Map<String, Object> args = new HashMap<>();
        // 设置延迟队列插件类型:按过期时间消费
        args.put("x-delayed-type", "direct");
        // 参数:name 交换机名称,type 交换机类型,durable 是否持久化,autoDelete 是否自动删除,arguments 参数
        return new CustomExchange(myConfigProperties.getSaveTaskTtlExchange(), "x-delayed-message", true, false, args);
    }

    // 2: 声明队列
    // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
    // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
    // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
    @Bean
    public Queue saveTaskTopicduanxinQueue() {
        return new Queue(myConfigProperties.getSaveTaskTtlQueue(), true, false, false);
    }

    // 3: 绑定对用关系
    @Bean
    public Binding saveTaskTopicsmsBinding() {
        return BindingBuilder.bind(saveTaskTopicduanxinQueue()).to(saveTaskTopicExchange()).with(myConfigProperties.getSaveTaskTtlKey()).noargs();
    }

}

3.4. 生产者推送消息

import com.awsa.site.mq.MyConfigProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * 生产者推送消息类
 * 
 * @author xiemingan
 */
@Component
@Slf4j
public class RabbitmqProducer {
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Resource
    private MyConfigProperties myConfigProperties;

    /**
     * @param pushMessage 推送消息体
     * @param ttlTime     延时时间(毫秒值)
     */
    public void pushTtlMessage(String pushMessage, long ttlTime) {
        ttlTime = ttlTime <= 0 ? 1000 : ttlTime;
        // 3.1.推送MQ延迟消息队列
        long finalTtlTime = ttlTime;
        MessagePostProcessor messagePostProcessor = message -> {
            // 设置延迟时间
            message.getMessageProperties().setDelay((int) finalTtlTime);
            return message;
        };
        rabbitTemplate.convertAndSend(myConfigProperties.getSaveTaskTtlExchange(), myConfigProperties.getSaveTaskTtlKey(), pushMessage, messagePostProcessor);
        log.info("MQ消息推送队列, exchange: {}, key: {}, message: {}, ttlTime: {}", myConfigProperties.getSaveTaskTtlExchange(), myConfigProperties.getSaveTaskTtlKey(), pushMessage, ttlTime);
    }

}

3.5. 消费者处理消息

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

/**
 * @author mingAn.xie
 */
@Log4j2
@Component
public class RabbitmqConsumer {

    /**
     * 消费延时消息
     * @param message 消息体
     */
    @RabbitListener(queues = "${rabbitmq.saveTaskTtlQueue}")
    public void pushMessages(Message message) {

        String body = new String(message.getBody()).trim();
        if (StringUtils.isEmpty(body)) {
            return;
        }
        log.info("MQ延迟消息消费, RabbitmqConsumer.pushMessages() : {}", body);

    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/500587.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

下载及安装PHP,composer,phpstudy,thinkPHP6.0框架

文章目录 前言 thinkPHP是一款开源的PHP框架&#xff0c;它是基于MVC&#xff08;Model-View-Controller&#xff09;设计模式构建的。thinkPHP提供了丰富的功能和组件&#xff0c;使得开发人员可以快速、高效地构建和维护Web应用程序。 以下是thinkPHP框架的一些特点和功能&…

C语言最大公约数(辗转相除法)

输入两个整数&#xff0c;求他们的最大公约数&#xff1a; 如果我们不用辗转相除法的话&#xff0c;两个整数的最大公约数&#xff0c;我们就可以定义一个整数为两个整数中最小的那个数&#xff0c;然后两个整数一起除我们新定义的整数&#xff0c;如果都除尽了&#xff0c;这…

Vidmore Video Fix for Mac 视频修复工具

Vidmore Video Fix for Mac是一款功能强大且易于使用的视频修复工具&#xff0c;专为Mac用户设计。它凭借先进的视频修复技术&#xff0c;能够帮助用户解决各种视频问题&#xff0c;如视频文件损坏、无法播放、格式不支持等。 软件下载&#xff1a;Vidmore Video Fix for Mac v…

php将网页用wkhtmltoimage内容生成为图片

php架构ThinkPHP6 1. 安装 knp-snappy架构 composer require knplabs/knp-snappy use Knp\Snappy\Image; use Illuminate\Support\Facades\Storage;// 生成图片 /user/local/bin/wkhtmltoimage为你的wkhtmltoimage的位置。 $snappy new Image(/usr/local/bin/wkhtmltoimage…

【NoSQL】MongoDB

文章目录 概述NoSQL数据库四大家族mongodb和mysql存储数据形式有什么不同 概念适用场景环境搭建1、下载2、安装 基础入门高级查询聚合和管道索引备份和恢复来源 概述 MongoDB是一个基于分布式文件存储的数据库。由C语言编写。旨在为WEB应用提供可扩展的高性能数据存储解决方案…

vscode调试Unity

文章目录 vscode调试UnityC#环境需求开始调试 Lua添加Debugger环境配置联系.txt文件配置Java环境 添加调试代码断点不生效的问题 vscode调试Unity C# 现在使用vscode调试Unity的C#代码很简单&#xff0c;直接在vscode的EXTENSIONS里面搜索“Unity”&#xff0c;第一个就是&am…

如何在 Mac 上打开、编辑、复制、移动或删除存储在 Windows NTFS 格式 USB 驱动器上的文件 Tuxera NTFS for Mac使用教程

当您获得一台新 Mac 时&#xff0c;它只能读取 Windows NTFS 格式的 USB 驱动器。要将文件添加、保存或写入您的 Mac&#xff0c;您需要一个附加的 NTFS 驱动程序。Tuxera 他可以帮忙实现这一功能&#xff01; Tuxera可以轻松转换驱动器&#xff1a;无论使用Windows PC还是Mac&…

什么是PMP,对工作的作用大不大?

PMP最早是由PMI发起&#xff0c;已成为全球公认的项目管理专业资格认证之一&#xff0c;PMP认证在全球190多个国家和地区获得最广泛的专业认可。 随着PMP的认可度不断提高&#xff0c;持有PMP证书的项目经理在求职过程中更有优势&#xff0c;不少公司对项目经理是否持证上岗也制…

2.java openCV4.x 入门-hello OpenCV

专栏简介 &#x1f492;个人主页 &#x1f4f0;专栏目录 点击上方查看更多内容 &#x1f4d6;心灵鸡汤&#x1f4d6;我们唯一拥有的就是今天&#xff0c;唯一能把握的也是今天 &#x1f9ed;文章导航&#x1f9ed; ⬆️ 1.环境搭建 ⬇️ 3.Mat之构造函数与数据类型 hell…

三步提升IEDA下载速度——修改IDEA中镜像地址

找到IDEA的本地安装地址 D:\tool\IntelliJ IDEA 2022.2.4\plugins\maven\lib\maven3\conf 搜索阿里云maven仓库 复制https://developer.aliyun.com/mvn/guide中红框部分代码 这里也是一样的&#xff1a; <mirror><id>aliyunmaven</id><mirrorOf>*&…

期货开户要找到适合自己的系统

物有一个生物圈&#xff0c;大鱼吃小鱼&#xff0c;小鱼吃虾。在期货市场这条生物圈里面&#xff0c;大部分人就是期货市场的虾子&#xff0c;是被吃的&#xff0c;所以必须成长起来&#xff0c;往更高一层走&#xff0c;到可以吃虾子的时候&#xff0c;就是挣钱的时候。学习不…

TCP/IP 网络模型有哪几层?(计算机网络)

应用层 为用户提供应用功能 传输层 负责为应用层提供网络支持 使用TCP和UDP 当传输层的数据包大小超过 MSS&#xff08;TCP 最大报文段长度&#xff09; &#xff0c;就要将数据包分块&#xff0c;这样即使中途有一个分块丢失或损坏了&#xff0c;只需要重新发送这一个分块…

【解決|三方工具】Obi Rope 编辑器运行即崩溃问题

开发平台&#xff1a;Unity 2021.3.7 三方工具&#xff1a;Unity资产工具 - Obi Rope   问题背景 使用Unity三方开发工具 - Obi Rope 模拟绳索效果。配置后运行 Unity 出现报错并崩溃。通过崩溃日志反馈得到如下图所示 这是一个序列化问题造成的崩溃&#xff0c;指向性为 Obi…

深度学习pytorch——卷积神经网络(持续更新)

计算机如何解析图片&#xff1f; 在计算机的眼中&#xff0c;一张灰度图片&#xff0c;就是许多个数字组成的二维矩阵&#xff0c;每个数字就是此点的像素值&#xff08;图-1&#xff09;。在存储时&#xff0c;像素值通常位于[0, 255]区间&#xff0c;在深度学习中&#xff0…

蓝桥OJ3514 子串简写 (暴力+二分)

子串简写 一.暴力 思路: 只能通过60%。 从字符串开头遍历&#xff0c;如果遇到c1就进入子遍历&#xff0c;遇到长度大于等于k且以c2结尾的子串就使cnt;遍历完之后再从外遍历找c1。 这种方法的弊端在于&#xff1a;外遍历 #include<bits/stdc.h> using namespace std; con…

网络安全新视角:数据可视化的力量

在当今数字化时代&#xff0c;网络安全已成为各大企业乃至国家安全的重要组成部分。随着网络攻击的日益复杂和隐蔽&#xff0c;传统的网络安全防护措施已难以满足需求&#xff0c;急需新型的解决方案以增强网络防护能力。数据可视化技术&#xff0c;作为一种将复杂数据转换为图…

备考ICA----Istio实验13---使用 Istio Ingress 暴露应用

备考ICA----Istio实验13—使用Istio Ingress TLS暴露应用 1. 环境部署 清理之前实验遗留,并重新部署httpbin服务进行测试 # 清理之前的环境 kubectl delete vs httpbin kubectl delete gw mygateway # 部署httpbin kubectl apply -f istio/samples/httpbin/httpbin.yaml 确认…

Android RecyclerView 滑动后选中的条目居中显示

话不多说先看效果: 实录效果视频如下 滚动居中 RecyclerView 在原有的RecyclerView 基础上操作&#xff0c;其他步骤不变&#xff0c;只是替换一下 manager 步骤 导入依赖 maven { url https://www.jitpack.io }//无限滚动implementation com.github.ZhaoChanghu:GalleryLayou…

爬虫(Web Crawler)逆向技术探索

实战案例分析 为了更好地理解爬虫逆向的实际应用&#xff0c;我们以一个具体的案例进行分析。 案例背景 假设我们需要从某电商网站上获取商品价格信息&#xff0c;但该网站采取了反爬虫措施&#xff0c;包括动态Token和用户行为分析等。 分析与挑战 动态Token&#xff1a;…

github拉取的项目添加至自己的仓库

想把GitHub的开源项目拉到本地进行二开&#xff0c;研究了一下上传到gitee的步骤&#xff1a; 步骤 gitee新建仓库&#xff0c;仓库名与本地文件夹的名称一致&#xff0c;建好后gitee的页面也会有显示git命令 打开项目目录&#xff0c;右键打开git bash&#xff08;或者在git…