Java 线程池艺术探索
線程池
Wiki 上是這樣解釋的:Thread Pool
作用:利用線程池可以大大減少在創(chuàng)建和銷毀線程上所花的時(shí)間以及系統(tǒng)資源的開銷!
下面主要講下線程池中最重要的一個(gè)類 ThreadPoolExecutor 。
ThreadPoolExecutor
ThreadPoolExecutor 構(gòu)造器:
有四個(gè)構(gòu)造器的,挑了參數(shù)最長(zhǎng)的一個(gè)進(jìn)行講解。
七個(gè)參數(shù):
- corePoolSize:核心池的大小,在創(chuàng)建了線程池后,默認(rèn)情況下,線程池中并沒有任何線程,而是等待有任務(wù)到來才創(chuàng)建線程去執(zhí)行任務(wù),當(dāng)有任務(wù)來之后,就會(huì)創(chuàng)建一個(gè)線程去執(zhí)行任務(wù),當(dāng)線程池中的線程數(shù)目達(dá)到corePoolSize后,就會(huì)把到達(dá)的任務(wù)放到緩存隊(duì)列當(dāng)中;
- maximumPoolSize:線程池最大線程數(shù);
- keepAliveTime:表示線程沒有任務(wù)執(zhí)行時(shí)最多保持多久時(shí)間會(huì)終止;
- unit:參數(shù)keepAliveTime的時(shí)間單位(DAYS、HOURS、MINUTES、SECONDS 等);
- workQueue:阻塞隊(duì)列,用來存儲(chǔ)等待執(zhí)行的任務(wù);
- ArrayBlockingQueue (有界隊(duì)列)
- LinkedBlockingQueue (無界隊(duì)列)
- SynchronousQueue
- threadFactory:線程工廠,主要用來創(chuàng)建線程
-
handler:拒絕處理任務(wù)的策略
- AbortPolicy:丟棄任務(wù)并拋出 RejectedExecutionException 異常。(默認(rèn)這種)
- DiscardPolicy:也是丟棄任務(wù),但是不拋出異常
- DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程)
- CallerRunsPolicy:由調(diào)用線程處理該任務(wù)
重要方法:
- execute():通過這個(gè)方法可以向線程池提交一個(gè)任務(wù),交由線程池去執(zhí)行;
- shutdown():關(guān)閉線程池;
execute() 方法:
注:JDK 1.7 和 1.8 這個(gè)方法有點(diǎn)區(qū)別,下面代碼是 1.8 中的。
|
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
? public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); //1、如果當(dāng)前的線程數(shù)小于核心線程池的大小,根據(jù)現(xiàn)有的線程作為第一個(gè) Worker 運(yùn)行的線程,新建一個(gè) Worker,addWorker 自動(dòng)的檢查當(dāng)前線程池的狀態(tài)和 Worker 的數(shù)量,防止線程池在不能添加線程的狀態(tài)下添加線程 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //2、如果線程入隊(duì)成功,然后還是要進(jìn)行 double-check 的,因?yàn)榫€程在入隊(duì)之后狀態(tài)是可能會(huì)發(fā)生變化的 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // recheck 防止線程池狀態(tài)的突變,如果突變,那么將 reject 線程,防止 workQueue 中增加新線程 if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0)//上下兩個(gè)操作都有 addWorker 的操作,但是如果在workQueue.offer 的時(shí)候 Worker 變?yōu)?0,那么將沒有 Worker 執(zhí)行新的 task,所以增加一個(gè) Worker. addWorker(null, false); } //3、如果 task 不能入隊(duì)(隊(duì)列滿了),這時(shí)候嘗試增加一個(gè)新線程,如果增加失敗那么當(dāng)前的線程池狀態(tài)變化了或者線程池已經(jīng)滿了然后拒絕task else if (!addWorker(command, false)) reject(command); } |
其中調(diào)用了 addWorker() 方法:
|
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
? private boolean addWorker(Runnable firstTask, boolean core) {// firstTask: 新增一個(gè)線程并執(zhí)行這個(gè)任務(wù),可空,增加的線程從隊(duì)列獲取任務(wù);core:是否使用 corePoolSize 作為上限,否則使用 maxmunPoolSize retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. /** * rs!=Shutdown || fistTask!=null || workQueue.isEmpty * 如果當(dāng)前的線程池的狀態(tài) > SHUTDOWN 那么拒絕 Worker 的 add 如果 =SHUTDOWN * 那么此時(shí)不能新加入不為 null 的 Task,如果在 workQueue 為 empty 的時(shí)候不能加入任何類型的 Worker, * 如果不為 empty 可以加入 task 為 null 的 Worker, 增加消費(fèi)的 Worker */ if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null &&! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); //如果當(dāng)前的數(shù)量超過了 CAPACITY,或者超過了 corePoolSize 和 maximumPoolSize(試 core 而定) if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //CAS 嘗試增加線程數(shù),如果失敗,證明有競(jìng)爭(zhēng),那么重新到 retry。 if (compareAndIncrementWorkerCount(c))// AtomicInteger 的 CAS 操作; break retry; c = ctl.get(); // Re-read ctl //判斷當(dāng)前線程池的運(yùn)行狀態(tài),狀態(tài)發(fā)生改變,重試 retry; if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask);// Worker 為內(nèi)部類,封裝了線程和任務(wù),通過 ThreadFactory 創(chuàng)建線程,可能失敗拋異?;蛘叻祷?null final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable // SHUTDOWN 以后的狀態(tài)和 SHUTDOWN 狀態(tài)下 firstTask 為 null,不可新增線程 throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s;//記錄最大線程數(shù) workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w);//失敗回退,從 wokers 移除 w, 線程數(shù)減一,嘗試結(jié)束線程池(調(diào)用tryTerminate 方法) } return workerStarted; } |
示意圖:
執(zhí)行流程:
1、當(dāng)有任務(wù)進(jìn)入時(shí),線程池創(chuàng)建線程去執(zhí)行任務(wù),直到核心線程數(shù)滿為止
2、核心線程數(shù)量滿了之后,任務(wù)就會(huì)進(jìn)入一個(gè)緩沖的任務(wù)隊(duì)列中
- 當(dāng)任務(wù)隊(duì)列為無界隊(duì)列時(shí),任務(wù)就會(huì)一直放入緩沖的任務(wù)隊(duì)列中,不會(huì)和最大線程數(shù)量進(jìn)行比較
- 當(dāng)任務(wù)隊(duì)列為有界隊(duì)列時(shí),任務(wù)先放入緩沖的任務(wù)隊(duì)列中,當(dāng)任務(wù)隊(duì)列滿了之后,才會(huì)將任務(wù)放入線程池,此時(shí)會(huì)與線程池中最大的線程數(shù)量進(jìn)行比較,如果超出了,則默認(rèn)會(huì)拋出異常。然后線程池才會(huì)執(zhí)行任務(wù),當(dāng)任務(wù)執(zhí)行完,又會(huì)將緩沖隊(duì)列中的任務(wù)放入線程池中,然后重復(fù)此操作。
shutdown() 方法:
|
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
? public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //判斷是否可以操作目標(biāo)線程 checkShutdownAccess(); //設(shè)置線程池狀態(tài)為 SHUTDOWN, 此處之后,線程池中不會(huì)增加新 Task advanceRunState(SHUTDOWN); //中斷所有的空閑線程 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } //轉(zhuǎn)到 Terminate tryTerminate(); } |
參考資料:深入理解java線程池—ThreadPoolExecutor
JDK 自帶四種線程池分析與比較
1、newFixedThreadPool
創(chuàng)建一個(gè)定長(zhǎng)線程池,可控制線程最大并發(fā)數(shù),超出的線程會(huì)在隊(duì)列中等待。
2、newSingleThreadExecutor
創(chuàng)建一個(gè)單線程化的線程池,它只會(huì)用唯一的工作線程來執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級(jí))執(zhí)行。
3、newCachedThreadPool
創(chuàng)建一個(gè)可緩存線程池,如果線程池長(zhǎng)度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
4、newScheduledThreadPool
創(chuàng)建一個(gè)定長(zhǎng)線程池,支持定時(shí)及周期性任務(wù)執(zhí)行。
四種線程池其實(shí)內(nèi)部方法都是調(diào)用的 ThreadPoolExecutor 類,只不過利用了其不同的構(gòu)造器方法而已(傳入自己需要傳入的參數(shù)),那么利用這個(gè)特性,我們自己也是可以實(shí)現(xiàn)自己定義的線程池的。
自定義線程池
1、創(chuàng)建任務(wù)類
|
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
? package com.zhisheng.thread.threadpool.demo; /** * Created by 10412 on 2017/7/24. * 任務(wù) */ public class MyTask implements Runnable { private int taskId; //任務(wù) id private String taskName; //任務(wù)名字 public int getTaskId() { return taskId; } public void setTaskId(int taskId) { this.taskId = taskId; } public String getTaskName() { return taskName; } public void setTaskName(String taskName) { this.taskName = taskName; } public MyTask(int taskId, String taskName) { this.taskId = taskId; this.taskName = taskName; } @Override public void run() { System.out.println("當(dāng)前正在執(zhí)行 ****** 線程Id-->" + taskId + ",任務(wù)名稱-->" + taskName); try { Thread.currentThread().sleep(5 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("線程Id-->" + taskId + ",任務(wù)名稱-->" + taskName + " ----------- 執(zhí)行完畢!"); } } |
2、自定義拒絕策略,實(shí)現(xiàn) RejectedExecutionHandler 接口,重寫 rejectedExecution 方法
|
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
? package com.zhisheng.thread.threadpool.demo; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; /** * Created by 10412 on 2017/7/24. * 自定義拒絕策略,實(shí)現(xiàn) RejectedExecutionHandler 接口 */ public class RejectedThreadPoolHandler implements RejectedExecutionHandler { public RejectedThreadPoolHandler() { } @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("WARNING 自定義拒絕策略: Task " + r.toString() + " rejected from " + executor.toString()); } } |
3、創(chuàng)建線程池
|
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
? package com.zhisheng.thread.threadpool.demo; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * Created by 10412 on 2017/7/24. */ public class ThreadPool { public static void main(String[] args) { //核心線程數(shù)量為 2,最大線程數(shù)量 4,空閑線程存活的時(shí)間 60s,有界隊(duì)列長(zhǎng)度為 3, //ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3)); //核心線程數(shù)量為 2,最大線程數(shù)量 4,空閑線程存活的時(shí)間 60s, 無界隊(duì)列, //ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingDeque<>()); //核心線程數(shù)量為 2,最大線程數(shù)量 4,空閑線程存活的時(shí)間 60s,有界隊(duì)列長(zhǎng)度為 3, 使用自定義拒絕策略 ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3), new RejectedThreadPoolHandler()); for (int i = 1; i <= 10; i++) { //創(chuàng)建 10 個(gè)任務(wù) MyTask task = new MyTask(i, "任務(wù)" + i); //運(yùn)行 pool.execute(task); System.out.println("活躍的線程數(shù):"+pool.getActiveCount() + ",核心線程數(shù):" + pool.getCorePoolSize() + ",線程池大小:" + pool.getPoolSize() + ",隊(duì)列的大小" + pool.getQueue().size()); } //關(guān)閉線程池 pool.shutdown(); } } |
這里運(yùn)行結(jié)果就不截圖了,我在本地測(cè)試了代碼是沒問題的,感興趣的建議還是自己跑一下,然后分析下結(jié)果是不是和前面分析的一樣,如有問題,請(qǐng)?jiān)谖也┛拖旅嬖u(píng)論!
總結(jié)
本文一開始講了線程池的介紹和好處,然后分析了線程池中最核心的 ThreadPoolExecutor 類中構(gòu)造器的七個(gè)參數(shù)的作用、類中兩個(gè)重要的方法,然后在對(duì)比研究了下 JDK 中自帶的四種線程池的用法和內(nèi)部代碼細(xì)節(jié),最后寫了一個(gè)自定義的線程池。
總結(jié)
以上是生活随笔為你收集整理的Java 线程池艺术探索的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Hystrix文档-实现原理
- 下一篇: 通过源码详解 Servlet