Java多线程(三)之ConcurrentHashMap深入分析
一、Map體系
二、concurrentHashMap的結構
?
我們通過ConcurrentHashMap的類圖來分析ConcurrentHashMap的結構。
?
ConcurrentHashMap是由Segment數組結構和HashEntry數組結構組成。Segment是一種可重入鎖ReentrantLock,在ConcurrentHashMap里扮演鎖的角色,HashEntry則用于存儲鍵值對數據。一個ConcurrentHashMap里包含一個Segment數組,Segment的結構和HashMap類似,是一種數組和鏈表結構, 一個Segment里包含一個HashEntry數組,每個HashEntry是一個鏈表結構的元素, 每個Segment守護者一個HashEntry數組里的元素,當對HashEntry數組的數據進行修改時,必須首先獲得它對應的Segment鎖。
?
ConcurrentHashMap 類中包含兩個靜態內部類 HashEntry 和 Segment。HashEntry 用來封裝映射表的鍵 / 值對;Segment 用來充當鎖的角色,每個 Segment 對象守護整個散列映射表的若干個桶。每個桶是由若干個 HashEntry 對象鏈接起來的鏈表。一個 ConcurrentHashMap 實例中包含由若干個 Segment 對象組成的數組。
?
三、ConcurrentHashMap實現原理?
?
鎖分離 (Lock Stripping)
?
比如HashTable是一個過時的容器類,通過使用synchronized來保證線程安全,在線程競爭激烈的情況下HashTable的效率非常低下。原因是所有訪問HashTable的線程都必須競爭同一把鎖。那假如容器里有多把鎖,每一把鎖用于鎖容器其中一部分數據,那么當多線程訪問容器里不同數據段的數據時,線程間就不會存在鎖競爭,從而可以有效的提高并發訪問效率,這就是ConcurrentHashMap所使用的鎖分段技術。
?
ConcurrentHashMap內部使用段(Segment)來表示這些不同的部分,每個段其實就是一個小的hash table,它們有自己的鎖。只要多個修改操作發生在不同的段上,它們就可以并發進行。同樣當一個線程占用鎖訪問其中一個段數據的時候,其他段的數據也能被其他線程訪問。
?
ConcurrentHashMap有些方法需要跨段,比如size()和containsValue(),它們可能需要鎖定整個表而而不僅僅是某個段,這需要按順序鎖定所有段,操作完畢后,又按順序釋放所有段的鎖。這里“按順序”是很重要的,否則極有可能出現死鎖,在ConcurrentHashMap內部,段數組是final的,并且其成員變量實際上也是final的,但是,僅僅是將數組聲明為final的并不保證數組成員也是final的,這需要實現上的保證。這可以確保不會出現死鎖,因為獲得鎖的順序是固定的。不變性是多線程編程占有很重要的地位,下面還要談到。
?
/**
* The segments, each of which is a specialized hash table
*/
final Segment<K,V>[] segments;
?
?
?不變(Immutable)和易變(Volatile)
?
ConcurrentHashMap完全允許多個讀操作并發進行,讀操作并不需要加鎖。如果使用傳統的技術,如HashMap中的實現,如果允許可以在hash鏈的中間添加或刪除元素,讀操作不加鎖將得到不一致的數據。ConcurrentHashMap實現技術是保證HashEntry幾乎是不可變的。HashEntry代表每個hash鏈中的一個節點,其結構如下所示:
?
?
static final class HashEntry<K,V> {
final K key;
final int hash;
volatile V value;
final HashEntry<K,V> next;
}
??
?
可以看到除了value不是final的,其它值都是final的,這意味著不能從hash鏈的中間或尾部添加或刪除節點,因為這需要修改next引用值,所有的節點的修改只能從頭部開始。對于put操作,可以一律添加到Hash鏈的頭部。但是對于remove操作,可能需要從中間刪除一個節點,這就需要將要刪除節點的前面所有節點整個復制一遍,最后一個節點指向要刪除結點的下一個結點。這在講解刪除操作時還會詳述。為了確保讀操作能夠看到最新的值,將value設置成volatile,這避免了加鎖。
?
四、ConcurrentHashMap具體實現
ConcurrentHashMap的初始化
?
ConcurrentHashMap初始化方法是通過initialCapacity,loadFactor, concurrencyLevel幾個參數來初始化segments數組,段偏移量segmentShift,段掩碼segmentMask和每個segment里的HashEntry數組 。
初始化segments數組。讓我們來看一下初始化segmentShift,segmentMask和segments數組的源代碼。
?
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
segmentShift = 32 - sshift;
segmentMask = ssize - 1;
this.segments = Segment.newArray(ssize);
 ?
