java高并发(十三)并发容器J.U.C--AQS
AbstractQueueSynchronizer (AQS)
J.U.C 大大提高了java并發的性能,而AQS則是J.U.C的核心。
AQS底層使用雙向列表(隊列的一種實現)。
- 使用Node實現FIFO隊列,可以用于構建鎖或者其他同步裝置的基礎框架
- 利用了一個int類型表示狀態。?在AQS中有一個status的成員變量,基于AQS有一個同步組件ReentrantLock,在這個ReentrantLock中status表示獲取鎖的線程數,例如status=0表示還沒有線程獲取鎖,status=1表示已經有線程獲取了鎖,status>1表示重入鎖的數量。
- 使用方法:繼承
- 子類通過繼承并通過實現它的方法管理其狀態{acquire和release}的方法操縱狀態
- 可以同時實現排它鎖和共享鎖模式(獨占,共享)
AQS同步組件
- countdownLatch,閉鎖,通過一個計數來保證線程是否需要一直阻塞。
- semaphore,控制同一時間并發線程的數量。
- cyclicbarrier,與countdownlatch很像,都能阻塞進程。
- reentrantlock
- condition
- futuretask
countdownlatch?
是一個同步輔助類,通過他可以實現類似于阻塞當前線程的功能。一個線程或多個線程一直等待,直到其他線程操作完成,countdownlatch用了一個給定的計數器來進行初始化,該計數器的操作是原子操作,也就是同時只能有一個線程操作該計數器。調用該類的await()方法則會一直處于阻塞狀態,直到其他線程調用countdown()方法,每次調用countdown()方法會使得計數器的值減1,當計數器的值減為0時,所有因調用await方法處于等待狀態的線程就會繼續往下執行。這種狀態只會出現一次,因為這里的計數器是不能被重置的,如果業務上需要一個可以重置計數次數的版本,可以考慮使用cyclicbarrier。
使用場景
在某些業務場景中,程序執行需要等到某個條件完成后才能繼續執行后續的操作,典型的應用例如并行計算:當某個處理的運算量很大時,可以將該運算任務拆分多個子任務,等待所有的子任務都完成之后,父任務拿到所有的子任務的運行結果進行匯總。
下面舉例countdownlatch的基本用法:
@Slf4j public class CountDownLatchExample1 {private final static int threadCount = 200;public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newCachedThreadPool();final CountDownLatch countDownLatch = new CountDownLatch(threadCount);for(int i = 0; i< threadCount; i++) {final int threadNum = i;executorService.execute(() ->{try {test(threadNum);} catch (InterruptedException e) {log.error("exception", e);}finally {countDownLatch.countDown();}});}//可以保證之前的線程都執行完成countDownLatch.await();log.info("finish");executorService.shutdown();}private static void test(int threadNum) throws InterruptedException {Thread.sleep(100);log.info("{}", threadNum);Thread.sleep(100);} }一個復雜的場景:我們開了很多個線程去完成一個任務,但是這個任務需要在指定的時間內完成,如果超過一定的時間沒有完成則放棄該任務。
@Slf4j public class CountDownLatchExample2 {private final static int threadCount = 200;public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newCachedThreadPool();final CountDownLatch countDownLatch = new CountDownLatch(threadCount);for(int i = 0; i< threadCount; i++) {final int threadNum = i;executorService.execute(() ->{try {test(threadNum);} catch (InterruptedException e) {log.error("exception", e);}finally {countDownLatch.countDown();}});}//可以保證之前的線程都執行完成countDownLatch.await(10, TimeUnit.MILLISECONDS);log.info("finish");// 第一時間內并不會把所有線程都銷毀,而是讓當前已有線程執行完之后在把線程池銷毀。executorService.shutdown();}private static void test(int threadNum) throws InterruptedException {Thread.sleep(100);log.info("{}", threadNum);} }semaphore 信號量
可以控制某個資源可被同時訪問的個數,與countdownlatch有些類似,提供了兩個核心方法:aquire和release。aquire表示獲取一個許可,如果沒有則等待,release表示操作完成后釋放一個許可。semaphore維護了當前訪問的個數,提供同步機制控制訪問的個數。
使用場景?
常用于僅能提供有限訪問的資源例如數據庫連接數是有限的,而上層應用的并發數會遠遠大于連接數,如果同時對數據庫進行操作可能出現因為無法獲取數據庫連接而導致異常。這時可以通過信號量semaphore來并發訪問控制。當semaphore把并發數控制到1時就跟單線程運行很相似了。
舉例如下:
@Slf4j public class SemaphoreExample1 {private final static int threadCount = 20;public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newCachedThreadPool();//允許的并發數final Semaphore semaphore = new Semaphore(3);for(int i = 0; i< threadCount; i++) {final int threadNum = i;executorService.execute(() ->{try {// 獲取一個許可semaphore.acquire();test(threadNum);// 釋放一個許可semaphore.release();} catch (InterruptedException e) {log.error("exception", e);}});}log.info("finish");executorService.shutdown();}private static void test(int threadNum) throws InterruptedException {log.info("{}", threadNum);Thread.sleep(1000);} }運行結果可以看到同時3個線程在執行。
也可以獲得多個許可:
@Slf4j public class SemaphoreExample2 {private final static int threadCount = 20;public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newCachedThreadPool();//允許的并發數final Semaphore semaphore = new Semaphore(3);for(int i = 0; i< threadCount; i++) {final int threadNum = i;executorService.execute(() ->{try {// 獲取多個許可semaphore.acquire(3);test(threadNum);// 釋放多個許可semaphore.release(3);} catch (InterruptedException e) {log.error("exception", e);}});}log.info("finish");executorService.shutdown();}private static void test(int threadNum) throws InterruptedException {log.info("{}", threadNum);Thread.sleep(1000);} }每一次獲取三個許可,而同時只允許3個并發數,相當于單線程在運行。
@Slf4j public class SemaphoreExample3 {private final static int threadCount = 20;public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newCachedThreadPool();//允許的并發數final Semaphore semaphore = new Semaphore(3);for (int i = 0; i < threadCount; i++) {final int threadNum = i;executorService.execute(() -> {try {// 嘗試獲取一個許可if (semaphore.tryAcquire()) {test(threadNum);// 釋放一個許可semaphore.release();}} catch (InterruptedException e) {log.error("exception", e);}});}log.info("finish");executorService.shutdown();}private static void test(int threadNum) throws InterruptedException {log.info("{}", threadNum);Thread.sleep(1000);} }輸出結果:
15:24:21.098 [pool-1-thread-1] INFO com.vincent.example.aqs.SemaphoreExample3 - 0 15:24:21.098 [pool-1-thread-2] INFO com.vincent.example.aqs.SemaphoreExample3 - 1 15:24:21.098 [main] INFO com.vincent.example.aqs.SemaphoreExample3 - finish 15:24:21.098 [pool-1-thread-3] INFO com.vincent.example.aqs.SemaphoreExample3 - 2因為我們往線程池中放了二十個請求,二十個請求在同一時間內都會嘗試去執行,semaphore會嘗試讓每個線程去獲取許可,而同一時刻內我們的并發數是3,也就是只有三個線程獲取到了許可,而test方法內有Thread.sleep(1000),因此其余17個線程都不能拿到許可,直接結束。
semaphore.tryAcquire(3, TimeUnit.SECONDS)表示可以等3秒,如果3秒內沒拿到許可就結束。
CyclicBarrier
也是一個同步輔助類,允許一組線程相互等待,直到到達某個公共的屏障點??梢酝瓿啥鄠€線程之間相互等待,只有當每個線程都準備就緒后,才能各自繼續往下執行謀面的操作。它和countdownlatch有相似的地方,都是通過計數器來實現的,當一個線程調用await()方法后,該線程就進入了等待狀態。當循環計數器的值達到設置的初始值之后,進入等待狀態的線程會被喚醒,繼續執行后續操作。因為CyclicBarrier在釋放等待線程后可以重用,所以稱他為循環屏障。
CyclicBarrier的使用場景與countdownlatch類似,CyclicBarrier可以用于多線程計算數據,最后合并計算結果的應用場景。
CyclicBarrier與Countdownlatch的區別:
- countdownlatch的計數器只能使用一次,CyclicBarrier可以使用reset方法重復使用
- countdownlatch主要是實現一個或n個線程需要等待其他線程完成某項操作之后,才能繼續往下執行,他描述的是1個或n個線程等待其他線程的關系。而CyclicBarrier主要實現了多個線程之間相互等待,直到所有線程都滿足了條件之后才能繼續執行后續的操作,它描述的是各個線程內部相互等待的關系。所以CyclicBarrier可以處理更復雜的業務場景,例如計數器發生錯誤可以重置計數器,讓線程重新執行一次。
總結
以上是生活随笔為你收集整理的java高并发(十三)并发容器J.U.C--AQS的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java高并发(十二)并发容器J.U.C
- 下一篇: java高并发(十四)ReetrantL