ScheduledThreadPoolExecutor定时任务线程池执行原理分析
一、示例代碼
?
@Slf4j public class ScheduleThreadPoolTest {private static ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);int nCnt = 0;public void testScheduleThread(){log.debug(" fixed time--> start." );executor.scheduleWithFixedDelay(()->{log.debug(" fixed time--> nCnt:" + (nCnt++));try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}},3000,2000,TimeUnit.MILLISECONDS);try {executor.awaitTermination(100000,TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}log.debug(" fixed time--> end." );}public static void main(String[] args) {ScheduleThreadPoolTest scheduleThreadPoolTest = new ScheduleThreadPoolTest();scheduleThreadPoolTest.testScheduleThread();} }二、通用線程池ThreadPoolExecutor執行原理
1.構造函數和成員變量定義
corePoolSize:
線程池的基本大小,即在沒有任務需要執行的時候線程池的大小,并且只有在工作隊列滿了的情況下才會創建超出這個數量的線程。這里需要注意的是:在剛剛創建ThreadPoolExecutor的時候,線程并不會立即啟動,而是要等到有任務提交時才會啟動,除非調用了prestartCoreThread/prestartAllCoreThreads事先啟動核心線程。再考慮到keepAliveTime和allowCoreThreadTimeOut超時參數的影響,所以沒有任務需要執行的時候,線程池的大小不一定是corePoolSize。
maximumPoolSize:
線程池中允許的最大線程數,線程池中的當前線程數目不會超過該值。如果隊列中任務已滿,并且當前線程個數小于maximumPoolSize,那么會創建新的線程來執行任務。這里值得一提的是largestPoolSize,該變量記錄了線程池在整個生命周期中曾經出現的最大線程個數。為什么說是曾經呢?因為線程池創建之后,可以調用setMaximumPoolSize()改變運行的最大線程的數目。
poolSize:
線程池中當前線程的數量,當該值為0的時候,意味著沒有任何線程,線程池會終止;同一時刻,poolSize不會超過maximumPoolSize。
private final BlockingQueue<Runnable> workQueue; 阻塞任務隊列。 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}新提交一個任務時的處理流程很明顯:
1、如果當前線程池的線程數還沒有達到基本大小(poolSize < corePoolSize),無論是否有空閑的線程新增一個線程處理新提交的任務;
2、如果當前線程池的線程數大于或等于基本大小(poolSize >= corePoolSize)?且任務隊列未滿時,就將新提交的任務提交到阻塞隊列排隊,等候處理workQueue.offer(command);
3、如果當前線程池的線程數大于或等于基本大小(poolSize >= corePoolSize)?且任務隊列滿時;
3.1、當前poolSize<maximumPoolSize,那么就新增線程來處理任務;
3.2、當前poolSize=maximumPoolSize,那么意味著線程池的處理能力已經達到了極限,此時需要拒絕新增加的任務。至于如何拒絕處理新增的任務,取決于線程池的飽和策略RejectedExecutionHandler。
2.提交任務時,創建ThreadPoolExecutor的Worker類對象(實現runnable接口),并運行此線程,把此worker添加到works中。
private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{final Thread thread;Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}public void run() {runWorker(this);}3.添加WORK的代碼和流程,會新建一個worker對象,并且運行線程。
private boolean addWorker(Runnable firstTask, boolean core) {boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {if (workerAdded) {t.start();workerStarted = true;}}} return workerStarted;}4.循環從阻塞任務隊列取出任務,然后執行任務。
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); boolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} finally {afterExecute(task, thrown);}} } finally {processWorkerExit(w, completedAbruptly);}}?5.取任務的過程
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}三、ScheduledThreadPoolExecutor從隊列取任務和存任務的過程
1.DelayedWorkQueue為ScheduledThreadPoolExecutor的阻塞任務隊列。 static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {private static final int INITIAL_CAPACITY = 16;private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture[16];private final ReentrantLock lock = new ReentrantLock();private int size = 0;private Thread leader = null;private final Condition available;DelayedWorkQueue() {this.available = this.lock.newCondition();}2.取任務的過程
public RunnableScheduledFuture<?> take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {// 自循環,實現對隊列的監控 保證返回根節點for (;;) {// 獲取根節點任務RunnableScheduledFuture<?> first = queue[0];// 如果隊列為空,則通知其他線程等待if (first == null)available.await();else {// 獲取根節點任務等待時間與系統時間的差值long delay = first.getDelay(NANOSECONDS);// 如果等待時間已經到,則返回根節點任務并重排序隊列if (delay <= 0)return finishPoll(first);// 如果等待時間還沒有到,則繼續等待且不擁有任務的引用first = null; // don't retain ref while waiting// 如果此時等待根節點的leader線程不為空則通知其他線程繼續等待if (leader != null)available.await();else {// 如果此時leader線程為空,則把當前線程置為leaderThread thisThread = Thread.currentThread();leader = thisThread;try {// 當前線程等待延遲的時間available.awaitNanos(delay); } finally {// 延遲時間已到 則把當前線程變成非leader線程// 當前線程繼續用于執行for循環的邏輯if (leader == thisThread)leader = null;}}}} } finally {// 如果leader為null 則喚醒一個線程成為leaderif (leader == null && queue[0] != null)available.signal();lock.unlock();} }3.?finishPoll(RunnableScheduledFuture)-獲取根節點后重排序
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {// 因為取出根節點 所以隊列深度減1 并賦值給sint s = --size;// 獲取隊列最后一個任務RunnableScheduledFuture<?> x = queue[s];queue[s] = null; // 該位置元素置空// 如果s已經根節點則直接返回,否則堆重排序if (s != 0)siftDown(0, x);// 取出來的任務 設置其堆索引為-1setIndex(f, -1);return f; // 返回任務 }4.siftDown(int,RunnableScheduledFuture)-移除元素后重排序
private void siftDown(int k, RunnableScheduledFuture<?> key) {// 取隊列當前深度的一半 相當于size / 2int half = size >>> 1;// 索引k(初值為0)的值大于half時 退出循環while (k < half) {// 獲取左節點的索引int child = (k << 1) + 1;// 獲取左節點的任務RunnableScheduledFuture<?> c = queue[child];// 獲取右節點的索引int right = child + 1;// 如果右節點在范圍內 且 左節點大于右節點,if (right < size && c.compareTo(queue[right]) > 0)// 更新child的值為右節點索引值 且更新c為右節點的任務c = queue[child = right];// 如果任務key小于任務c 則退出循環(最小堆)if (key.compareTo(c) <= 0)break;// 否則把任務c放到k上(較小的任務放到父節點上)queue[k] = c;// 設置任務c的堆索引setIndex(c, k);// 更新k的值為childk = child;}// 任務key插入k的位置queue[k] = key;// 設置任務key的堆索引ksetIndex(key, k); }執行的流程圖為:
5.入隊列的過程??offer(Runnable)-新增元素
public boolean offer(Runnable x) {if (x == null)throw new NullPointerException();// 只能存放RunnableScheduledFuture任務RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;// 為了保證隊列的線程安全,offer()方法為線程安全方法final ReentrantLock lock = this.lock;lock.lock();try {// 當前隊列實際深度,即隊列中任務個數int i = size;// 如果任務數已經超過數組長度,則擴容為原來的1.5倍if (i >= queue.length)grow();// 隊列實際深度+1size = i + 1;// 如果是空隊列 新增任務插入到數組頭部;if (i == 0) {queue[0] = e;// 設置該任務在堆中的索引,便于后續取消或者刪除任務;免于查找setIndex(e, 0);} else {// 如果不是空隊列 則調用siftUp()插入任務siftUp(i, e);}// 如果作為首個任務插入到數組頭部if (queue[0] == e) {// 置空當前leader線程leader = null;// 喚醒一個等待的線程 使其成為leader線程available.signal();}} finally {lock.unlock();}return true; }?這個方法理解的難點在于leader線程。若新增任務插入空隊列中,首先清空leader線程,并喚醒等待線程中的某一個線程,把喚醒的線程作為leader線程;若新增任務插入前,隊列中已經存在任務,則說明已經有leader線程在等待獲取根節點,此時無需設置leader線程。leader線程的作用就是用來監聽隊列的根節點任務,如果leader線程沒有獲取到根節點任務則通知其他線程等待,這表明leader線程決定著等待線程的狀態。
用leader-before這種機制,可以減少線程的等待時間,而每一個等待的線程都有可能成為leader線程。注意:這里還不太清除哪些線程會等待。
6.siftUp(int,RunnableScheduledFuture)-新增任務后重排
新增任務插入隊列(數組),首先插入到數組的尾部,然后對比其與該位置的父節點的大小,如果新增任務大于父節點任務(此處是最小堆),則新增任務位置不變,否則改變其與父節點的位置,并再比較父節點與父父節點的大小,直到根節點。插入的過程可以結合上面堆的二叉樹變化過程圖一起理解。
插入流程圖:
7.scheduleWithFixedDelay方法:
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();if (delay <= 0)throw new IllegalArgumentException();ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(-delay));RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;delayedExecute(t);return t; }任務添加到隊列后,工作線程會從隊列獲取并移除到期的元素,然后執行run方法,所以下面看看ScheduledFutureTask的run方法如何實現定時調度的。
其中ScheduledFutureTask封裝定時任務內部類,重點關注其run方法。
ScheduledFutureTask(Runnable r, V result, long ns, long period) {super(r, result);this.time = ns;this.period = period;this.sequenceNumber = sequencer.getAndIncrement();}public void run() {boolean periodic = isPeriodic();if (!canRunInCurrentRunState(periodic))cancel(false);//僅執行一次else if (!periodic)ScheduledFutureTask.super.run();//定時任務else if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime();//重新加入該任務到delay隊列reExecutePeriodic(outerTask);}}定時調度是先從隊列獲取任務然后執行,然后在重新設置任務時間,在把任務放入隊列實現的。
如果任務執行時間大于delay時間則等任務執行完畢后的delay時間后在次調用任務,不會同一個任務并發執行。
四、上面的delayWorkQueue使用了堆的數據結構,
堆的一些屬性
堆都是滿二叉樹.因為滿二叉樹會充分利用數組的內存空間;
最小堆是指父節點比左節點和右節點都小的結構,所以整個最小堆中,根節點是最小的節點;
最大堆是指父節點比左節點和右節點都大的結構,所以整個最大堆中,根節點是最大的節點;
最大堆和最小堆的左節點和右節點沒有關系,只能判斷父節點和左右兩節點的大小關系;
基于堆的這些屬性,堆適用于找到集合中的最大或者最小值;另外,堆結構記錄任務及其索引的關系,便于插入數據或者刪除數據后重新排序,所以堆適用于優先隊列。
參考鏈接:https://blog.csdn.net/nobody_1/article/details/99684009
總結
以上是生活随笔為你收集整理的ScheduledThreadPoolExecutor定时任务线程池执行原理分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Netty ObjectPool对象池技
- 下一篇: HashedWheelTimer时间轮定