异步调用如何使用是最好的方式?
一、異步調用方式分析
今天在寫代碼的時候,想要調用異步的操作,這里我是用的java8的流式異步調用,但是使用過程中呢,發現這個異步方式有兩個方法,如下所示:
區別是一個 需要指定線程池,一個不需要。
那么指定線程池有哪些好處呢?直觀的說有以下兩點好處:
可以根據我們的服務器性能,通過池的管理更好的規劃我們的線程數。
可以對我們使用的線程自定義名稱,這里也是阿里java開發規范所提到的。
1.1 java8異步調用默認線程池方式
當然常規使用默認的也沒什么問題。我們通過源碼分析下使用默認線程池的過程。
看下這個asyncPool是什么?
如下所示,useCommonPool如果為真,就使用ForkJoinPool.commonPool(),否則創建一個new ThreadPerTaskExecutor():
private static final Executor asyncPool = useCommonPool ?ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); 復制代碼看看useCommonPool 是什么?
private static final boolean useCommonPool =(ForkJoinPool.getCommonPoolParallelism() > 1); 復制代碼 /*** 公共池的目標并行度級別*/public static int getCommonPoolParallelism() {return commonParallelism;} 復制代碼最終這個并行級別并沒有給出默認值
static final int commonParallelism; 復制代碼通過找到這個常量的調用,我們看看是如何進行初始化的,在ForkJoinPool中有一個靜態代碼塊,啟動時會對commonParallelism進行初始化,我們只關注最后一句話就好了,:
// Unsafe mechanicsprivate static final sun.misc.Unsafe U;private static final int ABASE;private static final int ASHIFT;private static final long CTL;private static final long RUNSTATE;private static final long STEALCOUNTER;private static final long PARKBLOCKER;private static final long QTOP;private static final long QLOCK;private static final long QSCANSTATE;private static final long QPARKER;private static final long QCURRENTSTEAL;private static final long QCURRENTJOIN;static {// initialize field offsets for CAS etctry {U = sun.misc.Unsafe.getUnsafe();Class<?> k = ForkJoinPool.class;CTL = U.objectFieldOffset(k.getDeclaredField("ctl"));RUNSTATE = U.objectFieldOffset(k.getDeclaredField("runState"));STEALCOUNTER = U.objectFieldOffset(k.getDeclaredField("stealCounter"));Class<?> tk = Thread.class;PARKBLOCKER = U.objectFieldOffset(tk.getDeclaredField("parkBlocker"));Class<?> wk = WorkQueue.class;QTOP = U.objectFieldOffset(wk.getDeclaredField("top"));QLOCK = U.objectFieldOffset(wk.getDeclaredField("qlock"));QSCANSTATE = U.objectFieldOffset(wk.getDeclaredField("scanState"));QPARKER = U.objectFieldOffset(wk.getDeclaredField("parker"));QCURRENTSTEAL = U.objectFieldOffset(wk.getDeclaredField("currentSteal"));QCURRENTJOIN = U.objectFieldOffset(wk.getDeclaredField("currentJoin"));Class<?> ak = ForkJoinTask[].class;ABASE = U.arrayBaseOffset(ak);int scale = U.arrayIndexScale(ak);if ((scale & (scale - 1)) != 0)throw new Error("data type scale not a power of two");ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);} catch (Exception e) {throw new Error(e);}commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;defaultForkJoinWorkerThreadFactory =new DefaultForkJoinWorkerThreadFactory();modifyThreadPermission = new RuntimePermission("modifyThread");common = java.security.AccessController.doPrivileged(new java.security.PrivilegedAction<ForkJoinPool>() {public ForkJoinPool run() { return makeCommonPool(); }});// 即使線程被禁用也是1,至少是個1int par = common.config & SMASK;commonParallelism = par > 0 ? par : 1;} 復制代碼如下所示,默認是7:
所以接著下面的代碼看:
private static final boolean useCommonPool =(ForkJoinPool.getCommonPoolParallelism() > 1); 復制代碼這里一定是返回true,證明當前是并行的。
private static final Executor asyncPool = useCommonPool ?ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); 復制代碼上面會返回一個大小是七的的默認線程池
其實這個默認值是當前cpu的核心數,我的電腦是八核,在代碼中默認會將核心數減一,所以顯示是七個線程。
if (parallelism < 0 && //默認是1,小于核心數(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)parallelism = 1;if (parallelism > MAX_CAP)parallelism = MAX_CAP; 復制代碼下面我們寫個main方法測試一下,10個線程,每個阻塞10秒,看結果:
public static void main(String[] args) {// 創建10個任務,每個任務阻塞10秒for (int i = 0; i < 10; i++) {CompletableFuture.runAsync(() -> {try {Thread.sleep(10000);System.out.println(new Date() + ":" + Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}});}try {Thread.sleep(30000);} catch (InterruptedException e) {e.printStackTrace();}} 復制代碼結果如下所示,前面七個任務先完成,另外三個任務被阻塞10秒后,才完成:
Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-5 Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-4 Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-2 Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-7 Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-3 Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-6 Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-1 ----------------------------------------------------------- Mon Sep 13 11:21:07 CST 2021:ForkJoinPool.commonPool-worker-2 Mon Sep 13 11:21:07 CST 2021:ForkJoinPool.commonPool-worker-5 Mon Sep 13 11:21:07 CST 2021:ForkJoinPool.commonPool-worker-4 復制代碼結論:當我們使用默認的線程池進行異步調用時,如果異步任務是一個IO密集型,簡單說處理時間占用長,將導致其他使用共享線程池的任務阻塞,造成系統性能下降甚至異常。甚至當一部分調用接口時,如果接口超時,那么也會阻塞與超時時長相同的時間;實際在計算密集的場景下使用是能提高性能的。
二、使用自定義的線程池
上面說到如果是IO密集型的場景,在異步調用時還是使用自定義線程池比較好。
針對開篇提到的兩個顯而易見的好處,此處新增一條:
可以根據我們的服務器性能,通過池的管理更好的規劃我們的線程數。
可以對我們使用的線程自定義名稱,這里也是阿里java開發規范所提到的。
不會因為阻塞導致使用共享線程池的其他線程阻塞甚至異常。
我們自定義下面的線程池:
調用:
public static void main(String[] args) {// 創建10個任務,每個任務阻塞10秒for (int i = 0; i < 10; i++) {CompletableFuture.runAsync(() -> {try {Thread.sleep(10000);System.out.println(new Date() + ":" + Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}},GlobalThreadPool.getExecutor());}try {Thread.sleep(30000);} catch (InterruptedException e) {e.printStackTrace();} }復制代碼
輸出我們指定線程名稱的線程:
三、題外話,動態線程池
3.1 什么是動態線程池?
在我們使用線程池的時候,是否有的時候很糾結,到底設置多大的線程池參數是最合適的呢?如果不夠用了怎么辦,要改代碼重新部署嗎?
其實是不需要的,記得當初看過美團的一篇文章,真的讓人茅塞頓開啊,動態線程池。
ThreadPoolExecutor這個類其實是提供對于線程池的屬性進行修改的,支持我們動態修改以下的屬性:
從上至下分別是:
線程工廠(用于指定線程名稱)
核心線程數
最大線程數
活躍時間
拒絕策略。
在美團的文章當中呢,是監控服務器線程的使用率,當達到閾值就進行告警,然后通過配置中心去動態修改這些數值。
我們也可以這么做,使用@RefreshScope加nacos就可以實現了。
3.2 實踐
我寫了一個定時任務,監控當前服務的線程使用率,小了就擴容,一段時間后占用率下降,就恢復初始值。
其實還有很多地方需要改進的,請大家多提意見,監控的是文章前面的線程池GlobalThreadPool,下面調度任務的代碼:
/*** @description: 全局線程池守護進程* @author:weirx* @date:2021/9/10 16:32* @version:3.0*/ @Slf4j @Component public class DaemonThreadTask {/*** 服務支持最大線程數*/public final static int SERVER_MAX_SIZE = 50;/*** 最大閾值Maximum threshold,百分比*/private final static int MAXIMUM_THRESHOLD = 8;/*** 每次遞增最大線程數*/private final static int INCREMENTAL_MAX_NUM = 10;/*** 每次遞增核心線程數*/private final static int INCREMENTAL_CORE_NUM = 5;/*** 當前線程數*/private static int currentSize = GlobalThreadPool.MAX_NUM_POOL_SIZE;/*** 當前核心線程數*/private static int currentCoreSize = GlobalThreadPool.CORE_POOL_SIZE;@Scheduled(cron = "0 */5 * * * ?")public static void execute() {threadMonitor();}/*** description: 動態監控并設置線程參數** @return: void* @author: weirx* @time: 2021/9/10 13:20*/private static void threadMonitor() {ThreadPoolExecutor instance = GlobalThreadPool.getExecutor();int activeCount = instance.getActiveCount();int size = instance.getQueue().size();log.info("GlobalThreadPool: the active thread count is {}", activeCount);// 線程數不足,增加線程if (activeCount > GlobalThreadPool.MAX_NUM_POOL_SIZE % MAXIMUM_THRESHOLD&& size >= GlobalThreadPool.BLOCKING_QUEUE_SIZE) {currentSize = currentSize + INCREMENTAL_MAX_NUM;currentCoreSize = currentCoreSize + INCREMENTAL_CORE_NUM;//當前設置最大線程數小于服務最大支持線程數才可以繼續增加線程if (currentSize <= SERVER_MAX_SIZE) {instance.setMaximumPoolSize(currentSize);instance.setCorePoolSize(currentCoreSize);log.info("this max thread size is {}", currentSize);} else {log.info("current size is more than server max size, can not add");}}// 線程數足夠,降低線程數,當前活躍數小于默認核心線程數if (activeCount < GlobalThreadPool.MAX_NUM_POOL_SIZE&& size == 0&& currentSize > GlobalThreadPool.MAX_NUM_POOL_SIZE) {currentSize = GlobalThreadPool.MAX_NUM_POOL_SIZE;currentCoreSize = GlobalThreadPool.CORE_POOL_SIZE;instance.setMaximumPoolSize(currentSize);instance.setCorePoolSize(currentCoreSize);}} } 復制代碼3.3 動態線程池有什么意義?
有的朋友其實問過我,我直接把線程池設置大一點不就好了,這種動態線程池有什么意義呢?
其實這是一個好問題。在以前的傳統軟件當中,單機部署,硬件部署,確實,我們能使用的線程數取決于服務器的核心線程數,而且基本沒有其他服務來爭搶這些線程。
但是現在是容器的時代,云原生的時代。
多個容器部署在一個宿主機上,那么當高峰期的時候,某個容器就需要占用大量的cpu資源,如果所有的容器都將大部分資源占據,那么這個容器必然面臨阻塞甚至癱瘓的風險。
當高峰期過了,釋放這部分資源可以被釋放掉,用于其他需要的容器。。
再結合到目前的云服務器節點擴容,都是需要動態擴縮容的的,和線程相似,在滿足高可用的情況下,盡量的節約成本。
最后
如果你覺得此文對你有一丁點幫助,點個贊。或者可以加入我的開發交流群:1025263163相互學習,我們會有專業的技術答疑解惑
如果你覺得這篇文章對你有點用的話,麻煩請給我們的開源項目點點star:http://github.crmeb.net/u/defu不勝感激 !
PHP學習手冊:https://doc.crmeb.com
技術交流論壇:https://q.crmeb.com
總結
以上是生活随笔為你收集整理的异步调用如何使用是最好的方式?的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 生辰八字五行宝宝起名软件 V23.6 算
- 下一篇: dspace rcp_RCP工具箱开源