问到ConcurrentHashMap不要再提Segment了
? ? ? ?前面已經介紹了hashMap。但hashMap并沒有考慮多線程并發的情況,因此是線程不安全的。我們可以使用HashTable,它使用synchronized來鎖住整張Hash表讓一個線程獨占來實現線程安全,這就意味著所有的線程都在競爭一把鎖。在多線程的環境下,它是安全的,但無疑是效率低下的。
? ? ? ?ConcurrentHashMap是一個線程安全且高效的HashMap實現。首先要說的是在JDK1.7到JDK8,ConcurrentHashMap的實現邏輯發生了很大變化,已經摒棄了Segment的概念,而是直接用Node數組+鏈表+紅黑樹的數據結構來實現,采用table數組元素作為鎖,從而實現了對每一列數據進行加鎖,降低鎖的粒度,并發控制使用Synchronized和CAS來操作,整個看起來就像是優化過且線程安全的HashMap。
2 源碼解析
2.1 ConcurrentHashMap的屬性和基本結構
在深入JDK1.8的put和get實現之前要知道一些常量設計和數據結構,這些是構成ConcurrentHashMap實現結構的基礎:
// node數組最大容量:2^30=1073741824 private static final int MAXIMUM_CAPACITY = 1 << 30; // 默認初始值,必須是2的幕數 private static final int DEFAULT_CAPACITY = 16 //數組可能最大值,需要與toArray()相關方法關聯 static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //并發級別,遺留下來的,為兼容以前的版本 private static final int DEFAULT_CONCURRENCY_LEVEL = 16; // 負載因子 private static final float LOAD_FACTOR = 0.75f; // 鏈表轉紅黑樹閾值,> 8 鏈表轉換為紅黑樹 static final int TREEIFY_THRESHOLD = 8; //樹轉鏈表閾值,小于等于6(tranfer時,lc、hc=0兩個計數器分別++記錄原bin、新binTreeNode數量 //<=UNTREEIFY_THRESHOLD 則untreeify(lo)) static final int UNTREEIFY_THRESHOLD = 6; static final int MIN_TREEIFY_CAPACITY = 64; private static final int MIN_TRANSFER_STRIDE = 16; private static int RESIZE_STAMP_BITS = 16; // 2^15-1,help resize的最大線程數 private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; // 32-16=16,sizeCtl中記錄size大小的偏移量 private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; // forwarding nodes的hash值 static final int MOVED = -1; // 樹根節點的hash值 static final int TREEBIN = -2; // ReservationNode的hash值 static final int RESERVED = -3; // 可用處理器數量 static final int NCPU = Runtime.getRuntime().availableProcessors(); //存放node的數組 transient volatile Node<K,V>[] table; /*控制標識符,用來控制table的初始化和擴容的操作,不同的值有不同的含義*當為負數時:-1代表正在初始化,-N代表有N-1個線程正在 進行擴容*當為0時:代表當時的table還沒有被初始化*當為正數時:表示初始化或者下一次進行擴容的大小*/ private transient volatile int sizeCtl;Node是ConcurrentHashMap存儲結構的基本單元,繼承于HashMap中的Entry,用于存儲數據,就是一個鏈表,但是只允許對數據進行查找,不允許進行修改。源代碼如下
static class Node<K,V> implements Map.Entry<K,V> {//注意被final修飾final int hash;final K key;volatile V val;volatile Node<K,V> next;Node(int hash, K key, V val, Node<K,V> next) {//構造方法中賦初值后,就不能再修改this.hash = hash;this.key = key;this.val = val;this.next = next;}public final K getKey() { return key; }public final V getValue() { return val; }public final int hashCode() { return key.hashCode() ^ val.hashCode(); }public final String toString(){ return key + "=" + val; }public final V setValue(V value) {//不允許通過setValue方法去修改throw new UnsupportedOperationException();}public final boolean equals(Object o) {Object k, v, u; Map.Entry<?,?> e;return ((o instanceof Map.Entry) &&(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&(v = e.getValue()) != null &&(k == key || k.equals(key)) &&(v == (u = val) || v.equals(u)));}/*** Virtualized support for map.get(); overridden in subclasses.*/Node<K,V> find(int h, Object k) {Node<K,V> e = this;if (k != null) {do {K ek;if (e.hash == h &&((ek = e.key) == k || (ek != null && k.equals(ek))))return e;} while ((e = e.next) != null);}return null;}}2.2 put方法
final V putVal(K key, V value, boolean onlyIfAbsent) {//key和value不允許為nullif (key == null || value == null) {throw new NullPointerException();}//1. 計算key的hash值,先hash也是采用(h ^ (h >>> 16)) & HASH_BITS;來將低16位和高16位進行異或運算,// 這樣能夠使得hash值能夠分散能夠均勻減小hash沖突的概率int hash = spread(key.hashCode());int binCount = 0;for (Node<K, V>[] tab = table; ; ) {Node<K, V> f;int n, i, fh;//2. 如果當前table還沒有初始化先調用initTable方法將tab進行初始化//(構造方法并沒有進行初始化table,而是在put方法中初始化)if (tab == null || (n = tab.length) == 0) {tab = initTable();//3. tab中索引為i的位置的元素為null,則直接使用CAS將值插入即可} else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {if (casTabAt(tab, i, null, new Node<K, V>(hash, key, value, null))) {break; // no lock when adding to empty bin}//4. 當前正在擴容,就協助進行擴容} else if ((fh = f.hash) == MOVED) {tab = helpTransfer(tab, f);} else {V oldVal = null;//如果以上條件都不滿足,那就要進行加鎖操作,也就是存在hash沖突,鎖住鏈表或者紅黑樹的頭結點synchronized (f) {if (tabAt(tab, i) == f) {//5. 當前為鏈表,在鏈表中插入新的鍵值對if (fh >= 0) {binCount = 1;for (Node<K, V> e = f; ; ++binCount) {K ek;//如果key相同就會覆蓋原先的valueif (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K, V> pred = e;if ((e = e.next) == null) {//否則插入到鏈表尾部pred.next = new Node<K, V>(hash, key, value, null);break;}}// 6.當前為紅黑樹,將新的鍵值對插入到紅黑樹中} else if (f instanceof TreeBin) {Node<K, V> p;binCount = 2;//紅黑樹結構旋轉插入if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key, value)) != null){oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}if (binCount != 0) {//7.如果鏈表的長度大于8時就會進行紅黑樹的轉換if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}//8.對當前容量大小進行檢查,如果超過了臨界值(實際大小*加載因子)就需要擴容 addCount(1L, binCount);return null;}這個put的過程很清晰,對當前的table進行無條件自循環直到put成功(即break退出),可以分成以下六步流程來概述:
1、Node[]數組沒有初始化就先初始化
2、如果內部正在擴容,就幫助它一塊擴容
3、key所在的索引位置還沒有Node就直接CAS添加
4、如果節點已存在元素,則synchronized鎖住元素(鏈表/紅黑樹的頭元素)。如果是Node(鏈表結構)則執行鏈表的添加操作;如果是TreeNode(樹型結構)則執行樹添加操作
5、判斷鏈表長度已經達到臨界值8(默認值),當節點超過這個值就需要把鏈表轉換為樹結構
6、如果添加成功就調用addCount()方法統計size,并且檢查是否需要擴容
現在我們來對每一步的細節進行源碼分析,在第一步中,符合條件會進行初始化操作,我們來看看initTable()方法
private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;while ((tab = table) == null || tab.length == 0) {if ((sc = sizeCtl) < 0)// 1. 保證只有一個線程正在進行初始化操作Thread.yield(); // lost initialization race; just spinelse if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {try {if ((tab = table) == null || tab.length == 0) {// 2. 得出數組的大小int n = (sc > 0) ? sc : DEFAULT_CAPACITY;@SuppressWarnings("unchecked")// 3. 這里才真正的初始化數組Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = tab = nt;// 4. 計算數組中可用的大小:實際大小n*0.75(加載因子)sc = n - (n >>> 2);}} finally {sizeCtl = sc;}break;}}return tab; }? ? ? ?為了保證能夠正確初始化,在第1步中會先通過if進行判斷,若當前已經有一個線程正在初始化即sizeCtl值變為-1,這個時候其他線程在If判斷為true從而調用Thread.yield()讓出CPU時間片。正在進行初始化的線程會調用U.compareAndSwapInt方法將sizeCtl改為-1即正在初始化的狀態。
? ? ? ?另外還需要注意的事情是,在第四步中會進一步計算數組中可用的大小即為數組實際大小n乘以加載因子0.75.可以看看這里乘以0.75是怎么算的,0.75為四分之三,這里n - (n >>> 2)是不是剛好是n-(1/4)n=(3/4)n,挺有意思的吧。如果選擇是無參的構造器的話,這里在new Node數組的時候會使用默認大小為DEFAULT_CAPACITY(16)。
Unsafe與CAS
? ? ? ?在ConcurrentHashMap中,隨處可以看到U, 大量使用了U.compareAndSwapXXX的方法,這個方法是利用一個CAS算法實現無鎖化的修改值的操作,他可以大大降低鎖代理的性能消耗。這個算法的基本思想就是不斷地去比較當前內存中的變量值與你指定的一個變量值是否相等,如果相等,則接受你指定的修改的值,否則拒絕你的操作。因為當前線程中的值已經不是最新的值,你的修改很可能會覆蓋掉其他線程修改的結果。這一點與樂觀鎖,SVN的思想是比較類似的。
? ? ? ?ConcurrentHashMap定義了三個原子操作,用于對指定位置的節點進行操作。正是這些原子操作保證了ConcurrentHashMap的線程安全。
2.3 ConcurrentHashMap的擴容
通過判斷該節點的hash值是不是等于-1(MOVED),代碼為(fh = f.hash) == MOVED,說明 Map 正在擴容。那么就幫助 Map 進行擴容。以加快速度。如何幫助擴容呢?
/*** Helps transfer if a resize is in progress.*/ final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {Node<K,V>[] nextTab; int sc;// 如果 table 不是空 且 node 節點是轉移類型,數據檢驗// 且 node 節點的 nextTable(新 table) 不是空,同樣也是數據校驗// 嘗試幫助擴容if (tab != null && (f instanceof ForwardingNode) &&(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {// 根據 length 得到一個標識符號int rs = resizeStamp(tab.length);// 如果 nextTab 沒有被并發修改 且 tab 也沒有被并發修改// 且 sizeCtl < 0 (說明還在擴容)while (nextTab == nextTable && table == tab &&(sc = sizeCtl) < 0) {// 如果 sizeCtl 無符號右移 16 不等于 rs ( sc前 16 位如果不等于標識符,則標識符變化了)// 或者 sizeCtl == rs + 1 (擴容結束了,不再有線程進行擴容)(默認第一個線程設置 sc ==rs 左移 16 位 + 2,// 當第一個線程結束擴容了,就會將 sc 減一。這個時候,sc 就等于 rs + 1)// 或者 sizeCtl == rs + 65535 (如果達到最大幫助線程的數量,即 65535)// 或者轉移下標正在調整 (擴容結束)// 結束循環,返回 tableif ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || transferIndex <= 0)break;// 如果以上都不是, 將 sizeCtl + 1, (表示增加了一個線程幫助其擴容)if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {// 進行轉移transfer(tab, nextTab);// 結束循環break;}}return nextTab;}return table; }其實helpTransfer()方法的目的就是調用多個工作線程一起幫助進行擴容,這樣的效率就會更高.這里關鍵transfer(),那么我們繼續深入了解一下
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {int n = tab.length, stride;// 每核處理的量小于16,則強制賦值16if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)stride = MIN_TRANSFER_STRIDE; // subdivide rangeif (nextTab == null) { // initiatingtry {//構建一個nextTable對象,其容量為原來容量的兩倍Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt;} catch (Throwable ex) { // try to cope with OOMEsizeCtl = Integer.MAX_VALUE;return;}nextTable = nextTab;transferIndex = n;}int nextn = nextTab.length;// 連接點指針,用于標志位(fwd的hash值為-1,fwd.nextTable=nextTab)ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);// 當advance == true時,表明該節點已經處理過了boolean advance = true;boolean finishing = false; // to ensure sweep before committing nextTabfor (int i = 0, bound = 0;;) {Node<K,V> f; int fh;// 控制 --i ,遍歷原hash表中的節點while (advance) {int nextIndex, nextBound;if (--i >= bound || finishing)advance = false;else if ((nextIndex = transferIndex) <= 0) {i = -1;advance = false;}// 用CAS計算得到的transferIndexelse if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {bound = nextBound;i = nextIndex - 1;advance = false;}}if (i < 0 || i >= n || i + n >= nextn) {int sc;// 已經完成所有節點復制了if (finishing) {nextTable = null;table = nextTab; // table 指向nextTablesizeCtl = (n << 1) - (n >>> 1); // sizeCtl閾值為原來的1.5倍return; // 跳出死循環,}// CAS 更擴容閾值,在這里面sizectl值減一,說明新加入一個線程參與到擴容操作if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return;finishing = advance = true;i = n; // recheck before commit}}// 遍歷的節點為null,則放入到ForwardingNode 指針節點else if ((f = tabAt(tab, i)) == null)advance = casTabAt(tab, i, null, fwd);// f.hash == -1 表示遍歷到了ForwardingNode節點,意味著該節點已經處理過了// 這里是控制并發擴容的核心else if ((fh = f.hash) == MOVED)advance = true; // already processedelse {// 節點加鎖synchronized (f) {// 節點復制工作if (tabAt(tab, i) == f) {Node<K,V> ln, hn;// fh >= 0 ,表示為鏈表節點if (fh >= 0) {// 構造兩個鏈表 一個是原鏈表 另一個是原鏈表的反序排列int runBit = fh & n;Node<K,V> lastRun = f;for (Node<K,V> p = f.next; p != null; p = p.next) {int b = p.hash & n;if (b != runBit) {runBit = b;lastRun = p;}}if (runBit == 0) {ln = lastRun;hn = null;}else {hn = lastRun;ln = null;}for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln);elsehn = new Node<K,V>(ph, pk, pv, hn);}// 在nextTable i 位置處插上鏈表setTabAt(nextTab, i, ln);// 在nextTable i + n 位置處插上鏈表setTabAt(nextTab, i + n, hn);// 在table i 位置處插上ForwardingNode 表示該節點已經處理過了setTabAt(tab, i, fwd);// advance = true 可以執行--i動作,遍歷節點advance = true;}// 如果是TreeBin,則按照紅黑樹進行處理,處理邏輯與上面一致else if (f instanceof TreeBin) {TreeBin<K,V> t = (TreeBin<K,V>)f;TreeNode<K,V> lo = null, loTail = null;TreeNode<K,V> hi = null, hiTail = null;int lc = 0, hc = 0;for (Node<K,V> e = t.first; e != null; e = e.next) {int h = e.hash;TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);if ((h & n) == 0) {if ((p.prev = loTail) == null)lo = p;elseloTail.next = p;loTail = p;++lc;}else {if ((p.prev = hiTail) == null)hi = p;elsehiTail.next = p;hiTail = p;++hc;}}// 擴容后樹節點個數若<=6,將樹轉鏈表ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin<K,V>(lo) : t;hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc != 0) ? new TreeBin<K,V>(hi) : t;setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}}}}}}? ? ? ?擴容過程有點復雜,可以查看上面注釋。這里主要涉及到多線程并發擴容,ForwardingNode的作用就是支持擴容操作,將已處理的節點和空節點置為ForwardingNode,并發處理時多個線程經過ForwardingNode就表示已經遍歷了,就往后遍歷,下圖是多線程合作擴容的過程:
addCount()方法計算ConcurrentHashMap的size
private final void addCount(long x, int check) {CounterCell[] as; long b, s;//更新baseCount,table的數量,counterCells表示元素個數的變化if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {CounterCell a; long v; int m;boolean uncontended = true;//如果多個線程都在執行,則CAS失敗,執行fullAddCount,全部加入countif (as == null || (m = as.length - 1) < 0 ||(a = as[ThreadLocalRandom.getProbe() & m]) == null ||!(uncontended =U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {fullAddCount(x, uncontended);return;}if (check <= 1)return;s = sumCount();}//check>=0表示需要進行擴容操作if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) {int rs = resizeStamp(n);if (sc < 0) {if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}//當前線程發起操作,nextTable=nullelse if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);s = sumCount();}} }? ? ? ?put的流程現在已經分析完了,你可以從中發現,他在并發處理中使用的是樂觀鎖,當有沖突的時候才進行并發處理,而且流程步驟很清晰,但是細節設計的很復雜,畢竟多線程的場景也復雜.
2.4 get方法
concurrentHashMap的get操作的流程很簡單,可以分為三個步驟來描述:
1.計算hash值,定位到該table索引位置,如果是首節點符合就返回。
2.如果遇到擴容的時候,會調用標志正在擴容節點ForwardingNode的find方法,查找該節點,匹配就返回。
3.以上都不符合的話,就往下遍歷節點,匹配就返回,否則最后就返回null
szie方法的邏輯
? ? ? ?對于size的計算,在擴容和addCount()方法就已經有處理了,可以注意一下Put函數,里面就有addCount()函數,早就計算好的,然后你size的時候直接給你.
HashMap、Hashtable、ConccurentHashMap三者的區別
HashMap線程不安全,數組+鏈表+紅黑樹
Hashtable線程安全,鎖住整個對象,數組+鏈表
ConccurentHashMap線程安全,CAS+同步鎖,數組+鏈表+紅黑樹
HashMap的key,value均可為null,其他兩個不行。
在JDK1.7和JDK8中ConcurrentHashMap的區別
其實可以看出JDK1.8版本的ConcurrentHashMap的數據結構已經接近HashMap,相對而言,ConcurrentHashMap只是增加了同步的操作來控制并發,從JDK1.7版本的ReentrantLock+Segment+HashEntry,到JDK1.8版本中synchronized+CAS+HashEntry+紅黑樹,相對而言,總結如下思考
- JDK8的實現不采用segment而采用node。JDK1.7版本鎖的粒度是基于Segment(包含多個HashEntry),而JDK8鎖的粒度就是HashEntry(首節點),減小鎖粒度,不過也由于粒度的降低,實現的復雜度也增加了(代碼量從原來的1000多行變成了驚人的 6000多 行)。因為已經使用synchronized來進行同步,所以不需要分段鎖的概念,也就不需要Segment這種數據結構了,數據結構變得更加簡單,使得操作也更加清晰流暢
- JDK8使用紅黑樹來優化鏈表,基于長度很長的鏈表的遍歷是一個很漫長的過程,而紅黑樹的遍歷效率是很快的,Log(N)
- JDK8為什么使用內置鎖synchronized來代替重入鎖ReentrantLock,我覺得有以下幾點:
- 因為粒度降低了,在相對而言的低粒度加鎖方式,synchronized并不比ReentrantLock差,在粗粒度加鎖中ReentrantLock可能通過Condition來控制各個低粒度的邊界,更加的靈活,而在低粒度中,Condition的優勢就沒有了
- JVM的開發團隊從來都沒有放棄synchronized,而且基于JVM的synchronized優化空間更大,使用內嵌的關鍵字比使用API更加自然
- 在大量的數據操作下,對于JVM的內存壓力,基于API的ReentrantLock會開銷更多的內存,雖然不是瓶頸,但是也是一個選擇依據
ConcurrentHashMap大總結
ConcurrentHashMap原理分析(1.7與1.8)
總結
以上是生活随笔為你收集整理的问到ConcurrentHashMap不要再提Segment了的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 开发中常用工具
- 下一篇: JVM监控工具有哪些