响应式编程详解,带你熟悉Reactor响应式编程

文章目录

  • 一、什么是响应式编程
    • 1、Java的流和响应式流
    • 2、Java中响应式的使用
    • 3、Reactor中响应式流的基本接口
    • 4、Reactor中响应式接口的基本使用
  • 二、初始Reactor
    • 1、Flux和Mono的基本介绍
    • 2、引入Reactor依赖
    • 3、响应式类型的创建
    • 4、响应式类型的组合
      • (1)使用mergeWith合并响应式流
      • (2)使用zip压缩合并响应式流
      • (3)使用zip压缩合并为自定义对象的响应式流
      • (4)选择第⼀个反应式类型进⾏发布
    • 5、转换和过滤反应式流
      • (1)skip操作跳过指定数⽬的消息
      • (2)skip()操作的另⼀种形式
      • (3)take操作只发布第⼀批指定数量的数据项
      • (4)take操作的另一种形式
      • (5)filter操作自定义过滤条件
      • (6)distinct操作去重
      • (7)map操作映射新元素
      • (8)flatMap将流转成新的流
      • (9)buffer操作现将数据流拆分为小块
      • (10)collectList操作也可以将所有数据收集到一个List
      • (11)collectMap 操作产生⼀个发布Map的Mono
    • 6、在反应式类型上执行逻辑操作
      • (1)⽤all()⽅法来确保Flux中的所有消息都满⾜某些条件
      • (2)⽤any()⽅法来确保Flux中⾄少有⼀个消息满⾜某些条件
    • 7、在反应式类型上使用Subscriber订阅
      • (1)使用Subscriber消费消息
      • (2)使用Flux的doOnNext处理数据
    • 8、使用then来处理完成数据返回
  • 写在后面

一、什么是响应式编程

响应式编程是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。

在开发应⽤程序代码时,我们可以编写两种⻛格的代码,即命令式和响应式。

命令式(Imperative)的代码:它由⼀组任务组成,每次只运⾏⼀项任务,每项任务⼜都依赖于前⾯的任务。数据会按批次进⾏处理,在前⼀项任务还没有完成对当前数据批次的处理时,不能将这些数据递交给下⼀项处理任务。

响应式(Reactive)的代码:它定义了⼀组⽤来处理数据的任务,但是这些任务可以并⾏地执⾏。每项任务处理数据的⼀部分⼦集,并将结果交给处理流程中的下⼀项任务,同时继续处理数据的另⼀部分⼦集。

Reactor 是⼀个响应式编程库,同时也是Spring家族的⼀部分。它是Spring 5反应式编程功能的基础。

1、Java的流和响应式流

Java的Stream流通常都是同步的,并且只能处理有限的数据集。从本质上来说,它们只是使⽤函数来对集合进⾏迭代的⼀种⽅式。

响应式流⽀持异步处理任意⼤⼩的数据集,同样也包括⽆限数据集。只要数据就绪,它们就能实时地处理数据,并且能够通过回压来避免压垮数据的消费者。

2、Java中响应式的使用

JDK1.8时,是基于Observer/Observable接口而实现的观察者模式:

ObserverDemo observer = new ObserverDemo();
// 添加观察者
observer.addObserver(new Observer() {
    @Override
    public void update(Observable o, Object arg) {
        System.out.println("发生了变化");
    }
});
observer.addObserver(new Observer() {
    @Override
    public void update(Observable o, Object arg) {
        System.out.println("收到了通知");
    }
});
observer.setChanged(); // 数据变化
observer.notifyObservers(); // 通知

JDK9及以后,Observer/Observable接口就被弃用了,取而代之的是Flow类:

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
 
public class FlowDemo {
 
    public static void main(String[] args) throws Exception {
        // 1. 定义发布者, 发布的数据类型是 Integer
        // 直接使用jdk自带的SubmissionPublisher, 它实现了 Publisher 接口
        SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();
 
        // 2. 定义订阅者
        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
 
            private Subscription subscription;
 
            @Override
            public void onSubscribe(Subscription subscription) {
                // 保存订阅关系, 需要用它来给发布者响应
                this.subscription = subscription;
 
                // 请求一个数据
                this.subscription.request(1);
            }
 
            @Override
            public void onNext(Integer item) {
                // 接受到一个数据, 处理
                System.out.println("接受到数据: " + item);
 
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
                // 处理完调用request再请求一个数据
                this.subscription.request(1);
 
                // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
                // this.subscription.cancel();
            }
 
            @Override
            public void onError(Throwable throwable) {
                // 出现了异常(例如处理数据的时候产生了异常)
                throwable.printStackTrace();
 
 
                // 我们可以告诉发布者, 后面不接受数据了
                this.subscription.cancel();
            }
 
            @Override
            public void onComplete() {
                // 全部数据处理完了(发布者关闭了)
                System.out.println("处理完了!");
            }
 
        };
 
        // 3. 发布者和订阅者 建立订阅关系
        publiser.subscribe(subscriber);
 
        // 4. 生产数据, 并发布
        // 这里忽略数据生产过程
        for (int i = 0; i < 1000; i++) {
            System.out.println("生成数据:" + i);
            // submit是个block方法
            publiser.submit(i);
        }
 
