使用
// Build a Retrofit client for the GitHub API, registering the Gson converter
// and the RxJava3 call adapter.
val retrofit = Retrofit.Builder()
    .baseUrl("https://api.github.com/")
    .addConverterFactory(GsonConverterFactory.create())
    .addCallAdapterFactory(RxJava3CallAdapterFactory.create())
    .build()
val api = retrofit.create(API::class.java)
// Subscribe on the IO scheduler and deliver the result on the Android main thread,
// because the result callbacks write to the clv1 view.
api.getRepo("rengwuxian")
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(object : SingleObserver<MutableList<Repo>> {
        override fun onSubscribe(d: Disposable?) {
            LogUtil.logD("onSubscribe${Thread.currentThread().name}")
            clv1.text = "onSubscribe"
        }

        override fun onSuccess(t: MutableList<Repo>) {
            // The list may be empty; indexing t[0] directly would throw IndexOutOfBoundsException.
            val firstRepoName = t.firstOrNull()?.name.orEmpty()
            LogUtil.logD(firstRepoName)
            clv1.text = firstRepoName
        }

        override fun onError(e: Throwable) {
            // Throwable.message is nullable; fall back to toString() instead of crashing on `!!`.
            val description = e.message ?: e.toString()
            LogUtil.logD(description)
            clv1.text = description
        }
    })
RxJava需要我们主动切线程
.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread())
也可以
.addCallAdapterFactory(RxJava3CallAdapterFactory.createWithScheduler(Schedulers.io()))
这样会把该 Retrofit 实例创建的所有请求默认放到后台线程(Schedulers.io())执行,
此时就不需要再手动调用 .subscribeOn(Schedulers.io()) 了
源码:
// Private constructor: stores the (nullable) scheduler and the async flag on the factory instance.
private RxJava3CallAdapterFactory(@Nullable Scheduler scheduler, boolean isAsync) {
this.scheduler = scheduler;
this.isAsync = isAsync;
}
内部赋值
Single.just:发送瞬时事件(订阅后立即发射传入的值)
// Single.just demo: the observer receives onSubscribe and then onSuccess("1") during subscribe().
Single.just("1")
    .subscribe(object : SingleObserver<String?> {
        override fun onSubscribe(d: Disposable?) {
            // Intentionally empty. A TODO() here would throw NotImplementedError
            // as soon as subscribe() invokes this callback.
        }

        override fun onSuccess(t: String?) {
            // Intentionally empty in this demo; t carries the emitted value.
        }

        override fun onError(e: Throwable?) {
            // Surface unexpected failures instead of crashing with NotImplementedError.
            e?.printStackTrace()
        }
    })
