本文中的学习测试代码:https://github.com/AnnieCyl/rxjavastudy

Rx

Rx = Observables + LINQ + Schedulers

RxJava 使用三步骤

  1. 创建 Observable
  2. 创建 Observer
  3. 使用 subscribe() 进行订阅

subscribe 重载方法

  • subscribe(onNext)
  • subscribe(onNext, onError)
  • subscribe(onNext, onError, onComplete)
  • subscribe(onNext, onError, onComplete, onSubscribe)

5 种被观察者类型

  • Observable:能够发射 0 或 n 个数据,并以成功或者错误事件终止。
  • Flowable:能够发射 0 或 n 个数据,并以成功或者错误事件终止。支持背压,可以控制数据源发射的速度。
  • Maybe:能够发射 0 或者 1 个数据,要么成功,要么失败。只有 onComplete、onSuccess 和 onError 事件,三者选其一。
  • Single:只发射单个数据或错误事件,只有 onSuccess 和 onError 事件。
  • Completable:从来不发射数据,只处理 onComplete 和 onError 事件。可以看成 Rx 的 Runnable。常常结合 andThen 操作符使用。

do 操作符

do 操作符可以给 Observable 的生命周期的各个阶段加上一系列的回调监听:(以下基本是按照 do 操作的执行顺序排序)

  • doOnSubscribe:一旦观察者订阅了 Observable,它就会被调动。
  • doOnLifecycle:可以在观察者订阅之后,设置是否取消订阅。
  • doOnNext:Observable 每发射一项数据就会调用它一次,在 onNext 之前执行。
  • doOnEach:Observable 每发射一项数据就会调用它一次,在 doOnNext、doOnError 和 doOnComplete 之后,onNext、onError 和 onComplete 之前执行。
  • doAfterNext:在 onNext 之后执行。
  • doOnComplete:当 Observable 正常终止调用 onComplete 前会被调用。
  • doFinally:Observable 终止之后会被调用,无论是正常终止还是异常终止。
  • doAfterTerminate:注册一个 Action,当 Observable 调用 onComplete 或者 onError 时触发。

Hot Observable 和 Cold Observable

Hot Observable

  • 无论有没有观察者进行订阅,事件始终会发生
  • Hot Observable 与订阅者们的关系是一对多的关系
  • 好比是一个广播电台,所有此刻收听的听众都会听到同一首歌

Cold Observable

  • 只有观察者订阅了,才开始执行发射数据流的代码
  • Cold Observable 和 Observer 是一对一的关系
  • 好比是一张音乐 CD,人们可以独立购买并听取它
  • just、create、range、fromXXX 等操作符都能生成 Cold Observable

Cold Observable 转成 Hot Observable

  • 使用 publish(),生成 ConnectableObservable。生成的 ConnectableObservable 并不是 subscribe 的时候就发射数据,而是只有对其应用 connect() 的时候才开始发射数据。
  • 使用 Subject/Processor:Subject 作为观察者,可以订阅目标 Cold Observable,使对方开始发送事件,让 Cold Observable 借助 Subject 转换为 Hot Observable。

Hot Observable 转成 Cold Observable

  • ConnectableObservable 的 refCount 操作符。RefCount 跟踪有多少个观察者订阅它:
    • 如果所有订阅者都取消订阅了,则数据流停止;如果重新订阅,则重新开始数据流。
    • 如果不是所有订阅者都取消了订阅,而只是部分取消,则部分订阅者重新开始订阅时,不会从头开始数据流。
  • Observable 的 share 操作符。其实 share 操作符封装了 publish().refCount()

Subject

详细可参见这个链接:https://mcxiaoke.gitbooks.io/rxdocs/content/Subject.html

  • AsyncSubject:Observer 会接收 AsyncSubject 的 onComplete() 之前的最后一个数据。subject.onComplete() 必须要调用才会开始发送数据,否则观察者将不接收任何数据。
  • BehaviorSubject:Observer 会先接收到 BehaviorSubject 被订阅之前的最后一个数据,再接收订阅之后发射过来的数据。如果 BehaviorSubject 被订阅之前没有发送任何数据,则会发送一个默认数据。
  • ReplaySubject:ReplaySubject 会发射所有来自原始 Observable 的数据给观察者,无论它们是何时订阅的。ReplaySubject 可以限制缓存数据的数量(createWithSize()),也可以限制缓存的时间(createWithTime())。
  • PublishSubject:Observer 只接收 PublishSubject 被订阅之后发送的数据。

Operator

fromArray

只有当要发送的是对象数组时,才会将数组里面的元素一个个发出,否则就是一整个数组发出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Integer[] arr1 = {1, 2, 3};
int[] arr2 = {1, 2, 3, 4};

Observable.fromArray(arr1)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer i) throws Exception {
System.out.println("Observable.array1: " + i);
}
})
.subscribe();

Observable.fromArray(arr2)
.doOnNext(new Consumer<int[]>() {
@Override
public void accept(int[] i) throws Exception {
System.out.println("Observable.array2: " + i);
}
})
.subscribe();

以上代码最后输出的是:

1
2
3
4
Observable.array1: 1
Observable.array1: 2
Observable.array1: 3
Observable.array2: [I@4563e9ab

repeat、repeatWhen

  • repeat 不是创建一个 Observable,而是重复发射原始 Observable 的数据序列。

  • repeatWhen 不是缓存和重放原始 Observable 的数据序列,而是有条件地重新订阅和发射原来的 Observable。

retry、retryWhen

  • retryWhenonError 中的 Throwable 传递给一个函数,这个函数产生另一个Observable,retryWhen 观察它的结果再决定是不是要重新订阅原始的 Observable。如果这个 Observable 发射了一项数据,它就重新订阅,如果这个 Observable 发射的是onError 通知,它就将这个通知传递给观察者然后终止。

map、flatMap

  • 当要转换的数据源是 observable 类型的,则只能用 flatmap,而不能用 map

补充学习

CompositeDisposable

RxJava 容易造成内存泄漏,在某些情况下没有及时取消订阅导致内存泄漏。CompositeDisposable 可以将 Disposable 统一管理。每当我们得到一个 Disposable 时,就调用 CompositeDisposable.add() 将它添加到容器中, 在退出的时候, 调用 CompositeDisposable.clear() 即可快速解除。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private final CompositeDisposable disposables = new CompositeDisposable();

// adding an Observable to the disposable
disposables.add(sampleObservable()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableObserver<String>() {
@Override
public void onComplete() {
}

@Override
public void onError(Throwable e) {
}

@Override
public void onNext(String value) {
}
}));

static Observable<String> sampleObservable() {
return Observable.defer(new Callable<ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String> call() throws Exception {
// Do some long running operation
SystemClock.sleep(2000);
return Observable.just("one", "two", "three", "four", "five");
}
});
}


// Using clear will clear all, but can accept new disposable
disposables.clear();
// Using dispose will clear all and set isDisposed = true, so it will not accept any new disposable
disposables.dispose();