Java中的Fork / Join框架的简要概述
Fork / Join框架是使用并發分治法解決問題的框架。 引入它們是為了補充現有的并發API。 在介紹它們之前,現有的ExecutorService實現是運行異步任務的流行選擇,但是當任務同質且獨立時,它們會發揮最佳作用。 運行依賴的任務并使用這些實現來組合其結果并不容易。 隨著Fork / Join框架的引入,人們試圖解決這一缺陷。 在本文中,我們將簡要介紹API,并解決幾個簡單的問題以了解其工作原理。
解決非阻塞任務
讓我們直接跳入代碼。 讓我們創建一個任務,該任務將返回List的所有元素的總和。 以下步驟以偽代碼表示我們的算法:
01.查找列表的中間索引
02.在中間劃分列表
03.遞歸創建一個新任務,該任務將計算剩余部分的總和
04.遞歸創建一個新任務,該任務將計算正確部分的總和
05.將左總和,中間元素和右總和的結果相加
這是代碼–
@Slf4j public class ListSummer extends RecursiveTask<Integer> {private final List<Integer> listToSum;ListSummer(List<Integer> listToSum) {this.listToSum = listToSum;}@Overrideprotected Integer compute() {if (listToSum.isEmpty()) {log.info("Found empty list, sum is 0");return 0;}int middleIndex = listToSum.size() / 2;log.info("List {}, middle Index: {}", listToSum, middleIndex);List<Integer> leftSublist = listToSum.subList(0, middleIndex);List<Integer> rightSublist = listToSum.subList(middleIndex + 1, listToSum.size());ListSummer leftSummer = new ListSummer(leftSublist);ListSummer rightSummer = new ListSummer(rightSublist);leftSummer.fork();rightSummer.fork();Integer leftSum = leftSummer.join();Integer rightSum = rightSummer.join();int total = leftSum + listToSum.get(middleIndex) + rightSum;log.info("Left sum is {}, right sum is {}, total is {}", leftSum, rightSum, total);return total;} }首先,我們擴展了ForkJoinTask的RecursiveTask子類型。 這是我們期望并發任務返回結果時的擴展類型。 當任務不返回結果而僅執行效果時,我們擴展RecursiveAction子類型。 對于我們解決的大多數實際任務,這兩個子類型就足夠了。
其次,RecursiveTask和RecursiveAction都定義了一種抽象計算方法。 這是我們進行計算的地方。
第三,在我們的計算方法內部,我們檢查通過構造函數傳遞的列表的大小。 如果為空,則我們已經知道總和的結果為零,然后我們立即返回。 否則,我們將列表分為兩個子列表,并創建ListSummer類型的兩個實例。 然后,我們在這兩個實例上調用fork()方法(在ForkJoinTask中定義)–
leftSummer.fork(); rightSummer.fork();導致將這些任務安排為異步執行的原因,稍后將在本文中解釋用于此目的的確切機制。
之后,我們調用join()方法(也在ForkJoinTask中定義)以等待這兩部分的結果
Integer leftSum = leftSummer.join(); Integer rightSum = rightSummer.join();然后將其與列表的中間元素相加以獲得最終結果。
添加了許多日志消息,以使示例更易于理解。 但是,當我們處理包含數千個條目的列表時,擁有詳細的日志記錄(尤其是記錄整個列表)可能不是一個好主意。
就是這樣。 現在為測試運行創建一個測試類–
public class ListSummerTest {@Testpublic void shouldSumEmptyList() {ListSummer summer = new ListSummer(List.of());ForkJoinPool forkJoinPool = new ForkJoinPool();forkJoinPool.submit(summer);int result = summer.join();assertThat(result).isZero();}@Testpublic void shouldSumListWithOneElement() {ListSummer summer = new ListSummer(List.of(5));ForkJoinPool forkJoinPool = new ForkJoinPool();forkJoinPool.submit(summer);int result = summer.join();assertThat(result).isEqualTo(5);}@Testpublic void shouldSumListWithMultipleElements() {ListSummer summer = new ListSummer(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9));ForkJoinPool forkJoinPool = new ForkJoinPool();forkJoinPool.submit(summer);int result = summer.join();assertThat(result).isEqualTo(45);} }在測試中,我們創建一個ForkJoinPool的實例。 ForkJoinPool是用于運行ForkJoinTasks的唯一ExecutorService實現。 它采用一種稱為工作竊取算法的特殊算法。 與其他ExecutorService實現相反,在該實現中,只有一個隊列包含要執行的所有任務,在工作竊取實現中,每個工作線程都獲得其工作隊列。 每個線程都從其隊列開始執行任務。
當我們檢測到ForkJoinTask可以分解為多個較小的子任務時,便將它們分解為較小的任務,然后在這些任務上調用fork()方法。 該調用導致子任務被推入執行線程的隊列中。 在執行期間,當一個線程用盡隊列/沒有要執行的任務時,它可以從其他線程的隊列中“竊取”任務(因此稱為“工作竊取”)。 與使用任何其他ExecutorService實現相比,這種竊取行為可以帶來更高的吞吐量。
之前,當我們在leftSummer和rightSummer任務實例上調用fork()時,它們被推入執行線程的工作隊列中,之后它們被池中的其他活動線程“偷”(依此類推),因為它們確實那時沒有其他事情要做。
很酷吧?
解決阻止任務
我們剛才解決的問題本質上是非阻塞的。 如果我們想解決一個阻塞操作的問題,那么為了獲得更好的吞吐量,我們將需要改變策略。
讓我們用另一個例子來研究一下。 假設我們要創建一個非常簡單的網絡搜尋器。 該搜尋器將接收HTTP鏈接列表,執行GET請求以獲取響應主體,然后計算響應長度。 這是代碼–
@Slf4j public class ResponseLengthCalculator extends RecursiveTask<Map<String, Integer>> {private final List<String> links;ResponseLengthCalculator(List<String> links) {this.links = links;}@Overrideprotected Map<String, Integer> compute() {if (links.isEmpty()) {log.info("No more links to fetch");return Collections.emptyMap();}int middle = links.size() / 2;log.info("Middle index: {}", links, middle);ResponseLengthCalculator leftPartition = new ResponseLengthCalculator(links.subList(0, middle));ResponseLengthCalculator rightPartition = new ResponseLengthCalculator(links.subList(middle + 1, links.size()));log.info("Forking left partition");leftPartition.fork();log.info("Left partition forked, now forking right partition");rightPartition.fork();log.info("Right partition forked");String middleLink = links.get(middle);HttpRequester httpRequester = new HttpRequester(middleLink);String response;try {log.info("Calling managedBlock for {}", middleLink);ForkJoinPool.managedBlock(httpRequester);response = httpRequester.response;} catch (InterruptedException ex) {log.error("Error occurred while trying to implement blocking link fetcher", ex);response = "";}Map<String, Integer> responseMap = new HashMap<>(links.size());Map<String, Integer> leftLinks = leftPartition.join();responseMap.putAll(leftLinks);responseMap.put(middleLink, response.length());Map<String, Integer> rightLinks = rightPartition.join();responseMap.putAll(rightLinks);log.info("Left map {}, middle length {}, right map {}", leftLinks, response.length(), rightLinks);return responseMap;}private static class HttpRequester implements ForkJoinPool.ManagedBlocker {private final String link;private String response;private HttpRequester(String link) {this.link = link;}@Overridepublic boolean block() {HttpGet headRequest = new HttpGet(link);CloseableHttpClient client = HttpClientBuilder.create().disableRedirectHandling().build();try {log.info("Executing blocking request for {}", link);CloseableHttpResponse response = client.execute(headRequest);log.info("HTTP request for link {} has been executed", link);this.response = EntityUtils.toString(response.getEntity());} catch (IOException e) {log.error("Error while trying to fetch response from link {}: {}", link, e.getMessage());this.response = "";}return true;}@Overridepublic boolean isReleasable() {return false;}} }我們創建ForkJoinPool.ManagedBlocker的實現,在其中放置阻塞的HTTP調用。 該接口定義了兩個方法– block()和isReleasable() 。 block()方法是我們進行阻塞調用的地方。 在完成阻塞操作之后,我們返回true,指示不再需要進一步的阻塞。 我們從isReleasable()實現中返回false,以向fork-join工作線程指示block()方法實現本質上可能在阻塞。 isReleasable()實現將在調用block()方法之前先由fork-join工作線程調用。 最后,我們通過調用ForkJoinPool.managedBlock()靜態方法將HttpRequester實例提交到池中。 之后,我們的阻止任務將開始執行。 當它阻塞HTTP請求時,如果有必要,ForkJoinPool.managedBlock()方法還將安排激活備用線程,以確保足夠的并行性。
那么,讓我們將此實現用于測試驅動! 這是代碼–
public class ResponseLengthCalculatorTest {@Testpublic void shouldReturnEmptyMapForEmptyList() {ResponseLengthCalculator responseLengthCalculator = new ResponseLengthCalculator(Collections.emptyList());ForkJoinPool pool = new ForkJoinPool();pool.submit(responseLengthCalculator);Map<String, Integer> result = responseLengthCalculator.join();assertThat(result).isEmpty();}@Testpublic void shouldHandle200Ok() {ResponseLengthCalculator responseLengthCalculator = new ResponseLengthCalculator(List.of("http://httpstat.us/200"));ForkJoinPool pool = new ForkJoinPool();pool.submit(responseLengthCalculator);Map<String, Integer> result = responseLengthCalculator.join();assertThat(result).hasSize(1).containsKeys("http://httpstat.us/200").containsValue(0);}@Testpublic void shouldFetchResponseForDifferentResponseStatus() {ResponseLengthCalculator responseLengthCalculator = new ResponseLengthCalculator(List.of("http://httpstat.us/200","http://httpstat.us/302","http://httpstat.us/404","http://httpstat.us/502"));ForkJoinPool pool = new ForkJoinPool();pool.submit(responseLengthCalculator);Map<String, Integer> result = responseLengthCalculator.join();assertThat(result).hasSize(4);} }今天就這樣,伙計們! 與往常一樣,任何反饋/改進建議/評論都將受到高度贊賞!
此處討論的所有示例都可以在Github上找到( 特定提交 )。
大呼大叫的http://httpstat.us服務,對于開發簡單的測試非常有幫助。
翻譯自: https://www.javacodegeeks.com/2019/01/brief-overview-fork-join-framework-java.html
總結
以上是生活随笔為你收集整理的Java中的Fork / Join框架的简要概述的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 安卓玩赛尔号(安卓赛尔号)
- 下一篇: java 鲜为人知的知识点_鲜为人知的J