Java多线程(十二)之线程池深入分析(下)
一、數(shù)據(jù)結(jié)構(gòu)與線程構(gòu)造方法
?
由于已經(jīng)看到了ThreadPoolExecutor的源碼,因此很容易就看到了ThreadPoolExecutor線程池的數(shù)據(jù)結(jié)構(gòu)。圖1描述了這種數(shù)據(jù)結(jié)構(gòu)。
圖1 ThreadPoolExecutor 數(shù)據(jù)結(jié)構(gòu)
其實,即使沒有上述圖形描述ThreadPoolExecutor的數(shù)據(jù)結(jié)構(gòu),我們根據(jù)線程池的要求也很能夠猜測出其數(shù)據(jù)結(jié)構(gòu)出來。
- 線程池需要支持多個線程并發(fā)執(zhí)行,因此有一個線程集合Collection<Thread>來執(zhí)行線程任務(wù);
- 涉及任務(wù)的異步執(zhí)行,因此需要有一個集合來緩存任務(wù)隊列Collection<Runnable>;
- 很顯然在多個線程之間協(xié)調(diào)多個任務(wù),那么就需要一個線程安全的任務(wù)集合,同時還需要支持阻塞、超時操作,那么BlockingQueue是必不可少的;
- 既然是線程池,出發(fā)點就是提高系統(tǒng)性能同時降低資源消耗,那么線程池的大小就有限制,因此需要有一個核心線程池大小(線程個數(shù))和一個最大線程池大小(線程個數(shù)),有一個計數(shù)用來描述當前線程池大小;
- 如果是有限的線程池大小,那么長時間不使用的線程資源就應(yīng)該銷毀掉,這樣就需要一個線程空閑時間的計數(shù)來描述線程何時被銷毀;
- 前面描述過線程池也是有生命周期的,因此需要有一個狀態(tài)來描述線程池當前的運行狀態(tài);
- 線程池的任務(wù)隊列如果有邊界,那么就需要有一個任務(wù)拒絕策略來處理過多的任務(wù),同時在線程池的銷毀階段也需要有一個任務(wù)拒絕策略來處理新加入的任務(wù);
- 上面種的線程池大小、線程空閑實際那、線程池運行狀態(tài)等等狀態(tài)改變都不是線程安全的,因此需要有一個全局的鎖(mainLock)來協(xié)調(diào)這些競爭資源;
- 除了以上數(shù)據(jù)結(jié)構(gòu)以外,ThreadPoolExecutor還有一些狀態(tài)用來描述線程池的運行計數(shù),例如線程池運行的任務(wù)數(shù)、曾經(jīng)達到的最大線程數(shù),主要用于調(diào)試和性能分析。
?
對于ThreadPoolExecutor而言,一個線程就是一個Worker對象,它與一個線程綁定,當Worker執(zhí)行完畢就是線程執(zhí)行完畢,這個在后面詳細討論線程池中線程的運行方式。
既然是線程池,那么就首先研究下線程的構(gòu)造方法。
?
public interface ThreadFactory {
Thread newThread(Runnable r);
}
?
?
ThreadPoolExecutor使用一個線程工廠來構(gòu)造線程。線程池都是提交一個任務(wù)Runnable,然后在某一個線程Thread中執(zhí)行,ThreadFactory 負責如何創(chuàng)建一個新線程。
在J.U.C中有一個通用的線程工廠java.util.concurrent.Executors.DefaultThreadFactory,它的構(gòu)造方式如下:
?
static class DefaultThreadFactory implements ThreadFactory {
static final AtomicInteger poolNumber = new AtomicInteger(1);
final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null)? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
?
?
在這個線程工廠中,同一個線程池的所有線程屬于同一個線程組,也就是創(chuàng)建線程池的那個線程組,同時線程池的名稱都是“pool-<poolNum>-thread-<threadNum>”,其中poolNum是線程池的數(shù)量序號,threadNum是此線程池中的線程數(shù)量序號。這樣如果使用jstack的話很容易就看到了系統(tǒng)中線程池的數(shù)量和線程池中線程的數(shù)量。另外對于線程池中的所有線程默認都轉(zhuǎn)換為非后臺線程,這樣主線程退出時不會直接退出JVM,而是等待線程池結(jié)束。還有一點就是默認將線程池中的所有線程都調(diào)為同一個級別,這樣在操作系統(tǒng)角度來看所有系統(tǒng)都是公平的,不會導(dǎo)致競爭堆積。
?
二、線程池中線程生命周期
?
一個線程Worker被構(gòu)造出來以后就開始處于運行狀態(tài)。以下是一個線程執(zhí)行的簡版邏輯。
?
private final class Worker implements Runnable {
private final ReentrantLock runLock = new ReentrantLock();
private Runnable firstTask;
Thread thread;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
}
private void runTask(Runnable task) {
final ReentrantLock runLock = this.runLock;
runLock.lock();
try {
task.run();
} finally {
runLock.unlock();
}
}
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}
}
?
?
當提交一個任務(wù)時,如果需要創(chuàng)建一個線程(何時需要在下一節(jié)中探討)時,就調(diào)用線程工廠創(chuàng)建一個線程,同時將線程綁定到Worker工作隊列中。需要說明的是,Worker隊列構(gòu)造的時候帶著一個任務(wù)Runnable,因此Worker創(chuàng)建時總是綁定著一個待執(zhí)行任務(wù)。換句話說,創(chuàng)建線程的前提是有必要創(chuàng)建線程(任務(wù)數(shù)已經(jīng)超出了線程或者強制創(chuàng)建新的線程,至于為何強制創(chuàng)建新的線程后面章節(jié)會具體分析),不會無緣無故創(chuàng)建一堆空閑線程等著任務(wù)。這是節(jié)省資源的一種方式。
一旦線程池啟動線程后(調(diào)用線程run())方法,那么線程工作隊列Worker就從第1個任務(wù)開始執(zhí)行(這時候發(fā)現(xiàn)構(gòu)造Worker時傳遞一個任務(wù)的好處了),一旦第1個任務(wù)執(zhí)行完畢,就從線程池的任務(wù)隊列中取出下一個任務(wù)進行執(zhí)行。循環(huán)如此,直到線程池被關(guān)閉或者任務(wù)拋出了一個RuntimeException。
由此可見,線程池的基本原理其實也很簡單,無非預(yù)先啟動一些線程,線程進入死循環(huán)狀態(tài),每次從任務(wù)隊列中獲取一個任務(wù)進行執(zhí)行,直到線程池被關(guān)閉。如果某個線程因為執(zhí)行某個任務(wù)發(fā)生異常而終止,那么重新創(chuàng)建一個新的線程而已。如此反復(fù)。
其實,線程池原理看起來簡單,但是復(fù)雜的是各種策略,例如何時該啟動一個線程,何時該終止、掛起、喚醒一個線程,任務(wù)隊列的阻塞與超時,線程池的生命周期以及任務(wù)拒絕策略等等。
?
三、線程池任務(wù)執(zhí)行流程
?
我們從一個API開始接觸Executor是如何處理任務(wù)隊列的。
java.util.concurrent.Executor.execute(Runnable)
Executes the given task sometime in the future. The task may execute in a new thread or in an existing pooled thread. If the task cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been reached, the task is handled by the current?RejectedExecutionHandler.
線程池中所有任務(wù)執(zhí)行都依賴于此接口。這段話有以下幾個意思:
回答上面兩個“可能“。任務(wù)可能被執(zhí)行,那不可能的情況就是上面說的情況3;可能不是立即執(zhí)行,是因為任務(wù)可能還在隊列中排隊,因此還在等待分配線程執(zhí)行。了解完了字面上的問題,我們再來看具體的實現(xiàn)。
?
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
 ?
