Java8 PriorityBlockingQueue源码分析
在看這篇總結之前,建議大家先熟悉一下 PriorityQueue,這里主要介紹 PriorityBlockingQueue 一些特殊的性質,關于優先級隊列的知識不作著重介紹,因為過程與 PriorityQueue 都是一致的。
關于 PriorityQueue 的文章,你可以參考這里->點擊前往~
PriorityBlockingQueue 相關源碼分析
add 方法
public boolean add(E e) {return offer(e);}add 方法主要調用的是 offer 方法,下面我們來看 offer 方法。
public boolean offer(E e) {// 隊列所有的元素不允許為 nullif (e == null)throw new NullPointerException();final ReentrantLock lock = this.lock;// 加鎖lock.lock();int n, cap;Object[] array;// 判斷是否需要擴容while ((n = size) >= (cap = (array = queue).length))tryGrow(array, cap);try {Comparator<? super E> cmp = comparator;// 如果不自定義比較器,則默認為一個小頂堆,從下往上判斷進行調整if (cmp == null)siftUpComparable(n, e, array);elsesiftUpUsingComparator(n, e, array, cmp);size = n + 1;// 喚醒非空條件對象notEmpty.signal();} finally {// 釋放鎖lock.unlock();}return true;}offer 方法整體的流程并不是復雜,首先加鎖,然后判斷是否需要擴容,接著添加元素,添加元素也分了兩種情況,一種是沒有自定義比較器,默認是小頂堆,如果初始化了自定義比較器,則按照自定義比較器的邏輯添加元素,因為添加了元素,隊列肯定不為空,因此要喚醒 notEmpty 條件。
我們以不自定義比較器為例,看一下 siftUpComparable 方法是如何調整堆結構的。
private static <T> void siftUpComparable(int k, T x, Object[] array) {Comparable<? super T> key = (Comparable<? super T>) x;while (k > 0) {int parent = (k - 1) >>> 1;Object e = array[parent];if (key.compareTo((T) e) >= 0)break;array[k] = e;k = parent;}array[k] = key;}siftUpComparable 方法與 PriorityQueue 中對應的方法簡直是一模一樣,放一張圖在這,就不具體介紹了。
其中比較有意思的是 tryGrow 擴容方法,我們接下來看一下這個方法。
/*** Q:擴容操作為什么要允許多個線程進來呢?* A:如果整個擴容過程還加鎖的話,其他線程是不能修改隊列的,* 只能等待擴容完后才能繼續執行,并發效率比較低*/ private void tryGrow(Object[] array, int oldCap) {// 釋放鎖lock.unlock(); // must release and then re-acquire main lockObject[] newArray = null;/*** compareAndSwapInt:** this:當前對象的引用* allocationSpinLockOffset:allocationSpinLock 在內存中的偏移量* 0:allocationSpinLock 的預期值* 1:更新值*/if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {try {// 當容量小于 64 時容量為原來的兩倍 + 2,如果大于等于 64 時擴容為原來的 1.5 倍// 與 PriorityQueue 一致int newCap = oldCap + ((oldCap < 64) ?(oldCap + 2) : // grow faster if small(oldCap >> 1));if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflowint minCap = oldCap + 1;if (minCap < 0 || minCap > MAX_ARRAY_SIZE)throw new OutOfMemoryError();newCap = MAX_ARRAY_SIZE;}// 初始化新的數組if (newCap > oldCap && queue == array)newArray = new Object[newCap];} finally {// allocationSpinLock 置 0// 因此后面的線程獲取鎖也可能會嘗試 CAS 成功,然后初始化新數組allocationSpinLock = 0;}}/*** 如果當前線程嘗試 CAS 失敗,則嘗試讓步* Q:這里為什么要讓步?* A:因為自己不是成功初始化新數組的線程,就算獲取到了線程也不能正確擴容,* 因此讓步盡量讓成功擴容的線程獲取鎖*/if (newArray == null) // back off if another thread is allocatingThread.yield();/*** Q:在加鎖之前,可能由多個數組嘗試 CAS 成功,且成功的初始化了新的數組,* 那么是不是后面的新數組會覆蓋前面的數組呢?* A:當然答案肯定是不會的,那么是如何保證正確性的呢?關鍵在于 queue == array 判斷,* 因此只有第一個判斷成功的線程能正確擴容,其他非第一個線程再進行判斷的時候會返回 false,* 自然不會進行數組元素拷貝*/lock.lock();if (newArray != null && queue == array) {// 重置隊列內部數組queue = newArray;// 元素拷貝,同 PriorityQueueSystem.arraycopy(array, 0, newArray, 0, oldCap);}}這個方法比較特殊的地方在于先釋放了鎖,然后通過 CAS 操作判斷是否需要初始化新數組,嘗試 CAS 失敗的線程,會做出一個讓步,放棄 CPU 時間片,然后與其他線程一同競爭。這個過程我們可以思考以下幾個問題:
- 為什么不直接加鎖而是通過 CAS 加判斷操作完成擴容步驟
- 為什么嘗試 CAS 失敗的線程需要讓步
- 在多線程情況下可能會有多個線程初始化新數組,那如何保證操作一致性
這些問題在上面的方法里都總結了一些自己的想法,如果大家有不同的見解可以留言交流。
take 方法
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;// 加鎖(可響應中斷)lock.lockInterruptibly();E result;try {// 如果隊列為空,take 方法會阻塞出隊線程while ( (result = dequeue()) == null)/*** 如果隊列中沒有元素,會阻塞后續調用 take 方法出隊的線程* 直到隊列添加了元素后喚醒 notEmpty,才可以繼續執行*/notEmpty.await();} finally {// 釋放鎖lock.unlock();}return result;}take 方法中調用了 dequeue 方法,如下:
private E dequeue() {int n = size - 1;if (n < 0)return null;else {Object[] array = queue;// 堆頂的元素E result = (E) array[0];// 堆最底層的元素(最后一個)E x = (E) array[n];// 把最后一個元素置 null,因為要把它放到堆頂,向下逐步調整堆結構,與 PriorityQueue 一致array[n] = null;Comparator<? super E> cmp = comparator;if (cmp == null)siftDownComparable(0, x, array, n);elsesiftDownUsingComparator(0, x, array, n, cmp);size = n;return result;}}我們還以不自定義比較器為例,看下 siftDownComparable 方法。
private static <T> void siftDownComparable(int k, T x, Object[] array,int n) {if (n > 0) {Comparable<? super T> key = (Comparable<? super T>)x;int half = n >>> 1; // loop while a non-leafwhile (k < half) {int child = (k << 1) + 1; // assume left child is leastObject c = array[child];int right = child + 1;if (right < n &&((Comparable<? super T>) c).compareTo((T) array[right]) > 0)c = array[child = right];if (key.compareTo((T) c) <= 0)break;array[k] = c;k = child;}array[k] = key;}}過程與 ProrityQueue 還是一樣的,就不分析了,放一張圖幫助大家理解吧。
(完)
總結
以上是生活随笔為你收集整理的Java8 PriorityBlockingQueue源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 焦糖炖蛋怎么做好吃
- 下一篇: 花溪牛肉粉什么时候创立的?