学会RabbitMQ的延迟队列,提高消息处理效率

系列文章目录

手把手教你,本地RabbitMQ服务搭建(windows)
消息队列选型——为什么选择RabbitMQ
RabbitMQ灵活运用,怎么理解五种消息模型
RabbitMQ 能保证消息可靠性吗
推或拉? RabbitMQ 消费模式该如何选择
死信是什么,如何运用RabbitMQ的死信机制?
真的好用吗?鲜有人提的 RabbitMQ-RPC模式



在这里插入图片描述
前面我们讲到了RabbitMQ的死信队列,其实除了死信队列,RabbitMQ还有一个常用的延迟队列设计。今天,我们就来说一下这个延迟队列

📕作者简介:战斧,从事金融IT行业,有着多年一线开发、架构经验;爱好广泛,乐于分享,致力于创作更多高质量内容
📗本文收录于 RabbitMQ ,有需要者,可直接订阅专栏实时获取更新
📘高质量专栏 云原生、RabbitMQ、Spring全家桶 等仍在更新,欢迎指导
📙Zookeeper Redis kafka docker netty等诸多框架,以及架构与分布式专题即将上线,敬请期待


提示:以下是本篇文章正文内容,下面案例可供参考

一、什么是延迟队列?

延迟队列指的是当我们将消息发送到RabbitMQ时,可以指定消息的有效期或者消息需要在未来某个时间点才能被消费。这种消息被称为“延迟消息”。因此,RabbitMQ支持通过延迟队列来实现延迟消息的发送和消费。

二、延迟队列的实现

延迟队列的实现原理其实就是将消息放入到一个普通的队列中,只不过这个队列有一个特殊的属性:消息的消费被延迟一段时间。这个延迟时间可以是任意的,也可以是固定的。当消息进入队列时,会有一个定时器在计时,当计时器到达设定的时间时,消息会被转移至消费队列等待被消费。

在RabbitMQ中,延迟队列的实现有两种方式:一种是通过x-delayed-message插件实现;另一种是通过TTL(Time To Live)和死信队列实现。

1. x-delayed-message插件

x-delayed-message插件可以让RabbitMQ支持延迟消息功能,它是一个非官方插件,需要自行下载并安装。其源码地址如下:github地址 或 gitee地址;如果你是从笔者之前的安装博客 手把手教你,本地RabbitMQ服务搭建(windows) 过来的,那么你用的可能是RabbitMQ V3.12,可以直接下载我上传的资源 3.12-插件

首先,需要在RabbitMQ服务器上安装x-delayed-message插件。把上述的插件复制进我们RabbitMQ的服务插件目录下
在这里插入图片描述
然后执行插件的启用 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 即可
然后,在Java代码中定义queue、exchange和connectionFactory,代码如下:

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST_NAME);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setPort(PORT);

Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();

Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-delayed-type", "direct");
channel.exchangeDeclare("delayed_exchange", "x-delayed-message", true, false, arguments);
channel.queueDeclare("delayed_queue", true, false, false, null);
channel.queueBind("delayed_queue", "delayed_exchange", "delayed_routing_key");

不难发现,此时其实是交换机在做延迟,
在这里插入图片描述

当然,除了交换机的设置,在发送消息时,还需要在消息头部设置x-delay属性,代码如下:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2);
builder.headers(new HashMap<String, Object>(){{put("x-delay", 5000);}});
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("delayed_exchange", "delayed_routing_key", properties, message.getBytes());

2. TTL + 死信队列

此种方式的原理其实我们在学习死信队列的时候应该就察觉到了,就是利用消息超时(TTL)后会转入死信交换机的机制,其模型如下:
在这里插入图片描述

首先,需要在Java代码中定义queue、exchange和connectionFactory,代码如下:

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST_NAME);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setPort(PORT);

Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();

Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", "dead_letter_exchange");
arguments.put("x-dead-letter-routing-key", "dead_letter_routing_key");
arguments.put("x-message-ttl", 5000);

channel.exchangeDeclare("normal_exchange", "direct", true, false, null);
channel.exchangeDeclare("dead_letter_exchange", "direct", true, false, null);
channel.queueDeclare("normal_queue", true, false, false, arguments);
channel.queueDeclare("dead_letter_queue", true, false, false, null);
channel.queueBind("normal_queue", "normal_exchange", "normal_routing_key");
channel.queueBind("dead_letter_queue", "dead_letter_exchange", "dead_letter_routing_key");

