使用redisMQ-spring-boot-starter实现消息队列和延时队列

简介

redisMQ-spring-boot-starter是一个轻量级的、基于Redis实现的消息队列中间件,它有如下优点:

  • 开箱即用,你几乎不用添加额外的配置
  • 支持消息队列、延时队列,并提供精细化配置参数
  • 提供消息确认机制
  • 支持虚拟空间,不同虚拟空间的数据互相隔离
  • 支持web控制台,实时查看各个队列的消费情况

开始使用

引用依赖

springboot3.0以下版本:

<dependency>
    <groupId>io.github.lengmianshi</groupId>
    <artifactId>redisMQ-spring-boot-starter</artifactId>
    <version>1.0.4</version>
</dependency>

<!-- 以下配置可以改为你自己的版本 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.1.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.2</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>

注:spring-boot-starter-data-redis依赖于spring-data-redis,如果发生依赖冲突,要确保spring-data-redis的版本不低于2.1.0.RELEASE,可在你的pom.xml中锁定版本:

<dependencyManagement>
    <dependencies>
          <dependency>
              <groupId>org.springframework.data</groupId>
              <artifactId>spring-data-redis</artifactId>
              <version>2.1.2.RELEASE</version>
          </dependency>
    </dependencies>
</dependencyManagement>

springboot3.0:

<dependency>
    <groupId>io.github.lengmianshi</groupId>
    <artifactId>redisMQ-spring-boot-starter</artifactId>
    <version>2.0.4</version>
</dependency>
        
<!-- 以下配置可以改为你自己的版本 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>5.1.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>

配置redis

一般引入redis的项目都会事先配置,如果你的项目没配置过,则可在application.yml中加上如下配置:

springboot3.0以下版本:

spring:
  redis:
    host: test.redis.com  #改成你的
    password: vC403V2KMc0Kghz #改成你的
    port: 6379 #改成你的
    jedis:
      pool:
        max-active: 100
        max-idle: 10
        min-idle: 10
    timeout: 2000

springboot3.0:

spring:
  data:
    redis:
      host: test.redis.com #改成你的
      password: vC403V2KMc0Kghz #改成你的
      port: 6379 #改成你的
      jedis:
        pool:
          max-active: 100
          max-idle: 10
          min-idle: 10
      timeout: 2000

消息队列

生产者发送消息

@Autowired
private RedisQueueTemplate redisQueueTemplate;

/**
 * 1次只发送一条消息
 */
@Test
public void test1() {
    JSONObject message = new JSONObject();
    message.put("bondId", "17f62f1dfb5afb12e8d67cd651c1df53");
    message.put("year", 2022);
    redisQueueTemplate.sendMessage("test_queue", message);
}

/**
 * 批量发送消息
 */
@Test
public void test2() {
    List messageList = new ArrayList<>();
    for (int i = 0; i < 5000; i++) {
        JSONObject mess = new JSONObject();
        mess.put("index", i);
        messageList.add(mess);
    }

    redisQueueTemplate.sendMessageAll("test_queue", messageList);
}

注:示例中每条MQ消息都用JSONObject包装,这只是我的个人习惯,你也可以使用实体类

消费者消费消息

消费方法的参数只能有1个,并且类型要与生产者发送消费的类型保存一致:

@Component
public class QueueConsumer {
    //使用默认参数
    @RedisQueueListener(queue = "test_queue")
    public void test(JSONObject message){
        System.out.println(message);
    }

    //指定单个实例下使用5个消费线程
    @RedisQueueListener(queue = "test_queue2", consumers = 5)
    public void test2(JSONObject message){
        System.out.println(message);
    }

    //单个实例5个线程,手动确认
    @RedisQueueListener(queue = "test_queue3", consumers = 5, autoAck = false)
    public void test3(JSONObject message){
        System.out.println(message);
    }

}

@RedisQueueListener注解支持的所有参数:

package com.leng.project.redisqueue.annotation;

import java.lang.annotation.*;

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisQueueListener {
    /**
     * 队列名
     *
     * @return
     */
    String queue() default "";

    /**
     * 消费者线程数
     *
     * @return
     */
    int consumers() default 1;

    /**
     * 是否自动确认
     *
     * @return
     */
    boolean autoAck() default true;

    /**
     * 一次从队列中取多少数据
     *
     * @return
     */
    int prefetch() default 50;

    /**
     * 获取消息的频率,单位秒
     * @return
     */
    long frequency() default 2;
}

