观察者模式与RxJava

猪小花1号2018-09-05 09:22

作者:廖祥俐

观察者模式

观察者模式(Observer Pattern):定义对象间的一种一对多依赖关系,使得每当一个对象状态发生改变时,其相关依赖对象皆得到通知并被自动更新。观察者模式又叫做发布-订阅(Publish/Subscribe)模式、模型-视图(Model/View)模式、源-监听器(Source/Listener)模式或从属者(Dependents)模式。

观察者模式需要的两个主体对象:被观察主体(Subject)、观察者(Observer),直观的说,观察者模式就是 被观察对象发生改变时能够通知观察者状态的变化 所以,观察者(Observer)依赖于被观察主体(Subject)的通知,在得到通知后进行后续的处理。

从接口的角度讲,被观察主体(Subject)需要保存观察者(Observer)对象的列表,在状态发生变化时,能够通知到Observer,对于Observable,其抽象类可如下设计:

public abstract class Subject{
    // 用来保存注册的观察者对象
    private    List<Observer> list = new ArrayList<Observer>();
    //  注册观察者对象
    public void add(Observer observer){ list.add(observer); }
    //  删除观察者对象
    public void delete(Observer observer){ list.remove(observer); }
    //  通知所有注册的观察者对象
    public void nodifyObservers(Object newState){
        for(Observer observer : list){
            observer.update(newState);
        }
    }
}

而对于Observer,它需要做的,就是收到通知之后,进行后续的处理,即

public interface Observer {
    // 更新的状态
    public void update(Object state);
}


RxJava中的观察者模式

RxJava 在 GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。

RxJava是结合观察者模式设计的一套异步操作库,首先可以看一下其中主要与观察者模式相关的接口(源码)

首先来看观察者(Observer) Observer.class

public interface Observer<T> {
...
    void onCompleted();

    void onError(Throwable e);

    void onNext(T t);
}

根据前面介绍的观察者模式,Observer在这里更新状态的模式有三种,onCompletedonErroronNext,即三种情境:消息通知(onNext),消息完毕(onCompleted),消息出错(onNext),有这三个接口,编写程序的时候就可以更方便的对要处理的业务进行操作。 再看RxJava对其进一步包装(这个是我们经常用到的类Subscriber.class)

public abstract class Subscriber<T> implements Observer<T>, Subscription {
private final SubscriptionList cs;
...
}

cs管理当前观察者的所有订阅状态(Subscription 记录订阅状态),在实际代码中只需要实现Observer接口的三个函数即可。

然后看被观察主体(Subject)(源码) Subject.class

public abstract class Subject<T, R> extends Observable<R> implements Observer<T> {
...
}

Subject继承了一个Observable对象(可观察对象,后面会提到),并继承了Observer,即Subject通知Observer也是通过onCompletedonErroronNext进行通知的,可以查看一下具体的Subject实例,如PublishSubject.class(源码)

public final class PublishSubject<T> extends Subject<T, T> {
...
 final SubjectSubscriptionManager<T> state;
...
 @Override
    public void onCompleted() {
        if (state.active) {
            for (SubjectObserver<T> bo : state.terminate(n)) {
               ...
            }
        }
    }

    @Override
    public void onError(final Throwable e) {
        if (state.active) {
            for (SubjectObserver<T> bo : state.terminate(n)) {
               ...
            }
            Exceptions.throwIfAny(errors);
        }
    }

    @Override
    public void onNext(T v) {
        for (SubjectObserver<T> bo : state.observers()) {
           ...
        }
    }
...
}

从代码中可以看到state存储并管理着观察者对象,通过onCompletedonErroronNext进行消息的通知,从而完成了一个观察者模式的 订阅——通知过程

一个简单的示例:

PublishSubject<String> publishSubject = PublishSubject.create();
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String t) {
              System.out.println(t);
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onCompleted() {
    }
};
publishSubject.subscribe(subscriber);
publishSubject.onNext("Hello world");

打印Hello world


Observable

前面提到RxJava是 一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库,RxJava最大的特点有两个:一个是异步,一个是事件序列的变换。

首先讲异步,创造一个可观察对象,如下:

Observable<String> myObservable = Observable.create(
    new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> sub) {
            sub.onNext("Hello, world!");
            sub.onCompleted();
        }
    }
);

创建观察者(或者成为订阅者)

