哪个线程执行CompletableFuture的任务和回调?
盡管CompletableFuture大約是兩年前(!)于2014年3月在Java 8中引入的,但它仍然是一個相對較新的概念。但是,此類不是很廣為人知是一件好事,因為它很容易被濫用,尤其是在線程和線程方面。一路涉及的線程池。 本文旨在描述如何將線程與CompletableFuture一起使用。
運行任務
這是API的基本部分。 有一個便捷的supplyAsync()方法類似于ExecutorService.submit() ,但是返回CompletableFuture :
CompletableFuture<String> future =CompletableFuture.supplyAsync(() -> {try (InputStream is = new URL("http://www.nurkiewicz.com").openStream()) {log.info("Downloading");return IOUtils.toString(is, StandardCharsets.UTF_8);} catch (IOException e) {throw new RuntimeException(e);}});問題是, supplyAsync()默認情況下使用ForkJoinPool.commonPool() ,所有CompletableFuture ,所有并行流以及部署在同一JVM上的所有應用程序之間共享的線程池(如果不幸的是,仍然使用具有許多已部署工件的應用程序服務器) 。 這個硬編碼的,不可配置的線程池完全在我們的控制范圍之外,難以監視和擴展。 因此,您應該始終指定自己的Executor ,例如此處(并查看我如何創建一個的一些技巧 ):
ExecutorService pool = Executors.newFixedThreadPool(10);final CompletableFuture<String> future =CompletableFuture.supplyAsync(() -> {//...}, pool);但這僅僅是開始……
回調和轉換
假設您要轉換給定的CompletableFuture ,例如,提取String的長度:
CompletableFuture<Integer> intFuture =future.thenApply(s -> s.length());究竟是誰調用s.length()代碼? 坦白地說,我親愛的開發人員,我們不給該死[1] 。 只要像thenApply這樣的所有運算符中的lambda表達式thenApply便宜,我們就不在乎誰調用它。 但是,如果此表達式花費一點CPU時間來完成或進行阻塞的網絡調用怎么辦?
首先,默認情況下會發生什么? 想想看:我們有一個String類型的后臺任務,我們想在該值完成后異步應用一些特定的轉換。 最簡單的實現方法是包裝原始任務(返回String ),并在完成任務時對其進行攔截。 內部任務完成后,我們的回調開始,應用轉換并返回修改后的值。 這就像介于我們的代碼和原始計算結果之間的一個方面。 話雖這么說,很明顯s.length()轉換將在與原始任務相同的線程中執行,是嗎? 不完全的!
CompletableFuture<String> future =CompletableFuture.supplyAsync(() -> {sleepSeconds(2);return "ABC";}, pool);future.thenApply(s -> {log.info("First transformation");return s.length(); });future.get(); pool.shutdownNow(); pool.awaitTermination(1, TimeUnit.MINUTES);future.thenApply(s -> {log.info("Second transformation");return s.length(); });當任務仍在運行時,將注冊thenApply()的第一個轉換。 因此,它將在任務完成后立即在與任務相同的線程中執行。 但是,在注冊第二個轉換之前,我們要等到任務實際完成為止。 更糟糕的是,我們完全關閉了線程池,以確保在那里沒有其他代碼可以執行。 那么哪個線程將運行第二次轉換? 我們知道它必須立即發生,因為future我們在已經完成的回調上進行注冊。 事實證明,默認情況下使用客戶端線程(!)! 輸出如下:
pool-1-thread-1 | First transformation main | Second transformation在注冊了第二個轉換后,它意識到CompletableFuture已經完成,因此它立即執行了轉換。 周圍沒有其他線程,因此在當前main線程的上下文中調用thenApply() 。 當實際的轉換成本很高時,就會出現這種行為容易出錯的最大原因。 想象一下thenApply() lambda表達式進行了一些繁重的計算或阻塞了網絡調用。 突然,我們的異步CompletableFuture阻止了調用線程!
控制回調的線程池
有兩種技術可以控制哪個線程執行我們的回調和轉換。 請注意,僅當您的轉換成本很高時才需要這些解決方案。 否則,差異可以忽略不計。 因此,首先我們可以選擇*Async版本的運算符,例如:
future.thenApplyAsync(s -> {log.info("Second transformation");return s.length(); });這次,第二個轉換自動卸載給我們的朋友ForkJoinPool.commonPool() :
pool-1-thread-1 | First transformation ForkJoinPool.commonPool-worker-1 | Second transformation但是我們不喜歡commonPool所以我們提供自己的:
future.thenApplyAsync(s -> {log.info("Second transformation");return s.length(); }, pool2);請注意,使用了不同的線程池( pool-1與pool-2 ):
pool-1-thread-1 | First transformation pool-2-thread-1 | Second transformation將回調視為另一個計算步驟
但是我相信,如果您在長時間運行的回調和轉換方面遇到麻煩(請記住,本文適用于CompletableFuture上的幾乎所有其他方法),則應該簡單地使用另一個顯式的CompletableFuture ,例如:
//Imagine this is slow and costly CompletableFuture<Integer> strLen(String s) {return CompletableFuture.supplyAsync(() -> s.length(),pool2); }//...CompletableFuture<Integer> intFuture = future.thenCompose(s -> strLen(s));這種方法更加明確。 知道我們的轉換成本很高,因此我們不冒險在任意或不受控制的線程上運行它。 相反,我們將其顯式建模為從String到CompletableFuture<Integer>異步操作。 但是,我們必須將thenApply()替換為thenCompose() ,否則最終將獲得CompletableFuture<CompletableFuture<Integer>> 。
但是,如果我們的轉換沒有一個與嵌套CompletableFuture applyToEither()的版本,例如, applyToEither()等待第一個Future完成并應用轉換,該怎么辦?
CompletableFuture<CompletableFuture<Integer>> poor = future1.applyToEither(future2, s -> strLen(s));有一個方便的技巧可以“解包”這種晦澀的數據結構,稱為flatten ,可以使用flatMap(identity) (或flatMap(x -> x) )輕松實現。 在我們的例子中, flatMap()稱為thenCompose ( duh! ):
CompletableFuture<Integer> good = poor.thenCompose(x -> x);我由您自己決定如何運作以及為什么運作。 我希望本文CompletableFuture您更清楚地了解如何在CompletableFuture中涉及線程。
翻譯自: https://www.javacodegeeks.com/2015/12/thread-executes-completablefutures-tasks-callbacks.html
總結
以上是生活随笔為你收集整理的哪个线程执行CompletableFuture的任务和回调?的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: wechat电脑版登陆(wechat登录
- 下一篇: ipad平板电脑价格表(ipad平板电脑