响应式编程初探-自定义实现Reactive Streams规范

最近在学响应式编程,这里先记录下,响应式编程的一些基础内容

1.名词解释

Reactive Streams、Reactor、WebFlux以及响应式编程之间存在密切的关系,它们共同构成了在Java生态系统中处理异步和响应式编程的一系列工具和框架。

  1. Reactive Streams:

    • Reactive Streams 是一个规范,定义了一组接口和协议,用于处理异步数据流的背压。它包括发布者(Publisher)、订阅者(Subscriber)、订阅(Subscription)和处理器(Processor)等接口。
    • Reactive Streams 规范的目标是提供一种标准的方式来处理异步数据流,解决背压问题。Java标准库从Java 9开始提供了 java.util.concurrent.Flow 类,定义了Reactive Streams规范。
  2. Reactor:

    • Reactor 是一个基于Reactive Streams规范的响应式编程框架。它提供了一组用于构建异步、事件驱动、响应式应用程序的工具和库。Reactor 的核心是 Flux(表示一个包含零到多个元素的异步序列)和 Mono(表示一个包含零或一个元素的异步序列)。
    • Reactor 通过提供响应式的操作符,如mapfilterflatMap等,使得开发者能够方便地进行数据流的转换和处理。
  3. WebFlux:

    • WebFlux 是Spring Framework 5引入的响应式编程支持。它构建在 Reactor 之上,提供了一套用于构建异步、非阻塞、响应式的Web应用程序的API。WebFlux支持使用Reactive Streams处理HTTP请求和响应。
    • Spring WebFlux 可以用于构建反应式的RESTful服务,支持使用注解的方式定义路由和处理器函数。
  4. 响应式编程:

    • 响应式编程是一种编程范式,强调数据流和变化的传播。在这个范式中,数据源产生数据并通知观察者,观察者相应地处理这些数据。这种方式更容易处理异步操作和事件。
    • 在Java中,响应式编程通常涉及到使用类似于Reactor或RxJava的库,这些库提供了响应式的操作符和工具。

综上所述,Reactive Streams 提供了规范,Reactor 是一个实现了该规范的响应式编程框架,而WebFlux是Spring对于响应式编程的支持。它们共同致力于构建异步、非阻塞、响应式的应用程序。响应式编程则是一种更广义的编程范式,与Reactive Streams和Reactor等具体实现密切相关。

2.Reactive Streams 规范

2.1.Reactive Streams规范定义

java.util.concurrent.Flow 类中,定义了Reactive Streams规范
在这里插入图片描述

  • Publisher(发布者):负责生成数据流,并向订阅者发送数据。
  • Subscriber(订阅者):表示数据流的消费者,它订阅一个或多个发布者,并接收数据。
  • Subscription(订阅):表示订阅关系的接口,用于控制数据流的请求和取消。
  • Processor(处理器):充当发布者和订阅者的中间组件,可以对数据进行转换和处理。

2.2.API方法

1. Publisher(发布者):
interface Publisher<T> {
    void subscribe(Subscriber<? super T> subscriber);
}
  • subscribe(Subscriber<? super T> subscriber) 用于订阅数据流。当订阅者调用这个方法时,发布者将建立与订阅者的订阅关系,并开始推送数据。
2. Subscriber(订阅者):
interface Subscriber<T> {
    void onSubscribe(Subscription subscription);
    void onNext(T item);
    void onError(Throwable throwable);
    void onComplete();
}
  • onSubscribe(Subscription subscription) 在订阅关系建立时调用。通过这个方法,订阅者可以持有 Subscription 对象,以便后续请求数据和取消订阅。

  • onNext(T item) 在接收到新元素时调用。订阅者通过这个方法处理收到的数据。

  • onError(Throwable throwable) 在数据流中出现错误时调用。订阅者通过这个方法处理错误情况。

  • onComplete() 在数据流完成时调用。通知订阅者数据流结束,不再有新的元素。

3. Subscription(订阅):
interface Subscription {
    void request(long n);
    void cancel();
}
  • request(long n) 用于请求订阅者处理指定数量的元素。订阅者通过这个方法告知发布者它可以处理多少个元素。

  • cancel() 用于取消订阅关系。当订阅者不再需要接收数据时,调用此方法取消订阅。

4. Processor(处理器):
interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

Processor 接口是 SubscriberPublisher 的组合,表示一个中间处理组件,可以同时充当订阅者和发布者的角色。

  • Subscriber 部分的方法:onSubscribe(Subscription subscription), onNext(T item), onError(Throwable throwable), onComplete()

  • Publisher 部分的方法:subscribe(Subscriber<? super R> subscriber)。表示 Processor 可以被其他订阅者订阅。

5.泛型T

泛型T即为数据流

