ConcurrentHashMap源码解析(2)
此文已由作者趙計剛授權網易云社區發布。
歡迎訪問網易云社區,了解更多網易技術產品運營經驗。
ConcurrentHashMap()
????/***?創建ConcurrentHashMap*/public?ConcurrentHashMap()?{this(DEFAULT_INITIAL_CAPACITY,?//?16DEFAULT_LOAD_FACTOR,?//?0.75fDEFAULT_CONCURRENCY_LEVEL);//?16}該方法調用了上邊的三參構造器。
五點注意:
傳入的concurrencyLevel只是用于計算Segment數組的大小(可以傳入不是2的幾次方的數,但是根據下邊的計算,最終segment數組的大小ssize將是2的幾次方的數),并非真正的Segment數組的大小
傳入的initialCapacity只是用于計算Segment數組中的每一個segment的HashEntry[]的容量, 但是并不是每一個segment的HashEntry[]的容量,而每一個HashEntry[]的容量不是2的幾次方
非常值得注意的是,在默認情況下,創建出的HashEntry[]數組的容量為1,并不是傳入的initialCapacity(16),證實了上一點;而每一個Segment的擴容因子threshold,一開始算出來是0,即開始put第一個元素就要擴容,不太理解JDK為什么這樣做。
想要在初始化時擴大HashEntry[]的容量,可以指定initialCapacity參數,且指定時最好指定為2的幾次方的一個數,這樣的話,在代碼執行中可能會少執行一句"c++",具體參看三參構造器的注釋
對于Concurrenthashmap的擴容而言,只會擴當前的Segment,而不是整個Concurrenthashmap中的所有Segment都擴
兩點改進:
在Concurrenthashmap的構造過程中,相對于JDK的代碼,有兩點改進:
在遍歷Segment數組為每一個數組元素實例化的時候,可以直接寫作i<ssize,而不必在每次循環時都去計算this.segments.length,JDK代碼如下,可以按照代碼中的注釋做優化
????????for?(int?i?=?0;?i?<?this.segments.length;?++i)//?這一塊this.segments.length就是ssize,為了不去計算這個值,可以直接改成i<ssizethis.segments[i]?=?new?Segment<K,?V>(cap,?loadFactor);另外一個,就是在程序中,盡量少用小寫"l",容易與數字"1"混淆,要么不用"l",若要用的話,就用大寫"L",JDK代碼如下,可參照注釋進行優化:
????????/***?這里要注意一個很不好的編程習慣,就是小寫l,容易與數字1混淆,所以最好不要用小寫l,可以改為大寫L*/Segment(int?initialCapacity,?float?lf)?{loadFactor?=?lf;//每個Segment的加載因子setTable(HashEntry.<K,?V>?newArray(initialCapacity));}
一個疑問:
在默認情況下,創建出的HashEntry[]數組的容量為1,而每一個Segment的擴容因子threshold,一開始算出來是0,即開始put第一個元素就要擴容,不太理解JDK為什么這樣做。在我們實際開發中,其實空間有的是,所以我們一般會采用"以適當的空間換取時間"的方式,所以我們會適當的擴大HashEntry[],以確保在put數據的時候盡量減少擴容才對,但是JDK這樣做到底是為了什么?是為了減少空間嗎?還是我本身的理解就有問題?求大神指點!!!
注意我上邊說的適當容量,是因為如果容量設的太大,可能會導致某個HashEntry[i]中的HashEntry鏈表過長,進而影響查詢的效率,容量設的太小的話,有需要不斷擴容,影響插入效率。
3、put(Object key, Object value)
上述方法,若添加已有key的key-value對,則新值覆蓋舊值。
putIfAbsent(K key, V value):若添加已有key的key-value對,直接返回舊值,則新值相當于沒有添加。
使用方法:
map.put("hello",?"world");源代碼:
ConcurrentHashMap的put(Object key, Object value)方法?
????/***?將key-value放入map*?注意:key和value都不可以為空*?步驟:*?1)計算key.hashCode()的hash值*?2)根據hash值定位到某個Segment*?3)調用Segment的put()方法*?Segment的put()方法:*?1)上鎖*?2)從主內存中讀取key-value對個數count*?3)count+1如果大于threshold,執行rehash()*?4)計算將要插入的HashEntry[]的下標index*?5)獲取HashEntry的頭節點HashEntry[index]-->first*?6)從頭結點開始遍歷整個HashEntry鏈表,*?6.1)若找到與key和hash相同的節點,則判斷onlyIfAbsent如果為false,新值覆蓋舊值,返回舊值;如果為true,則直接返回舊值(相當于不添加重復key的元素)*?6.2)若沒有找到與key和hash相同的節點,則創建新節點HashEntry,并將之前的有節點作為新節點的next,即將新節點放入鏈頭,然后將新節點賦值給HashEntry[index],將count強制寫入主內存,最后返回null*/public?V?put(K?key,?V?value)?{if?(key?==?null?||?value?==?null)throw?new?NullPointerException();int?hash?=?hash(key.hashCode());//計算key.hashCode()的hash值/***?根據hash值定位到某個Segment,調用Segment的put()方法*/return?segmentFor(hash).put(key,?hash,?value,?false);}?注意:
key和value都不可為null,這一點與HashMap不同,但是從代碼來看,并沒有判斷key為空的情況,這一段代碼在哪里呢?為了可讀性,建議將判斷的地方改為如下代碼
????????if?(key?==?null?||?value?==?null)throw?new?NullPointerException();注釋部分寫明了整個插入流程,詳細的流程步驟見代碼,這里列出大致流程
根據key獲取key.hashCode的hash值
根據hash值算出將要插入的Segment
根據hash值與Segment中的HashEntry的容量-1按位與獲取將要插入的HashEntry的index
若HashEntry[index]中的HashEntry鏈表有與插入元素相同的key和hash值,根據onlyIfAbsent決定是否替換舊值
若沒有相同的key和hash,直接返回將新節點插入鏈頭,原來的頭節點設為新節點的next(采用的方式與HashMap一致,都是HashEntry替換的方法)
?
Segment的put(K key, int hash, V value, boolean onlyIfAbsent)
????????/***?往當前segment中添加key-value*?注意:*?1)onlyIfAbsent-->false如果有舊值存在,新值覆蓋舊值,返回舊值;true如果有舊值存在,則直接返回舊值,相當于不添加元素(不可添加重復key的元素)*?2)ReentrantLock的用法*?3)volatile只能配合鎖去使用才能實現原子性*/V?put(K?key,?int?hash,?V?value,?boolean?onlyIfAbsent)?{lock();//加鎖:ReentrantLocktry?{int?c?=?count;//當前Segment中的key-value對(注意:由于count是volatile型的,所以讀的時候工作內存會從主內存重新加載count值)if?(c++?>?threshold)?//?需要擴容rehash();//擴容HashEntry<K,?V>[]?tab?=?table;int?index?=?hash?&?(tab.length?-?1);//按位與獲取數組下標:與HashMap相同HashEntry<K,?V>?first?=?tab[index];//獲取相應的HashEntry[i]中的頭節點HashEntry<K,?V>?e?=?first;//一直遍歷到與插入節點的hash和key相同的節點e;若沒有,最后e==nullwhile?(e?!=?null?&&?(e.hash?!=?hash?||?!key.equals(e.key)))e?=?e.next;V?oldValue;//舊值if?(e?!=?null)?{//table中已經有與將要插入節點相同hash和key的節點oldValue?=?e.value;//獲取舊值if?(!onlyIfAbsent)e.value?=?value;//false?覆蓋舊值??true的話,就不添加元素了}?else?{//table中沒有與將要插入節點相同hash或key的節點oldValue?=?null;++modCount;tab[index]?=?new?HashEntry<K,?V>(key,?hash,?first,?value);//將頭節點作為新節點的next,所以新加入的元素也是添加在鏈頭count?=?c;?//設置key-value對(注意:由于count是volatile型的,所以寫的時候工作內存會立即向主內存重新寫入count值)}return?oldValue;}?finally?{unlock();//手工釋放鎖}}注意:在注釋中已經寫明了,這里還是要寫一下
onlyIfAbsent-->false如果有舊值存在,新值覆蓋舊值,返回舊值;true如果有舊值存在,則直接返回舊值,相當于不添加元素
ReentrantLock的用法:必須手工釋放鎖。可實現Synchronized的效果,原子性。
volatile需要配合鎖去使用才能實現原子性,否則在多線程操作的情況下依然不夠用,在程序中,count變量(當前Segment中的key-value對個數)通過volatile修飾,實現內存可見性(關于內存可見性以后會仔細去記錄,這里列出大概的一個流程)在有鎖保證了原子性的情況下
當我們讀取count變量的時候,會強制從主內存中讀取count的最新值
當我們對count變量進行賦值之后,會強制將最新的count值刷到主內存中去
通過以上兩點,我們可以保證在高并發的情況下,執行這段流程的線程可以讀取到最新值
在這里的ReentrantLock與volatile結合的用法值得我們學習
?補:volatile的介紹見《附2 volatile》,鏈接如下:
http://www.cnblogs.com/java-zhao/p/5125698.html
hash(int h)
????/***?對key.hashCode()進行hash計算*?@param?h?key.hashCode()*/private?static?int?hash(int?h)?{//?Spread?bits?to?regularize?both?segment?and?index?locations,//?using?variant?of?single-word?Wang/Jenkins?hash.h?+=?(h?<<?15)?^?0xffffcd7d;h?^=?(h?>>>?10);h?+=?(h?<<?3);h?^=?(h?>>>?6);h?+=?(h?<<?2)?+?(h?<<?14);return?h?^?(h?>>>?16);}segmentFor(int hash)
????/***?根據給定的key的hash值定位到一個Segment*?@param?hash*/final?Segment<K,?V>?segmentFor(int?hash)?{return?segments[(hash?>>>?segmentShift)?&?segmentMask];}注意:hash(int h)與segmentFor(int hash)這兩個方法應該會盡量將key的hash值打散,從而保證盡可能多的同時在多個Segment上進行put操作,而不是在同一個Segment上執行多個put操作,這樣之后,在同一個Segment中,要盡可能的保證向HashEntry[]的不同元素上進行put,而不是向同一個元素上一直put,以上兩個函數究竟是怎樣保證實現這樣的將hash打散的效果呢?求大神指點啊!!!
rehash()
JDK的實現代碼:
????????/***?步驟:*?需要注意的是:同一個桶下邊的HashEntry鏈表中的每一個元素的hash值不一定相同,只是hash&(table.length-1)的結果相同*?1)創建一個新的HashEntry數組,容量為舊數組的二倍*?2)計算新的threshold*?3)遍歷舊數組的每一個元素,對于每一個元素*?3.1)根據頭節點e重新計算將要存入的新數組的索引idx*?3.2)若整個鏈表只有一個節點e,則是直接將e賦給newTable[idx]即可*?3.3)若整個鏈表還有其他節點,先算出最后一個節點lastRun的位置lastIdx,并將最后一個節點賦值給newTable[lastIdx]*?3.4)最后將從頭節點開始到最后一個節點之前的所有節點計算其將要存儲的索引k,然后創建新節點,將新節點賦給newTable[k],并將之前newTable[k]上存在的節點作為新節點的下一節點*/void?rehash()?{HashEntry<K,?V>[]?oldTable?=?table;int?oldCapacity?=?oldTable.length;if?(oldCapacity?>=?MAXIMUM_CAPACITY)return;HashEntry<K,?V>[]?newTable?=?HashEntry.newArray(oldCapacity?<<?1);//擴容為原來二倍threshold?=?(int)?(newTable.length?*?loadFactor);//計算新的擴容臨界值int?sizeMask?=?newTable.length?-?1;for?(int?i?=?0;?i?<?oldCapacity;?i++)?{//?We?need?to?guarantee?that?any?existing?reads?of?old?Map?can//?proceed.?So?we?cannot?yet?null?out?each?bin.HashEntry<K,?V>?e?=?oldTable[i];//頭節點if?(e?!=?null)?{HashEntry<K,?V>?next?=?e.next;int?idx?=?e.hash?&?sizeMask;//重新按位與計算將要存放的新數組中的索引if?(next?==?null)//如果是只有一個頭節點,只需將頭節點設置到newTable[idx]即可newTable[idx]?=?e;else?{//?Reuse?trailing?consecutive?sequence?at?same?slotHashEntry<K,?V>?lastRun?=?e;int?lastIdx?=?idx;//存放最后一個元素將要存儲的數組索引//查找到最后一個元素,并設置相關信息for?(HashEntry<K,?V>?last?=?next;?last?!=?null;?last?=?last.next)?{int?k?=?last.hash?&?sizeMask;if?(k?!=?lastIdx)?{lastIdx?=?k;lastRun?=?last;}}newTable[lastIdx]?=?lastRun;//存放最后一個元素//?Clone?all?remaining?nodesfor?(HashEntry<K,?V>?p?=?e;?p?!=?lastRun;?p?=?p.next)?{int?k?=?p.hash?&?sizeMask;HashEntry<K,?V>?n?=?newTable[k];//獲取newTable[k]已經存在的HashEntry,并將此HashEntry賦給n//創建新節點,并將之前的n作為新節點的下一節點newTable[k]?=?new?HashEntry<K,?V>(p.key,?p.hash,?n,p.value);}}}}table?=?newTable;}個人感覺JDK的實現方式比較拖沓,改造后的代碼如下,如有問題,請指出!!!
我對其進行改造后的實現代碼:
????????/***?步驟:*?需要注意的是:同一個桶下邊的HashEntry鏈表中的每一個元素的hash值不一定相同,只是hash&(table.length-1)的結果相同*?1)創建一個新的HashEntry數組,容量為舊數組的二倍*?2)計算新的threshold*?3)遍歷舊數組的每一個元素,對于每一個元素(即一個鏈表)*?3.1)獲取頭節點e*?3.2)從頭節點開始到最后一個節點(null之前的那個節點)的所有節點計算其將要存儲的索引k,然后創建新節點,將新節點賦給newTable[k],并將之前newTable[k]上存在的節點作為新節點的下一節點*/void?rehash()?{HashEntry<K,?V>[]?oldTable?=?table;int?oldCapacity?=?oldTable.length;if?(oldCapacity?>=?MAXIMUM_CAPACITY)return;HashEntry<K,?V>[]?newTable?=?HashEntry.newArray(oldCapacity?<<?1);//擴容為原來二倍threshold?=?(int)?(newTable.length?*?loadFactor);//計算新的擴容臨界值int?sizeMask?=?newTable.length?-?1;for?(int?i?=?0;?i?<?oldCapacity;?i++)?{//遍歷每一個數組元素//?We?need?to?guarantee?that?any?existing?reads?of?old?Map?can//?proceed.?So?we?cannot?yet?null?out?each?bin.HashEntry<K,?V>?e?=?oldTable[i];//頭節點if?(e?!=?null)?{for?(HashEntry<K,?V>?p?=?e;?p?!=?null;?p?=?p.next)?{//遍歷數組元素中的鏈表int?k?=?p.hash?&?sizeMask;HashEntry<K,?V>?n?=?newTable[k];//獲取newTable[k]已經存在的HashEntry,并將此HashEntry賦給n//創建新節點,并將之前的n作為新節點的下一節點newTable[k]?=?new?HashEntry<K,?V>(p.key,?p.hash,?n,p.value);}}}table?=?newTable;}注意點:
同一個桶下邊的HashEntry鏈表中的每一個元素的hash值不一定相同,只是index = hash&(table.length-1)的結果相同,當table.length發生變化時,同一個桶下各個HashEntry算出來的index會不同。
總結:ConcurrentHashMap基于concurrencyLevel劃分出多個Segment來存儲key-value,這樣的話put的時候只鎖住當前的Segment,可以避免put的時候鎖住整個map,從而減少了并發時的阻塞現象。
免費領取驗證碼、內容安全、短信發送、直播點播體驗包及云服務器等套餐
更多網易技術、產品、運營經驗分享請點擊。
相關文章:
【推薦】?Google準實時數據倉庫Mesa(一)
【推薦】?21分鐘學會寫編譯器
轉載于:https://www.cnblogs.com/zyfd/p/10144424.html
總結
以上是生活随笔為你收集整理的ConcurrentHashMap源码解析(2)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 洛谷 P4706 取石子 解题报告
- 下一篇: 定时器 Quartz