Redis实战(5)——Redis实现消息队列

消息队列,顾名思义,就是一个存放消息的队列。最简单的消息队列包含3个角色

  • 生产者:将消息存入队列中
  • 队列:存放和管理消息
  • 消费者: 将消息从队列中取出来并做业务处理
    在这里插入图片描述
    R e d i s 提供了三种实现消息队列的方式,基于 L i s t 结构、 P u b S u b 、 S t r e a m 结构 \textcolor{red}{Redis 提供了三种实现消息队列的方式,基于List结构、PubSub、Stream结构} Redis提供了三种实现消息队列的方式,基于List结构、PubSubStream结构

1 基于List 结构实现消息队列

Redis的List是一个双向列表。可以从两端存入数据或者取出数据。

  • LPUSH/RPUSH key element 【elements】
  • BLPOP/RPOP key timeout

利用list 结构实现的消息队列主要是依据阻塞取指令 BLPOP/RPOP 来模拟消费者监听队列,直到队列中有消失时获得该数据
优点: 实现简单,且可以持久化
缺点: 只能有一个消费者来消费数据,且只能消费一次,无法避免消息的丢失

2 基于PubSub(发布/订阅)

PubSub 是一个基于点对点的消息模型,消费者可以订阅一个或者多个chanel,当生产者向队列发送了消息时,消费者只要订阅了频道就可以收到并处理消息
在这里插入图片描述

  • PUBLISH channel message 将信息 message 发送到指定的频道 channel
  • SUBSCRIBE channel [channel …] 订阅一个或多个频道
  • PSUBSCRIBE pattren 订阅与通配符匹配的chanel

在使用PSUBSCRIBE pattren 时,支持多种通配符
1 \textcolor{blue}{1} 1 ?:匹配一个字符
2 \textcolor{blue}{2} 2 * :匹配零个字符或多个字符
3 \textcolor{blue}{3} 3 [] :选择匹配,匹配[]中定义的字符 如hell[ae]o 可以匹配 hello 和 hellao

使用PubSub 实现的消费队列时,支持 多生产、多消费 \textcolor{red}{多生产、多消费} 多生产、多消费的模式,不过PubSub不支持数据的持久化,相较于List,它本身就不是一个数据结构无法利用Redis持久化数据。并且无法避免消息的丢失,如生产者向无人订阅的频道发消息时,数据会丢失。另外还会出现由于消费者的缓存空间有效,超时缓存上限时,将会出现消息的丢失。由于这些缺点,redis的PUBSUB模式,无法满足对可靠性要求较高的服务。

3 基于Stream 数据结构

Stream 是redis5.0 及之后针对消息队列场景设计的 数据结构 \textcolor{red}{数据结构} 数据结构,因此数据的安全性得到了保障,因为可以持久化。相较于List 数据结构实现的消息队列的方式,有更多针对消息队列的单独命令,可以实现一个功能更加完善的消息队列

发送消息

  • XADD k e y \textcolor{red}{key} key [ N O M K S T R E A M ] \textcolor{blue}{[NOMKSTREAM] } [NOMKSTREAM] [ < M A X L E N ∣ M I N I D > [ = ∣   ] t h r e s h o l d [ L I M I T c o u n t ] ] \textcolor{green}{[<MAXLEN | MINID> [= | ~] threshold [LIMIT count]]} [<MAXLENMINID>[= ]threshold[LIMITcount]] < ∗ ∣ i d > \textcolor{orange}{<* | id>} <id> F i e l d v a l u e [ F i e l d v a l u e . . . ] \textcolor{purple}{Field value [Field value ...]} Fieldvalue[Fieldvalue...]

