Java 8 CompletableFuture
原文:Java 8 CompletableFutures Part I
- 作者:Bill Bejeck
- 譯者:noONE
譯者前言
JDK1.5就增加了Future接口,但是接口使用不是很能滿足異步開發(fā)的需求,使用起來不是那么友好。所以出現(xiàn)了很多第三方封裝的Future,Guava中就提供了一個(gè)更好的?ListenableFuture 類,Netty中則提供了一個(gè)自己的Future。所以,Java8中的CompletableFuture可以說是解決Future了一些痛點(diǎn),可以優(yōu)雅得進(jìn)行組合式異步編程,同時(shí)也更加契合函數(shù)式編程。
Java8已經(jīng)發(fā)布了很長一段時(shí)間,其中新增了一個(gè)很棒的并發(fā)控制工具,就是CompletableFuture類。CompletableFuture實(shí)現(xiàn)了Future接口,并且它可以顯式地設(shè)定值,更有意思的是我們可以進(jìn)行鏈?zhǔn)教幚?#xff0c;并且支持依賴行為,這些行為由CompletableFuture完成所觸發(fā)。CompletableFuture類似于Guava中的?ListenableFuture 類。它們兩個(gè)提供了類似的功能,本文不會(huì)再對它們進(jìn)行對比。我已經(jīng)在之前的文章中介紹過ListenableFutrue。雖然對于ListenableFutrue的介紹有點(diǎn)過時(shí),但是絕大數(shù)的知識仍然適用。CompletableFuture的文檔已經(jīng)非常全面了,但是缺少如何使用它們的具體示例 。本文意在通過單元測試中的一系列的簡單示例來展示如何使用CompletableFuture。最初我想在一篇文章中介紹完CompleteableFuture,但是信息太多了,分成三部分似乎更好一些:
CompletableFuture 入門
在開始使用CompletableFuture之前, 我們需要了解一些背景知識。CompletableFuture實(shí)現(xiàn)了?CompletionStage 接口。javadoc中簡明地介紹了CompletionStage :
一個(gè)可能的異步計(jì)算的階段,當(dāng)另外一個(gè)CompletionStage?完成時(shí),它會(huì)執(zhí)行一個(gè)操作或者計(jì)算一個(gè)值。一個(gè)階段的完成取決于它本身結(jié)算的結(jié)果,同時(shí)也可能反過來觸發(fā)其他依賴階段。
CompletionStage 的全部文檔的內(nèi)容很多,所以,我們在這里總結(jié)幾個(gè)關(guān)鍵點(diǎn):
計(jì)算可以由?Future?,Consumer?或者?Runnable 接口中的 apply,accept?或者?run等方法表示。
計(jì)算的執(zhí)行主要有以下
a. 默認(rèn)執(zhí)行(可能調(diào)用線程)
b. 使用默認(rèn)的CompletionStage的異步執(zhí)行提供者異步執(zhí)行。這些方法名使用someActionAsync這種格式表示。
c. 使用?Executor 提供者異步執(zhí)行。這些方法同樣也是someActionAsync這種格式,但是會(huì)增加一個(gè)Executor參數(shù)。
接下來,我會(huì)在本文中直接引用CompletableFuture?和?CompletionStage 。
創(chuàng)建一個(gè)CompleteableFuture
創(chuàng)建一個(gè)CompleteableFuture很簡單,但是不是很清晰。最簡單的方法就是使用CompleteableFuture.completedFuture方法,該方法返回一個(gè)新的且完結(jié)的CompleteableFuture:
public void test_completed_future() throws Exception {String expectedValue = "the expected value";CompletableFuture<String> alreadyCompleted = CompletableFuture.completedFuture(expectedValue);assertThat(alreadyCompleted.get(), is(expectedValue)); } 復(fù)制代碼這樣看起來有點(diǎn)乏味,稍后,我們就會(huì)看到如何創(chuàng)建一個(gè)已經(jīng)完成的CompleteableFuture 會(huì)派上用場。
現(xiàn)在,讓我們看一下如何創(chuàng)建一個(gè)表示異步任務(wù)的CompleteableFuture :
private static ExecutorService service = Executors.newCachedThreadPool(); public void test_run_async() throws Exception {CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> System.out.println("running async task"), service);//utility testing methodpauseSeconds(1);assertThat(runAsync.isDone(), is(true)); } public void test_supply_async() throws Exception {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(simulatedTask(1, "Final Result"), service);assertThat(completableFuture.get(), is("Final Result")); } 復(fù)制代碼在第一個(gè)方法中,我們看到了runAsync任務(wù),在第二個(gè)方法中,則是supplyAsync的示例。這可能是顯而易見的,然而使用runAsync還是使用supplyAsync,這取決于任務(wù)是否有返回值。在這兩個(gè)例子中,我們都提供了一個(gè)自定義的Executor,它作為一個(gè)異步執(zhí)行提供者。當(dāng)使用supplyAsync方法時(shí),我個(gè)人認(rèn)為使用?Callable 而不是一個(gè)Supplier似乎更自然一些。因?yàn)樗鼈兌际呛瘮?shù)式接口,Callable與異步任務(wù)的關(guān)系更緊密一些,并且它還可以拋出受檢異常,而Supplier則不會(huì)(盡管我們可以通過少量的代碼讓Supplier拋出受檢異常)。
增加監(jiān)聽器
現(xiàn)在,我們可以創(chuàng)建CompleteableFuture 對象去運(yùn)行異步任務(wù),讓我們開始學(xué)習(xí)如何去“監(jiān)聽”任務(wù)的完成,并且執(zhí)行隨后的一些動(dòng)作。這里重點(diǎn)提一下,當(dāng)增加對?CompletionStage 對象的追隨時(shí),之前的任務(wù)需要徹底成功,后續(xù)的任務(wù)和階段才能運(yùn)行。本文會(huì)介紹介紹一些處理失敗任務(wù)的方法,而在CompleteableFuture中鏈?zhǔn)教幚礤e(cuò)誤的方案會(huì)在后續(xù)的文章中介紹。
public void test_then_run_async() throws Exception {Map<String,String> cache = new HashMap<>();cache.put("key","value");CompletableFuture<String> taskUsingCache = CompletableFuture.supplyAsync(simulatedTask(1,cache.get("key")),service);CompletableFuture<Void> cleanUp = taskUsingCache.thenRunAsync(cache::clear,service);cleanUp.get();String theValue = taskUsingCache.get();assertThat(cache.isEmpty(),is(true));assertThat(theValue,is("value")); } 復(fù)制代碼這個(gè)例子主要展示在第一個(gè)CompletableFuture成功結(jié)束后,運(yùn)行一個(gè)清理的任務(wù)。 在之前的例子中,當(dāng)最初的任務(wù)成功結(jié)束后,我們使用Runnable任務(wù)執(zhí)行。我們也可以定義一個(gè)后續(xù)任務(wù),它可以直接獲取之前任務(wù)的成功結(jié)果。
public void test_accept_result() throws Exception {CompletableFuture<String> task = CompletableFuture.supplyAsync(simulatedTask(1, "add when done"), service);CompletableFuture<Void> acceptingTask = task.thenAccept(results::add);pauseSeconds(2);assertThat(acceptingTask.isDone(), is(true));assertThat(results.size(), is(1));assertThat(results.contains("add when done"), is(true)); } 復(fù)制代碼這是一個(gè)使用Accept 方法的例子,該方法會(huì)獲取CompletableFuture的結(jié)果,然后將結(jié)果傳給一個(gè)?Consumer 對象。在Java 8中,?Consumer 實(shí)例是沒有返回值的 ,如果想得到運(yùn)行的副作用,需要把結(jié)果放到一個(gè)列表中。
組合與構(gòu)成任務(wù)
除了增加監(jiān)聽器去運(yùn)行后續(xù)任務(wù)或者接受CompletableFuture的成功結(jié)果,我們還可以組合或者構(gòu)成任務(wù)。
構(gòu)成任務(wù)
構(gòu)成意味著獲取一個(gè)成功的CompletableFuture結(jié)果作為輸入,通過 一個(gè)Function 返回另外一個(gè) CompletableFuture。下面是一個(gè)使用CompletableFuture.thenComposeAsync的例子:
public void test_then_compose() throws Exception {Function<Integer,Supplier<List<Integer>>> getFirstTenMultiples = num ->()->Stream.iterate(num, i -> i + num).limit(10).collect(Collectors.toList());Supplier<List<Integer>> multiplesSupplier = getFirstTenMultiples.apply(13);//Original CompletionStageCompletableFuture<List<Integer>> getMultiples = CompletableFuture.supplyAsync(multiplesSupplier, service);//Function that takes input from orignal CompletionStageFunction<List<Integer>, CompletableFuture<Integer>> sumNumbers = multiples ->CompletableFuture.supplyAsync(() -> multiples.stream().mapToInt(Integer::intValue).sum());//The final CompletableFuture composed of previous two.CompletableFuture<Integer> summedMultiples = getMultiples.thenComposeAsync(sumNumbers, service);assertThat(summedMultiples.get(), is(715)); } 復(fù)制代碼在這個(gè)列子中,第一個(gè)CompletionStage提供了一個(gè)列表,該列表包含10個(gè)數(shù)字,每個(gè)數(shù)字都乘以13。這個(gè)提供的Function獲取這些結(jié)果,并且創(chuàng)建另外一個(gè)CompletionStage,它將對列表中的數(shù)字求和。
組合任務(wù)
組合任務(wù)的完成是通過獲取兩個(gè)成功的CompletionStages,并且從中獲取BiFunction類型的參數(shù),進(jìn)而產(chǎn)出另外的結(jié)果。以下是一個(gè)非常簡單的例子用來說明從組合的CompletionStages中獲取結(jié)果。
public void test_then_combine_async() throws Exception {CompletableFuture<String> firstTask = CompletableFuture.supplyAsync(simulatedTask(3, "combine all"), service);CompletableFuture<String> secondTask = CompletableFuture.supplyAsync(simulatedTask(2, "task results"), service);CompletableFuture<String> combined = firstTask.thenCombineAsync(secondTask, (f, s) -> f + " " + s, service);assertThat(combined.get(), is("combine all task results")); } 復(fù)制代碼這個(gè)例子展示了如何組合兩個(gè)異步任務(wù)的CompletionStage,然而,我們也可以組合已經(jīng)完成的CompletableFuture的異步任務(wù)。 組合一個(gè)已知的需要計(jì)算的值,也是一種很好的處理方式:
public void test_then_combine_with_one_supplied_value() throws Exception {CompletableFuture<String> asyncComputedValue = CompletableFuture.supplyAsync(simulatedTask(2, "calculated value"), service);CompletableFuture<String> knowValueToCombine = CompletableFuture.completedFuture("known value");BinaryOperator<String> calcResults = (f, s) -> "taking a " + f + " then adding a " + s;CompletableFuture<String> combined = asyncComputedValue.thenCombine(knowValueToCombine, calcResults);assertThat(combined.get(), is("taking a calculated value then adding a known value")); } 復(fù)制代碼最后,是一個(gè)使用CompletableFuture.runAfterbothAsync的例子
public void test_run_after_both() throws Exception {CompletableFuture<Void> run1 = CompletableFuture.runAsync(() -> {pauseSeconds(2);results.add("first task");}, service);CompletableFuture<Void> run2 = CompletableFuture.runAsync(() -> {pauseSeconds(3);results.add("second task");}, service);CompletableFuture<Void> finisher = run1.runAfterBothAsync(run2,() -> results. add(results.get(0)+ "&"+results.get(1)),service);pauseSeconds(4);assertThat(finisher.isDone(),is(true));assertThat(results.get(2),is("first task&second task")); } 復(fù)制代碼監(jiān)聽第一個(gè)結(jié)束的任務(wù)
在之前所有的例子中,所有的結(jié)果需要等待所有的CompletionStage結(jié)束,然而,需求并不總是這樣的。我們可能需要獲取第一個(gè)完成的任務(wù)的結(jié)果。下面的例子展示使用Consumer接受第一個(gè)完成的結(jié)果:
public void test_accept_either_async_nested_finishes_first() throws Exception {CompletableFuture<String> callingCompletable = CompletableFuture.supplyAsync(simulatedTask(2, "calling"), service);CompletableFuture<String> nestedCompletable = CompletableFuture.supplyAsync(simulatedTask(1, "nested"), service);CompletableFuture<Void> collector = callingCompletable.acceptEither(nestedCompletable, results::add);pauseSeconds(2);assertThat(collector.isDone(), is(true));assertThat(results.size(), is(1));assertThat(results.contains("nested"), is(true)); } 復(fù)制代碼類似功能的CompletableFuture.runAfterEither
public void test_run_after_either() throws Exception {CompletableFuture<Void> run1 = CompletableFuture.runAsync(() -> {pauseSeconds(2);results.add("should be first");}, service);CompletableFuture<Void> run2 = CompletableFuture.runAsync(() -> {pauseSeconds(3);results.add("should be second");}, service);CompletableFuture<Void> finisher = run1.runAfterEitherAsync(run2,() -> results.add(results.get(0).toUpperCase()),service);pauseSeconds(4);assertThat(finisher.isDone(),is(true));assertThat(results.get(1),is("SHOULD BE FIRST"));} 復(fù)制代碼多重組合
到目前為止,所有的組合/構(gòu)成的例子都只有兩個(gè)CompletableFuture對象。這里是有意為之,為了讓例子盡量的簡單明了。我們可以組合任意數(shù)量的CompletionStage。請注意,下面例子僅僅是為了說明而已!
public void test_several_stage_combinations() throws Exception {Function<String,CompletableFuture<String>> upperCaseFunction = s -> CompletableFuture.completedFuture(s.toUpperCase());CompletableFuture<String> stage1 = CompletableFuture.completedFuture("the quick ");CompletableFuture<String> stage2 = CompletableFuture.completedFuture("brown fox ");CompletableFuture<String> stage3 = stage1.thenCombine(stage2,(s1,s2) -> s1+s2);CompletableFuture<String> stage4 = stage3.thenCompose(upperCaseFunction);CompletableFuture<String> stage5 = CompletableFuture.supplyAsync(simulatedTask(2,"jumped over"));CompletableFuture<String> stage6 = stage4.thenCombineAsync(stage5,(s1,s2)-> s1+s2,service);CompletableFuture<String> stage6_sub_1_slow = CompletableFuture.supplyAsync(simulatedTask(4,"fell into"));CompletableFuture<String> stage7 = stage6.applyToEitherAsync(stage6_sub_1_slow,String::toUpperCase,service);CompletableFuture<String> stage8 = CompletableFuture.supplyAsync(simulatedTask(3," the lazy dog"),service);CompletableFuture<String> finalStage = stage7.thenCombineAsync(stage8,(s1,s2)-> s1+s2,service);assertThat(finalStage.get(),is("THE QUICK BROWN FOX JUMPED OVER the lazy dog")); } 復(fù)制代碼需要注意的是,組合CompletionStage的時(shí)候并不保證順序。在這些單元測試中,提供了一個(gè)時(shí)間去模擬任務(wù)以確保完成順序。
小結(jié)
本文主要是使用CompletableFuture類的第一部分。在后續(xù)文章中,將主要介紹錯(cuò)誤處理及恢復(fù),強(qiáng)制完成或取消。
資源
- CompletableFuture
- CompletionStage
- Source Code
關(guān)注我:
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)總結(jié)
以上是生活随笔為你收集整理的Java 8 CompletableFuture的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 35. 通过实现一个序列加密的功能,熟悉
- 下一篇: nginx学习九 upstream 负载