【多线程】ThreadPoolExecutor 类的使用详解
ThreadPoolExecutor 構造方法
ThreadPoolExecutor共4個構造方法:
咱們直接看參數最多的7個參數分別代表:
一個任務通過 execute(Runnable)方法被添加到線程池,任務就是一個 Runnable類型的對象,任務的執行方法就是 Runnable類型對象的run()方法。
一個任務通過execute(Runnable)方法欲添加到線程池時
- 如果此時線程池中的數量小于corePoolSize,即使線程池中的線程都處于空閑狀態,也要創建新的線程來處理被添加的任務。
- 如果此時線程池中的數量等于 corePoolSize,但是緩沖隊列 workQueue未滿,那么任務被放入緩沖隊列。
- 如果此時線程池中的數量大于corePoolSize,緩沖隊列workQueue滿,并且線程池中的數量小于maximumPoolSize,建新的線程來處理被添加的任務。
- 如果此時線程池中的數量大于corePoolSize,緩沖隊列workQueue滿,并且線程池中的數量等于maximumPoolSize,那么通過 handler所指定的策略來處理此任務。
處理任務的優先級為
- 核心線程corePoolSize、任務隊列workQueue、最大線程maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。
當線程池中的線程數量大于 corePoolSize 時,如果某線程空閑時間超過 keepAliveTime,線程將被終止。這樣,線程池可以動態的調整池中的線程數。
workQueue任務隊列
任務隊列,被添加到線程池中,但尚未被執行的任務;它一般分為直接提交隊列、有界任務隊列、無界任務隊列、優先任務隊列幾種;
直接切換
設置為SynchronousQueue隊列,SynchronousQueue是一個特殊的BlockingQueue,它沒有容量,沒執行一個插入操作就會阻塞,需要再執行一個刪除操作才會被喚醒,反之每一個刪除操作也都要等待對應的插入操作。
使用SynchronousQueue隊列,提交的任務不會被保存,總是會馬上提交執行。如果用于執行任務的線程數量小于maximumPoolSize,則嘗試創建新的進程,如果達到maximumPoolSize設置的最大值,則根據你設置的handler執行拒絕策略。因此這種方式你提交的任務不會被緩存起來,而是會被馬上執行,在這種情況下,你需要對你程序的并發量有個準確的評估,才能設置合適的maximumPoolSize數量,否則很容易就會執行拒絕策略
無界隊列
一般使用基于鏈表的阻塞隊列LinkedBlockingQueue。使用無界任務隊列,線程池的任務隊列可以無限制的添加新的任務,而線程池創建的最大線程數量就是你corePoolSize設置的數量,也就是說在這種情況下maximumPoolSize這個參數是無效的,哪怕你的任務隊列中緩存了很多未執行的任務,當線程池的線程數達到corePoolSize后,就不會再增加了;若后續有新的任務加入,則直接進入隊列等待,當使用這種任務隊列模式時,一定要注意你任務提交與處理之間的協調與控制,不然會出現隊列中的任務由于無法及時處理導致一直增長,直到最后資源耗盡的問題。
使用有界隊列
一般使用ArrayBlockingQueue。使用該方式可以將線程池的最大線程數量限制為maximumPoolSize,這樣能夠降低資源的消耗,但同時這種方式也使得線程池對線程的調度變得更困難,因為線程池和隊列的容量都是有限的值,所以要想使線程池處理任務的吞吐率達到一個相對合理的范圍,又想使線程調度相對簡單,并且還要盡可能的降低線程池對資源的消耗,就需要合理的設置這兩個數量。
使用ArrayBlockingQueue有界任務隊列,若有新的任務需要執行時,線程池會創建新的線程,直到創建的線程數量達到corePoolSize時,則會將新的任務加入到等待隊列中。若等待隊列已滿,即超過ArrayBlockingQueue初始化的容量,則繼續創建線程,直到線程數量達到maximumPoolSize設置的最大線程數量,若大于maximumPoolSize,則執行拒絕策略。在這種情況下,線程數量的上限與有界任務隊列的狀態有直接關系,如果有界隊列初始容量較大或者沒有達到超負荷的狀態,線程數將一直維持在corePoolSize以下,反之當任務隊列已滿時,則會以maximumPoolSize為最大線程數上限。
- 如果要想降低系統資源的消耗(包括CPU的使用率,操作系統資源的消耗,上下文環境切換的開銷等), 可以設置較大的隊列容量和較小的線程池容量, 但這樣也會降低線程處理任務的吞吐量。
- 如果提交的任務經常發生阻塞,那么可以考慮通過調用 setMaximumPoolSize() 方法來重新設定線程池的容量。
- 如果隊列的容量設置的較小,通常需要將線程池的容量設置大一點,這樣CPU的使用率會相對的高一些。但如果線程池的容量設置的過大,則在提交的任務數量太多的情況下,并發量會增加,那么線程之間的調度就是一個要考慮的問題,因為這樣反而有可能降低處理任務的吞吐量。
ThreadPoolExecutor內部有實現4個拒絕策略,默認為AbortPolicy策略
- CallerRunsPolicy:由調用execute方法提交任務的線程來執行這個任務
- AbortPolicy:拋出異常RejectedExecutionException拒絕提交任務
- DiscardPolicy:直接拋棄任務,不做任何處理
- DiscardOldestPolicy:去除任務隊列中的第一個任務,重新提交
ThreadPoolExecutor 使用
1,當池中正在運行的線程數(包括空閑線程數)小于corePoolSize時,新建線程執行任務
public static void main(String[] args) {ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));// 任務1pool.execute(() -> {try {Thread.sleep(3 * 1000);System.out.println("--helloWorld_001--" + Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}});//任務2pool.execute(() -> System.out.println("--helloWorld_002--" + Thread.currentThread().getName()));}運行結果:
–helloWorld_001–pool-1-thread-1
–helloWorld_002–pool-1-thread-2
結論:線程1 結束后 沒有繼續線程1 而是啟動線程2
2,當池中正在運行的線程數(包括空閑線程數)大于等于corePoolSize時,新插入的任務進入workQueue排隊(如果workQueue長度允許),等待空閑線程來執行。
public static void main(String[] args) {ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));// 任務1pool.execute(() -> {try {Thread.sleep(3 * 1000);System.out.println("--helloWorld_001--" + Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}});// 任務2pool.execute(() -> {try {Thread.sleep(5 * 1000);System.out.println("--helloWorld_002--" + Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}});// 任務3pool.execute(() -> System.out.println("--helloWorld_003--" + Thread.currentThread().getName()));}運行結果:
–helloWorld_001–pool-1-thread-1
–helloWorld_003–pool-1-thread-1
–helloWorld_002–pool-1-thread-2
結論:任務2在運行過程中,任務3啟動不會新建線程,因為有一個隊列是空的,maximumPoolSize=3這個參數不起作用。
3,當隊列里的任務達到上限,并且池中正在進行的線程小于maxinumPoolSize,對于新加入的任務,新建線程。
public static void main(String[] args) {ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));// 任務1pool.execute(() -> {try {Thread.sleep(3 * 1000);System.out.println("--helloWorld_001--" + Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}});// 任務2pool.execute(() -> {try {Thread.sleep(5 * 1000);System.out.println("--helloWorld_002--" + Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}});// 任務3pool.execute(() -> System.out.println("--helloWorld_003--" + Thread.currentThread().getName()));// 任務4pool.execute(() -> System.out.println("--helloWorld_004--" + Thread.currentThread().getName()));}運行結果:
–helloWorld_004–pool-1-thread-3
–helloWorld_003–pool-1-thread-3
–helloWorld_001–pool-1-thread-1
–helloWorld_002–pool-1-thread-2
結果:任務1,2啟動后 任務3在隊列 , 隊列就滿了, 由于正在進行的線程數是2 < maximumPoolSize,只能新建一個線程了 然后任務4就進了新線程-3,任務4結束,隊列里的任務3在線程3 進行。
4,隊列里的任務達到上限,并且池中正在運行的線程等于maximumPoolSize,對于新加入的任務,執行拒絕策略(線程池默認的策略是拋異常)。
public static void main(String[] args) {ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));// 任務1pool.execute(() -> {try {Thread.sleep(3 * 1000);System.out.println("--helloWorld_001--" + Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}});// 任務2pool.execute(() -> {try {Thread.sleep(5 * 1000);System.out.println("--helloWorld_002--" + Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}});// 任務3pool.execute(() -> System.out.println("--helloWorld_003--" + Thread.currentThread().getName()));// 任務4pool.execute(() -> {try {Thread.sleep(2 * 1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("--helloWorld_004--" + Thread.currentThread().getName());});// 任務5pool.execute(() -> System.out.println("--helloWorld_005--" + Thread.currentThread().getName()));}運行結果:
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task cs.wy.Thread.ThreadPoolExecutorTest$$Lambda$5/999966131@7699a589 rejected from java.util.concurrent.ThreadPoolExecutor@58372a00[Running, pool size = 3, active threads = 3, queued tasks = 1, completed tasks = 0]at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)at cs.wy.Thread.ThreadPoolExecutorTest.main(ThreadPoolExecutorTest.java:40) --helloWorld_004--pool-1-thread-3 --helloWorld_003--pool-1-thread-3 --helloWorld_001--pool-1-thread-1 --helloWorld_002--pool-1-thread-2結論:隊列達到上限,線程池達到最大值,故拋出異常。
關閉線程
分為兩種方式:
//平緩關閉,不允許新的線程加入,正在運行的都跑完即可關閉。 pool.shutdown(); //暴力關閉。不允許新的線程加入,且直接停到正在進行的線程。 pool.shutdownNow();總結
以上是生活随笔為你收集整理的【多线程】ThreadPoolExecutor 类的使用详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【多线程】Synchronized及实现
- 下一篇: 【多线程】ThreadPoolExecu