从串行线程封闭到对象池、线程池
今天講一個牛逼而實用的概念,串行線程封閉。對象池是串行線程封閉的典型應用場景;線程池糅合了對象池技術,但核心實現不依賴于對象池,很容易產生誤會。
本文從串行線程封閉和對象池入手,最后通過源碼分析線程池的核心原理,厘清對象池與線程池之間的誤會。
線程封閉與串行線程封閉
線程封閉
線程封閉是一種常見的線程安全設計策略:僅在固定的一個線程內訪問對象,不對其他線程共享。
使用線程封閉技術,對象O始終只對一個線程T1可見,“單線程”中自然不存在線程安全的問題。
ThreadLocal是常用的線程安全工具。線程封閉在Servlet及高層的web框架Spring等中應用不少。
https://monkeysayhi.github.io/2016/11/27/源碼%7CThreadLocal的實現原理/
串行線程封閉
線程封閉雖然好用,卻限制了對象的共享。串行線程封閉改進了這一點:對象O只能由單個線程T1擁有,但可以通過安全的發布對象O來轉移O的所有權;在轉移所有權后,也只有另一個線程T2能獲得這個O的所有權,并且發布O的T1不會再訪問O。
所謂“所有權”,指修改對象的權利。
相對于線程封閉,串行線程封閉使得任意時刻,最多僅有一個線程擁有對象的所有權。當然,這不是絕對的,只要線程T1事實不會再修改對象O,那么就相當于僅有T2擁有對象的所有權。串行線層封閉讓對象變得可以共享(雖然只能串行的擁有所有權),靈活性得到大大提高;相對的,要共享對象就涉及安全發布的問題,依靠BlockingQueue等同步工具很容易實現這一點。
對象池是串行線程封閉的經典應用場景,如數據庫連接池等。
對象池
對象池利用了串行封閉:將對象O“借給”一個請求線程T1,T1使用完再交還給對象池,并保證“未擅自發布該對象”且“以后不再使用”;對象池收回O后,等T2來借的時候再把它借給T2,完成對象所有權的傳遞。
猴子擼了一個簡化版的線程池,用戶只需要覆寫newObject()方法:
public abstract class AbstractObjectPool<T> {
? protected final int min;
? protected final int max;
? protected final List<T> usings = new LinkedList<>();
? protected final List<T> buffer = new LinkedList<>();
? private volatile boolean inited = false;
? public AbstractObjectPool(int min, int max) {
? ? this.min = min;
? ? this.max = max;
? ? if (this.min < 0 || this.min > this.max) {
? ? ? throw new IllegalArgumentException(String.format(
? ? ? ? ? "need 0 <= min <= max <= Integer.MAX_VALUE, given min: %s, max: %s", this.min, this.max));
? ? }
? }
? public void init() {
? ? for (int i = 0; i < min; i++) {
? ? ? buffer.add(newObject());
? ? }
? ? inited = true;
? }
? protected void checkInited() {
? ? if (!inited) {
? ? ? throw new IllegalStateException("not inited");
? ? }
? }
? abstract protected T newObject();
? public synchronized T getObject() {
? ? checkInited();
? ? if (usings.size() == max) {
? ? ? return null;
? ? }
? ? if (buffer.size() == 0) {
? ? ? T newObj = newObject();
? ? ? usings.add(newObj);
? ? ? return newObj;
? ? }
? ? T oldObj = buffer.remove(0);
? ? usings.add(oldObj);
? ? return oldObj;
? }
? public synchronized void freeObject(T obj) {
? ? checkInited();
? ? if (!usings.contains(obj)) {
? ? ? throw new IllegalArgumentException(String.format("obj not in using queue: %s", obj));
? ? }
? ? usings.remove(usings.indexOf(obj));
? ? buffer.add(obj);
? }
}
AbstractObjectPool具有以下特性:
支持設置最小、最大容量
對象一旦申請就不再釋放,避免了GC
雖然很簡單,但大可以用于一些時間敏感、資源充裕的場景。如果時間進一步敏感,可將getObject()、freeObject()改寫為并發程度更高的版本,但記得保證安全發布安全回收;如果資源不那么充裕,可以適當增加對象回收策略。
可以看到,一個對象池的基本行為包括:
創建對象newObject()
借取對象getObject()
歸還對象freeObject()
典型的對象池有各種連接池、常量池等,應用非常多,模型也大同小異,不做解析。令人迷惑的是線程池,很容易讓人誤以為線程池的核心原理也是對象池,下面來追一遍源碼。
線程池
首先擺出結論:線程池糅合了對象池模型,但核心原理是生產者-消費者模型。
繼承結構如下:
用戶可以將Runnable(或Callables)實例提交給線程池,線程池會異步執行該任務,返回響應的結果(完成/返回值)。
猴子最喜歡的是submit(Callable<T> task)方法。我們從該方法入手,逐步深入函數棧,探究線程池的實現原理。
submit()
submit()方法在ExecutorService接口中定義,AbstractExecutorService實現,ThreadPoolExecutor直接繼承。
public abstract class AbstractExecutorService implements ExecutorService {
...
? ? public <T> Future<T> submit(Callable<T> task) {
? ? ? ? if (task == null) throw new NullPointerException();
? ? ? ? RunnableFuture<T> ftask = newTaskFor(task);
? ? ? ? execute(ftask);
? ? ? ? return ftask;
? ? }
...
AbstractExecutorService#newTaskFor()創建一個RunnableFuture類型的FutureTask。
核心是execute()方法。
execute()
execute()方法在Executor接口中定義,ThreadPoolExecutor實現。
public class ThreadPoolExecutor extends AbstractExecutorService {
...
? ? public void execute(Runnable command) {
? ? ? ? if (command == null)
? ? ? ? ? ? throw new NullPointerException();
? ? ? ? int c = ctl.get();
? ? ? ? if (workerCountOf(c) < corePoolSize) {
? ? ? ? ? ? if (addWorker(command, true))
? ? ? ? ? ? ? ? return;
? ? ? ? ? ? c = ctl.get();
? ? ? ? }
? ? ? ? if (isRunning(c) && workQueue.offer(command)) {
? ? ? ? ? ? int recheck = ctl.get();
? ? ? ? ? ? if (! isRunning(recheck) && remove(command))
? ? ? ? ? ? ? ? reject(command);
? ? ? ? ? ? else if (workerCountOf(recheck) == 0)
? ? ? ? ? ? ? ? addWorker(null, false);
? ? ? ? }
? ? ? ? else if (!addWorker(command, false))
? ? ? ? ? ? reject(command);
? ? }
...
}
我們暫且忽略線程池的池化策略。關注一個最簡單的場景,看能不能先回答一個問題:線程池中的任務如何執行?
核心是addWorker()方法。以8行的參數為例,此時,線程池中的線程數未達到最小線程池大小corePoolSize,通常可以直接在9行返回。
addWorker()
簡化如下:
public class ThreadPoolExecutor extends AbstractExecutorService {
...
? ? private boolean addWorker(Runnable firstTask, boolean core) {
? ? ? ? boolean workerStarted = false;
? ? ? ? boolean workerAdded = false;
? ? ? ? Worker w = null;
? ? ? ? try {
? ? ? ? ? ? w = new Worker(firstTask);
? ? ? ? ? ? final Thread t = w.thread;
? ? ? ? ? ? if (t != null) {
? ? ? ? ? ? ? ? final ReentrantLock mainLock = this.mainLock;
? ? ? ? ? ? ? ? mainLock.lock();
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? int rs = runStateOf(ctl.get());
? ? ? ? ? ? ? ? ? ? if (rs < SHUTDOWN) {
? ? ? ? ? ? ? ? ? ? ? ? workers.add(w);
? ? ? ? ? ? ? ? ? ? ? ? workerAdded = true;
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? ? ? mainLock.unlock();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? if (workerAdded) {
? ? ? ? ? ? ? ? ? ? t.start();
? ? ? ? ? ? ? ? ? ? workerStarted = true;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? } finally {
? ? ? ? ? ? if (! workerStarted)
? ? ? ? ? ? ? ? addWorkerFailed(w);
? ? ? ? }
? ? ? ? return workerStarted;
? ? }
...
}
我去掉了很多用于管理線程池、維護線程安全的代碼。假設線程池未關閉,worker(即w,下同)添加成功,則必然能夠將worker添加至workers中。workers是一個HashSet:
private final HashSet<Worker> workers = new HashSet<Worker>();
哪里是對象池?
如果說與對象池有關,那么workers即相當于示例代碼中的using,應用了對象池模型;只不過這里的using是一直增長的,直到達到最大線程池大小maximumPoolSize。
但是很明顯,線程池并沒有將線程發布出去,workers也僅僅完成using“保存線程”的功能。那么,線程池中的任務如何執行呢?跟線程池有沒有關系?
哪里又不是?
注意9、17、24行:
9行將我們提交到線程池的firstTask封裝入一個worker。
17行將worker加入workers,維護起來
24行則啟動了worker中的線程t
核心在與這三行,但線程池并沒有直接在addWorker()中啟動任務firstTask,代之以啟動一個worker。最終任務必然被啟動,那么我們繼續看Worker如何啟動這個任務。
Worker
Worker實現了Runnable接口:
private final class Worker
? ? extends AbstractQueuedSynchronizer
? ? implements Runnable
{
...
? ? Worker(Runnable firstTask) {
? ? ? ? setState(-1); // inhibit interrupts until runWorker
? ? ? ? this.firstTask = firstTask;
? ? ? ? this.thread = getThreadFactory().newThread(this);
? ? }
? ? /** Delegates main run loop to outer runWorker? */
? ? public void run() {
? ? ? ? runWorker(this);
? ? }
...
}
為什么要將構造Worker時的參數命名為firstTask?因為當且僅當需要建立新的Worker以執行任務task時,才會調用構造函數。因此,任務task對于新Worker而言,是第一個任務firstTask。
Worker的實現非常簡單:將自己作為Runable實例,構造時在內部創建并持有一個線程thread。Thread和Runable的使用大家很熟悉了,核心是Worker的run方法,它直接調用了runWorker()方法。
runWorker()
敲黑板!!!
重頭戲來了。簡化如下:
public class ThreadPoolExecutor extends AbstractExecutorService {
...
? ? final void runWorker(Worker w) {
? ? ? ? Thread wt = Thread.currentThread();
? ? ? ? Runnable task = w.firstTask;
? ? ? ? w.firstTask = null;
? ? ? ? w.unlock(); // allow interrupts
? ? ? ? boolean completedAbruptly = true;
? ? ? ? try {
? ? ? ? ? ? while (task != null || (task = getTask()) != null) {
? ? ? ? ? ? ? ? w.lock();
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? beforeExecute(wt, task);
? ? ? ? ? ? ? ? ? ? Throwable thrown = null;
? ? ? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? ? ? task.run();
? ? ? ? ? ? ? ? ? ? } catch (RuntimeException x) {
? ? ? ? ? ? ? ? ? ? ? ? thrown = x; throw x;
? ? ? ? ? ? ? ? ? ? } catch (Error x) {
? ? ? ? ? ? ? ? ? ? ? ? thrown = x; throw x;
? ? ? ? ? ? ? ? ? ? } catch (Throwable x) {
? ? ? ? ? ? ? ? ? ? ? ? thrown = x; throw new Error(x);
? ? ? ? ? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? ? ? ? ? afterExecute(task, thrown);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? ? ? task = null;
? ? ? ? ? ? ? ? ? ? w.completedTasks++;
? ? ? ? ? ? ? ? ? ? w.unlock();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? completedAbruptly = false;
? ? ? ? } finally {
? ? ? ? ? ? processWorkerExit(w, completedAbruptly);
? ? ? ? }
? ? }
...
}
我們在前面將要執行的任務賦值給firstTask,5-6行首先取出任務task,并將firstTask置為null。因為接下來要執行task,firstTask字段就沒有用了。
重點是10-31行的while循環。下面分情況討論。
case1:第一次進入循環,task不為null
case1對應前面作出的諸多假設。
第一次進入循環時,task==firstTask,不為null,使10行布爾短路直接進入循環;從而16行執行的是firstTask的run()方法;異常處理不表;最后,finally代碼塊中,task會被置為null,導致下一輪循環會進入case2。
case2:非第一次進入循環,task為null
case2是更普遍的情況,也就是線程池的核心。
case1中,task被置為了null,使10行布爾表達式執行第二部分(task = getTask()) != null(getTask()稍后再講,它返回一個用戶已提交的任務)。假設task得到了一個已提交的任務,從而16行執行的是新獲得的任務task的run()方法。后同case1,最后task仍然會被置為null,以后循環都將進入case2。
GETTASK()
任務從哪來呢?簡化如下:
public class ThreadPoolExecutor extends AbstractExecutorService {
...
? ? private Runnable getTask() {
? ? ? ? boolean timedOut = false;
? ? ? ? for (;;) {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? Runnable r = timed ?
? ? ? ? ? ? ? ? ? ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
? ? ? ? ? ? ? ? ? ? workQueue.take();
? ? ? ? ? ? ? ? if (r != null)
? ? ? ? ? ? ? ? ? ? return r;
? ? ? ? ? ? ? ? timedOut = true;
? ? ? ? ? ? } catch (InterruptedException retry) {
? ? ? ? ? ? ? ? timedOut = false;
? ? ? ? ? ? }
? ? ? ? }
? ? }
...
}
我們先看最簡單的,19-28行。
首先,workQueue是一個線程安全的BlockingQueue,大部分時候使用的實現類是LinkedBlockingQueue:
private final BlockingQueue<Runnable> workQueue;
假設timed為false,則調用阻塞的take()方法,返回的r一定不是null,從而12行退出,將任務交給了某個worker線程。
一個小細節有點意思:前面每個worker線程runWorker()方法時,在循環中加鎖粒度在worker級別,直接使用的lock同步;但因為每一個woker都會調用getTask(),考慮到性能因素,源碼中getTask()中使用樂觀的CAS+SPIN實現無鎖同步。
關于樂觀鎖和CAS,可以參考https://monkeysayhi.github.io/2017/10/22/源碼%7C并發一枝花之ConcurrentLinkedQueue【偽】/
workQueue中的元素從哪來呢?這就要回顧execute()方法了。
EXECUTE()
public class ThreadPoolExecutor extends AbstractExecutorService {
...
? ? public void execute(Runnable command) {
? ? ? ? if (command == null)
? ? ? ? ? ? throw new NullPointerException();
? ? ? ? int c = ctl.get();
? ? ? ? if (workerCountOf(c) < corePoolSize) {
? ? ? ? ? ? if (addWorker(command, true))
? ? ? ? ? ? ? ? return;
? ? ? ? ? ? c = ctl.get();
? ? ? ? }
? ? ? ? if (isRunning(c) && workQueue.offer(command)) {
? ? ? ? ? ? int recheck = ctl.get();
? ? ? ? ? ? if (! isRunning(recheck) && remove(command))
? ? ? ? ? ? ? ? reject(command);
? ? ? ? ? ? else if (workerCountOf(recheck) == 0)
? ? ? ? ? ? ? ? addWorker(null, false);
? ? ? ? }
? ? ? ? else if (!addWorker(command, false))
? ? ? ? ? ? reject(command);
? ? }
...
}
前面以8行的參數為例,此時,線程池中的線程數未達到最小線程池大小corePoolSize,通常可以直接在9行返回。進入8行的條件是“當前worker數小于最小線程池大小corePoolSize”。
如果不滿足,會繼續執行到12行。isRunning(c)判斷線程池是否未關閉,我們關注未關閉的情況;則會繼續執行布爾表達式的第二部分workQueue.offer(command),嘗試將任務command放入隊列workQueue。
workQueue.offer()的行為取決于線程池持有的BlockingQueue實例。
Executors.newFixedThreadPool()、Executors.newSingleThreadExecutor()創建的線程池使用LinkedBlockingQueue,而Executors.newCachedThreadPool()創建的線程池則使用SynchronousQueue。
以LinkedBlockingQueue為例,創建時不配置容量,即創建為無界隊列,則LinkedBlockingQueue#offer()永遠返回true,從而進入12-18行。
更細節的內容不必關心了,當workQueue.offer()返回true時,已經將任務command放入了隊列workQueue。當未來的某個時刻,某worker執行完某一個任務之后,會從workQueue中再取出一個任務繼續執行,直到線程池關閉,直到海枯石爛。
CachedThreadPool是一種無界線程池,使用SynchronousQueue能進一步提升性能,簡化代碼結構。留給讀者分析。
CASE2小結
可以看到,實際上,線程池的核心原理與對象池模型無關,而是生產者-消費者模型:
-
生產者(調用submit()或execute()方法)將任務task放入隊列
-
消費者(worker線程)循環從隊列中取出任務處理任務(執行task.run())
鉤子方法
回到runWorker()方法,在執行任務的過程中,線程池保留了一些鉤子方法,如beforeExecute()、afterExecute()。用戶可以在實現自己的線程池時,可以通過覆寫鉤子方法為線程池添加功能。
但猴子不認為鉤子方法是一種好的設計。因為鉤子方法大多依賴于源碼實現,那么除非了解源碼或API聲明絕對的嚴謹正確,否則很難正確使用鉤子方法。等發生錯誤時再去了解實現,可能就太晚了。說到底,還是不要使用類似extends這種表達“擴展”語義的語法來實現繼承,詳見Java中如何恰當的表達“繼承”與“擴展”的語義?。
當然,鉤子方法也是極其方便的。權衡看待。
總結
相對于線程封閉,串行線程封閉離用戶的距離更近一些,簡單靈活,實用性強,很容易掌握。而線程封閉更多淪為單純的設計策略,單純使用線程封閉的場景不多。
線程池與串行線程封閉、對象池的關系不大,但經常被混為一談;沒看過源碼的很難想到其實現方案,面試時也能立分高下。
線程池的實現很有意思。在追源碼之前,猴子一直以為線程池就是把線程存起來,用的時候取出來執行任務;看了源碼才知道實現如此之妙,簡潔優雅效率高。
源碼才是最好的老師。
來源:ImportNew
總結
以上是生活随笔為你收集整理的从串行线程封闭到对象池、线程池的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 用画小狗的方法来解释Java中的值传递
- 下一篇: 这本造价500万的“黑科技”日历,用37