AQS的原理及应用
前言
Java中的大部分同步類(Lock、Semaphore、ReentrantLock等)都是基于AbstractQueuedSynchronizer(簡稱為AQS)實現的。AQS是一種提供了原子式管理同步狀態、阻塞和喚醒線程功能以及隊列模型的簡單框架。本文會從應用層逐漸深入到原理層,并通過ReentrantLock的基本特性和ReentrantLock與AQS的關聯,來深入解讀AQS相關獨占鎖的知識點,同時采取問答的模式來幫助大家理解AQS。由于篇幅原因,本篇文章主要闡述AQS中獨占鎖的邏輯和Sync Queue,不講述包含共享鎖和Condition Queue的部分(本篇文章核心為AQS原理剖析,只是簡單介紹了ReentrantLock,感興趣同學可以閱讀一下ReentrantLock的源碼)。
下面列出本篇文章的大綱和思路,以便于大家更好地理解:
1 ReentrantLock
1.1 ReentrantLock特性概覽
ReentrantLock意思為可重入鎖,指的是一個線程能夠對一個臨界資源重復加鎖。為了幫助大家更好地理解ReentrantLock的特性,我們先將ReentrantLock跟常用的Synchronized進行比較,其特性如下(藍色部分為本篇文章主要剖析的點):
下面通過偽代碼,進行更加直觀的比較:
//?**************************Synchronized的使用方式************************** //?1.用于代碼塊 synchronized?(this)?{} //?2.用于對象 synchronized?(object)?{} //?3.用于方法 public?synchronized?void?test?()?{} //?4.可重入 for?(int?i?=?0;?i?<?100;?i++)?{synchronized?(this)?{} } //?**************************ReentrantLock的使用方式************************** public?void?test?()?throw?Exception?{//?1.初始化選擇公平鎖、非公平鎖ReentrantLock?lock?=?new?ReentrantLock(true);//?2.可用于代碼塊lock.lock();try?{try?{//?3.支持多種加鎖方式,比較靈活;?具有可重入特性if(lock.tryLock(100,?TimeUnit.MILLISECONDS)){?}}?finally?{//?4.手動釋放鎖lock.unlock()}}?finally?{lock.unlock();} }1.2 ReentrantLock與AQS的關聯
通過上文我們已經了解,ReentrantLock支持公平鎖和非公平鎖(關于公平鎖和非公平鎖的原理分析,可參考《不可不說的Java“鎖”事》),并且ReentrantLock的底層就是由AQS來實現的。那么ReentrantLock是如何通過公平鎖和非公平鎖與AQS關聯起來呢?我們著重從這兩者的加鎖過程來理解一下它們與AQS之間的關系(加鎖過程中與AQS的關聯比較明顯,解鎖流程后續會介紹)。
非公平鎖源碼中的加鎖流程如下:
//?java.util.concurrent.locks.ReentrantLock#NonfairSync//?非公平鎖 static?final?class?NonfairSync?extends?Sync?{...final?void?lock()?{if?(compareAndSetState(0,?1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}... }這塊代碼的含義為:
-
若通過CAS設置變量State(同步狀態)成功,也就是獲取鎖成功,則將當前線程設置為獨占線程。
-
若通過CAS設置變量State(同步狀態)失敗,也就是獲取鎖失敗,則進入Acquire方法進行后續處理。
第一步很好理解,但第二步獲取鎖失敗后,后續的處理策略是怎么樣的呢?這塊可能會有以下思考:
-
某個線程獲取鎖失敗的后續流程是什么呢?有以下兩種可能:
-
將當前線程獲鎖結果設置為失敗,獲取鎖流程結束。這種設計會極大降低系統的并發度,并不滿足我們實際的需求。所以就需要下面這種流程,也就是AQS框架的處理流程。
-
存在某種排隊等候機制,線程繼續等待,仍然保留獲取鎖的可能,獲取鎖流程仍在繼續。
-
-
對于問題1的第二種情況,既然說到了排隊等候機制,那么就一定會有某種隊列形成,這樣的隊列是什么數據結構呢?
-
處于排隊等候機制中的線程,什么時候可以有機會獲取鎖呢?
-
如果處于排隊等候機制中的線程一直無法獲取鎖,還是需要一直等待嗎,還是有別的策略來解決這一問題?
帶著非公平鎖的這些問題,再看下公平鎖源碼中獲鎖的方式:
//?java.util.concurrent.locks.ReentrantLock#FairSyncstatic?final?class?FairSync?extends?Sync?{...??final?void?lock()?{acquire(1);}... }看到這塊代碼,我們可能會存在這種疑問:Lock函數通過Acquire方法進行加鎖,但是具體是如何加鎖的呢?
結合公平鎖和非公平鎖的加鎖流程,雖然流程上有一定的不同,但是都調用了Acquire方法,而Acquire方法是FairSync和UnfairSync的父類AQS中的核心方法。
對于上邊提到的問題,其實在ReentrantLock類源碼中都無法解答,而這些問題的答案,都是位于Acquire方法所在的類AbstractQueuedSynchronizer中,也就是本文的核心——AQS。下面我們會對AQS以及ReentrantLock和AQS的關聯做詳細介紹(相關問題答案會在2.3.5小節中解答)。
2 AQS
首先,我們通過下面的架構圖來整體了解一下AQS框架:
-
上圖中有顏色的為Method,無顏色的為Attribution。
-
總的來說,AQS框架共分為五層,自上而下由淺入深,從AQS對外暴露的API到底層基礎數據。
-
當有自定義同步器接入時,只需重寫第一層所需要的部分方法即可,不需要關注底層具體的實現流程。當自定義同步器進行加鎖或者解鎖操作時,先經過第一層的API進入AQS內部方法,然后經過第二層進行鎖的獲取,接著對于獲取鎖失敗的流程,進入第三層和第四層的等待隊列處理,而這些處理方式均依賴于第五層的基礎數據提供層。
下面我們會從整體到細節,從流程到方法逐一剖析AQS框架,主要分析過程如下:
2.1 原理概覽
AQS核心思想是,如果被請求的共享資源空閑,那么就將當前請求資源的線程設置為有效的工作線程,將共享資源設置為鎖定狀態;如果共享資源被占用,就需要一定的阻塞等待喚醒機制來保證鎖分配。這個機制主要用的是CLH隊列的變體實現的,將暫時獲取不到鎖的線程加入到隊列中。
CLH:Craig、Landin and Hagersten隊列,是單向鏈表,AQS中的隊列是CLH變體的虛擬雙向隊列(FIFO),AQS是通過將每條請求共享資源的線程封裝成一個節點來實現鎖的分配。
主要原理圖如下:
AQS使用一個Volatile的int類型的成員變量來表示同步狀態,通過內置的FIFO隊列來完成資源獲取的排隊工作,通過CAS完成對State值的修改。
2.1.1 AQS數據結構
先來看下AQS中最基本的數據結構——Node,Node即為上面CLH變體隊列中的節點。
解釋一下幾個方法和屬性值的含義:
| waitStatus | 當前節點在隊列中的狀態 |
| prev | 前驅指針 |
| next | 后繼指針 |
| thread | 表示處于該節點的線程 |
| nextWaiter | 指向下一個處于CONDITION狀態的節點(由于本篇文章不講述Condition Queue隊列,這個指針不多介紹) |
| predecessor | 返回前驅節點,沒有的話拋出npe |
線程兩種鎖的模式:
| SHARED | 表示線程以共享的模式等待鎖 |
| EXCLUSIVE | 表示線程正在以獨占的方式等待鎖 |
waitStatus有下面幾個枚舉值:
| CANCELLED | 為1,表示線程獲取鎖的請求已經取消了 |
| SIGNAL | 為-1,表示線程已經準備好了,就等資源釋放了 |
| CONDITION | 為-2,表示節點在等待隊列中,節點線程等待喚醒 |
| PROPAGATE | 為-3,當前線程處在SHARED情況下,該字段才會使用 |
| 0 | 當一個Node被初始化的時候的默認值 |
2.1.2 同步狀態State
在了解數據結構后,接下來了解一下AQS的同步狀態——State。AQS中維護了一個名為state的字段,意為同步狀態,是由Volatile修飾的,用于展示當前臨界資源的獲鎖情況。
//?java.util.concurrent.locks.AbstractQueuedSynchronizerprivate?volatile?int?state;下面提供了幾個訪問這個字段的方法:
| protected final int getState() | 獲取State的值 |
| protected final void setState(int newState) | 設置State的值 |
| protected final boolean compareAndSetState(int expect, int update) | 使用CAS方式更新State |
這幾個方法都是Final修飾的,說明子類中無法重寫它們。我們可以通過修改State字段表示的同步狀態來實現多線程的獨占模式和共享模式(加鎖過程)。
對于我們自定義的同步工具,需要自定義獲取同步狀態和釋放狀態的方式,也就是AQS架構圖中的第一層:API層。
2.2 AQS重要方法與ReentrantLock的關聯
從架構圖中可以得知,AQS提供了大量用于自定義同步器實現的Protected方法。自定義同步器實現的相關方法也只是為了通過修改State字段來實現多線程的獨占模式或者共享模式。自定義同步器需要實現以下方法(ReentrantLock需要實現的方法如下,并不是全部):
| protected boolean isHeldExclusively() | 該線程是否正在獨占資源。只有用到Condition才需要去實現它。 |
| protected boolean tryAcquire(int arg) | 獨占方式。arg為獲取鎖的次數,嘗試獲取資源,成功則返回True,失敗則返回False。 |
| protected boolean tryRelease(int arg) | 獨占方式。arg為釋放鎖的次數,嘗試釋放資源,成功則返回True,失敗則返回False。 |
| protected int tryAcquireShared(int arg) | 共享方式。arg為獲取鎖的次數,嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩余可用資源;正數表示成功,且有剩余資源。 |
| protected boolean tryReleaseShared(int arg) | 共享方式。arg為釋放鎖的次數,嘗試釋放資源,如果釋放后允許喚醒后續等待結點返回True,否則返回False。 |
一般來說,自定義同步器要么是獨占方式,要么是共享方式,它們也只需實現tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種即可。AQS也支持自定義同步器同時實現獨占和共享兩種方式,如ReentrantReadWriteLock。ReentrantLock是獨占鎖,所以實現了tryAcquire-tryRelease。
以非公平鎖為例,這里主要闡述一下非公平鎖與AQS之間方法的關聯之處,具體每一處核心方法的作用會在文章后面詳細進行闡述。
為了幫助大家理解ReentrantLock和AQS之間方法的交互過程,以非公平鎖為例,我們將加鎖和解鎖的交互流程單獨拎出來強調一下,以便于對后續內容的理解。
加鎖:
-
通過ReentrantLock的加鎖方法Lock進行加鎖操作。
-
會調用到內部類Sync的Lock方法,由于Sync#lock是抽象方法,根據ReentrantLock初始化選擇的公平鎖和非公平鎖,執行相關內部類的Lock方法,本質上都會執行AQS的Acquire方法。
-
AQS的Acquire方法會執行tryAcquire方法,但是由于tryAcquire需要自定義同步器實現,因此執行了ReentrantLock中的tryAcquire方法,由于ReentrantLock是通過公平鎖和非公平鎖內部類實現的tryAcquire方法,因此會根據鎖類型不同,執行不同的tryAcquire。
-
tryAcquire是獲取鎖邏輯,獲取失敗后,會執行框架AQS的后續邏輯,跟ReentrantLock自定義同步器無關。
?
解鎖:
-
通過ReentrantLock的解鎖方法Unlock進行解鎖。
-
Unlock會調用內部類Sync的Release方法,該方法繼承于AQS。
-
Release中會調用tryRelease方法,tryRelease需要自定義同步器實現,tryRelease只在ReentrantLock中的Sync實現,因此可以看出,釋放鎖的過程,并不區分是否為公平鎖。
-
釋放成功后,所有處理由AQS框架完成,與自定義同步器無關。
通過上面的描述,大概可以總結出ReentrantLock加鎖解鎖時API層核心方法的映射關系。
2.3 通過ReentrantLock理解AQS
ReentrantLock中公平鎖和非公平鎖在底層是相同的,這里以非公平鎖為例進行分析。
在非公平鎖中,有一段這樣的代碼:
//?java.util.concurrent.locks.ReentrantLockstatic?final?class?NonfairSync?extends?Sync?{...final?void?lock()?{if?(compareAndSetState(0,?1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}... }看一下這個Acquire是怎么寫的:
//?java.util.concurrent.locks.AbstractQueuedSynchronizerpublic?final?void?acquire(int?arg)?{if?(!tryAcquire(arg)?&&?acquireQueued(addWaiter(Node.EXCLUSIVE),?arg))selfInterrupt(); }再看一下tryAcquire方法:
//?java.util.concurrent.locks.AbstractQueuedSynchronizerprotected?boolean?tryAcquire(int?arg)?{throw?new?UnsupportedOperationException(); }可以看出,這里只是AQS的簡單實現,具體獲取鎖的實現方法是由各自的公平鎖和非公平鎖單獨實現的(以ReentrantLock為例)。如果該方法返回了True,則說明當前線程獲取鎖成功,就不用往后執行了;如果獲取失敗,就需要加入到等待隊列中。下面會詳細解釋線程是何時以及怎樣被加入進等待隊列中的。
2.3.1 線程加入等待隊列
2.3.1.1 加入隊列的時機
當執行Acquire(1)時,會通過tryAcquire獲取鎖。在這種情況下,如果獲取鎖失敗,就會調用addWaiter加入到等待隊列中去。
2.3.1.2 如何加入隊列
獲取鎖失敗后,會執行addWaiter(Node.EXCLUSIVE)加入等待隊列,具體實現方法如下:
//?java.util.concurrent.locks.AbstractQueuedSynchronizerprivate?Node?addWaiter(Node?mode)?{Node?node?=?new?Node(Thread.currentThread(),?mode);//?Try?the?fast?path?of?enq;?backup?to?full?enq?on?failureNode?pred?=?tail;if?(pred?!=?null)?{node.prev?=?pred;if?(compareAndSetTail(pred,?node))?{pred.next?=?node;return?node;}}enq(node);return?node; } private?final?boolean?compareAndSetTail(Node?expect,?Node?update)?{return?unsafe.compareAndSwapObject(this,?tailOffset,?expect,?update); }主要的流程如下:
(1)通過當前的線程和鎖模式新建一個節點。
(2)Pred指針指向尾節點Tail。
(3)將New中Node的Prev指針指向Pred。
(4)通過compareAndSetTail方法,完成尾節點的設置。這個方法主要是對tailOffset和Expect進行比較,如果tailOffset的Node和Expect的Node地址是相同的,那么設置Tail的值為Update的值。
//?java.util.concurrent.locks.AbstractQueuedSynchronizerstatic?{try?{stateOffset?=?unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));headOffset?=?unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));tailOffset?=?unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));waitStatusOffset?=?unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));nextOffset?=?unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));}?catch?(Exception?ex)?{?throw?new?Error(ex);?} }從AQS的靜態代碼塊可以看出,都是獲取一個對象的屬性相對于該對象在內存當中的偏移量,這樣我們就可以根據這個偏移量在對象內存當中找到這個屬性。tailOffset指的是tail對應的偏移量,所以這個時候會將new出來的Node置為當前隊列的尾節點。同時,由于是雙向鏈表,也需要將前一個節點指向尾節點。
(5)?如果Pred指針是Null(說明等待隊列中沒有元素),或者當前Pred指針和Tail指向的位置不同(說明被別的線程已經修改),就需要看一下Enq的方法。
//?java.util.concurrent.locks.AbstractQueuedSynchronizerprivate?Node?enq(final?Node?node)?{for?(;;)?{Node?t?=?tail;if?(t?==?null)?{?//?Must?initializeif?(compareAndSetHead(new?Node()))tail?=?head;}?else?{node.prev?=?t;if?(compareAndSetTail(t,?node))?{t.next?=?node;return?t;}}} }如果沒有被初始化,需要進行初始化一個頭結點出來。但請注意,初始化的頭結點并不是當前線程節點,而是調用了無參構造函數的節點。如果經歷了初始化或者并發導致隊列中有元素,則與之前的方法相同。其實,addWaiter就是一個在雙端鏈表添加尾節點的操作,需要注意的是,雙端鏈表的頭結點是一個無參構造函數的頭結點。
總結一下,線程獲取鎖的時候,過程大體如下:
a. 當沒有線程獲取到鎖時,線程1獲取鎖成功。
b. 線程2申請鎖,但是鎖被線程1占有。
c. 如果再有線程要獲取鎖,依次在隊列中往后排隊即可。
回到上邊的代碼,hasQueuedPredecessors是公平鎖加鎖時判斷等待隊列中是否存在有效節點的方法。如果返回False,說明當前線程可以爭取共享資源;如果返回True,說明隊列中存在有效節點,當前線程必須加入到等待隊列中。
//?java.util.concurrent.locks.ReentrantLockpublic?final?boolean?hasQueuedPredecessors()?{//?The?correctness?of?this?depends?on?head?being?initialized//?before?tail?and?on?head.next?being?accurate?if?the?current//?thread?is?first?in?queue.Node?t?=?tail;?//?Read?fields?in?reverse?initialization?orderNode?h?=?head;Node?s;return?h?!=?t?&&?((s?=?h.next)?==?null?||?s.thread?!=?Thread.currentThread()); }看到這里,我們理解一下h != t && ((s = h.next) == null || s.thread != Thread.currentThread());為什么要判斷的頭結點的下一個節點?第一個節點儲存的數據是什么?
雙向鏈表中,第一個節點為虛節點,其實并不存儲任何信息,只是占位。真正的第一個有數據的節點,是在第二個節點開始的。當h != t時:
?
a. 如果(s = h.next) == null,等待隊列正在有線程進行初始化,但只是進行到了Tail指向Head,沒有將Head指向Tail,此時隊列中有元素,需要返回True(這塊具體見下邊代碼分析)。
b. 如果(s = h.next) != null,說明此時隊列中至少有一個有效節點。如果此時s.thread == Thread.currentThread(),說明等待隊列的第一個有效節點中的線程與當前線程相同,那么當前線程是可以獲取資源的;如果s.thread != Thread.currentThread(),說明等待隊列的第一個有效節點線程與當前線程不同,當前線程必須加入進等待隊列。
?
//?java.util.concurrent.locks.AbstractQueuedSynchronizer#enqif?(t?==?null)?{?//?Must?initializeif?(compareAndSetHead(new?Node()))tail?=?head; }?else?{node.prev?=?t;if?(compareAndSetTail(t,?node))?{t.next?=?node;return?t;} }?
節點入隊不是原子操作,所以會出現短暫的head != tail,此時Tail指向最后一個節點,而且Tail指向Head。如果Head沒有指向Tail(可見5、6、7行),這種情況下也需要將相關線程加入隊列中。所以這塊代碼是為了解決極端情況下的并發問題。
2.3.1.3 等待隊列中線程出隊列時機
回到最初的源碼:
//?java.util.concurrent.locks.AbstractQueuedSynchronizerpublic?final?void?acquire(int?arg)?{if?(!tryAcquire(arg)?&&?acquireQueued(addWaiter(Node.EXCLUSIVE),?arg))selfInterrupt(); }上文解釋了addWaiter方法,這個方法其實就是把對應的線程以Node的數據結構形式加入到雙端隊列里,返回的是一個包含該線程的Node。而這個Node會作為參數,進入到acquireQueued方法中。acquireQueued方法可以對排隊中的線程進行“獲鎖”操作。
總的來說,一個線程獲取鎖失敗了,被放入等待隊列,acquireQueued會把放入隊列中的線程不斷去獲取鎖,直到獲取成功或者不再需要獲取(中斷)。
下面我們從“何時出隊列?”和“如何出隊列?”兩個方向來分析一下acquireQueued源碼:
//?java.util.concurrent.locks.AbstractQueuedSynchronizerfinal?boolean?acquireQueued(final?Node?node,?int?arg)?{//?標記是否成功拿到資源boolean?failed?=?true;try?{//?標記等待過程中是否中斷過boolean?interrupted?=?false;//?開始自旋,要么獲取鎖,要么中斷for?(;;)?{//?獲取當前節點的前驅節點final?Node?p?=?node.predecessor();//?如果p是頭結點,說明當前節點在真實數據隊列的首部,就嘗試獲取鎖(別忘了頭結點是虛節點)if?(p?==?head?&&?tryAcquire(arg))?{//?獲取鎖成功,頭指針移動到當前nodesetHead(node);p.next?=?null;?//?help?GCfailed?=?false;return?interrupted;}//?說明p為頭節點且當前沒有獲取到鎖(可能是非公平鎖被搶占了)或者是p不為頭結點,這個時候就要判斷當前node是否要被阻塞(被阻塞條件:前驅節點的waitStatus為-1),防止無限循環浪費資源。具體兩個方法下面細細分析if?(shouldParkAfterFailedAcquire(p,?node)?&&?parkAndCheckInterrupt())interrupted?=?true;}}?finally?{if?(failed)cancelAcquire(node);} }注:setHead方法是把當前節點置為虛節點,但并沒有修改waitStatus,因為它是一直需要用的數據。
//?java.util.concurrent.locks.AbstractQueuedSynchronizerprivate?void?setHead(Node?node)?{head?=?node;node.thread?=?null;node.prev?=?null; }?
//?java.util.concurrent.locks.AbstractQueuedSynchronizer//?靠前驅節點判斷當前線程是否應該被阻塞 private?static?boolean?shouldParkAfterFailedAcquire(Node?pred,?Node?node)?{//?獲取頭結點的節點狀態int?ws?=?pred.waitStatus;//?說明頭結點處于喚醒狀態if?(ws?==?Node.SIGNAL)return?true;?//?通過枚舉值我們知道waitStatus>0是取消狀態if?(ws?>?0)?{do?{//?循環向前查找取消節點,把取消節點從隊列中剔除node.prev?=?pred?=?pred.prev;}?while?(pred.waitStatus?>?0);pred.next?=?node;}?else?{//?設置前任節點等待狀態為SIGNALcompareAndSetWaitStatus(pred,?ws,?Node.SIGNAL);}return?false; }parkAndCheckInterrupt主要用于掛起當前線程,阻塞調用棧,返回當前線程的中斷狀態。
//?java.util.concurrent.locks.AbstractQueuedSynchronizerprivate?final?boolean?parkAndCheckInterrupt()?{LockSupport.park(this);return?Thread.interrupted(); }上述方法的流程圖如下:
從上圖可以看出,跳出當前循環的條件是當“前置節點是頭結點,且當前線程獲取鎖成功”。為了防止因死循環導致CPU資源被浪費,我們會判斷前置節點的狀態來決定是否要將當前線程掛起,具體掛起流程用流程圖表示如下(shouldParkAfterFailedAcquire流程):
從隊列中釋放節點的疑慮打消了,那么又有新問題了:
-
shouldParkAfterFailedAcquire中取消節點是怎么生成的呢?什么時候會把一個節點的waitStatus設置為-1?
-
是在什么時間釋放節點通知到被掛起的線程呢?
2.3.2 CANCELLED狀態節點生成
acquireQueued方法中的Finally代碼:
//?java.util.concurrent.locks.AbstractQueuedSynchronizerfinal?boolean?acquireQueued(final?Node?node,?int?arg)?{boolean?failed?=?true;try?{...for?(;;)?{final?Node?p?=?node.predecessor();if?(p?==?head?&&?tryAcquire(arg))?{...failed?=?false;...}...}?finally?{if?(failed)cancelAcquire(node);} }通過cancelAcquire方法,將Node的狀態標記為CANCELLED。接下來,我們逐行來分析這個方法的原理:
//?java.util.concurrent.locks.AbstractQueuedSynchronizerprivate?void?cancelAcquire(Node?node)?{//?將無效節點過濾if?(node?==?null)return;//?設置該節點不關聯任何線程,也就是虛節點node.thread?=?null;Node?pred?=?node.prev;//?通過前驅節點,跳過取消狀態的nodewhile?(pred.waitStatus?>?0)node.prev?=?pred?=?pred.prev;//?獲取過濾后的前驅節點的后繼節點Node?predNext?=?pred.next;//?把當前node的狀態設置為CANCELLEDnode.waitStatus?=?Node.CANCELLED;//?如果當前節點是尾節點,將從后往前的第一個非取消狀態的節點設置為尾節點//?更新失敗的話,則進入else,如果更新成功,將tail的后繼節點設置為nullif?(node?==?tail?&&?compareAndSetTail(node,?pred))?{compareAndSetNext(pred,?predNext,?null);}?else?{int?ws;//?如果當前節點不是head的后繼節點,1:判斷當前節點前驅節點的是否為SIGNAL,2:如果不是,則把前驅節點設置為SINGAL看是否成功//?如果1和2中有一個為true,再判斷當前節點的線程是否為null//?如果上述條件都滿足,把當前節點的前驅節點的后繼指針指向當前節點的后繼節點if?(pred?!=?head?&&?((ws?=?pred.waitStatus)?==?Node.SIGNAL?||?(ws?<=?0?&&?compareAndSetWaitStatus(pred,?ws,?Node.SIGNAL)))?&&?pred.thread?!=?null)?{Node?next?=?node.next;if?(next?!=?null?&&?next.waitStatus?<=?0)compareAndSetNext(pred,?predNext,?next);}?else?{//?如果當前節點是head的后繼節點,或者上述條件不滿足,那就喚醒當前節點的后繼節點unparkSuccessor(node);}node.next?=?node;?//?help?GC} }當前的流程:
獲取當前節點的前驅節點,如果前驅節點的狀態是CANCELLED,那就一直往前遍歷,找到第一個waitStatus <= 0的節點,將找到的Pred節點和當前Node關聯,將當前Node設置為CANCELLED。
根據當前節點的位置,考慮以下三種情況:
當前節點是尾節點。
當前節點是Head的后繼節點。
當前節點不是Head的后繼節點,也不是尾節點。
根據上述第二條,我們來分析每一種情況的流程。
當前節點是尾節點。
當前節點是Head的后繼節點。
當前節點不是Head的后繼節點,也不是尾節點。
通過上面的流程,我們對于CANCELLED節點狀態的產生和變化已經有了大致的了解,但是為什么所有的變化都是對Next指針進行了操作,而沒有對Prev指針進行操作呢?什么情況下會對Prev指針進行操作?
(1)執行cancelAcquire的時候,當前節點的前置節點可能已經從隊列中出去了(已經執行過Try代碼塊中的shouldParkAfterFailedAcquire方法了),如果此時修改Prev指針,有可能會導致Prev指向另一個已經移除隊列的Node,因此這塊變化Prev指針不安全。
(2)shouldParkAfterFailedAcquire方法中,會執行下面的代碼,其實就是在處理Prev指針。shouldParkAfterFailedAcquire是獲取鎖失敗的情況下才會執行,進入該方法后,說明共享資源已被獲取,當前節點之前的節點都不會出現變化,因此這個時候變更Prev指針比較安全。
?
do?{node.prev?=?pred?=?pred.prev; }?while?(pred.waitStatus?>?0);2.3.3 如何解鎖
我們已經剖析了加鎖過程中的基本流程,接下來再對解鎖的基本流程進行分析。由于ReentrantLock在解鎖的時候,并不區分公平鎖和非公平鎖,所以我們直接看解鎖的源碼:
//?java.util.concurrent.locks.ReentrantLockpublic?void?unlock()?{sync.release(1); }可以看到,本質釋放鎖的地方,是通過框架來完成的。
//?java.util.concurrent.locks.AbstractQueuedSynchronizerpublic?final?boolean?release(int?arg)?{if?(tryRelease(arg))?{Node?h?=?head;if?(h?!=?null?&&?h.waitStatus?!=?0)unparkSuccessor(h);return?true;}return?false; }在ReentrantLock里面的公平鎖和非公平鎖的父類Sync定義了可重入鎖的釋放鎖機制。
//?java.util.concurrent.locks.ReentrantLock.Sync//?方法返回當前鎖是不是沒有被線程持有 protected?final?boolean?tryRelease(int?releases)?{//?減少可重入次數int?c?=?getState()?-?releases;//?當前線程不是持有鎖的線程,拋出異常if?(Thread.currentThread()?!=?getExclusiveOwnerThread())throw?new?IllegalMonitorStateException();boolean?free?=?false;//?如果持有線程全部釋放,將當前獨占鎖所有線程設置為null,并更新stateif?(c?==?0)?{free?=?true;setExclusiveOwnerThread(null);}setState(c);return?free; }我們來解釋下述源碼:
//?java.util.concurrent.locks.AbstractQueuedSynchronizerpublic?final?boolean?release(int?arg)?{//?上邊自定義的tryRelease如果返回true,說明該鎖沒有被任何線程持有if?(tryRelease(arg))?{//?獲取頭結點Node?h?=?head;//?頭結點不為空并且頭結點的waitStatus不是初始化節點情況,解除線程掛起狀態if?(h?!=?null?&&?h.waitStatus?!=?0)unparkSuccessor(h);return?true;}return?false; }這里的判斷條件為什么是h != null && h.waitStatus != 0?
(1)h == null ? Head還沒初始化。初始情況下,head == null,第一個節點入隊,Head會被初始化一個虛擬節點。所以說,這里如果還沒來得及入隊,就會出現head == null 的情況。
(2)h != null && waitStatus == 0 ? 表明后繼節點對應的線程仍在運行中,不需要喚醒。
(3)h != null && waitStatus < 0 ?表明后繼節點可能被阻塞了,需要喚醒。
在看一下unparkSuccessor方法:
//?java.util.concurrent.locks.AbstractQueuedSynchronizerprivate?void?unparkSuccessor(Node?node)?{//?獲取頭結點waitStatusint?ws?=?node.waitStatus;if?(ws?<?0)compareAndSetWaitStatus(node,?ws,?0);//?獲取當前節點的下一個節點Node?s?=?node.next;//?如果下個節點是null或者下個節點被cancelled,就找到隊列最開始的非cancelled的節點if?(s?==?null?||?s.waitStatus?>?0)?{s?=?null;//?就從尾部節點開始找,到隊首,找到隊列第一個waitStatus<0的節點。for?(Node?t?=?tail;?t?!=?null?&&?t?!=?node;?t?=?t.prev)if?(t.waitStatus?<=?0)s?=?t;}//?如果當前節點的下個節點不為空,而且狀態<=0,就把當前節點unparkif?(s?!=?null)LockSupport.unpark(s.thread); }為什么要從后往前找第一個非Cancelled的節點呢?原因如下。
之前的addWaiter方法:
//?java.util.concurrent.locks.AbstractQueuedSynchronizerprivate?Node?addWaiter(Node?mode)?{Node?node?=?new?Node(Thread.currentThread(),?mode);//?Try?the?fast?path?of?enq;?backup?to?full?enq?on?failureNode?pred?=?tail;if?(pred?!=?null)?{node.prev?=?pred;if?(compareAndSetTail(pred,?node))?{pred.next?=?node;return?node;}}enq(node);return?node; }我們從這里可以看到,節點入隊并不是原子操作,也就是說,node.prev = pred; ?compareAndSetTail(pred, node) 這兩個地方可以看作Tail入隊的原子操作,但是此時pred.next = node;還沒執行,如果這個時候執行了unparkSuccessor方法,就沒辦法從前往后找了,所以需要從后往前找。還有一點原因,在產生CANCELLED狀態節點的時候,先斷開的是Next指針,Prev指針并未斷開,因此也是必須要從后往前遍歷才能夠遍歷完全部的Node。
綜上所述,如果是從前往后找,由于極端情況下入隊的非原子操作和CANCELLED節點產生過程中斷開Next指針的操作,可能會導致無法遍歷所有的節點。所以,喚醒對應的線程后,對應的線程就會繼續往下執行。繼續執行acquireQueued方法以后,中斷如何處理?
2.3.4 中斷恢復后的執行流程
喚醒后,會執行return Thread.interrupted();,這個函數返回的是當前執行線程的中斷狀態,并清除。
//?java.util.concurrent.locks.AbstractQueuedSynchronizerprivate?final?boolean?parkAndCheckInterrupt()?{LockSupport.park(this);return?Thread.interrupted(); }再回到acquireQueued代碼,當parkAndCheckInterrupt返回True或者False的時候,interrupted的值不同,但都會執行下次循環。如果這個時候獲取鎖成功,就會把當前interrupted返回。
//?java.util.concurrent.locks.AbstractQueuedSynchronizerfinal?boolean?acquireQueued(final?Node?node,?int?arg)?{boolean?failed?=?true;try?{boolean?interrupted?=?false;for?(;;)?{final?Node?p?=?node.predecessor();if?(p?==?head?&&?tryAcquire(arg))?{setHead(node);p.next?=?null;?//?help?GCfailed?=?false;return?interrupted;}if?(shouldParkAfterFailedAcquire(p,?node)?&&?parkAndCheckInterrupt())interrupted?=?true;}}?finally?{if?(failed)cancelAcquire(node);} }如果acquireQueued為True,就會執行selfInterrupt方法。
//?java.util.concurrent.locks.AbstractQueuedSynchronizerstatic?void?selfInterrupt()?{Thread.currentThread().interrupt(); }該方法其實是為了中斷線程。但為什么獲取了鎖以后還要中斷線程呢?這部分屬于Java提供的協作式中斷知識內容,感興趣同學可以查閱一下。這里簡單介紹一下:
(1)?當中斷線程被喚醒時,并不知道被喚醒的原因,可能是當前線程在等待中被中斷,也可能是釋放了鎖以后被喚醒。因此我們通過Thread.interrupted()方法檢查中斷標記(該方法返回了當前線程的中斷狀態,并將當前線程的中斷標識設置為False),并記錄下來,如果發現該線程被中斷過,就再中斷一次。
(2)?線程在等待資源的過程中被喚醒,喚醒后還是會不斷地去嘗試獲取鎖,直到搶到鎖為止。也就是說,在整個流程中,并不響應中斷,只是記錄中斷記錄。最后搶到鎖返回了,那么如果被中斷過的話,就需要補充一次中斷。
這里的處理方式主要是運用線程池中基本運作單元Worder中的runWorker,通過Thread.interrupted()進行額外的判斷處理,感興趣的同學可以看下ThreadPoolExecutor源碼。
2.3.5 小結
我們在1.3小節中提出了一些問題,現在來回答一下。
Q:某個線程獲取鎖失敗的后續流程是什么呢?
A:存在某種排隊等候機制,線程繼續等待,仍然保留獲取鎖的可能,獲取鎖流程仍在繼續。
Q:既然說到了排隊等候機制,那么就一定會有某種隊列形成,這樣的隊列是什么數據結構呢?
A:是CLH變體的FIFO雙端隊列。
Q:處于排隊等候機制中的線程,什么時候可以有機會獲取鎖呢?
A:可以詳細看下2.3.1.3小節。
Q:如果處于排隊等候機制中的線程一直無法獲取鎖,需要一直等待么?還是有別的策略來解決這一問題?
A:線程所在節點的狀態會變成取消狀態,取消狀態的節點會從隊列中釋放,具體可見2.3.2小節。
Q:Lock函數通過Acquire方法進行加鎖,但是具體是如何加鎖的呢?
A:AQS的Acquire會調用tryAcquire方法,tryAcquire由各個自定義同步器實現,通過tryAcquire完成加鎖過程。
3 AQS應用
3.1 ReentrantLock的可重入應用
ReentrantLock的可重入性是AQS很好的應用之一,在了解完上述知識點以后,我們很容易得知ReentrantLock實現可重入的方法。在ReentrantLock里面,不管是公平鎖還是非公平鎖,都有一段邏輯。
公平鎖:
//?java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquireif?(c?==?0)?{if?(!hasQueuedPredecessors()?&&?compareAndSetState(0,?acquires))?{setExclusiveOwnerThread(current);return?true;} } else?if?(current?==?getExclusiveOwnerThread())?{int?nextc?=?c?+?acquires;if?(nextc?<?0)throw?new?Error("Maximum?lock?count?exceeded");setState(nextc);return?true; }非公平鎖:
//?java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquireif?(c?==?0)?{if?(compareAndSetState(0,?acquires)){setExclusiveOwnerThread(current);return?true;} } else?if?(current?==?getExclusiveOwnerThread())?{int?nextc?=?c?+?acquires;if?(nextc?<?0)?//?overflowthrow?new?Error("Maximum?lock?count?exceeded");setState(nextc);return?true; }從上面這兩段都可以看到,有一個同步狀態State來控制整體可重入的情況。State是Volatile修飾的,用于保證一定的可見性和有序性。
//?java.util.concurrent.locks.AbstractQueuedSynchronizerprivate?volatile?int?state;接下來看State這個字段主要的過程:
(1) State初始化的時候為0,表示沒有任何線程持有鎖。
(2)?當有線程持有該鎖時,值就會在原來的基礎上+1,同一個線程多次獲得鎖是,就會多次+1,這里就是可重入的概念。
(3)?解鎖也是對這個字段-1,一直到0,此線程對鎖釋放。
3.2 JUC中的應用場景
除了上邊ReentrantLock的可重入性的應用,AQS作為并發編程的框架,為很多其他同步工具提供了良好的解決方案。下面列出了JUC中的幾種同步工具,大體介紹一下AQS的應用場景:
| ReentrantLock | 使用AQS保存鎖重復持有的次數。當一個線程獲取鎖時,ReentrantLock記錄當前獲得鎖的線程標識,用于檢測是否重復獲取,以及錯誤線程試圖解鎖操作時異常情況的處理。 |
| ReentrantReadWriteLock | 使用AQS同步狀態中的16位保存寫鎖持有的次數,剩下的16位用于保存讀鎖的持有次數。 |
| Semaphore | 使用AQS同步狀態來保存信號量的當前計數。tryRelease會增加計數,acquireShared會減少計數。 |
| CountDownLatch | 使用AQS同步狀態來表示計數。計數為0時,所有的Acquire操作(CountDownLatch的await方法)才可以通過。 |
| ThreadPoolExecutor | Worker利用AQS同步狀態實現對獨占線程變量的設置(tryAcquire和tryRelease)。 |
3.3 自定義同步工具
了解AQS基本原理以后,按照上面所說的AQS知識點,自己實現一個同步工具。
public?class?LeeLock??{private?static?class?Sync?extends?AbstractQueuedSynchronizer?{@Overrideprotected?boolean?tryAcquire?(int?arg)?{return?compareAndSetState(0,?1);}@Overrideprotected?boolean?tryRelease?(int?arg)?{setState(0);return?true;}@Overrideprotected?boolean?isHeldExclusively?()?{return?getState()?==?1;}}private?Sync?sync?=?new?Sync();public?void?lock?()?{sync.acquire(1);}public?void?unlock?()?{sync.release(1);} }通過我們自己定義的Lock完成一定的同步功能。
public?class?LeeMain?{static?int?count?=?0;static?LeeLock?leeLock?=?new?LeeLock();public?static?void?main?(String[]?args)?throws?InterruptedException?{Runnable?runnable?=?new?Runnable()?{@Overridepublic?void?run?()?{try?{leeLock.lock();for?(int?i?=?0;?i?<?10000;?i++)?{count++;}}?catch?(Exception?e)?{e.printStackTrace();}?finally?{leeLock.unlock();}}};Thread?thread1?=?new?Thread(runnable);Thread?thread2?=?new?Thread(runnable);thread1.start();thread2.start();thread1.join();thread2.join();System.out.println(count);} }上述代碼每次運行結果都會是20000。通過簡單的幾行代碼就能實現同步功能,這就是AQS的強大之處。
總結
我們日常開發中使用并發的場景太多,但是對并發內部的基本框架原理了解的人卻不多。由于篇幅原因,本文僅介紹了可重入鎖ReentrantLock的原理和AQS原理,希望能夠成為大家了解AQS和ReentrantLock等同步器的“敲門磚”。
參考資料
-
Lea D. The java. util. concurrent synchronizer framework[J]. Science of Computer Programming, 2005, 58(3): 293-309.
-
《Java并發編程實戰》
-
不可不說的Java“鎖”事
總結
- 上一篇: 复方枸杞子颗粒_功效作用注意事项用药禁忌
- 下一篇: Learning to rank基本算法