关于 java.util.concurrent 您不知道的 5 件事--转
第 1 部分
http://www.ibm.com/developerworks/cn/java/j-5things4.html
Concurrent Collections 是 Java? 5 的巨大附加產品,但是在關于注釋和泛型的爭執中很多 Java 開發人員忽視了它們。此外(或者更老實地說),許多開發人員避免使用這個數據包,因為他們認為它一定很復雜,就像它所要解決的問題一樣。
事實上,java.util.concurrent?包含許多類,能夠有效解決普通的并發問題,無需復雜工序。閱讀本文,了解?java.util.concurrent?類,比如?CopyOnWriteArrayList?和BlockingQueue?如何幫助您解決多線程編程的棘手問題。
1. TimeUnit
盡管本質上?不是 Collections 類,但?java.util.concurrent.TimeUnit?枚舉讓代碼更易讀懂。使用?TimeUnit?將使用您的方法或 API 的開發人員從毫秒的 “暴政” 中解放出來。
TimeUnit?包括所有時間單位,從?MILLISECONDS?和?MICROSECONDS?到?DAYS?和?HOURS,這就意味著它能夠處理一個開發人員所需的幾乎所有的時間范圍類型。同時,因為在列舉上聲明了轉換方法,在時間加快時,將?HOURS?轉換回?MILLISECONDS?甚至變得更容易。
回頁首
2. CopyOnWriteArrayList
創建數組的全新副本是過于昂貴的操作,無論是從時間上,還是從內存開銷上,因此在通常使用中很少考慮;開發人員往往求助于使用同步的ArrayList。然而,這也是一個成本較高的選擇,因為每當您跨集合內容進行迭代時,您就不得不同步所有操作,包括讀和寫,以此保證一致性。
這又讓成本結構回到這樣一個場景:需多讀者都在讀取?ArrayList,但是幾乎沒人會去修改它。
CopyOnWriteArrayList?是個巧妙的小寶貝,能解決這一問題。它的 Javadoc 將?CopyOnWriteArrayList?定義為一個 “ArrayList?的線程安全變體,在這個變體中所有易變操作(添加,設置等)可以通過復制全新的數組來實現”。
集合從內部將它的內容復制到一個沒有修改的新數組,這樣讀者訪問數組內容時就不會產生同步成本(因為他們從來不是在易變數據上操作)。
本質上講,CopyOnWriteArrayList?很適合處理?ArrayList?經常讓我們失敗的這種場景:讀取頻繁,但很少有寫操作的集合,例如 JavaBean 事件的?Listeners。
回頁首
3. BlockingQueue
BlockingQueue?接口表示它是一個?Queue,意思是它的項以先入先出(FIFO)順序存儲。在特定順序插入的項以相同的順序檢索 — 但是需要附加保證,從空隊列檢索一個項的任何嘗試都會阻塞調用線程,直到這個項準備好被檢索。同理,想要將一個項插入到滿隊列的嘗試也會導致阻塞調用線程,直到隊列的存儲空間可用。
BlockingQueue?干凈利落地解決了如何將一個線程收集的項“傳遞”給另一線程用于處理的問題,無需考慮同步問題。Java Tutorial 的 Guarded Blocks 試用版就是一個很好的例子。它構建一個單插槽綁定的緩存,當新的項可用,而且插槽也準備好接受新的項時,使用手動同步和wait()/notifyAll()?在線程之間發信。(詳見?Guarded Blocks 實現。)
盡管 Guarded Blocks 教程中的代碼有效,但是它耗時久,混亂,而且也并非完全直觀。退回到 Java 平臺較早的時候,沒錯,Java 開發人員不得不糾纏于這種代碼;但現在是 2010 年 — 情況難道沒有改善?
清單 1 顯示了 Guarded Blocks 代碼的重寫版,其中我使用了一個?ArrayBlockingQueue,而不是手寫的?Drop。
清單 1. BlockingQueue
import java.util.*; import java.util.concurrent.*;class Producerimplements Runnable {private BlockingQueue<String> drop;List<String> messages = Arrays.asList("Mares eat oats","Does eat oats","Little lambs eat ivy","Wouldn't you eat ivy too?");public Producer(BlockingQueue<String> d) { this.drop = d; }public void run(){try{for (String s : messages)drop.put(s);drop.put("DONE");}catch (InterruptedException intEx){System.out.println("Interrupted! " + "Last one out, turn out the lights!");}} }class Consumerimplements Runnable {private BlockingQueue<String> drop;public Consumer(BlockingQueue<String> d) { this.drop = d; }public void run(){try{String msg = null;while (!((msg = drop.take()).equals("DONE")))System.out.println(msg);}catch (InterruptedException intEx){System.out.println("Interrupted! " + "Last one out, turn out the lights!");}} }public class ABQApp {public static void main(String[] args){BlockingQueue<String> drop = new ArrayBlockingQueue(1, true);(new Thread(new Producer(drop))).start();(new Thread(new Consumer(drop))).start();} }ArrayBlockingQueue?還體現了“公平” — 意思是它為讀取器和編寫器提供線程先入先出訪問。這種替代方法是一個更有效,但又冒窮盡部分線程風險的政策。(即,允許一些讀取器在其他讀取器鎖定時運行效率更高,但是您可能會有讀取器線程的流持續不斷的風險,導致編寫器無法進行工作。)
注意 Bug!
順便說一句,如果您注意到 Guarded Blocks 包含一個重大 bug,那么您是對的 — 如果開發人員在?main()?中的Drop?實例上同步,會出現什么情況呢?
BlockingQueue?還支持接收時間參數的方法,時間參數表明線程在返回信號故障以插入或者檢索有關項之前需要阻塞的時間。這么做會避免非綁定的等待,這對一個生產系統是致命的,因為一個非綁定的等待會很容易導致需要重啟的系統掛起。
回頁首
4. ConcurrentMap
Map?有一個微妙的并發 bug,這個 bug 將許多不知情的 Java 開發人員引入歧途。ConcurrentMap?是最容易的解決方案。
當一個?Map?被從多個線程訪問時,通常使用?containsKey()?或者?get()?來查看給定鍵是否在存儲鍵/值對之前出現。但是即使有一個同步的Map,線程還是可以在這個過程中潛入,然后奪取對?Map?的控制權。問題是,在對?put()?的調用中,鎖在?get()?開始時獲取,然后在可以再次獲取鎖之前釋放。它的結果是個競爭條件:這是兩個線程之間的競爭,結果也會因誰先運行而不同。
如果兩個線程幾乎同時調用一個方法,兩者都會進行測試,調用 put,在處理中丟失第一線程的值。幸運的是,ConcurrentMap?接口支持許多附加方法,它們設計用于在一個鎖下進行兩個任務:putIfAbsent(),例如,首先進行測試,然后僅當鍵沒有存儲在?Map?中時進行 put。
回頁首
5. SynchronousQueues
根據 Javadoc,SynchronousQueue?是個有趣的東西:
這是一個阻塞隊列,其中,每個插入操作必須等待另一個線程的對應移除操作,反之亦然。一個同步隊列不具有任何內部容量,甚至不具有 1 的容量。本質上講,SynchronousQueue?是之前提過的?BlockingQueue?的又一實現。它給我們提供了在線程之間交換單一元素的極輕量級方法,使用ArrayBlockingQueue?使用的阻塞語義。在清單 2 中,我重寫了?清單 1?的代碼,使用?SynchronousQueue?替代?ArrayBlockingQueue:
清單 2. SynchronousQueue
import java.util.*; import java.util.concurrent.*;class Producerimplements Runnable {private BlockingQueue<String> drop;List<String> messages = Arrays.asList("Mares eat oats","Does eat oats","Little lambs eat ivy","Wouldn't you eat ivy too?");public Producer(BlockingQueue<String> d) { this.drop = d; }public void run(){try{for (String s : messages)drop.put(s);drop.put("DONE");}catch (InterruptedException intEx){System.out.println("Interrupted! " + "Last one out, turn out the lights!");}} }class Consumerimplements Runnable {private BlockingQueue<String> drop;public Consumer(BlockingQueue<String> d) { this.drop = d; }public void run(){try{String msg = null;while (!((msg = drop.take()).equals("DONE")))System.out.println(msg);}catch (InterruptedException intEx){System.out.println("Interrupted! " + "Last one out, turn out the lights!");}} }public class SynQApp {public static void main(String[] args){BlockingQueue<String> drop = new SynchronousQueue<String>();(new Thread(new Producer(drop))).start();(new Thread(new Consumer(drop))).start();} }實現代碼看起來幾乎相同,但是應用程序有額外獲益:SynchronousQueue?允許在隊列進行一個插入,只要有一個線程等著使用它。
在實踐中,SynchronousQueue?類似于 Ada 和 CSP 等語言中可用的 “會合通道”。這些通道有時在其他環境中也稱為 “連接”,這樣的環境包括 .NET (見?參考資料)。
回頁首
結束語
當 Java 運行時知識庫提供便利、預置的并發性時,為什么還要苦苦掙扎,試圖將并發性導入到您的 Collections 類?本系列的下一篇文章將會進一步探討?java.util.concurrent?名稱空間的內容。
第 2 部分
http://www.ibm.com/developerworks/cn/java/j-5things5.html
并發 Collections 提供了線程安全、經過良好調優的數據結構,簡化了并發編程。然而,在一些情形下,開發人員需要更進一步,思考如何調節和/或限制線程執行。由于?java.util.concurrent?的總體目標是簡化多線程編程,您可能希望該包包含同步實用程序,而它確實包含。
本文是?第 1 部分?的延續,將介紹幾個比核心語言原語(監視器)更高級的同步結構,但它們還未包含在 Collection 類中。一旦您了解了這些鎖和門的用途,使用它們將非常直觀。
關于本系列
您覺得自己懂 Java 編程?事實是,大多數開發人員都只領會到了 Java 平臺的皮毛,所學也只夠應付工作。在本系列?中,Ted Neward 深度挖掘 Java 平臺的核心功能,揭示一些鮮為人知的事實,幫助您解決最棘手的編程困難。
1. Semaphore
在一些企業系統中,開發人員經常需要限制未處理的特定資源請求(線程/操作)數量,事實上,限制有時候能夠提高系統的吞吐量,因為它們減少了對特定資源的爭用。盡管完全可以手動編寫限制代碼,但使用 Semaphore 類可以更輕松地完成此任務,它將幫您執行限制,如清單 1 所示:
清單 1. 使用 Semaphore 執行限制
import java.util.*;import java.util.concurrent.*;public class SemApp {public static void main(String[] args){Runnable limitedCall = new Runnable() {final Random rand = new Random();final Semaphore available = new Semaphore(3);int count = 0;public void run(){int time = rand.nextInt(15);int num = count++;try{available.acquire();System.out.println("Executing " + "long-running action for " + time + " seconds... #" + num);Thread.sleep(time * 1000);System.out.println("Done with #" + num + "!");available.release();}catch (InterruptedException intEx){intEx.printStackTrace();}}};for (int i=0; i<10; i++)new Thread(limitedCall).start();} }即使本例中的 10 個線程都在運行(您可以對運行?SemApp?的 Java 進程執行?jstack?來驗證),但只有 3 個線程是活躍的。在一個信號計數器釋放之前,其他 7 個線程都處于空閑狀態。(實際上,Semaphore?類支持一次獲取和釋放多個?permit,但這不適用于本場景。)
回頁首
2. CountDownLatch
如果?Semaphore?是允許一次進入一個(這可能會勾起一些流行夜總會的保安的記憶)線程的并發性類,那么?CountDownLatch?就像是賽馬場的起跑門柵。此類持有所有空閑線程,直到滿足特定條件,這時它將會一次釋放所有這些線程。
清單 2. CountDownLatch:讓我們去賽馬吧!
import java.util.*; import java.util.concurrent.*;class Race {private Random rand = new Random();private int distance = rand.nextInt(250);private CountDownLatch start;private CountDownLatch finish;private List<String> horses = new ArrayList<String>();public Race(String... names){this.horses.addAll(Arrays.asList(names));}public void run()throws InterruptedException{System.out.println("And the horses are stepping up to the gate...");final CountDownLatch start = new CountDownLatch(1);final CountDownLatch finish = new CountDownLatch(horses.size());final List<String> places = Collections.synchronizedList(new ArrayList<String>());for (final String h : horses){new Thread(new Runnable() {public void run() {try{System.out.println(h + " stepping up to the gate...");start.await();int traveled = 0;while (traveled < distance){// In a 0-2 second period of time....Thread.sleep(rand.nextInt(3) * 1000);// ... a horse travels 0-14 lengthstraveled += rand.nextInt(15);System.out.println(h + " advanced to " + traveled + "!");}finish.countDown();System.out.println(h + " crossed the finish!");places.add(h);}catch (InterruptedException intEx){System.out.println("ABORTING RACE!!!");intEx.printStackTrace();}}}).start();}System.out.println("And... they're off!");start.countDown(); finish.await();System.out.println("And we have our winners!");System.out.println(places.get(0) + " took the gold...");System.out.println(places.get(1) + " got the silver...");System.out.println("and " + places.get(2) + " took home the bronze.");} }public class CDLApp {public static void main(String[] args)throws InterruptedException, java.io.IOException{System.out.println("Prepping...");Race r = new Race("Beverly Takes a Bath","RockerHorse","Phineas","Ferb","Tin Cup","I'm Faster Than a Monkey","Glue Factory Reject");System.out.println("It's a race of " + r.getDistance() + " lengths");System.out.println("Press Enter to run the race....");System.in.read();r.run();} }注意,在?清單 2?中,CountDownLatch?有兩個用途:首先,它同時釋放所有線程,模擬馬賽的起點,但隨后會設置一個門閂模擬馬賽的終點。這樣,“主” 線程就可以輸出結果。 為了讓馬賽有更多的輸出注釋,可以在賽場的 “轉彎處” 和 “半程” 點,比如賽馬跨過跑道的四分之一、二分之一和四分之三線時,添加?CountDownLatch。
回頁首
3. Executor
清單 1?和?清單 2?中的示例都存在一個重要的缺陷,它們要求您直接創建?Thread?對象。這可以解決一些問題,因為在一些 JVM 中,創建Thread?是一項重量型的操作,重用現有?Thread?比創建新線程要容易得多。而在另一些 JVM 中,情況正好相反:Thread?是輕量型的,可以在需要時很容易地新建一個線程。當然,如果 Murphy 擁有自己的解決辦法(他通常都會擁有),那么您無論使用哪種方法對于您最終將部署的平臺都是不對的。
JSR-166 專家組(參見?參考資料)在一定程度上預測到了這一情形。Java 開發人員無需直接創建?Thread,他們引入了?Executor?接口,這是對創建新線程的一種抽象。如清單 3 所示,Executor?使您不必親自對?Thread?對象執行?new?就能夠創建新線程:
清單 3. Executor
Executor exec = getAnExecutorFromSomeplace(); exec.execute(new Runnable() { ... });使用?Executor?的主要缺陷與我們在所有工廠中遇到的一樣:工廠必須來自某個位置。不幸的是,與 CLR 不同,JVM 沒有附帶一個標準的 VM 級線程池。
Executor?類實際上?充當著一個提供?Executor?實現實例的共同位置,但它只有?new?方法(例如用于創建新線程池);它沒有預先創建實例。所以您可以自行決定是否希望在代碼中創建和使用?Executor?實例。(或者在某些情況下,您將能夠使用所選的容器/平臺提供的實例。)
ExecutorService 隨時可以使用
盡管不必擔心?Thread?來自何處,但?Executor?接口缺乏 Java 開發人員可能期望的某種功能,比如結束一個用于生成結果的線程并以非阻塞方式等待結果可用。(這是桌面應用程序的一個常見需求,用戶將執行需要訪問數據庫的 UI 操作,然后如果該操作花費了很長時間,可能希望在它完成之前取消它。)
對于此問題,JSR-166 專家創建了一個更加有用的抽象(ExecutorService?接口),它將線程啟動工廠建模為一個可集中控制的服務。例如,無需每執行一項任務就調用一次?execute(),ExecutorService?可以接受一組任務并返回一個表示每項任務的未來結果的未來列表。
回頁首
4. ScheduledExecutorServices
盡管?ExecutorService?接口非常有用,但某些任務仍需要以計劃方式執行,比如以確定的時間間隔或在特定時間執行給定的任務。這就是ScheduledExecutorService?的應用范圍,它擴展了?ExecutorService。
如果您的目標是創建一個每隔 5 秒跳一次的 “心跳” 命令,使用?ScheduledExecutorService?可以輕松實現,如清單 4 所示:
清單 4. ScheduledExecutorService 模擬心跳
import java.util.concurrent.*;public class Ping {public static void main(String[] args){ScheduledExecutorService ses =Executors.newScheduledThreadPool(1);Runnable pinger = new Runnable() {public void run() {System.out.println("PING!");}};ses.scheduleAtFixedRate(pinger, 5, 5, TimeUnit.SECONDS);} }這項功能怎么樣?不用過于擔心線程,不用過于擔心用戶希望取消心跳時會發生什么,也不用明確地將線程標記為前臺或后臺;只需將所有的計劃細節留給?ScheduledExecutorService。
順便說一下,如果用戶希望取消心跳,scheduleAtFixedRate?調用將返回一個?ScheduledFuture?實例,它不僅封裝了結果(如果有),還擁有一個?cancel?方法來關閉計劃的操作。
回頁首
5. Timeout 方法
為阻塞操作設置一個具體的超時值(以避免死鎖)的能力是?java.util.concurrent?庫相比起早期并發特性的一大進步,比如監控鎖定。
這些方法幾乎總是包含一個?int/TimeUnit?對,指示這些方法應該等待多長時間才釋放控制權并將其返回給程序。它需要開發人員執行更多工作 — 如果沒有獲取鎖,您將如何重新獲取? — 但結果幾乎總是正確的:更少的死鎖和更加適合生產的代碼。(關于編寫生產就緒代碼的更多信息,請參見?參考資料?中 Michael Nygard 編寫的?Release It!。)
回頁首
結束語
java.util.concurrent?包還包含了其他許多好用的實用程序,它們很好地擴展到了 Collections 之外,尤其是在?.locks?和?.atomic?包中。深入研究,您還將發現一些有用的控制結構,比如?CyclicBarrier?等。
與 Java 平臺的許多其他方面一樣,您無需費勁地查找可能非常有用的基礎架構代碼。在編寫多線程代碼時,請記住本文討論的實用程序和?上一篇文章?中討論的實用程序。
轉載于:https://www.cnblogs.com/davidwang456/p/3865117.html
總結
以上是生活随笔為你收集整理的关于 java.util.concurrent 您不知道的 5 件事--转的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 关于 Java Collections
- 下一篇: 基于 Quartz 开发企业级任务调度应