并发编程之ReentrantLock--Condition
介紹
Condition是條件鎖的意思,是指在獲取鎖之后發(fā)現(xiàn)當前業(yè)務(wù)場景自己無法處理,而需要等待某個條件的出現(xiàn)才可以繼續(xù)處理時使用的一種鎖。
 最常見的生產(chǎn)者消費者模式,生產(chǎn)的元素都被消費完,則需要阻塞消費者,不可繼續(xù)消費直到生產(chǎn)者生產(chǎn)新的后喚醒消費者;或者生產(chǎn)者生產(chǎn)過快,容器已滿,則需要阻塞生產(chǎn)者,直到消費者消費一定數(shù)量才喚醒生產(chǎn)者。
Condition是一個接口,其定義如下
public interface Condition {//阻塞當前線程,等待同一個Condition對象上的signal()/signalAll()喚醒//該方法響應(yīng)中斷,如果發(fā)生中斷,該方法拋出InterruptedException異常void await() throws InterruptedException;///阻塞當前線程,等待同一個Condition對象上的signal()/signalAll()喚醒//與上面方法的區(qū)別是,該方法等待過程中不響應(yīng)中斷void awaitUninterruptibly();//阻塞線程,線程被阻塞指定的時間//當線程被中斷、超時或者signal()/signalAll(),都會喚醒線程long awaitNanos(long nanosTimeout) throws InterruptedException;//同awaitNanos()boolean await(long time, TimeUnit unit) throws InterruptedException;//同awaitNanos()boolean awaitUntil(Date deadline) throws InterruptedException;//喚醒一個等待線程void signal();//喚醒所有的等待線程void signalAll(); }AbstractQueuedSynchronizer提供了一個Condition的實現(xiàn)類ConditionObject,可以通過調(diào)用Lock.newCondition()獲得一個Condition對象。每個Condition對象都與一個Lock對象相關(guān),調(diào)用Condition對象的方法前必須獲得對應(yīng)Lock對象的鎖。
 Condition的作用與Object的wait()/notify()作用類似,調(diào)用await()可以阻塞當前線程,signal()/signalAll()可以喚醒其他阻塞線程。
基本使用
public class ConditionThread{public static void main(String[] args) throws InterruptedException {ReentrantLock lock = new ReentrantLock();Condition condition = lock.newCondition();new Thread(()->{try {lock.lock(); try {System.out.println("await阻塞前"); // 等待條件condition.await(); System.out.println("await阻塞后"); } finally {lock.unlock();System.out.println("await阻塞線程解鎖");}} catch (InterruptedException e) {e.printStackTrace();}}).start();//睡眠讓await線程先拿到鎖Thread.sleep(1000);lock.lock(); try {System.out.println("signal喚醒前");condition.signal();System.out.println("signal喚醒后");} finally {System.out.println("signal喚醒線程解鎖");lock.unlock(); }} }運行結(jié)果:
await阻塞前 signal喚醒前 signal喚醒后 signal喚醒線程解鎖 await阻塞后 await阻塞線程解鎖可以看出,上方線程先拿到鎖,執(zhí)行到await方法后阻塞了,下方線程拿到鎖,執(zhí)行signal,直到下方線程執(zhí)行完unlock操作,上方線程被喚醒,繼續(xù)后續(xù)操作。
源碼解析
AQS中有兩個隊列,一個為AQS同步隊列,未拿到鎖的線程會放入此隊列中;另一個是
 Condition隊列,調(diào)用await會放入到Condition隊列。兩個有幾點不同,AQS同步隊列是雙向的,而Condition隊列是單向的;AQS同步隊列的首節(jié)點是虛擬節(jié)點,而Condition隊列的首節(jié)點是真實節(jié)點,參與排隊。
 Condition隊列如下圖:(AQS同步隊列結(jié)構(gòu)可見前文)
 
await
public final void await() throws InterruptedException {//線程中斷,拋出異常if (Thread.interrupted())throw new InterruptedException();//添加節(jié)點到Condition隊列中,并返回該節(jié)點Node node = addConditionWaiter();// 完全釋放當前線程獲取的鎖// 因為鎖是可重入的,state >= 1,所以這里要把獲取的鎖全部釋放int savedState = fullyRelease(node);int interruptMode = 0;//是否在同步隊列中//注意這是aqs同步隊列,區(qū)分Condition隊列while (!isOnSyncQueue(node)) {// 阻塞當前線程LockSupport.park(this);// 上面部分是調(diào)用await()時釋放自己占有的鎖,并阻塞自己等待條件的出現(xiàn)// *************************分界線************************* //// 下面部分是條件已經(jīng)出現(xiàn),嘗試去獲取鎖//中斷線程一系列操作if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// 嘗試獲取鎖// 如果沒獲取到會再次阻塞(前文對此方法有說明)if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;// 清除取消的節(jié)點if (node.nextWaiter != null)unlinkCancelledWaiters();// 線程中斷相關(guān)if (interruptMode != 0)//拋出InterruptedException,重新中斷當前線程//或不執(zhí)行任何操作,具體取決于interruptMode值reportInterruptAfterWait(interruptMode);}addConditionWaiter新建Node節(jié)點添加至Condition隊列
private Node addConditionWaiter() {//獲取條件隊列的尾節(jié)點Node t = lastWaiter;// 如果條件隊列的尾節(jié)點不為初始狀態(tài),從頭節(jié)點開始清除所有不為初始狀態(tài)節(jié)點if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();t = lastWaiter;}// 新建一個節(jié)點,它的等待狀態(tài)是CONDITIONNode node = new Node(Thread.currentThread(), Node.CONDITION);// 如果尾節(jié)點為空,則把新節(jié)點賦值給頭節(jié)點(相當于初始化隊列)// 否則把新節(jié)點賦值給尾節(jié)點的nextWaiter指針if (t == null)firstWaiter = node;elset.nextWaiter = node;// 尾節(jié)點指向新節(jié)點lastWaiter = node;// 返回新節(jié)點return node;}fullyRelease 完全釋放鎖
final int fullyRelease(Node node) {boolean failed = true;try {//獲取state,state >= 1,下面方法將state改為0表示釋放鎖int savedState = getState();//釋放可重入鎖,此方法上文有介紹if (release(savedState)) {failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {//失敗將waitStatus設(shè)置為1,表示已取消的節(jié)點if (failed)node.waitStatus = Node.CANCELLED;}}isOnSyncQueue判斷是否在AQS同步隊列。
 我們先說明下AQS同步隊列和Condition隊列中waitStatus等待狀態(tài)的區(qū)別:
 1、在Condition隊列中,新建節(jié)點的初始等待狀態(tài)是CONDITION(-2);
 2、移到AQS的隊列中時等待狀態(tài)會更改為0(AQS同步隊列節(jié)點的初始等待狀態(tài)為0);
 3、在AQS的隊列中如果需要阻塞,會把它上一個節(jié)點的等待狀態(tài)設(shè)置為SIGNAL(-1);
 4、Condition隊列和AQS隊列中,已取消節(jié)點的等待狀態(tài)設(shè)置為CANCELLED(1);
 5、共享鎖的時候還有另外一種等待狀態(tài)叫PROPAGATE(-3)。
findNodeFromTail 從AQS的尾節(jié)點開始往前尋找看是否可以找到當前節(jié)點
private boolean findNodeFromTail(Node node) {Node t = tail;//從tail尾節(jié)點往前循環(huán)//存在node節(jié)點返回truefor (;;) {if (t == node)return true;if (t == null)return false;t = t.prev;}}unlinkCancelledWaiters 清除取消的節(jié)點
private void unlinkCancelledWaiters() {//取Condition隊列首節(jié)點Node t = firstWaiter;Node trail = null;//從首節(jié)點開始循環(huán)Condition隊列while (t != null) {Node next = t.nextWaiter;//waitStatus 不為初始狀態(tài),則從Condition隊列中清除if (t.waitStatus != Node.CONDITION) {t.nextWaiter = null;if (trail == null)firstWaiter = next;elsetrail.nextWaiter = next;if (next == null)lastWaiter = trail;}elsetrail = t;t = next;}}signal
public final void signal() {// 判斷持有鎖的線程不為當前線程,拋出異常// 說明signal()要在獲取鎖之后執(zhí)行if (!isHeldExclusively())throw new IllegalMonitorStateException();//取Condition隊列首元素,執(zhí)行doSignal操作Node first = firstWaiter;if (first != null)doSignal(first);}doSignal執(zhí)行喚醒操作,循環(huán)執(zhí)行transferForSignal方法,直到transferForSignal方法返回true或執(zhí)行到尾節(jié)點,意思從頭開始循環(huán),成功喚醒一位則退出循環(huán)
private void doSignal(Node first) {do {// 移到條件隊列的頭節(jié)點往后一位if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;// 相當于把頭節(jié)點從隊列中出隊first.nextWaiter = null;// 轉(zhuǎn)移節(jié)點到AQS隊列中} while (!transferForSignal(first) &&(first = firstWaiter) != null);}transferForSignal將Node節(jié)點從Condition隊列移動到AQS同步隊列
final boolean transferForSignal(Node node) {// 把節(jié)點的狀態(tài)更改為0,也就是說即將移到AQS隊列中// 如果失敗了,說明節(jié)點已經(jīng)被改成取消狀態(tài)// 返回false,通過上面的循環(huán)可知會尋找下一個可用節(jié)點if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;// 調(diào)用AQS的入隊方法把節(jié)點移到AQS的隊列中// 注意,這里enq()的返回值是node的上一個節(jié)點,也就是舊尾節(jié)點Node p = enq(node);// 上一個節(jié)點的等待狀態(tài)int ws = p.waitStatus;// 如果上一個節(jié)點已取消了,或者更新狀態(tài)為SIGNAL失敗(也是說明上一個節(jié)點已經(jīng)取消了)// 則直接喚醒當前節(jié)點對應(yīng)的線程if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);// 如果更新上一個節(jié)點的等待狀態(tài)為SIGNAL成功了// 則返回true,這時上面的循環(huán)不成立了,退出循環(huán),也就是只通知了一個節(jié)點// 此時當前節(jié)點還是阻塞狀態(tài)// 也就是說調(diào)用signal()的時候并不會真正喚醒一個節(jié)點// 只是把節(jié)點從條件隊列移到AQS隊列中return true;}從上述源碼可知,signal()方法并不會真正喚醒一個節(jié)點,只是把節(jié)點從Condition條件隊列移到AQS隊列中,一直等到unlock解鎖后才會真正喚醒。
signalAll
public final void signalAll() {// 判斷持有鎖的線程不為當前線程,拋出異常// 說明signal()要在獲取鎖之后執(zhí)行if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignalAll(first);}doSignalAll和doSignal很相似,doSignal從Condition隊列頭元素往后循環(huán),成功轉(zhuǎn)移一位元素則完成,而doSignalAll將所有符合條件的元素轉(zhuǎn)移至AQS同步隊列
private void doSignalAll(Node first) {lastWaiter = firstWaiter = null;//從頭節(jié)點開始循環(huán)轉(zhuǎn)移,至到最后一個元素do {Node next = first.nextWaiter;first.nextWaiter = null;transferForSignal(first);first = next;} while (first != null);}總結(jié)
await()方法的流程:
 1、新建一個節(jié)點加入到條件隊列中去;
 2、完全釋放當前線程占有的鎖;
 3、阻塞當前線程,并等待條件的出現(xiàn);
 8、條件已出現(xiàn)(此時節(jié)點已經(jīng)移到AQS的隊列中),嘗試獲取鎖;
signal()方法的流程為:456
 4、從條件隊列的頭節(jié)點開始尋找一個非取消狀態(tài)的節(jié)點;
 5、把它從條件隊列移到AQS隊列;
 6、且只移動一個節(jié)點
7、signal線程unlock釋放鎖
需要注意一點:signal()方法后,最終會執(zhí)行unlock()方法,此時才會真正喚醒一個節(jié)點,喚醒的這個節(jié)點如果曾經(jīng)是條件節(jié)點的話又會繼續(xù)執(zhí)行await()方法LockSupport.park(this)下面的代碼。
總結(jié)
以上是生活随笔為你收集整理的并发编程之ReentrantLock--Condition的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: POJ3159 Candies 差分
- 下一篇: 基于TensorFlow的SSD车辆检测