其中:

  • consumers:单个实例下启动多少个消费线程,默认为1
  • autoAck:是否自动确认消息,默认为true。自动确认与手动确认的区别:
    • 自动确认:消费线程从队列中取出消息,如果消费失败,则该条消息丟失
    • 手动确认:消费线程从队列中取出消息,并将消息写入待确认队列中;如果消费失败,则一段时间后(15分钟)会重新入队,消费端要做幂等性处理
  • prefetch:一个消费线程一次性从队列中取出多少条消息,因为涉及锁的竞争,不宜过小,默认为50
  • frequency:单个消费线程每隔多少秒获取一次消息,默认为2,最小值为1。有人可能会奇怪,消息不是应该即时消费吗?不是越快越好吗?实际上,有些业务场景对消息的实时性要求很低,几天、几个月、甚至一年才执行一次,这时我们完全可以把frequency调大,以减轻redis的压力

延时队列

延时队列的常用场景如用户下单,xx分钟后没有支付则自动关闭订单;已支持的订单,xxx天后自动确认收货等。

生产者发送消息

@Autowired
private RedisQueueTemplate redisQueueTemplate;

/**
 * 1次只发送1条消息
 */
public void test1(){
    JSONObject message = new JSONObject();
    message.put("bondId", "17f62f1dfb5afb12e8d67cd651c1df53");
    message.put("year", 2022);
    //延时5秒
    redisQueueTemplate.sendDelayMessage("test_delay_queue", message, 5, TimeUnit.SECONDS);
}

/**
 * 批量发送,每条消息的延时时长一样
 */
public void test2(){
    List messageList = new ArrayList<>();
    for (int i = 0; i < 5000; i++) {
        JSONObject mess = new JSONObject();
        mess.put("index", i);
        messageList.add(mess);
    }
    //延时5秒
    redisQueueTemplate.sendDelayMessageAll(queue, messageList, 5, TimeUnit.SECONDS);
}

/**
 * 批量发送,每条消息的延时时长各不相同
 */
public void test3(){
    List messageList=new ArrayList<>();
    for(int i=0; i< 5000; i++){
        JSONObject mess=new JSONObject();
        mess.put("index",i);
        
        //每条消息可以使用不同的延时时长,这里为了简便,统一写成5了
        DelayMessageParam param=new DelayMessageParam(mess,5,TimeUnit.SECONDS);
        messageList.add(param);
    }

    redisQueueTemplate.sendDelayMessageAll(queue,messageList);
}

注:示例中每条MQ消息都用JSONObject包装,这只是我的个人习惯,你也可以使用实体类

消费者消费消息

@RedisDelayQueueListener注解的参数与@RedisQueueListener完全相同;消费方法的参数只能有1个,并且类型要与生产者发送消费的类型保存一致:

@Component
public class DelayQueueConsumer {
  /**
   * 使用默认参数
   * @param message
   */
  @RedisDelayQueueListener(queue = "test_delay_queue")
    public void test(JSONObject message){
        System.out.println(message);
    }

  /**
   * 单个实例5个消费线程
   * @param message
   */
  @RedisDelayQueueListener(queue = "test_delay_queue2", consumers = 5)
    public void test2(JSONObject message){
        System.out.println(message);
    }

  /**
   * 单个实例5个消费线程,手动确认
   * @param message
   */
  @RedisDelayQueueListener(queue = "test_delay_queue3", consumers = 5, autoAck = false)
    public void test3(JSONObject message){
        System.out.println(message);
    }

}

虚拟空间

参考了RabbitMQ的设计。虚拟空间很有必要,例如,开发环境和测试环境的数据如果没有隔离,在调试时被测试环境的消费端干扰。
配置虚拟空间:

queue:
  virtual-host: '/dev'  #默认为 /

Web管理平台

浏览器访问:http://ip:port/queue.html,默认的账号密码为admin/admin

配置账号:

queue:
  console:
    #是否启用web控制台
    enable: true
    username: admin #登录用户名
    password: 123456 #密码