        // 5. 结束后 关闭发布者
        // 正式环境 应该放 finally 或者使用 try-resouce 确保关闭
        publiser.close();
 
        // 主线程延迟停止, 否则数据没有消费就退出
        Thread.currentThread().join(1000);
        // debug的时候, 下面这行需要有断点
        // 否则主线程结束无法debug
        System.out.println();
    }
 
}
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
 
/**
* 带 process 的 flow demo
*/
 
/**
* Processor, 需要继承SubmissionPublisher并实现Processor接口
*
* 输入源数据 integer, 过滤掉小于0的, 然后转换成字符串发布出去
*/
class MyProcessor extends SubmissionPublisher<String>
        implements Processor<Integer, String> {
 
    private Subscription subscription;
 
    @Override
    public void onSubscribe(Subscription subscription) {
        // 保存订阅关系, 需要用它来给发布者响应
        this.subscription = subscription;
 
        // 请求一个数据
        this.subscription.request(1);
    }
 
    @Override
    public void onNext(Integer item) {
        // 接受到一个数据, 处理
        System.out.println("处理器接受到数据: " + item);
 
        // 过滤掉小于0的, 然后发布出去
        if (item > 0) {
            this.submit("转换后的数据:" + item);
        }
 
        // 处理完调用request再请求一个数据
        this.subscription.request(1);
 
        // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
        // this.subscription.cancel();
    }
 
    @Override
    public void onError(Throwable throwable) {
        // 出现了异常(例如处理数据的时候产生了异常)
        throwable.printStackTrace();
 
 
        // 我们可以告诉发布者, 后面不接受数据了
        this.subscription.cancel();
    }
 
    @Override
    public void onComplete() {
        // 全部数据处理完了(发布者关闭了)
        System.out.println("处理器处理完了!");
        // 关闭发布者
        this.close();
    }
}
 
public class FlowDemo2 {
 
    public static void main(String[] args) throws Exception {
        // 1. 定义发布者, 发布的数据类型是 Integer
        // 直接使用jdk自带的SubmissionPublisher
        SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();
 
        // 2. 定义处理器, 对数据进行过滤, 并转换为String类型
        MyProcessor processor = new MyProcessor();
 
        // 3. 发布者 和 处理器 建立订阅关系
        publiser.subscribe(processor);
 
        // 4. 定义最终订阅者, 消费 String 类型数据
        Subscriber<String> subscriber = new Subscriber<String>() {
 
            private Subscription subscription;
 
            @Override
            public void onSubscribe(Subscription subscription) {
                // 保存订阅关系, 需要用它来给发布者响应
                this.subscription = subscription;
 
                // 请求一个数据
                this.subscription.request(1);
            }
 
            @Override
            public void onNext(String item) {
                // 接受到一个数据, 处理
                System.out.println("接受到数据: " + item);
 
                // 处理完调用request再请求一个数据
                this.subscription.request(1);
 
                // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
                // this.subscription.cancel();
            }
 
            @Override
            public void onError(Throwable throwable) {
                // 出现了异常(例如处理数据的时候产生了异常)
                throwable.printStackTrace();
 
 
                // 我们可以告诉发布者, 后面不接受数据了
                this.subscription.cancel();
            }
 
            @Override
            public void onComplete() {
                // 全部数据处理完了(发布者关闭了)
                System.out.println("处理完了!");
            }
 
        };
 
        // 5. 处理器 和 最终订阅者 建立订阅关系
        processor.subscribe(subscriber);
 
        // 6. 生产数据, 并发布
        // 这里忽略数据生产过程
        publiser.submit(-111);
        publiser.submit(111);
 
        // 7. 结束后 关闭发布者
        // 正式环境 应该放 finally 或者使用 try-resouce 确保关闭
        publiser.close();
 
        // 主线程延迟停止, 否则数据没有消费就退出
        Thread.currentThread().join(1000);
    }
 
}

3、Reactor中响应式流的基本接口

响应式流规范可以总结为4个接⼝:Publisher、Subscriber、Subscription和Processor。

Publisher负责⽣成数据,并将数据发送给 Subscription(每个Subscriber对应⼀个Subscription)。

public interface Publisher<T> {
    // Publisher接⼝声明了⼀个⽅法 subscribe(),Subscriber可以通过该⽅法向 Publisher发起订阅。
    public void subscribe(Subscriber<? super T> s);
}

⼀旦Subscriber订阅成功,就可以接收来⾃Publisher的事件。

public interface Subscriber<T> {
    // Subscriber的第⼀个事件是通过对 onSubscribe()⽅法的调⽤接收的。
    public void onSubscribe(Subscription s);
    // 每个数据项都会通过该方法处理
    public void onNext(T t);
    // 异常处理
    public void onError(Throwable t);
    // 结束
    public void onComplete();
}

Publisher调⽤ onSubscribe() ⽅法时,会将Subscription对象传递给 Subscriber。

通过Subscription,Subscriber可以管理其订阅情况:

