Java并发编程(07):Fork/Join框架机制详解
本文源碼:GitHub·點這里 || GitEE·點這里
一、Fork/Join框架
Java提供Fork/Join框架用于并行執行任務,核心的思想就是將一個大任務切分成多個小任務,然后匯總每個小任務的執行結果得到這個大任務的最終結果。
這種機制策略在分布式數據庫中非常常見,數據分布在不同的數據庫的副本中,在執行查詢時,每個服務都要跑查詢任務,最后在一個服務上做數據合并,或者提供一個中間引擎層,用來匯總數據:
核心流程:切分任務,模塊任務異步執行,單任務結果合并;在編程里面,通用的代碼不多,但是通用的思想卻隨處可見。
二、核心API和方法
1、編碼案例
基于1+2…+100的計算案例演示Fork/Join框架基礎用法。
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask;public class ForkJoin01 {public static void main (String[] args) {int[] numArr = new int[100];for (int i = 0; i < 100; i++) {numArr[i] = i + 1;}ForkJoinPool pool = new ForkJoinPool();ForkJoinTask<Integer> forkJoinTask =pool.submit(new SumTask(numArr, 0, numArr.length));System.out.println("合并計算結果: " + forkJoinTask.invoke());pool.shutdown();} } /*** 線程任務*/ class SumTask extends RecursiveTask<Integer> {/** 切分任務塊的閾值* 如果THRESHOLD=100* 輸出:main【求和:(0...100)=5050】 合并計算結果: 5050*/private static final int THRESHOLD = 100;private int arr[];private int start;private int over;public SumTask(int[] arr, int start, int over) {this.arr = arr;this.start = start;this.over = over;}// 求和計算private Integer sumCalculate () {Integer sum = 0;for (int i = start; i < over; i++) {sum += arr[i];}String task = "【求和:(" + start + "..." + over + ")=" + sum +"】";System.out.println(Thread.currentThread().getName() + task);return sum ;}@Overrideprotected Integer compute() {if ((over - start) <= THRESHOLD) {return sumCalculate();}else {int middle = (start + over) / 2;SumTask left = new SumTask(arr, start, middle);SumTask right = new SumTask(arr, middle, over);left.fork();right.fork();return left.join() + right.join();}} }2、核心API說明
ForkJoinPool:線程池最大的特點就是分叉(fork)合并(join)模式,將一個大任務拆分成多個小任務,并行執行,再結合工作竊取算法提高整體的執行效率,充分利用CPU資源。
ForkJoinTask:運行在ForkJoinPool的一個任務抽象,可以理解為類線程但是比線程輕量的實體,在ForkJoinPool中運行的少量ForkJoinWorkerThread可以持有大量的ForkJoinTask和它的子任務,同時也是一個輕量的Future,使用時應避免較長阻塞或IO。
繼承子類:
- RecursiveAction:遞歸無返回值的ForkJoinTask子類;
- RecursiveTask:遞歸有返回值的ForkJoinTask子類;
核心方法:
- fork():在當前線程運行的線程池中創建一個子任務;
- join():模塊子任務完成的時候返回任務結果;
- invoke():執行任務,也可以實時等待最終執行結果;
3、核心策略說明
任務拆分
ForkJoinPool基于分治算法,將大任務不斷拆分下去,每個子任務再拆分一半,直到達到最閾值設定的任務粒度為止,并且把任務放到不同的隊列里面,然后從最底層的任務開始執行計算,并且往上一層合并結果,這樣用相對少的線程處理大量的任務。
工作竊取算法
大任務被分割為獨立的子任務,并且子任務分別放到不同的隊列里,并為每個隊列創建一個線程來執行隊列里的任務,假設線程A優先把分配到自己隊列里的任務執行完畢,此時如果線程E對應的隊列里還有任務等待執行,空閑的線程A會竊取線程E隊列里任務執行,并且為了減少竊取任務時線程A和被竊取任務線程E之間的發生競爭,竊取任務的線程A會從隊列的尾部獲取任務執行,被竊取任務線程E會從隊列的頭部獲取任務執行。
工作竊取算法的優點:線程間的競爭很少,充分利用線程進行并行計算,但是在任務隊列里只有一個任務時,也可能會存在競爭情況。
三、應用案例分析
在后端系統的業務開發中,可用做權限校驗,批量定時任務狀態刷新等各種功能場景:
如上圖,假設數據的主鍵id分段如下,數據場景可能是數據源的連接信息,或者產品有效期類似業務,都可以基于線程池任務處理:
權限校驗
基于數據源的連接信息,判斷數據源是否可用,例如:判斷連接是否可用,用戶是否有庫表的讀寫權限,在數據源多的情況下,基于線程池快速校驗。
狀態刷新
在定時任務中,經常見到狀態類的刷新操作,例如判斷產品是否在有效期范圍內,在有效期范圍之外,把數據置為失效狀態,都可以利用線程池快速處理。
四、源代碼地址
GitHub·地址 https://github.com/cicadasmile/java-base-parent GitEE·地址 https://gitee.com/cicadasmile/java-base-parent推薦閱讀:Java并發系列
| 01 | Java并發:線程的創建方式,狀態周期管理 |
| 02 | Java并發:線程核心機制,基礎概念擴展 |
| 03 | Java并發:多線程并發訪問,同步控制 |
| 04 | Java并發:線程間通信,等待/通知機制 |
| 05 | Java并發:悲觀鎖和樂觀鎖機制 |
| 06 | Java并發:Lock機制下API用法詳解 |
總結
以上是生活随笔為你收集整理的Java并发编程(07):Fork/Join框架机制详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: windows2003的一些设置之一
- 下一篇: DHCP数据恢复