Java Review - 并发编程_ CountDownLatch原理源码剖析
文章目錄
- Pre
- 小Demo
- CountDownLatch VS join方法
- 類圖關系
- 核心方法&源碼解析
- void await()
- boolean await(long timeout, TimeUnit unit)
- void countDown()
- long getCount()
- 小結
Pre
每日一博 - CountDownLatch使用場景分析以及源碼分析
在日常開發中經常會遇到需要在主線程中開啟多個線程去并行執行任務,并且主線程需要等待所有子線程執行完畢后再進行匯總的場景。
在CountDownLatch出現之前一般都使用線程的join()方法來實現這一點,但是join方法不夠靈活,不能夠滿足不同場景的需要,所以JDK開發組提供了CountDownLatch這個類,使用CountDownLatch會更優雅.
小Demo
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/12/19 10:46* @mark: show me the code , change the world*/ public class CountDownLatchTest {// 創建一個CountDownLatch實例private static volatile CountDownLatch countDownLatch = new CountDownLatch(2);public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(2);executorService.submit(() -> {System.out.println(Thread.currentThread().getName() + " 模擬業務運行");try {TimeUnit.SECONDS.sleep(1);System.out.println(Thread.currentThread().getName() + " 業務運行Over");} catch (InterruptedException e) {e.printStackTrace();}finally {// 子線程執行結束,減1countDownLatch.countDown();}});executorService.submit(() -> {System.out.println(Thread.currentThread().getName() + " 模擬業務運行");try {TimeUnit.SECONDS.sleep(1);System.out.println(Thread.currentThread().getName() + " 業務運行Over");} catch (InterruptedException e) {e.printStackTrace();}finally {// 子線程執行結束,減1countDownLatch.countDown();}});// 等待子線程執行執行結束 返回countDownLatch.await();System.out.println( "子線程業務運行Over,主線程繼續工作");executorService.shutdown();} }
如上代碼中,
-
創建了一個CountDownLatch實例,因為有兩個子線程所以構造函數的傳參為2。
-
主線程調用countDownLatch.await()方法后會被阻塞。
-
子線程執行完畢后調用countDownLatch.countDown()方法讓countDownLatch內部的計數器減1
-
所有子線程執行完畢并調用countDown()方法后計數器會變為0,這時候主線程的await()方法才會返回。
CountDownLatch VS join方法
-
調用一個子線程的join()方法后,該線程會一直被阻塞直到子線程運行完畢
-
而CountDownLatch則使用計數器來允許子線程運行完畢或者在運行中遞減計數,也就是CountDownLatch可以在子線程運行的任何時候讓await方法返回而不一定必須等到線程結束
-
另外,使用線程池來管理線程時一般都是直接添加Runable到線程池,這時候就沒有辦法再調用線程的join方法了,就是說countDownLatch相比join方法讓我們對線程同步有更靈活的控制
類圖關系
從類圖可以看出,CountDownLatch是使用AQS實現的。
通過下面的構造函數, 實際上是把計數器的值賦給了AQS的狀態變量state,也就是這里使用AQS的狀態值來表示計數器值。
/*** Constructs a {@code CountDownLatch} initialized with the given count.** @param count the number of times {@link #countDown} must be invoked* before threads can pass through {@link #await}* @throws IllegalArgumentException if {@code count} is negative*/public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);} Sync(int count) {setState(count);}int getCount() {return getState();}核心方法&源碼解析
接下來分析CountDownLatch中的幾個重要的方法,看它們是如何調用AQS來實現功能的。
void await()
當線程調用CountDownLatch對象的await方法后,當前線程會被阻塞,直到下面的情況之一發生才會返回
-
當所有線程都調用了CountDownLatch對象的countDown方法后,也就是計數器的值為0時
-
其他線程調用了當前線程的interrupt()方法中斷了當前線程,當前線程就會拋出InterruptedException異常,然后返回
await()方法委托sync調用了AQS的acquireSharedInterruptibly方法
// AQS獲取共享資源時響應中斷的方法public final void acquireSharedInterruptibly(int arg)throws InterruptedException {// 響應中斷 if (Thread.interrupted())throw new InterruptedException();// 查看當前計數器是否為0 ,為0 直接返回,否則進入AQS隊列等待 if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);} protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}-
由如上代碼可知,該方法的特點是線程獲取資源時可以被中斷,并且獲取的資源是共享資源。
-
acquireSharedInterruptibly首先判斷當前線程是否已被中斷,若是則拋出異常,否則調用sync實現的tryAcquireShared方法查看當前狀態值(計數器值)是否為0,是則當前線程的await()方法直接返回,否則調用AQS的doAcquireSharedInterruptibly方法讓當前線程阻塞。
-
另外可以看到,這里tryAcquireShared傳遞的arg參數沒有被用到,調用tryAcquireShared的方法僅僅是為了檢查當前狀態值是不是為0,并沒有調用CAS讓當前狀態值減1。
boolean await(long timeout, TimeUnit unit)
當線程調用了CountDownLatch對象的該方法后,當前線程會被阻塞,直到下面的情況之一發生才會返回
-
當所有線程都調用了CountDownLatch對象的countDown方法后,也就是計數器值為0時,這時候會返回true
-
設置的timeout時間到了,因為超時而返回false
-
其他線程調用了當前線程的interrupt()方法中斷了當前線程,當前線程會拋出InterruptedException異常,然后返回
void countDown()
線程調用該方法后,計數器的值遞減,遞減后如果計數器值為0則喚醒所有因調用await方法而被阻塞的線程,否則什么都不做
下面看下countDown()方法是如何調用AQS的方法的。
/*** Decrements the count of the latch, releasing all waiting threads if* the count reaches zero.** <p>If the current count is greater than zero then it is decremented.* If the new count is zero then all waiting threads are re-enabled for* thread scheduling purposes.** <p>If the current count equals zero then nothing happens.*/public void countDown() {// 委托調用AQS的releaseSharedsync.releaseShared(1);}AQS的方法
/*** Releases in shared mode. Implemented by unblocking one or more* threads if {@link #tryReleaseShared} returns true.** @param arg the release argument. This value is conveyed to* {@link #tryReleaseShared} but is otherwise uninterpreted* and can represent anything you like.* @return the value returned from {@link #tryReleaseShared}*/public final boolean releaseShared(int arg) {// 調用syn實現的tryReleaseSharedif (tryReleaseShared(arg)) {// AQS釋放資源的方法doReleaseShared();return true;}return false;}在如上代碼中,releaseShared首先調用了sync實現的AQS的tryReleaseShared方法,代碼如下
protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zero// 循環進行CAS, 直到當前線程成功彎沉CAS使計數器值(狀態值state)減一 并更新statefor (;;) {int c = getState();// 1 如果狀態值為0 ,則直接返回if (c == 0)return false;// 2 使用CAS讓計數器減1 int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}如上代碼
-
首先獲取當前狀態值(計數器值)。
-
代碼(1)判斷如果當前狀態值為0則直接返回false,從而countDown()方法直接返回
-
否則執行代碼(2)使用CAS將計數器值減1,CAS失敗則循環重試,否則如果當前計數器值為0則返回true,返回true說明是最后一個線程調用的countdown方法,那么該線程除了讓計數器值減1外,還需要喚醒因調用CountDownLatch的await方法而被阻塞的線程,具體是調用AQS的doReleaseShared方法來激活阻塞的線程
-
這里代碼(1)貌似是多余的,其實不然,之所以添加代碼(1)是為了防止當計數器值為0后,其他線程又調用了countDown方法,如果沒有代碼(1),狀態值就可能會變成負數。
long getCount()
獲取當前計數器的值,也就是AQS的state的值,一般在測試時使用該方法
/*** Returns the current count.** <p>This method is typically used for debugging and testing purposes.** @return the current count*/public long getCount() {return sync.getCount();}在其內部還是調用了AQS的getState方法來獲取state的值(計數器當前值)
小結
CountDownLatch是使用AQS實現的。使用AQS的狀態變量state來存放計數器的值。
首先在初始化CountDownLatch時設置狀態值(計數器值),當多個線程調用countdown方法時實際是原子性遞減AQS的狀態值。
當線程調用await方法后當前線程會被放入AQS的阻塞隊列等待計數器為0再返回。其他線程調用countdown方法讓計數器值遞減1,當計數器值變為0時,當前線程還要調用AQS的doReleaseShared方法來激活由于調用await()方法而被阻塞的線程。
總結
以上是生活随笔為你收集整理的Java Review - 并发编程_ CountDownLatch原理源码剖析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java Review - 并发编程_S
- 下一篇: Java Review - 并发编程_