Redis Stream Redisson Stream

目录

    • 一、Redis Stream
      • 1.1 场景1:多个客户端可以同时接收到消息
        • 1.1.1 XADD - 向stream添加Entry(发消息 )
        • 1.1.2 XREAD - 从stream中读取Entry(收消息)
        • 1.1.3 XRANGE - 从stream指定区间读取Entry(收消息)
      • 1.2 场景2:多个客户端仅收到一部分消息(分片sharded、消费组group)
        • 1.2.1 XGROUP CREATE - 创建消费组
        • 1.2.2 XREADGROUP - 从消费组中读取消息
        • 1.2.3 XACK - 确认消息
        • 1.2.4 XPENDING - 读取PEL消息
        • 1.2.5 XCLAIM & XAUTOCLAIM - 转移PEL中消息的所有权给其他消费者
        • 1.2.6 统计命令
      • 1.3 其他
    • 二、Redisson Stream

一、Redis Stream

之前介绍过Redis Pub/Sub相关内容,通过Redis Pub/Sub可以实现发布/订阅消息传递范式,但是存在丢消息的可能,而本文介绍的Redis Stream是一种可用来实现 可靠消息队列、支持消息分组(类似Kafka Group) 的数据结构。

关于Redis Stream的使用存在如下2个场景

  • 场景1: 多个客户端可以同时接收到消息
  • 场景2: 多个客户端仅收到一部分消息(分片sharded),例如发送消息A,B,C,客户端1收到A,C,客户端2收到B(参考Kafka group概念)。

关于场景1,则可参考XADD、XREAD、XRANGE等相关命令的使用,
关于场景2,则需要了解XGROUP CREATE、XREADGROUP、XACK等相关命令的使用。

1.1 场景1:多个客户端可以同时接收到消息

场景1中相关命令XADD、XREAD、XRANGE的使用汇总如下图:
在这里插入图片描述

1.1.1 XADD - 向stream添加Entry(发消息 )

向stream添加Entry(多个key/value对),XADD命令格式:

XADD stream名称 id key1 value1 key2 value2 …

其中id为此次entry的唯一ID,而key1 value1 key2 value2 …即为entry的具体内容,
id为*则表示由Redis自动生成ID:<millisecondsTime>-<sequenceNumber>
亦可明确指定id。

示例:

XADD mystream * name 罗 age 18
XADD mystream 1692632086370-0 name 刘 age 18
1.1.2 XREAD - 从stream中读取Entry(收消息)

从stream中读取entry,XREAD命令格式:

XREAD COUNT 最多读取数量 BLOCK 阻塞等待毫秒数 STREAMS stream名称 上次接收的id

通过XADD添加一条消息,多个执行XREAD的客户端都会读取到该消息,
XREAD会从参数中指定的 上次接收的id 之后开始读取后续的消息,
上次接受的id 可设置为$,需配合BLOCK使用,表示仅读取从阻塞开始后新添加的消息(即不关心历史消息),
上次接受的id 可设置为+,需要Redis版本>=7.4 RC1,表示仅读取最后一条消息。
阻塞等待的毫秒数 如果为0,则表示一直阻塞,直到读取到一条消息。

示例:

# 从头开始读取1条消息
XREAD STREAMS mystream 0

# 从头开始读取2条消息
XREAD COUNT 2 STREAMS mystream 0-0
# 从指定消息ID之后开始读取2条消息
XREAD COUNT 2 STREAMS mystream 1692632086370-0

# 最长阻塞5秒,最多读取100条消息,仅读取从阻塞开始后新添加的消息
XREAD BLOCK 5000 COUNT 100 STREAMS mystream $
# 继续从上次接受的id之后继续读取
XREAD BLOCK 5000 COUNT 100 STREAMS mystream 1526999644174-3

# 读取最后一条消息(需要Redis版本>=7.4 RC1)
XREAD STREAM mystream +
1.1.3 XRANGE - 从stream指定区间读取Entry(收消息)

从stream指定区间(起始ID范围)正向读取Entry,XRANGE命令格式:

