Java学习笔记-day05-响应式编程初探-自定义实现Reactive Streams规范

最近在学响应式编程,这里先记录下,响应式编程的一些基础内容

1.名词解释

Reactive Streams、Reactor、WebFlux以及响应式编程之间存在密切的关系,它们共同构成了在Java生态系统中处理异步和响应式编程的一系列工具和框架。

  1. Reactive Streams:

    • Reactive Streams 是一个规范,定义了一组接口和协议,用于处理异步数据流的背压。它包括发布者(Publisher)、订阅者(Subscriber)、订阅(Subscription)和处理器(Processor)等接口。
    • Reactive Streams 规范的目标是提供一种标准的方式来处理异步数据流,解决背压问题。Java标准库从Java 9开始提供了 java.util.concurrent.Flow 类,实现了Reactive Streams规范。
  2. Reactor:

    • Reactor 是一个基于Reactive Streams规范的响应式编程框架。它提供了一组用于构建异步、事件驱动、响应式应用程序的工具和库。Reactor 的核心是 Flux(表示一个包含零到多个元素的异步序列)和 Mono(表示一个包含零或一个元素的异步序列)。
    • Reactor 通过提供响应式的操作符,如mapfilterflatMap等,使得开发者能够方便地进行数据流的转换和处理。
  3. WebFlux:

    • WebFlux 是Spring Framework 5引入的响应式编程支持。它构建在 Reactor 之上,提供了一套用于构建异步、非阻塞、响应式的Web应用程序的API。WebFlux支持使用Reactive Streams处理HTTP请求和响应。
    • Spring WebFlux 可以用于构建反应式的RESTful服务,支持使用注解的方式定义路由和处理器函数。
  4. 响应式编程:

    • 响应式编程是一种编程范式,强调数据流和变化的传播。在这个范式中,数据源产生数据并通知观察者,观察者相应地处理这些数据。这种方式更容易处理异步操作和事件。
    • 在Java中,响应式编程通常涉及到使用类似于Reactor或RxJava的库,这些库提供了响应式的操作符和工具。

综上所述,Reactive Streams 提供了规范,Reactor 是一个实现了该规范的响应式编程框架,而WebFlux是Spring对于响应式编程的支持。它们共同致力于构建异步、非阻塞、响应式的应用程序。响应式编程则是一种更广义的编程范式,与Reactive Streams和Reactor等具体实现密切相关。

2.Reactive Streams 规范

2.1.Reactive Streams规范定义

java.util.concurrent.Flow 类中,定义了Reactive Streams规范
在这里插入图片描述

  • Publisher(发布者):负责生成数据流,并向订阅者发送数据。
  • Subscriber(订阅者):表示数据流的消费者,它订阅一个或多个发布者,并接收数据。
  • Subscription(订阅):表示订阅关系的接口,用于控制数据流的请求和取消。
  • Processor(处理器):充当发布者和订阅者的中间组件,可以对数据进行转换和处理。

2.2.API方法

1. Publisher(发布者):
interface Publisher<T> {
    void subscribe(Subscriber<? super T> subscriber);
}
  • subscribe(Subscriber<? super T> subscriber) 用于订阅数据流。当订阅者调用这个方法时,发布者将建立与订阅者的订阅关系,并开始推送数据。
2. Subscriber(订阅者):
interface Subscriber<T> {
    void onSubscribe(Subscription subscription);
    void onNext(T item);
    void onError(Throwable throwable);
    void onComplete();
}
  • onSubscribe(Subscription subscription) 在订阅关系建立时调用。通过这个方法,订阅者可以持有 Subscription 对象,以便后续请求数据和取消订阅。

  • onNext(T item) 在接收到新元素时调用。订阅者通过这个方法处理收到的数据。

  • onError(Throwable throwable) 在数据流中出现错误时调用。订阅者通过这个方法处理错误情况。

  • onComplete() 在数据流完成时调用。通知订阅者数据流结束,不再有新的元素。

3. Subscription(订阅):
interface Subscription {
    void request(long n);
    void cancel();
}
  • request(long n) 用于请求订阅者处理指定数量的元素。订阅者通过这个方法告知发布者它可以处理多少个元素。

  • cancel() 用于取消订阅关系。当订阅者不再需要接收数据时,调用此方法取消订阅。

4. Processor(处理器):
interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

Processor 接口是 SubscriberPublisher 的组合,表示一个中间处理组件,可以同时充当订阅者和发布者的角色。

  • Subscriber 部分的方法:onSubscribe(Subscription subscription), onNext(T item), onError(Throwable throwable), onComplete()

  • Publisher 部分的方法:subscribe(Subscriber<? super R> subscriber)。表示 Processor 可以被其他订阅者订阅。

5.泛型T

