谈谈 ForkJoin 框架的设计与实现
在了解Fork-Join之前,我們得先了解什么是并行計算。
并行計算
相對于串行計算,并行計算可以劃分成時間并行和空間并行。時間并行即指令流水化,也就是流水線技術。比如說生產一輛小汽車,有特定的輪子車間/發動機車間,同時進行各自的生產。空間并行是指使用多個處理器執行并發計算。
以程序和算法設計人員的角度看,并行計算又可分為數據并行和任務并行。數據并行把大的任務化解成若干個相同的子任務,任務并行是指每一個線程執行一個分配到的任務,而這些線程則被分配(通常是操作系統內核)到該并行計算體系的各個計算節點中去。
簡單來說,并行計算是通過把大問題劃分為小問題,運用計算機資源并行的處理子問題,當需要得到大問題的結果時,將小問題的結果按順序合并起來得到最終結果。這種思想就是分治思想,小到歸并排序,大到大數據計算...
?
Fork-Join
Fork-Join框架是Doug Lea 大神在JDK7引入的。Fork就是把大問題拆分成小問題,也就是大任務拆成多個子任務,并行執行子任務。Join就是把任務的結果按順序合并起來。
假設我們需要求從 1-1億之間的數字和,按照Fork-Join的思想,可分為以下三步:
Step1.定義拆分子任務和合并子任務的規則
-
劃分子任務的規則
首先將任務拆為 1-5千萬 和 5千萬01 - 1億兩個子任務,直到每個子任務計算的數字范圍在1萬以內的時候,我們才計算這個任務的和。
-
合并子任務的規則
同一父任務的所有子任務的結果再相加,就是這一父任務的結果。
Step2.充分利用計算機資源,最大并行的執行子任務
Step3.充分利用計算機資源,執行合并所有子任務,獲得最終的結果
顯然一般人做不了后兩步,我們只需要把?怎么拆,怎么和?告訴Fork-Join框架,Fork-Join框架就幫我們做好?如何最大并行執行子任務?和?如何最有效合并子任務。
設計原理
如何充分利用計算機資源,最大并行執行子任務?
一般小伙伴應該可以想到使用多線程,讓線程數等于CPU核數。此時可以充分利用CPU的計算資源。
我們來看一下JDK普通線程池是咋玩的。(不要說你不懂為啥池化 :)
任務都是丟到一個同步隊列BlockingQueue中的。如果你了解JDK BlockingQueue的實現,就知道有界的同步隊列都是用鎖阻塞的,有些push/poll操作還共用一把鎖。
問題1:并行的任務有必要共用一個阻塞隊列嗎?
問題2: 如果任務隊列中的任務存在依賴,worker線程只能被阻塞著。啥意思呢?
假設任務隊列中存在兩個任務task1和task2,task1的執行結果依賴于task2的結果。如果worker1先拉取到task1,結果發現此時task2還沒有被執行。則worker1只能阻塞等待別的worker拉取到task2,task2執行完了worker1才能繼續執行task1。
如果worker1當發現task1無法繼續執行下去時,能夠先把它放一邊,繼續拉取任務執行。這樣效率是比較高的。
Work?Stealing
Fork-Join框架通過Work?Stealing算法解決上面兩個問題。
-
每個線程擁有自己的任務隊列,并且是雙端隊列。
-
線程操作自己的任務隊列是LIFO(Last in First out)模式。
-
線程還可以偷取別的線程任務隊列中的任務,模式為FIFO(First in First out)。
顯然?每個線程擁有自己的任務隊列可以提高獲取隊列的并行度。
雙端任務隊列將所屬的自己線程的push/pop操作?和?其他線程的steal操作通過不同的模式區分開。這樣只有當Base==Top-1時,pop操作和steal操作才會有沖突。
如何才能準確及時知道Base==Top-1呢,Fork-Join框架的牛逼之處也在于對任務的調度是輕量級的。
steal操作
考慮steal操作,是多個其他線程的同步操作。需要保證:偷到Base處的任務和Base++的原子性,同時Base的值一旦改變,其他線程應該能夠馬上可見。聰明的小伙伴是不是想到?鎖和volatile?了:)
//steal操作 就是 poll()方法 final ForkJoinTask<?> poll() {ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t; //array就是雙端隊列,實際用數組實現。 //base是將要偷的任務下標,base是用volatile修飾的,保證可見性 //top是將要push進去的任務下標,可參考上面示意圖while ((b = base) - top < 0 && (a = array) != null) {//說明經過while條件初步判斷任務隊列不為空 //獲取base處的任務在任務隊列中的偏移量int j = (((a.length - 1) & b) << ASHIFT) + ABASE; //用volatile load 語義取出base處的任務t,可以簡單理解為一定是最新修改版本的任務t = (ForkJoinTask<?>)U.getObjectVolatile(a, j); //再次讀取base,判斷此時t是否被別的線程偷走if (base == b) {if (t != null) { //如果多次讀判斷都沒啥問題,CAS修改base處的任務t為nullif (U.compareAndSwapObject(a, j, t, null)) { //如果上面修改成功,表示這個任務被該線程偷到了 //此時就將base指針向前移一位,注意這一步是原子操作,base++就不是了base = b + 1;return t;}}else if (b + 1 == top) // 如果t==null && b + 1 == top,此時任務隊列為空break;}}return null;}簡單來說,有任務可偷時,通過CAS偷任務保證只有一個線程能偷成功,偷成功的這個線程接著修改volatile base指針,使得馬上對其他線程可見。同時通過前面的多次讀判斷減少后期CAS并發的沖突概率。沒任務可偷時,通過CAS偷任務失敗可以判斷出來。
請小伙伴一句句看上面的代碼,阿姨都注釋出來了。雖然上面并沒有鎖,,但是小伙伴想想鎖其實是悲觀控制并發的思想,是不是可以拆成多次讀判斷 + CAS原子修改的樂觀思想來控制并發。只要最終保證只有一個能修改成功就可以了。
push操作
考慮push操作,是任務隊列所屬的線程才能操作,天生線程安全:不需要通過CAS或鎖來保證同步,只需要原子的修改top處任務?和?top向前移一位?就可以了。同理,top也不需要用volatile修飾。
final void push(ForkJoinTask<?> task) {ForkJoinTask<?>[] a; ForkJoinPool p;int b = base, s = top, n;if ((a = array) != null) { // ignore if queue removedint m = a.length - 1; // fenced write for task visibility //更新雙端隊列array的top處任務為task,直接原子更新,非CAS操作 //因為這個方法只會被array所屬的線程調用,所以這里是線程安全的U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); //top指針向前移一位U.putOrderedInt(this, QTOP, s + 1);if ((n = s - b) <= 1) { //說明未push前隊列中最多有一個任務if ((p = pool) != null) //此時喚醒其他等待的線程,表示整體pool中有事情可以做了。。p.signalWork(p.workQueues, this);}else if (n >= m) //隊列擴容growArray();}}小伙伴思考下,這里Base和Top指針會存在任務沖突嗎?其實不會哦,因為兩個指針都在往前沖,Base永遠追趕不上Top。這個方法額外需要做的事情?是?喚醒空閑線程?表示有任務進來了, 判斷隊列是否需要擴容就好。
pop操作
考慮pop操作,雖然任務隊列所屬的線程才能操作,但是當任務隊列只有一個任務時,存在steal操作和pop操作的任務競爭。原理就和steal操作一致了,當CAS修改top-1處任務為空 成功時,再更新top值為top-1。
final ForkJoinTask<?> pop() {ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m;if ((a = array) != null && (m = a.length - 1) >= 0) {for (int s; (s = top - 1) - base >= 0;) {long j = ((m & s) << ASHIFT) + ABASE;if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)break;if (U.compareAndSwapObject(a, j, t, null)) {U.putOrderedInt(this, QTOP, s);return t;}}}return null;}注意這個pop操作并沒有steal操作那么多次預讀避免并發競爭,小姐姐yy是因為pop操作只有在任務隊列中只有一個任務時,才會存在和Steal操作的競爭問題。而Steal操作也時時可能存在多個其他線程的競爭問題的。
通過上面三個任務調度方法的分析,你有沒有感受到一絲絲FJ的調度輕量級呢?
總結一下:Fork-Join框架通過將共享的任務隊列拆分成線程獨有的雙端任務隊列,多線程steal操作通過多次讀和CAS保證同步,steal操作和pop操作??通過CAS?保證同步,push操作線程安全,不需要同步。
問題是什么時候線程消費自己的任務隊列中的任務,什么時候會去偷別的線程的任務,一個任務在Fork-Join框架中的生命周期是怎樣的,又是怎么流轉的?
?
Fork-Join框架使用
要能回答上面的問題,我們先看一下如何使用Fork-Join框架。上面這三個方法并不是我們能直接調用的,這三個方法是Fork-Join自己在合適的時機自己調用的。像最開始所說,使用者只需要:定義好拆分子任務和合并子任務的規則的大任務,并且把任務丟給ForkJoinPool就好
求 1-1億之間的數字和
Step1.定義一個求和的任務類
繼承RecursiveTask類,重寫其compute()方法:
RecursiveTask如其名,是一個歸并任務。compute()方法是具體如何拆分,如何歸并的實現。fork()方法就是在確定拆分子任務規則時調用的,該方法會把子任務push到當前線程自己的任務隊列中;join()方法就是在確定合并子任務的規則時調用的,該方法會等待直到返回子任務的結果。
public class SumTask extends RecursiveTask<Long> {private long[] numbers;private int from;private int to;public SumTask(long[] numbers, int from, int to) {this.numbers = numbers;this.from = from;this.to = to;}@Overrideprotected Long compute() {//拆分子任務的規則:// 1.當需要計算的數字小于6時,直接計算結果if (to - from < 6) {long total = 0;for (int i = from; i <= to; i++) {total += numbers[i];}return total;// 2.否則,把任務一分為二,遞歸計算} else {int middle = (from + to) / 2;//構造子任務SumTask taskLeft = new SumTask(numbers, from, middle);SumTask taskRight = new SumTask(numbers, middle+1, to);//將子任務添加到任務隊列,這一步我們還是要做了taskLeft.fork();taskRight.fork();//合并所有子任務的規則:所有子任務的結果相加return taskLeft.join() + taskRight.join();}}}在等待子任務結果的時候,線程被阻塞了嗎?
(當然沒有,這段時間其實就會偷任務來做。后面我們再分析:)
Step2.構造一個Fork-Join線程池,把上面的求和大任務SumTask丟進去
public static void main(String[] args) {ForkJoinPool pool = new ForkJoinPool();SumTask sumTask = new SumTask(numbers, 0, numbers.length-1)long result = pool.invoke(sumTask);System.out.println(result); }從這里,我們可以看到任務丟進線程池是調用的pool.invoke(sumTask)
( 熟悉JDK線程池實現的小伙伴可以結合上面ForkJoin框架的原理想想任務該如何流轉。小姐姐開始了:)
一個歸并任務的流轉
Step1.任務提交到任務隊列
包括invoke等所有任務提交方法最終都會調用ForkJoinPool.externalPush方法。
這里面需要考慮將任務提交到哪個隊列?
如果提交到ForkJoinWorkerThread自己的雙端任務隊列中:不管提交到頭還是尾,都會和我們上面分析的三個操作發生任務沖突。而且如何選擇負載最小的線程來提交也會增加問題復雜性。
ForkJoinPool中雙端任務隊列是用數組(volatile WorkQueue[] workQueues)實現的,其中奇數下標存放的是可激活的任務隊列,偶數下標存放的是不可激活的任務隊列。激活指的是這個隊列是否是某個ForkJoin線程的任務隊列。
ForkJoinPool.externalPush只能將任務提交到不可激活任務隊列,該方法的主要邏輯為:
當提交的任務是pool的第一個任務時,會初始化workQueues,ForkJoinWorkerThread等資源,通過hash算法選擇一個偶數下標的workQueue,在TOP處放入任務。同時喚醒ForkJoinWorkerThread開始拉取任務工作。
當提交的任務不是第一個任務,此時workQueues等資源已初始化好。同樣需要選擇一個偶數下標的workQueue存放任務,如果選中的workQueue只有這一個任務,說明之前線程資源大概率是閑置的狀態,會嘗試?喚醒(signalWork方法)?一個空閑的ForkJoinWorkerThread開始拉取任務工作。
Step2.ForkJoinWorkerThread的運行
我們先看一下任務的生產和消費模式:
可激活的workQueue自己所屬ForkJoinWorkerThread的任務模式是LIFO(Last In First Out)
不可激活的workQueue的任務模式是FIFO(First In First Out)
ForkJoinWorkerThread剛開始運行時會調用ForkJoinWorkerThread.scan方法隨機選取一個隊列從Base處撈取任務.撈取到任務會調用WorkQueue.runTask方法執行任務,最終對于RecursiveTask任務執行的是RecursiveTask.exec方法。
protected final boolean exec() { //我們一開始定義SumTask的實現方法:computeresult = compute();return true;}里面調用的就是我們一開始定義SumTask的實現方法:compute方法。
fork所做的事情就是將我們切分的子任務添加到當前ForkJoinWorkerThread自己的workQueue中
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)((ForkJoinWorkerThread)t).workQueue.push(this);join所做的事情就是等待子任務的返回結果
public final V join() {int s;//doJoin會返回執行結果if ((s = doJoin() & DONE_MASK) != NORMAL)//結果有異常,拋出異常信息reportException(s);//結果無異常,返回正常結果return getRawResult();}講原理的時候我們提到了當調用join獲取任務結果時,ForkJoinWorkerThread會根據當前任務的情況,做出最正確的執行判斷,而不是單純的阻塞等待結果。
Step3.join時執行任務的判斷
結合上面求和的例子,我們來看一下求1-10之間的數字和的求和任務的可能join過程:
case1:任務未被偷
假設求和?1-10任務被Thread1執行,fork出兩個子任務:1-5?和?6-10,只要Thread1能判斷出來要join的任務在自己的任務隊列中,那當前join哪個子任務就把它取出來執行就可以。
case2:任務被偷,此時自己的任務隊列為空,可以幫助小偷執行它未完成的任務
假設求和?1-10任務被Thread1執行,fork出兩個子任務:1-5?和?6-10。6-10已成功執行完成,join返回了結果。但此時發現1-5被Thread2偷走了,自己的任務隊列中已經沒有任務可以執行了。此時Thread1可以找到小偷Thread2,并偷取Thread2的10-20任務來幫助它執行。
case3:任務被偷,此時自己的任務隊列不為空
假設求和?1-10任務被Thread1執行,fork出兩個子任務:1-5?和?6-10,要join?1-5時發現已經被Thread2偷走了,而自己隊列中還有6-10等待join執行。不好意思幫不了小偷了。
只好嘗試掛起自己等待1-5的執行結果通知,并嘗試喚醒空閑線程或者創建新的線程替代自己執行任務隊列中的6-10任務。
上述三種情況代碼均在ForkJoinPool.awaitJoin方法中。整體思路是:
當任務還在自己的隊列:
-
自己執行,獲取結果。
當被別人偷走阻塞了:
-
自己又沒任務執行,就幫助小偷執行任務。
-
自己有任務要執行,就嘗試掛起自己等待小偷的反饋結果,同時找隊友幫助自己執行。
這里任務模式有意思的是:
scan/steal操作都是從Base處獲取任務,那么更容易獲取到大的任務執行,從而使得整體線程的資源分配更加均衡。
任務隊列所屬的線程是LIFO的任務生產消費模式,剛好符合遞歸任務的執行順序。
?
至此你有沒有對ForkJoin框架的輕量級調度和Work?Stealing算法有一些了解呀:)
參考資料:
[1].http://gee.cs.oswego.edu/dl/papers/fj.pdf
[2].https://juejin.im/entry/5a027e2bf265da43247fdef7
[3].https://www.jianshu.com/p/f777abb7b251
[4].http://blog.dyngr.com/blog/2016/09/15/java-forkjoinpool-internals/
[5].https://zhuanlan.zhihu.com/p/38204373
[6].https://zhuanlan.zhihu.com/p/68554017
[7].https://zh.wikipedia.org/wiki/%E5%B9%B6%E8%A1%8C%E8%AE%A1%E7%AE%97
總結
以上是生活随笔為你收集整理的谈谈 ForkJoin 框架的设计与实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何选择 Git 分支模式?
- 下一篇: 厉害了,如何通过双 key 来解决缓存并