?
這一段代碼看起來挺簡單的,其實這就是線程池最重要的一部分,如果能夠完全理解這一塊,線程池還是挺容易的。整個執(zhí)行流程是這樣的:
文字描述步驟不夠簡單?下面圖形詳細表述了此過程。
老實說這個圖比上面步驟更難以理解,那么從何入手呢。
流程的入口很簡單,我們就是要執(zhí)行一個任務(wù)(Runnable command),那么它的結(jié)束點在哪或者有哪幾個?
根據(jù)左邊這個圖我們知道可能有以下幾種出口:
(1)圖中的P1、P7,我們根據(jù)這條路徑可以看到,僅僅是將任務(wù)加入任務(wù)隊列(offer(command))了;
(2)圖中的P3,這條路徑不將任務(wù)加入任務(wù)隊列,但是啟動了一個新工作線程(Worker)進行掃尾操作,用戶處理為空的任務(wù)隊列;
(3)圖中的P4,這條路徑?jīng)]有將任務(wù)加入任務(wù)隊列,但是啟動了一個新工作線程(Worker),并且工作現(xiàn)場的第一個任務(wù)就是當前任務(wù);
(4)圖中的P5、P6,這條路徑?jīng)]有將任務(wù)加入任務(wù)隊列,也沒有啟動工作線程,僅僅是拋給了任務(wù)拒絕策略。P2是任務(wù)加入了任務(wù)隊列卻因為線程池已經(jīng)關(guān)閉于是又從任務(wù)隊列中刪除,并且拋給了拒絕策略。
如果上面的解釋還不清楚,可以去研究下面兩段代碼:
?
java.util.concurrent.ThreadPoolExecutor.addIfUnderCorePoolSize(Runnable)
java.util.concurrent.ThreadPoolExecutor.addIfUnderMaximumPoolSize(Runnable)
java.util.concurrent.ThreadPoolExecutor.ensureQueuedTaskHandled(Runnable)
 ?
