redisson的延时队列机制简述

概述

业务中经常会遇到一些延迟执行的需求;通常想到的都是rabbitmq或者rocketmq的延迟消息;
但是系统中不一定集成了mq,但为了控制分布式下的并发,一般redis都是有集成的;
rediskey过期监听那个时间不准确,在集群环境下节点挂了也容易丢失;

那么用redisson的延迟队列,正好可以用来解决轻量级的延时消息;
简单的来说就是消费者生产了一个消息任务,塞到ZSet里(用当前时间戳+延迟时间作为分数),等时间到了,就会放到任务List中,然后消费者真正去执行任务都是从任务List中获取任务;

redisson中的消费者并不是一直轮询获取任务;而是有具体时间的延迟任务,时间到了去任务队列中获取任务;

注意点,在消费者监听处如果使用thread相关操作因为redisson的默认线程nameredisson-netty会抛异常,我的处理方式是把相关操作都放到自己的线程池中操作.

官方解释是在netty线程中调用同步方法可能会导致超时;
issue:https://github.com/redisson/redisson/issues/3549

异常见源码

org.redisson.command.CommandAsyncService.get(org.redisson.api.RFuture<V>)

版本
redissonredisson-spring-boot-starter-3.17.6.jar
redis:6.2.7

redisson延时任务机制简述

生产者先将任务pushdelay_queue_timeout等待队列中,延迟时间到了,消费者会把任务从timeout队列挪到SANYOU任务队列中(消费者实际获取任务的队列),然后消费者就能拿到最终要执行的任务了;

这里具体要说的就是客户端通知和获取机制;
消费者在启动时通常都会去get一下队列,达到订阅队列的目的;

RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue("SANYOU");
RDelayedQueue<String> delayQueue = redissonClient.getDelayedQueue(blockingQueue);

这样做的目的:
消费者订阅队列,从delay_queue_timeout等待延迟队列中将已经到达时间的任务挪到真正的任务List队列中,然后再将delay_queue_timeout队列中第一个(也就是第一个要执行的)的任务的时间拿到,用这个时间开启一个延迟任务,时间到了之后,会发布一个消息到时间通知channel中;然后客户端监听到这个channel中的消息后,会再次重复上述步骤,让delay_queue_timeout中的任务,可以都放到真正的任务List队列中;

这样有一个好处就是不用一直while扫描等待,客户端的延迟任务时间和delay_queue_timeout中的延迟时间是一样的,可以精准利用cpu,理论上是没有延迟的,但是实际消息数量大量增加,消费者消费比较慢,还是会造成延迟任务消费延迟;

另外由于客户端都是用lua脚本去redis的同一个List队列中获取任务,lua脚本在redis中都是原子任务,而且redis真正的操作是单线程的,所以不会存在任务广播情况(并发获取时,一个任务不会被多个消费者同时拿到);

捞一张图片
在这里插入图片描述

代码Demo


import cn.hutool.extra.spring.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


@Slf4j
@Component
public class RedissonDelayQueueConfig implements InitializingBean {

    @Resource
    private RedissonClient redissonClient;

    //延时队列map
    private final Map<String, RDelayedQueue<DelayMessageDTO>> delayQueueMap = new ConcurrentHashMap<>(16);


