基于CompletableFuture并发任务编排实现
文章目錄
- 并發任務編排實現
- 不帶返回值/參數傳遞任務
- 串行執行
- 并行執行
- 并行執行-自定義線程池
- 阻塞等待:多并行任務執行完再執行
- 任意一個任務并發執行完就執行下個任務
- 串并行任務依賴場景
- 帶返回值/參數傳遞任務
- 帶返回值實現
- 串行執行
- 多線程任務串行執行
- 對任務并行執行,返回值combine
- 寫在最后
并發任務編排實現
其實Java8中提供了并發編程框架CompletableFuture,以下結合不同場景進行使用。
不帶返回值/參數傳遞任務
模擬任務代碼:
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");class TaskA implements Runnable{@SneakyThrows@Overridepublic void run() {Thread.sleep(2000);System.out.println(String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任務A", LocalDateTime.now().format(formatter)));}}class TaskB implements Runnable{@SneakyThrows@Overridepublic void run() {Thread.sleep(1000);System.out.println(String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任務B", LocalDateTime.now().format(formatter)));}}class TaskC implements Runnable{@SneakyThrows@Overridepublic void run() {Thread.sleep(50);System.out.println(String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任務C", LocalDateTime.now().format(formatter)));}}串行執行
A、B、C任務串行執行
CompletableFuture,runAsync():異步執行
thenRun():上個任務結束再執行(不帶上一個返回值結果)下一個任務
get():阻塞等待任務執行完成
實現方式:
@Testvoid thenRunTest() throws ExecutionException, InterruptedException {CompletableFuture<Void> future = CompletableFuture.runAsync(new TaskA()).thenRun(new TaskB()).thenRun(new TaskC());future.get();}輸出:
threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任務A] time:[2021-06-01 22:56:51] threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任務B] time:[2021-06-01 22:56:52] threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任務C] time:[2021-06-01 22:56:53]從日志就能看出串行執行就是通過單線程執行多個任務。
并行執行
A、B、C任務并行執行
CompletableFuture.allOf():等待所有的CompletableFuture執行完成,無返回值
代碼實現:
/*** 并發執行ABC任務*/@SneakyThrows@Testvoid SeqTest(){String start = LocalDateTime.now().format(formatter);System.out.println(String.format("start task [%s]", start));CompletableFuture[] futures = new CompletableFuture[3];futures[0] = CompletableFuture.runAsync(new TaskA());futures[1] = CompletableFuture.runAsync(new TaskB());futures[2] = CompletableFuture.runAsync(new TaskC());CompletableFuture.allOf(futures).get();String end = LocalDateTime.now().format(formatter);System.out.println(String.format("end task [%s]", end));}輸出:
start task [2021-06-01 23:03:49] threadName: [ForkJoinPool.commonPool-worker-3] taskName:[任務C] time:[2021-06-01 23:03:49] threadName: [ForkJoinPool.commonPool-worker-2] taskName:[任務B] time:[2021-06-01 23:03:50] threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任務A] time:[2021-06-01 23:03:51] end task [2021-06-01 23:03:51]上述這種方式執行可以看出CompletableFuture默認使用的是ForkJoinPool.commonPool線程池,居然用的默認線程池那線程數是如何配置的呢?后來找到源碼發現commonPool線程池配置代碼如下
并行執行-自定義線程池
不是所有任務都是CPU密集型,為了解決上述問題,尤其是IO場景,我們需要根據業務場景配置合理線程數充分使其利用cpu資源。
如何合理配置線程數可以參考我之前文章
輸出:
start task [2021-06-02 00:00:05] threadName: [pool-1-thread-3] taskName:[任務C] time:[2021-06-02 00:00:05] threadName: [pool-1-thread-2] taskName:[任務B] time:[2021-06-02 00:00:06] threadName: [pool-1-thread-1] taskName:[任務A] time:[2021-06-02 00:00:07] end task [2021-06-02 00:00:07]阻塞等待:多并行任務執行完再執行
A、B并行都執行完后再執行C任務
輸出:
start task [2021-06-02 16:56:42] threadName: [pool-1-thread-2] taskName:[任務B] time:[2021-06-02 16:56:43] threadName: [pool-1-thread-1] taskName:[任務A] time:[2021-06-02 16:56:44] threadName: [pool-1-thread-3] taskName:[任務C] time:[2021-06-02 16:56:44] end task [2021-06-02 16:56:44]從輸出中能看出B、A任務并發執行完成以后再執行C任務
任意一個任務并發執行完就執行下個任務
A、B并發執行,只要有一個執行完就執行C任務
anyOf:只要有任意一個CompletableFuture結束,就可以做接下來的事情,而無須像AllOf那樣,等待所有的CompletableFuture結束
@Testvoid anyOf() throws ExecutionException, InterruptedException {ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(8, 16, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));CompletableFuture[] futures = new CompletableFuture[2];futures[0] = CompletableFuture.runAsync(new TaskA(), customThreadPool);futures[1] = CompletableFuture.runAsync(new TaskB(), customThreadPool);CompletableFuture.anyOf(futures).get();CompletableFuture.runAsync(new TaskC(), customThreadPool).get();}輸出:
start task [2021-06-02 17:43:42] threadName: [pool-1-thread-2] taskName:[任務B] time:[2021-06-02 17:43:43] threadName: [pool-1-thread-3] taskName:[任務C] time:[2021-06-02 17:43:43] ----------- end task [2021-06-02 17:43:43]串并行任務依賴場景
@Testvoid multiSeqAndParTest() throws ExecutionException, InterruptedException {ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(8, 16, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));CompletableFuture.runAsync(new TaskA(), customThreadPool).get();CompletableFuture[] futures = new CompletableFuture[2];futures[0] = CompletableFuture.runAsync(new TaskB(), customThreadPool).thenRun(new TaskC());futures[1] = CompletableFuture.runAsync(new TaskD(), customThreadPool).thenRun(new TaskE());CompletableFuture.allOf(futures).get();CompletableFuture.runAsync(new TaskF(), customThreadPool).get();}輸出:
start task [2021-06-02 17:33:35] threadName: [pool-1-thread-1] taskName:[任務A] time:[2021-06-02 17:33:37] ----------- threadName: [pool-1-thread-3] taskName:[任務D] time:[2021-06-02 17:33:37] threadName: [pool-1-thread-3] taskName:[任務E] time:[2021-06-02 17:33:37] ----------- threadName: [pool-1-thread-2] taskName:[任務B] time:[2021-06-02 17:33:38] threadName: [pool-1-thread-2] taskName:[任務C] time:[2021-06-02 17:33:38] ----------- threadName: [pool-1-thread-4] taskName:[任務F] time:[2021-06-02 17:33:38] end task [2021-06-02 17:33:38]帶返回值/參數傳遞任務
模擬任務
String taskA(){try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任務A", LocalDateTime.now().format(formatter));return v;}String taskB(){try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任務B", LocalDateTime.now().format(formatter));return v;}String taskC(){try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任務C", LocalDateTime.now().format(formatter));return v;}帶返回值實現
supplyAsync():異步執行并帶返回值
@Testvoid supplyAsync() throws ExecutionException, InterruptedException {CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> taskA());String result = stringCompletableFuture.get();System.out.println(result);}String taskA(){try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}return "hello";}串行執行
thenApply(): 后面跟的是一個有參數、有返回值的方法,稱為Function。返回值是CompletableFuture類型。
thenAccept():上個任務結束再執行(前面任務的結果作為下一個任務的入參)下一個任務
輸出:
start task [2021-06-03 11:14:27] threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任務A] time:[2021-06-03 11:14:30]->threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任務B] time:[2021-06-03 11:14:30]->threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任務C] time:[2021-06-03 11:14:31] end task [2021-06-03 11:14:31]多線程任務串行執行
A、B、C任務在多個線程環境下執行,但是執行需要帶要帶參數傳遞A->B->C,感覺這種使用場景比較少
thenCompose():第1個參數是一個CompletableFuture類型,第2個參數是一個方法,并且是一個BiFunction,也就是該方法有2個輸入參數,1個返回值。從該接口的定義可以大致推測,它是要在2個 CompletableFuture 完成之后,把2個CompletableFuture的返回值傳進去,再額外做一些事情。
模擬任務:
String taskA(){try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任務A", LocalDateTime.now().format(formatter));return v;}String taskB(String param){try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任務B", LocalDateTime.now().format(formatter));return param + "\n ->" + v;}String taskC2(String param){try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}String v = String.format("threadName: [%s] taskName:[%s] time:[%s]", Thread.currentThread().getName(), "任務C", LocalDateTime.now().format(formatter));return param + "\n ->" + v;}實現一:
@Testvoid multiCompletableFutureSeqTest() throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> taskA()).thenCompose(firstTaskReturn -> CompletableFuture.supplyAsync(() -> taskB(firstTaskReturn))).thenCompose(secondTaskReturn -> CompletableFuture.supplyAsync(() -> taskC2(secondTaskReturn)));System.out.println(future.get());}輸出:
start task [2021-06-03 15:04:45] threadName: [ForkJoinPool.commonPool-worker-1] taskName:[任務A] time:[2021-06-03 15:04:48]->threadName: [ForkJoinPool.commonPool-worker-2] taskName:[任務B] time:[2021-06-03 15:04:51]->threadName: [ForkJoinPool.commonPool-worker-2] taskName:[任務C] time:[2021-06-03 15:04:54] end task [2021-06-03 15:04:54]對任務并行執行,返回值combine
如果希望返回值是一個非嵌套的CompletableFuture,可以使用thenCompose
@SneakyThrows@Testvoid multiCombineTest(){CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> taskA()).thenCombine(CompletableFuture.supplyAsync(() -> taskB2()), (s1, s2) -> s1 + "\n" + s2 + "\n" + "combine: " + Thread.currentThread().getName()).thenCombine(CompletableFuture.supplyAsync(() -> taskC2()), (s1, s2) -> s1 + "\n" + s2 + "\n" + "combine: " + Thread.currentThread().getName());System.out.println(future.get());}寫在最后
推薦一個大佬的并發編程框架,文章思路是照著他的readme去寫的
總結
以上是生活随笔為你收集整理的基于CompletableFuture并发任务编排实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 关于统计时间切片标签的一些sql
- 下一篇: 【转载保存】大型推荐系统架构图设计图