AQS独占式同步队列入队与出队
入隊
Node
AQS同步隊列和等待隊列共用同一種節點結構Node,與同步隊列相關的屬性如下。
- prev 前驅結點
- next 后繼節點
- thread 入隊的線程
- 入隊節點的狀態?
?
acquire?
模板方法,失敗后構造節點、入隊、自旋。需要關注的是如果if條件滿足會執行selfInterrupt,這個后面分析。
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}?
構造節點并入隊
- 建立節點的構造方法并沒有設置waitSatus,由于waitStatus是int類型的,在沒有初始化的情況下就是0,所以默認新建的Node其waitStatus就是0。
- 如果當前隊列的tail不為Null,代表隊列已經初始化,那么就自旋CAS加入隊列。這里使用CAS來保證并發安全下的安全性,每次CAS之前都會再次取出當前的tail。
- 如果當前隊列的tail為Null,代表隊列沒有初始化,調用CAS創建Head。這里也用了CAS,同樣為了保證并發安全性,且創建完成后隊列的head=tail,而且會繼續下一次循環,說明隊列里有一個冗余節點(dummy head)
?
acquireQueued
如果當前節點的頭結點是head,且獲得鎖成功,把當前節點設置為head并返回。
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}? 如果前繼不是頭結點,或者前繼是頭結點但是獲得鎖失敗
首先判斷是否需要park自己 (shouldParkAfterFailedAcquire)
- 如果前繼節點的waitStatus是-1,返回true
- 如果前繼節點大于零,說明前繼節點已經被Cancelled,跳過該前繼節點一直往前找知道找到一個waitStatus<=0的節點,直到找到一個<=0的,然后設置成-1,返回false
- 返回true的會被park進入WAITING狀態,返回false的將會再次嘗試獲得一次鎖
?
?不響應中斷情況下對中斷的處理
上面分析的是acquire而非acquireInterruptibly,即一個線程在獲取鎖時如果被中斷是不響應的。從底層往上追溯一下AQS對中斷的處理。
- 首先一個線程被prak的線程被中斷后會返回且不拋出異常,所以該線程會從parkAndCheckInterrupt方法返回,且返回為true
- 接下來來到了acquireQueued的if判斷,在if判斷成立后interrupt被設置成true,默認情況下interrupt是false if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
?
- ?接著他會一如既往的嘗試獲取鎖,如果失敗會繼續park自己。是否park自己只與前繼節點的waitStatus有關,與自己的Interrupt沒有任何關系。
- 當該線程獲取鎖會從acquireQueued中返回,返回的值就是在acquireQueued中的第二個if里被修改的interrupt,也就是true
- 然后回到了最初的起點,獲得鎖的線程會從acquireQueued中返回。因為acquireQueued為true所有會執行selfInterrupt。但是由于此時線程的狀態已經是RUNNING狀態,所以該interrupt并不會對該線程造成任何影響,產生的結果就像對一個處于RUNNING的線程執行interrupt一樣,就是沒有影響,除了改變了中斷狀態標志位以外。
? 測試代碼如下:主線程先獲取鎖,然后再開啟子線程并嘗試獲取鎖,此時打印出子線程中斷標志位,然后主線程釋放鎖后,觀察子線程是否繼續執行。
public static void main(String[] args) throws InterruptedException {final ReentrantLock lock = new ReentrantLock();Thread thread = new Thread(new Runnable() {@Overridepublic void run() {lock.lock();System.out.println("我來了");}});lock.lock();Thread.sleep(1000);thread.start();thread.interrupt();System.out.println(thread.isInterrupted());lock.unlock();}? 最終的結果和我的分析一樣,在主線程將獲得鎖的情況下中斷子線程,子線程沒有任何反應,但子線程的中斷標志位是true。并且在主線程釋放鎖后,子線程獲得鎖就可繼續執行。
?
?響應中斷情況下對中斷的處理
大體邏輯是相同的,不同的是如果一個線程在隊列里被中斷的話,會拋出異常并退出嘗試獲得鎖。
public final void acquireInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (!tryAcquire(arg))doAcquireInterruptibly(arg);} private void doAcquireInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}?
?支持超時、中斷
- 方法聲明里拋出了一個異常,以為這個方法可以相應中斷
- 在每次獲得鎖失敗都會檢查是否還有剩余時間,如果有才會park自己,否則返回
?
?
?
出隊
- 調用被重寫的tryRelease方法釋放鎖。之所以這里加上if判斷,是為了解決鎖重入的情況下,必須把所持有的所有重入的鎖都釋放才可以喚醒后繼節點
- 判斷是否需要喚醒后繼節點,h.waitStatus!=0其實就是<0,即后面是否有等待的節點
- 喚醒后繼節點
?
- 清除等待標志位。只用了一個cas操作而非循環cas,也就意味著清除等待標志位是允許失敗的
- 找到下一個需要被喚醒的節點
- 喚醒節點
?
? 在喚醒節點的時候,如果當前節點的直接后繼不存在或者已經退出等待,那么會從隊列里從tail節點開始再次尋找一個需要被喚醒的節點。當喚醒一個pre不是head的節點時,doAcquireInterruptibly中的if(p==head)判斷不通過,進入shouldParkAfterFailedAcquire方法,在該方法中會清楚掉s節點之前的失效節點并把s的pre設置為head,由于shouldParkAfterFailedAcquire返回false會再次嘗試獲取鎖。由于s的pre已經是head,所以此時s節點可以獲取鎖。
?
轉載于:https://www.cnblogs.com/AshOfTime/p/10877666.html
總結
以上是生活随笔為你收集整理的AQS独占式同步队列入队与出队的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 利用PyMySQL模块操作数据库
- 下一篇: Spring Boot 2 快速教程:W