java并发编程之AbstractQueuedSynchronizer
引言
AbstractQueuedSynchronizer,隊(duì)列同步器,簡稱AQS,它是java并發(fā)用來構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架。
一般使用AQS的主要方式是繼承,子類通過實(shí)現(xiàn)它提供的抽象方法來管理同步狀態(tài),主要管理的方式是通過tryAcquire和tryRelease類似的方法來操作狀態(tài),同時(shí),AQS提供以下線程安全的方法來對狀態(tài)進(jìn)行操作:
protected final int getState(); protected final void setState(int newState); protected final boolean compareAndSetState(int expect, int update);AQS本身是沒有實(shí)現(xiàn)任何同步接口的,它僅僅只是定義了同步狀態(tài)的獲取和釋放的方法來供自定義的同步組件的使用。
注:AQS主要是怎么使用的呢?
在java的同步組件中,AQS的子類一般是同步組件的靜態(tài)內(nèi)部類。
AQS是實(shí)現(xiàn)同步組件的關(guān)鍵,它倆的關(guān)系可以這樣描述:同步組件是面向使用者的,它定義了使用者與組件交互的接口,隱藏了具體的實(shí)現(xiàn)細(xì)節(jié);而AQS面向的是同步組件的實(shí)現(xiàn)者,它簡化了具體的實(shí)現(xiàn)方式,屏蔽了線程切換相關(guān)底層操作,它們倆一起很好的對使用者和實(shí)現(xiàn)者所關(guān)注的領(lǐng)域做了一個(gè)隔離。
AQS實(shí)現(xiàn)分析
接下來將從實(shí)現(xiàn)的角度來具體分析AQS是如何來完成線程同步的。
同步隊(duì)列分析
AQS的實(shí)現(xiàn)依賴內(nèi)部的同步隊(duì)列(FIFO雙向隊(duì)列)來完成同步狀態(tài)的管理,假如當(dāng)前線程獲取同步狀態(tài)失敗,AQS會將該線程以及等待狀態(tài)等信息構(gòu)造成一個(gè)Node,并將其加入同步隊(duì)列,同時(shí)阻塞當(dāng)前線程。當(dāng)同步狀態(tài)釋放時(shí),喚醒隊(duì)列的首節(jié)點(diǎn)。
- Node
Node主要包含以下成員變量:
- prev:前驅(qū)節(jié)點(diǎn);
- next:后繼節(jié)點(diǎn);
- thread:進(jìn)入隊(duì)列的當(dāng)前線程;
- nextWaiter:存儲condition隊(duì)列中的后繼節(jié)點(diǎn)。
Node是sync隊(duì)列和condition隊(duì)列構(gòu)建的基礎(chǔ),AQS擁有三個(gè)成員變量:
AQS成員變量對于鎖的獲取,請求形成節(jié)點(diǎn)將其掛在隊(duì)列尾部,至于資源的轉(zhuǎn)移,是從頭到尾進(jìn)行,隊(duì)列的基本結(jié)構(gòu)就出來了:
AQS同步隊(duì)列結(jié)構(gòu)同步隊(duì)列插入/刪除節(jié)點(diǎn)
節(jié)點(diǎn)插入
AQS提供基于CAS的設(shè)置尾節(jié)點(diǎn)的方法:
CAS設(shè)置尾節(jié)點(diǎn)
需要傳遞當(dāng)前線程認(rèn)為的尾節(jié)點(diǎn)和當(dāng)前節(jié)點(diǎn),設(shè)置成功后,當(dāng)前節(jié)點(diǎn)與尾節(jié)點(diǎn)建立關(guān)聯(lián)。
同步隊(duì)列插入節(jié)點(diǎn)節(jié)點(diǎn)刪除
同步隊(duì)列刪除節(jié)點(diǎn)
同步隊(duì)列遵循FIFO,首節(jié)點(diǎn)是獲取同步狀態(tài)成功的節(jié)點(diǎn),首節(jié)點(diǎn)的線程在釋放同步狀態(tài)之后將會喚醒后繼節(jié)點(diǎn),后繼節(jié)點(diǎn)將會在獲取同步狀態(tài)成功的時(shí)候?qū)⒆约涸O(shè)置為首節(jié)點(diǎn)。
注:設(shè)置首節(jié)點(diǎn)是由獲取同步狀態(tài)成功的線程來完成,因?yàn)槊看沃粫幸粋€(gè)線程能夠成功的獲取到同步狀態(tài),所以,設(shè)置首節(jié)點(diǎn)并不需要CAS來保證。
AQS源碼解析
AQS提供以下接口以供實(shí)現(xiàn)自定義同步器:
獨(dú)占式獲取同步狀態(tài),該方法的實(shí)現(xiàn)需要先查詢當(dāng)前的同步狀態(tài)是否可以獲取,如果可以獲取再進(jìn)行獲取;
釋放狀態(tài);
共享式獲取同步狀態(tài);
共享式釋放狀態(tài);
獨(dú)占模式下,判斷同步狀態(tài)是否已經(jīng)被占用。
使用者可以根據(jù)實(shí)際情況使用這些接口自定義同步組件。
AQS提供兩種方式來操作同步狀態(tài),獨(dú)占式與共享式,下面就針對性做一下源碼分析。
獨(dú)占式同步狀態(tài)獲取 - acquire實(shí)現(xiàn)
獨(dú)占式同步狀態(tài)獲取具體執(zhí)行流程如下:
下面我們具體來看一下節(jié)點(diǎn)的構(gòu)造以及加入同步隊(duì)列部分的代碼實(shí)現(xiàn)。
- addWaiter實(shí)現(xiàn)
- enq實(shí)現(xiàn)
enq的邏輯可以確保Node可以有順序的添加到同步隊(duì)列中,具體的加入隊(duì)列的邏輯如下:
可以看出,整個(gè)enq方法通過“死循環(huán)”來保證節(jié)點(diǎn)的正確插入。
進(jìn)入同步隊(duì)列之后接下來就是同步狀態(tài)的獲取了,或者說是訪問控制acquireQueued。對于同步隊(duì)列中的線程,在同一時(shí)刻只能由隊(duì)列首節(jié)點(diǎn)獲取同步狀態(tài),其他的線程進(jìn)入等待,直到符合條件才能繼續(xù)進(jìn)行。
- acquireQueued實(shí)現(xiàn)
在整個(gè)方法中,當(dāng)前線程一直都在“死循環(huán)”中嘗試獲取同步狀態(tài):
節(jié)點(diǎn)自旋獲取同步狀態(tài)從代碼的邏輯也可以看出,其實(shí)在節(jié)點(diǎn)與節(jié)點(diǎn)之間在循環(huán)檢查的過程中是不會相互通信的,僅僅只是判斷自己當(dāng)前的前驅(qū)是不是頭結(jié)點(diǎn),這樣設(shè)計(jì)使得節(jié)點(diǎn)的釋放符合FIFO,同時(shí)也避免了過早通知。
注:過早通知是指前驅(qū)節(jié)點(diǎn)不是頭結(jié)點(diǎn)的線程由于中斷被喚醒。
acquire實(shí)現(xiàn)總結(jié)
- 同步狀態(tài)維護(hù):
對同步狀態(tài)的操作是原子、非阻塞的,通過AQS提供的對狀態(tài)訪問的方法來對同步狀態(tài)進(jìn)行操作,并且利用CAS來確保原子操作; - 狀態(tài)獲取:
一旦線程成功的修改了同步狀態(tài),那么該線程會被設(shè)置為同步隊(duì)列的頭節(jié)點(diǎn); - 同步隊(duì)列維護(hù):
不符合獲取同步狀態(tài)的線程會進(jìn)入等待狀態(tài),直到符合條件被喚醒再開始執(zhí)行。
整個(gè)執(zhí)行流程如下:
acquire流程圖
當(dāng)前線程獲取同步狀態(tài)并執(zhí)行了相應(yīng)的邏輯之后,就需要釋放同步狀態(tài),讓后續(xù)節(jié)點(diǎn)可以獲取到同步狀態(tài),調(diào)用方法release(int arg)方法可以釋放同步狀態(tài)。
獨(dú)占式同步狀態(tài)釋放 - release實(shí)現(xiàn)
- unparkSuccessor實(shí)現(xiàn)
取出當(dāng)前節(jié)點(diǎn)的next節(jié)點(diǎn),將該節(jié)點(diǎn)線程喚醒,被喚醒的線程獲取同步狀態(tài)。這里主要通過LockSupport的unpark方法喚醒線程。
共享式同步狀態(tài)獲取
共享式獲取與獨(dú)占式獲取最主要的區(qū)別就是在同一時(shí)刻能否有多個(gè)線程可以同時(shí)獲取到同步狀態(tài)。這兩種不同的方式在獲取資源區(qū)別如下圖所示:
AQS提供acquireShared方法來支持共享式獲取同步狀態(tài)。
- acquireShared實(shí)現(xiàn)
tryAcquireShared方法返回值 > 0時(shí),表示能夠獲取到同步狀態(tài);
- doAcquireShared實(shí)現(xiàn)
與獨(dú)占式獲取同步狀態(tài)一樣,共享式獲取也是需要釋放同步狀態(tài)的,AQS提供releaseShared(int arg)方法可以釋放同步狀態(tài)。
共享式同步狀態(tài)釋放 - releaseShared實(shí)現(xiàn)
releaseShared實(shí)現(xiàn)獨(dú)占式超時(shí)獲取 - doAcquireNanos
該方法提供了超時(shí)獲取同步狀態(tài)調(diào)用,假如在指定的時(shí)間段內(nèi)可以獲取到同步狀態(tài)返回true,否則返回false。它是acquireInterruptibly(int arg)的增強(qiáng)版。
acquireInterruptibly實(shí)現(xiàn)
acquireInterruptibly實(shí)現(xiàn)
該方法提供了獲取同步狀態(tài)的能力,同樣,在無法獲取同步狀態(tài)時(shí)會進(jìn)入同步隊(duì)列,這類似于acquire的功能,但是它和acquire還是區(qū)別的:acquireInterruptibly可以在外界對當(dāng)前線程進(jìn)行中斷的時(shí)候可以提前獲取到同步狀態(tài)的操作,換個(gè)通俗易懂的解釋吧:類似于synchronized獲取鎖時(shí),這時(shí)候外界對當(dāng)前線程中斷了,線程獲取鎖的這個(gè)操作能夠及時(shí)響應(yīng)中斷并且提前返回。- 判斷當(dāng)前線程是否被中斷,如果已經(jīng)被中斷,拋出InterruptedException異常并將中斷標(biāo)志位置為false;
- 獲取同步狀態(tài),獲取成功并返回,獲取不成功調(diào)用doAcquireInterruptibly(int arg)排隊(duì)等待。
doAcquireInterruptibly實(shí)現(xiàn)
doAcquireInterruptibly實(shí)現(xiàn)- 構(gòu)造節(jié)點(diǎn)Node,加入同步隊(duì)列;
- 假如當(dāng)前節(jié)點(diǎn)是首節(jié)點(diǎn)并且可以獲取到同步狀態(tài),將當(dāng)前節(jié)點(diǎn)設(shè)置為頭結(jié)點(diǎn),其他節(jié)點(diǎn)自旋等待;
- 節(jié)點(diǎn)每次被喚醒的時(shí)候,需要進(jìn)行中斷檢測,假如當(dāng)前線程被中斷,拋出異常InterruptedException,退出循環(huán)。
doAcquireNanos實(shí)現(xiàn)
doAcquireNanos實(shí)現(xiàn)
該方法在支持中斷響應(yīng)的基礎(chǔ)上,增加了超時(shí)獲取的特性。針對超時(shí)獲取,主要在于計(jì)算出需要睡眠的時(shí)間間隔nanosTimeout,如果nanosTimeout > 0表示當(dāng)前線程還需要睡眠,反之返回false。- nanosTimeout <= 0,表明當(dāng)前線程不需要睡眠,返回false,不能獲取到同步狀態(tài);
- 不滿足條件的線程加入同步隊(duì)列;
- 假如當(dāng)前節(jié)點(diǎn)是首節(jié)點(diǎn),并且可以獲取到同步狀態(tài),將當(dāng)前節(jié)點(diǎn)設(shè)置為頭結(jié)點(diǎn)并退出,返回true,表明在指定的時(shí)間內(nèi)可以獲取到同步狀態(tài);
- 不滿足條件3的線程,計(jì)算出當(dāng)前休眠時(shí)間,nanosTimeout = 原有nanosTimeout + deadline(睡眠之前記錄的時(shí)間)- now(System.nanoTime():當(dāng)前時(shí)間):
- 如果nanosTimeout <= 0,返回超時(shí)未獲取到同步狀態(tài);
- 如果nanosTimeout > 0 && nanosTimeout <= 1000L,線程快速自旋
注:為什么不直接進(jìn)入超時(shí)等待呢?原因在于非常短的超時(shí)等待是無法做到十分精確的,如果這時(shí)候再進(jìn)入超時(shí)等待會讓nanosTimeout的超時(shí)從整體上表現(xiàn)的不精確,所以,在超時(shí)非常短的情況下,AQS都會無條件進(jìn)入快速自旋;
- 如果nanosTimeout > 1000L,線程通過LockSupport.parkNanos進(jìn)入超時(shí)等待。
整個(gè)流程可以總結(jié)如下圖所示:
doAcquireNanos流程圖
后記
在上述對AQS進(jìn)行了實(shí)現(xiàn)層面的分析之后,我們就一個(gè)案例來加深對AQS的理解。
案例:設(shè)計(jì)一個(gè)AQS同步器,該工具在同一時(shí)刻,只能有兩個(gè)線程能夠訪問,其他的線程阻塞。
設(shè)計(jì)分析:針對上述案例,我們可以這樣定義AQS,設(shè)定一個(gè)初始狀態(tài)為2,每一個(gè)線程獲取一次就-1,正確的狀態(tài)為:0,1,2,0表示新的線程獲取同步狀態(tài)時(shí)阻塞。由于資源數(shù)量大與1,需要實(shí)現(xiàn)tryAcquireShared和tryReleaseShared方法。
代碼實(shí)現(xiàn):
public class LockInstance implements Lock {private final Sync sync = new Sync(2);private static final class Sync extends AbstractQueuedSynchronizer { Sync(int state) {if (state <= 0) {throw new IllegalArgumentException("count must large than 0");}setState(state);}public int tryAcquireShared(int arg) {for (;;) {System.out.println("try acquire....");int current = getState();int now = current - arg;if (now < 0 || compareAndSetState(current, now)) {return now;}}}public boolean tryReleaseShared(int arg) {for(;;) {System.out.println("try release....");int current = getState();int now = current + arg;if (compareAndSetState(current, now)) {return true;}}}}public void lock() {sync.acquireShared(1);}public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}public boolean tryLock() {return sync.tryAcquireShared(1) >= 0;}public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(time));}public void unlock() {sync.tryReleaseShared(1);}public Condition newCondition() {return null;} }作者:miaoLoveCode
鏈接:https://www.jianshu.com/p/df0d7d6571de
來源:簡書
著作權(quán)歸作者所有。商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請注明出處。
總結(jié)
以上是生活随笔為你收集整理的java并发编程之AbstractQueuedSynchronizer的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: AbstractQueuedSynchr
- 下一篇: 深入分析AbstractQueuedSy