RxJava 2.x 教程
這可能是最好的 RxJava 2.x 入門教程系列專欄
文章鏈接:
這可能是最好的RxJava 2.x 入門教程(一)
這可能是最好的RxJava 2.x 入門教程(二)
這可能是最好的RxJava 2.x 入門教程(三)
這可能是最好的RxJava 2.x 入門教程(四)
這可能是最好的RxJava 2.x 入門教程(五)
GitHub 代碼同步更新:https://github.com/nanchen2251/RxJava2Examples
為了滿足大家的饑渴難耐,GitHub將同步更新代碼,主要包含基本的代碼封裝,RxJava 2.x所有操作符應用場景介紹和實際應用場景,后期除了RxJava可能還會增添其他東西,總之,GitHub上的Demo專為大家傾心打造。傳送門:https://github.com/nanchen2251/RxJava2Examples
為什么要學 RxJava?
提升開發效率,降低維護成本一直是開發團隊永恒不變的宗旨。近兩年來國內的技術圈子中越來越多的開始提及 RxJava ,越來越多的應用和面試中都會有 RxJava ,而就目前的情況,Android 的網絡庫基本被 Retrofit + OkHttp 一統天下了,而配合上響應式編程 RxJava 可謂如魚得水。想必大家肯定被近期的 Kotlin 炸開了鍋,筆者也在閑暇之時去了解了一番(作為一個與時俱進的有理想的青年怎么可能不與時俱進?),發現其中有個非常好的優點就是簡潔,支持函數式編程。是的, RxJava 最大的優點也是簡潔,但它不止是簡潔,而且是** 隨著程序邏輯變得越來越復雜,它依然能夠保持簡潔 **(這貨潔身自好呀有木有)。
咳咳,要例子,猛戳這里:給 Android 開發者的 RxJava 詳解
?
什么是響應式編程
上面我們提及了響應式編程,不少新司機對它可謂一臉懵逼,那什么是響應式編程呢?響應式編程是一種基于異步數據流概念的編程模式。數據流就像一條河:它可以被觀測,被過濾,被操作,或者為新的消費者與另外一條流合并為一條新的流。
響應式編程的一個關鍵概念是事件。事件可以被等待,可以觸發過程,也可以觸發其它事件。事件是唯一的以合適的方式將我們的現實世界映射到我們的軟件中:如果屋里太熱了我們就打開一扇窗戶。同樣的,當我們的天氣app從服務端獲取到新的天氣數據后,我們需要更新app上展示天氣信息的UI;汽車上的車道偏移系統探測到車輛偏移了正常路線就會提醒駕駛者糾正,就是是響應事件。
今天,響應式編程最通用的一個場景是UI:我們的移動App必須做出對網絡調用、用戶觸摸輸入和系統彈框的響應。在這個世界上,軟件之所以是事件驅動并響應的是因為現實生活也是如此。
為什么出了一個系列后還有完結版?
RxJava 這些年可謂越來越流行,而在去年的晚些時候發布了2.0正式版。大半年已過,雖然網上已經出現了大部分的 RxJava 教程(其實細心的你還是會發現 1.x 的超級多),前些日子,筆者花了大約兩周的閑暇之時寫了 RxJava 2.x 系列教程,也得到了不少反饋,其中就有不少讀者覺得每一篇的教程太短,抑或是希望更多的側重適用場景的介紹,在這樣的大前提下,這篇完結版教程就此誕生,僅供各位新司機采納。
?
開始
RxJava 2.x 已經按照 Reactive-Streams specification 規范完全的重寫了,maven也被放在了io.reactivex.rxjava2:rxjava:2.x.y 下,所以 RxJava 2.x 獨立于 RxJava 1.x 而存在,而隨后官方宣布的將在一段時間后終止對 RxJava 1.x 的維護,所以對于熟悉 RxJava 1.x 的老司機自然可以直接看一下 2.x 的文檔和異同就能輕松上手了,而對于不熟悉的年輕司機,不要慌,本醬帶你裝逼帶你飛,馬上就發車,坐穩了:https://github.com/nanchen2251/RxJava2Examples
你只需要在 build.gradle 中加上:compile 'io.reactivex.rxjava2:rxjava:2.1.1'(2.1.1為寫此文章時的最新版本)
?
接口變化
RxJava 2.x 擁有了新的特性,其依賴于4個基礎接口,它們分別是
- Publisher
- Subscriber
- Subscription
- Processor
其中最核心的莫過于 Publisher 和 Subscriber。Publisher 可以發出一系列的事件,而 Subscriber 負責和處理這些事件。
其中用的比較多的自然是 Publisher 的 Flowable,它支持背壓。關于背壓給個簡潔的定義就是:
背壓是指在異步場景中,被觀察者發送事件速度遠快于觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發送速度的策略。
簡而言之,背壓是流速控制的一種策略。有興趣的可以看一下官方對于背壓的講解。
可以明顯地發現,RxJava 2.x 最大的改動就是對于 backpressure 的處理,為此將原來的 Observable 拆分成了新的 Observable 和 Flowable,同時其他相關部分也同時進行了拆分,但令人慶幸的是,是它,是它,還是它,還是我們最熟悉和最喜歡的 RxJava。
觀察者模式
大家可能都知道, RxJava 以觀察者模式為骨架,在 2.0 中依舊如此。
不過此次更新中,出現了兩種觀察者模式:
- Observable ( 被觀察者 ) / Observer ( 觀察者 )
-
Flowable (被觀察者)/ Subscriber (觀察者)
?
在 RxJava 2.x 中,Observable 用于訂閱 Observer,不再支持背壓(1.x 中可以使用背壓策略),而 Flowable 用于訂閱 Subscriber , 是支持背壓(Backpressure)的。
Observable
在 RxJava 1.x 中,我們最熟悉的莫過于 Observable 這個類了,筆者在剛剛使用 RxJava 2.x 的時候,創建了 一個 Observable,瞬間一臉懵逼有木有,居然連我們最最熟悉的 Subscriber 都沒了,取而代之的是 ObservableEmmiter,俗稱發射器。此外,由于沒有了Subscriber的蹤影,我們創建觀察者時需使用 Observer。而 Observer 也不是我們熟悉的那個 Observer,又出現了一個 Disposable 參數帶你裝逼帶你飛。
廢話不多說,從會用開始,還記得 RxJava 的三部曲嗎?
?
?
** 第一步:初始化 Observable **
** 第二步:初始化 Observer **
** 第三步:建立訂閱關系 **
?
Observable.create(new ObservableOnSubscribe<Integer>() { // 第一步:初始化Observable@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {Log.e(TAG, "Observable emit 1" + "\n");e.onNext(1);Log.e(TAG, "Observable emit 2" + "\n");e.onNext(2);Log.e(TAG, "Observable emit 3" + "\n");e.onNext(3);e.onComplete();Log.e(TAG, "Observable emit 4" + "\n" );e.onNext(4);}}).subscribe(new Observer<Integer>() { // 第三步:訂閱// 第二步:初始化Observerprivate int i;private Disposable mDisposable;@Overridepublic void onSubscribe(@NonNull Disposable d) { mDisposable = d;}@Overridepublic void onNext(@NonNull Integer integer) {i++;if (i == 2) {// 在RxJava 2.x 中,新增的Disposable可以做到切斷的操作,讓Observer觀察者不再接收上游事件mDisposable.dispose();}}@Overridepublic void onError(@NonNull Throwable e) {Log.e(TAG, "onError : value : " + e.getMessage() + "\n" );}@Overridepublic void onComplete() {Log.e(TAG, "onComplete" + "\n" );}});不難看出,RxJava 2.x 與 1.x 還是存在著一些區別的。首先,創建 Observable 時,回調的是 ObservableEmitter ,字面意思即發射器,并且直接 throws Exception。其次,在創建的 Observer 中,也多了一個回調方法:onSubscribe,傳遞參數為Disposable,Disposable 相當于 RxJava 1.x 中的 Subscription, 用于解除訂閱。可以看到示例代碼中,在 i 自增到 2 的時候,訂閱關系被切斷。
07-03 14:24:11.663 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onSubscribe : false 07-03 14:24:11.664 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 1 07-03 14:24:11.665 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : value : 1 07-03 14:24:11.666 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 2 07-03 14:24:11.667 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : value : 2 07-03 14:24:11.668 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : isDisposable : true 07-03 14:24:11.669 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 3 07-03 14:24:11.670 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 4當然,我們的 RxJava 2.x 也為我們保留了簡化訂閱方法,我們可以根據需求,進行相應的簡化訂閱,只不過傳入對象改為了 Consumer。
Consumer 即消費者,用于接收單個值,BiConsumer 則是接收兩個值,Function 用于變換對象,Predicate 用于判斷。這些接口命名大多參照了 Java 8 ,熟悉 Java 8 新特性的應該都知道意思,這里也不再贅述。
線程調度
關于線程切換這點,RxJava 1.x 和 RxJava 2.x 的實現思路是一樣的。這里簡單的說一下,以便于我們的新司機入手。
subScribeOn
同 RxJava 1.x 一樣,subscribeOn 用于指定 subscribe() 時所發生的線程,從源碼角度可以看出,內部線程調度是通過 ObservableSubscribeOn來實現的。
@SchedulerSupport(SchedulerSupport.CUSTOM)public final Observable<T> subscribeOn(Scheduler scheduler) {ObjectHelper.requireNonNull(scheduler, "scheduler is null");return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));}ObservableSubscribeOn 的核心源碼在 subscribeActual 方法中,通過代理的方式使用 SubscribeOnObserver 包裝 Observer 后,設置 Disposable 來將 subscribe 切換到 Scheduler 線程中。
observeOn
observeOn 方法用于指定下游 Observer 回調發生的線程。
@SchedulerSupport(SchedulerSupport.CUSTOM)public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {ObjectHelper.requireNonNull(scheduler, "scheduler is null");ObjectHelper.verifyPositive(bufferSize, "bufferSize");return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));}線程切換需要注意的
RxJava 內置的線程調度器的確可以讓我們的線程切換得心應手,但其中也有些需要注意的地方。
- 簡單地說,subscribeOn() 指定的就是發射事件的線程,observerOn 指定的就是訂閱者接收事件的線程。
- 多次指定發射事件的線程只有第一次指定的有效,也就是說多次調用 subscribeOn() 只有第一次的有效,其余的會被忽略。
- 但多次指定訂閱者接收線程是可以的,也就是說每調用一次 observerOn(),下游的線程就會切換一次。
輸出:
07-03 14:54:01.177 15121-15438/com.nanchen.rxjava2examples E/RxThreadActivity: Observable thread is : RxNewThreadScheduler-1 07-03 14:54:01.178 15121-15121/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(mainThread),Current thread is main 07-03 14:54:01.179 15121-15439/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(io),Current thread is RxCachedThreadScheduler-2實例代碼中,分別用 Schedulers.newThread() 和 Schedulers.io() 對發射線程進行切換,并采用 observeOn(AndroidSchedulers.mainThread() 和 Schedulers.io() 進行了接收線程的切換。可以看到輸出中發射線程僅僅響應了第一個 newThread,但每調用一次 observeOn() ,線程便會切換一次,因此如果我們有類似的需求時,便知道如何處理了。
RxJava 中,已經內置了很多線程選項供我們選擇,例如有:
- Schedulers.io() 代表io操作的線程, 通常用于網絡,讀寫文件等io密集型的操作;
- Schedulers.computation() 代表CPU計算密集型的操作, 例如需要大量計算的操作;
- Schedulers.newThread() 代表一個常規的新線程;
- AndroidSchedulers.mainThread() 代表Android的主線程
這些內置的 Scheduler 已經足夠滿足我們開發的需求,因此我們應該使用內置的這些選項,而 RxJava 內部使用的是線程池來維護這些線程,所以效率也比較高。
操作符
關于操作符,在官方文檔中已經做了非常完善的講解,并且筆者前面的系列教程中也著重講解了絕大多數的操作符作用,這里受于篇幅限制,就不多做贅述,只挑選幾個進行實際情景的講解。
?
map
map 操作符可以將一個 Observable 對象通過某種關系轉換為另一個Observable 對象。在 2.x 中和 1.x 中作用幾乎一致,不同點在于:2.x 將 1.x 中的 Func1 和 Func2 改為了 Function 和 BiFunction。
采用 map 操作符進行網絡數據解析
想必大家都知道,很多時候我們在使用 RxJava 的時候總是和 Retrofit 進行結合使用,而為了方便演示,這里我們就暫且采用 OkHttp3 進行演示,配合 map,doOnNext ,線程切換進行簡單的網絡請求:
1)通過 Observable.create() 方法,調用 OkHttp 網絡請求;
2)通過 map 操作符集合 gson,將 Response 轉換為 bean 類;
3)通過 doOnNext() 方法,解析 bean 中的數據,并進行數據庫存儲等操作;
4)調度線程,在子線程中進行耗時操作任務,在主線程中更新 UI ;
5)通過 subscribe(),根據請求成功或者失敗來更新 UI 。
concat
?
concat 可以做到不交錯的發射兩個甚至多個 Observable 的發射事件,并且只有前一個 Observable 終止(onComplete) 后才會訂閱下一個 Observable。
?
采用 concat 操作符先讀取緩存再通過網絡請求獲取數據
想必在實際應用中,很多時候(對數據操作不敏感時)都需要我們先讀取緩存的數據,如果緩存沒有數據,再通過網絡請求獲取,隨后在主線程更新我們的UI。
concat 操作符簡直就是為我們這種需求量身定做。
利用 concat 的必須調用 onComplete 后才能訂閱下一個 Observable 的特性,我們就可以先讀取緩存數據,倘若獲取到的緩存數據不是我們想要的,再調用 onComplete() 以執行獲取網絡數據的 Observable,如果緩存數據能應我們所需,則直接調用 onNext(),防止過度的網絡請求,浪費用戶的流量。
Observable<FoodList> cache = Observable.create(new ObservableOnSubscribe<FoodList>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<FoodList> e) throws Exception {Log.e(TAG, "create當前線程:"+Thread.currentThread().getName() );FoodList data = CacheManager.getInstance().getFoodListData();// 在操作符 concat 中,只有調用 onComplete 之后才會執行下一個 Observableif (data != null){ // 如果緩存數據不為空,則直接讀取緩存數據,而不讀取網絡數據isFromNet = false;Log.e(TAG, "\nsubscribe: 讀取緩存數據:" );runOnUiThread(new Runnable() {@Overridepublic void run() {mRxOperatorsText.append("\nsubscribe: 讀取緩存數據:\n");}});e.onNext(data);}else {isFromNet = true;runOnUiThread(new Runnable() {@Overridepublic void run() {mRxOperatorsText.append("\nsubscribe: 讀取網絡數據:\n");}});Log.e(TAG, "\nsubscribe: 讀取網絡數據:" );e.onComplete();}}});Observable<FoodList> network = Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list").addQueryParameter("rows",10+"").build().getObjectObservable(FoodList.class);// 兩個 Observable 的泛型應當保持一致Observable.concat(cache,network).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<FoodList>() {@Overridepublic void accept(@NonNull FoodList tngouBeen) throws Exception {Log.e(TAG, "subscribe 成功:"+Thread.currentThread().getName() );if (isFromNet){mRxOperatorsText.append("accept : 網絡獲取數據設置緩存: \n");Log.e(TAG, "accept : 網絡獲取數據設置緩存: \n"+tngouBeen.toString() );CacheManager.getInstance().setFoodListData(tngouBeen);}mRxOperatorsText.append("accept: 讀取數據成功:" + tngouBeen.toString()+"\n");Log.e(TAG, "accept: 讀取數據成功:" + tngouBeen.toString());}}, new Consumer<Throwable>() {@Overridepublic void accept(@NonNull Throwable throwable) throws Exception {Log.e(TAG, "subscribe 失敗:"+Thread.currentThread().getName() );Log.e(TAG, "accept: 讀取數據失敗:"+throwable.getMessage() );mRxOperatorsText.append("accept: 讀取數據失敗:"+throwable.getMessage()+"\n");}});有時候我們的緩存可能還會分為 memory 和 disk ,實際上都差不多,無非是多寫點 Observable ,然后通過 concat 合并即可。
flatMap 實現多個網絡請求依次依賴
想必這種情況也在實際情況中比比皆是,例如用戶注冊成功后需要自動登錄,我們只需要先通過注冊接口注冊用戶信息,注冊成功后馬上調用登錄接口進行自動登錄即可。
我們的 flatMap 恰好解決了這種應用場景,flatMap 操作符可以將一個發射數據的 Observable 變換為多個 Observables ,然后將它們發射的數據合并后放到一個單獨的 Observable,利用這個特性,我們很輕松地達到了我們的需求。
Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list").addQueryParameter("rows", 1 + "").build().getObjectObservable(FoodList.class) // 發起獲取食品列表的請求,并解析到FootList.subscribeOn(Schedulers.io()) // 在io線程進行網絡請求.observeOn(AndroidSchedulers.mainThread()) // 在主線程處理獲取食品列表的請求結果.doOnNext(new Consumer<FoodList>() {@Overridepublic void accept(@NonNull FoodList foodList) throws Exception {// 先根據獲取食品列表的響應結果做一些操作Log.e(TAG, "accept: doOnNext :" + foodList.toString());mRxOperatorsText.append("accept: doOnNext :" + foodList.toString()+"\n");}}).observeOn(Schedulers.io()) // 回到 io 線程去處理獲取食品詳情的請求.flatMap(new Function<FoodList, ObservableSource<FoodDetail>>() {@Overridepublic ObservableSource<FoodDetail> apply(@NonNull FoodList foodList) throws Exception {if (foodList != null && foodList.getTngou() != null && foodList.getTngou().size() > 0) {return Rx2AndroidNetworking.post("http://www.tngou.net/api/food/show").addBodyParameter("id", foodList.getTngou().get(0).getId() + "").build().getObjectObservable(FoodDetail.class);}return null;}}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<FoodDetail>() {@Overridepublic void accept(@NonNull FoodDetail foodDetail) throws Exception {Log.e(TAG, "accept: success :" + foodDetail.toString());mRxOperatorsText.append("accept: success :" + foodDetail.toString()+"\n");}}, new Consumer<Throwable>() {@Overridepublic void accept(@NonNull Throwable throwable) throws Exception {Log.e(TAG, "accept: error :" + throwable.getMessage());mRxOperatorsText.append("accept: error :" + throwable.getMessage()+"\n");}});善用 zip 操作符,實現多個接口數據共同更新 UI
在實際應用中,我們極有可能會在一個頁面顯示的數據來源于多個接口,這時候我們的 zip 操作符為我們排憂解難。
zip 操作符可以將多個 Observable 的數據結合為一個數據源再發射出去。
Observable<MobileAddress> observable1 = Rx2AndroidNetworking.get("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512").build().getObjectObservable(MobileAddress.class);Observable<CategoryResult> observable2 = Network.getGankApi().getCategoryData("Android",1,1);Observable.zip(observable1, observable2, new BiFunction<MobileAddress, CategoryResult, String>() {@Overridepublic String apply(@NonNull MobileAddress mobileAddress, @NonNull CategoryResult categoryResult) throws Exception {return "合并后的數據為:手機歸屬地:"+mobileAddress.getResult().getMobilearea()+"人名:"+categoryResult.results.get(0).who;}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {@Overridepublic void accept(@NonNull String s) throws Exception {Log.e(TAG, "accept: 成功:" + s+"\n");}}, new Consumer<Throwable>() {@Overridepublic void accept(@NonNull Throwable throwable) throws Exception {Log.e(TAG, "accept: 失敗:" + throwable+"\n");}});采用 interval 操作符實現心跳間隔任務
想必即時通訊等需要輪訓的任務在如今的 APP 中已是很常見,而 RxJava 2.x 的 interval 操作符可謂完美地解決了我們的疑惑。
這里就簡單的意思一下輪訓。
private Disposable mDisposable;@Overrideprotected void doSomething() {mDisposable = Flowable.interval(1, TimeUnit.SECONDS).doOnNext(new Consumer<Long>() {@Overridepublic void accept(@NonNull Long aLong) throws Exception {Log.e(TAG, "accept: doOnNext : "+aLong );}}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Long>() {@Overridepublic void accept(@NonNull Long aLong) throws Exception {Log.e(TAG, "accept: 設置文本 :"+aLong );mRxOperatorsText.append("accept: 設置文本 :"+aLong +"\n");}});}/*** 銷毀時停止心跳*/@Overrideprotected void onDestroy() {super.onDestroy();if (mDisposable != null){mDisposable.dispose();}}RxJava 1.x 如何平滑升級到 RxJava 2.x?
由于 RxJava 2.x 變化較大無法直接升級,幸運的是,官方為我們提供了 RxJava2Interrop 這個庫,可以方便地把 RxJava 1.x 升級到 RxJava 2.x,或者將 RxJava 2.x 轉回到 RxJava 1.x。
寫在最后
本醬看你都看到這兒了,實為未來的棟梁之才,所以且送你一本經書:
https://github.com/nanchen2251/RxJava2Examples
?
GIF.gif
來源:https://www.jianshu.com/p/0cd258eecf60
總結
以上是生活随笔為你收集整理的RxJava 2.x 教程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 缓存穿透,瞬间并发,缓存雪崩的解决方法
- 下一篇: TCP的状态转换及生产问题实操