没想到,这么简单的线程池用法,深藏这么多坑
又又又踩坑了
生產有個對賬系統,每天需要從渠道端下載對賬文件,然后開始日終對賬。這個系統已經運行了很久,前兩天突然收到短信預警,沒有獲取渠道端對賬文件。
本以為又是渠道端搞事情,上去一排查才發現,所有下載任務都被阻塞了。再進一步排查源碼,才發現自己一直用錯了線程池某個方法。
由于線程創建比較昂貴,正式項目中我們都會使用線程池執行異步任務。線程池,使用池化技術保存線程對象,使用的時候直接取出來,用完歸還以便使用。
雖然線程池的使用非常方法非常簡單,但是越簡單,越容易踩坑。細數一下,這些年來因為線程池導致生產事故也有好幾起。
所以今天,小黑哥就針對線程池的話題,給大家演示一下怎么使用線程池才會踩坑。
希望大家看完,可以完美避開這些坑~
慎用 Executors 組件
Java 從 JDK1.5 開始提供線程池的實現類,我們只需要在構造函數內傳入相關參數,就可以創建一個線程池。
不過線程池的構造函數可以說非常復雜,就算最簡單的那個構造函數,也需要傳入 5 個參數。這對于新手來說,非常不方便哇。
也許 JDK 開發者也考慮到這個問題,所以非常貼心給我們提供一個工具類 Executors,用來快捷創建創建線程池。
雖然這個工具類使用真的非常方便,可以少寫很多代碼,但是小黑哥還是建議生產系統還是老老實實手動創建線程池,慎用Executors,尤其是工具類中兩個方法 ?Executors#newFixedThreadPool與 Executors#newCachedThreadPool。
如果你圖了方便使用上述方法創建了線程池,那就是一顆定時炸彈,說不準那一天生產系統就會????。
我們來看兩個????,看下這個這兩個方法會有什么問題。
假設我們有個應用有個批量接口,每次請求將會下載 100w 個文件,這里我們使用 Executors#newFixedThreadPool批量下載。
下面方法中,我們隨機休眠,模擬真實下載耗時。
為了快速復現問題,調整 JVM 參數為 -Xmx128m -Xms128m 。
private?ExecutorService?threadPool?=?Executors.newFixedThreadPool(10);/***?批量下載對賬文件**?@return*/ @RequestMapping("/batchDownload") public?String?batchDownload()?{//?模擬下載?100w?個文件for?(int?i?=?0;?i?<?1000000;?i++)?{threadPool.execute(()?->?{//?隨機休眠,模擬下載耗時Random?random?=?new?Random();try?{TimeUnit.SECONDS.sleep(random.nextInt(100));}?catch?(InterruptedException?e)?{e.printStackTrace();}});}return?"process"; }程序運行之后,多請求幾次這個批量下載方法,程序很快就會 OOM 。
查看 Executors#newFixedThreadPool源碼,我們可以看到這個方法創建了一個默認的 LinkedBlockingQueue 當做任務隊列。
public?static?ExecutorService?newFixedThreadPool(int?nThreads)?{return?new?ThreadPoolExecutor(nThreads,?nThreads,0L,?TimeUnit.MILLISECONDS,new?LinkedBlockingQueue<Runnable>()); }這個問題槽點就在于 LinkedBlockingQueue,這個隊列的默認構造方法如下:
/***?Creates?a?{@code?LinkedBlockingQueue}?with?a?capacity?of*?{@link?Integer#MAX_VALUE}.*/ public?LinkedBlockingQueue()?{this(Integer.MAX_VALUE); }創建 LinkedBlockingQueue 隊列時,如果我們不指定隊列數量,默認數量上限為 Integer.MAX_VALUE。這么大的數量,我們簡直可以當做無界隊列了。
上面我們使用 newFixedThreadPool,我們僅使用了固定數量的線程下載。如果線程都在執行任務,線程池將會任務加入任務隊列中。
如果線程池執行任務過慢,任務將會一直堆積在隊列中。由于我們隊列可以認為是無界的,可以無限制添加任務,這就導致內存占用越來越高,直到 OOM 爆倉。
下面我們將上面的例子稍微修改一下,使用 newCachedThreadPool 創建線程池。
程序運行之后,多請求幾次這個批量下載方法,程序很快就會 OOM ,不過這次報錯信息與之前信息與之前不同。
從報錯信息來看,這次 OOM 的主要原因是因為無法再創建新的線程。
這次看下一下 newCachedThreadPool 方法的源碼,可以看到這個方法將會創建最大線程數為 Integer.MAX_VALUE 的的線程池。
由于這個線程池使用 SynchronousQueue 隊列,這個隊列比較特殊,沒辦法存儲任務。所以默認情況下,線程池只要接到一個任務,就會創建一個線程。
一旦線程池收到大量任務,就會創建大量線程。Java 中的線程是會占用一定的內存空間 ,所以創建大量的線程是必然會導致 OOM。
復用線程池
由于線程池的構造方法比較復雜,而 Executors 創建的線程池比較坑,所以我們有個項目中自己封裝了一個線程池工具類。
工具類代碼如下:
public?static?ThreadPoolExecutor?getThreadPool()?{//?為了快速復現問題,故將線程池?核心線程數與最大線程數設置為?100return?new?ThreadPoolExecutor(100,?100,?60,?TimeUnit.SECONDS,?new?LinkedBlockingDeque<>(200)); }項目代碼中這樣使用這個工具類:
@RequestMapping("/batchDownload") public?String?batchDownload()?{ExecutorService?threadPool?=?ThreadPoolUtils.getThreadPool();//?模擬下載?100w?個文件for?(int?i?=?0;?i?<?100;?i++)?{threadPool.execute(()?->?{//?隨機休眠,模擬下載耗時Random?random?=?new?Random();try?{TimeUnit.SECONDS.sleep(random.nextInt(100));}?catch?(InterruptedException?e)?{e.printStackTrace();}});}return?"process"; }使用 WRK 工具對這個接口同時發起多個請求,很快應用就會拋出 OOM。
每次請求都會創建一個新的線程池執行任務,如果短時間內有大量的請求,就會創建很多的線程池,間接導致創建很多線程。從而導致內存占盡,發生 OOM 問題。
這個問題修復辦法很簡單,要么工具類生成一個單例線程池,要么項目代碼中復用創建出來的線程池。
Spring 異步任務
上面代碼中我們都是自己創建一個線程池執行異步任務,這樣還是比較麻煩。在 Spring 中, 我們可以在方法上使用 Spring 注解 @Async,然后執行異步任務。
代碼如下:
@Async public?void?async()?throws?InterruptedException?{log.info("async?process");Random?random?=?new?Random();TimeUnit.SECONDS.sleep(random.nextInt(100)); }不過使用 Spring 異步任務,我們需要自定義線程池,不然大量請求下,還是有可能發生 OOM 問題。
這是原因主要是 Spring 異步任務默認使用 Spring 內部線程池 ?SimpleAsyncTaskExecutor 。
這個線程池比較坑爹,不會復用線程。也就是說來一個請求,將會新建一個線程。
所以如果需要使用異步任務,一定要使用自定義線程池替換默認線程池。
如果使用 XML 配置,我們可以增加如下配置:
<task:executor?id="myexecutor"?pool-size="5"??/> <task:annotation-driven?executor="myexecutor"/>如果使用注解配置,我們需要設置一個 Bean:
@Bean(name?=?"threadPoolTaskExecutor") public?Executor?threadPoolTaskExecutor()?{ThreadPoolTaskExecutor?executor=new?ThreadPoolTaskExecutor();executor.setCorePoolSize(5);executor.setMaxPoolSize(10);executor.setThreadNamePrefix("test-%d");//?其他設置return?new?ThreadPoolTaskExecutor(); }然后使用注解時指定線程池名稱:
@Async("threadPoolTaskExecutor") public?void?xx()?{//?業務邏輯 }如果是 SpringBoot 項目,從本人測試情況來看,默認將會創建核心線程數為 8,最大線程數為 Integer.MAX_VALUE,隊列數也為 Integer.MAX_VALUE線程池。
“ps:以下代碼基于 Spring-Boot 2.1.6-RELEASE,暫不確定 Spring-Boot 1.x 版本是否也是這種策略,熟悉的同學的,也可以留言指出一下。
雖然上面的線程池不用擔心創建過多線程的問題,不是還是有可能隊列任務過多,導致 OOM 的問題。所以還是建議使用自定義線程池嗎,或者在配置文件修改默認配置,例如:
spring.task.execution.pool.core-size=10 spring.task.execution.pool.max-size=20 spring.task.execution.pool.queue-capacity=200線程池方法使用不當
最后再來說下文章開頭的我踩到的這個坑,這個問題主要是因為理解錯這個方法。
錯誤代碼如下:
//?創建線程池 ExecutorService?threadPool?=?... List<Callable<String>>?tasks?=?new?ArrayList<>(); //?批量創建任務 for?(int?i?=?0;?i?<?100;?i++)?{tasks.add(()?->?{Random?random?=?new?Random();try?{TimeUnit.SECONDS.sleep(random.nextInt(100));}?catch?(InterruptedException?e)?{e.printStackTrace();}return?"success";}); } //?執行所有任務 List<Future<String>>?futures?=?threadPool.invokeAll(tasks); //?獲取結果 for?(Future<String>?future?:?futures)?{try?{future.get();}?catch?(ExecutionException?e)?{e.printStackTrace();} }上面代碼中,使用 invokeAll執行所有任務。由于這個方法返回值為 List<Future<T>>,我誤以為這個方法如 submit一樣,異步執行,不會阻塞主線程。
實際上從源碼上,這個方法實際上逐個調用 Future#get獲取任務結果,而這個方法會同步阻塞主線程。
一旦某個任務被永久阻塞,比如 Socket ?網絡連接位置超時時間,導致任務一直阻塞在網絡連接,間接導致這個方法一直被阻塞,從而影響后續方法執行。
如果需要使用 invokeAll 方法,最好使用其另外一個重載方法,設置超時時間。
?
總結
今天文章通過幾個例子,給大家展示了一下線程池使用過程一些坑。為了快速復現問題,上面的示例代碼還是比較極端,實際中可能并不會這么用。
不過即使這樣,我們千萬不要抱著僥幸的心理,認為這些任務很快就會執行結束。我們在生產上碰到好幾次事故,正常的情況執行都很快。但是偶爾外部程序抽瘋,返回時間變長,就可能導致系統中存在大量任務,導致 OOM。
最后總結一下幾個線程池幾個最佳實踐:
第一,生產系統慎用 Executors 類提供的便捷方法,我們需要自己根據自己的業務場景,配置合理的線程數,任務隊列,拒絕策略,線程回收策略等等,并且一定記得自定義線程池的命名方式,以便于后期排查問題。
第二,線程池不要重復創建,每次都創建一個線程池可能比不用線程池還要糟糕。如果使用其他同學創建的線程池工具類,最好還是看一下實現方式,防止自己誤用。
第三,一定不要按照自己的片面理解去使用 API 方法,如果把握不準,一定要去看下方法上注釋以及相關源碼。
有道無術,術可成;有術無道,止于術
歡迎大家關注Java之道公眾號
好文章,我在看??
總結
以上是生活随笔為你收集整理的没想到,这么简单的线程池用法,深藏这么多坑的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: bnuoj 20950 沉重的货物
- 下一篇: hdu 4556 Stern-Bro