[源码]解析 SynchronousQueue 上界,下界.. 数据保存和数据传递. 堵塞队列. 有无频繁await?...
生活随笔
收集整理的這篇文章主要介紹了
[源码]解析 SynchronousQueue 上界,下界.. 数据保存和数据传递. 堵塞队列. 有无频繁await?...
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
簡析SynchronousQueue。LinkedBlockingQueue(兩個locker,更快),ArrayBlockingQueue(一個locker,讀寫都競爭) 三者都是blockingQueue. 對于blockingQueue的堵塞和非堵塞方法對注記方案: * oppo(oppo手機)是一對,offer和poll不堵塞 * ppt是一對.put和take都堵塞. 解析源碼之前先實戰(zhàn)看下SynchronousQueue.
public?static?void?main(String[]?args)?throws?InterruptedException?{ ??????????final?SynchronousQueue<Long>?workQueue?=?new?SynchronousQueue<Long>(); ????????boolean?offer?=?workQueue.offer(2L);?//?offer不能放入,前面無堵塞線程,本身也不堵塞不會放入到堵塞線程隊列中 ????????System.out.println("main?thread:?offer="?+?offer); ????????Long?poll?=?workQueue.poll();?// 不能放poll出元素為null,前面無堵塞線程,本身也不堵塞不會放入到堵塞線程隊列中 ????????System.out.println("main?thread:?poll="?+?poll); ? ? ? ? ExecutorService?newCachedThreadPool?=?Executors.newFixedThreadPool(4);?// 內(nèi)部源代碼實現(xiàn)是:new?ThreadPoolExecutor(0,Integer.MAX_VALUE,?60L,?TimeUnit.SECONDS,new?SynchronousQueue<Runnable>());? ? ? ? ????????System.out.println("/**=====================以下是隊列是?take類型========*/"); ????????newCachedThreadPool.execute(new?Runnable()?{ ????????????@Override ????????????public?void?run()?{ ????????????????try?{ ????????????????????System.out ????????????????????????????.println("take?thread?1:?begin?take?and?thread?will?be?blocked?by?call??park(await)?method"); ????????????????????Long?take?=?workQueue.take(); ????????????????????System.out ????????????????????????????.println("take?thread?1:???take?suceffull?take?object=" ????????????????????????????????????+?take); ????????????????}?catch?(InterruptedException?e1)?{ ????????????????????//?TODO?Auto-generated?catch?block ????????????????????e1.printStackTrace(); ????????????????} ????????????} ????????}); ????????newCachedThreadPool.execute(new?Runnable()?{ ????????????@Override ????????????public?void?run()?{ ????????????????try?{ ????????????????????System.out ????????????????????????????.println("take?thread?2:?begin?take?and?thread?will?be?blocked?by?call??park(await)?method"); ????????????????????Long?take?=?workQueue.take(); ????????????????????System.out ????????????????????????????.println("take?thread?2:???take?suceffull?take?object=" ????????????????????????????????????+?take); ????????????????}?catch?(InterruptedException?e1)?{ ????????????????????//?TODO?Auto-generated?catch?block ????????????????????e1.printStackTrace(); ????????????????} ????????????} ????????}); ????????Thread.sleep(1000); ?????????poll?=?workQueue.poll(); ????????System.out.println("main?thread:?隊列是take類型.?同類型操作分兩種:1.堵塞take會堵塞,增加隊列中?2.非堵塞操作poll失敗.?Queue接口poll失敗,??poll="?+?poll); ?????????offer?=?workQueue.offer(2L);??//? ????????System.out.println("main?thread:?隊列是take類型.?非同類型無論堵塞不堵塞都成功.?非同類型非堵塞操作Queue接口?offer?成功??"?+?offer); ????????Thread.sleep(2000); ????????long?object?=?123L; ????????System.out.println("put?thead:?begin?put?"?+?object); ????????try?{ ????????????workQueue.put(object); ????????}?catch?(InterruptedException?e)?{ ????????????e.printStackTrace(); ????????} ????????System.out ????????????????.println("put?thead:?隊列是take類型.?非同類型無論堵塞不堵塞都成功.??blockingQueue?堵塞接口?put調(diào)用未堵塞,調(diào)用成功?SynchronousQueue?will?unpark(notify)?the?take?thread?"); ???? ????????System.out.println("/**=====================以下是?Put類型========*/"); ???? ????????System.out.println("/*先堵塞兩個put*/"); ????????newCachedThreadPool.execute(new?Runnable()?{ ????????????@Override ????????????public?void?run()?{ ????????????????long?object?=?123L; ????????????????System.out.println("put?thead?1:?begin?put?"?+?object); ????????????????try?{ ????????????????????workQueue.put(object); ????????????????}?catch?(InterruptedException?e)?{ ????????????????????e.printStackTrace(); ????????????????} ????????????????System.out ????????????????????????.println("put?thead?1:?finish?put?sucefully?,?SynchronousQueue?will?unpark(notify)?the?take?thread?"); ????????????} ????????}); ????????newCachedThreadPool.execute(new?Runnable()?{ ????????????@Override ????????????public?void?run()?{ ????????????????long?object?=?123L; ????????????????System.out.println("put?thead?2:?begin?put?"?+?object); ????????????????try?{ ????????????????????workQueue.put(object); ????????????????}?catch?(InterruptedException?e)?{ ????????????????????e.printStackTrace(); ????????????????} ????????????????System.out ????????????????????????.println("put?thead?2:?finish?put?sucefully?,?SynchronousQueue?will?unpark(notify)?the?take?thread?"); ????????????} ????????}); ????????Thread.sleep(2000); ????????System.out.println("/*試圖offer*/"); ?????????offer?=?workQueue.offer(2L);??//? ????????????System.out.println("main?thread:?隊列是put類型.??同類型操作分兩種:1.堵塞put會堵塞,增加隊列中?2.非堵塞操作offer失敗.??非堵塞操作Queue接口?offer?成功??"?+?offer); ???????????? ??????????????poll?=?workQueue.poll(); ????????????System.out.println("main?thread:?隊列是put類型.?非同類型無論堵塞不堵塞都成功.??poll="?+?poll); ???????????? ????Long?take?=?workQueue.take(); ????System.out ????????????.println("main?thread:?隊列是put類型.?非同類型無論堵塞不堵塞都成功.?BlockingQueue堵塞接口take調(diào)用未堵塞,?take?suceffull?take?object=" ????????????????????+?take); ????????newCachedThreadPool.shutdown(); ?????}? ? ?? 輸出: main?thread:?offer=false main?thread:?poll=null /**=====================以下是隊列是?take類型========*/ take?thread?1:?begin?take?and?thread?will?be?blocked?by?call??park(await)?method take?thread?2:?begin?take?and?thread?will?be?blocked?by?call??park(await)?method main?thread:?隊列是take類型.?同類型操作分兩種:1.堵塞take會堵塞,增加隊列中?2.非堵塞操作poll失敗.?Queue接口poll失敗,??poll=null main?thread:?隊列是take類型.?非同類型無論堵塞不堵塞都成功.?非同類型非堵塞操作Queue接口?offer?成功?
依據(jù)上面的代碼能夠簡單總結(jié)SynchronousQueue 的queue特點: 1. queue有三種類型:?空類型,take以及put類型, 分別說明. 1.1. 空類型時, take和put都會被堵塞,?非堵塞offer和poll都不會堵塞. 無法成功操作. 1.2.?隊列是某種類型時, ? ? ??同類型操作:?分兩種:?1.堵塞method會堵塞,增加堵塞隊列中?2.非堵塞method操作失敗 . ? ? ?非同類型:?無論堵塞不堵塞method都成功. 詳細說來: 1.2.1. 隊列是take類型時,?.? 同類型操作:堵塞take操作會堵塞,非堵塞 poll()失敗,? 非同類型:?異類的put和offer可以成功. 1.2.2.?隊列是put類型.,? ? ??? ??? ??同類型操作:堵塞put操作會被堵塞,非堵塞offer操作失敗.? ? ??? ??? ??非同類型:?異類的take,poll會成功2. 關(guān)于隊列長度,界: 一個線程堵塞隊列要么全是take線程,要么全是put線程.?后來的互補的操作將會匹配對頭的線程.使退出隊列. 1. 里面沒有不論什么數(shù)據(jù).調(diào)用offer()無法成功,返回flase,表示填充失敗.不會被堵塞. 2. 先take,被堵塞,直到有一個線程來offer. 兩個不同的互補碰撞發(fā)生匹配完畢(fullfill). 之前的take的線程被喚醒獲得offer的提供的數(shù)據(jù). 再來解析SynchronousQueue 源碼, SynchronousQueue?源碼凝視中關(guān)鍵術(shù)語解析: ?Node類型:?一共分層兩種. 一種是 isDate=true. (相應(yīng)offer , put 函數(shù)) 一種是isDate=false (相應(yīng) take函數(shù)) dual queue:dual的含義就好理解了,由于僅僅有兩類,能夠當isDate=true和isDate=false遇到時會匹配.可翻譯為成雙的,對偶的. 對偶隊列. same mode:?同樣的模式(isDate都=true,或者isDate都=false).比方take產(chǎn)生的Node和前面已經(jīng)放入到隊列中的take動作Node就屬于同一個模式 complementary :互補的.比方先take,放到隊列中.后面來一個offer動作就是complementary (互補).反之亦然. fulfill:?字面英文翻譯,完畢.詳細到算法里的含義是一個動作和之前的complementary(譯為互補)的動作得到匹配. ============= SynchronousQueue以下的一個部分凝視部分翻譯. /* ??*?A?dual?queue?(and?similarly?stack)?is?one?that?at?any?given ?????*?time?either?holds?"data"?--?items?provided?by?put?operations, ?????*?or?"requests"?--?slots?representing?take?operations,?or?is ?????*?empty.?A?call?to?"fulfill"?(i.e.,?a?call?requesting?an?item ?????*?from?a?queue?holding?data?or?vice?versa)?dequeues?a ?????*?complementary?node.??The?most?interesting?feature?of?these ?????*?queues?is?that?any?operation?can?figure?out?which?mode?the ?????*?queue?is?in,?and?act?accordingly?without?needing?locks. 不論什么一個操作 put/take都會產(chǎn)生一個節(jié)點,抓住數(shù)據(jù)(假設(shè)是take,數(shù)據(jù)為null). 一個實現(xiàn)"匹配"的調(diào)用將會將互補的節(jié)點退出隊列. 最有趣的是不論什么一個操作都能指出眼下的隊列處于何種類型(注:隊列里要么全是take線程,要么全是put線程).而且不須要鎖. ? ? ? ?? ??//?以下的凝視比較了java實現(xiàn)的算法和借鑒的算法(見凝視中網(wǎng)址)有何差別 ? ? ??*?The?algorithms?here?differ?from?the?versions?in?the?above?paper ?????*?in?extending?them?for?use?in?synchronous?queues,?as?well?as ?????*?dealing?with?cancellation.?The?main?differences?include:
?????*??1.?The?original?algorithms?used?bit-marked?pointers,?but ?????*?????the?ones?here?use?mode?bits?in?nodes,?leading?to?a?number ?????*?????of?further?adaptations. ?????*??2.?SynchronousQueues?must?block?threads?waiting?to?become ?????*?????fulfilled. 必須等待匹配的線程.?fulfilled-完畢的意思. ?????*??3.?Support?for?cancellation?via?timeout?and?interrupts, ?????*?????including?cleaning?out?cancelled?nodes/threads ?????*?????from?lists?to?avoid?garbage?retention?and?memory?depletion. //可以取消或者中斷 ? ? ?* =============再來看看SynchronousQueue.TransferQueue.transfer以下的凝視.========= /** 當隊列是空,或者是同一種Mode時,直接放入到列隊尾.不會完畢(fullfill) * 2. If queue apparently contains waiting items, and this 我一開始沒理解的一點是: 當一個head是 isDate=false , tail是isDate=true時, 一個線程進來的操作是isDate=false時. 不會進入①,進入②后看代碼又無法和head完畢匹配(fullfill). 后來想明確了,這樣的情況不會發(fā)生.由于tail是isDate=true,這個會與head完畢匹配(fullfill).換句話說. 隊列里tail和head肯定是same mode.所以當①推斷失敗,進入②后, /** ?????????*?Puts?or?takes?an?item. ?????????*/ ????????Object?transfer(Object?e,?boolean?timed,?long?nanos)?{ ????????????/*?Basic?algorithm?is?to?loop?trying?to?take?either?of ?????????????*?two?actions: ?????????????* ?????????????*?1.?If?queue?apparently?empty?or?holding?same-mode?nodes, ?????????????*????try?to?add?node?to?queue?of?waiters,?wait?to?be ?????????????*????fulfilled?(or?cancelled)?and?return?matching?item. ?????????????* ??//?same-mode指的是take還是put.. 假設(shè)空或者是同類型,那么就放入隊列堵塞等待 ?????????????*?2.?If?queue?apparently?contains?waiting?items,?and?this ?????????????*????call?is?of?complementary?mode,?try?to?fulfill?by?CAS'ing ?????????????*????item?field?of?waiting?node?and?dequeuing?it,?and?then ?????????????*????returning?matching?item. ?????????????* ??// 假設(shè)是不同,剛好是互補(complementary)節(jié)點,那么剛好可以匹配(fulfill?) ?????????????*?In?each?case,?along?the?way,?check?for?and?try?to?help ?????????????*?advance?head?and?tail?on?behalf?of?other?stalled/slow ?????????????*?threads. ?????????????* ?????????????*?The?loop?starts?off?with?a?null?check?guarding?against ?????????????*?seeing?uninitialized?head?or?tail?values.?This?never ?????????????*?happens?in?current?SynchronousQueue,?but?could?if ?????????????*?callers?held?non-volatile/final?ref?to?the ?????????????*?transferer.?The?check?is?here?anyway?because?it?places ?????????????*?null?checks?at?top?of?loop,?which?is?usually?faster ?????????????*?than?having?them?implicitly?interspersed. ?????????????*/ ????????????QNode?s?=?null;?//?constructed/reused?as?needed ????????????boolean?isData?=?(e?!=?null); ????????????for?(;;)?{ ????????????????QNode?t?=?tail; ????????????????QNode?h?=?head; ????????????????if?(t?==?null?||?h?==?null)?????????//?saw?uninitialized?value ????????????????????continue;???????????????????????//?spin ????????????????if?(h?==?t?||?t.isData?==?isData)?{?//?empty?or?same-mode?隊列里假設(shè)都是take線程.此線程還是take調(diào)用,進入該分支 ????????????????????QNode?tn?=?t.next; ????????????????????if?(t?!=?tail)??????????????????//?inconsistent?read ????????????????????????continue; ????????????????????if?(tn?!=?null)?{???????????????//?lagging?tail ????????????????????????advanceTail(t,?tn); ????????????????????????continue; ????????????????????} ????????????????????if?(timed?&&?nanos?<=?0)????????//?can't?wait ????????????????????????return?null; ????????????????????if?(s?==?null) ????????????????????????s?=?new?QNode(e,?isData); ????????????????????if?(!t.casNext(null,?s))????????//?failed?to?link?in ????????????????????????continue; ????????????????????advanceTail(t,?s);??????????????//?swing?tail?and?wait ????????????????????Object?x?=?awaitFulfill(s,?e,?timed,?nanos); ????????????????????if?(x?==?s)?{???????????????????//?wait?was?cancelled ????????????????????????clean(t,?s); ????????????????????????return?null; ????????????????????} ????????????????????if?(!s.isOffList())?{???????????//?not?already?unlinked ????????????????????????advanceHead(t,?s);??????????//?unlink?if?head ????????????????????????if?(x?!=?null)??????????????//?and?forget?fields ????????????????????????????s.item?=?s; ????????????????????????s.waiter?=?null; ????????????????????} ????????????????????return?(x?!=?null)?
0.堵塞有幾種?
LinkedBlockingQueue,ArrayBlockingQueue: 有數(shù)據(jù)隊列和堵塞線程隊列 1. 數(shù)據(jù)隊列有最大長度,有界,默認是Integer.Max;? 2. 數(shù)據(jù)隊列達到上界,下界時,對對應(yīng)的堵塞方法有影響. LinkedBlockingQueue 兩個locker,更復(fù)雜.?ArrayBlockingQueue一個locker,兩個Condition. SynchronousQueue : 最重要的差別:? 外在: 一個有容量>=1,依賴緩沖區(qū)線程之間交換數(shù)據(jù). 一個無容量須要線程時時產(chǎn)生數(shù)據(jù)節(jié)點來接受傳遞數(shù)據(jù). 內(nèi)部:數(shù)據(jù)讀取匹配的方式不同:?BlockingQueue是與已存放在隊列上的數(shù)據(jù)配對,?SynchronousQueue是與已堵塞的線程配對(一個線程id相應(yīng)著一個數(shù)據(jù),這是?SynchronousQueue特有的特點) 對于LinkedBlockingQueue。ArrayBlockingQueue,有數(shù)據(jù)隊列,也有線程堵塞隊列?.??數(shù)據(jù)配對即可 ? ??? ? 對于SynchronousQueue?,無數(shù)據(jù)隊列,僅僅有線程堵塞隊列/stack.?與已堵塞的線程配對即可. 要理解上面這句話.這幾個問題思考下. 1.?LinkedBlockingQueue 和?SynchronousQueue是否有界? ?前者有上界,下界>=1. 后者上界,下界都是0.(無緩沖層就無法傳遞數(shù)據(jù),將數(shù)據(jù)巧妙地保存在了由線程調(diào)用時產(chǎn)生的節(jié)點上(和線程同生共死).) 2.?LinkedBlockingQueue?和?SynchronousQueue 的有幾種堵塞線程? 其各自的隊列數(shù)據(jù)結(jié)構(gòu)有何不同? ? ? 總結(jié): 前者堵塞隊列上無數(shù)據(jù),后者堵塞隊列上有數(shù)據(jù)和操作類型.兩者在堵塞時都利用堆棧的局部變量來臨時保存數(shù)據(jù)和傳遞數(shù)據(jù). 前者的利用排它鎖,堵塞數(shù)據(jù)隊列上無數(shù)據(jù), AbstractQueueSyncronizer( 可能是公平也可能是非公平鎖,插入瞬間非公平 ), 堵塞時將數(shù)據(jù)保存在內(nèi)存堆棧局部變量上,每次獲得鎖后將數(shù)據(jù)傳遞給數(shù)據(jù)隊列,? ? ? ?后者獲得匹配后,綜合匹配的線程數(shù)據(jù),返回非null數(shù)據(jù). 并改動匹配線程的數(shù)據(jù)且喚醒被匹配的堵塞線程.被匹配的堵塞線程依據(jù)其堵塞隊列上的新數(shù)據(jù)和原線程內(nèi)存堆棧上的局部變量數(shù)據(jù)(重點,難點)綜合返回非null數(shù)據(jù).?詳細見源碼凝視.
第二個問題引出的另外一個話題是因為SynchronousQueue?已經(jīng)沒有了數(shù)據(jù)隊列緩沖區(qū),導(dǎo)致SynchronousQueue 中繼承Queue的put方法的語義都量變到質(zhì)變了. ? ? ? BlockingQueue 繼承自Queue的put ? ? ? ? javaDoc : ??Inserts the specified element into this queue, waiting if necessary forspaceto become available.
? ? ? SynchronousQueue??繼承自Queue的put javaDoc : ??Adds the specified element to this queue, waiting if necessary foranother thread to receive it. 數(shù)據(jù)存放不同: 因為?LinkedBlockingQueue。ArrayBlockingQueue代碼實現(xiàn)上通過數(shù)據(jù)隊列轉(zhuǎn)發(fā)數(shù)據(jù)的. 故這兩者不能設(shè)置queye長度的最大值為0. 兩者通過堆內(nèi)存?zhèn)鬟f,notifyAll堵塞線程.?假設(shè)數(shù)據(jù)隊列是0,數(shù)據(jù)就無法傳遞了. SynchronousQueue 無數(shù)據(jù)隊列,那么數(shù)據(jù)怎樣傳輸呢??代碼實現(xiàn)上其數(shù)據(jù)是直接通過堆內(nèi)存直接傳遞給堵塞線程. 線程1被堵塞將數(shù)據(jù)同一時候存放在線程堆棧上的局部變量以及和線程id綁定的隊列節(jié)點(是field屬性,狀態(tài))上.?線程2來匹配,會改變原堵塞線程的堆內(nèi)存的值,使得原堵塞線程可以獲取兩份數(shù)據(jù).這兩份數(shù)據(jù)中肯定有份是生產(chǎn)者提供的數(shù)據(jù),一份是消費者偽造的假數(shù)據(jù),通過標示為推斷終于得到生產(chǎn)者得到的數(shù)據(jù). 對于SynchronousQueue? ?最大值是0,也沒有其它線程生成數(shù)據(jù)節(jié)點,put時無法存放數(shù)據(jù),?讓put一開始就進入了堵塞.? 對于LinkedBlockingQueue,ArrayBlockingQueue, put僅僅有在數(shù)據(jù)隊列滿了才會堵塞.
應(yīng)用場景: 以Executors.newCachedThreadPool()為例,CachedThreadPool特點: 有任務(wù)時可以無限制生成線程,無任務(wù)時也可以高速回收線程. 用線程不斷生成替代了緩沖隊列.?該javadoc上,明白說明了適合大批量的小任務(wù). 不適合一下子大量,一下子又無數(shù)據(jù). 不太適合生產(chǎn)者生產(chǎn)速率動蕩變化,每一個任務(wù)都非常長的場景. </pre><pre name="code" class="java">public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } 利用了SynchronousQueue.offer和take的調(diào)用配合實現(xiàn)了CachedThreadPool();相比blockingQueue長處:?Syncronize一個堵塞隊列.+ 死循環(huán)cas(compare and swap),不會頻繁的線程掛起和喚醒(park和unpark) ? ?和new ThreadPoolExecutor時設(shè)置coreSize=0,linkedblockingQueue 容量=1 的差別是后者維護一個size=1的堵塞隊列,隊列常常在滿和空之間切換,須要頻繁的線程掛起和喚醒(park和unpark) ? ? ?? 強烈建議打開eclipse相應(yīng)的源碼,.jdk1.6 里的src.zip 源碼包1 public void execute(Runnable command) { 2 if (command == null) 3 throw new NullPointerException(); 4 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { 5 if (runState == RUNNING && workQueue.offer(command)) { 6 if (runState != RUNNING || poolSize == 0) 7 ensureQueuedTaskHandled(command); 8 } 9 else if (!addIfUnderMaximumPoolSize(command)) 10 reject(command); // is shutdown or saturated 11 } 12 }
? ?剛開始execute(runable),queue.offer()失敗,產(chǎn)生新的線程運行任務(wù).一直不停的offer,產(chǎn)生新的線程.到后面老的線程運行完成會調(diào)用take()第一次execute(runable),第5行queue.offer()失敗,進入第9行產(chǎn)生max配額的新線程運行任務(wù)runnable.后面不停的execute(),一直不停的offer失敗,產(chǎn)生max配額的新線程去運行runnable.直到第一次的線程運行任務(wù)完成后會調(diào)用SynchronousQueue.take(). 這樣假設(shè)再有execute(),第5行就能匹配成功新的offer就能配對成功,runnable實例被老的線程獲取運行,不會去新建線程.這個實現(xiàn)了動態(tài)的線程池.所以java才說適合大批量的小的異步任務(wù)(These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks.) Executors.固定大小的線程的優(yōu)點是.線程資源是有限的,每一個線程512k?-Xss???? 每一個線程的Stack大小,默認堆棧.避免有些無限制增加線程池的問題.沒有提供可配置的BlockingQueue容量大小.
public?static?void?main(String[]?args)?throws?InterruptedException?{ ??????????final?SynchronousQueue<Long>?workQueue?=?new?SynchronousQueue<Long>(); ????????boolean?offer?=?workQueue.offer(2L);?//?offer不能放入,前面無堵塞線程,本身也不堵塞不會放入到堵塞線程隊列中 ????????System.out.println("main?thread:?offer="?+?offer); ????????Long?poll?=?workQueue.poll();?// 不能放poll出元素為null,前面無堵塞線程,本身也不堵塞不會放入到堵塞線程隊列中 ????????System.out.println("main?thread:?poll="?+?poll); ? ? ? ? ExecutorService?newCachedThreadPool?=?Executors.newFixedThreadPool(4);?// 內(nèi)部源代碼實現(xiàn)是:new?ThreadPoolExecutor(0,Integer.MAX_VALUE,?60L,?TimeUnit.SECONDS,new?SynchronousQueue<Runnable>());? ? ? ? ????????System.out.println("/**=====================以下是隊列是?take類型========*/"); ????????newCachedThreadPool.execute(new?Runnable()?{ ????????????@Override ????????????public?void?run()?{ ????????????????try?{ ????????????????????System.out ????????????????????????????.println("take?thread?1:?begin?take?and?thread?will?be?blocked?by?call??park(await)?method"); ????????????????????Long?take?=?workQueue.take(); ????????????????????System.out ????????????????????????????.println("take?thread?1:???take?suceffull?take?object=" ????????????????????????????????????+?take); ????????????????}?catch?(InterruptedException?e1)?{ ????????????????????//?TODO?Auto-generated?catch?block ????????????????????e1.printStackTrace(); ????????????????} ????????????} ????????}); ????????newCachedThreadPool.execute(new?Runnable()?{ ????????????@Override ????????????public?void?run()?{ ????????????????try?{ ????????????????????System.out ????????????????????????????.println("take?thread?2:?begin?take?and?thread?will?be?blocked?by?call??park(await)?method"); ????????????????????Long?take?=?workQueue.take(); ????????????????????System.out ????????????????????????????.println("take?thread?2:???take?suceffull?take?object=" ????????????????????????????????????+?take); ????????????????}?catch?(InterruptedException?e1)?{ ????????????????????//?TODO?Auto-generated?catch?block ????????????????????e1.printStackTrace(); ????????????????} ????????????} ????????}); ????????Thread.sleep(1000); ?????????poll?=?workQueue.poll(); ????????System.out.println("main?thread:?隊列是take類型.?同類型操作分兩種:1.堵塞take會堵塞,增加隊列中?2.非堵塞操作poll失敗.?Queue接口poll失敗,??poll="?+?poll); ?????????offer?=?workQueue.offer(2L);??//? ????????System.out.println("main?thread:?隊列是take類型.?非同類型無論堵塞不堵塞都成功.?非同類型非堵塞操作Queue接口?offer?成功??"?+?offer); ????????Thread.sleep(2000); ????????long?object?=?123L; ????????System.out.println("put?thead:?begin?put?"?+?object); ????????try?{ ????????????workQueue.put(object); ????????}?catch?(InterruptedException?e)?{ ????????????e.printStackTrace(); ????????} ????????System.out ????????????????.println("put?thead:?隊列是take類型.?非同類型無論堵塞不堵塞都成功.??blockingQueue?堵塞接口?put調(diào)用未堵塞,調(diào)用成功?SynchronousQueue?will?unpark(notify)?the?take?thread?"); ???? ????????System.out.println("/**=====================以下是?Put類型========*/"); ???? ????????System.out.println("/*先堵塞兩個put*/"); ????????newCachedThreadPool.execute(new?Runnable()?{ ????????????@Override ????????????public?void?run()?{ ????????????????long?object?=?123L; ????????????????System.out.println("put?thead?1:?begin?put?"?+?object); ????????????????try?{ ????????????????????workQueue.put(object); ????????????????}?catch?(InterruptedException?e)?{ ????????????????????e.printStackTrace(); ????????????????} ????????????????System.out ????????????????????????.println("put?thead?1:?finish?put?sucefully?,?SynchronousQueue?will?unpark(notify)?the?take?thread?"); ????????????} ????????}); ????????newCachedThreadPool.execute(new?Runnable()?{ ????????????@Override ????????????public?void?run()?{ ????????????????long?object?=?123L; ????????????????System.out.println("put?thead?2:?begin?put?"?+?object); ????????????????try?{ ????????????????????workQueue.put(object); ????????????????}?catch?(InterruptedException?e)?{ ????????????????????e.printStackTrace(); ????????????????} ????????????????System.out ????????????????????????.println("put?thead?2:?finish?put?sucefully?,?SynchronousQueue?will?unpark(notify)?the?take?thread?"); ????????????} ????????}); ????????Thread.sleep(2000); ????????System.out.println("/*試圖offer*/"); ?????????offer?=?workQueue.offer(2L);??//? ????????????System.out.println("main?thread:?隊列是put類型.??同類型操作分兩種:1.堵塞put會堵塞,增加隊列中?2.非堵塞操作offer失敗.??非堵塞操作Queue接口?offer?成功??"?+?offer); ???????????? ??????????????poll?=?workQueue.poll(); ????????????System.out.println("main?thread:?隊列是put類型.?非同類型無論堵塞不堵塞都成功.??poll="?+?poll); ???????????? ????Long?take?=?workQueue.take(); ????System.out ????????????.println("main?thread:?隊列是put類型.?非同類型無論堵塞不堵塞都成功.?BlockingQueue堵塞接口take調(diào)用未堵塞,?take?suceffull?take?object=" ????????????????????+?take); ????????newCachedThreadPool.shutdown(); ?????}? ? ?? 輸出: main?thread:?offer=false main?thread:?poll=null /**=====================以下是隊列是?take類型========*/ take?thread?1:?begin?take?and?thread?will?be?blocked?by?call??park(await)?method take?thread?2:?begin?take?and?thread?will?be?blocked?by?call??park(await)?method main?thread:?隊列是take類型.?同類型操作分兩種:1.堵塞take會堵塞,增加隊列中?2.非堵塞操作poll失敗.?Queue接口poll失敗,??poll=null main?thread:?隊列是take類型.?非同類型無論堵塞不堵塞都成功.?非同類型非堵塞操作Queue接口?offer?成功?
?true
take?thread?2:???take?suceffull?take?object=2 put?thead:?begin?put?123 take?thread?1:???take?suceffull?take?object=123 put?thead:?隊列是take類型.?非同類型無論堵塞不堵塞都成功.??blockingQueue?堵塞接口?put調(diào)用未堵塞,調(diào)用成功?SynchronousQueue?will?unpark(notify)?the?take?thread? /**=====================以下是?Put類型========*/ /*先堵塞兩個put*/ put?thead?1:?begin?put?123 put?thead?2:?begin?put?123 /*試圖offer*/ main?thread:?隊列是put類型.??同類型操作分兩種:1.堵塞put會堵塞,增加隊列中?2.非堵塞操作offer失敗.??非堵塞操作Queue接口?offer?成功??false main?thread:?隊列是put類型.?非同類型無論堵塞不堵塞都成功.??poll=123 main?thread:?隊列是put類型.?非同類型無論堵塞不堵塞都成功.?BlockingQueue堵塞接口take調(diào)用未堵塞,?take?suceffull?take?object=123 put?thead?2:?finish?put?sucefully?,?SynchronousQueue?will?unpark(notify)?the?take?thread? put?thead?1:?finish?put?sucefully?,?SynchronousQueue?will?unpark(notify)?the?take?thread?依據(jù)上面的代碼能夠簡單總結(jié)SynchronousQueue 的queue特點: 1. queue有三種類型:?空類型,take以及put類型, 分別說明. 1.1. 空類型時, take和put都會被堵塞,?非堵塞offer和poll都不會堵塞. 無法成功操作. 1.2.?隊列是某種類型時, ? ? ??同類型操作:?分兩種:?1.堵塞method會堵塞,增加堵塞隊列中?2.非堵塞method操作失敗 . ? ? ?非同類型:?無論堵塞不堵塞method都成功. 詳細說來: 1.2.1. 隊列是take類型時,?.? 同類型操作:堵塞take操作會堵塞,非堵塞 poll()失敗,? 非同類型:?異類的put和offer可以成功. 1.2.2.?隊列是put類型.,? ? ??? ??? ??同類型操作:堵塞put操作會被堵塞,非堵塞offer操作失敗.? ? ??? ??? ??非同類型:?異類的take,poll會成功2. 關(guān)于隊列長度,界: 一個線程堵塞隊列要么全是take線程,要么全是put線程.?后來的互補的操作將會匹配對頭的線程.使退出隊列. 1. 里面沒有不論什么數(shù)據(jù).調(diào)用offer()無法成功,返回flase,表示填充失敗.不會被堵塞. 2. 先take,被堵塞,直到有一個線程來offer. 兩個不同的互補碰撞發(fā)生匹配完畢(fullfill). 之前的take的線程被喚醒獲得offer的提供的數(shù)據(jù). 再來解析SynchronousQueue 源碼, SynchronousQueue?源碼凝視中關(guān)鍵術(shù)語解析: ?Node類型:?一共分層兩種. 一種是 isDate=true. (相應(yīng)offer , put 函數(shù)) 一種是isDate=false (相應(yīng) take函數(shù)) dual queue:dual的含義就好理解了,由于僅僅有兩類,能夠當isDate=true和isDate=false遇到時會匹配.可翻譯為成雙的,對偶的. 對偶隊列. same mode:?同樣的模式(isDate都=true,或者isDate都=false).比方take產(chǎn)生的Node和前面已經(jīng)放入到隊列中的take動作Node就屬于同一個模式 complementary :互補的.比方先take,放到隊列中.后面來一個offer動作就是complementary (互補).反之亦然. fulfill:?字面英文翻譯,完畢.詳細到算法里的含義是一個動作和之前的complementary(譯為互補)的動作得到匹配. ============= SynchronousQueue以下的一個部分凝視部分翻譯. /* ??*?A?dual?queue?(and?similarly?stack)?is?one?that?at?any?given ?????*?time?either?holds?"data"?--?items?provided?by?put?operations, ?????*?or?"requests"?--?slots?representing?take?operations,?or?is ?????*?empty.?A?call?to?"fulfill"?(i.e.,?a?call?requesting?an?item ?????*?from?a?queue?holding?data?or?vice?versa)?dequeues?a ?????*?complementary?node.??The?most?interesting?feature?of?these ?????*?queues?is?that?any?operation?can?figure?out?which?mode?the ?????*?queue?is?in,?and?act?accordingly?without?needing?locks. 不論什么一個操作 put/take都會產(chǎn)生一個節(jié)點,抓住數(shù)據(jù)(假設(shè)是take,數(shù)據(jù)為null). 一個實現(xiàn)"匹配"的調(diào)用將會將互補的節(jié)點退出隊列. 最有趣的是不論什么一個操作都能指出眼下的隊列處于何種類型(注:隊列里要么全是take線程,要么全是put線程).而且不須要鎖. ? ? ? ?? ??//?以下的凝視比較了java實現(xiàn)的算法和借鑒的算法(見凝視中網(wǎng)址)有何差別 ? ? ??*?The?algorithms?here?differ?from?the?versions?in?the?above?paper ?????*?in?extending?them?for?use?in?synchronous?queues,?as?well?as ?????*?dealing?with?cancellation.?The?main?differences?include:
?????*??1.?The?original?algorithms?used?bit-marked?pointers,?but ?????*?????the?ones?here?use?mode?bits?in?nodes,?leading?to?a?number ?????*?????of?further?adaptations. ?????*??2.?SynchronousQueues?must?block?threads?waiting?to?become ?????*?????fulfilled. 必須等待匹配的線程.?fulfilled-完畢的意思. ?????*??3.?Support?for?cancellation?via?timeout?and?interrupts, ?????*?????including?cleaning?out?cancelled?nodes/threads ?????*?????from?lists?to?avoid?garbage?retention?and?memory?depletion. //可以取消或者中斷 ? ? ?* =============再來看看SynchronousQueue.TransferQueue.transfer以下的凝視.========= /** 當隊列是空,或者是同一種Mode時,直接放入到列隊尾.不會完畢(fullfill) * 2. If queue apparently contains waiting items, and this 我一開始沒理解的一點是: 當一個head是 isDate=false , tail是isDate=true時, 一個線程進來的操作是isDate=false時. 不會進入①,進入②后看代碼又無法和head完畢匹配(fullfill). 后來想明確了,這樣的情況不會發(fā)生.由于tail是isDate=true,這個會與head完畢匹配(fullfill).換句話說. 隊列里tail和head肯定是same mode.所以當①推斷失敗,進入②后, /** ?????????*?Puts?or?takes?an?item. ?????????*/ ????????Object?transfer(Object?e,?boolean?timed,?long?nanos)?{ ????????????/*?Basic?algorithm?is?to?loop?trying?to?take?either?of ?????????????*?two?actions: ?????????????* ?????????????*?1.?If?queue?apparently?empty?or?holding?same-mode?nodes, ?????????????*????try?to?add?node?to?queue?of?waiters,?wait?to?be ?????????????*????fulfilled?(or?cancelled)?and?return?matching?item. ?????????????* ??//?same-mode指的是take還是put.. 假設(shè)空或者是同類型,那么就放入隊列堵塞等待 ?????????????*?2.?If?queue?apparently?contains?waiting?items,?and?this ?????????????*????call?is?of?complementary?mode,?try?to?fulfill?by?CAS'ing ?????????????*????item?field?of?waiting?node?and?dequeuing?it,?and?then ?????????????*????returning?matching?item. ?????????????* ??// 假設(shè)是不同,剛好是互補(complementary)節(jié)點,那么剛好可以匹配(fulfill?) ?????????????*?In?each?case,?along?the?way,?check?for?and?try?to?help ?????????????*?advance?head?and?tail?on?behalf?of?other?stalled/slow ?????????????*?threads. ?????????????* ?????????????*?The?loop?starts?off?with?a?null?check?guarding?against ?????????????*?seeing?uninitialized?head?or?tail?values.?This?never ?????????????*?happens?in?current?SynchronousQueue,?but?could?if ?????????????*?callers?held?non-volatile/final?ref?to?the ?????????????*?transferer.?The?check?is?here?anyway?because?it?places ?????????????*?null?checks?at?top?of?loop,?which?is?usually?faster ?????????????*?than?having?them?implicitly?interspersed. ?????????????*/ ????????????QNode?s?=?null;?//?constructed/reused?as?needed ????????????boolean?isData?=?(e?!=?null); ????????????for?(;;)?{ ????????????????QNode?t?=?tail; ????????????????QNode?h?=?head; ????????????????if?(t?==?null?||?h?==?null)?????????//?saw?uninitialized?value ????????????????????continue;???????????????????????//?spin ????????????????if?(h?==?t?||?t.isData?==?isData)?{?//?empty?or?same-mode?隊列里假設(shè)都是take線程.此線程還是take調(diào)用,進入該分支 ????????????????????QNode?tn?=?t.next; ????????????????????if?(t?!=?tail)??????????????????//?inconsistent?read ????????????????????????continue; ????????????????????if?(tn?!=?null)?{???????????????//?lagging?tail ????????????????????????advanceTail(t,?tn); ????????????????????????continue; ????????????????????} ????????????????????if?(timed?&&?nanos?<=?0)????????//?can't?wait ????????????????????????return?null; ????????????????????if?(s?==?null) ????????????????????????s?=?new?QNode(e,?isData); ????????????????????if?(!t.casNext(null,?s))????????//?failed?to?link?in ????????????????????????continue; ????????????????????advanceTail(t,?s);??????????????//?swing?tail?and?wait ????????????????????Object?x?=?awaitFulfill(s,?e,?timed,?nanos); ????????????????????if?(x?==?s)?{???????????????????//?wait?was?cancelled ????????????????????????clean(t,?s); ????????????????????????return?null; ????????????????????} ????????????????????if?(!s.isOffList())?{???????????//?not?already?unlinked ????????????????????????advanceHead(t,?s);??????????//?unlink?if?head ????????????????????????if?(x?!=?null)??????????????//?and?forget?fields ????????????????????????????s.item?=?s; ????????????????????????s.waiter?=?null; ????????????????????} ????????????????????return?(x?!=?null)?
?x?:?e;
????????????????}?else?{????????????????????????????//?complementary-mode ????????????????????QNode?m?=?h.next;???????????????//?node?to?fulfill//進入該分支,由于整個隊列的節(jié)點類型都一樣,肯定能和head完畢匹配(fulfill) ????????????????????if?(t?!=?tail?||?m?==?null?||?h?!=?head) ????????????????????????continue;???????????????????//?inconsistent?read ????????????????????Object?x?=?m.item; ????????????????????if?(isData?==?(x?!=?null)?||????//?m?already?fulfilled ????????????????????????x?==?m?||???????????????????//?m?cancelled ????????????????????????!m.casItem(x,?e))?{?????????//?lost?CAS??//這一步非常非常重要,成功把被匹配線程的數(shù)據(jù)節(jié)點改成了自己的節(jié)點,實現(xiàn)了傳輸數(shù)據(jù) ????????????????????????advanceHead(h,?m);??????????//?dequeue?and?retry? ????????????????????????continue; ????????????????????} ????????????????????advanceHead(h,?m);??????????????//?successfully?fulfilled?// 完畢匹配,將被匹配的線程退出原隊列 ????????????????????LockSupport.unpark(m.waiter); ??// 喚醒被匹配的線程 ????????????????????return?(x?!=?null)??x?:?e;
????????????????} ????????????} ????????}0.堵塞有幾種?
?1. lock獲取不到鎖堵塞. 2. 獲取到鎖可是await堵塞.然后又釋放鎖(見 <[源代碼]Condition的原理,簡單案例(ArrayBlockingQueue),復(fù)雜案例(LinkedBlockingQueue).>) 1.?LinkedBlockingQueue 和?SynchronousQueue是否有界,上界,下界范圍是多少?? 2.?LinkedBlockingQueue?和?SynchronousQueue 的有幾種堵塞線程? 前者一個線程堵塞隊列(封裝在reentrantLock內(nèi),使用者不知),一個條件隊列(封裝在ConditionObject內(nèi),使用者不知). 條件隊列signal后把線程會放到堵塞隊列里,見wiz<Condition的原理,簡單案例(ArrayBlockingQueue),復(fù)雜案例(LinkedBlockingQueue).> . 后者僅僅有一個線程堵塞隊列. 其各自的堵塞隊列數(shù)據(jù)結(jié)構(gòu)有何不同?前者無數(shù)據(jù),后者有數(shù)據(jù).? 2.?LinkedBlockingQueue?和?SynchronousQueue怎樣在線程之間傳遞數(shù)據(jù)?
前者獲取鎖后放置到數(shù)據(jù)隊列中,然后unpark鎖.后者將數(shù)據(jù)傳遞給線程節(jié)點上的數(shù)據(jù)引用.使讀線程解鎖后能讀取到.盡管SynchronousQueue沒有了數(shù)據(jù)隊列,用每一個線程持有一個數(shù)據(jù)替代了.
3.?LinkedBlockingQueue.先offer,再poll 和?SynchronousQueue?先offer,再poll有何差別??? 4.?LinkedBlockingQueue 先put,再take?和?SynchronousQueue?先put,再take有何差別? 前者能夠先放在數(shù)據(jù)隊列上.后者沒有地方來接受他的數(shù)據(jù),必須等待到有一個take線程產(chǎn)生的數(shù)據(jù)節(jié)點來接受數(shù)據(jù).LinkedBlockingQueue,ArrayBlockingQueue: 有數(shù)據(jù)隊列和堵塞線程隊列 1. 數(shù)據(jù)隊列有最大長度,有界,默認是Integer.Max;? 2. 數(shù)據(jù)隊列達到上界,下界時,對對應(yīng)的堵塞方法有影響. LinkedBlockingQueue 兩個locker,更復(fù)雜.?ArrayBlockingQueue一個locker,兩個Condition. SynchronousQueue : 最重要的差別:? 外在: 一個有容量>=1,依賴緩沖區(qū)線程之間交換數(shù)據(jù). 一個無容量須要線程時時產(chǎn)生數(shù)據(jù)節(jié)點來接受傳遞數(shù)據(jù). 內(nèi)部:數(shù)據(jù)讀取匹配的方式不同:?BlockingQueue是與已存放在隊列上的數(shù)據(jù)配對,?SynchronousQueue是與已堵塞的線程配對(一個線程id相應(yīng)著一個數(shù)據(jù),這是?SynchronousQueue特有的特點) 對于LinkedBlockingQueue。ArrayBlockingQueue,有數(shù)據(jù)隊列,也有線程堵塞隊列?.??數(shù)據(jù)配對即可 ? ??? ? 對于SynchronousQueue?,無數(shù)據(jù)隊列,僅僅有線程堵塞隊列/stack.?與已堵塞的線程配對即可. 要理解上面這句話.這幾個問題思考下. 1.?LinkedBlockingQueue 和?SynchronousQueue是否有界? ?前者有上界,下界>=1. 后者上界,下界都是0.(無緩沖層就無法傳遞數(shù)據(jù),將數(shù)據(jù)巧妙地保存在了由線程調(diào)用時產(chǎn)生的節(jié)點上(和線程同生共死).) 2.?LinkedBlockingQueue?和?SynchronousQueue 的有幾種堵塞線程? 其各自的隊列數(shù)據(jù)結(jié)構(gòu)有何不同? ? ? 總結(jié): 前者堵塞隊列上無數(shù)據(jù),后者堵塞隊列上有數(shù)據(jù)和操作類型.兩者在堵塞時都利用堆棧的局部變量來臨時保存數(shù)據(jù)和傳遞數(shù)據(jù). 前者的利用排它鎖,堵塞數(shù)據(jù)隊列上無數(shù)據(jù), AbstractQueueSyncronizer( 可能是公平也可能是非公平鎖,插入瞬間非公平 ), 堵塞時將數(shù)據(jù)保存在內(nèi)存堆棧局部變量上,每次獲得鎖后將數(shù)據(jù)傳遞給數(shù)據(jù)隊列,? ? ? ?后者獲得匹配后,綜合匹配的線程數(shù)據(jù),返回非null數(shù)據(jù). 并改動匹配線程的數(shù)據(jù)且喚醒被匹配的堵塞線程.被匹配的堵塞線程依據(jù)其堵塞隊列上的新數(shù)據(jù)和原線程內(nèi)存堆棧上的局部變量數(shù)據(jù)(重點,難點)綜合返回非null數(shù)據(jù).?詳細見源碼凝視.
第二個問題引出的另外一個話題是因為SynchronousQueue?已經(jīng)沒有了數(shù)據(jù)隊列緩沖區(qū),導(dǎo)致SynchronousQueue 中繼承Queue的put方法的語義都量變到質(zhì)變了. ? ? ? BlockingQueue 繼承自Queue的put ? ? ? ? javaDoc : ??Inserts the specified element into this queue, waiting if necessary forspaceto become available.
? ? ? SynchronousQueue??繼承自Queue的put javaDoc : ??Adds the specified element to this queue, waiting if necessary foranother thread to receive it. 數(shù)據(jù)存放不同: 因為?LinkedBlockingQueue。ArrayBlockingQueue代碼實現(xiàn)上通過數(shù)據(jù)隊列轉(zhuǎn)發(fā)數(shù)據(jù)的. 故這兩者不能設(shè)置queye長度的最大值為0. 兩者通過堆內(nèi)存?zhèn)鬟f,notifyAll堵塞線程.?假設(shè)數(shù)據(jù)隊列是0,數(shù)據(jù)就無法傳遞了. SynchronousQueue 無數(shù)據(jù)隊列,那么數(shù)據(jù)怎樣傳輸呢??代碼實現(xiàn)上其數(shù)據(jù)是直接通過堆內(nèi)存直接傳遞給堵塞線程. 線程1被堵塞將數(shù)據(jù)同一時候存放在線程堆棧上的局部變量以及和線程id綁定的隊列節(jié)點(是field屬性,狀態(tài))上.?線程2來匹配,會改變原堵塞線程的堆內(nèi)存的值,使得原堵塞線程可以獲取兩份數(shù)據(jù).這兩份數(shù)據(jù)中肯定有份是生產(chǎn)者提供的數(shù)據(jù),一份是消費者偽造的假數(shù)據(jù),通過標示為推斷終于得到生產(chǎn)者得到的數(shù)據(jù). 對于SynchronousQueue? ?最大值是0,也沒有其它線程生成數(shù)據(jù)節(jié)點,put時無法存放數(shù)據(jù),?讓put一開始就進入了堵塞.? 對于LinkedBlockingQueue,ArrayBlockingQueue, put僅僅有在數(shù)據(jù)隊列滿了才會堵塞.
應(yīng)用場景: 以Executors.newCachedThreadPool()為例,CachedThreadPool特點: 有任務(wù)時可以無限制生成線程,無任務(wù)時也可以高速回收線程. 用線程不斷生成替代了緩沖隊列.?該javadoc上,明白說明了適合大批量的小任務(wù). 不適合一下子大量,一下子又無數(shù)據(jù). 不太適合生產(chǎn)者生產(chǎn)速率動蕩變化,每一個任務(wù)都非常長的場景. </pre><pre name="code" class="java">public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } 利用了SynchronousQueue.offer和take的調(diào)用配合實現(xiàn)了CachedThreadPool();相比blockingQueue長處:?Syncronize一個堵塞隊列.+ 死循環(huán)cas(compare and swap),不會頻繁的線程掛起和喚醒(park和unpark) ? ?和new ThreadPoolExecutor時設(shè)置coreSize=0,linkedblockingQueue 容量=1 的差別是后者維護一個size=1的堵塞隊列,隊列常常在滿和空之間切換,須要頻繁的線程掛起和喚醒(park和unpark) ? ? ?? 強烈建議打開eclipse相應(yīng)的源碼,.jdk1.6 里的src.zip 源碼包1 public void execute(Runnable command) { 2 if (command == null) 3 throw new NullPointerException(); 4 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { 5 if (runState == RUNNING && workQueue.offer(command)) { 6 if (runState != RUNNING || poolSize == 0) 7 ensureQueuedTaskHandled(command); 8 } 9 else if (!addIfUnderMaximumPoolSize(command)) 10 reject(command); // is shutdown or saturated 11 } 12 }
? ?剛開始execute(runable),queue.offer()失敗,產(chǎn)生新的線程運行任務(wù).一直不停的offer,產(chǎn)生新的線程.到后面老的線程運行完成會調(diào)用take()第一次execute(runable),第5行queue.offer()失敗,進入第9行產(chǎn)生max配額的新線程運行任務(wù)runnable.后面不停的execute(),一直不停的offer失敗,產(chǎn)生max配額的新線程去運行runnable.直到第一次的線程運行任務(wù)完成后會調(diào)用SynchronousQueue.take(). 這樣假設(shè)再有execute(),第5行就能匹配成功新的offer就能配對成功,runnable實例被老的線程獲取運行,不會去新建線程.這個實現(xiàn)了動態(tài)的線程池.所以java才說適合大批量的小的異步任務(wù)(These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks.) Executors.固定大小的線程的優(yōu)點是.線程資源是有限的,每一個線程512k?-Xss???? 每一個線程的Stack大小,默認堆棧.避免有些無限制增加線程池的問題.沒有提供可配置的BlockingQueue容量大小.
轉(zhuǎn)載于:https://www.cnblogs.com/zsychanpin/p/7010142.html
總結(jié)
以上是生活随笔為你收集整理的[源码]解析 SynchronousQueue 上界,下界.. 数据保存和数据传递. 堵塞队列. 有无频繁await?...的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 阿里云3节点分布式RDS上存放100万数
- 下一篇: angularjs $watch