spring-boot 整合 redisson 实现延时队列(文末有彩蛋)

应用场景

通常在一些需要经历一段时间或者到达某个指定时间节点才会执行的功能,比如以下这些场景:

  • 订单超时提醒
  • 收货自动确认
  • 会议提醒
  • 代办事项提醒

为什么使用延时队列

对于数据量小且实时性要求不高的需求来说,最简单的方法就是定时扫描数据库。

但是,当数量达到数百万、上千万级别且时,定时扫库就显得非常低效且消耗资源,

甚至有些时间间隔小实时性要求高的情况,上一次扫描还没结束,下一次就又开始了,

这时候如果使用延时队列就会比较合适

延时队列的几种方式:

  • Quartz 定时任务实现扫库
  • DelayQueue JDK中提供了一组实现延迟队列的API
  • Redis sorted set
  • Redis 过期键监听回调
  • RabbitMQ 死信队列
  • RabbitMQ 基于插件实现延迟队列
  • Wheel 时间轮训算法

Redisson 实现延时队列

顾名思义 Redis son 就是 Redis 的儿子,举个栗子先:

1.引入 pom

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>${lastest.version}</version>
</dependency>

2.封装一个 RedissonQueue 类

@Service
public class RedissonQueue {

    public static final String QUEUE = "delayQueue";

    // 默认超时时间,30秒
    public static final Integer DEFAULT_TIMEOUT = 30;

    @Resource
    private RedissonClient redissonClient;

    // 加入任务并设置到期时间
    public void offer(String taskId, Integer timeout) {
        RDelayedQueue<String> delayedQueue = delayedQueue();
        delayedQueue.offer(taskId, Objects.isNull(timeout) ? DEFAULT_TIMEOUT : timeout, TimeUnit.SECONDS);
    }

    // 移除任务
    public void remove(String taskId) {
        RDelayedQueue<String> delayedQueue = delayedQueue();
        delayedQueue.removeIf(messageId -> messageId.equals(taskId));
    }

    // 任务列表
    public RDelayedQueue<String> delayedQueue() {
        RBlockingDeque<String> blockingDeque = blockingDeque();
        return redissonClient.getDelayedQueue(blockingDeque);
    }

    public RBlockingDeque<String> blockingDeque() {
        return redissonClient.getBlockingDeque(QUEUE);
    }

    public boolean isShutdown() {
        return redissonClient.isShutdown();
    }

    public void shutdown() {
        redissonClient.shutdown();
    }

}

3.交给 Spring 管理

@Slf4j
@Service
public class RedissonService implements ApplicationRunner {

    @Resource
    private RedissonQueue redissonQueue;

    @Resource(name = "threadPoolTaskExecutor")
    private ThreadPoolTaskExecutor executor;

    @Override
    public void run(ApplicationArguments args) {
        RBlockingDeque<String> blockingDeque = redissonQueue.blockingDeque();
        executor.execute(() -> {
            while (true) {
                if (redissonQueue.isShutdown()) {
                    return;
                } else {
                    String messageId = null;
                    try {
                        messageId = blockingDeque.take();
                    } catch (InterruptedException e) {
                        log.warn("RedissonConsumer error:{}", e.getMessage());
                    }
                    if (!Objects.isNull(messageId) && !messageId.isEmpty()) {
                        log.warn("timeout messageId : {}", messageId);
                    }
                }
            }
        });

    }

    // 初始化,启动服务就执行一次
    @PostConstruct
    public void init() {
        redissonQueue.delayedQueue();
    }

    @PreDestroy
    public void shutdown() {
        redissonQueue.shutdown();
    }

}

4.测试接口

@Operation(summary = "添加任务", description = "添加任务")
@PostMapping
public ResponseEntity<?> add(@RequestParam(value = "taskId", required = false) String taskId,
                             @RequestParam(value = "timeout", required = false) Integer timeout) {
    taskId = StringUtils.isEmpty(taskId) ? String.valueOf(snowflake.nextId()) : taskId;
    redissonQueue.offer(taskId, timeout);
    return ResponseEntity.ok().body(redissonQueue.delayedQueue());
}

