Redis之stream类型解读

目录

基本介绍

数据结构

消息

消费组

消费者

基本使用命令

概述 

xadd 命令

xtrim 命令

 xdel 命令

xlen 命令

xrange 命令

xread 命令 

xgroup 命令 

xreadgroup 命令

xack 命令


基本介绍

Redis stream(流)是一种数据结构,其作用类似于仅追加日志,但也实现了多个操作来克服典型仅追加日志的一些限制。其中包括O(1)时间的随机访问和复杂的消费策略,如消费者群体。 您可以使用流实时记录和同时联合事件。 

Redis 为每个stream(流)条目生成一个唯一的 ID。可以在以后使用这些 ID 检索其关联的条目,或读取和处理流中的所有后续条目。

Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。stream 有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容。消息是持久化的,Redis 重启后,内容还在。

数据结构

Redis Stream 的结构如下所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容:

消息

每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。XADD key ID field value[field value ...]

  • 消息 ID:消息 ID 的形式是 timestampInMillis-sequence,例如 1527846880572-5,它表示当前的消息在毫米时间戳 1527846880572 时产生,并且是该毫秒内产生的第5条消息。消息 ID 可以由服务器自动生成,也可以由客户端自己指定,但是形式必须是整数-整数,而且必须是后面加入的消息的 ID 要大于前面的消息 ID。
  • 消息内容:消息内容就是键值对,形如 hash 结构的键值对。

消费组

每个 Stream 都可以挂多个消费组(Consumer Group),每个消费组会有个游标last_delivered_id在 Stream 数组之上往前移动,表示当前消费组已经消费到哪条消息了。每个消费组都有一个 Stream 内唯一的名称,消费组不会自动创建,它需要单独的指令xgroup create进行创建,需要指定从 Stream 的某个消息ID开始消费,这个 ID 用来初始化 last_delivered_id 变量。每个消费组的状态都是独立的,相互不受影响。也就是说同一份 Stream 内部的消息会被每个消费组都消费到。同一个消费组可以挂接多个消费者(Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。每个消费者有一个组内唯一名称。

消费者

消费者内部会有一个状态变量pending_ids,维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息,但是还没有ack(Acknowledge character:确认字符)。如果客户端没有 ack,这个变量里面的消息 ID 就会越来越多,一旦某个消息被 ack,它就开始减少。这个 pending_ids 变量在 Redis 官方被称为 PEL,也就是 Pending Entries List,这是一个核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了而没被处理。

基本使用命令

概述 

消息队列相关命令:

  • XADD - 添加消息到末尾
  • XTRIM - 对流进行修剪,限制长度
  • XDEL - 删除消息
  • XLEN - 获取流包含的元素数量,即消息长度
  • XRANGE - 获取消息列表,会自动过滤已经删除的消息
  • XREVRANGE - 反向获取消息列表,ID 从大到小
  • XREAD - 以阻塞或非阻塞方式获取消息列表

消费者组相关命令:

  • XGROUP CREATE - 创建消费者组
  • XREADGROUP GROUP - 读取消费者组中的消息
  • XACK - 将消息标记为"已处理"
  • XGROUP SETID - 为消费者组设置新的最后递送消息ID
  • XGROUP DELCONSUMER - 删除消费者
  • XGROUP DESTROY - 删除消费者组
  • XPENDING - 显示待处理消息的相关信息
  • XCLAIM - 转移消息的归属权
  • XINFO - 查看流和消费者组的相关信息;
  • XINFO GROUPS - 打印消费者组的信息;
  • XINFO STREAM - 打印流信息

xadd 命令

XADD 命令将指定的流条目追加到指定 key 的流中。如果 key 不存在,将使用流的条目自动创建 key。

一个条目是由一组键值对组成的,它基本上是一个小的字典。键值对以用户给定的顺序存储,并且读取流的命令(如 XRANGE 或者 XREAD)可以保证按照通过 XADD 添加的顺序返回。

XADD key ID field value [field value ...]
  • key :队列名称,如果不存在就创建
  • ID :消息 id,我们使用 * 表示由 redis 生成,可以自定义,但是要自己保证递增性。
  • field value : 记录。
redis> XADD mystream * name Sara surname OConnor
"1601372323627-0"
redis> XADD mystream * field1 value1 field2 value2 field3 value3
"1601372323627-1"
redis> XLEN mystream
(integer) 2
redis> XRANGE mystream - +
1) 1) "1601372323627-0"
   2) 1) "name"
      2) "Sara"
      3) "surname"
      4) "OConnor"