泛型T即为数据流

这些方法共同构成 Reactive Streams 协议,定义了发布者和订阅者之间的协作方式,以及订阅者如何处理数据流。在实际的使用中,这些方法的实现通常需要考虑异步处理、背压机制等方面,以确保响应式编程的目标得以实现。

2.3.工作流程

在 Reactive Streams 中,PublisherSubscriberSubscriptionProcessor 之间的协作流程如下:

有时间再补流程图
在这里插入图片描述

  1. Publisher(发布者):

    • Publisher 是异步产生数据流的组件,它通过 subscribe 方法允许订阅者订阅。subscribe 方法会接收一个 Subscriber 对象作为参数。
    • Publisher 有新数据准备好时,通过调用订阅者的 onNext 方法将数据推送给订阅者。
    interface Publisher<T> {
        void subscribe(Subscriber<? super T> subscriber);
    }
    
  2. Subscriber(订阅者):

    • Subscriber 是数据流的消费者,通过实现 Subscriber 接口来接收来自发布者的数据。订阅者通过调用 subscription.request(n) 请求一定数量的数据,处理数据时通过 onNext 方法接收元素。
    • 当订阅者无法处理更多的元素时,可以调用 subscription.cancel() 来取消订阅。
    interface Subscriber<T> {
        void onSubscribe(Subscription subscription);
        void onNext(T item);
        void onError(Throwable throwable);
        void onComplete();
    }
    
  3. Subscription(订阅):

    • Subscription 表示订阅关系,它在 onSubscribe 方法中被传递给订阅者。通过 Subscription,订阅者可以请求数据和取消订阅。
    • 订阅者通过 request(long n) 方法请求处理 n 个元素,通过 cancel() 方法取消订阅。
    interface Subscription {
        void request(long n);
        void cancel();
    }
    
  4. Processor(处理器):

    • Processor 是一个同时实现了 PublisherSubscriber 接口的中间组件,可以作为数据流的处理器,对数据进行转换和处理。
    • Processor 既能接收数据,也能发布数据。它将 onNextonErroronComplete 方法委托给下游的订阅者,并将数据推送给上游的发布者。
    interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    }
    

这些接口一起构成了 Reactive Streams 的基本协议。发布者产生数据,订阅者订阅数据流并通过 onNext 方法接收元素,订阅者通过 request 方法请求处理一定数量的元素,同时可以通过 cancel 方法取消订阅。Processor 则可以用于在订阅者和发布者之间进行数据转换和处理。在 Reactive Streams 的实现中,这些接口的方法调用是异步进行的,以支持非阻塞的数据流处理。

3.自定义实现Reactive Streams规范

自己实现了一个,参考了SubmissionPublisher

  • 同步实现的
  • 功能不完善
  • 有bug
class MyPublisher implements Flow.Publisher<String>{
    MySubscription<String> subscription;
    public int request ;
    public void publish(String item){
        subscription.items.add(item);
        while (true) {
            if (request > 0) {
                for (int i = 0; i < request; i++) {
                    if (!subscription.items.isEmpty()) {
                        try {
                            Object o = subscription.items.get(subscription.items.size() - 1);
                            subscription.subscriber.onNext(o.toString());
                            subscription.items.remove(o);
                        }catch (Exception e){
                            subscription.subscriber.onError(e);
                            return;
                        }
                    }

                }
            }
            if (subscription.items.isEmpty()) {
                break;
            }
        }
    }

    @Override
    public void subscribe(Flow.Subscriber<? super String> subscriber) {
        System.out.println("第一步:绑定订阅者" );
        MySubscription<String> subscription = new MySubscription<>(subscriber,this);
        this.subscription = subscription;
        subscriber.onSubscribe(subscription);
    }

}

class MySubscriber implements Flow.Subscriber<String>{

    private Flow.Subscription subscription;
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println("第二步:接收Subscription" );
        this.subscription = subscription;
        // 请求订阅者处理的元素数量
        subscription.request(1);
    }

    @Override
    public void onNext(String item) {
        System.out.println("第四步:推送数据" );
        System.out.println("MySubscriber 消费了item = " + item);
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("出异常了 = " + throwable);
    }

    @Override
    public void onComplete() {

    }

}

class MySubscription<T> implements Flow.Subscription{

    final Flow.Subscriber<? super T> subscriber;
    final MyPublisher publisher;
    List items = new ArrayList();
    public MySubscription(Flow.Subscriber<? super T> subscriber, MyPublisher publisher) {
        this.subscriber = subscriber;
        this.publisher = publisher;
    }

    @Override
    public void request(long n) {
        this.publisher.request++;
        System.out.println("第三步:拉取请求" );
    }

    @Override
    public void cancel() {

    }
}
public class FlowDemo {

