java 超时集合_Java之集合(二十三)SynchronousQueue
1.前言
本章介紹阻塞隊列SynchronousQueue。之前介紹過LinkedTransferQueue,特點提供了讓生產者知道消費者消費了其產出,沒消費就等待的模式,本章介紹的這個類則必須是生產者生產后消費者消費了才會繼續下去,反之亦然,消費者必須等待生產者產出。SynchronousQueue只有這一種模式,而LinkedTransferQueue是可選的,SynchronousQueue不存儲元素,像接力棒一樣,沒有交接就一直等。所以其特定是沒有容量,不能peek查看,如果沒有消費者不能插入,不能遍歷,該隊列表現的就像一個空的集合。同樣的,該隊列不接受空元素。默認情況下,線程的等待喚醒是非公平的,可以設置成公平模式,保證線程是先入先出(先到先得)。通常兩種模式的性能差不多,非公平模式可以維持更多的線程,公平模式則支持更高的吞吐量。
2.SynchronousQueue
2.1 實現原理
該類的實現是基于dual stack和dual queue算法,dual queue在LinkedTransferQueue中介紹過。queue和stack都包含數據節點和請求節點,其特點就是任何操作都能明確當前隊列的所處模式(數據--沒有被消費者消費或請求--沒有生產者)。stack和queue都繼承自抽象類Transferer,其定義了唯一方法transfer用來put或者take,在dual數據結構中,定義成一個方法原因在于put和take操作是對稱的。
SynchronousQueue的實現與原算法有些不同的地方:1、原算法使用bit-marked指針,這里使用mode bits,導致了一系列的改動。2、SynchronousQueue會阻塞線程等待到裝滿。3、通過超時和中斷,支持取消操作,包括清除所有取消節點/線程,避免垃圾存留或內存損耗。
阻塞操作大多通過LockSupport類的park或unpark方法,除非是在多核CPU上該節點看起來是下一個首個填滿的結點,通過自旋一位。在非常忙碌的隊列中,自旋可以顯著提升吞吐量。cleaning操作在queue和stack中不同,queue中remove操作是O(1)時間,但是stack為O(n)時間。
2.2 數據結構
Transferer就是上面所說的抽象類,里面只有一個方法。后面也有Stack和Queue的實現。
NCPUS:當前主機CPU核數
maxTimedSpins:限時等待阻塞前自旋的次數,單核為0,多核32
maxUntimedSpins:不限時等待阻塞前自旋的次數,maxTimedSpins * 16
spinForTimeoutThreshold:納秒數,這個為了更快的自旋而不是使用park時間。初略估計足夠了,默認1000
transferer:具體使用的實現對象。
通過構造函數可以看出,公平模式使用的是queue,非公平模式使用的是stack,默認非公平。
2.3 基本操作
該類的基本操作都是基于transferer實現的,所以這里就不進行介紹。
存入取出的不同之處只在于第一參數是否是null,不為null就是存入,為null就是取出。所以該隊列也不能存入null元素。其它的方法都是空。
2.4 TransferQueue
數據結構和之前所講LinkedTransferQueue基本一致,方法也類似。主要看transfer(E,boolean,long)方法?;镜乃惴ň褪茄h做兩件事情:1、如果隊列為空或者持有相同的模式的結點,嘗試添加隊列結點,等待fulfilled或cancelled,并返回匹配項。2、如果隊列不為空,放入的和其模式相反,即可以匹配就通過CAS操作填充該節點的item字段并出隊列,返回匹配項。
代碼過長不給出,描述一下相關過長:
1、通過E來判斷當前調用是一個什么模式的結點。
2、死循環處理:
1.頭尾節點存在null,為初始化進行循環。
2.隊列為空或模式一致:
判斷t是否是當前的尾,不是意味丟失尾,重新循環
判斷當前尾的下一個是否為null,不為null就是尾結點滯后了,重新設置尾結點,重新循環
不等待就返回null
創建該節點
設置尾結點的下一個節點失敗,被搶先,重新循環
成功重置尾結點。
進行等待指定時間。
超時被取消,清除返回null。
丟失順序,重置頭
返回結果。
3.隊列不為空且模式不一致:
頭結點的下一個節點,如果為空或者頭尾結點被改變了,讀取不一致重新循環。
此刻沒有亂序,取出節點的item,進行CAS操作判斷是否被搶先了,被搶先了移除該節點,繼續循環嘗試。
成功了移除該節點,解除waiter的等待。
2.5 TransferStack
stack的結點數據結構,和queue的有些不同,就是多了一個match結點。node有四個方法:1、CAS設置next結點。2、CAS設置match結點,返回匹配結構。3、取消當前結點。4、返回當前結點是否取消。
transfer方法的邏輯和queue的類似,stack的transfer循環需要做三件事情:1、如果棧為空或者模式相同,生成結點入棧等待匹配,返回結果或空如果超時。2、如果棧不為空且模式不同,匹配等待的結點,兩個都出棧,返回匹配值。由于其他線程可能執行第3點,匹配或者斷開連接可能不是必須的。3、如果棧頂元素匹配成功,幫助其出棧匹配,然后繼續循環。
整個流程就是上面3點,其他的照著看代碼應該較為簡單,和queue的思路差不多。由于第三點需要幫助其他線程出棧,這個過程可能被其它后到線程搶先,所以是非公平的。
3.使用例子
@Test
public void testSynchronous() {
SynchronousQueue queue = new SynchronousQueue<>();
System.out.println(queue.offer(1));// 立即返回,必須要有消費者
System.out.println(queue.poll());// 立即返回,必須要有生產者
long start = System.currentTimeMillis();
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+"-" +
queue.take()+",耗時:"+(System.currentTimeMillis()-start)); // 沒有生產者一直阻塞
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName()+"-" + queue.take());
Thread.sleep(1500);
System.out.println(Thread.currentThread().getName()+"-" + queue.poll(1, TimeUnit.SECONDS));
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
},"consumer").start();
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName()+"-等待2被消耗:"+queue.offer(2));
System.out.println(Thread.currentThread().getName()+"-等待3被消耗:");
long one = System.currentTimeMillis();
queue.put(3);
System.out.println("3被消耗,耗時:" + (System.currentTimeMillis() - one));
System.out.println(Thread.currentThread().getName()+"-等待4被消耗:" +
queue.offer(4, 1, TimeUnit.SECONDS));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"prodcuer").start();
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
總結
以上是生活随笔為你收集整理的java 超时集合_Java之集合(二十三)SynchronousQueue的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 论文英文参考文献[10]的时候后面多空格
- 下一篇: !!基础---c# 下载网页+图片