线程池学习笔记
線程
Callable
public class CallableDemo {public static void main(String[] args) throws Exception {ExecutorService executorService = Executors.newCachedThreadPool();Future<String> future = executorService.submit(() -> {log.info("do something in callable");Thread.sleep(5000);return "Done";});log.info("do something in main");Thread.sleep(1000);String result = future.get();log.info("result:{}", result);} }FutureTask
多線程執行任務時,有比較耗時操作,但又需要其返回結果時,可以使用FutureTask
public class FutureTaskDemo {public static void main(String[] args) throws Exception {FutureTask<String> futureTask = new FutureTask<String>(() -> {log.info("do something in callable");Thread.sleep(5000);return "Done";});new Thread(futureTask).start();log.info("do something in main");Thread.sleep(1000);// 獲取耗時操作的返回結果,這里是堵塞操作String result = futureTask.get();log.info("result:{}", result);} }Fork/Join
用于并行執行任務,將大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架。使用工作竊取算法,某個線程從其他隊列里竊取任務來執行
- fork:將大任務切割成若干個子任務并行執行
- join:合并子任務的執行結果得到大任務的結果
- …
局限性
歸并排序
package test.thread.pool.merge;import java.util.Arrays; import java.util.Random;/*** 歸并排序* @author yinwenjie*/ public class Merge1 {private static int MAX = 10000;private static int inits[] = new int[MAX];// 這是為了生成一個數量為MAX的隨機整數集合,準備計算數據// 和算法本身并沒有什么關系static {Random r = new Random();for(int index = 1 ; index <= MAX ; index++) {inits[index - 1] = r.nextInt(10000000);}}public static void main(String[] args) {long beginTime = System.currentTimeMillis();int results[] = forkits(inits); long endTime = System.currentTimeMillis();// 如果參與排序的數據非常龐大,記得把這種打印方式去掉System.out.println("耗時=" + (endTime - beginTime) + " | " + Arrays.toString(results)); }// 拆分成較小的元素或者進行足夠小的元素集合的排序private static int[] forkits(int source[]) {int sourceLen = source.length;if(sourceLen > 2) {int midIndex = sourceLen / 2;int result1[] = forkits(Arrays.copyOf(source, midIndex));int result2[] = forkits(Arrays.copyOfRange(source, midIndex , sourceLen));// 將兩個有序的數組,合并成一個有序的數組int mer[] = joinInts(result1 , result2);return mer;} // 否則說明集合中只有一個或者兩個元素,可以進行這兩個元素的比較排序了else {// 如果條件成立,說明數組中只有一個元素,或者是數組中的元素都已經排列好位置了if(sourceLen == 1|| source[0] <= source[1]) {return source;} else {int targetp[] = new int[sourceLen];targetp[0] = source[1];targetp[1] = source[0];return targetp;}}}/*** 這個方法用于合并兩個有序集合* @param array1* @param array2*/private static int[] joinInts(int array1[] , int array2[]) {int destInts[] = new int[array1.length + array2.length];int array1Len = array1.length;int array2Len = array2.length;int destLen = destInts.length;// 只需要以新的集合destInts的長度為標準,遍歷一次即可for(int index = 0 , array1Index = 0 , array2Index = 0 ; index < destLen ; index++) {int value1 = array1Index >= array1Len?Integer.MAX_VALUE:array1[array1Index];int value2 = array2Index >= array2Len?Integer.MAX_VALUE:array2[array2Index];// 如果條件成立,說明應該取數組array1中的值if(value1 < value2) {array1Index++;destInts[index] = value1;}// 否則取數組array2中的值else {array2Index++;destInts[index] = value2;}}return destInts;} }歸并排序
/*** 使用Fork/Join框架的歸并排序算法* @author yinwenjie*/ public class Merge2 {private static int MAX = 100000000;private static int inits[] = new int[MAX];// 同樣進行隨機隊列初始化,這里就不再贅述了static {......}public static void main(String[] args) throws Exception { // 正式開始long beginTime = System.currentTimeMillis();ForkJoinPool pool = new ForkJoinPool();MyTask task = new MyTask(inits);ForkJoinTask<int[]> taskResult = pool.submit(task);try {taskResult.get();} catch (InterruptedException | ExecutionException e) {e.printStackTrace(System.out);}long endTime = System.currentTimeMillis();System.out.println("耗時=" + (endTime - beginTime)); }/*** 單個排序的子任務* @author yinwenjie*/static class MyTask extends RecursiveTask<int[]> {private int source[];public MyTask(int source[]) {this.source = source;}/* (non-Javadoc)* @see java.util.concurrent.RecursiveTask#compute()*/@Overrideprotected int[] compute() {int sourceLen = source.length;// 如果條件成立,說明任務中要進行排序的集合還不夠小if(sourceLen > 2) {int midIndex = sourceLen / 2;// 拆分成兩個子任務MyTask task1 = new MyTask(Arrays.copyOf(source, midIndex));task1.fork();MyTask task2 = new MyTask(Arrays.copyOfRange(source, midIndex , sourceLen));task2.fork();// 將兩個有序的數組,合并成一個有序的數組int result1[] = task1.join();int result2[] = task2.join();int mer[] = joinInts(result1 , result2);return mer;} // 否則說明集合中只有一個或者兩個元素,可以進行這兩個元素的比較排序了else {// 如果條件成立,說明數組中只有一個元素,或者是數組中的元素都已經排列好位置了if(sourceLen == 1|| source[0] <= source[1]) {return source;} else {int targetp[] = new int[sourceLen];targetp[0] = source[1];targetp[1] = source[0];return targetp;}}}private int[] joinInts(int array1[] , int array2[]) {// 和上文中出現的代碼一致}} }BlockingQueue
堵塞隊列,有兩種情況會堵塞
| 添加 | add(o) | offer(o) | put(o) | offer(0,timeout,timeunit) | |
| 移除 | remove(o) | poll() | take() | poll(timeout,timeunit) | |
| 檢查 | element() | peek() |
線程池
ThreadPoolExecutor
Executors使用場景
想要頻繁的創建和銷毀線程的時候
線程池的概念
線程池就是提前創建若干個線程,如果有任務需要處理,線程池里的線程就會處理任務,處理完之后線程并不會被銷毀,而是等待下一個任務。由于創建和銷毀線程都是消耗系統資源的
線程池的優勢
- 降低創建線程和銷毀線程的性能開銷
- 提高響應速度,當有新任務需要執行是不需要等待線程創建就可以立馬執行
- 合理的設置線程池大小(限流)可以避免因為線程數超過硬件資源瓶頸帶來的問題
Api Executors
newFixedThreadPool
該方法返回一個固定數量的線程池,當有一個任務提交時,若線程池中空閑,則立即執行,若沒有,則會被暫緩在一個任務隊列中,等待有空閑的線程去執行,用途:FixedThreadPool 用于負載比較大的服務器,為了資源的合理利用,需要限制當前線程數量。
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(1);ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());newSingleThreadExecutor
創建一個線程的線程池,若空閑則執行,若沒有空閑線程則暫緩在任務隊列中
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());newCachedThreadPool
根據實際情況調整線程個數,不限制最大線程數,若用空閑的線程則執行任務,若無任務則不創建線程。并且每一個空閑線程會在 60 秒后自動回收
ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());newScheduledThreadPool
創建一個可以指定線程的數量的線程池,但是這個線程池還帶有延遲和周期性執行任務的功能,類似定時器
ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory);線程池參數
ThreadPoolExecutor采取上述步驟的總體設計思路,是為了在執行execute()方法時,盡可能地避免獲取全局鎖(那將會是一個嚴重的可伸縮瓶頸)。在ThreadPoolExecutor完成預熱之后 (當前運行的線程數大于等于corePoolSize),幾乎所有的execute()方法調用都是執行步驟2,而 步驟2不需要獲取全局鎖。
線程池中線程總數、運行線程數、空閑線程數、任務隊列等之間的關系
飽和策略
RejectedExecutionHandler(飽和策略):當隊列和線程池都滿了,說明線程池處于飽和狀 態,那么必須采取一種策略處理提交的新任務。這個策略默認情況下是AbortPolicy,表示無法 處理新任務時拋出異常。在JDK 1.5中Java線程池框架提供了以下4種策略。
當然,也可以根據應用場景需要來實現RejectedExecutionHandler接口自定義策略。如記錄日志或持久化存儲不能處理的任務。
任務提交
最佳線程數
最佳線程數目 = (線程等待時間+任務執行時間)/任務執行時間 * CPU數目
備注:這個公式也是前輩們分享的,當然之前看了淘寶前臺系統優化實踐的文章,和上面的公式很類似,不過在CPU數目那邊,他們更細化了,上面的公式只是參考。不過不管什么公式,最終還是在生產環境中運行后,再優化調整。
例如服務器CPU核數為4核,一個任務線程cpu耗時為20ms,線程等待(網絡IO、磁盤IO)耗時80ms,那最佳線程數目:( 80 + 20 )/20 * 4 = 20。也就是設置20個線程數最佳。
合理地配置線程池
CPU密集型任務應配置盡可能小的線程,如配置Ncpu+1個線程的線程池。由于IO密集型任務線程并不是一直在執行任務,則應配置盡可能多的線程,如2*Ncpu。混合型的任務,如果可以拆分,將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那么分解后執行的吞吐量 將高于串行執行的吞吐量。如果這兩個任務執行時間相差太大,則沒必要進行分解。可以通過 Runtime.getRuntime().availableProcessors()方法獲得當前設備的CPU個數。
依賴數據庫連接池的任務,因為線程提交SQL后需要等待數據庫返回結果,等待的時間越長,則CPU空閑時間就越長,那么線程數應該設置得越大,這樣才能更好地利用CPU。
建議使用有界隊列。有界隊列能增加系統的穩定性和預警能力,可以根據需要設大一點 兒,比如幾千。有一次,我們系統里后臺任務線程池的隊列和線程池全滿了,不斷拋出拋棄任 務的異常,通過排查發現是數據庫出現了問題,導致執行SQL變得非常緩慢,因為后臺任務線 程池里的任務全是需要向數據庫查詢和插入數據的,所以導致線程池里的工作線程全部阻 塞,任務積壓在線程池里。如果當時我們設置成無界隊列,那么線程池的隊列就會越來越多, 有可能會撐滿內存,導致整個系統不可用,而不只是后臺任務出現問題。當然,我們的系統所 有的任務是用單獨的服務器部署的,我們使用不同規模的線程池完成不同類型的任務,但是 出現這樣問題時也會影響到其他任務。
任務的性質
多線程最佳實踐
使用案例
package com.insightfullogic.java8.concurrent;import java.util.ArrayList; import java.util.List; import java.util.concurrent.*;/*** @description: 線程測試類* @author: tiger* @create: 2022-10-07 11:33*/ public class MutilThread {// 建立一個線程池,注意要放在外面,不要每次執行代碼就建立一個,具體線程池的使用就不展開了public static ExecutorService commonThreadPool = new ThreadPoolExecutor(5, 5, 300L,TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());public static void main(String[] args) {// 開始多線程調用List<Future<String>> futures = new ArrayList<>();for (int i = 0; i < 12; i++) {int finalI = i;Future<String> future = (Future<String>) commonThreadPool.submit(() -> {System.out.println(finalI);});futures.add(future);}// 獲取結果List<String> list = new ArrayList<>();try {for (int i = 0; i < futures.size(); i++) {list.add(futures.get(i).get());}} catch (Exception e) { // LOGGER.error("出現錯誤:", e);}} }順序調用
CompletableFuture<A> futureA = CompletableFuture.supplyAsync(() -> doA()); CompletableFuture<B> futureB = CompletableFuture.supplyAsync(() -> doB()); CompletableFuture.allOf(futureA,futureB) // 等a b 兩個任務都執行完成C c = doC(futureA.join(), futureB.join());CompletableFuture<D> futureD = CompletableFuture.supplyAsync(() -> doD(c)); CompletableFuture<E> futureE = CompletableFuture.supplyAsync(() -> doE(c)); CompletableFuture.allOf(futureD,futureE) // 等d e兩個任務都執行完成return doResult(futureD.join(),futureE.join());線程池的監控
如果在系統中大量使用線程池,則有必要對線程池進行監控,方便在出現問題時,可以根 據線程池的使用狀況快速定位問題。可以通過線程池提供的參數進行監控,在監控線程池的 時候可以使用以下屬性。
通過擴展線程池進行監控。可以通過繼承線程池來自定義線程池,重寫線程池的 beforeExecute、afterExecute和terminated方法,也可以在任務執行前、執行后和線程池關閉前執行一些代碼來進行監控。例如,監控任務的平均執行時間、最大執行時間和最小執行時間等。 這幾個方法在線程池里是空方法。
死鎖
所謂死鎖是指兩個或兩個以上的進程在執行過程中因爭奪資源而相互等待的現象;如果沒有外力作用,他們則無法推進下去。
產生死鎖的原因
產生死鎖的必要條件
總結
- 上一篇: 生物 人类听觉皮层神经集群的对歌曲的选择
- 下一篇: pip3 install 指定路径