聊聊高并发(二十四)解析java.util.concurrent各个组件(六) 深入理解AQS(四)
最近整體過了下AQS的結構,也在網上看了一些講AQS的文章,大部分的文章都是泛泛而談。重新看了下AQS的代碼,把一些新的要點拿出來說一說。
?
AQS是一個管程,提供了一個基本的同步器的能力,包含了一個狀態,修改狀態的原子操作,以及同步線程的一系列操作。它是CLHLock的變種,CLHLock是一個基于隊列鎖的自旋鎖算法。AQS也采用了隊列來作為同步線程的結構,它維護了兩個隊列,一個是作為線程同步的同步隊列,另一個是基于Unsafe來進行阻塞/喚醒操作的條件隊列。所以理解隊列操作是理解AQS的關鍵。
1. 理解 head, tail引用
2. 理解 next, prev引用
3. 理解隊列節點何時入隊,何時出隊
?
關于head引用,需要記住的是
1.?head引用始終指向獲得了鎖的節點,它不會被取消。acquire操作成功就表示獲得了鎖,acquire過程中如果中斷,那么acquire就失敗了,這時候head就會指向下一個節點。
?
?* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
而獲得了鎖的之后,如果線程中斷了,那么就需要release來釋放head節點。如果線程中斷了不釋放鎖,就有可能造成問題。所以使用顯式鎖時,必須要在finally里面釋放鎖
?
?Lock lock = new ReentrantLock();
lock.lock();
try{
// 如果中斷,可以處理獲得拋出,要保證在finally里面釋放鎖
}finally{
lock.unlock();
}
再來看看獲得鎖時對head引用的處理,只有節點的前驅節點是head時,它才有可能獲得鎖,而獲得鎖之后,要把自己設置為head節點,同時把老的head的next設置為null。
這里有幾層含義:
1. 始終從head節點開始獲得鎖
2. 新的線程獲得鎖之后,之前獲得鎖的節點從隊列中出隊
3. 一旦獲得了鎖,acquire方法肯定返回,這個過程中不會被中斷
?
?final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
?
關于tail引用,它負責無鎖地實現一個鏈式結構,采用CAS + 輪詢的方式。節點的入隊操作都是在tail節點
?
?private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
?
next引用在隊列中扮演了很重要的作用,它出現的頻率很高。關于next引用,它有幾種值的情況
1. next = null
2. next指向非null的下一個節點
3. next = 節點自己
?
next = null的情況有三種
1. 隊尾節點,隊尾節點的next沒有顯式地設置,所以為null
2. 隊尾節點入隊列時的上一個隊尾節點next節點有可能為null,因為enq不是原子操作,CAS之后是復合操作
?
?private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
// 這個期間next可能為null
?t.next = node;
return t;
}
}
}
}
3. 獲取鎖時,之前獲取鎖的節點的next設置為null
?
?if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
?
next指向非null的下一個節點,這種情況就是正常的在同步隊列中等待的節點,入隊操作時設置了前一個節點的next值,這樣可以在釋放鎖時,通知下一個節點來獲取鎖
?
?private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
next指向自己,這個是取消操作時,會把節點的前一個節點指向它的后一個節點,最后把next域設置為自己
?
?private void cancelAcquire(Node node) {
??????? // Ignore if node doesn't exist
??????? if (node == null)
??????????? return;
??????? node.thread = null;
??????? // Skip cancelled predecessors
??????? Node pred = node.prev;
??????? while (pred.waitStatus > 0)
??????????? node.prev = pred = pred.prev;
??????? // predNext is the apparent node to unsplice. CASes below will
??????? // fail if not, in which case, we lost race vs another cancel
??????? // or signal, so no further action is necessary.
??????? Node predNext = pred.next;
??????? // Can use unconditional write instead of CAS here.
??????? // After this atomic step, other Nodes can skip past us.
??????? // Before, we are free of interference from other threads.
??????? node.waitStatus = Node.CANCELLED;
??????? // If we are the tail, remove ourselves.
??????? if (node == tail && compareAndSetTail(node, pred)) {
??????????? compareAndSetNext(pred, predNext, null);
??????? } else {
??????????? // If successor needs signal, try to set pred's next-link
??????????? // so it will get one. Otherwise wake it up to propagate.
??????????? int ws;
??????????? if (pred != head &&
??????????????? ((ws = pred.waitStatus) == Node.SIGNAL ||
???????????????? (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
??????????????? pred.thread != null) {
??????????????? Node next = node.next;
??????????????? if (next != null && next.waitStatus <= 0)
??????????????????? compareAndSetNext(pred, predNext, next);
??????????? } else {
??????????????? unparkSuccessor(node);
??????????? }
??????????? node.next = node; // help GC
??????? }
??? }
?
prev引用比較簡單,它主要是維護鏈表結構。CLHLock是在前一個節點的狀態自旋,AQS里面的節點不是在前一個狀態等待,而是釋放的時候由前一個節點通知隊列來查找下一個要被喚醒的節點。
?
最后說說節點進入隊列和出隊列的情況。
?
節點入隊列只有一種情況,那就是它的tryAcquire操作失敗,沒有獲得鎖,就進入同步隊列等待,如果tryAcquire成功了,就不需要進入同步隊列等待了。AQS提供了充分的靈活性,它提供了tryAcquire和tryRelase方法給子類擴展,基類負責維護隊列操作,子類可以自己決定是否要進入隊列。
所以實際子類擴展的時候有兩種類型,一種是公平的同步器,一種是非公平的同步器。這里需要注意的是,所謂的非公平,不是說不使用隊列來維護阻塞操作,而是說在獲取競爭時,不考慮先來的線程,后來的線程可以直接競爭資源。非公平和公平的同步器競爭失敗后,都需要進入AQS的同步隊列進行等待,而同步隊列是先來先服務的公平的隊列。
?
?static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
/**
* Fair version
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
出隊列有兩種情況,
1. 后一個線程獲得鎖是,head引用指向當前獲得鎖的線程,前一個獲得鎖的節點自動出隊列
2. 取消操作時,節點出隊列,取消只有兩種情況,一種是線程被中斷,還有一種是等待超時
總結
以上是生活随笔為你收集整理的聊聊高并发(二十四)解析java.util.concurrent各个组件(六) 深入理解AQS(四)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 聊聊高并发(二十三)解析java.uti
- 下一篇: 聊聊高并发(二十五)解析java.uti