RxJava/RxAndroid的操作符使用(二)

文章目录

  • 一、创建操作
    • 1、基本创建
    • 2、快速创建
      • 2.1 empty
      • 2.2 never
      • 2.3 error
      • 2.4 from
      • 2.5 just
    • 3、定时与延时创建操作
      • 3.1 defer
      • 3.2 timer
      • 3.3 interval
      • 3.4 intervalRange
      • 3.5 range
      • 3.6 repeat
  • 二、过滤操作
    • 1、skip/skipLast
    • 2、debounce
    • 3、distinct——去重
    • 4、elementAt——获取指定位置元素
    • 5、filter——过滤
    • 6、first——取第一个数据
    • 7、last——取最后一个
    • 8、ignoreElements & ignoreElement(忽略元素)
    • 9、ofType(过滤类型)
    • 10、sample
    • 11 、take & takeLast
  • 三、组合可观察对象操作符
    • 1、CombineLatest
    • 2、merge
    • 3、zip
    • 4、startWith
    • 5、join
  • 四、变化操作符
    • 1、map
    • 2、flatMap / concatMap
    • 3、scan
    • 4、buffer
    • 5、window
  • 关于RxJava/RxAndroid的全部文章

一、创建操作

1、基本创建

create.c

create创建一个基本的被观察者