?
那么什么時候一個任務(wù)被立即執(zhí)行呢?
在線程池運行狀態(tài)下,如果線程池大小 小于 核心線程池大小或者線程池已滿(任務(wù)隊列已滿)并且線程池大小 小于 最大線程池大小(此時線程池大小 大于 核心線程池大小的),用程序描述為:
?
runState == RUNNING && ( poolSize < corePoolSize || poolSize < maxnumPoolSize && workQueue.isFull())?
?
上面的條件就是一個任務(wù)能夠被立即執(zhí)行的條件。
有了execute的基礎(chǔ),我們看看ExecutorService中的幾個submit方法的實現(xiàn)。
?
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Object> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
 ?
?
很簡單,不是么?對于一個線程池來說復(fù)雜的地方也就在execute方法的執(zhí)行流程。在下一節(jié)中我們來討論下如何獲取任務(wù)的執(zhí)行結(jié)果,也就是Future類的使用和原理。
?
四、線程池任務(wù)執(zhí)行結(jié)果
?
這一節(jié)來探討下線程池中任務(wù)執(zhí)行的結(jié)果以及如何阻塞線程、取消任務(wù)等等。
?
package info.imxylz.study.concurrency.future;
public class SleepForResultDemo implements Runnable {
static boolean result = false;
static void sleepWhile(long ms) {
try {
Thread.sleep(ms);
} catch (Exception e) {}
}
@Override
public void run() {
//do work
System.out.println("Hello, sleep a while.");
sleepWhile(2000L);
result = true;
}
public static void main(String[] args) {
SleepForResultDemo demo = new SleepForResultDemo();
Thread t = new Thread(demo);
t.start();
sleepWhile(3000L);
System.out.println(result);
}
}
 ?
?
在沒有線程池的時代里面,使用Thread.sleep(long)去獲取線程執(zhí)行完畢的場景很多。顯然這種方式很笨拙,他需要你事先知道任務(wù)可能的執(zhí)行時間,并且還會阻塞主線程,不管任務(wù)有沒有執(zhí)行完畢。
?
package info.imxylz.study.concurrency.future;
public class SleepLoopForResultDemo implements Runnable {
boolean result = false;
volatile boolean finished = false;
static void sleepWhile(long ms) {
try {
Thread.sleep(ms);
} catch (Exception e) {}
}
@Override
public void run() {
//do work
try {
System.out.println("Hello, sleep a while.");
sleepWhile(2000L);
result = true;
} finally {
finished = true;
}
}
public static void main(String[] args) {
SleepLoopForResultDemo demo = new SleepLoopForResultDemo();
Thread t = new Thread(demo);
t.start();
while (!demo.finished) {
sleepWhile(10L);
}
System.out.println(demo.result);
}
}
 ?
