响应式编程-Project Reactor Mono 介绍

响应式编程-Project Reactor Mono 介绍

本文以Mono的角度来介绍Reactor编程,Flux的使用同理。

初体验

Web应用 controller 方法在Spring webmvc 和 Spring webFlux下Controller方法实现示例如下:

Spring webmvc:

    @GetMapping("/test1")

    @ResponseBody

    public String test1(){

        String result =  geterateTest();

        return result;

    }

Spring webFlux

    @GetMapping("/test2")

    @ResponseBody

    public Mono<String> test2(){

        Mono<String> result = Mono.fromSupplier(this:: geterateTest);

        return result;

    }

一个的响应是String对象, 另一个是Mono<String>对象。Reactor Mono表示一个产生0-1元素的异步序列,异步指Mono创建的时候并不会执行任何操作,当Mono发生订阅时才触发Mono序列的运行。非阻塞表示test2方法不会产生任何阻塞,即使genereateTest里面是一个阻塞的操作,因为此时不会执行实际的逻辑,所以不会发生任何阻塞。

NettyHttpServer.onStateChange方法中构建Mono并进行订阅。

HttpServerOperations ops = (HttpServerOperations)connection;

//Web Flux将按照Spring Web中的约定构建一个Publisher(执行过滤器、Controller方//法)

Publisher<Void> publisher = (Publisher)this.handler.apply(ops, ops);

Mono<Void> mono = Mono.deferContextual((ctx) -> {

      ops.currentContext = Context.of(ctx);

      return Mono.fromDirect(publisher);

});

……

//subscribe将触发前面Spring web中封装在Mono构建过程中的业务逻辑的真正执行。

//如果我们按照命令是编程去编写代码,业务逻辑在构建Mono的过程中就执行了。

mono.subscribe(ops.disposeSubscriber());

注: Spring web flux框架下也可以按照传统的命令式编程。

Mono的构建

Reactor编程可以分为 异步序列Mono/Flux的构建和和使用两部分。

Mono的基本构建

Mono类 提供了大量静态方法帮助构建Mono。

  • just(T):返回T类型对象的Mono序列
  • fromFuture(future):Mono序列的元素对象由future产生,订阅时Future产生T并推送至订阅者。其他from方法类似。
  • empty():返回一个订阅时直接完成的异步序列
  • error():返回一个订阅时直接推送错误信号的序列

其他方法详见Mono类API:

如:Mono<String> mono = Mono.just("TEST");

Mono装配

假设我们按照上面示例,将整个程序都以响应式编程的模式进行开发,方法都返回一个异步序列Mono/Flux。当调用者调用某一个方法时,面对返回的Mono/Flux对象有两种选择:1. 订阅(触发执行), 2.装配(Assembly):继续将获取到的异步序列封装到一个新的异步序列中,继续返回给外部调用者。如:Spring Web Flux 则是将Spring web 定义的包括WebFilter、Controller等逻辑组装成一个复合的Mono,最终进行订阅。

图1 Mono装配示例

OptimizableOperator 接口

       OptimizableOperator <IN, OUT>接口提供了指向下一个OptimizableOperator的指针,并且提供了从IN型订阅者获取OUT订阅者的方法,提供了一个Mono串行的组装方法。

图2 OptimizableOperator接口串行组装示意图

要实现一个串行化的Mono组装类通常实现抽象类InternalMonoOperator<I, O>,构造函数传入一个Mono<I>,得到一个新的O型序列。实现subscribeOrReturn方法将O型订阅转化为原I型订阅者,新的I型订阅者实现了基于O性订阅者之上的强化操作。Mono提供了大量InternalMonoOperator<I,O>的实现类。下面对MonoFilter进行分析,解释了如果创建基于InternalMonoOperator实现的装配类和使用方法。

MonoFilter

将原Mono上增加一个过滤Predicate函数,当原Mono产生元素时,只有Predicate测试通过的元素才会传递给最终的订阅者,测试失败将进行过滤,Mono元素直接完成。

final class MonoFilter<T> extends InternalMonoOperator<T, T> {

         final Predicate<? super T> predicate;

         //构造函数必须包含源Mono,和其他附加增加元素,这里是一个Predicate函数

         MonoFilter(Mono<? extends T> source, Predicate<? super T> predicate) {

                  super(source);

                  this.predicate = Objects.requireNonNull(predicate, "predicate");

         }

         /**

         * 实现subscribeOrReturn,接收新Mono类型的订阅者,返回原Mono类型的订阅者。

         * 新的订阅者实现订阅时装配的目的,这里只有通过Predicate函数测试的元素,才会

         * 调用actual.onNext(T)方法推送给最终的订阅者

         **/

