Java高并发编程:同步工具类
內容摘要
這里主要介紹了java5中線程鎖技術以外的其他同步工具,首先介紹Semaphore:一個計數信號量。用于控制同時訪問資源的線程個數,CyclicBarrier同步輔助類:從字面意思看是路障,這里用于線程之間的相互等待,到達某點后,繼續向下執行。CountDownLatch同步輔助類:在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待。猶如倒計時計數器,然后是Exchanger:實現兩個對象之間數據交換,可阻塞隊列:ArrayBlockingQueue,通過阻塞隊列間的通信來演示其作用,最后介紹了幾個同步集合。
1. Semaphore實現信號燈
Semaphore可以維護當前訪問自身的線程個數,并提供了同步機制,使用Semaphore可以控制同時訪問資源的線程個數,例如,實現一個文件允許的并發訪問數。Semaphore 只對可用許可的號碼進行計數,并采取相應的行動。
Semaphore實現的功能就像:銀行辦理業務,一共有5個窗口,但一共有10個客戶,一次性最多有5個客戶可以進行辦理,其他的人必須等候,當5個客戶中的任何一個離開后,在等待的客戶中有一個人可以進行業務辦理。
Semaphore提供了兩種規則:
- 一種是公平的:獲得資源的先后,按照排隊的先后。在構造函數中設置true實現
- 一種是野蠻的:誰有本事搶到資源,誰就可以獲得資源的使用權。
與傳統的互斥鎖的異同:
單個信號量的Semaphore對象可以實現互斥鎖的功能,并且可以是由一個線程獲得了“鎖“,再由另外一個線程釋放”鎖“,這可以應用于死鎖恢復的一些場合。
應用場景:共享資源的爭奪,例如游戲中選手進入房間的情況。
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class SemaphoreTest { public static void main(String[] args) { //創建一個可根據需要創建新線程的線程池 ExecutorService service = Executors.newCachedThreadPool(); final Semaphore sp = new Semaphore(3); //創建10個線程 for(int i=0;i<10;i++){ Runnable runnable = new Runnable(){ public void run(){ try { sp.acquire(); //獲取燈,即許可權 } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println("線程" + Thread.currentThread().getName() + "進入,當前已有" + (3-sp.availablePermits()) + "個并發"); try { Thread.sleep((long)(Math.random()*10000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("線程" + Thread.currentThread().getName() + "即將離開"); sp.release(); // 釋放一個許可,將其返回給信號量 //下面代碼有時候執行不準確,因為其沒有和上面的代碼合成原子單元 System.out.println("線程" + Thread.currentThread().getName() + "已離開,當前已有" + (3-sp.availablePermits()) + "個并發"); } }; service.execute(runnable); } } }輸出結果
線程pool-1-thread-3進入,當前已有3個并發 線程pool-1-thread-2進入,當前已有3個并發 線程pool-1-thread-1進入,當前已有3個并發 線程pool-1-thread-2即將離開 線程pool-1-thread-2已離開,當前已有2個并發 線程pool-1-thread-5進入,當前已有3個并發 線程pool-1-thread-1即將離開 線程pool-1-thread-1已離開,當前已有2個并發 線程pool-1-thread-4進入,當前已有3個并發 線程pool-1-thread-4即將離開 線程pool-1-thread-4已離開,當前已有2個并發 線程pool-1-thread-8進入,當前已有3個并發 線程pool-1-thread-3即將離開 線程pool-1-thread-7進入,當前已有3個并發 線程pool-1-thread-3已離開,當前已有3個并發 線程pool-1-thread-8即將離開 線程pool-1-thread-8已離開,當前已有2個并發 線程pool-1-thread-9進入,當前已有3個并發 線程pool-1-thread-7即將離開 線程pool-1-thread-7已離開,當前已有2個并發 線程pool-1-thread-6進入,當前已有3個并發 線程pool-1-thread-9即將離開 線程pool-1-thread-9已離開,當前已有2個并發 線程pool-1-thread-10進入,當前已有3個并發 線程pool-1-thread-5即將離開 線程pool-1-thread-5已離開,當前已有2個并發 線程pool-1-thread-6即將離開 線程pool-1-thread-6已離開,當前已有1個并發 線程pool-1-thread-10即將離開 線程pool-1-thread-10已離開,當前已有0個并發控制一個方法的并發量,比如同時只能有3個線程進來
public class ThreadPoolTest {//信號量private static Semaphore semaphore = new Semaphore(3);//允許個數,相當于放了3把鎖public static void main(String[] args) {for(int i=0;i<10;i++){new Thread(new Runnable() {@Overridepublic void run() {try {method();} catch (InterruptedException e) {e.printStackTrace();}}}).start();}}//同時最多只允許3個線程過來public static void method() throws InterruptedException{semaphore.acquire();//獲取一把鎖System.out.println("ThreadName="+Thread.currentThread().getName()+"過來了");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("ThreadName="+Thread.currentThread().getName()+"出去了");semaphore.release();//釋放一把鎖} }輸出結果
ThreadName=Thread-1過來了 ThreadName=Thread-4過來了 ThreadName=Thread-0過來了 ThreadName=Thread-1出去了 ThreadName=Thread-4出去了 ThreadName=Thread-2過來了 ThreadName=Thread-3過來了 ThreadName=Thread-0出去了 ThreadName=Thread-5過來了 ThreadName=Thread-3出去了 ThreadName=Thread-2出去了 ThreadName=Thread-6過來了 ThreadName=Thread-7過來了 ThreadName=Thread-5出去了 ThreadName=Thread-9過來了 ThreadName=Thread-7出去了 ThreadName=Thread-6出去了 ThreadName=Thread-8過來了 ThreadName=Thread-9出去了 ThreadName=Thread-8出去了三個線程a、b、c 并發運行,b,c 需要a 線程的數據怎么實現
根據問題的描述,我將問題用以下代碼演示,ThreadA、ThreadB、ThreadC,ThreadA 用于初始化數據num,
只有當num 初始化完成之后再讓ThreadB 和ThreadC 獲取到初始化后的變量num。
分析過程如下:
考慮到多線程的不確定性,因此我們不能確保ThreadA 就一定先于ThreadB 和ThreadC 前執行,就算ThreadA先執行了,我們也無法保證ThreadA 什么時候才能將變量num 給初始化完成。因此我們必須讓ThreadB 和ThreadC去等待ThreadA 完成任何后發出的消息。
現在需要解決兩個難題,一是讓ThreadB 和ThreadC 等待ThreadA 先執行完,二是ThreadA 執行完之后給
ThreadB 和ThreadC 發送消息。
解決上面的難題我能想到的兩種方案,一是使用純Java API 的Semaphore 類來控制線程的等待和釋放,二是使用Android 提供的Handler 消息機制
public class ThreadCommunication {private static int num;//定義一個變量作為數據public static void main(String[] args) {Thread threadA = new Thread(new Runnable() {@Overridepublic void run() {try {//模擬耗時操作之后初始化變量numThread.sleep(1000);num = 1;} catch (InterruptedException e) {e.printStackTrace();}}});Thread threadB = new Thread(new Runnable() {@Overridepublic void run() {System.out.println(Thread.currentThread().getName()+"獲取到num 的值為:"+num);}});Thread threadC = new Thread(new Runnable() {@Overridepublic void run() {System.out.println(Thread.currentThread().getName()+"獲取到num 的值為:"+num);}});//同時開啟3 個線程threadA.start();threadB.start();threadC.start();} } public class ThreadCommunication {private static int num;/*** 定義一個信號量,該類內部維持了多個線程鎖,可以阻塞多個線程,釋放多個線程,* 線程的阻塞和釋放是通過permit 概念來實現的線程通過semaphore.acquire()方法獲取permit,* 如果當前semaphore 有permit 則分配給該線程,如果沒有則阻塞該線程直到semaphore* 調用release()方法釋放permit。構造函數中參數:permit(允許) 個數*/private static Semaphore semaphore = new Semaphore(0);public static void main(String[] args) {Thread threadA = new Thread(new Runnable() {@Overridepublic void run() {try {//模擬耗時操作之后初始化變量numThread.sleep(1000);num = 1;//初始化完參數后釋放兩個permitsemaphore.release(2);} catch (InterruptedException e) {e.printStackTrace();}}});Thread threadB = new Thread(new Runnable() {@Overridepublic void run() {try {//獲取permit,如果semaphore 沒有可用的permit 則等待// 如果有則消耗一個semaphore.acquire();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName()+"獲取到num 的值為:"+num);}});Thread threadC = new Thread(new Runnable() {@Overridepublic void run() {try {//獲取permit,如果semaphore 沒有可用的permit 則等待// 如果有則消耗一個semaphore.acquire();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName()+"獲取到num 的值為:"+num);}});//同時開啟3 個線程threadA.start();threadB.start();threadC.start();} }2. CyclicBarrier
一個同步輔助類,它允許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier 很有用。因為該 barrier 在釋放等待線程后可以重用,所以稱它為循環 的 barrier。
CyclicBarrier 支持一個可選的 Runnable 命令,在一組線程中的最后一個線程到達之后(但在釋放所有線程之前),該命令只在每個屏障點運行一次。若在繼續所有參與線程之前更新共享狀態,此屏障操作 很有用。
3個線程到達某個集合點后再向下執行,使用await方法實現
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CyclicBarrierTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final CyclicBarrier cb = new CyclicBarrier(3); for(int i=0;i<3;i++){ Runnable runnable = new Runnable(){ public void run(){ try { Thread.sleep((long)(Math.random()*10000)); System.out.println("線程" + Thread.currentThread().getName() + "即將到達集合地點1,當前已有" + (cb.getNumberWaiting()+1) + "個已經到達," + (cb.getNumberWaiting()==2?"都到齊了,繼續走啊":"正在等候")); cb.await();//在所有參與者都已經在此 barrier 上調用 await 方法之前,將一直等待。 Thread.sleep((long)(Math.random()*10000)); System.out.println("線程" + Thread.currentThread().getName() + "即將到達集合地點2,當前已有" + (cb.getNumberWaiting()+1) + "個已經到達," + (cb.getNumberWaiting()==2?"都到齊了,繼續走啊":"正在等候")); cb.await(); Thread.sleep((long)(Math.random()*10000)); System.out.println("線程" + Thread.currentThread().getName() + "即將到達集合地點3,當前已有" + (cb.getNumberWaiting() + 1) + "個已經到達," + (cb.getNumberWaiting()==2?"都到齊了,繼續走啊":"正在等候")); cb.await(); } catch (Exception e) { e.printStackTrace(); } } }; service.execute(runnable); } service.shutdown(); } }輸出結果
線程pool-1-thread-3即將到達集合地點1,當前已有1個已經到達,正在等候 線程pool-1-thread-1即將到達集合地點1,當前已有2個已經到達,正在等候 線程pool-1-thread-2即將到達集合地點1,當前已有3個已經到達,都到齊了,繼續走啊 線程pool-1-thread-1即將到達集合地點2,當前已有1個已經到達,正在等候 線程pool-1-thread-2即將到達集合地點2,當前已有2個已經到達,正在等候 線程pool-1-thread-3即將到達集合地點2,當前已有3個已經到達,都到齊了,繼續走啊 線程pool-1-thread-3即將到達集合地點3,當前已有1個已經到達,正在等候 線程pool-1-thread-1即將到達集合地點3,當前已有2個已經到達,正在等候 線程pool-1-thread-2即將到達集合地點3,當前已有3個已經到達,都到齊了,繼續走啊3. CountDownLatch
一個同步輔助類,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待。猶如倒計時計數器,調用CountDownLatch對象的countDown方法就將計數器減1,當計數到達0時,則所有等待者或單個等待者開始執行。
可以實現一個人(也可以是多個人)等待其他所有人都來通知他,也可以實現一個人通知多個人的效果,類似裁判一聲口令,運動員開始奔跑(一對多),或者所有運送員都跑到終點后裁判才可以公布結果(多對一)。
用指定的計數 初始化 CountDownLatch。在調用 countDown() 方法之前,await 方法會一直受阻塞。之后,會釋放所有等待的線程,await 的所有后續調用都將立即返回。這種現象只出現一次——計數無法被重置。如果需要重置計數,請考慮使用 CyclicBarrier。
實現運動員比賽的效果
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CountdownLatchTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); //構造一個用給定計數初始化的 CountDownLatch,相當于裁判的口哨 final CountDownLatch cdOrder = new CountDownLatch(1); //相當于定義3個運行員 final CountDownLatch cdAnswer = new CountDownLatch(3); for (int i = 0; i < 3; i++) { Runnable runnable = new Runnable() { public void run() { try { System.out.println("線程" + Thread.currentThread().getName() + "正準備接受命令"); // 等待發令槍 cdOrder.await();//使當前線程在鎖存器倒計數至零之前一直等待 System.out.println("線程" + Thread.currentThread().getName() + "已接受命令"); Thread.sleep((long) (Math.random() * 10000)); System.out .println("線程" + Thread.currentThread().getName() + "回應命令處理結果"); // 各個運動員完報告成績之后,通知裁判 cdAnswer.countDown();//遞減鎖存器的計數,如果計數到達零,則釋放所有等待的線程 } catch (Exception e) { e.printStackTrace(); } } }; service.execute(runnable); } try { Thread.sleep((long) (Math.random() * 10000)); System.out.println("線程" + Thread.currentThread().getName() + "即將發布命令"); // 發令槍打響,比賽開始 cdOrder.countDown(); System.out.println("線程" + Thread.currentThread().getName() + "已發送命令,正在等待結果"); // 裁判等待各個運動員的結果 cdAnswer.await(); // 裁判公布獲得所有運動員的成績 System.out.println("線程" + Thread.currentThread().getName() + "已收到所有響應結果"); } catch (Exception e) { e.printStackTrace(); } service.shutdown(); } }輸出結果
線程pool-1-thread-2正準備接受命令 線程pool-1-thread-3正準備接受命令 線程pool-1-thread-1正準備接受命令 線程main即將發布命令 線程main已發送命令,正在等待結果 線程pool-1-thread-1已接受命令 線程pool-1-thread-2已接受命令 線程pool-1-thread-3已接受命令 線程pool-1-thread-1回應命令處理結果 線程pool-1-thread-3回應命令處理結果 線程pool-1-thread-2回應命令處理結果 線程main已收到所有響應結果4. Exchanger
用于實現兩個對象之間的數據交換,每個對象在完成一定的事務后想與對方交換數據,第一個先拿出數據的對象將一直等待第二個對象拿著數據到來時,彼此才能交換數據。
方法:exchange(V x)
等待另一個線程到達此交換點(除非當前線程被中斷),然后將給定的對象傳送給該線程,并接收該線程的對象。
應用:使用 Exchanger 在線程間交換緩沖區
示例:模擬毒品交易情景
import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ExchangerTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final Exchanger exchanger = new Exchanger(); service.execute(new Runnable(){ public void run() { try { String data1 = "毒品"; System.out.println("線程" + Thread.currentThread().getName() + "正在把: " + data1 +" 交易出去"); Thread.sleep((long)(Math.random()*10000)); String data2 = (String)exchanger.exchange(data1); System.out.println("線程" + Thread.currentThread().getName() + "換得了: " + data2); }catch(Exception e){ } } }); service.execute(new Runnable(){ public void run() { try { String data1 = "美金"; System.out.println("線程" + Thread.currentThread().getName() + "正在把: " + data1 +" 交易出去"); Thread.sleep((long)(Math.random()*10000)); String data2 = (String)exchanger.exchange(data1); System.out.println("線程" + Thread.currentThread().getName() + "換得了: " + data2); }catch(Exception e){ } } }); } } 線程pool-1-thread-1正在把: 毒品 交易出去 線程pool-1-thread-2正在把: 美金 交易出去 線程pool-1-thread-1換得了: 美金 線程pool-1-thread-2換得了: 毒品5. ArrayBlockingQueue
一個由數組支持的有界阻塞隊列。此隊列按 FIFO(先進先出)原則對元素進行排序。隊列包含固定長度的隊列和不固定長度的隊列。
這是一個典型的“有界緩存區”,固定大小的數組在其中保持生產者插入的元素和使用者提取的元素。一旦創建了這樣的緩存區,就不能再增加其容量。試圖向已滿隊列中放入元素會導致操作受阻塞;試圖從空隊列中提取元素將導致類似阻塞。
通俗的講:當指定隊列大小,如果已經放滿,其他存入數據的線程就阻塞,等著該隊列中有空位,才能放進去。當取的比較快,隊列中沒有數據,取數據的線程阻塞,等隊列中放入了數據,才可以取。
ArrayBlockingQueue中只有put和take方法才具有阻塞功能。方法類型如下
| 插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| 移除 | remove() | poll() | take() | poll(time, unit) |
| 檢查 | element() | peek() | 不可用 | 不可用 |
示例:用3個空間的隊列來演示向阻塞隊列中存取數據的效果
6. 阻塞隊列間的通信
A隊列向空間中存數據,B從空間里取數據,A存入后,通知B去取,B取過之后,通知A去放,依次循環
示例:子線程先循環10次,接著主線程循環100次,接著又回到子線程,循環10次,再回到主線程又循環100,如此循環50次。
說明:這里通過使 用兩個具有1個空間的隊列來實現同步通知的功能(實現了鎖和condition的功能),以便實現隊列間的通信,其中使用到了構造代碼塊為主隊列先存入一個數據,以使其先阻塞,子隊列先執行。
使用構造代碼塊的原因:
成員變量在創建類的實例對象時,才分配空間,才能有值,所以創建一個構造方法來給main_quene賦值,這里不可以使用靜態代碼塊,因為靜態在還沒創建對象就存在, 而sub_quene和main_quene是對象創建以后的成員變量,所以這里用匿名構造方法,它的運行時期在任何構造方法之前,創建幾個對象就執行幾次
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueCommunication { public static void main(String[] args){ final Business business = new Business(); new Thread(new Runnable(){ @Override public void run() { for(int i=1;i<=50;i++){ business.sub(i); } } }).start(); //主線程外部循環 for(int i=1;i<=50;i++){ business.main(i); } } //業務類 static class Business{ BlockingQueue<Integer> sub_quene = new ArrayBlockingQueue<Integer>(1); BlockingQueue<Integer> main_quene = new ArrayBlockingQueue<Integer>(1); { //為了讓子隊列先走,所以在一開始就往主隊列中存入一個對象,使其阻塞。 try { main_quene.put(1); } catch (InterruptedException e) { e.printStackTrace(); } } //子隊列先走 public void sub(int i){ try { sub_quene.put(1); //子隊列第一次存入,可以執行,但由于只有1個空間,已經存滿,所以只有在執行后要等到take之后才能繼續下次執行 } catch (InterruptedException e) { e.printStackTrace(); } //子隊列循環執行 for(int j=1;j<=10;j++){ System.out.println("sub thread sequence of"+i+",loop of "+j); } try { main_quene.take(); //讓主隊列從已經填滿的隊列中取出數據,使其開始第一次執行 } catch (InterruptedException e) { e.printStackTrace(); } } public void main(int i){ try { main_quene.put(1); //主隊列先前放過1個空間,現在處于阻塞狀態,等待子隊列通知,即子線程中的main_quene.take(); } catch (InterruptedException e) { e.printStackTrace(); } //主隊列循環執行 for(int j=1;j<=100;j++){ System.out.println("main thread sequence of"+i+", loop of "+j); } try { sub_quene.take(); //讓子隊列從已經填滿的隊列中取出數據,使其執行 } catch (InterruptedException e) { e.printStackTrace(); } } } }7. 同步集合類
7.1 同步Map集合
- java.util.concurrent.ConcurrentMap
- ConcurrentHashMap
- ConcurrentNavigableMap
- ConcurrentSkipListMap
ConcurrentHashMap
同步的HashMap,支持獲取的完全并發和更新的所期望可調整并發的哈希表。此類遵守與 Hashtable 相同的功能規范,并且包括對應于 Hashtable 的每個方法的方法版本。
不過,盡管所有操作都是線程安全的,但獲取操作不 必鎖定,并且不 支持以某種防止所有訪問的方式鎖定整個表。此類可以通過程序完全與 Hashtable 進行互操作,這取決于其線程安全,而與其同步細節無關。
內部原理:
其實內部使用了代理模式,你給我一個HashMap,我就給你一個同步的HashMap。同步的HashMap在調用方法時,是去分配給原始的HashMap只是在去調用方法的同時加上了Synchronized,以此實現同步效果
ConcurrentHashMap是線程安全的HashMap的實現,默認構造同樣有initialCapacity和loadFactor屬性,不過還多了一個concurrencyLevel屬性,三屬性默認值分別為16、0.75及16。其內部使用鎖分段技術,維持這鎖Segment的數組,在Segment數組中又存放著Entity[]數組,內部hash算法將數據較均勻分布在不同鎖中。
- put(key , value)
并沒有在此方法上加上synchronized,首先對key.hashcode進行hash操作,得到key的hash值。hash操作的算法和map也不同,根據此hash值計算并獲取其對應的數組中的Segment對象(繼承自ReentrantLock),接著調用此Segment對象的put方法來完成當前操作。
ConcurrentHashMap基于concurrencyLevel劃分出了多個Segment來對key-value進行存儲,從而避免每次put操作都得鎖住整個數組。在默認的情況下,最佳情況下可允許16個線程并發無阻塞的操作集合對象,盡可能地減少并發時的阻塞現象。
- get(key)
首先對key.hashCode進行hash操作,基于其值找到對應的Segment對象,調用其get方法完成當前操作。而Segment的get操作首先通過hash值和對象數組大小減1的值進行按位與操作來獲取數組上對應位置的HashEntry。在這個步驟中,可能會因為對象數組大小的改變,以及數組上對應位置的HashEntry產生不一致性,那么ConcurrentHashMap是如何保證的?
對象數組大小的改變只有在put操作時有可能發生,由于HashEntry對象數組對應的變量是volatile類型的,因此可以保證如HashEntry對象數組大小發生改變,讀操作可看到最新的對象數組大小。
在獲取到了HashEntry對象后,怎么能保證它及其next屬性構成的鏈表上的對象不會改變呢?這點ConcurrentHashMap采用了一個簡單的方式,即HashEntry對象中的hash、key、next屬性都是final的,這也就意味著沒辦法插入一個HashEntry對象到基于next屬性構成的鏈表中間或末尾。這樣就可以保證當獲取到HashEntry對象后,其基于next屬性構建的鏈表是不會發生變化的。
ConcurrentHashMap默認情況下采用將數據分為16個段進行存儲,并且16個段分別持有各自不同的鎖Segment,鎖僅用于put和remove等改變集合對象的操作,基于volatile及HashEntry鏈表的不變性實現了讀取的不加鎖。這些方式使得ConcurrentHashMap能夠保持極好的并發支持,尤其是對于讀遠比插入和刪除頻繁的Map而言,而它采用的這些方法也可謂是對于Java內存模型、并發機制深刻掌握的體現。
ConcurrentNavigableMap
java.util.concurrent.ConcurrentNavigableMap 是一個支持并發訪問的 java.util.NavigableMap,它還能讓它的子 map 具備并發訪問的能力。所謂的 “子 map” 指的是諸如 headMap(),subMap(),tailMap() 之類的方法返回的 map。
NavigableMap 中的方法不再贅述,本小節我們來看一下 ConcurrentNavigableMap 添加的方法。
headMap()
headMap(T toKey) 方法返回一個包含了小于給定 toKey 的 key 的子 map。
如果你對原始 map 里的元素做了改動,這些改動將影響到子 map 中的元素(譯者注:map 集合持有的其實只是對象的引用)。
以下示例演示了對 headMap() 方法的使用:
ConcurrentNavigableMap map = new ConcurrentSkipListMap(); map.put("1", "one"); map.put("2", "two"); map.put("3", "three"); ConcurrentNavigableMap headMap = map.headMap("2");headMap 將指向一個只含有鍵 “1” 的 ConcurrentNavigableMap,因為只有這一個鍵小于 “2”。關于這個方法及其重載版本具體是怎么工作的細節請參考 Java 文檔。
tailMap()
tailMap(T fromKey) 方法返回一個包含了不小于給定 fromKey 的 key 的子 map。
如果你對原始 map 里的元素做了改動,這些改動將影響到子 map 中的元素(譯者注:map 集合持有的其實只是對象的引用)。
以下示例演示了對 tailMap() 方法的使用:
ConcurrentNavigableMap map = new ConcurrentSkipListMap(); map.put("1", "one"); map.put("2", "two"); map.put("3", "three"); ConcurrentNavigableMap tailMap = map.tailMap("2");tailMap 將擁有鍵 “2” 和 “3”,因為它們不小于給定鍵 “2”。關于這個方法及其重載版本具體是怎么工作的細節請參考 Java 文檔。
subMap()
subMap() 方法返回原始 map 中,鍵介于 from(包含) 和 to (不包含) 之間的子 map。示例如下:
ConcurrentNavigableMap map = new ConcurrentSkipListMap(); map.put("1", "one"); map.put("2", "two"); map.put("3", "three"); ConcurrentNavigableMap subMap = map.subMap("2", "3");返回的 submap 只包含鍵 “2”,因為只有它滿足不小于 “2”,比 “3” 小。
更多方法
ConcurrentNavigableMap 接口還有其他一些方法可供使用,比如:
- descendingKeySet()
- descendingMap()
- navigableKeySet()
關于這些方法更多信息參考官方 Java 文檔。
7.2 同步List集合
- ConcurrentSkipListSet
- CopyOnWriteArraySet
- CopyOnWriteArrayList
ConcurrentSkipListSet
一個基于 ConcurrentSkipListMap 的可縮放并發 NavigableSet 實現。類似于TreeSet,set 的元素可以根據它們的自然順序進行排序,也可以根據創建 set 時所提供的Comparator 進行排序,具體取決于使用的構造方法。
CopyOnWriteArrayList
ArrayList 的一個線程安全的變體,可解決線程安全問題,在遍歷的時候,同時進行添加操作。其中所有可變操作(add、set 等等)都是通過對底層數組進行一次新的復制來實現的。
CopyOnWriteArrayList是一個線程安全、并且在讀操作時無鎖的ArrayList,其具體實現方法如下。
- CopyOnWriteArrayList()
和ArrayList不同,此步的做法為創建一個大小為0的數組。
- add(E)
add方法并沒有加上synchronized關鍵字,它通過使用ReentrantLock來保證線程安全。此處和ArrayList的不同是每次都會創建一個新的Object數組,此數組的大小為當前數組大小加1,將之前數組中的內容復制到新的數組中,并將新增加的對象放入數組末尾,最后做引用切換將新創建的數組對象賦值給全局的數組對象。
- remove(E)
和add方法一樣,此方法也通過ReentrantLock來保證其線程安全,但它和ArrayList刪除元素采用的方式并不一樣。
首先創建一個比當前數組小1的數組,遍歷新數組,如找到equals或均為null的元素,則將之后的元素全部賦值給新的數組對象,并做引用切換,返回true;如未找到,則將當前的元素賦值給新的數組對象,最后特殊處理數組中的最后一個元素,如最后一個元素等于要刪除的元素,即將當前數組對象賦值為新創建的數組對象,完成刪除操作,如最后一個元素也不等于要刪除的元素,那么返回false。
此方法和ArrayList除了鎖不同外,最大的不同在于其復制過程并沒有調用System的arrayCopy來完成,理論上來說會導致性能有一定下降。
- get(int)
此方法非常簡單,直接獲取當前數組對應位置的元素,這種方法是沒有加鎖保護的,因此可能會出現讀到臟數據的現象。但相對而言,性能會非常高,對于寫少讀多且臟數據影響不大的場景而言是不錯的選擇。
- iterator()
調用iterator方法后創建一個新的COWIterator對象實例,并保存了一個當前數組的快照,在調用next遍歷時則僅對此快照數組進行遍歷,因此遍歷此list時不會拋出ConcurrentModificatiedException。
與ArrayList的性能對比,在讀多寫少的并發場景中,較之ArrayList是更好的選擇,單線程以及多線程下增加元素及刪除元素的性能不比ArrayList好
CopyOnWriteArraySet
對其所有操作使用內部 CopyOnWriteArrayList 的 Set。因此,它共享以下相同的基本屬性:
- 它最適合于 set 大小通常保持很小、只讀操作遠多于可變操作以及需要在遍歷期間防止線程間沖突的應用程序。
- 它是線程安全的。
- 因為通常需要復制整個基礎數組,所以可變操作(添加、設置、移除,等等)的開銷巨大。
- 迭代器不支持可變移除操作。
- 使用迭代器進行遍歷的速度很快,并且不會與其他線程發生沖突。在構造迭代器時,迭代器依賴于不變的數組快照。
CopyOnWriteArraySet基于CopyOnWriteArrayList實現,其唯一的不同是在add時調用的是CopyOnWriteArrayList的addIfAbsent方法。保證了無重復元素,但在add時每次都要進行數組的遍歷,因此性能會略低于上個。
7.3 ConcurrentLinkedQueue
ConcurrentLinkedQueue是一個基于鏈接節點的、無界的、線程安全的隊列。此隊列按照 FIFO(先進先出)原則對元素進行排序,隊列的頭部 是隊列中時間最長的元素。隊列的尾部 是隊列中時間最短的元素。新的元素插入到隊列的尾部,隊列檢索操作從隊列頭部獲得元素。當許多線程共享訪問一個公共 collection 時,ConcurrentLinkedQueue 是一個恰當的選擇,此隊列不允許 null 元素。
7.4 ConcurrentLinkedDeque
一個基于鏈接節點的、無界的、線程安全的雙端隊列
總結
以上是生活随笔為你收集整理的Java高并发编程:同步工具类的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java高并发编程:多个线程之间共享数据
- 下一篇: Java高并发编程:定时器、互斥、同步通