?
使用volatile與while死循環(huán)的好處就是等待的時間可以稍微小一點,但是依然有CPU負載高并且阻塞主線程的問題。最簡單的降低CPU負載的方式就是使用Thread.join().
?
SleepLoopForResultDemo demo = new SleepLoopForResultDemo();
Thread t = new Thread(demo);
t.start();
t.join();
System.out.println(demo.result);
 ?
?
顯然這也是一種不錯的方式,另外還有自己寫鎖使用wait/notify的方式。其實join()從本質(zhì)上講就是利用while和wait來實現(xiàn)的。
上面的方式中都存在一個問題,那就是會阻塞主線程并且任務(wù)不能被取消。為了解決這個問題,線程池中提供了一個Future接口。
在Future接口中提供了5個方法。
- V get() throws InterruptedException, ExecutionException: 等待計算完成,然后獲取其結(jié)果。
- V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException。最多等待為使計算完成所給定的時間之后,獲取其結(jié)果(如果結(jié)果可用)。
- boolean cancel(boolean mayInterruptIfRunning):試圖取消對此任務(wù)的執(zhí)行。
- boolean isCancelled():如果在任務(wù)正常完成前將其取消,則返回?true。
- boolean isDone():如果任務(wù)已完成,則返回?true。 可能由于正常終止、異常或取消而完成,在所有這些情況中,此方法都將返回true。
API看起來容易,來研究下異常吧。get()請求獲取一個結(jié)果會阻塞當前進程,并且可能拋出以下三種異常:
- InterruptedException:執(zhí)行任務(wù)的線程被中斷則會拋出此異常,此時不能知道任務(wù)是否執(zhí)行完畢,因此其結(jié)果是無用的,必須處理此異常。
- ExecutionException:任務(wù)執(zhí)行過程中(Runnable#run())方法可能拋出RuntimeException,如果提交的是一個java.util.concurrent.Callable<V>接口任務(wù),那么java.util.concurrent.Callable.call()方法有可能拋出任意異常。
- CancellationException:實際上get()方法還可能拋出一個CancellationException的RuntimeException,也就是任務(wù)被取消了但是依然去獲取結(jié)果。
對于get(long timeout, TimeUnit unit)而言,除了get()方法的異常外,由于有超時機制,因此還可能得到一個TimeoutException。
boolean cancel(boolean mayInterruptIfRunning)方法比較復(fù)雜,各種情況比較多:
來看看Future接口的實現(xiàn)類java.util.concurrent.FutureTask<V>具體是如何操作的。
在FutureTask中使用了一個AQS數(shù)據(jù)結(jié)構(gòu)來完成各種狀態(tài)以及加鎖、阻塞的實現(xiàn)。
在此AQS類java.util.concurrent.FutureTask.Sync中一個任務(wù)用4中狀態(tài):
初始情況下任務(wù)狀態(tài)state=0,任務(wù)執(zhí)行(innerRun)后狀態(tài)變?yōu)檫\行狀態(tài)RUNNING(state=1),執(zhí)行完畢后變成運行結(jié)束狀態(tài)RAN(state=2)。任務(wù)在初始狀態(tài)或者執(zhí)行狀態(tài)被取消后就變?yōu)闋顟B(tài)CANCELLED(state=4)。AQS最擅長無鎖情況下處理幾種簡單的狀態(tài)變更的。
?
void innerRun() {
if (!compareAndSetState(0, RUNNING))
return;
try {
runner = Thread.currentThread();
if (getState() == RUNNING) // recheck after setting thread
innerSet(callable.call());
else
releaseShared(0); // cancel
} catch (Throwable ex) {
innerSetException(ex);
}
}
 ?