@Operation(summary = "移除任务", description = "移除任务")
@DeleteMapping(value = "/{taskId}")
public ResponseEntity<?> remove(@PathVariable("taskId") String taskId) {
    redissonQueue.remove(taskId);
    return ResponseEntity.ok().body(redissonQueue.delayedQueue());
}

5.测试结果

添加10个任务

在这里插入图片描述

删除第1个任务

在这里插入图片描述

可以看到第一个任务删除后没有被执行(没有设置到期时间,默认为30秒到期)

在这里插入图片描述

实现原理

  • redisson_delay_queue_timeout:delayQueue,sorted set 数据类型,存放所有延迟任务,按延迟任务的到期时间戳(提交任务时间戳 +
    延迟时间)排序,所以列表最前面第一个元素就是整个延迟队列中最早被执行的任务。
  • redisson_delay_queue:delayQueue,list 数据类型,也是存放所有任务。
  • delayQueue,list 数据类型,被称为目标队列,这个里面存放的任务都是已经到延迟时间的,可以被消费者获取的任务,所以上面示例中
    RBlockingQueue 的 take 方法是从此目标队列中获取任务的。
  • redisson_delay_queue_channel:delayQueue,是一个 channel,用来通知客户端开启一个延迟任务
  • 生产者提交任务时将任务放到 redisson_delay_queue_timeout:delayQueue 中,提交任务的时间戳+延迟时间
  • 客户端会有一个延迟任务,这个延迟任务会向 Redis Server 发送一段 lua 脚本,Redis 执行 lua 脚本中的命令,此操作是原子性的

lua 脚本主要干两件事

  • 将到了延迟时间的任务从 redisson_delay_queue_timeout:delayQueue 中移除,存到 delayQueue 这个目标队列
  • 获取到 redisson_delay_queue_timeout:delayQueue 中最早到期时间的任务的到期时间戳,发布到 redisson_delay_queue_channel:
    delayQueue channel 中

当客户端监听到 redisson_delay_queue_channel:delayQueue 这个 channel 的消息时,会再次提交一个客户端延迟任务,延迟时间就是消息(最早到期时间任务的到期时间戳)当前时间戳
这个时间其实也就是 redisson_delay_queue_channel:delayQueue 中最早到期时间的任务的剩余的延迟时间。
一旦时间来到最早到期时间任务的到期时间戳,redisson_delay_queue_timeout:delayQueue 中最早到期时间的任务已经到期,客户端的延迟任务也同时到期,
于是开始执行 lua 脚本操作,及时将到期任务放到目标队列中。然后再次发布剩余的延迟任务中最早到期任务的到期时间戳到 channel
中,
如此循环运行下去,保证 redisson_delay_queue_timeout:delayQueue 中到期数据能及时放到目标队列中。
这里存在一个特殊情况,需要项目启动时就执行一次延时队列。因为由于没有客户端延迟任务的执行,
可能会出现 redisson_delay_queue_timeout:delayQueue 队列中有到期但是没有被放到目标队列的可能,启动就执行一次是为了保证到期的数据能被及时放到目标队列中。

结论

  • Redisson 方案理论上没有延迟,但当消息数量剧增,消费者消费缓慢这种情况下,可能会导致延迟任务消费的延迟。

  • 消息丢失问题 Redisson 方案最大程度上减轻消息丢失的可能性,因为所有任务都是存在 list 和 sorted set 两种数据类型中,Redis
    有持久化机制。除非整个 redis 集群宕机,可能丢失一小部分数据。

  • 广播任务问题,是不会出现的,因为每个客户端都是从同一个目标队列中获取任务。

Redisson 这种实现方案是比较合适且靠谱的,一般中小型项目建议用 Redisson 实现延迟队列,规模较大的项目直接上 MQ。

整合DEMO仓库地址

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/843604.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Odoo17架构概述

多层架构 Odoo遵循多层架构&#xff0c;这意味着演示&#xff0c;业务逻辑和数据存储是分开的。更具体地说&#xff0c;它使用三层架构。 UI展示层 UI表示层是 HTML5、JavaScript 和 CSS 的组合。 应用程序的最顶层是用户界面。界面的主要功能是将任务和结果转换为用户可以理…

