Redis 发布订阅机制深入探索

Redis 的发布订阅(pub/sub)机制是一种消息传递模式,允许消息的发送者(发布者)和消息的接收者(订阅者)通过一个中介层(频道)进行通信,而无需彼此直接交互。以下是 Redis 发布订阅机制的工作原理的详细解释:

  1. 基于事件驱动的架构
    Redis 服务器使用一个事件驱动的模型来处理所有的网络通信和客户端请求。这种架构允许 Redis 以非阻塞方式高效地处理多个并发连接。

  2. 发布(Publish)
    当一个客户端想要发送消息时,它会将消息发布到一个指定的频道(channel)上。
    发布操作是通过 PUBLISH 命令实现的。
    例如,PUBLISH channel_name message 会将 message 发送到名为 channel_name 的频道。
    在这里插入图片描述
    这里返回的代表有3个监听者

  3. 订阅(Subscribe)
    客户端使用 SUBSCRIBE 命令来监听一个或多个频道的消息。
    当客户端订阅了一个频道后,它会接收到发送到这个频道的所有消息。例如,通过执行 SUBSCRIBE channel_name,客户端就可以监听 channel_name 频道上的消息。

    订阅会返回3个参数:
    1:代表订阅
    2:订阅频道
    3:发布者数量

当发布着发布消息后,订阅者会被推送如下消息
在这里插入图片描述

  1. 消息传递
    一旦有消息被发布到频道上,Redis 服务器会将这个消息分发给所有订阅了该频道的客户端。
    这种消息传递方式是多对多的:多个发布者可以向同一个频道发送消息,而所有订阅该频道的客户端都可以接收到这些消息。
  2. 非持久化的消息
    Redis 发布订阅机制中的消息是非持久化的。这意味着一旦消息被发送,它不会被存储在服务器上,无法被之后订阅该频道的客户端接收。
  3. 多路复用和非阻塞 I/O
    Redis 使用多路复用技术(如 epoll、kqueue)来同时监听多个网络连接,这使得服务器能够同时处理多个发布和订阅操作。
    使用非阻塞 I/O 确保单个操作不会阻塞整个服务器,从而提高整体性能。

在springboot项目中可以如下:
发布:

@Resource
private RedisTemplate redisTemplate;


@PostMapping(value = "/add")
public Result<?> add(@RequestBody SysApplication sysApplication) {
	sysApplicationService.save(sysApplication);
	//这里需要用JSONObject.toJSONString转成string序列化,否则直接上传对象会带上对象路径信息等
	redisTemplate.convertAndSend(RedisListenerConstant.APP_CHANNEL, JSONObject.toJSONString(sysApplication));
	return Result.OK("添加成功!");
}

订阅者: 这里写了2个listener,因为想说明可以在RedisMessageConfig 同时监听多个频道

package com.sungrowpower.redis;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.sungrowpower.common.system.dto.SysApplicationPermissionDto;
import com.sungrowpower.service.IGoodsSpuService;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;

import javax.annotation.Resource;
@Configuration
public class RedisMessageAppListener implements MessageListener {
    @Resource
    private IGoodsSpuService iGoodsSpuService;
    @Override
    public void onMessage(Message message, byte[] bytes) {
        // 假设 message 体是一个 JSON 字符串
        String body = new String(message.getBody());
        // 去除类路径
//        String json = body.substring(body.indexOf("{"), body.lastIndexOf("}") + 1);
        String msg= (String) JSON.parse(body);
        SysApplicationPermissionDto app = JSONObject.parseObject(msg, SysApplicationPermissionDto.class);
        System.out.println("s========"+msg);
        try {
        //这里可以调用本身的业务逻辑
            iGoodsSpuService.operateGoodsSpu(app);
        } catch (Exception e) {
            throw new RuntimeException();
        }
    }
}

 
package com.sungrowpower.redis;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.sungrowpower.common.exception.AutoBizException;
import com.sungrowpower.common.system.dto.SysPermission;
import com.sungrowpower.service.IGoodsSpuService;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class RedisMessageMenuListener implements MessageListener {

    @Resource
    private IGoodsSpuService iGoodsSpuService;
    @Override
    public void onMessage(Message message, byte[] bytes) {
        // 假设 message 体是一个 JSON 字符串
        String body = new String(message.getBody());
        // 去除类路径
//        String json = body.substring(body.indexOf("{"), body.lastIndexOf("}") + 1);
        String msg= (String) JSON.parse(body);
        SysPermission menu = JSONObject.parseObject(msg, SysPermission.class);
        System.out.println("s========"+msg);
        try {
                //这里可以调用本身的业务逻辑
            iGoodsSpuService.operateAttrValue(menu);
        } catch (Exception e) {
            throw new RuntimeException();
        }
    }

}




