Executor家族的辨析
先說說使用線程池的好處,比如可以控制線程的數量,節省反復創建線程和銷毀線程的開銷等,在開發中的使用,一般來說任務量肯定是大于線程數量的,而為了防止出現OOM,都是建議設置相對應業務的合適線程數量。那是在線程池中線程就只有那些,肯定是要做到線程的重復利用,才能執行超過線程的任務量的,那么線程池是怎么做到"線程復用"的呢?
從源碼開始逐步分析,從execute()方法開始
//原子變量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));public void execute(Runnable command) {//判斷提交的Runnable 任務,如果為null,則報NullPointerExceptionif (command == null)throw new NullPointerException(); int c = ctl.get();//判斷當前線程數是否小于核心線程數,如果小于,那就調用addWorker方法新增一個Worker,也可以理解成一個線程if (workerCountOf(c) < corePoolSize) {//addWorker這個方法主要要做的事就是執行command,同時第二個參數決定以哪個界限來進行是否新增線程的判斷,傳入true則代表以核心線程數為判斷條件if (addWorker(command, true))return;c = ctl.get();}//走到這步邏輯,則說明線程數大于等于核心線程數,或者addWorker方法調用失敗了,這時就判斷線程池是否是Running狀態,如果是就調用offer方法提交線程任務到任務隊列中if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();//如果線程池狀態不是Running,說明線程池已經被關閉,這時就移除新提交到隊列中的任務if (! isRunning(recheck) && remove(command))//執行拒絕策略reject(command);//檢查下當前線程數是不是為0,為0的話就沒有線程執行任務了 else if (workerCountOf(recheck) == 0)//所以就通過addWorker新建一個線程addWorker(null, false);}//走到這步邏輯,要么是線程池狀態不是Running,說明已經關閉了,要么就是添加任務進任務隊列時失敗了,說明任務隊列滿了,這時候就該添加最大線程數了,傳入false則代表以最大線程數為判斷條件else if (!addWorker(command, false))//以上addWorker方法如果返回結果是false,就會執行拒絕策略了reject(command);}以上粗略的過了下execute()方法的源碼,可以發現有個addWorker()方法無處不在,同時它也有新建線程和運行任務的作用,所以重點看下
addWorker()方法源碼?
private boolean addWorker(Runnable firstTask, boolean core) {//可以當做一個標記位retry://沒有主動跳出就會無限的循環for (;;) {int c = ctl.get();int rs = runStateOf(c);//做校驗,比如rs >= SHUTDOWN說明線程池狀態不正常,比如rs == SHUTDOWN說明線程池關閉了, firstTask == null說明無任務執行等,就直接返回falseif (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;//沒有主動跳出就會無限的循環for (;;) {//獲取當前線程數int wc = workerCountOf(c);//判斷下當前線程數是不是超過規定了,是就直接返回falseif (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//CAS操作,比較并做+1操作if (compareAndIncrementWorkerCount(c))//失敗了的話就直接跳出循環到retry那里,且不再進循環了break retry; c = ctl.get(); // Re-read ctl// 如果線程池狀態被更改的話if (runStateOf(c) != rs)//直接跳出循環到retry那里,會再次進循環continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//創建一個Worker對象,以Runnable實例對象firstTask為參數w = new Worker(firstTask);//拿到線程對象final Thread t = w.thread;//線程對象不為空的話if (t != null) {//定義個鎖final ReentrantLock mainLock = this.mainLock;//加鎖mainLock.lock();try {//獲取線程池狀態int rs = runStateOf(ctl.get());//做校驗,不符合則拋IllegalThreadStateException異常if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();//否則把w對象加到workers中,workers是HashSet類型的對象 workers.add(w);int s = workers.size();//比較下,再賦個值if (s > largestPoolSize)largestPoolSize = s;//修改標志,說明添加成功了workerAdded = true;}} finally {//解鎖mainLock.unlock();}//如果添加成功了if (workerAdded) {//就調用start方法執行任務,這個start做的就是運行任務的事了t.start();//修改標志位workerStarted = true;}}} finally {//最后處理下那些,添加成功了,但是沒有開始執行的任務if (! workerStarted)//這個方法主要作用就是把以上任務移除掉addWorkerFailed(w);}//返回運行結果return workerStarted;}看完這部分,其實還是沒發現線程池到底是怎么做到線程復用的,不過可以發現有個關鍵類,接下來就看看它的相關源碼:
Worker類的相關源碼?
//可以發現Worker實現了Runnable private final class Worker extends AbstractQueuedSynchronizerimplements Runnable{//這是Worker類的構造方法 Worker(Runnable firstTask) {//設置一下狀態位state,和AQS有關setState(-1); // inhibit interrupts until runWorker//賦值this.firstTask = firstTask;//拿到線程工廠,并創建一個線程,然后很騷氣的把Worker對象做為參數去做線程的初始化this.thread = getThreadFactory().newThread(this);} }實現了Runnable接口,那必然是要重寫run方法的,看看Worker類的run方法:?
public void run() {runWorker(this); }看看runWorker方法源碼:?
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;//設置為null,幫助gc回收w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {//這步做的通過取Worker對象的firstTask或者通過getTask方法從工作隊列中獲取待執行的任務,只要不為null,就一直會循環while (task != null || (task = getTask()) != null) {w.lock();if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {//然后直接調用task的run方法來執行具體的任務task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}分析到這里,關鍵出現在task.run()上,首先我們知道task是Runnable類型的,直接調用run方法的話,JVM是不會幫我們去生成新線程的,就像和調用普通方法一樣,所以一個線程始終都會在whlie循環的邏輯中不斷的被重復利用,然后去取Worker對象的firstTask或者通過getTask方法從工作隊列中獲取待執行的任務,再直接調用Runnable類型的run方法執行任務。
總結
簡單的從源碼來總結下,執行任務先調用的是t.start()方法,這個t是個Thread對象,而這個Thread對象則是從Worker對象里獲得的,在Worker在做初始化時就會賦值Thread,同時Worker初始化Thread對象時又是以自己作為參數來完成,而Worker對象又是個實現了Runnable接口的類,那Worker對象就肯定有自己的run方法,所以t.start()方法真正意義上調用的是Worker對象中重寫的run方法,而這個Worker對象中的run方法里沒有所謂的本地start方法,JVM自然不會再創建新的線程,而是把它當普通方法一樣執行。再加上whlie循環體,這樣就做到了Worker對象新建的線程始終都會在一個大循環里,而這個線程會反復的獲取任務,接著執行任務,知道任務都執行完畢,這就是線程池實現“線程復用”的原理。
?
總結
以上是生活随笔為你收集整理的Executor家族的辨析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 五虎上将收服线程池
- 下一篇: 线程池状态和使用注意点