《RxJava2.x 实战》学习笔记
本文中的学习测试代码:https://github.com/AnnieCyl/rxjavastudy
Rx
Rx = Observables + LINQ + Schedulers
RxJava 使用三步骤
- 创建 Observable
- 创建 Observer
- 使用 subscribe() 进行订阅
subscribe 重载方法
- subscribe(onNext)
- subscribe(onNext, onError)
- subscribe(onNext, onError, onComplete)
- subscribe(onNext, onError, onComplete, onSubscribe)
5 种被观察者类型
- Observable:能够发射 0 或 n 个数据,并以成功或者错误事件终止。
- Flowable:能够发射 0 或 n 个数据,并以成功或者错误事件终止。支持背压,可以控制数据源发射的速度。
- Maybe:能够发射 0 或者 1 个数据,要么成功,要么失败。只有 onComplete、onSuccess 和 onError 事件,三者选其一。
- Single:只发射单个数据或错误事件,只有 onSuccess 和 onError 事件。
- Completable:从来不发射数据,只处理 onComplete 和 onError 事件。可以看成 Rx 的 Runnable。常常结合
andThen
操作符使用。
do 操作符
do 操作符可以给 Observable 的生命周期的各个阶段加上一系列的回调监听:(以下基本是按照 do 操作的执行顺序排序)
- doOnSubscribe:一旦观察者订阅了 Observable,它就会被调动。
- doOnLifecycle:可以在观察者订阅之后,设置是否取消订阅。
- doOnNext:Observable 每发射一项数据就会调用它一次,在 onNext 之前执行。
- doOnEach:Observable 每发射一项数据就会调用它一次,在 doOnNext、doOnError 和 doOnComplete 之后,onNext、onError 和 onComplete 之前执行。
- doAfterNext:在 onNext 之后执行。
- doOnComplete:当 Observable 正常终止调用 onComplete 前会被调用。
- doFinally:Observable 终止之后会被调用,无论是正常终止还是异常终止。
- doAfterTerminate:注册一个 Action,当 Observable 调用 onComplete 或者 onError 时触发。
Hot Observable 和 Cold Observable
Hot Observable
- 无论有没有观察者进行订阅,事件始终会发生
- Hot Observable 与订阅者们的关系是一对多的关系
- 好比是一个广播电台,所有此刻收听的听众都会听到同一首歌
Cold Observable
- 只有观察者订阅了,才开始执行发射数据流的代码
- Cold Observable 和 Observer 是一对一的关系
- 好比是一张音乐 CD,人们可以独立购买并听取它
- just、create、range、fromXXX 等操作符都能生成 Cold Observable
Cold Observable 转成 Hot Observable
- 使用
publish()
,生成ConnectableObservable
。生成的ConnectableObservable
并不是 subscribe 的时候就发射数据,而是只有对其应用connect()
的时候才开始发射数据。 - 使用 Subject/Processor:Subject 作为观察者,可以订阅目标 Cold Observable,使对方开始发送事件,让 Cold Observable 借助 Subject 转换为 Hot Observable。
Hot Observable 转成 Cold Observable
- ConnectableObservable 的
refCount
操作符。RefCount 跟踪有多少个观察者订阅它:- 如果所有订阅者都取消订阅了,则数据流停止;如果重新订阅,则重新开始数据流。
- 如果不是所有订阅者都取消了订阅,而只是部分取消,则部分订阅者重新开始订阅时,不会从头开始数据流。
- Observable 的
share
操作符。其实share
操作符封装了publish().refCount()
Subject
详细可参见这个链接:https://mcxiaoke.gitbooks.io/rxdocs/content/Subject.html
- AsyncSubject:Observer 会接收 AsyncSubject 的
onComplete()
之前的最后一个数据。subject.onComplete()
必须要调用才会开始发送数据,否则观察者将不接收任何数据。 - BehaviorSubject:Observer 会先接收到 BehaviorSubject 被订阅之前的最后一个数据,再接收订阅之后发射过来的数据。如果 BehaviorSubject 被订阅之前没有发送任何数据,则会发送一个默认数据。
- ReplaySubject:ReplaySubject 会发射所有来自原始 Observable 的数据给观察者,无论它们是何时订阅的。ReplaySubject 可以限制缓存数据的数量(
createWithSize()
),也可以限制缓存的时间(createWithTime()
)。 - PublishSubject:Observer 只接收 PublishSubject 被订阅之后发送的数据。
Operator
fromArray
只有当要发送的是对象数组时,才会将数组里面的元素一个个发出,否则就是一整个数组发出。
1 | Integer[] arr1 = {1, 2, 3}; |
以上代码最后输出的是:
1 | Observable.array1: 1 |
repeat、repeatWhen
repeat
不是创建一个 Observable,而是重复发射原始 Observable 的数据序列。repeatWhen
不是缓存和重放原始 Observable 的数据序列,而是有条件地重新订阅和发射原来的 Observable。
retry、retryWhen
retryWhen
将onError
中的Throwable
传递给一个函数,这个函数产生另一个Observable,retryWhen
观察它的结果再决定是不是要重新订阅原始的 Observable。如果这个 Observable 发射了一项数据,它就重新订阅,如果这个 Observable 发射的是onError
通知,它就将这个通知传递给观察者然后终止。
map、flatMap
- 当要转换的数据源是 observable 类型的,则只能用
flatmap
,而不能用map
补充学习
CompositeDisposable
RxJava 容易造成内存泄漏,在某些情况下没有及时取消订阅导致内存泄漏。CompositeDisposable 可以将 Disposable 统一管理。每当我们得到一个 Disposable 时,就调用 CompositeDisposable.add() 将它添加到容器中, 在退出的时候, 调用 CompositeDisposable.clear() 即可快速解除。例如:
1 | private final CompositeDisposable disposables = new CompositeDisposable(); |
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Annie's Blog!