基于SpringBoot实现一个可扩展的事件总线

基于SpringBoot实现一个可扩展的事件总线

前言

在日常开发中,我们经常会用到事件总线,SpringBoot通过事件多播器的形式为我们提供了一个事件总线,但是在开发中我们经常会用到其他的实现,比如Guava、Disruptor的。我们将基于SpringBoot封装一套底层驱动可扩展的,统一api的事件驱动组件。

环境准备

jdk1.8
spring-boot-autoconfigure
Guava
Disruptor

pom文件如下

Copy<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.billetsdoux</groupId>
    <artifactId>eventBus</artifactId>
    <version>1.0.0</version>
    <name>eventBus</name>
    <description>eventBus</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>31.1-jre</version>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <scope>compile</scope>
            <version>5.8.9</version>
        </dependency>

        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.4.4</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
    </dependencies>



</project>


组件介绍

整体架构#

目录结构如下:

[外链图片转存中…(img-m0HXGURP-1703487623977)]

我们的核心是一个EventListenerRegistry,它便是我们提供统一api的入口,它有两个方法,一个是init方法,在SpringBoot容器启动的时候会去注册我们所有的事件监听器,publish 方法则为事件发布的方法。这里我为它提供了3种实现,GuavaSpringDisruptor

[外链图片转存中…(img-AtJntQUE-1703487623978)]

EventModel#

这是我们定义的事件模型,topic为事件主题,我们通过不同的topic对应不同的事件处理器,entity为具体的事件对象模型

Copypackage com.billetsdoux.eventbus.model;

import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@NoArgsConstructor
public class EventModel<T> implements Serializable {

    /**
     *  事件发布主题
     */
    private String topic;

    /**
     *  事件对象模型
     */
    private T entity;
}


EventListener#

EventListener 为事件消费接口,定义了2个方法,topic() 为监听的事件topic,onMessage()为事件的回调接口。

Copypackage com.billetsdoux.eventbus;

/**
 *  消费接口
 */
public interface EventListener<T> {

    String topic();

    void onMessage(T message);
}


EventListenerRegistry#

这边是我们之前介绍的事件核心接口,它提供两个接口 initRegistryEventListener 负责注册我们所定义的所有事件监听器,publish 负责发送消息,我们底层的驱动需要继承这个接口。

Copypublic interface EventListenerRegistry<P> {
    void initRegistryEventListener(List<EventListener> eventConsumerList);

    void publish(P param);
}


SpringEventListenerRegistry#

这是我们通过Spring为我们提供的消息多播器来实现的一个事件驱动。这个类被@Component标记,那么它会在容器启动的时候,通过构造器为我们注入 eventListeners ,applicationContext 。eventListeners 为所有实现了EventListener接口,并被注入到容器里面的类。

initRegistryEventListener 这是一个空方法,因为他们本身已经在容器中了,所以不需要注册了

publish: 直接调用applicationContext.publishEvent就可以了。

Copypackage com.billetsdoux.eventbus.spring;
import com.billetsdoux.eventbus.EventListener;
import com.billetsdoux.eventbus.EventListenerRegistry;
import com.billetsdoux.eventbus.model.EventModel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.List;

@RequiredArgsConstructor
@Slf4j
@Component
public class SpringEventListenerRegistry implements EventListenerRegistry<EventModel> {


    final ApplicationContext applicationContext;

    final List<EventListener> eventListeners;



    @Override
    public void initRegistryEventListener(List<EventListener> eventConsumerList) {

    }

    @Override
    public void publish(EventModel param) {
        applicationContext.publishEvent(param);
    }

    @PostConstruct
    public void init(){
        log.info("开始初始化Spring事件监听器的组件服务");
        initRegistryEventListener(eventListeners);
        log.info("完成初始化Spring事件监听器的组件服务");
    }
}


GuavaEventListenerRegistry#

基于Guava来实现的事件总线,我们首先还是需要容器帮我们注入eventListeners。相较于Spring我们需要自己定义一个Guava的EventBus,然后把我们的Listener注册到这个EventBus中。

publish方法则是调用EventBus的post方法到。

Copy
package com.billetsdoux.eventbus.guava;

import cn.hutool.core.thread.ThreadUtil;

import com.billetsdoux.eventbus.EventListener;
import com.billetsdoux.eventbus.EventListenerRegistry;
import com.billetsdoux.eventbus.model.EventModel;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.objenesis.instantiator.util.ClassUtils;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.List;
import java.util.concurrent.ExecutorService;

