【Redis】redis高阶-使用zset实现延时队列

Hi,大家好,我是抢老婆酸奶的小肥仔。

最近在使用redis时,就想能不能用其实现消息队列?也在网上看了下其他小伙伴写的实现,结合自身业务实现了如下消息队列,希望对大家有用。

废话不多说,直接开撸。

1、为什么zset可以做消息队列?

首先我们来看下,设计消息队列需要考虑的需求:有序性,消息重复性,可靠性。

  • 有序性:zset所有元素可以根据成员关联的score来进行从低到高的排序,例如,我们可以利用时间戳来进行排序
  • 消息重复性:在zset中每个元素都是唯一的,这也保证了消息的唯一性
  • 可靠性:zset会自动维护元素之间的顺序,在添加或删除元素时无需手动排序,提升操作速度。

2、使用的zset命令

命令描述
zadd将一个给定score的成员添加到有序集合中,返回添加元素的个数
zrange根据元素在有序排序中的位置,从有序集合中获取多个元素
rank(K key, Object o)获取指定元素在集合中的索引,索引从0开始

3、代码实现

使用zset实现消息队列时,具体的流程,如下:

生产者流程:

  1. 用户获取消息Id,并封装消息体
  2. 用户发送数据到生产者,先获取锁
  3. 如果获取到锁,则校验该消息体是否已添加到队列中,已添加则直接返回提醒。
  4. 若未添加则调用方法将数据保存到zset集合中,否则等到指定时间后再获取锁。
  5. 推送数据后,释放锁

消费者流程:

  1. 调用方法获取数据
  2. 获取到数据,则直接返回,否则到指定时间后再次获取数据,直到获取到数据并返回。

统一返回类:

/**
 * @Author: jiangjs
 * @Description:
 * @Date: 2021/11/12 15:46
 **/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ResultUtil<T> implements Serializable {
    private int code;
    private String msg;
    private T data;

    public static <T> ResultUtil<T> success(){
        return ResultUtil.<T>builder().code(1000).msg("成功").build();
    }
    public static <T> ResultUtil<T> success(T data){
        return ResultUtil.<T>builder().code(1000).msg("成功").data(data).build();
    }

    public static <T> ResultUtil<T> error(String msg){
        return ResultUtil.<T>builder().code(5000).msg(msg).data(null).build();
    }

    public static <T> ResultUtil<T> error(int code,String msg){
        return ResultUtil.<T>builder().code(code).msg(msg).build();
    }
}

3.1 消息实体

需添加消息Id,主要防止消息重复消费。

/**
 * @author: jiangjs
 * @description: 消息实体
 * @date: 2023/5/30 11:11
 **/
@Data
@Accessors(chain = true)
public class QueueTask<T> {
    /**
     * 消息Id
     */
    private String taskId;
    /**
     * 任务
     */
    private T task;
}

3.2 队列类型

队列类型可以理解为队列的名称,通过枚举,可以随意添加队列名称。

/**
 * @author: jiangjs
 * @description: 队列类型
 * @date: 2023/5/30 10:53
 **/
public enum QueueTypeEnum {

    /**
     * 订单
     */
    ORDER("order");

    private final String type;

    QueueTypeEnum(String type){
        this.type = type;
    }

    public String getType(){
        return type;
    }
}

3.3 创建消息工具

package com.jiashn.springbootproject.redis.utils;

import com.jiashn.springbootproject.redis.domain.QueueTask;
import com.jiashn.springbootproject.utils.ResultUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @author: jiangjs
 * @description: redis实现消息队列
 * @date: 2023/5/30 10:51
 **/
public class RedisQueueUtil<T> {

    private static final Logger log = LoggerFactory.getLogger(RedisQueueUtil.class);

    private RedisTemplate<String,QueueTask<T>> redisTemplate;

    /**
     * 队列类型,即名称
     */
    private final QueueTypeEnum typeEnum;

    public RedisQueueUtil(QueueTypeEnum typeEnum,RedisTemplate<String,QueueTask<T>> redisTemplate){
        this.typeEnum = typeEnum;
        this.redisTemplate = redisTemplate;
    }

