Spring boot 使用Redis 消息发布订阅

Spring boot 使用Redis 消息发布订阅

文章目录

  • Spring boot 使用Redis 消息发布订阅
    • Redis 消息发布订阅
        • Redis 发布订阅 命令
    • Spring boot 实现消息发布订阅
      • 发布消息
      • 消息监听
      • 主题订阅
    • Spring boot 监听 Key 过期事件
      • 消息监听
      • 主题订阅

最近在做请求风控的时候,在网上搜集了大量的解决方案,最后使用Redis 消息发布订阅 比较符合业务。做一下记录

img

Redis 消息发布订阅

img

Redis 发布订阅 命令:redis命令手册

1、Redis 中"pub/sub"的消息,为"即发即失",server 不会保存消息,如果 publish 的消息没有任何 client 处于 “subscribe” 状态,消息将会被丢弃;如果 client 在 subcribe 时,链接断开后重连,那在么此期间的消息也将丢失。

2、Redis server 将会"尽力"将消息发送给处于 subscribe 状态的 client,但是仍不会保证每条消息都能被正确接收。

**优点:**支持发布订阅,支持多组生产者、消费者处理消息

缺点:

  1. 消费者下线数据会丢失

  2. 不支持数据持久化,Redis宕机则数据也会丢失

  3. 消息堆积,缓存区溢出,消费者会被强制踢下线,数据也会丢失

Redis 发布订阅 命令
命令描述
Redis Unsubscribe 命令指退订给定的频道。
Redis Subscribe 命令订阅给定的一个或多个频道的信息。
Redis Pubsub 命令查看订阅与发布系统状态。
Redis Punsubscribe 命令退订所有给定模式的频道。
Redis Publish 命令将信息发送到指定的频道。
Redis Psubscribe 命令订阅一个或多个符合给定模式的频道。

Spring boot 实现消息发布订阅

1、引入 Redis 依赖

    <!--Spring Boot redis 启动器-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>

2、Redis 数据库配置

spring:
  data:
    redis:
      database: 0
      host: localhost
      port: 6379
      password:

发布消息

	/**
     * redis 将信息发送到指定的频道
     * @param topic   :消息所属的主题/频道
     * @param context :消息内容
     * @return
     */
	redisTemplate.convertAndSend(topic, context);

@RequiredArgsConstructor
@Service
public class RequestRateLimiterService {

	private final RedisTemplate<String, Object> redisTemplate;

	// Redis 中的 key 前缀
	private static final String REDIS_KEY_PREFIX = "select_rate_limit:";

	// Redis 中的通道名称
	private static final String REDIS_CHANNEL = "select_rate_limit_channel";

    // 根据用户名 请求风控
	public boolean allowRequest(String username) {
		
		// 每分钟最大请求次数
		Long MAX_REQUESTS_PER_MINUTE = 60L;

		String key = REDIS_KEY_PREFIX + username;
		Long currentRequests = redisTemplate.opsForValue().increment(key);
		if (currentRequests != null && currentRequests > MAX_REQUESTS_PER_MINUTE) {
			redisTemplate.convertAndSend(REDIS_CHANNEL, username);
			return false; // 超过阈值,拒绝请求
		}
		if (currentRequests != null && currentRequests == 1) {
			redisTemplate.expire(key, 1, TimeUnit.MINUTES); // 设置过期时间为1分钟
		}
		return true; // 允许请求
	}

}

消息监听

1、 Redis 消息订阅-消息监听器,当收到阅订的消息时,会将消息交给这个类处理。

/**
 * Redis 消息订阅-消息监听器,当收到阅订的消息时,会将消息交给这个类处理
 * <p>
 * 1、可以直接实现 MessageListener 接口,也可以继承它的实现类 MessageListenerAdapter.
 * 2、自动多线程处理,打印日志即可看出,即使手动延迟,也不会影响后面消息的接收。
 *
 */
@Component
public class RequestRateLimitSubscriber implements MessageListener {
    // 直接从容器中获取
    @Resource
    private RedisTemplate<String, Object> redisTemplate;
    /**
     * 监听到的消息必须进行与发送时相同的方式进行反序列
     * 1、订阅端与发布端 Redis 序列化的方式必须相同,否则会乱码。
     *
     * @param message :消息实体
     * @param pattern :匹配模式
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        // 消息订阅的匹配规则,如 new PatternTopic("basic-*") 中的 basic-*
        String msgPattern = new String(pattern);
        // 消息所属的通道,可以根据不同的通道做不同的业务逻辑
        String channel = (String) redisTemplate.getStringSerializer().deserialize(message.getChannel());
        // 接收的消息内容,可以根据自己需要强转为自己需要的对象,但最好先使用 instanceof 判断一下
        Object body = redisTemplate.getValueSerializer().deserialize(message.getBody());
 
        log.info("收到 Redis 订阅消息: channel={} body={} pattern={} ", channel, body, msgPattern);
 
        // 模拟数据处理 ********
        // 发送警告通知,可以通过邮件、短信等方式进行通知
        log.info("------------数据处理完成.......");
    }
}

主题订阅

1、自定义 RedisTemplate 序列化方式(发布者和订阅者必须相同)。

2、配置主题订阅 - Redis 消息监听器绑定监听指定通道。

/**
 * 自定义 RedisTemplate 序列化方式
 * 配置主题订阅 - Redis 消息监听器绑定监听指定通道
 */
@Configuration
public class RedisConfig {
    // 自定义的消息订阅监听器,当收到阅订的消息时,会将消息交给这个类处理
    @Resource
    private RequestRateLimitSubscriber requestRateLimitSubscriber;
 
