FutureTask源码
介紹
- FutureTask是一種異步任務(或異步計算),舉個栗子,主線程的邏輯中需要使用某個值,但這個值需要負責的運算得來,那么主線程可以提前建立一個異步任務來計算這個值(在其他的線程中計算),然后去做其他事情,當需要這個值的時候再通過剛才建立的異步任務來獲取這個值,有點并行的意思,這樣可以縮短整個主線程邏輯的執行時間。
- 與1.6版本不同,1.7的FutureTask不再基于AQS來構建,而是在內部采用簡單的Treiber Stack來保存等待線程。
接口
public interface Future<V> {//取消任務的執行。參數指定是否立即中斷任務執行,或者等等任務結束boolean cancel(boolean mayInterruptIfRunning);//任務是否已經取消,任務正常完成前將其取消,則返回 trueboolean isCancelled();//任務是否已經完成。需要注意的是如果任務正常終止、異?;蛉∠?#xff0c;都將返回trueboolean isDone();//等待任務執行結束,然后獲得V類型的結果。InterruptedException 線程被中斷異常, ExecutionException任務執行異常,如果任務被取消,還會拋出CancellationExceptionV get() throws InterruptedException, ExecutionException;//同上面的get功能一樣,多了設置超時時間。參數timeout指定超時時間,uint指定時間的單位,在枚舉類TimeUnit中有相關的定義。如果計算超時,將拋出TimeoutExceptionV get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException; } public interface RunnableFuture<V> extends Runnable, Future<V> {/*** Sets this Future to the result of its computation* unless it has been cancelled.*/void run(); }?源碼分析
運行過程
FutureTask常用方式:
1.創建任務,實際使用時,一般會結合線程池(ThreadPoolExecutor)使用,所以是在線程池內部創建FutureTask。
2.執行任務,一般會有由工作線程(對于我們當前線程來說的其他線程)調用FutureTask的run方法,完成執行。
3.獲取結果,一般會有我們的當前線程去調用get方法來獲取執行結果,如果獲取時,任務并沒有被執行完畢,當前線程就會被阻塞,直到任務被執行完畢,然后獲取結果。
4.取消任務,某些情況下會放棄任務的執行,進行任務取消。
內部結構
public class FutureTask<V> implements RunnableFuture<V> {/** * 內部狀態可能得遷轉過程: * NEW -> COMPLETING -> NORMAL //正常完成 * NEW -> COMPLETING -> EXCEPTIONAL //發生異常 * NEW -> CANCELLED //取消 * NEW -> INTERRUPTING -> INTERRUPTED //中斷 */ private volatile int state;private static final int NEW = 0;private static final int COMPLETING = 1;private static final int NORMAL = 2;private static final int EXCEPTIONAL = 3;private static final int CANCELLED = 4;private static final int INTERRUPTING = 5;private static final int INTERRUPTED = 6;/** 內部的callable,運行完成后設置為null */ private Callable<V> callable;/** 如果正常完成,就是執行結果,通過get方法獲取;如果發生異常,就是具體的異常對象,通過get方法拋出。 */ private Object outcome; // 本身沒有volatile修飾, 依賴state的讀寫來保證可見性。 /** 執行內部callable的線程。 */ private volatile Thread runner;/** 存放等待線程的Treiber Stack*/ private volatile WaitNode waiters;//所謂的Treiber Stack就是由WaitNode組成的(一個單向鏈表)。static final class WaitNode { volatile Thread thread; //指向block線程volatile WaitNode next; //下一個nodeWaitNode() { thread = Thread.currentThread(); } } }創建
public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW; // ensure visibility of callable}public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW; // ensure visibility of callable}//以下方法為Executors的方法public static <T> Callable<T> callable(Runnable task, T result) {if (task == null)throw new NullPointerException();return new RunnableAdapter<T>(task, result);}static final class RunnableAdapter<T> implements Callable<T> {final Runnable task;final T result;RunnableAdapter(Runnable task, T result) {this.task = task;this.result = result;}public T call() {task.run();return result;}}必須把state的寫放到最后,因為state本身由volatile修飾,所以可以保證callable的可見性。(因為后續讀callable之前會先讀state,還記得這個volatile寫讀的HappenBefore規則吧)
狀態
/** * 內部狀態可能得遷轉過程: * NEW -> COMPLETING -> NORMAL //正常完成 * NEW -> COMPLETING -> EXCEPTIONAL //發生異常 * NEW -> CANCELLED //取消 * NEW -> INTERRUPTING -> INTERRUPTED //中斷 */ public boolean isCancelled() {return state >= CANCELLED;}//只要不為NEW就表示結束public boolean isDone() {return state != NEW;}?
?
private V report(int s) throws ExecutionException {Object x = outcome;if (s == NORMAL)return (V)x;if (s >= CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);}protected void setException(Throwable t) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = t;UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final statefinishCompletion();}}get,set
public V get() throws InterruptedException, ExecutionException {int s = state;if (s <= COMPLETING)s = awaitDone(false, 0L);return report(s);}public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state;if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)throw new TimeoutException();return report(s);}protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion();}}get方法會block直到計算完成。awaitDone()方法:
private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;for (;;) {//中斷,則移除q,拋出IEif (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}int s = state;if (s > COMPLETING) {//處理完,返回,如果q!=null,則把線程解綁if (q != null)q.thread = null;return s;}else if (s == COMPLETING) // 任務正在執行中,COMPLETING是中間狀態。Thread.yield(); //釋放CPU//以下代碼:state == NEW,else if (q == null) // q == null,則創建一個WaitNode,綁定Threadq = new WaitNode();else if (!queued) //未入隊,則入隊queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);else if (timed) { //超時判斷nanos = deadline - System.nanoTime();if (nanos <= 0L) {removeWaiter(q);return state;}LockSupport.parkNanos(this, nanos);}elseLockSupport.park(this);}}private void removeWaiter(WaitNode node) {if (node != null) {node.thread = null;retry:for (;;) { // restart on removeWaiter racefor (WaitNode pred = null, q = waiters, s; q != null; q = s) {s = q.next;if (q.thread != null)pred = q;else if (pred != null) {pred.next = s;if (pred.thread == null) // check for racecontinue retry;}else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,q, s))continue retry;}break;}}} private void finishCompletion() {// assert state > COMPLETING;for (WaitNode q; (q = waiters) != null;) {if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {//喚醒線程Thread t = q.thread;if (t != null) {q.thread = null;LockSupport.unpark(t);}//繼續下一個waiterWaitNode next = q.next;if (next == null)break;q.next = null; // unlink to help gcq = next;}break;}}done();callable = null; // to reduce footprint}get方法總結:
1.首先檢查當前任務的狀態,如果狀態表示執行完成,進入第2步。
2.獲取執行結果,也可能得到取消或者執行異常,get過程結束。
3.如果當前任務狀態表示未執行或者正在執行,那么當前線程放入一個新建的等待節點,然后進入Treiber Stack進行阻塞等待。
4.如果任務被工作線程(對當前線程來說是其他線程)執行完畢,執行完畢時工作線程會喚醒Treiber Stack上等待的所有線程,所以當前線程被喚醒,清空當前等待節點上的線程域,然后進入第2步。
5.當前線程在阻塞等待結果過程中可能被中斷,如果被中斷,那么會移除當前線程在Treiber Stack上對應的等待節點,然后拋出中斷異常,get過程結束。
6.當前線程也可能執行帶有超時時間的阻塞等待,如果超時時間過了,還沒得到執行結果,那么會除當前線程在Treiber Stack上對應的等待節點,然后拋出超時異常,get過程結束。
?
run
public void run() {//不是NEW狀態或者設置runner失敗,直接退出if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {//執行任務result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;//處理可能發生的取消中斷(cancel(true))。 if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}/** * 確保cancel(true)產生的中斷發生在run或runAndReset方法過程中。 */ private void handlePossibleCancellationInterrupt(int s) { // 如果當前正在中斷過程中,自旋等待一下,等中斷完成。 if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt // 這里的state狀態一定是INTERRUPTED; // 這里不能清除中斷標記,因為沒辦法區分來自cancel(true)的中斷。 // Thread.interrupted(); } protected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran && s == NEW; }可見runAndReset與run方法的區別只是執行完畢后不設置結果、而且有返回值表示是否執行成功。
cancel
//JDK 1.7 public boolean cancel(boolean mayInterruptIfRunning) { if (state != NEW) return false; //如果任務已經執行完畢,返回false。 if (mayInterruptIfRunning) { //如果有中斷任務的標志,嘗試將任務狀態設置為INTERRUPTING if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING)) return false; //上面設置成功的話,這里進行線程中斷。 Thread t = runner; if (t != null) t.interrupt(); //最后將任務狀態設置為INTERRUPTED,注意這里又是LazySet。 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state } //如果沒有中斷任務的標志,嘗試將任務狀態設置為CANCELLED。 else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)) return false; //最后喚醒Treiber Stack中所有等待線程。 finishCompletion(); return true; } //JDK 1.8public boolean cancel(boolean mayInterruptIfRunning) {if (!(state == NEW &&UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;try { // in case call to interrupt throws exceptionif (mayInterruptIfRunning) {try {Thread t = runner;if (t != null)t.interrupt();} finally { // final stateUNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {finishCompletion();}return true;}在設置mayInterruptIfRunning為true的情況下,內部首先通過一個原子操作將state從NEW轉變為INTERRUPTING,然后中斷執行任務的線程,然后在通過一個LazySet的操作將state從INTERRUPTING轉變為INTERRUPTED,由于后面這個操作對其他線程并不會立即可見,所以handlePossibleCancellationInterrupt才會有一個自旋等待state從INTERRUPTING變為INTERRUPTED的過程。
?
?
總結
以上是生活随笔為你收集整理的FutureTask源码的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JDK1.8并发包中的类
- 下一篇: Java:ThreadPoolExecu