    /**
     * 添加消息数据
     * @param queueTask 消息
     * @param time 延迟时间,单位s
     */
    public ResultUtil<String> sendQueueTask(QueueTask<T> queueTask, long time){
        //加锁
        if (getLock()){
            try {
                Long rank = redisTemplate.opsForZSet().rank(typeEnum.getType(), queueTask);
                if (Objects.nonNull(rank)){
                    return ResultUtil.error(6000,"消息数据已经存在,不予添加......");
                }
                Boolean result = redisTemplate.opsForZSet().add(typeEnum.getType(), queueTask, System.currentTimeMillis() + time*1000);
                if (Objects.nonNull(result) && result){
                    log.info("添加消息数据成功:" + queueTask + ",添加时间:" + LocalDateTime.now());
                    return ResultUtil.success("添加消息数据成功");
                }
                return ResultUtil.error("添加消息数据失败");
            }finally {
                //释放锁
                releaseLock();
            }
        } else {
            log.info("未获取到锁,稍后再试");
            return ResultUtil.error("未获取到锁,稍后再试");
        }
    }

    /**
     * 获取zset前count数据
     * @param count 数据数
     * @return 返回获取到数据
     */
    public Set<QueueTask<T>> loopGetTask(int count) {
            //rangeByScore,根据score顺序获取zset数据的值
            return redisTemplate.opsForZSet().rangeByScore(typeEnum.getType(), 0, System.currentTimeMillis(), 0, count-1);
    }

    /**
     * 注销消息队列
     * @param typeEnum 消息队列名称
     */
    public void destroy(QueueTypeEnum typeEnum){
        redisTemplate.opsForZSet().remove(typeEnum.getType());
    }

    /**
     * 获取任务Id
     * @return 返回消息Id
     */
    public String getTaskId(){
       return typeEnum.getType() + "_" + UUID.randomUUID().toString().replace("-","");
    }

    /**
     * 获取锁
     * @return 返回加锁状态
     */
    private boolean getLock(){
        Boolean absent = redisTemplate.opsForValue().setIfAbsent(typeEnum.getType() + "_Locked", null, 30L, TimeUnit.MINUTES);
        return Objects.nonNull(absent) ? absent : false;
    }

    /**
     * 释放锁
     */
    public void releaseLock(){
        redisTemplate.delete(typeEnum.getType() + "_Locked");
    }
}

在消息工具类中,创建消息任务时添加了锁,只有在获取锁的前提下才能添加消息任务。

提供获取消息Id的方法是为了让提交消息任务前,先获取Id,即使在提交时网络发生问题,提交的Id还是同一个,再进行消息消费时,可以根据这个Id来进行判断该消息任务是否已被消费,被消费则直接丢弃。

3.4 消费消息

/**
 * @author: jiangjs
 * @description: 启动消费
 * @date: 2023/5/30 14:27
 **/
@Component
public class CustomerTaskLineRunner implements CommandLineRunner {

    @Resource
    private RedisTemplate<String,QueueTask<String>> redisTemplate;

    private final static String QUEUE_TYPE = QueueTypeEnum.ORDER.getType();
    private final static Logger log = LoggerFactory.getLogger(CustomerTaskLineRunner.class);

    @Override
    public void run(String... args) throws Exception {
        RedisQueueUtil<String> queueUtil = new RedisQueueUtil<>(QueueTypeEnum.ORDER,redisTemplate);
        while (true){
            Set<QueueTask<String>> queueTasks = queueUtil.loopGetTask(10);
            if (CollectionUtils.isNotEmpty(queueTasks)){
                for (QueueTask<String> queueTask : queueTasks) {
                    //校验当前消息是否已消费,主要防止网络延时,导致多次提交同一任务 存在
                    QueueTask<String> stringQueueTask = redisTemplate.opsForValue().get(QUEUE_TYPE + "_" + queueTask.getTaskId());
                    if (Objects.nonNull(stringQueueTask)){
                        log.info("该任务已经消费,不能重复消费");
                        redisTemplate.opsForZSet().remove(QUEUE_TYPE,queueTask);
                        continue;
                    }
                    Long removeNum = redisTemplate.opsForZSet().remove(QUEUE_TYPE,queueTask);
                    if (Objects.nonNull(removeNum) && removeNum > 0){
                        String task = queueTask.getTask();
                        log.info("消费任务数据:" + task);
                        //设置过期时间,10分钟内则默认是重复提交
                        redisTemplate.opsForValue().set(QUEUE_TYPE + "_" + queueTask.getTaskId(),queueTask,10L, TimeUnit.MINUTES);
                    }
                }
            }
            log.info("------1分钟后再次获取------");
            Thread.sleep(60000);
        }
    }
}

