RxJava2 使用 及 源码阅读 (下篇)

源码阅读——线程切换 (二)

注:当前使用的源码版本 rxjava:2.1.9

从这段线程切换的简单例子开始:

// 创建观察者
Observer observer = new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        // 订阅线程  订阅的那一刻在订阅线程中执行
    }

    @Override
    public void onNext(String o) {
        // Android 主线程中执行
    }

    @Override
    public void onError(@NonNull Throwable e) {
        // Android 主线程中执行
    }

    @Override
    public void onComplete() {
        // Android 主线程中执行
    }
};

// 创建被观察者
Observable observable = Observable.create(new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        // IO线程中执行
        e.onNext("hello");
        e.onNext("world");
        e.onComplete();
    }
});
// 被观察者 IO 线程
observable = observable.subscribeOn(Schedulers.io());
// 观察者  Android主线程
observable = observable.observeOn(AndroidSchedulers.mainThread());
// 订阅
observable.subscribe(observer);

先来个流程图

a、Observable.create(ObservableOnSubscribe source)

源码阅读——简单例子 (一) 中我们了解到了Observable.create(ObservableOnSubscribe<T> source)实际是 如下代码:

//
ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        // IO线程中执行
        e.onNext("hello");
        e.onNext("world");
        e.onComplete();
    }
});

  • ObservableCreate 中含有一个subscribeActual(observer) 方法,用于执行传入观察者的observer.onSubscribe方法,和间接调用 观察者的onNext、onComplete 等方法;

ObservableCreate

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    // 省略部分代码 ...
}

  • subscribeActual方法第二行,调用了传入的观察者的observer.onSubscribe(parent);方法; 订阅发生时,在订阅线程主动执行了observeronSubscribe方法;
  • subscribeActual方法第四行,调用了传入的观察者的observer.subscribe 方法;subscribe方法中,用户通过调用CreateEmitter.onNext方法,将数据发送出去;
  • CreateEmitter 是对ObservableCreate.subscribeActual(Observer<? super T> observer)方法传入的Observer的封装;
  • CreateEmitter的作用是任务取消时,可以不再回调其封装的观察者;observeronNext方法,由CreateEmitter.onNext方法调用;

下边查看observable.subscribeOn(Schedulers.io())相关代码

注: ObservableEmitterCreateEmitter的引用,是对Observer的进一步封装。CreateEmitter在执行onNext时,如果任务取消,则不再回调ObserveronNext方法。

b、observable.subscribeOn(Schedulers.io())

下边我们查看Observable 类的subscribeOn(Scheduler scheduler)方法

Observable.java

public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    // 生成一个ObservableSubscribeOn对象
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

  • 继续忽略RxJavaPlugins
  • 最终返回一个ObservableSubscribeOn 对象

这里Observable observable = observableCreate.subscribeOn(Schedulers.io())代码实际是

ObservableSubscribeOn observable = new ObservableSubscribeOn<T>(observableCreate, Schedulers.io())

  • 因此 observable.subscribeOn(Schedulers.io()) 返回的是一个ObservableSubscribeOn 的引用

下边查看ObservableSubscribeOn

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    // ... 省略部分代码
}

看一下ObservableSubscribeOn中的subscribeActual 方法

  • subscribeActual 方法第二行代码中,执行了传入ObserveronSubscribe 方法;
  • subscribeActual 方法第三行: 在 scheduler 对应的IO线程中,执行observableCreatesubscribe 方法,传入参数为SubscribeOnObserver,即:IO线程中 执行observableCreate.subscribe(new SubscribeOnObserver(observer));

因此,无论ObservableSubscribeOn.subscribeActual(observer)在哪个线程中被调用observableCreate.subscribe(new SubscribeOnObserver<T>(observer))均在IO线程中执行,因此观察者的e.onNext("hello"); e.onComplete(); 亦在IO线程中执行;

c、observable.observeOn(AndroidSchedulers.mainThread())

下边我们查看Observable 类的observeOn(Scheduler scheduler)方法

public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}
// 
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

