通过实例理解 JDK8 的 CompletableFuture
轉載自?通過實例理解 JDK8 的 CompletableFuture
?
前言
Java 5 并發庫主要關注于異步任務的處理,它采用了這樣一種模式,producer 線程創建任務并且利用阻塞隊列將其傳遞給任務的 consumer。這種模型在 Java 7 和 8 中進一步發展,并且開始支持另外一種風格的任務執行,那就是將任務的數據集分解為子集,每個子集都可以由獨立且同質的子任務來負責處理。
這種風格的基礎庫也就是 fork/join 框架,它允許程序員規定數據集該如何進行分割,并且支持將子任務提交到默認的標準線程池中,也就是"通用的"ForkJoinPool。Java 8 中,fork/join 并行功能借助并行流的機制變得更加具有可用性。但是,不是所有的問題都適合這種風格的并行處理:所處理的元素必須是獨立的,數據集要足夠大,并且在并行加速方面,每個元素的處理成本要足夠高,這樣才能補償建立 fork/join 框架所消耗的成本。CompletableFuture 類則是 Java 8 在并行流方面的創新。
準備知識
異步計算
所謂異步調用其實就是實現一個可無需等待被調用函數的返回值而讓操作繼續運行的方法。在 Java 語言中,簡單的講就是另啟一個線程來完成調用中的部分計算,使調用繼續運行或返回,而不需要等待計算結果。但調用者仍需要取線程的計算結果。
回調函數
回調函數比較通用的解釋是,它是一個通過函數指針調用的函數。如果你把函數的指針(地址)作為參數傳遞給另一個函數,當這個指針被用為調用它所指向的函數時,我們就說這是回調函數。回調函數不是由該函數的實現方直接調用,而是在特定的事件或條件發生時由另外一方調用的,用于對該事件或條件進行響應。
回調函數的機制:
(1)定義一個回調函數;
(2)提供函數實現的一方在初始化時候,將回調函數的函數指針注冊給調用者;
(3)當特定的事件或條件發生的時候,調用者使用函數指針調用回調函數對事件進行處理。
回調函數通常與原始調用者處于同一層次,如圖 1 所示:
圖 1 回調函數示例圖
Future 接口介紹
JDK5 新增了 Future 接口,用于描述一個異步計算的結果。雖然 Future 以及相關使用方法提供了異步執行任務的能力,但是對于結果的獲取卻是很不方便,只能通過阻塞或者輪詢的方式得到任務的結果。阻塞的方式顯然和我們的異步編程的初衷相違背,輪詢的方式又會耗費無謂的 CPU 資源,而且也不能及時地得到計算結果,為什么不能用觀察者設計模式呢?即當計算結果完成及時通知監聽者。
有一些開源框架實現了我們的設想,例如 Netty 的 ChannelFuture 類擴展了 Future 接口,通過提供 addListener 方法實現支持回調方式的異步編程。Netty 中所有的 I/O 操作都是異步的,這意味著任何的 I/O 調用都將立即返回,而不保證這些被請求的 I/O 操作在調用結束的時候已經完成。取而代之地,你會得到一個返回的 ChannelFuture 實例,這個實例將給你一些關于 I/O 操作結果或者狀態的信息。當一個 I/O 操作開始的時候,一個新的 Future 對象就會被創建。在開始的時候,新的 Future 是未完成的狀態--它既非成功、失敗,也非被取消,因為 I/O 操作還沒有結束。如果 I/O 操作以成功、失敗或者被取消中的任何一種狀態結束了,那么這個 Future 將會被標記為已完成,并包含更多詳細的信息(例如:失敗的原因)。請注意,即使是失敗和被取消的狀態,也是屬于已完成的狀態。阻塞方式的示例代碼如清單 1 所示。
清單 1 阻塞方式示例代碼
| 1 2 3 4 5 6 | // Start the connection attempt. ChannelFuture Future = bootstrap.connect(new InetSocketAddress(host, port)); // Wait until the connection is closed or the connection attempt fails. Future.getChannel().getCloseFuture().awaitUninterruptibly(); // Shut down thread pools to exit. bootstrap.releaseExternalResources(); |
上面代碼使用的是 awaitUninterruptibly 方法,源代碼如清單 2 所示。
清單 2 awaitUninterruptibly 源代碼
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | publicChannelFutureawaitUninterruptibly() { ????boolean interrupted = false; ????synchronized (this) { ????????//循環等待到完成 ????????while (!done) { ????????????checkDeadLock(); ????????????waiters++; ????????try { ????????????wait(); ????????} catch (InterruptedException e) { ????????????//不允許中斷 ????????????interrupted = true; ????????} finally { ????????????waiters--; ????????} ????} } ????if (interrupted) { ????Thread.currentThread().interrupt(); } return this; } |
清單 3 異步非阻塞方式示例代碼
| 1 2 3 4 5 6 7 8 9 10 | // Start the connection attempt. ChannelFuture Future = bootstrap.connect(new InetSocketAddress(host, port)); Future.addListener(new ChannelFutureListener(){ ????public void operationComplete(final ChannelFuture Future) ????????throws Exception ????????{?????????? ????} }); // Shut down thread pools to exit. bootstrap.releaseExternalResources(); |
可以明顯的看出,在異步模式下,上面這段代碼沒有阻塞,在執行 connect 操作后直接執行到 printTime("異步時間: "),隨后 connect 完成,Future 的監聽函數輸出 connect 操作完成。
非阻塞則是添加監聽類 ChannelFutureListener,通過覆蓋 ChannelFutureListener 的 operationComplete 執行業務邏輯。
清單 4 異步非阻塞方式示例代碼
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | public void addListener(final ChannelFutureListener listener) { ????if (listener == null) { ????throw new NullPointerException("listener"); } ????booleannotifyNow = false; ????synchronized (this) { ????????if (done) { ????????notifyNow = true; ????} else { ????????if (firstListener == null) { ????????//listener 鏈表頭 ????????firstListener = listener; ????} else { ????????if (otherListeners == null) { ????????otherListeners = new ArrayList<ChannelFutureListener>(1); ????????} ????????//添加到 listener 鏈表中,以便操作完成后遍歷操作 ????????otherListeners.add(listener); ????} ????...... ????if (notifyNow) { ????????//通知 listener 進行處理 ????????notifyListener(listener); ????????} } |
這部分代碼的邏輯很簡單,就是注冊回調函數,當操作完成后自動調用回調函數,就達到了異步的效果。
CompletableFuture 類介紹?
Java 8 中, 新增加了一個包含 50 個方法左右的類--CompletableFuture,它提供了非常強大的 Future 的擴展功能,可以幫助我們簡化異步編程的復雜性,并且提供了函數式編程的能力,可以通過回調的方式處理計算結果,也提供了轉換和組合 CompletableFuture 的方法。
對于阻塞或者輪詢方式,依然可以通過 CompletableFuture 類的 CompletionStage 和 Future 接口方式支持。
CompletableFuture 類聲明了 CompletionStage 接口,CompletionStage 接口實際上提供了同步或異步運行計算的舞臺,所以我們可以通過實現多個 CompletionStage 命令,并且將這些命令串聯在一起的方式實現多個命令之間的觸發。
我們可以通過 CompletableFuture.supplyAsync(this::sendMsg); 這么一行代碼創建一個簡單的異步計算。在這行代碼中,supplyAsync 支持異步地執行我們指定的方法,這個例子中的異步執行方法是 sendMsg。當然,我們也可以使用 Executor 執行異步程序,默認是 ForkJoinPool.commonPool()。
我們也可以在異步計算結束之后指定回調函數,例如 CompletableFuture.supplyAsync(this::sendMsg) .thenAccept(this::notify);這行代碼中的 thenAccept 被用于增加回調函數,在我們的示例中 notify 就成了異步計算的消費者,它會處理計算結果。
CompletableFuture 類使用示例
接下來我們通過 20 個示例看看 CompletableFuture 類具體怎么用。
創建完整的 CompletableFuture
清單 5 示例代碼
| 1 2 3 4 5 | static void completedFutureExample() { ????CompletableFuture<String>cf = CompletableFuture.completedFuture("message"); ????assertTrue(cf.isDone()); ????assertEquals("message", cf.getNow(null)); } |
以上代碼一般來說被用于啟動異步計算,getNow(null)返回計算結果或者 null。
運行簡單的異步場景
清單 6 示例代碼
| 1 2 3 4 5 6 7 8 9 | static void runAsyncExample() { ????CompletableFuture<Void>cf = CompletableFuture.runAsync(() -> { ????assertTrue(Thread.currentThread().isDaemon()); ????randomSleep(); }); ????assertFalse(cf.isDone()); ????sleepEnough(); ????assertTrue(cf.isDone()); } |
以上代碼的關鍵點有兩點:
同步執行動作示例
清單 7 示例代碼
| 1 2 3 4 5 6 7 | static void thenApplyExample() { ????CompletableFuture<String>cf = CompletableFuture.completedFuture("message").thenApply(s -> { ????assertFalse(Thread.currentThread().isDaemon()); ????returns.toUpperCase(); ????}); ????assertEquals("MESSAGE", cf.getNow(null)); } |
以上代碼在異步計算正常完成的前提下將執行動作(此處為轉換成大寫字母)。
異步執行動作示例?
相較前一個示例的同步方式,以下代碼實現了異步方式,僅僅是在上面的代碼里的多個方法增加"Async"這樣的關鍵字。
清單 8 示例代碼
| 1 2 3 4 5 6 7 8 9 | static void thenApplyAsyncExample() { ????CompletableFuture<String>cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> { ????assertTrue(Thread.currentThread().isDaemon()); ????randomSleep(); ????returns.toUpperCase(); ????}); ????assertNull(cf.getNow(null)); ????assertEquals("MESSAGE", cf.join()); } |
使用固定的線程池完成異步執行動作示例?
我們可以通過使用線程池方式來管理異步動作申請,以下代碼基于固定的線程池,也是做一個大寫字母轉換動作,代碼如清單 9 所示。
清單 9 示例代碼
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | staticExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() { ????int count = 1; ????@Override ????public Thread newThread(Runnable runnable) { ????????return new Thread(runnable, "custom-executor-" + count++); ????} ????}); ????????static void thenApplyAsyncWithExecutorExample() { ????????????CompletableFuture<String>cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> { ????????????assertTrue(Thread.currentThread().getName().startsWith("custom-executor-")); ????????????assertFalse(Thread.currentThread().isDaemon()); ????????????randomSleep(); ????????????returns.toUpperCase(); ????????}, executor); ????????assertNull(cf.getNow(null)); ????????assertEquals("MESSAGE", cf.join()); } |
作為消費者消費計算結果示例?
假設我們本次計算只需要前一次的計算結果,而不需要返回本次計算結果,那就有點類似于生產者(前一次計算)-消費者(本次計算)模式了,示例代碼如清單 10 所示。
清單 10 示例代碼
| 1 2 3 4 5 6 | static void thenAcceptExample() { ????StringBuilder result = new StringBuilder(); ????CompletableFuture.completedFuture("thenAccept message") ????.thenAccept(s ->result.append(s)); ????assertTrue("Result was empty", result.length() > 0); } |
消費者是同步執行的,所以不需要在 CompletableFuture 里對結果進行合并。
異步消費示例?
相較于前一個示例的同步方式,我們也對應有異步方式,代碼如清單 11 所示。
清單 11 示例代碼
| 1 2 3 4 5 6 7 | static void thenAcceptAsyncExample() { ????StringBuilder result = new StringBuilder(); ????CompletableFuture<Void>cf = CompletableFuture.completedFuture("thenAcceptAsync message") ????.thenAcceptAsync(s ->result.append(s)); ????cf.join(); ????assertTrue("Result was empty", result.length() > 0); } |
計算過程中的異常示例?
接下來介紹異步操作過程中的異常情況處理。下面這個示例中我們會在字符轉換異步請求中刻意延遲 1 秒鐘,然后才會提交到 ForkJoinPool 里面去執行。
清單 12 示例代碼
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | static void completeExceptionallyExample() { ????????CompletableFuture<String>cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase, ????????CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)); ????????CompletableFuture<String>exceptionHandler = cf.handle((s, th) -> { return (th != null) ? "message upon cancel" : ""; }); ????????cf.completeExceptionally(new RuntimeException("completed exceptionally")); ????????assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally()); ????try { ????????cf.join(); ????????fail("Should have thrown an exception"); ????????} catch(CompletionException ex) { // just for testing ????????????assertEquals("completed exceptionally", ex.getCause().getMessage()); ????} ?????assertEquals("message upon cancel", exceptionHandler.join()); } |
示例代碼中,首先我們創建一個 CompletableFuture(計算完畢),然后調用 thenApplyAsync 返回一個新的 CompletableFuture,接著通過使用 delayedExecutor(timeout, timeUnit)方法延遲 1 秒鐘執行。然后我們創建一個 handler(exceptionHandler),它會處理異常,返回另一個字符串"message upon cancel"。接下來進入 join()方法,執行大寫轉換操作,并且拋出 CompletionException 異常。
取消計算任務
與前面一個異常處理的示例類似,我們可以通過調用 cancel(boolean mayInterruptIfRunning)方法取消計算任務。此外,cancel()方法與 completeExceptionally(new CancellationException())等價。
清單 13 示例代碼
| 1 2 3 4 5 6 7 8 | static void cancelExample() { ????CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase, ????CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)); ????CompletableFuture cf2 = cf.exceptionally(throwable -> "canceled message"); ????assertTrue("Was not canceled", cf.cancel(true)); ????assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally()); ????assertEquals("canceled message", cf2.join()); } |
一個 CompletableFuture VS 兩個異步計算
我們可以創建一個 CompletableFuture 接收兩個異步計算的結果,下面代碼首先創建了一個 String 對象,接下來分別創建了兩個 CompletableFuture 對象 cf1 和 cf2,cf2 通過調用 applyToEither 方法實現我們的需求。
清單 14 示例代碼
| 1 2 3 4 5 6 7 8 9 | static void applyToEitherExample() { ????String original = "Message"; ????CompletableFuture cf1 = CompletableFuture.completedFuture(original) ????.thenApplyAsync(s -> delayedUpperCase(s)); ????CompletableFuture cf2 = cf1.applyToEither( ????CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)), ????s -> s + " from applyToEither"); ????assertTrue(cf2.join().endsWith(" from applyToEither")); } |
如果我們想要使用消費者替換清單 14 的方法方式用于處理異步計算結果,代碼如清單 15 所示。
清單 15 示例代碼
| 1 2 3 4 5 6 7 8 9 10 | static void acceptEitherExample() { ????String original = "Message"; ????StringBuilder result = new StringBuilder(); ????CompletableFuture cf = CompletableFuture.completedFuture(original) ????.thenApplyAsync(s -> delayedUpperCase(s)) ????.acceptEither(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)), ????s -> result.append(s).append("acceptEither")); ????cf.join(); ????assertTrue("Result was empty", result.toString().endsWith("acceptEither")); } |
運行兩個階段后執行
下面這個示例程序兩個階段執行完畢后返回結果,首先將字符轉為大寫,然后將字符轉為小寫,在兩個計算階段都結束之后觸發 CompletableFuture。
清單 16 示例代碼
| 1 2 3 4 5 6 7 8 | static void runAfterBothExample() { ????String original = "Message"; ????StringBuilder result = new StringBuilder(); ????CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).runAfterBoth( ????CompletableFuture.completedFuture(original).thenApply(String::toLowerCase), ????() -> result.append("done")); ????assertTrue("Result was empty", result.length() > 0); } |
也可以通過以下方式處理異步計算結果,
清單 17 示例代碼
| 1 2 3 4 5 6 7 8 | static void thenAcceptBothExample() { ????String original = "Message"; ????StringBuilder result = new StringBuilder(); ????CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).thenAcceptBoth( ????CompletableFuture.completedFuture(original).thenApply(String::toLowerCase), ????(s1, s2) -> result.append(s1 + s2)); ????assertEquals("MESSAGEmessage", result.toString()); } |
整合兩個計算結果
我們可以通過 thenCombine()方法整合兩個異步計算的結果,注意,以下代碼的整個程序過程是同步的,getNow()方法最終會輸出整合后的結果,也就是說大寫字符和小寫字符的串聯值。
清單 18 示例代碼
| 1 2 3 4 5 6 7 | static void thenCombineExample() { ????String original = "Message"; ????CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s)) ????.thenCombine(CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)), ????(s1, s2) -> s1 + s2); ????assertEquals("MESSAGEmessage", cf.getNow(null)); } |
上面這個示例是按照同步方式執行兩個方法后再合成字符串,以下代碼采用異步方式同步執行兩個方法,由于異步方式情況下不能夠確定哪一個方法最終執行完畢,所以我們需要調用 join()方法等待后一個方法結束后再合成字符串,這一點和線程的 join()方法是一致的,主線程生成并起動了子線程,如果子線程里要進行大量的耗時的運算,主線程往往將于子線程之前結束,但是如果主線程處理完其他的事務后,需要用到子線程的處理結果,也就是主線程需要等待子線程執行完成之后再結束,這個時候就要用到 join()方法了,即 join()的作用是:"等待該線程終止"。
清單 19 示例代碼
| 1 2 3 4 5 6 7 8 | static void thenCombineAsyncExample() { ????String original = "Message"; ????CompletableFuture cf = CompletableFuture.completedFuture(original) ????.thenApplyAsync(s -> delayedUpperCase(s)) ????.thenCombine(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)), ????assertEquals("MESSAGEmessage", cf.join()); ????(s1, s2) -> s1 + s2); } |
除了 thenCombine()方法以外,還有另外一種方法-thenCompose(),這個方法也會實現兩個方法執行后的返回結果的連接。
清單 20 示例代碼
| 1 2 3 4 5 6 7 | static void thenComposeExample() { ????String original = "Message"; ????CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s)) ????.thenCompose(upper -> CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)) ????.thenApply(s -> upper + s)); ????assertEquals("MESSAGEmessage", cf.join()); } |
anyOf()方法
以下代碼模擬了如何在幾個計算過程中任意一個完成后創建 CompletableFuture,在這個例子中,我們創建了幾個計算過程,然后轉換字符串到大寫字符。由于這些 CompletableFuture 是同步執行的(下面這個例子使用的是 thenApply()方法,而不是 thenApplyAsync()方法),使用 anyOf()方法后返回的任何一個值都會立即觸發 CompletableFuture。然后我們使用 whenComplete(BiConsumer<? super Object, ? super Throwable> action)方法處理結果。
清單 21 示例代碼
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | static void anyOfExample() { ????StringBuilder result = new StringBuilder(); ????List messages = Arrays.asList("a", "b", "c"); ????List<CompletableFuture> futures = messages.stream() ????.map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s))) ????.collect(Collectors.toList()); ????CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((res, th) -> { ????????if(th == null) { ????????assertTrue(isUpperCase((String) res)); ????????result.append(res); ????} }); ????assertTrue("Result was empty", result.length() > 0); } |
當所有的 CompletableFuture 完成后創建 CompletableFuture
清單 22 所示我們會以同步方式執行多個異步計算過程,在所有計算過程都完成后,創建一個 CompletableFuture。
清單 22 示例代碼
| 1 2 3 4 5 6 7 8 9 10 11 12 | static void allOfExample() { ????StringBuilder result = new StringBuilder(); ????List messages = Arrays.asList("a", "b", "c"); ????List<CompletableFuture> futures = messages.stream() ????.map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s))) ????.collect(Collectors.toList()); ????CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((v, th) -> { ????????futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null)))); ????????result.append("done"); }); ????assertTrue("Result was empty", result.length() > 0); } |
相較于前一個同步示例,我們也可以異步執行,如清單 23 所示。
清單 23 示例代碼
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | static void allOfAsyncExample() { ????StringBuilder result = new StringBuilder(); ????List messages = Arrays.asList("a", "b", "c"); ????List<CompletableFuture> futures = messages.stream() ????.map(msg -> CompletableFuture.completedFuture(msg).thenApplyAsync(s -> delayedUpperCase(s))) ????.collect(Collectors.toList()); ????CompletableFuture allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])) .whenComplete((v, th) -> { ????futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null)))); ????result.append("done"); }); ????allOf.join(); ????assertTrue("Result was empty", result.length() > 0); } |
實際案例
以下代碼完成的操作包括:
清單 24 示例代碼
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | cars().thenCompose(cars -> { ????List<CompletionStage> updatedCars = cars.stream() ????.map(car -> rating(car.manufacturerId).thenApply(r -> { ????car.setRating(r); ????return car; ?????})).collect(Collectors.toList()); ????CompletableFuture done = CompletableFuture ????.allOf(updatedCars.toArray(new CompletableFuture[updatedCars.size()])); ????return done.thenApply(v -> updatedCars.stream().map(CompletionStage::toCompletableFuture) ????.map(CompletableFuture::join).collect(Collectors.toList())); ????}).whenComplete((cars, th) -> { ????if (th == null) { ????cars.forEach(System.out::println); ????} else { ????throw new RuntimeException(th); ????} }).toCompletableFuture().join(); |
結束語
Completable 類為我們提供了豐富的異步計算調用方式,我們可以通過上述基本操作描述及 20 個示例程序進一步了解如果使用 CompletableFuture 類實現我們的需求,期待 JDK10 會有持續更新。
參考資源
參考 developerWorks 上的 Java 8 文章,了解更多 Java 8 知識。
參考書籍?Java 8 in Action?Raoul-Gabriel Urma
參考書籍?Mastering Lambdas: Java Programming in a Multicore World?Maurice Naftalin
參考文章?Java 8 CompletableFutures,這篇文章從基礎介紹了 CompletableFuture 類的使用方式。
?
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的通过实例理解 JDK8 的 CompletableFuture的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何配置帧中继?
- 下一篇: 获取Spring的Application