文章目录
- 一、什么是响应式编程
- 1、Java的流和响应式流
- 2、Java中响应式的使用
- 3、Reactor中响应式流的基本接口
- 4、Reactor中响应式接口的基本使用
- 二、初始Reactor
- 1、Flux和Mono的基本介绍
- 2、引入Reactor依赖
- 3、响应式类型的创建
- 4、响应式类型的组合
- (1)使用mergeWith合并响应式流
- (2)使用zip压缩合并响应式流
- (3)使用zip压缩合并为自定义对象的响应式流
- (4)选择第⼀个反应式类型进⾏发布
- 5、转换和过滤反应式流
- (1)skip操作跳过指定数⽬的消息
- (2)skip()操作的另⼀种形式
- (3)take操作只发布第⼀批指定数量的数据项
- (4)take操作的另一种形式
- (5)filter操作自定义过滤条件
- (6)distinct操作去重
- (7)map操作映射新元素
- (8)flatMap将流转成新的流
- (9)buffer操作现将数据流拆分为小块
- (10)collectList操作也可以将所有数据收集到一个List
- (11)collectMap 操作产生⼀个发布Map的Mono
- 6、在反应式类型上执行逻辑操作
- (1)⽤all()⽅法来确保Flux中的所有消息都满⾜某些条件
- (2)⽤any()⽅法来确保Flux中⾄少有⼀个消息满⾜某些条件
- 7、在反应式类型上使用Subscriber订阅
- (1)使用Subscriber消费消息
- (2)使用Flux的doOnNext处理数据
- 8、使用then来处理完成数据返回
- 写在后面
一、什么是响应式编程
响应式编程是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。
在开发应⽤程序代码时,我们可以编写两种⻛格的代码,即命令式和响应式。
命令式(Imperative)的代码:它由⼀组任务组成,每次只运⾏⼀项任务,每项任务⼜都依赖于前⾯的任务。数据会按批次进⾏处理,在前⼀项任务还没有完成对当前数据批次的处理时,不能将这些数据递交给下⼀项处理任务。
响应式(Reactive)的代码:它定义了⼀组⽤来处理数据的任务,但是这些任务可以并⾏地执⾏。每项任务处理数据的⼀部分⼦集,并将结果交给处理流程中的下⼀项任务,同时继续处理数据的另⼀部分⼦集。
Reactor 是⼀个响应式编程库,同时也是Spring家族的⼀部分。它是Spring 5反应式编程功能的基础。
1、Java的流和响应式流
Java的Stream流通常都是同步的,并且只能处理有限的数据集。从本质上来说,它们只是使⽤函数来对集合进⾏迭代的⼀种⽅式。
响应式流⽀持异步处理任意⼤⼩的数据集,同样也包括⽆限数据集。只要数据就绪,它们就能实时地处理数据,并且能够通过回压来避免压垮数据的消费者。
2、Java中响应式的使用
JDK1.8时,是基于Observer/Observable接口而实现的观察者模式:
ObserverDemo observer = new ObserverDemo();
// 添加观察者
observer.addObserver(new Observer() {
@Override
public void update(Observable o, Object arg) {
System.out.println("发生了变化");
}
});
observer.addObserver(new Observer() {
@Override
public void update(Observable o, Object arg) {
System.out.println("收到了通知");
}
});
observer.setChanged(); // 数据变化
observer.notifyObservers(); // 通知
JDK9及以后,Observer/Observable接口就被弃用了,取而代之的是Flow类:
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
public class FlowDemo {
public static void main(String[] args) throws Exception {
// 1. 定义发布者, 发布的数据类型是 Integer
// 直接使用jdk自带的SubmissionPublisher, 它实现了 Publisher 接口
SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();
// 2. 定义订阅者
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
// 保存订阅关系, 需要用它来给发布者响应
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
}
@Override
public void onNext(Integer item) {
// 接受到一个数据, 处理
System.out.println("接受到数据: " + item);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 处理完调用request再请求一个数据
this.subscription.request(1);
// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();
// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部数据处理完了(发布者关闭了)
System.out.println("处理完了!");
}
};
// 3. 发布者和订阅者 建立订阅关系
publiser.subscribe(subscriber);
// 4. 生产数据, 并发布
// 这里忽略数据生产过程
for (int i = 0; i < 1000; i++) {
System.out.println("生成数据:" + i);
// submit是个block方法
publiser.submit(i);
}
// 5. 结束后 关闭发布者
// 正式环境 应该放 finally 或者使用 try-resouce 确保关闭
publiser.close();
// 主线程延迟停止, 否则数据没有消费就退出
Thread.currentThread().join(1000);
// debug的时候, 下面这行需要有断点
// 否则主线程结束无法debug
System.out.println();
}
}
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
/**
* 带 process 的 flow demo
*/
/**
* Processor, 需要继承SubmissionPublisher并实现Processor接口
*
* 输入源数据 integer, 过滤掉小于0的, 然后转换成字符串发布出去
*/
class MyProcessor extends SubmissionPublisher<String>
implements Processor<Integer, String> {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
// 保存订阅关系, 需要用它来给发布者响应
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
}
@Override
public void onNext(Integer item) {
// 接受到一个数据, 处理
System.out.println("处理器接受到数据: " + item);
// 过滤掉小于0的, 然后发布出去
if (item > 0) {
this.submit("转换后的数据:" + item);
}
// 处理完调用request再请求一个数据
this.subscription.request(1);
// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();
// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部数据处理完了(发布者关闭了)
System.out.println("处理器处理完了!");
// 关闭发布者
this.close();
}
}
public class FlowDemo2 {
public static void main(String[] args) throws Exception {
// 1. 定义发布者, 发布的数据类型是 Integer
// 直接使用jdk自带的SubmissionPublisher
SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();
// 2. 定义处理器, 对数据进行过滤, 并转换为String类型
MyProcessor processor = new MyProcessor();
// 3. 发布者 和 处理器 建立订阅关系
publiser.subscribe(processor);
// 4. 定义最终订阅者, 消费 String 类型数据
Subscriber<String> subscriber = new Subscriber<String>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
// 保存订阅关系, 需要用它来给发布者响应
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
}
@Override
public void onNext(String item) {
// 接受到一个数据, 处理
System.out.println("接受到数据: " + item);
// 处理完调用request再请求一个数据
this.subscription.request(1);
// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();
// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部数据处理完了(发布者关闭了)
System.out.println("处理完了!");
}
};
// 5. 处理器 和 最终订阅者 建立订阅关系
processor.subscribe(subscriber);
// 6. 生产数据, 并发布
// 这里忽略数据生产过程
publiser.submit(-111);
publiser.submit(111);
// 7. 结束后 关闭发布者
// 正式环境 应该放 finally 或者使用 try-resouce 确保关闭
publiser.close();
// 主线程延迟停止, 否则数据没有消费就退出
Thread.currentThread().join(1000);
}
}
3、Reactor中响应式流的基本接口
响应式流规范可以总结为4个接⼝:Publisher、Subscriber、Subscription和Processor。
Publisher负责⽣成数据,并将数据发送给 Subscription(每个Subscriber对应⼀个Subscription)。
public interface Publisher<T> {
// Publisher接⼝声明了⼀个⽅法 subscribe(),Subscriber可以通过该⽅法向 Publisher发起订阅。
public void subscribe(Subscriber<? super T> s);
}
⼀旦Subscriber订阅成功,就可以接收来⾃Publisher的事件。
public interface Subscriber<T> {
// Subscriber的第⼀个事件是通过对 onSubscribe()⽅法的调⽤接收的。
public void onSubscribe(Subscription s);
// 每个数据项都会通过该方法处理
public void onNext(T t);
// 异常处理
public void onError(Throwable t);
// 结束
public void onComplete();
}
Publisher调⽤ onSubscribe() ⽅法时,会将Subscription对象传递给 Subscriber。
通过Subscription,Subscriber可以管理其订阅情况:
public interface Subscription {
// Subscriber可以通过调⽤ request()⽅法来请求 Publisher 发送数据,可以传⼊⼀个long类型的数值以表明它愿意接受多少数据
// 这也是回压能够发挥作⽤的地⽅,以避免Publisher 发送多于 Subscriber能够处理的数据量
public void request(long n);
// 调⽤ cancel()⽅法表明它不再对数据感兴趣并且取消订阅
public void cancel();
}
Subscriber 请求数据之后,数据就会开始流经响应式流,调用onNext方法。
Processor接⼝,它是Subscriber和Publisher的组合:
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
4、Reactor中响应式接口的基本使用
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
public class ReactorDemo {
public static void main(String[] args) {
// reactor = jdk8 stream + jdk9 reactive stream
// Mono 0-1个元素
// Flux 0-N个元素
String[] strs = { "1", "2", "3" };
// 2. 定义订阅者
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
// 保存订阅关系, 需要用它来给发布者响应
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
}
@Override
public void onNext(Integer item) {
// 接受到一个数据, 处理
System.out.println("接受到数据: " + item);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 处理完调用request再请求一个数据
this.subscription.request(1);
// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();
// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部数据处理完了(发布者关闭了)
System.out.println("处理完了!");
}
};
// 这里就是jdk8的stream
Flux.fromArray(strs).map(s -> Integer.parseInt(s))
// 最终操作
// 这里就是jdk9的reactive stream
.subscribe(subscriber);
}
}
二、初始Reactor
1、Flux和Mono的基本介绍
Reactor中有两个核心类,Mono和Flux。Flux和Mono是Reactor提供的最基础的构建块,⽽这两种响应式类型所提供的操作符则是组合使⽤它们以构建数据流动管线的黏合剂。
这两个类实现接口Publisher,提供丰富操作符。Flux对象实现发布者,返回N个元素Mono实现发布者,返回0或者1个元素。
Flux和Mono都是数据流的发布者,使用Flux和Mono都可以发出三种数据信号:元素值、错误信号、完成信号,错误信号和完成信号都代表终止信号,终止信号用于告诉订阅者数据流结束了,错误信号终止数据流同时把错误信息传递给订阅者。
Flux和Mono共有500多个操作,这些操作都可以⼤致归类为:创建操作;组合操作;转换操作;逻辑操作。
注意!Mono和Flux的很多操作是相同的,只不过对应的数据数量不同,所以本文更多的操作都是基于Flux的,Mono也同理。
2、引入Reactor依赖
需要引入reactor-core核心包和测试包。
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.x.x</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.x.x</version>
<scope>test</scope>
</dependency>
3、响应式类型的创建
Reactor提供了多种创建Flux和Mono的操作。
// 使⽤Flux或Mono上的静态 just()⽅法来创建⼀个响应式类型
Mono.just(1);
Flux<String> fruitFlux = Flux
.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
// 调用just或其他方法只是声明数据流,数据流并没有发出,只有进行订阅之后才会触发数据流,不订阅什么都不会发生的。
// 添加一个订阅者,subscribe的方法参数相当于是一个Consumer
fruitFlux.subscribe(
f -> System.out.println("Here's some fruit: " + f)
);
// 根据集合创建
String[] fruits = new String[] {
"Apple", "Orange", "Grape", "Banana", "Strawberry" };
Flux<String> fruitFlux2 = Flux.fromArray(fruits);
List<String> list = Arrays.asList(fruits);
Flux.fromIterable(list); // 集合
Stream<String> stream = list.stream();
Flux.fromStream(stream); // stream流
// 根据区间创建1-5
Flux<Integer> intervalFlux =
Flux.range(1, 5);
intervalFlux.subscribe(
f -> System.out.println("data is :" + f)
);
// 每秒发布⼀个值的Flux,通过interval()⽅法创建的Flux会从0开始发布值,并且后续的条⽬依次递增。
// 因为interval()⽅法没有指定最⼤值,所以它可能会永远运⾏。我们也可以使⽤take()⽅法将结果限制为前5个条⽬。
Flux<Long> intervalFlux2 =
Flux.interval(Duration.ofSeconds(1))
.take(5);
intervalFlux2.subscribe(
f -> System.out.println("data2 is :" + f)
);
// 阻塞,等待结果
Thread.sleep(100000);
4、响应式类型的组合
(1)使用mergeWith合并响应式流
Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa")
.delayElements(Duration.ofMillis(500)); // 每500毫秒发布⼀个数据
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples")
.delaySubscription(Duration.ofMillis(250)) // 订阅后250毫秒后开始发布数据
.delayElements(Duration.ofMillis(500)); // 每500毫秒发布⼀个数据
// 使⽤mergeWith()⽅法,将两个Flux合并,合并过后的Flux数据项发布顺序与源Flux的发布时间⼀致
// Garfield Lasagna Kojak Lollipops Barbossa Apples
Flux<String> mergedFlux = characterFlux.mergeWith(foodFlux);
mergedFlux.subscribe(System.out::println);
// 阻塞,等待结果
Thread.sleep(100000);
我们发现,使用mergeWith合并过的两个FLux,并没有严格意义上的先后之分,谁产生了数据就接着消费,与同一个无异。
(2)使用zip压缩合并响应式流
Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa");
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples");
// 当两个Flux对象压缩在⼀起的时候,它将会产⽣⼀个新的发布元组的Flux,其中每个元组中都包含了来⾃每个源Flux的数据项
// 这个合并后的Flux发出的每个条⽬都是⼀个Tuple2(⼀个容纳两个其他对象的容器对象)的实例,其中包含了来⾃每个源Flux的数据项,并保持着它们发布的顺序。
Flux<Tuple2<String, String>> zippedFlux =
Flux.zip(characterFlux, foodFlux);
zippedFlux.subscribe(t -> {
System.out.println(t.getT1() + "|" + t.getT2());
});
/**
* 执行结果:
* Garfield|Lasagna
* Kojak|Lollipops
* Barbossa|Apples
*/
(3)使用zip压缩合并为自定义对象的响应式流
如果你不想使⽤Tuple2,⽽想要使⽤其他类型,就可以为zip()⽅法提供⼀个合并函数来⽣成你想要的任何对象,合并函数会传⼊这两个数据项。
zip操作的另⼀种形式(从每个传⼊Flux中各取⼀个元素,然后创建消息对象,并产⽣这些消息组成的Flux)
Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa");
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples");
// 压缩成自定义对象
Flux<String> zippedFlux =
Flux.zip(characterFlux, foodFlux, (c, f) -> c + " eats " + f);
zippedFlux.subscribe(System.out:: println);
/**
* 执行结果:
* Garfield eats Lasagna
* Kojak eats Lollipops
* Barbossa eats Apples
*/
(4)选择第⼀个反应式类型进⾏发布
假设我们有两个Flux对象,此时我们不想将它们合并在⼀起,⽽是想要创建⼀个新的Flux,让这个新的Flux从第⼀个产⽣值的Flux中发布值。first()操作会在两个Flux对象中选择第⼀个发布值的Flux,并再次发布它的值。
Flux<String> slowFlux = Flux.just("tortoise", "snail", "sloth")
.delaySubscription(Duration.ofMillis(100)); // 延迟100ms
Flux<String> fastFlux = Flux.just("hare", "cheetah", "squirrel");
// 选择第⼀个反应式类型进⾏发布
Flux<String> firstFlux = Flux.first(slowFlux, fastFlux);
firstFlux.subscribe(System.out::println);
// 阻塞,等待结果
Thread.sleep(100000);
/**
* 执行结果:
* hare
* cheetah
* squirrel
*/
5、转换和过滤反应式流
在数据流经⼀个流时,我们通常需要过滤掉某些值并对其他的值进⾏处理。
(1)skip操作跳过指定数⽬的消息
skip操作跳过指定数⽬的消息并将剩下的消息继续在结果Flux上进⾏传递
// 跳过3个,并创建一个新的Flux
Flux<String> skipFlux = Flux.just(
"one", "two", "skip a few", "ninety nine", "one hundred")
.skip(3);
skipFlux.subscribe(System.out::println);
/**
* 执行结果
* ninety nine
* one hundred
*/
(2)skip()操作的另⼀种形式
在⼀段时间之内跳过所有的第⼀批数据。
// 这是skip()操作的另⼀种形式,将会产⽣⼀个新Flux,在发布来⾃源Flux的数据项之前等待指定的⼀段时间
Flux<String> skipFlux = Flux.just(
"one", "two", "skip a few", "ninety nine", "one hundred")
.delayElements(Duration.ofSeconds(1)) // 每1秒一个
.skip(Duration.ofSeconds(4)); // 4秒前的都跳过
skipFlux.subscribe(System.out::println);
// 阻塞,等待结果
Thread.sleep(100000);
/**
* 执行结果:
* ninety nine
* one hundred
*/
(3)take操作只发布第⼀批指定数量的数据项
根据对skip操作的描述来看,take可以认为是与skip相反的操作。skip操作会跳过前⾯⼏个数据项,⽽take操作只发布第⼀批指定数量的数据项,然后将取消订阅。
// take操作只发布传⼊Flux中前⾯指定数⽬的数据项,然后将取消订阅
Flux<String> nationalParkFlux = Flux.just(
"Yellowstone", "Yosemite", "Grand Canyon",
"Zion", "Grand Teton")
.take(3);
nationalParkFlux.subscribe(System.out::println);
/**
* 执行结果:
* Yellowstone
* Yosemite
* Grand Canyon
*/
(4)take操作的另一种形式
take()⽅法也有另⼀种替代形式,基于间隔时间⽽不是数据项个数(在指定的时间过期之前,⼀直将消息传递给结果Flux)。它将接受并发布与源Flux⼀样多的数据项,直到某段时间结束,之后Flux将会完成。
// 在订阅之后的前3.5秒发布数据条⽬。
Flux<String> nationalParkFlux = Flux.just(
"Yellowstone", "Yosemite", "Grand Canyon",
"Zion", "Grand Teton")
.delayElements(Duration.ofSeconds(1))
.take(Duration.ofMillis(3500));
nationalParkFlux.subscribe(System.out::println);
// 阻塞,等待结果
Thread.sleep(100000);
/**
* 执行结果:
* Yellowstone
* Yosemite
* Grand Canyon
*/
(5)filter操作自定义过滤条件
filter操作允许我们根据任何条件进⾏选择性地发布。
Flux<String> nationalParkFlux = Flux.just(
"Yellowstone", "Yosemite", "Grand Canyon",
"Zion", "Grand Teton")
.filter(np -> !np.contains(" ")); // 过滤携带空格的
nationalParkFlux.subscribe(System.out::println);
/**
* 执行结果
* Yellowstone
* Yosemite
* Zion
*/
(6)distinct操作去重
Flux<String> animalFlux = Flux.just(
"dog", "cat", "bird", "dog", "bird", "anteater")
.distinct();
// 去重
animalFlux.subscribe(System.out::println);
/**
* 执行结果:
* dog
* cat
* bird
* anteater
*/
(7)map操作映射新元素
map将元素映射为新的元素,并创建一个新的Flux。
// map将元素映射为新的元素,并创建一个新的Flux
Flux<Integer> integerFlux = Flux
.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
.map(n -> {
String[] split = n.split("\\s");
return split.length; // 将String转为Integer
});
integerFlux.subscribe(System.out::println);
/**
* 执行结果:
* 2
* 2
* 2
*/
其中重要的⼀点是:在每个数据项被源Flux发布时,map操作是同步执⾏的,如果你想要异步地转换过程,那么你应该考虑使⽤flatMap操作。
(8)flatMap将流转成新的流
flatMap并不像map操作那样简单地将⼀个对象转换到另⼀个对象,⽽是将对象转换为新的Mono或Flux。结果形成的Mono或Flux会扁平化为新的Flux。当与subscribeOn()⽅法结合使⽤时,flatMap操作可以释放Reactor反应式的异步能⼒。
// 使⽤flatMap()⽅法和subscribeOn()⽅法
Flux<Integer> integerFlux = Flux
.just("Michael", "Scottie Pippen", "Steve Kerr Ob")
.flatMap(n -> Mono.just(n)
.map(p -> {
String[] split = p.split("\\s");
return split.length; // 将String转为Integer
})
.subscribeOn(Schedulers.parallel()) // 定义异步
);
integerFlux.subscribe(System.out::println);
// 阻塞,等待结果
Thread.sleep(100000);
(9)buffer操作现将数据流拆分为小块
buffer操作会产⽣⼀个新的包含列表Flux(具备最⼤⻓度限制的列表,包含从传⼊的Flux中收集来的数据)
// buffer操作会产⽣⼀个新的包含列表Flux(具备最⼤⻓度限制的列表,包含从传⼊的Flux中收集来的数据)
Flux<String> fruitFlux = Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry");
// 创建⼀个新的包含List 集合的Flux,其中每个List只有不超过指定数量的元素
Flux<List<String>> bufferedFlux = fruitFlux.buffer(3); // 数据切分为小块,每3个一块
bufferedFlux.subscribe(System.out::println);
/**
* 执行结果:
* [apple, orange, banana]
* [kiwi, strawberry]
*/
// 可以分片后并行执行
bufferedFlux.flatMap(x ->
Flux.fromIterable(x)
.map(y -> y.toUpperCase())
.subscribeOn(Schedulers.parallel())
).subscribe(l -> {
System.out.println(Thread.currentThread().getName() + "线程执行:" + l);
});
/**
* 执行结果(因为并行执行,结果可能不一致):
* parallel-1线程执行:APPLE
* parallel-1线程执行:ORANGE
* parallel-1线程执行:BANANA
* parallel-2线程执行:KIWI
* parallel-2线程执行:STRAWBERRY
*/
// 阻塞,等待结果
Thread.sleep(100000);
使⽤不带参数的buffer()⽅法可以将Flux发布的所有数据项都收集到⼀个List中:
Flux<List<String>> bufferedFlux = fruitFlux.buffer();
(10)collectList操作也可以将所有数据收集到一个List
collectList操作将产⽣⼀个包含传⼊Flux发布的所有消息的Mono。
Flux<String> fruitFlux = Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry");
// 生成一个Mono,里面包含一个List
Mono<List<String>> fruitListMono = fruitFlux.collectList();
(11)collectMap 操作产生⼀个发布Map的Mono
collectMap操作将会产⽣⼀个Mono(包含了由传⼊Flux所发出的消息产⽣的Map,这个Map的key是从传⼊消息的某些特征衍⽣⽽来的)
Flux<String> animalFlux = Flux.just(
"aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Map<Character, String>> animalMapMono =
animalFlux.collectMap(a -> a.charAt(0)); // 将第一个字符作为Map的key
animalMapMono.subscribe(System.out::println);
/**
* 执行结果:
* {a=aardvark, e=eagle, k=kangaroo}
*/
// 阻塞,等待结果
Thread.sleep(100000);
key相同的,会被覆盖。
6、在反应式类型上执行逻辑操作
(1)⽤all()⽅法来确保Flux中的所有消息都满⾜某些条件
Flux<String> animalFlux = Flux.just(
"aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Boolean> hasAMono = animalFlux.all(a -> a.contains("a"));
都满足条件会返回true,否则返回false。
(2)⽤any()⽅法来确保Flux中⾄少有⼀个消息满⾜某些条件
Flux<String> animalFlux = Flux.just(
"aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Boolean> hasAMono = animalFlux.any(a -> a.contains("t"));
至少有一个满足条件,就为true,都不满足就为false。
7、在反应式类型上使用Subscriber订阅
(1)使用Subscriber消费消息
Flux<String> stringFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
stringFlux.subscribe(new Subscriber<String>() {
// 保存订阅关系, 需要用它来给发布者响应
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
System.out.println("订阅者开始订阅");
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
}
@Override
public void onNext(String item) {
System.out.println("订阅者开始处理数据" + item);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 处理完调用request再请求一个数据
this.subscription.request(1);
// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();
}
@Override
public void onError(Throwable t) {
// 出现了异常(例如处理数据的时候产生了异常)
t.printStackTrace();
// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部数据处理完了(发布者关闭了)
System.out.println("订阅者处理完了!");
}
});
/**
* 执行结果:
* 订阅者开始订阅
* 订阅者开始处理数据Apple
* 订阅者开始处理数据Orange
* 订阅者开始处理数据Grape
* 订阅者开始处理数据Banana
* 订阅者开始处理数据Strawberry
* 订阅者处理完了!
*/
// 阻塞
Thread.sleep(10000);
(2)使用Flux的doOnNext处理数据
Flux的doOnNext,会添加当Flux发出一个项目时触发的行为(副作用)。
Flux<String> stringFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
stringFlux.doOnNext(t -> System.out.println("发布者处理数据:" + t))
.subscribe(t -> System.out.println("订阅者处理数据:" + t));
/**
* 执行结果:
* 发布者处理数据:Apple
* 订阅者处理数据:Apple
* 发布者处理数据:Orange
* 订阅者处理数据:Orange
* 发布者处理数据:Grape
* 订阅者处理数据:Grape
* 发布者处理数据:Banana
* 订阅者处理数据:Banana
* 发布者处理数据:Strawberry
* 订阅者处理数据:Strawberry
*/
// 阻塞
Thread.sleep(10000);
但是!以下写法是不会触发发布者的doOnNext事件的:
Flux<String> stringFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
stringFlux.doOnNext(t -> System.out.println("发布者处理数据:" + t));
stringFlux.subscribe(t -> System.out.println("订阅者处理数据:" + t));
只有链式调用,才会触发发布者的doOnNext事件。
doOnNext可以写多个,顺序执行:
Flux<String> stringFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
stringFlux.doOnNext(t -> System.out.println("发布者1处理数据:" + t))
.doOnNext(t -> System.out.println("发布者2处理数据:" + t))
.subscribe(t -> System.out.println("订阅者处理数据:" + t));
/**
* 执行结果:
* 发布者1处理数据:Apple
* 发布者2处理数据:Apple
* 订阅者处理数据:Apple
* 发布者1处理数据:Orange
* 发布者2处理数据:Orange
* 订阅者处理数据:Orange
* 发布者1处理数据:Grape
* 发布者2处理数据:Grape
* 订阅者处理数据:Grape
* 发布者1处理数据:Banana
* 发布者2处理数据:Banana
* 订阅者处理数据:Banana
* 发布者1处理数据:Strawberry
* 发布者2处理数据:Strawberry
* 订阅者处理数据:Strawberry
*/
8、使用then来处理完成数据返回
Flux<String> just = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
// 返回一个Mono ,在此Flux完成时完成。这将主动忽略序列,只重放完成或错误信号。
just.doOnNext(t -> System.out.println("发布者处理数据:" + t))
.then(Mono.defer(() -> {
return Mono.just("我完成了");
}))
.subscribe(t -> System.out.println("订阅者处理数据:" + t));
/**
* 执行结果:
* 发布者处理数据:Apple
* 发布者处理数据:Orange
* 发布者处理数据:Grape
* 发布者处理数据:Banana
* 发布者处理数据:Strawberry
* 订阅者处理数据:我完成了
*/
通常来说,发布者发布完之后,都需要调用then来处理数据,或调用thenEmpty返回一个空的Mono(Mono.empty())。
写在后面
如果本文对你有帮助,请点赞收藏关注一下吧 ~