redis stream restTemplate消息监听队列框架搭建

整体思路

        1. pom增加redis依赖;

        2. 消息监听器,实现StreamListener接口,处理消息到达逻辑;

        3. 将消息订阅bean及监听器注册到配置中;

1. pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.6</version>
    </parent>




<dependencies>

    <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

2. 消息监听器实现代码

package cn.thuniwhir.fileserver.redis;

import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @Description: TODO
 **/
@Component
public class RedisMQListener implements StreamListener<String, MapRecord<String, String, Object>> {

    private static final Logger log = LoggerFactory.getLogger(RedisMQListener.class);

    // 创建一个线程池
    private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            10, 20, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    @Override
    public void onMessage(MapRecord message) {
        // 异步处理消息
        threadPoolExecutor.execute(()->{
            System.out.println(Thread.currentThread().getName() + ":接收到的消息:" + message.getId() + ";" + JSON.toJSONString(message.getValue()));
        });

    }
}

3. redis订阅bean及监听器注册

package cn.thuniwhir.fileserver.redis;

import cn.thuniwhir.fileserver.context.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;

import java.time.Duration;
import java.util.stream.Collectors;

/**
 * @Description: TODO
 **/
@Configuration
public class RedisMQConfig {

    @Autowired
    private RedisMQListener redisMQListener;

    @Autowired
    private RedisUtils redisUtils;

    private static RedisTemplate<Object, Object> redisTemplate;
    private static final Logger log = LoggerFactory.getLogger(RedisMQConfig.class);

    public RedisMQConfig(RedisTemplate<Object, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @Bean
    public Subscription subscription(RedisConnectionFactory redisConnectionFactory) {
        if (redisUtils.hasKey(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME)) {
            StreamInfo.XInfoGroups xInfoGroups = redisTemplate.opsForStream().groups(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME);
            if (xInfoGroups.isEmpty()) {
                redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);
            } else {
                if (xInfoGroups.stream().filter(xInfoGroups1 -> xInfoGroups1.groupName().equals(Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME)).collect(Collectors.toList()).isEmpty()) {
                    redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);
                }
            }
        } else {
            redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);
        }
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                .pollTimeout(Duration.ofSeconds(1)).build();
        StreamMessageListenerContainer streamMessageListenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);
        Subscription subscription = streamMessageListenerContainer.receiveAutoAck(Consumer.from(Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME, Constants.FILE_MQ_DISK_THRESHOLD_CONSUMER), StreamOffset.create(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, ReadOffset.lastConsumed()), redisMQListener);
        streamMessageListenerContainer.start();
        return subscription;
    }

}

4. 测试生产消息 消息监听成功

4.1 生产消息

@RequestMapping("/produceMessage")
    public JSONObject produceMessage(@RequestBody JSONObject jsonObject) {
        String key = jsonObject.getString("key");
        String value = jsonObject.getString("value");
        MapRecord<Object, String, Object> mapRecord = MapRecord.create(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Collections.singletonMap(key, value));
        redisTemplate.opsForStream().add(mapRecord);
        System.out.println("produceMessage Thread Name:" + Thread.currentThread().getName());
        return formatResult(null);
    }

4.2 消息监听器监听消息到达 代码见第二节

4.3 测试结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/312805.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

vue的mvvm模式

1.mvvm优点&#xff1a; 低耦合&#xff1a;视图&#xff08;View&#xff09;可以独立于Model变化和修改&#xff0c;一个ViewModel可以绑定到不同的View上&#xff0c;当View变化的时候Model可以不变&#xff0c;当Model变化的时候&#xff0c;View也可以不变。 可复用&…

PostgreSQL 配置文件、数据储存目录

文章目录 查询配置文件所在位置查询数据储存目录PostgreSQL的数据目录 查询配置文件所在位置 show config_file; -- 查询配置文件所在位置查询数据储存目录 show data_directory; -- 查询数据储存目录PostgreSQL的数据目录 在PostgreSQL的数据目录&#xff08;C:\Program…

android studio使用总结

gradle是项目构建的工具&#xff0c;在gradle-wrapper.properties这个文件中设置&#xff0c; 然后就会下载相应版本的安装包到这个路径C:\Users\ly.gradle\wrapper\dists&#xff0c;例如这里是7.0.2&#xff0c; gradle和studio中的jdk版本需要对应&#xff0c;否则无法构建项…

使用numpy处理图片——图片拼接

大纲 左右拼接上下拼接 在《使用numpy处理图片——图片切割》一文中&#xff0c;我们介绍了如何使用numpy将一张图片切割成4部分。本文我们将反其道而行之&#xff0c;将4张图片拼接成1张图片。 基本的思路就是先用两张图以左右结构拼接成上部&#xff0c;另外两张图也以左右拼…

实现用户注册功能

实现用户注册功能 注&#xff1a;打赏即可获得一对一线下辅导&#xff0c;机不可失&#xff0c;时不再来

软件系统培训方案(Word)

1. 培训概述 2. 培训目的 3. 培训对象及要求 3.1. 培训对象 3.2. 培训人员基本要求 4. 培训方式 5. 培训内容 6. 培训讲师 7. 培训教材 8. 培训质量保证 8.1. 用户培训确认报告 8.2. 培训疑问解答 软件开发全文档下载&#xff1a;软件项目开发全套文档下载_软件项目文档-CSDN博…

MySQL——SQL语句进阶

select * from 表 where 条件 group by 条件 order by 排序 limit 分组 Group by select * from 表 group by 条件 结果为每个分组的第一条记录&#xff0c;该条记录作为该组的标志 select * from subject GROUP BY gradeidselect count(1),gradeid from subject GROUP B…

