并发查询parallel_惯用并发:flatMap()与parallel()– RxJava常见问题解答
并發(fā)查詢parallel
簡單,有效和安全的并發(fā)是RxJava的設計原則之一。 然而,具有諷刺意味的是,它可能是該庫中最容易被誤解的方面之一。 讓我們舉一個簡單的例子:假設我們有一堆UUID并且對于每個UUID ,我們必須執(zhí)行一組任務。 第一個問題是每個UUID都要執(zhí)行I / O密集型操作,例如,從數據庫加載對象:
Flowable<UUID> ids = Flowable.fromCallable(UUID::randomUUID).repeat().take(100);ids.subscribe(id -> slowLoadBy(id));首先,為了測試,我將生成100個隨機UUID。 然后,對于每個UUID,我想使用以下方法加載記錄:
Person slowLoadBy(UUID id) {//... }slowLoadBy()的實現(xiàn)是無關緊要的,請記住它是緩慢且阻塞的。 使用subscribe()調用slowLoadBy()有許多缺點:
- subscribe()根據設計是單線程的,無法解決。 每個UUID順序加載
- 當您調用subscribe() ,無法進一步轉換Person對象。 這是終端操作
一種更健壯,甚至更殘破的方法是map()每個UUID :
Flowable<Person> people = ids.map(id -> slowLoadBy(id)); //BROKEN這是非常可讀的,但不幸的是損壞了。 就像訂閱者一樣,運算符也是單線程的。 這意味著在任何給定時間只能映射一個UUID ,此處也不允許并發(fā)。 更糟糕的是,我們從上游繼承線程/工作者。 這有幾個缺點。 如果上游使用某些專用的調度程序產生事件,我們將劫持該調度程序中的線程。 例如,許多操作符(例如interval() Schedulers.computation()透明地使用Schedulers.computation()線程池。 我們突然開始在完全不適合該功能的池上執(zhí)行I / O密集型操作。 此外,我們通過這一阻塞性順序步驟降低了整個管道的速度。 非常非常糟糕。
您可能已經聽說過這個subscribeOn()運算符,以及它如何啟用并發(fā)。 確實,但是在應用它時必須非常小心。 以下示例(再次)是錯誤的 :
import io.reactivex.schedulers.Schedulers;Flowable<Person> people = ids.subscribeOn(Schedulers.io()).map(id -> slowLoadBy(id)); //BROKEN上面的代碼段仍然損壞。 observeOn() subscribeOn() (以及該事件的observeOn() )幾乎不會將執(zhí)行切換到其他工作程序(線程),而不會引入任何并發(fā)性。 流仍然按順序處理所有事件,但是在不同的線程上。 換句話說,我們現(xiàn)在不是在從上游繼承的線程上順序使用事件,而是在io()線程上順序使用事件。 那么,這個神話般的flatMap()運算符呢?
flatMap()運算符可以進行救援
flatMap()運算符通過將事件流分成子流來啟用并發(fā)。 但首先,還有一個破碎的示例:
Flowable<Person> asyncLoadBy(UUID id) {return Flowable.fromCallable(() -> slowLoadBy(id)); }Flowable<Person> people = ids.subscribeOn(Schedulers.io()).flatMap(id -> asyncLoadBy(id)); //BROKEN哦,天哪,這還是壞了 ! flatMap()運算符在邏輯上做兩件事:
- 在每個上游事件上應用轉換( id -> asyncLoadBy(id) )–這將產生Flowable<Flowable<Person>> 。 這是有道理的,對于每個上游UUID我們都有一個Flowable<Person>因此最終得到的是Person對象流
- 然后flatMap()嘗試一次訂閱所有這些內部子流。 每當任何子流發(fā)出Person事件時,它都會作為外部Flowable的結果透明傳遞。
從技術上講, flatMap()僅創(chuàng)建并預訂前128個(默認情況下,可選的maxConcurrency參數)子流。 同樣,當最后一個子流完成時, Person外部流也將完成。 現(xiàn)在,這到底為什么被打破? 除非明確要求,否則RxJava不會引入任何線程池。 例如,這段代碼仍在阻塞:
log.info("Setup"); Flowable<String> blocking = Flowable.fromCallable(() -> {log.info("Starting");TimeUnit.SECONDS.sleep(1);log.info("Done");return "Hello, world!";}); log.info("Created"); blocking.subscribe(s -> log.info("Received {}", s)); log.info("Done");仔細查看輸出,特別是涉及的事件和線程的順序:
19:57:28.847 | INFO | main | Setup 19:57:28.943 | INFO | main | Created 19:57:28.949 | INFO | main | Starting 19:57:29.954 | INFO | main | Done 19:57:29.955 | INFO | main | Received Hello, world! 19:57:29.957 | INFO | main | Done沒有任何并發(fā)??,沒有額外的線程。 僅將阻塞代碼包裝在Flowable中不會神奇地增加并發(fā)性。 您必須顯式使用… subscribeOn() :
log.info("Setup"); Flowable<String> blocking = Flowable.fromCallable(() -> {log.info("Starting");TimeUnit.SECONDS.sleep(1);log.info("Done");return "Hello, world!";}).subscribeOn(Schedulers.io()); log.info("Created"); blocking.subscribe(s -> log.info("Received {}", s)); log.info("Done");這次的輸出更有希望:
19:59:10.547 | INFO | main | Setup 19:59:10.653 | INFO | main | Created 19:59:10.662 | INFO | main | Done 19:59:10.664 | INFO | RxCachedThreadScheduler-1 | Starting 19:59:11.668 | INFO | RxCachedThreadScheduler-1 | Done 19:59:11.669 | INFO | RxCachedThreadScheduler-1 | Received Hello, world!但是我們上次確實使用了subscribeOn() ,這是怎么回事? 嗯,外部流級別的subscribeOn()基本上說所有事件都應在此流中的不同線程上順序處理。 我們并沒有說應該同時運行許多子流。 并且由于所有子流都處于阻塞狀態(tài),因此當RxJava嘗試訂閱所有子流時,它會有效地依次依次訂閱。 asyncLoadBy()并不是真正的async ,因此當flatMap()運算符嘗試對其進行訂閱時,它會阻塞。 修復很容易。 通常,您會將subscribeOn()放在asyncLoadBy()但出于教育目的,我將其直接放置在asyncLoadBy()道中:
Flowable<Person> people = ids.flatMap(id -> asyncLoadBy(id).subscribeOn(Schedulers.io()));現(xiàn)在它就像一個魅力! 默認情況下,RxJava將接收前128個上游事件( UUID ),將它們轉換為子流并訂閱所有這些事件。 如果子流是異步且高度可并行化的(例如,網絡調用), asyncLoadBy()獲得128個并發(fā)調用asyncLoadBy() 。 并發(fā)級別(128)可通過maxConcurrency參數配置:
Flowable<Person> people = ids.flatMap(id ->asyncLoadBy(id).subscribeOn(Schedulers.io()),10 //maxConcurrency);那是很多工作,您不覺得嗎? 并發(fā)不應該更具聲明性嗎? 我們不再處理Executor和期貨,但似乎這種方法太容易出錯。 它不能像Java 8流中的parallel()一樣簡單嗎?
輸入ParallelFlowable
讓我們首先來看一下我們的示例,并通過添加filter()使它更加復雜:
Flowable<Person> people = ids.map(this::slowLoadBy) //BROKEN.filter(this::hasLowRisk); //BROKENhasLowRisk()是慢速謂詞:
boolean hasLowRisk(Person p) {//slow... }我們已經知道,針對此問題的慣用方法是使用flatMap()兩次:
Flowable<Person> people = ids.flatMap(id -> asyncLoadBy(id).subscribeOn(io())).flatMap(p -> asyncHasLowRisk(p).subscribeOn(io()));asyncHasLowRisk()相當模糊-謂詞通過時返回單元素流,失敗則返回空流。 這是使用flatMap()模擬filter() flatMap() 。 我們可以做得更好嗎? 從RxJava 2.0.5開始,有一個新的運算符叫做… parallel() ! 令人驚訝的是,由于許多誤解和濫用,在RxJava成為1.0之前已刪除了同名的運算符。 2.x中的parallel()似乎最終以一種安全且聲明性的方式解決了慣用并發(fā)問題。 首先,讓我們看一些漂亮的代碼!
Flowable<Person> people = ids.parallel(10).runOn(Schedulers.io()).map(this::slowLoadBy).filter(this::hasLowRisk).sequential();就這樣! parallel()和sequential()之間的代碼塊parallel()運行。 我們有什么在這里? 首先,新的parallel()運算符將Flowable<UUID>轉換為ParallelFlowable<UUID> ,該API的API比Flowable小得多。 您將在第二秒看到原因。 可選的int參數(在我們的例子中為10 )定義并發(fā)性,或者(如文檔所述)定義創(chuàng)建并發(fā)“ rails”的數量。 因此,對于我們來說,我們將單個Flowable<Person>分成10個并發(fā)的獨立軌道(認為是thread )。 來自UUID原始流的事件被拆分( modulo 10 )為彼此獨立的不同軌,子流。 將它們視為將上游事件發(fā)送到10個單獨的線程中。 但是首先我們必須使用方便的runOn()運算符定義這些線程的來源。 這比Java 8流上的parallel()好得多,在Java 8流上,您無法控制并發(fā)級別。
至此,我們有了一個ParallelFlowable 。 當事件出現(xiàn)在上游( UUID )中時,它將委派給10個“軌道”,并發(fā),獨立的管道之一。 管道提供了可以安全地同時運行的運算符的有限子集,例如map()和filter() ,還包括reduce() 。 沒有buffer() , take()等,因為一次在多個子流上調用它們的語義尚不清楚。 我們的阻塞slowLoadBy()和hasLowRisk()仍按順序調用,但僅在單個“ rail”內部。 因為我們現(xiàn)在有10個并發(fā)的“ rails”,所以我們無需花費太多精力就可以有效地并行化它們。
當事件到達子流(“軌道”)的末尾時,它們會遇到sequential()運算符。 該運算符將ParallelFlowable回Flowable 。 只要我們的映射器和過濾器是線程安全的, parallel() / sequential()對就提供了非常簡單的并行化流的方法。 一個小警告-您將不可避免地使郵件重新排序。 順序map()和filter()始終保留順序(就像大多數運算符一樣)。 但是,一旦在parallel()塊中運行它們,順序就會丟失。 這允許更大的并發(fā)性,但是您必須牢記這一點。
是否應該使用parallel()而不是嵌套的flatMap()來并行化代碼? 這取決于您,但是parallel()似乎更容易閱讀和掌握。
翻譯自: https://www.javacodegeeks.com/2017/09/idiomatic-concurrency-flatmap-vs-parallel-rxjava-faq.html
并發(fā)查詢parallel
總結
以上是生活随笔為你收集整理的并发查询parallel_惯用并发:flatMap()与parallel()– RxJava常见问题解答的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 戴尔台式电脑进不了winpe(戴尔电脑进
- 下一篇: 索爱C905和K858(索爱c902)