Guava-EventBus 源码解析


EventBus 采用发布订阅者模式的实现方式,它实现了泛化的注册方法以及泛化的方法调用,另外还考虑到了多线程的问题,对多线程使用时做了一些优化,观察者模式都比较熟悉,这里会简单介绍一下,重点介绍的是如何泛化的进行方法的注册以及调用,还有在单个线程和多线程不同的实现方式。

#发布订阅者模式
观察者模式又名发布订阅者模式,类结构图如下:

观察者类结构图

package com.dfan.设计模式.观察者模式;
import java.util.Arrays;
import java.util.List;

public class ObeserverMode {

    static class TestResult {
        private List<IObserver> iObservers;


        public TestResult() {
        }


        public void register(IObserver iObserver){
            this.iObservers.add(iObserver);
        }
        public void declaration() {
            System.out.println("this is result");
        }
        public void notifyRunners(List<IObserver> iObservers) {
            for(IObserver iObserver : iObservers) {
                iObserver.run(this);
            }
        }
        public  void notifyRunner(IObserver iObserver){
            iObserver.run(this);
        }
    }

    static class UITestResult extends TestResult{


        public UITestResult() {
            super();
        }

        public void declaration() {
            System.out.println("i am ui test result");
        }

        public  void notifyRunner(IObserver iObserver){
            iObserver.run(this);
        }
    }

    interface IObserver{
        void run(TestResult testResult) ;
    }

    static class TestObserver implements IObserver {
        private TestResult testResult;

        public TestResult createTestResult(){
            return new TestResult();
        }

        public void run(TestResult testResult) {
            this.testResult = testResult;
            System.out.println("this is test obeserver");
            testResult.declaration();
        }

    }

    static class TestObserver1 implements IObserver {
        private TestResult testResult;

        public TestResult createTestResult(){
            return new TestResult();
        }

        public void run(TestResult testResult) {
            testResult = createTestResult();

            System.out.println("this is test obeserver 1");
            testResult.declaration();
        }

    }

    public static void main(String[] args) {
        IObserver testRunner = new TestObserver();
        UITestResult uiTestResult = new UITestResult();
        uiTestResult.notifyRunner(testRunner);

        System.out.println("华丽分割线");
        List<IObserver> observers = Arrays.asList(new TestObserver1(), new TestObserver());
        uiTestResult.notifyRunners(observers);
    }
}

#Guava EventBus
Guva中EventBus的机制就是观察者模式,因此符合观察者模式的一般结构:

监听者:监听来自被监听者的变更事件,完成动作变更
被监听者:发送变更事件给监听者,使监听者监听到变更事件后,完成动作变更

EventBus的用法简单总结为一句话就是:

订阅者向EventBus进行事件注册(register),表示对这个事件关心;
EventBus会向所有订阅发布者事件的订阅者进行事件的发送(post)

EventBus 区分 同步模式和异步模式,下面将根据这两个点进行展开
##同步模式
###向EventBus进行注册

/**
   * Returns all subscribers for the given listener grouped by the type of event they subscribe to.
   */
  private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
    Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
    Class<?> clazz = listener.getClass();
    for (Method method : getAnnotatedMethods(clazz)) {
      Class<?>[] parameterTypes = method.getParameterTypes();
      Class<?> eventType = parameterTypes[0];
      methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
    }
    return methodsInListener;
  }
  
/**
   * Registers all subscriber methods on the given listener object.
   */
  void register(Object listener) {
    Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

    for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
      Class<?> eventType = entry.getKey();
      Collection<Subscriber> eventMethodsInListener = entry.getValue();

      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

      if (eventSubscribers == null) {
        CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();
        eventSubscribers = MoreObjects.firstNonNull(
            subscribers.putIfAbsent(eventType, newSet), newSet);
      }

      eventSubscribers.addAll(eventMethodsInListener);
    }
  }
  • 其中 findAllSubscribers方法 目的是获取所有添加注解@Subscriber的方法,并将根据当前EventBusListener、以及加有@Subscriber注解的方法生成的Subscribe作为 Multimap<Class<?>, Subscriber>的value返回(其中key为方法[注释]的入参)

  • registerSubscriber注册到集合private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers = Maps.newConcurrentMap();中,其中该Map的key为EventType(即方法[注释]入参)。

