java.util.concurrent包详细分析--转
原文地址:http://blog.csdn.net/windsunmoon/article/details/36903901
概述
Java.util.concurrent 包含許多線程安全、測(cè)試良好、高性能的并發(fā)構(gòu)建塊。不客氣地說(shuō),創(chuàng)建java.util.concurrent 的目的就是要實(shí)現(xiàn) Collection 框架對(duì)數(shù)據(jù)結(jié)構(gòu)所執(zhí)行的并發(fā)操作。通過(guò)提供一組可靠的、高性能并發(fā)構(gòu)建塊,開(kāi)發(fā)人員可以提高并發(fā)類的線程安全、可伸縮性、性能、可讀性和可靠性。
此包包含locks,concurrent,atomic 三個(gè)包。
Atomic:原子數(shù)據(jù)的構(gòu)建。
Locks:基本的鎖的實(shí)現(xiàn),最重要的AQS框架和lockSupport
Concurrent:構(gòu)建的一些高級(jí)的工具,如線程池,并發(fā)隊(duì)列等。
其中都用到了CAS(compare-and-swap)操作。CAS 是一種低級(jí)別的、細(xì)粒度的技術(shù),它允許多個(gè)線程更新一個(gè)內(nèi)存位置,同時(shí)能夠檢測(cè)其他線程的沖突并進(jìn)行恢復(fù)。它是許多高性能并發(fā)算法的基礎(chǔ)。在 JDK 5.0 之前,Java 語(yǔ)言中用于協(xié)調(diào)線程之間的訪問(wèn)的惟一原語(yǔ)是同步,同步是更重量級(jí)和粗粒度的。公開(kāi) CAS 可以開(kāi)發(fā)高度可伸縮的并發(fā) Java 類。這些更改主要由 JDK 庫(kù)類使用,而不是由開(kāi)發(fā)人員使用。
CAS操作都封裝在java 不公開(kāi)的類庫(kù)中,sun.misc.Unsafe。此類包含了對(duì)原子操作的封裝,具體用本地代碼實(shí)現(xiàn)。本地的C代碼直接利用到了硬件上的原子操作。
Atomic原子數(shù)據(jù)
?這個(gè)包里面提供了一組原子變量類。其基本的特性就是在多線程環(huán)境下,當(dāng)有多個(gè)線程同時(shí)執(zhí)行這些類的實(shí)例包含的方法時(shí),具有排他性,即當(dāng)某個(gè)線程進(jìn)入方法,執(zhí)行其中的指令時(shí),不會(huì)被其他線程打斷,而別的線程就像自旋鎖一樣,一直等到該方法執(zhí)行完成,才由JVM從等待隊(duì)列中選擇一個(gè)另一個(gè)線程進(jìn)入,這只是一種邏輯上的理解。實(shí)際上是借助硬件的相關(guān)指令來(lái)實(shí)現(xiàn)的,不會(huì)阻塞線程(或者說(shuō)只是在硬件級(jí)別上阻塞了)。可以對(duì)基本數(shù)據(jù)、數(shù)組中的基本數(shù)據(jù)、對(duì)類中的基本數(shù)據(jù)進(jìn)行操作。原子變量類相當(dāng)于一種泛化的volatile變量,能夠支持原子的和有條件的讀-改-寫操作。
?java.util.concurrent.atomic中的類可以分成4組:
標(biāo)量類(Scalar):AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference
數(shù)組類:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray
更新器類:AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater
復(fù)合變量類:AtomicMarkableReference,AtomicStampedReference
標(biāo)量類
第一組AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference這四種基本類型用來(lái)處理布爾,整數(shù),長(zhǎng)整數(shù),對(duì)象四種數(shù)據(jù),其內(nèi)部實(shí)現(xiàn)不是簡(jiǎn)單的使用synchronized,而是一個(gè)更為高效的方式CAS (compare and swap) + volatile和native方法,從而避免了synchronized的高開(kāi)銷,執(zhí)行效率大為提升。
他們的實(shí)現(xiàn)都是依靠 真正的值為volatile 類型,通過(guò)Unsafe 包中的原子操作實(shí)現(xiàn)。最基礎(chǔ)就是CAS,他是一切的基礎(chǔ)。如下 。其中offset是 在內(nèi)存中 value相對(duì)于基地址的偏移量。(它的獲得也由Unsafe 本地代碼獲得)。關(guān)于加鎖的原理見(jiàn)附錄。
核心代碼如下,其他都是在compareAndSet基礎(chǔ)上構(gòu)建的。
1.?private?static?final?Unsafe?unsafe?=?Unsafe.getUnsafe();
2.?private?volatile?int?value;??
3.?public?final?int?get()?{??
4.?????????return?value;??
5.?}??
6.?public?final?void?set(int?newValue)?{??
7.?????????value?=?newValue;??
8.?}??
9.?public?final?boolean?compareAndSet(int?expect,?int?update)?{??
10.????return?unsafe.compareAndSwapInt(this,?valueOffset,?expect,?update);??
11.}
void set()和void lazySet():set設(shè)置為給定值,直接修改原始值;lazySet延時(shí)設(shè)置變量值,這個(gè)等價(jià)于set()方法,但是由于字段是volatile類型的,因此次字段的修改會(huì)比普通字段(非volatile字段)有稍微的性能延時(shí)(盡管可以忽略),所以如果不是想立即讀取設(shè)置的新值,允許在“后臺(tái)”修改值,那么此方法就很有用。
getAndSet( )方法,利用compareAndSet循環(huán)自旋實(shí)現(xiàn)。
原子的將變量設(shè)定為新數(shù)據(jù),同時(shí)返回先前的舊數(shù)據(jù)。
其本質(zhì)是get( )操作,然后做set( )操作。盡管這2個(gè)操作都是atomic,但是他們合并在一起的時(shí)候,就不是atomic。在Java的源程序的級(jí)別上,如果不依賴synchronized的機(jī)制來(lái)完成這個(gè)工作,是不可能的。只有依靠native方法才可以。
Java代碼??
1.??public?final?int?getAndSet(int?newValue)?{??
2.??????for?(;;)?{??
3.??????????int?current?=?get();??
4.??????????if?(compareAndSet(current,?newValue))??
5.??????????????return?current;??
6.??????}??
7.??}?
對(duì)于 AtomicInteger、AtomicLong還提供了一些特別的方法。貼別是如,
getAndAdd( ):以原子方式將給定值與當(dāng)前值相加, 相當(dāng)于線程安全的t=i;i+=delta;return t;操作。
以實(shí)現(xiàn)一些加法,減法原子操作。(注意 --i、++i不是原子操作,其中包含有3個(gè)操作步驟:第一步,讀取i;第二步,加1或減1;第三步:寫回內(nèi)存)
數(shù)組類
第二組AtomicIntegerArray,AtomicLongArray還有AtomicReferenceArray類進(jìn)一步擴(kuò)展了原子操作,對(duì)這些類型的數(shù)組提供了支持。這些類在為其數(shù)組元素提供?volatile?訪問(wèn)語(yǔ)義方面也引人注目,這對(duì)于普通數(shù)組來(lái)說(shuō)是不受支持的。
他們內(nèi)部并不是像AtomicInteger一樣維持一個(gè)valatile變量,而是全部由native方法實(shí)現(xiàn),如下
AtomicIntegerArray的實(shí)現(xiàn)片斷:
Java代碼??
1.??private?static?final?Unsafe?unsafe?=?Unsafe.getUnsafe();??
2.??private?static?final?int?base?=?unsafe.arrayBaseOffset(int[].class);??//數(shù)組基地址
3.??private?static?final?int?scale?=?unsafe.arrayIndexScale(int[].class);??//數(shù)組元素占的大小精度
4.??private?final?int[]?array;??
5.??public?final?int?get(int?i)?{??
6.??????????return?unsafe.getIntVolatile(array,?rawIndex(i));??
7.??}??
8.??public?final?void?set(int?i,?int?newValue)?{??
9.??????????unsafe.putIntVolatile(array,?rawIndex(i),?newValue);??
10.?}
11.???
12.??private longrawIndex(int i) {//獲取具體某個(gè)元素的偏移量
13.?????????if (i <0 || i >= array.length)
14.?????????????thrownew IndexOutOfBoundsException("index " + i);
15.?????????return base+ (long) i * scale;
16.?}
更新器類
第三組AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater基于反射的實(shí)用工具,可以對(duì)指定類的指定?volatile?字段進(jìn)行原子更新。API非常簡(jiǎn)單,但是也是有一些約束:
(1)字段必須是volatile類型的
(2)字段的描述類型(修飾符public/protected/default/private)是與調(diào)用者與操作對(duì)象字段的關(guān)系一致。也就是說(shuō) 調(diào)用者能夠直接操作對(duì)象字段,那么就可以反射進(jìn)行原子操作。但是對(duì)于父類的字段,子類是不能直接操作的,盡管子類可以訪問(wèn)父類的字段。
(3)只能是實(shí)例變量,不能是類變量,也就是說(shuō)不能加static關(guān)鍵字。
(4)只能是可修改變量,不能使final變量,因?yàn)閒inal的語(yǔ)義就是不可修改。實(shí)際上final的語(yǔ)義和volatile是有沖突的,這兩個(gè)關(guān)鍵字不能同時(shí)存在。
(5)對(duì)于AtomicIntegerFieldUpdater?和AtomicLongFieldUpdater?只能修改int/long類型的字段,不能修改其包裝類型(Integer/Long)。如果要修改包裝類型就需要使用AtomicReferenceFieldUpdater?。
復(fù)合變量類
防止ABA問(wèn)題出現(xiàn)而構(gòu)造的類。如什么是ABA問(wèn)題呢,當(dāng)某些流程在處理過(guò)程中是順向的,也就是不允許重復(fù)處理的情況下,在某些情況下導(dǎo)致一個(gè)數(shù)據(jù)由A變成B,再中間可能經(jīng)過(guò)0-N個(gè)環(huán)節(jié)后變成了A,此時(shí)A不允許再變成B了,因?yàn)榇藭r(shí)的狀態(tài)已經(jīng)發(fā)生了改變,他們都是對(duì)atomicReference的進(jìn)一步包裝,AtomicMarkableReference和AtomicStampedReference功能差不多,有點(diǎn)區(qū)別的是:它描述更加簡(jiǎn)單的是與否的關(guān)系,通常ABA問(wèn)題只有兩種狀態(tài),而AtomicStampedReference是多種狀態(tài),那么為什么還要有AtomicMarkableReference呢,因?yàn)樗谔幚硎桥c否上面更加具有可讀性。
Lcoks 鎖
此包中實(shí)現(xiàn)的最基本的鎖,阻塞線程的LockSupport。核心是AQS框架(AbstractQueuedSynchronizer),是J U C(Java?util concurrent) 最復(fù)雜的一個(gè)類。
Lock 和Synchronized
J U C 中的Lock和synchronized具有同樣的語(yǔ)義和功能。不同的是,synchronized 鎖在退出塊時(shí)自動(dòng)釋放。而Lock 需要手動(dòng)釋放,且Lock更加靈活。Syschronizd 是 java 語(yǔ)言層面的,是系統(tǒng)關(guān)鍵字;Lock則是java 1.5以來(lái)提供的一個(gè)類。
Synchronized 具有以下缺陷,它無(wú)法中斷一個(gè)正在等候獲得鎖的線程;也無(wú)法通過(guò)投票得到鎖,如果不想等下去,也就沒(méi)法得到鎖;同步還要求鎖的釋放只能在與獲得鎖所在的堆棧幀相同的堆棧幀中進(jìn)行。
而Lock(如ReentrantLock?)除了與Synchronized 具有相同的語(yǔ)義外,還支持鎖投票、定時(shí)鎖等候和可中斷鎖等候(就是說(shuō)在等待鎖的過(guò)程中,可以被中斷)的一些特性。
Lock. lockInterruptibly ,調(diào)用后,或者獲得鎖,或者被中斷后拋出異常。優(yōu)先響應(yīng)異常。這點(diǎn)可以用 類似以下代碼測(cè)試。
?Thread a =?new?Thread(task1,?"aa");
?????? Thread b =?new?Thread(task1,?"bb");
?????? a.start();
?????? b.start();
?????? b.interrupt();
LockSupport 和java內(nèi)置鎖
???在LockSupport出現(xiàn)之前,如果要block/unblock某個(gè)Thread,除了使用Java語(yǔ)言內(nèi)置的monitor機(jī)制之外,只能通過(guò)Thread.suspend()和Thread.resume()。然而Thread.suspend()和Thread.resume()基本上不可用,除了可能導(dǎo)致死鎖之外,它們還存在一個(gè)無(wú)法解決的競(jìng)爭(zhēng)條件:如果在調(diào)用Thread.suspend()之前調(diào)用了Thread.resume(),那么該Thread.resume()調(diào)用沒(méi)有任何效果。LockSupport最主要的作用,便是通過(guò)一個(gè)許可(permit)狀態(tài),解決了這個(gè)問(wèn)題。LockSupport?只能阻塞當(dāng)前線程,但是可以喚醒任意線程。
?????那么LockSupport和Java語(yǔ)言內(nèi)置的monitor機(jī)制有什么區(qū)別呢?它們的語(yǔ)義是不同的。LockSupport是針對(duì)特定Thread來(lái)進(jìn)行block/unblock操作的;wait()/notify()/notifyAll()是用來(lái)操作特定對(duì)象的等待集合的。為了防止知識(shí)生銹,在這里簡(jiǎn)單介紹一下Java語(yǔ)言內(nèi)置的monitor機(jī)制(詳見(jiàn):http://whitesock.iteye.com/blog/162344?)。正如每個(gè)Object都有一個(gè)鎖,每個(gè)Object也有一個(gè)等待集合(wait set),它有wait、notify、notifyAll和Thread.interrupt方法來(lái)操作。同時(shí)擁有鎖和等待集合的實(shí)體,通常被成為監(jiān)視器(monitor)。每個(gè)Object的等待集合是由JVM維護(hù)的。等待集合一直存放著那些因?yàn)檎{(diào)用對(duì)象的wait方法而被阻塞的線程。由于等待集合和鎖之間的交互機(jī)制,只有獲得目標(biāo)對(duì)象的同步鎖時(shí),才可以調(diào)用它的wait、notify和notifyAll方法。這種要求通常無(wú)法靠編譯來(lái)檢查,如果條件不能滿足,那么在運(yùn)行的時(shí)候調(diào)用以上方法就會(huì)導(dǎo)致其拋出IllegalMonitorStateException。
??? wait() 方法被調(diào)用后,會(huì)執(zhí)行如下操作:
·????????如果當(dāng)前線程已經(jīng)被中斷,那么該方法立刻退出,然后拋出一個(gè)InterruptedException異常。否則線程會(huì)被阻塞。
·????????JVM把該線程放入目標(biāo)對(duì)象內(nèi)部且無(wú)法訪問(wèn)的等待集合中。
·????????目標(biāo)對(duì)象的同步鎖被釋放,但是這個(gè)線程鎖擁有的其他鎖依然會(huì)被這個(gè)線程保留著。當(dāng)線程重新恢復(fù)質(zhì)執(zhí)行時(shí),它會(huì)重新獲得目標(biāo)對(duì)象的同步鎖。
??? notify()方法被調(diào)用后,會(huì)執(zhí)行如下操作:
·????????如果存在的話,JVM會(huì)從目標(biāo)對(duì)象內(nèi)部的等待集合中任意移除一個(gè)線程T。如果等待集合中的線程數(shù)大于1,那么哪個(gè)線程被選中完全是隨機(jī)的。
·????????T必須重新獲得目標(biāo)對(duì)象的同步鎖,這必然導(dǎo)致它將會(huì)被阻塞到調(diào)用Thead.notify()的線程釋放該同步鎖。如果其他線程在T獲得此鎖之前就獲得它,那么T就要一直被阻塞下去。
·????????T從執(zhí)行wait()的那點(diǎn)恢復(fù)執(zhí)行。
??? notifyAll()方法被調(diào)用后的操作和notify()類似,不同的只是等待集合中所有的線程(同時(shí))都要執(zhí)行那些操作。然而等待集合中的線程必須要在競(jìng)爭(zhēng)到目標(biāo)對(duì)象的同步鎖之后,才能繼續(xù)執(zhí)行。
?在標(biāo)準(zhǔn)的Sun jdk 中,Locksupport的實(shí)現(xiàn)基于Unsafe,都是本地代碼,Android的實(shí)現(xiàn)不全是本地代碼。
一個(gè)線程調(diào)用park阻塞之后,如果被其他線程調(diào)用interrupt(),那么他它會(huì)響應(yīng)中斷,解除阻塞,但是不會(huì)拋出interruption?異常。這點(diǎn)在構(gòu)造可中斷獲取鎖的時(shí)候用到了。
AbstractQueuedSynchronizer
AQS框架是 J U C包的核心。是構(gòu)建同步、鎖、信號(hào)量和自定義鎖的基礎(chǔ)。也是構(gòu)建高級(jí)工具的基礎(chǔ)。
?
從上圖可以看到,鎖,信號(hào)量的實(shí)現(xiàn)內(nèi)部都有兩個(gè)內(nèi)部類,都繼承AQS。
由于AQS的構(gòu)建上采用模板模式(Template mode),即 AQS定義一些框架,而它的實(shí)現(xiàn)延遲到子類。如tryAcquire()方法。由于這個(gè)模式,我們?nèi)绻苯涌碅QS源碼會(huì)比較抽象。所以從某個(gè)具體的實(shí)現(xiàn)切入簡(jiǎn)單易懂。這里選澤ReentrantLock ,它和Synchronized具有同樣的語(yǔ)義。
簡(jiǎn)單說(shuō)來(lái),AbstractQueuedSynchronizer會(huì)把所有的請(qǐng)求線程構(gòu)成一個(gè)CLH隊(duì)列,當(dāng)一個(gè)線程執(zhí)行完畢(lock.unlock())時(shí)會(huì)激活自己的后繼節(jié)點(diǎn),但正在執(zhí)行的線程并不在隊(duì)列中,而那些等待執(zhí)行的線程全 部處于阻塞狀態(tài),經(jīng)過(guò)調(diào)查線程的顯式阻塞是通過(guò)調(diào)用LockSupport.park()完成,而LockSupport.park()則調(diào)用 sun.misc.Unsafe.park()本地方法,再進(jìn)一步,HotSpot在Linux中中通過(guò)調(diào)用pthread_mutex_lock函數(shù)把 線程交給系統(tǒng)內(nèi)核進(jìn)行阻塞。
?
ReentrantLock
從ReentrantLock(可重入鎖)開(kāi)始,分析AQS。首先需要知道這個(gè)鎖和java 內(nèi)置的同步Synchronized具有同樣的語(yǔ)義。如下代碼解釋重入的意思
| Lock?lock?=?new?ReentrantLock(); ????public?void?test() { ???????lock.lock(); ?????? System.out.print("I am test1"); ?????? test();?//?遞歸調(diào)用?……………………………1?遞歸調(diào)用不會(huì)阻塞,因?yàn)橐呀?jīng)獲得了鎖,這就是重入的含義 ???????// test2();//?調(diào)用test2 ………………………2 ???????lock.unlock();//?這里應(yīng)該放在finally?塊中,這里簡(jiǎn)單省略,以后一樣。 ??? } ????public?void?test2() { ???????lock.lock(); ?????? System.out.println("I am test1"); ?????? test2();// ???????lock.unlock(); ??? } |
重入的意思就是,如果已經(jīng)獲得了鎖,如果執(zhí)行期間還需要獲得這個(gè)鎖的話,會(huì)直接獲得所,不會(huì)被阻塞,獲得鎖的次數(shù)加1;每執(zhí)行一次unlock,持有鎖的次數(shù)減1,當(dāng)為0時(shí)釋放鎖。這點(diǎn),Synchronized 具有同樣語(yǔ)義。
查看源碼,可以看到ReentrantLock 對(duì)Lock接口的實(shí)現(xiàn),把所有的操作都委派給一個(gè)叫Sync的類,如下源碼:
?????
?
其中Sync的定義如右圖
所以這個(gè)Syc類是關(guān)鍵。而Sync 基礎(chǔ)AQS。Sync又有兩個(gè)子類,
| final static class NonfairSync extends Sync? final static class FairSync extends Sync? |
顯然是為了支持公平鎖和非公平鎖而定義,默認(rèn)情況下為非公平鎖。
先理一下Reentrant.lock()方法的調(diào)用過(guò)程(默認(rèn)非公平鎖):
?
這 些討厭的Template模式導(dǎo)致很難直觀的看到整個(gè)調(diào)用過(guò)程,其實(shí)通過(guò)上面調(diào)用過(guò)程及AbstractQueuedSynchronizer的注釋可以發(fā)現(xiàn),AbstractQueuedSynchronizer中抽象了絕大多數(shù)Lock的功能,而只把tryAcquire方法延遲到子類中實(shí)現(xiàn)。 tryAcquire方法的語(yǔ)義在于用具體子類判斷請(qǐng)求線程是否可以獲得鎖,無(wú)論成功與否AbstractQueuedSynchronizer都將處理后面的流程。
NonfairSync 和 FairSync 不同的是執(zhí)行l(wèi)ock時(shí)做的操作,如下為 NonfairSync 的操作,其中compareAndSetState(intexpect, int des) 為AQS的方法,設(shè)置同步狀態(tài),NonfairSync 通過(guò)修改同步狀態(tài)獲得鎖,鎖定不成功才執(zhí)行acquire(1),此方法也在AQS中定義。而 FairSync.lock 直接執(zhí)行acquire(1)。
| final?void?lock() { ????????????if?(compareAndSetState(0, 1)) ??????????????? setExclusiveOwnerThread(Thread.currentThread()); ????????????else ??????????????? acquire(1); } |
AQS中的Acquire(int)方法調(diào)用子類中的tryAcquire(int)實(shí)現(xiàn),這里正是模板模式。如下面的源碼。自此已經(jīng)進(jìn)入到了AQS的實(shí)現(xiàn)。
| public?final?void?acquire(int?arg) { ????????if?(!tryAcquire(arg) && ??????????? acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) ????????????selfInterrupt(); } |
其他方法的調(diào)用順序類似,如unlock 調(diào)用AQS的release ,release 調(diào)用Sync的tryRelease()。
下面看NonfairSync.tryAcquire,它調(diào)用Sync.nonfairTryAcquire。以下為實(shí)現(xiàn),首先獲取同步狀態(tài)c,o代表鎖沒(méi)有線程正在競(jìng)爭(zhēng)鎖。如果c=0,那么嘗試用CAS操作獲得鎖;或者c!=0,但是鎖被當(dāng)前線程擁有,那么獲得鎖的次數(shù)增加 acquires 次,這就是重入的概念。以上兩種情況都成功獲得鎖,返回真。如果不是以上兩種情況,就沒(méi)有獲得鎖,返回假。
| final?boolean?nonfairTryAcquire(int?acquires) { ????????????final?Thread current = Thread.currentThread(); ????????????int?c = getState(); ????????????if?(c == 0) { ????????????????if?(compareAndSetState(0, acquires)) { ??????????????????? setExclusiveOwnerThread(current); ????????????????????return?true; ??????????????? } ??????????? } ????????????else?if?(current == getExclusiveOwnerThread()) { ????????????????int?nextc = c + acquires; ????????????????if?(nextc < 0)?// overflow ????????????????????throw?new?Error("Maximum lock count exceeded"); ??????????????? setState(nextc); ????????????????return?true; ??????????? } ????????????return?false; ??????? } |
如果沒(méi)有獲得鎖,即NonfairSync.tryAcqiuer()返回假,那么可以看出 AQS.acquire 將執(zhí)行acquireQueued(addWaiter(Node.EXCLUSIVE), arg);將此線程追加到等待隊(duì)列的隊(duì)尾。其中Node是AQS的一個(gè)內(nèi)部類,他是等待隊(duì)列節(jié)點(diǎn)的抽象。
| private?Node addWaiter(Node mode) { ??????? Node node =?new?Node(Thread.currentThread(), mode); ????????// Try the fast path of enq; backup to full enq on failure ??????? Node pred =?tail; ????????if?(pred !=?null) { ??????????? node.prev?= pred; ????????????if?(compareAndSetTail(pred, node)) { ??????????????? pred.next?= node; ?????????????? ?return?node; ??????????? } ??????? } ??????? enq(node); ????????return?node; } |
其中mode指的是模式,NULL 為獨(dú)占,否則為共享鎖。RetranLock為獨(dú)占鎖。首先把線程包裝為一個(gè)節(jié)點(diǎn)。然后獲取等待隊(duì)列的尾,如果不為NULL的話(這說(shuō)明有其他線程在待隊(duì)列中行),就把初始化node的前驅(qū)為pred.( node.prev?= pred) 然后通過(guò)CAS操作把node 設(shè)置為新的隊(duì)尾,如果成功則設(shè)置pred的后繼為 node.至此 快速進(jìn)隊(duì)完成。
但是如果pred為null(此時(shí)沒(méi)有線程在等待,一開(kāi)始tail 就是null) ,或者CAS設(shè)置隊(duì)尾失敗。則需要執(zhí)行下面的入隊(duì)流程。?這里可能是整個(gè)阻塞隊(duì)列的初始化過(guò)程。Tail 為null
| private?Node enq(final?Node node) { ????????for?(;;) { ??????????? Node t =?tail; ????????????if?(t ==?null) {?// Must initialize ??????????????? Node h =?new?Node();?// Dummy header ??????????????? h.next?= node; ??????????????? node.prev?= h; ????????????????if?(compareAndSetHead(h)) { ????????????????????tail?= node; ????????????????????return?h; ??????????????? } ??????????? } ????????????else?{ ??????????????? node.prev?= t; ????????????????if?(compareAndSetTail(t, node)) { ??????????????????? t.next?= node; ????????????????????return?t; ??????????????? } ??????????? } ?????? } ??? } ? |
該方法就是循環(huán)調(diào)用CAS,即使有高并發(fā)的場(chǎng)景,無(wú)限循環(huán)將會(huì)最終成功把當(dāng)前線程追加到隊(duì)尾(或設(shè)置隊(duì)頭)。總而言之,addWaiter的目的就是通過(guò)CAS把當(dāng)前現(xiàn)在追加到隊(duì)尾,并返回包裝后的Node實(shí)例。
把線程要包裝為Node對(duì)象的主要原因,除了用Node構(gòu)造供虛擬隊(duì)列外,還用Node包裝了各種線程狀態(tài),這些狀態(tài)被精心設(shè)計(jì)為一些數(shù)字值:
SIGNAL(-1) :線程的后繼線程正/已被阻塞,當(dāng)該線程release或cancel時(shí)要重新這個(gè)后繼線程(unpark)
CANCELLED(1):因?yàn)槌瑫r(shí)或中斷,該線程已經(jīng)被取消
CONDITION(-2):表明該線程被處于條件隊(duì)列,就是因?yàn)檎{(diào)用了Condition.await而被阻塞。
PROPAGATE(-3):傳播共享鎖
0:0代表無(wú)狀態(tài)
接下來(lái)執(zhí)行acquireQueued(Node)方法。acquireQueued的主要作用是把已經(jīng)追加到隊(duì)列的線程節(jié)點(diǎn)(addWaiter方法返回值)進(jìn)行阻塞,但阻塞前又通過(guò)tryAccquire重試是否能獲得鎖,如果重試成功能則無(wú)需阻塞,直接返回。
| final?boolean?acquireQueued(final?Node node,?int?arg) { ????????try?{ ????????????boolean?interrupted =?false; ????????????for?(;;) { ????????????????final?Node p = node.predecessor(); ????????????????if?(p ==?head?&& tryAcquire(arg)) { ??????????????????? setHead(node); ??????????????????? p.next?=?null;?// help GC ????????????????????return?interrupted; ??????????????? } ????????????????if?(shouldParkAfterFailedAcquire(p, node) && ??????????????????? parkAndCheckInterrupt()) ??????????????????? interrupted =?true; ??????????? } ??????? }?catch?(RuntimeException ex) { ??????????? cancelAcquire(node); ????????????throw?ex; ??????? } ??? } |
以上的循環(huán)不會(huì)無(wú)限進(jìn)行,因?yàn)榻酉聛?lái)線程會(huì)被阻塞。這由parkAndCheckInterrupt()方法實(shí)現(xiàn),但是它只有在shouldParkAfterFailedAcquire 方法返回 true 的時(shí)候后才會(huì)繼續(xù)執(zhí)行進(jìn)而阻塞。所以看 shouldParkAfterFailedAcquire方法,從方法的名字看 意思是,當(dāng)獲取鎖失敗的時(shí)候是否應(yīng)該阻塞。
| private?static?boolean?shouldParkAfterFailedAcquire(Node pred, Node node) { ????????int?ws = pred.waitStatus; ????????if?(ws == Node.SIGNAL) ????????????/* ???????????? * This node has already set status asking a release ???????????? * to signal it, so it can safely park ???????????? */ ????????????return?true; ????????if?(ws > 0) { ????????????/* ???????????? * Predecessor was cancelled. Skip over predecessors and ???????????? * indicate retry. ???????????? */ ??? ????do?{ ??? ??? node.prev?= pred = pred.prev; ??? ??? }?while?(pred.waitStatus?> 0); ??? ??? pred.next?= node; ??????? }?else?{ ????????????/* ???????????? * waitStatus must be 0 or PROPAGATE. Indicate that we ???????????? * need a signal, but don't park yet. Caller will need to ???????????? * retry to make sure it cannot acquire before parking. ???????????? */ ????????????compareAndSetWaitStatus(pred, ws, Node.SIGNAL); ??????? } ????????return?false; ??? } |
此方法的作用是根據(jù)它的前驅(qū)節(jié)點(diǎn)決定本節(jié)點(diǎn)做什么樣的操作。前面已經(jīng)說(shuō)過(guò)Node的節(jié)點(diǎn)的waitState 表示它個(gè)后繼節(jié)點(diǎn) 需要做什么操作。這里就是對(duì)線程狀態(tài)的檢查,所有這個(gè)方法參數(shù)中有前驅(qū)節(jié)點(diǎn)。
檢查原則在于:
規(guī)則1:如果前繼的節(jié)點(diǎn)狀態(tài)為SIGNAL,表明當(dāng)前節(jié)點(diǎn)需要unpark,則返回成功,此時(shí)acquireQueued方法的第12行(parkAndCheckInterrupt)將導(dǎo)致線程阻塞
規(guī)則2:如果前繼節(jié)點(diǎn)狀態(tài)為CANCELLED(ws>0),說(shuō)明前置節(jié)點(diǎn)已經(jīng)被放棄,則回溯到一個(gè)非取消的前繼節(jié)點(diǎn),返回false,acquireQueued方法的無(wú)限循環(huán)將遞歸調(diào)用該方法,直至規(guī)則1返回true,導(dǎo)致線程阻塞
規(guī)則3:如果前繼節(jié)點(diǎn)狀態(tài)為非SIGNAL、非CANCELLED,則設(shè)置前繼的狀態(tài)為SIGNAL,返回false后進(jìn)入acquireQueued的無(wú)限循環(huán),與規(guī)則2同
總體看來(lái),shouldParkAfterFailedAcquire就是靠前繼節(jié)點(diǎn)判斷當(dāng)前線程是否應(yīng)該被阻塞,如果前繼節(jié)點(diǎn)處于CANCELLED狀態(tài),則順便刪除這些節(jié)點(diǎn)重新構(gòu)造隊(duì)列。
至此,獲取鎖完畢。
請(qǐng)求鎖不成功的線程會(huì)被掛起在acquireQueued方法的第12行,12行以后的代碼必須等線程被解鎖鎖才能執(zhí)行,假如被阻塞的線程得到解鎖,則執(zhí)行第13行,即設(shè)置interrupted = true,之后又進(jìn)入無(wú)限循環(huán)。
解鎖的過(guò)程相對(duì)簡(jiǎn)單一些。
調(diào)用關(guān)系如下順序 ReentrantLock.unlock()????AQS.release()? --Synx.tryRealse()
從無(wú)限循環(huán)的代碼可以看出,并不是得到解鎖的線程一定能獲得鎖,必須在第6行中調(diào)用tryAccquire重新競(jìng)爭(zhēng),因?yàn)殒i是非公平的,有可能被新加入的線程獲得,從而導(dǎo)致剛被喚醒的線程再次被阻塞,這個(gè)細(xì)節(jié)充分體現(xiàn)了“非公平”的精髓。此可以看到,把tryAcquire方法延遲到子類中實(shí)現(xiàn)的做法非常精妙并具有極強(qiáng)的可擴(kuò)展性,令人嘆為觀止!當(dāng)然精妙的不是這個(gè)Templae設(shè)計(jì)模式,而是Doug Lea對(duì)鎖結(jié)構(gòu)的精心布局。
| public?void?unlock() { ??????? sync.release(1); } |
release的語(yǔ)義在于:如果可以釋放鎖,則喚醒隊(duì)列第一個(gè)線程(Head.next)。release先調(diào)用tryRelease調(diào)用是否解鎖成功,解鎖成長(zhǎng)才進(jìn)行下一步操作。
| public?final?boolean?release(int?arg) { ????????if?(tryRelease(arg)) { ??????????? Node h =?head; ????????????if?(h !=?null?&& h.waitStatus?!= 0) ??????????????? unparkSuccessor(h); ????????????return?true; ??????? } ????????return?false; ??? } |
tryRelease與tryAcquire語(yǔ)義相同,把如何釋放的邏輯延遲到子類中。tryRelease語(yǔ)義很明確:如果線程多次鎖定,則進(jìn)行多次釋放,直至status==0則真正釋放鎖,所謂釋放鎖即設(shè)置status為0,因?yàn)闊o(wú)競(jìng)爭(zhēng)所以沒(méi)有使用CAS。如下源代碼
| protected?final?boolean?tryRelease(int?releases) { ????????????int?c = getState() - releases; ????????????if?(Thread.currentThread() != getExclusiveOwnerThread()) ????????????????throw?new?IllegalMonitorStateException(); ????????????boolean?free =?false; ????????????if?(c == 0) { ??????????????? free =?true; ??????????????? setExclusiveOwnerThread(null); ??????????? } ??????????? setState(c); ????????????return?free; ??????? } |
下面的源代碼是喚醒隊(duì)列的第一個(gè)線程。但是其可能被取消,當(dāng)被取消的時(shí)候,從隊(duì)尾往前找線程。(不從對(duì)頭開(kāi)始的原因是,隊(duì)尾一直在變化,不容易判斷)
| private?void?unparkSuccessor(Node node) { ????????/* ???????? * If status is negative (i.e., possibly needing signal) try ???????? * to clear in anticipation of signalling. It is OK if this ???????? * fails or if status is changed by waiting thread. ???????? */ ????????int?ws = node.waitStatus; ????????if?(ws < 0) ????????????compareAndSetWaitStatus(node, ws, 0); ? ????????/* ???????? * Thread to unpark is held in successor, which is normally ???????? * just the next node.? But if cancelled or apparently null, ???????? * traverse backwards from tail to find the actual ???????? * non-cancelled successor. ???????? */ ??????? Node s = node.next; ????????if?(s ==?null?|| s.waitStatus?> 0) { ??????????? s =?null; ????????????for?(Node t =?tail; t !=?null?&& t != node; t = t.prev) ????????????????if?(t.waitStatus?<= 0) ??????????????????? s = t; ??????? } ????????if?(s !=?null) ??????????? LockSupport.unpark(s.thread); ??? } |
?
可中斷鎖的實(shí)現(xiàn):本質(zhì)是調(diào)用 AQS. 他在響應(yīng)中斷后直接跳出循環(huán),拋出異常,而正常額Lock 忽略這個(gè)中斷,只是簡(jiǎn)單的記錄下,然后繼續(xù)循環(huán)。
| private?void?doAcquireInterruptibly(int?arg) ????????throws?InterruptedException { ????????final?Node node = addWaiter(Node.EXCLUSIVE); ????????try?{ ????????????for?(;;) { ????????????????final?Node p = node.predecessor(); ????????????????if?(p ==?head?&& tryAcquire(arg)) { ??????????????????? setHead(node); ??????????????????? p.next?=?null;?// help GC ????????????????????return; ????? ??????????} ????????????????if?(shouldParkAfterFailedAcquire(p, node) && ??????????????????? parkAndCheckInterrupt()) ????????????????????break; ??????????? } ??????? }?catch?(RuntimeException ex) { ??????????? cancelAcquire(node); ????????????throw?ex; ??????? } ????????// Arrive here only if interrupted ??????? cancelAcquire(node); ????????throw?new?InterruptedException(); ??? } |
超時(shí)鎖的實(shí)現(xiàn)基本類似,就是阻塞一段時(shí)間后自己恢復(fù),如果有中斷則拋出異常。
| private?boolean?doAcquireNanos(int?arg,?long?nanosTimeout) ????????throws?InterruptedException { ????????long?lastTime = System.nanoTime(); ????????final?Node node = addWaiter(Node.EXCLUSIVE); ????????try?{ ????????????for?(;;) { ????????????????final?Node p = node.predecessor(); ????????????????if?(p ==?head?&& tryAcquire(arg)) { ??????????????????? setHead(node); ??????????????????? p.next?=?null;?// help GC ????????????????????return?true; ??????????????? } ????????????????if?(nanosTimeout <= 0) { ??????????????????? cancelAcquire(node); ????????????????????return?false; ??????????????? } ????????????????if?(nanosTimeout >?spinForTimeoutThreshold?&& ????????????????????shouldParkAfterFailedAcquire(p, node)) ??????????????????? LockSupport.parkNanos(this, nanosTimeout); ????????????????long?now = System.nanoTime(); ??????? ????????nanosTimeout -= now - lastTime; ??????????????? lastTime = now; ????????????????if?(Thread.interrupted()) ????????????????????break; ??????????? } ??????? }?catch?(RuntimeException ex) { ??????????? cancelAcquire(node); ????????????throw?ex; ??????? } ????????// Arrive here only if interrupted ??????? cancelAcquire(node); ????????throw?new?InterruptedException(); ??? } |
?
Condition
Condition 實(shí)現(xiàn)了與java內(nèi)容monitor 類似的功能。提供 await,signal,signalall 等操作,與object . wait等一系列操作對(duì)應(yīng)。不同的是一個(gè)condition 可以有多個(gè)條件隊(duì)列。這點(diǎn)內(nèi)置monitor 是做不到的。另外還支持 超時(shí)、取消等更加靈活的方式。
和內(nèi)置的Monitor一樣,調(diào)用 Condition。aWait 等操作,需要獲得鎖,也就是 Condition 是和一個(gè)鎖綁定在一起的。它的實(shí)現(xiàn) 是在AQS中,基本思想如下:一下內(nèi)容抄自博客:http://www.nbtarena.com/Html/soft/201308/2429.html
public final void await() throws InterruptedException {
??? // 1.如果當(dāng)前線程被中斷,則拋出中斷異常
??? if (Thread.interrupted())
??????? throw newInterruptedException();
??? // 2.將節(jié)點(diǎn)加入到Condition隊(duì)列中去,這里如果lastWaiter是cancel狀態(tài),那么會(huì)把它踢出Condition隊(duì)列。
??? Node node = addConditionWaiter();
??? // 3.調(diào)用tryRelease,釋放當(dāng)前線程的鎖
??? long savedState =fullyRelease(node);
??? int interruptMode = 0;
??? // 4.為什么會(huì)有在AQS的等待隊(duì)列的判斷?
??? // 解答:signal*作會(huì)將Node從Condition隊(duì)列中拿出并且放入到等待隊(duì)列中去,在不在AQS等待隊(duì)列就看signal是否執(zhí)行了
??? // 如果不在AQS等待隊(duì)列中,就park當(dāng)前線程,如果在,就退出循環(huán),這個(gè)時(shí)候如果被中斷,那么就退出循環(huán)
??? while (!isOnSyncQueue(node)) {
??????? LockSupport.park(this);
??????? if ((interruptMode =checkInterruptWhileWaiting(node)) != 0)
??????????? break;
??? }
??? // 5.這個(gè)時(shí)候線程已經(jīng)被signal()或者signalAll()*作給喚醒了,退出了4中的while循環(huán)
??? // 自旋等待嘗試再次獲取鎖,調(diào)用acquireQueued方法
??? if (acquireQueued(node,savedState) && interruptMode != THROW_IE)
??????? interruptMode = REINTERRUPT;
??? if (node.nextWaiter != null)
??????? unlinkCancelledWaiters();
??? if (interruptMode != 0)
??????? reportInterruptAfterWait(interruptMode);
}
?
整個(gè)await的過(guò)程如下:
1.將當(dāng)前線程加入Condition鎖隊(duì)列。特別說(shuō)明的是,這里不同于AQS的隊(duì)列,這里進(jìn)入的是Condition的FIFO隊(duì)列。進(jìn)行2。
2.釋放鎖。這里可以看到將鎖釋放了,否則別的線程就無(wú)法拿到鎖而發(fā)生死鎖。進(jìn)行3。
3.自旋(while)掛起,直到被喚醒或者超時(shí)或者CACELLED等。進(jìn)行4。
4.獲取鎖(acquireQueued)。并將自己從Condition的FIFO隊(duì)列中釋放,表明自己不再需要鎖(我已經(jīng)拿到鎖了)。
可以看到,這個(gè)await的*作過(guò)程和Object.wait()方法是一樣,只不過(guò)await()采用了Condition隊(duì)列的方式實(shí)現(xiàn)了Object.wait()的功能。
signal和signalAll方法
await*()清楚了,現(xiàn)在再來(lái)看signal/signalAll就容易多了。按照signal/signalAll的需求,就是要將Condition.await*()中FIFO隊(duì)列中第一個(gè)Node喚醒(或者全部Node)喚醒。盡管所有Node可能都被喚醒,但是要知道的是仍然只有一個(gè)線程能夠拿到鎖,其它沒(méi)有拿到鎖的線程仍然需要自旋等待,就上上面提到的第4步(acquireQueued)。
?Java Code
?
public final void signal() {
??? if (!isHeldExclusively())
??????? throw newIllegalMonitorStateException();
??? Node first = firstWaiter;
??? if (first != null)
??????? doSignal(first);
}
這里先判斷當(dāng)前線程是否持有鎖,如果沒(méi)有持有,則拋出異常,然后判斷整個(gè)condition隊(duì)列是否為空,不為空則調(diào)用doSignal方法來(lái)喚醒線程,看看doSignal方法都干了一些什么:
?Java Code
?
private void doSignal(Node first) {
??? do {
??????? if ( (firstWaiter =first.nextWaiter) == null)
??????????? lastWaiter = null;
??????? first.nextWaiter = null;
??? } while(!transferForSignal(first) &&
???????????? (first = firstWaiter)!= null);
}
上面的代*很容易看出來(lái),signal就是喚醒Condition隊(duì)列中的第一個(gè)非CANCELLED節(jié)點(diǎn)線程,而signalAll就是喚醒所有非CANCELLED節(jié)點(diǎn)線程。當(dāng)然了遇到CANCELLED線程就需要將其從FIFO隊(duì)列中剔除。
?
?Java Code
?
final boolean transferForSignal(Node node) {
??? /*
???? * 設(shè)置node的waitStatus:Condition->0
???? */
??? if(!compareAndSetWaitStatus(node, Node.CONDITION, 0))
??????? return false;
??? /*
???? * 加入到AQS的等待隊(duì)列,讓節(jié)點(diǎn)繼續(xù)獲取鎖
???? * 設(shè)置前置節(jié)點(diǎn)狀態(tài)為SIGNAL
???? */
??? Node p = enq(node);
??? int c = p.waitStatus;
??? if (c > 0 ||!compareAndSetWaitStatus(p, c, Node.SIGNAL))
???????LockSupport.unpark(node.thread);
??? return true;
}
?
上面就是喚醒一個(gè)await*()線程的過(guò)程,根據(jù)前面的介紹,如果要unpark線程,并使線程拿到鎖,那么就需要線程節(jié)點(diǎn)進(jìn)入AQS的隊(duì)列。所以可以看到在LockSupport.unpark之前調(diào)用了enq(node)*作,將當(dāng)前節(jié)點(diǎn)加入到AQS隊(duì)列。
?
signalAll和signal方法類似,主要的不同在于它不是調(diào)用doSignal方法,而是調(diào)用doSignalAll方法:
?Java Code
?
private void doSignalAll(Node first) {
??? lastWaiter = firstWaiter? = null;
??? do {
??????? Node next =first.nextWaiter;
??????? first.nextWaiter = null;
??????? transferForSignal(first);
??????? first = next;
??? } while (first != null);
}
這個(gè)方法就相當(dāng)于把Condition隊(duì)列中的所有Node全部取出插入到等待隊(duì)列中去。
?
線程池
JUC 中提供了線程池的實(shí)現(xiàn),其基于一系列的抽象和接口。接下里一步一步解開(kāi)線程池的神秘面紗。
首先應(yīng)該了解線程池的使用。J U C 提供了一個(gè) 構(gòu)造線程池的 工廠類。java.util.concurrent.Executors 。此工廠提供了構(gòu)造各種不同類型線程池的靜態(tài)方法。如固定線程池,單一工作線程池,和緩存線程池等。
如下代碼構(gòu)造了一個(gè)具有2個(gè)固定工作線程的線程池。
| ExecutorService ser = Executors.newFixedThreadPool(2); |
經(jīng)過(guò)跟蹤,此構(gòu)造函數(shù)最終調(diào)用如下,其參數(shù)解釋如下:
corePoolSize - 池中所保存的線程數(shù),包括空閑線程。
maximumPoolSize - 池中允許的最大線程數(shù)。
keepAliveTime - 當(dāng)線程數(shù)大于核心時(shí),此為終止前多余的空閑線程等待新任務(wù)的最長(zhǎng)時(shí)間。
unit - keepAliveTime 參數(shù)的時(shí)間單位。
workQueue - 執(zhí)行前用于保持任務(wù)的隊(duì)列。此隊(duì)列僅保持由 execute 方法提交的 Runnable 任務(wù)。
threadFactory - 執(zhí)行程序創(chuàng)建新線程時(shí)使用的工廠。
handler - 由于超出線程范圍和隊(duì)列容量而使執(zhí)行被阻塞時(shí)所使用的處理程序。
?
| public?ThreadPoolExecutor(int?corePoolSize, ???????? ?????????????????????int?maximumPoolSize, ??????????????????????????????long?keepAliveTime, ????????????????????????????? TimeUnit unit, ????????????????????????????? BlockingQueue<Runnable> workQueue, ????????????????????????????? ThreadFactory threadFactory, ????????????????????????????? RejectedExecutionHandler handler) { ????????if?(corePoolSize < 0 || ??????????? maximumPoolSize <= 0 || ??????????? maximumPoolSize < corePoolSize || ??????????? keepAliveTime < 0) ????????????throw?new?IllegalArgumentException(); ????????if?(workQueue ==?null?|| threadFactory ==?null?|| handler ==?null) ????????????throw?new?NullPointerException(); ????????this.corePoolSize?= corePoolSize; ????????this.maximumPoolSize?= maximumPoolSize; ????????this.workQueue?= workQueue; ????????this.keepAliveTime?= unit.toNanos(keepAliveTime); ????????this.threadFactory?= threadFactory; ????????this.handler?= handler; ??? } |
?
我們構(gòu)造的線程池的類型是?ExecutorService,ThreadPoolExecutor繼承AbstractExecutorService,其總體類圖如下,可以看到最初的抽象是Exector。
?
?
接口Executor
該接口只有一個(gè)方法,JDK解釋如下
執(zhí)行已提交的Runnable?任務(wù)的對(duì)象。此接口提供一種將任務(wù)提交與每個(gè)任務(wù)將如何運(yùn)行的機(jī)制(包括線程使用的細(xì)節(jié)、調(diào)度等)分離開(kāi)來(lái)的方法。
不過(guò),Executor 接口并沒(méi)有嚴(yán)格地要求執(zhí)行是異步的。在最簡(jiǎn)單的情況下,執(zhí)行程序可以在調(diào)用者的線程中立即運(yùn)行已提交的任務(wù):
| class DirectExecutor implements Executor { ???? public void execute(Runnable r) { ???????? r.run(); ???? } ?} |
更常見(jiàn)的是,任務(wù)是在某個(gè)不是調(diào)用者線程的線程中執(zhí)行的。以下執(zhí)行程序?qū)槊總€(gè)任務(wù)生成一個(gè)新線程。
| class ThreadPerTaskExecutor implements Executor { ???? public void execute(Runnable r) { ???????? new Thread(r).start(); ???? } ?} |
方法介紹如下:
void execute(Runnable?command)在未來(lái)某個(gè)時(shí)間執(zhí)行給定的命令。該命令可能在新的線程、已入池的線程或者正調(diào)用的線程中執(zhí)行,這由Executor實(shí)現(xiàn)決定。
接口ExecutorService
ExecutorService 是對(duì)?Executor?的擴(kuò)展,JDK文檔解釋如下:
Executor?提供了管理終止的方法,以及可為跟蹤一個(gè)或多個(gè)異步任務(wù)執(zhí)行狀況而生成?Future?的方法。
可以關(guān)閉ExecutorService,這將導(dǎo)致其拒絕新任務(wù)。提供兩個(gè)方法來(lái)關(guān)閉 ExecutorService。shutdown()?方法在終止前允許執(zhí)行以前提交的任務(wù),而?shutdownNow()方法阻止等待任務(wù)啟動(dòng)并試圖停止當(dāng)前正在執(zhí)行的任務(wù)。在終止時(shí),執(zhí)行程序沒(méi)有任務(wù)在執(zhí)行,也沒(méi)有任務(wù)在等待執(zhí)行,并且無(wú)法提交新任務(wù)。應(yīng)該關(guān)閉未使用的 ExecutorService 以允許回收其資源。
通過(guò)創(chuàng)建并返回一個(gè)可用于取消執(zhí)行和/或等待完成的?Future,方法 submit 擴(kuò)展了基本方法Executor.execute(java.lang.Runnable)。方法 invokeAny 和 invokeAll 是批量執(zhí)行的最常用形式,它們執(zhí)行任務(wù) collection,然后等待至少一個(gè),或全部任務(wù)完成(可使用?ExecutorCompletionService?類來(lái)編寫這些方法的自定義變體)。
此接口中的關(guān)鍵是三個(gè)submit 方法,接受一個(gè)任務(wù),并返回結(jié)果Future。
| <T> Future<T> submit(Callable<T> task); ? <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); |
其中Callable 就是帶返回結(jié)果的Runnable。定義如下:
| public?interface?Callable<V> { ????/** ???? * Computes a result, or throws an exception if unable to do so. ???? * ???? *?@return?computed result ???? *?@throws?Exception if unable to compute a result ???? */ ??? V call()?throws?Exception; } |
精彩的是返回一個(gè)表示任務(wù)的未決結(jié)果的 Future。該 Future 的get?方法在成功完成時(shí)將會(huì)返回該任務(wù)的結(jié)果。注意這些過(guò)程是異步的。
接口Future
JDK解釋如下:
Future 表示異步計(jì)算的結(jié)果。它提供了檢查計(jì)算是否完成的方法,以等待計(jì)算的完成,并獲取計(jì)算的結(jié)果。計(jì)算完成后只能使用 get 方法來(lái)獲取結(jié)果,如有必要,計(jì)算完成前可以阻塞此方法。取消則由 cancel 方法來(lái)執(zhí)行。還提供了其他方法,以確定任務(wù)是正常完成還是被取消了。一旦計(jì)算完成,就不能再取消計(jì)算。如果為了可取消性而使用 Future 但又不提供可用的結(jié)果,則可以聲明 Future<?> 形式類型、并返回 null 作為底層任務(wù)的結(jié)果。
它的方法簡(jiǎn)介如下:
| ?boolean | cancel(boolean?mayInterruptIfRunning)? |
| ?V | get()? |
| ?V | get(long?timeout,?TimeUnit?unit)? |
| ?boolean | isCancelled()? |
| ?boolean | isDone()? |
Submit后發(fā)生的事情
有了以上的一些基本了解,接下來(lái)看當(dāng)任務(wù)提交之后發(fā)生的一系列過(guò)程。
Submit 的實(shí)際代碼位于AbstractExecutorService,繼承ExecutorService。來(lái)觀察其三個(gè)submit方法。
構(gòu)造RunnableFuture
| public?Future<?> submit(Runnable task) { ????????if?(task ==?null)?throw?new?NullPointerException(); ??????? RunnableFuture<Object> ftask = newTaskFor(task,?null); ??????? execute(ftask); ????????return?ftask; ??? } ? ????public?<T> Future<T> submit(Runnable task, T result) { ????????if?(task ==?null)?throw?new?NullPointerException(); ??????? RunnableFuture<T> ftask = newTaskFor(task, result); ??????? execute(ftask); ????????return?ftask; ??? } ? ????public?<T> Future<T> submit(Callable<T> task) { ????????if?(task ==?null)?throw?new?NullPointerException(); ??????? RunnableFuture<T> ftask = newTaskFor(task); ??????? execute(ftask); ????????return?ftask; ??? } |
可以看出,不論submit 方法的參數(shù)是什么,都是先構(gòu)造一個(gè)RunnableFuture ,然偶執(zhí)行它,并返回它。執(zhí)行和返回的都是RunnableFuture。所以RunnableFuture實(shí)現(xiàn)了future 接口和runnnable接口。注意這點(diǎn)的類型是RunnableFuture,所有接下來(lái)的execute方法執(zhí)行的run方法是RunnableFuture?的具體實(shí)現(xiàn)類FutureTask的run方法。
來(lái)看RunnableFuture,其代碼如下:
| /** ?* A?{@link Future}?that is?{@link Runnable}. Successful execution of ?* the?<tt>run</tt>?method causes completion of the?<tt>Future</tt> ?* and allows access to its results. ?*?@see?FutureTask ?*?@see?Executor ?*?@since?1.6 ?*?@author?Doug Lea ?*?@param<V>?The result type returned by this Future's?<tt>get</tt>?method ?*/ public?interface?RunnableFuture<V>?extends?Runnable, Future<V> { ????/** ???? * Sets this Future to the result of its computation ???? * unless it has been cancelled. ???? */ ????void?run(); } |
作為?Runnable?的?Future。成功執(zhí)行 run 方法可以完成 Future 并允許訪問(wèn)其結(jié)果。以下代碼可以看出 返回的實(shí)際上是FutureTask,為RunnableFuture的實(shí)現(xiàn)類。
| protected?<T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { ????????return?new?FutureTask<T>(runnable, value);} |
?
關(guān)于 FutureTask ?JDK對(duì)其介紹如下:
可取消的異步計(jì)算。利用開(kāi)始和取消計(jì)算的方法、查詢計(jì)算是否完成的方法和獲取計(jì)算結(jié)果的方法,此類提供了對(duì)Future?的基本實(shí)現(xiàn)。僅在計(jì)算完成時(shí)才能獲取結(jié)果;如果計(jì)算尚未完成,則阻塞 get 方法。一旦計(jì)算完成,就不能再重新開(kāi)始或取消計(jì)算。
可使用FutureTask 包裝?Callable?或?Runnable?對(duì)象。因?yàn)?FutureTask 實(shí)現(xiàn)了 Runnable,所以可將 FutureTask 提交給?Executor?執(zhí)行。
除了作為一個(gè)獨(dú)立的類外,此類還提供了 protected 功能,這在創(chuàng)建自定義任務(wù)類時(shí)可能很有用。
先看其構(gòu)造函數(shù)。可以看出其構(gòu)造函數(shù)主要是一個(gè) 同步器的構(gòu)造。同步器接受一個(gè)Callable類型的參數(shù)。
| public?FutureTask(Callable<V> callable) { ????????if?(callable ==?null) ????????????throw?new?NullPointerException(); ????????sync?=?new?Sync(callable); ??? } ? ?? ????public?FutureTask(Runnable runnable, V result) { ????????sync?=?new?Sync(Executors.callable(runnable, result)); ??? } |
對(duì)于參數(shù)是Runnable 類型時(shí),經(jīng)過(guò)轉(zhuǎn)化為Callable 類型,轉(zhuǎn)化代碼如下,本質(zhì)上就是在Callable 的call方法中調(diào)用Runnable的run方法:
| public?static?<T> Callable<T> callable(Runnable task, T result) { ????????if?(task ==?null) ????????????throw?new?NullPointerException(); ????????return?new?RunnableAdapter<T>(task, result); } static?final?class?RunnableAdapter<T>?implements?Callable<T> { ????????final?Runnable?task; ????????final?T?result; ??????? RunnableAdapter(Runnable? task, T result) { ????????????this.task?= task; ????????????this.result?= result; ??????? } ????????public?T call() { ????????????task.run(); ????????????return?result; ??????? } ??? } |
?
FutureTask的關(guān)鍵邏輯都由他的一個(gè)內(nèi)部類Sync 實(shí)現(xiàn)。我們先暫且不管其具體實(shí)現(xiàn),留在后面說(shuō)。
執(zhí)行
接下來(lái)看 執(zhí)行任務(wù)。Execute 方法實(shí)現(xiàn)在ThreadPoolExecutor 類中,這是具體的線程池。其Execute 方法如下:
| public?void?execute(Runnable command) { ????????if?(command ==?null) ????????????throw?new?NullPointerException(); ????????if?(poolSize?>=?corePoolSize?|| !addIfUnderCorePoolSize(command)) { ????????????if?(runState?==?RUNNING?&&?workQueue.offer(command)) { ????????????????if?(runState?!=?RUNNING?||?poolSize?== 0) ??????????????????? ensureQueuedTaskHandled(command); ??????????? } ?????????? ?else?if?(!addIfUnderMaximumPoolSize(command))//這里是再給一次機(jī)會(huì) ??????????????? reject(command);?// is shutdown or saturated ??????? } ??? } |
具體的邏輯如下描述:
首先判斷空;
如果當(dāng)前池大小 小于 核心池大小(初始就是這樣),那么會(huì)執(zhí)行 addIfUnderCorePoolSize這個(gè)方法。這個(gè)方法就會(huì)創(chuàng)建新的工作線程,且把當(dāng)前任務(wù) command 設(shè)置為他的第一個(gè)任務(wù),并開(kāi)始執(zhí)行,返回true。整個(gè)execute方法結(jié)束。(1)否則加入到等待隊(duì)列中。(2)
執(zhí)行情況1
先看情況1:如下代碼,只有當(dāng)前池大小小于核心池大小的時(shí)候,且線程池處于RUNNING狀態(tài)的時(shí)候才增加新的工作線程,并把傳進(jìn)來(lái)的任務(wù)作為第一個(gè)任務(wù)并開(kāi)始執(zhí)行。此時(shí)返回真,否則返回假。
| /** ???? * Creates and starts a new thread running firstTask as its first ???? * task, only if fewer than corePoolSize threads are running ???? * and the pool is not shut down. ???? *?@param?firstTask the task the new thread should run first (or ???? * null if none) ???? *?@return?true if successful ???? */ ????private?boolean?addIfUnderCorePoolSize(Runnable firstTask) { ??????? Thread t =?null; ????????final?ReentrantLock mainLock =?this.mainLock; ??????? mainLock.lock(); ????????try?{ ????????????if?(poolSize?<?corePoolSize?&&?runState?==?RUNNING) ??????????????? t = addThread(firstTask); ??????? }?finally?{ ??????????? mainLock.unlock(); ??????? } ????????if?(t ==?null) ????????????return?false; ??????? t.start(); ????????return?true; } ? /** ???? * Creates and returns a new thread running firstTask as its first ???? * task. Call only while holding mainLock. ???? * ???? *?@param?firstTask the task the new thread should run first (or ???? * null if none) ???? *?@return?the new thread, or null if threadFactory fails to create thread ???? */ ????private?Thread addThread(Runnable firstTask) { ??????? Worker w =?new?Worker(firstTask);//工作線程, ??????? Thread t =?threadFactory.newThread(w);//封裝成線程 ????????if?(t !=?null) { ??????????? w.thread?= t; ????????????workers.add(w); ????????????int?nt = ++poolSize; ????????????if?(nt >?largestPoolSize) ????????????????largestPoolSize?= nt; ??????? } ????????return?t; ??? } |
執(zhí)行情況2
如果當(dāng)前池大小 大于核心池的大小,或者添加新的工作線程失敗(這可能是多線程環(huán)境下,競(jìng)爭(zhēng)鎖,被阻塞,其他線程已經(jīng)創(chuàng)建好了工作線程)。那么當(dāng)前任務(wù)進(jìn)入到等待隊(duì)列。
如果隊(duì)列滿,或者線程池已經(jīng)關(guān)閉,那么拒絕該任務(wù)。
工作線程worker
對(duì)工作線程的封裝是類Worker,它實(shí)現(xiàn)了Runnable接口。addThread方法把Worker 組成線程(用threadFactory),并加入線程池,重新設(shè)置線程池 大小的 達(dá)到的最大值。
重點(diǎn)研究下worker的run方法,首先運(yùn)行第一個(gè)任務(wù),以后通過(guò)getTask()獲取新的任務(wù),如果得不到,工作線程會(huì)自動(dòng)結(jié)束,在結(jié)束前 會(huì)執(zhí)行一些工作,見(jiàn)后面。
| public?void?run() { ????????????try?{ ??????????????? Runnable task =?firstTask; ????????????? ??firstTask?=?null; ????????????????while?(task !=?null?|| (task = getTask()) !=?null) { ??????????????????? runTask(task); ??????????????????? task =?null; ??????????????? } ??????????? }?finally?{ ??????????????? workerDone(this); ??????????? } } |
執(zhí)行提交的任務(wù),執(zhí)行任務(wù)前 后可以 各進(jìn)行 一些處理,目前默認(rèn)實(shí)現(xiàn)是什么也不做,擴(kuò)展的類可以實(shí)現(xiàn)它。
| private?void?runTask(Runnable task) { ????????????final?ReentrantLock runLock =?this.runLock; ??????????? runLock.lock(); ????????????try?{ ????????????????/* ???????????????? * Ensure that unless pool is stopping, this thread ???????????????? * does not have its interrupt set. This requires a ???????????????? * double-check of state in case the interrupt was ???????????????? * cleared concurrently with a shutdownNow -- if so, ???????????????? * the interrupt is re-enabled. ???????????????? */ ????????????????if?(runState?<?STOP?&& ??????????????????? Thread.interrupted() && ????????????????????runState?>=?STOP) ????????????????????thread.interrupt(); ????????????????/* ???????????????? * Track execution state to ensure that afterExecute ???????????????? * is called only if task completed or threw ???????????????? * exception. Otherwise, the caught runtime exception ???????????????? * will have been thrown by afterExecute itself, in ???????????????? * which case we don't want to call it again. ???????????????? */ ????????????????boolean?ran =?false; ??????????????? beforeExecute(thread, task); ????????????????try?{ ??????????????????? task.run(); ?????? ?????????????ran =?true; ??????????????????? afterExecute(task,?null); ??????????????????? ++completedTasks; ??????????????? }?catch?(RuntimeException ex) { ????????????????????if?(!ran) ??????????????????????? afterExecute(task, ex); ????????????????????throw?ex; ??????????????? } ??????????? }?finally?{ ??????????????? runLock.unlock(); ??????????? } ??????? } |
下面的方法是 工作線程銷毀錢調(diào)用的方法,是在run中調(diào)用的。當(dāng)池大小為0的時(shí)候,調(diào)用tryterminate 方法。
| /* *Performs bookkeeping for an exiting worker thread. ???? ?*?@param?w the worker?此方法在ThreadPoolExecutor?中 ????? */ void?workerDone(Worker w) { ????????final?ReentrantLock mainLock =?this.mainLock; ??????? mainLock.lock(); ????????try?{ ????????????completedTaskCount?+= w.completedTasks; ????????????workers.remove(w); ????????????if?(--poolSize?== 0) ??????????????? tryTerminate(); ??????? }?finally?{ ??????????? mainLock.unlock(); ??????? } ??? } |
這個(gè)方法只有在線程池的狀態(tài)是是stop 或者shutdown的時(shí)候才會(huì)真正的關(guān)閉整個(gè)線程池。另外shutdown也會(huì)調(diào)用這個(gè)方法。
| /** ???? * Transitions to TERMINATED state if either (SHUTDOWN and pool ???? * and queue empty) or (STOP and pool empty), otherwise unless ???? * stopped, ensuring that there is at least one live thread to ???? * handle queued tasks. ???? * ???? * This method is called from the three places in which ???? * termination can occur: in workerDone on exit of the last thread ???? * after pool has been shut down, or directly within calls to ???? * shutdown or shutdownNow, if there are no live threads. ???? */ ????private?void?tryTerminate() { ????????if?(poolSize?== 0) { ????????????int?state =?runState; ????????????if?(state <?STOP?&& !workQueue.isEmpty()) { ??????????????? state =?RUNNING;?// disable termination check below ??????????????? Thread t = addThread(null); ????????????????if?(t !=?null) ??????????????????? t.start(); ??? ????????} ????????????if?(state ==?STOP?|| state ==?SHUTDOWN) { ????????????????runState?=?TERMINATED; ????????????????termination.signalAll(); ??????????????? terminated(); ??????????? } ??????? } ??? } |
FutureTask
此類是RunnableFuture的實(shí)現(xiàn)類。線程池執(zhí)行的run方法是它的run方法。它委托給Sync實(shí)現(xiàn),SYNC 繼承AQS。
| /** ???? * Sets this Future to the result of its computation ???? * unless it has been cancelled. ???? */ ????public?void?run() { ????????sync.innerRun(); ??? } |
重點(diǎn)看Sync。對(duì)具體任務(wù)的調(diào)用發(fā)生在innerSet(callable.call());這句調(diào)用,innerSet的方法 作用是 設(shè)置get方法的返回值。
| void?innerRun() { ????????????if?(!compareAndSetState(0,?RUNNING)) ????????????????return; ????????????try?{ ????????????????runner?= Thread.currentThread(); ????????????????if?(getState() ==?RUNNING)?// recheck after setting thread ??????????????????? innerSet(callable.call()); ????????????????else ??????????????????? releaseShared(0);?// cancel ??????????? }?catch?(Throwable ex) { ??????????????? innerSetException(ex); ??????????? } ??????? } |
//設(shè)置后才釋放鎖。
| void?innerSet(V v) { ??? ????for?(;;) { ???????int?s = getState(); ???????if?(s ==?RAN) ?????? ????return; ????????????????if?(s ==?CANCELLED) { ?????? ????// aggressively release to set runner to null, ?????? ????// in case we are racing with a cancel request ?????? ????// that will try to interrupt runner ??????????????????? releaseShared(0); ????????????????????return; ??????????????? } ???????if?(compareAndSetState(s,?RAN)) { ????????????????????result?= v; ??????????????????? releaseShared(0); ??????????????????? done(); ?????? ????return; ??????????????? } ??????????? } ??????? } |
而get方法是需要獲取鎖的,所以在具體的任務(wù)沒(méi)有執(zhí)行完前,調(diào)用get方法會(huì)進(jìn)入到阻塞狀態(tài)。
| V innerGet()?throws?InterruptedException, ExecutionException { ??????????? acquireSharedInterruptibly(0); ????????????if?(getState() ==?CANCELLED) ????????????????throw?new?CancellationException(); ????????????if?(exception?!=?null) ????????????????throw?new?ExecutionException(exception); ????????????return?result; ??????? } |
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
參考
http://www.cnblogs.com/sarafill/archive/2011/05/18/2049461.html框架介紹,比較廣泛
http://chenzehe.iteye.com/blog/1759884原子類
http://blog.sina.com.cn/s/blog_75f0b54d0100r7af.html鎖的操作系統(tǒng)原理
http://www.infoq.com/cn/articles/atomic-operation此人是淘寶大神,原子操作的實(shí)現(xiàn)
http://agapple.iteye.com/blog/970055?? java線程阻塞中斷和LockSupport的常見(jiàn)問(wèn)題
http://blog.csdn.net/hintcnuie/article/details/11022049??????????????????? ??對(duì)比synchronized與java.util.concurrent.locks.Lock的異同
http://www.blogjava.net/xylz/archive/2010/07/06/325390.htmlAQS
http://www.open-open.com/lib/view/open1352431606912.html? 比較清晰的解釋AQS的實(shí)現(xiàn)。? 這點(diǎn)給我的啟示是,看源代碼的時(shí)候,如果碰到 抽象類,那么跟它的實(shí)現(xiàn)類 結(jié)合一起看,搞清楚調(diào)用關(guān)系(這里肯定是模板模式,調(diào)用關(guān)系單單看抽象類看不明白)
http://whitesock.iteye.com/blog/162344java內(nèi)置的鎖機(jī)制
http://whitesock.iteye.com/blog/1336409? Inside AbstractQueuedSynchronizer 系列,寫的非常精彩
?
http://www.nbtarena.com/Html/soft/201308/2429.html? 有關(guān)condition 的講解。
轉(zhuǎn)載于:https://www.cnblogs.com/davidwang456/p/6094996.html
總結(jié)
以上是生活随笔為你收集整理的java.util.concurrent包详细分析--转的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: spring源码分析之spring-co
- 下一篇: Java 理论与实践: 正确使用 Vol