redis — 基于Spring Boot实现redis延迟队列

1. 业务场景

延时队列场景在我们日常业务开发中经常遇到,它是一种特殊类型的消息队列,它允许把消息发送到队列中,但不立即投递给消费者,而是在一定时间后再将消息投递给消费者。延迟队列的常见使用场景有以下几种:

  • 在各种购物平台上下单,订单超过30分钟未支付,自动关闭。
  • 订单完成后, 如果用户一直未评价, 5天后自动好评。
  • 会员到期前15天, 到期前3天分别发送短信提醒。
  • 当订单一直处于未支付状态时,如何及时的关闭订单,并退还库存?
  • 如何定期检查处于退款状态的订单是否已经退款成功?

2. Redis延迟队列实现原理

目前延迟队列的类型主要实现有:

  • 基于消息的延迟:指为每条消息设置不同的延迟时间,那么每当队列中有新消息进入的时候就会重新根据延迟时间排序,或者定义时间轮,新消息落在指定位置;
  • 基于队列的延迟: 设置不同延迟级别的队列,比如5s、1min、30mins、1h等,每个队列中消息的延迟时间都是相同的。

基于第一种不少组件都有实现方案,比如redis的sortset间接实现,kafka内部时间轮,rabbitMQ可安装插件实现。第一种实时性高,不过主观看会比较依赖组件本身,但自己实现就得考虑持久化、高可用等问题,建议直接使用组件本身;第二种方案可以基于组件去实现,通用性会高点,不过实时性不高,更适合用于重试业务场景。当然Redis本身并不支持延迟队列,所以我们只是实现一个比较简单的延迟队列,而且Redis不太适合大量消息堆积,所以只适合比较简单的场景,然假如我们对消息的实时性以及可靠性要求非常高,可能就需要使用MQ或kafka来实现了。

消息延迟流程图如下:
在这里插入图片描述
Redis延迟队列可以通过 zset 来实现,因为 zset 中有一个 score,我们可以把时间作为 score,将 value 存到 redis 中,然后通过轮询的方式,去不断的读取消息出来,整体思路为:

  1. 消息体设置有效期,设置好score,然后放入zset中
  2. 通过排名拉取消息
  3. 有效期到了,就把当前消息从zset中移除

zadd命令

使用方式:ZADD key score member [[score member][score member] …]
将一个或多个 member 元素及其 score 值加入到有序集 key 当中。如果 key 不存在,则创建一个空的有序集并执行 ZADD 操作。如果某个 member 已经是有序集的成员,那么更新这个 member 的 score 值,并通过重新插入这个 member 元素,来保证该 member 在正确的位置上。score 值可以是整数值或双精度浮点数。

ZRANGEBYSCORE命令

使用方式:ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]

  1. 返回有序集 key 中,所有 score 值介于 min 和 max 之间(包括等于 min 或 max )的成员。有序集成员按 score 值递增(从小到大)次序排列。
  2. 具有相同 score 值的成员按字典序来排列
  3. 可选的 LIMIT 参数指定返回结果的数量及区间(就像SQL中的 SELECT LIMIT offset, count ),注意当 offset 很大时,定位 offset 的操作可能需要遍历整个有序集,此过程最坏复杂度为 O(N) 时间。
  4. 可选的 WITHSCORES 参数决定结果集是单单返回有序集的成员,还是将有序集成员及其 score 值一起返回。

ZREM命令

使用方式:ZREM key member [member …]
移除有序集 key 中的一个或多个成员,不存在的成员将被忽略。
当 key 存在但不是有序集类型时,返回一个错误。

3. 基于springboot实现redis延迟队列

3.1 引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>${version}</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>${version}</version>
</dependency>

3.2 redis基础方法

定义RedisService基础服务方法,本次案例只涉及到以下三个基础方法:

    /**
     * 添加 ZSet 元素
     *
     * @param key
     * @param value
     * @param score
     */
    @Override
    public boolean add(String key, Object value, double score) {
        return redisTemplate.opsForZSet().add(key, value, score);
    }
    
    /**
     * 返回 分数范围内 指定 count 数量的元素集合, 并且从 offset 下标开始(从小到大,不带分数的集合)
     *
     * @param key
     * @param min
     * @param max
     * @param offset 从指定下标开始
     * @param count  输出指定元素数量
     * @return
     */
    @Override
    public Set<Object> rangeByScore(String key, double min, double max, long offset, long count) {
        return redisTemplate.opsForZSet().rangeByScore(key, min, max, offset, count);
    }

    /**
     * Zset 删除一个或多个元素
     *
     * @param key
     * @param values
     * @return
     */
    @Override
    public Long removeZset(String key, Object... values) {
        return redisTemplate.opsForZSet().remove(key, values);
    }

