Java多线程(十一)之线程池深入分析(上)
線程池是并發(fā)包里面很重要的一部分,在實際情況中也是使用很多的一個重要組件。
下圖描述的是線程池API的一部分。廣義上的完整線程池可能還包括Thread/Runnable、Timer/TimerTask等部分。這里只介紹主要的和高級的API以及架構和原理。
大多數(shù)并發(fā)應用程序是圍繞執(zhí)行任務(Task)進行管理的。所謂任務就是抽象、離散的工作單元(unit of work)。把一個應用程序的工作(work)分離到任務中,可以簡化程序的管理;這種分離還在不同事物間劃分了自然的分界線,可以方便程序在出現(xiàn)錯誤時進行恢復;同時這種分離還可以為并行工作提供一個自然的結構,有利于提高程序的并發(fā)性。下面通過任務的執(zhí)行策略來引入Executor相關的介紹。
?
一、任務的執(zhí)行策略
?
任務的執(zhí)行策略包括4W3H部分:
- 任務在什么(What)線程中執(zhí)行
- 任務以什么(What)順序執(zhí)行(FIFO/LIFO/優(yōu)先級等)
- 同時有多少個(How Many)任務并發(fā)執(zhí)行
- 允許有多少個(How Many)個任務進入執(zhí)行隊列
- 系統(tǒng)過載時選擇放棄哪一個(Which)任務,如何(How)通知應用程序這個動作
- 任務執(zhí)行的開始、結束應該做什么(What)處理
在后面的章節(jié)中會詳細分寫這些策略是如何實現(xiàn)的。我們先來簡單回答些如何滿足上面的條件。
?
二、線程池Executor的類體系結構與常用線程池
?
?
Java里面線程池的頂級接口是Executor,但是嚴格意義上講Executor并不是一個線程池,而只是一個執(zhí)行線程的工具。真正的線程池接口是ExecutorService。
下面這張圖完整描述了線程池的類體系結構。
?
首先Executor的execute方法只是執(zhí)行一個Runnable的任務,當然了從某種角度上將最后的實現(xiàn)類也是在線程中啟動此任務的。根據(jù)線程池的執(zhí)行策略最后這個任務可能在新的線程中執(zhí)行,或者線程池中的某個線程,甚至是調用者線程中執(zhí)行(相當于直接運行Runnable的run方法)。這點在后面會詳細說明。
ExecutorService在Executor的基礎上增加了一些方法,其中有兩個核心的方法:
- Future<?> submit(Runnable task)
- <T> Future<T> submit(Callable<T> task)
這兩個方法都是向線程池中提交任務,它們的區(qū)別在于Runnable在執(zhí)行完畢后沒有結果,Callable執(zhí)行完畢后有一個結果。這在多個線程中傳遞狀態(tài)和結果是非常有用的。另外他們的相同點在于都返回一個Future對象。Future對象可以阻塞線程直到運行完畢(獲取結果,如果有的話),也可以取消任務執(zhí)行,當然也能夠檢測任務是否被取消或者是否執(zhí)行完畢。
?
?
要配置一個線程池是比較復雜的,尤其是對于線程池的原理不是很清楚的情況下,很有可能配置的線程池不是較優(yōu)的,因此在
Executors類里面提供了一些靜態(tài)工廠,生成一些常用的線程池。
- newSingleThreadExecutor:創(chuàng)建一個單線程的線程池。這個線程池只有一個線程在工作,也就是相當于單線程串行執(zhí)行所有任務。如果這個唯一的線程因為異常結束,那么會有一個新的線程來替代它。此線程池保證所有任務的執(zhí)行順序按照任務的提交順序執(zhí)行。
- newFixedThreadPool:創(chuàng)建固定大小的線程池。每次提交一個任務就創(chuàng)建一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執(zhí)行異常而結束,那么線程池會補充一個新線程。
- newCachedThreadPool:創(chuàng)建一個可緩存的線程池。如果線程池的大小超過了處理任務所需要的線程,那么就會回收部分空閑(60秒不執(zhí)行任務)的線程,當任務數(shù)增加時,此線程池又可以智能的添加新線程來處理任務。此線程池不會對線程池大小做限制,線程池大小完全依賴于操作系統(tǒng)(或者說JVM)能夠創(chuàng)建的最大線程大小。
- newScheduledThreadPool:創(chuàng)建一個大小無限的線程池。此線程池支持定時以及周期性執(zhí)行任務的需求。
- newSingleThreadScheduledExecutor:創(chuàng)建一個單線程的線程池。此線程池支持定時以及周期性執(zhí)行任務的需求。
?
三、線程池Executor的數(shù)據(jù)結構
?
?
由于已經(jīng)看到了ThreadPoolExecutor的源碼,因此很容易就看到了ThreadPoolExecutor線程池的數(shù)據(jù)結構。下圖3描述了這種數(shù)據(jù)結構。
圖3 ThreadPoolExecutor 數(shù)據(jù)結構
其實,即使沒有上述圖形描述ThreadPoolExecutor的數(shù)據(jù)結構,我們根據(jù)線程池的要求也很能夠猜測出其數(shù)據(jù)結構出來。
- 線程池需要支持多個線程并發(fā)執(zhí)行,因此有一個線程集合Collection<Thread>來執(zhí)行線程任務;
- 涉及任務的異步執(zhí)行,因此需要有一個集合來緩存任務隊列Collection<Runnable>;
- 很顯然在多個線程之間協(xié)調多個任務,那么就需要一個線程安全的任務集合,同時還需要支持阻塞、超時操作,那么BlockingQueue是必不可少的;
- 既然是線程池,出發(fā)點就是提高系統(tǒng)性能同時降低資源消耗,那么線程池的大小就有限制,因此需要有一個核心線程池大小(線程個數(shù))和一個最大線程池大小(線程個數(shù)),有一個計數(shù)用來描述當前線程池大小;
- 如果是有限的線程池大小,那么長時間不使用的線程資源就應該銷毀掉,這樣就需要一個線程空閑時間的計數(shù)來描述線程何時被銷毀;
- 前面描述過線程池也是有生命周期的,因此需要有一個狀態(tài)來描述線程池當前的運行狀態(tài);
- 線程池的任務隊列如果有邊界,那么就需要有一個任務拒絕策略來處理過多的任務,同時在線程池的銷毀階段也需要有一個任務拒絕策略來處理新加入的任務;
- 上面種的線程池大小、線程空閑實際那、線程池運行狀態(tài)等等狀態(tài)改變都不是線程安全的,因此需要有一個全局的鎖(mainLock)來協(xié)調這些競爭資源;
- 除了以上數(shù)據(jù)結構以外,ThreadPoolExecutor還有一些狀態(tài)用來描述線程池的運行計數(shù),例如線程池運行的任務數(shù)、曾經(jīng)達到的最大線程數(shù),主要用于調試和性能分析。
?
四、線程池Executor生命周期
?
?
線程池Executor是異步的執(zhí)行任務,因此任何時刻不能夠直接獲取提交的任務的狀態(tài)。這些任務有可能已經(jīng)完成,也有可能正在執(zhí)行或者還在排隊等待執(zhí)行。因此關閉線程池可能出現(xiàn)一下幾種情況:
- 平緩關閉:已經(jīng)啟動的任務全部執(zhí)行完畢,同時不再接受新的任務
- 立即關閉:取消所有正在執(zhí)行和未執(zhí)行的任務
另外關閉線程池后對于任務的狀態(tài)應該有相應的反饋信息。
?
圖4 描述了線程池的4種狀態(tài)。
- 線程池在構造前(new操作)是初始狀態(tài),一旦構造完成線程池就進入了執(zhí)行狀態(tài)RUNNING。嚴格意義上講線程池構造完成后并沒有線程被立即啟動,只有進行“預啟動”或者接收到任務的時候才會啟動線程。這個會后面線程池的原理會詳細分析。但是線程池是出于運行狀態(tài),隨時準備接受任務來執(zhí)行。
- 線程池運行中可以通過shutdown()和shutdownNow()來改變運行狀態(tài)。shutdown()是一個平緩的關閉過程,線程池停止接受新的任務,同時等待已經(jīng)提交的任務執(zhí)行完畢,包括那些進入隊列還沒有開始的任務,這時候線程池處于SHUTDOWN狀態(tài);shutdownNow()是一個立即關閉過程,線程池停止接受新的任務,同時線程池取消所有執(zhí)行的任務和已經(jīng)進入隊列但是還沒有執(zhí)行的任務,這時候線程池處于STOP狀態(tài)。
- 一旦shutdown()或者shutdownNow()執(zhí)行完畢,線程池就進入TERMINATED狀態(tài),此時線程池就結束了。
- isTerminating()描述的是SHUTDOWN和STOP兩種狀態(tài)。
- isShutdown()描述的是非RUNNING狀態(tài),也就是SHUTDOWN/STOP/TERMINATED三種狀態(tài)。
?
圖4
線程池的API如下:
圖5
其中shutdownNow()會返回那些已經(jīng)進入了隊列但是還沒有執(zhí)行的任務列表。awaitTermination描述的是等待線程池關閉的時間,如果等待時間線程池還沒有關閉將會拋出一個超時異常。
對于關閉線程池期間發(fā)生的任務提交情況就會觸發(fā)一個拒絕執(zhí)行的操作。這是java.util.concurrent.RejectedExecutionHandler描述的任務操作。下一個小結中將描述這些任務被拒絕后的操作。
?
總結下這個小節(jié):
線程池有運行、關閉、停止、結束四種狀態(tài),結束后就會釋放所有資源
平緩關閉線程池使用shutdown()
立即關閉線程池使用shutdownNow(),同時得到未執(zhí)行的任務列表
檢測線程池是否正處于關閉中,使用isShutdown()
檢測線程池是否已經(jīng)關閉使用isTerminated()
定時或者永久等待線程池關閉結束使用awaitTermination()操作
?
五、線程池Executor任務拒絕策略
?
緊接上面,對于關閉線程池期間發(fā)生的任務提交情況就會觸發(fā)一個拒絕執(zhí)行的操作。這是java.util.concurrent.RejectedExecutionHandler描述的任務操作。
先來分析下為什么有任務拒絕的情況發(fā)生。
這里先假設一個前提:線程池有一個任務隊列,用于緩存所有待處理的任務,正在處理的任務將從任務隊列中移除。因此在任務隊列長度有限的情況下就會出現(xiàn)新任務的拒絕處理問題,需要有一種策略來處理應該加入任務隊列卻因為隊列已滿無法加入的情況。另外在線程池關閉的時候也需要對任務加入隊列操作進行額外的協(xié)調處理。
?
RejectedExecutionHandler提供了四種方式來處理任務拒絕策略。
這四種策略是獨立無關的,是對任務拒絕處理的四種表現(xiàn)形式。
最簡單的方式就是直接丟棄任務。但是卻有兩種方式,到底是該丟棄哪一個任務,比如可以丟棄當前將要加入隊列的任務本身(DiscardPolicy)或者丟棄任務隊列中最舊任務(DiscardOldestPolicy)。丟棄最舊任務也不是簡單的丟棄最舊的任務,而是有一些額外的處理。除了丟棄任務還可以直接拋出一個異常(RejectedExecutionException),這是比較簡單的方式。拋出異常的方式(AbortPolicy)盡管實現(xiàn)方式比較簡單,但是由于拋出一個RuntimeException,因此會中斷調用者的處理過程。除了拋出異常以外還可以不進入線程池執(zhí)行,在這種方式(CallerRunsPolicy)中任務將有調用者線程去執(zhí)行。
?
上面是一些理論知識,下面結合一些例子進行分析討論。
?
package xylz.study.concurrency;
import java.lang.reflect.Field;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.ThreadPoolExecutor.DiscardPolicy;
public class ExecutorServiceDemo {
static void log(String msg) {
System.out.println(System.currentTimeMillis() + " -> " + msg);
}
static int getThreadPoolRunState(ThreadPoolExecutor pool) throws Exception {
Field f = ThreadPoolExecutor.class.getDeclaredField("runState");
f.setAccessible(true);
int v = f.getInt(pool);
return v;
}
public static void main(String[] args) throws Exception {
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(1));
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
for (int i = 0; i < 10; i++) {
final int index = i;
pool.submit(new Runnable() {
public void run() {
log("run task:" + index + " -> " + Thread.currentThread().getName());
try {
Thread.sleep(1000L);
} catch (Exception e) {
e.printStackTrace();
}
log("run over:" + index + " -> " + Thread.currentThread().getName());
}
});
}
log("before sleep");
Thread.sleep(4000L);
log("before shutdown()");
pool.shutdown();
log("after shutdown(),pool.isTerminated=" + pool.isTerminated());
pool.awaitTermination(1000L, TimeUnit.SECONDS);
log("now,pool.isTerminated=" + pool.isTerminated() + ", state="
+ getThreadPoolRunState(pool));
}
}
?
?
?
第一種方式直接丟棄(DiscardPolicy)的輸出結果是:
?
1294494050696 -> run task:0
1294494050696 -> before sleep
1294494051697 -> run over:0 -> pool-1-thread-1
1294494051697 -> run task:1
1294494052697 -> run over:1 -> pool-1-thread-1
1294494054697 -> before shutdown()
1294494054697 -> after shutdown(),pool.isTerminated=false
1294494054698 -> now,pool.isTerminated=true, state=3
?
?
對于上面的結果需要補充幾點。
如果把策略換成丟棄最舊任務(DiscardOldestPolicy),結果會稍有不同。
?
1294494484622 -> run task:0
1294494484622 -> before sleep
1294494485622 -> run over:0 -> pool-1-thread-1
1294494485622 -> run task:9
1294494486622 -> run over:9 -> pool-1-thread-1
1294494488622 -> before shutdown()
1294494488622 -> after shutdown(),pool.isTerminated=false
1294494488623 -> now,pool.isTerminated=true, state=3
?
?
這里依然只是執(zhí)行兩個任務,但是換成了任務task0和task9。實際上task1~task8還是進入了任務隊列,只不過被task9擠出去了。
對于異常策略(AbortPolicy)就比較簡單,這回調用線程的任務執(zhí)行。
對于調用線程執(zhí)行方式(CallerRunsPolicy),輸出的結果就有意思了。
?
1294496076266 -> run task:2 -> main
1294496076266 -> run task:0 -> pool-1-thread-1
1294496077266 -> run over:0 -> pool-1-thread-1
1294496077266 -> run task:1 -> pool-1-thread-1
1294496077266 -> run over:2 -> main
1294496077266 -> run task:4 -> main
1294496078267 -> run over:4 -> main
1294496078267 -> run task:5 -> main
1294496078267 -> run over:1 -> pool-1-thread-1
1294496078267 -> run task:3 -> pool-1-thread-1
1294496079267 -> run over:3 -> pool-1-thread-1
1294496079267 -> run over:5 -> main
1294496079267 -> run task:7 -> main
1294496079267 -> run task:6 -> pool-1-thread-1
1294496080267 -> run over:7 -> main
1294496080267 -> run task:9 -> main
1294496080267 -> run over:6 -> pool-1-thread-1
1294496080267 -> run task:8 -> pool-1-thread-1
1294496081268 -> run over:9 -> main
1294496081268 -> before sleep
1294496081268 -> run over:8 -> pool-1-thread-1
1294496085268 -> before shutdown()
1294496085268 -> after shutdown(),pool.isTerminated=false
1294496085269 -> now,pool.isTerminated=true, state=3
?
?
參考內容:
深入淺出 Java Concurrency (28): 線程池 part 1 簡介
http://www.blogjava.net/xylz/archive/2010/12/19/341098.html
深入淺出 Java Concurrency (29): 線程池 part 2 Executor 以及Executors
http://www.blogjava.net/xylz/archive/2010/12/21/341281.html
深入淺出 Java Concurrency (30): 線程池 part 3 Executor 生命周期
http://www.blogjava.net/xylz/archive/2011/01/04/342316.html
深入淺出 Java Concurrency (31): 線程池 part 4 線程池任務拒絕策略
http://www.blogjava.net/xylz/archive/2011/01/08/342609.html
java的concurrent用法詳解
http://blog.csdn.net/a511596982/article/details/8063742
總結
以上是生活随笔為你收集整理的Java多线程(十一)之线程池深入分析(上)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java多线程(二)之Atomic:原子
- 下一篇: Java多线程(十二)之线程池深入分析(