阻塞队列和ArrayBlockingQueue源码解析
什么是阻塞隊列
當隊列中為空時,從隊列總獲取元素的操作將被阻塞,當隊列滿時,向隊列中添加元素的操作將被阻塞。試圖從空的阻塞隊列中獲取元素的線程將會被阻塞,知道其它的線程往隊列中插入新的元素。同樣,試圖往滿的隊列中添加新元素的線程也會被阻塞,直到有其他的線程使隊列重新變的空閑起來。
| 插入方法 | add(e) | offer(e) | put() | offer(e, time, unit) |
| 移除方法 | remove() | poll(e) | take() | poll(time, unit) |
| 檢查方法 | element() | peek() | 無 | 無 |
- 拋出異常:當隊列滿時,再向隊列中插入元素,則會拋出IllegalStateException異常。當隊列空時,再向隊列中獲取元素,則會拋出NoSuchElementException異常。
- 返回特殊值:當隊列滿時,向隊列中添加元素,則返回false,否則返回true。當隊列為空時,向隊列中獲取元素,則返回null,否則返回元素。
- 一直阻塞:當阻塞隊列滿時,如果生產者向隊列中插入元素,則隊列會一直阻塞當前線程,直到隊列可用或響應中斷退出。當阻塞隊列為空時,如果消費者線程向阻塞隊列中獲取數據,則隊列會一直阻塞當前線程,直到隊列空閑或響應中斷退出。
- 超時退出:當隊列滿時,如果生產線程向隊列中添加元素,則隊列會阻塞生產線程一段時間,超過指定的時間則退出返回false。當隊列為空時,消費線程從隊列中移除元素,則隊列會阻塞一段時間,如果超過指定時間退出返回null。
java里的阻塞隊列
LinkedTransferQueue: 一個由鏈表結構組成的無界阻塞隊列,相當于其它隊列,LinkedTransferQueue隊列多了transfer和tryTransfer方法。
- transfer:如果當前有消費線程正在獲取元素,transfer則把元素直接傳給消費線程,否則加入到隊列中,知道該元素被消費才返回。
- tryTransfer:如果當前有消費這正在獲取元素,tryTransfer則把元素直接傳給消費線程,否則立即返回false;
LinkedBlockingQueue: 一個由鏈表結構組成的雙向阻塞隊列。隊列頭部和尾部都可以添加和移除元素,多線程并發時,可以將鎖的競爭最多降到一半。
ArrayBlockingQueue 的源碼解析
ArrayBlockingQueue類的結構如下:
public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {private static final long serialVersionUID = -817911632652898426L;final Object[] items; //用數據來存儲元素的容器int takeIndex; //下一次讀取或移除的位置(remove、poll、take )int putIndex; //下一次存放元素的位置(add、offer、put)int count; //隊列中元素的總數final ReentrantLock lock; //所有訪問的保護鎖private final Condition notEmpty; //等待獲取元素的條件private final Condition notFull; //等待存放元素的條件略...可以看出ArrayBlockingQueue內部使用final修飾的對象數組來存儲元素,一旦初始化數組,數組的大小就不可改變。使用ReentrantLock鎖來保證鎖競爭,使用Condition來控制插入或獲取元素時,線程是否阻塞。
public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;//獲得支持響應中斷的鎖lock.lockInterruptibly();try {//使用while循環來判斷隊列是否已滿,防止假喚醒while (count == items.length)notFull.await();enqueue(e);} finally {lock.unlock();}}首先獲得鎖,然后判斷隊列是否已滿,如果已滿則阻塞當前生成線程,直到隊列中空閑時,被喚醒操作。隊列空閑則調用enqueue 插入元素。
private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;//把當前元素插入到數組中去items[putIndex] = x;//這里可以看出這個數組是個環形數組if (++putIndex == items.length)putIndex = 0;count++;// 喚醒在notEmpty條件上等待的線程 notEmpty.signal();}把元素插入到隊列中去,可以看出這個隊列中的數組是環形數組結構,這樣每次插入、移除的時候不需要復制移動數組中的元素。
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;//獲得可響應中斷鎖lock.lockInterruptibly();try {//使用while循環來判斷隊列是否已滿,防止假喚醒while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}}消費者線程從阻塞隊列中獲取元素,如果隊列中元素為空,則阻塞當前的消費者線程直到有數據時才調用dequeue方法獲取元素。否則直接調用dequeue方法獲取元素
private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")//獲取元素E x = (E) items[takeIndex];//將當前位置的元素設置為nullitems[takeIndex] = null;//這里可以看出這個數組是個環形數組if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)//修改迭代器參數itrs.elementDequeued();// 喚醒在notFull條件上等待的線程 notFull.signal();return x;}直接從數據中獲取items[takeIndex]的元素,并設置當前位置的元素為null,并設置下一次takeIndex的坐標(++takeIndex),隊列元素總數-1等操作。
public boolean offer(E e) {checkNotNull(e);final ReentrantLock lock = this.lock;//獲得不可響應中斷的鎖lock.lock();try {if (count == items.length)return false;else {//enqueue(e);return true;}} finally {lock.unlock();}}首先判斷隊列中的元素是否已滿,如果已滿則直接返回false,否則調用enqueue方法向隊列中插入元素,插入成功返回true。
public E poll() {final ReentrantLock lock = this.lock;//獲得不可響應中斷的鎖lock.lock();try {return (count == 0) ? null : dequeue();} finally {lock.unlock();}}判斷隊列是否為空,如果為空返回null,否則調用dequeue方法返回元素。
public boolean add(E e) {if (offer(e))return true;elsethrow new IllegalStateException("Queue full");}首先調用offer方法插入元素,插入成功返回true,否則拋出IllegalStateException異常。
public E remove() {E x = poll();if (x != null)return x;elsethrow new NoSuchElementException();}首先調用poll方法獲取元素,如果不為空則直接返回,否則拋出NoSuchElementException異常。
public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {checkNotNull(e);//得到超時的時間long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;//獲得可響應中斷的鎖lock.lockInterruptibly();try {while (count == items.length) {if (nanos <= 0)return false;nanos = notFull.awaitNanos(nanos);}enqueue(e);return true;} finally {lock.unlock();}}首先判斷隊列是否已滿,如果已滿再循環判斷超時時間是否超時,超時則直接返回false,否則阻塞該生產線程nanos時間,如果nanos時間之內喚醒則調用enqueue方法插入元素。如果隊列不滿則直接調用enqueue方法插入元素,并返回true。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {//得到超時的時間long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;//獲得可響應中斷的鎖lock.lockInterruptibly();try {while (count == 0) {if (nanos <= 0)return null;nanos = notEmpty.awaitNanos(nanos);}return dequeue();} finally {lock.unlock();}}首先循環判斷隊列是否為空,如果為空再判斷是否超時,超時則返回null。不超時則等待,在nanos時間喚醒則調用dequeue方法獲取元素。
public E element() {E x = peek();if (x != null)return x;elsethrow new NoSuchElementException();}調用peek方法獲取元素,元素不為空則返回,否則拋出NoSuchElementException異常。
public E peek() {final ReentrantLock lock = this.lock;lock.lock();try {return itemAt(takeIndex); // null when queue is empty} finally {lock.unlock();}}final E itemAt(int i) {return (E) items[i];}調用itemAt方法獲取元素。
其它的阻塞隊列實現原理都類似,都是使用ReentrantLock和Condition來完成并發控制、阻塞的。
????本人簡書blog地址:http://www.jianshu.com/u/1f0067e24ff8
????點擊這里快速進入簡書
總結
以上是生活随笔為你收集整理的阻塞队列和ArrayBlockingQueue源码解析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: HashMap 源码解析(JDK1.8)
- 下一篇: JAVA对象在JVM中内存分配