在使用create()操作符时,最好在被观察者的回调函数subscribe()中加上isDisposed(),以便在观察者断开连接的时候不在执行subscribe()函数中的相关逻辑,避免意想不到的错误出现。

        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Object> emitter) throws Throwable {
                try {
                    if(!emitter.isDisposed()){
                        emitter.onNext("a");
                        emitter.onNext("b");
                    }
                } catch (Exception e) {
                    emitter.onError(e);
                }
            }
        }).subscribe(
                value -> Log.e(TAG, "onNext: " + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete"));

2、快速创建

完整&快速创建被观察者、数组、集合遍历

操作符作用
empty创建一个只发送 onComplete 事件的 Observable。
never创建一个不发送任何事件的 Observable。
error创建一个只发送 onError 事件的 Observable。
from操作符用于将其他对象或数据结构转换为 Observable,可发送不同类型的数据流
just操作符将对象或一组对象转换为 Observable,并立即发送这些对象,没有延迟。

2.1 empty

empty.c

创建一个不发射任何items但正常终止的 Observable——create an Observable that emits no items but terminates normally

        Observable.empty().subscribe(
                value -> Log.e(TAG, "onNext: "+value ),
                error -> Log.e(TAG, "Error: "+error),
                ()->Log.e(TAG,"onComplete")
        );

image-20231107093722826

2.2 never

never.c

创建一个不发射任何items且不会终止的 Observable——create an Observable that emits no items and does not terminate

        Observable.never().subscribe(
                value -> Log.e(TAG, "onNext: " + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete"));

不发送任何事件

2.3 error

throw.c

创建一个不发射任何items并以错误终止的 Observable——create an Observable that emits no items and terminates with an error

        Observable.error(new Exception("ERROR")).subscribe(
                value -> Log.e(TAG, "onNext: " + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete"));

image-20231107094348333

2.4 from

image-20231107095234857

from.c

以fromAray举例:

        Observable.fromArray(1,2,3,4,5).subscribe(
                value -> Log.e(TAG, "onNext: " + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete"));

image-20231107095642506

2.5 just

just.c

        Observable.just(1,2,3,4,5).subscribe(
                value -> Log.e(TAG, "onNext: " + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete"));

image-20231107094504061

通过just()创建传入Integer类型的参数构建Observable被观察者,相当于执行了onNext(1)~onNext(5),通过链式编程订阅观察者。注意just的数据一般不能超过10个

注意,如果将 null 传递给 Just,它将返回一个将 null 作为项目发出的 Observable。不要错误地认为这将返回一个空的 Observable(根本不发出任何项目)

3、定时与延时创建操作

定时操作、周期性操作

操作符作用
defer直到有Observer观察者订阅时,才会通过Observeable的工厂方法动态创建Observeable,并且发送事件
timer用于延时发送,在给定的延迟后发出单个项目
interval它按照指定时间间隔发出整数序列,通常用于定时操作。
intervalRange类似于interval(),快速创建一个被观察者对象,指定时间间隔就发送事件,可以执行发送事件的数量
range它发出一个连续的整数序列,可以指定发送的次数
repeat重复发送指定次数的某个事件流

3.1 defer

defer.c

直到有Observer观察者订阅时,才会通过Observeable的工厂方法动态创建Observeable,并且发送事件

defer不会立即创建 Observable,而是等到观察者订阅时才动态创建,每个观察者都会得到一个新的 Observable 实例。

defer确保了Observable代码在被订阅后才执行(而不是创建后立即执行)

        Observable<Integer> integerObservable = Observable.defer(new Supplier<ObservableSource<? extends Integer>>() {
            @Override
            public ObservableSource<? extends Integer> get() throws Throwable {
                int randomNumber = (int) (Math.random() * 100);
                return Observable.just(randomNumber);
            }
        });

        integerObservable.subscribe(
                new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Throwable {
                        Log.e(TAG, "第一次" + integer.toString());
                    }
                }
        );
        

        integerObservable.subscribe(
                integer -> Log.e(TAG, "第二次" + integer.toString())
        );

image-20231107123942326

3.2 timer

timer.c

构造方法如下:

image-20231107152036633

 timer(long delay, TimeUnit unit)
 timer(long delay, TimeUnit unit, Scheduler scheduler)
  • delay:延时的时间,类型为Long;
  • unit:表示时间单位,有TimeUnit.SECONDS等多种类型;
  • scheduler:表示调度器,用于指定线程。

用于延时发送

        final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Log.e(TAG, "timer:当前时间 ==" + dateFormat.format(System.currentTimeMillis()));

        Observable.timer(5, TimeUnit.SECONDS).subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + dateFormat.format(System.currentTimeMillis())),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete")
        );

表示延迟5s后发送数据

image-20231107154813890

3.3 interval

interval.c

用于定时发送数据,快速创建Observable被观察者对象,每隔指定的时间就发送相应的事件,事件序列从0开始,无限递增1;

//在指定延迟时间后,每个多少时间发送一次事件
interval(long initialDelay, long period, TimeUnit unit)
 
//在指定的延迟时间后,每隔多少时间发送一次事件,可以指定调度器
interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
 
//每间隔多少时间发送一次事件,使用默认的线程
Observable<Long> interval(long period, TimeUnit unit)
 
//每间隔多少时间发送一次事件,可以指定调度器
interval(long period, TimeUnit unit, Scheduler scheduler)
  • initialDelay: 表示延迟开始的时间,类型为Long
  • period:距离下一次发送事件的时间间隔,类型Long
  • unit:时间单位,有TimeUnit.SECONDS等多种类型;
  • scheduler:表示调度器,用于指定线程。

它会从0开始,然后每隔 1 秒发射一个递增的整数值

        Observable.interval(1,3,TimeUnit.SECONDS).subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + dateFormat.format(System.currentTimeMillis())),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete")
        );

image-20231107160116792

定时发射指定的结果

// 创建一个每秒发射一个递增整数的 Observable
        Observable<Long> intervalObservable = Observable.interval(1, TimeUnit.SECONDS);

        // 使用 map 操作符将递增的整数值映射为您想要的数据类型
        Observable<String> customObservable = intervalObservable
            .map(index -> "Data_" + index); // 映射为字符串 "Data_" + index

        // 订阅并输出结果
        customObservable.subscribe(
            data -> System.out.println("Received: " + data),
            error -> System.err.println("Error: " + error),
            () -> System.out.println("Completed")
        );

3.4 intervalRange

类似于interval(),快速创建一个被观察者对象,指定时间间隔就发送事件,可以执行发送事件的数量,数据依次递增1。

intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
 
intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
  • start:表示事件开始的数值大小,类型为Long
  • count:表示事件执行的次数,类型为long,不能为负数;
  • initialDelay:表示延迟开始的时间,类型为Long;
  • period:距离下一次发送事件的时间间隔,类型Long;
  • unit:时间单位,有TimeUnit.SECONDS等多种类型;
  • scheduler:表示调度器,用于指定线程。
        Observable.intervalRange(10, 3, 2, 1,
                TimeUnit.SECONDS,
                Schedulers.io()).subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete")
        );

image-20231107161150732

3.5 range

range.c

Range 运算符按顺序发出一系列连续整数,可以在其中选择范围的起点及其长度。

它发出一个连续的整数序列,通常不涉及延迟。类似于intervalRange。

 public static Observable<Integer> range(int start, int count)
 public static Observable<Long> rangeLong(long start, long count)
  • start:事件开始的大小
  • count:发送的事件次数
    Observable.range(10,5).subscribe(
            value -> Log.e(TAG, "timer:onNext ==" + value),
            error -> Log.e(TAG, "Error: " + error),
            () -> Log.e(TAG, "onComplete")
    );

3.6 repeat

repeat.c

repeat操作符可以重复发送指定次数的某个事件流,repeat操作符默认在trampoline调度器上执行,repeat默认重复次数为Long.MAX_VALUE,可使用重载方法指定次数以及使用repeatUntil指定条件。

        //一直重复
        Observable.fromArray(1, 2, 3, 4).repeat();
        //重复发送5次
        Observable.fromArray(1, 2, 3, 4).repeat(5);
        //重复发送直到符合条件时停止重复
        Observable.fromArray(1, 2, 3, 4).repeatUntil(new BooleanSupplier() {
            @Override
            public boolean getAsBoolean() throws Exception {
                //自定判断条件,为true即可停止,默认为false
                return false;
            }
        });

二、过滤操作

1、skip/skipLast

skip

可以在Flowable,Observable中使用,表示源发射数据前,跳过多少个。

  1. skip: skip 操作符用于跳过 Observable 开头的一定数量的事件,然后开始发射后续的事件。它忽略序列的头部事件。

    例如,observable.skip(3) 会跳过前面的 3 个事件,然后发射后续的事件。

  2. skipLast: skipLast 操作符用于跳过 Observable 末尾的一定数量的事件,然后发射前面的事件。它忽略序列的末尾事件。

    例如,observable.skipLast(3) 会发射从序列开头到倒数第 3 个事件之前的事件,忽略了最后 3 个事件。

        Observable<Integer> integerObservable = Observable.just(1, 2, 3, 4, 5, 6, 7, 8);
        
        integerObservable.skipLast(3).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.e(TAG, "accept: " + integer);
            }
        });

image-20231107205656235

换成skip后结果如下:

image-20231107205732571

2、debounce

仅当特定时间跨度过去而没有发出另一个项目时,才从 Observable 发出一个项目

DM_20231107210151_001

Observable.create(emitter -> {
    emitter.onNext(1);
    Thread.sleep(1_500);
    emitter.onNext(2);
    Thread.sleep(500);
    emitter.onNext(3);
    Thread.sleep(2000);
    emitter.onNext(4);
    emitter.onComplete();
}).subscribeOn(Schedulers.io()).debounce(1,TimeUnit.SECONDS).blockingSubscribe(
    value -> Log.e(TAG, "timer:onNext ==" + value),
    error -> Log.e(TAG, "Error: " + error),
    () -> Log.e(TAG, "onComplete")
);

debounce(1, TimeUnit.SECONDS) 表示将事件流中的事件按照时间窗口的方式进行过滤。具体含义是,如果在连续的 1 秒内没有新的事件发射,那么才会将最后一个事件传递给观察者,否则会丢弃之前的事件。

image-20231108122709311

结合图像理解,红色线条为debounce监听的发射节点,也就是每隔一秒发送一次数据。

在0s时发送了1。

在1s时由于没有数据,就没有发送数据。

在1s—2s期间产生了两次数据,分别是2和3。但是debounce只会将距离2s最近一次的数据发送。因此2被不会发送出来。

image-20231108123140801

3、distinct——去重

DM_20231108124123_001

可作用于Flowable,Observable,去掉数据源重复的数据。

        Observable.just(1,2,3,1,2,3,4).distinct().subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete")
        );

