Java线程池ThreadPoolExecutor源码解析
Java線程池ThreadPoolExecutor源碼解析
1.ThreadPoolExecutor的構造實現
以jdk8為準,常說線程池有七大參數,通常而言,有四個參數是比較重要的
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
-
corePoolSize:核心線程數,具體含義理解代碼 -
maximumPoolSize:最大線程數 -
keepAliveTime:線程空閑的存活時間 -
unit:時間單位 -
BlockingQueue:阻塞隊列,用來保存等待執行的任務
接下來去看完整參數的構造實現:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
-
ThreadFactory:線程工廠,用來創造線程 -
RejectedExecutionHandler:拒絕策略-
如果核心線程數等其他參數非法化就會拋出相應的異常
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); -
之后進行初始化賦值
this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler;- 注:acc是一個成員變量,用來管理線程池中線程的訪問控制上下文,其實現類是
AccessControlContext
- 注:acc是一個成員變量,用來管理線程池中線程的訪問控制上下文,其實現類是
-
2.線程池的執行execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
總共大致分為三步:要想理解線程池的執行,要先去理解控制字段其具體含義
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
先聲明線程池的五種狀態,再看其他字段方法具體執行了其他什么操作
- RUNNING: -1 << COUNT_BITS,即高3位為111
- SHUTDOWN: 0 << COUNT_BITS,即高3位為000
- STOP : 1 << COUNT_BITS,即高3位為001
- TIDYING : 2 << COUNT_BITS,即高3位為010
- TERMINATED: 3 << COUNT_BITS,即高3位為011
至于其每種空置狀態的具體意義,根據英文釋義結合代碼具體理解,而非直接理解,通過位移位的操作將高3位與低29位分離開來,高三位表示此時整個線程池的運行狀態,低29位表示線程池中線程的數量,再去看execute執行過程即可.
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
- 用于獲取此時線程中的線程數,如果小于核心線程數,就添加任務,添加任務成功則返回,失敗則重新獲取控制字段,
addworker后續了解,復雜的東西簡單化,理解大致操作思想最為核心.
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
- 根據控制字段c去判斷線程池的運行狀態是否正在運行,如果添加任務成功則不會執行失敗,或者說此時線程數有可能已經大于了核心線程數也有可能走到這,所以會將任務添加到阻塞隊列中去,然后重新去獲得控制字段,再去做校驗,如果此時線程池不是正在運行的狀態并且刪除任務成功,這一步主要是為了防止阻塞隊列添加任務成功這個過程,可能線程池不運行了,那么這時候就需要將添加的那個任務刪除,并對他執行拒絕策略,又或者是此時線程池中的線程數已經為0,說明沒有線程在工作了,因此添加一個空任務,至于第二個參數在
addWorker中在做說明
else if (!addWorker(command, false))
reject(command);
- 字面意思就是添加任務失敗,執行拒絕策略,則是為了應對線程池已經到了滿負載的狀態
3.線程池的任務添加addworker
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
代碼量很長,但是大致可分為兩部分,且邏輯很清晰
-
這里使用了標簽語法,前半段大致是是否需要添加線程做一系列準備
retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } }- 獲取控制字段
c:這個字段包含了運行狀態信息和線程池數量信息,是一個復合字段,而rs則是獲取高三位的線程池狀態信息 - 根據前面線程池狀態信息,運行
RUNNING值最小,因此判斷線程池如果處于非運行的狀態,則去判斷是否處于關閉的狀態,判斷第一個任務是否為空,隊列不為空,但是由于前面取反操作,其真正含義是:如果線程池的狀態不是SHUTDOWN,或者任務隊列為空,或者有待執行的任務,那么就不會拒絕新任務的提交,否則就返回false,表示添加任務失敗 - 接下來死循環表示需要去添加執行的任務,首先獲取線程池中的線程數,關鍵的地方在這,如果此時的線程數大于等于容量或者(這里根據傳遞的參數
core來選則比較的目標是核心線程數還是最大線程數),比較失敗,則說明超過了接受的范疇,添加任務失敗,如果沒有失敗,則通過底層CAS操作使得線程數加1,然后直接結束調用,跳出循環,,如果CAS失敗,則說明ctl字段受到了變化,此期間有其他任務參與,重新獲取此字段,去判斷一下重新獲取的ctl字段和之前的rs字段是否相等,這是為了保障多線程情況下出現的一種并發競爭問題導致的線程數發生錯亂.
- 獲取控制字段
-
至此,上半部分的核心已經解決,下半部分此時真正去實現任務的添加,通過線程池中的內部類Worker去實現
boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }-
兩個布爾類型暫時不用管,大致猜測意思即可,將firstask任務交付給worker,由worker內部的thread線程去執行,因此需要去理解worker的實現
3.1Worker內部的工作者
3.11構造方法實現
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }接收一個
Runnable參數做為任務進行初始化,這里用到了AQS的一些實現,然后通過線程工廠創造一個新的線程,賦予給內部的成員變量引用- 還有一些鎖的一些操作,后續再看
-
如果工作者的內部線程已經被創造好,實現就緒,要先獲得線程池的互斥鎖,然后對接下來的操作進行互斥訪問
-
重新獲取最新的線程池的運行狀態,只有當線程池處于運行狀態或者處于關閉狀態但沒有待執行的任務時,才能將新工作線程添加到線程池中,也就是worker中去,因此一個worker內部具備一個thread,如果想要實現許多線程去完成線程池的相應操作,需要將worker封裝成集合,因此線程池內部還有一成員變量:
private final HashSet<Worker> workers = new HashSet<Worker>();這樣就確保了每一個
worker都是獨一無二的,不會重復的,也就意味著每一個線程都不一樣. -
而最后一個
largestPoolSize則是保留歷史的最大線程數的,用來記錄,至此就已經添加成功了,只不過此時還沒有執行 -
之后解鎖,用之前標志位
workerAdded表示添加成功,然后啟動線程,也就是去執行這個任務,再用另一個標志workerStarted表示啟動成功. -
最后則是檢查是否有什么異常在啟動期間,如果沒啟動成功,則調用
addWorkerFiled方法去處理3.12 addWorkerFailed方法
private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } }- 緊接上文,也就是啟動失敗的話,會將執行任務的worker
remove(底層通過HashMap實現鍵的刪除),然后減少線程數,等待一會,這個過程是互斥的,因為牽扯到控制字段
- 緊接上文,也就是啟動失敗的話,會將執行任務的worker
-
至此,添加任務如果成功,則進行執行,如果成功開啟執行,則成功返回
因此,根據線程池的執行添加流程,大致可以將此過程通過繪圖的方式表現出來:
-
4.工作者的run方法是如何運行的
在worker中還有一個方法
public void run() {
runWorker(this);
}
是其執行的具體操作
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
- 上鎖之前的操作都很容易看懂,處了
getTask,這個方法用來獲取阻塞隊列中的任務,后續再理解- 首先看第一段if,就是用來查看此時線程池的狀態,如果不處于關閉或者運行的狀態,或者線程處于中斷的狀態,則確保線程中斷
- 接下來是一部分異常和錯誤的處理以及執行一些前置任務和一些后置任務
- 最后完成的任務數加一,解鎖,將標志位是否中斷,改為false,表示執行成功.
5.獲取任務的getTask方法
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
-
首先標志位用來判斷是否超時,默認情況下不超時,跟之前的參數掛鉤,后續再看,然后進入死循環,不斷循環去執行后續操作
-
獲取控制位c和rs運行狀態,之后的if操作含義是如果線程池處于關閉的狀態或者此時隊列為空,就說明沒有任務需要處理了,此時讓線程池中的線程數減一,返回,另一種情況則是線程池的狀態處于關閉狀態之上,則說明線程池現在不執行任務了,不需要管隊列中是否還有任務存在,則同上減一,返回。
-
然后重新獲取線程池的線程數,接下來的time布爾這個字段有些作用,后面的
allowCoreThreadTimeOut是一個控制字段,用來表示線程是否允許超時而返回的一個字段,如:線程池中的核心線程如果因為長時間沒有得到任務的滋養,就如同線程之間會發生饑餓一樣,因此存在一個字段用來控制超時是否生效.因此如果線程數大于核心線程數或者開啟超時控制字段,就說明會執行超時退出. -
接下來的if判斷是用來執行遞減線程數的一個操作,底層采取CAS就不多說了,
wc > maximumPoolSize:用來表示如果大于了最大線程數,說明需要減少線程數,至于為什么會出現這種情況,等會理解.(timed && timedOut):說明開啟超時退出,且上一次獲取任務因超時返回,這個需看后面代碼理解.上面的兩個條件滿足其中之一即可.wc > 1 || workQueue.isEmpty())而這個操作則是為了減少不必要的線程開銷,如果阻塞隊列為空說明沒有任務,那自然不需要多余的線程數去執行,因此會發生接下來的操作,遞減線程數,然后跳到下一次循環. -
之后接下來就是從阻塞隊列中獲取任務的核心了,第一步是根據超時控制字段來決定行為方式,允許超時退出的話,通過
poll方式,不允許則通過take方式,兩種方式大致是一個等待一定時間,如果為空是前提.另一個是無限等待,會阻塞線程.其具體實現通過阻塞隊列的真正實現類別去實現.如果獲取到了任務,就返回,如果沒有則timeout設置為true,表示沒有接受到任務,因此前文的timeout就理解了.通常而言線程池中的線程數是不允許超過最大線程數的,但通常而言這是一種機制的完整性和規范,假如是自定義線程池的情況下,就有可能出現這種情況,另外一種是本人推測雖然由于增加工作線程數的操作底層是通過CAS去實現的,底層是原子性的,同時進行CAS操作就有可能導致ABA問題出現,或者操作失敗,或者不斷自旋的可能,
6.任務的提交submit
眾所周知,任務需要進行提交給線程池,再有線程池去執行,而Runnable接口實現的run方法是沒有返回值的,而在線程中Callable通常具備返回值,且配備Future去接受結果.因此submit具備不同的操作
這里以AbstractExecutorService(線程池的父類)接口為例:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
-
RunnableFuture接口的實現類FutureTask,總而言之就是轉換為一個Runnable,然后進行提交,最后返回一個future,至于FutureTask具體內容自行詳解.
7.線程池的關閉
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
實現邏輯也很清楚,檢查是否可以關閉線程,然后設置線程的狀態,interruptIdleWorkers()這個方法算是關鍵的,他會去中斷worker;onShutdown是一個空方法,留給子類去實現的.
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
他會去遍歷集合workers,獲取每一個worker的工作線程,然后嘗試去中斷,最后結束.
總結
以上是生活随笔為你收集整理的Java线程池ThreadPoolExecutor源码解析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 迎战黑暗:《致命公司》飞船灯不亮全面解决
- 下一篇: 广东属于哪里(广东属于河南省吗)