MacBook电脑远程连接Linux系统的服务器方法

一、问题简介 Windows 操作系统的电脑可使用Xshell等功能强大的远程连接软件。通过连接软件&#xff0c;用户可以在一台电脑上访问并控制另一台远程计算机。这对于远程技术支持、远程办公等场景非常有用。但是MacBook电脑的macOS无法使用Xshell。 在Mac上远程连接到Windows服…

解决npm install(‘proxy‘ config is set properly. See: ‘npm help config‘)失败问题

摘要 重装电脑系统后&#xff0c;使用npm install初始化项目依赖失败了&#xff0c;错误提示&#xff1a;‘proxy’ config is set properly…&#xff0c;具体的错误提示如下图所示&#xff1a; 解决方案 经过报错信息查询解决办法&#xff0c;最终找到了两个比较好的方案&a…

最新可用度盘不限速后台系统源码_去授权开心版

某宝同款度盘不限速后台系统源码&#xff0c;验证已被我去除&#xff0c;两个后端系统&#xff0c;账号和卡密系统 第一步安装宝塔&#xff0c;部署卡密系统&#xff0c;需要环境php7.4 把源码丢进去&#xff0c;设置php7.4&#xff0c;和伪静态为thinkphp直接访问安装就行 …

MLIR的TOY教程学习笔记

MLIR TOY Language 文章目录 MLIR TOY Language如何编译该项目ch1: MLIR 前端IR解析ch2: 定义方言和算子 (ODS)1. 定义方言2. 定义OP3. OP相关操作4. 定义OP ODS (Operation Definition Specification)1. 基本定义2. 添加文档3. 验证OP4. 新增构造函数5. 定义打印OP的格式 ch3:…

简单工厂、工厂方法与抽象工厂之间的区别

简单工厂、工厂方法与抽象工厂之间的区别 1、简单工厂&#xff08;Simple Factory&#xff09;1.1 定义1.2 特点1.3 示例场景 2、工厂方法&#xff08;Factory Method&#xff09;2.1 定义2.2 特点2.3 示例场景 3、抽象工厂&#xff08;Abstract Factory&#xff09;3.1 定义3.…

视频共享融合赋能平台LntonCVS视频监控管理平台视频云解决方案

LntonCVS是基于国家标准GB28181协议开发的视频监控与云服务平台&#xff0c;支持多设备同时接入。该平台能够处理和分发多种视频流格式&#xff0c;包括RTSP、RTMP、FLV、HLS和WebRTC。主要功能包括视频直播监控、云端录像与存储、检索回放、智能告警、语音对讲和平台级联&…

buuctf web 第五到八题

[ACTF2020 新生赛]Exec 这里属实有点没想到了&#xff0c;以为要弹shell&#xff0c;结果不用 127.0.0.1;ls /PING 127.0.0.1 (127.0.0.1): 56 data bytes bin dev etc flag home lib media mnt opt proc root run sbin srv sys tmp usr var127.0.0.1;tac /f*[GXYCTF2019]Pin…

全球大模型将往何处去?

在这个信息爆炸的时代&#xff0c;我们如同站在知识的海洋边&#xff0c;渴望着能够驾驭帆船&#xff0c;探索那些深邃的奥秘。 而今天&#xff0c;我们将启航&#xff0c;透过一份精心编制的报告&#xff0c;去洞察全球大模型的未来趋势&#xff0c;探索人工智能的无限可能。…

C++初学者指南-5.标准库(第一部分)--标准库查询存在算法

C初学者指南-5.标准库(第一部分)–标准库查询存在算法 文章目录 C初学者指南-5.标准库(第一部分)--标准库查询存在算法any_of / all_of / none_ofcountcount_if相关内容 不熟悉 C 的标准库算法&#xff1f; ⇒ 简介 any_of / all_of / none_of 如果在输入范围(所有元素…

桌面小宠物发布一周,第一次以独立开发者的身份赚到了100块