3.3 定义Spring消息事件推送

@Getter
@ToString
public class DelayMsg extends ApplicationEvent {
    private String msg;
    private String topic;

    public DelayMsg(Object source, String msg, String topic) {
        super(source);
        this.msg = msg;
        this.topic = topic;
    }
}

3.4 消息获取

定义redis获取延迟队列消息方法:

/**
 * 从zset中取出score小于当前时间戳的数据
 *
 * @param key
 * @return
 */
public String getDelayOne(String key) {
    //先查后删,一次拿3个做备选,这样抢占到的概率就会高一些
    Set<Object> sets = redisService.rangeByScore(key, 0, System.currentTimeMillis(), 0, 3);
    if (CollectionUtils.isEmpty(sets)) {
        return null;
    }

    for (Object val : sets) {
        if (1L.equals(redisService.removeZset(key, val))) {
            // 删除成功,表示抢占到
            return val.toString();
        }
    }
    return null;
}

这里每次查询时取了三个数据,然后遍历获取到的数据,依次尝试去删除,若删除成功,则表示当前实例抢占到了这个消息

  1. 为什么这样设计? 这里有两个点,先解释第一个,为啥先查后删

如果我们按照正常的实现流程,每次从zset中取一个,但是无法保证这个时候就只有我一个人拿到了这个数据,在多实例的场景下,可能存在多个实例同时拿到了它,那么如何才能表示只有一个实例抢占到呢?

借助redis的单线程机制,只可能有一个实例会删除成功,所以拿到并删除成功的那个小伙伴,就是最终的幸运儿;

因此实现细节就是先查,后删,若删除成功,表示获取成功;否则表示被其他的实例捷足先登。

  1. 接下来再看第二个,为啥一次拿三个

从上面的分析可以看出,如果我一次只拿一个,那么我抢占到的几率并不太大,特别是当实例比较多时,可能会做多次的无效操作;为了减少这个可能性,所以我一次多拿几个做备选,这样抢占到的概率就会高一些,至于为什么是3,这个就看实际的实例与定时任务的执行间隔了。

上面定义了如何获取延迟队列中已到期的消息,接下来需要定时轮训获取消息:

/**
 * 每5s定时轮训消息
 */
@Scheduled(fixedRate = 5000)
public void schedule() {
   for (String specialTopic : topic) {
       String msg = redisDelayQueue.getDelayOne(specialTopic);
       logger.info("开始轮训获取消息 {}", msg);
       if (StringUtil.isNotEmpty(msg)) {
       	   //使用Spring推送事件处理
           applicationContext.publishEvent(new DelayMsg(this, msg, specialTopic));
       }
   }
}

上面的定时任务,直接借助Spring的@Schedule来实现,遍历所有的topic,捞出数据之后,通过spring的 event/listener事件机制来实现消息处理的解耦

3.5 定义消费者注解和切面处理

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EventListener
public @interface Consumer {
    String topic();
}

注意这个注解上面还有 @EventListener,表明它可以监听的spring的事件

3.6 定义延时业务的切面处理

@Aspect
@Component
public class ConsumerAspect {

    @Around("@annotation(consumer)")
    public Object around(ProceedingJoinPoint joinPoint, Consumer consumer) throws Throwable {
        Object[] args = joinPoint.getArgs();
        boolean check = false;
        for (Object obj : args) {
            if (obj instanceof DelayMsg) {
                check = consumer.topic().equals(((DelayMsg) obj).getTopic());
            }
        }
        if (!check) {
            // 不满足条件,直接忽略
            return null;
        }
        // topic匹配成功,执行
        return joinPoint.proceed();
    }
}

3.7 消息监听

	//使用自定义的consumer注解监听topic延迟队列
    @Consumer(topic = RedisKeyConstant.DELAY_QUEUE)
    public void consumer(DelayMsg delayMsg) {
        logger.info("预约单延时确认: " + delayMsg.getMsg() + " at:" + System.currentTimeMillis());
        //延迟业务具体实现
        //...
        //...
    }

