java 线程池 源码_java线程池源码分析
我們在關閉線程池的時候會使用shutdown()和shutdownNow(),那么問題來了:
這兩個方法又什么區別呢?
他們背后的原理是什么呢?
線程池中線程超過了coresize后會怎么操作呢?
為了解決這些疑問我們需要分析java線程池的原理。
1 基本使用
1.1 繼承關系
平常我們在創建線程池經常使用的方式如下:
ExecutorService executorService = Executors.newFixedThreadPool(5);
看下newFixedThreadPool源碼, 其實Executors是個工廠類,內部是new了一個ThreadPoolExecuto:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}
參數的意義就不介紹了,網上有很多內容,看源碼注釋也可以明白。
線程池中類的繼承關系如下:
2 源碼分析
2.1 入口
將一個Runnable放到線程池執行有兩種方式,一個是調用ThreadPoolExecutor#submit,一個是調用ThreadPoolExecutor#execute。其實submit是將Runnable封裝成了一個RunnableFuture,然后再調用execute,最終調用的還是execute,所以我們這里就只從ThreadPoolExecutor#execute開始分析。
2.2 ctl和線程池狀態
ThreadPoolExecutor中有個重要的屬性是ctl
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 高3位表示狀態,低29位表示線程池中線程的多少
private static final int COUNT_BITS = Integer.SIZE - 3; // 32-3 = 29
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 左移29為減1,即最終得到為高3位為0,低29位為1的數字,作為掩碼,是二進制運算中常用的方法
private static final int RUNNING = -1 << COUNT_BITS; // 高三位111
private static final int SHUTDOWN = 0 << COUNT_BITS; // 高三位000
private static final int STOP = 1 << COUNT_BITS; // 高三位001
private static final int TIDYING = 2 << COUNT_BITS; // 高三位010
private static final int TERMINATED = 3 << COUNT_BITS; // 高三位011
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; } // 保留高3位,即計算線程池狀態
private static int workerCountOf(int c) { return c & CAPACITY; } // 保留低29位, 即計算線程數量
private static int ctlOf(int rs, int wc) { return rs | wc; } // 求ctl
ThreadPoolExecutor中使用32位Integer來表示線程池的狀態和線程的數量,其中高3位表示狀態,低29位表示數量。如果對二進制運行不熟悉可以參考:二進制運算。從上也可以看出線程池有五種狀態,我們關心前3中狀態
RUNNING 接收task和處理queue中的task
SHUTDOWN 不再接收新的task,但是會處理完正在運行的task和queue中的task,不會interrupt正在執行的task,其實調用shutdown后線程池處于該狀態
STOP 不再接收新的task,也不處理queue中的task,同時正在運行的線程會被interrupt。調用shutdownNow后線程池會處于該狀態。
2.3 execute
明白了ctl和線程池的狀態后我們來具體看下execute的處理邏輯
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // 線程數量小于coresize,那么就調用addWorker
if (addWorker(command, true)) // 這里知道,返回true就不往下走了
return;
c = ctl.get();
}
// 不滿足上述條件,即線程數量 >= coreSize,或者addWorker返回fasle,那么走下面的邏輯
if (isRunning(c) && workQueue.offer(command)) { // 可以看到是往blockingqueue中放task
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果不滿足上述條件,即blockingqueue也放不進去,那么就走下面的邏輯
else if (!addWorker(command, false))
reject(command);
}
從上面的代碼我們可以看到線程池處理線程的基本思路是: 如果線程數量小于coresize那么就執行task,否則就放到queue中,如果queue也放不下就走下面addWorker,如果也失敗了,那么就調用reject策略。當然還涉及一些細節,需要進一步分析。
2.4 addWorker
execute中反復調用的是addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); // 計算線程池狀態
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && // 先忽略
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c); // 線程數量
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) // 可見如果超過了運行的最大線程數量則返回false
return false;
if (compareAndIncrementWorkerCount(c)) // 如果成功,線程數量肯定加1
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask); // 將task封裝成了Worker
final Thread t = w.thread; // 來獲取worker的thread
if (t != null) {
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); // 將worker添加到hashset中報存,關閉的時候要使用
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) { // 經過一些檢查, 啟動了work的thread
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w); // 如果線程啟動失敗,則將線程數減1
}
return workerStarted;
}
上面的代碼看起來比較復雜,但是如果我們忽略具體的細節,從大致思路上看,其實也比較簡單。上面代碼的主要思路就是:除了一些狀態檢查外,首先將線程數量加1,然后將runnable分裝成一個worker,去啟動worker線程,如果啟動失敗則再將線程數量減1。返回false的原因可能是線程數量大于允許的數量。所以addWorker調用成功,則會啟動一個work線程,且線程池中線程數量加1
2.5 worker
woker是線程池中真正的線程實體。線程池中的線程不是自定義的Runnable實現的線程,而是woker線程,worker在run方法里調用了自定義的Runnable的run方法。
Worker繼承了AQS,并實現了runnable接口:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // 這個時候回頭看看addWorker中t.start(), 就明白了啟動的實際是一個Woker線程,而不是用戶定義的Runnable
}
public void run() {
runWorker(this);
}
}
Worker中firstTask存儲了用戶定義的Runnable,thread是以他自身為參數的Thread對象。getThreadFactory()默認返回是Executors#DefaultThreadFactory,用來新建線程,并定義了線程名稱的前綴等:
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-"; //
}
public Thread newThread(Runnable r) { // 調用后新建一個線程
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
2.6 runWoker
Worker的run方法調用了runWorker,并將自身作為參數傳了進去,下面看看問題的關鍵:runWorker:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) { // 注意這里的while循環,這里很關鍵。這里注意,如果兩個條件都滿足了,那么線程就結束了
w.lock(); // 注意worker繼承了AQS,相當于自己實現了鎖,這個在關閉線程的時候有用
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); // 僅僅是回調了Runnable的run方法
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null; // 重點,task執行完后就被置位null
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly); // 注意while循環結束后worker線程就結束了
}
}
runWorker中有個while循環,while中判斷條件為(task != null || (task = getTask()) != null)。假設我們按照正常的邏輯,即task != null,則會調用task.run方法,執行完run方法后然后在finally中task被置為null;接著又進入while循環判斷,這次task == null,所以不符合第一個判斷條件,則會繼續判斷 task == getTask()) != null。我們來看下getTask做了什么。
2.7 getTask
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 當調用shutdown()方法的時候,線程狀態就為shutdown了; 當調用shutdownow()的時候,線程狀態就為stop了
decrementWorkerCount();
return null;
}
boolean timed; // Are workers subject to culling?
for (;;) { // 通過死循環設置狀態
int wc = workerCountOf(c);
// 設置允許core線程timeout或者線程數量大于coresize,則允許線程超時
timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果線程數量 <= 最大線程數 且 沒有超時和允許超時 則跳出死循環
if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
try {
// 這里是關鍵,如果允許超時則調用poll從queue中取出task,否則就調用take可阻塞的獲取task
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null) // 獲取到task則返回,然后runWorker的while循環就繼續執行,并調用task的run方法
return r;
timedOut = true; // 否則設置為timeOut,繼續循環,但是下次循環會走到if (compareAndDecrementWorkerCount(c)) 處,并返回null。
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
忽略掉具體細節,getTask的整體思路是: 從blockqueue中拿去task,如果queue中沒有task則分兩種情況:
如果允許超時則調用poll(keepAliveTime, TimeUnit.NANOSECONDS),在規定時間沒有返回了則getTask返回null,runWorker結束while循環,work線程結束。當線程數量大于coresize且blockqueue滿的時候且小于maxsize的時候,新創建的線程便是走這個邏輯;或者允許core線程超時的時候也是走這個邏輯
如果不允許超時,則會一直阻塞直到blockqueue中有了新的task。take方法阻塞則表示worker線程也阻塞,也就是在沒有task執行的情況下,worker線程便會阻塞等待。core線程走的就是這個邏輯。
這個時候回頭再看下runWorker,如果task != null,那么就會執行task的run方法,執行完后task就會為被置為null,再次進入while循環執行getTask阻塞在這里了。通過這種方式保留住了線程。如果while循環結束了,那么worker線程也就結束了。
2.8 再看addWorker
分析到這里我們再來看下addWoker。addWorker可以將第一個參數設置為null。例如ThreadPoolExecutor#prestartAllCoreThreads:
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true)) // addWorker第一個參數是null
++n;
return n;
}
經過前面的分析,我們知道addWoker用來啟動一個worker線程,worker線程調用runWorker來執行,而runWorker中有個while循環,判斷條件是(task != null || (task = getTask()) != null)。因為我們傳入的task為null,所以就會判斷task = getTask()) != null,而getTask就是去blockqueue中拿去數據,如果沒有任務就會阻塞住。這個時候就是一個阻塞的線程在等待task的到來了。所以傳入參數為null表示創建一個空的線程,什么都不執行。
2.9 再看execute
已經知道了線程池內部的大概工作情況,我們再來看下如果所有core線程都創建好了且處于空置狀態,這個時候新放入一個線程的執行流程。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // core線程都創建好了,所以判斷條件不滿足
if (addWorker(command, true))
return;
c = ctl.get();
}
// 會走到這里,會通過offer往blockingqueue里放置一個task。這個時候阻塞的core線程會通過blockingqueue的take拿到task執行,類似一個生產者消費者的情況
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果blockingqueue添加失敗,則創建線程直到maxsize
else if (!addWorker(command, false))
reject(command);
}
可見,線程和execute通過blockingqueue來通信,而不是其他方式,execute往blockingqueue中放置task,線程通過take來獲取。整體線程池的邏輯如下圖
2.10 shutdown
這個時候我們終于可以來看看shutdown和shutdownNow了
看下shutdown
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN); // 重點,將線程狀態置為shutdown,這樣getTask等workqueue為空后就返回null了
interruptIdleWorkers(); // 重點
onShutdown(); // 什么都沒做
} finally {
mainLock.unlock();
}
tryTerminate();
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 線程沒有中斷 且 獲取到worker的鎖
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt(); // 調用interrup,中斷線程
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
shutdown的核心方法在interruptIdleWorkers里,這里可以看到在t.interrupt的時候有個判斷添加,一個是線程沒有設置中斷標記,第二個是獲取到worker的鎖,我們注意下第二個條件。回頭看下runWorker,while中執行task的run方法的時候,會先獲取到worker線程的鎖,所以如果線程正在執行task的run方法,則shutdown的時候會獲取鎖失敗,也就不會中斷線程了。這里可以得出結論:shutdown不會中斷正在執行的線程。
如果blockingqueu中有task還沒執行完呢? 這個時候while中的take并不會阻塞,也不會被中斷,shutdown中也沒有清空blockingqueue的操作。所以可以得出結論:shutdown會等blockingqueue中的task執行完成再關閉。可以說shutdown是一種比較溫柔的關閉方式了。
如果core線程都阻塞在take方法上了,即沒有正在執行的task了,那么這個時候 t.interrupt則會中斷take方法,worker線程的while循環結束,worker線程結束。當所有的worker線程都結束后線程池就關閉了
總結下就是: shutdown會把它被調用前放到線程池中的task全部執行完。
2.11 shutdownNow
再來看下shutdownNow
public List shutdownNow() {
List tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP); // 重點,將線程狀態置為stop
interruptWorkers(); // 重點
tasks = drainQueue(); // 重點
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { // 沒有去獲取woker的鎖
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
private List drainQueue() {
BlockingQueue q = workQueue;
List taskList = new ArrayList();
q.drainTo(taskList); // 將blockingqueue中的task清空
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
從上面的代碼可以看出:
shutdownNow不會去獲取worker的鎖,所以shutdownNow會導致正在運行的task也被中斷
shutdownNow會將blockingqueue中的task清空,所以在blockingqueue中的task也不會被執行
總結就是shutdownNow比較粗暴,調用他后,他會將所有之前提交的任務都interrupt,且將blockingqueue中的task清空
另外就是不論是shutdown還是shutdownNow都是調用Thread的interrupt()方法。如果task不響應中斷或者忽略中斷標記,那么這個線程就不會被終止。例如在run中執行以下邏輯
poolExecutor.execute(new Runnable() {
@Override
public void run() {
while (true) {
System.out.println("b");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.printf("不處理"); // 忽略中斷
}
}
}
});
運行結果是,即使調用了shutdownNow也終止不了線程運行
b
0
不處理b
b
b
b
b
....
3 總結
線程通過while循環不停的從blockingqueue中獲取task來保留線程,避免重復重建線程
4 參考
總結
以上是生活随笔為你收集整理的java 线程池 源码_java线程池源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python朋友圈表白_情人节「告白生成
- 下一篇: java的8中数据类型_java 8种基