DelayQueue源码
介紹
一個實現PriorityBlockingQueue實現延遲獲取的無界隊列,在創建元素時,可以指定多久才能從隊列中獲取當前元素。只有延時期滿后才能從隊列中獲取元素。(DelayQueue可以運用在以下應用場景:1.緩存系統的設計:可以用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。2.定時任務調度。使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,從比如TimerQueue就是使用DelayQueue實現的。)
數據結構
public interface Delayed extends Comparable<Delayed> {/*** 返回與此對象相關的剩余延遲時間,以給定的時間單位表示*/long getDelay(TimeUnit unit); }?getDelay方法一般用內部存儲的事件,減去當前事件,即為剩余延遲事件
屬性
private final transient ReentrantLock lock = new ReentrantLock();private final PriorityQueue<E> q = new PriorityQueue<E>();/***用于優化內部阻塞通知的線程*/private Thread leader = null;private final Condition available = lock.newCondition();以支持優先級的PriorityQueue無界隊列作為一個容器,因為元素都必須實現Delayed接口,可以根據元素的過期時間來對元素進行排列,因此,先過期的元素會在隊首,每次從隊列里取出來都是最先要過期的元素。
leader是一個Thread元素,它在offer和take中都有使用,它代表當前獲取到鎖的消費者線程,
DelayQueue實現Leader-Folloer pattern
 1、當存在多個take線程時,同時只生效一個,即,leader線程
 2、當leader存在時,其它的take線程均為follower,其等待是通過condition實現的
 3、當leader不存在時,當前線程即成為leader,在delay之后,將leader角色釋放還原
 4、最后如果隊列還有內容,且leader空缺,則調用一次condition的signal,喚醒掛起的take線程,其中之一將成為新的leader
 5、最后在finally中釋放鎖
方法實現
offer,poll,peek
public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e);//如果插入元素是第一個元素if (q.peek() == e) {//leader設置為nullleader = null;//喚醒available.signal();}return true;} finally {lock.unlock();}}public boolean offer(E e, long timeout, TimeUnit unit) {return offer(e);}public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {E first = q.peek();//如果未到期,則返回null,否則刪除if (first == null || first.getDelay(NANOSECONDS) > 0)return null;elsereturn q.poll();} finally {lock.unlock();}}public E poll(long timeout, TimeUnit unit) throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {E first = q.peek();if (first == null) {if (nanos <= 0)return null;elsenanos = available.awaitNanos(nanos);} else {long delay = first.getDelay(NANOSECONDS);//到期,則pollif (delay <= 0)return q.poll();if (nanos <= 0)return null;first = null; // don't retain ref while waitingif (nanos < delay || leader != null)//nanos<delay,表示超時剩余時間小于到期時間,nanos = available.awaitNanos(nanos);else {Thread thisThread = Thread.currentThread();//設置當前線程為leaderleader = thisThread;try {//等待條件long timeLeft = available.awaitNanos(delay);//剩余超時時間nanos -= delay - timeLeft;} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && q.peek() != null)available.signal();lock.unlock();}}public E peek() {final ReentrantLock lock = this.lock;lock.lock();try {return q.peek();} finally {lock.unlock();}}put,take
/*** Retrieves and removes the head of this queue, waiting if necessary* until an element with an expired delay is available on this queue.** @return the head of this queue* @throws InterruptedException {@inheritDoc}*/public E take() throws InterruptedException {final ReentrantLock lock = this.lock;// 獲取可中斷鎖。lock.lockInterruptibly();try {for (;;) {// 從優先級隊列中獲取隊列頭元素E first = q.peek();if (first == null)// 無元素,當前線程加入等待隊列,并阻塞available.await();else {// 通過getDelay 方法獲取延遲時間long delay = first.getDelay(NANOSECONDS);if (delay <= 0)// 延遲時間到期,獲取并刪除頭部元素。return q.poll();first = null; // don't retain ref while waitingif (leader != null)available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread;try {// 線程節點進入等待隊列 x 納秒。available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {// leader == null且還存在元素的話,喚醒一個消費線程。if (leader == null && q.peek() != null)available.signal();lock.unlock();}}public void put(E e) {offer(e);}take()方法邏輯:
1.獲取鎖
 2.取出優先級隊列q的首元素
 3.如果元素q的隊首/隊列為空,阻塞
 3.如果元素q的隊首(first)不為空,獲得這個元素的delay時間值,如果first的延遲delay時間值為0的話,說明該元素已經到了可以使用的時間,調用poll方法彈出該元素,跳出方法
 4.如果first的延遲delay時間值不為0的話,釋放元素first的引用,避免內存泄露
 5.循環以上操作,直到return
leader作用
如果leader不為null,說明已經有消費者線程拿到鎖,直接阻塞當前線程,如果leader為null,把當前線程賦值給leader,并等待剩余的到期時間,最后釋放leader,這里我們想象著我們有個多個消費者線程用take方法去取,如果沒有leader!=null的判斷,這些線程都會無限循環,直到返回第一個元素,很顯然很浪費資源。所以leader的作用是設置一個標記,來避免消費者的無腦競爭。
參考
- ReentrantLock源碼
- Java并發包--阻塞隊列(BlockingQueue)
- PriorityBlockingQueue源碼
總結
以上是生活随笔為你收集整理的DelayQueue源码的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: PriorityBlockingQueu
- 下一篇: LinkedBlockingDeque源
