AbstractQueuedSynchronizer源码解析
目錄
?
關(guān)于AbstractQueuedSynchronizer
基本數(shù)據(jù)結(jié)構(gòu)
節(jié)點結(jié)構(gòu)
同步隊列結(jié)構(gòu)
實現(xiàn)
子類需要實現(xiàn)的方法
獨占模式實現(xiàn)
獨占模式同步隊列示意
共享模式
共享模式同步隊列示意:
關(guān)于AbstractQueuedSynchronizer
JDK1.5之后引入了并發(fā)包java.util.concurrent,里面包含了很多并發(fā)控制鎖類,其核心是:AbstractQueuedSynchronizer,其數(shù)據(jù)結(jié)構(gòu)為鏈表方式的雙向隊列。
基本數(shù)據(jù)結(jié)構(gòu)
節(jié)點結(jié)構(gòu)
鏈表節(jié)點的字段含義如下:
| 字段 | 類型 | 初始值 | 意義 |
| SHARED | final Node | 任意一個Node對象 | 一個指示器,用于標(biāo)識Node處于共享模式。 |
| EXCLUSIVE | final Node | null | 一個指示器,用于標(biāo)識Node處于獨占模式。 |
| CANCELLED | final int | 1 | waitStatus值,表示Node處于取消狀態(tài)。一般當(dāng)Node處于超時或者中斷,設(shè)置此值。取消節(jié)點關(guān)聯(lián)的線程不會重新阻塞。 |
| SIGNAL | final int | -1 | waitStatus值,表示Node的后續(xù)Node 已經(jīng)或者即將通過park 阻塞。當(dāng)此節(jié)點取消或者release時,后續(xù)節(jié)點需要unpark。為了避免競爭。acquire方式必須指示需要SIGNAL,重試acquire,在失敗的情況下阻塞。 |
| CONDITION | final int | -2 | waitStatus值,表示節(jié)點處于等待隊列。當(dāng)在某個時間點set to 0 ,用于同步隊列。 |
| PROPAGATE | final int | -3 | waitStatus值,用于共享模式,表示下一次acquire無條件的傳播。 |
| waitStatus | volatile int | 0 | 節(jié)點狀態(tài) |
| prev | volatile Node | null | 前置節(jié)點 |
| next | volatile Node | null | 后續(xù)節(jié)點 |
| thread | volatile Thread | null | 關(guān)聯(lián)線程 |
| nextWaiter | Node | null | 指向下一個Node is waiting on condition。或者指向SHARED,EXCLUSIVE表示模式。 |
同步隊列結(jié)構(gòu)
| 字段 | 類型 | 初始值 | 意義 |
| head | volatile Node | null | 同步隊列頭節(jié)點,延遲初始化,必須通過setHead方法設(shè)置值。 |
| tail | volatile Node | null | 同步隊列尾節(jié)點,延遲初始化,通過enq方法設(shè)置。 |
| state | volatile int | 0 | 狀態(tài)。用于追蹤同步狀態(tài),具體由各子類處理。 |
?
?
?
?
實現(xiàn)
AbstractQueuedSynchronizer僅實現(xiàn)抽象方法控制并發(fā),由子類實現(xiàn)具體的資源控制。
子類需要實現(xiàn)的方法
| 方法 | 意義 |
| tryAcquire | 嘗試獨占模式下acquire,失敗則進(jìn)入同步隊列。 |
| tryRelease | 嘗試獨占模式下release。 |
| tryAcquireShared | 嘗試共享模式下acquire,失敗則進(jìn)入同步隊列。 |
| tryReleaseShared | 嘗試共享模式下release。 |
| isHeldExclusively | 指示同步器是否在獨占模式下被當(dāng)前線程占用 |
獨占模式實現(xiàn)
/**/public final void acquire(int arg) {if (!tryAcquire(arg) && --acquire失敗,/* 第一嘗試acquire失敗后,加入到同步隊列,會繼續(xù)嘗試*/acquireQueued(/* acquire失敗,則加入一個節(jié)點到同步隊列,節(jié)點為獨占模式。*/addWaiter(Node.EXCLUSIVE), arg))/*acquireQueued 返回的是中斷標(biāo)識(中斷標(biāo)識以清除),如果為true ,重新設(shè)置中斷標(biāo)識*/selfInterrupt();}???
/* 節(jié)點加入同步隊列之后,會嘗試acquire。如果失敗會通過park阻塞。 */ final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {//中斷標(biāo)志,標(biāo)識此線程是否設(shè)置過中斷標(biāo)識。boolean interrupted = false;for (;;) {final Node p = node.predecessor(); //如果前一個節(jié)點是head節(jié)點,(head節(jié)點不關(guān)聯(lián)線程),即本節(jié)點是第一個acquire失敗的節(jié)點,則嘗試acquire。if (p == head && tryAcquire(arg)) {//acquire成功,則把此節(jié)點設(shè)置為head節(jié)點(thread關(guān)聯(lián)解除),返回中斷標(biāo)識。//【10】setHead(node);p.next = null; // help GCfailed = false;return interrupted;}//不是第一個accquire失敗的節(jié)點,則判斷是否通過park阻塞。if (//判斷是否需要park。shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())//此線程 曾經(jīng)被中斷過。(中斷標(biāo)識被清除了)。interrupted = true;}} finally {//僅當(dāng)出現(xiàn)異常時,才會進(jìn)入此代碼塊,一般timeout或者中斷,此時需取消節(jié)點。if (failed)cancelAcquire(node);}} /* 判斷是否需要park */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;//前一個節(jié)點的status為SIGNAL,表示后續(xù)節(jié)點需要parkif (ws == Node.SIGNAL) //【4】return true;if (ws > 0) {//ws > 0 ,僅當(dāng)status=CANCELLED。則把取消節(jié)點剔除同步隊列。do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/*狀態(tài)為0或者PROPAGATE,獨占模式為0,共享模式為PROPAGATE。則需要設(shè)置為SIGNAL,表示即將park(是否park由下一次acquire決定)。*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL); //【3】}return false;} private final boolean parkAndCheckInterrupt() {LockSupport.park(this); //【5】/**********此處時重點,以上代碼執(zhí)行后,線程會立即掛起。當(dāng)線程unpark后,后續(xù)代碼接著運行。*///返回線程的中斷標(biāo)識。同時清除中斷標(biāo)識。return Thread.interrupted();}?
/*取消acquire*/private void cancelAcquire(Node node) {// 節(jié)點不存在,返回。if (node == null)return;//取消線程關(guān)聯(lián)。node.thread = null;// 跳過前置節(jié)點中的CANCELLED節(jié)點Node pred = node.prev;while (pred.waitStatus > 0)node.prev = pred = pred.prev;Node predNext = pred.next;//設(shè)置狀態(tài)為CANCELLEDnode.waitStatus = Node.CANCELLED;// 如果是尾節(jié)點,僅釋放自身。if (node == tail && compareAndSetTail(node, pred)) {//設(shè)置前置節(jié)點的next為null。compareAndSetNext(pred, predNext, null);} else {// If successor needs signal, try to set pred's next-link// so it will get one. Otherwise wake it up to propagate.int ws;if (pred != head && //前置節(jié)點不為head。((ws = pred.waitStatus) == Node.SIGNAL //前置節(jié)點為CANCELLED||(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)) //前置節(jié)點為0,并且設(shè)置為CANCELLED成功) &&pred.thread != null //關(guān)聯(lián)線程) {//以上判斷條件,表示前置節(jié)點不為head節(jié)點,并且為SIGNALNode next = node.next;if (next != null && next.waitStatus <= 0)//如果本節(jié)點之后仍有后續(xù)節(jié)點,則剔除本節(jié)點,修改指針。//此處未處理node.next.prev,node節(jié)點通過next是剔除的,通過prev是可以訪問得到的。compareAndSetNext(pred, predNext, next);} else {unparkSuccessor(node);}node.next = node; // help GC}}?
private Node addWaiter(Node mode) {/* 構(gòu)造一個節(jié)點,關(guān)聯(lián)到當(dāng)前線程*/Node node = new Node(Thread.currentThread(), mode);// 為了提高性能,先嘗試一次加入同步隊列。失敗再嘗試enq方式。enq方式時自旋方式(即死循環(huán))Node pred = tail;if (pred != null) { node.prev = pred;//CASif (compareAndSetTail(pred, node)) { //【6】pred.next = node;return node;}}enq(node);return node;} /* 節(jié)點加入同步隊列,并返回此節(jié)點。 采用自旋方式加入, */private Node enq(final Node node) {//自旋for (;;) {Node t = tail;//tail為null,則head必定為null,生成一個node作為head。if (t == null) { // Must initializeif (compareAndSetHead(new Node())) //【1】tail = head;} else {//加入節(jié)點成尾節(jié)點,并返回。node.prev = t;if (compareAndSetTail(t, node)) { //【2】t.next = node;return t;}}}} public final boolean release(int arg) {if (tryRelease(arg)) { //【7】//嘗試成功Node h = head;if (h != null && h.waitStatus != 0) //head節(jié)點不為null,并且head狀態(tài)不為0(在獨占模式下,不為0,也不可能為CANCELLED,則為SIGNAL),則unpark后續(xù)節(jié)點。unparkSuccessor(h); //【8】return true;}return false;} private void unparkSuccessor(Node node) {/*status < 0 ,則把status設(shè)置為0*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);Node s = node.next;if (s == null || s.waitStatus > 0) {//后續(xù)節(jié)點為null,或者狀態(tài)為CANCELLED。s = null;//從tail往前查找到第一個status<0的節(jié)點,選中作為要unpark的節(jié)點for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)//unpark節(jié)點對應(yīng)的線程。LockSupport.unpark(s.thread); //【9】}獨占模式同步隊列示意
public class AbstractQueuedSynchronizerTest {@Testpublic void testAbstractQueuedSynchronizer() {Lock lock = new ReentrantLock();Runnable runnable0 = new ReentrantLockThread(lock);Thread thread0 = new Thread(runnable0);thread0.setName("t-0");Runnable runnable1 = new ReentrantLockThread(lock);Thread thread1 = new Thread(runnable1);thread1.setName("t-1");Runnable runnable2 = new ReentrantLockThread(lock);Thread thread2 = new Thread(runnable2);thread2.setName("t-2");Runnable runnable3 = new ReentrantLockThread(lock);Thread thread2 = new Thread(runnable3);thread3.setName("t-3");Runnable runnable4 = new ReentrantLockThread(lock);Thread thread4 = new Thread(runnable4);thread4.setName("t-4");thread0.start();thread1.start();thread2.start();thread3.start();thread4.start();thread2.interrupt();for (;;);}private class ReentrantLockThread implements Runnable {private Lock lock;public ReentrantLockThread(Lock lock) {this.lock = lock;}@Overridepublic void run() {try {lock.lock();for (int i=0;i<1000000;i++);} finally {lock.unlock();}}}}以前假設(shè)各線程按順序啟動
隊列變化如下:
1、同步隊列初始化。thread:t-0 acquire成功。
2、thread:t-1 啟動,執(zhí)行代碼【1】,【2】,【3】,【4】處后狀態(tài)如下圖,然后執(zhí)行【5】阻塞。
3、thread:t-2 啟動,執(zhí)行代碼【6】,【3】,【4】處后狀態(tài)如下圖,然后執(zhí)行【5】阻塞。
3、thread:t-3 啟動,執(zhí)行代碼【6】,【3】,【4】處后狀態(tài)如下圖,然后執(zhí)行【5】阻塞。
4、thread:t-0,釋放,執(zhí)行代碼【7】,【8】,經(jīng)過【9】后,t-1 線程在代碼【5】繼續(xù)執(zhí)行。經(jīng)過【10】后狀態(tài)如下圖
5、thread,t-2,t-3,t-4類似
共享模式
public final void acquireShared(int arg) {//獨占模式:tryAcquire,返回boolean表示是否成功。//共享模式:tryAcquireShared,返回int,小于0,表示acquire失敗。if (tryAcquireShared(arg) < 0)doAcquireShared(arg);}private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED);// nextWaiter為SHARED,表示共享模式。【11】boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg); //【14】if (r >= 0) {//如果r>=0表示,表示許可有剩余。設(shè)置head,并繼續(xù)傳播setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted) selfInterrupt(); //與acquire最后2行代碼一樣,如果中斷過,仍設(shè)置中斷標(biāo)識。failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}} private void setHeadAndPropagate(Node node, int propagate) {Node h = head; //指向原來的head,后面使用。setHead(node);//設(shè)置head,【15】if (propagate > 0 // 許可有剩余 【16】|| h == null // 原有head為null,表示節(jié)點已釋放|| h.waitStatus < 0 // 狀態(tài)為PROPAGATE或SIGNAL||(h = head) == null // 新的head為null|| h.waitStatus < 0 //或者新的head的狀態(tài)<0) {Node s = node.next; //當(dāng)前節(jié)點的后續(xù)節(jié)點if (s == null || s.isShared()) //當(dāng)前節(jié)點為共享模式或者后續(xù)節(jié)點為nulldoReleaseShared(); //【17】}} public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) { //頭結(jié)點本身的waitStatus是SIGNAL且能通過CAS算法將頭結(jié)點的waitStatus從SIGNAL設(shè)置為0,喚醒頭結(jié)點的后繼節(jié)點if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //【12】continue; // loop to recheck casesunparkSuccessor(h);//CAS成功,則...【13】}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //頭結(jié)點本身的waitStatus是0的話,嘗試將其設(shè)置為PROPAGATE狀態(tài)的,意味著共享狀態(tài)可以向后傳播continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;}}共享模式同步隊列示意:
package main.java.study;import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.CountDownLatch;public class CountDownLatchTest {public class MapOper implements Runnable {CountDownLatch latch ;public MapOper(CountDownLatch latch) {this.latch = latch;}public void run() {try {SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println(Thread.currentThread().getName() + "start:" + df.format(new Date()));latch.await();System.out.println(Thread.currentThread().getName() + "work:" + df.format(new Date()));} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}System.out.println(Thread.currentThread().getName()+" Sync Started!");}}public static void main(String[] args) throws InterruptedException {// TODO Auto-generated method stubCountDownLatchTest test = new CountDownLatchTest();CountDownLatch latch = new CountDownLatch(1);Thread t1 = new Thread(test.new MapOper(latch));Thread t2 = new Thread(test.new MapOper(latch));Thread t3 = new Thread(test.new MapOper(latch));Thread t4 = new Thread(test.new MapOper(latch));t1.setName("Thread1");t2.setName("Thread2");t3.setName("Thread3");t4.setName("Thread4");t1.start();Thread.sleep(1500);t2.start();Thread.sleep(1500);t3.start();Thread.sleep(1500);t4.start();System.out.println("thread already start, sleep for a while...");Thread.sleep(1000);latch.countDown();}}隊列變化:
1、同步隊列初始化:前3個工作線程調(diào)用await()方法,經(jīng)過【11】,……,【5】線程掛起,狀態(tài)如下:
2、主線程調(diào)用countDown()方法,經(jīng)過【12】,【13】,【9】喚醒線程t-1,t-1繼續(xù)執(zhí)行,經(jīng)過【14】,【15】狀態(tài)如下:
3、t-1繼續(xù)執(zhí)行【16】,【17】喚醒下一個線程(node-2)
4、t-2,t-3依次,都喚醒下一個。(每個節(jié)點都由前置節(jié)點對應(yīng)的線程喚醒,喚醒立即返回)
5、結(jié)束
?
?
?
總結(jié)
以上是生活随笔為你收集整理的AbstractQueuedSynchronizer源码解析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java线程阻塞原语-LockSuppo
- 下一篇: ConditionObject源码