Java 8:正在运行的CompletableFuture
在Java 8中全面研究了CompletableFuture API之后,我們準備編寫一個簡單的Web搜尋器。 我們已經使用ExecutorCompletionService , Guava ListenableFuture和Scala / Akka解決了類似的問題。 我選擇了相同的問題,以便輕松比較方法和實現技術。
 首先,我們將定義一個簡單的阻止方法來下載單個URL的內容: 
沒有什么花哨。 稍后將為線程池內的其他站點調用此方法。 另一種方法將String解析為XML Document (讓我省略實現,沒有人愿意看一下它):
private Document parse(String xml) //...最后,我們算法的核心是以Document為輸入的每個網站的功能計算相關性 。 就像上面我們不在乎實現一樣,只有簽名很重要:
private CompletableFuture<Double> calculateRelevance(Document doc) //...讓我們把所有的東西放在一起。 抓取到一份網站列表后,我們的搜尋器應開始異步并發下載每個網站的內容。 然后,將每個下載HTML字符串解析為XML Document并隨后計算相關性 。 最后,我們采用所有計算出的相關性指標并找到最大的指標。 當您意識到下載內容和計算相關性都是異步的(返回CompletableFuture )并且我們絕對不想阻塞或忙于等待時,這聽起來很簡單。 這是第一部分:
ExecutorService executor = Executors.newFixedThreadPool(4);List<String> topSites = Arrays.asList("www.google.com", "www.youtube.com", "www.yahoo.com", "www.msn.com" );List<CompletableFuture<Double>> relevanceFutures = topSites.stream().map(site -> CompletableFuture.supplyAsync(() -> downloadSite(site), executor)).map(contentFuture -> contentFuture.thenApply(this::parse)).map(docFuture -> docFuture.thenCompose(this::calculateRelevance)).collect(Collectors.<CompletableFuture<Double>>toList());實際上這里有很多事情。 定義要爬網的線程池和站點是顯而易見的。 但是這種鏈式表達式計算relevanceFutures 。 最后的map()和collect()的序列具有很強的描述性。 從網站列表開始,我們通過將異步任務( downloadSite() )提交到線程池中,將每個網站( String )轉換為CompletableFuture<String> 。
因此,我們有了CompletableFuture<String> 。 我們繼續對其進行轉換,這一次在每個parse()上都應用了parse()方法。 請記住,當基礎將來完成時, thenApply()將調用提供的lambda并立即返回CompletableFuture<Document> 。 第三個也是最后一個轉換步驟是使用calculateRelevance()將輸入列表中的每個CompletableFuture<Document>組成。 請注意, calculateRelevance()返回CompletableFuture<Double>而不是Double ,因此我們使用thenCompose()而不是thenApply() 。 經過這么多階段,我們終于collect()了CompletableFuture<Double> 。
現在,我們想對所有結果進行一些計算。 我們有一份期貨清單,我們想知道所有這些期貨(最后一個)何時完成。 當然,我們可以在每個將來注冊完成回調,并使用CountDownLatch阻止直到調用所有回調。 我對此很懶,讓我們利用現有的CompletableFuture.allOf() 。 不幸的是,它有兩個小缺點-使用vararg而不是Collection ,并且不返回將來的合計結果,而是返回Void 。 通過匯總結果,我的意思是:如果我們提供List<CompletableFuture<Double>>該方法應返回CompletableFuture<List<Double>> ,而不是CompletableFuture<Void> ! 幸運的是,使用一些粘合代碼很容易修復:
private static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {CompletableFuture<Void> allDoneFuture =CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));return allDoneFuture.thenApply(v ->futures.stream().map(future -> future.join()).collect(Collectors.<T>toList())); }仔細觀察sequence()參數和返回類型。 實現非常簡單,訣竅是使用現有的allOf()但是當allDoneFuture完成時(這意味著所有基礎期貨都已完成),只需遍歷所有期貨并在每個期貨上進行join() (阻塞等待)。 但是,由于目前所有期貨都已完成,因此保證此電話不會被阻止! 有了這種實用程序,我們終于可以完成我們的任務:
CompletableFuture<List<Double>> allDone = sequence(relevanceFutures); CompletableFuture<OptionalDouble> maxRelevance = allDone.thenApply(relevances ->relevances.stream().mapToDouble(Double::valueOf).max() );這很容易–當allDone完成后,應用我們的功能即可計算整個集合中的最大相關性。 maxRelevance仍然是未來。 到JVM到達這一行時,可能尚未下載任何網站。 但是我們將業務邏輯封裝在期貨之上,并以事件驅動的方式將其堆疊。 代碼保持可讀性(不帶lambda和普通Future的版本至少要長兩倍),但避免阻塞主線程。 當然, allDone也可以作為中間步驟,我們可以對其進行進一步的轉換,而實際上還沒有結果。
缺點
Java 8中的CompletableFuture是向前邁出的一大步。 從對異步任務的細微抽象到功能完善,功能豐富的實用程序。 但是,在玩了幾天之后,我發現了一些小缺點:
- 返回前面提到的CompletableFuture<Void> CompletableFuture.allOf() 。 我認為可以這樣說:如果我通過一組期貨并希望等待所有這些期貨,那么我也想在它們容易到達時提取結果。 使用CompletableFuture.anyOf()甚至更糟。 如果我等待任何期貨完成,那么我將無法想象傳遞不同類型的期貨,比如說CompletableFuture<Car>和CompletableFuture<Restaurant> 。 如果我不在乎哪個先完成,那么我該如何處理返回類型? 通常,您將傳遞同類期貨的集合(例如CompletableFuture<Car> ),然后anyOf()可以簡單地返回該類型的期貨(而不是再次代替CompletableFuture<Void> )。
- 混合可設置和可聽的抽象。 在番石榴中,有ListenableFuture和SettableFuture擴展。 ListenableFuture允許注冊回調,而SettableFuture增加了從任意線程和上下文設置(解析)將來值的可能性。 CompletableFuture與SettableFuture等效,但是沒有等效于ListenableFuture受限版本。 為什么會出問題呢? 如果API返回CompletableFuture ,然后有兩個線程等待它完成(這沒什么問題),那么其中一個線程可以解決此將來并喚醒其他線程,而只有API實現才可以執行此操作。 但是,當API嘗試在以后解決將來時,對complete()調用將被忽略。 它可能會導致真正令人討厭的錯誤,在Guava中,將這兩個責任分開可以避免。
- 在JDK中, CompletableFuture被忽略。 未對ExecutorService進行改裝以返回CompletableFuture 。 從字面上看, CompletableFuture在JDK中未引用任何地方。 這是一個非常有用的類,與Future向下兼容,但在標準庫中并未真正推廣。
- 膨脹的API(?)總共50種方法,大多數為三種形式。 拆分可設置和可聽 (見上文)將有所幫助。 同樣,恕我直言,諸如runAfterBoth()或runAfterEither()類的某些方法runAfterBoth()并不屬于任何CompletableFuture 。 fast.runAfterBoth(predictable, ...)和predictable.runAfterBoth(fast, ...)之間有區別嗎? 否,但是API支持兩者之一。 實際上,我相信runAfterBoth(fast, predictable, ...)更好地表達我的意圖。
-  CompletableFuture.getNow(T)應該使用Supplier<T>而不是原始引用。 在下面的示例中,無論將來是否完成, expensiveAlternative()始終是代碼: future.getNow(expensiveAlternative());但是,我們可以輕松地調整此行為(我知道,這里有一個小的競爭條件,但是原始的getNow()也可以這種方式工作): public static <T> T getNow(CompletableFuture<T> future,Supplier<T> valueIfAbsent) throws ExecutionException, InterruptedException {if (future.isDone()) {return future.get();} else {return valueIfAbsent.get();} }使用此實用程序方法,我們可以避免在不需要時調用expensiveAlternative() : getNow(future, () -> expensiveAlternative()); //or: getNow(future, this::expensiveAlternative);
 總體而言, CompletableFuture是我們JDK腰帶中的一款出色的新工具。 較小的API問題,有時由于有限的類型推斷而導致語法過于冗長,這不會阻止您使用它。 至少它為更好的抽象和更健壯的代碼奠定了堅實的基礎。 
翻譯自: https://www.javacodegeeks.com/2013/05/java-8-completablefuture-in-action.html
總結
以上是生活随笔為你收集整理的Java 8:正在运行的CompletableFuture的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: 出口贸易备案登记表(出口贸易备案)
- 下一篇: linux文件读取(linux 文件读取
