AbstractQueuedSynchronizer 原理分析 - 独占/共享模式
1.簡介
AbstractQueuedSynchronizer (抽象隊列同步器,以下簡稱 AQS)出現在 JDK 1.5 中,由大師 Doug Lea 所創作。AQS 是很多同步器的基礎框架,比如 ReentrantLock、CountDownLatch 和 Semaphore 等都是基于 AQS 實現的。除此之外,我們還可以基于 AQS,定制出我們所需要的同步器。
AQS 的使用方式通常都是通過內部類繼承 AQS 實現同步功能,通過繼承 AQS,可以簡化同步器的實現。如前面所說,AQS 是很多同步器實現的基礎框架。弄懂 AQS 對理解 Java 并發包里的組件大有裨益,這也是我學習 AQS 并寫出這篇文章的緣由。另外,需要說明的是,AQS 本身并不是很好理解,細節很多。在看的過程中藥有一定的耐心,做好看多遍的準備。好了,其他的就不多說了,開始進入正題吧。
?2.原理概述
在 AQS 內部,通過維護一個FIFO 隊列來管理多線程的排隊工作。在公平競爭的情況下,無法獲取同步狀態的線程將會被封裝成一個節點,置于隊列尾部。入隊的線程將會通過自旋的方式獲取同步狀態,若在有限次的嘗試后,仍未獲取成功,線程則會被阻塞住。大致示意圖如下:
當頭結點釋放同步狀態后,且后繼節點對應的線程被阻塞,此時頭結點
線程將會去喚醒后繼節點線程。后繼節點線程恢復運行并獲取同步狀態后,會將舊的頭結點從隊列中移除,并將自己設為頭結點。大致示意圖如下:
?3.重要方法介紹
本節將介紹三組重要的方法,通過使用這三組方法即可實現一個同步組件。
第一組方法是用于訪問/設置同步狀態的,如下:
| int getState() | 獲取同步狀態 |
| void setState() | 設置同步狀態 |
| boolean compareAndSetState(int expect, int update) | 通過 CAS 設置同步狀態 |
第二組方需要由同步組件覆寫。如下:
| boolean tryAcquire(int arg) | 獨占式獲取同步狀態 |
| boolean tryRelease(int arg) | 獨占式釋放同步狀態 |
| int tryAcquireShared(int arg) | 共享式獲取同步狀態 |
| boolean tryReleaseShared(int arg) | 共享式私房同步狀態 |
| boolean isHeldExclusively() | 檢測當前線程是否獲取獨占鎖 |
第三組方法是一組模板方法,同步組件可直接調用。如下:
| void acquire(int arg) | 獨占式獲取同步狀態,該方法將會調用 tryAcquire 嘗試獲取同步狀態。獲取成功則返回,獲取失敗,線程進入同步隊列等待。 |
| void acquireInterruptibly(int arg) | 響應中斷版的 acquire |
| boolean tryAcquireNanos(int arg,long nanos) | 超時+響應中斷版的?acquire |
| void acquireShared(int arg) | 共享式獲取同步狀態,同一時刻可能會有多個線程獲得同步狀態。比如讀寫鎖的讀鎖就是就是調用這個方法獲取同步狀態的。 |
| void acquireSharedInterruptibly(int arg) | 響應中斷版的?acquireShared |
| boolean tryAcquireSharedNanos(int arg,long nanos) | 超時+響應中斷版的 acquireShared |
| boolean release(int arg) | 獨占式釋放同步狀態 |
| boolean releaseShared(int arg) | 共享式釋放同步狀態 |
上面列舉了一堆方法,看似繁雜。但稍微理一下,就會發現上面諸多方法無非就兩大類:一類是獨占式獲取和釋放共享狀態,另一類是共享式獲取和釋放同步狀態。至于這兩類方法的實現細節,我會在接下來的章節中講到,繼續往下看吧。
?4.源碼分析
?4.1 節點結構
在并發的情況下,AQS 會將未獲取同步狀態的線程將會封裝成節點,并將其放入同步隊列尾部。同步隊列中的節點除了要保存線程,還要保存等待狀態。不管是獨占式還是共享式,在獲取狀態失敗時都會用到節點類。所以這里我們要先看一下節點類的實現,為后面的源碼分析進行簡單鋪墊。源碼如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 | static final class Node {/** 共享類型節點,標記節點在共享模式下等待 */static final Node SHARED = new Node();/** 獨占類型節點,標記節點在獨占模式下等待 */static final Node EXCLUSIVE = null;/** 等待狀態 - 取消 */static final int CANCELLED = 1;/** * 等待狀態 - 通知。某個節點是處于該狀態,當該節點釋放同步狀態后,* 會通知后繼節點線程,使之可以恢復運行 */static final int SIGNAL = -1;/** 等待狀態 - 條件等待。表明節點等待在 Condition 上 */static final int CONDITION = -2;/*** 等待狀態 - 傳播。表示無條件向后傳播喚醒動作,詳細分析請看第五章*/static final int PROPAGATE = -3;/*** 等待狀態,取值如下:* SIGNAL,* CANCELLED,* CONDITION,* PROPAGATE,* 0* * 初始情況下,waitStatus = 0*/volatile int waitStatus;/*** 前驅節點*/volatile Node prev;/*** 后繼節點*/volatile Node next;/*** 對應的線程*/volatile Thread thread;/*** 下一個等待節點,用在 ConditionObject 中*/Node nextWaiter;/*** 判斷節點是否是共享節點*/final boolean isShared() {return nextWaiter == SHARED;}/*** 獲取前驅節點*/final Node predecessor() throws NullPointerException {Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}Node() { // Used to establish initial head or SHARED marker}/** addWaiter 方法會調用該構造方法 */Node(Thread thread, Node mode) {this.nextWaiter = mode;this.thread = thread;}/** Condition 中會用到此構造方法 */Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread;} } |
?4.2 獨占模式分析
?4.2.1 獲取同步狀態
獨占式獲取同步狀態時通過 acquire 進行的,下面來分析一下該方法的源碼。如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 | /*** 該方法將會調用子類復寫的 tryAcquire 方法獲取同步狀態,* - 獲取成功:直接返回* - 獲取失敗:將線程封裝在節點中,并將節點置于同步隊列尾部,* 通過自旋嘗試獲取同步狀態。如果在有限次內仍無法獲取同步狀態,* 該線程將會被 LockSupport.park 方法阻塞住,直到被前驅節點喚醒*/ public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt(); }/** 向同步隊列尾部添加一個節點 */ private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// 嘗試以快速方式將節點添加到隊列尾部Node pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}// 快速插入節點失敗,調用 enq 方法,不停的嘗試插入節點enq(node);return node; }/*** 通過 CAS + 自旋的方式插入節點到隊尾*/ private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initialize// 設置頭結點,初始情況下,頭結點是一個空節點if (compareAndSetHead(new Node()))tail = head;} else {/** 將節點插入隊列尾部。這里是先將新節點的前驅設為尾節點,之后在嘗試將新節點設為尾節* 點,最后再將原尾節點的后繼節點指向新的尾節點。除了這種方式,我們還先設置尾節點,* 之后再設置前驅和后繼,即:* * if (compareAndSetTail(t, node)) {* node.prev = t;* t.next = node;* }* * 但但如果是這樣做,會導致一個問題,即短時內,隊列結構會遭到破壞。考慮這種情況,* 某個線程在調用 compareAndSetTail(t, node)成功后,該線程被 CPU 切換了。此時* 設置前驅和后繼的代碼還沒帶的及執行,但尾節點指針卻設置成功,導致隊列結構短時內會* 出現如下情況:** +------+ prev +-----+ +-----+* head | | <---- | | | | tail* | | ----> | | | |* +------+ next +-----+ +-----+** tail 節點完全脫離了隊列,這樣導致一些隊列遍歷代碼出錯。如果先設置* 前驅,在設置尾節點。及時線程被切換,隊列結構短時可能如下:** +------+ prev +-----+ prev +-----+* head | | <---- | | <---- | | tail* | | ----> | | | |* +------+ next +-----+ +-----+* * 這樣并不會影響從后向前遍歷,不會導致遍歷邏輯出錯。* * 參考:* https://www.cnblogs.com/micrari/p/6937995.html*/node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}} }/*** 同步隊列中的線程在此方法中以循環嘗試獲取同步狀態,在有限次的嘗試后,* 若仍未獲取鎖,線程將會被阻塞,直至被前驅節點的線程喚醒。*/ final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;// 循環獲取同步狀態for (;;) {final Node p = node.predecessor();/** 前驅節點如果是頭結點,表明前驅節點已經獲取了同步狀態。前驅節點釋放同步狀態后,* 在不出異常的情況下, tryAcquire(arg) 應返回 true。此時節點就成功獲取了同* 步狀態,并將自己設為頭節點,原頭節點出隊。*/ if (p == head && tryAcquire(arg)) {// 成功獲取同步狀態,設置自己為頭節點setHead(node);p.next = null; // help GCfailed = false;return interrupted;}/** 如果獲取同步狀態失敗,則根據條件判斷是否應該阻塞自己。* 如果不阻塞,CPU 就會處于忙等狀態,這樣會浪費 CPU 資源*/if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {/** 如果在獲取同步狀態中出現異常,failed = true,cancelAcquire 方法會被執行。* tryAcquire 需同步組件開發者覆寫,難免不了會出現異常。*/if (failed)cancelAcquire(node);} }/** 設置頭節點 */ private void setHead(Node node) {// 僅有一個線程可以成功獲取同步狀態,所以這里不需要進行同步控制head = node;node.thread = null;node.prev = null; }/*** 該方法主要用途是,當線程在獲取同步狀態失敗時,根據前驅節點的等待狀態,決定后續的動作。比如前驅* 節點等待狀態為 SIGNAL,表明當前節點線程應該被阻塞住了。不能老是嘗試,避免 CPU 忙等。* —————————————————————————————————————————————————————————————————* | 前驅節點等待狀態 | 相應動作 |* —————————————————————————————————————————————————————————————————* | SIGNAL | 阻塞 |* | CANCELLED | 向前遍歷, 移除前面所有為該狀態的節點 |* | waitStatus < 0 | 將前驅節點狀態設為 SIGNAL, 并再次嘗試獲取同步狀態 |* —————————————————————————————————————————————————————————————————*/ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;/* * 前驅節點等待狀態為 SIGNAL,表示當前線程應該被阻塞。* 線程阻塞后,會在前驅節點釋放同步狀態后被前驅節點線程喚醒*/if (ws == Node.SIGNAL)return true;/** 前驅節點等待狀態為 CANCELLED,則以前驅節點為起點向前遍歷,* 移除其他等待狀態為 CANCELLED 的節點。*/ if (ws > 0) {do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/** 等待狀態為 0 或 PROPAGATE,設置前驅節點等待狀態為 SIGNAL,* 并再次嘗試獲取同步狀態。*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false; }private final boolean parkAndCheckInterrupt() {// 調用 LockSupport.park 阻塞自己LockSupport.park(this);return Thread.interrupted(); }/*** 取消獲取同步狀態*/ private void cancelAcquire(Node node) {if (node == null)return;node.thread = null;// 前驅節點等待狀態為 CANCELLED,則向前遍歷并移除其他為該狀態的節點Node pred = node.prev;while (pred.waitStatus > 0)node.prev = pred = pred.prev;// 記錄 pred 的后繼節點,后面會用到Node predNext = pred.next;// 將當前節點等待狀態設為 CANCELLEDnode.waitStatus = Node.CANCELLED;/** 如果當前節點是尾節點,則通過 CAS 設置前驅節點 prev 為尾節點。設置成功后,再利用 CAS 將 * prev 的 next 引用置空,斷開與后繼節點的聯系,完成清理工作。*/ if (node == tail && compareAndSetTail(node, pred)) {/* * 執行到這里,表明 pred 節點被成功設為了尾節點,這里通過 CAS 將 pred 節點的后繼節點* 設為 null。注意這里的 CAS 即使失敗了,也沒關系。失敗了,表明 pred 的后繼節點更新* 了。pred 此時已經是尾節點了,若后繼節點被更新,則是有新節點入隊了。這種情況下,CAS * 會失敗,但失敗不會影響同步隊列的結構。*/compareAndSetNext(pred, predNext, null);} else {int ws;// 根據條件判斷是喚醒后繼節點,還是將前驅節點和后繼節點連接到一起if (pred != head &&((ws = pred.waitStatus) == Node.SIGNAL ||(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&pred.thread != null) {Node next = node.next;if (next != null && next.waitStatus <= 0)/** 這里使用 CAS 設置 pred 的 next,表明多個線程同時在取消,這里存在競爭。* 不過此處沒針對 compareAndSetNext 方法失敗后做一些處理,表明即使失敗了也* 沒關系。實際上,多個線程同時設置 pred 的 next 引用時,只要有一個能設置成* 功即可。*/compareAndSetNext(pred, predNext, next);} else {/** 喚醒后繼節點對應的線程。這里簡單講一下為什么要喚醒后繼線程,考慮下面一種情況:* head node1 node2 tail* ws=0 ws=1 ws=-1 ws=0* +------+ prev +-----+ prev +-----+ prev +-----+* | | <---- | | <---- | | <---- | | * | | ----> | | ----> | | ----> | |* +------+ next +-----+ next +-----+ next +-----+* * 頭結點初始狀態為 0,node1、node2 和 tail 節點依次入隊。node1 自旋過程中調用 * tryAcquire 出現異常,進入 cancelAcquire。head 節點此時等待狀態仍然是 0,它* 會認為后繼節點還在運行中,所它在釋放同步狀態后,不會去喚醒后繼等待狀態為非取消的* 節點 node2。如果 node1 再不喚醒 node2 的線程,該線程面臨無法被喚醒的情況。此* 時,整個同步隊列就回全部阻塞住。*/unparkSuccessor(node);}node.next = node; // help GC} }private void unparkSuccessor(Node node) {int ws = node.waitStatus;/** 通過 CAS 將等待狀態設為 0,讓后繼節點線程多一次* 嘗試獲取同步狀態的機會*/if (ws < 0)compareAndSetWaitStatus(node, ws, 0);Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;/** 這里如果 s == null 處理,是不是表明 node 是尾節點?答案是不一定。原因之前在分析 * enq 方法時說過。這里再啰嗦一遍,新節點入隊時,隊列瞬時結構可能如下:* node1 node2* +------+ prev +-----+ prev +-----+* head | | <---- | | <---- | | tail* | | ----> | | | |* +------+ next +-----+ +-----+* * node2 節點為新入隊節點,此時 tail 已經指向了它,但 node1 后繼引用還未設置。* 這里 node1 就是 node 參數,s = node1.next = null,但此時 node1 并不是尾* 節點。所以這里不能從前向后遍歷同步隊列,應該從后向前。*/for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)// 喚醒 node 的后繼節點線程LockSupport.unpark(s.thread); } |
到這里,獨占式獲取同步狀態的分析就講完了。如果僅分析獲取同步狀態的大致流程,那么這個流程并不難。但若深入到細節之中,還是需要思考思考。這里對獨占式獲取同步狀態的大致流程做個總結,如下:
上面的步驟對應下面的流程圖:
上面流程圖參考自《Java并發編程》第128頁圖 5-5,這里進行了重新繪制,并做了一定的修改。
?4.2.2 釋放同步狀態
相對于獲取同步狀態,釋放同步狀態的過程則要簡單的多,這里簡單羅列一下步驟:
就兩個步驟,下面看一下源碼分析。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;/** 這里簡單列舉條件分支的可能性,如下:* 1. head = null* head 還未初始化。初始情況下,head = null,當第一個節點入隊后,head 會被初始* 為一個虛擬(dummy)節點。這里,如果還沒節點入隊就調用 release 釋放同步狀態,* 就會出現 h = null 的情況。* * 2. head != null && waitStatus = 0* 表明后繼節點對應的線程仍在運行中,不需要喚醒* * 3. head != null && waitStatus < 0* 后繼節點對應的線程可能被阻塞了,需要喚醒 */if (h != null && h.waitStatus != 0)// 喚醒后繼節點,上面分析過了,這里不再贅述unparkSuccessor(h);return true;}return false; } |
?4.3 共享模式分析
與獨占模式不同,共享模式下,同一時刻會有多個線程獲取共享同步狀態。共享模式是實現讀寫鎖中的讀鎖、CountDownLatch 和 Semaphore 等同步組件的基礎,搞懂了,再去理解一些共享同步組件就不難了。
?4.3.1 獲取同步狀態
共享類型的節點獲取共享同步狀態后,如果后繼節點也是共享類型節點,當前節點則會喚醒后繼節點。這樣,多個節點線程即可同時獲取共享同步狀態。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 | public final void acquireShared(int arg) {// 嘗試獲取共享同步狀態,tryAcquireShared 返回的是整型if (tryAcquireShared(arg) < 0)doAcquireShared(arg); }private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;// 這里和前面一樣,也是通過有限次自旋的方式獲取同步狀態for (;;) {final Node p = node.predecessor();/** 前驅是頭結點,其類型可能是 EXCLUSIVE,也可能是 SHARED.* 如果是 EXCLUSIVE,線程無法獲取共享同步狀態。* 如果是 SHARED,線程則可獲取共享同步狀態。* 能不能獲取共享同步狀態要看 tryAcquireShared 具體的實現。比如多個線程競爭讀寫* 鎖的中的讀鎖時,均能成功獲取讀鎖。但多個線程同時競爭信號量時,可能就會有一部分線* 程因無法競爭到信號量資源而阻塞。*/ if (p == head) {// 嘗試獲取共享同步狀態int r = tryAcquireShared(arg);if (r >= 0) {// 設置頭結點,如果后繼節點是共享類型,喚醒后繼節點setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);} }/*** 這個方法做了兩件事情:* 1. 設置自身為頭結點* 2. 根據條件判斷是否要喚醒后繼節點*/ private void setHeadAndPropagate(Node node, int propagate) {Node h = head;// 設置頭結點setHead(node);/** 這個條件分支由 propagate > 0 和 h.waitStatus < 0 兩部分組成。* h.waitStatus < 0 時,waitStatus = SIGNAL 或 PROPAGATE。這里僅依賴* 條件 propagate > 0 判斷是否喚醒后繼節點是不充分的,至于原因請參考第五章*/if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;/** 節點 s 如果是共享類型節點,則應該喚醒該節點* 至于 s == null 的情況前面分析過,這里不在贅述。*/ if (s == null || s.isShared())doReleaseShared();} }/*** 該方法用于在 acquires/releases 存在競爭的情況下,確保喚醒動作向后傳播。*/ private void doReleaseShared() {/** 下面的循環在 head 節點存在后繼節點的情況下,做了兩件事情:* 1. 如果 head 節點等待狀態為 SIGNAL,則將 head 節點狀態設為 0,并喚醒后繼節點* 2. 如果 head 節點等待狀態為 0,則將 head 節點狀態設為 PROPAGATE,保證喚醒能夠正* 常傳播下去。關于 PROPAGATE 狀態的細節分析,后面會講到。*/for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}/* * ws = 0 的情況下,這里要嘗試將狀態從 0 設為 PROPAGATE,保證喚醒向后* 傳播。setHeadAndPropagate 在讀到 h.waitStatus < 0 時,可以繼續喚醒* 后面的節點。*/else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;} } |
到這里,共享模式下獲取同步狀態的邏輯就分析完了,不過我這里只做了簡單分析。相對于獨占式獲取同步狀態,共享式的情況更為復雜。獨占模式下,只有一個節點線程可以成功獲取同步狀態,也只有獲取已同步狀態節點線程才可以釋放同步狀態。但在共享模式下,多個共享節點線程可以同時獲得同步狀態,在一些線程獲取同步狀態的同時,可能還會有另外一些線程正在釋放同步狀態。所以,共享模式更為復雜。這里我的腦力跟不上了,沒法面面俱到的分析,見諒。
最后說一下共享模式下獲取同步狀態的大致流程,如下:
?4.3.2 釋放共享狀態
釋放共享狀態主要邏輯在 doReleaseShared 中,doReleaseShared 上節已經分析過,這里就不贅述了。共享節點線程在獲取同步狀態和釋放同步狀態時都會調用 doReleaseShared,所以 doReleaseShared 是多線程競爭集中的地方。
| 1 2 3 4 5 6 7 | public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false; } |
?5.PROPAGATE 狀態存在的意義
AQS 的節點有幾種不同的狀態,這個在 4.1 節介紹過。在這幾個狀態中,PROPAGATE 的用途可能是最不好理解的。網上包括一些書籍關于該狀態的敘述基本都是一句帶過,也就是 PROPAGATE 字面意義,即向后傳播喚醒動作。至于怎么傳播,鮮有資料說明過。不過,好在最終我還是找到了一篇詳細敘述了 PROPAGATE 狀態的文章。在博客園上,博友?活在夢裡?在他的文章?AbstractQueuedSynchronizer源碼解讀?對 PROPAGATE,以及其他的一些細節進行了說明,很有深度。在欽佩之余,不由得感嘆作者思考的很深入。在征得他的同意后,我將在本節中引用他文章中對 PROPAGATE 狀態說明的部分,并進行一定的補充說明。這里感謝作者?活在夢裡?的精彩分享,若不參考他的文章,我的這篇文章內容會比較空洞。好了,其他的不多說了,繼續往下分析。
在本節中,將會說明兩個個問題,如下:
這兩個問題將會在下面兩節中分別進行說明。
?5.1 利用 PROPAGATE 傳播喚醒動作
PROPAGATE 狀態是用來傳播喚醒動作的,那么它是在哪里進行傳播的呢?答案是在setHeadAndPropagate方法中,這里再來看看 setHeadAndPropagate 方法的實現:
| 1 2 3 4 5 6 7 8 9 10 11 | private void setHeadAndPropagate(Node node, int propagate) {Node h = head;setHead(node);if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();} } |
大家注意看 setHeadAndPropagate 方法中那個長長的判斷語句,其中有一個條件是h.waitStatus < 0,當 h.waitStatus = SIGNAL(-1) 或 PROPAGATE(-3) 是,這個條件就會成立。那么 PROPAGATE 狀態是在何時被設置的呢?答案是在doReleaseShared方法中,如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {...}// 如果 ws = 0,則將 h 狀態設為 PROPAGATEelse if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}...} } |
再回到 setHeadAndPropagate 的實現,該方法既然引入了h.waitStatus < 0這個條件,就意味著僅靠條件propagate > 0判斷是否喚醒后繼節點線程的機制是不充分的。至于為啥不充分,請繼續往看下看。
?5.2 引入 PROPAGATE 所解決的問題
PROPAGATE 的引入是為了解決一個 BUG –?JDK-6801020,復現這個 BUG 的代碼如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 | import java.util.concurrent.Semaphore;public class TestSemaphore {private static Semaphore sem = new Semaphore(0);private static class Thread1 extends Thread {@Overridepublic void run() {sem.acquireUninterruptibly();}}private static class Thread2 extends Thread {@Overridepublic void run() {sem.release();}}public static void main(String[] args) throws InterruptedException {for (int i = 0; i < 10000000; i++) {Thread t1 = new Thread1();Thread t2 = new Thread1();Thread t3 = new Thread2();Thread t4 = new Thread2();t1.start();t2.start();t3.start();t4.start();t1.join();t2.join();t3.join();t4.join();System.out.println(i);}} } |
根據 BUG 的描述消息可知 JDK 6u11,6u17 兩個版本受到影響。那么,接下來再來看看引起這個 BUG 的代碼 –?JDK 6u17?中 setHeadAndPropagate 和 releaseShared 兩個方法源碼,如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | private void setHeadAndPropagate(Node node, int propagate) {setHead(node);if (propagate > 0 && node.waitStatus != 0) {/** Don't bother fully figuring out successor. If it* looks null, call unparkSuccessor anyway to be safe.*/Node s = node.next;if (s == null || s.isShared())unparkSuccessor(node);} }// 和 release 方法的源碼基本一樣 public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false; } |
下面來簡單說明 TestSemaphore 這個類的邏輯。這個類持有一個數值為 0 的信號量對象,并創建了4個線程,線程 t1 和 t2 用于獲取信號量,t3 和 t4 則是調用 release() 方法釋放信號量。在一般情況下,TestSemaphore 這個類的代碼都可以正常執行。但當有極端情況出現時,可能會導致同步隊列掛掉。這里演繹一下這個極端情況,考慮某次循環時,隊列結構如下:
下面再來看一下修復 BUG 后的代碼,根據 BUG 詳情頁顯示,該 BUG 在 JDK 1.7 中被修復。這里找一個 JDK 7 較早版本(JDK 7u10)的代碼看一下,如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node);if (propagate > 0 || h == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();} }public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false; }private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;} } |
在按照上面的代碼演繹一下邏輯,如下:
到這里關于狀態 PROPAGATE 的內容就講完了。最后,簡單總結一下本章開頭提的兩個問題。
問題一:PROPAGATE 狀態用在哪里,以及怎樣向后傳播喚醒動作的?
答:PROPAGATE 狀態用在 setHeadAndPropagate。當頭節點狀態被設為 PROPAGATE 后,后繼節點成為新的頭結點后。若?propagate > 0?條件不成立,則根據條件h.waitStatus < 0成立與否,來決定是否喚醒后繼節點,即向后傳播喚醒動作。
問題二:引入 PROPAGATE 狀態是為了解決什么問題?
答:引入 PROPAGATE 狀態是為了解決并發釋放信號量所導致部分請求信號量的線程無法被喚醒的問題。
聲明:
本章內容是在博友?活在夢裡?的文章?AbstractQueuedSynchronizer源碼解讀?基礎上,進行了一定的補充說明。本章所參考的觀點已經過原作者同意,為避免抄襲嫌疑,特此聲明。
?6.總結
到這里,本文就差不多結束了。如果大家從頭看到尾,到這里就可以放松一下了。寫到這里,我也可以放松一下了。這篇文章總共花費了我12天的空閑時間,確實不容易。本來我只打算講一下基本原理,但知道后來看到本文多次推薦的那篇文章。那篇文章給我的第一感覺是,作者很厲害。第二感覺是,我也要寫出一篇較為深入的 AQS 分析文章。雖然寫出來也不能怎么樣,水平也不會因此提高多少,也不會去造個類似的輪子。但是寫完后,確實感覺很有成就感。本文的最后,來說一下如何學習 AQS 原理。AQS 的大致原理不是很難理解,所以一開始不建議糾結細節,應該先弄懂它的大致原理。在此基礎上,再去分析一些細節,分析細節時,要從多線程的角度去考慮。比如,有點地方 CAS 失敗后要重試,有的不用重試。總體來說 AQS 的大致原理容易理解,細節部分比較復雜。很多細節要在腦子里演繹一遍,好好思考才能想通,有點燒腦。另外因為文章篇幅的問題,關于 AQS ConditionObject 部分的分析將會放在下一篇文章中進行。
最后,再向 AQS 的作者 Doug Lea 致以崇高的敬意。僅盡力弄懂 AQS 的原理都很難了,可想而知,實現 AQS 的難度有多大。
限于本人的能力,加之深入分析 AQS 本身就比較有難度。所以文中難免會有錯誤出現,如果不慎翻車,請見諒。也歡迎在評論區指明這些錯誤,感謝。
?參考
- AbstractQueuedSynchronizer源碼解讀 - 活在夢裡
- 《Java并發編程的藝術》 - 方騰飛 / 魏鵬 / 程曉明
- 本文鏈接:?https://www.tianxiaobo.com/2018/05/01/AbstractQueuedSynchronizer-原理分析-獨占-共享模式/
from:?http://www.tianxiaobo.com/2018/05/01/AbstractQueuedSynchronizer-%E5%8E%9F%E7%90%86%E5%88%86%E6%9E%90-%E7%8B%AC%E5%8D%A0-%E5%85%B1%E4%BA%AB%E6%A8%A1%E5%BC%8F/?
總結
以上是生活随笔為你收集整理的AbstractQueuedSynchronizer 原理分析 - 独占/共享模式的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 基于 Java NIO 实现简单的 HT
- 下一篇: AbstractQueuedSynchr