源码:
// Creates a Single around the given item; a null item is rejected immediately.
public static <@NonNull T> Single<T> just(T item) {
Objects.requireNonNull(item, "item is null"); // null check comes first
// Wrap the item in a SingleJust and pass it through the RxJavaPlugins assembly hook.
return RxJavaPlugins.onAssembly(new SingleJust<>(item));
}
// Hook method: gives the onSingleAssembly function (if one is set) the chance to
// substitute the Single that is being assembled.
public static <@NonNull T> Single<T> onAssembly(@NonNull Single<T> source) {
Function<? super Single, ? extends Single> f = onSingleAssembly;
if (f != null) {
// A hook function is set: return whatever it produces for this source.
return apply(f, source);
}
// No hook function: the source is returned unchanged.
return source;
}
//Single
/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.single;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
/** A Single that holds a fixed value and hands it to every observer during subscription. */
public final class SingleJust<T> extends Single<T> {
// The value delivered to each observer.
final T value;
public SingleJust(T value) {
this.value = value;
}
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
// Signal subscription with the Disposable returned by Disposable.disposed(),
// then deliver the stored value right away; onError is never called here.
observer.onSubscribe(Disposable.disposed());
observer.onSuccess(value);
}
}
关键方法
// Key method of SingleJust: everything happens synchronously inside subscribe.
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
observer.onSubscribe(Disposable.disposed()); // signals subscription with an already-disposed Disposable; presumably an enum singleton - TODO confirm
observer.onSuccess(value); // success only: this method never calls onError because the value is already available
}
操作符: map,转换对象类型
// map operator: converts the Int emitted by the source Single into its String form.
val single: Single<Int> = Single.just(1)
val singleString = single.map { number -> number.toString() }
多个single map
按时间发送数据: 从0开始发送,间隔1秒
// Emit an increasing counter once per second with no initial delay and show it in clv1.
// NOTE(review): the Disposable passed to onSubscribe is not retained, so this stream is
// never cancelled here - keep it and dispose it when the view goes away.
Observable.interval(0, 1, TimeUnit.SECONDS)
    // This interval overload schedules on Schedulers.computation(), so switch to the
    // main thread before the observer writes to the view.
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(object : Observer<Long> {
        override fun onSubscribe(d: Disposable?) {
            // Intentionally empty. A TODO() here would throw NotImplementedError
            // as soon as subscribe() invokes this callback.
        }

        override fun onNext(t: Long?) {
            // Skip a null value instead of forcing it with `!!`.
            t?.let { clv1.text = it.toString() }
        }

        override fun onError(e: Throwable?) {
            // Surface failures instead of crashing with NotImplementedError.
            e?.printStackTrace()
        }

        override fun onComplete() {
            // Nothing to do on completion.
        }
    })
// Overload without a Scheduler argument: delegates to the four-argument interval,
// passing Schedulers.computation() as the scheduler.
public static Observable<Long> interval(long initialDelay, long period, @NonNull TimeUnit unit) {
return interval(initialDelay, period, unit, Schedulers.computation());
}
interval
/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.observable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.core.Scheduler.Worker;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.internal.schedulers.TrampolineScheduler;
/**
 * Observable that emits an increasing Long counter by running an IntervalObserver
 * periodically on the supplied Scheduler.
 */
public final class ObservableInterval extends Observable<Long> {
final Scheduler scheduler;
final long initialDelay;
final long period;
final TimeUnit unit;
public ObservableInterval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
this.initialDelay = initialDelay;
this.period = period;
this.unit = unit;
this.scheduler = scheduler;
}
@Override
public void subscribeActual(Observer<? super Long> observer) {
IntervalObserver is = new IntervalObserver(observer);
observer.onSubscribe(is); // pass the IntervalObserver to the downstream as its Disposable
Scheduler sch = scheduler;
if (sch instanceof TrampolineScheduler) {
// Trampoline case: create a Worker, store it as the resource, then schedule on that Worker.
Worker worker = sch.createWorker();
is.setResource(worker);
worker.schedulePeriodically(is, initialDelay, period, unit);
} else {
// Otherwise schedule directly on the Scheduler and store the returned task Disposable.
Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
is.setResource(d);
}
}
// The periodic Runnable that emits the counter; it is also the Disposable handed to the
// downstream, and the AtomicReference it extends holds the scheduled resource.
static final class IntervalObserver
extends AtomicReference<Disposable>
implements Disposable, Runnable {
private static final long serialVersionUID = 346773832286157679L;
final Observer<? super Long> downstream;
// Next value to emit; incremented after every emission.
long count;
IntervalObserver(Observer<? super Long> downstream) {
this.downstream = downstream;
}
@Override
public void dispose() {
// Delegates to DisposableHelper to dispose whatever this reference holds.
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return get() == DisposableHelper.DISPOSED;
}
@Override
public void run() {
// Emit only while this reference has not been set to DISPOSED.
if (get() != DisposableHelper.DISPOSED) {
downstream.onNext(count++);
}
}
public void setResource(Disposable d) {
// Stores the scheduled resource through DisposableHelper.setOnce.
DisposableHelper.setOnce(this, d);
}
}
}
===
/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.observable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.core.Scheduler.Worker;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.internal.schedulers.TrampolineScheduler;
/**
 * Second, annotated copy of ObservableInterval: schedules an IntervalObserver periodically
 * and lets the downstream cancel it through the Disposable it receives.
 */
