【Java】SpringBoot中实现Redis Stream队列

SpringBoot实现Redis Stream队列

前言

简单实现一下在SpringBoot中操作Redis Stream队列的方式,监听队列中的消息进行消费。

jdk:1.8

springboot-version:2.6.3

redis:5.0.1(5版本以上才有Stream队列)

准备工作

1pom

redis 依赖包(version 2.6.3)

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

2 yml

spring: 
  redis:
    database: 0
    host: 127.0.0.1

3 RedisStreamUtil工具类

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

@Component
public class RedisStreamUtil {

	@Autowired
	private RedisTemplate<String, Object> redisTemplate;

	/**
	 * 创建消费组
	 *
	 * @param key   键名称
	 * @param group 组名称
	 * @return {@link String}
	 */
	public String oup(String key, String group) {
		return redisTemplate.opsForStream().createGroup(key, group);
	}

	/**
	 * 获取消费者信息
	 *
	 * @param key   键名称
	 * @param group 组名称
	 * @return {@link StreamInfo.XInfoConsumers}
	 */
	public StreamInfo.XInfoConsumers queryConsumers(String key, String group) {
		return redisTemplate.opsForStream().consumers(key, group);
	}

	/**
	 * 查询组信息
	 *
	 * @param key 键名称
	 * @return
	 */
	public StreamInfo.XInfoGroups queryGroups(String key) {
		return redisTemplate.opsForStream().groups(key);
	}

	// 添加Map消息
	public String addMap(String key, Map<String, Object> value) {
		return redisTemplate.opsForStream().add(key, value).getValue();
	}

	// 读取消息
	public List<MapRecord<String, Object, Object>> read(String key) {
		return redisTemplate.opsForStream().read(StreamOffset.fromStart(key));
	}

	// 确认消费
	public Long ack(String key, String group, String... recordIds) {
		return redisTemplate.opsForStream().acknowledge(key, group, recordIds);
	}

	// 删除消息。当一个节点的所有消息都被删除,那么该节点会自动销毁
	public Long del(String key, String... recordIds) {
		return redisTemplate.opsForStream().delete(key, recordIds);
	}

	// 判断是否存在key
	public boolean hasKey(String key) {
		Boolean aBoolean = redisTemplate.hasKey(key);
		return aBoolean != null && aBoolean;
	}
}


代码实现

生产者发送消息

生产者发送消息,在Service层创建addMessage方法,往队列中发送消息。

代码中addMap()方法第一个参数为key,第二个参数为value,该key要和后续配置的保持一致,暂时先记住这个key。

@Service
@Slf4j
@RequiredArgsConstructor
public class RedisStreamMqServiceImpl implements RedisStreamMqService {

    private final RedisStreamUtil redisStreamUtil;

    /**
     * 发送一个消息
     *
     * @return {@code Object}
     */
    @Override
    public Object addMessage() {
        RedisUser redisUser = new RedisUser();
        redisUser.setAge(18);
        redisUser.setName("hcr");
        redisUser.setEmail("156ef561@gmail.com");

        Map<String, Object> message = new HashMap<>();
        message.put("user", redisUser);

        String recordId = redisStreamUtil.addMap("mystream", message);
        return recordId;
    }
}

controller接口方法

@RestController
@RequestMapping("/redis")
@Slf4j
@RequiredArgsConstructor
public class RedisController {

    private final RedisStreamMqService redisStreamMqService;

    @GetMapping("/addMessage")
    public Object addMessage() {
        return redisStreamMqService.addMessage();
    }
}

调用测试,查看redis中是否正常添加数据。

接口返回数据

1702622585248-0

查看redis中的数据
在这里插入图片描述

消费者监听消息进行消费

创建RedisConsumersListener监听器

import cn.hcr.utils.RedisStreamUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@Slf4j
@RequiredArgsConstructor
public class RedisConsumersListener implements StreamListener<String, MapRecord<String, String, String>> {

    public final RedisStreamUtil redisStreamUtil;

