ConcurrentLinkedQueue非阻塞队列实现原理分析
2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>
引言
在并發(fā)編程中我們有時(shí)候需要使用線程安全的隊(duì)列。如果我們要實(shí)現(xiàn)一個(gè)線程安全的隊(duì)列有兩種實(shí)現(xiàn)方式一種是使用阻塞算法,另一種是使用非阻塞算法。使用阻塞算法的隊(duì)列可以用一個(gè)鎖(入隊(duì)和出隊(duì)用同一把鎖)或兩個(gè)鎖(入隊(duì)和出隊(duì)用不同的鎖)等方式來(lái)實(shí)現(xiàn),而非阻塞的實(shí)現(xiàn)方式則可以使用循環(huán)CAS的方式來(lái)實(shí)現(xiàn),本文讓我們一起來(lái)研究下Doug Lea是如何使用非阻塞的方式來(lái)實(shí)現(xiàn)線程安全隊(duì)列ConcurrentLinkedQueue的,相信從大師身上我們能學(xué)到不少并發(fā)編程的技巧。
ConcurrentLinkedQueue的介紹
ConcurrentLinkedQueue是一個(gè)基于鏈接節(jié)點(diǎn)的無(wú)界線程安全隊(duì)列,它采用先進(jìn)先出的規(guī)則對(duì)節(jié)點(diǎn)進(jìn)行排序,當(dāng)我們添加一個(gè)元素的時(shí)候,它會(huì)添加到隊(duì)列的尾部,當(dāng)我們獲取一個(gè)元素時(shí),它會(huì)返回隊(duì)列頭部的元素。它采用了“wait-free”算法來(lái)實(shí)現(xiàn),該算法在Michael & Scott算法上進(jìn)行了一些修改。
ConcurrentLinkedQueue的結(jié)構(gòu)
我們通過(guò)ConcurrentLinkedQueue的類圖來(lái)分析一下它的結(jié)構(gòu)。?ConcurrentLinkedQueue由head節(jié)點(diǎn)和tair節(jié)點(diǎn)組成,每個(gè)節(jié)點(diǎn)(Node)由節(jié)點(diǎn)元素(item)和指向下一個(gè)節(jié)點(diǎn)的引用(next)組成,節(jié)點(diǎn)與節(jié)點(diǎn)之間就是通過(guò)這個(gè)next關(guān)聯(lián)起來(lái),從而組成一張鏈表結(jié)構(gòu)的隊(duì)列。默認(rèn)情況下head節(jié)點(diǎn)存儲(chǔ)的元素為空,tair節(jié)點(diǎn)等于head節(jié)點(diǎn)。
private transient volatile Node<e> tail = head;入隊(duì)列
入隊(duì)列就是將入隊(duì)節(jié)點(diǎn)添加到隊(duì)列的尾部。為了方便理解入隊(duì)時(shí)隊(duì)列的變化,以及head節(jié)點(diǎn)和tair節(jié)點(diǎn)的變化,每添加一個(gè)節(jié)點(diǎn)我就做了一個(gè)隊(duì)列的快照?qǐng)D。?
第一步添加元素1:隊(duì)列更新head節(jié)點(diǎn)的next節(jié)點(diǎn)為元素1節(jié)點(diǎn)。又因?yàn)閠ail節(jié)點(diǎn)默認(rèn)情況下等于head節(jié)點(diǎn),所以它們的next節(jié)點(diǎn)都指向元素1節(jié)點(diǎn)。?第二步添加元素2:隊(duì)列首先設(shè)置元素1節(jié)點(diǎn)的next節(jié)點(diǎn)為元素2節(jié)點(diǎn),然后更新tail節(jié)點(diǎn)指向元素2節(jié)點(diǎn)。?第三步添加元素3:設(shè)置tail節(jié)點(diǎn)的next節(jié)點(diǎn)為元素3節(jié)點(diǎn)。?第四步添加元素4:設(shè)置元素3的next節(jié)點(diǎn)為元素4節(jié)點(diǎn),然后將tail節(jié)點(diǎn)指向元素4節(jié)點(diǎn)。
通過(guò)debug入隊(duì)過(guò)程并觀察head節(jié)點(diǎn)和tail節(jié)點(diǎn)的變化,發(fā)現(xiàn)入隊(duì)主要做兩件事情,第一是將入隊(duì)節(jié)點(diǎn)設(shè)置成當(dāng)前隊(duì)列尾節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)。第二是更新tail節(jié)點(diǎn),如果tail節(jié)點(diǎn)的next節(jié)點(diǎn)不為空,則將入隊(duì)節(jié)點(diǎn)設(shè)置成tail節(jié)點(diǎn),如果tail節(jié)點(diǎn)的next節(jié)點(diǎn)為空,則將入隊(duì)節(jié)點(diǎn)設(shè)置成tail的next節(jié)點(diǎn),所以tail節(jié)點(diǎn)不總是尾節(jié)點(diǎn),理解這一點(diǎn)對(duì)于我們研究源碼會(huì)非常有幫助。
上面的分析讓我們從單線程入隊(duì)的角度來(lái)理解入隊(duì)過(guò)程,但是多個(gè)線程同時(shí)進(jìn)行入隊(duì)情況就變得更加復(fù)雜,因?yàn)榭赡軙?huì)出現(xiàn)其他線程插隊(duì)的情況。如果有一個(gè)線程正在入隊(duì),那么它必須先獲取尾節(jié)點(diǎn),然后設(shè)置尾節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)為入隊(duì)節(jié)點(diǎn),但這時(shí)可能有另外一個(gè)線程插隊(duì)了,那么隊(duì)列的尾節(jié)點(diǎn)就會(huì)發(fā)生變化,這時(shí)當(dāng)前線程要暫停入隊(duì)操作,然后重新獲取尾節(jié)點(diǎn)。讓我們?cè)偻ㄟ^(guò)源碼來(lái)詳細(xì)分析下它是如何使用CAS算法來(lái)入隊(duì)的。
public boolean offer(E e) {if (e == null) throw new NullPointerException();//入隊(duì)前,創(chuàng)建一個(gè)入隊(duì)節(jié)點(diǎn)Node<E> n = new Node<E>(e);retry://死循環(huán),入隊(duì)不成功反復(fù)入隊(duì)。for (;;) {//創(chuàng)建一個(gè)指向tail節(jié)點(diǎn)的引用Node<E> t = tail;//p用來(lái)表示隊(duì)列的尾節(jié)點(diǎn),默認(rèn)情況下等于tail節(jié)點(diǎn)。Node<E> p = t;for (int hops = 0; ; hops++) {//獲得p節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)。Node<E> next = succ(p);//next節(jié)點(diǎn)不為空,說(shuō)明p不是尾節(jié)點(diǎn),需要更新p后在將它指向next節(jié)點(diǎn)if (next != null) {//循環(huán)了兩次及其以上,并且當(dāng)前節(jié)點(diǎn)還是不等于尾節(jié)點(diǎn)if (hops > HOPS && t != tail)continue retry;p = next;}//如果p是尾節(jié)點(diǎn),則設(shè)置p節(jié)點(diǎn)的next節(jié)點(diǎn)為入隊(duì)節(jié)點(diǎn)。else if (p.casNext(null, n)) {//如果tail節(jié)點(diǎn)有大于等于1個(gè)next節(jié)點(diǎn),則將入隊(duì)節(jié)點(diǎn)設(shè)置成tair節(jié)點(diǎn),更新失敗了也沒(méi)關(guān)系,因?yàn)槭×吮硎居衅渌€程成功更新了tair節(jié)點(diǎn)if (hops >= HOPS)casTail(t, n); // 更新tail節(jié)點(diǎn),允許失敗return true;}// p有next節(jié)點(diǎn),表示p的next節(jié)點(diǎn)是尾節(jié)點(diǎn),則重新設(shè)置p節(jié)點(diǎn)else {p = succ(p);}}} }從源代碼角度來(lái)看整個(gè)入隊(duì)過(guò)程主要做二件事情。第一是定位出尾節(jié)點(diǎn),第二是使用CAS算法能將入隊(duì)節(jié)點(diǎn)設(shè)置成尾節(jié)點(diǎn)的next節(jié)點(diǎn),如不成功則重試。
第一步定位尾節(jié)點(diǎn)。tail節(jié)點(diǎn)并不總是尾節(jié)點(diǎn),所以每次入隊(duì)都必須先通過(guò)tail節(jié)點(diǎn)來(lái)找到尾節(jié)點(diǎn),尾節(jié)點(diǎn)可能就是tail節(jié)點(diǎn),也可能是tail節(jié)點(diǎn)的next節(jié)點(diǎn)。代碼中循環(huán)體中的第一個(gè)if就是判斷tail是否有next節(jié)點(diǎn),有則表示next節(jié)點(diǎn)可能是尾節(jié)點(diǎn)。獲取tail節(jié)點(diǎn)的next節(jié)點(diǎn)需要注意的是p節(jié)點(diǎn)等于p的next節(jié)點(diǎn)的情況,只有一種可能就是p節(jié)點(diǎn)和p的next節(jié)點(diǎn)都等于空,表示這個(gè)隊(duì)列剛初始化,正準(zhǔn)備添加第一次節(jié)點(diǎn),所以需要返回head節(jié)點(diǎn)。獲取p節(jié)點(diǎn)的next節(jié)點(diǎn)代碼如下:
final Node<E> succ(Node<E> p) {Node<E> next = p.getNext();return (p == next) ? head : next; }第二步設(shè)置入隊(duì)節(jié)點(diǎn)為尾節(jié)點(diǎn)。p.casNext(null, n)方法用于將入隊(duì)節(jié)點(diǎn)設(shè)置為當(dāng)前隊(duì)列尾節(jié)點(diǎn)的next節(jié)點(diǎn),p如果是null表示p是當(dāng)前隊(duì)列的尾節(jié)點(diǎn),如果不為null表示有其他線程更新了尾節(jié)點(diǎn),則需要重新獲取當(dāng)前隊(duì)列的尾節(jié)點(diǎn)。
hops的設(shè)計(jì)意圖。上面分析過(guò)對(duì)于先進(jìn)先出的隊(duì)列入隊(duì)所要做的事情就是將入隊(duì)節(jié)點(diǎn)設(shè)置成尾節(jié)點(diǎn),doug lea寫(xiě)的代碼和邏輯還是稍微有點(diǎn)復(fù)雜。那么我用以下方式來(lái)實(shí)現(xiàn)行不行?
public boolean offer(E e) {if (e == null)throw new NullPointerException();Node</e><e> n = new Node</e><e>(e);for (;;) {Node</e><e> t = tail;if (t.casNext(null, n) && casTail(t, n)) {return true;}} }讓tail節(jié)點(diǎn)永遠(yuǎn)作為隊(duì)列的尾節(jié)點(diǎn),這樣實(shí)現(xiàn)代碼量非常少,而且邏輯非常清楚和易懂。但是這么做有個(gè)缺點(diǎn)就是每次都需要使用循環(huán)CAS更新tail節(jié)點(diǎn)。如果能減少CAS更新tail節(jié)點(diǎn)的次數(shù),就能提高入隊(duì)的效率,所以doug lea使用hops變量來(lái)控制并減少tail節(jié)點(diǎn)的更新頻率,并不是每次節(jié)點(diǎn)入隊(duì)后都將 tail節(jié)點(diǎn)更新成尾節(jié)點(diǎn),而是當(dāng) tail節(jié)點(diǎn)和尾節(jié)點(diǎn)的距離大于等于常量HOPS的值(默認(rèn)等于1)時(shí)才更新tail節(jié)點(diǎn),tail和尾節(jié)點(diǎn)的距離越長(zhǎng)使用CAS更新tail節(jié)點(diǎn)的次數(shù)就會(huì)越少,但是距離越長(zhǎng)帶來(lái)的負(fù)面效果就是每次入隊(duì)時(shí)定位尾節(jié)點(diǎn)的時(shí)間就越長(zhǎng),因?yàn)檠h(huán)體需要多循環(huán)一次來(lái)定位出尾節(jié)點(diǎn),但是這樣仍然能提高入隊(duì)的效率,因?yàn)閺谋举|(zhì)上來(lái)看它通過(guò)增加對(duì)volatile變量的讀操作來(lái)減少了對(duì)volatile變量的寫(xiě)操作,而對(duì)volatile變量的寫(xiě)操作開(kāi)銷要遠(yuǎn)遠(yuǎn)大于讀操作,所以入隊(duì)效率會(huì)有所提升。
private static final int HOPS = 1;還有一點(diǎn)需要注意的是入隊(duì)方法永遠(yuǎn)返回true,所以不要通過(guò)返回值判斷入隊(duì)是否成功。
出隊(duì)列
出隊(duì)列的就是從隊(duì)列里返回一個(gè)節(jié)點(diǎn)元素,并清空該節(jié)點(diǎn)對(duì)元素的引用。讓我們通過(guò)每個(gè)節(jié)點(diǎn)出隊(duì)的快照來(lái)觀察下head節(jié)點(diǎn)的變化。?
從上圖可知,并不是每次出隊(duì)時(shí)都更新head節(jié)點(diǎn),當(dāng)head節(jié)點(diǎn)里有元素時(shí),直接彈出head節(jié)點(diǎn)里的元素,而不會(huì)更新head節(jié)點(diǎn)。只有當(dāng)head節(jié)點(diǎn)里沒(méi)有元素時(shí),出隊(duì)操作才會(huì)更新head節(jié)點(diǎn)。這種做法也是通過(guò)hops變量來(lái)減少使用CAS更新head節(jié)點(diǎn)的消耗,從而提高出隊(duì)效率。讓我們?cè)偻ㄟ^(guò)源碼來(lái)深入分析下出隊(duì)過(guò)程。
public E poll() {Node<E> h = head;// p表示頭節(jié)點(diǎn),需要出隊(duì)的節(jié)點(diǎn)Node<E> p = h;for (int hops = 0;; hops++) {// 獲取p節(jié)點(diǎn)的元素E item = p.getItem();// 如果p節(jié)點(diǎn)的元素不為空,使用CAS設(shè)置p節(jié)點(diǎn)引用的元素為null,如果成功則返回p節(jié)點(diǎn)的元素。if (item != null && p.casItem(item, null)) {if (hops >= HOPS) {// 將p節(jié)點(diǎn)下一個(gè)節(jié)點(diǎn)設(shè)置成head節(jié)點(diǎn)Node<E> q = p.getNext();updateHead(h, (q != null) ? q : p);}return item;}// 如果頭節(jié)點(diǎn)的元素為空或頭節(jié)點(diǎn)發(fā)生了變化,這說(shuō)明頭節(jié)點(diǎn)已經(jīng)被另外一個(gè)線程修改了。那么獲取p節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)Node<E> next = succ(p);// 如果p的下一個(gè)節(jié)點(diǎn)也為空,說(shuō)明這個(gè)隊(duì)列已經(jīng)空了if (next == null) {// 更新頭節(jié)點(diǎn)。updateHead(h, p);break;}// 如果下一個(gè)元素不為空,則將頭節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)設(shè)置成頭節(jié)點(diǎn)p = next;}return null; }首先獲取頭節(jié)點(diǎn)的元素,然后判斷頭節(jié)點(diǎn)元素是否為空,如果為空,表示另外一個(gè)線程已經(jīng)進(jìn)行了一次出隊(duì)操作將該節(jié)點(diǎn)的元素取走,如果不為空,則使用CAS的方式將頭節(jié)點(diǎn)的引用設(shè)置成null,如果CAS成功,則直接返回頭節(jié)點(diǎn)的元素,如果不成功,表示另外一個(gè)線程已經(jīng)進(jìn)行了一次出隊(duì)操作更新了head節(jié)點(diǎn),導(dǎo)致元素發(fā)生了變化,需要重新獲取頭節(jié)點(diǎn)。
?
文章來(lái)源:https://my.oschina.net/xianggao/blog/389332
轉(zhuǎn)載于:https://my.oschina.net/u/2988360/blog/956533
總結(jié)
以上是生活随笔為你收集整理的ConcurrentLinkedQueue非阻塞队列实现原理分析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: juniper接口打环测试
- 下一篇: NumPy基础入门学习