一看就懂的RxJava源码分析

一看就懂的RxJava源码分析

    • 前言
    • 零、观察模式简介
    • 一、RxJava使用示例一
    • 二、示例一源码分析
      • 0. 示例一代码分解
      • 1. RxJava中的观察者是谁?
      • 2. RxJava中的被观察者又是谁?
      • 3. 观察者又是如何安插到被观察者中的?
      • 4. 示例一RxJava源码整体关系类图
      • 4. RxJava的Hook机制
    • 三、RxJava使用示例二
    • 四、示例二源码分析
      • 1. 同样从subscribe方法看起,看看观察者如何被安插到被观察者中
      • 2. subscribe方法源码如下
      • 3. subscribeActual方法是被map方法返回的对象实现,我们来看看map方法的源码
      • 4. 因此接着第2步去ObservableMap类中查看subscribeActual方法的源码
      • 5. 第4步提到的ObservableMap类中的source和function是什么?我们看下源码
      • 6. 第5步提到的map方法属于哪个对象呢?
      • 7. 我们接着第4步继续分析
      • 8. 我们看看ObservableEmitter发送的数据如何被map处理,然后又被观察者接收
    • 五、后记

前言

RxJava是一种基于观察者模式的异步编程库,且支持响应式编程,适用于处理复杂的事件流。RxJava是观察者模式的扩展应用,其的核心概念是Observable和Observer。Observable表示一个异步事件流,Observer表示对这个事件流的观察者。当Observable发出一个事件时,Observer会收到这个事件并进行相应的处理。RxJava还提供了一些操作符,可以对事件流进行过滤、转换、组合等操作,从而更方便地处理事件流。
本文对于RxJava的源码进行分析,以更加彻底的了解RxJava的实现思路。

零、观察模式简介

前言中我们提到RxJava是观察者模式的扩展应用,那么学习RxJava的源码,肯定要懂观察者模式的,如果不懂这种设计模式,直接来看源码是比较吃力的。本文也是以观察者模式为切入点来讲解RxJava源码

大家可以看下我之前写的全网最全面最精华的设计模式讲解,从程序员转变为工程师的第一步,这篇文章中对于观察者模式的介绍。再啰嗦一句,设计模式非常重要,也非常难以掌握,因为它不是简单的技术,而是一种思想,这世界上最难学习的就是思想,希望各位博友能够真正将这种思想融入自己的思维中。
在这里插入图片描述
简单来说,观察者模式有两个主要角色,一个是观察者,一个是被观察者,而实现观察者模式的核心是将观察者安插(聚合)到被观察者之中,这样被观察者的一举一动都能被观察者所捕捉,而这也是分析RxJava源码的核心,如果不能理解该核心,请移步我上面提到的文章,里面有比较详细的介绍。

一、RxJava使用示例一

使用RxJava需要在build.gradle中引用RxJava库

implementation 'io.reactivex.rxjava3:rxjava:3.0.4'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'

最简单的RxJava使用示例

public class MainActivity extends AppCompatActivity {

    private final static String TAG = MainActivity.class.getSimpleName();

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
                emitter.onNext("sun");
                emitter.onNext("hao");
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(@NonNull String  s) {
                Log.d(TAG, "onNext -》 "+s);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }
}

运行结果

2023-11-17 04:12:57.125 15872-15872/com.xiaomi.myaxjava D/MainActivity: onSubscribe
2023-11-17 04:12:57.125 15872-15872/com.xiaomi.myaxjava D/MainActivity: onNext -》 sun
2023-11-17 04:12:57.126 15872-15872/com.xiaomi.myaxjava D/MainActivity: onNext -》 hao

二、示例一源码分析

示例一的代码用于实现观察者监控被观察者发送的字符串,并进行打印。

0. 示例一代码分解

  • 为了便于我们更好的分析源码,我们将上面示例一中的匿名类实现全部去掉。匿名类是简化了代码,但有时候也提高了代码阅读的难度。去掉匿名类后的代码如下,看上去是不是容易理解多了。
public class MainActivity extends AppCompatActivity {