public final class ObservableInterval extends Observable<Long> {
final Scheduler scheduler;
final long initialDelay;
final long period;
final TimeUnit unit;
public ObservableInterval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
this.initialDelay = initialDelay;
this.period = period;
this.unit = unit;
this.scheduler = scheduler;
}
@Override
public void subscribeActual(Observer<? super Long> observer) {
IntervalObserver is = new IntervalObserver(observer);
// The downstream receives the IntervalObserver itself as its Disposable.
observer.onSubscribe(is);
Scheduler sch = scheduler;
if (sch instanceof TrampolineScheduler) {
Worker worker = sch.createWorker();
is.setResource(worker);
worker.schedulePeriodically(is, initialDelay, period, unit);
} else {
Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
is.setResource(d);
}
}
// Implements Disposable and extends AtomicReference<Disposable>, so the held resource
// is read and swapped through atomic operations.
static final class IntervalObserver
extends AtomicReference<Disposable>
implements Disposable, Runnable {
private static final long serialVersionUID = 346773832286157679L;
final Observer<? super Long> downstream;
long count;
IntervalObserver(Observer<? super Long> downstream) {
this.downstream = downstream;
}
@Override
public void dispose() {
DisposableHelper.dispose(this); // disposes the Disposable held inside this reference
}
@Override
public boolean isDisposed() {
return get() == DisposableHelper.DISPOSED;
}
@Override
public void run() {
// Skip the emission once this reference holds the DISPOSED marker.
if (get() != DisposableHelper.DISPOSED) {
downstream.onNext(count++);
}
}
public void setResource(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
}
// dispose: atomically sets the field to DISPOSED and disposes the Disposable it held before.
// Returns true only for the call that actually performed the swap; false if the field
// was already DISPOSED.
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get(); // read the Disposable currently held
Disposable d = DISPOSED;
if (current != d) { // proceed only when the field is NOT already disposed
current = field.getAndSet(d);
// Re-check the value returned by the swap: another caller may have set DISPOSED in between.
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
// Used by setResource: sets the field to the given Disposable, but only if the field is
// still null (set at most once). Returns true when the value was stored.
public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
Objects.requireNonNull(d, "d is null");
if (!field.compareAndSet(null, d)) {
// The field already holds something: dispose the incoming Disposable instead of storing it.
d.dispose();
// A non-DISPOSED occupant means the field was set twice, which is reported.
if (field.get() != DISPOSED) {
reportDisposableSet();
}
return false;
}
return true;
}
实际设置定时任务的代码: ObservableInterval->subscribeActual
// Excerpt from subscribeActual: schedules the IntervalObserver (a Runnable) periodically
// and keeps the Disposable returned for that task.
Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
// Excerpt from IntervalObserver: each run emits the next counter value unless DISPOSED.
@Override public void run() { if (get() != DisposableHelper.DISPOSED) { downstream.onNext(count++); } }
ObservableInterval 内部维护了一个 IntervalObserver,由它负责定时任务的执行和取消
Single.delay(对应的实现类是 SingleDelay)
// Overload that delegates to the four-argument delay with Schedulers.computation() and false
// as the last argument (presumably delayError - TODO confirm against the full overload).
public final Single<T> delay(long time, @NonNull TimeUnit unit) { return delay(time, unit, Schedulers.computation(), false); }
/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.single;
import java.util.concurrent.TimeUnit;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.SequentialDisposable;
/**
 * Single that relays the upstream's success (and optionally its error) after a delay,
 * by scheduling the downstream call on the given Scheduler.
 */
public final class SingleDelay<T> extends Single<T> {
final SingleSource<? extends T> source;
final long time;
final TimeUnit unit;
final Scheduler scheduler;
// When true, errors are delayed by the same time; otherwise they are scheduled with delay 0.
final boolean delayError;
public SingleDelay(SingleSource<? extends T> source, long time, TimeUnit unit, Scheduler scheduler, boolean delayError) {
this.source = source;
this.time = time;
this.unit = unit;
this.scheduler = scheduler;
this.delayError = delayError;
}
@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
final SequentialDisposable sd = new SequentialDisposable(); // container maintained by this operator
observer.onSubscribe(sd); // the downstream receives sd as its Disposable
source.subscribe(new Delay(sd, observer)); // subscribe to the upstream with a Delay observer sharing sd
}
// Observer placed on the upstream; forwards signals to the downstream through scheduled tasks.
final class Delay implements SingleObserver<T> {
private final SequentialDisposable sd;
final SingleObserver<? super T> downstream;
Delay(SequentialDisposable sd, SingleObserver<? super T> observer) {
this.sd = sd;
this.downstream = observer;
}
@Override
public void onSubscribe(Disposable d) {
// sd first holds the upstream's Disposable.
sd.replace(d);
}
@Override
public void onSuccess(final T value) {
sd.replace(scheduler.scheduleDirect(new OnSuccess(value), time, unit));
// Scheduling step: sd is switched to hold the locally scheduled task instead of the upstream Disposable.
}
@Override
public void onError(final Throwable e) {
// Errors are scheduled too; the delay is `time` only when delayError is set, otherwise 0.
sd.replace(scheduler.scheduleDirect(new OnError(e), delayError ? time : 0, unit));
}
// Task that delivers the stored value to the downstream.
final class OnSuccess implements Runnable {
private final T value;
OnSuccess(T value) {
this.value = value;
}
@Override
public void run() {
downstream.onSuccess(value);
}
}
// Task that delivers the stored error to the downstream.
final class OnError implements Runnable {
private final Throwable e;
OnError(Throwable e) {
this.e = e;
}
@Override
public void run() {
downstream.onError(e);
}
}
}
}
/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.disposables;
import java.util.concurrent.atomic.AtomicReference;
import io.reactivex.rxjava3.disposables.Disposable;
/**
* A Disposable container that allows updating/replacing a Disposable
* atomically and with respect of disposing the container itself.
* <p>
* The class extends AtomicReference directly so watch out for the API leak!
* @since 2.0
*/
public final class SequentialDisposable
extends AtomicReference<Disposable>
implements Disposable {
private static final long serialVersionUID = -754898800686245608L;
/**
 * Constructs an empty SequentialDisposable.
 */
public SequentialDisposable() {
// nothing to do
}
/**
 * Construct a SequentialDisposable with the initial Disposable provided.
 * @param initial the initial disposable, null allowed
 */
public SequentialDisposable(Disposable initial) {
lazySet(initial);
}
/**
 * Atomically: set the next disposable on this container and dispose the previous
 * one (if any) or dispose next if the container has been disposed.
 * @param next the Disposable to set, may be null
 * @return true if the operation succeeded, false if the container has been disposed
 * @see #replace(Disposable)
 */
public boolean update(Disposable next) {
return DisposableHelper.set(this, next);
}
/**
 * Atomically: set the next disposable on this container but don't dispose the previous
 * one (if any) or dispose next if the container has been disposed.
 * @param next the Disposable to set, may be null
 * @return true if the operation succeeded, false if the container has been disposed
 * @see #update(Disposable)
 */
public boolean replace(Disposable next) {
return DisposableHelper.replace(this, next);
}
/**
 * Disposes this container by delegating to DisposableHelper.dispose on this reference.
 */
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
/**
 * Reports the disposed state by asking DisposableHelper about the currently held value.
 */
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
ObservableMap
/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.observable;
import io.reactivex.rxjava3.annotations.Nullable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.internal.observers.BasicFuseableObserver;
import java.util.Objects;
/**
 * Observable that applies a mapping function to every upstream item and forwards the
 * result downstream.
 */
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
// Subscription goes through the upstream: the MapObserver wraps the downstream observer.
}
// Observer that sits between upstream and downstream and performs the mapping.
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
// Ignore items once the sequence is marked done.
if (done) {
return;
}
// When sourceMode is not NONE, a null is forwarded instead of a mapped value
// (presumably a fused-mode signal - TODO confirm in BasicFuseableObserver).
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
// The mapper result must be non-null; a thrown exception is routed to fail(ex).
v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll() throws Throwable {
// Pull one item from qd and map it; a null from qd is passed through as null.
T t = qd.poll();
return t != null ? Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
}
// Upstream side: receives the upstream's Disposable, stores it, and then subscribes the downstream.
public final void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
// Original note: has both an upstream and a downstream, and can be interrupted.
// If the upstream Disposable is also a QueueDisposable, keep it in qd as well.
if (d instanceof QueueDisposable) {
this.qd = (QueueDisposable<T>)d;
}
// The downstream is given this observer as its Disposable, guarded by the before/after hooks.
if (beforeDownstream()) {
downstream.onSubscribe(this);
afterDownstream();
}
}
}
// dispose: simply disposes the upstream Disposable.
@Override
public void dispose() {
upstream.dispose();
}
ObservableDelay
/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.observable;
import java.util.concurrent.TimeUnit;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.core.Scheduler.Worker;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.observers.SerializedObserver;
/**
 * Observable that re-emits every upstream signal after a delay by scheduling it on a
 * Scheduler.Worker.
 */
