rxjava 并行_使用RxJava和Completable并行执行阻塞任务
rxjava 并行
通過RxJava 1.1.1中引入的Completable抽象,如何并行執行阻止“僅副作用”(也稱為void)任務的并行執行變得更加容易。 “
正如您可能已經注意到,閱讀我的博客時,我主要專注于軟件Craft.io和自動代碼測試。 但是,此外,我還是持續交付和廣義并發的狂熱者。 最后一點從C中的純線程和信號量到更高級別的解決方案(例如ReactiveX和actor模型)不等。 這次是全新RxJava 1.1.1 – rx.Completable引入的非常方便(在特定情況下)功能的用例。 與我的許多博客條目類似,這也反映了我在處理實際任務和用例時遇到的實際事件。
要做的任務
想象一下,一個系統對來自不同來源的異步事件進行了非常復雜的處理。 過濾,合并,轉換,分組,豐富等等。 RxJava非常適合這里,特別是如果我們想要React式的話。 假設我們已經實現了它(外觀和效果很好),只剩下一件事了。 在開始處理之前,需要告知3個外部系統我們準備好接收消息。 對遺留系統的3個同步調用(通過RMI,JMX或SOAP)。 它們每個都可以持續幾秒鐘,我們需要等待所有它們之后才能開始。 幸運的是,它們已經實現,我們將它們視為可能成功(或因異常而失敗)的黑匣子。 我們只需要調用它們(最好同時調用)并等待完成即可。
rx.Observable –方法1
觸手可及的RxJava似乎是顯而易見的方法。 首先,可以使用Observable來包裝作業執行:
private Observable<Void> rxJobExecute(Job job) {return Observable.fromCallable(() -> { job.execute();return null;}); }不幸的是(在我們的例子中) Observable期望返回一些元素。 我們需要使用Void并且尷尬地return null (而不是僅僅引用方法job::execute 。
接下來,我們可以使用subscribeOn()方法來使用另一個線程來執行我們的作業(而不是阻塞主/當前線程–我們不想順序執行我們的作業)。 Schedulers.io()為調度Schedulers.io()提供了一組用于IO綁定工作的線程。
Observable<Void> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io()); Observable<Void> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());最后,我們需要等待所有它們完成(所有Obvervable s完成)。 為此,可以調整zip功能。 它結合了Obserbable拉鏈發射的物品的序列號。 在我們的例子中,我們只對每個Observable到的作業中的第一個偽項目感興趣(我們僅發出null以滿足API),并以阻塞的方式等待它們。 zip運算符中的zip函數需要返回某些內容,因此我們需要重復null的變通方法。
Observable.zip(run1, run2, (r1, r2) -> return null).toBlocking().single();顯而易見, Observable設計為Observable使用值流,并且需要進行一些額外的工作才能將其調整為僅產生副作用(不返回任何值)操作。 當我們需要將僅具有副作用的操作與其他返回一些值的值合并(例如合并)時,情況變得更加糟糕–需要更丑陋的轉換。 請參閱RxNetty API的實際用例 。
public void execute() {Observable<Void> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());Observable<Void> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());Observable.zip(run1, run2, (r1, r2) -> null).toBlocking().single(); }private Observable<Void> rxJobExecute(Job job) {return Observable.fromCallable(() -> { job.execute();return null;}); }rx.Observable –方法2
可能會使用另一種方法。 代替生成人工項目,可以將我們的任務的空Observable作為onComplete操作執行。 這迫使我們從zip操作切換到merge 。 結果,我們需要提供一個onNext動作(對于空的Observable永遠不會執行),這肯定了我們試圖破解該系統的信念。
public void execute() {Observable<Object> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());Observable<Object> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());Observable.merge(run1, run2).toBlocking().subscribe(next -> {}); }private Observable<Object> rxJobExecute(Job job) {return Observable.empty().doOnCompleted(job::execute); }rx.Completable
RxJava 1.1.1解決了對不返回任何值的Observable的更好支持。 Completable可以視為Observable的簡化版本,可以成功完成(發出onCompleted事件)或失敗( onError )。 創建Completable實例的最簡單方法是使用fromAction方法,該方法采用不返回任何值的Action0 (例如Runnable )。
Completable completable1 = Completable.fromAction(job1::execute).subscribeOn(Schedulers.io()); Completable completable2 = Completable.fromAction(job2::execute).subscribeOn(Schedulers.io());接下來,我們可以使用merge()方法,該方法返回一個Completable實例,該實例立即訂閱所有下游Completable ,并在它們全部完成(或其中一個失敗)時完成。 由于我們在外部調度程序中使用了subscribeOn方法,因此所有作業都是并行執行的(在不同的線程中)。
Completable.merge(completable1, completable2).await();await()方法將阻塞,直到所有作業完成(如果發生錯誤,將重新拋出異常)。 純凈而簡單。
public void execute() {Completable completable1 = Completable.fromAction(job1::execute).subscribeOn(Schedulers.io());Completable completable2 = Completable.fromAction(job2::execute).subscribeOn(Schedulers.io());Completable.merge(completable1, completable2).await(); }java.util.concurrent.CompletableFuture
有人可能會問:為什么不只使用CompletableFuture ? 這將是一個很好的問題。 Java 5中引入的純Future可能需要我們做更多的工作,而ListenableFuture (來自Guava)和CompletableFuture (來自Java 8)使其變得微不足道。
首先,我們需要運行/安排作業執行。 接下來,使用CompletableFuture.allOf()方法,我們可以創建一個新的CompletableFuture ,它在所有作業完成時就完成了(我們以前沒有看到過這個概念嗎?)。 get()方法只是阻止等待。
public void execute() {try {CompletableFuture<Void> run1 = CompletableFuture.runAsync(job1::execute);CompletableFuture<Void> run2 = CompletableFuture.runAsync(job2::execute);CompletableFuture.allOf(run1, run2).get();} catch (InterruptedException | ExecutionException e) {throw new RuntimeException("Jobs execution failed", e);} }我們需要對受檢查的異常做一些事情(很多時候我們不想使用它們來污染我們的API),但是總的來說,這看起來很明智。 但是,值得記住的是,當需要更復雜的鏈處理時, CompletableFuture不足。 另外,在我們的項目中已經使用RxJava時,使用相同(或相似)的API而不是引入全新的東西通常會很有用。
摘要
多虧了rx.Completable ,使用RxJava僅完成副作用(不返回任何內容)任務的執行更加輕松。 在已經使用RxJava的代碼庫中,即使在簡單情況下,它也可能比CompletableFuture更受歡迎。 但是, Completable提供了許多先進的操作員和技術,此外,還可以輕松地將它與Observable混合使用,這使其功能更加強大。
要了解有關Completable更多信息,您可能需要查看發行說明 。 對于那些想深入了解主題的人,Advanced RxJava博客( 第1部分和第2 部分 )上有關于Completable API的非常詳細的介紹。
- 可以從GitHub獲得代碼示例的源代碼。
順便說一句,如果您總體上對RxJava感興趣,我可以憑良心向您推薦一本書,該書目前由Tomasz Nurkiewicz和Ben Christensen – RxJava的React式編程編寫 。
翻譯自: https://www.javacodegeeks.com/2016/03/parallel-execution-blocking-tasks-rxjava-completable.html
rxjava 并行
總結
以上是生活随笔為你收集整理的rxjava 并行_使用RxJava和Completable并行执行阻塞任务的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: netbeans连接数据库_NetBea
- 下一篇: jdk 9和jdk8_了解有关JDK9紧
