AbstractQueuedSynchronizer 源码分析
概述
Java的內置鎖一直都是備受爭議的,在JDK 1.6之前,synchronized這個重量級鎖其性能一直都是較為低下,雖然在1.6后,進行大量的鎖優化策略,但是與Lock相比synchronized還是存在一些缺陷的:雖然synchronized提供了便捷性的隱式獲取鎖釋放鎖機制(基于JVM機制),但是它卻缺少了獲取鎖與釋放鎖的可操作性,可中斷、超時獲取鎖,且它為獨占式在高并發場景下性能大打折扣。
AQS,AbstractQueuedSynchronizer,即隊列同步器。它是構建鎖或者其他同步組件的基礎框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),JUC并發包的作者(Doug Lea)期望它能夠成為實現大部分同步需求的基礎。它是JUC并發包中的核心基礎組件。
AQS解決了子類實現同步器時涉及當的大量細節問題,例如獲取同步狀態、FIFO同步隊列。基于AQS來構建同步器可以帶來很多好處。它不僅能夠極大地減少實現工作,而且也不必處理在多個位置上發生的競爭問題。
AQS的主要使用方式是繼承,子類通過繼承同步器并實現它的抽象方法來管理同步狀態。
AQS使用一個int類型的成員變量state來表示同步狀態,當state>0時表示已經獲取了鎖,當state = 0時表示釋放了鎖。它提供了三個方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))來對同步狀態state進行操作,當然AQS可以確保對state的操作是安全的。
AQS通過內置的FIFO同步隊列來完成資源獲取線程的排隊工作,如果當前線程獲取同步狀態失敗(鎖)時,AQS則會將當前線程以及等待狀態等信息構造成一個節點(Node)并將其加入同步隊列,同時會阻塞當前線程,當同步狀態釋放時,則會把節點中的線程喚醒,使其再次嘗試獲取同步狀態。
AQS可以實現獨占鎖和共享鎖,RenntrantLock實現的是獨占鎖,ReentrantReadWriteLock實現的是獨占鎖和共享鎖,CountDownLatch實現的是共享鎖。
下面我們通過源碼來分析下AQS的實現原理
AbstractQueuedSynchronizer類結構
public abstract class AbstractQueuedSynchronizerextends AbstractOwnableSynchronizerimplements java.io.Serializable {protected AbstractQueuedSynchronizer() { }//同步器隊列頭結點private transient volatile Node head;//同步器隊列尾結點private transient volatile Node tail;//同步狀態(打的那個state為0時,無鎖,當state>0時說明有鎖。)private volatile int state;//獲取鎖狀態protected final int getState() {return state;}//設置鎖狀態protected final void setState(int newState) {state = newState;}......通過AQS的類結構我們可以看到它內部有一個隊列和一個state的int變量。
隊列:通過一個雙向鏈表實現的隊列來存儲等待獲取鎖的線程。
state:鎖的狀態。
head、tail和state 都是volatile類型的變量,volatile可以保證多線程的內存可見性。
同步隊列的基本結構如下:
同步器隊列Node元素的類結構如下:
static final class Node {static final Node SHARED = new Node();static final Node EXCLUSIVE = null;//表示當前的線程被取消;static final int CANCELLED = 1;//表示當前節點的后繼節點包含的線程需要運行,也就是unpark;static final int SIGNAL = -1;//表示當前節點在等待condition,也就是在condition隊列中;static final int CONDITION = -2;//表示當前場景下后續的acquireShared能夠得以執行;static final int PROPAGATE = -3;//表示節點的狀態。默認為0,表示當前節點在sync隊列中,等待著獲取鎖。//其它幾個狀態為:CANCELLED、SIGNAL、CONDITION、PROPAGATEvolatile int waitStatus;//前驅節點volatile Node prev;//后繼節點volatile Node next;//獲取鎖的線程volatile Thread thread;//存儲condition隊列中的后繼節點。Node nextWaiter;...... }從Node結構prev和next節點可以看出它是一個雙向鏈表,waitStatus存儲了當前線程的狀態信息
waitStatus
1. CANCELLED,值為1,表示當前的線程被取消;
2. SIGNAL,值為-1,表示當前節點的后繼節點包含的線程需要運行,也就是unpark;
3. CONDITION,值為-2,表示當前節點在等待condition,也就是在condition隊列中;
4. PROPAGATE,值為-3,表示當前場景下后續的acquireShared能夠得以執行;
5. 值為0,表示當前節點在sync隊列中,等待著獲取鎖。
下面我們通過以下五個方面來介紹AQS是怎么實現的鎖的獲取和釋放的
1. 獨占式獲得鎖
2. 獨占式釋放鎖
3. 共享式獲得鎖
4. 共享式釋放鎖
5.獨占超時獲得鎖
1.獨占式獲得鎖
acquire方法代碼如下:
public final void acquire(int arg) {//嘗試獲得鎖,獲取不到則加入到隊列中等待獲取if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}addWaiter方法代碼如下:
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;//將該節點添加到隊列尾部if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}//如果前驅節點為null,則進入enq方法通過自旋方式入隊列enq(node);return node; }將構造的同步節點加入到同步隊列中
enq方法代碼如下:
private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initialize//如果隊列為空,則通過CAS把當前Node設置成頭節點if (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;//如果隊列不為空,則向隊列尾部添加Nodeif (compareAndSetTail(t, node)) {t.next = node;return t;}}}}該方法使用CAS自旋的方式來保證向隊列中添加Node(同步節點簡寫Node)
acquireQueued方法代碼如下:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //找到當前節點的前驅節點final Node p = node.predecessor(); //檢測p是否為頭節點,如果是,再次調用tryAcquire方法 if (p == head && tryAcquire(arg)) { //如果p節點是頭節點且tryAcquire方法返回true。那么將當前節點設置為頭節點。setHead(node); p.next = null; // help GC failed = false; return interrupted; } //如果p節點不是頭節點,或者tryAcquire返回false,說明請求失敗。 //那么首先需要判斷請求失敗后node節點是否應該被阻塞,如果應該 //被阻塞,那么阻塞node節點,并檢測中斷狀態。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //如果有中斷,設置中斷狀態。 interrupted = true; } } finally { if (failed) //最后檢測一下如果請求失敗(異常退出),取消請求。 cancelAcquire(node); } }在acquireQueued方法中,當前線程通過自旋的方式來嘗試獲取同步狀態,
1. 如果當前節點的前驅節點頭節點才能嘗試獲得鎖,如果獲得成功,則把當前線程設置成頭結點,把之前的頭結點從隊列中移除,等待垃圾回收(沒有對象引用)
2. 如果獲取鎖失敗則進入shouldParkAfterFailedAcquire方法中檢測當前節點是否可以被安全的掛起(阻塞),如果可以安全掛起則進入parkAndCheckInterrupt方法,把當前線程掛起,并檢查剛線程是否執行了interrupted方法。
通過上面的代碼我們可以發現AQS內部的同步隊列是FIFO的方式存取的。節點自旋獲取同步狀態的行為如下圖所示
shouldParkAfterFailedAcquire方法代碼如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {//獲得前驅節點狀態int ws = pred.waitStatus;if (ws == Node.SIGNAL)//如果前驅節點狀態為SIGNAL,當前線程則可以阻塞。return true;if (ws > 0) {do {//判斷如果前驅節點狀態為CANCELLED,那就一直往前找,直到找到最近一個正常等待的狀態node.prev = pred = pred.prev;} while (pred.waitStatus > 0);//并將當前Node排在它的后邊。pred.next = node;} else {//如果前驅節點正常,則修改前驅節點狀態為SIGNALcompareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}節點的狀態如下表:
| CANCELLED | 1 | 等待超時或者中斷,需要從同步隊列中取消 |
| SIGNAL | -1 | 后繼節點出于等待狀態,當前節點釋放鎖后將會喚醒后繼節點 |
| CONDITION | -2 | 節點在等待隊列中,節點線程等待在Condition上,其它線程對Condition調用signal()方法后,該節點將會從等待同步隊列中移到同步隊列中,然后等待獲取鎖。 |
| PROPAGATE | -3 | 表示下一次共享式同步狀態獲取將會無條件地傳播下去 |
| INITIAL | 0 | 初始狀態 |
1. 首先獲取前驅節點的狀態ws
2. 如果ws為SIGNAL則表示可以被前驅節點喚醒,當前線程就可以掛起,等待前驅節點喚醒,返回true(可以掛起)
3. 如果ws>0說明,前驅節點取消了,并循環查找此前驅節點之前所有連續取消的節點。并返回false(不能掛起)。
4. 嘗試將當前節點的前驅節點的等待狀態設為SIGNAL
parkAndCheckInterrupt方法代碼如下:
private final boolean parkAndCheckInterrupt() {//阻塞當前線程LockSupport.park(this);//判斷是否中斷來喚醒的return Thread.interrupted(); }2. 獨占式釋放鎖
release方法代碼如下:
public final boolean release(int arg) {//嘗試釋放鎖if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)//喚醒后繼節點unparkSuccessor(h);return true;}return false;}tryRelease(int arg) 方法應該由實現AQS的子類來實現具體的邏輯。
1. 首先通過tryRelease方法釋放鎖如果釋放鎖成功,執行第2步。
2. 通過調用unparkSuccessor() 方法來喚醒頭結點的后繼節點。該方法內部是通過LockSupport.unpark(s.thread);來喚醒后繼節點的。
3. 共享式獲得鎖
acquireShared方法代碼如下:
public final void acquireShared(int arg) {//嘗試獲取的鎖,如果獲取失敗執行doAcquireShared方法。if (tryAcquireShared(arg) < 0)doAcquireShared(arg); }tryAcquireShared()嘗試獲取鎖,如果獲取失敗則通過doAcquireShared()進入等待隊列,直到獲取到資源為止才返回。
這里tryAcquireShared()需要自定義同步器去實現。
AQS中規定:負值代表獲取失敗,非負數標識獲取成功。
doAcquireShared方法代碼如下:
private void doAcquireShared(int arg) {//構建共享Nodefinal Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {//獲取前驅節點final Node p = node.predecessor();//如果是頭節點進行嘗試獲得鎖if (p == head) {//如果返回值大于等于0,則說明獲得鎖int r = tryAcquireShared(arg);if (r >= 0) {//當前節點設置為隊列頭,并setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);} }在acquireQueued方法中,當前線程也通過自旋的方式來嘗試獲取同步狀態,同獨享式獲得鎖一樣
setHeadAndPropagate方法代碼如下:
private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node);//如果propagate >0,說明共享鎖還有可以進行獲得鎖,繼續喚醒下一個節點if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();}}設置當前節點為頭結點,并調用了doReleaseShared()方法,acquireShared方法最終調用了release方法,得看下為什么。原因其實也很簡單,shared模式下是允許多個線程持有一把鎖的,其中tryAcquire的返回值標志了是否允許其他線程繼續進入。如果允許的話,需要喚醒隊列中等待的線程。其中doReleaseShared方法的邏輯很簡單,就是喚醒后繼線程。
因此acquireShared的主要邏輯就是嘗試加鎖,如果允許其他線程繼續加鎖,那么喚醒后繼線程,如果失敗,那么入隊阻塞等待。
4. 共享式釋放鎖
releaseShared方法代碼如下:
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false; }tryReleaseShared(int arg) 方法應該由實現AQS的子類來實現具體的邏輯。
doReleaseShared方法代碼如下:
private void doReleaseShared() {for (;;) {// 獲取隊列的頭節點Node h = head;// 如果頭節點不為null,并且頭節點不等于tail節點。if (h != null && h != tail) {// 獲取頭節點對應的線程的狀態int ws = h.waitStatus;// 如果頭節點對應的線程是SIGNAL狀態,則意味著“頭節點的下一個節點所對應的線程”需要被unpark喚醒。if (ws == Node.SIGNAL) {// 設置“頭節點對應的線程狀態”為空狀態。失敗的話,則繼續循環。if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;// 喚醒“頭節點的下一個節點所對應的線程”。unparkSuccessor(h);}// 如果頭節點對應的線程是空狀態,則設置“尾節點對應的線程所擁有的共享鎖”為其它線程獲取鎖的空狀態。else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}// 如果頭節點發生變化,則繼續循環。否則,退出循環。if (h == head) // loop if head changedbreak;} }該方法主要是喚醒后繼節點。對于能夠支持多個線程同時訪問的并發組件(比如Semaphore),它和獨占式主要區別在于tryReleaseShared(int arg)方法必須確保同步狀態(或者資源數)線程安全釋放,一般是通過循環和CAS來保證的,因為釋放同步狀態的操作會同時來自多個線程。
5. 獨占超時獲得鎖
doAcquireNanos方法代碼如下:
private boolean doAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {if (nanosTimeout <= 0L)return false;//計算出超時時間點final long deadline = System.nanoTime() + nanosTimeout;final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return true;}//計算剩余超時時間,超時時間點deadline減去當前時間點System.nanoTime()得到還應該睡眠的時間nanosTimeout = deadline - System.nanoTime();//如果超時,返回false,獲取鎖失敗if (nanosTimeout <= 0L)return false;//判斷是否需要阻塞當前線程//如果需要,在判斷當前剩余納秒數是否大于1000if (shouldParkAfterFailedAcquire(p, node) &&nanosTimeout > spinForTimeoutThreshold)//阻塞 nanosTimeout納秒數LockSupport.parkNanos(this, nanosTimeout);if (Thread.interrupted())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);} }該方法在自旋過程中,當節點的前驅節點為頭節點時嘗試獲取同步狀態,如果獲取成功則從該方法返回,這個過程和獨占式同步獲取的過程類似,但是在同步狀態獲取失敗的處理上有所不同。如果當前線程獲取同步狀態失敗,則首先重新計算超時間隔nanosTimeout,則判斷是否超時(nanosTimeout小于等于0表示已經超時),如果沒有超時,則使當前線程等待nanosTimeout納秒(當已到設置的超時時間,該線程會從LockSupport.parkNanos(Object blocker,long nanos)方法返回)。
如果nanosTimeout小于等于spinForTimeoutThreshold(1000納秒)時,將不會使該線程進行
超時等待,而是進入快速的自旋過程。原因在于,非常短的超時等待無法做到十分精確,如果
這時再進行超時等待,相反會讓nanosTimeout的超時從整體上表現得反而不精確。因此,在超
時非常短的場景下,同步器會進入無條件的快速自旋。
本人簡書blog地址:http://www.jianshu.com/u/1f0067e24ff8????
點擊這里快速進入簡書
總結
以上是生活随笔為你收集整理的AbstractQueuedSynchronizer 源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 并发编程基础知识点
- 下一篇: CountDownLatch 源码分析