在发送消息时,只需要将消息发送到normal_exchange交换机下,代码如下:

channel.basicPublish("normal_exchange", "normal_routing_key", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

三、手写延时队列

当然,除了RabbitMQ,实现延时队列的方式还有很多,我们甚至可以自己实现,本节,我们就尝试自己写个延时队列

1. 时间轮概念

在关于计时或定时的设计里,时间轮是一种用于处理定时任务的数据结构。它通过将时间划分为一系列的时刻,每个时刻对应一个槽,将任务存储在相应的槽中
在这里插入图片描述
时间轮通常包含多个槽和指针,其中指针指向当前时刻对应的槽,每过单位时间,指针就指向下一个槽,这样任务调度时按照指针的移动依次执行槽中的任务
在这里插入图片描述

2. JAVA演示

我们先使用JUC相关内容实现一个时间轮

import java.util.*;
import java.util.concurrent.*;

class TimeWheel {
    private int size;
    private int currentIndex;
    private List<BlockingQueue<Task>> slots;
    private Executor executor;

    public TimeWheel(int size, Executor executor) {
        this.size = size;
        this.slots = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            slots.add(new LinkedBlockingQueue<>());
        }
        this.executor = executor;
    }

    public void addTask(Task task) {
        int expireIndex = (int)(currentIndex + task.getDelay() / 1000) % size;
        slots.get(expireIndex).add(task);
    }

    public void start() {
        new Thread(() -> {
            while (true) {
                currentIndex = (currentIndex + 1) % size;
                BlockingQueue<Task> currentSlot = slots.get(currentIndex);
                List<Task> tasks = new ArrayList<>();
                currentSlot.drainTo(tasks);
                for (Task task : tasks) {
                    executor.execute(task);
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

class Task implements Runnable {
    private long delay; // 延迟时间,单位毫秒
    private Runnable task; // 任务

    public Task(long delay, Runnable task) {
        this.delay = delay;
        this.task = task;
    }

    public long getDelay() {
        return delay;
    }

    @Override
    public void run() {
        task.run();
    }
}


我们可以使用main方法来尝试验证这个时间轮效果:

    public static void main(String[] args) {
        TimeWheel timeWheel = new TimeWheel(60 * 60, Executors.newFixedThreadPool(10));
        // 添加任务,延迟5秒执行
        timeWheel.addTask(new Task(5000, () -> System.out.println("Task 1 executed!")));
        // 添加任务,延迟10秒执行
        timeWheel.addTask(new Task(10000, () -> System.out.println("Task 2 executed!")));
        // 启动时间轮
        timeWheel.start();
    }

在这里插入图片描述

当然,以上代码只是一个简化的实现,实际情况中需要考虑任务执行时间和时间轮的精度等问题。

四、应用场景与注意事项

1. 应用场景

  1. 红包预告
    在现在的抢红包的场景下,当用户发起红包活动后,可能不希望立即开抢,而是设定在一段时间后开启。那么我们可以将将红包信息发送到一个延迟队列中,一定时间后,系统会自动激活红包,此时用户才可以真正抢红包
    在这里插入图片描述

  2. 订单系统
    在订单系统中,有一些订单需要在未来某个时间点才能被处理。例如,有些订单需要在一定的时间之后才能发货或者确认收货。这时候,我们可以将这些订单放到延迟队列中,当时间到达时再进行处理。

  3. 优惠券系统
    在优惠券系统中,有一些优惠券需要在未来某个时间点才能使用。这时候,我们可以将这些优惠券放到延迟队列中,当时间到达时再进行激活。

2. 注意事项

  1. 延迟队列不要使用太多
    使用延迟队列可以在一定程度上减少系统的负载,但是使用过多的延迟队列会导致系统变得更加复杂,维护起来也更加困难。

  2. 延迟队列可能会导致消息丢失
    在RabbitMQ中,当一个带有TTL消息被发送到队列中时,如果队列中的消息太多,或者队列的消费者速度太慢,就会导致消息失效,如果没有使用死信机制,消息就会被丢失。为了避免这种情况发生,我们需要对队列进行监控,及时发现问题并进行处理。

  3. 设置合适的延迟时间
    在使用延迟队列时,需要根据实际需求设置合适的延迟时间。如果延迟时间太短,可能会导致消息延迟效果不明显;如果延迟时间太长,可能会导致系统累积大量的消息,导致负载过高。

总结

RabbitMQ的延迟队列是一种非常实用的特性,可以帮助我们实现定时任务、限流、削峰等功能。但是,在使用延迟队列时,需要谨慎对待,根据实际需求设置合适的延迟时间,并及时监控队列中的消息,避免出现消息丢失的情况。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/65657.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

滑动窗口(全面清晰/Java)

数组模拟单调队列 分析 以k3举例&#xff1a; (1)利用单调队列的性质&#xff1a; <1>最小值&#xff1a;确保队列单调递增&#xff0c;处理后&#xff0c;队头即是最小值。 <2>最大值&#xff1a;确保队列单调递减&#xff0c;处理后&#xff0c;队头即是最大值…

分享一个计算器

先看效果&#xff1a; 再看代码&#xff08;查看更多&#xff09;&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>计算器</title><style>* {box-sizing: border-box;}body…

Java分布式微服务1——注册中心(Eureka/Nacos)

文章目录 基础知识注册中心Eureka注册中心与Ribbon负载均衡1、Eureka注册中心2、Eureka的搭建3、Eureka服务注册4、复制服务实例5、拉取服务6、Ribbon负载均衡的流程及Eureka规则调整&#xff1a;7、Ribbon负载均衡饥饿加载 Nacos注册中心1、服务端Nacos安装与启动2、客户端Nac…

管理类联考——逻辑——论证逻辑——汇总篇——目录+提炼

文章目录 一、削弱没有特点的削弱方法关系的削弱必要方法的削弱因果推理的削弱果因推理的削弱概念跳跃的削弱数量比例的削弱比例因果的削弱有效提醒 二、支持方法关系的支持必要方法的支持因果推理的支持果因推理的支持概念跳跃的支持数量比例的支持比例因果的支持 三、假设方法…

IntelliJ IDEA Bookmark使用

1 增加 右键行号栏 2 查看 从favorite这里查看 参考IntelliJ IDEA 小技巧&#xff1a;Bookmark(书签)的使用_bookmark idea 使用_大唐冠军侯的博客-CSDN博客

Windows11右键菜单

刚开始使用Windows11时&#xff0c;新的右键菜单用起来很不习惯。 记录一下修改和恢复Windows11的右键菜单的方法。 1.Win11切换到旧版右键菜单&#xff1a; 方法&#xff1a;WinR打开CMD&#xff0c;运行下面的命令行 添加注册列表重启Windows资源管理器 reg add "HKC…

设备管理系统:提升生产制造企业效率与竞争力的关键

在现代生产制造行业中&#xff0c;设备是企业生产力的核心。有效管理和维护设备对于提高生产效率、降低成本、确保产品质量至关重要。为了满足这些需求&#xff0c;越来越多的生产制造企业开始采用设备管理系统。本文将探讨设备管理系统的重要性以及它对企业的益处。 设备管理…

Qt6之QStackedWidget——Qt仿ToDesk(2)

一、 QStackedWidget概述 QStackedWidget也叫堆栈窗体类&#xff0c;它继承于QFrame&#xff0c;主要与QListWidget等结合使用&#xff0c;实现“一个界面多个页面切换”。 二、QStackedWidget示例 如下图&#xff0c;当点击左边 QListWidget里的菜单时&#xff0c;右边跟随切…

深入学习JVM —— GC垃圾回收机制

前言 前面荔枝已经梳理了有关JVM的体系结构和类加载机制&#xff0c;也详细地介绍了JVM在类加载时的双亲委派模型&#xff0c;而在这篇文章中荔枝将会比较详细地梳理有关JVM学习的另一大重点——GC垃圾回收机制的相关知识&#xff0c;重点了解的比如对象可达性的判断、四种回收…

Prometheus技术文档--基本安装-docker安装并挂载数据卷-《十分钟搭建》

一、查看可安装的版本 docker search prom/prometheus 二、拉取镜像 docker pull prom/prometheus 三、查看镜像 docker images 四、书写配置文件-以及创建挂载目录 宿主机挂载目录位置&#xff1a; 以及准备对应的挂载目录&#xff1a; /usr/local/docker/promethues/se…

linux之find命令

概览 Linux下find命令在目录结构中搜索文件&#xff0c;并执行指定的操作。Linux下find命令提供了相当多的查找条件&#xff0c;功能很强大。由于find具有强大的功能&#xff0c;所以它的选项也很多&#xff0c;其中大部分选项都值得我们花时间来了解一下。即使系统中含有网络…

ffplay数据结构分析(一)

本文为相关课程的学习记录&#xff0c;相关分析均来源于课程的讲解&#xff0c;主要学习音视频相关的操作&#xff0c;对字幕的处理不做分析 下面我们对ffplay的相关数据结构进行分析&#xff0c;本章主要是对PacketQueue的讲解 struct MyAVPacketList和PacketQueue队列 ffp…

常量池-JVM(十九)

上篇文章说gc日志以及arthas。 Arthas & GC日志-JVM&#xff08;十八&#xff09; 一、常量池 常量池主要放两大类&#xff1a;字面量和符号引用。 字面量就是由字母、数字等构成的字符串或者数值常量。 符号引用主要包含三类常量。 类和接口的全限定名。字段的名称和…

【毕业项目】自主设计HTTP

博客介绍&#xff1a;运用之前学过的各种知识 自己独立做出一个HTTP服务器 自主设计WEB服务器 背景目标描述技术特点项目定位开发环境WWW介绍 网络协议栈介绍网络协议栈整体网络协议栈细节与http相关的重要协议 HTTP背景知识补充特点uri & url & urn网址url HTTP请求和…

python的virtualenv虚拟环境无法激活activate

目录 问题描述&#xff1a; 解决办法&#xff1a; 解决结果&#xff1a; 问题描述&#xff1a; PS D:\pythonProject\pythonProject\DisplayToolLibs\venv\Scripts> .\activate .\activate : 无法加载文件 D:\pythonProject\pythonProject\DisplayToolLibs\venv\Scripts\…

form-create-designer整合element-plus使用方法

最近在使用form-create-designer生成表单的时候遇到了很多问题和各种报错&#xff0c;按照官方文档的方法一步步来做&#xff0c;发现行不通&#xff0c;后来经过不断尝试&#xff0c;终于找到了使用方法&#xff0c;这里做一下总结。 1、安装所需的依赖包 npm install eleme…

Redhat Linux 安装MySQL安装手册

Redhat安装MySQL安装手册 1 下载2 上传服务器、解压并安装3 安装安装过程1&#xff1a;MySQL-shared-5.6.51-1.el7.x86_64.rpm安装过程2&#xff1a;MySQL-shared-compat-5.6.51-1.el7.x86_64.rpm安装过程3&#xff1a;MySQL-server-5.6.51-1.el7.x86_64.rpm安装过程4&#xff…

AWS-自定义ami的S3存取使用

需要提前配置好aws-cli哈 对应的区域 要统一 示例&#xff1a;即AWS-CLI 和 EC2、AMI、S3以上资源均要使用同已区域&#xff0c;以下拿新加坡举例 1.新建自定义AMI 2.查看ami状态 确认是可用状态&#xff0c;才能开始操作 3.aws-cli 开始存入s3 只能使用桶的根目录 开始上…

【golang】工作区与GOPATH

在学习go语言时&#xff0c;我们会从官网下载go语言的二进制包&#xff0c;然后解压并安装到某个目录&#xff0c;最后会配置环境变量&#xff0c;通过输入命令go version来验证是否安装成功。 配置了path环境后&#xff0c;我们还需要再配置3个环境变量&#xff0c;GOROOT、G…

XML(eXtensible Markup Language)

目录 为什么需要XML? 一 XML语法 1.文档声明 2.元素 语法: 3.属性 4.注释 5.CDATA节 二 树结构 三 转义字符 四 DOM4J 1.XML解析技术 2.dom4j介绍 3.dom4j基本使用 XML 指可扩展标记语言&#xff08;eXtensible Markup Language&#xff09;。 XML 被设计用来传…