这里可以看到 Observable observable = observableSubscribeOn.observeOn(AndroidSchedulers.mainThread())实际是:

ObservableObserveOn observable = new ObservableObserveOn<T>(observableSubscribeOn, AndroidSchedulers.mainThread(), false, 128);

因此 ,observable.observeOn(AndroidSchedulers.mainThread()) 返回的是ObservableObserveOn 的引用。

下边查看ObservableObserveOn

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    // ... 省略部分代码
}

看一下ObservableObserveOn中的subscribeActual 方法

  • subscribeActual 方法第五行代码,实际为observableSubscribeOn.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
  • ObserveOnObserver 的作用是在ObserveOnObserveronNext方法被实行时;将observeronNext方法post到 Android主线程中;

d、observable.subscribe(observer)

  • 我们知道Observablesubscribe(Observer<? super T> observer)方法,实际调用到了ObservablesubscribeActual(Observer<? super T> observer) 方法;
  • 而这里的observable 实际是ObservableObserveOn的引用;

因此,observable.subscribe(observer)实际执行的是observableObserveOn.subscribeActual(observer)

到这里,我们 线程切换 (二) 的小例子变换为了以下代码:

// 创建观察者
Observer observer = new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        // 订阅线程  订阅的那一刻在订阅线程中执行
    }

    @Override
    public void onNext(String o) {
        // Android 主线程中执行
    }

    @Override
    public void onError(@NonNull Throwable e) {
        // Android 主线程中执行
    }

    @Override
    public void onComplete() {
        // Android 主线程中执行
    }
};
//
ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        // IO线程中执行
        e.onNext("hello");
        e.onNext("world");
        e.onComplete();
    }
});
//
ObservableSubscribeOn observableSubscribeOn = new ObservableSubscribeOn<T>(observableCreate, Schedulers.io())
//
ObservableObserveOn observableObserveOn = new ObservableObserveOn<T>(observableSubscribeOn, AndroidSchedulers.mainThread(), false, 128);
//
observableObserveOn.subscribeActual(observer);

下边我们查看observableObserveOn.subscribeActual(observer)

ObservableObserveOn.java

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        // source 为 observableSubscribeOn
        super(source);
        // scheduler 为AndroidSchedulers.mainThread()
        this.scheduler = scheduler;
        // false
        this.delayError = delayError;
        // 128
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // AndroidSchedulers.mainThread() 为 HandlerScheduler,因此会走到else部分代码
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        }
        // 代码会走到else 部分
         else {
            Scheduler.Worker w = scheduler.createWorker();
            // source 为 observableSubscribeOn
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    // ... 省略部分代码
}

  • subscribeActual 方法中,AndroidSchedulers.mainThread()HandlerScheduler ,因此 if 中的判断语句直接忽略,直接走到代码的 else 部分。
  • subscribeActual 方法中,将观察者observer封装成了ObserveOnObserver;并且调用observableSubscribeOn.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize))
  • observableSubscribeOn.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize))实际是

ObserveOnObserver observeOnObserver = new ObserveOnObserver<T>(observer, w, delayError, bufferSize)
// 1、“订阅线程中” —— 执行onSubscribe, 实际执行的是observer的onSubscribe方法
observeOnObserver.onSubscribe(new SubscribeOnObserver<T>(observeOnObserver));
// 2、“IO程中” —— 执行subscribe ;IO线程 subscribe方法中,用户主动调用ObserveOnObserver的onNext、onError、onComplete方法,将数据发出去
observableCreate.subscribe(new SubscribeOnObserver<T>(observeOnObserver))

  • 用户调用SubscribeOnObserveronNext 是将数据发送出去
  • SubscribeOnObserver.onNext调用了observeOnObserver.onNext
  • observeOnObserver.onNext通过HandlerSchedulerobserver.onNext、observer.onError、observer.onComplete 等方法post到Android主线程中执行。

e、整体流程图如下

最后得出的整个代码流程如下

参考

手把手教你使用 RxJava 2.0(一)

RxJava2 源码解析(一)

RxJava2 源码解析——流程

本文来自网易实践者社区,经作者夏学良授权发布。