image-20231108123826742

DM_20231108124148_001

distinctUntilChanged()去掉相邻重复数据。

      Observable.just(1,3,3,2,2,3,4).distinctUntilChanged().subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete")
        );

//还可以指定重复条件
        Observable.just(1,3,3,2,2,3,4).distinctUntilChanged(new Function<Integer, Boolean>() {
            @Override
            public Boolean apply(Integer integer) throws Throwable {
                return integer>3;
            }
        }).subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete")
        );

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

4、elementAt——获取指定位置元素

20200404170305941

//获取索引为1的元素,如果不存在返回Error
Observable.just("a","b","c","d","e").elementAt(1,"Error").subscribe(
        value -> Log.e(TAG, "timer:onNext ==" + value),
        error -> Log.e(TAG, "Error: " + error),
);

image-20231108124926798

5、filter——过滤

用于过滤指定的发射元素。

20200404170341923

        Observable.just(1, 2, 3, 4, 5, 6).filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Throwable {
                return (integer % 2) != 0;
            }
        }).subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete")
        );

image-20231108125316182

6、first——取第一个数据

20200404170409304

      //不存在则返回100
Observable.just(1, 2, 3, 4, 5, 6).first(100).subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + value),
                error -> Log.e(TAG, "Error: " + error)
        );

image-20231108130922361

7、last——取最后一个

20200404170445807

last、lastElement、lastOrError与fist、firstElement、firstOrError相对应。

   Observable.just(1, 2, 3, 4, 5, 6).last(100).subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + value),
                error -> Log.e(TAG, "Error: " + error)
        );

image-20231108131530432

8、ignoreElements & ignoreElement(忽略元素)

20200404170613536

