深入理解 Java 线程池:ThreadPoolExecutor
線程池介紹
在web開發(fā)中,服務(wù)器需要接受并處理請求,所以會為一個請求來分配一個線程來進行處理。如果每次請求都新創(chuàng)建一個線程的話實現(xiàn)起來非常簡便,但是存在一個問題:
如果并發(fā)的請求數(shù)量非常多,但每個線程執(zhí)行的時間很短,這樣就會頻繁的創(chuàng)建和銷毀線程,如此一來會大大降低系統(tǒng)的效率。可能出現(xiàn)服務(wù)器在為每個請求創(chuàng)建新線程和銷毀線程上花費的時間和消耗的系統(tǒng)資源要比處理實際的用戶請求的時間和資源更多。
那么有沒有一種辦法使執(zhí)行完一個任務(wù),并不被銷毀,而是可以繼續(xù)執(zhí)行其他的任務(wù)呢?
這就是線程池的目的了。線程池為線程生命周期的開銷和資源不足問題提供了解決方案。通過對多個任務(wù)重用線程,線程創(chuàng)建的開銷被分攤到了多個任務(wù)上。
什么時候使用線程池?
- 單個任務(wù)處理時間比較短
- 需要處理的任務(wù)數(shù)量很大
使用線程池的好處
引用自?ifeve.com/java-thread…?的說明:
- 降低資源消耗。通過重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。
- 提高響應(yīng)速度。當(dāng)任務(wù)到達時,任務(wù)可以不需要的等到線程創(chuàng)建就能立即執(zhí)行。
- 提高線程的可管理性。線程是稀缺資源,如果無限制的創(chuàng)建,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性,使用線程池可以進行統(tǒng)一的分配,調(diào)優(yōu)和監(jiān)控。
Java中的線程池是用ThreadPoolExecutor類來實現(xiàn)的. 本文就結(jié)合JDK 1.8對該類的源碼來分析一下這個類內(nèi)部對于線程的創(chuàng)建, 管理以及后臺任務(wù)的調(diào)度等方面的執(zhí)行原理。
先看一下線程池的類圖:
Executor框架接口
Executor框架是一個根據(jù)一組執(zhí)行策略調(diào)用,調(diào)度,執(zhí)行和控制的異步任務(wù)的框架,目的是提供一種將”任務(wù)提交”與”任務(wù)如何運行”分離開來的機制。
J.U.C中有三個Executor接口:
- Executor:一個運行新任務(wù)的簡單接口;
- ExecutorService:擴展了Executor接口。添加了一些用來管理執(zhí)行器生命周期和任務(wù)生命周期的方法;
- ScheduledExecutorService:擴展了ExecutorService。支持Future和定期執(zhí)行任務(wù)。
Executor接口
public interface Executor {void execute(Runnable command); }Executor接口只有一個execute方法,用來替代通常創(chuàng)建或啟動線程的方法。例如,使用Thread來創(chuàng)建并啟動線程的代碼如下:
Thread t = new Thread(); t.start();使用Executor來啟動線程執(zhí)行任務(wù)的代碼如下:
Thread t = new Thread(); executor.execute(t);對于不同的Executor實現(xiàn),execute()方法可能是創(chuàng)建一個新線程并立即啟動,也有可能是使用已有的工作線程來運行傳入的任務(wù),也可能是根據(jù)設(shè)置線程池的容量或者阻塞隊列的容量來決定是否要將傳入的線程放入阻塞隊列中或者拒絕接收傳入的線程。
ExecutorService接口
ExecutorService接口繼承自Executor接口,提供了管理終止的方法,以及可為跟蹤一個或多個異步任務(wù)執(zhí)行狀況而生成 Future 的方法。增加了shutDown(),shutDownNow(),invokeAll(),invokeAny()和submit()等方法。如果需要支持即時關(guān)閉,也就是shutDownNow()方法,則任務(wù)需要正確處理中斷。
ScheduledExecutorService接口
ScheduledExecutorService擴展ExecutorService接口并增加了schedule方法。調(diào)用schedule方法可以在指定的延時后執(zhí)行一個Runnable或者Callable任務(wù)。ScheduledExecutorService接口還定義了按照指定時間間隔定期執(zhí)行任務(wù)的scheduleAtFixedRate()方法和scheduleWithFixedDelay()方法。
ThreadPoolExecutor分析
ThreadPoolExecutor繼承自AbstractExecutorService,也是實現(xiàn)了ExecutorService接口。
幾個重要的字段
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;ctl是對線程池的運行狀態(tài)和線程池中有效線程的數(shù)量進行控制的一個字段, 它包含兩部分的信息: 線程池的運行狀態(tài) (runState) 和線程池內(nèi)有效線程的數(shù)量 (workerCount),這里可以看到,使用了Integer類型來保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位減1(29個1),這個常量表示workerCount的上限值,大約是5億。
下面再介紹下線程池的運行狀態(tài). 線程池一共有五種狀態(tài), 分別是:
進入TERMINATED的條件如下:
- 線程池不是RUNNING狀態(tài);
- 線程池狀態(tài)不是TIDYING狀態(tài)或TERMINATED狀態(tài);
- 如果線程池狀態(tài)是SHUTDOWN并且workerQueue為空;
- workerCount為0;
- 設(shè)置TIDYING狀態(tài)成功。
下圖為線程池的狀態(tài)轉(zhuǎn)換過程:
ctl相關(guān)方法
這里還有幾個對ctl進行計算的方法:
private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }- runStateOf:獲取運行狀態(tài);
- workerCountOf:獲取活動線程數(shù);
- ctlOf:獲取運行狀態(tài)和活動線程數(shù)的值。
ThreadPoolExecutor構(gòu)造方法
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler; }構(gòu)造方法中的字段含義如下:
corePoolSize:核心線程數(shù)量,當(dāng)有新任務(wù)在execute()方法提交時,會執(zhí)行以下判斷:
- 如果運行的線程少于 corePoolSize,則創(chuàng)建新線程來處理任務(wù),即使線程池中的其他線程是空閑的;
- 如果線程池中的線程數(shù)量大于等于 corePoolSize 且小于 maximumPoolSize,則只有當(dāng)workQueue滿時才創(chuàng)建新的線程去處理任務(wù);
- 如果設(shè)置的corePoolSize 和 maximumPoolSize相同,則創(chuàng)建的線程池的大小是固定的,這時如果有新任務(wù)提交,若workQueue未滿,則將請求放入workQueue中,等待有空閑的線程去從workQueue中取任務(wù)并處理;
- 如果運行的線程數(shù)量大于等于maximumPoolSize,這時如果workQueue已經(jīng)滿了,則通過handler所指定的策略來處理任務(wù);
- maximumPoolSize:最大線程數(shù)量;
- workQueue:等待隊列,當(dāng)任務(wù)提交時,如果線程池中的線程數(shù)量大于等于corePoolSize的時候,把該任務(wù)封裝成一個Worker對象放入等待隊列;
- workQueue:保存等待執(zhí)行的任務(wù)的阻塞隊列,當(dāng)提交一個新的任務(wù)到線程池以后, 線程池會根據(jù)當(dāng)前線程池中正在運行著的線程的數(shù)量來決定對該任務(wù)的處理方式,主要有以下幾種處理方式:
- 直接切換:這種方式常用的隊列是SynchronousQueue,但現(xiàn)在還沒有研究過該隊列,這里暫時還沒法介紹;
- 使用無界隊列:一般使用基于鏈表的阻塞隊列LinkedBlockingQueue。如果使用這種方式,那么線程池中能夠創(chuàng)建的最大線程數(shù)就是corePoolSize,而maximumPoolSize就不會起作用了(后面也會說到)。當(dāng)線程池中所有的核心線程都是RUNNING狀態(tài)時,這時一個新的任務(wù)提交就會放入等待隊列中。
- 使用有界隊列:一般使用ArrayBlockingQueue。使用該方式可以將線程池的最大線程數(shù)量限制為maximumPoolSize,這樣能夠降低資源的消耗,但同時這種方式也使得線程池對線程的調(diào)度變得更困難,因為線程池和隊列的容量都是有限的值,所以要想使線程池處理任務(wù)的吞吐率達到一個相對合理的范圍,又想使線程調(diào)度相對簡單,并且還要盡可能的降低線程池對資源的消耗,就需要合理的設(shè)置這兩個數(shù)量。
- 如果要想降低系統(tǒng)資源的消耗(包括CPU的使用率,操作系統(tǒng)資源的消耗,上下文環(huán)境切換的開銷等), 可以設(shè)置較大的隊列容量和較小的線程池容量, 但這樣也會降低線程處理任務(wù)的吞吐量。
- 如果提交的任務(wù)經(jīng)常發(fā)生阻塞,那么可以考慮通過調(diào)用 setMaximumPoolSize() 方法來重新設(shè)定線程池的容量。
- 如果隊列的容量設(shè)置的較小,通常需要將線程池的容量設(shè)置大一點,這樣CPU的使用率會相對的高一些。但如果線程池的容量設(shè)置的過大,則在提交的任務(wù)數(shù)量太多的情況下,并發(fā)量會增加,那么線程之間的調(diào)度就是一個要考慮的問題,因為這樣反而有可能降低處理任務(wù)的吞吐量。
- keepAliveTime:線程池維護線程所允許的空閑時間。當(dāng)線程池中的線程數(shù)量大于corePoolSize的時候,如果這時沒有新的任務(wù)提交,核心線程外的線程不會立即銷毀,而是會等待,直到等待的時間超過了keepAliveTime;
- threadFactory:它是ThreadFactory類型的變量,用來創(chuàng)建新線程。默認使用Executors.defaultThreadFactory() 來創(chuàng)建線程。使用默認的ThreadFactory來創(chuàng)建線程時,會使新創(chuàng)建的線程具有相同的NORM_PRIORITY優(yōu)先級并且是非守護線程,同時也設(shè)置了線程的名稱。
- handler:它是RejectedExecutionHandler類型的變量,表示線程池的飽和策略。如果阻塞隊列滿了并且沒有空閑的線程,這時如果繼續(xù)提交任務(wù),就需要采取一種策略處理該任務(wù)。線程池提供了4種策略:
- AbortPolicy:直接拋出異常,這是默認策略;
- CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù);
- DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù);
- DiscardPolicy:直接丟棄任務(wù);
所以,任務(wù)提交時,判斷的順序為 corePoolSize --> workQueue --> maximumPoolSize。
execute方法
execute()方法用來提交任務(wù),代碼如下:
public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** clt記錄著runState和workerCount*/int c = ctl.get();/** workerCountOf方法取出低29位的值,表示當(dāng)前活動的線程數(shù);* 如果當(dāng)前活動線程數(shù)小于corePoolSize,則新建一個線程放入線程池中;* 并把任務(wù)添加到該線程中。*/if (workerCountOf(c) < corePoolSize) {/** addWorker中的第二個參數(shù)表示限制添加線程的數(shù)量是根據(jù)corePoolSize來判斷還是maximumPoolSize來判斷;* 如果為true,根據(jù)corePoolSize來判斷;* 如果為false,則根據(jù)maximumPoolSize來判斷*/if (addWorker(command, true))return;/** 如果添加失敗,則重新獲取ctl值*/c = ctl.get();}/** 如果當(dāng)前線程池是運行狀態(tài)并且任務(wù)添加到隊列成功*/if (isRunning(c) && workQueue.offer(command)) {// 重新獲取ctl值int recheck = ctl.get();// 再次判斷線程池的運行狀態(tài),如果不是運行狀態(tài),由于之前已經(jīng)把command添加到workQueue中了,// 這時需要移除該command// 執(zhí)行過后通過handler使用拒絕策略對該任務(wù)進行處理,整個方法返回if (! isRunning(recheck) && remove(command))reject(command);/** 獲取線程池中的有效線程數(shù),如果數(shù)量是0,則執(zhí)行addWorker方法* 這里傳入的參數(shù)表示:* 1. 第一個參數(shù)為null,表示在線程池中創(chuàng)建一個線程,但不去啟動;* 2. 第二個參數(shù)為false,將線程池的有限線程數(shù)量的上限設(shè)置為maximumPoolSize,添加線程時根據(jù)maximumPoolSize來判斷;* 如果判斷workerCount大于0,則直接返回,在workQueue中新增的command會在將來的某個時刻被執(zhí)行。*/else if (workerCountOf(recheck) == 0)addWorker(null, false);}/** 如果執(zhí)行到這里,有兩種情況:* 1. 線程池已經(jīng)不是RUNNING狀態(tài);* 2. 線程池是RUNNING狀態(tài),但workerCount >= corePoolSize并且workQueue已滿。* 這時,再次調(diào)用addWorker方法,但第二個參數(shù)傳入為false,將線程池的有限線程數(shù)量的上限設(shè)置為maximumPoolSize;* 如果失敗則拒絕該任務(wù)*/else if (!addWorker(command, false))reject(command); }簡單來說,在執(zhí)行execute()方法時如果狀態(tài)一直是RUNNING時,的執(zhí)行過程如下:
這里要注意一下addWorker(null, false);,也就是創(chuàng)建一個線程,但并沒有傳入任務(wù),因為任務(wù)已經(jīng)被添加到workQueue中了,所以worker在執(zhí)行的時候,會直接從workQueue中獲取任務(wù)。所以,在workerCountOf(recheck) == 0時執(zhí)行addWorker(null, false);也是為了保證線程池在RUNNING狀態(tài)下必須要有一個線程來執(zhí)行任務(wù)。
execute方法執(zhí)行流程如下:
addWorker方法
addWorker方法的主要工作是在線程池中創(chuàng)建一個新的線程并執(zhí)行,firstTask參數(shù) 用于指定新增的線程執(zhí)行的第一個任務(wù),core參數(shù)為true表示在新增線程時會判斷當(dāng)前活動線程數(shù)是否少于corePoolSize,false表示新增線程前需要判斷當(dāng)前活動線程數(shù)是否少于maximumPoolSize,代碼如下:
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();// 獲取運行狀態(tài)int rs = runStateOf(c);/** 這個if判斷* 如果rs >= SHUTDOWN,則表示此時不再接收新任務(wù);* 接著判斷以下3個條件,只要有1個不滿足,則返回false:* 1. rs == SHUTDOWN,這時表示關(guān)閉狀態(tài),不再接受新提交的任務(wù),但卻可以繼續(xù)處理阻塞隊列中已保存的任務(wù)* 2. firsTask為空* 3. 阻塞隊列不為空* * 首先考慮rs == SHUTDOWN的情況* 這種情況下不會接受新提交的任務(wù),所以在firstTask不為空的時候會返回false;* 然后,如果firstTask為空,并且workQueue也為空,則返回false,* 因為隊列中已經(jīng)沒有任務(wù)了,不需要再添加線程了*/// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {// 獲取線程數(shù)int wc = workerCountOf(c);// 如果wc超過CAPACITY,也就是ctl的低29位的最大值(二進制是29個1),返回false;// 這里的core是addWorker方法的第二個參數(shù),如果為true表示根據(jù)corePoolSize來比較,// 如果為false則根據(jù)maximumPoolSize來比較。// if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// 嘗試增加workerCount,如果成功,則跳出第一個for循環(huán)if (compareAndIncrementWorkerCount(c))break retry;// 如果增加workerCount失敗,則重新獲取ctl的值c = ctl.get(); // Re-read ctl// 如果當(dāng)前的運行狀態(tài)不等于rs,說明狀態(tài)已被改變,返回第一個for循環(huán)繼續(xù)執(zhí)行if (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 根據(jù)firstTask來創(chuàng)建Worker對象w = new Worker(firstTask);// 每一個Worker對象都會創(chuàng)建一個線程final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());// rs < SHUTDOWN表示是RUNNING狀態(tài);// 如果rs是RUNNING狀態(tài)或者rs是SHUTDOWN狀態(tài)并且firstTask為null,向線程池中添加線程。// 因為在SHUTDOWN時不會在添加新的任務(wù),但還是會執(zhí)行workQueue中的任務(wù)if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();// workers是一個HashSetworkers.add(w);int s = workers.size();// largestPoolSize記錄著線程池中出現(xiàn)過的最大線程數(shù)量if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {// 啟動線程t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted; }注意一下這里的t.start()這個語句,啟動時會調(diào)用Worker類中的run方法,Worker本身實現(xiàn)了Runnable接口,所以一個Worker類型的對象也是一個線程。
Worker類
線程池中的每一個線程被封裝成一個Worker對象,ThreadPool維護的其實就是一組Worker對象,看一下Worker的定義:
private final class Workerextends AbstractQueuedSynchronizerimplements Runnable {/*** This class will never be serialized, but we provide a* serialVersionUID to suppress a javac warning.*/private static final long serialVersionUID = 6138294804551838833L;/** Thread this worker is running in. Null if factory fails. */final Thread thread;/** Initial task to run. Possibly null. */Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*/Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker */public void run() {runWorker(this);}// Lock methods//// The value 0 represents the unlocked state.// The value 1 represents the locked state.protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}} }Worker類繼承了AQS,并實現(xiàn)了Runnable接口,注意其中的firstTask和thread屬性:firstTask用它來保存?zhèn)魅氲娜蝿?wù);thread是在調(diào)用構(gòu)造方法時通過ThreadFactory來創(chuàng)建的線程,是用來處理任務(wù)的線程。
在調(diào)用構(gòu)造方法時,需要把任務(wù)傳入,這里通過getThreadFactory().newThread(this);來新建一個線程,newThread方法傳入的參數(shù)是this,因為Worker本身繼承了Runnable接口,也就是一個線程,所以一個Worker對象在啟動的時候會調(diào)用Worker類中的run方法。
Worker繼承了AQS,使用AQS來實現(xiàn)獨占鎖的功能。為什么不使用ReentrantLock來實現(xiàn)呢?可以看到tryAcquire方法,它是不允許重入的,而ReentrantLock是允許重入的:
所以,Worker繼承自AQS,用于判斷線程是否空閑以及是否可以被中斷。
此外,在構(gòu)造方法中執(zhí)行了setState(-1);,把state變量設(shè)置為-1,為什么這么做呢?是因為AQS中默認的state是0,如果剛創(chuàng)建了一個Worker對象,還沒有執(zhí)行任務(wù)時,這時就不應(yīng)該被中斷,看一下tryAquire方法:
protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false; }tryAcquire方法是根據(jù)state是否是0來判斷的,所以,setState(-1);將state設(shè)置為-1是為了禁止在執(zhí)行任務(wù)前對線程進行中斷。
正因為如此,在runWorker方法中會先調(diào)用Worker對象的unlock方法將state設(shè)置為0.
runWorker方法
在Worker類中的run方法調(diào)用了runWorker方法來執(zhí)行任務(wù),runWorker方法的代碼如下:
final void runWorker(Worker w) {Thread wt = Thread.currentThread();// 獲取第一個任務(wù)Runnable task = w.firstTask;w.firstTask = null;// 允許中斷w.unlock(); // allow interrupts// 是否因為異常退出循環(huán)boolean completedAbruptly = true;try {// 如果task為空,則通過getTask來獲取任務(wù)while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);} }這里說明一下第一個if判斷,目的是:
- 如果線程池正在停止,那么要保證當(dāng)前線程是中斷狀態(tài);
- 如果不是的話,則要保證當(dāng)前線程不是中斷狀態(tài);
這里要考慮在執(zhí)行該if語句期間可能也執(zhí)行了shutdownNow方法,shutdownNow方法會把狀態(tài)設(shè)置為STOP,回顧一下STOP狀態(tài):
不能接受新任務(wù),也不處理隊列中的任務(wù),會中斷正在處理任務(wù)的線程。在線程池處于 RUNNING 或 SHUTDOWN 狀態(tài)時,調(diào)用 shutdownNow() 方法會使線程池進入到該狀態(tài)。
STOP狀態(tài)要中斷線程池中的所有線程,而這里使用Thread.interrupted()來判斷是否中斷是為了確保在RUNNING或者SHUTDOWN狀態(tài)時線程是非中斷狀態(tài)的,因為Thread.interrupted()方法會復(fù)位中斷的狀態(tài)。
總結(jié)一下runWorker方法的執(zhí)行過程:
這里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor類中是空的,留給子類來實現(xiàn)。
completedAbruptly變量來表示在執(zhí)行任務(wù)過程中是否出現(xiàn)了異常,在processWorkerExit方法中會對該變量的值進行判斷。
getTask方法
getTask方法用來從阻塞隊列中取任務(wù),代碼如下:
private Runnable getTask() {// timeOut變量的值表示上次從阻塞隊列中取任務(wù)時是否超時boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary./** 如果線程池狀態(tài)rs >= SHUTDOWN,也就是非RUNNING狀態(tài),再進行以下判斷:* 1. rs >= STOP,線程池是否正在stop;* 2. 阻塞隊列是否為空。* 如果以上條件滿足,則將workerCount減1并返回null。* 因為如果當(dāng)前線程池狀態(tài)的值是SHUTDOWN或以上時,不允許再向阻塞隊列中添加任務(wù)。*/if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?// timed變量用于判斷是否需要進行超時控制。// allowCoreThreadTimeOut默認是false,也就是核心線程不允許進行超時;// wc > corePoolSize,表示當(dāng)前線程池中的線程數(shù)量大于核心線程數(shù)量;// 對于超過核心線程數(shù)量的這些線程,需要進行超時控制boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;/** wc > maximumPoolSize的情況是因為可能在此方法執(zhí)行階段同時執(zhí)行了setMaximumPoolSize方法;* timed && timedOut 如果為true,表示當(dāng)前操作需要進行超時控制,并且上次從阻塞隊列中獲取任務(wù)發(fā)生了超時* 接下來判斷,如果有效線程數(shù)量大于1,或者阻塞隊列是空的,那么嘗試將workerCount減1;* 如果減1失敗,則返回重試。* 如果wc == 1時,也就說明當(dāng)前線程是線程池中唯一的一個線程了。*/if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {/** 根據(jù)timed來判斷,如果為true,則通過阻塞隊列的poll方法進行超時控制,如果在keepAliveTime時間內(nèi)沒有獲取到任務(wù),則返回null;* 否則通過take方法,如果這時隊列為空,則take方法會阻塞直到隊列不為空。* */Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;// 如果 r == null,說明已經(jīng)超時,timedOut設(shè)置為truetimedOut = true;} catch (InterruptedException retry) {// 如果獲取任務(wù)時當(dāng)前線程發(fā)生了中斷,則設(shè)置timedOut為false并返回循環(huán)重試timedOut = false;}} }這里重要的地方是第二個if判斷,目的是控制線程池的有效線程數(shù)量。由上文中的分析可以知道,在執(zhí)行execute方法時,如果當(dāng)前線程池的線程數(shù)量超過了corePoolSize且小于maximumPoolSize,并且workQueue已滿時,則可以增加工作線程,但這時如果超時沒有獲取到任務(wù),也就是timedOut為true的情況,說明workQueue已經(jīng)為空了,也就說明了當(dāng)前線程池中不需要那么多線程來執(zhí)行任務(wù)了,可以把多于corePoolSize數(shù)量的線程銷毀掉,保持線程數(shù)量在corePoolSize即可。
什么時候會銷毀?當(dāng)然是runWorker方法執(zhí)行完之后,也就是Worker中的run方法執(zhí)行完,由JVM自動回收。
getTask方法返回null時,在runWorker方法中會跳出while循環(huán),然后會執(zhí)行processWorkerExit方法。
processWorkerExit方法
private void processWorkerExit(Worker w, boolean completedAbruptly) {// 如果completedAbruptly值為true,則說明線程執(zhí)行時出現(xiàn)了異常,需要將workerCount減1;// 如果線程執(zhí)行時沒有出現(xiàn)異常,說明在getTask()方法中已經(jīng)已經(jīng)對workerCount進行了減1操作,這里就不必再減了。 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//統(tǒng)計完成的任務(wù)數(shù)completedTaskCount += w.completedTasks;// 從workers中移除,也就表示著從線程池中移除了一個工作線程workers.remove(w);} finally {mainLock.unlock();}// 根據(jù)線程池狀態(tài)進行判斷是否結(jié)束線程池tryTerminate();int c = ctl.get();/** 當(dāng)線程池是RUNNING或SHUTDOWN狀態(tài)時,如果worker是異常結(jié)束,那么會直接addWorker;* 如果allowCoreThreadTimeOut=true,并且等待隊列有任務(wù),至少保留一個worker;* 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。*/if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false);} }至此,processWorkerExit執(zhí)行完之后,工作線程被銷毀,以上就是整個工作線程的生命周期,從execute方法開始,Worker使用ThreadFactory創(chuàng)建新的工作線程,runWorker通過getTask獲取任務(wù),然后執(zhí)行任務(wù),如果getTask返回null,進入processWorkerExit方法,整個線程結(jié)束,如圖所示:
tryTerminate方法
tryTerminate方法根據(jù)線程池狀態(tài)進行判斷是否結(jié)束線程池,代碼如下:
final void tryTerminate() {for (;;) {int c = ctl.get();/** 當(dāng)前線程池的狀態(tài)為以下幾種情況時,直接返回:* 1. RUNNING,因為還在運行中,不能停止;* 2. TIDYING或TERMINATED,因為線程池中已經(jīng)沒有正在運行的線程了;* 3. SHUTDOWN并且等待隊列非空,這時要執(zhí)行完workQueue中的task;*/if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;// 如果線程數(shù)量不為0,則中斷一個空閑的工作線程,并返回if (workerCountOf(c) != 0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;}final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 這里嘗試設(shè)置狀態(tài)為TIDYING,如果設(shè)置成功,則調(diào)用terminated方法if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {// terminated方法默認什么都不做,留給子類實現(xiàn)terminated();} finally {// 設(shè)置狀態(tài)為TERMINATEDctl.set(ctlOf(TERMINATED, 0));termination.signalAll();}return;}} finally {mainLock.unlock();}// else retry on failed CAS} }shutdown方法
shutdown方法要將線程池切換到SHUTDOWN狀態(tài),并調(diào)用interruptIdleWorkers方法請求中斷所有空閑的worker,最后調(diào)用tryTerminate嘗試結(jié)束線程池。
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 安全策略判斷checkShutdownAccess();// 切換狀態(tài)為SHUTDOWNadvanceRunState(SHUTDOWN);// 中斷空閑線程interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}// 嘗試結(jié)束線程池tryTerminate(); }這里思考一個問題:在runWorker方法中,執(zhí)行任務(wù)時對Worker對象w進行了lock操作,為什么要在執(zhí)行任務(wù)的時候?qū)γ總€工作線程都加鎖呢?
下面仔細分析一下:
- 在getTask方法中,如果這時線程池的狀態(tài)是SHUTDOWN并且workQueue為空,那么就應(yīng)該返回null來結(jié)束這個工作線程,而使線程池進入SHUTDOWN狀態(tài)需要調(diào)用shutdown方法;
- shutdown方法會調(diào)用interruptIdleWorkers來中斷空閑的線程,interruptIdleWorkers持有mainLock,會遍歷workers來逐個判斷工作線程是否空閑。但getTask方法中沒有mainLock;
- 在getTask中,如果判斷當(dāng)前線程池狀態(tài)是RUNNING,并且阻塞隊列為空,那么會調(diào)用workQueue.take()進行阻塞;
- 如果在判斷當(dāng)前線程池狀態(tài)是RUNNING后,這時調(diào)用了shutdown方法把狀態(tài)改為了SHUTDOWN,這時如果不進行中斷,那么當(dāng)前的工作線程在調(diào)用了workQueue.take()后會一直阻塞而不會被銷毀,因為在SHUTDOWN狀態(tài)下不允許再有新的任務(wù)添加到workQueue中,這樣一來線程池永遠都關(guān)閉不了了;
- 由上可知,shutdown方法與getTask方法(從隊列中獲取任務(wù)時)存在競態(tài)條件;
- 解決這一問題就需要用到線程的中斷,也就是為什么要用interruptIdleWorkers方法。在調(diào)用workQueue.take()時,如果發(fā)現(xiàn)當(dāng)前線程在執(zhí)行之前或者執(zhí)行期間是中斷狀態(tài),則會拋出InterruptedException,解除阻塞的狀態(tài);
- 但是要中斷工作線程,還要判斷工作線程是否是空閑的,如果工作線程正在處理任務(wù),就不應(yīng)該發(fā)生中斷;
- 所以Worker繼承自AQS,在工作線程處理任務(wù)時會進行l(wèi)ock,interruptIdleWorkers在進行中斷時會使用tryLock來判斷該工作線程是否正在處理任務(wù),如果tryLock返回true,說明該工作線程當(dāng)前未執(zhí)行任務(wù),這時才可以被中斷。
下面就來分析一下interruptIdleWorkers方法。
interruptIdleWorkers方法
private void interruptIdleWorkers() {interruptIdleWorkers(false); }private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers) {Thread t = w.thread;if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();} }interruptIdleWorkers遍歷workers中所有的工作線程,若線程沒有被中斷tryLock成功,就中斷該線程。
為什么需要持有mainLock?因為workers是HashSet類型的,不能保證線程安全。
shutdownNow方法
public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(STOP);// 中斷所有工作線程,無論是否空閑interruptWorkers();// 取出隊列中沒有被執(zhí)行的任務(wù)tasks = drainQueue();} finally {mainLock.unlock();}tryTerminate();return tasks; }shutdownNow方法與shutdown方法類似,不同的地方在于:
shutdownNow方法執(zhí)行完之后調(diào)用tryTerminate方法,該方法在上文已經(jīng)分析過了,目的就是使線程池的狀態(tài)設(shè)置為TERMINATED。
線程池的監(jiān)控
通過線程池提供的參數(shù)進行監(jiān)控。線程池里有一些屬性在監(jiān)控線程池的時候可以使用
- getTaskCount:線程池已經(jīng)執(zhí)行的和未執(zhí)行的任務(wù)總數(shù);
- getCompletedTaskCount:線程池已完成的任務(wù)數(shù)量,該值小于等于taskCount;
- getLargestPoolSize:線程池曾經(jīng)創(chuàng)建過的最大線程數(shù)量。通過這個數(shù)據(jù)可以知道線程池是否滿過,也就是達到了maximumPoolSize;
- getPoolSize:線程池當(dāng)前的線程數(shù)量;
- getActiveCount:當(dāng)前線程池中正在執(zhí)行任務(wù)的線程數(shù)量。
通過這些方法,可以對線程池進行監(jiān)控,在ThreadPoolExecutor類中提供了幾個空方法,如beforeExecute方法,afterExecute方法和terminated方法,可以擴展這些方法在執(zhí)行前或執(zhí)行后增加一些新的操作,例如統(tǒng)計線程池的執(zhí)行任務(wù)的時間等,可以繼承自ThreadPoolExecutor來進行擴展。
總結(jié)
本文比較詳細的分析了線程池的工作流程,總體來說有如下幾個內(nèi)容:
- 分析了線程的創(chuàng)建,任務(wù)的提交,狀態(tài)的轉(zhuǎn)換以及線程池的關(guān)閉;
- 這里通過execute方法來展開線程池的工作流程,execute方法通過corePoolSize,maximumPoolSize以及阻塞隊列的大小來判斷決定傳入的任務(wù)應(yīng)該被立即執(zhí)行,還是應(yīng)該添加到阻塞隊列中,還是應(yīng)該拒絕任務(wù)。
- 介紹了線程池關(guān)閉時的過程,也分析了shutdown方法與getTask方法存在競態(tài)條件;
- 在獲取任務(wù)時,要通過線程池的狀態(tài)來判斷應(yīng)該結(jié)束工作線程還是阻塞線程等待新的任務(wù),也解釋了為什么關(guān)閉線程池時要中斷工作線程以及為什么每一個worker都需要lock。
在向線程池提交任務(wù)時,除了execute方法,還有一個submit方法,submit方法會返回一個Future對象用于獲取返回值,有關(guān)Future和Callable請自行了解一下相關(guān)的文章,這里就不介紹了。
from:?https://juejin.im/entry/58fada5d570c350058d3aaad
總結(jié)
以上是生活随笔為你收集整理的深入理解 Java 线程池:ThreadPoolExecutor的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 聊聊并发(三)——JAVA线程池的分析和
- 下一篇: 并发新特性—Executor 框架与线程