观察者模式(Observer Pattern):定义对象间的一种一对多依赖关系,使得每当一个对象状态发生改变时,其相关依赖对象皆得到通知并被自动更新。观察者模式又叫做发布-订阅(Publish/Subscribe)模式、模型-视图(Model/View)模式、源-监听器(Source/Listener)模式或从属者(Dependents)模式。
观察者模式需要的两个主体对象:被观察主体(Subject)、观察者(Observer),直观的说,观察者模式就是 被观察对象发生改变时能够通知观察者状态的变化 所以,观察者(Observer)依赖于被观察主体(Subject)的通知,在得到通知后进行后续的处理。
从接口的角度讲,被观察主体(Subject)需要保存观察者(Observer)对象的列表,在状态发生变化时,能够通知到Observer,对于Observable,其抽象类可如下设计:
public abstract class Subject{
// 用来保存注册的观察者对象
private List<Observer> list = new ArrayList<Observer>();
// 注册观察者对象
public void add(Observer observer){ list.add(observer); }
// 删除观察者对象
public void delete(Observer observer){ list.remove(observer); }
// 通知所有注册的观察者对象
public void nodifyObservers(Object newState){
for(Observer observer : list){
observer.update(newState);
}
}
}
而对于Observer,它需要做的,就是收到通知之后,进行后续的处理,即
public interface Observer {
// 更新的状态
public void update(Object state);
}
RxJava 在 GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。
RxJava是结合观察者模式设计的一套异步操作库,首先可以看一下其中主要与观察者模式相关的接口(源码)
首先来看观察者(Observer) Observer.class
public interface Observer<T> {
...
void onCompleted();
void onError(Throwable e);
void onNext(T t);
}
根据前面介绍的观察者模式,Observer在这里更新状态的模式有三种,onCompleted
, onError
,onNext
,即三种情境:消息通知(onNext
),消息完毕(onCompleted
),消息出错(onNext
),有这三个接口,编写程序的时候就可以更方便的对要处理的业务进行操作。 再看RxJava对其进一步包装(这个是我们经常用到的类Subscriber.class)
public abstract class Subscriber<T> implements Observer<T>, Subscription {
private final SubscriptionList cs;
...
}
cs管理当前观察者的所有订阅状态(Subscription 记录订阅状态),在实际代码中只需要实现Observer接口的三个函数即可。
然后看被观察主体(Subject)(源码) Subject.class
public abstract class Subject<T, R> extends Observable<R> implements Observer<T> {
...
}
Subject继承了一个Observable对象(可观察对象,后面会提到),并继承了Observer,即Subject通知Observer也是通过onCompleted
, onError
,onNext
进行通知的,可以查看一下具体的Subject实例,如PublishSubject.class(源码)
public final class PublishSubject<T> extends Subject<T, T> {
...
final SubjectSubscriptionManager<T> state;
...
@Override
public void onCompleted() {
if (state.active) {
for (SubjectObserver<T> bo : state.terminate(n)) {
...
}
}
}
@Override
public void onError(final Throwable e) {
if (state.active) {
for (SubjectObserver<T> bo : state.terminate(n)) {
...
}
Exceptions.throwIfAny(errors);
}
}
@Override
public void onNext(T v) {
for (SubjectObserver<T> bo : state.observers()) {
...
}
}
...
}
从代码中可以看到state
存储并管理着观察者对象,通过onCompleted
, onError
,onNext
进行消息的通知,从而完成了一个观察者模式的 订阅——通知过程
一个简单的示例:
PublishSubject<String> publishSubject = PublishSubject.create();
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String t) {
System.out.println(t);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onCompleted() {
}
};
publishSubject.subscribe(subscriber);
publishSubject.onNext("Hello world");
打印Hello world
前面提到RxJava是 一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库,RxJava最大的特点有两个:一个是异步,一个是事件序列的变换。
首先讲异步,创造一个可观察对象,如下:
Observable<String> myObservable = Observable.create(
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> sub) {
sub.onNext("Hello, world!");
sub.onCompleted();
}
}
);
创建观察者(或者成为订阅者)
Subscriber<String> mySubscriber = new Subscriber<String>() {
@Override
public void onNext(String s) { System.out.println(s); }
@Override
public void onCompleted() { }
@Override
public void onError(Throwable e) { }
};
订阅:
myObservable.subscribe(mySubscriber);
查看源码可以发现,在被订阅前Observable不会发出任何消息,在subscribe
操作中,会发生如下过程:
subscribe(mySubscriber){
....
call(mySubscriber){
}
}
即这一过程会把订阅者mySubscriber
作为参数传入call
函数中,并执行call
函数,所以这里得到打印:Hello, world!
那么,假设有一个任务:对一个音频文件进行入库,需要进行元数据解析、特征提取、匹配等操作,最终入库。
这个任务在前面是个很耗时很慢的过程,最后入库操作可以看成是在前面操作完成后,往数据库塞一条记录即可,是个相对较快的过程。
假设前面处理得到的是个Song
数据结构,最后的操作即把该数据插入数据库即可,即最后一步:
addToDb(Song)
如果有个观察者来接收Song
,那么它只需要入库就好了,即
Subscriber<Song> songSubscriber = new Subscriber<Song>() {
@Override
public void onNext(Song s) { addToDb(Song); }
...
};
对于被观察者,它需要产生数据:
Observable<Song> songObservable = Observable.create(
new Observable.OnSubscribe<Song>() {
@Override
public void call(Subscriber<? super String> sub) {
try{
Song = { // Get Song}
sub.onNext(Song)
sub.onCompleted();
} catch(Exception e){ // 处理异常
sub.onError(e)
}
}
}
);
然后订阅
songObservable.subscribe(songSubscriber);
那么这里完成的操作就是,当解析、处理完一个音频文件后,才会向songSubscriber
发出一个消息,也就是完成异步通知操作,然后songSubscriber
把它进行入库 如果,songObservable运行的速度较慢,想开个新线程让它去跑,而不堵塞当前的程序,那么可以用observeOn
定制它运行的线程如:
songObservable .observeOn(Schedulers.io())
其中Schedulers.io()
是songObservable 执行的线程,这里可以直接采用Schedulers
类提供的几个由RxJava产生的线程,或自己定制执行线程池,如
songObservable .observeOn(Schedulers.from(executor))
同样的,可以定制观察者所在的工作线程:
songObservable.observeOn(Schedulers.from(executor)).subscribeOn(Schedulers.from(executor));
如果这个时候,需求发生变化:需要返回的不是一个Song
对象,而只要返回Album
对象,该怎么做呢?
这里就涉及到另一个概念:序列事件的变换,最常用的就是RxJava的map
操作符(都是针对Observable对象)。
Observable.map(new Func1<T, R>() {
@Override
public R call(T t) {
R = dosomething(t)
return R;
}
});
通过map函数,将原来类型为T的,转换为类型为R的,然后再进行发送消息,
Subscriber<Album> albumSubscriber = new Subscriber<Album>() {
@Override
public void onNext(Album) { addToDb(Album); }
...
};
...
songObservable .map(new Func1<Song, Album>() {
@Override
public Album call(Song t) {
Album = dosomething(Song)
return Album;
}
}).subscribe(albumSubscriber);
map操作符可以一直转换下去,也就是说可以Observable.map...map...
直至数据符合所需。RxJava
的Observable
提供了丰富的操作符,使得数据的处理可以直接链式到达,从而可以简化代码。
常用的的操作符分类有:
Create — 通过调用观察者的方法从头创建一个Observable
Defer — 在观察者订阅之前不创建这个Observable,为每一个观察者创建一个新的Observable
From — 将其它的对象或数据结构转换为Observable
Just — 将对象或者对象集合转换为一个会发射这些对象的Observable
FlatMap — 扁平映射,将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,可以认为是一个将嵌套的数据结构展开的过程。
Map — 映射,通过对序列的每一项都应用一个函数变换Observable发射的数据,实质是对序列中的每一项执行一个函数,函数的参数就是这个数据项
Distinct — 去重,过滤掉重复数据项
Filter — 过滤,过滤掉没有通过谓词测试的数据项,只发射通过测试的
参考资料
另外要注明就是,RxJava
中大量使用匿名函数,在JDK8以后,采用lambda
表达式可以极大的简化这些代码。
网易云大礼包:https://www.163yun.com/gift
本文来自网易实践者社区,经作者廖祥俐授权发布