MapReduce详解和WordCount模拟
最早接觸大數(shù)據(jù),常縈繞耳邊的一個(gè)詞「MapReduce」。它到底是什么,能做什么,原理又是什么?且聽(tīng)下文講解。
是什么
MapReduce 即是一個(gè)編程模型,又是一個(gè)計(jì)算框架,它充分采用了分治的思想,將數(shù)據(jù)處理過(guò)程拆分成兩步:Map 和 Reduce。用戶只需要編寫 map() 和 reduce() 函數(shù),就能使問(wèn)題的計(jì)算實(shí)現(xiàn)分布式,并在Hadoop上執(zhí)行。
數(shù)據(jù)處理
MapReduce 操作數(shù)據(jù)的最小單位是一個(gè)鍵值對(duì)。map 端的主要輸入是一對(duì)<key,value>值,經(jīng)過(guò) map 計(jì)算后輸出一對(duì)<key,value>,然后將相同的 key 合并,形成<key,value 集合>,再將這個(gè)<key,value 集合>輸入 reduce ,經(jīng)過(guò)計(jì)算輸出零個(gè)或多個(gè)<key,value>對(duì)。
兩個(gè)重要的進(jìn)程
JobTracker
JobTracker 在集群中負(fù)責(zé)任務(wù)調(diào)度和集群資源監(jiān)控這兩個(gè)功能。TaskTracker 通過(guò)周期性的心跳向 JobTracker 匯報(bào)當(dāng)前的健康狀況和狀態(tài),心跳中包括自身計(jì)算資源的信息、被占用的計(jì)算資源的信息和正在運(yùn)行中的任務(wù)的狀態(tài)信息。JobTracker 會(huì)根據(jù)各個(gè) TaskTracker 周期性發(fā)送過(guò)來(lái)的心跳信息綜合考慮TaskTracker 的資源余量、作業(yè)優(yōu)先級(jí)、作業(yè)提交時(shí)間等因素,為 TaskTracker 分配合適的任務(wù)。
JobTracker 提供了一個(gè)基于 web 的管理界面,可以通過(guò) JobTracker:50030 端口訪問(wèn)。
TaskTracker
TaskTracker 主要負(fù)責(zé)匯報(bào)心跳和執(zhí)行 JobTracker 命令這兩個(gè)功能。命令主要包括5種:啟動(dòng)命令、提交命令、殺死任務(wù)、殺死作業(yè)和重新初始化。
幾個(gè)概念
作業(yè)(Job) 和 任務(wù)(Task)
MapReduce 作業(yè)是用戶提交的最小單位,任務(wù)是 MapReduce 計(jì)算的最小單位。 簡(jiǎn)單講,用戶提交的是一個(gè)MapReduce作業(yè),一個(gè) MapReduce 作業(yè)可以被拆分成兩種——Map 任務(wù)和 Reduce 任務(wù)。
槽(slot)
槽是Hadoop計(jì)算資源的表示模型,Hadoop 將各個(gè)節(jié)點(diǎn)上的多維度資源(CPU、內(nèi)存等)抽象成一維度的槽。一個(gè)TaskTracker 能夠啟動(dòng)的任務(wù)數(shù)量是由 TaskTracker 配置的任務(wù)槽決定的。
MapReduce 過(guò)程
一個(gè)MapReduce作業(yè)通常經(jīng)過(guò) input、map、combine、reduce、output 五個(gè)階段。combine 階段不一定發(fā)生,map輸出的中間結(jié)果分發(fā)到 reduce 的過(guò)程被稱為 shuffle。shuffle 階段還會(huì)發(fā)生 copy 和 sort。
兩幅重要的流程圖
- map任務(wù)流程圖
- reduce 任務(wù)流程圖
幾個(gè)重要的階段說(shuō)明
map 函數(shù)處理后的中間結(jié)果會(huì)寫到本地磁盤上,在刷寫磁盤的過(guò)程中,還做了 partition 和 sort 操作。
map 函數(shù)輸出時(shí),并不是簡(jiǎn)單地刷寫磁盤,為了保證 I/O 效率,采取了先寫到內(nèi)存的環(huán)形緩沖區(qū),并做一次預(yù)排序。請(qǐng)結(jié)合map任務(wù)流程圖理解。
partition
在分區(qū)階段,通過(guò)對(duì) key 取模,生成<partition,key,value>三元組,分區(qū)階段進(jìn)行了一次內(nèi)排序。
MemoryBuffer
內(nèi)存緩沖區(qū),保存 map 的結(jié)果和 partition 處理后的結(jié)果,默認(rèn)大小為100M,溢寫閾值為80M。
spill(溢寫)
內(nèi)存緩沖區(qū)達(dá)到閾值時(shí),溢寫線程鎖住這80M的緩沖區(qū),開(kāi)始將數(shù)據(jù)寫到本地磁盤中,然后釋放內(nèi)存。
每次溢寫都會(huì)生成一個(gè)數(shù)據(jù)文件,溢出的數(shù)據(jù)寫到磁盤前會(huì)對(duì)數(shù)據(jù)進(jìn)行 sort 以及合并(combine)。
combine
combine 對(duì)map 函數(shù)的輸出結(jié)果進(jìn)行早期聚合以減少傳輸?shù)臄?shù)據(jù)量,其作用其實(shí)和reduce 函數(shù)一樣。combine 的過(guò)程發(fā)生在 spill(溢寫) 階段。
combine 能夠提升程序性能,但并不是所有常見(jiàn)都適合使用 combine ,例如:求中值。
sort
MapReduce 計(jì)算框架主要用到了兩種排序:快速排序和歸并排序。在 Map 任務(wù)和 Reduce 任務(wù)的過(guò)程中,一共發(fā)生了三次排序操作:
- partition 過(guò)程中按照鍵值進(jìn)行的內(nèi)排序。
- map 任務(wù)完成之前,合并溢寫文件產(chǎn)生輸出文件時(shí)進(jìn)行的一次 sort 操作。
- shuffle 過(guò)程的 sort 操作。
wordcount 實(shí)驗(yàn)?zāi)M
map 端編程代碼(map_a.py):
import sys import rep =re.compile(r'\w+') for line in sys.stdin:world_list =line.strip().split()for word in world_list:if len(word)<2:continuew_list =p.findall(word)if len(w_list)>0:w =w_list[0].lower()print "%s\t%d"%(w,1)reduce 端編程代碼(red_b.py)
import sys wt =0 cur_word =None for line in sys.stdin:word,cnt =line.strip().split('\t')if cur_word ==None:cur_word =wordif cur_word !=word:print "%s\t%d"%(cur_word,wt)wt =0cur_word =wordwt =wt+int(cnt) print "%s\t%d"%(cur_word,wt)模擬命令
cat The_man_of_property.txt |python ./project/map_a.py | sort -k 1 |python ./project/red_b.py輸出顯示
轉(zhuǎn)載于:https://www.cnblogs.com/bbmkey/p/10702196.html
總結(jié)
以上是生活随笔為你收集整理的MapReduce详解和WordCount模拟的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: android 上下偏差怎么写_详解 A
- 下一篇: 统计方法在自然语言处理中的应用(统计自然