Redis队列Stream

1 缘起

项目中处理文件的场景:
将文件处理请求放入队列,
一方面,缓解服务器文件处理压力;
另一方面,可以根据文件大小拆分到不同的队列,提高文件处理效率。
这是Java开发组Leader佳汇提出的文件处理方案,非常实用。
从他那学习到之后,开始搜集Redis Stream相关的知识,整理成文,帮助开发者轻松应对知识交流和考核。

2 Redis Stream

Redis Stream是Redis 5.0.0版本新增的数据结构,想使用Stream需要Redis的最低版本是5.0
Stream是一个高性能、高可靠的消息队列,用于异步消息处理,就是传统的队列功能,完成流量削峰。Redis 5.0之前的版本就有提供队列功能,如列表、有序集合和Pub/Sub均可实现队列功能。既然Redis已经有了队列功能,为什么还要Stream这个数据结构呢?
按照正常的思考过程,新事物的出现,一般是为了解决旧事物的问题,或者,为了防止垄断,当然, 技术圈也遵循这个理论。

2.1 解决的问题

Stream的出现是为了解决原先队列存在的问题:
(1)Pub/Sub模式无法持久化消息,如果Redis网络异常或者宕机,消息会丢失;
(2)列表和有序集合的方式支持消息持久化,但是,不支持消息多播和分组消费。
Redis Stream一一解决了上面的问题,实现了消息持久化、消息多播和分组消费
Redis Stream既然是队列应用场景和其他队列一样:
(1)异步解耦;
(2)流量削峰;
不过Redis是基于内存的,如果是大量的消息数据建议选择其他消息队列,如RabbitMQ、Kafka、RocketMQ等。

2.2 架构

先来看一下Stream的总体架构:
在这里插入图片描述
Redis Stream有生产者、消费者和消费组,其中,
(1)消费组:有多个消费者,消费者之间是竞争关系,消费组中有一个last_delivered_id,消费组中的任意一消费者消费了消息,都会使last_delivered_id移动;
(2)消费者:消费者消费消息后,会产生pending_id,即消费者的状态变量,当消费者消费消息后,使用pending_ids记录被消费的消息,当客户端没有进行消费确认(ACK)时,pending_ids中的数据会一直增加,当客户端进行消息确认(ACK)后, 会移除pending_id。Redis官方称pending_ids为PEL(Pending Entries List),用于确保客户端至少消费一次消息,而不会在网络传输中丢失了处理。

2.3 数据结构

先从源码简单看下Stream相关的数据结构:

/* Stream item ID: a 128 bit number composed of a milliseconds time and
 * a sequence counter. IDs generated in the same millisecond (or in a past
 * millisecond if the clock jumped backward) will use the millisecond time
 * of the latest generated ID and an incremented sequence. */
typedef struct streamID {
    uint64_t ms;        /* Unix time in milliseconds. */
    uint64_t seq;       /* Sequence number. */
} streamID;

typedef struct stream {
    rax *rax;               /* The radix tree holding the stream. */
    uint64_t length;        /* Current number of elements inside this stream. */
    streamID last_id;       /* Zero if there are yet no items. */
    streamID first_id;      /* The first non-tombstone entry, zero if empty. */
    streamID max_deleted_entry_id;  /* The maximal ID that was deleted. */
    uint64_t entries_added; /* All time count of elements added. */
    rax *cgroups;           /* Consumer groups dictionary: name -> streamCG */
} stream;

由源码知,stream由Radix树和streamID类型的数据构成,
其中,streamID有两部分组成,ms和seq,ms即毫秒(10位),seq即序列号,
Stream中的每一条消息使用:{毫秒}-{序列号}唯一标识。

3 基础操作

3.1 新建数据:XADD

格式:

XADD key ID field value [field value ...]

参数:

XADD mystream-test * name xiaoyi age 10
XADD mystream-test * name xiaoer age 11

在这里插入图片描述

3.2 查询数据:XRANGE

格式:

XRANGE key start end [COUNT count]

