redis高级数据结构Stream

文章目录

  • 背景
  • stream概述
    • 消息 ID
    • 消息内容
    • 常见操作
    • 独立消费
    • 创建消费组
    • 消费
  • Stream弊端
    • Stream 消息太多怎么办?
    • 消息如果忘记 ACK 会怎样?
    • PEL 如何避免消息丢失?
    • 分区 Partition
  • Stream 的高可用
  • 总结

背景

为了解决list作为消息队列是无法支持消息多播问题,Redis5.0 多出了一个数据结构 Stream,它是一个新的强大的支持多播的可持久化的消息队列,作者坦言 Redis Stream 狠狠地借鉴了 Kafka 的设计。

stream概述

在这里插入图片描述

Redis Stream 的结构如上图所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容。消息是持久化的, Redis 重启后,内容还在。

每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。

每个 Stream 都可以挂多个消费组,每个消费组会有个游标 last_delivered_id 在 Stream数组之上往前移动,表示当前消费组已经消费到哪条消息了。每个消费组都有一个 Stream内唯一的名称,消费组不会自动创建,它需要单独的指令 xgroup create 进行创建,需要指定从 Stream 的某个消息 ID 开始消费,这个 ID 用来初始化 last_delivered_id 变量。

每个消费组 (Consumer Group) 的状态都是独立的,相互不受影响。也就是说同一份Stream 内部的消息会被每个消费组都消费到。