校验重复消息,若消息重复且在10分钟内未被消费,则直接将该消息从队列中删除。在消息任务被消费后,将数据从队列中移除。

执行结果:

谢谢大家,今天的分享就到这,不合理的地方希望大家指正。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/677032.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Kafka之Broker原理

1. 日志数据的存储 1.1 Partition 1. 为了实现横向扩展&#xff0c;把不同的数据存放在不同的 Broker 上&#xff0c;同时降低单台服务器的访问压力&#xff0c;我们把一个Topic 中的数据分隔成多个 Partition 2. 每个 Partition 中的消息是有序的&#xff0c;顺序写入&#x…

【机器学习】Qwen1.5-14B-Chat大模型训练与推理实战

目录 一、引言 二、模型简介 2.1 Qwen1.5 模型概述 2.2 Qwen1.5 模型架构 三、训练与推理 3.1 Qwen1.5 模型训练 3.2 Qwen1.5 模型推理 四、总结 一、引言 Qwen是阿里巴巴集团Qwen团队的大语言模型和多模态大模型系列。现在&#xff0c;大语言模型已升级到Qwen1.5&…

分布式session共享配置

目录 1、spring-session 1.1 添加依赖 1.2 spring-mvc.xml配置文件 1.3 web.xml 2、tomcat配置session、共享 2.1 Tomcat配置 2.2 Web.xml配置 1、spring-session 官方文档&#xff1a;https://docs.spring.io/spring-session/docs/1.3.0.RELEASE/reference/html5/ 1.…

【vue-7】图片轮播

实现功能&#xff1a; 1、通过button实现图片轮训播放&#xff1b; 2、通过标签列表项实现图片的播放&#xff1b; 示例代码&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"vi…

AI网络爬虫:对网页指定区域批量截图

对网页指定区域批量截图&#xff0c;可以在deepseek的代码助手中输入提示词&#xff1a; 你是一个Python编程专家&#xff0c;一步一步的思考&#xff0c;完成一个对网页指定区域截图的python脚本的任务&#xff0c;具体步骤如下&#xff1a; 设置User-Agent: Mozilla/5.0 (…

【AIOT-Robot】3D pos 相关

1. Mediapipe 3D detection 使用移动增强现实(AR)会话数据(session data),开发了新的数据pipeline。大部分智能手机现在都具备了增强现实的功能,在这个过程中捕捉额外的信息,包括相机姿态、稀疏的3D点云、估计的光照和平面。 利用相机的姿势、检测到的平面、估计的照明,来生…

JVM学习-监控工具(一)

使用数据说明问题&#xff0c;使用知识分析问题&#xff0c;使用工具处理问题 无监控&#xff0c;不调优&#xff01; 命令行工具 在JDK安装目录下&#xff0c;可以查看到相应的命令行工具&#xff0c;如下图 jps(Java Process Status) 显示指定系统内所有的Hotpot虚拟机…

网络工程师----第四十七天

1、请简述super vlan和sub vlan的区别&#xff1f; 2、请简述mux vlan 中不同vlan的特点&#xff1f; 3、请基于工作原理简述GVRP协议中三种接口模式的特点&#xff1f; 4、请简述STP的选举过程&#xff1f; 5、两台交换机在不增加成本的情况下为提高链路带宽和可靠性采用链路聚…

STM32与陶晶驰串口屏交互

1、串口屏界面设计 1.新建工程 保存位置自定义&#xff0c;作为一个合格的嵌入式工程师要有路径下没有中文的情况并命名。 选择自己串口屏对应的芯片&#xff0c;一般屏幕背面会有&#xff0c;也可以查看资料。 选择显示方向&#xff0c;自行选择。按照自己的爱好 右边可对当前…

Android约束布局ConstraintLayout的使用