?
由上面的代碼可知segments數組的長度ssize通過concurrencyLevel計算得出。為了能通過按位與的哈希算法來定位segments數組的索引,必須保證segments數組的長度是2的N次方(power-of-two size),所以必須計算出一個是大于或等于concurrencyLevel的最小的2的N次方值來作為segments數組的長度。假如concurrencyLevel等于14,15或16,ssize都會等于16,即容器里鎖的個數也是16。注意concurrencyLevel的最大大小是65535,意味著segments數組的長度最大為65536,對應的二進制是16位。
?
初始化segmentShift和segmentMask。這兩個全局變量在定位segment時的哈希算法里需要使用,sshift等于ssize從1向左移位的次數,在默認情況下concurrencyLevel等于16,1需要向左移位移動4次,所以sshift等于4。segmentShift用于定位參與hash運算的位數,segmentShift等于32減sshift,所以等于28,這里之所以用32是因為ConcurrentHashMap里的hash()方法輸出的最大數是32位的,后面的測試中我們可以看到這點。segmentMask是哈希運算的掩碼,等于ssize減1,即15,掩碼的二進制各個位的值都是1。因為ssize的最大長度是65536,所以segmentShift最大值是16,segmentMask最大值是65535,對應的二進制是16位,每個位都是1。
初始化每個Segment。輸入參數initialCapacity是ConcurrentHashMap的初始化容量,loadfactor是每個segment的負載因子,在構造方法里需要通過這兩個參數來初始化數組中的每個segment。
?
上面代碼中的變量cap就是segment里HashEntry數組的長度,它等于initialCapacity除以ssize的倍數c,如果c大于1,就會取大于等于c的2的N次方值,所以cap不是1,就是2的N次方。segment的容量threshold=(int)cap*loadFactor,默認情況下initialCapacity等于16,loadfactor等于0.75,通過運算cap等于1,threshold等于零。
?
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = 1;
while (cap < c)
cap <<= 1;
for (int i = 0; i < this.segments.length; ++i)
this.segments[i] = new Segment<K,V>(cap, loadFactor);
?
?
定位Segment
?
既然ConcurrentHashMap使用分段鎖Segment來保護不同段的數據,那么在插入和獲取元素的時候,必須先通過哈希算法定位到Segment。可以看到ConcurrentHashMap會首先使用Wang/Jenkins hash的變種算法對元素的hashCode進行一次再哈希。
?
private static int hash(int h) {
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
 ?
?
之所以進行再哈希,其目的是為了減少哈希沖突,使元素能夠均勻的分布在不同的Segment上,從而提高容器的存取效率。假如哈希的質量差到極點,那么所有的元素都在一個Segment中,不僅存取元素緩慢,分段鎖也會失去意義。我做了一個測試,不通過再哈希而直接執行哈希計算。
?
System.out.println(Integer.parseInt("0001111", 2) & 15);
System.out.println(Integer.parseInt("0011111", 2) & 15);
System.out.println(Integer.parseInt("0111111", 2) & 15);
System.out.println(Integer.parseInt("1111111", 2) & 15);
?
?
計算后輸出的哈希值全是15,通過這個例子可以發現如果不進行再哈希,哈希沖突會非常嚴重,因為只要低位一樣,無論高位是什么數,其哈希值總是一樣。我們再把上面的二進制數據進行再哈希后結果如下,為了方便閱讀,不足32位的高位補了0,每隔四位用豎線分割下。
?
0100|0111|0110|0111|1101|1010|0100|1110
1111|0111|0100|0011|0000|0001|1011|1000
0111|0111|0110|1001|0100|0110|0011|1110
1000|0011|0000|0000|1100|1000|0001|1010
?
?
可以發現每一位的數據都散列開了,通過這種再哈希能讓數字的每一位都能參加到哈希運算當中,從而減少哈希沖突。ConcurrentHashMap通過以下哈希算法定位segment。
?
final Segment<K,V> segmentFor(int hash) {
return segments[(hash >>> segmentShift) & segmentMask];
}
?
?
默認情況下segmentShift為28,segmentMask為15,再哈希后的數最大是32位二進制數據,向右無符號移動28位,意思是讓高4位參與到hash運算中, (hash >>> segmentShift) & segmentMask的運算結果分別是4,15,7和8,可以看到hash值沒有發生沖突。
?
ConcurrentHashMap的get操作
?
前面提到過ConcurrentHashMap的get操作是不用加鎖的,我們這里看一下其實現:
public V get(Object key) {
int hash = hash(key.hashCode());
return segmentFor(hash).get(key, hash);
}
?
看第三行,segmentFor這個函數用于確定操作應該在哪一個segment中進行,幾乎對ConcurrentHashMap的所有操作都需要用到這個函數,我們看下這個函數的實現:
final Segment<K,V> segmentFor(int hash) {
return segments[(hash >>> segmentShift) & segmentMask];
}
?
這個函數用了位操作來確定Segment,根據傳入的hash值向右無符號右移segmentShift位,然后和segmentMask進行與操作,結合我們之前說的segmentShift和segmentMask的值,就可以得出以下結論:假設Segment的數量是2的n次方,根據元素的hash值的高n位就可以確定元素到底在哪一個Segment中。
在確定了需要在哪一個segment中進行操作以后,接下來的事情就是調用對應的Segment的get方法:
V get(Object key, int hash) {
if (count != 0) { // read-volatile
HashEntry<K,V> e = getFirst(hash);
while (e != null) {
if (e.hash == hash && key.equals(e.key)) {
V v = e.value;
if (v != null)
return v;
return readValueUnderLock(e); // recheck
}
e = e.next;
}
}
return null;
}
 ?
先看第二行代碼,這里對count進行了一次判斷,其中count表示Segment中元素的數量,我們可以來看一下count的定義:
?
transient volatile int count;?
可以看到count是volatile的,實際上這里里面利用了volatile的語義:
?
對volatile字段的寫入操作happens-before于每一個后續的同一個字段的讀操作。
?
因為實際上put、remove等操作也會更新count的值,所以當競爭發生的時候,volatile的語義可以保證寫操作在讀操作之前,也就保證了寫操作對后續的讀操作都是可見的,這樣后面get的后續操作就可以拿到完整的元素內容。
然后,在第三行,調用了getFirst()來取得鏈表的頭部:
HashEntry<K,V> getFirst(int hash) {
HashEntry<K,V>[] tab = table;
return tab[hash & (tab.length - 1)];
}
?
同樣,這里也是用位操作來確定鏈表的頭部,hash值和HashTable的長度減一做與操作,最后的結果就是hash值的低n位,其中n是HashTable的長度以2為底的結果。
在確定了鏈表的頭部以后,就可以對整個鏈表進行遍歷,看第4行,取出key對應的value的值,如果拿出的value的值是null,則可能這個key,value對正在put的過程中,如果出現這種情況,那么就加鎖來保證取出的value是完整的,如果不是null,則直接返回value。
?
ConcurrentHashMap的put操作
?
看完了get操作,再看下put操作,put操作的前面也是確定Segment的過程,這里不再贅述,直接看關鍵的segment的put方法:
?
V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock();
try {
int c = count;
if (c++ > threshold) // ensure capacity
rehash();
HashEntry<K,V>[] tab = table;
int index = hash & (tab.length - 1);
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue;
if (e != null) {
oldValue = e.value;
if (!onlyIfAbsent)
e.value = value;
}
else {
oldValue = null;
++modCount;
tab[index] = new HashEntry<K,V>(key, hash, first, value);
count = c; // write-volatile
}
return oldValue;
} finally {
unlock();
}
}
?
首先對Segment的put操作是加鎖完成的,然后在第五行,如果Segment中元素的數量超過了閾值(由構造函數中的loadFactor算出)這需要進行對Segment擴容,并且要進行rehash,關于rehash的過程大家可以自己去了解,這里不詳細講了。
第8和第9行的操作就是getFirst的過程,確定鏈表頭部的位置。
第11行這里的這個while循環是在鏈表中尋找和要put的元素相同key的元素,如果找到,就直接更新更新key的value,如果沒有找到,則進入21行這里,生成一個新的HashEntry并且把它加到整個Segment的頭部,然后再更新count的值。
?
ConcurrentHashMap的remove操作
?
Remove操作的前面一部分和前面的get和put操作一樣,都是定位Segment的過程,然后再調用Segment的remove方法:
?
V remove(Object key, int hash, Object value) {
lock();
try {
int c = count - 1;
HashEntry<K,V>[] tab = table;
int index = hash & (tab.length - 1);
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue = null;
if (e != null) {
V v = e.value;
if (value == null || value.equals(v)) {
oldValue = v;
// All entries following removed node can stay
// in list, but all preceding ones need to be
// cloned.
++modCount;
HashEntry<K,V> newFirst = e.next;
for (HashEntry<K,V> p = first; p != e; p = p.next)
newFirst = new HashEntry<K,V>(p.key, p.hash,
newFirst, p.value);
tab[index] = newFirst;
count = c; // write-volatile
}
}
return oldValue;
} finally {
unlock();
}
}
?
首先remove操作也是確定需要刪除的元素的位置,不過這里刪除元素的方法不是簡單地把待刪除元素的前面的一個元素的next指向后面一個就完事了,我們之前已經說過HashEntry中的next是final的,一經賦值以后就不可修改,在定位到待刪除元素的位置以后,程序就將待刪除元素前面的那一些元素全部復制一遍,然后再一個一個重新接到鏈表上去,看一下下面這一幅圖來了解這個過程:
 假設鏈表中原來的元素如上圖所示,現在要刪除元素3,那么刪除元素3以后的鏈表就如下圖所示:
 ?