Subscriber<String> mySubscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) { System.out.println(s); }

    @Override
    public void onCompleted() { }

    @Override
    public void onError(Throwable e) { }
};

订阅:

myObservable.subscribe(mySubscriber);

查看源码可以发现,在被订阅前Observable不会发出任何消息,在subscribe操作中,会发生如下过程:

subscribe(mySubscriber){
....
call(mySubscriber){
}
}

即这一过程会把订阅者mySubscriber作为参数传入call函数中,并执行call函数,所以这里得到打印:Hello, world!

那么,假设有一个任务:对一个音频文件进行入库,需要进行元数据解析、特征提取、匹配等操作,最终入库。

这个任务在前面是个很耗时很慢的过程,最后入库操作可以看成是在前面操作完成后,往数据库塞一条记录即可,是个相对较快的过程。

假设前面处理得到的是个Song数据结构,最后的操作即把该数据插入数据库即可,即最后一步:

addToDb(Song)

如果有个观察者来接收Song,那么它只需要入库就好了,即

Subscriber<Song> songSubscriber = new Subscriber<Song>() {
    @Override
    public void onNext(Song s) { addToDb(Song); }
...
};

对于被观察者,它需要产生数据:

Observable<Song> songObservable = Observable.create(
    new Observable.OnSubscribe<Song>() {
        @Override
        public void call(Subscriber<? super String> sub) {
            try{
                       Song = { // Get Song}
                      sub.onNext(Song)
                      sub.onCompleted();
             } catch(Exception e){ // 处理异常
                   sub.onError(e)
            }
        }
    }
);

然后订阅

songObservable.subscribe(songSubscriber);

那么这里完成的操作就是,当解析、处理完一个音频文件后,才会向songSubscriber发出一个消息,也就是完成异步通知操作,然后songSubscriber把它进行入库 如果,songObservable运行的速度较慢,想开个新线程让它去跑,而不堵塞当前的程序,那么可以用observeOn定制它运行的线程如:

songObservable .observeOn(Schedulers.io())

其中Schedulers.io()是songObservable 执行的线程,这里可以直接采用Schedulers类提供的几个由RxJava产生的线程,或自己定制执行线程池,如

songObservable .observeOn(Schedulers.from(executor))

同样的,可以定制观察者所在的工作线程:

songObservable.observeOn(Schedulers.from(executor)).subscribeOn(Schedulers.from(executor));

如果这个时候,需求发生变化:需要返回的不是一个Song对象,而只要返回Album对象,该怎么做呢?

这里就涉及到另一个概念:序列事件的变换,最常用的就是RxJava的map操作符(都是针对Observable对象)。

Observable.map(new Func1<T, R>() {
    @Override
    public R call(T t) {
        R = dosomething(t)
        return R;
    }
});

通过map函数,将原来类型为T的,转换为类型为R的,然后再进行发送消息,

Subscriber<Album> albumSubscriber = new Subscriber<Album>() {
    @Override
    public void onNext(Album) { addToDb(Album); }
...
};
...
songObservable .map(new Func1<Song, Album>() {
    @Override
    public Album call(Song t) {
        Album = dosomething(Song)
        return Album;
    }
}).subscribe(albumSubscriber);

map操作符可以一直转换下去,也就是说可以Observable.map...map...直至数据符合所需。RxJavaObservable提供了丰富的操作符,使得数据的处理可以直接链式到达,从而可以简化代码。

常用的的操作符分类有:

  • 创建操作

Create — 通过调用观察者的方法从头创建一个Observable

Defer — 在观察者订阅之前不创建这个Observable,为每一个观察者创建一个新的Observable

From — 将其它的对象或数据结构转换为Observable

Just — 将对象或者对象集合转换为一个会发射这些对象的Observable

  • 变换操作

FlatMap — 扁平映射,将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,可以认为是一个将嵌套的数据结构展开的过程。

Map — 映射,通过对序列的每一项都应用一个函数变换Observable发射的数据,实质是对序列中的每一项执行一个函数,函数的参数就是这个数据项

  • 过滤操作

Distinct — 去重,过滤掉重复数据项

Filter — 过滤,过滤掉没有通过谓词测试的数据项,只发射通过测试的

参考资料

另外要注明就是,RxJava中大量使用匿名函数,在JDK8以后,采用lambda表达式可以极大的简化这些代码。


网易云大礼包:https://www.163yun.com/gift

本文来自网易实践者社区,经作者廖祥俐授权发布