public interface Subscription {
    // Subscriber可以通过调⽤ request()⽅法来请求 Publisher 发送数据,可以传⼊⼀个long类型的数值以表明它愿意接受多少数据
    // 这也是回压能够发挥作⽤的地⽅,以避免Publisher 发送多于 Subscriber能够处理的数据量
    public void request(long n);
    // 调⽤ cancel()⽅法表明它不再对数据感兴趣并且取消订阅
    public void cancel();
}

Subscriber 请求数据之后,数据就会开始流经响应式流,调用onNext方法。

Processor接⼝,它是Subscriber和Publisher的组合:

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

4、Reactor中响应式接口的基本使用

import java.util.concurrent.TimeUnit;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
 
public class ReactorDemo {
 
   public static void main(String[] args) {
      // reactor = jdk8 stream + jdk9 reactive stream
      // Mono 0-1个元素
      // Flux 0-N个元素
      String[] strs = { "1", "2", "3" };
 
      // 2. 定义订阅者
      Subscriber<Integer> subscriber = new Subscriber<Integer>() {
 
         private Subscription subscription;
 
         @Override
         public void onSubscribe(Subscription subscription) {
            // 保存订阅关系, 需要用它来给发布者响应
            this.subscription = subscription;
 
            // 请求一个数据
            this.subscription.request(1);
         }
 
         @Override
         public void onNext(Integer item) {
            // 接受到一个数据, 处理
            System.out.println("接受到数据: " + item);
 
            try {
               TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
               e.printStackTrace();
            }
            
            // 处理完调用request再请求一个数据
            this.subscription.request(1);
 
            // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
            // this.subscription.cancel();
         }
 
         @Override
         public void onError(Throwable throwable) {
            // 出现了异常(例如处理数据的时候产生了异常)
            throwable.printStackTrace();
 
 
            // 我们可以告诉发布者, 后面不接受数据了
            this.subscription.cancel();
         }
 
         @Override
         public void onComplete() {
            // 全部数据处理完了(发布者关闭了)
            System.out.println("处理完了!");
         }
 
      };
      
      // 这里就是jdk8的stream
      Flux.fromArray(strs).map(s -> Integer.parseInt(s))
      // 最终操作
      // 这里就是jdk9的reactive stream
      .subscribe(subscriber);
 
   }
}

二、初始Reactor

1、Flux和Mono的基本介绍

Reactor中有两个核心类,Mono和Flux。Flux和Mono是Reactor提供的最基础的构建块,⽽这两种响应式类型所提供的操作符则是组合使⽤它们以构建数据流动管线的黏合剂。

这两个类实现接口Publisher,提供丰富操作符。Flux对象实现发布者,返回N个元素Mono实现发布者,返回0或者1个元素。

Flux和Mono都是数据流的发布者,使用Flux和Mono都可以发出三种数据信号:元素值、错误信号、完成信号,错误信号和完成信号都代表终止信号,终止信号用于告诉订阅者数据流结束了,错误信号终止数据流同时把错误信息传递给订阅者。

Flux和Mono共有500多个操作,这些操作都可以⼤致归类为:创建操作;组合操作;转换操作;逻辑操作。

注意!Mono和Flux的很多操作是相同的,只不过对应的数据数量不同,所以本文更多的操作都是基于Flux的,Mono也同理。

在这里插入图片描述
在这里插入图片描述

2、引入Reactor依赖

需要引入reactor-core核心包和测试包。

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.x.x</version>
</dependency>

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <version>3.x.x</version>
    <scope>test</scope>
</dependency>

3、响应式类型的创建

Reactor提供了多种创建Flux和Mono的操作。

// 使⽤Flux或Mono上的静态 just()⽅法来创建⼀个响应式类型
Mono.just(1);
Flux<String> fruitFlux = Flux
        .just("Apple", "Orange", "Grape", "Banana", "Strawberry");
// 调用just或其他方法只是声明数据流,数据流并没有发出,只有进行订阅之后才会触发数据流,不订阅什么都不会发生的。
// 添加一个订阅者,subscribe的方法参数相当于是一个Consumer
fruitFlux.subscribe(
        f -> System.out.println("Here's some fruit: " + f)
);

// 根据集合创建
String[] fruits = new String[] {
        "Apple", "Orange", "Grape", "Banana", "Strawberry" };
Flux<String> fruitFlux2 = Flux.fromArray(fruits);
List<String> list = Arrays.asList(fruits);
Flux.fromIterable(list); // 集合

Stream<String> stream = list.stream();
Flux.fromStream(stream); // stream流

// 根据区间创建1-5
Flux<Integer> intervalFlux =
        Flux.range(1, 5);
intervalFlux.subscribe(
        f -> System.out.println("data is :" + f)
);
// 每秒发布⼀个值的Flux,通过interval()⽅法创建的Flux会从0开始发布值,并且后续的条⽬依次递增。
// 因为interval()⽅法没有指定最⼤值,所以它可能会永远运⾏。我们也可以使⽤take()⽅法将结果限制为前5个条⽬。
Flux<Long> intervalFlux2 =
        Flux.interval(Duration.ofSeconds(1))
                .take(5);
intervalFlux2.subscribe(
        f -> System.out.println("data2 is :" + f)
);

