redis原理 6:小道消息 —— PubSub

前面我们讲了 Redis 消息队列的使用方法,但是没有提到 Redis 消息队列的不足之处,那就是它不支持消息的多播机制

img
img

消息多播

消息多播允许生产者生产一次消息,中间件负责将消息复制到多个消息队列,每个消息队列由相应的消费组进行消费。它是分布式系统常用的一种解耦方式,用于将多个消费组的逻辑进行拆分。支持了消息多播,多个消费组的逻辑就可以放到不同的子系统中。

如果是普通的消息队列,就得将多个不同的消费组逻辑串接起来放在一个子系统中,进行连续消费。

img
img

PubSub

为了支持消息多播,Redis 不能再依赖于那 5 种基本数据类型了。它单独使用了一个模块来支持消息多播,这个模块的名字叫着 PubSub,也就是 PublisherSubscriber,发布者订阅者模型。我们使用 Python 语言来演示一下 PubSub 如何使用。

python复制代码# -*- coding: utf-8 -*-
import time
import redis

client = redis.StrictRedis()
p = client.pubsub()
p.subscribe("codehole")
time.sleep(1)
print p.get_message()
client.publish("codehole""java comes")
time.sleep(1)
print p.get_message()
client.publish("codehole""python comes")
time.sleep(1)
print p.get_message()
print p.get_message()
python复制代码{'pattern'None'type''subscribe''channel''codehole''data'1L}
{'pattern'None'type''message''channel''codehole''data''java comes'}
{'pattern'None'type''message''channel''codehole''data''python comes'}
None

img 客户端发起订阅命令后,Redis 会立即给予一个反馈消息通知订阅成功。因为有网络传输延迟,在 subscribe 命令发出后,需要休眠一会,再通过 get\_message 才能拿到反馈消息。客户端接下来执行发布命令,发布了一条消息。同样因为网络延迟,在 publish 命令发出后,需要休眠一会,再通过 get\_message 才能拿到发布的消息。如果当前没有消息,get\_message 会返回空,告知当前没有消息,所以它不是阻塞的。

Redis PubSub 的生产者和消费者是不同的连接,也就是上面这个例子实际上使用了两个 Redis 的连接。这是必须的,因为 Redis 不允许连接在 subscribe 等待消息时还要进行其它的操作。

在生产环境中,我们很少将生产者和消费者放在同一个线程里。如果它们真要在同一个线程里,何必通过中间件来流转,直接使用函数调用就行。所以我们应该将生产者和消费者分离,接下来我们看看分离后的代码要怎么写。

消费者

py复制代码# -*- coding: utf-8 -*-
import time
import redis

client = redis.StrictRedis()
p = client.pubsub()
p.subscribe("codehole")
while True:
    msg = p.get_message()
    if not msg:
        time.sleep(1)
        continue
    print msg

生产者

py复制代码# -*- coding: utf-8 -*-
import redis

client = redis.StrictRedis()
client.publish("codehole""python comes")
client.publish("codehole""java comes")
client.publish("codehole""golang comes")

必须先启动消费者,然后再执行生产者,消费者我们可以启动多个,pubsub 会保证它们收到的是相同的消息序列。

python复制代码{'pattern'None'type''subscribe''channel''codehole''data'1L}
{'pattern'None'type''message''channel''codehole''data''python comes'}
{'pattern'None'type''message''channel''codehole''data''java comes'}
{'pattern'None'type''message''channel''codehole''data''golang comes'}

我们从消费者的控制台窗口可以看到上面的输出,每个消费者窗口都是同样的输出。第一行是订阅成功消息,它很快就会输出,后面的三行会在生产者进程执行的时候立即输出。 上面的消费者是通过轮询 get_message 来收取消息的,如果收取不到就休眠 1s。这让我们想起了第 3 节的消息队列模型,我们使用 blpop 来代替休眠来提高消息处理的及时性。

PubSub 的消费者如果使用休眠的方式来轮询消息,也会遭遇消息处理不及时的问题。不过我们可以使用 listen 来阻塞监听消息来进行处理,这点同 blpop 原理是一样的。下面我们改造一下消费者

