[改善Java代码]适时选择不同的线程池来实现
Java的線程池實(shí)現(xiàn)從最根本上來(lái)說(shuō)只有兩個(gè):ThreadPoolExecutor類和ScheduledThreadPoolExecutor類,這兩個(gè)類還是父子關(guān)系,但是Java為了簡(jiǎn)化并行計(jì)算,還提供了一個(gè)Executors的靜態(tài)類,它可以直接生成多種不同的線程池執(zhí)行器,比如單線程執(zhí)行器,帶緩沖功能的執(zhí)行器等.但歸根結(jié)底還是使ThreadPoolExecutor類或ScheduledThreadPoolExecutor類的封裝類.
?為了了解這些個(gè)執(zhí)行器,看ThreadPoolExecutor類,其中它復(fù)雜的構(gòu)造函數(shù)可以很好的解釋該線程池的作用:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);}public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}?
上面第二個(gè)是ThreadPoolExecutor最完整的構(gòu)造函數(shù),其他的構(gòu)造函數(shù)都是引用該構(gòu)造函數(shù)實(shí)現(xiàn)的.
1.corePoolSize:最小線程數(shù)
線程池啟動(dòng)后,池中保持線程的最小數(shù)量,需要說(shuō)明的是線程數(shù)量是逐步到達(dá)corePoolSize值的,例如corePoolSize被設(shè)置為10,而任務(wù)數(shù)量只有5,則線程池中最多啟動(dòng)5個(gè)線程,而不是一次性啟動(dòng)10個(gè)線程.
2.maximumPoolSize:最大線程數(shù)量
這是池中能夠榮達(dá)的最大線程數(shù),如果超出,則使用RejectedExecutionHandler拒絕策略處理.
3.keepAliveTime:線程最大生命期
這個(gè)生命周期有兩個(gè)約束條件:一是該參數(shù)針對(duì)的是超過corePoolSize數(shù)量的線程,二是處于非運(yùn)行狀態(tài)的線程.
如果corePoolSize為10,maximumPoolSize為20,此時(shí)線程池中有15個(gè)線程在運(yùn)行,一段時(shí)間后,其中有3個(gè)線程處于等待狀態(tài)的時(shí)間超過了keepAliveTime指定的時(shí)間,則結(jié)束這3個(gè)線程,
此時(shí)線程中則還有12個(gè)線程正在運(yùn)行.
4.unit:時(shí)間單位
這是keepAliveTime的時(shí)間單位,可以是納秒,毫秒,秒,分鐘等選項(xiàng)
5.workQueue:任務(wù)隊(duì)列
當(dāng)線程池中的線程都處于運(yùn)行狀態(tài),而此時(shí)任務(wù)數(shù)量繼續(xù)增加,則需要有一個(gè)容器來(lái)容納這些任務(wù),這就是任務(wù)隊(duì)列.
6.threadFactory:線程工廠
定義如何啟動(dòng)一個(gè)線程,可以設(shè)置線程名稱,并且可以確認(rèn)是否是后臺(tái)線程等.
7.handler:拒絕任務(wù)處理器
由于超出線程數(shù)量和任務(wù)隊(duì)列容量而對(duì)繼續(xù)增加的任務(wù)進(jìn)行處理的程序.
?
Executors提供的幾個(gè)創(chuàng)建線程池的便捷方法:
1.newSingleThreadExecutors:單線程池
顧名思義就是一個(gè)池中就只有一個(gè)線程,該線程用不超時(shí),而且由于是一個(gè)線程,當(dāng)有多個(gè)任務(wù)需要處理時(shí),會(huì)將它們放置到一個(gè)無(wú)界阻塞隊(duì)列中逐個(gè)處理.
1 /** 2 * Creates an Executor that uses a single worker thread operating 3 * off an unbounded queue. (Note however that if this single 4 * thread terminates due to a failure during execution prior to 5 * shutdown, a new one will take its place if needed to execute 6 * subsequent tasks.) Tasks are guaranteed to execute 7 * sequentially, and no more than one task will be active at any 8 * given time. Unlike the otherwise equivalent 9 * <tt>newFixedThreadPool(1)</tt> the returned executor is 10 * guaranteed not to be reconfigurable to use additional threads. 11 * 12 * @return the newly created single-threaded Executor 13 */ 14 public static ExecutorService newSingleThreadExecutor() { 15 return new FinalizableDelegatedExecutorService 16 (new ThreadPoolExecutor(1, 1, 17 0L, TimeUnit.MILLISECONDS, 18 new LinkedBlockingQueue<Runnable>())); 19 }?
改方法的使用代碼如下:
import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future;public class Client {public static void main(String[] args) throws Exception {//創(chuàng)建單線程執(zhí)行器ExecutorService es = Executors.newSingleThreadExecutor();//執(zhí)行一個(gè)任務(wù)Future<String> future = es.submit(new Callable<String>() {public String call() throws Exception {return "";}});//獲得任務(wù)執(zhí)行后的返回值System.out.println("返回值:" + future.get());//關(guān)閉執(zhí)行器 es.shutdown();} }?
2.newCachedThreadPool:緩沖功能的線程池
建立了一個(gè)線程池,該線程池的數(shù)量是沒有限制的(不能超過Integer的最大值),新增一個(gè)任務(wù)就有一個(gè)線程處理,或者復(fù)用之前的空閑線程,或者啟動(dòng)一個(gè)新的線程.但是一旦一個(gè)線程在60秒內(nèi)一直是處于等待狀態(tài)時(shí),也就是1分鐘沒有事情可做,就會(huì)被終止,源代碼:
/*** Creates a thread pool that creates new threads as needed, but* will reuse previously constructed threads when they are* available. These pools will typically improve the performance* of programs that execute many short-lived asynchronous tasks.* Calls to <tt>execute</tt> will reuse previously constructed* threads if available. If no existing thread is available, a new* thread will be created and added to the pool. Threads that have* not been used for sixty seconds are terminated and removed from* the cache. Thus, a pool that remains idle for long enough will* not consume any resources. Note that pools with similar* properties but different details (for example, timeout parameters)* may be created using {@link ThreadPoolExecutor} constructors.** @return the newly created thread pool*/public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}?
這里需要說(shuō)明的是,任務(wù)隊(duì)列使用了同步阻塞隊(duì)列,這意味著向隊(duì)列中加入一個(gè)元素,即可喚醒一個(gè)線程(新創(chuàng)建的線程或復(fù)用池中空閑線程)來(lái)處理.這種隊(duì)列已經(jīng)沒有隊(duì)列深度的概念了.
3.newFixedThreadPool:固定線程數(shù)量的線程池
在初始化時(shí)已經(jīng)決定了線程的最大數(shù)量,若任務(wù)添加的能力超出線程處理能力,則建立阻塞隊(duì)列容納多余的任務(wù),源代碼:
/*** Creates a thread pool that reuses a fixed number of threads* operating off a shared unbounded queue. At any point, at most* <tt>nThreads</tt> threads will be active processing tasks.* If additional tasks are submitted when all threads are active,* they will wait in the queue until a thread is available.* If any thread terminates due to a failure during execution* prior to shutdown, a new one will take its place if needed to* execute subsequent tasks. The threads in the pool will exist* until it is explicitly {@link ExecutorService#shutdown shutdown}.** @param nThreads the number of threads in the pool* @return the newly created thread pool* @throws IllegalArgumentException if {@code nThreads <= 0}*/public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}?
上面返回是一個(gè)ThreadPoolExector,它的corePoolSize和maximumPoolSize是相等的,也就是說(shuō)最大線程數(shù)是nThreads.
如果任務(wù)增長(zhǎng)非常快,超過了LinkedBlockingQueue的最大容量(Integer最大值),那此時(shí)會(huì)如何處理呢?
會(huì)按照ThreadPoolExecutor默認(rèn)的拒絕策略(默認(rèn)是DiscardPolicy,直接丟棄)來(lái)處理.
以上三種線程池執(zhí)行器都是ThreadPoolExecutor的簡(jiǎn)化版,目的是幫助開發(fā)人員屏蔽過多的線程細(xì)節(jié),簡(jiǎn)化多線程開發(fā).
可以這樣比喻:newSingleTheadExecutor,newCachedThreadPool,newFixedThreadPool是線程池的簡(jiǎn)化版,而ThreadPoolExecutor是旗艦版.
?
轉(zhuǎn)載于:https://www.cnblogs.com/DreamDrive/p/5624024.html
總結(jié)
以上是生活随笔為你收集整理的[改善Java代码]适时选择不同的线程池来实现的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: linux 上配置swoole
- 下一篇: JVM 内存初学 (堆(heap)、栈(