Android引入约束布局的目的是为了减少布局层级的嵌套&#xff0c;从而提升渲染性能。约束布局综合线性布局、相对布局、帧布局的部分功能&#xff0c;缺点也很明显&#xff0c;就是可能要多写几行代码。所以约束布局使用时&#xff0c;还得综合考虑代码量。提升性能也并不一定非…

EulerMaker Yocto Open Build Service

EulerMaker & Yocto & Open Build Service 1 介绍1.1 概述 2 工具2.1 Yocto 【嵌入式领域】介绍目标好处三大关键组件创建流程发行版本 2.2 Open Build Service 【OBS】【服务器领域】介绍应用 2.3 EulerMaker 【全场景】介绍特性需求背景&#xff08;1&#xff09;能支…

群体优化算法---鲸鱼优化算法应用于电力系统优化

介绍 鲸鱼优化算法&#xff08;Whale Optimization Algorithm, WOA&#xff09;是一种基于鲸鱼行为的智能优化算法&#xff0c;由Seyedali Mirjalili等人于2016年提出。WOA受鲸鱼捕食行为的启发&#xff0c;尤其是座头鲸的气泡网捕食策略&#xff0c;模拟了鲸鱼围绕猎物游动和…

Qt图像处理技术十二:QImage实现边缘检测(sobel算法)

效果图 原理 Sobel算法是一种常用的边缘检测算法&#xff0c;它利用图像的灰度变化来检测图像中物体的边缘。Sobel算法主要包括以下几个步骤&#xff1a; 灰度化&#xff1a; 首先将彩色图像转换为灰度图像&#xff0c;因为灰度图像只包含单通道的灰度信息&#xff0c;有利于…

LeetCode刷题之HOT100之全排列

九点半了&#xff0c;做题吧。聊天聊到十一点多哈哈。 1、题目描述 2、逻辑分析 给定一个不重复数组&#xff0c;要求返回所有可能的全排列。这道题跟我上一道题思想一致&#xff0c;都是使用到回溯的算法思想来解决。直接用代码来解释吧 3、代码演示 public List<List&…

java的clone

一、clone的用法&#xff1a; package chatRoom.F5;class Person implements Cloneable{//1.public String name;public Person(String name) {this.name name;}//2.protected Person clone() throws CloneNotSupportedException {return (Person)super.clone();//重写Object…

mac安装nigix

1. 查看是否存在 nginx 执行brew search nginx 命令查询要安装的软件是否存在 brew search nginx 2. 安装nginx brew install nginx 3. 查看版本 nginx -v 4. 查看信息 查看ngxin下载的位置以及nginx配置文件存放路径等信息 brew info nginx 下载的存放路径 /usr/loca…

Django基础学习(一)

前端开发 目的&#xff1a;开发一个平台(网站)- 前端开发&#xff1a; HTML, CSS,JavaScript- web框架&#xff1a;接收请求并进行处理- MySQL数据库&#xff1a;存储相应的数据1.快速开发网站 pip install flask创建项目并导入flask框架,然后建立网址和函数的对应关系。 fr…

C++设计模式——Adapter适配器模式

一&#xff0c;适配器模式简介 适配器模式是一种结构型设计模式&#xff0c;用于将已有接口转换为调用者所期望的另一种接口。 适配器模式让特定的API接口可以适配多种场景。例如&#xff0c;现有一个名为"Reader()"的API接口只能解析txt格式的文件&#xff0c;给这…

JavaEE_CAS_Synchronized原理_线程安全集合类

文章目录 一、CAS1.什么是CAS2.CAS有哪些应用1.实现原子类 - AtomicInteger2.基于CAS实现的自旋锁3.CAS的ABA问题 二、Synchronized原理1.基本特点2.偏向锁3.锁消除4.锁粗化 三、JUC(java.util.concurrent)的常见类1.Callable接口2.ReentrantLock3.信号量Semaphore4.CountDownL…

11.7 堆排序

目录 11.7 堆排序 11.7.1 算法流程 11.7.2 算法特性 11.7 堆排序 Tip 阅读本节前&#xff0c;请确保已学完“堆“章节。 堆排序&#xff08;heap sort&#xff09;是一种基于堆数据结构实现的高效排序算法。我们可以利用已经学过的“建堆操作”和“元素出堆操作”…