Java沒有提供任何機制來安全地終止線程,雖然Thread.stop和suspend等方法提供了這樣的機制,但是存在嚴重的缺陷,應該避免使用這些方法。但是Java提供了中斷Interruption機制,這是一種協作機制,能夠使一個線程終止另一個線程的當前工作。
這種協作方式是必要的,我們很少希望某個任務線程或者服務立即停止,因為這種立即停止會時某個共享的數據結構處于不一致的狀態。相反,在編寫任務和服務的時候可以使用一種協作方式:當需要停止的時候,它們會先清除當前正在執行的工作,然后再結束。
?
7.1 ?任務取消
如果外部代碼能夠在某個操作正常完成之前將其置于 完成 狀態,那么這個操作就可以稱為可取消的Cancellable
其中一種協作機制是設置一個取消標志Cancellation Requested標志,而任務定期查看該標志。
Java代碼??
@ThreadSafe??public?class?PrimeGenerator?implements?Runnable?{??????private?static?ExecutorService?exec?=?Executors.newCachedThreadPool();????????@GuardedBy("this")??????private?final?List<BigInteger>?primes?=?new?ArrayList<BigInteger>();??????private?volatile?boolean?cancelled;????????public?void?run()?{??????????BigInteger?p?=?BigInteger.ONE;??????????while?(!cancelled)?{??????????????p?=?p.nextProbablePrime();??????????????synchronized?(this)?{??????????????????primes.add(p);??????????????}??????????}??????}????????public?void?cancel()?{??????????cancelled?=?true;??????}????????public?synchronized?List<BigInteger>?get()?{??????????return?new?ArrayList<BigInteger>(primes);??????}????????static?List<BigInteger>?aSecondOfPrimes()?throws?InterruptedException?{??????????PrimeGenerator?generator?=?new?PrimeGenerator();??????????exec.execute(generator);??????????try?{??????????????SECONDS.sleep(1);??????????}?finally?{??????????????generator.cancel();??????????}??????????return?generator.get();??????}??}?? ?在Java的API或語言規范中,并沒有將中斷與任何語義關聯起來,但實際上,如果在取消之外的其他操作中使用中斷,那么都是不合適的,并且很難支撐起更大的應用。
?
每個線程都有一個boolean類型的中斷狀態。但中斷線程時,這個線程的中斷狀態將被設置成true。
?
Thread中的三個方法:
public void interrupt() ? 中斷一個線程
public boolean isInterrupted() ?獲取中斷標志,判斷是否中斷
public static boolean interrupted() ?清楚中斷狀態,并返回它之前的狀態值
?
線程在阻塞狀態下發生中斷的時候會拋出InterruptedException,例如Thread.sleep(), Thread.wait(), Thread.join()等方法。
當線程在非阻塞狀態下中斷的時候,它的中斷狀態將被設置,然后根據檢查中斷狀態來判斷是否中斷。
?
調用interrupt并不意味著立即停止目標線程正在進行的工作,而只是傳遞了請求中斷的消息,換句話說,僅僅修改了線程的isInterrupted標志字段。
?
通常,中斷時實現取消的最合理方式。
Java代碼??
public?class?PrimeProducer?extends?Thread?{??????private?final?BlockingQueue<BigInteger>?queue;????????PrimeProducer(BlockingQueue<BigInteger>?queue)?{??????????this.queue?=?queue;??????}????????public?void?run()?{??????????try?{??????????????BigInteger?p?=?BigInteger.ONE;??????????????while?(!Thread.currentThread().isInterrupted())??????????????????queue.put(p?=?p.nextProbablePrime());??????????}?catch?(InterruptedException?consumed)?{??????????????????????}??????}????????public?void?cancel()?{??????????interrupt();??????}??}?? ?
7.1.3 ?響應中斷
只有實現了線程中斷策略的代碼才可以屏蔽中斷請求,在常規任務和庫代碼中都不應該屏蔽中斷請求。
兩種方法響應中斷:
* 傳遞異常InterruptedException * 恢復中斷狀態,從而使調用棧中上層代碼能夠對其進行處理 不可取消的任務在退出前恢復中斷標志 Java代碼??
public?class?NoncancelableTask?{??????public?Task?getNextTask(BlockingQueue<Task>?queue)?{??????????boolean?interrupted?=?false;??????????try?{??????????????while?(true)?{??????????????????try?{??????????????????????return?queue.take();??????????????????}?catch?(InterruptedException?e)?{??????????????????????interrupted?=?true;??????????????????????????????????????}??????????????}??????????}?finally?{??????????????if?(interrupted)??????????????????Thread.currentThread().interrupt();??????????}??????}????????interface?Task?{??????}??}?? ? 7.1.5 ?定時任務,通過Future來實現取消: 除非你清楚線程的中斷策略,否則不要中斷線程,那么在神馬情況下調用cancel可以將參數指定為true呢。 如果任務的線程是由標準的Executor創建的,那么可以設置mayInterruptIfRunning。 Java代碼??
public?class?TimedRun?{??????private?static?final?ExecutorService?taskExec?=?Executors.newCachedThreadPool();????????public?static?void?timedRun(Runnable?r,?long?timeout,?TimeUnit?unit)??????????????throws?InterruptedException?{??????????Future<?>?task?=?taskExec.submit(r);??????????try?{??????????????task.get(timeout,?unit);??????????}?catch?(TimeoutException?e)?{??????????????????????}?catch?(ExecutionException?e)?{??????????????????????????throw?launderThrowable(e.getCause());??????????}?finally?{??????????????????????????task.cancel(true);?????????}??????}??}?? ? 7.1.6 ?處理不可中斷的阻塞 對于這些線程,中斷請求只能設置線程的中斷狀態,除此之外沒有其他任何作用。 我們可以使用類似中斷的手段來停止這些線程,但這要求知道線程阻塞的原因。 通過newTaskFor將非標準的取消操作封裝在一個任務中: Java代碼??
public?abstract?class?SocketUsingTask<T>?implements?CancellableTask<T>?{??????@GuardedBy("this")??????private?Socket?socket;????????protected?synchronized?void?setSocket(Socket?s)?{??????????socket?=?s;??????}????????public?synchronized?void?cancel()?{??????????try?{??????????????if?(socket?!=?null)??????????????????socket.close();??????????}?catch?(IOException?ignored)?{??????????}??????}????????public?RunnableFuture<T>?newTask()?{??????????return?new?FutureTask<T>(this)?{??????????????public?boolean?cancel(boolean?mayInterruptIfRunning)?{??????????????????try?{??????????????????????SocketUsingTask.this.cancel();??????????????????}?finally?{??????????????????????return?super.cancel(mayInterruptIfRunning);??????????????????}??????????????}??????????};??????}??}??????interface?CancellableTask<T>?extends?Callable<T>?{??????void?cancel();????????RunnableFuture<T>?newTask();??}?? ?
?7.2 ?停止基于線程的服務
應用程序通常會創建擁有多個線程的服務,如果應用程序準備退出,那么這些服務所擁有的線程也需要正確的結束,由于java沒有搶占式方法停止線程,因此需要它們自行結束。
正確的封裝原則是:除非擁有某個線程,否則不要對該線程進行操控,例如中斷線程或者修改優先級等。
?
線程有個相應的所有者,即創建該線程的類,因此線程池是其工作者線程的所有者,如果要中斷線程,那么應該使用線程池去中斷。
線程的所有權是不可傳遞的。服務應該提供生命周期方法Lifecycle Method來關閉它自己以及它所擁有的線程。這樣當應用程序關閉該服務的時候,服務就可以關閉所有的線程了。在ExecutorService中提供了shutdown和shutdownNow等方法,同樣,在其他擁有線程的服務方法中也應該提供類似的關閉機制。
Tips:對于持有線程的服務,只要服務的存在時間大于創建線程的方法的存在時間,那么就應該提供生命周期方法。
?
7.2.1 ?示例:日志服務
我們通常會在應用程序中加入log信息,一般用的框架就是log4j。但是這種內聯的日志功能會給一些高容量Highvolume應用程序帶來一定的性能開銷。另外一種替代方法是通過調用log方法將日志消息放入某個隊列中,并由其他線程來處理。
Java代碼??
public?class?LogService?{??????private?final?BlockingQueue<String>?queue;??????private?final?LoggerThread?loggerThread;??????private?final?PrintWriter?writer;??????@GuardedBy("this")??????private?boolean?isShutdown;??????@GuardedBy("this")??????private?int?reservations;????????public?LogService(Writer?writer)?{??????????this.queue?=?new?LinkedBlockingQueue<String>();??????????this.loggerThread?=?new?LoggerThread();??????????this.writer?=?new?PrintWriter(writer);??????}????????public?void?start()?{??????????loggerThread.start();??????}????????public?void?stop()?{??????????synchronized?(this)?{??????????????isShutdown?=?true;??????????}??????????loggerThread.interrupt();??????}????????public?void?log(String?msg)?throws?InterruptedException?{??????????synchronized?(this)?{??????????????if?(isShutdown)??????????????????throw?new?IllegalStateException(????????????++reservations;??????????}??????????queue.put(msg);??????}????????private?class?LoggerThread?extends?Thread?{??????????public?void?run()?{??????????????try?{??????????????????while?(true)?{??????????????????????try?{??????????????????????????synchronized?(LogService.this)?{??????????????????????????????if?(isShutdown?&&?reservations?==?0)??????????????????????????????????break;??????????????????????????}??????????????????????????String?msg?=?queue.take();??????????????????????????synchronized?(LogService.this)?{??????????????????????????????--reservations;??????????????????????????}??????????????????????????writer.println(msg);??????????????????????}?catch?(InterruptedException?e)?{?????????????????????}??????????????????}??????????????}?finally?{??????????????????writer.close();??????????????}??????????}??????}??}?? ?
7.2.2 ?通過ExecutorService去關閉
簡單的程序可以直接在main函數中啟動和關閉全局的ExecutorService,而在復雜程序中,通常會將ExecutorService封裝在某個更高級別的服務中,并且該服務提供自己的生命周期方法。下面我們利用ExecutorService重構上面的日志服務:
Java代碼??
public?class?LogService?{??????public?void?stop()?throws?InterruptedException?{??????????try?{??????????????exec.shutdown();?exec.awaitTermination(TIMEOUT,?UNIT);??????????}??????}??}?? ?
7.2.3 ?利用Poison Pill對象關閉Producer-Consumer服務
?
7.2.5 ?當關閉線程池的時候,保存尚未開始的和開始后取消的任務數據,以備后面重新處理,下面是一個網頁爬蟲程序,關閉爬蟲服務的時候將記錄所有尚未開始的和已經取消的所有頁面URL:
Java代碼??
public?abstract?class?WebCrawler?{??????private?volatile?TrackingExecutor?exec;??????@GuardedBy("this")??????private?final?Set<URL>?urlsToCrawl?=?new?HashSet<URL>();????????private?final?ConcurrentMap<URL,?Boolean>?seen?=?new?ConcurrentHashMap<URL,?Boolean>();??????private?static?final?long?TIMEOUT?=?500;??????private?static?final?TimeUnit?UNIT?=?MILLISECONDS;????????public?WebCrawler(URL?startUrl)?{??????????urlsToCrawl.add(startUrl);??????}????????public?synchronized?void?start()?{??????????exec?=?new?TrackingExecutor(Executors.newCachedThreadPool());??????????for?(URL?url?:?urlsToCrawl)?submitCrawlTask(url);??????????urlsToCrawl.clear();??????}????????public?synchronized?void?stop()?throws?InterruptedException?{??????????try?{??????????????saveUncrawled(exec.shutdownNow());??????????????if?(exec.awaitTermination(TIMEOUT,?UNIT))??????????????????saveUncrawled(exec.getCancelledTasks());??????????}?finally?{??????????????exec?=?null;??????????}??????}????????protected?abstract?List<URL>?processPage(URL?url);????????private?void?saveUncrawled(List<Runnable>?uncrawled)?{??????????for?(Runnable?task?:?uncrawled)??????????????urlsToCrawl.add(((CrawlTask)?task).getPage());??????}????????private?void?submitCrawlTask(URL?u)?{??????????exec.execute(new?CrawlTask(u));??????}????????private?class?CrawlTask?implements?Runnable?{??????????private?final?URL?url;????????????CrawlTask(URL?url)?{??????????????this.url?=?url;??????????}????????????private?int?count?=?1;????????????boolean?alreadyCrawled()?{??????????????return?seen.putIfAbsent(url,?true)?!=?null;??????????}????????????void?markUncrawled()?{??????????????seen.remove(url);??????????????System.out.printf("marking?%s?uncrawled%n",?url);??????????}????????????public?void?run()?{??????????????for?(URL?link?:?processPage(url))?{??????????????????if?(Thread.currentThread().isInterrupted())??????????????????????return;??????????????????submitCrawlTask(link);??????????????}??????????}????????????public?URL?getPage()?{??????????????return?url;??????????}??????}??}?? ?
Java代碼??
public?class?TrackingExecutor?extends?AbstractExecutorService?{??????private?final?ExecutorService?exec;??????private?final?Set<Runnable>?tasksCancelledAtShutdown?=??????????????Collections.synchronizedSet(new?HashSet<Runnable>());????????public?TrackingExecutor(ExecutorService?exec)?{??????????this.exec?=?exec;??????}????????public?void?shutdown()?{??????????exec.shutdown();??????}????????public?List<Runnable>?shutdownNow()?{??????????return?exec.shutdownNow();??????}????????public?boolean?isShutdown()?{??????????return?exec.isShutdown();??????}????????public?boolean?isTerminated()?{??????????return?exec.isTerminated();??????}????????public?boolean?awaitTermination(long?timeout,?TimeUnit?unit)??????????????throws?InterruptedException?{??????????return?exec.awaitTermination(timeout,?unit);??????}????????public?List<Runnable>?getCancelledTasks()?{??????????if?(!exec.isTerminated())??????????????throw?new?IllegalStateException(????????return?new?ArrayList<Runnable>(tasksCancelledAtShutdown);??????}????????public?void?execute(final?Runnable?runnable)?{??????????exec.execute(new?Runnable()?{??????????????public?void?run()?{??????????????????try?{??????????????????????runnable.run();??????????????????}?finally?{??????????????????????if?(isShutdown()??????????????????????????????&&?Thread.currentThread().isInterrupted())??????????????????????????tasksCancelledAtShutdown.add(runnable);??????????????????}??????????????}??????????});??????}??}?? ?
7.3 ?處理非正常的線程終止
通過給應用程序提供一個UncaughtExceptionHandler異常處理器來處理未捕獲的異常:
Java代碼??
public?class?UEHLogger?implements?Thread.UncaughtExceptionHandler?{??????public?void?uncaughtException(Thread?t,?Throwable?e)?{??????????Logger?logger?=?Logger.getAnonymousLogger();??????????logger.log(Level.SEVERE,?"Thread?terminated?with?exception:?"?+?t.getName(),?e);??????}??}?? ?只有通過execute提交的任務,才能將它拋出的異常交給未捕獲異常處理器。而通過submit提交的任務,無論是拋出未檢查異常還是已檢查異常,都將被認為是任務返回狀態的一部分
?
7.4 ?JVM關閉的時候提供關閉鉤子
在正常關閉JVM的時候,JVM首先調用所有已注冊的關閉鉤子Shutdown Hook。關閉鉤子可以用來實現服務或者應用程序的清理工作,例如刪除臨時文件,或者清除無法由操作系統自動清除的資源。
最佳實踐是對所有服務都使用同一個關閉鉤子,并且在該關閉鉤子中執行一系列的關閉操作。這確保了關閉操作在單個線程中串行執行,從而避免競態條件的發生或者死鎖問題。
Java代碼??
Runtime.getRuntime().addShutdownHook(new?Thread()?{??????public?void?run()?{??????????try{LogService.this.stop();}?catch(InterruptedException)?{..}??????}??})?? ?
總結:在任務、線程、服務以及應用程序等模塊中的生命周期結束問題,可能會增加它們在設計和實現的時候的復雜性。我們通過利用FutureTask和Executor框架,可以幫助我們構建可取消的任務和服務。
?
原文地址:http://yidao620c.iteye.com/blog/1856914
轉載于:https://www.cnblogs.com/davidwang456/p/4086389.html
總結
以上是生活随笔為你收集整理的java 并发编程第七章:取消和关闭的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。