多线程:了解一下ForkJoin、FutureTask、BlockingQueue
先了解一下這三種類
FutureTask
在介紹 Callable 時我們知道它可以有返回值,返回值通過 Future 進行封裝。FutureTask 實現(xiàn)了 RunnableFuture 接口,該接口繼承自 Runnable 和 Future 接口,這使得 FutureTask 既可以當(dāng)做一個任務(wù)執(zhí)行,也可以有返回值。
public class FutureTask<V> implements RunnableFuture<V> public interface RunnableFuture<V> extends Runnable, Future<V>FutureTask 可用于異步獲取執(zhí)行結(jié)果或取消執(zhí)行任務(wù)的場景。當(dāng)一個計算任務(wù)需要執(zhí)行很長時間,那么就可以用 FutureTask 來封裝這個任務(wù),主線程在完成自己的任務(wù)之后再去獲取結(jié)果。get()方法會阻塞當(dāng)前主線程,直到call()結(jié)束并返回。
public class FutureTaskExample {public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {int result = 0;for (int i = 0; i < 100; i++) {Thread.sleep(10);result += i;}return result;}});Thread computeThread = new Thread(futureTask);computeThread.start();Thread otherThread = new Thread(() -> {System.out.println("other task is running...");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}});otherThread.start();System.out.println(futureTask.get());} } other task is running... 4950BlockingQueue
java.util.concurrent.BlockingQueue 接口有以下阻塞隊列的實現(xiàn):
- FIFO 隊列?:LinkedBlockingQueue、ArrayBlockingQueue(固定長度)
- 優(yōu)先級隊列?:PriorityBlockingQueue
提供了阻塞的 take() 和 put() 方法:如果隊列為空 take() 將阻塞,直到隊列中有內(nèi)容;如果隊列為滿 put() 將阻塞,直到隊列有空閑位置。
使用 BlockingQueue 實現(xiàn)生產(chǎn)者消費者問題
public class ProducerConsumer {private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);private static class Producer extends Thread {@Overridepublic void run() {try {queue.put("product");} catch (InterruptedException e) {e.printStackTrace();}System.out.print("produce..");}}private static class Consumer extends Thread {@Overridepublic void run() {try {String product = queue.take();} catch (InterruptedException e) {e.printStackTrace();}System.out.print("consume..");}} } public static void main(String[] args) {for (int i = 0; i < 2; i++) {Producer producer = new Producer();producer.start();}for (int i = 0; i < 5; i++) {Consumer consumer = new Consumer();consumer.start();}for (int i = 0; i < 3; i++) {Producer producer = new Producer();producer.start();} } produce..produce..consume..consume..produce..consume..produce..consume..produce..consume..ForkJoin
主要用于并行計算中,和 MapReduce 原理類似,都是把大的計算任務(wù)拆分成多個小任務(wù)并行計算。
public class ForkJoinExample extends RecursiveTask<Integer> {private final int threshold = 5;private int first;private int last;public ForkJoinExample(int first, int last) {this.first = first;this.last = last;}@Overrideprotected Integer compute() {int result = 0;if (last - first <= threshold) {// 任務(wù)足夠小則直接計算for (int i = first; i <= last; i++) {result += i;}} else {// 拆分成小任務(wù)int middle = first + (last - first) / 2;ForkJoinExample leftTask = new ForkJoinExample(first, middle);ForkJoinExample rightTask = new ForkJoinExample(middle + 1, last);leftTask.fork();rightTask.fork();result = leftTask.join() + rightTask.join();}return result;} } public static void main(String[] args) throws ExecutionException, InterruptedException {ForkJoinExample example = new ForkJoinExample(1, 10000);ForkJoinPool forkJoinPool = new ForkJoinPool();Future result = forkJoinPool.submit(example);System.out.println(result.get()); }ForkJoin 使用 ForkJoinPool 來啟動,它是一個特殊的線程池,線程數(shù)量取決于 CPU 核數(shù)。
public class ForkJoinPool extends AbstractExecutorServiceForkJoinPool 實現(xiàn)了工作竊取算法來提高 CPU 的利用率。每個線程都維護了一個雙端隊列,用來存儲需要執(zhí)行的任務(wù)。工作竊取算法允許空閑的線程從其它線程的雙端隊列中竊取一個任務(wù)來執(zhí)行。竊取的任務(wù)必須是最晚的任務(wù),避免和隊列所屬線程發(fā)生競爭。例如下圖中,Thread2 從 Thread1 的隊列中拿出最晚的 Task1 任務(wù),Thread1 會拿出 Task2 來執(zhí)行,這樣就避免發(fā)生競爭。但是如果隊列中只有一個任務(wù)時還是會發(fā)生競爭。
總結(jié)
以上是生活随笔為你收集整理的多线程:了解一下ForkJoin、FutureTask、BlockingQueue的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 多线程:并发实现方法之J.U.C
- 下一篇: String为什么是不可变类型?