在项目中遇到了类似下面这样的代码
本意是希望当zip操作符中三个Observable执行完毕之后,将他们返回的数据统一进行处理
Observable.zip(startFirst(), startSecond(), startThird(),
(first, second, third) -> {
Log.i("Rxjava", "handle all data");
return 1;
}).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
}
@Override
public void onError(Throwable e) {
Log.e("Rxjava", e.getMessage());
}
@Override
public void onComplete() {
}
});
private Observable<Boolean> startFirst() {
return new Observable<Boolean>() {
@Override
protected void subscribeActual(Observer<? super Boolean> observer) {
Log.i("Rxjava", "startFirst");
}
};
}
private Observable<Boolean> startSecond() {
return Observable
.timer(300, TimeUnit.MILLISECONDS)
.map(aLong -> {
Log.i("Rxjava", "startSecond");
return true;
});
}
private Observable<Boolean> startThird() {
return Observable
.timer(360, TimeUnit.MILLISECONDS)
.map(aLong -> {
Log.i("Rxjava", "startThird");
return true;
});
}
但打印日志后,实际结果如下:
发现他只走了三个Observable流, 并没有去处理这些Observable返回的数据
带着这个问题我去看了zip实现原理‘
下面我把自定义传入的类称为zipper 就是框中的这一段
来看zip方法实现
public static <T1, T2, T3, R> Observable<R> zip(
ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, ObservableSource<? extends T3> source3,
Function3<? super T1, ? super T2, ? super T3, ? extends R> zipper) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
ObjectHelper.requireNonNull(source3, "source3 is null");
return zipArray(Functions.toFunction(zipper), false, bufferSize(), source1, source2, source3);
}
//上面将我们传入zipper封装 Functions.toFunction(zipper),封装成了一个Array3Func类
public static <T1, T2, T3, R> Function<Object[], R> toFunction(final Function3<T1, T2, T3, R> f) {
ObjectHelper.requireNonNull(f, "f is null");
return new Array3Func<T1, T2, T3, R>(f);
}
//最终zipper被封装在这个类里面
static final class Array3Func<T1, T2, T3, R> implements Function<Object[], R> {
final Function3<T1, T2, T3, R> f;
Array3Func(Function3<T1, T2, T3, R> f) {
this.f = f; //这个f就是我们传的zipper
}
@SuppressWarnings("unchecked")
@Override
public R apply(Object[] a) throws Exception {
if (a.length != 3) {
throw new IllegalArgumentException("Array of size 3 expected but got " + a.length);
}
return f.apply((T1)a[0], (T2)a[1], (T3)a[2]); //这里调用zipper的apply方法 参数对应的就是三个Observable发送的值 至于这个值怎么得来的 继续往下看
}
}
zip调用了zipArray,而zipArray果然不出意外返回了一个ObservableZip ,参数sources就是我们传递的那些Observables
public static <T, R> Observable<R> zipArray(Function<? super Object[], ? extends R> zipper,
boolean delayError, int bufferSize, ObservableSource<? extends T>... sources) {
if (sources.length == 0) {
return empty();
}
ObjectHelper.requireNonNull(zipper, "zipper is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableZip<T, R>(sources, null, zipper, bufferSize, delayError));
}
现在需要去看ObservableZip的subscribeActual(),了解Rxjava原理的应该都知道 当调用ObservableZip的subscribe()时,他一定会去调subscribeActual()
public ObservableZip(ObservableSource<? extends T>[] sources,
Iterable<? extends ObservableSource<? extends T>> sourcesIterable,
Function<? super Object[], ? extends R> zipper,
int bufferSize,
boolean delayError) {
this.sources = sources; //Observable列表
this.sourcesIterable = sourcesIterable;
this.zipper = zipper; //自定义的zipper
this.bufferSize = bufferSize;
this.delayError = delayError;
}
@Override
@SuppressWarnings("unchecked")
public void subscribeActual(Observer<? super R> observer) {
ObservableSource<? extends T>[] sources = this.sources;
int count = 0;
if (sources == null) {
sources = new ObservableSource[8];
for (ObservableSource<? extends T> p : sourcesIterable) {
if (count == sources.length) {
ObservableSource<? extends T>[] b = new ObservableSource[count + (count >> 2)];
System.arraycopy(sources, 0, b, 0, count);
sources = b;
}
sources[count++] = p;
}
} else {
count = sources.length; //获取到Observable的数量
}
if (count == 0) {
EmptyDisposable.complete(observer);
return;
}
//将observer zipper count 等参数封装
ZipCoordinator<T, R> zc = new ZipCoordinator<T, R>(observer, zipper, count, delayError);
zc.subscribe(sources, bufferSize);
}
当调用subscribe时,会调用subscribeActual,在subscribeActual中 获取到我们传入的Observable数量 ,将zipper ,observer (这个observer是subscribe zip操作符时传入的 ),Observable数量等封装成一个ZipCoordinator类 然后调用ZipCoordinator类的subscribe
ZipCoordinator类:
ZipCoordinator(Observer<? super R> actual,
Function<? super Object[], ? extends R> zipper,
int count, boolean delayError) {
this.downstream = actual; //observer
this.zipper = zipper;
this.observers = new ZipObserver[count]; //创建了一个zipObserver数组
this.row = (T[])new Object[count]; //这个数组用来存储那三个Observable发射的数据
this.delayError = delayError;
}
public void subscribe(ObservableSource<? extends T>[] sources, int bufferSize) {
ZipObserver<T, R>[] s = observers;
int len = s.length;
for (int i = 0; i < len; i++) {
s[i] = new ZipObserver<T, R>(this, bufferSize);
}
// this makes sure the contents of the observers array is visible
this.lazySet(0);
downstream.onSubscribe(this);
for (int i = 0; i < len; i++) {
if (cancelled) {
return;
}
sources[i].subscribe(s[i]); //给传入的那几个Observable注册观察者
}
}
ZipCoordinator的subscribe()主要做的事情就是实例化了count个ZipObserver,count代表zip中传入的Observable数量,然后用这个zipObserver分别作为观察者注册给我们传入的那几个Observable,这样的话我们传入的那几个Observable操作流就可以执行了
针对本例就是startFirst() startSecond() startThird()
这几个方法开始执行,因为Rxjava流是从下到上,由上而下调用 所以他最终去调用Observer的onNext onError或者onComplete
所以现在去看ZipObserver里面这几个方法的实现
ZipObserver(ZipCoordinator<T, R> parent, int bufferSize) {
this.parent = parent; //parent指的是上面的ZipCoordinator
this.queue = new SpscLinkedArrayQueue<T>(bufferSize);
}
@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this.upstream, d);
}
@Override
public void onNext(T t) {
queue.offer(t);
parent.drain();
}
@Override
public void onError(Throwable t) {
error = t;
done = true;
parent.drain();
}
@Override
public void onComplete() {
done = true;
parent.drain();
}
ZipObserver的onNext onError或者onComplete
都是将Observable发射的数据存在了一个queue里面 然后调用ZipCoordinator的drain()
public void drain() {
if (getAndIncrement() != 0) {
return;
}
int missing = 1;
final ZipObserver<T, R>[] zs = observers;
final Observer<? super R> a = downstream;
final T[] os = row; //这个row上面提到过 是一个Object[] 长度为count
final boolean delayError = this.delayError;
for (;;) {
for (;;) {
int i = 0;
int emptyCount = 0;
for (ZipObserver<T, R> z : zs) {
if (os[i] == null) {
boolean d = z.done;
T v = z.queue.poll(); //从queue中取出Observable发射的数据
boolean empty = v == null;
if (checkTerminated(d, empty, a, delayError, z)) {
return;
}
if (!empty) { //如果不为空 将数据保存在os[]中
os[i] = v;
} else {
emptyCount++; // 记录空值数量
}
} else {
if (z.done && !delayError) {
Throwable ex = z.error;
if (ex != null) {
cancelled = true;
cancel();
a.onError(ex);
return;
}
}
}
i++;
}
// 在这里判断 如果Observale没有发射数据 就不会再继续往下了
if (emptyCount != 0) {
break;
}
R v;
try {
//调用zipper的apply 这个os就是发射的数据集合
v = ObjectHelper.requireNonNull(zipper.apply(os.clone()), "The zipper returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
cancel();
a.onError(ex);
return;
}
a.onNext(v);
Arrays.fill(os, null);
}
missing = addAndGet(-missing);
if (missing == 0) {
return;
}
}
}
好了 这下明白了
zip实现就是给传入的那些Observablef分别注册了一个zipObserver,然后在zipObserver里面是将发射的数据存在了一个queue里面,接着for循环去取queue里面的数据 如果不为空 将把数据存在了一个object[]里面,只要有一个Observable没有发射数据 就不会去分发这个object[] 如果都不为空 才去调用zipper的apply,参数就是这个object[]
还记得吗 之前zipper是被封装成了一个Array3Func
public R apply(Object[] a) throws Exception {
if (a.length != 3) {
throw new IllegalArgumentException("Array of size 3 expected but got " + a.length);
}
return f.apply((T1)a[0], (T2)a[1], (T3)a[2]); //
}
三个Observable发射的数据调用f.apply去处理 f就是自己传的那个zipper啦
至于我遇到的问题再来看,发现在startFirst()里面我没有发送任何数据
修改一下startFirst
果然就可以走到处理数据的地方了 log如下