// 阻塞,等待结果
Thread.sleep(100000);

4、响应式类型的组合

(1)使用mergeWith合并响应式流

在这里插入图片描述

Flux<String> characterFlux = Flux
        .just("Garfield", "Kojak", "Barbossa")
        .delayElements(Duration.ofMillis(500)); // 每500毫秒发布⼀个数据

Flux<String> foodFlux = Flux
        .just("Lasagna", "Lollipops", "Apples")
        .delaySubscription(Duration.ofMillis(250)) // 订阅后250毫秒后开始发布数据
        .delayElements(Duration.ofMillis(500)); // 每500毫秒发布⼀个数据

// 使⽤mergeWith()⽅法,将两个Flux合并,合并过后的Flux数据项发布顺序与源Flux的发布时间⼀致
// Garfield Lasagna Kojak Lollipops Barbossa Apples
Flux<String> mergedFlux = characterFlux.mergeWith(foodFlux);

mergedFlux.subscribe(System.out::println);

// 阻塞,等待结果
Thread.sleep(100000);

我们发现,使用mergeWith合并过的两个FLux,并没有严格意义上的先后之分,谁产生了数据就接着消费,与同一个无异。

(2)使用zip压缩合并响应式流

在这里插入图片描述

Flux<String> characterFlux = Flux
        .just("Garfield", "Kojak", "Barbossa");
Flux<String> foodFlux = Flux
        .just("Lasagna", "Lollipops", "Apples");
// 当两个Flux对象压缩在⼀起的时候,它将会产⽣⼀个新的发布元组的Flux,其中每个元组中都包含了来⾃每个源Flux的数据项
// 这个合并后的Flux发出的每个条⽬都是⼀个Tuple2(⼀个容纳两个其他对象的容器对象)的实例,其中包含了来⾃每个源Flux的数据项,并保持着它们发布的顺序。
Flux<Tuple2<String, String>> zippedFlux =
        Flux.zip(characterFlux, foodFlux);

zippedFlux.subscribe(t -> {
    System.out.println(t.getT1() + "|" + t.getT2());
});
/**
 * 执行结果:
 * Garfield|Lasagna
 * Kojak|Lollipops
 * Barbossa|Apples
 */

(3)使用zip压缩合并为自定义对象的响应式流

如果你不想使⽤Tuple2,⽽想要使⽤其他类型,就可以为zip()⽅法提供⼀个合并函数来⽣成你想要的任何对象,合并函数会传⼊这两个数据项。
在这里插入图片描述
zip操作的另⼀种形式(从每个传⼊Flux中各取⼀个元素,然后创建消息对象,并产⽣这些消息组成的Flux)

Flux<String> characterFlux = Flux
        .just("Garfield", "Kojak", "Barbossa");
Flux<String> foodFlux = Flux
        .just("Lasagna", "Lollipops", "Apples");

// 压缩成自定义对象
Flux<String> zippedFlux =
        Flux.zip(characterFlux, foodFlux, (c, f) -> c + " eats " + f);
zippedFlux.subscribe(System.out:: println);

/**
 * 执行结果:
 * Garfield eats Lasagna
 * Kojak eats Lollipops
 * Barbossa eats Apples
 */

(4)选择第⼀个反应式类型进⾏发布

假设我们有两个Flux对象,此时我们不想将它们合并在⼀起,⽽是想要创建⼀个新的Flux,让这个新的Flux从第⼀个产⽣值的Flux中发布值。first()操作会在两个Flux对象中选择第⼀个发布值的Flux,并再次发布它的值。
在这里插入图片描述

Flux<String> slowFlux = Flux.just("tortoise", "snail", "sloth")
        .delaySubscription(Duration.ofMillis(100)); // 延迟100ms
Flux<String> fastFlux = Flux.just("hare", "cheetah", "squirrel");
// 选择第⼀个反应式类型进⾏发布
Flux<String> firstFlux = Flux.first(slowFlux, fastFlux);
firstFlux.subscribe(System.out::println);
// 阻塞,等待结果
Thread.sleep(100000);
/**
 * 执行结果:
 * hare
 * cheetah
 * squirrel
 */

5、转换和过滤反应式流

在数据流经⼀个流时,我们通常需要过滤掉某些值并对其他的值进⾏处理。

(1)skip操作跳过指定数⽬的消息

skip操作跳过指定数⽬的消息并将剩下的消息继续在结果Flux上进⾏传递
在这里插入图片描述

// 跳过3个,并创建一个新的Flux
Flux<String> skipFlux = Flux.just(
                "one", "two", "skip a few", "ninety nine", "one hundred")
        .skip(3);
skipFlux.subscribe(System.out::println);
/**
 * 执行结果
 * ninety nine
 * one hundred
 */

(2)skip()操作的另⼀种形式

在⼀段时间之内跳过所有的第⼀批数据。
在这里插入图片描述

// 这是skip()操作的另⼀种形式,将会产⽣⼀个新Flux,在发布来⾃源Flux的数据项之前等待指定的⼀段时间
Flux<String> skipFlux = Flux.just(
                "one", "two", "skip a few", "ninety nine", "one hundred")
        .delayElements(Duration.ofSeconds(1)) // 每1秒一个
        .skip(Duration.ofSeconds(4)); // 4秒前的都跳过
