【java8】并行流Stream
流在處理數(shù)據(jù)進(jìn)行一些迭代操作的時(shí)候確認(rèn)很方便,但是在執(zhí)行一些耗時(shí)或是占用資源很高的任務(wù)時(shí)候,串行化的流無(wú)法帶來(lái)速度/性能上的提升,并不能滿足我們的需要。
通常我們會(huì)使用多線程來(lái)并行或是分片分解執(zhí)行任務(wù),而在Stream中也提供了這樣的并行方法,下面將會(huì)一一介紹這些方法。
將順序流轉(zhuǎn)為并行流
使用parallelStream()方法或者是使用stream().parallel()來(lái)轉(zhuǎn)化為并行流。
但是只是可能會(huì)返回一個(gè)并行的流,流是否能并行執(zhí)行還受到其他一些條件的約束(如是否有序,是否支持并行)。
對(duì)順序流調(diào)用parallel方法并不意味著流本身有任何實(shí)際的變化。它在內(nèi)部實(shí)際上就是設(shè)了一個(gè)boolean標(biāo)志,表示你想讓調(diào)用parallel之后進(jìn)行的所有操作都并行執(zhí)行。類(lèi)似地,你只需要對(duì)并行流調(diào)用sequential方法就可以把它變成順序流。如果對(duì)這個(gè)方法調(diào)用了多次,將以最后一次執(zhí)行為準(zhǔn)。
package com.morris.java8.parallel;import java.util.concurrent.TimeUnit; import java.util.stream.IntStream;public class ParallerDemo {public static void main(String[] args) {IntStream list = IntStream.range(0, 6);//開(kāi)始并行執(zhí)行list.parallel().forEach(i -> {Thread thread = Thread.currentThread();System.err.println("integer:" + i + "," + "currentThread:" + thread.getName());try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}});} }運(yùn)行結(jié)果如下:
integer:3,currentThread:main integer:4,currentThread:ForkJoinPool.commonPool-worker-3 integer:5,currentThread:ForkJoinPool.commonPool-worker-2 integer:1,currentThread:ForkJoinPool.commonPool-worker-1 integer:2,currentThread:ForkJoinPool.commonPool-worker-1 integer:0,currentThread:ForkJoinPool.commonPool-worker-3從運(yùn)行結(jié)果里面我們可以很清楚的看到parallelStream同時(shí)使用了主線程和ForkJoinPool.commonPool創(chuàng)建的線程。 值得說(shuō)明的是這個(gè)運(yùn)行結(jié)果并不是唯一的,實(shí)際運(yùn)行的時(shí)候可能會(huì)得到多個(gè)結(jié)果。
看看流的parallel方法,你可能會(huì)想,并行流用的線程是從哪兒來(lái)的?有多少個(gè)?怎么自定義這個(gè)過(guò)程呢?
并行流內(nèi)部使用了默認(rèn)的ForkJoinPool,它默認(rèn)的線程數(shù)量就是你的處理器數(shù)量,這個(gè)值是由Runtime.getRuntime().availableProcessors()得到的。
但是你可以通過(guò)系統(tǒng)屬性java.util.concurrent.ForkJoinPool.common.parallelism來(lái)改變線程池大小,如下所示:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");這是一個(gè)全局設(shè)置,因此它將影響代碼中所有的并行流。反過(guò)來(lái)說(shuō),目前還無(wú)法專(zhuān)為某個(gè)并行流指定這個(gè)值。一般而言,讓ForkJoinPool的大小等于處理器數(shù)量是個(gè)不錯(cuò)的默認(rèn)值,除非你有很好的理由,否則我們強(qiáng)烈建議你不要修改它。
// 設(shè)置全局并行流并發(fā)線程數(shù) System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12"); System.out.println(ForkJoinPool.getCommonPoolParallelism());// 輸出 12 System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20"); System.out.println(ForkJoinPool.getCommonPoolParallelism());// 輸出 12為什么兩次的運(yùn)行結(jié)果是一樣的呢?上面剛剛說(shuō)過(guò)了這是一個(gè)全局設(shè)置,java.util.concurrent.ForkJoinPool.common.parallelism是final類(lèi)型的,整個(gè)JVM中只允許設(shè)置一次。既然默認(rèn)的并發(fā)線程數(shù)不能反復(fù)修改,那怎么進(jìn)行不同線程數(shù)量的并發(fā)測(cè)試呢?答案是:引入ForkJoinPool。
IntStream range = IntStream.range(1, 100000); // 傳入parallelism new ForkJoinPool(parallelism).submit(() -> range.parallel().forEach(System.out::println)).get();因此,使用parallelStream時(shí)需要注意的一點(diǎn)是,多個(gè)parallelStream之間默認(rèn)使用的是同一個(gè)線程池,所以IO操作盡量不要放進(jìn)parallelStream中,否則會(huì)阻塞其他parallelStream。
// 獲取當(dāng)前機(jī)器CPU處理器的數(shù)量 System.out.println(Runtime.getRuntime().availableProcessors());// 輸出 4 // parallelStream默認(rèn)的并發(fā)線程數(shù) System.out.println(ForkJoinPool.getCommonPoolParallelism());// 輸出 3為什么parallelStream默認(rèn)的并發(fā)線程數(shù)要比CPU處理器的數(shù)量少1個(gè)?因?yàn)樽顑?yōu)的策略是每個(gè)CPU處理器分配一個(gè)線程,然而主線程也算一個(gè)線程,所以要占一個(gè)名額。 這一點(diǎn)可以從源碼中看出來(lái):
static final int MAX_CAP = 0x7fff; // max #workers - 1 // 無(wú)參構(gòu)造函數(shù) public ForkJoinPool() {this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),defaultForkJoinWorkerThreadFactory, null, false); }測(cè)試流的性能
下面通過(guò)幾種方式計(jì)算數(shù)據(jù)的和來(lái)測(cè)試流的性能。
package com.morris.java8.parallel;import java.util.function.Function; import java.util.stream.LongStream; import java.util.stream.Stream;public class ParallerStreamExample {public static void main(String[] args) {long n = 100_000_000;System.out.println("normal:" + recordTime(ParallerStreamExample::normal, n) + " MS");System.out.println("iterator:" + recordTime(ParallerStreamExample::iterator, n) + " MS");// 太耗時(shí),暫時(shí)注釋// System.out.println("iteratorParallel:" + recordTime(ParallerStreamExample::iteratorParallel, n) + " MS");System.out.println("longStream:" + recordTime(ParallerStreamExample::longStream, n) + " MS");System.out.println("longStreamParallel:" + recordTime(ParallerStreamExample::longStreamParallel, n) + " MS");}public static long recordTime(Function<Long, Long> function, long n) {long lowestCostTime = Long.MAX_VALUE;for (int i = 0; i < 10; i++) {long startTime = System.currentTimeMillis();function.apply(n);long costTime = System.currentTimeMillis() - startTime;if(costTime < lowestCostTime) {lowestCostTime = costTime;}}return lowestCostTime;}/*** 正常for循環(huán)* @param n* @return*/public static long normal(long n) {long result = 0;for(long i = 1; i <= n; i++) {result += i;}return result;}/*** iterate順序流* @param n* @return*/public static long iterator(long n) {return Stream.iterate(1L, t -> t + 1).limit(n).reduce(0L, Long::sum);}/*** iterate并行流* @param n* @return*/public static long iteratorParallel(long n) {return Stream.iterate(1L, t -> t + 1).parallel().limit(n).reduce(0L, Long::sum);}/*** rangeClosed順序流* @param n* @return*/public static long longStream(long n) {return LongStream.rangeClosed(1, n).sum();}/*** rangeClosed并行流* @param n* @return*/public static long longStreamParallel(long n) {return LongStream.rangeClosed(1, n).parallel().sum();} }運(yùn)行結(jié)果如下:
normal:33 MS iterator:990 MS longStream:44 MS longStreamParallel:16 MS結(jié)論:
-
Stream串行性能明顯差于for循環(huán)迭代,因?yàn)镾tream串行還有流水線成本在里面。
-
并行的Stream API能夠發(fā)揮多核特性,但是有時(shí)候不如串行流(比如后面的計(jì)算依賴(lài)前面的計(jì)算結(jié)果就不適宜用并行流)
高效使用并行流
下面是一些使用并行流需要思考的方面:
-
留意裝箱。自動(dòng)裝箱和拆箱操作會(huì)大大降低性能。Java 8中有原始類(lèi)型流(IntStream、LongStream、DoubleStream)來(lái)避免這種操作,但凡有可能都應(yīng)該用這些流。
-
有些操作本身在并行流上的性能就比順序流差,比如后面的計(jì)算依賴(lài)前面的計(jì)算結(jié)果。
-
還要考慮流的操作流水線的總計(jì)算成本。設(shè)N是要處理的元素的總數(shù),Q是一個(gè)元素通過(guò)流水線的大致處理成本,則N*Q就是這個(gè)對(duì)成本的一個(gè)粗略的定性估計(jì)。Q值較高就意味著使用并行流時(shí)性能好的可能性比較大。
-
對(duì)于較小的數(shù)據(jù)量,選擇并行流幾乎從來(lái)都不是一個(gè)好的決定。并行處理少數(shù)幾個(gè)元素的好處還抵不上并行化造成的額外開(kāi)銷(xiāo)。
-
要考慮流背后的數(shù)據(jù)結(jié)構(gòu)是否易于分解。例如,ArrayList的拆分效率比LinkedList高得多,因?yàn)榍罢哂貌恢闅v就可以平均拆分,而后者則必須遍歷。
-
流自身的特點(diǎn),以及流水線中的中間操作修改流的方式,都可能會(huì)改變分解過(guò)程的性能。例如,一個(gè)SIZED流可以分成大小相等的兩部分,這樣每個(gè)部分都可以比較高效地并行處理,但篩選操作可能丟棄的元素個(gè)數(shù)卻無(wú)法預(yù)測(cè),導(dǎo)致流本身的大小未知。
-
還要考慮終端操作中合并步驟的代價(jià)是大是小(例如Collector中的combiner方法)。如果這一步代價(jià)很大,那么組合每個(gè)子流產(chǎn)生的部分結(jié)果所付出的代價(jià)就可能會(huì)超出通過(guò)并行流得到的性能提升。
總結(jié)
以上是生活随笔為你收集整理的【java8】并行流Stream的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: html隐藏visibility,CSS
- 下一篇: 讲真,这两款idea插件,能治愈你英语不