参数说明
K e y \textcolor{red}{Key} Key : 存储消息的队列的名字
[ N O M K S T R E A M ] \textcolor{blue}{[NOMKSTREAM] } [NOMKSTREAM] :可选参数,是否在队列不存在时,创建队列。默认是创建的
[ < M A X L E N ∣ M I N I D > [ = ∣   ] t h r e s h o l d [ L I M I T c o u n t ] ] \textcolor{green}{[<MAXLEN | MINID> [= | ~] threshold [LIMIT count]]} [<MAXLENMINID>[= ]threshold[LIMITcount]] :可选参数,设置消息队列的最大消息数,默认是设上限的
< ∗ ∣ i d > \textcolor{orange}{<* | id>} <id> :消息的唯一id,* 表示有redis自动生成,格式是时间戳_递增值 如 1526919030474-0。Id值也可以自定义。
F i e l d v a l u e [ F i e l d v a l u e . . . ] \textcolor{purple}{Field value [Field value ...]} Fieldvalue[Fieldvalue...] 消息体

读取消息

  • XREAD [ C O U N T c o u n t ] \textcolor{red}{[COUNT count] } [COUNTcount] [ B L O C K m i l l i s e c o n d s ] \textcolor{blue}{[BLOCK milliseconds]} [BLOCKmilliseconds] S T R E A M S k e y [ k e y . . . ] \textcolor{green}{STREAMS key [key ...] } STREAMSkey[key...] i d [ i d . . . ] \textcolor{orange}{id [id ...]} id[id...]

参数说明
[ C O U N T c o u n t ] \textcolor{red}{[COUNT count] } [COUNTcount] 可选参数, 指定读取消息的条数
[ B L O C K m i l l i s e c o n d s ] \textcolor{blue}{[BLOCK milliseconds]} [BLOCKmilliseconds] 当没有消息时,读取队列消息的阻塞时长,当设置为0时,永久等待,直到读取到队列中消息
S T R E A M S k e y [ k e y . . . ] \textcolor{green}{STREAMS key [key ...] } STREAMSkey[key...] 需要读取的队列的key名字,可以从多个队列中读取数据
i d [ i d . . . ] \textcolor{orange}{id [id ...]} id[id...] 读取消息的起始Id 。有两个特殊的id,0 表示从第一个消息读起,$ 表示读取最新的一条消息

在读取消息时,可以通过while(true) 循环 调用xread block 0 streams key $ 去永久的监听队列去获得消息。不过这种模式下会出现一个问题,在获得消息并处理消息这个时间间隙中,可能生产者又往队列中增加了好几条消息,由于Id 为$ 只会读取最新的一条消息,那么可能会出现消息的漏读。这里可以采用基于消费者组去读取消息

3.1 基于消费者组去消费消息

可以将多个消费者划分到一个组中,其中每个组消费消息时都会维护一个最后消费消息的标识 L a s t d e l i v e r e d i d \textcolor{red}{Last delivered id} Lastdeliveredid,当宕机重启后,直接从该标识id之后的消息消费。意味者不会重复消费消息。
在消费者组中还维护了一个 Pending_ids集合,该集合中存放了未确认【ACK】消费数据的消息Id,
机器出现宕机后重启,可继续确认未处理的消息。可以通过 X A C K \textcolor{red}{XACK} XACK来确认客户端确认已经消费了消息,之后从Pending_ids集合中移除。

基于消费者组消费消息时,最大程度的保证了消息的安全消费、不重复消费。
在这里插入图片描述

创建消费者组
XGROUP C R E A T E \textcolor{red}{CREATE} CREATE K E Y \textcolor{green}{KEY } KEY G R O U P N A M E \textcolor{blue}{GROUPNAME } GROUPNAME I D \textcolor{orange}{ID} ID [ M K S T R E A M ] \textcolor{purple}{ [MKSTREAM]} [MKSTREAM]

C R E A T E \textcolor{red}{CREATE} CREATE :创建组
K E Y \textcolor{green}{KEY } KEY :基于哪个队列去创建组
G R O U P N A M E \textcolor{blue}{GROUPNAME } GROUPNAME :创建的消费者组名称
I D \textcolor{orange}{ID} ID 消息的标识id。0 从头消费 $ 消费最新的消息
[ M K S T R E A M ] \textcolor{purple}{ [MKSTREAM]} [MKSTREAM] : 可选参数,当队列不存在时,是否创建队列,默认是创建