skipFlux.subscribe(System.out::println);

// 阻塞,等待结果
Thread.sleep(100000);

/**
 * 执行结果:
 * ninety nine
 * one hundred
 */

(3)take操作只发布第⼀批指定数量的数据项

根据对skip操作的描述来看,take可以认为是与skip相反的操作。skip操作会跳过前⾯⼏个数据项,⽽take操作只发布第⼀批指定数量的数据项,然后将取消订阅。
在这里插入图片描述

// take操作只发布传⼊Flux中前⾯指定数⽬的数据项,然后将取消订阅
Flux<String> nationalParkFlux = Flux.just(
                "Yellowstone", "Yosemite", "Grand Canyon",
                "Zion", "Grand Teton")
        .take(3);
nationalParkFlux.subscribe(System.out::println);
/**
 * 执行结果:
 * Yellowstone
 * Yosemite
 * Grand Canyon
 */

(4)take操作的另一种形式

take()⽅法也有另⼀种替代形式,基于间隔时间⽽不是数据项个数(在指定的时间过期之前,⼀直将消息传递给结果Flux)。它将接受并发布与源Flux⼀样多的数据项,直到某段时间结束,之后Flux将会完成。
在这里插入图片描述

// 在订阅之后的前3.5秒发布数据条⽬。
Flux<String> nationalParkFlux = Flux.just(
                "Yellowstone", "Yosemite", "Grand Canyon",
                "Zion", "Grand Teton")
        .delayElements(Duration.ofSeconds(1))
        .take(Duration.ofMillis(3500));
nationalParkFlux.subscribe(System.out::println);
// 阻塞,等待结果
Thread.sleep(100000);
/**
 * 执行结果:
 * Yellowstone
 * Yosemite
 * Grand Canyon
 */

(5)filter操作自定义过滤条件

filter操作允许我们根据任何条件进⾏选择性地发布。
在这里插入图片描述

Flux<String> nationalParkFlux = Flux.just(
                "Yellowstone", "Yosemite", "Grand Canyon",
                "Zion", "Grand Teton")
        .filter(np -> !np.contains(" ")); // 过滤携带空格的
nationalParkFlux.subscribe(System.out::println);
/**
 * 执行结果
 * Yellowstone
 * Yosemite
 * Zion
 */

(6)distinct操作去重

在这里插入图片描述

Flux<String> animalFlux = Flux.just(
                "dog", "cat", "bird", "dog", "bird", "anteater")
        .distinct();
// 去重
animalFlux.subscribe(System.out::println);
/**
 * 执行结果:
 * dog
 * cat
 * bird
 * anteater
 */

(7)map操作映射新元素

map将元素映射为新的元素,并创建一个新的Flux。
在这里插入图片描述

// map将元素映射为新的元素,并创建一个新的Flux
Flux<Integer> integerFlux = Flux
        .just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
        .map(n -> {
            String[] split = n.split("\\s");
            return split.length; // 将String转为Integer
        });
integerFlux.subscribe(System.out::println);

/**
 * 执行结果:
 * 2
 * 2
 * 2
 */

其中重要的⼀点是:在每个数据项被源Flux发布时,map操作是同步执⾏的,如果你想要异步地转换过程,那么你应该考虑使⽤flatMap操作。

(8)flatMap将流转成新的流

flatMap并不像map操作那样简单地将⼀个对象转换到另⼀个对象,⽽是将对象转换为新的Mono或Flux。结果形成的Mono或Flux会扁平化为新的Flux。当与subscribeOn()⽅法结合使⽤时,flatMap操作可以释放Reactor反应式的异步能⼒。
在这里插入图片描述

// 使⽤flatMap()⽅法和subscribeOn()⽅法
Flux<Integer> integerFlux = Flux
        .just("Michael", "Scottie Pippen", "Steve Kerr Ob")
        .flatMap(n -> Mono.just(n)
                .map(p -> {
                    String[] split = p.split("\\s");
                    return split.length; // 将String转为Integer
                })
                .subscribeOn(Schedulers.parallel()) // 定义异步
        );
integerFlux.subscribe(System.out::println);
// 阻塞,等待结果
Thread.sleep(100000);

在这里插入图片描述

(9)buffer操作现将数据流拆分为小块

buffer操作会产⽣⼀个新的包含列表Flux(具备最⼤⻓度限制的列表,包含从传⼊的Flux中收集来的数据)
在这里插入图片描述

// buffer操作会产⽣⼀个新的包含列表Flux(具备最⼤⻓度限制的列表,包含从传⼊的Flux中收集来的数据)
Flux<String> fruitFlux = Flux.just(
        "apple", "orange", "banana", "kiwi", "strawberry");
// 创建⼀个新的包含List 集合的Flux,其中每个List只有不超过指定数量的元素
Flux<List<String>> bufferedFlux = fruitFlux.buffer(3); // 数据切分为小块,每3个一块
bufferedFlux.subscribe(System.out::println);
/**
 * 执行结果:
 * [apple, orange, banana]
 * [kiwi, strawberry]
 */
