ThreadPoolExecutor(三)——Worker
1.Worker
Worker是ThreadPoolExecutor的一個內部類,實現了AbstractQueuedSynchronizer抽象類。
/*** Class Worker mainly maintains interrupt control state for* threads running tasks, along with other minor bookkeeping.* This class opportunistically extends AbstractQueuedSynchronizer* to simplify acquiring and releasing a lock surrounding each* task execution. This protects against interrupts that are* intended to wake up a worker thread waiting for a task from* instead interrupting a task being run. We implement a simple* non-reentrant mutual exclusion lock rather than use ReentrantLock* because we do not want worker tasks to be able to reacquire the* lock when they invoke pool control methods like setCorePoolSize.*/看注釋,該類主要是控制在線程執行任務時的interrupt操作的。它繼承了AbstractQueuedSynchronizer類,實現了一個非重入的鎖。該鎖會保護一個正在等待任務被執行的Worker不被interrupt操作打斷。
為什么不用ReentrantLock,要用非重入的鎖?因為作者不想讓這個Worker task在setCorePoolSize這種線程池控制方法調用時能重新獲取到鎖。
2.構造方法
/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*/Worker(Runnable firstTask) {this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}用自己作為task構造一個線程,同時把外層任務賦值給自己的task成員變量,相當于對task做了一個包裝。
3.run方法
run方法調用了ThreadPoolExecutor的runWorker方法,
4.runWorker方法
/*** Main worker run loop. Repeatedly gets tasks from queue and* executes them, while coping with a number of issues:** 1. We may start out with an initial task, in which case we* don't need to get the first one. Otherwise, as long as pool is* running, we get tasks from getTask. If it returns null then the* worker exits due to changed pool state or configuration* parameters. Other exits result from exception throws in* external code, in which case completedAbruptly holds, which* usually leads processWorkerExit to replace this thread.** 2. Before running any task, the lock is acquired to prevent* other pool interrupts while the task is executing, and* clearInterruptsForTaskRun called to ensure that unless pool is* stopping, this thread does not have its interrupt set.** 3. Each task run is preceded by a call to beforeExecute, which* might throw an exception, in which case we cause thread to die* (breaking loop with completedAbruptly true) without processing* the task.** 4. Assuming beforeExecute completes normally, we run the task,* gathering any of its thrown exceptions to send to* afterExecute. We separately handle RuntimeException, Error* (both of which the specs guarantee that we trap) and arbitrary* Throwables. Because we cannot rethrow Throwables within* Runnable.run, we wrap them within Errors on the way out (to the* thread's UncaughtExceptionHandler). Any thrown exception also* conservatively causes thread to die.** 5. After task.run completes, we call afterExecute, which may* also throw an exception, which will also cause thread to* die. According to JLS Sec 14.20, this exception is the one that* will be in effect even if task.run throws.** The net effect of the exception mechanics is that afterExecute* and the thread's UncaughtExceptionHandler have as accurate* information as we can provide about any problems encountered by* user code.** @param w the worker*/final void runWorker(Worker w) {Runnable task = w.firstTask;w.firstTask = null;boolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();clearInterruptsForTaskRun();try {beforeExecute(w.thread, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}這是執行task的主流程。
上面說過,addWorker會用當前task創建一個Worker對象,相當于對task的包裝,然后用Worker對象作為task創建一個Thread,該Thread保存在Worker的thread成員變量中。在addWorker中啟動了這個線程,線程中執行runWorker方法。
先看注釋:
1.首先取傳入的task執行,如果task是null,只要該線程池處于運行狀態,就會通過getTask方法從workQueue中取任務。ThreadPoolExecutor的execute方法會在無法產生core線程的時候向workQueue隊列中offer任務。
getTask方法從隊列中取task的時候會根據相關配置決定是否阻塞和阻塞多久。如果getTask方法結束,返回的是null,runWorker循環結束,執行processWorkerExit方法。
至此,該線程結束自己的使命,從線程池中“消失”。
2.在開始執行任務之前,會調用Worker的lock方法,目的是阻止task正在被執行的時候被interrupt,通過調用clearInterruptsForTaskRun方法來保證的(后面可以看一下這個方法),該線程沒有自己的interrupt set了。
3.beforeExecute和afterExecute方法用于在執行任務前后執行一些自定義的操作,這兩個方法是空的,留給繼承類去填充功能。
我們可以在beforeExecute方法中拋出異常,這樣task不會被執行,而且在跳出該循環的時候completedAbruptly的值是true,表示the worker died due to user exception,會用decrementWorkerCount調整wc。
4.因為Runnable的run方法不能拋出Throwables異常,所以這里重新包裝異常然后拋出,拋出的異常會使當當前線程死掉,可以在afterExecute中對異常做一些處理。
5.afterExecute方法也可能拋出異常,也可能使當前線程死掉。
總結
以上是生活随笔為你收集整理的ThreadPoolExecutor(三)——Worker的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 设计模式(二)简单工厂模式
- 下一篇: 2021年缆索式起重机司机考试内容及缆索