    private final static String TAG = MainActivity.class.getSimpleName();

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        MySource mySource = new MySource();
        MyObserver myObserver = new MyObserver();
        Observable.create(mySource).subscribe(myObserver);
    }

    private class MySource implements ObservableOnSubscribe<String>
    {
        @Override
        public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable {
            emitter.onNext("sun");
            emitter.onNext("hao");
        }
    }

    private class MyObserver implements Observer<String>{

        @Override
        public void onSubscribe(@NonNull Disposable d) {
            Log.d(TAG, "onSubscribe");
        }

        @Override
        public void onNext(@NonNull String s) {
            Log.d(TAG, "onNext -》 "+s);
        }

        @Override
        public void onError(@NonNull Throwable e) {
            Log.d(TAG, "onError");
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    }
}
  • RxJava核心代码就一行:
    Observable.create(mySource).subscribe(myObserver);
  • 上面我们提到RxJava是观察者模式的扩展应用,而观察者模式的核心是将观察者Observer安插到被观察者Observable之中,监视被观察者的一举一动。下面我们需要找出观察者是谁,被观察者是谁,观察者又如何被插入到被观察者之中即可。

1. RxJava中的观察者是谁?

RxJava通过subscribe方法订阅观察者(subscribe方法其实就是实现将观察者安插到被观察者,后面我们会详细介绍如何实现安插的),即示例一代码分解后中的观察者是myObserver。

private class MyObserver implements Observer<String>{

        @Override
        public void onSubscribe(@NonNull Disposable d) {
            Log.d(TAG, "onSubscribe");
        }

        @Override
        public void onNext(@NonNull String s) {
            Log.d(TAG, "onNext -》 "+s);
        }

        @Override
        public void onError(@NonNull Throwable e) {
            Log.d(TAG, "onError");
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    }

2. RxJava中的被观察者又是谁?

其实如果理解观察者设计模式的话,我们从MyObservable的具体代码中,我们也能猜到,真正的被观察者应该是ObservableEmitter对象,因为被观察者是发数据方,观察者监察被观察者发送的数据,而负责发送数据的是ObservableEmitter对象的onNext方法,因此被观察者应该是ObservableEmitter对象,后面我们会从源码中证实这一点。

   private class MySource implements ObservableOnSubscribe<String>
    {
        @Override
        public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable {
            emitter.onNext("sun");
            emitter.onNext("hao");
        }
    }

3. 观察者又是如何安插到被观察者中的?

上面我们提到RxJava通过subscribe方法订阅观察者,即通过subscribe方法将观察者安插到被观察者中。下面我们开始分析subscribe方法的源码,看看观察者(示例中的myObserver)如何被安插到被观察者中。

  1. 示例一的RxJava核心代码就是下面一句
Observable.create(mySource).subscribe(myObserver);
  1. subscribe方法是属于哪个对象呢?
    很明显,subscribe方法的对象是由Observable.create()方法创建的,下面我们看看Observable.create方法的源码:
    public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
        Objects.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
    }

create方法传入了我们自定义的MySource对象,返回时又被RxJavaPlugins.onAssembly方法处理了一下,我们来看看RxJavaPlugins.onAssembly方法:

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

可以看到如果onObservableAssembly为null,那么onAssembly方法就什么都不做,如果不为null就对source进行处理一下再返回,其实这就是RxJava的Hook机制,也就是通过onAssembly方法拦截一下被观察者,先对被观察者处理一波,后面我们会详细介绍,这儿我们忽略这个hook即可。
那么Observable.create()方法创建的就是ObservableCreate对象,即该案例中subscribe方法是ObservableCreate对象的方法。ObservableCreate的类图如下,subscribe方法被抽象类Observable所实现。
在这里插入图片描述
3. subscribe方法的源码如下

