Redis的三种消息队列实现方式

目录

前言

List实现消息队列

PubSub消息队列

Stream消息队列

三种实现方式对比


前言

为什么要使用Redis的消息队列?

成本低,对于RabbitMQ或是Kafka来说,已经是重量级的消息队列。

Redis的三种实现方式:

  • List结构:一种有序的双向链表
  • PubSub发布订阅:基于点对点的消息模型
  • Stream:在Redis5.0之后提供的,比较完善的消息队列模型

List实现消息队列

我们可以利用Redis中List的命令LPUSH与RPOP来实现消息的发送与接收,但是需要注意的是,队列中没有消息时,RPOP会返回null,不会向JVM中阻塞队列一样进行阻塞并等待消息,因此这里应该使用BRPOP来实现阻塞效果。

优点:利用Redis存储,不受限于JVM内存上限。

基于Redis的持久化机制,数据安全性有保证。

可以满足消息有序性。

缺点:无法避免消息丢失。

只支持单消费者。

PubSub消息队列

是Redis2.0版本引入的消息传递模型,顾名思义,消费者可以订阅一个或多个channel,生产者向对应的channel发送消息后,所有订阅者都能收到相关信息。

PubSub消息队列的基本命令

# 订阅一个或多个频道
SUBSCRIBE channel [channel]
# 向一个频道发送消息
PUBLISH channel msg
# 订阅与pattern格式匹配的所有频道
PSUBSCRIBE pattern [pattern]

优点:采用发布订阅模式,支持多生产者,多消费者。

缺点:不支持数据持久化。

无法避免消息丢失。

消息堆积有上限,超出时数据丢失。

Stream消息队列

Stream是Redis5.0之后引入新的数据类型,支持持久化,因此相比于PubSub更加安全,可以通过Stream实现一个功能完善的消息队列

发送消息的命令:

XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID filed value[filed value]

命令解释:

  • key:队列名称
  • NOMKSTREAM:如果队列不存在,是否自动创建队列,默认是自动创建
  • MAXLEN|MINID [=|~] threshold [LIMIT count]:设置消息队列的最大消息数量
  • *|ID:消息的唯一ID,*代表由Redis自动生成,格式是"时间戳-递增数字"
  • field value:发送到队列的消息名称为Entry

读取消息的第一种方法

命令如下 

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

命令解释:

  • COUNT count:每次读取消息的最大数量
  • BLOCK milliseconds:当没有消息时进行阻塞,并指定阻塞时长,如果为0则指永久阻塞
  • STREAMS key:要从哪个队列读取消息
  • ID:起始ID,只返回大于该ID的消息,0表示从第一个消息开始读取,$表示从最新消息开始

XREAD命令的特点:

  • 消息可回溯
  • 一个消息可以被多个消费者拿到
  • 可以阻塞读取
  • 有消息漏读的风险

读取消息的第二种方法

将多个消费者划分到一个组(Consumer Group)当中,监听同一个队列。特点如下

  • 消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度
  • 消息标识:消费者组会维护一个标示记录最后一个被处理的消息哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费
  • 消息确认:消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除。

创建消费者组命令:

XGROUP CREATE key groupName ID [MKSTREAM]

命令解释:

  • key:队列名称
  • groupName:消费者组名称
  • ID:起始ID标识,$代表队列中最后一个消息,0则代表队列中第一个消息
  • MKSTREAM:队列不存在时自动创建
# 删除指定的消费者组
XGROUP DESTORY key groupName
# 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupName consumername
# 删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname comsumername 

从消费者组中读取消息

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID[ID ...]

命令解释:

  • group:消费组名称
  • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
  • count:本次查询的最大数量
  • BLOCK milliseconds:当没有消息时最长等待时间
  • NOACK:无需手动ACK,获取到消息后自动确认
  • STREAMS key:指定队列名称
  • ID:获取消息的起始ID。" > "表示从下一个未消费的消息开始。其它则是根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