EventBus发送事件给所有订阅者

/**
   * Gets an iterator representing an immutable snapshot of all subscribers to the given event at
   * the time this method is called.
   */
  Iterator<Subscriber> getSubscribers(Object event) {
    ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass());

    List<Iterator<Subscriber>> subscriberIterators =
        Lists.newArrayListWithCapacity(eventTypes.size());

    for (Class<?> eventType : eventTypes) {
      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
      if (eventSubscribers != null) {
        // eager no-copy snapshot
        subscriberIterators.add(eventSubscribers.iterator());
      }
    }

    return Iterators.concat(subscriberIterators.iterator());
  }
  
/**
   * Posts an event to all registered subscribers.  This method will return
   * successfully after the event has been posted to all subscribers, and
   * regardless of any exceptions thrown by subscribers.
   *
   * <p>If no subscribers have been subscribed for {@code event}'s class, and
   * {@code event} is not already a {@link DeadEvent}, it will be wrapped in a
   * DeadEvent and reposted.
   *
   * @param event  event to post.
   */
  public void post(Object event) {
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
      dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
      // the event had no subscribers and was not itself a DeadEvent
      post(new DeadEvent(this, event));
    }
  }
  • getSubscribers 根据刚才提到的参数类型会查找对应的Subscribe,而且不止查指定的类型,还会对这个类型的继承体系上的其他参数类型也会查,比如对于String类型,他会找Serializable,CharSequence,Comparable,Object四种类型,
    举个例子说明下这种情况,在这个例子中,会有两个task被执行,分别是task1和task3
public class EventBusSyncEx {
    static class SimpleListener1 {
        /**
         *订阅方式,通过@Subscribe进行事件订阅,方法名随意
         **/
        @Subscribe
        public void task1(String s) {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("listener1 do task , String param:" + s);
        }
        @Subscribe
        public void task3(Object s) {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("listener1 do task , Object param:" + s);
        }
        @Subscribe
        public void task1(Integer s) {
            System.out.println("listener1 do task , int  param:" + s);
        }
    }
    public static class SimpleEventBusExample {

        public static void main(String[] args) {
            EventBus eventBus = new EventBus();
            eventBus.register(new SimpleListener1());
            System.out.println("Post Simple EventBus Example");
            eventBus.post("Simple EventBus Example");
        }
    }
}
  • subscribers 就是刚才注册subscriber的集合(Map<ParamType, Subscribers>),通过getSubscribers获取到了Subscribe之后,下面就是要根据这个Event的type来执行对应的Event了,首先这里引入一个属性dispatcher (事件分发器 : 用于分发事件给订阅对象的事件处理器,该对象在EventBus构造方法内部初始化,默认的实现是,该分发器将事件存入队列).

    PerThreadQueuedDispatcher: 默认实现,该分发器将事件存入队列,并保证在同一个线程上发送的事件能够按照他们发布的顺序被分发给所有的订阅者。

    private static final class PerThreadQueuedDispatcher extends Dispatcher 
    // This dispatcher matches the original dispatch behavior of EventBus.
    /**
     * Per-thread queue of events to dispatch.
     */
    private final ThreadLocal<Queue<Event>> queue =
        new ThreadLocal<Queue<Event>>() {
          @Override
          protected Queue<Event> initialValue() {
            return Queues.newArrayDeque();
          }
        };
    
    /**
     * Per-thread dispatch state, used to avoid reentrant event dispatching.
     */
    private final ThreadLocal<Boolean> dispatching =
        new ThreadLocal<Boolean>() {
          @Override
          protected Boolean initialValue() {
            return false;
          }
        };
    
    @Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      checkNotNull(event);
      checkNotNull(subscribers);
      Queue<Event> queueForThread = queue.get();
      queueForThread.offer(new Event(event, subscribers));
    
      if (!dispatching.get()) {
        dispatching.set(true);
        try {
          Event nextEvent;
          while ((nextEvent = queueForThread.poll()) != null) {
            while (nextEvent.subscribers.hasNext()) {
              nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
            }
          }
        } finally {
          dispatching.remove();
          queue.remove();
        }
      }
    }
    
    private static final class Event {
      private final Object event;
      private final Iterator<Subscriber> subscribers;
    
      private Event(Object event, Iterator<Subscriber> subscribers) {
        this.event = event;
        this.subscribers = subscribers;
      }
    }
    }
    
    • 这段代码有三个关键点需要注意:
      1. PerThreadQueuedDispatcher 通过Queue来对Event进行存储
      2. Queue以及 dispatching都是ThreadLocal变量,也就意味着每个线程维护自己的一个变量,即线程安全的
      3. nextEvent.subscribers.next().dispatchEvent(nextEvent.event);调用了 Subscribe的dispatchEvent (类似于文中开篇所讲的Observer模式中的被监听者中的iObserver.run(this),只是Observer模式中,是在被监听者中执行的,而EventBus中是在dispatcher中执行的) ,如果继续跟进代码会发现,这个dispatchEvent实际工作就是直接通过反射执行了Method方法(method.invoke(target, checkNotNull(event));)

