渴望订阅– RxJava常见问题解答
在教學和指導RxJava以及撰寫本書之后 ,我注意到某些領(lǐng)域尤其成問題。 我決定發(fā)布一些簡短的提示,以解決最常見的陷阱。 這是第一部分。
Observable和Flowable本質(zhì)上是惰性的。 這意味著無論您在Flowable放置了多么繁瑣或長時間運行的邏輯,僅當有人訂閱時,它才會被評估。 并且還有某人訂閱的次數(shù)。 下面的代碼段對此進行了說明:
這樣的Observable或Flowable將不可避免地打印:
19:37:57.368 [main] - Created 19:37:57.379 [main] - Running 19:37:58.383 [main] - Running 19:37:59.388 [main] - Done請注意,您需要兩次支付sleep()的價格sleep()兩次訂閱)。 此外,所有邏輯都在客戶端( main )線程中運行,除非通過subscriptionOn subscribeOn()請求或異步流隱式可用,否則RxJava中沒有隱式線程。 問題是:我們是否可以熱切地強制運行訂閱邏輯,以便每當有人訂閱該流時,就已經(jīng)對其進行了預(yù)先計算或至少開始了計算?
完全渴望評估
最明顯但有缺陷的解決方案是急于計算流返回的任何內(nèi)容,并簡單地將其包裝為固定的Flowable :
Flowable<String> eager() {final String slow = slow();return Flowable.just(slow); }不幸的是,這大大破壞了RxJava的目的。 首先,像subscribeOn()這樣的運算符將不再起作用,并且無法將計算卸載到其他線程。 更糟糕的是,即使eager()返回了Flowable但按照定義,它將始終阻止客戶端線程。 很難推理,組合和管理此類流。 通常,即使需要進行急切的評估,也應(yīng)避免使用這種模式,而應(yīng)選擇延遲加載。
使用
下一個示例僅使用cache()運算符:
Flowable<String> eager3() throws InterruptedException {final Flowable<String> cached =Flowable.fromCallable(this::slow).cache();cached.subscribe();return cached; }這個想法很簡單:用惰性Flowable包裝計算并緩存它。 cache()運算符所做的是,它會記住第一次訂閱時發(fā)出的所有事件,以便在出現(xiàn)第二個Subscriber ,它將接收相同的事件緩存序列。 但是cache()運算符(像大多數(shù)其他運算符一樣)是惰性的,因此我們必須第一次強制訂閱。 調(diào)用subscribe()將預(yù)填充緩存,此外,如果第二個訂戶出現(xiàn)在slow()計算完成之前,它將同樣等待它,而不是第二次啟動它。
此解決方案有效,但請記住,由于未涉及Scheduler , subscribe()實際上將被阻止。 如果要在后臺預(yù)填充Flowable ,請嘗試subscribeOn() :
Flowable<String> eager3() throws InterruptedException {final Flowable<String> cached =Flowable.fromCallable(this::slow).subscribeOn(justDontAlwaysUse_Schedulers.io()).cache();cached.subscribe();return cached; }是的,在生產(chǎn)系統(tǒng)上使用Schedulers.io()存在問題且難以維護,因此請避免使用自定義線程池。
錯誤處理
令人遺憾的是,吞噬RxJava中的異常非常容易。 如果slow()方法失敗,這就是我們上一個示例中可能發(fā)生的情況。 異常不會完全被吞沒,但是默認情況下,如果沒有人對此感興趣,它將在System.err上打印堆棧跟蹤。 同樣,未處理的異常也包裝在OnErrorNotImplementedException 。 如果您執(zhí)行任何形式的集中式日志記錄,則不太方便,很可能會丟失。 您可以使用doOnError()操作進行日志記錄,但它仍然通過例外下游RxJava認為未處理的為好,一次包裝與OnErrorNotImplementedException 。 因此,讓我們在subscribe()實現(xiàn)onError回調(diào):
Flowable<String> eager3() throws InterruptedException {final Flowable<String> cached =Flowable.fromCallable(this::slow).cache();cached.subscribe(x -> {/* ignore */},e -> logger.error("Prepopulation error", e));return cached; }我們不想處理實際事件,而只是處理subscribe()錯誤。 此時,您可以安全地返回此類Flowable 。 急切且有希望的是,只要您訂閱了它,數(shù)據(jù)就已經(jīng)可用。 注意,例如,Hystrix的observe()方法也很急切,而懶惰的toObservable()相反。 這是你的選擇。
翻譯自: https://www.javacodegeeks.com/2017/08/eager-subscription-rxjava-faq.html
總結(jié)
以上是生活随笔為你收集整理的渴望订阅– RxJava常见问题解答的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: stackexchange_通过Spri
- 下一篇: HTPC知识普及第一讲2