vue3+ts+vite项目从0 搭建,配置安装router/pinia/element-plus/scss等

一、安装vite环境 官网&#xff1a;https://cn.vitejs.dev/guide/why.html npm init vite1.选择vue 2.选择typescipt 3.创建成功 默认项目结构如下 4.安装项目依赖 npm install 5.启动项目 npm run dev二。安装配置scss 1.运行安装scss npm install -D sass sass-loa…

[UI5] ODATA V4中的CRUD

文章目录 前言一、Read二、Create三、Update四、Delete 前言 ODATA V4在CRUD方面与V2截然不同。 这篇文章简单介绍V4中是如何进行CRUD操作 一、Read Model不再有read方法&#xff0c; 一般是把Path绑定到View中进行读取&#xff0c; 如果需要额外的读取数据&#xff0c;可使用…

Vant-ui图片懒加载

核心代码 在你的全局顶部引入和初始化 Vue.use(vant.Lazyload, {loading: /StaticFile/img/jiazai.jpg,error: /StaticFile/img/jiazai.jpg,lazyComponent: false, });//图片懒加载 <img v-lazy"https://img-blog.csdnimg.cn/direct/3d2c8a7e2c0040488a8128c3e381d58…

ubuntu20.04 deepstream 6.3安装

1.基础环境gstreamer sudo apt install \ libssl-dev \ libgstreamer1.0-0 \ gstreamer1.0-tools \ gstreamer1.0-plugins-good \ gstreamer1.0-plugins-bad \ gstreamer1.0-plugins-ugly \ gstreamer1.0-libav \ libgstreamer-plugins-base1.0-dev \ libgstrtspserver-1.0-0 …

基于JavaWeb+BS架构+SpringBoot+Vue+Hadoop短视频流量数据分析与可视化系统的设计和实现

基于JavaWebBS架构SpringBootVueHadoop短视频流量数据分析与可视化系统的设计和实现 文末获取源码Lun文目录前言主要技术系统设计功能截图订阅经典源码专栏Java项目精品实战案例《500套》 源码获取 文末获取源码 Lun文目录 目  录 目  录 I 1绪 论 1 1.1开发背景 1 1.2开…

IDEA创建springboot+mybatis项目(java8 和java21可行)

IDEA创建springbootmybatis项目&#xff08;java8 和java21可行&#xff09; 今天博主讲一下&#xff0c;IDEA创建springbootmybatis项目的文章。 步骤分别是如下几步&#xff1a; 1. 创建maven项目 2. 配置pom.xml文件 3. 创建目录结构 4. 创建配置项目文件 5. 生成创建…

【pytorch】使用pytorch构建线性回归模型-了解计算图和自动梯度

使用pytorch构建线性回归模型 线性方程的一般形式 衡量线性损失的一般形式-均方误差 pytorch中计算图的作用和优势 在 PyTorch 中&#xff0c;计算图&#xff08;Computational Graph&#xff09;是一种用于表示神经网络运算的数据结构。每个节点代表一个操作&#xff0c;例如…

报错:NVIDIA-SMI has failed because it couldn‘t communicate with the NVIDIA driver

记一次关于驱动报错的问题 背景 原始驱动版本515&#xff0c;cuda 11.5.。 要将cuda 版本升级到11.7 内容 我去nvidia官网下载了 11.7.1的cuda tools nvidia CUDA 下载。 按照步骤安装后&#xff0c;执行nvcc -V ,可以看到已经正常更新 但是执行 nvidia-smi 时报错 NVIDIA…

67.网游逆向分析与插件开发-角色数据的获取-分析角色数据基址

内容参考于&#xff1a;易道云信息技术研究院VIP课 上一个内容&#xff1a;角色类的数据分析与C还原-CSDN博客 基址这个东西说好找也好找&#xff0c;说不好找是真找不着&#xff0c;但就根据一个原则&#xff0c;就是确认这个东西有基址还是没基址&#xff0c;为什么会有没基…

专搞大厂?免费开源?这个小工具我相信很多人需要!

软件简介&#xff1a; 软件【下载地址】获取方式见文末。注&#xff1a;推荐使用&#xff0c;更贴合此安装方法&#xff01; XHS-Downloader v1.6是一款功能齐全的免费开源工具&#xff0c;它使用Python Requests库开发而成&#xff0c;用于采集和下载X红S作品。该工具具备多…

thinkphp美容SPA管理系统源码带文字安装教程

thinkphp美容SPA管理系统源码带文字安装教程 运行环境 服务器宝塔面板 PHP 7.0 Mysql 5.5及以上版本 Linux Centos7以上 基于thinkphp3.23B-JUI1.2开发&#xff0c;权限运用了Auth类认证&#xff0c;权限可以细分到每个功能&#xff0c; 增删改查功能一应俱全&#xff0c;整合了…

数据结构与算法教程,数据结构C语言版教程!(第三部分、栈(Stack)和队列(Queue)详解)四

第三部分、栈(Stack)和队列(Queue)详解 栈和队列&#xff0c;严格意义上来说&#xff0c;也属于线性表&#xff0c;因为它们也都用于存储逻辑关系为 "一对一" 的数据&#xff0c;但由于它们比较特殊&#xff0c;因此将其单独作为一章&#xff0c;做重点讲解。 使用栈…

突然又对 Go 感兴趣,GOPATH entry cannot start with shell metacharacter 错误

打发无聊时间&#xff0c;水文一篇&#xff5e; 事情是这样的&#xff0c;因为我们上架的渠道包基本是定制化混淆出包&#xff0c; 混淆出包有一个关键点就是指定映射文件&#xff0c;映射文件的内容有一部分是使用外部工具在打包前按照一定规律随机生成包名、类名&#xff0c…