Redis源码解析——字典遍历
? ? ? ? 之前兩篇博文講解了字典庫的基礎,本文將講解其遍歷操作。之所以將遍歷操作獨立成一文來講,是因為其中的內容和之前的基本操作還是有區別的。特別是高級遍歷一節介紹的內容,充滿了精妙設計的算法智慧。(轉載請指明出于breaksoftware的csdn博客)
迭代器遍歷
? ? ? ? 由于Redis字典庫有rehash機制,而且是漸進式的,所以迭代器操作可能會通過其他特殊方式來實現,以保證能遍歷到所有數據。但是閱讀完源碼發現,其實這個迭代器是個受限的迭代器,實現方法也很簡單。我們先看下其基礎結構:
typedef struct dictIterator {dict *d;long index;int table, safe;dictEntry *entry, *nextEntry;/* unsafe iterator fingerprint for misuse detection. */long long fingerprint;
} dictIterator;
? ? ? ? 成員變量d指向迭代器處理的字典。index是dictht中table數組的下標。table是dict結構中dictht數組的下標,即標識ht[0]還是ht[1]。safe字段用于標識該迭代器是否為一個安全的迭代器。如果是,則可以在迭代過程中使用dictDelete、dictFind等方法;如果不是,則只能使用dictNext遍歷方法。entry和nextEntry分別指向當前的元素和下一個元素。fingerprint是字典的指紋,我們可以先看下指紋算法的實現:
long long dictFingerprint(dict *d) {long long integers[6], hash = 0;int j;integers[0] = (long) d->ht[0].table;integers[1] = d->ht[0].size;integers[2] = d->ht[0].used;integers[3] = (long) d->ht[1].table;integers[4] = d->ht[1].size;integers[5] = d->ht[1].used;/* We hash N integers by summing every successive integer with the integer* hashing of the previous sum. Basically:** Result = hash(hash(hash(int1)+int2)+int3) ...** This way the same set of integers in a different order will (likely) hash* to a different number. */for (j = 0; j < 6; j++) {hash += integers[j];/* For the hashing step we use Tomas Wang's 64 bit integer hash. */hash = (~hash) + (hash << 21); // hash = (hash << 21) - hash - 1;hash = hash ^ (hash >> 24);hash = (hash + (hash << 3)) + (hash << 8); // hash * 265hash = hash ^ (hash >> 14);hash = (hash + (hash << 2)) + (hash << 4); // hash * 21hash = hash ^ (hash >> 28);hash = hash + (hash << 31);}return hash;
}
? ? ? ? 可以見得,它使用了ht[0]和ht[1]的相關信息進行Hash運算,從而得到該字典的指紋。我們可以發現,如果dictht的table、size和used任意一個有變化,則指紋將被改變。這也就意味著,擴容、鎖容、rehash、新增元素和刪除元素都會改變指紋(除了修改元素內容)。
? ? ? ? 生成一個迭代器的方法很簡單,該字典庫提供了兩種方式:
dictIterator *dictGetIterator(dict *d)
{dictIterator *iter = zmalloc(sizeof(*iter));iter->d = d;iter->table = 0;iter->index = -1;iter->safe = 0;iter->entry = NULL;iter->nextEntry = NULL;return iter;
}dictIterator *dictGetSafeIterator(dict *d) {dictIterator *i = dictGetIterator(d);i->safe = 1;return i;
}
? ? ? ? 然后我們看下遍歷迭代器的操作。如果是初次迭代,則要查看是否是安全迭代器,如果是安全迭代器則讓其對應的字典對象的iterators自增;如果不是則記錄當前字典的指紋
dictEntry *dictNext(dictIterator *iter)
{while (1) {if (iter->entry == NULL) {dictht *ht = &iter->d->ht[iter->table];if (iter->index == -1 && iter->table == 0) {if (iter->safe)iter->d->iterators++;elseiter->fingerprint = dictFingerprint(iter->d);}
? ? ? ? 因為要遍歷的時候,字典可以已經處于rehash的中間狀態,所以還要遍歷ht[1]中的元素
iter->index++;if (iter->index >= (long) ht->size) {if (dictIsRehashing(iter->d) && iter->table == 0) {iter->table++;iter->index = 0;ht = &iter->d->ht[1];} else {break;}}iter->entry = ht->table[iter->index];} else {iter->entry = iter->nextEntry;}
? ? ? ? 往往使用迭代器獲得元素后,會讓字典刪除這個元素,這個時候就無法通過迭代器獲取下一個元素了,于是作者設計了nextEntry來記錄當前對象的下一個對象指針
if (iter->entry) {/* We need to save the 'next' here, the iterator user* may delete the entry we are returning. */iter->nextEntry = iter->entry->next;return iter->entry;}}return NULL;
}
? ? ? ? 遍歷完成后,要調用下面方法釋放迭代器。需要注意的是,如果是安全迭代器,就需要讓其指向的字典的iterators自減以還原;如果不是,則需要檢測前后字典的指紋是否一致
void dictReleaseIterator(dictIterator *iter)
{if (!(iter->index == -1 && iter->table == 0)) {if (iter->safe)iter->d->iterators--;elseassert(iter->fingerprint == dictFingerprint(iter->d));}zfree(iter);
}
? ? ? ? 最后我們探討下什么是安全迭代器。源碼中我們看到如果safe為1,則讓字典iterators自增,這樣dict字典庫中的操作就不會觸發rehash漸進,從而在一定程度上(消除rehash影響,但是無法阻止用戶刪除元素)保證了字典結構的穩定。如果不是安全迭代器,則只能使用dictNext方法遍歷元素,而像獲取元素值的dictFetchValue方法都不能調用。因為dictFetchValue底層會調用_dictRehashStep讓字典結構發生改變。
static void _dictRehashStep(dict *d) {if (d->iterators == 0) dictRehash(d,1);
}
? ? ? ? 我查了下該庫在Redis中的應用,遍歷操作不是為了獲取值就是為了刪除值,而沒有增加元素的操作,如
void clusterBlacklistCleanup(void) {dictIterator *di;dictEntry *de;di = dictGetSafeIterator(server.cluster->nodes_black_list);while((de = dictNext(di)) != NULL) {int64_t expire = dictGetUnsignedIntegerVal(de);if (expire < server.unixtime)dictDelete(server.cluster->nodes_black_list,dictGetKey(de));}dictReleaseIterator(di);
}
高級遍歷
? ? ? ? 高級遍歷允許ht[0]和ht[1]之間數據在遷移過程中進行遍歷,通過相應的算法可以保證所有的元素都可以被遍歷到。我們先看下功能的實現:
unsigned long dictScan(dict *d,unsigned long v,dictScanFunction *fn,void *privdata)
? ? ? ? 參數d是字典的指針;v是迭代器,這個迭代器初始值為0,每次調用dictScan都會返回一個新的迭代器。于是下次調用這個函數時要傳入新的迭代器的值。fn是個函數指針,每遍歷到一個元素時,都是用該函數對元素進行操作。
typedef void (dictScanFunction)(void *privdata, const dictEntry *de);
? ? ? ? Redis中這個方法的調用樣例是:
do {cursor = dictScan(ht, cursor, scanCallback, privdata);} while (cursor &&maxiterations-- &&listLength(keys) < (unsigned long)count);
? ? ? ? 對于不在rehash狀態的字典,則只要對ht[0]中迭代器指向的鏈表進行遍歷就行了
dictht *t0, *t1;const dictEntry *de;unsigned long m0, m1;if (dictSize(d) == 0) return 0;if (!dictIsRehashing(d)) {t0 = &(d->ht[0]);m0 = t0->sizemask;/* Emit entries at cursor */de = t0->table[v & m0];while (de) {fn(privdata, de);de = de->next;}
? ? ? ? 如果在rehash狀態,就要遍歷ht[0]和ht[1]。遍歷前要確定哪個dictht.table長度短(假定其長度為len=8),先對短的中該迭代器(假定為iter=4)對應的鏈進行遍歷,然后遍歷大的。然而不僅要遍歷大的dictht中迭代器(iter=4)對應的鏈,還要遍歷比iter大len的迭代器(4+8=12)對應的鏈表。
} else {t0 = &d->ht[0];t1 = &d->ht[1];/* Make sure t0 is the smaller and t1 is the bigger table */if (t0->size > t1->size) {t0 = &d->ht[1];t1 = &d->ht[0];}m0 = t0->sizemask;m1 = t1->sizemask;/* Emit entries at cursor */de = t0->table[v & m0];while (de) {fn(privdata, de);de = de->next;}/* Iterate over indices in larger table that are the expansion* of the index pointed to by the cursor in the smaller table */do {/* Emit entries at cursor */de = t1->table[v & m1];while (de) {fn(privdata, de);de = de->next;}/* Increment bits not covered by the smaller mask */v = (((v | m0) + 1) & ~m0) | (v & m0);/* Continue while bits covered by mask difference is non-zero */} while (v & (m0 ^ m1));}
? ? ? ? 最后要重新計算下次使用的迭代器并返回
/* Set unmasked bits so incrementing the reversed cursor* operates on the masked bits of the smaller table */v |= ~m0;/* Increment the reverse cursor */v = rev(v);v++;v = rev(v);return v;
}
? ? ? ? 從上面的設計來看,調用dictScan時不能有多線程操作該字典,否則會出現遺漏遍歷的情況。但是在每次調用dictScan之間可以對字典進行操作。
? ? ? ? 其實這個遍歷中最核心的是迭代器v的計算方法,我們只要讓v從0開始,執行“或操作”最短ht.table(~m0)大小、二進制翻轉、加1、再二進制翻轉就可以實現0到~m0的遍歷。我們看個例子:(下圖有筆誤,第一行十進制是4,第三行十進制是6)
? ? ? ? 我一直想不出這套算法為什么能滿足這樣的特點,還是需要數學大神解釋一下。同時也可見這種算法的作者Pieter Noordhuis數學有一定功底。
? ? ? ? 關鍵這樣的算法不僅可以完成遍歷,還可以在數組大小動態變化時保證元素被全部遍歷到。我把代碼提煉出來,模擬了長度為8的數組向長度為16的數組擴容,和長度為16的數組向長度為8的數組縮容的過程。為了讓問題簡單化,我們先不考慮兩個數組的問題,只認為數組在一瞬間被擴容和縮容。
? ? ? ? 我們先看下擴容前的遍歷過程
? ? ? ? 假如第8次迭代后,數組瞬間擴容,這個時候遍歷過程是
? ? ? ? 此時多了一次對下標為15的遍歷,可以想象這次遍歷應該會重復下標為15%8=7遍歷(即第8次)的元素。所以dictScan具有潛在對一個元素遍歷多次的問題。我們再看第7次迭代時發生瞬間擴容的情況
? ? ? ? 此時數組下標為11的遍歷(即第8次遍歷)會部分重復下標為3的遍歷(即第7次遍歷)元素。而之后的遍歷就不會重復了。
? ? ? ? 我們再看下數組的縮容。為縮容前的狀態是
? ? ? ? 如果第16次遍歷時突然縮容,則遍歷過程是
? ? ? ? 可見第16次遍歷的是新數組下標為7的元素,和第15次遍歷老數組下標為7的元素不同,本次遍歷的結果包含前者(因為它還包含之前下標為15的元素)。所以也存在元素重復遍歷的問題。
? ? ? ? 我們看下第15次遍歷時突然縮容的遍歷過程
? ? ? ? 因為縮容到8,所以最后一次遍歷下標7的情況,既包括之前老數組下標為7的元素,也包含老數組下標為15的元素。所以本次遍歷不會產生重復遍歷元素的問題。
? ? ? ? 我們再看下第14次遍歷突然縮容的遍歷過程
? ? ? ? 第14次本來是要遍歷下標為11的元素。由于發生縮容,就遍歷新的數組的下標為3的元素。所以第14的遍歷包含第13次的遍歷元素。
? ? ? ? 一個數組如此,像dict結構中有兩個dictht的情況,則稍微復雜點。我們通過下圖可以發現,不同時機ht[0]擴容或者縮容,都可以保證元素被全遍歷
? ? ? ? 上面測試的代碼是:
#define TWO_FOUR_MASK 15
#define TWO_THREE_MASK 7static unsigned long rev(unsigned long v) {unsigned long s = 8 * sizeof(v);unsigned long mask = ~0;while ((s >>= 1) > 0) {mask ^= (mask <<s);v = ((v >> s) & mask) | ((v << s) & ~mask);}return v;
}unsigned long loop_single_expand_shrinks(unsigned long v, int change, int expand) {unsigned long m0 = 0;if (expand) {if (change) {m0 = TWO_FOUR_MASK;}else {m0 = TWO_THREE_MASK;}}else {if (change) {m0 = TWO_THREE_MASK;}else {m0 = TWO_FOUR_MASK;}}unsigned long t0idx = t0idx = v & m0; printf(" t0Index: %lu ", t0idx);v |= ~m0;v = rev(v);v++;v = rev(v);return v;
}unsigned long loop(unsigned long v) {unsigned long m0 = TWO_THREE_MASK;unsigned long m1 = TWO_FOUR_MASK;unsigned long t0idx = v & m0;printf(" t0Index: %lu ", t0idx);printf(" t1Index: ");do {unsigned long t1idx = v & m1;printf("%lu ", t1idx);v = (((v | m0) + 1) & ~ m0) | (v & m0);} while (v & (m0 ^ m1));v |= ~m0;v = rev(v);v++;v = rev(v);return v;
}unsigned long loop_expand_shrinks(unsigned long v, int change, int expand) {unsigned long m0 = 0;unsigned long m1 = 0;if (!change) {m0 = TWO_THREE_MASK;m1 = TWO_FOUR_MASK;unsigned long t0idx = v & m0;if (expand) {printf(" t0Index: %lu ", t0idx);printf(" t1Index: ");}else {printf(" t1Index: %lu ", t0idx);printf(" t0Index: ");}do {unsigned long t1idx = v & m1;printf("%lu ", t1idx);v = (((v | m0) + 1) & ~ m0) | (v & m0);} while (v & (m0 ^ m1));}else {if (expand) {m0 = TWO_FOUR_MASK;}else {m0 = TWO_THREE_MASK;}unsigned long t0idx = v & m0;printf(" t0Index: %lu ", t0idx);}v |= ~m0;v = rev(v);v++;v = rev(v);return v;
}void print_binary(unsigned long v) {char s[128] = {0};_itoa_s(v, s, sizeof(s), 2);printf("0x%032s", s);
}void check_loop_normal() {unsigned long v = 0;do {print_binary(v);v = loop(v);printf("\n");} while (v != 0);
}void check_loop_expand_shrinks(int expand) {int loop_count = 9;for (int n = 0; n < loop_count; n++) {unsigned long v = 0;int change = 0;int call_count = 0;do {if (call_count == n) {change = 1;}print_binary(v);v = loop_expand_shrinks(v, change, expand);call_count++;printf("\n");} while (v != 0);printf("\n");}
}void check_loop_single_expand_shrinks(int expand) {int loop_count = 17;for (int n = 0; n < loop_count; n++) {unsigned long v = 0;int change = 0;int call_count = 0;do {if (call_count == n) {change = 1;}print_binary(v);v = loop_single_expand_shrinks(v, change, expand);call_count++;printf("\n");} while (v != 0);printf("\n");}
}
?
總結
以上是生活随笔為你收集整理的Redis源码解析——字典遍历的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Redis源码解析——字典基本操作
- 下一篇: Redis源码解析——双向链表