ConditionObject源码
介紹
Condition是在JDK1.5中才出現的,它可以替代傳統的Object中的wait()、notify()和notifyAll()方法來實現線程間的通信,使線程間協作更加安全和高效。
Condition是一個接口,它的定義如下:
public interface Condition {void await() throws InterruptedException;void awaitUninterruptibly();long awaitNanos(long nanosTimeout) throws InterruptedException;boolean await(long time, TimeUnit unit) throws InterruptedException;boolean awaitUntil(Date deadline) throws InterruptedException;void signal();void signalAll(); }AQS的ConditionObject類實現了Condition接口
實現原理
ConditionObject類內部由條件隊列存儲每個需要等待的線程。條件隊列必須與一個獨占模式的鎖綁定,在執行await,signal之前必須先acquire到鎖。
條件隊列是一個FIFO隊列,是一個單向鏈表,包含firstWaiter,lastWaiter 2個指示節點,firstWaiter指向第一等待的節點,該節點是綁定線程的,與AQS的等待隊列的head節點不同。一個節點當被signal后,會由條件隊列轉移到等待隊列中。
源碼分析
源碼
await
//await方法是能夠響應中斷的。 public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 添加節點到Condition隊列中Node node = addConditionWaiter(); //【1】// 釋放當前線程的lock(要把控制權釋放出去,由其他線程獲取到鎖),從AQS的隊列中移出//此處savedState 返回的是線程獲取到的資源數據(比如ReentranceLock支持多次lock)int savedState = fullyRelease(node); //【2】int interruptMode = 0;// 循環判斷當前線程的Node是否在Sync隊列中(被singal之后,會把節點移動到sync隊列),如果不在(不在說明未被signal),則parkwhile (!isOnSyncQueue(node)) {LockSupport.park(this); //如果發生中斷,會理解從park返回。 //【3】// checkInterruptWhileWaiting方法根據中斷發生的時機返回后續需要處理這次中斷的方式, 如果發生中斷,退出循環if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// acquireQueued需要再次獲取到savedState 對應的資源(即釋放多少再獲取多少)if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;// 從頭到尾遍歷Condition隊列,移除被cancel的節點if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();// 如果線程已經被中斷,則根據之前獲取的interruptMode的值來判斷是繼續中斷還是拋出異常if (interruptMode != 0)reportInterruptAfterWait(interruptMode); }await方法首先(調用addConditionWaiter)根據當前線程創建了一個Node(waitStauts為CONDITION),然后釋放當前線程的獨占鎖。這里的savedState表示當前線程已經加鎖的次數(ReentrantLock為重入鎖)。while循環其實就是一直判斷,當前的線程是否又被添加到了Sync隊列中,如果已經在Sync隊列中,則退出循環。調用signal方法的時候,因為這里需要喚醒之前調用await方法的線程,所以會把當前線程又加入到Sync隊列中。
final int fullyRelease(Node node) {boolean failed = true;try {//占用了多少資源(這次釋放多少,下次acquire時仍要獲取多少)int savedState = getState();if (release(savedState)) {failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus = Node.CANCELLED;} } /* 該方法判斷當前線程的node是否在Sync隊列中。 */ final boolean isOnSyncQueue(Node node) {//如果當前線程node的狀態是CONDITION或者node.prev為null時說明已經在Condition隊列中了,所以返回false;if (node.waitStatus == Node.CONDITION || node.prev == null)return false;//如果node.next不為null,說明在Sync隊列中,返回true;if (node.next != null) // If has successor, it must be on queuereturn true;/**如果兩個if都未返回時,可以斷定node的prev一定不為null,next一定為null(因為node為lastWaiter),*這個時候可以認為node正處于放入Sync隊列的執行CAS操作執行過程中(enq 函數調用中)。*而這個CAS操作有可能失敗,所以通過findNodeFromTail再嘗試一次判斷。*/return findNodeFromTail(node); }private boolean findNodeFromTail(Node node) {Node t = tail;for (;;) {if (t == node)return true;if (t == null)return false;t = t.prev;} }private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;// 執行findNodeFromTail方法時可能一直在此自旋if (compareAndSetTail(t, node)) {t.next = node;return t;}}} }中斷處理:
/* 判斷等待過程中是否發生中斷 */ private int checkInterruptWhileWaiting(Node node) {return Thread.interrupted() ?(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :0; }/* 判斷中斷的時候,是否有signal方法的調用 */ final boolean transferAfterCancelledWait(Node node) {/*CAS成功,說明signal還未執行,因為signal之后,會把waitStatus修改為SIGNAL。*/if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {//未signal,則把節點加入到sync隊列中去。enq(node);//返回true。表示未發生中斷。return true;}/**CAS失敗了,則不能判斷當前線程是先進行了中斷還是先進行了signal方法的調用,可能是先執行了*signal然后中斷,也可能是先執行了中斷,后執行了signal,當然,這兩個操作肯定是發生在CAS之*前。這時需要做的就是等待當前線程的node被添加到Sync隊列后,也就是enq方法返回后,返回false*告訴checkInterruptWhileWaiting方法返回REINTERRUPT,后續進行重新中斷。*/while (!isOnSyncQueue(node))Thread.yield();return false; }signal
public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first);}public final void signalAll() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignalAll(first);}不管是signal,還是signalAll,都需要先取得lock。
/* 喚醒第一個可以被喚醒的節點。 */ private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;} while (!transferForSignal(first) &&(first = firstWaiter) != null); //【4】 }//釋放條件隊列,把每個節點都轉化為sync節點 private void doSignalAll(Node first) {lastWaiter = firstWaiter = null;do {Node next = first.nextWaiter;first.nextWaiter = null;transferForSignal(first);first = next;} while (first != null); }
?
?
示例
public class ConTest2 { private int queueSize = 5; private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize); private Lock lock = new ReentrantLock(); private Condition notFull = lock.newCondition(); private Condition notEmpty = lock.newCondition(); public static void main(String[] args) throws InterruptedException { ConTest2 test = new ConTest2(); Producer producer1 = test.new Producer(); Consumer consumer1 = test.new Consumer(); Consumer consumer2 = test.new Consumer(); producer1.start(); Thread.sleep(1000);consumer1.start(); consumer2.start();} class Consumer extends Thread{ @Override public void run() { consume(); } volatile boolean flag=true; private void consume() { while(flag){ lock.lock(); try { while(queue.size() == 0){ try { System.out.println("隊列空,等待數據"); notEmpty.await(); } catch (InterruptedException e) { flag =false; } } queue.poll(); //每次移走隊首元素 notFull.signal(); System.out.println("從隊列取走一個元素,隊列剩余"+queue.size()+"個元素"); } finally{ lock.unlock(); } } } } class Producer extends Thread{ @Override public void run() { produce(); } volatile boolean flag=true; private void produce() { while(flag){ lock.lock(); try { while(queue.size() == queueSize){ try { System.out.println("隊列滿,等待有空余空間"); notFull.await(); } catch (InterruptedException e) { flag =false; } } queue.offer(1); //每次插入一個元素 notEmpty.signal(); System.out.println("向隊列取中插入一個元素,隊列剩余空間:"+(queueSize-queue.size())); } finally{ lock.unlock(); } } } } }以上示例為典型生產者-消費者模式,p1,最先獲取到lock。
AQS圖例
1、P1,C1,C2線程開始,假設P1先獲取到lock,并且把隊列寫滿,C1,C2才啟動。此時C1,C2線程獲取lock失敗,加入同步隊列中。
2、P1執行,發現隊列滿,執行notFull.await()方法,經過【1】,【2】在【3】處park,隊列狀態如下:
此時,C1線程被unpark,P1線程在條件隊列notFull中。
2、C1執行 lock方法成功,然后移出一個元素,調用notFull.signal(),經過【4】,【5】,【6】之后,P1關聯節點由條件隊列轉移到同步隊列
?
3、C1釋放鎖,假設C1一直可以獲取到鎖,再釋放鎖,這樣P1,C2一直會在sync隊列中。一直到C1把隊列消費完,調用notEmpty.await(),C2獲取到鎖,但是由于隊列為空,C2也調用notEmpty.await(),此時P1獲取到鎖。狀態如下:
4、當P1調用notEmpty.signalAll()時,把C1,C2對應節點再加入到sync隊列
?
隊列節點的狀態
? 調用條件隊列的等待操作,會設置節點的waitingStatus為Condition,標識當前節點正處于條件隊列中。條件隊列的節點狀態轉換圖如下:
???????? Node的各個狀態的主要作用:Cancelled主要是解決線程在持有鎖時被外部中斷的邏輯,AQS的可中斷鎖獲取方法lockInterrutible()是基于該狀態實現的。顯式鎖必須手動釋放鎖,尤其是有中斷的環境中,一個線程被中斷可能仍然持有鎖,所以必須注意在finally中unlock。Condition則是支持條件隊列的等待操作,是Lock與條件隊列關聯的基礎。Signal是阻塞后繼線程的標識,一個等待線程只有在其前驅節點的狀態是SIGNAL時才會被阻塞,否則一直執行自旋嘗試操作,以減少線程調度的開銷。
條件隊列上的等待和喚醒操作,本質上是節點在AQS線程等待隊列和條件隊列之間相互轉移的過程,當需要等待某個條件時,線程會將當前節點添加到條件隊列中,并釋放持有鎖;當某個線程執行條件隊列的喚醒操作,則會將條件隊列的節點轉移到AQS等待隊列。每個Condition就是一個條件隊列,可以通過Lock的newCondition創建多個等待條件。AQS的條件隊列,它的等待和喚起操作流程如下:
?
?
await與awaitUninterruptibly()比較
?await()方法是可中斷方法,如果有中斷拋出中斷異常。
awaitUninterruptibly(),如果有中斷,僅重新設置中斷狀態。
await方法的幾種實現
與Object類中wait,notify比較
Condition與Object類中的方法對應如下:
| wait() | await() |
| notify() | signal() |
| notifyAll() | signalAll() |
Condition.await()與Object.wait(),都必須先獲取到鎖,才可以執行,執行后釋放鎖
不同的是Condition與Lock結合,wait與synchronized結合。
多線程環境的下,線程直接的互斥[執行]依靠的應該是鎖Lock,線程的之間的[通信]依靠的應該是條件Condition/信號,一般情況下lock確實可以同時滿足做這兩個事情,所以在Object的方式滿足了這個一般情況,但是肯定會有復雜的場景比如剛才例子中,需要讓滿足一定條件的線程執行,僅僅依靠鎖是不能完美解決的。所以condition實際上分離了執行和通信。
?
?
?
?
?
總結
以上是生活随笔為你收集整理的ConditionObject源码的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: AbstractQueuedSynchr
- 下一篇: ThreadLocal以及增强