WordCount Example
http://www.iteye.com/topic/606962
http://www.iteye.com/topic/1117343
I read through the Hadoop code but still could not follow its execution flow. What to do? Logging came to mind: the Hadoop distribution already ships with Log4j, so I used Log4j to record what happens while the job runs.
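Log4j reads its configuration from a log4j.properties file on the classpath. A minimal sketch of such a configuration, writing INFO-level messages to the console and to a file, is shown below; the file name, level, and pattern are illustrative assumptions, not taken from the original setup:

# Root logger: INFO level, sent to the console and to a log file
log4j.rootLogger=INFO, console, file

# Console appender
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %-5p %c{1} - %m%n

# File appender collecting the Map/Reduce trace messages
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=wordcount-trace.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{ISO8601} %-5p %c{1} - %m%n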
Java code:
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.log4j.Logger;

public class WordCount {

    public static Logger loger = Wloger.loger;

    /**
     * TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
     *
     * [One map task per input file: two files produce two map tasks.]
     * map: reads the input file content, splits it on " \t\n\r\f",
     * and emits a word ==> one key/value pair for each token.
     *
     * @param Object      input key type
     * @param Text        input value type
     * @param Text        output key type
     * @param IntWritable output value type
     *
     * Writable tells the Hadoop framework how to serialize and deserialize an
     * object of that type. WritableComparable adds a compareTo interface on top
     * of Writable, so the framework also knows how to sort such objects.
     *
     * @author yangchunlong.tw
     */
    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            loger.info("Map <key>" + key + "</key>");
            loger.info("Map <value>" + value + "</key>");
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                String wordstr = itr.nextToken();
                word.set(wordstr);
                loger.info("Map <word>" + wordstr + "</word>");
                context.write(word, one);
            }
        }
    }

    /**
     * IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>
     *
     * [No matter how many maps there are, there is only one reduce: the aggregation step.]
     * reduce: iterates over all values for a key and sums the word ==> one pairs.
     *
     * The key here is the word set by the Mapper [reduce is called once per key].
     * When the loop over all keys finishes, the context holds the final result.
     *
     * @author yangchunlong.tw
     */
    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            loger.info("Reduce <key>" + key + "</key>");
            loger.info("Reduce <value>" + values + "</key>");
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            loger.info("Reduce <sum>" + sum + "</sum>");
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        // An input path and an output path are required.
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);          // main class
        job.setMapperClass(TokenizerMapper.class);   // mapper
        job.setCombinerClass(IntSumReducer.class);   // combiner
        job.setReducerClass(IntSumReducer.class);    // reducer
        job.setOutputKeyClass(Text.class);           // job output key type
        job.setOutputValueClass(IntWritable.class);  // job output value type
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));    // input path
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  // output path
        System.exit(job.waitForCompletion(true) ? 0 : 1);  // wait for completion, then exit
    }
}
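The code above references a Wloger helper class that the post never shows. A minimal sketch of what it might look like, reconstructed only from the usage Wloger.loger (the class body is an assumption, not the author's original):

import org.apache.log4j.Logger;

// Hypothetical helper: exposes a shared Log4j logger for the WordCount job.
public class Wloger {
    public static final Logger loger = Logger.getLogger("WordCount");
}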
Every Map call and every Reduce call is now logged. The results are as follows:
Log output:
f1 ==>Map Result
Map <key>0</key>
Map <value>ycl ycl is ycl good</key>
Map <word>ycl</word>
Map <word>ycl</word>
Map <word>is</word>
Map <word>ycl</word>
Map <word>good</word>

f1 ==>Reduce Result
Reduce <key>good</key>
Reduce <value>org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@1dfc547</key>
Reduce <sum>1</sum>
Reduce <key>is</key>
Reduce <value>org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@1dfc547</key>
Reduce <sum>1</sum>
Reduce <key>ycl</key>
Reduce <value>org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@1dfc547</key>
Reduce <sum>3</sum>

f2 ==>Map Result
Map <key>0</key>
Map <value>hello ycl hello lg</key>
Map <word>hello</word>
Map <word>ycl</word>
Map <word>hello</word>
Map <word>lg</word>

f2 ==>Reduce Result
Reduce <key>hello</key>
Reduce <value>org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@10f6d3</key>
Reduce <sum>2</sum>
Reduce <key>lg</key>
Reduce <value>org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@10f6d3</key>
Reduce <sum>1</sum>
Reduce <key>ycl</key>
Reduce <value>org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@10f6d3</key>
Reduce <sum>1</sum>

f1,f2 ==> Reduce Result
Reduce <key>good</key>
Reduce <value>org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@1989f84</key>
Reduce <sum>1</sum>
Reduce <key>hello</key>
Reduce <value>org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@1989f84</key>
Reduce <sum>2</sum>
Reduce <key>is</key>
Reduce <value>org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@1989f84</key>
Reduce <sum>1</sum>
Reduce <key>lg</key>
Reduce <value>org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@1989f84</key>
Reduce <sum>1</sum>
Reduce <key>ycl</key>
Reduce <value>org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@1989f84</key>
Reduce <sum>4</sum>
From this output you can work out how map/reduce executes: with two input files, they are processed file by file, and each file gets its own map. The per-file "Reduce Result" that follows each map is the combiner pass (IntSumReducer was registered via job.setCombinerClass), and at the end a global reduce merges the results of both files. Note also that logging values directly only prints the identity of the ReduceContext$ValueIterable object, not its contents; the values are only consumed inside the for loop.
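To reproduce this trace, the job is packaged into a jar and run with an input and an output directory. The jar name and HDFS paths below are placeholders, not from the original post:

# Hypothetical invocation: f1 and f2 live under /user/hadoop/input
hadoop jar wordcount.jar WordCount /user/hadoop/input /user/hadoop/output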