?
執(zhí)行一個任務(wù)有四步:設(shè)置運行狀態(tài)、設(shè)置當前線程(AQS需要)、執(zhí)行任務(wù)(Runnable#run或者Callable#call)、設(shè)置執(zhí)行結(jié)果。這里也可以看到,一個任務(wù)只能執(zhí)行一次,因為執(zhí)行完畢后它的狀態(tài)不在為初始值0,要么為CANCELLED,要么為RAN。
取消一個任務(wù)(cancel)又是怎樣進行的呢?對比下前面取消任務(wù)的描述是不是很簡單,這里無非利用AQS的狀態(tài)來改變?nèi)蝿?wù)的執(zhí)行狀態(tài),最終達到放棄未啟動或者正在執(zhí)行的任務(wù)的目的。
?
boolean innerCancel(boolean mayInterruptIfRunning) {
for (;;) {
int s = getState();
if (ranOrCancelled(s))
return false;
if (compareAndSetState(s, CANCELLED))
break;
}
if (mayInterruptIfRunning) {
Thread r = runner;
if (r != null)
r.interrupt();
}
releaseShared(0);
done();
return true;
}
 ?
?
到目前為止我們依然沒有說明到底是如何阻塞獲取一個結(jié)果的。下面四段代碼描述了這個過程。
?
V innerGet() throws InterruptedException, ExecutionException {
acquireSharedInterruptibly(0);
if (getState() == CANCELLED)
throw new CancellationException();
if (exception != null)
throw new ExecutionException(exception);
return result;
}
//AQS#acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg); //park current Thread for result
}
protected int tryAcquireShared(int ignore) {
return innerIsDone()? 1 : -1;
}
boolean innerIsDone() {
return ranOrCancelled(getState()) && runner == null;
}
 ?
?
當調(diào)用Future#get()的時候嘗試去獲取一個共享變量。這就涉及到AQS的使用方式了。這里獲取一個共享變量的狀態(tài)是任務(wù)是否結(jié)束(innerIsDone()),也就是任務(wù)是否執(zhí)行完畢或者被取消。如果不滿足條件,那么在AQS中就會doAcquireSharedInterruptibly(arg)掛起當前線程,直到滿足條件。AQS前面講過,掛起線程使用的是LockSupport的park方式,因此性能消耗是很低的。
至于將Runnable接口轉(zhuǎn)換成Callable接口,java.util.concurrent.Executors.callable(Runnable, T)也提供了一個簡單實現(xiàn)。
?
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
 ?
?
五、延遲、周期性任務(wù)調(diào)度的實現(xiàn)
?
java.util.concurrent.ScheduledThreadPoolExecutor是默認的延遲、周期性任務(wù)調(diào)度的實現(xiàn)。
有了整個線程池的實現(xiàn),再回頭來看延遲、周期性任務(wù)調(diào)度的實現(xiàn)應(yīng)該就很簡單了,因為所謂的延遲、周期性任務(wù)調(diào)度,無非添加一系列有序的任務(wù)隊列,然后按照執(zhí)行順序的先后來處理整個任務(wù)隊列。如果是周期性任務(wù),那么在執(zhí)行完畢的時候加入下一個時間點的任務(wù)即可。
由此可見,ScheduledThreadPoolExecutor和ThreadPoolExecutor的唯一區(qū)別在于任務(wù)是有序(按照執(zhí)行時間順序)的,并且需要到達時間點(臨界點)才能執(zhí)行,并不是任務(wù)隊列中有任務(wù)就需要執(zhí)行的。也就是說唯一不同的就是任務(wù)隊列BlockingQueue<Runnable> workQueue不一樣。ScheduledThreadPoolExecutor的任務(wù)隊列是java.util.concurrent.ScheduledThreadPoolExecutor.DelayedWorkQueue,它是基于java.util.concurrent.DelayQueue<RunnableScheduledFuture>隊列的實現(xiàn)。
DelayQueue是基于有序隊列PriorityQueue實現(xiàn)的。PriorityQueue?也叫優(yōu)先級隊列,按照自然順序?qū)υ剡M行排序,類似于TreeMap/Collections.sort一樣。
同樣是有序隊列,DelayQueue和PriorityQueue區(qū)別在什么地方?
由于DelayQueue在獲取元素時需要檢測元素是否“可用”,也就是任務(wù)是否達到“臨界點”(指定時間點),因此加入元素和移除元素會有一些額外的操作。
典型的,移除元素需要檢測元素是否達到“臨界點”,增加元素的時候如果有一個元素比“頭元素”更早達到臨界點,那么就需要通知任務(wù)隊列。因此這需要一個條件變量final Condition available 。
移除元素(出隊列)的過程是這樣的:
- 總是檢測隊列的頭元素(順序最小元素,也是最先達到臨界點的元素)
- 檢測頭元素與當前時間的差,如果大于0,表示還未到底臨界點,因此等待響應(yīng)時間(使用條件變量available)
- 如果小于或者等于0,說明已經(jīng)到底臨界點或者已經(jīng)過了臨界點,那么就移除頭元素,并且喚醒其它等待任務(wù)隊列的線程。
-  public E take() throws InterruptedException { 
-  final ReentrantLock lock = this.lock; 
-  lock.lockInterruptibly(); 
-  try { 
-  for (;;) { 
-  E first = q.peek(); 
-  if (first == null) { 
-  available.await(); 
-  } else { 
-  long delay = first.getDelay(TimeUnit.NANOSECONDS); 
-  if (delay > 0) { 
-  long tl = available.awaitNanos(delay); 
-  } else { 
-  E x = q.poll(); 
-  assert x != null; 
-  if (q.size() != 0) 
-  available.signalAll(); // wake up other takers 
-  return x; 
- ?
-  } 
-  } 
-  } 
-  } finally { 
-  lock.unlock(); 
-  } 
-  } 
?
同樣加入元素也會有相應(yīng)的條件變量操作。當前僅當隊列為空或者要加入的元素比隊列中的頭元素還小的時候才需要喚醒“等待線程”去檢測元素。因為頭元素都沒有喚醒那么比頭元素更延遲的元素就更加不會喚醒。
?
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
available.await();
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0) {
long tl = available.awaitNanos(delay);
} else {
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll(); // wake up other takers
return x;
}
}
}
} finally {
lock.unlock();
}
}
 ?
