java8使用parallelStream并行流造成数据丢失或下标越界异常解决方案
描述
我們先看一段使用了并行流的代碼
@Test
public void testStream() {
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 10000; i++) {
list.add(i);
}
System.out.println(list.size());
List<Integer> streamList = new ArrayList<>();
list.parallelStream().forEach(streamList::add);
System.out.println(streamList.size());
}
編譯結(jié)果:
觀察發(fā)現(xiàn),原來(lái)集合中的數(shù)據(jù)有10000條,但是使用并行流遍歷數(shù)據(jù)插入到新集合streamList中后,新的集合中只有5746條數(shù)據(jù)。并且會(huì)在多次之后可能會(huì)出現(xiàn)數(shù)組下標(biāo)越界異常,顯然這里的代碼是不合邏輯的。
分析
parallelStream中使用的是ForkJobTask。Fork/Join的框架是通過(guò)把一個(gè)大任務(wù)不斷fork成許多子任務(wù),然后多線程執(zhí)行這些子任務(wù),最后再Join這些子任務(wù)得到最終結(jié)果。關(guān)于分支/合并框架的使用案例可以看我的這篇文章(用分支/合并框架執(zhí)行并行求和)。從程序上看,就是先將list集合fork成多段,然后多線程添加到streamList的結(jié)合中,而streamList是ArrayList類型,它的add方法并不能保證原子性。
ArrayList的add方法源碼如下:
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return <tt>true</tt> (as specified by {@link Collection#add})
*/
public boolean add(E e) {
ensureCapacityInternal(size + 1); // Increments modCount!!
elementData[size++] = e;
return true;
}
可以看到add方法可以概括為以下兩個(gè)步驟
ensureCapacityInternal(),確認(rèn)下當(dāng)前ArrayList中的數(shù)組,是否還可以加入新的元素。如果不行,就會(huì)再申請(qǐng)一個(gè):int newCapacity = oldCapacity + (oldCapacity >> 1) 大小的數(shù)組(這個(gè)容量相當(dāng)于:1 + 1/2 = 1.5倍),然后將數(shù)據(jù)copy過(guò)去。
elementData[size++] = e:添加元素到elementData數(shù)組中。
在并發(fā)情況下,如果同時(shí)有A、B兩個(gè)線程同時(shí)執(zhí)行add,在第一步ensureCapacityInternal校驗(yàn)數(shù)組容量時(shí),A、B線程都發(fā)現(xiàn)當(dāng)前容量還可以添加最有一個(gè)元素,不需擴(kuò)容;因此進(jìn)入第二步,此時(shí),A線程先執(zhí)行完,數(shù)組容量已滿,然后B線程再對(duì)elementData賦值時(shí),就會(huì)拋出“ArrayIndexOutOfBoundsException”。
解決方案
第一種:將parallelStream改成stream,或者直接使用foreach處理。這可以通過(guò)判斷并發(fā)處理真實(shí)能帶來(lái)多大的好處,做取舍。
第二種:使用resultList =new CopyOnWriteArrayList<>();這是個(gè)線程安全的類。從源碼上看,CopyOnWriteArrayList在add操作時(shí),通過(guò)ReentrantLock進(jìn)行加鎖,防止并發(fā)寫。不給過(guò)CopyOnWriteArrayList,每次add操作都是把原數(shù)組中的元素拷貝一份到新數(shù)組中,然后在新數(shù)組中添加新元素,最后再把引用指向新數(shù)組。這會(huì)導(dǎo)致頻繁的對(duì)象創(chuàng)建,況且數(shù)組還是需要一塊連續(xù)的內(nèi)存空間,如果有大量add操作,慎用。
第三種:使用包裝類resultList = Collections.synchronizedList(Arrays.asList());
總結(jié)
在從stream和parallelStream方法中進(jìn)行選擇時(shí),我們可以考慮以下幾個(gè)問(wèn)題:
1.是否需要并行?
2.任務(wù)之間是否是獨(dú)立的?是否會(huì)引起任何競(jìng)態(tài)條件?
3.結(jié)果是否取決于任務(wù)的調(diào)用順序?
對(duì)于問(wèn)題1,在回答這個(gè)問(wèn)題之前,你需要弄清楚你要解決的問(wèn)題是什么,數(shù)據(jù)量有多大,計(jì)算的特點(diǎn)是什么?并不是所有的問(wèn)題都適合使用并發(fā)程序來(lái)求解,比如當(dāng)數(shù)據(jù)量不大時(shí),順序執(zhí)行往往比并行執(zhí)行更快。畢竟,準(zhǔn)備線程池和其它相關(guān)資源也是需要時(shí)間的。但是,當(dāng)任務(wù)涉及到I/O操作并且任務(wù)之間不互相依賴時(shí),那么并行化就是一個(gè)不錯(cuò)的選擇。通常而言,將這類程序并行化之后,執(zhí)行速度會(huì)提升好幾個(gè)等級(jí)。
對(duì)于問(wèn)題2,如果任務(wù)之間是獨(dú)立的,并且代碼中不涉及到對(duì)同一個(gè)對(duì)象的某個(gè)狀態(tài)或者某個(gè)變量的更新操作,那么就表明代碼是可以被并行化的。
對(duì)于問(wèn)題3,由于在并行環(huán)境中任務(wù)的執(zhí)行順序是不確定的,因此對(duì)于依賴于順序的任務(wù)而言,并行化也許不能給出正確的結(jié)果。
總結(jié)
以上是生活随笔為你收集整理的java8使用parallelStream并行流造成数据丢失或下标越界异常解决方案的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 已经发车的票还能取出来吗_高铁票在车已经
- 下一篇: python将元祖写入txt文档_pyt