Java 并发:Executor ExecutorService ThreadPoolExecutor
Executor
Executor僅僅是一個簡單的接口,其定義如下
public interface Executor {void execute(Runnable command); }作為一個簡單的線程池的話,實現(xiàn)這個接口就可以使用了。不過單單這樣的話,無法使用Future功能。
ExecutorService
public interface ExecutorService extends Executor {void shutdown();List<Runnable> shutdownNow();boolean isShutdown();boolean isTerminated();boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;<T> Future<T> submit(Callable<T> task);<T> Future<T> submit(Runnable task, T result);Future<?> submit(Runnable task);<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }ExecutorService接口擴展了Executor接口,加入了線程池生命周期的管理,還加入了Future功能。除了提交runnable外還可以使用Callable用于返回結(jié)果的任務(wù)。這里要注意execute和submit的區(qū)別,execute是用于實現(xiàn)Executor接口的,submit則提供了任務(wù)的Future機制,submit的實現(xiàn)是基于線程池execute基本功能的。實際上Future機制的大部分代碼都在FutureTask這個類里,反倒和線程池關(guān)系不大。
AbstractExecutorService
AbstractExecutorService實現(xiàn)了部分invoke系列接口和submit系列接口,它們都依賴子類實現(xiàn)的execute方法。這也說明實現(xiàn)線程池關(guān)鍵是提供管理其生命周期和執(zhí)行任務(wù)的接口,至于submit提供的Future機制可以基于這些很快的實現(xiàn),比如:
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);}public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}ThreadPoolExecutor
ThreadPoolExecutor是JDK中主要線程池的一個實現(xiàn),提供了多種不同的構(gòu)造函數(shù)可以依據(jù)參數(shù)得到不同特性的線程池,它對于的工廠方法類為Executors。
組成成分
ThreadPoolExecutor從其工廠函數(shù)就可以大致看出各個組成成分,具體如下
任務(wù)隊列
通過execute提交的任務(wù)(submit操作最后也通過execute進行任務(wù)執(zhí)行),會有可能先進入任務(wù)隊列而不是立即被線程池執(zhí)行。這依賴于當(dāng)前的線程池狀態(tài)和設(shè)定的參數(shù),如果當(dāng)前創(chuàng)建的線程數(shù)尚未達到corePoolSize那么會立即創(chuàng)建一個線程,否則則會嘗試加入隊列之中。
線程集合
作為一個線程池,它肯定需要創(chuàng)建線程,并保存這些線程的狀態(tài)信息。因為線程池內(nèi)的線程是專門用來運行提交的Runnable活著Callable任務(wù)的,他們除了維護狀態(tài)信息外基本不會為自己干點什么,一般這樣的線程叫做worker,或工作者線程。在內(nèi)部使用HashSet保存這些線程對象。
線程工廠
可以按照需要定制thread對象,比如設(shè)置線程池內(nèi)線程名稱,調(diào)整daemon屬性等。
拒絕策略
如果線程池處理數(shù)量達到上限(隊列已滿且已有線程數(shù)達到maximumPoolSize)則開始拒絕任務(wù),相當(dāng)于提供了一個鉤子函數(shù)
池內(nèi)線程
線程池內(nèi)的線程除了運行用戶提交的任務(wù)外,還需要維護自己的一些狀態(tài)信息。這JDK的實現(xiàn)中工作者線程運行邏輯用一個實現(xiàn)了Runnable接口并繼承了AbstractQueuedSynchronizer的一個類Worker來表示。它沒有繼承Thread類,而只是實現(xiàn)了Runnable接口,具體創(chuàng)建線程的過程交給了用戶可以自己定制的ThreadFactory線程工廠。
Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}線程生成
線程的生成均由addWorker這個函數(shù)進行,從線程池創(chuàng)建后,線程生成主要發(fā)生在幾種情況下:
- 提交了一個任務(wù)并且當(dāng)前線程數(shù)量小于corePoolSize(不管是否有空閑著的線程)
- 提交了一個任務(wù)并且(a.沒有線程空閑 & b.任務(wù)隊列無法添加任務(wù) & c.任務(wù)數(shù)小于設(shè)定的最大值)
- 調(diào)用了prestartAllCoreThreads,由于在創(chuàng)建后生成所有corePoolSize數(shù)量的線程
- 調(diào)用了prestartCoreThread,用于在創(chuàng)建后生成一個線程,如果數(shù)量已經(jīng)達到corePoolSize則忽略
后面兩個方法可以方便的來預(yù)熱線程池。如上述給出的Worker構(gòu)造函數(shù)可知它又一個參數(shù)叫做firstTask,這是因為一般情況下線程的創(chuàng)建都是因為有任務(wù)提交引起的(也就是說一個線程池創(chuàng)建后并不會馬上產(chǎn)生指定池大小數(shù)量的線程),firstTask是該Worker線程第一個運行的任務(wù)。當(dāng)Worker線程運行完第一個任務(wù)后,它獲取新任務(wù)的方式就發(fā)生了改變,它會阻塞在任務(wù)隊列上,等待新任務(wù)的到來,firstTask基本就不再使用了。
為什么要采取這樣的方式?如果線程池是一個固定大小的,一創(chuàng)建后立即生成所有工作者線程的這樣的一種實現(xiàn),就完全可以把任務(wù)放到隊列中,所有的Worker線程都從隊列里獲取要執(zhí)行的任務(wù)。但JDK里實現(xiàn)支持動態(tài)的添加工作者線程,新創(chuàng)建的線程總是運行剛剛使得它被創(chuàng)建的那個任務(wù)提交。如果放到隊列里的話還要進行等待其他的任務(wù)先被執(zhí)行。不過這么說也有些牽強,也未必后到得任務(wù)就更重要,反而讓前面排隊的任務(wù)等著。
線程運行
Worker對象實現(xiàn)了runnable接口由ThreadFactory給出的Thread對象負責(zé)正真的執(zhí)行(Thread.start),然后再Worker的run方法中會去執(zhí)行它接收到得任務(wù),
public void run() {runWorker(this);}關(guān)鍵過程如runWorker函數(shù):
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}- task = getTask()就是從任務(wù)隊列取任務(wù)的過程,如果沒有任務(wù)的話會阻塞著,線程是處于WAITING狀態(tài),也即空閑狀態(tài)
- task.run()即運行提交的任務(wù)
- w.completedTasks++每個worker都會統(tǒng)計自己運行的任務(wù)數(shù),通過線程池可以獲得一個大概的總數(shù),之所以是大概因為執(zhí)行的任務(wù)時刻會完成,也沒必要用鎖去保證這個數(shù)字。
beforeExecute和afterExecute是線程池的鉤子函數(shù)可以被子類覆蓋用于實現(xiàn)一些統(tǒng)計功能,但要注意這些是會被線程池內(nèi)不同線程執(zhí)行,所以一般要用到threadlocal機制。
線程狀態(tài)
為什么Worker要繼承AbstractQueuedSynchronizer,因為它要維護自己的一個狀態(tài)變更過程,而且是要支持等待的,其實用一般的lock也可以,不過可能doug lea覺得沒必要再隔一層吧(lock也是用AQS實現(xiàn)的)。狀態(tài)值有下面幾個:- -1 Worker對象被創(chuàng)建但還沒有對應(yīng)的Thread開始運行它,初始化時設(shè)置
- 0 已經(jīng)有對應(yīng)的Thread運行Worker的run方法,但沒有在運行Worker接收的任務(wù)內(nèi)容,worker.unlock(),表示當(dāng)前線程空閑
- 1 正在運行任務(wù),worker.lock()
實際上用到worker.lock/tryLock的地方并不多,一個是在runWorker內(nèi)部,一個就在interruptIdleWorkers這里。這里不得不提下shutdown方法它負責(zé)把線程池關(guān)掉,但是并不是很暴力,只是讓隊列停止接收任務(wù),而讓已經(jīng)執(zhí)行的任務(wù)繼續(xù)直到所有已提交任務(wù)被完成。所以在這里要有選擇的interrupt線程,即選擇那些處于idle狀態(tài)的線程。而idle狀態(tài)的線程按照前面的設(shè)定就是狀態(tài)值為0/-1,即可以獲得lock的那些線程。
主要過程
任務(wù)提交
任務(wù)提交會導(dǎo)致新德工作者線程生成
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}線程退出
工作線程的退出有一下幾個原因:
線程池設(shè)定了allowCoreThreadTimeOut = true,并且獲取任務(wù)等待時長超過keepAliveTime
任務(wù)異常
當(dāng)工作線程執(zhí)行的任務(wù)拋出異常時,工作者線程會退出。當(dāng)時在完全退出前會執(zhí)行processWorkerExit(w, completedAbruptly);,有異常拋出時completedAbruptly為true,所以在該函數(shù)中如果發(fā)現(xiàn)當(dāng)前工作者線程是因為異常而退出的會嘗試著再次執(zhí)行一個addWorker調(diào)用來補上這個要退出的線程。可以發(fā)現(xiàn)如果此時線程池要關(guān)閉或者線程數(shù)量已經(jīng)超過當(dāng)前條件的最小值則不進行線程補充。這個最小值的產(chǎn)生很微妙。
線程池收縮
通過設(shè)定合適的keepAliveTime可以讓線程池多余corePoolSize的線程在一定時間后主動退出,實現(xiàn)線程池的動態(tài)收縮,如果設(shè)定了allowCoreThreadTimeOut = true的話連core線程也可以自動退出,直到一個線程都沒有,從getTask觀察得到:
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}boolean timed; // Are workers subject to culling?for (;;) {int wc = workerCountOf(c);timed = allowCoreThreadTimeOut || wc > corePoolSize;if (wc <= maximumPoolSize && ! (timedOut && timed))break;if (compareAndDecrementWorkerCount(c))return null;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}可以發(fā)現(xiàn)當(dāng)需要自動收縮時通過帶有超時參數(shù)的poll函數(shù)去取得任務(wù)隊列(workQueue)內(nèi)的任務(wù),而一般情況下則使用take調(diào)用無限阻塞。
通過返回一個null值可以使得runWorker中的循環(huán)退出轉(zhuǎn)而執(zhí)行processWorkerExit,注意在Worker線程完全退出前已經(jīng)通過compareAndDecrementWorkerCount將當(dāng)前Worker線程的數(shù)量給減少了,因為直到收到null后的工作線程循環(huán)肯定會馬上退出不再處理后續(xù)任務(wù)了,這也是為什么在前面processWorkerExit函數(shù)內(nèi)要選擇性的進行計數(shù)減的原因。
shutdown
如前面提到的shutdown是一種溫和的關(guān)閉線程池的方式,它不會去interrupt已在運行任務(wù)的線程。
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(SHUTDOWN);interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate();}它使用了interruptIdleWorkers去interrupt那些處于idle狀態(tài)的工作者線程。一旦線程中的任務(wù)響應(yīng)了interrupt請求或主動退出或拋出InterruptedException都會使得工作者線程退出執(zhí)行processWorkerExit方法并進而調(diào)用tryTerminate,使其在平時調(diào)用processWorkerExit時也會執(zhí)行tryTerminate不過不必慌張,因為處于運行狀態(tài)的線程池后綴不做什么立即返回。
final void tryTerminate() {for (;;) {int c = ctl.get();if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;if (workerCountOf(c) != 0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;}final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {terminated();} finally {ctl.set(ctlOf(TERMINATED, 0));termination.signalAll();}return;}} finally {mainLock.unlock();}// else retry on failed CAS}}在(workerCountOf(c) != 0)時即還有任務(wù)運行時都會嘗試著去interrupt其中的空閑工作者線程(而這些線程退出又會執(zhí)行tryTerminate方法,形成一個鏈式的傳遞)。而那些正在執(zhí)行任務(wù)的工作者線程,雖然現(xiàn)在不能去中斷他們,但在在完成任務(wù)后它們會發(fā)現(xiàn)線程池已經(jīng)處于要關(guān)閉的狀態(tài)也會主動退出。當(dāng)所有的工作者線程都退出時執(zhí)行termination.signalAll();喚醒在termination條件隊列上等的線程。一般是通過調(diào)用awaitTermination方法等待線程池完全退出。
轉(zhuǎn)載于:https://www.cnblogs.com/lailailai/p/4651930.html
總結(jié)
以上是生活随笔為你收集整理的Java 并发:Executor ExecutorService ThreadPoolExecutor的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。