响应式编程三流处理

响应式编程三流处理

      • 组合响应式流
        • concat
        • merge
        • zip
        • combineLatest
      • flatMap、concatMap、flatMapSequebtial操作符
        • flatMap
        • concatMap
        • flatMapSequential
      • 元素采样
        • sample 和sampleTimeout
      • 流的批处理
        • buffer
        • window操作符
        • group by
        • 将响应式流转化为阻塞结构
        • 在序列处理时查看元素
        • 物化和非物化信号
        • push和create创建流
        • generate工厂方法
        • 将disposable资源包装到响应式流中
      • 错误处理
        • onError
        • retry
        • onErrorResume
        • onErrorReturn
        • onErrorMap
      • 背压处理
        • onBackPressureBuffer
        • onBackPressureDrop
        • onBackPressureLast
        • onBackPressureError
      • 热数据流和冷数据流
        • 多播流元素
        • 缓存流元素
        • 共享流元素
      • 处理时间
        • interval操作符
        • delayElements 操作符
        • delaySequence 操作符
        • timeout 操作符
        • timestamp 操作符
        • elapsed 操作符
        • 组合和和转化响应式流

组合响应式流

concat

concat 操作符通过向下游转发接收的元素来连接所有数据源。当操作符连接两个流时,它首先消费并重新发送第一个流的所有元素,然后对第二个流执行相同的操作。

      // concat
        Flux.concat(
                Flux.range(10,5).delayElements(Duration.ofMillis(100))
                .doOnSubscribe(subscription -> System.out.println("订阅第一个流")),
                Flux.range(100,5).delayElements(Duration.ofMillis(100))
                .doOnSubscribe(subscription -> System.out.println("订阅第二流"))
        ).subscribe(System.out::println);
        Thread.sleep(10*1000);
merge

merge 操作符将来自上游序列的数据合并到一个下游序列中。与 concat 操作符不同,上游数据源是立即(同时)被订阅的。
打印是乱序的

    Flux.merge(
                Flux.range(10,5).delayElements(Duration.ofMillis(100))
                        .doOnSubscribe(subscription -> System.out.println("订阅第一个流")),
                Flux.range(100,5).delayElements(Duration.ofMillis(100))
                        .doOnSubscribe(subscription -> System.out.println("订阅第二流"))
        ).subscribe(System.out::println);
        Thread.sleep(10*1000);
zip

zip 操作符订阅所有上游,等待所有数据源发出一个元素,然后将接收到的元素组合到一个输出元素中。

        //这里最后形成的是一个二元组
 // 这里面依照最慢的那个进行打印,如果我们两个流的打印个数不同,那就以最少的为准,例如将第一个range的第二个参数改为101最后打印还是以10个为准
        Flux.zip(
                Flux.range(1,10)
                .delayElements(Duration.ofMillis(10)),
                Flux.range(100,10)
                .delayElements(Duration.ofMillis(10))
                ).subscribe(System.out::println);
                Thread.sleep(10*1000);

在这里插入图片描述

combineLatest

combineLatest 操作符与 zip 操作符的工作方式类似。但是,只要至少一个上游数据源发出一个值,它就会生成一个新值。
在这里插入图片描述

Flux.combineLatest(
        Flux.range(1,10)
        .delayElements(Duration.ofMillis(1000)),
        Flux.range(100,10)
        .delayElements(Duration.ofMillis(2000)),
        ((integer1, integer2) -> integer1 + "==" +integer2)
).subscribe(System.out::println);
Thread.sleep(10*1000);

flatMap、concatMap、flatMapSequebtial操作符

flatMap 操作符在逻辑上由 map 和 flatten(就 Reactor 而言,flatten 类似于 merge 操作符)这两个操作组成。
flatMap 操作符的 map 部分将传入的
flatten 部分将所有生成的响应式流。
Project Reactor 提供了 flatMap 操作符的一些不同变体。除了重载,该库还提供了flatMapSequential 操作符和 concatMap 操作符。
这 3 个操作符在以下几个方面有所不同。

  1. 操作符是否立即订阅其内部流;flatMap 操作符和 flatMapSequential 操作符会立即订阅,而 concatMap 操作符则会在生成下一个子流并订阅它之前等待每个内部完成。

  2. 操作符是否保留生成元素的顺序;concatMap 天生保留与源元素相同的顺序,flatMapSequential 操作符通过对所接收的元素进行排序来保留顺序,而 flatMap 操作符不一定保留原始排序。

  3. 操作符是否允许对来自不同子流的元素进行交错;

    flatMap 操作符允许交错,而 concatMap和 flatMapSequential 不允许交错。
    flatMap 操作符(及其变体)在函数式编程和响应式编程中都非常重要,因为它能使用一行代码实现复杂的工作流。

flatMap

flatMap允许元素交错,处理的时候是并行处理的
flatMap 操作符

Random random = new Random();
Flux.just(Arrays.asList(1,2,3),Arrays.asList("a","b","c","d"),Arrays.asList(7,8,9))
        .doOnNext(System.out::println)
        .flatMap(item -> Flux.fromIterable(item)
        .doOnSubscribe(subscription -> {
            System.out.println("已经订阅");
        })// 我们增加一个延时,订阅后延时一段时间再发送
        .delayElements(Duration.ofMillis(random.nextInt(100) + 100))
        ).subscribe(System.out::println);

Thread.sleep(10*1000);

在这里插入图片描述

concatMap

concatMap不允许元素交错。

Random random = new Random();
Flux.just(Arrays.asList(1,2,3),Arrays.asList("a","b","c","d"),Arrays.asList(7,8,9))
    .doOnNext(System.out::println)
    .concatMap(item -> Flux.fromIterable(item)
               .doOnSubscribe(subscription -> {
                   System.out.println("已经订阅");
               })// 我们增加一个延时,订阅后延时一段时间再发送
               .delayElements(Duration.ofMillis(random.nextInt(100) + 100))
              ).subscribe(System.out::println);

