Redis Streams 实现消息队列

简单介绍

Redis中有三种消息队列模式:
在这里插入图片描述
可以看出,作为Redis 5.0 引入的专门为消息队列设计的数据类型,Stream 功能更加健全,更适合做消息队列分发。 Stream 可以包含 0个 到 n个元素的有序队列,并根据ID的大小进行排序。
Stream类型消息队列的具备以下命令特点:

  • 可以序列化生成消息ID,方便索引、排序
  • 消息可回溯
  • 支持Consumer Groups 消费组:多消费者消息争抢,加快消费速度
  • 可以阻塞读取消息和非阻塞读取消息
  • 没有消息漏读风险
  • 有ACK消息确认机制,保证消息至少被消费一次
  • 支持多播模式:可以让队列从逻辑上分组进行隔离消费
    详细的stream操作见官网文档:https://redis.io/docs/data-types/streams-tutorial/

示例Demo

环境准备
需要下载Redis 5+版本(Redis 5+才支持streams)。
https://github.com/redis-windows/redis-windows/releases/tag/7.0.8
可以在命令行或者客户端进行测试。
客户端下载链接:
https://redis.com/redis-enterprise/redis-insight/

测试步骤

创建一个名为 "stream_demo" 的 Stream
XADD stream_demo* message "Message 1"

创建消息组:
XGROUP CREATE stream_demo mygroup $ MKSTREAM

阻塞式监听 Stream,等待消息到达
将消费者 "consumer1" 加入到消息组 "mygroup" 中,并且阻塞式地监听消息。
一旦消息到达,它会被消费者处理,然后使用 XACK 命令来确认已处理的消息。
XREADGROUP GROUP mygroup consumer1 BLOCK 0 STREAMS stream_demo >

执行添加消息的命令
XADD stream_demo * message "Message 2"  

Python实现

import redis

def func(message):
    print("Processing message:", message.decode('utf-8'))

def consume_messages():
    r = redis.Redis(host='localhost', port=6379, db=0)

    # 创建消息组
    r.xgroup_create('stream_demo', 'mygroup', id='$', mkstream=True)

    while True:
        # 阻塞式监听消息
        messages = r.xreadgroup('mygroup', 'consumer1', {'stream_demo': '>'}, block=0)

        for stream, message_data in messages:
            for message_id, message in message_data:
                # 执行处理操作
                func(message['message'.encode('utf-8')])

                # 确认消息已处理
                r.xack('stream_demo', 'mygroup', message_id)
                # 删除已确认的消息
                # r.xdel('stream_demo', message_id)

if __name__ == '__main__':
    consume_messages()

消费者组实现

可以给各创建一个消费者组,组内只有一个消费者,可以实现一个消息被多个消费者消费。

参考链接:
https://blog.csdn.net/qq_37967783/article/details/131138580
https://www.cnblogs.com/wzh2010/p/17205395.html

生产者生产消息 放到队列中,消费者监听并获取消息 进行消息的消费
消费成功后会给一个ack,队列中就会将这个消息删除。
如果消费失败,消息重回队列;

消息队列中 未被确认的消息 如何重新消费掉

Redis Streams is a Redis data type that represents a log, so you can add new information and message in an append-only mode (Note: This is not 100% accurate, since you can remove messages from the log, but it’s close enough.) Redis Streams lets you build “Kafka-like” applications, which can:

  • Create applications that publish and consume messages. Nothing extraordinary here, you could already do that with Redis Pub/Sub.
  • Consume messages that are published even when the client application (consumer) is not running. This is a big difference from Redis Pub/Sub.
  • Consume messages starting from a specific point. For example, read the whole history or only new messages.

如果消息进入到队列中 但是还没被消费,或者被消费但是没有被ack,redis就挂掉了,那么重启之后,要确保程序还能读得到未被消费或者被消费但是没有被ack的消息,继续处理。
要解决这个问题,你可以使用Redis的持久化功能来确保即使在Redis重启后,未被消费或未被确认的消息仍然可用。Redis提供两种主要的持久化方式:RDB持久化和AOF持久化。

  1. RDB持久化:RDB持久化是将Redis的数据集快照保存到磁盘上的一个二进制文件中。你可以配置Redis以固定的时间间隔自动执行RDB持久化,或者手动执行SAVE命令来创建快照。当Redis重启时,它可以加载最近的RDB文件并恢复之前保存的状态,包括未被消费或未被确认的消息。

  2. AOF持久化:AOF持久化是将Redis的写操作追加到一个日志文件(Append-Only File)中。通过将写操作追加到文件末尾,Redis可以在重启时重新执行这些写操作以恢复数据集的状态。你可以配置Redis以不同的方式执行AOF持久化,如每秒同步、每个写操作同步等。当Redis重启时,它会重新执行AOF文件中的写操作,确保未被消费或未被确认的消息不会丢失。

