深入分析线程池的实现原理
點(diǎn)擊上方?好好學(xué)java?,選擇?星標(biāo)?公眾號(hào)
重磅資訊、干貨,第一時(shí)間送達(dá) 今日推薦:SQL 語(yǔ)法速成手冊(cè)文末點(diǎn)擊閱讀原文,去B站看視頻,別忘記關(guān)注哦 個(gè)人原創(chuàng)100W+訪問(wèn)量博客:點(diǎn)擊前往,查看更多來(lái)源:jianshu.com/p/704a6c5d337c
一、概述
線程池,顧名思義就是存放線程的池子,池子里存放了很多可以復(fù)用的線程。
如果不用類似線程池的容器,每當(dāng)我們需要執(zhí)行用戶任務(wù)的時(shí)候都去創(chuàng)建新的線程,任務(wù)執(zhí)行完之后線程就被回收了,這樣頻繁地創(chuàng)建和銷毀線程會(huì)浪費(fèi)大量的系統(tǒng)資源。
因此,線程池通過(guò)線程復(fù)用機(jī)制,并對(duì)線程進(jìn)行統(tǒng)一管理,具有以下優(yōu)點(diǎn):
降低系統(tǒng)資源消耗。通過(guò)復(fù)用已存在的線程,降低線程創(chuàng)建和銷毀造成的消耗;
提高響應(yīng)速度。當(dāng)有任務(wù)到達(dá)時(shí),無(wú)需等待新線程的創(chuàng)建便能立即執(zhí)行;
提高線程的可管理性。線程是稀缺資源,如果無(wú)限制的創(chuàng)建,不僅會(huì)消耗大量系統(tǒng)資源,還會(huì)降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行對(duì)線程進(jìn)行統(tǒng)一的分配、調(diào)優(yōu)和監(jiān)控。
ThreadPoolExecutor是線程池框架的一個(gè)核心類,本文通過(guò)對(duì)ThreadPoolExecutor源碼的分析(基于JDK 1.8),來(lái)深入分析線程池的實(shí)現(xiàn)原理。
二、ThreadPoolExecutor類的屬性
先從ThreadPoolExecutor類中的字段開(kāi)始:
附上我歷時(shí)三個(gè)月總結(jié)的?Java 面試 + Java 后端技術(shù)學(xué)習(xí)指南,筆者這幾年及春招的總結(jié),github 1.4k star,拿去不謝!
下載方式
1.?首先掃描下方二維碼
2.?后臺(tái)回復(fù)「Java面試」即可獲取
// 線程池的控制狀態(tài),用高3位來(lái)表示線程池的運(yùn)行狀態(tài),低29位來(lái)表示線程池中工作線程的數(shù)量private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//值為29,用來(lái)表示偏移量private static final int COUNT_BITS = Integer.SIZE - 3;//線程池的最大容量,其值的二進(jìn)制為:00011111111111111111111111111111(29個(gè)1)private static final int CAPACITY = (1 << COUNT_BITS) - 1;// 線程池的運(yùn)行狀態(tài),總共有5個(gè)狀態(tài),用高3位來(lái)表示private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;//任務(wù)緩存隊(duì)列,用來(lái)存放等待執(zhí)行的任務(wù)private final BlockingQueue<Runnable> workQueue;//全局鎖,對(duì)線程池狀態(tài)等屬性修改時(shí)需要使用這個(gè)鎖private final ReentrantLock mainLock = new ReentrantLock();//線程池中工作線程的集合,訪問(wèn)和修改需要持有全局鎖private final HashSet<Worker> workers = new HashSet<Worker>();// 終止條件private final Condition termination = mainLock.newCondition();//線程池中曾經(jīng)出現(xiàn)過(guò)的最大線程數(shù)private int largestPoolSize;//已完成任務(wù)的數(shù)量private long completedTaskCount;//線程工廠private volatile ThreadFactory threadFactory;//任務(wù)拒絕策略private volatile RejectedExecutionHandler handler;//線程存活時(shí)間private volatile long keepAliveTime;//是否允許核心線程超時(shí)private volatile boolean allowCoreThreadTimeOut;//核心池大小,若allowCoreThreadTimeOut被設(shè)置,核心線程全部空閑超時(shí)被回收的情況下會(huì)為0private volatile int corePoolSize;//最大池大小,不得超過(guò)CAPACITYprivate volatile int maximumPoolSize;//默認(rèn)的任務(wù)拒絕策略private static final RejectedExecutionHandler defaultHandler =new AbortPolicy();private static final RuntimePermission shutdownPerm =new RuntimePermission("modifyThread");private final AccessControlContext acc;在ThreadPoolExecutor類的這些屬性中,線程池狀態(tài)是控制線程池生命周期至關(guān)重要的屬性,這里就以線程池狀態(tài)為出發(fā)點(diǎn)進(jìn)行研究。
通過(guò)上面的源碼可知,線程池的運(yùn)行狀態(tài)總共有5種,其值和含義分別如下:
RUNNING: ?高3位為111,接受新任務(wù)并處理阻塞隊(duì)列中的任務(wù)
SHUTDOWN: 高3位為000,不接受新任務(wù)但會(huì)處理阻塞隊(duì)列中的任務(wù)
STOP:高3位為001,不會(huì)接受新任務(wù),也不會(huì)處理阻塞隊(duì)列中的任務(wù),并且中斷正在運(yùn)行的任務(wù)
TIDYING: ?高3位為010,所有任務(wù)都已終止,工作線程數(shù)量為0,線程池將轉(zhuǎn)化到TIDYING狀態(tài),即將要執(zhí)行terminated()鉤子方法
TERMINATED: 高3位為011,terminated()方法已經(jīng)執(zhí)行結(jié)束
然而,線程池中并沒(méi)有使用單獨(dú)的變量來(lái)表示線程池的運(yùn)行狀態(tài),而是使用一個(gè)AtomicInteger類型的變量ctl來(lái)表示線程池的控制狀態(tài),其將線程池運(yùn)行狀態(tài)與工作線程的數(shù)量打包在一個(gè)整型中,用高3位來(lái)表示線程池的運(yùn)行狀態(tài),低29位來(lái)表示線程池中工作線程的數(shù)量,對(duì)ctl的操作主要參考以下幾個(gè)函數(shù):
// 通過(guò)與的方式,獲取ctl的高3位,也就是線程池的運(yùn)行狀態(tài)private static int runStateOf(int c) { return c & ~CAPACITY; }//通過(guò)與的方式,獲取ctl的低29位,也就是線程池中工作線程的數(shù)量private static int workerCountOf(int c) { return c & CAPACITY; }//通過(guò)或的方式,將線程池狀態(tài)和線程池中工作線程的數(shù)量打包成ctlprivate static int ctlOf(int rs, int wc) { return rs | wc; }//SHUTDOWN狀態(tài)的值是0,比它大的均是線程池停止或清理狀態(tài),比它小的是運(yùn)行狀態(tài)private static boolean isRunning(int c) {return c < SHUTDOWN;}接下來(lái),我們看一下線程池狀態(tài)的所有轉(zhuǎn)換情況,如下:
RUNNING -> SHUTDOWN:調(diào)用shutdown(),可能在finalize()中隱式調(diào)用
(RUNNING or SHUTDOWN) -> STOP:調(diào)用shutdownNow()
SHUTDOWN -> TIDYING:當(dāng)緩存隊(duì)列和線程池都為空時(shí)
STOP -> TIDYING:當(dāng)線程池為空時(shí)
TIDYING -> TERMINATED:當(dāng)terminated()方法執(zhí)行結(jié)束時(shí)
通常情況下,線程池有如下兩種狀態(tài)轉(zhuǎn)換流程:
RUNNING -> SHUTDOWN -> TIDYING -> TERMINATED
RUNNING -> STOP -> TIDYING -> TERMINATED
三、ThreadPoolExecutor類的構(gòu)造方法
通常情況下,我們使用線程池的方式就是new一個(gè)ThreadPoolExecutor對(duì)象來(lái)生成一個(gè)線程池。接下來(lái),先看ThreadPoolExecutor類的構(gòu)造函數(shù):
//間接調(diào)用最后一個(gè)構(gòu)造函數(shù),采用默認(rèn)的任務(wù)拒絕策略AbortPolicy和默認(rèn)的線程工廠public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue);//間接調(diào)用最后一個(gè)構(gòu)造函數(shù),采用默認(rèn)的任務(wù)拒絕策略AbortPolicypublic ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);//間接調(diào)用最后一個(gè)構(gòu)造函數(shù),采用默認(rèn)的默認(rèn)的線程工廠public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);//前面三個(gè)分別調(diào)用了最后一個(gè),主要的構(gòu)造函數(shù)public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);接下來(lái),看下最后一個(gè)構(gòu)造函數(shù)的具體實(shí)現(xiàn):
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {//參數(shù)合法性校驗(yàn)if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();//參數(shù)合法性校驗(yàn)if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();//初始化對(duì)應(yīng)的屬性this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}下面解釋下一下構(gòu)造器中各個(gè)參數(shù)的含義:
1. corePoolSize
線程池中的核心線程數(shù)。當(dāng)提交一個(gè)任務(wù)時(shí),線程池創(chuàng)建一個(gè)新線程執(zhí)行任務(wù),直到當(dāng)前線程數(shù)等于corePoolSize;如果當(dāng)前線程數(shù)為corePoolSize,繼續(xù)提交的任務(wù)被保存到阻塞隊(duì)列中,等待被執(zhí)行。
2. maximumPoolSize
線程池中允許的最大線程數(shù)。如果當(dāng)前阻塞隊(duì)列滿了,且繼續(xù)提交任務(wù),則創(chuàng)建新的線程執(zhí)行任務(wù),前提是當(dāng)前線程數(shù)小于maximumPoolSize。
3. keepAliveTime
線程空閑時(shí)的存活時(shí)間。默認(rèn)情況下,只有當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí),keepAliveTime才會(huì)起作用,如果一個(gè)線程空閑的時(shí)間達(dá)到keepAliveTime,則會(huì)終止,直到線程池中的線程數(shù)不超過(guò)corePoolSize。但是如果調(diào)用了allowCoreThreadTimeOut(boolean)方法,keepAliveTime參數(shù)也會(huì)起作用,直到線程池中的線程數(shù)為0。
4. unit
keepAliveTime參數(shù)的時(shí)間單位。
5. workQueue
任務(wù)緩存隊(duì)列,用來(lái)存放等待執(zhí)行的任務(wù)。如果當(dāng)前線程數(shù)為corePoolSize,繼續(xù)提交的任務(wù)就會(huì)被保存到任務(wù)緩存隊(duì)列中,等待被執(zhí)行。
一般來(lái)說(shuō),這里的BlockingQueue有以下三種選擇:
SynchronousQueue:
一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列,每個(gè)插入操作必須等到另一個(gè)線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài)。
因此,如果線程池中始終沒(méi)有空閑線程(任務(wù)提交的平均速度快于被處理的速度),可能出現(xiàn)無(wú)限制的線程增長(zhǎng)。
LinkedBlockingQueue:
基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,如果不設(shè)置初始化容量,其容量為Integer.MAX_VALUE,即為無(wú)界隊(duì)列。
因此,如果線程池中線程數(shù)達(dá)到了corePoolSize,且始終沒(méi)有空閑線程(任務(wù)提交的平均速度快于被處理的速度),任務(wù)緩存隊(duì)列可能出現(xiàn)無(wú)限制的增長(zhǎng)。
ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列,按FIFO排序任務(wù)。
6. threadFactory
線程工廠,創(chuàng)建新線程時(shí)使用的線程工廠。
7. handler
任務(wù)拒絕策略,當(dāng)阻塞隊(duì)列滿了,且線程池中的線程數(shù)達(dá)到maximumPoolSize,如果繼續(xù)提交任務(wù),就會(huì)采取任務(wù)拒絕策略處理該任務(wù),線程池提供了4種任務(wù)拒絕策略:
AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常,默認(rèn)策略;
CallerRunsPolicy:由調(diào)用execute方法的線程執(zhí)行該任務(wù);
DiscardPolicy:丟棄任務(wù),但是不拋出異常;
DiscardOldestPolicy:丟棄阻塞隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過(guò)程)。
當(dāng)然也可以根據(jù)應(yīng)用場(chǎng)景實(shí)現(xiàn)RejectedExecutionHandler接口,自定義飽和策略,如記錄日志或持久化存儲(chǔ)不能處理的任務(wù)。
四、線程池的實(shí)現(xiàn)原理
1. 提交任務(wù)
線程池框架提供了兩種方式提交任務(wù),submit()和execute(),通過(guò)submit()方法提交的任務(wù)可以返回任務(wù)執(zhí)行的結(jié)果,通過(guò)execute()方法提交的任務(wù)不能獲取任務(wù)執(zhí)行的結(jié)果。
submit()方法的實(shí)現(xiàn)有以下三種:
public Future<?> submit(Runnable task);public <T> Future<T> submit(Runnable task, T result);public <T> Future<T> submit(Callable<T> task);下面以第一個(gè)方法為例簡(jiǎn)單看一下submit()方法的實(shí)現(xiàn):
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}submit()方法是在ThreadPoolExecutor的父類AbstractExecutorService類實(shí)現(xiàn)的,最終還是調(diào)用的ThreadPoolExecutor類的execute()方法,下面著重看一下execute()方法的實(shí)現(xiàn)。
public void execute(Runnable command) {if (command == null)throw new NullPointerException();//獲取線程池控制狀態(tài)int c = ctl.get();// (1)//worker數(shù)量小于corePoolSizeif (workerCountOf(c) < corePoolSize) {//創(chuàng)建worker,addWorker方法boolean參數(shù)用來(lái)判斷是否創(chuàng)建核心線程if (addWorker(command, true))//成功則返回return;//失敗則再次獲取線程池控制狀態(tài)c = ctl.get();}//(2)//線程池處于RUNNING狀態(tài),將任務(wù)加入workQueue任務(wù)緩存隊(duì)列if (isRunning(c) && workQueue.offer(command)) {// 再次檢查,獲取線程池控制狀態(tài),防止在任務(wù)入隊(duì)的過(guò)程中線程池關(guān)閉了或者線程池中沒(méi)有線程了int recheck = ctl.get();//線程池不處于RUNNING狀態(tài),且將任務(wù)從workQueue移除成功if (! isRunning(recheck) && remove(command))//采取任務(wù)拒絕策略reject(command);//worker數(shù)量等于0else if (workerCountOf(recheck) == 0)//創(chuàng)建workeraddWorker(null, false);}//(3)else if (!addWorker(command, false)) //創(chuàng)建workerreject(command); //如果創(chuàng)建worker失敗,采取任務(wù)拒絕策略}execute()方法的執(zhí)行流程可以總結(jié)如下:
若線程池工作線程數(shù)量小于corePoolSize,則創(chuàng)建新線程來(lái)執(zhí)行任務(wù)
若工作線程數(shù)量大于或等于corePoolSize,則將任務(wù)加入BlockingQueue
若無(wú)法將任務(wù)加入BlockingQueue(BlockingQueue已滿),且工作線程數(shù)量小于maximumPoolSize,則創(chuàng)建新的線程來(lái)執(zhí)行任務(wù)
若工作線程數(shù)量達(dá)到maximumPoolSize,則創(chuàng)建線程失敗,采取任務(wù)拒絕策略
可以結(jié)合下面的兩張圖來(lái)理解線程池提交任務(wù)的執(zhí)行流程。
2. 創(chuàng)建線程
從execute()方法的實(shí)現(xiàn)可以看出,addWorker()方法主要負(fù)責(zé)創(chuàng)建新的線程并執(zhí)行任務(wù),代碼實(shí)現(xiàn)如下:? ?
//addWorker有兩個(gè)參數(shù):Runnable類型的firstTask,用于指定新增的線程執(zhí)行的第一個(gè)任務(wù);boolean類型的core,表示是否創(chuàng)建核心線程 //該方法的返回值代表是否成功新增一個(gè)線程private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// (1)if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);//線程數(shù)超標(biāo),不能再創(chuàng)建線程,直接返回if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//CAS操作遞增workCount//如果成功,那么創(chuàng)建線程前的所有條件校驗(yàn)都滿足了,準(zhǔn)備創(chuàng)建線程執(zhí)行任務(wù),退出retry循環(huán)//如果失敗,說(shuō)明有其他線程也在嘗試往線程池中創(chuàng)建線程(往線程池提交任務(wù)可以是并發(fā)的),則繼續(xù)往下執(zhí)行if (compareAndIncrementWorkerCount(c))break retry;//重新獲取線程池控制狀態(tài)c = ctl.get();// 如果線程池的狀態(tài)發(fā)生了變更,如有其他線程關(guān)閉了這個(gè)線程池,那么需要回到外層的for循環(huán)if (runStateOf(c) != rs)continue retry;//如果只是CAS操作失敗的話,進(jìn)入內(nèi)層的for循環(huán)就可以了}}//到這里,創(chuàng)建線程前的所有條件校驗(yàn)都滿足了,可以開(kāi)始創(chuàng)建線程來(lái)執(zhí)行任務(wù)//worker是否已經(jīng)啟動(dòng)boolean workerStarted = false;//是否已將這個(gè)worker添加到workers這個(gè)HashSet中boolean workerAdded = false;Worker w = null;try {//創(chuàng)建一個(gè)worker,從這里可以看出對(duì)線程的包裝w = new Worker(firstTask);//取出worker中的線程對(duì)象,Worker的構(gòu)造方法會(huì)調(diào)用ThreadFactory來(lái)創(chuàng)建一個(gè)新的線程final Thread t = w.thread;if (t != null) {//獲取全局鎖, 并發(fā)的訪問(wèn)線程池workers對(duì)象必須加鎖,持有鎖的期間線程池也不會(huì)被關(guān)閉final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//重新獲取線程池的運(yùn)行狀態(tài)int rs = runStateOf(ctl.get());//小于SHUTTDOWN即RUNNING//等于SHUTDOWN并且firstTask為null,不接受新的任務(wù),但是會(huì)繼續(xù)執(zhí)行等待隊(duì)列中的任務(wù)if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {//worker里面的thread不能是已啟動(dòng)的if (t.isAlive())throw new IllegalThreadStateException();//將新創(chuàng)建的線程加入到線程池中workers.add(w);int s = workers.size();// 更新largestPoolSizeif (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}//線程添加線程池成功,則啟動(dòng)新創(chuàng)建的線程if (workerAdded) {t.start();workerStarted = true;}}} finally {//若線程啟動(dòng)失敗,做一些清理工作,例如從workers中移除新添加的worker并遞減wokerCountif (! workerStarted)addWorkerFailed(w);}//返回線程是否啟動(dòng)成功return workerStarted;}因?yàn)榇a(1)處的邏輯不利于理解,我們通過(guò)(1)的等價(jià)實(shí)現(xiàn)來(lái)理解:? ?
if (rs>=SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) //等價(jià)實(shí)現(xiàn) rs>=SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())其含義為,滿足下列條件之一則直接返回false,線程創(chuàng)建失敗:
rs > SHUTDOWN,也就是STOP,TIDYING或TERMINATED,此時(shí)不再接受新的任務(wù),且中斷正在執(zhí)行的任務(wù)
rs = SHUTDOWN且firstTask != null,此時(shí)不再接受任務(wù),但是仍會(huì)處理任務(wù)緩存隊(duì)列中的任務(wù)
rs = SHUTDOWN,隊(duì)列為空
多說(shuō)一句,若線程池處于 SHUTDOWN, firstTask 為 null,且 workQueue 非空,那么還得創(chuàng)建線程繼續(xù)處理任務(wù)緩存隊(duì)列中的任務(wù)。
總結(jié)一下,addWorker()方法完成了如下幾件任務(wù):
1. 原子性的增加workerCount
2. 將用戶給定的任務(wù)封裝成為一個(gè)worker,并將此worker添加進(jìn)workers集合中
3. 啟動(dòng)worker對(duì)應(yīng)的線程
4. 若線程啟動(dòng)失敗,回滾worker的創(chuàng)建動(dòng)作,即從workers中移除新添加的worker,并原子性的減少workerCount
3. 工作線程的實(shí)現(xiàn)
從addWorker()方法的實(shí)現(xiàn)可以看出,工作線程的創(chuàng)建和啟動(dòng)都跟ThreadPoolExecutor中的內(nèi)部類Worker有關(guān)。下面我們分析Worker類來(lái)看一下工作線程的實(shí)現(xiàn)。
Worker類繼承自AQS類,具有鎖的功能;實(shí)現(xiàn)了Runable接口,可以將自身作為一個(gè)任務(wù)在線程中執(zhí)行。
private final class Workerextends AbstractQueuedSynchronizerimplements RunnableWorker的主要字段就下面三個(gè),代碼也比較簡(jiǎn)單。
//用來(lái)封裝worker的線程,線程池中真正運(yùn)行的線程,通過(guò)線程工廠創(chuàng)建而來(lái)final Thread thread;//worker所對(duì)應(yīng)的第一個(gè)任務(wù),可能為空Runnable firstTask;//記錄當(dāng)前線程完成的任務(wù)數(shù)volatile long completedTasks;Worker的構(gòu)造函數(shù)如下。
Worker(Runnable firstTask) {//設(shè)置AQS的state為-1,在執(zhí)行runWorker()方法之前阻止線程中斷setState(-1);//初始化第一個(gè)任務(wù)this.firstTask = firstTask;//利用指定的線程工廠創(chuàng)建一個(gè)線程,注意,參數(shù)是Worker實(shí)例本身this//也就是當(dāng)執(zhí)行start方法啟動(dòng)線程thread時(shí),真正執(zhí)行的是Worker類的run方法this.thread = getThreadFactory().newThread(this);}Worker類繼承了AQS類,重寫(xiě)了其相應(yīng)的方法,實(shí)現(xiàn)了一個(gè)自定義的同步器,實(shí)現(xiàn)了不可重入鎖。
//是否持有獨(dú)占鎖protected boolean isHeldExclusively() {return getState() != 0;}//嘗試獲取鎖protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {//設(shè)置獨(dú)占線程setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}//嘗試釋放鎖protected boolean tryRelease(int unused) {//設(shè)置獨(dú)占線程為nullsetExclusiveOwnerThread(null);setState(0);return true;}//獲取鎖public void lock() { acquire(1); }//嘗試獲取鎖public boolean tryLock() { return tryAcquire(1); }//釋放鎖public void unlock() { release(1); }//是否持有鎖public boolean isLocked() { return isHeldExclusively(); }Worker類還提供了一個(gè)中斷線程thread的方法。
void interruptIfStarted() {Thread t;//AQS狀態(tài)大于等于0,worker對(duì)應(yīng)的線程不為null,且該線程沒(méi)有被中斷if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}再來(lái)看一下Worker類的run()方法的實(shí)現(xiàn),會(huì)發(fā)現(xiàn)run()方法最終調(diào)用了ThreadPoolExecutor類的runWorker()方法。
public void run() {runWorker(this);}4. 線程復(fù)用機(jī)制
通過(guò)上文可以知道,worker中的線程start 后,執(zhí)行的是worker的run()方法,而run()方法最終會(huì)調(diào)用ThreadPoolExecutor類的runWorker()方法,runWorker()方法實(shí)現(xiàn)了線程池中的線程復(fù)用機(jī)制。下面我們來(lái)看一下runWorker()方法的實(shí)現(xiàn)。
final void runWorker(Worker w) {//獲取當(dāng)前線程Thread wt = Thread.currentThread();//獲取w的firstTaskRunnable task = w.firstTask;//設(shè)置w的firstTask為nullw.firstTask = null;// 釋放鎖,設(shè)置AQS的state為0,允許中斷w.unlock();//用于標(biāo)識(shí)線程是否異常終止,finally中processWorkerExit()方法會(huì)有不同邏輯boolean completedAbruptly = true;try {//循環(huán)調(diào)用getTask()獲取任務(wù),不斷從任務(wù)緩存隊(duì)列獲取任務(wù)并執(zhí)行while (task != null || (task = getTask()) != null) {//進(jìn)入循環(huán)內(nèi)部,代表已經(jīng)獲取到可執(zhí)行的任務(wù),則對(duì)worker對(duì)象加鎖,保證線程在執(zhí)行任務(wù)過(guò)程中不會(huì)被中斷w.lock();if ((runStateAtLeast(ctl.get(), STOP) || //若線程池狀態(tài)大于等于STOP,那么意味著該線程要中斷(Thread.interrupted() && //線程被中斷runStateAtLeast(ctl.get(), STOP))) && //且是因?yàn)榫€程池內(nèi)部狀態(tài)變化而被中斷!wt.isInterrupted()) //確保該線程未被中斷//發(fā)出中斷請(qǐng)求wt.interrupt();try {//開(kāi)始執(zhí)行任務(wù)前的Hook方法beforeExecute(wt, task);Throwable thrown = null;try {//到這里正式開(kāi)始執(zhí)行任務(wù)task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {//執(zhí)行任務(wù)后的Hook方法afterExecute(task, thrown);}} finally {//置空task,準(zhǔn)備通過(guò)getTask()獲取下一個(gè)任務(wù)task = null;//completedTasks遞增w.completedTasks++;//釋放掉worker持有的獨(dú)占鎖w.unlock();}}completedAbruptly = false;} finally {//到這里,線程執(zhí)行結(jié)束,需要執(zhí)行結(jié)束線程的一些清理工作//線程執(zhí)行結(jié)束可能有兩種情況://1.getTask()返回null,也就是說(shuō),這個(gè)worker的使命結(jié)束了,線程執(zhí)行結(jié)束//2.任務(wù)執(zhí)行過(guò)程中發(fā)生了異常//第一種情況,getTask()返回null,那么getTask()中會(huì)將workerCount遞減//第二種情況,workerCount沒(méi)有進(jìn)行處理,這個(gè)遞減操作會(huì)在processWorkerExit()中處理processWorkerExit(w, completedAbruptly);}}runWorker()方法是線程池的核心,實(shí)現(xiàn)了線程池中的線程復(fù)用機(jī)制,來(lái)看一下
runWorker()方法都做了哪些工作:
1. 運(yùn)行第一個(gè)任務(wù)firstTask之后,循環(huán)調(diào)用getTask()方法獲取任務(wù),不斷從任務(wù)緩存隊(duì)列獲取任務(wù)并執(zhí)行;
2. 獲取到任務(wù)之后就對(duì)worker對(duì)象加鎖,保證線程在執(zhí)行任務(wù)的過(guò)程中不會(huì)被中斷,任務(wù)執(zhí)行完會(huì)釋放鎖;
3. 在執(zhí)行任務(wù)的前后,可以根據(jù)業(yè)務(wù)場(chǎng)景重寫(xiě)beforeExecute()和afterExecute()等Hook方法;
4. 執(zhí)行通過(guò)getTask()方法獲取到的任務(wù)
5. 線程執(zhí)行結(jié)束后,調(diào)用processWorkerExit()方法執(zhí)行結(jié)束線程的一些清理工作
從runWorker()方法的實(shí)現(xiàn)可以看出,runWorker()方法中主要調(diào)用了getTask()方法和processWorkerExit()方法,下面分別看一下這兩個(gè)方法的實(shí)現(xiàn)。
getTask()的實(shí)現(xiàn)
getTask()方法用來(lái)不斷地從任務(wù)緩存隊(duì)列獲取任務(wù)并交給線程執(zhí)行,下面分析一下其實(shí)現(xiàn)。?
private Runnable getTask() {//標(biāo)識(shí)當(dāng)前線程是否超時(shí)未能獲取到task對(duì)象boolean timedOut = false;for (;;) {//獲取線程池的控制狀態(tài)int c = ctl.get();//獲取線程池的運(yùn)行狀態(tài)int rs = runStateOf(c);//如果線程池狀態(tài)大于等于STOP,或者處于SHUTDOWN狀態(tài),并且阻塞隊(duì)列為空,線程池工作線程數(shù)量遞減,方法返回null,回收線程if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}//獲取worker數(shù)量int wc = workerCountOf(c);//標(biāo)識(shí)當(dāng)前線程在空閑時(shí),是否應(yīng)該超時(shí)回收// 如果allowCoreThreadTimeOut為ture,或當(dāng)前線程數(shù)大于核心池大小,則需要超時(shí)回收boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//如果worker數(shù)量大于maximumPoolSize(有可能調(diào)用了 setMaximumPoolSize(),導(dǎo)致worker數(shù)量大于maximumPoolSize)if ((wc > maximumPoolSize || (timed && timedOut)) //或者獲取任務(wù)超時(shí)&& (wc > 1 || workQueue.isEmpty())) { //workerCount大于1或者阻塞隊(duì)列為空(在阻塞隊(duì)列不為空時(shí),需要保證至少有一個(gè)工作線程)if (compareAndDecrementWorkerCount(c))//線程池工作線程數(shù)量遞減,方法返回null,回收線程return null;//線程池工作線程數(shù)量遞減失敗,跳過(guò)剩余部分,繼續(xù)循環(huán)continue;}try {//如果允許超時(shí)回收,則調(diào)用阻塞隊(duì)列的poll(),只在keepAliveTime時(shí)間內(nèi)等待獲取任務(wù),一旦超過(guò)則返回null//否則調(diào)用take(),如果隊(duì)列為空,線程進(jìn)入阻塞狀態(tài),無(wú)限時(shí)等待任務(wù),直到隊(duì)列中有可取任務(wù)或者響應(yīng)中斷信號(hào)退出Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();//若task不為null,則返回成功獲取的task對(duì)象if (r != null)return r;// 若返回task為null,表示線程空閑時(shí)間超時(shí),則設(shè)置timeOut為truetimedOut = true;} catch (InterruptedException retry) {//如果此worker發(fā)生了中斷,采取的方案是重試,沒(méi)有超時(shí)//在哪些情況下會(huì)發(fā)生中斷?調(diào)用setMaximumPoolSize(),shutDown(),shutDownNow()timedOut = false;}}}接下來(lái)總結(jié)一下getTask()方法會(huì)在哪些情況下返回:
1. 線程池處于RUNNING狀態(tài),阻塞隊(duì)列不為空,返回成功獲取的task對(duì)象
2. 線程池處于SHUTDOWN狀態(tài),阻塞隊(duì)列不為空,返回成功獲取的task對(duì)象
3. 線程池狀態(tài)大于等于STOP,返回null,回收線程
4. 線程池處于SHUTDOWN狀態(tài),并且阻塞隊(duì)列為空,返回null,回收線程
5. worker數(shù)量大于maximumPoolSize,返回null,回收線程
6. 線程空閑時(shí)間超時(shí),返回null,回收線程
processWorkerExit()的實(shí)現(xiàn)
processWorkerExit()方法負(fù)責(zé)執(zhí)行結(jié)束線程的一些清理工作,下面分析一下其實(shí)現(xiàn)。
private void processWorkerExit(Worker w, boolean completedAbruptly) {//如果用戶任務(wù)執(zhí)行過(guò)程中發(fā)生了異常,則需要遞減workerCountif (completedAbruptly)decrementWorkerCount();final ReentrantLock mainLock = this.mainLock;//獲取全局鎖mainLock.lock();try {//將worker完成任務(wù)的數(shù)量累加到總的完成任務(wù)數(shù)中completedTaskCount += w.completedTasks;//從workers集合中移除該workerworkers.remove(w);} finally {//釋放鎖mainLock.unlock();}//嘗試終止線程池tryTerminate();//獲取線程池控制狀態(tài)int c = ctl.get();if (runStateLessThan(c, STOP)) { //線程池運(yùn)行狀態(tài)小于STOPif (!completedAbruptly) { //如果用戶任務(wù)執(zhí)行過(guò)程中發(fā)生了異常,則直接調(diào)用addWorker()方法創(chuàng)建線程//是否允許核心線程超時(shí)int min = allowCoreThreadTimeOut ? 0 : corePoolSize;//允許核心超時(shí)并且workQueue阻塞隊(duì)列不為空,那線程池中至少有一個(gè)工作線程if (min == 0 && ! workQueue.isEmpty())min = 1;//如果工作線程數(shù)量workerCount大于等于核心池大小corePoolSize,//或者允許核心超時(shí)并且workQueue阻塞隊(duì)列不為空時(shí),線程池中至少有一個(gè)工作線程,直接返回if (workerCountOf(c) >= min)return;//若不滿足上述條件,則調(diào)用addWorker()方法創(chuàng)建線程}//創(chuàng)建新的線程取代當(dāng)前線程addWorker(null, false);}}processWorkerExit()方法中主要調(diào)用了tryTerminate()方法,下面看一下tryTerminate()方法的實(shí)現(xiàn)。
final void tryTerminate() {for (;;) {//獲取線程池控制狀態(tài)int c = ctl.get();if (isRunning(c) || //線程池的運(yùn)行狀態(tài)為RUNNINGrunStateAtLeast(c, TIDYING) || //線程池的運(yùn)行狀態(tài)大于等于TIDYING(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) //線程池的運(yùn)行狀態(tài)為SHUTDOWN且阻塞隊(duì)列不為空//不能終止,直接返回return;//只有當(dāng)線程池的運(yùn)行狀態(tài)為STOP,或線程池運(yùn)行狀態(tài)為SHUTDOWN且阻塞隊(duì)列為空時(shí),可以執(zhí)行到這里//如果線程池工作線程的數(shù)量不為0if (workerCountOf(c) != 0) {//僅僅中斷一個(gè)空閑的workerinterruptIdleWorkers(ONLY_ONE);return;}//只有當(dāng)線程池工作線程的數(shù)量為0時(shí)可以執(zhí)行到這里final ReentrantLock mainLock = this.mainLock;//獲取全局鎖mainLock.lock();try {if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { //CAS操作設(shè)置線程池運(yùn)行狀態(tài)為T(mén)IDYING,工作線程數(shù)量為0try {//執(zhí)行terminated()鉤子方法terminated();} finally {//設(shè)置線程池運(yùn)行狀態(tài)為T(mén)ERMINATED,工作線程數(shù)量為0ctl.set(ctlOf(TERMINATED, 0));//喚醒在termination條件上等待的所有線程termination.signalAll();}return;}} finally {//釋放鎖mainLock.unlock();}//若CAS操作失敗則重試}}tryTerminate()方法的作用是嘗試終止線程池,它會(huì)在所有可能終止線程池的地方被調(diào)用,滿足終止線程池的條件有兩個(gè):首先,線程池狀態(tài)為STOP,或者為SHUTDOWN且任務(wù)緩存隊(duì)列為空;其次,工作線程數(shù)量為0。
滿足了上述兩個(gè)條件之后,tryTerminate()方法獲取全局鎖,設(shè)置線程池運(yùn)行狀態(tài)為T(mén)IDYING,之后執(zhí)行terminated()鉤子方法,最后設(shè)置線程池狀態(tài)為T(mén)ERMINATED。
至此,線程池運(yùn)行狀態(tài)變?yōu)門(mén)ERMINATED,工作線程數(shù)量為0,workers已清空,且workQueue也已清空,所有線程都執(zhí)行結(jié)束,線程池的生命周期到此結(jié)束。
5. 關(guān)閉線程池
關(guān)閉線程池有兩個(gè)方法,shutdown()和shutdownNow(),下面分別看一下這兩個(gè)方法的實(shí)現(xiàn)。
shutdown()的實(shí)現(xiàn)
shutdown()方法將線程池運(yùn)行狀態(tài)設(shè)置為SHUTDOWN,此時(shí)線程池不會(huì)接受新的任務(wù),但會(huì)處理阻塞隊(duì)列中的任務(wù)。
public void shutdown() {final ReentrantLock mainLock = this.mainLock;//獲取全局鎖mainLock.lock();try {//檢查shutdown權(quán)限checkShutdownAccess();//設(shè)置線程池運(yùn)行狀態(tài)為SHUTDOWNadvanceRunState(SHUTDOWN);//中斷所有空閑workerinterruptIdleWorkers();//用onShutdown()鉤子方法onShutdown();} finally {//釋放鎖mainLock.unlock();}//嘗試終止線程池tryTerminate();}shutdown()方法首先會(huì)檢查是否具有shutdown的權(quán)限,然后設(shè)置線程池的運(yùn)行狀態(tài)為SHUTDOWN,之后中斷所有空閑的worker,再調(diào)用onShutdown()鉤子方法,最后嘗試終止線程池。
shutdown()方法調(diào)用了interruptIdleWorkers()方法中斷所有空閑的worker,其實(shí)現(xiàn)如下。
private void interruptIdleWorkers() {interruptIdleWorkers(false);}//onlyOne標(biāo)識(shí)是否只中斷一個(gè)線程private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;//獲取全局鎖mainLock.lock();try {//遍歷workers集合for (Worker w : workers) {//worker對(duì)應(yīng)的線程Thread t = w.thread;//線程未被中斷且成功獲得鎖if (!t.isInterrupted() && w.tryLock()) {try {//發(fā)出中斷請(qǐng)求t.interrupt();} catch (SecurityException ignore) {} finally {//釋放鎖w.unlock();}}//若只中斷一個(gè)線程,則跳出循環(huán)if (onlyOne)break;}} finally {//釋放鎖mainLock.unlock();}}shutdownNow()的實(shí)現(xiàn)
shutdownNow()方法將線程池運(yùn)行狀態(tài)設(shè)置為STOP,此時(shí)線程池不會(huì)接受新任務(wù),也不會(huì)處理阻塞隊(duì)列中的任務(wù),并且中斷正在運(yùn)行的任務(wù)。
public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;//獲取全局鎖mainLock.lock();try {//檢查shutdown權(quán)限checkShutdownAccess();//設(shè)置線程池運(yùn)行狀態(tài)為STOPadvanceRunState(STOP);//中斷所有workerinterruptWorkers();//將任務(wù)緩存隊(duì)列中等待執(zhí)行的任務(wù)取出并放到list中tasks = drainQueue();} finally {//釋放鎖mainLock.unlock();}//嘗試終止線程池tryTerminate();//返回任務(wù)緩存隊(duì)列中等待執(zhí)行的任務(wù)列表return tasks;}shutdownNow()方法與shutdown()方法相似,不同之處在于,前者設(shè)置線程池的運(yùn)行狀態(tài)為STOP,之后中斷所有的worker(并非只是空閑的worker),嘗試終止線程池之后,返回任務(wù)緩存隊(duì)列中等待執(zhí)行的任務(wù)列表。
shutdownNow()方法調(diào)用了interruptWorkers()方法中斷所有的worker(并非只是空閑的worker),其實(shí)現(xiàn)如下。
private void interruptWorkers() {final ReentrantLock mainLock = this.mainLock;//獲取全局鎖mainLock.lock();try {//遍歷workers集合for (Worker w : workers)//調(diào)用Worker類的interruptIfStarted()方法中斷線程w.interruptIfStarted();} finally {//釋放鎖mainLock.unlock();}}五、總結(jié)
至此,我們已經(jīng)閱讀了線程池框架的核心類ThreadPoolExecutor類的大部分源碼,由衷地贊嘆這個(gè)類很多地方設(shè)計(jì)的巧妙之處:
將線程池的運(yùn)行狀態(tài)和工作線程數(shù)量打包在一起,并使用了大量的位運(yùn)算
使用CAS操作更新線程控制狀態(tài)ctl,確保對(duì)ctl的更新是原子操作
內(nèi)部類Worker類繼承了AQS,實(shí)現(xiàn)了一個(gè)自定義的同步器,實(shí)現(xiàn)了不可重入鎖
使用while循環(huán)自旋地從任務(wù)緩存隊(duì)列中獲取任務(wù)并執(zhí)行,實(shí)現(xiàn)了線程復(fù)用機(jī)制
調(diào)用interrupt()方法中斷線程,但注意該方法并不能直接中斷線程的運(yùn)行,只是發(fā)出了中斷信號(hào),配合BlockingQueue的take(),poll()方法的使用,打斷線程的阻塞狀態(tài)
其實(shí),線程池的本質(zhì)就是生產(chǎn)者消費(fèi)者模式,線程池的調(diào)用者不斷向線程池提交任務(wù),線程池里面的工作線程不斷獲取這些任務(wù)并執(zhí)行(從任務(wù)緩存隊(duì)列獲取任務(wù)或者直接執(zhí)行任務(wù))。
讀完本文,相信大家對(duì)線程池的實(shí)現(xiàn)原理有了深刻的認(rèn)識(shí),比如向線程池提交一個(gè)任務(wù)之后線程池的執(zhí)行流程,一個(gè)任務(wù)從被提交到被執(zhí)行會(huì)經(jīng)歷哪些過(guò)程,一個(gè)工作線程從被創(chuàng)建到正常執(zhí)行到執(zhí)行結(jié)束的執(zhí)行過(guò)程,等等。
最后,再附上我歷時(shí)三個(gè)月總結(jié)的?Java 面試 + Java 后端技術(shù)學(xué)習(xí)指南,筆者這幾年及春招的總結(jié),github 1.4k star,拿去不謝!
下載方式
1.?首先掃描下方二維碼
2.?后臺(tái)回復(fù)「Java面試」即可獲取
總結(jié)
以上是生活随笔為你收集整理的深入分析线程池的实现原理的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: synchronized 原理知多少
- 下一篇: 非常强悍的 RabbitMQ 总结,写得