本文以 Observable
为例,本文主要总结 RxJava2.x
关于 创建 操作相关操作符的用法。
Create
Create
是最基本的创建操作符,他用来创建一个标准的被观察者,然后恰当的调用观察者的 onNext
,onError
和 onCompleted
方法。 一个形式正确的有限 Observable
必须尝试调用观察者的 onCompleted
正好一次或者它的 onError
正好一次,而且此后不能再调用观察者的任何其它方法。
好的做法是在数据发射之前判断观察者的状态,在没有观察者时不进行事件发送和计算操作。Create
操作符不在任何线程调度器上执行。
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
Just
Just
类似于 From
,但是 From
会将数组或 Iterable
的数据取出然后逐个发射,而 Just
只是简单的原样发射,将数组或 Iterable
当做单个数据,如下情况,将会直接发送一个 List
出去,而不是里面的数字 0
;
1 | List<Integer> integers = new ArrayList<>(); |
Just
它最多接受 10 个参数,返回一个按参数列表顺序发射这些数据的 Observable
。从 RxJava2.x
开始,使用 just
不允许传递 null
,否则会出现异常(NPE)
1 | Observable.just(0); |
From
在 RxJava1.x
时,只有一个 From
操作符,接受不同类型的参数,但是在 RxJava2.x
对这个操作符进行了细分。
fromIterable 和 fromArray
from
操作符可以转换 数组 和 Iterable,产生的 Observable
会发射 数组 和 Iterable 的每一项数据。
1 | List<Integer> integers = new ArrayList<>(); |
1 | int[] array = new int[]{1,2,3}; |
fromCallable 和 fromFuture
todo… Callable
和 Future
都是 java.util.concurrent
包里面的类,具体使用方法暂时不清楚,后面补充。
fromCallable()
返回的是 onNext
传递的数据,fromCallable()
获取要发送的数据的代码只会在有 Observer
订阅之后执行,且获取数据的代码可以在子线程中执行。
1 | Observable.fromCallable(new Callable<Integer>() { |
1 | public static <T> Observable<T> fromFuture(Future<? extends T> future) |
fromPublisher
todo…暂时不是很清楚它的用法, 但是 Flowable
实现了 Publisher
接口,可以使用该方法将 Flowable
转换为 Observable
1 | Flowable<Integer> integerFlowable = Flowable.create(new FlowableOnSubscribe<Integer>() { |
Defer
直到有观察者订阅时才创建 Observable
,并且为每个观察者创建一个新的Observable
。
Defer
操作符会一直等待直到有观察者订阅它,然后它使用 Observable
工厂方法生成一个 Observable
。它对每个观察者都这样做,因此尽管每个订阅者都以为自己订阅的是同一个 Observable
,事实上每个订阅者获取的是它们自己的单独的数据序列。
1 | Observable<Object> defer = Observable.defer(new Callable<ObservableSource<?>>() { |
Empty/Never/Throw
empty
创建一个不发射任何数据但是正常终止的 Observable
;
never
创建一个不发射数据也不终止的 Observable
;
throw
创建一个不发射数据以一个错误终止的 Observable
;
1 | Observable.empty(); |
Interval 和 Range
默认在 computation
调度器上执行,你也可以传递一个可选的Scheduler
参数来指定调度器。
interval
interval
返回一个以固定时间间隔发送无限递增的 Long
型数列的 Observable
。
1 | public static Observable<Long> interval(long period, TimeUnit unit) |
intervalRange
intervalRange
类似于 interval
,但是它可以指定起始数值,而且不再是一个无限数列,需要注意的是,假设起始值为 start = a,count = b,即为从 start 开始,发送 count 个数据,那么发送的区间是 [a,a+b),左闭右开,例如 start = 100,count = 120,区间是 [100,220),即 100 ~ 219,另外 count
不能为负数,否则会异常。
1 | public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit) |
range 和 rangeLong
从 start 开始发送 count 个数据,区间为 [start,start+count)
1 | Observable.range(100,50).subscribe(new MyObserver<Integer>("range")); |
Repeat
repeat
默认在 trampoline
调度器执行。
repeat
重复发送 Observable
的数据,如果不指定数目,将会无限发送。
1 | Observable.just(1).repeat(10) |
repeatWhen
todo…对这个运算符不是很清楚它的作用
它不是缓存和重放原始 Observable
的数据序列,而是有条件的重新订阅和发射原来的 Observable
。
将原始 Observable
的终止通知(完成或错误)当做一个 void
数据传递给一个通知处理器,它以此来决定是否要重新订阅和发射原来的 Observable
。这个通知处理器就像一个 Observable
操作符,接受一个发射 void
通知的 Observable
为输入,返回一个发射 void
数据(意思是,重新订阅和发射原始 Observable
)或者直接终止(意思是,使用 repeatWhen
终止发射数据)的Observable
。
1 | Observable.just(1,2,4).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() { |
Start
发送事件之前先发送某些事件。
1 | // 接收单个值 |
Timer
在一定时间之后发送一个特殊值 0,timer
操作符默认在 computation
线程执行。
1 | public static Observable<Long> timer(long delay, TimeUnit unit) |