c++ socket线程池原理_ThreadPoolExecutor线程池实现原理+源码解析
推薦學(xué)習(xí)
- 被微服務(wù)轟炸?莫怕!耗時(shí)35天整出的「微服務(wù)學(xué)習(xí)教程」送你
- 死磕「并發(fā)編程」100天,全靠阿里大牛的這份最全「高并發(fā)套餐」
- 閉關(guān)28天,奉上[Java一線大廠高崗面試題解析合集],備戰(zhàn)金九銀十
前言
或許每個(gè)Java工程師都被問過這樣一個(gè)問題
Java中開啟一個(gè)新的線程有幾種方法?
繼承Thread類和實(shí)現(xiàn)Runnable接口。但是除了寫Demo,幾乎沒人會(huì)在生產(chǎn)環(huán)境上這樣用。具體原因如下:
- 線程頻繁的被創(chuàng)建、銷毀,非常消耗資源
- 這兩種方式開啟的線程都不便于統(tǒng)一的調(diào)度和管理
- HotSpot虛擬機(jī)采用1:1的模型來實(shí)現(xiàn)Java線程的,也就是說一個(gè)Java線程直接通過一個(gè)操作系統(tǒng)線程來實(shí)現(xiàn),如果可以無限制的開啟線程,很容易導(dǎo)致操作系統(tǒng)資源耗盡。
線程池
繼承Thread和實(shí)現(xiàn)Runnable的諸多缺點(diǎn),所以生產(chǎn)環(huán)境必須使用線程池來實(shí)現(xiàn)多線程。
線程池(thread pool):一種線程使用模式。線程過多會(huì)帶來調(diào)度開銷,進(jìn)而影響緩存局部性和整體性能。而線程池維護(hù)著多個(gè)線程,等待著監(jiān)督管理者分配可并發(fā)執(zhí)行的任務(wù)。這避免了在處理短時(shí)間任務(wù)時(shí)創(chuàng)建與銷毀線程的代價(jià)。線程池不僅能夠保證內(nèi)核的充分利用,還能防止過分調(diào)度。可用線程數(shù)量應(yīng)該取決于可用的并發(fā)處理器、處理器內(nèi)核、內(nèi)存、網(wǎng)絡(luò)sockets等的數(shù)量。 ——維基百科
簡單來說,“池”在計(jì)算機(jī)領(lǐng)域是指集合,線程池就是指線程集合。線程池可以對(duì)一系列線程的生命周期進(jìn)行統(tǒng)一的調(diào)度和管理,包括線程的創(chuàng)建、消亡、生存時(shí)間、數(shù)量控制等。
Java中的線程池從JDK1.5開始,有一個(gè)標(biāo)準(zhǔn)的實(shí)現(xiàn)java.util.concurrent.ThreadPoolExecutor,對(duì)于這個(gè)類,首先看下它的體系結(jié)構(gòu)圖
- Executor:只定義了一個(gè)方法execute,用于執(zhí)行提交的任務(wù)
- ExecutorService:定義了一些線程池管理、任務(wù)提交、線程池檢測(cè)的方法
- AbstractExecutorService:提供了ExecutorService接口執(zhí)行方法的默認(rèn)實(shí)現(xiàn),用于統(tǒng)一處理Callable任務(wù)和Runnable任務(wù)
內(nèi)部結(jié)構(gòu)
這里主要關(guān)注類的定義和一些重要的常量、成員變量
public class ThreadPoolExecutor extends AbstractExecutorService {// 高3位表示線程池狀態(tài),低29位表示worker數(shù)量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 29 = 32 - 3 private static final int COUNT_BITS = Integer.SIZE - 3; // 線程池允許的最大線程數(shù)。為 2^29 - 1 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits // 線程池有5種狀態(tài),按大小排序如下:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl // 獲取線程池狀態(tài) private static int runStateOf(int c) { return c & ~CAPACITY; } // 獲取線程池worker數(shù)量 private static int workerCountOf(int c) { return c & CAPACITY; } // 根據(jù)線程池狀態(tài)和worker數(shù)量生成ctl值 private static int ctlOf(int rs, int wc) { return rs | wc; } // 緩沖隊(duì)列(阻塞隊(duì)列) private final BlockingQueue workQueue; // 互斥鎖 private final ReentrantLock mainLock = new ReentrantLock(); // 包含線程池工作的所以線程,僅在持有mainLock的時(shí)候能訪問 private final HashSet workers = new HashSet(); private final Condition termination = mainLock.newCondition(); // 跟蹤線程池最大的大小(實(shí)際的最大值),僅在持有mainLock的時(shí)候能訪問 private int largestPoolSize; // 記錄已經(jīng)完成的任務(wù)數(shù),僅在工作線程終止時(shí)更新,僅在持有mainLock的時(shí)候能訪問 private long completedTaskCount; // 線程工廠 private volatile ThreadFactory threadFactory; // 線程池飽和或者關(guān)閉時(shí)的執(zhí)行器 private volatile RejectedExecutionHandler handler; // 空閑線程等待工作的超時(shí)時(shí)間 private volatile long keepAliveTime; // 如果為false(默認(rèn)值),核心線程永遠(yuǎn)不回收 // 如果為true,核心線程也通過keepAliveTime參數(shù)超時(shí)回收 private volatile boolean allowCoreThreadTimeOut; // 核心線程數(shù) private volatile int corePoolSize; // 最大線程數(shù)(程序設(shè)置的最大線程數(shù),區(qū)別于largestPoolSize) private volatile int maximumPoolSize; // 默認(rèn)的拒絕策略處理器,拋出RejectedExecutionException異常 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();}涉及到的成員變量、常量比較多,也不太容易理解,不過看完整篇后再來回顧這里,就很容易理解了。
生命周期
ThreadPoolExecutor類提供了線程池的五個(gè)狀態(tài)描述
// runState is stored in the high-order bitsprivate static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;這幾種狀態(tài)之間的轉(zhuǎn)換過程如下
- RUNNING:運(yùn)行狀態(tài),可以執(zhí)行任務(wù),也可以接受阻塞隊(duì)列里的任務(wù)調(diào)度
- SHUTDOWN:調(diào)用了shutdown()方法,該狀態(tài)可以繼續(xù)執(zhí)行阻塞隊(duì)列中的任務(wù),但是不會(huì)再接受新任務(wù)
- STOP:調(diào)用了shutdownNow()方法,該狀態(tài)會(huì)嘗試中斷正在執(zhí)行的所有任務(wù),不能繼續(xù)執(zhí)行阻塞隊(duì)列中的任務(wù),也不會(huì)再接受新任務(wù)
- TIDYING:所有任務(wù)都執(zhí)行完畢,至于阻塞隊(duì)列中的任務(wù)是否執(zhí)行完成,取決于調(diào)用了shutdown()還是shutdownNow()方法
- TERMINATED:terminated()方法執(zhí)行完成后進(jìn)入該狀態(tài),terminated()方法默認(rèn)沒有任何操作
構(gòu)造方法
ThreadPoolExecutor提供了四個(gè)構(gòu)造方法,忽略它提供的語法糖,我們直接看最吊的那個(gè)構(gòu)造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) // corePoolSize、maximumPoolSize、keepAliveTime都不能小于0 // 且maximumPoolSize必須大于等于corePoolSize throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) // workQueue、threadFactory、handler均不能為null throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler;}這個(gè)構(gòu)造方法有七個(gè)參數(shù),如果能明白各個(gè)參數(shù)的作用,那么線程池的工作原理也就基本清晰了。
- int corePoolSize:核心線程數(shù),當(dāng)有新的任務(wù)提交到線程池時(shí),會(huì)進(jìn)行如下判斷:線程池中線程數(shù)量小于corePoolSize時(shí),會(huì)創(chuàng)建新線程處理任務(wù),即使還有其他空閑的核心線程線程池中線程數(shù)量等于corePoolSize時(shí),任務(wù)會(huì)加入到workQueue緩存隊(duì)列,直到緩存隊(duì)列滿了,才會(huì)新建非核心線程去處理任務(wù)線程池中的線程數(shù)量等于maximumPoolSize且緩存隊(duì)列已滿時(shí),會(huì)根據(jù)RejectedExecutionHandler參數(shù)指定的拒絕策略來處理提交的任務(wù)如果corePoolSize和maximumPoolSize相等,則創(chuàng)建的線程池大小是固定的,緩存隊(duì)列滿了就執(zhí)行決絕策略
- int maximumPoolSize:最大線程數(shù)
- long keepAliveTime:非核心線程的最長空閑時(shí)間,超過了會(huì)被回收(allowCoreThreadTimeOut參數(shù)設(shè)置成true,也會(huì)回收核心線程)
- TimeUnit unit:keepAliveTime參數(shù)的單位
- BlockingQueue workQueue:阻塞隊(duì)列,用于緩存,保存正在等待執(zhí)行的任務(wù)。一般有以下幾種配置直接切換:常用的隊(duì)列是SynchronousQueue無界隊(duì)列:常用的隊(duì)列是LinkedBlockingQueue,隊(duì)列基于鏈表實(shí)現(xiàn),最大長度是Integer.MAX_VALUE,雖然是有界的,但是值太大,所以認(rèn)為是無界隊(duì)列。使用無界隊(duì)列可能會(huì)導(dǎo)致最大線程數(shù)maximumPoolSize失效,這點(diǎn)結(jié)合下文的線程池執(zhí)行過程會(huì)很容易理解有界隊(duì)列:常用的隊(duì)列是ArrayBlockingQueue,基于數(shù)組實(shí)現(xiàn),能把最大線程數(shù)控制為maximumPoolSize。也能避免阻塞隊(duì)列中堆積的任務(wù)過多。
- ThreadFactory threadFactory:線程Factory,用來創(chuàng)建線程。使用默認(rèn)的ThreadFactory創(chuàng)建的線程是具有相同優(yōu)先級(jí)的非守護(hù)線程。一般需要自定義ThreadFactory,因?yàn)橐?strong>給每個(gè)線程設(shè)置有意義的名稱。
- RejectedExecutionHandler handler: 當(dāng)線程數(shù)達(dá)到了最大線程數(shù),且沒有線程空閑,且緩沖隊(duì)列也滿了(也就是線程池飽和了),指定拒絕策略,ThreadPoolExecutor自身提供了四種拒絕策略:AbortPolicy:直接拋出java.util.concurrent.RejectedExecutionException異常CallerRunsPolicy:利用調(diào)用者所在的線程執(zhí)行任務(wù),哪個(gè)線程提交這個(gè)任務(wù),就由哪個(gè)線程執(zhí)行DiscardOldestPolicy:丟棄緩存隊(duì)列中頭部的任務(wù),重試提交的任務(wù)DiscardPolicy:直接丟棄顯然默認(rèn)的四種拒絕策略都不能很好的使用在生產(chǎn)環(huán)境,所以一般也需要自定義拒絕策略來處理飽和的任務(wù)。將暫時(shí)無法處理的任務(wù)存入中間件、數(shù)據(jù)庫以及日志記錄。
線程池中線程的數(shù)量并不是越多越好,因?yàn)榉?wù)器的性能總是有限的。線程數(shù)過多會(huì)增加線程切換的開銷,并且空閑線程的頻繁回收也需要消耗資源。線程池的七個(gè)參數(shù)相輔相成,相互影響,設(shè)置的時(shí)候需要根據(jù)實(shí)際情況酌情考慮。
看文字描述多少有些不清晰,如果能有張圖的話就再好不過了。你就說巧不巧吧,剛好我畫了一張圖。
對(duì)照這張圖和上面的描述,相信大家對(duì)ThreadPoolExecutor的七個(gè)參數(shù)有個(gè)深刻的認(rèn)識(shí)。也很容易理解為什么使用無界隊(duì)列LinkedBlockingQueue會(huì)使maximumPoolSize失效了,因?yàn)?strong>緩存隊(duì)列可能永遠(yuǎn)不會(huì)滿。
核心方法
毫無疑問,線程池最核心的方法除了構(gòu)造方法,就是執(zhí)行task的方法了。在看ThreadPoolExecutor的核心方法之前,先看一個(gè)非常非常重要的內(nèi)部類Worker,它是線程池中運(yùn)行任務(wù)的最小單元。
// 繼承了AbstractQueuedSynchronizer,是一把鎖// 實(shí)現(xiàn)了Runnable接口,是一個(gè)線程執(zhí)行的taskprivate final class Worker extends AbstractQueuedSynchronizer implements Runnable{ private static final long serialVersionUID = 6138294804551838833L; /** 運(yùn)行任務(wù)的線程 */ final Thread thread; /** 要運(yùn)行的初始任務(wù),可能為null */ Runnable firstTask; /** 每個(gè)線程的任務(wù)計(jì)數(shù)器 */ volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 把自己作為一個(gè)任務(wù)傳遞給ThreadFactory創(chuàng)建的線程 this.thread = getThreadFactory().newThread(this); } /** runWorker是一個(gè)非常重要的方法,后文詳細(xì)介紹 */ public void run() { runWorker(this); } // 值為0代表解鎖狀態(tài) // 值為1表示鎖定狀態(tài) protected boolean isHeldExclusively() { return getState() != 0; }// CAS的方式嘗試加鎖 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }// 嘗試釋放鎖 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }}Worker類實(shí)現(xiàn)了Runnable接口,所以本身就是一個(gè)可執(zhí)行的任務(wù),并且在構(gòu)造方法中將自己傳遞給ThreadFactory創(chuàng)建的線程去執(zhí)行
Worker類繼承了AbstractQueuedSynchronizer類,所以它本身也是一把鎖,執(zhí)行任務(wù)的時(shí)候鎖住自己,任務(wù)執(zhí)行完成后解鎖。
了解了Worker類,再來看核心方法。
execute
execute方法用于在將來的某個(gè)時(shí)間執(zhí)行指定的任務(wù),execute方法源碼比較復(fù)雜,應(yīng)該先理清楚整體邏輯,在逐步深入細(xì)節(jié)。
public void execute(Runnable command) { if (command == null) // 提交空任務(wù),直接拋異常 throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { // worker數(shù)量小于核心線程數(shù),創(chuàng)建核心線程執(zhí)行任務(wù)(第二個(gè)參數(shù)為true,表示創(chuàng)建核心線程) // addWorker方法會(huì)檢查線程池的狀態(tài) if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { // worker數(shù)量超過核心線程數(shù),進(jìn)入緩沖隊(duì)列 // 再次獲取ctl值,因?yàn)閺纳洗潍@取到這里,有可能ctl的值已經(jīng)被改變,double-check int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) // 線程池不是RUNNING狀態(tài),說明已經(jīng)調(diào)用過shutdown方法,需要對(duì)新提交的任務(wù)執(zhí)行拒絕策略 reject(command); else if (workerCountOf(recheck) == 0) // 因?yàn)闃?gòu)造方法中corePoolSize可能為0或者核心線程也都被回收了,所以此處需要判斷 addWorker(null, false); } else if (!addWorker(command, false)) // 線程池不是RUNNING狀態(tài),或者任務(wù)加入緩沖隊(duì)列失敗,創(chuàng)建非核心線程執(zhí)行任務(wù)(第二個(gè)參數(shù)為false) // 任務(wù)執(zhí)行失敗,需要執(zhí)行拒絕策略 reject(command);}整體邏輯就是前文所示的流程圖。相信有了流程圖的對(duì)比,execute方法的理解就容易多了。
addWorker
addWorker方法用于往線程池添加新的worker。其實(shí)現(xiàn)如下:
private boolean addWorker(Runnable firstTask, boolean core) { retry: // 這種寫法叫做label語法,一般用于多重性循環(huán)中跳轉(zhuǎn)到指定位置 for (;;) { // 外層自旋 int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // 線程池狀態(tài) >= SHUTDOWN if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 內(nèi)層自旋 int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) // 工作中的線程數(shù)大于線程池的容量,或者已經(jīng)大于等于核心線程數(shù),或者大于等于最大線程數(shù) // core為true,表示要?jiǎng)?chuàng)建核心線程,false表示要?jiǎng)?chuàng)建非核心線程 // 為什么大于等核心線程數(shù)的時(shí)候要返回false,因?yàn)橐砑拥骄彌_隊(duì)列,或者創(chuàng)建非核心線程來執(zhí)行,不能創(chuàng)建核心線程了 return false; if (compareAndIncrementWorkerCount(c)) // 以CAS的方式嘗試把線程數(shù)加1 // 注意這里只是把線程池中的線程數(shù)加1,并沒有在線程池中真正的創(chuàng)建線程 // 成功后跳出內(nèi)層自旋 break retry; // CAS失敗,再次獲取ctl,檢查線程池狀態(tài) c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) // 線程池狀態(tài)被改變了,從外層自旋開始再次執(zhí)行之前的邏輯 continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 可以看到兩層自旋 + CAS,僅僅是為了把線程池中的線程數(shù)加1,還沒有新建線程 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 把task包裝成Worker w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; // 加鎖 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. // 獲取鎖之后,再次檢查線程池的狀態(tài) int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable // 檢查線程狀態(tài) throw new IllegalThreadStateException(); // 添加到worders workers.add(w); int s = workers.size(); if (s > largestPoolSize) // 維護(hù)largestPoolSize變量 largestPoolSize = s; workerAdded = true; } } finally { // 解鎖 mainLock.unlock(); } if (workerAdded) { // 添加成功 t.start(); workerStarted = true; } } } finally { if (! workerStarted) // 執(zhí)行worker的線程啟動(dòng)失敗 addWorkerFailed(w); } return workerStarted;}可以看到addWorker方法前一部分,用了外層自旋判斷線程池的狀態(tài),內(nèi)層自旋 + CAS給線程池中的線程數(shù)加1。后半部分用了ReentrantLock保證創(chuàng)建Worker對(duì)象,以及啟動(dòng)線程的線程安全。一個(gè)方法中三次獲取了線程池的狀態(tài)(不包含該方法調(diào)用的其他方法),因?yàn)槊績纱沃g,線程池的狀態(tài)都有可能被改變。
runWorker
前文在介紹Worker內(nèi)部類時(shí)說過,Worker會(huì)把自己傳遞給ThreadFactory創(chuàng)建的線程執(zhí)行,最終執(zhí)行Worker的run方法,而Worker類的run方法只有一行代碼:
runWorker(this);所以接下來看看runWorker方法是如何實(shí)現(xiàn)了
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // 允許外部中斷 w.unlock(); // allow interrupts // 記錄worker是不是異常退出的 boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { // 自旋,如果task不為空,或者能從緩沖隊(duì)列(阻塞隊(duì)列)中獲取任務(wù)就繼續(xù)執(zhí)行,不能就一直阻塞 // 加鎖 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) // 如果線程池正在停止,并且當(dāng)前線程沒有被中斷,就中斷當(dāng)前線程 wt.interrupt(); try { // 鉤子函數(shù),處理task執(zhí)行前的邏輯 beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 鉤子函數(shù),處理task執(zhí)行后的邏輯 afterExecute(task, thrown); } } finally { task = null; // 完成的任務(wù)數(shù)加1 w.completedTasks++; // 解鎖 w.unlock(); } } // 運(yùn)行到這里,說明worker沒有異常退出 completedAbruptly = false; } finally { // 自旋操作被打斷了,說明線程需要被回收 processWorkerExit(w, completedAbruptly); }}第10行代碼中,task為null時(shí),會(huì)通過getTask()方法從緩沖隊(duì)列中取任務(wù),因?yàn)榫彌_隊(duì)列是阻塞隊(duì)列,所以如果獲取不到任務(wù)會(huì)一直被阻塞,接下來看看getTask方法的內(nèi)部實(shí)現(xiàn)
getTask
getTask用于阻塞式的從緩沖隊(duì)列中獲取任務(wù)。
private Runnable getTask() {// 線程是否超時(shí) boolean timedOut = false; // Did the last poll() time out? for (;;) { // 自旋 // 獲取線程池狀態(tài) int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 線程池終止了,或者線程池停止了,且緩沖隊(duì)列中沒有任務(wù)了 // 自旋 + CAS方式減少線程計(jì)數(shù) decrementWorkerCount(); return null; } int wc = workerCountOf(c); // 根據(jù)allowCoreThreadTimeOut參數(shù)來判斷,要不要給核心線程設(shè)置等待超時(shí)時(shí)間 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { // 當(dāng)前線程數(shù)大于了maximumPoolSize(因?yàn)閙aximumPoolSize可以動(dòng)態(tài)修改)或者當(dāng)前線程設(shè)置了超時(shí)時(shí)間且已經(jīng)超時(shí)了 // 且線程數(shù)大于1或者緩沖隊(duì)列為空 // 這個(gè)條件的意思就是:當(dāng)前線程需要被回收 if (compareAndDecrementWorkerCount(c)) // 返回null后,上層runWorker方法中斷循環(huán),執(zhí)行processWorkerExit方法回收線程 return null; continue; } try { // 從阻塞隊(duì)列中獲取任務(wù) Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) // 成功獲取任務(wù) return r; // 沒有獲取到任務(wù),超時(shí) timedOut = true; } catch (InterruptedException retry) { // 線程被中斷,重試 timedOut = false; } }}理解該方法的前提,是要理解阻塞隊(duì)列提供的阻塞式API。
這個(gè)方法重點(diǎn)關(guān)注兩點(diǎn):
- 從緩沖隊(duì)列取任務(wù)時(shí),poll非阻塞,take阻塞,調(diào)用哪個(gè)由當(dāng)前線程需不需要被回收來決定
- 該方法返回null之后,上層方法會(huì)回收當(dāng)前線程
除了這幾個(gè)核心方法之外,往線程池提交任務(wù)還有一個(gè)方法叫submit
public Future> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task, null); execute(ftask); return ftask;}public Future submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task, result); execute(ftask); return ftask;}public Future submit(Callable task) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task); execute(ftask); return ftask;}submit方法可以接收線程池返回的結(jié)果,也就是Futrue對(duì)象,可以接收Runnable對(duì)象和Callable對(duì)象。
至于Future、FutureTask、Runnable、Callable之間的關(guān)系,博主在前一篇博客 如何獲取子線程的執(zhí)行結(jié)果 已經(jīng)詳細(xì)介紹過,此處不再贅述。
至此ThreadPoolExecutor的核心方法的源碼以及執(zhí)行邏輯已經(jīng)講解完畢,再來看一些非核心方法,了解一下即可
- public void shutdown():關(guān)閉線程池,已經(jīng)提交過的任務(wù)還會(huì)執(zhí)行(線程池中未運(yùn)行完畢的,緩沖隊(duì)列中的)
- public List shutdownNow():停止線程池,試圖停止正在執(zhí)行的任務(wù),暫停緩沖隊(duì)列中的任務(wù),并且返回
- public void allowCoreThreadTimeOut(boolean value):設(shè)置核心線程是否允許回收
- protected void beforeExecute(Thread t, Runnable r):鉤子函數(shù),處理線程執(zhí)行任務(wù)前的邏輯,這里是空實(shí)現(xiàn)
- protected void afterExecute(Runnable r, Throwable t):鉤子函數(shù),處理線程執(zhí)行任務(wù)后的邏輯,這里是空實(shí)現(xiàn)
- public int getActiveCount():返回正在執(zhí)行任務(wù)的線程的大致數(shù)量
- public long getCompletedTaskCount():返回執(zhí)行完成的任務(wù)的大致數(shù)量
除此之外還需要了解的是,構(gòu)造方法中的七個(gè)參數(shù),除了BlockingQueue是不能動(dòng)態(tài)設(shè)置外,其余六個(gè)參數(shù)都可以動(dòng)態(tài)設(shè)置,分別調(diào)用對(duì)于的setXxx方法即可,當(dāng)然也可以通過對(duì)于的getXxx方法獲取對(duì)應(yīng)的信息。
鑒于此,我們?cè)賮砜匆粋€(gè)常見的問題
Java有幾種線程池?
JDK(準(zhǔn)確的說是java.util.concurrent.Executors工具類)提供了四種線程池:
- CachedThreadPool:緩沖線程池、
- FixedThreadPool:固定線程數(shù)的線程池
- SingleThreadExecutor:單線程的線程池
- ScheduledThreadPool:可定時(shí)調(diào)度的線程池
仔細(xì)看下這四種線程池,最終都調(diào)用了ThreadPoolExecutor的構(gòu)造方法,只是傳遞的參數(shù)有所不同。
- CachedThreadPool和ScheculedThreadPool設(shè)置的最大線程數(shù)都是Integer.MAX_VALUE,可能線程數(shù)過多而產(chǎn)生OOM
- SingleThreadExecutor和FixedThreadPool使用的都是無界隊(duì)列,最大元素個(gè)數(shù)為Integer.MAX_VALUE,可能緩沖隊(duì)列中堆積的任務(wù)過多,而產(chǎn)生OOM
這兩點(diǎn)正是阿里巴巴代碼規(guī)范里禁止使用這四種線程池的原因。
想要使用線程池,必須通過ThreadPoolExecutor的方法來創(chuàng)建線程池。
總結(jié)
使用線程池需要注意的幾點(diǎn)如下:
- 合理設(shè)置七個(gè)參數(shù)
- 自定義ThreadFactory,給每個(gè)線程設(shè)置有意義的名稱
- 自定義RejectedExecutionHandler,處理線程池飽和時(shí)的邏輯
使用線程池之前一定要十分明確每個(gè)參數(shù)的意義以及對(duì)其他參數(shù)的影響,才能更加合理的使用線程池。
作者:Sicimike
原文鏈接:https://blog.csdn.net/Baisitao_/article/details/100415358
總結(jié)
以上是生活随笔為你收集整理的c++ socket线程池原理_ThreadPoolExecutor线程池实现原理+源码解析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 内存泄漏的原因及解决办法_编程基础 |
- 下一篇: 端口扫描有哪几种方式(internet的