SpringBoot ApplicationListener实现发布订阅模式

文章目录

  • 前言
  • 一、Spring对JDK的扩展
  • 二、快速实现发布订阅模式


前言

发布订阅模式(Publish-Subscribe Pattern)通常又称观察者模式,它被广泛应用于事件驱动架构中。即一个事件的发布,该行为会通过同步或者异步的方式告知给订阅该事件的订阅者。JDK中提供了EventListener作为所有订阅者的接口规范(即所有的订阅者都应该实现该接口),而EventObject则作为所有事件发布者的实现规范(即所有事件发布者都应该继承该类)。对于观察者的原理不是本章讨论的重点,本章只是演示如何在SpringBoot中实现发布订阅模式。


一、Spring对JDK的扩展

Spring中,提供了接口ApplicationListener作为Spring观察者(也叫监听者)的实现规范,ApplicationListener其实是对JDKEventListener中的扩展,增加了onApplicationEvent方法作为触发监听的方法。而事件发布对象ApplicationEvent也是继承了JDK中的EventObject类,仅仅增加了参数timestamp用于记录事件创建的时间。也就是说如果要使用Spring提供的发布订阅模式,您的监听器应该实现ApplicationListener接口,通过onApplicationEvent方法获取监听的内容。事件则必须继承ApplicationEvent

ApplicationListener源码:

@FunctionalInterface
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {

	/**
	 * Handle an application event.
	 * @param event the event to respond to
	 */
	void onApplicationEvent(E event);

}

ApplicationEvent源码:

public abstract class ApplicationEvent extends EventObject {

	/** use serialVersionUID from Spring 1.2 for interoperability. */
	private static final long serialVersionUID = 7099057708183571937L;

	/** System time when the event happened. */
	private final long timestamp;


	/**
	 * Create a new {@code ApplicationEvent}.
	 * @param source the object on which the event initially occurred or with
	 * which the event is associated (never {@code null})
	 */
	public ApplicationEvent(Object source) {
		super(source);
		this.timestamp = System.currentTimeMillis();
	}


	/**
	 * Return the system time in milliseconds when the event occurred.
	 */
	public final long getTimestamp() {
		return this.timestamp;
	}

}

二、快速实现发布订阅模式

配置线程池是必要的,因为发布订阅模式的一个好处就是可以实现解耦,而解耦最好的方式就是采用异步线程处理。如果我们不配置线程池,则在spring中默认会采用同步的方式进行消息发布和订阅消费。这样一来就没有任何意义了。首先在yaml或者properties中配置线程池信息:

thread:
  executor:
    corePoolSize: 8 #核心线程
    keepAliveSeconds: 30000 # 活跃时间
    maxPoolSize: 16 #最大线程数
    queueCapacity: 100000 #最大队列长度

然后通过配置文件读取配置信息,创建线程池并注入IOC容器

package com.hl.by.common.thread;

import com.hl.by.common.utils.JsonUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.event.SimpleApplicationEventMulticaster;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.ErrorHandler;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * @Author: DI.YIN
 * @Date: 2024/3/6 9:39
 * @Version: 1.0.0
 * @Description: 线程池配置
 **/
@Configuration
public class ThreadPoolTaskExecutorConfig {

    //核心线程
    @Value("${thread.executor.corePoolSize}")
    private Integer corePoolSize;
    //存活时间
    @Value("${thread.executor.keepAliveSeconds}")
    private Integer keepAliveSeconds;
    //最大线程数
    @Value("${thread.executor.maxPoolSize}")
    private Integer maxPoolSize;
    //最大队列长度
    @Value("${thread.executor.queueCapacity}")
    private Integer queueCapacity;

    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
        threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
        threadPoolTaskExecutor.setKeepAliveSeconds(keepAliveSeconds);
        threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
        threadPoolTaskExecutor.setQueueCapacity(queueCapacity);
        //设置拒绝策略,直接运行,不采用异步
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        threadPoolTaskExecutor.setThreadNamePrefix("Thread-Pool-Task-");
        return threadPoolTaskExecutor;
    }

    @DependsOn(value = "taskExecutor")
    @Bean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME)
    public SimpleApplicationEventMulticaster eventMulticaster() {
        SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
        simpleApplicationEventMulticaster.setTaskExecutor(taskExecutor());
        //设置错误处理器
        simpleApplicationEventMulticaster.setErrorHandler(new ErrorHandler() {
            @Override
            public void handleError(Throwable throwable) {
                System.out.println("抛出异常:" + JsonUtils.writeObjectAsBeautifulJson(throwable));
            }
        });
        return simpleApplicationEventMulticaster;
    }
}

