开始总结记录 RxJava
的相关内容,本文中所有涉及 RxJava
的地方均指 Rxjava2.0
。
本文主要以 Observable
和 Flowable
为例介绍如何创建被观察者和观察者,并连接他们。
推荐文章
记录一下看过的几篇还不错的关于 RxJava
的文章,感谢他们的总结和分享。
RxJava1 - 《 给 Android 开发者的 RxJava 详解 》- 扔物线),这篇文章是对 RxJava1
的讲解,很不错,图文并茂,看了一遍之后仍旧有些疑惑,建议多看几遍,很多关于原理的介绍可以加强对 Rx
的理解。
RxJava2 - 《 给初学者的RxJava2.0教程(一) 》- 掘金 以及 RxJava2 - 《 给初学者的RxJava2.0教程(二) 》- 掘金,特点就是作者使用水管上游下游的描述方式,简化了对观察者模式和事件发送的理解,浅显易懂,也很全面,适合入门。
《 RxJava2.0 你不知道的事》- 简书 是对 RxJava1.x
和 RxJava2.x
的对比,文章中对 2.x
的部分 api
,进行了列举,对比起来看就清晰多了,另外很好的解释了背压的问题,受益匪浅。
我的理解
上面推荐的文章中对 RxJava
的相关原理都做了部分说明,我就不做过多描述,说一下我的一些理解吧。
RxJava
最关键的两个点就是 观察者模式 和 异步。在 RxJava
中被观察者作为事件的产生方,是 主动 的,是整个事件流程的起点。观察者作为事件的处理方,是 被动 的,是整个事件流程的终点。在起点和终点之间,即事件传递的过程中是可以被加工,过滤,转换,合并等等方式处理的。
Observable
,被观察者,被订阅者,可被观察的,他是数据和事件发射的源,他在RxJava
中是有多种实现方式,这里不是说的哪个类,而是一种泛指。
Observer/Subscriber
,观察者,订阅者,他是事件接受者。
整体来看,可以理解为一条事件流,被观察者在上游发送事件,观察者在下游接受事件,中游会有很多针对事件的处理和变换,这样理解更简单一些,也更有助于理解背压(Backpressure
)的存在。为了更好的理解,避免叙述的混乱,文章中我会用 上游 和 下游 这样的描述来代替 被观察者 和 观察者。
使用 Observable
创建一个最简单的 Observable
,发送 Integer
类型的数据,RxJava
有很多创建 Observable
的简单方法,我们暂时就使用最原始的那种,方便理解。
ObservableEmitter
继承 Emitter
,是一个数据发射器,用来向观察者发送数据。
1 | // 创建 Observable |
创建一个 Observer
,接受数据并打印。
1 | Observer<Integer> observer = new Observer<Integer>() { |
实现订阅,查看结果。
1 | observable.subscribe(observer); |
这样我们就实现了一个简单的订阅流程,完成了数据的传递,总结以下要点:
ObservableEmitter
继承 Emitter
,是一个发射器,用来向下游发送事件,它可以发送如下3种事件,也对应下游订阅者的相关方法。
1 | public interface Emitter<T> { |
上游和下游的所有方法都默认运行在当前所在线程内,如上运行结果,我在子线程运行则所有方法会在子线程调用。
订阅发生在
observable.subscribe(observer);
时,此时上游才开始发送事件,并且onSubscribe()
方法会在开始订阅时首先执行。上游可以发送无数个
onNext(T t)
事件,下游都可以接受到。当上游发送
onComplete()
事件之后,上游的事件会继续发送,但是下游在接受到onCompelete()
事件之后就会切换事件流,不会在接受后续的事件,因此发送多个onComplete()
虽然不会导致程序 crash,但是是无意义的。当上游发送
onError()
事件之后,上游的事件会继续发送,但是下游在接受到onError()
事件之后就会切换事件流,不会在接受后续的事件,当你发送第二个onError()
事件时会导致程序 crash。发送
onComplete()
和onError()
事件不是必须的。
onComplete()
和onError()
是唯一且互斥的,你不能发送多个onComplete()
或多个onError()
,也不能不能发送一个onCompelete()
事件再发送onError()
,反过来也是。
中断事件流
Disposable
对象可用用来切断事件流,在 onSubscribe()
被调用时会返回 Disposable
对象,我们在获取到数字 4
时切断事件流。将上面的代码稍微简化一下
1 | // 创建 Observable |
结果为
1 | [ThreadName:pool-13-thread-1] onSubscribe |
其他订阅方法
在实际应用过程中我们可能并不关注下游所有的接受事件的方法,因此 RxJava
提供了多种订阅方式来简化订阅过程。
这里说一下 Action
和 Consumer
,与 RxJava1.x
不同,没有使用 ActionN
这种命名方式。
Action
是无参无返回值的接口,它可以用来替代类似 onComplete()
这种无参无返回值值的方法。
1 | public interface Action { |
Consumer
是单个参数无返回值的接口,它可以用来代替类似 onSubscribe(@NonNull Disposable d)
,onNext(@NonNull Integer integer)
,onError(@NonNull Throwable e)
这类单个参数无返回值的方法。
1 | public interface Consumer<T> { |
重载的订阅方法,除了以 Observer
的方式订阅之外,其他方法都返回 Disposable
对象用来中断事件流。
1 | // 下游不关注上游的任何事件 |
使用只关注 onNext()
事件的订阅方法实现订阅。
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
对
Observable
的介绍相对详细,下面的介绍会简单一些,因为很多相似的地方,就不再赘述了。
使用 Flowable
Flowable
与 Observable
的区别就是实现了 背压(Backpressure
) 的管理,讲真,我对背压这个概念也理解的不是很全面,概括的讲背压就是上游发送的事件太多,下游处理事件的速度太慢,导致上游事件堆积,此时如何处理堆积的事件,就是背压处理的策略。
背压处理策略,在 RxJava2.x
中 Observable
不再支持背压,需要支持背压时需要使用 Flowable
创建被观察者,并要求明确指定背压处理策略。
关于 Flowable
和 Backpressure
的内容后面作单独研究,这里不展开。
1 | public enum BackpressureStrategy { |
实现一个订阅,支持背压时,需要在下游调用 request(long n)
来向上游请求,自己要多少数据,请求多少数据上游就会发多少数据过来,如下实例中,只会获取到一次事件。
1 | Flowable.create(new FlowableOnSubscribe<Integer>() { |
中断事件流
Subscription
类似于 Observable
中的 Disposable
,可以用来中断事件流,不同的是需要使用 cancel()
方法,另外有另一个 request(long n)
方法用来向上游请求数据。
1 | public interface Subscription { |
如下,使用 Subscription
中断事件。
1 | Flowable.create(new FlowableOnSubscribe<Integer>() { |
关于 request
下游请求多少就会收到多少事件,但是不会阻塞上游事件发送的过程,上游的事件会一直发,但是下游没请求的话接受不到事件。
request(long n)
中的数量是会累加的,累加的数量就是请求的总量,如果请求的总量超过了发送的总量,则上游事件会被全部接受到,但是不会多出来。
如下实例中,总共请求了 1 + 2 + 2 = 5
次事件,因此只收到了 5 次事件,但是上游的事件发送并没有停止。
1 | Flowable.create(new FlowableOnSubscribe<Integer>() { |
输出结果
1 | [ThreadName:pool-13-thread-1] onSubscribe |
其他订阅方法
Flowable
跟 Observable
一样对订阅操作也有很多重载方法,可以参照Obervable#其他订阅方法。
更多被观察者实现
我现在还不太清楚它们之间的关系,就先列举一下 API
,后面有机会再仔细看看
Maybe
1 | Maybe.create(new MaybeOnSubscribe<Integer>() { |
Completable
1 | Completable.create(new CompletableOnSubscribe() { |
Single
1 | Single.create(new SingleOnSubscribe<Object>() { |