ThreadPoolExecutor(五)——线程池关闭相关操作
?補充了和Thread的interrupt操作相關的知識,回頭再來看ThreadPoolExecutor中interrupt,關閉線程池等相關操作。
1.shutdown
/*** Initiates an orderly shutdown in which previously submitted* tasks are executed, but no new tasks will be accepted.* Invocation has no additional effect if already shut down.** <p>This method does not wait for previously submitted tasks to* complete execution. Use {@link #awaitTermination awaitTermination}* to do that.** @throws SecurityException {@inheritDoc}*/public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(SHUTDOWN);interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate();}先看注釋:
開始一個順序的shutdown操作,shutdown之前被執行的已提交任務,新的任務不會再被接收了。如果線程池已經被shutdown了,該方法的調用沒有其他任何效果了。
該方法不會等待之前已經提交的任務執行完畢,awaitTermination方法才有這個效果。
具體看內部邏輯,checkShutdownAccess這個方法是確保允許調用發interrupt每個Worker線程的,具體就不看了。
1.advanceRunState方法
/*** Transitions runState to given target, or leaves it alone if* already at least the given target.** @param targetState the desired state, either SHUTDOWN or STOP* (but not TIDYING or TERMINATED -- use tryTerminate for that)*/private void advanceRunState(int targetState) {for (;;) {int c = ctl.get();if (runStateAtLeast(c, targetState) ||ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))break;}}該方法會原子設置線程池的rs(runState)。
設置的邏輯是,如果當前的線程池狀態已經是要設置的狀態,或者已經超過了要設置狀態(runStateAtLeast方法返回值是true),就保持不做任何操作了,直接break。
如果線程池當前狀態比要設置的狀態小,比如當前是RUNNING,要設置是的SHUTDOWN,那么runStateAtLeast方法返回false,繼續走第二個判斷,原子設置rs,如果失敗的話繼續這個流程。
2.interruptIdleWorkers方法
/*** Common form of interruptIdleWorkers, to avoid having to* remember what the boolean argument means.*/private void interruptIdleWorkers() {interruptIdleWorkers(false);}內部調用了interruptIdleWorkers方法,參數默認設置為false了。具體參數的含義下一個段落說interruptIdleWorkers方法的時候再說。
這里簡單說一下傳false的效果,就是檢查當前所有worker線程,在獲取Worker鎖的情況下,把所有沒有interrupt的線程都執行interrupt操作。
2.interruptIdleWorkers
/*** Interrupts threads that might be waiting for tasks (as* indicated by not being locked) so they can check for* termination or configuration changes. Ignores* SecurityExceptions (in which case some threads may remain* uninterrupted).** @param onlyOne If true, interrupt at most one worker. This is* called only from tryTerminate when termination is otherwise* enabled but there are still other workers. In this case, at* most one waiting worker is interrupted to propagate shutdown* signals in case all threads are currently waiting.* Interrupting any arbitrary thread ensures that newly arriving* workers since shutdown began will also eventually exit.* To guarantee eventual termination, it suffices to always* interrupt only one idle worker, but shutdown() interrupts all* idle workers so that redundant workers exit promptly, not* waiting for a straggler task to finish.*/private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers) {Thread t = w.thread;if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();}}先看注釋:
中斷那些可能在等待執行任務的線程(沒有被鎖住的——idle語義,tryLock成功,如果Worker線程在執行任務,runWorker方法中的執行任務的Worker是占有的鎖的,所以這里是無法獲取鎖的,也就是非idle的了),讓他們能檢查是否可以terminate。這里直接吞了SecurityException異常,防止某些線程在interrupt之后仍然處于uninterrupted狀態。
onlyOne參數,如果是true,最多只中斷一個Worker。這種情況只有在tryTerminate調用的時候才會出現,表示可以termination,但是還有其他的Worker存在。在這種情況下,最多只有一個處于等待的Worker被中斷,來保證shutdown信號的繁衍傳遞(propagate語義),以便能處理所有信號都處于等待狀態的情況,這個情景是什么,代碼塊在哪兒?。
中斷任意一個隨機的線程都能保證從shutdown操作開始之后新添加的Worker最終都能退出(哪個代碼塊有這個功能?)。
為了保證最終的termination,永遠只interrupt一個線程就足夠了(為什么足夠),但是shutdown操作總是所有idle的workers,這樣冗余的workers可以立即退出,而不是等待一個straggler任務來完成操作。
只看這一個方法和這一段注釋可能會有點云里霧里,還需要結合其他方法一起看。
3.interruptWorkers
/*** Interrupts all threads, even if active. Ignores SecurityExceptions* (in which case some threads may remain uninterrupted).*/private void interruptWorkers() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers) {try {w.thread.interrupt();} catch (SecurityException ignore) {}}} finally {mainLock.unlock();}}中斷所有線程,即使線程還是active的。忽略所有SecurityExceptions異常。
和interruptIdleWorkers和區別從代碼上看就是后者在進行中斷之前進行了一個而判斷:
if (!t.isInterrupted() && w.tryLock())對于每一個Worker,如果它的線程之前沒有被中斷,而且該Worker的tryLock方法返回true,才進行中斷。
縱觀整個ThreadPoolExecutor類代碼,只有runWorker方法中會嘗試持有Worker鎖(調用Worker的lock方法)。而Worker之所以繼承AbstractQueuedSynchronizer類的語義也是為了保護一個正在等待執行任務的Worker線程不被中斷操作影響。interruptIdleWorkers方法會因為這層保護而放棄對某個Worker線程的中斷(tryLock為false)。
但是對于interruptWorkers方法,沒有這個判斷,是無差別的中斷操作(除非中斷是拋出了SecurityException異常進入catch塊并被吞掉),在shutdownNow方法中調用。
4.shutdownNow方法
/*** Attempts to stop all actively executing tasks, halts the* processing of waiting tasks, and returns a list of the tasks* that were awaiting execution. These tasks are drained (removed)* from the task queue upon return from this method.** <p>This method does not wait for actively executing tasks to* terminate. Use {@link #awaitTermination awaitTermination} to* do that.** <p>There are no guarantees beyond best-effort attempts to stop* processing actively executing tasks. This implementation* cancels tasks via {@link Thread#interrupt}, so any task that* fails to respond to interrupts may never terminate.** @throws SecurityException {@inheritDoc}*/public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(STOP);interruptWorkers();tasks = drainQueue();} finally {mainLock.unlock();}tryTerminate();return tasks;}嘗試stop所有actively executing線程,halt所有正處于等待狀態的任務,并返回一個等待執行的task列表。返回列表之后,這些任務已經從task隊列中移除了,通過drained (removed)操作。
該方法不會等待actively executing tasks終止,而是立即結束。如果想等待指定時間,可以調用awaitTermination方法。
該方法和shutdownNow方法的區別有三個:
第一個是把狀態設置為STOP而不是SHUTDOWN。
第二個是調用interruptWorkers方法而不是interruptIdleWorkers,這兩個方法的區別上面已經說過了。
第三個是drainQueue把所有任務從隊列中移除。
5.tryTerminate方法
5.1注釋部分
/*** Transitions to TERMINATED state if either (SHUTDOWN and pool* and queue empty) or (STOP and pool empty). If otherwise* eligible to terminate but workerCount is nonzero, interrupts an* idle worker to ensure that shutdown signals propagate. This* method must be called following any action that might make* termination possible -- reducing worker count or removing tasks* from the queue during shutdown. The method is non-private to* allow access from ScheduledThreadPoolExecutor.*/final void tryTerminate() {for (;;) {int c = ctl.get();if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;if (workerCountOf(c) != 0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;}final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {terminated();} finally {ctl.set(ctlOf(TERMINATED, 0));termination.signalAll();}return;}} finally {mainLock.unlock();}// else retry on failed CAS}}先看注釋:
在線程狀態是SHUTDOWN而且線程池和任務隊列都是空的,或者線程池處于STOP狀態,并且線程池是空的,把線程池的狀態改為TERMINATED。
如果線程池狀態是可以被terminate,但是wc不是0,那么用interruptIdleWorkers(true)來中斷一個idle worker來確保shutdown操作的繁衍(propagate語義)。
該方法一定要跟在任何使termination可行的操作之后——減少wc的值或者在shutdown過程中從任務隊列中移除任務。目前已知調用:
1.addWorker中
if (t == null ||(rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null))) {decrementWorkerCount();tryTerminate();return false;}減少wc操作。
2.shutdown操作
3.shutdownNow操作
4.remove操作
remove操作的執行在execute方法中double check的時候,
if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}處理的情形是:double-check whether we should have added a thread?(because existing ones died since last checking) or that?the pool shut down since entry into this method.?So we?recheck state and if necessary roll back the enqueuing if?stopped...
如果在成功添加task之后線程池shutdown了,需要回滾入隊列操作——remove。
5.purge操作
6.processWorkerExit操作
該操作在runWorkerd的finally塊中執行。
5.2tryTerminate方法代碼部分
先看第一個if判斷:
if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;如果線程池的狀態是RUNNING或者是TIDYING,TERMINATED,直接返回。
如果線程池狀態是SHUTDOWN,而且任務隊列不是空的,也直接返回。
如果線程池狀態是SHUTDOWN,而且任務隊列是空的,向下進行。
如果線程池狀態是STOP,向下進行。
再看第二個if判斷:
if (workerCountOf(c) != 0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;}如果wc不是0,表示線程池可以被terminate,調用interruptIdleWorkers(true)來出發繁衍shutdown操作(這個后面再看)。
所以能走到下面流程的條件是:
1.線程池狀態是STOP且wc是0
2.線程池狀態是SHUTDOWN而且wc(pool)和任務隊列(queue)都是空的
只有這兩個情況,線程池的狀態會被原子操作ctl.compareAndSet(c, ctlOf(TIDYING, 0)將狀態設置為TIDYING,并在成功之后(因為tryTerminate方法會在多出調用,存在競爭)進一步在terminated結束之后的finally塊中通過ctl.set(ctlOf(TERMINATED, 0))設置為TERMINATED。
最后執行termination.signalAll(),會喚醒awaitTermination方法中由于執行termination.awaitNanos(nanos)操作進入等待狀態的線程。
6.processWorkerExit
顧名思義,這個方法是處理Worker退出的時候,所以位置也在runWorker方法的finally塊中。
/*** Performs cleanup and bookkeeping for a dying worker. Called* only from worker threads. Unless completedAbruptly is set,* assumes that workerCount has already been adjusted to account* for exit. This method removes thread from worker set, and* possibly terminates the pool or replaces the worker if either* it exited due to user task exception or if fewer than* corePoolSize workers are running or queue is non-empty but* there are no workers.** @param w the worker* @param completedAbruptly if the worker died due to user exception*/private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;workers.remove(w);} finally {mainLock.unlock();}tryTerminate();int c = ctl.get();if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false);}}先看注釋:
執行清除操作,bookkeeping for a dying worker(這是個啥?)。只有Worker線程會調用(這個顯而易見,因為runWorker就是Worker線程run方法跑的)。除非completedAbruptly標志位被設置了(用戶任務拋出異常),否則都表示線程池準備退出了。該方法可能會terminate線程池(wc和task隊列都是空的),或者替換線程(如果是用戶task拋出異常結束的runWorker循環,或者少于核心線程數的workers在運行,或者任務隊列非空但是已經沒有Worker了)。
執行的操作有這么幾個:
1.首先把該Worker從HashSet<Worker>中移除。
2.tryTerminate,如果這時wc和task隊列都是空的話,就可以關閉線程池了。
3.如果completedAbruptly為false,對應這個主題的SHUTDOWN操作。注意如果是allowCoreThreadTimeOut=true這種模式,線程要減小到0才不替換,否則wc小于corePoolSize就會替換線程。
7.addWorker(null, false)的含義?
1.如果線程處于超過SHUTDOWN的那三種狀態的話,該方法返回false。
2.如果處于SHUTDOWN狀態,而且任務不為null——addWorker(not null, X),也會直接返回false,語義是:在線程池關閉之后,新來的任務不會被執行也不會入隊列。不會入隊列是由execute方法中的double check保證的,不執行就是這里的直接return false。
3.如果處于SHUTDOWN狀態,而且任務時null,這時如果隊列非空,程序會繼續向下走。這里不明白,為什么當前隊列非空時還要用空task構建一個idle Worker入隊列然后執行?為什么不能像情況2中那樣處理,而是構造idle線程等著被interrupt?!
目前已知的該傳參方式只有execute方法recheck發現wc為0,還有processWorkerExit方法,再就是prestartCoreThread等初始化方法。
addWorker方法中有句話:Initially idle threads are usually created via?prestartCoreThread or to replace other dying workers,這個應該是指processWorkerExit方法。
目前對execute方法中這個塊還有疑惑:
else if (workerCountOf(recheck) == 0)addWorker(null, false);8.幾個狀態
shutdown會把狀態改為SHUTDOWN,advanceRunState(SHUTDOWN)
shutdownNow會把狀態改為STOP,advanceRunState(STOP)。
tryTerminate中會在兩重if判斷都過了之后,原子操作ctl.compareAndSet(c, ctlOf(TIDYING, 0)將狀態設置為TIDYING,并在成功之后進一步在terminated結束之后的finally塊中通過ctl.set(ctlOf(TERMINATED, 0))設置為TERMINATED。
?
?
?
總結
以上是生活随笔為你收集整理的ThreadPoolExecutor(五)——线程池关闭相关操作的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 3D建模技巧:如果想用好ZBrush,必
- 下一篇: 什么是高斯模糊算法?