让人抓头的Java并发(四) 阻塞队列--CPU飙升排查案例
在上一篇文章中給大家介紹了牛批的AQS,大致講解了JUC中同步的思路。本來還沒想好這一篇應(yīng)該寫點(diǎn)什么,剛好上周某個(gè)同事的代碼出現(xiàn)問題,排查后發(fā)現(xiàn)是使用阻塞隊(duì)列不當(dāng)導(dǎo)致的,所以本篇決定介紹下阻塞隊(duì)列。
真實(shí)案例分析
錯(cuò)誤案例:
說來也是挺巧的,那天一位同事iMac換了Macbook Pro。然后像往常一樣啟動(dòng)了各個(gè)服務(wù),過了會(huì)電腦風(fēng)扇瘋狂工作發(fā)出響聲,由于平常iMac上IDEA項(xiàng)目開的比較多占用較多內(nèi)存時(shí)間長(zhǎng)了也會(huì)卡頓,所以他并沒有在意。但是之后一直是這樣我們便覺得很奇怪,然后打開了他的活動(dòng)監(jiān)視器,發(fā)現(xiàn)某個(gè)Java進(jìn)程竟然占用了百分之九十的CPU,然后確認(rèn)是哪一個(gè)項(xiàng)目,最后通過jstack查看該項(xiàng)目中的線程情況,定位到了某個(gè)自定義線程,然后查看代碼發(fā)現(xiàn)如下:
MyThreadPool.exportEnclosurePool.execute(() -> {while (true) {BlockingQueue<EnclosureRequest> blockingQueue = requestQueue.getBlockingQueue();while (!blockingQueue.isEmpty()) {System.out.println("開始消費(fèi)");EnclosureRequest one = null;try {one = blockingQueue.take();ossService.exportEnclosureToLocalServer(one.getEnclosureList(), one.getSobId(), one.getUserUuid(), one.getUserName(), one.getTmpFileName(), one.getZipUuidList());} catch (Exception e) {e.printStackTrace();}}} }復(fù)制代碼該同事的需求是做一個(gè)隊(duì)列化附件導(dǎo)出的功能,因此他選擇了生產(chǎn)者消費(fèi)者模式,采用阻塞隊(duì)列來實(shí)現(xiàn);但是由于對(duì)此不太熟悉,所以寫出了這段有問題的代碼,導(dǎo)致死循環(huán);萬幸的是這段代碼在測(cè)試分支上被我們發(fā)現(xiàn)了并沒有上正式。正確的消費(fèi)者代碼實(shí)現(xiàn)如下:
正確實(shí)現(xiàn):
MyThreadPool.exportEnclosurePool.execute(() -> {BlockingQueue<EnclosureRequest> blockingQueue = requestQueue.getBlockingQueue();while (true) {try {EnclosureRequest one = blockingQueue.take();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("開始消費(fèi)");ossService.exportEnclosureToLocalServer(one.getEnclosureList(), one.getSobId(), one.getUserUuid(), one.getUserName(), one.getTmpFileName(), one.getZipUuidList());} } 復(fù)制代碼阻塞隊(duì)列簡(jiǎn)介
阻塞隊(duì)列是一個(gè)插入和移除方法支持附加操作的隊(duì)列;- 支持阻塞的插入方法:當(dāng)阻塞隊(duì)列滿時(shí),隊(duì)列會(huì)阻塞插入元素的線程,直到隊(duì)列不為滿。
- 支持阻塞的移除方法:當(dāng)阻塞隊(duì)列為空時(shí),獲取隊(duì)列元素的線程會(huì)被阻塞直到隊(duì)列不為空。
四種處理方式:
| 拋出異常 | add(e) | remove() |
| 返回boolean值 | offer(e) | poll() |
| 阻塞 | put(e) | take() |
| 超時(shí)退出 | offer(e,time,unit) | poll(time,unit) |
?小提示: 如果是無界阻塞隊(duì)列,隊(duì)列不可能出現(xiàn)滿的情況,所以使用put()方法永遠(yuǎn)不會(huì)被阻塞,使用offer()方法永遠(yuǎn)返回true
Java中的阻塞列隊(duì)介紹
- ArrayBlockingQueue:基于數(shù)組的有界阻塞隊(duì)列,支持配置公平性策略。
- LinkedBlockingQueue:基于鏈表的無界(默認(rèn)Integer.MAX_VALUE)阻塞隊(duì)列,Executors中newFixedThreadPool()和newSingleThreadExecutor()使用的工作隊(duì)列,所以不推薦使用Executors。
- LinkedBlockingDeque:基于鏈表的無界(默認(rèn)Integer.MAX_VALUE)雙向阻塞隊(duì)列
- LinkedTransferQueue:基于鏈表的無界阻塞隊(duì)列,該隊(duì)列提供transfer(e)方法,如果有消費(fèi)者正在等待則直接把元素給消費(fèi)者,否者將元素放在隊(duì)列的tail節(jié)點(diǎn)并阻塞到該元素被消費(fèi)。
- PriorityBlockingQueue:支持優(yōu)先級(jí)排序的無界阻塞隊(duì)列,默認(rèn)情況下采用自然順序升序排序,也可以通過類重寫compareTo()方法來指定元素排序規(guī)則,或者初始化隊(duì)列時(shí)指定構(gòu)造參數(shù)Comparator來排序。
- DelayQueue:使用PriorityQueue實(shí)現(xiàn)的無界延時(shí)阻塞隊(duì)列。
- SynchronousQueue:不存儲(chǔ)元素的阻塞隊(duì)列,每一個(gè)put操作必須阻塞到一個(gè)take操作發(fā)生,否則不能繼續(xù)添加元素。支持配置公平性策略。
阻塞隊(duì)列(LinkedBlockingQueue)實(shí)現(xiàn)原理分析
LinkedBlockingQueue是一個(gè)由成員變量Node組成的單鏈表結(jié)構(gòu),默認(rèn)容量為Integer的最大值,其內(nèi)部還有兩把ReentrantLock鎖putLock、takeLock用于保證插入和刪除的線程安全(其他阻塞隊(duì)列中使用一個(gè)ReentrantLock鎖),兩個(gè)Condition等待隊(duì)列notEmpty、notFull用于存放take()和put()阻塞的線程。這里我簡(jiǎn)單分析下它兩個(gè)比較重要的方法put()和take()。
源碼分析
/*** 由Node節(jié)點(diǎn)組成單鏈表結(jié)構(gòu)*/ static class Node<E> {E item;Node<E> next;Node(E x) { item = x; } } /** 用于移除操作的鎖 */ private final ReentrantLock takeLock = new ReentrantLock();/** 阻塞于take的等待隊(duì)列 */ private final Condition notEmpty = takeLock.newCondition();/** 用于插入操作的鎖 */ private final ReentrantLock putLock = new ReentrantLock();/** 阻塞于put的等待隊(duì)列 */ private final Condition notFull = putLock.newCondition();/*** 不指定容量默認(rèn)是Integer的最大值*/ public LinkedBlockingQueue() {this(Integer.MAX_VALUE); }/*** 阻塞式插入元素(隊(duì)列為滿則阻塞)*/ public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;// 獲取插入鎖(響應(yīng)中斷)putLock.lockInterruptibly();try {// 如果當(dāng)前隊(duì)列長(zhǎng)度到達(dá)容量上限則當(dāng)前線程釋放鎖加入不為滿等待隊(duì)列中while (count.get() == capacity) {notFull.await();}// 將元素加入隊(duì)尾enqueue(node);// 當(dāng)前隊(duì)列長(zhǎng)度加一(返回值是加一之前)c = count.getAndIncrement();// 如果加入后隊(duì)列長(zhǎng)度小于容量上限則通知不為滿等待隊(duì)列中的線程if (c + 1 < capacity)notFull.signal();} finally {// 釋放鎖putLock.unlock();}// 如果在插入元素之前隊(duì)列為空則通知不為空等待隊(duì)列中的線程if (c == 0)signalNotEmpty(); } /*** 阻塞式移除元素(隊(duì)列為空則阻塞)*/ public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;// 獲取移除鎖(響應(yīng)中斷)takeLock.lockInterruptibly();try {// 如果當(dāng)前隊(duì)列為空則當(dāng)前線程釋放鎖加入不為空等待隊(duì)列while (count.get() == 0) {notEmpty.await();}// 移除隊(duì)頭元素x = dequeue();c = count.getAndDecrement();// 如果移除之后還有元素則通知不為空等待隊(duì)列中的線程if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}// 如果移除元素之前到達(dá)容量上線則通知不為滿等待隊(duì)列中的線程if (c == capacity)signalNotFull();return x;}復(fù)制代碼圖解分析
需要注意的是put()操作將元素加入隊(duì)列后釋放鎖是在判斷容量是否小于上限通知notFull等待隊(duì)列之后,通知notEmpty隊(duì)列之前需要先獲取takeLock,take()操作同理。
?小提示: LinkedBlockingQueue的put()和take()方法中和其他阻塞隊(duì)列有個(gè)很大的區(qū)別。其他阻塞隊(duì)列每次put()和take()都會(huì)去通知相應(yīng)的等待隊(duì)列,但是LinkedBlockingQueue只有在put前是空的去通知notEmpty,take前是滿的去通知notFull等待隊(duì)列,并且put后未滿去通知notFull等待隊(duì)列,take后未空去通知notEmpty等待隊(duì)列。關(guān)于這點(diǎn)我個(gè)人的理解是由于LinkedBlockingQueue里分讀寫鎖,如果每次take都通知notFull的話,需要另外去獲取putLock產(chǎn)生競(jìng)爭(zhēng);用已經(jīng)獲取putLock的線程去喚醒notFull等待隊(duì)列中線程減少了鎖的競(jìng)爭(zhēng)。其他阻塞隊(duì)列中只有一把鎖,所以通知不需要另外競(jìng)爭(zhēng)鎖。當(dāng)然這只是我個(gè)人的看法而已,希望有了解的小伙伴指教。
總結(jié)
阻塞隊(duì)列在并發(fā)中很重要,前面介紹的線程池中就用到了阻塞隊(duì)列,生產(chǎn)者消費(fèi)者模型也是可以用阻塞隊(duì)列實(shí)現(xiàn),到此已經(jīng)介紹了AQS、阻塞隊(duì)列、線程池,希望你們能關(guān)聯(lián)起來理解加深印象。
往期文章:
- 讓人抓頭的Java并發(fā)(三) 強(qiáng)大的AQS!
- 讓人抓頭的Java并發(fā)(二) 線程池ThreadPoolExecutor分析
- 讓人抓頭的Java并發(fā)(一) 輕松認(rèn)識(shí)多線程
歡迎同樣有感興趣的小伙伴一起探討
轉(zhuǎn)載于:https://juejin.im/post/5d2801066fb9a07ed524cbab
總結(jié)
以上是生活随笔為你收集整理的让人抓头的Java并发(四) 阻塞队列--CPU飙升排查案例的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 手动设计简单的Token验证
- 下一篇: Linux下关机、重启