public final void subscribe(@NonNull Observer<? super T> observer) {
        Objects.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            subscribeActual(observer); //这个是核心方法
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

看源码我们要先梳理主线,一定不要想着一开始每一句都看懂(因为这样大概率会看晕),一些支线的细节代码可以先不关注,重点是关注主线代码
大概一看subscribe方法中核心方法是subscribeActual(observer)方法,上面我们分析了subscribe方法属于ObservableCreate对象,那么subscribeActual方法自然也是属于ObservableCreate类,我们去ObservableCreate类中查看subscribeActual方法的源码如下:

 @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<>(observer);//这里创建了CreateEmitter对象,并将我们的观察者传进去了
        observer.onSubscribe(parent);//观察者执行onSubscribe方法。这里就能明白为什么观察者都是先执行onSubscribe方法了

        try {
            source.subscribe(parent);//这里的source是谁呢?
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

subscribeActual方法中首先创建了CreateEmitter对象,并将我们自定义的观察者聚合进去。其实,从这就证实了,CreateEmitter对象是被观察者,CreateEmitter通过构造函数将观察者注入,如此以来,观察者就被安插进了被观察者之中,具体如何监控被观察者,我们后面再说。
其次,subscribeActual方法不管三七二十一先执行观察者的onSubscribe方法,这里也能说明为什么RxJava都是先执行onSubscribe方法。
最后,subscribeActual方法执行source.subscribe(parent),将创建的被观察者即CreateEmitter对象parent传递了出去,那么这里的source又是谁呢?我们看看源码中source赋值的地方

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;//通过ObservableCreate构造函数传入source的值
    }
    
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
}

从源码中可以看到source是创建ObservableCreate对象的时候传入的,上面分析subscribe方法是属于哪个对象的时候,我们就看到ObservableCreate对象是由Observable.create创建的

    Observable.create(mySource).subscribe(myObserver);
    
    public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
        Objects.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));//这里的source就是我们自定的mySource
    }
     
    private class MySource implements ObservableOnSubscribe<String>
    {
        @Override
        public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable {
            emitter.onNext("sun");
            emitter.onNext("hao");
        }
    }

这时我们再看subscribeActual方法源码

 @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);//source就是我们自定义MySource对象
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

source.subscribe(parent)中的source就是我们自定义的MySource对象,通过subscribe方法将subscribeActual方法中创建的被观察者CreateEmitter对象parent传递到MySource中。此时,我们再看被观察者发送数据时,观察者是如何监察到的

private class MySource implements ObservableOnSubscribe<String>
    {
        @Override
        public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable {
            emitter.onNext("sun");//emitter就是subscribeActual方法中创建的CreateEmitter对象
            emitter.onNext("hao");
        }
    }

被观察者emitter通过onNext发送数据,我们来看看onNext方法的源码

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);//观察者接收到被观察者发送的数据
            }
        }
    }

可以看到当被观察者对象调用onNext方法时,安插在内部的观察者会接收到数据,而观察者的onNext方法就是我们自定义实现的

private class MyObserver implements Observer<String>{

        @Override
        public void onSubscribe(@NonNull Disposable d) {
            Log.d(TAG, "onSubscribe");
        }

        @Override
        public void onNext(@NonNull String s) {
            Log.d(TAG, "onNext -》 "+s);//监察到被观察者发送的数据
        }

        @Override
        public void onError(@NonNull Throwable e) {
            Log.d(TAG, "onError");
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    }

4. 示例一RxJava源码整体关系类图

在这里插入图片描述
ObservableCreate就是一个中转器,它把MySource和MyObserver中转出去,之后观察者MyObserver聚合(安插)到被观察者CreateEmitter对象,最后,被观察者对象又通过subscribe方法被传递给MySource对象。

4. RxJava的Hook机制

  1. 上面我们也提到了该机制,Observable的create方法源码如下
    public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
        Objects.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
    }

创建ObservableCreate对象之后,又被RxJavaPlugins.onAssembly方法处理一下才返回,这个就是RxJava的hook机制,其实就是将创建的被观察者先自己玩一把,再给观察者处理。

  1. RxJavaPlugins.onAssembly方法源码如下:
static volatile Function<? super Observable, ? extends Observable> onObservableAssembly;

 public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {//如果设置了onObservableAssembly,则意味着开启hook
            return apply(f, source);
        }
        return source;
    }

static <T, R> R apply(@NonNull Function<T, R> f, @NonNull T t) {
        try {
            return f.apply(t);
        } catch (Throwable ex) {
            throw ExceptionHelper.wrapOrThrow(ex);
        }
    }

