MapReduce源码分析之JobSplitWriter
JobSplitWriter被作業(yè)客戶端用于寫分片相關(guān)文件,包括分片數(shù)據(jù)文件job.split和分片元數(shù)據(jù)信息文件job.splitmetainfo。它有兩個(gè)靜態(tài)成員變量,如下:
?
[java]?view plaincopy? ? ? ? 并且,提供了一個(gè)靜態(tài)方法,完成SPLIT_FILE_HEADER的初始化,代碼如下:
?
?
[java]?view plaincopy? ? ? ??JobSplitWriter實(shí)現(xiàn)其功能的為createSplitFiles()方法,它有三種實(shí)現(xiàn),我們先看其中的public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir,Configuration conf, FileSystem fs, T[] splits),代碼如下:
?
?
[java]?view plaincopy? ? ? ??createSplitFiles()方法的邏輯很清晰,大體如下:
?
? ? ? ? 1、調(diào)用createFile()方法,創(chuàng)建分片文件,并獲取文件系統(tǒng)數(shù)據(jù)輸出流FSDataOutputStream實(shí)例out,對(duì)應(yīng)路徑為jobSubmitDir/job.split,jobSubmitDir為參數(shù)yarn.app.mapreduce.am.staging-dir指定的路徑/作業(yè)所屬用戶user/.staging/作業(yè)ID;
? ? ? ? 2、調(diào)用writeNewSplits()方法,將分片數(shù)據(jù)寫入分片文件,并得到分片元數(shù)據(jù)信息SplitMetaInfo數(shù)組info;
? ? ? ? 3、關(guān)閉輸出流out;
? ? ? ? 4、調(diào)用writeJobSplitMetaInfo()方法,將分片元數(shù)據(jù)信息寫入分片元數(shù)據(jù)文件。
? ? ? ? 我們先來看下createFile()方法,代碼如下:
?
[java]?view plaincopy? ? ? ? 首先,調(diào)用HDFS文件系統(tǒng)FileSystem的create()方法,獲取文件系統(tǒng)數(shù)據(jù)輸出流FSDataOutputStream實(shí)例out,對(duì)應(yīng)權(quán)限為JobSubmissionFiles.JOB_FILE_PERMISSION,即0644,rw-r--r--;
?
? ? ? ? 其次,獲取副本數(shù)replication,取參數(shù)mapreduce.client.submit.file.replication,參數(shù)未配置默認(rèn)為10;
? ? ? ? 接著,通過文件系統(tǒng)FileSystem實(shí)例fs的setReplication()方法,設(shè)置splitFile的副本數(shù)位10;
? ? ? ? 然后,調(diào)用writeSplitHeader()方法寫入分片頭信息;
? ? ? ? 最后,返回文件系統(tǒng)數(shù)據(jù)輸出流out。
? ? ? ??writeSplitHeader()方法專門用于將分片頭部信息寫入分片文件,代碼如下:
?
[java]?view plaincopy? ? ? ? 很簡(jiǎn)單,首先文件系統(tǒng)數(shù)據(jù)輸出流out寫入byte[],內(nèi)容為UTF-8格式的"SPL",然后文件系統(tǒng)數(shù)據(jù)輸出流out寫入int,分片版本號(hào),目前為1。
?
? ? ? ? 接下來,我們?cè)倏聪聎riteNewSplits()方法,它將分片數(shù)據(jù)寫入分片文件,并得到分片元數(shù)據(jù)信息SplitMetaInfo數(shù)組info,代碼如下:
?
[java]?view plaincopy? ? ? ??writeNewSplits()方法的邏輯比較清晰,大體如下:
?
? ? ? ? 1、根據(jù)array的大小,構(gòu)造同等大小的分片元數(shù)據(jù)信息SplitMetaInfo數(shù)組info,array其實(shí)是傳入的分片數(shù)組;
? ? ? ? 2、如果array中有數(shù)據(jù):
? ? ? ? ? ? ? 2.1、創(chuàng)建序列化工廠SerializationFactory實(shí)例factory;
? ? ? ? ? ? ? 2.2、獲取最大的數(shù)據(jù)塊位置maxBlockLocations,取參數(shù)mapreduce.job.max.split.locations,參數(shù)未配置默認(rèn)為10;
? ? ? ? ? ? ? 2.3、通過輸出流out的getPos()方法獲取輸出流out的當(dāng)前位置offset;
? ? ? ? ? ? ? 2.4、遍歷數(shù)組array中每個(gè)元素split:
? ? ? ? ? ? ? ? ? ? ? ?2.4.1、通過輸出流out的getPos()方法獲取輸出流out的當(dāng)前位置prevCount;
? ? ? ? ? ? ? ? ? ? ? ?2.4.2、往輸出流out中寫入String,內(nèi)容為split對(duì)應(yīng)的類名;
? ? ? ? ? ? ? ? ? ? ? ?2.4.3、獲取序列化器Serializer實(shí)例serializer;
? ? ? ? ? ? ? ? ? ? ? ?2.4.4、打開serializer,接入輸出流out;
? ? ? ? ? ? ? ? ? ? ? ?2.4.5、將split序列化到輸出流out;
? ? ? ? ? ? ? ? ? ? ? ?2.4.6、通過輸出流out的getPos()方法獲取輸出流out的當(dāng)前位置currCount;
? ? ? ? ? ? ? ? ? ? ? ?2.4.7、通過split的getLocations()方法,獲取位置信息locations;
? ? ? ? ? ? ? ? ? ? ? ?2.4.8、確保位置信息locations的長度不能超過maxBlockLocations,超過則截?cái)?#xff1b;
? ? ? ? ? ? ? ? ? ? ? ?2.4.9、構(gòu)造split對(duì)應(yīng)的元數(shù)據(jù)信息,并加入info指定位置,offset為當(dāng)前split在split文件中的起始位置,數(shù)據(jù)長度為split.getLength(),位置信息為locations;
? ? ? ? ? ? ? ? ? ? ? ?2.4.10、offset增加當(dāng)前split已寫入數(shù)據(jù)大小;
? ? ? ? 3、返回分片元數(shù)據(jù)信息SplitMetaInfo數(shù)組info。
? ? ? ? 其中,序列化split對(duì)象時(shí),我們以FileSplit為例來分析,其write()方法如下:
?
[java]?view plaincopy? ? ? ? 比較簡(jiǎn)單,分別寫入文件路徑全名、分片在文件中的起始位置、分片在文件中的長度三個(gè)信息。
?
? ? ? ? 綜上所述,分片文件job.split文件的內(nèi)容為:
? ? ? ? 1、文件頭:"SPL"+int類型版本號(hào)1;
? ? ? ? 2、分片類信息:String類型split對(duì)應(yīng)類名;
? ? ? ? 3、分片數(shù)據(jù)信息:String類型文件路徑全名+Long類型分片在文件中的起始位置+Long類型分片在文件中的長度。
? ? ? ? 而在最后,構(gòu)造分片元數(shù)據(jù)信息時(shí),產(chǎn)生的是JobSplit的靜態(tài)內(nèi)部類SplitMetaInfo對(duì)象,包括分片位置信息locations、split在split文件中的起始位置offset、分片長度split.getLength()。
? ? ? ? 下面,我們?cè)倏聪路制脑獢?shù)據(jù)信息文件是如何產(chǎn)生的,讓我們來研究下writeJobSplitMetaInfo()方法,代碼如下:
?
[java]?view plaincopy? ? ? ??writeJobSplitMetaInfo()方法的主體邏輯也十分清晰,大體如下:
?
? ? ? ? 1、調(diào)用HDFS文件系統(tǒng)FileSystem的create()方法,生成分片元數(shù)據(jù)信息文件,并獲取文件系統(tǒng)數(shù)據(jù)輸出流FSDataOutputStream實(shí)例out,對(duì)應(yīng)文件路徑為jobSubmitDir/job.splitmetainfo,jobSubmitDir為參數(shù)yarn.app.mapreduce.am.staging-dir指定的路徑/作業(yè)所屬用戶user/.staging/作業(yè)ID,對(duì)應(yīng)權(quán)限為JobSubmissionFiles.JOB_FILE_PERMISSION,即0644,rw-r--r--;
? ? ? ? 2、寫入分片元數(shù)據(jù)頭部信息UTF-8格式的字符串"META-SPL"的字節(jié)數(shù)組byte[];
? ? ? ? 3、寫入分片元數(shù)據(jù)版本號(hào)splitMetaInfoVersion,當(dāng)前為1;
? ? ? ? 4、寫入分片元數(shù)據(jù)個(gè)數(shù),為分片元數(shù)據(jù)信息SplitMetaInfo數(shù)組個(gè)數(shù)allSplitMetaInfo.length;
? ? ? ? 5、遍歷分片元數(shù)據(jù)信息SplitMetaInfo數(shù)組allSplitMetaInfo中每個(gè)splitMetaInfo,挨個(gè)寫入輸出流;
? ? ? ? 6、關(guān)閉輸出流out。
? ? ? ? 我們看下如何序列化JobSplit.SplitMetaInfo,將其寫入文件,JobSplit.SplitMetaInfo的write()如下:
?
[java]?view plaincopy ? ? ? ? 每個(gè)分片的元數(shù)據(jù)信息,包括分片位置個(gè)數(shù)、分片文件位置、分片元數(shù)據(jù)信息的起始位置、分片大小等內(nèi)容。
?
? ? ? ? 總結(jié)
? ? ? ??JobSplitWriter被作業(yè)客戶端用于寫分片相關(guān)文件,包括分片數(shù)據(jù)文件job.split和分片元數(shù)據(jù)信息文件job.splitmetainfo。分片數(shù)據(jù)文件job.split存儲(chǔ)的主要是每個(gè)分片對(duì)應(yīng)的HDFS文件路徑,和其在HDFS文件中的起始位置、長度等信息,而分片元數(shù)據(jù)信息文件job.splitmetainfo存儲(chǔ)的則是每個(gè)分片在分片數(shù)據(jù)文件job.split中的起始位置、分片大小等信息。
? ? ? ??job.split文件內(nèi)容:文件頭 + 分片 + 分片 + ... + 分片
? ? ? ? 文件頭:"SPL" + 版本號(hào)1
? ? ? ? 分片:分片類 + 分片數(shù)據(jù),分片類=String類型split對(duì)應(yīng)類名,分片數(shù)據(jù)=String類型HDFS文件路徑全名+Long類型分片在HDFS文件中的起始位置+Long類型分片在HDFS文件中的長度
? ? ? ??job.splitmetainfo文件內(nèi)容:文件頭 + 分片元數(shù)據(jù)個(gè)數(shù) +?分片元數(shù)據(jù) +?分片元數(shù)據(jù) + ... +?分片元數(shù)據(jù)
? ? ? ??文件頭:"META-SPL" + 版本號(hào)1
? ? ? ??分片元數(shù)據(jù)個(gè)數(shù):分片元數(shù)據(jù)的個(gè)數(shù)
? ? ? ? 分片元數(shù)據(jù):分片位置個(gè)數(shù)+分片位置+在分片文件job.split中的起始位置+分片大小
轉(zhuǎn)載于:https://www.cnblogs.com/jirimutu01/p/5556356.html
《新程序員》:云原生和全面數(shù)字化實(shí)踐50位技術(shù)專家共同創(chuàng)作,文字、視頻、音頻交互閱讀總結(jié)
以上是生活随笔為你收集整理的MapReduce源码分析之JobSplitWriter的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: android training 笔记
- 下一篇: To-do List