Java Review - 并发编程_ScheduledThreadPoolExecutor原理源码剖析
文章目錄
- 概述
- 類結構
- 核心方法&源碼解析
- schedule(Runnable command, long delay,TimeUnit unit)
- scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)
- scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
- 小結
概述
Java Review - 并發編程_ThreadPoolExecutor原理&源碼剖析 我們復習了Java中線程池ThreadPoolExecutor的原理,ThreadPoolExecutor只是Executors工具類的一部分功能。
下面來介紹另外一部分功能,也就是ScheduledThreadPoolExecutor的實現,這是一個可以在指定一定延遲時間后或者定時進行任務調度執行的線程池。
類結構
-
Executors其實是個工具類,它提供了好多靜態方法,可根據用戶的選擇返回不同的線程池實例。
-
ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor并實現了ScheduledExecutorService接口。
-
線程池隊列是DelayedWorkQueue,其和DelayedQueue類似,是一個延遲隊列
-
ScheduledFutureTask是具有返回值的任務,繼承自FutureTask。FutureTask的內部有一個變量state用來表示任務的狀態,一開始狀態為NEW,所有狀態為
可能的任務狀態轉換路徑為
NEN-> COMPLETING-> NORMAL//初始狀態->執行中ー>正常結東 NEN-> COMPILETING-> EXCEPTIONAL//初始狀態->執行中ー>執行異常 NEN-> CANCELLED//初始狀態一>任務取消 NEN-> INTERRUPTING-> INTERRUPTED//初始狀態->被中斷中->被中斷-
ScheduledFutureTask內部還有一個變量period用來表示任務的類型,任務類型如下
period=0,說明當前任務是一次性的,執行完畢后就退出了。
period為負數,說明當前任務為fixed-delay任務,是固定延遲的定時可重復執行任務。
period為正數,說明當前任務為fixed-rate任務,是固定頻率的定時可重復執行任務
-
ScheduledThreadPoolExecutor的一個構造函數如下,由該構造函數可知線程池隊列是DelayedWorkQueue。
核心方法&源碼解析
schedule(Runnable command, long delay,TimeUnit unit)
該方法的作用是提交一個延遲執行的任務,任務從提交時間算起延遲單位為unit的delay時間后開始執行。提交的任務不是周期性任務,任務只會執行一次.
/*** @throws RejectedExecutionException {@inheritDoc}* @throws NullPointerException {@inheritDoc}*/public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit) {// 1 參數校驗 if (command == null || unit == null)throw new NullPointerException();// 2 任務轉換 RunnableScheduledFuture<?> t = decorateTask(command,new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit)));// 3 添加任務到延遲隊列 delayedExecute(t);return t;}-
代碼(1)進行參數校驗,如果command或者unit為null,則拋出NPE異常。
-
代碼(2)裝飾任務,把提交的command任務轉換為ScheduledFutureTask。
ScheduledFutureTask是具體放入延遲隊列里面的東西。由于是延遲任務,所以ScheduledFutureTask實現了long getDelay(TimeUnit unit)和int compareTo(Delayed other)方法。triggerTime方法將延遲時間轉換為絕對時間,也就是把當前時間的納秒數加上延遲的納秒數后的long型值。
ScheduledFutureTask的構造函數如下。
/*** Creates a one-shot action with given nanoTime-based trigger time.*/ScheduledFutureTask(Runnable r, V result, long ns) {// 調用父類FutureTask的構造函數super(r, result);this.time = ns;this.period = 0; // 0 說明是一次性任務this.sequenceNumber = sequencer.getAndIncrement();}在構造函數內部首先調用了父類FutureTask的構造函數,父類FutureTask的構造函數代碼如下。
public FutureTask(Runnable runnable, V result) {// 通過適配器把runnable轉換為callablethis.callable = Executors.callable(runnable, result);this.state = NEW; // ensure visibility of callable 設置狀態為NEW}FutureTask中的任務被轉換為Callable類型后,被保存到了變量this.callable里面,并設置FutureTask的任務狀態為NEW。
然后在ScheduledFutureTask構造函數內部設置time為上面說的絕對時間。需要注意,這里period的值為0,這說明當前任務為一次性的任務,不是定時反復執行任務。其中long getDelay(TimeUnit unit)方法的代碼如下(該方法用來計算當前任務還有多少時間就過期了)。
public long getDelay(TimeUnit unit) {// 裝飾后的時間 - 當前時間 = 即將過期剩余時間return unit.convert(time - now(), NANOSECONDS);}public int compareTo(Delayed other) {if (other == this) // compare zero if same objectreturn 0;if (other instanceof ScheduledFutureTask) {ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;long diff = time - x.time;if (diff < 0)return -1;else if (diff > 0)return 1;else if (sequenceNumber < x.sequenceNumber)return -1;elsereturn 1;}long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;}compareTo的作用是加入元素到延遲隊列后,在內部建立或者調整堆時會使用該元素的compareTo方法與隊列里面其他元素進行比較,讓最快要過期的元素放到隊首。所以無論什么時候向隊列里面添加元素,隊首的元素都是最快要過期的元素。
-
代碼(3)將任務添加到延遲隊列,delayedExecute的代碼如下。
/*** Main execution method for delayed or periodic tasks. If pool* is shut down, rejects the task. Otherwise adds task to queue* and starts a thread, if necessary, to run it. (We cannot* prestart the thread to run the task because the task (probably)* shouldn't be run yet.) If the pool is shut down while the task* is being added, cancel and remove it if required by state and* run-after-shutdown parameters.** @param task the task*/private void delayedExecute(RunnableScheduledFuture<?> task) {// 4 如果線程池拐臂了,則執行線程執行拒絕策略if (isShutdown())reject(task);else {// 5 添加任務到延遲隊列super.getQueue().add(task);// 6 再次校驗if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))task.cancel(false);else// 7 確保至少一個線程在處理任務ensurePrestart();}}- 代碼(4)首先判斷當前線程池是否已經關閉了,如果已經關閉則執行線程池的拒絕策略 ,否則執行代碼(5)將任務添加到延遲隊列
- 添加完畢后還要重新檢查線程池是否被關閉了,如果已經關閉則從延遲隊列里面刪除剛才添加的任務,但是此時有可能線程池中的線程已經從任務隊列里面移除了該任務,也就是該任務已經在執行了,所以還需要調用任務的cancle方法取消任務。
- 如果代碼(6)判斷結果為false,則會執行代碼(7)確保至少有一個線程在處理任務,即使核心線程數corePoolSize被設置為0
如上代碼首先獲取線程池中的線程個數,如果線程個數小于核心線程數則新增一個線程,否則如果當前線程數為0則新增一個線程。
上面我們分析了如何向延遲隊列添加任務,下面我們來看線程池里面的線程如何獲取并執行任務。
前面說ThreadPoolExecutor時我們說過,具體執行任務的線程是Worker線程,Worker線程調用具體任務的run方法來執行。由于這里的任務是ScheduledFutureTask,所以我們下面看看ScheduledFutureTask的run方法
/*** Overrides FutureTask version so as to reset/requeue if periodic.*/public void run() {// 8 是否只執行一次 boolean periodic = isPeriodic();// 9 取消任務if (!canRunInCurrentRunState(periodic))cancel(false);// 10 只執行一次,調用schedule方法else if (!periodic)ScheduledFutureTask.super.run();// 11 定時執行 else if (ScheduledFutureTask.super.runAndReset()) {// 11.1 設置time=time+periodsetNextRunTime();// 11.2 重新加入該任務到delay隊列reExecutePeriodic(outerTask);}}-
代碼(8)中的isPeriodic的作用是判斷當前任務是一次性任務還是可重復執行的任務
public boolean isPeriodic() {return period != 0;}可以看到,其內部是通過period的值來判斷的,由于轉換任務在創建ScheduledFutureTask時傳遞的period的值為0 ,所以這里isPeriodic返回false。
-
代碼(9)判斷當前任務是否應該被取消,canRunInCurrentRunState的代碼如下
boolean canRunInCurrentRunState(boolean periodic) {return isRunningOrShutdown(periodic ?continueExistingPeriodicTasksAfterShutdown :executeExistingDelayedTasksAfterShutdown);}傳遞的periodic的值為false,所以isRunningOrShutdown的參數為executeExisti ngDelayedTasksAfterShutdown。executeExistingDelayedTasksAfterShutdown默認為true,表示當其他線程調用了shutdown命令關閉了線程池后,當前任務還是要執行,否則如果為false,則當前任務要被取消。
-
由于periodic的值為false,所以執行代碼(10)調用父類FutureTask的run方法具體執行任務。FutureTask的run方法的代碼如下
public void run() {// 12 if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;// 13 try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;// 13.1setException(ex);}// 13.2if (ran)set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}} -
代碼(12)判斷如果任務狀態不是NEW則直接返回,或者如果當前任務狀態為NEW但是使用CAS設置當前任務的持有者為當前線程失敗則直接返回
-
代碼(13)具體調用callable的call方法執行任務。這里在調用前又判斷了任務的狀態是否為NEW,是為了避免在執行代碼(12)后其他線程修改了任務的狀態(比如取消了該任務)。\
-
如果任務執行成功則執行代碼(13.2)修改任務狀態,set方法的代碼如下。
protected void set(V v) {// 如果為NEW,設置為COMPLETINGif (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;// 設置當前任務的狀態為NORMAL,也就是任務正常結束UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion();}}- 如上代碼首先使用CAS將當前任務的狀態從NEW轉換到COMPLETING。這里當有多個線程調用時只有一個線程會成功。成功的線程再通過UNSAFE.putOrderedInt設置任務的狀態為正常結束狀態,這里沒有使用CAS是因為對于同一個任務只可能有一個線程運行到這里。
- 在這里使用putOrderedInt比使用CAS或者putLongvolatile效率要高,并且這里的場景不要求其他線程馬上對設置的狀態值可見。
在什么時候多個線程會同時執行CAS將當前任務的狀態從NEW轉換到COMPLETING?其實當同一個command被多次提交到線程池時就會存在這樣的情況,因為同一個任務共享一個狀態值state。
如果任務執行失敗,則執行代碼(13.1)。setException的代碼如下,可見與set函數類似。
protected void setException(Throwable t) {// 如果當前任務的狀態為NEW,則設置為COMPLETINGif (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = t;// 設置當前任務的狀態為EXCEPTIONAL,也就是任務非正常結束UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final statefinishCompletion();}}到這里代碼(10)的邏輯執行完畢,一次性任務也就執行完畢了
scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)
該方法的作用是,當任務執行完畢后,讓其延遲固定時間后再次運行(fixed-delay任務)
- initialDelay表示提交任務后延遲多少時間開始執行任務command
- delay表示當任務執行完畢后延長多少時間后再次運行command任務
- unit是initialDelay和delay的時間單位
任務會一直重復運行直到任務運行中拋出了異常,被取消了,或者關閉了線程池。
/*** @throws RejectedExecutionException {@inheritDoc}* @throws NullPointerException {@inheritDoc}* @throws IllegalArgumentException {@inheritDoc}*/public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit) {// 14 參數校驗 if (command == null || unit == null)throw new NullPointerException();if (delay <= 0)throw new IllegalArgumentException();// 15 任務轉換 ,注意這里的 poeriod = -dealy < 0 【 unit.toNanos(-delay)】ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(-delay));RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;// 16 添加任務到隊列delayedExecute(t);return t;}- 代碼(14)進行參數校驗,校驗失敗則拋出異常
- 代碼(15)將command任務轉換為ScheduledFutureTask。這里需要注意的是,傳遞給ScheduledFutureTask的period變量的值為-delay,period<0說明該任務為可重復執行的任務。
- 然后代碼(16)添加任務到延遲隊列后返回。
將任務添加到延遲隊列后線程池線程會從隊列里面獲取任務,然后調用ScheduledFutureTask的run方法執行。由于這里period<0,所以isPeriodic返回true,所以執行代碼(11)。runAndReset的代碼如下。
/*** Executes the computation without setting its result, and then* resets this future to initial state, failing to do so if the* computation encounters an exception or is cancelled. This is* designed for use with tasks that intrinsically execute more* than once.** @return {@code true} if successfully run and reset*/protected boolean runAndReset() {// 17 if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return false;// 18 boolean ran = false;int s = state;try {Callable<V> c = callable;if (c != null && s == NEW) {try {c.call(); // don't set resultran = true;} catch (Throwable ex) {setException(ex);}}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptss = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}return ran && s == NEW;}該代碼和FutureTask的run方法類似,只是任務正常執行完畢后不會設置任務的狀態,這樣做是為了讓任務成為可重復執行的任務。
這里多了代碼(19),這段代碼判斷如果當前任務正常執行完畢并且任務狀態為NEW則返回true,否則返回false。 如果返回了true則執行代碼(11.1)的setNextRunTime方法設置該任務下一次的執行時間。
/*** Sets the next time to run for a periodic task.*/private void setNextRunTime() {long p = period;if (p > 0) // ffixed-rate類型任務time += p;else // fixed-delay 類型任務 time = triggerTime(-p);}這里p<0說明當前任務為fixed-delay類型任務。然后設置time為當前時間加上-p的時間,也就是延遲-p時間后再次執行。
fixed-delay類型的任務的執行原理為: 當添加一個任務到延遲隊列后,等待initialDelay時間,任務就會過期,過期的任務就會被從隊列移除,并執行。執行完畢后,會重新設置任務的延遲時間,然后再把任務放入延遲隊列,循環往復。需要注意的是,如果一個任務在執行中拋出了異常,那么這個任務就結束了,但是不影響其他任務的執行。
scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
該方法相對起始時間點以固定頻率調用指定的任務(fixed-rate任務)。當把任務提交到線程池并延遲initialDelay時間(時間單位為unit)后開始執行任務command 。然后從initialDelay+period時間點再次執行,而后在 initialDelay + 2 * period時間點再次執行,循環往復,直到拋出異?;蛘哒{用了任務的cancel方法取消了任務,或者關閉了線程池。
scheduleAtFixedRate的原理與scheduleWithFixedDelay類似,下面我們看下它們之間的不同點。
首先調用scheduleAtFixedRate的代碼如下
/*** @throws RejectedExecutionException {@inheritDoc}* @throws NullPointerException {@inheritDoc}* @throws IllegalArgumentException {@inheritDoc}*/public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();if (period <= 0)throw new IllegalArgumentException();// 裝飾任務,注意這里的period=period>0 不是負的ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;delayedExecute(t);return t;}在如上代碼中,在將fixed-rate類型的任務command轉換為ScheduledFutureTask時設置period=period,不再是-period。
所以當前任務執行完畢后,調用setNextRunTime設置任務下次執行的時間時執行的是time += p而不再是time = triggerTime(-p)。
總結:相對于fixed-delay任務來說,fixed-rate方式執行規則為,時間為initdelday +n*period時啟動任務,但是如果當前任務還沒有執行完,下一次要執行任務的時間到了,則不會并發執行,下次要執行的任務會延遲執行,要等到當前任務執行完畢后再執行。
小結
ScheduledThreadPoolExecutor的實現原理,其內部使用DelayQueue來存放具體任務。任務分為三種,其中一次性執行任務執行完畢就結束了,fixed-delay任務保證同一個任務在多次執行之間間隔固定時間,fixed-rate任務保證按照固定的頻率執行。任務類型使用period的值來區分。
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的Java Review - 并发编程_ScheduledThreadPoolExecutor原理源码剖析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java Review - 并发编程_T
- 下一篇: Java Review - 并发编程_