从消费者组中消费消息

XGROUPREAD GROUP g r o u p \textcolor{red}{group } group c o n s u m e r \textcolor{green}{consumer } consumer [ C O U N T c o u n t ] \textcolor{blue}{[COUNT count] } [COUNTcount] [ B l o c k m i l l i s e c o n d s ] \textcolor{orange}{[Block milliseconds] } [Blockmilliseconds] [ N O A C K ] \textcolor{purple}{ [NOACK] } [NOACK] S T R E A M S K E Y [ k e y . . . ] \textcolor{red}{STREAMS KEY [key ...]} STREAMSKEY[key...] I D [ I D . . . . . ] \textcolor{green}{ ID [ID.....] } ID[ID.....]

g r o u p \textcolor{red}{group } group : 组的名字,定义从哪个消费者组消费消息
c o n s u m e r \textcolor{green}{consumer } consumer :消费者名字,如果不存在,自动创建
[ C O U N T c o u n t ] \textcolor{blue}{[COUNT count] } [COUNTcount] :消费数量
[ B l o c k m i l l i s e c o n d s ] \textcolor{orange}{[Block milliseconds] } [Blockmilliseconds] :可选参数,阻塞时长【单位ms】,不设置时为非阻塞消费。
[ N O A C K ] \textcolor{purple}{ [NOACK] } [NOACK] :可选参数,是否自动确认。true时消息不会进入pending_ids[] 集合中,可能会有未消费的消息。所以为了安全性,无需设置。
S T R E A M S K E Y [ k e y . . . ] \textcolor{red}{STREAMS KEY [key ...]} STREAMSKEY[key...] : 监听的队列的名字
I D [ I D . . . . . ] \textcolor{green}{ ID [ID.....] } ID[ID.....] :获得消息的起始ID 。
设置成 ">" :从下一个未消费的消息开始消费。
设置成其他:均是从pending-list中获得已消费但是未确认的消息,如0 ,从pending-list中第一个消息开始。
根据实际情况可设置不同的ID 去消费消息。正常读取设置> 异常读取未确认的消息

确认消息
XACK k e y \textcolor{red}{key } key g r o u p \textcolor{green}{group } group I D [ I D . . . ] \textcolor{blue}{ ID [ID...] } ID[ID...]

k e y \textcolor{red}{key } key :队列名称
g r o u p \textcolor{green}{group } group :组名称
I D [ I D . . . ] \textcolor{blue}{ ID [ID...] } ID[ID...] :待确认的消息Id

3.2 数据测试

(1) 向order队列中添加4条消息

xadd order * voucherId 9 userId 150 orderId 79297921056506055
xadd order * voucherId 9 userId 129 orderId 79297921056506083
xadd order * voucherId 9 userId 111 orderId 79297921056506108
xadd order * voucherId 9 userId 111 orderId 79297921056506101

在这里插入图片描述
(2) 向order队列创建消费者组group_1

## 消费者组从头开始消费数据
XGROUP CREATE order  group_1 0 

(3) 从消费者组中消费消息

## 消费最新的未消费的消息,采用阻塞式获取,最长等待2000ms
XREADGROUP GROUP group_1 consumer_1 COUNT 1 BLOCK 2000 STRAEAMS order >

在这里插入图片描述
第五次消费时,阻塞等待后返回空。队列中的消息全部消费,此时都处于为确认状态,全部存入了penging-list中。
此时需要手动确认这些消息确实已经被成功的消费了,需要手动确认将其从pending-list 集合中移除

(4) 手动确认已经消费的消息

 XACK order group_1 1691146911471-0  1691148054821-0 1691148657217-0  1691202770386-0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/62518.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Linux性能学习(4.5):网络_TCP四次挥手内核参数优化

