Hadoop入门(九)Mapreduce高级shuffle之Combiner
一、Combiner的出現(xiàn)
(1)為什么需要進(jìn)行Map規(guī)約操
作
在上述過程中,我們看到至少兩個(gè)性能瓶頸:
(1)如果我們有10億個(gè)數(shù)據(jù),Mapper會(huì)生成10億個(gè)鍵值對(duì)在網(wǎng)絡(luò)間進(jìn)行傳輸,但如果我們只是對(duì)數(shù)據(jù)求最大值,那么很明顯的Mapper只需要輸出它所知道的最大值即可。這樣做不僅可以減輕網(wǎng)絡(luò)壓力,同樣也可以大幅度提高程序效率。
總結(jié):網(wǎng)絡(luò)帶寬嚴(yán)重被占降低程序效率;
(2)假設(shè)使用美國專利數(shù)據(jù)集中的國家一項(xiàng)來闡述數(shù)據(jù)傾斜這個(gè)定義,這樣的數(shù)據(jù)遠(yuǎn)遠(yuǎn)不是一致性的或者說平衡分布的,由于大多數(shù)專利的國家都屬于美國,這樣不僅Mapper中的鍵值對(duì)、中間階段(shuffle)的鍵值對(duì)等,大多數(shù)的鍵值對(duì)最終會(huì)聚集于一個(gè)單一的Reducer之上,壓倒這個(gè)Reducer,從而大大降低程序的性能。
總結(jié):單一節(jié)點(diǎn)承載過重降低程序性能;
(2)一種方案能夠解決這兩個(gè)問題呢?
在MapReduce編程模型中,在Mapper和Reducer之間有一個(gè)非常重要的組件,它解決了上述的性能瓶頸問題,它就是Combiner。
①與mapper和reducer不同的是,combiner沒有默認(rèn)的實(shí)現(xiàn),需要顯式的設(shè)置在conf中才有作用。
②并不是所有的job都適用combiner,只有操作滿足結(jié)合律的才可設(shè)置combiner。
combine操作類似于:opt(opt(1, 2, 3), opt(4, 5, 6))。如果opt為求和、求最大值的話,可以使用,但是如果是求中值的話,不適用。
?
?
二、Combiner的作用
當(dāng)Map程序開始產(chǎn)生結(jié)果的時(shí)候,并不是直接寫到文件的,而是利用緩存做一些排序方面的預(yù)處理操作。
每個(gè)Map任務(wù)都有一個(gè)循環(huán)內(nèi)存緩沖區(qū)(默認(rèn)100MB),當(dāng)緩存的內(nèi)容達(dá)到80%時(shí),后臺(tái)線程開始將內(nèi)容寫到文件,此時(shí)Map任務(wù)可以持續(xù)輸出結(jié)果,但如果緩沖區(qū)滿了,Map任務(wù)則需要等待。
寫文件使用round-robin方式。在寫入文件之前,先將數(shù)據(jù)按照Reduce進(jìn)行分區(qū)。對(duì)于每一個(gè)分區(qū),都會(huì)在內(nèi)存中根據(jù)key進(jìn)行排序,如果配置了Combiner,則排序后執(zhí)行Combiner(Combine之后可以減少寫入文件和傳輸?shù)臄?shù)據(jù))。
每次結(jié)果達(dá)到緩沖區(qū)的閥值時(shí),都會(huì)創(chuàng)建一個(gè)文件,在Map結(jié)束時(shí),可能會(huì)產(chǎn)生大量的文件。在Map完成前,會(huì)將這些文件進(jìn)行合并和排序。如果文件的數(shù)量超過3個(gè),則合并后會(huì)再次運(yùn)行Combiner(1、2個(gè)文件就沒有必要了)。
(1)MapReduce的一種優(yōu)化手段
每一個(gè)map都可能會(huì)產(chǎn)生大量的本地輸出,Combiner的作用就是對(duì)map端的輸出先做一次合并,以減少在map和reduce節(jié)點(diǎn)之間的數(shù)據(jù)傳輸量,以提高網(wǎng)絡(luò)IO性能
(2)Combiner的過程
1)Combiner實(shí)現(xiàn)本地key的聚合,對(duì)map輸出的key排序value進(jìn)行迭代
? ? ? ?如下所示:
? ? ?map: (K1, V1) → list(K2, V2)? combine: (K2, list(V2)) → list(K2, V2)? reduce: (K2, list(V2)) → list(K3, V3)
2)Combiner還有本地reduce功能(其本質(zhì)上就是一個(gè)reduce)
? ? ? ? ?例如wordcount的例子和找出value的最大值的程序
? ? ? ? ? combiner和reduce完全一致,如下所示:
? ? ? ? ? map: (K1, V1) → list(K2, V2)? ? ? ?combine: (K2, list(V2)) → list(K3, V3)? ? ? ?reduce: (K3, list(V3)) → list(K4, V4)
使用combiner之后,先完成的map會(huì)在本地聚合,提升速度。對(duì)于hadoop自帶的wordcount的例子,value就是一個(gè)疊加的數(shù)字,所以map一結(jié)束就可以進(jìn)行reduce的value疊加,而不必要等到所有的map結(jié)束再去進(jìn)行reduce的value疊加。
(3)融合Combiner的MapReduce
1)使用MyReducer作為Combiner
// 設(shè)置Map規(guī)約Combiner
? ? job.setCombinerClass(MyReducer.class);
執(zhí)行后看到map的輸出和combine的輸入統(tǒng)計(jì)是一致的,而combine的輸出與reduce的輸入統(tǒng)計(jì)是一樣的。
由此可以看出規(guī)約操作成功,而且執(zhí)行在map的最后,reduce之前。
2)自己定義Combiner
public static class MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {protected void reduce(Text key, java.lang.Iterable<LongWritable> values,org.apache.hadoop.mapreduce.Reducer<Text, LongWritable, Text, LongWritable>.Context context)throws java.io.IOException, InterruptedException {// 顯示次數(shù)表示規(guī)約函數(shù)被調(diào)用了多少次,表示k2有多少個(gè)分組System.out.println("Combiner輸入分組<" + key.toString() + ",N(N>=1)>");long count = 0L;for (LongWritable value : values) {count += value.get();// 顯示次數(shù)表示輸入的k2,v2的鍵值對(duì)數(shù)量System.out.println("Combiner輸入鍵值對(duì)<" + key.toString() + ",”+ value.get() + ">");}context.write(key, new LongWritable(count));// 顯示次數(shù)表示輸出的k2,v2的鍵值對(duì)數(shù)量System.out.println("Combiner輸出鍵值對(duì)<" + key.toString() + "," + count + ">");};}3)添加設(shè)置Combiner的代碼
// 設(shè)置Map規(guī)約Combiner
job.setCombinerClass(MyCombiner.class);
?
小結(jié): 在實(shí)際的Hadoop集群操作中,我們是由多臺(tái)主機(jī)一起進(jìn)行MapReduce的, 如果加入規(guī)約操作,每一臺(tái)主機(jī)會(huì)在reduce之前進(jìn)行一次對(duì)本機(jī)數(shù)據(jù)的規(guī)約, 然后在通過集群進(jìn)行reduce操作,這樣就會(huì)大大節(jié)省reduce的時(shí)間, 從而加快MapReduce的處理速度
?
?
總結(jié)
以上是生活随笔為你收集整理的Hadoop入门(九)Mapreduce高级shuffle之Combiner的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 路由器的分类有哪些路由器有哪些类型
- 下一篇: Hadoop入门(八)Mapreduce