可以看到除了注入线程池之外,还注入了自定义的SimpleApplicationEventMulticaster 对象并将创建的线程池设置到SimpleApplicationEventMulticaster中。因为SimpleApplicationEventMulticaster是处理发布订阅的核心类,通过multicastEvent方法进行事件发布。可以看到multicastEvent中,循环遍历订阅该事件的所有监听器,并判断是否配置了线程池Executor,如果配置了则将发布操作扔入线程池中异步处理,否则将同步处理发布事件操作。很多情况发现我们的事件发布与监听处理是在一个线程中执行,就是因为我们未设置线程池,导致发布订阅无法异步实现。

	@Override
	public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
		ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
		//获取线程池
		Executor executor = getTaskExecutor();
		for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
			if (executor != null) {
			    //如果配置了线程池,则放入线程池中异步处理
				executor.execute(() -> invokeListener(listener, event));
			}
			else {
			   //未配置线程池,则同步处理
				invokeListener(listener, event);
			}
		}
	}

完成以上配置后,就可以定义发布者、订阅者和发布事件了。现在我们定义一个类MessageSource作为发布者发布的事件,结构如下:

import lombok.Data;

/**
 * @Author: DI.YIN
 * @Date: 2024/3/6 13:41
 * @Version:
 * @Description: 消息实体
 **/
@Data
public class MessageSource {

    private String id;
    private String msg;
    private String title;
}

定义好发布事件后,我们定义一个事件发布者MessageEvent,并指定其发布的事件类型是MessageSourceMessageSource 的子类,结构如下:

import org.springframework.context.ApplicationEvent;

/**
 * @Author: DI.YIN
 * @Date: 2024/3/6 13:39
 * @Version: 1.0.0
 * @Description: 消息事件
 **/
public class MessageEvent<T extends MessageSource> extends ApplicationEvent {

    /**
     * Create a new {@code ApplicationEvent}.
     *
     * @param source the object on which the event initially occurred or with
     *               which the event is associated (never {@code null})
     */
    public MessageEvent(MessageSource source) {
        super(source);
    }
}

现在已经定义好了发布事件MessageSource,事件发布者MessageEvent,此时我们可以定义一个事件订阅者MessageListener,用于监听事件发布者MessageEvent发布的事件。代码如下:

import com.alibaba.fastjson.JSONObject;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

/**
 * @Author: DI.YIN
 * @Date: 2024/3/6 10:19
 * @Version:
 * @Description:
 **/
@Component
public class MessageListener implements ApplicationListener<MessageEvent> {

    @Override
    public void onApplicationEvent(MessageEvent event) {
        MessageSource source = (MessageSource)event.getSource();
        System.out.println("消息监听器监听到消息:===>"+ JSONObject.toJSONString(source));
       
    }
}

现在我们就实现了一个订阅发布模式,事件对象MessageSource,事件发布者MessageEvent专门用于发布MessageSource类型的事件,事件监听者MessageListener 则专门监听MessageEvent发布的事件。可以创建一个接口用于测试发布订阅是否成功。


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;


/**
 * @Author: Greyfus
 * @Create: 2024-03-02 14:08
 * @Version:
 * @Description:
 */
@RestController
@RequestMapping("/mock")
public class TestController {

    @Autowired
    private ApplicationContext applicationContext;

