使用RxJava和Completable并行执行阻止任务
借助RxJava 1.1.1中引入的Completable抽象,如何并行執(zhí)行阻止“僅副作用”(也稱為void)任務(wù)的并行執(zhí)行變得更加容易。 “
正如您可能已經(jīng)注意到,閱讀我的博客時(shí),我主要專注于軟件Craft.io和自動(dòng)代碼測(cè)試。 但是,此外,我還是持續(xù)交付和廣義并發(fā)的狂熱者。 最后一點(diǎn)從C中的純線程和信號(hào)量到更高級(jí)別的解決方案(例如ReactiveX和actor模型)不等。 這次是全新RxJava 1.1.1 – rx.Completable引入的非常方便(在特定情況下)功能的用例。 與我的許多博客條目類似,這也反映了我在處理實(shí)際任務(wù)和用例時(shí)遇到的實(shí)際事件。
要做的任務(wù)
想象一下,一個(gè)系統(tǒng)對(duì)來自不同來源的異步事件進(jìn)行了非常復(fù)雜的處理。 過濾,合并,轉(zhuǎn)換,分組,豐富等等。 RxJava非常適合這里,特別是如果我們想要反應(yīng)式的話。 假設(shè)我們已經(jīng)實(shí)現(xiàn)了它(外觀和效果很好),只剩下一件事了。 在開始處理之前,需要告知3個(gè)外部系統(tǒng)我們準(zhǔn)備好接收消息。 對(duì)舊系統(tǒng)的3個(gè)同步調(diào)用(通過RMI,JMX或SOAP)。 它們每個(gè)都可以持續(xù)幾秒鐘,我們需要在開始之前等待所有它們。 幸運(yùn)的是,它們已經(jīng)實(shí)現(xiàn),我們將它們視為可能成功(或失敗的例外)的黑匣子。 我們只需要調(diào)用它們(最好同時(shí)調(diào)用)并等待完成即可。
rx.Observable –方法1
觸手可及的RxJava似乎是顯而易見的方法。 首先,可以通過Observable來包裝作業(yè)執(zhí)行:
private Observable<Void> rxJobExecute(Job job) {return Observable.fromCallable(() -> { job.execute();return null;}); }不幸的是(在我們的例子中) Observable期望返回一些元素。 我們需要使用Void并且尷尬地return null (而不是僅僅引用方法job::execute 。
接下來,我們可以使用subscribeOn()方法來使用另一個(gè)線程來執(zhí)行我們的工作(而不是阻塞主/當(dāng)前線程–我們不想順序地執(zhí)行我們的工作)。 Schedulers.io()為調(diào)度Schedulers.io()提供了一組用于IO綁定工作的線程。
Observable<Void> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io()); Observable<Void> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());最后,我們需要等待所有它們完成(所有Obvervable s完成)。 為此,可以調(diào)整zip功能。 它結(jié)合了Obserbable拉鏈發(fā)射的物品的序列號(hào)。 在我們的例子中,我們只對(duì)每個(gè)Observable到的作業(yè)中的第一個(gè)偽項(xiàng)目感興趣(我們僅發(fā)出null以滿足API),并以阻塞的方式等待它們。 zip運(yùn)算符中的zip函數(shù)需要返回某些內(nèi)容,因此我們需要重復(fù)null的變通方法。
Observable.zip(run1, run2, (r1, r2) -> return null).toBlocking().single();顯而易見, Observable設(shè)計(jì)為Observable使用值流,并且需要進(jìn)行一些額外的工作才能將其調(diào)整為僅產(chǎn)生副作用(不返回任何值)操作。 當(dāng)我們需要將僅具有副作用的操作與其他返回一些值的值合并(例如合并)時(shí),情況變得更加糟糕–需要更丑陋的轉(zhuǎn)換。 請(qǐng)參閱RxNetty API的實(shí)際用例 。
public void execute() {Observable<Void> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());Observable<Void> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());Observable.zip(run1, run2, (r1, r2) -> null).toBlocking().single(); }private Observable<Void> rxJobExecute(Job job) {return Observable.fromCallable(() -> { job.execute();return null;}); }rx.Observable –方法2
可能會(huì)使用另一種方法。 代替生成人工項(xiàng)目,可以將我們的任務(wù)中的空Observable作為onComplete操作執(zhí)行。 這迫使我們從zip操作切換到merge 。 結(jié)果,我們需要提供一個(gè)onNext動(dòng)作(對(duì)于空的Observable永遠(yuǎn)不會(huì)執(zhí)行),這肯定了我們?cè)噲D破解該系統(tǒng)的信念。
public void execute() {Observable<Object> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());Observable<Object> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());Observable.merge(run1, run2).toBlocking().subscribe(next -> {}); }private Observable<Object> rxJobExecute(Job job) {return Observable.empty().doOnCompleted(job::execute); }可完成
RxJava 1.1.1解決了對(duì)不返回任何值的Observable的更好支持。 Completable可以視為Observable的簡(jiǎn)化版本,可以成功完成(發(fā)出onCompleted事件)或失敗( onError )。 創(chuàng)建Completable實(shí)例的最簡(jiǎn)單方法是使用fromAction方法,該方法采用不返回任何值的Action0 (例如Runnable )。
Completable completable1 = Completable.fromAction(job1::execute).subscribeOn(Schedulers.io()); Completable completable2 = Completable.fromAction(job2::execute).subscribeOn(Schedulers.io());接下來,我們可以使用merge()方法,該方法返回一個(gè)Completable實(shí)例,該實(shí)例立即訂閱所有下游Completable ,并在所有下游Completable完成(或其中一個(gè)失敗)時(shí)完成。 當(dāng)我們使用帶外部調(diào)度程序的subscribeOn方法時(shí),所有作業(yè)將并行執(zhí)行(在不同線程中)。
Completable.merge(completable1, completable2).await();await()方法將阻塞,直到所有作業(yè)完成(如果發(fā)生錯(cuò)誤,將重新拋出異常)。 純粹而簡(jiǎn)單。
public void execute() {Completable completable1 = Completable.fromAction(job1::execute).subscribeOn(Schedulers.io());Completable completable2 = Completable.fromAction(job2::execute).subscribeOn(Schedulers.io());Completable.merge(completable1, completable2).await(); }java.util.concurrent.CompletableFuture
有人可能會(huì)問:為什么不只使用CompletableFuture ? 這將是一個(gè)很好的問題。 Java 5中引入的純Future可能需要我們做更多的工作,而ListenableFuture (來自Guava)和CompletableFuture (來自Java 8)使其變得微不足道。
首先,我們需要運(yùn)行/安排作業(yè)執(zhí)行。 接下來,使用CompletableFuture.allOf()方法,我們可以創(chuàng)建一個(gè)新的CompletableFuture ,該工作在所有作業(yè)完成CompletableFuture完成了(我們之前沒有看到過這個(gè)概念嗎?)。 get()方法只是阻止等待。
public void execute() {try {CompletableFuture<Void> run1 = CompletableFuture.runAsync(job1::execute);CompletableFuture<Void> run2 = CompletableFuture.runAsync(job2::execute);CompletableFuture.allOf(run1, run2).get();} catch (InterruptedException | ExecutionException e) {throw new RuntimeException("Jobs execution failed", e);} }我們需要對(duì)受檢查的異常進(jìn)行處理(很多時(shí)候我們不想用它們來污染我們的API),但是總的來說,這看起來很明智。 但是,值得記住的是,當(dāng)需要更復(fù)雜的鏈處理時(shí), CompletableFuture不足。 此外,在我們的項(xiàng)目中已經(jīng)使用RxJava時(shí),使用相同(或相似)的API而不是引入全新的東西通常會(huì)很有用。
摘要
多虧了rx.Completable ,使用RxJava僅完成副作用(不返回任何內(nèi)容)任務(wù)的執(zhí)行要舒適得多。 在已經(jīng)使用RxJava的代碼庫(kù)中,即使在簡(jiǎn)單情況下,它也可能比CompletableFuture更為可取。 但是, Completable提供了許多先進(jìn)的操作員和技術(shù),此外,還可以輕松地將它與Observable混合使用,這使其功能更加強(qiáng)大。
要了解有關(guān)Completable更多信息,您可能需要查看發(fā)行說明 。 對(duì)于那些想更深入地了解主題的人,Advanced RxJava博客( 第1部分和第2 部分 )上有關(guān)于Completable API的非常詳細(xì)的介紹。
- 可以從GitHub獲得代碼示例的源代碼。
順便說一句,如果您總體上對(duì)RxJava感興趣,我可以憑良心向您推薦一本書,該書目前由Tomasz Nurkiewicz和Ben Christensen – RxJava的反應(yīng)式編程編寫 。
翻譯自: https://www.javacodegeeks.com/2016/03/parallel-execution-blocking-tasks-rxjava-completable.html
總結(jié)
以上是生活随笔為你收集整理的使用RxJava和Completable并行执行阻止任务的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: 外币理财产品起息日指?
- 下一篇: 支付宝要承诺函严重吗?