    /**
     * 消费者初始化所有队列,订阅对应的队列,并开启第一个过期任务的过期时间对应的延迟任务
     */
    @PostConstruct
    public void reScheduleDelayedTasks() {
        DelayQueueEnum[] queueEnums = DelayQueueEnum.values();
        for (DelayQueueEnum queueEnum : queueEnums) {
            RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueEnum.getCode());
            RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
        }
    }


    @Override
    public void afterPropertiesSet() {
        // 有新的延迟队列在这里添加,队列消费类需要继承DelayQueueConsumer
        DelayQueueEnum[] queueEnums = DelayQueueEnum.values();

        for (DelayQueueEnum queueEnum : queueEnums) {
            DelayQueueConsumer delayQueueConsumer = SpringUtil.getBean(queueEnum.getBeanName());
            if (delayQueueConsumer == null) {
                throw new ServiceException("queueName=" + queueEnum.getBeanName() + ",delayQueueConsumer=null,请检查配置...");
            }
            // Redisson的延时队列是对另一个队列的再包装,使用时要先将延时消息添加到延时队列中,当延时队列中的消息达到设定的延时时间后,
            // 该延时消息才会进行进入到被包装队列中,因此,我们只需要对被包装队列进行监听即可。
            RBlockingQueue<DelayMessageDTO> rBlockingQueue = redissonClient.getBlockingDeque(queueEnum.getCode());

            //消费者初始化队列
            RDelayedQueue<DelayMessageDTO> rDelayedQueue = redissonClient.getDelayedQueue(rBlockingQueue);
            //set到map中方便获取
            delayQueueMap.put(queueEnum.getCode(), rDelayedQueue);
            // 订阅新元素的到来,调用的是takeAsync(),异步执行
            rBlockingQueue.subscribeOnElements(delayQueueConsumer::execute);
        }
    }

    public RedissonClient getRedissonClient() {
        return redissonClient;
    }

    public Map<String, RDelayedQueue<DelayMessageDTO>> getDelayQueueMap() {
        return delayQueueMap;
    }
}








import cn.hutool.core.date.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

import javax.annotation.Resource;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;


@Slf4j
@Component
public class DelayQueueUtil {

    private static RedissonDelayQueueConfig redissonDelayQueueConfig;

    @Resource
    public void setRedissonDelayQueueConfig(RedissonDelayQueueConfig redissonDelayQueueConfig) {
        DelayQueueUtil.redissonDelayQueueConfig = redissonDelayQueueConfig;
    }

    private static Map<String, RDelayedQueue<DelayMessageDTO>> getDelayQueueMap() {
        if(null == redissonDelayQueueConfig) return Collections.emptyMap();
        return redissonDelayQueueConfig.getDelayQueueMap();
    }

    private static RedissonClient getRedissonClient() {
        if(null == redissonDelayQueueConfig) return null;
        return redissonDelayQueueConfig.getRedissonClient();
    }

    /**
     * 添加延迟消息
     */
    public static void addDelayMessage(DelayMessageDTO delayMessage) {
        log.info("delayMessage={}", delayMessage);

        Assert.isTrue(getDelayQueueMap().containsKey(delayMessage.getQueueName()), "队列不存在");

        delayMessage.setCreateTime(DateUtil.now());
        if(null == delayMessage.getTimeUnit()){
            delayMessage.setTimeUnit(TimeUnit.SECONDS);
        }

        RDelayedQueue<DelayMessageDTO> rDelayedQueue = getDelayQueueMap().get(delayMessage.getQueueName());
        //移除相同的消息
        rDelayedQueue.remove(delayMessage);

        //添加消息
        rDelayedQueue.offer(delayMessage, delayMessage.getDelayTime(), delayMessage.getTimeUnit());
    }


    /**
     * 移除指定队列中的消息
     */
    public static void removeDelayMessage(DelayMessageDTO delayMessage) {
        log.info("取消:delayMessage={}", delayMessage);
        if (!getDelayQueueMap().containsKey(delayMessage.getQueueName())) {
            log.error("queueName={},该延迟队列不存在,请确认后再试...", delayMessage.getQueueName());
            return;
        }

        RDelayedQueue<DelayMessageDTO> rDelayedQueue = getDelayQueueMap().get(delayMessage.getQueueName());
        rDelayedQueue.remove(delayMessage);
        removeDelayQueue(delayMessage);
    }


    /**
     * 从所有队列中删除消息
     */
    public static void removeDelayQueue(DelayMessageDTO value) {
        DelayQueueEnum[] queueEnums = DelayQueueEnum.values();
        for (DelayQueueEnum queueEnum : queueEnums) {
            RBlockingDeque<Object> blockingDeque = getRedissonClient().getBlockingDeque(queueEnum.getCode());
            RDelayedQueue<Object> delayedQueue = getRedissonClient().getDelayedQueue(blockingDeque);
            delayedQueue.remove(value);
        }
    }



}

