深入浅出理解reedsolomon库数据冗余算法原理和具体实现源码分析
reedsolomon算法實現需要矩陣和伽羅華域(Galois Field)的一些知識,這兩者在前面都已做了介紹,并且這部分網上很多人都做了詳細說明,這里再挑重點的部分能使用到的地方介紹一下。
1、伽羅華域運算:
伽羅瓦域,該域本質上是一組受限的數字(也稱有限域),GF(2^n) 表示含有2^n個元素的有限域,這里n取8,即GF(256).有限域中的每一個元素都對應一個多項式。
(1)、加減法:
將兩個多項式中相同階數的項系數相加或相減。例如 (x^2 + x ) + (x + 1) = x^2 + 2x +1
對于GF(2^w)上的多項式計算,多項式系數只能取 0或1。(如果是GF(3^w),那么系數可以取 0、1、 2)
GF(2^w)的多項式加法中,合并階數相同的同類項時,由于0+0=0,1+1=0,0+1=1+0=1,因此系數不是進行加法操作,而是進行異或操作。
GF(2^w)的多項式減法等于加法,例如x ^4 – x^4就等于x^4 + x^4。
(2)、乘除法:
乘除法是使用多項式進行,只不過多項式乘了之后需要mod P(x),P(x)是本源多項式,GF(2^w)上的本源多項式一般取:x^8+x^4 + x^3 +x^2 +1。Reedsolomon里面是使用查表法來進行乘除運算。
(3)、正向表和逆向表:
生成元是域上的一類特殊元素,生成元的冪可以遍歷域上的所有元素。假設g是域GF(2^w)上生成元,那么集合 {g0,g1 , ……,g(2^w-1) } 包含了域GF(2^w)上所有非零元素。在域GF(2^w)中2總是生成元。
將生成元應用到多項式中, GF(2^w)中的所有多項式都是可以通過多項式生成元g通過冪求得。即域中的任意元素a,都可以表示為a = g^k。
GF(2^w)是一個有限域,就是元素個數是有限的,但指數k是可以無窮的。所以必然存在循環。這個循環的周期是2^w-1(g不能生成多項式 0)。所以當k大于等于2^w-1時,g^k=g^(k%(2^w-1))。
例如2^k = a,有正過程和逆過程。知道指數k求a是正過程,知道值a求指數k是逆過程:
對于乘法,假設a=g^n,b=g^m。那么a*b = g^n* g^m = g^(n+m)。查表的方法就是根據a和b,分別查表得到n和m,然后查表g^(n+m)即可。
因此需要構造正表和反表,在GF(2^w)域上分別記為gflog和gfilog。gflog是將二進制形式映射為多項式形式,gfilog是將多項式形式映射為二進制形式。
注意:多項式0 ,是無法用生成元生成的。g^0等于多項式1,而不是 0。
對于2^8 列出部分正向表和逆向表:
| i | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | 16 | 17 | 18 | 19 |
| gflog[i] | \ | 0 | 1 | 25 | 2 | 50 | 26 | 198 | 3 | 223 | 51 | 238 | 27 | 104 | 199 | 75 | 4 | 100 | 224 | 14 |
| gfilog[i] | 1 | 2 | 4 | 8 | 16 | 32 | 64 | 128 | 29 | 58 | 116 | 232 | 205 | 135 | 19 | 38 | 76 | 152 | 45 | 90 |
舉個例子,生成Vandermonde矩陣時需要計算a^n,計算方法是:
(1)、根據a查找gflog表,找出對應的多項式k
(2)、n個多項式加就是多項式k*n
(3)、最后查表gfilog將多項式轉成二進制。
2、范德蒙(Vandermonde)矩陣
在線性代數中,Vandermonde矩陣是一個每行有幾何級數項的矩陣,即m×n矩陣
?也可以表示成:
矩陣的行列式的值可以表示成:
這稱為范德蒙行列式或范德蒙多項式。
范德蒙矩陣是滿秩矩陣,滿秩矩陣是可逆的,從范德蒙矩陣中任意取n行組成n階方陣都可逆。
3、Reedsolomon
?Reedsolomon算法分成兩部分:編碼和解碼。整體結構如下圖:
?????????????????????????????????????????
Reedsolomon提供了三種形式的編碼矩陣和解碼矩陣:Cauchy、PAR1Matrix、vandermonde。默認是使用vandermonde矩陣來完成編解碼工作。因為2^8的限制,數據塊和校驗塊只和不能超過256,如果超過256 就無法提供編解碼服務。默認是4+2,即4個數據塊2個校驗塊。
(1)、首先生成編/解碼矩陣:
func New(dataShards, parityShards int, opts ...Option) (Encoder, error) {r := reedSolomon{DataShards: dataShards,//數據塊個數ParityShards: parityShards, //校驗塊個數Shards: dataShards + parityShards,o: defaultOptions, }for _, opt := range opts {opt(&r.o) } //進行一些參數檢查if dataShards <= 0 || parityShards <= 0 {return nil, ErrInvShardNum}if dataShards+parityShards > 256 {return nil, ErrMaxShardNum}var err errorswitch { case r.o.useCauchy://生成柯西矩陣r.m, err = buildMatrixCauchy(dataShards, r.Shards) case r.o.usePAR1Matrix://生成PAR1矩陣r.m, err = buildMatrixPAR1(dataShards, r.Shards) default://默認的使用范德蒙矩陣r.m, err = buildMatrix(dataShards, r.Shards)}if err != nil {return nil, err} if r.o.shardSize > 0 {//使用cpu計算矩陣的設置cacheSize := cpuid.CPU.Cache.L2if cacheSize <= 0 {// Set to 128K if undetectable.cacheSize = 128 << 10}p := runtime.NumCPU()shards := 1 + parityShardsg := (r.o.shardSize * shards) / (cacheSize - (cacheSize >> 4))g *= 2if g < p {g = p}// Have g be multiple of pg += p - 1g -= g % pr.o.maxGoroutines = g}//逆矩陣緩存在樹中,數據的無效行的索引作為key值。此tree是為了加快解碼,不用每次都重新生成解碼矩陣r.tree = newInversionTree(dataShards, parityShards)//記錄校驗部分的矩陣信息r.parity = make([][]byte, parityShards)for i := range r.parity {r.parity[i] = r.m[dataShards+i]fmt.Println("parity i", i, ":", r.parity[i])}return &r, err }?以vandermonde矩陣為例,生成vandermonde矩陣的函數是buildMatrix:
func buildMatrix(dataShards, totalShards int) (matrix, error) {// 生成標準的范德蒙矩陣vm, err := vandermonde(totalShards, dataShards)if err != nil {return nil, err}fmt.Println("vm :", vm)//從范德蒙標準矩陣中獲取數據塊對應的n階方陣top, err := vm.SubMatrix(0, 0, dataShards, dataShards)if err != nil {return nil, err} fmt.Println("top :", top) //取數據塊對應的n階方陣的逆矩陣topInv, err := top.Invert()if err != nil {return nil, err} fmt.Println("topInv :", topInv) //vm是個標準的范德蒙矩陣,并不是我們想要的編碼矩陣,使用范德蒙原始矩陣和上面解解出來的逆矩陣相乘,可以得到上部分是單位矩陣下部分是校驗矩陣的編碼矩陣。return vm.Multiply(topInv) }范德蒙標準矩陣的生成使用了我們前面說的a^n計算方法,該方法是在伽羅華域內冪運算。
func vandermonde(rows, cols int) (matrix, error) {// rows是數據塊和校驗塊的和,cols是數據塊個數,此處生成一個空矩陣result, err := newMatrix(rows, cols)if err != nil {return nil, err}for r, row := range result {for c := range row {//將次矩陣填充成范德蒙標準形式,使用了冪運算result[r][c] = galExp(byte(r), c)}}return result, nil }galExp使用了上面提到的正向表和逆向表,根據查表在二進制和多項式之間切換。
到此,已經生成了一個范德蒙矩陣:
vm : [[1, 0, 0, 0],
[1, 1, 1, 1],
?[1, 2, 4, 8],
?[1, 3, 5, 15],
?[1, 4, 16, 64],
?[1, 5, 17, 85]]
但是還不是我們想要的單位矩陣的形式,對[1, 0, 0, 0], [1, 1, 1, 1], [1, 2, 4, 8], [1, 3, 5, 15]求解對應的逆矩陣,逆變換使用了增廣矩陣的初等行變換方法,主要的思想是在該矩陣后面補一個單位矩陣,然后進行初等變換,將單位矩陣變道左側,右側的就是對應的逆矩陣:
?????????????????????????????????????????????????????????????????
gaussianElimination函數是執行初等變換的主要函數,其中用到了除法運算。
func (m matrix) gaussianElimination() error {rows := len(m)columns := len(m[0])for r := 0; r < rows; r++ {if m[r][r] == 0 {for rowBelow := r + 1; rowBelow < rows; rowBelow++ {if m[rowBelow][r] != 0 {m.SwapRows(r, rowBelow)break}}}if m[r][r] == 0 {return errSingular}if m[r][r] != 1 {scale := galDivide(1, m[r][r])for c := 0; c < columns; c++ {m[r][c] = galMultiply(m[r][c], scale)}}for rowBelow := r + 1; rowBelow < rows; rowBelow++ {if m[rowBelow][r] != 0 {scale := m[rowBelow][r]for c := 0; c < columns; c++ {m[rowBelow][c] ^= galMultiply(scale, m[r][c])}}}}for d := 0; d < rows; d++ {for rowAbove := 0; rowAbove < d; rowAbove++ {if m[rowAbove][d] != 0 {scale := m[rowAbove][d]for c := 0; c < columns; c++ {m[rowAbove][c] ^= galMultiply(scale, m[d][c])}}}}return nil }逆矩陣求解出來時這個樣子:
[[1, 0, 0, 0],
[123, 1, 142, 244],
[0, 122, 244, 142],
[122, 122, 122, 122]]
用原始的范德蒙矩陣和上面求解出來的逆矩陣,相乘就得到了我們希望的編/解碼矩陣。這里需要注意,矩陣相乘使用的是查表法,因為一共就256個數相乘,總共有256*256種可能,把這256*256種場景全部提前計算出來做成一個矩陣形式,使用時將需要相乘和兩個數分別作為行列號,直接帶入表中即可查詢到這兩個數相乘的結果。
?
func (m matrix) Multiply(right matrix) (matrix, error) {if len(m[0]) != len(right) {return nil, fmt.Errorf("columns on left (%d) is different than rows on right (%d)", len(m[0]), len(right))}result, _ := newMatrix(len(m), len(right[0]))for r, row := range result {for c := range row {var value bytefor i := range m[0] {value ^= galMultiply(m[r][i], right[i][c])}result[r][c] = value}}fmt.Println("Multiply :", result)return result, nil }?
最終形態:
[ [1, 0, 0, 0],
?[0, 1, 0, 0],
[0, 0, 1, 0],
[0, 0, 0, 1],
?[27, 28, 18, 20],
[28, 27, 20, 18]]
(2)、編碼
編碼過程是根據數據塊和編碼矩陣計算出校驗塊。因為數據塊長度是單位是字節,因此是按照字節將校驗塊的數據計算出來,根據矩陣乘法我們知道需要每個字節和校驗數據相乘,然后在做加法,加法在伽羅華域中是異或運算,知道這點就很容易看看懂下面函數了.
func (r reedSolomon) codeSomeShards(matrixRows, inputs, outputs [][]byte, outputCount, byteCount int) {if r.o.useAVX512 && len(inputs) >= 4 && len(outputs) >= 2 {r.codeSomeShardsAvx512(matrixRows, inputs, outputs, outputCount, byteCount)return} else if r.o.maxGoroutines > 1 && byteCount > r.o.minSplitSize {r.codeSomeShardsP(matrixRows, inputs, outputs, outputCount, byteCount)return}for c := 0; c < r.DataShards; c++ {in := inputs[c]for iRow := 0; iRow < outputCount; iRow++ {if c == 0 {//第一次計算校驗數據,根據計算出來的值直接賦值到outputsgalMulSlice(matrixRows[iRow][c], in, outputs[iRow], &r.o)} else { //不是第一次計算校驗數據,根據計算出來的值異或原有的值galMulSliceXor(matrixRows[iRow][c], in, outputs[iRow], &r.o)}}} } //galMulSlice、galMulSliceXor是對每個字節進行校驗數據計算,同樣是使用了查表法來做乘法 func galMulSlice(c byte, in, out []byte, o *options) {var done intif o.useAVX2 {galMulAVX2(mulTableLow[c][:], mulTableHigh[c][:], in, out)done = (len(in) >> 5) << 5} else if o.useSSSE3 {galMulSSSE3(mulTableLow[c][:], mulTableHigh[c][:], in, out)done = (len(in) >> 4) << 4}remain := len(in) - doneif remain > 0 {mt := mulTable[c][:256]for i := done; i < len(in); i++ {out[i] = mt[in[i]]}} }func galMulSliceXor(c byte, in, out []byte, o *options) {var done intif o.useAVX2 {galMulAVX2Xor(mulTableLow[c][:], mulTableHigh[c][:], in, out)done = (len(in) >> 5) << 5} else if o.useSSSE3 {galMulSSSE3Xor(mulTableLow[c][:], mulTableHigh[c][:], in, out)done = (len(in) >> 4) << 4}remain := len(in) - doneif remain > 0 {mt := mulTable[c][:256]for i := done; i < len(in); i++ {out[i] ^= mt[in[i]]}} }編碼程比較簡單,主要就是查表計算乘法計算出校驗數據塊。
(3)、解碼(數據重建)
解碼分成重建原始數據塊和重建校驗塊。如果是重建數據塊,需要在編/解碼將損壞的數據塊對應的行去掉然后求逆矩陣,根據逆矩陣和完整的數據塊計算出元數據數據塊。如果是重建校驗塊,直接根據上面說的編碼,重新計算出校驗塊。如果數據塊和校驗塊都需要重建,先重建數據塊,然后在重建校驗塊。
func (r reedSolomon) reconstruct(shards [][]byte, dataOnly bool) error {if len(shards) != r.Shards {return ErrTooFewShards}// 校驗,檢查是否有塊需要重建err := checkShards(shards, true)if err != nil {return err}//檢查部分略過shardSize := shardSize(shards)numberPresent := 0for i := 0; i < r.Shards; i++ {if len(shards[i]) != 0 {numberPresent++}}if numberPresent == r.Shards {return nil}// More complete sanity checkif numberPresent < r.DataShards {return ErrTooFewShards } //這里將損壞的塊標記,subShards := make([][]byte, r.DataShards)validIndices := make([]int, r.DataShards)invalidIndices := make([]int, 0)subMatrixRow := 0for matrixRow := 0; matrixRow < r.Shards && subMatrixRow < r.DataShards; matrixRow++ {if len(shards[matrixRow]) != 0 {subShards[subMatrixRow] = shards[matrixRow]validIndices[subMatrixRow] = matrixRowsubMatrixRow++} else {//損壞的塊invalidIndices = append(invalidIndices, matrixRow)} } //如果tree中該損壞的塊已有對應的解碼矩陣直接取來用即可dataDecodeMatrix := r.tree.GetInvertedMatrix(invalidIndices)if dataDecodeMatrix == nil {//若解碼矩陣不存在,新建,獲取完好行對應的矩陣subMatrix, _ := newMatrix(r.DataShards, r.DataShards)for subMatrixRow, validIndex := range validIndices {for c := 0; c < r.DataShards; c++ {subMatrix[subMatrixRow][c] = r.m[validIndex][c]}}//數據完好行矩陣求逆dataDecodeMatrix, err = subMatrix.Invert()if err != nil {return err}fmt.Println("dataDecodeMatrix:", dataDecodeMatrix)//更新進tree中err = r.tree.InsertInvertedMatrix(invalidIndices, dataDecodeMatrix, r.Shards)if err != nil {return err}}outputs := make([][]byte, r.ParityShards)matrixRows := make([][]byte, r.ParityShards)outputCount := 0//取逆矩陣的前N階方陣,此方陣就是數據重建的解碼矩陣for iShard := 0; iShard < r.DataShards; iShard++ {if len(shards[iShard]) == 0 {if cap(shards[iShard]) >= shardSize {shards[iShard] = shards[iShard][0:shardSize]} else {shards[iShard] = make([]byte, shardSize)}outputs[outputCount] = shards[iShard]matrixRows[outputCount] = dataDecodeMatrix[iShard]outputCount++} } //數據重建,這個函數在編碼中已講r.codeSomeShards(matrixRows, subShards, outputs[:outputCount], outputCount, shardSize)if dataOnly {// Exit out early if we are only interested in the data shardsfmt.Println("only interested in the data shards")return nil } //如果還要重建校驗數據,就把校驗數據計算出來,和編碼部分一樣。outputCount = 0for iShard := r.DataShards; iShard < r.Shards; iShard++ {if len(shards[iShard]) == 0 {if cap(shards[iShard]) >= shardSize {shards[iShard] = shards[iShard][0:shardSize]} else {shards[iShard] = make([]byte, shardSize)}outputs[outputCount] = shards[iShard]matrixRows[outputCount] = r.parity[iShard-r.DataShards]outputCount++}}r.codeSomeShards(matrixRows, shards[:r.DataShards], outputs[:outputCount], outputCount, shardSize)return nil }?
總結
以上是生活随笔為你收集整理的深入浅出理解reedsolomon库数据冗余算法原理和具体实现源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 国税总局发票查验平台验证码识别方案,识别
- 下一篇: 计算机对水利方面的影响,计算机技术对于水