深入java并发包源码(三)AQS独占方法源码分析
深入java并發(fā)包源碼(一)簡介
深入java并發(fā)包源碼(二)AQS的介紹與使用
深入java并發(fā)包源碼(三)AQS獨(dú)占方法源碼分析
AQS 的實(shí)現(xiàn)原理
學(xué)完用 AQS 自定義一個(gè)鎖以后,我們可以來看一下剛剛使用過的方法的實(shí)現(xiàn)。
分析源碼的時(shí)候會(huì)省略一些不重要的代碼。
AQS 的實(shí)現(xiàn)是基于一個(gè) FIFO 隊(duì)列的,每一個(gè)等待的線程被封裝成 Node 存放在等待隊(duì)列中,頭結(jié)點(diǎn)是空的,不存儲(chǔ)信息,等待隊(duì)列中的節(jié)點(diǎn)都是阻塞的,并且在每次被喚醒后都會(huì)檢測自己的前一個(gè)節(jié)點(diǎn)是否為頭結(jié)點(diǎn),如果是頭節(jié)點(diǎn)證明在這個(gè)線程之前沒有在等待的線程,就嘗試著去獲取共享資源。
AQS 的繼承關(guān)系
AQS 繼承了AbstractOwnableSynchronizer,我們先分析一下這個(gè)父類。
public abstract class AbstractOwnableSynchronizerimplements java.io.Serializable {protected AbstractOwnableSynchronizer() { }/*** 獨(dú)占模式下的線程*/private transient Thread exclusiveOwnerThread;/*** 設(shè)置線程,只是對線程的 set 方法*/protected final void setExclusiveOwnerThread(Thread thread) {exclusiveOwnerThread = thread;}/*** 設(shè)置線程,對線程的 get 方法*/protected final Thread getExclusiveOwnerThread() {return exclusiveOwnerThread;} }父類非常簡單,持有一個(gè)獨(dú)占模式下的線程,然后就只剩下對這個(gè)線程的 get 和 set 方法。
AQS的內(nèi)部類
AQS 是用鏈表隊(duì)列來實(shí)現(xiàn)線程等待的,那么隊(duì)列肯定要有節(jié)點(diǎn),我們先從節(jié)點(diǎn)講起。
Node 類,每一個(gè)等待的線程都會(huì)被封裝成 Node 類
Node 的域
public class Node {int waitStatus;Node prev;Node next;Thread thread;Node nextWaiter; }waitStatus:等待狀態(tài)
prev:前驅(qū)節(jié)點(diǎn)
next:后繼節(jié)點(diǎn)
thread:持有的線程
nextWaiter:condiction 隊(duì)列中的后繼節(jié)點(diǎn)
Node 的 status:
Node 的狀態(tài)有四種:
取消狀態(tài)的值是唯一的正數(shù),也是唯一當(dāng)排隊(duì)排到它了也不要資源而是直接輪到下個(gè)線程來獲取資源的
AQS 中的方法源碼分析
acquire
這個(gè)方法執(zhí)行了:
- 使用我們實(shí)現(xiàn)的 tryAcquire 來嘗試獲得鎖,如果獲得了鎖則退出
- 如果沒有獲得鎖就將線程添加到等待隊(duì)列中
看到上面的 tryAcquire 返回 false 后就會(huì)調(diào)用 addWaiter 新建節(jié)點(diǎn)加入等待隊(duì)列中。參數(shù) EXCLUSIVE 是獨(dú)占模式。
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// 拿到尾節(jié)點(diǎn),如果尾節(jié)點(diǎn)是空則說明是第一個(gè)節(jié)點(diǎn),就直接入隊(duì)就好了Node pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}// 如果尾節(jié)點(diǎn)不是空的,則需要特殊方法入隊(duì)enq(node);return node; }在 addWaiter 方法創(chuàng)建完節(jié)點(diǎn)后,調(diào)用 enq 方法,在循環(huán)中用 CAS 操作將新的節(jié)點(diǎn)入隊(duì)。
因?yàn)榭赡軙?huì)有多個(gè)線程同時(shí)設(shè)置尾節(jié)點(diǎn),所以需要放在循環(huán)中不斷的設(shè)置尾節(jié)點(diǎn)。
private Node enq(final Node node) {for (;;) {Node t = tail;// 查看尾節(jié)點(diǎn)if (t == null) { // Must initialize// 尾節(jié)點(diǎn)為空則設(shè)置為剛剛創(chuàng)建的節(jié)點(diǎn)if (compareAndSetHead(new Node()))tail = head;} else {// 尾節(jié)點(diǎn)node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}} }在這里,節(jié)點(diǎn)入隊(duì)就結(jié)束了。
那么我們回來前面分析的方法,
public final void acquire(long arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt(); }剛剛分析完了 addWaiter 方法,這個(gè)方法返回了剛剛創(chuàng)建并且加入的隊(duì)列。現(xiàn)在開始分析 acquireQueued 方法。
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;// 在循環(huán)中去獲取鎖for (;;) {// 拿前一個(gè)節(jié)點(diǎn)final Node p = node.predecessor();// 如果前一個(gè)節(jié)點(diǎn)是頭結(jié)點(diǎn)則嘗試著去獲取鎖if (p == head && tryAcquire(arg)) {// 拿到鎖后把這個(gè)節(jié)點(diǎn)設(shè)為頭結(jié)點(diǎn),這里 setHead 會(huì)把除了 next 以外的數(shù)據(jù)清除setHead(node);p.next = null; // help GCfailed = false;return interrupted;}// 這個(gè)方法查看在獲取鎖失敗以后是否中斷,如果否的話就調(diào)用// parkAndCheckInterrupt 阻塞方法線程,等待被喚醒if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);} }acquireInterruptibly
因?yàn)楹芟袼皂槺銇砜匆幌?acquireInterruptibly 所調(diào)用的方法:
private void doAcquireInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return;}// 只有這一句有差別,獲取失敗了并且檢測到中斷位被設(shè)為 true 直接拋出異常if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);} }acquireNanos
再來看一下有限時(shí)間的,當(dāng)獲取超時(shí)以后會(huì)將節(jié)點(diǎn) Node 的狀態(tài)設(shè)為 cancel,設(shè)置為取消的用處在后面的 release 方法中會(huì)有體現(xiàn)。
private boolean doAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {if (nanosTimeout <= 0L)return false;final long deadline = System.nanoTime() + nanosTimeout;final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; failed = false;return true;}nanosTimeout = deadline - System.nanoTime();if (nanosTimeout <= 0L)return false;if (shouldParkAfterFailedAcquire(p, node) &&nanosTimeout > spinForTimeoutThreshold)LockSupport.parkNanos(this, nanosTimeout);if (Thread.interrupted())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);} }總結(jié)一下過程
release
這個(gè)方法首先去調(diào)用了我們實(shí)現(xiàn)的 tryRelease,當(dāng)結(jié)果返回成功的時(shí)候,拿到頭結(jié)點(diǎn),調(diào)用 unparkSuccessor 方法來喚醒頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)。
public final boolean release(long arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false; } private void unparkSuccessor(Node node) {int ws = node.waitSatus;// 因?yàn)橐呀?jīng)獲取過鎖,所以將狀態(tài)設(shè)設(shè)為 0。失敗也沒所謂,說明有其他的線程把它設(shè)為0了if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** 一般來說頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)是在等待著被喚醒的,但是如果是取消的或者意外的是空的,* 則向后遍歷直到找到?jīng)]有被取消的節(jié)點(diǎn)* */Node s = node.next;// 為空或者大于 0,只有 cancel 狀態(tài)是大于 0 的if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread); }參考文獻(xiàn)
- 周志明. 深入理解 Java 虛擬機(jī) [M]. 機(jī)械工業(yè)出版社, 2011.
- 方騰飛.Java 并發(fā)編程的藝術(shù) [M]. 機(jī)械工業(yè)出版社, 2015.
- 深入剖析基于并發(fā)AQS的(獨(dú)占鎖)重入鎖(ReetrantLock)及其Condition實(shí)現(xiàn)原理
- 【JUC】JDK1.8源碼分析之AbstractQueuedSynchronizer(二)
- AbstractQueuedSynchronizer的介紹和原理分析
轉(zhuǎn)載于:https://www.cnblogs.com/zjmeow/p/9972202.html
總結(jié)
以上是生活随笔為你收集整理的深入java并发包源码(三)AQS独占方法源码分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: swoole 清除定时器提示no tim
- 下一篇: sprint冲刺计划第三天团队任务