通过如下方法给onObservableAssembly赋值

   public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) {
        if (lockdown) {
            throw new IllegalStateException("Plugins can't be changed anymore");
        }
        RxJavaPlugins.onObservableAssembly = onObservableAssembly;
    }

使用RxJava的hook机制示例

 RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
            @Override
            public Observable apply(Observable observable) throws Throwable {
                return null;//在这拦截住Observable,自己先处理一波
            }
        });

三、RxJava使用示例二

public class MainActivity extends AppCompatActivity {

    private final static String TAG = MainActivity.class.getSimpleName();

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
                emitter.onNext("sun");
                emitter.onNext("hao");
            }
        }).map(new Function<String, Integer>() {
            @Override
            public Integer apply(String s) throws Throwable {
                Integer ans = 0;
                if (s.equals("sun"))
                {
                    ans = 66;
                }
                else if (s.equals("hao"))
                {
                    ans = 99;
                }
                else
                {
                    ans = 88;
                }
                return ans;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(@NonNull Integer s) {
                Log.d(TAG, "onNext -》 "+s);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }
}

运行结果如下:

2023-11-17 04:58:08.660 16545-16545/com.xiaomi.myaxjava D/MainActivity: onSubscribe
2023-11-17 04:58:08.660 16545-16545/com.xiaomi.myaxjava D/MainActivity: onNext -66
2023-11-17 04:58:08.660 16545-16545/com.xiaomi.myaxjava D/MainActivity: onNext -99

四、示例二源码分析

示例二比示例一稍微复杂一点,加了map操作符,map操作符的作用是将被观察者发送的数据先做一波处理再给观察者。有了示例一源码的基础,分析示例二源码也比较简单了。在分析该部分源码前建议先去看一下我之前写的装饰模式,RxJava就是通过装饰模式将一个个的操作符装饰为各种类型的观察者(都是Observer的子类)。

1. 同样从subscribe方法看起,看看观察者如何被安插到被观察者中

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
                emitter.onNext("sun");
                emitter.onNext("hao");
            }
        }).map(new Function<String, Integer>() {
            @Override
            public Integer apply(String s) throws Throwable {
                Integer ans = 0;
                if (s.equals("sun"))
                {
                    ans = 66;
                }
                else if (s.equals("hao"))
                {
                    ans = 99;
                }
                else
                {
                    ans = 88;
                }
                return ans;
            }
        }).subscribe(new Observer<Integer>() {//subscribe方法是哪个对象的方法?很明显是map方法返回的对象
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(@NonNull Integer s) {
                Log.d(TAG, "onNext -》 "+s);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });

2. subscribe方法源码如下

public final void subscribe(@NonNull Observer<? super T> observer) {
        Objects.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
            
            subscribeActual(observer);//该方法是被哪个类实现的?很明显subscribe方法别哪个类实现,该方法就是被哪个方法实现
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

3. subscribeActual方法是被map方法返回的对象实现,我们来看看map方法的源码

    public final <R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
        Objects.requireNonNull(mapper, "mapper is null");
        //示例一中讲解过RxJavaPlugins.onAssembly是hook函数,在此我们不考虑,那么返回的就是ObservableMap对象
        return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
    }

4. 因此接着第2步去ObservableMap类中查看subscribeActual方法的源码

  @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));//这里面的source和function是什么?我们后面分析
    }

可以看到subscribeActual方法中将我们自定义的观察者送入了MapObserver的构造函数,这儿就是采用装饰模型将我们自定义的观察者进行装饰,形成一个新的观察者。我们看下MapObserver的源码

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual); //把我们自定义的观察者传递给了MapObserver的父类
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            ...
        }
    }
}

我们接着看MapObserver的父类BasicFuseableObserver是如何处理我们自定义的观察者的

public abstract class BasicFuseableObserver<T, R> implements Observer<T>, QueueDisposable<R> {

    /** The downstream subscriber. */
    protected final Observer<? super R> downstream;
    