    @RequestMapping(method = RequestMethod.POST, value = "/publishMessage", consumes = MediaType.APPLICATION_JSON_VALUE)
    public void publishMessage() throws Exception {
        //构建信息实体
        MessageSource messageSource = new MessageSource();
        messageSource.setId(String.valueOf(1));
        messageSource.setTitle("日志消息");
        messageSource.setMsg("调用了接口publishMessage");
        //构建消息事件
        MessageEvent<MessageSource> messageEvent = new MessageEvent(messageSource);
        //发布事件
        applicationContext.publishEvent(messageEvent);
    }
}

通过用postman调用接口/mock/feign可以看到MessageListener 成功接受到了MessageEvent发布的MessageSource事件。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/467174.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

数据结构中单向链表(无头)的学习

一.数据结构 1.定义 一组用来保存一种或者多种特定关系的数据的集合&#xff08;组织和存储数据&#xff09; 程序的设计&#xff1a;将现实中大量而复杂的问题以特定的数据类型和特定的存储结构存储在内存中&#xff0c; 并在此基础上实现某个特定的功能的操…

Games101-光栅化(三角形 )

视锥 fov:可视角 aspect ratio&#xff1a;宽高比 MVP 屏幕&#xff1a;抽象的认为是一个二维数组&#xff0c;数组中的每一个元素是一个像素 屏幕是一个典型的光栅成像设备 光栅化&#xff1a;把东西画在屏幕上的过程 像素&#xff1a;理解为不变的最小单位&#xff0c;RGB的…

SSA优化最近邻分类预测(matlab代码)

SSA-最近邻分类预测matlab代码 麻雀搜索算法(Sparrow Search Algorithm, SSA)是一种新型的群智能优化算法&#xff0c;在2020年提出&#xff0c;主要是受麻雀的觅食行为和反捕食行为的启发。 数据为Excel分类数据集数据。 数据集划分为训练集、验证集、测试集,比例为8&#…

Android14之HIDL报错:Invalid sparse file format at header magic(一百九十六)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a;多媒…

利用自定义 URI Scheme 在 Android 应用中实现安全加密解密功能

在现代移动应用开发中&#xff0c;安全性和用户体验是至关重要的考虑因素。在 Android 平台上&#xff0c;开发人员可以利用自定义 URI Scheme 和 JavaScript 加密解密技术来实现更安全的数据传输和处理。本文将介绍如何在 Android 应用中注册自定义 URI Scheme&#xff0c;并结…

苍穹外卖-day06:HttpClient、微信小程序开发、微信登录(业务流程)、导入商品浏览功能代码(业务逻辑)

苍穹外卖-day06 课程内容 HttpClient微信小程序开发微信登录导入商品浏览功能代码 功能实现&#xff1a;微信登录、商品浏览 微信登录效果图&#xff1a; 商品浏览效果图&#xff1a; 1. HttpClient 1.1 介绍 HttpClient 是Apache Jakarta Common 下的子项目&#xff0c;…

Redis中的String编码转换底层原理及6.0新特性

String编码转换底层原理 String对象为什么把大于39字节或者44字节的字符串编码为raw&#xff0c;小于的时候编码为embstr? 在Redis3.2以前的版本中,SDS作为字符串类型中存储字符串内容的结构&#xff0c;源码如下&#xff1a; 3.2版本SDS结构 struct sdshdr {// 记录buf数…

linux 安装常用软件

文件传输工具 sudo yum install –y lrzsz vim编辑器 sudo yum install -y vimDNS 查询 sudo yum install bind-utils用法可以参考文章 《掌握 DNS 查询技巧&#xff0c;dig 命令基本用法》 net-tools包 yum install net-tools -y简单用法&#xff1a; # 查看端口占用情况…

[python3] 设置多进程名称并且在ps命令中可见

Centos7 系统 setproctitle 是一个 Python 模块&#xff0c;用于设置进程标题&#xff08;process title&#xff09;。进程标题是在系统中用来标识进程的名字&#xff0c;通常会显示在系统级的进程管理工具&#xff08;如 ps 命令&#xff09;中。通过设置进程标题&#xff0c…

代码随想录|Day23|回溯03|39.组合总和、40.组合总和II、131.分割回文串

39.组合总和 本题和 216.组合总和III 类似&#xff0c;但有几个区别&#xff1a; 没有元素个数限制&#xff1a;树的深度并不固定&#xff0c;因此递归终止条件有所变化每个元素可以使用多次&#xff1a;下层递归的起始位置和上层相同&#xff08;startIndex不需要改动&#xf…