Thread.sleep(10*1000);

concatMap对每个上游的元素,在接收后都立即生成新的流,新流每个元素处理完之后,进行下一个新流的处理。
在这里插入图片描述

flatMapSequential

flatMapSequential不允许元素交错。
数据来了就订阅,输出按照顺序输出。

   Random random = new Random();
        Flux.just(Arrays.asList(1,2,3),Arrays.asList("a","b","c","d"),Arrays.asList(7,8,9))
                .doOnNext(System.out::println)
                .flatMapSequential(item -> Flux.fromIterable(item)
                        .doOnSubscribe(subscription -> {
                            System.out.println("已经订阅");
                        })// 我们增加一个延时,订阅后延时一段时间再发送
                        .delayElements(Duration.ofMillis(random.nextInt(100) + 100))
                ).subscribe(System.out::println);

        Thread.sleep(10*1000);

元素采样

sample 和sampleTimeout

对于高吞吐量场景而言,通过应用采样技术处理一小部分事件是有意义的。
sample 操作符和 sampleTimeout 操作符可以让流周期性地发出与时间窗口内最近看到的值相对应的数据项。

 // 每个100ms从流中获取对应的元素
        Flux.range(1,100)
                .delayElements(Duration.ofMillis(10))
                .sample(Duration.ofMillis(200))
                .subscribe(System.out::println);
        Thread.sleep(10*1000);

使我们每10毫秒都顺序生成数据项,订阅者也只会收到所指定的约束条件内的一小部分事件。通过这种方法,我们可以在不需要所有传入事件就能成功操作的场景下使用被动限速。流控。
在这里插入图片描述
sampleTimeout采样指定随机时间:

        Random random = new Random();
        Flux.range(0,20)
                .delayElements(Duration.ofMillis(100))
                .sampleTimeout( item -> Mono.delay(Duration.ofMillis(random.nextInt(100) + 50)),20)
                .subscribe(System.out::println);
        Thread.sleep(10 * 1000);

流的批处理

  1. 将元素缓冲(buffering)到容器(如 List)中,结果流的类型为 Flux<List<T>> 。
  2. 通过开窗(windowing)方式将元素加入诸如 Flux<Flux<T>> 等流中。请注意,现在的流信号不是值,而是可以处理的子流。
  3. 通过某些键将元素分组(grouping)到具有 Flux<GroupedFlux<K, T>> 类型的流中。每个新键都会触发一个新的 GroupedFlux 实例,并且具有该键的所有元素都将被推送到GroupFlux 类的该实例中。

可以基于以下场景进行缓冲和开窗操作:

  1. 处理元素的数量,比方说每 10 个元素;
  2. 一段时间,比方说每 5 分钟一次;
  3. 基于一些谓语,比方说在每个新的偶数之前切割;
  4. 基于来自其他 Flux 的一个事件,该事件控制着执行过程。
buffer

列表(大小为10)中的整数元素执行缓冲操作

Flux.range(1,100)
    .buffer(10)
    .subscribe(System.out::println);

buffer 操作符将许多事件收集到一个事件集合中。该集合本身成为下游操作符的事件。当需要使用元素集合来生成一些请求,而不是使用仅包含一个元素的集合来生成许多小请求时,用缓冲操作符来实现批处理会比较方便。

window操作符

如果需要根据数字序列中的元素是否为素数进行开窗拆分,可以使用 window 操作符的变体windowUntil。
它使用谓词来确定何时创建新切片。

Flux.range(101,20)
                .windowUntil(ReactorDemo12::isPrime,false)
                .subscribe(
                        window ->
                            window.collectList()
                                    .subscribe(
                                            item -> System.out.println("window:" + item)
                                    )
                );

    }

    private static boolean isPrime(Integer integer){
        double sqrt = Math.sqrt(integer);
        if(integer <  2){
            return false;
        }
        if(integer == 2 || integer == 3){
            return true;
        }
        if(integer % 2 == 0){
            return false;
        }
        for(int i = 3;i <= sqrt;i++){
            if(integer % i == 0){
                return false;
            }
        }
        return true;
    }

请注意第一个窗口为空。这是因为一旦启动原始流,就会生成一个初始窗口。然后,第一个元素会到达(数字 101),它是素数,会触发一个新窗口。因此,已经打开的窗口会在没有任何元素的情况下通过 onComplete 信号关闭。

请注意第一个窗口为空。这是因为一旦启动原始流,就会生成一个初始窗口。然后,第一个元素会到达(数字 101),它是素数,会触发一个新窗口。因此,已经打开的窗口会在没有任何元素的情况下通过 onComplete 信号关闭。

group by

groupBy 操作符通过某些条件对响应式流中的元素进行分组。通过对每个元素打一个标签(key),按照标签将元素进行分组。

如:将整数序列按照奇数和偶数进行分组,并仅跟踪每组中的最后两个元素。

        Flux.range(1,7)
                .groupBy(item -> item % 2 == 0 ? "偶数" : "奇数")
                .subscribe(groupFlux ->groupFlux.scan(new ArrayList<>(),
                        (list,element) ->{
                             list.add(element);
                    return list;

                        }).filter(list -> !list.isEmpty()).
                        subscribe(item -> System.out.println(groupFlux.key() + "======" +item))
                );

在这里插入图片描述
在这里插入图片描述

将响应式流转化为阻塞结构

roject Reactor 库提供了一个 API,用于将响应式流转换为阻塞结构。