ignoreElements 作用于FlowableObservableignoreElement作用于MaybeSingle。两者都是忽略掉数据,不发射任何数据,返回完成或者错误时间。

9、ofType(过滤类型)

作用于Flowable、Observable、Maybe,过滤选择类型。

        Observable.just(1, 2, 3, 4.4, 5.5, 6.6).ofType(Integer.class).subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + value),
                error -> Log.e(TAG, "Error: " + error)
        );

image-20231108132125800

10、sample

  1. debounce:它等待一段时间,如果在这段时间内没有新事件到达,它会发射最后一个事件。它用于处理高频率事件流,例如用户输入,以确保只处理用户停止输入后的事件。debounce 等待事件流静止,然后发射最后一个事件。
  2. sample:它按照固定的时间间隔从事件流中抽样一个事件,并发射该事件。它用于定期采样事件流,例如从传感器数据中每隔一段时间获取一次数据。sample 定期获取事件,无论事件流是否活跃。
        Observable<Integer> observable = Observable.create(emitter -> {
            emitter.onNext(1);
            Thread.sleep(1_500);
            emitter.onNext(2);
            Thread.sleep(500);
            emitter.onNext(3);
            Thread.sleep(2000);
            emitter.onNext(4);
            emitter.onComplete();
        });

        observable.sample(1, TimeUnit.SECONDS).subscribe(
                        value -> Log.e(TAG, "timer:onNext ==" + value),
                        error -> Log.e(TAG, "Error: " + error)
                );

image-20231108133255137

产生的数据在红线处发送

1在第1s时被发送,2在第2s时被发送,3在第3s时被发送,由于4还未在第5s时就已经onComplete所以4无法被发送

image-20231108133014660

11 、take & takeLast

20200404170852608

作用于Flowable、Observable。take发射前n个元素。takeLast发射后n个元素。

        Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        source.take(4)
                .subscribe(value-> Log.e(TAG, "timer:onNext ==" + value));
        //打印:1 2 3 4

        source.takeLast(4)
                .subscribe(value-> Log.e(TAG, "timer:onNext ==" + value));
        //打印:7 8 9 10

三、组合可观察对象操作符

操作符作用
combineLatest用于将多个 Observable 中最新的事件进行组合,并生成一个新的事件。
merge用于将多个 Observable 合并成一个单一的 Observable,按照它们发射事件的顺序合并
zip用于一一配对多个 Observable 发射的事件,只有当所有 Observable 都有事件时才生成新事件
startWith用于在一个 Observable 发射的事件前插入一个或多个初始事件
join用于将两个 Observable 的事件按照时间窗口的方式进行组合。

1、CombineLatest

image-20231108134447676

通过指定的函数将每个 Observable 发出的最新项目组合在一起,并根据该函数的结果发出项目

  • combineLatest 用于将多个 Observable 中最新的事件进行组合,并生成一个新的事件
  • 当任何一个 Observable 发射新数据时,都会生成新的组合事件。
  • 适用于需要及时反应多个数据源最新值变化的情况。
Observable<Integer> source1 = Observable.just(1, 2, 3);
Observable<String> source2 = Observable.just("A", "B", "C");
Observable<Boolean> source3 = Observable.just(true, false, true);

Observable<String> combined = Observable.combineLatest(
        source1,
        source2,
        source3,
        (integer, string, aBoolean) -> integer + " " + string + " " + aBoolean
);

combined.subscribe(
        value -> Log.e(TAG, "timer:onNext ==" + value),
        error -> Log.e(TAG, "Error: " + error),
        () -> Log.e(TAG, "onComplete")
);

image-20231108134458384

2、merge

mergeDelayError.C

  • merge 用于将多个 Observable 合并成一个单一的 Observable,按照它们发射事件的顺序合并
  • merge 不会进行事件的组合,只是合并多个 Observable 的事件。
  • 适用于需要将多个 Observable 的事件合并成一个流的情况。

注意:merge只能合并相同类型的Observable

        Observable<Integer> source1 = Observable.just(1, 2, 3);
        Observable<Integer> source2 = Observable.just(4,5,6);
        Observable<Integer> source3 = Observable.just(7,8,9);

        Observable<Integer> combined = Observable.merge(source1,source2,source3);

        combined.subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete")
        );

image-20231108135448811

3、zip

下载

  • zip 用于一一配对多个 Observable 发射的事件,只有当所有 Observable 都有事件时才生成新事件
  • zip 会等待所有 Observable 都有事件后,才会执行组合函数生成新事件。
  • 适用于需要将多个数据源的事件一一配对的情况。
        Observable<Integer> source1 = Observable.just(1, 2, 3);
        Observable<String> source2 = Observable.just("A", "B", "C");
        Observable<Boolean> source3 = Observable.just(true, false, true);

        Observable<String> combined = Observable.zip(
                source1,
                source2,
                source3,
                (integer, string, aBoolean) -> integer + " " + string + " " + aBoolean
        );

        combined.subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete")
        );

