Java Review - 并发编程_DelayQueue原理源码剖析
文章目錄
- 概述
- 類圖結構
- 小Demo
- 核心方法&源碼解讀
- offer操作
- take操作
- poll操作
- size操作
- 小結
概述
DelayQueue并發(fā)隊列是一個無界阻塞延遲隊列,隊列中的每個元素都有個過期時間,當從隊列獲取元素時,只有過期元素才會出隊列。
隊列頭元素是最快要過期的元素。
類圖結構
由該圖可知
-
DelayQueue內部使用PriorityQueue存放數據,使用ReentrantLock實現線程同步。
-
另外,隊列里面的元素要實現Delayed接口,由于每個元素都有一個過期時間,所以要實現獲知當前元素還剩下多少時間就過期了的接口,由于內部使用優(yōu)先級隊列來實現,所以要實現元素之間相互比較的接口。
- 條件變量available與lock鎖是對應的,其目的是為了實現線程間同步
- 其中l(wèi)eader變量的使用基于Leader-Follower模式的變體,用于盡量減少不必要的線程等待。當一個線程調用隊列的take方法變?yōu)閘eader線程后,它會調用條件變量available.awaitNanos(delay)等待delay時間,但是其他線程(follwer線程)則會調用available.await()進行無限等待
leader線程延遲時間過期后,會退出take方法,并通過調用available.signal()方法喚醒一個follwer線程,被喚醒的follwer線程被選舉為新的leader線程。
每日一博 - DelayQueue阻塞隊列源碼解讀
/*** Thread designated to wait for the element at the head of* the queue. This variant of the Leader-Follower pattern* (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to* minimize unnecessary timed waiting. When a thread becomes* the leader, it waits only for the next delay to elapse, but* other threads await indefinitely. The leader thread must* signal some other thread before returning from take() or* poll(...), unless some other thread becomes leader in the* interim. Whenever the head of the queue is replaced with* an element with an earlier expiration time, the leader* field is invalidated by being reset to null, and some* waiting thread, but not necessarily the current leader, is* signalled. So waiting threads must be prepared to acquire* and lose leadership while waiting.*/private Thread leader = null;小Demo
import java.util.Random; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/12/19 23:05* @mark: show me the code , change the world*/ public class DelayQueueTest {static class DelayedEle implements Delayed {private final long delayTime; //延遲時間private final long expire; //到期時間private String data; //數據public DelayedEle(long delay, String data) {delayTime = delay;this.data = data;expire = System.currentTimeMillis() + delay;}/*** 剩余時間=到期時間-當前時間*/@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}/*** 優(yōu)先隊列里面優(yōu)先級規(guī)則*/@Overridepublic int compareTo(Delayed o) {return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));}@Overridepublic String toString() {final StringBuilder sb = new StringBuilder("DelayedElement{");sb.append("delay=").append(delayTime);sb.append(", expire=").append(expire);sb.append(", data='").append(data).append('\'');sb.append('}');return sb.toString();}}public static void main(String[] args) throws InterruptedException {// 1 創(chuàng)建延時隊列DelayQueue<DelayedEle> delayQueue = new DelayQueue<DelayedEle>();// 2 創(chuàng)建延時任務Random random = new Random();for (int i = 0; i < 10; i++) {DelayedEle ele = new DelayedEle(random.nextInt(500), "task-" + i);delayQueue.offer(ele);}System.out.println("開始操作,delayQueue隊列大小為:" + delayQueue.size());// 3 依次取出任務并打印DelayedEle delayedEle = null;try {// 3.1 循環(huán),如果想避免虛假喚醒,則不能把全部元素都打印出來for (; ; ) {// 3.2 獲取過期的任務并打印while ((delayedEle = delayQueue.take()) != null) {System.out.println(delayedEle.toString());}}} catch (InterruptedException e) {e.printStackTrace();}} }首先創(chuàng)建延遲任務DelayedEle類,其中delayTime表示當前任務需要延遲多少ms時間過期,expire則是當前時間的ms值加上delayTime的值。
另外,實現了Delayed接口,實現了long getDelay(TimeUnit unit)方法用來獲取當前元素還剩下多少時間過期,實現了int compareTo(Delayed o)方法用來決定優(yōu)先級隊列元素的比較規(guī)則。
在main函數內首先創(chuàng)建了一個延遲隊列,然后使用隨機數生成器生成了10個延遲任務,最后通過循環(huán)依次獲取延遲任務,并打印。運行上面代碼,一個可能的輸出如下所示。
可見,出隊的順序和delay時間有關,而與創(chuàng)建任務的順序無關。
核心方法&源碼解讀
offer操作
插入元素到隊列,如果插入元素為null則拋出NullPointerException異常,否則由于是無界隊列,所以一直返回true。插入元素要實現Delayed接口。
/*** Inserts the specified element into this delay queue.** @param e the element to add* @return {@code true}* @throws NullPointerException if the specified element is null*/public boolean offer(E e) {final ReentrantLock lock = this.lock; // 1 lock.lock();try {q.offer(e);if (q.peek() == e) {// 2 leader = null;available.signal();}return true;} finally {lock.unlock();}}- 首先獲取獨占鎖,然后添加元素到優(yōu)先級隊列,由于q是優(yōu)先級隊列,所以添加元素后,調用q.peek()方法返回的并不一定是當前添加的元素
- 如果代碼(2)判斷結果為true,則說明當前元素e是最先將過期的,那么重置leader線程為null,這時候激活avaliable變量條件隊列里面的一個線程,告訴它隊列里面有元素了。
take操作
獲取并移除隊列里面延遲時間過期的元素,如果隊列里面沒有過期元素則等待。
/*** Retrieves and removes the head of this queue, waiting if necessary* until an element with an expired delay is available on this queue.** @return the head of this queue* @throws InterruptedException {@inheritDoc}*/public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {// 1 獲取但不移除隊首元素 E first = q.peek();if (first == null) available.await(); // 2 else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0) // 3 return q.poll();first = null; // don't retain ref while waiting if (leader != null) // 4available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread; // 5try {available.awaitNanos(delay); // 6} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && q.peek() != null) // 7 available.signal();lock.unlock();}}-
首先獲取獨占鎖lock。假設線程A第一次調用隊列的take()方法時隊列為空,則執(zhí)行代碼(1)后first==null,所以會執(zhí)行代碼(2)把當前線程放入available的條件隊列里阻塞等待。
-
當有另外一個線程B執(zhí)行offer(item)方法并且添加元素到隊列時,假設此時沒有其他線程執(zhí)行入隊操作,則線程B添加的元素是隊首元素,那么執(zhí)行q.peek()。
-
e這時候就會重置leader線程為null,并且激活條件變量的條件隊列里面的一個線程。此時線程A就會被激活。
-
線程A被激活并循環(huán)后重新獲取隊首元素,這時候first就是線程B新增的元素,可知這時候first不為null,則調用first.getDelay(TimeUnit.NANOSECONDS)方法查看該元素還剩余多少時間就要過期,如果delay<=0則說明已經過期,那么直接出隊返回。
-
否則查看leader是否為null,不為null則說明其他線程也在執(zhí)行take,則把該線程放入條件隊列。如果這時候leader為null,則選取當前線程A為leader線程,
-
然后執(zhí)行代碼(5)等待delay時間(這期間該線程會釋放鎖,所以其他線程可以offer添加元素,也可以take阻塞自己),剩余過期時間到后,線程A會重新競爭得到鎖,然后重置leader線程為null,重新進入循環(huán),這時候就會發(fā)現隊頭的元素已經過期了,則會直接返回隊頭元素。
-
在返回前會執(zhí)行finally塊里面的代碼(7),代碼(7)執(zhí)行結果為true則說明當前線程從隊列移除過期元素后,又有其他線程執(zhí)行了入隊操作,那么這時候調用條件變量的singal方法,激活條件隊列里面的等待線程。
poll操作
獲取并移除隊頭過期元素,如果沒有過期元素則返回null。
/*** Retrieves and removes the head of this queue, or returns {@code null}* if this queue has no elements with an expired delay.** @return the head of this queue, or {@code null} if this* queue has no elements with an expired delay*/public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {E first = q.peek();// 如果隊列為空,或者不為空但是對頭元素沒有過期,則返回nullif (first == null || first.getDelay(NANOSECONDS) > 0)return null;elsereturn q.poll();} finally {lock.unlock();}}首先獲取獨占鎖,然后獲取隊頭元素,如果隊頭元素為null或者還沒過期則返回null,否則返回隊頭元素。
size操作
計算隊列元素個數,包含過期的和沒有過期的。
public int size() {final ReentrantLock lock = this.lock;lock.lock();try {return q.size();} finally {lock.unlock();}}先獲取獨占鎖,然后調用優(yōu)先級隊列的size方法。
小結
DelayQueue隊列內部使用PriorityQueue存放數據,使用ReentrantLock實現線程同步。
另外隊列里面的元素要實現Delayed接口,其中一個是獲取當前元素到過期時間剩余時間的接口,在出隊時判斷元素是否過期了,一個是元素之間比較的接口,因為這是一個有優(yōu)先級的隊列。
《新程序員》:云原生和全面數字化實踐50位技術專家共同創(chuàng)作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的Java Review - 并发编程_DelayQueue原理源码剖析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java Review - 并发编程_P
- 下一篇: Java Review - 并发编程_T