收入数据(AppStore一周收入统计) AppStore付费工具榜第七 应用简介 桌面新宠(NewPet)&#xff0c;是我耗时半年开发的一款桌面宠物。我是被 QQ 宠物影响的那批人&#xff0c;上学时天天给 QQ 宠物喂食&#xff0c;很可惜它现在不在了。所以&#xff0c;我开发的初衷是想要在电…

华为HCIP Datacom H12-821 卷42

42.填空题 如图所示&#xff0c;MSTP网络中SW1为总根&#xff0c;请将以下交换机与IST域根和主桥配对。 参考答案&#xff1a;主桥1468 既是IST域根又是主桥468 既不是又不是就是25 解析&#xff1a; 主桥1468 既是IST域根又是主桥468 既不是又不是就是25 43.填空题 网络有…

通过HTML/CSS 实现各类进度条的功能。

需求&#xff1a;我们在开发中会遇到使用各式各样的进度条&#xff0c;因为当前插件里面进度条各式各样的&#xff0c;为了方便我们定制化的开发和方便修改样式&#xff0c;我们这里使用HTML和CSS样式来进行开发进度条功能。 通过本文学习我们会明白如何使用 HTML/CSS 创建各种…

【YOLOv10[基础]】热力图可视化实践② | 支持图像热力图 | 论文必备

本文将进行添加YOLOv10版本的热力图可视化功能的实践,支持图像热力图可视化。 目录 一 热力图可视化实践① 1 代码 2 效果图 二 报错处理 在论文中经常可以见到提取的物体特征以热力图的形式展示出来,将特征图以热力图的方式进行可视化在深度学习中有以下的原因: ①强调…

0711springNews新闻系统管理 实现多级评论

0611springmvc新闻系统管理-CSDN博客 0711springNews新闻系统管理项目包 实现多级评论-CSDN博客 数据库字段 需要添加父节点id&#xff0c;通过该字段实现父评论和子评论的关联关系。 对象属性 实现链表&#xff0c;通过一个父评论可以找到它对应的所有子孙评论。 业务层 实现…

像乌鸦一样思考01_玻璃杯罩住的高低蜡烛谁先灭?

大致场景 大致原理 蜡烛燃烧需要氧气&#xff0c;同时会释放二氧化碳但蜡烛燃烧特定场景下&#xff0c;释放的二氧化碳温度很高&#xff0c;密度比空气底所以就会有上传的趋势(水高温变成水蒸气也是会往上升的)持续的燃烧&#xff0c;二氧化碳不停的往上聚集&#xff0c;最终高…

全国区块链职业技能大赛国赛考题前端功能开发

任务3-1:区块链应用前端功能开发 1.请基于前端系统的开发模板,在登录组件login.js、组件管理文件components.js中添加对应的逻辑代码,实现对前端的角色选择功能,并测试功能完整性,示例页面如下: 具体要求如下: (1)有明确的提示,提示用户选择角色; (2)用户可看…

excel系列(三) - 利用 easyexcel 快速实现 excel 文件导入导出

一、介绍 在上篇文章中&#xff0c;我们介绍了 easypoi 工具实现 excel 文件的导入导出。 本篇我们继续深入介绍另一款更优秀的 excel 工具库&#xff1a;easyexcel 。 二、easyexcel easyexcel 是阿里巴巴开源的一款 excel 解析工具&#xff0c;底层逻辑也是基于 apache p…

自动化产线 搭配数据采集监控平台 创新与突破

自动化产线在现在的各行各业中应用广泛&#xff0c;已经是现在的生产趋势&#xff0c;不同的自动化生产设备充斥在各行各业中&#xff0c;自动化的设备会产生很多的数据&#xff0c;这些数据如何更科学化的管理&#xff0c;更优质的利用&#xff0c;就需要数据采集监控平台来完…

【Vue】深入了解 Vue 的 DOM 操作:从基本渲染到高级操作的全面指南

文章目录 一、Vue 中的基本 DOM 渲染1. 响应式数据2. 虚拟 DOM 二、数据绑定与指令1. v-bind2. v-model3. v-show 与 v-if4. v-for 三、与 DOM 相关的生命周期钩子1. mounted 钩子2. updated 钩子 四、动态样式与类1. 动态样式2. 动态类 五、Vue 3 中的新的 DOM 操作 API1. ref…