参考了大佬的博文
https://lhalcyon.com/delay-task/index.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/335097.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C#,实用新型加强版的整数数组

疫苗要打加强针&#xff01;数组要用加强版&#xff01; 三连发 加强版整数数组源代码https://mp.csdn.net/mp_blog/creation/editor/124151056 加强版实数数组源代码https://mp.csdn.net/mp_blog/creation/editor/124151110 加强版泛型数组源代码https://mp.csdn.net/mp_bl…

这才是你应该了解的Redis数据结构!

Redis&#xff0c;作为一种高性能的内存数据库&#xff0c;支持多种数据结构&#xff0c;从简单的字符串到复杂的哈希表。在这篇博文中&#xff0c;我们将深入探讨Redis的一些主要数据结构&#xff0c;并通过详细的例子展示它们的使用。 1. 字符串 (String) 1.1 存储和获取 R…

k8s资源介绍

Kubernetes架构图 Kubernetes系统用于管理分布式节点集群中的微服务或容器化应用程序&#xff0c;并且其提供了零停机时间部署、自动回滚、缩放和容器的自愈&#xff08;其中包括自动配置、自动重启、自动复制的高弹性基础设施&#xff0c;以及容器的自动缩放等&#xff09;等…

模糊数学在处理激光雷达的不确定性和模糊性问题中的应用

模糊数学是一种用于处理不确定性和模糊性问题的数学工具&#xff0c;它可以帮助我们更好地处理激光雷达数据中的不确定性和模糊性。激光雷达是一种常用的传感器&#xff0c;用于测量目标物体的距离、速度和方向等信息。然而&#xff0c;在实际应用中&#xff0c;激光雷达所获取…

ITK + ANT,无法显示三维

背景&#xff1a;之前用ANT保存ima格式的数据&#xff0c;选择的是保存所有的序列 用python将dicom转为nii的格式&#xff0c; import nibabel as nib import torch"""不管是nii还是nii.gz都是二维的&#xff0c;为啥呢"""fobj nib.load("…

Linux编辑器---vim

目录 1、vim的基本概念 2正常/普通/命令模式(Normal mode) 2、1命令模式下一些命令&#xff08;不用进入插入模式&#xff09; 3插入模式(Insert mode) 4末行/底行模式(last line mode) 4、1底行模式下的一些命令 5、普通用户无法进行sudo提权的解决方案 6、vim配置问题 6、1配…

使用 Node 创建 Web 服务器

Node.js 提供了 http 模块&#xff0c;http 模块主要用于搭建 HTTP 服务端和客户端&#xff0c;使用 HTTP 服务器或客户端功能必须调用 http 模块&#xff0c;代码如下&#xff1a; var http require(http); 以下是演示一个最基本的 HTTP 服务器架构(使用 8080 端口)&#x…

100天精通鸿蒙从入门到跳槽——第8天:TypeScript 知识储备:泛型

博主猫头虎的技术世界 &#x1f31f; 欢迎来到猫头虎的博客 — 探索技术的无限可能&#xff01; 专栏链接&#xff1a; &#x1f517; 精选专栏&#xff1a; 《面试题大全》 — 面试准备的宝典&#xff01;《IDEA开发秘籍》 — 提升你的IDEA技能&#xff01;《100天精通Golang》…

学好UEFI,实现从工程师到架构师的跨越

学好UEFI&#xff0c; 实现从工程师到架构师的跨越 2024 / 01 / 19 统一可扩展固件接口 UEFI&#xff0c;是由英特尔、微软等众多全球知名 IT企业共同开发、管理与推进的全新一代 BIOS 体系规范&#xff0c;目前作为最先进最完善的固件架构&#xff0c;已占据绝大部分计算机市…

基于Redisson的RAtomicLong实现全局唯一工单号生成器

最近几年&#xff0c;我一直从事的是运营平台业务开发。每天&#xff0c;我们都需要处理大量的工单配置工作。为了生成工单号&#xff0c;我们建立了一张专用的数据库表&#xff0c;用于记录和生成工单号。每次创建工单时&#xff0c;我们会查询这张表&#xff0c;根据年份字段…

