多线程小抄集(新编三)
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/java/java-concurrent-note-list-new3/
終止正在運行的線程的三種方法
使用退出標志,是線程正常退出,也就是當run方法完成后線程終止;
使用stop方法強行終止線程,但是不推薦使用這個方法,因為stop和suspend及resume一樣都是作廢過期的方法,使用它們可能產生不可預料的結果;
使用interrupt()方法中斷線程;
線程中斷
interrupted()方法:返回對應線程的中斷標志位是否為true,但它還有一個重要的副作用,就是清空中斷標志位,也就是說,連續兩次調用interrupted(),第一次返回的結果為true,第二次一般就是false (除非同時又發生了一次中斷)。
isInterrupted()方法:返回對應線程的中斷標志位是否為true。
這兩個方法的定義如下:
interrupt()方法:中斷線程。
- 如果線程在等待鎖,對線程對象調用interrupt()只是會設置線程的中斷標志位,線程依然會處于BLOCKED狀態,也就是說,interrupt()并不能使一個在等待鎖的線程真正”中斷”。
- 如果線程尚未啟動(NEW),或者已經結束(TERMINATED),則調用interrupt()對它沒有任何效果,中斷標志位也不會被設置。
- 如果線程處于WAITING/TIMED_WAITING在這些狀態時,對線程對象調用interrupt()會使得該線程拋出InterruptedException。需要注意的是,方法在拋出InterruptedException之前,JVM會先將該線程的中斷標識位清除,然后拋出InterruptedException,此時調用isInterrupted()方法將會返回false。
- 如果線程在運行(Runnable)中,interrupt()只是會設置線程的中斷標志位,沒有任何其它作用。
interrupt方法不一定會真正”中斷”線程,它只是一種協作機制,如果不明白線程在做什么,不應該貿然的調用線程的interrupt方法,以為這樣就能取消線程。對于以線程提供服務的程序模塊而言,它應該封裝取消/關閉操作,提供單獨的取消/關閉方法給調用者。Java并發庫的一些代碼就提供了單獨的取消/關閉方法,比如說,Future接口提供了如下方法以取消任務:boolean cancel(boolean mayInterruptIfRunning)。再比如ExecutorService提供的兩個關閉方法:void shutdown()和 List shutdownNow()。
處理不可中斷的阻塞
對于以下幾種情況,中斷請求只能設置線程的中斷狀態,除此之外沒有其他任何作用:
- Java.io包中的同步Socket I/O:雖然InputStream和OutputStream中的read和write等方法都不會響應中斷,但通過關閉底層的套接字,可以使得由于執行read或write等方法而被阻塞的線程拋出一個SocketException。
- Java.io包中的同步I/O:當中斷一個在InterruptibleChannel上等待的線程時會拋出ClosedByInterrptException并關閉鏈路。當關閉一個InterruptibleChannel時,將導致所有在鏈路操作上阻塞的線程都拋出AsynchronousCloseException。
- Selector的異步I/O:如果一個線程在調用Selector.select方法時阻塞了,那么調用close或wakeup方法會使線程拋出ClosedSelectorException并提前返回。
- 獲得某個鎖:如果一個線程由于等待某個內置鎖而阻塞,那么將無法響應中斷,因為線程認為它肯定會獲得鎖,所以將不會理會中斷請求,但是在Lock類中提供了lockInterruptibly方法,該方法允許在等待一個鎖的同時仍能響應中斷。
線程池
線程池的作用:利用線程池管理并復用線程、控制最大并發數等;實現任務線程隊列緩存策略和飽和策略;實現某些與時間相關的功能,如定時執行、周期執行等;隔離線程環境。可以通過ThreadPoolExecutor來創建一個線程池:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)corePoolSize 表示常駐核心線程數。如果等于0,則任務執行完之后,沒有任何請求進入時銷毀線程池的線程;如果大于0,即使本地任務執行完畢,核心線程也不會被銷毀(除非allowCoreThreadTimeOut設置為true)。向線程池提交一個任務時,若線程池已創建的線程數小于corePoolSize,即便此時存在空閑線程,也會通過創建一個新線程來執行該任務,直到已創建的線程數大于或等于corePoolSize時,才會根據是否存在空閑線程,來決定是否需要創建新的線程。除了利用提交新任務來創建和啟動線程(按需構造),也可以通過 prestartCoreThread() 或 prestartAllCoreThreads() 方法來提前啟動線程池中的基本線程。
maximumPoolSize 表示線程池能夠容納同時執行的最大線程數(>0)。線程池中允許的最大線程數。線程池的阻塞隊列滿了之后,如果還有任務提交,如果當前的線程數小于maximumPoolSize,則會新建線程來執行任務。注意,如果使用的是無界隊列,該參數也就沒有什么效果了。如果maximumPoolSize與coolPoolSize相等,即是固定大小線程池。
keepAliveTime 表示線程池中的線程空閑時間,當空閑時間到達keepAliveTime值時,線程會被銷毀,直到只剩下corePoolSize個線程為止。在默認情況下,當線程池的線程數大于corePoolSize時,這個參數才會起作用。但是當ThreadPoolExecutor的allowCoreThreadTimeOut變量設置為true時,核心線程超時后也會被回收,可以通過ThreadPoolExecutor.allowCoreThreadTimeOut(boolean value)設置。
unit 表示時間單位。keepAliveTime的時間單位通常是TimeUnit.SECONDS。
workQueue 表示緩存隊列。對于無界隊列,可以忽略該參數。
- ArrayBlockingQueue:基于數組結構的有界阻塞隊列,FIFO。
- LinkedBlockingQueue:基于鏈表結構的有界/無界阻塞隊列,FIFO。
- SynchronousQueue:不存儲元素的阻塞隊列,每個插入操作都必須等待一個移出操作,反之亦然。
- PriorityBlockingQueue:具有優先級的無界阻塞隊列。
threadFactory 用于創建新線程。由同一個threadFactory創建的線程,屬于同一個ThreadGroup,創建的線程優先級都為Thread.NORM_PRIORITY,以及是非守護進程狀態。threadFactory創建的線程也是采用new Thread()方式,threadFactory創建的線程名都具有統一的風格:pool-m-thread-n(m為線程池的編號,n為線程池內的線程編號)
handler 表示執行飽和策略的對象。當超過workQueue的任務緩存區上限的時候,就可以通過該策略處理請求。可以實現自己的拒絕策略,例如記錄日志等等,實現RejectedExecutionHandler接口即可。可以拒絕策略有4種:
a. AbortPolicy:直接拋出異常RejectedExecutionException,默認策略
b. CallerRunsPolicy:調用者所在線程來運行該任務,此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度。
c. DiscardPolicy:直接丟棄任務
d. DiscardOldestPolicy:如果執行程序尚未關閉,則位于工作隊列頭部的任務將被刪除,然后重新嘗試執行任務(如果再次失敗,則重復此過程)。
可以使用兩個方法向線程池提交任務,分別為execute()和submit()方法。execute()方法用于提交不需要返回值的任務,所以無法判斷任務是否被線程池執行成功。submit()方法用于提交需要返回值的任務,線程池會返回一個Future類型的對象,通過這個對象可以判斷任務是否執行成功。如Future future = executor.submit(task);
利用線程池提供的參數進行監控,參數如下:
- getTaskCount():線程池需要執行的任務數量。
- getCompletedTaskCount():線程池在運行過程中已完成的任務數量,小于或等于taskCount。
- getLargestPoolSize():線程池曾經創建過的最大線程數量,通過這個數據可以知道線程池是否滿過。如等于線程池的最大大小,則表示線程池曾經滿了。
- getPoolSize():線程池的線程數量。如果線程池不銷毀的話,池里的線程不會自動銷毀,所以這個大小只增不減。
- getActiveCount():獲取活動的線程數。
shutdown和shutdownNow
可以調用線程池的shutdown或者shutdownNow方法來關閉線程池。shutdown和shutdownNow的區別:1. 當線程池調用該方法時,線程池的狀態則立刻變成SHUTDOWN狀態。此時,則不能再往線程池中添加任何任務,否則將會拋出RejectedExecutionException異常。但是,此時線程池不會立刻退出,直到添加到線程池中的任務都已經處理完成,才會退出。2. 執行該方法,線程池的狀態立刻變成STOP狀態,并試圖停止所有正在執行的線程,不再處理還在池隊列中等待的任務,當然,它會返回那些未執行的任務。它試圖終止線程的方法是通過調用Thread.interrupt()方法來實現的,但是大家知道,這種方法的作用有限,如果線程中沒有sleep 、wait、Condition、定時鎖等應用, interrupt()方法是無法中斷當前的線程的。所以,shutdownNow()并不代表線程池就一定立即就能退出,它可能必須要等待所有正在執行的任務都執行完成了才能退出。
擴展ThreadPoolExecutor
可以通過繼承線程池來自定義線程池,重寫線程池的beforeExecute, afterExecute和terminated方法。在執行任務的線程中將調用beforeExecute和afterExecute等方法,在這些方法中還可以添加日志、計時、監視或者統計信息收集的功能。無論任務是從run中正常返回,還是拋出一個異常而返回,afterExecute都會被調用。如果任務在完成后帶有一個Error,那么就不會調用afterExecute。如果beforeExecute拋出一個RuntimeException,那么任務將不被執行,并且afterExecute也不會被調用。在線程池完成關閉時調用terminated,也就是在所有任務都已經完成并且所有工作者線程也已經關閉后,terminated可以用來釋放Executor在其生命周期里分配的各種資源,此外還可以執行發送通知、記錄日志或者手機finalize統計等操作。
合理地配置線程池
需要針對具體情況而具體處理,不同的任務類別應采用不同規模的線程池,任務類別可劃分為CPU密集型任務、IO密集型任務和混合型任務。
- 對于CPU密集型任務:線程池中線程個數應盡量少,不應大于CPU核心數;
- 對于IO密集型任務:由于IO操作速度遠低于CPU速度,那么在運行這類任務時,CPU絕大多數時間處于空閑狀態,那么線程池可以配置盡量多些的線程,以提高CPU利用率;
- 對于混合型任務:可以拆分為CPU密集型任務和IO密集型任務,當這兩類任務執行時間相差無幾時,通過拆分再執行的吞吐率高于串行執行的吞吐率,但若這兩類任務執行時間有數據級的差距,那么沒有拆分的意義。
對于計算密集型的任務,在擁有Ncpu個處理器的系統上,當線程池的大小為Ncpu+1時,通常能實現最優的利用率。(即使當計算密集型的線程偶爾由于頁缺失故障或者其他原因而暫停時,這個“額外”的線程也能確保CPU的時鐘周期不會被浪費)對于包含IO操作或者其它組she操作的任務,由于線程并不會一直執行,因此線程池的規模應該更大。要正確地設置線程池的大小,你必須估算出任務的等待時間和計算時間的比值。
Ncpu = number of CPUs
Ucpu = target CPU utilization, 0<=Ucpu<=1
W/C = ratio of wait time to compute time
要使處理器達到期望的使用率,線程池的最優大小等于
Nthreads = Ncpu * Ucpu * (1+W/C)
Amdahl定律
在增加計算資源的情況下,程序在理論上能夠實現最高加速比,這個取決于程序中可并行組件與串行組件所占的比重。假定F是必須被串行執行的部分,那么根據Amdahl定律,在包含N個處理器的機器中,最高加速比為
Speedup<= 1/(F+(1-F)/N)
提交任務
ThreadPoolExecutor中可以使用兩個方法向線程池提交任務,分別是execute()和submit()方法。execute()方法用于提交不需要返回值的任務,所以無法判斷任務是否被線程池執行成功。submit()方法用于提交需要返回值的任務,線程池會返回一個Future類型的對象,通過這個Future對象可以判斷任務是否執行成功,并且可以通過Future的get()方法來獲取返回值,get()方法會阻塞當前線程直到任務完成,而get(long timeout, TimeUnit unit)方法則會阻塞當前線程一段時間后立即返回,這個時候又可能任務還沒有執行完。
Future & FutureTask
FutureTask表示的計算是通過Callable來實現的,相當于一種可生產結果的Runnable,并且可以處于以下3種狀態:等待運行、正在運行和運行完成。運行表示計算的所有可能結束方式,包括正常結束、由于取消而結束和由于異常而結束等。當FutureTask進入完成狀態后,它會永遠停止在這個狀態上。Future.get的行為取決于任務的狀態,如果任務已經完成,那么get會立刻返回結果,否則get將阻塞直到任務進入完成狀態,然后返回結果或者異常。FutureTask的使用方式如下:
public class Preloader {//method1private final static FutureTask<Object> future = new FutureTask<Object>(new Callable<Object>(){@Overridepublic Object call() throws Exception{return "yes";}});//method2static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());private static final Future<Object> futureExecutor = executor.submit(new Callable<Object>(){@Overridepublic Object call() throws Exception{return "no";}}); public static void main(String[] args) throws InterruptedException, ExecutionException{executor.shutdown();future.run();System.out.println(future.get());System.out.println(futureExecutor.get());} }運行結果:yes no 。Callable表示的任務可以拋出受檢查或未受檢查的異常,并且任何代碼都可能拋出一個Error.無論任務代碼拋出什么異常,都會被封裝到一個ExecutionException中,并在Future.get中被重新拋出。
通過Future來實現取消
ExecutorService.submit將返回一個Future來描述任務。Future擁有一個cancel方法,該方法帶有一個boolean類型的參數mayInterruptIfRunning。如果mayInterruptIfRunning為true并且任務當前正在某個線程運行,那么這個線程能被中斷。如果這個參數為false,那么意味著“若任務還沒啟動,就不要運行它”(取消還沒有開始的任務),這種方式應該用于那些不處理中斷的任務中。當Future.get拋出InterruptedException或TimeoutException時,如果你知道不再需要結果,那么就可以調用Future.cancel來取消任務。
Executors
newFixedThreadPool:創建一個固定長度的線程池,每當提交一個任務時就創建一個線程,直到達到線程池的最大數量,這時線程池的規模將不再變化(如果某個線程由于發生了未預期的Exception而結束,那么線程池會補充一個新的線程)。(LinkedBlockingQueue)
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); }newCachedThreadPool:創建一個可換成的線程池,如果線程池的當前規模超過了處理需求時,那么將回收空閑的線程,而當需求增加時,則可以添加新的線程,線程池的規模不存在任何限制。(SynchronousQueue)
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>()); }SynchronousQueue是一個沒有元素的阻塞隊列,加上corePool = 0 ,maximumPoolSize = Integer.MAX_VALUE,這樣就會存在一個問題,如果主線程提交任務的速度遠遠大于CachedThreadPool的處理速度,則CachedThreadPool會不斷地創建新線程來執行任務,這樣有可能會導致系統耗盡CPU和內存資源,所以在使用該線程池是,一定要注意控制并發的任務數,否則創建大量的線程可能導致嚴重的性能問題。
newSingleThreadExecutor:是一個單線程的Executor,它創建單個工作者線程來執行任務,如果這個線程異常結束,會創建另一個線程來替代。能確保一組任務在隊列中的順序來串行執行。(LinkedBlockingQueue)
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>())); }newWorkStealingPool:使用ForkJoin實現的線程池。
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
newScheduledThreadPool:創建了一個固定長度的線程池,而且以延遲或者定時的方式來執行任務,類似于Timer。
ScheduledThreadPoolExecutor
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService在ScheduledThreadPoolExecutor的構造函數中,我們發現它都是利用ThreadLocalExecutor來構造的,唯一變動的地方就在于它所使用的阻塞隊列變成了DelayedWorkQueue,而不是ThreadPoolExecutor的LinkedBlockingQueue(通過Executors產生ThreadPoolExecutor對象)。DelayedWorkQueue為ScheduledThreadPoolExecutor中的內部類,它其實和阻塞隊列DelayQueue有點兒類似。DelayQueue是可以提供延遲的阻塞隊列,它只有在延遲期滿時才能從中提取元素,其列頭是延遲期滿后保存時間最長的Delayed元素。如果延遲都還沒有期滿,則隊列沒有頭部,并且poll 將返回null。DelayedWorkQueue中的任務必然是按照延遲時間從短到長來進行排序的。
ScheduledThreadPoolExecutor提供了如下四個方法,也就是四個調度器:
Timer的使用
JDK中的Timer類主要負責計劃任務的功能,也就是在指定時間開始執行某一任務。Timer類的主要作用就是設置計劃任務,但封裝任務的類卻是TimerTask類(public abstract class TimerTask extends Object implements Runnable)。可以通過new Timer(true)設置為后臺線程。有以下幾個方法:
- void schedule(TimerTask task, Date time):在指定的日期執行某一次任務。如果執行任務的時間早于當前時間則立刻執行。
- void schedule(TimerTask task, Date firstTime, long period):在指定的日期之后,按指定的間隔周期性地無限循環地執行某一任務。如果執行任務的時間早于當前時間則立刻執行。
- void schedule(TimerTask task, long delay):以當前時間為參考時間,在此基礎上延遲指定的毫秒數后執行一次TimerTask任務。
- void schedule(TimerTask task, long delay, long period):以當前時間為參考時間,在此基礎上延遲指定的毫秒數,再以某一間隔無限次數地執行某一任務。
- void scheduleAtFixedRate(TimerTask task, Date firstTime, long period):下次執行任務時間參考上次任務的結束時間,且具有“追趕性”。
TimerTask是以隊列的方式一個一個被順序執行的,所以執行的時間有可能和預期的時間不一致,因為前面的任務有可能消耗的時間較長,則后面的任務運行時間也會被延遲。TimerTask類中的cancel方法的作用是將自身從任務隊列中清除。Timer類中的cancel方法的作用是將任務隊列中的全部任務清空,并且進程被銷毀。
Timer的缺陷:Timer支持基于絕對時間而不是相對時間的調度機制,因此任務的執行對系統時鐘變化很敏感,而ScheduledThreadPoolExecutor只支持相對時間的調度。Timer在執行所有定時任務時只會創建一個線程。如果某個任務的執行時間過長,那么將破壞其他TimerTask的定時精確性。Timer的另一個問題是,如果TimerTask拋出了一個未檢查的異常,那么Timer將表現出糟糕的行為。Timer線程并不波或異常,因此當TimerTask拋出為檢測的異常時將終止定時線程。JDK5或者更高的JDK中已經很少使用Timer.
歡迎跳轉到本文的原文鏈接:https://honeypps.com/java/java-concurrent-note-list-new3/
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
總結
以上是生活随笔為你收集整理的多线程小抄集(新编三)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 多线程小抄集(新编二)
- 下一篇: 多线程小抄集(新编四)