concurrent(七)ConcurrentHashMap源码分析
參考文檔:
https://www.cnblogs.com/xiaoxi/p/7474026.html
https://www.ibm.com/developerworks/cn/java/java-lo-concurrenthashmap/
ConcurrentHashMap的弱一致性
ConcurrentHashMap#clear
clear方法很簡(jiǎn)單,看下代碼即知
public void clear() {for (int i = 0; i < segments.length; ++i)segments[i].clear();}因?yàn)闆](méi)有全局的鎖,在清除完一個(gè)segments之后,正在清理下一個(gè)segments的時(shí)候,已經(jīng)清理segments可能又被加入了數(shù)據(jù),因此clear返回的時(shí)候,ConcurrentHashMap中是可能存在數(shù)據(jù)的。因此,clear方法是弱一致的
ConcurrentHashMap#get put
get是無(wú)鎖操作,如果一個(gè)線程正在刪除e3,一個(gè)線程正在讀取e3,那么這個(gè)線程可能會(huì)讀取到已經(jīng)刪除的e3元素
?
ConcurrentHashMap中的迭代器
ConcurrentHashMap中的迭代器主要包括entrySet、keySet、values方法。它們大同小異,這里選擇entrySet解釋。當(dāng)我們調(diào)用entrySet返回值的iterator方法時(shí),返回的是EntryIterator,在EntryIterator上調(diào)用next方法時(shí),最終實(shí)際調(diào)用到了HashIterator.advance()方法,看下這個(gè)方法:
final void advance() {if (nextEntry != null && (nextEntry = nextEntry.next) != null)return;while (nextTableIndex >= 0) {if ( (nextEntry = currentTable[nextTableIndex--]) != null)return;}while (nextSegmentIndex >= 0) {Segment<K,V> seg = segments[nextSegmentIndex--];if (seg.count != 0) {currentTable = seg.table;for (int j = currentTable.length - 1; j >= 0; --j) {if ( (nextEntry = currentTable[j]) != null) {nextTableIndex = j - 1;return;}}}}}這個(gè)方法在遍歷底層數(shù)組。在遍歷過(guò)程中,如果已經(jīng)遍歷的數(shù)組上的內(nèi)容變化了,迭代器不會(huì)拋出ConcurrentModificationException異常。如果未遍歷的數(shù)組上的內(nèi)容發(fā)生了變化,則有可能反映到迭代過(guò)程中。這就是ConcurrentHashMap迭代器弱一致的表現(xiàn)
?
jdk1.7的ConcurrentHashMap
ConcurrentHashMap的鎖分段技術(shù)
HashTable容器在競(jìng)爭(zhēng)激烈的并發(fā)環(huán)境下表現(xiàn)出效率低下的原因,是因?yàn)樗性L問(wèn)HashTable的線程都必須競(jìng)爭(zhēng)同一把鎖。那假如容器里有多把鎖,每一把鎖用于鎖容器其中一部分?jǐn)?shù)據(jù),那么當(dāng)多線程訪問(wèn)容器里不同數(shù)據(jù)段的數(shù)據(jù)時(shí),線程間就不會(huì)存在鎖競(jìng)爭(zhēng),從而可以有效的提高并發(fā)訪問(wèn)效率,這就是ConcurrentHashMap所使用的鎖分段技術(shù),首先將數(shù)據(jù)分成一段一段的存儲(chǔ),然后給每一段數(shù)據(jù)配一把鎖,當(dāng)一個(gè)線程占用鎖訪問(wèn)其中一個(gè)段數(shù)據(jù)的時(shí)候,其他段的數(shù)據(jù)也能被其他線程訪問(wèn)。? 另外,ConcurrentHashMap可以做到讀取數(shù)據(jù)不加鎖,并且其內(nèi)部的結(jié)構(gòu)可以讓其在進(jìn)行寫操作的時(shí)候能夠?qū)㈡i的粒度保持地盡量地小,不用對(duì)整個(gè)ConcurrentHashMap加鎖。
ConcurrentHashMap是由Segment數(shù)組結(jié)構(gòu)和HashEntry數(shù)組結(jié)構(gòu)組成。Segment是一種可重入鎖ReentrantLock,在ConcurrentHashMap里扮演鎖的角色,HashEntry則用于存儲(chǔ)鍵值對(duì)數(shù)據(jù)。一個(gè)ConcurrentHashMap里包含一個(gè)Segment數(shù)組,Segment的結(jié)構(gòu)和HashMap類似,是一種數(shù)組和鏈表結(jié)構(gòu), 一個(gè)Segment里包含一個(gè)HashEntry數(shù)組,每個(gè)HashEntry是一個(gè)鏈表結(jié)構(gòu)的元素, 每個(gè)Segment守護(hù)著一個(gè)HashEntry數(shù)組里的元素,當(dāng)對(duì)HashEntry數(shù)組的數(shù)據(jù)進(jìn)行修改時(shí),必須首先獲得它對(duì)應(yīng)的Segment鎖
ConcurrentHashMap的內(nèi)部結(jié)構(gòu)
ConcurrentHashMap為了提高本身的并發(fā)能力,在內(nèi)部采用了一個(gè)叫做Segment的結(jié)構(gòu),一個(gè)Segment其實(shí)就是一個(gè)類Hash Table的結(jié)構(gòu),Segment內(nèi)部維護(hù)了一個(gè)鏈表數(shù)組,我們用下面這一幅圖來(lái)看下ConcurrentHashMap的內(nèi)部結(jié)構(gòu):
?
從上面的結(jié)構(gòu)我們可以了解到,ConcurrentHashMap定位一個(gè)元素的過(guò)程需要進(jìn)行兩次Hash操作,第一次Hash定位到Segment,第二次Hash定位到元素所在的鏈表的頭部,因此,這一種結(jié)構(gòu)的帶來(lái)的副作用是Hash的過(guò)程要比普通的HashMap要長(zhǎng),但是帶來(lái)的好處是寫操作的時(shí)候可以只對(duì)元素所在的Segment進(jìn)行加鎖即可,不會(huì)影響到其他的Segment,這樣,在最理想的情況下,ConcurrentHashMap可以最高同時(shí)支持Segment數(shù)量大小的寫操作(剛好這些寫操作都非常平均地分布在所有的Segment上),所以,通過(guò)這一種結(jié)構(gòu),ConcurrentHashMap的并發(fā)能力可以大大的提高
Segment
我們?cè)賮?lái)具體了解一下Segment的數(shù)據(jù)結(jié)構(gòu):
static final class Segment<K,V> extends ReentrantLock implements Serializable { transient volatile int count; transient int modCount; transient int threshold; transient volatile HashEntry<K,V>[] table; final float loadFactor; }詳細(xì)解釋一下Segment里面的成員變量的意義:
count:Segment中元素的數(shù)量
modCount:對(duì)table的大小造成影響的操作的數(shù)量(比如put或者remove操作)
threshold:閾值,Segment里面元素的數(shù)量超過(guò)這個(gè)值依舊就會(huì)對(duì)Segment進(jìn)行擴(kuò)容
table:鏈表數(shù)組,數(shù)組中的每一個(gè)元素代表了一個(gè)鏈表的頭部
loadFactor:負(fù)載因子,用于確定threshold
HashEntry
Segment中的元素是以HashEntry的形式存放在鏈表數(shù)組中的,看一下HashEntry的結(jié)構(gòu):
static final class HashEntry<K,V> { final K key; final int hash; volatile V value; final HashEntry<K,V> next; }可以看到HashEntry的一個(gè)特點(diǎn),除了value以外,其他的幾個(gè)變量都是final的,這意味著不能從hash鏈的中間或尾部添加或刪除節(jié)點(diǎn),因?yàn)檫@需要修改next 引用值,所有的節(jié)點(diǎn)的修改只能從頭部開始。對(duì)于put操作,可以一律添加到Hash鏈的頭部。但是對(duì)于remove操作,可能需要從中間刪除一個(gè)節(jié)點(diǎn),這就需要將要?jiǎng)h除節(jié)點(diǎn)的前面所有節(jié)點(diǎn)整個(gè)復(fù)制一遍,最后一個(gè)節(jié)點(diǎn)指向要?jiǎng)h除結(jié)點(diǎn)的下一個(gè)結(jié)點(diǎn)。這在講解刪除操作時(shí)還會(huì)詳述。為了確保讀操作能夠看到最新的值,將value設(shè)置成volatile,這避免了加鎖
ConcurrentHashMap的初始化
下面我們來(lái)結(jié)合源代碼來(lái)具體分析一下ConcurrentHashMap的實(shí)現(xiàn),先看下初始化方法:
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } segmentShift = 32 - sshift; segmentMask = ssize - 1; this.segments = Segment.newArray(ssize); if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = 1; while (cap < c) cap <<= 1; for (int i = 0; i < this.segments.length; ++i) this.segments[i] = new Segment<K,V>(cap, loadFactor); }??? CurrentHashMap的初始化一共有三個(gè)參數(shù),一個(gè)initialCapacity,表示初始的容量,一個(gè)loadFactor,表示負(fù)載參數(shù),最后一個(gè)是concurrentLevel,與ConcurrentHashMap內(nèi)部的Segment的數(shù)量相關(guān),ConcurrentLevel一經(jīng)指定,不可改變,后續(xù)如果ConcurrentHashMap的元素?cái)?shù)量增加導(dǎo)致ConrruentHashMap需要擴(kuò)容,ConcurrentHashMap不會(huì)增加Segment的數(shù)量,而只會(huì)增加Segment中鏈表數(shù)組的容量大小,這樣的好處是擴(kuò)容過(guò)程不需要對(duì)整個(gè)ConcurrentHashMap做rehash,而只需要對(duì)Segment里面的元素做一次rehash就可以了
???? 整個(gè)ConcurrentHashMap的初始化方法還是非常簡(jiǎn)單的,先是根據(jù)concurrentLevel來(lái)new出Segment,這里Segment的數(shù)量是不大于concurrentLevel的最大的2的指數(shù),就是說(shuō)Segment的數(shù)量永遠(yuǎn)是2的指數(shù)個(gè),這樣的好處是方便采用移位操作來(lái)進(jìn)行hash,加快hash的過(guò)程。接下來(lái)就是根據(jù)intialCapacity確定Segment的容量的大小,每一個(gè)Segment的容量大小也是2的指數(shù),同樣使為了加快hash的過(guò)程
??? 這邊需要特別注意一下兩個(gè)變量,分別是segmentShift和segmentMask,這兩個(gè)變量在后面將會(huì)起到很大的作用,假設(shè)構(gòu)造函數(shù)確定了Segment的數(shù)量是2的n次方,那么segmentShift就等于32減去n,而segmentMask就等于2的n次方減一
ConcurrentHashMap的get操作
前面提到過(guò)ConcurrentHashMap的get操作是不用加鎖的,count和HashEntry.value 均為volatile 類型,我們這里看一下其實(shí)現(xiàn):
public V get(Object key) { int hash = hash(key.hashCode()); return segmentFor(hash).get(key, hash); }第二行,對(duì)hash值進(jìn)行了二次hash,之所以要進(jìn)行再哈希,其目的是為了減少哈希沖突,使元素能夠均勻的分布在不同的Segment上,從而提高容器的存取效率。
看第三行,segmentFor這個(gè)函數(shù)用于確定操作應(yīng)該在哪一個(gè)segment中進(jìn)行,幾乎對(duì)ConcurrentHashMap的所有操作都需要用到這個(gè)函數(shù),我們看下這個(gè)函數(shù)的實(shí)現(xiàn):
這個(gè)函數(shù)用了位操作來(lái)確定Segment,根據(jù)傳入的hash值向右無(wú)符號(hào)右移segmentShift位,然后和segmentMask進(jìn)行與操作,結(jié)合我們之前說(shuō)的segmentShift和segmentMask的值,就可以得出以下結(jié)論:假設(shè)Segment的數(shù)量是2的n次方,根據(jù)元素的hash值的高n位就可以確定元素到底在哪一個(gè)Segment中。
在確定了需要在哪一個(gè)segment中進(jìn)行操作以后,接下來(lái)的事情就是調(diào)用對(duì)應(yīng)的Segment的get方法:
先看第二行代碼,這里對(duì)count進(jìn)行了一次判斷,其中count表示Segment中元素的數(shù)量。我們可以來(lái)看一下count的定義:
transient volatile int count;可以看到count是volatile的,實(shí)際上這里面利用了volatile的語(yǔ)義:
對(duì)volatile字段的寫入操作happens-before于每一個(gè)后續(xù)的同一個(gè)字段的讀操作。因?yàn)閷?shí)際上put、remove等操作也會(huì)更新count的值,所以當(dāng)競(jìng)爭(zhēng)發(fā)生的時(shí)候,volatile的語(yǔ)義可以保證寫操作在讀操作之前,也就保證了寫操作對(duì)后續(xù)的讀操作都是可見的,這樣后面get的后續(xù)操作就可以拿到完整的元素內(nèi)容。
然后,在第三行,調(diào)用了getFirst()來(lái)取得鏈表的頭部:
同樣,這里也是用位操作來(lái)確定鏈表的頭部,hash值和HashTable的長(zhǎng)度減一做與操作,最后的結(jié)果就是hash值的低n位,其中n是HashTable的長(zhǎng)度以2為底的結(jié)果。
在確定了鏈表的頭部以后,就可以對(duì)整個(gè)鏈表進(jìn)行遍歷,看第4行,取出key對(duì)應(yīng)的value的值,如果拿出的value的值是null,則可能這個(gè)key,value對(duì)正在put的過(guò)程中,如果出現(xiàn)這種情況,那么就加鎖來(lái)保證取出的value是完整的,如果不是null,則直接返回value
ConcurrentHashMap的put操作
看完了get操作,再看下put操作,put操作的前面也是確定Segment的過(guò)程,這里不再贅述,直接看關(guān)鍵的segment的put方法:
V put(K key, int hash, V value, boolean onlyIfAbsent) { lock(); try { int c = count; if (c++ > threshold) // ensure capacity rehash(); HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1); HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue; if (e != null) { oldValue = e.value; if (!onlyIfAbsent) e.value = value; } else { oldValue = null; ++modCount; tab[index] = new HashEntry<K,V>(key, hash, first, value); count = c; // write-volatile } return oldValue; } finally { unlock(); } }首先對(duì)Segment的put操作是加鎖完成的,然后在第五行,如果Segment中元素的數(shù)量超過(guò)了閾值(由構(gòu)造函數(shù)中的loadFactor算出)這需要進(jìn)行對(duì)Segment擴(kuò)容,并且要進(jìn)行rehash,關(guān)于rehash的過(guò)程大家可以自己去了解,這里不詳細(xì)講了
while循環(huán)是在鏈表中尋找和要put的元素相同key的元素,如果找到,就直接更新更新key的value,如果沒(méi)有找到,生成一個(gè)新的HashEntry并且把它加到整個(gè)Segment的頭部,然后再更新count的值。該方法也是在持有段鎖(鎖定整個(gè)segment)的情況下執(zhí)行的,這當(dāng)然是為了并發(fā)的安全,修改數(shù)據(jù)是不能并發(fā)進(jìn)行的,必須得有個(gè)判斷是否超限的語(yǔ)句以確保容量不足時(shí)能夠rehash。接著是找是否存在同樣一個(gè)key的結(jié)點(diǎn),如果存在就直接替換這個(gè)結(jié)點(diǎn)的值。否則創(chuàng)建一個(gè)新的結(jié)點(diǎn)并添加到hash鏈的頭部,這時(shí)一定要修改modCount和count的值,同樣修改count的值一定要放在最后一步。put方法調(diào)用了rehash方法,reash方法實(shí)現(xiàn)得也很精巧,主要利用了table的大小為2^n,這里就不介紹了。而比較難懂的是這句int index = hash & (tab.length - 1),原來(lái)segment里面才是真正的hashtable,即每個(gè)segment是一個(gè)傳統(tǒng)意義上的hashtable,如上圖,從兩者的結(jié)構(gòu)就可以看出區(qū)別,這里就是找出需要的entry在table的哪一個(gè)位置,之后得到的entry就是這個(gè)鏈的第一個(gè)節(jié)點(diǎn),如果e!=null,說(shuō)明找到了,這是就要替換節(jié)點(diǎn)的值(onlyIfAbsent == false),否則,我們需要new一個(gè)entry,它的后繼是first,而讓tab[index]指向它,什么意思呢?實(shí)際上就是將這個(gè)新entry插入到鏈頭,剩下的就非常容易理解了
ConcurrentHashMap的remove操作
Remove操作的前面一部分和前面的get和put操作一樣,都是定位Segment的過(guò)程,然后再調(diào)用Segment的remove方法:
V remove(Object key, int hash, Object value) { lock(); try { int c = count - 1; HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1); HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue = null; if (e != null) { V v = e.value; if (value == null || value.equals(v)) { oldValue = v; // All entries following removed node can stay // in list, but all preceding ones need to be // cloned. ++modCount; HashEntry<K,V> newFirst = e.next; for (HashEntry<K,V> p = first; p != e; p = p.next) newFirst = new HashEntry<K,V>(p.key, p.hash, newFirst, p.value); tab[index] = newFirst; count = c; // write-volatile } } return oldValue; } finally { unlock(); } }首先remove操作也是確定需要?jiǎng)h除的元素的位置,不過(guò)這里刪除元素的方法不是簡(jiǎn)單地把待刪除元素的前面的一個(gè)元素的next指向后面一個(gè)就完事了,我們之前已經(jīng)說(shuō)過(guò)HashEntry中的next是final的,一經(jīng)賦值以后就不可修改,在定位到待刪除元素的位置以后,程序就將待刪除元素前面的那一些元素依次復(fù)制一遍,然后再依次重新接到鏈表上去,看一下下面這一幅圖來(lái)了解這個(gè)過(guò)程:
假設(shè)鏈表中原來(lái)的元素如上圖所示,現(xiàn)在要?jiǎng)h除元素3,分別復(fù)制entry1,指向entry4,復(fù)制entry2,指向entry1.那么刪除元素3以后的鏈表就如下圖所示:
整個(gè)操作是在持有段鎖的情況下執(zhí)行的,空白行之前(第11行之前)的行主要是定位到要?jiǎng)h除的節(jié)點(diǎn)e。接下來(lái),如果不存在這個(gè)節(jié)點(diǎn)就直接返回null,否則就要將e前面的結(jié)點(diǎn)復(fù)制一遍,尾結(jié)點(diǎn)指向e的下一個(gè)結(jié)點(diǎn)。e后面的結(jié)點(diǎn)不需要復(fù)制,它們可以重用。
中間那個(gè)for循環(huán)是做什么用的呢?(第22行)從代碼來(lái)看,就是將定位之后的所有entry克隆并拼回前面去,但有必要嗎?每次刪除一個(gè)元素就要將那之前的元素克隆一遍?這點(diǎn)其實(shí)是由entry的不變性來(lái)決定的,仔細(xì)觀察entry定義,發(fā)現(xiàn)除了value,其他所有屬性都是用final來(lái)修飾的,這意味著在第一次設(shè)置了next域之后便不能再改變它,取而代之的是將它之前的節(jié)點(diǎn)全都克隆一次。至于entry為什么要設(shè)置為不變性,這跟不變性的訪問(wèn)不需要同步從而節(jié)省時(shí)間有關(guān)。
整個(gè)remove實(shí)現(xiàn)并不復(fù)雜,但是需要注意如下幾點(diǎn)。
第一,當(dāng)要?jiǎng)h除的結(jié)點(diǎn)存在時(shí),刪除的最后一步操作要將count的值減一。這必須是最后一步操作,否則讀取操作可能看不到之前對(duì)段所做的結(jié)構(gòu)性修改。
第二,remove執(zhí)行的開始就將table賦給一個(gè)局部變量tab,這是因?yàn)閠able是 volatile變量,讀寫volatile變量的開銷很大。編譯器也不能對(duì)volatile變量的讀寫做任何優(yōu)化,直接多次訪問(wèn)非volatile實(shí)例變量沒(méi)有多大影響,編譯器會(huì)做相應(yīng)優(yōu)化
?ConcurrentHashMap的size操作
通過(guò)兩次統(tǒng)計(jì)每個(gè)segment中count(每個(gè)segment的元素總和,volatile類型)的和,如果兩次結(jié)果一致,則返回。如果不一致則加鎖重新計(jì)算size
jdk1.8的ConcurrentHashMap
轉(zhuǎn)載于:https://www.cnblogs.com/amei0/p/8657368.html
總結(jié)
以上是生活随笔為你收集整理的concurrent(七)ConcurrentHashMap源码分析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: QT + OpenCV + MinGW
- 下一篇: CASIO 5800P计算器游戏--猜数