Redis源码解析——有序整数集
? ? ? ? 有序整數集是Redis源碼中一個以大尾(big endian)形式存儲,由小到大排列且無重復的整型集合。它存儲的類型包括16位、32位和64位的整型數。在介紹這個庫的實現前,我們還需要先熟悉下大小尾內存存儲機制。(轉載請指明出于breaksoftware的csdn博客)
大小尾(Big Endian/Little Endian)
? ? ? ? 第一次接觸這個概念還是在大學時上“計算機原理”時,當時只是簡單的知道它的特點,但是絲毫沒有深究它們產生的原因和特點。這次借著解析Redis源碼,重新學習了一下大小尾。
? ? ? ? 如果進行過逆向的同學一般都會知道ESP指針的作用,它指向當前函數棧Frame的棧頂。相應的,EBP寄存器指向當前函數棧Frame的棧底。在逆向的匯編代碼中,我們會發現一般使用EBP結合偏移的方法去表示棧上的一個變量,如:
mov    eax, dword ptr  [ebp+0ch]
mov    ebx, dword ptr  [ebp+08h] 
? ? ? ? 但是為什么沒有使用ESP表示變量呢?這是因為ESP指針在當前調用堆棧中也不是穩定的,比如我們進行函數調用,會將一些信息進行入棧處理,這個時候棧頂指針就會減小以擴大有效棧的區域。為什么是棧頂值減小以擴大棧區域呢?這是因為棧結構的特點:棧底地址比棧頂地址大。
?
 ? ? ? ? 我們再看下大小尾數據在棧空間的布局。
