Spring Boot 集成 Redisson 实现消息队列

包含组件内容

  • RedisQueue:消息队列监听标识
  • RedisQueueInit:Redis队列监听器
  • RedisQueueListener:Redis消息队列监听实现
  • RedisQueueService:Redis消息队列服务工具

代码实现

RedisQueue

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Redis消息队列注解
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RedisQueue {
    /**
     * 队列名
     */
    String value();
}

RedisQueueInit

import jakarta.annotation.Resource;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.redisson.RedissonShutdownException;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
 * 初始化Redis队列监听器
 *
 * @author 十八
 * @createTime 2024-09-09 22:49
 */
@Slf4j
@Component
public class RedisQueueInit implements ApplicationContextAware {

    public static final String REDIS_QUEUE_PREFIX = "redis-queue";
    final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
    @Resource
    private RedissonClient redissonClient;
    private ExecutorService executorService;

    public static String buildQueueName(String queueName) {
        return REDIS_QUEUE_PREFIX + ":" + queueName;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map<String, RedisQueueListener> queueListeners = applicationContext.getBeansOfType(RedisQueueListener.class);
        if (!queueListeners.isEmpty()) {
            executorService = createThreadPool();
            for (Map.Entry<String, RedisQueueListener> entry : queueListeners.entrySet()) {
                RedisQueue redisQueue = entry.getValue().getClass().getAnnotation(RedisQueue.class);
                if (redisQueue != null) {
                    String queueName = redisQueue.value();
                    executorService.submit(() -> listenQueue(queueName, entry.getValue()));
                }
            }
        }
    }

    private ExecutorService createThreadPool() {
        return new ThreadPoolExecutor(
                Runtime.getRuntime().availableProcessors() * 2,
                Runtime.getRuntime().availableProcessors() * 4,
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(100),
                new NamedThreadFactory(REDIS_QUEUE_PREFIX),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }

    private void listenQueue(String queueName, RedisQueueListener redisQueueListener) {
        queueName = buildQueueName(queueName);
        RBlockingQueue<?> blockingQueue = redissonClient.getBlockingQueue(queueName);
        log.info("Redis队列监听开启: {}", queueName);
        while (!shutdownRequested.get() && !redissonClient.isShutdown()) {
            try {
                Object message = blockingQueue.take();
                executorService.submit(() -> redisQueueListener.consume(message));
            } catch (RedissonShutdownException e) {
                log.info("Redis连接关闭,停止监听队列: {}", queueName);
                break;
            } catch (Exception e) {
                log.error("监听队列异常: {}", queueName, e);
            }
        }
    }

    public void shutdown() {
        if (executorService != null) {
            executorService.shutdown();
            try {
                if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                    executorService.shutdownNow();
                }
            } catch (InterruptedException ex) {
                executorService.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
        shutdownRequested.set(true);
        if (redissonClient != null && !redissonClient.isShuttingDown()) {
            redissonClient.shutdown();
        }
    }

    private static class NamedThreadFactory implements ThreadFactory {
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        public NamedThreadFactory(String prefix) {
            this.namePrefix = prefix;
        }

        @Override
        public Thread newThread(@NotNull Runnable r) {
            return new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement());
        }
    }

}

RedisQueueListener

/**
 * Redis消息队列监听实现
 *
 * @author 十八
 * @createTime 2024-09-09 22:51
 */
public interface RedisQueueListener<T> {

    /**
     * 队列消费方法
     *
     * @param content 消息内容
     */
    void consume(T content);
}

RedisQueueService

import jakarta.annotation.Resource;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;

/**
 * Redis 消息队列服务
 *
 * @author 十八
 * @createTime 2024-09-09 22:52
 */
@Component
public class RedisQueueService {

    @Resource
    private RedissonClient redissonClient;

    /**
     * 添加队列
     *
     * @param queueName 队列名称
     * @param content   消息
     * @param <T>       泛型
     */
    public <T> void send(String queueName, T content) {
        RBlockingQueue<T> blockingQueue = redissonClient.getBlockingQueue(RedisQueueInit.buildQueueName(queueName));
        blockingQueue.add(content);
    }

    /**
     * 添加延迟队列
     *
     * @param queueName 队列名称
     * @param content   消息类型
     * @param delay     延迟时间
     * @param timeUnit  单位
     * @param <T>       泛型
     */
    public <T> void sendDelay(String queueName, T content, long delay, TimeUnit timeUnit) {
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(RedisQueueInit.buildQueueName(queueName));
        RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
        delayedQueue.offer(content, delay, timeUnit);
    }

