“既生 ExecutorService, 何生 CompletionService?”
ExecutorService VS CompletionService
假設我們有 4 個任務(A, B, C, D)用來執行復雜的計算,每個任務的執行時間隨著輸入參數的不同而不同,如果將任務提交到 ExecutorService, 相信你已經可以“信手拈來”
ExecutorService?executorService?=?Executors.newFixedThreadPool(4); List<Future>?futures?=?new?ArrayList<Future<Integer>>(); futures.add(executorService.submit(A)); futures.add(executorService.submit(B)); futures.add(executorService.submit(C)); futures.add(executorService.submit(D));//?遍歷?Future?list,通過?get()?方法獲取每個?future?結果 for?(Future?future:futures)?{Integer?result?=?future.get();//?其他業務邏輯 }先直入主題,用 CompletionService 實現同樣的場景
ExecutorService?executorService?=?Executors.newFixedThreadPool(4);//?ExecutorCompletionService?是?CompletionService?唯一實現類 CompletionService?executorCompletionService=?new?ExecutorCompletionService<>(executorService?);List<Future>?futures?=?new?ArrayList<Future<Integer>>(); futures.add(executorCompletionService.submit(A)); futures.add(executorCompletionService.submit(B)); futures.add(executorCompletionService.submit(C)); futures.add(executorCompletionService.submit(D));//?遍歷?Future?list,通過?get()?方法獲取每個?future?結果 for?(int?i=0;?i<futures.size();?i++)?{Integer?result?=?executorCompletionService.take().get();//?其他業務邏輯 }兩種方式在代碼實現上幾乎一毛一樣,我們曾經說過 JDK 中不會重復造輪子,如果要造一個新輪子,必定是原有的輪子在某些場景的使用上有致命缺陷
既然新輪子出來了,二者到底有啥不同呢?
如果 Future 結果沒有完成,調用 get() 方法,程序會阻塞在那里,直至獲取返回結果
先來看第一種實現方式,假設任務 A 由于參數原因,執行時間相對任務 B,C,D 都要長很多,但是按照程序的執行順序,程序在 get() 任務 A 的執行結果會阻塞在那里,導致任務 B,C,D 的后續任務沒辦法執行。又因為每個任務執行時間是不固定的,所以無論怎樣調整將任務放到 List 的順序,都不合適,這就是致命弊端
新輪子自然要解決這個問題,它的設計理念就是哪個任務先執行完成,get() 方法就會獲取到相應的任務結果,這么做的好處是什么呢?來看個圖你就瞬間理解了
兩張圖一對比,執行時長高下立判了,在當今高并發的時代,這點時間差,在吞吐量上起到的效果可能不是一點半點了
那 CompletionService 是怎么做到獲取最先執行完的任務結果的呢?
?
遠看CompletionService 輪廓
如果你使用過消息隊列,你應該秒懂我要說什么了,CompletionService 實現原理很簡單
就是一個將異步任務的生產和任務完成結果的消費解耦的服務
用人話解釋一下上面的抽象概念我只能再畫一張圖了
說白了,哪個任務執行的完,就直接將執行結果放到隊列中,這樣消費者拿到的結果自然就是最早拿到的那個了
從上圖中看到,有任務,有結果隊列,那 CompletionService 自然也要圍繞著幾個關鍵字做文章了
既然是異步任務,那自然可能用到 Runnable 或 Callable
既然能獲取到結果,自然也會用到 Future 了
帶著這些線索,我們走進 CompletionService 源碼看一看
?
近看 CompletionService 源碼
CompletionService ?是一個接口,它簡單的只有 5 個方法:
Future<V>?submit(Callable<V>?task); Future<V>?submit(Runnable?task,?V?result); Future<V>?take()?throws?InterruptedException; Future<V>?poll(); Future<V>?poll(long?timeout,?TimeUnit?unit)?throws?InterruptedException;關于 2 個 submit 方法, 我在 不會用Java Future,我懷疑你泡茶沒我快 文章中做了非常詳細的分析以及案例使用說明,這里不再過多贅述
另外 3 個方法都是從阻塞隊列中獲取并移除阻塞隊列第一個元素,只不過他們的功能略有不同
Take: 如果隊列為空,那么調用 take() 方法的線程會被阻塞
Poll: 如果隊列為空,那么調用 poll() 方法的線程會返回 null
Poll-timeout: 以超時的方式獲取并移除阻塞隊列中的第一個元素,如果超時時間到,隊列還是空,那么該方法會返回 null
所以說,按大類劃分上面5個方法,其實就是兩個功能
提交異步任務 (submit)
從隊列中拿取并移除第一個元素 (take/poll)
CompletionService 只是接口,ExecutorCompletionService 是該接口的唯一實現類
ExecutorCompletionService 源碼分析
先來看一下類結構, 實現類里面并沒有多少內容
ExecutorCompletionService 有兩種構造函數:
private?final?Executor?executor; private?final?AbstractExecutorService?aes; private?final?BlockingQueue<Future<V>>?completionQueue;public?ExecutorCompletionService(Executor?executor)?{if?(executor?==?null)throw?new?NullPointerException();this.executor?=?executor;this.aes?=?(executor?instanceof?AbstractExecutorService)??(AbstractExecutorService)?executor?:?null;this.completionQueue?=?new?LinkedBlockingQueue<Future<V>>(); }public?ExecutorCompletionService(Executor?executor,BlockingQueue<Future<V>>?completionQueue)?{if?(executor?==?null?||?completionQueue?==?null)throw?new?NullPointerException();this.executor?=?executor;this.aes?=?(executor?instanceof?AbstractExecutorService)??(AbstractExecutorService)?executor?:?null;this.completionQueue?=?completionQueue; }兩個構造函數都需要傳入一個 Executor 線程池,因為是處理異步任務的,我們是不被允許手動創建線程的,所以這里要使用線程池也就很好理解了
另外一個參數是 BlockingQueue,如果不傳該參數,就會默認隊列為 LinkedBlockingQueue,任務執行結果就是加入到這個阻塞隊列中的
所以要徹底理解 ExecutorCompletionService ,我們只需要知道一個問題的答案就可以了:
它是如何將異步任務結果放到這個阻塞隊列中的?
想知道這個問題的答案,那只需要看它提交任務之后都做了些什么?
public?Future<V>?submit(Callable<V>?task)?{if?(task?==?null)?throw?new?NullPointerException();RunnableFuture<V>?f?=?newTaskFor(task);executor.execute(new?QueueingFuture(f));return?f; }我們前面也分析過,execute 是提交 Runnable 類型的任務,本身得不到返回值,但又可以將執行結果放到阻塞隊列里面,所以肯定是在 QueueingFuture 里面做了文章
從上圖中看一看出,QueueingFuture 實現的接口非常多,所以說也就具備了相應的接口能力。
重中之重是,它繼承了 FutureTask ,FutureTask 重寫了 Runnable 的 run() 方法 (方法細節分析可以查看FutureTask源碼分析 ) 文中詳細說明了,無論是set() 正常結果,還是setException() 結果,都會調用 finishCompletion() 方法:
private?void?finishCompletion()?{//?assert?state?>?COMPLETING;for?(WaitNode?q;?(q?=?waiters)?!=?null;)?{if?(UNSAFE.compareAndSwapObject(this,?waitersOffset,?q,?null))?{for?(;;)?{Thread?t?=?q.thread;if?(t?!=?null)?{q.thread?=?null;LockSupport.unpark(t);}WaitNode?next?=?q.next;if?(next?==?null)break;q.next?=?null;?//?unlink?to?help?gcq?=?next;}break;}}//?重點?重點?重點done();callable?=?null;????????//?to?reduce?footprint }上述方法會執行 done() 方法,而 QueueingFuture 恰巧重寫了 FutureTask 的 done() 方法:
方法實現很簡單,就是將 task 放到阻塞隊列中
protected?void?done()?{?completionQueue.add(task);? }執行到此的 task 已經是前序步驟 set 過結果的 task,所以就可以通過消費阻塞隊列獲取相應的結果了
相信到這里,CompletionService 在你面前應該沒什么秘密可言了
?
CompletionService 的主要用途
在 JDK docs 上明確給了兩個例子來說明 CompletionService 的用途:
假設你有一組針對某個問題的solvers,每個都返回一個類型為Result的值,并且想要并發地運行它們,處理每個返回一個非空值的結果,在某些方法使用(Result r)
其實就是文中開頭的使用方式
?void?solve(Executor?e,Collection<Callable<Result>>?solvers)throws?InterruptedException,?ExecutionException?{CompletionService<Result>?ecs=?new?ExecutorCompletionService<Result>(e);for?(Callable<Result>?s?:?solvers)ecs.submit(s);int?n?=?solvers.size();for?(int?i?=?0;?i?<?n;?++i)?{Result?r?=?ecs.take().get();if?(r?!=?null)use(r);}}假設你想使用任務集的第一個非空結果,忽略任何遇到異常的任務,并在第一個任務準備好時取消所有其他任務
void?solve(Executor?e,Collection<Callable<Result>>?solvers)throws?InterruptedException?{CompletionService<Result>?ecs=?new?ExecutorCompletionService<Result>(e);int?n?=?solvers.size();List<Future<Result>>?futures=?new?ArrayList<Future<Result>>(n);Result?result?=?null;try?{for?(Callable<Result>?s?:?solvers)futures.add(ecs.submit(s));for?(int?i?=?0;?i?<?n;?++i)?{try?{Result?r?=?ecs.take().get();if?(r?!=?null)?{result?=?r;break;}}?catch?(ExecutionException?ignore)?{}}}finally?{for?(Future<Result>?f?:?futures)//?注意這里的參數給的是?true,詳解同樣在前序?Future?源碼分析文章中f.cancel(true);}if?(result?!=?null)use(result);}這兩種方式都是非常經典的 CompletionService 使用 范式 ,請大家仔細品味每一行代碼的用意
范式沒有說明 Executor 的使用,使用 ExecutorCompletionService,需要自己創建線程池,看上去雖然有些麻煩,但好處是你可以讓多個 ExecutorCompletionService 的線程池隔離,這種隔離性能避免幾個特別耗時的任務拖垮整個應用的風險 (這也是我們反復說過多次的,不要所有業務共用一個線程池)
?
總結
CompletionService 的應用場景還是非常多的,比如
Dubbo 中的 Forking Cluster
多倉庫文件/鏡像下載(從最近的服務中心下載后終止其他下載過程)
多服務調用(天氣預報服務,最先獲取到的結果)
CompletionService 不但能滿足獲取最快結果,還能起到一定 "load balancer" 作用,獲取可用服務的結果,使用也非常簡單, 只需要遵循范式即可
并發系列 講了這么多,分析源碼的過程也碰到各種隊列,接下來我們就看看那些讓人眼花繚亂的隊列?
?
靈魂追問
通常處理結果還會用異步方式進行處理,如果采用這種方式,有哪些注意事項?
如果是你,你會選擇使用無界隊列嗎?為什么?
有道無術,術可成;有術無道,止于術
歡迎大家關注Java之道公眾號
好文章,我在看??
總結
以上是生活随笔為你收集整理的“既生 ExecutorService, 何生 CompletionService?”的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Linux 29岁,你不知道的29个重大
- 下一篇: 实战项目---模拟商品采购中心信息平台