API 介绍和原理简析
1. 概念:扩展的观察者模式
观察者模式面向的需求是:举一个例子,警察在小偷实施作案的时候实施抓捕,在这一个例子中警察是观察者,小偷是被观察者。但是程序的观察者模式和这个还是有所区别的。观察者不需要一直看着被观察者,而是采用注册(Register)或者订阅的模式(Subscribe),在被观察者发生变化的时候通知观察者做出相应的变化。在Android开发中有一个典型的例子,机试控件的点击监听OnClickListener,在这个过程中View是被观察者,
OnClickListener
是观察者,二者通过 setOnClickListener()实现的订阅关系。我们可以随意定制自己程序中的观察者和被观察者,而警察叔叔明显无法要求小偷『你在作案的时候务必通知我』
RxJava的观察者模式
RxJava 有四个基本概念:Observable
(可观察者,即被观察者)、 Observer
(观察者)、 subscribe
(订阅)、事件。Observable
和Observer
通过 subscribe()
方法实现订阅关系,从而 Observable
可以在需要的时候发出事件来通知 Observer。
与传统观察者模式不同,RxJava事件回调除了onNext(相当于 onClick()
/ onEvent()
)之外,还有其他的方法,onCompleted()
和 onError()。
onComplete():时间队列完结,RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的onNext()
发出时,需要触发 onCompleted()
方法作为标志
onError();时间队列异常,事件队列异常。在事件处理过程中出异常时,onError()
会被触发,同时队列自动终止,不允许再有事件发出
在一个正常运行的时间序列中,onComplated()和onError()有且只有一个,并且是时间序列中的最后一个,onCompleted()
和 onError()
二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
2.基本实现
1)创建Observer
Observer 即观察者,它决定事件触发的时候将有怎样的行为。 RxJava 中的 Observer
接口的实现方式:
Observerobserver = new Observer () { @Override public void onNext(String s) { Log.d(tag, "Item: " + s); } @Override public void onCompleted() { Log.d(tag, "Completed!"); } @Override public void onError(Throwable e) { Log.d(tag, "Error!"); }};
除了observer外,Rxjava还内置了一个实现了observer的抽象类:SubScriber,对observer接口进行了一些扩展,但是用法一样
Subscribersubscriber = new Subscriber () { @Override public void onNext(String s) { Log.d(tag, "Item: " + s); } @Override public void onCompleted() { Log.d(tag, "Completed!"); } @Override public void onError(Throwable e) { Log.d(tag, "Error!"); }};
用法是一样的,在 RxJava 的 subscribe 过程中,Observer
也总是会先被转换成一个 Subscriber
再使用。两种的区别在于
1.onStart()
: 这是 Subscriber
增加的方法,它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作
2.unsubscribe()
: 解除订阅,一般在这个方法调用前,可以使用 isUnsubscribed()
先判断一下状态
2)创建Observable
被观察者,它决定什么时候出发事件以及怎么触发事件, RxJava 使用 create()
方法来创建一个 Observable ,并为它定义事件触发规则
Observable observable = Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber subscriber) { subscriber.onNext("Hello"); subscriber.onNext("Hi"); subscriber.onNext("Aloha"); subscriber.onCompleted(); }});
create()
方法是 RxJava 最基本的创造事件序列的方法。基于这个方法, RxJava 还提供了一些方法用来快捷创建事件队列,例如:
just(T...)将传入的参数依次发送过来
Observable observable = Observable.just("Hello", "Hi", "Aloha");// 将会依次调用:// onNext("Hello");// onNext("Hi");// onNext("Aloha");// onCompleted();
上面 just(T...)
的例子和 from(T[])
的例子,都和之前的 create(OnSubscribe)
的例子是等价的。
3)Subscribe(订阅)
创建Observable和observer之后,再用再用 subscribe()
方法将它们联结起来,整条链子就可以工作了
observable.subscribe(observer);// 或者:observable.subscribe(subscriber);
Observable.subscribe(Subscriber)
的内部实现是这样的(仅核心代码):
public Subscription subscribe(Subscriber subscriber) { subscriber.onStart(); onSubscribe.call(subscriber); return subscriber;}
- 调用
Subscriber.onStart()
。这个方法在前面已经介绍过,是一个可选的准备方法。 - 调用
Observable
中的OnSubscribe.call(Subscriber)
。在这里,事件发送的逻辑开始运行。从这也可以看出,在 RxJava 中,Observable
并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当subscribe()
方法执行的时候。 - 将传入的
Subscriber
作为Subscription
返回。这是为了方便unsubscribe()
3)线程控制-----Scheduler(--)
在不指定线程的时候,RxJava遵循的是线程不变的原则,在那个线程调用 subscribe()
,就在那个线程产生事件,在那个线程产生事件就在那个线程消费事件,所以就需要用到 Scheduler
(调度器)。
1)Scheduler的API(-)
在RxJava中相当于线程控制器,用于指定某一段代码应该运行在那个线程里面,RxJava已经内置了几个Schedule,他们已经适合大多数的使用场景。
Schedulers.immediate():直接在当前线程运行,相当于不指定线程,这是默认色Scheduler;
Schedulers.newThread():总是启动新线程,并在新线程执行操作。
Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler
。行为模式和 newThread()
差不多,区别在于 io()
的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io()
比 newThread()
更有效率。不要把计算工作放在 io()
中,可以避免创建不必要的线程.
Schedulers.computation():计算所使用的Scheduler.计算所使用的 Scheduler
。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler
使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation()
中,否则 I/O 操作的等待时间会浪费 CPU
AndroidSchedulers.mainThread()
,:指定在主线程进行操作
有了这几个 Scheduler
,就可以使用 subscribeOn()
和 observeOn()
两个方法来对线程进行控制了。 * subscribeOn()
: 指定subscribe()
所发生的线程,即 Observable.OnSubscribe
被激活时所处的线程。或者叫做事件产生的线程。 * observeOn()
: 指定Subscriber
所运行在的线程。或者叫做事件消费的线程
Observable.just(1, 2, 3, 4) .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程 .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程 .subscribe(new Action1() { @Override public void call(Integer number) { Log.d(tag, "number:" + number); } });
上面这段代码中,由于 subscribeOn(Schedulers.io())
的指定,被创建的事件的内容 1
、2
、3
、4
将会在 IO 线程发出;而由于observeOn(AndroidScheculers.mainThread()
) 的指定,因此 subscriber
数字的打印将发生在主线程 。事实上,这种在 subscribe()
之前写上两句 subscribeOn(Scheduler.io())
和 observeOn(AndroidSchedulers.mainThread())
的使用方式非常常见,它适用于多数的 『后台线程取数据,主线程显示』的程序策略。
4.变换
RxJava提供了对事件序列进行变换的支持,这是核心功能。变换的概念就是将事件序列中的对象或整个序列进行加工处理,传换成不同的时间或时间序列。
Observable.just("images/logo.png") // 输入类型 String .map(new Func1() { @Override public Bitmap call(String filePath) { // 参数类型 String return getBitmapFromPath(filePath); // 返回类型 Bitmap } }) .subscribe(new Action1 () { @Override public void call(Bitmap bitmap) { // 参数类型 Bitmap showBitmap(bitmap); } });
可以看到,map()
方法将参数中的 String
对象转换成一个 Bitmap
对象后返回,而在经过 map()
方法后,事件的参数类型也由 String
转为了 Bitmap
。这种直接变换对象并返回的,是最常见的也最容易理解的变换。不过 RxJava 的变换远不止这样,它不仅可以针对事件对象,还可以针对整个事件队列,这使得 RxJava 变得非常灵活。我列举几个常用的变换
map():事件对象的直接变换,是RxJava最常用的变换。
flatMap:是非常难理解的变换,举个例子,假设有一个数据结构【学生】,现在需要打印出一组学生的名字。