@Component("guava")
@Slf4j
public class GuavaEventListenerRegistry implements EventListenerRegistry<EventModel> {

    EventBus eventBus;

    final List<EventListener> eventListeners;

    public GuavaEventListenerRegistry(List<EventListener> eventListeners) {
        this.eventListeners = eventListeners;
    }

    @Override
    public void initRegistryEventListener(List<EventListener> eventConsumerList) {
        final ExecutorService executor = ThreadUtil.newExecutor(10, 20, 300);
        eventBus = new AsyncEventBus(GuavaEventListenerRegistry.class.getName(),executor);
        eventConsumerList.forEach(param->{
            log.info("注册监听器:{}",param.getClass().getName());
            eventBus.register(ClassUtils.newInstance(param.getClass()));
        });
    }

    @Override
    public void publish(EventModel param) {
        eventBus.post(param);
    }

    @PostConstruct
    public void init(){

        log.info("开始初始化Guava事件监听器的组件服务");
        initRegistryEventListener(eventListeners);
        log.info("完成初始化Guava事件监听器的组件服务");
    }

}


DisruptorEventListenerRegistry#

Disruptor的实现相对来说麻烦一点,它首先需要一个实现了EventFactory接口的类,它提供一个newInstance接口来创建事件对象模型。

具体的使用方式可以参考我这篇博文:Disruptor入门

EventModelFactory

我们首先还是需要注入我们的Listener,只是这里在init的时候是将我们的Listener交给我们的Disruptor去处理,我们先将Listener转成EventHandler,所以我们的监听器接口具体实现的时候除了实现我们定义的EventListener接口外还需要继承Disruptor的EventHandler接口。 调用disruptor.handleEventsWith(dataListener); 把我们的Listener交给Disruptor去管理。最后再启动Disruptor。

publish:调用Disruptor的RingBuffer来进行消息的发送。

Copy
/**
 *  事件工厂
 *  Disruptor 通过EventFactory在RingBuffer中预创建Event的实例
 * @param <T>
 */
public class EventModelFactory<T> implements EventFactory<EventModel<T>> {
    @Override
    public EventModel<T> newInstance() {
        return new EventModel<>();
    }
}


Copy@Slf4j
@RequiredArgsConstructor
@Component("disruptor")
@Scope("prototype") // 线程安全问题
public class DisruptorEventListenerRegistry implements EventListenerRegistry<EventModel>,AutoCloseable {

    /**
     *  disruptor事件处理器
     */
    @Getter
    @Setter
    private Disruptor<EventModel> disruptor;

    @NonNull
    final List<EventListener> eventListeners;



    /**
     *  RingBuffer的大小
     */
    private final int DEFAULT_RING_SIZE = 1024 * 1024;

    /**
     *  事件工厂
     */
    private EventFactory<EventModel> eventFactory = new EventModelFactory();

    @Override
    public void initRegistryEventListener(List<EventListener> eventConsumerList) {

        disruptor = new Disruptor<>(eventFactory, DEFAULT_RING_SIZE, createThreadFactory(), ProducerType.SINGLE, new BlockingWaitStrategy());
        EventHandler[] dataListener = eventConsumerList.stream().map(param -> {
            EventListener<EventModel> eventModelEventListener = param;
            return eventModelEventListener;
        }).collect(Collectors.toList()).toArray(new EventHandler[eventConsumerList.size()]);
        log.info("注册服务信息接口:{}",dataListener);

        disruptor.handleEventsWith(dataListener);
        disruptor.start();
    }

    @Override
    public void publish(EventModel param) {
        publishEvent(param);
    }

    public void publishEvent(EventModel... eventModels){
        Objects.requireNonNull(disruptor, "当前disruptor核心控制器不可以为null");
        Objects.requireNonNull(eventModels, "当前eventModels事件控制器不可以为null");

        // 发布事件
        final RingBuffer<EventModel> ringBuffer = disruptor.getRingBuffer();
        try {
            final List<EventModel> dataList = Arrays.stream(eventModels).collect(Collectors.toList());

            for (EventModel element : dataList) {

                // 请求下一个序号
                long sequence = ringBuffer.next();

                // 获取该序号对应的事件对象
                EventModel event =  ringBuffer.get(sequence);
                event.setTopic(element.getTopic());
                event.setEntity(element.getEntity());
                ringBuffer.publish(sequence);

            }
        }catch (Exception e) {
            log.error("error",e);
        }

    }