// 可以分片后并行执行
bufferedFlux.flatMap(x ->
        Flux.fromIterable(x)
                .map(y -> y.toUpperCase())
                .subscribeOn(Schedulers.parallel())
).subscribe(l -> {
    System.out.println(Thread.currentThread().getName() + "线程执行:" + l);
});
/**
 * 执行结果(因为并行执行,结果可能不一致):
 * parallel-1线程执行:APPLE
 * parallel-1线程执行:ORANGE
 * parallel-1线程执行:BANANA
 * parallel-2线程执行:KIWI
 * parallel-2线程执行:STRAWBERRY
 */
// 阻塞,等待结果
Thread.sleep(100000);

使⽤不带参数的buffer()⽅法可以将Flux发布的所有数据项都收集到⼀个List中:
在这里插入图片描述

Flux<List<String>> bufferedFlux = fruitFlux.buffer();

(10)collectList操作也可以将所有数据收集到一个List

collectList操作将产⽣⼀个包含传⼊Flux发布的所有消息的Mono。
在这里插入图片描述

Flux<String> fruitFlux = Flux.just(
                "apple", "orange", "banana", "kiwi", "strawberry");
// 生成一个Mono,里面包含一个List
Mono<List<String>> fruitListMono = fruitFlux.collectList();

(11)collectMap 操作产生⼀个发布Map的Mono

collectMap操作将会产⽣⼀个Mono(包含了由传⼊Flux所发出的消息产⽣的Map,这个Map的key是从传⼊消息的某些特征衍⽣⽽来的)
在这里插入图片描述