         @Override

         @SuppressWarnings("unchecked")

         public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {

                  if (actual instanceof ConditionalSubscriber) {

                          return new FluxFilter.FilterConditionalSubscriber<>((ConditionalSubscriber<? super T>) actual, predicate);

                  }

                  return new FluxFilter.FilterSubscriber<>(actual, predicate);

         }

    ......

}

Mono内置了大量的InternalMonoOperator实现类,如MonoFilter,但Reactor框架并不对外暴露这些类,(这些实现类都是包内可见的),而是通过Mono方法的形式去方便获取各个可实现类的对象,并且统一以Mono类型的对外暴露。抽象统一的Mono使用范式比起暴露各种各样的实现细节显得简洁清晰。

我们可以使用Mono内置的InternalMonoOperator实现类,也可以实现自己的InternalMonoOperator类,但应和Reactor框架保持统一的用法, 在Mono的使用上统一以Mono类型和协议进行操作,不对外暴露具体的实现细节。

Mono 提供的装配方法

       Reactor框架并不暴露具体的装配类细节,而是提供了大量静态或实例方法来对Mono进行装配,返回装配后的新Mono。如上节所述的MonoFilter使用方法如下:

Mono.just(2).filter( (v -> v % 2 != 0)).subscribe(i -> System.out.println(i),

                error -> System.err.println("Error: " + error),

                ()-> System.out.println("complete"));

Mono filter方法返回了一个可以对原序列元素进行检测的增强Mono,上述例子因Mono.just(2) 中的元素值2 无法通过(v -> v % 2 != 0)的测试,将被过滤掉,无法传给最终的订阅者,而只能接受到原序列的结束信号, 因此只会打印“complete“。

Filter方法显示实际是返回的MonoFilter对象。

public final Mono<T> filter(final Predicate<? super T> tester) {

         ……

         return onAssembly(new MonoFilter<>(this, tester));

}

其他Mono装配方法:

  • Mono<Tuple2<T1, T2>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2)

:将一个T1类型元素的Mono和一个T2类型元素的Mono中的元素组合成一个Tuple2<T1,T2>元素的Mono. Mono还提供了zip的多种版本,满足各种情况的Mono组合模式。

  • public final Mono<T> timeout(Duration timeout): 当原序列产生一个T类型元素后,如果没有在指定的时间内完成,则将触发一个错误。如果在限期内完成则没有任何影响,该实现使用了MonoTimeout<T, U, V> extends InternalMonoOperator<T, T>。
  • doOnXXXX系列方法,如doOnCancel,  doOnNext, doOnError等, 返回在特定事件上加入行为的增强Mono。

更多Mono的装配方法详见Mono API。

Mono的使用

Mono的使用其实只有一种就是对Mono进行订阅, 但是Mono类也提供了其他传统的接口来进行Mono的使用。

Mono的订阅

订阅Mono很简单,调用Mono对象的subscribe方法,传入一个CoreSubscriber的实现对象即可。

Mono.subscribe.源码中展示了对Mono装配后的复合Mono进行订阅的处理逻辑。

public final void subscribe(Subscriber<? super T> actual) {

    //获取最后一个装配的Mono corePublisher

         CorePublisher publisher = Operators.onLastAssembly(this);

         CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);

         ......

             //如果最后一个装配的publisher 实现了OptmizableOperator接口,一路组装

             //增强的Subscriber,按照循序后去下一个OptmizableOperator

                  if (publisher instanceof OptimizableOperator) {

                          OptimizableOperator operator = (OptimizableOperator) publisher;

                          while (true) {

                                   subscriber = operator.subscribeOrReturn(subscriber);

                                   if (subscriber == null) {

                                            return;

                                   }

                                   OptimizableOperator newSource = operator.nextOptimizableSource();

                                   if (newSource == null) {

                                            publisher = operator.source();

                                            break;

                                   }

                                   operator = newSource;

                          }

                  }

             //直到最底层的CorePublisher,使用最终转换所得的subscriber进行订阅,

             //原始序列产生的序号,将在一些列增强subscriber的增强下,或丢弃、或加工后传给

             //实际的订阅者

                  publisher.subscribe(subscriber);

}

Mono的简化使用

       Mono 提供了一些方法简化Mono的订阅操作,如block() 阻塞当前线程知道Mono序列返回元素或完成/异常信号