文章目录 1 四次挥手2 参数优化2.1 tcp_orphan_retries--->FIN报文重传次数2.2 tcp_max_orphans--->孤儿连接最大数量2.3 tcp_fin_timeout--->FINE_WAIT2状态等待时间2.4 tcp_max_tw_buckets2.5 tcp_tw_reuse--->复用未释放的端口 3 总结 参考资料&#xff1a; 1. …

星图按转化线索回传对接思路与示例

一、什么是星图&#xff1f;二、什么是线索转化&#xff1f;三、对接中的一些疑问&#xff1f;四、如何对接开发&#xff1f;五、星图侧如何联调测试&#xff1f; 一、什么是星图&#xff1f; 抖音星图是抖音电商蓝图下&#xff0c;为品牌方、MCN公司和明星/达人服务并收取分…

【DFS】17. 电话号码的字母组合

【DFS】17. 电话号码的字母组合 Halo&#xff0c;这里是Ppeua。平时主要更新C语言&#xff0c;C&#xff0c;数据结构算法…感兴趣就关注我bua&#xff01; TOC 题目: [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rikjhVFD-1691333891079)(C:\Us…

IAR开发环境的安装、配置和新建STM32工程模板

IAR到环境配置到新建工程模板-以STM32为例 一、 简单介绍一下IAR软件1. IAR的安装&#xff08;1&#xff09; 下载IAR集成开发环境安装文件&#xff08;2&#xff09; 安装 2. 软件注册授权 二、IAR上手使用(基于STM32标准库新建工程)1、下载标准库文件2、在IAR新建工程&#x…

华为云交付

文章目录 一、华为云-公有云架构华为公有云的主要服务1.华为云服务—计算类2.华为云服务——存储类3.华为云服务—网络类4.华为云服务—管理和监督类5.华为云数据库 二、待续 一、华为云-公有云架构 华为公有云的主要服务 ECS&#xff1a;弹性云服务器&#xff08; Elastic Cl…

交互流程图设计软件都有哪些?

交互流程图是设计行业信息流、观点流或组件流的图形代表。但是市场上应该如何选择各种交互流程图软件呢&#xff1f;如何使用高质量的交互流程图软件来绘制高端氛围的高档流程图&#xff1f;今天&#xff0c;小边给您带来了十个超级实用的交互流程图软件&#xff0c;我希望能帮…

解决宝塔面板升级获取更新包失败,请稍后更新或联系宝塔运维

宝塔Linux面板执行升级命令后失败&#xff0c;提示“获取更新包失败&#xff0c;请稍后更新或联系宝塔运维”如何解决&#xff1f;新手站长分享宝塔面板升级失败的解决方法&#xff1a; 宝塔面板升级失败解决方法 1、使用root账户登录到你的云服务器上&#xff0c;宝塔Linux面…

亿欧智库:2023中国宠物行业新趋势洞察报告(附下载)

关于报告的所有内容&#xff0c;公众【营销人星球】获取下载查看 核心观点 户外赛道本质上迎合了全球共性需求的增长&#xff0c;从养宠意愿的转化到养宠生活的需求&#xff0c;多层次的需求推动行业发展新趋势 从需求端进行分析&#xff0c;可以将养宠意愿的转化分为三个层…

Python语法:... for ... in ... if ...

Python中&#xff0c;for...in...[if]...语句是一种简洁的构建List的方法&#xff0c;从for给定的List中选择出满足if条件的元素组成新的List&#xff0c;其中if是可以省略的。下面举几个简单的例子进行说明 [for in ]: ...for ....in..... 语句. 实例如下&#xff1a; (1) …

零知识证明技术概述

简述 隐私泄露问题给企业带来了巨大的损失&#xff0c;本文简述零知识证明技术并且给出对应的应用示例&#xff1a; 什么是零知识证明&#xff1f; 零知识证明又被称为零知识协议&#xff0c;利用数学知识在双方不需要直接传递信息本身的前提下来验证信息的正确性。这个思想…