    /**
     * Construct a BasicFuseableObserver by wrapping the given subscriber.
     * @param downstream the subscriber, not null (not verified)
     */
    public BasicFuseableObserver(Observer<? super R> downstream) {
        this.downstream = downstream;//将我们自定义的观察者赋值给了downstream
    }
}

到这儿我们知道自定义的观察者最终被传递给了downstream,下面我们分析ObservableMap类中的source和function是什么?

5. 第4步提到的ObservableMap类中的source和function是什么?我们看下源码

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;
    // source和function都是通过ObservableMap的构造函数传入的
    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
}

source和function都是通过ObservableMap的构造函数传入的,第3步我们分析map源码的时候看到创建了ObservableMap对象

   public final <R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
        Objects.requireNonNull(mapper, "mapper is null");
        //这里的mapper就是我们自定义的Function
        //那这里的this是谁呢?很明显map方法属于哪个对象,这个this是那个对象
        return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
    }

6. 第5步提到的map方法属于哪个对象呢?

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
                emitter.onNext("sun");
                emitter.onNext("hao");
            }
        }).map(new Function<String, Integer>() {//map方法是Observable.create方法返回的对象
            @Override
            public Integer apply(String s) throws Throwable {
                Integer ans = 0;
                if (s.equals("sun"))
                {
                    ans = 66;
                }
                else if (s.equals("hao"))
                {
                    ans = 99;
                }
                else
                {
                    ans = 88;
                }
                return ans;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(@NonNull Integer s) {
                Log.d(TAG, "onNext -》 "+s);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });

我们接着看Observable.create方法的源码

public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
        Objects.requireNonNull(source, "source is null");
        //返回的是一个ObservableCreate的对象,这里的source是我们自定义的ObservableOnSubscribe对象
        return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
    }

到这儿我们知道ObservableMap类中的function就是我们在map方法中传入的自定义Function匿名类,source就是我们通过Observable.create方法创建的ObservableCreate对象

7. 我们接着第4步继续分析

  @Override
    public void subscribeActual(Observer<? super U> t) {
        //source就是我们通过Observable.create方法创建的ObservableCreate对象
        //function就是我们在map方法中传入的自定义Function匿名类
        source.subscribe(new MapObserver<T, U>(t, function));
    }

我们再来看ObservableCreate类中实现的subscribe方法,ObservableCreate继承自Observable,ObservableCreate实现的subscribe也是继承自Observable,因此我们去Observable中查看subscribe方法的源码

public final void subscribe(@NonNull Observer<? super T> observer) {
        Objects.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            subscribeActual(observer);//把装饰后的MapObserver对象通过subscribeActual方法传递出去
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

我们接着看ObservableCreate类中的subscribeActual方法源码,这个时候就和之前示例一分析的源码接上了,就不再详细分析

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //创建CreateEmitter对象,并将MapObserver对象传递进去
        CreateEmitter<T> parent = new CreateEmitter<>(observer);
        observer.onSubscribe(parent);

        try {
            //这里的source就是我们传入Observable.create方法的自定义ObservableOnSubscribe匿名类
            //那么subscribe方法就是我们外面自己实现的方法
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
			@Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
                //emitter就是上面subscribeActual方法中实例化的CreateEmitter对象
                emitter.onNext("sun");
                emitter.onNext("hao");
            }

8. 我们看看ObservableEmitter发送的数据如何被map处理,然后又被观察者接收

ObservableEmitter通过onNext发送数据,以emitter.onNext(“sun”)为例,我们看下onNext的源码

 @Override
        public void onNext(T t) {
            if (t == null) {
                onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);//这里的observer就是ObservableMap对象,t是我们发送的数据“sun”
            }
        }

接着我们再看下ObservableMap中的onNext方法源码

		@Override
        public void onNext(T t) {// t = "sun"
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }

            U v;

            try {
                //这里的mapper就是我们外面在map方法中传入的自定义Function,在此map先对发送的数据“sun”做了处理
                v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            //downstream就是我们外面subscribe方法中传入的自定义Observer对象
            downstream.onNext(v);//这时候拿到数据是经过map处理后的数据
        }
			
		@Override
        public Integer apply(String s) throws Throwable {
                Integer ans = 0;
                if (s.equals("sun"))
                {
                    ans = 66;
                }
                else if (s.equals("hao"))
                {
                    ans = 99;
                }
                else
                {
                    ans = 88;
                }
                return ans;
          }

至此,整个示例二的源码我们就分析完了

五、后记

这也是我们第一次输出分析源码的文章,一是帮助自己理清楚RxJava源码流程,二是希望能够对其他博友有所帮忙,但第一次写源码分析类的文章,确实经验不足,感觉写的还是有点混乱,没有写出自己的预期,后面继续努力。

总之,就针对RxJava的源码,个人任务掌握一个切入点就对了,那就是从subscribe方法入手,看看我们自定义的观察者如何与发送数据的被观察者关联在一起的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/207601.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

一本书读懂数据治理

企业数据治理非常必要&#xff0c;它是企业实现数字化转型的基础&#xff0c;是企业的一个顶层策略&#xff0c;一个管理体系&#xff0c;也是一个技术体系&#xff0c;涵盖战略、组织、文化、方法、制度、流程、技术和工具等多个层面的内容。 数据治理不是对“数据”的治理&am…

Unity 与 虚拟机ROS连接

Unity 与 虚拟机ROS连接 知识储备前期准备ROS部分Unity部分 连接测试 知识储备 unity官方教程&#xff1a; https://github.com/Unity-Technologies/Unity-Robotics-HubWin11家庭版开启HyperV&#xff1a; https://zhuanlan.zhihu.com/p/577980646HyperV安装Ubuntu: https://b…

SQL Sever 基础知识 - 数据查询

SQL Sever 基础知识 - 一、查询数据 一、查询数据第1节 基本 SQL Server 语句SELECT第2节 SELECT语句示例2.1 SELECT - 检索表示例的某些列2.2 SELECT - 检索表的所有列2.3 SELECT - 对结果集进行筛选2.4 SELECT - 对结果集进行排序2.5 SELECT - 对结果集进行分组2.5 SELECT - …

浅学指针(4)函数指针数组和qsort的使用

系列文章目录 文章目录 系列文章目录前言1.函数指针数组的⽤途作用&#xff1a;可以让代码更简洁&#xff0c;逻辑更清晰 2. 回调函数回调函数就是⼀个通过函数指针调⽤的函数 3 . qsort函数qsort函数可以排序所有数据类型解释如图&#xff1a;![在这里插入图片描述](https://i…

【高效开发工具系列】Hutool DateUtil工具类

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

Leetcode-二叉树oj题

1.二叉树的前序遍历 144. 二叉树的前序遍历https://leetcode.cn/problems/binary-tree-preorder-traversal/这个题目在遍历的基础上还要求返回数组&#xff0c;数组里面按前序存放二叉树节点的值。 既然要返回数组&#xff0c;就必然要malloc一块空间&#xff0c;那么我们需…

Spring简单的存储和读取

前言 前面讲了spring的创建&#xff0c;现在说说关于Bean和五大类注解 一、Bean是什么&#xff1f; 在 Java 语⾔中对象也叫做 Bean&#xff0c;所以后⾯咱们再遇到对象就以 Bean 著称。这篇文章还是以spring创建为主。 二、存储对象 2.1 俩种存储方式 需要在 spring-conf…

作用域和作用域链

前端面试大全JavaScript作用域和作用域链 &#x1f31f;经典真题 &#x1f31f;作用域&#xff08;Scope&#xff09; 什么是作用域 全局作用域和函数作用域 块级作用域 &#x1f31f;作用域链 什么是自由变量 什么是作用域链 关于自由变量的取值 &#x1f31f;作用域…

初识Linux:权限

目录 提示&#xff1a;以下指令均在Xshell 7 中进行 Linux 的权限 内核&#xff1a; 查看操作系统版本 查看cpu信息 查看内存信息 外部程序&#xff1a; 用户&#xff1a; 普通用户变为超级用户&#xff1a; su 和 su-的区别&#xff1a; root用户变成普通用户&#…

【matlab程序】画海洋流场

【matlab程序】画海洋流场 clear;clc; file ( ‘0227.nc’); latncread(file,‘latitude’); lonncread(file,‘longitude’); uncread(file,‘water_u’); vncread(file,‘water_v’); [x,y]meshgrid(lon,lat); xx’; yy’; interval4; figure (1) set(gcf,‘color’,[1 1 1…

【linux】基本指令(上篇)

1.快速认识5~6个指令 pwd指令 ls指令 touch指令 cd指令 clear指令 touch指令 详细讲解 首先有一个问题就是当我们创建一个文件&#xff0c;但是没有往里面写内容&#xff0c;那么磁盘上会有该文件吗&#xff1f; 磁盘上会保存&#xff0c;因为创建好的文件&#xff0c;没有…

【古月居《ros入门21讲》学习笔记】05_ROS是什么及其核心概念

目录 说明 1. ROS发展史 ROS版本演变 2. ROS是什么 ROS中的通信机制 ROS中的开发工具 ROS中的应用功能 ROS中的生态系统 3. ROS核心概念 节点与节点管理器 通信方式1&#xff1a;话题 通信方式2&#xff1a;服务 话题与服务的区别 参数 文件系统 说明 1. 本系列…

学习笔记7——数据库基础知识以及mysql的查询语句

学习笔记系列开头惯例发布一些寻亲消息 链接&#xff1a;https://baobeihuijia.com/bbhj/contents/3/199913.html 数据库 三个概念区分 DB&#xff1a;数据库&#xff0c;存储数据的仓库&#xff0c;有组织的数据容器DBMS:数据库管理系统SQL&#xff1a;几乎所有的DBMS都支持…

从PDF和图像中提取文本,以供大型语言模型使用

想法 大型语言模型已经席卷了互联网&#xff0c;导致更多的人没有认真关注使用这些模型最重要的部分&#xff1a;高质量的数据&#xff01;本文旨在提供一些有效从任何类型文档中提取文本的技术。 Python库 本文专注于Pytesseract、easyOCR、PyPDF2和LangChain库。实验数据是一…

jQuery的使用

目录 jquery对象&#xff1a; jquery作为一般函数调用参数: jquery事件机制 jquery dom操作 jquery对象&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" cont…

MySQL 教程 1.4

MySQL 连接 使用mysql二进制方式连接 您可以使用MySQL二进制方式进入到mysql命令提示符下来连接MySQL数据库。 实例 以下是从命令行中连接mysql服务器的简单实例&#xff1a; [roothost]# mysql -u root -p Enter password:****** 在登录成功后会出现 mysql> 命令提示窗…

Python全栈之基本数据类型详解

文章目录 1.注释2.输出3.变量4.命名规范5.变量的定义方式1.字符串类型2.数字类型3.List列表类型4.tuple 元组类型的定义5.Dict字典类型6.set集合类型7.数据类型转换8.自动类型转换9.强制类型转换关于Python技术储备一、Python所有方向的学习路线二、Python基础学习视频三、精品…

Reactor网络线程模型

目录 传统下网络服务模型 事件监听模型 NIO核心概念 单线程Reactor模式 多线程Reactor模式 Kafka 的网络设计 主要概念 类比思维理解 参考文章 传统下网络服务模型 线程太多无法处理大规模请求 事件监听模型 NIO核心概念 nio是实现reactor模式的底层API代码 单…

【SparkSQL】SparkSQL函数定义(重点:定义UDF函数、使用窗口函数)

【大家好&#xff0c;我是爱干饭的猿&#xff0c;本文重点介绍SparkSQL 定义UDF函数、SparkSQL 使用窗口函数。 后续会继续分享其他重要知识点总结&#xff0c;如果喜欢这篇文章&#xff0c;点个赞&#x1f44d;&#xff0c;关注一下吧】 上一篇文章&#xff1a;《【SparkSQL…

C++相关闲碎记录(2)

1、误用shared_ptr int* p new int; shared_ptr<int> sp1(p); shared_ptr<int> sp2(p); //error // 通过原始指针两次创建shared_ptr是错误的shared_ptr<int> sp1(new int); shared_ptr<int> sp2(sp1); //ok 如果对C相关闲碎记录(1)中记录的shar…