Java集合篇:ConcurrentHashMap详解(JDK1.6)
(本文有關ConcurrentHashMap的源碼都是基于JDK1.6的)
(基于JDK1.8的版本可以參考這篇文章:https://blog.csdn.net/a745233700/article/details/83123359)
摘要:
ConcurrentHashMap是J.U.C(java.util.concurrent包)的重要成員,它是HashMap的一個線程安全的、支持高效并發的版本。在默認理想狀態下,ConcurrentHashMap可以支持16個線程執行并發寫操作及任意數量線程的讀操作。本文將結合Java內存模型,分析JDK源代碼,探索ConcurrentHashMap高并發的具體實現機制,包括其在JDK中的定義和結構、并發存取、重哈希和跨段操作,并著重剖析了ConcurrentHashMap讀操作不需要加鎖和分段鎖機制的內在奧秘和原理。
一. ConcurrentHashMap 概述:
HashMap 是 Java Collection Framework 的重要成員,也是Map族(如下圖所示)中我們最為常用的一種。不過遺憾的是,HashMap不是線程安全的。也就是說,在多線程環境下,操作HashMap會導致各種各樣的線程安全問題,比如在HashMap擴容重哈希時出現的死循環問題,臟讀問題等。HashMap的這一缺點往往會造成諸多不便,雖然在并發場景下HashTable和由同步包裝器包裝的HashMap(Collections.synchronizedMap(Map<K,V> m) )可以代替HashMap,但是它們都是通過使用一個全局的鎖來同步不同線程間的并發訪問,因此會帶來不可忽視的性能問題。慶幸的是,JDK為我們解決了這個問題,它為HashMap提供了一個線程安全的高效版本 —— ConcurrentHashMap。在ConcurrentHashMap中,無論是讀操作還是寫操作都能保證很高的性能:在進行讀操作時(幾乎)不需要加鎖,而在寫操作時通過鎖分段技術只對所操作的段加鎖而不影響客戶端對其它段的訪問。特別地,在理想狀態下,ConcurrentHashMap 可以支持 16 個線程執行并發寫操作(如果并發級別設為16),及任意數量線程的讀操作。
如下圖所示,ConcurrentHashMap本質上是一個Segment數組,而一個Segment實例又包含若干個桶,每個桶都包含一條由若干個HashEntry對象鏈接起來的鏈表。ConcurrentHashMap的高效并發機制是通過以下三方面來保證的(具體細節見后文闡述):
- 通過鎖分段技術保證并發環境下的寫操作;
- 通過HashEntry的不變形、volatile變量的內存可見性和加鎖重讀機制 保證高效、安全的讀操作;
- 通過不加鎖和加鎖 兩種方案控制跨段操作的安全性。
二. HashMap 線程不安全的典型表現:
我們先回顧一下HashMap。HashMap是一個數組鏈表,當一個key/Value對被加入時,首先會通過Hash算法定位出這個鍵值對要被放入的桶,然后就把它插到相應桶中。如果這個桶中已經有元素了,那么發生了碰撞,這樣會在這個桶中形成一個鏈表。一般來說,當有數據要插入HashMap時,都會檢查容量有沒有超過設定的thredhold,如果超過,需要增大HashMap的尺寸,但是這樣一來,就需要對整個HashMap里的節點進行重哈希操作。在重哈希的過程中,就會出現HashMap線程不安全的典型表現 —— 死循環。
HashMap重哈希的關鍵源碼如下:
/*** Transfers all entries from current table to newTable.*/void transfer(Entry[] newTable) {// 將原數組 table 賦給數組 srcEntry[] src = table;int newCapacity = newTable.length;// 將數組 src 中的每條鏈重新添加到 newTable 中for (int j = 0; j < src.length; j++) {Entry<K,V> e = src[j];if (e != null) {src[j] = null; // src 回收// 將每條鏈的每個元素依次添加到 newTable 中相應的桶中do {Entry<K,V> next = e.next;// e.hash指的是 hash(key.hashCode())的返回值;// 計算在newTable中的位置,注意原來在同一條子鏈上的元素可能被分配到不同的桶中int i = indexFor(e.hash, newCapacity); e.next = newTable[i];newTable[i] = e;e = next;} while (e != null);}}}1、單線程環境下的重哈希過程演示:
單線程情況下,rehash 不會出現任何問題,如上圖所示。假設hash算法就是最簡單的 key mod table.length(也就是桶的個數)。最上面的是old hash表,其中的Hash表桶的個數為2, 所以對于 key = 3、7、5 的鍵值對在 mod 2以后都沖突在table[1]這里了。接下來的三個步驟是,Hash表resize成4,然后對所有的鍵值對重哈希的過程。
2、多線程環境下的重哈希過程演示:
假設我們有兩個線程,我用紅色和淺藍色標注了一下,被這兩個線程共享的資源正是要被重哈希的原來1號桶中的Entry鏈。我們再回頭看一下我們的transfer代碼中的這個細節:
do {Entry<K,V> next = e.next; // <--假設線程一執行到這里就被調度掛起了int i = indexFor(e.hash, newCapacity);e.next = newTable[i];newTable[i] = e;e = next; } while (e != null);而我們的線程二執行完成了,于是我們有下面的這個樣子:
注意,在Thread2重哈希后,Thread1的指針e和指針next分別指向了Thread2重組后的鏈表(e指向了key(3),而next指向了key(7))。此時,Thread1被調度回來執行:Thread1先是執行 newTalbe[i] = e;然后是e = next,導致了e指向了key(7),而下一次循環的next = e.next導致了next指向了key(3),如下圖所示:
這時,一切安好。Thread1有條不紊的工作著:把key(7)摘下來,放到newTable[i]的第一個,然后把e和next往下移,如下圖所示:
在此時,特別需要注意的是,當執行e.next = newTable[i]后,會導致 key(3).next 指向了 key(7),而此時的key(7).next 已經指向了key(3),環形鏈表就這樣出現了,如下圖所示。于是,當我們的Thread1調用HashMap.get(11)時,悲劇就出現了 —— Infinite Loop。
這是HashMap在并發環境下使用中最為典型的一個問題,就是在HashMap進行擴容重哈希時導致Entry鏈形成環。一旦Entry鏈中有環,勢必會導致在同一個桶中進行插入、查詢、刪除等操作時陷入死循環。
三. ConcurrentHashMap 在 JDK 中的定義:
為了更好的理解 ConcurrentHashMap 高并發的具體實現,我們先來了解它在JDK中的定義。ConcurrentHashMap類中包含兩個靜態內部類 HashEntry 和 Segment,其中 HashEntry 用來封裝具體的K/V對,是個典型的四元組;Segment 用來充當鎖的角色,每個 Segment 對象守護整個ConcurrentHashMap的若干個桶 (可以把Segment看作是一個小型的哈希表),其中每個桶是由若干個 HashEntry 對象鏈接起來的鏈表。總的來說,一個ConcurrentHashMap實例中包含由若干個Segment實例組成的數組,而一個Segment實例又包含由若干個桶,每個桶中都包含一條由若干個 HashEntry 對象鏈接起來的鏈表。特別地,ConcurrentHashMap 在默認并發級別下會創建16個Segment對象的數組,如果鍵能均勻散列,每個 Segment 大約守護整個散列表中桶總數的 1/16。
1、類結構定義:
ConcurrentHashMap 繼承了AbstractMap并實現了ConcurrentMap接口,其在JDK中的定義為:
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>implements ConcurrentMap<K, V>, Serializable {... }2、成員變量定義:
HashMap相比,ConcurrentHashMap 增加了兩個屬性用于定位段,分別是 segmentMask 和 segmentShift。此外,不同于HashMap的是,ConcurrentHashMap底層結構是一個Segment數組,而不是Object數組,具體源碼如下:
/*** Mask value for indexing into segments. The upper bits of a* key's hash code are used to choose the segment.*/final int segmentMask; // 用于定位段,大小等于segments數組的大小減 1,是不可變的/*** Shift value for indexing within segments.*/final int segmentShift; // 用于定位段,大小等于32(hash值的位數)減去對segments的大小取以2為底的對數值,是不可變的/*** The segments, each of which is a specialized hash table*/final Segment<K,V>[] segments; // ConcurrentHashMap的底層結構是一個Segment數組3、段的定義:Segment:
Segment 類繼承于 ReentrantLock 類,從而使得 Segment 對象能充當鎖的角色。每個 Segment 對象用來守護它的成員對象 table 中包含的若干個桶。table 是一個由 HashEntry 對象組成的鏈表數組,table 數組的每一個數組成員就是一個桶。
在Segment類中,count 變量是一個計數器,它表示每個 Segment 對象管理的 table 數組包含的 HashEntry 對象的個數,也就是 Segment 中包含的 HashEntry 對象的總數。特別需要注意的是,之所以在每個 Segment 對象中包含一個計數器,而不是在 ConcurrentHashMap 中使用全局的計數器,是對 ConcurrentHashMap 并發性的考慮:因為這樣當需要更新計數器時,不用鎖定整個ConcurrentHashMap。事實上,每次對段進行結構上的改變,如在段中進行增加/刪除節點(修改節點的值不算結構上的改變),都要更新count的值,此外,在JDK的實現中每次讀取操作開始都要先讀取count的值。特別需要注意的是,count是volatile的,這使得對count的任何更新對其它線程都是立即可見的。modCount用于統計段結構改變的次數,主要是為了檢測對多個段進行遍歷過程中某個段是否發生改變,這一點具體在談到跨段操作時會詳述。threashold用來表示段需要進行重哈希的閾值。loadFactor表示段的負載因子,其值等同于ConcurrentHashMap的負載因子的值。table是一個典型的鏈表數組,而且也是volatile的,這使得對table的任何更新對其它線程也都是立即可見的。段(Segment)的定義如下:
/*** Segments are specialized versions of hash tables. This* subclasses from ReentrantLock opportunistically, just to* simplify some locking and avoid separate construction.*/static final class Segment<K,V> extends ReentrantLock implements Serializable {/*** The number of elements in this segment's region.*/transient volatile int count; // Segment中元素的數量,可見的/*** Number of updates that alter the size of the table. This is* used during bulk-read methods to make sure they see a* consistent snapshot: If modCounts change during a traversal* of segments computing size or checking containsValue, then* we might have an inconsistent view of state so (usually)* must retry.*/transient int modCount; //對count的大小造成影響的操作的次數(比如put或者remove操作)/*** The table is rehashed when its size exceeds this threshold.* (The value of this field is always <tt>(int)(capacity ** loadFactor)</tt>.)*/transient int threshold; // 閾值,段中元素的數量超過這個值就會對Segment進行擴容/*** The per-segment table.*/transient volatile HashEntry<K,V>[] table; // 鏈表數組/*** The load factor for the hash table. Even though this value* is same for all segments, it is replicated to avoid needing* links to outer object.* @serial*/final float loadFactor; // 段的負載因子,其值等同于ConcurrentHashMap的負載因子...}我們知道,ConcurrentHashMap允許多個修改(寫)操作并發進行,其關鍵在于使用了鎖分段技術,它使用了不同的鎖來控制對哈希表的不同部分進行的修改(寫),而 ConcurrentHashMap 內部使用段(Segment)來表示這些不同的部分。實際上,每個段實質上就是一個小的哈希表,每個段都有自己的鎖(Segment 類繼承了 ReentrantLock 類)。這樣,只要多個修改(寫)操作發生在不同的段上,它們就可以并發進行。下圖是依次插入 ABC 三個 HashEntry 節點后,Segment 的結構示意圖:
4、基本元素:HashEntry:
HashEntry用來封裝具體的鍵值對,是個典型的四元組。與HashMap中的Entry類似,HashEntry也包括同樣的四個域,分別是key、hash、value和next。不同的是,在HashEntry類中,key,hash和next域都被聲明為final的,value域被volatile所修飾,因此HashEntry對象幾乎是不可變的,這是ConcurrentHashmap讀操作并不需要加鎖的一個重要原因。next域被聲明為final本身就意味著我們不能從hash鏈的中間或尾部添加或刪除節點,因為這需要修改next引用值,因此所有的節點的修改只能從頭部開始。對于put操作,可以一律添加到Hash鏈的頭部。但是對于remove操作,可能需要從中間刪除一個節點,這就需要將要刪除節點的前面所有節點整個復制(重新new)一遍,最后一個節點指向要刪除結點的下一個結點(這在談到ConcurrentHashMap的刪除操作時還會詳述)。特別地,由于value域被volatile修飾,所以其可以確保被讀線程讀到最新的值,這是ConcurrentHashmap讀操作并不需要加鎖的另一個重要原因。實際上,ConcurrentHashMap完全允許多個讀操作并發進行,讀操作并不需要加鎖。HashEntry代表hash鏈中的一個節點,其結構如下所示:
/*** ConcurrentHashMap 中的 HashEntry 類* * ConcurrentHashMap list entry. Note that this is never exported* out as a user-visible Map.Entry.** Because the value field is volatile, not final, it is legal wrt* the Java Memory Model for an unsynchronized reader to see null* instead of initial value when read via a data race. Although a* reordering leading to this is not likely to ever actually* occur, the Segment.readValueUnderLock method is used as a* backup in case a null (pre-initialized) value is ever seen in* an unsynchronized access method.*/static final class HashEntry<K,V> {final K key; // 聲明 key 為 final 的final int hash; // 聲明 hash 值為 final 的volatile V value; // 聲明 value 被volatile所修飾final HashEntry<K,V> next; // 聲明 next 為 final 的HashEntry(K key, int hash, HashEntry<K,V> next, V value) {this.key = key;this.hash = hash;this.next = next;this.value = value;}@SuppressWarnings("unchecked")static final <K,V> HashEntry<K,V>[] newArray(int i) {return new HashEntry[i];}}與HashMap類似,在ConcurrentHashMap中,如果在散列時發生碰撞,也會將碰撞的 HashEntry 對象鏈成一個鏈表。由于HashEntry的next域是final的,所以新節點只能在鏈表的表頭處插入。下圖是在一個空桶中依次插入 A,B,C 三個 HashEntry 對象后的結構圖(由于只能在表頭插入,所以鏈表中節點的順序和插入的順序相反):
與HashEntry不同的是,HashMap 中的 Entry 類結構如下所示:
/*** HashMap 中的 Entry 類*/static class Entry<K,V> implements Map.Entry<K,V> {final K key;V value;Entry<K,V> next;final int hash;/*** Creates new entry.*/Entry(int h, K k, V v, Entry<K,V> n) {value = v;next = n;key = k;hash = h;}...}四. ConcurrentHashMap 的構造函數:
ConcurrentHashMap 一共提供了五個構造函數,其中默認無參的構造函數和參數為Map的構造函數 為 Java Collection Framework 規范的推薦實現,其余三個構造函數則是 ConcurrentHashMap 專門提供的。
1、ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel):
該構造函數意在構造一個具有指定容量、指定負載因子和指定段數目/并發級別(若不是2的冪次方,則會調整為2的冪次方)的空ConcurrentHashMap,其相關源碼如下:
/*** Creates a new, empty map with the specified initial* capacity, load factor and concurrency level.** @param initialCapacity the initial capacity. The implementation* performs internal sizing to accommodate this many elements.* @param loadFactor the load factor threshold, used to control resizing.* Resizing may be performed when the average number of elements per* bin exceeds this threshold.* @param concurrencyLevel the estimated number of concurrently* updating threads. The implementation performs internal sizing* to try to accommodate this many threads.* @throws IllegalArgumentException if the initial capacity is* negative or the load factor or concurrencyLevel are* nonpositive.*/public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)throw new IllegalArgumentException();if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS;// Find power-of-two sizes best matching argumentsint sshift = 0; // 大小為 lg(ssize) int ssize = 1; // 段的數目,segments數組的大小(2的冪次方)while (ssize < concurrencyLevel) {++sshift;ssize <<= 1;}segmentShift = 32 - sshift; // 用于定位段segmentMask = ssize - 1; // 用于定位段this.segments = Segment.newArray(ssize); // 創建segments數組if (initialCapacity > MAXIMUM_CAPACITY)initialCapacity = MAXIMUM_CAPACITY;int c = initialCapacity / ssize; // 總的桶數/總的段數if (c * ssize < initialCapacity)++c;int cap = 1; // 每個段所擁有的桶的數目(2的冪次方)while (cap < c)cap <<= 1;for (int i = 0; i < this.segments.length; ++i) // 初始化segments數組this.segments[i] = new Segment<K,V>(cap, loadFactor);}2、ConcurrentHashMap(int initialCapacity, float loadFactor):
該構造函數意在構造一個具有指定容量、指定負載因子和默認并發級別(16)的空ConcurrentHashMap,其相關源碼如下:
/*** Creates a new, empty map with the specified initial capacity* and load factor and with the default concurrencyLevel (16).** @param initialCapacity The implementation performs internal* sizing to accommodate this many elements.* @param loadFactor the load factor threshold, used to control resizing.* Resizing may be performed when the average number of elements per* bin exceeds this threshold.* @throws IllegalArgumentException if the initial capacity of* elements is negative or the load factor is nonpositive** @since 1.6*/public ConcurrentHashMap(int initialCapacity, float loadFactor) {this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL); // 默認并發級別為16}3、ConcurrentHashMap(int initialCapacity):
該構造函數意在構造一個具有指定容量、默認負載因子(0.75)和默認并發級別(16)的空ConcurrentHashMap,其相關源碼如下:
/*** Creates a new, empty map with the specified initial capacity,* and with default load factor (0.75) and concurrencyLevel (16).** @param initialCapacity the initial capacity. The implementation* performs internal sizing to accommodate this many elements.* @throws IllegalArgumentException if the initial capacity of* elements is negative.*/public ConcurrentHashMap(int initialCapacity) {this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);}4、ConcurrentHashMap():
該構造函數意在構造一個具有默認初始容量(16)、默認負載因子(0.75)和默認并發級別(16)的空ConcurrentHashMap,其相關源碼如下:
/*** Creates a new, empty map with a default initial capacity (16),* load factor (0.75) and concurrencyLevel (16).*/public ConcurrentHashMap() {this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);}5、ConcurrentHashMap(Map<? extends K, ? extends V> m):
該構造函數意在構造一個與指定 Map 具有相同映射的 ConcurrentHashMap,其初始容量不小于 16 (具體依賴于指定Map的大小),負載因子是 0.75,并發級別是 16, 是 Java Collection Framework 規范推薦提供的,其源碼如下:
/*** Creates a new map with the same mappings as the given map.* The map is created with a capacity of 1.5 times the number* of mappings in the given map or 16 (whichever is greater),* and a default load factor (0.75) and concurrencyLevel (16).** @param m the map*/public ConcurrentHashMap(Map<? extends K, ? extends V> m) {this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,DEFAULT_INITIAL_CAPACITY),DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);putAll(m);}在這里,我們提到了三個非常重要的參數:初始容量、負載因子 和 并發級別,這三個參數是影響ConcurrentHashMap性能的重要參數。從上述源碼我們可以看出,ConcurrentHashMap 也正是通過initialCapacity、loadFactor和concurrencyLevel這三個參數進行構造并初始化segments數組、段偏移量segmentShift、段掩碼segmentMask和每個segment的。
五. ConcurrentHashMap 的數據結構:
本質上,ConcurrentHashMap就是一個Segment數組,而一個Segment實例則是一個小的哈希表。由于Segment類繼承于ReentrantLock類,從而使得Segment對象能充當鎖的角色,這樣,每個 Segment對象就可以守護整個ConcurrentHashMap的若干個桶,其中每個桶是由若干個HashEntry 對象鏈接起來的鏈表。通過使用段(Segment)將ConcurrentHashMap劃分為不同的部分,ConcurrentHashMap就可以使用不同的鎖來控制對哈希表的不同部分的修改,從而允許多個修改操作并發進行, 這正是ConcurrentHashMap鎖分段技術的核心內涵。進一步地,如果把整個ConcurrentHashMap看作是一個父哈希表的話,那么每個Segment就可以看作是一個子哈希表,如下圖所示:
注意,假設ConcurrentHashMap一共分為2^n個段,每個段中有2^m個桶,那么段的定位方式是將key的hash值的高n位與(2^n-1)相與。在定位到某個段后,再將key的hash值的低m位與(2^m-1)相與,定位到具體的桶位。
六. ConcurrentHashMap 的并發存取:
在ConcurrentHashMap中,線程對映射表做讀操作時,一般情況下不需要加鎖就可以完成,對容器做結構性修改的操作(比如,put操作、remove操作等)才需要加鎖。
1、用分段鎖機制實現多個線程間的并發寫操作: put(key, vlaue):
在ConcurrentHashMap中,典型結構性修改操作包括put、remove和clear,下面我們首先以put操作為例說明對ConcurrentHashMap做結構性修改的過程。ConcurrentHashMap的put操作對應的源碼如下:
/*** Maps the specified key to the specified value in this table.* Neither the key nor the value can be null.** <p> The value can be retrieved by calling the <tt>get</tt> method* with a key that is equal to the original key.** @param key key with which the specified value is to be associated* @param value value to be associated with the specified key* @return the previous value associated with <tt>key</tt>, or* <tt>null</tt> if there was no mapping for <tt>key</tt>* @throws NullPointerException if the specified key or value is null*/public V put(K key, V value) {if (value == null)throw new NullPointerException();int hash = hash(key.hashCode());return segmentFor(hash).put(key, hash, value, false);}從上面的源碼我們看到,ConcurrentHashMap不同于HashMap,它既不允許key值為null,也不允許value值為null。此外,我們還可以看到,實際上我們對ConcurrentHashMap的put操作被ConcurrentHashMap委托給特定的段來實現。也就是說,當我們向ConcurrentHashMap中put一個Key/Value對時,首先會獲得Key的哈希值并對其再次哈希,然后根據最終的hash值定位到這條記錄所應該插入的段,定位段的segmentFor()方法源碼如下:
/*** Returns the segment that should be used for key with given hash* @param hash the hash code for the key* @return the segment*/final Segment<K,V> segmentFor(int hash) {return segments[(hash >>> segmentShift) & segmentMask];}segmentFor()方法根據傳入的hash值向右無符號右移segmentShift位,然后和segmentMask進行與操作就可以定位到特定的段。在這里,假設Segment的數量(segments數組的長度)是2的n次方(Segment的數量總是2的倍數,具體見構造函數的實現),那么segmentShift的值就是32-n(hash值的位數是32),而segmentMask的值就是2^n-1(寫成二進制的形式就是n個1)。進一步地,我們就可以得出以下結論:根據key的hash值的高n位就可以確定元素到底在哪一個Segment中。緊接著,調用這個段的put()方法來將目標Key/Value對插到段中,段的put()方法的源碼如下所示:
?
從源碼中首先可以知道,ConcurrentHashMap對Segment的put操作是加鎖完成的。在第二節我們已經知道,Segment是ReentrantLock的子類,因此Segment本身就是一種可重入的Lock,所以我們可以直接調用其繼承而來的lock()方法和unlock()方法對代碼進行上鎖/解鎖。需要注意的是,這里的加鎖操作是針對某個具體的Segment,鎖定的也是該Segment而不是整個ConcurrentHashMap。因為插入鍵/值對操作只是在這個Segment包含的某個桶中完成,不需要鎖定整個ConcurrentHashMap。因此,其他寫線程對另外15個Segment的加鎖并不會因為當前線程對這個Segment的加鎖而阻塞。故而 相比較于 HashTable 和由同步包裝器包裝的HashMap每次只能有一個線程執行讀或寫操作,ConcurrentHashMap 在并發訪問性能上有了質的提高。在理想狀態下,ConcurrentHashMap 可以支持 16 個線程執行并發寫操作(如果并發級別設置為 16),及任意數量線程的讀操作。
在將Key/Value對插入到Segment之前,首先會檢查本次插入會不會導致Segment中元素的數量超過閾值threshold,如果會,那么就先對Segment進行擴容和重哈希操作,然后再進行插入。重哈希操作暫且不表,稍后詳述。第8和第9行的操作就是定位到段中特定的桶并確定鏈表頭部的位置。第12行的while循環用于檢查該桶中是否存在相同key的結點,如果存在,就直接更新value值;如果沒有找到,則進入21行生成一個新的HashEntry并且把它鏈到該桶中鏈表的表頭,然后再更新count的值(由于count是volatile變量,所以count值的更新一定要放在最后一步)。
到此為止,除了重哈希操作,ConcurrentHashMap的put操作已經介紹完了。此外,在ConcurrentHashMap中,修改操作還包括putAll()和replace()。其中,putAll()操作就是多次調用put方法,而replace()操作實現要比put()操作簡單得多,此不贅述。
2、ConcurrentHashMap 的重哈希操作 : rehash():
上面敘述到,在ConcurrentHashMap中使用put操作插入Key/Value對之前,首先會檢查本次插入會不會導致Segment中節點數量超過閾值threshold,如果會,那么就先對Segment進行擴容和重哈希操作。特別需要注意的是,ConcurrentHashMap的重哈希實際上是對ConcurrentHashMap的某個段的重哈希,因此ConcurrentHashMap的每個段所包含的桶位自然也就不盡相同。針對段進行rehash()操作的源碼如下:
void rehash() {HashEntry<K,V>[] oldTable = table; // 擴容前的tableint oldCapacity = oldTable.length;if (oldCapacity >= MAXIMUM_CAPACITY) // 已經擴到最大容量,直接返回return;/** Reclassify nodes in each list to new Map. Because we are* using power-of-two expansion, the elements from each bin* must either stay at same index, or move with a power of two* offset. We eliminate unnecessary node creation by catching* cases where old nodes can be reused because their next* fields won't change. Statistically, at the default* threshold, only about one-sixth of them need cloning when* a table doubles. The nodes they replace will be garbage* collectable as soon as they are no longer referenced by any* reader thread that may be in the midst of traversing table* right now.*/// 新創建一個table,其容量是原來的2倍HashEntry<K,V>[] newTable = HashEntry.newArray(oldCapacity<<1); threshold = (int)(newTable.length * loadFactor); // 新的閾值int sizeMask = newTable.length - 1; // 用于定位桶for (int i = 0; i < oldCapacity ; i++) {// We need to guarantee that any existing reads of old Map can// proceed. So we cannot yet null out each bin.HashEntry<K,V> e = oldTable[i]; // 依次指向舊table中的每個桶的鏈表表頭if (e != null) { // 舊table的該桶中鏈表不為空HashEntry<K,V> next = e.next;int idx = e.hash & sizeMask; // 重哈希已定位到新桶if (next == null) // 舊table的該桶中只有一個節點newTable[idx] = e;else { // Reuse trailing consecutive sequence at same slotHashEntry<K,V> lastRun = e;int lastIdx = idx;for (HashEntry<K,V> last = next;last != null;last = last.next) {int k = last.hash & sizeMask;// 尋找k值相同的子鏈,該子鏈尾節點與父鏈的尾節點必須是同一個if (k != lastIdx) {lastIdx = k;lastRun = last;}}// JDK直接將子鏈lastRun放到newTable[lastIdx]桶中newTable[lastIdx] = lastRun;// 對該子鏈之前的結點,JDK會挨個遍歷并把它們復制到新桶中for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {int k = p.hash & sizeMask;HashEntry<K,V> n = newTable[k];newTable[k] = new HashEntry<K,V>(p.key, p.hash,n, p.value);}}}}table = newTable; // 擴容完成}其實JDK官方的注釋已經解釋的很清楚了。由于擴容是按照2的冪次方進行的,所以擴展前在同一個桶中的元素,現在要么還是在原來的序號的桶里,或者就是原來的序號再加上一個2的冪次方,就這兩種選擇。根據本文前面對HashEntry的介紹,我們知道鏈接指針next是final的,因此看起來我們好像只能把該桶的HashEntry鏈中的每個節點復制到新的桶中(這意味著我們要重新創建每個節點),但事實上JDK對其做了一定的優化。因為在理論上原桶里的HashEntry鏈可能存在一條子鏈,這條子鏈上的節點都會被重哈希到同一個新的桶中,這樣我們只要拿到該子鏈的頭結點就可以直接把該子鏈放到新的桶中,從而避免了一些節點不必要的創建,提升了一定的效率。因此,JDK為了提高效率,它會首先去查找這樣的一個子鏈,而且這個子鏈的尾節點必須與原hash鏈的尾節點是同一個,那么就只需要把這個子鏈的頭結點放到新的桶中,其后面跟的一串子節點自然也就連接上了。對于這個子鏈頭結點之前的結點,JDK會挨個遍歷并把它們復制到新桶的鏈頭(只能在表頭插入元素)中。特別地,我們注意這段代碼:
for (HashEntry<K,V> last = next;last != null;last = last.next) {int k = last.hash & sizeMask;if (k != lastIdx) {lastIdx = k;lastRun = last;} } newTable[lastIdx] = lastRun;在該代碼段中,JDK直接將子鏈lastRun放到newTable[lastIdx]桶中,難道這個操作不會覆蓋掉newTable[lastIdx]桶中原有的元素么?事實上,這種情形時不可能出現的,因為桶newTable[lastIdx]在子鏈添加進去之前壓根就不會有節點存在,這還是因為table的大小是按照2的冪次方的方式去擴展的。假設原來table的大小是2^k大小,那么現在新table的大小是2^(k+1)大小,而定位桶的方式是:
// sizeMask = newTable.length - 1,即 sizeMask = 11...1,共k+1個1。 int idx = e.hash & sizeMask;因此這樣得到的idx實際上就是key的hash值的低k+1位的值,而原table的sizeMask也全是1的二進制,不過總共是k位,那么原table的idx就是key的hash值的低k位的值。所以,如果元素的hashcode的第k+1位是0,那么元素在新桶的序號就是和原桶的序號是相等的;如果第k+1位的值是1,那么元素在新桶的序號就是原桶的序號加上2^k。因此,JDK直接將子鏈lastRun放到newTable[lastIdx]桶中就沒問題了,因為newTable中新序號處此時肯定是空的。
3、ConcurrentHashMap 的讀取實現 :get(Object key):
與put操作類似,當我們從ConcurrentHashMap中查詢一個指定Key的鍵值對時,首先會定位其應該存在的段,然后查詢請求委托給這個段進行處理,源碼如下:
/*** Returns the value to which the specified key is mapped,* or {@code null} if this map contains no mapping for the key.** <p>More formally, if this map contains a mapping from a key* {@code k} to a value {@code v} such that {@code key.equals(k)},* then this method returns {@code v}; otherwise it returns* {@code null}. (There can be at most one such mapping.)** @throws NullPointerException if the specified key is null*/public V get(Object key) {int hash = hash(key.hashCode());return segmentFor(hash).get(key, hash);}我們緊接著研讀Segment中get操作的源碼:
V get(Object key, int hash) {if (count != 0) { // read-volatile,首先讀 count 變量HashEntry<K,V> e = getFirst(hash); // 獲取桶中鏈表頭結點while (e != null) {if (e.hash == hash && key.equals(e.key)) { // 查找鏈中是否存在指定Key的鍵值對V v = e.value;if (v != null) // 如果讀到value域不為 null,直接返回return v; // 如果讀到value域為null,說明發生了重排序,加鎖后重新讀取return readValueUnderLock(e); // recheck}e = e.next;}}return null; // 如果不存在,直接返回null}了解了ConcurrentHashMap的put操作后,上述源碼就很好理解了。但是有一個情況需要特別注意,就是鏈中存在指定Key的鍵值對并且其對應的Value值為null的情況。在剖析ConcurrentHashMap的put操作時,我們就知道ConcurrentHashMap不同于HashMap,它既不允許key值為null,也不允許value值為null。但是,此處怎么會存在鍵值對存在且的Value值為null的情形呢?JDK官方給出的解釋是,這種情形發生的場景是:初始化HashEntry時發生的指令重排序導致的,也就是在HashEntry初始化完成之前便返回了它的引用。這時,JDK給出的解決之道就是加鎖重讀,源碼如下:
/*** Reads value field of an entry under lock. Called if value* field ever appears to be null. This is possible only if a* compiler happens to reorder a HashEntry initialization with* its table assignment, which is legal under memory model* but is not known to ever occur.*/V readValueUnderLock(HashEntry<K,V> e) {lock();try {return e.value;} finally {unlock();}}4、ConcurrentHashMap 存取小結:
在ConcurrentHashMap進行存取時,首先會定位到具體的段,然后通過對具體段的存取來完成對整個ConcurrentHashMap的存取。特別地,無論是ConcurrentHashMap的讀操作還是寫操作都具有很高的性能:在進行讀操作時不需要加鎖,而在寫操作時通過鎖分段技術只對所操作的段加鎖而不影響客戶端對其它段的訪問。
七. ConcurrentHashMap 讀操作不需要加鎖的奧秘:
在本文第二節,我們介紹到HashEntry對象幾乎是不可變的(只能改變Value的值),因為HashEntry中的key、hash和next指針都是final的。這意味著,我們不能把節點添加到鏈表的中間和尾部,也不能在鏈表的中間和尾部刪除節點。這個特性可以保證:在訪問某個節點時,這個節點之后的鏈接不會被改變,這個特性可以大大降低處理鏈表時的復雜性。與此同時,由于HashEntry類的value字段被聲明是Volatile的,因此Java的內存模型就可以保證:某個寫線程對value字段的寫入馬上就可以被后續的某個讀線程看到。此外,由于在ConcurrentHashMap中不允許用null作為鍵和值,所以當讀線程讀到某個HashEntry的value為null時,便知道產生了沖突 —— 發生了重排序現象,此時便會加鎖重新讀入這個value值。這些特性互相配合,使得讀線程即使在不加鎖狀態下,也能正確訪問 ConcurrentHashMap。總的來說,ConcurrentHashMap讀操作不需要加鎖的奧秘在于以下三點:
-
用HashEntery對象的不變性來降低讀操作對加鎖的需求;
-
用Volatile變量協調讀寫線程間的內存可見性;
-
若讀時發生指令重排序現象,則加鎖重讀;
由于我們在介紹ConcurrentHashMap的get操作時,已經介紹到了第三點,此不贅述。下面我們結合前兩點分別從線程寫入的兩種角度 —— 對散列表做非結構性修改的操作和對散列表做結構性修改的操作來分析ConcurrentHashMap是如何保證高效讀操作的。
1、用HashEntery對象的不變性來降低讀操作對加鎖的需求:
結構性修改操作只是更改某個HashEntry的value字段的值。由于對Volatile變量的寫入操作將與隨后對這個變量的讀操作進行同步,所以當一個寫線程修改了某個HashEntry的value字段后,Java內存模型能夠保證讀線程一定能讀取到這個字段更新后的值。所以,寫線程對鏈表的非結構性修改能夠被后續不加鎖的讀線程看到。
對ConcurrentHashMap做結構性修改時,實質上是對某個桶指向的鏈表做結構性修改。如果能夠確保在讀線程遍歷一個鏈表期間,寫線程對這個鏈表所做的結構性修改不影響讀線程繼續正常遍歷這個鏈表,那么讀/寫線程之間就可以安全并發訪問這個ConcurrentHashMap。在ConcurrentHashMap中,結構性修改操作包括put操作、remove操作和clear操作,下面我們分別分析這三個操作:
- clear操作只是把ConcurrentHashMap中所有的桶置空,每個桶之前引用的鏈表依然存在,只是桶不再引用這些鏈表而已,而鏈表本身的結構并沒有發生任何修改。因此,正在遍歷某個鏈表的讀線程依然可以正常執行對該鏈表的遍歷。
- 關于put操作的細節我們在上文已經單獨介紹過,我們知道put操作如果需要插入一個新節點到鏈表中時會在鏈表頭部插入這個新節點,此時鏈表中的原有節點的鏈接并沒有被修改。也就是說,插入新的健/值對到鏈表中的操作不會影響讀線程正常遍歷這個鏈表。
下面來分析 remove 操作,先讓我們來看看 remove 操作的源代碼實現:
/*** Removes the key (and its corresponding value) from this map.* This method does nothing if the key is not in the map.** @param key the key that needs to be removed* @return the previous value associated with <tt>key</tt>, or* <tt>null</tt> if there was no mapping for <tt>key</tt>* @throws NullPointerException if the specified key is null*/public V remove(Object key) {int hash = hash(key.hashCode());return segmentFor(hash).remove(key, hash, null);}同樣地,在ConcurrentHashMap中刪除一個鍵值對時,首先需要定位到特定的段并將刪除操作委派給該段。Segment的remove操作如下所示:
/*** Remove; match on key only if value null, else match both.*/V remove(Object key, int hash, Object value) {lock(); // 加鎖try {int c = count - 1; HashEntry<K,V>[] tab = table;int index = hash & (tab.length - 1); // 定位桶HashEntry<K,V> first = tab[index];HashEntry<K,V> e = first;while (e != null && (e.hash != hash || !key.equals(e.key))) // 查找待刪除的鍵值對e = e.next;V oldValue = null;if (e != null) { // 找到V v = e.value;if (value == null || value.equals(v)) {oldValue = v;// All entries following removed node can stay// in list, but all preceding ones need to be// cloned.++modCount;// 所有處于待刪除節點之后的節點原樣保留在鏈表中HashEntry<K,V> newFirst = e.next;// 所有處于待刪除節點之前的節點被克隆到新鏈表中for (HashEntry<K,V> p = first; p != e; p = p.next)newFirst = new HashEntry<K,V>(p.key, p.hash,newFirst, p.value); tab[index] = newFirst; // 將刪除指定節點并重組后的鏈重新放到桶中count = c; // write-volatile,更新Volatile變量count}}return oldValue;} finally {unlock(); // finally子句解鎖}}我們可以看出,刪除節點C之后的所有節點原樣保留到新鏈表中;刪除節點C之前的每個節點被克隆到新鏈表中(它們在新鏈表中的鏈接順序被反轉了)。因此,在執行remove操作時,原始鏈表并沒有被修改,也就是說,讀線程不會受同時執行 remove 操作的并發寫線程的干擾。
綜合上面的分析我們可以知道,無論寫線程對某個鏈表進行結構性修改還是非結構性修改,都不會影響其他的并發讀線程對這個鏈表的訪問。
2、用 Volatile 變量協調讀寫線程間的內存可見性:
一般地,由于內存可見性問題,在未正確同步的情況下,對于寫線程寫入的值讀線程可能并不能及時讀到。下面以寫線程M和讀線程N來說明ConcurrentHashMap如何協調讀/寫線程間的內存可見性問題,如下圖所示:
假設線程M在寫入了volatile變量count后,線程N讀取了這個volatile變量。根據 happens-before 關系法則中的程序次序法則,A appens-before 于 B,C happens-before D。根據 Volatile法則,B happens-before C。結合傳遞性,則可得到:A appens-before 于 B; B appens-before C;C happens-before D。也就是說,寫線程M對鏈表做的結構性修改對讀線程N是可見的。雖然線程N是在未加鎖的情況下訪問鏈表,但Java的內存模型可以保證:只要之前對鏈表做結構性修改操作的寫線程M在退出寫方法前寫volatile變量count,讀線程N就能讀取到這個volatile變量count的最新值。
事實上,ConcurrentHashMap就是一個Segment數組,而每個Segment都有一個volatile變量count去統計Segment中的HashEntry的個數。并且,在ConcurrentHashMap中,所有不加鎖讀方法在進入讀方法時,首先都會去讀這個count變量。比如我們在上一節提到的get方法:
V get(Object key, int hash) {if (count != 0) { // read-volatile,首先讀 count 變量HashEntry<K,V> e = getFirst(hash); // 獲取桶中鏈表頭結點while (e != null) {if (e.hash == hash && key.equals(e.key)) { // 查找鏈中是否存在指定Key的鍵值對V v = e.value;if (v != null) // 如果讀到value域不為 null,直接返回return v; // 如果讀到value域為null,說明發生了重排序,加鎖后重新讀取return readValueUnderLock(e); // recheck}e = e.next;}}return null; // 如果不存在,直接返回null}3、小結:
在ConcurrentHashMap中,所有執行寫操作的方法(put、remove和clear)在對鏈表做結構性修改之后,在退出寫方法前都會去寫這個count變量;所有未加鎖的讀操作(get、contains和containsKey)在讀方法中,都會首先去讀取這個count變量。根據 Java 內存模型,對同一個 volatile 變量的寫/讀操作可以確保:寫線程寫入的值,能夠被之后未加鎖的讀線程“看到”。這個特性和前面介紹的HashEntry對象的不變性相結合,使得在ConcurrentHashMap中讀線程進行讀取操作時基本不需要加鎖就能成功獲得需要的值。這兩個特性以及加鎖重讀機制的互相配合,不僅減少了請求同一個鎖的頻率(讀操作一般不需要加鎖就能夠成功獲得值),也減少了持有同一個鎖的時間(只有讀到 value 域的值為 null 時 , 讀線程才需要加鎖后重讀)。
八. ConcurrentHashMap 的跨段操作:
在ConcurrentHashMap中,有些操作需要涉及到多個段,比如說size操作、containsValaue操作等。以size操作為例,如果我們要統計整個ConcurrentHashMap里元素的大小,那么就必須統計所有Segment里元素的大小后求和。我們知道,Segment里的全局變量count是一個volatile變量,那么在多線程場景下,我們是不是直接把所有Segment的count相加就可以得到整個ConcurrentHashMap大小了呢?顯然不能,雖然相加時可以獲取每個Segment的count的最新值,但是拿到之后可能累加前使用的count發生了變化,那么統計結果就不準了。所以最安全的做法,是在統計size的時候把所有Segment的put,remove和clean方法全部鎖住,但是這種做法顯然非常低效。那么,我們還是看一下JDK是如何實現size()方法的吧:
?
size方法主要思路是先在沒有鎖的情況下對所有段大小求和,這種求和策略最多執行RETRIES_BEFORE_LOCK次(默認是兩次):在沒有達到RETRIES_BEFORE_LOCK之前,求和操作會不斷嘗試執行(這是因為遍歷過程中可能有其它線程正在對已經遍歷過的段進行結構性更新);在超過RETRIES_BEFORE_LOCK之后,如果還不成功就在持有所有段鎖的情況下再對所有段大小求和。事實上,在累加count操作過程中,之前累加過的count發生變化的幾率非常小,所以ConcurrentHashMap的做法是先嘗試RETRIES_BEFORE_LOCK次通過不鎖住Segment的方式來統計各個Segment大小,如果統計的過程中,容器的count發生了變化,則再采用加鎖的方式來統計所有Segment的大小。
那么,ConcurrentHashMap是如何判斷在統計的時候容器的段發生了結構性更新了呢?我們在前文中已經知道,Segment包含一個modCount成員變量,在會引起段發生結構性改變的所有操作(put操作、 remove操作和clean操作)里,都會將變量modCount進行加1,因此,JDK只需要在統計size前后比較modCount是否發生變化就可以得知容器的大小是否發生變化。
至于ConcurrentHashMap的跨其他跨段操作,比如contains操作、containsValaue操作等,其與size操作的實現原理相類似,此不贅述。
總結
以上是生活随笔為你收集整理的Java集合篇:ConcurrentHashMap详解(JDK1.6)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java集合篇:HashMap原理详解(
- 下一篇: Java集合篇:ConcurrentHa