JAVA进阶----ThreadPoolExecutor机制(转)
ThreadPoolExecutor機制?
一、概述?
1、ThreadPoolExecutor作為java.util.concurrent包對外提供基礎實現,以內部線程池的形式對外提供管理任務執行,線程調度,線程池管理等等服務;?
2、Executors方法提供的線程服務,都是通過參數設置來實現不同的線程池機制。?
3、先來了解其線程池管理的機制,有助于正確使用,避免錯誤使用導致嚴重故障。同時可以根據自己的需求實現自己的線程池?
二、核心構造方法講解?
下面是ThreadPoolExecutor最核心的構造方法?
構造方法參數講解?
| 參數名 | 作用 |
| corePoolSize | 核心線程池大小 |
| maximumPoolSize | 最大線程池大小 |
| keepAliveTime | 線程池中超過corePoolSize數目的空閑線程最大存活時間;可以allowCoreThreadTimeOut(true)使得核心線程有效時間 |
| TimeUnit | keepAliveTime時間單位 |
| workQueue | 阻塞任務隊列 |
| threadFactory | 新建線程工廠 |
| RejectedExecutionHandler | 當提交任務數超過maxmumPoolSize+workQueue之和時,任務會交給RejectedExecutionHandler來處理 |
重點講解:?
其中比較容易讓人誤解的是:corePoolSize,maximumPoolSize,workQueue之間關系。?
1.當線程池小于corePoolSize時,新提交任務將創建一個新線程執行任務,即使此時線程池中存在空閑線程。?
2.當線程池達到corePoolSize時,新提交任務將被放入workQueue中,等待線程池中任務調度執行?
3.當workQueue已滿,且maximumPoolSize>corePoolSize時,新提交任務會創建新線程執行任務?
4.當提交任務數超過maximumPoolSize時,新提交任務由RejectedExecutionHandler處理?
5.當線程池中超過corePoolSize線程,空閑時間達到keepAliveTime時,關閉空閑線程?
6.當設置allowCoreThreadTimeOut(true)時,線程池中corePoolSize線程空閑時間達到keepAliveTime也將關閉?
線程管理機制圖示:?
三、Executors提供的線程池配置方案?
1、構造一個固定線程數目的線程池,配置的corePoolSize與maximumPoolSize大小相同,同時使用了一個無界LinkedBlockingQueue存放阻塞任務,因此多余的任務將存在再阻塞隊列,不會由RejectedExecutionHandler處理?
2、構造一個緩沖功能的線程池,配置corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,keepAliveTime=60s,以及一個無容量的阻塞隊列 SynchronousQueue,因此任務提交之后,將會創建新的線程執行;線程空閑超過60s將會銷毀?
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}3、構造一個只支持一個線程的線程池,配置corePoolSize=maximumPoolSize=1,無界阻塞隊列LinkedBlockingQueue;保證任務由一個線程串行執行?
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}4、構造有定時功能的線程池,配置corePoolSize,無界延遲阻塞隊列DelayedWorkQueue;有意思的是:maximumPoolSize=Integer.MAX_VALUE,由于DelayedWorkQueue是無界隊列,所以這個值是沒有意義的?
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);}public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);}public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,new DelayedWorkQueue(), threadFactory);}四、定制屬于自己的線程池?
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger;public class CustomThreadPoolExecutor {private ThreadPoolExecutor pool = null;/*** 線程池初始化方法* * corePoolSize 核心線程池大小----10* maximumPoolSize 最大線程池大小----30* keepAliveTime 線程池中超過corePoolSize數目的空閑線程最大存活時間----30+單位TimeUnit* TimeUnit keepAliveTime時間單位----TimeUnit.MINUTES* workQueue 阻塞隊列----new ArrayBlockingQueue<Runnable>(10)====10容量的阻塞隊列* threadFactory 新建線程工廠----new CustomThreadFactory()====定制的線程工廠* rejectedExecutionHandler 當提交任務數超過maxmumPoolSize+workQueue之和時,* 即當提交第41個任務時(前面線程都沒有執行完,此測試方法中用sleep(100)),* 任務會交給RejectedExecutionHandler來處理*/public void init() {pool = new ThreadPoolExecutor(10,30,30,TimeUnit.MINUTES,new ArrayBlockingQueue<Runnable>(10),new CustomThreadFactory(),new CustomRejectedExecutionHandler());}public void destory() {if(pool != null) {pool.shutdownNow();}}public ExecutorService getCustomThreadPoolExecutor() {return this.pool;}private class CustomThreadFactory implements ThreadFactory {private AtomicInteger count = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);String threadName = CustomThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);System.out.println(threadName);t.setName(threadName);return t;}}private class CustomRejectedExecutionHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {// 記錄異常// 報警處理等System.out.println("error.............");}}// 測試構造的線程池public static void main(String[] args) {CustomThreadPoolExecutor exec = new CustomThreadPoolExecutor();// 1.初始化 exec.init();ExecutorService pool = exec.getCustomThreadPoolExecutor();for(int i=1; i<100; i++) {System.out.println("提交第" + i + "個任務!");pool.execute(new Runnable() {@Overridepublic void run() {try {Thread.sleep(300);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("running=====");}});}// 2.銷毀----此處不能銷毀,因為任務沒有提交執行完,如果銷毀線程池,任務也就無法執行了// exec.destory();try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}} }方法中建立一個核心線程數為30個,緩沖隊列有10個的線程池。每個線程任務,執行時會先睡眠0.1秒,保證提交40個任務時沒有任務被執行完,這樣提交第41個任務是,會交給CustomRejectedExecutionHandler 類來處理。
http://825635381.iteye.com/blog/2184680
?
?
?
?
?
轉載于:https://www.cnblogs.com/softidea/p/4286051.html
總結
以上是生活随笔為你收集整理的JAVA进阶----ThreadPoolExecutor机制(转)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: AIX 上安装SSH
- 下一篇: dubbo协议参考