线程池java.util.concurrent.ThreadPoolExecutor总结
http://uule.iteye.com/blog/1123185
線程池還具有提高系統(tǒng)性能的優(yōu)點(diǎn),因?yàn)閯?chuàng)建線程和清除線程的開銷比較大。?
有兩種不同類型的線程池:一是固定線程數(shù)量的線程池;二是可變數(shù)量的線程池。?
?
對(duì)于固定數(shù)量的線程池,可以使用Executors的靜態(tài)方法 newFixedThreadPool 來(lái)創(chuàng)建 ExecutorService;或者利用 newSingleThreadPool來(lái)創(chuàng)建。?
? ? ? 而 ExecutorService 實(shí)現(xiàn)了 Executor 接口,這個(gè)接口中有一個(gè)方法:Execute(Runnable command),也就是執(zhí)行線程。?
? ? ? 對(duì)于固定數(shù)量的線程池而言,如果需要執(zhí)行的線程數(shù)量多于構(gòu)造的數(shù)量,那么只能并發(fā)構(gòu)造時(shí)的數(shù)量,剩下的線程就進(jìn)入線程池的等待隊(duì)列。?
? ? ? 如果不需要使用該線程池了,則使用 ExecutorService 中的 shutDown 方法,此時(shí),該線程池就不會(huì)接受執(zhí)行新的線程任務(wù)了。
?
對(duì)于可變數(shù)量的線程池,可用Executors的靜態(tài)方法 newCachedThreadPool 來(lái)創(chuàng)建 ExecutorService,該線程池的大小是不定的,當(dāng)執(zhí)行任務(wù)時(shí),會(huì)先選取緩存中的空閑線程來(lái)執(zhí)行,如果沒(méi)有空閑線程,則創(chuàng)建一個(gè)新的線程,而如果空閑線程的空閑狀態(tài)超過(guò)60秒,則線程池刪除該線程。
?
還有一種線程池:延遲線程池?
該線程池的創(chuàng)建有兩個(gè)方法: Executors.newScheduledThreadPool(int corePoolSize);
??????????????????????????????????????? Executors.newSingleScheduledExecutor();?
創(chuàng)建之后,會(huì)獲得一個(gè) ScheduledExecutorService。?
該對(duì)象的一個(gè)重要的方法就是: schedule(Runnable command, long delay, TimeUnit unit)
?該方法返回了一個(gè) ScheduledFuture。
?
?
?
JDK1.5中加入了許多對(duì)并發(fā)特性的支持,例如:線程池。
一、簡(jiǎn)介
線程池類為 java.util.concurrent.ThreadPoolExecutor,常用構(gòu)造方法為:
?
corePoolSize:??????? 線程池維護(hù)線程的最少數(shù)量 (core : 核心)
maximumPoolSize:線程池維護(hù)線程的最大數(shù)量?
keepAliveTime:???? 線程池維護(hù)線程所允許的空閑時(shí)間
unit:?????????? 線程池維護(hù)線程所允許的空閑時(shí)間的單位
workQueue: 線程池所使用的緩沖隊(duì)列
handler:????? 線程池對(duì)拒絕任務(wù)的處理策略
一個(gè)任務(wù)通過(guò) execute(Runnable)方法被添加到線程池,任務(wù)就是一個(gè) Runnable類型的對(duì)象,任務(wù)的執(zhí)行方法就是 Runnable類型對(duì)象的run()方法。?
當(dāng)一個(gè)任務(wù)通過(guò)execute(Runnable)方法欲添加到線程池時(shí):
如果線程池中運(yùn)行的線程?小于corePoolSize?,即使線程池中的線程都處于空閑狀態(tài),也要?創(chuàng)建新的線程?來(lái)處理被添加的任務(wù)。
如果線程池中運(yùn)行的線程大于等于corePoolSize,但是緩沖隊(duì)列 workQueue未滿?,那么任務(wù)被放入緩沖隊(duì)列?。
如果此時(shí)線程池中的數(shù)量大于corePoolSize,緩沖隊(duì)列workQueue滿(即無(wú)法將請(qǐng)求加入隊(duì)列?),并且線程池中的數(shù)量小于maximumPoolSize,建新的線程?來(lái)處理被添加的任務(wù)。
如果此時(shí)線程池中的數(shù)量大于corePoolSize,緩沖隊(duì)列workQueue滿,并且線程池中的數(shù)量等于maximumPoolSize?,那么通過(guò)?handler?所指定的策略來(lái)處理此任務(wù)。
當(dāng)線程池中的線程數(shù)量大于 corePoolSize時(shí),如果某線程空閑時(shí)間超過(guò)keepAliveTime,線程將被終止?。這樣,線程池可以動(dòng)態(tài)的調(diào)整池中的線程數(shù)。
也就是:處理任務(wù)的優(yōu)先級(jí)為:
corePoolSize、任務(wù)隊(duì)列workQueue、最大線程maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務(wù)。?
當(dāng)然,如果用的是無(wú)界的緩沖隊(duì)列,那么當(dāng)線程等于corePoolSIze,小于maximumPoolSize,任務(wù)就會(huì)不停的添加到隊(duì)列中,也不會(huì)創(chuàng)建新線程,也不會(huì)丟棄。
Java代碼??一個(gè)例子:?
Java代碼?? 說(shuō)明:
1、在這段程序中,一個(gè)任務(wù)就是一個(gè)Runnable類型的對(duì)象,也就是一個(gè)ThreadPoolTask類型的對(duì)象。
2、一般來(lái)說(shuō)任務(wù)除了處理方式外,還需要處理的數(shù)據(jù),處理的數(shù)據(jù)通過(guò)構(gòu)造方法傳給任務(wù)。
3、在這段程序中,main()方法相當(dāng)于一個(gè)殘忍的領(lǐng)導(dǎo),他派發(fā)出許多任務(wù),丟給一個(gè)叫 threadPool的任勞任怨的小組來(lái)做。
4、這個(gè)小組里面隊(duì)員至少有兩個(gè),如果他們兩個(gè)忙不過(guò)來(lái),任務(wù)就被放到任務(wù)列表里面。
如果積壓的任務(wù)過(guò)多,多到任務(wù)列表都裝不下(超過(guò)3個(gè))的時(shí)候,就雇傭新的隊(duì)員來(lái)幫忙。但是基于成本的考慮,不能雇傭太多的隊(duì)員,至多只能雇傭 4個(gè)。
5、如果四個(gè)隊(duì)員都在忙時(shí),再有新的任務(wù),這個(gè)小組就處理不了了,任務(wù)就會(huì)被通過(guò)一種策略來(lái)處理,我們的處理方式是不停的派發(fā),直到接受這個(gè)任務(wù)為止(更殘忍!呵呵)。
因?yàn)殛?duì)員工作是需要成本的,如果工作很閑,閑到 3 秒都沒(méi)有新的任務(wù)了,那么有的隊(duì)員就會(huì)被解雇了,但是,為了小組的正常運(yùn)轉(zhuǎn),即使工作再閑,小組的隊(duì)員也不能少于兩個(gè)。
本例來(lái)源:http://blog.csdn.net/senton/article/details/3528720
?
Java里面線程池的頂級(jí)接口是Executor,但是嚴(yán)格意義上講Executor并不是一個(gè)線程池,而只是一個(gè)執(zhí)行線程的工具。真正的線程池接口是ExecutorService。
線程池的類體系結(jié)構(gòu):?
ExecutorService: ??? ??? 真正的線程池接口。
ScheduledExecutorService ??? 能和Timer/TimerTask類似,解決那些需要任務(wù)重復(fù)執(zhí)行的問(wèn)題。
ThreadPoolExecutor ??? ??? ExecutorService的默認(rèn)實(shí)現(xiàn)。
ScheduledThreadPoolExecutor ??? 繼承ThreadPoolExecutor的ScheduledExecutorService接口實(shí)現(xiàn),周期性任務(wù)調(diào)度的類實(shí)現(xiàn)。
?
Executors 工廠方法:?
ThreadPoolExecutor是Executors類的底層實(shí)現(xiàn):
ExecutorService newFixedThreadPool(int nThreads) 固定大小線程池
Java代碼???corePoolSize和maximumPoolSize的大小是一樣的(實(shí)際上,如果使用無(wú)界queue的話maximumPoolSize參數(shù)是沒(méi)有意義的),BlockingQueue選擇了LinkedBlockingQueue,該queue有一個(gè)特點(diǎn),他是無(wú)界的。
一個(gè)例子:
Java代碼???
ExecutorService newSingleThreadExecutor() 單線程
Java代碼???最大和最小都為1
?
ExecutorService newCachedThreadPool() 無(wú)界線程池
Java代碼?? 這個(gè)實(shí)現(xiàn)就有意思了。首先是無(wú)界的線程池,所以我們可以發(fā)現(xiàn)maximumPoolSize為big big。其次BlockingQueue的選擇上使用?SynchronousQueue
。可能對(duì)于該BlockingQueue有些陌生,簡(jiǎn)單說(shuō):該QUEUE中,?每個(gè)插入操作必須等待另一個(gè)
線程的對(duì)應(yīng)移除操作。?比如,我先添加一個(gè)元素,接下來(lái)如果繼續(xù)想嘗試添加則會(huì)阻塞,直到另一個(gè)線程取走一個(gè)元素,反之亦然。(想到什么?就是緩沖區(qū)為1的生產(chǎn)者消費(fèi)者模式^_^)
?
以下為重要分析:
到此如果有很多疑問(wèn),那是必然了(除非你也很了解了)
?
先從BlockingQueue?<Runnable?>?workQueue這個(gè)入?yún)㈤_始說(shuō)起。在JDK中,其實(shí)已經(jīng)說(shuō)得很清楚了,一共有三種類型的queue。以下為引用:(我會(huì)稍微修改一下,并用紅色突出顯示)
??
- 如果運(yùn)行的線程少于 corePoolSize,則 Executor 始終首選添加新的線程,而不進(jìn)行排隊(duì)。(什么意思?如果當(dāng)前運(yùn)行的線程小于corePoolSize?,則任務(wù)根本不會(huì)存放,添加到queue中?,而是直接?抄家伙(thread)開始運(yùn)行?)
- 如果運(yùn)行的線程等于或多于 corePoolSize,則 Executor 始終首選將請(qǐng)求加入隊(duì)列?,而不添加新的線程?。
- 如果無(wú)法將請(qǐng)求加入隊(duì)列,則創(chuàng)建新的線程?,除非創(chuàng)建此線程超出 maximumPoolSize,在這種情況下,任務(wù)將被拒絕。
排隊(duì)有三種通用策略:
?
到這里,該了解的理論已經(jīng)夠多了,可以調(diào)節(jié)的就是corePoolSize和maximumPoolSizes 這對(duì)參數(shù)還有就是BlockingQueue的選擇。
?
例子一:使用直接提交策略,也即SynchronousQueue。
?
首先SynchronousQueue是無(wú)界的,也就是說(shuō)他存數(shù)任務(wù)的能力是沒(méi)有限制的,但是由于該Queue本身的特性?,在某次添加元素后必須等待其他線程取走后才能繼續(xù)添加?。在這里不是核心線程便是新創(chuàng)建的線程,但是我們?cè)囅胍粯酉?#xff0c;下面的場(chǎng)景。
?
我們使用一下參數(shù)構(gòu)造ThreadPoolExecutor:
Java代碼???當(dāng)核心線程已經(jīng)有2個(gè)正在運(yùn)行.
?
什么意思?如果你的任務(wù)A1,A2有內(nèi)部關(guān)聯(lián),A1需要先運(yùn)行,那么先提交A1,再提交A2,當(dāng)使用SynchronousQueue我們可以保證,A1必定先被執(zhí)行,在A1么有被執(zhí)行前,A2不可能添加入queue中
例子二:使用無(wú)界隊(duì)列策略,即LinkedBlockingQueue
這個(gè)就拿newFixedThreadPool?來(lái)說(shuō),根據(jù)前文提到的規(guī)則: ?寫道 如果運(yùn)行的線程少于 corePoolSize,則 Executor 始終首選添加新的線程,而不進(jìn)行排隊(duì)。 ?那么當(dāng)任務(wù)繼續(xù)增加,會(huì)發(fā)生什么呢? ?寫道 ? 如果運(yùn)行的線程等于或多于 corePoolSize,則 Executor 始終首選將請(qǐng)求加入隊(duì)列,而不添加新的線程。
?OK,此時(shí)任務(wù)變加入隊(duì)列之中了,那什么時(shí)候才會(huì)添加新線程呢?
?
?寫道 如果無(wú)法將請(qǐng)求加入隊(duì)列,則創(chuàng)建新的線程,除非創(chuàng)建此線程超出 maximumPoolSize,在這種情況下,任務(wù)將被拒絕。這里就很有意思了,可能會(huì)出現(xiàn)無(wú)法加入隊(duì)列嗎?不像SynchronousQueue那樣有其自身的特點(diǎn),對(duì)于無(wú)界隊(duì)列來(lái)說(shuō),總是可以加入的(資源耗盡,當(dāng)然另當(dāng)別論)。換句說(shuō),永遠(yuǎn)也不會(huì)觸發(fā)產(chǎn)生新的線程!?corePoolSize大小的線程數(shù)會(huì)一直運(yùn)行,忙完當(dāng)前的,就從隊(duì)列中拿任務(wù)開始運(yùn)行。所以要防止任務(wù)瘋長(zhǎng),比如任務(wù)運(yùn)行的實(shí)行比較長(zhǎng),而添加任務(wù)的速度遠(yuǎn)遠(yuǎn)超過(guò)處理任務(wù)的時(shí)間,而且還不斷增加,如果任務(wù)內(nèi)存大一些,不一會(huì)兒就爆了,呵呵。
?
可以仔細(xì)想想哈。
?
例子三:有界隊(duì)列,使用ArrayBlockingQueue。
?
這個(gè)是最為復(fù)雜的使用,所以JDK不推薦使用也有些道理。與上面的相比,最大的特點(diǎn)便是可以防止資源耗盡的情況發(fā)生。
?
舉例來(lái)說(shuō),請(qǐng)看如下構(gòu)造方法:
?
Java代碼??假設(shè),所有的任務(wù)都永遠(yuǎn)無(wú)法執(zhí)行完。
?
對(duì)于首先來(lái)的A,B來(lái)說(shuō)直接運(yùn)行,接下來(lái),如果來(lái)了C,D,他們會(huì)被放到queu中,如果接下來(lái)再來(lái)E,F,則增加線程運(yùn)行E,F。但是如果再來(lái)任務(wù),隊(duì)列無(wú)法再接受了,線程數(shù)也到達(dá)最大的限制了,所以就會(huì)使用拒絕策略來(lái)處理。
?
總結(jié):
static class Task1 implements Runnable {int id;public Task1(int id) {this.id = id;}@Overridepublic void run() {try {System.out.println(Thread.currentThread() +" task " + id + " begin");Thread.sleep(5000);} catch (InterruptedException e) {}if (id < 5) {throw new RuntimeException(Thread.currentThread() + " task " +id+ " exception");}}}static ExecutorService pool = Executors.newFixedThreadPool(5);public void test() throws Exception {}public static void main(String[] args) throws Exception{for (int i = 0; i < 10; ++i) {pool.execute(new Task1(i));}ThreadPoolExecutor threads =(ThreadPoolExecutor) pool;for (int i= 0; i < 40; i++) {System.out.println("pool size = " + threads.getPoolSize() + " Largest = " + threads.getLargestPoolSize() + " Max = " + threads.getMaximumPoolSize());Thread.sleep(1000);}pool.shutdown(); // ToyClient client = new ToyClient(); // client.test();}起始線程池里有5個(gè)線程,前五個(gè)線程都拋出異常,這五個(gè)線程因?yàn)楫惓6赖?#xff0c;線程池會(huì)再創(chuàng)建五個(gè)新線程
所以建議用submit提交任務(wù),如果異常在線程中發(fā)生,當(dāng)前線程不會(huì)處理這個(gè)異常,當(dāng)前線程也不會(huì)死掉
如果任務(wù)實(shí)現(xiàn)的是callable接口,會(huì)在future.get();結(jié)果返回時(shí)拋出異常
總結(jié)
以上是生活随笔為你收集整理的线程池java.util.concurrent.ThreadPoolExecutor总结的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: java多线程生命周期
- 下一篇: Redis和Memcache对比及选择