image-20231108135842793

4、startWith

DM_20231108140443_001

  • startWith 用于在一个 Observable 发射的事件前插入一个或多个初始事件
  • 这些初始事件会作为 Observable 的开头。
  • 适用于需要在 Observable 发射事件前添加一些初始数据的情况。

image-20231108140316862

        Observable<Integer> source = Observable.just(1, 2, 3);
        Observable<Integer> withStart = source.startWithArray(100,200);

        withStart.subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete")
        );

image-20231108140348057

5、join

20200404175817782

  • join 用于将两个 Observable 的事件按照时间窗口的方式进行组合
  • 可以为每个 Observable 设置时间窗口,然后在这些窗口内组合事件。
  • 适用于需要在时间窗口内组合两个 Observable 的事件的情况。

时间窗口:

  • 固定时间窗口:定义一个固定的时间段,将在该时间段内的事件分为一个时间窗口。
  • 延时时间窗口:定义一个时间段,但在事件发生后延迟一段时间后才分为时间窗口。
  • 动态时间窗口:根据事件的特定条件动态地定义时间窗口。
        Observable<Integer> left = Observable.just(1, 2, 3);
        Observable<Integer> right = Observable.just(10, 20, 30);

        left.join(
                        right,
                        leftDuration -> Observable.timer(1, TimeUnit.SECONDS),
                        rightDuration -> Observable.timer(1, TimeUnit.SECONDS),
                        (leftValue, rightValue) -> "Left: " + leftValue + ", Right: " + rightValue
                )
                .subscribe(value -> Log.e(TAG, "timer:onNext ==" + value));

在这个示例中,我们定义了以下时间窗口规则:

  • 左边的时间窗口规则:leftDuration -> Observable.timer(1, TimeUnit.SECONDS) 表示在左边的事件后等待 1 秒后生成一个时间窗口。
  • 右边的时间窗口规则:rightDuration -> Observable.timer(2, TimeUnit.SECONDS) 表示在右边的事件后等待 2 秒后生成一个时间窗口。

现在让我们看看时间窗口如何影响事件的组合:

  • 当左边的事件 1 发生时,它会进入左边的时间窗口,并等待 1 秒。在此期间,右边的事件没有机会进入左边的时间窗口。
  • 当右边的事件 10 发生时,它会进入右边的时间窗口,并等待 2 秒。在此期间,左边的事件也没有机会进入右边的时间窗口。

只有在左边和右边的事件都在各自的时间窗口内时,它们才会被组合。在这个示例中,左边的事件会在右边的时间窗口内被组合。所以,在 1 秒后,左边的事件 1 和右边的事件 10 被组合成 “Left: 1, Right: 10”。

image-20231108185229730

四、变化操作符

| 操作符 | 说明 |

map()对数据流的类型进行转换
flatMap()对数据流的类型进行包装成另一个数据流
scan()scan操作符会对发射的数据上一轮发射的数据进行函数处理,并返回的数据供下一轮使用。
buffer()缓存指定大小数据
window()缓存指定大小数据,返回新的integerObservable


​ 对上一轮处理过后的数据流进行函数处理
​ 对所有的数据流进行分组
​ 缓存发射的数据流到一定数量,随后发射出数据流集合
​ 缓存发射的数据流到一定数量,随后发射出新的事件流

1、map

20200404154637646

        Observable.just(1,2,3).map(new Function<Integer, Object>() {
            @Override
            public Object apply(Integer integer) throws Throwable {
                return integer * 100;
            }
        }).subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete")
        );

2、flatMap / concatMap

20200404155254319

        Observable observable = Observable.just(isLogin("12346")).flatMap(new Function<Boolean, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Boolean aBoolean) throws Throwable {
                String Login = "登陆失败,帐号秘密错误";
                if (aBoolean) Login = "登陆成功";
                return Observable.just(Login).delay(2, TimeUnit.SECONDS);
            }
        });

        observable.subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete")
        );
        
            private boolean isLogin(String passWord) {
        if (passWord.equals("123456")) {
            return true;
        }
        return false;
    }

