Java7任务并行执行神器:ForkJoin框架
轉載自?Java7任務并行執行神器:Fork&Join框架
Fork/Join是什么?
Fork/Join框架是Java7提供的并行執行任務框架,思想是將大任務分解成小任務,然后小任務又可以繼續分解,然后每個小任務分別計算出結果再合并起來,最后將匯總的結果作為大任務結果。其思想和MapReduce的思想非常類似。對于任務的分割,要求各個子任務之間相互獨立,能夠并行獨立地執行任務,互相之間不影響。
Fork/Join的運行流程圖如下:
我們可以通過Fork/Join單詞字面上的意思去理解這個框架。Fork是叉子分叉的意思,即將大任務分解成并行的小任務,Join是連接結合的意思,即將所有并行的小任務的執行結果匯總起來。
工作竊取算法
ForkJoin采用了工作竊取(work-stealing)算法,若一個工作線程的任務隊列為空沒有任務執行時,便從其他工作線程中獲取任務主動執行。為了實現工作竊取,在工作線程中維護了雙端隊列,竊取任務線程從隊尾獲取任務,被竊取任務線程從隊頭獲取任務。這種機制充分利用線程進行并行計算,減少了線程競爭。但是當隊列中只存在一個任務了時,兩個線程去取反而會造成資源浪費。
工作竊取的運行流程圖如下:
Fork/Join核心類
Fork/Join框架主要由子任務、任務調度兩部分組成,類層次圖如下。
- ForkJoinPool 
ForkJoinPool是ForkJoin框架中的任務調度器,和ThreadPoolExecutor一樣實現了自己的線程池,提供了三種調度子任務的方法:
execute:異步執行指定任務,無返回結果;
invoke、invokeAll:異步執行指定任務,等待完成才返回結果;
submit:異步執行指定任務,并立即返回一個Future對象;
- ForkJoinTask 
Fork/Join框架中的實際的執行任務類,有以下兩種實現,一般繼承這兩種實現類即可。
RecursiveAction:用于無結果返回的子任務;
RecursiveTask:用于有結果返回的子任務;
Fork/Join框架實戰
下面實現一個Fork/Join小例子,從1+2+...10億,每個任務只能處理1000個數相加,超過1000個的自動分解成小任務并行處理;并展示了通過不使用Fork/Join和使用時的時間損耗對比。
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; public class ForkJoinTask extends RecursiveTask<Long> { ? ?private static final long MAX = 1000000000L;private static final long THRESHOLD = 1000L;private long start;private long end; ? ?public ForkJoinTask(long start, long end) {this.start = start;this.end = end;} ? ?public static void main(String[] args) {test();System.out.println("--------------------");testForkJoin();} ? ?private static void test() {System.out.println("test");long start = System.currentTimeMillis();Long sum = 0L;for (long i = 0L; i <= MAX; i++) {sum += i;}System.out.println(sum);System.out.println(System.currentTimeMillis() - start + "ms");} ? ?private static void testForkJoin() {System.out.println("testForkJoin");long start = System.currentTimeMillis();ForkJoinPool forkJoinPool = new ForkJoinPool();Long sum = forkJoinPool.invoke(new ForkJoinTask(1, MAX));System.out.println(sum);System.out.println(System.currentTimeMillis() - start + "ms");} ? ?@Overrideprotected Long compute() {long sum = 0;if (end - start <= THRESHOLD) {for (long i = start; i <= end; i++) {sum += i;}return sum;} else {long mid = (start + end) / 2; ? ? ? ? ? ?ForkJoinTask task1 = new ForkJoinTask(start, mid);task1.fork(); ? ? ? ? ? ?ForkJoinTask task2 = new ForkJoinTask(mid + 1, end);task2.fork(); ? ? ? ? ? ?return task1.join() + task2.join();}} }這里需要計算結果,所以任務繼承的是RecursiveTask類。ForkJoinTask需要實現compute方法,在這個方法里首先需要判斷任務是否小于等于閾值1000,如果是就直接執行任務。否則分割成兩個子任務,每個子任務在調用fork方法時,又會進入compute方法,看看當前子任務是否需要繼續分割成孫任務,如果不需要繼續分割,則執行當前子任務并返回結果。使用join方法會阻塞并等待子任務執行完并得到其結果。
程序輸出:
test 500000000500000000 4992ms -------------------- testForkJoin 500000000500000000 508ms從結果看出,并行的時間損耗明顯要少于串行的,這就是并行任務的好處。
總結
以上是生活随笔為你收集整理的Java7任务并行执行神器:ForkJoin框架的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: Java对象引用四个级别(强、软、弱、虚
- 下一篇: @Controller,@Service