这些方法共同构成 Reactive Streams 协议,定义了发布者和订阅者之间的协作方式,以及订阅者如何处理数据流。在实际的使用中,这些方法的实现通常需要考虑异步处理、背压机制等方面,以确保响应式编程的目标得以实现。

2.3.工作流程

在 Reactive Streams 中,PublisherSubscriberSubscriptionProcessor 之间的协作流程如下:

有时间再补流程图
在这里插入图片描述

  1. Publisher(发布者):

    • Publisher 是异步产生数据流的组件,它通过 subscribe 方法允许订阅者订阅。subscribe 方法会接收一个 Subscriber 对象作为参数。
    • Publisher 有新数据准备好时,通过调用订阅者的 onNext 方法将数据推送给订阅者。
    interface Publisher<T> {
        void subscribe(Subscriber<? super T> subscriber);
    }
    
  2. Subscriber(订阅者):

    • Subscriber 是数据流的消费者,通过实现 Subscriber 接口来接收来自发布者的数据。订阅者通过调用 subscription.request(n) 请求一定数量的数据,处理数据时通过 onNext 方法接收元素。
    • 当订阅者无法处理更多的元素时,可以调用 subscription.cancel() 来取消订阅。
    interface Subscriber<T> {
        void onSubscribe(Subscription subscription);
        void onNext(T item);
        void onError(Throwable throwable);
        void onComplete();
    }
    
  3. Subscription(订阅):

    • Subscription 表示订阅关系,它在 onSubscribe 方法中被传递给订阅者。通过 Subscription,订阅者可以请求数据和取消订阅。
    • 订阅者通过 request(long n) 方法请求处理 n 个元素,通过 cancel() 方法取消订阅。
    interface Subscription {
        void request(long n);
        void cancel();
    }
    
  4. Processor(处理器):

    • Processor 是一个同时实现了 PublisherSubscriber 接口的中间组件,可以作为数据流的处理器,对数据进行转换和处理。
    • Processor 既能接收数据,也能发布数据。它将 onNextonErroronComplete 方法委托给下游的订阅者,并将数据推送给上游的发布者。
    interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    }
    

这些接口一起构成了 Reactive Streams 的基本协议。发布者产生数据,订阅者订阅数据流并通过 onNext 方法接收元素,订阅者通过 request 方法请求处理一定数量的元素,同时可以通过 cancel 方法取消订阅。Processor 则可以用于在订阅者和发布者之间进行数据转换和处理。在 Reactive Streams 的实现中,这些接口的方法调用是异步进行的,以支持非阻塞的数据流处理。

3.自定义实现Reactive Streams规范

自己实现了一个,参考了SubmissionPublisher

  • 同步实现的
  • 功能不完善
  • 有bug
class MyPublisher implements Flow.Publisher<String>{
    MySubscription<String> subscription;
    public int request ;
    public void publish(String item){
        subscription.items.add(item);
        while (true) {
            if (request > 0) {
                for (int i = 0; i < request; i++) {
                    if (!subscription.items.isEmpty()) {
                        try {
                            Object o = subscription.items.get(subscription.items.size() - 1);
                            subscription.subscriber.onNext(o.toString());
                            subscription.items.remove(o);
                        }catch (Exception e){
                            subscription.subscriber.onError(e);
                            return;
                        }
                    }

                }
            }
            if (subscription.items.isEmpty()) {
                break;
            }
        }
    }

    @Override
    public void subscribe(Flow.Subscriber<? super String> subscriber) {
        System.out.println("第一步:绑定订阅者" );
        MySubscription<String> subscription = new MySubscription<>(subscriber,this);
        this.subscription = subscription;
        subscriber.onSubscribe(subscription);
    }

}

class MySubscriber implements Flow.Subscriber<String>{

    private Flow.Subscription subscription;
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println("第二步:接收Subscription" );
        this.subscription = subscription;
        // 请求订阅者处理的元素数量
        subscription.request(1);
    }

    @Override
    public void onNext(String item) {
        System.out.println("第四步:推送数据" );
        System.out.println("MySubscriber 消费了item = " + item);
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("出异常了 = " + throwable);
    }

    @Override
    public void onComplete() {

    }

}

class MySubscription<T> implements Flow.Subscription{

    final Flow.Subscriber<? super T> subscriber;
    final MyPublisher publisher;
    List items = new ArrayList();
    public MySubscription(Flow.Subscriber<? super T> subscriber, MyPublisher publisher) {
        this.subscriber = subscriber;
        this.publisher = publisher;
    }

    @Override
    public void request(long n) {
        this.publisher.request++;
        System.out.println("第三步:拉取请求" );
    }

    @Override
    public void cancel() {

    }
}
public class FlowDemo {