为了确保消息的持久性,你可以在Redis配置文件中启用RDB持久化和/或AOF持久化。以下是一些配置示例:

# 启用RDB持久化
save 60 1000  # 在60秒内,如果至少有1000个键发生变化,则执行RDB持久化

# 启用AOF持久化
appendonly yes
appendfsync everysec  # 每秒将写操作同步到磁盘

通过启用持久化功能,即使Redis在重启后,你的程序也能读取到未被消费或未被确认的消息,并继续处理它们。请注意,持久化功能会增加Redis的磁盘IO和内存使用量,因此你需要根据实际情况进行配置和优化。

待续…

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/252517.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【CMU 15-445】Lecture 10: Sorting Aggregations Algorithms 学习笔记

Sorting & Aggregations Algorithms SortingTop-N Heap SortExternal Merge Sort2-WAY External Merge SortK-WAY External Merge SortDouble Buffering Optimization AggregationsSortingHashing 本节课主要介绍的是数据库系统中的排序算法以及聚合算法 Sorting 排序算法…

数据结构期末考题之001

算法复杂度就是n(0)

JdbcTemplate

环境搭建 依赖 <!-- Oracle11g 连接驱动依赖。这里是个坑&#xff0c;官方提供的不能用 --><dependency><groupId>cn.easyproject</groupId><artifactId>ojdbc6</artifactId><version>12.1.0.2.0</version></dependency&g…

20231217在Canonical 官网下载ubuntu-20.04.6以及通过rufus-4.3p写入U盘作为安装盘

20231217在Canonical 官网下载ubuntu-20.04.6以及通过rufus-4.3p写入U盘作为安装盘 2023/12/17 11:45 百度搜索&#xff1a;https://ubuntu.com/ https://ubuntu.com/engage/migrating-from-rhel-centos-to-ubuntu-public-cloud?utm_sourcetakeover http://releases.ubuntu.co…

从关键新闻和最新技术看AI行业发展(2023.12.4-12.17第十二期) |【WeThinkIn老实人报】

写在前面 【WeThinkIn老实人报】旨在整理&挖掘AI行业的关键新闻和最新技术&#xff0c;同时Rocky会对这些关键信息进行解读&#xff0c;力求让读者们能从容跟随AI科技潮流。也欢迎大家提出宝贵的优化建议&#xff0c;一起交流学习&#x1f4aa; 大家好&#xff0c;我是Rock…

软件设计规约和评审

软件设计规约 概要设计规约&#xff1a;这是面向软件开发者的文档&#xff0c;主要作为软件项目管理人员、系统分析人员与设计人员之间交流的媒介。它指明了软件的组织结构&#xff0c;主要内容包括&#xff1a; 系统环境&#xff1a;硬件、软件接口与人机界面&#xff1b;外部…

半导体设备之外延炉简述

半导体设备对整个半导体行业起着重要的支撑作用。因半导体制造工艺复杂&#xff0c;各个环节需要的设备也不同&#xff0c;从流程工序分类来看&#xff0c;半导体设备主要可分为晶圆制造设备&#xff08;前道工序&#xff09;、封装测试设备&#xff08;后道工序&#xff09;等…

自动驾驶学习笔记(二十)——Planning算法

#Apollo开发者# 学习课程的传送门如下&#xff0c;当您也准备学习自动驾驶时&#xff0c;可以和我一同前往&#xff1a; 《自动驾驶新人之旅》免费课程—> 传送门 《Apollo 社区开发者圆桌会》免费报名—>传送门 文章目录 前言 参考线平滑 双层状态机 EM Planner …

724. 寻找数组的中心下标

代码&#xff1a; class Solution {public int pivotIndex(int[] nums) {int last0; //放前一个下标的左右差值int now0; //放当前下标的左右差值// 0 位于边界&#xff0c;先算出 0 下标左右的差值for(int i1;i<nums.length;i){lastnums[i];}//判断是否 0 下标就已…

大模型自定义算子优化方案学习笔记:CUDA算子定义、算子编译、正反向梯度实现