public final class ObservableDelay<T> extends AbstractObservableWithUpstream<T, T> {
final long delay;
final TimeUnit unit;
final Scheduler scheduler;
// When true, onError is delayed like other signals; otherwise it is scheduled with delay 0.
final boolean delayError;
public ObservableDelay(ObservableSource<T> source, long delay, TimeUnit unit, Scheduler scheduler, boolean delayError) {
super(source);
this.delay = delay;
this.unit = unit;
this.scheduler = scheduler;
this.delayError = delayError;
}
@Override
@SuppressWarnings("unchecked")
public void subscribeActual(Observer<? super T> t) {
Observer<T> observer;
if (delayError) {
observer = (Observer<T>)t; // use the downstream observer as-is
} else {
// Without delayError the downstream is wrapped in a SerializedObserver.
observer = new SerializedObserver<>(t);
}
Scheduler.Worker w = scheduler.createWorker();// worker obtained from the scheduler; runs the delayed tasks
source.subscribe(new DelayObserver<>(observer, delay, unit, w, delayError));
// The DelayObserver is what the upstream sees as its observer.
}
// Observer on the upstream that is also the Disposable given to the downstream.
static final class DelayObserver<T> implements Observer<T>, Disposable {
final Observer<? super T> downstream;
final long delay;
final TimeUnit unit;
final Scheduler.Worker w;
final boolean delayError;
Disposable upstream;
DelayObserver(Observer<? super T> actual, long delay, TimeUnit unit, Worker w, boolean delayError) {
super();
this.downstream = actual;
this.delay = delay;
this.unit = unit;
this.w = w;
this.delayError = delayError;
}
@Override
public void onSubscribe(Disposable d) {
// Validate, store the upstream Disposable, then hand this observer to the downstream.
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
downstream.onSubscribe(this);
}
}
@Override
public void onNext(final T t) {
w.schedule(new OnNext(t), delay, unit);
}
@Override
public void onError(final Throwable t) {
w.schedule(new OnError(t), delayError ? delay : 0, unit);// delay applied locally; 0 unless delayError is set
}
@Override
public void onComplete() {
w.schedule(new OnComplete(), delay, unit);
}
@Override
public void dispose() {
// Cancel both sides: the upstream subscription and the local worker.
upstream.dispose();
w.dispose();
}
@Override
public boolean isDisposed() {
return w.isDisposed();
}
// Task that delivers one item to the downstream.
final class OnNext implements Runnable {
private final T t;
OnNext(T t) {
this.t = t;
}
@Override
public void run() {
downstream.onNext(t);
}
}
// Task that delivers the error and then always disposes the worker.
final class OnError implements Runnable {
private final Throwable throwable;
OnError(Throwable throwable) {
this.throwable = throwable;
}
@Override
public void run() {
try {
downstream.onError(throwable);
} finally {
w.dispose();
}
}
}
// Task that delivers completion and then always disposes the worker.
final class OnComplete implements Runnable {
@Override
public void run() {
try {
downstream.onComplete();
} finally {
w.dispose();
}
}
}
}
}
关联上下游 先验证 然后赋值
// Links upstream and downstream: validate first, then store the upstream Disposable.
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
downstream.onSubscribe(this); // starts the downstream's onSubscribe with this observer as its Disposable
}
}
判断的关键是看生产者的事件来源:1. 有上游(转发上游的事件);2. 自己生产事件。
dispose 取消也遵循这个原则:有上游就取消上游,自己生产的就取消自己的任务。
线程切换:
subscribeOn
/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.single;
import java.util.concurrent.atomic.AtomicReference;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.*;
/**
 * Single that performs the subscription to its upstream inside a task scheduled on the
 * given Scheduler.
 */
