aqs java 简书,Java并发之AQS原理
一.總體框架
AQS是指AbstractQueuedSynchronizer。它是一個抽象類,java并發包里的ReentrantLock、CountDownLatch和Semaphroe等重要的工具類都是基于AQS來實現的。
總體來說,AQS維護了一個volatile的state變量代表共享資源,還有一個FIFO的等待隊列,在多線程爭奪資源被阻塞時會進入此隊列了。等待隊列是個雙向鏈表記錄則沒有獲取的執行許可的線程。等待隊列中的結點元素是AQS自定義的static的內部類Node。AQS支持共享和獨占兩種模式。ReentrantLock就是獨占型的,只有一個線程可以獲得到鎖并執行。CountDownLatch和Semaphore就是共享型,允許多個線程同時執行。
AQS是一個抽象類,并不能被直接實例化使用。它的作用是提供等待隊列的管理,包括如何入隊何時喚醒等。而具體的資源如何獲取和釋放等由具體的自定義同步器來實現。也就是說ReentrantLock等類自定義了資源(state)的獲取和釋放,而使用AQS的來管理阻塞隊列。不同的自定義資源獲取方式實現了CountDownLatch和Semaphore等類。
自定義同步方法需要實現的方法有:
isHeldExclusively() //返回該線程是否正在獨占資源,只有用的condition才需要去實現它
tryAcquire(int); //獨占方式,嘗試獲取資源,成功返回true,失敗返回false
tryRelease(int); //獨占方式,嘗試釋放資源,成功返回true,失敗返回false
tryAcquireShared(int); //共享方法,嘗試獲取資源。返回負數表示失敗,0表示成功,但沒有可用資源了,正數表示成功且有剩余資源
tryReleaseShared(int);//共享方式。嘗試釋放資源。如果釋放后運行喚醒后續結點返回true,否則返回false
這其中tryAcquire和tryRelease是一組,用于實現獨占資源的情況,如ReentrantLock;tryAcquireShared和tryReleaseShared是一組用于實現共享資源的情況,如CountDownLatch。
二.源碼分析
2.1 acquire方法源碼詳解
在AQS中一個重要的方法是acquire(int),這個方法實現請求資源和阻塞線程的功能。下面先貼一下它的源碼:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
這個方法里只有一個if語句。首先執行tryAcquire(int)方法,前面說了這個方法需要子類來自定義,這里先看一下它的代碼:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
可以看到在AQS中tryAcquire方法直接拋出了異常。因為具體的獲取資源細節需要子類根據自己要實現的功能來寫,AQS只負責阻塞隊列的管理等工作。同時注意到這個方法并不是一個抽象的方法。其實前面說的需要子類實現的5個方法都不是抽象的,因為子類并不一定需要實現所有這些方法,這提供了一定的靈活性。
2.1.1 addWaiter方法詳解
先忙接著看acquire方法。在if語句里,如果tryAcquire返回true,那么acquire就返回了,說明成功獲取到了資源。如果tryAcquire返回false,if語句的前半句判斷就成立了,需要繼續執行&&右邊的acquireQueued方法,執行它之前先執行了addWaiter。先看一下addWaiter它的代碼:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
可以看到這個方法有一個Node參數。Node類便是aqs維護的FIFO隊列中的元素的類型。回顧一下acquire()方法的代碼,是將Node.EXCLUSIVE作為參數傳入了addWaiter。查看Node源碼發現有這么一句:
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
原來這是一個null值,用來表示獨占性線程。不管如何,先繼續看addWaiter的源碼吧。
第一句代碼:Node node = new Node(Thread.currentThread,mode);新建了一個表示當前線程的結點。剛才傳入的null作為模式傳給構造方法。進入對象構造方法查看:
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
繼續看addWaiter的后續代碼,發現是獲取了當前隊列的尾節點,并將新建結點的prev指針執行尾節點,再使用cas嘗試替換尾節點,如果成功,那么當前結點就成為新的尾節點,返回。
如果cas失敗或者當前tail為null,調用eng方法處理。下面看一下eng的代碼:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
熟悉AtomicInteger的朋友看到這段代碼一定會感動非常熟悉。這里就是使用了循環嘗試的方式來進行cas操作,指導成功為止。另外當tail==null時,先新建head結點再進行操作,當前這里給head變量反之也是使用了cas操作。
綜上,addWaiter()進行的操作就是安全地更新隊列的tail指針。
2.1.2 acquireQueue方法詳解
下面繼續看acquire()方法。再把代碼貼一次。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
給acqureQueued方法傳入的第一個參數是addWaiter方法的返回值,回想一下剛才的addWaiter方法,發現它的返回值是新創建的表示當前線程的Node結點。acquireQueued方法的另一個參數是acquire的形參arg,這個一般是獲取資源的個數,像ReentrantLock的lock方法就是調用了acquire(1)。下面看一下acquireQueued方法的源碼吧:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
這個方法的主體是一個死循環,不斷測試兩件事:1.是否是頭結點的下一個節點,說明該輪到自己獲取資源了。2:是否可以休息了。判斷1成功后就用tryAcquire獲取資源,成功后設置當前結點為頭結點,返回。如果1判斷不成功則執行shouldParkAfterFailedAcquire方法,先貼一下代碼:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
這個方法的工作是找到當前結點之前的一個未取消的結點,將其waitStatue改為SIGNAL(-1)。這樣在該結點釋放資源時就會喚醒當前結點。
當shouldParkAfterFailedAcquire返回true之后,當前線程就可以去休息了——調用parkAndCheckInterrupt方法:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
這個方法使用了LockSupport的park方法,使線程進入waiting狀態。當其它線程調用unPark方法,或此線程被中斷后才會返回。
2.1.3小結
下面來總結一下acquire方法。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
先嘗試獲取資源,獲取到的情況直接返回。獲取不到將線程加入隊列:首先將tail指向表示當前線程的結點,使用CAS操作更新tail。之后執行acquireQueued方法,如果是當前隊列的第二個則再次嘗試獲取tryAcquire,成功后將自己設置為head(head表示已經獲取到的資源的結點)。不能獲取資源時判斷是否可以park(),判斷依據是其prev的結點的waitState是否是signal,即是否會在釋放資源時通知它。之后當前線程調用park進入waiting狀態。waitting結束時返回是否中斷標志,并重置標志。回到acquire,如果waitting期間中斷過,則調用selfInterrupt響應中斷。
2.2 release(int)
此方法是獨占模式下釋放資源的頂層方法。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
這里可以看出釋放資源成功時,獲取到head結點(因為head結點表示的線程就是當前獲取到資源的線程),執行unparkSuccessor()操作。這里便和shouldParkAfterFailedAcquire中‘休息’的代碼相呼應。如果那里設置了waitStatus為signal就會使用LockSupport.unpark方法來喚醒等待的線程。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 如果后繼結點為null或等待狀態>0(當前結點被取消),則從后往前找到正在應該被喚醒的結點
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
對了,tryRelease方法也是具體的同步器來實現的。
2.3 其它方法
acquireShared(int)和releaseShared()方法是共享模式下獲取資源和釋放資源的方法。這里不再詳細展開了,請看參考資料里的文章。
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的aqs java 简书,Java并发之AQS原理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 4 条生产线受损,苹果数据线供应商 Fo
- 下一篇: 苹果获得新扩展现实头显专利:前置摄像头系