    //  自定义 RedisTemplate 序列化方式   
    @Bean
	public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
		RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
		redisTemplate.setKeySerializer(RedisSerializer.string());// key 序列化规则
		redisTemplate.setHashKeySerializer(RedisSerializer.string());// hash key 序列化规则
		redisTemplate.setValueSerializer(RedisSerializer.java());// value 序列化规则
		redisTemplate.setHashValueSerializer(RedisSerializer.java()); // hash value 序列化规则
		redisTemplate.setConnectionFactory(factory); //绑定 RedisConnectionFactory
		return redisTemplate; //返回设置好的 RedisTemplate
	}
    /**
     * 配置主题订阅
     * RedisMessageListenerContainer - Redis 消息监听器绑定监听指定通道
     * 1、可以添加多个监听器,监听多个通道,只需要将消息监听器与订阅的通道/主题绑定即可。
     * 2、订阅的通道可以配置在全局配置文件中,也可以配置在数据库中,
     * <p>
     * addMessageListener(MessageListener listener, Collection<? extends Topic> topics):将消息监听器与多个订阅的通道/主题绑定
     * addMessageListener(MessageListener listener, Topic topic):将消息监听器与订阅的通道/主题绑定
     *
     * @param connectionFactory
     * @return
     */
	@Bean
	public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory) {
		RedisMessageListenerContainer container = new RedisMessageListenerContainer();
		// 设置连接工厂,RedisConnectionFactory 可以直接从容器中取,也可以从 RedisTemplate 中取
		container.setConnectionFactory(factory);
		// 订阅名称叫 select_rate_limit_channel 的通道, 类似 Redis 中的 subscribe 命令
		container.addMessageListener(requestRateLimitSubscriber, new ChannelTopic("*"));
		// 订阅名称以 'basic-' 开头的全部通道, 类似 Redis 的 pSubscribe 命令
		container.addMessageListener(requestRateLimitSubscriber, new PatternTopic("*"));
		return container;

	}
}

Spring boot 监听 Key 过期事件

1、Redis 数据库可以通过命令设置 Key 的有效时间,当一个 Key 过期后会自动从数据库中删除,释放空间。得益于于这个特性,可以很轻松地实现诸多类似于 “Session” 管理、数据缓存等功能。它们都有一个共同点就是,数据不会永久保存!

2、在有些场景中,可能希望在某些 Key 过期的时候获取到通知,进行一些业务处理。或者是干脆用于 “定时通知/任务” 功能,例如:下单 30 分钟后未支付,则取消订单。那么可以在用户下单的时候使用订单号作为 key 设置到 Redis 数据库中,并且设置过期时间为 30 分钟。当超时后,可以在 “key 过期通知” 中获取到 key 也就是订单号,判断用户是否已经支付从而是否取消订单。

3、Redis 的 Key 过期通知功能本质上是通过 发布/订阅 功能实现的,所以它「不能保证通知消息的交付」,当 Key 过期时如果服务器停机、重启后则该通知消息会永久丢失。

消息监听

1、Spring Data Redis 专门提供了一个密钥过期事件消息侦听器:KeyExpirationEventMessageListener,自定义监听器类继承它,然后覆写 doHandleMessage(Message message) 方法即可。

2、doHandleMessage 方法用于处理 Redis Key 过期通知事件,其中 Message 参数表示通知消息,只有 2 属性,分别表示消息正文(在这里就是过期的 Key 名称)以及来自于哪个 channel。

3、在 Redis Key 过期事件中,「只能获取到已过期的 Key 的名称,不能获取到值。」

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
/**
 * Redis 缓存 Key 过期监听器
 * Spring Data Redis 专门提供了一个密钥过期事件消息侦听器:KeyExpirationEventMessageListener,
 * 自定义监听器类继承它,然后覆写 doHandleMessage(Message message) 方法即可。
 */