public final class SingleSubscribeOn<T> extends Single<T> {
final SingleSource<? extends T> source;
final Scheduler scheduler;
public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
this.source = source;
this.scheduler = scheduler;
}
@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer, source);
// The downstream receives parent as its Disposable before anything is scheduled.
observer.onSubscribe(parent);
Disposable f = scheduler.scheduleDirect(parent); // parent is the Runnable; this is where the thread switch happens
parent.task.replace(f);// keep the scheduled task's Disposable so it can be cancelled
}
// Observer for the upstream, Disposable for the downstream, and the Runnable that subscribes.
static final class SubscribeOnObserver<T>
extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
private static final long serialVersionUID = 7000911171163930287L;
final SingleObserver<? super T> downstream;
// Holds the Disposable of the scheduled subscription task.
final SequentialDisposable task;
final SingleSource<? extends T> source;
SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) {
this.downstream = actual;
this.source = source;
this.task = new SequentialDisposable();
}
@Override
public void onSubscribe(Disposable d) {
// d is the Disposable passed down by the upstream; it is stored once in this reference.
DisposableHelper.setOnce(this, d);
}
@Override
public void onSuccess(T value) {
downstream.onSuccess(value);
}
@Override
public void onError(Throwable e) {
downstream.onError(e);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);// cancels the upstream Disposable held in this reference
task.dispose(); // cancels the internal scheduled task
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
@Override
public void run() {
source.subscribe(this);// subscribes to the upstream from inside the scheduled task
}
}
}
SingleObserveOn:
/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.single;
import java.util.concurrent.atomic.AtomicReference;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
/**
 * Single that delivers the upstream's success or error to the downstream from a task
 * scheduled on the given Scheduler.
 */
