MapReduce TopK统计加排序
生活随笔
收集整理的這篇文章主要介紹了
MapReduce TopK统计加排序
小編覺得挺不錯的,現在分享給大家,幫大家做個參考。
-
《Hadoop技術內幕》中指出Top K算法有兩步:一是統計詞頻,二是找出詞頻最高的前K個詞。在網上找了很多MapReduce的Top K案例,這些案例都只有排序功能,所以自己寫了個案例。
這個案例分兩個步驟:第一個就是wordCount案例(統計詞頻),第二個就是排序功能(並取出前K個詞)。
一,統計詞頻
view sourceprint? 01.1?package?TopK; 02.2?import?java.io.IOException; 03.3?import?java.util.StringTokenizer; 04.4 05.5?import?org.apache.hadoop.conf.Configuration; 06.6?import?org.apache.hadoop.fs.Path; 07.7?import?org.apache.hadoop.io.IntWritable; 08.8?import?org.apache.hadoop.io.Text; 09.9?import?org.apache.hadoop.mapreduce.Job; 10.10?import?org.apache.hadoop.mapreduce.Mapper; 11.11?import?org.apache.hadoop.mapreduce.Reducer; 12.12?import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 13.13?import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 14.14 15.15?/** 16.16? * 統(tǒng)計(jì)詞頻 17.17? * @author zx 18.18? * zhangxian1991@<a href="http://www.it165.net/qq/" target="_blank" class="keylink">qq</a>.com 19.19? */ 20.20?public?class?WordCount { 21.21???? 22.22?????/** 23.23????? * 讀取單詞 24.24????? * @author zx 25.25????? * 26.26????? */ 27.27?????public?static?class?Map?extends?Mapper<Object,Text,Text,IntWritable>{ 28.28 29.29?????????IntWritable count =?new?IntWritable(1); 30.30???????? 31.31?????????@Override 32.32?????????protected?void?map(Object key, Text value, Context context) 33.33?????????????????throws?IOException, InterruptedException { 34.34?????????????StringTokenizer st =?new?StringTokenizer(value.toString()); 35.35?????????????while(st.hasMoreTokens()){??? 36.36?????????????????String word = st.nextToken().replaceAll("\"",?"").replace("'",?"").replace(".",?""); 37.37?????????????????context.write(new?Text(word), count); 38.38?????????????} 39.39?????????} 40.40???????? 41.41?????} 42.42???? 43.43?????/** 44.44????? * 統(tǒng)計(jì)詞頻 45.45????? * @author zx 46.46????? * 47.47????? 
*/ 48.48?????public?static?class?Reduce?extends?Reducer<Text,IntWritable,Text,IntWritable>{ 49.49 50.50?????????@SuppressWarnings("unused") 51.51?????????@Override 52.52?????????protected?void?reduce(Text key, Iterable<IntWritable> values,Context context) 53.53?????????????????throws?IOException, InterruptedException { 54.54?????????????int?count =?0; 55.55?????????????for?(IntWritable intWritable : values) { 56.56?????????????????count ++; 57.57?????????????} 58.58?????????????context.write(key,new?IntWritable(count)); 59.59?????????} 60.60???????? 61.61?????} 62.62???? 63.63?????@SuppressWarnings("deprecation") 64.64?????public?static?boolean?run(String in,String out)?throws?IOException, ClassNotFoundException, InterruptedException{ 65.65???????? 66.66?????????Configuration conf =?new?Configuration(); 67.67???????? 68.68?????????Job job =?new?Job(conf,"WordCount"); 69.69?????????job.setJarByClass(WordCount.class); 70.70?????????job.setMapperClass(Map.class); 71.71?????????job.setReducerClass(Reduce.class); 72.72???????? 73.73?????????// 設(shè)置Map輸出類(lèi)型 74.74?????????job.setMapOutputKeyClass(Text.class); 75.75?????????job.setMapOutputValueClass(IntWritable.class); 76.76 77.77?????????// 設(shè)置Reduce輸出類(lèi)型 78.78?????????job.setOutputKeyClass(Text.class); 79.79?????????job.setOutputValueClass(IntWritable.class); 80.80 81.81?????????// 設(shè)置輸入和輸出目錄 82.82?????????FileInputFormat.addInputPath(job,?new?Path(in)); 83.83?????????FileOutputFormat.setOutputPath(job,?new?Path(out)); 84.84???????? 85.85?????????return?job.waitForCompletion(true); 86.86?????} 87.87???? 88.88?}二,排序 并求出頻率最高的前K個(gè)詞
view sourceprint? 001.1?package?TopK; 002.2 003.3?import?java.io.IOException; 004.4?import?java.util.Comparator; 005.5?import?java.util.Map.Entry; 006.6?import?java.util.Set; 007.7?import?java.util.StringTokenizer; 008.8?import?java.util.TreeMap; 009.9?import?java.util.regex.Pattern; 010.10 011.11?import?org.apache.hadoop.conf.Configuration; 012.12?import?org.apache.hadoop.fs.Path; 013.13?import?org.apache.hadoop.io.IntWritable; 014.14?import?org.apache.hadoop.io.Text; 015.15?import?org.apache.hadoop.mapreduce.Job; 016.16?import?org.apache.hadoop.mapreduce.Mapper; 017.17?import?org.apache.hadoop.mapreduce.Reducer; 018.18?import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 019.19?import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 020.20?import?org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; 021.21?import?org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 022.22 023.23?/** 024.24? * 以單詞出現(xiàn)的頻率排序 025.25? * 026.26? * @author zx 027.27? * zhangxian1991@<a href="http://www.it165.net/qq/" target="_blank" class="keylink">qq</a>.com 028.28? */ 029.29?public?class?Sort { 030.30 031.31?????/** 032.32????? * 讀取單詞(詞頻 <a href="http://www.it165.net/edu/ebg/" target="_blank" class="keylink">word</a>) 033.33????? * 034.34????? * @author zx 035.35????? * 036.36????? 
*/ 037.37?????public?static?class?Map?extends?Mapper<Object, Text, IntWritable, Text> { 038.38 039.39?????????// 輸出key 詞頻 040.40?????????IntWritable outKey =?new?IntWritable(); 041.41?????????Text outValue =?new?Text(); 042.42 043.43?????????@Override 044.44?????????protected?void?map(Object key, Text value, Context context) 045.45?????????????????throws?IOException, InterruptedException { 046.46 047.47?????????????StringTokenizer st =?new?StringTokenizer(value.toString()); 048.48?????????????while?(st.hasMoreTokens()) { 049.49?????????????????String element = st.nextToken(); 050.50?????????????????if?(Pattern.matches("\\d+", element)) { 051.51?????????????????????outKey.set(Integer.parseInt(element)); 052.52?????????????????}?else?{ 053.53?????????????????????outValue.set(element); 054.54?????????????????} 055.55?????????????} 056.56 057.57?????????????context.write(outKey, outValue); 058.58?????????} 059.59 060.60?????} 061.61 062.62?????/** 063.63????? * 根據(jù)詞頻排序 064.64????? * 065.65????? * @author zx 066.66????? * 067.67????? */ 068.68?????public?static?class?Reduce?extends 069.69?????????????Reducer<IntWritable, Text, Text, IntWritable> { 070.70???????? 071.71?????????private?static?MultipleOutputs<Text, IntWritable> mos =?null; 072.72???????? 073.73?????????//要獲得前K個(gè)頻率最高的詞 074.74?????????private?static?final?int?k =?10; 075.75???????? 076.76?????????//用TreeMap存儲(chǔ)可以利用它的排序功能 077.77?????????//這里用 MyInt 因?yàn)門(mén)reeMap是對(duì)key排序,且不能唯一,而詞頻可能相同,要以詞頻為Key就必需對(duì)它封裝 078.78?????????private?static?TreeMap<MyInt, String> tm =?new?TreeMap<MyInt, String>(new?Comparator<MyInt>(){ 079.79?????????????/** 080.80????????????? * 默認(rèn)是從小到大的順序排的,現(xiàn)在修改為從大到小 081.81????????????? * @param o1 082.82????????????? * @param o2 083.83????????????? * @return 084.84????????????? */ 085.85?????????????@Override 086.86?????????????public?int?compare(MyInt o1, MyInt o2) { 087.87?????????????????return?o2.compareTo(o1); 088.88?????????????} 089.89???????????? 
090.90?????????}) ; 091.91???????? 092.92?????????/* 093.93????????? * 以詞頻為Key是要用到reduce的排序功能 094.94????????? */ 095.95?????????@Override 096.96?????????protected?void?reduce(IntWritable key, Iterable<Text> values, 097.97?????????????????Context context)?throws?IOException, InterruptedException { 098.98?????????????for?(Text text : values) { 099.99?????????????????context.write(text, key); 100.100?????????????????tm.put(new?MyInt(key.get()),text.toString()); 101.101???????????????? 102.102?????????????????//TreeMap以對(duì)內(nèi)部數(shù)據(jù)進(jìn)行了排序,最后一個(gè)必定是最小的 103.103?????????????????if(tm.size() > k){ 104.104?????????????????????tm.remove(tm.lastKey()); 105.105?????????????????} 106.106???????????????? 107.107?????????????} 108.108?????????} 109.109 110.110?????????@Override 111.111?????????protected?void?cleanup(Context context) 112.112?????????????????throws?IOException, InterruptedException { 113.113?????????????String path = context.getConfiguration().get("topKout"); 114.114?????????????mos =?new?MultipleOutputs<Text, IntWritable>(context); 115.115?????????????Set<Entry<MyInt, String>> set = tm.entrySet(); 116.116?????????????for?(Entry<MyInt, String> entry : set) { 117.117?????????????????mos.write("topKMOS",?new?Text(entry.getValue()),?new?IntWritable(entry.getKey().getValue()), path); 118.118?????????????} 119.119?????????????mos.close(); 120.120?????????} 121.121 122.122???????? 123.123???????? 124.124?????} 125.125 126.126?????@SuppressWarnings("deprecation") 127.127?????public?static?void?run(String in, String out,String topKout)?throws?IOException, 128.128?????????????ClassNotFoundException, InterruptedException { 129.129 130.130?????????Path outPath =?new?Path(out); 131.131 132.132?????????Configuration conf =?new?Configuration(); 133.133???????? 134.134?????????//前K個(gè)詞要輸出到哪個(gè)目錄 135.135?????????conf.set("topKout",topKout); 136.136???????? 
137.137?????????Job job =?new?Job(conf,?"Sort"); 138.138?????????job.setJarByClass(Sort.class); 139.139?????????job.setMapperClass(Map.class); 140.140?????????job.setReducerClass(Reduce.class); 141.141 142.142?????????// 設(shè)置Map輸出類(lèi)型 143.143?????????job.setMapOutputKeyClass(IntWritable.class); 144.144?????????job.setMapOutputValueClass(Text.class); 145.145 146.146?????????// 設(shè)置Reduce輸出類(lèi)型 147.147?????????job.setOutputKeyClass(Text.class); 148.148?????????job.setOutputValueClass(IntWritable.class); 149.149 150.150?????????//設(shè)置MultipleOutputs的輸出格式 151.151?????????//這里利用MultipleOutputs進(jìn)行對(duì)文件輸出 152.152?????????MultipleOutputs.addNamedOutput(job,"topKMOS",TextOutputFormat.class,Text.class,Text.class); 153.153???????? 154.154?????????// 設(shè)置輸入和輸出目錄 155.155?????????FileInputFormat.addInputPath(job,?new?Path(in)); 156.156?????????FileOutputFormat.setOutputPath(job, outPath); 157.157?????????job.waitForCompletion(true); 158.158 159.159?????} 160.160 161.161?}自己封裝的Int
view sourceprint? 01.1?package?TopK; 02.2 03.3?public?class?MyInt?implements?Comparable<MyInt>{ 04.4?????private?Integer value; 05.5 06.6?????public?MyInt(Integer value){ 07.7?????????this.value = value; 08.8?????} 09.9???? 10.10?????public?int?getValue() { 11.11?????????return?value; 12.12?????} 13.13 14.14?????public?void?setValue(int?value) { 15.15?????????this.value = value; 16.16?????} 17.17 18.18?????@Override 19.19?????public?int?compareTo(MyInt o) { 20.20?????????return?value.compareTo(o.getValue()); 21.21?????} 22.22???? 23.23???? 24.24?}運(yùn)行入口
view sourceprint? 01.1?package?TopK; 02.2 03.3?import?java.io.IOException; 04.4 05.5?/** 06.6? * 07.7? * @author zx 08.8? *zhangxian1991@qq.com 09.9? */ 10.10?public?class?TopK { 11.11?????public?static?void?main(String args[])?throws?ClassNotFoundException, IOException, InterruptedException{ 12.12???????? 13.13?????????//要統(tǒng)計(jì)字?jǐn)?shù),排序的文字 14.14?????????String in =?"hdfs://localhost:9000/input/MaDing.text"; 15.15???????? 16.16?????????//統(tǒng)計(jì)字?jǐn)?shù)后的結(jié)果 17.17?????????String <a href="http://www.it165.net/edu/ebg/"?target="_blank"?class="keylink">word</a>Cout ="hdfs://localhost:9000/out/wordCout"; 18.18???????? 19.19?????????//對(duì)統(tǒng)計(jì)完后的結(jié)果再排序后的內(nèi)容 20.20?????????String sort =?"hdfs://localhost:9000/out/sort"; 21.21???????? 22.22?????????//前K條 23.23?????????String topK =?"hdfs://localhost:9000/out/topK"; 24.24???????? 25.25?????????//如果統(tǒng)計(jì)字?jǐn)?shù)的job完成后就開(kāi)始排序 26.26?????????if(WordCount.run(in, wordCout)){ 27.27?????????????Sort.run(wordCout, sort,topK); 28.28?????????} 29.29???????? 30.30?????} 31.31?}
總結
以上是生活随笔為你收集整理的MapReduce TopK统计加排序的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: MapReduce的自制Writable
- 下一篇: MapReduce DataJoin 链