    /**
     * 监听器
     *
     * @param message
     */
    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        // stream的key值
        String streamKey = message.getStream();
        //消息ID
        RecordId recordId = message.getId();
        //消息内容
        Map<String, String> msg = message.getValue();
        log.info("【streamKey】= " + streamKey + ",【recordId】= " + recordId + ",【msg】=" + msg);

        //处理逻辑

        //逻辑处理完成后,ack消息,删除消息,group为消费组名称
        StreamInfo.XInfoGroups xInfoGroups = redisStreamUtil.queryGroups(streamKey);
        xInfoGroups.forEach(xInfoGroup -> redisStreamUtil.ack(streamKey, xInfoGroup.groupName(), recordId.getValue()));
        redisStreamUtil.del(streamKey, recordId.getValue());
    }
}

创建RedisConfig配置类,配置监听

package cn.hcr.config;

import cn.hcr.listener.RedisConsumersListener;
import cn.hcr.utils.RedisStreamUtil;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import lombok.var;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;

import javax.annotation.Resource;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@Configuration
@Slf4j
public class RedisConfig {

    @Resource
    private RedisStreamUtil redisStreamUtil;

    /**
     * redis序列化
     *
     * @param redisConnectionFactory
     * @return {@code RedisTemplate<String, Object>}
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        template.setKeySerializer(stringRedisSerializer);
        template.setHashKeySerializer(stringRedisSerializer);
        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }

    @Bean
    public Subscription subscription(RedisConnectionFactory factory) {
        AtomicInteger index = new AtomicInteger(1);
        int processors = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), r -> {
            Thread thread = new Thread(r);
            thread.setName("async-stream-consumer-" + index.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        });
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainer
                        .StreamMessageListenerContainerOptions
                        .builder()
                        // 一次最多获取多少条消息
                        .batchSize(5)
                        .executor(executor)
                        .pollTimeout(Duration.ofSeconds(1))
                        .errorHandler(throwable -> {
                            log.error("[MQ handler exception]", throwable);
                            throwable.printStackTrace();
                        })
                        .build();
        
        //该key和group可根据需求自定义配置
        String streamName = "mystream";
        String groupname = "mygroup";

        initStream(streamName, groupname);
        var listenerContainer = StreamMessageListenerContainer.create(factory, options);
        // 手动ask消息
        Subscription subscription = listenerContainer.receive(Consumer.from(groupname, "zhuyazhou"),
                StreamOffset.create(streamName, ReadOffset.lastConsumed()), new RedisConsumersListener(redisStreamUtil));
        // 自动ask消息
           /* Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(redisMqGroup.getName(), redisMqGroup.getConsumers()[0]),
                    StreamOffset.create(streamName, ReadOffset.lastConsumed()), new ReportReadMqListener());*/
        listenerContainer.start();
        return subscription;
    }

    private void initStream(String key, String group) {
        boolean hasKey = redisStreamUtil.hasKey(key);
        if (!hasKey) {
            Map<String, Object> map = new HashMap<>(1);
            map.put("field", "value");
            //创建主题
            String result = redisStreamUtil.addMap(key, map);
            //创建消费组
            redisStreamUtil.oup(key, group);
            //将初始化的值删除掉
            redisStreamUtil.del(key, result);
            log.info("stream:{}-group:{} initialize success", key, group);
        }
    }
}


redisTemplate:该bean用于配置redis序列化

subscription:配置监听

initStream:初始化消费组

监听测试

使用addMessage()方法投送一条消息后,查看控制台输出信息。

【streamKey】= mystream,
【recordId】= 1702623008044-0,
【msg】=
{user=[
    "cn.hcr.pojo.RedisUser",
    {"name":"hcr","age":18,"email":"156ef561@gmail.com"}
    ]
}

总结

以上就是在SpringBoot中简单实现Redis Stream队列的Demo,如有需要源码或者哪里不清楚的请评论或者发送私信。
Template:该bean用于配置redis序列化

subscription:配置监听

initStream:初始化消费组

监听测试

使用addMessage()方法投送一条消息后,查看控制台输出信息。

【streamKey】= mystream,
【recordId】= 1702623008044-0,
【msg】=
{user=[
    "cn.hcr.pojo.RedisUser",
    {"name":"hcr","age":18,"email":"156ef561@gmail.com"}
    ]
}