?
有了任務(wù)隊列后再來看Future在ScheduledThreadPoolExecutor中是如何操作的。
java.util.concurrent.ScheduledThreadPoolExecutor.ScheduledFutureTask<V>是繼承java.util.concurrent.FutureTask<V>的,區(qū)別在于執(zhí)行任務(wù)是否是周期性的。
?
private void runPeriodic() {
boolean ok = ScheduledFutureTask.super.runAndReset();
boolean down = isShutdown();
// Reschedule if not cancelled and not shutdown or policy allows
if (ok && (!down ||
(getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
!isStopped()))) {
long p = period;
if (p > 0)
time += p;
else
time = now() - p;
ScheduledThreadPoolExecutor.super.getQueue().add(this);
}
// This might have been the final executed delayed
// task. Wake up threads to check.
else if (down)
interruptIdleWorkers();
}
/**
* Overrides FutureTask version so as to reset/requeue if periodic.
*/
public void run() {
if (isPeriodic())
runPeriodic();
else
ScheduledFutureTask.super.run();
}
}
 ?
?
如果不是周期性任務(wù)調(diào)度,那么就和java.util.concurrent.FutureTask.Sync的調(diào)度方式是一樣的。如果是周期性任務(wù)(isPeriodic())那么就稍微有所不同的。
先從功能/結(jié)構(gòu)上分析下。第一種情況假設(shè)提交的任務(wù)每次執(zhí)行花費10s,間隔(delay/period)為20s,對于scheduleAtFixedRate而言,每次執(zhí)行開始時間20s,對于scheduleWithFixedDelay來說每次執(zhí)行開始時間30s。第二種情況假設(shè)提交的任務(wù)每次執(zhí)行時間花費20s,間隔(delay/period)為10s,對于scheduleAtFixedRate而言,每次執(zhí)行開始時間10s,對于scheduleWithFixedDelay來說每次執(zhí)行開始時間30s。(具體分析可以參考這里)
也就是說scheduleWithFixedDelay的執(zhí)行開始時間為(delay+cost),而對于scheduleAtFixedRate來說執(zhí)行開始時間為max(period,cost)。
回頭再來看上面源碼runPeriodic()就很容易了。但特別要提醒的,如果任務(wù)的任何一個執(zhí)行遇到異常,則后續(xù)執(zhí)行都會被取消,這從runPeriodic()就能看出。要強調(diào)的第二點就是同一個周期性任務(wù)不會被同時執(zhí)行。就比如說盡管上面第二種情況的scheduleAtFixedRate任務(wù)每隔10s執(zhí)行到達一個時間點,但是由于每次執(zhí)行時間花費為20s,因此每次執(zhí)行間隔為20s,只不過執(zhí)行的任務(wù)次數(shù)會多一點。但從本質(zhì)上講就是每隔20s執(zhí)行一次,如果任務(wù)隊列不取消的話。
為什么不會同時執(zhí)行?
這是因為ScheduledFutureTask執(zhí)行的時候會將任務(wù)從隊列中移除來,執(zhí)行完畢以后才會添加下一個同序列的任務(wù),因此任務(wù)隊列中其實最多只有同序列的任務(wù)的一份副本,所以永遠不會同時執(zhí)行(盡管要執(zhí)行的時間在過去)。
?
ScheduledThreadPoolExecutor使用一個無界(容量無限,整數(shù)的最大值)的容器(DelayedWorkQueue隊列),根據(jù)ThreadPoolExecutor的原理,只要當容器滿的時候才會啟動一個大于corePoolSize的線程數(shù)。因此實際上ScheduledThreadPoolExecutor是一個固定線程大小的線程池,固定大小為corePoolSize,構(gòu)造函數(shù)里面的Integer.MAX_VALUE其實是不生效的(盡管PriorityQueue使用數(shù)組實現(xiàn)有PriorityQueue大小限制,如果你的任務(wù)數(shù)超過了2147483647就會導(dǎo)致OutOfMemoryError,這個參考PriorityQueue的grow方法)。
?
再回頭看scheduleAtFixedRate等方法就容易多了。無非就是往任務(wù)隊列中添加一個未來某一時刻的ScheduledFutureTask任務(wù),如果是scheduleAtFixedRate那么period/delay就是正數(shù),如果是scheduleWithFixedDelay那么period/delay就是一個負數(shù),如果是0那么就是一次性任務(wù)。直接調(diào)用父類ThreadPoolExecutor的execute/submit等方法就相當于period/delay是0,并且initialDelay也是0。
?
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
if (initialDelay < 0) initialDelay = 0;
long triggerTime = now() + unit.toNanos(initialDelay);
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Object>(command,
null,
triggerTime,
unit.toNanos(period)));
delayedExecute(t);
return t;
}
 ?
