并发队列-无界非阻塞队列 ConcurrentLinkedQueue 原理探究
并發(fā)隊列-無界非阻塞隊列 ConcurrentLinkedQueue 原理探究
http://www.importnew.com/25668.html一、 前言
常用的并發(fā)隊列有阻塞隊列和非阻塞隊列,前者使用鎖實現(xiàn),后者則使用CAS非阻塞算法實現(xiàn),使用非阻塞隊列一般性能比較好,下面就看看常用的非阻塞ConcurrentLinkedQueue是如何使用CAS實現(xiàn)的。
二、 ConcurrentLinkedQueue類圖結構
如圖ConcurrentLinkedQueue中有兩個volatile類型的Node節(jié)點分別用來存在列表的首尾節(jié)點,其中head節(jié)點存放鏈表第一個item為null的節(jié)點,tail則并不是總指向最后一個節(jié)點。Node節(jié)點內(nèi)部則維護一個變量item用來存放節(jié)點的值,next用來存放下一個節(jié)點,從而鏈接為一個單向無界列表。
| 123 | public ConcurrentLinkedQueue() {????head = tail = new Node<E>(null);} |
如上代碼初始化時候會構建一個item為NULL的空節(jié)點作為鏈表的首尾節(jié)點。
三、offer操作
offer操作是在鏈表末尾添加一個元素,下面看看實現(xiàn)原理。
| 12345678910111213141516171819202122232425262728293031323334 | public boolean offer(E e) {????//e為null則拋出空指針異常????checkNotNull(e);???//構造Node節(jié)點構造函數(shù)內(nèi)部調(diào)用unsafe.putObject,后面統(tǒng)一講????final Node<E> newNode = new Node<E>(e);????//從尾節(jié)點插入????for (Node<E> t = tail, p = t;;) {????????Node<E> q = p.next;????????//如果q=null說明p是尾節(jié)點則插入????????if (q == null) {????????????//cas插入(1)????????????if (p.casNext(null, newNode)) {????????????????//cas成功說明新增節(jié)點已經(jīng)被放入鏈表,然后設置當前尾節(jié)點(包含head,1,3,5.。。個節(jié)點為尾節(jié)點)????????????????if (p != t) // hop two nodes at a time????????????????????casTail(t, newNode);? // Failure is OK.????????????????return true;????????????}????????????// Lost CAS race to another thread; re-read next????????}????????else if (p == q)//(2)????????????//多線程操作時候,由于poll時候會把老的head變?yōu)樽砸?#xff0c;然后head的next變?yōu)樾耯ead,所以這里需要????????????//重新找新的head,因為新的head后面的節(jié)點才是激活的節(jié)點????????????p = (t != (t = tail)) ? t : head;????????else????????????// 尋找尾節(jié)點(3)????????????p = (p != t && t != (t = tail)) ? t : q;????}} |
從構造函數(shù)知道一開始有個item為null的哨兵節(jié)點,并且head和tail都是指向這個節(jié)點,然后當一個線程調(diào)用offer時候首先
如圖首先查找尾節(jié)點,q==null,p就是尾節(jié)點,所以執(zhí)行p.casNext通過cas設置p的next為新增節(jié)點,這時候p==t所以不重新設置尾節(jié)點為當前新節(jié)點。由于多線程可以調(diào)用offer方法,所以可能兩個線程同時執(zhí)行到了(1)進行cas,那么只有一個會成功(假如線程1成功了),成功后的鏈表為:
失敗的線程會循環(huán)一次這時候指針為:
這時候會執(zhí)行(3)所以p=q,然后在循環(huán)后指針位置為:
所以沒有其他線程干擾的情況下會執(zhí)行(1)執(zhí)行cas把新增節(jié)點插入到尾部,沒有干擾的情況下線程2 cas會成功,然后去更新尾節(jié)點tail,由于p!=t所以更新。這時候鏈表和指針為:
假如線程2cas時候線程3也在執(zhí)行,那么線程3會失敗,循環(huán)一次后,線程3的節(jié)點狀態(tài)為:
這時候p!=t ;并且t的原始值為told,t的新值為tnew ,所以told!=tnew,所以 p=tnew=tail;
然后在循環(huán)一下后節(jié)點狀態(tài):
q==null所以執(zhí)行(1)。
現(xiàn)在就差p==q這個分支還沒有走,這個要在執(zhí)行poll操作后才會出現(xiàn)這個情況。poll后會存在下面的狀態(tài)
這個時候添加元素時候指針分布為:
所以會執(zhí)行(2)分支 結果 p=head
然后循環(huán),循環(huán)后指針分布:
所以執(zhí)行(1),然后p!=t所以設置tail節(jié)點?,F(xiàn)在分布圖:
自引用的節(jié)點會被垃圾回收掉。
四、 add操作
add操作是在鏈表末尾添加一個元素,下面看看實現(xiàn)原理。
其實內(nèi)部調(diào)用的還是offer
| 123 | public boolean add(E e) {????return offer(e);} |
五、poll操作
poll操作是在鏈表頭部獲取并且移除一個元素,下面看看實現(xiàn)原理。
| 123456789101112131415161718192021222324252627282930313233343536 | public E poll() {????restartFromHead:????//死循環(huán)????for (;;) {????????//死循環(huán)????????for (Node<E> h = head, p = h, q;;) {????????????//保存當前節(jié)點值????????????E item = p.item;????????????//當前節(jié)點有值則cas變?yōu)閚ull(1)????????????if (item != null && p.casItem(item, null)) {????????????????//cas成功標志當前節(jié)點以及從鏈表中移除????????????????if (p != h) // 類似tail間隔2設置一次頭節(jié)點(2)????????????????????updateHead(h, ((q = p.next) != null) ? q : p);????????????????return item;????????????}????????????//當前隊列為空則返回null(3)????????????else if ((q = p.next) == null) {????????????????updateHead(h, p);????????????????return null;????????????}????????????//自引用了,則重新找新的隊列頭節(jié)點(4)????????????else if (p == q)????????????????continue restartFromHead;????????????else//(5)????????????????p = q;????????}????}}????final void updateHead(Node<E> h, Node<E> p) {????????if (h != p && casHead(h, p))????????????h.lazySetNext(h);????} |
當隊列為空時候:
可知執(zhí)行(3)這時候有兩種情況,第一沒有其他線程添加元素時候(3)結果為true然后因為h!=p為false所以直接返回null。第二在執(zhí)行q=p.next前,其他線程已經(jīng)添加了一個元素到隊列,這時候(3)返回false,然后執(zhí)行(5)p=q,然后循環(huán)后節(jié)點分布:
這時候執(zhí)行(1)分支,進行cas把當前節(jié)點值值為null,同時只有一個線程會成功,cas成功 標示該節(jié)點從隊列中移除了,然后p!=h,調(diào)用updateHead方法,參數(shù)為h,p;h!=p所以把p變?yōu)楫斍版湵韍ead節(jié)點,然后h節(jié)點的next指向自己?,F(xiàn)在狀態(tài)為:
cas失敗 后 會再次循環(huán),這時候分布圖為:
這時候執(zhí)行(3)返回null.
現(xiàn)在還有個分支(4)沒有執(zhí)行過,那么什么時候會執(zhí)行那?
這時候執(zhí)行(1)分支,進行cas把當前節(jié)點值值為null,同時只有一個線程A會成功,cas成功 標示該節(jié)點從隊列中移除了,然后p!=h,調(diào)用updateHead方法,假如執(zhí)行updateHead前另外一個線程B開始poll這時候它p指向為原來的head節(jié)點,然后當前線程A執(zhí)行updateHead這時候B線程鏈表狀態(tài)為:
所以會執(zhí)行(4)重新跳到外層循環(huán),獲取當前head,現(xiàn)在狀態(tài)為:
六、peek操作
peek操作是獲取鏈表頭部一個元素(只讀取不移除),下面看看實現(xiàn)原理。
代碼與poll類似,只是少了castItem.并且peek操作會改變head指向,offer后head指向哨兵節(jié)點,第一次peek后head會指向第一個真的節(jié)點元素。
| 12345678910111213141516 | public E peek() {????restartFromHead:????for (;;) {????????for (Node<E> h = head, p = h, q;;) {????????????E item = p.item;????????????if (item != null || (q = p.next) == null) {????????????????updateHead(h, p);????????????????return item;????????????}????????????else if (p == q)????????????????continue restartFromHead;????????????else????????????????p = q;????????}????}} |
七、size操作
獲取當前隊列元素個數(shù),在并發(fā)環(huán)境下不是很有用,因為使用CAS沒有加鎖所以從調(diào)用size函數(shù)到返回結果期間有可能增刪元素,導致統(tǒng)計的元素個數(shù)不精確。
| 123456789101112131415161718192021222324252627282930313233 | public int size() {????int count = 0;????for (Node<E> p = first(); p != null; p = succ(p))????????if (p.item != null)????????????// 最大返回Integer.MAX_VALUE????????????if (++count == Integer.MAX_VALUE)????????????????break;????return count;}//獲取第一個隊列元素(哨兵元素不算),沒有則為nullNode<E> first() {????restartFromHead:????for (;;) {????????for (Node<E> h = head, p = h, q;;) {????????????boolean hasItem = (p.item != null);????????????if (hasItem || (q = p.next) == null) {????????????????updateHead(h, p);????????????????return hasItem ? p : null;????????????}????????????else if (p == q)????????????????continue restartFromHead;????????????else????????????????p = q;????????}????}}//獲取當前節(jié)點的next元素,如果是自引入節(jié)點則返回真正頭節(jié)點final Node<E> succ(Node<E> p) {????Node<E> next = p.next;????return (p == next) ? head : next;} |
八、remove操作
如果隊列里面存在該元素則刪除給元素,如果存在多個則刪除第一個,并返回true,否者返回false
| 12345678910111213141516171819202122232425 | public boolean remove(Object o) {????//查找元素為空,直接返回false????if (o == null) return false;????Node<E> pred = null;????for (Node<E> p = first(); p != null; p = succ(p)) {????????E item = p.item;????????//相等則使用cas值null,同時一個線程成功,失敗的線程循環(huán)查找隊列中其他元素是否有匹配的。????????if (item != null &&????????????o.equals(item) &&????????????p.casItem(item, null)) {????????????//獲取next元素????????????Node<E> next = succ(p);????????????//如果有前驅節(jié)點,并且next不為空則鏈接前驅節(jié)點到next,????????????if (pred != null && next != null)????????????????pred.casNext(p, next);????????????return true;????????}????????pred = p;????}????return false;} |
九、contains操作
判斷隊列里面是否含有指定對象,由于是遍歷整個隊列,所以類似size 不是那么精確,有可能調(diào)用該方法時候元素還在隊列里面,但是遍歷過程中才把該元素刪除了,那么就會返回false.
| 123456789 | public boolean contains(Object o) {????if (o == null) return false;????for (Node<E> p = first(); p != null; p = succ(p)) {????????E item = p.item;????????if (item != null && o.equals(item))????????????return true;????}????return false;} |
十、開源框架中使用
Tomcat中NioEndPoint中的每個poller里面就維護一個ConcurrentLinkedQueue<Runnable>用來作為緩沖存放任務。
10.1 Acceptor線程
accept線程作用是接受客戶端發(fā)來的連接請求并放入到事件隊列。
看下代碼:
| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071 | protected class Acceptor extends AbstractEndpoint.Acceptor {????????@Override????????public void run() {????????????int errorDelay = 0;????????????// 一直循環(huán)直到接收到shutdown命令????????????while (running) {????????????????...????????????????if (!running) {????????????????????break;????????????????}????????????????state = AcceptorState.RUNNING;????????????????try {????????????????????//如果達到max connections個請求則等待????????????????????countUpOrAwaitConnection();????????????????????SocketChannel socket = null;????????????????????try {????????????????????????// 從TCP緩存獲取一個完成三次握手的套接字,沒有則阻塞????????????????????????// socket????????????????????????socket = serverSock.accept();????????????????????} catch (IOException ioe) {????????????????????????...????????????????????}????????????????????// Successful accept, reset the error delay????????????????????errorDelay = 0;???????????????????if (running && !paused) {????????????????????????if (!setSocketOptions(socket)) {????????????????????????????countDownConnection();????????????????????????????closeSocket(socket);????????????????????????}????????????????????} else {????????????????????????countDownConnection();????????????????????????closeSocket(socket);????????????????????}???????????????????....????????????????} catch (SocketTimeoutException sx) {????????????????????// Ignore: Normal condition????????????????....????????????}????????????state = AcceptorState.ENDED;????????}????}?protected boolean setSocketOptions(SocketChannel socket) {????????// Process the connection????????try {????????????//disable blocking, APR style, we are gonna be polling it???????????...????????????getPoller0().register(channel);????????} catch (Throwable t) {???????????...????????????return false;????????}????????return true;}public void register(final NioChannel socket) {???...????addEvent(r);}public void addEvent(Runnable event) {????events.offer(event);????...} |
10.2 Poll線程
poll線程作用是從事件隊列里面獲取事件把鏈接套接字加入selector,并且監(jiān)聽socket事件進行處理。
| 12345678910111213141516171819202122232425262728293031323334353637383940414243 | public void run() {????while (true) {????????try {????????????...????????????if (close) {???????????????...????????????} else {????????????????hasEvents = events();????????????}????????????try {????????????????...????????????} catch ( NullPointerException x ) {...????????????}????????????Iterator<SelectionKey> iterator =????????????????keyCount > 0 ? selector.selectedKeys().iterator() : null;????????????// 遍歷所有注冊的channel對感興趣的事件處理????????????while (iterator != null && iterator.hasNext()) {????????????????SelectionKey sk = iterator.next();????????????????KeyAttachment attachment = (KeyAttachment)sk.attachment();????????????????if (attachment == null) {????????????????????iterator.remove();????????????????} else {????????????????????attachment.access();????????????????????iterator.remove();????????????????????processKey(sk, attachment);????????????????}????????????}//while????????????//process timeouts????????????timeout(keyCount,hasEvents);????????????if ( oomParachute > 0 && oomParachuteData == null ) checkParachute();????????} catch (OutOfMemoryError oom) {????????????...????????}????}//while????synchronized (this) {????????this.notifyAll();????}????stopLatch.countDown();} |
| 123456789101112131415161718192021222324252627282930313233343536373839 | public boolean events() {????????????boolean result = false;????????????//從隊列獲取任務并執(zhí)行????????????Runnable r = null;????????????while ( (r = events.poll()) != null ) {????????????????result = true;????????????????try {????????????????????r.run();????????????????????if ( r instanceof PollerEvent ) {????????????????????????((PollerEvent)r).reset();????????????????????????eventCache.offer((PollerEvent)r);????????????????????}????????????????} catch ( Throwable x ) {????????????????????log.error("",x);????????????????}????????????}????????????return result;????????}//如配置線程池則請求交給線程池處理。public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {????try {????????KeyAttachment attachment = (KeyAttachment)socket.getAttachment();????????if (attachment == null) {????????????return false;????????}????????attachment.setCometNotify(false); //will get reset upon next reg????????SocketProcessor sc = processorCache.poll();????????if ( sc == null ) sc = new SocketProcessor(socket,status);????????else sc.reset(socket,status);????????if ( dispatch && getExecutor()!=null ) getExecutor().execute(sc);????????else sc.run();????} catch (RejectedExecutionException rx) {???????...????}????return true;} |
十一、有意思的問題
10.1 一個判斷的執(zhí)行結果分析
offer中有個 判斷 t != (t = tail)假如 t=node1;tail=node2;并且node1!=node2那么這個判斷是true還是false那,答案是true,這個判斷是看當前t是不是和tail相等,相等則返回true否者為false,但是無論結果是啥執(zhí)行后t的值都是tail。
下面從字節(jié)碼來分析下為啥?
- 一個例子
| 123456789 | public static void main(String[] args)? {????int t = 2;????int tail = 3;????System.out.println(t != (t = tail));} |
結果為:true;
- 字節(jié)碼文件:
字節(jié)碼命令介紹可參考: http://blog.csdn.net/web_code/article/details/12164733
一開始棧為空
- 第0行指令作用是把值2入棧棧頂元素為2
- 第1行指令作用是將棧頂int類型值保存到局部變量t中。
- 第2行指令作用是把值3入棧棧頂元素為3
- 第3行指令作用是將棧頂int類型值保存到局部變量tail中。
- 第4調(diào)用打印命令
- 第7行指令作用是把變量t中的值入棧
- 第8行指令作用是把變量tail中的值入棧
- 現(xiàn)在棧里面元素為3,2并且3位棧頂
- 第9行指令作用是當前棧頂元素入棧,所以現(xiàn)在棧內(nèi)容3,3,2
- 第10行指令作用是把棧頂元素存放到t,現(xiàn)在棧內(nèi)容3,2
- 第11行指令作用是判斷棧頂兩個元素值,相等則跳轉 18。由于現(xiàn)在棧頂嚴肅為3,2不相等所以返回true.
- 第14行指令作用是把1入棧。
然后回頭分析下!=是雙目運算符,應該是首先把左邊的操作數(shù)入棧,然后在去計算了右側操作數(shù)。
10.2 Node的構造函數(shù)
另外對于每個節(jié)點Node在構造時候使用UnSafe.putObject設置item替代了直接對volatile的賦值,這個是為了性能考慮?為啥不直接賦值那,看看類注解怎么說:
| 123 | Node(E item) {????UNSAFE.putObject(this, itemOffset, item);} |
When constructing a Node (before enqueuing it) we avoid paying for a volatile write to item by using Unsafe.putObject instead of a normal write. This allows the cost of enqueue to be”one-and-a-half”
CASes.
也就是說當構造Node節(jié)點時候(這時候節(jié)點還沒有放入隊列鏈表)為了避免正常的寫volatile變量的開銷 使用了Unsafe.putObject代替。這使元素進隊列僅僅花費1.5個cas操作的耗時。這個是說使用Unsafe.putObject比直接給volatile變量賦值更高效?目前還沒有查到相關資料。
十二、總結
ConcurrentLinkedQueue使用CAS非阻塞算法實現(xiàn)使用CAS解決了當前節(jié)點與next節(jié)點之間的安全鏈接和對當前節(jié)點值的賦值。由于使用CAS沒有使用鎖,所以獲取size的時候有可能進行offer,poll或者remove操作,導致獲取的元素個數(shù)不精確,所以在并發(fā)情況下size函數(shù)不是很有用。另外第一次peek或者first時候會把head指向第一個真正的隊列元素。
下面總結下如何實現(xiàn)線程安全的,可知入隊出隊函數(shù)都是操作volatile變量:head,tail。所以要保證隊列線程安全只需要保證對這兩個Node操作的可見性和原子性,由于volatile本身保證可見性,所以只需要看下多線程下如果保證對著兩個變量操作的原子性(CAS)。
對于offer操作是在tail后面添加元素,也就是調(diào)用tail.casNext方法,而這個方法是使用的CAS操作,只有一個線程會成功,然后失敗的線程會循環(huán)一下,重新獲取tail,然后執(zhí)行casNext方法。對于poll也是這樣的。
轉載于:https://www.cnblogs.com/silyvin/p/9106630.html
總結
以上是生活随笔為你收集整理的并发队列-无界非阻塞队列 ConcurrentLinkedQueue 原理探究的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java EE 课程作业(second)
- 下一篇: centos6.8_64部署django