【Android 异步操作】线程池 ( Worker 简介 | 线程池中的工作流程 runWorker | 从线程池任务队列中获取任务 getTask )
文章目錄
- 一、線程池中的 Worker ( 工作者 )
- 二、線程池中的工作流程 runWorker
- 三、線程池任務隊列中獲取任務 getTask
在博客 【Android 異步操作】線程池 ( 線程池 execute 方法源碼解析 ) 中 , 講解 線程池 ThreadPoolExecutor 的 execute 方法時 , 有兩個重要的核心方法 ;
兩個核心的操作 :
- 添加任務 : addWorker(command, true) , 第二個參數(shù)為 true 是添加核心線程任務 , 第二個參數(shù)為 false 是添加非核心線程任務 ;
- 拒絕任務 : reject(command)
在上一篇博客 【Android 異步操作】線程池 ( 線程池 reject 拒絕任務 | 線程池 addWorker 添加任務 ) 介紹了 addWorker 添加任務 , reject 拒絕任務 的源碼細節(jié) ;
本博客中介紹 Worker ( 工作者 ) 的相關源碼
一、線程池中的 Worker ( 工作者 )
工作者 Worker 主要 為線程執(zhí)行任務 , 維護終端控制狀態(tài) , 同時記錄其它信息 ;
該類擴展了 AbstractQueuedSynchronizer , 目的是 簡化 每個任務執(zhí)行時 獲取和釋放鎖的過程 ;
該操作可以防止中斷用于喚醒等待任務的工作線程 , 不會中斷一個正在運行的線程 ;
Worker 代碼及相關注釋說明 :
public class ThreadPoolExecutor extends AbstractExecutorService {/*** 工作者類主要為線程執(zhí)行任務 , 維護終端控制狀態(tài) , 同時記錄其它信息 ; * 該類擴展了 AbstractQueuedSynchronizer , 目的是簡化 每個任務執(zhí)行時 獲取和釋放鎖的過程 ; * 該操作可以防止中斷用于喚醒等待任務的工作線程 , 不會中斷一個正在運行的線程 ; */private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{/*** 該類不會被序列化, 提供該常量用于支持 Java 文檔警告 .*/private static final long serialVersionUID = 6138294804551838833L;/** 該工作者運行的線程 , 如果線程工廠創(chuàng)建失敗 , 就為空. */final Thread thread;/** 線程初始任務 , 可能為空. */Runnable firstTask;/** 每個線程的任務計數(shù) */volatile long completedTasks;/*** 使用線程工廠 , 根據(jù)給定的初始任務 , 創(chuàng)建工作者*/Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;// 線程是在構造函數(shù)中 , 使用線程工廠創(chuàng)建的 this.thread = getThreadFactory().newThread(this);}/** 將主要的循環(huán)操作委托給了外部的 runWorker , 本博客下面有該方法的解析 . */public void run() {runWorker(this);}// 鎖相關方法 //// 0 代表未鎖定狀態(tài) .// 1 代表鎖定狀態(tài) .protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}} }二、線程池中的工作流程 runWorker
/*** Main worker run loop. Repeatedly gets tasks from queue and* executes them, while coping with a number of issues:** 1. We may start out with an initial task, in which case we* don't need to get the first one. Otherwise, as long as pool is* running, we get tasks from getTask. If it returns null then the* worker exits due to changed pool state or configuration* parameters. Other exits result from exception throws in* external code, in which case completedAbruptly holds, which* usually leads processWorkerExit to replace this thread.** 2. Before running any task, the lock is acquired to prevent* other pool interrupts while the task is executing, and then we* ensure that unless pool is stopping, this thread does not have* its interrupt set.** 3. Each task run is preceded by a call to beforeExecute, which* might throw an exception, in which case we cause thread to die* (breaking loop with completedAbruptly true) without processing* the task.** 4. Assuming beforeExecute completes normally, we run the task,* gathering any of its thrown exceptions to send to afterExecute.* We separately handle RuntimeException, Error (both of which the* specs guarantee that we trap) and arbitrary Throwables.* Because we cannot rethrow Throwables within Runnable.run, we* wrap them within Errors on the way out (to the thread's* UncaughtExceptionHandler). Any thrown exception also* conservatively causes thread to die.** 5. After task.run completes, we call afterExecute, which may* also throw an exception, which will also cause thread to* die. According to JLS Sec 14.20, this exception is the one that* will be in effect even if task.run throws.** The net effect of the exception mechanics is that afterExecute* and the thread's UncaughtExceptionHandler have as accurate* information as we can provide about any problems encountered by* user code.** @param w the worker*/final void runWorker(Worker w) {Thread wt = Thread.currentThread();// 拿到了一個任務 Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// 第一次循環(huán) task 不為空 , 直接就進入了循環(huán)體 // 第二次循環(huán) task 為空 , 此時執(zhí)行 || 后的邏輯 (task = getTask()) != null// 該邏輯中從線程池任務隊列中獲取任務 , 然后執(zhí)行該任務 // 此處一直循環(huán)讀取線程池任務隊列中的任務并執(zhí)行 while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {// 執(zhí)行任務 task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}
三、線程池任務隊列中獲取任務 getTask
getTask 從 線程池 任務隊列中 獲取任務 , 該方法執(zhí)行 阻塞 或 定時等待 任務 , 具體執(zhí)行哪個需要根據(jù)當前的配置情況 ;
這里通過 線程數(shù) 判斷該線程是 核心線程 , 還是 非核心線程 ;
非核心線程 :
- 判定條件 : 如果當前執(zhí)行的線程 大于 核心線程數(shù) , 就是非核心線程
- 獲取方法 : 非核心線程 調(diào)用 poll 方法從任務隊列中取任務
- 線程回收 : 如果超過 keepAliveTime 時間還取不到任務 , 非核心線程 空閑時間 超過了一定時間 , 此時需要回收
核心線程 :
- 獲取方法 : 如果該線程是核心線程 , 那么就會調(diào)用 take 方法 , 而不是 poll 方法
- 阻塞方法 : take 方法是阻塞的
- 不會被回收 : 核心線程不會回收 , 非核心線程超過一定時間會被回收
如果出現(xiàn)下面 4 中情況 , 工作者必須退出 , 該方法返回 null :
- 工作者數(shù)量超過線程池個數(shù)
- 線程池停止
- 線程池關閉 , 任務隊列清空
- 該工作者等待時間超過空閑時間 , 需要被回收 ; 前提是該線程是非和核心線程 ;
getTask 相關源碼 :
/*** 執(zhí)行阻塞或定時等待任務 , 具體執(zhí)行哪個需要根據(jù)當前的配置情況 ; * * 如果出現(xiàn)下面 4 中情況 , 工作者必須退出 , 該方法返回 null : * 1 . 工作者數(shù)量超過線程池個數(shù) * 2 . 線程池停止* 3 . 線程池關閉 , 任務隊列清空 * 4 . 該工作者等待時間超過空閑時間 , 需要被回收 ; 前提是該線程是非和核心線程 ; ** @return 返回要執(zhí)行的任務 ; 如果返回空 , 說明該 工作者 Worker 必須退出 , 工作者計數(shù) -1*/private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?// 如果 wc 大于 核心線程數(shù) , 說明本線程是非核心線程 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 這里進行了時間判斷 // 如果當前執(zhí)行的線程 大于 核心線程數(shù) , 就是非核心線程 // 調(diào)用 poll 方法從任務隊列中取任務, 如果超過 keepAliveTime 時間還取不到任務 , // 非核心線程 空閑時間 超過了一定時間 , 此時需要回收 // 如果該線程是核心線程 , 那么就會調(diào)用 take 方法 , 而不是 poll 方法// take 方法是阻塞的 // 因此核心線程不會回收 , 非核心線程超過一定時間會被回收 Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}總結
以上是生活随笔為你收集整理的【Android 异步操作】线程池 ( Worker 简介 | 线程池中的工作流程 runWorker | 从线程池任务队列中获取任务 getTask )的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【Android 异步操作】线程池 (
- 下一篇: 【Android 异步操作】线程池 (