有以下选项来阻塞流并同步生成结果:

  1. toIterable 方法将响应式 Flux 转换为阻塞 Iterable。
  2. toStream 方法将响应式 Flux 转换为阻塞 Stream API。从 Reactor 3.2 开始,在底层使用toIterable 方法。
  3. blockFirst 方法阻塞了当前线程,直到上游发出第一个值或完成流为止。
  4. blockLast 方法阻塞当前线程,直到上游发出最后一个值或完成流为止。在 onError的情况下,它会在被阻塞的线程中抛出异常。
 Flux.just(1,2,3).toIterable();
        Stream<Integer> stream = Flux.just(1,2,3,4).toStream();
        // 1、验证toIterable为阻塞
        Iterable<Integer> integers = Flux.just(1, 2, 3, 4)
                .delayElements(Duration.ofSeconds(1))
                .toIterable();
        System.out.println("===============");
        for(Integer num :integers){
            System.out.println(num);
        }
        System.out.println("===============");
        //  2、我们可以做一下改进
        Flux.just(1,2,3)
                .delayElements(Duration.ofSeconds(1))
                .subscribe(System.out::println);
        System.out.println("==========");
        System.out.println("==========");
        Thread.sleep(10*1000);

        // 3、toStream进行阻塞
        Stream<Integer> integerStream = Flux.just(1, 2, 3).delayElements(Duration.ofSeconds(1))
                .toStream();
        System.out.println("================");
        integerStream.forEach(System.out::println);
        System.out.println("================");
        // 4 、BlockFirst 只拿第一个,其他不处理
        Integer integer = Flux.just(1, 2, 3)
                .delayElements(Duration.ofSeconds(1))
                .doOnNext(item -> System.out.println("onNext:" + item))
                .blockFirst();
        System.out.println("==========");
        System.out.println(integer);
        System.out.println("==========");
        Thread.sleep(10*100);

        //  blocklast 直到流的最后一个元素
        Integer integer2 = Flux.just(1, 2, 3)
                .delayElements(Duration.ofSeconds(1))
                .doOnNext(item -> System.out.println("onNext:" + item))
                .blockLast();
        System.out.println("==========");
        System.out.println(integer2);
        System.out.println("==========");

        Flux<Integer> integerFlux = Flux.just(1, 2, 3).delayElements(Duration.ofSeconds(1));
        integerFlux.subscribe(item -> System.out.println("第一个订阅" + item));
        integerFlux.subscribe(item -> System.out.println("第二个订阅" + item));
        integerFlux.subscribe(item -> System.out.println("第三个订阅" + item));
        Integer integer2 = integerFlux.blockFirst();
        System.out.println("阻塞最后一个元素:" + integer2);
        System.out.println("=================");
        Thread.sleep(10*1000);
在序列处理时查看元素

有时,我们需要对处理管道中的每个元素或特定信号执行操作。为满足此类要求,Project Reactor提供了以下方法。

  1. doOnNext(Consumer<T>) 使我们能对 Flux 或 Mono 上的每个元素执行一些操作。
  2. doOnComplete 和 doOnError(Throwable) 可以应用在相应的事件上。
  3. doOnSubscribe(Consumer<Subscription>) 和doOnCancel(Runnable) 使我们能对订阅生命周期事件做出响应。
  4. 无论是什么原因导致的流终止, doOnTerminate(Runnable) 都会在流终止时被调用。
    此外,Flux 和 Mono 提供了 doOnEach(Consumer <Signal>) 方法,该方法处理表示响应式流领域的所有信号,包括 onSubscribe、onNext、onError 和 onComplete
Flux.just(1,2,3)
    .concatWith(Flux.error(new RuntimeException("手动异常")))
    //  .doOnEach(item -> System.out.println(item))
    .subscribe(
    item -> System.out.println("onNext:" + item),
    ex -> System.err.println("onError:" + ex),
    () -> System.out.println("处理完毕")
);
物化和非物化信号

将流中的元素封装为Signal对象进行处理。
有时,采用信号进行流处理比采用数据进行流处理更有用。为了将数据流转换为信号流并再次返回,Flux 和 Mono 提供了 materialize 方法和 dematerialize 方法。示例如下:

  Flux.just(1,2,3)
                .delayElements(Duration.ofMillis(1000))
                .publishOn(Schedulers.parallel())
                .concatWith(Flux.error(new Exception("手动异常")))
                .materialize()
                .doOnEach(item -> System.out.println(item.isOnComplete()))
                .log()
                .dematerialize()
                .subscribe(System.out::println);
        Thread.sleep(10*1000);

这里,在处理信号流时,doOnNext 方法不仅接收带有数据的 onNext 事件,还接收包含在Signal类中的 onComplete 事件。此方法能采用一个类型层次结构来处理 onNext、onError和 onCompete 事件。如果我们只需要记录信号而不修改它们,那么 Reactor 提供了 log 方法,该方法使用可用的记录器记录所有处理过的信号。

push和create创建流

有时候需要一种更复杂的方法来在流中生成信号,或将对象的生命周期绑定到响应式流的生命周期。
push 工厂方法能通过适配一个单线程生产者来编程创建 Flux 实例。
此方法对于适配异步、单线程、多值 API 非常有用,而无须关注背压和取消, push 方法本身包含背压和取消。

Flux<Integer> push = Flux.push(fluxSink -> {
    // 从数据库中获取数据
    // FluxSink 追加到响应式流中,这样将命令处理方式,转化为响应式处理方式
    IntStream.range(1, 10)
            .forEach(item -> fluxSink.next(item));
});
push.subscribe(System.out::println);
Thread.sleep(5*1000);

push 工厂方法可以很方便地使用默认的背压和取消策略来适配异步 API。

create 工厂方法,与 push 工厂方法类似,起到桥接的作用。
该方法能从不同的线程发送事件