同一个消费组 (Consumer Group) 可以挂接多个消费者 (Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。每个消费者有一个组内唯一名称。

消费者 (Consumer) 内部会有个状态变量 pending_ids,它记录了当前已经被客户端读取的消息,但是还没有 ack。如果客户端没有 ack,这个变量里面的消息 ID 会越来越多,一旦某个消息被 ack,它就开始减少。这个 pending_ids 变量在 Redis 官方被称之为 PEL,也就是 Pending Entries List,这是一个很核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理。

消息 ID

消息 ID 的形式是 timestampInMillis-sequence,例如 1527846880572-5,它表示当前的消息在毫米时间戳 1527846880572 时产生,并且是该毫秒内产生的第 5 条消息。消息 ID 可以由服务器自动生成,也可以由客户端自己指定,但是形式必须是整数-整数,而且必须是后面加入的消息的 ID 要大于前面的消息 ID。

消息内容

消息内容就是键值对,形如 hash 结构的键值对,这没什么特别之处。

常见操作

  • 1、 xadd 追加消息
  • 2、 xdel 删除消息,这里的删除仅仅是设置了标志位,不影响消息总长度
  • 3、 xrange 获取消息列表,会自动过滤已经删除的消息
  • 4、 xlen 消息长度
  • 5、 del 删除 Stream

独立消费

我们可以在不定义消费组的情况下进行 Stream 消息的独立消费,当 Stream 没有新消
息时,甚至可以阻塞等待。 Redis 设计了一个单独的消费指令 xread,可以将 Stream 当成普通的消息队列 (list) 来使用。使用 xread 时,我们可以完全忽略消费组 (Consumer Group)的存在,就好比 Stream 就是一个普通的列表 (list)。

客户端如果想要使用 xread 进行顺序消费,一定要记住当前消费到哪里了,也就是返回的消息 ID。下次继续调用 xread 时,将上次返回的最后一个消息 ID 作为参数传递进去,就可以继续消费后续的消息。block 0 表示永远阻塞,直到消息到来, block 1000 表示阻塞 1s,如果 1s 内没有任何消息到来,就返回 nil。类似kafka广播消费,客户端保存offset。

创建消费组

在这里插入图片描述
Stream 通过 xgroup create 指令创建消费组 (Consumer Group),需要传递起始消息 ID 参数用来初始化 last_delivered_id 变量。其实就是kafka广播消费下客户端自己保存offset,消费者在消费前传递offset告知kafka从哪开始消费,果然是借鉴kafka,但是没做到kafka的层度(kafka客户端会自己保存在内存中,不需要使用者自己保存)。

消费

Stream 提供了 xreadgroup 指令可以进行消费组的组内消费,需要提供消费组名称、消费者名称和起始消息 ID。它同 xread 一样,也可以阻塞等待新消息。读到新消息后,对应的消息 ID 就会进入消费者的 PEL(正在处理的消息) 结构里,客户端处理完毕后使用 xack指令通知服务器,本条消息已经处理完毕,该消息 ID 就会从 PEL 中移除。

Stream弊端

Stream 消息太多怎么办?

读者很容易想到,要是消息积累太多, Stream 的链表岂不是很长,内容会不会爆掉?xdel指令又不会删除消息,它只是给消息做了个标志位。

Redis 自然考虑到了这一点,所以它提供了一个定长 Stream 功能。在 xadd 的指令提供一个定长长度 maxlen,就可以将老的消息干掉,确保最多不超过指定长度。

这个类似kafka的日志删除,只是这里固定为根据大小删除,当达到一定量就删除旧数据。

消息如果忘记 ACK 会怎样?

Stream 在每个消费者结构中保存了正在处理中的消息 ID 列表 PEL,如果消费者收到
了消息处理完了但是没有回复 ack,就会导致 PEL 列表不断增长,如果有很多消费组的话,那么这个 PEL 占用的内存就会放大。

PEL 如何避免消息丢失?

在客户端消费者读取 Stream 消息时, Redis 服务器将消息回复给客户端的过程中,客户端突然断开了连接,消息就丢失了。但是 PEL 里已经保存了发出去的消息 ID。待客户端重新连上之后,可以再次收到 PEL 中的消息 ID 列表。不过此时 xreadgroup 的起始消息ID 不能比PEL内的大,而必须是任意有效的消息 ID,一般将参数设为 0-0,表示读取所有的PEL 消息以及自 last_delivered_id 之后的新消息。这其实就像kafka提供seek方法用于客户端指定从哪个offset开始消费。

分区 Partition

Redis 的服务器没有原生支持分区能力,如果想要使用分区,那就需要分配多个Stream,然后在客户端使用一定的策略来生产消息到不同的 Stream。所以个人认为还是kafka强大,stream只是个对kafka进行大量阉割的消息队列,使用上请谨慎。

Stream 的高可用

Stream 的高可用是建立主从复制基础上的,它和其它数据结构的复制机制没有区别,也就是说在 Sentinel 和 Cluster 集群环境下 Stream 是可以支持高可用的。不过鉴于 Redis 的指令复制是异步的,在 failover 发生时, Redis 可能会丢失极小部分数据,这点 Redis 的其它数据结构也是一样的。

总结

如果真需要使用消息队列那么还是选择市面上大家认可的消息队列,以防在后期迭代时发现大量功能问题和性能问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/967105.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

SpringMVC SpringMVC拦截器 拦截器基础知识

1.什么是拦截器 SpringMVC提供了Intercepter拦截器机制,类似于Servlet当中的Filter过滤器,用于拦截用户的请求并作出相应的处理,比如通过拦截器来进行用户权限验证或者用来判断用户是否登录。SpringMVC拦截器是可插拔式的设计,需…

TAPEX:通过神经SQL执行器学习的表格预训练

摘要 近年来,语言模型预训练的进展通过利用大规模非结构化文本数据取得了巨大成功。然而,由于缺乏大规模高质量的表格数据,在结构化表格数据上应用预训练仍然是一个挑战。本文提出了TAPEX,通过在一个合成语料库上学习神经SQL执行…

Matlab机械手碰撞检测应用

本文包含三个部分: Matlab碰撞检测的实现URDF文件的制作机械手STL文件添加夹爪 一.Matlab碰撞检测的实现 首先上代码 %% 检测在结构环境中机器人是否与物体之间发生碰撞情况,如何避免? % https://www.mathworks.com/help/robotics/ug/che…

激活函数篇 04 —— softmax函数

将模型的输出转换为概率分布,使得模型能够输出每个类别的概率值。 Softmax ( a i ) e a i ∑ j 1 n e a j \text{Softmax}(a_i)\frac{e^{a_i}}{\sum_{j1}^n e^{a_j}} Softmax(ai​)∑j1n​eaj​eai​​ 其中, a i a_i ai​ 是输入向量中的第 i i i 个…

软件工程的熵减:AI如何降低系统复杂度

软件开发的世界,如同一个不断膨胀的宇宙。随着功能的增加和时间的推移,代码库越来越庞大,系统复杂度也随之水涨船高。代码膨胀、维护困难、开发效率低下等问题困扰着无数开发者。这不禁让人联想到物理学中的“熵增”原理——一个孤立系统的熵…

springboot008房屋租赁系统

版权声明 所有作品均为本人原创,提供参考学习使用,如需要源码数据库配套文档请移步 www.taobysj.com 搜索获取 技术实现 开发语言:Javavue。 框架:后端spingboot前端vue。 模式:B/S。 数据库:mysql。 开…

DeepSeek训练成本与技术揭秘

引言:在当今人工智能蓬勃发展的时代,DeepSeek 宛如一颗耀眼的新星,突然闯入大众视野,引发了全球范围内的热烈讨论。从其惊人的低成本训练模式,到高性能的模型表现,无一不让业界为之侧目。它打破了传统认知&…

计算机视觉语义分割——Attention U-Net(Learning Where to Look for the Pancreas)

计算机视觉语义分割——Attention U-Net(Learning Where to Look for the Pancreas) 文章目录 计算机视觉语义分割——Attention U-Net(Learning Where to Look for the Pancreas)摘要Abstract一、Attention U-Net1. 基本思想2. Attention Gate模块3. 软注意力与硬注意力4. 实验…

<论文>DeepSeek-R1:通过强化学习激励大语言模型的推理能力(深度思考)

一、摘要 本文跟大家来一起阅读DeepSeek团队发表于2025年1月的一篇论文《DeepSeek-R1: Incentivizing Reasoning Capability in LLMs via Reinforcement Learning | Papers With Code》,新鲜的DeepSeek-R1推理模型,作者规模属实庞大。如果你正在使用Deep…

【PDF提取内容】如何批量提取PDF里面的文字内容,把内容到处表格或者批量给PDF文件改名,基于C++的实现方案和步骤

以下分别介绍基于 C 批量提取 PDF 里文字内容并导出到表格,以及批量给 PDF 文件改名的实现方案、步骤和应用场景。 批量提取 PDF 文字内容并导出到表格 应用场景 文档数据整理:在处理大量学术论文、报告等 PDF 文档时,需要提取其中的关键信…

collabora online+nextcloud+mariadb在线文档协助

1、环境 龙蜥os 8.9 docker 2、安装docker dnf -y install dnf-plugins-core dnf config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo sed -i shttps://download.docker.comhttps://mirrors.tuna.tsinghua.edu.cn/docker-ce /etc/yum.repos.…

Meta AI 最近推出了一款全新的机器学习框架ParetoQ,专门用于大型语言模型的4-bit 以下量化

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗?订阅我们的简报,深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同,从行业内部的深度分析和实用指南中受益。不要错过这个机会,成为AI领…

w198基于Springboot的智能家居系统

🙊作者简介:多年一线开发工作经验,原创团队,分享技术代码帮助学生学习,独立完成自己的网站项目。 代码可以查看文章末尾⬇️联系方式获取,记得注明来意哦~🌹赠送计算机毕业设计600个选题excel文…

电脑连接wifi但是浏览器打开不了网页,使用手机热点能正常使用

电脑连接wifi但是浏览器打开不了网页,使用手机热点能正常使用 打开控制面板 打开网络和Internet(查看网络状态和任务) 点击更改适配器设置 双击WLAN 点击属性并双击打开Internet 协议版本4(TCP/IPv4) 将自动…

蓝桥杯K倍区间(前缀和与差分,取模化简)

输入 5 2 1 2 3 4 5 输出 6 思路:首先由连续子串和可以想用前缀和,由于加减法总和取模和分别取模结果不受影响,所以我们前缀和之后直接取模方便观察性质,本题前缀和:1,3,6,10&#…

《Wiki.js知识库部署实践 + CNB Git数据同步方案解析》

一、wiki.js 知识库简介 基本概述 定义 :Wiki.js 是一个开源、现代、轻量且功能强大的 Wiki 应用程序,基于 Node.js 构建,旨在帮助个人和团队轻松创建、管理和共享知识。开源性质 :它遵循 AGPLv3 许可证,任何人都可以…

递增三元组(蓝桥杯18F)

暴力求解&#xff1a; #include<iostream> using namespace std; int main() {int N;cin >> N;int* A new int[N];int* B new int[N];int* C new int[N];for (int i 0; i < N;i) {cin >> A[i];}for (int i 0; i < N; i) {cin >> B[i];}for…

【抽象代数】1.2. 半群与群

群的定义 群非空集合二元运算性质 定义1. 设 为一个非空集合&#xff0c;上有二元运算&#xff0c;满足结合律&#xff0c;则称或为一个半群。 定义2. 设 为半群&#xff0c;若元素 满足 &#xff0c;则称 为 的左幺元&#xff08;右幺元&#xff1a;&#xff09;&#…

idea如何使用AI编程提升效率-在IntelliJ IDEA 中安装 GitHub Copilot 插件的步骤-卓伊凡

idea如何使用AI编程提升效率-在IntelliJ IDEA 中安装 GitHub Copilot 插件的步骤-卓伊凡 问题 idea编译器 安装copilot AI工具 实际操作 在 IntelliJ IDEA 中安装 GitHub Copilot 插件的步骤如下&#xff1a; 打开 IntelliJ IDEA&#xff1a; 打开你的 IntelliJ IDEA 应用…

机器学习之数学基础:线性代数、微积分、概率论 | PyTorch 深度学习实战

前一篇文章&#xff0c;使用线性回归模型逼近目标模型 | PyTorch 深度学习实战 本系列文章 GitHub Repo: https://github.com/hailiang-wang/pytorch-get-started 本篇文章内容来自于 强化学习必修课&#xff1a;引领人工智能新时代【梗直哥瞿炜】 线性代数、微积分、概率论 …