2) 1) "1601372323627-1"
   2) 1) "field1"
      2) "value1"
      3) "field2"
      4) "value2"
      5) "field3"
      6) "value3"

 返回值:该命令返回添加的条目的 ID。如果 ID 参数传的是*,那么 ID 是自动生成的,否则,命令仅返回用户在插入期间指定的相同的 ID。

xtrim 命令

XTRIM 将流裁剪为指定数量的项目,如有需要,将驱逐旧的项目(ID较小的项目)。

XTRIM key MAXLEN [~] count
  • key :队列名称
  • MAXLEN :长度
  • count :数量
127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D
"1601372434568-0"
127.0.0.1:6379> XTRIM mystream MAXLEN 2
(integer) 0
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1601372434568-0"
   2) 1) "field1"
      2) "A"
      3) "field2"
      4) "B"
      5) "field3"
      6) "C"
      7) "field4"
      8) "D"

返回值:返回从流中删除的条目数。

 xdel 命令

从指定流中移除指定的条目,并返回成功删除的条目的数量。在传递的ID不存在的情况下,返回的数量可能与传递的ID数量不同。

XDEL key ID[ID ...]
  • key:队列名称。
  • ID:消息 ID 
> XADD mystream * a 1
1538561698944-0
> XADD mystream * b 2
1538561700640-0
> XADD mystream * c 3
1538561701744-0
> XDEL mystream 1538561700640-0
(integer) 1
127.0.0.1:6379> XRANGE mystream - +
1) 1) 1538561698944-0
   2) 1) "a"
      2) "1"
2) 1) 1538561701744-0
   2) 1) "c"
      2) "3"

返回值:删除的条目数量。

xlen 命令

返回流中的条目数。如果指定的key不存在,则此命令返回0,就好像该流为空。

与其他的Redis类型不同,零长度流是可能的,所以你应该调用TYPE或者EXISTS来检查一个key是否存在。一旦内部没有任何的条目(例如调用XDEL后),流不会被自动删除,因为可能还存在与其相关联的消费者组。

redis> XADD mystream * item 1
"1601372563177-0"
redis> XADD mystream * item 2
"1601372563178-0"
redis> XADD mystream * item 3
"1601372563178-1"
redis> XLEN mystream
(integer) 3

返回值:流中包含的条目数量

xrange 命令

xrange命令返回流中满足给定ID范围的条目。范围由最小和最大ID指定。所有ID在指定的两个ID之间或与其中一个ID相等(闭合区间)的条目将会被返回。

XRANGE key start end [COUNT count]
  • key :队列名
  • start :开始值, - 表示最小值
  • end :结束值, + 表示最大值
  • count :数量
redis> XADD writers * name Virginia surname Woolf
"1601372577811-0"
redis> XADD writers * name Jane surname Austen
"1601372577811-1"
redis> XADD writers * name Toni surname Morrison
"1601372577811-2"
redis> XADD writers * name Agatha surname Christie
"1601372577812-0"
redis> XADD writers * name Ngozi surname Adichie
"1601372577812-1"
redis> XLEN writers
(integer) 5
redis> XRANGE writers - + COUNT 2
1) 1) "1601372577811-0"
   2) 1) "name"
      2) "Virginia"
      3) "surname"
      4) "Woolf"
2) 1) "1601372577811-1"
   2) 1) "name"
      2) "Jane"
      3) "surname"
      4) "Austen"

返回值:该命令返回ID与指定范围匹配的条目。返回的条目是完整的,这意味着ID和所有组成条目的字段都将返回。此外,返回的条目及其字段和值的顺序与使用XADD添加它们的顺序完全一致。

xread 命令 

从一个或者多个流中读取数据,仅返回ID大于调用者报告的最后接收ID的条目。此命令有一个阻塞选项,用于等待可用的项目,类似于BRPOP或者BZPOPMIN等等。

XREAD[COUNT count][BLOCK milliseconds] STREAMS key[key ...]id[id ...]
  • count:数量。
  • milliseconds:可选,阻塞毫秒数,没有设置就是非阻塞模式。
  • key :队列名。
  • id:消息 ID。
redis> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"
   2) 1) 1) 1526984818136-0
         2) 1) "duration"
            2) "1532"
            3) "event-id"
            4) "5"
            5) "user-id"
            6) "7782813"
      2) 1) 1526999352406-0
         2) 1) "duration"
            2) "812"
            3) "event-id"
            4) "9"
            5) "user-id"
            6) "388234"
