Java8 - 一文搞定Fork/Join 框架
文章目錄
- 概述
- CPU密集型 vs IO密集型
- 計算密集型任務
- IO密集型
- 簡單示例
- Fork/Join常用的類
- RecursiveTask 實現 并行計算
- RecursiveAction
- Fork/Join執行流程
- 最佳實踐
概述
分支/合并框架的目的是以遞歸方式將可以并行的任務拆分成更小的任務,然后將每個子任務的結果合并起來生成整體結果。
它是 ExecutorService 接口的一個實現,它把子任務分配給線程池(稱為 ForkJoinPool )中的工作線程。
CPU密集型 vs IO密集型
通常來講,任務可以劃分為計算密集型和IO密集型
計算密集型任務
特點是要進行大量的計算,消耗CPU資源,比如計算圓周率、對視頻進行高清解碼等等,全靠CPU的運算能力。
這種計算密集型任務雖然也可以用多任務完成,但是任務越多,花在任務切換的時間就越多,CPU執行任務的效率就越低,所以,要最高效地利用CPU,計算密集型任務同時進行的數量應當等于CPU的核心數。
計算密集型任務由于主要消耗CPU資源,因此,代碼運行效率至關重要。Python這樣的腳本語言運行效率很低,完全不適合計算密集型任務。對于計算密集型任務,最好用C語言編寫。
IO密集型
涉及到網絡、磁盤IO的任務都是IO密集型任務,這類任務的特點是CPU消耗很少,任務的大部分時間都在等待IO操作完成(因為IO的速度遠遠低于CPU和內存的速度)。
對于IO密集型任務,任務越多,CPU效率越高,但也有一個限度。常見的大部分任務都是IO密集型任務,比如Web應用。
IO密集型任務執行期間,99%的時間都花在IO上,花在CPU上的時間很少 。
簡單示例
來看個最簡單的求和
public class ForkJoinTest {private static int[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};public static void main(String[] args) {System.out.println("result=> " + calc());System.out.println("result=> " + calcByStream());}private static int calc() {int result = 0;for (int i = 0; i < data.length; i++) {result += data[i];}return result;}private static Long calcByStream() {return LongStream.rangeClosed(0,10).reduce(0, Long::sum);}}Fork/Join常用的類
- ForkJoinTask:我們要使用 ForkJoin 框架,必須首先創建一個 ForkJoin 任務。它提供在任務中執行 fork() 和 join() 操作的機制 。
通常情況下我們不需要直接繼承 ForkJoinTask 類,而只需要繼承它的子類,Fork/Join 框架提供了以下兩個子類:
-
RecursiveAction:用于沒有返回結果的任務。 比如寫數據到磁盤,然后就退出了。 一個RecursiveAction可以把自己的工作分割成更小的幾塊, 這樣它們可以由獨立的線程或者CPU執行。 我們可以通過繼承來實現一個RecursiveAction
-
RecursiveTask :用于有返回結果的任務。 可以將自己的工作分割為若干更小任務,并將這些子任務的執行合并到一個集體結果。 可以有幾個水平的分割和合并
-
CountedCompleter: 在任務完成執行后會觸發執行一個自定義的鉤子函數
-
ForkJoinPool :ForkJoinTask 需要通過 ForkJoinPool 來執行,任務分割出的子任務會添加到當前工作線程所維護的雙端隊列中,進入隊列的頭部。當一個工作線程的隊列里暫時沒有任務時,它會隨機從其他工作線程的隊列的尾部獲取一個任務。
RecursiveTask 實現 并行計算
要把任務提交到這個池,必須創建 RecursiveTask<R> 的一個子類,其中 R 是并行化任務(以及所有子任務)產生的結果類型,或者如果任務不返回結果,則是 RecursiveAction 類型 。
要定義 RecursiveTask, 只需實現它唯一的抽象方法compute
protected abstract R compute();這個方法同時定義了將任務拆分成子任務的邏輯,以及無法再拆分或不方便再拆分時,生成單個子任務結果的邏輯。
if (任務足夠小或不可分) {順序計算該任務 } else {將任務分成兩個子任務遞歸調用本方法,拆分每個子任務,等待所有子任務完成合并每個子任務的結果 }一般來說并沒有確?的標準決定一個任務是否應該再拆分,但有幾種試探方法可以幫助你
事實上,這只不過是著名的分治算法的并行版本而已。
現在編寫一個方法來并行對前n個自然數求和就很簡單了。你只需把想要的數字數組傳給ForkJoinSumCalculator 的構造函數:
public static long forkJoinSum(long n) {long[] numbers = LongStream.rangeClosed(1, n).toArray();ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);return new ForkJoinPool().invoke(task); }這里用了一個 LongStream 來生成包含前n個自然數的數組,然后創建一個 ForkJoinTask( RecursiveTask 的父類),并把數組傳遞給 ForkJoinSumCalculator 的公共構造函數。
最后,創建了一個新的 ForkJoinPool ,并把任務傳給它的調用方法 。在ForkJoinPool 中執行時,最后一個方法返回的值就是 ForkJoinSumCalculator 類定義的任務結果。
在實際應用時,使用多個 ForkJoinPool 是沒有什么意義的。正是出于這個原因,一般來說把它實例化一次,然后把實例保存在靜態字段中,使之成為單例,這樣就可以在軟件中任何部分方便地重用了。這里創建時用了其默認的無參數構造函數,這意味著想讓線程池使用JVM能夠使用的所有處理器。更確切地說,該構造函數將使用 Runtime.availableProcessors 的返回值來決定線程池使用的線程數。請注意 availableProcessors 方法雖然看起來是處理器,但它實際上返回的是可用內核的數量,包括超線程生成的虛擬內核。
當把 ForkJoinSumCalculator 任務傳給 ForkJoinPool 時,這個任務就由池中的一個線程執行,這個線程會調用任務的 compute 方法。
該方法會檢查任務是否小到足以順序執行,如果不夠小則會把要求和的數組分成兩半,分給兩個新的 ForkJoinSumCalculator ,而它們也由ForkJoinPool 安排執行。
因此,這一過程可以遞歸重復,把原任務分為更小的任務,直到滿足不方便或不可能再進一步拆分的條件(本例中是求和的項目數小于等于10 000)。
這時會順序計算每個任務的結果,然后由分支過程創建的(隱含的)任務二叉樹遍歷回到它的根。接下來會合并每個子任務的部分結果,從而得到總任務的結果。
package com.artisan.java8;import java.util.concurrent.RecursiveTask;public class AccumulatorRecursiveTask extends RecursiveTask<Integer> {private final int start;private final int end;private final int[] data;private final int LIMIT = 3;public AccumulatorRecursiveTask(int start, int end, int[] data) {this.start = start;this.end = end;this.data = data;}@Overrideprotected Integer compute() {if ((end - start) <= LIMIT) {int result = 0;for (int i = start; i < end; i++) {result += data[i];}return result;}int mid = (start + end) / 2;AccumulatorRecursiveTask left = new AccumulatorRecursiveTask(start, mid, data);AccumulatorRecursiveTask right = new AccumulatorRecursiveTask(mid, end, data);left.fork();Integer rightResult = right.compute();Integer leftResult = left.join();return rightResult + leftResult;} }
RecursiveAction
package com.artisan.java8;import java.util.concurrent.RecursiveAction; import java.util.concurrent.atomic.AtomicInteger;public class AccumulatorRecursiveAction extends RecursiveAction {private final int start;private final int end;private final int[] data;private final int LIMIT = 3;public AccumulatorRecursiveAction(int start, int end, int[] data) {this.start = start;this.end = end;this.data = data;}@Overrideprotected void compute() {if ((end - start) <= LIMIT) {for (int i = start; i < end; i++) {AccumulatorHelper.accumulate(data[i]);}} else {int mid = (start + end) / 2;AccumulatorRecursiveAction left = new AccumulatorRecursiveAction(start, mid, data);AccumulatorRecursiveAction right = new AccumulatorRecursiveAction(mid, end, data);left.fork();right.fork();left.join();right.join();}}static class AccumulatorHelper {private static final AtomicInteger result = new AtomicInteger(0);static void accumulate(int value) {result.getAndAdd(value);}public static int getResult() {return result.get();}static void rest() {result.set(0);}} }Fork/Join執行流程
最佳實踐
雖然分支/合并框架還算簡單易用,不幸的是它也很容易被誤用
總結
以上是生活随笔為你收集整理的Java8 - 一文搞定Fork/Join 框架的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 深入理解分布式技术 - 缓存高可用
- 下一篇: Java - Jackson JSON