kylin KV+cube方案分析
2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>
前言
??在使用Kylin的時(shí)候,最重要的一步就是創(chuàng)建cube的模型定義,即指定度量和維度以及一些附加信息,然后對(duì)cube進(jìn)行build,當(dāng)然我們也可以根據(jù)原始表中的某一個(gè)string字段(這個(gè)字段的格式必須是日期格式,表示日期的含義)設(shè)定分區(qū)字段,這樣一個(gè)cube就可以進(jìn)行多次build,每一次的build會(huì)生成一個(gè)segment,每一個(gè)segment對(duì)應(yīng)著一個(gè)時(shí)間區(qū)間的cube,這些segment的時(shí)間區(qū)間是連續(xù)并且不重合的,對(duì)于擁有多個(gè)segment的cube可以執(zhí)行merge,相當(dāng)于將一個(gè)時(shí)間區(qū)間內(nèi)部的segment合并成一個(gè)。下面從源碼開(kāi)始分析cube的build和merge過(guò)程。本文基于Kylin-1.0-incubating版本,對(duì)于Kylin的介紹可以參見(jiàn):http://blog.csdn.net/yu616568/article/details/48103415
入口介紹
??在kylin的web頁(yè)面上創(chuàng)建完成一個(gè)cube之后可以點(diǎn)擊action下拉框執(zhí)行build或者merge操作,這兩個(gè)操作都會(huì)調(diào)用cube的rebuild接口,調(diào)用的參數(shù)包括:1、cube名,用于唯一標(biāo)識(shí)一個(gè)cube,在當(dāng)前的kylin版本中cube名是全局唯一的,而不是每一個(gè)project下唯一的;2、本次構(gòu)建的startTime和endTime,這兩個(gè)時(shí)間區(qū)間標(biāo)識(shí)本次構(gòu)建的segment的數(shù)據(jù)源只選擇這個(gè)時(shí)間范圍內(nèi)的數(shù)據(jù);對(duì)于BUILD操作而言,startTime是不需要的,因?yàn)樗偸菚?huì)選擇最后一個(gè)segment的結(jié)束時(shí)間作為當(dāng)前segment的起始時(shí)間。3、buildType標(biāo)識(shí)著操作的類型,可以是”BUILD”、”MERGE”和”REFRESH”。?
??這些操作的統(tǒng)一入口就是JobService.submitJob函數(shù),該函數(shù)首先取出該cube所有關(guān)聯(lián)的構(gòu)建cube的job,并且判斷這些job是否有處于READY、RUNNING、ERROR狀態(tài),如果處于該狀態(tài)意味著這個(gè)job正在執(zhí)行或者可以之后被resume執(zhí)行,做這種限制的原因不得而知(可能是構(gòu)建的區(qū)間是基于時(shí)間吧,需要對(duì)一個(gè)cube并行的構(gòu)建多個(gè)segment(時(shí)間區(qū)間的數(shù)據(jù))的需求并不明顯)。所以如果希望build或者merge cube,必須將未完成的cube的操作執(zhí)行discard操作。然后根據(jù)操作類型執(zhí)行具體的操作:?
1. 如果是BUILD,如果這個(gè)cube中包含distinct count聚合方式的度量并且這個(gè)cube中已經(jīng)存在其他segment,則執(zhí)行appendAndMergeSegments函數(shù),否則執(zhí)行buildJob函數(shù)。?
2. 如果是MERGE操作則執(zhí)行mergeSegments函數(shù)。?
3. 如果是REFRESH,則同樣執(zhí)行buildJob函數(shù)。為這個(gè)時(shí)間區(qū)間的segment重新構(gòu)建。?
??buildJob函數(shù)構(gòu)建一個(gè)新的segment,mergeSegments函數(shù)合并一個(gè)時(shí)間區(qū)間內(nèi)的所有segments,appendAndMergeSegments函數(shù)則首先根據(jù)最后一個(gè)segment的時(shí)間區(qū)間的end值build一個(gè)新的segment然后再將所有的時(shí)間區(qū)間的segments進(jìn)行合并(為什么包含distinct count的聚合函數(shù)的cube的構(gòu)建一定要進(jìn)行合并呢?這應(yīng)該是有distinct-count使用的hyperloglog算法決定的,下次可以專門分析一下這個(gè)算法)。
BUILD操作
??Build操作是構(gòu)建一個(gè)cube指定時(shí)間區(qū)間的數(shù)據(jù),由于kylin基于預(yù)計(jì)算的方式提供數(shù)據(jù)查詢,構(gòu)建操作是指將原始數(shù)據(jù)(存儲(chǔ)在Hadoop中,通過(guò)Hive獲取)轉(zhuǎn)換成目標(biāo)數(shù)據(jù)(存儲(chǔ)在Hbase中)的過(guò)程。主要的步驟可以按照順序分為四個(gè)階段:1、根據(jù)用戶的cube信息計(jì)算出多個(gè)cuboid文件,2、根據(jù)cuboid文件生成htable,3、更新cube信息,4、回收臨時(shí)文件。每一個(gè)階段操作的輸入都需要依賴于上一步的輸出,所以這些操作全是順序執(zhí)行的。
1. 計(jì)算cuboid文件
??在kylin的CUBE模型中,每一個(gè)cube是由多個(gè)cuboid組成的,理論上有N個(gè)普通維度的cube可以是由2的N次方個(gè)cuboid組成的,那么我們可以計(jì)算出最底層的cuboid,也就是包含全部維度的cuboid(相當(dāng)于執(zhí)行一個(gè)group by全部維度列的查詢),然后在根據(jù)最底層的cuboid一層一層的向上計(jì)算,直到計(jì)算出最頂層的cuboid(相當(dāng)于執(zhí)行了一個(gè)不帶group by的查詢),其實(shí)這個(gè)階段kylin的執(zhí)行原理就是這個(gè)樣子的,不過(guò)它需要將這些抽象成mapreduce模型,提交mapreduce作業(yè)執(zhí)行。
1.1 生成原始數(shù)據(jù)(Create Intermediate Flat Hive Table)
??這一步的操作是根據(jù)cube的定義生成原始數(shù)據(jù),這里會(huì)新創(chuàng)建一個(gè)hive外部表,然后再根據(jù)cube中定義的星狀模型,查詢出維度(對(duì)于DERIVED類型的維度使用的是外鍵列)和度量的值插入到新創(chuàng)建的表中,這個(gè)表是一個(gè)外部表,表的數(shù)據(jù)文件(存儲(chǔ)在HDFS)作為下一個(gè)子任務(wù)的輸入,它首先根據(jù)維度中的列和度量中作為參數(shù)的列得到需要出現(xiàn)在該表中的列,然后執(zhí)行三步hive操作,這三步hive操作是通過(guò)hive -e的方式執(zhí)行的shell命令。?
??1. drop TABLE IF EXISTS xxx.?
??2. CREATE EXTERNAL TABLE IF NOT EXISTS xxx() ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘\177’ STORED AS SEQUENCEFILE LOCATION xxxx,其中表名是根據(jù)當(dāng)前的cube名和segment的uuid生成的,location是當(dāng)前job的臨時(shí)文件,只有當(dāng)insert插入數(shù)據(jù)的時(shí)候才會(huì)創(chuàng)建,注意這里每一行的分隔符指定的是’\177’(目前是寫(xiě)死的,十進(jìn)制為127).?
??3. 插入數(shù)據(jù),在執(zhí)行之前需要首先設(shè)置一些配置項(xiàng),這些配置項(xiàng)通過(guò)hive的SET命令設(shè)置,是根據(jù)這個(gè)cube的job的配置文件(一般是在kylin的conf目錄下)設(shè)置的,最后執(zhí)行的是INSERT OVERWRITE TABLE xxx SELECT xxxx語(yǔ)句,SELECT子句中選出cube星狀模型中事實(shí)表與維度表按照設(shè)置的方式j(luò)oin之后的出現(xiàn)在維度或者度量參數(shù)中的列(特殊處理derived列),然后再加上用戶設(shè)置的where條件和partition的時(shí)間條件(根據(jù)輸入build的參數(shù)).?
??需要注意的是這里無(wú)論用戶設(shè)置了多少維度和度量,每次join都會(huì)使用事實(shí)表和所有的維度表進(jìn)行join,這可能造成不必要的性能損失(多一個(gè)join會(huì)影響hive性能,畢竟要多讀一些文件)。這一步執(zhí)行完成之后location指定的目錄下就有了原始數(shù)據(jù)的文件,為接下來(lái)的任務(wù)提供了輸入。
1.2 創(chuàng)建事實(shí)表distinct column文件(Extract Fact Table Distinct Columns)
??在這一步是根據(jù)上一步生成的hive表計(jì)算出還表中的每一個(gè)出現(xiàn)在事實(shí)表中的度量的distinct值,并寫(xiě)入到文件中,它是啟動(dòng)一個(gè)MR任務(wù)完成的,MR任務(wù)的輸入是HCatInputFormat,它關(guān)聯(lián)的表就是上一步創(chuàng)建的臨時(shí)表,這個(gè)MR任務(wù)的map階段首先在setup函數(shù)中得到所有度量中出現(xiàn)在事實(shí)表的度量在臨時(shí)表的index,根據(jù)每一個(gè)index得到該列在臨時(shí)表中在每一行的值value,然后將<index, value>作為mapper的輸出,該任務(wù)還啟動(dòng)了一個(gè)combiner,它所做的只是對(duì)同一個(gè)key的值進(jìn)行去重(同一個(gè)mapper的結(jié)果),reducer所做的事情也是進(jìn)行去重(所有mapper的結(jié)果),然后將每一個(gè)index對(duì)應(yīng)的值一行行的寫(xiě)入到以列名命名的文件中。如果某一個(gè)維度列的distinct值比較大,那么可能導(dǎo)致MR任務(wù)執(zhí)行過(guò)程中的OOM。?
??對(duì)于這一步我有一個(gè)疑問(wèn)就是既然所有的原始數(shù)據(jù)都已經(jīng)通過(guò)第一步存入到臨時(shí)hive表中了,我覺(jué)得接下來(lái)就不用再區(qū)分維度表和事實(shí)表了,所有的任務(wù)都基于這個(gè)臨時(shí)表,那么這一步就可以根據(jù)臨時(shí)表計(jì)算出所有的維度列的distinct column值,但是這里僅僅針對(duì)出現(xiàn)在事實(shí)表上的維度,不知道這樣做的原因是什么?難道是因?yàn)樵谙乱徊綍?huì)單獨(dú)計(jì)算維度表的dictionary以及snapshot?
1.3 創(chuàng)建維度詞典(Build Dimension Dictionary)
??這一步是根據(jù)上一步生成的distinct column文件和維度表計(jì)算出所有維度的詞典信息,詞典是為了節(jié)約存儲(chǔ)而設(shè)計(jì)的,用于將一個(gè)成員值編碼成一個(gè)整數(shù)類型并且可以通過(guò)整數(shù)值獲取到原始成員值,每一個(gè)cuboid的成員是一個(gè)key-value形式存儲(chǔ)在hbase中,key是維度成員的組合,但是一般情況下維度是一些字符串之類的值(例如商品名),所以可以通過(guò)將每一個(gè)維度值轉(zhuǎn)換成唯一整數(shù)而減少內(nèi)存占用,在從hbase查找出對(duì)應(yīng)的key之后再根據(jù)詞典獲取真正的成員值。?
?? 這一步是在kylin進(jìn)程內(nèi)的一個(gè)線程中執(zhí)行的,它會(huì)創(chuàng)建所有維度的dictionary,如果是事實(shí)表上的維度則可以從上一步生成的文件中讀取該列的distinct成員值(FileTable),否則則需要從原始的hive表中讀取每一列的信息(HiveTable),根據(jù)不同的源(文件或者h(yuǎn)ive表)獲取所有的列去重之后的成員列表,然后根據(jù)這個(gè)列表生成dictionary,kylin中針對(duì)不同類型的列使用不同的實(shí)現(xiàn)方式,對(duì)于time之類的(date、time、dtaetime和timestamp)使用DateStrDictionary,這里目前還存在著一定的問(wèn)題,因?yàn)檫@種編碼方式會(huì)首先將時(shí)間轉(zhuǎn)換成‘yyyy-MM-dd’的格式,會(huì)導(dǎo)致timestamp之類的精確時(shí)間失去天以后的精度。針對(duì)數(shù)值型的使用NumberDictionary,其余的都使用一般的TrieDictionary(字典樹(shù))。這些dictionary會(huì)作為cube的元數(shù)據(jù)存儲(chǔ)的kylin元數(shù)據(jù)庫(kù)里面,執(zhí)行query的時(shí)候進(jìn)行轉(zhuǎn)換。?
?? 之后還需要計(jì)算維度表的snapshotTable,每一個(gè)snapshot是和一個(gè)hive維度表對(duì)應(yīng)的,生成的過(guò)程是:首先從原始的hive維度表中順序得讀取每一行每一列的值,然后使用TrieDictionary方式對(duì)這些所有的值進(jìn)行編碼,這樣每一行每一列的之都能夠得到一個(gè)編碼之后的id(相同的值id也相同),然后再次讀取原始表中每一行的值,將每一列的值使用編碼之后的id進(jìn)行替換,得到了一個(gè)只有id的新表,這樣同時(shí)保存這個(gè)新表和dictionary對(duì)象(id和值得映射關(guān)系)就能夠保存整個(gè)維度表了,同樣,kylin也會(huì)將這個(gè)數(shù)據(jù)存儲(chǔ)元數(shù)據(jù)庫(kù)中。?
?? 針對(duì)這一步需要注意的問(wèn)題:首先,這一步的兩個(gè)步驟都是在kylin進(jìn)程的一個(gè)線程中執(zhí)行的,第一步會(huì)加載某一個(gè)維度的所有distinct成員到內(nèi)存,如果某一個(gè)維度的cardinality比較大 ,可能會(huì)導(dǎo)致內(nèi)存出現(xiàn)OOM,然后在創(chuàng)建snapshotTable的時(shí)候會(huì)限制原始表的大小不能超過(guò)配置的一個(gè)上限值,如果超過(guò)則會(huì)執(zhí)行失敗。但是應(yīng)該強(qiáng)調(diào)的是這里加載全部的原始維度表更可能出現(xiàn)OOM。另外,比較疑惑的是:1、為什么不在上一步的MR任務(wù)中直接根據(jù)臨時(shí)表中的數(shù)據(jù)生成每一個(gè)distinct column值,而是從原始維度表中讀取?2、計(jì)算全表的dictionary是為了做什么?我目前只了解對(duì)于drived維度是必要保存主鍵和列之間的映射,但是需要保存整個(gè)維度表?!
1.4 計(jì)算生成BaseCuboid文件(Build Base Cuboid Data)
?? 何謂Base cuboid呢?假設(shè)一個(gè)cube包含了四個(gè)維度:A/B/C/D,那么這四個(gè)維度成員間的所有可能的組合就是base cuboid,這就類似在查詢的時(shí)候指定了select count(1) from xxx group by A,B,C,D;這個(gè)查詢結(jié)果的個(gè)數(shù)就是base cuboid集合的成員數(shù)。這一步也是通過(guò)一個(gè)MR任務(wù)完成的,輸入是臨時(shí)表的路徑和分隔符,map對(duì)于每一行首先進(jìn)行split,然后獲取每一個(gè)維度列的值組合作為rowKey,但是rowKey并不是簡(jiǎn)單的這些維度成員的內(nèi)容組合,而是首先將這些內(nèi)容從dictionary中查找出對(duì)應(yīng)的id,然后組合這些id得到rowKey,這樣可以大大縮短hbase的存儲(chǔ)空間,提升查找性能。然后在查找該行中的度量列,根據(jù)cube定義中度量的函數(shù)返回對(duì)該列計(jì)算之后的值。這個(gè)MR任務(wù)還會(huì)執(zhí)行combiner過(guò)程,執(zhí)行邏輯和reducer相同,在reducer中的key是一個(gè)rowKey,value是相同的rowKey的measure組合的數(shù)組,reducer回分解出每一個(gè)measure的值,然后再根據(jù)定義該度量使用的聚合函數(shù)計(jì)算得到這個(gè)rowKey的結(jié)果,其實(shí)這已經(jīng)類似于hbase存儲(chǔ)的格式了。
1.5 計(jì)算第N層cuboid文件(Build N-Dimension Cuboid Data)
??這一個(gè)流程是由多個(gè)步驟的,它是根據(jù)維度組合的cuboid的總數(shù)決定的,上一層cuboid執(zhí)行MR任務(wù)的輸入是下一層cuboid計(jì)算的輸出,由于最底層的cuboid(base)已經(jīng)計(jì)算完成,所以這幾步不需要依賴于任何的hive信息,它的reducer和base cuboid的reducer過(guò)程基本一樣的(相同rowkey的measure執(zhí)行聚合運(yùn)算),mapper的過(guò)程只需要根據(jù)這一行輸入的key(例如A、B、C、D中某四個(gè)成員的組合)獲取可能的下一層的的組合(例如只有A、B、C和B、C、D),那么只需要將這些可能的組合提取出來(lái)作為新的key,value不變進(jìn)行輸出就可以了。?
舉個(gè)例子,假設(shè)一共四個(gè)維度A/B/C/D,他們的成員分別是(A1、A2、A3),(B1、B2)、(C1)、(D1),有一個(gè)measure(對(duì)于這列V,計(jì)算sum(V)),這里忽略dictionary編碼。原始表如下:
| A1 | B1 | C1 | D1 | 2 |
| A1 | B2 | C1 | D1 | 3 |
| A2 | B1 | C1 | D1 | 5 |
| A3 | B1 | C1 | D1 | 6 |
| A3 | B2 | C1 | D1 | 8 |
那么base cuboid最終的輸出如下?
(<A1、B1、C1、D1>、2)?
(<A1、B2、C1、D1>, 3)?
(<A2、B1、C1、D1>, 5)?
(<A3、B1、C1、D1>, 6)?
(<A3、B2、C1、D1>, 8)?
那么它作為下面一個(gè)cuboid的輸入,對(duì)于第一行輸入?
(<A1、B1、C1、D1>, 2),mapper執(zhí)行完成之后會(huì)輸出?
(<A1、B1、C1>, 2)、?
(<A1、B1、D1>, 2)、?
(<A1、C1、D1>, 2)、?
(<B1、C1、D1>, 2)這四項(xiàng),同樣對(duì)于其他的內(nèi)一行也會(huì)輸出四行,最終他們經(jīng)過(guò)reducer的聚合運(yùn)算,得到如下的結(jié)果:?
(<A1、B1、C1>, 2)?
(<A1、B1、D1>, 2)?
(<A1、C1、D1>, 2 + 3)?
(<B1、C1、D1>,2 + 5 +6)?
...?
這樣一次將下一層的結(jié)果作為輸入計(jì)算上一層的cuboid成員,直到最頂層的cuboid,這一個(gè)層cuboid只包含一個(gè)成員,不按照任何維度進(jìn)行g(shù)roup by。?
??上面的這些步驟用于生成cuboid,假設(shè)有N個(gè)維度(對(duì)于特殊類型的),那么就需要有N +1層cuboid,每一層cuboid可能是由多個(gè)維度的組合,但是它包含的維度個(gè)數(shù)相同。
2 準(zhǔn)備輸出
??在上面幾步中,我們已經(jīng)將每一層的cuboid計(jì)算完成,每一層的cuboid文件都是一些cuboid的集合,每一層的cuboid的key包含相同的維度個(gè)數(shù),下面一步就是將這些cuboid文件導(dǎo)入到hbase中。
2.1 計(jì)算分組
??這一步的輸入是之前計(jì)算的全部的cuboid文件,按照cuboid文件的順序(層次的順序)一次讀取每一個(gè)key-value,再按照key-value的形式統(tǒng)計(jì)每一個(gè)key和value占用的空間大小,然后以GB為單位,mapper階段的輸出是每當(dāng)統(tǒng)計(jì)到1GB的數(shù)據(jù),將當(dāng)前的這個(gè)key和當(dāng)前數(shù)據(jù)量總和輸出,在reducer階段根據(jù)用戶創(chuàng)建cube時(shí)指定的cube大小(SMALL,MEDIUM和LARGE)和總的大小計(jì)算出實(shí)際需要?jiǎng)澐譃槎嗌俜謪^(qū),這時(shí)還需要參考最多分區(qū)數(shù)和最少分區(qū)數(shù)進(jìn)行計(jì)算,再根據(jù)實(shí)際數(shù)據(jù)量大小和分區(qū)數(shù)計(jì)算出每一個(gè)分區(qū)的邊界key,將這個(gè)key和對(duì)應(yīng)的分區(qū)編號(hào)輸出到最終文件中,為下一步創(chuàng)建htable做準(zhǔn)備。
2.2 創(chuàng)建HTable
??這一步非常簡(jiǎn)單,根據(jù)上一步計(jì)算出的rowKey分布情況(split數(shù)組)創(chuàng)建HTable,創(chuàng)建一個(gè)HTable的時(shí)候還需要考慮一下幾個(gè)事情:1、列組的設(shè)置,2、每一個(gè)列組的壓縮方式,3、部署coprocessor,4、HTable中每一個(gè)region的大小。在這一步中,列組的設(shè)置是根據(jù)用戶創(chuàng)建cube時(shí)候設(shè)置的,在hbase中存儲(chǔ)的數(shù)據(jù)key是維度成員的組合,value是對(duì)應(yīng)聚合函數(shù)的結(jié)果,列組針對(duì)的是value的,一般情況下在創(chuàng)建cube的時(shí)候只會(huì)設(shè)置一個(gè)列組,該列包含所有的聚合函數(shù)的結(jié)果;在創(chuàng)建HTable時(shí)默認(rèn)使用LZO壓縮,如果不支持LZO則不進(jìn)行壓縮,在后面kylin的版本中支持更多的壓縮方式;kylin強(qiáng)依賴于hbase的coprocessor,所以需要在創(chuàng)建HTable為該表部署coprocessor,這個(gè)文件會(huì)首先上傳到HBase所在的HDFS上,然后在表的元信息中關(guān)聯(lián),這一步很容易出現(xiàn)錯(cuò)誤,例如coprocessor找不到了就會(huì)導(dǎo)致整個(gè)regionServer無(wú)法啟動(dòng),所以需要特別小心;region的劃分已經(jīng)在上一步確定了,所以這里不存在動(dòng)態(tài)擴(kuò)展的情況,所以kylin創(chuàng)建HTable使用的接口如下:?
public void createTable( final HTableDescriptor desc , byte [][] splitKeys)
2.3 構(gòu)建hfile文件
??創(chuàng)建完了HTable之后一般會(huì)通過(guò)插入接口將數(shù)據(jù)插入到表中,但是由于cuboid中的數(shù)據(jù)量巨大,頻繁的插入會(huì)對(duì)Hbase的性能有非常大的影響,所以kylin采取了首先將cuboid文件轉(zhuǎn)換成HTable格式的Hfile文件,然后在通過(guò)bulkLoad的方式將文件和HTable進(jìn)行關(guān)聯(lián),這樣可以大大降低Hbase的負(fù)載,這個(gè)過(guò)程通過(guò)一個(gè)MR任務(wù)完成。?
??這個(gè)任務(wù)的輸入是所有的cuboid文件,在mapper階段根據(jù)每一個(gè)cuboid成員的key-value輸出,如果cube定義時(shí)指定了多個(gè)列組,那么同一個(gè)key要按照不同列組中的值分別輸出,例如在cuboid文件中存在一行cuboid=1,key=1,value=sum(cost),count(1)的數(shù)據(jù),而cube中將這兩個(gè)度量劃分到兩個(gè)列組中,這時(shí)候?qū)τ谶@一行數(shù)據(jù),mapper的輸出為<1, sum(cost)>和<1,count(1)>。reducer使用的是org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer,它會(huì)按照行排序輸出,如果一行中包含多個(gè)值,那么會(huì)將這些值進(jìn)行排序再輸出。輸出的格式則是根據(jù)HTable的文件格式定義的。
2.4 BulkLoad文件
??這一步將HFile文件load到HTable中,因?yàn)閘oad操作會(huì)將原始的文件刪除(相當(dāng)于remove),在操作之前首先將所有列組的Hfile的權(quán)限都設(shè)置為777,然后再啟動(dòng)LoadIncrementalHFiles任務(wù)執(zhí)行l(wèi)oad操作,它的輸入為文件的路徑和HTable名,這一步完全依賴于HBase的工具。這一步完成之后,數(shù)據(jù)已經(jīng)存儲(chǔ)到HBase中了,key的格式由cuboid編號(hào)+每一個(gè)成員在字典樹(shù)的id組成,value可能保存在多個(gè)列組里,包含在原始數(shù)據(jù)中按照這幾個(gè)成員進(jìn)行GROUP BY計(jì)算出的度量的值。
3 收尾工作
??執(zhí)行完上一步就已經(jīng)完成了從輸入到輸出的計(jì)算過(guò)程,接下來(lái)要做的就是一些kylin內(nèi)部的工作,分別是更新元數(shù)據(jù),更新cube狀態(tài),垃圾數(shù)據(jù)回收。
3.1 更新?tīng)顟B(tài)
??這一步主要是更新cube的狀態(tài),其中需要更新的包括cube是否可用、以及本次構(gòu)建的數(shù)據(jù)統(tǒng)計(jì),包括構(gòu)建完成的時(shí)間,輸入的record數(shù)目,輸入數(shù)據(jù)的大小,保存到Hbase中數(shù)據(jù)的大小等,并將這些信息持久到元數(shù)據(jù)庫(kù)中。
3.2 垃圾文件回收
??這一步是否成功對(duì)正確性不會(huì)有任何影響,因?yàn)榻?jīng)過(guò)上一步之后這個(gè)segment就可以在這個(gè)cube中被查找到了,但是在整個(gè)執(zhí)行過(guò)程中產(chǎn)生了很多的垃圾文件,其中包括:1、臨時(shí)的hive表,2、因?yàn)閔ive表是一個(gè)外部表,存儲(chǔ)該表的文件也需要額外刪除,3、fact distinct 這一步將數(shù)據(jù)寫(xiě)入到HDFS上為建立詞典做準(zhǔn)備,這時(shí)候也可以刪除了,4、rowKey統(tǒng)計(jì)的時(shí)候會(huì)生成一個(gè)文件,此時(shí)可以刪除。5、生成HFile時(shí)文件存儲(chǔ)的路徑和hbase真正存儲(chǔ)的路徑不同,雖然load是一個(gè)remove操作,但是上層的目錄還是存在的,也需要?jiǎng)h除。這一步kylin做的比較簡(jiǎn)單,并沒(méi)有完全刪除所有的臨時(shí)文件,其實(shí)在整個(gè)計(jì)算過(guò)程中,真正還需要保留的數(shù)據(jù)只有多個(gè)cuboid文件(需要增量build的cube),這個(gè)因?yàn)樵诓煌瑂egment進(jìn)行merge的時(shí)候是基于cuboid文件的,而不是根據(jù)HTable的。
??在Kylin-1.x版本中,整個(gè)cube的一個(gè)build的過(guò)程大概就是這樣,這樣的一個(gè)build只不過(guò)是生成一虐segment,而當(dāng)一個(gè)cube中存在多個(gè)segment時(shí)可能需要將它們進(jìn)行merge,merge的過(guò)程和build的流程大致是相同的,不過(guò)它不需要從頭開(kāi)始,只需要對(duì)字典進(jìn)行merge,然后在對(duì)cuboid文件進(jìn)行merge,最后生成一個(gè)新的HTable。?
但是在Kylin-2.x版本中,整個(gè)家溝發(fā)生了很大的變化,build的引擎也分成了多套,分別是原始的MR引擎,基于Fast Cubing的MR引擎和Spark引擎,這使得build進(jìn)行的更迅速,大大降低等待時(shí)間,后面會(huì)持續(xù)的再對(duì)新的引擎進(jìn)行分析。
Kylin Cube Build的接口說(shuō)明
每一個(gè)Cube需要設(shè)置數(shù)據(jù)源、計(jì)算引擎和存儲(chǔ)引擎,工廠類負(fù)責(zé)創(chuàng)建數(shù)據(jù)源對(duì)象、計(jì)算引擎對(duì)象和存儲(chǔ)引擎對(duì)象
三者之間通過(guò)適配器進(jìn)行串聯(lián)
數(shù)據(jù)源接口(ISource)
public interface ISource extends Closeable {
? ? // 同步數(shù)據(jù)源中表的元數(shù)據(jù)信息
? ? ISourceMetadataExplorer getSourceMetadataExplorer();
? ? // 適配制定的構(gòu)建引擎接口
? ? <I> I adaptToBuildEngine(Class<I> engineInterface);
? ? // 順序讀取表
? ? IReadableTable createReadableTable(TableDesc tableDesc);
? ? // 構(gòu)建之前豐富數(shù)據(jù)源的Partition
? ? SourcePartition enrichSourcePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
存儲(chǔ)引擎接口(IStorage)
public interface IStorage {
? ? // 創(chuàng)建一個(gè)查詢指定Cube的對(duì)象
? ? public IStorageQuery createQuery(IRealization realization);
? ? public <I> I adaptToBuildEngine(Class<I> engineInterface);
}
1
2
3
4
5
6
7
8
計(jì)算引擎接口(IBatchCubingEngine)
public interface IBatchCubingEngine {
? ? public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment newSegment);
? ? // 返回一個(gè)工作流計(jì)劃, 用以構(gòu)建指定的CubeSegment
? ? public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter);
? ?// 返回一個(gè)工作流計(jì)劃, 用以合并指定的CubeSegment
? ? public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter);
? ?// 返回一個(gè)工作流計(jì)劃, 用以優(yōu)化指定的CubeSegment
? ? public DefaultChainedExecutable createBatchOptimizeJob(CubeSegment optimizeSegment, String submitter);
? ? public Class<?> getSourceInterface();
? ? public Class<?> getStorageInterface();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
離線Cube Build 調(diào)用鏈
Rest API請(qǐng)求/{cubeName}/rebuild, 調(diào)用CubeController.rebuild()方法 -> jobService.submitJob()
Project級(jí)別的權(quán)限校驗(yàn): aclEvaluate.checkProjectOperationPermission(cube);
ISource source = SourceManager.getSource(cube)根據(jù)CubeInstance的方法getSourceType()的返回值決定ISource的對(duì)象類型
public int getSourceType() {
? ? return getModel().getRootFactTable().getTableDesc().getSourceType();
}
1
2
3
分配新的segment: newSeg = getCubeManager().appendSegment(cube, src);
EngineFactory根據(jù)Cube定義的engine type, 創(chuàng)建對(duì)應(yīng)的IBatchCubingEngine對(duì)象 -> 調(diào)用createBatchCubingJob()方法創(chuàng)建作業(yè)鏈,MRBatchCubingEngine2新建的是BatchCubingJobBuilder2
public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
? ? super(newSegment, submitter);
? ? this.inputSide = MRUtil.getBatchCubingInputSide(seg);
? ? this.outputSide = MRUtil.getBatchCubingOutputSide2(seg);
} ? ?
1
2
3
4
5
適配輸入數(shù)據(jù)源到構(gòu)建引擎
SourceManager.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(flatDesc);
public static <T> T createEngineAdapter(ISourceAware table, Class<T> engineInterface) {
? ? return getSource(table).adaptToBuildEngine(engineInterface);
}
// HiveSource返回的是HiveMRInput
public <I> I adaptToBuildEngine(Class<I> engineInterface) {
? ? if (engineInterface == IMRInput.class) {
? ? ? ? return (I) new HiveMRInput();
? ? } else {
? ? ? ? throw new RuntimeException("Cannot adapt to " + engineInterface);
? ? }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
適配存儲(chǔ)引擎到構(gòu)建引擎
StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchCubingOutputSide(seg);
public static <T> T createEngineAdapter(IStorageAware aware, Class<T> engineInterface) {
? ? return storage(aware).adaptToBuildEngine(engineInterface);
}
// HBaseStorage返回的是HBaseMROutput2Transition
public <I> I adaptToBuildEngine(Class<I> engineInterface) {
? ? if (engineInterface == IMROutput2.class) {
? ? ? ? return (I) new HBaseMROutput2Transition();
? ? } else {
? ? ? ? throw new RuntimeException("Cannot adapt to " + engineInterface);
? ? }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
適配成功后, new BatchCubingJobBuilder2(newSegment, submitter).build()該方法創(chuàng)建具體的執(zhí)行步驟, 形成工作流
將工作流添加到執(zhí)行管理器,等待調(diào)度執(zhí)行: getExecutableManager().addJob(job);
---------------------?
本文主要介紹了Apache Kylin是如何將Hive表中的數(shù)據(jù)轉(zhuǎn)化為HBase的KV結(jié)構(gòu),并簡(jiǎn)單介紹了Kylin的SQL查詢是如何轉(zhuǎn)化為HBase的Scan操作。
Apache Kylin 是什么
Apache Kylin是一個(gè)開(kāi)源的、基于Hadoop生態(tài)系統(tǒng)的OLAP引擎(OLAP查詢引擎、OLAP多維分析引擎),能夠通過(guò)SQL接口對(duì)十億、甚至百億行的超大數(shù)據(jù)集實(shí)現(xiàn)秒級(jí)的多維分析查詢。
Apache?Kylin 核心:Kylin OLAP引擎基礎(chǔ)框架,包括元數(shù)據(jù)引擎,查詢引擎,Job(Build)引擎及存儲(chǔ)引擎等,同時(shí)包括REST服務(wù)器以響應(yīng)客戶端請(qǐng)求。
OLAP 是什么
即聯(lián)機(jī)分析處理:以復(fù)雜的分析型查詢?yōu)橹?#xff0c;需要掃描,聚合大量數(shù)據(jù)。
Kylin如何實(shí)現(xiàn)超大數(shù)據(jù)集的秒級(jí)多維分析查詢
預(yù)計(jì)算
對(duì)于超大數(shù)據(jù)集的復(fù)雜查詢,既然現(xiàn)場(chǎng)計(jì)算需要花費(fèi)較長(zhǎng)時(shí)間,那么根據(jù)空間換時(shí)間的原理,我們就可以提前將所有可能的計(jì)算結(jié)果計(jì)算并存儲(chǔ)下來(lái),從而實(shí)現(xiàn)超大數(shù)據(jù)集的秒級(jí)多維分析查詢。
Kylin的預(yù)計(jì)算是如何實(shí)現(xiàn)的
將數(shù)據(jù)源Hive表中的數(shù)據(jù)按照指定的維度和指標(biāo) 由計(jì)算引擎MapReduce離線計(jì)算出所有可能的查詢結(jié)果(即Cube)存儲(chǔ)到HBase中。
Cube 和 Cuboid是什么
簡(jiǎn)單地說(shuō),一個(gè)cube就是一個(gè)Hive表的數(shù)據(jù)按照指定維度與指標(biāo)計(jì)算出的所有組合結(jié)果。
其中每一種維度組合稱為cuboid,一個(gè)cuboid包含一種具體維度組合下所有指標(biāo)的值。
如下圖,整個(gè)立方體稱為1個(gè)cube,立方體中每個(gè)網(wǎng)格點(diǎn)稱為1個(gè)cuboid,圖中(A,B,C,D)和(A,D)都是cuboid,特別的,(A,B,C,D)稱為Base cuboid。cube的計(jì)算過(guò)程是逐層計(jì)算的,首先計(jì)算Base cuboid,然后計(jì)算維度數(shù)依次減少,逐層向下計(jì)算每層的cuboid。
圖1
Build引擎Cube構(gòu)建流程
BatchCubingJobBuilder2.build方法邏輯如下:
??public CubingJob build() {
????????logger.info("MR_V2 new job to BUILD segment " + seg);???????
????????final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
????????final String jobId = result.getId();
????????final String cuboidRootPath = getCuboidRootPath(jobId);
??????
????????// Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables
????????// 根據(jù)事實(shí)表和維表抽取需要的維度和度量,創(chuàng)建一張寬表或平表,并且進(jìn)行文件再分配(執(zhí)行Hive命令行來(lái)完成操作)
????????inputSide.addStepPhase1_CreateFlatTable(result);???????
?
????????// Phase 2: Build Dictionary
????????// 創(chuàng)建字典由三個(gè)子任務(wù)完成,由MR引擎完成,分別是抽取維度值(包含抽樣統(tǒng)計(jì))、創(chuàng)建維度字典和保存統(tǒng)計(jì)信息
????????result.addTask(createFactDistinctColumnsStep(jobId));
????????result.addTask(createBuildDictionaryStep(jobId));
????????result.addTask(createSaveStatisticsStep(jobId));
????????// add materialize lookup tables if needed
????????LookupMaterializeContext lookupMaterializeContext = addMaterializeLookupTableSteps(result);
?
????????// 創(chuàng)建HTable
????????outputSide.addStepPhase2_BuildDictionary(result);
??????
????????// Phase 3: Build Cube
????????// 構(gòu)建Cube,包含兩種Cube構(gòu)建算法,分別是逐層算法和快速算法,在執(zhí)行時(shí)會(huì)根據(jù)源數(shù)據(jù)的統(tǒng)計(jì)信息自動(dòng)選擇一種算法(各個(gè)Mapper的小Cube的行數(shù)之和 / reduce后的Cube行數(shù) > 7,重復(fù)度高就選逐層算法,重復(fù)度低就選快速算法)
????????addLayerCubingSteps(result, jobId, cuboidRootPath); // layer cubing, only selected algorithm will execute
????????addInMemCubingSteps(result, jobId, cuboidRootPath); // inmem cubing, only selected algorithm will execute
????????// 構(gòu)建HFile文件及把HFile文件BulkLoad到HBase
????????outputSide.addStepPhase3_BuildCube(result);
???????
????????// Phase 4: Update Metadata & Cleanup
????????// 更新Cube元數(shù)據(jù),其中需要更新的包括cube是否可用、以及本次構(gòu)建的數(shù)據(jù)統(tǒng)計(jì),包括構(gòu)建完成的時(shí)間,輸入的record數(shù)目,輸入數(shù)據(jù)的大小,保存到Hbase中數(shù)據(jù)的大小等,并將這些信息持久到元數(shù)據(jù)庫(kù)中
?
????????// 以及清理臨時(shí)數(shù)據(jù),是在整個(gè)執(zhí)行過(guò)程中產(chǎn)生了很多的垃圾文件,其中包括:1、臨時(shí)的hive表,2、因?yàn)閔ive表是一個(gè)外部表,存儲(chǔ)該表的文件也需要額外刪除,3、fact distinct 這一步將數(shù)據(jù)寫(xiě)入到HDFS上為建立詞典做準(zhǔn)備,這時(shí)候也可以刪除了,4、rowKey統(tǒng)計(jì)的時(shí)候會(huì)生成一個(gè)文件,此時(shí)可以刪除。
?
????????result.addTask(createUpdateCubeInfoAfterBuildStep(jobId, lookupMaterializeContext));
????????inputSide.addStepPhase4_Cleanup(result);
????????outputSide.addStepPhase4_Cleanup(result);????????
?
????????return result;
????}
一、?根據(jù)事實(shí)表和維表抽取需要的維度和度量,創(chuàng)建一張寬表或平表,并且進(jìn)行文件再分配
1.1 生成Hive寬表或平表(Create Intermediate Flat Hive Table)(執(zhí)行Hive命令行)
這一步的操作是根據(jù)cube的定義生成原始數(shù)據(jù),這里會(huì)新創(chuàng)建一個(gè)hive外部表,然后再根據(jù)cube中定義的星狀模型,查詢出維度(對(duì)于DERIVED類型的維度使用的是外鍵列)和度量的值插入到新創(chuàng)建的表中,這個(gè)表是一個(gè)外部表,表的數(shù)據(jù)文件(存儲(chǔ)在HDFS)作為下一個(gè)子任務(wù)的輸入,它首先根據(jù)維度中的列和度量中作為參數(shù)的列得到需要出現(xiàn)在該表中的列,然后執(zhí)行三步hive操作,這三步hive操作是通過(guò)hive -e的方式執(zhí)行的shell命令。
??1. drop TABLE IF EXISTS xxx
??2. CREATE EXTERNAL TABLE IF NOT EXISTS xxx() ROW FORMAT DELIMITED FIELDS TERMINATED BY '\177' STORED AS SEQUENCEFILE LOCATION xxxx,其中表名是根據(jù)當(dāng)前的cube名和segment的uuid生成的,location是當(dāng)前job的臨時(shí)文件,只有當(dāng)insert插入數(shù)據(jù)的時(shí)候才會(huì)創(chuàng)建,注意這里每一行的分隔符指定的是'\177'(目前是寫(xiě)死的,十進(jìn)制為127)。
??3. 插入數(shù)據(jù),在執(zhí)行之前需要首先設(shè)置一些配置項(xiàng),這些配置項(xiàng)通過(guò)hive的SET命令設(shè)置,是根據(jù)這個(gè)cube的job的配置文件(一般是在kylin的conf目錄下)設(shè)置的,最后執(zhí)行的是INSERT OVERWRITE TABLE xxx SELECT xxxx語(yǔ)句,SELECT子句中選出cube星狀模型中事實(shí)表與維度表按照設(shè)置的方式j(luò)oin之后的出現(xiàn)在維度或者度量參數(shù)中的列(特殊處理derived列),然后再加上用戶設(shè)置的where條件和partition的時(shí)間條件(根據(jù)輸入build的參數(shù))。
??需要注意的是這里無(wú)論用戶設(shè)置了多少維度和度量,每次join都會(huì)使用事實(shí)表和所有的維度表進(jìn)行join,這可能造成不必要的性能損失(多一個(gè)join會(huì)影響hive性能,畢竟要多讀一些文件)。這一步執(zhí)行完成之后location指定的目錄下就有了原始數(shù)據(jù)的文件,為接下來(lái)的任務(wù)提供了輸入。
JoinedFlatTable.generateDropTableStatement(flatDesc);
JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir);
JoinedFlatTable.generateInsertDataStatement(flatDesc);
二、?提取緯度值、創(chuàng)建維度字典和保存統(tǒng)計(jì)信息
2.1 提取事實(shí)表維度去重值(Extract Fact Table Distinct Columns)(執(zhí)行一個(gè)MapReduce任務(wù),包含抽取緯度值及統(tǒng)計(jì)各Mapper間的重復(fù)度兩種任務(wù))
????在這一步是根據(jù)上一步生成的hive表計(jì)算出還表中的每一個(gè)出現(xiàn)在事實(shí)表中的維度的distinct值,并寫(xiě)入到文件中,它是啟動(dòng)一個(gè)MR任務(wù)完成的,MR任務(wù)的輸入是HCatInputFormat,它關(guān)聯(lián)的表就是上一步創(chuàng)建的臨時(shí)表,這個(gè)MR任務(wù)的map階段首先在setup函數(shù)中得到所有維度中出現(xiàn)在事實(shí)表的維度列在臨時(shí)表的index,根據(jù)每一個(gè)index得到該列在臨時(shí)表中在每一行的值value,然后將<index+value,EMPTY_TEXT>作為mapper的輸出,通過(guò)index決定由哪個(gè)Reduce處理(而Reduce啟動(dòng)的時(shí)候根據(jù)ReduceTaskID如0000,0001來(lái)初始化決定處理哪個(gè)index對(duì)應(yīng)的維度列),該任務(wù)還啟動(dòng)了一個(gè)combiner,它所做的只是對(duì)同一個(gè)key(維度值)進(jìn)行去重(同一個(gè)mapper的結(jié)果),reducer所做的事情也是進(jìn)行key(維度值)去重(所有mapper的結(jié)果),然后在Reduce中將該維度列去重后的維度值一行行的寫(xiě)入到以列名命名的文件中(注意kylin實(shí)現(xiàn)的方式,聚合的key是緯度值,而不是index)。
提取事實(shí)表維度列的唯一值是通過(guò)FactDistinctColumnsJob這個(gè)MapReduce來(lái)完成,核心思想是每個(gè)Reduce處理一個(gè)維度列,然后每個(gè)維度列Reduce單獨(dú)輸出該維度列對(duì)應(yīng)的去重后的數(shù)據(jù)文件(output written to baseDir/colName/-r-00000,baseDir/colName2/-r-00001 or 直接輸出字典?output written to baseDir/colName/colName.rldict-r-00000)。另外會(huì)輸出各Mapper間重復(fù)度統(tǒng)計(jì)文件(output written to baseDir/statistics/statistics-r-00000,baseDir/statistics/statistics-r-00001)
FactDistinctColumnsJob
FactDistinctColumnsMapper
FactDistinctColumnPartitioner
FactDistinctColumnsCombiner
FactDistinctColumnsReducer
org.apache.kylin.engine.mr.steps.FactDistinctColumnsMapper
org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducer
?
在FactDistinctColumnsMapper中輸出維度值或通過(guò)HHL近似算法統(tǒng)計(jì)每個(gè)Mapper中各個(gè)CuboID的去重行數(shù)
? ? public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
? ? ? ? Collection<String[]> rowCollection = flatTableInputFormat.parseMapperInput(record);
? ? ? ? for (String[] row : rowCollection) {
? ? ? ? ? ? context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row));
? ? ? ? ? ? for (int i = 0; i < allCols.size(); i++) {
? ? ? ? ? ? ? ? String fieldValue = row[columnIndex[i]];
? ? ? ? ? ? ? ? if (fieldValue == null)
? ? ? ? ? ? ? ? ? ? continue;
? ? ? ? ? ? ? ? final DataType type = allCols.get(i).getType();
? ? ? ? ? ? ? ? if (dictColDeduper.isDictCol(i)) {
? ? ? ? ? ? ? ? ? ? if (dictColDeduper.add(i, fieldValue)) {
? ? ? ? ? ? ? ? ? ? ? ? // 輸出維度值,KEY=COLUMN_INDEX+COLUME_VALUE,VALUE=EMPTY_TEXT
? ? ? ? ? ? ? ? ? ? ? ? writeFieldValue(context, type, i, fieldValue);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? DimensionRangeInfo old = dimensionRangeInfoMap.get(i);
? ? ? ? ? ? ? ? ? ? if (old == null) {
? ? ? ? ? ? ? ? ? ? ? ? old = new DimensionRangeInfo(fieldValue, fieldValue);
? ? ? ? ? ? ? ? ? ? ? ? dimensionRangeInfoMap.put(i, old);
? ? ? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? ? ? old.setMax(type.getOrder().max(old.getMax(), fieldValue));
? ? ? ? ? ? ? ? ? ? ? ? old.setMin(type.getOrder().min(old.getMin(), fieldValue));
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? // 抽樣統(tǒng)計(jì),KEY=CUBOID,VALUE=HLLCount
? ? ? ? ? ? if (rowCount % 100 < samplingPercentage) {
? ? ? ? ? ? ? ? putRowKeyToHLL(row);
? ? ? ? ? ? }
? ? ? ? ? ??
? ? ? ? ? ? if (rowCount % 100 == 0) {
? ? ? ? ? ? ? ? dictColDeduper.resetIfShortOfMem();
? ? ? ? ? ? }
? ? ? ? ? ? rowCount++;
? ? ? ? }
? ? }
? ? protected void doCleanup(Context context) throws IOException, InterruptedException {
? ? ? ? ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
? ? ? ? // output each cuboid's hll to reducer, key is 0 - cuboidId
? ? ? ? for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
? ? ? ? ? ? cuboidStatCalculator.waitForCompletion();
? ? ? ? }
? ? ? ? for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
? ? ? ? ? ? Long[] cuboidIds = cuboidStatCalculator.getCuboidIds();
? ? ? ? ? ? HLLCounter[] cuboidsHLL = cuboidStatCalculator.getHLLCounters();
? ? ? ? ? ? HLLCounter hll;
? ? ? ? ? ? // 輸出各個(gè)CuboID的去重行數(shù)HLLCount
? ? ? ? ? ? for (int i = 0; i < cuboidIds.length; i++) {
? ? ? ? ? ? ? ? hll = cuboidsHLL[i];
? ? ? ? ? ? ? ? tmpbuf.clear();
? ? ? ? ? ? ? ? tmpbuf.put((byte) FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER); // one byte
? ? ? ? ? ? ? ? tmpbuf.putLong(cuboidIds[i]);
? ? ? ? ? ? ? ? outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
? ? ? ? ? ? ? ? hllBuf.clear();
? ? ? ? ? ? ? ? hll.writeRegisters(hllBuf);
? ? ? ? ? ? ? ? outputValue.set(hllBuf.array(), 0, hllBuf.position());
? ? ? ? ? ? ? ? sortableKey.init(outputKey, (byte) 0);
? ? ? ? ? ? ? ? context.write(sortableKey, outputValue);
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? for (Integer colIndex : dimensionRangeInfoMap.keySet()) {
? ? ? ? ? ? DimensionRangeInfo rangeInfo = dimensionRangeInfoMap.get(colIndex);
? ? ? ? ? ? DataType dataType = allCols.get(colIndex).getType();
? ? ? ? ? ? writeFieldValue(context, dataType, colIndex, rangeInfo.getMin());
? ? ? ? ? ? writeFieldValue(context, dataType, colIndex, rangeInfo.getMax());
? ? ? ? }
? ? }
?
在FactDistinctColumnPartitioner中根據(jù)SelfDefineSortableKey(COLUMN_INDEX)選擇分區(qū)
? ? public int getPartition(SelfDefineSortableKey skey, Text value, int numReduceTasks) {
? ? ? ? Text key = skey.getText();
? ? ? ? // 統(tǒng)計(jì)任務(wù)
? ? ? ? if (key.getBytes()[0] == FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER) {
? ? ? ? ? ? Long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG);
? ? ? ? ? ? return reducerMapping.getReducerIdForCuboidRowCount(cuboidId);
? ? ? ? } else {
? ? ? ? ? ? // 抽取緯度值任務(wù),直接根據(jù)COLUMN_INDEX指定分區(qū)
? ? ? ? ? ? return BytesUtil.readUnsigned(key.getBytes(), 0, 1);
? ? ? ? }
? ? }
?
在FactDistinctColumnsReducer中輸出去重后的維度值或輸出通過(guò)HLL近似算法統(tǒng)計(jì)CuboID去重后的行數(shù)
? ? public void doReduce(SelfDefineSortableKey skey, Iterable<Text> values, Context context) throws IOException, InterruptedException {
? ? ? ? Text key = skey.getText();
? ? ? ??
? ? ? ? // 統(tǒng)計(jì)邏輯
? ? ? ? if (isStatistics) {
? ? ? ? ? ? // for hll
? ? ? ? ? ? long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG);
? ? ? ? ? ? for (Text value : values) {
? ? ? ? ? ? ? ? HLLCounter hll = new HLLCounter(cubeConfig.getCubeStatsHLLPrecision());
? ? ? ? ? ? ? ? ByteBuffer bf = ByteBuffer.wrap(value.getBytes(), 0, value.getLength());
? ? ? ? ? ? ? ? hll.readRegisters(bf);
? ? ? ? ? ? ? ? // 累計(jì)Mapper輸出的各個(gè)CuboID未去重的行數(shù)(每個(gè)Reduce處理部分CuboIDs)
? ? ? ? ? ? ? ? totalRowsBeforeMerge += hll.getCountEstimate();
? ? ? ? ? ? ? ? if (cuboidId == baseCuboidId) {
? ? ? ? ? ? ? ? ? ? baseCuboidRowCountInMappers.add(hll.getCountEstimate());
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? // 合并CuboID
? ? ? ? ? ? ? ? if (cuboidHLLMap.get(cuboidId) != null) {
? ? ? ? ? ? ? ? ? ? cuboidHLLMap.get(cuboidId).merge(hll);
? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? cuboidHLLMap.put(cuboidId, hll);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? } else {
? ? ? ? ? ? String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1);
? ? ? ? ? ? logAFewRows(value);
? ? ? ? ? ? // if dimension col, compute max/min value
? ? ? ? ? ? if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
? ? ? ? ? ? ? ? if (minValue == null || col.getType().compare(minValue, value) > 0) {
? ? ? ? ? ? ? ? ? ? minValue = value;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? if (maxValue == null || col.getType().compare(maxValue, value) < 0) {
? ? ? ? ? ? ? ? ? ? maxValue = value;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? //if dict column
? ? ? ? ? ? if (cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) {
? ? ? ? ? ? ? ? if (buildDictInReducer) {
? ? ? ? ? ? ? ? ? ? // 如果需要在Reduce階段構(gòu)建詞典,則在doCleanup后構(gòu)建完輸出詞典文件
? ? ? ? ? ? ? ? ? ? // output written to baseDir/colName/colName.rldict-r-00000 (etc)
? ? ? ? ? ? ? ? ? ? builder.addValue(value);
? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? // 直接輸出去重后的維度值
? ? ? ? ? ? ? ? ? ? byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
? ? ? ? ? ? ? ? ? ? // output written to baseDir/colName/-r-00000 (etc)
? ? ? ? ? ? ? ? ? ? String fileName = col.getIdentity() + "/";
? ? ? ? ? ? ? ? ? ? mos.write(BatchConstants.CFG_OUTPUT_COLUMN, NullWritable.get(), new Text(keyBytes), fileName);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? rowCount++;
? ? }
?
? ? protected void doCleanup(Context context) throws IOException, InterruptedException {
? ? ? ? if (isStatistics) {
? ? ? ? ? ? //output the hll info;
? ? ? ? ? ? List<Long> allCuboids = Lists.newArrayList();
? ? ? ? ? ? allCuboids.addAll(cuboidHLLMap.keySet());
? ? ? ? ? ? Collections.sort(allCuboids);
? ? ? ? ? ? logMapperAndCuboidStatistics(allCuboids); // for human check
? ? ? ? ? ? 輸出通過(guò)HLL近似算法統(tǒng)計(jì)CuboID去重后的行數(shù)
? ? ? ? ? ? outputStatistics(allCuboids);
? ? ? ? } else {
? ? ? ? ? ? //dimension col
? ? ? ? ? ? if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
? ? ? ? ? ? ? ? outputDimRangeInfo();
? ? ? ? ? ? }
? ? ? ? ? ? // dic col
? ? ? ? ? ? if (buildDictInReducer) {
? ? ? ? ? ? ? ? Dictionary<String> dict = builder.build();
? ? ? ? ? ? ? ? outputDict(col, dict);
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? mos.close();
? ? }
?
? ? private void outputStatistics(List<Long> allCuboids) throws IOException, InterruptedException {
? ? ? ? // output written to baseDir/statistics/statistics-r-00000 (etc)
? ? ? ? String statisticsFileName = BatchConstants.CFG_OUTPUT_STATISTICS + "/" + BatchConstants.CFG_OUTPUT_STATISTICS;
? ? ? ? ByteBuffer valueBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
? ? ? ? // 獲取進(jìn)入這個(gè)Reduce各個(gè)CuboID去重后的最終統(tǒng)計(jì)行數(shù)
? ? ? ? // mapper overlap ratio at key -1
? ? ? ? long grandTotal = 0;
? ? ? ? for (HLLCounter hll : cuboidHLLMap.values()) {
? ? ? ? ? ? // 累計(jì)各個(gè)CuboID去重后的最終統(tǒng)計(jì)行數(shù)
? ? ? ? ? ? grandTotal += hll.getCountEstimate();
? ? ? ? }
? ? ? ??
? ? ? ? // 輸出進(jìn)入這個(gè)Reduce中的各Mapper間的重復(fù)度,totalRowsBeforeMerge / grandTotal
? ? ? ? double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
? ? ? ? mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-1), new BytesWritable(Bytes.toBytes(mapperOverlapRatio)), statisticsFileName);
? ? ? ? // ?Mapper數(shù)量
? ? ? ? // mapper number at key -2
? ? ? ? mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-2), new BytesWritable(Bytes.toBytes(baseCuboidRowCountInMappers.size())), statisticsFileName);
? ? ? ? // 抽樣百分比
? ? ? ? // sampling percentage at key 0
? ? ? ? mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(0L), new BytesWritable(Bytes.toBytes(samplingPercentage)), statisticsFileName);
? ? ? ? // 輸出進(jìn)入這個(gè)Reduce的各個(gè)cuboId的最終統(tǒng)計(jì)結(jié)果
? ? ? ? for (long i : allCuboids) {
? ? ? ? ? ? valueBuf.clear();
? ? ? ? ? ? cuboidHLLMap.get(i).writeRegisters(valueBuf);
? ? ? ? ? ? valueBuf.flip();
? ? ? ? ? ? mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(i), new BytesWritable(valueBuf.array(), valueBuf.limit()), statisticsFileName);
? ? ? ? }
? ? }
2.2 基于維度去重值構(gòu)建維度字典(Build Dimension Dictionary)(在kylin進(jìn)程內(nèi)的一個(gè)線程中去創(chuàng)建所有維度的dictionary)
??這一步是根據(jù)上一步生成的distinct column文件和維度表計(jì)算出所有維度的詞典信息,詞典是為了節(jié)約存儲(chǔ)而設(shè)計(jì)的,用于將一個(gè)成員值編碼成一個(gè)整數(shù)類型并且可以通過(guò)整數(shù)值獲取到原始成員值,每一個(gè)cuboid的成員是一個(gè)key-value形式存儲(chǔ)在hbase中,key是維度成員的組合,但是一般情況下維度是一些字符串之類的值(例如商品名),所以可以通過(guò)將每一個(gè)維度值轉(zhuǎn)換成唯一整數(shù)而減少內(nèi)存占用,在從hbase查找出對(duì)應(yīng)的key之后再根據(jù)詞典獲取真正的成員值。使用字典的好處是有很好的數(shù)據(jù)壓縮率,可降低存儲(chǔ)空間,同時(shí)也提升存儲(chǔ)讀取的速度。缺點(diǎn)是構(gòu)建字典需要較多的內(nèi)存資源,創(chuàng)建維度基數(shù)超過(guò)千萬(wàn)的容易造成內(nèi)存溢出。
???這一步是在kylin進(jìn)程內(nèi)的一個(gè)線程中執(zhí)行的,它會(huì)創(chuàng)建所有維度的dictionary,如果是事實(shí)表上的維度則可以從上一步生成的文件中讀取該列的distinct成員值(FileTable),否則則需要從原始的hive表中讀取每一列的信息(HiveTable),根據(jù)不同的源(文件或者h(yuǎn)ive表)獲取所有的列去重之后的成員列表,然后根據(jù)這個(gè)列表生成dictionary,kylin中針對(duì)不同類型的列使用不同的實(shí)現(xiàn)方式,對(duì)于time之類的(date、time、dtaetime和timestamp)使用DateStrDictionary,這里目前還存在著一定的問(wèn)題,因?yàn)檫@種編碼方式會(huì)首先將時(shí)間轉(zhuǎn)換成‘yyyy-MM-dd’的格式,會(huì)導(dǎo)致timestamp之類的精確時(shí)間失去天以后的精度。針對(duì)數(shù)值型的使用NumberDictionary,其余的都使用一般的TrieDictionary(字典樹(shù))。這些dictionary會(huì)作為cube的元數(shù)據(jù)存儲(chǔ)的kylin元數(shù)據(jù)庫(kù)里面,執(zhí)行query的時(shí)候進(jìn)行轉(zhuǎn)換。
???針對(duì)這一步需要注意的問(wèn)題:首先,這一步的兩個(gè)步驟都是在kylin進(jìn)程的一個(gè)線程中執(zhí)行的,第一步會(huì)加載某一個(gè)維度的所有distinct成員到內(nèi)存,如果某一個(gè)維度的基數(shù)比較大 ,可能會(huì)導(dǎo)致內(nèi)存出現(xiàn)OOM,然后在創(chuàng)建snapshotTable的時(shí)候會(huì)限制原始表的大小不能超過(guò)配置的一個(gè)上限值,如果超過(guò)則會(huì)執(zhí)行失敗。但是應(yīng)該強(qiáng)調(diào)的是這里加載全部的原始維度表更可能出現(xiàn)OOM。
CreateDictionaryJob
2.3 保存統(tǒng)計(jì)信息(合并保存統(tǒng)計(jì)信息及基于上一個(gè)HyperLogLog模擬去重統(tǒng)計(jì)信息選擇Cube構(gòu)建算法等)
???針對(duì)上一個(gè)MR的HyperLogLog模擬去重統(tǒng)計(jì)結(jié)果文件baseDir/statistics/statistics-r-00000,baseDir/statistics/statistics-r-00001,合并相關(guān)統(tǒng)計(jì)信息,根據(jù)最終重復(fù)度選擇Cube構(gòu)建算法
在FactDistinctColumnsReducer中輸出進(jìn)入這個(gè)Reduce的各個(gè)CuboID的統(tǒng)計(jì)信息???
private void outputStatistics(List<Long> allCuboids) throws IOException, InterruptedException {
????????// output written to baseDir/statistics/statistics-r-00000 (etc)
????????String statisticsFileName = BatchConstants.CFG_OUTPUT_STATISTICS + "/" + BatchConstants.CFG_OUTPUT_STATISTICS;
????????ByteBuffer valueBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
????????// 獲取進(jìn)入這個(gè)Reduce各個(gè)CuboID去重后的最終統(tǒng)計(jì)行數(shù)
????????// mapper overlap ratio at key -1
????????long grandTotal = 0;
????????for (HLLCounter hll : cuboidHLLMap.values()) {
????????????// 累計(jì)各個(gè)CuboID去重后的最終統(tǒng)計(jì)行數(shù)
????????????grandTotal += hll.getCountEstimate();
????????}
????????// 輸出進(jìn)入這個(gè)Reduce中的各Mapper間的重復(fù)度,totalRowsBeforeMerge / grandTotal
????????double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
????????mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-1), new BytesWritable(Bytes.toBytes(mapperOverlapRatio)), statisticsFileName);
????????//??Mapper數(shù)量
????????// mapper number at key -2
????????mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-2), new BytesWritable(Bytes.toBytes(baseCuboidRowCountInMappers.size())), statisticsFileName);
????????// 抽樣百分比
????????// sampling percentage at key 0
????????mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(0L), new BytesWritable(Bytes.toBytes(samplingPercentage)), statisticsFileName);
????????// 輸出進(jìn)入這個(gè)Reduce的各個(gè)cuboId的最終統(tǒng)計(jì)結(jié)果
????????for (long i : allCuboids) {
????????????valueBuf.clear();
????????????cuboidHLLMap.get(i).writeRegisters(valueBuf);
????????????valueBuf.flip();
????????????mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(i), new BytesWritable(valueBuf.array(), valueBuf.limit()), statisticsFileName);
????????}
}
在SaveStatisticsStep保存統(tǒng)計(jì)信息任務(wù)階段會(huì)去讀取上一步任務(wù)產(chǎn)出的cuboID統(tǒng)計(jì)結(jié)果文件,產(chǎn)出最終統(tǒng)計(jì)信息保存到元數(shù)據(jù)引擎中并且根據(jù)各個(gè)Mapper重復(fù)度選擇Cube構(gòu)建算法。
?Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
?long totalRowsBeforeMerge = 0;
?long grantTotal = 0;
?int samplingPercentage = -1;
?int mapperNumber = -1;
?for (Path item : statisticsFiles) {
?// 讀取解析統(tǒng)計(jì)文件
CubeStatsReader.CubeStatsResult cubeStatsResult = new CubeStatsReader.CubeStatsResult(item,
????????????????????????kylinConf.getCubeStatsHLLPrecision());????????????
????????????????// 獲取各個(gè)CuboID的計(jì)數(shù)器
????????????????cuboidHLLMap.putAll(cubeStatsResult.getCounterMap());
????????????????long pGrantTotal = 0L;
????????????????for (HLLCounter hll : cubeStatsResult.getCounterMap().values()) {
????????????????????pGrantTotal += hll.getCountEstimate();
????????????????}????????????????
????????????????// 累計(jì)所有Mapper輸出的cuboID行數(shù)
????????????????totalRowsBeforeMerge += pGrantTotal * cubeStatsResult.getMapperOverlapRatio();
????????????????// 累計(jì)去重后的cuboID統(tǒng)計(jì)行數(shù)
????????????????grantTotal += pGrantTotal;
????????????double mapperOverlapRatio = grantTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grantTotal;
????????????CubingJob cubingJob = (CubingJob) getManager()
????????????????????.getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
????????????// fact源數(shù)據(jù)行數(shù)
????????????long sourceRecordCount = cubingJob.findSourceRecordCount();
???????????
????????????// 保存CuboID最終統(tǒng)計(jì)信息到最終統(tǒng)計(jì)文件cuboid_statistics.seq中
????????????// cuboidHLLMap CuboID的統(tǒng)計(jì)信息
????????????// samplingPercentage 抽樣百分比
????????????// mapperNumber Mapper數(shù)
????????????// mapperOverlapRatio 各個(gè)Mapper間的重復(fù)度
????????????// sourceRecordCount fact源數(shù)據(jù)行數(shù)
????????????CubeStatsWriter.writeCuboidStatistics(hadoopConf, statisticsDir, cuboidHLLMap, samplingPercentage,mapperNumber, mapperOverlapRatio, sourceRecordCount);
????????????Path statisticsFile = new Path(statisticsDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
????????????logger.info(newSegment + " stats saved to hdfs " + statisticsFile);
????????????FSDataInputStream is = fs.open(statisticsFile);
????????????try {
?
????????????????// put the statistics to metadata store
????????????????// 把統(tǒng)計(jì)信息存儲(chǔ)到kylin的元數(shù)據(jù)引擎中
????????????????String resPath = newSegment.getStatisticsResourcePath();
????????????????rs.putResource(resPath, is, System.currentTimeMillis());
????????????????logger.info(newSegment + " stats saved to resource " + resPath);
????????????????// 根據(jù)抽樣數(shù)據(jù)計(jì)算重復(fù)度,選擇Cube構(gòu)建算法,如mapperOverlapRatio > 7 選逐層算法,否則選快速算法
????????????????StatisticsDecisionUtil.decideCubingAlgorithm(cubingJob, newSegment);
????????????????StatisticsDecisionUtil.optimizeCubingPlan(newSegment);
????????????} finally {
????????????????IOUtils.closeStream(is);
}
用戶該如何選擇算法呢?無(wú)需擔(dān)心,Kylin會(huì)自動(dòng)選擇合適的算法。Kylin在計(jì)算Cube之前對(duì)數(shù)據(jù)進(jìn)行采樣,在“fact distinct”步,利用HyperLogLog模擬去重,估算每種組合有多少不同的key,從而計(jì)算出每個(gè)Mapper輸出的數(shù)據(jù)大小,以及所有Mapper之間數(shù)據(jù)的重合度,據(jù)此來(lái)決定采用哪種算法更優(yōu)。在對(duì)上百個(gè)Cube任務(wù)的時(shí)間做統(tǒng)計(jì)分析后,Kylin選擇了7做為默認(rèn)的算法選擇閥值(參數(shù)kylin.cube.algorithm.layer-or-inmem-threshold):如果各個(gè)Mapper的小Cube的行數(shù)之和,大于reduce后的Cube行數(shù)的7倍,采用Layered Cubing, 反之采用Fast Cubing。如果用戶在使用過(guò)程中,更傾向于使用Fast Cubing,可以適當(dāng)調(diào)大此參數(shù)值,反之調(diào)小。
org.apache.kylin.engine.mr.steps.SaveStatisticsStep???????????????
?int mapperNumLimit = kylinConf.getCubeAlgorithmAutoMapperLimit();
????????????????double overlapThreshold = kylinConf.getCubeAlgorithmAutoThreshold(); // 默認(rèn)7
????????????????logger.info("mapperNumber for " + seg + " is " + mapperNumber + " and threshold is " + mapperNumLimit);
????????????????logger.info("mapperOverlapRatio for " + seg + " is " + mapperOverlapRatio + " and threshold is "+ overlapThreshold);
????????????????// in-mem cubing is good when
????????????????// 1) the cluster has enough mapper slots to run in parallel
????????????????// 2) the mapper overlap ratio is small, meaning the shuffle of in-mem MR has advantage
????????????????alg = (mapperNumber <= mapperNumLimit && mapperOverlapRatio <= overlapThreshold)//
????????????????????????? CubingJob.AlgorithmEnum.INMEM ????// 快速算法
????????????????????????: CubingJob.AlgorithmEnum.LAYER; ???// 逐層算法
三、?構(gòu)建Cube
3.1 計(jì)算BaseCuboid文件(Build Base Cuboid Data)(執(zhí)行一個(gè)MapReduce任務(wù))
???何謂Base cuboid呢?假設(shè)一個(gè)cube包含了四個(gè)維度:A/B/C/D,那么這四個(gè)維度成員間的所有可能的組合就是base cuboid,這就類似在查詢的時(shí)候指定了select count(1) from xxx group by A,B,C,D;這個(gè)查詢結(jié)果的個(gè)數(shù)就是base cuboid集合的成員數(shù)。這一步也是通過(guò)一個(gè)MR任務(wù)完成的,輸入是臨時(shí)表的路徑和分隔符,map對(duì)于每一行首先進(jìn)行split,然后獲取每一個(gè)維度列的值組合作為rowKey,但是rowKey并不是簡(jiǎn)單的這些維度成員的內(nèi)容組合,而是首先將這些內(nèi)容從dictionary中查找出對(duì)應(yīng)的id,然后組合這些id得到rowKey,這樣可以大大縮短hbase的存儲(chǔ)空間,提升查找性能。然后在查找該行中的度量列。這個(gè)MR任務(wù)還會(huì)執(zhí)行combiner過(guò)程,執(zhí)行邏輯和reducer相同,在reducer中的key是一個(gè)rowKey,value是相同的rowKey的measure組合的數(shù)組,reducer會(huì)分解出每一個(gè)measure的值,然后再根據(jù)定義該度量使用的聚合函數(shù)計(jì)算得到這個(gè)rowKey的結(jié)果,其實(shí)這已經(jīng)類似于hbase存儲(chǔ)的格式了。
org.apache.kylin.engine.mr.steps.BaseCuboidJob
org.apache.kylin.engine.mr.steps.HiveToBaseCuboidMapper
org.apache.kylin.engine.mr.steps.CuboidReducer
3.2 計(jì)算第N層cuboid文件(Build N-Dimension Cuboid Data)(執(zhí)行N個(gè)MapReduce任務(wù))
??這一個(gè)流程是由多個(gè)步驟的,它是根據(jù)維度組合的cuboid的總數(shù)決定的,上一層cuboid執(zhí)行MR任務(wù)的輸入是下一層cuboid計(jì)算的輸出,由于最底層的cuboid(base)已經(jīng)計(jì)算完成,所以這幾步不需要依賴于任何的hive信息,它的reducer和base cuboid的reducer過(guò)程基本一樣的(相同rowkey的measure執(zhí)行聚合運(yùn)算),mapper的過(guò)程只需要根據(jù)這一行輸入的key(例如A、B、C、D中某四個(gè)成員的組合)獲取可能的下一層的的組合(例如只有A、B、C和B、C、D),那么只需要將這些可能的組合提取出來(lái)作為新的key,value不變進(jìn)行輸出就可以了。
舉個(gè)例子,假設(shè)一共四個(gè)維度A/B/C/D,他們的成員分別是(A1、A2、A3),(B1、B2)、(C1)、(D1),有一個(gè)measure(對(duì)于這列V,計(jì)算sum(V)),這里忽略dictionary編碼。原始表如下:
A
B
C
D
V
A1
B1
C1
D1
2
A1
B2
C1
D1
3
A2
B1
C1
D1
5
A3
B1
C1
D1
6
A3
B2
C1
D1
8
那么base cuboid最終的輸出如下
(<A1、B1、C1、D1>、2)
(<A1、B2、C1、D1>, 3)
(<A2、B1、C1、D1>, 5)
(<A3、B1、C1、D1>, 6)
(<A3、B2、C1、D1>, 8)
那么它作為下面一個(gè)cuboid的輸入,對(duì)于第一行輸入
(<A1、B1、C1、D1>, 2),mapper執(zhí)行完成之后會(huì)輸出
(<A1、B1、C1>, 2)、
(<A1、B1、D1>, 2)、
(<A1、C1、D1>, 2)、
(<B1、C1、D1>, 2)這四項(xiàng),
同樣對(duì)于其他的內(nèi)一行也會(huì)輸出四行,最終他們經(jīng)過(guò)reducer的聚合運(yùn)算,得到如下的結(jié)果:
(<A1、B1、C1>, 2)
(<A1、B1、D1>, 2)
(<A1、C1、D1>, 2 + 3)
(<B1、C1、D1>,2 + 5 +6)
???...
這樣一次將下一層的結(jié)果作為輸入計(jì)算上一層的cuboid成員,直到最頂層的cuboid,這一個(gè)層cuboid只包含一個(gè)成員,不按照任何維度進(jìn)行g(shù)roup by。
?????上面的這些步驟用于生成cuboid,假設(shè)有N個(gè)維度(對(duì)于特殊類型的),那么就需要有N +1層cuboid,每一層cuboid可能是由多個(gè)維度的組合,但是它包含的維度個(gè)數(shù)相同。
org.apache.kylin.engine.mr.steps.NDCuboidJob
org.apache.kylin.engine.mr.steps.NDCuboidMapper
org.apache.kylin.engine.mr.steps.CuboidReducer
3.3 創(chuàng)建HTable
??在上面幾步中,我們已經(jīng)將每一層的cuboid計(jì)算完成,每一層的cuboid文件都是一些cuboid的集合,每一層的cuboid的key包含相同的維度個(gè)數(shù),下面一步就是將這些cuboid文件導(dǎo)入到hbase中,根據(jù)上一步計(jì)算出的rowKey分布情況(split數(shù)組)創(chuàng)建HTable,創(chuàng)建一個(gè)HTable的時(shí)候還需要考慮一下幾個(gè)事情:1、列組的設(shè)置,2、每一個(gè)列組的壓縮方式,3、部署coprocessor,4、HTable中每一個(gè)region的大小。在這一步中,列組的設(shè)置是根據(jù)用戶創(chuàng)建cube時(shí)候設(shè)置的,在hbase中存儲(chǔ)的數(shù)據(jù)key是維度成員的組合,value是對(duì)應(yīng)聚合函數(shù)的結(jié)果,列組針對(duì)的是value的,一般情況下在創(chuàng)建cube的時(shí)候只會(huì)設(shè)置一個(gè)列組,該列包含所有的聚合函數(shù)的結(jié)果;在創(chuàng)建HTable時(shí)默認(rèn)使用LZO壓縮,如果不支持LZO則不進(jìn)行壓縮,在后面kylin的版本中支持更多的壓縮方式;kylin強(qiáng)依賴于hbase的coprocessor,所以需要在創(chuàng)建HTable為該表部署coprocessor,這個(gè)文件會(huì)首先上傳到HBase所在的HDFS上,然后在表的元信息中關(guān)聯(lián),這一步很容易出現(xiàn)錯(cuò)誤,例如coprocessor找不到了就會(huì)導(dǎo)致整個(gè)regionServer無(wú)法啟動(dòng),所以需要特別小心;region的劃分已經(jīng)在上一步確定了,所以這里不存在動(dòng)態(tài)擴(kuò)展的情況,所以kylin創(chuàng)建HTable使用的接口如下:
public void createTable( final HTableDescriptor desc , byte [][] splitKeys)。
CreateHTableJob
3.4 轉(zhuǎn)換HFile文件
??創(chuàng)建完了HTable之后一般會(huì)通過(guò)插入接口將數(shù)據(jù)插入到表中,但是由于cuboid中的數(shù)據(jù)量巨大,頻繁的插入會(huì)對(duì)Hbase的性能有非常大的影響,所以kylin采取了首先將cuboid文件轉(zhuǎn)換成HTable格式的HFile文件,然后在通過(guò)bulkLoad的方式將文件和HTable進(jìn)行關(guān)聯(lián),這樣可以大大降低Hbase的負(fù)載,這個(gè)過(guò)程通過(guò)一個(gè)MR任務(wù)完成。
??這個(gè)任務(wù)的輸入是所有的cuboid文件,在mapper階段根據(jù)每一個(gè)cuboid成員的key-value輸出,如果cube定義時(shí)指定了多個(gè)列組,那么同一個(gè)key要按照不同列組中的值分別輸出,例如在cuboid文件中存在一行cuboid=1,key=1,value=sum(cost),count(1)的數(shù)據(jù),而cube中將這兩個(gè)度量劃分到兩個(gè)列組中,這時(shí)候?qū)τ谶@一行數(shù)據(jù),mapper的輸出為<1, sum(cost)>和<1,count(1)>。reducer使用的是org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer,它會(huì)按照行排序輸出,如果一行中包含多個(gè)值,那么會(huì)將這些值進(jìn)行排序再輸出。輸出的格式則是根據(jù)HTable的文件格式定義的。
CubeHFileJob
3.5 BulkLoad文件
??這一步將HFile文件load到HTable中,因?yàn)閘oad操作會(huì)將原始的文件刪除(相當(dāng)于remove),在操作之前首先將所有列組的Hfile的權(quán)限都設(shè)置為777,然后再啟動(dòng)LoadIncrementalHFiles任務(wù)執(zhí)行l(wèi)oad操作,它的輸入為文件的路徑和HTable名,這一步完全依賴于HBase的工具。這一步完成之后,數(shù)據(jù)已經(jīng)存儲(chǔ)到HBase中了,key的格式由cuboid編號(hào)+每一個(gè)成員在字典樹(shù)的id組成,value可能保存在多個(gè)列組里,包含在原始數(shù)據(jù)中按照這幾個(gè)成員進(jìn)行GROUP BY計(jì)算出的度量的值。
BulkLoadJob
四、?收尾工作
??執(zhí)行完上一步就已經(jīng)完成了從輸入到輸出的計(jì)算過(guò)程,接下來(lái)要做的就是一些kylin內(nèi)部的工作,分別是更新Cube元數(shù)據(jù),更新cube狀態(tài),臨時(shí)數(shù)據(jù)清理。
4.1 更新Cube元數(shù)據(jù)信息
??這一步主要是更新cube的狀態(tài),其中需要更新的包括cube是否可用、以及本次構(gòu)建的數(shù)據(jù)統(tǒng)計(jì),包括構(gòu)建完成的時(shí)間,輸入的record數(shù)目,輸入數(shù)據(jù)的大小,保存到Hbase中數(shù)據(jù)的大小等,并將這些信息持久到元數(shù)據(jù)庫(kù)中。
UpdateCubeInfoAfterBuildStep
4.2 清理臨時(shí)數(shù)據(jù)
??這一步是否成功對(duì)正確性不會(huì)有任何影響,因?yàn)榻?jīng)過(guò)上一步之后這個(gè)segment就可以在這個(gè)cube中被查找到了,但是在整個(gè)執(zhí)行過(guò)程中產(chǎn)生了很多的垃圾文件,其中包括:1、臨時(shí)的hive表,2、因?yàn)閔ive表是一個(gè)外部表,存儲(chǔ)該表的文件也需要額外刪除,3、fact distinct 這一步將數(shù)據(jù)寫(xiě)入到HDFS上為建立詞典做準(zhǔn)備,這時(shí)候也可以刪除了,4、rowKey統(tǒng)計(jì)的時(shí)候會(huì)生成一個(gè)文件,此時(shí)可以刪除。5、生成HFile時(shí)文件存儲(chǔ)的路徑和hbase真正存儲(chǔ)的路徑不同,雖然load是一個(gè)remove操作,但是上層的目錄還是存在的,也需要?jiǎng)h除。這一步kylin做的比較簡(jiǎn)單,并沒(méi)有完全刪除所有的臨時(shí)文件,其實(shí)在整個(gè)計(jì)算過(guò)程中,真正還需要保留的數(shù)據(jù)只有多個(gè)cuboid文件(需要增量build的cube),這個(gè)因?yàn)樵诓煌瑂egment進(jìn)行merge的時(shí)候是基于cuboid文件的,而不是根據(jù)HTable的。
GarbageCollectionStep
Cuboid 的維度和指標(biāo)如何轉(zhuǎn)換為HBase的KV結(jié)構(gòu)
簡(jiǎn)單的說(shuō)Cuboid的維度會(huì)映射為HBase的Rowkey,Cuboid的指標(biāo)會(huì)映射為HBase的Value。如下圖所示:?圖2
如上圖原始表所示:Hive表有兩個(gè)維度列year和city,有一個(gè)指標(biāo)列price。
如上圖預(yù)聚合表所示:我們具體要計(jì)算的是year和city這兩個(gè)維度所有維度組合(即4個(gè)cuboid)下的sum(priece)指標(biāo),這個(gè)指標(biāo)的具體計(jì)算過(guò)程就是由MapReduce完成的。
如上圖字典編碼所示:為了節(jié)省存儲(chǔ)資源,Kylin對(duì)維度值進(jìn)行了字典編碼。圖中將beijing和shanghai依次編碼為0和1。
如上圖HBase KV存儲(chǔ)所示:在計(jì)算cuboid過(guò)程中,會(huì)將Hive表的數(shù)據(jù)轉(zhuǎn)化為HBase的KV形式。Rowkey的具體格式是cuboid id + 具體的維度值(最新的Rowkey中為了并發(fā)查詢還加入了ShardKey),以預(yù)聚合表內(nèi)容的第2行為例,其維度組合是(year,city),所以cuboid id就是00000011,cuboid是8位,具體維度值是1994和shanghai,所以編碼后的維度值對(duì)應(yīng)上圖的字典編碼也是11,所以HBase的Rowkey就是0000001111,對(duì)應(yīng)的HBase Value就是sum(priece)的具體值。
所有的cuboid計(jì)算完成后,會(huì)將cuboid轉(zhuǎn)化為HBase的KeyValue格式生成HBase的HFile,最后將HFile load進(jìn)cube對(duì)應(yīng)的HBase表中。
Cube 構(gòu)建過(guò)程重要源碼分析
1 從Hive表生成Base Cuboid
在實(shí)際的cube構(gòu)建過(guò)程中,會(huì)首先根據(jù)cube的Hive事實(shí)表和維表生成一張大寬表,然后計(jì)算大寬表列的基數(shù),建立維度字典,估算cuboid的大小,建立cube對(duì)應(yīng)的HBase表,再計(jì)算base cuboid。
計(jì)算base cuboid就是一個(gè)MapReduce作業(yè),其輸入是上面提到的Hive大寬表,輸出的是key是各種維度組合,value是Hive大寬表中指標(biāo)的值。
org.apache.kylin.engine.mr.steps.BaseCuboidJob
org.apache.kylin.engine.mr.steps.HiveToBaseCuboidMapper
org.apache.kylin.engine.mr.steps.CuboidReducer
map階段生成key-value的代碼如下:???
public void doMap(KEYIN key, Object value, Context context) throws IOException, InterruptedException {
????????Collection<String[]> rowCollection = flatTableInputFormat.parseMapperInput(value);
????????for (String[] row: rowCollection) {
????????????try {
????????????????outputKV(row, context);
????????????} catch (Exception ex) {
????????????????handleErrorRecord(row, ex);
????????????}
????????}
?
????}
2 從Base Cuboid 逐層計(jì)算 Cuboid(Cube構(gòu)建算法-逐層算法)
從base cuboid 逐層計(jì)算每層的cuboid,也是MapReduce作業(yè),map階段每層維度數(shù)依次減少。
org.apache.kylin.engine.mr.steps.NDCuboidJob
org.apache.kylin.engine.mr.steps.NDCuboidMapper
org.apache.kylin.engine.mr.steps.CuboidReducer
????????public void doMap(Text key, Text value, Context context) throws Exception {
????????????long cuboidId = rowKeySplitter.split(key.getBytes());
????????????Cuboid parentCuboid = Cuboid.findForMandatory(cubeDesc, cuboidId);
????????????/**
?????????????* Build N-Dimension Cuboid
??????????????## 構(gòu)建N維cuboid
??????????????這些步驟是“逐層”構(gòu)建cube的過(guò)程,每一步以前一步的輸出作為輸入,然后去掉一個(gè)維度以聚合得到一個(gè)子cuboid。舉個(gè)例子,cuboid ABCD去掉A得到BCD,去掉B得到ACD。
??????????????有些cuboid可以從一個(gè)以上的父cuboid聚合得到,這種情況下,Kylin會(huì)選擇最小的一個(gè)父cuboid。舉例,AB可以從ABC(id:1110)和ABD(id:1101)生成,則ABD會(huì)被選中,因?yàn)樗谋華BC要小。
??????????????在這基礎(chǔ)上,如果D的基數(shù)較小,聚合運(yùn)算的成本就會(huì)比較低。所以,當(dāng)設(shè)計(jì)rowkey序列的時(shí)候,請(qǐng)記得將基數(shù)較小的維度放在末尾。這樣不僅有利于cube構(gòu)建,而且有助于cube查詢,因?yàn)轭A(yù)聚合也遵循相同的規(guī)則。
??????????????通常來(lái)說(shuō),從N維到(N/2)維的構(gòu)建比較慢,因?yàn)檫@是cuboid數(shù)量爆炸性增長(zhǎng)的階段:N維有1個(gè)cuboid,(N-1)維有N個(gè)cuboid,(N-2)維有(N-2)*(N-1)個(gè)cuboid,以此類推。經(jīng)過(guò)(N/2)維構(gòu)建的步驟,整個(gè)構(gòu)建任務(wù)會(huì)逐漸變快。
?????????????*/
????????????Collection<Long> myChildren = cuboidScheduler.getSpanningCuboid(cuboidId);
????????????// if still empty or null
????????????if (myChildren == null || myChildren.size() == 0) {
????????????????context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Skipped records").increment(1L);
????????????????if (skipCounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
????????????????????logger.info("Skipping record with ordinal: " + skipCounter);
????????????????}
????????????????return;
????????????}???????????
????????????context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Processed records").increment(1L);
????????????Pair<Integer, ByteArray> result;
????????????for (Long child : myChildren) {
????????????????Cuboid childCuboid = Cuboid.findForMandatory(cubeDesc, child);
????????????????result = ndCuboidBuilder.buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers());
????????????????outputKey.set(result.getSecond().array(), 0, result.getFirst());
????????????????context.write(outputKey, value);
????????????}?????????
????????}
從base cuboid 逐層計(jì)算每層的cuboid,也是MapReduce作業(yè),map階段每層維度數(shù)依次減少,reduce階段對(duì)指標(biāo)進(jìn)行聚合。
org.apache.kylin.engine.mr.steps.CuboidReducer
????public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
????????aggs.reset();??//MeasureAggregators 根據(jù)每種指標(biāo)的不同類型對(duì)指標(biāo)進(jìn)行聚合
????????for (Text value : values) {
????????????codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), input);
????????????if (cuboidLevel > 0) { // Base Cuboid 的 cuboidLevel 是0
????????????????aggs.aggregate(input, needAggr); //指標(biāo)進(jìn)行進(jìn)一步聚合
????????????} else {
????????????????aggs.aggregate(input);
????????????}
????????}
????????aggs.collectStates(result);
????????ByteBuffer valueBuf = codec.encode(result);
????????outputValue.set(valueBuf.array(), 0, valueBuf.position());
????????context.write(key, outputValue);
}?
3 讀取Hive寬表直接在Mapper端預(yù)聚合構(gòu)建完整Cube(Cube構(gòu)建算法-快速算法)
快速算法的核心思想是清晰簡(jiǎn)單的,就是最大化利用Mapper端的CPU和內(nèi)存,對(duì)分配的數(shù)據(jù)塊,將需要的組合全都做計(jì)算后再輸出給Reducer;由Reducer再做一次合并(merge),從而計(jì)算出完整數(shù)據(jù)的所有組合。如此,經(jīng)過(guò)一輪Map-Reduce就完成了以前需要N輪的Cube計(jì)算。本質(zhì)就是在Mapper端基于內(nèi)存提前做預(yù)聚合。
org.apache.kylin.engine.mr.steps.InMemCuboidJob
org.apache.kylin.engine.mr.steps.InMemCuboidMapper
org.apache.kylin.engine.mr.steps.InMemCuboidReducer
map階段生成key-value的代碼如下:
????public void doMap(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {
????????// put each row to the queue
????????T row = getRecordFromKeyValue(key, value);
????????if (offer(context, row, 1, TimeUnit.MINUTES, 60)) {
????????????counter++;
????????????countOfLastSplit++;
????????????if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
????????????????logger.info("Handled " + counter + " records, internal queue size = " + queue.size());
????????????}
????????} else {
????????????throw new IOException("Failed to offer row to internal queue due to queue full!");
????????}
????????if (counter % unitRows == 0 && shouldCutSplit(nSplit, countOfLastSplit)) {
????????????if (offer(context, inputConverterUnit.getCutRow(), 1, TimeUnit.MINUTES, 60)) {
????????????????countOfLastSplit = 0;
????????????} else {
????????????????throw new IOException("Failed to offer row to internal queue due to queue full!");
????????????}
????????????nSplit++;
????????}
}
?
reduce階段整體合并的代碼如下:
????public void doReduce(ByteArrayWritable key, Iterable<ByteArrayWritable> values, Context context) throws IOException, InterruptedException {
????????aggs.reset();
????????for (ByteArrayWritable value : values) {
????????????if (vcounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
????????????????logger.info("Handling value with ordinal (This is not KV number!): " + vcounter);
????????????}
????????????codec.decode(value.asBuffer(), input);
????????????aggs.aggregate(input);
????????}
????????aggs.collectStates(result);
????????// output key
????????outputKey.set(key.array(), key.offset(), key.length());
????????// output value
????????ByteBuffer valueBuf = codec.encode(result);
????????outputValue.set(valueBuf.array(), 0, valueBuf.position());
????????context.write(outputKey, outputValue);
????}
4 Cuboid 轉(zhuǎn)化為HBase的HFile。
主要就是數(shù)據(jù)格式的轉(zhuǎn)化。詳情請(qǐng)參考:?Hive 數(shù)據(jù) bulkload 導(dǎo)入 HBase
不同類型的指標(biāo)是如何進(jìn)行聚合的
每種不同的指標(biāo)都會(huì)有對(duì)應(yīng)的聚合算法,所有指標(biāo)聚合的基類是org.apache.kylin.measure.MeasureAggregator。其核心方法如下:???
? ? abstract public void reset();
????//不同類型的指標(biāo)算法會(huì)實(shí)現(xiàn)該方法
????abstract public void aggregate(V value);
????abstract public V getState();
?
以最簡(jiǎn)單的long類型的sum指標(biāo)為例:???
public class LongSumAggregator extends MeasureAggregator<LongMutable> {
????????LongMutable sum = new LongMutable();
????????@Override
????????public void reset() {
????????????sum.set(0);
????????}
????????@Override
????????public void aggregate(LongMutable value) {
????????????sum.set(sum.get() + value.get());
????????}
????????@Override
????????public LongMutable getState() {
????????????return sum;
????????}
}
SQL查詢是如何轉(zhuǎn)化為HBase的Scan操作的
還是以圖2舉例,假設(shè)查詢SQL如下:???
select year, sum(price)
from table
where city = "beijing"
group by year
這個(gè)SQL涉及維度year和city,所以其對(duì)應(yīng)的cuboid是00000011,又因?yàn)閏ity的值是確定的beijing,所以在Scan HBase時(shí)就會(huì)Scan Rowkey以00000011開(kāi)頭且city的值是beijing的行,取到對(duì)應(yīng)指標(biāo)sum(price)的值,返回給用戶。
總結(jié)
本文主要介紹了Apache Kylin是如何將Hive表中的數(shù)據(jù)轉(zhuǎn)化為HBase的KV結(jié)構(gòu),并簡(jiǎn)單介紹了Kylin的SQL查詢是如何轉(zhuǎn)化為HBase的Scan操作。希望對(duì)大家有所幫助。
---------------------?
Kylin三大引擎和Cube構(gòu)建源碼解析
????最近在工作中用到了kylin,相關(guān)資料還不是很多,關(guān)于源碼的更是少之又少,于是結(jié)合《kylin權(quán)威指南》、《基于Apache Kylin構(gòu)建大數(shù)據(jù)分析平臺(tái)》、相關(guān)技術(shù)博客和自己對(duì)部分源碼的理解進(jìn)行了整理。
一、工作原理
每一個(gè)Cube都可以設(shè)定自己的數(shù)據(jù)源、計(jì)算引擎和存儲(chǔ)引擎,這些設(shè)定信息均保存在Cube的元數(shù)據(jù)中。在構(gòu)建Cube時(shí),首先由工廠類創(chuàng)建數(shù)據(jù)源、計(jì)算引擎和存儲(chǔ)引擎對(duì)象。這三個(gè)對(duì)象獨(dú)立創(chuàng)建,相互之間沒(méi)有關(guān)聯(lián)。
要把它們串聯(lián)起來(lái),使用的是適配器模式。計(jì)算引擎好比是一塊主板,主控整個(gè)Cube的構(gòu)建過(guò)程。它以數(shù)據(jù)源為輸入,以存儲(chǔ)為Cube的輸出,因此也定義了IN和OUT兩個(gè)接口。數(shù)據(jù)源和存儲(chǔ)引擎則需要適配IN和OUT,提供相應(yīng)的接口實(shí)現(xiàn),把自己接入計(jì)算引擎,適配過(guò)程見(jiàn)下圖。適配完成之后,數(shù)據(jù)源和存儲(chǔ)引擎即可被計(jì)算引擎調(diào)用。三大引擎連通,就能協(xié)同完成Cube構(gòu)建。
計(jì)算引擎只提出接口需求,每個(gè)接口都可以有多種實(shí)現(xiàn),也就是能接入多種不同的數(shù)據(jù)源和存儲(chǔ)。類似的,每個(gè)數(shù)據(jù)源和存儲(chǔ)也可以實(shí)現(xiàn)多個(gè)接口,適配到多種不同的計(jì)算引擎上。三者之間是多對(duì)多的關(guān)系,可以任意組合,十分靈活。
二、三大主要接口
(一)數(shù)據(jù)源接口ISource
·adaptToBuildEngine:適配指定的構(gòu)建引擎接口。返回一個(gè)對(duì)象,實(shí)現(xiàn)指定的IN接口。該接口主要由計(jì)算引擎調(diào)用,要求數(shù)據(jù)源向計(jì)算引擎適配。如果數(shù)據(jù)源無(wú)法提供指定接口的實(shí)現(xiàn),則適配失敗,Cube構(gòu)建將無(wú)法進(jìn)行。
·createReadableTable:返回一個(gè)ReadableTable,用來(lái)順序讀取一個(gè)表。除了計(jì)算引擎之外,有時(shí)也會(huì)調(diào)用此方法順序訪問(wèn)數(shù)據(jù)維表的內(nèi)容,用來(lái)創(chuàng)建維度字典或維表快照。
(二)存儲(chǔ)引擎接口IStorage
·adaptToBuildEngine:適配指定的構(gòu)建引擎接口。返回一個(gè)對(duì)象,實(shí)現(xiàn)指定的OUT接口。該接口主要由計(jì)算引擎調(diào)用,要求存儲(chǔ)引擎向計(jì)算引擎適配。如果存儲(chǔ)引擎無(wú)法提供指定接口的實(shí)現(xiàn),則適配失敗,Cube構(gòu)建無(wú)法進(jìn)行。
·createQuery:創(chuàng)建一個(gè)查詢對(duì)象IStorageQuery,用來(lái)查詢給定的IRealization。簡(jiǎn)單來(lái)說(shuō),就是返回一個(gè)能夠查詢指定Cube的對(duì)象。IRealization是在Cube之上的一個(gè)抽象。其主要的實(shí)現(xiàn)就是Cube,此外還有被稱為Hybrid的聯(lián)合Cube。
(三)計(jì)算引擎接口IBatchCubingEngine
·createBatchCubingJob:返回一個(gè)工作流計(jì)劃,用以構(gòu)建指定的CubeSegment。這里的CubeSegment是一個(gè)剛完成初始化,但還不包含數(shù)據(jù)的CubeSegment。返回的DefaultChainedExecutable是一個(gè)工作流的描述對(duì)象。它將被保存并由工作流引擎在稍后調(diào)度執(zhí)行,從而完成Cube的構(gòu)建。
·createBatchMergeJob:返回一個(gè)工作流計(jì)劃,用以合并指定的CubeSegment。這里的CubeSegment是一個(gè)待合并的CubeSegment,它的區(qū)間橫跨了多個(gè)現(xiàn)有的CubeSegment。返回的工作流計(jì)劃一樣會(huì)在稍后被調(diào)度執(zhí)行,執(zhí)行的過(guò)程會(huì)將多個(gè)現(xiàn)有的CubeSegment合并為一個(gè),從而降低Cube的碎片化成都。
·getSourceInterface:指明該計(jì)算引擎的IN接口。
·getStorageInterface:指明該計(jì)算引擎的OUT接口。
三、三大引擎互動(dòng)過(guò)程
1.Rest API接收到構(gòu)建(合并)CubeSegment的請(qǐng)求。
2.EngineFactory根據(jù)Cube元數(shù)據(jù)的定義,創(chuàng)建IBatchCubingEngine對(duì)象,并調(diào)用其上的createBatchCubingJob(或者createBatchMergeJob)方法。
3.IBatchCubingEngine根據(jù)Cube元數(shù)據(jù)的定義,通過(guò)SourceFactory和StorageFactory創(chuàng)建出相應(yīng)的數(shù)據(jù)源ISource和存儲(chǔ)IStorage對(duì)象。
4.IBatchCubingEngine調(diào)用ISource上的adaptToBuildEngine方法傳入IN接口,要求數(shù)據(jù)源向自己適配。
5.IBatchCubingEngine調(diào)用IStorage上的adaptToBuildEngine方法,傳入OUT接口,要求存儲(chǔ)引擎向自己適配。
6.適配成功后,計(jì)算引擎協(xié)同數(shù)據(jù)源和存儲(chǔ)引擎計(jì)劃Cube構(gòu)建的具體步驟,將結(jié)果以工作流的形式返回。
7.執(zhí)行引擎將在稍后執(zhí)行工作流,完成Cube構(gòu)建。
四、Kylin默認(rèn)三大引擎Hive+MapReduce+HBase的介紹和代碼實(shí)現(xiàn)
(一)構(gòu)建引擎MapReduce
每一個(gè)構(gòu)建引擎必須實(shí)現(xiàn)接口IBatchCubingEngine,并在EngineFactory中注冊(cè)實(shí)現(xiàn)類。只有這樣才能在Cube元數(shù)據(jù)中引用該引擎,否則會(huì)在構(gòu)建Cube時(shí)出現(xiàn)“找不到實(shí)現(xiàn)”的錯(cuò)誤。
注冊(cè)的方法是通過(guò)kylin.properties來(lái)完成的。在其中添加一行構(gòu)建引擎的聲明。比如:
EngineFactory在啟動(dòng)時(shí)會(huì)讀取kylin.properties,默認(rèn)引擎即為標(biāo)號(hào)2的MRBatchCubingEngine2這個(gè)引擎。
1.MRBatchCubingEngine2
這是一個(gè)入口類,構(gòu)建Cube的主要邏輯都封裝在BatchCubingJobBuilder2和BatchMergeJobBuilder2中。其中的DefaultChainedExecutable,代表了一種可執(zhí)行的對(duì)象,其中包含了很多子任務(wù)。它執(zhí)行的過(guò)程就是一次串行執(zhí)行每一個(gè)子任務(wù),直到所有子任務(wù)都完成。kylin的構(gòu)建比較復(fù)雜,要執(zhí)行很多步驟,步驟之間有直接的依賴性和順序性。DefaultChainedExecutable很好地抽象了這種連續(xù)依次執(zhí)行的模型,可以用來(lái)表示Cube的構(gòu)建的工作流。
另外,重要的輸入輸出接口也在這里進(jìn)行聲明。IMRInput是IN接口,由數(shù)據(jù)源適配實(shí)現(xiàn);IMROutput2是OUT接口,由存儲(chǔ)引擎適配實(shí)現(xiàn)。
2.BatchCubingJobBuilder2
BatchCubingJobBuilder2和BatchMergeJobBuilder2大同小異,這里以BatchCubingJobBuilder2為例。
BatchCubingJobBuilder2中的成員變量IMRBatchCubingInputSide inputSide和IMRBatchCubingOutputSide2 outputSide分別來(lái)自數(shù)據(jù)源接口IMRInput和存儲(chǔ)接口IMROutput2,分別代表著輸入和輸出兩端參與創(chuàng)建工作流。
BatchCubingJobBuilder2的主體函數(shù)build()中,整個(gè)Cube構(gòu)建過(guò)程是一個(gè)子任務(wù)一次串行執(zhí)行的過(guò)程,這些子任務(wù)又被分為四個(gè)階段。
第一階段:創(chuàng)建平表。
這一階段的主要任務(wù)是預(yù)計(jì)算連接運(yùn)算符,把事實(shí)表和維表連接為一張大表,也稱為平表。這部分工作可通過(guò)調(diào)用數(shù)據(jù)源接口來(lái)完成,因?yàn)閿?shù)據(jù)源一般有現(xiàn)成的計(jì)算表連接方法,高效且方便,沒(méi)有必要在計(jì)算引擎中重復(fù)實(shí)現(xiàn)。
第二階段:創(chuàng)建字典。
創(chuàng)建字典由三個(gè)子任務(wù)完成,由MR引擎完成,分別是抽取列值、創(chuàng)建字典和保存統(tǒng)計(jì)信息。是否使用字典是構(gòu)建引擎的選擇,使用字典的好處是有很好的數(shù)據(jù)壓縮率,可降低存儲(chǔ)空間,同時(shí)也提升存儲(chǔ)讀取的速度。缺點(diǎn)是構(gòu)建字典需要較多的內(nèi)存資源,創(chuàng)建維度基數(shù)超過(guò)千萬(wàn)的容易造成內(nèi)存溢出。
第三階段:構(gòu)建Cube。
其中包含兩種構(gòu)建cube的算法,分別是分層構(gòu)建和快速構(gòu)建。對(duì)于不同的數(shù)據(jù)分布來(lái)說(shuō)它們各有優(yōu)劣,區(qū)別主要在于數(shù)據(jù)通過(guò)網(wǎng)絡(luò)洗牌的策略不同。兩種算法的子任務(wù)將全部被加入工作流計(jì)劃中,在執(zhí)行時(shí)會(huì)根據(jù)源數(shù)據(jù)的統(tǒng)計(jì)信息自動(dòng)選擇一種算法,未被選擇的算法的子任務(wù)將被自動(dòng)跳過(guò)。在構(gòu)建cube的最后還將調(diào)用存儲(chǔ)引擎的接口,存儲(chǔ)引擎負(fù)責(zé)將計(jì)算完的cube放入引擎。
第四階段:更新元數(shù)據(jù)和清理。
最后階段,cube已經(jīng)構(gòu)建完畢,MR引擎將首先添加子任務(wù)更新cube元數(shù)據(jù),然后分別調(diào)用數(shù)據(jù)源接口和存儲(chǔ)引擎接口對(duì)臨時(shí)數(shù)據(jù)進(jìn)行清理。
3.IMRInput
這是BatchCubingJobBuilder2對(duì)數(shù)據(jù)源的要求,所有希望接入MRBatchCubingEngine2的數(shù)據(jù)源都必須實(shí)現(xiàn)該接口。
·getTableInputFormat方法返回一個(gè)IMRTableInputFormat對(duì)象,用以幫助MR任務(wù)從數(shù)據(jù)源中讀取指定的關(guān)系表,為了適應(yīng)MR編程接口,其中又有兩個(gè)方法,configureJob在啟動(dòng)MR任務(wù)前被調(diào)用,負(fù)責(zé)配置所需的InputFormat,連接數(shù)據(jù)源中的關(guān)系表。由于不同的InputFormat所讀入的對(duì)象類型各不相同,為了使得構(gòu)建引擎能夠統(tǒng)一處理,因此又引入了parseMapperInput方法,對(duì)Mapper的每一行輸入都會(huì)調(diào)用該方法一次,該方法的輸入是Mapper的輸入,具體類型取決于InputFormat,輸出為統(tǒng)一的字符串?dāng)?shù)組,每列為一個(gè)元素。整體表示關(guān)系表中的一行。這樣Mapper救能遍歷數(shù)據(jù)源中的表了。
·getBatchCubingInputSide方法返回一個(gè)IMRBatchCubingInputSide對(duì)象,參與創(chuàng)建一個(gè)CubeSegment的構(gòu)建工作流,它內(nèi)部包含三個(gè)方法,addStepPhase1_CreateFlatTable()方法由構(gòu)建引擎調(diào)用,要求數(shù)據(jù)源在工作流中添加步驟完成平表的創(chuàng)建;getFlatTableInputFormat()方法幫助MR任務(wù)讀取之前創(chuàng)建的平表;addStepPhase4_Cleanup()是進(jìn)行清理收尾,清除已經(jīng)沒(méi)用的平表和其它臨時(shí)對(duì)象,這三個(gè)方法將由構(gòu)建引擎依次調(diào)用。
4.IMROutput2
這是BatchCubingJobBuilder2對(duì)存儲(chǔ)引擎的要求,所有希望接入BatchCubingJobBuilder2的存儲(chǔ)都必須實(shí)現(xiàn)該接口。
IMRBatchCubingOutputSide2代表存儲(chǔ)引擎配合構(gòu)建引擎創(chuàng)建工作流計(jì)劃,該接口的內(nèi)容如下:
·addStepPhase2_BuildDictionary:由構(gòu)建引擎在字典創(chuàng)建后調(diào)用。存儲(chǔ)引擎可以借此機(jī)會(huì)在工作流中添加步驟完成存儲(chǔ)端的初始化或準(zhǔn)備工作。
·addStepPhase3_BuildCube:由構(gòu)建引擎在Cube計(jì)算完畢之后調(diào)用,通知存儲(chǔ)引擎保存CubeSegment的內(nèi)容。每個(gè)構(gòu)建引擎計(jì)算Cube的方法和結(jié)果的存儲(chǔ)格式可能都會(huì)有所不同。存儲(chǔ)引擎必須依照數(shù)據(jù)接口的協(xié)議讀取CubeSegment的內(nèi)容,并加以保存。
·addStepPhase4_Cleanup:由構(gòu)建引擎在最后清理階段調(diào)用,給存儲(chǔ)引擎清理臨時(shí)垃圾和回收資源的機(jī)會(huì)。
(二)數(shù)據(jù)源Hive
Hive是kylin的默認(rèn)數(shù)據(jù)源,由于數(shù)據(jù)源的實(shí)現(xiàn)依賴構(gòu)建引擎對(duì)輸入接口的定義,因此本節(jié)的具體內(nèi)容只適用于MR引擎。
數(shù)據(jù)源HiveSource首先要實(shí)現(xiàn)ISource接口。
HiveSource實(shí)現(xiàn)了ISource接口中的方法。adaptToBuildEngine()只能適配IMRInput,返回HiveMRInput實(shí)例。另一個(gè)方法createReadableTable()返回一個(gè)ReadableTable對(duì)象,提供讀取一張hive表的能力。
HiveMRInput
HiveMRInput實(shí)現(xiàn)了IMRInput接口,實(shí)現(xiàn)了它的兩個(gè)方法。
一是HiveTableInputFormat實(shí)現(xiàn)了IMRTableInputFormat接口,它主要使用了HCatInputFormat作為MapReduce的輸入格式,用通用的方式讀取所有類型的Hive表。Mapper輸入對(duì)象為DefaultHCatRecord,統(tǒng)一轉(zhuǎn)換為String[]后交由構(gòu)建引擎處理。
二是BatchCubingInputSide實(shí)現(xiàn)了IMRBatchCubingInputSide接口。主要實(shí)現(xiàn)了在構(gòu)建的第一階段創(chuàng)建平表的步驟。首先用count(*)查詢獲取Hive平表的總行數(shù),然后用第二句HQL創(chuàng)建Hive平表,同時(shí)添加參數(shù)根據(jù)總行數(shù)分配Reducer數(shù)目。
(三)存儲(chǔ)引擎HBase
存儲(chǔ)引擎HBaseStorage實(shí)現(xiàn)了IStorage接口。
·createQuery方法,返回指定IRealization(數(shù)據(jù)索引實(shí)現(xiàn))的一個(gè)查詢對(duì)象。因?yàn)镠Base存儲(chǔ)是為Cube定制的,所以只支持Cube類型的數(shù)據(jù)索引。具體的IStorageQuery實(shí)現(xiàn)應(yīng)根據(jù)存儲(chǔ)引擎的版本而有所不同。
·adaptToBuildEngine方法,適配IMROutput2的輸出接口。
HBaseMROutput2
觀察IMRBatchCubingOutputSide2的實(shí)現(xiàn)。它在兩個(gè)時(shí)間點(diǎn)參與Cube構(gòu)建的工作流。一是在字典創(chuàng)建之后(Cube構(gòu)造之前),在addStepPhase2_BuildDictionary()中添加了“創(chuàng)建HTable”這一步,估算最終CubeSegment的大小,并以此來(lái)切分HTable Regions,創(chuàng)建HTable。
第二個(gè)插入點(diǎn)是在Cube計(jì)算完畢之后,由構(gòu)建引擎調(diào)用addStepPhase3_BuildCube()。這里要將Cube保存為HTable,實(shí)現(xiàn)分為“轉(zhuǎn)換HFile”和“批量導(dǎo)入到HTable”兩步。因?yàn)橹苯硬迦際Table比較緩慢,為了最快速地將數(shù)據(jù)導(dǎo)入到HTable,采取了Bulk Load的方法。先用一輪MapReduce將Cube數(shù)據(jù)轉(zhuǎn)換為HBase的存儲(chǔ)文件格式HFile,然后就可以直接將HFile導(dǎo)入空的HTable中,完成數(shù)據(jù)導(dǎo)入。
最后一個(gè)插入點(diǎn)是addStepPhase4_Cleanup()是空實(shí)現(xiàn),對(duì)于HBase存儲(chǔ)來(lái)說(shuō)沒(méi)有需要清理的資源。
五、CubingJob的構(gòu)建過(guò)程
在Kylin構(gòu)建CubeSegment的過(guò)程中,計(jì)算引擎居于主導(dǎo)地位,通過(guò)它來(lái)協(xié)調(diào)數(shù)據(jù)源和存儲(chǔ)引擎。
在網(wǎng)頁(yè)上向Kylin服務(wù)端發(fā)送構(gòu)建新的CubeSegment的請(qǐng)求后,通過(guò)controller層來(lái)到service層,進(jìn)入JobService類中的submitJob方法,方法內(nèi)部再調(diào)用submitJobInternal方法,在build、merge和refresh的時(shí)候,通過(guò)EngineFactory.createBatchCubingJob(newSeg, submitter)返回一個(gè)job實(shí)例,從這里可以看出,CubingJob的構(gòu)建入口是由計(jì)算引擎提供的,即默認(rèn)的計(jì)算引擎MRBatchCubingEngine2。
Kylin所支持的所有計(jì)算引擎,都會(huì)在EngineFactory中注冊(cè),并保存在batchEngine中,可以通過(guò)配置文件kylin.properties選擇計(jì)算引擎,目前Kylin支持的計(jì)算引擎有:
MRBatchCubingEngine2實(shí)現(xiàn)了createBatchCubingJob方法,方法內(nèi)調(diào)用了BatchCubingJobBuild2的build方法。
在new的初始化過(guò)程中,super(newSegment,submitter)就是執(zhí)行父類的構(gòu)造方法,進(jìn)行了一些屬性的初始化賦值,其中的inputSide和outputSide就上上文提到的數(shù)據(jù)源和存儲(chǔ)引擎實(shí)例,通過(guò)計(jì)算引擎的協(xié)調(diào)來(lái)進(jìn)行CubingJob的構(gòu)建。
數(shù)據(jù)源inputSide實(shí)例獲取:
以上即為數(shù)據(jù)源實(shí)例獲取過(guò)程的代碼展現(xiàn),BatchCubingJobBuilder2初始化的時(shí)候,調(diào)用MRUtil的getBatchCubingInputSide方法,它最終調(diào)用的其實(shí)還是MRBatchCubingEngine2這個(gè)計(jì)算引擎的getJoinedFlatTableDesc方法,它返回了一個(gè)IJoinedFlatTableDesc實(shí)例,這個(gè)對(duì)象就是對(duì)數(shù)據(jù)源表信息的封裝。獲得了這個(gè)flatDesc實(shí)例之后,就要來(lái)獲取inputSide實(shí)例,與獲取計(jì)算引擎代碼類似,目前kylin中支持的數(shù)據(jù)源有:
Kylin默認(rèn)的數(shù)據(jù)源是序號(hào)為0的HiveSource,所以最后調(diào)用的是HiveSource的adaptToBuildEngine,根據(jù)傳入的IMRInput.class接口,最終返回得到HiveMRInput的實(shí)例,最后再通過(guò)它的getBatchCubingInputSide的方法獲取inputSide的實(shí)例。
存儲(chǔ)引擎outputSide實(shí)例獲取:
以上即為存儲(chǔ)引擎實(shí)例獲取的代碼展現(xiàn),BatchCubingJobBuilder2初始化的時(shí)候,調(diào)用MRUtil的getBatchCubingOutputSide方法,方法內(nèi)先調(diào)用了StorageFactory類的createEngineAdapter方法,方法內(nèi)又調(diào)用實(shí)現(xiàn)了Storage接口的HBaseStorage類的adaptToBuildEngine方法,最后返回了HBaseMROutput2Transition實(shí)例,然后在通過(guò)它的getBatchCubingOutputSide方法就可以獲取到outputSide的實(shí)例。目前kylin中支持的數(shù)據(jù)源有:
kylin默認(rèn)的存儲(chǔ)引擎是HBase。
——————————————————————————————————
通過(guò)構(gòu)造函數(shù),數(shù)據(jù)源、計(jì)算引擎和數(shù)據(jù)存儲(chǔ)三個(gè)模塊已經(jīng)關(guān)聯(lián)到一起了,上文介紹到的MRBatchCubingEngine2的方法中,在new出了一個(gè)BatchCubingJobBuild2實(shí)例后,接著就調(diào)用了build方法,最后返回了一個(gè)CubingJob實(shí)例。build方法邏輯如下:
方法的內(nèi)容就是構(gòu)建一個(gè)CubeSegment的步驟,依次順序的加入到CubingJob的任務(wù)list中。
從第一行開(kāi)始,調(diào)用了CubingJob的createBuildJob方法,里面又調(diào)用了initCubingJob方法。
initCubingJob方法就是獲取到cube相關(guān)的一些配置信息進(jìn)行初始化,它是根據(jù)cube的名字去查詢所在的project,如果不同的project下創(chuàng)建了相同名字的cube,那返回的就會(huì)是一個(gè)List,然后看配置文件中是否開(kāi)啟了允許cube重名,如不允許則直接拋出異常,如果允許就在設(shè)置projectName時(shí)取返回List中的第一個(gè)元素,那么這里就可能導(dǎo)致projectName設(shè)置錯(cuò)誤,所以最好保證cube的name是全局唯一的。
在CubingJob初始化之后,會(huì)獲取cuboidRootPath,獲取邏輯如下:
經(jīng)過(guò)一連串的調(diào)用拼裝,最終獲取的路徑格式如下:
hdfs:///kylin/kylin_metadata/kylin-jobId/cubeName/cuboid
接下來(lái)就是三大引擎相互協(xié)作,構(gòu)建CubeSegment的過(guò)程,整個(gè)過(guò)程大致分為創(chuàng)建hive平表、創(chuàng)建字典、構(gòu)建Cube和更新元數(shù)據(jù)和清理這四個(gè)步驟。
第一步和第四步是由數(shù)據(jù)源來(lái)實(shí)現(xiàn)的,具體是在HiveMRInput類實(shí)現(xiàn)了IMRInput接口的getBatchCubingInputSide方法中,它返回了一個(gè)BatchCubingInputSide實(shí)例,在這個(gè)類中完成了具體工作;第二步是由計(jì)算引擎實(shí)現(xiàn)的,依靠JobBuilderSupport類中的方法完成;第三步是由計(jì)算引擎和存儲(chǔ)引擎共同完成的,包括構(gòu)建cube和存儲(chǔ)到HBase;第四步是由數(shù)據(jù)源和存儲(chǔ)引擎分別完成的;我們按步驟對(duì)代碼進(jìn)行分析。
首先是第一步創(chuàng)建hive平表調(diào)用了HiveMRInput類中的靜態(tài)內(nèi)部類BatchCubingInputSide中的addStepPhase1_CreateFlatTable方法。
先獲取cubeName、cubeConfig、hive命令(USE faltTableDatabase)三個(gè)變量。
接下來(lái)的方法就是抽取變量,進(jìn)行hive命令的拼接,完成以下步驟:
一是從hive表中,將所需字段從事實(shí)表和維表中提取出來(lái),構(gòu)建一個(gè)寬表;
二是將上一步得到的寬表,按照某個(gè)字段進(jìn)行重新分配,如果沒(méi)有指定字段,則隨機(jī),目的是產(chǎn)生多個(gè)差不多大小的文件,作為后續(xù)構(gòu)建任務(wù)的輸入,防止數(shù)據(jù)傾斜。
三是將hive中的視圖物化。
——————————————————————————————————
創(chuàng)建平表命令例子:
hive -e "USE default;
DROP TABLE IF EXISTS kylin_intermediate_taconfirm_kylin_15all_ddacfb18_3d2e_4e1b_8975_f0871183418d;
CREATE EXTERNAL TABLE IF NOT EXISTS kylin_intermediate_taconfirm_kylin_15all_ddacfb18_3d2e_4e1b_8975_f0871183418d
(
TACONFIRM_BUSINESSCODE string
,TACONFIRM_FUNDCODE string
,TACONFIRM_SHARETYPE string
,TACONFIRM_NETCODE string
,TACONFIRM_CURRENCYTYPE string
,TACONFIRM_CODEOFTARGETFUND string
,TACONFIRM_TARGETSHARETYPE string
,TACONFIRM_TARGETBRANCHCODE string
,TACONFIRM_RETURNCODE string
,TACONFIRM_DEFDIVIDENDMETHOD string
,TACONFIRM_FROZENCAUSE string
,TACONFIRM_TAINTERNALCODE string
,TACONFIRM_C_PROVICE string
,TAPROVINCE_PROVINCENAME string
,TASHARETYPE_SHARETYPENAME string
)
STORED AS SEQUENCEFILE
LOCATION 'hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-4c5d4bb4-791f-4ec3-b3d7-89780adc3f58/kylin_intermediate_taconfirm_kylin_15all_ddacfb18_3d2e_4e1b_8975_f0871183418d';
ALTER TABLE kylin_intermediate_taconfirm_kylin_15all_ddacfb18_3d2e_4e1b_8975_f0871183418d SET TBLPROPERTIES('auto.purge'='true');
INSERT OVERWRITE TABLE kylin_intermediate_taconfirm_kylin_15all_ddacfb18_3d2e_4e1b_8975_f0871183418d SELECT
TACONFIRM.BUSINESSCODE as TACONFIRM_BUSINESSCODE
,TACONFIRM.FUNDCODE as TACONFIRM_FUNDCODE
,TACONFIRM.SHARETYPE as TACONFIRM_SHARETYPE
,TACONFIRM.NETCODE as TACONFIRM_NETCODE
,TACONFIRM.CURRENCYTYPE as TACONFIRM_CURRENCYTYPE
,TACONFIRM.CODEOFTARGETFUND as TACONFIRM_CODEOFTARGETFUND
,TACONFIRM.TARGETSHARETYPE as TACONFIRM_TARGETSHARETYPE
,TACONFIRM.TARGETBRANCHCODE as TACONFIRM_TARGETBRANCHCODE
,TACONFIRM.RETURNCODE as TACONFIRM_RETURNCODE
,TACONFIRM.DEFDIVIDENDMETHOD as TACONFIRM_DEFDIVIDENDMETHOD
,TACONFIRM.FROZENCAUSE as TACONFIRM_FROZENCAUSE
,TACONFIRM.TAINTERNALCODE as TACONFIRM_TAINTERNALCODE
,TACONFIRM.C_PROVICE as TACONFIRM_C_PROVICE
,TAPROVINCE.PROVINCENAME as TAPROVINCE_PROVINCENAME
,TASHARETYPE.SHARETYPENAME as TASHARETYPE_SHARETYPENAME
FROM DEFAULT.TACONFIRM as TACONFIRM?
INNER JOIN DEFAULT.TAPROVINCE as TAPROVINCE
ON TACONFIRM.C_PROVICE = TAPROVINCE.C_PROVICE
INNER JOIN DEFAULT.TASHARETYPE as TASHARETYPE
ON TACONFIRM.SHARETYPE = TASHARETYPE.SHARETYPE
WHERE 1=1;
" --hiveconf hive.merge.mapredfiles=false --hiveconf hive.auto.convert.join=true --hiveconf dfs.replication=2 --hiveconf hive.exec.compress.output=true --hiveconf hive.auto.convert.join.noconditionaltask=true --hiveconf mapreduce.job.split.metainfo.maxsize=-1 --hiveconf hive.merge.mapfiles=false --hiveconf hive.auto.convert.join.noconditionaltask.size=100000000 --hiveconf hive.stats.autogather=true
——————————————————————————————————
文件再分配和視圖物化命令例子:
hive -e "USE default;
set mapreduce.job.reduces=3;
set hive.merge.mapredfiles=false;
INSERT OVERWRITE TABLE kylin_intermediate_taconfirm_kylin_15all_ddacfb18_3d2e_4e1b_8975_f0871183418d SELECT * FROM kylin_intermediate_taconfirm_kylin_15all_ddacfb18_3d2e_4e1b_8975_f0871183418d DISTRIBUTE BY RAND();
" --hiveconf hive.merge.mapredfiles=false --hiveconf hive.auto.convert.join=true --hiveconf dfs.replication=2 --hiveconf hive.exec.compress.output=true --hiveconf hive.auto.convert.join.noconditionaltask=true --hiveconf mapreduce.job.split.metainfo.maxsize=-1 --hiveconf hive.merge.mapfiles=false --hiveconf hive.auto.convert.join.noconditionaltask.size=100000000 --hiveconf hive.stats.autogather=true
——————————————————————————————————
創(chuàng)建字典由三個(gè)子任務(wù)完成,分別是抽取列值、創(chuàng)建字典和保存統(tǒng)計(jì)信息,由MR引擎完成,所以直接在build方法中add到任務(wù)list中。是否使用字典是構(gòu)建引擎的選擇,使用字典的好處是有很好的數(shù)據(jù)壓縮率,可降低存儲(chǔ)空間,同時(shí)也提升存儲(chǔ)讀取的速度。缺點(diǎn)是構(gòu)建字典需要較多的內(nèi)存資源,創(chuàng)建維度基數(shù)超過(guò)千萬(wàn)的容易造成內(nèi)存溢出。在這個(gè)過(guò)程最后,還要?jiǎng)?chuàng)建HTable,這屬于存儲(chǔ)引擎的任務(wù),所以是在HBaseMROutput2Transition實(shí)例中完成的。
——————————————————————————————————
抽取列值步驟參數(shù)例子:
?-conf /usr/local/apps/apache-kylin-2.3.1-bin/conf/kylin_job_conf.xml -cubename Taconfirm_kylin_15all -output hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-4c5d4bb4-791f-4ec3-b3d7-89780adc3f58/Taconfirm_kylin_15all/fact_distinct_columns -segmentid ddacfb18-3d2e-4e1b-8975-f0871183418d -statisticsoutput hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-4c5d4bb4-791f-4ec3-b3d7-89780adc3f58/Taconfirm_kylin_15all/fact_distinct_columns/statistics -statisticssamplingpercent 100 -jobname Kylin_Fact_Distinct_Columns_Taconfirm_kylin_15all_Step -cubingJobId 4c5d4bb4-791f-4ec3-b3d7-89780adc3f58
——————————————————————————————————
?構(gòu)建維度字典步驟參數(shù)例子 :
?-cubename Taconfirm_kylin_15all -segmentid ddacfb18-3d2e-4e1b-8975-f0871183418d -input hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-4c5d4bb4-791f-4ec3-b3d7-89780adc3f58/Taconfirm_kylin_15all/fact_distinct_columns -dictPath hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-4c5d4bb4-791f-4ec3-b3d7-89780adc3f58/Taconfirm_kylin_15all/dict
——————————————————————————————————
創(chuàng)建HTable步驟參數(shù)例子:
?-cubename Taconfirm_kylin_15all -segmentid ddacfb18-3d2e-4e1b-8975-f0871183418d -partitions hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-4c5d4bb4-791f-4ec3-b3d7-89780adc3f58/Taconfirm_kylin_15all/rowkey_stats/part-r-00000 -cuboidMode CURRENT
——————————————————————————————————
構(gòu)建Cube屬于計(jì)算引擎的任務(wù),就是根據(jù)準(zhǔn)備好的數(shù)據(jù),依次產(chǎn)生cuboid的數(shù)據(jù),在這里調(diào)用了兩種構(gòu)建方法,分別是分層構(gòu)建和快速構(gòu)建,但最終只會(huì)選擇一種構(gòu)建方法,分層構(gòu)建首先調(diào)用createBaseCuboidStep方法,生成Base Cuboid數(shù)據(jù)文件,然后進(jìn)入for循環(huán),調(diào)用createNDimensionCuboidStep方法,根據(jù)Base Cuboid計(jì)算N層Cuboid數(shù)據(jù)。
在Cuboid的數(shù)據(jù)都產(chǎn)生好之后,還需要放到存儲(chǔ)層中,所以接下來(lái)調(diào)用outputSide實(shí)例的addStepPhase3_BuildCube方法,HBaseMROutput2Transition類中的addStepPhase3_BuildCube方法主要有兩步,一是createConvertCuboidToHfileStep方法,將計(jì)算引擎產(chǎn)生的cuboid數(shù)據(jù)轉(zhuǎn)換成HBase要求的HFile格式,二是createBulkLoadStep方法,即把HFIle數(shù)據(jù)加載到HBase中。
——————————————————————————————————
構(gòu)建Base Cuboid步驟參數(shù)例子:
?-conf /usr/local/apps/kylin/conf/kylin_job_conf.xml -cubename kylin_sales_cube -segmentid 392634bd-4964-428c-a905-9bbf28884452 -input FLAT_TABLE -output hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-6f3c2a9e-7283-4d87-9487-a5ebaffef811/kylin_sales_cube/cuboid/level_base_cuboid -jobname Kylin_Base_Cuboid_Builder_kylin_sales_cube -level 0 -cubingJobId 6f3c2a9e-7283-4d87-9487-a5ebaffef811
——————————————————————————————————
構(gòu)建N層Cuboid步驟參數(shù)例子:
?-conf /usr/local/apps/kylin/conf/kylin_job_conf.xml -cubename kylin_sales_cube -segmentid 392634bd-4964-428c-a905-9bbf28884452 -input hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-6f3c2a9e-7283-4d87-9487-a5ebaffef811/kylin_sales_cube/cuboid/level_1_cuboid -output hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-6f3c2a9e-7283-4d87-9487-a5ebaffef811/kylin_sales_cube/cuboid/level_2_cuboid -jobname Kylin_ND-Cuboid_Builder_kylin_sales_cube_Step -level 2 -cubingJobId 6f3c2a9e-7283-4d87-9487-a5ebaffef811
——————————————————————————————————
轉(zhuǎn)換HFile格式步驟參數(shù)例子:
?-conf /usr/local/apps/kylin/conf/kylin_job_conf.xml -cubename kylin_sales_cube -partitions hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-6f3c2a9e-7283-4d87-9487-a5ebaffef811/kylin_sales_cube/rowkey_stats/part-r-00000_hfile -input hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-6f3c2a9e-7283-4d87-9487-a5ebaffef811/kylin_sales_cube/cuboid/* -output hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-6f3c2a9e-7283-4d87-9487-a5ebaffef811/kylin_sales_cube/hfile -htablename KYLIN_O2SYZPV449 -jobname Kylin_HFile_Generator_kylin_sales_cube_Step
——————————————————————————————————
加載HFile到HBase步驟參數(shù)例子:
?-input hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-6f3c2a9e-7283-4d87-9487-a5ebaffef811/kylin_sales_cube/hfile -htablename KYLIN_O2SYZPV449 -cubename kylin_sales_cube
——————————————————————————————————
最后一步就是一些收尾工作,包括更新Cube元數(shù)據(jù)信息,調(diào)用inputSide和outputSide實(shí)例進(jìn)行中間臨時(shí)數(shù)據(jù)的清理工作。
完成所有步驟之后,就回到了JobService的submitJob方法中,在得到CubingJob的實(shí)例后,會(huì)執(zhí)行以上代碼。這里做的是將CubingJob的信息物化到HBase的kylin_metadata表中,并沒(méi)有真正的提交執(zhí)行。
真正執(zhí)行CubingJob的地方是在DefaultScheduler,它里面有一個(gè)線程會(huì)每隔一分鐘,就去HBase的kylin_metadata表中掃一遍所有的CubingJob,然后將需要執(zhí)行的job,提交到線程池執(zhí)行。
kylin中任務(wù)的構(gòu)建和執(zhí)行是異步的。單個(gè)kylin節(jié)點(diǎn)有query、job和all三種角色,query只提供查詢服務(wù),job只提供真正的構(gòu)建服務(wù),all則兼具前兩者功能。實(shí)際操作中kylin的三種角色節(jié)點(diǎn)都可以進(jìn)行CubingJob的構(gòu)建,但只有all和job模式的節(jié)點(diǎn)可以通過(guò)DefaultScheduler進(jìn)行調(diào)度執(zhí)行
---------------------?
總目錄
Kylin系列(一)—— 入門?
Kylin系列(二)—— Cube 構(gòu)造算法
總目錄
Kylin cube 構(gòu)造算法
逐層算法(layer Cubing)
算法的優(yōu)點(diǎn)
算法的缺點(diǎn)
快速Cube算法(Fast Cubing)
舉個(gè)例子
子立方體生成樹(shù)(Cuboid spanning Tree)的遍歷次序
優(yōu)點(diǎn)
缺點(diǎn)
By-layer Spark Cubing算法
改進(jìn)
Spark中Cubing的過(guò)程
性能測(cè)試
Kylin cube 構(gòu)造算法
逐層算法(layer Cubing)
我們知道,一個(gè)N維的Cube,是有1個(gè)N維子立方體、N個(gè)(N-1)維子立方體、N*(N-1)/2個(gè)(N-2)維子立方體、……、N個(gè)1維子立方體和1個(gè)0維子立方體構(gòu)成,總共有2^N個(gè)子立方體。在逐層算法中,按照維度數(shù)逐層減少來(lái)計(jì)算,每個(gè)層級(jí)的計(jì)算(除了第一層,他是從原始數(shù)據(jù)聚合而來(lái)),是基于他上一層級(jí)的計(jì)算結(jié)果來(lái)計(jì)算的。
比如group by [A,B]的結(jié)果,可以基于group by [A,B,C]的結(jié)果,通過(guò)去掉C后聚合得來(lái)的,這樣可以減少重復(fù)計(jì)算;當(dāng)0維Cuboid計(jì)算出來(lái)的時(shí)候,整個(gè)Cube的計(jì)算也就完成了。
如上圖所示,展示了一個(gè)4維的Cube構(gòu)建過(guò)程。
此算法的Mapper和Reducer都比較簡(jiǎn)單。Mapper以上一層Cuboid的結(jié)果(key-value對(duì))作為輸入。由于Key是由各維度值拼接在一起,從其中找出要聚合的維度,去掉它的值成新的key,并對(duì)value進(jìn)行操作,然后把新的key和value輸出,進(jìn)而Hadoop MapReduce對(duì)所有新的key進(jìn)行排序、洗牌(shuffle)、再送到Reducer處;Reducer的輸入會(huì)是一組具有相同key的value集合,對(duì)這些value做聚合運(yùn)算,再結(jié)合key輸出就完成了一輪計(jì)算。
舉個(gè)例子:?
假設(shè)一共四個(gè)維度A/B/C/D,他們的成員分別是(A1、A2、A3),(B1、B2)、(C1)、(D1),有一個(gè)measure(對(duì)于這列V,計(jì)算sum(V)),這里忽略dictionary編碼。原始表如下:?
那么base cuboid最終的輸出如下?
(A1、B1、C1、D1、2)?
(A1、B2、C1、D1, 3)?
(A2、B1、C1、D1, 5)?
(A3、B1、C1、D1, 6)?
(A3、B2、C1、D1, 8)?
那么它作為下面一個(gè)cuboid的輸入,對(duì)于第一行輸入?
(A1、B1、C1、D1,2),mapper執(zhí)行完成之后會(huì)輸出?
(A1、B1、C1, 2)、?
(A1、B1、D1, 2)、?
(A1、C1、D1, 2)、?
(B1、C1、D1,2)這四項(xiàng),同樣對(duì)于其他的內(nèi)一行也會(huì)輸出四行,最終他們經(jīng)過(guò)reducer的聚合運(yùn)算,得到如下的結(jié)果:?
(A1、B1、C1, 2)?
(A1、B1、D1, 2)?
(A1、C1、D1, 2 + 3)?
(B1、C1、D1,2 + 5 +6)
這個(gè)例子其實(shí)在cube的構(gòu)建過(guò)程中可以看到。
一定要注意,這里的每一輪計(jì)算都是MapReducer任務(wù),且串行執(zhí)行;一個(gè)N維的Cube,至少需要N次MapReduce Job。
算法的優(yōu)點(diǎn)
此算法充分利用了MR的能力,處理了中間復(fù)雜的排序和洗牌工作,故而算法代碼清晰簡(jiǎn)單,易于維護(hù)。
受益于Hadoop的日趨成熟,此算法對(duì)集群要求低,運(yùn)行穩(wěn)定。
算法的缺點(diǎn)
當(dāng)Cube有比較多維度的時(shí)候,所需要的MR任務(wù)也相應(yīng)增加;由于Hadoop的任務(wù)調(diào)度需要耗費(fèi)額外資源,特別是集群較龐大的時(shí)候,反復(fù)遞交任務(wù)造成的額外開(kāi)銷會(huì)很可觀
由于Mapper不做預(yù)聚合,此算法會(huì)對(duì)Hadoop MR輸出較多數(shù)據(jù);雖然已經(jīng)使用了Combiner來(lái)減少?gòu)腗apper端到Reducer端的數(shù)據(jù)傳輸,所有數(shù)據(jù)依然需要通過(guò)MR來(lái)排序和組合才能被聚合,無(wú)形之中增加了集群的壓力。
對(duì)HDFS的讀寫(xiě)操作較多:由于每一層計(jì)算的輸出會(huì)用作下一層計(jì)算的輸入,這些Key-value需要寫(xiě)到HDFS上;當(dāng)所有計(jì)算都完成后,Kylin還需要額外一輪任務(wù)將這些文件轉(zhuǎn)成Hbase的HFile格式,以導(dǎo)入到HBase中去。
總體而言,該算法的效率較低,尤其當(dāng)Cube維度數(shù)較大的時(shí)候。
這里其實(shí)在困惑到底什么是0維,后來(lái)想明白了。舉個(gè)例子,現(xiàn)在有一個(gè)度量叫成交量。有幾個(gè)維度從大到小:業(yè)務(wù)類型、渠道、門店。3維的例子就是[業(yè)務(wù)類型、渠道、門店],二維的例子是[業(yè)務(wù)類型、渠道],一維[業(yè)務(wù)類型],0維其實(shí)就是沒(méi)有維度,也就是全部聚合,舉個(gè)例子就是
select sum(price) from table1
1
其實(shí)在我看來(lái),逐層算法就是先算維度數(shù)最高的,一層算完后,再算維度數(shù)減少的一層,以此類推。至于為什么從層級(jí)高向?qū)蛹?jí)低計(jì)算,而不是反過(guò)來(lái),在于如果是反過(guò)來(lái),那你每次的計(jì)算量都是初始數(shù)據(jù),數(shù)據(jù)量非常大,沒(méi)必要。
快速Cube算法(Fast Cubing)
快速Cube算法,它還被稱作“逐段”(By Segment)或“逐塊”(By Split)算法。
該算法的主要思想,對(duì)Mapper所分配的數(shù)據(jù)塊,將它計(jì)算成一個(gè)完整的小Cube段(包含所有Cuboid);每個(gè)Mapper將計(jì)算完的Cube段輸出給Reducer做合并,生成大Cube,也就是最終結(jié)果。
與舊算法相比,快速算法主要有兩點(diǎn)不同:
Mapper會(huì)利用內(nèi)存做預(yù)聚合,算出所有組合;Mapper輸出的每個(gè)Key都是不同的,這樣會(huì)減少輸出到Hadoop MapReduce的數(shù)據(jù)量,Combiner也不再需要;
一輪MapReduce便會(huì)完成所有層次的計(jì)算,減少Hadoop任務(wù)的調(diào)配。
來(lái)說(shuō)個(gè)比較。逐層算法的每一層的計(jì)算都有一個(gè)MapReduce任務(wù),因?yàn)槭菑母呔S到低維的MR任務(wù),任務(wù)之間傳遞的數(shù)據(jù)量是非常大的。比如上面的例子,生成4維的數(shù)據(jù),需要在mapper中對(duì)全數(shù)據(jù)進(jìn)行的整理,再傳遞給reducer聚合,如果數(shù)據(jù)量非常大,那么網(wǎng)絡(luò)IO是很大的。而快速算法,它會(huì)對(duì)某個(gè)分片數(shù)據(jù)進(jìn)行構(gòu)造完整的cube(所有cuboid)。再將mapper中的數(shù)據(jù)送入reducer進(jìn)行大聚合生成Cube。這其實(shí)是在map階段就已經(jīng)完成了聚合,IO是很小的。
舉個(gè)例子
這里不理解沒(méi)關(guān)系,看完后面的構(gòu)建過(guò)程再翻回來(lái)看例子就能懂
一個(gè)Cube有4個(gè)維度:A,B,C,D;每個(gè)Mapper都有100萬(wàn)個(gè)源記錄要處理;Mapper中的列基數(shù)是Car(A),Car(B),Car(C)和Car(D)。(cardinal 基數(shù))
當(dāng)講源記錄聚集到base cuboid(1111)時(shí),使用舊的“逐層”算法,每個(gè)Mapper將向Hadoop輸出1百萬(wàn)條記錄;使用快速立方算法,在預(yù)聚合之后,它預(yù)聚合之后,它只向Hadoop輸出[distinct A,B,C,D]記錄的數(shù)量,這樣肯定比源數(shù)據(jù)小;在正常情況下,他可以源記錄大小的1/10到1/100.
當(dāng)從父cuboid聚合到子cuboid時(shí),從base cuboid(1111) 到3維cuboid 0111,將會(huì)聚合維度A;我們假設(shè)維度A與其他維度獨(dú)立的,聚合后,cuboid 0111的維度base cuboid的1/Card(A);所以在這一步的輸出將減少到原來(lái)的1/Card(A);
總的來(lái)說(shuō),假設(shè)維度的平均基數(shù)是Card(N),從Mapper到Reducer的寫(xiě)入記錄可以減少到原始維度的1/Card(N);Hadoop的輸出越少,I/O和計(jì)算越少,性能就越好。
這里要提一句,其實(shí)很多都是類似的,比如在hive中處理大表,?
各種的調(diào)優(yōu)都和IO、計(jì)算有關(guān)系,因?yàn)樗麄兌际腔贛R任務(wù)。
子立方體生成樹(shù)(Cuboid spanning Tree)的遍歷次序
在舊算法中,Kylin按照層級(jí),也就是廣度優(yōu)先遍歷(Broad First Search)的次序計(jì)算出各個(gè)Cuboid;在快速Cube算法中,Mapper會(huì)按照深度優(yōu)先遍歷(Depth First Search)來(lái)計(jì)算各個(gè)Cuboid。?
深度優(yōu)先遍歷是一個(gè)遞歸方法,將父cuboid壓棧以計(jì)算子Cuboid,直到?jīng)]有子Cuboid需要計(jì)算才出棧并輸出給Hadoop;需要最多暫存N個(gè)Cuboid,N是Cube維度數(shù)。
采用DFS,是為了兼顧C(jī)PU和內(nèi)存。
從父Cuboid計(jì)算子Cuboid,避免重復(fù)計(jì)算。
只壓棧當(dāng)前計(jì)算的Cuboid的父Cuboid,減少內(nèi)存占用。?
舉個(gè)例子從3維到2維的MR任務(wù)中計(jì)算CD,BFS會(huì)壓入ABC ABD ACD BCD,mapper進(jìn)行切分,reducer進(jìn)行聚合;而在DFS中,只會(huì)壓入ABCD,BCD,內(nèi)存大大減少。
上圖是一個(gè)四維Cube的完整生成樹(shù):
按照DFS的次序,在0維Cuboid輸出前的計(jì)算次序是ABCD-》BCD-》CD-》D-》0維,ABCD,BCD,CD和D需要被暫存;在被輸出后,D可被輸出,內(nèi)存得到釋放;在C被計(jì)算并輸出后,CD就可以被輸出,ABCD最后被輸出。
使用DFS訪問(wèn)順序,Mapper的輸出已完全排序,因?yàn)镃uboid ID位于行鍵的開(kāi)始位置,而內(nèi)部的Cuboid的行已排序。
0000?
0001[D0]?
0001[D1]?
....?
0010[C0]?
0010[C1]?
....?
0011[C0][D0]?
0011[C0][D1]?
....?
....?
1111[A0][B0][C0][D0]?
....?
這里的寫(xiě)法可以看構(gòu)造過(guò)程。?
由于mapper的輸出已經(jīng)排序,Hadoop的排序效率會(huì)更高。
此外,mapper的預(yù)聚合發(fā)生在內(nèi)存中,這樣可以避免不必要的磁盤(pán)和網(wǎng)絡(luò)IO,并減少了hadoop的開(kāi)銷。
在開(kāi)發(fā)階段,我們?cè)趍apper中遇到了OOM錯(cuò)誤;這可能發(fā)生在:?
- Mapper的JVM堆大小很小?
- 使用 distinct count度量?
- 使用樹(shù)太深(維度太多)?
- 給Mapper的數(shù)據(jù)太大
我們意識(shí)到Kylin不能認(rèn)為mapper總是有足夠的內(nèi)存;Cubing算法需要自適應(yīng)各種情況;
當(dāng)主動(dòng)檢測(cè)到OOM錯(cuò)誤,會(huì)優(yōu)化內(nèi)存使用并將數(shù)據(jù)spilling到磁盤(pán)上;結(jié)果是有希望的,OOM錯(cuò)誤現(xiàn)在很少發(fā)生。
優(yōu)點(diǎn)
它比舊的方法更快;從我們的比較測(cè)試中可以減少30%到50%的build總時(shí)間:快在排序,快在IO。
他在Hadoop上產(chǎn)生較少的工作負(fù)載,并在HDFS上留下較少的中間文件。
Cubing和Spark等其他立方體引起可以輕松地重復(fù)使用該立方體代碼。
缺點(diǎn)
該算法有點(diǎn)復(fù)雜,這增加了維護(hù)工作;
雖然該算法可以自動(dòng)將數(shù)據(jù)spill到磁盤(pán),但他仍希望Mapper有足夠的內(nèi)存來(lái)獲得最佳性能。
用戶需要更多知識(shí)來(lái)調(diào)整立方體。
By-layer Spark Cubing算法
我們知道,RDD(Resilient Distributed DataSet)是Spark中的一個(gè)基本概念。N維立方體的組合可以很好地描述為RDD,N維立方體將具有N+1個(gè)RDD。這些RDD具有parent/child關(guān)系,因?yàn)檫@些parent RDD 可用于生成child RDD。通過(guò)將父RDD緩存在內(nèi)存中,子RDD的生成可以比磁盤(pán)讀取更有效。
改進(jìn)
每一層的cuboid視為一個(gè)RDD
父RDD被盡可能cache到內(nèi)存
RDD 被導(dǎo)出為sequence file
通過(guò)將“map”替換為“flatMap”,以及把“reduce”替換為“reduceByKey”,可以復(fù)用大部分代碼
Spark中Cubing的過(guò)程
下圖DAG(有向無(wú)環(huán)圖),它詳細(xì)說(shuō)明了這個(gè)過(guò)程:
在Stage 5中,Kylin使用HiveContext讀取中間Hive表,然后執(zhí)行一個(gè)一對(duì)一映射的”map”操作將原始值編碼為KV字節(jié)。完成后Kylin得到一個(gè)中間編碼的RDD。
在Stage 6中,中間RDD用一個(gè)“ReduceByKey”操作聚合以獲得RDD-1,這是base cuboid。接下來(lái),在RDD-1做了一個(gè)flatMap(一對(duì)多map),因?yàn)閎ase cuboid有N個(gè)cuboid。以此類推,各級(jí)RDD得到計(jì)算。在完成時(shí),這些RDD將完整地保存在分布式文件系統(tǒng),但可以緩存在內(nèi)存中用于下一級(jí)計(jì)算。當(dāng)生成子cuboid時(shí),他將從緩存中刪除。
其實(shí)我們和舊的逐層算法去比較會(huì)發(fā)現(xiàn),他們之間的構(gòu)建沒(méi)有什么大的差別,只不過(guò)Spark的是在內(nèi)存中進(jìn)行的,無(wú)需從磁盤(pán)讀取和網(wǎng)絡(luò)IO。并且后面的stage的第一步是reduce。
性能測(cè)試
在所有這三種情況下,Spark都比MR快,總體而言它可以減少約一半的時(shí)間。
Kylin的構(gòu)建算法以及和spark的改進(jìn)?
http://cxy7.com/articles/2018/06/09/1528549073259.html?
https://www.cnblogs.com/zlslch/p/7404465.html
---------------------?
e Kylin是一個(gè)開(kāi)源的分布式分析引擎,提供Hadoop之上的SQL查詢接口及多維分析(OLAP)能力以支持超大規(guī)模數(shù)據(jù)。它能在亞秒內(nèi)查詢巨大的Hive表。本文將詳細(xì)介紹Apache Kylin 1.5中的Fast-Cubing算法。
Fast Cubing,也稱快速數(shù)據(jù)立方算法, 是一個(gè)新的Cube算法。我們知道,Cube的思想是用空間換時(shí)間, 通過(guò)預(yù)先的計(jì)算,把索引及結(jié)果存儲(chǔ)起來(lái),以換取查詢時(shí)候的高性能?。在Kylin v1.5以前,Kylin中的Cube只有一種算法:layered cubing,也稱逐層算法:它是逐層由底向上,把所有組合算完的過(guò)程。
Cube構(gòu)建算法介紹
1 逐層算法(Layer Cubing)
我們知道,一個(gè)N維的Cube,是由1個(gè)N維子立方體、N個(gè)(N-1)維子立方體、N*(N-1)/2個(gè)(N-2)維子立方體、......、N個(gè)1維子立方體和1個(gè)0維子立方體構(gòu)成,總共有2^N個(gè)子立方體組成,在逐層算法中,按維度數(shù)逐層減少來(lái)計(jì)算,每個(gè)層級(jí)的計(jì)算(除了第一層,它是從原始數(shù)據(jù)聚合而來(lái)),是基于它上一層級(jí)的結(jié)果來(lái)計(jì)算的。
比如,[Group by A, B]的結(jié)果,可以基于[Group by A, B, C]的結(jié)果,通過(guò)去掉C后聚合得來(lái)的;這樣可以減少重復(fù)計(jì)算;當(dāng) 0維度Cuboid計(jì)算出來(lái)的時(shí)候,整個(gè)Cube的計(jì)算也就完成了?! ?/p>
逐層算法
?
如上圖所示,展示了一個(gè)4維的Cube構(gòu)建過(guò)程。
此算法的Mapper和Reducer都比較簡(jiǎn)單。Mapper以上一層Cuboid的結(jié)果(Key-Value對(duì))作為輸入。由于Key是由各維度值拼接在一起,從其中找出要聚合的維度,去掉它的值成新的Key,并對(duì)Value進(jìn)行操作,然后把新Key和Value輸出,進(jìn)而Hadoop MapReduce對(duì)所有新Key進(jìn)行排序、洗牌(shuffle)、再送到Reducer處;Reducer的輸入會(huì)是一組有相同Key的Value集合,對(duì)這些Value做聚合計(jì)算,再結(jié)合Key輸出就完成了一輪計(jì)算。
每一輪的計(jì)算都是一個(gè)MapReduce任務(wù),且串行執(zhí)行; 一個(gè)N維的Cube,至少需要N次MapReduce Job。
Layer Cubing算法優(yōu)點(diǎn)
此算法充分利用了MapReduce的能力,處理了中間復(fù)雜的排序和洗牌工作,故而算法代碼清晰簡(jiǎn)單,易于維護(hù);
受益于Hadoop的日趨成熟,此算法對(duì)集群要求低,運(yùn)行穩(wěn)定;在內(nèi)部維護(hù)Kylin的過(guò)程中,很少遇到在這幾步出錯(cuò)的情況;即便是在Hadoop集群比較繁忙的時(shí)候,任務(wù)也能完成。
Layer Cubing算法缺點(diǎn)
當(dāng)Cube有比較多維度的時(shí)候,所需要的MapReduce任務(wù)也相應(yīng)增加;由于Hadoop的任務(wù)調(diào)度需要耗費(fèi)額外資源,特別是集群較龐大的時(shí)候,反復(fù)遞交任務(wù)造成的額外開(kāi)銷會(huì)相當(dāng)可觀;
由于Mapper不做預(yù)聚合,此算法會(huì)對(duì)Hadoop MapReduce輸出較多數(shù)據(jù); 雖然已經(jīng)使用了Combiner來(lái)減少?gòu)腗apper端到Reducer端的數(shù)據(jù)傳輸,所有數(shù)據(jù)依然需要通過(guò)Hadoop MapReduce來(lái)排序和組合才能被聚合,無(wú)形之中增加了集群的壓力;
對(duì)HDFS的讀寫(xiě)操作較多:由于每一層計(jì)算的輸出會(huì)用做下一層計(jì)算的輸入,這些Key-Value需要寫(xiě)到HDFS上;當(dāng)所有計(jì)算都完成后,Kylin還需要額外的一輪任務(wù)將這些文件轉(zhuǎn)成HBase的HFile格式,以導(dǎo)入到HBase中去;
總體而言,該算法的效率較低,尤其是當(dāng)Cube維度數(shù)較大的時(shí)候;時(shí)常有用戶問(wèn),是否能改進(jìn)Cube算法,縮短時(shí)間。
2 快速Cube算法(Fast Cubing)
快速Cube算法(Fast Cubing)是麒麟團(tuán)隊(duì)對(duì)新算法的一個(gè)統(tǒng)稱,它還被稱作“逐段”(By Segment) 或“逐塊”(By Split) 算法。
該算法的主要思想是,對(duì)Mapper所分配的數(shù)據(jù)塊,將它計(jì)算成一個(gè)完整的小Cube 段(包含所有Cuboid);每個(gè)Mapper將計(jì)算完的Cube段輸出給Reducer做合并,生成大Cube,也就是最終結(jié)果;圖2解釋了此流程。新算法的核心思想是清晰簡(jiǎn)單的,就是最大化利用Mapper端的CPU和內(nèi)存,對(duì)分配的數(shù)據(jù)塊,將需要的組合全都做計(jì)算后再輸出給Reducer;由Reducer再做一次合并(merge),從而計(jì)算出完整數(shù)據(jù)的所有組合。如此,經(jīng)過(guò)一輪Map-Reduce就完成了以前需要N輪的Cube計(jì)算。圖2是此算法的概覽。
在Mapper內(nèi)部, 也可以有一些優(yōu)化,圖3是一個(gè)典型的四維Cube的生成樹(shù);第一步會(huì)計(jì)算Base Cuboid(所有維度都有的組合),再基于它計(jì)算減少一個(gè)維度的組合。基于parent節(jié)點(diǎn)計(jì)算child節(jié)點(diǎn),可以重用之前的計(jì)算結(jié)果;當(dāng)計(jì)算child節(jié)點(diǎn)時(shí),需要parent節(jié)點(diǎn)的值盡可能留在內(nèi)存中; 如果child節(jié)點(diǎn)還有child,那么遞歸向下,所以它是一個(gè)深度優(yōu)先遍歷。當(dāng)有一個(gè)節(jié)點(diǎn)沒(méi)有child,或者它的所有child都已經(jīng)計(jì)算完,這時(shí)候它就可以被輸出,占用的內(nèi)存就可以釋放。
如果內(nèi)存夠的話,可以多線程并行向下聚合。如此可以最大限度地把計(jì)算發(fā)生在Mapper這一端,一方面減少shuffle的數(shù)據(jù)量,另一方面減少Reducer端的計(jì)算量。
Fast Cubing的優(yōu)點(diǎn):
總的IO量比以前大大減少。?
此算法可以脫離Map-Reduce而對(duì)數(shù)據(jù)做Cube計(jì)算,故可以很容易地在其它場(chǎng)景或框架下執(zhí)行,例如Streaming 和Spark。
Fast Cubing的缺點(diǎn):
代碼比以前復(fù)雜了很多: 由于要做多層的聚合,并且引入多線程機(jī)制,同時(shí)還要估算JVM可用內(nèi)存,當(dāng)內(nèi)存不足時(shí)需要將數(shù)據(jù)暫存到磁盤(pán),所有這些都增加復(fù)雜度。
對(duì)Hadoop資源要求較高,用戶應(yīng)盡可能在Mapper上多分配內(nèi)存;如果內(nèi)存很小,該算法需要頻繁借助磁盤(pán),性能優(yōu)勢(shì)就會(huì)較弱。在極端情況下(如數(shù)據(jù)量很大同時(shí)維度很多),任務(wù)可能會(huì)由于超時(shí)等原因失敗;
要讓Fast-Cubing算法獲得更高的效率,用戶需要了解更多一些“內(nèi)情”。
首先,在v1.5里,Kylin在對(duì)Fast-Cubing請(qǐng)求資源時(shí)候,默認(rèn)是為Mapper任務(wù)請(qǐng)求3Gb的內(nèi)存,給JVM2.7Gb。如果Hadoop節(jié)點(diǎn)可用內(nèi)存較多的話,用戶可以讓Kylin獲得更多內(nèi)存:在conf/kylin_job_conf_inmem.xml文件,由參數(shù)“mapreduce.map.memory.mb”和“mapreduce.map.java.opts”設(shè)定 。
其次,需要在并發(fā)性和Mapper端聚合之間找到一個(gè)平衡。在v1.5.2里,Kylin默認(rèn)是給每個(gè)Mapper分配32兆的數(shù)據(jù);這樣可以獲得較高的并發(fā)性。但如果Hadoop集群規(guī)模較小,或可用資源較少,過(guò)多的Mapper會(huì)造成任務(wù)排隊(duì)。這時(shí),將數(shù)據(jù)塊切得更大,如 64兆,效果會(huì)更好。數(shù)據(jù)塊是由Kylin創(chuàng)建Hive平表時(shí)生成的, 在kylin_hive_conf.xml由參數(shù)dfs.block.size決定的。從v1.5.3開(kāi)始,分配策略又有改進(jìn),給每個(gè)mapper會(huì)分配一樣的行數(shù),從而避免數(shù)據(jù)塊不均勻時(shí)的木桶效應(yīng):由conf/kylin.properteis里的“kylin.job.mapreduce.mapper.input.rows”配置,默認(rèn)是100萬(wàn),用戶可以示自己集群的規(guī)模設(shè)置更小值獲得更高并發(fā),或更大值減少請(qǐng)求的Mapper數(shù)。
通常推薦Fast-Cubing 算法,但不是所有情況下都如此。舉例說(shuō)明,如果每個(gè)Mapper之間的key交叉重合度較低,fast cubing更適合;因?yàn)镸apper端將這塊數(shù)據(jù)最終要計(jì)算的結(jié)果都達(dá)到了,Reducer只需少量的聚合。另一個(gè)極端是,每個(gè)Mapper計(jì)算出的key跟其它 Mapper算出的key深度重合,這意味著在reducer端仍需將各個(gè)Mapper的數(shù)據(jù)抓取來(lái)再次聚合計(jì)算;如果key的數(shù)量巨大,該過(guò)程IO開(kāi)銷依然顯著。對(duì)于這種情況,Layered-Cubing更適合。
用戶該如何選擇算法呢?無(wú)需擔(dān)心,Kylin會(huì)自動(dòng)選擇合適的算法。Kylin在計(jì)算Cube之前對(duì)數(shù)據(jù)進(jìn)行采樣,在“fact distinct”步,利用HyperLogLog模擬去重,估算每種組合有多少不同的key,從而計(jì)算出每個(gè)Mapper輸出的數(shù)據(jù)大小,以及所有Mapper之間數(shù)據(jù)的重合度,據(jù)此來(lái)決定采用哪種算法更優(yōu)。在對(duì)上百個(gè)Cube任務(wù)的時(shí)間做統(tǒng)計(jì)分析后,Kylin選擇了7做為默認(rèn)的算法選擇閥值(參數(shù)kylin.cube.algorithm.layer-or-inmem-threshold):如果各個(gè)Mapper的小Cube的行數(shù)之和,大于reduce后的Cube行數(shù)的7倍,采用Layered Cubing, 反之采用Fast Cubing。如果用戶在使用過(guò)程中,更傾向于使用Fast Cubing,可以適當(dāng)調(diào)大此參數(shù)值,反之調(diào)小。
????????????????int mapperNumLimit = kylinConf.getCubeAlgorithmAutoMapperLimit();
????????????????double overlapThreshold = kylinConf.getCubeAlgorithmAutoThreshold(); // default 7
????????????????logger.info("mapperNumber for " + seg + " is " + mapperNumber + " and threshold is " + mapperNumLimit);
????????????????logger.info("mapperOverlapRatio for " + seg + " is " + mapperOverlapRatio + " and threshold is " + overlapThreshold);
????????????????// in-mem cubing is good when
????????????????// 1) the cluster has enough mapper slots to run in parallel
????????????????// 2) the mapper overlap ratio is small, meaning the shuffle of in-mem MR has advantage
????????????????alg = (mapperNumber <= mapperNumLimit && mapperOverlapRatio <= overlapThreshold)//
????????????????????????? CubingJob.AlgorithmEnum.INMEM
????????????????????????: CubingJob.AlgorithmEnum.LAYER;
Kylin Cube 構(gòu)建算法結(jié)論(逐層算法和快速算法):
1、如果每個(gè)Mapper之間的key交叉重合度較低,fast cubing更適合;因?yàn)镸apper端將這塊數(shù)據(jù)最終要計(jì)算的結(jié)果都達(dá)到了,Reducer只需少量的聚合。另一個(gè)極端是,每個(gè)Mapper計(jì)算出的key跟其它 Mapper算出的key深度重合,這意味著在reducer端仍需將各個(gè)Mapper的數(shù)據(jù)抓取來(lái)再次聚合計(jì)算;如果key的數(shù)量巨大,該過(guò)程IO開(kāi)銷依然顯著。對(duì)于這種情況,Layered-Cubing更適合。
2、在對(duì)上百個(gè)Cube任務(wù)的時(shí)間做統(tǒng)計(jì)分析后,Kylin選擇了7做為默認(rèn)的算法選擇閥值(參數(shù)kylin.cube.algorithm.auto.threshold):如果各個(gè)Mapper的小Cube的行數(shù)之和,大于reduce后的Cube行數(shù)的8倍(各個(gè)Mapper的小Cube的行數(shù)之和 /?reduce后的Cube行數(shù) > 7),采用Layered Cubing, 反之采用Fast Cubing(本質(zhì)就是各個(gè)Mapper之間的key重復(fù)度越小,就用Fast Cubing,重復(fù)度越大,就用Layered Cubing)
---------------------?
轉(zhuǎn)載于:https://my.oschina.net/hblt147/blog/3006400
《新程序員》:云原生和全面數(shù)字化實(shí)踐50位技術(shù)專家共同創(chuàng)作,文字、視頻、音頻交互閱讀總結(jié)
以上是生活随笔為你收集整理的kylin KV+cube方案分析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: gRPC入门
- 下一篇: unity2D限制位置的背景移动补偿效果