    public static void main(String[] args) {
        MyPublisher myPublisher = new MyPublisher();
        MySubscriber mySubscriber = new MySubscriber();
        myPublisher.subscribe(mySubscriber);
        myPublisher.publish("111");
        myPublisher.publish("222");
        myPublisher.publish(null);


    }
}

4.Jdk实现Reactive Streams使用示例

class SimplePublisher implements Flow.Publisher<Integer> {

    private final SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

    public void publishItems() {
        for (int i = 1; i <= 5; i++) {
            publisher.submit(i);
        }

        // 发布者完成发布
        publisher.close();
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        publisher.subscribe(subscriber);
    }
}

class SimpleSubscriber implements Flow.Subscriber<Integer> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        // 请求订阅者处理的元素数量
        subscription.request(1);
    }

    @Override
    public void onNext(Integer item) {
        System.out.println("Received item: " + item);
        // 处理完一个元素后请求下一个
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        System.err.println("Error occurred: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Processing completed.");
    }
}

public class ReactiveStreamsExample {

    public static void main(String[] args) throws InterruptedException {
        // 创建发布者和订阅者
        SimplePublisher simplePublisher = new SimplePublisher();
        SimpleSubscriber simpleSubscriber = new SimpleSubscriber();

        // 订阅者订阅发布者
        simplePublisher.subscribe(simpleSubscriber);

        // 发布者发布数据
        simplePublisher.publishItems();
        // 睡一觉,确保数据处理完成
        Thread.sleep(3000);
    }
}

学习打卡:Java学习笔记-day05-响应式编程初探-自定义实现Reactive Streams规范

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/319798.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ruoyi后台管理系统部署-1-安装JDK

CentOS 7 安装JDK 1. 最简单方法 部署JDK最简单的方法是使用&#xff1a;yum 首先查看原服务器有没有安装 jdk # 没有输出 java -version yum list installed | grep javayum 安装 查看可供安装的jdk&#xff1a; yum search java | grep jdk yum -y list java*应该输出一堆…

C#无标题栏窗体拖动代码

文章目录 一、概念二、案例三、常见问题四、链接 一、概念 C#&#xff08;C Sharp&#xff09;是由微软公司开发的一种面向对象的编程语言。它是从C和C语言演化而来的&#xff0c;并结合了Java和其他编程语言的特性。C#是微软.NET平台的一部分&#xff0c;允许开发人员创建各种…

超强站群系统v9.0:最新蜘蛛池优化技术,一键安装,内容无缓存刷新,高效安全

安全、高效&#xff0c;化的优化利用php性能&#xff0c;使得运行流畅稳定 独创内容无缓存刷新不变&#xff0c;节省硬盘。防止搜索引擎识别蜘蛛池 蜘蛛池算法&#xff0c;轻松构建站点&#xff08;电影、资讯、图片、论坛等等&#xff09; 可以个性化每个网站的风格、内容、…

YOLOv7涨点改进:多层次特征融合(SDI),小目标涨点明显,| UNet v2,比UNet显存占用更少、参数更少

💡💡💡本文全网独家改进:多层次特征融合(SDI),能够显著提升不同尺度和小目标的识别率 💡💡💡在YOLOv7中如何使用 1)iAFF加入Neck替代Concat; 收录: YOLOv7高阶自研专栏介绍: http://t.csdnimg.cn/tYI0c ✨✨✨前沿最新计算机顶会复现 🚀🚀🚀YOL…

6.2 声音编辑工具GoldWave5简介(7)

6.2.5其它常用功能 1&#xff0e;高低通 把录制的语音和背景音乐融合在一起时&#xff0c;可能会出现背景音乐音量过大&#xff0c;语音音量过小的现象&#xff0c;这时可以选择“低通”将背景音乐的音量降低一些。 (1)选择【效果】|【波波器】|【低通/高通】命令&#xff0…

wireshark使用教程

目录 windows平台安装Wireshark组件选择Additional TasksPacket CaptureUSB CaptureNpcap Installation Options Ubuntu上安装 Wireshark不使用 sudo 运行 Wireshark 使用GUI抓包使用命令行抓包确定抓取哪个网卡的报文抓取数据包停止抓包设置过滤条件 参考资料 Wireshark 是一款…

SpringBoot整合ES

1.引入依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-elasticsearch</artifactId> <version>2.6.3</version> </dependency> 2.config配置文件 Configu…

杨中科 EFCORE 第三部分 主键

主键 自增主键 1、EF Core支持多种主键生成策略:自动增长;Guid;Hi/Lo算法等。 2、自动增长。 优点:简单; 缺点: 数据库迁移以及分布式系统中&#xff08;多数据库合并&#xff0c;会有重复主键值&#xff09;比较麻烦;并发性能差&#xff08;大并发情况下&#xff0c;为了保证…

