RxJava是什么?根据RxJava在GitHub上给出的描述: RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java
大致意思是: RxJava—一个可以在JVM上运行的,基于观察者模式 实现异步操作的java库。 RxJava的作用就是异步
。RxJava的使用,可以使“逻辑复杂的代码”保持极强的阅读性。
Rxjava github地址 RxAndroid github地址
RxAndorid的作用: RxAndorid 封装了AndroidSchedulers.mainThread()
,Android开发者使用过程中,可以轻松的将任务post Andorid主线程
中,执行页面更新操作。
//
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
//1、“异步线程” 执行耗时操作
//2、“执行完毕” 调用onNext触发回调,通知观察者
e.onNext("1");
e.onComplete();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
// 订阅线程 订阅的那一刻在订阅线程中执行
}
@Override
public void onNext(String value) {
// “主线程”执行的方法
}
@Override
public void onError(Throwable e) {
// "主线程"执行的方法
}
@Override
public void onComplete() {
// "主线程"执行的方法
}
});
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
// IO 线程
// 请求网络数据
e.onNext("123456");
}
}).map(new Function<String, Integer>() {
@Override
public Integer apply(String s) {
// IO 线程
// 网络数据解析(数据转化)
//
// throw new RequestFailException("获取网络请求失败");
return 123;
}
}).doOnNext(new Consumer<Integer>() { //保存登录结果UserInfo
@Override
public void accept(@NonNull Integer bean) throws Exception {
// IO 线程
// 保存网络数据
}
}).subscribeOn(Schedulers.io()) //IO线程
.observeOn(AndroidSchedulers.mainThread()) //主线程
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer bean) throws Exception {
// 更新UI
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
// 错误 显示错误页面
}
});
Flowable是为了应对Backpressure
产生的。 Flowable是一个被观察者
,与Subscriber(观察者)
配合使用
//
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
//1、“异步线程” 执行耗时操作
//2、“执行完毕” 调用onNext触发回调,通知观察者
emitter.onNext(0);
emitter.onComplete();
}
// 若消费者消费能力不足,则抛出MissingBackpressureException异常
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
// 订阅时执行,发生在“订阅线程”
// 这个方法是用来向生产者申请可以消费的事件数量
// 这里表明消费者拥有Long.MAX_VALUE的消费能力
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
// “主线程”执行的方法
}
@Override
public void onError(Throwable t) {
// "主线程"执行的方法
}
@Override
public void onComplete() {
// "主线程"执行的方法
}
});
Backpressure(背压)
即生产者的生产速度
大于消费者的消费能力
引起的问题。
在RxJava中有一种情况就是被观察者发送消息十分迅速
以至于观察者不能及时的响应这些消息
。
例如:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
// “异步线程”中 生产者有无限的生产能力
while (true){
e.onNext(1);
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
// “主线程”中 消费者消费能力不足,从而造成事件无限堆积,最后导致OOM
Thread.sleep(2000);
System.out.println(integer);
}
});
异步线程中
生产者有无限的生产能力; 主线程
中 消费者消费能力不足,从而造成事件无限堆积,最后导致OOM。
上述的现象,有个专有的名词来来形容,即:Backpressure(背压)
Subscription.request(long n)
方法是用来向生产者申请可以消费的事件数量
。
request(long n)
方法后,生产者便发送对应数量的事件供消费者消费;不显示调用request
就表示消费能力为0
。
在异步调用时,RxJava中有个缓存池,用来缓存消费者处理不了暂时缓存下来的数据,缓存池的默认大小为128,即只能缓存128个事件。 无论request()中传入的数字比128大或小,缓存池中在刚开始都会存入128个事件;当然如果本身并没有这么多事件需要发送,则不会存128个事件。
BackpressureStrategy.ERROR
策略下,如果生产者生产的事件大于128个,缓存池便会溢出,从而抛出MissingBackpressureException
异常;BackpressureStrategy.BUFFER
策略:将RxJava中默认的128个事件的缓存池换成一个更大的缓存池,这样,消费者通过request()即使传入一个很大的数字,生产者也会生产事件。但是这种方式比较消耗内存,除非是我们比较了解消费者的消费能力,能够把握具体情况,不会产生OOM。总之BUFFER要慎用。BackpressureStrategy.DROP
策略:当消费者处理不了事件,则丢弃。消费者通过request()传入其需求n,然后生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。BackpressureStrategy.LATEST
策略: LATEST与DROP功能基本一致。消费者通过request()传入其需求n,然后生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。唯一的区别就是LATEST总能使消费者能够接收到生产者产生的最后一个事件。
注:当前使用的源码版本 rxjava:2.1.9
从这段不涉及操作符和线程切换的简单例子开始:
// 创建观察者
Observer observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String o) {
}
@Override
public void onError(@NonNull Throwable e) {
Log.d(TAG, "onError data is :" + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
// 创建被观察者
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
e.onNext("hello");
e.onNext("world");
e.onComplete();
}
});
// 订阅
observable.subscribe(observer);
先看一下ObservableOnSubscribe.java
这个类
public interface ObservableOnSubscribe<T> {
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
由代码可知 ObservableOnSubscribe
是一个回调接口,回调方法中参数为ObservableEmitter
,下边看一下ObservableEmitter
这个类。
ObservableEmitter.java
ObservableEmitter字面意思是被观察者发射器,看一下源码:
public interface ObservableEmitter<T> extends Emitter<T> {
void setDisposable(@Nullable Disposable d);
void setCancellable(@Nullable Cancellable c);
boolean isDisposed();
@NonNull
ObservableEmitter<T> serialize();
@Experimental
boolean tryOnError(@NonNull Throwable t);
}
ObservableEmitter
是对Emitter
的扩展,而扩展的方法正是 RxJava2.0 之后引入的。提供了可中途取消等新能力,我们看 Emitter
源码:
public interface Emitter<T> {
void onNext(@NonNull T value);
void onError(@NonNull Throwable error);
void onComplete();
}
Emitter
字面意思是发射器,这里边的三个方法,大家都很熟悉了。其对应了以下这段代码:
new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
e.onNext("hello");
e.onNext("world");
e.onComplete();
}
}
回调说完,下边我们来看Observable.create(ObservableOnSubscribe<T> source)
这段代码。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
ObservableOnSubscribe
被用来创建ObservableCreate
,其实ObservableCreate
就是Observable
的一个实现类
因此 Observable.create(ObservableOnSubscribe<T> source)
这段代码,实际是:
//
ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
// IO线程中执行
e.onNext("hello");
e.onNext("world");
e.onComplete();
}
});
ObservableOnSubscribe.subscribe
方法被执行时,用户通过调用ObservableEmitter.onNext
方法,将数据发送出去(发送给观察者)
下边我们看一下ObservableCreate
这个类
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
// 省略部分代码 ...
}
ObservableOnSubscribe.subscribe
方法是在ObservableCreate.subscribeActual
方法中第四行中被执行了;subscribe
方法中,用户通过调用ObservableEmitter.onNext
方法,将数据发送出去;subscribeActual
方法第二行,调用了observer.onSubscribe(parent);
方法。 订阅发生时,在订阅线程主动执行了observer
的onSubscribe
方法;CreateEmitter
是对ObservableCreate.subscribeActual(Observer<? super T> observer)
方法传入的Observer
的封装;CreateEmitter
的作用是任务取消时,可以不再回调其封装的观察者;observer
的onNext
方法,由CreateEmitter.onNext
方法调用;
Observable.create(ObservableOnSubscribe<T> source);
方法最终返回一个 ObservableCreate
对象。 下边看 observable.subscribe(observer);
方法
observable.subscribe(observer);
即 订阅发生的那一刻。observable.subscribe(observer);
实际是ObservableCreate.subscribe(observer);
下边查看Observable
的subscribe(observer)
方法
Observable.subscribe(Observer observer)
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
// Observable的subscribe方法,实际执行的是subscribeActual方法
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
//
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
observable.subscribe(observer);
方法时,实际是调用了observable.subscribeActual(observer)
方法。observable
为ObservableCreate
的引用,因此这里调用的是ObservableCreate.subscribeActual(observer)
方法。
我们又回到 ObservableCreate
这个类的subscribeActual
方法
ObservableCreate.java
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
// subscribeActual 方法在 订阅发生的那一刻被调用 既 observable.subscribe(observer);时被调用
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 若中途任务取消,通过CreateEmitter 可终止对observer中方法onNext 、onError 等的回调
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 订阅发生时,执行 观察者的onSubscribe(Disposable d) 方法
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
// 省略部分代码 ...
}
subscribeActual
方法在 订阅发生的那一刻被调用的;在 observable.subscribe(observer);
时被调用;observer.onSubscribe(parent);
订阅发生时,在订阅线程回调observer
的onSubscribe
方法subscribeActual
方法中,传入的Observer
会被包装成一个CreateEmitter
;若中途任务取消,通过CreateEmitter
可终止对observer
中方法onNext 、onError
等的回调;
subscribeActual 中第二行代码 observer.onSubscribe(parent);
observer.onSubscribe(parent);
订阅发生时,执行 观察者的onSubscribe(Disposable d)
方法,这里回到了以下代码
// 创建观察者
Observer observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "onSubscribe");
}
// ... 省略onNext、onError、onComplete
};
new CreateEmitter<T>(observer)
,其实现了Disposable
接口,若任务取消,则不回调传入的观察者observer
对应的onNext 、onError、onComplete
等方法
subscribeActual 中第四行代码 source.subscribe(parent);
source.subscribe(parent);
是ObservableOnSubscribe.subscribe(new CreateEmitter<T>(observer));
代码最终回到ObservableOnSubscribe
的 subscribe
:
new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
e.onNext("hello");
e.onNext("world");
e.onComplete();
}
}
subscribe
中,调用到 CreateEmitter
类的onNext 、onComplete、onError
方法,将数据发送CreateEmitter
中的观察者
到此,“这段不涉及操作符和线程切换的简单例子” 的代码跟踪结束。
本文来自网易实践者社区,经作者夏学良授权发布。