Group类型的消息队列特点:

  • 消息可回溯
  • 可以多消费者争抢消息
  • 可以阻塞读取
  • 没有消息漏读风险
  • 有消息确认机制,保证消息至少被消费一次

三种实现方式对比

LIST

PubSub

Stream

消息持久化

支持

不支持

支持

阻塞读取

支持

支持

支持

消息堆积处理

受限于内存空间,可以利用多消费者加快处理

受限于消费者缓冲区

受限于队列长度,可以利用消费者组提高消费速度,减少堆积

消息确认机制

不支持

不支持

支持

消息回溯

不支持

不支持

支持

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/220649.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

VSC改造MD编辑器及图床方案分享

VSC改造MD编辑器及图床方案分享 用了那么多md编辑器,到头来还是觉得VSC最好用。这次就来分享一下我的blog文件编辑流吧。 这篇文章包括:VSC下md功能扩展插件推荐、图床方案、blog文章管理方案 VSC插件 Markdown All in One Markdown Image - 粘粘图片…

在python的Scikit-learn库中,可以使用train_test_split函数来划分训练集和测试集。

文章目录 一、在Scikit-learn库中,可以使用train_test_split函数来划分训练集和测试集总结 一、在Scikit-learn库中,可以使用train_test_split函数来划分训练集和测试集 在Scikit-learn库中,可以使用train_test_split函数来划分训练集和测试…

【网络安全】红蓝对抗之企业互联网安全防护

01 什么是“红蓝对抗”? “红蓝对抗”最早起源于古罗马军队,在沙盘中用红色和蓝色来代表敌人和自己,他们认为蓝色代表勇敢和忠诚,红色代表血腥和暴力,所以选择用蓝色代表自己。 在中国,由于传统习俗与文化…

一、技术体系结构

本章概要 总体技术体系框架概念和理解 1.1 总体技术体系 单一架构一个项目,一个工程,导出为一个war包,在一个Tomcat上运行。也叫all in one。 单一架构,项目主要应用技术框架为:Spring , SpringMVC , Mybatis 分布…

Python如何传递任意数量的实参及什么是返回值