3.8 业务facade层调用延迟处理

经过以上的延迟队列封装处理,在facade层,也就是我们的业务中就可以直接调用:

@Autowired
private DelayListWrapper delayListWrapper;
...
delayListWrapper.publish(RedisKeyConstant.DELAY_QUEUE, xxxId, xxx);

4 总结

本文以redis的zset来实现延时队列,并基于SpringBoot实现了延迟队列的推送和消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/79307.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

学点Selenium玩点新鲜~,让分布式测试有更多玩法

前 言 我们都知道 Selenium 是一款在 Web 应用测试领域使用的自动化测试工具&#xff0c;而 Selenium Grid 是 Selenium 中的一大组件&#xff0c;通过它能够实现分布式测试&#xff0c;能够帮助团队简单快速在不同的环境中测试他们的 Web 应用。 分布式执行测试其实并不是一…

LSTM模型

目录 LSTM模型 LSTM结构图 LSTM的核心思想 细胞状态 遗忘门 输入门 输出门 RNN模型 LRNN LSTM模型 什么是LSTM模型 LSTM (Long Short-Term Memory)也称长短时记忆结构,它是传统RNN的变体,与经典RNN相比能够有效捕捉长序列之间的语义关联,缓解梯度消失或爆炸现象.同时LS…

考研算法第46天: 字符串转换整数 【字符串,模拟】

题目前置知识 c中的string判空 string Count; Count.empty(); //正确 Count ! null; //错误c中最大最小宏 #include <limits.h>INT_MAX INT_MIN 字符串使用发运算将字符加到字符串末尾 string Count; string str "liuda"; Count str[i]; 题目概况 AC代码…

QT多屏显示程序

多屏显示的原理其实很好理解&#xff0c;就拿横向扩展来说&#xff1a; 计算机把桌面的 宽度扩展成了 w1&#xff08;屏幕1的宽度&#xff09; w2(屏幕2的宽度) 。 当一个窗口的起始横坐标 > w1&#xff0c;则 他就被显示在第二个屏幕上了。 drm设备可以多用户同时打开&am…

STM32定时器TIM控制

一、CubeMX的设置 1、新建工程&#xff0c;进行基本配置 2、配置定时器TIM2 1&#xff09;定时器计算公式&#xff1a;&#xff08;以下两条公式相同&#xff09; Tout ((ARR1) * PSC1)) / Tclk TimeOut ((Prescaler 1) * (Period 1)) / TimeClockFren Tout TimeOut&…

WordPress更换域名后-后台无法进入,网站模版错乱,css失效,网页中图片不显示。完整解决方案(含宝塔设置)

我在实际解决问题时用到了 【简单暴力解决方案】的《方法一&#xff1a;修改wp-config.php》 和 【简单暴力-且特别粗暴-的解决方案】 更换域名时经常遇到的几个问题&#xff1a; 1、更换域名后&#xff0c;后台无法进入 2、更换域名后&#xff0c;网站模版错乱&#xff0c;c…

第 4 章 链表(1)

4.1链表(Linked List)介绍 链表是有序的列表&#xff0c;但是它在内存中是存储如下 小结: 链表是以节点的方式来存储,是链式存储每个节点包含 data 域&#xff0c; next 域&#xff1a;指向下一个节点.如图&#xff1a;发现链表的各个节点不一定是连续存储.链表分带头节点的链…

Django之定时任务--apscheduler

Django--定时任务apscheduler的使用 apscheduler定时任务的使用1、安装包2、配置settings.py3、在manage.py的文件同级目录下创建文件scheduler.py4、在项目的urls.py中调用这个定时计划5、然后启动项目 python manage.py runserver,在admin中查看就能看到你的定时任务及执行的…

React 高阶组件(HOC)

React 高阶组件(HOC) 高阶组件不是 React API 的一部分&#xff0c;而是一种用来复用组件逻辑而衍生出来的一种技术。 什么是高阶组件 高阶组件就是一个函数&#xff0c;且该函数接受一个组件作为参数&#xff0c;并返回一个新的组件。基本上&#xff0c;这是从 React 的组成…

NVIDIA vGPU License许可服务器高可用全套部署秘籍

