自定义线程池内置线程池的使用 ThreadPoolExecutor和Executorservice 示例与注意事项
文章目錄
- 線程池介紹
- 自己設(shè)計一個線程池
- 1.設(shè)計ThreadPool類:
- 2.設(shè)計工作隊列
- 3.實現(xiàn)自己設(shè)計的線程池
- 用java的ThreadPoolExecutor自定義線程池
- 自定義線程池-參數(shù)設(shè)計分析
- 自定義線程池-實現(xiàn)步驟示例
- Exectors創(chuàng)建內(nèi)置線程池
- ExecutorService介紹和示例
- Scheduledexecutorservice
線程池介紹
自JDK1.5起,utils包提供了ExecutorService線程池的實現(xiàn),主要目的是為了重復(fù)利用線程,提高系統(tǒng)效率。我們知道Thread是一個重量級的資源,創(chuàng)建、啟動以及銷毀都是比較耗費系統(tǒng)資源的,因此對線程的重復(fù)利用一種是非常好的程序設(shè)計習(xí)慣,加之系統(tǒng)中可創(chuàng)建的線程數(shù)量是有限的,線程數(shù)量和系統(tǒng)性能是一種拋物線的關(guān)系,也就是說當(dāng)線程數(shù)量達到某個數(shù)值的時候,性能反倒會降低很多,因此對線程的管理,尤其是數(shù)量的控制更能直接決定程序的性能。
所謂線程池,通俗的理解就是有一個池子,里面存放著已經(jīng)創(chuàng)建好的線程,當(dāng)有任務(wù)提交給線程池執(zhí)行時,池子中的某個線程會主動執(zhí)行該任務(wù)。如果池子中的線程數(shù)量不夠應(yīng)付數(shù)量眾多的任務(wù)時,則需要自動擴充新的線程到池子中,但是該數(shù)量是有限的,就好比池塘的水界線一樣。當(dāng)任務(wù)比較少的時候,池子中的線程能夠自動回收,釋放資源。
為了能夠異步地提交任務(wù)和緩存未被處理的任務(wù),需要有一個任務(wù)隊列,
通過上面的描述可知,一個完整的線程池應(yīng)該具備如下要素。
???任務(wù)隊列:用于緩存提交的任務(wù)。
???線程數(shù)量管理功能:一個線程池必須能夠很好地管理和控制線程數(shù)量,可通過如下三個參數(shù)來實現(xiàn),比如創(chuàng)建線程池時初始的線程數(shù)量init;線程池自動擴充時最大的線程數(shù)量max;在線程池空閑時需要釋放線程但是也要維護一定數(shù)量的活躍數(shù)量或者核心數(shù)量core。
有了這三個參數(shù),就能夠很好地控制線程池中的線程數(shù)量,將其維護在一個合理的范圍之內(nèi),
三者之間的關(guān)系是init<=core<=max。
???任務(wù)拒絕策略:如果線程數(shù)量已達到上限且任務(wù)隊列已達到上限且任務(wù)隊列已滿,則需要有相應(yīng)的拒絕策略來通知任務(wù)提交者。
???線程工廠:主要用于個性化定制線程,比如將線程設(shè)置為守護線程以及設(shè)置線程名稱等。
???QueueSize:任務(wù)隊列主要存放提交的Runnable,但是為了防止內(nèi)存溢出,需要有l(wèi)imit數(shù)量對其進行控制。
???Keepedalive時間:該時間主要決定線程各個重要參數(shù)自動維護的時間間隔。
自己設(shè)計一個線程池
在這里實現(xiàn)一個比較簡單的ThreadPool,雖然比較簡單,但是該有的功能基本上都具備,對讀者學(xué)習(xí)和掌握JUC中的ExecutorService也有一定的幫助
線程池工作過程
a) 如果正在運行的線程數(shù)量小于 corePoolSize,那么馬上創(chuàng)建線程運行這個任務(wù);
b) 如果正在運行的線程數(shù)量大于或等于 corePoolSize,那么將這個任務(wù)放入隊列;
c) 如果這時候隊列滿了,而且正在運行的線程數(shù)量小于 maximumPoolSize,那么還是要創(chuàng)建非核心線程立刻運行這個任務(wù);
d) 如果隊列滿了,而且正在運行的線程數(shù)量大于或等于 maximumPoolSize,那么線程池會拋出異常RejectExecutionException。
1.設(shè)計ThreadPool類:
public interface ThreadPool {//提交任務(wù)到線程池void execute(Runnable runnable);//關(guān)閉線程池void shutdown();//獲取線程池的初始化大小int getInitSize();//獲取線程池最大的線程數(shù)int getMaxSize();//獲取線程池的核心線程數(shù)量int getCoreSize();//獲取線程池中用于緩存任務(wù)隊列的大小int getQueueSize();//獲取線程池中活躍線程的數(shù)量int getActiveCount();//查看線程池是否已經(jīng)被shutdownboolean isShutdown(); }ThreadFactory提供了創(chuàng)建線程的接口,以便于個性化地定制Thread,比如Thread應(yīng)該被加到哪個Group中,優(yōu)先級、線程名字以及是否為守護線程等,
2.設(shè)計工作隊列
RunanbleQueue主要用于存放提交的Runnable,該Runnable是一個BlockedQueue,并且有l(wèi)imit的限制
package com.wangwenjun.concurrent.chapter08;//任務(wù)隊列,主要用于緩存提交到線程池中的任務(wù) public interface RunnableQueue {//當(dāng)有新的任務(wù)進來時首先會offer到隊列中void offer(Runnable runnable);//工作線程通過take方法獲取RunnableRunnable take();//獲取任務(wù)隊列中任務(wù)的數(shù)量int size(); }自定義阻塞隊列LinkedRunnableQueue的示例:
import java.util.LinkedList;public class LinkedRunnableQueue implements RunnableQueue {//任務(wù)隊列的最大容量,在構(gòu)造時傳入private final int limit;//若任務(wù)隊列中的任務(wù)已經(jīng)滿了,則需要執(zhí)行拒絕策略 private final DenyPolicy denyPolicy;//存放任務(wù)的隊列private final LinkedList<Runnable> runnableList = new LinkedList<>();private final ThreadPool threadPool;public LinkedRunnableQueue(int limit, DenyPolicy denyPolicy, ThreadPool threadPool){this.limit = limit;this.denyPolicy = denyPolicy;this.threadPool = threadPool;} } // 在LinkedRunnableQueue中有幾個重要的屬性,第一個是limit,也就是Runnable隊列的上限;當(dāng)提交的Runnable數(shù)量達到limit上限時,則會調(diào)用DenyPolicy的reject方法;runnableList是一個雙向循環(huán)列表,用于存放Runnable任務(wù)@Override public void offer(Runnable runnable) {synchronized (runnableList) {if (runnableList.size() >= limit){//無法容納新的任務(wù)時執(zhí)行拒絕策略denyPolicy.reject(runnable, threadPool);} else{//將任務(wù)加入到隊尾,并且喚醒阻塞中的線程runnableList.addLast(runnable);runnableList.notifyAll();}} }//offer方法是一個同步方法,如果隊列數(shù)量達到了上限,則會執(zhí)行拒絕策略,否則會將runnable存放至隊列中,同時喚醒take任務(wù)的線程: @Override public Runnable take() throws InterruptedException {synchronized (runnableList){while (runnableList.isEmpty()){try{//如果任務(wù)隊列中沒有可執(zhí)行任務(wù),則當(dāng)前線程將會掛起,進入runnableList關(guān)聯(lián)的monitor waitset中等待喚醒(新的任務(wù)加入)runnableList.wait();} catch (InterruptedException e){//被中斷時需要將該異常拋出throw e;}}//從任務(wù)隊列頭部移除一個任務(wù)return runnableList.removeFirst();} }//take方法也是同步方法,線程不斷從隊列中獲取Runnable任務(wù),當(dāng)隊列為空的時候工作線程會陷入阻塞,有可能在阻塞的過程中被中斷,為了傳遞中斷信號需要在catch語句塊中將異常拋出以通知上游(InternalTask),示例代碼如下: @Override public int size() {synchronized (runnableList){//返回當(dāng)前任務(wù)隊列中的任務(wù)數(shù)return runnableList.size();} //其中,size方法用于返回runnableList的任務(wù)個數(shù)。 }3.實現(xiàn)自己設(shè)計的線程池
public class BasicThreadPool extends Thread implements ThreadPool {//初始化線程數(shù)量private final int initSize;//線程池最大線程數(shù)量private final int maxSize;//線程池核心線程數(shù)量private final int coreSize;//當(dāng)前活躍的線程數(shù)量private int activeCount;//創(chuàng)建線程所需的工廠private final ThreadFactory threadFactory;//任務(wù)隊列private final RunnableQueue runnableQueue;//線程池是否已經(jīng)被shutdownprivate volatile boolean isShutdown = false;//工作線程隊列private final Queue<ThreadTask> threadQueue = new ArrayDeque<>();private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy();private final static ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory();private final long keepAliveTime;private final TimeUnit timeUnit;//構(gòu)造時需要傳遞的參數(shù):初始的線程數(shù)量,最大的線程數(shù)量,核心線程數(shù)量,任務(wù)隊列的最大數(shù)量public BasicThreadPool(int initSize, int maxSize, int coreSize,int queueSize){this(initSize, maxSize, coreSize, DEFAULT_THREAD_FACTORY,queueSize, DEFAULT_DENY_POLICY, 10, TimeUnit.SECONDS);}//構(gòu)造線程池時需要傳入的參數(shù),該構(gòu)造函數(shù)需要的參數(shù)比較多public BasicThreadPool(int initSize, int maxSize, int coreSize,ThreadFactory threadFactory, int queueSize, DenyPolicy denyPolicy, long keepAliveTime, TimeUnit timeUnit){this.initSize = initSize;this.maxSize = maxSize;this.coreSize = coreSize;this.threadFactory = threadFactory;this.runnableQueue = new LinkedRunnableQueue(queueSize, denyPolicy, this);this.keepAliveTime = keepAliveTime;this.timeUnit = timeUnit;this.init();}//初始化時,先創(chuàng)建initSize個線程private void init(){start();for (int i = 0; i < initSize; i++){newThread();}} //提交任務(wù)非常簡單,只是將Runnable插入runnableQueue中即可@Override public void execute(Runnable runnable) {if (this.isShutdown)throw new IllegalStateException("The thread pool is destroy");//提交任務(wù)只是簡單地往任務(wù)隊列中插入Runnablethis.runnableQueue.offer(runnable); } ,線程池自動維護代碼如下: private void newThread() {//創(chuàng)建任務(wù)線程,并且啟動InternalTask internalTask = new InternalTask(runnableQueue);Thread thread = this.threadFactory.createThread(internalTask);ThreadTask threadTask = new ThreadTask(thread, internalTask);threadQueue.offer(threadTask);this.activeCount++;thread.start();} private void removeThread() {//從線程池中移除某個線程ThreadTask threadTask = threadQueue.remove();threadTask.internalTask.stop();this.activeCount--; } @Override public void run() {//run方法繼承自Thread,主要用于維護線程數(shù)量,比如擴容、回收等工作while (!isShutdown && !isInterrupted()){try{timeUnit.sleep(keepAliveTime);} catch (InterruptedException e){isShutdown = true;break;}synchronized (this){if (isShutdown)break;//當(dāng)前的隊列中有任務(wù)尚未處理,并且activeCount< coreSize則繼續(xù)擴容if (runnableQueue.size() > 0&& activeCount < coreSize){for (int i = initSize; i < coreSize; i++){newThread();}//continue的目的在于不想讓線程的擴容直接達到maxsizecontinue;} //當(dāng)前的隊列中有任務(wù)尚未處理,并且activeCount< maxSize則繼續(xù)擴容if (runnableQueue.size() > 0&& activeCount < maxSize){for (int i = coreSize; i < maxSize; i++){newThread();}}//如果任務(wù)隊列中沒有任務(wù),則需要回收,回收至coreSize即可if (runnableQueue.size()==0&& activeCount > coreSize){for (int i = coreSize; i < activeCount; i++){removeThread();}}}} }//ThreadTask只是InternalTask和Thread的一個組合 private static class ThreadTask {public ThreadTask(Thread thread, InternalTask internalTask){this.thread = thread;this.internalTask = internalTask;}Thread thread;InternalTask internalTask; }自己設(shè)計的線程池測試代碼
import java.util.concurrent.TimeUnit;public class ThreadPoolTest {public static void main(String[] args) throws InterruptedException{ //定義線程池,初始化線程數(shù)為2,核心線程數(shù)為4,最大線程數(shù)為6,任務(wù)隊列最多允許1000個任務(wù)for (int i = 0; i < 20; i++)threadPool.execute(() ->{try{TimeUnit.SECONDS.sleep(10); System.out.println(Thread.currentThread().getName() + " is running and done.");} catch (InterruptedException e){e.printStackTrace();}});for (; ; ){//不斷輸出線程池的信息System.out.println("getActiveCount:" + threadPool.getActiveCount());System.out.println("getQueueSize:" + threadPool.getQueueSize());System.out.println("getCoreSize:" + threadPool.getCoreSize());System.out.println("getMaxSize:" + threadPool.getMaxSize());System.out.println("======================================");TimeUnit.SECONDS.sleep(5);}} }用java的ThreadPoolExecutor自定義線程池
自定義線程池需要用到ThreadPoolExecutor,這個類提供ExecutorService執(zhí)行方法的默認實現(xiàn)。 此類使用newTaskFor返回的RunnableFuture實現(xiàn)submit 、 invokeAny和invokeAll方法,默??認為該包中提供的FutureTask類。
2.1.1:ThreadPoolExecutor部分源碼
構(gòu)造方法:public ThreadPoolExecutor(int corePoolSize,/核心線程數(shù)量int maximumPoolSize,//最大線程數(shù)long keepAliveTime,/最大空閑時間TimeUnit unit,時間單位BlockingQueue<Runnable>workQueue,/任務(wù)隊列ThreadFactory threadFactory,/線程工廠RejectedExecutionHandler handler/∥飽和處理機制){…}自定義線程池-參數(shù)設(shè)計分析
◆通過觀察Java中的內(nèi)置線程池參數(shù)講解和線程池工作流程總結(jié)我們不難發(fā)現(xiàn)要設(shè)計一個好的線程池,就必須合理的設(shè)置線程池的4個參數(shù)那到底該如何合理的設(shè)計4個參數(shù)的值呢?我們起往下看.
4個參數(shù)的設(shè)計
1:核心線程數(shù)( corepoolsize)
核心線程數(shù)的設(shè)計需要依據(jù)任務(wù)的處理時間可和每秒產(chǎn)生的任務(wù)數(shù)量來確定例如執(zhí)行個任務(wù)常要0.1秒系統(tǒng)百分之80的時間每秒都會產(chǎn)生100個任務(wù)那么要想在1秒內(nèi)處理完這100個任務(wù)就需要10個線程此時我們就可以設(shè)計核心線程數(shù)為10,當(dāng)然實時情兄不可能這么平均所以我們般按照8020原則設(shè)計即可,既技照百分之80的情況設(shè)計核心線程數(shù)剩下的百分之20可以利用最大線程數(shù)處理
2:任務(wù)隊列長度( workqueue)
任務(wù)隊列長度一般設(shè)計為線程數(shù)/單個任務(wù)執(zhí)行時可2即可,如上面的場中核心線程數(shù)設(shè)計為10單個任務(wù)執(zhí)行時可為
0.1秒.則隊列長度可以設(shè)計為200
3:最大線程數(shù)(maximumPoolSize)
最大線程數(shù)的設(shè)計除了需要參照核心線程數(shù)的條件外,還需要參照系統(tǒng)每秒產(chǎn)生的最大任務(wù)數(shù)決定例如:上述環(huán)境中,如果系統(tǒng)每秒最大產(chǎn)生的任務(wù)是1000個,那么最大線程數(shù)=(最大任務(wù)數(shù)-任務(wù)隊列長度)*單個任務(wù)執(zhí)行時間:既:最大線程數(shù)=(1000-200)*0.1=80個:
4:最大空閑時間(keepAliveTime)
這個參數(shù)的設(shè)計完全參考系統(tǒng)運行環(huán)境和硬件壓力設(shè)定沒有固定的參考值用戶可以根據(jù)經(jīng)驗和系統(tǒng)產(chǎn)生任務(wù)的時間間隔合理設(shè)置一個值即可;
自定義線程池-實現(xiàn)步驟示例
1編寫任務(wù)類( My Task),實現(xiàn) Runnable接口
2編寫線程類( My Worker)用于執(zhí)行任務(wù)需要持有所有任務(wù);
3編寫線程池類( Mythread Pool),包含提交任務(wù)執(zhí)行任務(wù)的能力;
4編寫測試類( Mytest)創(chuàng)建線程池對象提交多個任務(wù)
例子:
public static void main(String[] args)throws ExecutionException, InterruptedException {// ① 創(chuàng)建ThreadPoolExecutor,7個構(gòu)造參數(shù) ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 30,TimeUnit.SECONDS,new ArrayBlockingQueue<>(10),Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardPolicy());// ② 提交執(zhí)行異步任務(wù),不關(guān)注返回值 executor.execute(() -> System.out.println(" execute the runnable task"));// ③ 提交執(zhí)行異步任務(wù),關(guān)注返回值 Future<String> future = executor.submit(() -> " Execute the callable task and this is the result");// ④獲取并輸出callable任務(wù)的返回值System.out.println(future.get()); }Exectors創(chuàng)建內(nèi)置線程池
注:《阿里巴巴Java開發(fā)手冊》中強制線程池不允許使用 Executors 去創(chuàng)建,而是通過ThreadPoolExecutor 的方式,這樣的處理方式讓寫的同學(xué)更加明確線程池的運行規(guī)則,規(guī)避資源耗盡的風(fēng)險
Executors 返回線程池對象的弊端如下:
FixedThreadPool 和 SingleThreadExecutor : 允許請求的隊列長度為Integer.MAX_VALUE,可能堆積大量的請求,從而導(dǎo)致OOM。
CachedThreadPool 和 ScheduledThreadPool : 允許創(chuàng)建的線程數(shù)量為Integer.MAX_VALUE ,可能會創(chuàng)建大量線程,從而導(dǎo)致OOM。
ExecutorService介紹和示例
Executors其實是個工具類,里面提供了好多靜態(tài)方法,這些方法根據(jù)用戶選擇返回不同的線程池實例。 ThreadPoolExecutor繼承了AbstractExecutorService,成員變量ctl是一個Integer的原子變量,用來記錄線程池狀態(tài)和線程池中線程個數(shù),類似于ReentrantReadWriteLock使用一個變量來保存兩種信息。
獲取ExecutorServicei可以利用JDK中的Executors類中的靜態(tài)方法,常用獲取方式如下:
???static ExecutorService newCachedThreadPool(創(chuàng)建一個默認的線程池對象,里面的線程可重用,且在第一次使用時才創(chuàng)建static ExecutorService ,最多線程個數(shù)為Integer.MAX_VALUE,并且阻塞隊列為同步隊列。keeyAliveTime=60說明只要當(dāng)前線程在60s內(nèi)空閑則回收。這個類型的特殊之處在于,加入同步隊列的任務(wù)會被馬上執(zhí)行,同步隊列里面最多只有一個任務(wù)。
???static ExecutorService newFixedThreadPool(int n Threads)創(chuàng)建一個可重用固定線程數(shù)的線程池并且阻塞隊列長度為Integer.MAX_VALUE。keeyAliveTime=0說明只要線程個數(shù)比核心線程個數(shù)多并且當(dāng)前空閑則回收。
static ExecutorService newFixedThreadPool(int n
???static ExecutorService newSingleThreadExecutor()創(chuàng)建一個使用單個worker線程的Executor,以無界隊列方式來運行該線程。并且阻塞隊列長度為Integer.MAX_VALUE。keeyAliveTime=0說明只要線程個數(shù)比核心線程個數(shù)多并且當(dāng)前空閑則回收。
上面三個構(gòu)造方法都有帶ThreadFactory的重載方法,用于自定義線程創(chuàng)建的方式。
例子:
//創(chuàng)建一個線程池 ExecutorService pool = Executors.newFixedThreadPool(taskSize); // 創(chuàng)建多個有返回值的任務(wù) List<Future> list = new ArrayList<Future>(); for (int i = 0; i < taskSize; i++) { Callable c = new MyCallable(i + " "); // 執(zhí)行任務(wù)并獲取Future 對象 Future f = pool.submit(c); list.add(f); } // 關(guān)閉線程池 pool.shutdown(); // 獲取所有并發(fā)任務(wù)的運行結(jié)果 for (Future f : list) { // 從Future 對象上獲取任務(wù)的返回值,并輸出到控制臺 System.out.println("res:" + f.get().toString());Scheduledexecutorservice
Scheduledexecutorservice,是 ExecutorService的子接口,具備了延遲運行或定期執(zhí)行任務(wù)的能力,常用獲取方式如下
static Scheduledexecutorservice newscheduled Threadpool(int corepoolsize創(chuàng)建一個可重用固定線程數(shù)的線程池且允許延遲運行或定期執(zhí)行任務(wù)
static Scheduledexecutorservice newscheduledthread Pool(int corepoolsize, Threadfactory threadfactory)
創(chuàng)建一個可重用固定線程數(shù)的線程池且線程池中的所有線程都使用 Thread Factory來創(chuàng)建,且允許延遲運行或定期執(zhí)行任務(wù);
static Scheduledexecutorservice newsinglethreadscheduledexecutor(Threadfactory threadfactory)創(chuàng)建一個單線程執(zhí)行程序,它可安排在給定延退后運行命令或者定期地執(zhí)行。
例子:
ScheduledExecutorService scheduledThreadPool= Executors.newScheduledThreadPool(3); scheduledThreadPool.schedule(newRunnable(){@Overridepublic void run() { System.out.println("延遲三秒"); } }, 3, TimeUnit.SECONDS); scheduledThreadPool.scheduleAtFixedRate(newRunnable(){ @Override public void run() { System.out.println("延遲1秒后每三秒執(zhí)行一次");} },1,3,TimeUnit.SECONDS);總結(jié)
以上是生活随笔為你收集整理的自定义线程池内置线程池的使用 ThreadPoolExecutor和Executorservice 示例与注意事项的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 用redis+jwt保存在线用户和获得在
- 下一篇: redis+aop防重复提交