线程池之ScheduledThreadPoolExecutor线程池源码分析笔记
1.ScheduledThreadPoolExecutor?整體結構剖析。
1.1類圖介紹
?
根據上面類圖圖可以看到Executor其實是一個工具類,里面提供了好多靜態方法,根據用戶選擇返回不同的線程池實例??梢钥吹絊cheduledThreadPoolExecutor?繼承了?ThreadPoolExecutor?并實現?ScheduledExecutorService接口。線程池隊列是?DelayedWorkQueue,和?DelayedQueue?類似是一個延遲隊列。
ScheduledFutureTask?是具有返回值的任務,繼承自 FutureTask,FutureTask 內部有個變量 state 用來表示任務的狀態,一開始狀態為 NEW,所有狀態為:
private static final int NEW = 0;//初始狀態private static final int COMPLETING = 1;//執行中狀態private static final int NORMAL = 2;//正常運行結束狀態private static final int EXCEPTIONAL = 3;//運行中異常private static final int CANCELLED = 4;//任務被取消private static final int INTERRUPTING = 5;//任務正在被中斷private static final int INTERRUPTED = 6;//任務已經被中斷FutureTask可能的任務狀態轉換路徑如下所示:
NEW -> COMPLETING -> NORMAL //初始狀態->執行中->正常結束NEW -> COMPLETING -> EXCEPTIONAL//初始狀態->執行中->執行異常NEW -> CANCELLED//初始狀態->任務取消NEW -> INTERRUPTING -> INTERRUPTED//初始狀態->被中斷中->被中斷其實ScheduledFutureTask?內部還有個變量 period 用來表示任務的類型,其任務類型如下:
-
period=0,說明當前任務是一次性的,執行完畢后就退出了。
-
period 為負數,說明當前任務為 fixed-delay 任務,是定時可重復執行任務。
-
period 為整數,說明當前任務為 fixed-rate 任務,是定時可重復執行任務。
?
接下來我們可以看到ScheduledThreadPoolExecutor?的造函數如下
//使用改造后的Delayqueue.public ScheduledThreadPoolExecutor(int corePoolSize) {//調用父類ThreadPoolExecutor的構造函數super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,new DelayedWorkQueue());}public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);}根據上面代碼可以看到線程池隊列是?DelayedWorkQueue
?
2、原理分析
我們主要看三個重要的函數,如下所示:
schedule(Runnable command, long delay,TimeUnit unit)scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)?
2.1、schedule(Runnable command, long delay,TimeUnit unit)方法
該方法作用是提交一個延遲執行的任務,任務從提交時間算起延遲 unit 單位的 delay 時間后開始執行,提交的任務不是周期性任務,任務只會執行一次,代碼如下:
public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit) {//(1)參數校驗if (command == null || unit == null)throw new NullPointerException();//(2)任務轉換RunnableScheduledFuture<?> t = decorateTask(command,new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit)));//(3)添加任務到延遲隊列 delayedExecute(t);return t; }可以看到上面代碼所示,代碼(1)參數校驗,如果 command 或者 unit 為 null,拋出 NPE 異常。
代碼(2)裝飾任務,把提交的 command 任務轉換為?ScheduledFutureTask,ScheduledFutureTask?是具體放入到延遲隊列里面的東西,由于是延遲任務,所以?ScheduledFutureTask?實現了?long getDelay(TimeUnit unit)?和?int compareTo(Delayed other)?方法,triggerTime 方法轉換延遲時間為絕對時間,也就是把當前時間的納秒數加上延遲的納秒數后的 long 型值。
接下來我們需要看?ScheduledFutureTask的構造函數,如下所示:
ScheduledFutureTask(Runnable r, V result, long ns) {//調用父類FutureTask的構造函數super(r, result);this.time = ns;this.period = 0;//period為0,說明為一次性任務this.sequenceNumber = sequencer.getAndIncrement(); }根據構造函數可以看到內部首先調用了父類 FutureTask 的構造函數,父類 FutureTask 的構造函數代碼如下:
//通過適配器把runnable轉換為callable public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW; //設置當前任務狀態為NEW }根據上面代碼可以看到FutureTask 中任務又被轉換為了 Callable 類型后,保存到了變量 this.callable 里面,并設置 FutureTask 的任務狀態為 NEW。
然后?ScheduledFutureTask?構造函數內部設置 time 為上面說的絕對時間,需要注意這里 period 的值為 0,這說明當前任務為一次性任務,不是定時反復執行任務。
其中?long getDelay(TimeUnit unit)?方法代碼如下,用來獲取當前任務還有多少時間就過期了,代碼如下所示:
//元素過期算法,裝飾后時間-當前時間,就是即將過期剩余時間 public long getDelay(TimeUnit unit) {return unit.convert(time - now(), NANOSECONDS); }?
接下來接著看compareTo(Delayed other)?方法,代碼如下:
public int compareTo(Delayed other) {if (other == this) // compare zero ONLY if same objectreturn 0;if (other instanceof ScheduledFutureTask) {ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;long diff = time - x.time;if (diff < 0)return -1;else if (diff > 0)return 1;else if (sequenceNumber < x.sequenceNumber)return -1;elsereturn 1;}long d = (getDelay(TimeUnit.NANOSECONDS) -other.getDelay(TimeUnit.NANOSECONDS));return (d == 0) ? 0 : ((d < 0) ? -1 : 1); }根據上面代碼的執行邏輯,可以看到compareTo 作用是加入元素到延遲隊列后,內部建立或者調整堆時候會使用該元素的 compareTo 方法與隊列里面其他元素進行比較,讓最快要過期的元素放到隊首。所以無論什么時候向隊列里面添加元素,隊首的的元素都是最即將過期的元素。
?
接下來接著看代碼(3)添加任務到延遲隊列,delayedExecute 的代碼如下:
private void delayedExecute(RunnableScheduledFuture<?> task) {//(4)如果線程池關閉了,則執行線程池拒絕策略if (isShutdown())reject(task);else {//(5)添加任務到延遲隊列super.getQueue().add(task);//(6)再次檢查線程池狀態if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))task.cancel(false);else//(7)確保至少一個線程在處理任務 ensurePrestart();} }可以看到代碼(4)首先判斷當前線程池是否已經關閉了,如果已經關閉則執行線程池的拒絕策略(如果不知道線程池的拒絕策略可以看前一篇線程池的介紹。)
否者執行代碼(5)添加任務到延遲隊列。添加完畢后還要重新檢查線程池是否被關閉了,如果已經關閉則從延遲隊列里面刪除剛才添加的任務,但是有可能線程池線程已經從任務隊列里面移除了該任務,也就是該任務已經在執行了,所以還需要調用任務的 cancle 方法取消任務。
如果代碼(6)判斷結果為 false,則會執行代碼(7)確保至少有一個線程在處理任務,即使核心線程數 corePoolSize 被設置為 0.
ensurePrestart 代碼如下:
void ensurePrestart() {int wc = workerCountOf(ctl.get());//增加核心線程數if (wc < corePoolSize)addWorker(null, true);//如果初始化corePoolSize==0,則也添加一個線程。else if (wc == 0)addWorker(null, false);} }如上代碼首先首先獲取線程池中線程個數,如果線程個數小于核心線程數則新增一個線程,否者如果當前線程數為 0 則新增一個線程。
通過上面代碼我們分析了如何添加任務到延遲隊列,下面我們看線程池里面的線程如何獲取并執行任務的,從前面講解的?ThreadPoolExecutor?我們知道具體執行任務的線程是 Worker 線程,Worker 線程里面調用具體任務的 run 方法進行執行,由于這里任務是?ScheduledFutureTask,所以我們下面看看?ScheduledFutureTask?的 run 方法。代碼如下:
public void run() {//(8)是否只執行一次boolean periodic = isPeriodic();//(9)取消任務if (!canRunInCurrentRunState(periodic))cancel(false);//(10)只執行一次,調用schdule時候else if (!periodic)ScheduledFutureTask.super.run();//(11)定時執行else if (ScheduledFutureTask.super.runAndReset()) {//(11.1)設置time=time+period setNextRunTime();//(11.2)重新加入該任務到delay隊列 reExecutePeriodic(outerTask);} }可以看到代碼(8)isPeriodic 的作用是判斷當前任務是一次性任務還是可重復執行的任務,isPeriodic 的代碼如下:
public boolean isPeriodic() {return period != 0; }可知內部是通過 period 的值來判斷,由于轉換任務創建 ScheduledFutureTask 時候傳遞的 period 為 0 ,所以這里 isPeriodic 返回 false。
代碼(9)判斷當前任務是否應該被取消,canRunInCurrentRunState 的代碼如下:
boolean canRunInCurrentRunState(boolean periodic) {return isRunningOrShutdown(periodic ?continueExistingPeriodicTasksAfterShutdown :executeExistingDelayedTasksAfterShutdown); }這里傳遞的 periodic 為 false,所以?isRunningOrShutdown?的參數為?executeExistingDelayedTasksAfterShutdown,executeExistingDelayedTasksAfterShutdown?默認是 true 標示當其它線程調用了 shutdown 命令關閉了線程池后,當前任務還是要執行,否者如果為 false,標示當前任務要被取消。
由于 periodic 為 false,所以執行代碼(10)調用父類 FutureTask 的 run 方法具體執行任務,FutureTask 的 run 方法代碼如下:
public void run() {//(12)if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;//(13)try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;//(13.1) setException(ex);}//(13.2)if (ran)set(result);}} finally {...省略 }}?
可以看到代碼(12)如果任務狀態不是 NEW 則直接返回,或者如果當前任務狀態為NEW但是使用 CAS 設置當然任務的持有者為當前線程失敗則直接返回。代碼(13)具體調用 callable 的 call 方法執行任務,這里在調用前又判斷了任務的狀態是否為 NEW 是為了避免在執行代碼(12)后其他線程修改了任務的狀態(比如取消了該任務)。
如果任務執行成功則執行代碼(13.2)修改任務狀態,set 方法代碼如下:
protected void set(V v) {//如果當前任務狀態為NEW,則設置為COMPLETINGif (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;//設置當前任務終狀為NORMAL,也就是任務正常結束UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion();}}如上代碼首先 CAS 設置當前任務狀態從 NEW 轉換到 COMPLETING,這里多個線程調用時候只有一個線程會成功,成功的線程在通過?UNSAFE.putOrderedInt?設置任務的狀態為正常結束狀態,這里沒有用 CAS 是因為同一個任務只可能有一個線程可以運行到這里,這里使用?putOrderedInt?比使用 CAS 函數或者?putLongVolatile?效率要高,并且這里的場景不要求其它線程馬上對設置的狀態值可見。
這里思考個問題,這里什么時候多個線程會同時執行 CAS 設置任務狀態從態從 NEW 到 COMPLETING?其實當同一個 comand 被多次提交到線程池時候就會存在這樣的情況,由于同一個任務共享一個狀態值 state。
如果任務執行失敗,則執行代碼(13.1),setException 的代碼如下,可見與 set 函數類似,代碼如下:
protected void setException(Throwable t) {//如果當前任務狀態為NEW,則設置為COMPLETINGif (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = t;//設置當前任務終態為EXCEPTIONAL,也就是任務非正常結束UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);finishCompletion();} }
到這里代碼(10)邏輯執行完畢,一次性任務也就執行完畢了,
下面會講到如果任務是可重復執行的,則不會執行步驟(10)而是執行代碼(11)。
?
2.2??scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)方法
當任務執行完畢后,延遲固定間隔時間后再次運行(fixed-delay 任務):其中 initialDelay 說明提交任務后延遲多少時間開始執行任務 command,delay 表示當任務執行完畢后延長多少時間后再次運行 command 任務,unit 是 initialDelay 和 delay 的時間單位。任務會一直重復運行直到任務運行時候拋出了異?;蛘呷∠巳蝿?#xff0c;或者關閉了線程池。scheduleWithFixedDelay?的代碼如下:
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit) {//(14)參數校驗if (command == null || unit == null)throw new NullPointerException();if (delay <= 0)throw new IllegalArgumentException();//(15)任務轉換,注意這里是period=-delay<0ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null, triggerTime(initialDelay, unit),unit.toNanos(-delay));RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;//(16)添加任務到隊列 delayedExecute(t);return t;}
?
如上代碼(14)進行參數校驗,校驗失敗則拋出異常,代碼(15)轉換 command 任務為?ScheduledFutureTask,這里需要注意的是這里傳遞給?ScheduledFutureTask?的 period 變量的值為 -delay,period < 0 這個說明該任務為可重復執行的任務。然后代碼(16)添加任務到延遲隊列后返回。
任務添加到延遲隊列后線程池線程會從隊列里面獲取任務,然后調用?ScheduledFutureTask的 run 方法執行,由于這里 period<0 所以 isPeriodic 返回 true,所以執行代碼(11),runAndReset 的代碼如下:
protected boolean runAndReset() {//(17)if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return false;//(18)boolean ran = false;int s = state;try {Callable<V> c = callable;if (c != null && s == NEW) {try {c.call(); // don't set resultran = true;} catch (Throwable ex) {setException(ex);}}} finally {... }return ran && s == NEW;//(19) }該代碼和 FutureTask 的 run 類似,只是任務正常執行完畢后不會設置任務的狀態,這樣做是為了讓任務成為可重復執行的任務,這里多了代碼(19)如果當前任務正常執行完畢并且任務狀態為 NEW 則返回 true 否者返回 false。
如果返回了 true 則執行代碼(11.1)setNextRunTime?方法設置該任務下一次的執行時間,setNextRunTime?的代碼如下:
private void setNextRunTime() {long p = period;if (p > 0)//fixed-rate類型任務time += p;else//fixed-delay類型任務time = triggerTime(-p);}?
如上代碼這里 p < 0 說明當前任務為?fixed-delay?類型任務,然后設置 time 為當前時間加上?-p?的時間,也就是延遲?-p?時間后在次執行。
總結:本節介紹的?fixed-delay?類型的任務的執行實現原理如下,當添加一個任務到延遲隊列后,等 initialDelay 時間后,任務就會過期,過期的任務就會被從隊列移除,并執行,執行完畢后,會重新設置任務的延遲時間,然后在把任務放入延遲隊列實現的,依次往復。需要注意的是如果一個任務在執行某一個次時候拋出了異常,那么這個任務就結束了,但是不影響其它任務的執行。
?
2.3、scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)方法
相對起始時間點固定頻率調用指定的任務(fixed-rate 任務):當提交任務到線程池后延遲 initialDelay 個時間單位為 unit 的時間后開始執行任務 comand ,然后?initialDelay + period?時間點再次執行,然后在?initialDelay + 2 * period?時間點再次執行,依次往復,直到拋出異?;蛘哒{用了任務的 cancel 方法取消了任務在結束或者關閉了線程池。
scheduleAtFixedRate?的原理與?scheduleWithFixedDelay?類似,下面我們講下不同點,首先調用?scheduleAtFixedRate?時候代碼如下:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {...//裝飾任務類,注意period=period>0,不是負的ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));...return t; }如上代碼?fixed-rate?類型的任務在轉換?command?任務為?ScheduledFutureTask?的時候設置的?period=period?不在是?-period。
所以當前任務執行完畢后,調用?setNextRunTime?設置任務下次執行的時間時候執行的是?time += p?而不在是?time = triggerTime(-p);。
總結:相對于?fixed-delay?任務來說,fixed-rate?方式執行規則為時間為?initdelday + n*period;?時候啟動任務,但是如果當前任務還沒有執行完,下一次要執行任務的時間到了,不會并發執行,下次要執行的任務會延遲執行,要等到當前任務執行完畢后在執行一個任務。
?
3、總結
?ScheduledThreadPoolExecutor?的實現原理,其內部使用的?DelayQueue來存放具體任務,其中任務分為三種,其中一次性執行任務執行完畢就結束了,fixed-delay任務保證同一個任務多次執行之間間隔固定時間,fixed-rate?任務保證任務執行按照固定的頻率執行,其中任務類型使用?period?的值來區分。
?
轉載于:https://www.cnblogs.com/huangjuncong/p/11029893.html
總結
以上是生活随笔為你收集整理的线程池之ScheduledThreadPoolExecutor线程池源码分析笔记的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【转载】C#中List集合使用Exist
- 下一篇: [转]进程,线程和多线程