Queue —— JUC 的豪华队列组件
目錄
- 引言
- 一、Queue 的繼承關系
- 1.1 Queue 定義基礎操作
- 1.2 AbstractQueue 為子類減負
- 1.3 BlockingQueue 阻塞式Queue
- 1.4 Deque 兩頭進出
- 二、Queue 的重要實現
- 三、BlockingQueue 的實現原理
- 四、Queue 在生產者消費者模式中的應用
- 五、Queue 在線程池中的應用
- 六、ConcurrentLinkedQueue
- 總結
引言
Queue 是Collection 接口下的另一個重要接口。
常常作為生產者/消費者模式、線程池任務隊列等基礎組件的形式存在。
JUC中提供了豐富的Queue組件,如 ArrayBlockingQueue、LinkedBlockingQueue、ConcurrentLinkedQueue、SynchronousQueue 等等。
學習 Queue 家族對于理解生產者消費者模式、并發、線程池等至關重要。
本篇博客總結 Queue 家族的接口、類,和一些重要的實現,及其部分底層邏輯。
一、Queue 的繼承關系
Queue 接口下面幾個重要的抽象有 AbstractQueue、BlockingQueue、Deque:
1.1 Queue 定義基礎操作
Queue 接口定義了 6 個基本方法:
| 添加 | add() | offer() |
| 移除 | remove() | poll() |
| 查看 | element() | peek() |
1.2 AbstractQueue 為子類減負
AbstractQueue 實現了 Queue 中的 add()、remove()、element(),為它們添加了失敗時拋異常的邏輯:
同時它還實現了 Collection 接口中的一些公共方法,如 clear()、addAll() 等。
大多數常用 Queue 都實現了這個抽象類。
它的作用是為子類填補常規集合所需要的添加、刪除操作的邏輯,讓子類更加專注于某種特性的實現,為子類減負。 由于這些方法是集合通用API,因此提升到抽象類中來實現,
在實際使用子類Queue的時候,這些方法往往不常使用。
1.3 BlockingQueue 阻塞式Queue
BlockingQueue 毫不意外的繼承了 Queue 接口。
并在 Queue 的基礎之上,擴展了兩個新的阻塞式方法:
又是一對添加和取出的組合方法,這兩個方法要求子類必須以阻塞的方式放入和取出元素,所謂阻塞式行為就是由于隊列中的容量限制等因素,導致在隊列已滿或為空時,無法再繼續添加或取出的時候,線程必須等待不返回,直到條件滿足完成操作。
舉個例子:
去飯店吃午飯,但是廚師沒有做好的飯菜(隊列為空),你可以選擇離開去別的飯店(非阻塞直接返回),也可以選擇留下來,等待廚師把飯菜做好,吃完再走(阻塞直到成功)。
在 Queue 中,offer() 和 poll() 都是非阻塞式的方法,put() 和 take() 是阻塞式的方法,我們也可以理解為 put() 就是阻塞式的 offer() ,take() 就是阻塞式的 poll()。
1.4 Deque 兩頭進出
經典的隊列結構是 FIFO,即先進先出,一般都是“尾進頭出”。在一些常規的 Queue 組件的源碼中也經常看到會 setTail 這樣的操作。
為了滿足更豐富的應用場景,Queue 家族引入了 Deque 抽象。
Deque 意為 “雙端隊列”,可以在一端進也可以在這端出:
值得一提的是,對于經典數據結構中的 Stack 棧結構,Deque 也提供了相應的方法:
二、Queue 的重要實現
Queue 的重要實現有以下這些:
| 面向并發 | ConcurrentLinkedQueue | 基于CAS的并發線程安全的隊列容器 |
| 阻塞式 | ArrayBlockingQueue、LinkedBlockingQueue、DelayQueue | LinkedBQ 雖然是用鏈表實現的,但依然是有界隊列,最大值為 Integer.maxValue,DelayQueue 可以在任務提交后延遲執行 |
| 雙端隊列 | LinkedList | 從名字完全看不出是隊列的雙端隊列 |
| 交換隊列 | TransferQueue(阻塞式) | TransferQueue是一個接口,但它只有一個實現:LinkedTransferQueue |
| 同步隊列 | SynchronousQueue(阻塞式) | 不存儲元素的隊列,手遞手傳遞元素 |
這些隊列往往都會繼承 AbstractQueue,同時又兼具其他特性,如 阻塞、延遲 等等。
三、BlockingQueue 的實現原理
凡是支持阻塞操作的隊列,都繼承自 BlockingQueue,是線程安全的隊列容器。
在 ArrayBlockingQueue 和 LinkedBlockingQueue 中,都用到了顯式鎖 Lock 和條件隊列 Condition。
在《Java 并發編程實戰》一書中的 “14.3 顯式的 Condition 對象” 中也舉例了類似的有界緩存代碼。
Condition 的原理是將基于不同的前提條件的依賴性操作區分在多個條件等待隊列中,基于不同條件等待和喚醒。
以 ArrayBlockingQueue 為例,當執行 put 或 take 的時候,首先需要拿到主鎖 lock。
緊接著就是校驗前置條件:count == items.length 或 count == 0,如果條件不滿足,就執行相應 Condition.await。
如果條件滿足,那么執行入隊或出隊方法——enqueue(E) 或 dequeue:
四、Queue 在生產者消費者模式中的應用
在沒有阻塞隊列的情況下,如果想要實現生產者-消費者,必須自行處理生產者的狀態切換和消費者的狀態切換。
而 BlockingQueue 的實現中已經集成了 Condition 和 Lock 等一系列生產者-消費者模式所必須的關鍵元素,讓代碼變得異常簡潔。
不需要考慮生產者或消費者的線程狀態,只要調用了阻塞隊列的 put 和 take,BlockingQueue 會自動為我們維護線程狀態的切換。
以下代碼是使用 ArrayBlockingQueue(5) 實現的生產者消費者模式。
producer 線程負責生產任務,consumer 線程負責消費任務,隊列中只能容納 5 個元素。stateMonitor 線程用于監控任務隊列的滿、空情況,并打印 producer 和 consumer 線程的狀態。
通過調整 producer 和 consumer 線程的工作快慢程度,可以很清晰的感受到 BlockingQueue 的強大!
提示:由于程序中為了模擬生產或消費動作需要一定時間,使用了 sleep ,“動作更快”的線程會打印出 TIMED_WAITING ,它并不是 BlockingQueue 處理成的線程狀態,直接忽略即可。
/*** 使用 BlockingQueue 實現生產者消費者模式** @author 圣斗士Morty* @data 2021/6/27 13:29*/ public class T08_ProducerConsumerMode {/* 調換producer和consumer線程的快慢程度,觀察執行結果*/static int slow = 5, fast = 2;public static void main(String[] args) {// 可容納 5 個任務的有界阻塞隊列BlockingQueue<String> taskQueue = new ArrayBlockingQueue<>(5);// 生產者線程Thread producer = new Thread(() -> {for (int i = 0; ; i++) {try {TimeUnit.SECONDS.sleep(fast);System.out.println("正在生產'任務" + i + "'...");taskQueue.put("任務" + i);} catch (InterruptedException e) {e.printStackTrace();}}});// 消費者線程Thread concumer = new Thread(() -> {for (; ; ) {try {TimeUnit.SECONDS.sleep(slow);String task = taskQueue.take();System.out.println("正在消費'" + task + "'...");} catch (InterruptedException e) {e.printStackTrace();}}});// 狀態監控線程Thread stateMonitor = new Thread(() -> {for (; ; ) {try {TimeUnit.SECONDS.sleep(1);if (taskQueue.isEmpty())System.out.println("消費者狀態:" + concumer.getState());if (taskQueue.size() == 5)System.out.println("生產者狀態:" + producer.getState());} catch (InterruptedException e) {e.printStackTrace();}}});stateMonitor.start();producer.start();concumer.start();} }五、Queue 在線程池中的應用
這里簡單帶一下隊列在線程池中的應用。
juc 中提供了四種常用的線程池實現:
Executors 是類似Collections的工具方法,以上四個方法返回一個 ExecutorService 對象,實際上是它的一個子類 ThreadPoolExecutor 和 ScheduledExecutorService。
以 cachedThreadPool 和 fixedThreadPool 為例:
它們直接返回固定構造結果的 ThreadPoolExecutor,構造器如下:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);}ThreadPoolExecutor 的構造器如上所示,它有 7 個參數:
1、corePoolSize :核心線程數,隨線程池永遠存活的線程數量
2、maximumPoolSize:最大線程數
3、keepAliveTime:空閑時存活時間
4、TimeUnit :存活時間單位,3、4兩個參數共同決定非核心線程可以空閑多久回收給操作系統
5、workQueue:一個 BlockingQueue類型的任務隊列。
6、threadFactory:線程工廠,定義線程的創建方式
7、RejectedExecutionHandler:拒絕策略,當線程池已滿時應該執行怎樣的拒絕策略
cachedThreadPool 使用 SynchronousQueue 作為任務隊列,SynchronousQueue是手遞手式的隊列,即放入的元素都必須直接交給消費者。
通過這樣一個隊列,任何任務提交到 cachedThreadPool 之后,都會立即被消費,如果任務足夠多,可能會打到 Integer.MAX_VALUE 的級別,所以在實際開發中,一定要清楚的知道業務中的任務規模才能很好的使用。
fixedThreadPool 使用 LinkedBlockingQueue 作為任務隊列的實現,這個線程池能夠維持一個固定任務數的線程池。但是為什么不使用 ArrayBlockingQueue 呢?
六、ConcurrentLinkedQueue
ConcurrentLinkedQueue 是針對并發場景下提供的一種線程安全的隊列容器,繼承 AbstractQueue,同時實現了 Queue。
線程安全的非阻塞隊列,沒有支持阻塞的 put 和 take 方法。
內部維護了 Node 鏈表結構。
offer()、poll()等方法都采用了CAS的方式,保證了線程安全性,同時也兼顧了性能:
總結
了解 Queue 的類圖結構非常關鍵,重點記憶 BlockingQueue 的結構和擴展的兩個方法 put() 、take(),這是阻塞式API的關鍵:
| 添加 | add() | offer() | put() |
| 取出 | remove() | poll() | take() |
| 查看 | element() | peek() |
以上表格是Queue api中的重點,需要熟記。
常見的實現隊列:
| 面向并發 | ConcurrentLinkedQueue | 基于CAS的并發線程安全的隊列容器 |
| 阻塞式 | ArrayBlockingQueue、LinkedBlockingQueue、DelayQueue | LinkedBQ 雖然是用鏈表實現的,但依然是有界隊列,最大值為 Integer.maxValue,DelayQueue 可以在任務提交后延遲執行 |
| 雙端隊列 | LinkedList | 從名字完全看不出是隊列的雙端隊列 |
| 交換隊列 | TransferQueue(阻塞式) | TransferQueue是一個接口,但它只有一個實現:LinkedTransferQueue |
| 同步隊列 | SynchronousQueue(阻塞式) | 不存儲元素的隊列,手遞手傳遞元素 |
常見的四種線程池:
| Executors.newCachedThreadPool(); | SynchronousQueue |
| Executors.newFixedThreadPool(10); | LinkedBlockingQueue |
| Executors.newSingleThreadExecutor(); | LinkedBlockingQueue |
| Executors.newScheduledThreadPool(10); | DelayedWorkQueue |
ConcurrentLinkedQueue 是非阻塞并發隊列,它的 CAS操作,offer 為例:
public boolean offer(E e) {// 非空校驗checkNotNull(e);final Node<E> newNode = new Node<E>(e);// 取得 tail 節點,同時令 p 指向它,然后循環CASfor (Node<E> t = tail, p = t;;) {// 取得 tail 的next節點Node<E> q = p.next;// 拿到 tail 的瞬間可能會有其他線程設置了新的 tail,需要判斷 tail 后面是否為 nullif (q == null) {// 如果 q 為 null,說明 p 確實是 tail,此時將新的節點 CAS 到 tail 的后面if (p.casNext(null, newNode)) {// p 指向新的節點if (p != t) // CAS 設置新的 tailcasTail(t, newNode);return true;}// Lost CAS race to another thread; re-read next}// ... 其他邏輯}}總結
以上是生活随笔為你收集整理的Queue —— JUC 的豪华队列组件的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 惠普打印机只打印一半_惠普打印机如何安装
- 下一篇: layui多文件上传讲解_Layui 多