2) 1) "writers"
   2) 1) 1) 1526985676425-0
         2) 1) "name"
            2) "Virginia"
            3) "surname"
            4) "Woolf"
      2) 1) 1526985685298-0
         2) 1) "name"
            2) "Jane"
            3) "surname"
            4) "Austen"

返回值:该命令返回一个结果数组:返回数组的每个元素都是一个由两个元素组成的数组(键名和为该键报告的条目)。报告的条目是完整的流条目,具有ID以及所有字段和值的列表。返回的条目及其字段和值的顺序与使用XADD添加它们的顺序完全一致。

当使用BLOCK时,超时时将返回一个空回复(nil)。

xgroup 命令 

使用 XGROUP CREATE 创建消费者组,语法格式:

XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
  • key :队列名称,如果不存在就创建
  • groupname :组名。
  • $ : 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略。

从头开始消费:

XGROUP CREATE mystream consumer-group-name 0-0  

从尾部开始消费:

XGROUP CREATE mystream consumer-group-name $

xreadgroup 命令

使用 XREADGROUP GROUP 读取消费组中的消息,语法格式:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group :消费组名
  • consumer :消费者名。
  • count : 读取数量。
  • milliseconds : 阻塞毫秒数。
  • key : 队列名。
  • ID : 消息 ID。

xack 命令

XACK命令用于从流的消费者组的待处理条目列表(简称PEL)中删除一条或多条消息。当一条消息交付到某个消费者时,它将被存储在PEL中等待处理,这通常出现在作为调用XREADGROUP命令的副作用,或者一个消费者通过调用XCLAIM命令接管消息的时候。

一旦消费者成功地处理完一条消息,它应该调用XACK,这样这个消息就不会被再次处理,且作为一个副作用,关于此消息的PEL条目也会被清除,从Redis服务器释放内存。

XACK key group ID[ID ...]

返回值:该命令返回成功确认的消息数。某些消息ID可能不再是PEL的一部分(例如因为它们已经被确认),而且XACK不会把他们算到成功确认的数量中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/96263.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

uniapp接入广告的问题总结

Uniapp官方解决方案 uni-app 支持接入uni-ad广告联盟,开发者可实现一次开发,多端变现。 uni-ad 支持开屏、信息流、激励视频、视频流、悬浮红包、推送等丰富的广告形式; uni-ad 聚合了全网所有主流广告源,包括腾讯优量汇、字节…

【数学建模竞赛】各类题型及解题方案

评价类赛题建模流程及总结 建模步骤 建立评价指标->评价体系->同向化处理(都越多越好或越少越少)->指标无量纲处理 ->权重-> 主客观->合成 主客观评价问题的区别 主客观概念主要是在指标定权时来划分的。主观评价与客观评价的区别…

【单片机】UART、I2C、SPI、TTL、RS232、RS422、RS485、CAN、USB、SD卡、1-WIRE、Ethernet等常见通信方式

