RxJava2
1.簡介
RxJava 有以下三個基本的元素:
首先在 gradle 文件中添加依賴:
implementation 'io.reactivex.rxjava2:rxjava:2.1.4' implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'?三者建立連接的方式有兩種
?
一種是分開寫:
? ? ?2.創建觀察者:
Observer observer = new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "======================onSubscribe");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "======================onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "======================onError");}@Overridepublic void onComplete() {Log.d(TAG, "======================onComplete");} };? ? ?3.?訂閱
observable.subscribe(observer);另一種方式是連在一起寫:
Observable.create(new ObservableOnSubscribe < Integer > () {@Override public void subscribe(ObservableEmitter < Integer > e) throws Exception { Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName()); e.onNext(1); e.onNext(2); e.onNext(3); e.onComplete(); } }) .subscribe(new Observer < Integer > () {@Override public void onSubscribe(Disposable d) { Log.d(TAG, "======================onSubscribe"); }@Override public void onNext(Integer integer) {Log.d(TAG, "======================onNext " + integer); } @Override public void onError(Throwable e) {Log.d(TAG, "======================onError");}@Override public void onComplete() {Log.d(TAG, "======================onComplete"); } });被觀察者發送的事件有以下幾種,總結如下表:
| onNext() | 發送該事件時,觀察者會回調 onNext() 方法 |
| onError() | 發送該事件時,觀察者會回調 onError() 方法,當發送該事件之后,其他事件將不會繼續發送 |
| onComplete() | 發送該事件時,觀察者會回調 onComplete() 方法,當發送該事件之后,其他事件將不會繼續發送 |
總結如下圖:
?
?
?
下面就來講解 RxJava 各種常見的操作符。
1. 創建操作符
以下就是講解創建被觀察者的各種操作符。
1.1 create()
方法預覽:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source)有什么用:
創建一個被觀察者
怎么用:
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {e.onNext("Hello Observer");e.onComplete();} });上面的代碼非常簡單,創建 ObservableOnSubscribe 并重寫其 subscribe 方法,就可以通過 ObservableEmitter 發射器向觀察者發送事件。
以下創建一個觀察者,來驗證這個被觀察者是否成功創建。
Observer<String> observer = new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onNext(String s) {Log.d("chan","=============onNext " + s);}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {Log.d("chan","=============onComplete ");} };observable.subscribe(observer);打印結果:
05-20 16:16:50.654 22935-22935/com.example.louder.rxjavademo D/chan: =============onNext Hello Observer =============onComplete1.2 just()
方法預覽:
public static <T> Observable<T> just(T item) ...... public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)有什么用?
創建一個被觀察者,并發送事件,發送的事件不可以超過10個以上。
怎么用?
Observable.just(1, 2, 3) .subscribe(new Observer < Integer > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "=================onSubscribe");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "=================onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "=================onError ");}@Overridepublic void onComplete() {Log.d(TAG, "=================onComplete ");} });上面的代碼直接使用鏈式調用,代碼也非常簡單,這里就不細說了,看看打印結果:
05-20 16:27:26.938 23281-23281/? D/chan: =================onSubscribe =================onNext 1 =================onNext 2 =================onNext 3 =================onComplete1.3 From 操作符
1.3.1 fromArray()
方法預覽:
public static <T> Observable<T> fromArray(T... items)有什么用?
這個方法和 just() 類似,只不過 fromArray 可以傳入多于10個的變量,并且可以傳入一個數組。
怎么用?
Integer array[] = {1, 2, 3, 4}; Observable.fromArray(array) .subscribe(new Observer < Integer > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "=================onSubscribe");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "=================onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "=================onError ");}@Overridepublic void onComplete() {Log.d(TAG, "=================onComplete ");} }); 復制代碼代碼和 just() 基本上一樣,直接看打印結果:
05-20 16:35:23.797 23574-23574/com.example.louder.rxjavademo D/chan: =================onSubscribe =================onNext 1 =================onNext 2 =================onNext 3 =================onNext 4 =================onComplete1.3.2 fromCallable()
方法預覽:
public static <T> Observable<T> fromCallable(Callable<? extends T> supplier)有什么用?
這里的 Callable 是 java.util.concurrent 中的 Callable,Callable 和 Runnable 的用法基本一致,只是它會返回一個結果值,這個結果值就是發給觀察者的。
怎么用?
Observable.fromCallable(new Callable < Integer > () {@Overridepublic Integer call() throws Exception {return 1;} }) .subscribe(new Consumer < Integer > () {@Overridepublic void accept(Integer integer) throws Exception {Log.d(TAG, "================accept " + integer);} });打印結果:
05-26 13:01:43.009 6890-6890/? D/chan: ================accept 11.3.3 fromFuture()
方法預覽:
public static <T> Observable<T> fromFuture(Future<? extends T> future)有什么用?
參數中的 Future 是 java.util.concurrent 中的 Future,Future 的作用是增加了 cancel() 等方法操作 Callable,它可以通過 get() 方法來獲取 Callable 返回的值。
怎么用?
FutureTask < String > futureTask = new FutureTask < > (new Callable < String > () {@Overridepublic String call() throws Exception {Log.d(TAG, "CallableDemo is Running");return "返回結果";} });Observable.fromFuture(futureTask).doOnSubscribe(new Consumer < Disposable > () {@Overridepublic void accept(Disposable disposable) throws Exception {futureTask.run();} }) .subscribe(new Consumer < String > () {@Overridepublic void accept(String s) throws Exception {Log.d(TAG, "================accept " + s);} });doOnSubscribe() 的作用就是只有訂閱時才會發送事件,具體會在下面講解。
打印結果:
05-26 13:54:00.470 14429-14429/com.example.rxjavademo D/chan: CallableDemo is Running ================accept 返回結果1.3.4 fromIterable()
方法預覽:
public static <T> Observable<T> fromIterable(Iterable<? extends T> source)有什么用?
直接發送一個 List 集合數據給觀察者
怎么用?
List<Integer> list = new ArrayList<>(); list.add(0); list.add(1); list.add(2); list.add(3); Observable.fromIterable(list) .subscribe(new Observer < Integer > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "=================onSubscribe");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "=================onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "=================onError ");}@Overridepublic void onComplete() {Log.d(TAG, "=================onComplete ");} });打印結果如下:
05-20 16:43:28.874 23965-23965/? D/chan: =================onSubscribe =================onNext 0 =================onNext 1 =================onNext 2 =================onNext 3 =================onComplete?
1.4 defer()
方法預覽:
public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)有什么用?
這個方法的作用就是直到被觀察者被訂閱后才會創建被觀察者。
怎么用?
?
// i 要定義為成員變量 Integer i = 100;Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {@Overridepublic ObservableSource<? extends Integer> call() throws Exception {return Observable.just(i);} });i = 200;Observer observer = new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "================onNext " + integer);}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {} };observable.subscribe(observer);i = 300;observable.subscribe(observer);打印結果如下:
05-20 20:05:01.443 26622-26622/? D/chan: ================onNext 200 ================onNext 300因為 defer() 只有觀察者訂閱的時候才會創建新的被觀察者,所以每訂閱一次就會打印一次,并且都是打印 i 最新的值。
?
1.5 timer()
方法預覽:
public static Observable<Long> timer(long delay, TimeUnit unit) ......有什么用?
當到指定時間后就會發送一個 0L 的值給觀察者。
怎么用?
Observable.timer(2, TimeUnit.SECONDS) .subscribe(new Observer < Long > () {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onNext(Long aLong) {Log.d(TAG, "===============onNext " + aLong);}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {} });打印結果:
05-20 20:27:48.004 27204-27259/com.example.louder.rxjavademo D/chan: ===============onNext 01.6 interval()
方法預覽:
public static Observable<Long> interval(long period, TimeUnit unit) public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit) ......有什么用?
每隔一段時間就會發送一個事件,這個事件是從0開始,不斷增1的數字。
怎么用?
Observable.interval(4, TimeUnit.SECONDS) .subscribe(new Observer < Long > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "==============onSubscribe ");}@Overridepublic void onNext(Long aLong) {Log.d(TAG, "==============onNext " + aLong);}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {} });打印結果:
05-20 20:48:10.321 28723-28723/com.example.louder.rxjavademo D/chan: ==============onSubscribe 05-20 20:48:14.324 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 0 05-20 20:48:18.324 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 1 05-20 20:48:22.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 2 05-20 20:48:26.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 3 05-20 20:48:30.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 4 05-20 20:48:34.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 5從時間就可以看出每隔4秒就會發出一次數字遞增1的事件。這里說下 interval() 第三個方法的 initialDelay 參數,這個參數的意思就是 onSubscribe 回調之后,再次回調 onNext 的間隔時間。
1.7 intervalRange()
方法預覽:
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit) public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)有什么用?
可以指定發送事件的開始值和數量,其他與 interval() 的功能一樣。
怎么用?
Observable.intervalRange(2, 5, 2, 1, TimeUnit.SECONDS) .subscribe(new Observer < Long > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "==============onSubscribe ");}@Overridepublic void onNext(Long aLong) {Log.d(TAG, "==============onNext " + aLong);}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {} });打印結果:
05-21 00:03:01.672 2504-2504/com.example.louder.rxjavademo D/chan: ==============onSubscribe 05-21 00:03:03.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 2 05-21 00:03:04.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 3 05-21 00:03:05.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 4 05-21 00:03:06.673 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 5 05-21 00:03:07.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 6可以看出收到5次 onNext 事件,并且是從 2 開始的。
?
1.8 range()
方法預覽:
public static Observable<Integer> range(final int start, final int count)有什么用?
同時發送一定范圍的事件序列。
怎么用?
Observable.range(2, 5) .subscribe(new Observer < Integer > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "==============onSubscribe ");}@Overridepublic void onNext(Integer aLong) {Log.d(TAG, "==============onNext " + aLong);}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {} });打印結果:
05-21 00:09:17.202 2921-2921/? D/chan: ==============onSubscribe ==============onNext 2 ==============onNext 3 ==============onNext 4 ==============onNext 5 ==============onNext 61.9 rangeLong()
方法預覽:
public static Observable<Long> rangeLong(long start, long count) 復制代碼有什么用?
作用與 range() 一樣,只是數據類型為 Long
怎么用?
用法與 range() 一樣。
1.10 empty() & never() & error()
方法預覽:
public static <T> Observable<T> empty() public static <T> Observable<T> never() public static <T> Observable<T> error(final Throwable exception)有什么用?
怎么用?
Observable.empty() .subscribe(new Observer < Object > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "==================onSubscribe");}@Overridepublic void onNext(Object o) {Log.d(TAG, "==================onNext");}@Overridepublic void onError(Throwable e) {Log.d(TAG, "==================onError " + e);}@Overridepublic void onComplete() {Log.d(TAG, "==================onComplete");} });打印結果:
05-26 14:06:11.881 15798-15798/com.example.rxjavademo D/chan: ==================onSubscribe ==================onComplete換成 never() 的打印結果:
05-26 14:12:17.554 16805-16805/com.example.rxjavademo D/chan: ==================onSubscribe換成 error() 的打印結果:
05-26 14:12:58.483 17817-17817/com.example.rxjavademo D/chan: ==================onSubscribe ==================onError java.lang.NullPointerException?
2. 轉換操作符
2.1 map()
方法預覽:
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper)有什么用?
map 可以將被觀察者發送的數據類型轉變成其他的類型
怎么用?
以下代碼將 Integer 類型的數據轉換成 String。
Observable.just(1, 2, 3) .map(new Function < Integer, String > () {@Overridepublic String apply(Integer integer) throws Exception {return "I'm " + integer;} }) .subscribe(new Observer < String > () {@Overridepublic void onSubscribe(Disposable d) {Log.e(TAG, "===================onSubscribe");}@Overridepublic void onNext(String s) {Log.e(TAG, "===================onNext " + s);}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {} });打印結果:
05-21 09:16:03.490 5700-5700/com.example.rxjavademo E/chan: ===================onSubscribe ===================onNext I'm 1 ===================onNext I'm 2 ===================onNext I'm 32.2 flatMap()
方法預覽:
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) ......有什么用?
這個方法可以將事件序列中的元素進行整合加工,返回一個新的被觀察者。
怎么用?
flatMap() 其實與 map() 類似,但是 flatMap() 返回的是一個 Observerable。現在用一個例子來說明 flatMap() 的用法。
假設一個有一個 Person 類,這個類的定義如下:
public class Person {private String name;private List<Plan> planList = new ArrayList<>();public Person(String name, List<Plan> planList) {this.name = name;this.planList = planList;}public String getName() {return name;}public void setName(String name) {this.name = name;}public List<Plan> getPlanList() {return planList;}public void setPlanList(List<Plan> planList) {this.planList = planList;}}Person 類有一個 name 和 planList 兩個變量,分別代表的是人名和計劃清單。
Plan 類的定義如下:
public class Plan {private String time;private String content;private List<String> actionList = new ArrayList<>();public Plan(String time, String content) {this.time = time;this.content = content;}public String getTime() {return time;}public void setTime(String time) {this.time = time;}public String getContent() {return content;}public void setContent(String content) {this.content = content;}public List<String> getActionList() {return actionList;}public void setActionList(List<String> actionList) {this.actionList = actionList;} }現在有一個需求就是要將 Person 集合中的每個元素中的 Plan 的 action 打印出來。 首先用 map() 來實現這個需求看看:
Observable.fromIterable(personList) .map(new Function < Person, List < Plan >> () {@Overridepublic List < Plan > apply(Person person) throws Exception {return person.getPlanList();} }) .subscribe(new Observer < List < Plan >> () {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onNext(List < Plan > plans) {for (Plan plan: plans) {List < String > planActionList = plan.getActionList();for (String action: planActionList) {Log.d(TAG, "==================action " + action);}}}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {} });可以看到 onNext() 用了嵌套 for 循環來實現,如果代碼邏輯復雜起來的話,可能需要多重循環才可以實現。
現在看下使用 flatMap() 實現:
Observable.fromIterable(personList) .flatMap(new Function < Person, ObservableSource < Plan >> () {@Overridepublic ObservableSource < Plan > apply(Person person) {return Observable.fromIterable(person.getPlanList());} }) .flatMap(new Function < Plan, ObservableSource < String >> () {@Overridepublic ObservableSource < String > apply(Plan plan) throws Exception {return Observable.fromIterable(plan.getActionList());} }) .subscribe(new Observer < String > () {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onNext(String s) {Log.d(TAG, "==================action: " + s);}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {} });從代碼可以看出,只需要兩個 flatMap() 就可以完成需求,并且代碼邏輯非常清晰。
2.3 concatMap()
方法預覽:
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, int prefetch)有什么用?
concatMap() 和 flatMap() 基本上是一樣的,只不過 concatMap() 轉發出來的事件是有序的,而 flatMap() 是無序的。
怎么用?
還是使用上面 flatMap() 的例子來講解,首先來試下 flatMap() 來驗證發送的事件是否是無序的,代碼如下:
Observable.fromIterable(personList) .flatMap(new Function < Person, ObservableSource < Plan >> () {@Overridepublic ObservableSource < Plan > apply(Person person) {if ("chan".equals(person.getName())) {return Observable.fromIterable(person.getPlanList()).delay(10, TimeUnit.MILLISECONDS);}return Observable.fromIterable(person.getPlanList());} }) .subscribe(new Observer < Plan > () {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onNext(Plan plan) {Log.d(TAG, "==================plan " + plan.getContent());}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {} });為了更好的驗證 flatMap 是無序的,使用了一個 delay() 方法來延遲,直接看打印結果:
05-21 13:57:14.031 21616-21616/com.example.rxjavademo D/chan: ==================plan chan 上課 ==================plan chan 寫作業 ==================plan chan 打籃球 05-21 13:57:14.041 21616-21641/com.example.rxjavademo D/chan: ==================plan Zede 開會 ==================plan Zede 寫代碼 ==================plan Zede 寫文章可以看到本來 Zede 的事件發送順序是排在 chan 事件之前,但是經過延遲后, 這兩個事件序列發送順序互換了。
現在來驗證下 concatMap() 是否是有序的,使用上面同樣的代碼,只是把 flatMap() 換成 concatMap(),打印結果如下:
05-21 13:58:42.917 21799-21823/com.example.rxjavademo D/chan: ==================plan Zede 開會 ==================plan Zede 寫代碼 ==================plan Zede 寫文章 ==================plan chan 上課 ==================plan chan 寫作業 ==================plan chan 打籃球這就代表 concatMap() 轉換后發送的事件序列是有序的了。
?
2.4 buffer()
方法預覽:
public final Observable<List<T>> buffer(int count, int skip) ......有什么用?
從需要發送的事件當中獲取一定數量的事件,并將這些事件放到緩沖區當中一并發出。
怎么用?
buffer 有兩個參數,一個是 count,另一個 skip。count 緩沖區元素的數量,skip 就代表緩沖區滿了之后,發送下一次事件序列的時候要跳過多少元素。這樣說可能還是有點抽象,直接看代碼:
Observable.just(1, 2, 3, 4, 5) .buffer(2, 1) .subscribe(new Observer < List < Integer >> () {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onNext(List < Integer > integers) {Log.d(TAG, "================緩沖區大小: " + integers.size());for (Integer i: integers) {Log.d(TAG, "================元素: " + i);}}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {} });打印結果:
05-21 14:09:34.015 22421-22421/com.example.rxjavademo D/chan: ================緩沖區大小: 2 ================元素: 1 ================元素: 2 ================緩沖區大小: 2 ================元素: 2 ================元素: 3 ================緩沖區大小: 2 ================元素: 3 ================元素: 4 ================緩沖區大小: 2 ================元素: 4 ================元素: 5 ================緩沖區大小: 1 ================元素: 5從結果可以看出,每次發送事件,指針都會往后移動一個元素再取值,直到指針移動到沒有元素的時候就會停止取值。
2.5 groupBy()
方法預覽:
public final <K> Observable<GroupedObservable<K, T>> groupBy(Function<? super T, ? extends K> keySelector)有什么用?
將發送的數據進行分組,每個分組都會返回一個被觀察者。
怎么用?
Observable.just(5, 2, 3, 4, 1, 6, 8, 9, 7, 10) .groupBy(new Function < Integer, Integer > () {@Overridepublic Integer apply(Integer integer) throws Exception {return integer % 3;} }) .subscribe(new Observer < GroupedObservable < Integer, Integer >> () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "====================onSubscribe ");}@Overridepublic void onNext(GroupedObservable < Integer, Integer > integerIntegerGroupedObservable) {Log.d(TAG, "====================onNext ");integerIntegerGroupedObservable.subscribe(new Observer < Integer > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "====================GroupedObservable onSubscribe ");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "====================GroupedObservable onNext groupName: " + integerIntegerGroupedObservable.getKey() + " value: " + integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "====================GroupedObservable onError ");}@Overridepublic void onComplete() {Log.d(TAG, "====================GroupedObservable onComplete ");}});}@Overridepublic void onError(Throwable e) {Log.d(TAG, "====================onError ");}@Overridepublic void onComplete() {Log.d(TAG, "====================onComplete ");} });在 groupBy() 方法返回的參數是分組的名字,每返回一個值,那就代表會創建一個組,以上的代碼就是將1~10的數據分成3組,來看看打印結果:
05-26 14:38:02.062 21451-21451/com.example.rxjavademo D/chan: ====================onSubscribe 05-26 14:38:02.063 21451-21451/com.example.rxjavademo D/chan: ====================onNext ====================GroupedObservable onSubscribe ====================GroupedObservable onNext groupName: 2 value: 5 ====================GroupedObservable onNext groupName: 2 value: 2 ====================onNext ====================GroupedObservable onSubscribe ====================GroupedObservable onNext groupName: 0 value: 3 05-26 14:38:02.064 21451-21451/com.example.rxjavademo D/chan: ====================onNext ====================GroupedObservable onSubscribe ====================GroupedObservable onNext groupName: 1 value: 4 ====================GroupedObservable onNext groupName: 1 value: 1 ====================GroupedObservable onNext groupName: 0 value: 6 ====================GroupedObservable onNext groupName: 2 value: 8 ====================GroupedObservable onNext groupName: 0 value: 9 ====================GroupedObservable onNext groupName: 1 value: 7 ====================GroupedObservable onNext groupName: 1 value: 10 05-26 14:38:02.065 21451-21451/com.example.rxjavademo D/chan: ====================GroupedObservable onComplete ====================GroupedObservable onComplete ====================GroupedObservable onComplete ====================onComplete可以看到返回的結果中是有3個組的。
2.6 scan()
方法預覽:
public final Observable<T> scan(BiFunction<T, T, T> accumulator)有什么用?
將數據以一定的邏輯聚合起來。
怎么用?
Observable.just(1, 2, 3, 4, 5) .scan(new BiFunction < Integer, Integer, Integer > () {@Overridepublic Integer apply(Integer integer, Integer integer2) throws Exception {Log.d(TAG, "====================apply ");Log.d(TAG, "====================integer " + integer);Log.d(TAG, "====================integer2 " + integer2);return integer + integer2;} }) .subscribe(new Consumer < Integer > () {@Overridepublic void accept(Integer integer) throws Exception {Log.d(TAG, "====================accept " + integer);} });打印結果:
05-26 14:45:27.784 22519-22519/com.example.rxjavademo D/chan: ====================accept 1 ====================apply ====================integer 1 ====================integer2 2 ====================accept 3 ====================apply 05-26 14:45:27.785 22519-22519/com.example.rxjavademo D/chan: ====================integer 3 ====================integer2 3 ====================accept 6 ====================apply ====================integer 6 ====================integer2 4 ====================accept 10 ====================apply ====================integer 10 ====================integer2 5 ====================accept 152.7 window()
方法預覽:
public final Observable<Observable<T>> window(long count) ......有什么用?
發送指定數量的事件時,就將這些事件分為一組。window 中的 count 的參數就是代表指定的數量,例如將 count 指定為2,那么每發2個數據就會將這2個數據分成一組。
怎么用?
Observable.just(1, 2, 3, 4, 5) .window(2) .subscribe(new Observer < Observable < Integer >> () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "=====================onSubscribe ");}@Overridepublic void onNext(Observable < Integer > integerObservable) {integerObservable.subscribe(new Observer < Integer > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "=====================integerObservable onSubscribe ");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "=====================integerObservable onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "=====================integerObservable onError ");}@Overridepublic void onComplete() {Log.d(TAG, "=====================integerObservable onComplete ");}});}@Overridepublic void onError(Throwable e) {Log.d(TAG, "=====================onError ");}@Overridepublic void onComplete() {Log.d(TAG, "=====================onComplete ");} });打印結果:
05-26 15:02:20.654 25838-25838/com.example.rxjavademo D/chan: =====================onSubscribe 05-26 15:02:20.655 25838-25838/com.example.rxjavademo D/chan: =====================integerObservable onSubscribe 05-26 15:02:20.656 25838-25838/com.example.rxjavademo D/chan: =====================integerObservable onNext 1 =====================integerObservable onNext 2 =====================integerObservable onComplete =====================integerObservable onSubscribe =====================integerObservable onNext 3 =====================integerObservable onNext 4 =====================integerObservable onComplete =====================integerObservable onSubscribe =====================integerObservable onNext 5 =====================integerObservable onComplete =====================onComplete從結果可以發現,window() 將 1~5 的事件分成了3組。
3. 組合操作符
3.1 concat()
方法預覽:
public static <T> Observable<T> concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2, ObservableSource<? extends T> source3, ObservableSource<? extends T> source4) ......有什么用?
可以將多個觀察者組合在一起,然后按照之前發送順序發送事件。需要注意的是,concat() 最多只可以發送4個事件。
怎么用?
Observable.concat(Observable.just(1, 2), Observable.just(3, 4), Observable.just(5, 6), Observable.just(7, 8)) .subscribe(new Observer < Integer > () {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "================onNext " + integer);}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {} });打印如下:
05-21 15:40:26.738 7477-7477/com.example.rxjavademo D/chan: ================onNext 1 ================onNext 2 05-21 15:40:26.739 7477-7477/com.example.rxjavademo D/chan: ================onNext 3 ================onNext 4 ================onNext 5 ================onNext 6 ================onNext 7 ================onNext 83.2 concatArray()
方法預覽:
public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources)有什么用?
與 concat() 作用一樣,不過 concatArray() 可以發送多于 4 個被觀察者。
怎么用?
Observable.concatArray(Observable.just(1, 2), Observable.just(3, 4), Observable.just(5, 6), Observable.just(7, 8), Observable.just(9, 10)) .subscribe(new Observer < Integer > () {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "================onNext " + integer);}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {} });打印結果:
05-21 15:47:18.581 9129-9129/com.example.rxjavademo D/chan: ================onNext 1 ================onNext 2 ================onNext 3 ================onNext 4 ================onNext 5 ================onNext 6 ================onNext 7 ================onNext 8 ================onNext 9 ================onNext 103.3 merge()
方法預覽:
public static <T> Observable<T> merge(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2, ObservableSource<? extends T> source3, ObservableSource<? extends T> source4) ......有什么用?
這個方法月 concat() 作用基本一樣,知識 concat() 是串行發送事件,而 merge() 并行發送事件。
怎么用?
現在來演示 concat() 和 merge() 的區別。
Observable.merge( Observable.interval(1, TimeUnit.SECONDS).map(new Function < Long, String > () {@Overridepublic String apply(Long aLong) throws Exception {return "A" + aLong;} }), Observable.interval(1, TimeUnit.SECONDS).map(new Function < Long, String > () {@Overridepublic String apply(Long aLong) throws Exception {return "B" + aLong;} })).subscribe(new Observer < String > () {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onNext(String s) {Log.d(TAG, "=====================onNext " + s);}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {} });打印結果如下:
05-21 16:10:31.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B0 05-21 16:10:31.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A0 05-21 16:10:32.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A1 05-21 16:10:32.126 12801-12850/com.example.rxjavademo D/chan: =====================onNext B1 05-21 16:10:33.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A2 05-21 16:10:33.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B2 05-21 16:10:34.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A3 05-21 16:10:34.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B3 05-21 16:10:35.124 12801-12849/com.example.rxjavademo D/chan: =====================onNext A4 05-21 16:10:35.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B4 05-21 16:10:36.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A5 05-21 16:10:36.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B5 ...... 復制代碼從結果可以看出,A 和 B 的事件序列都可以發出,將以上的代碼換成 concat() 看看打印結果:
05-21 16:17:52.352 14597-14621/com.example.rxjavademo D/chan: =====================onNext A0 05-21 16:17:53.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A1 05-21 16:17:54.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A2 05-21 16:17:55.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A3 05-21 16:17:56.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A4 05-21 16:17:57.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A5 ......從結果可以知道,只有等到第一個被觀察者發送完事件之后,第二個被觀察者才會發送事件。
mergeArray() 與 merge() 的作用是一樣的,只是它可以發送4個以上的被觀察者,這里就不再贅述了。
3.4 concatArrayDelayError() & mergeArrayDelayError()
方法預覽:
public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources) public static <T> Observable<T> mergeArrayDelayError(ObservableSource<? extends T>... sources)有什么用?
在 concatArray() 和 mergeArray() 兩個方法當中,如果其中有一個被觀察者發送了一個 Error 事件,那么就會停止發送事件,如果你想 onError() 事件延遲到所有被觀察者都發送完事件后再執行的話,就可以使用 concatArrayDelayError() 和 mergeArrayDelayError()
怎么用?
首先使用 concatArray() 來驗證一下發送 onError() 事件是否會中斷其他被觀察者發送事件,代碼如下:
Observable.concatArray( Observable.create(new ObservableOnSubscribe < Integer > () {@Overridepublic void subscribe(ObservableEmitter < Integer > e) throws Exception {e.onNext(1);e.onError(new NumberFormatException());} }), Observable.just(2, 3, 4)).subscribe(new Observer < Integer > () {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "===================onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "===================onError ");}@Overridepublic void onComplete() {} });打印結果:
05-21 16:38:59.725 17985-17985/com.example.rxjavademo D/chan: ===================onNext 1 ===================onError從結果可以知道,確實中斷了,現在換用 concatArrayDelayError(),代碼如下:
Observable.concatArrayDelayError( Observable.create(new ObservableOnSubscribe < Integer > () {@Overridepublic void subscribe(ObservableEmitter < Integer > e) throws Exception {e.onNext(1);e.onError(new NumberFormatException());} }), Observable.just(2, 3, 4)) .subscribe(new Observer < Integer > () {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "===================onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "===================onError ");}@Overridepublic void onComplete() {} });打印結果如下:
05-21 16:40:59.329 18199-18199/com.example.rxjavademo D/chan: ===================onNext 1 ===================onNext 2 ===================onNext 3 ===================onNext 4 ===================onError從結果可以看到,onError 事件是在所有被觀察者發送完事件才發送的。mergeArrayDelayError() 也是有同樣的作用,這里不再贅述。
3.5 zip()
方法預覽:
public static <T1, T2, R> Observable<R> zip(ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> zipper) ......有什么用?
會將多個被觀察者合并,根據各個被觀察者發送事件的順序一個個結合起來,最終發送的事件數量會與源 Observable 中最少事件的數量一樣。
怎么用?
Observable.zip(Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS).map(new Function<Long, String>() {@Overridepublic String apply(Long aLong) throws Exception {String s1 = "A" + aLong;Log.d(TAG, "===================A 發送的事件 " + s1);return s1;}}),Observable.intervalRange(1, 6, 1, 1, TimeUnit.SECONDS).map(new Function<Long, String>() {@Overridepublic String apply(Long aLong) throws Exception {String s2 = "B" + aLong;Log.d(TAG, "===================B 發送的事件 " + s2);return s2;}}),new BiFunction<String, String, String>() {@Overridepublic String apply(String s, String s2) throws Exception {String res = s + s2;return res;}}) .subscribe(new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "===================onSubscribe ");}@Overridepublic void onNext(String s) {Log.d(TAG, "===================onNext " + s);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "===================onError ");}@Overridepublic void onComplete() {Log.d(TAG, "===================onComplete ");} });上面代碼中有兩個 Observable,第一個發送事件的數量為5個,第二個發送事件的數量為6個。現在來看下打印結果:
05-22 09:10:39.952 5338-5338/com.example.rxjavademo D/chan: ===================onSubscribe 05-22 09:10:40.953 5338-5362/com.example.rxjavademo D/chan: ===================A 發送的事件 A1 05-22 09:10:40.953 5338-5363/com.example.rxjavademo D/chan: ===================B 發送的事件 B1 ===================onNext A1B1 05-22 09:10:41.953 5338-5362/com.example.rxjavademo D/chan: ===================A 發送的事件 A2 05-22 09:10:41.954 5338-5363/com.example.rxjavademo D/chan: ===================B 發送的事件 B2 ===================onNext A2B2 05-22 09:10:42.953 5338-5362/com.example.rxjavademo D/chan: ===================A 發送的事件 A3 05-22 09:10:42.953 5338-5363/com.example.rxjavademo D/chan: ===================B 發送的事件 B3 05-22 09:10:42.953 5338-5362/com.example.rxjavademo D/chan: ===================onNext A3B3 05-22 09:10:43.953 5338-5362/com.example.rxjavademo D/chan: ===================A 發送的事件 A4 05-22 09:10:43.953 5338-5363/com.example.rxjavademo D/chan: ===================B 發送的事件 B4 05-22 09:10:43.954 5338-5363/com.example.rxjavademo D/chan: ===================onNext A4B4 05-22 09:10:44.953 5338-5362/com.example.rxjavademo D/chan: ===================A 發送的事件 A5 05-22 09:10:44.953 5338-5363/com.example.rxjavademo D/chan: ===================B 發送的事件 B5 05-22 09:10:44.954 5338-5363/com.example.rxjavademo D/chan: ===================onNext A5B5 ===================onComplete可以發現最終接收到的事件數量是5,那么為什么第二個 Observable 沒有發送第6個事件呢?因為在這之前第一個 Observable 已經發送了 onComplete 事件,所以第二個 Observable 不會再發送事件。
3.6 combineLatest() & combineLatestDelayError()
方法預覽:
public static <T1, T2, R> Observable<R> combineLatest(ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> combiner) .......有什么用?
combineLatest() 的作用與 zip() 類似,但是 combineLatest() 發送事件的序列是與發送的時間線有關的,當 combineLatest() 中所有的 Observable 都發送了事件,只要其中有一個 Observable 發送事件,這個事件就會和其他 Observable 最近發送的事件結合起來發送,這樣可能還是比較抽象,看看以下例子代碼。
怎么用?
Observable.combineLatest( Observable.intervalRange(1, 4, 1, 1, TimeUnit.SECONDS).map(new Function < Long, String > () {@Overridepublic String apply(Long aLong) throws Exception {String s1 = "A" + aLong;Log.d(TAG, "===================A 發送的事件 " + s1);return s1;} }), Observable.intervalRange(1, 5, 2, 2, TimeUnit.SECONDS).map(new Function < Long, String > () {@Overridepublic String apply(Long aLong) throws Exception {String s2 = "B" + aLong;Log.d(TAG, "===================B 發送的事件 " + s2);return s2;} }), new BiFunction < String, String, String > () {@Overridepublic String apply(String s, String s2) throws Exception {String res = s + s2;return res;} }) .subscribe(new Observer < String > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "===================onSubscribe ");}@Overridepublic void onNext(String s) {Log.d(TAG, "===================最終接收到的事件 " + s);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "===================onError ");}@Overridepublic void onComplete() {Log.d(TAG, "===================onComplete ");} });分析上面的代碼,Observable A 會每隔1秒就發送一次事件,Observable B 會隔2秒發送一次事件。來看看打印結果:
05-22 11:41:20.859 15104-15104/? D/chan: ===================onSubscribe 05-22 11:41:21.859 15104-15128/com.example.rxjavademo D/chan: ===================A 發送的事件 A1 05-22 11:41:22.860 15104-15128/com.example.rxjavademo D/chan: ===================A 發送的事件 A2 05-22 11:41:22.861 15104-15129/com.example.rxjavademo D/chan: ===================B 發送的事件 B1 05-22 11:41:22.862 15104-15129/com.example.rxjavademo D/chan: ===================最終接收到的事件 A2B1 05-22 11:41:23.860 15104-15128/com.example.rxjavademo D/chan: ===================A 發送的事件 A3 ===================最終接收到的事件 A3B1 05-22 11:41:24.860 15104-15128/com.example.rxjavademo D/chan: ===================A 發送的事件 A4 05-22 11:41:24.861 15104-15129/com.example.rxjavademo D/chan: ===================B 發送的事件 B2 05-22 11:41:24.861 15104-15128/com.example.rxjavademo D/chan: ===================最終接收到的事件 A4B1 05-22 11:41:24.861 15104-15129/com.example.rxjavademo D/chan: ===================最終接收到的事件 A4B2 05-22 11:41:26.860 15104-15129/com.example.rxjavademo D/chan: ===================B 發送的事件 B3 05-22 11:41:26.861 15104-15129/com.example.rxjavademo D/chan: ===================最終接收到的事件 A4B3 05-22 11:41:28.860 15104-15129/com.example.rxjavademo D/chan: ===================B 發送的事件 B4 05-22 11:41:28.861 15104-15129/com.example.rxjavademo D/chan: ===================最終接收到的事件 A4B4 05-22 11:41:30.860 15104-15129/com.example.rxjavademo D/chan: ===================B 發送的事件 B5 05-22 11:41:30.861 15104-15129/com.example.rxjavademo D/chan: ===================最終接收到的事件 A4B5 ===================onComplete分析上述結果可以知道,當發送 A1 事件之后,因為 B 并沒有發送任何事件,所以根本不會發生結合。當 B 發送了 B1 事件之后,就會與 A 最近發送的事件 A2 結合成 A2B1,這樣只有后面一有被觀察者發送事件,這個事件就會與其他被觀察者最近發送的事件結合起來了。
因為 combineLatestDelayError() 就是多了延遲發送 onError() 功能,這里就不再贅述了。
3.7 reduce()
方法預覽:
public final Maybe<T> reduce(BiFunction<T, T, T> reducer)有什么用?
與 scan() 操作符的作用也是將發送數據以一定邏輯聚合起來,這兩個的區別在于 scan() 每處理一次數據就會將事件發送給觀察者,而 reduce() 會將所有數據聚合在一起才會發送事件給觀察者。
怎么用?
Observable.just(0, 1, 2, 3) .reduce(new BiFunction < Integer, Integer, Integer > () {@Overridepublic Integer apply(Integer integer, Integer integer2) throws Exception {int res = integer + integer2;Log.d(TAG, "====================integer " + integer);Log.d(TAG, "====================integer2 " + integer2);Log.d(TAG, "====================res " + res);return res;} }) .subscribe(new Consumer < Integer > () {@Overridepublic void accept(Integer integer) throws Exception {Log.d(TAG, "==================accept " + integer);} });打印結果:
05-22 14:21:46.042 17775-17775/? D/chan: ====================integer 0 ====================integer2 1 ====================res 1 ====================integer 1 ====================integer2 2 ====================res 3 ====================integer 3 ====================integer2 3 ====================res 6 ==================accept 6從結果可以看到,其實就是前2個數據聚合之后,然后再與后1個數據進行聚合,一直到沒有數據為止。
3.8 collect()
方法預覽:
public final <U> Single<U> collect(Callable<? extends U> initialValueSupplier, BiConsumer<? super U, ? super T> collector)有什么用?
將數據收集到數據結構當中。
怎么用?
Observable.just(1, 2, 3, 4) .collect(new Callable < ArrayList < Integer >> () {@Overridepublic ArrayList < Integer > call() throws Exception {return new ArrayList < > ();} }, new BiConsumer < ArrayList < Integer > , Integer > () {@Overridepublic void accept(ArrayList < Integer > integers, Integer integer) throws Exception {integers.add(integer);} }) .subscribe(new Consumer < ArrayList < Integer >> () {@Overridepublic void accept(ArrayList < Integer > integers) throws Exception {Log.d(TAG, "===============accept " + integers);} });打印結果:
05-22 16:47:18.257 31361-31361/com.example.rxjavademo D/chan: ===============accept [1, 2, 3, 4]3.10 count()
方法預覽:
public final Single<Long> count()有什么用?
返回被觀察者發送事件的數量。
怎么用?
Observable.just(1, 2, 3) .count() .subscribe(new Consumer < Long > () {@Overridepublic void accept(Long aLong) throws Exception {Log.d(TAG, "=======================aLong " + aLong);} });打印結果:
05-22 20:41:25.025 14126-14126/? D/chan: =======================aLong 34. 功能操作符
4.1 delay()
方法預覽:
public final Observable<T> delay(long delay, TimeUnit unit)有什么用?
延遲一段事件發送事件。
怎么用?
Observable.just(1, 2, 3) .delay(2, TimeUnit.SECONDS) .subscribe(new Observer < Integer > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "=======================onSubscribe");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "=======================onNext " + integer);}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {Log.d(TAG, "=======================onSubscribe");} });這里延遲了兩秒才發送事件,來看看打印結果:
05-22 20:53:43.618 16880-16880/com.example.rxjavademo D/chan: =======================onSubscribe 05-22 20:53:45.620 16880-16906/com.example.rxjavademo D/chan: =======================onNext 1 05-22 20:53:45.621 16880-16906/com.example.rxjavademo D/chan: =======================onNext 2 =======================onNext 3 =======================onSubscribe從打印結果可以看出 onSubscribe 回調2秒之后 onNext 才會回調。
4.2 doOnEach()
方法預覽:
public final Observable<T> doOnEach(final Consumer<? super Notification<T>> onNotification)有什么用?
Observable 每發送一件事件之前都會先回調這個方法。
怎么用?
Observable.create(new ObservableOnSubscribe < Integer > () {@Overridepublic void subscribe(ObservableEmitter < Integer > e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);// e.onError(new NumberFormatException());e.onComplete();} }) .doOnEach(new Consumer < Notification < Integer >> () {@Overridepublic void accept(Notification < Integer > integerNotification) throws Exception {Log.d(TAG, "==================doOnEach " + integerNotification.getValue());} }) .subscribe(new Observer < Integer > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "==================onSubscribe ");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "==================onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "==================onError ");}@Overridepublic void onComplete() {Log.d(TAG, "==================onComplete ");} });打印結果:
05-23 09:07:05.547 19867-19867/? D/chan: ==================onSubscribe ==================doOnEach 1 ==================onNext 1 ==================doOnEach 2 ==================onNext 2 ==================doOnEach 3 ==================onNext 3 ==================doOnEach null ==================onComplete從結果就可以看出每發送一個事件之前都會回調 doOnEach 方法,并且可以取出 onNext() 發送的值。
4.3 doOnNext()
方法預覽:
public final Observable<T> doOnNext(Consumer<? super T> onNext)有什么用?
Observable 每發送 onNext() 之前都會先回調這個方法。
怎么用?
Observable.create(new ObservableOnSubscribe < Integer > () {@Overridepublic void subscribe(ObservableEmitter < Integer > e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);e.onComplete();} }) .doOnNext(new Consumer < Integer > () {@Overridepublic void accept(Integer integer) throws Exception {Log.d(TAG, "==================doOnNext " + integer);} }) .subscribe(new Observer < Integer > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "==================onSubscribe ");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "==================onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "==================onError ");}@Overridepublic void onComplete() {Log.d(TAG, "==================onComplete ");} });打印結果:
05-23 09:09:36.769 20020-20020/com.example.rxjavademo D/chan: ==================onSubscribe ==================doOnNext 1 ==================onNext 1 ==================doOnNext 2 ==================onNext 2 ==================doOnNext 3 ==================onNext 3 ==================onComplete4.4 doAfterNext()
方法預覽:
public final Observable<T> doAfterNext(Consumer<? super T> onAfterNext)有什么用?
Observable 每發送 onNext() 之后都會回調這個方法。
怎么用?
Observable.create(new ObservableOnSubscribe < Integer > () {@Overridepublic void subscribe(ObservableEmitter < Integer > e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);e.onComplete();} }) .doAfterNext(new Consumer < Integer > () {@Overridepublic void accept(Integer integer) throws Exception {Log.d(TAG, "==================doAfterNext " + integer);} }) .subscribe(new Observer < Integer > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "==================onSubscribe ");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "==================onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "==================onError ");}@Overridepublic void onComplete() {Log.d(TAG, "==================onComplete ");} });打印結果:
05-23 09:15:49.215 20432-20432/com.example.rxjavademo D/chan: ==================onSubscribe ==================onNext 1 ==================doAfterNext 1 ==================onNext 2 ==================doAfterNext 2 ==================onNext 3 ==================doAfterNext 3 ==================onComplete4.5 doOnComplete()
方法預覽:
public final Observable<T> doOnComplete(Action onComplete)有什么用?
Observable 每發送 onComplete() 之前都會回調這個方法。
怎么用?
Observable.create(new ObservableOnSubscribe < Integer > () {@Overridepublic void subscribe(ObservableEmitter < Integer > e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);e.onComplete();} }) .doOnComplete(new Action() {@Overridepublic void run() throws Exception {Log.d(TAG, "==================doOnComplete ");} }) .subscribe(new Observer < Integer > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "==================onSubscribe ");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "==================onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "==================onError ");}@Overridepublic void onComplete() {Log.d(TAG, "==================onComplete ");} });打印結果:
05-23 09:32:18.031 20751-20751/? D/chan: ==================onSubscribe ==================onNext 1 ==================onNext 2 ==================onNext 3 ==================doOnComplete ==================onComplete4.6 doOnError()
方法預覽:
public final Observable<T> doOnError(Consumer<? super Throwable> onError)有什么用?
Observable 每發送 onError() 之前都會回調這個方法。
怎么用?
Observable.create(new ObservableOnSubscribe < Integer > () {@Overridepublic void subscribe(ObservableEmitter < Integer > e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);e.onError(new NullPointerException());} }) .doOnError(new Consumer < Throwable > () {@Overridepublic void accept(Throwable throwable) throws Exception {Log.d(TAG, "==================doOnError " + throwable);} }) .subscribe(new Observer < Integer > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "==================onSubscribe ");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "==================onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "==================onError ");}@Overridepublic void onComplete() {Log.d(TAG, "==================onComplete ");} });打印結果:
05-23 09:35:04.150 21051-21051/? D/chan: ==================onSubscribe ==================onNext 1 ==================onNext 2 ==================onNext 3 ==================doOnError java.lang.NullPointerException ==================onError4.7 doOnSubscribe()
方法預覽:
public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe)有什么用?
Observable 每發送 onSubscribe() 之前都會回調這個方法。
怎么用?
Observable.create(new ObservableOnSubscribe < Integer > () {@Overridepublic void subscribe(ObservableEmitter < Integer > e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);e.onComplete();} }) .doOnSubscribe(new Consumer < Disposable > () {@Overridepublic void accept(Disposable disposable) throws Exception {Log.d(TAG, "==================doOnSubscribe ");} }) .subscribe(new Observer < Integer > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "==================onSubscribe ");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "==================onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "==================onError ");}@Overridepublic void onComplete() {Log.d(TAG, "==================onComplete ");} });打印結果:
05-23 09:39:25.778 21245-21245/? D/chan: ==================doOnSubscribe ==================onSubscribe ==================onNext 1 ==================onNext 2 ==================onNext 3 ==================onComplete?
4.8 doOnDispose()
方法預覽:
public final Observable<T> doOnDispose(Action onDispose)有什么用?
當調用 Disposable 的 dispose() 之后回調該方法。
怎么用?
Observable.create(new ObservableOnSubscribe < Integer > () {@Overridepublic void subscribe(ObservableEmitter < Integer > e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);e.onComplete();} }) .doOnDispose(new Action() {@Overridepublic void run() throws Exception {Log.d(TAG, "==================doOnDispose ");} }) .subscribe(new Observer < Integer > () {private Disposable d;@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "==================onSubscribe ");this.d = d;}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "==================onNext " + integer);d.dispose();}@Overridepublic void onError(Throwable e) {Log.d(TAG, "==================onError ");}@Overridepublic void onComplete() {Log.d(TAG, "==================onComplete ");} });打印結果:
05-23 09:55:48.122 22023-22023/com.example.rxjavademo D/chan: ==================onSubscribe ==================onNext 1 ==================doOnDispose4.9 doOnLifecycle()
方法預覽:
public final Observable<T> doOnLifecycle(final Consumer<? super Disposable> onSubscribe, final Action onDispose)有什么用?
在回調 onSubscribe 之前回調該方法的第一個參數的回調方法,可以使用該回調方法決定是否取消訂閱。
怎么用?
doOnLifecycle() 第二個參數的回調方法的作用與 doOnDispose() 是一樣的,現在用下面的例子來講解:
Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);e.onComplete();} }) .doOnLifecycle(new Consumer<Disposable>() {@Overridepublic void accept(Disposable disposable) throws Exception {Log.d(TAG, "==================doOnLifecycle accept");} }, new Action() {@Overridepublic void run() throws Exception {Log.d(TAG, "==================doOnLifecycle Action");} }) .doOnDispose(new Action() {@Overridepublic void run() throws Exception {Log.d(TAG, "==================doOnDispose Action");} }) .subscribe(new Observer<Integer>() {private Disposable d;@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "==================onSubscribe ");this.d = d;}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "==================onNext " + integer);d.dispose();}@Overridepublic void onError(Throwable e) {Log.d(TAG, "==================onError ");}@Overridepublic void onComplete() {Log.d(TAG, "==================onComplete ");}});打印結果:
05-23 10:20:36.345 23922-23922/? D/chan: ==================doOnLifecycle accept ==================onSubscribe ==================onNext 1 ==================doOnDispose Action ==================doOnLifecycle Action可以看到當在 onNext() 方法進行取消訂閱操作后,doOnDispose() 和 doOnLifecycle() 都會被回調。
如果使用 doOnLifecycle 進行取消訂閱,來看看打印結果:
05-23 10:32:20.014 24652-24652/com.example.rxjavademo D/chan: ==================doOnLifecycle accept ==================onSubscribe可以發現 doOnDispose Action 和 doOnLifecycle Action 都沒有被回調。
4.10 doOnTerminate() & doAfterTerminate()
方法預覽:
public final Observable<T> doOnTerminate(final Action onTerminate) public final Observable<T> doAfterTerminate(Action onFinally)有什么用?
doOnTerminate 是在 onError 或者 onComplete 發送之前回調,而 doAfterTerminate 則是 onError 或者 onComplete 發送之后回調。
怎么用?
Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3); // e.onError(new NullPointerException());e.onComplete();} }) .doOnTerminate(new Action() {@Overridepublic void run() throws Exception {Log.d(TAG, "==================doOnTerminate ");} }) .subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "==================onSubscribe ");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "==================onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "==================onError ");}@Overridepublic void onComplete() {Log.d(TAG, "==================onComplete ");}});打印結果:
05-23 10:00:39.503 22398-22398/com.example.rxjavademo D/chan: ==================onSubscribe ==================onNext 1 ==================onNext 2 05-23 10:00:39.504 22398-22398/com.example.rxjavademo D/chan: ==================onNext 3 ==================doOnTerminate ==================onCompletedoAfterTerminate 也是差不多,這里就不再贅述。
4.11 doFinally()
方法預覽:
public final Observable<T> doFinally(Action onFinally)有什么用?
在所有事件發送完畢之后回調該方法。
怎么用?
這里可能你會有個問題,那就是 doFinally() 和 doAfterTerminate() 到底有什么區別?區別就是在于取消訂閱,如果取消訂閱之后 doAfterTerminate() 就不會被回調,而 doFinally() 無論怎么樣都會被回調,且都會在事件序列的最后。
現在用以下例子說明下:
Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);e.onComplete();} }) .doFinally(new Action() {@Overridepublic void run() throws Exception {Log.d(TAG, "==================doFinally ");} }) .doOnDispose(new Action() {@Overridepublic void run() throws Exception {Log.d(TAG, "==================doOnDispose ");} }) .doAfterTerminate(new Action() {@Overridepublic void run() throws Exception {Log.d(TAG, "==================doAfterTerminate ");} }) .subscribe(new Observer<Integer>() {private Disposable d;@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "==================onSubscribe ");this.d = d;}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "==================onNext " + integer);d.dispose();}@Overridepublic void onError(Throwable e) {Log.d(TAG, "==================onError ");}@Overridepublic void onComplete() {Log.d(TAG, "==================onComplete ");} });打印結果:
05-23 10:10:10.469 23196-23196/? D/chan: ==================onSubscribe 05-23 10:10:10.470 23196-23196/? D/chan: ==================onNext 1 ==================doOnDispose ==================doFinally可以看到如果調用了 dispose() 方法,doAfterTerminate() 不會被回調。
現在試試把 dispose() 注釋掉看看,看看打印結果:
05-23 10:13:34.537 23439-23439/com.example.rxjavademo D/chan: ==================onSubscribe ==================onNext 1 ==================onNext 2 ==================onNext 3 ==================onComplete ==================doAfterTerminate ==================doFinallydoAfterTerminate() 已經成功回調,doFinally() 還是會在事件序列的最后。
4.12 onErrorReturn()
方法預覽:
public final Observable<T> onErrorReturn(Function<? super Throwable, ? extends T> valueSupplier)有什么用?
當接受到一個 onError() 事件之后回調,返回的值會回調 onNext() 方法,并正常結束該事件序列。
怎么用?
Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);e.onError(new NullPointerException());} }) .onErrorReturn(new Function<Throwable, Integer>() {@Overridepublic Integer apply(Throwable throwable) throws Exception {Log.d(TAG, "==================onErrorReturn " + throwable);return 404;} }) .subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "==================onSubscribe ");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "==================onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "==================onError ");}@Overridepublic void onComplete() {Log.d(TAG, "==================onComplete ");} });打印結果:
05-23 18:35:18.175 19239-19239/? D/chan: ==================onSubscribe ==================onNext 1 ==================onNext 2 ==================onNext 3 ==================onErrorReturn java.lang.NullPointerException ==================onNext 404 ==================onComplete4.13 onErrorResumeNext()
方法預覽:
public final Observable<T> onErrorResumeNext(Function<? super Throwable, ? extends ObservableSource<? extends T>> resumeFunction)有什么用?
當接收到 onError() 事件時,返回一個新的 Observable,并正常結束事件序列。
怎么用?
Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);e.onError(new NullPointerException());} }) .onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {@Overridepublic ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {Log.d(TAG, "==================onErrorResumeNext " + throwable);return Observable.just(4, 5, 6);} }) .subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "==================onSubscribe ");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "==================onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "==================onError ");}@Overridepublic void onComplete() {Log.d(TAG, "==================onComplete ");} });打印結果:
05-23 18:43:10.910 26469-26469/? D/chan: ==================onSubscribe ==================onNext 1 ==================onNext 2 ==================onNext 3 ==================onErrorResumeNext java.lang.NullPointerException ==================onNext 4 ==================onNext 5 ==================onNext 6 ==================onComplete4.14 onExceptionResumeNext()
方法預覽:
public final Observable<T> onExceptionResumeNext(final ObservableSource<? extends T> next)有什么用?
與 onErrorResumeNext() 作用基本一致,但是這個方法只能捕捉 Exception。
怎么用?
先來試試 onExceptionResumeNext() 是否能捕捉 Error。
Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);e.onError(new Error("404"));} }) .onExceptionResumeNext(new Observable<Integer>() {@Overrideprotected void subscribeActual(Observer<? super Integer> observer) {observer.onNext(333);observer.onComplete();} }) .subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "==================onSubscribe ");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "==================onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "==================onError ");}@Overridepublic void onComplete() {Log.d(TAG, "==================onComplete ");} });打印結果:
05-23 22:23:08.873 1062-1062/com.example.louder.rxjavademo D/chan: ==================onSubscribe 05-23 22:23:08.874 1062-1062/com.example.louder.rxjavademo D/chan: ==================onNext 1 ==================onNext 2 ==================onNext 3 ==================onError從打印結果可以知道,觀察者收到 onError() 事件,證明 onErrorResumeNext() 不能捕捉 Error 事件。
將被觀察者的 e.onError(new Error("404")) 改為 e.onError(new Exception("404")),現在看看是否能捕捉 Exception 事件:
05-23 22:32:14.563 10487-10487/com.example.louder.rxjavademo D/chan: ==================onSubscribe ==================onNext 1 ==================onNext 2 ==================onNext 3 ==================onNext 333 ==================onComplete從打印結果可以知道,這個方法成功捕獲 Exception 事件。
4.15 retry()
方法預覽:
public final Observable<T> retry(long times) ......有什么用?
如果出現錯誤事件,則會重新發送所有事件序列。times 是代表重新發的次數。
怎么用?
Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);e.onError(new Exception("404"));} }) .retry(2) .subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "==================onSubscribe ");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "==================onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "==================onError ");}@Overridepublic void onComplete() {Log.d(TAG, "==================onComplete ");} });打印結果:
05-23 22:46:18.537 22239-22239/com.example.louder.rxjavademo D/chan: ==================onSubscribe 05-23 22:46:18.538 22239-22239/com.example.louder.rxjavademo D/chan: ==================onNext 1 ==================onNext 2 ==================onNext 3 ==================onNext 1 ==================onNext 2 ==================onNext 3 ==================onNext 1 ==================onNext 2 ==================onNext 3 ==================onError4.16 retryUntil()
方法預覽:
public final Observable<T> retryUntil(final BooleanSupplier stop)有什么用?
出現錯誤事件之后,可以通過此方法判斷是否繼續發送事件。
怎么用?
Observable.create(new ObservableOnSubscribe < Integer > () {@Overridepublic void subscribe(ObservableEmitter < Integer > e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);e.onError(new Exception("404"));} }) .retryUntil(new BooleanSupplier() {@Overridepublic boolean getAsBoolean() throws Exception {if (i == 6) {return true;}return false;} }) .subscribe(new Observer < Integer > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "==================onSubscribe ");}@Overridepublic void onNext(Integer integer) {i += integer;Log.d(TAG, "==================onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "==================onError ");}@Overridepublic void onComplete() {Log.d(TAG, "==================onComplete ");} });打印結果:
05-23 22:57:32.905 23063-23063/com.example.louder.rxjavademo D/chan: ==================onSubscribe 05-23 22:57:32.906 23063-23063/com.example.louder.rxjavademo D/chan: ==================onNext 1 ==================onNext 2 ==================onNext 3 ==================onError?
4.17 retryWhen()
方法預覽:
public final void safeSubscribe(Observer<? super T> s)有什么用?
當被觀察者接收到異常或者錯誤事件時會回調該方法,這個方法會返回一個新的被觀察者。如果返回的被觀察者發送 Error 事件則之前的被觀察者不會繼續發送事件,如果發送正常事件則之前的被觀察者會繼續不斷重試發送事件。
怎么用?
Observable.create(new ObservableOnSubscribe < String > () {@Overridepublic void subscribe(ObservableEmitter < String > e) throws Exception {e.onNext("chan");e.onNext("ze");e.onNext("de");e.onError(new Exception("404"));e.onNext("haha");} }) .retryWhen(new Function < Observable < Throwable > , ObservableSource <? >> () {@Overridepublic ObservableSource <? > apply(Observable < Throwable > throwableObservable) throws Exception {return throwableObservable.flatMap(new Function < Throwable, ObservableSource <? >> () {@Overridepublic ObservableSource <? > apply(Throwable throwable) throws Exception {if(!throwable.toString().equals("java.lang.Exception: 404")) {return Observable.just("可以忽略的異常");} else {return Observable.error(new Throwable("終止啦"));}}});} }) .subscribe(new Observer < String > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "==================onSubscribe ");}@Overridepublic void onNext(String s) {Log.d(TAG, "==================onNext " + s);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "==================onError " + e.toString());}@Overridepublic void onComplete() {Log.d(TAG, "==================onComplete ");} });打印結果:
05-24 09:13:25.622 28372-28372/com.example.rxjavademo D/chan: ==================onSubscribe 05-24 09:13:25.623 28372-28372/com.example.rxjavademo D/chan: ==================onNext chan ==================onNext ze ==================onNext de 05-24 09:13:25.624 28372-28372/com.example.rxjavademo D/chan: ==================onError java.lang.Throwable: 終止啦將 onError(new Exception("404")) 改為 onError(new Exception("303")) 看看打印結果:
==================onNext chan 05-24 09:54:08.653 29694-29694/? D/chan: ==================onNext ze ==================onNext de ==================onNext chan ==================onNext ze ==================onNext de ==================onNext chan ==================onNext ze ==================onNext de ==================onNext chan ==================onNext ze ==================onNext de ==================onNext chan ==================onNext ze ==================onNext de ==================onNext chan ......從結果可以看出,會不斷重復發送消息。
4.18 repeat()
方法預覽:
public final Observable<T> repeat(long times) ......有什么用?
重復發送被觀察者的事件,times 為發送次數。
怎么用?
Observable.create(new ObservableOnSubscribe < Integer > () {@Overridepublic void subscribe(ObservableEmitter < Integer > e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);e.onComplete();} }) .repeat(2) .subscribe(new Observer < Integer > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "===================onSubscribe ");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "===================onNext " + integer);}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {Log.d(TAG, "===================onComplete ");} });打印結果:
05-24 11:33:29.565 8544-8544/com.example.rxjavademo D/chan: ===================onSubscribe ===================onNext 1 ===================onNext 2 ===================onNext 3 ===================onNext 1 ===================onNext 2 ===================onNext 3 05-24 11:33:29.565 8544-8544/com.example.rxjavademo D/chan: ===================onComplete從結果可以看出,該事件發送了兩次。
4.19 repeatWhen()
方法預覽:
public final Observable<T> repeatWhen(final Function<? super Observable<Object>, ? extends ObservableSource<?>> handler)有什么用?
這個方法可以會返回一個新的被觀察者設定一定邏輯來決定是否重復發送事件。
怎么用?
這里分三種情況,如果新的被觀察者返回 onComplete 或者 onError 事件,則舊的被觀察者不會繼續發送事件。如果被觀察者返回其他事件,則會重復發送事件。
現在試驗發送 onComplete 事件,代碼如下:
Observable.create(new ObservableOnSubscribe < Integer > () {@Overridepublic void subscribe(ObservableEmitter < Integer > e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);e.onComplete();} }) .repeatWhen(new Function < Observable < Object > , ObservableSource <? >> () {@Overridepublic ObservableSource <? > apply(Observable < Object > objectObservable) throws Exception {return Observable.empty();// return Observable.error(new Exception("404"));// return Observable.just(4); null;} }) .subscribe(new Observer < Integer > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "===================onSubscribe ");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "===================onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "===================onError ");}@Overridepublic void onComplete() {Log.d(TAG, "===================onComplete ");} });?
打印結果:
05-24 11:44:33.486 9379-9379/com.example.rxjavademo D/chan: ===================onSubscribe 05-24 11:44:33.487 9379-9379/com.example.rxjavademo D/chan: ===================onComplete下面直接看看發送 onError 事件和其他事件的打印結果。
發送 onError 打印結果:
05-24 11:46:29.507 9561-9561/com.example.rxjavademo D/chan: ===================onSubscribe 05-24 11:46:29.508 9561-9561/com.example.rxjavademo D/chan: ===================onError發送其他事件的打印結果:
05-24 11:48:35.844 9752-9752/com.example.rxjavademo D/chan: ===================onSubscribe ===================onNext 1 ===================onNext 2 ===================onNext 3 ===================onComplete4.20 subscribeOn()
方法預覽:
public final Observable<T> subscribeOn(Scheduler scheduler)有什么用?
指定被觀察者的線程,要注意的時,如果多次調用此方法,只有第一次有效。
怎么用?
Observable.create(new ObservableOnSubscribe < Integer > () {@Overridepublic void subscribe(ObservableEmitter < Integer > e) throws Exception {Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());e.onNext(1);e.onNext(2);e.onNext(3);e.onComplete();} }) //.subscribeOn(Schedulers.newThread()) .subscribe(new Observer < Integer > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "======================onSubscribe");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "======================onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "======================onError");}@Overridepublic void onComplete() {Log.d(TAG, "======================onComplete");} });現在不調用 subscribeOn() 方法,來看看打印結果:
05-26 10:40:42.246 21466-21466/? D/chan: ======================onSubscribe 05-26 10:40:42.247 21466-21466/? D/chan: =========================currentThread name: main ======================onNext 1 ======================onNext 2 ======================onNext 3 ======================onComplete可以看到打印被觀察者的線程名字是主線程。
接著調用 subscribeOn(Schedulers.newThread()) 來看看打印結果:
05-26 10:43:26.964 22530-22530/com.example.rxjavademo D/chan: ======================onSubscribe 05-26 10:43:26.966 22530-22569/com.example.rxjavademo D/chan: =========================currentThread name: RxNewThreadScheduler-1 05-26 10:43:26.967 22530-22569/com.example.rxjavademo D/chan: ======================onNext 1 ======================onNext 2 ======================onNext 3 ======================onComplete可以看到打印結果被觀察者是在一條新的線程。
現在看看多次調用會不會有效,代碼如下:
Observable.create(new ObservableOnSubscribe < Integer > () {@Overridepublic void subscribe(ObservableEmitter < Integer > e) throws Exception {Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());e.onNext(1);e.onNext(2);e.onNext(3);e.onComplete();} }) .subscribeOn(Schedulers.computation()) .subscribeOn(Schedulers.newThread()) .subscribe(new Observer < Integer > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "======================onSubscribe");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "======================onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "======================onError");}@Overridepublic void onComplete() {Log.d(TAG, "======================onComplete");} });打印結果:
05-26 10:47:20.925 23590-23590/com.example.rxjavademo D/chan: ======================onSubscribe 05-26 10:47:20.930 23590-23629/com.example.rxjavademo D/chan: =========================currentThread name: RxComputationThreadPool-1 ======================onNext 1 ======================onNext 2 ======================onNext 3 ======================onComplete可以看到第二次調動的 subscribeOn(Schedulers.newThread()) 并沒有效果。
4.21 observeOn()
方法預覽:
public final Observable<T> observeOn(Scheduler scheduler)有什么用?
指定觀察者的線程,每指定一次就會生效一次。
怎么用?
Observable.just(1, 2, 3) .observeOn(Schedulers.newThread()) .flatMap(new Function < Integer, ObservableSource < String >> () {@Overridepublic ObservableSource < String > apply(Integer integer) throws Exception {Log.d(TAG, "======================flatMap Thread name " + Thread.currentThread().getName());return Observable.just("chan" + integer);} }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer < String > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "======================onSubscribe");}@Overridepublic void onNext(String s) {Log.d(TAG, "======================onNext Thread name " + Thread.currentThread().getName());Log.d(TAG, "======================onNext " + s);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "======================onError");}@Overridepublic void onComplete() {Log.d(TAG, "======================onComplete");} });打印結果:
05-26 10:58:04.593 25717-25717/com.example.rxjavademo D/chan: ======================onSubscribe 05-26 10:58:04.594 25717-25753/com.example.rxjavademo D/chan: ======================flatMap Thread name RxNewThreadScheduler-1 05-26 10:58:04.595 25717-25753/com.example.rxjavademo D/chan: ======================flatMap Thread name RxNewThreadScheduler-1 ======================flatMap Thread name RxNewThreadScheduler-1 05-26 10:58:04.617 25717-25717/com.example.rxjavademo D/chan: ======================onNext Thread name main ======================onNext chan1 ======================onNext Thread name main ======================onNext chan2 ======================onNext Thread name main ======================onNext chan3 05-26 10:58:04.618 25717-25717/com.example.rxjavademo D/chan: ======================onComplete從打印結果可以知道,observeOn 成功切換了線程。
下表總結了 RxJava 中的調度器:
| Schedulers.computation(?) | 用于使用計算任務,如事件循環和回調處理 |
| Schedulers.immediate(?) | 當前線程 |
| Schedulers.io(?) | 用于 IO 密集型任務,如果異步阻塞 IO 操作。 |
| Schedulers.newThread(?) | 創建一個新的線程 |
| AndroidSchedulers.mainThread() | Android 的 UI 線程,用于操作 UI。 |
5. 過濾操作符
5.1 filter()
方法預覽:
public final Observable<T> filter(Predicate<? super T> predicate)有什么用?
通過一定邏輯來過濾被觀察者發送的事件,如果返回 true 則會發送事件,否則不會發送。
怎么用?
Observable.just(1, 2, 3).filter(new Predicate < Integer > () {@Overridepublic boolean test(Integer integer) throws Exception {return integer < 2;} }) .subscribe(new Observer < Integer > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "==================onSubscribe ");}@Overridepublic void onNext(Integer integer) {i += integer;Log.d(TAG, "==================onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "==================onError ");}@Overridepublic void onComplete() {Log.d(TAG, "==================onComplete ");} });以上代碼只有小于2的事件才會發送,來看看打印結果:
05-24 22:57:32.562 12776-12776/com.example.louder.rxjavademo D/chan: ==================onSubscribe ==================onNext 1 ==================onComplete5.2 ofType()
方法預覽:
public final <U> Observable<U> ofType(final Class<U> clazz)有什么用?
可以過濾不符合該類型事件
怎么用?
Observable.just(1, 2, 3, "chan", "zhide") .ofType(Integer.class) .subscribe(new Observer < Integer > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "==================onSubscribe ");}@Overridepublic void onNext(Integer integer) {i += integer;Log.d(TAG, "==================onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "==================onError ");}@Overridepublic void onComplete() {Log.d(TAG, "==================onComplete ");} });打印結果:
05-24 23:04:24.752 13229-13229/? D/chan: ==================onSubscribe ==================onNext 1 ==================onNext 2 ==================onNext 3 05-24 23:04:24.753 13229-13229/? D/chan: ==================onComplete5.3 skip()
方法預覽:
public final Observable<T> skip(long count) .......有什么用?
跳過正序某些事件,count 代表跳過事件的數量
怎么用?
Observable.just(1, 2, 3) .skip(2) .subscribe(new Observer < Integer > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "==================onSubscribe ");}@Overridepublic void onNext(Integer integer) {i += integer;Log.d(TAG, "==================onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "==================onError ");}@Overridepublic void onComplete() {Log.d(TAG, "==================onComplete ");} });打印結果:
05-24 23:13:50.448 13831-13831/? D/chan: ==================onSubscribe 05-24 23:13:50.449 13831-13831/? D/chan: ==================onNext 3 ==================onCompleteskipLast() 作用也是跳過某些事件,不過它是用來跳過正序的后面的事件,這里就不再講解了。
5.4 distinct()
方法預覽:
public final Observable<T> distinct()有什么用?
過濾事件序列中的重復事件。
怎么用?
Observable.just(1, 2, 3, 3, 2, 1) .distinct() .subscribe(new Observer < Integer > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "==================onSubscribe ");}@Overridepublic void onNext(Integer integer) {i += integer;Log.d(TAG, "==================onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "==================onError ");}@Overridepublic void onComplete() {Log.d(TAG, "==================onComplete ");} });打印結果:
05-24 23:19:44.334 14206-14206/com.example.louder.rxjavademo D/chan: ==================onSubscribe ==================onNext 1 ==================onNext 2 ==================onNext 3 ==================onComplete5.5 distinctUntilChanged()
方法預覽:
public final Observable<T> distinctUntilChanged()有什么用?
過濾掉連續重復的事件
怎么用?
Observable.just(1, 2, 3, 3, 2, 1) .distinctUntilChanged() .subscribe(new Observer < Integer > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "==================onSubscribe ");}@Overridepublic void onNext(Integer integer) {i += integer;Log.d(TAG, "==================onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "==================onError ");}@Overridepublic void onComplete() {Log.d(TAG, "==================onComplete ");} });打印結果:
05-24 23:22:35.985 14424-14424/com.example.louder.rxjavademo D/chan: ==================onSubscribe ==================onNext 1 ==================onNext 2 ==================onNext 3 ==================onNext 2 ==================onNext 1 ==================onComplete因為事件序列中連續出現兩次3,所以第二次3并不會發出。
5.6 take()
方法預覽:
public final Observable<T> take(long count) ......有什么用?
控制觀察者接收的事件的數量。
怎么用?
Observable.just(1, 2, 3, 4, 5) .take(3) .subscribe(new Observer < Integer > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "==================onSubscribe ");}@Overridepublic void onNext(Integer integer) {i += integer;Log.d(TAG, "==================onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "==================onError ");}@Overridepublic void onComplete() {Log.d(TAG, "==================onComplete ");} });打印結果:
05-24 23:28:32.899 14704-14704/? D/chan: ==================onSubscribe ==================onNext 1 ==================onNext 2 ==================onNext 3 ==================onCompletetakeLast() 的作用就是控制觀察者只能接受事件序列的后面幾件事情,這里就不再講解了,大家可以自己試試。
5.7 debounce()
方法預覽:
public final Observable<T> debounce(long timeout, TimeUnit unit) ......有什么用?
如果兩件事件發送的時間間隔小于設定的時間間隔則前一件事件就不會發送給觀察者。
怎么用?
Observable.create(new ObservableOnSubscribe < Integer > () {@Overridepublic void subscribe(ObservableEmitter < Integer > e) throws Exception {e.onNext(1);Thread.sleep(900);e.onNext(2);} }) .debounce(1, TimeUnit.SECONDS) .subscribe(new Observer < Integer > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "===================onSubscribe ");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "===================onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "===================onError ");}@Overridepublic void onComplete() {Log.d(TAG, "===================onComplete ");} });打印結果:
05-25 20:39:10.512 17441-17441/com.example.rxjavademo D/chan: ===================onSubscribe 05-25 20:39:12.413 17441-17478/com.example.rxjavademo D/chan: ===================onNext 2可以看到事件1并沒有發送出去,現在將間隔時間改為1000,看看打印結果:
05-25 20:42:10.874 18196-18196/com.example.rxjavademo D/chan: ===================onSubscribe 05-25 20:42:11.875 18196-18245/com.example.rxjavademo D/chan: ===================onNext 1 05-25 20:42:12.875 18196-18245/com.example.rxjavademo D/chan: ===================onNext 2throttleWithTimeout() 與此方法的作用一樣,這里就不再贅述了。
5.8 firstElement() && lastElement()
方法預覽:
public final Maybe<T> firstElement() public final Maybe<T> lastElement()有什么用?
firstElement() 取事件序列的第一個元素,lastElement() 取事件序列的最后一個元素。
怎么用?
Observable.just(1, 2, 3, 4) .firstElement() .subscribe(new Consumer < Integer > () {@Overridepublic void accept(Integer integer) throws Exception {Log.d(TAG, "====================firstElement " + integer);} });Observable.just(1, 2, 3, 4) .lastElement() .subscribe(new Consumer < Integer > () {@Overridepublic void accept(Integer integer) throws Exception {Log.d(TAG, "====================lastElement " + integer);} });打印結果:
05-25 20:47:22.189 19909-19909/? D/chan: ====================firstElement 1 ====================lastElement 45.9 elementAt() & elementAtOrError()
方法預覽:
public final Maybe<T> elementAt(long index) public final Single<T> elementAtOrError(long index)有什么用?
elementAt() 可以指定取出事件序列中事件,但是輸入的 index 超出事件序列的總數的話就不會出現任何結果。這種情況下,你想發出異常信息的話就用 elementAtOrError() 。
怎么用?
Observable.just(1, 2, 3, 4) .elementAt(0) .subscribe(new Consumer < Integer > () {@Overridepublic void accept(Integer integer) throws Exception {Log.d(TAG, "====================accept " + integer);} });打印結果:
05-25 20:56:22.266 23346-23346/com.example.rxjavademo D/chan: ====================accept 1將 elementAt() 的值改為5,這時是沒有打印結果的,因為沒有滿足條件的元素。
替換 elementAt() 為 elementAtOrError(),代碼如下:
Observable.just(1, 2, 3, 4) .elementAtOrError(5) .subscribe(new Consumer < Integer > () {@Overridepublic void accept(Integer integer) throws Exception {Log.d(TAG, "====================accept " + integer);} });打印結果:
io.reactivex.exceptions.OnErrorNotImplementedException at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java: 704) at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java: 701) at io.reactivex.internal.observers.ConsumerSingleObserver.onError(ConsumerSingleObserver.java: 47) at io.reactivex.internal.operators.observable.ObservableElementAtSingle$ElementAtObserver.onComplete(ObservableElementAtSingle.java: 117) at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java: 110) at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java: 36) at io.reactivex.Observable.subscribe(Observable.java: 10903) at io.reactivex.internal.operators.observable.ObservableElementAtSingle.subscribeActual(ObservableElementAtSingle.java: 37) at io.reactivex.Single.subscribe(Single.java: 2707) at io.reactivex.Single.subscribe(Single.java: 2693) at io.reactivex.Single.subscribe(Single.java: 2664) at com.example.rxjavademo.MainActivity.onCreate(MainActivity.java: 103) at android.app.Activity.performCreate(Activity.java: 6942) at android.app.Instrumentation.callActivityOnCreate(Instrumentation.java: 1126) at android.app.ActivityThread.performLaunchActivity(ActivityThread.java: 2880) at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java: 2988) at android.app.ActivityThread. - wrap14(ActivityThread.java) at android.app.ActivityThread$H.handleMessage(ActivityThread.java: 1631) at android.os.Handler.dispatchMessage(Handler.java: 102) at android.os.Looper.loop(Looper.java: 154) at android.app.ActivityThread.main(ActivityThread.java: 6682) at java.lang.reflect.Method.invoke(Native Method) at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java: 1520) at com.android.internal.os.ZygoteInit.main(ZygoteInit.java: 1410) Caused by: java.util.NoSuchElementException at io.reactivex.internal.operators.observable.ObservableElementAtSingle$ElementAtObserver.onComplete(ObservableElementAtSingle.java: 117)? at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java: 110)? at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java: 36)? at io.reactivex.Observable.subscribe(Observable.java: 10903)? at io.reactivex.internal.operators.observable.ObservableElementAtSingle.subscribeActual(ObservableElementAtSingle.java: 37)? at io.reactivex.Single.subscribe(Single.java: 2707)? at io.reactivex.Single.subscribe(Single.java: 2693)? at io.reactivex.Single.subscribe(Single.java: 2664)? at com.example.rxjavademo.MainActivity.onCreate(MainActivity.java: 103)? at android.app.Activity.performCreate(Activity.java: 6942)? at android.app.Instrumentation.callActivityOnCreate(Instrumentation.java: 1126)? at android.app.ActivityThread.performLaunchActivity(ActivityThread.java: 2880)? at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java: 2988)? at android.app.ActivityThread. - wrap14(ActivityThread.java)? at android.app.ActivityThread$H.handleMessage(ActivityThread.java: 1631)? at android.os.Handler.dispatchMessage(Handler.java: 102)? at android.os.Looper.loop(Looper.java: 154)? at android.app.ActivityThread.main(ActivityThread.java: 6682)? at java.lang.reflect.Method.invoke(Native Method)? at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java: 1520)? at com.android.internal.os.ZygoteInit.main(ZygoteInit.java: 1410)? 復制代碼這時候會拋出 NoSuchElementException 異常。
6. 條件操作符
6.1 all()
方法預覽:
public final Observable<T> ambWith(ObservableSource<? extends T> other)有什么用?
判斷事件序列是否全部滿足某個事件,如果都滿足則返回 true,反之則返回 false。
怎么用?
Observable.just(1, 2, 3, 4) .all(new Predicate < Integer > () {@Overridepublic boolean test(Integer integer) throws Exception {return integer < 5;} }) .subscribe(new Consumer < Boolean > () {@Overridepublic void accept(Boolean aBoolean) throws Exception {Log.d(TAG, "==================aBoolean " + aBoolean);} });打印結果:
05-26 09:39:51.644 1482-1482/com.example.rxjavademo D/chan: ==================aBoolean true6.2 takeWhile()
方法預覽:
public final Observable<T> takeWhile(Predicate<? super T> predicate)有什么用?
可以設置條件,當某個數據滿足條件時就會發送該數據,反之則不發送。
怎么用?
Observable.just(1, 2, 3, 4) .takeWhile(new Predicate < Integer > () {@Overridepublic boolean test(Integer integer) throws Exception {return integer < 3;} }) .subscribe(new Consumer < Integer > () {@Overridepublic void accept(Integer integer) throws Exception {Log.d(TAG, "========================integer " + integer);} });打印結果:
05-26 09:43:14.634 3648-3648/com.example.rxjavademo D/chan: ========================integer 1 ========================integer 26.3 skipWhile()
方法預覽:
public final Observable<T> skipWhile(Predicate<? super T> predicate)有什么用?
可以設置條件,當某個數據滿足條件時不發送該數據,反之則發送。
怎么用?
Observable.just(1, 2, 3, 4) .skipWhile(new Predicate < Integer > () {@Overridepublic boolean test(Integer integer) throws Exception {return integer < 3;} }) .subscribe(new Consumer < Integer > () {@Overridepublic void accept(Integer integer) throws Exception {Log.d(TAG, "========================integer " + integer);} });打印結果:
05-26 09:47:32.653 4861-4861/com.example.rxjavademo D/chan: ========================integer 3 ========================integer 46.4 takeUntil()
方法預覽:
public final Observable<T> takeUntil(Predicate<? super T> stopPredicate有什么用?
可以設置條件,當事件滿足此條件時,下一次的事件就不會被發送了。
怎么用?
Observable.just(1, 2, 3, 4, 5, 6) .takeUntil(new Predicate < Integer > () {@Overridepublic boolean test(Integer integer) throws Exception {return integer > 3;} }) .subscribe(new Consumer < Integer > () {@Overridepublic void accept(Integer integer) throws Exception {Log.d(TAG, "========================integer " + integer);} });打印結果:
05-26 09:55:12.918 7933-7933/com.example.rxjavademo D/chan: ========================integer 1 ========================integer 2 05-26 09:55:12.919 7933-7933/com.example.rxjavademo D/chan: ========================integer 3 ========================integer 46.5 skipUntil()
方法預覽:
public final <U> Observable<T> skipUntil(ObservableSource<U> other)有什么用?
當 skipUntil() 中的 Observable 發送事件了,原來的 Observable 才會發送事件給觀察者。
怎么用?
Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS) .skipUntil(Observable.intervalRange(6, 5, 3, 1, TimeUnit.SECONDS)) .subscribe(new Observer < Long > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "========================onSubscribe ");}@Overridepublic void onNext(Long along) {Log.d(TAG, "========================onNext " + along);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "========================onError ");}@Overridepublic void onComplete() {Log.d(TAG, "========================onComplete ");} });打印結果:
05-26 10:08:50.574 13023-13023/com.example.rxjavademo D/chan: ========================onSubscribe 05-26 10:08:53.576 13023-13054/com.example.rxjavademo D/chan: ========================onNext 4 05-26 10:08:54.576 13023-13054/com.example.rxjavademo D/chan: ========================onNext 5 ========================onComplete從結果可以看出,skipUntil() 里的 Observable 并不會發送事件給觀察者。
6.6 sequenceEqual()
方法預覽:
public static <T> Single<Boolean> sequenceEqual(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2) ......有什么用?
判斷兩個 Observable 發送的事件是否相同。
怎么用?
Observable.sequenceEqual(Observable.just(1, 2, 3), Observable.just(1, 2, 3)) .subscribe(new Consumer < Boolean > () {@Overridepublic void accept(Boolean aBoolean) throws Exception {Log.d(TAG, "========================onNext " + aBoolean);} });打印結果:
05-26 10:11:45.975 14157-14157/? D/chan: ========================onNext true6.7 contains()
方法預覽:
public final Single<Boolean> contains(final Object element)有什么用?
判斷事件序列中是否含有某個元素,如果有則返回 true,如果沒有則返回 false。
怎么用?
Observable.just(1, 2, 3) .contains(3) .subscribe(new Consumer < Boolean > () {@Overridepublic void accept(Boolean aBoolean) throws Exception {Log.d(TAG, "========================onNext " + aBoolean);} });打印結果:
05-26 10:14:23.522 15085-15085/com.example.rxjavademo D/chan: ========================onNext true6.8 isEmpty()
方法預覽:
public final Single<Boolean> isEmpty()有什么用?
判斷事件序列是否為空。
怎么用?
Observable.create(new ObservableOnSubscribe < Integer > () {@Overridepublic void subscribe(ObservableEmitter < Integer > e) throws Exception {e.onComplete();} }) .isEmpty() .subscribe(new Consumer < Boolean > () {@Overridepublic void accept(Boolean aBoolean) throws Exception {Log.d(TAG, "========================onNext " + aBoolean);} });打印結果:
05-26 10:17:16.725 16109-16109/com.example.rxjavademo D/chan: ========================onNext true6.9 amb()
方法預覽:
public static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extends T>> sources)有什么用?
amb() 要傳入一個 Observable 集合,但是只會發送最先發送事件的 Observable 中的事件,其余 Observable 將會被丟棄。
怎么用?
ArrayList < Observable < Long >> list = new ArrayList < > ();list.add(Observable.intervalRange(1, 5, 2, 1, TimeUnit.SECONDS)); list.add(Observable.intervalRange(6, 5, 0, 1, TimeUnit.SECONDS));Observable.amb(list) .subscribe(new Consumer < Long > () {@Overridepublic void accept(Long aLong) throws Exception {Log.d(TAG, "========================aLong " + aLong);} });打印結果:
05-26 10:21:29.580 17185-17219/com.example.rxjavademo D/chan: ========================aLong 6 05-26 10:21:30.580 17185-17219/com.example.rxjavademo D/chan: ========================aLong 7 05-26 10:21:31.579 17185-17219/com.example.rxjavademo D/chan: ========================aLong 8 05-26 10:21:32.579 17185-17219/com.example.rxjavademo D/chan: ========================aLong 9 05-26 10:21:33.579 17185-17219/com.example.rxjavademo D/chan: ========================aLong 106.10 defaultIfEmpty()
方法預覽:
public final Observable<T> defaultIfEmpty(T defaultItem)有什么用?
如果觀察者只發送一個 onComplete() 事件,則可以利用這個方法發送一個值。
怎么用?
Observable.create(new ObservableOnSubscribe < Integer > () {@Overridepublic void subscribe(ObservableEmitter < Integer > e) throws Exception {e.onComplete();} }) .defaultIfEmpty(666) .subscribe(new Consumer < Integer > () {@Overridepublic void accept(Integer integer) throws Exception {Log.d(TAG, "========================onNext " + integer);} });打印結果:
05-26 10:26:56.376 19249-19249/com.example.rxjavademo D/chan: ========================onNext 666RxJava 常見的使用方式都已經介紹的差不多,相信大家如果都掌握這些操作符的用法的話,那么使用 RxJava 將不會再是難題了。
?
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
- 上一篇: Retrofit 2.0
- 下一篇: Math Adventures with