XRANGE stream名称 起始id 结束id COUNT 最多读取数量

按起始到结束正向返回消息,
-表示最小ID,+表示最大ID

示例:

# 返回全部消息(从前到后依次返回)
XRANGE mystream - + 
# 返回5条消息(从前到后依次返回)
XRANGE mystream - + COUNT 5

# 返回指定id(包括指定id)之后5条消息(从前到后依次返回)
XRANGE mystream 1718951980910-0 + COUNT 5

# 返回指定id(不包括指定id)之后5条消息(从前到后依次返回)
XRANGE mystream (1718951980910-0 + COUNT 5

从stream指定区间(起始ID范围)逆向读取Entry,XREVRANGE命令格式:

XREVRANGE stream名称 结束id 起始id COUNT 最多读取数量

按结束到起始逆向返回消息。

示例:

返回全部消息(从后到前逆向依次返回)
XREVRANGE mystream + -
# 返回2条消息(从后到前逆向依次返回)
XREVRANGE mystream + - COUNT 2

1.2 场景2:多个客户端仅收到一部分消息(分片sharded、消费组group)

场景2中相关命令XGROUP CREATE、XREADGROUP、XACK、XPENDING、XCLAIM等使用汇总如下图:

在这里插入图片描述

1.2.1 XGROUP CREATE - 创建消费组

给stream创建消费分组,分组间彼此隔离,分组内多个consumer会轮流消费消息(分片),XGROUP CREATE命令格式:

XGROUP CREATE stream名称 group名称 起始读取id [MKSTREAM]

起始读取id0,表示从头开始读取,
起始读取id$,表示从最后一条消息之后开始读取,
MKSTREAM子命令是可选的,表示自动创建stream。

示例:

# 为mystream创建分组mygroup1,且从最新消息开始消费
 XGROUP CREATE mystream mygroup1 $
1.2.2 XREADGROUP - 从消费组中读取消息

以分组group读取stream中的消息,group中每个客户端需要指定consumer名称,多个consumer分摊group中的消息,而多个group间彼此隔离,XREADGROUP命令格式:

XREADGROUP GROUP group名称 consumer名称 COUNT 最多读取数量 BLOCK 阻塞等待毫秒数 [NOACK] STREAMS stream名称 上次接收的id

PEL(Pending Entries List): 当使用XREADGROUP读取分组下消息时,服务器会记住哪条消息发给了分组下的哪个消费者,该记录存储在消费者组中,称为PEL,即已发送但尚未确认的消息ID列表。后续在消费者处理完消息后,消费者必须手动调用XACK命令对消息ID进行确认,以便从PEL中删除挂起的消息,关于PEL的结构可参见下图(截取自RedisInsight工具):
在这里插入图片描述

上次接收的id>,表示消费者只希望接收从未传递给任何其他消费者的消息,即给我新的信息>号表示从当前消费组的last_delivered_id后面开始读。
上次接收的id 设为0或其他有效的id,则表示仅读取 PEL(当前consumer没有确认的消息) 中指定id之后的消息。

NOACK子命令式可选的,表示无需确认消息,NOACK子命令适用于对可靠性要求不高、偶尔的消息丢失是可以接受的情况,使用NOACK子命令可以避免将消息添加到PEL( Pending Entries List),相当于在读取消息后自动确认消息,后续无需再调用XACK命令进行确认,

示例:

# 消费者c1阻塞读取mystream下分组mygroup1的最新消息(直到读取到1条消息后解除阻塞)
XREADGROUP GROUP mygroup1 c1 BLOCK 0 STREAMS mystream >

# 消费者c1读取mystream下分组mygroup1的PEL消息(即已投递给c1但c1未进行确认的消息列表)
XREADGROUP GROUP mygroup1 c1 STREAMS mystream 0
1.2.3 XACK - 确认消息

确认stream下指定分组group的某条消息已被成功消费,XACK命令格式:

XACK stream名称 group名称 消息id

示例:

# 确认1条消息 
XACK mystream mygroup1 1719206857966-0 

# 同时确认3条消息
XACK mystream mygroup1 1719206857966-0 1719206909894-0 1719207195666-0
1.2.4 XPENDING - 读取PEL消息

读取stream中指定分组group的PEL挂起消息列表,XPENDING命令格式:

XPENDING stream名称 group名称 IDEL 空闲毫秒数 起始消息id 结束消息id 查询数量 consumer名称

示例:

# 查询mystream下mygroup1分组的PEL列表
XPENDING mystream mygroup1

# 查询mystream下mygroup1分组下的消费者c1的空闲9秒的最多10条PEL消息
XPENDING mystream mygroup1 IDLE 9000 - + 10 c1
1.2.5 XCLAIM & XAUTOCLAIM - 转移PEL中消息的所有权给其他消费者

通过XPENDING查询出PEL消息(已投递未确认)后,若原先消息对应的consumer已经挂掉,没有能力继续处理消息,则可通过XCLIAM将对应的消息转移给同分组下的其他consumer进行处理,XCLAIM命令格式如下:

XCLAIM stream名称 group名称 consumer名称 空闲时长毫秒 消息id1 消息id2

转移后消息上次投递时间会重置为当前时间(即消息空闲idle时间为0),
默认会返回已经转移成功的消息内容,且消息投递计数会加1,
也可添加JUSTID子命令,则只返回消息ID不返回消息内容,且消息投递计数不变,
若多个客户端同时通过XCLAIM转移同一条消息的所有权,则只会有一个客户端转移成功。
Redis官方原文如下:

Note that the message is claimed only if its idle time is greater than the minimum idle time we specify when calling XCLAIM. Because as a side effect XCLAIM will also

  • reset the idle time (since this is a new attempt at processing the message),
  • two consumers trying to claim a message at the same time will never both succeed: only one will successfully claim the message. This avoids that we process a given message multiple times in a trivial way (yet multiple processing is possible and unavoidable in the general case).

示例:

# mystream下mygroup1分组下的PEL消息1526569498055-0且空闲时长超过1小时,则将其转移给消费者c2
XCLAIM mystream mygroup1 c2 3600000 1526569498055-0

亦可通过XAUTOCLAIM将PEL中指定起始消息ID后的消息批量进行转移,XAUTOCLIAM命令格式如下:

XAUTOCLAIM stream名称 group名称 consumer名称 空闲时长毫秒 起始消息id COUNT 消息数量

示例:

# 扫描mystream下mygroup1分组下的所有PEL消息,空闲时长超过1小时,则最多转移25条消息给消费者c2
XAUTOCLAIM mystream mygroup1 c2 3600000 0-0 COUNT 25
1.2.6 统计命令
# 查询stream下的分组信息
XINFO GROUPS stream名称

# 查询stream信息
XINFO STREAM stream名称

# 查询stream下指定分组的消费者信息
XINFO CONSUMERS stream名称 group名称

1.3 其他

删除stream中的消息:

XDEL stream名称 id1 id2 …

查询stream中的消息(entry)数量:

XLEN stream名称

压缩stream中的消息数据量:

XTRIM stream名称 MAXLEN 保留的最近消息数量
XTRIM stream名称 MINID 消息ID(小于此ID的消息均会被删除)

二、Redisson Stream

在Redisson中可通过Stream实现Redis Stream,

场景1 相关示例代码如下:

@Test
void testStream() throws InterruptedException {
    String streamName = "mystream";
    MyMessage2 myMessage = this.buildMyMessageWithTimestampId();

    //获取Stream
    RStream<String, Object> stream = this.redisson.getStream(streamName);

    //发消息 - XADD mystream * name 我的消息 age 18
    StreamMessageId entryId = stream.add(StreamAddArgs.entries(myMessage.toMap()));
    log.info("stream[{}] add success, id: {}", streamName, entryId);

    //读消息 - XREAD COUNT 5 BLOCK 5000 STREAMS mystream 0
    Map<StreamMessageId, Map<String, Object>> entries = stream.read(StreamReadArgs.greaterThan(StreamMessageId.ALL).count(5).timeout(Duration.ofSeconds(5)));
    entries.forEach((id, entryMap) -> {
        log.info("stream[{}] read message: id={}, entry: {}", streamName, id, entryMap);
    });

    //读取区间内消息 - XRANGE mystream 0 entryId COUNT 10
    entries = stream.range(10, StreamMessageId.ALL, entryId);
    entries.forEach((id, entryMap) -> {
        log.info("stream[{}] range message: id={}, entry: {}", streamName, id, entryMap);
    });
}

场景2 相关示例代码如下:

@Resource
private RedissonClient redisson;

@Test
void testStreamGroup() throws InterruptedException {
   String streamName = "mystream";
   String groupName = "mygroup1";
   String consumerName = "c1";
   MyMessage2 myMessage = this.buildMyMessageWithTimestampId();

   //获取Stream
   RStream<String, Object> stream = this.redisson.getStream(streamName);

   //发消息 - XADD mystream * name 我的消息 age 18
   StreamMessageId entryId = stream.add(StreamAddArgs.entries(myMessage.toMap()));
   log.info("stream[{}] add success, id: {}", streamName, entryId);

   //查询已存在的分组 - XINFO GROUPS mystream
   List<StreamGroup> streamGroups = stream.listGroups();
   streamGroups.forEach(streamGroup -> {
       log.info("stream[{}] listGroups groupName: {}", streamName, streamGroup.getName());
   });
   Boolean existGroup = streamGroups.stream().anyMatch(group -> groupName.equals(group.getName()));
   if (!existGroup) {
       //创建分组 - XGROUP CREATE mygroup1 $
       stream.createGroup(StreamCreateGroupArgs.name(groupName)
               //此处id支持:NEWEST即$,ALL即0
               .id(StreamMessageId.ALL));
       log.info("stream[{}] createGroup success, groupName: {}", streamName, groupName);
   }

   //读分组消息 - XREADGROUP GROUP mygroup1 c1 COUNT 5 BLOCK 5000 STREAMS mystream >
   Map<StreamMessageId, Map<String, Object>> entries = stream.readGroup(groupName, consumerName,
           //greaterThan即设置从哪个消息ID之后开始读取,支持:NEVER_DELIVERED即>、ALL即0
           StreamReadGroupArgs.greaterThan(StreamMessageId.NEVER_DELIVERED)
                   .count(5)
                   .timeout(Duration.ofSeconds(5)));
   entries.forEach((id, entryMap) -> {
       log.info("stream[{}] readGroup groupName: {}, consumerName: {}, message: id={}, entry: {}",
               streamName, groupName, consumerName, id, entryMap);
   });

   //读取PEL中未确认的消息 - XPENDING mystream mygroup1 - + 100 c1
   Map<StreamMessageId, Map<String, Object>> streamMessageIdMapMap = stream.pendingRange(groupName, consumerName, StreamMessageId.MIN, StreamMessageId.MAX, 100);
   streamMessageIdMapMap.forEach((id, entryMap) -> {
       log.info("stream[{}] pendingRange groupName: {}, consumerName: {}, message: id={}, entry: {}",
               streamName, groupName, consumerName, id, entryMap);
       //确认消息(从PEL中移除) - XACK mystream mygroup1 1600000000000-0
       stream.ack(groupName, id);
       log.info("stream[{}] ack groupName: {}, consumerName: {}, message: id={}",
               streamName, groupName, consumerName, id);
   });

}


参考:

Redis Stream
https://redis.io/docs/latest/develop/data-types/streams/
https://redis.io/docs/latest/commands/xreadgroup/

Redisson Stream
https://github.com/redisson/redisson/wiki/7.-Distributed-collections#720-stream

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/748702.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

徐徐拉开的帷幕:拜登与特朗普的辩论大戏 日元跌破160大关!创1986年以来最低纪录

北京时间6月27日&#xff08;本周五&#xff09;上午9:00&#xff0c;拜登和特朗普将参加2024年总统候选人电视辩论。作为参考&#xff0c;2016年大选辩论期间&#xff0c;美元汇率对辩论结果的反应相对温和&#xff0c;希拉里胜选预期增强在一定程度上支撑了美元。 时间逐渐临…

AI产品打造全攻略:看我是如何预测用户流失,搞定AI产品全流程的

前言 对于任何互联网公司而言&#xff0c;用户流失无疑是一个不容忽视的问题。在本文中&#xff0c;我将通过一个真实的预测用户流失的项目案例&#xff0c;带领大家深入了解AI产品从筹备到上线的整个流程。这个过程将展现AI产品经理的工作全貌&#xff0c;包括各个环节的角色…

钉钉在MAKE 2024大会上宣布开放AI生态;NBC将用AI主播播报巴黎奥运会内容

&#x1f680; 钉钉在MAKE 2024大会上宣布开放AI生态 摘要&#xff1a;钉钉总裁叶军在MAKE 2024生态大会上宣布&#xff0c;钉钉将对所有大模型厂商开放&#xff0c;构建“国内最开放AI生态”。目前已有六家大模型厂商接入钉钉&#xff0c;用户可直接使用七家大模型产品。未来…

下拉选择输入框(基于elment-ui)

最近在需求中&#xff0c;需要有一个下拉选择功能&#xff0c;又得可以输入&#xff0c;在 element-ui 官网找了&#xff0c;发现没有适合的&#xff0c;然后在修炼 cv 大法的我&#xff0c;也在网上看了一下&#xff0c;但是也都感觉不合适&#xff0c;所以就自己写了一个&…

R语言数据分析案例37-旅游景点聚类分析

一、研究背景 近年来&#xff0c;随着旅游业的迅猛发展&#xff0c;旅游景点的竞争日益激烈。如何在众多景点中脱颖而出&#xff0c;吸引更多游客&#xff0c;成为各大景点管理者关注的焦点。通过对旅游景点进行深入的数据分析&#xff0c;可以帮助管理者更好地了解景点的优势…

C#1.0-11.0所有历史版本主要特性总结

文章目录 前言名词解释主要版本一览表各版本主要特性一句话总结 C# 1.0 (Visual Studio 2002, .Net Framework 1.0)C# 2.0 (Visual Studio 2005, .Net Framework 2.0)C# 3.0 (Visual Studio 2008, .Net Framework 3.0)C# 4.0 (Visual Studio 2010, .Net Framework 4)C# 5.0 (V…

赏金猎人src挖掘入门

文章目录 1. 什么是漏洞2. OWASP Top 103. 利用的漏洞来源4. SRC安全应急响应中心5. Burpsuite简介6. 浏览器代理插件6.1 firefox浏览器代理插件6.2 edge浏览器代理插件3.chrome浏览器代理插件&#xff08;需要科学上网&#xff09; 1. 什么是漏洞 漏洞是指一个系统存在的弱点或…

2024广东省职业技能大赛云计算赛项实战——构建CICD

构建CI/CD 前言 题目如下&#xff1a; 构建CI/CD 编写流水线脚本.gitlab-ci.yml触发自动构建&#xff0c;具体要求如下&#xff1a; &#xff08;1&#xff09;基于镜像maven:3.6-jdk-8构建项目的drone分支&#xff1b; &#xff08;2&#xff09;构建镜像的名称&#xff1a…

C# VTK 自定义封装 vtkwPipeline 多边形管道建模

vtkwPipeline 简介 public vtkwPipeline(vtkLineSource lineSource, double outR, double inR, int sides) vtkwPipeline 是我自定义封装的C# 类 用于对管道壁建模&#xff0c;有内半径&#xff0c;外半径设置&#xff0c; 以及多边形边数设置。 参数 1. vtkLineSource li…

EI CCIE学习笔记-SDAccess之一:SDAccess解决方案

Chapter 1 SD-Access Solution Proposal 1.1 概念引入 SDN三要素&#xff1a;集中控制、转控分离、可编程 DNA DNA:Digital Network Architecture数字网络架构 思科提出的跨园区&#xff0c;分支机构&#xff0c;WAN和扩展企业的企业网络架构它提供了一种开放&#xff0c;可扩…

win10 C:\Users\Administrator

win10 C:\Users\Administrator C:\Users\Administrator\Documents\ C:\Users\Administrator\Pictures C:\Users\Administrator\Favorites C:\Users\Administrator\Links C:\Users\Administrator\Videos

Shopee API接口——获取商家店铺商品列表

一、引言 在跨境电商领域&#xff0c;Shopee作为东南亚地区领先的电商平台&#xff0c;为众多商家提供了广阔的市场和丰富的销售机会。本文将详细介绍如何通过Shopee API获取商家店铺商品列表&#xff0c;并探讨其应用场景。 二、核心功能介绍 Shopee API获取商家店铺商品列…

数据结构(Java):ArrayList的应用

1、引言 上一篇博客&#xff0c;已经为大家讲解了集合类ArrayList。 这篇博客&#xff0c;就来帮助大家学会使用ArrayList。 2、题1&#xff1a; 删除字符&#xff08;热身题&#xff09; 题目&#xff1a;给出str1和str2两个字符串&#xff0c;删除str1中出现的所有的str2…

【线代基础】张宇30讲+300题错题整理

第一章 行列式 1. 2. 第二章 矩阵 1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 第三章 向量 1. 2. 3. 第四章 线性方程组 1. 2. 3. 4. 5. 6. 7. 8. 9. 第五章 特征值与特征向量 1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 第六章 二次型 1. 2. 3. 4. 5. 终于结束了线性…

【Mysql】多表关系设计

多表关系设计 实际开发中&#xff0c;一个项目通常需要很多张表才能完成。例如&#xff1a;一个商城项目就需要分类表(category)、商品表(products)、订单表(orders)等多张表。且这些表的数据之间存在一定的关系&#xff0c;接下来我们一起学习一下多表关系设计方面的知识 一对…

系统性掌握C++17容器四件套:std::optional, std::any, std::variant, std::tuple

昨天在写《深入探讨C的高级反射机制&#xff08;2&#xff09;&#xff1a;写个能用的反射库》的时候&#xff0c;正好遇到动态反射需要的类型擦除技术。所谓的类型擦除&#xff0c;就是在两个模块之间的接口层没有任何类型信息&#xff0c;实现两个模块之间安全的通信。可以理…

Unity3D Text使用超链接跳转事件

系列文章目录 Unity工具 文章目录 系列文章目录&#x1f449;前言&#x1f449;一、第一种使用TextMeshPro加入超链接&#x1f449;二、继承Text组件,重载OnPopulateMesh方法&#x1f449;三.壁纸分享&#x1f449;总结 &#x1f449;前言 有时候会用到跳转的问题,所以添加一…

Flutter第十五弹 Flutter插件

目标&#xff1a; 1.Flutter插件是什么&#xff1f;有什么作用&#xff1f; 2.怎么创建Flutter插件&#xff1f; 一、什么是插件 在flutter中&#xff0c;一个插件叫做一个package&#xff0c;使用packages的目的就是为了达到模块化&#xff0c;可以创建出可被复用和共享的代…

关于PX4模拟机型的拓展

#多旋翼 #四旋翼&#xff08;默认&#xff09; sudo make px4_sitl gazebo #带光流的四旋翼 sudo make px4_sitl gazebo_iris_opt_flow #3DR Solo&#xff08;四旋翼&#xff09; sudo make px4_sitl gazebo_solo #Typhoon H480&#xff08;六旋翼&#xff09; sudo make px4_s…

超声波清洗机对眼镜有伤害吗?四大顶尖优品公认力作!

超声波清洗机利用超声波在液体中产生的微小气泡爆炸&#xff0c;产生强大的冲击力&#xff0c;能够深入物品的各个角落&#xff0c;有效去除油污、灰尘和细菌。与传统的手工清洗相比&#xff0c;不仅清洁效率高&#xff0c;而且能够保护眼镜不受损伤&#xff0c;特别适合清洗眼…