mapreduce 文件可以切分吗_MapReduce的任务流程
我們按照?qǐng)D中的流程,梳理一下MapReduce的任務(wù)流程。
初始時(shí),是上述的一個(gè)文本。MapReduce接收到作業(yè)輸入后,會(huì)先進(jìn)行數(shù)據(jù)拆分。
數(shù)據(jù)拆分完成之后,會(huì)有多個(gè) 小文本 數(shù)據(jù),每個(gè)小文本都會(huì)作為一個(gè)Map任務(wù)的輸入。這樣一個(gè)大的MapReduce作業(yè),會(huì)被分解為多個(gè)小的Map任務(wù)。
Combiner會(huì)處理Map生成的數(shù)據(jù),需要注意的是,此時(shí)Map生產(chǎn)的僅僅是中間結(jié)果。Combiner是一個(gè)可選的組件,用戶不設(shè)置,他就不存在。
之后,數(shù)據(jù)會(huì)到達(dá)Partitioner,Partitioner組件會(huì)將中間數(shù)據(jù)按照哈希函數(shù)的對(duì)應(yīng)規(guī)則,將中間結(jié)果分配到對(duì)應(yīng)的Reducer所在節(jié)點(diǎn)上。
Reducer會(huì)處理中間數(shù)據(jù),得到最終的結(jié)果。
這就是,一個(gè)完整的MapReduce作業(yè)的生老病死的概括,其真實(shí)的流程自然遠(yuǎn)不止此,我們會(huì)在后面娓娓道來。
先讓我們仔仔細(xì)細(xì)地了解一下上述過程的每一個(gè)組件。
一、扯一扯Map
有了上述的內(nèi)容,我們可以進(jìn)行下一步了。
按照我們說的,我們應(yīng)該將這個(gè)小短文分成幾個(gè)部分。也就是圖中的數(shù)據(jù)劃分。
(1)首先進(jìn)行數(shù)據(jù)劃分
當(dāng)我們開啟一個(gè)MapReduce程序,一般傳入的輸入都是一個(gè)體積巨大的數(shù)據(jù)。MapReduce接收到數(shù)據(jù)后,需要對(duì)數(shù)據(jù)進(jìn)行劃分。通俗來講,就是我們前文說的,我們?cè)撊绻麑⒁粋€(gè)小短文劃分成多行,分配個(gè)多個(gè)人進(jìn)行統(tǒng)計(jì)。
MapReduce中有一個(gè)InputFormat類,它會(huì)完成如下三個(gè)任務(wù):
驗(yàn)證作業(yè)數(shù)據(jù)的輸入形式和格式
將輸入數(shù)據(jù)分割為若干個(gè)邏輯意義上的InputSplit,其中每一個(gè)InputSplit都將單獨(dú)作為Map任務(wù)的輸入。也就是說,InputSplit的個(gè)數(shù),代表了Map任務(wù)的個(gè)數(shù)。需要注意,這里并沒有做實(shí)際切分,僅僅是將數(shù)據(jù)進(jìn)行邏輯上的切分。
提供一個(gè)RecordReader,用于將Map的輸入轉(zhuǎn)換為若干個(gè)記錄。雖然MapReduce作業(yè)可以接受很多種格式的數(shù)據(jù),但是Map任務(wù)接收的任務(wù)其實(shí)是鍵值對(duì)類型的數(shù)據(jù),因此需要將初始的輸入數(shù)據(jù)轉(zhuǎn)化為鍵值對(duì)。RecordReader對(duì)象會(huì)從數(shù)據(jù)分片中讀取出數(shù)據(jù)記錄,然后轉(zhuǎn)化為 Key-Value 鍵值對(duì),逐個(gè)輸入到Map中進(jìn)行處理。
問題在于,這個(gè)InputFormat類該如何進(jìn)行劃分呢?在FileInputFormat類中,會(huì)有一個(gè)getSplits函數(shù),這個(gè)函數(shù)所做的事情其實(shí)就是進(jìn)行數(shù)據(jù)切分的過程。我們稍微看一下這個(gè)函數(shù):
public List<InputSplit> getSplits(JobContext job) throws IOException { ? ?StopWatch sw = new StopWatch().start(); ? ?long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); ? ?long maxSize = getMaxSplitSize(job); ? ?//... ? ?for (FileStatus file: files) { ? ? ? ?if (isSplitable(job, path)) { ? ? ? ? ?long blockSize = file.getBlockSize(); ? ? ? ? ?long splitSize = computeSplitSize(blockSize, minSize, maxSize); ? ? ? ? ? ?//... ? ? ? } ? ? ? ?//... ? } ? ?//...}protected long computeSplitSize(long blockSize, long minSize, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?long maxSize) { ? ?return Math.max(minSize, Math.min(maxSize, blockSize));}minSize :每個(gè)split的最小值,默認(rèn)為1.getFormatMinSplitSize()為代碼中寫死,固定返回1,除非修改了hadoop的源代碼.getMinSplitSize(job)取決于參數(shù)mapreduce.input.fileinputformat.split.minsize,如果沒有設(shè)置該參數(shù),返回1.故minSize默認(rèn)為1.
maxSize:每個(gè)split的最大值,如果設(shè)置了mapreduce.input.fileinputformat.split.maxsize,則為該值,否則為L(zhǎng)ong的最大值。
blockSize :默認(rèn)為HDFS設(shè)置的文件存儲(chǔ)BLOCK大小。注意:該值并不一定是唯一固定不變的。HDFS上不同的文件該值可能不同。故將文件劃分成split的時(shí)候,對(duì)于每個(gè)不同的文件,需要獲取該文件的blocksize。
splitSize :根據(jù)公式,默認(rèn)為blockSize 。
從上述代碼中可以看到,這個(gè)InputSize在 [minSize, maxSize] 之間。
(2)這樣,我們可以理一理劃分邏輯
1)遍歷輸入目錄中的每個(gè)文件,拿到該文件
2)計(jì)算文件長(zhǎng)度,A:如果文件長(zhǎng)度為0,如果mapred.split.zero.file.skip=true,則不劃分split ; 如果mapred.split.zero.file.skip為false,生成一個(gè)length=0的split .B:如果長(zhǎng)度不為0,跳到步驟3
3)判斷該文件是否支持split :如果支持,跳到步驟4;如果不支持,該文件不切分,生成1個(gè)split,split的length等于文件長(zhǎng)度。
4)根據(jù)當(dāng)前文件,計(jì)算splitSize。
5)判斷剩余待切分文件大小/splitsize是否大于SPLIT_SLOP(該值為1.1,代碼中寫死了) 如果true,切分成一個(gè)split,待切分文件大小更新為當(dāng)前值-splitsize ,再次切分。生成的split的length等于splitsize;如果false 將剩余的切到一個(gè)split里,生成的split length等于剩余待切分的文件大小。之所以需要判斷剩余待切分文件大小/splitsize,主要是為了避免過多的小的split。比如文件中有100個(gè)109M大小的文件,如果splitSize=100M,如果不判斷剩余待切分文件大小/splitsize,將會(huì)生成200個(gè)split,其中100個(gè)split的size為100M,而其中100個(gè)只有9M,存在100個(gè)過小的split。MapReduce首選的是處理大文件,過多的小split會(huì)影響性能。
劃分好Split之后,這些數(shù)據(jù)進(jìn)入Map任務(wù),按照用戶設(shè)計(jì)處理邏輯進(jìn)行處理。Map可以由用戶定義設(shè)計(jì)處理邏輯。
二、聊一聊Combiner
Combiner組件并不是一個(gè)必須部分,用戶可以按照實(shí)際的需求靈活的添加。Combiner組件的主要作用是 減少網(wǎng)絡(luò)傳輸負(fù)載,優(yōu)化網(wǎng)絡(luò)數(shù)據(jù)傳輸優(yōu)化 。
當(dāng)我們Map任務(wù)處理完成之后,上述的文本會(huì)變成一個(gè)一個(gè)的 Key-Value 對(duì)。
(This, 1)(distribution, 1)...在沒有Combiner組件前提下,這些鍵值對(duì)會(huì)直接傳輸?shù)絉educer端,進(jìn)行最后的統(tǒng)計(jì)工作。但是這一步是可以優(yōu)化的,因?yàn)镸ap端僅僅是將每行的詞拆分了,但是其實(shí)可以再做一步統(tǒng)計(jì)的。
例如,我們假設(shè)在Map任務(wù)A這里出現(xiàn)了兩次 (This, 1),我們可以做一次統(tǒng)計(jì),將這個(gè)Map任務(wù)上的This做一次統(tǒng)計(jì),生成(This, 2)。在大數(shù)據(jù)場(chǎng)合,千萬個(gè)這樣的相同詞的合并會(huì)顯著降低網(wǎng)絡(luò)負(fù)載。
但是并不是所有的場(chǎng)合都適用Combiner,這個(gè)組件是可有可無的,用戶需要按照自己的需求靈活決定 。
因?yàn)镃ombiner可以存在,也可以不存在,所有,我們?cè)O(shè)計(jì)Combiner時(shí),要保證Combiner的key-value和Map的key-value一致 。這也意味著,若你設(shè)計(jì)的Combiner改變了原先Map的鍵值對(duì)設(shè)計(jì),那么你的Combiner設(shè)計(jì)就是不合法的。
三、瞅一瞅Partitioner
為了保證所有主鍵相同的鍵值對(duì)會(huì)傳輸?shù)酵粋€(gè)Reducer節(jié)點(diǎn),以便Reducer節(jié)點(diǎn)可以在不訪問其他Reducer節(jié)點(diǎn)的情況下就可以計(jì)算出最終的結(jié)果,我們需要對(duì)來自Map(如果有Combiner,就是Combiner之后的結(jié)果)中間鍵值對(duì)進(jìn)行分區(qū)處理,Partitioner主要就是進(jìn)行分區(qū)處理的。
Partitioner 默認(rèn)的分發(fā)規(guī)則
根據(jù) key 的 hashcode%reduce task 數(shù)來分發(fā),所以:如果要按照我們自己的需求進(jìn)行分組,則需要改寫數(shù)據(jù)分發(fā)(分區(qū))組件 Partitioner
Partition 的 key value, 就是Mapper輸出的key value
public interface Partitioner<K2, V2> extends JobConfigurable { ? ?/** ? * Get the paritition number for a given key (hence record) given the total ? * number of partitions i.e. number of reduce-tasks for the job. ? * ? ? *Typically a hash function on a all or a subset of the key.
? * ? * @param key 用來partition的key值。 ? * @param value 鍵值對(duì)的值。 ? * @param numPartitions 分區(qū)數(shù)目。 ? * @return the partition number for the key. ? */ ?int getPartition(K2 key, V2 value, int numPartitions);}輸入是Map的結(jié)果對(duì)和Reducer的數(shù)目,輸出則是分配的Reducer(整數(shù)編號(hào))。就是指定Mappr輸出的鍵值對(duì)到哪一個(gè)reducer上去。系統(tǒng)缺省的Partitioner是HashPartitioner,它以key的Hash值對(duì)Reducer的數(shù)目取模,得到對(duì)應(yīng)的Reducer。這樣保證如果有相同的key值,肯定被分配到同一個(gè)reducre上。如果有N個(gè)reducer,編號(hào)就為0,1,2,3……(N-1)。
MapReduce 中會(huì)將 map 輸出的 kv 對(duì),按照相同 key 分組,然后分發(fā)給不同的 reducetask 默認(rèn)的分發(fā)規(guī)則為:根據(jù) key 的 hashcode%reduce task 數(shù)來分發(fā),所以:如果要按照我們自 己的需求進(jìn)行分組,則需要改寫數(shù)據(jù)分發(fā)(分組)組件 Partitioner, 自定義一個(gè) CustomPartitioner 繼承抽象類:Partitioner
因此, Partitioner 的執(zhí)行時(shí)機(jī), 是在Map輸出 key-value 對(duì)之后
四、MapReduce中的Sort
MapReduce中的很多流程都涉及到了排序,我們會(huì)在后面詳細(xì)說明。
從整個(gè)MapReduce的程序執(zhí)行來看,整個(gè)過程涉及到了 快排、歸并排序、堆排 三種排序方法。
五、遛一遛Reduce
Reduce會(huì)處理上游(Map,也可能有Combiner)的中間結(jié)果。
需要注意的是,Map到Reduce整個(gè)過程中,鍵值的變化是不一樣的
初始是文本內(nèi)容,會(huì)被RecordReader處理為鍵值對(duì)
經(jīng)過Map(也可能有Combiner)后,仍然是鍵值對(duì)形式
經(jīng)過Partition,到達(dá)Reduce的結(jié)果是 key - list(value) 形式。所以在Reduce處理的value其實(shí)一個(gè)整體。
Reduce會(huì)把所有的結(jié)果處理完成,輸出到對(duì)應(yīng)的輸出路徑。
弊端
MapReduce的Reduce處理結(jié)果最后都是需要落盤的,當(dāng)一個(gè)project中含有多個(gè)MapReduce的 作業(yè)(job)時(shí),無法有效利用內(nèi)存。
總結(jié)
以上是生活随笔為你收集整理的mapreduce 文件可以切分吗_MapReduce的任务流程的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: jq 直接调用php文件_js调用php
- 下一篇: c++ 调用system 不显示黑框_J