public final class SingleObserveOn<T> extends Single<T> {
final SingleSource<T> source;
final Scheduler scheduler;
public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) {
this.source = source;
this.scheduler = scheduler;
}
@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
source.subscribe(new ObserveOnSingleObserver<>(observer, scheduler));
// Subscribes to the upstream right away; no thread switch happens at subscription time.
}
// Observer for the upstream, Disposable for the downstream, and the Runnable that delivers the result.
static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
private static final long serialVersionUID = 3528003840217436037L;
final SingleObserver<? super T> downstream;
final Scheduler scheduler;
// Result stored by onSuccess/onError until run() delivers it.
T value;
Throwable error;
ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
this.downstream = actual;
this.scheduler = scheduler;
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(this, d)) // store the upstream Disposable in this reference
{
downstream.onSubscribe(this); // the downstream receives this observer as its Disposable
// so a dispose coming from the downstream goes through this observer's dispose()
}
}
@Override
public void onSuccess(T value) {
this.value = value;
Disposable d = scheduler.scheduleDirect(this);
DisposableHelper.replace(this, d);// swap in the scheduled delivery task's Disposable in place of the upstream's
}
@Override
public void onError(Throwable e) {
this.error = e;
Disposable d = scheduler.scheduleDirect(this);
DisposableHelper.replace(this, d);
}
@Override
public void run() {
// Executed by the scheduler: deliver the error if one was stored, otherwise the value.
Throwable ex = error;
if (ex != null) {
downstream.onError(ex);
} else {
downstream.onSuccess(value);
}
}
@Override
public void dispose() {
DisposableHelper.dispose(this);// disposes whatever this reference currently holds
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
}
/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.single;
import java.util.concurrent.atomic.AtomicReference;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
/**
 * Second copy of SingleObserveOn: the result callbacks are moved onto the given Scheduler
 * by scheduling this observer as a Runnable.
 */