MyEventProcessor myEventProcessor = new MyEventProcessor();
        Flux<String> bridage = Flux.create(sink -> {
            myEventProcessor.register(new MyEventListener<String>() {
                @Override
                public void onDataChunk(List<String> chunk) {
                    for (String s : chunk) {
                        sink.next(s);
                    }
                }

                @Override
                public void processComplete() {
                    sink.complete();
                }
            });
        });
        bridage.subscribe(
                System.out::println,
                ex -> System.err.println(ex),
                () ->System.out.println("处理完毕")
        );
        myEventProcessor.process();
        Thread.sleep(5*1000);
static class MyEventProcessor{
    private MyEventListener listener;
    private Random random = new Random();
    void register(MyEventListener listener){
        this.listener = listener;
    }

    public void process(){
        while(random.nextInt(10) % 3 != 0){
            List<String> dataChunk = new ArrayList<>();
            for(int i = 0 ;i < 10;i++){
                dataChunk.add("data - " + i );
            }
            listener.onDataChunk(dataChunk);
        }
        listener.processComplete();;
    }
}
interface  MyEventListener<T>{
    void onDataChunk(List<T> chunk);

    void processComplete();
}
generate工厂方法

generate 工厂方法旨在基于生成器的内部处理状态创建复杂序列。它需要一个初始值和一个函数,该函数根据前一个内部状态计算下一个状态,并将 onNext 信号发送给下游订阅者。
例如,创建一个简单的响应式流来生成斐波那契(Fibonacci)数列(0,1,1,2,3,5,8,13,…)。

Flux.generate(
        // 通过Callable提供初始状态实例
        new Callable<ArrayList<Long>>() {
            @Override
            public ArrayList<Long> call() throws Exception {
                final ArrayList<Long> longs = new ArrayList<>();
                longs.add(0L);
                longs.add(1L);
                return longs;
            }
        }, // 负责斐波拉契数列
        // 函数第一个参数数据、函数第二个参数类型 、返回值
        new BiFunction<ArrayList<Long>, SynchronousSink<Long>, ArrayList<Long>>() {
            @Override
            public ArrayList<Long> apply(ArrayList<Long> longs, SynchronousSink<Long> sink) {
                Long aLong = longs.get(longs.size() - 1);
                Long aLong1 = longs.get(longs.size() - 2);
                sink.next(aLong);
                longs.add(aLong + aLong1);
                return longs;
            }
        }).delayElements(Duration.ofMillis(500))
        .take(10)
        .subscribe(System.out::println);
Thread.sleep(5000);

lambda形式:

Flux.generate(
        // 通过Callable提供初始状态实例
        () -> {
            final ArrayList<Long> longs = new ArrayList<>();
            longs.add(0L);
            longs.add(1L);
            return longs;
        }, // 负责斐波拉契数列
        // 函数第一个参数数据、函数第二个参数类型 、返回值
        (BiFunction<ArrayList<Long>, SynchronousSink<Long>, ArrayList<Long>>) (longs, sink) -> {
        //获取倒数第一个和倒数第二个
            Long aLong = longs.get(longs.size() - 1);
            Long aLong1 = longs.get(longs.size() - 2);
            sink.next(aLong);
            longs.add(aLong + aLong1);
            return longs;
        }).delayElements(Duration.ofMillis(500))
        .take(10)
        .subscribe(System.out::println);
Thread.sleep(5000);

还可以用二元组的方式表示:

Flux.generate(
        // 通过Callable提供初始状态实例
        () -> Tuples.of(0L,1L), // 负责斐波拉契数列
        // 函数第一个参数数据、函数第二个参数类型 、返回值
       (state, sink) -> {
           System.out.println("生成的数字:" + state.getT2());
           sink.next(state.getT1());
           long nextValue  = state.getT1() + state.getT2();
            return Tuples.of(state.getT2(),nextValue);
        }).delayElements(Duration.ofMillis(500))
        .take(10)
        .subscribe(System.out::println);
Thread.sleep(5000);
将disposable资源包装到响应式流中

using 工厂方法能根据一个 disposable 资源创建流。它在响应式编程中实现了 try-with-resources 方法。
假设我们需要包装一个阻塞 API,而该 API 使用以下有如下表示:

static class Connection implements AutoCloseable {
    private final Random rnd = new Random();
    static Connection newConnection() {
        System.out.println("创建Connection对象");
        return new Connection();
    }
    public Iterable<String> getData() {
        if (rnd.nextInt(10) < 3) {
            throw new RuntimeException("通信异常");
        }
        return Arrays.asList("数据1", "数据2");
    }
    // close方法可以释放内部资源,并且应该始终被调用,即使在getData执行期间发生错误也是如此。
    @Override
    public void close() {
        System.out.println("关闭Connection连接");
    }
}
public static void main(String[] args) throws InterruptedException {
    try (Connection connection = Connection.newConnection()) {
        connection.getData().forEach(data -> System.out.println("接收的数据:" +
                data));
    } catch (Exception e) {
        System.err.println("错误信息:" + e);
    }
}
Flux.using(
        Connection::newConnection,
        connection -> Flux.fromIterable(connection.getData()),
        Connection::close
).subscribe(
        data -> System.out.println("onNext接收到数据:" + data),
        ex -> System.err.println("onError接收到的异常信息:" +ex),
        () -> System.out.println("处理完毕")
);

连接的生命周期与流的生命周期绑定。
操作符还可以在通知订阅者流终止之前或之后选择是否应该进行清除动作。

错误处理

onError 信号是响应式流规范的一个组成部分,一种将异常传播给可以处理它的用户。但是,如果最终订阅者没有为 onError 信号定义处理程序,那么 onError 抛异常。
此外,响应式流的语义定义了 onError 是一个终止操作,该操作之后响应式流会停止执行。
此时,我们可能采取以下策略中的一种做出不同响应:

  1. 为 subscribe 操作符中的 onError 信号定义处理程序。
  2. 定义一个在发生错误时重新执行的响应式工作流。如果源响应序列发出错误信号,那么retry 操作符会重新订阅该序列。
  3. 通过 onErrorResume 操作符捕获异常并执行备用工作流。
  4. 通过 onErrorReturn 操作符捕获一个错误,并用一个默认静态值或一个从异常中计算出的值替换它。
  5. 通过 onErrorMap 操作符捕获异常并将其转换为另一个异常来更好地表现当前场景。