总结

以上就是在SpringBoot中简单实现Redis Stream队列的Demo,如有需要源码或者哪里不清楚的请评论或者发送私信。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/251967.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【C++】STL 容器 - string 字符串操作 ⑤ ( string 字符串查找 | find 函数查找字符串 | rfind 函数查找字符串 )

文章目录 一、string 字符查找 - find 函数查找字符串1、string 类 find 函数原型说明2、代码示例 - 字符串查找3、代码示例 - 统计字符串子串 二、string 字符查找 - rfind 函数查找字符串1、string 类 rfind 函数原型说明2、代码示例 - rfind 字符串查找 一、string 字符查找…

前端常用去重的几种方式

文章目录 方式1: ES6新语法方式2: 遍历 利用filter方式3: 使用 new Map() for循环方式4: 利用 hasOwnProperty总结 在github 查看该文章 方式1: ES6新语法 过滤出网页中不重复的html标签 结合去重知识点考查 […new Set([…document.querySelectorAll(‘*’)].map(v>v.t…

HTML5+CSS3小实例:3D发光切换按钮效果

目录 一、运行效果 图片效果 二、项目概述 三、开发环境 四、实现步骤及代码 1.创建空文件夹 2.完成页面内容 3.完成css样式 五、项目总结 六、源码获取 一、运行效果 图片效果 二、项目概述 这个项目是一个演示3D发光切换按钮效果的网页。按钮由一个开关和一个指…

spring6 基于xml自动装配

目录结构 代码 UserContronller.java package bean.auto.controller;import bean.auto.service.UserService; import bean.auto.service.UserServiceImpl;public class UserContronller {private UserService userService;public void setUserService(UserService userServ…

智能配电房在线监测系统

智能配电房在线监测系统是一个综合性的系统&#xff0c;依托电力智慧运维工具-电易云&#xff0c;主要用于监控和调整配电房的环境、安防和电气设备状态。以下是该系统的一些主要功能和特点&#xff1a; 环境监控&#xff1a;实时监测配电房内的温度、湿度、SF6气体浓度、臭氧浓…

Cmake基础(4)

这篇文章在上一篇的基础之上应用多文件&#xff0c;即一个项目中添加多个文件 文章目录 GLOBsource_group排除文件 上一篇文章的cmake基本不变&#xff0c;这篇文章的重点在于add_executable(${EXECUTABLE_NAME} main.cpp) GLOB file(GLOB cpp_list ${CMAKE_CURRENT_SOURCE_…

【Python特征工程系列】8步教你用决策树模型分析特征重要性(源码)

一、问题 如果有一个包含数十个甚至数百个特征的数据集&#xff0c;每个特征都可能对你的机器学习模型的性能有所贡献。但是并不是所有的特征都是一样的。有些可能是冗余的或不相关的&#xff0c;这会增加建模的复杂性并可能导致过拟合。特征重要性分析可以识别并关注最具信息量…

浅谈MapReduce

MapReduce是一个抽象的分布式计算模型&#xff0c;主要对键值对进行运算处理。用户需要提供两个自定义函数&#xff1a; map&#xff1a;用于接受输入&#xff0c;并生成中间键值对。reduce&#xff1a;接受map输出的中间键值对集合&#xff0c;进行sorting后进行合并和数据规…

linux下的strerror和perror处理错误函数

strerror和perror是C语言中用于处理错误信息的函数。 strerror函数&#xff1a; strerror函数用于将错误码转换为对应的错误消息字符串。它接受一个整数参数&#xff0c;通常是由系统调用或库函数返回的错误码&#xff0c;然后返回一个描述该错误的字符串。 函数原型&#xff1…

双指针训练

1.原理 双指针是一种解题常用方法&#xff0c;常用于将数组按照某种要求进行分块/划分&#xff0c;这里的指针对于数组来说&#xff0c;可以替换成下标&#xff08;毕竟使用下标实际上就是用了指针&#xff09;。 1.1.划分区间 通常将这两个指针命名位dest/cur&#xff08;或…

AOP切入点表达式和使用连接点获取匹配到的方法信息

目录 第一种 execution(访问修饰符? 返回值 包名.类名.?方法名(方法参数) throws 异常?) 第二种 annotation(com.itheima.anno.Log 首先&#xff0c;自定义一个注解&#xff0c;可以自己随意命名&#xff1a; 第一种 execution(访问修饰符? 返回值 包名.类名.?方法名…

网络编程-认识套接字socket

文章目录 套接字概念端口号网络字节序 套接字类型流套接字数据报套接字 socket常见APIsocket函数bind函数listen函数accept函数connect函数sockaddr结构 套接字概念 socket套接字是进程之间一种通信机制&#xff0c;通过套接字可以在不同进程之间进行数据交流。在TCP/UDP中&…

【golang/g3n】3D游戏引擎G3N的windows安装与测试

目录 说在前面安装测试 说在前面 操作系统&#xff1a;win 11go version&#xff1a;go1.21.5 windows/amd64g3n版本&#xff1a;github.com/g3n/engine v0.2.0其他&#xff1a;找了下golang 3d相关的库&#xff0c;目前好像就这个比较活跃 安装 按照官方教程所说&#xff0c;…

Electron框架:构建跨平台桌面应用的终极解决方案

文章目录 一、Electron框架简介二、Electron框架的优势1. 开发效率高2. 跨平台性能好3. 易于维护4. 强大的原生能力 三、如何使用Electron框架快速开发跨平台桌面应用1. 安装Electron2. 创建项目文件夹3. 编写主进程代码4. 编写界面代码5. 运行应用 《Electron入门与实战》编辑…

nginx_rtmp_module 之 ngx_rtmp_mp4_module 的mp4源码分析

一&#xff1a;整体代码函数预览 static ngx_int_t ngx_rtmp_mp4_postconfiguration(ngx_conf_t *cf) {ngx_rtmp_play_main_conf_t *pmcf;ngx_rtmp_play_fmt_t **pfmt, *fmt;pmcf ngx_rtmp_conf_get_module_main_conf(cf, ngx_rtmp_play_module);pfmt ngx_ar…

极兔速递物流查询,用表格导出单号的每一条物流信息

批量查询极兔速递单号的物流信息&#xff0c;并以表格的形式导出单号的每一条物流信息。 所需工具&#xff1a; 一个【快递批量查询高手】软件 极兔速递单号若干 操作步骤&#xff1a; 步骤1&#xff1a;运行【快递批量查询高手】软件&#xff0c;第一次使用的朋友记得先注册…

HPV治疗期间如何预防重复感染?谭巍主任讲述具体方法

众所周知&#xff0c;人乳头瘤病毒(HPV)是一种常见的性传播疾病&#xff0c;感染后可能会引起生殖器疣、宫颈癌等疾病。在治疗期间&#xff0c;预防重复感染非常重要。今日将介绍一些预防HPV重复感染的方法。 1. 杜绝不洁性行为 在治疗期间&#xff0c;患者应该避免与感染HPV…

CT成像技术—20231210

本文要说的是扇束重排&#xff0c;对于扇束及锥束直接重建公式&#xff0c;可以看我做的PDF https://github.com/leslielee619/CTRec/blob/main/重建公式.pdf 在说重排之前&#xff0c;我还想对那个文件内容补充两点&#xff1a; 1、FDK算法或Feldkamp算法&#xff0c;出自Fel…

linux下开放端口的方法

为了辅助我们查看端口状态&#xff0c;本文采用nmap扫描端口 目标机&#xff1a;192.168.241.1&#xff0c;本文的目的是开启22端口 我们可以根据端口状态&#xff08;filtered&#xff09;看出&#xff0c;端口处于过滤状态&#xff0c;即防火墙过滤了该端口 PS&#xff1a;…

PCB设计规则中的经验公式_笔记

PCB设计规则中的经验公式 规则1 - 临界长度规则2 - 信号带宽与上升时间规则3- 时钟信号带宽规则4-信号传输速度规则5- 集肤 (效应) 深度规则6 - 50Ω传输线电容规则7 - 50Ω传输线电感规则8 - 回流路径电感规则9 - 地弹噪声规则10- 串行传输比特率与信号带宽规则11- PCB走线直流…