第1章 前言 近期遇到比较多的场景使用vGPU&#xff0c;比如Citrix 3D场景、Horizon 3D场景&#xff0c;还有AI等&#xff0c;都需要使用显卡设计研发等&#xff0c;此时许可服务器尤为重要&#xff0c;许可断掉会出现掉帧等情况&#xff0c;我们此次教大家部署HA许可服务器。 …

【腾讯云 Cloud Studio 实战训练营】在线 IDE 编写 canvas 转换黑白风格头像

关于 Cloud Studio Cloud Studio 是基于浏览器的集成式开发环境(IDE)&#xff0c;为开发者提供了一个永不间断的云端工作站。用户在使用Cloud Studio 时无需安装&#xff0c;随时随地打开浏览器就能在线编程。 Cloud Studio 作为在线IDE&#xff0c;包含代码高亮、自动补全、Gi…

基于Redis实现关注、取关、共同关注及消息推送(含源码)

微信公众号访问地址&#xff1a;基于Redis实现关注、取关、共同关注及消息推送(含源码) 推荐文章&#xff1a; 1、springBoot对接kafka,批量、并发、异步获取消息,并动态、批量插入库表; 2、SpringBoot用线程池ThreadPoolTaskExecutor异步处理百万级数据; 3、为什么引入Rediss…

程序员如何利用公网远程访问查询本地硬盘【内网穿透】

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《高效编程技巧》《cpolar》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 公网远程访问本地硬盘文件【内网穿透】 文章目录 公网远程访问本地硬盘文件【内网穿透】前言1. 下载cpolar和Everything软件1.…

小象课堂在线授课教育系统

此项目包含后端全部代码&#xff0c;前端包括后台和web界面的源码&#xff0c;数据库用的mysql,可当作课设或者毕设&#xff0c;还可写入自己的简历中 web界面展示&#xff1a; 前端后台界面展示&#xff1a; 用户管理 课程管理 内容配置 订单管理 系统管理 系统监控

【山河送书第七期】:《强化学习:原理与Python实战》揭秘大模型核心技术RLHF!

《强化学习&#xff1a;原理与Python实战》揭秘大模型核心技术RLHF&#xff01; 一图书简介二RLHF是什么&#xff1f;三RLHF适用于哪些任务&#xff1f;四RLHF和其他构造奖励模型的方法相比有何优劣&#xff1f;五什么样的人类反馈才是好反馈&#xff1f;六如何减小人类反馈带来…

【C++】模板template

&#x1f525;&#x1f525; 欢迎来到小林的博客&#xff01;&#xff01;       &#x1f6f0;️博客主页&#xff1a;✈️林 子       &#x1f6f0;️博客专栏&#xff1a;✈️ C       &#x1f6f0;️社区 :✈️ 进步学堂       &#x1f6f0;️欢…

stack 、 queue的语法使用及底层实现以及deque的介绍【C++】

文章目录 stack的使用queue的使用适配器queue的模拟实现stack的模拟实现deque stack的使用 stack是一种容器适配器&#xff0c;具有后进先出&#xff0c;只能从容器的一端进行元素的插入与提取操作 #include <iostream> #include <vector> #include <stack&g…

ansible 修改远程主机nginx配置文件

安装ansible brew install ansible 或者 pip3 install ansible 添加远程主机 设置秘钥 mac登录远程主机 ssh -p 5700 root192.168.123.211 ssh localhost #设置双机信任 ssh-kyegen -t rsa #设置主机两边的ssh配置文件 vi /etc/ssh/sshd_config/ PermitRootL…

K8S核心组件etcd详解(下)

1 k8s如何使用etcd 在k8s中所有对象的manifest都需要保存到某个地方&#xff0c;这样他们的manifest在api server重启和失败的时候才不会丢失。 只有api server能访问etcd&#xff0c;其它组件只能间接访问etcd的好处是 增强乐观锁系统及验证系统的健壮性 方便后续存储的替换…

C++新经典03--共用体、枚举类型与typedef

共用体 共用体&#xff0c;也叫联合&#xff0c;有时候需要把几种不同类型的变量存放到同一段内存单元&#xff0c;例如&#xff0c;把一个整型变量、一个字符型变量、一个字符数组放在同一个地址开始的内存单元中。这三个变量在内存中占的字节数不同&#xff0c;但它们都从同…