注:当前使用的源码版本 rxjava:2.1.9
从这段线程切换的简单例子开始:
// 创建观察者
Observer observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
// 订阅线程 订阅的那一刻在订阅线程中执行
}
@Override
public void onNext(String o) {
// Android 主线程中执行
}
@Override
public void onError(@NonNull Throwable e) {
// Android 主线程中执行
}
@Override
public void onComplete() {
// Android 主线程中执行
}
};
// 创建被观察者
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
// IO线程中执行
e.onNext("hello");
e.onNext("world");
e.onComplete();
}
});
// 被观察者 IO 线程
observable = observable.subscribeOn(Schedulers.io());
// 观察者 Android主线程
observable = observable.observeOn(AndroidSchedulers.mainThread());
// 订阅
observable.subscribe(observer);
先来个流程图
在 源码阅读——简单例子 (一) 中我们了解到了Observable.create(ObservableOnSubscribe<T> source)
实际是 如下代码:
//
ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
// IO线程中执行
e.onNext("hello");
e.onNext("world");
e.onComplete();
}
});
ObservableCreate
中含有一个subscribeActual(observer)
方法,用于执行传入观察者的observer.onSubscribe
方法,和间接调用 观察者的onNext、onComplete
等方法;
ObservableCreate
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
// 省略部分代码 ...
}
subscribeActual
方法第二行,调用了传入的观察者的observer.onSubscribe(parent);
方法; 订阅发生时,在订阅线程主动执行了observer
的onSubscribe
方法;subscribeActual
方法第四行,调用了传入的观察者的observer.subscribe
方法;subscribe
方法中,用户通过调用CreateEmitter.onNext
方法,将数据发送出去;CreateEmitter
是对ObservableCreate.subscribeActual(Observer<? super T> observer)
方法传入的Observer
的封装;CreateEmitter
的作用是任务取消时,可以不再回调其封装的观察者;observer
的onNext
方法,由CreateEmitter.onNext
方法调用;
下边查看observable.subscribeOn(Schedulers.io())相关代码
注: ObservableEmitter
是CreateEmitter
的引用,是对Observer
的进一步封装。CreateEmitter
在执行onNext
时,如果任务取消,则不再回调Observer
的onNext
方法。
下边我们查看Observable
类的subscribeOn(Scheduler scheduler)
方法
Observable.java
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
// 生成一个ObservableSubscribeOn对象
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
RxJavaPlugins
ObservableSubscribeOn
对象
这里Observable observable = observableCreate.subscribeOn(Schedulers.io())
代码实际是
ObservableSubscribeOn observable = new ObservableSubscribeOn<T>(observableCreate, Schedulers.io())
observable.subscribeOn(Schedulers.io())
返回的是一个ObservableSubscribeOn
的引用
下边查看ObservableSubscribeOn
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
// ... 省略部分代码
}
看一下ObservableSubscribeOn
中的subscribeActual
方法
subscribeActual
方法第二行代码中,执行了传入Observer
的 onSubscribe
方法;subscribeActual
方法第三行: 在 scheduler
对应的IO线程
中,执行observableCreate
的subscribe
方法,传入参数为SubscribeOnObserver
,即:IO线程中
执行observableCreate.subscribe(new SubscribeOnObserver(observer));
因此,无论ObservableSubscribeOn.subscribeActual(observer)
在哪个线程中被调用observableCreate.subscribe(new SubscribeOnObserver<T>(observer))
均在IO线程中执行,因此观察者的e.onNext("hello"); e.onComplete();
亦在IO线程中执行;
下边我们查看Observable
类的observeOn(Scheduler scheduler)
方法
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
//
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
这里可以看到 Observable observable = observableSubscribeOn.observeOn(AndroidSchedulers.mainThread())
实际是:
ObservableObserveOn observable = new ObservableObserveOn<T>(observableSubscribeOn, AndroidSchedulers.mainThread(), false, 128);
因此 ,observable.observeOn(AndroidSchedulers.mainThread())
返回的是ObservableObserveOn
的引用。
下边查看ObservableObserveOn
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
// ... 省略部分代码
}
看一下ObservableObserveOn
中的subscribeActual
方法
subscribeActual
方法第五行代码,实际为observableSubscribeOn.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
ObserveOnObserver
的作用是在ObserveOnObserver
的onNext
方法被实行时;将observer
的onNext
方法post到 Android主线程
中;
Observable
的subscribe(Observer<? super T> observer)
方法,实际调用到了Observable
的subscribeActual(Observer<? super T> observer)
方法;observable
实际是ObservableObserveOn
的引用;
因此,observable.subscribe(observer)
实际执行的是observableObserveOn.subscribeActual(observer)
到这里,我们 线程切换 (二) 的小例子变换为了以下代码:
// 创建观察者
Observer observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
// 订阅线程 订阅的那一刻在订阅线程中执行
}
@Override
public void onNext(String o) {
// Android 主线程中执行
}
@Override
public void onError(@NonNull Throwable e) {
// Android 主线程中执行
}
@Override
public void onComplete() {
// Android 主线程中执行
}
};
//
ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
// IO线程中执行
e.onNext("hello");
e.onNext("world");
e.onComplete();
}
});
//
ObservableSubscribeOn observableSubscribeOn = new ObservableSubscribeOn<T>(observableCreate, Schedulers.io())
//
ObservableObserveOn observableObserveOn = new ObservableObserveOn<T>(observableSubscribeOn, AndroidSchedulers.mainThread(), false, 128);
//
observableObserveOn.subscribeActual(observer);
下边我们查看observableObserveOn.subscribeActual(observer)
ObservableObserveOn.java
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
// source 为 observableSubscribeOn
super(source);
// scheduler 为AndroidSchedulers.mainThread()
this.scheduler = scheduler;
// false
this.delayError = delayError;
// 128
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
// AndroidSchedulers.mainThread() 为 HandlerScheduler,因此会走到else部分代码
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
}
// 代码会走到else 部分
else {
Scheduler.Worker w = scheduler.createWorker();
// source 为 observableSubscribeOn
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
// ... 省略部分代码
}
subscribeActual
方法中,AndroidSchedulers.mainThread()
为HandlerScheduler
,因此 if 中的判断语句直接忽略,直接走到代码的 else 部分。subscribeActual
方法中,将观察者observer
封装成了ObserveOnObserver
;并且调用observableSubscribeOn.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize))
observableSubscribeOn.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize))
实际是
ObserveOnObserver observeOnObserver = new ObserveOnObserver<T>(observer, w, delayError, bufferSize)
// 1、“订阅线程中” —— 执行onSubscribe, 实际执行的是observer的onSubscribe方法
observeOnObserver.onSubscribe(new SubscribeOnObserver<T>(observeOnObserver));
// 2、“IO程中” —— 执行subscribe ;IO线程 subscribe方法中,用户主动调用ObserveOnObserver的onNext、onError、onComplete方法,将数据发出去
observableCreate.subscribe(new SubscribeOnObserver<T>(observeOnObserver))
SubscribeOnObserver
的onNext
是将数据发送出去SubscribeOnObserver.onNext
调用了observeOnObserver.onNext
observeOnObserver.onNext
通过HandlerScheduler
将observer.onNext、observer.onError、observer.onComplete
等方法post到Android主线程中执行。
最后得出的整个代码流程如下
本文来自网易实践者社区,经作者夏学良授权发布。