? ? ? ? 大尾結構將數據的高位放在地址低處,而小尾結構將數據的高位放在地址的高位。于是我們看下0x00123456在不同結構中的布局
? ? ? ? 如果我們直接在內存中查看,則是這樣展現的
? ? ? ? 可以發現大尾的結構比較正常,它展現的數據的形式和人類的認知方式是相同的——高位在前,低位在后。但是小尾的展現形式則比較反人類——需要倒著看。但是存在即合理,那么小尾結構那么反人類為什么它還存在呢?
? ? ? ? 這就要從CPU的歷史講起來。歷史上關于選擇大尾還是小尾有著很多爭論,各派都有自己的理論依據。我在網上找到一篇關于這段歷史的文檔——《ON HOLY WARS AND A PLEA FOR PEACE》,鑒于文檔稍長,我沒有仔細閱讀,有興趣的讀者可以去了解下。我在這兒簡單講一下我個人的認識:大尾結構便于人類理解,小尾結構便于計算機計算。
? ? ? ? 大尾結構便于人類理解,這點我們在上面的圖中已經發現了。現行的網絡傳輸協議也是采用大尾結構,很明顯作為協議,其重點是協議的可被理解性,而非其參與計算的能力。
? ? ? ? 小尾結構便于計算機計算怎么理解呢?舉個例子,比如我們要對上面數據(假設為a)執行加法操作,操作數是0x12efcdab。那么計算機取到a的地址是0xFF000000,然后用操作數中的0xab與0xFF000000地址的數據進行add操作,操作結果還保存在0xFF000000中;如果有進位,則參與到操作數0xcd與0xFF000001地址的數據相加中。CPU只要對操作數和被操作數的地址向后移動取值相加就可以了。我們再想像下大尾數據的處理方法,如果也是從地址低位開始計算——即是數據高位,則可能產生回溯的問題——數據低位計算有進位則要求改之前計算的值——甚至還要改之前的之前計算的值。如果從地址高位開始計算——即數據低位,則有一次通過a地址(地址低位)跳轉到地址高位的過程。可能有人會說你為什么不拿減法例子呢?人類的減法操作都是從高位向低位進行的。但是計算機沒有減法器——它是通過加法操作進行減法運算的。
? ? ? ? 大小尾雖然是一個非常古老的問題,但是我們在進行數據跨網絡交互時要考慮,因為網絡字節序和本機字節序可能不一樣。跨語言傳輸數據時也要考慮,像Java的數據就是大尾結構的。
? ? ? ? Redis在源碼的endianconv.c提供了一系列小尾結構向大尾結構轉換的方法。我們看一下64位數據的處理函數:
/* Toggle the 64 bit unsigned integer pointed by *p from little endian to* big endian */
void memrev64(void *p) {unsigned char *x = p, t;t = x[0];x[0] = x[7];x[7] = t;t = x[1];x[1] = x[6];x[6] = t;t = x[2];x[2] = x[5];x[5] = t;t = x[3];x[3] = x[4];x[4] = t;
}uint64_t intrev64(uint64_t v) {memrev64(&v);return v;
} 
? ? ? ? Redis使用大尾也印證了我之前的觀點,因為Redis是重要的功能是存儲和網絡交互,而非進行數值計算。
? ? ? ? 接下來我們看看Redis的有序整數集的保存結構。
基礎結構
typedef struct intset {uint32_t encoding;uint32_t length;int8_t contents[];
} intset; 
? ? ? ? encoding是表示該結構保存的數據類型。它可以是下列類型值:
#define INTSET_ENC_INT16 (sizeof(int16_t))
#define INTSET_ENC_INT32 (sizeof(int32_t))
#define INTSET_ENC_INT64 (sizeof(int64_t)) 
? ? ? ? 至于結構中encoding最終設置為何種類型,則要視其存儲的最大數據的類型來決定。比如該結構最初始的內容只有0x08,則類型是INTSET_ENC_INT16。而如果加入了0x12345678,則類型將變成INTSET_ENC_INT32。具體的判斷方法是看數值的范圍:
static uint8_t _intsetValueEncoding(int64_t v) {if (v < INT32_MIN || v > INT32_MAX)return INTSET_ENC_INT64;else if (v < INT16_MIN || v > INT16_MAX)return INTSET_ENC_INT32;elsereturn INTSET_ENC_INT16;
} 
? ? ? 不要被上面的區間名給欺騙了,它們的定義如下。
#define INT8_MIN     ((int8_t)_I8_MIN)
#define INT8_MAX     _I8_MAX
#define INT16_MIN    ((int16_t)_I16_MIN)
#define INT16_MAX    _I16_MAX
#define INT32_MIN    ((int32_t)_I32_MIN)
#define INT32_MAX    _I32_MAX 
? ? ? ??_IXX_MIN都是負數,其最高位為1,這樣使用無符號類型強轉之后便是更高區間的最大值。? ? ? ??
? ? ? ? intset結構中length字段表示contents數組元素的個數;contents則是整型數數組的首地址,但是不要被它類型int8_t欺騙了,它實際存儲的類型可能是int32_t或者int64_t。
創建集合
? ? ? ? 集合創建通過下面方法實現
intset *intsetNew(void) {intset *is = zmalloc(sizeof(intset));is->encoding = intrev32ifbe(INTSET_ENC_INT16);is->length = 0;return is;
} 
? ? ? ? 可見集合結構是在堆上分配的,初始類型是INTSET_ENC_INT16,元素個數是0。此時contents指針還是無效的,這說明該結構可能沒有采用預分配空間的設計,而是實時分配。之后的代碼也印證了這點。
重分配集合空間
? ? ? ? 因為inset結構是個可變長度結構,其可變部分就是contents數組的長度,所以重分配集合空間主要是根據集合保存的數據類型和數組元素個數重新分配空間。
static intset *intsetResize(intset *is, uint32_t len) {uint32_t size = len*intrev32ifbe(is->encoding);is = zrealloc(is,sizeof(intset)+size);return is;
} 
獲取集合長度
? ? ? ? 即返回intset的length字段,它表示其保存的數字個數
/* Return intset length */
uint32_t intsetLen(intset *is) {return intrev32ifbe(is->length);
} 
獲取集合占用空間大小
? ? ? ? 集合結構的設計說明其是一個可變長結構,所以計算空間大小要把結構的頭大小和可變長度數組長度相加
/* Return intset blob size in bytes. */
size_t intsetBlobLen(intset *is) {return sizeof(intset)+intrev32ifbe(is->length)*intrev32ifbe(is->encoding);
} 
通過位置設置值
? ? ? ? 因為contents保存的數值長度要視intset的encoding類型決定,所以通過位置定位元素時,需要將contents強轉為相應類型的指針。這樣通過加法操作,可以讓指針步進的長度為元素類型的長度
static void _intsetSet(intset *is, int pos, int64_t value) {uint32_t encoding = intrev32ifbe(is->encoding);if (encoding == INTSET_ENC_INT64) {((int64_t*)is->contents)[pos] = value;memrev64ifbe(((int64_t*)is->contents)+pos);} else if (encoding == INTSET_ENC_INT32) {((int32_t*)is->contents)[pos] = value;memrev32ifbe(((int32_t*)is->contents)+pos);} else {((int16_t*)is->contents)[pos] = value;memrev16ifbe(((int16_t*)is->contents)+pos);}
} 
通過位置獲取值
? ? ? ? 獲取值時同樣要根據intset保存的數據類型決定對contents進行加法操作時步進的長度
/* Return the value at pos, given an encoding. */
static int64_t _intsetGetEncoded(intset *is, int pos, uint8_t enc) {int64_t v64;int32_t v32;int16_t v16;if (enc == INTSET_ENC_INT64) {memcpy(&v64,((int64_t*)is->contents)+pos,sizeof(v64));memrev64ifbe(&v64);return v64;} else if (enc == INTSET_ENC_INT32) {memcpy(&v32,((int32_t*)is->contents)+pos,sizeof(v32));memrev32ifbe(&v32);return v32;} else {memcpy(&v16,((int16_t*)is->contents)+pos,sizeof(v16));memrev16ifbe(&v16);return v16;}
}/* Return the value at pos, using the configured encoding. */
static int64_t _intsetGet(intset *is, int pos) {return _intsetGetEncoded(is,pos,intrev32ifbe(is->encoding));
} 
查找元素
? ? ? ? 查找元素時,先看待查找的元素數值是否在該集合可以表達的數值空間之內。如果不在則直接認為找不到元素,這樣可以免去查找操作
static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {int min = 0, max = intrev32ifbe(is->length)-1, mid = -1;int64_t cur = -1;/* The value can never be found when the set is empty */if (intrev32ifbe(is->length) == 0) {if (pos) *pos = 0;return 0;}  
? ? ? ? 由于intset保存的是有序數字,且數字從小到大排列。這樣如果元素數值比第一個元素小,或者比最后一個元素大,則說明待查元素也不在數組中。
      else {/* Check for the case where we know we cannot find the value,* but do know the insert position. */if (value > _intsetGet(is,intrev32ifbe(is->length)-1)) {if (pos) *pos = intrev32ifbe(is->length);return 0;} else if (value < _intsetGet(is,0)) {if (pos) *pos = 0;return 0;}} 
? ? ? ? 其他情況則說明待查元素,這個時候就采用二分查找的方式進行
    while(max >= min) {mid = ((unsigned int)min + (unsigned int)max) >> 1;cur = _intsetGet(is,mid);if (value > cur) {min = mid+1;} else if (value < cur) {max = mid-1;} else {break;}}if (value == cur) {if (pos) *pos = mid;return 1;} else {if (pos) *pos = min;return 0;}
} 
檢測元素是否在集合中
? ? ? ? 檢測操作非常簡單,只是簡單的調用intsetSearch方法
/* Determine whether a value belongs to this set */
uint8_t intsetFind(intset *is, int64_t value) {uint8_t valenc = _intsetValueEncoding(value);return valenc <= intrev32ifbe(is->encoding) && intsetSearch(is,value,NULL);
} 
新增一個更大數值類型元素
? ? ? ? 如果一開始時,集合中保存的元素只有0x01,那么集合的類型是INTSET_ENC_INT16。contents數組的長度也是INTSET_ENC_INT16的長度。現在要往集合中新增一個元素0x12345678,這個時候INTSET_ENC_INT16類型長度的空間已經不能保存該數據了。于是需要對整個結構進行升級
static intset *intsetUpgradeAndAdd(intset *is, int64_t value) {uint8_t curenc = intrev32ifbe(is->encoding);uint8_t newenc = _intsetValueEncoding(value);int length = intrev32ifbe(is->length); 
? ? ? ? 先要獲取以前集合類型和長度,還要計算新集合為何種類型
    int prepend = value < 0 ? 1 : 0; 
? ? ? ? 然后檢查新增的值是否為負數。因為該數值的絕對值比之前數組中所有元素都要大,所以如果該數如果是負數,則它比之前任何元素都小,這樣它就要插在頭部。相反,如果它是正數,則可能是插在尾部。
    /* First set new encoding and resize */is->encoding = intrev32ifbe(newenc);is = intsetResize(is,intrev32ifbe(is->length)+1);/* Upgrade back-to-front so we don't overwrite values.* Note that the "prepend" variable is used to make sure we have an empty* space at either the beginning or the end of the intset. */while(length--)_intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc)); 
