详解 ConcurrentHashMap
文章目錄
- ConcurrentHashMap 的底層數(shù)據結構?
- ConcurrentHashMap 的帶參構造方法的流程?
- ConcurrentHashMap 的 put 方法的流程?
- ConcurrentHashMap addCount 方法的流程是怎樣的呢?
- ConcurrentHashMap transfer 方法的流程是怎樣的呢?
- ConcurrentHashMap helpTransfer 方法的流程是怎樣的呢?
- ConcurrentHashMap 的 get 方法的流程?
- ConcurrentHashMap 的 sizeCtl 的含義,以及值的流轉過程?
- ConcurrentHashMap 的 size 方法的流程?
- 其他
- 如果 ConcurrentHashMap 的某個數(shù)組下標位置是一顆紅黑樹,那么這個位置上的節(jié)點類型是 TreeNode 嗎?
- 為什么要用 TreeBin 對象作為這個位置上的節(jié)點,而不是 TreeNode 對象呢?
- ConcurrentHashMap 的 size 方法會返回最新的值嗎?
- transferIndex 的真正含義
- ConcurrentHashMap 總結
- put 方法流程總結
- ConcurrentHashMap 的元素數(shù)量計數(shù)
- ConcurrentHashMap 的擴容操作
- ConcurrentHashMap 的設計思想總結
- 大量的無鎖并發(fā)安全處理操作
- 細化臨界資源粒度
- 高效的擴容機制
- 高效的狀態(tài)管理機制
ConcurrentHashMap 的底層數(shù)據結構?
ConcurrentHashMap 的底層數(shù)據結構是 Node 數(shù)組。Node 類的定義如下:
static class Node<K,V> implements Map.Entry<K,V> {//節(jié)點的 hash 值final int hash;//節(jié)點的 key 值final K key;//節(jié)點的 value 值volatile V val;//后繼節(jié)點volatile Node<K,V> next; }其中,元素的 key 和 value 均不能為空。
ConcurrentHashMap 的帶參構造方法的流程?
- 判斷傳入的初始容量是否合法,小于 0 將拋出異常
- 判斷是否傳入的初始容量大于最大值(2^30 次方)的一半,如果是,則將容量設置為最大值
- 否則將容量設置為大于傳入的初始容量的最小的 2 的整數(shù)次冪
- 將 sizeCtl 參數(shù)賦值為初始容量
ConcurrentHashMap 的 put 方法的流程?
ConcurrentHashMap 的 put 方法流程如下:
- 首先檢查 key 和 value 是否為空,如果為空,則直接拋出空指針異常
- 其次調用 spread 方法計算 hash 值
- 將 key 的 hashcode 往右移 16 位,跟原 hashcode 值做異或運算
- 將異或運算得到的結果,跟 HASH_BITS (HASH_BITS = 0x7fffffff,換算成二進制有 31 個 1)做與運算得到最終結果
- 判斷數(shù)組是否為空,如果數(shù)組為空,則執(zhí)行初始化方法
- 當表為空時,一直執(zhí)行循環(huán)
- 完成構造方法后,sizeCtl 參數(shù)要么等于 0,(即使用的無參構造器),要么等于初始容量大小,(使用的指定了初始容量的構造器)
- 當 sizeCtl 為負數(shù)時,即表正在被其他線程初始化或者正在被其他線程擴容時,調用 Thread.yield 方法主動讓出 cpu 執(zhí)行權(即等待其他線程完成初始化或表擴容的操作)
- 當 sizeCtl 不為負數(shù)時,使用 CAS 將 sizeCtl 的值設置為 -1
- 再次判斷表是否為空
- 如果表不為空,則說明表已經被其他線程初始化完成,則直接跳出循環(huán)
- 如果表為空,判斷是否指定了初始容量,如果指定了初始化容量,則使用指定的數(shù)值作為初始化容量;如果沒有指定初始容量,則使用默認容量 16
- 初始化一個大小為上一步中得到的容量的 Node 數(shù)組
- 將 sizeCtl 的值設置為容量的 0.75(可類比于 HashMap 中的擴容閾值)
- 根據 hash 跟數(shù)組長度 - 1進行與運算后,得到元素在數(shù)組中的下標,并檢查該下標位置是否存在元素
- 如果該下標位置不存在元素,則用 CAS 對該下標位置進行賦值,如果賦值成功,則跳出循環(huán)
- 如果 CAS 操作失敗,則繼續(xù)循環(huán)
- 如果數(shù)組該下標位置存在元素(以下簡稱該元素為 f),則檢查 f 的 hash 值是否等于 -1(當元素的 hash 值為 -1 時,代表該數(shù)組正在進行擴容),即 MOVED
- 如果是,則說明其他線程正在進行擴容,則執(zhí)行 helpTransfer 方法協(xié)助完成擴容操作
- 否則,開始對該數(shù)組下標位置上的桶中的元素進行遍歷比較
- 首先使用 synchronized 關鍵字對 f 進行加鎖
- 加鎖成功,則重新獲取一遍該數(shù)組下標位置上的元素,判斷其與 f 是否相等,即判斷 f 是否發(fā)生了變化,如果發(fā)生了變化,則直接進入下一次循環(huán)
- 如果沒有發(fā)生變化,則判斷 f 的 hash 值是否大于等于 0
- 如果大于等于 0,則說明是鏈表結構,則遍歷鏈表,將 binCount 值賦為 1,每次遍歷都將 binCount +1
- 使用 key 的 equals 方法逐一比對元素,如果該 key 不存在,則將待插入元素加入到鏈表的尾部
- 如果存在該 key,則根據 onlyIfAbsent 參數(shù)來判斷是否需要將舊 value 值進行覆蓋
- 如果 f 的 hash 值小于 0 ,則判斷 f 是否是 TreeBin 類型的元素
- 如果是,將 binCount 值賦為 2,將待插入元素插入到紅黑樹中
- 如果紅黑樹插入失敗,則說明存在該 key,則根據 onlyIfAbsent 參數(shù)來判斷是否需要將舊 value 值進行覆蓋
- 判斷 binCount 的值是否不等于 0,即是否進行了紅黑樹和鏈表的查找過程
- 如果不等于 0,則判斷鏈表是否需要轉化成紅黑樹,當鏈表上的元素個數(shù)大于 8(即在插入第 9 個元素時),且數(shù)組的長度大于 64 時,將鏈表轉化成紅黑樹
- 轉化成紅黑樹后,將該數(shù)組下標位置上的元素使用 CAS 替換成 TreeBin 類型的元素
- 如果替換了舊值,則將舊值返回
- 執(zhí)行 addCount 方法,即嘗試將元素數(shù)量 +1
結合源碼來看:
final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();int hash = spread(key.hashCode());int binCount = 0;for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;if (tab == null || (n = tab.length) == 0)tab = initTable();else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break; // no lock when adding to empty bin}else if ((fh = f.hash) == MOVED)tab = helpTransfer(tab, f);else {V oldVal = null;synchronized (f) {if (tabAt(tab, i) == f) {if (fh >= 0) {binCount = 1;for (Node<K,V> e = f;; ++binCount) {K ek;if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,value, null);break;}}}else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}addCount(1L, binCount);return null; }ConcurrentHashMap addCount 方法的流程是怎樣的呢?
addCount 方法,即嘗試將當前元素數(shù)量自增的方法,其主要的流程如下:
- 首先判斷 counterCells 是否不為空
- 或者嘗試使用 CAS 對 baseCount 屬性進行增加的時候是否失敗
- 如果滿足上面的條件
- 繼續(xù)判斷 counterCells 是否為空
- 如果 counterCells 不為空,則調用 ThreadLocalRandom.getProbe 方法生成一個隨機數(shù),跟 countCells.length-1 進行與操作之后,得到 counterCells 數(shù)組的下標,判斷 counterCells 該下標位置上的元素是否為空(即得到一個沒有線程正在占用的)
- 如果上述條件都不滿足,則使用 CAS 將 counterCells 下標位置的 value 值進行增加,判斷 CAS 操作是否失敗
- 如果上述任一條件滿足,說明已經發(fā)生了線程間的競爭,則調用 fullAddCount 方法進行 counterCells 內部的自增操作
- 如果上述所有條件都不滿足,說明對于 countCells 下標位置的 value 值進行 CAS 增加的操作成功了
- 如果 check 參數(shù)小于等于 1,則直接返回
- 否則,調用 sumCount 方法統(tǒng)計一下當前數(shù)組中的元素數(shù)量
- sumCount 方法,就是簡單地將 baseCount 的值和所有 counterCells 數(shù)組的所有元素的 value 值求和,此方法沒有加鎖,同步措施主要依靠 baseCount 和 CounterCell 的 value 屬性都是用 volatile 關鍵字來修飾的。
- 檢查 check 變量是否大于等于 0
- 如果大于等于 0,說明需要檢查是否要進行擴容
- 判斷當前元素數(shù)量是否大于 sizeCtl 參數(shù),且表不為空,且表的長度小于最大長度時,此時說明需要擴容,則進入循環(huán)
- 首先計算擴容戳(即計算當前表長度數(shù)值的最高非 0 位前的 0 的個數(shù),跟 2152^{15}215 進行或運算)
- 接下來判斷 sizeCtl 是否小于 0
- 如果小于 0 代表數(shù)組正在擴容,即有線程正在對數(shù)組進行擴容
- 判斷 sizeCtl 往右移 16 位后是否不等于 擴容戳
- 判斷 nextTable 屬性是否等于 0
- 判斷 transferIndex 是否小于等于 0
- 如果上述 3 個條件任一成立,代表數(shù)組已經被其他線程擴容完成,則直接返回
- 如果上述 3 個條件都不成立,則嘗試使用 CAS 對 sizeCtl 進行 +1
- 如果 CAS 成功,代表該線程開始執(zhí)行協(xié)助擴容操作,參與擴容的線程數(shù)(sizeCtl 參數(shù)的低 16 位)+1,則開始執(zhí)行協(xié)助擴容
- 如果 sizeCtl 不小于 0,則嘗試使用 CAS 對 sizeCtl 的值修改成擴容戳左移 16 位且 +2
- 如果 CAS 成功,則執(zhí)行初始化擴容操作(此前沒有其他線程在對數(shù)組進行擴容)
- 重新計算當前元素數(shù)量(調用 sumCount 方法)后進入下次循環(huán)
- 如果小于 0 代表數(shù)組正在擴容,即有線程正在對數(shù)組進行擴容
結合源碼來看:
private final void addCount(long x, int check) {CounterCell[] as; long b, s;if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {CounterCell a; long v; int m;boolean uncontended = true;if (as == null || (m = as.length - 1) < 0 ||(a = as[ThreadLocalRandom.getProbe() & m]) == null ||!(uncontended =U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {fullAddCount(x, uncontended);return;}if (check <= 1)return;s = sumCount();}if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) {int rs = resizeStamp(n);if (sc < 0) {if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);s = sumCount();}} }ConcurrentHashMap transfer 方法的流程是怎樣的呢?
ConcurrentHashMap 的 transfer 方法,即為擴容方法,其主要的流程如下:
- 首先,需要通過 CPU 核心數(shù)確定每個線程需要處理的桶的數(shù)量 stride,最小為 16
- 如果 nextTable 屬性為空,則說明正在執(zhí)行初始化擴容,則新建一個原數(shù)組長度兩倍的新數(shù)組,并賦值給 nextTable,并將 nextTransferIndex 屬性賦值為原數(shù)組長度
- 創(chuàng)建一個 FowardingNode 類型的節(jié)點,此類節(jié)點的 hash 值為 -1,其中有一個 nextTab 屬性,記錄的就是擴容時的新數(shù)組
- 根據 transferIndex 與 stride 的值,嘗試使用 CAS 將 transferIndex 的值修改為 transferIndex - stride,這一步是確定當前線程要處理的桶的范圍,即當前線程要處理的數(shù)組下標范圍是 [transferIndex - stride,transferIndex) 這個區(qū)間內的所有桶
- 分配到需要處理的桶的范圍后,從右到左逆序遍歷這個范圍中的每一個桶,遍歷的下標為 i
- 判斷位置為 i 上的這個節(jié)點是否為空,如果為空,則嘗試使用 CAS 將這個位置上的節(jié)點修改成創(chuàng)建好的 FowardingNode 節(jié)點
- 如果這個節(jié)點不為空,那么判斷這個節(jié)點的 hash 值是否等于 -1,如果是,代表這個節(jié)點是 ForwardingNode 類型的節(jié)點,則不予處理
- 否則說明這個節(jié)點上的元素還沒有被遷移,則開始遷移這個桶中的所有節(jié)點
- 首先對這個節(jié)點使用 synchronized 進行加鎖
- 加鎖成功后,判斷這個節(jié)點有沒有被改變
- 如果沒被改變,則判斷這個節(jié)點的 hash 值是否大于 0
- 如果大于 0,則說明這個節(jié)點是鏈表的頭節(jié)點,則開始對鏈表進行遷移
- 首先,遍歷鏈表,計算每一個節(jié)點的 runBit ,其計算方式就是將節(jié)點的 hash 值與原數(shù)組長度進行與運算,計算結果只有兩種
- 如果 runBit 的值為 0,則說明節(jié)點在新數(shù)組中的位置等于原來的下標位置
- 如果 runBit 的值不為 0,則說明節(jié)點在新數(shù)組中的位置等于原來的下標 + 原數(shù)組的長度位置
- 找到最后一個與前驅節(jié)點的 runBit 值不相等的節(jié)點 lastRun,最后的 runBit 值等于 lastRun 節(jié)點的 runBit
- lastRun 節(jié)點的含義,就是在鏈表中找到一個其后續(xù)節(jié)點的 runBit 值都相等的節(jié)點,在發(fā)生遷移的時候,只需要移動這個 lastRun 節(jié)點,就可以完成其后續(xù)所有節(jié)點的遷移
- 如果最后的 runBit 等于 0,則將 lastRun 賦值給低位鏈表頭節(jié)點 ln;如果最后的 runBit 不等于 0,則賦值給高位鏈表頭節(jié)點 hn
- 從頭遍歷鏈表,直到找到 lastRun 的位置停止,根據 runBit 值的不同,使用頭插法將元素插入到低位鏈表中,或者高位鏈表中
- 使用 CAS 將新數(shù)組的 i 的位置上的元素賦值為低位鏈表頭節(jié)點 ln
- 使用 CAS 將新數(shù)組的 i + 原數(shù)組長度 的位置上的元素賦值為高位鏈表頭節(jié)點 hn
- 使用 CAS 將原數(shù)組的 i 位置上的元素賦值為創(chuàng)建好的 ForwardingNode 節(jié)點
- 首先,遍歷鏈表,計算每一個節(jié)點的 runBit ,其計算方式就是將節(jié)點的 hash 值與原數(shù)組長度進行與運算,計算結果只有兩種
- 如果原數(shù)組 i 上的元素是 TreeBin 類型,則執(zhí)行紅黑樹的遷移工作,遷移過程與鏈表類似,也是根據每個節(jié)點的 runBit 來確定在高位的紅黑樹中,還是在低位的紅黑樹中
- 當待處理區(qū)間內的所有桶都處理完畢后,再次嘗試獲取任務,如果獲取成功,則遍歷新獲取的區(qū)間內的所有桶進行遷移處理
- 如果 transferIndex 已經小于等于 0,則說明已經沒有任務可以分配了,那么嘗試使用 CAS 將參與擴容的線程數(shù) -1后(即將 sizeCtl -1 ),看是否當前擴容的線程數(shù)是否只剩下一個(即 sizeCtl - 2 = resizeStamp() << 16,即回到了初始擴容時將 sizeCtl 修改成的數(shù)值),如果是則直接返回
- 如果不是,則進行 recheck 處理,將原數(shù)組上的所有位置,從右到左再次重新遍歷一遍,檢查是否還存在元素還沒有被遷移
- 當 recheck 處理完畢后,則原數(shù)組上的所有位置上的元素都已經遷移完畢,則將新數(shù)組替換掉舊數(shù)組,將 sizeCtl 參數(shù)設置為新數(shù)組長度的 0.75,并將 nextTable 屬性置空后返回
結合源碼來看:
ConcurrentHashMap helpTransfer 方法的流程是怎樣的呢?
helpTransfer 即協(xié)助擴容方法,其主要流程如下:
- 首先進行一些判斷
- 當前數(shù)組不能為空
- 數(shù)組下標位置的節(jié)點是 FowardingNode 類型
- FowardingNode 節(jié)點的 nextTable 屬性不為空
- 同時滿足上述的三個條件后,進行下一步邏輯處理,否則直接將當前數(shù)組 table 對象返回出去
- 使用 resizeStamp 方法,計算數(shù)組長度的擴容戳(resizeStamp,簡寫為 int rs 變量)
- 具體的實現(xiàn)就是首先調用 Integer.numberOfLeadingZeros() 計算數(shù)組長度最高非 0 位前的 0 的個數(shù),由于數(shù)組的長度始終是 2 的整數(shù)次冪,所以當數(shù)組的長度發(fā)生變化時(即發(fā)生擴容時),該值肯定是會變化的(每次擴容后最高非 0 位往左移 1 位,則該數(shù)值減少了 1)
- 再將 1 往左移 15 位,最后將兩個值做 ^ 或運算,(相當于將兩個值相加),即得到了擴容戳數(shù)值
- 可以看出,擴容戳的取值范圍為 [215[2^{15}[215 , 215+32]2^{15}+32]215+32],且數(shù)組每次擴容,該數(shù)值將會 -1
- 進入循環(huán),判斷 nextTab,table 屬性是否發(fā)生變化(判斷其引用是否發(fā)生變化),判斷 sizeCtl 屬性是否小于 0(初始化完成后,sizeCtl 屬性小于 0 說明在進行擴容)
- 如果不滿足條件,直接將棧幀中的本地變量 nextTab 屬性返回出去
- 滿足條件則進入循環(huán)
- 判斷 sizeCtl 往右移 16 位后是否等于擴容戳(如果不等于,說明數(shù)組的大小已經發(fā)生了變化)
- 判斷 transferIndex 是否小于等于 0
- 如果滿足條件,則說明線程已經完成了擴容,則直接跳出循環(huán),將棧幀中的本地變量 nextTable 屬性返回出去
- 如果不滿足條件,則使用 CAS 嘗試將 sizeCtl 屬性 +1(代表協(xié)助擴容的線程數(shù)量 +1 了)
- 如果 CAS 成功,則執(zhí)行擴容方法
結合源碼來看:
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {Node<K,V>[] nextTab; int sc;//首先進行判斷//1.當前數(shù)組不能為空//2.數(shù)組下標位置的節(jié)點是 FowardingNode 類型//3.數(shù)組下標位置的節(jié)點的 nextTable 屬性不為空if (tab != null && (f instanceof ForwardingNode) &&(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {// 使用 resizeStamp 方法,計算數(shù)組長度的擴容戳int rs = resizeStamp(tab.length);//判斷 nextTable,table 屬性是否發(fā)生變化(判斷其引用是否發(fā)生變化)//判斷 sizeCtl 屬性是否小于 0while (nextTab == nextTable && table == tab &&(sc = sizeCtl) < 0) {//判斷 sizeCtl 往右移 16 位后是否不等于擴容戳//判斷 transferIndex 是否小于等于 0//其他兩個條件我認為是無效條件,不可能成立的,所以不去糾結代表的含義了//如果滿足上面說的兩個條件,則說明線程已經完成了擴容,則直接跳出循環(huán)if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || transferIndex <= 0)break;//使用 CAS 嘗試將 sizeCtl 屬性 +1if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {//如果成功,代表協(xié)助擴容的線程數(shù)量 +1 了//執(zhí)行擴容方法transfer(tab, nextTab);break;}}//將棧幀中的本地變量 nextTab 屬性返回出去return nextTab;}//將當前數(shù)組返回出去return table; }ConcurrentHashMap 的 get 方法的流程?
ConcurrentHashMap 的 get 方法,是不加鎖的,具體的流程如下:
- 首先通過 sepread 方法,計算出 key 的 hash 值(計算方法就是將 key 的 hashcode 往右移 16 位后與原 hashcode 進行異或運算)
- 判斷數(shù)組是否為空,如果為空,則直接返回空
- 如果數(shù)組不為空,則根據 hash & 數(shù)組長度 -1 得到節(jié)點在數(shù)組中的位置,判斷這個位置上的節(jié)點是否為空
- 如果為空,則返回空
- 如果不為空,則用 equals 方法判斷下標位置的這個節(jié)點的 key 是否與輸入的 key 相等,如果相等,則將 value 返回出去
- 如果不相等,則判斷下標位置的節(jié)點的 hash 值是否小于 0
- 如果小于 0,則說明該位置上的節(jié)點是 FowardingNode 類型(hash 值為 -1),或者 TreeBin 類型(hash 值為 -2)
- 如果是 FowardingNode 類型,則說明數(shù)組正在進行擴容且這個節(jié)點已經遷移到了新的數(shù)組中,則在 ForwardingNode 的 nextTable 屬性(即擴容后的新數(shù)組)中,查找節(jié)點
- 如果是 TreeBin 類型,則在紅黑樹中執(zhí)行查找的邏輯
- 如果大于等于 0,則說明該位置上的節(jié)點是鏈表類型,則遍歷鏈表查找元素
- 如果小于 0,則說明該位置上的節(jié)點是 FowardingNode 類型(hash 值為 -1),或者 TreeBin 類型(hash 值為 -2)
ConcurrentHashMap 的 sizeCtl 的含義,以及值的流轉過程?
ConcurrentHashMap 的 sizeCtl,在不同的時間有不同的含義,詳解如下:
- 調用構造器完成后,sizeCtl 表示當前容量(使用無參構造器時,sizeCtl = 0,使用帶參構造器時,sizeCtl = 當前容量)
- 當前正在執(zhí)行初始化數(shù)組時,sizeCtl 的值為 -1,代表正在初始化數(shù)組
- 當數(shù)組初始化完成后,sizeCtl 表示擴容閾值,值為數(shù)組長度的 0.75
- 當擴容中時,sizeCtl 的高 16 位代表的是擴容戳(即 2152^{15}215 + 當前數(shù)組長度的最高非 0 位前面的 0 的個數(shù)),低 16 位代表的是參與擴容的線程數(shù) + 1
ConcurrentHashMap 的 size 方法的流程?
size 方法,即統(tǒng)計 ConcurrentHashMap 當前已存入的元素個數(shù)
- 調用 sumCount 方法
- 在 sumCount 方法內部,將 baseCount 和所有的 CounterCell 內部的 value 值進行累加,得到的就是當前已存入的元素個數(shù)
- 判斷元素個數(shù)是否大于整型值的最大值,如果是就返回整型值的最大值
其他
如果 ConcurrentHashMap 的某個數(shù)組下標位置是一顆紅黑樹,那么這個位置上的節(jié)點類型是 TreeNode 嗎?
ConcurrentHashMap 如果某個桶里面是一顆紅黑樹,那么該數(shù)組下標位置就是一個 TreeBin 對象,而不是一個 TreeNode 對象,TreeBin 對象相當于在 TreeNode 對象外面套了個殼子,TreeBin 對象有一個 TreeNode 屬性,這個屬性就是紅黑樹的根節(jié)點。
為什么要用 TreeBin 對象作為這個位置上的節(jié)點,而不是 TreeNode 對象呢?
這是因為在修改紅黑樹的時候,理論上來說需要對紅黑樹的根節(jié)點進行加鎖,但是實際上,在紅黑樹的修改過程中,根節(jié)點很可能因為樹的自平衡動作而被修改為其他節(jié)點。所以單純使用紅黑樹的根節(jié)點作為鎖對象是不靠譜的。
ConcurrentHashMap 的 size 方法會返回最新的值嗎?
ConcurrentHashMap 的 size 方法不會返回最新的值,只會返回調用方法那一刻元素數(shù)量的快照結果。
意思就是說,如果在 size 方法被調用的過程中,元素的數(shù)量發(fā)生了變化,那返回的元素數(shù)量依然是調用 size 方法那一刻的快照值。這是因為,在 size 方法內部,是沒有采取任何同步措施的
- 計算時取的計算依據 counterCells 和 baseCount 屬性,都是在調用方法那一刻的快照引用,如果在計算的過程中,這兩個計算依據發(fā)生了變化,那么計算時還是用的舊值進行計算的
- 在對 counterCellls 數(shù)組中的 CounterCell 對象的 value 屬性進行遍歷累加時,如果累加過后,該屬性發(fā)生了變化,那么返回的數(shù)值就不是最新的值了
transferIndex 的真正含義
代表的是,當前給線程分配任務的邊界,即已經分配給線程處理擴容的區(qū)間為 [transferIndex, newTableSize),而還沒有被分配給線程處理擴容的區(qū)間為:[0,transferIndex)
所以,當 transferIndex 小于等于 0 時,并不意味著擴容就結束了,而只是意味著將整個數(shù)組的擴容任務都分給了參與擴容的線程。
ConcurrentHashMap 總結
ConcurrentHashMap 是一個高性能的并發(fā)安全的 Map,常用做堆緩存,例如 Spring 的單例池,對象池等。除去處理并發(fā)相關操作外,主體流程與 HashMap 的數(shù)據操作流程基本一致。
put 方法流程總結
- 首先計算 key 的 hash 值
- 判斷表是否為空,如果為空則需要先進行初始化
- 當表為空時,一直循環(huán)操作
- 首先看是否有其他線程正在執(zhí)行初始化操作(判斷 sizeCtl 參數(shù)是否小于 0),如果有,則調用 Thread.yield() 方法讓出 CPU 執(zhí)行時間片,進入下次循環(huán)
- 嘗試使用 CAS 把 sizeCtl 參數(shù)替換為 -1,如果替換成功,則當前線程執(zhí)行表初始化操作
- 根據 hash 值 & 數(shù)組長度 - 1 找到數(shù)組中對應桶的位置
- 如果該位置上沒有元素,則嘗試使用 CAS 把待插入的元素替換到該位置上,如果成功則跳出循環(huán)
- 如果該位置上有元素,則判斷該位置上的元素是否處于擴容狀態(tài),如果是,則協(xié)助進行擴容
- 上述條件都不滿足,則嘗試對該位置上的元素使用 synchronized 進行加鎖
- 加鎖成功后,判斷該位置上的元素有沒有變化,如果有,說明有其他線程已經對這個位置上的元素做了改變,進入下次循環(huán)
- 判斷該桶上的數(shù)據結構是鏈表還是紅黑樹,如果是鏈表則使用尾插法插入新元素,如果是紅黑樹則執(zhí)行紅黑樹的插入邏輯
- 判斷鏈表是否要轉化為紅黑樹(當前表的長度大于等于 64 且鏈表的長度大于等于 8),如果是,則執(zhí)行鏈表轉化紅黑樹的操作
- 如果是覆蓋了舊值,則直接將舊值返回
- 將元素數(shù)量 + 1(執(zhí)行 addCount 方法)
ConcurrentHashMap 的元素數(shù)量計數(shù)
ConcurrentHashMap 中的元素數(shù)量,是采用了 LongAdder 類的設計思想,當前元素的數(shù)量并不是用一個數(shù)值變量來表示的,而是由一個計數(shù)器數(shù)組(CounterCell 類型的數(shù)組) 來維護的,當需要獲取當前元素數(shù)量時,會將當前計數(shù)器數(shù)組的快照進行遍歷累加,最后才能得到當前數(shù)組中的元素數(shù)量。
這樣做的好處就是當由多個線程都要去并發(fā)修改元素數(shù)量時,降低發(fā)生競爭的可能性。
試想一下,如果說只是用一個 volatile 修飾的數(shù)值類型 + CAS 來修改元素數(shù)量,那么當同一時刻有多個線程去修改元素數(shù)量時,每次都只會有一個線程修改成功,那么其余的線程都相當于空轉了一次,當并發(fā)的線程數(shù)量很多時,大多數(shù)線程將都會做類似自旋操作,這樣就白白浪費了 CPU 資源。
造成上述問題的根本性原因就是臨界資源的粒度太粗,導致發(fā)生競爭的可能性非常大。所以 CounterCell 數(shù)組的設計,正是將臨界資源的粒度給細化了,當一個線程對某個 CounterCell 的計數(shù)值修改失敗后,將會轉而去嘗試修改其他 CounterCell 的數(shù)值,這樣就降低了發(fā)生競爭的可能性,從而提升了修改操作的命中率。
ConcurrentHashMap 的擴容操作
ComcurremtHashMap 的擴容操作,是允許多個線程協(xié)助共同進行擴容操作的。
- 在判斷當前數(shù)組需要擴容(sizeCtl > 0 時,代表的含義就是擴容閾值)之后,首先發(fā)起擴容操作的線程就會把 sizeCtl 的值使用 CAS 修改為高 16 位代表擴容戳(2 的 15 次方 + 擴容前數(shù)組的長度最高非 0 位前的 0 的個數(shù)),低 16 位為 2 的數(shù)值,這個值小于 0。
- 第一個進行擴容操作的線程負責進行新數(shù)組的初始化
- 后來在 ConcurrentHashMap 中執(zhí)行操作的線程發(fā)現(xiàn)當前正在執(zhí)行擴容后,將會進行協(xié)助擴容,協(xié)助擴容之前將會用 CAS 操作嘗試將 sizeCtl 的值 +1,即 sizeCtl 的低 16 位 +1,即參與擴容的線程數(shù)量 +1。
- 在參與擴容的每個線程,都會嘗試使用 CAS 修改 transferIndex 的值(領取任務),修改后的 transferIndex 的值與修改前的 transferIndex 的值的區(qū)間范圍,即為該線程負責進行擴容的數(shù)組下標范圍,線程將會針對該范圍內的每一個位置上的元素都進行擴容操作
- 線程完成自己負責擴容的數(shù)組下標范圍后,將會再次判斷擴容有沒有完成
- 如果沒有,再次嘗試修改 transferIndex 的值以獲取負責進行擴容的數(shù)組下標范圍(再次領取任務),再次進行擴容操作
- 如果 transferIndex 的值已經小于 0 了(已經沒有可以領取的任務了),那么線程會完整地檢查一遍原數(shù)組,看還有沒有元素沒有被轉移
- 所有工作完成,將會把 sizeCtl 參數(shù) -1 后退出擴容方法,最后一個線程將會把原數(shù)組替換成新數(shù)組
ConcurrentHashMap 的設計思想總結
大量的無鎖并發(fā)安全處理操作
- ConcurrentHashMap 中的很多變量都使用了 volatile 關鍵字修飾,可以確保在變量值在被一個線程修改后,其他線程能立馬得到這個修改后的值
- ConcurrentHashMap 在修改變量值時,采用的是 CAS + 自旋重試的操作,可以在不使用鎖來阻塞其他參與線程的情況下并發(fā)安全地修改變量值
細化臨界資源粒度
- ConcurrentHashMap 使用了計數(shù)器數(shù)組(CounterCell 數(shù)組)來降低修改元素數(shù)量時的發(fā)生并發(fā)競爭概率
- 在添加新元素且這個新元素對應的數(shù)組下標位置有節(jié)點存在時,ConcurrentHashMap 鎖住的是數(shù)組下標位置上的這個元素(鏈表頭節(jié)點或者紅黑樹的根節(jié)點),使不同數(shù)組下標位置的桶上的修改操作互不影響,降低了發(fā)生并發(fā)競爭的概率
高效的擴容機制
高效的擴容機制主要的核心設計思想在于 ConcurrentHashMap 使用 transferIndex 來進行分段擴容,這樣做的好處有:
- 多線程協(xié)助共同完成擴容:ConcurrentHashMap 使用了多線程協(xié)助共同完成擴容的機制,使得 ConcurrentHashMap 的擴容操作在多線程場景下,不會讓其他線程阻塞等待單個線程操作擴容完畢,提高了單個線程的執(zhí)行效率,也使整體的擴容效率大大提升
- 在擴容期間仍可以無阻塞訪問數(shù)據:假設現(xiàn)在有一個線程想要調用 get 方法,并且當前 ConcurrentHashMap 正在執(zhí)行擴容操作,那么可能遇見的場景有以下幾種:
- key 對應的桶已經完成了擴容(但是還有其他桶沒有完成擴容),那么原數(shù)組中的桶的位置上將會放置一個 ForwardingNode 類型的桶,那么線程可以通過 nextTable(新數(shù)組) 完成對數(shù)據的訪問
- key 對應的桶還沒有開始進行擴容,那么直接訪問原數(shù)組中的桶就可以完成對數(shù)據的訪問
- key 對應的桶正在執(zhí)行擴容,由于 get 方法訪問的是調用時刻的原數(shù)組快照,所以該桶正在執(zhí)行擴容時還沒有對其完成改變,所以直接訪問原數(shù)組中的桶就可以完成對數(shù)據的訪問
高效的狀態(tài)管理機制
ConcurrentHashMap 使用單個整形變量來標識當前數(shù)組所處狀態(tài),將單個整形變量根據位數(shù)不同劃分了不同的含義,減少了多余的狀態(tài)值定義,一定程度上減少了內存消耗以及提升了整體效率
總結
以上是生活随笔為你收集整理的详解 ConcurrentHashMap的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 数字语音信号处理学习笔记——绪论(2)
- 下一篇: 数字语音信号处理学习笔记——语音信号的数