至此,EventBus的同步执行方式已经分析完成


异步模式

异步模式, 它与同步模式的EventBus的主要区别有两点:

  1. 声明EventBus时,声明为 AsyncEventBus, 而AsyncEventBus的构造函数必须要传入一个 Executor
  2. 在Dispatcher上,AsyncEventBus 采用的事件分发器为 LegacyAsyncDispatcher
/**
   * Dispatches {@code event} to this subscriber using the proper executor.
   */
  final void dispatchEvent(final Object event) {
    executor.execute(new Runnable() {
      @Override
      public void run() {
        try {
          invokeSubscriberMethod(event);
        } catch (InvocationTargetException e) {
          bus.handleSubscriberException(e.getCause(), context(event));
        }
      }
    });
  }
  
private static final class LegacyAsyncDispatcher extends Dispatcher {

    /**
     * Global event queue.
     */
    private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
        Queues.newConcurrentLinkedQueue();

    @Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      checkNotNull(event);
      while (subscribers.hasNext()) {
        queue.add(new EventWithSubscriber(event, subscribers.next()));
      }

      EventWithSubscriber e;
      while ((e = queue.poll()) != null) {
        e.subscriber.dispatchEvent(e.event);
      }
    }

    private static final class EventWithSubscriber {
      private final Object event;
      private final Subscriber subscriber;

      private EventWithSubscriber(Object event, Subscriber subscriber) {
        this.event = event;
        this.subscriber = subscriber;
      }
    }
  }

之所以异步模式传入Executor就是在通过 dispatchEvent 进行多线程的创建 new ThreadPoolExecutor().excute(new Runnable)
而之所以使用 LegacyAsyncDispatcher 目的还有一个就是这个 Dispatcher中使用的queue是ConcurrentLinkedQueue, 之所以使用这个Queue,后面会有专门的一个讲解。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/717270.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

网线不通?瞅瞅这里----关于交叉网线的原理。

最近搞了个项目&#xff0c;UDP对接UDP&#xff0c;死活对接不上。 最后发现是交叉网线的事情&#xff0c;在此记录交叉网线的原理。 先说结论&#xff1a;不同设备用直连&#xff0c;相同设备用交叉网线 细说说 1.原理 网线的原理实际就是TX与RX对接。 正常一个设备同时有…

关于使用命令行打开wps word文件

前言 在学习python-docx时&#xff0c;想在完成运行时使用命令行打开生成的docx文件。 总结 在经过尝试后&#xff0c;得出以下代码&#xff1a; commandrstart "C:\Users\86136\AppData\Local\Kingsoft\WPS Office\12.1.0.16929\office6\wps.exe" "./result…