阻塞消费者

py复制代码# -*- coding: utf-8 -*-
import time
import redis

client = redis.StrictRedis()
p = client.pubsub()
p.subscribe("codehole")
for msg in p.listen():
    print msg

代码简短了很多,不需要再休眠了,消息处理也及时了。

模式订阅

上面提到的订阅模式是基于名称订阅的,消费者订阅一个主题是必须明确指定主题的名称。如果我们想要订阅多个主题,那就 subscribe 多个名称。

sh复制代码> subscribe codehole.image codehole.text codehole.blog  # 同时订阅三个主题,会有三条订阅成功反馈信息
1) "subscribe"
2) "codehole.image"
3) (integer) 1
1) "subscribe"
2) "codehole.text"
3) (integer) 2
1) "subscribe"
2) "codehole.blog"
3) (integer) 3

这样生产者向这三个主题发布的消息,这个消费者都可以接收到。

sh复制代码> publish codehole.image https://www.google.com/dudo.png
(integer) 1
> publish codehole.text " 你好,欢迎加入码洞 "
(integer) 1
> publish codehole.blog '{"content": "hello, everyone", "title": "welcome"}'
(integer) 1

如果现在要增加一个主题codehole.group,客户端必须也跟着增加一个订阅指令才可以收到新开主题的消息推送。

为了简化订阅的繁琐,redis 提供了模式订阅功能Pattern Subscribe,这样就可以一次订阅多个主题,即使生产者新增加了同模式的主题,消费者也可以立即收到消息

bash复制代码> psubscribe codehole.*  # 用模式匹配一次订阅多个主题,主题以 codehole. 字符开头的消息都可以收到
1) "psubscribe"
2) "codehole.*"
3) (integer) 1

消息结构

前面的消费者消息输出时都是下面的这样一个字典形式

python复制代码{'pattern'None'type''subscribe''channel''codehole''data'1L}
{'pattern'None'type''message''channel''codehole''data''python comes'}
{'pattern'None'type''message''channel''codehole''data''java comes'}
{'pattern'None'type''message''channel''codehole''data''golang comes'}

那这几个字段是什么含义呢?

data 这个毫无疑问就是消息的内容,一个字符串。

channel 这个也很明显,它表示当前订阅的主题名称。

type 它表示消息的类型,如果是一个普通的消息,那么类型就是 message,如果是控制消息,比如订阅指令的反馈,它的类型就是 subscribe,如果是模式订阅的反馈,它的类型就是 psubscribe,还有取消订阅指令的反馈 unsubscribe 和 punsubscribe。

pattern 它表示当前消息是使用哪种模式订阅到的,如果是通过 subscribe 指令订阅的,那么这个字段就是空。

PubSub 缺点

PubSub 的生产者传递过来一个消息,Redis 会直接找到相应的消费者传递过去。如果一个消费者都没有,那么消息直接丢弃。如果开始有三个消费者,一个消费者突然挂掉了,生产者会继续发送消息,另外两个消费者可以持续收到消息。但是挂掉的消费者重新连上的时候,这断连期间生产者发送的消息,对于这个消费者来说就是彻底丢失了。

如果 Redis 停机重启,PubSub 的消息是不会持久化的,毕竟 Redis 宕机就相当于一个消费者都没有,所有的消息直接被丢弃。

正是因为 PubSub 有这些缺点,它几乎找不到合适的应用场景。所以 Redis 的作者单独开启了一个项目 Disque 专门用来做多播消息队列。该项目目前没有成熟,一直长期处于 Beta 版本,但是相应的客户端 sdk 已经非常丰富了,就待 Redis 作者临门一脚发布一个 Release 版本。关于 Disque 的更多细节,本小册不会多做详细介绍,感兴趣的同学可以去阅读相关文档。

补充

近期 Redis5.0 新增了 Stream 数据结构,这个功能给 Redis 带来了持久化消息队列,从此 PubSub 可以消失了,Disqueue 估计也永远发不出它的 Release 版本了。