@Component
public class KeyExpireListener extends KeyExpirationEventMessageListener {
    private static final Logger logger = LoggerFactory.getLogger(KeyExpireListener.class);
    /**
     * 通过构造函数注入 RedisMessageListenerContainer 给 KeyExpirationEventMessageListener
     *
     * @param listenerContainer : Redis消息侦听器容器
     */
    public KeyExpireListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }
    /**
     * doHandleMessage 方法用于处理 Redis Key 过期通知事件,
     * 在 Redis Key 过期事件中,「只能获取到已过期的 Key 的名称,不能获取到值。」
     *
     * @param message:通知消息,只有 2 属性,分别表示消息正文(在这里就是过期的 Key 名称)以及来自于哪个 channel。
     */
    @Override
    public void doHandleMessage(Message message) {
        // 过期的 key
        String key = new String(message.getBody());
        // 消息通道
        String channel = new String(message.getChannel());
        logger.info("过期key={} 消息通道(channel)={}", key, channel);
    }
}

主题订阅

1、与上面稍微有点不同,因为 key 过期事件属于 Redis 内部消息,内部频道/通道,所以只需要往容器中注入 RedisMessageListenerContainer 就行,不需要 addMessageListener 手动设置监听器 监听指定的通道/频道(topic 表达式)。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
@Configuration
public class RedisConfig {
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        
    //  container.setTaskExecutor(null);            // 设置用于执行监听器方法的 Executor
    //  container.setErrorHandler(null);            // 设置监听器方法执行过程中出现异常的处理器
    //  container.addMessageListener(null, null);   // 手动设置监听器 & 监听的 topic 表达式
        return container;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/224770.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

面试常问的dubbo的spi机制到底是什么?(上)

前言 dubbo是一款微服务开发框架&#xff0c;它提供了 RPC通信 与 微服务治理 两大关键能力。作为spring cloud alibaba体系中重要的一部分&#xff0c;随着spring cloud alibaba在国内活跃起来&#xff0c;dubbo也越来越深受各大公司的青睐。本文就来对dubbo的spi机制源码进行…

Mysql的所有数据类型和它们的区别

一、数值类型 1. 普通整数数值类型 以下数据类型只能用以保存整数 整数数值类型类型存储大小&#xff08;字节&#xff09;有符号的取值范围&#xff08;允许存在负数&#xff09;无符号的取值范围TINYINT1-128 ~ 1270 ~ 255SMALLINT2- 327678 ~ 327670 ~ 65535MEDIUMINT3- 8…

有向图的拓扑序列(拓扑排序)

给定一个 n 个点 m 条边的有向图&#xff0c;点的编号是 1 到 n&#xff0c;图中可能存在重边和自环。 请输出任意一个该有向图的拓扑序列&#xff0c;如果拓扑序列不存在&#xff0c;则输出 −1。 若一个由图中所有点构成的序列 A 满足&#xff1a;对于图中的每条边 (x,y)&a…

zabbix的自动发现机制,代理功能,SNMP监控

1.zabbix自动发现机制 zabbix客户端主动和服务端联系&#xff0c;将自己的地址和端口发送服务端&#xff0c;实现自动添加监控主机 客户端是主动的一方。 缺点&#xff1a;自定义网段中主机数量太多&#xff0c;登记耗时会很久&#xff0c;而且这个自动发现机制不是很稳定 …

CTF刷题记录

刷题 我的md5脏了KFC疯狂星期四坤坤的csgo邀请simplePHPcurl 我的md5脏了 g0at无意间发现了被打乱的flag&#xff1a;I{i?8Sms??Cd_1?T51??F_1?} 但是好像缺了不少东西&#xff0c;flag的md5值已经通过py交易得到了&#xff1a;88875458bdd87af5dd2e3c750e534741 flag…

geemap学习笔记021:提取页面交互区域像素值

前言 本节介绍的内容是如何提取交互界面中的单一像素值以及区域像素均值等&#xff0c;并且导出为CSV或者SHP文件。 1 导入库并显示地图 import ee import geemap import osee.Initialize() Map geemap.Map() Map2 交互提取像素值 2.1 加载数据 landsat7 ee.Image(LANDS…

Spring Cloud + Vue前后端分离-第4章 使用Vue cli 4搭建管理控台

Spring Cloud Vue前后端分离-第4章 使用Vue cli 4搭建管理控台 4-1 使用vue cli创建admin项目 Vue 简介 Vue作者尤雨溪在google工作时&#xff0c;最早只想研究angular的数据绑定功能&#xff0c;后面觉得这个小功能很好用&#xff0c;有前景&#xff0c;就再扩展&#xff…

C语言之数组精讲(2)

目录 数组的复制 输入数组元素的值 对数组的元素进行倒序排列 使用数组进行成绩处理 对象式宏 数组元素的最大值和最小值 赋值表达式的判断 数组的元素个数 结语 数组的复制 我们把数组中的元素全部复制到另一个数组中。 #include<stdio.h>int main() {int i;int…

用23种设计模式打造一个cocos creator的游戏框架----(三)外观模式模式

1、模式标准 模式名称&#xff1a;外观模式 模式分类&#xff1a;结构型 模式意图&#xff1a;为一组复杂的子系统提供了一个统一的简单接口。这个统一接口位于所有子系统之上&#xff0c;使用户可以更方便地使用整个系统。 结构图&#xff1a; 适用于&#xff1a; 当你想为…

基于Java SSM框架实现毕业生就业信息管理系统项目【项目源码+论文说明】计算机毕业设计

基于java的SSM框架实现毕业生就业信息管理系统演示 摘要 目前高校毕&#xff0c;毕业生就业工作意义尤为重大但形势又特别严峻。党中央和国务院高度重视高校毕业生就业工作&#xff0c;及时作出了一系列决策部署&#xff0c;多措并举拓展就业渠道&#xff0c;千方百计帮助高校…

iOS(swiftui)——系统悬浮窗( 可在其他应用上显示,可实时更新内容)

因为ios系统对权限的限制是比较严格的,ios系统本身是不支持全局悬浮窗(可在其他app上显示)。在iphone14及之后的iPhone机型中提供了一个叫 灵动岛的功能,可以在手机上方可以添加一个悬浮窗显示内容并实时更新,但这个功能有很多局限性 如:需要iPhone14及之后的机型且系统…

CTF 7

信息收集 存活主机探测 arp-scan -l 端口探测 nmap -sT --min-rate 10000 -p- 192.168.0.5 服务版本等信息 nmap -sT -sV -sC -O -p22,80,137,138,139,901,5900,8080,10000 192.168.0.5Starting Nmap 7.94 ( https://nmap.org ) at 2023-11-02 21:23 CST Stats: 0:01:30 elaps…

基于vue开发-创建登录页

我们使用vue创建完成项目后就开始我们的项目页面开发&#xff0c;如有不清楚怎么操作的可以看博主的前一篇文档 使用vue UI安装路由插件-CSDN博客 在src/views文件夹中创建一个登录页面 在此之前&#xff0c;我们可以先安装一个插件、element、vant、iview等等&#xff0c;可…

vue中shift+alt+f格式化防止格式掉其它内容

好处就是使得提交记录干净&#xff0c;否则修改一两行代码&#xff0c;习惯性按了一下格式化快捷键&#xff0c;遍地飘红&#xff0c;下次找修改就费时间 1.点击设置图标-设置 2.点击这个转成配置文件 {"extensions.ignoreRecommendations": true,"[vue]":…

Stable Diffusion AI绘画系列【17】:绘本童话风格场景

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推…

使用 Kubernetes Agent Server 实现 GitOps

目录 温习 GitOps 极狐GitLab Kubernetes Agent 极狐GitLab GitOps workflow 极狐GitLab KAS 的配置 创建极狐GitLab agent 创建 agent token Kubernetes 上安装 agent&#xff08;agentk&#xff09; 极狐GitLab GitOps workflow 实践 写在最后 温习 GitOps GitOps …

课题学习(十五)----阅读《测斜仪旋转姿态测量信号处理方法》论文

一、 论文内容 1.1 摘要 为准确测量旋转钻井时的钻具姿态&#xff0c;提出了一种新的信号处理方法。测斜仪旋转时&#xff0c;垂直于其旋转轴方向加速度计的输出信号中重力加速度信号分量具有周期性特征&#xff0c;以及非周期性离心加速度分量频率低于重力加速度信号分量频率…

渲染(iOS渲染过程解析)

渲染 渲染原理 一个硬核硬件科普视频 CPU和GPU CPU&#xff08;Central Processing Unit&#xff09;&#xff1a;现代计算机整个系统的运算核心、控制核心&#xff0c;适合串行计算。GPU&#xff08;Graphics Processing Unit&#xff09;&#xff1a;可进行绘图运算工作的…

系列四、过滤器简介

一、简介 1.1、概述 过滤器作为JavaWEB的三大组件&#xff08;Servlet程序、Filter过滤器、Listener监听器&#xff09;&#xff0c;它的主要功能是用来拦截请求的&#xff0c;当客户端要访问某个资源时&#xff0c;先来到配置好的过滤器&#xff0c;过滤器可以在用户访问某个…

Docker架构、镜像操作和容器操作

一、docker基本管理和概念 1、概念 docker&#xff1a;开源的应用容器引擎。基于go语言开发的。运行在Linux系统中的开源的轻量级的“虚拟机” docker的容器技术可用在一台主机上轻松到达为任何应用创建一个轻量级到的&#xff0c;可移植的&#xff0c;自给自足的容器 dock…