Java高并发系列 — AQS
只懂volatile和CAS是不是可以無視concurrent包了呢,發(fā)現(xiàn)一個好鏈接,繼續(xù)死磕,第一日;
首先,我承認很多時候要去看源碼才能更好搞懂一些事,但如果站在巨人肩膀上呢?有了大概思想源碼看還是挺容易的,關(guān)鍵是要調(diào)試一次,特別是類似CountDownLatch這樣的。
另外,沒有完全理解我是不會記錄一個字的,寫這個目的就是為了快速復(fù)習(xí)而已;不開玩笑,這個AQS我多年前也看過一次,不用也全忘了。
最后,文本是在自己理解下做的另一番解析,和原文不同。
介紹一副肩膀
https://www.cnblogs.com/waterystone/p/4920797.html
首先是AQS的整個構(gòu)造,我喜歡圖,上圖:
就是上圖的構(gòu)造,結(jié)合volatile(上圖的volatile int state),CAS(各種如compareAndSetTail()方法),for(;;)自旋,整個concurrent包的靈魂就出來了。
初認識AQS只需要知道用它分獲取和釋放兩種操作,每個操作分為獨占和共享,也就是acqurie、release、acquireShared、releaseshare這四個方法,可以多個子類同步器,如CyclicBarrire
記錄文:
獨占鎖acquire(int)
還有得到個好東西,一圖明確acquire()方法,如下圖:
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}?
附帶acquire()的總結(jié):
這里有個有趣的問題就是在加入等待隊列之后,線程是否應(yīng)該進入wait狀態(tài)【圖中的park(),其實是用Unsafe.unpark實現(xiàn)的。unpark使得線程不需要獲取鎖進入wait狀態(tài)】,加入隊尾后,利用acquireQueued休息:
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;//標記是否成功拿到資源try {boolean interrupted = false;//標記等待過程中是否被中斷過//又是一個“自旋”!這里自旋什么時候會跳出呢?看return可知,只有在獲取到了鎖會退出,如果獲取不到,那么就會被unpark把線程帶到wait狀態(tài)。//當unpark喚醒后會繼續(xù)自旋看自己是否是老二,一般情況下就是了,然后退出循環(huán),返回中斷狀態(tài),用作補償處理,比較wai是不響應(yīng)中斷的for (;;) {final Node p = node.predecessor();//拿到前驅(qū)//如果前驅(qū)是head,即該結(jié)點已成老二,那么便有資格去嘗試獲取資源(可能是老大釋放完資源喚醒自己的,當然也可能被interrupt了)。if (p == head && tryAcquire(arg)) {setHead(node);//拿到資源后,將head指向該結(jié)點。所以head所指的標桿結(jié)點,就是當前獲取到資源的那個結(jié)點或null。p.next = null; // setHead中node.prev已置為null,此處再將head.next置為null,就是為了方便GC回收以前的head結(jié)點。也就意味著之前拿完資源的結(jié)點出隊了!failed = false;return interrupted;//返回等待過程中是否被中斷過 }//如果自己可以休息了,就進入waiting狀態(tài),直到被unpark()if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;//如果等待過程中被中斷過,哪怕只有那么一次,就將interrupted標記為true }} finally {if (failed)cancelAcquire(node);} }
? acquireQueued方法是如何找到安全點的呢,找安全點其實就是找到一個前驅(qū)節(jié)點,該節(jié)點的waitStatus == Node.SIGNAL,而這個整型值為-1,這個標志的Node遵守一個協(xié)議就是該節(jié)點在釋放鎖的時候會喚醒下一個需要喚醒的節(jié)點(不一定是后繼節(jié)點);如果找不到,就會在自旋中變?yōu)橹苯訝幦℃i,結(jié)合上面和下面代碼:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;//拿到前驅(qū)的狀態(tài)if (ws == Node.SIGNAL)//如果已經(jīng)告訴前驅(qū)拿完號后通知自己一下,那就可以安心休息了return true;if (ws > 0) {/** 如果前驅(qū)放棄了,那就一直往前找,直到找到最近一個正常等待的狀態(tài),并排在它的后邊。* 注意:那些放棄的結(jié)點,由于被自己“加塞”到它們前邊,它們相當于形成一個無引用鏈,稍后就會被保安大叔趕走了(GC回收)!*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {//如果前驅(qū)正常,那就把前驅(qū)的狀態(tài)設(shè)置成SIGNAL,告訴它拿完號后通知自己一下。有可能失敗,人家說不定剛剛釋放完呢! compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false; }?
? 介紹下waitStatus的狀態(tài)取:
1、如果pred的waitStatus == 0,則通過CAS指令修改waitStatus為Node.SIGNAL。2、如果pred的waitStatus > 0,表明pred的線程狀態(tài)CANCELLED,需從隊列中刪除。
3、如果pred的waitStatus為Node.SIGNAL,則通過LockSupport.park()方法把線程A掛起,并等待被喚醒。 /** waitStatus value to indicate thread has cancelled */static final int CANCELLED = 1;/** waitStatus value to indicate successor's thread needs unparking */static final int SIGNAL = -1;/** waitStatus value to indicate thread is waiting on condition */static final int CONDITION = -2;/*** waitStatus value to indicate the next acquireShared should* unconditionally propagate*/static final int PROPAGATE = -3;
?
我們得知,如果waitStatus為-1那么就可以讓線程upark進入等待狀態(tài)(同thread.wait()),那么CLH是如何辦到喚醒的呢?答案在realease()方法上。
獨占鎖釋放release(int):
該方法的目的是用unpark()喚醒等待隊列中最前邊的那個未放棄線程,問題是所有的線程要么被中斷放棄,要么在等待被喚醒,那么該如何喚醒?就是用到waitStatus,首先嘗試釋放首節(jié)點,如果釋放失敗,那么就從tail開始,一直往前找,如果找到就Node s變量用記下來,找到多個就把最后的覆蓋前面的,那么遍歷到頭節(jié)點后,自然s記錄的就是最前的需要喚醒的節(jié)點了。結(jié)合代碼就很好看懂:
private void unparkSuccessor(Node node) {//這里,node一般為當前線程所在的結(jié)點。int ws = node.waitStatus;if (ws < 0)//置零當前線程所在的結(jié)點狀態(tài),允許失敗,使得找節(jié)點只需找<0的,可以跳過頭節(jié)點。compareAndSetWaitStatus(node, ws, 0);Node s = node.next;//找到下一個需要喚醒的結(jié)點s,先從頭結(jié)點開始,不對就從尾部開始往前找到最前的if (s == null || s.waitStatus > 0) {//如果為空或已取消s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)//從這里可以看出,<=0的結(jié)點,都是還有效的結(jié)點。s = t;}if (s != null)LockSupport.unpark(s.thread);//喚醒 }
?
從以上代碼可以看出,喚醒這個操作是當前線程做的,由獲取鎖的線程去喚醒下一個線程;這里只需要LocakSupport.unpark,當被喚醒的線程成功后,就會設(shè)置自己為head,然后退出代碼塊,具體見acquireQueue方法注釋。
共享獲取acquireShared()
這個其實和acquire差不多,只不過獲取的status是多個,而且獲取失敗后直接繼續(xù)喚醒后調(diào)用doAcquireShare
public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)doAcquireShared(arg);}而doAcquireShare方法和acquireQueued方法是差不多的,具體看如下代碼注釋:
/*** Acquires in shared uninterruptible mode.* @param arg the acquire argument*/private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED);//首先保證入隊boolean failed = true;try {boolean interrupted = false; //追蹤中斷狀態(tài)for (;;) {final Node p = node.predecessor();if (p == head) { //如果是頭結(jié)點那么久嘗試獲取資源,然后根據(jù)資源情況決定是否喚醒后面線程int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r); //r>0,喚醒后面線程p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}//這里也是找安全點if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}再看看該方法是如何繼續(xù)喚醒后面線程的,這里注意,喚醒線程有兩種情況,第一種是頭結(jié)點獲取到鎖后還有資源所以喚醒后面的一起共享,第二種情況是獲得鎖的線程釋放資源的時候去喚醒后面的節(jié)點,如下代碼所示,doReleaseShared方法就是用作喚醒線程的。
另外,在眾多代碼中我們都看到interrupted的標記位,可以看出,等待過程是不響應(yīng)中斷的,只會在后期補上中斷響應(yīng),而中斷響應(yīng)是線程自身決定的。
private void setHeadAndPropagate(Node node, int propagate) {Node h = head; setHead(node);//head指向自己//如果還有剩余量,繼續(xù)喚醒下一個鄰居線程if (propagate > 0 || h == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();} }釋放共享releaseShare()
該方法同樣簡單,嘗試釋放資源,否則喚醒后面線程,這里我們同樣看到了doReleaseShared方法
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {//嘗試釋放資源doReleaseShared();//喚醒后繼結(jié)點return true;}return false; }代碼:
這里又用到了unparkSuccessor方法,首先判斷頭結(jié)點的waitStates是否為SIGNAL標志,是就設(shè)置為0,代表已經(jīng)不再需要資源,然后自旋(for ;;)調(diào)用unparkSusseor,直到后繼線程獲取成功,后繼線程獲取成功會被喚醒,喚醒后后繼線程第一件事就是在acquireQueue中自旋內(nèi)部把自己設(shè)置為頭結(jié)點,從而導(dǎo)致head引用(注意,head引用是各個線程共用的)發(fā)生變化,這樣一來,這個釋放的線程節(jié)點就能退出循環(huán),代表資源釋放完畢。
private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;unparkSuccessor(h);//喚醒后繼 }else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;}if (h == head)// head發(fā)生變化,當喚醒成功后,老二結(jié)點會自動替換head為自己,所以這里就不再相等,釋放成功break;} }head發(fā)生變化,當喚醒成功后,老二結(jié)點會自動替換head為自己,所以這里就不再相等,釋放成功,請參看appendQueue代碼注釋、
?
Thread狀態(tài)轉(zhuǎn)換
另外因為復(fù)習(xí)該知識點通常要知道一下線程的狀態(tài)和之間的關(guān)系,所以再貼一張圖,由此特別說明一下,unpark方法是不加鎖進入waiting的唯一方法。轉(zhuǎn)載于:https://www.cnblogs.com/iCanhua/p/8965604.html
總結(jié)
以上是生活随笔為你收集整理的Java高并发系列 — AQS的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。