JUC队列-ConcurrentLinkedQueue(四)
ConcurrentLinkedQueue介紹
ConcurrentLinkedQueue是線程安全的隊列,它適用于“高并發”的場景。
它是一個基于鏈接節點的無界線程安全隊列,按照 FIFO(先進先出)原則對元素進行排序。隊列元素中不可以放置null元素(內部實現的特殊節點除外)。
ConcurrentLinkedQueue的UML圖:
說明:
LinkedBlockingQueue在實現“多線程對競爭資源的互斥訪問”時,對于“插入”和“取出(刪除)”操作分別使用了不同的鎖。對于插入操作,通過“插入鎖putLock”進行同步;對于取出操作,通過“取出鎖takeLock”進行同步。
此外,插入鎖putLock和“非滿條件notFull”相關聯,取出鎖takeLock和“非空條件notEmpty”相關聯。通過notFull和notEmpty更細膩的控制鎖。
ConcurrentLinkedQueue的源碼分析
構造方法
public ConcurrentLinkedQueue() {head = tail = new Node<E>(null); }head和tail的定義如下:
private transient volatile Node<E> head; private transient volatile Node<E> tail;所以他們都是線程可見的。
Node的聲明如下:
private static class Node<E> {volatile E item;volatile Node<E> next;Node(E item) {UNSAFE.putObject(this, itemOffset, item);}boolean casItem(E cmp, E val) {return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);}void lazySetNext(Node<E> val) {UNSAFE.putOrderedObject(this, nextOffset, val);}boolean casNext(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);}// Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long itemOffset;private static final long nextOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class k = Node.class;itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));} catch (Exception e) {throw new Error(e);}} }Node是個單向鏈表節點,next用于指向下一個Node,item用于存儲數據。它們都是可見的,所以Node中操作節點數據的API,都是可以通過Unsafe機制的CAS函數實現的;例如casNext()是通過CAS函數“比較并設置節點的下一個節點”
添加元素
public boolean add(E e) {return offer(e); }offer()方法
public boolean offer(E e) {// 檢查e是不是null,是的話拋出NullPointerException異常。checkNotNull(e);// 創建新的節點final Node<E> newNode = new Node<E>(e);// 將“新的節點”添加到鏈表的末尾。for (Node<E> t = tail, p = t;;) {Node<E> q = p.next;// 情況1:q為空if (q == null) {// CAS操作:如果“p的下一個節點為null”(即p為尾節點),則設置p的下一個節點為newNode。// 如果該CAS操作成功的話,則比較“p和t”(若p不等于t,則設置newNode為新的尾節點),然后返回true。// 如果該CAS操作失敗,這意味著“其它線程對尾節點進行了修改”,則重新循環。if (p.casNext(null, newNode)) {if (p != t) // hop two nodes at a timecasTail(t, newNode); // Failure is OK.return true;}}// 情況2:p和q相等else if (p == q)p = (t != (t = tail)) ? t : head;// 情況3:其它elsep = (p != t && t != (t = tail)) ? t : q;} }取出元素
public E poll() {// 設置“標記”restartFromHead:for (;;) {for (Node<E> h = head, p = h, q;;) {E item = p.item;// 情況1// 表頭的數據不為null,并且“設置表頭的數據為null”這個操作成功的話;// 則比較“p和h”(若p!=h,即表頭發生了變化,則更新表頭,即設置表頭為p),然后返回原表頭的item值。if (item != null && p.casItem(item, null)) {if (p != h) // hop two nodes at a timeupdateHead(h, ((q = p.next) != null) ? q : p);return item;}// 情況2// 表頭的下一個節點為null,即鏈表只有一個“內容為null的表頭節點”。則更新表頭為p,并返回null。else if ((q = p.next) == null) {updateHead(h, p);return null;}// 情況3// 這可能到由于“情況4”的發生導致p=q,在該情況下跳轉到restartFromHead標記重新操作。else if (p == q)continue restartFromHead;// 情況4// 設置p為qelsep = q;}}}總結:ConcurrentLinkedQueue為什么支持高并發,它與其他隊列有什么不同之處?
我們知道,常用的多線程同步機制有下面三種:
volatile 變量:輕量級多線程同步機制,不會引起上下文切換和線程調度。僅提供內存可見性保證,不提供原子性。
CAS 原子指令:輕量級多線程同步機制,不會引起上下文切換和線程調度。它同時提供內存可見性和原子化更新保證。
互斥鎖:重量級多線程同步機制,可能會引起上下文切換和線程調度,它同時提供內存可見性和原子性。
ConcurrentLinkedQueue用了volatile和CAS原子指令操作隊列,比起其他隊列的互斥鎖,更加輕量,更加適合高并發。
總結
以上是生活随笔為你收集整理的JUC队列-ConcurrentLinkedQueue(四)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JUC队列-LinkedBlocking
- 下一篇: 容器源码分析之PriorityQueue