为什么非阻塞io性能更好_提高性能:流的非阻塞处理
為什么非阻塞io性能更好
1.簡介
想象一下,我們有一個需要訪問外部Web服務的應用程序,以便收集有關客戶端的信息,然后對其進行處理。 更具體地說,我們無法在一次調用中獲得所有這些信息。 如果我們要查找不同的客戶端,則需要多次調用。
如下圖所示,該示例應用程序將檢索有關多個客戶的信息,將它們分組在一個列表中,然后對其進行處理以計算其購買總額:
在這篇文章中,我們將看到收集信息的不同方法,并且從性能方面來看,哪一種是最好的。
這是與Java相關的文章。 但是,我們將使用Spring框架來調用RESTful Web服務。
欄目:
可以在Java 8 GitHub存儲庫中找到源代碼。
此外,您可以訪問此存儲庫中公開RESTful Web服務的Web應用程序的源代碼。
2.解釋示例
在我們的應用程序中,我們有20個ID的列表,這些ID表示我們要從Web服務檢索的客戶端。 檢索完所有客戶之后,我們將查看每個客戶購買了什么,并對它們進行匯總以計算出所有客戶花費的總金額。
但是,有一個問題,該Web服務每次調用僅允許檢索一個客戶端,因此我們將需要調用該服務20次。 此外,Web服務有點慢,至少需要兩秒鐘才能響應請求。
如果我們看一下實現Web服務的應用程序,我們可以看到調用是由ClientController類處理的:
@RestController @RequestMapping(value="/clients") public class ClientController {@Autowiredprivate ClientService service;@RequestMapping(value="/{clientId}", method = RequestMethod.GET)public @ResponseBody Client getClientWithDelay(@PathVariable String clientId) throws InterruptedException {Thread.sleep(2000);Client client = service.getClient(clientId);System.out.println("Returning client " + client.getId());return client;} }Thread.sleep用于模擬響應速度慢。
域類(客戶)包含我們需要的信息; 客戶花了多少錢:
public class Client implements Serializable {private static final long serialVersionUID = -6358742378177948329L;private String id;private double purchases;public Client() {}public Client(String id, double purchases) {this.id = id;this.purchases = purchases;}//Getters and setters }3.首次嘗試:順序流
在第一個示例中,我們將順序調用服務以獲取所有二十個客戶端的信息:
public class SequentialStreamProcessing {private final ServiceInvoker serviceInvoker;public SequentialStreamProcessing() {this.serviceInvoker = new ServiceInvoker();}public static void main(String[] args) {new SequentialStreamProcessing().start();}private void start() {List<String> ids = Arrays.asList("C01", "C02", "C03", "C04", "C05", "C06", "C07", "C08", "C09", "C10", "C11", "C12", "C13", "C14", "C15", "C16", "C17", "C18", "C19", "C20");long startTime = System.nanoTime();double totalPurchases = ids.stream().map(id -> serviceInvoker.invoke(id)).collect(summingDouble(Client::getPurchases));long endTime = (System.nanoTime() - startTime) / 1_000_000;System.out.println("Sequential | Total time: " + endTime + " ms");System.out.println("Total purchases: " + totalPurchases);} }輸出:
Sequential | Total time: 42284 ms Total purchases: 20.0該程序的執行大約需要42秒。 這是太多時間。 讓我們看看是否可以改善其性能。
4.提高性能:并行流
Java 8允許我們將流分成多個塊,并在單獨的線程中處理每個流。 我們需要做的就是簡單地在上一個示例中將流創建為并行流。
您應考慮到每個塊將在其線程中異步執行,因此處理這些塊的順序一定無關緊要。 在我們的案例中,我們正在匯總購買量,因此我們可以做到。
讓我們嘗試一下:
private void start() {List<String> ids = Arrays.asList("C01", "C02", "C03", "C04", "C05", "C06", "C07", "C08", "C09", "C10", "C11", "C12", "C13", "C14", "C15", "C16", "C17", "C18", "C19", "C20");long startTime = System.nanoTime();double totalPurchases = ids.parallelStream().map(id -> serviceInvoker.invoke(id)).collect(summingDouble(Client::getPurchases));long endTime = (System.nanoTime() - startTime) / 1_000_000;System.out.println("Parallel | Total time: " + endTime + " ms");System.out.println("Total purchases: " + totalPurchases); }輸出:
Parallel | Total time: 6336 ms Total purchases: 20.0哇,這是一個很大的進步! 但是這個數字是什么來的呢?
并行流在內部使用ForkJoinPool,它是Java 7中引入的ForkJoin框架所使用的池。默認情況下,該池使用與計算機處理器可以處理的線程數相同的線程。 我的筆記本電腦是可以處理8個線程的四核(您可以通過調用Runtime.getRuntime.availableProcessors進行檢查),因此它可以并行地對Web服務進行8次調用。 由于我們需要20次調用,因此至少需要3次“回合”:
好的,所以從40秒到6秒是一個不錯的改進,但是,我們還能進一步改進嗎? 答案是肯定的。
5.使用CompletableFuture進行非阻塞處理
讓我們分析先前的解決方案。
我們發送8個線程來調用每個Web服務,但是當該服務正在處理請求時(整整兩秒鐘),我們的處理器除了等待外什么也不做(這是IO操作)。 在這些請求不回來之前,我們將無法發送更多請求。
問題是,如果我們可以異步發送所有20個請求,釋放處理器并在可用時處理每個響應,該怎么辦? 這是CompletableFuture搶救的地方:
public class AsyncStreamExecutorProcessing {private final ServiceInvoker serviceInvoker;private final ExecutorService executorService = Executors.newFixedThreadPool(100);public AsyncStreamExecutorProcessing() {this.serviceInvoker = new ServiceInvoker();}public static void main(String[] args) {new AsyncStreamExecutorProcessing().start();}private void start() {List<String> ids = Arrays.asList("C01", "C02", "C03", "C04", "C05", "C06", "C07", "C08", "C09", "C10", "C11", "C12", "C13", "C14", "C15", "C16", "C17", "C18", "C19", "C20");long startTime = System.nanoTime();List<CompletableFuture<Client>> futureRequests = ids.stream().map(id -> CompletableFuture.supplyAsync(() -> serviceInvoker.invoke(id), executorService)).collect(toList());double totalPurchases = futureRequests.stream().map(CompletableFuture::join).collect(summingDouble(Client::getPurchases));long endTime = (System.nanoTime() - startTime) / 1_000_000;System.out.println("Async with executor | Total time: " + endTime + " ms");System.out.println("Total purchases: " + totalPurchases);executorService.shutdown();} }輸出:
Async with executor | Total time: 2192 ms Total purchases: 20.0在上一個示例中花費了三分之一的時間。
我們同時發送了所有20個請求,因此在IO操作上花費的時間僅花費了一次。 收到回復后,我們會Swift對其進行處理。
使用執行程序服務很重要,它被設置為supplyAsync方法的可選第二個參數。 我們指定了一個包含一百個線程的池,因此我們可以同時發送100個請求。 如果我們不指定執行者,則默認情況下將使用ForkJoin池。
您可以嘗試刪除執行程序,您將看到與并行示例相同的性能。
六,結論
我們已經看到,當執行不涉及計算的操作(例如IO操作)時,我們可以使用CompletableFuture類來利用我們的處理器并提高應用程序的性能。
翻譯自: https://www.javacodegeeks.com/2015/03/improving-performance-non-blocking-processing-of-streams.html
為什么非阻塞io性能更好
總結
以上是生活随笔為你收集整理的为什么非阻塞io性能更好_提高性能:流的非阻塞处理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: stateless_Spring Sta
- 下一篇: linux linux(linux 下j