并发新特性—Executor 框架与线程池
Executor 框架簡介
在 Java 5 之后,并發編程引入了一堆新的啟動、調度和管理線程的API。Executor 框架便是 Java 5 中引入的,其內部使用了線程池機制,它在 java.util.cocurrent 包下,通過該框架來控制線程的啟動、執行和關閉,可以簡化并發編程的操作。因此,在 Java 5之后,通過 Executor 來啟動線程比使用 Thread 的 start 方法更好,除了更易管理,效率更好(用線程池實現,節約開銷)外,還有關鍵的一點:有助于避免 this 逃逸問題——如果我們在構造器中啟動一個線程,因為另一個任務可能會在構造器結束之前開始執行,此時可能會訪問到初始化了一半的對象用 Executor 在構造器中。
Executor 框架包括:線程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable 等。
Executor 接口中之定義了一個方法 execute(Runnable command),該方法接收一個 Runable 實例,它用來執行一個任務,任務即一個實現了 Runnable 接口的類。ExecutorService 接口繼承自 Executor 接口,它提供了更豐富的實現多線程的方法,比如,ExecutorService 提供了關閉自己的方法,以及可為跟蹤一個或多個異步任務執行狀況而生成 Future 的方法。 可以調用 ExecutorService 的 shutdown()方法來平滑地關閉 ExecutorService,調用該方法后,將導致 ExecutorService 停止接受任何新的任務且等待已經提交的任務執行完成(已經提交的任務會分兩類:一類是已經在執行的,另一類是還沒有開始執行的),當所有已經提交的任務執行完畢后將會關閉 ExecutorService。因此我們一般用該接口來實現和管理多線程。
ExecutorService 的生命周期包括三種狀態:運行、關閉、終止。創建后便進入運行狀態,當調用了 shutdown()方法時,便進入關閉狀態,此時意味著 ExecutorService 不再接受新的任務,但它還在執行已經提交了的任務,當素有已經提交了的任務執行完后,便到達終止狀態。如果不調用 shutdown()方法,ExecutorService 會一直處在運行狀態,不斷接收新的任務,執行新的任務,服務器端一般不需要關閉它,保持一直運行即可。
Executors 提供了一系列工廠方法用于創先線程池,返回的線程池都實現了 ExecutorService 接口。
public static ExecutorService newFixedThreadPool(int nThreads)
創建固定數目線程的線程池。
public static ExecutorService newCachedThreadPool()
創建一個可緩存的線程池,調用execute將重用以前構造的線程(如果線程可用)。如果現有線程沒有可用的,則創建一個新線 程并添加到池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。
public static ExecutorService newSingleThreadExecutor()
創建一個單線程化的Executor。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
創建一個支持定時及周期性的任務執行的線程池,多數情況下可用來替代Timer類。
這四種方法都是用的 Executors 中的 ThreadFactory 建立的線程,下面就以上四個方法做個比較:
newCachedThreadPool()
- 緩存型池子,先查看池中有沒有以前建立的線程,如果有,就 reuse 如果沒有,就建一個新的線程加入池中
- 緩存型池子通常用于執行一些生存期很短的異步型任務 因此在一些面向連接的 daemon 型 SERVER 中用得不多。但對于生存期短的異步任務,它是 Executor 的首選。
能 reuse 的線程,必須是 timeout IDLE 內的池中線程,缺省 timeout 是 60s,超過這個 IDLE 時長,線程實例將被終止及移出池。
注意,放入 CachedThreadPool 的線程不必擔心其結束,超過 TIMEOUT 不活動,其會自動被終止。
newFixedThreadPool(int)
- newFixedThreadPool 與 cacheThreadPool 差不多,也是能 reuse 就用,但不能隨時建新的線程。
- 其獨特之處:任意時間點,最多只能有固定數目的活動線程存在,此時如果有新的線程要建立,只能放在另外的隊列中等待,直到當前的線程中某個線程終止直接被移出池子。
- 和 cacheThreadPool 不同,FixedThreadPool 沒有 IDLE 機制(可能也有,但既然文檔沒提,肯定非常長,類似依賴上層的 TCP 或 UDP IDLE 機制之類的),所以 FixedThreadPool 多數針對一些很穩定很固定的正規并發線程,多用于服務器。
- 從方法的源代碼看,cache池和fixed 池調用的是同一個底層 池,只不過參數不同:
- fixed 池線程數固定,并且是0秒IDLE(無IDLE)。
- cache 池線程數支持 0-Integer.MAX_VALUE(顯然完全沒考慮主機的資源承受能力),60 秒 IDLE 。
newScheduledThreadPool(int)
- 調度型線程池
- 這個池子里的線程可以按 schedule 依次 delay 執行,或周期執行
SingleThreadExecutor()
- 單例線程,任意時間池中只能有一個線程
- 用的是和 cache 池和 fixed 池相同的底層池,但線程數目是 1-1,0 秒 IDLE(無 IDLE)
一般來說,CachedTheadPool 在程序執行過程中通常會創建與所需數量相同的線程,然后在它回收舊線程時停止創建新線程,因此它是合理的 Executor 的首選,只有當這種方式會引發問題時(比如需要大量長時間面向連接的線程時),才需要考慮用 FixedThreadPool。(該段話摘自《Thinking in Java》第四版)
Executor 執行 Runnable 任務
通過 Executors 的以上四個靜態工廠方法獲得 ExecutorService 實例,而后調用該實例的 execute(Runnable command)方法即可。一旦 Runnable 任務傳遞到 execute()方法,該方法便會自動在一個線程上執行。下面是 Executor 執行 Runnable 任務的示例代碼:
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class TestCachedThreadPool{ public static void main(String[] args){ ExecutorService executorService = Executors.newCachedThreadPool(); // ExecutorService executorService = Executors.newFixedThreadPool(5); // ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i = 0; i < 5; i++){ executorService.execute(new TestRunnable()); System.out.println("************* a" + i + " *************"); } executorService.shutdown(); } } class TestRunnable implements Runnable{ public void run(){ System.out.println(Thread.currentThread().getName() + "線程被調用了。"); } }執行后的結果如下:
從結果中可以看出,pool-1-thread-1 和 pool-1-thread-2 均被調用了兩次,這是隨機的,execute 會首先在線程池中選擇一個已有空閑線程來執行任務,如果線程池中沒有空閑線程,它便會創建一個新的線程來執行任務。
Executor 執行 Callable 任務
在 Java 5 之后,任務分兩類:一類是實現了 Runnable 接口的類,一類是實現了 Callable 接口的類。兩者都可以被 ExecutorService 執行,但是 Runnable 任務沒有返回值,而 Callable 任務有返回值。并且 Callable 的 call()方法只能通過 ExecutorService 的 submit(Callable?task) 方法來執行,并且返回一個?Future,是表示任務等待完成的 Future。
Callable 接口類似于 Runnable,兩者都是為那些其實例可能被另一個線程執行的類設計的。但是 Runnable 不會返回結果,并且無法拋出經過檢查的異常而 Callable 又返回結果,而且當獲取返回結果時可能會拋出異常。Callable 中的 call()方法類似 Runnable 的 run()方法,區別同樣是有返回值,后者沒有。
當將一個 Callable 的對象傳遞給 ExecutorService 的 submit 方法,則該 call 方法自動在一個線程上執行,并且會返回執行結果 Future 對象。同樣,將 Runnable 的對象傳遞給 ExecutorService 的 submit 方法,則該 run 方法自動在一個線程上執行,并且會返回執行結果 Future 對象,但是在該 Future 對象上調用 get 方法,將返回 null。
下面給出一個 Executor 執行 Callable 任務的示例代碼:
import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; public class CallableDemo{ public static void main(String[] args){ ExecutorService executorService = Executors.newCachedThreadPool(); List<Future<String>> resultList = new ArrayList<Future<String>>(); //創建10個任務并執行 for (int i = 0; i < 10; i++){ //使用ExecutorService執行Callable類型的任務,并將結果保存在future變量中 Future<String> future = executorService.submit(new TaskWithResult(i)); //將任務執行結果存儲到List中 resultList.add(future); } //遍歷任務的結果 for (Future<String> fs : resultList){ try{ while(!fs.isDone);//Future返回如果沒有完成,則一直循環等待,直到Future返回完成 System.out.println(fs.get()); //打印各個線程(任務)執行的結果 }catch(InterruptedException e){ e.printStackTrace(); }catch(ExecutionException e){ e.printStackTrace(); }finally{ //啟動一次順序關閉,執行以前提交的任務,但不接受新任務 executorService.shutdown(); } } } } class TaskWithResult implements Callable<String>{ private int id; public TaskWithResult(int id){ this.id = id; } /** * 任務的具體過程,一旦任務傳給ExecutorService的submit方法, * 則該方法自動在一個線程上執行 */ public String call() throws Exception { System.out.println("call()方法被自動調用!!! " + Thread.currentThread().getName()); //該返回結果將被Future的get方法得到 return "call()方法被自動調用,任務返回的結果是:" + id + " " + Thread.currentThread().getName(); } }執行結果如下:
從結果中可以同樣可以看出,submit 也是首先選擇空閑線程來執行任務,如果沒有,才會創建新的線程來執行任務。另外,需要注意:如果 Future 的返回尚未完成,則 get()方法會阻塞等待,直到 Future 完成返回,可以通過調用 isDone()方法判斷 Future 是否完成了返回。
自定義線程池
自定義線程池,可以用 ThreadPoolExecutor 類創建,它有多個構造方法來創建線程池,用該類很容易實現自定義的線程池,這里先貼上示例程序:
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadPoolTest{ public static void main(String[] args){ //創建等待隊列 BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20); //創建線程池,池中保存的線程數為3,允許的最大線程數為5 ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,50,TimeUnit.MILLISECONDS,bqueue); //創建七個任務 Runnable t1 = new MyThread(); Runnable t2 = new MyThread(); Runnable t3 = new MyThread(); Runnable t4 = new MyThread(); Runnable t5 = new MyThread(); Runnable t6 = new MyThread(); Runnable t7 = new MyThread(); //每個任務會在一個線程上執行 pool.execute(t1); pool.execute(t2); pool.execute(t3); pool.execute(t4); pool.execute(t5); pool.execute(t6); pool.execute(t7); //關閉線程池 pool.shutdown(); } } class MyThread implements Runnable{ @Override public void run(){ System.out.println(Thread.currentThread().getName() + "正在執行。。。"); try{ Thread.sleep(100); }catch(InterruptedException e){ e.printStackTrace(); } } }運行結果如下:
從結果中可以看出,七個任務是在線程池的三個線程上執行的。這里簡要說明下用到的 ThreadPoolExecuror 類的構造方法中各個參數的含義。
public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue)corePoolSize:線程池中所保存的核心線程數,包括空閑線程。
maximumPoolSize:池中允許的最大線程數。
keepAliveTime:線程池中的空閑線程所能持續的最長時間。
unit:持續時間的單位。
- workQueue:任務執行前保存任務的隊列,僅保存由 execute 方法提交的 Runnable 任務。
根據 ThreadPoolExecutor 源碼前面大段的注釋,我們可以看出,當試圖通過 excute 方法講一個 Runnable 任務添加到線程池中時,按照如下順序來處理:
如果線程池中的線程數量少于 corePoolSize,即使線程池中有空閑線程,也會創建一個新的線程來執行新添加的任務;
如果線程池中的線程數量大于等于 corePoolSize,但緩沖隊列 workQueue 未滿,則將新添加的任務放到 workQueue 中,按照 FIFO 的原則依次等待執行(線程池中有線程空閑出來后依次將緩沖隊列中的任務交付給空閑的線程執行);
如果線程池中的線程數量大于等于 corePoolSize,且緩沖隊列 workQueue 已滿,但線程池中的線程數量小于 maximumPoolSize,則會創建新的線程來處理被添加的任務;
總結起來,也即是說,當有新的任務要處理時,先看線程池中的線程數量是否大于 corePoolSize,再看緩沖隊列 workQueue 是否滿,最后看線程池中的線程數量是否大于 maximumPoolSize。
另外,當線程池中的線程數量大于 corePoolSize 時,如果里面有線程的空閑時間超過了 keepAliveTime,就將其移除線程池,這樣,可以動態地調整線程池中線程的數量。
我們大致來看下 Executors 的源碼,newCachedThreadPool 的不帶 RejectedExecutionHandler 參數(即第五個參數,線程數量超過 maximumPoolSize 時,指定處理方式)的構造方法如下:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }它將 corePoolSize 設定為 0,而將 maximumPoolSize 設定為了 Integer 的最大值,線程空閑超過 60 秒,將會從線程池中移除。由于核心線程數為 0,因此每次添加任務,都會先從線程池中找空閑線程,如果沒有就會創建一個線程(SynchronousQueue決定的,后面會說)來執行新的任務,并將該線程加入到線程池中,而最大允許的線程數為 Integer 的最大值,因此這個線程池理論上可以不斷擴大。
再來看 newFixedThreadPool 的不帶 RejectedExecutionHandler 參數的構造方法,如下:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }它將 corePoolSize 和 maximumPoolSize 都設定為了 nThreads,這樣便實現了線程池的大小的固定,不會動態地擴大,另外,keepAliveTime 設定為了 0,也就是說線程只要空閑下來,就會被移除線程池,敢于 LinkedBlockingQueue 下面會說。
幾種排隊的策略
直接提交。緩沖隊列采用 SynchronousQueue,它將任務直接交給線程處理而不保持它們。如果不存在可用于立即運行任務的線程(即線程池中的線程都在工作),則試圖把任務加入緩沖隊列將會失敗,因此會構造一個新的線程來處理新添加的任務,并將其加入到線程池中。直接提交通常要求無界 maximumPoolSizes(Integer.MAX_VALUE) 以避免拒絕新提交的任務。newCachedThreadPool 采用的便是這種策略。
無界隊列。使用無界隊列(典型的便是采用預定義容量的 LinkedBlockingQueue,理論上是該緩沖隊列可以對無限多的任務排隊)將導致在所有 corePoolSize 線程都工作的情況下將新任務加入到緩沖隊列中。這樣,創建的線程就不會超過 corePoolSize,也因此,maximumPoolSize 的值也就無效了。當每個任務完全獨立于其他任務,即任務執行互不影響時,適合于使用無界隊列。newFixedThreadPool采用的便是這種策略。
- 有界隊列。當使用有限的 maximumPoolSizes 時,有界隊列(一般緩沖隊列使用 ArrayBlockingQueue,并制定隊列的最大長度)有助于防止資源耗盡,但是可能較難調整和控制,隊列大小和最大池大小需要相互折衷,需要設定合理的參數。
from:?http://wiki.jikexueyuan.com/project/java-concurrency/executor.html
總結
以上是生活随笔為你收集整理的并发新特性—Executor 框架与线程池的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 深入理解 Java 线程池:Thread
- 下一篇: Java虚拟机学习(8):查看JVM参数