    /**
     *  关闭处理机制
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        if (Objects.nonNull(disruptor)) disruptor.shutdown();

    }

    @PostConstruct
    public void init(){
        log.info("开始初始化Disruptor事件监听器的组件服务");
        initRegistryEventListener(eventListeners);
        log.info("完成初始化Disruptor事件监听器的组件服务");
    }

    private static ThreadFactory createThreadFactory(){
        AtomicInteger integer = new AtomicInteger();

        return r-> new Thread(r,"disruptor-"+integer.incrementAndGet());
    }
    
    
}


至此我们已经实现了我们的目标三个EventListenerRegistry,我们接下来看看我们Listener如何实现。

BaseEventListener#

我们刚说过我们的Listener需要同时实现EventHandler跟EventListener,所以我们定义一个抽象类,注意这个EventListener是我们定义的,EventHandler是Disruptor定义的。

Copypublic abstract class BaseEventListener<T> implements EventListener<T>, EventHandler<T> {

}


ExecutableEventListener#

我们定义一个抽象类ExecutableEventListener 我们来实现一下里面的方法。

对于Spring跟Guava来说只需要在方法上添加注解便可以在事件发生的时候回调过来,而对于Disruptor来说它的回调是继承EventHandler里面的onEvent方法。所以我们在onEvent里面手动调用onMessage方法,让所有的消息都转发给onMessage处理。

@org.springframework.context.event.EventListener Spring的回调注解

@Subscribe 的回调注解

onMessage:我们先调用topic()方法获取Listener方法的topic,这个方法我们这里先不实现,交给具体的实现类去实现这个方法。我们再定义一个handle的抽象方法,则是我们具体的消息处理逻辑的方法,也交给具体的实现类去实现。

Copy@Slf4j
public abstract class ExecutableEventListener extends BaseEventListener<EventModel<?>> {

    @org.springframework.context.event.EventListener
    @Subscribe
    @Override
    public void onMessage(EventModel<?> message) {
        log.info("收到消息:{}",message);

        if (topic().equals(message.getTopic())){
            handle(message);
        }
    }

    @Override
    public void onEvent(EventModel<?> event, long sequence, boolean endOfBatch) throws Exception {
        onMessage(event);
    }

    /**
     *  具体消息处理方法
     * @param message
     */
    protected abstract void handle(EventModel<?> message);

}


至此我们的核心代码就开发完成了,现在定义两个注解,让我们能够在项目中启用它。

EnableEventBus:在启动类上添加这个注解以启用EventBus

[外链图片转存中…(img-OjGimDb2-1703487623978)]

EventBusConfiguration:配置一下Spring的包扫描路径
img

测试

我们把我们刚写的项目install到本地maven仓库,以便我们在项目中能够引用它。我们新建一个SpringBootWeb项目添加这个依赖测试下

在pom中添加

Copy  <dependency>
            <groupId>com.billetsdoux</groupId>
            <artifactId>eventBus</artifactId>
            <version>1.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

在启动类上添加这个注解启用

img

添加一个事件监听器,继承ExecutableEventListener,监听”blog“消息的主题。

img

添加一个Controller来测试一下我们的事件总线。它根据我们的type来选择不同的底层驱动:spring,guava,disruptor

[外链图片转存中…(img-ZndsBWCc-1703487623979)]

我们打包成docker镜像然后启动。
Dockerfile如下:

CopyFROM openjdk:8-jre-slim
MAINTAINER billtsdoux

WORKDIR /app

ADD target/eventbus_blog*.jar app.jar

EXPOSE 8080

ENV JVM_OPTS="-Xmx256m -Xms256m" \
    TZ=Asia/Shanghai

RUN ln -sf /usr/share/zoneinfo/$TZ /etc/localtime \
    && echo $TZ > /etc/timezone

ENTRYPOINT ["sh","-c","java -jar $JVM_OPTS app.jar"]



我们这里配置一下打包后镜像的名称,已经启动的容器名称跟监听的端口。

[外链图片转存中…(img-xKzuGe5r-1703487623980)]

构建成功

img

并且也启动一个容器:

[外链图片转存中…(img-QETw4G24-1703487623980)]

