ArrayBlockingQueue, LinkedBlockingQueue, ConcurrentLinkedQueue, RingBuffer
1. ArrayBlockingQueue, LinkedBlockingQueue, ConcurrentLinkedQueue
ArrayBlockingQueue, LinkedBlockingQueue 繼承自 BlockingQueue, 他們的特點就是 Blocking, Blocking 特有的方法就是 take() 和 put(), 這兩個方法是阻塞方法, 每當隊列容量滿的時候, put() 方法就會進入wait, 直到隊列空出來, 而每當隊列為空時, take() 就會進入等待, 直到隊列有元素可以 take()
ArrayBlockingQueue, LinkedBlockingQueue 區(qū)別在于 ArrayBlockingQueue 必須指定容量, 且可以指定 fair 變量, 如果 fair 為 true, 則會保持 take() 或者 put() 操作時線程的 block 順序, 先 block 的線程先 take() 或 put(), fair 又內(nèi)部變量 ReentrantLock 保證
ConcurrentLinkedQueue 通過 CAS 操作實現(xiàn)了無鎖的 poll() 和 offer(), 他的容量是動態(tài)的, 由于無鎖, 所以在 poll() 或者 offer() 的時候 head 與 tail 可能會改變, 所以它會持續(xù)的判斷 head 與 tail 是否改變來保證操作正確性, 如果改變, 則會重新選擇 head 與 tail. 而由于無鎖的特性, 他的元素更新與 size 變量更新無法做到原子 (實際上它沒有 size 變量), 所以他的 size() 是通過遍歷 queue 來獲得的, 在效率上是 O(n), 而且無法保證準確性, 因為遍歷的時候有可能 queue size 發(fā)生了改變.
?
RingBuffer 是 Distruptor 中的一個用來替代 ArrayBlockingQueue 的隊列, 它的思想在于長度可控, 且無鎖, 只有在 blocking 的時候(沒有數(shù)據(jù)的時候出隊, 數(shù)據(jù)滿的時候入隊)會自旋. 實現(xiàn)原理是使用一個環(huán)形array, 生產(chǎn)者作為 tail, 消費者作為 head, 每生產(chǎn)一次 tail atomic++, 每消費一次 head atomic++, tail 不能超過 head 一圈(array size, 即隊列滿時 blocking), tail 不能超過自己tail一圈(即不能覆蓋未被消費的值), head 不能超過 tail (即無可消費任務(wù)時 blocking), head 不能取到空值(取到空值時 blocking). blocking 使用一個 while 自旋來完成, 那么只要生產(chǎn)者消費者的速度相當時, 即可通過 atomicInteger(cas) 保證無鎖, 而如果你需要在 blocking 的時候立即返回, 則 while 自旋都可以不需要. 相比于 ArrayBlockingQueue, 它可以絕大部分時間無鎖, blocking 自旋, 相比于 concurrentLinkedQueue, 他又能做到長度限制. 代碼如下:
public class RingBuffer<T> implements Serializable {/****/private static final long serialVersionUID = 6976960108708949038L;private volatile AtomicInteger head;private volatile AtomicInteger tail;private int length;final T EMPTY = null;private volatile T[] queue;public RingBuffer(Class<T> type, int length){this.head = new AtomicInteger(0);this.tail = new AtomicInteger(0);this.length = length == 0 ? 2 << 16 : length; // 默認2^16 this.queue = (T[]) Array.newInstance(type, this.length);}public void enQueue(T t){if(t == null) t= (T) new Object();// 阻塞 -- 避免多生成者循環(huán)生產(chǎn)同一個節(jié)點 while(this.getTail() - this.getHead() >= this.length);int ctail = this.tail.getAndIncrement();while(this.queue[this.getTail(ctail)] != EMPTY); // 自旋 this.queue[this.getTail(ctail)] = t;}public T deQueue(){T t = null;// 阻塞 -- 避免多消費者循環(huán)消費同一個節(jié)點 while(this.head.get() >= this.tail.get());int chead = this.head.getAndIncrement();while(this.queue[this.getHead(chead)] == EMPTY); // 自旋 t = this.queue[this.getHead(chead)];this.queue[this.getHead(chead)] = EMPTY;return t;}public int getHead(int index){return index & (this.length - 1);}public int getTail(int index) {return index & (this.length - 1);}public int getHead() {return head.get() & (this.length - 1);}public int getTail() {return tail.get() & (this.length - 1);}public T[] getQueue() {return queue;}public int getLength() {return length;}public void setLength(int length) {this.length = length;}}?
?
轉(zhuǎn)載于:https://www.cnblogs.com/zemliu/p/3823597.html
總結(jié)
以上是生活随笔為你收集整理的ArrayBlockingQueue, LinkedBlockingQueue, ConcurrentLinkedQueue, RingBuffer的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: jquery如何获取元素的滚动高度
- 下一篇: 数据可视化(9)--数据可视化6步法