JAVA 实现 Redis 发布订阅

Redis 发布订阅

发布订阅:消息发布者发布消息消息订阅者接收消息,两者之间通过某种媒介联系起来

例如订杂志,当自己订阅了爱格杂志,每个月会发刊一本。到发布的时候派送员将杂志送到自己手上就能看到杂志内容。只有我们订阅了该杂志才会派送给我们

Redis 发布订阅(pub/sub)是一种 消息通信模式 :发送者(pub)发送消息,订阅者(sub)接收消息。

Redis 客户端可以订阅任意数量的频道。

订阅 / 发布消息图:

图中可以看出,所需:

  1. 消息发送者 、 2. 频道 、 3. 消息订阅者

发布订阅机制

  1. 当一个客户端通过 PUBLISH 命令向订阅者发布消息的时候,称这个客户端为发布者publisher
  2. 当一个客户端通过subscribe 或者 PSUBSCRIBE 接收消息时,称这个客户端为 订阅者 subscriber
  3. 为了解耦发布者和订阅者之间的关系,Redis 使用了频道channel(频道)作为两者之间的中介,发布者直接把消息发送给 channel,而 channel 负责把消息发送给订阅者,发布者和订阅者之间没有直接的联系,都不知道对方的存在

订阅者 1,2,3 订阅了频道 channel,当有消息发布给频道时,这个消息就会被发送到三个订阅者客户端

demo 实现

学习参考链接

引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

添加配置文件

spring:
  redis:
    host: 127.0.0.1
    database: 5
    password:
    port: 6379
  1. Listener模式

创建一个监听容器

@Configuration
public class CatListenerConfig extends CachingConfigurerSupport {

    /**
     * 消息监听容器
     *
     * @param factory
     * @return
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory factory){
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        //订阅一个通道 该处的通道名是发布消息时的名称
       container.setConnectionFactory(connectionFactory);
        //订阅了一个叫cat 的通道
        container.addMessageListener(catAdapter, new PatternTopic("cat"));
        container.addMessageListener(fishAdapter, new PatternTopic("fish"));
        return container;
    }

    

}
@Component
public class CatListener implements MessageListener {
    @Override
    public void onMessage(Message message, byte[] bytes) {
        System.out.println("我是监听者,我监听到的消息是 " + message.toString());
    }
}
package com.maoxs.listener;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;

/**
 * 监听发送的消息
 */
public class FishListener  implements MessageListener  {

    @Autowired
    RedisTemplate redisTemplate;

    @Override
    public void onMessage(Message message, byte[] bytes) {
        System.out.println("我是Fish监听" + message.toString());
    }
}

建测试类,测试发布监听

@RestController
public class TestController {

    @Resource
    StringRedisTemplate stringRedisTemplate;

    @PostMapping("/cat")
    public void test2(){
        stringRedisTemplate.convertAndSend("cat","测试:消息发布者发布消息");
    }
  
   @PostMapping("/fish")
    public void fish(){
        stringRedisTemplate.convertAndSend("fish","测试:消息发布者发布消息");
    }
}

测试结果

  1. Adapter模式

    2.1 配置

        /**
         * MessageListenerAdapter 模式
         * 该处topic的 key为bean的name
         * @param connectionFactory
         * @param adapterMap
         * @return
         */
        @Bean
        public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,Map<String, MessageListenerAdapter> adapterMap) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            adapterMap.keySet().forEach(topic-> container.addMessageListener(adapterMap.get(topic),new PatternTopic(topic)));
            return container;
        }
    

    监听1

    package com.sst.loan.risk.listener;
    
    import com.sst.loan.risk.manager.RuleLoadManager;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    
    /**
     * 监听适配器
     *
     * @author 蔡定努
     * @date 2023/06/13 10:26
     */
    @Slf4j
    @Component("ruleRefreshAdapter")
    public class RuleRefreshAdapter  extends MessageListenerAdapter {
      
    
    
        @Override
        public void onMessage(Message message, byte[] bytes) {
            log.info(">>>>>>> 消息适配器收到刷新规则的请求 <<<<<<<<<<<<<<");
    
        }
    }
    
    

    监听2

    package com.sst.loan.risk.listener;
    
    import com.sst.loan.risk.manager.RuleLoadManager;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    
    /**
     * 监听适配器
     *
     * @author 蔡定努
     * @date 2023/06/13 10:26
     */
    @Slf4j
    @Component("ruleRefreshAdapter2")
    public class RuleRefreshAdapter2  extends MessageListenerAdapter {
      
    
    
        @Override
        public void onMessage(Message message, byte[] bytes) {
            log.info(">>>>>>> 消息适配器收到刷新规则的请求 <<<<<<<<<<<<<<");
            
        }
    }
    

    测试

    
        @Resource
        private StringRedisTemplate stringRedisTemplate;
    
    
        /**
         * 
         * @author 蔡定努
         */
        @GetMapping("refresh")
        public Object refresh() {
            stringRedisTemplate.convertAndSend("ruleRefreshAdapter","refresh");
            return Result.success();
        }
        
        
        /**
         * 
         * @author 蔡定努
         */
        @GetMapping("refresh")
        public Object refresh() {
            stringRedisTemplate.convertAndSend("ruleRefreshAdapter2","refresh");
            return Result.success();
        }
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/28356.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C语言之结构体讲解

