retrofit content-length为0_大佬们,一波RxJava 3.0来袭,请做好准备~
本文作者
作者:新小夢
鏈接:
https://juejin.im/post/5d1eeffe6fb9a07f0870b4e8
本文由作者授權發布。
0前言每個Android開發者,都是愛RxJava的,簡潔線程切換和多網絡請求合并,再配合Retrofit,簡直是APP開發的福音。
不知不覺,RxJava一路走來,已經更新到第三大版本了。
不像RxJava 2對RxJava 1那么殘忍,RxJava 3對RxJava 2的兼容性還是挺好的,目前并沒有做出很大的更改。RxJava2到2020年12月31號不再提供支持,錯誤的會同時在2.x和3.x修復,但新功能只會在3.x上添加。
作為嘗鮮,趕緊品嘗吧。
1主要變化主要特點
單一依賴:Reactive-Streams
繼續支持Java 6+和Android 2.3+
修復了API錯誤和RxJava 2的許多限制
旨在替代RxJava 2,具有相對較少的二進制不兼容更改
提供Java 8 lambda友好的API
關于并發源的不同意見
異步或同步執行
參數化并發的虛擬時間和調度程序
為測試schedulers,consumers和plugin hooks提供測試和診斷支持
與RxJava 2的主要區別是:
將eagerTruncate添加到replay運算符,以便head節點將在截斷時丟失它保留的項引用
新增 X.fromSupplier()
使用 Scheduler 添加 concatMap,保證 mapper 函數的運行位置
新增 startWithItem 和 startWithIterable
ConnectableFlowable/ConnetableFlowable 重新設計
將 as() 并入 to()
更改 Maybe.defaultIfEmpty() 以返回 Single
用 Supplier 代替 Callable
將一些實驗操作符推廣到標準
從某些主題/處理器中刪除 getValues()
刪除 replay(Scheduler) 及其重載
刪除 dematerialize()
刪除 startWith(T|Iterable)
刪除 as()
刪除 Maybe.toSingle(T)
刪除 Flowable.subscribe(4 args)
刪除 Observable.subscribe(4 args)
刪除 Single.toCompletable()
刪除 Completable.blockingGet()
1、添加依賴
implementation?"io.reactivex.rxjava3:rxjava:3.0.0-RC0"2、一些概念
2.1、上流、下流
在RxJava,數據以流的方式組織。也就是說,Rxjava包括一個源的數據流,數據流后跟著消費者的零個到多個消費數據流步驟。
source??.operator1()
??.operator2()
??.operator3()
??.subscribe(consumer)
在上文代碼中,對于operator2來說,在它前面叫做上流,在它后面的叫做下流。憋住,別笑,真的是下流來的。
2.2、流的對象
在RxJava的文檔中,emission, emits, item, event, signal, data and message都被認為在數據流中被傳遞的數據對象。
2.3、背壓(Backpressure)
當數據流通過異步的步驟執行時,這些步驟的執行速度可能不一致。也就是說上流數據發送太快,下流沒有足夠的能力去處理。為了避免這種情況,一般要么緩存上流的數據,要么拋棄數據。但這種處理方式,有時會帶來很大的問題。
為此,RxJava帶來了backpressure的概念。背壓是一種流量的控制步驟,在不知道上流還有多少數據的情形下控制內存的使用,表示它們還能處理多少數據。
支持背壓的有Flowable類,不支持背壓的有Observable,Single, Maybe and Completable類。
2.4 線程調度器(Schedulers)
對于我們Android開發來說,最喜歡的就是它簡潔切換線程的操作。RxJava通過調度器來方便線程的切換。
Schedulers.computation(): 適合運行在密集計算的操作,大多數異步操作符使用該調度器。
Schedulers.io():適合運行I/0和阻塞操作.
Schedulers.single():適合需要單一線程的操作
Schedulers.trampoline(): 適合需要順序運行的操作
在不同平臺還有不同的調度器,例如Android的主線程:AndroidSchedulers.mainThread()
Flowable.range(1,?10)??.observeOn(Schedulers.computation())
??.map(v?->?v?*?v)
??.blockingSubscribe(System.out::println);
2.5 基類
在 RxJava 3 可以發現有以下幾個基類(跟RxJava 2是一致的吧):
io.reactivex.Flowable:發送0個N個的數據,支持Reactive-Streams和背壓
io.reactivex.Observable:發送0個N個的數據,不支持背壓,
io.reactivex.Single:只能發送單個數據或者一個錯誤
io.reactivex.Completable:沒有發送任何數據,但只處理 onComplete 和 onError 事件。
io.reactivex.Maybe:能夠發射0或者1個數據,要么成功,要么失敗。
下文關于操作符內容太多了。
等需要了,再來查閱
下班時間還是好好護發吧
https://github.com/GitCode8/GitCode/blob/master/README.md
3操作符:實用操作符1、ObserveOn
指定觀察者的線程,例如在Android訪問網絡后,數據需要主線程消費,那么將觀察者的線程切換到主線就需要ObserveOn操作符。每次指定一次都會生效。
2、subscribeOn
指定被觀察者的線程,即數據源發生的線程。例如在Android訪問網絡時,需要將線程切換到子線程。多次指定只有第一次有效。
3、doOnEach
數據源(Observable)每發送一次數據,就調用一次。
4、doOnNext
數據源每次調用onNext() 之前都會先回調該方法。
5、doOnError
數據源每次調用onError() 之前會回調該方法。
6、doOnComplete
數據源每次調用onComplete() 之前會回調該方法
7、doOnSubscribe
數據源每次調用onSubscribe() 之后會回調該方法
8、doOnDispose
數據源每次調用dispose() 之后會回調該方法
其他的見官網吧,不難
https://github.com/ReactiveX/RxJava/wiki/Observable-Utility-Operators
對數據源過濾操作符
主要講對數據源進行選擇和過濾的常用操作符
1、skip(跳過)
可以作用于Flowable,Observable,表示源發射數據前,跳過多少個。例如下面跳過前四個:
Observable?source?=?Observable.just(1,?2,?3,?4,?5,?6,?7,?8,?9,?10);source.skip(4)
????.subscribe(System.out::print);
打印結果:5678910
Observable?source?=?Observable.just(1,?2,?3,?4,?5,?6,?7,?8,?9,?10);
source.skipLast(4)
????.subscribe(System.out::print);
打印結果:1 2 3 4 5 6
skipLast(n)操作表示從流的尾部跳過n個元素。
2、debounce(去抖動)
可作用于Flowable,Observable。在Android開發,通常為了防止用戶重復點擊而設置標記位,而通過RxJava的debounce操作符可以有效達到該效果。在規定時間內,用戶重復點擊只有最后一次有效,
Observable?source?=?Observable.create(emitter?->?{????emitter.onNext("A");
????Thread.sleep(1_500);
????emitter.onNext("B");
????Thread.sleep(500);
????emitter.onNext("C");
????Thread.sleep(250);
????emitter.onNext("D");
????Thread.sleep(2_000);
????emitter.onNext("E");
????emitter.onComplete();
});
source.subscribeOn(Schedulers.io())
????????.debounce(1,?TimeUnit.SECONDS)
????????.blockingSubscribe(
????????????????item?->?System.out.print(item+"?"),
????????????????Throwable::printStackTrace,
????????????????()?->?System.out.println("onComplete"));
打印:A D E onComplete
上文代碼中,數據源以一定的時間間隔發送A,B,C,D,E。操作符debounce的時間設為1秒,發送A后1.5秒并沒有發射其他數據,所以A能成功發射。
發射B后,在1秒之內,又發射了C和D,在D之后的2秒才發射E,所有B、C都失效,只有D有效;而E之后已經沒有其他數據流了,所有E有效。
3、distinct(去重)
可作用于Flowable,Observable,去掉數據源重復的數據。
Observable.just(2,?3,?4,?4,?2,?1)????????.distinct()
????????.subscribe(System.out::print);
//?打印:2?3?4?1
Observable.just(1,?1,?2,?1,?2,?3,?3,?4)
????????.distinctUntilChanged()
????????.subscribe(System.out::print);
//打印:1 2 1 2 3 4
distinctUntilChanged()去掉相鄰重復數據。
4、elementAt(獲取指定位置元素)
可作用于Flowable,Observable,從數據源獲取指定位置的元素,從0開始。
?Observable.just(2,4,3,1,5,8)????????.elementAt(0)
????????.subscribe(integer?->?
?????????Log.d("TAG","elmentAt->"+integer));
打印:2
Observable<String>?source?=?Observable.just("Kirk",?"Spock",?"Chekov",?"Sulu");
Single<String>?element?=?source.elementAtOrError(4);
element.subscribe(
????name?->?System.out.println("onSuccess?will?not?be?printed!"),
????error?->?System.out.println("onError:?"?+?error));
打印:onSuccess will not?be?printed!
elementAtOrError:指定元素的位置超過數據長度,則發射異常。
5、filter(過濾)
可作用于 Flowable,Observable,Maybe,Single。在filter中返回表示發射該元素,返回false表示過濾該數據。
Observable.just(1,?2,?3,?4,?5,?6)????????.filter(x?->?x?%?2?==?0)
????????.subscribe(System.out::print);
打印:2?4?6
6、first(第一個)
作用于 Flowable,Observable。發射數據源第一個數據,如果沒有則發送默認值。
Observable<String>?source?=?Observable.just("A",?"B",?"C");Single<String>?firstOrDefault?=?source.first("D");
firstOrDefault.subscribe(System.out::println);
打印:A
Observable<String>?emptySource?=?Observable.empty();
Single<String>?firstOrError?=?emptySource.firstOrError();
firstOrError.subscribe(
????????element?->?System.out.println("onSuccess?will?not?be?printed!"),
????????error?->?System.out.println("onError:?"?+?error));
打印:onError: java.util.NoSuchElementException
和firstElement的區別是first返回的是Single,而firstElement返回Maybe。firstOrError在沒有數據會返回異常。
7、last(最后一個)
last、lastElement、lastOrError與fist、firstElement、firstOrError相對應。
Observable<String>?source?=?Observable.just("A",?"B",?"C");Single<String>?lastOrDefault?=?source.last("D");
lastOrDefault.subscribe(System.out::println);
//打印:C
Observable<String>?source?=?Observable.just("A",?"B",?"C");
Maybe<String>?last?=?source.lastElement();
last.subscribe(System.out::println);
//打印:C
Observable<String>?emptySource?=?Observable.empty();
Single<String>?lastOrError?=?emptySource.lastOrError();
lastOrError.subscribe(
????????element?->?System.out.println("onSuccess?will?not?be?printed!"),
????????error?->?System.out.println("onError:?"?+?error));
//?打印:onError: java.util.NoSuchElementException
8、ignoreElements & ignoreElement(忽略元素)
ignoreElements 作用于Flowable、Observable。ignoreElement作用于Maybe、Single。兩者都是忽略掉數據,返回完成或者錯誤時間。
Single?source?=?Single.timer(1,?TimeUnit.SECONDS);Completable?completable?=?source.ignoreElement();
completable.doOnComplete(()?->?System.out.println("Done!"))
????????.blockingAwait();//?1秒后打印:Donde!
Observable?source?=?Observable.intervalRange(1,?5,?1,?1,?TimeUnit.SECONDS);
Completable?completable?=?source.ignoreElements();
completable.doOnComplete(()?->?System.out.println("Done!"))
????????.blockingAwait();//?五秒后打印:Done!
9、ofType(過濾掉類型)
作用于Flowable、Observable、Maybe、過濾掉類型。
Observable?numbers?=?Observable.just(1,?4.0,?3,?2.71,?2f,?7);Observable?integers?=?numbers.ofType(Integer.class);
integers.subscribe((Integer?x)?->?System.out.print(x+"?"));//打印:1?3?7
10、sample
作用于Flowable、Observable,在一個周期內發射最新的數據。
Observable?source?=?Observable.create(emitter?->?{????emitter.onNext("A");
????Thread.sleep(500);
????emitter.onNext("B");
????Thread.sleep(200);
????emitter.onNext("C");
????Thread.sleep(800);
????emitter.onNext("D");
????Thread.sleep(600);
????emitter.onNext("E");
????emitter.onComplete();
});
source.subscribeOn(Schedulers.io())
????????.sample(1,?TimeUnit.SECONDS)
????????.blockingSubscribe(
????????????????item?->?System.out.print(item+"?"),
????????????????Throwable::printStackTrace,
????????????????()?->?System.out.print("onComplete"));//?打印:C D onComplete
與debounce的區別是,sample是以時間為周期的發射,一秒又一秒內的最新數據。而debounce是最后一個有效數據開始。
11、throttleFirst & throttleLast & throttleWithTimeout
作用于Flowable、Observable。throttleLast與smaple一致,而throttleFirst是指定周期內第一個數據。throttleWithTimeout與debounce一致。
Observable?source?=?Observable.create(emitter?->?{????emitter.onNext("A");
????Thread.sleep(500);
????emitter.onNext("B");
????Thread.sleep(200);
????emitter.onNext("C");
????Thread.sleep(800);
????emitter.onNext("D");
????Thread.sleep(600);
????emitter.onNext("E");
????emitter.onComplete();
});
source.subscribeOn(Schedulers.io())
????????.throttleFirst(1,?TimeUnit.SECONDS)
????????.blockingSubscribe(
????????????????item?->?System.out.print(item+"?"),
????????????????Throwable::printStackTrace,
????????????????()?->?System.out.print("?onComplete"));//打印:A?D?onComplete
source.subscribeOn(Schedulers.io())
????????.throttleLast(1,?TimeUnit.SECONDS)
????????.blockingSubscribe(
????????????????item?->?System.out.print(item+"?"),
????????????????Throwable::printStackTrace,
????????????????()?->?System.out.print("?onComplete"));//?打印:C?D?onComplete
12、throttleLatest
之所以拿出來單獨說,我看不懂官網的解釋。然后看別人的文章:throttleFirst+throttleLast的組合?開玩笑的吧。個人理解是:如果源的第一個數據總會被發射,然后開始周期計時,此時的效果就會跟throttleLast一致。
Observable?source?=?Observable.create(emitter?->?{????????????emitter.onNext("A");
????????????Thread.sleep(500);
????????????emitter.onNext("B");
????????????Thread.sleep(200);
????????????emitter.onNext("C");
????????????Thread.sleep(200);
????????????emitter.onNext("D");
????????????Thread.sleep(400);
????????????emitter.onNext("E");
????????????Thread.sleep(400);
????????????emitter.onNext("F");
????????????Thread.sleep(400);
????????????emitter.onNext("G");
????????????Thread.sleep(2000);
????????????emitter.onComplete();
????????});
????????source.subscribeOn(Schedulers.io())
????????.throttleLatest(1,?TimeUnit.SECONDS)
????????.blockingSubscribe(
????????????item?->?Log.e("RxJava",item),
?????????????????Throwable::printStackTrace,
????????????()?->?Log.e("RxJava","finished"));
打印結果:
13、take & takeLast
作用于Flowable、Observable,take發射前n個元素;takeLast發射后n個元素。
Observable?source?=?Observable.just(1,?2,?3,?4,?5,?6,?7,?8,?9,?10);source.take(4)
????.subscribe(System.out::print);//打印:1?2?3?4
source.takeLast(4)
????.subscribe(System.out::println);//打印:7?8?9?10
14、timeout(超時)
作用于Flowable、Observable、Maybe、Single、Completabl。后一個數據發射未在前一個元素發射后規定時間內發射則返回超時異常。
Observable?source?=?Observable.create(emitter?->?{????emitter.onNext("A");
????Thread.sleep(800);
????emitter.onNext("B");
????Thread.sleep(400);
????emitter.onNext("C");
????Thread.sleep(1200);
????emitter.onNext("D");
????emitter.onComplete();
});
source.timeout(1,?TimeUnit.SECONDS)
????????.subscribe(
????????????????item?->?System.out.println("onNext:?"?+?item),
????????????????error?->?System.out.println("onError:?"?+?error),
????????????????()?->?System.out.println("onComplete?will?not?be?printed!"));//?打印://?onNext:?A//?onNext:?B//?onNext:?C//?onError:?java.util.concurrent.TimeoutException:?
????????????The?source?did?not?signal?an?event?for?1?seconds?and?has?been?terminated.4操作符:連接操作符
通過連接操作符,將多個被觀察數據(數據源)連接在一起。
1、startWith
可作用于Flowable、Observable。將指定數據源合并在另外數據源的開頭。
Observable?names?=?Observable.just("Spock",?"McCoy");Observable?otherNames?=?Observable.just("Git",?"Code","8");
names.startWith(otherNames).subscribe(item?->?Log.d(TAG,item));
//打印:RxJava:?GitRxJava:?CodeRxJava:?8RxJava:?SpockRxJava:?McCo
2、merge
可作用所有數據源類型,用于合并多個數據源到一個數據源。
Observable<String>?names?=?Observable.just("Hello",?"world");Observable<String>?otherNames?=?Observable.just("Git",?"Code","8");
Observable.merge(names,otherNames).subscribe(name?->?Log.d(TAG,name));
//也可以是
//names.mergeWith(otherNames).subscribe(name?->?Log.d(TAG,name));
//打印:
RxJava:?Hello
RxJava:?world
RxJava:?Git
RxJava:?Code
RxJava:?8
merge在合并數據源時,如果一個合并發生異常后會立即調用觀察者的onError方法,并停止合并。可通過mergeDelayError操作符,將發生的異常留到最后處理。
Observable<String>?names?=?Observable.just("Hello",?"world");?Observable<String>?otherNames?=?Observable.just("Git",?"Code","8");
Observable<String>?error?=?Observable.error(????
????????????????????????????new?NullPointerException("Error!"));
Observable.mergeDelayError(names,error,otherNames).subscribe(
????name?->?Log.d(TAG,name),?e->Log.d(TAG,e.getMessage()));
//打印:
RxJava:?Hello
RxJava:?world
RxJava:?Git
RxJava:?Code
RxJava:?8
RxJava:?Error!
3、zip
可作用于Flowable、Observable、Maybe、Single。將多個數據源的數據一個一個的合并在一起哇。當其中一個數據源發射完事件之后,若其他數據源還有數據未發射完畢,也會停止。
Observable?names?=?Observable.just("Hello",?"world");Observable?otherNames?=?Observable.just("Git",?"Code",?"8");
names.zipWith(otherNames,?(first,?last)?->?first?+?"-"?+?last)
???????.subscribe(item?->?Log.d(TAG,?item));//打印:
RxJava:?Hello-Git
RxJava:?world-Code
4、combineLatest
可作用于Flowable, Observable。在結合不同數據源時,發射速度快的數據源最新item與較慢的相結合。如下時間線,Observable-1發射速率快,發射了65,Observable-2才發射了C, 那么兩者結合就是C5。
5、switchOnNext
一個發射多個小數據源的數據源,這些小數據源發射數據的時間發生重復時,取最新的數據源。
5操作符:?變換操作符變化數據源的數據,并轉化為新的數據源。
1、buffer
作用于Flowable、Observable。指將數據源拆解含有長度為n的list的多個數據源,不夠n的成為一個數據源。
Observable.range(0,?10)????.buffer(4)
????.subscribe((List?buffer)?->?System.out.println(buffer));//?打印://?[0,?1,?2,?3]//?[4,?5,?6,?7]//?[8,?9]
2、cast
作用于Flowable、Observable、Maybe、Single。將數據元素轉型成其他類型,轉型失敗會拋出異常。
Observable?numbers?=?Observable.just(1,?4.0,?3f,?7,?12,?4.6,?5);numbers.filter((Number?x)?->?Integer.class.isInstance(x))
????.cast(Integer.class)
????.subscribe((Integer?x)?->?System.out.println(x));//?prints://?1//?7//?12//?5
3、concatMap
作用于Flowable、Observable、Maybe。將數據源的元素作用于指定函數后,將函數的返回值有序的存在新的數據源。
Observable.range(0,?5)????.concatMap(i?->?{
????????long?delay?=?Math.round(Math.random()?*?2);
????????return?Observable.timer(delay,?TimeUnit.SECONDS).map(n?->?i);
????})
????.blockingSubscribe(System.out::print);
//?prints?01234
4、concatMapDelayError
與concatMap作用相同,只是將過程發送的所有錯誤延遲到最后處理。
Observable.intervalRange(1,?3,?0,?1,?TimeUnit.SECONDS)????.concatMapDelayError(x?->?{
????????if?(x.equals(1L))?return?Observable.error(new?IOException("Something?went?wrong!"));
????????else?return?Observable.just(x,?x?*?x);
????})
????.blockingSubscribe(
????????x?->?System.out.println("onNext:?"?+?x),
????????error?->?System.out.println("onError:?"?+?error.getMessage()));
//?prints:
//?onNext:?2
//?onNext:?4
//?onNext:?3
//?onNext:?9
//?onError:?Something?went?wrong!
5、concatMapCompletable
作用于Flowable、Observable。與contactMap類似,不過應用于函數后,返回的是CompletableSource。訂閱一次并在所有CompletableSource對象完成時返回一個Completable對象。
Observable?source?=?Observable.just(2,?1,?3);Completable?completable?=?source.concatMapCompletable(x?->?{return?Completable.timer(x,?TimeUnit.SECONDS)
????????.doOnComplete(()?->?System.out.println("Info:?Processing?of?item?\""?+?x?+?"\"?completed"));
????});
completable.doOnComplete(()?->?System.out.println("Info:?Processing?of?all?items?completed"))
????.blockingAwait();//?prints://?Info:?Processing?of?item?"2"?completed//?Info:?Processing?of?item?"1"?completed//?Info:?Processing?of?item?"3"?completed//?Info:?Processing?of?all?items?completed
6、concatMapCompletableDelayError
與concatMapCompletable作用相同,只是將過程發送的所有錯誤延遲到最后處理。
Observable?source?=?Observable.just(2,?1,?3);Completable?completable?=?source.concatMapCompletableDelayError(x?->?{if?(x.equals(2))?{return?Completable.error(new?IOException("Processing?of?item?\""?+?x?+?"\"?failed!"));
????}?else?{return?Completable.timer(1,?TimeUnit.SECONDS)
????????????.doOnComplete(()?->?System.out.println("Info:?Processing?of?item?\""?+?x?+?"\"?completed"));
????}
});
completable.doOnError(error?->?System.out.println("Error:?"?+?error.getMessage()))
????.onErrorComplete()
????.blockingAwait();//?prints://?Info:?Processing?of?item?"1"?completed//?Info:?Processing?of?item?"3"?completed//?Error:?Processing?of?item?"2"?failed!
7、flatMap
作用于Flowable、Observable、Maybe、Single。與contactMap類似,只是contactMap的數據發射是有序的,而flatMap是無序的。
Observable.just("A",?"B",?"C")????.flatMap(a?->?{
????????return?Observable.intervalRange(1,?3,?0,?1,?TimeUnit.SECONDS)
????????????????.map(b?->?'('?+?a?+?",?"?+?b?+?')');
????})
????.blockingSubscribe(System.out::println);
//?prints?(not?necessarily?in?this?order):
//?(A,?1)
//?(C,?1)
//?(B,?1)
//?(A,?2)
//?(C,?2)
//?(B,?2)
//?(A,?3)
//?(C,?3)
//?(B,?3)
8、flattenAsFlowable & flattenAsObservable
作用于Maybe、Single,將其轉化為Flowable,或Observable。
Single<Double>?source?=?Single.just(2.0);Flowable<Double>?flowable?=?source.flattenAsFlowable(x?->?{
????return?List.of(x,?Math.pow(x,?2),?Math.pow(x,?3));
});
flowable.subscribe(x?->?System.out.println("onNext:?"?+?x));
//?prints:
//?onNext:?2.0
//?onNext:?4.0
//?onNext:?8.0
9、groupBy
作用于Flowable、Observable。根據一定的規則對數據源進行分組。
Observable?animals?=?Observable.just("Tiger",?"Elephant",?"Cat",?"Chameleon",?"Frog",?"Fish",?"Turtle",?"Flamingo");animals.groupBy(animal?->?animal.charAt(0),?String::toUpperCase)
????.concatMapSingle(Observable::toList)
????.subscribe(System.out::println);//?prints://?[TIGER,?TURTLE]//?[ELEPHANT]//?[CAT,?CHAMELEON]//?[FROG,?FISH,?FLAMINGO]
10、scan
作用于Flowable、Observable。對數據進行相關聯操作,例如聚合等。
Observable.just(5,?3,?8,?1,?7)????.scan(0,?(partialSum,?x)?->?partialSum?+?x)
????.subscribe(System.out::println);
//?prints:
//?0
//?5
//?8
//?16
//?17
//?24
11、window
對數據源發射出來的數據進行收集,按照指定的數量進行分組,以組的形式重新發射。
Observable.range(1,?4)????//?Create?windows?containing?at?most?2?items,?and?skip?3?items?before?starting?a?new?window.
????.window(2)
????.flatMapSingle(window?->?{
????????return?window.map(String::valueOf)
????????????????.reduce(new?StringJoiner(",?",?"[",?"]"),?StringJoiner::add);
????})
????.subscribe(System.out::println);
//?prints:
//?[1,?2]
//?[3,?4]
6操作符:?錯誤處理操作符
1、onErrorReturn
作用于Flowable、Observable、Maybe、Single。但調用數據源的onError函數后會回到該函數,可對錯誤進行處理,然后返回值,會調用觀察者onNext()繼續執行,執行完調用onComplete()函數結束所有事件的發射。
Single.just("2A")????.map(v?->?Integer.parseInt(v,?10))
????.onErrorReturn(error?->?{
????????if?(error?instanceof?NumberFormatException)?return?0;
????????else?throw?new?IllegalArgumentException();
????})
????.subscribe(
????????System.out::println,
????????error?->?System.err.println("onError?should?not?be?printed!"));
//?prints?0
2、onErrorReturnItem
與onErrorReturn類似,onErrorReturnItem不對錯誤進行處理,直接返回一個值。
Single.just("2A")????.map(v?->?Integer.parseInt(v,?10))
????.onErrorReturnItem(0)
????.subscribe(
????????System.out::println,
????????error?->?System.err.println("onError?should?not?be?printed!"));
//?prints?0
3、onExceptionResumeNext
可作用于Flowable、Observable、Maybe。onErrorReturn發生異常時,回調onComplete()函數后不再往下執行,而onExceptionResumeNext則是要在處理異常的時候返回一個數據源,然后繼續執行,如果返回null,則調用觀察者的onError()函數。
Observable.create((ObservableOnSubscribe)?e?->?{????????????e.onNext(1);
????????????e.onNext(2);
????????????e.onNext(3);
????????????e.onError(new?NullPointerException());
????????????e.onNext(4);
????????})
????????????????.onErrorResumeNext(throwable?->?{
????????????????????Log.d(TAG,?"onErrorResumeNext?");return?Observable.just(4);
????????????????})
????????????????.subscribe(new?Observer()?{@Overridepublic?void?onSubscribe(Disposable?d)?{
????????????????????????Log.d(TAG,?"onSubscribe?");
????????????????????}@Overridepublic?void?onNext(Integer?integer)?{
????????????????????????Log.d(TAG,?"onNext?"?+?integer);
????????????????????}@Overridepublic?void?onError(Throwable?e)?{
????????????????????????Log.d(TAG,?"onError?");
????????????????????}@Overridepublic?void?onComplete()?{
????????????????????????Log.d(TAG,?"onComplete?");
????????????????????}
????????????????});
結果:
onExceptionResumeNext操作符也是類似的,只是捕獲Exception。
4、retry
可作用于所有的數據源,當發生錯誤時,數據源重復發射item,直到沒有異常或者達到所指定的次數。
boolean?first=true;Observable.create((ObservableOnSubscribe)?e?->?{
????????????e.onNext(1);
????????????e.onNext(2);if?(first){
????????????????first=false;
????????????????e.onError(new?NullPointerException());
????????????}
????????})
????????????????.retry(9)
????????????????.subscribe(new?Observer()?{@Overridepublic?void?onSubscribe(Disposable?d)?{
????????????????????????Log.d(TAG,?"onSubscribe?");
????????????????????}@Overridepublic?void?onNext(Integer?integer)?{
????????????????????????Log.d(TAG,?"onNext?"?+?integer);
????????????????????}@Overridepublic?void?onError(Throwable?e)?{
????????????????????????Log.d(TAG,?"onError?");
????????????????????}@Overridepublic?void?onComplete()?{
????????????????????????Log.d(TAG,?"onComplete?");
????????????????????}
????????????????});
結果:
5、retryUntil
作用于Flowable、Observable、Maybe。與retry類似,但發生異常時,返回值是false表示繼續執行(重復發射數據),true不再執行,但會調用onError方法。
?Observable.create((ObservableOnSubscribe)?e?->?{????????????e.onNext(1);
????????????e.onNext(2);
????????????e.onError(new?NullPointerException());
????????????e.onNext(3);
????????????e.onComplete();
????????})
????????????????.retryUntil(()?->?true)
????????????????.subscribe(new?Observer()?{@Overridepublic?void?onSubscribe(Disposable?d)?{
????????????????????????Log.d(TAG,?"onSubscribe?");
????????????????????}@Overridepublic?void?onNext(Integer?integer)?{
????????????????????????Log.d(TAG,?"onNext?"?+?integer);
????????????????????}@Overridepublic?void?onError(Throwable?e)?{
????????????????????????Log.d(TAG,?"onError?");
????????????????????}@Overridepublic?void?onComplete()?{
????????????????????????Log.d(TAG,?"onComplete?");
????????????????????}
????????????????});
結果:
retryWhen與此類似,但其判斷標準不是BooleanSupplier對象的getAsBoolean()函數的返回值。而是返回的 Observable或Flowable是否會發射異常事件。
總結
太多操作符太累了,看得心好累。還是根據實際開發需要查閱文檔才是正確的姿勢。
本文只是RxJava冰山一角,更多請參閱官網。
參閱官網
https://github.com/ReactiveX/RxJava
好東西要分享
https://github.com/Android-XXM/XXM-BLOG/blob/master/README.md
如果你看到了這,點個贊,收下我雙膝。如果文章有誤,幫忙指正,謝謝大佬們。
最后推薦一下我做的網站,玩Android:?wanandroid.com?,包含詳盡的知識體系、好用的工具,還有本公眾號文章合集,歡迎體驗和收藏!
推薦閱讀:
直面底層:“吹上天”的協程,帶你深入源碼分析關于Android 抓包 與 反抓包直面底層:你真的了解 View.post() 原理嗎?掃一掃?關注我的公眾號
如果你想要跟大家分享你的文章,歡迎投稿~
┏(^0^)┛明天見!
總結
以上是生活随笔為你收集整理的retrofit content-length为0_大佬们,一波RxJava 3.0来袭,请做好准备~的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python读取word指定内容_pyt
- 下一篇: python画一个点_pygame学习笔