ConcurrentHashMap的源码分析-transfer
擴容是ConcurrentHashMap的精華之一,擴容操作的核心在于數據的轉移,在單線程環(huán)境下數據的轉移很簡單,無非就是把舊數組中的數據遷移到新的數組。但是這在多線程環(huán)境下,在擴容的時候其他線程也可能正在添加元素,這時又觸發(fā)了擴容怎么辦?可能大家想到的第一個解決方案是加互斥鎖,把轉移過程鎖住,雖然是可行的解決方案,但是會帶來較大的性能開銷。因為互斥鎖會導致所有訪問臨界區(qū)的線程陷入到阻塞狀態(tài),持有鎖的線程耗時越長,其他競爭線程就會一直被阻塞,導致吞吐量較低。而且還可能導致死鎖。
而ConcurrentHashMap并沒有直接加鎖,而是采用CAS實現無鎖的并發(fā)同步策略,最精華的部分是它可以利用多線程來進行協(xié)同擴容
簡單來說,它把Node數組當作多個線程之間共享的任務隊列,然后通過維護一個指針來劃分每個線程鎖負責的區(qū)間,每個線程通過區(qū)間逆向遍歷來實現擴容,一個已經遷移完的bucket會被替換為一個ForwardingNode節(jié)點,標記當前bucket已經被其他線程遷移完了。接下來分析一下它的源碼實現
1、fwd:這個類是個標識類,用于指向新表用的,其他線程遇到這個類會主動跳過這個類,因為這個類要么就是擴容遷移正在進行,要么就是已經完成擴容遷移,也就是這個類要保證線程安全,再進行操作。
2、advance:這個變量是用于提示代碼是否進行推進處理,也就是當前桶處理完,處理下一個桶的標識
3、finishing:這個變量用于提示擴容是否結束用的?
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; //將 (n>>>3相當于 n/8) 然后除以 CPU核心數。如果得到的結果小于 16,那么就使用 16 // 這里的目的是讓每個 CPU 處理的桶一樣多,避免出現轉移任務不均勻的現象,如果桶較少的話,默認一個 CPU(一個線程)處理 16 個桶,也就是長度為16的時候,擴容的時候只會有一 個線程來擴容 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range //nextTab未初始化,nextTab是用來擴容的node數組 if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked")//新建一個n<<1原始table大小的nextTab,也就是32 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt;//賦值給nextTab } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; //擴容失敗,sizeCtl使用int的最大值 return; } nextTable = nextTab; //更新成員變量 transferIndex = n;//更新轉移下標,表示轉移時的下標 } int nextn = nextTab.length;//新的tab的長度// 創(chuàng)建一個 fwd 節(jié)點,表示一個正在被遷移的Node,并且它的hash值為-1(MOVED),也就是前面我們在講putval方法的時候,會有一個判斷MOVED的邏輯。它的作用是用來占位,表示原數組中位置i處的節(jié)點完成遷移以后,就會在i位置設置一個fwd來告訴其他線程這個位置已經處理過了,具體后續(xù)還會在講 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 首次推進為 true,如果等于 true,說明需要再次推進一個下標(i--),反之,如果是false,那么就不能推進下標,需要將當前的下標處理完畢才能繼續(xù)推進 boolean advance = true; //判斷是否已經擴容完成,完成就return,退出循環(huán) boolean finishing = false; // to ensure sweep before committing nextTab 通過for自循環(huán)處理每個槽位中的鏈表元素,默認advace為真,通過CAS設置transferIndex屬性值,并初始化i和bound值,i指當前處理的槽位序號,bound指需要處理的槽位邊界,先處理槽位15的節(jié)點; for (int i = 0, bound = 0;;) {// 這個循環(huán)使用CAS不斷嘗試為當前線程分配任務 // 直到分配成功或任務隊列已經被全部分配完畢 // 如果當前線程已經被分配過bucket區(qū)域 // 那么會通過--i指向下一個待處理bucket然后退出該循環(huán) Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; //--i表示下一個待處理的bucket,如果它>=bound,表示當前線程已經分配過bucket區(qū)域 if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) {//表示所有bucket已經被分配完畢 i = -1; advance = false; } //通過cas來修改TRANSFERINDEX,為當前線程分配任務,處理的節(jié)點區(qū)間為(nextBound,nextIndex)->(0,15) else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound;//0 i = nextIndex - 1;//15 advance = false;} } //i<0說明已經遍歷完舊的數組,也就是當前線程已經處理完所有負責的bucket if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) {//如果完成了擴容 nextTable = null;//刪除成員變量 table = nextTab;//更新table數組 sizeCtl = (n << 1) - (n >>> 1);//更新閾值(32*0.75=24) return; } // sizeCtl 在遷移前會設置為 (rs << RESIZE_STAMP_SHIFT) + 2 (詳細介紹點擊這里) // 然后,每增加一個線程參與遷移就會將 sizeCtl 加 1,// 這里使用 CAS 操作對 sizeCtl 的低16位進行減 1,代表做完了屬于自己的任務 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {第一個擴容的線程,執(zhí)行transfer方法之前,會設置 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2)后續(xù)幫其擴容的線程,執(zhí)行transfer方法之前,會設置 sizeCtl = sizeCtl+1 每一個退出transfer的方法的線程,退出之前,會設置 sizeCtl = sizeCtl-1 那么最后一個線程退出時:必然有sc == (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2),即 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT // 如果 sc - 2 不等于標識符左移 16 位。如果他們相等了,說明沒有線程在幫助他們擴容了。也就是說,擴容結束了。 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; // 如果相等,擴容結束了,更新 finising 變量finishing = advance = true; // 再次循環(huán)檢查一下整張表i = n; // recheck before commit } } // 如果位置 i 處是空的,沒有任何節(jié)點,那么放入剛剛初始化的 ForwardingNode ”空節(jié)點“ else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); //表示該位置已經完成了遷移,也就是如果線程A已經處理過這個節(jié)點,那么線程B處理這個節(jié)點時,hash值一定為MOVED else if ((fh = f.hash) == MOVED) advance = true; // already processed } }?
?
總結
以上是生活随笔為你收集整理的ConcurrentHashMap的源码分析-transfer的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ConcurrentHashMap的源码
- 下一篇: ConcurrentHashMap的源码