智能室内空气质量监测预警系统小程序设计说明书

智能室内空气质量监测预警系统小程序设计说明书 一、应用功能与系统设计 &#xff08;一&#xff09; 应用功能 该小程序设计的目的是为了配合环境监测吸顶灯,Mini空气监测仪等硬件设备实时数据展示与远程设备控制等功能&#xff0c;系统框架图如图1-1所示。用户可以从小程序…

生活好物:日常更精彩

我们的日用杂货店&#xff0c;是生活美学的聚集地。这里汇聚了各式各样的生活用品&#xff0c;每一件都蕴含着对生活的热爱与追求。 走进我们的日用杂货店&#xff0c;仿佛打开了一个充满生活气息的宝藏盒。从厨房的锅碗瓢盆&#xff0c;到浴室的洗漱用品&#xff0c;再到客厅的…

Excel和Word等工具小技能分享汇编(一)

这里汇集刘小生前期微信公众号分享的Excel和Word等工具小技能&#xff0c;为方便大家查看学习&#xff0c;刘小生对其进行分类整理&#xff0c;后期也会不定期整理更新&#xff0c;如有想学习交流或其他小技巧需求&#xff0c;欢迎留言&#xff0c;我们一起学习进步&#xff01…

免费 逼真:快手“可灵”后又一Sora级选手登场

就在今日&#xff0c;英伟达投资的旧金山初创公司 Luma AI 打出一手王牌&#xff0c;推出新一代 AI 视频生成模型 Dream Machine&#xff0c;可以文生视频&#xff0c;图生视频&#xff0c;人人免费可用。同时&#xff0c;Luma AI 称 Dream Machine 可以从文本和图像生成“高质…

【会议征稿】第五届物联网、人工智能与机械自动化国际学术会议 (IoTAIMA 2024,7月19-21)

由浙江工业大学主办&#xff0c;第五届物联网、人工智能与机械自动化国际学术会议 (IoTAIMA 2024) 将于2024年7月19-21日在浙江杭州召开。 会议旨在为从事物联网、人工智能与机械自动化的专家学者、工程技术人员、技术研发人员提供一个共享科研成果和前沿技术&#xff0c;了解学…

【SXF2024笔试】

编程题 1. 最长不重复子数组2. 编辑任务所需的最短时间3. 主机连通所需的最短跳数4. 十进制数字的汉诺塔编码 1. 最长不重复子数组 2. 编辑任务所需的最短时间 3. 主机连通所需的最短跳数 4. 十进制数字的汉诺塔编码

STM32在进入main函数之前的准备工作

在大部分嵌入式系统中&#xff0c;在进入main函数之前都需要执行一个系统 初始化序列。这里所说的初始化序列特指的是软件运行环境的初始化。 上图是系统开始运行后&#xff0c;在进入main函数之前的默认初始化序列。从图中可以看出&#xff0c;在左侧有2个函数&#xff1a;__m…

各大APP自动化运行插件开发需要用到的源代码有哪些?

在当今数字化时代&#xff0c;自动化运行插件的开发在各大APP中扮演着至关重要的角色&#xff0c;这些插件不仅提升了APP的功能性和效率&#xff0c;同时也为用户带来了更加便捷的使用体验。 在开发这些自动化运行插件的过程中&#xff0c;源代码的选择与使用显得尤为关键&…

RocketMQ快速入门:集成java客户端实现各类消息发送|异步、同步、顺序、单向、延迟、事务(五)附带源码

0. 引言 前面的章节中&#xff0c;我们已经针对rocketmq的基本概念和消息发送、消费流程进行了讲解&#xff0c;但实际在开发中如何实现rocketmq的接入、实现消息发送、消费还没有落实&#xff0c;那么今天&#xff0c;我们继续来学习如何基于java client集成rocketMQ 1. 集成…