public final class SingleObserveOn<T> extends Single<T> {
final SingleSource<T> source;
final Scheduler scheduler;
public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) {
this.source = source;
this.scheduler = scheduler;
}
@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
// Wrap the downstream observer and subscribe the wrapper to the upstream.
source.subscribe(new ObserveOnSingleObserver<>(observer, scheduler));
}
static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
private static final long serialVersionUID = 3528003840217436037L;
final SingleObserver<? super T> downstream;
final Scheduler scheduler;
T value;
Throwable error;
ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
this.downstream = actual;
this.scheduler = scheduler;
}
@Override
public void onSubscribe(Disposable d) {
// Forward onSubscribe only if the upstream Disposable was stored successfully.
if (DisposableHelper.setOnce(this, d)) {
downstream.onSubscribe(this);
}
}
@Override
public void onSuccess(T value) {
this.value = value;
Disposable d = scheduler.scheduleDirect(this); // thread switch: run() executes on the scheduler
DisposableHelper.replace(this, d);
}
@Override
public void onError(Throwable e) {
this.error = e;
Disposable d = scheduler.scheduleDirect(this); // thread switch: run() executes on the scheduler
DisposableHelper.replace(this, d);
}
@Override
public void run() {
// A stored error takes precedence; otherwise the stored value is delivered.
Throwable ex = error;
if (ex != null) {
downstream.onError(ex);
} else {
downstream.onSuccess(value);
}
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
}
切线程的实现:Scheduler.scheduleDirect 通过 Worker 调度任务
// Schedules a Runnable with a delay: creates a Worker, wraps the Runnable in a DisposeTask,
// schedules it on the Worker, and returns the task as the Disposable.
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker(); // createWorker: see note 1 below
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);// hook method
DisposeTask task = new DisposeTask(decoratedRun, w);// pairs the decorated Runnable with its Worker
w.schedule(task, delay, unit);
return task;
}
1
// Note 1: createWorker returns a new NewThreadWorker built from this scheduler's threadFactory.
@NonNull
@Override
public Worker createWorker() {
return new NewThreadWorker(threadFactory); // threadFactory: see note 2 below
}
2
// Note 2: the executor type returned by create(); the interface body is omitted in these notes.
public interface ScheduledExecutorService extends ExecutorService {
}
// In essence, threads are created and managed through a thread pool:
// a ScheduledThreadPoolExecutor with a core size of 1 that uses the given ThreadFactory.
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1, factory);
// Remove-on-cancel is configured from the PURGE_ENABLED flag.
exec.setRemoveOnCancelPolicy(PURGE_ENABLED);
return exec;
}
切换主线程 mainThread通过 :Looper.getMainLooper()
// Default main-thread Scheduler, built from Looper.getMainLooper(); the second argument (true)
// is presumably the async flag - TODO confirm against internalFrom.
static final Scheduler DEFAULT = internalFrom(Looper.getMainLooper(), true);
// Schedules the Runnable by sending a delayed Message to the handler and returns the
// ScheduledRunnable wrapper as the Disposable. Throws NullPointerException for a null run or unit.
@Override
@SuppressLint("NewApi") // Async will only be true when the API is available to call.
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
if (async) {
message.setAsynchronous(true);
}
// Send the message to the main thread; original note: handler = new Handler(looper), where looper is the main Looper.
handler.sendMessageDelayed(message, unit.toMillis(delay));
return scheduled;
}