Java 线程池中的线程复用是如何实现的?
前幾天,技術群里有個群友問了一個關于線程池的問題,內容如圖所示:
關于線程池相關知識可以先看下這篇:為什么阿里巴巴Java開發手冊中強制要求線程池不允許使用Executors創建?
那么就來和大家探討下這個問題,在線程池中,線程會從 workQueue 中讀取任務來執行,最小的執行單位就是 Worker,Worker 實現了 Runnable 接口,重寫了 run 方法,這個 run 方法是讓每個線程去執行一個循環,在這個循環代碼中,去判斷是否有任務待執行,若有則直接去執行這個任務,因此線程數不會增加。
如下是線程池創建線程的整體流程圖:
首先會判斷線程池的狀態,也就是是否在運行,若線程為非運行狀態,則會拒絕。接下來會判斷線程數是否小于核心線程數,若小于核心線程數,會新建工作線程并執行任務,隨著任務的增多,線程數會慢慢增加至核心線程數,如果此時還有任務提交,就會判斷阻塞隊列?workQueue?是否已滿,若沒滿,則會將任務放入到阻塞隊列中,等待工作線程獲得并執行,如果任務提交非常多,使得阻塞隊達到上限,會去判斷線程數是否小于最大線程數?maximumPoolSize,若小于最大線程數,線程池會添加工作線程并執行任務,如果仍然有大量任務提交,使得線程數等于最大線程數,如果此時還有任務提交,就會被拒絕。
現在我們對這個流程大致有所了解,那么讓我們去看看源碼是如何實現的吧!
線程池的任務提交從 submit 方法來說,submit 方法是 AbstractExecutorService 抽象類定義的,主要做了兩件事情:
把 Runnable 和 Callable 都轉化成 FutureTask
使用 execute 方法執行 FutureTask
execute 方法是 ThreadPoolExecutor 中的方法,源碼如下:
public void execute(Runnable command) {// 若任務為空,則拋 NPE,不能執行空任務if (command == null) {throw new NullPointerException();}int c = ctl.get();// 若工作線程數小于核心線程數,則創建新的線程,并把當前任務 command 作為這個線程的第一個任務if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true)) {return;}c = ctl.get();}/*** 至此,有以下兩種情況:* 1.當前工作線程數大于等于核心線程數* 2.新建線程失敗* 此時會嘗試將任務添加到阻塞隊列 workQueue*/// 若線程池處于 RUNNING 狀態,將任務添加到阻塞隊列 workQueue 中if (isRunning(c) && workQueue.offer(command)) {// 再次檢查線程池標記int recheck = ctl.get();// 如果線程池已不處于 RUNNING 狀態,那么移除已入隊的任務,并且執行拒絕策略if (!isRunning(recheck) && remove(command)) {// 任務添加到阻塞隊列失敗,執行拒絕策略reject(command);}// 如果線程池還是 RUNNING 的,并且線程數為 0,那么開啟新的線程else if (workerCountOf(recheck) == 0) {addWorker(null, false);}}/*** 至此,有以下兩種情況:* 1.線程池處于非運行狀態,線程池不再接受新的線程* 2.線程處于運行狀態,但是阻塞隊列已滿,無法加入到阻塞隊列* 此時會嘗試以最大線程數為界創建新的工作線程*/else if (!addWorker(command, false)) {// 任務進入線程池失敗,執行拒絕策略reject(command);} }可以看到 execute 方法中的的核心方法為 addWorker,再去看 addWorker 方法之前,先看下 Worker 的初始化方法:
Worker(Runnable firstTask) {// 每個任務的鎖狀態初始化為-1,這樣工作線程在運行之前禁止中斷setState(-1);this.firstTask = firstTask;// 把 Worker 作為 thread 運行的任務this.thread = getThreadFactory().newThread(this); }在 Worker 初始化時把當前 Worker 作為線程的構造器入參,接下來從 addWorker 方法中可以找到如下代碼:
final Thread t = w.thread; // 如果成功添加了 Worker,就可以啟動 Worker 了 if (workerAdded) {t.start();workerStarted = true; }這塊代碼是添加 worker 成功,調用 start 方法啟動線程,Thread t = w.thread;?此時的 w 是 Worker 的引用,那么t.start();實際上執行的就是 Worker 的 run 方法。
Worker 的 run 方法中調用了 runWorker 方法,簡化后的 runWorker 源碼如下:
final void runWorker(Worker w) {Runnable task = w.firstTask;while (task != null || (task = getTask()) != null) {try {task.run();} finally {task = null;}} }這個 while 循環有個 getTask 方法,getTask 的主要作用是阻塞從隊列中拿任務出來,如果隊列中有任務,那么就可以拿出來執行,如果隊列中沒有任務,這個線程會一直阻塞到有任務為止(或者超時阻塞),其中 getTask 方法的時序圖如下:
其中線程復用的關鍵是 1.6 和 1.7 部分,這部分源碼如下:
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();使用隊列的 poll 或 take 方法從隊列中拿數據,根據隊列的特性,隊列中有任務可以返回,隊列中無任務會阻塞。
線程池的線程復用就是通過取 Worker 的 firstTask 或者通過 getTask 方法從 workQueue 中不停地取任務,并直接調用 Runnable 的 run 方法來執行任務,這樣就保證了每個線程都始終在一個循環中,反復獲取任務,然后執行任務,從而實現了線程的復用。
總結
最好的關系就是互相成就,大家的在看、轉發、留言三連就是我創作的最大動力。
更詳細的源碼解析可以點擊鏈接查看:https://github.com/wupeixuan/JDKSourceCode1.8
參考
https://github.com/wupeixuan/JDKSourceCode1.8
面試官系統精講Java源碼及大廠真題
Java并發編程學習寶典
Java 并發面試 78 講
總結
以上是生活随笔為你收集整理的Java 线程池中的线程复用是如何实现的?的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 深度 | 一条查询SQL的前世今生 ——
- 下一篇: 原来不只是fastjson,这个你每天都