java 线程池ThreadPoolExecutor
線程池
線程池的作用:
線程池作用就是限制系統(tǒng)中執(zhí)行線程的數(shù)量。
根 據(jù)系統(tǒng)的環(huán)境情況,可以自動(dòng)或手動(dòng)設(shè)置線程數(shù)量,達(dá)到運(yùn)行的最佳效果;少了浪費(fèi)了系統(tǒng)資源,多了造成系統(tǒng)擁擠效率不高。用線程池控制線程數(shù)量,其他線程排 隊(duì)等候。一個(gè)任務(wù)執(zhí)行完畢,再?gòu)年?duì)列的中取最前面的任務(wù)開始執(zhí)行。若隊(duì)列中沒有等待進(jìn)程,線程池的這一資源處于等待。當(dāng)一個(gè)新任務(wù)需要運(yùn)行時(shí),如果線程池 中有等待的工作線程,就可以開始運(yùn)行了;否則進(jìn)入等待隊(duì)列。
為什么要用線程池:
1.減少了創(chuàng)建和銷毀線程的次數(shù),每個(gè)工作線程都可以被重復(fù)利用,可執(zhí)行多個(gè)任務(wù)。
2.可以根據(jù)系統(tǒng)的承受能力,調(diào)整線程池中工作線線程的數(shù)目,防止因?yàn)橄倪^多的內(nèi)存,而把服務(wù)器累趴下(每個(gè)線程需要大約1MB內(nèi)存,線程開的越多,消耗的內(nèi)存也就越大,最后死機(jī))。
Java里面線程池的頂級(jí)接口是Executor,但是嚴(yán)格意義上講Executor并不是一個(gè)線程池,而只是一個(gè)執(zhí)行線程的工具。真正的線程池接口是ExecutorService。
比較重要的幾個(gè)類:
ExecutorService: 真正的線程池接口。
ScheduledExecutorService: 能和Timer/TimerTask類似,解決那些需要任務(wù)重復(fù)執(zhí)行的問題。
ThreadPoolExecutor: ExecutorService的默認(rèn)實(shí)現(xiàn)。
ScheduledThreadPoolExecutor: 繼承ThreadPoolExecutor的ScheduledExecutorService接口實(shí)現(xiàn),周期性任務(wù)調(diào)度的類實(shí)現(xiàn)。
要配置一個(gè)線程池是比較復(fù)雜的,尤其是對(duì)于線程池的原理不是很清楚的情況下,很有可能配置的線程池不是較優(yōu)的,因此在Executors類里面提供了一些靜態(tài)工廠,生成一些常用的線程池。
Java線程池的參數(shù)
關(guān)于Java線程池的參數(shù)設(shè)置。線程池是Java多線程里開發(fā)里的重要內(nèi)容,使用難度不大,但如何用好就要明白參數(shù)的含義和如何去設(shè)置。
一、ThreadPoolExecutor的重要參數(shù)
? ? 1、corePoolSize:核心線程數(shù)
? ? ? ? * 核心線程會(huì)一直存活,及時(shí)沒有任務(wù)需要執(zhí)行
? ? ? ? * 當(dāng)線程數(shù)小于核心線程數(shù)時(shí),即使有線程空閑,線程池也會(huì)優(yōu)先創(chuàng)建新線程處理
? ? ? ? * 設(shè)置allowCoreThreadTimeout=true(默認(rèn)false)時(shí),核心線程會(huì)超時(shí)關(guān)閉
? ? 2、queueCapacity:任務(wù)隊(duì)列容量(阻塞隊(duì)列)
? ? ? ? * 當(dāng)核心線程數(shù)達(dá)到最大時(shí),新任務(wù)會(huì)放在隊(duì)列中排隊(duì)等待執(zhí)行
? ? 3、maxPoolSize:最大線程數(shù)
? ? ? ? * 當(dāng)線程數(shù)>=corePoolSize,且任務(wù)隊(duì)列已滿時(shí)。線程池會(huì)創(chuàng)建新線程來處理任務(wù)
? ? ? ? * 當(dāng)線程數(shù)=maxPoolSize,且任務(wù)隊(duì)列已滿時(shí),線程池會(huì)拒絕處理任務(wù)而拋出異常
? ? 4、 keepAliveTime:線程空閑時(shí)間
? ? ? ? * 當(dāng)線程空閑時(shí)間達(dá)到keepAliveTime時(shí),線程會(huì)退出,直到線程數(shù)量=corePoolSize
? ? ? ? * 如果allowCoreThreadTimeout=true,則會(huì)直到線程數(shù)量=0
? ? 5、allowCoreThreadTimeout:允許核心線程超時(shí)
? ? 6、rejectedExecutionHandler:任務(wù)拒絕處理器
? ? ? ? * 兩種情況會(huì)拒絕處理任務(wù):
? ? ? ? ? ? - 當(dāng)線程數(shù)已經(jīng)達(dá)到maxPoolSize,切隊(duì)列已滿,會(huì)拒絕新任務(wù)
? ? ? ? ? ? - 當(dāng)線程池被調(diào)用shutdown()后,會(huì)等待線程池里的任務(wù)執(zhí)行完畢,再shutdown。如果在調(diào)用shutdown()和線程池真正shutdown之間提交任務(wù),會(huì)拒絕新任務(wù)
? ? ? ? * 線程池會(huì)調(diào)用rejectedExecutionHandler來處理這個(gè)任務(wù)。如果沒有設(shè)置默認(rèn)是AbortPolicy,會(huì)拋出異常
? ? ? ? * ThreadPoolExecutor類有幾個(gè)內(nèi)部實(shí)現(xiàn)類來處理這類情況:
? ? ? ? ? ? - AbortPolicy 丟棄任務(wù),拋運(yùn)行時(shí)異常
? ? ? ? ? ? - CallerRunsPolicy 執(zhí)行任務(wù)
? ? ? ? ? ? - DiscardPolicy 忽視,什么都不會(huì)發(fā)生
? ? ? ? ? ? - DiscardOldestPolicy 從隊(duì)列中踢出最先進(jìn)入隊(duì)列(最后一個(gè)執(zhí)行)的任務(wù)
? ? ? ? * 實(shí)現(xiàn)RejectedExecutionHandler接口,可自定義處理器
二、ThreadPoolExecutor執(zhí)行順序
? ? ? ? 線程池按以下行為執(zhí)行任務(wù)
? ? 1. 當(dāng)線程數(shù)小于核心線程數(shù)時(shí),創(chuàng)建線程。
? ? 2. 當(dāng)線程數(shù)大于等于核心線程數(shù),且任務(wù)隊(duì)列未滿時(shí),將任務(wù)放入任務(wù)隊(duì)列。
? ? 3. 當(dāng)線程數(shù)大于等于核心線程數(shù),且任務(wù)隊(duì)列已滿
? ? ? ? - 若線程數(shù)小于最大線程數(shù),創(chuàng)建線程
? ? ? ? - 若線程數(shù)等于最大線程數(shù),拋出異常,拒絕任務(wù)
三、如何設(shè)置參數(shù)
? ? 1、默認(rèn)值
? ? ? ? * corePoolSize=1
? ? ? ? * queueCapacity=Integer.MAX_VALUE
? ? ? ? * maxPoolSize=Integer.MAX_VALUE
? ? ? ? * keepAliveTime=60s
? ? ? ? * allowCoreThreadTimeout=false
? ? ? ? * rejectedExecutionHandler=AbortPolicy()
? ? 2、如何來設(shè)置
? ? ? ? * 需要根據(jù)幾個(gè)值來決定
? ? ? ? ? ? - tasks :每秒的任務(wù)數(shù),假設(shè)為500~1000
? ? ? ? ? ? - taskcost:每個(gè)任務(wù)花費(fèi)時(shí)間,假設(shè)為0.1s
? ? ? ? ? ? - responsetime:系統(tǒng)允許容忍的最大響應(yīng)時(shí)間,假設(shè)為1s
? ? ? ? * 做幾個(gè)計(jì)算
? ? ? ? ? ? - corePoolSize = 每秒需要多少個(gè)線程處理??
? ? ? ? ? ? ? ? * threadcount = tasks/(1/taskcost) =tasks*taskcout =? (500~1000)*0.1 = 50~100 個(gè)線程。corePoolSize設(shè)置應(yīng)該大于50
? ? ? ? ? ? ? ? * 根據(jù)8020原則,如果80%的每秒任務(wù)數(shù)小于800,那么corePoolSize設(shè)置為80即可
? ? ? ? ? ? - queueCapacity = (coreSizePool/taskcost)*responsetime
? ? ? ? ? ? ? ? * 計(jì)算可得 queueCapacity = 80/0.1*1 = 80。意思是隊(duì)列里的線程可以等待1s,超過了的需要新開線程來執(zhí)行
? ? ? ? ? ? ? ? * 切記不能設(shè)置為Integer.MAX_VALUE,這樣隊(duì)列會(huì)很大,線程數(shù)只會(huì)保持在corePoolSize大小,當(dāng)任務(wù)陡增時(shí),不能新開線程來執(zhí)行,響應(yīng)時(shí)間會(huì)隨之陡增。
? ? ? ? ? ? - maxPoolSize = (max(tasks) - queueCapacity)/(1/taskcost)
? ? ? ? ? ? ? ? * 計(jì)算可得 maxPoolSize = (1000-80)/10 = 92
? ? ? ? ? ? ? ? * (最大任務(wù)數(shù)-隊(duì)列容量)/每個(gè)線程每秒處理能力 = 最大線程數(shù)
? ? ? ? ? ? - rejectedExecutionHandler:根據(jù)具體情況來決定,任務(wù)不重要可丟棄,任務(wù)重要?jiǎng)t要利用一些緩沖機(jī)制來處理
? ? ? ? ? ? - keepAliveTime和allowCoreThreadTimeout采用默認(rèn)通常能滿足
? ? 3、 以上都是理想值,實(shí)際情況下要根據(jù)機(jī)器性能來決定。如果在未達(dá)到最大線程數(shù)的情況機(jī)器cpu load已經(jīng)滿了,則需要通過升級(jí)硬件(呵呵)和優(yōu)化代碼,降低taskcost來處理。
線程池代碼
public class ThreadPoolExecutor extends AbstractExecutorService {.....public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue);public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);... }從上面的代碼可以得知,ThreadPoolExecutor繼承了AbstractExecutorService類,并提供了四個(gè)構(gòu)造器,事實(shí)上,通過觀察每個(gè)構(gòu)造器的源碼具體實(shí)現(xiàn),發(fā)現(xiàn)前面三個(gè)構(gòu)造器都是調(diào)用的第四個(gè)構(gòu)造器進(jìn)行的初始化工作。
? 各個(gè)參數(shù)的含義:
- corePoolSize:核心池的大小,這個(gè)參數(shù)跟后面講述的線程池的實(shí)現(xiàn)原理有非常大的關(guān)系。在創(chuàng)建了線程池后,默認(rèn)情況下,線程池中并沒有任何線程,而是等待有任務(wù)到來才創(chuàng)建線程去執(zhí)行任務(wù),除非調(diào)用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個(gè)方法的名字就可以看出,是預(yù)創(chuàng)建線程的意思,即在沒有任務(wù)到來之前就創(chuàng)建corePoolSize個(gè)線程或者一個(gè)線程。默認(rèn)情況下,在創(chuàng)建了線程池后,線程池中的線程數(shù)為0,當(dāng)有任務(wù)來之后,就會(huì)創(chuàng)建一個(gè)線程去執(zhí)行任務(wù),當(dāng)線程池中的線程數(shù)目達(dá)到corePoolSize后,就會(huì)把到達(dá)的任務(wù)放到緩存隊(duì)列當(dāng)中;
- maximumPoolSize:線程池最大線程數(shù),這個(gè)參數(shù)也是一個(gè)非常重要的參數(shù),它表示在線程池中最多能創(chuàng)建多少個(gè)線程;
- keepAliveTime:表示線程沒有任務(wù)執(zhí)行時(shí)最多保持多久時(shí)間會(huì)終止。默認(rèn)情況下,只有當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí),keepAliveTime才會(huì)起作用,直到線程池中的線程數(shù)不大于corePoolSize,即當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí),如果一個(gè)線程空閑的時(shí)間達(dá)到keepAliveTime,則會(huì)終止,直到線程池中的線程數(shù)不超過corePoolSize。但是如果調(diào)用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數(shù)不大于corePoolSize時(shí),keepAliveTime參數(shù)也會(huì)起作用,直到線程池中的線程數(shù)為0;
- unit:參數(shù)keepAliveTime的時(shí)間單位,有7種取值,在TimeUnit類中有7種靜態(tài)屬性:
- workQueue:一個(gè)阻塞隊(duì)列,用來存儲(chǔ)等待執(zhí)行的任務(wù),這個(gè)參數(shù)的選擇也很重要,會(huì)對(duì)線程池的運(yùn)行過程產(chǎn)生重大影響,一般來說,這里的阻塞隊(duì)列有以下幾種選擇:
ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。線程池的排隊(duì)策略與BlockingQueue有關(guān)。
- threadFactory:線程工廠,主要用來創(chuàng)建線程;
- handler:表示當(dāng)拒絕處理任務(wù)時(shí)的策略,有以下四種取值:
最全參數(shù)的線程池:
// 最全參數(shù)的線程池// 參數(shù)順序:核心線程數(shù)、最大線程數(shù)、空閑線程超時(shí)時(shí)間、超時(shí)后線程銷毀、隊(duì)列長(zhǎng)度、線程構(gòu)造器、執(zhí)行任務(wù)
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1),new ThreadFactoryBuilder().setNameFormat("thread-%d").build(), new ThreadPoolExecutor.CallerRunsPolicy());for (int i = 0; i<100; i++) {final int finalI = i;threadPoolExecutor.execute(new Runnable() {@Overridepublic void run() {System.out.println(finalI + ":" + Thread.currentThread().getName() + " is running");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(finalI + ":" + Thread.currentThread().getName() + " is end");}});}threadPoolExecutor.shutdown();
執(zhí)行方法
execute方法無返回參數(shù),如需接收方法執(zhí)行返回參數(shù)可使用callable方法,并用Future接收返回的參數(shù)。
Future<String> future = threadPoolExecutor.callable(...)?
注意事項(xiàng):
Java通過Executors提供四種線程池,分別為:
newCachedThreadPool創(chuàng)建一個(gè)可緩存線程池,如果線程池長(zhǎng)度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
newFixedThreadPool 創(chuàng)建一個(gè)定長(zhǎng)線程池,可控制線程最大并發(fā)數(shù),超出的線程會(huì)在隊(duì)列中等待。
newScheduledThreadPool 創(chuàng)建一個(gè)定長(zhǎng)線程池,支持定時(shí)及周期性任務(wù)執(zhí)行。
newSingleThreadExecutor 創(chuàng)建一個(gè)單線程化的線程池,它只會(huì)用唯一的工作線程來執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級(jí))執(zhí)行。
一般場(chǎng)景也可以采用以上方法,但是建議還是采用自己配置參數(shù)的方法,這樣可以使得線程池可控,自己掌握。
轉(zhuǎn)載于:https://www.cnblogs.com/loveyaoyao/p/10217253.html
總結(jié)
以上是生活随笔為你收集整理的java 线程池ThreadPoolExecutor的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 功能扩展
- 下一篇: Docker在Linux上运行NetCo