ConcurrentHashMap源码jdk1.8学习笔记
生活随笔
收集整理的這篇文章主要介紹了
ConcurrentHashMap源码jdk1.8学习笔记
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
目錄
put方法
initTable方法
helpTransfer方法
transfer方法
addCount方法
?
put方法
public V put(K key, V value) {// 直接調(diào)用了putVal方法return putVal(key, value, false);}final V putVal(K key, V value, boolean onlyIfAbsent) {// 1.判斷key、value是否有為null的。疑問1:為什么不能為null?if (key == null || value == null) throw new NullPointerException();// 2.獲取key的hash值,位運算效率更高int hash = spread(key.hashCode());int binCount = 0;// 3.table數(shù)組賦值給tab變量,為什么要循環(huán)?比如高并發(fā)下3個線程進入了initTable方法// initTable中1個線程初始化,其余2線程循環(huán)Thread.yield()直到初始化完成退出initTable// 初始化完成后則需要這個循環(huán)進入其它的if分支進行其它操作,直到真正put完成break// 其它操作可能是helpTransfer,幫助擴容后reHash,以及后續(xù)會講到的cas put失敗的for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;// 4.判斷Node數(shù)組是否為空,為空則初始化一個Node數(shù)組。// 線程安全由初始化方法initTable保證。if (tab == null || (n = tab.length) == 0)tab = initTable();// 5.這里包含了第二次哈希(n - 1) & hash 得到k,v存放的數(shù)組下標i// 將數(shù)組第i個Node桶賦值給f,判斷此Node桶是否有數(shù)據(jù)else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// 6.當然也存在并發(fā)情況,多個線程進入此分支,但沒關(guān)系,只有一個能cas成功if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break; // no lock when adding to empty bin// 7.其它的線程什么也不干,進入下一次循環(huán)即可}// 8.通過此桶hash值 == -1 MOVED判斷是否在進行resize,且此桶還未完成rehashelse if ((fh = f.hash) == MOVED)// 9.則幫下忙,幫忙結(jié)束后,進入下次循環(huán)。幫忙線程有一個算一個來者不拒。tab = helpTransfer(tab, f);else {// 10.Node數(shù)組已經(jīng)初始化過了,桶中也已經(jīng)有了數(shù)據(jù),也沒在resize,則putV oldVal = null;synchronized (f) {// 11.貌似是為了更加保險,再判斷一下?if (tabAt(tab, i) == f) {// 12.這個桶的hash值判斷是否為鏈表結(jié)構(gòu)if (fh >= 0) {binCount = 1;// 13.循環(huán)鏈表,覆蓋或者新增節(jié)點,binCount記錄節(jié)點數(shù)for (Node<K,V> e = f;; ++binCount) {K ek;if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,value, null);break;}}}// 14.若是紅黑樹,調(diào)用其put方法else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}if (binCount != 0) {// 15.判斷節(jié)點數(shù)量是否>8,是則調(diào)用treeifyBin() 執(zhí)行擴容或轉(zhuǎn)紅黑樹if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);// 16.若是新值替換舊值,不用執(zhí)行addCount()方法,直接returnif (oldVal != null)return oldVal;break;}}}// 17.數(shù)據(jù)個數(shù)+1, binCount是鏈表的長度/紅黑樹的結(jié)點數(shù)// 擴容判斷也在此方法,線程安全詳見addCount()方法解析addCount(1L, binCount);return null;}1、為什么不能put null,而HashMap卻允許?
當你通過get(key)獲取對應(yīng)的value時,如果獲取到的是null,你無法判斷,是put(k,v)的時候value為null,還是這個key就不存在于map中。HashMap是非并發(fā)的,可以通過后續(xù)執(zhí)行contains(key)來做這個判斷。而支持并發(fā)的Map在后續(xù)調(diào)用contains(key)時,可能其它線程已經(jīng)將原先存在的remove。
?
initTable方法
private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;while ((tab = table) == null || tab.length == 0) {// 1.sizeCtl == -1 < 0說明已經(jīng)有別的線程在執(zhí)行初始化,yield放棄cpu等待即可// 值得注意的是,hotspot虛擬機來說yield() == seelp(0), seelp(0)底層也是使用yieldif ((sc = sizeCtl) < 0)Thread.yield(); // lost initialization race; just spin// 2.并發(fā)下,可能有多個線程來到這個分支,但只有一個能cas成功,其余下次循環(huán)會走上邊else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {try {if ((tab = table) == null || tab.length == 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY;@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = tab = nt;sc = n - (n >>> 2);}} finally {// 3.修改后原先等待線程走else if分支,但cas均會修改失敗,退出方法。sizeCtl = sc;}break;}}return tab;}helpTransfer方法
先看需要轉(zhuǎn)移的節(jié)點的數(shù)據(jù)結(jié)構(gòu)
static final class ForwardingNode<K,V> extends Node<K,V> {// 指向擴容后的tabfinal Node<K,V>[] nextTable;// 只有transfer()中調(diào)用ForwardingNode(Node<K,V>[] tab) {super(MOVED, null, null, null);this.nextTable = tab;}// ... 還有一個方法省略} final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {Node<K,V>[] nextTab; int sc;// 1.數(shù)據(jù)檢驗,如果 table 不是空 且 node 節(jié)點是轉(zhuǎn)移類型,// 且 node 節(jié)點的 nextTable(新 table) 不是空if (tab != null && (f instanceof ForwardingNode) &&(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {// 嘗試幫助擴容// 2.根據(jù) length 得到一個值,沒看懂含義?? TODOint rs = resizeStamp(tab.length);// 3.如果 nextTab 沒有被并發(fā)修改 且 tab 也沒有被并發(fā)修改// 且 sizeCtl < 0 (說明還在擴容)while (nextTab == nextTable && table == tab &&(sc = sizeCtl) < 0) {// 4.如果 sizeCtl 無符號右移 16 不等于 rs // sc前 16 位如果不等于標識符,則標識符變化了// 或者 sizeCtl == rs + 1 擴容結(jié)束// 默認第一個線程設(shè)置 sc ==rs 左移 16 位 + 2,當?shù)谝粋€線程結(jié)束擴容了,就會將 sc 減一。這個時候,sc 就等于 rs + 1// 或者 sizeCtl == rs + 65535 (如果達到最大幫助線程的數(shù)量,即 65535)// 或者轉(zhuǎn)移下標正在調(diào)整 (擴容結(jié)束)// 結(jié)束循環(huán),返回 tableif ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || transferIndex <= 0)break;// 5.如果以上都不是, 將 sizeCtl + 1, (表示增加了一個線程幫助其擴容)if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {// 6.進行轉(zhuǎn)移,線程有一個算一個來者不拒transfer(tab, nextTab);// 結(jié)束循環(huán)break;}}return nextTab;}return table; }?
transfer方法
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {// 1.計算每個CPU處理桶個數(shù),NCPU==1則處理所有,小于16則取16int n = tab.length, stride;if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)stride = MIN_TRANSFER_STRIDE; // subdivide range// 2.外邊調(diào)用此方法時,大部分為情況為nextTab!=null,為null時有cas保證只有一個線程進入if (nextTab == null) { // initiatingtry {@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];nextTab = nt;} catch (Throwable ex) { // try to cope with OOME// 3.處理擴容失敗sizeCtl = Integer.MAX_VALUE;return;}nextTable = nextTab;transferIndex = n;}int nextn = nextTab.length;ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);boolean advance = true;boolean finishing = false; // to ensure sweep before committing nextTabfor (int i = 0, bound = 0;;) {Node<K,V> f; int fh;while (advance) {int nextIndex, nextBound;// 4.判斷是否完成了擴容if (--i >= bound || finishing)advance = false;else if ((nextIndex = transferIndex) <= 0) {i = -1;advance = false;}// 5.線程第一次for循環(huán),且未完成擴容,都會走這個分支,分配一個桶區(qū)間遷移任務(wù)else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {bound = nextBound;i = nextIndex - 1;advance = false;}}if (i < 0 || i >= n || i + n >= nextn) {int sc;// 6.全部任務(wù)完成if (finishing) {nextTable = null;table = nextTab;sizeCtl = (n << 1) - (n >>> 1);return;}if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {// 7.不是最后一個完成任務(wù)if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return;// 8.最后一個完成任務(wù)還有后續(xù)要干,下次循環(huán)走6finishing = advance = true;i = n; // recheck before commit}}else if ((f = tabAt(tab, i)) == null)advance = casTabAt(tab, i, null, fwd);else if ((fh = f.hash) == MOVED)advance = true; // already processedelse {// 加鎖進行一個桶的遷移synchronized (f) {if (tabAt(tab, i) == f) {Node<K,V> ln, hn;if (fh >= 0) {int runBit = fh & n;// 循環(huán),尋找lastRun節(jié)點,往后的跟lastRun在一個桶,免去復制操作Node<K,V> lastRun = f;for (Node<K,V> p = f.next; p != null; p = p.next) {int b = p.hash & n;if (b != runBit) {runBit = b;lastRun = p;}}if (runBit == 0) {ln = lastRun;hn = null;}else {hn = lastRun;ln = null;}for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;// 頭插法,即快速,又符合后put的節(jié)點被訪問的可能性更大的事實// 但因為lastRun的存在,所以又并非純粹的倒序if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln);elsehn = new Node<K,V>(ph, pk, pv, hn);}setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}else if (f instanceof TreeBin) {TreeBin<K,V> t = (TreeBin<K,V>)f;TreeNode<K,V> lo = null, loTail = null;TreeNode<K,V> hi = null, hiTail = null;int lc = 0, hc = 0;for (Node<K,V> e = t.first; e != null; e = e.next) {int h = e.hash;TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);if ((h & n) == 0) {if ((p.prev = loTail) == null)lo = p;elseloTail.next = p;loTail = p;++lc;}else {if ((p.prev = hiTail) == null)hi = p;elsehiTail.next = p;hiTail = p;++hc;}}ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin<K,V>(lo) : t;hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc != 0) ? new TreeBin<K,V>(hi) : t;setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}}}}}}addCount方法
private final void addCount(long x, int check) {CounterCell[] as; long b, s;// 1.若計數(shù)格子非空,或嘗試 cas baseCount+x 失敗,則進入if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {CounterCell a; long v; int m;boolean uncontended = true;// 2.計數(shù)格子空 --說明還未出現(xiàn)并發(fā)// 或 隨機獲取一個計數(shù)格子,格子為空// 或 隨機一個計數(shù)格子不為空,則嘗試cas +x失敗,總體來說優(yōu)先使用計數(shù)格子計數(shù)if (as == null || (m = as.length - 1) < 0 ||(a = as[ThreadLocalRandom.getProbe() & m]) == null ||!(uncontended =U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {// 3.通過循環(huán)自旋 以及 cas 操作初始化counterCells 或嘗試 cas baseCount+xfullAddCount(x, uncontended);return;}if (check <= 1)return;// sumCount()方法實現(xiàn):counterCells累加 + baseCounts = sumCount();}// 是否需要擴容檢查參數(shù),putVal()中調(diào)用的會檢查if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;// size >= 擴容閾值sizeCtl 且 容量 < 最大容量MAXIMUM_CAPACITY = 1 << 30while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) {int rs = resizeStamp(n);if (sc < 0) {// 已經(jīng)在擴容了,幫助擴容if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}// 開始擴容else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);s = sumCount();}}}?
?
參考文章:
https://blog.csdn.net/weixin_40299948/article/details/99692294
https://www.cnblogs.com/zerotomax/p/8687425.html
總結(jié)
以上是生活随笔為你收集整理的ConcurrentHashMap源码jdk1.8学习笔记的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 一些Chrome 调试小技巧汇总
- 下一篇: vue的this.$set的作用