Vue47-修改默认配置webpack.config.js文件

main.js是脚手架项目的入口文件&#xff0c;系统运行时&#xff0c;默认去找src下的main.js文件。这是webpack通过配置文件&#xff1a;webpack.config.js配置的。 脚手架把所有重要的配置文件都隐藏了&#xff0c;方式被开发者修改。 一、查看被隐藏的webpack配置 1-1、webpa…

python基础语法 002 - 3 数据运算

1 运算符 1.1 算术运算符 -*/ 1.1.1 除法&#xff1a;会类型转换、被除数不能为0 #算术运算符a 1 2 print(a) b a - 1 print(b) c b 6 print(c)# 为什么除法得不到整数&#xff1f; #除法可能遇到除不尽 #使用了除法数据类型会转化为浮点数 d c / 2 print(d) print(typ…

SAP 在过账的时候系统提示:被合并的公司 XXXX 和 ‘ ‘ 是不同的解决办法

最近用户反馈在STO的业务模式中交货单过账的时候&#xff0c;报错没有办法过账。查看了一下报错的信息提示&#xff1a;被合并的公司 和1300是不同的 如下图所示&#xff1a; 消息号是F5080 首先根据SAP的消息号找了一下NOTE&#xff0c;发现2091823有详细的说。 主要是财务…

硕士毕业论文《基于磁纹理的磁化动力学研究》

前言 本文是博主的硕士毕业论文&#xff0c;应该也是“自旋电子学&#xff08;微磁学&#xff09;”博客专栏的最后一篇博客&#xff0c;该毕业论文预设排版的PDF版本见下载链接&#xff1a;https://download.csdn.net/download/qq_43572058/89447526。若该博客专栏对读者您的…

Linux:生产消费模型 读者写者模型

Linux&#xff1a;生产消费模型 & 读者写者模型 生产消费模型阻塞队列基本结构构造与析构投放任务获取任务总代码 POSIX 信号量基本概念接口 环形队列基本结构构造与析构投放任务获取任务总代码 读者写者模型读写锁 生产消费模型 生产消费模型是一种用于处理多线程之间任务…

「6.18福利」精选大厂真题|笔试刷题陪伴|明天正式开屋啦 - 打卡赢价值288元丰厚奖励

&#x1f370;关于清隆学长 大家好&#xff0c;我是清隆&#xff0c;拥有ACM区域赛 银牌&#x1f948;&#xff0c;CCCC天梯赛 国一&#xff0c;PTA甲级 98 分。 致力于算法竞赛和算法教育已有 3 年&#xff0c;曾多次 AK 互联网大厂笔试&#xff0c;大厂实习经验丰富。 打卡…

Hive笔记-2

第 3 章 DDL (Data Definition Language) 数据定义 DDL数据定义语言 DML数据操作语言 3.1 数据库 (database) 3.1.1 创建数据库 1) 语法 CREATE DATABASE [IF NOT EXISTS] database_name [COMMENT database_comment] [LOCATION hdfs_path] [WITH DBPROPERTIES (property_…

环信beta版鸿蒙IM SDK发布!深度适配HarmonyOS NEXT系统

环信beta版鸿蒙IM SDK已正式发布&#xff01;欢迎有需求开发者体验集成&#xff01; 版本亮点 提供原生鸿蒙 SDK&#xff0c;支持原生 ArkTS 语言&#xff0c;全面拥抱鸿蒙生态提供鸿蒙系统上单聊、群聊、会话等能力和服务覆盖消息管理、用户属性、群租管理、离线推送.多设备…

作者推荐 | 探索分析从起源到现今的巅峰之旅(MySQL存储模型)

探索分析从起源到现今的巅峰之旅 背景介绍MySQL内部组织与结构MySQL的数据层次和关系InnoDB的数据存储模型数据记录的基本单元 — 行页目录&#xff08;Page Directory&#xff09;文件头&#xff08;File Header&#xff09;决定页面间的关联方式数据页头&#xff08;Page Hea…