登录成功后的界面,可查看所有虚拟空间的队列及消费情况:
image.png

注:如果你的系统使用了权限控制框架,如shiro、spring-security等,则需要对如下3个资源放行:

  • /queue.html
  • /queue/**
  • /static/**

ps:
项目地址:https://github.com/lengmianshi/redisMQ-spring-boot-starter,欢迎提bug

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/404342.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

非洲数字经济持续崛起 本地化策略让传音提前入局

非洲市场&#xff0c;被誉为全球最后的“边疆级”市场&#xff0c;吸引着全球目光。近日&#xff0c;非洲开发银行最新报告指出&#xff0c;未来两年非洲的经济增长将优于世界其他地区&#xff0c;2023 年和 2024 年实际国内生产总值 (GDP) 平均约为 4%。广阔的非洲大陆焕发着勃…

PLC设置网口通讯的原因

PLC设置网口通讯功能&#xff0c;是现场总线向工业以太网的迈进&#xff0c;更是为了在网口之上构建更为庞大、开放的大一统的生态系统。 有了以太网&#xff0c;特别是TCP/IP协议后&#xff0c;全员工控人的日常工作更为便利了。 主要体现在以下4点&#xff1a; 1.再也不需要…

TiDB 社区智慧合集丨TiDB 相关 SQL 脚本大全

非常感谢各位 TiDBer 在之前 【TiDBer 唠嗑茶话会 48】非正式 TiDB 相关 SQL 脚本征集大赛&#xff01;( https://asktug.com/t/topic/996635 )里提供的各种常用脚本。 在这篇文章中&#xff0c;我们整理了社区同学提供的一系列 TiDB 相关 SQL 脚本&#xff0c;希望能为大家在…

基于springboot+vue的车辆管理系统(前后端分离)

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战&#xff0c;欢迎高校老师\讲师\同行交流合作 ​主要内容&#xff1a;毕业设计(Javaweb项目|小程序|Pyt…

【云动世纪:Apache Doris 技术之光】

本文节选自《基础软件之路&#xff1a;企业级实践及开源之路》一书&#xff0c;该书集结了中国几乎所有主流基础软件企业的实践案例&#xff0c;由 28 位知名专家共同编写&#xff0c;系统剖析了基础软件发展趋势、四大基础软件&#xff08;数据库、操作系统、编程语言与中间件…

短视频新媒体的福音:视频抽插帧AI效率是人工的100倍以上

进入全民短视频时代&#xff0c;人像视频的拍摄也正在迈向专业化。随着固化审美的瓦解&#xff0c;十级磨皮的网红滤镜被打破&#xff0c;多元化的高级质感成为新的风向标&#xff0c;“美”到每一帧是人们对动态视频提出的更高要求。 目前&#xff0c;大部分手机均可记录主流的…

【Vuforia+Unity】AR05-实物3D模型识别功能实现(ModelTarget )

不管是什么类型的识别Vuforia的步骤基本都是&#xff1a; 把被识别的物体转成图、立体图、柱形图&#xff0c;3D模型、环境模型&#xff0c;然后模型生成Vuforia数据库-导入Unity-参考模型位置开始摆放数字内容&#xff0c;然后参考模型自动隐藏-发布APP-识别生活中实物-数字内…

体验LobeChat搭建私人聊天应用

LobeChat是什么 LobeChat 是开源的高性能聊天机器人框架&#xff0c;支持语音合成、多模态、可扩展的&#xff08;Function Call&#xff09;插件系统。支持一键免费部署私人 ChatGPT/LLM 网页应用程序。 地址&#xff1a;https://github.com/lobehub/lobe-chat 为什么要用Lobe…

【Python笔记-设计模式】工厂模式

一、说明 (一) 解决问题 提供了一种方式&#xff0c;在不指定具体类将要创建的情况下&#xff0c;将类的实例化操作延迟到子类中完成。可以实现客户端代码与具体类实现之间的解耦&#xff0c;使得系统更加灵活、可扩展和可维护。 (二) 使用场景 希望复用现有对象来节省系统…

C# cass10 宗地初始化-根据 “预编号” “权利人”图层对应信息 批量添加到宗地图层

运行环境Visual Studio 2022 c# cad2016 cass10 根据 “预编号” “权利人”图层对应信息 批量添加到宗地图层 一、主要步骤 zdimport 方法&#xff1a;这个方法用于导入宗地信息。首先通过调用 AutoCAD API 获取当前活动文档、数据库和编辑器对象。然后根据 CreatePalette.Se…

Web3 基金会推出去中心化之声计划:投入高额 DOT 和 KSM ,助力去中心化治理

作者&#xff1a;Web3 Foundation Team 编译&#xff1a;OneBlock 原文&#xff1a;https://medium.com/web3foundation/decentralized-voices-program-93623c27ae43 Web3 基金会为 Polkadot 和 Kusama 创建了去中心化之声计划&#xff08;Decentralized Voices Program&…

【深度学习笔记】3_1 线性回归

注&#xff1a;本文为《动手学深度学习》开源内容&#xff0c;仅为个人学习记录&#xff0c;无抄袭搬运意图 3.1 线性回归 线性回归输出是一个连续值&#xff0c;因此适用于回归问题。回归问题在实际中很常见&#xff0c;如预测房屋价格、气温、销售额等连续值的问题。与回归问…

3.测试教程 - 基础篇

文章目录 软件测试的生命周期软件测试&软件开发生命周期如何描述一个bug如何定义bug的级别bug的生命周期如何开始第一次测试测试的执行和BUG管理产生争执怎么办&#xff08;处理人际关系&#xff09; 大家好&#xff0c;我是晓星航。今天为大家带来的是 测试基础 相关的讲解…

C++最佳实践之编译篇

C最佳实践之工程编译 在大型c/c工程开发中&#xff0c;往往会涉及多级CMakeLists.txt的调用&#xff0c;并且调用方式错综复杂&#xff0c;主要有以下两种方式&#xff1a; 1. 子目录中的CMakeList.txt独立生成目标&#xff0c;不作为主目标生成过程的依赖关系&#xff08;比…

架构师技能9-深入mybatis:Creating a new SqlSession到查询语句耗时特别长

开篇语录&#xff1a;以架构师的能力标准去分析每个问题&#xff0c;过后由表及里分析问题的本质&#xff0c;复盘总结经验&#xff0c;并把总结内容记录下来。当你解决各种各样的问题&#xff0c;也就积累了丰富的解决问题的经验&#xff0c;解决问题的能力也将自然得到极大的…

【生活】浅浅记录

各位小伙伴们好鸭&#xff0c;今天不是技术文章&#xff0c;浅浅记录一下最近几个月的收获&#x1f60a; 新的一年&#xff0c;一起努力&#xff0c;加油加油&#xff01;

2024年【安全员-A证】免费试题及安全员-A证作业模拟考试

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 安全员-A证免费试题参考答案及安全员-A证考试试题解析是安全生产模拟考试一点通题库老师及安全员-A证操作证已考过的学员汇总&#xff0c;相对有效帮助安全员-A证作业模拟考试学员顺利通过考试。 1、【多选题】 《陕西…

黑色金属冶炼5G智能工厂数字孪生可视化管控系统,推进金属冶炼行业数字化转型

黑色金属冶炼5G智能工厂数字孪生可视化管控系统&#xff0c;推进金属冶炼行业数字化转型。随着科技的不断发展&#xff0c;数字化转型已经成为各行各业发展的必然趋势。金属冶炼行业作为传统工业的重要组成部分&#xff0c;也面临着数字化转型的挑战和机遇。为了推进金属冶炼行…

Vue知识学习

Vue 是什么&#xff1f; 概念&#xff1a;Vue 是一个用于构建用户界面的渐进式框架 Vue 的两种使用方式: ① Vue 核心包开发 场景:局部 模块改造 ② Vue 核心包& Vue插件工程化开发 场景:整站开发 创建Vue 实例&#xff0c;初始化渲染的核心步骤: 1.准备容器 2.引包(官…

这两招,让你轻松俘获客户心

面向政府的数字化解决方案作为睿鸿数字应用的一个分支&#xff0c;在充分借鉴政府项目中积累的丰富经验的基础上&#xff0c;积极开发更多领域通用的标准化产品。 2023年&#xff0c;睿鸿推出了一系列创新的数字应用产品&#xff0c;包括动态表单系统、统一集成门户、统一通信中…