本文由 mdnice 多平台发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/59438.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Leetcode刷题】位运算

本篇文章为 LeetCode 位运算模块的刷题笔记,仅供参考。 位运算的常用性质如下: a ^ a 0 a ^ 0 a a ^ 0xFFFFFFFF ~a目录 一. 基本位运算Leetcode29.两数相除Leetcode89.格雷编码 二. 位运算的性质Leetcode136.只出现一次的数字Leetcode137.只出现一…

好用的数据库管理软件之idea(idea也有数据库???)

1.建立maven项目(maven项目添加依赖,对于后期连接数据库很方便) 2.连接数据库。。。 这里一定注意端口号,不要搞错了 和上一张图片不一样哦 3.数据库测试代码。。。 然后你就可以在这里边写MySQL代码了,这个工具对于新…

RunnerGo条件控制器使用方法

在做性能测试时我们需要根据业务需求、业务场景来配置测试脚本,举个例子:在登录注册场景中,可能会有账号密码全部正确、账号格式错误、密码错误等多种情况,这里的“登录/注册”事件可以视为一个场景。一个真实业务中的场景&#x…

人工智能学习07--pytorch23--目标检测:Deformable-DETR训练自己的数据集

参考 https://blog.csdn.net/qq_44808827/article/details/125326909https://blog.csdn.net/dystsp/article/details/125949720?utm_mediumdistribute.pc_relevant.none-task-blog-2~default~baidujs_baidulandingword~default-0-125949720-blog-125326909.235^v38^pc_releva…

【Python】Web学习笔记_flask(3)——上传文件

用GET、POST请求上传图片并呈现出来 首先还是创建文件上传的模板 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>上传图片</title> </head> <body> <form action""…

MySQL 远程操作mysql

可以让别人在他们的电脑上操作我电脑上的数据库 create user admin identified with mysql_native_password by admin; //设置账号密码都为admingrant all on *.* to admin; //给admin账号授权 授权完成

直播课 | 大橡科技研发总监丁端尘博士“类器官芯片技术在新药研发中的应用”

从类器官到类器官芯片&#xff0c;正在生物科学领域大放异彩。 药物研发需要新方法 众所周知&#xff0c;一款新药是一个风险大、周期长、成本高的艰难历程&#xff0c;国际上有一个传统的“双十”说法——10年时间&#xff0c;10亿美金&#xff0c;才可能成功研发出一款新药…

【安全测试】Web应用安全之XSS跨站脚本攻击漏洞

目录 前言 XSS概念及分类 反射型XSS(非持久性XSS) 存储型XSS(持久型XSS) 如何测试XSS漏洞 方法一&#xff1a; 方法二&#xff1a; XSS漏洞修复 原则&#xff1a;不相信客户输入的数据 处理建议 资料获取方法 前言 以前都只是在各类文档中见到过XSS&#xff0c;也进…

微信小程序animation动画,微信小程序animation动画无限循环播放

需求是酱紫的&#xff1a; 页面顶部的喇叭通知&#xff0c;内容不固定&#xff0c;宽度不固定&#xff0c;就是做走马灯&#xff08;轮播&#xff09;效果&#xff0c;从左到右的走马灯&#xff08;轮播&#xff09;&#xff0c;每播放一遍暂停 1500ms &#xff5e; 2000ms 刚…

Python web实战之 Django 的模板语言详解

关键词&#xff1a; Python、web开发、Django、模板语言 概要 作为 Python Web 开发的框架之一&#xff0c;Django 提供了一套完整的 MVC 模式&#xff0c;其中的模板语言为开发者提供了强大的渲染和控制前端的能力。本文介绍 Django 的模板语言。 1. Django 模板语言入门 Dj…

宇树Unitree Z1机械臂使用教程

宇树Unitree Z1机械臂使用教程 作者&#xff1a;Herman Ye Galbot Auromix Auromix是一个机器人爱好者组织&#xff0c;欢迎参与我们Github上的开源项目 更新日期&#xff1a;2023/08/04 注意&#xff1a;此文档在该日期下测试有效。 以下内容参考宇树官方的Z1 Docs。 由宇树…

