Hadoop Streaming高级编程
本文主要介紹了Hadoop Streaming的一些高級編程技巧,包括,怎樣在mapredue作業中定制輸出輸出格式?怎樣向mapreduce作業中傳遞參數?怎么在mapreduce作業中加載詞典?怎樣利用Hadoop Streamng處理二進制格式的數據等。
關于Hadoop Streaming的基本編程方法,可參考:Hadoop Streaming編程,Hadoop編程實例。
2. 在mapreduce作業中定制輸入輸出格式
Hadoop 0.21.0之前的版本中的Hadoop Streaming工具只支持文本格式的數據,而從Hadoop 0.21.0開始,也支持二進制格式的數據。這里介紹文本文件的輸入輸出格式定制,關于二進制數據的格式,可參考第5節。
Hadoop Streaming提交作業的格式為:
| 1 2 3 | Usage: $HADOOP_HOME/bin/hadoop jar \ $HADOOP_HOME/hadoop-streaming.jar [options] |
其中,-D選項中的一些配置屬性可定義輸入輸出格式,具體如下(注意,對于文本而言,每一行中存在一個key/value對,這里只能定制key和value之間的分割符,而行與行之間的分隔符不可定制,只能是\n):
(1)stream.map.input.field.separator/stream.map.output.field.separator: map task輸入/輸出數據的分隔符,默認均為\t。
(2)stream.num.map.output.key.fields:指定map task輸出記錄中key所占的域數目,如
每一行形式為,Key1\tkey2\tkey3\tvalue,采用默認的分隔符,且stream.num.map.output.key.fields設為2,則Key1\tkey2表示key,key3\tvalue表示value。
(3)stream.reduce.input.field.separator/stream.reduce.output.field.separator:reduce task輸入/輸出數據的分隔符,默認均為\t。
(4)stream.num.reduce.output.key.fields:指定reduce task輸出記錄中key所占的域數目
3. 向mapreduce作業傳遞參數
提交作業時,使用-cmdenv選項以環境變量的形式將你的參數傳遞給mapper/reducer,如:
| 1 2 3 4 5 6 7 8 9 10 11 | $HADOOP_HOME/bin/hadoop jar \ contrib/streaming/hadoop-0.20.2-streaming.jar \ -input input \ -ouput output \ -cmdenv grade=1 \ ……. |
然后編寫mapper或reducer時,使用main函數的第三個參數捕獲你傳入的環境變量,如:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 | int main(int argc, char *argv[], char *env[]){ int i, grade; for (i = 0; env[i] != NULL; i++) if(strncmp(env[i], “grade=”, 6) == 0) grade=atoi(env[i]+6); …… } |
4. 在mapreduce作業中加載詞典
提交作業時,使用-file選項,如:
| 1 2 3 4 5 6 7 8 9 10 11 | $HADOOP_HOME/bin/hadoop jar \ contrib/streaming/hadoop-0.20.2-streaming.jar \ -input input \ -ouput output \ -file dict.txt \ ……. |
然后編寫mapper或reducer時,像本地文件一樣打開并使用dic.txt文件,如:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | int main(int argc, char *argv[], char *env[]){ FILE *fp; char buffer[1024]; fp = fopen("dict.txt","r"); if (!fp) return 1; while (fgets(buffer, 1024, fp)!=NULL) { …… } …… } |
如果要加載非常大的詞典或配置文件,Hadoop Streaming還提供了另外一個選項-files,該選項后面跟的是HDFS上的一個文件(將你的配置文件放到HDFS上,再大也可以!!!),你可以在程序中像打開本地文件一樣打開該文件,此外,你也可以使用#符號在本地建一個系統鏈接,如:
| 1 2 3 4 5 6 7 | $HADOOP_HOME/bin/hadoop jar \ contrib/streaming/hadoop-0.20.2-streaming.jar \ -file? hdfs://host:fs_port/user/dict.txt#dict_link \ ……. |
在代碼中這樣做:
如:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | int main(int argc, char *argv[], char *env[]){ FILE *fp; char buffer[1024]; fp = fopen("dict_link ","r"); //or fp = fopen("dict.txt ","r"); if (!fp) return 1; while (fgets(buffer, 1024, fp)!=NULL) { …… } …… } |
5. 處理二進制格式的數據
從Hadoop 0.21.0開始,streaming支持二進制文件(具體可參考:HADOOP-1722),用戶提交作業時,使用-io選項指明二進制文件格式。0.21.0版本中增加了兩種二進制文件格式,分別為:
(1) rawbytes:key和value均用【4個字節的長度+原始字節】表示
(2) typedbytes:key和value均用【1字節類型+4字節長度+原始字節】表示
用戶提交作業時,如果用-io指定二進制格式為typedbytes,則map的輸入輸出,reduce的輸入輸出均為typedbytes,如果想細粒度的控制這幾個輸入輸出,可采用以下幾個選項:
| 1 2 3 4 5 6 7 | -D stream.map.input=[identifier] -D stream.map.output=[identifier] -D stream.reduce.input=[identifier] -D stream.reduce.output=[identifier] |
你如果采用的python語言,下面是從?HADOOP-1722?中得到的一個例子(里面用到了解析typedbytes的python庫,見:http://github.com/klbostee/typedbytes ):
mapper腳本如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 | import sys import typedbytes input = typedbytes.PairedInput(sys.stdin) output = typedbytes.PairedOutput(sys.stdout) for (key, value) in input: for word in value.split(): output.write((word, 1)) |
reducer腳本:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | import sys import typedbytes from itertools import groupby from operator import itemgetter input = typedbytes.PairedInput(sys.stdin) output = typedbytes.PairedOutput(sys.stdout) for (key, group) in groupby(input, itemgetter(0)): values = map(itemgetter(1), group) output.write((key, sum(values))) |
6. 自定義counter并增加counter的值
用戶采用某種語言編寫的mapper或者reducer可采用標準錯誤輸出(stderr)自定義和改變counter值,格式為:reporter:counter:<group>,<counter>,<amount>,如,在C語言編寫的mapper/reducer中:
| 1 | fprintf(stderr, “reporter:counter:group,counter1,1”); //將組group中的counter1增加1 |
注:用戶定義的自定義counter的最終結果會在桌面或者web界面上顯示出來。
如果你想在mapreduce作業執行過程中,打印一些狀態信息,同樣可使用標準錯誤輸出,格式為:reporter:status:<message>,如,在C語言編寫的mapper/reducer中:
| 1 | fprintf(stderr, “reporter:status:mapreduce job is started…..”); //在shell桌面上打印“mapreduce job is started…..” |
7. 在mapreduce使用Linux Pipes
迄今為止(0.21.0版本之前,包括0.21.0),Hadoop Streaming是不支持Linux Pipes,如:-mapper “cut -f1 | sed s/foo/bar/g”會報”java.io.IOException: Broken pipe”錯誤。
8. 在mapreduce中獲取JobConf的屬性值
在0.21.0版本中,streaming作業執行過程中,JobConf中以mapreduce開頭的屬性(如mapreduce.job.id)會傳遞給mapper和reducer,關于這些參數,可參考:http://hadoop.apache.org/mapreduce/docs/r0.21.0/mapred_tutorial.html#Configured+Parameters
其中,屬性名字中的“.”會變成“_”,如mapreduce.job.id會變為mapreduce_job_id,用戶可在mapper/reducer中獲取這些屬性值直接使用(可能是傳遞給環境變量參數,即main函數的第三個參數,本文作業還未進行驗證)。
9. 一些Hadoop Streaming的開源軟件包
(1) 針對Hadoop Streaming常用操作的C++封裝包(如自定義和更新counter,輸出狀態信息等):https://github.com/dgleich/hadoopcxx
(2) C++實現的typedbytes代碼庫:https://github.com/dgleich/libtypedbytes
(3) python實現的typedbytes代碼庫:?http://github.com/klbostee/typedbytes
(4) Java實現的typedbytes代碼庫(Hadoop 0.21.0代碼中自帶)
10. 總結
Hadoop Streaming使得程序員采用各種語言編寫mapreduce程序變得可能,它具備程序員所需的大部分功能接口,同時由于這種方法編寫mapreduce作業簡單快速,越來越多的程序員開始嘗試使用Hadoop Steraming。
11. 參考資料
http://hadoop.apache.org/mapreduce/docs/r0.21.0/streaming.html
https://issues.apache.org/jira/browse/HADOOP-1722
原創文章,轉載請注明:?轉載自董的博客
本文鏈接地址:?http://dongxicheng.org/mapreduce/hadoop-streaming-advanced-programming/
總結
以上是生活随笔為你收集整理的Hadoop Streaming高级编程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 朴素贝叶斯Naïve Bayes分类算法
- 下一篇: Hadoop pipes设计原理