在单片机开发中,UART、I2C、RS485等普遍在用,这里做一个简单的介绍 UART通用异步收发器 UART口指的是一种物理接口形式(硬件)。 UART是异步(指不使用时钟同步,依靠帧长进行判断),全双工(收发…

【Axure高保真原型】中继器网格图片拖动摆放

今天和大家分享中继器网格图片拖动摆放的原型模板,我们可以通过鼠标拖动来移动图片,拖动过程其他图标会根据图片拖动自动排列,松开鼠标是图片停放在指定位置,其他图标自动排列。那这个模板是用中继器制作的,所以使用也…

Redis——》Pipeline

推荐链接: 总结——》【Java】 总结——》【Mysql】 总结——》【Redis】 总结——》【Kafka】 总结——》【Spring】 总结——》【SpringBoot】 总结——》【MyBatis、MyBatis-Plus】 总结——》【Linux】 总结——》【MongoD…

内嵌功能强大、低功耗STM32WB55CEU7、STM32WB55CGU7 射频微控制器 - MCU, 48-UFQFN

一、概述: STM32WB55xx多协议无线和超低功耗器件内嵌功能强大的超低功耗无线电模块(符合蓝牙 低功耗SIG规范5.0和IEEE 802.15.4-2011标准)。该器件内含专用的Arm Cortex -M0,用于执行所有的底层实时操作。这些器件基于高性能Arm …

想与一个大佬探讨一下,他网站说的先验分布、后验分布,似乎不对?

2_probability 我对这个问题,是这么理解的: 几个网友,一开始都和我持不同的观点。感觉这是最基本的抽样检验问题啊。。。。。。。。。。。。。。。。。。。

Mybatis学习笔记(三)——Mybatis的配置(Mybatis-config.xml)

Mybatis学习笔记(三)——Mybatis的配置(Mybatis-config.xml) 传送门:Mybatis中文网——配置 Mybatis配置文档的顶层结构: configuration(配置) properties(属性&#…

Linux用户与组管理(03)(八)

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 目录 前言 一、组管理 1、概述 2、用户信息查看 总结 前言 今天是学习用户与组管理的最后一节课,这节课主要是组管理的内容,希望能一起学习&#xff…

C# 生成唯一ID

1.首先通过nuget安装yitter.idgenerator 下面的三行代码搞定

AP5192 DC-DC降压恒流LED汽车头灯摩托车电动车大灯电源驱动

AP5192是一款PWM工作模式,高效率、外围简单、 内置功率MOS管,适用于4.5-100V输入的高精度 降压LED恒流驱动芯片。最大电流1.5A。 AP5192可实现线性调光和PWM调光,线性调光 脚有效电压范围0.55-2.6V. AP5192 工作频率可以通过RT 外部电阻编程 来设定&…

19.CSS雨云动画特效

效果 源码 <!DOCTYPE html> <html lang="en"> <head><meta charset="UTF-8"><title>Cloud & Rain Animation</title><link rel="stylesheet" href="style.css"> </head> <bo…

考虑储能电池参与一次调频技术经济模型的容量配置方法(matlab代码)

目录 1 主要内容 储能参与调频原理 储能参与一次调频的充放电策略 2 部分代码 3 程序结果 4 下载链接 1 主要内容 该程序复现文献《考虑储能电池参与一次调频技术经济模型的容量配置方法》模型&#xff0c;以调频效果最优为目标&#xff0c;考虑储能参与一次调频的充放电…

VMware虚拟机---Ubuntu无法连接网络该怎么解决?

在学习使用Linux系统时&#xff0c;由于多数同学们的PC上多是Windows系统&#xff0c;故会选择使用VMware创建一个虚拟机来安装Linux系统进行学习。 安装完成之后&#xff0c;在使用时总是会遇到各种各样的问题。本片随笔就主要针对可能出现的网络问题进行一个总结&#xff0c;…

记一次PlanUML时序图学习

记一次PlanUML时序图学习 前言插件效果代码及其属性解析解析actorparticipantqueueskinparam sequenceMessageAlign centerautonumber-->xnote overalt 总结 前言 最近因为工作需要学习了使用PlanUML画时序图&#xff0c;上一次学这个还是在大学的时候&#xff0c;以为这辈…

正则表达式 之 断言详解

正则表达式的先行断言和后行断言一共有 4 种形式&#xff1a; (?pattern) 零宽正向先行断言(zero-width positive lookahead assertion)(?!pattern) 零宽负向先行断言(zero-width negative lookahead assertion)(?<pattern) 零宽正向后行断言(zero-width positive lookb…

基于遗传算法 (Genetic Algorithm, GA) 实现N个城市环游最优路径规划计算

遗传算法&#xff08;Genetic Algorithm&#xff0c;GA&#xff09;是一种基于生物进化过程的优化算法&#xff0c;通过模拟自然界的遗传和进化机制&#xff0c;寻找问题的最优解。在解决复杂和多变量的优化问题中&#xff0c;遗传算法表现出良好的鲁棒性和全局搜索能力。 遗传…

C++ deque底层原理

deque底层原理 一、目的二、底层实现三、原理图四、类结构五、push_back六、pop_back 一、目的 实现双端数组 二、底层实现 双向开口的连续线性空间 三、原理图 四、类结构 class deque : protected Deque base _Deque_base._Deque_impl M_map 指针数组 _M_map_size …

Spring——Spring基础

文章目录 1. Spring架构2. RestController vs Controller3. Autowired和Resource的区别是啥4. Spring IOC & AOP4.1 谈谈自己对于 Spring IoC 和 AOP 的理解IoCAOP 4.2 Spring AOP 和 AspectJ AOP 有什么区别&#xff1f; 5. Spring bean5.1 Spring 中的 bean 的作用域有哪…

如何能使mp3的音量变大?

如何能使mp3的音量变大&#xff1f;我们经常在日常生活中使用的一种音频格式是MP3。许多朋友在下载音乐后&#xff0c;都会选择MP3格式进行播放。然而&#xff0c;在我们的日常生活中&#xff0c;我们有时会遇到音量太小的问题。这时候&#xff0c;我们听歌可能会感到很不舒服。…