Java 中的并发工具类
From: https://blog.wuwii.com/juc-utils.html
java.util.concurrent 下提供了一些輔助類來幫助我們在并發(fā)編程的設計。
學習了 AQS 后再了解這些工具類,就非常簡單了。
jdk 1.8
等待多線程完成的CountDownLatch
在 concurrent 包下面提供了 CountDownLatch 類,它提供了計數(shù)器的功能,能夠實現(xiàn)讓一個線程等待其他線程執(zhí)行完畢才能進入運行狀態(tài)。
源碼分析
首先看下最關鍵的地方它的自定義同步器的實現(xiàn),非常簡單:
-  private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; // 1. 初始化 state 資源變量 Sync(int count) { setState(count); } int getCount() { return getState(); } // 嘗試獲取貢獻模式下的資源, // 定義返回值小于 0 的時候獲取資源失敗 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero // 自旋。 for (;;) { int c = getState(); if (c == 0) return false; int nextc = c - 1; // 每次釋放資源,硬編碼減一個資源 if (compareAndSetState(c, nextc)) return nextc == 0; // 知道為 0 的時候才釋放成功,也就是所有線程必須都執(zhí)行釋放操作說明才釋放成功。 } } } 在這里查看構造器的源碼得知,CountDownLatch 內(nèi)部使用的是 內(nèi)部類Sync 繼承了 AQS ,將我們傳入進來的 count 數(shù)值當作 AQS state。感覺這個是不是和可重入鎖實現(xiàn)是一樣的,只不過開始指定了線程獲取的鎖的次數(shù)。 在上面我也發(fā)現(xiàn)了幾個特點,第一次看這個代碼其實還是不好理解,因為它相對前面的 AQS 和 TwinsLock 就是一個反著設計的代碼: 
- 首先獲取資源的時候,線程全部都是先進入等待隊列,而且在這一步驟中,不改變 state 資源的數(shù)量;
- 釋放資源的時候,每次固定減少一個資源,直到資源為 0 的時候才表示釋放資源成功,所以加入我們有 5 個資源,但是只有四個線程執(zhí)行,如果只釋放四次(總共執(zhí)行 countDown 四次),就永遠也釋放不成功,await 一直在阻塞。
- 經(jīng)過上面的分析,發(fā)現(xiàn)了 state 的資源數(shù)量每次進行 countDown 都去減少一個,沒有方法去增加數(shù)量,所以它是不可逆的,它的計數(shù)器是不可以重復使用的。
-  看下 await 的實現(xiàn),發(fā)現(xiàn)它最終實現(xiàn)的是 doAcquireSharedInterruptibly : 
| // 仔細看這個代碼,和前面的共享模式中的 doAcquireShared 方法基本一摸一樣,只不過是當它遇到線程中斷信號的時候,立刻拋出中斷異常,仔細想想也是的,比如,自己在這里等別人吃飯,不想等了,也懶得管別人做什么了,剩下的吃飯的事情也沒必要繼續(xù)下去了。 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } // 需要注意的是它重寫了嘗試獲取資源的方法,當資源全部消耗完,才能夠讓你去獲取資源,現(xiàn)在才豁然開朗,await 阻塞的線程就是這么被喚醒的。 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } | 
使用場景
CountDownLatch允許一個或多個線程等待其他線程完成操作。
比如經(jīng)典問題:
有Thread1、Thread2、Thread3、Thread4四條線程分別統(tǒng)計C、D、E、F四個盤的大小,所有線程都統(tǒng)計完畢交給Thread5線程去做匯總,應當如何實現(xiàn)?
這個問題關鍵就是要知道四條線程何時執(zhí)行完。
下面是我的解決思路:
| /** * 如有Thread1、Thread2、Thread3、Thread4四條線程分別統(tǒng)計C、D、E、F四個盤的大小, * 所有線程都統(tǒng)計完畢交給Thread5線程去做匯總,應當如何實現(xiàn)? * * Created by KronChan on 2018/5/14 17:00. */ public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { // 初始化計數(shù)器,設置總量i,調(diào)用一次countDown()方法后i的值會減1。 // 在一個線程中如果調(diào)用了await()方法,這個線程就會進入到等待的狀態(tài),當參數(shù)i為0的時候這個線程才繼續(xù)執(zhí)行。 final CountDownLatch countDownLatch = new CountDownLatch(4); Runnable thread1 = () -> { try { TimeUnit.SECONDS.sleep(2); System.out.println("統(tǒng)計 C 盤大小"); } catch (InterruptedException e) { e.printStackTrace(); } // 統(tǒng)計完成計數(shù)器 -1 countDownLatch.countDown(); }; Runnable thread2 = () -> { try { TimeUnit.SECONDS.sleep(2); System.out.println("統(tǒng)計 D 盤大小"); } catch (InterruptedException e) { e.printStackTrace(); } countDownLatch.countDown(); }; Runnable thread3 = () -> { try { TimeUnit.SECONDS.sleep(2); System.out.println("統(tǒng)計 E 盤大小"); } catch (InterruptedException e) { e.printStackTrace(); } countDownLatch.countDown(); }; Runnable thread4 = () -> { try { TimeUnit.SECONDS.sleep(2); System.out.println("統(tǒng)計 F 盤大小"); } catch (InterruptedException e) { e.printStackTrace(); } countDownLatch.countDown(); }; ExecutorService pool = Executors.newFixedThreadPool(4); pool.execute(thread1); pool.execute(thread2); pool.execute(thread3); pool.execute(thread4); // 等待 i 值為 0 ,等待四條線程執(zhí)行完畢。 countDownLatch.await(); System.out.println("統(tǒng)計完成"); pool.shutdown(); } } | 
同步屏障CyclicBarrier
CyclicBarrier的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續(xù)運行。
源碼分析
屬性
| private final ReentrantLock lock = new ReentrantLock(); // 線程協(xié)作 private final Condition trip = lock.newCondition(); // 必須同時到達barrier的線程個數(shù)。 private final int parties; // parties個線程到達barrier時,會執(zhí)行的動作,會讓到達屏障中的任意一個線程去執(zhí)行這個動作。 private final Runnable barrierCommand; // 控制屏障的循環(huán)使用,它是可重復使用的,每次使用CyclicBarrier,本次所有線程同屬于一代,即同一個Generation private Generation generation = new Generation(); // 處在等待狀態(tài)的線程個數(shù)。 private int count; | 
主要的方法
構造函數(shù)
| public CyclicBarrier(int parties) { this(parties, null); } // 構造函數(shù)主要實現(xiàn)了,設置一組線程的數(shù)量,到達屏障時候的臨界點,可以設置到達屏障的時候需要處理的動作,后面屏障允許它們通過。 public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } | 
await
| public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen; } } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; // 獨占鎖 lock.lock(); try { // 保存當前的generation final Generation g = generation; // generation broken,不允許使用,則拋出異常。 if (g.broken) throw new BrokenBarrierException(); // 如果當前線程被中斷,則通過breakBarrier()終止CyclicBarrier,喚醒CyclicBarrier中所有等待線程。 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // 等待的計數(shù)器減一 int index = --count; // 如果計數(shù)器的 count 正好為0, 說明已經(jīng)有parties個線程到達barrier了。執(zhí)行預定的Runnable任務后,更新?lián)Q代,準備下一次使用。 if (index == 0) { // tripped boolean ranAction = false; try { // 如果barrierCommand不為null,則執(zhí)行該動作。 final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; // 喚醒所有等待線程,并更新generation,準備下一次使用 nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // 當前線程一直阻塞, // 1. 有parties個線程到達barrier // 2. 當前線程被中斷 // 3. 超時 // 直到上面三者之一發(fā)生,就喚醒所有線程繼續(xù)執(zhí)行下去 // 自旋 for (;;) { try { // 如果不是超時等待,則調(diào)用awati()進行等待;否則,調(diào)用awaitNanos()進行等待。 if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // 如果等待過程中,線程被中斷,通過breakBarrier()終止CyclicBarrier,喚醒CyclicBarrier中所有等待線 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } // borken if (g.broken) throw new BrokenBarrierException(); // 如果generation已經(jīng)換代,則返回index。 if (g != generation) return index; // 超時,則通過breakBarrier()終止CyclicBarrier,喚醒CyclicBarrier中所有等待線程。 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); // 釋放獨占鎖 } } // barrier被broken后,調(diào)用breakBarrier方法,將generation.broken設置為true,并使用signalAll通知所有等待的線程。 private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); } | 
使用場景
CyclicBarrier可以用于多線程計算數(shù)據(jù),最后合并計算結果的場景,然后四條線程又可以分別去干自己的事情了。
現(xiàn)在我將上面的統(tǒng)計磁盤的任務 CountDownLatch 中改下,統(tǒng)計完統(tǒng)計最終后,每個線程要發(fā)出退出信號。
下面是我的實現(xiàn)代碼:
| public class CyclicBarrierDemo { public static void main(String[] args) { String[] drivers = {"C", "D", "E", "F"}; int length = drivers.length; ExecutorService pool = Executors.newFixedThreadPool(length); // 如果線程都到達barrier狀態(tài)后,會從四個線程中選擇一個線程去執(zhí)行Runnable。 CyclicBarrier cyclicBarrier = new CyclicBarrier(length, () -> { System.out.printf("%s 線程告訴你,統(tǒng)計完畢,待繼續(xù)執(zhí)行%n", Thread.currentThread().getName()); }); Stream.of(drivers).forEach((d) -> { pool.execute(new StatisticsDemo(d, cyclicBarrier)); }); pool.shutdown(); } static class StatisticsDemo implements Runnable { private String driveName; private CyclicBarrier cyclicBarrier; public StatisticsDemo(String driveName, CyclicBarrier cyclicBarrier) { this.driveName = driveName; this.cyclicBarrier = cyclicBarrier; } @Override public void run() { try { TimeUnit.SECONDS.sleep((int) (Math.random() * 10)); System.out.printf("%s 線程統(tǒng)計 %s 盤大小%n", Thread.currentThread().getName(), driveName); cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.printf("%s 準備退出%n", driveName); } } } | 
執(zhí)行結果:
| pool-1-thread-1 線程統(tǒng)計 C 盤大小 pool-1-thread-2 線程統(tǒng)計 D 盤大小 pool-1-thread-3 線程統(tǒng)計 E 盤大小 pool-1-thread-4 線程統(tǒng)計 F 盤大小 pool-1-thread-4 線程告訴你,統(tǒng)計完畢,待繼續(xù)執(zhí)行 F 準備退出 E 準備退出 D 準備退出 C 準備退出 | 
控制并發(fā)線程數(shù)的Semaphore
Semaphore(信號量)是用來控制同時訪問特定資源的線程數(shù)量(許可證數(shù)),它通過協(xié)調(diào)各個線程,以保證合理的使用公共資源。
源碼分析
構造函數(shù)
| public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } | 
具有公平鎖的特性,permits 指定許可數(shù)量,就是資源數(shù)量 state。
同步器的實現(xiàn)
| abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; Sync(int permits) { setState(permits); } final int getPermits() { return getState(); } // 非公平的方式獲取共享鎖 final int nonfairTryAcquireShared(int acquires) { for (;;) { // 獲取資源數(shù)量 int available = getState(); int remaining = available - acquires; // 本次請求獲取鎖需要的資源的數(shù)量 if (remaining < 0 || compareAndSetState(available, remaining)) // 如果資源足夠,嘗試 CAS 獲取鎖 return remaining; } } // 釋放鎖 protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; // 釋放鎖的時候,返還資源 if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) // CAS 操作,避免其他的線程也在釋放資源 return true; } } // 減少資源數(shù)量 final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } // 清空資源,返回歷史資源數(shù)量 final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } } /** * NonFair version */ static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } } /** * Fair version */ static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { // 同樣的公平鎖情況下,判斷該線程前面有沒有線程等待獲取鎖 if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } } | 
提供其他的方法
- availablePermits:獲取此信號量中當前可用的許可證數(shù)(還能有多少個線程執(zhí)行);
- drainPermits:立刻使用完所有可用的許可證;
- reducePermits:減少相應數(shù)量的許可證,是一個 protected 方法;
- isFair:是否是公平狀態(tài);
- hasQueuedThreads:等待隊列中是否有線程,等待獲取許可證;
- getQueueLength:等待隊列中等待獲取許可證的線程數(shù)量;
- getQueuedThreads:protected 方法,獲取等待隊列中的線程。
使用場景
Semaphore可以用于做流量控制,特別是公用資源有限的應用場景,比如我們有五臺機器,有十名工人,每個工人需要一臺機器才能工作,一名工人工作完了就可以休息了,機器讓其他沒工作過的工人使用。
下面是我的實現(xiàn)代碼:
| public class SemaphoreDemo { public static void main(String[] args) { int num = 10; Semaphore machines = new Semaphore(5); for (int i = 0; i < num; i++) { new Thread(new Worker(i, machines)).start(); } } static class Worker extends Thread { private Semaphore machines; private int worker; Worker(int worker, Semaphore semaphore) { this.worker = worker; this.machines = semaphore; } @Override public void run() { try { machines.acquire(); System.out.printf("工人 %d 開始使用機器工作了 %n", worker); TimeUnit.SECONDS.sleep((int) (Math.random() * 10)); System.out.printf("工人 %d 干完活了,讓出機器了%n", worker); machines.release(); } catch (Exception e) { e.printStackTrace(); } } } } | 
執(zhí)行一下結果:
| 工人 0 開始使用機器工作了 工人 4 開始使用機器工作了 工人 3 開始使用機器工作了 工人 2 開始使用機器工作了 工人 1 開始使用機器工作了 工人 1 干完活了,讓出機器了 工人 5 開始使用機器工作了 工人 5 干完活了,讓出機器了 工人 6 開始使用機器工作了 工人 2 干完活了,讓出機器了 工人 7 開始使用機器工作了 工人 4 干完活了,讓出機器了 工人 8 開始使用機器工作了 工人 0 干完活了,讓出機器了 工人 9 開始使用機器工作了 工人 8 干完活了,讓出機器了 工人 6 干完活了,讓出機器了 工人 3 干完活了,讓出機器了 工人 9 干完活了,讓出機器了 工人 7 干完活了,讓出機器了 | 
雖然上面有 10 個工人(線程)一起并發(fā),但是,它同時只有五個工人能夠是執(zhí)行的。
線程間交換數(shù)據(jù)的Exchanger
Exchanger(交換者)是一個用于線程間協(xié)作的工具類。Exchanger 用于兩個工作線程間的數(shù)據(jù)交換。
具體上來說,Exchanger類允許在兩個線程之間定義同步點。當兩個線程都到達同步點時,他們交換數(shù)據(jù)結構,因此第一個線程的數(shù)據(jù)進入到第二個線程中,第二個線程的數(shù)據(jù)進入到第一個線程中,這要就完成了一個“交易”的環(huán)節(jié)。
源碼分析
源碼很難看懂,主要還是
【死磕Java并發(fā)】—–J.U.C之并發(fā)工具類:Exchanger
使用場景
Exchanger 可以用于遺傳算法。遺傳算法里需要選出兩個人作為交配對象,這時候會交換兩人的數(shù)據(jù)。
下面做一個賣書買書的例子:
| public class ExchangerDemo { private static final Exchanger<String> EXCHANGER = new Exchanger<>(); private static final ExecutorService POOLS = Executors.newFixedThreadPool(2); public static void main(String[] args) throws InterruptedException { POOLS.execute(() -> { String bookName = "浮生六記"; System.out.printf("飯飯要賣一本%s。%n", bookName); try { String pay = EXCHANGER.exchange(bookName); System.out.printf("飯飯賣出一本%s賺了%s¥。%n", bookName, pay); } catch (InterruptedException e) { e.printStackTrace(); } }); TimeUnit.SECONDS.sleep(5); System.out.println("》》》》》》》》飯飯先到了交易地點 睡了 5 s,七巧來了"); System.out.println("》》》》》》》》準備交易"); POOLS.execute(() -> { String pay = "23"; try { String bookName = EXCHANGER.exchange(pay); System.out.printf("七巧付了%s¥買了一本%s。%n", pay, bookName); } catch (InterruptedException e) { e.printStackTrace(); } }); POOLS.shutdown(); for (; ; ) { if (POOLS.isTerminated()) { System.out.println("交易結束!"); return; } } } } | 
執(zhí)行結果:
| 飯飯要賣一本浮生六記。 》》》》》》》》飯飯先到了交易地點 睡了 5 s,七巧來了 》》》》》》》》準備交易 七巧付了23¥買了一本浮生六記。 飯飯賣出一本浮生六記賺了23¥。 交易結束! | 
總結
Exchanger主要完成的是兩個工作線程之間的數(shù)據(jù)交換,如果有一個線程沒有執(zhí)行 exchange()方法,則會一直等待。還可以設置最大等待時間exchange(V v, TimeUnit unit)
CyclicBarrier和CountDownLatch的區(qū)別
CountDownLatch的計數(shù)器只能使用一次,而CyclicBarrier的計數(shù)器可以使用reset()方法重置。所以CyclicBarrier能處理更為復雜的業(yè)務場景。例如,如果計算發(fā)生錯誤,可以重置計數(shù)器,并讓線程重新執(zhí)行一次。
CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得 CyclicBarrier阻塞的線程數(shù)量。isBroken()方法用來了解阻塞的線程是否被中斷。
參考文章
- 《Java 并發(fā)編程的藝術》
總結
以上是生活随笔為你收集整理的Java 中的并发工具类的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: 一分钟了解c语言求开方sqrt函数
- 下一篇: JS逆向之企名科技