onError
//onError
Flux.from(new Publisher<String>() {
    @Override
    public void subscribe(Subscriber<? super String> s) {
        s.onError(new RuntimeException("手动异常"));
    }
    // }).subscribe(System.out::println);
}).subscribe(System.out::println,System.err::println);
retry
 private static Random random = new Random();
    private static Flux<String> recommendedBooks(String userId){
        return Flux.defer(()->{
            if(random.nextInt(10) < 7){
                return Flux.<String>error(new RuntimeException("err"))
                        // 整体向后推移时间
                        .delaySequence(Duration.ofMillis(100));
            }else{
                return Flux.just("西游记","红楼梦")
                        .delayElements(Duration.ofMillis(10));
            }
        }).doOnSubscribe(
                item -> System.out.println("请求:" + userId)
        );
    }
  private static CountDownLatch latch = new CountDownLatch(1);
    public static void main(String[] args) throws InterruptedException {

        Flux.just("user-1000")
                .flatMap(user -> recommendedBooks(user).retry(3))//这里最多会调用四次,第一次正常调用 后三次进行重试
                .subscribe(
                        System.out::println,
                        ex -> {
                            System.err.println(ex);
                            latch.countDown();
                        },
                        () ->{
                            System.out.println("处理完毕");
                            latch.countDown();
                        }
                );
        latch.await();
    }
onErrorResume

返回备用的流

 public static void main(String[] args) throws InterruptedException {

        Flux.just("user-1000")
                .flatMap(user -> recommendedBooks(user))
                .onErrorResume(error -> Flux.just("三国演义"))// 异常捕获返回备用的流
                .subscribe(
                        System.out::println,
                        ex -> {
                            System.err.println(ex);
                            latch.countDown();
                        },
                        () ->{
                            System.out.println("处理完毕");
                            latch.countDown();
                        }
                );
        latch.await();
    }
onErrorReturn

返回一个默认的值

 public static void main(String[] args) throws InterruptedException {

        Flux.just("user-1000")
                .flatMap(user -> recommendedBooks(user))
                .onErrorReturn("水浒传")// 异常捕获返回备用的流
                .subscribe(
                        System.out::println,
                        ex -> {
                            System.err.println(ex);
                            latch.countDown();
                        },
                        () ->{
                            System.out.println("处理完毕");
                            latch.countDown();
                        }
                );
        latch.await();
    }
onErrorMap
 public static void main(String[] args) throws InterruptedException {

        Flux.just("user-1000")
                .flatMap(user -> recommendedBooks(user))
                .onErrorMap(throwable -> {
                    if(throwable.getMessage().equals("err")){
                        return new Exception("业务异常");
                    }
                    return new Exception("未知异常");
                })
                .subscribe(
                        System.out::println,
                        ex -> {
                            System.err.println(ex);
                            latch.countDown();
                        },
                        () ->{
                            System.out.println("处理完毕");
                            latch.countDown();
                        }
                );
        latch.await();
    }

背压处理

尽管响应式流规范要求将背压构建到生产者和消费者之间的通信中,但这仍然可能使消费者溢出。
一些消费者可能无意识地请求无界需求,然后无法处理生成的负载。
另一些消费者则可能对传入消息的速率有严格的限制。例如,数据库客户端每秒不能插入超过 1000条记录。在这种情况下,事件批处理技术可能有所帮助。
可以通过以下方式配置流以处理背压情况:

  1. onBackPressureBuffer 操作符会请求无界需求并将返回的元素推送到下游。如果下游消费者无法跟上,那么元素将缓冲在队列中。
  2. onBackPressureDrop 操作符也请求无界需求(Integer.MAX_VALUE)并向下游推送数据。如果下游请求数量不足,那么元素会被丢弃。自定义处理程序可以用来处理已丢弃的元素。
  3. onBackPressureLast 操作符与 onBackPressureDrop 的工作方式类似。只是会记住最近收到的元素,并在需求出现时立即将其推向下游。
  4. onBackPressureError 操作符在尝试向下游推送数据时请求无界需求。如果下游消费者无法跟上,则操作符会引发错误。

管理背压的另一种方法是使用速率限制技术。

onBackPressureBuffer
public static void main(String[] args) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);
    Flux.range(1,1000)
            .delayElements(Duration.ofMillis(10))
            .onBackpressureBuffer(600)
            .delayElements(Duration.ofMillis(100))
            .subscribe(
                    System.out::println,
                    ex ->{
                        System.out.println(ex);
                        latch.countDown();
                    },
                    () ->{
                        System.out.println("处理完毕");
                        latch.countDown();
                    }
            );
    latch.await();
    System.out.println("main结束");
}
onBackPressureDrop
Flux.range(1,1000)
        .delayElements(Duration.ofMillis(10))
        .onBackpressureDrop()
        .delayElements(Duration.ofMillis(100))
        .subscribe(
                System.out::println,
                ex ->{
                    System.out.println(ex);
                    latch.countDown();
                },
                () ->{
                    System.out.println("处理完毕");
                    latch.countDown();
                }
        );
latch.await();
System.out.println("main结束");
onBackPressureLast
CountDownLatch latch = new CountDownLatch(1);
Flux.range(1,1000)
        .delayElements(Duration.ofMillis(10))
        .onBackpressureLatest()
        .delayElements(Duration.ofMillis(100))
        .subscribe(
                System.out::println,
                ex ->{
                    System.out.println(ex);
                    latch.countDown();
                },
                () ->{
                    System.out.println("处理完毕");
                    latch.countDown();
                }
        );