目录 结构体类型的声明 结构体初始化 结构体成员访问 结构体传参 对于上期指针初阶&#xff08;2&#xff09;我们后期还会讲数组指针是什么&#xff1f;大家可以先思考一下&#xff0c;后期我们会讲 1.结构体的声明 结构是一些值的集合&#xff0c;这些值被称为成员变量&am…

第二类曲线积分

文章目录 第二类曲线积分一、向量场是什么&#xff1f;二、向量场可视化三、计算1. 计算方式一2. 计算方式二 第二类曲线积分 因为之前学习第二类曲线的时候&#xff0c;不是很理解&#xff1b;所以最近看了mit的多元微积分课程&#xff0c;做一些课程笔记。 一、向量场是什么…

字符集和java的编码与解码

一、ASCII和GBK字符集 计算机存储一个英文字符需要一个字节。 ASCII字符集&#xff0c;包括128&#xff08;0000000B~1111111B&#xff09;个数据&#xff0c;存储英文字母和字符&#xff0c;对于欧美国家够用。 例如&#xff0c;存储字符’a’&#xff0c;查询ASCII得到为97&a…

C语言中的基本数据类型

C语言中的基本数据类型分别为以下几种 整型、浮点型、字符类型 整型又分为整型int、短整型short、长整型long 浮点型分为单精度浮点型float、双精度浮点型double 1、短整型short 2.整型 3.长整型 短整型、长整型、整形都是表示整形的&#xff0c;并且输出结果也都为10&…

【大数据之Hive】十一、Hive-HQL查询之基本查询

