Implementing a Hadoop MapReduce Program in Python
Based on the two posts referenced above, what follows is the run-through on my own Ubuntu machine. The text largely follows the original post "使用Python實現Hadoop MapReduce程序" (retyping everything would be a waste of time).
In this example, I will show you how to write a simple MapReduce program for Hadoop in Python.
Even though the Hadoop framework is written in Java, programs for Hadoop need not be written in Java: languages like C++ and Python work as well. The example program on the Hadoop website is written in Jython and packaged into a JAR file, which is clearly inconvenient, but that is not the only way to do it; we can program against Hadoop directly in Python. Take a look at the example in /src/examples/python/WordCount.py and you will see what I mean.
我們想要做什么?
We will write a simple MapReduce program in C-Python, rather than a Jython program packaged into a JAR file.
Our example mimics WordCount in Python: it reads text files and counts how often each word occurs. The result is also written as text, one word per line followed by its count, with a tab character separating the two.
Prerequisites
Before writing this program, you need a working Hadoop cluster so you won't be left scrambling later on. If you haven't set one up, the tutorial below shows how to build one on Ubuntu Linux (the same steps apply to other Linux/Unix distributions).
For setting up Hadoop in single-node and pseudo-distributed mode on Ubuntu Linux, see the post "Ubuntu上搭建Hadoop環境(單機模式+偽分布模式)" (Setting up a Hadoop environment on Ubuntu: standalone and pseudo-distributed modes).
MapReduce code in Python
The trick to writing MapReduce code in Python is to use Hadoop Streaming, which passes data between the Map and Reduce phases via STDIN (standard input) and STDOUT (standard output). We simply read input with Python's sys.stdin and write output with sys.stdout; Hadoop Streaming takes care of everything else. It really works, believe it or not!
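To make that contract concrete, here is a minimal sketch (not part of the original post) of an identity mapper: any executable that reads lines from sys.stdin and writes lines to sys.stdout can plug into Hadoop Streaming in exactly this way.

#!/usr/bin/env python
# minimal identity "mapper" sketch: echoes every input line unchanged;
# Hadoop Streaming asks nothing more of a script than this
# read-from-STDIN / write-to-STDOUT contract
import sys

for line in sys.stdin:
    sys.stdout.write(line)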
Map: mapper.py
Save the following code in /usr/local/hadoop/mapper.py. It reads data from STDIN, splits each line into words, and writes out a line of the form "<word> 1" for each word occurrence:
Note: make sure this script is executable (chmod +x mapper.py).
#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1)
Note that this script does not compute the total number of times each word occurs; it immediately emits a "<word> 1" pair, even though a given <word> may appear multiple times in the input. The counting is left to the later Reduce step (or program). You are of course free to change this to match your own coding style; one such variation is sketched right below.
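For instance, if you prefer to aggregate inside the mapper itself, a variant along these lines (a sketch of the so-called in-mapper combining pattern, not code from the original post) emits each distinct word only once per mapper input:

#!/usr/bin/env python
# sketch of a mapper variant that pre-aggregates counts locally
# before emitting, so each distinct word is printed only once
import sys

counts = {}
for line in sys.stdin:
    for word in line.strip().split():
        counts[word] = counts.get(word, 0) + 1

# emit the per-mapper subtotals; the reducer then sums subtotals
# instead of a long stream of "<word> 1" pairs
for word, count in counts.items():
    print '%s\t%d' % (word, count)

The trade-off is memory: this variant must hold every distinct word of its input in a dictionary, whereas the one-pair-per-occurrence mapper above is purely streaming.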
Reduce: reducer.py

Save the following code in /usr/local/hadoop/reducer.py. This script reads the results of mapper.py from STDIN, sums up the occurrences of each word, and writes the result to STDOUT.
Again, remember to make the script executable: chmod +x reducer.py
#!/usr/bin/env python

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)
Test your code (cat data | map | sort | reduce)
I recommend testing mapper.py and reducer.py by hand before running them as a MapReduce job; otherwise you may end up with a job that returns no results at all.
Here are some suggestions for how to test the functionality of your Map and Reduce scripts:
hadoop@derekUbun:/usr/local/hadoop$ echo "foo foo quux labs foo bar quux" | ./mapper.py
foo     1
foo     1
quux    1
labs    1
foo     1
bar     1
quux    1
hadoop@derekUbun:/usr/local/hadoop$ echo "foo foo quux labs foo bar quux" | ./mapper.py | sort | ./reducer.py
bar     1
foo     3
labs    1
quux    2
# using one of the ebooks as example input
# (see below on where to get the ebooks)
hadoop@derekUbun:/usr/local/hadoop$ cat book/book.txt | ./mapper.py
subscribe       1
to      1
our     1
email   1
newsletter      1
to      1
hear    1
about   1
new     1
eBooks. 1
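If you would rather drive the same smoke test from Python instead of the shell, a sketch like the following simulates the cat | map | sort | reduce pipeline (it assumes the mapper.py and reducer.py shown above are executable in the current directory):

#!/usr/bin/env python
# drives the local "cat data | map | sort | reduce" smoke test from Python;
# assumes ./mapper.py and ./reducer.py are the executable scripts above
import subprocess

text = "foo foo quux labs foo bar quux"

# run the mapper on the sample text
mapper = subprocess.Popen(['./mapper.py'], stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE)
mapped, _ = mapper.communicate(text)

# sort the intermediate pairs, as Hadoop's shuffle phase would
shuffled = ''.join(sorted(mapped.splitlines(True)))

# run the reducer on the sorted pairs and print the word counts
reducer = subprocess.Popen(['./reducer.py'], stdin=subprocess.PIPE,
                           stdout=subprocess.PIPE)
reduced, _ = reducer.communicate(shuffled)
print reduced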
Running the Python scripts on Hadoop
For this example we need an e-book; put it at /usr/local/hadoop/book/book.txt:
hadoop@derekUbun:/usr/local/hadoop$ ls -l book
total 636
-rw-rw-r-- 1 derek derek 649669 Mar 12 12:22 book.txt
Copy local data to HDFS
Before we run the MapReduce job, we need to copy the local files into HDFS:
hadoop@derekUbun:/usr/local/hadoop$ hadoop dfs -copyFromLocal /usr/local/hadoop/book book
hadoop@derekUbun:/usr/local/hadoop$ hadoop dfs -ls
Found 3 items
drwxr-xr-x   - hadoop supergroup          0 2013-03-12 15:56 /user/hadoop/book
Run the MapReduce job

Now that everything is ready, we can run our Python MapReduce job on the Hadoop cluster. As described above, we use Hadoop Streaming to pass data between the Map and Reduce phases via STDIN and STDOUT.
hadoop@derekUbun:/usr/local/hadoop$ hadoop jar contrib/streaming/hadoop-streaming-1.1.2.jar \
    -mapper /usr/local/hadoop/mapper.py \
    -reducer /usr/local/hadoop/reducer.py \
    -input book/* \
    -output book-output
If you want to change some Hadoop settings on the fly, such as increasing the number of Reduce tasks, you can use the "-jobconf" option:
hadoop@derekUbun:/usr/local/hadoop$ hadoop jar contrib/streaming/hadoop-streaming-1.1.2.jar \
    -jobconf mapred.reduce.tasks=4 \
    -mapper /usr/local/hadoop/mapper.py \
    -reducer /usr/local/hadoop/reducer.py \
    -input book/* \
    -output book-output
If either of the two commands above fails, refer to the variant below. Note that to re-run a job, you must first delete the output directory in DFS.
bin/hadoop jar contrib/streaming/hadoop-streaming-1.1.2.jar \
    -mapper task1/mapper.py \
    -file task1/mapper.py \
    -reducer task1/reducer.py \
    -file task1/reducer.py \
    -input url \
    -output url-output \
    -jobconf mapred.reduce.tasks=3
One important caveat: Hadoop does not honor mapred.map.tasks. The job reads all the files in the HDFS directory book, processes them, and stores the results in separate part files in the HDFS directory book-output. The output of the earlier run was:
hadoop@derekUbun:/usr/local/hadoop$ hadoop jar contrib/streaming/hadoop-streaming-1.1.2.jar -jobconf mapred.reduce.tasks=4 -mapper /usr/local/hadoop/mapper.py -reducer /usr/local/hadoop/reducer.py -input book/* -output book-output
13/03/12 16:01:05 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
packageJobJar: [/usr/local/hadoop/tmp/hadoop-unjar4835873410426602498/] [] /tmp/streamjob5047485520312501206.jar tmpDir=null
13/03/12 16:01:06 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/03/12 16:01:06 WARN snappy.LoadSnappy: Snappy native library not loaded
13/03/12 16:01:06 INFO mapred.FileInputFormat: Total input paths to process : 1
13/03/12 16:01:06 INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hadoop/tmp/mapred/local]
13/03/12 16:01:06 INFO streaming.StreamJob: Running job: job_201303121448_0010
13/03/12 16:01:06 INFO streaming.StreamJob: To kill this job, run:
13/03/12 16:01:06 INFO streaming.StreamJob: /usr/local/hadoop/libexec/../bin/hadoop job  -Dmapred.job.tracker=localhost:9001 -kill job_201303121448_0010
13/03/12 16:01:06 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201303121448_0010
13/03/12 16:01:07 INFO streaming.StreamJob:  map 0%  reduce 0%
13/03/12 16:01:10 INFO streaming.StreamJob:  map 100%  reduce 0%
13/03/12 16:01:17 INFO streaming.StreamJob:  map 100%  reduce 8%
13/03/12 16:01:18 INFO streaming.StreamJob:  map 100%  reduce 33%
13/03/12 16:01:19 INFO streaming.StreamJob:  map 100%  reduce 50%
13/03/12 16:01:26 INFO streaming.StreamJob:  map 100%  reduce 67%
13/03/12 16:01:27 INFO streaming.StreamJob:  map 100%  reduce 83%
13/03/12 16:01:28 INFO streaming.StreamJob:  map 100%  reduce 100%
13/03/12 16:01:29 INFO streaming.StreamJob: Job complete: job_201303121448_0010
13/03/12 16:01:29 INFO streaming.StreamJob: Output: book-output
hadoop@derekUbun:/usr/local/hadoop$
As you can see from the output above, Hadoop also provides a basic web interface for job statistics and information.
While the Hadoop cluster is running, you can point your browser at http://localhost:50030/:
Check whether the results were written to the HDFS directory book-output:
hadoop@derekUbun:/usr/local/hadoop$ hadoop dfs -ls book-output
Found 6 items
-rw-r--r--   2 hadoop supergroup          0 2013-03-12 16:01 /user/hadoop/book-output/_SUCCESS
drwxr-xr-x   - hadoop supergroup          0 2013-03-12 16:01 /user/hadoop/book-output/_logs
-rw-r--r--   2 hadoop supergroup         33 2013-03-12 16:01 /user/hadoop/book-output/part-00000
-rw-r--r--   2 hadoop supergroup         60 2013-03-12 16:01 /user/hadoop/book-output/part-00001
-rw-r--r--   2 hadoop supergroup         54 2013-03-12 16:01 /user/hadoop/book-output/part-00002
-rw-r--r--   2 hadoop supergroup         47 2013-03-12 16:01 /user/hadoop/book-output/part-00003
hadoop@derekUbun:/usr/local/hadoop$
You can use the dfs -cat command to inspect the contents of these files:
hadoop@derekUbun:/usr/local/hadoop$ hadoop dfs -cat book-output/part-00000
about   1
eBooks. 1
the     1
to      2
hadoop@derekUbun:/usr/local/hadoop$
Below are the original (English) author's revised versions of mapper.py and reducer.py:
mapper.py
#!/usr/bin/env python
"""A more advanced Mapper, using Python iterators and generators."""

import sys

def read_input(file):
    for line in file:
        # split the line into words
        yield line.split()

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_input(sys.stdin)
    for words in data:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        for word in words:
            print '%s%s%d' % (word, separator, 1)

if __name__ == "__main__":
    main()
reducer.py
#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""

from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin, separator=separator)
    # groupby groups multiple word-count pairs by word
    # and creates an iterator that returns consecutive keys
    # and their group:
    #   current_word - string containing a word (the key)
    #   group - iterator yielding all ["<current_word>", "<count>"] items
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except ValueError:
            # count was not a number, so silently discard this item
            pass

if __name__ == "__main__":
    main()
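The groupby call is what replaces the hand-written current_word bookkeeping of the first reducer, and it is also why the sort between map and reduce matters: groupby only merges consecutive equal keys. A small illustration with made-up, already-sorted mapper output:

from itertools import groupby
from operator import itemgetter

# hypothetical, already-sorted (word, count) pairs, as the
# reducer would receive them after Hadoop's sort phase
pairs = [('bar', '1'), ('foo', '1'), ('foo', '1'),
         ('foo', '1'), ('labs', '1'), ('quux', '1'), ('quux', '1')]

for word, group in groupby(pairs, itemgetter(0)):
    print word, sum(int(count) for _, count in group)

# prints:
# bar 1
# foo 3
# labs 1
# quux 2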
Summary
That covers the full walkthrough of implementing a Hadoop MapReduce program in Python; I hope it helps you solve the problem you're working on.