image-20231108191254259

  1. Observable.just(isLogin("12346")) 创建一个 Observable,它会发射一个布尔值,表示登录是否成功。
  2. .flatMap(new Function<Boolean, ObservableSource<?>>() { ... }:使用 flatMap 操作符将上一步的布尔值结果转换成一个新的 Observable,其中包含登录的结果消息。flatMap 中的 apply 方法根据登录结果 aBoolean 决定返回不同的消息。如果登录成功,返回 “登陆成功” 消息,否则返回 “登陆失败,帐号秘密错误” 消息,并使用 delay 延迟 2 秒发送消息。
  3. observable.subscribe(...):最后,订阅 observable,并设置了三个回调函数,分别处理 onNext、onError、onComplete 事件。

concatMap与flatMap的区别: concatMap是有序的,flatMap是无序的

  1. flatMap():
    • 不保证内部 Observable 的发射顺序,它会尽可能并行地处理内部 Observable,并将它们的发射结果合并到一个单一的 Observable 中。
    • 内部 Observable 可以乱序发射数据,最终结果也可能是乱序的。
  2. concatMap():
    • 保证内部 Observable 的发射顺序,它会按照原始数据的顺序依次处理每个内部 Observable,等待一个内部 Observable 完成后再处理下一个。
    • 内部 Observable 的发射顺序和最终结果的顺序都与原始数据的顺序一致。
        Observable<Integer> source = Observable.just(1, 2, 3);

        // 使用 flatMap
        source.flatMap(num -> Observable.just(num * 2).delay(num, TimeUnit.MILLISECONDS))
                .subscribe(value -> Log.e(TAG, "timer:flatMapOnNext ==" + value));

        // 使用 concatMap
        source.concatMap(num -> Observable.just(num * 2).delay(num, TimeUnit.MILLISECONDS))
                .subscribe(value -> Log.e(TAG, "timer:concatMapOnNext ==" + value));

image-20231108191544320

3、scan

20200404161402851

scan操作符会对发射的数据上一轮发射的数据进行函数处理,并返回的数据供下一轮使用。

        Observable<Integer> observable = Observable.just(1,2,3,4).scan(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Throwable {
                Log.e(TAG, "integer = " + integer +" integer2 = "+integer2);
                return integer2-integer;
            }
        });

        observable.subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete")
        );

image-20231108192719260

  1. 初始情况下,前一个累积结果为空(因为没有前一个值),所以第一个数据项 1 直接发射出来,产生的结果是 1。
  2. 接下来,前一个累积结果是 1,当前数据项是 2,所以执行操作 2 - 1,产生的结果是 1。
  3. 再次执行,前一个累积结果是 1,当前数据项是 3,所以执行操作 3 - 1,产生的结果是 2。
  4. 最后,前一个累积结果是 2,当前数据项是 4,执行操作 4 - 2,产生的结果是 2。

4、buffer

20200404163816872

buffer操作符可以将发射出来的数据流,在给定的缓存池中进行缓存,当缓存池中的数据项溢满时,则将缓存池的数据项进行输出,重复上述过程,直到将发射出来的数据全部发射出去。

        Observable.just(1,2,3,4,5,6,7,8).buffer(3).subscribe(
                value -> Log.e(TAG, "timer:onNext ==" + value),
                error -> Log.e(TAG, "Error: " + error),
                () -> Log.e(TAG, "onComplete")
        )

image-20231108194136484

5、window

20200404164717644

window操作符和buffer操作符在功能上实现的效果是一样的,但window操作符最大区别在于同样是缓存一定数量的数据项,window操作符最终发射出来的是新的事件流integerObservable,而buffer操作符发射出来的是新的数据流。

也就是说,window操作符发射出来新的事件流中的数据项,还可以经过Rxjava其他操作符进行处理

window 操作符用于将一个 Observable 拆分为多个子 Observable,每个子 Observable 包含一定数量的连续数据项。window 操作符的两个参数的含义如下:

  1. 第一个参数(count):指定每个子 Observable 中包含的数据项的数量。
  2. 第二个参数(skip):指定何时启动新的窗口。它定义了窗口之间的重叠或间隔。如果 skip 等于 count,则窗口之间不重叠。如果 skip 小于 count,则窗口之间有重叠数据。

举个例子来说明:

假设有一个 Observable 发出的数据序列如下:1, 2, 3, 4, 5, 6, 7, 8, 9。

  • 如果你使用 window(3, 2),它的含义是每个窗口包含 3 个数据项,且窗口之间间隔 2 个数据项。那么分割后的子 Observable 将是:
    • 窗口1:1, 2, 3
    • 窗口2:3, 4, 5
    • 窗口3:5, 6, 7
    • 窗口4:7, 8, 9
  • 如果你使用 window(3, 3),窗口之间不重叠,每个窗口包含 3 个数据项。那么分割后的子 Observable 将是:
    • 窗口1:1, 2, 3
    • 窗口2:4, 5, 6
    • 窗口3:7, 8, 9