LLM reasoners 入门实验 24点游戏

LLM reasoners Ber666/llm-reasoners 实验过程 实验样例24games&#xff0c;examples/tot_game24&#xff0c;在inference.py中配置使用代理和open ai的api key。 首先安装依赖 git clone https://github.com/Ber666/llm-reasoners cd llm-reasoners pip install -e .然后…

【JAVA】继承

作者主页&#xff1a;paper jie的博客 本文作者&#xff1a;大家好&#xff0c;我是paper jie&#xff0c;感谢你阅读本文&#xff0c;欢迎一建三连哦。 本文录入于《JAVASE语法系列》专栏&#xff0c;本专栏是针对于大学生&#xff0c;编程小白精心打造的。笔者用重金(时间和精…

437. 路径总和 III

题目描述&#xff1a; 主要思路&#xff1a; 方法一&#xff1a;递归 从每个节点开始一次递归 class Solution { public:int ans0;void dfs(TreeNode* now,int targetSum, long sum){if(!now)return;sumnow->val;if(sumtargetSum)ans1;dfs(now->left,targetSum,sum);df…

[Docker实现测试部署CI/CD----构建成功后钉钉告警(7)]

目录 15、钉钉告警创建项目群&#xff0c;然后添加机器人添加机器人Jenkins 系统配置项目配置修改Jenkinsfile文件&#xff0c;添加钉钉提示信息测试 不修改Jenkinsfile文件&#xff0c;添加钉钉提示信息测试 15、钉钉告警 创建项目群&#xff0c;然后添加机器人 首先需要在钉…

Grafana制作图表-自定义Flink监控图表

简要 有时候我们在官网的Grafana下载的图表是这样的&#xff0c;如下图 #算子的处理时间&#xff0c;就是处理数据的延迟数据抓取&#xff0c;这个的说明看下下面的文章 metrics.latency.interval: 60 metrics.reporter.promgateway.class: org.apache.flink.metrics.prometh…

websocket服务端大报文发送连接自动断开分析

概述 当前springboot版本&#xff1a;2.7.4 使用依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId> </dependency>现象概述&#xff1a; 客户端和服务端已经有心跳…

《大型网站技术架构》第二篇 架构-高可用

高可用在公司中的重要性 对公司而言&#xff0c;可用性关系网站的生死存亡。对个人而言&#xff0c;可用性关系到自己的绩效升迁。 工程师对架构做了许多优化、对代码做了很多重构&#xff0c;对性能、扩展性、伸缩性做了很多改善&#xff0c;但别人未必能直观地感受到&#…

windows系统安装ElasticSearch7.9.3笔记

windows系统安装ElasticSearch7.9.3笔记 从es中文社区 或elastic官网下载安装包 ES中文社区-浏览器地址https://elasticsearch.cn/download/ 下载7.9.3版本的相关安装包 下载的安装包清单如下 开始配置使用带ik分词器和拼音分词器的ES7.9.3 分别解压这3个zip 拷贝ik分词器…

MATLAB(R2023a)添加工具箱TooLbox的方法-以GPOPS为例

一、找到工具箱存放位置 首先我们需要找到工具箱的存放位置&#xff0c;点击这个设置路径可以看到 我们的matlab工具箱的存放位置 C:\Program Files\MATLAB\R2023a\toolbox\matlab 从资源管理器中打开这个位置&#xff0c;可以看到里面各种工具箱 二、放入工具箱 解压我们…

[JavaScript游戏开发] Q版地图上让英雄、地图都动起来

系列文章目录 第一章 2D二维地图绘制、人物移动、障碍检测 第二章 跟随人物二维动态地图绘制、自动寻径、小地图显示(人物红点显示) 第三章 绘制冰宫宝藏地图、人物鼠标点击移动、障碍检测 第四章 绘制Q版地图、键盘上下左右地图场景切换 第五章 Q版地图上让英雄、地图都动起来…