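ConcurrentDictionary: How Much Do You Know About the Concurrent Dictionary?
In my previous article, "Do You Really Understand Dictionaries?", I introduced hash functions and the basic principles of how a dictionary works.
A reader commented under that article that my Remove and Add methods did not take thread safety into account.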
https://docs.microsoft.com/en-us/dotnet/api/system.collections.generic.dictionary-2?redirectedfrom=MSDN&view=netframework-4.7.2
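After checking the documentation, I found that Dictionary in .NET is indeed not thread-safe. If we want a thread-safe dictionary, we have to use ConcurrentDictionary.
After studying the source code of ConcurrentDictionary, I found its approach to thread safety very interesting, and the way it handles concurrency is a useful reference for other high-concurrency scenarios in our own projects. So here I am again, sharing what I learned in the hope that it helps.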
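As a quick illustration of what the thread-safe version buys us, here is a minimal sketch that uses the real System.Collections.Concurrent.ConcurrentDictionary. The names CounterDemo and CountInParallel are hypothetical and exist only for this example.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class CounterDemo
{
    // Increments the counter stored under a single key from many threads at once.
    // AddOrUpdate retries internally until its update is applied, so no increment
    // is lost. A plain Dictionary offers no such guarantee under concurrent writes.
    public static int CountInParallel(int iterations)
    {
        var counts = new ConcurrentDictionary<string, int>();
        Parallel.For(0, iterations, _ =>
            counts.AddOrUpdate("hits", 1, (key, current) => current + 1));
        return counts["hits"];
    }
}
```

Calling CounterDemo.CountInParallel(10000) should return exactly 10000, however the iterations are spread across threads.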
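ConcurrentDictionary is the thread-safe counterpart of Dictionary and lives in the System.Collections.Concurrent namespace. Besides ConcurrentDictionary, that namespace contains the following classes, which are thread-safe versions of the collections we use all the time.
BlockingCollection: provides blocking and bounding capabilities for thread-safe collections that implement IProducerConsumerCollection.
ConcurrentBag: represents a thread-safe, unordered collection of objects.
ConcurrentQueue: represents a thread-safe first-in, first-out (FIFO) collection.
If you read my previous article, "Do You Really Understand Dictionaries?", the way ConcurrentDictionary works should not be hard to follow. But does it simply put a lock around its read and write methods?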
Dictionary
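As the figure shows, in a Dictionary the entries array stores the data and buckets acts as a bridge. Each time the hash function produces the hash value of a key, that value is reduced modulo the bucket count, hashResult % bucketsLength = bucketIndex, and the remainder is used as the index into buckets. The value stored in that bucket is the index, within entries, of the entry that belongs to the key, so we can fetch the data we want straight from entries. Nothing has to be scanned from start to finish, so a lookup takes O(1) time on average.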
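The two-array lookup path can be sketched as follows. This is a simplified illustration under my own assumptions, not the actual Dictionary source: TinyDictionary and its members are hypothetical names, the capacity is fixed, keys are strings, and duplicate keys are not checked.

```csharp
using System;

public sealed class TinyDictionary
{
    private struct Entry
    {
        public int HashCode;
        public int Next;      // index of the next entry in the same bucket, or -1
        public string Key;
        public int Value;
    }

    private readonly int[] _buckets;   // bucket -> index into _entries, or -1 if empty
    private readonly Entry[] _entries; // the actual key/value storage
    private int _count;

    public TinyDictionary(int capacity)
    {
        _buckets = new int[capacity];
        for (int i = 0; i < _buckets.Length; i++) _buckets[i] = -1;
        _entries = new Entry[capacity];
    }

    public void Add(string key, int value)
    {
        if (_count == _entries.Length) throw new InvalidOperationException("full");
        int hash = key.GetHashCode() & 0x7fffffff;       // clear the sign bit
        int bucketIndex = hash % _buckets.Length;        // hashResult % bucketsLength
        _entries[_count] = new Entry
        {
            HashCode = hash, Next = _buckets[bucketIndex], Key = key, Value = value
        };
        _buckets[bucketIndex] = _count++;                // bucket now points at the new entry
    }

    public bool TryGetValue(string key, out int value)
    {
        int hash = key.GetHashCode() & 0x7fffffff;
        // Jump straight to the bucket, then follow only that bucket's entries.
        for (int i = _buckets[hash % _buckets.Length]; i >= 0; i = _entries[i].Next)
        {
            if (_entries[i].HashCode == hash && _entries[i].Key == key)
            {
                value = _entries[i].Value;
                return true;
            }
        }
        value = 0;
        return false;
    }
}
```

The point of the sketch is that TryGetValue never walks the whole entries array; it touches one bucket and the few entries chained from it.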
ConcurrentDictionary
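ConcurrentDictionary stores its data in a similar way, except that buckets has more responsibility: besides the bridging role it plays in Dictionary, it also stores the data itself.
The hash value of the key is reduced modulo the length of buckets, hashResult % bucketsLength = bucketIndex, and the remainder, used as the index into buckets, leads us to the block that holds our data. What happens when two keys point to the same block, as John Smith and Sandra Dee both point to 152 in the figure above? Each storage node (Node) has a Next property that points to the next Node. In the figure, the Next of node 152 is 154, so we start looking for Sandra Dee at 152, find that it is not the one we want, move on to 154, and get the data we need.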
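The collision case described above can be sketched with a small chained table. This is an illustrative toy under stated assumptions, not the library's implementation: ChainedBuckets, Add and Find are hypothetical names, and an int key is used as its own hash so that collisions are easy to force.

```csharp
public sealed class ChainedBuckets
{
    private sealed class Node
    {
        public readonly int Key;
        public readonly string Value;
        public readonly Node Next;

        public Node(int key, string value, Node next)
        {
            Key = key;
            Value = value;
            Next = next;
        }
    }

    private readonly Node[] _buckets;

    public ChainedBuckets(int bucketCount)
    {
        _buckets = new Node[bucketCount];
    }

    private int BucketIndex(int key)
    {
        return (key & 0x7fffffff) % _buckets.Length;
    }

    // New nodes are prepended, so the bucket always points at the newest node.
    public void Add(int key, string value)
    {
        int i = BucketIndex(key);
        _buckets[i] = new Node(key, value, _buckets[i]);
    }

    // Returns the value (or null) and reports how many nodes were visited.
    public string Find(int key, out int nodesVisited)
    {
        nodesVisited = 0;
        for (Node n = _buckets[BucketIndex(key)]; n != null; n = n.Next)
        {
            nodesVisited++;
            if (n.Key == key) return n.Value;
        }
        return null;
    }
}
```

With four buckets, keys 1 and 5 land in the same bucket. If 5 ("Sandra Dee") is added first and 1 ("John Smith") second, finding key 5 visits two nodes: the first one does not match, and its Next leads to the right one.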
The official source code is fairly complex and not easy to follow, so I trimmed it down. The rest of this article is built around that trimmed-down ConcurrentDictionary.
https://github.com/liuzhenyulive/DictionaryMini
Data structures
Node
Each item in ConcurrentDictionary is stored in a Node, which holds the key and the hash code of the key in addition to the value.
private class Node
{
    internal TKey m_key;
    internal TValue m_value;
    internal volatile Node m_next;
    internal int m_hashcode;
    internal Node(TKey key, TValue value, int hashcode, Node next)
    {
        m_key = key;
        m_value = value;
        m_next = next;
        m_hashcode = hashcode;
    }
}
Table
All of the data in ConcurrentDictionary lives in a Tables object like the one below. The indexes of m_buckets are what keys map to, m_locks holds the lock objects (described in detail later), and m_countPerLock records how many nodes each lock is responsible for.
private class Tables
{
    internal readonly Node[] m_buckets;
    internal readonly object[] m_locks;
    internal volatile int[] m_countPerLock;
    internal readonly IEqualityComparer<TKey> m_comparer;
    internal Tables(Node[] buckets, object[] locks, int[] countPerlock, IEqualityComparer<TKey> comparer)
    {
        m_buckets = buckets;
        m_locks = locks;
        m_countPerLock = countPerlock;
        m_comparer = comparer;
    }
}
ConcurrentDictionary creates the Tables object in its constructor. I simplified the original constructors so that the table is created from default values. DefaultConcurrencyLevel, the default concurrency level, is the number of logical processors on the current machine.
public ConcurrentDictionaryMini() : this(DefaultConcurrencyLevel, DEFAULT_CAPACITY, true,
    EqualityComparer<TKey>.Default)
{
}
internal ConcurrentDictionaryMini(int concurrencyLevel, int capacity, bool growLockArray, IEqualityComparer<TKey> comparer)
{
    if (concurrencyLevel < 1)
    {
        throw new ArgumentOutOfRangeException(nameof(concurrencyLevel), "concurrencyLevel must be positive.");
    }
    if (capacity < 0)
    {
        throw new ArgumentOutOfRangeException(nameof(capacity), "capacity must not be negative.");
    }
    // The table needs at least one bucket per lock.
    if (capacity < concurrencyLevel)
    {
        capacity = concurrencyLevel;
    }
    object[] locks = new object[concurrencyLevel];
    for (int i = 0; i < locks.Length; i++)
    {
        locks[i] = new object();
    }
    int[] countPerLock = new int[locks.Length];
    Node[] buckets = new Node[capacity];
    m_tables = new Tables(buckets, locks, countPerLock, comparer);
    m_growLockArray = growLockArray;
    // Number of nodes a single lock may guard before a resize is requested.
    m_budget = buckets.Length / locks.Length;
}
Methods
The most fundamental methods of ConcurrentDictionary are Add, Get, Remove and Grow Table. The other methods are largely extensions built on top of these four.
Add
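Adding an element to the Table has the following highlights worth our attention.
Before the operation starts, a local variable tables is declared to hold the m_tables seen at that moment. Once the real work begins (after entering the lock), the method checks whether another thread replaced m_tables during the preparation phase. If it did, the preparation is redone and the operation starts over.
The bucket index and the lock index are obtained through the GetBucketAndLockNo method, which internally is just a modulo operation.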
private void GetBucketAndLockNo(int hashcode, out int bucketNo, out int lockNo, int bucketCount, int lockCount)
{
    // Clear the sign bit so the remainder is never negative.
    bucketNo = (hashcode & 0x7fffffff) % bucketCount;
    lockNo = bucketNo % lockCount;
}
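Before the data is touched, the lockNo-th object in m_locks is taken as the lock, and it is released when the operation completes. Having several locks reduces the likelihood of blocking to some extent.
When a value is updated, if the type of the value allows atomic writes, the value is updated in place. Otherwise a new node is created to replace the old one.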
private static bool IsValueWriteAtomic()
{
    Type valueType = typeof(TValue);
    // Writing a reference is always atomic.
    if (valueType.IsClass)
    {
        return true;
    }
    switch (Type.GetTypeCode(valueType))
    {
        case TypeCode.Boolean:
        case TypeCode.Byte:
        case TypeCode.Char:
        case TypeCode.Int16:
        case TypeCode.Int32:
        case TypeCode.SByte:
        case TypeCode.Single:
        case TypeCode.UInt16:
        case TypeCode.UInt32:
            return true;
        // 64-bit primitives are written atomically only in a 64-bit process.
        case TypeCode.Int64:
        case TypeCode.Double:
        case TypeCode.UInt64:
            return IntPtr.Size == 8;
        default:
            return false;
    }
}
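This method is written according to the CLI specification. Put simply, a properly aligned write of a value no larger than the native word size is atomic: values of up to 32 bits on a 32-bit machine, and values of up to 64 bits on a 64-bit machine. Such a value is written in a single step, so another thread can never observe it half-written, and that is what makes the in-place write safe.
See section 12.6.6 of http://www.ecma-international.org/publications/files/ECMA-ST/ECMA-335.pdf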
private bool TryAddInternal(TKey key, TValue value, bool updateIfExists, bool acquireLock, out TValue resultingValue)
{
    while (true)
    {
        int bucketNo, lockNo;
        int hashcode;
        // Snapshot the current table; a concurrent GrowTable may replace it.
        Tables tables = m_tables;
        IEqualityComparer<TKey> comparer = tables.m_comparer;
        hashcode = comparer.GetHashCode(key);
        GetBucketAndLockNo(hashcode, out bucketNo, out lockNo, tables.m_buckets.Length, tables.m_locks.Length);
        bool resizeDesired = false;
        bool lockTaken = false;
        try
        {
            if (acquireLock)
                Monitor.Enter(tables.m_locks[lockNo], ref lockTaken);
            // The table was swapped while we waited for the lock: start over.
            if (tables != m_tables)
                continue;
            Node prev = null;
            for (Node node = tables.m_buckets[bucketNo]; node != null; node = node.m_next)
            {
                if (comparer.Equals(node.m_key, key))
                {
                    if (updateIfExists)
                    {
                        if (s_isValueWriteAtomic)
                        {
                            node.m_value = value;
                        }
                        else
                        {
                            // Not atomic: publish a whole new node so readers never see a torn value.
                            Node newNode = new Node(node.m_key, value, hashcode, node.m_next);
                            if (prev == null)
                            {
                                tables.m_buckets[bucketNo] = newNode;
                            }
                            else
                            {
                                prev.m_next = newNode;
                            }
                        }
                        resultingValue = value;
                    }
                    else
                    {
                        resultingValue = node.m_value;
                    }
                    return false;
                }
                prev = node;
            }
            // Key not found: prepend a new node to the bucket's chain.
            Volatile.Write(ref tables.m_buckets[bucketNo], new Node(key, value, hashcode, tables.m_buckets[bucketNo]));
            checked
            {
                tables.m_countPerLock[lockNo]++;
            }
            if (tables.m_countPerLock[lockNo] > m_budget)
            {
                resizeDesired = true;
            }
        }
        finally
        {
            if (lockTaken)
                Monitor.Exit(tables.m_locks[lockNo]);
        }
        // Resize outside the bucket lock; GrowTable acquires all locks itself.
        if (resizeDesired)
        {
            GrowTable(tables, tables.m_comparer, false, m_keyRehashCount);
        }
        resultingValue = value;
        return true;
    }
}
Get
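Getting an element from the Table follows the same flow described earlier when I explained how ConcurrentDictionary works, but the following points are worth noting.
bucket[i] is read through Volatile.Read(). This method does not take a lock. It reads the head of the bucket with acquire semantics, so a node that another thread has just published with Volatile.Write is seen fully initialized, even while other threads are adding or removing nodes.
When Volatile.Read reads the specified field, it inserts a memory barrier (on systems that require one) that prevents the processor from reordering memory operations: a read or write that appears after this call in the code cannot be moved before it.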
public bool TryGetValue(TKey key, out TValue value)
{
    if (key == null) throw new ArgumentNullException("key");
    // No lock is taken on the read path.
    Tables tables = m_tables;
    IEqualityComparer<TKey> comparer = tables.m_comparer;
    GetBucketAndLockNo(comparer.GetHashCode(key), out var bucketNo, out _, tables.m_buckets.Length, tables.m_locks.Length);
    Node n = Volatile.Read(ref tables.m_buckets[bucketNo]);
    while (n != null)
    {
        if (comparer.Equals(n.m_key, key))
        {
            value = n.m_value;
            return true;
        }
        n = n.m_next;
    }
    value = default(TValue);
    return false;
}
Remove
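The implementation of Remove is not complicated either. It works like removing a Node from a linked list: while the node is removed, its predecessor and successor have to be linked together. I am sure you will find this part easy to follow.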
private bool TryRemoveInternal(TKey key, out TValue value, bool matchValue, TValue oldValue)
{
    while (true)
    {
        Tables tables = m_tables;
        IEqualityComparer<TKey> comparer = tables.m_comparer;
        int bucketNo, lockNo;
        GetBucketAndLockNo(comparer.GetHashCode(key), out bucketNo, out lockNo, tables.m_buckets.Length, tables.m_locks.Length);
        lock (tables.m_locks[lockNo])
        {
            if (tables != m_tables)
                continue;
            Node prev = null;
            for (Node curr = tables.m_buckets[bucketNo]; curr != null; curr = curr.m_next)
            {
                if (comparer.Equals(curr.m_key, key))
                {
                    if (matchValue)
                    {
                        bool valuesMatch = EqualityComparer<TValue>.Default.Equals(oldValue, curr.m_value);
                        if (!valuesMatch)
                        {
                            value = default(TValue);
                            return false;
                        }
                    }
                    if (prev == null)
                        Volatile.Write(ref tables.m_buckets[bucketNo], curr.m_next);
                    else
                    {
                        prev.m_next = curr.m_next;
                    }
                    value = curr.m_value;
                    tables.m_countPerLock[lockNo]--;
                    return true;
                }
                prev = curr;
            }
        }
        value = default(TValue);
        return false;
    }
}
Grow table
When any count in m_countPerLock exceeds the configured threshold (m_budget), this operation is triggered to enlarge the Table.
private void GrowTable(Tables tables, IEqualityComparer<TKey> newComparer, bool regenerateHashKeys, int rehashCount)
{
    int locksAcquired = 0;
    try
    {
        // Take the first lock only; it is enough to decide whether a resize is needed.
        AcquireLocks(0, 1, ref locksAcquired);
        if (regenerateHashKeys && rehashCount == m_keyRehashCount)
        {
            tables = m_tables;
        }
        else
        {
            // Another thread already replaced the table: nothing to do.
            if (tables != m_tables)
                return;
            long approxCount = 0;
            for (int i = 0; i < tables.m_countPerLock.Length; i++)
            {
                approxCount += tables.m_countPerLock[i];
            }
            // The table is mostly empty, so raise the budget instead of resizing.
            if (approxCount < tables.m_buckets.Length / 4)
            {
                m_budget = 2 * m_budget;
                if (m_budget < 0)
                {
                    m_budget = int.MaxValue;
                }
                return;
            }
        }
        int newLength = 0;
        bool maximizeTableSize = false;
        try
        {
            checked
            {
                // Roughly double the size, then skip lengths divisible by 3, 5 or 7.
                newLength = tables.m_buckets.Length * 2 + 1;
                while (newLength % 3 == 0 || newLength % 5 == 0 || newLength % 7 == 0)
                {
                    newLength += 2;
                }
            }
        }
        catch (OverflowException)
        {
            maximizeTableSize = true;
        }
        if (maximizeTableSize)
        {
            newLength = int.MaxValue;
            m_budget = int.MaxValue;
        }
        // Take the remaining locks so that no writer can touch the table while it is copied.
        AcquireLocks(1, tables.m_locks.Length, ref locksAcquired);
        object[] newLocks = tables.m_locks;
        if (m_growLockArray && tables.m_locks.Length < MAX_LOCK_NUMBER)
        {
            newLocks = new object[tables.m_locks.Length * 2];
            Array.Copy(tables.m_locks, newLocks, tables.m_locks.Length);
            for (int i = tables.m_locks.Length; i < newLocks.Length; i++)
            {
                newLocks[i] = new object();
            }
        }
        Node[] newBuckets = new Node[newLength];
        int[] newCountPerLock = new int[newLocks.Length];
        // Copy every node into the bucket it maps to in the new table.
        for (int i = 0; i < tables.m_buckets.Length; i++)
        {
            Node current = tables.m_buckets[i];
            while (current != null)
            {
                Node next = current.m_next;
                int newBucketNo, newLockNo;
                int nodeHashCode = current.m_hashcode;
                if (regenerateHashKeys)
                {
                    nodeHashCode = newComparer.GetHashCode(current.m_key);
                }
                GetBucketAndLockNo(nodeHashCode, out newBucketNo, out newLockNo, newBuckets.Length,
                    newLocks.Length);
                newBuckets[newBucketNo] = new Node(current.m_key, current.m_value, nodeHashCode,
                    newBuckets[newBucketNo]);
                checked
                {
                    newCountPerLock[newLockNo]++;
                }
                current = next;
            }
        }
        if (regenerateHashKeys)
        {
            unchecked
            {
                m_keyRehashCount++;
            }
        }
        m_budget = Math.Max(1, newBuckets.Length / newLocks.Length);
        // Publish the new table; writers holding the old snapshot will notice and retry.
        m_tables = new Tables(newBuckets, newLocks, newCountPerLock, newComparer);
    }
    finally
    {
        ReleaseLocks(0, locksAcquired);
    }
}
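The sizing rule used by GrowTable can be tried out in isolation. The sketch below copies only that arithmetic; GrowthDemo and NextLength are hypothetical names introduced for this example.

```csharp
using System;

public static class GrowthDemo
{
    // Mirrors the sizing rule in GrowTable: double the length, add one, then
    // step by 2 until the result is not divisible by 3, 5 or 7.
    // Throws OverflowException if the doubled length does not fit in an int.
    public static int NextLength(int currentLength)
    {
        checked
        {
            int newLength = currentLength * 2 + 1;
            while (newLength % 3 == 0 || newLength % 5 == 0 || newLength % 7 == 0)
            {
                newLength += 2;
            }
            return newLength;
        }
    }
}
```

Starting from 31 buckets, the candidates are 63 (divisible by 3) and 65 (divisible by 5), so the new length is 67. The result is always odd, which keeps hash codes with common small factors from piling up in a few buckets.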
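lock[]: In traditional thread-safe code, we usually protect data by putting a lock around the places that modify it, and that lock is often the only one in the whole context. With such a design, several threads that are not even writing to the same piece of data still have to wait for the previous thread to finish before the next one can continue. ConcurrentDictionary instead uses the hash to pick the exact lock for a key out of the lock[] array. If different keys map to different locks, the writes of those threads do not affect one another.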
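The idea of one lock per group of buckets, often called lock striping, can be sketched on its own. This is a toy under stated assumptions rather than the library's code: StripedCounters and its members are hypothetical names, and each bucket holds a plain counter instead of a node chain.

```csharp
using System.Threading.Tasks;

public sealed class StripedCounters
{
    private readonly int[] _buckets;
    private readonly object[] _locks;

    public StripedCounters(int bucketCount, int lockCount)
    {
        _buckets = new int[bucketCount];
        _locks = new object[lockCount];
        for (int i = 0; i < _locks.Length; i++) _locks[i] = new object();
    }

    // Same mapping as GetBucketAndLockNo: bucket first, then lock.
    public int LockNoFor(int hashcode)
    {
        int bucketNo = (hashcode & 0x7fffffff) % _buckets.Length;
        return bucketNo % _locks.Length;
    }

    // Only threads whose buckets share a lock ever wait for each other.
    public void Increment(int hashcode)
    {
        int bucketNo = (hashcode & 0x7fffffff) % _buckets.Length;
        lock (_locks[bucketNo % _locks.Length])
        {
            _buckets[bucketNo]++;
        }
    }

    // Sums all buckets; call it only after all writers have finished.
    public int Total()
    {
        int total = 0;
        foreach (int count in _buckets) total += count;
        return total;
    }
}
```

With 31 buckets and 4 locks, hash codes 0 and 1 use locks 0 and 1 and never contend, while hash codes 0 and 4 share lock 0. Running Parallel.For(0, 100000, i => counters.Increment(i)) loses no increments, yet writers to different stripes proceed in parallel.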
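Writes have to consider thread safety, but what about reads? Admittedly, in most scenarios reads do not need to worry about it. In a chained read like ours, however, the lookup walks the chain from head to tail, so couldn't the chain be modified while we are walking it? That is why ConcurrentDictionary uses Volatile.Read to read the data. This method reads an object reference from the specified field and, on systems that require it, inserts a memory barrier that prevents the processor from reordering memory operations: a read or write that appears after this call in the code cannot be moved before it.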
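The ordering guarantee described above can be seen in isolation in the following sketch, which pairs Volatile.Write with Volatile.Read to hand a value from one thread to another without a lock. PublishDemo, Run and the two static fields are hypothetical names used only for this example.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class PublishDemo
{
    private static int s_payload;
    private static bool s_ready;

    public static int Run()
    {
        s_payload = 0;
        s_ready = false;

        Task<int> reader = Task.Run(() =>
        {
            // Acquire read: the read of s_payload below cannot move above it.
            while (!Volatile.Read(ref s_ready))
            {
                Thread.Yield();
            }
            return s_payload;
        });

        s_payload = 42;
        // Release write: the store to s_payload above cannot move below it.
        Volatile.Write(ref s_ready, true);

        return reader.Result;
    }
}
```

Because the flag is written after the payload and read before it, the reader can only return 42. ConcurrentDictionary relies on the same pairing: a node is fully built first, then published into the bucket with Volatile.Write, and readers pick it up with Volatile.Read.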
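In the update path of ConcurrentDictionary, the code checks whether the value can be written atomically. If it can, the value is updated in place. If it cannot, a new node is created to replace the existing one. When I first saw this I was completely puzzled about the purpose. I later found the answer in Joe Duffy's blog post "Thread-safety, torn reads, and the like": it is done to prevent torn reads. What is a torn read? Put simply, some data types are written in several steps, one piece at a time, so another thread may read the result when it is only half written. Say I am changing the three-character name 劉振宇 (Liu Zhenyu) into 周杰倫 (Jay Chou). I have changed 劉 to 周 and am just about to change 振 to 杰 when another thread comes along and reads the result. It reads 周振宇, which is obviously wrong. For such values the safer approach is to write all three characters of 周杰倫 on a slip of paper first and then swap the whole slip for 劉振宇 in one go. Section 12.6.6 of the CLI specification covers this in more detail.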
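The "write it on a slip of paper, then swap the slip" approach can be sketched like this. It is an illustration under my own assumptions, not the library's code: TornReadDemo, Pair, Box and CountTornReads are hypothetical names.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class TornReadDemo
{
    // Two 64-bit halves: wider than the native word, so the CLI does not
    // guarantee that a Pair is written in one atomic step.
    public struct Pair
    {
        public long A;
        public long B;
    }

    // Immutable wrapper; swapping a reference to it is always atomic.
    private sealed class Box
    {
        public readonly Pair Value;

        public Box(Pair value)
        {
            Value = value;
        }
    }

    private static Box s_current = new Box(new Pair());
    private static bool s_done;

    // The writer publishes a fresh Box per update; the reader counts pairs
    // whose two halves disagree, which would indicate a torn read.
    public static int CountTornReads(int updates)
    {
        s_current = new Box(new Pair());
        s_done = false;
        Task<int> reader = Task.Run(() =>
        {
            int torn = 0;
            while (!Volatile.Read(ref s_done))
            {
                Pair p = Volatile.Read(ref s_current).Value;
                if (p.A != p.B) torn++;
            }
            return torn;
        });
        for (long i = 1; i <= updates; i++)
        {
            Volatile.Write(ref s_current, new Box(new Pair { A = i, B = i }));
        }
        Volatile.Write(ref s_done, true);
        return reader.Result;
    }
}
```

The reader only ever sees complete pairs, so the count stays at zero. Had the writer assigned A and B of a shared Pair field in place, a reader could catch the moment between the two assignments.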
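The checked and unchecked keywords. By default, non-constant arithmetic is not checked for overflow, either at compile time or at run time. Code like the following does not throw an exception: the result is wrong, but no error is reported.
int ten = 10;
int i2 = 2147483647 + ten;
But we know that the maximum value of int is 2147483647. If we wrap the same code in a checked block, overflow checking is performed and an OverflowException is thrown at run time.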
checked
{
    int ten = 10;
    int i2 = 2147483647 + ten;
}
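Conversely, constant expressions are checked for overflow at compile time, so the following code fails to compile. If we mark it with unchecked, the compile-time overflow check is skipped.
int a = int.MaxValue * 2;
So here is the question: we all know checked is useful, but what about unchecked? Sometimes we just need some number and do not care much whether it overflowed, for example when generating the HashCode of an object or computing a pseudo-random number with some algorithm. Neither needs an arithmetically exact result. ConcurrentDictionary uses unchecked for the m_keyRehashCount++ operation for the same reason: m_keyRehashCount is only a counter that is compared for equality (see GrowTable), so we do not care whether it wraps around.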
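The two behaviours can be compared side by side in a small sketch. OverflowDemo, AddUnchecked and TryAddChecked are hypothetical names introduced for this example.

```csharp
using System;

public static class OverflowDemo
{
    // Wraps around silently when the sum does not fit in an int.
    public static int AddUnchecked(int a, int b)
    {
        unchecked
        {
            return a + b;
        }
    }

    // Returns false instead of a wrong result when the sum overflows.
    public static bool TryAddChecked(int a, int b, out int sum)
    {
        try
        {
            sum = checked(a + b);
            return true;
        }
        catch (OverflowException)
        {
            sum = 0;
            return false;
        }
    }
}
```

OverflowDemo.AddUnchecked(2147483647, 10) wraps around to -2147483639, while OverflowDemo.TryAddChecked(2147483647, 10, out _) reports the overflow by returning false.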
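The volatile keyword indicates that a field may be modified by multiple threads executing at the same time. For performance reasons, the compiler, the runtime system and even the hardware may rearrange reads from and writes to memory locations. Fields declared volatile are excluded from these optimizations: every read of such a field has acquire semantics and every write has release semantics. volatile is a tricky thing to get right, so casual users should handle it with care.
All the code covered in this post is on GitHub. Take it easy and enjoy it!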
https://github.com/liuzhenyulive/DictionaryMini/blob/master/DictionaryMini/DictionaryMini/ConcurrentDictionaryMini.cs
Original post: https://www.cnblogs.com/CoderAyu/p/10549409.html