Java并发7:并发工具类
CountDownLatch(閉鎖)
閉鎖允許一個或者多個線程等待其他線程都完成了才繼續執行。CountDownLatch 是一種閉鎖的實現,使得一個或多個線程等待一組事情發生。通過計數器表示需要等待的事件數量;使用countDown()方法將計數器減去1,表示有一個事件發生;使用await()方法阻塞當前線程,等待計數器為0,也就是所有需要等待的事情發生。
CountDownLatch 不能重新初始化或者修改內部計數器的值。
CountDownLatch 內部依賴內部類 Sync 實現,而 Sync 類繼承于 AQS。其內部通過共享鎖實現。
示例:
public class CountDownLatchTest {private static CountDownLatch countDownLatch = new CountDownLatch(5);/*** Boss線程,等待員工到達開會*/static class BossThread extends Thread{public void run() {System.out.println("Boss在會議室等待,總共有" + countDownLatch.getCount() + "個人開會...");try {//Boss等待countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("所有人都已經到齊了,開會吧...");}}//員工到達會議室static class EmpleoyeeThread extends Thread{public void run() {System.out.println(Thread.currentThread().getName() + ",到達會議室....");//員工到達會議室 count - 1countDownLatch.countDown();}}public static void main(String[] args){//Boss線程啟動new BossThread().start();for(int i = 0 ; i < countDownLatch.getCount() ; i++){new EmpleoyeeThread().start();}} }復制代碼運行結果:
Boss在會議室等待,總共有5個人開會... Thread-2,到達會議室.... Thread-3,到達會議室.... Thread-1,到達會議室.... Thread-5,到達會議室.... Thread-4,到達會議室.... 所有人都已經到齊了,開會吧... 復制代碼CycliBarrier(屏障)
閉鎖是一次性對象,一旦進入終止狀態,就不能被重置。屏障類似于閉鎖,阻塞一組進程直到某個事件發生。也就是讓一組線程到達一個屏障時被阻塞,直到最后一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續運行。
CycliBarrier 內部使用了可重用鎖 ReentrantLock 和 Condition。線程執行await()方法后,計數器減1,進行等待,直到計數器為0,所有調用了await()方法的線程繼續執行。
CycliBarrier適用于多個線程結果合并的場景。
示例:
public class CyclicBarrierTest {private static CyclicBarrier cyclicBarrier;static class CyclicBarrierThread extends Thread{public void run() {System.out.println(Thread.currentThread().getName() + "到了");//等待try {cyclicBarrier.await();} catch (Exception e) {e.printStackTrace();}}}public static void main(String[] args){cyclicBarrier = new CyclicBarrier(5, new Runnable() {public void run() {System.out.println("人到齊了,開會吧....");}});for(int i = 0 ; i < 5 ; i++){new CyclicBarrierThread().start();}} } 復制代碼運行結果:
Thread-0到了 Thread-1到了 Thread-4到了 Thread-2到了 Thread-3到了 人到齊了,開會吧... 復制代碼閉鎖和屏障的比較
CountDownLatch 的計數器只能使用一次,CyclicBarrier 的計數器可以使用 reset() 方法重置。
閉鎖只會阻塞一條線程,目的是為了讓該條任務線程滿足條件后執行; 而同步屏障會阻塞所有線程,目的是為了讓所有線程同時執行。
Semaphore(信號量)
信號量用于控制同時訪問某個特定資源的操作數量,或者執行某個指定操作的數量。計數信號量還可以用來實現某種資源池,或者對容器施加邊界。
信號量可以用于實現資源池,也可以用于將容器變為有界阻塞容器。信號量管理著一組虛擬的許可,在執行操作時首先獲取許可,并在使用以后釋放許可。如果沒有許可,將阻塞直到有許可或被中斷,超時。
信號量的使用場景是,有m個資源,n個線程,且n>m,同一時刻只能允許m條線程訪問資源。
信號量內部同樣依賴于繼承自 AQS 的 Sync 類,且包含兩個子類,分別是 FairSync 和 NonfairSync。
信號量通過acquire()方法獲取許可;通過release()方法釋放許可。
示例:
public class SemaphoreCase {private static class Parking{private Semaphore semaphore;Parking(int count){semaphore=new Semaphore(count);}public void park(){try {semaphore.acquire();long time=(long) (Math.random()*10);System.out.println(Thread.currentThread().getName()+"進入停車場停車"+time+"秒");Thread.sleep(time);System.out.println(Thread.currentThread().getName()+"開出停車場");} catch (InterruptedException e) {e.printStackTrace();} finally {semaphore.release();}}}private static class Car extends Thread{Parking parking;Car(Parking parking){this.parking=parking;}public void run() {parking.park();}}public static void main(String[] args) {Parking parking=new Parking(3);for(int i=0;i<5;i++){new Car(parking).start();}} }復制代碼運行結果為:
Thread-2進入停車場停車7秒 Thread-1進入停車場停車9秒 Thread-0進入停車場停車4秒 Thread-0開出停車場 Thread-3進入停車場停車4秒 Thread-2開出停車場 Thread-4進入停車場停車3秒 Thread-1開出停車場 Thread-3開出停車場 Thread-4開出停車場 復制代碼Exchanger(交換者)
Exchanger 是一個用于線程間協作的工具類,用于線程間的數據交換。它提供了一個同步點,在這個同步點,兩個線程可以交換彼此的數據。兩個線程通過exchange()方法交換數據,一個線程執行了該方法,會一直等待另一個線程執行該方法,當兩個線程都到達同步點,這兩個線程就可以交換數據。
示例:
public class ExchangerCase {private static class Producer implements Runnable{private List<String> buffer;private Exchanger<List<String>> exchanger;Producer(List<String> buffer,Exchanger<List<String>> exchanger){this.buffer=buffer;this.exchanger=exchanger;}public void run() {for(int i=1;i<5;i++){System.out.println("生產者生產次數:"+i);for(int j=1;j<=3;j++){System.out.println("生產者裝入"+i+"--"+j);buffer.add("buffer: "+i+"--"+j);}System.out.println("生產者裝滿,等待消費者交換");try {exchanger.exchange(buffer);} catch (InterruptedException e) {e.printStackTrace();}}}}private static class Consumer implements Runnable{private List<String> buffer;private Exchanger<List<String>> exchanger;Consumer(List<String> buffer,Exchanger<List<String>> exchanger){this.buffer=buffer;this.exchanger=exchanger;}public void run() {for(int i=1;i<5;i++){try {buffer=exchanger.exchange(buffer);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消費者消費次數:"+i);for(int j=1;j<=3;j++) {System.out.println("消費者消費" +buffer.get(0));buffer.remove(0);}}}}public static void main(String[] args) {List<String> buffer1 = new ArrayList<String>();List<String> buffer2 = new ArrayList<String>();Exchanger<List<String>> exchanger = new Exchanger<List<String>>();Thread producerThread = new Thread(new Producer(buffer1,exchanger));Thread consumerThread = new Thread(new Consumer(buffer2,exchanger));producerThread.start();consumerThread.start();} }復制代碼運行結果
生產者生產次數:1 生產者裝入1--1 生產者裝入1--2 生產者裝入1--3 生產者裝滿,等待消費者交換 生產者生產次數:2 生產者裝入2--1 生產者裝入2--2 生產者裝入2--3 生產者裝滿,等待消費者交換 消費者消費次數:1 消費者消費buffer: 1--1 消費者消費buffer: 1--2 消費者消費buffer: 1--3 消費者消費次數:2 消費者消費buffer: 2--1 消費者消費buffer: 2--2 生產者生產次數:3 生產者裝入3--1 消費者消費buffer: 2--3 生產者裝入3--2 生產者裝入3--3 生產者裝滿,等待消費者交換 生產者生產次數:4 生產者裝入4--1 生產者裝入4--2 消費者消費次數:3 消費者消費buffer: 3--1 生產者裝入4--3 生產者裝滿,等待消費者交換 消費者消費buffer: 3--2 消費者消費buffer: 3--3 消費者消費次數:4 消費者消費buffer: 4--1 消費者消費buffer: 4--2 消費者消費buffer: 4--3 復制代碼在Exchanger中,如果一個線程已經到達了exchanger節點時,對于它的伙伴節點的情況有三種:
- 如果它的伙伴節點在該線程到達之前已經調用了exchanger方法,則它會喚醒它的伙伴然后進行數據交換,得到各自數據返回。
- 如果它的伙伴節點還沒有到達交換點,則該線程將會被掛起,等待它的伙伴節點到達被喚醒,完成數據交換。
- 如果當前線程被中斷了則拋出異常,或者等待超時了,則拋出超時異常。
參考資料
- Java并發編程的藝術
- Java并發編程實戰
- cmsblogs.com/?p=2611
總結
以上是生活随笔為你收集整理的Java并发7:并发工具类的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: javascript提取标签之间的信息
- 下一篇: 腾讯从百度挖来的AI Lab负责人张潼离