JavaScript |(四)正则表达式 | 尚硅谷JavaScript基础实战

学习来源&#xff1a;尚硅谷JavaScript基础&实战丨JS入门到精通全套完整版 文章目录 &#x1f4da;正则表达式&#x1f4da;正则表达式字面量方式&#x1f4da;字符串&正则表达式&#x1f407;split()&#x1f407;search()&#x1f407;match()&#x1f407;replace()…

Vue3_语法糖—— <script setup>以及unplugin-auto-import自动引入插件

<script setup>import { ref , onMounted} from vue;let obj ref({a: 1,b: 2,}); let changeObj ()>{console.log(obj)obj.value.c 3 //ref写法}onMounted(()>{console.log(obj)})</script> 里面的代码会被编译成组件 setup() 函数的内容。 相当于 <…

牛客网Verilog刷题——VL51

牛客网Verilog刷题——VL51 题目答案 题目 请编写一个十六进制计数器模块&#xff0c;计数器输出信号递增每次到达0&#xff0c;给出指示信号zero&#xff0c;当置位信号set 有效时&#xff0c;将当前输出置为输入的数值set_num。模块的接口信号图如下&#xff1a; 模块的时序图…

Redis的Java客户端

目录 1.Jedis的使用 前置工作-ssh进行端口转发 JedisAPI的使用 Jedis连接池 2.SpringDataRedis的使用 1.创建项目 2.配置文件 3.注入RedisTemplate对象 4.编写代码 3.SpringRedisTemplate 哈希结构用法 ​总结 1.Jedis的使用 Jedis&#xff1a;以Redis命令作为方法…

mysql重置和修改密码 Ubuntu系统

忘记密码要重置密码 cat /etc/mysql/debian.cnf/etc/mysql/debian.cnf这个只有Debian或者Ubuntu服务器才有&#xff0c;里面有mysql安装后自带的用户&#xff0c;作用就是重启及运行mysql服务。我们用这个用户登录来达到重置密码的操作 使用上面的那个文件中的用户名和密码登…

强化学习分享(一) DQN算法原理及实现

摘要&#xff1a;主要讲解DQN算法的原理&#xff0c;伪代码解读&#xff0c;基于pytorch版本的DQN小游戏编程&#xff0c;同时对该代码进行详细标注&#xff0c;以及奉上原码。 &#xff08;一&#xff09;强化学习算法介绍 DQN&#xff0c;顾名思义&#xff0c;Deep Q Learni…

SpringCloud《Eureka、Ribbon、Feign、Hystrix、Zuul》作用简单介绍

概述 SpringCloud是一个全家桶&#xff0c;包含多个组件。 本文主要介绍几个重要组件&#xff0c;也就是Eureka、Ribbon、Feign、Hystrix、Zuul这几个组件。 一、业务场景介绍 业务流程&#xff0c;支付订单功能 订单服务改变为已支付订单服务调用库存服务&#xff0c;扣减…

中国农村程序员学习此【正则表达式进阶】发明cahtGPT,购买大平层,开上帕拉梅拉,迎娶白富美出任CEO走上人生巅峰

注&#xff1a;最后有面试挑战&#xff0c;看看自己掌握了吗 文章目录 限制可能的用户名匹配空白字符匹配非空白字符指定匹配的上限和下限只指定匹配的下限指定匹配的确切数量检查全部或无正向先行断言和负向先行断言检查混合字符组使用捕获组重用模式使用捕获组搜索和替换删除…

分类预测 | MATLAB实现WOA鲸鱼算法同步优化特征选择结合支持向量机分类预测

分类预测 | MATLAB实现WOA鲸鱼算法同步优化特征选择结合支持向量机分类预测 目录 分类预测 | MATLAB实现WOA鲸鱼算法同步优化特征选择结合支持向量机分类预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 MATLAB实现WOA鲸鱼算法同步优化特征选择结合支持向量机分类预测…