并行数据处理与性能详解与ForkJoin框架
文章目錄
- 一、并行流
- 1、將順序流轉換成并行流
- 2、測量流的性能
- 二、分之/合并框架ForkJoinPool
- 1、使用RecursiveTask
- 三、Spliterator
本章節可以讓你用Stream接口不費力氣就能對數據集執行并行操作,可以聲明性的講順序流變成并行流。
一、并行流
Stream接口可以調用方法parallelStream很容易把集合轉換為并行流。所謂并行流就是把內容分成多個數據塊,用不同線程處理每塊數據。
1、將順序流轉換成并行流
可以將流轉換成并行流,調用方法parallel。例:
public static long parallelSum(long n) {return Stream.iterate(1L,i -> i+1).limit(n).parallel().reduce(0L,Long::sum);}如果從并行流變成順序流可以調用sequential這個方法完成。
stream.parallel() .filter(...) .sequential() .map(...) .parallel() .reduce();2、測量流的性能
按常理并行求和方法應該比迭代方法性能好。然而在軟件工程上,靠猜是絕對不行的,因此我們要進行實戰看結果。
public class ParallelStreams {public static long measureSumPerf(Function<Long, Long> adder, long n) {long fastest = Long.MAX_VALUE;for (int i = 0; i < 10; i++) {long start = System.nanoTime();long sum = adder.apply(n);long duration = (System.nanoTime() - start) / 1_000_000;System.out.println("Result: " + sum);if (duration < fastest) fastest = duration;}return fastest;}public static long iterativeSum(long n) {long result = 0;for (long i = 1L; i <= n; i++) {result += i;}return result;}public static long parallelSum(long n) {return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum);}public static long sequentialSum(long n) {return Stream.iterate(1L, i -> i + 1).limit(n).reduce(0L, Long::sum);}public static void main(String[] args) {System.out.println("Sequential sum done in:" +measureSumPerf(ParallelStreams::sequentialSum, 10_000_000) + " msecs");System.out.println("Iterative sum done in:" +measureSumPerf(ParallelStreams::iterativeSum, 10_000_000) + " msecs");System.out.println("Parallel sum done in: " +measureSumPerf(ParallelStreams::parallelSum, 10_000_000) + " msecs" );} }運行結果:
運行結果相當令人失望,求和方法并行是順序版本的將近10倍,為什么會出現這樣的結果,實際上有兩個問題:
1、iterate生成是裝箱的對象,必須拆箱才能求和。
2、很難把iterate分成多個獨立塊來并行執行。iterate很難分割成能獨立執行的小塊,因為每次應用這個函數都要 依賴前一次應用執行結果。
使用有針對性的方法,避免裝箱拆箱操作和能分成獨立塊并行??梢允褂?#xff0c;LongStream.rangeClosed與iterate相比有兩個優點。
1、產生原始long數字,沒有裝箱拆箱操作。
2、會生成數字范圍,很容易拆成小塊。
例:
執行結果:
得到結果終于比順序執行快,我們使用并行流的時候一定要正確使用,比如算法改變了某些共享狀態。
二、分之/合并框架ForkJoinPool
分之/合并框架的目的是有遞歸方式將可以并行的任務拆分成更小的任務,然后將每個子任務的結果合并起來生成整體結果。ForkJoinPool是ExecutorService的一個實現,它把子任務分配給線程池(稱為ForkJoinPool)中的工作線程。
1、使用RecursiveTask
要把任務提交到線程池中,必須創建RecursiveTask的子類,重寫compute方法。R是并行化產生的結果類型。如果任務不返回結果用RecursiveAction。
protected abstract R compute();這個方法同時定義了將任務拆分成子任務的邏輯,和任務無法在拆分時,生成單個任務邏輯。
if (任務足夠小或不可分) {
順序計算該任務
} else {
將任務分成兩個子任務
遞歸調用本方法,拆分每個子任務,等待所有子任務完成
合并每個子任務的結果
}
用分之/合并框架并行求和
public class ForkJoinSumCalculator extends RecursiveTask<Long> {private final long[] numbers;private final int start;private final int end;public static final long THRESHOLD = 10_000;public ForkJoinSumCalculator(long[] numbers) {this(numbers,0,numbers.length);}private ForkJoinSumCalculator(long[] numbers, int start, int end) {this.numbers = numbers;this.start = start;this.end = end;}@Overrideprotected Long compute() {//該任務負責求和的部分大小int length = end - start;//如果大小小于或等于閾值,順序執行結果if (length <= THRESHOLD) {return computeSequentially();}//創建一個子任務為數組的前一半求和ForkJoinSumCalculator leftTask =new ForkJoinSumCalculator(numbers, start, start + length/2);//利用另一個ForkJoinPool線程異步執行創建子任務leftTask.fork();//創建一個數組另一半求和ForkJoinSumCalculator rightTask =new ForkJoinSumCalculator(numbers, start + length/2, end);Long rightResult = rightTask.compute();//讀取第一個子任務結果,如果尚未完成就等待Long leftResult = leftTask.join();return leftResult + rightResult;}private long computeSequentially() {long sum = 0;for (int i = start; i < end; i++) {{sum += numbers[i];}}return sum;}public static long forkJoinSum(long n) {long[] numbers = LongStream.rangeClosed(1, n).toArray();ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);return new ForkJoinPool().invoke(task);}public static void main(String[] args) {System.out.println(forkJoinSum(10000000));} }使用分之/合并框架需注意幾點:
1、對于一個任務調用join方法會阻塞調用方,直到該任務結束。因此,在兩個子任務的計算開始之后 再調用它。
2、不在RecursiveTask子類內部使用invoke方法
三、Spliterator
Spliterator是java8中加入的另一個新接口,字面意思是可分迭代器,主要作用于并行執行。
public interface Spliterator<T> {、//按順序一個一個使用Spliterator元素,如果有其它元素遍歷返回trueboolean tryAdvance(Consumer<? super T> action);//可以把元素劃分出去分給第二個SpliteratorSpliterator<T> trySplit();//還剩多少元素要遍歷long estimateSize();//特性int characteristics(); } 與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的并行数据处理与性能详解与ForkJoin框架的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 用流收集数据Collectors的用法介
- 下一篇: JAVA8 Optional新特性和使用