latch.await();
System.out.println("main结束");
onBackPressureError
CountDownLatch latch = new CountDownLatch(1);
Flux.range(1,1000)
        .delayElements(Duration.ofMillis(10))
        .onBackpressureError()
        .delayElements(Duration.ofMillis(100))
        .subscribe(
                System.out::println,
                ex ->{
                    System.out.println(ex);
                    latch.countDown();
                },
                () ->{
                    System.out.println("处理完毕");
                    latch.countDown();
                }
        );
latch.await();
System.out.println("main结束");

热数据流和冷数据流

冷发布者行为方式:无论订阅者何时出现,都为该订阅者生成所有序列数据,
没有订阅者就不会生成数据

 Flux<String> coldPublisher = Flux.defer(()->{
            System.out.println("生成数据");
            return Flux.just(UUID.randomUUID().toString());
        });
        System.out.println("尚未生成数据");
        coldPublisher.subscribe(e -> System.out.println("onNext:" + e));
        coldPublisher.subscribe(e -> System.out.println("onNext:" + e));
        System.out.println("为两次订阅生成两次数据");

在这里插入图片描述
每当订阅者出现时都会有一个新序列生成,而这些语义可以代表 HTTP请求。
热发布者中的数据生成不依赖于订阅者而存在。因此,热发布者可能在第一个订阅者出现之前开始生成元素。
这种语义代表数据广播场景。例如,一旦股价发生变化,热发布者就可以向其订阅者广播有关当前股价的更新。
但是,当订阅者到达时,它仅接收未来的价格更新,而不接受先前价格历史。

多播流元素

通过响应式转换将冷发布者转变为热发布者。
如,一旦所有订阅者都准备好生成数据,希望在几个订阅者之间共享冷处理器的结果。同时,我们又不希望为每个订阅者重新生成数据。

Flux<Integer> source = Flux.range(0,3)
    .doOnSubscribe(s -> System.out.println("对冷发布者的新订阅票据:" + s));
ConnectableFlux<Integer> conn = source.publish();
conn.subscribe(item -> System.out.println("[subscriber 1] onNext:" + item));
conn.subscribe(item -> System.out.println("[subscriber 2] onNext:" + item));
System.out.println("所有定于这都准备建立连接");
conn.connect();

可以看到,冷发布者收到了订阅,只生成了一次数据项。但是,两个订阅者都收到了整个事件集合。

缓存流元素

使用 ConnectableFlux 可以轻松实现不同的数据缓存策略
cache 操作符使用 ConnectableFlux ,因此它的主要附加值是它所提供的一个流式而直接的API。可以调整缓存所能容纳的数据量以及每个缓存项的到期时间。

Flux<Integer> source = Flux.range(0,5)
    .doOnSubscribe(s -> System.out.println("冷发布者的新订阅数据"));
Flux<Integer> cacheSource = source.cache(Duration.ofMillis(1000));
cacheSource.subscribe(item -> System.out.println("[subscribe 1] on Next:" +item));
cacheSource.subscribe(item -> System.out.println("[subscribe 2] on Next:" +item));
Thread.sleep(1200);
cacheSource.subscribe(item -> System.out.println("[subscribe 3] on Next:" +item));

前两个订阅者共享第一个订阅的同一份缓存数据。然后,在一定延迟之后,由于第三个订阅者无法获取缓存数据,因此一个针对冷发布者的新订阅被触发了。最后,即使该数据不来自缓存,第三个订阅者也接收到了所需的数据。

共享流元素

我们可以使用 ConnectableFlux 向几个订阅者多播事件。但是需要等待订阅者出现才能开始处理。
share 操作符可以将冷发布者转变为热发布者。该操作符会为每个新订阅者传播订阅者尚未错过的事件。

    Flux<Integer> source = Flux.range(0, 5)
                .delayElements(Duration.ofMillis(100))
                .doOnSubscribe(s -> System.out.println("冷发布者新的订阅票据"));
        Flux<Integer> shareSource = source.share();
        shareSource.subscribe(item -> System.out.println("subscribe 1 onNext:" + item));
        Thread.sleep(400);
        shareSource.subscribe(item -> System.out.println("subscribe 2 onNext:" + item));
        Thread.sleep(10*1000);

在前面的代码中,共享了一个冷发布流,该流以每 100 毫秒为间隔生成事件。然后,经过一些延迟,一些订阅者订阅了共享发布者。
第一个订阅者从第一个事件开始接收,而第二个订阅者错过了在其出现之前所产生的事件(S2 仅接收到事件 3 和事件 4)

处理时间

响应式编程是异步的,因此它本身就假定存在时序。

基于 Project Reactor,可以使用 interval 操作符生成基于一定持续时间的事件,使用delayElements 操作符生成延迟元素,并使用 delaySequence 操作符延迟所有信号。

Reactor 的 API 使你能对一些与时间相关的事件做出响应, timestamp 操作符用于输出元素的时间戳, timeout 操作符用于指定消息时间间隔的大小。与 timestamp 类似, elapsed 操作符测量与上一个事件的时间间隔。

interval操作符
 // interval
        // 1、指定生成元素的时间间隔
        Flux.interval(Duration.ofMillis(100))
                .subscribe(item ->{
                    System.out.println(Thread.currentThread().getName() +"========" + item);
                });
        System.out.println(Thread.currentThread().getName());

        // 2、 第一个参数指定发送事件于订阅时间的时间间隔
        // 第二个参数:指定生成序列元素的时间间隔
        Flux.interval(Duration.ofSeconds(20),Duration.ofMillis(300))
                .subscribe(System.out::println);


        Thread.sleep(5 *1000);
        System.out.println("结束");