接口测试常见接口类型?

常见接口类型 1.根据协议区分 1、webService接口:是走soap协议通过http传输请求报文和返回报文都是xml格式的&#xff0c;我们在测试的时候都用通过工具才能进行调用&#xff0c;测试。可以使用的工具有Soapul、jmeter、loadrunner等; 2、http接口:是走http协议&#xff0c;…

Python爬虫在Django项目中的数据处理与展示实例

当谈到Python爬虫技术与Django项目结合时&#xff0c;我们面临着一个引人入胜又具有挑战性的任务——如何利用爬虫技术从网络上抓取数据&#xff0c;并将这些数据进行有效地处理和展示。在本文中&#xff0c;我将为您介绍Python爬虫技术在Django项目中的数据抓取与处理流程。 在…

Java-JVM 虚拟机原理调优实战

一、基础 栈帧&#xff08;Stack Frame&#xff09;栈空间的 基本元素&#xff0c;用于 方法的调用和方法的执行的数据结构 堆内存用来存放由new创建的对象和数组。在堆中分配的内存&#xff0c;由Java虚拟机的自动垃圾回收器来管理。在堆中产生了一个数组或对象后&#xff0c…

Linux 管道

目录 一、认识管道 二、匿名管道 pipe函数 用法&#xff1a; pipefd&#xff1a; 匿名管道通信&#xff1a; 三、命名管道 概念&#xff1a; 创建&#xff1a; 特性&#xff1a; 用途&#xff1a; 四、命名管道和匿名管道的区别 命名&#xff1a; 持久性&#xff1a;…

汽车电子拓扑架构的演进过程

汽车电子拓扑架构的演进过程 我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师 (Wechat:gongkenan2013)。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 本就是小人物,输了就是输了,不要在意别人怎么看自己。江湖一碗茶,喝完再挣扎,出门靠…

系统渐渐沦为“屎山”,这就是真相!

分享是最有效的学习方式。 博客&#xff1a;https://blog.ktdaddy.com/ 背景 小猫维护现有的系统也有一段时间了&#xff0c;踩坑也不少&#xff0c;事故不少。感兴趣的小伙伴可以了解一下&#xff0c;往期的小猫踩坑记合集。 这天&#xff0c;小猫找到了商城系统的第一任开发…

Springboot-软件授权License

无意中看到了一个简单方便的授权方式&#xff0c;只需几步就可集成到boot项目中。 先上地址&#xff1a;smart-license: 保护个人与企业的软件作品权益&#xff0c;降低盗版造成的损失。PS&#xff1a;因个人精力有限&#xff0c;不再提供该项目的咨询答疑服务。 Smart-licen…

Smart Light Random Memory Sprays Retinex 传统图像增强 SLRMSR

文章目录 前言1、Smart Light Random Memory Sprays Retinex概况2、Smart Light Random Memory Sprays Retinex的实现2.1、SLRMSR算法的伪代码2.2、初始化记忆喷雾&#xff08;CreateInitialMemorySpray&#xff09;2.3、更新记忆喷雾 (UpdateMemorySpray)2.4、计算颜色校正因子…

二十几岁的我们:在旷野中找寻自我

二十几岁&#xff0c;这是一个充满变数、充满机遇和挑战的年纪。它如同一片辽阔的旷野&#xff0c;每个人都在其中寻找自己的方向&#xff0c;摸索着自己的道路。这是一个既令人兴奋又令人迷茫的年纪&#xff0c;我们穿着不同的鞋子&#xff0c;注定要走不同的路。 在这个年纪里…

onnx 格式模型可视化工具

onnx 格式模型可视化工具 0. 引言1. 可视化工具2. 安装 Netron: Viewer for ONNX models 0. 引言 ONNX 是一种开放格式&#xff0c;用于表示机器学习模型。ONNX 定义了一组通用运算符&#xff08;机器学习和深度学习模型的构建基块&#xff09;和通用文件格式&#xff0c;使 A…