叉叉框架_叉/连接框架
叉叉框架
本文是我們名為Java Concurrency Essentials的學院課程的一部分。
在本課程中,您將深入探討并發的魔力。 將向您介紹并發和并發代碼的基礎知識,并學習諸如原子性,同步和線程安全性的概念。 在這里查看 !
目錄
1.簡介 2.叉/連接1.簡介
本文介紹了Fork / Join框架,該框架從1.7版開始就是JDK的一部分。 它描述了框架的基本功能,并提供了一些示例以提供一些實踐經驗。
2.叉/連接
Fork / Join框架的基類是java.util.concurrent.ForkJoinPool 。 此類實現Executor和ExecutorService這兩個接口,并AbstractExecutorService 。 因此, ForkJoinPool基本上是一個線程池,它承擔特殊任務,即ForkJoinTask 。 此類實現已知的Future接口以及諸如get() , cancel()和isDone() 。 除此之外,該類還提供了兩個為整個框架命名的方法: fork()和join() 。
調用fork()將啟動任務的異步執行時,調用join()將等待直到任務完成并檢索其結果。 因此,我們可以將給定任務拆分為多個較小的任務,分叉每個任務,最后等待所有任務完成。 這使復雜問題的實現更加容易。
在計算機科學中,這種方法也稱為分治法。 每當一個問題太復雜而無法立即解決時,它就會分為多個較小的問題,并且更容易解決。 可以這樣寫成偽代碼:
if(problem.getSize() > THRESHOLD) {SmallerProblem smallerProblem1 = new SmallerProblem();smallerProblem1.fork();SmallerProblem smallerProblem2 = new SmallerProblem();smallerProblem2.fork();return problem.solve(smallerProblem1.join(), smallerProblem2.join()); } else {return problem.solve(); }首先,我們檢查問題的當前大小是否大于給定的閾值。 在這種情況下,我們將問題分成較小的問題,對每個新任務進行fork() ,然后通過調用join()等待結果。 當join()返回每個子任務的結果時,我們必須找到較小問題的最佳解決方案,并將其作為最佳解決方案返回。 重復這些步驟,直到給定的閾值太低并且問題很小,我們可以直接計算其解而無需進一步除法。
遞歸任務
為了更好地掌握此過程,我們實現了一種算法,該算法可在整數值數組中找到最小的數字。 這個問題不是您使用ForkJoinPool在日常工作中解決的問題,但是以下實現非常清楚地顯示了基本原理。 在main()方法中,我們設置了一個帶有隨機值的整數數組,并創建了一個新的ForkJoinPool 。
傳遞給其構造函數的第一個參數是所需并行度的指示器。 在這里,我們在Runtime查詢可用的CPU內核數。 然后,我們調用invoke()方法并傳遞FindMin的實例。 FindMin擴展了RecursiveTask類,該類本身是前面提到的ForkJoinTask的子類。 類ForkJoinTask實際上有兩個子類:一個子類用于返回值的任務( RecursiveTask ),另一個子類用于不返回值的任務( RecursiveAction )。 超類迫使我們實現compute() 。 在這里,我們看一下整數數組的給定切片,并確定當前問題是否太大而無法立即解決。
當在數組中找到最小的數時,要直接解決的最小問題大小是將兩個元素相互比較并返回它們的最小值。 如果當前有兩個以上的元素,則將數組分為兩部分,然后再在這兩個部分中找到最小的數字。 通過創建兩個新的FindMin實例來完成此操作。
構造函數被提供給數組以及開始和結束索引。 然后,我們通過調用fork()異步開始執行這兩個任務。 該調用將兩個任務提交到線程池的隊列中。 線程池實現了一種稱為工作竊取的策略,即,如果所有其他線程都有足夠的工作要做,則當前線程會從其他任務之一中竊取其工作。 這樣可以確保任務盡快執行。
public class FindMin extends RecursiveTask<Integer> {private static final long serialVersionUID = 1L;private int[] numbers;private int startIndex;private int endIndex;public FindMin(int[] numbers, int startIndex, int endIndex) {this.numbers = numbers;this.startIndex = startIndex;this.endIndex = endIndex;}@Overrideprotected Integer compute() {int sliceLength = (endIndex - startIndex) + 1;if (sliceLength > 2) {FindMin lowerFindMin = new FindMin(numbers, startIndex, startIndex + (sliceLength / 2) - 1);lowerFindMin.fork();FindMin upperFindMin = new FindMin(numbers, startIndex + (sliceLength / 2), endIndex);upperFindMin.fork();return Math.min(lowerFindMin.join(), upperFindMin.join());} else {return Math.min(numbers[startIndex], numbers[endIndex]);}}public static void main(String[] args) {int[] numbers = new int[100];Random random = new Random(System.currentTimeMillis());for (int i = 0; i < numbers.length; i++) {numbers[i] = random.nextInt(100);}ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());Integer min = pool.invoke(new FindMin(numbers, 0, numbers.length - 1));System.out.println(min);} }遞歸動作
正如上面在RecursiveTask旁邊提到的,我們還有RecursiveAction類。 與RecursiveTask相比,它不必返回值,因此可以將其用于可以直接在給定數據結構上執行的異步計算。 這樣的例子是從彩色圖像中計算出灰度圖像。 我們要做的就是遍歷圖像的每個像素,并使用以下公式從RGB值中計算灰度值:
gray = 0.2126 * red + 0.7152 * green + 0.0722 * blue浮點數表示特定顏色對我們人類對灰色的感知做出的貢獻。 由于最高值用于綠色,因此可以得出結論,灰度圖像僅被計算為綠色部分的近3/4。 因此,假設圖像是代表實際像素數據的對象,并且使用setRGB()和getRGB()方法檢索實際RGB值,則基本實現將如下所示:
for (int row = 0; row < height; row++) {for (int column = 0; column < bufferedImage.getWidth(); column++) {int grayscale = computeGrayscale(image.getRGB(column, row));image.setRGB(column, row, grayscale);} }上面的實現在單個CPU機器上運行良好。 但是,如果我們有多個CPU可用,我們可能希望將此工作分配給可用的內核。 因此,我們可以使用ForkJoinPool并為圖像的每一行(或每一列)提交一個新任務,而不是遍歷所有像素的兩個嵌套for循環。 一旦將一行轉換為灰度,當前線程就可以在另一行上工作。
以下示例實現了此原理:
public class GrayscaleImageAction extends RecursiveAction {private static final long serialVersionUID = 1L;private int row;private BufferedImage bufferedImage;public GrayscaleImageAction(int row, BufferedImage bufferedImage) {this.row = row;this.bufferedImage = bufferedImage;}@Overrideprotected void compute() {for (int column = 0; column < bufferedImage.getWidth(); column++) {int rgb = bufferedImage.getRGB(column, row);int r = (rgb >> 16) & 0xFF;int g = (rgb >> 8) & 0xFF;int b = (rgb & 0xFF);int gray = (int) (0.2126 * (float) r + 0.7152 * (float) g + 0.0722 * (float) b);gray = (gray << 16) + (gray << 8) + gray;bufferedImage.setRGB(column, row, gray);}}public static void main(String[] args) throws IOException {ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());BufferedImage bufferedImage = ImageIO.read(new File(args[0]));for (int row = 0; row < bufferedImage.getHeight(); row++) {GrayscaleImageAction action = new GrayscaleImageAction(row, bufferedImage);pool.execute(action);}pool.shutdown();ImageIO.write(bufferedImage, "jpg", new File(args[1]));} }在main()方法中,我們使用Java的ImageIO類讀取圖像。 返回的BufferedImage實例具有我們需要的所有方法。 我們可以查詢行數和列數,并檢索和設置每個像素的RGB值。 因此,我們要做的是遍歷所有行并將新的GrayscaleImageAction提交到我們的ForkJoinPool 。 后者已收到有關可用處理器的提示,作為其構造函數的參數。
現在, ForkJoinPool通過調用其compute()方法來異步啟動任務。 在此方法中,我們遍歷每行并通過其灰度值更新相應的RGB值。 將所有任務提交給池后,我們在主線程中等待整個池的關閉,然后使用ImageIO.write()方法將更新的BufferedImage寫回到磁盤。
令人驚訝的是,與不使用可用處理器的情況相比,我們只需要多幾行代碼即可。 這再次顯示了使用java.util.concurrent包的可用資源可以節省多少工作。
ForkJoinPool提供了三種不同的提交任務的方法:
- execute(ForkJoinTask) :此方法異步執行給定的任務。 它沒有返回值。
- invoke(ForkJoinTask) :此方法等待任務返回值。
- submit(ForkJoinTask) :此方法異步執行給定的任務。 它返回對任務本身的引用。 因此,任務引用可用于查詢結果(因為它實現了Future接口)。
有了這些知識,很清楚為什么我們要使用execute()方法提交上述GrayscaleImageAction 。 如果我們改為使用invoke() ,則主線程將等待任務完成,而我們將不會利用可用的并行度。
仔細研究ForkJoinTask-API,我們會發現相同的區別:
- ForkJoinTask.fork() : ForkJoinTask是異步執行的。 它沒有返回值。
- ForkJoinTask.invoke() :立即執行ForkJoinTask并在完成后返回結果。
ForkJoinPool和ExecutorService
既然我們知道ExecutorService和ForkJoinPool ,您可能會問自己為什么我們應該使用ForkJoinPool而不是ExecutorService 。 兩者之間的差異不是很大。 兩者都具有execute()和submit()方法,并采用一些常見接口的實例,例如Runnable , Callable , RecursiveAction或RecursiveTask 。
為了更好地理解這些區別,讓我們嘗試使用ExecutorService從上面實現FindMin類:
public class FindMinTask implements Callable<Integer> {private int[] numbers;private int startIndex;private int endIndex;private ExecutorService executorService;public FindMinTask(ExecutorService executorService, int[] numbers, int startIndex, int endIndex) {this.executorService = executorService;this.numbers = numbers;this.startIndex = startIndex;this.endIndex = endIndex;}public Integer call() throws Exception {int sliceLength = (endIndex - startIndex) + 1;if (sliceLength > 2) {FindMinTask lowerFindMin = new FindMinTask(executorService, numbers, startIndex, startIndex + (sliceLength / 2) - 1);Future<Integer> futureLowerFindMin = executorService.submit(lowerFindMin);FindMinTask upperFindMin = new FindMinTask(executorService, numbers, startIndex + (sliceLength / 2), endIndex);Future<Integer> futureUpperFindMin = executorService.submit(upperFindMin);return Math.min(futureLowerFindMin.get(), futureUpperFindMin.get());} else {return Math.min(numbers[startIndex], numbers[endIndex]);}}public static void main(String[] args) throws InterruptedException, ExecutionException {int[] numbers = new int[100];Random random = new Random(System.currentTimeMillis());for (int i = 0; i < numbers.length; i++) {numbers[i] = random.nextInt(100);}ExecutorService executorService = Executors.newFixedThreadPool(64);Future<Integer> futureResult = executorService.submit(new FindMinTask(executorService, numbers, 0, numbers.length-1));System.out.println(futureResult.get());executorService.shutdown();} }該代碼看起來非常相似,期望我們submit()任務submit()給ExecutorService ,然后使用返回的Future實例來wait()結果。 兩種實現之間的主要區別可以在構造線程池的那一點上找到。 在上面的示例中,我們創建了一個具有64(!)個線程的固定線程池。 為什么選擇這么大的數字? 原因是,對每個返回的Future調用get()阻塞當前線程,直到結果可用為止。 如果我們僅向可用池提供盡可能多的線程,則程序將耗盡資源并無限期地掛起。
ForkJoinPool實現了已經提到的工作竊取策略,即每次運行線程必須等待某些結果時; 該線程從工作隊列中刪除當前任務,并執行其他準備運行的任務。 這樣,當前線程不會被阻塞,并且可以用來執行其他任務。 一旦計算出最初暫停的任務的結果,該任務就會再次執行,join()方法將返回結果。 這與普通的ExecutorService有一個重要的區別,在常規ExecutorService ,您必須在等待結果時阻止當前線程。
翻譯自: https://www.javacodegeeks.com/2015/09/forkjoin-framework.html
叉叉框架
總結
以上是生活随笔為你收集整理的叉叉框架_叉/连接框架的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 雷蛇萨诺狼蛛和达尔优ek815哪个好
- 下一篇: 奇幻水晶缘快捷键(奇幻水晶缘攻略)