delayElements 操作符
 Flux.range(1,1000)
                // 指定元素之间的时间间隔
                .delayElements(Duration.ofSeconds(1))
                .subscribe(item ->{
                    System.out.println(Thread.currentThread().getName() + "====" + item);
                });

        Thread.sleep(5 *1000);
        System.out.println("结束");
delaySequence 操作符

指定订阅时间和第一个元素发布事件的时间间隔

     Flux.range(1,1000)
                .delaySequence(Duration.ofSeconds(5))
                .subscribe(item ->{
                    System.out.println(Thread.currentThread().getName() + "====" + item);
                });

        Thread.sleep(10 *1000);
        System.out.println("结束");
timeout 操作符
Random random = new Random();
        CountDownLatch latch = new CountDownLatch(1);
        // 指定时间间隔大于我们的timeout时间,就抛异常
        Flux.interval(Duration.ofMillis(300))
                .timeout(Duration.ofMillis(random.nextInt(20) + 290))
                .subscribe(
                        System.out::println,
                        ex -> {
                            System.err.println(ex);
                            latch.countDown();
                        }
                );
        latch.await(10, TimeUnit.SECONDS);
timestamp 操作符
// 为我们响应式流里面增加一个响应式时间戳,时间戳和元素会以二元组形式在流里面传递
        Flux.interval(Duration.ofMillis(300))
                .timestamp()
                .subscribe( item ->{
                    Long timeStamp = item.getT1();
                    Long element = item.getT2();
                    String result = element + "的时间戳" + timeStamp;
                    System.out.println(result);
                });
        Thread.sleep(5000);
elapsed 操作符

时间间隔操作

// elapsed
        Flux.interval(Duration.ofMillis(300))
                .elapsed()
                .subscribe(
                        item ->{
                            Long interval = item.getT1();
                            Long element = item.getT2();
                            String result = element + "与上一个元素的时间间隔" + interval + "ms";
                            System.out.println(result);
                        }
                );
        Thread.sleep(5000);

在这里插入图片描述

但是我们看到这个时间间隔并不准确,所以说不要在 Reactor 库中要求太精确的时间(实时)间隔

组合和和转化响应式流

当我们构建复杂的响应式工作流时,通常需要在几个不同的地方使用相同的操作符序列。

transform 操作符,可以将这些常见的部分提取到单独的对象中,并在需要时重用它们。

transform 操作符,可以增强流结构本身。

public static void main(String[] args) {
        Function<Flux<String>,Flux<String>> logUserInfo = stream ->
            stream.index().doOnNext(
                    tp  -> System.out.println("[" + tp.getT1() +"] user:" + tp.getT2())
            ).map(Tuple2::getT2);

        Flux.range(1000,3)
                .map(i -> "user - " + i)
                .transform(logUserInfo)
                .subscribe(e -> System.out.println("onNext:" + e));

    }

transform 操作符仅在流生命周期的组装阶段更新一次流行为,可以在响应式应用程序中实现代码重用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/385301.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

鸿蒙(HarmonyOS)项目方舟框架(ArkUI)之Marquee组件

鸿蒙&#xff08;HarmonyOS&#xff09;项目方舟框架&#xff08;ArkUI&#xff09;之Marquee组件 一、操作环境 操作系统: Windows 10 专业版、IDE:DevEco Studio 3.1、SDK:HarmonyOS 3.1 二、Marquee组件 跑马灯组件&#xff0c;用于滚动展示一段单行文本&#xff0c;仅当…

【hcie-cloud】【27】华为云Stack网络安全防护

文章目录 前言网络安全概述常见网络攻击类型流量型攻击DDoS单包攻击网络攻击防范 网络安全服务华为云Stack网络防护HCS租户网络纵深防护HCS常用网络安全防护服务对比 云防火墙详述云防火墙&#xff08;CFW&#xff09;- 定义云防火墙&#xff08;CFW&#xff09;- 实现原理云防…

【MySQL进阶之路】亿级数据量表SQL调优实战

欢迎关注公众号&#xff08;通过文章导读关注&#xff1a;【11来了】&#xff09;&#xff0c;及时收到 AI 前沿项目工具及新技术的推送&#xff01; 在我后台回复 「资料」 可领取编程高频电子书&#xff01; 在我后台回复「面试」可领取硬核面试笔记&#xff01; 文章导读地址…

Spark编程实验六:Spark机器学习库MLlib编程

目录 一、目的与要求 二、实验内容 三、实验步骤 1、数据导入 2、进行主成分分析&#xff08;PCA&#xff09; 3、训练分类模型并预测居民收入 4、超参数调优 四、结果分析与实验体会 一、目的与要求 1、通过实验掌握基本的MLLib编程方法&#xff1b; 2、掌握用MLLib…

Elasticsearch深度分页问题

目录 什么是深度分页 深度分页会带来什么问题 深度分页问题的常见解决方案 滚动查询&#xff1a;Scroll Search search_after 总结 什么是深度分页 分页问题是Elasticsearch中最常见的查询场景之一&#xff0c;正常情况下分页代码如实下面这样的&#xff1a; # 查询第一…

Ps:堆栈模式在摄影后期的应用

Photoshop 的堆栈模式 Stack Mode为摄影师提供了一种强大的后期处理能力&#xff0c;通过堆叠和处理多张照片来实现无法单靠一张照片完成的效果。 正确的前期拍摄策略和后期处理技巧可以显著提高最终图像的质量和视觉冲击力。 ◆ ◆ ◆ 前期拍摄通用注意事项 在前期拍摄时&am…

【Linux学习】线程互斥与同步

