生活随笔
收集整理的這篇文章主要介紹了
Rx2.0后台开发分享
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
Rxjava2.x
微軟的一個函數庫,Rx是一個編程模型,模板是提供一致的編程接口,幫助開發者更方便的處理異步數據流,現在Rx已經支持幾乎全部的流行編程語言。比較流行的有Rxjava,RxJs,Rx.NET,社區網站:http://reactivex.io/ Rx使用觀察者模式 使用觀察者模式監聽:RX可以訂閱任何可觀察的數據流并且執行操作 組合:RX使用查詢式的操作符和變換數據流 創建:Rx可以方便的創建事件流和數據流 簡化代碼 函數式風格:對可觀察數據流使用無副作用的輸入輸出函數,避免程序中錯綜復雜的狀態 簡化代碼:Rx的操作符通常可以將復雜難題簡單化為很少的代碼 異步錯誤處理: 傳統的try/catch沒有辦法處理異步計算,Rx提供了合適的錯誤處理機制 輕松使用并發:Rx的Observables和Schedulers讓開發者可以擺脫底層線程同步和各種并發問題。 jar包maven倉庫地址
< dependency> < groupId> io.reactivex.rxjava2
</ groupId> < artifactId> rxjava
</ artifactId> < version> 2.2.6
</ version>
</ dependency>
一個簡單的例子:
public void myTestObservable ( ) { Observable . fromIterable ( Lists . newArrayList ( 1 , 2 , 3 , 4 , 5 ) ) . filter ( integer -> { return integer > 2 ; } ) . subscribe ( integer -> { System . out. println ( Thread . currentThread ( ) . getName ( ) + " : " + integer ) ; } ) ; }
在Rxjava中,一個實現了Observer接口的對象可以訂閱(subscribe)一個Observable類的實例。訂閱者(Subscriber)對Observable發射(emit)的任何數據或數據序列作出響應。這種模式簡化了并發的操作,因為他不需要阻塞等待Observable發射數據,而是創建一個處于待命狀態的觀察者哨兵,哨兵在未來某個時刻響應Observable的通知。
Observable
如上圖是Observable發射數據的一個流程圖, 時間線 左邊 ----右邊, 各個不同形狀標識Observable上的元素 最后垂直線表示Observable成功執行 向下虛線箭標識數據被發射出去 盒子表示各種操作符來對對應數據進行處理 第二條時間線也是一個Observable,不過是轉換之后的 當轉換時候出現錯誤將會并不會終止,他會用一個error事件替代這個位置 Subscribe
觀察者通過SubScribe操作關聯Observable Observer
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
onNext事件,被觀察者每發射一個數據都會調onNext事件,相當于異步任務中的回調接口,相當于Feature的get獲取,只不過onNext不是阻塞的就是一個哨兵模式,每次發射數據,獲取立即獲取對應結果,然后執行之后的邏輯 onCompleted事件,表示事件隊列完成,Rxjava不僅把每一個事件單獨處理,還會把他看做一個隊列,Rxjava規定,當不會再發射新的元素觸發onNext事件時候,需要觸發onCompleted事件作為結束標志。 onError事件,事件隊列處理中出現異常時候,onError會被觸發,可以在onError中統一處理異常情況 onSubScribe事件,表示元素開始發射,相當于所有元素執行之前的一個預處理位置。 Schedulers
默認情況下Observable和Observer執行過程是在同一個線程執行如上面最簡單例子,如果想要切換線程在不同線程執行可以用SubscribeOn(),observeOn()。 Rxjava提供了幾種線調度器
調度器類型效果 Schedulers.newThread(); 為每一個任務創建一個新線程 Schedulers.computation(); 用于計算任務,如時間循環和回調處理,不要用于IO操作,默認線程數等于處理器數量 Schedulers.io(); 用于IO密集型任務,如異步阻塞IO操作,這個調度器的線程池會根據需要增長;對應普通計算任務,一般用上面這個,Schedulers.io默認是一個CachedThreadScheduler,很像一個有線程緩存的新線程調度器。 Schedulers.single(); 擁有一個線程單例的線程池,所有任務都在這一個線程中執行,當線程中有任務執行時候,他的任務將會按照先進先出的順序依次執行。 Schedulers.trampoline(); Creates and returns a Scheduler that queues work on the current thread to be executed after the current work completes. 在當前線程立即執行任務,如果當前線程有任務在執行,則會將其暫停,等插入進來的任務執行完之后,再將未完成的任務接著執行。
Observable . create ( new ObservableOnSubscribe < Integer > ( ) { @Override public void subscribe ( ObservableEmitter < Integer > emitter
) throws Exception { for ( Integer integer
: intList
) { System . out
. println ( Thread . currentThread ( ) . getName ( ) + " : send" ) ; emitter
. onNext ( integer
) ; } emitter
. onComplete ( ) ; } } ) . subscribeOn ( Schedulers . computation ( ) ) . observeOn ( Schedulers . newThread ( ) ) . flatMap ( new Function < Integer , ObservableSource < ? > > ( ) { @Override public ObservableSource < ? > apply ( Integer integer
) throws Exception { return Observable . just ( integer
) . subscribeOn ( Schedulers . computation ( ) ) . filter ( new Predicate < Integer > ( ) { @Override public boolean test ( Integer integer
) throws Exception { System . out
. println ( Thread . currentThread ( ) . getName ( ) + ": filter one integer: " + integer
) ; return integer
> 2 ; } } ) ; } } ) . observeOn ( Schedulers . io ( ) ) . subscribe ( new Consumer < Object > ( ) { @Override public void accept ( Object o
) throws Exception { System . out
. println ( Thread . currentThread ( ) . getName ( ) + " : onSubscribe" ) ; } } ) ;
這個代碼看起來比較長,也可以這么寫:
Observable . create ( emitter
-> { intList
. forEach ( intTemp
-> emitter
. onNext ( intTemp
) ) ; emitter
. onComplete ( ) ; } ) . subscribeOn ( Schedulers . computation ( ) ) . observeOn ( Schedulers . computation ( ) ) . flatMap ( intStr
-> Observable . just ( intStr
) . subscribeOn ( Schedulers . computation ( ) ) . filter ( filterInt
-> Integer . valueOf ( filterInt
. toString ( ) ) > 2 ) ) . observeOn ( Schedulers . computation ( ) ) . subscribe ( intTemp
-> System . out
. println ( intTemp
) ) ;
第一個subscribeOn指定被觀察對象發射的線程,使用的computation模型 第一個observeOn指定之后的flatMap操作符切換到另外線程中執行 最后的observeOn指定觀察者哨兵消費數據的線程,會有如下結果
Observable的這種異步切換線程方式從整體流程來看還是同步的方式,他必須先Observable發射數據-----操作符change-----消費數據并不是每次發射一個數據的同時進行change接著消費的并行實現,因此Rxjava提供了另外一個并行的方式,如下案例
public static void flowableDemo ( ) throws InterruptedException { Flowable . create ( new FlowableOnSubscribe < Integer > ( ) { @Override public void subscribe ( FlowableEmitter < Integer > emitter
) throws Exception { for ( int i
= 1 ; i
< 100 ; i
++ ) { System . out
. println ( Thread . currentThread ( ) . getName ( ) + " 發射數據" ) ; emitter
. onNext ( i
) ; } emitter
. onComplete ( ) ; } } , BackpressureStrategy . ERROR
) . subscribeOn ( Schedulers . io ( ) ) . observeOn ( Schedulers . newThread ( ) ) . filter ( new Predicate < Integer > ( ) { @Override public boolean test ( Integer integer
) throws Exception { System . out
. println ( Thread . currentThread ( ) . getName ( ) + " 過濾發射數據" ) ; return integer
> 0 ; } } ) . observeOn ( Schedulers . newThread ( ) ) . subscribe ( new Subscriber < Object > ( ) { public void onSubscribe ( Subscription subscription
) { System . out
. println ( "取出n條數據" ) ; subscription
. request ( 3 ) ; } public void onNext ( Object o
) { try { Thread . sleep ( 2000 ) ; } catch ( InterruptedException e
) { e
. printStackTrace ( ) ; } System . out
. println ( Thread . currentThread ( ) . getName ( ) ) ; System . out
. println ( "消費數據:" + o
) ; } public void onError ( Throwable throwable
) { System . out
. println ( throwable
. getMessage ( ) ) ; } public void onComplete ( ) { System . out
. println ( "onComplete" ) ; } } ) ; }
用Flowable不僅僅是對每個模塊進行了線程的切換,在同時也是并行的執行了整個流程 我覺得在異步編程方面Rxjava的確比原始的Thread ,Runnable這類操作要方便的多,通過幾個操作符就可以達到異步的目的,這也是Rxjava的一個優勢,而在我們工作中我們一般都是用架構組的異步框架也可以做到很優雅的異步編程,比起Rxjava而言只是會多創建一個異步類而已,那么我們來對比一下兩種異步操作,我用之前的批量關注接口做測試 原來的異步方式:
public void batchFollowPush ( UserInfo userInfo
, List < UserInfo > objectUserInfos
, Integer source
) { List < Boolean > batchFollowResult
= new ArrayList < > ( ) ; Future < Boolean > pushFuture
= null ; for ( UserInfo objectUserInfo
: objectUserInfos
) { try { pushFuture
= ( Future < Boolean > ) pushAsync
. asyncFollowAndStoreMoment ( userInfo
, objectUserInfo
, source
) ; batchFollowResult
. add ( pushFuture
. get ( ) ) ; } catch ( Exception e
) { if ( pushFuture
!= null ) { pushFuture
. cancel ( true ) ; } throw new RuntimeException ( e
) ; } } log
. info ( "batchFollow result:{}" , JSON
. toJSONString ( batchFollowResult
) ) ; } public Object asyncFollowAndStoreMoment ( UserInfo userInfo
, UserInfo objectUserInfo
, Integer source
) { Optional < InteractError > interactErrorOptional
= personFacade
. interactRestrict ( userInfo
, objectUserInfo
) ; if ( interactErrorOptional
. isPresent ( ) ) { return Boolean . FALSE
; } int result
= wooerFacade
. addOrUpdatePush2 ( userInfo
. getMemberID ( ) , objectUserInfo
. getMemberID ( ) , 1 , source
, HeadUaUtils . getPlatform ( ) ) ; Boolean isTrue
= result
> 0 ; if ( isTrue
== null || ! isTrue
) { return Boolean . FALSE
; } limitedPushFacade
. followPush ( userInfo
, objectUserInfo
) ; MomentListStoreParam momentListStoreParam
= new MomentListStoreParam ( Arrays . asList ( Long . valueOf ( objectUserInfo
. getMemberID ( ) ) ) , userInfo
. getMemberID ( ) . longValue ( ) ) ; log
. info ( "batchFollow afterFilter param:{}" , JSON
. toJSONString ( momentListStoreParam
) ) ; momentManagerFacade
. storeMomentMongoByMomentId ( momentListStoreParam
) ; return Boolean . TRUE
; }
public IResponse batchFollowRx ( BatchFollowForm batchFollowForm
, UserInfo myUserInfo
) { log
. info ( "batchFollow param:{}" , JSON
. toJSONString ( batchFollowForm
) ) ; BusinessAssert . isNotNull ( batchFollowForm
, CommonError . ARGS_EMPTY
) ; List < Long > objectIDs
= batchFollowForm
. getObjectIDs ( ) ; List < UserInfo > objectUserInfo
= coreUserService
. getList ( objectIDs
, UserInfo . class ) ; Flowable . create ( new FlowableOnSubscribe < UserInfo > ( ) { @Override public void subscribe ( FlowableEmitter < UserInfo > emitter
) throws Exception { for ( UserInfo info
: objectUserInfo
) { emitter
. onNext ( info
) ; } } } , BackpressureStrategy . ERROR
) . parallel ( ) . runOn ( Schedulers . io ( ) ) . filter ( userInfo
-> { Optional < InteractError > interactErrorOptional
= personFacade
. interactRestrict ( myUserInfo
, userInfo
) ; if ( interactErrorOptional
. isPresent ( ) ) { return Boolean . FALSE
; } Boolean isTrue
= wooerFacade
. addOrUpdatePush2 ( myUserInfo
. getMemberID ( ) , userInfo
. getMemberID ( ) , 1 , batchFollowForm
. getSource ( ) , HeadUaUtils . getPlatform ( ) ) > 0 ; if ( isTrue
== null || ! isTrue
) { return Boolean . FALSE
; } return Boolean . TRUE
; } ) . runOn ( Schedulers . computation ( ) ) . sequential ( ) . subscribe ( new Consumer < UserInfo > ( ) { @Override public void accept ( UserInfo userInfo
) throws Exception { limitedPushFacade
. followPush ( myUserInfo
, userInfo
) ; MomentListStoreParam momentListStoreParam
= new MomentListStoreParam ( Arrays . asList ( Long . valueOf ( userInfo
. getMemberID ( ) ) ) , userInfo
. getMemberID ( ) . longValue ( ) ) ; log
. info ( "batchFollow afterFilter param:{}" , JSON
. toJSONString ( momentListStoreParam
) ) ; momentManagerFacade
. storeMomentMongoByMomentId ( momentListStoreParam
) ; } } , new Consumer < Throwable > ( ) { @Override public void accept ( Throwable throwable
) throws Exception { log
. error ( "batch follow error exception:{}" , throwable
. getMessage ( ) ) ; } } ) ; return MessageResponse . success ( FOLLOW_SUCCESS
) ; }
public static void oomDemo ( ) { Observable . create ( new ObservableOnSubscribe < Integer > ( ) { @Override public void subscribe ( ObservableEmitter < Integer > emitter
) throws Exception { for ( int i
= 0 ; ; i
++ ) { System . out
. println ( Thread . currentThread ( ) . getName ( ) + " onNext : " + i
) ; emitter
. onNext ( i
) ; } } } ) . subscribeOn ( Schedulers . io ( ) ) . observeOn ( Schedulers . single ( ) ) . subscribe ( new Consumer < Integer > ( ) { @Override public void accept ( Integer integer
) throws Exception { Thread . sleep ( 2000 ) ; System . out
. println ( Thread . currentThread ( ) . getName ( ) + " consumer : " + integer
) ; } } ) ; }
讓發射數據在多個線程中執行,讓消費數據在一個線程中執行并且每兩秒才消費一個,這樣會導致發射的數據不斷的累積在內存中,最終可能會導致oom,我們通過內存信息來看他執行之后一段時間的堆內存信息
PSYoiungGen 年輕態區,總共1024k,使用了511k eden區域是新對象區,已經被沾滿 from和to區域 大學是一樣,在gc時候會遍歷from或者to區域,將不能清除的拷貝到另外一個區,然后清除本區域留下的,然后循環 paroldGen 老年代區域也已經被占滿 這種狀態下Observable因內存不夠已經oom,停止運行了,只有消費線程在消費數據。
io. reactivex. exceptions. UndeliverableException: The exception could not be delivered
to the consumer because it has already canceled
/ disposed the flow or the exception has nowhere
to go to begin with. Further reading
: https
: / / github
. com
/ ReactiveX / RxJava / wiki
/ What 's
- different
- in
- 2.0 #error
- handling
| java. lang. OutOfMemoryError: GC overhead limit exceededat
io. reactivex. plugins. RxJavaPlugins. onError ( RxJavaPlugins . java
: 367 ) at
io. reactivex. internal. schedulers. ScheduledRunnable. run ( ScheduledRunnable . java
: 69 ) at
io. reactivex. internal. schedulers. ScheduledRunnable. call ( ScheduledRunnable . java
: 57 ) at
java. util. concurrent. FutureTask. run ( FutureTask . java
: 266 ) at
java. util. concurrent. ScheduledThreadPoolExecutor$
ScheduledFutureTask . access$
201 ( ScheduledThreadPoolExecutor . java
: 180 ) at
java. util. concurrent. ScheduledThreadPoolExecutor$
ScheduledFutureTask . run ( ScheduledThreadPoolExecutor . java
: 293 )
我們用一個Flowable的例子來看他如何解決這個oom的問題:
public static void oomDemoFix ( ) { Flowable . create ( new FlowableOnSubscribe < Integer > ( ) { @Override public void subscribe ( FlowableEmitter < Integer > emitter
) throws Exception { for ( int i
= 0 ; ; i
++ ) { System . out
. println ( Thread . currentThread ( ) . getName ( ) + " onNext : " + i
) ; emitter
. onNext ( i
) ; } } } , BackpressureStrategy . ERROR
) . subscribeOn ( Schedulers . io ( ) ) . observeOn ( Schedulers . io ( ) ) . subscribe ( new Subscriber < Integer > ( ) { @Override public void onSubscribe ( Subscription subscription
) { subscription
. request ( 50 ) ; } @Override public void onNext ( Integer integer
) { System . out
. println ( Thread . currentThread ( ) . getName ( ) + "消費數據: " + integer
) ; } @Override public void onError ( Throwable throwable
) { } @Override public void onComplete ( ) { } } ) ; }
我們在創建Flowable的時候增加了一個參數 BackpressureStrategy.ERROR,這個參數指定了在處理背壓問題時候執行的一個策略,當內存滿時候接下來繼續發射的數據將會拋出MissingBackpressureException 異常,其余的策略稍等介紹 還有另外一個不同 onSubscribe中傳遞的不是Disposable變成了Subscription,而且還執行了這句代碼subscription.request(50)。因為在Flowable中采用了一個新的思路,響應獲取發射數據的方法來解決流量不均勻而造成的oom的問題,也就是我要消費多少我就取多少,這里我們從發射數據中取出了50條。其他的還是會存儲在內存中。
Flowable中主要有這幾個策略
BackpressureStrategy.ERROR:如果緩存池(默認128)溢出會立刻拋異常MissingBackpressureexception BackpressureStrategy.BUFFER:RxJava中有一個默認能夠存儲128個事件的緩存池,可以調節大小,生產者生產事件,并且將處理不了的事件緩存。(謹慎使用,因為依賴對消費者消費能力,耗內存) BackpressureStrategy.DROP:消費不了就丟棄,比如先生產200個,并沒有消費,而是在緩存,然后消費者request(200),但是緩存只有128個,所以其他的事件都丟棄了。 BackpressureStrategy.LATEST:和DROP功能基本一致,處理不了丟棄,區別在于LATEST能夠讓消費者接收到生產者產生的最后一個事件。 BackpressureStrategy.MISSING:生產的事件沒有進行緩存和丟棄,下游接收到的事件必須進行消費或者處理!
感覺這些都是緩兵之計,能否按照我的消費能力來發射數據呢,這樣才完美。
Rxjava2.x后有一個FlowableEmitter 這個接口:
public static void fix ( Flowable flowable
) { flowable
. observeOn ( Schedulers . computation ( ) ) . subscribe ( new Subscriber < Integer > ( ) { @Override public void onSubscribe ( Subscription subscription
) { subscription
. request ( 20 ) ; } @Override public void onNext ( Integer integer
) { System . out
. println ( "消費數據: " + 100 ) ; } @Override public void onError ( Throwable throwable
) { } @Override public void onComplete ( ) { } } ) ; } public static Flowable flowableEmitterDemo ( ) { Flowable flowable
= Flowable . create ( new FlowableOnSubscribe < Integer > ( ) { @Override public void subscribe ( FlowableEmitter < Integer > emitter
) throws Exception { while ( emitter
. requested ( ) > 0 ) { System . out
. println ( "下游處理能力:" + emitter
. requested ( ) ) ; emitter
. onNext ( 100 ) ; } } } , BackpressureStrategy . ERROR
) . subscribeOn ( Schedulers . io ( ) ) ; return flowable
; }
我可以在Flowable發射數據之前通過requested來獲取下游Subscriber的消費能力,依據這個來進行數據的發射,這樣既可以控制發射以及消費數據的速度,也能夠避免數據的丟失
現在我們看下開始時候從官網摘抄的Rx的幾個優點:
首先函數式風格,這種編程模式和常規的方式比較的確簡化了不少代碼比如第一個案例,但是感覺我們用stream表達式加上lambda也可以達到這種效果,而且對于map,flatmap,filter等這些操作符對于沒有函數式編程的人來說并不好理解不覺得這是優勢
簡化代碼,這點主要體現在異步編程模式時候,不管和我們java中的異步編程用的Thread和Runnable相比,還是我們框架中的異步編程框架比較的確代碼都更加簡單,只需要通過幾個異步線程切換的操作符便可以達到目的,但是缺點也很明顯,需要引入新的jar,新的技術對不熟悉這塊技術的同事并不友好有一定學習成本不利于維護。
異步錯誤處理,輕松使用并發:通過onError捕獲異常信息,通過操作法切換線程,的確也是優勢所在。
在之前的實踐中還有這種業務模型下使用Rxjava會更具優勢,當我們需要從多個網絡環境來獲取各自信息從中篩選出符合我們預期的并對其進行組合,我們可以通過Rxjava的豐富的操作符以及異步操作來完成。來一個簡單的例子
public static Flowable getIOData1 ( ) { return Flowable . create ( new FlowableOnSubscribe < Integer > ( ) { @Override public void subscribe ( @NonNull FlowableEmitter < Integer > flowableEmitter
) throws Exception { for ( int i
= 0 ; i
< 10 ; i
++ ) { flowableEmitter
. onNext ( i
) ; } System . out
. println ( Thread . currentThread ( ) . getName ( ) ) ; } } , BackpressureStrategy . DROP
) . subscribeOn ( Schedulers . io ( ) ) . filter ( temp
-> temp
> 2 ) ; } public static Flowable getIOData2 ( ) { return Flowable . create ( new FlowableOnSubscribe < Integer > ( ) { @Override public void subscribe ( @NonNull FlowableEmitter < Integer > flowableEmitter
) throws Exception { for ( int i
= 10 ; i
< 21 ; i
++ ) { flowableEmitter
. onNext ( i
) ; } System . out
. println ( Thread . currentThread ( ) . getName ( ) ) ; } } , BackpressureStrategy . DROP
) . subscribeOn ( Schedulers . io ( ) ) . filter ( temp
-> temp
> 12 ) ; } public static Flowable getIOData3 ( ) { return Flowable . create ( new FlowableOnSubscribe < Integer > ( ) { @Override public void subscribe ( @NonNull FlowableEmitter < Integer > flowableEmitter
) throws Exception { for ( int i
= 20 ; i
< 30 ; i
++ ) { flowableEmitter
. onNext ( i
) ; } System . out
. println ( Thread . currentThread ( ) . getName ( ) ) ; } } , BackpressureStrategy . DROP
) . subscribeOn ( Schedulers . io ( ) ) . filter ( temp
-> temp
> 22 ) ; } public static Flowable getIOData4 ( ) { return Flowable . create ( new FlowableOnSubscribe < Integer > ( ) { @Override public void subscribe ( @NonNull FlowableEmitter < Integer > flowableEmitter
) throws Exception { for ( int i
= 30 ; i
< 41 ; i
++ ) { flowableEmitter
. onNext ( i
) ; } System . out
. println ( Thread . currentThread ( ) . getName ( ) ) ; } } , BackpressureStrategy . DROP
) . subscribeOn ( Schedulers . io ( ) ) . filter ( temp
-> temp
> 32 ) ; } public static void mergeDemo ( ) { Flowable . merge ( getIOData1 ( ) , getIOData2 ( ) , getIOData3 ( ) , getIOData4 ( ) ) . map ( temp
-> "onNext" + temp
) . flatMap ( new Function ( ) { @Override public Object apply ( @NonNull Object o
) throws Exception { return Flowable . just ( o
) . subscribeOn ( Schedulers . io ( ) ) ; } } ) . subscribeOn ( Schedulers . newThread ( ) ) . observeOn ( Schedulers . computation ( ) ) . subscribe ( new Subscriber ( ) { @Override public void onSubscribe ( Subscription subscription
) { subscription
. request ( Long . MAX_VALUE
) ; } @Override public void onNext ( Object o
) { System . out
. println ( "onNext: " + o
) ; } @Override public void onError ( Throwable throwable
) { } @Override public void onComplete ( ) { } } ) ; }
我們定義N個Flowable用異步方式分別請求各個第三方接口來獲取對應的數據并且用filter過濾出我們需要的信息,然后通過merge操作法將所有獲取到的數據組合到同一個Flowable中,進行統一的封裝處理以及之后的一些業務操作。 如果我們用傳統的方式,我們不得不定義N個變量來獲取四個異步線程數據,然后等都獲取完畢之后,在分別對四個變量中保存的信息進行篩選,之后通過邏輯操作合并到一起,和rxjava相比顯然要遜色很多。 這種方式就是Flowable通過內置操作符對自身發射的數據在空間維度上重新組織,或者和其他Flowable發射的數據一起在空間維度上進行重新組織,是的觀察者的邏輯變得更加的簡單直觀因為直接看操作符就能知道具體做了什么,不需要關心數據從哪里來這部分由Flowable屏蔽了,從而使得觀察者更加專注于下游邏輯。
RxJava的響應式優勢只有在異步邏輯占主導時才會體現出來.
wiki地址:https://github.com/ReactiveX/RxJava/wiki reactivex官網:http://reactivex.io/
創作挑戰賽 新人創作獎勵來咯,堅持創作打卡瓜分現金大獎
總結
以上是生活随笔 為你收集整理的Rx2.0后台开发分享 的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。