eclipse中hadoop2.3.0环境部署及在eclipse中直接提交mapreduce任务
轉(zhuǎn)自:http://my.oschina.net/mkh/blog/340112
1?eclipse中hadoop環(huán)境部署概覽
?????eclipse中部署hadoop包括兩大部分:hdfs環(huán)境部署和mapreduce任務(wù)執(zhí)行環(huán)境部署。一般hdfs環(huán)境部署比較簡單,部署后就 可以在eclipse中像操作windows目錄一樣操作hdfs文件。而mapreduce任務(wù)執(zhí)行環(huán)境的部署就比較復(fù)雜一點,不同版本對環(huán)境的要求度 高低不同就導(dǎo)致部署的復(fù)雜度大相徑庭。例如hadoop1包括以前的版本部署就比較簡單,可在windows和Linux執(zhí)行部署運行,而hadoop2 及以上版本對環(huán)境要求就比較嚴(yán)格,一般只能在Linux中部署,如果需要在windows中部署需要使用cygwin等軟件模擬Linux環(huán)境,該篇介紹在Linux環(huán)境中部署hadoop環(huán)境。該篇假設(shè)hadoop2.3.0集群已經(jīng)部署完成,集群訪問權(quán)限為hadoop用戶。這種在eclipse上操作hdfs和提交mapreduce任務(wù)的方式為hadoop客戶端操作,故無須在該機器上配置hadoop集群文件,也無須在該機器上啟動hadoop相關(guān)進程。
2?部署環(huán)境機器相關(guān)配置
-
Centos6,32位
-
Hadoop2.3.0
-
Eclipse4.3.2_jee Linux版
-
JDK1.7 Linux版
3 eclipse中hdfs及mapreduce環(huán)境部署
?????3.1?Linux中eclipse安裝
??????????? 3.1.1 在Linux中選擇一個eclipse安裝目錄如/home目錄,將eclipse壓縮包eclipse-standard-kepler-SR2-linux-gtk.tar.gz在該目錄下解壓即可,解壓命令如下:
????????????????tar -zxvf eclipse-standard-kepler-SR2-linux-gtk.tar.gz
?????????? 3.1.2? 解壓后的eclipse目錄需要賦予hadoop用戶權(quán)限chown -R hadoop:hadoop /home/eclipse,解壓后eclipse目錄如下圖所示:
????????????
????????3.1.3 將自己打包或者下載的hadoop和eclipse直接的插件導(dǎo)入eclipse的 plugins目錄(復(fù)制進去即可),該篇使用直接下載的插件hadoop-eclipse-plugin-2.2.0.jar,然后啟動eclipse。
????3.2?eclipse環(huán)境部署
????????3.2.1??? 打開eclipse后切換到mapreduce界面會出現(xiàn)mapreduce插件圖標(biāo),一個是DFS顯示的位置,一個是mapreduce顯示的位置,具體如下圖所示:
????
????????3.2.2?在MapReduce?Locations出處點擊右鍵新建mapreduce配置環(huán)境,具體圖示如下:
????????3.2.3?進入mapreduce配置環(huán)境,具體如下圖所示。其中,Location?name可任意填寫,Mapreduce?Master中Host為resourcemanager機器ip,Port為resourcemanager接受任務(wù)的端口號,即yarn-site.xml文件中yarn.resourcemanager.scheduler.address配置項中端口號。DFS?Master中的Host為namenode機器ip,Port為core-site.xml文件中fs.defaultFS配置項中端口號。
????????3.2.4 上一步驟配置完成后,我們看到的界面如下圖所示。左側(cè)欄中即為hdfs目錄,在每個目錄上課點擊右鍵操作。
?4?eclipse中直接提交mapreduce任務(wù)(此處以wordcount為例,同時注意hadoop集群防火墻需對該機器開放相應(yīng)端口)
????如果我們將hadoop自帶的wordcount在eclipse中執(zhí)行是不可以的,調(diào)整后具體操作如下。
????4.1?首先新建Map/Reduce工程(無須手動導(dǎo)入hadoop jar包),或者新建java工程(需要手動導(dǎo)入hadoop相應(yīng)jar包)。
????????4.1.1 新建Map/Reduce工程(無須手動導(dǎo)入hadoop jar包),具體圖示如下圖所示:
?????????4.1.1.1 點擊next輸入hadoop工程名即可,具體如下圖所示:
?????????4.1.1.2 新建的hadoop工程如下圖所示:
?????????4.1.2 新建java工程(需要手動導(dǎo)入hadoop相應(yīng)jar包),具體如下圖所示:
????????????4.1.2.1 新建java工程完成后,下面添加hadoop相應(yīng)jar包,hadoop2.3.0相應(yīng)jar包在/hadoop-2.3.0/share/hadoop目錄中。
????????????4.1.2.2?進入Libraries,點擊Add?Library添加hadoop相應(yīng)jar包。
????????????4.1.2.3?新建hadoop相應(yīng)library成功后添加hadoop相應(yīng)jar包到該library下面即可。
????????????4.1.2.4?需要添加的hadoop相應(yīng)jar包有:
????????????????/hadoop-2.3.0/share/hadoop/common下所有jar包,及里面的lib目錄下所有jar包
????????????????/hadoop-2.3.0/share/hadoop/hdfs下所有jar包,不包括里面lib下的jar包
????????????????/hadoop-2.3.0/share/hadoop/mapreduce下所有jar包,不包括里面lib下的jar包
????????????????/hadoop-2.3.0/share/hadoop/yarn下所有jar包,不包括里面lib下的jar包
????????4.2?eclipse直接提交mapreduce任務(wù)所需環(huán)境配置代碼如下所示:
?| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 | import?java.io.File; import?java.io.FileInputStream; import?java.io.FileOutputStream; import?java.io.IOException; import?java.net.URL; import?java.net.URLClassLoader; import?java.util.ArrayList; import?java.util.Iterator; import?java.util.List; import?java.util.jar.JarEntry; import?java.util.jar.JarOutputStream; import?java.util.jar.Manifest; public?class?EJob?{ ????//?To?declare?global?field ????private?static?List<URL>?classPath?=?new?ArrayList<URL>(); ????//?To?declare?method ????public?static?File?createTempJar(String?root)?throws?IOException?{ ????????if?(!new?File(root).exists())?{ ????????????return?null; ????????} ????????Manifest?manifest?=?new?Manifest(); ????????manifest.getMainAttributes().putValue("Manifest-Version",?"1.0"); ????????final?File?jarFile?=?File.createTempFile("EJob-",?".jar",?new?File(System.getProperty("java.io.tmpdir"))); ????????Runtime.getRuntime().addShutdownHook(new?Thread()?{ ????????????public?void?run()?{ ????????????????jarFile.delete(); ????????????} ????????}); ????????JarOutputStream?out?=?new?JarOutputStream(new?FileOutputStream(jarFile),?manifest); ????????createTempJarInner(out,?new?File(root),?""); ????????out.flush(); ????????out.close(); ????????return?jarFile; ????} ????private?static?void?createTempJarInner(JarOutputStream?out,?File?f, ????????????String?base)?throws?IOException?{ ????????if?(f.isDirectory())?{ ????????????File[]?fl?=?f.listFiles(); ????????????if?(base.length()?>?0)?{ ????????????????base?=?base?+?"/"; ????????????} ????????????for?(int?i?=?0;?i?<?fl.length;?i++)?{ ????????????????createTempJarInner(out,?fl[i],?base?+?fl[i].getName()); ????????????} ????????}?else?{ ????????????out.putNextEntry(new?JarEntry(base)); ????????????FileInputStream?in?=?new?FileInputStream(f); ????????????byte[]?buffer?=?new?byte[1024]; ????????????int?n?=?in.read(buffer); ????????????while?(n?!=?-1)?{ ????????????????out.write(buffer,?0,?n); ????????????????n?=?in.read(buffer); ????????????} ????????????in.close(); ????????} ????} ????public?static?ClassLoader?getClassLoader()?{ ????????ClassLoader?parent?=?Thread.currentThread().getContextClassLoader(); ????????if?(parent?==?null)?{ ????????????parent?=?EJob.class.getClassLoader(); ????????} ????????if?(parent?==?null)?{ ????????????parent?=?ClassLoader.getSystemClassLoader(); ????????} ????????return?new?URLClassLoader(classPath.toArray(new?URL[0]),?parent); ????} ????public?static?void?addClasspath(String?component)?{ ????????if?((component?!=?null)?&&?(component.length()?>?0))?{ ????????????try?{ ????????????????File?f?=?new?File(component); ????????????????if?(f.exists())?{ ????????????????????URL?key?=?f.getCanonicalFile().toURL(); ????????????????????if?(!classPath.contains(key))?{ ????????????????????????classPath.add(key); ????????????????????} ????????????????} ????????????}?catch?(IOException?e)?{ ????????????} ????????} ????} } |
????????4.3?修改后的wordcount代碼如下
?| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 | import?java.io.File; import?java.io.IOException; import?java.text.SimpleDateFormat; import?java.util.Date; import?java.util.StringTokenizer; import?org.apache.hadoop.conf.Configuration; import?org.apache.hadoop.fs.Path; import?org.apache.hadoop.fs.permission.FsPermission; import?org.apache.hadoop.io.IntWritable; import?org.apache.hadoop.io.Text; import?org.apache.hadoop.mapred.JobConf; import?org.apache.hadoop.mapreduce.Job; import?org.apache.hadoop.mapreduce.Mapper; import?org.apache.hadoop.mapreduce.Reducer; import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import?org.apache.hadoop.util.GenericOptionsParser; public?class?WordCount?{ ????/* ?????*?用戶自定義map函數(shù),對以<key,?value>為輸入的結(jié)果文件進行處理 ?????*?Map過程需要繼承org.apache.hadoop.mapreduce包中Mapper類,并重寫其map方法。 ?????*?通過在map方法中添加兩句把key值和value值輸出到控制臺的代碼 ?????*?,可以發(fā)現(xiàn)map方法中value值存儲的是文本文件中的一行(以回車符為行結(jié)束標(biāo)記),而key值為該行的首字母相對于文本文件的首地址的偏移量。 ?????*?然后StringTokenizer類將每一行拆分成為一個個的單詞 ?????*?,并將<word,1>作為map方法的結(jié)果輸出,其余的工作都交有MapReduce框架處理。?每行數(shù)據(jù)調(diào)用一次?Tokenizer:單詞分詞器 ?????*/ ????public?static?class?TokenizerMapper?extends ????????????Mapper<Object,?Text,?Text,?IntWritable>?{ ????????private?final?static?IntWritable?one?=?new?IntWritable(1); ????????private?Text?word?=?new?Text(); ????????/* ?????????*?重寫Mapper類中的map方法 ?????????*/ ????????public?void?map(Object?key,?Text?value,?Context?context) ????????????????throws?IOException,?InterruptedException?{ ????????????StringTokenizer?itr?=?new?StringTokenizer(value.toString()); ????????????//System.out.println(value.toString()); ????????????while?(itr.hasMoreTokens())?{ ????????????????word.set(itr.nextToken());//?獲取下個字段的值并寫入文件 ????????????????context.write(word,?one); ????????????} ????????} ????} ????/* ?????*?用戶自定義reduce函數(shù),如果有多個熱度測,則每個reduce處理自己對應(yīng)的map結(jié)果數(shù)據(jù) ?????*?Reduce過程需要繼承org.apache.hadoop.mapreduce包中Reducer類,并重寫其reduce方法。 ?????*?Map過程輸出<key,values>中key為單個單詞,而values是對應(yīng)單詞的計數(shù)值所組成的列表,Map的輸出就是Reduce的輸入, ?????*?所以reduce方法只要遍歷values并求和,即可得到某個單詞的總次數(shù)。 ?????*/ ????public?static?class?IntSumReducer?extends ????????????Reducer<Text,?IntWritable,?Text,?IntWritable>?{ ????????private?IntWritable?result?=?new?IntWritable(); ????????public?void?reduce(Text?key,?Iterable<IntWritable>?values, ????????????????Context?context)?throws?IOException,?InterruptedException?{ ????????????int?sum?=?0; ????????????for?(IntWritable?val?:?values)?{ ????????????????sum?+=?val.get(); ????????????} ????????????result.set(sum); ????????????context.write(key,?result); ????????} ????} ????public?static?void?main(String[]?args)?throws?Exception?{ ????????/** ?????????*?環(huán)境變量配置 ?????????*/ ????????File?jarFile?=?EJob.createTempJar("bin"); ????????ClassLoader?classLoader?=?EJob.getClassLoader(); ????????Thread.currentThread().setContextClassLoader(classLoader); ????????/** ?????????*?連接hadoop集群配置 ?????????*/ ????????Configuration?conf?=?new?Configuration(true); ????????conf.set("fs.default.name",?"hdfs://192.168.1.111:9000"); ????????conf.set("hadoop.job.user",?"hadoop"); ????????conf.set("mapreduce.framework.name",?"yarn"); ????????conf.set("mapreduce.jobtracker.address",?"192.168.1.100:9001"); ????????conf.set("yarn.resourcemanager.hostname",?"192.168.1.100"); ????????conf.set("yarn.resourcemanager.admin.address",?"192.168.1.100:8033"); ????????conf.set("yarn.resourcemanager.address",?"192.168.1.100:8032"); ????????conf.set("yarn.resourcemanager.resource-tracker.address",?"192.168.1.100:8036"); ????????conf.set("yarn.resourcemanager.scheduler.address",?"192.168.1.100:8030"); ????????String[]?otherArgs?=?new?String[2]; ????????otherArgs[0]?=?"hdfs://192.168.1.111:9000/test_in";//計算原文件目錄,需提前在里面存入文件 ????????String?time?=?new?SimpleDateFormat("yyyyMMddHHmmss").format(new?Date()); ????????otherArgs[1]?=?"hdfs://192.168.1.111:9000/test_out/"?+?time;//計算后的計算結(jié)果存儲目錄,每次程序執(zhí)行的結(jié)果目錄不能相同,所以添加時間標(biāo)簽 ????????/* ?????????*?setJobName()方法命名這個Job。對Job進行合理的命名有助于更快地找到Job, ?????????*?以便在JobTracker和Tasktracker的頁面中對其進行監(jiān)視 ?????????*/ ????????Job?job?=?new?Job(conf,?"word?count"); ????????job.setJarByClass(WordCount.class); ????????((JobConf)?job.getConfiguration()).setJar(jarFile.toString());//環(huán)境變量調(diào)用,添加此句則可在eclipse中直接提交mapreduce任務(wù),如果將該java文件打成jar包,需要將該句注釋掉,否則在執(zhí)行時反而找不到環(huán)境變量 ????????//?job.setMaxMapAttempts(100);//設(shè)置最大試圖產(chǎn)生底map數(shù)量,該命令不一定會設(shè)置該任務(wù)運行過車中的map數(shù)量 ????????//?job.setNumReduceTasks(5);//設(shè)置reduce數(shù)量,即最后生成文件的數(shù)量 ????????/* ?????????*?Job處理的Map(拆分)、Combiner(中間結(jié)果合并)以及Reduce(合并)的相關(guān)處理類。 ?????????*?這里用Reduce類來進行Map產(chǎn)生的中間結(jié)果合并,避免給網(wǎng)絡(luò)數(shù)據(jù)傳輸產(chǎn)生壓力。 ?????????*/ ????????job.setMapperClass(TokenizerMapper.class);//?執(zhí)行用戶自定義map函數(shù) ????????job.setCombinerClass(IntSumReducer.class);//?對用戶自定義map函數(shù)的數(shù)據(jù)處理結(jié)果進行合并,可以減少帶寬消耗 ????????job.setReducerClass(IntSumReducer.class);//?執(zhí)行用戶自定義reduce函數(shù) ????????/* ?????????*?接著設(shè)置Job輸出結(jié)果<key,value>的中key和value數(shù)據(jù)類型,因為結(jié)果是<單詞,個數(shù)>, ?????????*?所以key設(shè)置為"Text"類型,相當(dāng)于Java中String類型 ?????????*?。Value設(shè)置為"IntWritable",相當(dāng)于Java中的int類型。 ?????????*/ ????????job.setOutputKeyClass(Text.class); ????????job.setOutputValueClass(IntWritable.class); ????????/* ?????????*?加載輸入文件夾或文件路徑,即輸入數(shù)據(jù)的路徑 ?????????*?將輸入的文件數(shù)據(jù)分割成一個個的split,并將這些split分拆成<key,value>對作為后面用戶自定義map函數(shù)的輸入 ?????????*?其中,每個split文件的大小盡量小于hdfs的文件塊大小 ?????????*?(默認64M),否則該split會從其它機器獲取超過hdfs塊大小的剩余部分?jǐn)?shù)據(jù),這樣就會產(chǎn)生網(wǎng)絡(luò)帶寬造成計算速度影響 ?????????*?默認使用TextInputFormat類型,即輸入數(shù)據(jù)形式為文本類型數(shù)據(jù)文件 ?????????*/ ????????System.out.println("Job?start!"); ????????FileInputFormat.addInputPath(job,?new?Path(otherArgs[0])); ????????/* ?????????*?設(shè)置輸出文件路徑?默認使用TextOutputFormat類型,即輸出數(shù)據(jù)形式為文本類型文件,字段間默認以制表符隔開 ?????????*/ ????????FileOutputFormat.setOutputPath(job,?new?Path(otherArgs[1])); ????????/* ?????????*?開始運行上面的設(shè)置和算法 ?????????*/ ????????if?(job.waitForCompletion(true))?{ ????????????System.out.println("ok!"); ????????}?else?{ ????????????System.out.println("error!"); ????????????System.exit(0); ????????} ????} } |
????????4.4?在eclipse中代碼區(qū)點擊右鍵,點擊里面的run?on?hadoop即可運行該程序。
總結(jié)
以上是生活随笔為你收集整理的eclipse中hadoop2.3.0环境部署及在eclipse中直接提交mapreduce任务的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: jvm的参数含义及设置
- 下一篇: hadoop2.x常用端口及定义方法