标题响应式编程代码示例
代码示例
多个消费者订阅了同一个生产者
package com.yaeher.infrastructure.userinfovault.user;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.*;
public class ReactorDemo{
public static Scheduler custom_Scheduler() {
Executor executor = new ThreadPoolExecutor(
10,
10,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1000),
Executors.defaultThreadFactory()
);
return Schedulers.fromExecutor(executor);
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test
public void flux_generate4() {
final Random random = new Random();
Flux.generate(ArrayList::new, (list, sink) -> {
int value = random.nextInt(100);
list.add(value);
System.out.println("所发射元素产生的线程: "+Thread.currentThread().getName());
sink.next(value);
sleep(1000);
if (list.size() == 20) {
sink.complete();
}
return list;
}).publishOn(custom_Scheduler(),1)
.map(x -> String.format("[%s] ---> %s", Thread.currentThread().getName(), x))
.subscribe(System.out::println);
sleep(20000);
}
}
结果如下所示
一次订阅,但是元素的消费线程却始终在切换。
这种情况会带来一个问题,若存在这么一种情况:我们需要在线程的上下文中存储一些对象数据,方便我们在整个执行的过程中使用他们,在生命周期结束的时候将存储的数据进行销毁。 所以我们该如何存储呢?
方案1:使用ThreadLocal,但是这会有个缺点,在我们自定义线程池的时候,在调度的时候几个订阅关系可能会共用一个线程池,故不同订阅关系之间可能会由于ThreadLocal而产生数据泄露。
方案2:使用Reactor提供的ContextAPI来解决,其用于服务Flux或者Mono单个订阅关系的上下文数据存储。
context 的简单用法
@Test
public void contextSimple1() {
String key = “message”;
Mono r = Mono.just(“Hello”)
.flatMap(s -> Mono.deferContextual(ctx ->
Mono.just(s + " " + ctx.get(key))))
.contextWrite(ctx -> ctx.put(key, “World”));
StepVerifier.create(r).expectNext("Hello World").verifyComplete();
}
注意
contextWrite 放在调用链的最后,订阅从下游流向上游的。
总结
Mono.deferContextual(c-> c.get(key)):获得Context中指定Key对应的Value值。
contextWrite(c -> c.put(key, value):往Context中塞入一个键值对。
简单用法的格式:Mono.deferContextual(c ->Mono.just(c.get(key)))