梁山泊国潮风礼盒,传承经典,贺礼新春

在春节来临之际&#xff0c;梁山泊隆重推出新年中国红礼盒酒&#xff0c;为您传递新年的祝福与关爱。这款酒以其独特的魅力&#xff0c;为您带来美好的祝愿和愉悦的享受。中国风国潮礼盒采用中国传统红色为主色调&#xff0c;象征着吉祥、喜庆和繁荣。红色的背景上&#xff0c;…

appium连接手机进行启动失败 ,怎么办 ?检查下这几个地方 。

在使用appium做app自动化&#xff0c;首先需要启动appium连接到手机&#xff0c;然后进行后续操作。 但是往往在启动的时候就会卡住&#xff0c;在点击start session后就会出现报错&#xff0c;具体如下图 &#xff1a; 那么&#xff0c;出现如上的情况该如何解决呢 &#xff1…

(蓝桥杯每日一题)love

问题描述 马上就要到七夕情人节了&#xff0c;小蓝在这天想要心爱得男神表白&#xff0c;于是她写下了一个长度为n仅由小写字母组成的字符串。 她想要使这个字符串有 1314个 love 子序列但是马虎的小蓝却忘记了当前已经有多少个子序列为 love。 请你帮小蓝计算出当前字符串有多…

挑战杯参赛总结-时间序列预测

参赛任务&#xff1a; 目标&#xff1a;针对中国各个市区的不同年份二氧化碳排放量&#xff0c;预测未来年份的二氧化碳排放量。 不同与之前我学习过的波士顿房价预测机器学习-波士顿房价预测-CSDN博客 房价预测是通过学习与房价有关的很多特征&#xff0c;训练出一个模型来预…

RabbitMQ-生产者可靠性

一、生产者重连 1、概念 由于网络波动导致客户端无法连接上MQ&#xff0c;这是可以开启MQ的失败后重连机制。 注意&#xff1a; 是连接失败的重试&#xff0c;而不是消息发送失败后的重试。 2、开启配置 spring:rabbitmq:template:retry:enabled: true # 是否启用重试机制ma…

[python语言]数据类型

目录 知识结构​编辑 复数类型 整数类型、浮点数类型 1、整型 2、浮点型 字符与字符串 1、转义字符 2、字符串的截取 3、字符串的拼接级连 4、字符串的格式化 1、format格式化 2、字符格式化 3、f标志位格式化--(推荐) 5、字符串的常用属性 1、对字符串做出判断…

Mat - 基本映像容器

目标 我们有多种方法可以从现实世界中获取数字图像&#xff1a;数码相机、扫描仪、计算机断层扫描和磁共振成像等等。在每种情况下&#xff0c;我们&#xff08;人类&#xff09;看到的都是图像。但是&#xff0c;当将其转换为我们的数字设备时&#xff0c;我们记录的是图像每…

设计模式篇章(4)——十一种行为型模式

这个设计模式主要思考的是如何分配对象的职责和将对象之间相互协作完成单个对象无法完成的任务&#xff0c;这个与结构型模式有点像&#xff0c;结构型可以理解为静态的组合&#xff0c;例如将不同的组件拼起来成为一个更大的组件&#xff1b;而行为型更是一种动态或者具有某个…

高级编程。JavaScript中有哪些类型转换机制?

一、概述 前面我们讲到&#xff0c;JS中有六种简单数据类型&#xff1a;undefined、null、boolean、string、number、symbol&#xff0c;以及引用类型&#xff1a;object 但是我们在声明的时候只有一种数据类型&#xff0c;只有到运行期间才会确定当前类型 let x y ? 1 : …

MCM备赛笔记——蒙特卡罗方法

Key Concept 蒙特卡罗方法&#xff08;Monte Carlo Method&#xff09;&#xff0c;也称为统计模拟方法&#xff0c;是一种基于概率和统计的数值计算方法。该方法使用随机数&#xff08;或更常见的伪随机数&#xff09;来解决可能非常复杂的数学或物理问题。蒙特卡罗方法广泛应…