Java基础——深入理解Java线程池
簡(jiǎn)介
? ? ? ?我們使用線程的時(shí)候就去創(chuàng)建一個(gè)線程,這樣實(shí)現(xiàn)起來(lái)非常簡(jiǎn)便,但是就會(huì)有一個(gè)問(wèn)題:
? ? ? ?如果并發(fā)的線程數(shù)量很多,并且每個(gè)線程都是執(zhí)行一個(gè)時(shí)間很短的任務(wù)就結(jié)束了,這樣頻繁創(chuàng)建線程就會(huì)大大降低系統(tǒng)的效率,因?yàn)轭l繁創(chuàng)建線程和銷(xiāo)毀線程需要時(shí)間。
? ? ? ?那么有沒(méi)有一種辦法使得線程可以復(fù)用,就是執(zhí)行完一個(gè)任務(wù),并不被銷(xiāo)毀,而是可以繼續(xù)執(zhí)行其他的任務(wù)?
? ? ? ?在Java中可以通過(guò)線程池來(lái)達(dá)到這樣的效果。今天我們就來(lái)詳細(xì)講解一下Java的線程池,首先我們從最核心的ThreadPoolExecutor類(lèi)中的方法講起,然后再講述它的實(shí)現(xiàn)原理,接著給出了它的使用示例,最后討論了一下如何合理配置線程池的大小。
以下是本文的目錄大綱:
一、Java中的ThreadPoolExecutor類(lèi)
二、深入剖析線程池實(shí)現(xiàn)原理
三、使用示例
四、為什么使用線程池
五、使用線程池的風(fēng)險(xiǎn)
六、有效使用線程池的準(zhǔn)則
七、線程池的大小設(shè)置
一、Java中的ThreadPoolExecutor類(lèi)
? ? ? ?java.uitl.concurrent.ThreadPoolExecutor類(lèi)是線程池中最核心的一個(gè)類(lèi),因此如果要透徹地了解Java中的線程池,必須先了解這個(gè)類(lèi)。下面我們來(lái)看一下ThreadPoolExecutor類(lèi)的具體實(shí)現(xiàn)源碼。
在ThreadPoolExecutor類(lèi)中提供了四個(gè)構(gòu)造方法:
public class ThreadPoolExecutor extends AbstractExecutorService {.....public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue);public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);... }? ? ? ?從上面的代碼可以得知,ThreadPoolExecutor繼承了AbstractExecutorService類(lèi),并提供了四個(gè)構(gòu)造器,事實(shí)上,通過(guò)觀察每個(gè)構(gòu)造器的源碼具體實(shí)現(xiàn),發(fā)現(xiàn)前面三個(gè)構(gòu)造器都是調(diào)用的第四個(gè)構(gòu)造器進(jìn)行的初始化工作。
?下面解釋下一下構(gòu)造器中各個(gè)參數(shù)的含義:
- corePoolSize:核心池的大小,這個(gè)參數(shù)跟后面講述的線程池的實(shí)現(xiàn)原理有非常大的關(guān)系。在創(chuàng)建了線程池后,默認(rèn)情況下,線程池中并沒(méi)有任何線程,而是等待有任務(wù)到來(lái)才創(chuàng)建線程去執(zhí)行任務(wù),除非調(diào)用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個(gè)方法的名字就可以看出,是預(yù)創(chuàng)建線程的意思,即在沒(méi)有任務(wù)到來(lái)之前就創(chuàng)建corePoolSize個(gè)線程或者一個(gè)線程。默認(rèn)情況下,在創(chuàng)建了線程池后,線程池中的線程數(shù)為0,當(dāng)有任務(wù)來(lái)之后,就會(huì)創(chuàng)建一個(gè)線程去執(zhí)行任務(wù),當(dāng)線程池中的線程數(shù)目達(dá)到corePoolSize后,就會(huì)把到達(dá)的任務(wù)放到緩存隊(duì)列當(dāng)中;
- maximumPoolSize:線程池最大線程數(shù),這個(gè)參數(shù)也是一個(gè)非常重要的參數(shù),它表示在線程池中最多能創(chuàng)建多少個(gè)線程;
- keepAliveTime:表示線程沒(méi)有任務(wù)執(zhí)行時(shí)最多保持多久時(shí)間會(huì)終止。默認(rèn)情況下,只有當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí),keepAliveTime才會(huì)起作用,直到線程池中的線程數(shù)不大于corePoolSize,即當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí),如果一個(gè)線程空閑的時(shí)間達(dá)到keepAliveTime,則會(huì)終止,直到線程池中的線程數(shù)不超過(guò)corePoolSize。但是如果調(diào)用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數(shù)不大于corePoolSize時(shí),keepAliveTime參數(shù)也會(huì)起作用,直到線程池中的線程數(shù)為0;
- unit:參數(shù)keepAliveTime的時(shí)間單位,有7種取值,在TimeUnit類(lèi)中有7種靜態(tài)屬性:
- workQueue:一個(gè)阻塞隊(duì)列,用來(lái)存儲(chǔ)等待執(zhí)行的任務(wù),這個(gè)參數(shù)的選擇也很重要,會(huì)對(duì)線程池的運(yùn)行過(guò)程產(chǎn)生重大影響,一般來(lái)說(shuō),這里的阻塞隊(duì)列有以下幾種選擇:
? ? ? ?ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。線程池的排隊(duì)策略與BlockingQueue有關(guān)。
- threadFactory:線程工廠,主要用來(lái)創(chuàng)建線程;
- handler:表示當(dāng)拒絕處理任務(wù)時(shí)的策略,有以下四種取值:
? ? ? ?具體參數(shù)的配置與線程池的關(guān)系將在下一節(jié)講述。
? ? ? ?從上面給出的ThreadPoolExecutor類(lèi)的代碼可以知道,ThreadPoolExecutor繼承了AbstractExecutorService,我們來(lái)看一下AbstractExecutorService的實(shí)現(xiàn):
public abstract class AbstractExecutorService implements ExecutorService {protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };public Future<?> submit(Runnable task) {};public <T> Future<T> submit(Runnable task, T result) { };public <T> Future<T> submit(Callable<T> task) { };private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos)throws InterruptedException, ExecutionException, TimeoutException {};public <T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException {};public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {};public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException {};public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException {}; }? ? ? ?AbstractExecutorService是一個(gè)抽象類(lèi),它實(shí)現(xiàn)了ExecutorService接口。
? ? ? ?我們接著看ExecutorService接口的實(shí)現(xiàn):
public interface ExecutorService extends Executor {void shutdown();boolean isShutdown();boolean isTerminated();boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException;<T> Future<T> submit(Callable<T> task);<T> Future<T> submit(Runnable task, T result);Future<?> submit(Runnable task);<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException;<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException;<T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException; }? ? ? ?而ExecutorService又是繼承了Executor接口,我們看一下Executor接口的實(shí)現(xiàn):public interface Executor {void execute(Runnable command); }
? ? ? ?到這里,大家應(yīng)該明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor幾個(gè)之間的關(guān)系了。
- Executor是一個(gè)頂層接口,在它里面只聲明了一個(gè)方法execute(Runnable),返回值為void,參數(shù)為Runnable類(lèi)型,從字面意思可以理解,就是用來(lái)執(zhí)行傳進(jìn)去的任務(wù)的;
- 然后ExecutorService接口繼承了Executor接口,并聲明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
- 抽象類(lèi)AbstractExecutorService實(shí)現(xiàn)了ExecutorService接口,基本實(shí)現(xiàn)了ExecutorService中聲明的所有方法;
- 然后ThreadPoolExecutor繼承了類(lèi)AbstractExecutorService。
? ? ? ?在ThreadPoolExecutor類(lèi)中有幾個(gè)非常重要的方法:
execute() submit() shutdown() shutdownNow()? ? ? ?execute()方法實(shí)際上是Executor中聲明的方法,在ThreadPoolExecutor進(jìn)行了具體的實(shí)現(xiàn),這個(gè)方法是ThreadPoolExecutor的核心方法,通過(guò)這個(gè)方法可以向線程池提交一個(gè)任務(wù),交由線程池去執(zhí)行。
? ? ? ?submit()方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經(jīng)有了具體的實(shí)現(xiàn),在ThreadPoolExecutor中并沒(méi)有對(duì)其進(jìn)行重寫(xiě),這個(gè)方法也是用來(lái)向線程池提交任務(wù)的,但是它和execute()方法不同,它能夠返回任務(wù)執(zhí)行的結(jié)果,去看submit()方法的實(shí)現(xiàn),會(huì)發(fā)現(xiàn)它實(shí)際上還是調(diào)用的execute()方法,只不過(guò)它利用了Future來(lái)獲取任務(wù)執(zhí)行結(jié)果(Future相關(guān)內(nèi)容將在下一篇講述)。
? ? ? ?shutdown()和shutdownNow()是用來(lái)關(guān)閉線程池的。
? ? ? ?還有很多其他的方法:
? ? ? ?比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等獲取與線程池相關(guān)屬性的方法,有興趣的朋友可以自行查閱API。
二、深入剖析線程池實(shí)現(xiàn)原理
在上一節(jié)我們從宏觀上介紹了ThreadPoolExecutor,下面我們來(lái)深入解析一下線程池的具體實(shí)現(xiàn)原理,將從下面幾個(gè)方面講解:
1、線程池狀態(tài)
2、任務(wù)的執(zhí)行
3、線程池中的線程初始化
4、任務(wù)緩存隊(duì)列及排隊(duì)策略
5、任務(wù)拒絕策略
6、線程池的關(guān)閉
7、線程池容量的動(dòng)態(tài)調(diào)整
1、線程池狀態(tài)
在ThreadPoolExecutor中定義了一個(gè)volatile變量,另外定義了幾個(gè)static final變量表示線程池的各個(gè)狀態(tài):
volatile int runState; static final int RUNNING = 0; static final int SHUTDOWN = 1; static final int STOP = 2; static final int TERMINATED = 3;
? ? ? ?runState:表示當(dāng)前線程池的狀態(tài),它是一個(gè)volatile變量用來(lái)保證線程之間的可見(jiàn)性;
下面的幾個(gè)static final變量表示runState可能的幾個(gè)取值:
? ? ? ?RUNNING:當(dāng)創(chuàng)建線程池后,初始時(shí),線程池處于RUNNING狀態(tài);
? ? ? ?SHUTDOWN:如果調(diào)用了shutdown()方法,則線程池處于SHUTDOWN狀態(tài),此時(shí)線程池不能夠接受新的任務(wù),它會(huì)等待所有任務(wù)執(zhí)行完畢;
? ? ? ?STOP:如果調(diào)用了shutdownNow()方法,則線程池處于STOP狀態(tài),此時(shí)線程池不能接受新的任務(wù),并且會(huì)去嘗試終止正在執(zhí)行的任務(wù);
? ? ? ?TERMINATED:當(dāng)線程池處于SHUTDOWN或STOP狀態(tài),并且所有工作線程已經(jīng)銷(xiāo)毀,任務(wù)緩存隊(duì)列已經(jīng)清空或執(zhí)行結(jié)束后,線程池被設(shè)置為T(mén)ERMINATED狀態(tài)。
2、任務(wù)的執(zhí)行
? ? ? ?在了解將任務(wù)提交給線程池到任務(wù)執(zhí)行完畢整個(gè)過(guò)程之前,我們先來(lái)看一下ThreadPoolExecutor類(lèi)中其他的一些比較重要成員變量:
private final BlockingQueue<Runnable> workQueue; //任務(wù)緩存隊(duì)列,用來(lái)存放等待執(zhí)行的任務(wù) private final ReentrantLock mainLock = new ReentrantLock(); //線程池的主要狀態(tài)鎖,對(duì)線程池狀態(tài)(比如線程池大小//、runState等)的改變都要使用這個(gè)鎖 private final HashSet<Worker> workers = new HashSet<Worker>(); //用來(lái)存放工作集private volatile long keepAliveTime; //線程存活時(shí)間 private volatile boolean allowCoreThreadTimeOut; //是否允許為核心線程設(shè)置存活時(shí)間 private volatile int corePoolSize; //核心池的大小(即線程池中的線程數(shù)目大于這個(gè)參數(shù)時(shí),提交的任務(wù)會(huì)被放進(jìn)任務(wù)緩存隊(duì)列) private volatile int maximumPoolSize; //線程池最大能容忍的線程數(shù)private volatile int poolSize; //線程池中當(dāng)前的線程數(shù)private volatile RejectedExecutionHandler handler; //任務(wù)拒絕策略private volatile ThreadFactory threadFactory; //線程工廠,用來(lái)創(chuàng)建線程private int largestPoolSize; //用來(lái)記錄線程池中曾經(jīng)出現(xiàn)過(guò)的最大線程數(shù)private long completedTaskCount; //用來(lái)記錄已經(jīng)執(zhí)行完畢的任務(wù)個(gè)數(shù)
? ? ? ?每個(gè)變量的作用都已經(jīng)標(biāo)明出來(lái)了,這里要重點(diǎn)解釋一下corePoolSize、maximumPoolSize、largestPoolSize三個(gè)變量。
? ? ? ?corePoolSize在很多地方被翻譯成核心池大小,其實(shí)我的理解這個(gè)就是線程池的大小。舉個(gè)簡(jiǎn)單的例子:
? ? ? ?假如有一個(gè)工廠,工廠里面有10個(gè)工人,每個(gè)工人同時(shí)只能做一件任務(wù)。
? ? ? ?因此只要當(dāng)10個(gè)工人中有工人是空閑的,來(lái)了任務(wù)就分配給空閑的工人做;
? ? ? ?當(dāng)10個(gè)工人都有任務(wù)在做時(shí),如果還來(lái)了任務(wù),就把任務(wù)進(jìn)行排隊(duì)等待;
? ? ? ?如果說(shuō)新任務(wù)數(shù)目增長(zhǎng)的速度遠(yuǎn)遠(yuǎn)大于工人做任務(wù)的速度,那么此時(shí)工廠主管可能會(huì)想補(bǔ)救措施,比如重新招4個(gè)臨時(shí)工人進(jìn)來(lái);
? ? ? ?然后就將任務(wù)也分配給這4個(gè)臨時(shí)工人做;
? ? ? ?如果說(shuō)著14個(gè)工人做任務(wù)的速度還是不夠,此時(shí)工廠主管可能就要考慮不再接收新的任務(wù)或者拋棄前面的一些任務(wù)了。
? ? ? ?當(dāng)這14個(gè)工人當(dāng)中有人空閑時(shí),而新任務(wù)增長(zhǎng)的速度又比較緩慢,工廠主管可能就考慮辭掉4個(gè)臨時(shí)工了,只保持原來(lái)的10個(gè)工人,畢竟請(qǐng)額外的工人是要花錢(qián)的。
? ? ? ?這個(gè)例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。
? ? ? ?也就是說(shuō)corePoolSize就是線程池大小,maximumPoolSize在我看來(lái)是線程池的一種補(bǔ)救措施,即任務(wù)量突然過(guò)大時(shí)的一種補(bǔ)救措施。
? ? ? ?不過(guò)為了方便理解,在本文后面還是將corePoolSize翻譯成核心池大小。
? ? ? ?largestPoolSize只是一個(gè)用來(lái)起記錄作用的變量,用來(lái)記錄線程池中曾經(jīng)有過(guò)的最大線程數(shù)目,跟線程池的容量沒(méi)有任何關(guān)系。
? ? ? ?下面我們進(jìn)入正題,看一下任務(wù)從提交到最終執(zhí)行完畢經(jīng)歷了哪些過(guò)程。
? ? ? ?在ThreadPoolExecutor類(lèi)中,最核心的任務(wù)提交方法是execute()方法,雖然通過(guò)submit也可以提交任務(wù),但是實(shí)際上submit方法里面最終調(diào)用的還是execute()方法,所以我們只需要研究execute()方法的實(shí)現(xiàn)原理即可:
public void execute(Runnable command) {if (command == null)throw new NullPointerException();if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {if (runState == RUNNING && workQueue.offer(command)) {if (runState != RUNNING || poolSize == 0)ensureQueuedTaskHandled(command);}else if (!addIfUnderMaximumPoolSize(command))reject(command); // is shutdown or saturated} }? ? ? ?上面的代碼可能看起來(lái)不是那么容易理解,下面我們一句一句解釋:
? ? ? ?首先,判斷提交的任務(wù)command是否為null,若是null,則拋出空指針異常;
? ? ? ?接著是這句,這句要好好理解一下:
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))? ? ? ?由于是或條件運(yùn)算符,所以先計(jì)算前半部分的值,如果線程池中當(dāng)前線程數(shù)不小于核心池大小,那么就會(huì)直接進(jìn)入下面的if語(yǔ)句塊了。
? ? ? ?如果線程池中當(dāng)前線程數(shù)小于核心池大小,則接著執(zhí)行后半部分,也就是執(zhí)行
addIfUnderCorePoolSize(command)? ? ? ?如果執(zhí)行完addIfUnderCorePoolSize這個(gè)方法返回false,則繼續(xù)執(zhí)行下面的if語(yǔ)句塊,否則整個(gè)方法就直接執(zhí)行完畢了。
? ? ? ?如果執(zhí)行完addIfUnderCorePoolSize這個(gè)方法返回false,然后接著判斷:
if (runState == RUNNING && workQueue.offer(command))? ? ? ?如果當(dāng)前線程池處于RUNNING狀態(tài),則將任務(wù)放入任務(wù)緩存隊(duì)列;如果當(dāng)前線程池不處于RUNNING狀態(tài)或者任務(wù)放入緩存隊(duì)列失敗,則執(zhí)行:addIfUnderMaximumPoolSize(command)
? ? ? ?如果執(zhí)行addIfUnderMaximumPoolSize方法失敗,則執(zhí)行reject()方法進(jìn)行任務(wù)拒絕處理。
? ? ? ?回到前面:
if (runState == RUNNING && workQueue.offer(command))? ? ? ?這句的執(zhí)行,如果說(shuō)當(dāng)前線程池處于RUNNING狀態(tài)且將任務(wù)放入任務(wù)緩存隊(duì)列成功,則繼續(xù)進(jìn)行判斷:if (runState != RUNNING || poolSize == 0)? ? ? ?這句判斷是為了防止在將此任務(wù)添加進(jìn)任務(wù)緩存隊(duì)列的同時(shí)其他線程突然調(diào)用shutdown或者shutdownNow方法關(guān)閉了線程池的一種應(yīng)急措施。如果是這樣就執(zhí)行:
ensureQueuedTaskHandled(command)
? ? ? ?進(jìn)行應(yīng)急處理,從名字可以看出是保證 添加到任務(wù)緩存隊(duì)列中的任務(wù)得到處理。
? ? ? ?我們接著看2個(gè)關(guān)鍵方法的實(shí)現(xiàn):addIfUnderCorePoolSize和addIfUnderMaximumPoolSize:
private boolean addIfUnderCorePoolSize(Runnable firstTask) {Thread t = null;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (poolSize < corePoolSize && runState == RUNNING)t = addThread(firstTask); //創(chuàng)建線程去執(zhí)行firstTask任務(wù) } finally {mainLock.unlock();}if (t == null)return false;t.start();return true; }? ? ? ?這個(gè)是addIfUnderCorePoolSize方法的具體實(shí)現(xiàn),從名字可以看出它的意圖就是當(dāng)?shù)陀诤诵某卮笮r(shí)執(zhí)行的方法。下面看其具體實(shí)現(xiàn),首先獲取到鎖,因?yàn)檫@地方涉及到線程池狀態(tài)的變化,先通過(guò)if語(yǔ)句判斷當(dāng)前線程池中的線程數(shù)目是否小于核心池大小,有朋友也許會(huì)有疑問(wèn):前面在execute()方法中不是已經(jīng)判斷過(guò)了嗎,只有線程池當(dāng)前線程數(shù)目小于核心池大小才會(huì)執(zhí)行addIfUnderCorePoolSize方法的,為何這地方還要繼續(xù)判斷?原因很簡(jiǎn)單,前面的判斷過(guò)程中并沒(méi)有加鎖,因此可能在execute方法判斷的時(shí)候poolSize小于corePoolSize,而判斷完之后,在其他線程中又向線程池提交了任務(wù),就可能導(dǎo)致poolSize不小于corePoolSize了,所以需要在這個(gè)地方繼續(xù)判斷。然后接著判斷線程池的狀態(tài)是否為RUNNING,原因也很簡(jiǎn)單,因?yàn)橛锌赡茉谄渌€程中調(diào)用了shutdown或者shutdownNow方法。然后就是執(zhí)行t = addThread(firstTask);
? ? ? ?這個(gè)方法也非常關(guān)鍵,傳進(jìn)去的參數(shù)為提交的任務(wù),返回值為T(mén)hread類(lèi)型。然后接著在下面判斷t是否為空,為空則表明創(chuàng)建線程失敗(即poolSize>=corePoolSize或者runState不等于RUNNING),否則調(diào)用t.start()方法啟動(dòng)線程。
? ? ? ?我們來(lái)看一下addThread方法的實(shí)現(xiàn):
private Thread addThread(Runnable firstTask) {Worker w = new Worker(firstTask);Thread t = threadFactory.newThread(w); //創(chuàng)建一個(gè)線程,執(zhí)行任務(wù) if (t != null) {w.thread = t; //將創(chuàng)建的線程的引用賦值為w的成員變量 workers.add(w);int nt = ++poolSize; //當(dāng)前線程數(shù)加1 if (nt > largestPoolSize)largestPoolSize = nt;}return t; }? ? ? ?在addThread方法中,首先用提交的任務(wù)創(chuàng)建了一個(gè)Worker對(duì)象,然后調(diào)用線程工廠threadFactory創(chuàng)建了一個(gè)新的線程t,然后將線程t的引用賦值給了Worker對(duì)象的成員變量thread,接著通過(guò)workers.add(w)將Worker對(duì)象添加到工作集當(dāng)中。
? ? ? ?下面我們看一下Worker類(lèi)的實(shí)現(xiàn):
private final class Worker implements Runnable {private final ReentrantLock runLock = new ReentrantLock();private Runnable firstTask;volatile long completedTasks;Thread thread;Worker(Runnable firstTask) {this.firstTask = firstTask;}boolean isActive() {return runLock.isLocked();}void interruptIfIdle() {final ReentrantLock runLock = this.runLock;if (runLock.tryLock()) {try {if (thread != Thread.currentThread())thread.interrupt();} finally {runLock.unlock();}}}void interruptNow() {thread.interrupt();}private void runTask(Runnable task) {final ReentrantLock runLock = this.runLock;runLock.lock();try {if (runState < STOP &&Thread.interrupted() &&runState >= STOP)boolean ran = false;beforeExecute(thread, task); //beforeExecute方法是ThreadPoolExecutor類(lèi)的一個(gè)方法,沒(méi)有具體實(shí)現(xiàn),用戶(hù)可以根據(jù)//自己需要重載這個(gè)方法和后面的afterExecute方法來(lái)進(jìn)行一些統(tǒng)計(jì)信息,比如某個(gè)任務(wù)的執(zhí)行時(shí)間等 try {task.run();ran = true;afterExecute(task, null);++completedTasks;} catch (RuntimeException ex) {if (!ran)afterExecute(task, ex);throw ex;}} finally {runLock.unlock();}}public void run() {try {Runnable task = firstTask;firstTask = null;while (task != null || (task = getTask()) != null) {runTask(task);task = null;}} finally {workerDone(this); //當(dāng)任務(wù)隊(duì)列中沒(méi)有任務(wù)時(shí),進(jìn)行清理工作 }} }? ? ? ?它實(shí)際上實(shí)現(xiàn)了Runnable接口,因此上面的Thread t = threadFactory.newThread(w);效果跟下面這句的效果基本一樣:Thread t = new Thread(w);
? ? ? ?相當(dāng)于傳進(jìn)去了一個(gè)Runnable任務(wù),在線程t中執(zhí)行這個(gè)Runnable。
? ? ? ?既然Worker實(shí)現(xiàn)了Runnable接口,那么自然最核心的方法便是run()方法了:
public void run() {try {Runnable task = firstTask;firstTask = null;while (task != null || (task = getTask()) != null) {runTask(task);task = null;}} finally {workerDone(this);} }? ? ? ?從run方法的實(shí)現(xiàn)可以看出,它首先執(zhí)行的是通過(guò)構(gòu)造器傳進(jìn)來(lái)的任務(wù)firstTask,在調(diào)用runTask()執(zhí)行完firstTask之后,在while循環(huán)里面不斷通過(guò)getTask()去取新的任務(wù)來(lái)執(zhí)行,那么去哪里取呢?自然是從任務(wù)緩存隊(duì)列里面去取,getTask是ThreadPoolExecutor類(lèi)中的方法,并不是Worker類(lèi)中的方法,下面是getTask方法的實(shí)現(xiàn):Runnable getTask() {for (;;) {try {int state = runState;if (state > SHUTDOWN)return null;Runnable r;if (state == SHUTDOWN) // Help drain queuer = workQueue.poll();else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //如果線程數(shù)大于核心池大小或者允許為核心池線程設(shè)置空閑時(shí)間,//則通過(guò)poll取任務(wù),若等待一定的時(shí)間取不到任務(wù),則返回nullr = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);elser = workQueue.take();if (r != null)return r;if (workerCanExit()) { //如果沒(méi)取到任務(wù),即r為null,則判斷當(dāng)前的worker是否可以退出if (runState >= SHUTDOWN) // Wake up othersinterruptIdleWorkers(); //中斷處于空閑狀態(tài)的workerreturn null;}// Else retry} catch (InterruptedException ie) {// On interruption, re-check runState}} }
? ? ? ?在getTask中,先判斷當(dāng)前線程池狀態(tài),如果runState大于SHUTDOWN(即為STOP或者TERMINATED),則直接返回null。
? ? ? ?如果runState為SHUTDOWN或者RUNNING,則從任務(wù)緩存隊(duì)列取任務(wù)。
? ? ? ?如果當(dāng)前線程池的線程數(shù)大于核心池大小corePoolSize或者允許為核心池中的線程設(shè)置空閑存活時(shí)間,則調(diào)用poll(time,timeUnit)來(lái)取任務(wù),這個(gè)方法會(huì)等待一定的時(shí)間,如果取不到任務(wù)就返回null。
? ? ? ?然后判斷取到的任務(wù)r是否為null,為null則通過(guò)調(diào)用workerCanExit()方法來(lái)判斷當(dāng)前worker是否可以退出,我們看一下workerCanExit()的實(shí)現(xiàn):
private boolean workerCanExit() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();boolean canExit;//如果runState大于等于STOP,或者任務(wù)緩存隊(duì)列為空了//或者 允許為核心池線程設(shè)置空閑存活時(shí)間并且線程池中的線程數(shù)目大于1try {canExit = runState >= STOP ||workQueue.isEmpty() ||(allowCoreThreadTimeOut &&poolSize > Math.max(1, corePoolSize));} finally {mainLock.unlock();}return canExit; }? ? ? ?也就是說(shuō)如果線程池處于STOP狀態(tài)、或者任務(wù)隊(duì)列已為空或者允許為核心池線程設(shè)置空閑存活時(shí)間并且線程數(shù)大于1時(shí),允許worker退出。如果允許worker退出,則調(diào)用interruptIdleWorkers()中斷處于空閑狀態(tài)的worker,我們看一下interruptIdleWorkers()的實(shí)現(xiàn):void interruptIdleWorkers() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers) //實(shí)際上調(diào)用的是worker的interruptIfIdle()方法w.interruptIfIdle();} finally {mainLock.unlock();} }? ? ? ?從實(shí)現(xiàn)可以看出,它實(shí)際上調(diào)用的是worker的interruptIfIdle()方法,在worker的interruptIfIdle()方法中:
void interruptIfIdle() {final ReentrantLock runLock = this.runLock;if (runLock.tryLock()) { //注意這里,是調(diào)用tryLock()來(lái)獲取鎖的,因?yàn)槿绻?dāng)前worker正在執(zhí)行任務(wù),鎖已經(jīng)被獲取了,是無(wú)法獲取到鎖的//如果成功獲取了鎖,說(shuō)明當(dāng)前worker處于空閑狀態(tài)try {if (thread != Thread.currentThread()) thread.interrupt();} finally {runLock.unlock();}} }
? ? ? ?這里有一個(gè)非常巧妙的設(shè)計(jì)方式,假如我們來(lái)設(shè)計(jì)線程池,可能會(huì)有一個(gè)任務(wù)分派線程,當(dāng)發(fā)現(xiàn)有線程空閑時(shí),就從任務(wù)緩存隊(duì)列中取一個(gè)任務(wù)交給空閑線程執(zhí)行。但是在這里,并沒(méi)有采用這樣的方式,因?yàn)檫@樣會(huì)要額外地對(duì)任務(wù)分派線程進(jìn)行管理,無(wú)形地會(huì)增加難度和復(fù)雜度,這里直接讓執(zhí)行完任務(wù)的線程去任務(wù)緩存隊(duì)列里面取任務(wù)來(lái)執(zhí)行。
? ? ? ? 我們?cè)倏碼ddIfUnderMaximumPoolSize方法的實(shí)現(xiàn),這個(gè)方法的實(shí)現(xiàn)思想和addIfUnderCorePoolSize方法的實(shí)現(xiàn)思想非常相似,唯一的區(qū)別在于addIfUnderMaximumPoolSize方法是在線程池中的線程數(shù)達(dá)到了核心池大小并且往任務(wù)隊(duì)列中添加任務(wù)失敗的情況下執(zhí)行的:
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {Thread t = null;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (poolSize < maximumPoolSize && runState == RUNNING)t = addThread(firstTask);} finally {mainLock.unlock();}if (t == null)return false;t.start();return true; }? ? ? ?看到?jīng)]有,其實(shí)它和addIfUnderCorePoolSize方法的實(shí)現(xiàn)基本一模一樣,只是if語(yǔ)句判斷條件中的poolSize < maximumPoolSize不同而已。
到這里,大部分朋友應(yīng)該對(duì)任務(wù)提交給線程池之后到被執(zhí)行的整個(gè)過(guò)程有了一個(gè)基本的了解,下面總結(jié)一下:
? ? ? ?1)首先,要清楚corePoolSize和maximumPoolSize的含義;
? ? ? ?2)其次,要知道Worker是用來(lái)起到什么作用的;
? ? ? ?3)要知道任務(wù)提交給線程池之后的處理策略,這里總結(jié)一下主要有4點(diǎn):
- 如果當(dāng)前線程池中的線程數(shù)目小于corePoolSize,則每來(lái)一個(gè)任務(wù),就會(huì)創(chuàng)建一個(gè)線程去執(zhí)行這個(gè)任務(wù);
- 如果當(dāng)前線程池中的線程數(shù)目>=corePoolSize,則每來(lái)一個(gè)任務(wù),會(huì)嘗試將其添加到任務(wù)緩存隊(duì)列當(dāng)中,若添加成功,則該任務(wù)會(huì)等待空閑線程將其取出去執(zhí)行;若添加失敗(一般來(lái)說(shuō)是任務(wù)緩存隊(duì)列已滿(mǎn)),則會(huì)嘗試創(chuàng)建新的線程去執(zhí)行這個(gè)任務(wù);
- 如果當(dāng)前線程池中的線程數(shù)目達(dá)到maximumPoolSize,則會(huì)采取任務(wù)拒絕策略進(jìn)行處理;
- 如果線程池中的線程數(shù)量大于 corePoolSize時(shí),如果某線程空閑時(shí)間超過(guò)keepAliveTime,線程將被終止,直至線程池中的線程數(shù)目不大于corePoolSize;如果允許為核心池中的線程設(shè)置存活時(shí)間,那么核心池中的線程空閑時(shí)間超過(guò)keepAliveTime,線程也會(huì)被終止。
3、線程池中的線程初始化
? ? ? ?默認(rèn)情況下,創(chuàng)建線程池之后,線程池中是沒(méi)有線程的,需要提交任務(wù)之后才會(huì)創(chuàng)建線程。
在實(shí)際中如果需要線程池創(chuàng)建之后立即創(chuàng)建線程,可以通過(guò)以下兩個(gè)方法辦到:
- prestartCoreThread():初始化一個(gè)核心線程;
- prestartAllCoreThreads():初始化所有核心線程。
下面是這2個(gè)方法的實(shí)現(xiàn):
public boolean prestartCoreThread() {return addIfUnderCorePoolSize(null); //注意傳進(jìn)去的參數(shù)是null }public int prestartAllCoreThreads() {int n = 0;while (addIfUnderCorePoolSize(null))//注意傳進(jìn)去的參數(shù)是null++n;return n; }? ? ? ?注意上面?zhèn)鬟M(jìn)去的參數(shù)是null,根據(jù)第2小節(jié)的分析可知如果傳進(jìn)去的參數(shù)為null,則最后執(zhí)行線程會(huì)阻塞在getTask方法中的。r = workQueue.take();? ? ? ?即等待任務(wù)隊(duì)列中有任務(wù)。
4、任務(wù)緩存隊(duì)列及排隊(duì)策略
? ? ? ?在前面我們多次提到了任務(wù)緩存隊(duì)列,即workQueue,它用來(lái)存放等待執(zhí)行的任務(wù)。
workQueue的類(lèi)型為BlockingQueue<Runnable>,通常可以取下面三種類(lèi)型:
? ? ? ?1)ArrayBlockingQueue:基于數(shù)組的先進(jìn)先出隊(duì)列,此隊(duì)列創(chuàng)建時(shí)必須指定大小;
? ? ? ?2)LinkedBlockingQueue:基于鏈表的先進(jìn)先出隊(duì)列,如果創(chuàng)建時(shí)沒(méi)有指定此隊(duì)列大小,則默認(rèn)為Integer.MAX_VALUE;
? ? ? ?3)synchronousQueue:這個(gè)隊(duì)列比較特殊,它不會(huì)保存提交的任務(wù),而是將直接新建一個(gè)線程來(lái)執(zhí)行新來(lái)的任務(wù)。
5、任務(wù)拒絕策略
? ? ? ?當(dāng)線程池的任務(wù)緩存隊(duì)列已滿(mǎn)并且線程池中的線程數(shù)目達(dá)到maximumPoolSize,如果還有任務(wù)到來(lái)就會(huì)采取任務(wù)拒絕策略,通常有以下四種策略:
ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。 ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。 ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過(guò)程) ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)
6、線程池的關(guān)閉
ThreadPoolExecutor提供了兩個(gè)方法,用于線程池的關(guān)閉,分別是shutdown()和shutdownNow(),其中:
- shutdown():不會(huì)立即終止線程池,而是要等所有任務(wù)緩存隊(duì)列中的任務(wù)都執(zhí)行完后才終止,但再也不會(huì)接受新的任務(wù)
- shutdownNow():立即終止線程池,并嘗試打斷正在執(zhí)行的任務(wù),并且清空任務(wù)緩存隊(duì)列,返回尚未執(zhí)行的任務(wù)
7、線程池容量的動(dòng)態(tài)調(diào)整
ThreadPoolExecutor提供了動(dòng)態(tài)調(diào)整線程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize()
- setCorePoolSize:設(shè)置核心池大小
- setMaximumPoolSize:設(shè)置線程池最大能創(chuàng)建的線程數(shù)目大小
? ? ? ?當(dāng)上述參數(shù)從小變大時(shí),ThreadPoolExecutor進(jìn)行線程賦值,還可能立即創(chuàng)建新的線程來(lái)執(zhí)行任務(wù)。
三、使用示例
? ? ? ?前面我們討論了關(guān)于線程池的實(shí)現(xiàn)原理,這一節(jié)我們來(lái)看一下它的具體使用:
public class Test {public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(5));for(int i=0;i<15;i++){MyTask myTask = new MyTask(i);executor.execute(myTask);System.out.println("線程池中線程數(shù)目:"+executor.getPoolSize()+",隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:"+executor.getQueue().size()+",已執(zhí)行玩別的任務(wù)數(shù)目:"+executor.getCompletedTaskCount());}executor.shutdown();} }class MyTask implements Runnable {private int taskNum;public MyTask(int num) {this.taskNum = num;}@Overridepublic void run() {System.out.println("正在執(zhí)行task "+taskNum);try {Thread.currentThread().sleep(4000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("task "+taskNum+"執(zhí)行完畢");} }執(zhí)行結(jié)果:
正在執(zhí)行task 0 線程池中線程數(shù)目:1,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:0,已執(zhí)行玩別的任務(wù)數(shù)目:0 線程池中線程數(shù)目:2,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:0,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行task 1 線程池中線程數(shù)目:3,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:0,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行task 2 線程池中線程數(shù)目:4,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:0,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行task 3 線程池中線程數(shù)目:5,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:0,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行task 4 線程池中線程數(shù)目:5,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:1,已執(zhí)行玩別的任務(wù)數(shù)目:0 線程池中線程數(shù)目:5,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:2,已執(zhí)行玩別的任務(wù)數(shù)目:0 線程池中線程數(shù)目:5,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:3,已執(zhí)行玩別的任務(wù)數(shù)目:0 線程池中線程數(shù)目:5,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:4,已執(zhí)行玩別的任務(wù)數(shù)目:0 線程池中線程數(shù)目:5,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:5,已執(zhí)行玩別的任務(wù)數(shù)目:0 線程池中線程數(shù)目:6,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:5,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行task 10 線程池中線程數(shù)目:7,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:5,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行task 11 線程池中線程數(shù)目:8,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:5,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行task 12 線程池中線程數(shù)目:9,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:5,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行task 13 線程池中線程數(shù)目:10,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:5,已執(zhí)行玩別的任務(wù)數(shù)目:0 正在執(zhí)行task 14 task 3執(zhí)行完畢 task 0執(zhí)行完畢 task 2執(zhí)行完畢 task 1執(zhí)行完畢 正在執(zhí)行task 8 正在執(zhí)行task 7 正在執(zhí)行task 6 正在執(zhí)行task 5 task 4執(zhí)行完畢 task 10執(zhí)行完畢 task 11執(zhí)行完畢 task 13執(zhí)行完畢 task 12執(zhí)行完畢 正在執(zhí)行task 9 task 14執(zhí)行完畢 task 8執(zhí)行完畢 task 5執(zhí)行完畢 task 7執(zhí)行完畢 task 6執(zhí)行完畢 task 9執(zhí)行完畢
? ? ? ?從執(zhí)行結(jié)果可以看出,當(dāng)線程池中線程的數(shù)目大于5時(shí),便將任務(wù)放入任務(wù)緩存隊(duì)列里面,當(dāng)任務(wù)緩存隊(duì)列滿(mǎn)了之后,便創(chuàng)建新的線程。如果上面程序中,將for循環(huán)中改成執(zhí)行20個(gè)任務(wù),就會(huì)拋出任務(wù)拒絕異常了。
? ? ? ?不過(guò)在java doc中,并不提倡我們直接使用ThreadPoolExecutor,而是使用Executors類(lèi)中提供的幾個(gè)靜態(tài)方法來(lái)創(chuàng)建線程池:
Executors.newCachedThreadPool(); //創(chuàng)建一個(gè)緩沖池,緩沖池容量大小為Integer.MAX_VALUE Executors.newSingleThreadExecutor(); //創(chuàng)建容量為1的緩沖池 Executors.newFixedThreadPool(int); //創(chuàng)建固定容量大小的緩沖池 Executors.newScheduledThreadPool(int) //創(chuàng)建一個(gè)定長(zhǎng)的線程池,而且支持定時(shí)的以及周期性的任務(wù)執(zhí)行,支持定時(shí)及周期性任務(wù)執(zhí)行。
下面是這四個(gè)靜態(tài)方法的具體實(shí)現(xiàn);
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>()); }public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize); }
? ? ? ?從它們的具體實(shí)現(xiàn)來(lái)看,它們實(shí)際上也是調(diào)用了ThreadPoolExecutor,只不過(guò)參數(shù)都已配置好了。
? ? ? ?1、newFixedThreadPool創(chuàng)建的線程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;
? ? ? ?2、newSingleThreadExecutor將corePoolSize和maximumPoolSize都設(shè)置為1,也使用的LinkedBlockingQueue;
? ? ? ?3、newCachedThreadPool將corePoolSize設(shè)置為0,將maximumPoolSize設(shè)置為Integer.MAX_VALUE,使用的SynchronousQueue,也就是說(shuō)來(lái)了任務(wù)就創(chuàng)建線程運(yùn)行,當(dāng)線程空閑超過(guò)60秒,就銷(xiāo)毀線程。
? ? ? ?4、newScheduledThreadPool實(shí)例化了一個(gè)ScheduledThreadPoolExecutor對(duì)象,該類(lèi)繼承了ThreadPoolExecutor,其中maximumPoolSize設(shè)置為Integer.MAX_VALUE,使用的是DelayedWorkQueue。
? ? ? ?實(shí)際中,如果Executors提供的四個(gè)靜態(tài)方法能滿(mǎn)足要求,就盡量使用它提供的四個(gè)方法,因?yàn)樽约喝ナ謩?dòng)配置ThreadPoolExecutor的參數(shù)有點(diǎn)麻煩,要根據(jù)實(shí)際任務(wù)的類(lèi)型和數(shù)量來(lái)進(jìn)行配置。
? ? ? ?另外,如果ThreadPoolExecutor達(dá)不到要求,可以自己繼承ThreadPoolExecutor類(lèi)進(jìn)行重寫(xiě)。
1、newFixedThreadPool
? ? ? ?創(chuàng)建一個(gè)指定工作線程數(shù)量的線程池。每當(dāng)提交一個(gè)任務(wù)就創(chuàng)建一個(gè)工作線程,如果工作線程數(shù)量達(dá)到線程池初始的最大數(shù),則將提交的任務(wù)存入到池隊(duì)列中。
? ? ? ?FixedThreadPool是一個(gè)典型且優(yōu)秀的線程池,它具有線程池提高程序效率和節(jié)省創(chuàng)建線程時(shí)所耗的開(kāi)銷(xiāo)的優(yōu)點(diǎn)。但是,在線程池空閑時(shí),即線程池中沒(méi)有可運(yùn)行任務(wù)時(shí),它不會(huì)釋放工作線程,還會(huì)占用一定的系統(tǒng)資源。
示例代碼如下:
public class ThreadPoolTest {public static void main(String[] args) {ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);for (int i = 0; i < 10; i++) {final int index = i;fixedThreadPool.execute(new Runnable() {public void run() {try {System.out.println(index);Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}});}} } ? ? ? ?因?yàn)榫€程池大小為3 ,每個(gè)任務(wù)輸出 index 后 sleep 2 秒,所以每?jī)擅氪蛴?/span> 3 個(gè)數(shù)字。
? ? ? ?定長(zhǎng)線程池的大小最好根據(jù)系統(tǒng)資源進(jìn)行設(shè)置如Runtime.getRuntime().availableProcessors()。
2、newSingleThreadExecutor
? ? ? ?創(chuàng)建一個(gè)單線程化的Executor,即只創(chuàng)建唯一的工作者線程來(lái)執(zhí)行任務(wù),它只會(huì)用唯一的工作線程來(lái)執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO,?優(yōu)先級(jí))執(zhí)行。如果這個(gè)線程異常結(jié)束,會(huì)有另一個(gè)取代它,保證順序執(zhí)行。單工作線程最大的特點(diǎn)是可保證順序地執(zhí)行各個(gè)任務(wù),并且在任意給定的時(shí)間不會(huì)有多個(gè)線程是活動(dòng)的。
示例代碼如下:
public class ThreadPoolTest {public static void main(String[] args) {ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();for (int i = 0; i < 10; i++) {final int index = i;singleThreadExecutor.execute(new Runnable() {public void run() {try {System.out.println(index);Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}});}} }3、newCachedThreadPool
? ? ? ?創(chuàng)建一個(gè)可緩存線程池,如果線程池長(zhǎng)度超過(guò)處理需要,可靈活回收空閑線程,若無(wú)可回收,則新建線程。
這種類(lèi)型的線程池特點(diǎn)是:
- 工作線程的創(chuàng)建數(shù)量幾乎沒(méi)有限制(其實(shí)也有限制的,數(shù)目為Interger. MAX_VALUE),?這樣可靈活的往線程池中添加線程。
- 如果長(zhǎng)時(shí)間沒(méi)有往線程池中提交任務(wù),即如果工作線程空閑了指定的時(shí)間(默認(rèn)為1分鐘),則該工作線程將自動(dòng)終止。終止后,如果你又提交了新的任務(wù),則線程池重新創(chuàng)建一個(gè)工作線程。
- 在使用CachedThreadPool時(shí),一定要注意控制任務(wù)的數(shù)量,否則,由于大量線程同時(shí)運(yùn)行,很有會(huì)造成系統(tǒng)癱瘓。
示例代碼如下:
public class ThreadPoolTest {public static void main(String[] args) {ExecutorService cachedThreadPool = Executors.newCachedThreadPool();for (int i = 0; i < 10; i++) {final int index = i;try {Thread.sleep(index * 1000);} catch (InterruptedException e) {e.printStackTrace();}cachedThreadPool.execute(new Runnable() {public void run() {System.out.println(index);}});}} }
? ? ? ?創(chuàng)建一個(gè)定長(zhǎng)的線程池,而且支持定時(shí)的以及周期性的任務(wù)執(zhí)行,支持定時(shí)及周期性任務(wù)執(zhí)行。
延遲3秒執(zhí)行,延遲執(zhí)行示例代碼如下:
public class ThreadPoolTest {public static void main(String[] args) {ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);scheduledThreadPool.schedule(new Runnable() {public void run() {System.out.println("delay 3 seconds");}}, 3, TimeUnit.SECONDS);} } 表示延遲1 秒后每 3 秒執(zhí)行一次, 定期執(zhí)行示例代碼如下:
public class ThreadPoolTest {public static void main(String[] args) {ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);scheduledThreadPool.scheduleAtFixedRate(new Runnable() {public void run() {System.out.println("delay 1 seconds, and excute every 3 seconds");}}, 1, 3, TimeUnit.SECONDS);} }
四、為什么使用線程池
? ? ? ?諸如?Web?服務(wù)器、數(shù)據(jù)庫(kù)服務(wù)器、文件服務(wù)器或郵件服務(wù)器之類(lèi)的許多服務(wù)器應(yīng)用程序都面向處理來(lái)自某些遠(yuǎn)程來(lái)源的大量短小的任務(wù)。請(qǐng)求以某種方式到達(dá)服務(wù)器,這種方式可能是通過(guò)網(wǎng)絡(luò)協(xié)議(例如?HTTP、FTP?或?POP)、通過(guò)?JMS?隊(duì)列或者可能通過(guò)輪詢(xún)數(shù)據(jù)庫(kù)。不管請(qǐng)求如何到達(dá),服務(wù)器應(yīng)用程序中經(jīng)常出現(xiàn)的情況是:單個(gè)任務(wù)處理的時(shí)間很短而請(qǐng)求的數(shù)目卻是巨大的。
? ? ? ?構(gòu)建服務(wù)器應(yīng)用程序的一個(gè)簡(jiǎn)單模型是:每當(dāng)一個(gè)請(qǐng)求到達(dá)就創(chuàng)建一個(gè)新線程,然后在新線程中為請(qǐng)求服務(wù)。實(shí)際上對(duì)于原型開(kāi)發(fā)這種方法工作得很好,但如果試圖部署以這種方式運(yùn)行的服務(wù)器應(yīng)用程序,那么這種方法的嚴(yán)重不足就很明顯。每個(gè)請(qǐng)求對(duì)應(yīng)一個(gè)線程(thread-per-request)方法的不足之一是:為每個(gè)請(qǐng)求創(chuàng)建一個(gè)新線程的開(kāi)銷(xiāo)很大;為每個(gè)請(qǐng)求創(chuàng)建新線程的服務(wù)器在創(chuàng)建和銷(xiāo)毀線程上花費(fèi)的時(shí)間和消耗的系統(tǒng)資源要比花在處理實(shí)際的用戶(hù)請(qǐng)求的時(shí)間和資源更多。
? ? ? ?除了創(chuàng)建和銷(xiāo)毀線程的開(kāi)銷(xiāo)之外,活動(dòng)的線程也消耗系統(tǒng)資源。在一個(gè)?JVM?里創(chuàng)建太多的線程可能會(huì)導(dǎo)致系統(tǒng)由于過(guò)度消耗內(nèi)存而用完內(nèi)存或“切換過(guò)度”。為了防止資源不足,服務(wù)器應(yīng)用程序需要一些辦法來(lái)限制任何給定時(shí)刻處理的請(qǐng)求數(shù)目。
? ? ? ?線程池為線程生命周期開(kāi)銷(xiāo)問(wèn)題和資源不足問(wèn)題提供了解決方案。通過(guò)對(duì)多個(gè)任務(wù)重用線程,線程創(chuàng)建的開(kāi)銷(xiāo)被分?jǐn)偟搅硕鄠€(gè)任務(wù)上。其好處是,因?yàn)樵谡?qǐng)求到達(dá)時(shí)線程已經(jīng)存在,所以無(wú)意中也消除了線程創(chuàng)建所帶來(lái)的延遲。這樣,就可以立即為請(qǐng)求服務(wù),使應(yīng)用程序響應(yīng)更快。而且,通過(guò)適當(dāng)?shù)卣{(diào)整線程池中的線程數(shù)目,也就是當(dāng)請(qǐng)求的數(shù)目超過(guò)某個(gè)閾值時(shí),就強(qiáng)制其它任何新到的請(qǐng)求一直等待,直到獲得一個(gè)線程來(lái)處理為止,從而可以防止資源不足。
五、使用線程池的風(fēng)險(xiǎn)
? ? ? ?雖然線程池是構(gòu)建多線程應(yīng)用程序的強(qiáng)大機(jī)制,但使用它并不是沒(méi)有風(fēng)險(xiǎn)的。用線程池構(gòu)建的應(yīng)用程序容易遭受任何其它多線程應(yīng)用程序容易遭受的所有并發(fā)風(fēng)險(xiǎn),諸如同步錯(cuò)誤和死鎖,它還容易遭受特定于線程池的少數(shù)其它風(fēng)險(xiǎn),諸如與池有關(guān)的死鎖、資源不足和線程泄漏。
1、死鎖
? ? ? ?任何多線程應(yīng)用程序都有死鎖風(fēng)險(xiǎn)。當(dāng)一組進(jìn)程或線程中的每一個(gè)都在等待一個(gè)只有該組中另一個(gè)進(jìn)程才能引起的事件時(shí),我們就說(shuō)這組進(jìn)程或線程?死鎖了。死鎖的最簡(jiǎn)單情形是:線程?A?持有對(duì)象?X?的獨(dú)占鎖,并且在等待對(duì)象?Y?的鎖,而線程?B?持有對(duì)象?Y?的獨(dú)占鎖,卻在等待對(duì)象?X?的鎖。除非有某種方法來(lái)打破對(duì)鎖的等待(Java?鎖定不支持這種方法),否則死鎖的線程將永遠(yuǎn)等下去。
? ? ? ?雖然任何多線程程序中都有死鎖的風(fēng)險(xiǎn),但線程池卻引入了另一種死鎖可能,在那種情況下,所有池線程都在執(zhí)行已阻塞的等待隊(duì)列中另一任務(wù)的執(zhí)行結(jié)果的任務(wù),但這一任務(wù)卻因?yàn)闆](méi)有未被占用的線程而不能運(yùn)行。當(dāng)線程池被用來(lái)實(shí)現(xiàn)涉及許多交互對(duì)象的模擬,被模擬的對(duì)象可以相互發(fā)送查詢(xún),這些查詢(xún)接下來(lái)作為排隊(duì)的任務(wù)執(zhí)行,查詢(xún)對(duì)象又同步等待著響應(yīng)時(shí),會(huì)發(fā)生這種情況。
2、資源不足
? ? ? ?線程池的一個(gè)優(yōu)點(diǎn)在于:相對(duì)于其它替代調(diào)度機(jī)制(有些我們已經(jīng)討論過(guò))而言,它們通常執(zhí)行得很好。但只有恰當(dāng)?shù)卣{(diào)整了線程池大小時(shí)才是這樣的。線程消耗包括內(nèi)存和其它系統(tǒng)資源在內(nèi)的大量資源。除了?Thread?對(duì)象所需的內(nèi)存之外,每個(gè)線程都需要兩個(gè)可能很大的執(zhí)行調(diào)用堆棧。除此以外,JVM?可能會(huì)為每個(gè)?Java?線程創(chuàng)建一個(gè)本機(jī)線程,這些本機(jī)線程將消耗額外的系統(tǒng)資源。最后,雖然線程之間切換的調(diào)度開(kāi)銷(xiāo)很小,但如果有很多線程,環(huán)境切換也可能?chē)?yán)重地影響程序的性能。
? ? ? ?如果線程池太大,那么被那些線程消耗的資源可能?chē)?yán)重地影響系統(tǒng)性能。在線程之間進(jìn)行切換將會(huì)浪費(fèi)時(shí)間,而且使用超出比您實(shí)際需要的線程可能會(huì)引起資源匱乏問(wèn)題,因?yàn)槌鼐€程正在消耗一些資源,而這些資源可能會(huì)被其它任務(wù)更有效地利用。除了線程自身所使用的資源以外,服務(wù)請(qǐng)求時(shí)所做的工作可能需要其它資源,例如JDBC?連接、套接字或文件。這些也都是有限資源,有太多的并發(fā)請(qǐng)求也可能引起失效,例如不能分配?JDBC?連接。
3、并發(fā)錯(cuò)誤
? ? ? ?線程池和其它排隊(duì)機(jī)制依靠使用?wait()?和?notify()?方法,這兩個(gè)方法都難于使用。如果編碼不正確,那么可能丟失通知,導(dǎo)致線程保持空閑狀態(tài),盡管隊(duì)列中有工作要處理。使用這些方法時(shí),必須格外小心。而最好使用現(xiàn)有的、已經(jīng)知道能工作的實(shí)現(xiàn),例如?util.concurrent?包。
4、線程泄漏
? ? ? ?各種類(lèi)型的線程池中一個(gè)嚴(yán)重的風(fēng)險(xiǎn)是線程泄漏,當(dāng)從池中除去一個(gè)線程以執(zhí)行一項(xiàng)任務(wù),而在任務(wù)完成后該線程卻沒(méi)有返回池時(shí),會(huì)發(fā)生這種情況。發(fā)生線程泄漏的一種情形出現(xiàn)在任務(wù)拋出一個(gè)?RuntimeException?或一個(gè)?Error?時(shí)。如果池類(lèi)沒(méi)有捕捉到它們,那么線程只會(huì)退出而線程池的大小將會(huì)永久減少一個(gè)。當(dāng)這種情況發(fā)生的次數(shù)足夠多時(shí),線程池最終就為空,而且系統(tǒng)將停止,因?yàn)闆](méi)有可用的線程來(lái)處理任務(wù)。
? ? ? ?有些任務(wù)可能會(huì)永遠(yuǎn)等待某些資源或來(lái)自用戶(hù)的輸入,而這些資源又不能保證變得可用,用戶(hù)可能也已經(jīng)回家了,諸如此類(lèi)的任務(wù)會(huì)永久停止,而這些停止的任務(wù)也會(huì)引起和線程泄漏同樣的問(wèn)題。如果某個(gè)線程被這樣一個(gè)任務(wù)永久地消耗著,那么它實(shí)際上就被從池除去了。對(duì)于這樣的任務(wù),應(yīng)該要么只給予它們自己的線程,要么只讓它們等待有限的時(shí)間。
5、請(qǐng)求過(guò)載
? ? ? ?僅僅是請(qǐng)求就壓垮了服務(wù)器,這種情況是可能的。在這種情形下,我們可能不想將每個(gè)到來(lái)的請(qǐng)求都排隊(duì)到我們的工作隊(duì)列,因?yàn)榕旁陉?duì)列中等待執(zhí)行的任務(wù)可能會(huì)消耗太多的系統(tǒng)資源并引起資源缺乏。在這種情形下決定如何做取決于您自己;在某些情況下,您可以簡(jiǎn)單地拋棄請(qǐng)求,依靠更高級(jí)別的協(xié)議稍后重試請(qǐng)求,您也可以用一個(gè)指出服務(wù)器暫時(shí)很忙的響應(yīng)來(lái)拒絕請(qǐng)求。
六、有效使用線程池的準(zhǔn)則
只要您遵循幾條簡(jiǎn)單的準(zhǔn)則,線程池可以成為構(gòu)建服務(wù)器應(yīng)用程序的極其有效的方法:
- 不要對(duì)那些同步等待其它任務(wù)結(jié)果的任務(wù)排隊(duì)。這可能會(huì)導(dǎo)致上面所描述的那種形式的死鎖,在那種死鎖中,所有線程都被一些任務(wù)所占用,這些任務(wù)依次等待排隊(duì)任務(wù)的結(jié)果,而這些任務(wù)又無(wú)法執(zhí)行,因?yàn)樗械木€程都很忙。
- 在為時(shí)間可能很長(zhǎng)的操作使用合用的線程時(shí)要小心。如果程序必須等待諸如?I/O?完成這樣的某個(gè)資源,那么請(qǐng)指定最長(zhǎng)的等待時(shí)間,以及隨后是失效還是將任務(wù)重新排隊(duì)以便稍后執(zhí)行。這樣做保證了:通過(guò)將某個(gè)線程釋放給某個(gè)可能成功完成的任務(wù),從而將最終取得某些進(jìn)展。
- 理解任務(wù)。要有效地調(diào)整線程池大小,您需要理解正在排隊(duì)的任務(wù)以及它們正在做什么。它們是?CPU?限制的(CPU-bound)嗎?它們是?I/O?限制的(I/O-bound)嗎?您的答案將影響您如何調(diào)整應(yīng)用程序。如果您有不同的任務(wù)類(lèi),這些類(lèi)有著截然不同的特征,那么為不同任務(wù)類(lèi)設(shè)置多個(gè)工作隊(duì)列可能會(huì)有意義,這樣可以相應(yīng)地調(diào)整每個(gè)池。
七、線程池的大小設(shè)置
? ? ? ?本節(jié)來(lái)討論一個(gè)比較重要的話題:如何合理配置線程池大小,僅供參考。
? ? ? ?一般需要根據(jù)任務(wù)的類(lèi)型來(lái)配置線程池大小:
? ? ? ?1、如果是CPU密集型任務(wù),就需要盡量壓榨CPU,參考值可以設(shè)為?NCPU+1
? ? ? ?2、如果是IO密集型任務(wù),參考值可以設(shè)置為2*NCPU
? ? ? ?當(dāng)然,這只是一個(gè)參考值,具體的設(shè)置還需要根據(jù)實(shí)際情況進(jìn)行調(diào)整,比如可以先將線程池大小設(shè)置為參考值,再觀察任務(wù)運(yùn)行情況和系統(tǒng)負(fù)載、資源利用率來(lái)進(jìn)行適當(dāng)調(diào)整。
? ? ? ?調(diào)整線程池的大小基本上就是避免兩類(lèi)錯(cuò)誤:線程太少或線程太多。幸運(yùn)的是,對(duì)于大多數(shù)應(yīng)用程序來(lái)說(shuō),太多和太少之間的余地相當(dāng)寬。
? ? ? ?請(qǐng)回憶:在應(yīng)用程序中使用線程有兩個(gè)主要優(yōu)點(diǎn),盡管在等待諸如?I/O?的慢操作,但允許繼續(xù)進(jìn)行處理,并且可以利用多處理器。在運(yùn)行于具有?N?個(gè)處理器機(jī)器上的計(jì)算限制的應(yīng)用程序中,在線程數(shù)目接近?N?時(shí)添加額外的線程可能會(huì)改善總處理能力,而在線程數(shù)目超過(guò)?N?時(shí)添加額外的線程將不起作用。事實(shí)上,太多的線程甚至?xí)档托阅?#xff0c;因?yàn)樗鼤?huì)導(dǎo)致額外的環(huán)境切換開(kāi)銷(xiāo)。
? ? ? ?線程池的最佳大小取決于可用處理器的數(shù)目以及工作隊(duì)列中的任務(wù)的性質(zhì)。若在一個(gè)具有?N?個(gè)處理器的系統(tǒng)上只有一個(gè)工作隊(duì)列,其中全部是計(jì)算性質(zhì)的任務(wù),在線程池具有?N?或?N+1?個(gè)線程時(shí)一般會(huì)獲得最大的?CPU?利用率。
? ? ? ?對(duì)于那些可能需要等待?I/O?完成的任務(wù)(例如,從套接字讀取?HTTP?請(qǐng)求的任務(wù)),需要讓池的大小超過(guò)可用處理器的數(shù)目,因?yàn)椴⒉皇撬芯€程都一直在工作。通過(guò)使用概要分析,您可以估計(jì)某個(gè)典型請(qǐng)求的等待時(shí)間(WT)與服務(wù)時(shí)間(ST)之間的比例。如果我們將這一比例稱(chēng)之為?WT/ST,那么對(duì)于一個(gè)具有?N?個(gè)處理器的系統(tǒng),需要設(shè)置大約?N*(1+WT/ST)?個(gè)線程來(lái)保持處理器得到充分利用。
? ? ? ?處理器利用率不是調(diào)整線程池大小過(guò)程中的唯一考慮事項(xiàng)。隨著線程池的增長(zhǎng),您可能會(huì)碰到調(diào)度程序、可用內(nèi)存方面的限制,或者其它系統(tǒng)資源方面的限制,例如套接字、打開(kāi)的文件句柄或數(shù)據(jù)庫(kù)連接等的數(shù)目。
總結(jié)
以上是生活随笔為你收集整理的Java基础——深入理解Java线程池的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 起步前要做哪些准备?
- 下一篇: 《生存之旅》逼真女巫COS赏 无限恐怖吓