这里只使用了一个参数,用于指定窗口的大小。然后更具window发射新的事件流integerObservable的特性,对这个事件流进行了去重操作。

        Observable.just(1,1,3,4,6,6,7,8)
                .window(3).subscribe(new Consumer<Observable<Integer>>() {
                    @Override
                    public void accept(Observable<Integer> integerObservable) throws Throwable {
                        integerObservable.distinct().subscribe(value -> Log.e(TAG, "timer:onNext ==" + value.toString()));
                    }
                });

关于RxJava/RxAndroid的全部文章

RxJava/RxAndroid的基本使用方法(一)

RxJava的操作符使用(二)


参考文档:

官方文档:reactivex

RxJava3 Wiki:Home · ReactiveX/RxJava Wiki · GitHub

RxJava3官方github:What’s different in 3.0 · ReactiveX/RxJava Wiki · GitHub

RxJava2 只看这一篇文章就够了–玉刚说

RxJava2最全面、最详细的讲解–苏火火丶

关于背压(Backpressure)的介绍:关于RxJava最友好的文章——背压(Backpressure)

RXJava3+OKHTTP3+Retrofit2(观察者设计模式)讲解+实战

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/122046.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Leetcode-LCR 021 删除链表的倒数第 N 个结点

快慢指针&#xff0c;快指针先移动n-1个节点后&#xff0c;慢指针从虚拟头结点出发&#xff08;相当于快慢指针相隔n个节点&#xff09;&#xff0c;快慢指针一起向链表尾依次移动一个结点&#xff0c;当快指针移动到表位时&#xff0c;慢指针正好移到被删除元素的前一个结点&a…

采用springboot 2.7.10来操作clickhouse

1、采用springboot与clickhouse结合&#xff0c;其实和操作mysql&#xff0c;oracle区别不大。直接上代码开干 2、所采用的环境 jdk1.8 springboot 2.7.10 clickhouse 22.8.3.13 clickhouse 0.5.0 3、项目的pom.xml文件 <dependency><groupId>com.clickhous…

使用matlab实现图像信号的色彩空间转换

利用matlab对图像信号进行读取&#xff0c;并对RGB空间进行转换&#xff0c;如转换到HSI空间等。 下面的这个代码是在使用了rgb2hsi()方法失败后&#xff0c;进行修改的。 rgb2hsi(img)这个方法可以将RGB图像转换为HIS图像&#xff1b;但是爆出了 Untitled5(line 5)hsi rgb2h…

解决IDEA使用卡顿的问题

*问题&#xff1a;使用IDEA的时候卡顿 原因&#xff1a;IDEA默认分配的内存有上限 **可以查看内存分配情况及使用情况__ 解决&#xff1a; 设置JVM的启动参数&#xff1a; 进入idea的安装目录的bin文件夹 -Xms1024m -Xmx2048m -XX:ReservedCodeCacheSize1024m -XX:UseG1G…

【Java】IntelliJ IDEA使用JDBC连接MySQL数据库并写入数据

目录 0 准备工作1 创建Java项目2 添加JDBC 驱动程序3 创建数据库连接配置文件4 创建一个 Java 类来连接和操作数据库5 运行应用程序 在 IntelliJ IDEA 中连接 MySQL 数据库并将数据存储在数据表中&#xff0c;使用 Java 和 JDBC&#xff08;Java Database Connectivity&#xf…

抓包工具fiddler的基础知识详解

一、简介 fiddler是位于客户端和服务端之间的http代理 1、作用 监控浏览器所有的http/https流量查看、分析请求内容细节伪造客户端请求和服务器请求测试网站的性能解密https的web会话全局、局部断电功能第三方插件 2、使用场景 接口调试、接口测试、线上环境调试、web性能分…

论文速览 | TRS 2023: 使用合成微多普勒频谱进行城市鸟类和无人机分类

注1:本文系“最新论文速览”系列之一,致力于简洁清晰地介绍、解读最新的顶会/顶刊论文 论文速览 | TRS 2023: Urban Bird-Drone Classification with Synthetic Micro-Doppler Spectrograms 原始论文:D. White, M. Jahangir, C. J. Baker and M. Antoniou, “Urban Bird-Drone…

IBM Qiskit量子机器学习速成(一)

声明&#xff1a;本篇笔记基于IBM Qiskit量子机器学习教程的第一节&#xff0c;中文版译文详见&#xff1a;https://blog.csdn.net/qq_33943772/article/details/129860346?spm1001.2014.3001.5501 概述 首先导入关键的包 from qiskit import QuantumCircuit from qiskit.u…

文心一言 VS 讯飞星火 VS chatgpt (131)-- 算法导论11.2 3题

