Java:多线程之线程池
參考博文:https://www.cnblogs.com/dolphin0520/p/3932921.html
前文講過,使用線程的時候就手動創建并啟動一個線程,使用完后線程被銷毀,這樣就會有一個問題:
如果并發的線程數量非常多,并且每個線程都是執行一個時間很短的任務就結束了,這樣頻繁的創建線程就會大大降低系統的效率,因為頻繁的創建線程和銷毀線程都需要時間。
那么有沒有一種辦法使得線程可以服用呢?就是執行完一個任務,并不銷毀,而是可以繼續執行其他任務。
Java為我們提供了線程池來達到這樣的效果。今天我們就來講解一下Java的線程池。首先我們從最核心的 ThreadPoolExecutor 類中的方法講起,然后給出它的使用示例,最后討論如何合理配置線程池的大小。
一、Java中的ThreadPoolExecutor 類
1、ThreadPoolExecutor
java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類,因此如果要透徹地了解Java中的線程池,必須先了解這個類。下面我們來看一下ThreadPoolExecutor類的具體實現源碼。
(1)繼承關系與類聲明
java.lang.Object |____java.util.concurrent.AbstractExecutorService|____java.util.concurrent.ThreadPoolExecutorpackage java.util.concurrent; /*** @since 1.5* @author Doug Lea*/ public class ThreadPoolExecutor extends AbstractExecutorService {
(2)構造方法
從代碼上看,ThreadPoolExecutor 繼承了 AbstractExecutorService類,并提供了四個構造器,事實上,通過觀察每個構造器的實現,發現前面三個構造器都是調用的第四個構造器進行初始化工作的。
下面解釋一下構造器中各個參數的含義:
corePoolSize:核心池的大小
這個參數跟后面講述的線程池的實現原理有非常大的關系。在創建了線程池后,默認情況下,線程池中并沒有任何線程,而是等待有任務到來才創建線程去執行任務,除非調用了 prestartAllCoreThreads() 或者 prestartCoreThread() 方法,從這2個方法的名字就可以看出,是預創建線程的意思,即在沒有任務到來之前就創建 corePoolSize 個線程或者一個線程。默認情況下,在創建了線程池后,線程池中的線程數為0,當有任務來之后,才會創建一個線程去執行任務,當線程池中的線程數目達到corePoolSize后,就會把到達的任務放到緩存隊列當中。
maximumPoolSize:線程池最大線程數
這個參數也是一個非常重要的參數,它表示在線程池中最多能創建多少個線程。
keepAliveTime:空閑線程存活時間
表示線程沒有任務執行時最多保持多久時間會終止。默認情況下,只有當線程池中的線程數大于corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數不大于corePoolSize,即當線程池中的線程數大于corePoolSize時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過 corePoolSize。但是如果調用了allowCoreThreadTimeOut(boolean)(允許核心線程超時)方法,在線程池中的線程數不大于corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的線程數為0;
unit:時間單位
參數 keepAliveTime 的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性:
TimeUnit.DAYS; ? ? ? ? ? ? ? //天 TimeUnit.HOURS; ? ? ? ? ? ? //小時 TimeUnit.MINUTES; ? ? ? ? ? //分鐘 TimeUnit.SECONDS; ? ? ? ? ? //秒 TimeUnit.MILLISECONDS; ? ? ?//毫秒 TimeUnit.MICROSECONDS; ? ? ?//微妙 TimeUnit.NANOSECONDS; ? ? ? //納秒
workQueue:阻塞隊列
一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運行過程產生重大影響,一般來說,這里的阻塞隊列有以下幾種選擇:
ArrayBlockingQueue; PriorityBlockingQueue LinkedBlockingQueue; SynchronousQueue
ArrayBlockingQueue 和 PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和SynchronousQueue。線程池的排隊策略與BlockingQueue有關。
threadFactory:線程工廠
主要用來創建線程。
handler:拒絕處理策略
表示當拒絕處理任務時的策略,有以下四種取值:
ThreadPoolExecutor.AbortPolicy:丟棄任務并拋出RejectedExecutionException異常。? ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。? ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執行任務(重復此過程) ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務?
2、AbstractExecutorService類
從上面給出的 ThreadPoolExecutor 類的代碼可以知道,ThreadPoolExecutor繼承了AbstractExecutorService,我們來看一下AbstractExecutorService的實現:
package java.util.concurrent;/*** @since 1.5* @author Doug Lea*/public abstract class AbstractExecutorService implements ExecutorService {protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {}protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {}public Future<?> submit(Runnable task) {}public <T> Future<T> submit(Runnable task, T result) {}public <T> Future<T> submit(Callable<T> task) {}private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos)throws InterruptedException, ExecutionException, TimeoutException {}public <T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException {}public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {}public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException {}public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException {}}
AbstractExecutorService是一個抽象類,它實現了ExecutorService接口。
3、ExecutorService
我們接著看ExecutorService接口的實現:
package java.util.concurrent;/*** @since 1.5* @author Doug Lea*/ public interface ExecutorService extends Executor {void shutdown();List<Runnable> shutdownNow();boolean isShutdown();boolean isTerminated();boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException;<T> Future<T> submit(Callable<T> task);<T> Future<T> submit(Runnable task, T result);Future<?> submit(Runnable task);<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException;<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException;<T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;}
ExecutorService又是繼承了Executor接口,我們看一下Executor接口的實現:
4、Executor
Executor接口的實現:
?
package java.util.concurrent;/*** @since 1.5* @author Doug Lea*/ public interface Executor {void execute(Runnable command); }
到這里,大家應該明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor幾個之間的關系了。
Executor是一個頂層接口,在它里面只聲明了一個方法execute(Runnable),返回值為void,參數為Runnable類型,從字面意思可以理解,就是用來執行傳進去的任務的;
然后ExecutorService接口繼承了Executor接口,并聲明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
抽象類AbstractExecutorService實現了ExecutorService接口,基本實現了ExecutorService中聲明的所有方法;
然后ThreadPoolExecutor繼承了類AbstractExecutorService。
5、ThreadPoolExecutor類中有幾個非常重要的方法:
(1)execute()方法:
方法實際上是Executor中聲明的方法,在ThreadPoolExecutor進行了具體的實現,這個方法是ThreadPoolExecutor的核心方法,通過這個方法可以向線程池提交一個任務,交由線程池去執行。
(2)submit()方法:
方法是在 ExecutorService 中聲明的方法,在 AbstractExecutorService 就已經有了具體的實現,在 ThreadPoolExecutor 中并沒有對其進行重寫,這個方法也是用來向線程池提交任務的,但是它和 execute() 方法不同,它能夠返回任務執行的結果,去看submit()方法的實現,會發現它實際上還是調用的execute()方法,只不過它利用了 Future 來獲取任務執行結果。
(3)shutdown()和shutdownNow()方法:
是用來關閉線程池的。
(4)很多其他的方法:
比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等獲取與線程池相關屬性的方法,有興趣的朋友可以自行查閱API。
6、使用示例
任務類:
package basis.stuThreadPool;import java.util.concurrent.TimeUnit;public class MyTask implements Runnable{private int taskNum;public MyTask(int taskNum) {this.taskNum = taskNum;}@Overridepublic void run() {System.out.println("正在執行task"+taskNum);try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("task"+taskNum+"執行完畢。");} }
?測試類:
執行結果:
正在執行task0
線程池中線程數目:1,隊列中等待執行的任務數目:0,已執行完的任務數目:0
線程池中線程數目:2,隊列中等待執行的任務數目:0,已執行完的任務數目:0
線程池中線程數目:3,隊列中等待執行的任務數目:0,已執行完的任務數目:0
線程池中線程數目:4,隊列中等待執行的任務數目:0,已執行完的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:0,已執行完的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:1,已執行完的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:2,已執行完的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:3,已執行完的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:4,已執行完的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:5,已執行完的任務數目:0
線程池中線程數目:6,隊列中等待執行的任務數目:5,已執行完的任務數目:0
線程池中線程數目:7,隊列中等待執行的任務數目:5,已執行完的任務數目:0
線程池中線程數目:8,隊列中等待執行的任務數目:5,已執行完的任務數目:0
線程池中線程數目:9,隊列中等待執行的任務數目:5,已執行完的任務數目:0
線程池中線程數目:10,隊列中等待執行的任務數目:5,已執行完的任務數目:0
正在執行task1
正在執行task2
正在執行task3
正在執行task4
正在執行task10
正在執行task11
正在執行task12
正在執行task13
正在執行task14
task0執行完畢。
正在執行task5
task11執行完畢。
task2執行完畢。
task4執行完畢。
task3執行完畢。
task1執行完畢。
task10執行完畢。
task13執行完畢。
task12執行完畢。
task14執行完畢。
正在執行task9
正在執行task6
正在執行task8
正在執行task7
task5執行完畢。
task7執行完畢。
task8執行完畢。
task6執行完畢。
task9執行完畢。
從執行結果可以看出:
當線程池中線程的數目大于5 (corePoolSize)時,便將任務放入任務緩存隊列里面,當任務緩存隊列滿了(capacity = 5)之后,便創建新的線程,線程數目不能大于 10(maximumPoolSize)的值。該線程池中共創建了10個線程(最開始先創建 5 個,任務緩存隊列滿了之后再創建 maximumPoolSize-corePoolSize = 5個),另外,任務緩存隊列中還有 5 個在等待。
如果上面程序中,將for循環中改成執行20個任務,就會拋出任務拒絕異常了。
不過在java doc中,并不提倡我們直接使用ThreadPoolExecutor,而是使用Executors類來創建線程池。
7、Executors
Executors 是一個線程池工具類,提供了很多靜態方法,用于簡化線程池的創建和使用。
Executors 中創建線程池的方法:
方法?? ? 描述 newCachedThreadPool()?? ? 創建一個不限容量(最大為Integer.MAX_VALUE)的線程池 newSingleThreadExecutor()?? ? 創建一個單線程的線程池 newFixedThreadExecutor()?? ?創建一個固定容量的線程池
這三個方法的具體實現:
?示例(1)newFixedThreadPool:
任務類:
package basis.stuThreadPool;import java.util.concurrent.TimeUnit;public class MyTask implements Runnable{private int taskNum;public MyTask(int taskNum) {this.taskNum = taskNum;}@Overridepublic void run() {System.out.println("正在執行task"+taskNum);try {TimeUnit.SECONDS.sleep(4);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("task"+taskNum+"執行完畢。");} }
測試類:
測試結果:
正在執行task0
正在執行task1
正在執行task2
正在執行task3
正在執行task4
task0執行完畢。
正在執行task5
task1執行完畢。
正在執行task6
task3執行完畢。
task4執行完畢。
task2執行完畢。
正在執行task8
正在執行task7
正在執行task9
task5執行完畢。
task6執行完畢。
task8執行完畢。
task7執行完畢。
task9執行完畢。
上述代碼使用Executors創建了一個固定容量為5的線程池。當線程池中線程達到最大容量時,不再添加新的線程,其他線程必須等待線程池中的線程執行完成后,才能進入線程池中執行。
?示例(2)newCachedThreadPool:
修改測試類,把固定容量的線程池改為不限容量的線程池,
//創建一個不限容量的線程池
ExecutorService executor = Executors.newCachedThreadPool();
其他代碼不變,運行測試,結果如下:
正在執行task0
正在執行task1
正在執行task2
正在執行task3
正在執行task4
正在執行task5
正在執行task6
正在執行task7
正在執行task8
正在執行task9
task0執行完畢。
task2執行完畢。
task6執行完畢。
task4執行完畢。
task3執行完畢。
task9執行完畢。
task1執行完畢。
task8執行完畢。
task7執行完畢。
task5執行完畢。
newCachedThreadPool方法會創建一個容量自動調整的線程池,最大容量為Integer.MAXVALUE。每當有新的任務,線程池中就加入一個新的線程。
示例(3)newSingleThreadExecutor:
修改測試類,把不限容量的線程池改為單線程的線程池,
//創建一個單線程的線程池
ExecutorService executor = Executors.newSingleThreadExecutor();
其他代碼不變,運行測試,結果如下:
正在執行task0
task0執行完畢。
正在執行task1
task1執行完畢。
正在執行task2
task2執行完畢。
正在執行task3
task3執行完畢。
正在執行task4
task4執行完畢。
正在執行task5
task5執行完畢。
正在執行task6
task6執行完畢。
正在執行task7
task7執行完畢。
正在執行task8
task8執行完畢。
正在執行task9
task9執行完畢。
newSingleThreadExecutor方法會創建一個單線程的線程池,一次只能有一個任務進入線程池執行,其他線程只能等待線程池中的那個線程執行完畢后才能進入線程池。
總結
以上是生活随笔為你收集整理的Java:多线程之线程池的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JAVA中return与finally的
- 下一篇: springmvc十一: @Reques