Parallel Stream 的错误实践
一、前言
Java8 Stream 流的出現,極大的簡化了業務需求中對集合數據的加工處理操作。雖然好用,但是一旦使用不當,也會帶來意想不到的結果,本文記錄使用 Parallel Stream 的錯誤實踐。
List<Object> sourceList = ...; List<Object> list = new ArrayList();sourceList.stream.map(...).foreach(list::add);偽代碼如上所示,對 sourceList 進行源數據加工,加工完畢后 add 進結果 list 中。運行過程中,發現其中存在 null 元素。
二、實驗
寫一個簡單 Case 測試下,如下所示:
public class StreamTest {public static void main(String[] args) {List<Integer> list = new ArrayList<>();IntStream.range(0, 50).parallel().map(e -> e * 2).forEach(list::add);System.out.println("size = " + list.size() + "\n" + list);} }多次執行,發現結果集元素個數不等于期望元素個數,且其中存在 null 元素,而且有幾率出現數組下標越界錯誤。
size = 44 [30, 12, 32, 14, 34, 16, 42, 44, 46, 48, 24, 36, 20, 38, 40, null, 22, 6, 8, 10, 0, 2, 4, 56, 88, 82, 60, 84, 90, 92, 74, 94, 76, null, 50, 52, 98, 54, 62, 64, 66, 68, 70, 72] Exception in thread "main" java.lang.ArrayIndexOutOfBoundsExceptionat sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)at java.lang.reflect.Constructor.newInstance(Constructor.java:423)at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598)at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)at java.util.stream.ForEachOps$ForEachOp$OfInt.evaluateParallel(ForEachOps.java:189)at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)at java.util.stream.IntPipeline.forEach(IntPipeline.java:404)at jit.wxs.disruptor.stream.StreamTest.main(StreamTest.java:15) Caused by: java.lang.ArrayIndexOutOfBoundsException: 15at java.util.ArrayList.add(ArrayList.java:463)at java.util.stream.ForEachOps$ForEachOp$OfInt.accept(ForEachOps.java:205)at java.util.stream.IntPipeline$3$1.accept(IntPipeline.java:233)at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)at java.util.concurrent.ForkJoinPool$WorkQueue.execLocalTasks(ForkJoinPool.java:1040)at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1058)at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)三、分析
問題原因也很簡單,了解過 Parallel Stream 的同學知道,其內部采用 ForkJoinPool 線程池進行執行,也就是說存在線程安全問題,而 ArrayList 是線程不安全的。下面依次來分析產生各種異常情況的原因。
3.1 元素數量丟失
// java.util.ArrayList#add(E) public boolean add(E e) {ensureCapacityInternal(size + 1); // Increments modCount!!elementData[size++] = e;return true; }導致數組下標越界的原因是 ArrayList 的 add() 方法中的 elementData[size++] = e,這行代碼不是原子操作,可以拆解為:
這里存在內存可見性問題,當線程 A 從內存讀取 size 后,設置 e 值,將 size 加 1,然后寫入內存。過程中可能有線程 B 也修改了 size 并寫入內存,那么線程 A 寫入內存的值就會丟失線程 B 的更新。這解釋了會出現數組長度比原始數組要小(元素丟失)的情況。
3.2 null 元素
null 元素產生跟元素數據丟失類似,也是由于 elementData[size++] = e 不是原子操作導致的。假設存在三個線程,線程 1、線程 2、線程 3。三個線程同時開始執行,初始 size 值為 1。
-  線程 1 全部執行完畢,此時 size 被更新為 2。 
-  線程 2 一開始讀取 size 值 = 1、將 e 添加到 size 位置后時間片就用完了,輪到執行第三步 size++ 讀取到了線程 1 的更新,size 直接被更新成了 3。【注:此處線程 2 的 e 值也丟失了,被線程 1 覆蓋】 
-  線程3 一開始讀取 size 值 = 1 后時間片就用完了,輪到執行第二步將 e 添加到 size 位置讀取到了線程 2 的更新,size 變成了 3。size = 2 的位置就被跳過了,因此 elementData[2] 為 null 了。 
 
3.3 數組下標越界
數組越界異常則主要發生在數組擴容前的臨界點。假設當前數組剛好只能添加一個元素,兩個線程同時準備執行ensureCapacityInternal(size + 1),同時讀取的 size 值,加 1 后進入ensureCapacityInternal都不會導致擴容。
退出 ensureCapacityInternal 后,兩個線程同時執行 elementData[size] = e,線程 B 的 size++ 先完成,假設此刻線程 A 讀取到了線程 B 的更新,線程 A 再執行 size++,此時 size 的實際值就會大于數組的容量,這樣就會發生數組越界異常。
四、解決
解決問題也很簡單,分兩種,一種是把結果集合變成線程安全的即可。
List<Integer> list = new CopyOnWriteArrayList<>(); // or List<Integer> list = Collections.synchronizedList(new ArrayList<>());第二種不使用 forEach 自己 add,改用 Stream 的 collect:
public class StreamTest {public static void main(String[] args) {List<Integer> list = IntStream.range(0, 50).parallel().map(e -> e * 2).boxed().collect(Collectors.toList());System.out.println("size = " + list.size() + "\n" + list);} }五、參考資料
- JAVA使用并行流(ParallelStream)時要注意的一些問題
- 記一次java8 parallelStream使用不當引發的血案
總結
以上是生活随笔為你收集整理的Parallel Stream 的错误实践的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: 去除最新版WinRAR的弹窗广告
- 下一篇: 数据库10大常见安全问题盘点