? ? ? ? 使用新的類型更新集合encoding字段,再通過intsetResize重新分配集合的內存。因為當前內存數據分布和之前的一致(除了變長了),所以還要通過之后的while循環將之前的值轉移到其現在應該在的位置。
    /* Set the value at the beginning or the end. */if (prepend)_intsetSet(is,0,value);else_intsetSet(is,intrev32ifbe(is->length),value);is->length = intrev32ifbe(intrev32ifbe(is->length)+1);return is;
} 
? ? ? ? 最后視新增數據的正負情況插入到新結構的不同位置。
數組尾部空間平移
? ? ? ? 這步操作在要往數組中間插入或者刪除元素時發生。如果插入元素,則需要將插入位置的元素及之后的元素一起向后平移。如果刪除元素,則要將被刪除元素之后的元素向前平移
static void intsetMoveTail(intset *is, uint32_t from, uint32_t to) {void *src, *dst;uint32_t bytes = intrev32ifbe(is->length)-from;uint32_t encoding = intrev32ifbe(is->encoding);if (encoding == INTSET_ENC_INT64) {src = (int64_t*)is->contents+from;dst = (int64_t*)is->contents+to;bytes *= sizeof(int64_t);} else if (encoding == INTSET_ENC_INT32) {src = (int32_t*)is->contents+from;dst = (int32_t*)is->contents+to;bytes *= sizeof(int32_t);} else {src = (int16_t*)is->contents+from;dst = (int16_t*)is->contents+to;bytes *= sizeof(int16_t);}memmove(dst,src,bytes);
} 
增加元素
? ? ? ? 增加元素時,要先判斷待添加的元素是否比現在的集合類型大。如果是,則要重新分配和更新整個集合內存空間
intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {uint8_t valenc = _intsetValueEncoding(value);uint32_t pos;if (success) *success = 1;/* Upgrade encoding if necessary. If we need to upgrade, we know that* this value should be either appended (if > 0) or prepended (if < 0),* because it lies outside the range of existing values. */if (valenc > intrev32ifbe(is->encoding)) {/* This always succeeds, so we don't need to curry *success. */return intsetUpgradeAndAdd(is,value);} 
? ? ? ? 如果之前集合的類型可以承載待添加的元素,則先去檢查元素是否已經在數組中。如果已經存在,則不再進行添加操作,直接認為操作成功
      else {/* Abort if the value is already present in the set.* This call will populate "pos" with the right position to insert* the value when it cannot be found. */if (intsetSearch(is,value,&pos)) {if (success) *success = 0;return is;} 
? ? ? ? 如果不在數組中,則上面的intsetSearch方法將計算出待添加的數據需要被插入到數組中的的位置。這個時候就需要重新分配集合長度,并將要插入的位置及之后的數據向后平移,并把待添加數據設置到數組的相應位置。
        is = intsetResize(is,intrev32ifbe(is->length)+1);if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);}_intsetSet(is,pos,value);is->length = intrev32ifbe(intrev32ifbe(is->length)+1);return is;
} 
刪除元素
? ? ? ? 刪除元素比較簡單,只要通過intsetSearch找到元素的位置,將該位置之后的元素向前平移就行了
/* Delete integer from intset */
intset *intsetRemove(intset *is, int64_t value, int *success) {uint8_t valenc = _intsetValueEncoding(value);uint32_t pos;if (success) *success = 0;if (valenc <= intrev32ifbe(is->encoding) && intsetSearch(is,value,&pos)) {uint32_t len = intrev32ifbe(is->length);/* We know we can delete */if (success) *success = 1;/* Overwrite value with tail and update length */if (pos < (len-1)) intsetMoveTail(is,pos+1,pos);is = intsetResize(is,len-1);is->length = intrev32ifbe(len-1);}return is;
} 
? ? ? ? 通過上面的函數,我們發現Redis的整數集在增刪改元素時要自動調整元素排序。在新增絕對值超過當前集合可以表達的數據時,升級當前集合。但是如果刪除元素時,即使現存的數字都比當前集合表達的區間的最小值還要小,也不會發生降級的操作。
總結
以上是生活随笔為你收集整理的Redis源码解析——有序整数集的全部內容,希望文章能夠幫你解決所遇到的問題。
                            
                        - 上一篇: Redis源码解析——双向链表
 - 下一篇: Redis源码解析——Zipmap