目录 二十.线程互斥 20.1 什么是线程互斥&#xff1f; 20.2 为什么需要线程互斥? 20.3 互斥锁mutex 20.4 互斥量的接口 20.4.1 互斥量初始 20.4.2 互斥量销毁 20.4.3 互斥量加锁 20.4.4 互斥量解锁 20.4.5 互斥量的基本原理 20.4.6 带上互斥锁后的抢票程序 20.5 死锁问题 死锁…

【医学大模型 动态知识图谱】AliCG概念图 = 知识图谱 + 实时更新、细粒度概念挖掘、个性化适应

AliCG概念图 提出背景能力强化细粒度概念获取长尾概念挖掘分类体系进化对比传统知识图谱 部署方法如何提高信息检索的质量&#xff1f;如何在神经网络中学习概念嵌入&#xff1f;如何在预训练阶段利用概念图&#xff1f; 提出背景 论文: https://arxiv.org/pdf/2106.01686.pdf…

论文解读:MobileOne: An Improved One millisecond Mobile Backbone

论文创新点汇总&#xff1a;人工智能论文通用创新点(持续更新中...)-CSDN博客 论文总结 关于如何提升模型速度&#xff0c;当今学术界的研究往往聚焦于如何将FLOPs或者参数量的降低&#xff0c;而作者认为应该是减少分支数和选择高效的网络结构。 概述 MobileOne(≈MobileN…

《剑指Offer》笔记题解思路技巧优化 Java版本——新版leetcode_Part_2

《剑指Offer》笔记&题解&思路&技巧&优化_Part_2 &#x1f60d;&#x1f60d;&#x1f60d; 相知&#x1f64c;&#x1f64c;&#x1f64c; 相识&#x1f353;&#x1f353;&#x1f353;广度优先搜索BFS&#x1f353;&#x1f353;&#x1f353;深度优先搜索DF…

九、java 继承

文章目录 java 继承3.1 根父类Object3.2 方法重写3.3 继承案例&#xff1a;图形类继承体系3.4 继承的细节3.4.1 构造方法3.4.2 重名与静态绑定3.4.3 重载和重写3.4.4 父子类型转换3.4.5 继承访问权限protected3.4.6 可见性重写3.4.7 防止继承final 3.5 继承是把双刃剑3.5.1 继承…

70.SpringMVC怎么和AJAX相互调用的?

70.SpringMVC怎么和AJAX相互调用的&#xff1f; &#xff08;1&#xff09;加入Jackson.jar&#xff08;2&#xff09;在配置文件中配置json的消息转换器.(jackson不需要该配置HttpMessageConverter&#xff09; <!‐‐它就帮我们配置了默认json映射‐‐> <mvc:anno…

Netty应用——实例-群聊系统(十六)

编写一个Netty群聊系统&#xff0c;实现服务器端和客户端之间的数据简单通讯 (非阻塞)实现多人群聊服务器端:可以监测用户上线&#xff0c;离线&#xff0c;并实现消息转发功能客户端:通过channel可以无阳塞发送消息给其它所有用户&#xff0c;同时可以接受其它用户发送的消息(…

哈夫曼树的学习以及实践

哈夫曼树 哈夫曼树的基本了解哈夫曼树的基本概念创建霍夫曼树的思路编码构建的思路代码实现创建HuffmanTree结点初始化HuffmanTree创建霍夫曼树霍夫曼树编码 哈夫曼树的基本了解 给定 n 个 权值 作为 n 个 叶子节点&#xff0c;构造一颗二叉树&#xff0c;若该树的 带权路径长…

C语言第二十三弹---指针(七)

✨个人主页&#xff1a; 熬夜学编程的小林 &#x1f497;系列专栏&#xff1a; 【C语言详解】 【数据结构详解】 指针 1、sizeof和strlen的对比 1.1、sizeof 1.2、strlen 1.3、sizeof 和 strlen的对比 2、数组和指针笔试题解析 2.1、⼀维数组 2.2、二维数组 总结 1、si…

C语言每日一题(56)平衡二叉树

力扣网 110 平衡二叉树 题目描述 给定一个二叉树&#xff0c;判断它是否是高度平衡的二叉树。 本题中&#xff0c;一棵高度平衡二叉树定义为&#xff1a; 一个二叉树每个节点 的左右两个子树的高度差的绝对值不超过 1 。 示例 1&#xff1a; 输入&#xff1a;root [3,9,20,…

牛客错题整理——C语言(实时更新)

1.以下程序的运行结果是&#xff08;&#xff09; #include <stdio.h> int main() { int sum, pad,pAd; sum pad 5; pAd sum, pAd, pad; printf("%d\n",pAd); }答案为7 由于赋值运算符的优先级高于逗号表达式&#xff0c;因此pAd sum, pAd, pad;等价于(…

Linux系统之部署File Browser文件管理系统

Linux系统之部署File Browser文件管理系统 一、File Browser介绍1.1 File Browser简介1.2 File Browser功能1.3 File Browser使用场景 二、本地环境介绍2.1 本地环境规划2.2 本次实践介绍 三、检查本地环境3.1 检查本地操作系统版本3.2 检查系统内核版本 四、安装File Browser4…

Linux_线程

线程与进程 多级页表 线程控制 线程互斥 线程同步 生产者消费者模型 常见概念 下面选取32位系统举例。 一.线程与进程 上图是曾经我们认为进程所占用的资源的集合。 1.1 线程概念 线程是一个执行分支&#xff0c;执行粒度比进程细&#xff0c;调度成本比进程低线程是cpu…

SpringCloud-Eureka服务注册中心测试实践

5. Eureka服务注册中心 5.1 什么是Eureka Netflix在涉及Eureka时&#xff0c;遵循的就是API原则.Eureka是Netflix的有个子模块&#xff0c;也是核心模块之一。Eureka是基于REST的服务&#xff0c;用于定位服务&#xff0c;以实现云端中间件层服务发现和故障转移&#xff0c;服…