Java并发编程笔记之FutureTask源码分析
FutureTask可用于異步獲取執(zhí)行結(jié)果或取消執(zhí)行任務(wù)的場景。通過傳入Runnable或者Callable的任務(wù)給FutureTask,直接調(diào)用其run方法或者放入線程池執(zhí)行,之后可以在外部通過FutureTask的get方法異步獲取執(zhí)行結(jié)果,因此,FutureTask非常適合用于耗時(shí)的計(jì)算,主線程可以在完成自己的任務(wù)后,再去獲取結(jié)果。另外,FutureTask還可以確保即使調(diào)用了多次run方法,它都只會(huì)執(zhí)行一次Runnable或者Callable任務(wù),或者通過cancel取消FutureTask的執(zhí)行等。
類圖結(jié)構(gòu)如下所示:
線程池使用 FutureTask 時(shí)候需要注意的一點(diǎn)事,FutureTask 使用不當(dāng)可能會(huì)造成調(diào)用線程一直阻塞,如何避免?
線程池使用 FutureTask 的時(shí)候如果拒絕策略設(shè)置為了?DiscardPolicy和DiscardOldestPolicy并且在被拒絕的任務(wù)的 Future 對(duì)象上調(diào)用無參 get 方法那么調(diào)用線程會(huì)一直被阻塞。
下面先通過一個(gè)簡單的例子來復(fù)現(xiàn)問題,代碼如下:
?
public class FutureTest {//(1)線程池單個(gè)線程,線程池隊(duì)列元素個(gè)數(shù)為1private final static ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES,new ArrayBlockingQueue<Runnable>(1),new ThreadPoolExecutor.DiscardPolicy());public static void main(String[] args) throws Exception {//(2)添加任務(wù)oneFuture futureOne = executorService.submit(new Runnable() {@Overridepublic void run() {System.out.println("start runable one");try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}}});//(3)添加任務(wù)twoFuture futureTwo = executorService.submit(new Runnable() {@Overridepublic void run() {System.out.println("start runable two");}});//(4)添加任務(wù)threeFuture futureThree=null;try {futureThree = executorService.submit(new Runnable() {@Overridepublic void run() {System.out.println("start runable three");}});} catch (Exception e) {System.out.println(e.getLocalizedMessage());}System.out.println("task one " + futureOne.get());//(5)等待任務(wù)one執(zhí)行完畢System.out.println("task two " + futureTwo.get());//(6)等待任務(wù)two執(zhí)行完畢System.out.println("task three " + (futureThree==null?null:futureThree.get()));// (7)等待任務(wù)three執(zhí)行完畢executorService.shutdown();//(8)關(guān)閉線程池,阻塞直到所有任務(wù)執(zhí)行完畢}
?
?
運(yùn)行結(jié)果如下:
代碼 (1) 創(chuàng)建了一個(gè)單線程并且隊(duì)列元素個(gè)數(shù)為 1 的線程池,并且拒絕策略設(shè)置為了DiscardPolicy
代碼(2)向線程池提交了一個(gè)任務(wù) one,那么這個(gè)任務(wù)會(huì)使用唯一的一個(gè)線程進(jìn)行執(zhí)行,任務(wù)在打印?start runable one后會(huì)阻塞該線程 5s.
代碼(3)向線程池提交了一個(gè)任務(wù) two,這時(shí)候會(huì)把任務(wù) two 放入到阻塞隊(duì)列
代碼(4)向線程池提交任務(wù) three,由于隊(duì)列已經(jīng)滿了則會(huì)觸發(fā)拒絕策略丟棄任務(wù) three, 從執(zhí)行結(jié)果看在任務(wù) one 阻塞的 5s 內(nèi),主線程執(zhí)行到了代碼 (5) 等待任務(wù) one 執(zhí)行完畢,當(dāng)任務(wù) one 執(zhí)行完畢后代碼(5)返回,主線程打印出 task one null。任務(wù) one 執(zhí)行完成后線程池的唯一線程會(huì)去隊(duì)列里面取出任務(wù) two 并執(zhí)行所以輸出 start runable two 然后代碼(6)會(huì)返回,這時(shí)候主線程輸出 task two null,然后執(zhí)行代碼(7)等待任務(wù) three 執(zhí)行完畢,從執(zhí)行結(jié)果看代碼(7)會(huì)一直阻塞不會(huì)返回,至此問題產(chǎn)生,如果把拒絕策略修改為 DiscardOldestPolicy 也會(huì)存在有一個(gè)任務(wù)的 get 方法一直阻塞只是現(xiàn)在是任務(wù) two 被阻塞。但是如果拒絕策略設(shè)置為默認(rèn)的 AbortPolicy 則會(huì)正常返回,并且會(huì)輸出如下結(jié)果:
?
要分析這個(gè)問題需要看下線程池的 submit 方法里面做了什么,submit 方法源碼如下:
?
public Future<?> submit(Runnable task) {...//(1)裝飾Runnable為Future對(duì)象RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);//(6)返回future對(duì)象return ftask; }protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {return new FutureTask<T>(runnable, value);}public void execute(Runnable command) {...//(2) 如果線程個(gè)數(shù)消息核心線程數(shù)則新增處理線程處理int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}//(3)如果當(dāng)前線程個(gè)數(shù)已經(jīng)達(dá)到核心線程數(shù)則任務(wù)放入隊(duì)列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}//(4)嘗試新增處理線程進(jìn)行處理else if (!addWorker(command, false))reject(command);//(5)新增失敗則調(diào)用拒絕策略 }
?
?
代碼(1)裝飾 Runnable 為 FutureTask 對(duì)象,然后調(diào)用線程池的 execute 方法。
代碼 (2) 如果線程個(gè)數(shù)消息核心線程數(shù)則新增處理線程處理
代碼(3)如果當(dāng)前線程個(gè)數(shù)已經(jīng)達(dá)到核心線程數(shù)則任務(wù)放入隊(duì)列
代碼(4)嘗試新增處理線程進(jìn)行處理,失敗則進(jìn)行代碼(5),否者直接使用新線程處理
代碼(5)執(zhí)行具體拒絕策略,從這里也可以看出拒絕策略執(zhí)行是使用的業(yè)務(wù)線程。
所以要分析上面例子中問題所在只需要看步驟(5)對(duì)被拒絕任務(wù)的影響,這里先看下拒絕策略 DiscardPolicy 的源碼,如下:
?
public static class DiscardPolicy implements RejectedExecutionHandler {public DiscardPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}}
?
?
可知拒絕策略 rejectedExecution 方法里面什么都沒做,所以代碼(4)調(diào)用 submit 后會(huì)返回一個(gè) future 對(duì)象,這里有必要在重新說 future 是有狀態(tài)的,FutureTask 內(nèi)部有一個(gè)state用來展示任務(wù)的狀態(tài),并且是volatile修飾的,future 的狀態(tài)枚舉值如下:
?
/** Possible state transitions:* NEW -> COMPLETING -> NORMAL 正常的狀態(tài)轉(zhuǎn)移* NEW -> COMPLETING -> EXCEPTIONAL 異常* NEW -> CANCELLED 取消* NEW -> INTERRUPTING -> INTERRUPTED 中斷*/private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
?
?
在代碼(1)的時(shí)候使用 newTaskFor 方法轉(zhuǎn)換 Runnable 任務(wù)為 FutureTask,而 FutureTask 的構(gòu)造函數(shù)里面設(shè)置的狀態(tài)就是 New。FutureTask的構(gòu)造函數(shù)源碼如下:
?
public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW; // ensure visibility of callable }?
?
把FutureTask提交到線程池或者線程執(zhí)行start時(shí)候會(huì)調(diào)用run方法,源碼如下:
?
public void run() {//如果當(dāng)前不是new狀態(tài),或者當(dāng)前cas設(shè)置當(dāng)前線程失敗則返回,只有一個(gè)線程可以成功。if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {//當(dāng)前狀態(tài)為new 則調(diào)用任務(wù)的call方法執(zhí)行任務(wù)Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);//完成NEW -> COMPLETING -> EXCEPTIONAL 狀態(tài)轉(zhuǎn)移}//執(zhí)行任務(wù)成功則保存結(jié)果更新狀態(tài),unpark所有等待線程。if (ran)set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);} }protected void set(V v) {//狀態(tài)從new->COMPLETINGif (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;//狀態(tài)從COMPLETING-》NORMALUNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state//unpark所有等待線程。finishCompletion();} }?
?
所以使用 DiscardPolicy 策略提交任務(wù)后返回了一個(gè)狀態(tài)值為NEW的future對(duì)象。那么我們下面就要看下當(dāng)future的無參get()方法的時(shí)候,future變?yōu)槭裁礌顟B(tài)才會(huì)返回,這時(shí)候就要看一下FutureTask的get方法的源碼,源碼如下:
?
public V get() throws InterruptedException, ExecutionException {int s = state;//當(dāng)狀態(tài)值<=COMPLETING時(shí)候需要等待,否者調(diào)用report返回if (s <= COMPLETING)s = awaitDone(false, 0L);return report(s);}private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;for (;;) {//如果被中斷,則拋異常if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}//組建單列表int s = state;if (s > COMPLETING) {if (q != null)q.thread = null;return s;}else if (s == COMPLETING) // cannot time out yetThread.yield();else if (q == null)q = new WaitNode();else if (!queued)queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);else if (timed) {nanos = deadline - System.nanoTime();//超時(shí)則返回if (nanos <= 0L) {removeWaiter(q);return state;}//否者設(shè)置park超時(shí)時(shí)間LockSupport.parkNanos(this, nanos);}else//直接掛起當(dāng)前線程LockSupport.park(this);}}private V report(int s) throws ExecutionException {Object x = outcome;//狀態(tài)值為NORMAL正常返回if (s == NORMAL)return (V)x;//狀態(tài)值大于等于CANCELLED則拋異常if (s >= CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);}
?
?
也就是說當(dāng) future 的狀態(tài) > COMPLETING 時(shí)候調(diào)用 get 方法才會(huì)返回,而明顯 DiscardPolicy 策略在拒絕元素的時(shí)候并沒有設(shè)置該 future 的狀態(tài),后面也沒有其他機(jī)會(huì)可以設(shè)置該 future 的狀態(tài),所以 future 的狀態(tài)一直是 NEW,所以一直不會(huì)返回,同理 DiscardOldestPolicy 策略也是這樣的問題,最老的任務(wù)被淘汰時(shí)候沒有設(shè)置被淘汰任務(wù)對(duì)于 future 的狀態(tài)。、
在submit任務(wù)后還可以調(diào)用futuretask的cancel來取消任務(wù):
?
public boolean cancel(boolean mayInterruptIfRunning) {//只有任務(wù)是new的才能取消if (state != NEW)return false;//運(yùn)行時(shí)允許中斷if (mayInterruptIfRunning) {//完成new->INTERRUPTINGif (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))return false;Thread t = runner;if (t != null)t.interrupt();//完成INTERRUPTING->INTERRUPTEDUNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state}//不允許中斷則直接new->CANCELLEDelse if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))return false;finishCompletion();return true; }??
?
那么默認(rèn)的 AbortPolicy 策略為啥沒問題呢?
也就是說當(dāng) future 的狀態(tài) > COMPLETING 時(shí)候調(diào)用 get 方法才會(huì)返回,而明顯 DiscardPolicy 策略在拒絕元素的時(shí)候并沒有設(shè)置該 future 的狀態(tài),后面也沒有其他機(jī)會(huì)可以設(shè)置該 future 的狀態(tài),所以 future 的狀態(tài)一直是 NEW,所以一直不會(huì)返回,同理 DiscardOldestPolicy 策略也是這樣的問題,最老的任務(wù)被淘汰時(shí)候沒有設(shè)置被淘汰任務(wù)對(duì)于 future 的狀態(tài)。
所以當(dāng)使用 Future 的時(shí)候,盡量使用帶超時(shí)時(shí)間的 get 方法,這樣即使使用了 DiscardPolicy 拒絕策略也不至于一直等待,等待超時(shí)時(shí)間到了會(huì)自動(dòng)返回的,如果非要使用不帶參數(shù)的 get 方法則可以重寫 DiscardPolicy 的拒絕策略在執(zhí)行策略時(shí)候設(shè)置該 Future 的狀態(tài)大于 COMPLETING 即可,但是查看 FutureTask 提供的方法發(fā)現(xiàn)只有 cancel 方法是 public 的并且可以設(shè)置 FutureTask 的狀態(tài)大于 COMPLETING,重寫拒絕策略具體代碼可以如下:
?
/*** Created by cong on 2018/7/13.*/ public class MyRejectedExecutionHandler implements RejectedExecutionHandler {public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {if (!threadPoolExecutor.isShutdown()) {if(null != runnable && runnable instanceof FutureTask){((FutureTask) runnable).cancel(true);}}} }?
使用這個(gè)策略時(shí)候由于從 report 方法知道在 cancel 的任務(wù)上調(diào)用 get() 方法會(huì)拋出異常所以代碼(7)需要使用 try-catch 捕獲異常代碼(7)修改為如下:
?
package com.hjc;import java.util.concurrent.*;/*** Created by cong on 2018/7/13.*/ public class FutureTest {//(1)線程池單個(gè)線程,線程池隊(duì)列元素個(gè)數(shù)為1private final static ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 1L,TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(1), new MyRejectedExecutionHandler());public static void main(String[] args) throws Exception {//(2)添加任務(wù)oneFuture futureOne = executorService.submit(new Runnable() {public void run() {System.out.println("start runable one");try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}}});//(3)添加任務(wù)twoFuture futureTwo = executorService.submit(new Runnable() {public void run() {System.out.println("start runable two");}});//(4)添加任務(wù)threeFuture futureThree = null;try {futureThree = executorService.submit(new Runnable() {public void run() {System.out.println("start runable three");}});} catch (Exception e) {System.out.println(e.getLocalizedMessage());}System.out.println("task one " + futureOne.get());//(5)等待任務(wù)one執(zhí)行完畢System.out.println("task two " + futureTwo.get());//(6)等待任務(wù)two執(zhí)行完畢try{System.out.println("task three " + (futureThree==null?null:futureThree.get()));// (7)等待任務(wù)three}catch(Exception e){System.out.println(e.getLocalizedMessage());}executorService.shutdown();//(8)關(guān)閉線程池,阻塞直到所有任務(wù)執(zhí)行完畢} }?
?
運(yùn)行結(jié)果如下:
當(dāng)然這相比正常情況下多了一個(gè)異常捕獲,其實(shí)最好的情況是重寫拒絕策略時(shí)候設(shè)置 FutureTask 的狀態(tài)為 NORMAL,但是這需要重寫 FutureTask 方法了,因?yàn)?FutureTask 并沒有提供接口進(jìn)行設(shè)置。
作者:狂小白
閱讀原文
本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
以上是生活随笔為你收集整理的Java并发编程笔记之FutureTask源码分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 五位专家跟你讲讲为啥Python更适合做
- 下一篇: 一张图看懂阿里云新发布的物联网设备上云神