多线程:并发实现方法之J.U.C
java.util.concurrent(J.U.C)大大提高了并發(fā)性能。
AQS 被認(rèn)為是 J.U.C 的核心。
什么是AQS?
?AQS是AbstractQueuedSynchronizer的簡稱。AQS提供了一種實(shí)現(xiàn)阻塞鎖和一系列依賴FIFO等待隊(duì)列的同步器的框架。
CountdownLatch
什么是CountdownLatch
用來控制一個線程等待多個線程。
維護(hù)了一個計(jì)數(shù)器 cnt,每次調(diào)用 countDown() 方法會讓計(jì)數(shù)器的值減 1,減到 0 的時候,那些因?yàn)檎{(diào)用 await() 方法而在等待的線程就會被喚醒。
?
public class CountdownLatchExample {public static void main(String[] args) throws InterruptedException {final int totalThread = 10;CountDownLatch countDownLatch = new CountDownLatch(totalThread);ExecutorService executorService = Executors.newCachedThreadPool();for (int i = 0; i < totalThread; i++) {executorService.execute(() -> {System.out.print("run..");countDownLatch.countDown();});}countDownLatch.await();System.out.println("end");executorService.shutdown();} } run..run..run..run..run..run..run..run..run..run..endCyclicBarrier
用來控制多個線程互相等待,只有當(dāng)多個線程都到達(dá)時,這些線程才會繼續(xù)執(zhí)行。
和 CountdownLatch 相似,都是通過維護(hù)計(jì)數(shù)器來實(shí)現(xiàn)的。線程執(zhí)行 await() 方法之后計(jì)數(shù)器會減 1,并進(jìn)行等待,直到計(jì)數(shù)器為 0,所有調(diào)用 await() 方法而在等待的線程才能繼續(xù)執(zhí)行。
CyclicBarrier 和 CountdownLatch 的一個區(qū)別是,CyclicBarrier 的計(jì)數(shù)器通過調(diào)用 reset() 方法可以循環(huán)使用,所以它才叫做循環(huán)屏障。
CyclicBarrier 有兩個構(gòu)造函數(shù),其中 parties 指示計(jì)數(shù)器的初始值,barrierAction 在所有線程都到達(dá)屏障的時候會執(zhí)行一次。
public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();this.parties = parties;this.count = parties;this.barrierCommand = barrierAction; }public CyclicBarrier(int parties) {this(parties, null); }當(dāng)10個線程都就位之后,打印HELLO。所有線程繼續(xù)向下執(zhí)行。
public class CyclicBarrierExample {public static void main(String[] args) {final int totalThread = 10;CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread,()->{System.out.print("HELLO..");});ExecutorService executorService = Executors.newCachedThreadPool();for (int i = 0; i < totalThread; i++) {executorService.execute(() -> {System.out.print("before..");try {cyclicBarrier.await();} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}System.out.print("after..");});}executorService.shutdown();} } before..before..before..before..before..before..before..before..before..before..HELLO..after..after..after..after..after..after..after..after..after..after..Semaphore
Semaphore 類似于操作系統(tǒng)中的信號量,可以控制對互斥資源的訪問線程數(shù)。
以下代碼模擬了對某個服務(wù)的并發(fā)請求,每次只能有 3 個客戶端同時訪問,請求總數(shù)為 10。
public class SemaphoreExample {public static void main(String[] args) {final int clientCount = 3;final int totalRequestCount = 10;Semaphore semaphore = new Semaphore(clientCount);ExecutorService executorService = Executors.newCachedThreadPool();for (int i = 0; i < totalRequestCount; i++) {executorService.execute(()->{try {semaphore.acquire();System.out.print(semaphore.availablePermits() + " ");} catch (InterruptedException e) {e.printStackTrace();} finally {semaphore.release();}});}executorService.shutdown();} }?
FutureTask
在介紹 Callable 時我們知道它可以有返回值,返回值通過 Future 進(jìn)行封裝。
FutureTask 實(shí)現(xiàn)了 RunnableFuture 接口,該接口繼承自 Runnable 和 Future 接口。這使得 FutureTask 既可以當(dāng)做一個任務(wù)執(zhí)行,也可以有返回值。
public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>
FutureTask 可用于異步獲取執(zhí)行結(jié)果或取消執(zhí)行任務(wù)的場景。
當(dāng)一個計(jì)算任務(wù)需要執(zhí)行很長時間,那么就可以用 FutureTask 來封裝這個任務(wù),主線程在完成自己的任務(wù)之后再去獲取結(jié)果。
get()方法會阻塞當(dāng)前主線程,直到call()結(jié)束并返回。
也就是說,在一定程度上FutureTask可以實(shí)現(xiàn)異步任務(wù)處理!
public class FutureTaskExample {
?
? ? public static void main(String[] args) throws ExecutionException, InterruptedException {
? ? ? ? FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() {
? ? ? ? ? ? @Override
? ? ? ? ? ? public Integer call() throws Exception {
? ? ? ? ? ? ? ? int result = 0;
? ? ? ? ? ? ? ? for (int i = 0; i < 100; i++) {
? ? ? ? ? ? ? ? ? ? Thread.sleep(10);
? ? ? ? ? ? ? ? ? ? result += i;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? return result;
? ? ? ? ? ? }
? ? ? ? });
? ? ? ? Thread computeThread = new Thread(futureTask);
? ? ? ? computeThread.start();
? ? ? ? Thread otherThread = new Thread(() -> {
? ? ? ? ? ? System.out.println("other task is running...");
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? Thread.sleep(1000);
? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? }
? ? ? ? });
? ? ? ? otherThread.start();
? ? ? ? System.out.println(futureTask.get());
? ? }
}
?
?
other task is running...
4950
BlockingQueue
java.util.concurrent.BlockingQueue 接口有以下阻塞隊(duì)列的實(shí)現(xiàn):
FIFO 隊(duì)列?:LinkedBlockingQueue、ArrayBlockingQueue(固定長度)
優(yōu)先級隊(duì)列?:PriorityBlockingQueue。它是無界阻塞隊(duì)列,容量是無限的,?它是線程安全的,是阻塞的。
提供了阻塞的 take() 和 put() 方法:如果隊(duì)列為空 take() 將阻塞,直到隊(duì)列中有內(nèi)容;如果隊(duì)列為滿 put() 將阻塞,直到隊(duì)列有空閑位置。
使用 BlockingQueue 實(shí)現(xiàn)生產(chǎn)者消費(fèi)者問題
public class ProducerConsumer {
?
? ? private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
?
? ? private static class Producer extends Thread {
? ? ? ? @Override
? ? ? ? public void run() {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? queue.put("product");
? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? }
? ? ? ? ? ? System.out.print("produce..");
? ? ? ? }
? ? }
?
? ? private static class Consumer extends Thread {
?
? ? ? ? @Override
? ? ? ? public void run() {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? String product = queue.take();
? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? }
? ? ? ? ? ? System.out.print("consume..");
? ? ? ? }
? ? }
}
public static void main(String[] args) {
? ? for (int i = 0; i < 2; i++) {
? ? ? ? Producer producer = new Producer();
? ? ? ? producer.start();
? ? }
? ? for (int i = 0; i < 5; i++) {
? ? ? ? Consumer consumer = new Consumer();
? ? ? ? consumer.start();
? ? }
? ? for (int i = 0; i < 3; i++) {
? ? ? ? Producer producer = new Producer();
? ? ? ? producer.start();
? ? }
}
produce..produce..consume..consume..produce..consume..produce..consume..produce..consume..
ForkJoin
主要用于并行計(jì)算中,和 MapReduce 原理類似,都是把大的計(jì)算任務(wù)拆分成多個小任務(wù)并行計(jì)算。
public class ForkJoinExample extends RecursiveTask<Integer> {
?
? ? private final int threshold = 5;
? ? private int first;
? ? private int last;
?
? ? public ForkJoinExample(int first, int last) {
? ? ? ? this.first = first;
? ? ? ? this.last = last;
? ? }
?
? ? @Override
? ? protected Integer compute() {
? ? ? ? int result = 0;
? ? ? ? if (last - first <= threshold) {
? ? ? ? ? ? // 任務(wù)足夠小則直接計(jì)算
? ? ? ? ? ? for (int i = first; i <= last; i++) {
? ? ? ? ? ? ? ? result += i;
? ? ? ? ? ? }
? ? ? ? } else {
? ? ? ? ? ? // 拆分成小任務(wù)
? ? ? ? ? ? int middle = first + (last - first) / 2;
? ? ? ? ? ? ForkJoinExample leftTask = new ForkJoinExample(first, middle);
? ? ? ? ? ? ForkJoinExample rightTask = new ForkJoinExample(middle + 1, last);
? ? ? ? ? ? leftTask.fork();
? ? ? ? ? ? rightTask.fork();
? ? ? ? ? ? result = leftTask.join() + rightTask.join();
? ? ? ? }
? ? ? ? return result;
? ? }
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
? ? ForkJoinExample example = new ForkJoinExample(1, 10000);
? ? ForkJoinPool forkJoinPool = new ForkJoinPool();
? ? Future result = forkJoinPool.submit(example);
? ? System.out.println(result.get());
}
ForkJoin 使用 ForkJoinPool 來啟動,它是一個特殊的線程池,線程數(shù)量取決于 CPU 核數(shù)。
public class ForkJoinPool extends AbstractExecutorService
ForkJoinPool 實(shí)現(xiàn)了工作竊取算法來提高 CPU 的利用率。每個線程都維護(hù)了一個雙端隊(duì)列,用來存儲需要執(zhí)行的任務(wù)。工作竊取算法允許空閑的線程從其它線程的雙端隊(duì)列中竊取一個任務(wù)來執(zhí)行。竊取的任務(wù)必須是最晚的任務(wù),避免和隊(duì)列所屬線程發(fā)生競爭。例如下圖中,Thread2 從 Thread1 的隊(duì)列中拿出最晚的 Task1 任務(wù),Thread1 會拿出 Task2 來執(zhí)行,這樣就避免發(fā)生競爭。但是如果隊(duì)列中只有一個任務(wù)時還是會發(fā)生競爭。
除了 RecursiveAction,Fork/Join 框架還提供了其他 ForkJoinTask 子類:帶有返回值的 RecursiveTask。從 RecursiveTask 繼承的子類同樣需要重載 protected void compute() 方法。與 RecursiveAction 稍有不同的是,它可使用泛型指定一個返回值的類型。下面,我們來看看如何使用 RecursiveTask 的子類。
《新程序員》:云原生和全面數(shù)字化實(shí)踐50位技術(shù)專家共同創(chuàng)作,文字、視頻、音頻交互閱讀總結(jié)
以上是生活随笔為你收集整理的多线程:并发实现方法之J.U.C的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 多线程:线程之间的协作(join、wai
- 下一篇: 多线程:了解一下ForkJoin、Fut