java 线程池 -- (Java并发)
池技術是作為一個架構師必須深刻理解的東西,比如線程池,連接池,對象池,內存池等。
首先需要問一個問題:在c/c++ 編程中,你是如何操作一個任務的或者給一個線程添加任務的?如果你很清楚,那么你知道Java中runnable和thread的區別嗎?
Java中沒有指針,那么沒有辦法給一個線程直接傳方法指針(傳遞方法,作為任務),那么直接可以傳對象,Runnable對象。線程拿到runnable對象的時候,就會調用run()方法。
Java中的線程池其實沒有什么高深的思想的,無非就是線程管理的算法。為什么需要線程池?????
說白了,就是存活一定數量的線程,大量并發任務交給他們處理,線程數量可控,線程不是動態的創建銷毀,而是一直存活在池中。
在Java中,如果每當一個請求到達就創建一個新線程,開銷是相當大的。在實際使用中,每個請求創建新線程的服務器在創建和銷毀線程上花費的時間和消耗的系統資源,甚至可能要比花在處理實際的用戶請求的時間和資源要多得多。除了創建和銷毀線程的開銷之外,活動的線程也需要消耗系統資源。如果在一個JVM里創建太多的線程,可能會導致系統由于過度消耗內存或“切換過度”而導致系統資源不足。為了防止資源不足,服務器應用程序需要一些辦法來限制任何給定時刻處理的請求數目,盡可能減少創建和銷毀線程的次數,特別是一些資源耗費比較大的線程的創建和銷毀,盡量利用已有對象來進行服務,這就是“池化資源”技術產生的原因。?
這篇文章寫了好久,回頭看看,當時對于線程池的理解雖然都是基于別人的話語中,但是主要的點還是說到了;比如線程池避免線程的過度創建(對于系統的穩定性危害極大),避免只針對任務的執行創建和銷毀線程,這樣對系統資源就是很大的浪費;線程的創建和銷毀是很耗資源的(有時甚至比你處理任務的消耗的內存和計算還要多,而且響應時間也延時嚴重)。對于針對任務,設計線程池的線程數目大小也是線程池設計極為關鍵的考量。對于軟件的掌握深度,其實不容易,在于你操作的經驗,在于你對于底層的理解深度;不然你掌握的效率有極大的折扣,學習任何一門科學都是這樣,這就是在沒有深度的條件下學習,效率就是回環線,收獲很小。
線程池主要用來解決線程生命周期開銷問題和資源不足問題。通過對多個任務重用線程,線程創建的開銷就被分攤到了多個任務上了,而且由于在請求到達時線程已經存在,所以消除了線程創建所帶來的延遲。這樣,就可以立即為請求服務,使應用程序響應更快。另外,通過適當地調整線程池中的線程數目可以防止出現資源不足的情況。缺點:當任務較少的時候還是每一個任務對應創建一個線程,任務較大的時候,有大量的任務在等待。?
說說Java中的java.util.concurrent.ThreadPoolExecutor:
java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類,因此如果要透徹地了解Java中的線程池,必須先了解這個類。下面我們來看一下ThreadPoolExecutor類的具體實現源碼。
在ThreadPoolExecutor類中提供了四個構造方法:
public class ThreadPoolExecutor extends AbstractExecutorService {.....public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue);public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);... }從上面的代碼可以得知,ThreadPoolExecutor繼承了AbstractExecutorService類,并提供了四個構造器,事實上,通過觀察每個構造器的源碼具體實現,發現前面三個構造器都是調用的第四個構造器進行的初始化工作。
? 下面解釋下一下構造器中各個參數的含義:
- corePoolSize:核心池的大小,這個參數跟后面講述的線程池的實現原理有非常大的關系。在創建了線程池后,默認情況下,線程池中并沒有任何線程,而是等待有任務到來才創建線程去執行任務,除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預創建線程的意思,即在沒有任務到來之前就創建corePoolSize個線程或者一個線程。默認情況下,在創建了線程池后,線程池中的線程數為0,當有任務來之后,就會創建一個線程去執行任務,當線程池中的線程數目達到corePoolSize后,就會把到達的任務放到緩存隊列當中;
- maximumPoolSize:線程池最大線程數,這個參數也是一個非常重要的參數,它表示在線程池中最多能創建多少個線程;
- keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止。默認情況下,只有當線程池中的線程數大于corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數不大于corePoolSize,即當線程池中的線程數大于corePoolSize時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。但是如果調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大于corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的線程數為0;
- unit:參數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性:
- workQueue:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運行過程產生重大影響,一般來說,這里的阻塞隊列有以下幾種選擇:
ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。線程池的排隊策略與BlockingQueue有關。
- threadFactory:線程工廠,主要用來創建線程;
- handler:表示當拒絕處理任務時的策略,有以下四種取值:
? 具體參數的配置與線程池的關系將在下一節講述。
從上面給出的ThreadPoolExecutor類的代碼可以知道,ThreadPoolExecutor繼承了AbstractExecutorService,我們來看一下AbstractExecutorService的實現:
public abstract class AbstractExecutorService implements ExecutorService {protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };public Future<?> submit(Runnable task) {};public <T> Future<T> submit(Runnable task, T result) { };public <T> Future<T> submit(Callable<T> task) { };private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos)throws InterruptedException, ExecutionException, TimeoutException {};public <T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException {};public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {};public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException {};public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException {}; }AbstractExecutorService是一個抽象類,它實現了ExecutorService接口。
我們接著看ExecutorService接口的實現:
public interface ExecutorService extends Executor {void shutdown();boolean isShutdown();boolean isTerminated();boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException;<T> Future<T> submit(Callable<T> task);<T> Future<T> submit(Runnable task, T result);Future<?> submit(Runnable task);<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException;<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException;<T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException; }而ExecutorService又是繼承了Executor接口,我們看一下Executor接口的實現:
public interface Executor {void execute(Runnable command); }到這里,大家應該明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor幾個之間的關系了。
Executor是一個頂層接口,在它里面只聲明了一個方法execute(Runnable),返回值為void,參數為Runnable類型,從字面意思可以理解,就是用來執行傳進去的任務的;
然后ExecutorService接口繼承了Executor接口,并聲明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
抽象類AbstractExecutorService實現了ExecutorService接口,基本實現了ExecutorService中聲明的所有方法;
然后ThreadPoolExecutor繼承了類AbstractExecutorService。
在ThreadPoolExecutor類中有幾個非常重要的方法:
execute() submit() shutdown() shutdownNow()?
execute()方法實際上是Executor中聲明的方法,在ThreadPoolExecutor進行了具體的實現,這個方法是ThreadPoolExecutor的核心方法,通過這個方法可以向線程池提交一個任務,交由線程池去執行。
submit()方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經有了具體的實現,在ThreadPoolExecutor中并沒有對其進行重寫,這個方法也是用來向線程池提交任務的,但是它和execute()方法不同,它能夠返回任務執行的結果,去看submit()方法的實現,會發現它實際上還是調用的execute()方法,只不過它利用了Future來獲取任務執行結果(Future相關內容將在下一篇講述)。
shutdown()和shutdownNow()是用來關閉線程池的。
還有很多其他的方法:
比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等獲取與線程池相關屬性的方法,有興趣的朋友可以自行查閱API。
-----------------------------至此 Java.util.ThreadPoolExecutor 源碼解析完成--------------------------------
其實現原理:
1.線程池狀態
在ThreadPoolExecutor中定義了一個volatile變量,另外定義了幾個static final變量表示線程池的各個狀態:
2.任務的執行
在了解將任務提交給線程池到任務執行完畢整個過程之前,我們先來看一下ThreadPoolExecutor類中其他的一些比較重要成員變量:
private final BlockingQueue<Runnable> workQueue; //任務緩存隊列,用來存放等待執行的任務 private final ReentrantLock mainLock = new ReentrantLock(); //線程池的主要狀態鎖,對線程池狀態(比如線程池大小//、runState等)的改變都要使用這個鎖 private final HashSet<Worker> workers = new HashSet<Worker>(); //用來存放工作集private volatile long keepAliveTime; //線程存活時間 private volatile boolean allowCoreThreadTimeOut; //是否允許為核心線程設置存活時間 private volatile int corePoolSize; //核心池的大小(即線程池中的線程數目大于這個參數時,提交的任務會被放進任務緩存隊列) private volatile int maximumPoolSize; //線程池最大能容忍的線程數private volatile int poolSize; //線程池中當前的線程數private volatile RejectedExecutionHandler handler; //任務拒絕策略private volatile ThreadFactory threadFactory; //線程工廠,用來創建線程private int largestPoolSize; //用來記錄線程池中曾經出現過的最大線程數private long completedTaskCount; //用來記錄已經執行完畢的任務個數 corePoolSize在很多地方被翻譯成核心池大小,其實我的理解這個就是線程池的大小。舉個簡單的例子:假如有一個工廠,工廠里面有10個工人,每個工人同時只能做一件任務。
因此只要當10個工人中有工人是空閑的,來了任務就分配給空閑的工人做;當10個工人都有任務在做時,如果還來了任務,就把任務進行排隊等待;
如果說新任務數目增長的速度遠遠大于工人做任務的速度,那么此時工廠主管可能會想補救措施,比如重新招4個臨時工人進來;
然后就將任務也分配給這4個臨時工人做;如果說著14個工人做任務的速度還是不夠,此時工廠主管可能就要考慮不再接收新的任務或者拋棄前面的一些任務了。
當這14個工人當中有人空閑時,而新任務增長的速度又比較緩慢,工廠主管可能就考慮辭掉4個臨時工了,只保持原來的10個工人,畢竟請額外的工人是要花錢的。
這個例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。
也就是說corePoolSize就是線程池大小,maximumPoolSize在我看來是線程池的一種補救措施,即任務量突然過大時的一種補救措施。
不過為了方便理解,在本文后面還是將corePoolSize翻譯成核心池大小。
largestPoolSize只是一個用來起記錄作用的變量,用來記錄線程池中曾經有過的最大線程數目,跟線程池的容量沒有任何關系。
下面我們進入正題,看一下任務從提交到最終執行完畢經歷了哪些過程。
在ThreadPoolExecutor類中,最核心的任務提交方法是execute()方法,雖然通過submit也可以提交任務,但是實際上submit方法里面最終調用的還是execute()方法,所以我們只需要研究execute()方法的實現原理即可:
上面的代碼可能看起來不是那么容易理解,下面我們一句一句解釋:
首先,判斷提交的任務command是否為null,若是null,則拋出空指針異常;
接著是這句,這句要好好理解一下:
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))? ?由于是或條件運算符,所以先計算前半部分的值,如果線程池中當前線程數不小于核心池大小,那么就會直接進入下面的if語句塊了。
如果線程池中當前線程數小于核心池大小,則接著執行后半部分,也就是執行:
addIfUnderCorePoolSize(command)如果執行完addIfUnderCorePoolSize這個方法返回false,則繼續執行下面的if語句塊,否則整個方法就直接執行完畢了。
如果執行完addIfUnderCorePoolSize這個方法返回false,然后接著判斷:
if (runState == RUNNING && workQueue.offer(command))? 如果當前線程池處于RUNNING狀態,則將任務放入任務緩存隊列;如果當前線程池不處于RUNNING狀態或者任務放入緩存隊列失敗,則執行:
addIfUnderMaximumPoolSize(command)如果執行addIfUnderMaximumPoolSize方法失敗,則執行reject()方法進行任務拒絕處理。
回到前面:
if (runState == RUNNING && workQueue.offer(command))? 這句的執行,如果說當前線程池處于RUNNING狀態且將任務放入任務緩存隊列成功,則繼續進行判斷:
if (runState != RUNNING || poolSize == 0)? 這句判斷是為了防止在將此任務添加進任務緩存隊列的同時其他線程突然調用shutdown或者shutdownNow方法關閉了線程池的一種應急措施。如果是這樣就執行:
ensureQueuedTaskHandled(command)? 進行應急處理,從名字可以看出是保證 添加到任務緩存隊列中的任務得到處理。
我們接著看2個關鍵方法的實現:addIfUnderCorePoolSize和addIfUnderMaximumPoolSize:
private boolean addIfUnderCorePoolSize(Runnable firstTask) {Thread t = null;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (poolSize < corePoolSize && runState == RUNNING)t = addThread(firstTask); //創建線程去執行firstTask任務 } finally {mainLock.unlock();}if (t == null)return false;t.start();return true; }? 這個是addIfUnderCorePoolSize方法的具體實現,從名字可以看出它的意圖就是當低于核心池大小時執行的方法。下面看其具體實現,首先獲取到鎖,因為這地方涉及到線程池狀態的變化,先通過if語句判斷當前線程池中的線程數目是否小于核心池大小,有朋友也許會有疑問:前面在execute()方法中不是已經判斷過了嗎,只有線程池當前線程數目小于核心池大小才會執行addIfUnderCorePoolSize方法的,為何這地方還要繼續判斷?原因很簡單,前面的判斷過程中并沒有加鎖,因此可能在execute方法判斷的時候poolSize小于corePoolSize,而判斷完之后,在其他線程中又向線程池提交了任務,就可能導致poolSize不小于corePoolSize了,所以需要在這個地方繼續判斷。然后接著判斷線程池的狀態是否為RUNNING,原因也很簡單,因為有可能在其他線程中調用了shutdown或者shutdownNow方法。然后就是執行
t = addThread(firstTask);? 這個方法也非常關鍵,傳進去的參數為提交的任務,返回值為Thread類型。然后接著在下面判斷t是否為空,為空則表明創建線程失敗(即poolSize>=corePoolSize或者runState不等于RUNNING),否則調用t.start()方法啟動線程。
我們來看一下addThread方法的實現:
private Thread addThread(Runnable firstTask) {Worker w = new Worker(firstTask);Thread t = threadFactory.newThread(w); //創建一個線程,執行任務 if (t != null) {w.thread = t; //將創建的線程的引用賦值為w的成員變量 workers.add(w);int nt = ++poolSize; //當前線程數加1 if (nt > largestPoolSize)largestPoolSize = nt;}return t; }? 在addThread方法中,首先用提交的任務創建了一個Worker對象,然后調用線程工廠threadFactory創建了一個新的線程t,然后將線程t的引用賦值給了Worker對象的成員變量thread,接著通過workers.add(w)將Worker對象添加到工作集當中。
下面我們看一下Worker類的實現:
private final class Worker implements Runnable {private final ReentrantLock runLock = new ReentrantLock();private Runnable firstTask;volatile long completedTasks;Thread thread;Worker(Runnable firstTask) {this.firstTask = firstTask;}boolean isActive() {return runLock.isLocked();}void interruptIfIdle() {final ReentrantLock runLock = this.runLock;if (runLock.tryLock()) {try {if (thread != Thread.currentThread())thread.interrupt();} finally {runLock.unlock();}}}void interruptNow() {thread.interrupt();}private void runTask(Runnable task) {final ReentrantLock runLock = this.runLock;runLock.lock();try {if (runState < STOP &&Thread.interrupted() &&runState >= STOP)boolean ran = false;beforeExecute(thread, task); //beforeExecute方法是ThreadPoolExecutor類的一個方法,沒有具體實現,用戶可以根據//自己需要重載這個方法和后面的afterExecute方法來進行一些統計信息,比如某個任務的執行時間等 try {task.run();ran = true;afterExecute(task, null);++completedTasks;} catch (RuntimeException ex) {if (!ran)afterExecute(task, ex);throw ex;}} finally {runLock.unlock();}}public void run() {try {Runnable task = firstTask;firstTask = null;while (task != null || (task = getTask()) != null) {runTask(task);task = null;}} finally {workerDone(this); //當任務隊列中沒有任務時,進行清理工作 }} }? 它實際上實現了Runnable接口,因此上面的Thread t = threadFactory.newThread(w);效果跟下面這句的效果基本一樣:
Thread t = new Thread(w);? 相當于傳進去了一個Runnable任務,在線程t中執行這個Runnable。
既然Worker實現了Runnable接口,那么自然最核心的方法便是run()方法了:
public void run() {try {Runnable task = firstTask;firstTask = null;while (task != null || (task = getTask()) != null) {runTask(task);task = null;}} finally {workerDone(this);} }? 從run方法的實現可以看出,它首先執行的是通過構造器傳進來的任務firstTask,在調用runTask()執行完firstTask之后,在while循環里面不斷通過getTask()去取新的任務來執行,那么去哪里取呢?自然是從任務緩存隊列里面去取,getTask是ThreadPoolExecutor類中的方法,并不是Worker類中的方法,下面是getTask方法的實現:
Runnable getTask() {for (;;) {try {int state = runState;if (state > SHUTDOWN)return null;Runnable r;if (state == SHUTDOWN) // Help drain queuer = workQueue.poll();else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //如果線程數大于核心池大小或者允許為核心池線程設置空閑時間,//則通過poll取任務,若等待一定的時間取不到任務,則返回nullr = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);elser = workQueue.take();if (r != null)return r;if (workerCanExit()) { //如果沒取到任務,即r為null,則判斷當前的worker是否可以退出if (runState >= SHUTDOWN) // Wake up othersinterruptIdleWorkers(); //中斷處于空閑狀態的workerreturn null;}// Else retry} catch (InterruptedException ie) {// On interruption, re-check runState}} }? ? 在getTask中,先判斷當前線程池狀態,如果runState大于SHUTDOWN(即為STOP或者TERMINATED),則直接返回null。
如果runState為SHUTDOWN或者RUNNING,則從任務緩存隊列取任務。
如果當前線程池的線程數大于核心池大小corePoolSize或者允許為核心池中的線程設置空閑存活時間,則調用poll(time,timeUnit)來取任務,這個方法會等待一定的時間,如果取不到任務就返回null。
然后判斷取到的任務r是否為null,為null則通過調用workerCanExit()方法來判斷當前worker是否可以退出,我們看一下workerCanExit()的實現:
private boolean workerCanExit() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();boolean canExit;//如果runState大于等于STOP,或者任務緩存隊列為空了//或者 允許為核心池線程設置空閑存活時間并且線程池中的線程數目大于1try {canExit = runState >= STOP ||workQueue.isEmpty() ||(allowCoreThreadTimeOut &&poolSize > Math.max(1, corePoolSize));} finally {mainLock.unlock();}return canExit; }? 也就是說如果線程池處于STOP狀態、或者任務隊列已為空或者允許為核心池線程設置空閑存活時間并且線程數大于1時,允許worker退出。如果允許worker退出,則調用interruptIdleWorkers()中斷處于空閑狀態的worker,我們看一下interruptIdleWorkers()的實現:
void interruptIdleWorkers() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers) //實際上調用的是worker的interruptIfIdle()方法w.interruptIfIdle();} finally {mainLock.unlock();} }? 從實現可以看出,它實際上調用的是worker的interruptIfIdle()方法,在worker的interruptIfIdle()方法中:
void interruptIfIdle() {final ReentrantLock runLock = this.runLock;if (runLock.tryLock()) { //注意這里,是調用tryLock()來獲取鎖的,因為如果當前worker正在執行任務,鎖已經被獲取了,是無法獲取到鎖的 //如果成功獲取了鎖,說明當前worker處于空閑狀態try {if (thread != Thread.currentThread()) thread.interrupt();} finally {runLock.unlock();}} }?? 這里有一個非常巧妙的設計方式,假如我們來設計線程池,可能會有一個任務分派線程,當發現有線程空閑時,就從任務緩存隊列中取一個任務交給空閑線程執行。但是在這里,并沒有采用這樣的方式,因為這樣會要額外地對任務分派線程進行管理,無形地會增加難度和復雜度,這里直接讓執行完任務的線程去任務緩存隊列里面取任務來執行。
? 我們再看addIfUnderMaximumPoolSize方法的實現,這個方法的實現思想和addIfUnderCorePoolSize方法的實現思想非常相似,唯一的區別在于addIfUnderMaximumPoolSize方法是在線程池中的線程數達到了核心池大小并且往任務隊列中添加任務失敗的情況下執行的:
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {Thread t = null;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (poolSize < maximumPoolSize && runState == RUNNING)t = addThread(firstTask);} finally {mainLock.unlock();}if (t == null)return false;t.start();return true; }? 看到沒有,其實它和addIfUnderCorePoolSize方法的實現基本一模一樣,只是if語句判斷條件中的poolSize < maximumPoolSize不同而已。
到這里,大部分朋友應該對任務提交給線程池之后到被執行的整個過程有了一個基本的了解,下面總結一下:
1)首先,要清楚corePoolSize和maximumPoolSize的含義;
2)其次,要知道Worker是用來起到什么作用的;
3)要知道任務提交給線程池之后的處理策略,這里總結一下主要有4點:
- 如果當前線程池中的線程數目小于corePoolSize,則每來一個任務,就會創建一個線程去執行這個任務;
- 如果當前線程池中的線程數目>=corePoolSize(其實就是=coresize),則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閑線程將其取出去執行;若添加失敗(一般來說是任務緩存隊列已滿),則會嘗試創建新的線程去執行這個任務,直到達到maximumPoolSize;
- 如果當前線程池中的線程數目達到maximumPoolSize,且排隊隊列已經滿了,則會采取任務拒絕策略進行處理;
- 如果線程池中的線程數量大于 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大于corePoolSize;如果允許為核心池中的線程設置存活時間,那么核心池中的線程空閑時間超過keepAliveTime,線程也會被終止。
3.線程池中的線程初始化
默認情況下,創建線程池之后,線程池中是沒有線程的,需要提交任務之后才會創建線程。
在實際中如果需要線程池創建之后立即創建線程,可以通過以下兩個方法辦到:
- prestartCoreThread():初始化一個核心線程;
- prestartAllCoreThreads():初始化所有核心線程
下面是這2個方法的實現:
public boolean prestartCoreThread() {return addIfUnderCorePoolSize(null); //注意傳進去的參數是null }public int prestartAllCoreThreads() {int n = 0;while (addIfUnderCorePoolSize(null))//注意傳進去的參數是null++n;return n; }? 注意上面傳進去的參數是null,根據第2小節的分析可知如果傳進去的參數為null,則最后執行線程會阻塞在getTask方法中的
r = workQueue.take();? 即等待任務隊列中有任務。
4.任務緩存隊列及排隊策略
在前面我們多次提到了任務緩存隊列,即workQueue,它用來存放等待執行的任務。
workQueue的類型為BlockingQueue<Runnable>,通常可以取下面三種類型:
1)ArrayBlockingQueue:基于數組的先進先出隊列,此隊列創建時必須指定大小;
2)LinkedBlockingQueue:基于鏈表的先進先出隊列,如果創建時沒有指定此隊列大小,則默認為Integer.MAX_VALUE;
3)synchronousQueue:這個隊列比較特殊,它不會保存提交的任務,而是將直接新建一個線程來執行新來的任務。
5.任務拒絕策略
當線程池的任務緩存隊列已滿并且線程池中的線程數目達到maximumPoolSize,如果還有任務到來就會采取任務拒絕策略,通常有以下四種策略:
ThreadPoolExecutor.AbortPolicy:丟棄任務并拋出RejectedExecutionException異常。 ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。 ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執行任務(重復此過程) ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務6.線程池的關閉
ThreadPoolExecutor提供了兩個方法,用于線程池的關閉,分別是shutdown()和shutdownNow(),其中:
- shutdown():不會立即終止線程池,而是要等所有任務緩存隊列中的任務都執行完后才終止,但再也不會接受新的任務
- shutdownNow():立即終止線程池,并嘗試打斷正在執行的任務,并且清空任務緩存隊列,返回尚未執行的任務
7.線程池容量的動態調整
ThreadPoolExecutor提供了動態調整線程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),
- setCorePoolSize:設置核心池大小
- setMaximumPoolSize:設置線程池最大能創建的線程數目大小
當上述參數從小變大時,ThreadPoolExecutor進行線程賦值,還可能立即創建新的線程來執行任務。
8.??? 合理的配置線程池
要想合理的配置線程池,就必須首先分析任務特性,可以從以下幾個角度來進行分析:
任務性質不同的任務可以用不同規模的線程池分開處理。CPU密集型任務配置盡可能少的線程數量,如配置Ncpu+1個線程的線程池。IO密集型任務則由于需要等待IO操作,線程并不是一直在執行任務,則配置盡可能多的線程,如2*Ncpu。混合型的任務,如果可以拆分,則將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那么分解后執行的吞吐率要高于串行執行的吞吐率,如果這兩個任務執行時間相差太大,則沒必要進行分解。我們可以通過Runtime.getRuntime().availableProcessors()方法獲得當前設備的CPU個數。
優先級不同的任務可以使用優先級隊列PriorityBlockingQueue來處理。它可以讓優先級高的任務先得到執行,需要注意的是如果一直有優先級高的任務提交到隊列里,那么優先級低的任務可能永遠不能執行。
執行時間不同的任務可以交給不同規模的線程池來處理,或者也可以使用優先級隊列,讓執行時間短的任務先執行。
依賴數據庫連接池的任務,因為線程提交SQL后需要等待數據庫返回結果,如果等待的時間越長CPU空閑時間就越長,那么線程數應該設置越大,這樣才能更好的利用CPU。
建議使用有界隊列,有界隊列能增加系統的穩定性和預警能力,可以根據需要設大一點,比如幾千。有一次我們組使用的后臺任務線程池的隊列和線程池全滿了,不斷的拋出拋棄任務的異常,通過排查發現是數據庫出現了問題,導致執行SQL變得非常緩慢,因為后臺任務線程池里的任務全是需要向數據庫查詢和插入數據的,所以導致線程池里的工作線程全部阻塞住,任務積壓在線程池里。如果當時我們設置成無界隊列,線程池的隊列就會越來越多,有可能會撐滿內存,導致整個系統不可用,而不只是后臺任務出現問題。當然我們的系統所有的任務是用的單獨的服務器部署的,而我們使用不同規模的線程池跑不同類型的任務,但是出現這樣問題時也會影響到其他任務。
9.??? 線程池的監控
通過線程池提供的參數進行監控。線程池里有一些屬性在監控線程池的時候可以使用
- taskCount:線程池需要執行的任務數量。
- completedTaskCount:線程池在運行過程中已完成的任務數量。小于或等于taskCount。
- largestPoolSize:線程池曾經創建過的最大線程數量。通過這個數據可以知道線程池是否滿過。如等于線程池的最大大小,則表示線程池曾經滿了。
- getPoolSize:線程池的線程數量。如果線程池不銷毀的話,池里的線程不會自動銷毀,所以這個大小只增不減。
- getActiveCount:獲取活動的線程數。
通過擴展線程池進行監控。通過繼承線程池并重寫線程池的beforeExecute,afterExecute和terminated方法,我們可以在任務執行前,執行后和線程池關閉前干一些事情。如監控任務的平均執行時間,最大執行時間和最小執行時間等。這幾個方法在線程池里是空方法。如:
view sourceprint?| 1 | protected void beforeExecute(Thread t, Runnable r) { } |
總結
以上是生活随笔為你收集整理的java 线程池 -- (Java并发)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 劲舞团抽奖怎样卡时间
- 下一篇: 精灵宝可梦letsgo超梦在哪抓(精灵长