【LeetCode: 57. 插入区间+分类讨论+模拟】

&#x1f680; 算法题 &#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;…

NLP论文阅读记录 - WOS | ROUGE-SEM:使用ROUGE结合语义更好地评估摘要

文章目录 前言0、论文摘要一、Introduction1.1目标问题1.2相关的尝试1.3本文贡献 二.相关工作三.本文方法四 实验效果4.1数据集4.2 对比模型4.3实施细节4.4评估指标4.5 实验结果4.6 细粒度分析 五 总结 前言 ROUGE-SEM: Better evaluation of summarization using ROUGE combin…

ssh 远程登录协议

一、SSH 服务 1.1 SSH 基础 SSH&#xff08;Secure Shell&#xff09;是一种安全通道协议&#xff0c;主要用来实现字符界面的远程登录、远程 复制等功能。SSH 协议对通信双方的数据传输进行了加密处理&#xff0c;其中包括用户登录时输入的用户口令&#xff0c;SSH 为建立在应…

STM8入门|第一个工程

开发软件 不支持Keil&#xff0c;使用IAR for STM8&#xff0c;注意 IAR系列有很多种 STM8对应软件是 IAR for STM8 软件下载&#xff1a; 官网下载地址&#xff0c;官网版本下载比较麻烦&#xff0c;可以按教程网盘地址下载。 下载安装教程&#xff1a; https://www.cnblogs…

Compileflow工作流引擎使用讲解

文章目录 1 Compileflow1.1 简介1.2 特点1.3 Compileflow插件下载1.4 main方法调用1.4.1 pom.xml1.4.2 新建bpm文件1.4.3 各个节点绑定方法1.4.4 测试方法 1.5 bpm各个标签说明1.5.1 BPM根节点1.5.2 全局变量1.5.3 开始节点: start1.5.4 结束节点: end1.5.5 自动节点: autoTask…

B-TREE(B-树)

B-TREE B-tree 又叫平衡多路查找树。一棵 m 阶的 B-tree (m 叉树)的特性如下&#xff08;其中 ceil(x)是一个取上限的函数&#xff09;&#xff1a; 树中每个结点至多有 m 个孩子&#xff1b; 除根结点和叶子结点外&#xff0c;其它每个结点至少有有 ceil(m / 2)个孩子&#…

JVM实战(13)——JVM优化概述

作者简介&#xff1a;大家好&#xff0c;我是smart哥&#xff0c;前中兴通讯、美团架构师&#xff0c;现某互联网公司CTO 联系qq&#xff1a;184480602&#xff0c;加我进群&#xff0c;大家一起学习&#xff0c;一起进步&#xff0c;一起对抗互联网寒冬 学习必须往深处挖&…

每日一题——LeetCode1128.等价多米诺骨牌对的数量

先尝试暴力解法&#xff1a; var numEquivDominoPairs function(dominoes) {var count0for(let i0;i<dominoes.length-1;i){for(let ji1;j<dominoes.length;j){if((dominoes[i][0]dominoes[j][0] && dominoes[i][1]dominoes[j][1]) || (dominoes[i][0]dominoes…

Qt/QML编程学习之心得:小键盘keyboard(36)

小键盘对于qml应用是经常用到的,在qml里面,就如一个fileDialog也要自己画一样,小键盘keyboard也是要自己画的,对于相应的每个按键的clicked都要一一实现的。 这里有一个示例: 代码如下: import QtQuick 2.5 import QtQuick.Controls 1.4 import QtQuick.Window 2.0 im…

Redis面试篇

redis面试题主要内容 面试官在面试时主要会问以下这些方面的问题 下面是一些问题示例&#xff1a; redis-使用场景 缓存 缓存穿透 介绍 缓存穿透&#xff1a;查询一个不存在的数据&#xff0c;mysql查询不到数据也不会直接写入缓存&#xff0c;就会导致每次请求都会去查数…

2.1 常用计算机网络体系结构

2.1 常用计算机网络体系结构 2.1.1 OSI体系结构 1、为了使不同体系结构的计算机网络都能够互联&#xff0c;国际标准化组织于1977年成立了专门机构研究该问题&#xff0c;不久他们就提出了一个试图使各种计算机在世界范围内都能够互连成网的标准框架&#xff0c;也就是著名的…

Redis命令 - Lists命令组常用命令

先创建一个 key 叫做 mylist&#xff0c;mylist存一个list。 list数据类型底层是一个链表。先进后出&#xff0c;后进先出。 命令中的L&#xff08;Left&#xff09;、R&#xff08;Right&#xff09;代表链表的头部L&#xff08;下标0的位置&#xff09;和尾部R&#xff08;…