PublishOn和SubscribeOn

       publishOn 和 SubscribeOn 传入Scheduler对象,将Mono的行为交由Scheduler的现成执行。其中publishOn调用之后的序列行为在新的执行线程执行,而SubscribeOn则是整个序列的执行都在新的现成中执行。

final Flux<String> flux = Flux

    .range(1, 2)

    .map(i -> 10 + i) 

    .publishOn(s) 

.map(i -> "value " + i);

flux.subscribe(System.out::println)

final Flux<String> flux = Flux

    .range(1, 2)

    .map(i -> 10 + i) 

    .subscribeOn(s) 

    .map(i -> "value " + i);

flux.subscribe(System.out::println)

总结

       本文对Reactor的Mono编程进行了初步的介绍,体现了响应式编程的核心在于异步序列的构建(Mono/Flux)和订阅使用。 其中构建时对Mono/Flux的装配(Assembly)是整个编程模型的核心。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/120224.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

麒麟信安获批牵头成立国家关键领域信创行业产教融合共同体

日前&#xff0c;由麒麟信安、长沙理工大学、长沙职业技术学院联合牵头成立的国家关键领域信创行业产教融合共同体&#xff08;以下简称&#xff1a;共同体&#xff09;已获湖南省教育厅批准&#xff0c;并推荐至教育部。 目前共同体已吸引10余家联盟单位及全国20余家企业、高…

C++(Qt)软件调试---下载和安装最新版Windbg(16)

C(Qt)软件调试—下载和安装最新版Windbg&#xff08;16&#xff09; 文章目录 C(Qt)软件调试---下载和安装最新版Windbg&#xff08;16&#xff09;1、前言2、在线安装1.1 安装方法一1.2 安装方法二 3、离线安装 1、前言 Windbg是微软开发的一款强大的调试工具&#xff0c;它主…

公开IP属地信息如何保护用户的隐私?

公开IP属地信息通常涉及与用户或组织的隐私有关&#xff0c;因此在公开此类信息时需要非常小心&#xff0c;以避免侵犯他人的隐私权。以下是触碰底线的几种情况以及如何保护网络安全和用户隐私&#xff1a; 个人隐私保护&#xff1a; 公开IP属地信息可能泄露用户的物理位置&…

【Excel】如何画不同时序交叉的百分比堆积柱状图

这里写自定义目录标题 1 将两表交叉合并为一个表1.1 步骤一&#xff1a;在两独立表的工作天数和工资列下面按1-n顺次标号。1.2 步骤二&#xff1a;选中两表需要合并的部分&#xff0c;调出自定义排序1.3 步骤三&#xff1a;选项 ——> 按行排序 &#xff08;选完后点确定&am…

大数据之LibrA数据库系统告警处理(ALM-12030 无合法license存在)

告警解释 系统在安装集群后和每天零点检查当前系统中是否存在合法的license文件&#xff0c;如果没有则产生该告警。 导入合法license文件时&#xff0c;告警恢复。 说明&#xff1a; 如果当前集群使用节点数小于等于10节点&#xff08;不包含管理节点&#xff09;&#xf…

积木报表操作使用记录

积木报表&#xff1a;JimuReport报表,像搭建积木一样在线设计报表,类似于excel操作风格,通过拖拽完成报表设计!功能涵盖打印设计、报表设计、图形设计、大屏设计等 集成使用方式&#xff1a; 引入依赖&#xff1a;版本可以去maven下载最新版本 org.jeecgframework.jimureport …

网络带宽基础知识简单介绍

网络带宽基础知识简单介绍 前言一、网络带宽是什么&#xff1f;二、影响网络带宽的因素三、网络带宽的单位总结 前言 最近一些需求涉及到了网络带宽&#xff0c;整理后有了本文 一、网络带宽是什么&#xff1f; 网络带宽是指在单位时间内&#xff08;一般指的是1秒钟&#xf…

【vue实战项目】通用管理系统:登录页

目录 1.前言 2.表单 3.容器 4.路由 5.校验 6.请求后端接口 1.前言 本文是博主vue实战项目系列文章的第一篇&#xff0c;本系列将清晰的从搭建环境开始一步步开发一个vue的通用管理系统&#xff0c;项目规模不大&#xff0c;较为小巧&#xff0c;但是覆盖了目前常用的前端…

Intel oneAPI笔记(3)--jupyter官方文档(SYCL Program Structure)学习笔记

前言 本文是对jupyterlab中oneAPI_Essentials/02_SYCL_Program_Structure文档的学习记录&#xff0c;包含对Device Selector、Data Parallel Kernel、Host Accessor、Buffer Destruction、的介绍&#xff0c;最后还有一个小关于向量&#xff08;Vector&#xff09;加法的实例 …