Flux<String> animalFlux = Flux.just(
        "aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Map<Character, String>> animalMapMono =
        animalFlux.collectMap(a -> a.charAt(0)); // 将第一个字符作为Map的key
animalMapMono.subscribe(System.out::println);
/**
 * 执行结果:
 * {a=aardvark, e=eagle, k=kangaroo}
 */

// 阻塞,等待结果
Thread.sleep(100000);

key相同的,会被覆盖。

6、在反应式类型上执行逻辑操作

(1)⽤all()⽅法来确保Flux中的所有消息都满⾜某些条件

在这里插入图片描述

Flux<String> animalFlux = Flux.just(
	 "aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Boolean> hasAMono = animalFlux.all(a -> a.contains("a"));

都满足条件会返回true,否则返回false。

(2)⽤any()⽅法来确保Flux中⾄少有⼀个消息满⾜某些条件

在这里插入图片描述

Flux<String> animalFlux = Flux.just(
	 "aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Boolean> hasAMono = animalFlux.any(a -> a.contains("t"));

至少有一个满足条件,就为true,都不满足就为false。

7、在反应式类型上使用Subscriber订阅

(1)使用Subscriber消费消息

Flux<String> stringFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");

stringFlux.subscribe(new Subscriber<String>() {
    // 保存订阅关系, 需要用它来给发布者响应
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        System.out.println("订阅者开始订阅");
        this.subscription = subscription;
        // 请求一个数据
        this.subscription.request(1);
    }

    @Override
    public void onNext(String item) {
        System.out.println("订阅者开始处理数据" + item);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 处理完调用request再请求一个数据
        this.subscription.request(1);
        // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
        // this.subscription.cancel();
    }

    @Override
    public void onError(Throwable t) {
        // 出现了异常(例如处理数据的时候产生了异常)
        t.printStackTrace();
        // 我们可以告诉发布者, 后面不接受数据了
        this.subscription.cancel();
    }

    @Override
    public void onComplete() {
        // 全部数据处理完了(发布者关闭了)
        System.out.println("订阅者处理完了!");
    }
});
/**
 * 执行结果:
 * 订阅者开始订阅
 * 订阅者开始处理数据Apple
 * 订阅者开始处理数据Orange
 * 订阅者开始处理数据Grape
 * 订阅者开始处理数据Banana
 * 订阅者开始处理数据Strawberry
 * 订阅者处理完了!
 */

// 阻塞
Thread.sleep(10000);

(2)使用Flux的doOnNext处理数据

Flux的doOnNext,会添加当Flux发出一个项目时触发的行为(副作用)。

Flux<String> stringFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
stringFlux.doOnNext(t -> System.out.println("发布者处理数据:" + t))
        .subscribe(t -> System.out.println("订阅者处理数据:" + t));
/**
 * 执行结果:
 * 发布者处理数据:Apple
 * 订阅者处理数据:Apple
 * 发布者处理数据:Orange
 * 订阅者处理数据:Orange
 * 发布者处理数据:Grape
 * 订阅者处理数据:Grape
 * 发布者处理数据:Banana
 * 订阅者处理数据:Banana
 * 发布者处理数据:Strawberry
 * 订阅者处理数据:Strawberry
 */

// 阻塞
Thread.sleep(10000);

但是!以下写法是不会触发发布者的doOnNext事件的:

Flux<String> stringFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
stringFlux.doOnNext(t -> System.out.println("发布者处理数据:" + t));
stringFlux.subscribe(t -> System.out.println("订阅者处理数据:" + t));

只有链式调用,才会触发发布者的doOnNext事件。

doOnNext可以写多个,顺序执行:

Flux<String> stringFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
stringFlux.doOnNext(t -> System.out.println("发布者1处理数据:" + t))
        .doOnNext(t -> System.out.println("发布者2处理数据:" + t))
        .subscribe(t -> System.out.println("订阅者处理数据:" + t));
/**
 * 执行结果:
 * 发布者1处理数据:Apple
 * 发布者2处理数据:Apple
 * 订阅者处理数据:Apple
 * 发布者1处理数据:Orange
 * 发布者2处理数据:Orange
 * 订阅者处理数据:Orange
 * 发布者1处理数据:Grape
 * 发布者2处理数据:Grape
 * 订阅者处理数据:Grape
 * 发布者1处理数据:Banana
 * 发布者2处理数据:Banana
 * 订阅者处理数据:Banana
 * 发布者1处理数据:Strawberry
 * 发布者2处理数据:Strawberry
 * 订阅者处理数据:Strawberry
 */

8、使用then来处理完成数据返回

Flux<String> just = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
// 返回一个Mono ,在此Flux完成时完成。这将主动忽略序列,只重放完成或错误信号。
just.doOnNext(t -> System.out.println("发布者处理数据:" + t))
    .then(Mono.defer(() -> {
        return Mono.just("我完成了");
    }))
    .subscribe(t -> System.out.println("订阅者处理数据:" + t));
/**
 * 执行结果:
 * 发布者处理数据:Apple
 * 发布者处理数据:Orange
 * 发布者处理数据:Grape
 * 发布者处理数据:Banana
 * 发布者处理数据:Strawberry
 * 订阅者处理数据:我完成了
 */

通常来说,发布者发布完之后,都需要调用then来处理数据,或调用thenEmpty返回一个空的Mono(Mono.empty())。

写在后面

如果本文对你有帮助,请点赞收藏关注一下吧 ~
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/735.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【C语言蓝桥杯每日一题】——数字三角形

【C语言蓝桥杯每日一题】—— 数字三角形&#x1f60e;前言&#x1f64c;数字三角形&#x1f64c;总结撒花&#x1f49e;&#x1f60e;博客昵称&#xff1a;博客小梦 &#x1f60a;最喜欢的座右铭&#xff1a;全神贯注的上吧&#xff01;&#xff01;&#xff01; &#x1f60a…

QEMU启动ARM32 Linux内核

目录前言前置知识ARM Versatile Express开发板简介ARM处理器家族简介安装qemu-system-arm安装交叉编译工具交叉编译ARM32 Linux内核交叉编译ARM32 Busybox使用busybox制作initramfs使用QEMU启动ARM32 Linux内核模拟vexpress-a9开发板模拟vexpress-a15开发板参考前言 本文介绍采…

编译原理

文章目录绪论第1章 绪论1.什么是编译2.编译系统的结构3.词法分析第2章 语言及其文法字母表 ∑\sum∑概念终结符非终结符产生式文法Chomsky文法分类体系0型文法 &#xff08;Type-0 Grammar&#xff09;1型文法&#xff08;Type-1 Grammar&#xff09;2型文法&#xff08;Type-2…

JAVA开发与JAVA(一文学会使用ElasticSearch)

在web网站的架设中特别是数据量大的网站或者APP小程序需要搜索或者全文检索的场景&#xff0c;几乎都需要借助ElasticSearch来作为全文检索引擎&#xff0c;以提高网站的搜索效率和性能。 这一节&#xff0c;我们通过一篇文章介绍&#xff0c;使大家通过一文就学会使用Elastic…

python 函数:定义、调用、参数、返回值、嵌套、变量的作用域(局部变量、全局变量)、global、匿名函数lambda

函数可以将我们的程序分解成最小的模块&#xff0c;避免重复使用。函数内部的代码&#xff0c;只有被调用的时候才会执行。 函数的定义&#xff08;def就是define&#xff09;&#xff1a; 格式&#xff1a;def 函数名(): 函数封装的代码 函数的调用&#xff1a; 格式&…

大学生考研的意义?

当我拿起笔头&#xff0c;准备写这个话题时&#xff0c;心里是非常难受的&#xff0c;因为看到太多的学生在最好的年华&#xff0c;在自由的大学本应该开拓知识&#xff0c;提升认知&#xff0c;动手实践&#xff0c;不断尝试和试错&#xff0c;不断历练自己跳出学生思维圈&…

15000 字的 SQL 语句大全 第一部分

一、基础 1、说明&#xff1a;创建数据库CREATE DATABASE database-name 2、说明&#xff1a;删除数据库drop database dbname 3、说明&#xff1a;备份sql server--- 创建 备份数据的 device USE master EXEC sp_addumpdevice disk, testBack, c:\mssql7backup\MyNwind_1.dat …

数据结构--二叉树

目录1.树概念及结构1.1数的概念1.2数的表示2.二叉树概念及结构2.1二叉树的概念2.2数据结构中的二叉树2.3特殊的二叉树2.4二叉树的存储结构2.4.1顺序存储2.4.2链式存储2.5二叉树的性质3.堆的概念及结构3.1堆的实现3.1.1堆的创建3.1.2堆的插入3.1.3堆顶的删除3.1.4堆的代码实现3.…

蓝桥杯刷题冲刺 | 倒计时26天

作者&#xff1a;指针不指南吗 专栏&#xff1a;蓝桥杯倒计时冲刺 &#x1f43e;马上就要蓝桥杯了&#xff0c;最后的这几天尤为重要&#xff0c;不可懈怠哦&#x1f43e; 文章目录1.路径2.特别数的和3.MP3储存4.求和1.路径 题目 链接&#xff1a; 路径 - 蓝桥云课 (lanqiao.cn…

算法学习之二分查找

&#x1f383;个人主页&#x1f383;&#xff1a;勇敢的小牛儿 &#x1f9e8;推荐专栏&#x1f9e8;&#xff1a;C语言知识点 ✨座右铭✨&#xff1a;敢于尝试才有机会 ⚠️今日鸡汤⚠️&#xff1a;Is the true wisdom fortitude ambition. -- Napoleon 真正的才智是刚毅的志向…

【云原生·Docker】常用命令

目录 &#x1f341;1、管理命令 &#x1f341;2、帮助命令 &#x1f341;3、镜像命令 &#x1f341;4、容器命令 &#x1f342;4.1.查看容器 &#x1f342;4.2.创建容器 &#x1f342;4.3.删除容器 &#x1f342;4.4.拷贝文件 &#x1f342;4.5.查看容器IP &#x1f341;5、部署…

LSTM从入门到精通(形象的图解,详细的代码和注释,完美的数学推导过程)

先附上这篇文章的一个思维导图什么是RNN按照八股文来说&#xff1a;RNN实际上就是一个带有记忆的时间序列的预测模型RNN的细胞结构图如下&#xff1a;softmax激活函数只是我举的一个例子&#xff0c;实际上得到y<t>也可以通过其他的激活函数得到其中a<t-1>代表t-1时…

C语言/动态通讯录

本文使用了malloc、realloc、calloc等和内存开辟有关的函数。 文章目录 前言 二、头文件 三、主界面 四、通讯录功能函数 1.全代码 2.增加联系人 3.删除联系人 4.查找联系人 5.修改联系人 6.展示联系人 7.清空联系人 8.退出通讯录 总结 前言 为了使用通讯录时&#xff0c;可以…

Opencv项目实战:22 物体颜色识别并框选

目录 0、项目介绍 1、效果展示 2、项目搭建 3、项目代码展示与部分讲解 Color_trackbar.py bgr_detector.py test.py 4、项目资源 5、项目总结 0、项目介绍 本次项目要完成的是对物体颜色的识别并框选&#xff0c;有如下功能&#xff1a; &#xff08;1&#xff09;…

线程池的使用:如何写出高效的多线程程序?

目录1.线程池的使用2.编写高效的多线程程序Java提供了Executor框架来支持线程池的实现&#xff0c;通过Executor框架&#xff0c;可以快速地创建和管理线程池&#xff0c;从而更加方便地编写多线程程序。 1.线程池的使用 在使用线程池时&#xff0c;需要注意以下几点&#xff…

GDAL python教程基础篇(7)OGR空间计算

1.空间计算 地理数据处理&#xff08;geoprocessing&#xff09;计算函数&#xff1a; 多边形&#xff08;Polygon&#xff09;&#xff1a; 1、交&#xff1a;poly3.Intersection(poly2) 2、并&#xff1a;poly3.Union(poly2) 3、差&#xff1a;poly3.Difference(poly2) 4、补…

python打包成apk界面设计,python打包成安装文件

大家好&#xff0c;给大家分享一下如何将python程序打包成apk文件&#xff0c;很多人还不知道这一点。下面详细解释一下。现在让我们来看看&#xff01; 1、如何用python制作十分秒加减的apk 如何用python制作十分秒加减的apk&#xff1f;用法:. apk包放入apk文件目录,然后输入…

Linux基础命令大全(下)

♥️作者&#xff1a;小刘在C站 ♥️个人主页&#xff1a;小刘主页 ♥️每天分享云计算网络运维课堂笔记&#xff0c;努力不一定有收获&#xff0c;但一定会有收获加油&#xff01;一起努力&#xff0c;共赴美好人生&#xff01; ♥️夕阳下&#xff0c;是最美的绽放&#xff0…

走进哈希心房

目录 哈希的概念 哈希函数 哈希冲突和解决方法 闭散列 插入 查找 删除 开散列 插入 查找 删除 哈希表&#xff08;开散列&#xff09;整体代码 位图 位图模拟实现思路分析&#xff1a; 位图应用 布隆过滤器 本文介绍unordered系列的关联式容器&#xff0c;unor…

安卓手机也可以使用新必应NewBing

没有魔法安卓手机也可以使用新必应NewBing 目前知道的是安卓手机 安卓手机先安装一个猴狐浏览器 打开手机自带浏览器&#xff0c;搜索关键词&#xff1a;猴狐浏览器&#xff0c;找到官网 也可以直接复制这个网址 狐猴浏览器 lemurbrowser CoolAPK 我的手机是荣耀安卓手机…