7. Java8新特性-并行数据处理(parallel)
在JDK7之前,并行處理數(shù)據(jù)集合非常麻煩。首先需要自己明確的把包含數(shù)據(jù)的數(shù)據(jù)結(jié)構(gòu)分成若干個子部分,第二需要給每個子部分分配一個獨(dú)立的線程;第三需要在恰當(dāng)?shù)臅r候?qū)λ鼈冞M(jìn)行同步來避免不希望出現(xiàn)的競爭條件,等待所有線程完成,最后把這些部分合并起來。
Doug Lea 在JDK7中引入了fork/join框架,讓這些操作更穩(wěn)定,更不易出錯。
本節(jié)主要內(nèi)容:
1. 用并行流并行處理數(shù)據(jù)
2. 并行流的性能分析
3. fork/join框架
4. 使用Spliterator分割流
學(xué)完本節(jié)期望能達(dá)到:
1. 熟練使用并行流,來加速業(yè)務(wù)性能
2. 了解流內(nèi)部的工作原理,以防止誤用的情況
3. 通過Spliterator控制數(shù)據(jù)塊的劃分方式
并行流
可以通過對數(shù)據(jù)源調(diào)用parallelStream方法來將源轉(zhuǎn)換為并行流。并行流就是一個把內(nèi)容分成多個數(shù)據(jù)塊,并用不同的線程分別處理每個數(shù)據(jù)塊的流。這樣可以自動將工作負(fù)荷轉(zhuǎn)到多核中并行處理。
考慮下面一個實現(xiàn):給定正整數(shù)n,計算 1 + 2 + … n的和。
使用stream的實現(xiàn):
將上面的順序流轉(zhuǎn)換為并行流,實現(xiàn)如下:
private static long parallelSum(long n) {return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum); }即通過調(diào)用方法parallel可將順序流轉(zhuǎn)換為并行流。
但需要注意的是流僅在終端操作時才開始執(zhí)行,所以當(dāng)前流是順序流還是并行流以最靠近終端操作的流類型為準(zhǔn),示例:
list.stream().parallel().filter(e -> e.age > 20).sequential().map(...).parallel().collect(...);此種情況并不會按預(yù)想的先使用并行流執(zhí)行過濾,再按順序流執(zhí)行映射轉(zhuǎn)換。而是整個流水線操作都按并行流執(zhí)行。
配置并行流使用的線程池
并行流內(nèi)部使用了默認(rèn)的ForkJoinPool, 它默認(rèn)的線程數(shù)量就是處理器的數(shù)量(Runtime.getRuntime().availableProcessors())。也可以通過設(shè)置系統(tǒng)屬性來改變它(System.setProperty(“java.util.concurrent.ForkJoinPool.common.parallelism”, “12”))。但它是一個全局設(shè)置,會影響所有的并行流,一般而言線程數(shù)等于處理器數(shù)量是一個合理的數(shù)值,不需要修改。
測試流性能
一般而言,同一個功能給我們的感覺是并行流性能會比順序流性能更好。然而在軟件工程中,優(yōu)化性能的黃金準(zhǔn)則是:測量。我們開發(fā)了程序,用來測量4種寫法的累加,看看性能如何:
@Slf4j public class SumSample {/*** 順序流、并行流性能測試* 實現(xiàn)1~1億整型數(shù)字累加**/public static void main(String[] args) {CostUtil.cost(() -> log.info("==> for: 1 + ... + 100_000_000, result: {}", forSum(100_000_000)));log.info("================================================================================");CostUtil.cost(() -> log.info("==> sequential: 1 + ... + 100_000_000, result: {}", sequentialSum(100_000_000)));log.info("================================================================================");CostUtil.cost(() -> log.info("==> parallel: 1 + ... + 100_000_000, result: {}", parallelSum(100_000_000)));log.info("================================================================================");CostUtil.cost(() -> log.info("==> longParallel: 1 + ... + 100_000_000, result: {}", longParallelSum(100_000_000)));}/*** 內(nèi)部迭代方式實現(xiàn)累加*/private static long forSum(long n) {long result = 0;for (int i = 1; i <= n; i ++) {result += i;}return result;}/*** 順序流實現(xiàn)累加*/private static long sequentialSum(long n) {return Stream.iterate(1L, i -> i + 1).limit(n).reduce(0L, Long::sum);}/*** 并行流實現(xiàn)累加*/private static long parallelSum(long n) {return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum);}/*** long原生流范圍實現(xiàn)累加*/private static long longParallelSum(long n) {return LongStream.rangeClosed(1L, n).parallel().reduce(0L, Long::sum);} } // result: 2022-01-18 10:53:59.035 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-==> for: 1 + ... + 100_000_000, result: 5000000050000000 2022-01-18 10:53:59.039 [main] INFO win.elegentjs.util.CostUtil-==> cost time: 58 2022-01-18 10:53:59.039 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-================================================================================ 2022-01-18 10:54:00.459 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-==> sequential: 1 + ... + 100_000_000, result: 5000000050000000 2022-01-18 10:54:00.459 [main] INFO win.elegentjs.util.CostUtil-==> cost time: 1420 2022-01-18 10:54:00.459 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-================================================================================ 2022-01-18 10:54:04.627 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-==> parallel: 1 + ... + 100_000_000, result: 5000000050000000 2022-01-18 10:54:04.628 [main] INFO win.elegentjs.util.CostUtil-==> cost time: 4167 2022-01-18 10:54:04.628 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-================================================================================ 2022-01-18 10:54:04.688 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-==> longParallel: 1 + ... + 100_000_000, result: 5000000050000000 2022-01-18 10:54:04.688 [main] INFO win.elegentjs.util.CostUtil-==> cost time: 60使用四種方法實現(xiàn)1~1億個數(shù)的累加,這是在i7 2.4GHz 6core/12threads CPU的執(zhí)行結(jié)果。讓人很意外,并非是并行流性能最好,反而是最差的,最樸實的for循環(huán)單線程性能最佳。
原因:
通過上面的比較需要意識到:并行編程比較復(fù)雜,有時候甚至違反直覺。如果用的不對(如本例,采用了一個不易并行化的操作iterate),甚至?xí)屝阅芨睢K粤私鈖arallel方法背后的執(zhí)行細(xì)節(jié)非常必要。
LongStream.rangeClosed 代替 iterate
僅高效求和的示例,可用LongStream.rangeClosed高效替代iterate實現(xiàn)并行計算。它的優(yōu)點(diǎn)是:
通過示例演示它的并行執(zhí)行性能比同樣是并行流的iterate版本要快了70倍。可見它有效利用了并行。
為什么并行流還是比for慢?
上面的執(zhí)行結(jié)果可以看出LongStream.rangeClosed的性能還是比for略慢一點(diǎn),原因是:
并行化是有代價的,并行過程中需要對流做遞歸劃分,把流的歸納操作分配到不同的線程,最后合并。且多個核心之間移動數(shù)據(jù)的代價也很大。
正確使用并行流
使用并行流加速性能需要確保用對,如果計算結(jié)果是錯誤的,再快也沒意義。
誤用并行流而產(chǎn)生錯誤的首要原因是使用的算法改變了某些共享狀態(tài)。 如下面示例:
從上面示例看出雖然很快,但結(jié)果是錯誤的。 原因是total += value非原子操作,出現(xiàn)了競態(tài)條件。如果使用同步來修復(fù),就失去了并行的意義。 所以寫并行流時一定要考慮多個線程是否會修改共享對象的可變狀態(tài)。
高效使用并行流
一些高效使用并行流的建議:
一些常見的數(shù)據(jù)源的可分解性匯總:
Fork/Join框架
想要正確的使用并行流,了解它背后的實現(xiàn)原理至關(guān)重要。 并行流背后就是采用的Fork/Join框架。
// TODO: 待補(bǔ)充
Spliterator
// TODO: 待補(bǔ)充
小結(jié)
行為和性能有時是違反直覺的,因此一定要測量,確保你并沒有把程序拖得更慢。
或處理單個元素特別耗時的時候。
總是比嘗試并行化某些操作更為重要。
上執(zhí)行,然后將各個子任務(wù)的結(jié)果合并起來生成整體結(jié)果。
總結(jié)
以上是生活随笔為你收集整理的7. Java8新特性-并行数据处理(parallel)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: localStorage储存如何正确存储
- 下一篇: 【Golang】JSON Marshal