java rx.observable_Rxjava2 Observable的条件操作符详解及实例
簡要:
需求了解:
在使用 Rxjava 開發中,經常有一些各種條件的操作 ,如比較兩個 Observable 誰先發射了數據、跳過指定條件的 Observable 等一系列的條件操作需求,那么很幸運, Rxjava 中已經有了很多條件操作符,一起來了解一下吧。
下面列出了一些Rxjava的用于條件操作符:
Amb:給定兩個或多個Observables,它只發射首先發射數據或通知的那個Observable的所有數據。
DefaultIfEmpty:發射來自原始Observable的值,如果原始 Observable 沒有發射任何數據項,就發射一個默認值。
SwitchIfEmpty:如果原始Observable沒有發射數據時,發射切換一個指定的Observable繼續發射數據。
SkipUntil:丟棄原始 Observable 發射的數據,直到第二個 Observable 發射了一個數據,然后發射原始 Observable 的剩余數據。
SkipWhile:丟棄原始 Observable 發射的數據,直到一個特定的條件為假,然后發射原始 Observable 剩余的數據。
TakeUntil:發射來自原始 Observable 的數據,直到第二個 Observable 發射了一個數據或一個通知。
1. Amb
給定兩個或多個Observables,它只發射首先發射數據或通知的那個Observable的所有數據。
解析: 對多個Observable進行監聽,首先發射通知(包括數據)的Observable將會被觀察者觀察,發射這個Observable的所有數據。
示例代碼:
// 創建Observable
Observable delayObservable = Observable.range(1, 5)
.delay(100, TimeUnit.MILLISECONDS); // 延遲100毫秒發射數據
Observable rangeObservable = Observable.range(6, 5);
// 創建Observable的集合
ArrayList> list = new ArrayList<>();
list.add(delayObservable);
list.add(rangeObservable);
// 創建Observable的數組
Observable[] array = new Observable[2];
array[0] = delayObservable;
array[1] = rangeObservable;
/**
* 1. ambWith(ObservableSource extends T> other)
* 與另外一個Observable比較,只發射首先發射通知的Observable的數據
*/
rangeObservable.ambWith(delayObservable)
.subscribe(new Consumer() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("--> accept(1): " + integer);
}
});
System.in.read();
System.out.println("------------------------------------------------");
/**
* 2. amb(Iterable extends ObservableSource extends T>> sources)
* 接受一個Observable類型的集合, 只發射集合中首先發射通知的Observable的數據
*/
Observable.amb(list)
.subscribe(new Consumer() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("--> accept(2): " + integer);
}
});
System.in.read();
System.out.println("------------------------------------------------");
/**
* 3. ambArray(ObservableSource extends T>... sources)
* 接受一個Observable類型的數組, 只發射數組中首先發射通知的Observable的數據
*/
Observable.ambArray(array)
.subscribe(new Consumer() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("--> accept(3): " + integer);
}
});
System.in.read();
輸出:
--> accept(1): 6
--> accept(1): 7
--> accept(1): 8
--> accept(1): 9
--> accept(1): 10
------------------------------------------------
--> accept(2): 6
--> accept(2): 7
--> accept(2): 8
--> accept(2): 9
--> accept(2): 10
------------------------------------------------
--> accept(3): 6
--> accept(3): 7
--> accept(3): 8
--> accept(3): 9
--> accept(3): 10
2. DefaultIfEmpty
發射來自原始Observable的值,如果原始 Observable 沒有發射數據項,就發射一個默認值。
解析: DefaultIfEmpty 簡單的精確地發射原始Observable的值,如果原始Observable沒有發射任何數據正常終止(以 onCompleted 的形式), DefaultIfEmpty 返回的Observable就發射一個你提供的默認值。如果你需要發射更多的數據,或者切換備用的Observable,你可以考慮使用 switchIfEmpty 操作符 。
示例代碼:
/**
* defaultIfEmpty(@NotNull T defaultItem)
* 如果原始Observable沒有發射任何數據正常終止(以 onCompleted 的形式),
* DefaultIfEmpty 返回的Observable就發射一個你提供的默認值defaultItem。
*/
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
emitter.onComplete(); // 不發射任何數據,直接發射完成通知
}
}).defaultIfEmpty("No Data emitter!!!")
.subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe");
}
@Override
public void onNext(String s) {
System.out.println("--> onNext: " + s);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError: " + e);
}
@Override
public void onComplete() {
System.out.println("--> onComplete");
}
});
輸出:
--> onSubscribe
--> onNext: No Data emitter!!!
--> onComplete
3. SwitchIfEmpty
如果原始Observable沒有發射數據時,發射切換一個指定的Observable繼續發射數據。
解析: 如果原始 Observable 沒有發射數據時,發射切換指定的 other 繼續發射數據。
示例代碼:
/**
* switchIfEmpty(ObservableSource other)
* 如果原始Observable沒有發射數據時,發射切換指定的other繼續發射數據
*/
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
emitter.onComplete(); // 不發射任何數據,直接發射完成通知
}
}).switchIfEmpty(Observable.just(888)) // 如果原始Observable沒有發射數據項,默認發射備用的Observable,發射數據項888
.subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe");
}
@Override
public void onNext(Integer integer) {
System.out.println("--> onNext: " + integer);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError: " + e);
}
@Override
public void onComplete() {
System.out.println("--> onComplete");
}
});
輸出:
--> onSubscribe
--> onNext: 888
--> onComplete
4. SkipUntil
丟棄原始 Observable 發射的數據,直到第二個 Observable 發射了一個數據,然后發射原始 Observable 的剩余數據。
示例代碼:
/**
* skipUntil(ObservableSource other)
* 丟棄原始Observable發射的數據,直到other發射了一個數據,然后發射原始Observable的剩余數據。
*/
Observable.intervalRange(1, 10, 0, 500, TimeUnit.MILLISECONDS)
// 丟棄2000毫秒的原始Observable發射的數據,接受后面的剩余部分數據
.skipUntil(Observable.timer(2000, TimeUnit.MILLISECONDS))
.subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe");
}
@Override
public void onNext(Long aLong) {
System.out.println("--> onNext: " + aLong);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError: " + e);
}
@Override
public void onComplete() {
System.out.println("--> onComplete");
}
});
System.in.read();
輸出:
--> onSubscribe
--> onNext: 5
--> onNext: 6
--> onNext: 7
--> onNext: 8
--> onNext: 9
--> onNext: 10
--> onComplete
5. SkipWhile
丟棄原始 Observable 發射的數據,直到一個特定的條件為假,然后發射原始 Observable 剩余的數據。
示例代碼:
/**
* skipWhile(Predicate super T> predicate)
* 丟棄原始 Observable 發射的數據,直到函數predicate的條件為假,然后發射原始Observable剩余的數據。
*/
Observable.intervalRange(1, 10, 0, 500, TimeUnit.MILLISECONDS)
.skipWhile(new Predicate() {
@Override
public boolean test(Long aLong) throws Exception {
if (aLong > 5) {
return false; // 當原始數據大于5時,發射后面的剩余部分數據
}
return true; // 丟棄原始數據項
}
}).subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe");
}
@Override
public void onNext(Long aLong) {
System.out.println("--> onNext: " + aLong);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError: " + e);
}
@Override
public void onComplete() {
System.out.println("--> onComplete");
}
});
System.in.read();
輸出:
--> onSubscribe
--> onNext: 6
--> onNext: 7
--> onNext: 8
--> onNext: 9
--> onNext: 10
--> onComplete
6. TakeUntil
發射來自原始 Observable 的數據,直到第二個 Observable 發射了一個數據或一個通知。
6.1 takeUntil(ObservableSource other)
TakeUntil 訂閱并開始發射原始 Observable,它還監視你提供的第二個 Observable。如果第二個 Observable 發射了一項數據或者發射了一個終止通知,TakeUntil 返回的 Observable 會停止發射原始 Observable 并終止。
解析: 第二個Observable發射一項數據或一個 onError 通知或一個 onCompleted 通知都會導致 takeUntil 停止發射數據。
示例代碼:
// 創建Observable,發送數字1~10,每間隔200毫秒發射一個數據
Observable observable = Observable.intervalRange(1, 10, 0, 200, TimeUnit.MILLISECONDS);
/**
* 1. takeUntil(ObservableSource other)
* 發射來自原始Observable的數據,直到other發射了一個數據或一個通知后停止發射原始Observable并終止。
*/
observable.takeUntil(Observable.timer(1000, TimeUnit.MILLISECONDS)) // 1000毫秒后停止發射原始數據
.subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(1)");
}
@Override
public void onNext(Long aLong) {
System.out.println("--> onNext(1): " + aLong);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(1): " + e);
}
@Override
public void onComplete() {
System.out.println("--> onComplete(1)");
}
});
System.in.read();
輸出:
--> onSubscribe(1)
--> onNext(1): 1
--> onNext(1): 2
--> onNext(1): 3
--> onNext(1): 4
--> onNext(1): 5
--> onComplete(1)
6.2 takeUntil(Predicate stopPredicate)
每次發射數據后,通過一個謂詞函數來判定是否需要終止發射數據。
解析: 每次發射數據后,通過一個謂詞函數 stopPredicate 來判定是否需要終止發射數據,如果 stopPredicate 返回 true 怎表示停止發射原始Observable后面的數據,否則繼續發射后面的數據。
示例代碼:
/**
* 2. takeUntil(Predicate super T> stopPredicate)
* 每次發射數據后,通過一個謂詞函數stopPredicate來判定是否需要終止發射數據
* 如果stopPredicate返回true怎表示停止發射后面的數據,否則繼續發射后面的數據
*/
observable.takeUntil(new Predicate() {
@Override
public boolean test(Long aLong) throws Exception { // 函數返回false則為繼續發射原始數據,true則停止發射原始數據
if(aLong > 5){
return true; // 滿足條件后,停止發射數據
}
return false; // 繼續發射數據
}
}).subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(2)");
}
@Override
public void onNext(Long aLong) {
System.out.println("--> onNext(2): " + aLong);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(2): " + e);
}
@Override
public void onComplete() {
System.out.println("--> onComplete(2)");
}
});
System.in.read();
輸出:
--> onSubscribe(2)
--> onNext(2): 1
--> onNext(2): 2
--> onNext(2): 3
--> onNext(2): 4
--> onNext(2): 5
--> onNext(2): 6
--> onComplete(2)
7. TakeWhile
發射原始Observable的數據,直到一個特定的條件,然后跳過剩余的數據。
解析: 發射原始 Observable 的數據,直到 predicate 的條件為 false ,然后跳過剩余的數據。
示例代碼:
// 創建Observable,發送數字1~10,每間隔200毫秒發射一個數據
Observable observable = Observable.intervalRange(1, 10, 0, 200, TimeUnit.MILLISECONDS);
/**
* takeWhile(Predicate predicate)
* 發射原始Observable的數據,直到predicate的條件為false,然后跳過剩余的數據
*/
observable.takeWhile(new Predicate() {
@Override
public boolean test(Long aLong) throws Exception { // 函數返回值決定是否繼續發射后續的數據
if(aLong > 5){
return false; // 滿足條件后跳過后面的數據
}
return true; // 繼續發射數據
}
}).subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe");
}
@Override
public void onNext(Long aLong) {
System.out.println("--> onNext: " + aLong);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError: " + e);
}
@Override
public void onComplete() {
System.out.println("--> onComplete");
}
});
System.in.read();
輸出:
--> onSubscribe(1)
--> onNext(1): 1
--> onNext(1): 2
--> onNext(1): 3
--> onNext(1): 4
--> onNext(1): 5
--> onComplete(1)
小結
本節主要介紹了Rxjava條件操作符可以根據不同的條件進行數據的發射,變換等相關行為。
提示:以上使用的Rxjava2版本: 2.2.12
Rx介紹與講解及完整目錄參考:Rxjava2 介紹與詳解實例
實例代碼:
總結
以上是生活随笔為你收集整理的java rx.observable_Rxjava2 Observable的条件操作符详解及实例的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: java二叉树删除子树_132-BST删
- 下一篇: java枚举怎么编译不行的_java枚举
