AbstractQueuedSynchronizer 原理分析 - Condition 实现原理
1. 簡介
Condition是一個接口,AbstractQueuedSynchronizer 中的ConditionObject內部類實現了這個接口。Condition聲明了一組等待/通知的方法,這些方法的功能與Object中的wait/notify/notifyAll等方法相似。這兩者相同的地方在于,它們所提供的等待/通知方法均是為了協同線程的運行秩序。只不過,Object 中的方法需要配合 synchronized 關鍵字使用,而 Condition 中的方法則要配合鎖對象使用,并通過newCondition方法獲取實現類對象。除此之外,Condition 接口中聲明的方法功能上更為豐富一些。比如,Condition 聲明了具有不響應中斷和超時功能的等待接口,這些都是 Object wait 方法所不具備的。
本篇文章是上一篇文章AbstractQueuedSynchronizer 原理分析 - 獨占/共享模式的續(xù)篇,在學習 Condition 的原理前,建議大家先去了解 AbstractQueuedSynchronizer 同步隊列相關原理。本篇文章會涉及到同步隊列相關知識,這些知識在上一篇文章分析過。
關于Condition的簡介這里先說到這,接下來分析一下Condition實現類ConditionObject的原理。
?2. 實現原理
ConditionObject是通過基于單鏈表的條件隊列來管理等待線程的。線程在調用await方法進行等待時,會釋放同步狀態(tài)。同時線程將會被封裝到一個等待節(jié)點中,并將節(jié)點置入條件隊列尾部進行等待。當有線程在獲取獨占鎖的情況下調用signal或singalAll方法時,隊列中的等待線程將會被喚醒,重新競爭鎖。另外,需要說明的是,一個鎖對象可同時創(chuàng)建多個 ConditionObject 對象,這意味著多個競爭同一獨占鎖的線程可在不同的條件隊列中進行等待。在喚醒時,可喚醒指定條件隊列中的線程。其大致示意圖如下:
以上就是 ConditionObject 所實現的等待/通知機制的大致原理,并不是很難理解。當然,在具體的實現中,則考慮的更為細致一些。相關細節(jié)將會在接下來一章中進行說明,繼續(xù)往下看吧。
?3. 源碼解析
?3.1 等待
ConditionObject 中實現了幾種不同的等待方法,每種方法均有它自己的特點。比如await()會響應中斷,而awaitUninterruptibly()則不響應中斷。await(long, TimeUnit)則會在響應中斷的基礎上,新增了超時功能。除此之外,還有一些等待方法,這里就不一一列舉了。
在本節(jié)中,我將主要分析await()的方法實現。其他的等待方法大同小異,就不一一分析了,有興趣的朋友可以自己看一下。好了,接下來進入源碼分析階段。
|| /*** await 是一個響應中斷的等待方法,主要邏輯流程如下:* 1. 如果線程中斷了,拋出 InterruptedException 異常* 2. 將線程封裝到節(jié)點對象里,并將節(jié)點添加到條件隊列尾部* 3. 保存并完全釋放同步狀態(tài),保存下來的同步狀態(tài)在重新競爭鎖時會用到* 4. 線程進入等待狀態(tài),直到被通知或中斷才會恢復運行* 5. 使用第3步保存的同步狀態(tài)去競爭獨占鎖*/ public final void await() throws InterruptedException {// 線程中斷,則拋出中斷異常,對應步驟1if (Thread.interrupted())throw new InterruptedException();// 添加等待節(jié)點到條件隊列尾部,對應步驟2Node node = addConditionWaiter();// 保存并完全釋放同步狀態(tài),對應步驟3。此方法的意義會在后面詳細說明。int savedState = fullyRelease(node);int interruptMode = 0;/** 判斷節(jié)點是否在同步隊列上,如果不在則阻塞線程。* 循環(huán)結束的條件:* 1. 其他線程調用 singal/singalAll,node 將會被轉移到同步隊列上。node 對應線程將* 會在獲取同步狀態(tài)的過程中被喚醒,并走出 while 循環(huán)。* 2. 線程在阻塞過程中產生中斷*/ while (!isOnSyncQueue(node)) {// 調用 LockSupport.park 阻塞當前線程,對應步驟4LockSupport.park(this);/** 檢測中斷模式,這里有兩種中斷模式,如下:* THROW_IE:* 中斷在 node 轉移到同步隊列“前”發(fā)生,需要當前線程自行將 node 轉移到同步隊* 列中,并在隨后拋出 InterruptedException 異常。* * REINTERRUPT:* 中斷在 node 轉移到同步隊列“期間”或“之后”發(fā)生,此時表明有線程正在調用 * singal/singalAll 轉移節(jié)點。在該種中斷模式下,再次設置線程的中斷狀態(tài)。* 向后傳遞中斷標志,由后續(xù)代碼去處理中斷。*/if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}/** 被轉移到同步隊列的節(jié)點 node 將在 acquireQueued 方法中重新獲取同步狀態(tài),注意這里* 的這里的 savedState 是上面調用 fullyRelease 所返回的值,與此對應,可以把這里的 * acquireQueued 作用理解為 fullyAcquire(并不存在這個方法)。* * 如果上面的 while 循環(huán)沒有產生中斷,則 interruptMode = 0。但 acquireQueued 方法* 可能會產生中斷,產生中斷時返回 true。這里仍將 interruptMode 設為 REINTERRUPT,* 目的是繼續(xù)向后傳遞中斷,acquireQueued 不會處理中斷。*/if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;/** 正常通過 singal/singalAll 轉移節(jié)點到同步隊列時,nextWaiter 引用會被置空。* 若發(fā)生線程產生中斷(THROW_IE)或 fullyRelease 方法出現錯誤等異常情況,* 該引用則不會被置空*/ if (node.nextWaiter != null) // clean up if cancelled// 清理等待狀態(tài)非 CONDITION 的節(jié)點unlinkCancelledWaiters();if (interruptMode != 0)/** 根據 interruptMode 覺得中斷的處理方式:* THROW_IE:拋出 InterruptedException 異常* REINTERRUPT:重新設置線程中斷標志*/ reportInterruptAfterWait(interruptMode); }/** 將當先線程封裝成節(jié)點,并將節(jié)點添加到條件隊列尾部 */ private Node addConditionWaiter() {Node t = lastWaiter;/** 清理等待狀態(tài)為 CANCELLED 的節(jié)點。fullyRelease 內部調用 release 發(fā)生異?;蜥尫磐綘? 態(tài)失敗時,節(jié)點的等待狀態(tài)會被設置為 CANCELLED。所以這里要清理一下已取消的節(jié)點*/if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();t = lastWaiter;}// 創(chuàng)建節(jié)點,并將節(jié)點置于隊列尾部Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node; }/** 清理等待狀態(tài)為 CANCELLED 的節(jié)點 */ private void unlinkCancelledWaiters() {Node t = firstWaiter;// 指向上一個等待狀態(tài)為非 CANCELLED 的節(jié)點Node trail = null;while (t != null) {Node next = t.nextWaiter;if (t.waitStatus != Node.CONDITION) {t.nextWaiter = null;/** trail 為 null,表明 next 之前的節(jié)點等待狀態(tài)均為 CANCELLED,此時更新 * firstWaiter 引用的指向。* trail 不為 null,表明 next 之前有節(jié)點的等待狀態(tài)為 CONDITION,這時將 * trail.nextWaiter 指向 next 節(jié)點。*/if (trail == null)firstWaiter = next;elsetrail.nextWaiter = next;// next 為 null,表明遍歷到條件隊列尾部了,此時將 lastWaiter 指向 trailif (next == null)lastWaiter = trail;}else// t.waitStatus = Node.CONDITION,則將 trail 指向 ttrail = t;t = next;} }/*** 這個方法用于完全釋放同步狀態(tài)。這里解釋一下完全釋放的原因:為了避免死鎖的產生,鎖的實現上* 一般應該支持重入功能。對應的場景就是一個線程在不釋放鎖的情況下可以多次調用同一把鎖的 * lock 方法進行加鎖,且不會加鎖失敗,如失敗必然導致導致死鎖。鎖的實現類可通過 AQS 中的整型成員* 變量 state 記錄加鎖次數,每次加鎖,將 state++。每次 unlock 方法釋放鎖時,則將 state--,* 直至 state = 0,線程完全釋放鎖。用這種方式即可實現了鎖的重入功能。*/ final int fullyRelease(Node node) {boolean failed = true;try {// 獲取同步狀態(tài)數值int savedState = getState();// 調用 release 釋放指定數量的同步狀態(tài)if (release(savedState)) {failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {// 如果 relase 出現異?;蜥尫磐綘顟B(tài)失敗,此處將 node 的等待狀態(tài)設為 CANCELLEDif (failed)node.waitStatus = Node.CANCELLED;} }/** 該方法用于判斷節(jié)點 node 是否在同步隊列上 */ final boolean isOnSyncQueue(Node node) {/** 節(jié)點在同步隊列上時,其狀態(tài)可能為 0、SIGNAL、PROPAGATE 和 CANCELLED 其中之一,* 但不會為 CONDITION,所以可已通過節(jié)點的等待狀態(tài)來判斷節(jié)點所處的隊列。* * node.prev 僅會在節(jié)點獲取同步狀態(tài)后,調用 setHead 方法將自己設為頭結點時被置為 * null,所以只要節(jié)點在同步隊列上,node.prev 一定不會為 null*/if (node.waitStatus == Node.CONDITION || node.prev == null)return false;/** 如果節(jié)點后繼被為 null,則表明節(jié)點在同步隊列上。因為條件隊列使用的是 nextWaiter 指* 向后繼節(jié)點的,條件隊列上節(jié)點的 next 指針均為 null。但僅以 node.next != null 條* 件斷定節(jié)點在同步隊列是不充分的。節(jié)點在入隊過程中,是先設置 node.prev,后設置 * node.next。如果設置完 node.prev 后,線程被切換了,此時 node.next 仍然為 * null,但此時 node 確實已經在同步隊列上了,所以這里還需要進行后續(xù)的判斷。*/if (node.next != null)return true;// 在同步隊列上,從后向前查找 node 節(jié)點return findNodeFromTail(node); }/** 由于同步隊列上的的節(jié)點 prev 引用不會為空,所以這里從后向前查找 node 節(jié)點 */ private boolean findNodeFromTail(Node node) {Node t = tail;for (;;) {if (t == node)return true;if (t == null)return false;t = t.prev;} }/** 檢測線程在等待期間是否發(fā)生了中斷 */ private int checkInterruptWhileWaiting(Node node) {return Thread.interrupted() ?(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :0; }/** * 判斷中斷發(fā)生的時機,分為兩種:* 1. 中斷在節(jié)點被轉移到同步隊列前發(fā)生,此時返回 true* 2. 中斷在節(jié)點被轉移到同步隊列期間或之后發(fā)生,此時返回 false*/ final boolean transferAfterCancelledWait(Node node) {// 中斷在節(jié)點被轉移到同步隊列前發(fā)生,此時自行將節(jié)點轉移到同步隊列上,并返回 trueif (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {// 調用 enq 將節(jié)點轉移到同步隊列中enq(node);return true;}/** 如果上面的條件分支失敗了,則表明已經有線程在調用 signal/signalAll 方法了,這兩個* 方法會先將節(jié)點等待狀態(tài)由 CONDITION 設置為 0 后,再調用 enq 方法轉移節(jié)點。下面判斷節(jié)* 點是否已經在同步隊列上的原因是,signal/signalAll 方法可能僅設置了等待狀態(tài),還沒* 來得及轉移節(jié)點就被切換走了。所以這里用自旋的方式判斷 signal/signalAll 是否已經完* 成了轉移操作。這種情況表明了中斷發(fā)生在節(jié)點被轉移到同步隊列期間。*/while (!isOnSyncQueue(node))Thread.yield();}// 中斷在節(jié)點被轉移到同步隊列期間或之后發(fā)生,返回 falsereturn false; }/*** 根據中斷類型做出相應的處理:* THROW_IE:拋出 InterruptedException 異常* REINTERRUPT:重新設置中斷標志,向后傳遞中斷*/ private void reportInterruptAfterWait(int interruptMode)throws InterruptedException {if (interruptMode == THROW_IE)throw new InterruptedException();else if (interruptMode == REINTERRUPT)selfInterrupt(); }/** 中斷線程 */ static void selfInterrupt() {Thread.currentThread().interrupt(); } |
?3.2 通知
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 | /** 將條件隊列中的頭結點轉移到同步隊列中 */ public final void signal() {// 檢查線程是否獲取了獨占鎖,未獲取獨占鎖調用 signal 方法是不允許的if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)// 將頭結點轉移到同步隊列中doSignal(first); }private void doSignal(Node first) {do {/** 將 firstWaiter 指向 first 節(jié)點的 nextWaiter 節(jié)點,while 循環(huán)將會用到更新后的 * firstWaiter 作為判斷條件。*/ if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;// 將頭結點從條件隊列中移除first.nextWaiter = null;/** 調用 transferForSignal 將節(jié)點轉移到同步隊列中,如果失敗,且 firstWaiter* 不為 null,則再次進行嘗試。transferForSignal 成功了,while 循環(huán)就結束了。*/} while (!transferForSignal(first) &&(first = firstWaiter) != null); }/** 這個方法用于將條件隊列中的節(jié)點轉移到同步隊列中 */ final boolean transferForSignal(Node node) {/** 如果將節(jié)點的等待狀態(tài)由 CONDITION 設為 0 失敗,則表明節(jié)點被取消。* 因為 transferForSignal 中不存在線程競爭的問題,所以下面的 CAS * 失敗的唯一原因是節(jié)點的等待狀態(tài)為 CANCELLED。*/ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;// 調用 enq 方法將 node 轉移到同步隊列中,并返回 node 的前驅節(jié)點 pNode p = enq(node);int ws = p.waitStatus;/** 如果前驅節(jié)點的等待狀態(tài) ws > 0,則表明前驅節(jié)點處于取消狀態(tài),此時應喚醒 node 對應的* 線程去獲取同步狀態(tài)。如果 ws <= 0,這里通過 CAS 將節(jié)點 p 的等待設為 SIGNAL。* 這樣,節(jié)點 p 在釋放同步狀態(tài)后,才會喚醒后繼節(jié)點 node。如果 CAS 設置失敗,則應立即* 喚醒 node 節(jié)點對應的線程。以免因 node 沒有被喚醒導致同步隊列掛掉。關于同步隊列的相關的* 知識,請參考我的另一篇文章“AbstractQueuedSynchronizer 原理分析 - 獨占/共享模式”,* 鏈接為:http://t.cn/RuERpHl*/if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true; } |
看完了 signal 方法的分析,下面再來看看 signalAll 的源碼分析,如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | public final void signalAll() {// 檢查線程是否獲取了獨占鎖if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignalAll(first); }private void doSignalAll(Node first) {lastWaiter = firstWaiter = null;/** 將條件隊列中所有的節(jié)點轉移到同步隊列中。與 doSignal 方法略有不同,主要區(qū)別在 * while 循環(huán)的循環(huán)條件上,下的循環(huán)只有在條件隊列中沒節(jié)點后才終止。*/ do {Node next = first.nextWaiter;// 將 first 節(jié)點從條件隊列中移除first.nextWaiter = null;// 轉移節(jié)點到同步隊列上transferForSignal(first);first = next; } while (first != null); } |
?4. 其他
在我閱讀 ConditionObject 源碼時發(fā)現了一個問題 - await 方法竟然沒有做同步控制。而在 signal 和 signalAll 方法開頭都會調用 isHeldExclusively 檢測線程是否已經獲取了獨占鎖,未獲取獨占鎖調用這兩個方法會拋出異常。但在 await 方法中,卻沒有進行相關的檢測。如果在正確的使用方式下調用 await 方法是不會出現問題的,所謂正確的使用方式指的是在獲取鎖的情況下調用 await 方法。但如果沒獲取鎖就調用該方法,就會產生線程競爭的情況,這將會對條件隊列的結構造成破壞。這里再來看一下新增節(jié)點的方法源碼,如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | private Node addConditionWaiter() {Node t = lastWaiter;if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();t = lastWaiter;}Node node = new Node(Thread.currentThread(), Node.CONDITION);// 存在競爭時將會導致節(jié)點入隊出錯if (t == null)firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node; } |
假如現在有線程 t1 和 t2,對應節(jié)點 node1 和 node2。線程 t1 獲取了鎖,而 t2 未獲取鎖,此時條件隊列為空,即 firstWaiter = lastWaiter = null。演繹一下會導致條件隊列被破壞的場景,如下:
如上,如果線程是按照上面的順序執(zhí)行,這會導致隊列被破壞。firstWaiter 本應該指向 node1,但結果卻指向了 node2,node1 被排擠出了隊列。這樣會導致什么問題呢?這樣可能會導致線程 t1 一直阻塞下去。因為 signal/signalAll 是從條件隊列頭部轉移節(jié)點的,但 node1 不在隊列中,所以 node1 無法被轉移到同步隊列上。在不出現中斷的情況下,node1 對應的線程 t1 會被永久阻塞住。
這里未對 await 方法進行同步控制,導致條件隊列出現問題,應該算 ConditionObject 實現上的一個缺陷了。關于這個缺陷,博客園博主?活在夢裡?在他的文章?AbstractQueuedSynchronizer源碼解讀–續(xù)篇之Condition?中也提到了。并向 JDK 開發(fā)者提了一個 BUG,BUG 鏈接為?JDK-8187408,有興趣的同學可以去看看。
?5. 總結
到這里,Condition 的原理就分析完了。分析完 Condition 原理,關于 AbstractQueuedSynchronizer 的分析也就結束了。總體來說,通過分析 AQS 并寫成博客,使我對 AQS 的原理有了更深刻的認識。AQS 是 JDK 中鎖和其他并發(fā)組件實現的基礎,弄懂 AQS 原理對后續(xù)在分析各種鎖和其他同步組件大有裨益。
AQS 本身實現比較復雜,要處理各種各樣的情況。作為類庫,AQS 要考慮和處理各種可能的情況,實現起來可謂非常復雜。不僅如此,AQS 還很好的封裝了同步隊列的管理,線程的阻塞與喚醒等基礎操作,大大降低了繼承類實現同步控制功能的復雜度。所以,在本文的最后,再次向 AQS 的作者,Java 大師Doug Lea致敬。
好了,本文到此結束,謝謝大家閱讀。
?參考
- AbstractQueuedSynchronizer源碼解讀–續(xù)篇之Condition
- 本文鏈接:?https://www.tianxiaobo.com/2018/05/04/AbstractQueuedSynchronizer-原理分析-Condition-實現原理/
from:?http://www.tianxiaobo.com/2018/05/04/AbstractQueuedSynchronizer-%E5%8E%9F%E7%90%86%E5%88%86%E6%9E%90-Condition-%E5%AE%9E%E7%8E%B0%E5%8E%9F%E7%90%86/?
總結
以上是生活随笔為你收集整理的AbstractQueuedSynchronizer 原理分析 - Condition 实现原理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: AbstractQueuedSynchr
- 下一篇: Java 重入锁 ReentrantLo