01算子优化的意义 随着大模型应用的普及以及算力紧缺&#xff0c;下一步对于计算性能的追求一定是技术的核心方向。因为目前大模型的计算逻辑是由一个个独立的算子或者说OP正反向求导实现的&#xff0c;底层往往调用的是GPU提供的CUDA的驱动程序。如果不能对于整个计算过程学习…

Ubuntu 常用命令之 cp 命令用法介绍

cp命令在Ubuntu系统中用于复制文件或目录。它的基本格式是cp [选项] 源文件或目录 目标文件或目录。 以下是一些常用的cp命令选项 -i&#xff1a;在覆盖目标文件之前将给出提示。-r或-R&#xff1a;递归复制&#xff0c;用于目录的复制操作。-v&#xff1a;详细模式&#xff…

地平线前端实习一面复盘(加深对var的理解+展开运算符+平拍数组)

目录 前言一&#xff0c;var的作用二&#xff0c;展开运算符三&#xff0c;平拍数组总结 前言 地平线的面试&#xff0c;有提示&#xff0c;很专业&#xff0c;体验很好。 可惜后面未收到消息&#xff0c;但还是要做复盘。收获还是很大的。 一&#xff0c;var的作用 且看下…

MySQL中EXPLAIN执行计划的分析

一. 执行计划能告诉我们什么&#xff1f; SQL如何使用索引联接查询的执行顺序查询扫描的数据函数 二. 执行计划中的内容 SQL执行计划的输出可能为多行&#xff0c;每一行代表对一个数据库对象的操作 1. ID列 ID列中的如果数据为一组数字&#xff0c;表示执行SELECT语句的顺…

网络基础(十二):ACL与NAT

目录 一、ACL 1、ACL的概述 2、ACL的分类 3、ACL的应用 4、ACL的组成和基本原理 ​编辑 5、ACL的配置 5.1配置基本ACL 5.2配置高级ACL 二、NAT 1、NAT的概述 2、NAT的分类 3、NAT的工作原理 4、静态NAT的配置 5、动态NAT的配置 6、NAPT&#xff08;端口映射&am…

自动驾驶技术:驶向未来的智能之路

导言 自动驾驶技术正引领着汽车产业向着更安全、高效、智能的未来演进。本文将深入研究自动驾驶技术的核心原理、关键技术、应用场景以及对交通、社会的深远影响。 1. 简介 自动驾驶技术是基于先进传感器、计算机视觉、机器学习等技术的创新&#xff0c;旨在实现汽车在不需要人…

论文降重系统同义词替换功能的改进方向 快码论文

大家好&#xff0c;今天来聊聊论文降重系统同义词替换功能的改进方向&#xff0c;希望能给大家提供一点参考。 以下是针对论文重复率高的情况&#xff0c;提供一些修改建议和技巧&#xff0c;可以借助此类工具&#xff1a; 标题&#xff1a;论文降重系统同义词替换功能的改进方…

java21特性学习

jdk21下载地址 JDK21文件 JDK21是javaSE平台最新的长期支持版本。 Java SE Java Archive | Oracle JDK21版本说明 JDK 21 Release Notes, Important Changes, and Information JavaSE 版本字符串格式 Version-String Format JavaSE平台采用了基于时间的发布模型,JDK每六个…

虚拟化之安全虚拟化

虚拟化首次引入是在Armv7-A架构中。那时&#xff0c;Hyp模式&#xff08;在AArch32中相当于EL2&#xff09;仅在非安全状态下可用。当Armv8.4-A引入时&#xff0c;添加了对安全状态下EL2的支持作为一个可选特性。 当处理器支持安全EL2时&#xff0c;需要使用SCR_EL3.EEL2位从E…

HarmonyOS:使用MindSpore Lite引擎进行模型推理

场景介绍 MindSpore Lite 是一款 AI 引擎&#xff0c;它提供了面向不同硬件设备 AI 模型推理的功能&#xff0c;目前已经在图像分类、目标识别、人脸识别、文字识别等应用中广泛使用。 本文介绍使用 MindSpore Lite 推理引擎进行模型推理的通用开发流程。 基本概念 在进行开…

【elementui笔记:el-table表格的输入校验】

之前做得比较多的校验是在el-form表单里做的&#xff0c;但有时也遇到&#xff0c;需要在table内输入数据&#xff0c;然后校验输入的数据是否符合要求的情况。因此记录一下。 思路&#xff1a; 1.需要借助el-form的校验&#xff0c;el-table外层嵌套一层el-form&#xff0c;使…