Hive Join Strategies hive的连接策略
2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>
Common Join
最為普通的join策略,不受數(shù)據(jù)量的大小影響,也可以叫做reduce side join ,最沒效率的一種join 方式. 它由一個(gè)mapreduce job 完成.
首先將大表和小表分別進(jìn)行map 操作, 在map shuffle 的階段每一個(gè)map output key 變成了table_name_tag_prefix + join_column_value , 但是在進(jìn)行partition 的時(shí)候它仍然只使用join_column_value 進(jìn)行hash.
每一個(gè)reduce 接受所有的map 傳過來的split , 在reducce 的shuffle 階段,它將map output key 前面的table_name_tag_prefix 給舍棄掉進(jìn)行比較. 因?yàn)?span style="color:#FF0000;">reduce 的個(gè)數(shù)可以由小表的大小進(jìn)行決定,所以對于每一個(gè)節(jié)點(diǎn)的reduce 一定可以將小表的split 放入內(nèi)存變成hashtable. 然后將大表的每一條記錄進(jìn)行一條一條的比較.
真正的Join在reduce階段。
MapJoin
Map Join 的計(jì)算步驟分兩步,將小表的數(shù)據(jù)變成hashtable廣播到所有的map 端,將大表的數(shù)據(jù)進(jìn)行合理的切分,然后在map 階段的時(shí)候用大表的數(shù)據(jù)一行一行的去探測(probe) 小表的hashtable. 如果join key 相等,就寫入HDFS.
map join 之所以叫做map join 是因?yàn)樗械墓ぷ鞫荚趍ap 端進(jìn)行計(jì)算.
hive 在map join 上做了幾個(gè)優(yōu)化:
hive 0.6 的時(shí)候默認(rèn)認(rèn)為寫在select 后面的是大表,前面的是小表, 或者使用 /*+mapjoin(map_table) */ 提示進(jìn)行設(shè)定. hive 0.7 的時(shí)候這個(gè)計(jì)算是自動(dòng)化的,它首先會(huì)自動(dòng)判斷哪個(gè)是小表,哪個(gè)是大表,這個(gè)參數(shù)由(hive.auto.convert.join=true)來控制. 然后控制小表的大小由(hive.smalltable.filesize=25000000L)參數(shù)控制(默認(rèn)是25M),當(dāng)小表超過這個(gè)大小,hive 會(huì)默認(rèn)轉(zhuǎn)化成common join. 你可以查看HIVE-1642.
首先小表的Map 階段它會(huì)將自己轉(zhuǎn)化成MapReduce Local Task ,然后從HDFS 取小表的所有數(shù)據(jù),將自己轉(zhuǎn)化成Hashtable file 并壓縮打包放入DistributedCache 里面.
目前hive 的map join 有幾個(gè)限制,一個(gè)是它打算用BloomFilter 來實(shí)現(xiàn)hashtable , BloomFilter 大概比hashtable 省8-10倍的內(nèi)存, 但是BloomFilter 的大小比較難控制.
現(xiàn)在DistributedCache 里面hashtable默認(rèn)的復(fù)制是3份,對于一個(gè)有1000個(gè)map 的大表來說,這個(gè)數(shù)字太小,大多數(shù)map 操作都等著DistributedCache 復(fù)制.
優(yōu)化后的map-join
Converting Common Join into Map Join
判斷誰是大表誰是小表(小表的標(biāo)準(zhǔn)就是size小于hive.mapjoin.smalltable.filesize的值)
Hive在Compile階段的時(shí)候?qū)γ恳粋€(gè)common join會(huì)生成一個(gè)conditional task,并且對于每一個(gè)join table,會(huì)假設(shè)這個(gè)table是大表,生成一個(gè)mapjoin task,然后把這些mapjoin tasks裝進(jìn)
conditional task(List<Task<? extends Serializable>> resTasks),同時(shí)會(huì)映射大表的alias和對應(yīng)的mapjoin task。在runtime運(yùn)行時(shí),resolver會(huì)讀取每個(gè)table alias對應(yīng)的input file size,如果小表的file size比設(shè)定的threshold要低 (hive.mapjoin.smalltable.filesize,默認(rèn)值為25M),那么就會(huì)執(zhí)行converted mapjoin task。對于每一個(gè)mapjoin task同時(shí)會(huì)設(shè)置一個(gè)backup task,就是先前的common join task,一旦mapjoin task執(zhí)行失敗了,則會(huì)啟用backup task
Performance Bottleneck
性能瓶頸
1、Distributed Cache is the potential performance bottleneck
分布式緩存是一個(gè)潛在的性能瓶頸
A、Large hashtable file will slow down the propagation of Distributed Cache
大的hashtable文件將會(huì)減速分布式緩存的傳播
B、Mappers are waiting for the hashtables file from Distributed Cache
Mapper排隊(duì)等待從分布式緩存獲取hashtables(因?yàn)槟J(rèn)一個(gè)hashtable緩存是三份,如果mappers數(shù)量太多需要一個(gè)一個(gè)的等待)
2、Compress and archive all the hashtable file into a tar file.
壓縮和歸檔所有的hashtable文件為一個(gè)tar文件。
Bucket Map Join
Why:
Total table/partition size is big, not good for mapjoin.
How:
set hive.optimize.bucketmapjoin = true;
1. Work together with map join
2. All join tables are bucketized, and each small table?s bucket number can be divided by big table?s bucket number.
所有join的表是bucketized并且小表的bucket數(shù)量是大表bucket數(shù)量的整數(shù)倍
3. Bucket columns == Join columns
hive 建表的時(shí)候支持hash 分區(qū)通過指定clustered by (col_name,xxx ) into number_buckets buckets 關(guān)鍵字.
當(dāng)連接的兩個(gè)表的join key 就是bucket column 的時(shí)候,就可以通過
hive.optimize.bucketmapjoin= true
來控制hive 執(zhí)行bucket map join 了, 需要注意的是你的小表的number_buckets 必須是大表的倍數(shù). 無論多少個(gè)表進(jìn)行連接這個(gè)條件都必須滿足.(其實(shí)如果都按照2的指數(shù)倍來分bucket, 大表也可以是小表的倍數(shù),不過這中間需要多計(jì)算一次,對int 有效,long 和string 不清楚)
Bucket Map Join 執(zhí)行計(jì)劃分兩步,第一步先將小表做map 操作變成hashtable 然后廣播到所有大表的map端,大表的map端接受了number_buckets 個(gè)小表的hashtable并不需要合成一個(gè)大的hashtable,直接可以進(jìn)行map 操作,map 操作會(huì)產(chǎn)生number_buckets 個(gè)split,每個(gè)split 的標(biāo)記跟小表的hashtable 標(biāo)記是一樣的, 在執(zhí)行projection 操作的時(shí)候,只需要將小表的一個(gè)hashtable 放入內(nèi)存即可,然后將大表的對應(yīng)的split 拿出來進(jìn)行判斷,所以其內(nèi)存限制為小表中最大的那個(gè)hashtable 的大小.
Bucket Map Join 同時(shí)也是Map Side Join 的一種實(shí)現(xiàn),所有計(jì)算都在Map 端完成,沒有Reduce 的都被叫做Map Side Join ,Bucket 只是hive 的一種hash partition 的實(shí)現(xiàn),另外一種當(dāng)然是值分區(qū).
create table a (xxx) partition by (col_name)
不過一般hive 中兩個(gè)表不一定會(huì)有同一個(gè)partition key, 即使有也不一定會(huì)是join key. 所以hive 沒有這種基于值的map side join, hive 中的list partition 主要是用來過濾數(shù)據(jù)的而不是分區(qū). 兩個(gè)主要參數(shù)為(hive.optimize.cp = true 和 hive.optimize.pruner=true)
hadoop 源代碼中默認(rèn)提供map side join 的實(shí)現(xiàn), 你可以在hadoop 源碼的src/contrib/data_join/src 目錄下找到相關(guān)的幾個(gè)類. 其中TaggedMapOutput 即可以用來實(shí)現(xiàn)hash 也可以實(shí)現(xiàn)list , 看你自己決定怎么分區(qū). Hadoop Definitive Guide 第8章關(guān)于map side join 和side data distribution 章節(jié)也有一個(gè)例子示例怎樣實(shí)現(xiàn)值分區(qū)的map side join.
上圖解釋:b表是大表,a,c是小表并且都是整數(shù)倍,將a,c表加入內(nèi)存先join然后到每個(gè)b表的map去做匹配。
Sort Merge Bucket Map Join
Why:
No limit on file/partition/table size.
How:
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
1.Work together with bucket map join
將bucket加入到map join中
2.Bucket columns == Join columns == sort columns
Bucket Map?Join?并沒有解決map?join 在小表必須完全裝載進(jìn)內(nèi)存的限制, 如果想要在一個(gè)reduce 節(jié)點(diǎn)的大表和小表都不用裝載進(jìn)內(nèi)存,必須使兩個(gè)表都在join?key 上有序才行,你可以在建表的時(shí)候就指定sorted byjoin?key?或者使用index 的方式.
做法還是兩邊要做hash bucket,而且每個(gè)bucket內(nèi)部要進(jìn)行排序。這樣一來當(dāng)兩邊bucket要做局部join的時(shí)候,只需要用類似merge sort算法中的merge操作一樣把兩個(gè)bucket順序遍歷一遍即可完成,這樣甚至都不用把一個(gè)bucket完整的加載成hashtable,這對性能的提升會(huì)有很大幫助。
set hive.optimize.bucketmapjoin?= true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
Bucket columns ==?Join?columns == sort columns
這樣小表的數(shù)據(jù)可以每次只讀取一部分,然后還是用大表一行一行的去匹配,這樣的join?沒有限制內(nèi)存的大小. 并且也可以執(zhí)行全外連接.
Skew Join
Join bottlenecked on the reducer who gets the
skewed key
set hive.optimize.skewjoin = true;
set hive.skewjoin.key = skew_key_threshold
轉(zhuǎn)載于:https://my.oschina.net/CostBasedOptimizatio/blog/388277
總結(jié)
以上是生活随笔為你收集整理的Hive Join Strategies hive的连接策略的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: find : 路径必须在表达式之前
- 下一篇: android 开发 命名规范