    public static void main(String[] args) {
        MyPublisher myPublisher = new MyPublisher();
        MySubscriber mySubscriber = new MySubscriber();
        myPublisher.subscribe(mySubscriber);
        myPublisher.publish("111");
        myPublisher.publish("222");
        myPublisher.publish(null);


    }
}

4.Jdk实现Reactive Streams使用示例

class SimplePublisher implements Flow.Publisher<Integer> {

    private final SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

    public void publishItems() {
        for (int i = 1; i <= 5; i++) {
            publisher.submit(i);
        }

        // 发布者完成发布
        publisher.close();
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        publisher.subscribe(subscriber);
    }
}

class SimpleSubscriber implements Flow.Subscriber<Integer> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        // 请求订阅者处理的元素数量
        subscription.request(1);
    }

    @Override
    public void onNext(Integer item) {
        System.out.println("Received item: " + item);
        // 处理完一个元素后请求下一个
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        System.err.println("Error occurred: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Processing completed.");
    }
}

public class ReactiveStreamsExample {

    public static void main(String[] args) throws InterruptedException {
        // 创建发布者和订阅者
        SimplePublisher simplePublisher = new SimplePublisher();
        SimpleSubscriber simpleSubscriber = new SimpleSubscriber();

        // 订阅者订阅发布者
        simplePublisher.subscribe(simpleSubscriber);

        // 发布者发布数据
        simplePublisher.publishItems();
        // 睡一觉,确保数据处理完成
        Thread.sleep(3000);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/304972.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

12.8-1.8

2023.12.8 redis容器 docker run -p 6379:6379 --name redis -v /mydata/redis/data:/data -v /mydata/redis/conf/redis.conf:/etc/redis/redis.conf -d redis redis-server /etc/redis/redis.conf redis.conf不存在&#xff0c;需先在宿主机创建该目录下文件&#xff0c…

知识点整理[(GraphGeo)RELATED WORK]

2 RELATED WORK 2.1 IP Geolocation 问题一:IP定位预测方法之一:Data mining-based methods 回答: 依赖于在公开的资源中挖掘位置线索来对目标IP(target IP)进行地理定位。其中一些数据分析了来自与IP相关的数据库,如WHOIS数据库和DNS的数据。 (1)例如,Moore等…

Transformer架构的局限已凸显,被取代还有多久?

江山代有才人出&#xff0c;各领风骚数百年。这句话无论是放在古往今来的人类身上&#xff0c;还是放在当今人工智能领域的大模型之上&#xff0c;都是最贴切不过的。无论是一个时代的伟人&#xff0c;还是统治一个领域的技术&#xff0c;最终都会有新的挑战者将其替代。Transf…

2024-01-09 Android.mk 根据c文件名插入特定的宏定义,我这里用于定义log LOG_TAG 标签

一、在Android的构建系统中&#xff0c;使用Android.mk构建脚本可以根据特定需求来定义宏。如果你想根据C文件的名称来插入特定的宏定义&#xff0c;可以使用条件语句检查文件名&#xff0c;并相应地设置宏。 在Android的构建系统中&#xff0c;使用Android.mk构建脚本可以根据…

解决uni-app小程序获取路由及路由参数

代码: this.id = this.$route.query.id;错误信息: 解决方案: // 获取query对象// #ifdef H5this.id = this.$route

《MLCC电容失效模式揭秘:机械、热、电裂纹分析及预防》

Q&#xff1a;MLCC电容是什么结构的呢&#xff1f; A&#xff1a;多层陶瓷电容器是由印好电极&#xff08;内电极&#xff09;的陶瓷介质膜片以错位的方式叠合起来&#xff0c;经过一次性高温烧结形成陶瓷芯片&#xff0c;再在芯片的两端封上金属层&#xff08;外电极&#xf…

IPv6路由协议---IPv6动态路由(OSPFv3-3)

OSPFv3使用Link-local地址 OSPFv3是运行在IPv6上的路由协议,同样使用链路本地地址来维持邻居,同步LSA数据库。除Vlink外的所有OSPFv3接口都使用链路本地地址作为源地址及下一跳来发送OSPFv3报文,带来的好处: 不需要配置IPv6全局地址,就可以得到OSPFv3拓扑,实现拓扑与地址…

HarmonyOS应用开发者基础认证考试

判断题 1.Ability是系统调度应用的最小单元,是能够完成一个独立功能的组件。一个应用可以包含一个或多个Ability。 正确(True) 2.所有使用Component修饰的自定义组件都支持onPageShow,onBackPress和onPageHide生命周期函数。 错误(False) 3.每调用一次router.pushUrl()方法,…

SAM-Track online / offline配置

segment anything model&#xff08;SAM&#xff09;是Meta于2023年4月5日发布的分割基础模型。SAM 允许分割任何对象而无需微调。 可以在这里尝试SAM模型的效果。 分割效果这么好&#xff0c;都忍不住想用SAM来做场景的语义分割&#xff0c;realtime与否先放在一边&#xff0c…

oracle19c容器数据库rman备份特性-----性能优化(三)

目录 冗余备份片 1.备份的时候指定 2.rman配置中设定 归档备份&#xff08;将备份集保留&#xff09; 二级备份&#xff08;将备份文件保留&#xff09; 1.备份闪回恢复区的恢复文件 2.备份所有恢复文件 recovery catalog database 1.創建recovery catalog 2.创建VPC…

Vscode设置git账户密码(不需要每次都输入)

在Vscode提交项目代码或者拉取代码的时候&#xff0c;如果每次都需要输入git的账户密码&#xff0c;那么就在终端输入&#xff1a; git config --global credential.helper store 命令 然后执行git pull 提示输入用户密码后&#xff0c;就会缓存&#xff1b; ※注&#xff1a;如…

今日实践 — 附加数据库/重定向失败如何解决?

WMS数据库与重定向 前言正文如何建立数据库连接&#xff1f;第一步&#xff1a;打开SSMS&#xff0c;右击数据库&#xff0c;点击附加第二步&#xff1a;点击添加第三步&#xff1a;找到自己的数据库文件&#xff0c;点击确定按钮第四步&#xff1a;若有多个数据库&#xff0c;…

BEV+Transformer感知架构共识下,传感器「火药味」再升级

高阶智能驾驶战火愈演愈烈&#xff0c;正带动感知方案卷入新一轮军备竞赛。 根据高工智能汽车研究院最新发布数据显示&#xff0c;2023年1-9月&#xff0c;中国市场&#xff08;不含进出口&#xff09;乘用车前装标配&#xff08;软硬件&#xff09;NOA交付新车37.73万辆&…

Qt QLineEdit文本框控件

文章目录 1 属性和方法1.1 占位字符串1.2 对齐方式1.3 回显模式1.4 读写控制1.5 格式控制1.6 信号和槽 2 实例2. 布局2.2 代码实现 QLineEdit 是Qt 中的文本框&#xff0c;准确地说是单行文本框&#xff0c;通常用于接受用户的输入。 比如用户输入用户名、密码等&#xff0c;都…

中国京津冀国际光伏展

中国京津冀国际光伏展是一个专门展示光伏技术和产品的展览会。该展览会旨在促进光伏产业的发展和推广&#xff0c;推动太阳能光伏的应用和利用。 这个展览会通常会邀请国内外的光伏企业和专家参加&#xff0c;展示最新的光伏技术和产品&#xff0c;包括太阳能电池板、光伏组件、…

Spring MVC MVC介绍和入门案例

1.SpringMVC概述 1.1.MVC介绍 MVC是一种设计模式&#xff0c;将软件按照模型、视图、控制器来划分&#xff1a; M&#xff1a;Model&#xff0c;模型层&#xff0c;指工程中的JavaBean&#xff0c;作用是处理数据 JavaBean分为两类&#xff1a; 一类称为数据承载Bean&#xf…

解读 Sobit v2:铭文资产跨链更注重安全、易用性

铭文市场的发展正在从早期的“无序”进入到“有序”阶段&#xff0c;我们看到从 12 月份以来&#xff0c;比特币生态内的多个应用纷纷宣布获得融资。这表明&#xff0c;目前仍旧有大量的资金有意向铭文领域&#xff0c;同样铭文赛道新一轮浪潮或许正在酝酿。 另一方面&#xff…

Linux ps命令

一. 说明 用于显示系统中当前运行的进程信息。 提供了查看进程的不同视图和选项&#xff0c;允许用户了解系统上正在运行的进程的状态、资源使用情况等。 -e&#xff1a;显示所有进程&#xff0c;而不仅仅是与当前终端关联的进程。-f&#xff1a;以完整的格式显示进程信息&am…

混淆技术概论

混淆技术概论 引言 在逆向工程领域&#xff0c;混淆技术是一种非常重要的技术手段&#xff0c;通过打破人们的思维惯性&#xff0c;使得逆向分析变得更加困难。本文将会介绍混淆技术的概念、分类及其应用&#xff0c;以及如何使用IPA Guard进行iOS IPA重签名。 混淆技术概述…

python(17)--文件的输入/输出

前言 在Python中&#xff0c;文件文本操作是非常重要的&#xff0c;主要有以下几个原因&#xff1a; 数据持久性&#xff1a;当你需要长期存储数据&#xff0c;如用户的个人信息、交易记录或数据库元数据等&#xff0c;将数据保存在文件中是一种常见的方法。文件系统提供了持…