package com.sungrowpower.redis;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
@Configuration
public class RedisMessageConfig {
    /**
     * 消息监听器适配器menu
     * @return org.springframework.data.redis.listener.adapter.MessageListenerAdapter
     */
    @Bean
    public MessageListenerAdapter listenerAdapterMenu(RedisMessageMenuListener receiver) {
        //这个地方是给 messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“onMessage”
        return new MessageListenerAdapter(receiver, "onMessage");
    }


    /**
     * 消息监听器适配器app
     * @return org.springframework.data.redis.listener.adapter.MessageListenerAdapter
     */
    @Bean
    public MessageListenerAdapter listenerAdapterApp(RedisMessageAppListener receiver) {
        //这个地方是给 messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“onMessage”
        return new MessageListenerAdapter(receiver, "onMessage");
    }

    /**
     * redis消息监听器容器
     * @return org.springframework.data.redis.listener.RedisMessageListenerContainer
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory,
                                                                       MessageListenerAdapter listenerAdapterMenu,
                                                                       MessageListenerAdapter listenerAdapterApp) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // 订阅了一个叫 my-channel 的频道
        container.addMessageListener(listenerAdapterMenu, new PatternTopic("menu_channel"));
        container.addMessageListener(listenerAdapterApp, new PatternTopic("app_channel"));
        // 这个container 可以添加多个 messageListener
        return container;
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/213045.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

半导体工艺发展概述

集成电路发展到今天&#xff0c;经历从1940年的PN结发现&#xff0c;到1950年BJT三极管发明&#xff0c;再到1963年CMOS电路发明。从单纯基于Si的半导体电路&#xff0c;再到GaAs, GaN&#xff0c;SiGe, InP等化合物半导体集成电路。不断的通过化学材料配比&#xff0c;基本单元…

TinyVue 组件库助力赛意信息获得工业软件种子奖

首先恭喜广州赛意信息科技股份有限公司荣获工业软件种子奖&#xff01;在本次大赛中&#xff0c;凭借“数据驱动智造&#xff0c;基于 iDME 的赛意新一代 SMOM 赋能电子行业制造运营管理解决方案”这一作品脱颖而出~ 大赛简介 10月30日至10月31日&#xff0c;由广东省工业和信…

圆通速递查询入口,以表格的形式导出单号的每一条物流信息

批量查询圆通速递单号的物流信息&#xff0c;以表格的形式导出单号的每一条物流信息。 所需工具&#xff1a; 一个【快递批量查询高手】软件 圆通速递单号若干 操作步骤&#xff1a; 步骤1&#xff1a;运行【快递批量查询高手】软件&#xff0c;并登录 步骤2&#xff1a;点击…

Hadoop——分布式计算MapReduce和资源调度Yarn

分布式计算 MapReduce YARN架构 YARN集群部署 一、Hadoop安装目录下/etc/hadoop修改mapred-env配置文件&#xff0c;mapred-site.xml文件 二、etc/hadoop文件内&#xff0c;修改yarn-env.sh&#xff0c;yarn-site.xml 三、将配置好的文件分发到其他服务节点 start-dfs.…

SLAM ORB-SLAM2(10)轨迹跟踪过程

SLAM ORB-SLAM2(10)轨迹跟踪过程 1. 总体过程2. ORB 特征点提取2.1. 相机数据处理2.1.1. 单目相机图像处理2.1.2. 双目相机图像处理2.1.3. RGBD相机图像处理2.2. ORB 特征点3. 地图初始化3.1. 坐标形式3.2. 坐标原点3.3. 地图尺度4. 相机位姿初始估计4.1. 关键帧4.2. 运动模型…

文件搜索神器—Everything,结合内网穿透秒变在线搜索神器!

Everythingcpolar搭建在线资料库&#xff0c;实现随时随地访问 文章目录 Everythingcpolar搭建在线资料库&#xff0c;实现随时随地访问前言1.软件安装完成后&#xff0c;打开Everything2.登录cpolar官网 设置空白数据隧道3.将空白数据隧道与本地Everything软件结合起来总结 前…

【每日一题】1423. 可获得的最大点数-2023.12.3

题目&#xff1a; 1423. 可获得的最大点数 几张卡牌 排成一行&#xff0c;每张卡牌都有一个对应的点数。点数由整数数组 cardPoints 给出。 每次行动&#xff0c;你可以从行的开头或者末尾拿一张卡牌&#xff0c;最终你必须正好拿 k 张卡牌。 你的点数就是你拿到手中的所有…

【Android】解决安卓中并不存在ActivityMainBinding

安卓中并不存在ActivityMainBinding这个类&#xff0c;这个类是在XML布局的最外层加入就会自动生成。但是你在最后绑定主布局时会报错获取不到根节点getRoot(). 最好的办法就是&#xff0c;删除原来的最外层节点&#xff0c;再重新添加&#xff0c;感觉是因为复制时并没有让系…

[蓝桥杯 2020 省 AB1] 解码

做题前思路&#xff1a; 1.因为是多组输入&#xff0c;又包含字符于是我们可以先定义一个char类型数组arr 2.定义数组的长度&#xff1a;题目说简写&#xff08;字母加数字&#xff09;长度不超过100&#xff0c;但原来的长度可能超过100&#xff0c;加上小明不会将连续超过9…

Pandas时序数据分析实践—基础(1)

目录 1. Pandas基本结构2. Pandas数据类型2.1. 类型概述2.1.1. 整数类型&#xff08;int&#xff09;&#xff1a;2.1.2. 浮点数类型&#xff08;float&#xff09;&#xff1a;2.1.3. 布尔类型&#xff08;bool&#xff09;&#xff1a;2.1.4. 字符串类型&#xff08;object&a…

树_对称二叉树

//给你一个二叉树的根节点 root &#xff0c; 检查它是否轴对称。 // // // // 示例 1&#xff1a; // // //输入&#xff1a;root [1,2,2,3,4,4,3] //输出&#xff1a;true // // // 示例 2&#xff1a; // // //输入&#xff1a;root [1,2,2,null,3,null,3] //输出…

JVM内存结构:StringTable与常量池关系

首先看一道题 这就涉及到StringTable和常量池&#xff0c;答案在文末&#xff0c;全做对就不用看了 而StringTable的位置在不同版本也有变化 &#xff0c; 我们只探讨jdk1.8版本 与StringTable 串池对应的是常量池 案例一、常量池和串池联系 引用所指肯定不会是常量池中的字…

vue3中如何实现事件总线eventBus

使用插件 由于vue3中 “$ on”&#xff0c;$ off 和 $ once 实例方法已被移除&#xff0c;组件实例不再实现事件触发接口 所以我们可以使用官方推荐的这个第三方库实现同样的效果 mitt https://github.com/developit/mitt 安装 pnpm install mitt -S挂载全局写法 main.ts 初始…

机械专业个人简历17篇

以下简历内容以机械专业相关岗位招聘需求为背景&#xff0c;我们整理了17篇且具有参考价值的简历案例&#xff0c;大家可以灵活借鉴&#xff0c;助理大家在众多候选人中脱颖而出。 机械专业简历模板下载&#xff08;可在线编辑制作&#xff09;&#xff1a;来幻主简历&#xf…

使用python streamlit库快速创建一个购物网站

streamlit Streamlit 是一个基于 Python 的 Web 应用程序框架&#xff0c;致力于以更高效、更灵活的方式可视化数据&#xff0c;并分析结果。 Streamlit是一个开源库&#xff0c;可以帮助数据科学家和学者在短时间内开发机器学习 (ML) 可视化仪表板。只需几行代码&#xff0c…

封装了一个顺滑嵌套滚动的框架

首先查看效果图 就是开始滚动的时候&#xff0c;上面的头部和下面的内容是 一起滚动的&#xff0c;但是当滚动到segment 的时候&#xff0c;segment 是悬停 的&#xff0c;下面的tableView是分区的 架构设计 我们设计一个架构&#xff0c;以下面的tablView为主体&#xff0…

大数据湖项目建设方案:文档全文101页,附下载

关键词&#xff1a;大数据解决方案&#xff0c;数据湖解决方案&#xff0c;数据治理解决方案&#xff0c;数据中台解决方案 一、大数据湖建设思路 1、明确目标和定位&#xff1a;明确大数据湖的目标和定位是整个项目的基础&#xff0c;这可以帮助我们确定项目的内容、规模、所…

P3 Linux应用编程:系统调用与库函数

前言 &#x1f3ac; 个人主页&#xff1a;ChenPi &#x1f43b;推荐专栏1: 《C_ChenPi的博客-CSDN博客》✨✨✨ &#x1f525; 推荐专栏2: 《Linux C应用编程&#xff08;概念类&#xff09;_ChenPi的博客-CSDN博客》✨✨✨ &#x1f6f8;推荐专栏3: ​​​​​​《 链表_Chen…

Kubernetes入门学习(下)

Kubernetes入门学习&#xff08;下&#xff09; 文章目录 Kubernetes入门学习&#xff08;下&#xff09;运行有状态的应用ConfigMap与SecretConfigMapSecret 卷(Volume)StatefulSet(有状态应用集)Headless Service(无头服务)Mysql主从复制Port-forward端口转发Helm参考 运行有…

Java中异常处理顺序和全局异常处理器

异常处理顺序 我们直接通过代码看下Java中异常的处理顺序。 数组越界异常属于运行时异常&#xff0c;被捕捉后就停止了&#xff0c;打印结果为数组越界了。 Test public void test2(){int[] arr new int[4];try{System.out.println(arr[5]);}catch (ArrayIndexOutOfBoundsE…