Java并行任务框架Fork/Join
Fork/Join是什么?
Fork意思是分叉,Join為合并。Fork/Join是一個將任務(wù)分割并行運(yùn)行,然后將最終結(jié)果合并成為大任務(wù)的結(jié)果的框架,父任務(wù)可以分割成若干個子任務(wù),子任務(wù)可以繼續(xù)分割,提供我們一種方便的并行任務(wù)功能,滿足實(shí)際場景的業(yè)務(wù)需求,思想類似于MapReduce。任務(wù)的分割必須保證子任務(wù)獨(dú)立,不會相互依賴結(jié)果。
從哪里開始?
Fork/Join框架主要有如下接口和類:
- ForkJoinPool:一個線程池,用于執(zhí)行調(diào)度分割的任務(wù),實(shí)現(xiàn)了ExecutorService接口。提供三種執(zhí)行任務(wù)的方式:
1、execute:最原生的執(zhí)行方式,以異步執(zhí)行,并且無返回結(jié)果。
2、submit:異步執(zhí)行,有返回結(jié)果,返回結(jié)果是封裝后的Future對象。
3、invoke和invokeAll:異步執(zhí)行,有返回結(jié)果,會等待所有任務(wù)執(zhí)行執(zhí)行完成,返回的結(jié)果為無封裝的泛型T。
- ForkJoinTask:抽象的分割任務(wù),提供以分叉的方式執(zhí)行,以及合并執(zhí)行結(jié)果。
- RecursiveAction:異步任務(wù),無返回結(jié)果。通常自定義的任務(wù)要繼承,并重寫compute方法,任務(wù)執(zhí)行的就是compute方法。
- RecursiveTask:異步任務(wù),有返回結(jié)果。通常自定義的任務(wù)要繼承,并重寫compute方法,任務(wù)執(zhí)行的就是compute方法。
核心類圖
從核心類圖看出,要想開始一個分割的并行任務(wù),可以創(chuàng)建一個ForkJoinPool線程池,同時創(chuàng)建無返回結(jié)果的任務(wù)RecursiveAction或有返回結(jié)果的任務(wù)RecursiveTask,最后調(diào)用線程池ForkJoinPool的execute或submit或invoke方法執(zhí)行任務(wù),完成后合并結(jié)果。
實(shí)例
我們以一個有返回結(jié)果的并行任務(wù)實(shí)例進(jìn)行測試。計(jì)算從起始值到結(jié)束值得連續(xù)數(shù)的累加結(jié)果,利用Fork/Join框架。并對比普通計(jì)算和并行計(jì)算的耗時差異。
package com.misout.forkjoin;import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask;/** * 計(jì)算從起始值到結(jié)束值得連續(xù)數(shù)的累加結(jié)果,利用Fork/Join框架 * @author Misout * @date 2018-01-13 16:06:44 */ public class SumTask extends RecursiveTask<Long> { private static final long serialVersionUID = 4828818665955149519L; /** 每個任務(wù)最多允許計(jì)算的數(shù)字個數(shù)閾值,超過這個閾值,任務(wù)進(jìn)行拆分 */ private static final long THRESHOLD = 1000L; /** 起始值 */ private Long startNumber; /** 結(jié)束值 */ private Long endNumber; public SumTask(Long startNumber, Long endNumber) { this.startNumber = startNumber; this.endNumber = endNumber; } /** * 累加數(shù)的個數(shù)超過閾值1000個,拆分成2個子任務(wù)執(zhí)行。子任務(wù)繼續(xù)作拆分。計(jì)算完,合并結(jié)果。 */說明:SumTask繼承RecursiveTask,并實(shí)現(xiàn)了compute方法。在compute方法中會進(jìn)行任務(wù)分割,并繼續(xù)生成子任務(wù),子任務(wù)仍然以分割的方式運(yùn)行。
運(yùn)行結(jié)果對比:
fork/join : sum = 5997689267885, cost time = 290ms normal : sum = 5997689267885, cost time = 41ms注意事項(xiàng):任務(wù)拆分的深度最好不要太多,否則很容易因創(chuàng)建的線程過多影響系統(tǒng)性能。
work-stealing規(guī)則
在Java的API說明中提到,ForkJoinPool線程池與ThreadPoolExecutor線程池不同的地方在于,ForkJoinPool善于利用竊取工作執(zhí)行加快任務(wù)的總體執(zhí)行速度。實(shí)際上,在ForkJoinPool線程池中,若一個工作線程的任務(wù)隊(duì)列為空沒有任務(wù)執(zhí)行時,便從其他工作線程中獲取任務(wù)主動執(zhí)行。為了實(shí)現(xiàn)工作竊取,在工作線程中維護(hù)了雙端隊(duì)列,竊取任務(wù)線程從隊(duì)尾獲取任務(wù),被竊取任務(wù)線程從隊(duì)頭獲取任務(wù)。這種機(jī)制充分利用線程進(jìn)行并行計(jì)算,減少了線程競爭。但是當(dāng)隊(duì)列中只存在一個任務(wù)了時,兩個線程去取反而會造成資源浪費(fèi)。
ForkJoinPool工作竊取圖轉(zhuǎn)載于:https://www.cnblogs.com/Joy-Hu/p/10876628.html
總結(jié)
以上是生活随笔為你收集整理的Java并行任务框架Fork/Join的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 面向对象-抽象类
- 下一篇: 逐步加深的异步操作(上)