PriorityBlockingQueue源码
介紹
一個(gè)支持線程優(yōu)先級(jí)排序的無(wú)界隊(duì)列,默認(rèn)自然序進(jìn)行排序,也可以自定義實(shí)現(xiàn)compareTo()方法來(lái)指定元素排序規(guī)則,不能保證同優(yōu)先級(jí)元素的順序。隊(duì)列中的元素必須是可比較的,即實(shí)現(xiàn)Comparable接口,或者在構(gòu)建函數(shù)時(shí)提供可對(duì)隊(duì)列元素進(jìn)行比較的Comparator對(duì)象。不可以放null,會(huì)報(bào)空指針異常。
數(shù)據(jù)結(jié)構(gòu)
PriorityBlockingQueue內(nèi)部使用heap做為存儲(chǔ)結(jié)構(gòu),如下圖:
二叉樹(shù)存入數(shù)組的方式很簡(jiǎn)單,就是從上到下,從左到右。PriorityQueue的是一個(gè)有特點(diǎn)的完全二叉樹(shù),且不允許出現(xiàn)null節(jié)點(diǎn),其父節(jié)點(diǎn)都比葉子節(jié)點(diǎn)小,這個(gè)是堆排序中的小頂堆。如果按數(shù)組順序我們可以得到如下結(jié)論:
- 左葉子節(jié)點(diǎn)=父節(jié)點(diǎn)下標(biāo)*2+1
- 右葉子節(jié)點(diǎn)=父節(jié)點(diǎn)下標(biāo)*2+2
- 父節(jié)點(diǎn)=(葉子節(jié)點(diǎn)-1)/2
加入節(jié)點(diǎn):
新加入的元素x可能會(huì)破壞小頂堆的性質(zhì),因此需要進(jìn)行調(diào)整。調(diào)整的過(guò)程為:從k指定的位置開(kāi)始,將x逐層與當(dāng)前點(diǎn)的parent進(jìn)行比較并交換,直到滿足x >= queue[parent]為止
獲取元素
由于堆用數(shù)組表示,根據(jù)下標(biāo)關(guān)系,0下標(biāo)處的那個(gè)元素既是堆頂元素。所以直接返回?cái)?shù)組0下標(biāo)處的那個(gè)元素即可。
刪除第一個(gè)元素
從k指定的位置開(kāi)始,將x逐層向下與當(dāng)前點(diǎn)的左右孩子中較小的那個(gè)交換,直到x小于或等于左右孩子中的任何一個(gè)為止。
刪除任意一個(gè)元素
由于刪除操作會(huì)改變隊(duì)列結(jié)構(gòu),所以要進(jìn)行調(diào)整;又由于刪除元素的位置可能是任意的,所以調(diào)整過(guò)程比其它函數(shù)稍加繁瑣。具體來(lái)說(shuō),remove(Object o)可以分為2種情況:1. 刪除的是最后一個(gè)元素。直接刪除即可,不需要調(diào)整。2. 刪除的不是最后一個(gè)元素,從刪除點(diǎn)開(kāi)始以最后一個(gè)元素為參照調(diào)用一次siftDown()即可
?
主要屬性
/*** 空間大小默認(rèn)值:11.*/private static final int DEFAULT_INITIAL_CAPACITY = 11;/*** 空間大小最大值:Integer.MAX_VALUE - 8.*/private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;/*** 隊(duì)列元素?cái)?shù)組。平衡二叉堆實(shí)現(xiàn),父節(jié)點(diǎn)下標(biāo)是n,左節(jié)點(diǎn)則是2n+1,右節(jié)點(diǎn)是2n+2。最小的元素在最前面,元素通過(guò)comparator比較。*/private transient Object[] queue;/*** 入隊(duì)元素個(gè)數(shù)*/private transient int size;/*** The comparator, or null 表示自然排序*/private transient Comparator<? super E> comparator;/*** Lock used for all public operations*/private final ReentrantLock lock;/*** Condition for blocking when empty*/private final Condition notEmpty;/***擴(kuò)容數(shù)組分配資源時(shí)的自旋鎖,CAS需要*/private transient volatile int allocationSpinLock;/***只用于序列化的時(shí)候,為了兼容之前的版本。只有在序列化和反序列化的時(shí)候不為null。*/private PriorityQueue<E> q;方法實(shí)現(xiàn)
offer,poll,peek
public boolean offer(E e) {if (e == null)throw new NullPointerException();final ReentrantLock lock = this.lock;lock.lock();int n, cap;Object[] array;//如果入隊(duì)數(shù)量大于或者等于heap大小,則擴(kuò)容while ((n = size) >= (cap = (array = queue).length))tryGrow(array, cap);try {Comparator<? super E> cmp = comparator;if (cmp == null)siftUpComparable(n, e, array);elsesiftUpUsingComparator(n, e, array, cmp);size = n + 1;notEmpty.signal();} finally {lock.unlock();}return true;}public boolean offer(E e, long timeout, TimeUnit unit) {return offer(e); // never need to block}public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {return dequeue();} finally {lock.unlock();}}public E poll(long timeout, TimeUnit unit) throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();E result;try {while ( (result = dequeue()) == null && nanos > 0)nanos = notEmpty.awaitNanos(nanos);} finally {lock.unlock();}return result;}public E peek() {final ReentrantLock lock = this.lock;lock.lock();try {return (size == 0) ? null : (E) queue[0];} finally {lock.unlock();}}?put,take
public void put(E e) {offer(e); // never need to block}public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();E result;try {while ( (result = dequeue()) == null)notEmpty.await();} finally {lock.unlock();}return result;}enqueue,dequeue
private E dequeue() {int n = size - 1;if (n < 0)return null;else {Object[] array = queue;//array[0]即為隊(duì)首。E result = (E) array[0];//最后一個(gè)元素E x = (E) array[n];array[n] = null;Comparator<? super E> cmp = comparator;//把最后一個(gè)元素放置在0位置。并進(jìn)行下沉。if (cmp == null)siftDownComparable(0, x, array, n);elsesiftDownUsingComparator(0, x, array, n, cmp);size = n;return result;}} private static <T> void siftUpComparable(int k, T x, Object[] array) {Comparable<? super T> key = (Comparable<? super T>) x;while (k > 0) {int parent = (k - 1) >>> 1;Object e = array[parent];if (key.compareTo((T) e) >= 0)break;array[k] = e;k = parent;}array[k] = key;}private static <T> void siftUpUsingComparator(int k, T x, Object[] array,Comparator<? super T> cmp) {while (k > 0) {int parent = (k - 1) >>> 1;Object e = array[parent];if (cmp.compare(x, (T) e) >= 0)break;array[k] = e;k = parent;}array[k] = x;}?up,down
/*** 在位置k處插入x。一直向root方向up,直到大于等于等于它的父親* @param k the position to fill* @param x the item to insert* @param array the heap array*/ private static <T> void siftUpComparable(int k, T x, Object[] array) {Comparable<? super T> key = (Comparable<? super T>) x;while (k > 0) {//獲取父親元素下標(biāo)。int parent = (k - 1) >>> 1;Object e = array[parent];//不比父親元素小,則退出if (key.compareTo((T) e) >= 0)break;array[k] = e;k = parent;}//在位置k處插入xarray[k] = key;}private static <T> void siftUpUsingComparator(int k, T x, Object[] array,Comparator<? super T> cmp) {while (k > 0) {int parent = (k - 1) >>> 1;Object e = array[parent];if (cmp.compare(x, (T) e) >= 0)break;array[k] = e;k = parent;}array[k] = x;} /*** 在k處插入x,將x逐層向下與當(dāng)前點(diǎn)的左右孩子中較小的那個(gè)交換,直到x小于或等于左右孩子中的任何一個(gè)為止* @param k the position to fill* @param x the item to insert* @param array the heap array* @param n heap size*/private static <T> void siftDownComparable(int k, T x, Object[] array,int n) {if (n > 0) {Comparable<? super T> key = (Comparable<? super T>)x;int half = n >>> 1; // k <half,才可能有子節(jié)點(diǎn)。while (k < half) {int child = (k << 1) + 1; // assume left child is leastObject c = array[child];int right = child + 1;//有右孩子,并且右孩子值小于或者右孩子,則與右孩子進(jìn)行交換if (right < n &&((Comparable<? super T>) c).compareTo((T) array[right]) > 0)c = array[child = right];//如果值小于孩子,直接退出if (key.compareTo((T) c) <= 0)break;//否則,k處放置孩子的值,k設(shè)置為孩子的位置array[k] = c;k = child;}array[k] = key;}}private static <T> void siftDownUsingComparator(int k, T x, Object[] array,int n,Comparator<? super T> cmp) {if (n > 0) {int half = n >>> 1;while (k < half) {int child = (k << 1) + 1;Object c = array[child];int right = child + 1;if (right < n && cmp.compare((T) c, (T) array[right]) > 0)c = array[child = right];if (cmp.compare(x, (T) c) <= 0)break;array[k] = c;k = child;}array[k] = x;}}?
private void removeAt(int i) {Object[] array = queue;int n = size - 1;if (n == i) // 最后一個(gè)元素array[i] = null;else {E moved = (E) array[n];array[n] = null;Comparator<? super E> cmp = comparator;if (cmp == null)siftDownComparable(i, moved, array, n);elsesiftDownUsingComparator(i, moved, array, n, cmp);//如果是最后一個(gè)元素移動(dòng)到i,說(shuō)明未下層,則UPif (array[i] == moved) {if (cmp == null)siftUpComparable(i, moved, array);elsesiftUpUsingComparator(i, moved, array, cmp);}}size = n;}注意
- 所有入庫(kù)操作,例如offer,put等都不會(huì)阻塞,因?yàn)殛?duì)列是無(wú)界的。
參考
- ReentrantLock源碼
- Java并發(fā)包--阻塞隊(duì)列(BlockingQueue)
總結(jié)
以上是生活随笔為你收集整理的PriorityBlockingQueue源码的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: ArrayBlockingQueue源码
- 下一篇: DelayQueue源码