基础语法 select [all | distinct] select_expr,select_expr, ...from table)name --从什么表查[where where_condition] --过滤[group by col_list] --分组查询[having col_list] --分组后过滤[order by col_list] --排序[cluster by col_list | …

leetcode 152.乘积最大子数组

题目描述 给你一个整数数组 nums &#xff0c;请你找出数组中乘积最大的非空连续子数组&#xff08;该子数组中至少包含一个数字&#xff09;&#xff0c;并返回该子数组所对应的乘积。 测试用例的答案是一个 32-位 整数。 子数组 是数组的连续子序列。 来源&#xff1a;力扣&a…

C++入门攻略

C补足C语言部分缺陷 1.命名空间&#xff1a;1.1 命名空间namespace关键字1.命名空间中可以定义变量、函数、类型2.命名空间可以嵌套3.相同命名空间共存 1.2 命名空间的使用方式&#xff1a;1.名称加用域作用限定符的方式访问&#xff08;同上&#xff09;2.使用using引入某个空…

Java并发之 Lock 锁

一、Lock接口 1 Lock简介&地位&作用 锁是一种工具&#xff0c;用于控制对共享资源的访问Lock和synchronized是最常见的两个锁&#xff0c;他们都能够达到线程安全的目录&#xff0c;但是使用和功能上又有较大的不同Lock接口最常见的实现类就是ReentrantLock通常情况下…

C:\Users\BC>conda -V ‘conda‘ 不是内部或外部命令,也不是可运行的程序 或批处理文件。

C:\Users\BC>conda -V ‘conda’ 不是内部或外部命令&#xff0c;也不是可运行的程序 或批处理文件。 注意&#xff01;&#xff1a;Anaconda安装路径和Scripts路径&#xff0c;两个都添加进去Path 解释&#xff1a;将 Anaconda 安装路径和 Scripts 路径都添加到系统的 PA…

文心一言 VS 讯飞星火 VS chatgpt (40)-- 算法导论5.4 4题

四、一次聚会需要邀请多少人&#xff0c;才能让其中 3 人的生日很可能相同? 文心一言&#xff1a; 为了解决这个问题&#xff0c;我们需要先计算出所有可能的生日组合数&#xff0c;然后计算出其中生日相同的组合数。 已知需要邀请的人数为&#xff1a;23人 根据组合数公式…

k8s如何使用ceph rbd块存储(静态供给、存储类动态供给)

目录 前言安装ceph集群ceph集群创建rbd块存储rbd块存储不支持ReadWriteManyk8s配置rbd块存储&#xff08;静态供给&#xff09;创建secret创建pv创建pvck8s节点安装客户端依赖包部署pod查看pod验证是否持久化 k8s配置rbd块存储&#xff08;动态供给&#xff09;查看官网ceph集群…

【ARM AMBA AXI 入门 6 - AXI3 协议中的锁定访问之AxLOCK信号】

文章目录 1.1 Locked accesses 1.1 Locked accesses 当主机使用 AxLOCK 信号来指示事务是锁定的事务时&#xff0c;互连(Interconnect)必须确保只有该主机可以访问目标从属区域&#xff0c;直到来自同一主机的未锁定事务完成。互连中的仲裁器(arbiter)必须执行此限制。 在主机…

湖南大学CS-2017(另一张)期末考试解析

【特别注意】 答案来源于wolf 是我在备考时自己做的&#xff0c;仅供参考&#xff0c;若有不同的地方欢迎讨论。 【试卷评析】 有必要一做。 【试卷与答案】 由于这张试卷没有电子版&#xff0c;我就直接拍我自己的作答了

Monocle2拟时基因富集分析

****Monocle2全部往期精彩系列&#xff1a;1、群成员专享&#xff1a;Monocle2更新&#xff08;就是重新梳理一下&#xff09;2、一键跑完monocle2&#xff1f;3、ggplot2个性可视化monocle2结果4、ggplot修饰monocle2拟时热图&#xff1a;一众问题全部解决5、Monocle2终极修改…

Day975.如何使用JWT结构化令牌 -OAuth 2.0

如何使用JWT结构化令牌 Hi&#xff0c;我是阿昌&#xff0c;今天学习记录的是关于如何使用JWT结构化令牌的内容。 OAuth 2.0 规范并没有约束访问令牌内容的生成规则&#xff0c;只要符合唯一性、不连续性、不可猜性就够了。这就意味着&#xff0c;可以灵活选择令牌的形式&…

天然气井远程监控解决方案

天然气井远程监控解决方案 一、项目背景 随着天然气开发规模日益增长&#xff0c;天然气井的数量也在不断增加。且位置分散环境恶劣。传统的人工巡检方式越来越不能满足天然气井的生产需求和安全保障。天然气井井由储罐和集气站组成。 集气站通过计量站将天然气输入储罐或由集…

深入学习 Mybatis 的四大组件源码

博主介绍&#xff1a; ✌博主从事应用安全和大数据领域&#xff0c;有8年研发经验&#xff0c;5年面试官经验&#xff0c;Java技术专家✌ Java知识图谱点击链接&#xff1a;体系化学习Java&#xff08;Java面试专题&#xff09; &#x1f495;&#x1f495; 感兴趣的同学可以收…

社会心理学(1) 社会心理学的定义

今天开始 我们一起学习一门课程 社会心理学 社会心理学 他是 应用心理学 或者 心理学专业的一个必修课 吴江霖教授说过 心理学应该分为两大分支 生理心理学 和 社会心理学 如果认同他的观点 那么 社会心理学可谓是相当重要了 社会心理学的定义之广可以说 有多少社会心理学教…

【MySQL数据库一】MySQL数据库初体验

MySQL数据库初体验 1.数据库基本概念1.1 数据Data1.2 表1.3 数据库1.4 数据库管理系统1.5 数据库系统 2.数据库的发展3.主流的数据库介绍3.1 SQL Server&#xff08;微软公司产品&#xff09;3.2 Oracle &#xff08;甲骨文公司产品&#xff09;3.3 DB2&#xff08;IBM公司产品…

【Spark基础编程】 第8章 Spark MLlib

系列文章目录 文章目录 系列文章目录前言【 第8章 Spark MLlib 】8.1 Spark MLlib简介8.1.1 什么是机器学习8.1.2 基于大数据的机器学习8.1.3 Spark 机器学习库MLLib 8.2 机器学习工作流8.2.1 机器学习流水线概念8.2.2 构建一个机器学习流水线 8.3 特征抽取、转化和选择8.4 分类…