?
另外需要補充說明的一點,前面說過java.util.concurrent.FutureTask.Sync任務(wù)只能執(zhí)行一次,那么在runPeriodic()里面怎么又將執(zhí)行過的任務(wù)加入隊列中呢?這是因為java.util.concurrent.FutureTask.Sync提供了一個innerRunAndReset()方法,此方法不僅執(zhí)行任務(wù)還將任務(wù)的狀態(tài)還原成0(初始狀態(tài))了,所以此任務(wù)就可以重復(fù)執(zhí)行。這就是為什么runPeriodic()里面調(diào)用runAndRest()的緣故。
?
boolean innerRunAndReset() {
if (!compareAndSetState(0, RUNNING))
return false;
try {
runner = Thread.currentThread();
if (getState() == RUNNING)
callable.call(); // don't set result
runner = null;
return compareAndSetState(RUNNING, 0);
} catch (Throwable ex) {
innerSetException(ex);
return false;
}
}
 ?
?
謝謝xylz的文章。
關(guān)于線程池由于時間原因,沒有好好整理。
?
內(nèi)容來源:
深入淺出 Java Concurrency (33): 線程池 part 6 線程池的實現(xiàn)及原理 (1)
http://www.blogjava.net/xylz/archive/2011/01/18/343183.html
深入淺出 Java Concurrency (33): 線程池 part 6 線程池的實現(xiàn)及原理 (2)
http://www.blogjava.net/xylz/archive/2011/02/11/344091.html
深入淺出 Java Concurrency (33): 線程池 part 6 線程池的實現(xiàn)及原理 (3)
http://www.blogjava.net/xylz/archive/2011/02/13/344207.html
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅持創(chuàng)作打卡瓜分現(xiàn)金大獎總結(jié)
以上是生活随笔為你收集整理的Java多线程(十二)之线程池深入分析(下)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: Java多线程(十一)之线程池深入分析(
- 下一篇: Java多线程(十)之Reentrant
