Hadoop Streaming Programming Examples
(1) For a given language, how should the Mapper and Reducer be written, and what programming conventions must they follow?
(2) How to define a custom Hadoop counter in Hadoop Streaming
(3) How to report custom status information in Hadoop Streaming, so users get feedback on the job's current progress
(4) How to print debug logs in Hadoop Streaming, and where these logs can be viewed
(5) How to use Hadoop Streaming to process binary files rather than just text files
I have covered Hadoop Streaming in several earlier articles; if you are not familiar with it yet, you can read "Hadoop Streaming Programming" and "Advanced Hadoop Streaming Programming" first.
This article focuses on the first four questions and provides WordCount examples written in C++ and Shell for reference.
1. WordCount in C++
(1) Mapper implementation (mapper.cpp)
#include <iostream>
#include <string>
using namespace std;

int main() {
  string key;
  while (cin >> key) {
    cout << key << "\t" << "1" << endl;
    // Increase the counter named counter_no in group counter_group by 1
    cerr << "reporter:counter:counter_group,counter_no,1\n";
    // Report status
    cerr << "reporter:status:processing......\n";
    // Print a log line for testing
    cerr << "This is log, will be printed in stderr file\n";
  }
  return 0;
}
(2) Reducer implementation (reducer.cpp)
#include <iostream>
#include <string>
using namespace std;

int main() { // the reducer is wrapped into a standalone process, so it needs a main function
  string cur_key, last_key, value;
  if (!(cin >> cur_key >> value)) // guard against empty input
    return 0;
  last_key = cur_key;
  int n = 1;
  while (cin >> cur_key) { // read the map tasks' output
    cin >> value;
    if (last_key != cur_key) { // the next key begins
      cout << last_key << "\t" << n << endl;
      last_key = cur_key;
      n = 1;
    } else { // same key: accumulate its value count
      n++;
    }
  }
  cout << last_key << "\t" << n << endl;
  return 0;
}
(3) Compile and run
Compile the two programs:
g++ -o mapper mapper.cpp
g++ -o reducer reducer.cpp
Give them a quick local test:
echo "dong xicheng is here now, talk to dong xicheng now" | ./mapper | sort | ./reducer
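On stdout (the reporter lines on stderr are addressed in the note below), the expected result, assuming sort uses the C locale, looks like this; note that "now," and "now" are counted separately because the mapper splits purely on whitespace:

dong	2
here	1
is	1
now	1
now,	1
talk	1
to	1
xicheng	2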
Note: tested this way, the programs will repeatedly print the strings below, so you may want to comment them out first; on the cluster, Hadoop recognizes these strings:
reporter:counter:counter_group,counter_no,1
reporter:status:processing......
This is log, will be printed in stderr file
Once the test passes, the job can be submitted to the cluster with the following script (run_cpp_mr.sh):
#!/bin/bash
HADOOP_HOME=/opt/yarn-client
INPUT_PATH=/test/input
OUTPUT_PATH=/test/output

echo "Clearing output path: $OUTPUT_PATH"
# "fs -rmr" still works in Hadoop 2.x but is deprecated; "fs -rm -r" is the current form
$HADOOP_HOME/bin/hadoop fs -rmr $OUTPUT_PATH

${HADOOP_HOME}/bin/hadoop jar \
    ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar \
  -files mapper,reducer \
  -input $INPUT_PATH \
  -output $OUTPUT_PATH \
  -mapper mapper \
  -reducer reducer
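After the script finishes, the results can be read straight from HDFS; a quick check, assuming the paths used above (streaming writes its output as part-* files):

sh run_cpp_mr.sh
/opt/yarn-client/bin/hadoop fs -cat /test/output/part-*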
2. WordCount in Shell
(1) Mapper implementation (mapper.sh)
#!/bin/bash
while read LINE; do
  for word in $LINE
  do
    echo "$word 1"
    # In Streaming, a counter is defined by writing
    #   reporter:counter:<group>,<counter>,<amount>
    # to stderr; here the counter named counter_no in
    # group counter_group is increased by 1
    echo "reporter:counter:counter_group,counter_no,1" >&2
    echo "reporter:status:processing......" >&2
    echo "This is log for testing, will be printed in stderr file" >&2
  done
done
(2) Reducer implementation (reducer.sh)
#!/bin/bash
count=0
started=0
word=""
while read LINE; do
  newword=`echo $LINE | cut -d ' ' -f 1`
  if [ "$word" != "$newword" ]; then
    # emit the finished word, unless this is the very first one
    [ $started -ne 0 ] && printf "%s\t%s\n" "$word" "$count"
    word=$newword
    count=1
    started=1
  else
    count=$(( $count + 1 ))
  fi
done
printf "%s\t%s\n" "$word" "$count"
(3) Test and run
Test the two scripts:
echo "dong xicheng is here now, talk to dong xicheng now" | sh mapper.sh | sort | sh reducer.sh
Note: as with the C++ version, this test will repeatedly print the strings below, so you may want to comment them out first; on the cluster, Hadoop recognizes these strings:
reporter:counter:counter_group,counter_no,1
reporter:status:processing......
This is log for testing, will be printed in stderr file
Once the test passes, the job can be submitted to the cluster with the following script (run_shell_mr.sh):
#!/bin/bash
HADOOP_HOME=/opt/yarn-client
INPUT_PATH=/test/input
OUTPUT_PATH=/test/output

echo "Clearing output path: $OUTPUT_PATH"
# "fs -rmr" still works in Hadoop 2.x but is deprecated; "fs -rm -r" is the current form
$HADOOP_HOME/bin/hadoop fs -rmr $OUTPUT_PATH

${HADOOP_HOME}/bin/hadoop jar \
    ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar \
  -files mapper.sh,reducer.sh \
  -input $INPUT_PATH \
  -output $OUTPUT_PATH \
  -mapper "sh mapper.sh" \
  -reducer "sh reducer.sh"
3. Notes on the Programs
In Hadoop Streaming, standard input, standard output, and standard error each have their use: stdin and stdout carry the input data and the processing results respectively, while the meaning of stderr output depends on its content:
(1) If a line written to stderr has the form reporter:counter:group,counter,amount, the Hadoop counter named counter in group group is increased by amount. The first time Hadoop reads a given counter it creates it; after that it looks the counter up in its counter table and adds to its value.
(2) If a line written to stderr has the form reporter:status:message, message is displayed in the web UI or terminal; this can be used for status hints.
(3) Any other stderr output is treated as a debug log, which Hadoop redirects to the stderr file. Note: each task has three log files (stdout, stderr, and syslog), all plain text; you can view them in the web UI, or log in to the node the task ran on and read them in the corresponding directory, as sketched below.
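Exactly where the files live depends on the YARN configuration; a minimal sketch, assuming the default yarn.nodemanager.log-dirs layout (the log directory, application ID, and container ID are placeholders):

# On the node that ran the task (root path set by yarn.nodemanager.log-dirs):
ls <log-dir>/userlogs/application_<appId>/container_<containerId>/
# -> stderr  stdout  syslog

# With log aggregation enabled, fetch all container logs after the job finishes:
yarn logs -applicationId application_<appId>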
One more important point: by default, the key and value in a map task's output are separated by \t; Hadoop splits each line into key and value on \t in both the map and reduce stages and sorts by key, so keep this in mind. Of course, you can specify a different separator through stream.map.output.field.separator, as sketched below.
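A minimal sketch of passing that option, reusing the variables from run_cpp_mr.sh and using ':' purely as an illustrative separator; the generic -D options must come before the streaming-specific ones, and stream.num.map.output.key.fields controls how many separated fields make up the key:

${HADOOP_HOME}/bin/hadoop jar \
    ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar \
  -D stream.map.output.field.separator=: \
  -D stream.num.map.output.key.fields=1 \
  -files mapper,reducer \
  -input $INPUT_PATH \
  -output $OUTPUT_PATH \
  -mapper mapper \
  -reducer reducer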