参数:

参数描述
key队列名称
start起始ID标识
end结束ID标识
COUNT查询的条数

3.2.1 查询所有数据

XRANGE mystream-test - +

参数:
-:第一条数据
+:最后一条数据
使用- + 表示拆寻所有数据。
在这里插入图片描述

3.2.2 查询指定条数

XRANGE mystream-test - + COUNT 1

在这里插入图片描述

3.2.3 查询指定范围数据

XRANGE mystream-test 1697335440000-0 1697359922197-0

在这里插入图片描述

3.3 读取数据

格式:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

参数:

参数描述
COUNT返回的条数
BLOCK用于设置XREAD为阻塞模式,单位毫秒,默认为非阻塞模式。非阻塞模式下,读取完毕(即使没有任何消息)立即返回,而在阻塞模式下,若读取不到内容,则阻塞等待。如果在这个时间内没有新的数据流入,那么输出(nil) (1.05s)

注:使用Block模式,配合 作为 I D ,表示读取最新的消息(在非阻塞模式 作为ID,表示读取最新的消息(在非阻塞模式 作为ID,表示读取最新的消息(在非阻塞模式无意义),若没有消息,命令阻塞!等待过程中,其他客户端向队列追加消息,则会立即读取到。

3.3.1 直接读取

XREAD STREAMS mystream-test 0

在这里插入图片描述

3.3.2 阻塞读取

XREAD BLOCK 4000 STRRAMS mystream-test $

在这里插入图片描述

3.3.3 非阻塞读取

XREAD STREAMS mystream-test 0

在这里插入图片描述

3.4 删除数据

格式:

XDEL key ID [ID ...]

参数:

参数描述
key队列名称
ID数据ID
XDEL mystream-test 1697376922916-0

在这里插入图片描述

3.5 消费组

3.5.1 创建消费组:XGROUP

格式:

XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]

参数:

参数描述
CREATE创建消费组
key队列名称
groupname消费组名称
id接收指定ID之后的消息
$接收所有的消息
参数描述
DESTROY删除消费组
key队列名称
groupname消费组名称
参数描述
DELCONSUMER删除消费组中的消费者
key队列名称
groupname消费组组名称
consumername消费者名称
# 创建接收最新消息的消费组
XGROUP CREATE mystream-test mygroup-1 $
# 创建接收所有消息的消费组
XGROUP CREATE mystream-test mygroup-2 0

在这里插入图片描述

3.5.2 删除消费组

# 删除消费组
XGROUP DESTROY mystream-test mygroup-1
XGROUP DELCONSUMER mystream-test mygroup-2

3.5.3 消费组消费消息:XREADGROUP

格式:

 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
参数描述你
group消费组名称。
consumer消费者名称。
count要读取的数量。
milliseconds阻塞时间,以毫秒为单位。
key键指定的队列名称。
ID表示消息 ID。
XREADGROUP GROUP mygroup-1 myconsumer-1 COUNT 1 BLOCK 100000 STREAMS mystream-test >

在这里插入图片描述

3.6 查看等待确认状态:XPENDING

XPENDING key group [[IDLE min-idle-time] start end count [consumer]]

在这里插入图片描述

3.7 消费信息确认:XACK

格式:

XACK key group ID [ID ...]

参数:

参数描述
key队列名称
group消费组名称
ID消息ID
XACK mystream-test mygroup-1 1698558137966-0

在这里插入图片描述

3.8 查询信息:XINFO

格式:

XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]

参数:
查询消费者信息

参数名称
CONSUMERS查询消费者名称
key消费者名称
groupname
查询消费组信息
参数名称
GROUPS查询消费组信息
key消费组名称
查询队列信息
参数名称
STREAM查询队列信息
key队列名称

3.8.1 查询队列信息

XINFO STREAM mystream-test

在这里插入图片描述

3.8.3 查询队列中的消费组

XINFO GROUPS mystream-test

在这里插入图片描述

3.8.4 查询队列消费组中的消费者

