Java并发编程之AbstractQueuedSynchronizer(AQS)源码解析
自己一個人隨便看看源碼學習的心得,分享一下啦,不過我覺得還是建議去買本Java并發(fā)編程的書來看會比較好點,畢竟個人的理解有限嘛。
獨占鎖和共享鎖
首先先引入這兩個鎖的概念:
獨占鎖即同一時刻只有一個線程才能獲取到鎖,Lock的實現(xiàn)類中ReentrantLock和WriteLock就是獨占鎖,所以獨占鎖也叫排它鎖;
共享鎖是同一時刻多線程能獲取到的鎖叫共享鎖,即ReadLock;在讀寫鎖共有的情況下,會出現(xiàn)共存的情況即讀-讀共存,讀-寫、寫-寫不能共存,他們會產(chǎn)生互斥。
數(shù)據(jù)結(jié)構(gòu)
打開源碼首先就得看下它的所有方法和變量,就會發(fā)現(xiàn),AQS其實用的設(shè)計模式是模板模式,先講一下啥是模板模式吧,直接上代碼吧,用代碼看會比較直觀一點:
public abstract class UseTemplate{//這個定義的三個抽象接口方法public abstract void function1();public abstract void function2();public abstract void function3();//...還有其他屬性或方法//這個方法就是模板方法,在繼承的子類中實現(xiàn)好抽象方法,然后在調(diào)用父類的模板方法public final void runTemplate(){//在父類中寫好功能執(zhí)行的方法即可,也可設(shè)置權(quán)限,禁止重新覆蓋function1();function2();function3();}}AQS是采用的是一個雙向鏈表隊列,在該類中包含了一個Node類,雙向鏈表就是右這個實現(xiàn)的
先了解一下這個靜態(tài)內(nèi)部類,以下為源代碼(每個屬性在下面都有解釋):
static final class Node{/** Marker to indicate a node is waiting in shared mode *///用于共享鎖的節(jié)點屬性static final Node SHARED = new Node();/** Marker to indicate a node is waiting in exclusive mode *///用于獨占鎖使用屬性static final Node EXCLUSIVE = null;/** waitStatus value to indicate thread has cancelled */static final int CANCELLED = 1;/** waitStatus value to indicate successor's thread needs unparking */static final int SIGNAL = -1;/** waitStatus value to indicate thread is waiting on condition */static final int CONDITION = -2;/*** waitStatus value to indicate the next acquireShared should* unconditionally propagate*/static final int PROPAGATE = -3;/*** Status field, taking on only the values:* SIGNAL: The successor of this node is (or will soon be)* blocked (via park), so the current node must* unpark its successor when it releases or* cancels. To avoid races, acquire methods must* first indicate they need a signal,* then retry the atomic acquire, and then,* on failure, block.* CANCELLED: This node is cancelled due to timeout or interrupt.* Nodes never leave this state. In particular,* a thread with cancelled node never again blocks.* CONDITION: This node is currently on a condition queue.* It will not be used as a sync queue node* until transferred, at which time the status* will be set to 0. (Use of this value here has* nothing to do with the other uses of the* field, but simplifies mechanics.)* PROPAGATE: A releaseShared should be propagated to other* nodes. This is set (for head node only) in* doReleaseShared to ensure propagation* continues, even if other operations have* since intervened.* 0: None of the above** The values are arranged numerically to simplify use.* Non-negative values mean that a node doesn't need to* signal. So, most code doesn't need to check for particular* values, just for sign.** The field is initialized to 0 for normal sync nodes, and* CONDITION for condition nodes. It is modified using CAS* (or when possible, unconditional volatile writes).*///當前節(jié)點的狀態(tài),狀態(tài)值即上面幾個狀態(tài)值,有相對應(yīng)的英文解釋,就不翻譯了volatile int waitStatus;/*** Link to predecessor node that current node/thread relies on* for checking waitStatus. Assigned during enqueuing, and nulled* out (for sake of GC) only upon dequeuing. Also, upon* cancellation of a predecessor, we short-circuit while* finding a non-cancelled one, which will always exist* because the head node is never cancelled: A node becomes* head only as a result of successful acquire. A* cancelled thread never succeeds in acquiring, and a thread only* cancels itself, not any other node.*/volatile Node prev;//前一個節(jié)點指向/*** Link to the successor node that the current node/thread* unparks upon release. Assigned during enqueuing, adjusted* when bypassing cancelled predecessors, and nulled out (for* sake of GC) when dequeued. The enq operation does not* assign next field of a predecessor until after attachment,* so seeing a null next field does not necessarily mean that* node is at end of queue. However, if a next field appears* to be null, we can scan prev's from the tail to* double-check. The next field of cancelled nodes is set to* point to the node itself instead of null, to make life* easier for isOnSyncQueue.*/volatile Node next;//下一個節(jié)點指向/*** The thread that enqueued this node. Initialized on* construction and nulled out after use.*/volatile Thread thread;//當前節(jié)點的線程/*** Link to next node waiting on condition, or the special* value SHARED. Because condition queues are accessed only* when holding in exclusive mode, we just need a simple* linked queue to hold nodes while they are waiting on* conditions. They are then transferred to the queue to* re-acquire. And because conditions can only be exclusive,* we save a field by using special value to indicate shared* mode.*/Node nextWaiter;//使用在Condition上面,即下一個等待節(jié)點/*** Returns true if node is waiting in shared mode.*/final boolean isShared() {return nextWaiter == SHARED;}/*** Returns previous node, or throws NullPointerException if null.* Use when predecessor cannot be null. The null check could* be elided, but is present to help the VM.** @return the predecessor of this node*///獲取前一個節(jié)點final Node predecessor() throws NullPointerException {Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}Node() { // Used to establish initial head or SHARED marker}Node(Thread thread, Node mode) { // Used by addWaiterthis.nextWaiter = mode;this.thread = thread;}Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread;} }了解完這個之后,在AQS類中有三個變量:
/*** Head of the wait queue, lazily initialized. Except for* initialization, it is modified only via method setHead. Note:* If head exists, its waitStatus is guaranteed not to be* CANCELLED.*/private transient volatile Node head;//當前頭部節(jié)點/*** Tail of the wait queue, lazily initialized. Modified only via* method enq to add new wait node.*/private transient volatile Node tail;//當前尾節(jié)點/*** The synchronization state.*/private volatile int state;//拿鎖的狀態(tài)AQS中還有一個內(nèi)部類ConditionObject,這個類繼承了Condition接口,這個類也有一個隊列,不過不是雙向隊列,單向隊列,也是通過Node實現(xiàn)的:
/** First node of condition queue. */private transient Node firstWaiter;/** Last node of condition queue. */private transient Node lastWaiter;AQS方法
獨占式獲取:? accquire()、? acquireInterruptibly()、? tryAcquireNanos()
共享式獲取:? acquireShared()、? acquireSharedInterruptibly()、? tryAcquireSharedNanos()
獨占式釋放鎖:? release()
共享式釋放鎖:? releaseShared()
需要子類覆蓋的流程方法:
?獨占式獲取: tryAcquire()
?獨占式釋放: tryRelease()
?共享式獲取: tryAcquireShared()
共享式釋放: tryReleaseShared()
這個同步器是否處于獨占模式 isHeldExclusively()
同步狀態(tài)state:1)、? getState():獲取當前的同步狀態(tài);2)、? setState(int):設(shè)置當前同步狀態(tài);3)、? compareAndSetState(int,int) 使用CAS設(shè)置狀態(tài),保證狀態(tài)設(shè)置的原子性.
AQS使用方法及流程
需要繼承AQS這個類并實現(xiàn)他的方法,我們先自己寫一個獨占鎖的實現(xiàn)方式:
/*** state 當為0的時候表示還沒有拿到鎖 為1的時候為拿到鎖了* @author 26225**/static final class Sync extends AbstractQueuedSynchronizer{/*** */private static final long serialVersionUID = 1L;@Overrideprotected boolean tryAcquire(int arg) {if(getState() == 1)return false;else {/*** 需使用compareAndSetState() CAS原子操作 而不是使用setState(1),因為使用了volatile修飾了state 雖然保證了可見性,但是修改還是得保證原子操作*/if(compareAndSetState(getState(), arg)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}}@Overrideprotected boolean tryRelease(int arg) {if(getState() == 0 )throw new UnsupportedOperationException();setExclusiveOwnerThread(null);setState(arg);return true;}@Overrideprotected boolean isHeldExclusively() {return getState() == 1;}Condition newCondition() {return new ConditionObject();}}private final Sync sync = new Sync();public void lock() {sync.acquire(1);}public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}public boolean tryLock() {return sync.tryAcquire(1);}public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {// TODO Auto-generated method stubreturn sync.tryAcquireNanos(1, unit.toNanos(time));}public void unlock() {sync.release(0);}public Condition newCondition() {return sync.newCondition();}先簡單的講解一下該段代碼:
在調(diào)用Lock的unlock()方法時,需調(diào)用AQS的acquire()方法,我們先看一下這個方法調(diào)用:
/*** Acquires in exclusive mode, ignoring interrupts. Implemented* by invoking at least once {@link #tryAcquire},* returning on success. Otherwise the thread is queued, possibly* repeatedly blocking and unblocking, invoking {@link* #tryAcquire} until success. This method can be used* to implement method {@link Lock#lock}.** @param arg the acquire argument. This value is conveyed to* {@link #tryAcquire} but is otherwise uninterpreted and* can represent anything you like.*/public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}其實就是先調(diào)用我們自己實現(xiàn)的方法,判斷拿鎖的狀態(tài),并且設(shè)置這個狀態(tài);如果拿到了鎖的話,就不用管它,沒有拿到的話,就得加入到等待隊列中,我們看一下加入等待隊列的方法:
/*** Creates and enqueues node for current thread and given mode.** @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared* @return the new node*/private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}/*** Inserts node into queue, initializing if necessary. See picture above.* @param node the node to insert* @return node's predecessor*/private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}?判斷頭部節(jié)點是否存在,如果不存在的話就調(diào)用enq()方法,判斷當前尾節(jié)點存不存在,存在的話,則將當前節(jié)點通過CAS操作設(shè)置成尾節(jié)點,如果不存在則將當前線程包裝成一個Node對象設(shè)置成頭節(jié)點,并且將頭部和尾部設(shè)置成同一個
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}/*** Convenience method to park and then check if interrupted** @return {@code true} if interrupted*/private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}?這個方法是加入到隊列,就是設(shè)置當前Node對象的前一個節(jié)點指向以及后一個節(jié)點指向,并且調(diào)用parkAndCheckInterrupt()通過LockSupport將當前線程進入到等待狀態(tài)。
以上就是調(diào)用lock()方法基本流程,現(xiàn)在看一下代用unlock()的基本流程,執(zhí)行的是AQS的release()
/*** Releases in exclusive mode. Implemented by unblocking one or* more threads if {@link #tryRelease} returns true.* This method can be used to implement method {@link Lock#unlock}.** @param arg the release argument. This value is conveyed to* {@link #tryRelease} but is otherwise uninterpreted and* can represent anything you like.* @return the value returned from {@link #tryRelease}*/public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}/*** Wakes up node's successor, if one exists.** @param node the node*/private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}?在釋放鎖的過程中,首先還原之前的拿鎖狀態(tài),還原之后將隊列頭部節(jié)點的下一個節(jié)點通過LockSupport.unpark()進行喚醒。
AbstractOwnableSynchronizer
在JDK1.6之后AQS繼承了AbstractOwnableSynchronizer這個類,其實這個類主要是用來記錄當前訪問的線程:
/*** The current owner of exclusive mode synchronization.*/private transient Thread exclusiveOwnerThread;/*** Sets the thread that currently owns exclusive access.* A {@code null} argument indicates that no thread owns access.* This method does not otherwise impose any synchronization or* {@code volatile} field accesses.* @param thread the owner thread*/protected final void setExclusiveOwnerThread(Thread thread) {exclusiveOwnerThread = thread;}/*** Returns the thread last set by {@code setExclusiveOwnerThread},* or {@code null} if never set. This method does not otherwise* impose any synchronization or {@code volatile} field accesses.* @return the owner thread*/protected final Thread getExclusiveOwnerThread() {return exclusiveOwnerThread;}AQS的基本流程差不多分析完了,講的有問題的話,還請大佬指正!!!
總結(jié)
以上是生活随笔為你收集整理的Java并发编程之AbstractQueuedSynchronizer(AQS)源码解析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Mybatis源码阅读(四):核心接口4
- 下一篇: win10卓越性能模式,提升电脑性能