本文首發(fā)在infoQ? ? 作者:劉錕洋
前言 經(jīng)過(guò)本系列的上半部分JDK1.8 AbstractQueuedSynchronizer的實(shí)現(xiàn)分析(上)的解讀,相信很多讀者已經(jīng)對(duì)AbstractQueuedSynchronizer(下文簡(jiǎn)稱AQS)的獨(dú)占功能了然于胸,那么,這次我們?cè)俳柚硪粋€(gè)工具類:CoutDownLatch,換個(gè)角度看看AQS的另外一個(gè)重要功能——共享功能的實(shí)現(xiàn)。
?
?AQS共享功能的實(shí)現(xiàn) ? ? ?在開始解讀AQS的共享功能前,我們?cè)僦販匾幌翪ountDownLatch,CountDownLatch為 java.util.concurrent包下的計(jì)數(shù)器工具類,常被用在多線程環(huán)境下,它在初始時(shí)需要指定一個(gè)計(jì)數(shù)器的大小,然后可被多個(gè)線程并發(fā)的實(shí)現(xiàn) 減1操作,并在計(jì)數(shù)器為0后調(diào)用await方法的線程被喚醒,從而實(shí)現(xiàn)多線程間的協(xié)作。它在多線程環(huán)境下的基本使用方式為:
//main thread // 新建一個(gè)CountDownLatch,并制定一個(gè)初始大小 CountDownLatch countDownLatch = new CountDownLatch(3); // 調(diào)用await方法后,main線程將阻塞在這里,直到countDownLatch 中的計(jì)數(shù)為0 countDownLatch.await(); System.out.println("over"); //thread1 // do something //........... //調(diào)用countDown方法,將計(jì)數(shù)減1 countDownLatch.countDown(); //thread2 // do something //........... //調(diào)用countDown方法,將計(jì)數(shù)減1 countDownLatch.countDown(); //thread3 // do something //........... //調(diào)用countDown方法,將計(jì)數(shù)減1 countDownLatch.countDown();
? ? ?注意,線程thread 1,2,3各自調(diào)用?countDown后,countDownLatch?的計(jì)數(shù)為0,await方法返回,控制臺(tái)輸入“over”,在此之前main thread 會(huì)一直沉睡。 ? ? ? 可以看到CountDownLatch的作用類似于一個(gè)“欄柵”,在CountDownLatch的計(jì)數(shù)為0前,調(diào)用await方法的線程將一直阻塞,直到CountDownLatch計(jì)數(shù)為0,await方法才會(huì)返回, ? ? ?而CountDownLatch的countDown()方法則一般由各個(gè)線程調(diào)用,實(shí)現(xiàn)CountDownLatch計(jì)數(shù)的減1。 ? ? ? 知道了CountDownLatch的基本使用方式,我們就從上述DEMO的第一行new?CountDownLatch(3)開始,看看CountDownLatch是怎么實(shí)現(xiàn)的。 ? ?? ? ? ?首先,看下CountDownLatch的構(gòu)造方法: ? ? ?和ReentrantLock類似,CountDownLatch內(nèi)部也有一個(gè)叫做Sync的內(nèi)部類,同樣也是用它繼承了AQS。 ? ? ?再看下Sync: ? ? ? ? ? ?如果你看過(guò)本系列的上半部分,你對(duì)setState方法一定不會(huì)陌生,它是AQS的一個(gè)“狀態(tài)位”,在不同的場(chǎng)景下,代表不同的含義,比如在ReentrantLock中,表示加鎖的次數(shù),在CountDownLatch中, ? ? 則表示CountDownLatch的計(jì)數(shù)器的初始大小。 ? ? 設(shè)置完計(jì)數(shù)器大小后CountDownLatch的構(gòu)造方法返回,下面我們?cè)倏聪翪ountDownLatch的await()方法: ? ?? ? ? 調(diào)用了Sync的acquireSharedInterruptibly方法,因?yàn)镾ync是AQS子類的原因,這里其實(shí)是直接調(diào)用了AQS的acquireSharedInterruptibly方法: ? ? ? ? ? ? ? ? 從方法名上看,這個(gè)方法的調(diào)用是響應(yīng)線程的打斷的,所以在前兩行會(huì)檢查下線程是否被打斷。接著,嘗試著獲取共享鎖,小于0,表示獲取失敗,通過(guò)本系列的上半部分的解讀, ? ?我們知道AQS在獲取鎖的思路是,先嘗試直接獲取鎖,如果失敗會(huì)將當(dāng)前線程放在隊(duì)列中,按照FIFO的原則等待鎖。 ? ? 而對(duì)于共享鎖也是這個(gè)思路,如果和獨(dú)占鎖一致,這里的tryAcquireShared應(yīng)該是個(gè)空方法,留給子類去判斷: ? ? ? ? ? 再看看CountDownLatch: ? ? ? ? ? ?如果state變成0了,則返回1,表示獲取成功,否則返回-1則表示獲取失敗。 ? ? ?看到這里,讀者可能會(huì)發(fā)現(xiàn),?await方法的獲取方式更像是在獲取一個(gè)獨(dú)占鎖,那為什么這里還會(huì)用tryAcquireShared呢? ? ? ?回想下CountDownLatch的await方法是不是只能在主線程中調(diào)用?答案是否定的,CountDownLatch的await方法可以在多個(gè)線程中調(diào)用,當(dāng)CountDownLatch的計(jì)數(shù)器為0后,調(diào)用await的方法都會(huì)依次返回。 ? ? ?也就是說(shuō)可以多個(gè)線程同時(shí)在等待await方法返回,所以它被設(shè)計(jì)成了實(shí)現(xiàn)tryAcquireShared方法,獲取的是一個(gè)共享鎖,鎖在所有調(diào)用await方法的線程間共享,所以叫共享鎖。 ? ? ? 回到acquireSharedInterruptibly方法: ? ?? ? ?如果獲取共享鎖失敗(返回了-1,說(shuō)明state不為0,也就是CountDownLatch的計(jì)數(shù)器還不為0),進(jìn)入調(diào)用doAcquireSharedInterruptibly方法中,按照我們上述的猜想,應(yīng)該是要將當(dāng)前線程放入到隊(duì)列中去。 ? 在這之前,我們?cè)倩仡櫼幌翧QS隊(duì)列的數(shù)據(jù)結(jié)構(gòu):AQS是一個(gè)雙向鏈表,通過(guò)節(jié)點(diǎn)中的next,pre變量分別指向當(dāng)前節(jié)點(diǎn)后一個(gè)節(jié)點(diǎn)和前一個(gè)節(jié)點(diǎn)。其 中,每個(gè)節(jié)點(diǎn)中都包含了一個(gè)線程和一個(gè)類型變量:表示當(dāng)前節(jié)點(diǎn)是獨(dú)占節(jié)點(diǎn)還是共享節(jié)點(diǎn),頭節(jié)點(diǎn)中的線程為正在占有鎖的線程,而后的所有節(jié)點(diǎn)的線程表示為正 在等待獲取鎖的線程。如下圖所示: ? ? 黃色節(jié)點(diǎn),表示正在獲取鎖的節(jié)點(diǎn),剩下的藍(lán)色節(jié)點(diǎn)(Node1、Node2、Node3)為正在等待鎖的節(jié)點(diǎn),他們通過(guò)各自的next,pre變量分別指向前后節(jié)點(diǎn),形成了AQS中的雙向鏈表。? ? ? 再看看doAcquireSharedInterruptibly方法: 01 private void doAcquireSharedInterruptibly(int arg)
02 ?????throws InterruptedException {
03 ?????final Node node = addWaiter(Node.SHARED); //將當(dāng)前線程包裝為類型為Node.SHARED的節(jié)點(diǎn),標(biāo)示這是一個(gè)共享節(jié)點(diǎn)。
04 ?????boolean failed = true;
07 ?????????????final Node p = node.predecessor();
08 ?????????????if (p == head) {//如果新建節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn),就是Head,說(shuō)明當(dāng)前節(jié)點(diǎn)是AQS隊(duì)列中等待獲取鎖的第一個(gè)節(jié)點(diǎn),按照FIFO的原則,可以直接嘗試獲取鎖。
09 ?????????????????int r = tryAcquireShared(arg);
10 ?????????????????if (r >= 0) {
11 ?????????????????????setHeadAndPropagate(node, r); //獲取成功,需要將當(dāng)前節(jié)點(diǎn)設(shè)置為AQS隊(duì)列中的第一個(gè)節(jié)點(diǎn),這是AQS的規(guī)則,隊(duì)列的頭節(jié)點(diǎn)表示正在獲取鎖的節(jié)點(diǎn)
12 ?????????????????????p.next = null; // help GC
13 ?????????????????????failed = false;
14 ?????????????????????return;
17 ?????????????if (shouldParkAfterFailedAcquire(p, node) && //檢查下是否需要將當(dāng)前節(jié)點(diǎn)掛起
18 ?????????????????parkAndCheckInterrupt())
19 ?????????????????throw new InterruptedException();
23 ?????????????cancelAcquire(node);
這里有幾點(diǎn)需要說(shuō)明的: ?1.?setHeadAndPropagate方法: ? ? ? ? 首先,使用了CAS更換了頭節(jié)點(diǎn),然后,將當(dāng)前節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)取出來(lái),如果同樣是“shared”類型的,再做一個(gè)”releaseShared”操作。看下doReleaseShared方法:
03 ?????if (h != null && h != tail) {
04 ?????????int ws = h.waitStatus;
05 ?????????if (ws == Node.SIGNAL) {
06 ?????????????if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //如果當(dāng)前節(jié)點(diǎn)是SIGNAL意味著,它正在等待一個(gè)信號(hào),
07 ???????????????????????????????????????????????????????????????????????????????????????//或者說(shuō),它在等待被喚醒,因此做兩件事,
08 ???????????????????????????????????????????????????????????????????????????????????????//1是重置waitStatus標(biāo)志位,2是重置成功后,喚醒下一個(gè)節(jié)點(diǎn)。
09 ?????????????????continue;??????????? // loop to recheck cases
10 ?????????????unparkSuccessor(h);
12 ?????????else if (ws == 0 &&
13 ??????????????????!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))? //如果本身頭結(jié)點(diǎn)的waitStatus是出于重置狀態(tài)(waitStatus==0)的,將其設(shè)置為“傳播”狀態(tài)。意味著需要將狀態(tài)向后一個(gè)節(jié)點(diǎn)傳播。
14 ?????????????continue;??????????????? // loop on failed CAS
16 ?????if (h == head)?????????????????? // loop if head changed
? ? 為什么要這么做呢?這就是共享功能和獨(dú)占功能最不一樣的地方,對(duì)于獨(dú)占功能來(lái)說(shuō),有且只有一個(gè)線程(通常只對(duì)應(yīng)一個(gè)節(jié)點(diǎn),拿ReentantLock舉例,如果當(dāng)前持有鎖的線程重復(fù)調(diào)用lock()方法, 那根據(jù)本系列上半部分我們的介紹,我們知道,會(huì)被包裝成多個(gè)節(jié)點(diǎn)在AQS的隊(duì)列中,所以用一個(gè)線程來(lái)描述更準(zhǔn)確),能夠獲取鎖,但是對(duì)于共享功能來(lái)說(shuō)。 共享的狀態(tài)是可以被共享的,也就是意味著其他AQS隊(duì)列中的其他節(jié)點(diǎn)也應(yīng)能第一時(shí)間知道狀態(tài)的變化。因此,一個(gè)節(jié)點(diǎn)獲取到共享狀態(tài)流程圖是這樣的: ? ? ?比如現(xiàn)在有如下隊(duì)列: ? ? ?當(dāng)Node1調(diào)用tryAcquireShared成功后,更換了頭節(jié)點(diǎn): ? ? ? ? ?Node1變成了頭節(jié)點(diǎn)然后調(diào)用unparkSuccessor()方法喚醒了Node2,Node2中持有的線程A出于上面流程圖的park node的位置,
? ? ?線程A被喚醒后,重復(fù)黃色線條的流程,重新檢查調(diào)用tryAcquireShared方法,看能否成功,如果成功,則又更改頭結(jié)點(diǎn),重復(fù)以上步驟,以實(shí)現(xiàn)節(jié)點(diǎn)自身獲取共享鎖成功后,喚醒下一個(gè)共享類型結(jié)點(diǎn)的操作,實(shí)現(xiàn)共享狀態(tài)的向后傳遞。
?2.其實(shí)對(duì)于doAcquireShared方法,AQS還提供了集中類似的實(shí)現(xiàn):
? ? ?分別對(duì)應(yīng)了:
?1. 帶參數(shù)請(qǐng)求共享鎖。 (忽略中斷)
?2.?帶參數(shù)請(qǐng)求共享鎖,且響應(yīng)中斷。(每次循環(huán)時(shí),會(huì)檢查當(dāng)前線程的中斷狀態(tài),以實(shí)現(xiàn)對(duì)線程中斷的響應(yīng))
?3.?帶參數(shù)請(qǐng)求共享鎖但是限制等待時(shí)間。(第二個(gè)參數(shù)設(shè)置超時(shí)時(shí)間,超出時(shí)間后,方法返回。)
比較特別的為最后一個(gè)doAcquireSharedNanos方法,我們一起看下它怎么實(shí)現(xiàn)超時(shí)時(shí)間的控制的。
因?yàn)樵摲椒ê推溆喃@取共享鎖的方法邏輯是類似的,我用紅色框圈出了它所不一樣的地方,也就是實(shí)現(xiàn)超時(shí)時(shí)間控制的地方。
可以看到,其實(shí)就是在進(jìn)入方法時(shí),計(jì)算出了一個(gè)“deadline”,每次循環(huán)的時(shí)候用當(dāng)前時(shí)間和“deadline”比較,大于“dealine”說(shuō)明超時(shí)時(shí)間已到,直接返回方法。
注意,最后一個(gè)紅框中的這行代碼:
? ? nanosTimeout > spinForTimeoutThreshold
從變量的字面意思可知,這是拿超時(shí)時(shí)間和超時(shí)自旋的最小閥值作比較,在這里Doug Lea把超時(shí)自旋的閥值設(shè)置成了1000ns,即只有超時(shí)時(shí)間大于1000ns才會(huì)去掛起線程,否則,再次循環(huán),以實(shí)現(xiàn)“自旋”操作。這是“自旋”在AQS中的應(yīng)用之處。
?
看完await方法,我們?cè)賮?lái)看下countDown()方法:
調(diào)用了AQS的releaseShared方法,并傳入了參數(shù)1: 同樣先嘗試去釋放鎖,tryReleaseShared同樣為空方法,留給子類自己去實(shí)現(xiàn),以下是CountDownLatch的內(nèi)部類Sync的實(shí)現(xiàn): 死循環(huán)更新state的值,實(shí)現(xiàn)state的減1操作,之所以用死循環(huán)是為了確保state值的更新成功。
從上文的分析中可知,如果state的值為0,在CountDownLatch中意味:所有的子線程已經(jīng)執(zhí)行完畢,這個(gè)時(shí)候可以喚醒調(diào)用await()方法的線程了,而這些線程正在AQS的隊(duì)列中,并被掛起的,
所以下一步應(yīng)該去喚醒AQS隊(duì)列中的頭結(jié)點(diǎn)了(AQS的隊(duì)列為FIFO隊(duì)列),然后由頭節(jié)點(diǎn)去依次喚醒AQS隊(duì)列中的其他共享節(jié)點(diǎn)。如果tryReleaseShared返回true,進(jìn)入doReleaseShared()方法:
??? private void doReleaseShared() { ??????? /* ???????? * Ensure that a release propagates, even if there are other ???????? * in-progress acquires/releases.? This proceeds in the usual ???????? * way of trying to unparkSuccessor of head if it needs ???????? * signal. But if it does not, status is set to PROPAGATE to ???????? * ensure that upon release, propagation continues. ???????? * Additionally, we must loop in case a new node is added ???????? * while we are doing this. Also, unlike other uses of ???????? * unparkSuccessor, we need to know if CAS to reset status ???????? * fails, if so rechecking. ???????? */ ??????? for (;;) { ??????????? Node h = head; ??????????? if (h != null && h != tail) { ??????????????? int ws = h.waitStatus; ??????????????? if (ws == Node.SIGNAL) { ??????????????????? if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) ??????????????????????? continue;??????????? // loop to recheck cases ??????????????????? unparkSuccessor(h); ??????????????? } ??????????????? else if (ws == 0 && ???????????????????????? !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) ??????????????????? continue;??????????????? // loop on failed CAS ??????????? } ??????????? if (h == head)?????????????????? // loop if head changed ??????????????? break; ??????? } ??? }
? 當(dāng)線程被喚醒后,會(huì)重新嘗試獲取共享鎖,而對(duì)于CountDownLatch線程獲取共享鎖判斷依據(jù)是state是否為0,而這個(gè)時(shí)候顯然state已經(jīng)變成了0,因此可以順利獲取共享鎖并且依次喚醒AQS隊(duì)里中后面的節(jié)點(diǎn)及對(duì)應(yīng)的線程。 ? 總結(jié) ? ? ?本文從CountDownLatch入手,深入分析了AQS關(guān)于共享鎖方面的實(shí)現(xiàn)方式: ? ? ?如果獲取共享鎖失敗后,將請(qǐng)求共享鎖的線程封裝成Node對(duì)象放入AQS的隊(duì)列中,并掛起Node對(duì)象對(duì)應(yīng)的線程,實(shí)現(xiàn)請(qǐng)求鎖線程的等待操作。待共享鎖 可以被獲取后,從頭節(jié)點(diǎn)開始,依次喚醒頭節(jié)點(diǎn)及其以后的所有共享類型的節(jié)點(diǎn)。實(shí)現(xiàn)共享狀態(tài)的傳播。這里有幾點(diǎn)值得注意: 1.???? 與AQS的獨(dú)占功能一樣,共享鎖是否可以被獲取的判斷為空方法,交由子類去實(shí)現(xiàn)。 2.???? 與AQS的獨(dú)占功能不同,當(dāng)鎖被頭節(jié)點(diǎn)獲取后,獨(dú)占功能是只有頭節(jié)點(diǎn)獲取鎖,其余節(jié)點(diǎn)的線程繼續(xù)沉睡,等待鎖被釋放后,才會(huì)喚醒下一個(gè)節(jié)點(diǎn)的線程,而共享 功能是只要頭節(jié)點(diǎn)獲取鎖成功,就在喚醒自身節(jié)點(diǎn)對(duì)應(yīng)的線程的同時(shí),繼續(xù)喚醒AQS隊(duì)列中的下一個(gè)節(jié)點(diǎn)的線程,每個(gè)節(jié)點(diǎn)在喚醒自身的同時(shí)還會(huì)喚醒下一個(gè)節(jié)點(diǎn) 對(duì)應(yīng)的線程,以實(shí)現(xiàn)共享狀態(tài)的“向后傳播”,從而實(shí)現(xiàn)共享功能。
以上的分析都是從AQS子類的角度去看待AQS的部分功能的,而如果直接看待AQS,或許可以這么去解讀: 首先,AQS并不關(guān)心“是什么鎖”,對(duì)于AQS來(lái)說(shuō)它只是實(shí)現(xiàn)了一系列的用于判斷“資源”是否可以訪問(wèn)的API,并且封裝了在“訪問(wèn)資源”受限時(shí)將請(qǐng)求訪 問(wèn)的線程的加入隊(duì)列、掛起、喚醒等操作, AQS只關(guān)心“資源不可以訪問(wèn)時(shí),怎么處理?”、“資源是可以被同時(shí)訪問(wèn),還是在同一時(shí)間只能被一個(gè)線程訪問(wèn)?”、“如果有線程等不及資源了,怎么從 AQS的隊(duì)列中退出?”等一系列圍繞資源訪問(wèn)的問(wèn)題,而至于“資源是否可以被訪問(wèn)?”這個(gè)問(wèn)題則交給AQS的子類去實(shí)現(xiàn)。 當(dāng)AQS的子類是實(shí)現(xiàn)獨(dú)占功能時(shí),例如ReentrantLock,“資源是否可以被訪問(wèn)”被定義為只要AQS的state變量不為0,并且持有鎖的線程不是當(dāng)前線程,則代表資源不能訪問(wèn)。 當(dāng)AQS的子類是實(shí)現(xiàn)共享功能時(shí),例如:CountDownLatch,“資源是否可以被訪問(wèn)”被定義為只要AQS的state變量不為0,說(shuō)明資源不能 訪問(wèn)。這是典型的將規(guī)則和操作分開的設(shè)計(jì)思路:規(guī)則子類定義,操作邏輯因?yàn)榫哂泄眯?#xff0c;放在父類中去封裝。當(dāng)然,正式因?yàn)锳QS只是關(guān)心“資源在什么條件 下可被訪問(wèn)”,所以子類還可以同時(shí)使用AQS的共享功能和獨(dú)占功能的API以實(shí)現(xiàn)更為復(fù)雜的功能。 比如:ReentrantReadWriteLock,我們知道ReentrantReadWriteLock的中也有一個(gè)叫Sync的內(nèi)部類繼承了 AQS,而AQS的隊(duì)列可以同時(shí)存放共享鎖和獨(dú)占鎖,對(duì)于ReentrantReadWriteLock來(lái)說(shuō)分別代表讀鎖和寫鎖,當(dāng)隊(duì)列中的頭節(jié)點(diǎn)為讀鎖 時(shí),代表讀操作可以執(zhí)行,而寫操作不能執(zhí)行,因此請(qǐng)求寫操作的線程會(huì)被掛起,當(dāng)讀操作依次推出后,寫鎖成為頭節(jié)點(diǎn),請(qǐng)求寫操作的線程被喚醒,可以執(zhí)行寫操 作,而此時(shí)的讀請(qǐng)求將被封裝成Node放入AQS的隊(duì)列中。如此往復(fù),實(shí)現(xiàn)讀寫鎖的讀寫交替進(jìn)行。 而本系列文章上半部分提到的FutureTask,其實(shí)思路也是:封裝一個(gè)存放線程執(zhí)行結(jié)果的變量A,使用AQS的獨(dú)占API實(shí)現(xiàn)線程對(duì)變量A的獨(dú)占訪 問(wèn),判斷規(guī)則是,線程沒(méi)有執(zhí)行完畢:call()方法沒(méi)有返回前,不能訪問(wèn)變量A,或者是超時(shí)時(shí)間沒(méi)到前不能訪問(wèn)變量A(這就是FutureTask的 get方法可以實(shí)現(xiàn)獲取線程執(zhí)行結(jié)果時(shí),設(shè)置超時(shí)時(shí)間的原因)。 綜上所述,本系列文章從AQS獨(dú)占鎖和共享鎖兩個(gè)方面深入分析了AQS的實(shí)現(xiàn)方式和獨(dú)特的設(shè)計(jì)思路,希望對(duì)讀者有啟發(fā),下一篇文章,我們將繼續(xù)JDK 1.8下 J.U.C (java.util.concurrent)包中的其他工具類,敬請(qǐng)期待。
轉(zhuǎn)載于:https://www.cnblogs.com/voodgen/p/5655593.html
總結(jié)
以上是生活随笔 為你收集整理的深度解析Java8 – AbstractQueuedSynchronizer的实现分析(下) 的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
如果覺(jué)得生活随笔 網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔 推薦給好友。