XINFO CONSUMERS mystream-test mygroup-1

在这里插入图片描述

4 小结

Stream的出现是为了解决原先队列存在的问题:
(1)Pub/Sub模式无法持久化消息,如果Redis网络异常或者宕机,消息会丢失;
(2)列表和有序集合的方式支持消息持久化,但是,不支持消息多播和分组消费。
Redis Stream一一解决了上面的问题,实现了消息持久化、消息多播和分组消费
Redis Stream既然是队列应用场景和其他队列一样:
(1)异步解耦;
(2)流量削峰;
不过Redis是基于内存的,如果是大量的消息数据建议选择其他消息队列,如RabbitMQ、Kafka、RocketMQ等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/111299.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

网络安全—小白自学

1.网络安全是什么 网络安全可以基于攻击和防御视角来分类,我们经常听到的 “红队”、“渗透测试” 等就是研究攻击技术,而“蓝队”、“安全运营”、“安全运维”则研究防御技术。 2.网络安全市场 一、是市场需求量高; 二、则是发展相对成熟…

QT实现用本地资源管理器来打开文件夹

QString path"文件夹路径";QDesktopServices::openUrl(QUrl("file:"path, QUrl::TolerantMode)); 在windows中QT编程,使用资源管理器来打开指定本地文件夹的方法: 第一种:使用Qprocess命令(相当于在cmd命令管…

测试C#调用Aplayer播放视频(2:VideoPlayer源码学习)

参考文献1除了介绍Aplayer组件的用法之外,还提供有demo下载以供学习,本文学习并记录其中的使用方式。   VideoPlayer项目使用C#在VS2013开发,其解决方案中包括VideoPlayer和VideoPlayer两个小项目,前者基于.net framework4.0&am…

【数据分析】上市公司半年报数据分析

前言 前文介绍过使用网络技术获取上市公司半年报数据的方法,本文将对获取到的数据进行简要的数据分析。 获取数据的代码介绍在下面的两篇文章中 【java爬虫】使用selenium获取某交易所公司半年报数据-CSDN博客 【java爬虫】公司半年报数据展示-CSDN博客 全量数…

C#开发DLL,CAPL调用(CAPL>> .NET DLL)

文章目录 展示说明新建类库工程C# 代码生成dllCAPL脚本调用dll,输出结果展示 ret为dll里函数返回的值。 说明 新建类库工程 在visual studio中建立。 C# 代码 using

Python构造代理IP池提高访问量

目录 前言 一、代理IP是什么 二、代理IP池是什么 三、如何构建代理 IP 池 1. 从网上获取代理 IP 地址 2. 对 IP 地址进行筛选 3. 使用筛选出来的 IP 地址进行数据的爬取 四、总结 前言 爬虫程序是批量获取互联网上的信息的重要工具,在访问目标网站时需要频…

启动Vue项目报错Error: error:0308010C:digital envelope routines::unsupported

问题描述 启动Vue项目报错Error: error:0308010C:digital envelope routines::unsupported 出现这个一般就是node版本的问题,通过命令查看node -v查看node版本; 百度查了好多,都让我降低node版本,属实太麻烦了 在不改node版本的…

[论文笔记]BGE

引言 今天介绍论文BGE,是智源开源的语义向量模型,BAAI General Embedding。 作者发布了C-Pack,一套显著推进中文嵌入领域的资源包。包括三个重要资源: 1) C-MTEB是一个全面的中文文本嵌入基准,涵盖了6个任务和35个数据集。 2) C-MTP是一个从标记和未标记的中文语料库中选…

均值、方差、标准差

1 中间值和均值 表现"中间值"的统计名词: a.均值:   mean,数列的算术平均值,反应了数列的集中趋势,等于有效数值的合除以有效数值的个数.b.中位值:  median,等于排序后中间位置的值&#x…

c++11新特性