    /**
     * 发送延迟队列消息(单位毫秒)
     *
     * @param queueName 队列名称
     * @param content   消息类型
     * @param delay     延迟时间
     * @param <T>       泛型
     */
    public <T> void sendDelay(String queueName, T content, long delay) {
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(RedisQueueInit.buildQueueName(queueName));
        RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
        delayedQueue.offer(content, delay, TimeUnit.MILLISECONDS);
    }
}

测试

创建监听对象

import cn.yiyanc.infrastructure.redis.annotation.RedisQueue;
import cn.yiyanc.infrastructure.redis.queue.RedisQueueListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * @author 十八
 * @createTime 2024-09-10 00:09
 */
@Slf4j
@Component
@RedisQueue("test")
public class TestListener implements RedisQueueListener<String> {
    @Override
    public void invoke(String content) {
        log.info("队列消息接收 >>> {}", content);
    }
}

测试用例

import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author 十八
 * @createTime 2024-09-10 00:11
 */
@RestController
@RequestMapping("queue")
public class QueueController {

    @Resource
    private RedisQueueService redisQueueService;

    @PostMapping("send")
    public void send(String message) {
        redisQueueService.send("test", message);
        redisQueueService.sendDelay("test", "delay messaege -> " + message, 1000);
    }

}

测试结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/875928.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

清理C盘缓存的垃圾,专业清理C盘缓存垃圾的步骤与策略

在维护计算机系统的过程中&#xff0c;定期清理C盘&#xff08;通常是系统盘&#xff09;中的缓存和垃圾文件是一项至关重要的任务。这不仅能有效释放磁盘空间&#xff0c;提升系统性能&#xff0c;还能减少因磁盘空间不足导致的程序运行缓慢或错误。以下是一系列专业且安全的步…

请求响应-02.请求-postman工具

一.前后端分离开发 当前主流的开发模式是前后端分离开发&#xff0c;每开发一个功能&#xff0c;就需要对该功能接口进行测试&#xff0c;当前我们的测试方法是直接将url地址输入到浏览器中&#xff0c;查看web页面是否满足我们的要求。但是浏览器发起的请求全部都是GET请求&am…

架构设计 - 常用日志收集方案选型对比与推荐

目录 1. 常用组合1.1 ELK Stack -> Elastic Stack1.2 EFK Stack1.3 Graylog1.4 PLG 日志系统1.5 Splunk1.6 Filebeat ELK1.7 AWS CloudWatch Logs1.8 阿里云日志服务1.9 腾讯云 CLS&#xff08;日志服务&#xff09; 2. 推荐 日志收集是系统监控和调试中的关键环节。常见的…

[ RK3566-Android11 ] 关于 RK628F 驱动移植以及调试说明

问题描述 我这个项目的SDK比较老&#xff0c;移植RK628F最新驱动的调试过程&#xff0c;踩了很多坑&#xff0c;希望大家别踩坑。 解决方案&#xff1a; 首先在FTP上下载最新的RK628的驱动 rk628-for-all-v27-240730 版本。 下载完后 不要直接替换&#xff0c;不要直接替换&a…

小琳AI课堂:o1系列模型

大家好&#xff0c;这里是小琳AI课堂&#xff01;今天我们一起来探索OpenAI最新发布的o1系列模型&#xff0c;这可是AI领域的一大突破哦&#xff01; OpenAI o1系列模型技术大揭秘 o1系列模型是基于强化学习&#xff08;RL&#xff09;训练的&#xff0c;包括o1-preview和o1-…

【CSS】选择器(基本选择器、复合选择器、属性匹配选择器、结构伪类选择器、伪元素选择器)

选择器 引入方式基础选择器复合选择器属性匹配选择器结构伪类选择器伪元素选择器 引入方式 1&#xff1a;外联 <!-- css引入方式1&#xff1a;外联 外联与内嵌优先级相同&#xff0c;取决于加载顺序 --><link rel"stylesheet" href"./样式.css"…

起重机检测系统源码分享

起重机检测检测系统源码分享 [一条龙教学YOLOV8标注好的数据集一键训练_70全套改进创新点发刊_Web前端展示] 1.研究背景与意义 项目参考AAAI Association for the Advancement of Artificial Intelligence 项目来源AACV Association for the Advancement of Computer Visio…

直播相关02-录制麦克风声音,QT 信号与槽,自定义信号和槽

一 信号与槽函数 #include "mainwindow.h" #include <QPushButton> #include <iostream> using namespace std;//我们的目的是在 window中加入一个button&#xff0c;当点击这个button后&#xff0c;关闭 MainWindow 。 MainWindow::MainWindow(QWidget …

速通GPT-3:Language Models are Few-Shot Learners全文解读

文章目录 GPT系列论文速通论文实验总览1. 任务设置与测试策略2. 任务类别3. 关键实验结果4. 数据污染与实验局限性5. 总结与贡献 Abstract1. 概括2. 具体分析3. 摘要全文翻译4. 为什么不需要梯度更新或微调⭐ Introduction1. 概括2. 具体分析3. 进一步分析 Approach1. 概括2. 具…

【Unity学习心得】如何制作俯视角射击游戏

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、导入素材二、制作流程 1.制作地图2.实现人物动画和移动脚本3.制作单例模式和对象池4.制作手枪pistol和子弹bullet和子弹壳bulletShell5.制作散弹枪shotgun总…

MMLU-Pro 基准测试数据集上线,含 12k 个跨学科复杂问题,难度提升,更具挑战性!DeepSeek 数学模型一键部署

在大语言模型 (LLM) 蓬勃发展的时代&#xff0c;诸如大规模多任务语言理解 (MMLU) 之类的基准测试&#xff0c;在推动 AI 于不同领域的语言理解与推理能力迈向极限方面&#xff0c;发挥着至关重要的关键作用。 然而&#xff0c;伴随模型的持续改进与优化&#xff0c;LLM 在这些…

Vue路由:Vue router

目录 路由的基本概念 1. 路由 2. 单页应用SPA 3.前端路由的实现方式 3.1Hash模式 3.2History模式 Vue router 4 1.概述 2.安装使用 3.基础用法 3.1路由匹配规则声明 3.2动态路由匹配 3.3路由命名 3.4路由重定向 3.5路由嵌套 3.6命名视图 3.6声明式导航&编程…

el-input设置type=‘number‘和v-model.number的区别

el-input设置typenumber’与设置.number修饰符的区别 1. 设置type‘number’ 使用el-input时想收集数字类型的数据&#xff0c;我们首先会想到typenumber&#xff0c;设置完type为number时会限制我们输入的内容只能为数字&#xff0c;不能为字符/汉字等非数字类型的数值&…

【网络安全】-文件下载漏洞-pikachu

文件操作漏洞包括文件上传漏洞&#xff0c;文件包含漏洞&#xff0c;文件下载漏洞。 文章目录  前言 什么是文件下载漏洞&#xff1f; 1.常见形式&#xff1a; 常见链接形式&#xff1a; 常见参数&#xff1a; 2.利用方式&#xff1a; 3.举例&#xff1a;pikachu不安全的文件…

智能语音技术在人机交互中的应用与发展

摘要&#xff1a;本文主要探讨智能自动语音识别技术与语音合成技术在构建智能口语系统方面的作用。这两项技术实现了人机语音通信&#xff0c;建立起能听能说的智能口语系统。同时&#xff0c;引入开源 AI 智能名片小程序&#xff0c;分析其在智能语音技术应用场景下的意义与发…

使用ESP8266和OLED屏幕实现一个小型电脑性能监控

前言 最近大扫除&#xff0c;发现自己还有几个ESP8266MCU和一个0.96寸的oled小屏幕。又想起最近一直想要买一个屏幕作为性能监控&#xff0c;随机开始自己diy。 硬件&#xff1a; ESP8266 MUColed小屏幕杜邦线可以传输数据的数据线 环境 Windows系统Qt6Arduino Arduino 库…

计算架构模式之负载均衡技巧

通用负载均衡算法 负载均衡算法 -轮询 & 随机 如果服务器挂掉了&#xff0c;那么负载均衡器还是可以感知到的&#xff0c;因为连接已经断掉了。 负载均衡算法-加权轮询 假设你有4核的和8核的&#xff0c;由于你的程序没有办法跑完CPU&#xff0c;那么有可能出现4核的和8核…

Coggle数据科学 | 科大讯飞AI大赛:人岗匹配挑战赛 赛季3

本文来源公众号“Coggle数据科学”&#xff0c;仅用于学术分享&#xff0c;侵权删&#xff0c;干货满满。 原文链接&#xff1a;科大讯飞AI大赛&#xff1a;人岗匹配挑战赛 赛季3 赛题名称&#xff1a;人岗匹配挑战赛 赛季3 赛题类型&#xff1a;自然语言处理、文本匹配 赛题…

Pikachu靶场之csrf

CSRF 跨站请求伪造 CSRF入门及靶场实战 - FreeBuf网络安全行业门户 攻击者伪造恶意链接&#xff0c;诱使用户点击&#xff0c;这个链接附带了用户的认证凭据Cookie、Session等&#xff0c;执行操作如转账。 因为带了cookie、session&#xff0c;服务器认为是用户的行为。借用…

【诉讼流程-健身房-违约认定-私教课-诉讼书前提材料整理-民事诉讼-自我学习-铺平通往法律的阶梯-讲解(2)】

【诉讼流程-健身房-违约-私教课-前期法律流程-民事诉讼-自我学习-铺平通往法律的阶梯-讲解&#xff08;2&#xff09;】 &#xff08;1&#xff09;前言说明1、目的2、一个小测试1、更换原教练2、频繁更换教练3、上课估计拖课&#xff0c;占用上课时间&#xff0c;抽烟等。4、以…