Hadoop基本流程与应用开发
?http://www.infoq.com/cn/articles/hadoop-process-develop
http://www.infoq.com/cn/articles/hadoop-intro;jsessionid=1B566AFBB2B51A0D70380914DFFC8333
http://www.infoq.com/cn/articles/hadoop-config-tip;jsessionid=1B566AFBB2B51A0D70380914DFFC8333
── 分布式計算開源框架Hadoop入門實踐(三)
Hadoop基本流程http://www.infoq.com/resource/articles/hadoop-process-develop/zh/resources/image1.gif;jsessionid=1B566AFBB2B51A0D70380914DFFC8333
http://www.infoq.com/resource/articles/hadoop-process-develop/zh/resources/image2.gif;jsessionid=1B566AFBB2B51A0D70380914DFFC8333
一個圖片太大了,只好分割成為兩部分。根據流程圖來說一下具體一個任務執行的情況。
業務場景和代碼范例
業務場景描述:可設定輸入和輸出路徑(操作系統的路徑非HDFS路徑),根據訪問日志分析某一個應用訪問某一個API的總次數和總流量,統計后分別輸出到兩個文件中。這里僅僅為了測試,沒有去細分很多類,將所有的類都歸并于一個類便于說明問題。
http://www.infoq.com/resource/articles/hadoop-process-develop/zh/resources/image3.gif;jsessionid=1B566AFBB2B51A0D70380914DFFC8333
測試代碼類圖
LogAnalysiser就是主類,主要負責創建、提交任務,并且輸出部分信息。內部的幾個子類用途可以參看流程中提到的角色職責。具體地看看幾個類和方法的代碼片斷:
LogAnalysiser::MapClass
??? public static class MapClass extends MapReduceBaseimplements Mapper<LongWritable, Text, Text, LongWritable> {public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter)throws IOException{??? String line = value.toString();//沒有配置RecordReader,所以默認采用line的實現,key就是行號,value就是行內容if (line == null || line.equals(""))return;String[] words = line.split(",");if (words == null || words.length < 8)return;String appid = words[1];String apiName = words[2];LongWritable recbytes = new LongWritable(Long.parseLong(words[7]));Text record = new Text();record.set(new StringBuffer("flow::").append(appid).append("::").append(apiName).toString());reporter.progress();output.collect(record, recbytes);//輸出流量的統計結果,通過flow::作為前綴來標示。record.clear();record.set(new StringBuffer("count::").append(appid).append("::").append(apiName).toString());output.collect(record, new LongWritable(1));//輸出次數的統計結果,通過count::作為前綴來標示}??? }LogAnalysiser:: PartitionerClass
??? public static class PartitionerClass implements Partitioner<Text, LongWritable>{public int getPartition(Text key, LongWritable value, int numPartitions){if (numPartitions >= 2)//Reduce 個數,判斷流量還是次數的統計分配到不同的Reduceif (key.toString().startsWith("flow::"))return 0;elsereturn 1;elsereturn 0;}public void configure(JobConf job){}??? }LogAnalysiser:: CombinerClass
參看ReduceClass,通常兩者可以使用一個,不過這里有些不同的處理就分成了兩個。在ReduceClass中藍色的行表示在CombinerClass中不存在。
LogAnalysiser:: ReduceClass
??? public static class ReduceClass extends MapReduceBaseimplements Reducer<Text, LongWritable,Text, LongWritable> {public void reduce(Text key, Iterator<LongWritable> values,OutputCollector<Text, LongWritable> output, Reporter reporter)throws IOException{Text newkey = new Text();newkey.set(key.toString().substring(key.toString().indexOf("::")+2));LongWritable result = new LongWritable();long tmp = 0;int counter = 0;while(values.hasNext())//累加同一個key的統計結果{tmp = tmp + values.next().get();counter = counter +1;//擔心處理太久,JobTracker長時間沒有收到報告會認為TaskTracker已經失效,因此定時報告一下if (counter == 1000){counter = 0;reporter.progress();}}result.set(tmp);output.collect(newkey, result);//輸出最后的匯總結果}??? }LogAnalysiser
public static void main(String[] args){try{run(args);} catch (Exception e){e.printStackTrace();}}public static void run(String[] args) throws Exception{if (args == null || args.length <2){System.out.println("need inputpath and outputpath");return;}String inputpath = args[0];String outputpath = args[1];String shortin = args[0];String shortout = args[1];if (shortin.indexOf(File.separator) >= 0)shortin = shortin.substring(shortin.lastIndexOf(File.separator));if (shortout.indexOf(File.separator) >= 0)shortout = shortout.substring(shortout.lastIndexOf(File.separator));SimpleDateFormat formater = new SimpleDateFormat("yyyy.MM.dd");shortout = new StringBuffer(shortout).append("-").append(formater.format(new Date())).toString();if (!shortin.startsWith("/"))shortin = "/" + shortin;if (!shortout.startsWith("/"))shortout = "/" + shortout;shortin = "/user/root" + shortin;shortout = "/user/root" + shortout; File inputdir = new File(inputpath);File outputdir = new File(outputpath);if (!inputdir.exists() || !inputdir.isDirectory()){System.out.println("inputpath not exist or isn't dir!");return;}if (!outputdir.exists()){new File(outputpath).mkdirs();}JobConf conf = new JobConf(new Configuration(),LogAnalysiser.class);//構建ConfigFileSystem fileSys = FileSystem.get(conf);fileSys.copyFromLocalFile(new Path(inputpath), new Path(shortin));//將本地文件系統的文件拷貝到HDFS中conf.setJobName("analysisjob");conf.setOutputKeyClass(Text.class);//輸出的key類型,在OutputFormat會檢查conf.setOutputValueClass(LongWritable.class); //輸出的value類型,在OutputFormat會檢查conf.setMapperClass(MapClass.class);conf.setCombinerClass(CombinerClass.class);conf.setReducerClass(ReduceClass.class);conf.setPartitionerClass(PartitionerClass.class);conf.set("mapred.reduce.tasks", "2");//強制需要有兩個Reduce來分別處理流量和次數的統計FileInputFormat.setInputPaths(conf, shortin);//hdfs中的輸入路徑FileOutputFormat.setOutputPath(conf, new Path(shortout));//hdfs中輸出路徑Date startTime = new Date();System.out.println("Job started: " + startTime);JobClient.runJob(conf);Date end_time = new Date();System.out.println("Job ended: " + end_time);System.out.println("The job took " + (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");//刪除輸入和輸出的臨時文件fileSys.copyToLocalFile(new Path(shortout),new Path(outputpath));fileSys.delete(new Path(shortin),true);fileSys.delete(new Path(shortout),true);}以上的代碼就完成了所有的邏輯性代碼,然后還需要一個注冊驅動類來注冊業務Class為一個可標示的命令,讓hadoop jar可以執行。
public class ExampleDriver {public static void main(String argv[]){ProgramDriver pgd = new ProgramDriver();try {pgd.addClass("analysislog", LogAnalysiser.class, "A map/reduce program that analysis log .");pgd.driver(argv);}catch(Throwable e){e.printStackTrace();}} }將代碼打成jar,并且設置jar的mainClass為ExampleDriver這個類。在分布式環境啟動以后執行如下語句:
hadoop jar analysiser.jar analysislog /home/wenchu/test-in /home/wenchu/test-out在/home/wenchu/test-in中是需要分析的日志文件,執行后就會看見整個執行過程,包括了Map和Reduce的進度。執行完畢會在/home/wenchu/test-out下看到輸出的內容。有兩個文件:part-00000和part-00001分別記錄了統計后的結果。 如果需要看執行的具體情況,可以看在輸出目錄下的_logs/history/xxxx_analysisjob,里面羅列了所有的Map,Reduce的創建情況以及執行情況。在運行期也可以通過瀏覽器來查看Map,Reduce的情況:http://MasterIP:50030/jobtracker.jsp
Hadoop集群測試
首先這里使用上面的范例作為測試,也沒有做太多的優化配置,這個測試結果只是為了看看集群的效果,以及一些參數配置的影響。
文件復制數為1,blocksize 5M
| Slave數 | 處理記錄數(萬條) | 執行時間(秒) |
| 2 | 95 | 38 |
| 2 | 950 | 337 |
| 4 | 95 | 24 |
| 4 | 950 | 178 |
| 6 | 95 | 21 |
| 6 | 950 | 114 |
Blocksize 5M
| Slave數 | 處理記錄數(萬條) | 執行時間(秒) |
| 2(文件復制數為1) | 950 | 337 |
| 2(文件復制數為3) | 950 | 339 |
| 6(文件復制數為1) | 950 | 114 |
| 6(文件復制數為3) | 950 | 117 |
文件復制數為1
| Slave數 | 處理記錄數(萬條) | 執行時間(秒) |
| 6(blocksize 5M) | 95 | 21 |
| 6(blocksize 77M) | 95 | 26 |
| 4(blocksize 5M) | 950 | 178 |
| 4(blocksize 50M) | 950 | 54 |
| 6(blocksize 5M) | 950 | 114 |
| 6(blocksize 50M) | 950 | 44 |
| 6(blocksize 77M) | 950 | 74 |
測試的數據結果很穩定,基本測幾次同樣條件下都是一樣。通過測試結果可以看出以下幾點:
隨想
“云計算”熱的燙手,就和SAAS、Web2及SNS等一樣,往往都是在搞概念,只有真正踏踏實實的大型互聯網公司,才會投入人力物力去研究符合自己的分布式計算。其實當你的數據量沒有那么大的時候,這種分布式計算也就僅僅只是一個玩具而已,只有在真正解決問題的過程中,它深層次的問題才會被挖掘出來。
這三篇文章(分布式計算開源框架Hadoop介紹,Hadoop中的集群配置和使用技巧)僅僅是為了給對分布式計算有興趣的朋友拋個磚,要想真的掘到金子,那么就踏踏實實的去用、去想、去分析?;蛘咦约阂矔M一步地去研究框架中的實現機制,在解決自己問題的同時,也能夠貢獻一些什么。
前幾日看到有人跪求成為架構師的方式,看了有些可悲,有些可笑,其實有多少架構師知道什么叫做架構?架構師的職責是什么?與其追求這么一個名號,還不如踏踏實實地做塊石頭沉到水底。要知道,積累和沉淀的過程就是一種成長。
相關閱讀:
?
作者介紹:岑文初,就職于阿里軟件公司研發中心平臺一部,任架構師。當前主要工作涉及阿里軟件開發平臺服務框架(ASF)設計與實現,服務集成平臺(SIP)設計與實現。沒有什么擅長或者精通,工作到現在唯一提升的就是學習能力和速度。個人Blog為:http://blog.csdn.net/cenwenchu79。
志愿參與InfoQ中文站內容建設,請郵件至editors@cn.infoq.com。也歡迎大家到InfoQ中文站用戶討論組參與我們的線上討論。
7 條回復
關注此討論 回復
?
圖片是否可以做個鏈接? 發表人 zane dennis 發表于 13/08/2008 07:13 <!-- ww:date name="%{top.creationDate}" format="%{#request['postDateTimeFormat']}" /> -->?
受益了 發表人 lee henry 發表于 14/08/2008 10:45 <!-- ww:date name="%{top.creationDate}" format="%{#request['postDateTimeFormat']}" /> -->?
隨想不錯 發表人 feng jarod 發表于 14/08/2008 05:34 <!-- ww:date name="%{top.creationDate}" format="%{#request['postDateTimeFormat']}" /> -->?
如果要是引用了第三方的jar,要如何存放才能被引用到? 發表人 wang haibin 發表于 25/08/2008 01:54 <!-- ww:date name="%{top.creationDate}" format="%{#request['postDateTimeFormat']}" /> -->?
Re: 如果要是引用了第三方的jar,要如何存放才能被引用到? 發表人 馬 士華 發表于 30/08/2008 10:25 <!-- ww:date name="%{top.creationDate}" format="%{#request['postDateTimeFormat']}" /> -->?
Re: 如果要是引用了第三方的jar,要如何存放才能被引用到? 發表人 馬 士華 發表于 02/09/2008 04:49 <!-- ww:date name="%{top.creationDate}" format="%{#request['postDateTimeFormat']}" /> -->?
有點難的說 發表人 zheng spell 發表于 16/06/2011 10:58 <!-- ww:date name="%{top.creationDate}" format="%{#request['postDateTimeFormat']}" /> -->?
總結
以上是生活随笔為你收集整理的Hadoop基本流程与应用开发的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 编写第一个HADOOP应用程序
- 下一篇: HTML 5 教程