java用不用stream_Java parallelStream不使用预期的线程数
Java 8 parallelStream似乎使用的線程數多于系統屬性java.util.concurrent.ForkJoinPool.common.parallelism指定的線程數.這些單元測試顯示我使用自己的ForkJoinPool使用所需數量的線程處理任務,但是當使用parallelStream時,線程數高于預期.
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertTrue;
public class ParallelStreamTest {
private static final int TOTAL_TASKS = 1000;
@Test
public void testParallelStreamWithParallelism1() throws InterruptedException {
final Integer maxThreads = 1;
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", maxThreads.toString());
List objects = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
objects.add(i);
}
final AtomicInteger concurrentThreads = new AtomicInteger(0);
final AtomicInteger taskCount = new AtomicInteger(0);
objects.parallelStream().forEach(i -> {
processTask(concurrentThreads, maxThreads); //expected to be called one at the time
taskCount.addAndGet(1);
});
assertTrue(taskCount.get() == TOTAL_TASKS);
}
@Test
public void testMyOwnForkJoinPoolWithParallelism1() throws InterruptedException {
final Integer threads = 1;
List objects = new ArrayList<>();
for (int i = 0; i < TOTAL_TASKS; i++) {
objects.add(i);
}
ForkJoinPool forkJoinPool = new ForkJoinPool(1);
final AtomicInteger concurrentThreads = new AtomicInteger(0);
final AtomicInteger taskCount = new AtomicInteger(0);
forkJoinPool.submit(() -> objects.parallelStream().forEach(i -> {
processTask(concurrentThreads, threads); //expected to be called one at the time
taskCount.addAndGet(1);
}));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.MINUTES);
assertTrue(taskCount.get() == TOTAL_TASKS);
}
/**
* It simply processes a task increasing first the concurrentThreads count
*
* @param concurrentThreads Counter for threads processing tasks
* @param maxThreads Maximum number of threads that are expected to be used for processing tasks
*/
private void processTask(AtomicInteger concurrentThreads, int maxThreads) {
int currentConcurrentThreads = concurrentThreads.addAndGet(1);
if (currentConcurrentThreads > maxThreads) {
throw new IllegalStateException("There should be no more than " + maxThreads + " concurrent thread(s) but found " + currentConcurrentThreads);
}
// actual processing would go here
concurrentThreads.decrementAndGet();
}
}
應該只有一個線程用于處理任務,因為ForkJoinPool具有parallelism = 1和java.util.concurrent.ForkJoinPool.common.parallelism = 1.因此,兩個測試都應該通過,但testParallelStreamWithParallelism1失敗:
java.lang.IllegalStateException: There should be no more than 1 concurrent thread(s) but found 2
似乎設置java.util.concurrent.ForkJoinPool.common.parallelism = 1沒有按預期工作,并且同時處理了多個并發任務.
有任何想法嗎?
解決方法:
Fork / Join池的并行性設置確定了池工作線程的數量,但是從調用者線程開始,例如,主線程也將在作業上工作,使用公共池時總會有一個線程.這就是default setting of the common pool is “number of cores minus one”讓實際工作線程數等于核心數的原因.
使用自定義Fork / Join池,流操作的調用者線程已經是池的工作線程,因此,利用它來處理作業不會增加實際工作線程數.
必須強調的是,Stream實現和Fork / Join池之間的交互完全沒有指定,因為流使用Fork / Join框架的事實是一個實現細節.無法保證更改默認池的屬性對流有任何影響,也不保證在自定義Fork / Join池的任務中調用流操作將使用該自定義池.
標簽:java,java-8,java-stream,multithreading
來源: https://codeday.me/bug/20190828/1753500.html
總結
以上是生活随笔為你收集整理的java用不用stream_Java parallelStream不使用预期的线程数的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java个人学生信息的录入_java录入
- 下一篇: java点击表头可进行排序_table中