文章目录 1. C11简介2. 统一的列表初始化2.1 {}初始化2.2 std::initializer_list 3. 声明3.1 auto3.2 decltype3.3 nullptr 4 范围for循环5. STL中一些变化 1. C11简介 2003年,C标准委员会提交了技术勘误表(TC1)&…

Python的错误和异常处理

一、错误和异常 编程中出现的错误大致可以分为两类:错误和异常。 (一)错误 错误又可以分为两类:语法错误和逻辑错误。 1. 语法错误 语法错误又称解析错误,它是指在编写程序时,程序的语法不符合Python语言的规范,导致…

BI零售数据分析,告别拖延症,及时掌握一线信息

在日常的零售数据分析中,经常会因为数据量太大,分析指标太多且计算组合多变而导致数据分析报表难产,零售运营决策被迫拖延症。随着BI数据可视化分析技术的发展,智能化、可视化、自助分析的BI数据分析逐渐成熟,形成一套…

如何使用navicat图形化工具远程连接MariaDB数据库【cpolar内网穿透】

公网远程连接MariaDB数据库【cpolar内网穿透】 文章目录 公网远程连接MariaDB数据库【cpolar内网穿透】1. 配置MariaDB数据库1.1 安装MariaDB数据库1.2 测试局域网内远程连接 2. 内网穿透2.1 创建隧道映射2.2 测试随机地址公网远程访问3. 配置固定TCP端口地址3.1 保留一个固定的…

Vue项目搭建及使用vue-cli创建项目、创建登录页面、与后台进行交互,以及安装和使用axios、qs和vue-axios

目录 1. 搭建项目 1.1 使用vue-cli创建项目 1.2 通过npm安装element-ui 1.3 导入组件 2 创建登录页面 2.1 创建登录组件 2.2 引入css(css.txt) 2.3 配置路由 2.5 运行效果 3. 后台交互 3.1 引入axios 3.2 axios/qs/vue-axios安装与使用 3.2…

Webpack常见的插件和模式

文章目录 一、认识插件Plugin1.认识Plugin 二、CleanWebpackPlugin三、HtmlWebpackPlugin1.生成index.html分析2.自定义HTML模板3.自定义模板数据填充 四、DefinePlugin1.DefinePlugin的介绍2.DefinePlugin的使用 五、Mode配置 一、认识插件Plugin 1.认识Plugin Webpack的另一…

【HMS Core】机器学习服务热门问题合集

【关键词】 机器学习服务、文本识别、身份证识别 【问题描述1】 机器学习服务的文本识别能力,是否支持草书等? 【解决方案】 草书是不支持的,目前建议使用较为规范的字体测试。 【问题描述2】 机器学习服务是否支持训练模型?…

Flink on yarn 加载失败plugins失效问题解决

Flink on yarn 加载失败plugins失效问题解决 flink版本:1.13.6 1. 问题 flink 任务运行在yarn集群,plugins加载失效,导致通过扩展资源获取任务参数失效 2. 问题定位 yarn容器的jar包及插件信息,jar包是正常上传 源码定位 加载plugins入口,TaskMana…

Mysql权限控制语句

1.创建用户 create user ky32localhost IDENTIFIED by 123456 create user:创建用户开头 ky32:用户名 localhost 新建的用户可以在哪些主机上登录 即可以使用ip地址,网段主机名 ky32localhost ky32192.168.233.22 ky32192.168.233.0/2…

如何在mac 安装 cocos 的 android环境

基本概念: Java: Java 是一种编程语言,由Sun Microsystems(现在是 Oracle Corporation)开发。Java 是一种跨平台的语言,可以用于开发各种应用程序,包括 Android 应用程序。Android 应用程序的核心代码通常用…

在 Visual Studio 中远程调试 C++ 项目

目录 一、说明二、下载远程工具1. 官网下载2. 自己电脑上拷贝 三、 运行远程工具四、本机Visual Studio配置五、自动部署 一、说明 参考官方文档:https://learn.microsoft.com/zh-cn/visualstudio/debugger/remote-debugging-cpp?viewvs-2022 二、下载远程工具 …