Java并行任务框架Fork/Join
Fork/Join是什么?
Fork意思是分叉,Join為合并。Fork/Join是一個(gè)將任務(wù)分割并行運(yùn)行,然后將最終結(jié)果合并成為大任務(wù)的結(jié)果的框架,父任務(wù)可以分割成若干個(gè)子任務(wù),子任務(wù)可以繼續(xù)分割,提供我們一種方便的并行任務(wù)功能,滿足實(shí)際場(chǎng)景的業(yè)務(wù)需求,思想類似于MapReduce。任務(wù)的分割必須保證子任務(wù)獨(dú)立,不會(huì)相互依賴結(jié)果。
從哪里開始?
Fork/Join框架主要有如下接口和類:
- ForkJoinPool:一個(gè)線程池,用于執(zhí)行調(diào)度分割的任務(wù),實(shí)現(xiàn)了ExecutorService接口。提供三種執(zhí)行任務(wù)的方式:
1、execute:最原生的執(zhí)行方式,以異步執(zhí)行,并且無(wú)返回結(jié)果。
2、submit:異步執(zhí)行,有返回結(jié)果,返回結(jié)果是封裝后的Future對(duì)象。
3、invoke和invokeAll:異步執(zhí)行,有返回結(jié)果,會(huì)等待所有任務(wù)執(zhí)行執(zhí)行完成,返回的結(jié)果為無(wú)封裝的泛型T。
- ForkJoinTask:抽象的分割任務(wù),提供以分叉的方式執(zhí)行,以及合并執(zhí)行結(jié)果。
- RecursiveAction:異步任務(wù),無(wú)返回結(jié)果。通常自定義的任務(wù)要繼承,并重寫compute方法,任務(wù)執(zhí)行的就是compute方法。
- RecursiveTask:異步任務(wù),有返回結(jié)果。通常自定義的任務(wù)要繼承,并重寫compute方法,任務(wù)執(zhí)行的就是compute方法。
核心類圖
從核心類圖看出,要想開始一個(gè)分割的并行任務(wù),可以創(chuàng)建一個(gè)ForkJoinPool線程池,同時(shí)創(chuàng)建無(wú)返回結(jié)果的任務(wù)RecursiveAction或有返回結(jié)果的任務(wù)RecursiveTask,最后調(diào)用線程池ForkJoinPool的execute或submit或invoke方法執(zhí)行任務(wù),完成后合并結(jié)果。
實(shí)例
我們以一個(gè)有返回結(jié)果的并行任務(wù)實(shí)例進(jìn)行測(cè)試。計(jì)算從起始值到結(jié)束值得連續(xù)數(shù)的累加結(jié)果,利用Fork/Join框架。并對(duì)比普通計(jì)算和并行計(jì)算的耗時(shí)差異。
package com.misout.forkjoin;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
/**
* 計(jì)算從起始值到結(jié)束值得連續(xù)數(shù)的累加結(jié)果,利用Fork/Join框架
* @author Misout
* @date 2018-01-13 16:06:44
*/
public class SumTask extends RecursiveTask<Long> {
private static final long serialVersionUID = 4828818665955149519L;
/** 每個(gè)任務(wù)最多允許計(jì)算的數(shù)字個(gè)數(shù)閾值,超過(guò)這個(gè)閾值,任務(wù)進(jìn)行拆分 */
private static final long THRESHOLD = 1000L;
/** 起始值 */
private Long startNumber;
/** 結(jié)束值 */
private Long endNumber;
public SumTask(Long startNumber, Long endNumber) {
this.startNumber = startNumber;
this.endNumber = endNumber;
}
/**
* 累加數(shù)的個(gè)數(shù)超過(guò)閾值1000個(gè),拆分成2個(gè)子任務(wù)執(zhí)行。子任務(wù)繼續(xù)作拆分。計(jì)算完,合并結(jié)果。
*/
@Override
protected Long compute() {
if(startNumber > endNumber) {
System.out.println("start number should be smaller than end number");
return 0L;
}
if(endNumber - startNumber < THRESHOLD) {
return this.getCount(startNumber, endNumber);
} else {
Long mid = (startNumber + endNumber) / 2;
RecursiveTask<Long> subTask1 = new SumTask(startNumber, mid);
RecursiveTask<Long> subTask2 = new SumTask(mid + 1, endNumber);
subTask1.fork();
subTask2.fork();
return subTask1.join() + subTask2.join();
}
}
/**
* 普通累加執(zhí)行方法
* @param start 起始數(shù)
* @param end 結(jié)束數(shù)
* @return 累加和
*/
protected Long getCount(Long start, Long end) {
Long sum = 0L;
for(long i = start; i <= end; i++) {
sum += i;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
Long start = 5L;
Long end = 3463434L;
SumTask task = new SumTask(start, end);
Long startTime = System.currentTimeMillis();
Long sum = forkJoinPool.invoke(task);
Long endTime = System.currentTimeMillis();
System.out.println("fork/join : sum = " + sum + ", cost time = " + (endTime - startTime) + "ms");
startTime = System.currentTimeMillis();
Long sum2 = task.getCount(start, end);
endTime = System.currentTimeMillis();
System.out.println("normal : sum = " + sum2 + ", cost time = " + (endTime - startTime) + "ms");
}
}
說(shuō)明:SumTask繼承RecursiveTask,并實(shí)現(xiàn)了compute方法。在compute方法中會(huì)進(jìn)行任務(wù)分割,并繼續(xù)生成子任務(wù),子任務(wù)仍然以分割的方式運(yùn)行。
運(yùn)行結(jié)果對(duì)比:
fork/join : sum = 5997689267885, cost time = 290ms
normal : sum = 5997689267885, cost time = 41ms
注意事項(xiàng):任務(wù)拆分的深度最好不要太多,否則很容易因創(chuàng)建的線程過(guò)多影響系統(tǒng)性能。
work-stealing規(guī)則
在Java的API說(shuō)明中提到,F(xiàn)orkJoinPool線程池與ThreadPoolExecutor線程池不同的地方在于,F(xiàn)orkJoinPool善于利用竊取工作執(zhí)行加快任務(wù)的總體執(zhí)行速度。實(shí)際上,在ForkJoinPool線程池中,若一個(gè)工作線程的任務(wù)隊(duì)列為空沒(méi)有任務(wù)執(zhí)行時(shí),便從其他工作線程中獲取任務(wù)主動(dòng)執(zhí)行。為了實(shí)現(xiàn)工作竊取,在工作線程中維護(hù)了雙端隊(duì)列,竊取任務(wù)線程從隊(duì)尾獲取任務(wù),被竊取任務(wù)線程從隊(duì)頭獲取任務(wù)。這種機(jī)制充分利用線程進(jìn)行并行計(jì)算,減少了線程競(jìng)爭(zhēng)。但是當(dāng)隊(duì)列中只存在一個(gè)任務(wù)了時(shí),兩個(gè)線程去取反而會(huì)造成資源浪費(fèi)。
總結(jié)
以上是生活随笔為你收集整理的Java并行任务框架Fork/Join的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: QT中的对象模型――QPointer
- 下一篇: Odoo event