Hadoop 2.6.1 configuration on CentOS 7
Three ways to upload (distribute) files to the compute nodes:
1. Local: the -file option, for shipping small local files with the job.
Example:
-file ./test
run.sh:

```bash
HADOOP_CMD="/usr/local/src/hadoop-2.6.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"

INPUT_FILE_PATH_1="/The_Man_of_Property.txt"
OUTPUT_PATH="/output_file_broadcast"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py mapper_func white_list" \
    -reducer "python red.py reduer_func" \
    -jobconf "mapred.reduce.tasks=3" \
    -file ./map.py \
    -file ./red.py \
    -file ./white_list
```
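After the job finishes, the result can be checked straight from HDFS. A quick sketch, assuming the paths above (`hadoop fs -text` also decompresses gzipped output, which is handy once the compression options shown later are turned on):

```bash
# List the reducer output files and print the beginning of the first one.
$HADOOP_CMD fs -ls $OUTPUT_PATH
$HADOOP_CMD fs -text $OUTPUT_PATH/part-00000 | head
```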
2. -cacheFile: distributes a file that is already on HDFS to the compute nodes (so the file must be uploaded to HDFS first).
Example:
-cacheFile "hdfs://master:9000/white_list#ABC" \
3. -cacheArchive: distributes a compressed archive from HDFS to the compute nodes (the archive must be uploaded to HDFS first).
Example:
-cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz" \
In this case the streaming framework extracts the archive for you automatically; you don't have to handle it yourself, only adjust the corresponding file paths.
map.py:

```python
#!/usr/bin/python
import os
import sys

def get_file_handler(f):
    file_in = open(f, 'r')
    return file_in

def get_cachefile_handlers(f):
    # The alias created by -cacheArchive is a directory on the compute node,
    # so collect a handle for every file extracted into it.
    f_handlers_list = []
    if os.path.isdir(f):
        for fd in os.listdir(f):
            f_handlers_list.append(get_file_handler(f + '/' + fd))
    return f_handlers_list

def read_local_file_func(f):
    word_set = set()
    for cachefile in get_cachefile_handlers(f):
        for line in cachefile:
            word = line.strip()
            word_set.add(word)
    return word_set

def mapper_func(white_list_fd):
    word_set = read_local_file_func(white_list_fd)
    for line in sys.stdin:
        ss = line.strip().split(' ')
        for s in ss:
            word = s.strip()
            #if word != "" and (word in word_set):
            if word != "":
                print "%s\t%s" % (s, 1)

if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)
```

red.py:

```python
#!/usr/bin/python
import sys

def reduer_func():
    current_word = None
    count_pool = []
    sum = 0
    for line in sys.stdin:
        word, val = line.strip().split('\t')
        if current_word == None:
            current_word = word
        if current_word != word:
            # A new key arrived: emit the total for the previous key.
            for count in count_pool:
                sum += count
            print "%s\t%s" % (current_word, sum)
            current_word = word
            count_pool = []
            sum = 0
        count_pool.append(int(val))
    # Emit the total for the last key.
    for count in count_pool:
        sum += count
    print "%s\t%s" % (current_word, str(sum))

if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)
```

run.sh:

```bash
HADOOP_CMD="/usr/local/src/hadoop-2.6.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"

INPUT_FILE_PATH_1="/The_Man_of_Property.txt"
OUTPUT_PATH="/output_cachearchive_broadcast"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py mapper_func WH.gz" \
    -reducer "python red.py reduer_func" \
    -jobconf "mapred.reduce.tasks=10" \
    -jobconf "mapred.job.name=cachefile_demo" \
    -jobconf "mapred.compress.map.output=true" \
    -jobconf "mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \
    -jobconf "mapred.output.compress=true" \
    -jobconf "mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \
    -cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz" \
    -file "./map.py" \
    -file "./red.py"
```

A more fully annotated run.sh template, with the compression, -cacheArchive, and separator/partitioner options explained:

```bash
HADOOP_CMD="/usr/local/src/hadoop-2.6.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"

# Note: putting "#!/usr/bin/env python" at the top of the Python scripts lets the
# system locate the python executable by itself.

# Input files; several files are separated by commas, and they must already be on HDFS.
INPUT_FILE_PATH_1="/1.txt,/2.txt"
# Output directory on HDFS.
OUTPUT_PATH="/table1"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
# mapred.reduce.tasks=2                 : number of reducers
# mapred.compress.map.output (+ codec)  : whether to compress the map-stage output, and its format
# mapred.output.compress (+ codec)      : whether to compress the final output, and its format
# -cacheArchive ".../w.tar.gz#WH.gz"    : the third way to ship files; the name after "#" is an alias
#                                         that can be used in the configuration, and the slave nodes
#                                         also use it as the name of the directory created at run time
# stream.map.output.field.separator=',' : separator of the mapper output (default is the tab "\t")
# stream.num.map.output.key.fields=2    : the first two separated fields form the key, the rest the value
# map.output.key.field.separator=','    : separator used inside the key
# num.key.fields.for.partition=1        : after splitting the key, bucket (partition) by its first column
# -partitioner KeyFieldBasedPartitioner : required when you configure the partitioning yourself
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py" \
    -reducer "python red.py" \
    -file ./map.py \
    -file ./red.py \
    -jobconf mapred.reduce.tasks=2 \
    -jobconf "mapred.compress.map.output=true" \
    -jobconf "mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \
    -jobconf "mapred.output.compress=true" \
    -jobconf "mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \
    -cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz" \
    -jobconf stream.map.output.field.separator=',' \
    -jobconf stream.num.map.output.key.fields=2 \
    -jobconf map.output.key.field.separator=',' \
    -jobconf num.key.fields.for.partition=1 \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
```
-jobconf mapred.text.key.partitioner.options=-k2,3 \
is an extension of
-jobconf num.key.fields.for.partition=1 \
It means that key columns 2 through 3 are used for partitioning. When no partitioning option is set, the default is equivalent to partitioning on the whole key.
Records are first distributed into buckets (partitions), and inside each bucket they are then sorted by key.
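A small worked example may make the interplay of these options clearer. The sketch below reuses the variables from the run.sh above, uses cat as mapper and reducer so that only the separator and partition settings matter, and the sample record "a,b,c,d" and the output path are made up:

```bash
# Suppose a mapper emits the record:  a,b,c,d
#   stream.map.output.field.separator=','  -> the mapper output is split on ','
#   stream.num.map.output.key.fields=2     -> key = "a,b", value = "c,d"
#   map.output.key.field.separator=','     -> the key itself is split on ','
#   num.key.fields.for.partition=1         -> only "a" chooses the reducer bucket, so "a,b,..." and
#                                             "a,x,..." meet in one reducer and are sorted there by the full key
OUTPUT_PATH="/output_partition_demo"
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper cat \
    -reducer cat \
    -jobconf mapred.reduce.tasks=2 \
    -jobconf stream.map.output.field.separator=',' \
    -jobconf stream.num.map.output.key.fields=2 \
    -jobconf map.output.key.field.separator=',' \
    -jobconf num.key.fields.for.partition=1 \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
```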
Additional notes:
You can control the number of map tasks by compressing the input files: one compressed file corresponds to one map task.
This also leaves the paths alone, i.e. the directory structure can stay unchanged.
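For example, to get roughly one map task per file, a large input can be pre-split and gzipped before uploading, since a gzip file is not splittable and is consumed by a single map task. A rough sketch (the file and directory names are made up):

```bash
# Cut a large input into 100000-line pieces and compress each piece.
split -l 100000 big_input.txt part_
for f in part_*; do gzip "$f"; done

# Upload the pieces; the streaming job then starts one map task per .gz file,
# and the directory layout on HDFS can stay whatever you like.
$HADOOP_CMD fs -mkdir -p /input_gz
$HADOOP_CMD fs -put part_*.gz /input_gz/

# Point the job at the directory with:  -input /input_gz
```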
Reposted from: https://www.cnblogs.com/taozizainali/p/8811893.html