Python如何传递任意数量的实参 传递任意数量的实参 形参前加一个 * ,Python会创建一个已形参为名的空元组,将所有收到的值都放到这个元组中: def make_pizza(*toppings):print("\nMaking a pizza with the following toppings: "…

【ArcGIS Pro】探索性插值无法覆盖所需shp范围

做个小记录自用,实际不准。 1 看看就行 pro插值 看看过程就行。有详细过程,类似tutorial https://learn.arcgis.com/zh-cn/projects/interpolate-temperatures-using-the-geostatistical-wizard/ 2 注意用投影坐标系 wgs84转投影坐标系 https://blog…

SR锁存器—>带EN的SR锁存器—>D锁存器—>边沿触发式D触发器—>寄存器

其实选择与非门当做构成SR锁存器的基本逻辑电路是有漏洞的,所以才导致了后续的都为低电平的时候,Q和非Q都是亮起的。但是我们设计的初衷是:Q和非Q是互斥的,是不能同时亮起的,且为了达到这一点,要使得其中两…

用友NC JiuQiClientReqDispatch反序列化RCE漏洞复现

0x01 产品简介 用友NC是一款企业级ERP软件。作为一种信息化管理工具,用友NC提供了一系列业务管理模块,包括财务会计、采购管理、销售管理、物料管理、生产计划和人力资源管理等,帮助企业实现数字化转型和高效管理。 0x02 漏洞概述 用友 NC JiuQiClientReqDispatch 接口存在…

EasyRecovery14破解版 v14.0.0.4 官方免费版(含激活码)

软件介绍 EasyRecovery14高级版是一款功能强大的数据恢复软件,软件对比家庭版本它的使用更加广泛,在恢复数据方面软件可以做到最完整的损失恢复,无论是文档、音乐、软件都可以一键恢复,同时软件还可以对文件的名字、后缀进行修改…

龙芯loongarch64服务器编译安装tokenizers

1、简介 Hugging Face 的 Tokenizers 库提供了一种快速和高效的方式来处理(即分词)自然语言文本,用于后续的机器学习模型训练和推理。这个库提供了各种各样的预训练分词器,如 BPE、Byte-Pair Encoding (Byte-Level BPE)、WordPiece 等,这些都是现代 NLP 模型(如 BERT、GP…

浅谈ArrayBuffer、Blob和File、FileReader

ArrayBuffer、Blob和File都是JavaScript中处理二进制数据的对象。 ArrayBuffer 用于表示一个通用的、固定长度的原始二进制数据缓冲区。它不能直接操作缓冲区中的数据,而需要通过一个类型化数组TypedArray(如Int8Array、Uint8Array等)或者一…

你好!哈希表【JAVA】

1.初识🎶🎶🎶 它基本上是由一个数组和一个哈希函数组成的。哈希函数将每个键映射到数组的特定索引位置,这个位置被称为哈希码。当我们需要查找一个键时,哈希函数会计算其哈希码并立即返回结果,因此我们可以…

消息中间件之间的区别

一.单机吞吐量 ActiveMQ:万级,吞吐量比RocketMQ和Kafka要低了一个数量级 RabbitMQ:万级,吞吐量比RocketMQ和Kafka要低了一个数量级 RocketMQ:10万级,RocketMQ也是可以支撑高吞吐的一种MQ Kafka&#xff…

软件设计模式原则(六)依赖倒置原则

一.定义 依赖倒置原则(Dependence Inversion Principle)是程序要依赖于抽象接口,不要依赖于具体实现。简单的说就是要求对抽象进行编程,不要对实现进行编程,这样就降低了客户与实现模块间的耦合。 即:层次…

SpringBoot整合validation数据校验

1. 首先引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-validation</artifactId></dependency> 点标识进去可以发现是通过Hibernate Validator使用 Java Bean Validation 2. 属性上…

用AI在抖音直播做姓氏头像的全新玩法,详细分析制作教程

前段时间在圈子里给大家分享了用AI写艺术字做小红书账号案例玩法&#xff0c;同学们都比较热衷学习。纷纷动手实践。 事实上用AI艺术字变现玩法还有许多。 例如上周末在星球给圈友们分享的一个AI艺术字直播的抖音账号&#xff0c;直播内容形式很简单&#xff0c;就是展现用AI…

从一个简单的实际例子看并行处理

在不使用并行处理之前 假如我有一个很大的tif图片 我想算一下这张图片中有多少的像素点是黑色的,我可能会这么做: def cnt_black(filename):img = tf.imread(filename)width, height, channels = img.shapecnt = 0for i in range(width):for j in range(height):r, g, b = …

12、SQL注入——SQL报错注入

文章目录 一、报错注入概述1.1 报错注入1.2 报错注入的前提条件1.3 相关报错函数 二、报错注入payload2.1 利用extractvalue()函数进行报错注入2.2 利用updataxml()函数进行报错注入2.3 利用floor()函数进行报错注入 一、报错注入概述 1.1 报错注入 通过构造特定的SQL语句&am…

coding创建远程分支。并拉取远程新分支+推送代码

进入coding ----项目----代码仓库---点击 下拉之后查看全部----创建分支 创建分支之后执行下面命令 git branch -a // 查看所有分支 这个时候发现自己创建的分支没有显示这是因为自己在远程创建了分支但是本地还没有分支 执行 git fetch命令 用于从远程仓库获取最新的提交…

【软件测试】技术精选:Jmeter常见的几种报错

1、Java.net.UnknownHostException 这个错的含义是 没有连接到服务器地址&#xff0c;因此很可能是 内部网络中断导致。 2、502 Bad gateway 这个和本地的线程数无关 可能原因是网络抖动不稳定导致 3、java.net.SocketException: Socket closed 强制停止线程&#xff0c;连接…