三、用go语言&#xff0c;Marley 教授做了这样一个假设&#xff0c;即如果将链模式改动一下&#xff0c;使得每个链表都能保持已排好序的顺序&#xff0c;散列的性能就可以有较大的提高。Marley 教授的改动对成功查找、不成功查找、插入和删除操作的运行时间有何影响? 文心一…

助力青少年学习,亚马逊云科技2024年全球人工智能和机器学习奖学金计划正式启动

云未来将以何种方式发展&#xff1f;方向握在意气风发的少年们手中。近日&#xff0c;亚马逊云科技全球人工智能和机器学习&#xff08;AI和ML&#xff09;奖学金计划在中国区的首次颁奖以及2024年启动仪式在北京中学正式举行&#xff0c;有45名学子凭借杰出的学业成绩、对人工…

打开pr提示找不到vcomp100.dll无法继续执行代码怎么办?5种dll问题解决方案全解析

vcomp100.dll是一个由Microsoft开发的动态链接库&#xff08;DLL&#xff09;文件&#xff0c;它对于许多基于图形的应用程序&#xff08;如Photoshop&#xff09;和多个游戏&#xff08;如《巫师3》&#xff09;至关重要。以下是关于vcomp100.dll的属性介绍以及找不到vcomp100…

Qt工程打包工具 windeployqt 的用法

1.复制工程下的“Debug”或者“Release”文件夹到你喜欢的路径&#xff0c;例如&#xff1a;D:\QT_out\ 2.在操作系统“开始”选项找到“Qt”文件夹&#xff0c;打开“Qt 5.15.2&#xff08;MSVC 2019 64-bit&#xff09;” 重点&#xff1a; 这里要注意的是&#xff0c;一定…

TSINGSEE智慧冶金工厂可视化AI视频智能监管方案,助力安全生产

一、背景与需求 有色金属冶炼工艺复杂&#xff0c;推进互联网、大数据、人工智能、5G、边缘计算、虚拟现实等前沿技术在有色冶炼工厂的应用&#xff0c;建成全流程综合集成信息管控平台、实现全要素数据可视化在线监控&#xff0c;已经成为当前冶金企业升级转型的主要建设目标…

2023年亚太杯APMCM数学建模大赛ABC题辅导及组队

2023年亚太杯APMCM数学建模大赛 ABC题 一元线性回归分析类 回归分析&#xff08;Regression Analysis)是确定两种或两种以上变量间相互依赖的定量关系的一种统计分析方法。   – 按涉及变量个数划分   • 一元回归分析   • 多元回归分析   – 按自变量和因变量之间关…

BSP-STM32移植FreeRTOS

在stm32裸机工程中的Middlewares目录添加freeRtos源码 在裸机工程中的main中调用freertos接口

什么是数据库?数据库有哪些基本分类和主要特点?

数据库是以某种有组织的方式存储的数据集合。本文从数据库的基本概念出发&#xff0c;详细解读了数据库的主要类别和基本特点&#xff0c;并就大模型时代备受瞩目的数据库类型——向量数据库进行了深度剖析&#xff0c;供大家在了解数据库领域的基本概念时起到一点参考作用。 …

c#装饰器模式详解

基础介绍&#xff1a; 动态地给一个对象添加一些额外的职责。适用于需要扩展一个类的功能&#xff0c;或给一个类添加多个变化的情况。 装饰器&#xff0c;顾名思义就是在原有基础上添加一些功能。 大家都只知道如果想单纯的给原有类增加一些功能&#xff0c;可以直接继续该类生…

“Python+”集成技术高光谱遥感数据处理与机器学习深度

高光谱遥感数据处理的基础、python开发基础、机器学习和应用实践。重点解释高光谱数据处理所涉及的基本概念和理论&#xff0c;旨在帮助学员深入理解科学原理。结合Python编程工具&#xff0c;专注于解决高光谱数据读取、数据预处理、高光谱数据机器学习等技术难题&#xff0c;…

[Java/力扣160]相交链表

这道题的关键是&#xff0c;使两个链表上的指针同时到达相交点处 方法一&#xff1a;分别遍历两个链表&#xff0c;得到其长度。然后先让较长的链表上的指针走“两链表长度之差”。然后两指针分别一步一步走&#xff0c;就能同时到达相交点处。 方法二&#xff1a;让 p1 遍历…

Elasticsearch:ES|QL 的限制

尽管 ES|QL 带来性能及使用上的便捷&#xff0c;在实际的使用中&#xff0c;它还是有一些限制。在今天的文章中&#xff0c;我们来列举它的一些限制。 结果集大小限制 默认情况下&#xff0c;ES|QL 查询最多返回 500 行。 你可以使用 LIMIT 命令将行数增加到最多 10,000 行。 …