?
ConcurrentHashMap的size操作
?
在前面的章節中,我們涉及到的操作都是在單個Segment中進行的,但是ConcurrentHashMap有一些操作是在多個Segment中進行,比如size操作,ConcurrentHashMap的size操作也采用了一種比較巧的方式,來盡量避免對所有的Segment都加鎖。
前面我們提到了一個Segment中的有一個modCount變量,代表的是對Segment中元素的數量造成影響的操作的次數,這個值只增不減,size操作就是遍歷了兩次Segment,每次記錄Segment的modCount值,然后將兩次的modCount進行比較,如果相同,則表示期間沒有發生過寫入操作,就將原先遍歷的結果返回,如果不相同,則把這個過程再重復做一次,如果再不相同,則就需要將所有的Segment都鎖住,然后一個一個遍歷了,具體的實現大家可以看ConcurrentHashMap的源碼,這里就不貼了。
?
參考:
聊聊并發(四)——深入分析ConcurrentHashMap ?http://www.infoq.com/cn/articles/ConcurrentHashMap?
ConcurrentHashMap之實現細節?http://marlonyao.iteye.com/blog/344876
Java并發編程之ConcurrentHashMap?http://www.goldendoc.org/2011/06/juc_concurrenthashmap/
探索 ConcurrentHashMap 高并發性的實現機制?http://www.ibm.com/developerworks/cn/java/java-lo-concurrenthashmap/index.html?ca=drs-
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的Java多线程(三)之ConcurrentHashMap深入分析的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: Java多线程(四)之Concurren
- 下一篇: 切面优先级设置
