Java并发基础框架AbstractQueuedSynchronizer初探(ReentrantLock的实现分析)
AbstractQueuedSynchronizer是實現Java并發類庫的一個基礎框架,Java中的各種鎖(RenentrantLock, ReentrantReadWriteLock)以及同步工具類(Semaphore, CountDownLatch)等很多都是基于AbstractQueuedSynchronizer實現的。AbstractQueuedSynchronizer 一般簡稱AQS,Abstract表示他是一個抽象類,Queued表示他是基于先進先出 FIFO 等待隊列實現的,Synchronizer表示他是一個同步器。
基于隊列的意思是,我們用鎖來說明,比如多個線程想要獲得同一個對象上的鎖,那么這些線程會按照申請鎖的先后順序在該鎖對象中的一個FIFO隊列上排隊等待(也就是將這些線程對象的引用插入到該鎖的隊列中)。AQS是Java并發的基礎框架,同時AOS的實現的基礎卻是 sun.misc.Unsafe 和 volatile,當然還有LockSupport工具類,LockSupport也是借助于Unsafe,主要實現線程的“阻塞”(park)和線程的“喚醒阻塞”(unpark)。基本原理是?sun.misc.Unsafe 保證了內存操作的“原子性”,而volatile保證了內存“可見性”。Unsafe的源碼可以參見:http://www.docjar.com/html/api/sun/misc/Unsafe.java.html ,它提供了各種原子性的內存CAS操作。
本文從ReentrantLock的實現來初步探索AbstractQueuedSynchronizer。為了好把握方向,我們將ReentrantLock的源碼(Java1.8.0_40)簡化如下:
public class ReentrantLock implements Lock, java.io.Serializable {private static final long serialVersionUID = 7373984872572414699L;/** Synchronizer providing all implementation mechanics */private final Sync sync;abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = -5179523762034025860L;abstract void lock();final boolean nonfairTryAcquire(int acquires) {// ... ... }protected final boolean tryRelease(int releases) {// ... ... }// ... ... }/*** Sync object for non-fair locks*/static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}/*** Sync object for fair locks*/static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;final void lock() {acquire(1);}protected final boolean tryAcquire(int acquires) {// ... ... }}public ReentrantLock() {sync = new NonfairSync();}public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}public void lock() {sync.lock();}public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}public boolean tryLock() {return sync.nonfairTryAcquire(1);}public boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(timeout));}public void unlock() {sync.release(1);}??? public Condition newCondition() {
??????? return sync.newCondition();
??? }// ... ... }
可以明顯的看到,ReentrantLock 實現的所以接口都是借助于他的實例屬性——同步器sync來實現的,從構造函數可以看出,ReentrantLock默認是非公平鎖——使用非公平同步器NonfairSync,傳入true時得到的是公平鎖——使用公平同步器FairSync。而這兩者都是繼承于抽象類Sync,而抽象類Sync又繼承于我們的AbstractQueuedSynchronizer。我們先整體看下AQS的實現代碼:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {static final class Node { volatile int waitStatus;volatile Node prev;volatile Node next;volatile Thread thread;Node nextWaiter;Node(Thread thread, Node mode) { // Used by addWaiterthis.nextWaiter = mode;this.thread = thread;}Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread;}}/*** Head of the wait queue, lazily initialized. */private transient volatile Node head;/*** Tail of the wait queue, lazily initialized. */private transient volatile Node tail;/*** The synchronization state.*/private volatile int state;/*** Inserts node into queue, initializing if necessary. See picture above.* @param node the node to insert* @return node's predecessor*/private Node enq(final Node node) {// ... ... }/*** Creates and enqueues node for current thread and given mode.* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared* @return the new node*/private Node addWaiter(Node mode) {// ... ... }/*** Sets head of queue to be node, thus dequeuing. Called only by* acquire methods. Also nulls out unused fields for sake of GC* and to suppress unnecessary signals and traversals.* @param node the node*/private void setHead(Node node) {// ... ... }public class ConditionObject implements Condition, java.io.Serializable {private static final long serialVersionUID = 1173984872572414699L;/** First node of condition queue. */private transient Node firstWaiter;/** Last node of condition queue. */private transient Node lastWaiter;// ... ... } }
AbstractQueuedSynchronizer的實現包含了兩個內部類,Node 類和 ConditionObject類,而后者只有在使用 ReentrantLock.newCondition()時才會用到,暫時不去管它。Node類主要作為FIFO隊列上的節點,存儲在鎖上等待的所有線程對象的信息。提供了enq(final Node node)方法用于插入隊列尾部,addWaiter(Node mode)方法用于加入FIFO隊列,setHead(Node node)用于初始化FIFO隊列的頭部。所以AbstractQueuedSynchronizer沒有我們想象的那么復雜,它主要是用于實現一個FIFO的等待隊列(我們暫時放下ConditionObject不管),以及管理同步器的狀態status。
我們在看一下他繼承的父類:
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {protected AbstractOwnableSynchronizer() { }/*** The current owner of exclusive mode synchronization.*/private transient Thread exclusiveOwnerThread;protected final void setExclusiveOwnerThread(Thread thread) {exclusiveOwnerThread = thread;}protected final Thread getExclusiveOwnerThread() {return exclusiveOwnerThread;} }很簡單,就是實現了互斥同步器的所有者的功能,比如互斥鎖正被哪個線程占有者。
我們大體了解了AbstractQueuedSynchronizer之后,我們再從細節上仔細分析ReentrantLock的實現。
1)ReentrantLock.lock實現分析:
ReentrantLock分為公平和非公平的鎖,NonfairSync 和 FairSync的lock實現分別如下:
static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;/*** Performs lock. Try immediate barge, backing up to normal* acquire on failure.*/final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;final void lock() {acquire(1);}/*** Fair version of tryAcquire. Don't grant access unless* recursive call or no waiters or is first.*/protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}}?
NonfairSync.lock 和 FairSync.lock實現差別只有兩行代碼:
if (compareAndSetState(0, 1))
??????????????? setExclusiveOwnerThread(Thread.currentThread());
就是這兩行代碼使得了 NonfairSync.lock 的鎖的實現是非公平的,這兩行代碼的意思是:如果sync同步器的狀態為0,也就是鎖沒有被占,那么就設置為1,也就是立刻獲得鎖,并且設置鎖的擁有者。也就是說非公平鎖,可以 不進入等待隊列而直接獲取鎖,并且不管是否在他的前面已經有其它線程在等待著獲取該鎖,這就是“不公平”鎖的原因之一。原因之二是它們的調用 acquire(1); 都是在 AQS 中,都分別調用了子類中的tryAcquire,而NonfairSync.tryAcquire 和 FairSync.tryAcquire實現又不同:
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}這里含義是:tryAcquire(arg)嘗試去獲得鎖,并且調用 acquireQueued(addWaiter(Node.EXCLUSIVE), arg),將該申請鎖的線程插入FIFO等待隊列。而NonfairSync.tryAcquire的實現如下:
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}/*** Performs non-fair tryLock. tryAcquire is implemented in* subclasses, but both need nonfair try for trylock method.*/final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}而FairSync.tryAcquire的實現如下:
/*** Fair version of tryAcquire. Don't grant access unless* recursive call or no waiters or is first.*/protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}可以明顯看到公平鎖的實現:
??????????? if (c == 0) {
??????????????? if (!hasQueuedPredecessors()
即使 c==0 ,也就是鎖沒有被占有,它也要調用hasQueuedPredecessors()去判斷是否在自己前面已經有線程在等待隊列上了,所以這里就是實現了FIFO的公平,先到的先獲得鎖。所以公平鎖和非公平鎖的實現在上面的兩個對方是有區別的。
分析完了鎖的公平和非公平的原因,我們再接著上面看如何實現加入FIFO隊列,以及如何實現等待:
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}tryAcquire(arg)剛才分析完了,我們再看addWaiter(Node.EXCLUSIVE):
/*** Creates and enqueues node for current thread and given mode.** @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared* @return the new node*/private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}??? /**
???? * Inserts node into queue, initializing if necessary. See picture above.
???? * @param node the node to insert
???? * @return node's predecessor
???? */
??? private Node enq(final Node node) {
??????? for (;;) {
??????????? Node t = tail;
??????????? if (t == null) { // Must initialize
??????????????? if (compareAndSetHead(new Node()))
??????????????????? tail = head;
??????????? } else {
??????????????? node.prev = t;
??????????????? if (compareAndSetTail(t, node)) {
??????????????????? t.next = node;
??????????????????? return t;
??????????????? }
??????????? }
??????? }
??? }
很簡單,就是構造一個Node節點,然后插入到等待隊列的尾部。
再看acquireQueued(addWaiter(Node.EXCLUSIVE), arg):
/*** Acquires in exclusive uninterruptible mode for thread already in* queue. Used by condition wait methods as well as acquire.** @param node the node* @param arg the acquire argument* @return {@code true} if interrupted while waiting*/final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}這里就實現了在鎖上的“阻塞”的功能。在一個死循環中,先判斷Node是否是等待隊列的頭節點,如果是的話,然后調用tryAcquire(arg)去獲得鎖,然后就可以返回了,也就是獲得鎖成功了。如果Node不是頭節點的話,線程就要被阻塞了:
/*** Checks and updates status for a node that failed to acquire.* Returns true if thread should block. This is the main signal* control in all acquire loops. Requires that pred == node.prev.** @param pred node's predecessor holding status* @param node the node* @return {@code true} if thread should block*/private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}該函數的功能是將Node的前驅節點的等待狀態pred.waitStatus設置為SIGNAL。這樣設置的原因是方便實現Node節點的“喚醒阻塞”(unpark)。設置成功之后調用:parkAndCheckInterrupt(); 開始被“阻塞”:
/*** Convenience method to park and then check if interrupted** @return {@code true} if interrupted*/private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}阻塞的實現利用了LockSupport類,而LockSupport類又使用了Unsafe:
public static void park(Object blocker) {Thread t = Thread.currentThread();setBlocker(t, blocker);UNSAFE.park(false, 0L);setBlocker(t, null);}setBlocker(t, blocker) 設置了當前線程被誰阻塞了。UNSAFE.park(false, 0L);實現阻塞:
/*** Block current thread, returning when a balancing* <tt>unpark</tt> occurs, or a balancing <tt>unpark</tt> has* already occurred, or the thread is interrupted, or, if not* absolute and time is not zero, the given time nanoseconds have* elapsed, or if absolute, the given deadline in milliseconds* since Epoch has passed, or spuriously (i.e., returning for no* "reason"). Note: This operation is in the Unsafe class only* because <tt>unpark</tt> is, so it would be strange to place it* elsewhere.*/public native void park(boolean isAbsolute, long time);park方法可以被 unpark 喚醒,超時也會被喚醒,中斷也會被喚醒。
park方法被喚醒了之后,就會在上面那個死循環中,再次檢查自己是否是 頭結點:
for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())interrupted = true;}如果是頭結點的話, 那么重新調用tryAcquire(arg)去獲得鎖,然后返回,表示獲得鎖成功了。到這里 ReentrantLock.lock()方法的實現算是分析完了。
2)ReentrantLock.unlock實現分析:
/*** Attempts to release this lock.** <p>If the current thread is the holder of this lock then the hold* count is decremented. If the hold count is now zero then the lock* is released. If the current thread is not the holder of this* lock then {@link IllegalMonitorStateException} is thrown.** @throws IllegalMonitorStateException if the current thread does not* hold this lock*/public void unlock() {sync.release(1);} /*** Releases in exclusive mode. Implemented by unblocking one or* more threads if {@link #tryRelease} returns true.* This method can be used to implement method {@link Lock#unlock}.** @param arg the release argument. This value is conveyed to* {@link #tryRelease} but is otherwise uninterpreted and* can represent anything you like.* @return the value returned from {@link #tryRelease}*/public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}再看 tryRelease:
protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}很簡單,就是修改 sync 的屬性status。如果stauts等于0了,就表示鎖已經被釋放了。于是就可以喚醒FIFO隊列的頭節點了,unparkSuccessor(head):
/*** Wakes up node's successor, if one exists.** @param node the node*/private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}這里 t.waitStatus <= 0 小于0的包括了 我們在調用shouldParkAfterFailedAcquire時 設置waitStatus 為SIGNAL,因為SIGNAL==-1,所以這里的LockSupport.unpark(s.thread)剛好喚醒了前面的 park().
所以lock() 和 unlock()方法也對應起來了。到這里ReentrantLock的lock和unlock方法分析完成。ReentrantLock的實現借助于AQS,而AQS有借助于LockSupport和Unsafe,以及volatile。ReentrantLock使用state表示鎖被同一個線程獲取了多少次,并且記錄了鎖的擁有者(線程)。可重入鎖的可重入的原因就是因為記錄了鎖的擁有者和記錄鎖被獲取的次數來實現的。另外鎖的公平性的實現就是是否允許鎖申請的插隊。
Semaphore, CountDownLatch的實現相比ReentrantLock而言更加簡單,實現方式也是大體相似的。
其實查看一些JDK關于并發的庫,就可以知道:Java并發庫的構建的基礎基本就兩個——Unsafe和volatile,前者保證“原子性”,后者保證“可見性”。
轉載于:https://www.cnblogs.com/digdeep/p/4445128.html
總結
以上是生活随笔為你收集整理的Java并发基础框架AbstractQueuedSynchronizer初探(ReentrantLock的实现分析)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: oracle SQL not in nu
- 下一篇: 【转】你必须了解的Session的本质