Java核心(二)深入理解线程池ThreadPool
本文你將獲得以下信息:
- 線程池源碼解讀
- 線程池執行流程分析
- 帶返回值的線程池實現
- 延遲線程池實現
為了方便讀者理解,本文會由淺入深,先從線程池的使用開始再延伸到源碼解讀和源碼分析等高級內容,讀者可根據自己的情況自主選擇閱讀順序和需要了解的章節。
一、線程池優點
線程池能夠更加充分的利用CPU、內存、網絡、IO等系統資源,線程池的主要作用如下:
- 利用線程池可以復用線程,控制最大并發數;
- 實現任務緩存策略和拒絕機制;
- 實現延遲執行
阿里巴巴Java開發手冊強制規定:線程資源必須通過線程池提供,如下圖:
二、線程池使用
本節會介紹7種線程池的創建與使用,線程池的狀態介紹,ThreadPoolExecutor參數介紹等。
2.1 線程池創建
線程池可以使用Executors和ThreadPoolExecutor,其中使用Executors有六種創建線程池的方法,如下圖:
// 使用Executors方式創建 ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); ExecutorService fixedThreadPool = Executors.newFixedThreadPool(2); ScheduledExecutorService singleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor(); ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2); ExecutorService workStealingPool = Executors.newWorkStealingPool(); // 原始創建方式 ThreadPoolExecutor tp = new ThreadPoolExecutor(10, 10, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());2.1.1 線程池解讀
總結: 其中newSingleThreadExecutor、newCachedThreadPool、newFixedThreadPool是對ThreadPoolExecutor的封裝實現,newSingleThreadScheduledExecutor、newScheduledThreadPool則為ThreadPoolExecutor子類ScheduledThreadPoolExecutor的封裝,用于執行延遲任務,newWorkStealingPool則為Java 8新加的方法。
2.1.2 單線程池的意義
從以上代碼可以看出newSingleThreadExecutor和newSingleThreadScheduledExecutor創建的都是單線程池,那么單線程池的意義是什么呢?
雖然是單線程池,但提供了工作隊列,生命周期管理,工作線程維護等功能。
2.2 ThreadPoolExecutor解讀
ThreadPoolExecutor作為線程池的核心方法,我們來看一下ThreadPoolExecutor內部實現,以及封裝類是怎么調用ThreadPoolExecutor的。
先從構造函數說起,構造函數源碼如下:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler; }參數說明:
- corePoolSize:所謂的核心線程數,可以大致理解為長期駐留的線程數目(除非設置了 allowCoreThreadTimeOut)。對于不同的線程池,這個值可能會有很大區別,比如 newFixedThreadPool 會將其設置為 nThreads,而對于 newCachedThreadPool 則是為 0。
- maximumPoolSize:顧名思義,就是線程不夠時能夠創建的最大線程數。同樣進行對比,對于 newFixedThreadPool,當然就是 nThreads,因為其要求是固定大小,而 newCachedThreadPool 則是 Integer.MAX_VALUE。
- keepAliveTime:空閑線程的保活時間,如果線程的空閑時間超過這個值,那么將會被關閉。注意此值生效條件必須滿足:空閑時間超過這個值,并且線程池中的線程數少于等于核心線程數corePoolSize。當然核心線程默認是不會關閉的,除非設置了allowCoreThreadTimeOut(true)那么核心線程也可以被回收。
- TimeUnit:時間單位。
- BlockingQueue:任務丟列,用于存儲線程池的待執行任務的。
- threadFactory:用于生成線程,一般我們可以用默認的就可以了。
- handler:當線程池已經滿了,但是又有新的任務提交的時候,該采取什么策略由這個來指定。有幾種方式可供選擇,像拋出異常、直接拒絕然后返回等,也可以自己實現相應的接口實現自己的邏輯。
來看一下線程池封裝類對于ThreadPoolExecutor的調用:
newSingleThreadExecutor對ThreadPoolExecutor的封裝源碼如下:
public static ExecutorService newSingleThreadExecutor() {return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>())); }newCachedThreadPool對ThreadPoolExecutor的封裝源碼如下:
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>()); }newFixedThreadPool對ThreadPoolExecutor的封裝源碼如下:
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); }ScheduledExecutorService對ThreadPoolExecutor的封裝源碼如下:
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1)); }newSingleThreadScheduledExecutor使用的是ThreadPoolExecutor的子類ScheduledThreadPoolExecutor,如下圖所示:
newScheduledThreadPool對ThreadPoolExecutor的封裝源碼如下:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize); }newScheduledThreadPool使用的也是ThreadPoolExecutor的子類ScheduledThreadPoolExecutor。
2.3 線程池狀態
查看ThreadPoolExecutor源碼可知線程的狀態如下:
線程狀態解讀(以下內容來源于:https://javadoop.com/post/java-thread-pool):
- RUNNING:這個沒什么好說的,這是最正常的狀態:接受新的任務,處理等待隊列中的任務;
- SHUTDOWN:不接受新的任務提交,但是會繼續處理等待隊列中的任務;
- STOP:不接受新的任務提交,不再處理等待隊列中的任務,中斷正在執行任務的線程;
- TIDYING:所有的任務都銷毀了,workCount 為 0。線程池的狀態在轉換為 TIDYING 狀態時,會執行鉤子方法 terminated();
- TERMINATED:terminated() 方法結束后,線程池的狀態就會變成這個;
RUNNING 定義為 -1,SHUTDOWN 定義為 0,其他的都比 0 大,所以等于 0 的時候不能提交任務,大于 0 的話,連正在執行的任務也需要中斷。
看了這幾種狀態的介紹,讀者大體也可以猜到十之八九的狀態轉換了,各個狀態的轉換過程有以下幾種:
- RUNNING -> SHUTDOWN:當調用了 shutdown() 后,會發生這個狀態轉換,這也是最重要的;
- (RUNNING or SHUTDOWN) -> STOP:當調用 shutdownNow() 后,會發生這個狀態轉換,這下要清楚 shutDown() 和 shutDownNow() 的區別了;
- SHUTDOWN -> TIDYING:當任務隊列和線程池都清空后,會由 SHUTDOWN 轉換為 TIDYING;
- STOP -> TIDYING:當任務隊列清空后,發生這個轉換;
- TIDYING -> TERMINATED:這個前面說了,當 terminated() 方法結束后;
2.4 線程池執行
說了那么多下來一起來看線程池的是怎么執行任務的,線程池任務提交有兩個方法:
- execute
- submit
其中execute只能接受Runnable類型的任務,使用如下:
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); singleThreadExecutor.execute(new Runnable() {@Overridepublic void run() {System.out.println(Thread.currentThread().getName());} });submit可以接受Runnable或Callable類型的任務,使用如下:
ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.submit(new Runnable() {@Overridepublic void run() {System.out.println(Thread.currentThread().getName());} });2.4.1 帶返回值的線程池實現
使用submit傳遞Callable類可以獲取執行任務的返回值,Callable是JDK 1.5 添加的特性用于補充Runnable無返回的情況。
ExecutorService executorService = Executors.newSingleThreadExecutor(); Future<Long> result = executorService.submit(new Callable<Long>() {@Overridepublic Long call() throws Exception {return new Date().getTime();} }); try {System.out.println("運行結果:" + result.get()); } catch (InterruptedException e) {e.printStackTrace(); } catch (ExecutionException e) {e.printStackTrace(); }2.4.2 延遲線程池實現
在線程池中newSingleThreadScheduledExecutor和newScheduledThreadPool返回的是ScheduledExecutorService,用于執行延遲線程池的,代碼如下:
// 延遲線程池 ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2); scheduledThreadPool.schedule(new Runnable() {@Overridepublic void run() {System.out.println("time:" + new Date().getTime());} }, 10, TimeUnit.SECONDS);完整示例下載地址: https://github.com/vipstone/java-core-example
三、線程池源碼解讀
閱讀線程池的源碼有一個小技巧,可以按照線程池執行的順序進行串連關聯閱讀,這樣更容易理解線程池的實現。
源碼閱讀流程解讀
我們先從線程池的任務提交方法execute()開始閱讀,從execute()我們會發現線程池執行的核心方法是addWorker(),在addWorker()中我們發現啟動線程調用了start()方法,調用start()方法之后會執行Worker類的run()方法,run里面調用runWorker(),運行程序的關鍵在于getTask()方法,getTask()方法之后就是此線程的關閉,整個線程池的工作流程也就完成了,下來一起來看吧(如果本段文章沒看懂的話也可以看完源碼之后,回過頭來再看一遍)。
3.1 execute() 源碼解讀
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// 如果當前線程數少于核心線程數,那么直接添加一個 worker 來執行任務,// 創建一個新的線程,并把當前任務 command 作為這個線程的第一個任務(firstTask)if (workerCountOf(c) < corePoolSize) {// 添加任務成功,那么就結束了。提交任務嘛,線程池已經接受了這個任務,這個方法也就可以返回了// 至于執行的結果,到時候會包裝到 FutureTask 中。// 返回 false 代表線程池不允許提交任務if (addWorker(command, true))return;c = ctl.get();}// 到這里說明,要么當前線程數大于等于核心線程數,要么剛剛 addWorker 失敗了// 如果線程池處于 RUNNING 狀態,把這個任務添加到任務隊列 workQueue 中if (isRunning(c) && workQueue.offer(command)) {/* 這里面說的是,如果任務進入了 workQueue,我們是否需要開啟新的線程* 因為線程數在 [0, corePoolSize) 是無條件開啟新的線程* 如果線程數已經大于等于 corePoolSize,那么將任務添加到隊列中,然后進到這里*/int recheck = ctl.get();// 如果線程池已不處于 RUNNING 狀態,那么移除已經入隊的這個任務,并且執行拒絕策略if (! isRunning(recheck) && remove(command))reject(command);// 如果線程池還是 RUNNING 的,并且線程數為 0,那么開啟新的線程// 到這里,我們知道了,這塊代碼的真正意圖是:擔心任務提交到隊列中了,但是線程都關閉了else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 如果 workQueue 隊列滿了,那么進入到這個分支// 以 maximumPoolSize 為界創建新的 worker,// 如果失敗,說明當前線程數已經達到 maximumPoolSize,執行拒絕策略else if (!addWorker(command, false))reject(command); }3.2 addWorker() 源碼解讀
// 第一個參數是準備提交給這個線程執行的任務,之前說了,可以為 null // 第二個參數為 true 代表使用核心線程數 corePoolSize 作為創建線程的界線,也就說創建這個線程的時候, // 如果線程池中的線程總數已經達到 corePoolSize,那么不能響應這次創建線程的請求 // 如果是 false,代表使用最大線程數 maximumPoolSize 作為界線 private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// 這個非常不好理解// 如果線程池已關閉,并滿足以下條件之一,那么不創建新的 worker:// 1. 線程池狀態大于 SHUTDOWN,其實也就是 STOP, TIDYING, 或 TERMINATED// 2. firstTask != null// 3. workQueue.isEmpty()// 簡單分析下:// 還是狀態控制的問題,當線程池處于 SHUTDOWN 的時候,不允許提交任務,但是已有的任務繼續執行// 當狀態大于 SHUTDOWN 時,不允許提交任務,且中斷正在執行的任務// 多說一句:如果線程池處于 SHUTDOWN,但是 firstTask 為 null,且 workQueue 非空,那么是允許創建 worker 的if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// 如果成功,那么就是所有創建線程前的條件校驗都滿足了,準備創建線程執行任務了// 這里失敗的話,說明有其他線程也在嘗試往線程池中創建線程if (compareAndIncrementWorkerCount(c))break retry;// 由于有并發,重新再讀取一下 ctlc = ctl.get();// 正常如果是 CAS 失敗的話,進到下一個里層的for循環就可以了// 可是如果是因為其他線程的操作,導致線程池的狀態發生了變更,如有其他線程關閉了這個線程池// 那么需要回到外層的for循環if (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}/* * 到這里,我們認為在當前這個時刻,可以開始創建線程來執行任務了,* 因為該校驗的都校驗了,至于以后會發生什么,那是以后的事,至少當前是滿足條件的*/// worker 是否已經啟動boolean workerStarted = false;// 是否已將這個 worker 添加到 workers 這個 HashSet 中boolean workerAdded = false;Worker w = null;try {final ReentrantLock mainLock = this.mainLock;// 把 firstTask 傳給 worker 的構造方法w = new Worker(firstTask);// 取 worker 中的線程對象,之前說了,Worker的構造方法會調用 ThreadFactory 來創建一個新的線程final Thread t = w.thread;if (t != null) {// 這個是整個類的全局鎖,持有這個鎖才能讓下面的操作“順理成章”,// 因為關閉一個線程池需要這個鎖,至少我持有鎖的期間,線程池不會被關閉mainLock.lock();try {int c = ctl.get();int rs = runStateOf(c);// 小于 SHUTTDOWN 那就是 RUNNING,這個自不必說,是最正常的情況// 如果等于 SHUTDOWN,前面說了,不接受新的任務,但是會繼續執行等待隊列中的任務if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// worker 里面的 thread 可不能是已經啟動的if (t.isAlive())throw new IllegalThreadStateException();// 加到 workers 這個 HashSet 中workers.add(w);int s = workers.size();// largestPoolSize 用于記錄 workers 中的個數的最大值// 因為 workers 是不斷增加減少的,通過這個值可以知道線程池的大小曾經達到的最大值if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}// 添加成功的話,啟動這個線程if (workerAdded) {// 啟動線程t.start();workerStarted = true;}}} finally {// 如果線程沒有啟動,需要做一些清理工作,如前面 workCount 加了 1,將其減掉if (! workerStarted)addWorkerFailed(w);}// 返回線程是否啟動成功return workerStarted; }在這段代碼可以看出,調用了t.start();
3.3 runWorker() 源碼解讀
根據上面代碼可知,調用了Worker的t.start()之后,緊接著會調用Worker的run()方法,run()源碼如下:
public void run() {runWorker(this); }runWorker()源碼如下:
// worker 線程啟動后調用,while 循環(即自旋!)不斷從等待隊列獲取任務并執行 // worker 初始化時,可指定 firstTask,那么第一個任務也就可以不需要從隊列中獲取 final void runWorker(Worker w) {Thread wt = Thread.currentThread();// 該線程的第一個任務(若有)Runnable task = w.firstTask;w.firstTask = null;// 允許中斷w.unlock(); boolean completedAbruptly = true;try {// 循環調用 getTask 獲取任務while (task != null || (task = getTask()) != null) {w.lock(); // 若線程池狀態大于等于 STOP,那么意味著該線程也要中斷/*** 若線程池STOP,請確保線程 已被中斷* 如果沒有,請確保線程未被中斷* 這需要在第二種情況下進行重新檢查,以便在關中斷時處理shutdownNow競爭*/if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {// 這是一個鉤子方法,留給需要的子類實現beforeExecute(wt, task);Throwable thrown = null;try {// 到這里終于可以執行任務了task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {// 這里不允許拋出 Throwable,所以轉換為 Errorthrown = x; throw new Error(x);} finally {// 也是一個鉤子方法,將 task 和異常作為參數,留給需要的子類實現afterExecute(task, thrown);}} finally {// 置空 task,準備 getTask 下一個任務task = null;// 累加完成的任務數w.completedTasks++;// 釋放掉 worker 的獨占鎖w.unlock();}}completedAbruptly = false;} finally {// 到這里,需要執行線程關閉// 1. 說明 getTask 返回 null,也就是說,這個 worker 的使命結束了,執行關閉// 2. 任務執行過程中發生了異常// 第一種情況,已經在代碼處理了將 workCount 減 1,這個在 getTask 方法分析中說// 第二種情況,workCount 沒有進行處理,所以需要在 processWorkerExit 中處理processWorkerExit(w, completedAbruptly);} }3.4 getTask() 源碼解讀
runWorker里面的有getTask(),來看下具體的實現:
// 此方法有三種可能 // 1. 阻塞直到獲取到任務返回。默認 corePoolSize 之內的線程是不會被回收的,它們會一直等待任務 // 2. 超時退出。keepAliveTime 起作用的時候,也就是如果這么多時間內都沒有任務,那么應該執行關閉 // 3. 如果發生了以下條件,須返回 null // 池中有大于 maximumPoolSize 個 workers 存在(通過調用 setMaximumPoolSize 進行設置) // 線程池處于 SHUTDOWN,而且 workQueue 是空的,前面說了,這種不再接受新的任務 // 線程池處于 STOP,不僅不接受新的線程,連 workQueue 中的線程也不再執行 private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {// 允許核心線程數內的線程回收,或當前線程數超過了核心線程數,那么有可能發生超時關閉// 這里 break,是為了不往下執行后一個 if (compareAndDecrementWorkerCount(c))// 兩個 if 一起看:如果當前線程數 wc > maximumPoolSize,或者超時,都返回 null// 那這里的問題來了,wc > maximumPoolSize 的情況,為什么要返回 null?// 換句話說,返回 null 意味著關閉線程。// 那是因為有可能開發者調用了 setMaximumPoolSize 將線程池的 maximumPoolSize 調小了// 如果此 worker 發生了中斷,采取的方案是重試// 解釋下為什么會發生中斷,這個讀者要去看 setMaximumPoolSize 方法,// 如果開發者將 maximumPoolSize 調小了,導致其小于當前的 workers 數量,// 那么意味著超出的部分線程要被關閉。重新進入 for 循環,自然會有部分線程會返回 nullint c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {// CAS 操作,減少工作線程數decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {// 如果此 worker 發生了中斷,采取的方案是重試// 解釋下為什么會發生中斷,這個讀者要去看 setMaximumPoolSize 方法,// 如果開發者將 maximumPoolSize 調小了,導致其小于當前的 workers 數量,// 那么意味著超出的部分線程要被關閉。重新進入 for 循環,自然會有部分線程會返回 nulltimedOut = false;}} }四、線程池執行流程
線程池的執行流程如下圖:
五、總結
本文總結以問答的形式展示,引自《深度解讀 java 線程池設計思想及源碼實現》,最下方附參考地址。
1、線程池有哪些關鍵屬性?
-
corePoolSize 到 maximumPoolSize 之間的線程會被回收,當然 corePoolSize 的線程也可以通過設置而得到回收(allowCoreThreadTimeOut(true))。
-
workQueue 用于存放任務,添加任務的時候,如果當前線程數超過了 corePoolSize,那么往該隊列中插入任務,線程池中的線程會負責到隊列中拉取任務。
-
keepAliveTime 用于設置空閑時間,如果線程數超出了 corePoolSize,并且有些線程的空閑時間超過了這個值,會執行關閉這些線程的操作
-
rejectedExecutionHandler 用于處理當線程池不能執行此任務時的情況,默認有拋出 RejectedExecutionException 異常、忽略任務、使用提交任務的線程來執行此任務和將隊列中等待最久的任務刪除,然后提交此任務這四種策略,默認為拋出異常。
2、線程池中的線程創建時機?
-
如果當前線程數少于 corePoolSize,那么提交任務的時候創建一個新的線程,并由這個線程執行這個任務;
-
如果當前線程數已經達到 corePoolSize,那么將提交的任務添加到隊列中,等待線程池中的線程去隊列中取任務;
-
如果隊列已滿,那么創建新的線程來執行任務,需要保證池中的線程數不會超過 maximumPoolSize,如果此時線程數超過了 maximumPoolSize,那么執行拒絕策略。
3、任務執行過程中發生異常怎么處理?
如果某個任務執行出現異常,那么執行任務的線程會被關閉,而不是繼續接收其他任務。然后會啟動一個新的線程來代替它。
4、什么時候會執行拒絕策略?
- workers 的數量達到了 corePoolSize,任務入隊成功,以此同時線程池被關閉了,而且關閉線程池并沒有將這個任務出隊,那么執行拒絕策略。這里說的是非常邊界的問題,入隊和關閉線程池并發執行,讀者仔細看看 execute 方法是怎么進到第一個 reject(command) 里面的。
- workers 的數量大于等于 corePoolSize,準備入隊,可是隊列滿了,任務入隊失敗,那么準備開啟新的線程,可是線程數已經達到 maximumPoolSize,那么執行拒絕策略。
六、參考資料
書籍:《碼出高效:Java開發手冊》
Java核心技術36講:http://t.cn/EwUJvWA
深度解讀 java 線程池設計思想及源碼實現:https://javadoop.com/post/java-thread-pool
Java線程池-ThreadPoolExecutor源碼解析(基于Java8):https://www.imooc.com/article/42990
總結
以上是生活随笔為你收集整理的Java核心(二)深入理解线程池ThreadPool的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Tomcat工作原理及简单模拟实现
- 下一篇: MySQL 面试题汇总