非阻塞式异步Java 8和Scala的Try / Success / Failure
受Heinz Kabutz最近的時事通訊以及我在最近的書中研究的Scala的期貨的啟發(fā),我著手使用Java 8編寫了一個示例,該示例如何將工作提交給執(zhí)行服務并異步地響應其結果,并使用了回調(diào)。無需阻止任何線程等待執(zhí)行服務的結果。
理論認為,調(diào)用攔截方法,如get上java.util.concurrent.Future是壞的,因為系統(tǒng)會需要超過線程的最佳數(shù)量,如果它是不斷做工作,并在浪費時間與結果上下文切換 。
在Scala世界中,像Akka這樣的框架都使用編程模型,這意味著這些框架永遠不會阻塞-線程阻塞的唯一時間是用戶對阻塞的對象進行編程時,他們不愿意這樣做。 通過永不阻塞,該框架可以避免每個內(nèi)核使用大約一個線程,這比說說標準JBoss Java EE Application Server(在啟動后最多擁有400個線程)要少得多。 很大程度上歸功于Akka框架的工作,Scala 2.10添加了Futures和Promises ,但是Java中還不存在這些東西。
以下代碼顯示了我的預期目標。 它分為三個部分。 首先,使用在類ch.maxant.async.Future找到的static future方法將新任務添加到執(zhí)行服務中。 它返回一個Future ,但不是從java.util.concurrent包中返回一個Future ,而是從ch.maxant.async包中返回其子類。 其次, Future具有一種名為map的方法,該方法遵循Scala或新的Java 8 Stream類的功能樣式。 map方法允許您注冊一個回調(diào),或更準確地說,我們可以將第一個future包含的值映射(轉(zhuǎn)換)為新值。 在第一個Future完成后,映射將在將來的其他時間執(zhí)行,因此會產(chǎn)生新的Future 。 第三,我們在Future類中使用另一個方法來注冊一個回調(diào),一旦我們創(chuàng)建的所有期貨都完成,該回調(diào)將運行。 任何時候都不會使用Future API的任何阻塞方法!
final Random random = new Random(); int numTasks = 10; List<Future<Integer>> futures = new ArrayList<>();for(int i = 0; i < numTasks; i++){final int j = i;log("adding future " + i);// PART 1//start some work async / in the futureFuture<String> f = future(new Task<String>( () -> {sleep(random.nextInt(1000));if(j < 5){log("working success");return "20";}else{log("working failure");throw new Exception();}}));// PART 2//register a callback, to be called when the work is donelog("adding mapping callback to future");final Future<Integer> f2 = f.map( (Try<String> stringNumber) -> {return stringNumber.map( (String s) -> {log("mapping '" + s + "' to int");return Integer.parseInt(s);}).recover( (Exception e) -> {log("recovering");return -10;}).get(); //wont throw an exception, because we provided a recovery!});futures.add(f2); }// PART 3 log("registering callback for final result"); Future.registerCallback(futures, (List<Try<Integer>> results) -> {Integer finalResult = results.stream().map( (Try<Integer> t) -> {log("mapping " + t);try {return t.get();} catch (Exception e) {return 0;}}).reduce(0, (Integer i1, Integer i2) -> {log("reducing " + i1 + " and " + i2);return i1 + i2;});log("final result is " + finalResult);Future.shutdown();if(finalResult != 50){throw new RuntimeException("FAILED");}else{log("SUCESS");} });System.out.println("Completed submitting all tasks on thread " + Thread.currentThread().getId());//this main thread will now die, but the Future executor is still up and running. the callback will shut it down and with it, the jvm.第11行調(diào)用了future方法來注冊一個新Task ,該Task是使用Work實例構造的,在這里是使用Java 8 lambda構造的。 工作會睡一會兒,然后要么返回數(shù)字20(作為字符串),要么拋出異常,以演示如何處理錯誤。
使用第11行從執(zhí)行服務返回的Future ,第25行將其值從字符串映射到整數(shù),從而生成Future<Integer>而不是Future<String> 。 該結果將添加到第35行的Future列表中,第3部分在第40行中使用該列表registerCallback方法將確保在完成最后一個future之后調(diào)用給定的回調(diào)。
第25-33行的映射使用傳遞給Try對象的lambda完成。 Try有點像Java 8 Optional ,它是Success和Failure類的抽象(超類),我是根據(jù)對Scala對應類的了解而實現(xiàn)的。 與必須顯式檢查錯誤相比,它可使程序員更輕松地處理故障。 我對Try接口的實現(xiàn)如下:
public interface Try<T> {/** returns the value, or throws an exception if its a failure. */T get() throws Exception;/** converts the value using the given function, resulting in a new Try */<S> Try<S> map(Function1<T, S> func);/** can be used to handle recovery by converting the exception into a {@link Try} */Try<T> recover(Recovery<T> r);}發(fā)生的事情是Success和Failure的實現(xiàn)可以優(yōu)雅地處理錯誤。 例如,如果第一個清單的第11行上的Future完成但有例外,則將第一個清單的第25行上的lambda傳遞給Failure對象,并且在Failure上調(diào)用map方法絕對沒有任何作用。 沒有例外,沒有任何問題。 為了進行補償,您可以在第一個清單的第29行上調(diào)用recover方法,該方法使您可以處理異常并返回程序可以繼續(xù)使用的值,例如默認值。
另一方面, Success類以不同的方式實現(xiàn)Try接口的map和recover方法,這樣,調(diào)用map會導致給定的函數(shù)被調(diào)用,但是調(diào)用recover絕對不會執(zhí)行任何操作。 map和recover方法無需顯式編碼try / catch塊,而是提供了一種更好的語法,該語法在讀取或查看代碼時更容易驗證(與編寫相比,這種情況在代碼中更常見)。
由于map和recover方法將函數(shù)的結果包裝在Try s中,因此可以將調(diào)用鏈接在一起,例如第Try和32行。Scala的Try API具有比我在這里實現(xiàn)的三種方法更多的方法。 請注意,我選擇在Try API java.util.function.Function不使用java.util.function.Function ,因為它的apply方法不會throw Exception ,這意味著第一個清單中顯示的代碼不像現(xiàn)在那樣好。 相反,我寫了
Function1接口。
難題的第3部分是如何使程序在所有Future完成之后做一些有用的事情,而又不會像對Future#get()方法那樣討厭調(diào)用。 解決方案是注冊一個回調(diào),如第40行所示。該回調(diào)與此處顯示的所有其他回調(diào)一樣,都已提交給執(zhí)行服務。 這意味著我們無法保證哪個線程可以運行它,這會帶來副作用,即線程本地存儲(TLS)不再起作用-某些框架((的較舊版本?)Hibernate依賴TLS,而它們只會勝任)。在這里工作。 Scala有一個很好的方法可以使用implicit關鍵字來解決該問題,而Java還沒有(但…?),因此需要使用其他機制。 我在提到它,只是為了讓您知道它。
因此,當最后一個Future完成時,將調(diào)用40-60行,并傳遞包含Integer而不是Future的Try的List 。 registerCallback方法將期貨轉(zhuǎn)換為適當?shù)腟uccess或Failure 。 但是我們?nèi)绾螌⑺鼈冝D(zhuǎn)換為有用的東西呢? 幸運的是,Java 8現(xiàn)在有了一個簡單的map / reduce,就支持了Stream類,該類通過調(diào)用stream()方法從第42行的Try集合中Try了。 首先,我將Try映射(轉(zhuǎn)換)為它們的值,然后在第49行上將流減少為單個值。我本可以使用而不是傳遞自己的求和值的lambda實現(xiàn)。
Integer::sum ,例如someStream.reduce(0, Integer::sum) 。
我上次運行該程序時,它輸出以下內(nèi)容:
Thread-1 says: adding future 0 Thread-1 says: adding mapping callback to future Thread-1 says: adding future 1 Thread-1 says: adding mapping callback to future Thread-1 says: adding future 2 Thread-1 says: adding mapping callback to future Thread-1 says: adding future 3 Thread-1 says: adding mapping callback to future Thread-1 says: adding future 4 Thread-1 says: adding mapping callback to future Thread-1 says: adding future 5 Thread-1 says: adding mapping callback to future Thread-1 says: adding future 6 Thread-1 says: adding mapping callback to future Thread-1 says: adding future 7 Thread-1 says: adding mapping callback to future Thread-1 says: adding future 8 Thread-1 says: adding mapping callback to future Thread-1 says: adding future 9 Thread-1 says: adding mapping callback to future Thread-1 says: registering callback for final result Thread-10 says: working success Completed submitting all tasks on thread 1 Thread-14 says: working success Thread-10 says: working failure Thread-14 says: working failure Thread-12 says: working success Thread-10 says: working failure Thread-10 says: mapping '20' to int Thread-10 says: mapping '20' to int Thread-10 says: recovering Thread-10 says: recovering Thread-10 says: mapping '20' to int Thread-10 says: recovering Thread-11 says: working success Thread-11 says: mapping '20' to int Thread-13 says: working success Thread-10 says: mapping '20' to int Thread-12 says: working failure Thread-12 says: recovering Thread-14 says: working failure Thread-14 says: recovering Thread-14 says: mapping Success(20) Thread-14 says: mapping Success(20) Thread-14 says: mapping Success(20) Thread-14 says: mapping Success(20) Thread-14 says: mapping Success(20) Thread-14 says: mapping Success(-10) Thread-14 says: mapping Success(-10) Thread-14 says: mapping Success(-10) Thread-14 says: mapping Success(-10) Thread-14 says: mapping Success(-10) Thread-14 says: final result is 50 Thread-14 says: SUCESS如您所見,主線程添加了所有任務并注冊了所有映射功能(第1-20行)。 然后,它注冊回調(diào)(輸出的第21行,與清單的第39行相對應),最后從清單中的第63行輸出文本,此后它死了,因為它無事可做。 然后,輸出的第22行和第24-42行顯示了池中的各個線程(包含5個線程),這些線程處理工作以及從String到Integer的映射或從異常中恢復。 這是第一個清單的第1部分和第2部分中的代碼。 您可以看到它是完全異步的,在所有初始工作完成之前會發(fā)生一些映射/恢復(將第38行或第40行分別映射到輸出的第41行,并將其映射到輸出的第41行,此行是最后的輸出)最初的工作)。 第43-52行是map / reduce的輸出,它是主列表的第3部分。 請注意,沒有記錄reduce,因為我運行的代碼(位于Github上)使用上面提到的Integer::sum快捷方式,而不是上面顯示的第一個清單的第50-51行。
盡管使用Java 6(甚至5?)可以實現(xiàn)所有這些功能,例如,通過獲取提交到池中的任務來自己提交回調(diào),但是一旦完成,執(zhí)行該操作所需的代碼量就會更大,并且代碼本身將比此處顯示的代碼丑陋。 可以使用回調(diào)進行映射的Java 8 lambda, Future以及具有簡潔錯誤處理功能的Try API都可以使此處所示的解決方案更具可維護性。
上面顯示的代碼以及ch.maxant.async包中類的代碼在Apache License Version 2.0下可用,并且可以從我的Github帳戶下載。
翻譯自: https://www.javacodegeeks.com/2013/10/non-blocking-asynchronous-java-8-and-scalas-trysuccessfailure.html
總結
以上是生活随笔為你收集整理的非阻塞式异步Java 8和Scala的Try / Success / Failure的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: pk10官方下载(pk10安卓)
- 下一篇: Java 8中的instanceof运算