使用Python爬虫被封ip的解决方案

在使用 Python 程序进行网络爬虫开发时&#xff0c;可能会因为下面原因导致被封IP或封禁爬虫程序&#xff1a; 1、频繁访问网站 爬虫程序可能会在很短的时间内访问网站很多次&#xff0c;从而对目标网站造成较大的负担和压力&#xff0c;这种行为容易引起目标网站的注意并被封…

DDD技术方案落地实践

1. 引言 从接触领域驱动设计的初学阶段&#xff0c;到实现一个旧系统改造到DDD模型&#xff0c;再到按DDD规范落地的3个的项目。对于领域驱动模型设计研发&#xff0c;从开始的各种疑惑到吸收各种先进的理念&#xff0c;目前在技术实施这一块已经基本比较成熟。在既往经验中总结…

使用JavaScript编写的爬虫程序

这是一个使用JavaScript编写的爬虫程序&#xff0c;它使用了Elasticsearch和Nginx来收集和存储数据。在这个程序中&#xff0c;我们首先设置了代理信息&#xff0c;然后使用JavaScript编写了一个爬虫程序来收集数据。以下是每行代码和步骤的详细解释&#xff1a; // 定义代理信…

华为fusionInsigtht集群es连接工具

华为fusionInsight为用户提供海量数据的管理及分析功能&#xff0c;快速从结构化和非结构化的海量数据中挖掘您所需要的价值数据。开源组件结构复杂&#xff0c;安装、配置、管理过程费时费力&#xff0c;使用华为FusionInsight Manager将为您提供企业级的集群的统一管理平台,在…

漏刻有时百度地图API实战开发(1)华为手机无法使用addEventListener click 的兼容解决方案

现象 漏刻有时项目开发中的调用了百度地图API&#xff0c;在PC端、IOS和安卓机型测试都没有问题。但是使用华为手机部分型号时&#xff0c;前端在监听点击事件的时候是使用 map.addEventListener(click,function(){...})&#xff0c;无法触发。或 原理 通过监听touchstart和…

Android 常用 UI 组件

目录 ​编辑 1. View 和ViewGroup 2. Android UI 开发概述 2.1 界面布局开发 2.2 控件开发 2.3 AdapterView 与 Adapter 开发 2.4 UI 组件开发 2.5 自定义 View、图形图像和动画 1. View 和ViewGroup Android中所有的UI元素都是使用View和ViewGroup对象建立的,…

java split字符串作业

建立一个字符串操作类&#xff08;StringDemo&#xff09;&#xff0c;在main函数中做一下操作&#xff1a; 1、定义字符串变量String s1”I am a good student”; String s2”I am a good worker”; String s3”3,6,12,20”; 2、用compareTo方法比较s1和s2的大小&#xff0c;并…

Vue - Syntax Error: TypeError: this.getOptions is not a function 项目运行时报错,详细解决方案

报错问题 关于此问题网上的教程都无法解决,如果您的报错与本文相似,本文即可 100% 完美解决。 在 vue2.js 项目中,执行 npm run serve 运行时出现如下报错信息, Syntax Error: TypeError: this.getOptions is not a function 解决方案 按照以下步骤,即可完美解决。 这个错…

CRM中的销售机会管理是什么?三个步骤帮你创建销售渠道

企业销售业务中&#xff0c;有个名词叫做“机会管理”&#xff0c;有效的机会管理可以帮助销售人员准确地抓住潜在客户群体&#xff0c;并将其转化为真正的客户、持续带来收入。CRM客户管理系统也是销售机会管理的一个重要工具&#xff0c;帮助销售人员与正确的人建立起关系&am…

python 之 集合的相关知识

文章目录 1. 创建集合使用花括号 {}使用 set() 函数 2. 集合的特点3. 集合操作添加元素删除元素 4. 集合运算5. 不可变集合总结 在 Python 中&#xff0c;集合&#xff08;Set&#xff09;是一种无序且不重复的数据集合。它是由一组唯一元素组成的。下面是关于集合的一些基本知…

(后续补充)vue+express、gitee pm2部署轻量服务器

首先 防火墙全部关闭算了 首先 防火墙全部关闭算了 首先 防火墙全部关闭算了 首先 防火墙全部关闭算了 首先 防火墙全部关闭算了 首先 防火墙全部关闭算了 关闭防火墙 systemctl stop firewalld 重新载入防火墙使设置生效 firewall-cmd --reload 后端的 pm2.config.cjs …