线程池invokeAll方法详解
線程池invokeAll方法詳解
- 問題起源與抽象
- 問題排查與猜測
- 猜測一:invokeAll 在異步執行后會不會同步等待線程執行完畢獲取最終結果
- 猜測二:隊列里面可能存在第一次調用 invokeAll 執行了但沒有刪掉的任務,所以才會導致第二次放入隊列失敗
- 兩次猜測失敗后的總結
- 復查源碼,真相大白
- 問題解決方案
- 參考
線上真實案例,多次調用線程池 ThreadPoolExecutor 的 invokeAll() 方法進行數據統計時任務被拒絕,故事從此開始。
本文重在講述問題的產生、抽象、尋找解決方法的過程,并結合源碼對原因進行抽絲剝繭般的分析。bug 千千萬萬,唯有合理的邏輯推理思維才能讓這些 bug 顯露原形。
問題起源與抽象
先來看一段簡單的代碼,定義一個核心線程數5、有界隊列5的線程池,然后創建10個任務丟進去執行2次。
按照以前對線程池執行邏輯的理解,創建的10個線程,會先交給核心線程去執行,5個核心線程滿了之后,存放到隊列中,剛好存儲剩下的5個,按理說10個任務都會正常執行完畢。本次只測試固定大小的線程池。
public class InvokeAllTest {private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5,60 * 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5),new MyThreadFactory());public static void main(String[] args) {List<Callable<Void>> tasks = new ArrayList<>();for (int i = 0; i < 10; i++) {tasks.add(new InvokeAllThread());}System.out.println("第一次任務執行前的executor: " + executor);try {executor.invokeAll(tasks);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一次任務執行完畢后的executor: " + executor);System.out.println("==============第一次任務執行完畢,開始第二次任務============");try {Thread.sleep(1000);executor.invokeAll(tasks);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二次任務執行完畢后的executor:" + executor);}// 任務執行線程。通過打印線程名稱,觀察提交的任務被哪個線程執行static class InvokeAllThread implements Callable<Void> {@Overridepublic Void call() throws Exception {System.out.println(Thread.currentThread().getName());return null;}}// 給工作線程自定義名字,方便觀察提交的任務被哪個線程執行static class MyThreadFactory implements ThreadFactory {private AtomicInteger threadNum = new AtomicInteger(1);@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r, String.valueOf(threadNum.getAndIncrement()));if (thread.getPriority() != Thread.NORM_PRIORITY) {thread.setPriority(Thread.NORM_PRIORITY);}return thread;}}運行程序后發現,第一次調用 invokeAll 正常執行,第二次調用報錯。多次執行結果相同。
第一次任務執行前的executorjava.util.concurrent.ThreadPoolExecutor@30f39991[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] 1 2 3 4 4 5 3 2 3 3 第一次任務執行完畢后的executorjava.util.concurrent.ThreadPoolExecutor@30f39991[Running, pool size = 5, active threads = 0, queued tasks = 0, completed tasks = 10] ==============第一次任務執行完畢,開始第二次任務============ 2 4 5 2 1 Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@3a71f4dd rejected from java.util.concurrent.ThreadPoolExecutor@30f39991[Running, pool size = 5, active threads = 2, queued tasks = 0, completed tasks = 13]at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)at java.util.concurrent.AbstractExecutorService.invokeAll(AbstractExecutorService.java:238)at com.aaron.hp.thread.pool.InvokeAllTest.main(InvokeAllTest.java:36)問題排查與猜測
既然程序出現異常,就該調用 debug 模式進行排查,并遵循"大膽猜測,小心求證"的態度,去解決這個問題。
猜測一:invokeAll 在異步執行后會不會同步等待線程執行完畢獲取最終結果
由于 invokeAll 封裝的太好,之前只知道最后會同步等待才能獲取返回值。那么現在就需要去證實這個概念。
進入 invokeAll 方法后,發現調用了f.get(),那么毫無疑問,這個猜測可以排除掉了。
其實從執行過程的輸出內容也可以看出,兩次調用 invokeAll 的執行順序和界限(打印語句) 非常明顯。
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException {if (tasks == null)throw new NullPointerException();ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());boolean done = false;try {for (Callable<T> t : tasks) {RunnableFuture<T> f = newTaskFor(t);futures.add(f);// 任務被添加后的具體執行execute(f);}for (int i = 0, size = futures.size(); i < size; i++) {Future<T> f = futures.get(i);if (!f.isDone()) {try {// 此處同步等待f.get();} catch (CancellationException ignore) {} catch (ExecutionException ignore) {}}}done = true;return futures;} finally {if (!done)for (int i = 0, size = futures.size(); i < size; i++)futures.get(i).cancel(true);} }猜測二:隊列里面可能存在第一次調用 invokeAll 執行了但沒有刪掉的任務,所以才會導致第二次放入隊列失敗
由于未閱讀源碼,猜測只有當創建的任務執行完畢并且銷毀之后,才會從隊列中真正移除。
那么就需要查看入隊列和出隊列的時機。查看 invokeAll 方法中的 execute(f) 方法。
查看 ThreadPoolExecutor 類下的 execute 方法源碼:
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// 判斷工作線程數是否小于核心線程數,如果是則創建 Worker 工作線并返回if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}// 判斷主線程是否在運行,并判斷是否入隊列成功if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 否則重新創建 Worker 線程,創建失敗則拋出拒絕策略else if (!addWorker(command, false))reject(command); }此時就會發現入隊列的操作在workQueue.offer(command)處完成,而我們提交的任務是由一個叫 Worker 類的實例來執行,addWorker(command, true)創建 Worker 實例。
那么我們就分別進去這兩個方法來看下源碼:
矮油黑人問號臉。。沒想到這個 ThreadPoolExecutor 類的 addWorker 這么長,給核心代碼寫個注釋重點關注,掃一眼然后去看 offer 方法(英文注釋是源碼中自帶的)。前面都是校驗,創建核心線程處為new Worker(firstTask):
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 上面一堆都是校驗,此處才是 Worker 被創建的地方,注意被傳入的 firstTaskw = new Worker(firstTask);// 此處發現 Worker 里面居然還有個 therad 線程,不過想想也是,沒有線程怎么異步執行呢。點進 Worker 的構造方法看一眼就會發現,這個線程就是由我們自定義的 threadFactory 來創建的,所以核心線程名稱就是我們之前設定好的名字。this.thread = getThreadFactory().newThread(this);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {// worker 實例成功創建后,讓它啟動起來t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted; }接著是 ArrayBlockingQueue 類的 offer 方法,在 enqueue(e)處進入隊列:
public boolean offer(E e) {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lock();try {if (count == items.length)return false;else {// 進入隊列enqueue(e);return true;}} finally {lock.unlock();} }此時我們先來調試一波,看看入隊列時這些方法的執行情況,在三個 if 處分別設置斷點,在 addWorker 和 offer 方法靠前的未知打斷點,確定是否會進入。
第一次調用 invokeAll:addWorker 進入5次,offer 方法進入5次。
第二次調用 invokeAll:addWorker 進入0次,offer 方法進入10次(可能是5-10次)。
那么發現了新的問題:程序居然沒報錯!正常執行完成!這不科學!
帶著疑惑,重新 debug,居然還沒報錯!難道之前的異常是偶然嗎?
以最快速度連按 F9 debug了幾次,有時候報錯。。
重新運行 run 了幾次,次次報錯。。
懷疑人生了。。
此時墨菲定律在我頭腦中回響,“偶然事件存在必然的因素”。那么大膽猜測,這個原因極有可能是隊列消費速度較慢導致的,去查看消費部分的源碼。由于 worker 也是一個線程,那么肯定有類似的 run 方法:
查看 ThreadPoolExecutor 類 的 Worker 這個內部類,找到 run() 方法:
public void run() {runWorker(this); }而 run 方法調用的是 ThreadPoolExecutor 類里的 runWorker(this):
final void runWorker(Worker w) {Thread wt = Thread.currentThread();// 此處注意,將 worker 里存入的 firstTask 取出來,交給下面的 while 去執行Runnable task = w.firstTask;// 將 worker 里的 firstTask 屬性置空w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// 如果 task 不為空,即取出的 firstTask 不為空,則執行;否則調用 getTask() 方法獲取 task 再執行while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {// 此處為空實現,可自定義beforeExecute(wt, task);Throwable thrown = null;try {// 調用 task 的 run 方法執行任務task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);} }查看 ThreadPoolExecutor 類下的 getTask() 方法:
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 此處為出隊列操作,poll 和 take 的區別在于,poll 會等待指定時間,而 take 是阻塞的,會一直等待Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}} }看到這里,猜測二也就不攻自破,出隊列后任務才會被執行,所以某個任務出隊列后,執行成功與否與隊列再無瓜葛。(注意這個說法只針對默認代碼,如果自定義了拒絕策略是可以將被 interrupt 的線程重新塞回隊列里的)
兩次猜測失敗后的總結
隊列是異步消費的,但入隊是同步進行的,如果隊列的容量不足以承載要存入隊列的任務數,就會被拒絕。(雖然是 ArrayBlockQueue 的特性,但這是通過 debug 以及 run 后觀察到的)
第一次 addWorker 方法執行了5次,offer 執行了5次;第二次則是 0 次,10 次。剛才忽略了這個細節,那么需要重新找到相應的源碼閱讀。
任務從隊列中移除與任務是否執行完畢無關,先移除,后執行。
我們創建的任務,是由 worker 核心線程去調用任務的 run 方法來同步執行的,而不是調用任務實例的 start 去異步執行,這也就是為什么 invokeAll 可以獲取到返回值的原因所在。
**備注:**這里有點繞,任務實例指的是我們最開始在 for 循環中創建的10個tasks new InvokeAllThread(),為什么繼承了 Callable 明明改寫的是 call()方法,但卻有 run()方法可以被調用呢?這是因為在 invokeAll()方法執行execute()方法前,通過RunnableFuture<T> f = newTaskFor(t);進行了包裝。
復查源碼,真相大白
查看 ThreadPoolExecutor 類下的 execute() 方法,創建 worker 前的判斷如下:
if (workerCountOf(c) < corePoolSize) { ...}第一次調用 invokeAll 時,線程池中的核心線程 worker 數為0,小于 corePoolSize,所以前5次會創建 worker 核心線程并返回,此時隨著 worker 的創建,我們創建的10個任務中的5個也會隨著 worker 的創建作為 firstTask 屬性被傳進去。后5個任務則被放入 queue 中。
第二次調用 invokeAll 時,線程池中的核心數已經是5,所以10個任務都會被放入 queue 中異步消費,但是我們的 queue 的容量為5。如果消費速度快于入隊速度(debug),那么10個任務會正常執行。但是入隊速度太快的話(run),前5個肯定可以入隊,后面的5個幾乎都會被拒絕。
問題解決方案
參考
ThreadPoolExecutor源碼分析及阻塞提交任務方法
Thread的中斷機制(interrupt)
總結
以上是生活随笔為你收集整理的线程池invokeAll方法详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 网络工程师知识点整理—第五章:无线通信网
- 下一篇: 【《Multimodal Transfo