查看日志可以看到我们内置的三个监听器注册器已经成功启动了。

[外链图片转存中…(img-ScQl7W0a-1703487623980)]

我们测试一下接口:可以看到根据我们选择的不同类型我们可以选择不同的实现。

[外链图片转存中…(img-aFD49ezm-1703487623981)]

[外链图片转存中…(img-CnPONtYE-1703487623981)]

[外链图片转存中…(img-mDQW65LN-1703487623981)]

后言

如果提供这个三个不够用,我们还可以通过实现这个接口EventListenerRegistry来扩展我们的事件总线组件,再注入到容器中,在调用的时候选择具体的实现就好了。

标签: java , Spring

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/269489.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

社交媒体的力量:独立站如何利用海外社媒进行引流

随着全球数字化的浪潮&#xff0c;社交媒体已经成为连接世界的纽带&#xff0c;为企业和个人提供了无限的可能性。对于独立站而言&#xff0c;通过善用海外社交媒体平台&#xff0c;不仅能够拓展用户群体&#xff0c;还能够实现更广泛的品牌曝光和业务引流。本文Nox聚星将和大家…

labelme目标检测数据类型转换

1. labelme数据类型 LabelMe是一个开源的在线图像标注工具&#xff0c;旨在帮助用户创建和标记图像数据集。它提供了一个用户友好的界面&#xff0c;让用户可以直观地在图像上绘制标记框、多边形、线条等&#xff0c;以标识和注释图像中的对象或区域。 GitHub&#xff1a;http…

孔夫子二手书采集

文章目录 项目演示软件采集单本数据网页搜索数据对比 使用场景概述部分核心逻辑Vb工程图数据导入与读取下拉框选择参数设置线程 使用方法下载软件授权导入文件预览处理后的数据 项目结构附件说明 项目演示 操作视频详见演示视频&#xff0c;以下为图文演示 软件采集单本数据 …

unity中使用protobuf工具将proto文件转为C#实体脚本

unity中使用protobuf工具将proto文件转为C#实体脚本 介绍优点缺点Protobuf 为什么比 XML 快得多&#xff1f;Protobuf的EncodingProtobuf封解包的过程通常编写一个Google Protocol Buffer应用需要以下几步&#xff1a; Protostuff是什么Protobuf工具总结 介绍 protobuf也就是G…

设计模式--适配器模式

实验8&#xff1a;适配器模式 本次实验属于模仿型实验&#xff0c;通过本次实验学生将掌握以下内容&#xff1a; 1、理解适配器模式的动机&#xff0c;掌握该模式的结构&#xff1b; 2、能够利用适配器模式解决实际问题。 [实验任务]&#xff1a;双向适配器 实现一个双向…

Java学习时间和日期

1 常用类 1.1 Date 表示日期 具体类 设置时间 1.2 Calendar 表示日历 抽象类 设置日历的设定日期 void set(int year,int month,int date); void set(int year,int month, int date, int hour, int minute,int second); void setTime(Date d); int get(int field)&#…

直播的内容多样性

直播&#xff0c;作为一种新兴的媒体形式&#xff0c;已经深入到我们生活的方方面面。其内容多样性是吸引观众的关键因素之一。以下是直播内容多样性的几个主要方面: 1.主题多样性:直播涵盖的主题非常广泛&#xff0c;包括但不限于娱乐、游戏、体育、教育、招聘、新闻、金融、…

VS2020使用MFC开发一个贪吃蛇游戏

背景&#xff1a; 贪吃蛇游戏 按照如下步骤实现:。初始化地图 。通过键盘控制蛇运动方向&#xff0c;注意重新设置运动方向操作。 。制造食物。 。让蛇移动&#xff0c;如果吃掉食物就重新生成一个食物&#xff0c;如果会死亡就break。用蛇的坐标将地图中的空格替换为 #和”将…

Druid源码阅读-DruidStatInterceptor实现

上次我们在druid-spring-boot-starter里面看到有一个DruidSpringAopConfiguration的配置类&#xff0c;然后引入了DruidStatInterceptor这样一个切面逻辑。今天我们就来看一下这个类的实现。 DruidStatInterceptor 这个类的包路径下入com.alibaba.druid.support.spring.stat。…

DC电源模块有哪些注意事项和使用技巧?

