Java8新特性--CompletableFuture
并發(fā)與并行
Java 5并發(fā)庫主要關注于異步任務的處理,它采用了這樣一種模式,producer線程創(chuàng)建任務并且利用阻塞隊列將其傳遞給任務的consumer。這種模型在Java 7和8中進一步發(fā)展,并且開始支持另外一種風格的任務執(zhí)行,那就是將任務的數(shù)據(jù)集分解為子集,每個子集都可以由獨立且同質的子任務來負責處理。
這種風格的基礎庫也就是fork/join框架,它允許程序員規(guī)定數(shù)據(jù)集該如何進行分割,并且支持將子任務提交到默認的標準線程池中,也就是“通用的”ForkJoinPool。(在本文中,非全限定的類和接口名指的都是java.util.concurrent包中的類型。)在Java 8中,fork/join并行功能借助并行流的機制變得更加具有可用性。但是,不是所有的問題都適合這種風格的并行處理:所處理的元素必須是獨立的,數(shù)據(jù)集要足夠大,并且在并行加速方面,每個元素的處理成本要足夠高,這樣才能補償建立fork/join框架所消耗的成本。
同時,Java 8在并行流方面的革新得到了廣泛的關注,這導致大家忽略了并發(fā)庫中一項新增的重要功能,那就是CompletableFuture<T>類。本文將會探討CompletableFuture類,有一些系統(tǒng)會依賴于不同類型的異步執(zhí)行任務,本文將會闡述該類為什么會對這種類型的系統(tǒng)如此重要,并介紹了它是如何補充fork/join風格的并行機制和并行流的。
頁面渲染器
我們的起始點將是“Java Concurrency in Practice”(JCiP)一書中的樣例,這個樣例非常經(jīng)典地闡述了Java 5中的并發(fā)工具類。在JCiP的第6.3節(jié)中,Brian Goetz探討了如何開發(fā)一個Web頁面的渲染器,對于每個頁面來說,它的任務就是渲染文本,并下載和渲染圖片。圖片的下載會耗費較長的時間,在這段時間內CPU無事可做,只能等待。所以,在渲染頁面時,一個很明顯的策略就是首先初始化所有圖片的下載,然后利用它們完成之前的這段時間渲染頁面文本,最后渲染下載的圖片。
相關廠商內容
用Kafka Streams搭建實時的廣告消費系統(tǒng)
OpenResty十年開源的歷程和思考
阿里巴巴Blink流計算平臺介紹與實踐
Apache Kafka的過去,現(xiàn)在,和未來
阿里媽媽國際廣告算法大賽,等的就是你!
相關贊助商
在JCiP中,第一版本的頁面渲染器使用了Future的理念,這個接口暴露了一些方法,允許客戶端監(jiān)控任務的執(zhí)行進度,這里的任務是在一個不同的進程中執(zhí)行的。在程序清單1中,Callable代表了下載頁面中所有圖片的任務,它被提交到了Executor中,然后返回一個Future對象,通過它就能詢問下載任務的狀態(tài)。當主線程渲染完頁面的文本后,會調用Future.get方法,這個方法會一直阻塞直到所有下載的結果均可用為止,在本例中這個結果是以List<ImageData>的形式來表示的。這種方式一個很明顯的缺點在于下載任務的粗粒度性,在所有的圖片下載完成之前,我們一張圖片也不能渲染。接下來,我們看一下如何緩解這個問題。
public void renderPage(CharSequence source) { List<ImageInfo> info = scanForImageInfo(source); //創(chuàng)建Callable,它代表了下載所有的圖片 final Callable<List<ImageData>> task = () ->info.stream().map(ImageInfo::downloadImage).collect(Collectors.toList());// 將下載任務提交到executorFuture<List<ImageData>> images = executor.submit(task);// renderText(source); try {// 獲得所有下載的圖片(在所有圖片可用之前會一直阻塞)final List<ImageData> imageDatas = images.get();// 渲染圖片imageDatas.forEach(this::renderImage); } catch (InterruptedException e) {// 重新維護線程的中斷狀態(tài)Thread.currentThread().interrupt();// 我們不需要結果,所以取消任務images.cancel(true); } catch (ExecutionException e) {throw launderThrowable(e.getCause()); } }程序清單1:使用Future等待所有的圖片下載完成
為了讓這個樣例及其后面的變種易于管理,這里有一個前提條件:我們假設類型ImageInfo(簡單來講,就是一個URL)和ImageData(圖片的二進制數(shù)據(jù))以及方法scanForImageInfo、downloadImage、renderText、renderImage、launderThrowable和ImageInfo.downloadImage都已經(jīng)存在了。實例變量executor是通過ExecutorService類型聲明的并進行了恰當?shù)某跏蓟T诒疚闹?#xff0c;我將JCiP中最初的樣例利用Java 8 lambda表達式和流進行了現(xiàn)代化。
在這段代碼中,必須要等待所有下載都完成的原因在于,它使用Future接口來代表下載任務,作為異步執(zhí)行的任務模型,它有很大的局限性。Future允許客戶端詢問任務執(zhí)行的結果,如果必要的話,將會產(chǎn)生阻塞,另外還可以詢問任務的狀態(tài),判斷它已經(jīng)完成還是被取消了。但是,Future本身并不能提供回調方法,假設能夠這樣做的話,當每個圖片下載完成的時候,就能通知頁面的渲染線程了。
程序清單2改善了之前樣例中粒度較粗的問題,它將頁面下載的任務提交到了CompletionService類中,這個類的poll和take方法會產(chǎn)生對應的Future實例,這些實例是按照任務完成的順序排列的,而不是像程序清單1那樣任務是按照提交的順序處理的。在ExecutorCompletionService接口的平臺實現(xiàn)中,為了實現(xiàn)該功能,每項任務都會包裝在一個FutureTask中,FutureTask是Future的一個實現(xiàn),它允許提供完成時的回調。Future的回調行為是在ExecutorCompletionService中創(chuàng)建的,完成的任務會封裝到一個隊列中,供客戶端詢問時使用。
public void renderPage(CharSequence source) { List<ImageInfo> info = scanForImageInfo(source); CompletionService<ImageData> completionService = new ExecutorCompletionService<>(executor); // 將每個下載任務提交到completion service中info.forEach(imageInfo -> completionService.submit(imageInfo::downloadImage)); renderText(source); // 當每個RunnableFuture可用時(并且我們也準備處理它的時候),// 將它們檢索出來 for (int t = 0; t < info.size(); t++) { Future<ImageData> imageFuture = completionService.take(); renderImage(imageFuture.get()); } }程序清單2:借助CompletionService,當圖片可用時立即將其渲染出來(為了保持簡潔性,省略掉了中斷和錯誤處理的代碼)
CompletableFuture簡介
程序清單2代表了Java 5所能達到的水準,不過2014年之后,在Java中,編寫異步系統(tǒng)的表現(xiàn)性得到了巨大的提升,這是通過引入CompletableFuture (CF)類實現(xiàn)的。這個類是Future的實現(xiàn),它能夠將回調放到與任務不同的線程中執(zhí)行,也能將回調作為繼續(xù)執(zhí)行的同步函數(shù),在與任務相同的線程中執(zhí)行。它避免了傳統(tǒng)回調最大的問題,那就是能夠將控制流分離到不同的事件處理器中,而這是通過允許CF實例與回調方法進行組合形成新的CF來實現(xiàn)的。
作為樣例,可以參考thenAccept方法,它接受一個?Consumer(用戶提供的且沒有返回值的函數(shù))并返回一個新的CF。這個新CF所能達到的效果就是在最初CF完成時所得到的結果上,運用Consumer。與很多其他的CF方法類似,thenAccept有兩個變種形式,在第一個中,Consumer會由通用fork/join池中的某一個線程來執(zhí)行;在第二個中,它會由Executor中的某一個線程來負責執(zhí)行,而Executor是我們在調用時提供的。這形成了三種重載形式:同步運行、在ForkJoinPool中異步運行以及在調用時所提供的線程池中異步運行,CompletableFuture中有近60個方法,上述的這三種重載形式占了絕大多數(shù)。
如下是thenAccept的一個樣例,借助它重新實現(xiàn)了頁面渲染器的功能:
public void renderPage(CharSequence source) { List<ImageInfo> info = scanForImageInfo(source); info.forEach(imageInfo -> CompletableFuture .supplyAsync(imageInfo::downloadImage) .thenAccept(this::renderImage)); renderText(source); }程序清單3:使用CompletableFuture來實現(xiàn)頁面渲染功能
盡管程序清單3比前面的形式更加簡潔,但是我們需要練習一下才能更好地閱讀它。工廠方法supplyAsync返回一個新的CF,它會在通用的?ForkJoinPool中運行指定的Supplier,完成時,Supplier的結果將會作為CF的結果。方法thenAccept會返回一個新的CF,它將會執(zhí)行指定的Consumer,在本例中也就是渲染給定的圖片,即supplyAsync方法所產(chǎn)生的CF的結果。
需要澄清的是,thenAccept并不是將CF與函數(shù)組合起來的唯一方式。將CF與函數(shù)組合起來可以接受如下的參數(shù):
- 應用于CF操作結果的函數(shù)。此時,可以采用的方法包括: - thenCompose:針對返回值為CompletableFuture的函數(shù);
- thenApply:針對返回值為其他類型的函數(shù);
- thenAccept:針對返回值為void的函數(shù);
 
- Runnable。通過thenRun方法,可以接受Runnable參數(shù);
- 函數(shù)在處理的過程中,可能正常結束也可能異常退出。CF能夠通過方法來分別組合這兩種情況: - handle,針對接受一個值和一個Throwable,并有返回值的函數(shù);
- whenComplete,針對接受一個值和一個Throwable,并返回void的函數(shù)。
 
擴展頁面渲染器
擴展該樣例能夠闡述CompletableFuture的其他特性。比如,當圖片下載超時或失敗時,我們想使用一個圖標作為可見的指示器。CF暴露了一個名為get(long, TimeUnit)的方法,如果?CF在指定的時間內沒有完成的話,將會拋出TimeoutException異常。我們可以使用它來定義一個函數(shù),這個函數(shù)會將?ImageInfo轉換為ImageData?(程序清單4)。
Function<ImageInfo, ImageData> infoToData = imageInfo -> { CompletableFuture<ImageData> imageDataFuture = CompletableFuture.supplyAsync(imageInfo::downloadImage, executor); try { return imageDataFuture.get(5, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); imageDataFuture.cancel(true); return ImageData.createIcon(e); } catch (ExecutionException e) { throw launderThrowable(e.getCause()); } catch (TimeoutException e) { return ImageData.createIcon(e); } }程序清單4:使用CompletableFuture.get來實現(xiàn)超時
現(xiàn)在,頁面可以通過連續(xù)調用infoToData來進行渲染。其中每個調用都會同步返回一個下載的圖片,所以要并行下載的話,需要為它們各自創(chuàng)建一個新的異步任務。要實現(xiàn)這一功能,合適的工廠方法是CompletableFuture.runAsync(),它與supplyAsync類似,但是接受的參數(shù)是Runnable而不是Supplier:
public void renderPage(CharSequence source) throws InterruptedException { List<ImageInfo> info = scanForImageInfo(source); info.forEach(imageInfo -> CompletableFuture.runAsync(() -> renderImage(infoToData.apply(imageInfo)), executor)); }現(xiàn)在,我們考慮進一步的需求,當所有的請求完成或超時后,在頁面上顯示一個指示器,如果對應的所有CompletableFuture都從join方法中返回,就能表示出現(xiàn)了這種場景。靜態(tài)方法allOf就是為這種需求而提供的,它能夠創(chuàng)建一個返回值為空的CompletableFuture,當其所有的組件均完成時,它也會達到完成狀態(tài)。(join方法通常用來返回某個CF的結果,為了查看allOf方法所組合起來的所有CF的結果,必須要對其進行單獨地查詢。)
public void renderPage(CharSequence source) { List<ImageInfo> info = scanForImageInfo(source); CompletableFuture[] cfs = info.stream() .map(ii -> CompletableFuture.runAsync( () -> renderImage(mapper.apply(ii)), executor)) .toArray(CompletableFuture[]::new); CompletableFuture.allOf(cfs).join(); renderImage(ImageData.createDoneIcon()); }聯(lián)合多個CompletableFuture
另外一組方法允許將多個CF聯(lián)合在一起。我們已經(jīng)看見過靜態(tài)方法allOf,當其所有的組件均完成時,它就會處于完成狀態(tài),與之對應的方法也就是anyOf,返回值同樣是void,當其任意一個組件完成時,它就會完成。除了這兩個方法以外,這個組中其他的方法都是實例方法,它們能夠將receiver按照某種方式與另外一個CF聯(lián)合在一起,然后將結果傳遞到給定的函數(shù)中。
為了展現(xiàn)它們是如何運行的,我們擴展一下JCiP中的另一個例子,這是一個旅行預訂的門戶,我們將互相關聯(lián)的訂購過程記錄在TripPlan對象中,它包含了總價以及所使用服務供應商的列表:
interface TripPlan { List<ServiceSupplier> getSuppliers(); int getPrice(); TripPlan combine(TripPlan); }ServiceSupplier(比如說某個航線或酒店)能夠創(chuàng)建一個TripPlan:(當然,在現(xiàn)實中,ServiceSupplier.createPlan?將會接受參數(shù),來反映對應的目的地、旅行等級等信息。)
interface ServiceSupplier { TripPlan createPlan(); String getAlliance(); // 稍后使用 }為了選擇最佳的旅行計劃,需要查詢每個服務供應商為我們的旅行所給定的規(guī)劃,然后使用Comparator來對比每個規(guī)劃結果,這個Comparator反映了我們的選擇標準(在本例中,只是簡單的選擇價格最低者):
TripPlan selectBestTripPlan(List<ServiceSupplier> serviceList) { List<CompletableFuture<TripPlan>> tripPlanFutures = serviceList.stream() .map(svc -> CompletableFuture.supplyAsync(svc::createPlan, executor)) .collect(toList()); return tripPlanFutures.stream() .min(Comparator.comparing(cf -> cf.join().getPrice())) .get().join(); }請注意中間的collect操作,在流處理里面,由于中間操作的延遲性(laziness of intermediate operation),它就變得非常必要了。如果沒有它的話,流的終止操作(terminal operation)將會是min,它如果要執(zhí)行的話,首先需要針對tripPlanFutures的每個元素執(zhí)行join操作。如上述的代碼所示,我們并沒有這樣做,終止操作是collect,它會將map操作所形成的CF值累積起來,這個過程中沒有阻塞,因此允許底層的任務并發(fā)執(zhí)行。
如果獲取航線和酒店最佳旅行計劃的任務是獨立的,那么我們會希望它們能夠同時初始化,就像前文所述的圖片下載一樣。要將兩個CF按照這種方式聯(lián)合在一起,我們需要使用CompletableFuture.thenCombine方法,它會并行地執(zhí)行receiver以及所提供的CF,然后將它們的結果使用給定的函數(shù)組合起來(在這里,假設變量?airlines、hotels和(稍后使用的)cars都是以List<TravelService>?類型進行聲明的,并且已經(jīng)進行了恰當?shù)某跏蓟?#xff09;:
CompletableFuture .supplyAsync(() -> selectBestTripPlan(airlines)) .thenCombine( CompletableFuture.supplyAsync(() -> selectBestTripPlan(hotels)), TripPlan::combine);對這個樣例進行擴展,我們將會學到更多的內容。假設每個服務供應商都屬于某一個旅行聯(lián)盟(travel alliance),通過String類型的屬性alliance來表示。在獨立訂購完航線和酒店后,我們將會確定它們是否屬于同一個聯(lián)盟,如果是的話,那么只有屬于同一聯(lián)盟的租車服務,才在我們的考慮范圍之內:
private TripPlan addCarHire(TripPlan p) { List<String> alliances = p.getSuppliers().stream() .map(ServiceSupplier::getAlliance) .distinct() .collect(toList()); if (alliances.size() == 1) { return p.combine(selectBestTripPlan(cars, alliances.get(0))); } else { return p.combine(selectBestTripPlan(cars)); } }selectBestTripPlan方法新的重載形式將會接受一個String類型作為偏愛的聯(lián)盟,如果這個值存在的話,會使用它來過濾流中的服務:
private TripPlan selectBestTripPlan( List<ServiceSupplier> serviceList, String favoredAlliance) { List<CompletableFuture<TripPlan>> tripPlanFutures = serviceList.stream() .filter(ts -> favoredAlliance == null || ts.getAlliance().equals(favoredAlliance)) .map(svc -> CompletableFuture.supplyAsync(svc::createPlan, executor)) .collect(toList()); ... }在本例中,選擇租車服務的CF要依賴于航班和酒店預訂任務組合所形成的CF。只有航班和酒店都預訂之后,它才能完成。實現(xiàn)這種關聯(lián)關系的方法就是thenCompose:
CompletableFuture.supplyAsync(() -> selectBestTripPlan(airlines)) .thenCombine( CompletableFuture.supplyAsync(() -> selectBestTripPlan(hotels)), TripPlan::combine) .thenCompose(p -> CompletableFuture.supplyAsync(() -> addCarHire(p)));預訂航班和酒店聯(lián)合形成的CF會執(zhí)行,并且它的結果,也就是聯(lián)合后的TripPlan,將會作為thenCompose函數(shù)參數(shù)的輸入。結果形成的CF非常簡潔地封裝了不同異步服務之間的依賴關系。這段代碼如此簡潔的原因在于,盡管thenCompose聯(lián)合了兩個CF,但是它所返回的并不是我們預期的CompletableFuture<CompletableFuture<TripPlan>>,而是CompletableFuture<TripPlan>。所以,不管在創(chuàng)建CF的時候使用了多少層級的組合,它并不是嵌套的,而是扁平的,要獲取它的結果只需要一步操作。這是monad“綁定(bind)”操作(這個名稱來源于Haskell)的特性,CF就是這種monad,并且闡明了monad一些非常積極的特征:比如,在本例中,我們能夠按照函數(shù)式的形式進行編寫,如果沒有這項功能的話,就需要在各個回調中非常繁瑣地顯式編寫任務定義。
thenCombine方法只是將兩個CF聯(lián)合起來的方法之一,其他的方法包括:
- thenAcceptBoth:與thenCombine類似,但是它接受一個返回值為void的函數(shù);
- runAfterBoth:接受一個?Runnable,在兩個CF都完成后執(zhí)行;
- applyToEither:接受一個一元函數(shù)(unary function),會將首先完成的CF的結果提供給它;
- acceptEither:與applyToEither類似,接受一個一元函數(shù),但是結果為void;
- runAfterEither:接受一個Runnable,在其中一個CF完成后就執(zhí)行。
結論
我們不可能在一篇短文中,完整地闡述像CompletableFuture這樣的API,但是我希望這里的樣例能夠讓你對它所能實現(xiàn)的并發(fā)編程形式有一個直觀印象。將CompletableFuture與其他CompletableFuture組合,以及與其他函數(shù)組合,能夠為多項任務構建類似流水線的方案,這樣能夠控制同步和異步執(zhí)行以及它們之間的依賴。你想更加詳細了解的內容可能會包括異常處理、選擇和配置executor的實際經(jīng)驗以及設計異步API所面臨的挑戰(zhàn)。
我希望已經(jīng)解釋清楚了Java 8所提供的兩種異步編程風格之間的聯(lián)系。在使用fork/join并行機制(包括并行流)的場景中,能夠非常高效的將工作內容進行跨核心分發(fā)。但是,它的適用條件卻非常有限:數(shù)據(jù)集很大并且能夠高效地分割,對某個數(shù)據(jù)元素的操作與其他元素是(相對)獨立的,這些操作的成本應該是比較高昂的,并且應該是CPU密集型的。如果這些條件無法滿足的話,尤其是如果你的任務會花費很多時間阻塞在I/O或網(wǎng)絡請求上的話,那么?CompletableFuture是更好的替代方案。作為Java程序員,我們非常幸運地有這樣一個平臺庫,它將這些補充的方式集成在了一起。
轉載于:https://www.cnblogs.com/kexianting/p/8692437.html
總結
以上是生活随笔為你收集整理的Java8新特性--CompletableFuture的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: weui-react项目实战新心得
- 下一篇: 三元表达式,递归,匿名函数,内置函数