BOSHIDA DC电源模块有哪些注意事项和使用技巧&#xff1f; DC电源模块的注意事项和使用技巧包括以下几点&#xff1a; 1. 选择适当的电源模块&#xff1a;根据需要选择合适的电源模块&#xff0c;考虑电压、电流和功率等参数。确保模块能够满足所需的电力要求。 2. 输入电压范…

销售如何挖掘客户?有哪些方法?

在当今竞争激烈的市场环境中&#xff0c;客户资源的挖掘已经成为企业生存和发展的关键。销售人员需要掌握一定的技巧和方法&#xff0c;以有效地发掘潜在客户&#xff0c;提高销售业绩。以下分享一些挖掘客户的常用方法&#xff0c;帮助销售人员更好地开展业务。 一、了解客户需…

IP编址,IP地址介绍与子网划分方法

网络层位于数据链路层与传输层之间。网络层中包含了许多协议&#xff0c;其中最为重要的协议就是IP协议。网络层提供了IP路由功能。理解IP路由除了要熟悉IP协议的工作机制之外&#xff0c;还必须理解IP编址以及如何合理地使用IP地址来设计网络。 上层协议类型 以太网帧中的Typ…

Tailwind CSS 原子化开发初体验

Tailwind CSS 的工作原理是扫描所有 HTML 文件、JavaScript 组件以及任何模板中的 CSS 类&#xff08;class&#xff09;名&#xff0c;然后生成相应的样式代码并写入到一个静态 CSS 文件中。他快速、灵活、可靠&#xff0c;没有运行时负担。再也不用为了取一个 classname 类名…

GAMES101-LAB1

文章目录 一、问题总览二、作业参考2.1 get_projection_matrix()函数2.2 static bool insideTriangle()函数2.3 rasterize_triangle() 三、附件 一、问题总览 在屏幕上画出一个实心三角形&#xff0c;换言之&#xff0c;栅格化一个三角形实现并调用函数rasterize_triangle(con…

【MySQL基础】:超详细MySQL完整安装和配置教程

&#x1f3a5; 屿小夏 &#xff1a; 个人主页 &#x1f525;个人专栏 &#xff1a; MySQL从入门到进阶 &#x1f304; 莫道桑榆晚&#xff0c;为霞尚满天&#xff01; 文章目录 &#x1f4d1;前言一. MySQL数据库1.1 版本1.2 下载1.3 安装1.4 客户端连接 &#x1f324;️全篇总…

python使用selenium控制浏览器进行爬虫

这里以谷歌浏览器为例&#xff0c;需要安装一下chromedriver&#xff0c;其他浏览器也有相对应的driver&#xff0c;chromedriver下载地址&#xff1a;https://googlechromelabs.github.io/chrome-for-testing/ 然后是打开python环境安装一下依赖pip install selenium&#xf…

json-server实现数据Mock

json-server是一个node包&#xff0c;可以在不到30秒内获得零编码的完整的Mock服务实现步骤&#xff1a;1. 项目内安装json-server ---> npm i -D json-server2. 准备一个json文件 在根目录下&#xff08;src同级&#xff09;创建server文件夹&#xff0c;创建data.json文…

【数据结构】字符串匹配|BF算法|KMP算法|next数组的优化

字符串匹配算法是在实际工程中经常遇到的问题&#xff0c;也是各大公司笔试面试的常考题目&#xff0c;本文主要介绍BF算法&#xff08;最好想到的算法&#xff0c;也最好实现&#xff09;和KMP算法&#xff08;最经典的&#xff09; 一、BF算法 BF算法&#xff0c;即暴力(Bru…

[java] 转义字符控制台最常用快捷键

转义字符 1&#xff09;\t&#xff1a;一个制表位&#xff0c;实现对齐的功能 2&#xff09;\n&#xff1a;换行符 3) \\&#xff1a;一个\ 4&#xff09;\"&#xff1a;一个" 5) \&#xff1a;一个 6) \r&#xff1a;一个回车System.out.println(我亦无他AA\r唯…

Wordpress对接Lsky Pro 兰空图床插件

Wordpress对接Lsky Pro 兰空图床插件 wordpress不想存储图片到本地&#xff0c;访问慢&#xff0c;wordpress图片没有cdn想要使用图床&#xff0c;支持兰空自定义接口 安装教程—在wp后台选择插件zip—然后启用—设置自己图床API接口就ok了&#xff0c;文件全部解密&#xff0c…