spark shuffle的写操作之准备工作
前言
在前三篇文章中,spark 源碼分析之十九 -- DAG的生成和Stage的劃分?剖析了DAG的構(gòu)建和Stage的劃分,spark 源碼分析之二十 -- Stage的提交?剖析了TaskSet任務(wù)的提交,以及spark 源碼分析之二十一 -- Task的執(zhí)行細(xì)節(jié)剖析了Task執(zhí)行的整個流程。在第三篇文章中側(cè)重剖析了Task的整個執(zhí)行的流程是如何的,對于Task本身是如何執(zhí)行的?ResultTask?和?ShuffleMapTask兩部分并沒有做過多詳細(xì)的剖析。本篇文章我們針對Task執(zhí)行的細(xì)節(jié)展開,包括Task、ResultTask、ShuffleMapTask的深入剖析以及Spark底層的shuffle的實(shí)現(xiàn)機(jī)制等等。
Spark的任務(wù)劃分為ResultTask和ShuffleMapTask兩種任務(wù)。
其中ResultTask相對來說比較簡單,只是讀取上一個Stage的執(zhí)行結(jié)果或者是從數(shù)據(jù)源讀取任務(wù),最終將結(jié)果返回給driver。
ShuffleMapTask相對復(fù)雜一些,中間涉及了shuffle過程。
緊接上篇
我們再來看一下,ResultTask和ShuffleMapTask的runTask方法。現(xiàn)在只關(guān)注數(shù)據(jù)處理邏輯,下面的兩張圖都做了標(biāo)注。
ResultTask
類名:org.apache.spark.scheduler.ResultTask
其runTask方法如下:
ShuffleMapTask
類名:org.apache.spark.scheduler.ShuffleMapTask
其runTask方法如下:
兩種Task執(zhí)行的相同和差異
相同點(diǎn)
差異點(diǎn)
總結(jié)關(guān)注點(diǎn)
由兩種Task執(zhí)行的相同和差異點(diǎn)可以總結(jié)出,要想對這兩種類型的任務(wù)執(zhí)行有非常深刻的理解,必須搞明白shuffle 數(shù)據(jù)的讀寫。這也是spark 計(jì)算的核心的關(guān)注點(diǎn) -- Shuffle的寫操作、Shuffle的讀操作。
shuffle數(shù)據(jù)分類
shuffle過程中寫入Spark存儲系統(tǒng)的數(shù)據(jù)分為兩種,一種是shuffle數(shù)據(jù),一種是shuffle索引數(shù)據(jù),如下:
shuffle數(shù)據(jù)的管理類--IndexShuffleBlockResolver
下面說一下?IndexShuffleBlockResolver 類。這個類負(fù)責(zé)shuffle數(shù)據(jù)的獲取和刪除,以及shuffle索引數(shù)據(jù)的更新和刪除。
IndexShuffleBlockResolver繼承關(guān)系如下:
我們先來看父類ShuffleBlockResolver。
ShuffleBlockResolver
主要是負(fù)責(zé)根據(jù)邏輯的shuffle的標(biāo)識(比如mapId、reduceId或shuffleId)來獲取shuffle的block。shuffle數(shù)據(jù)一般都被File或FileSegment包裝。
其接口定義如下:
其中,getBlockData根據(jù)shuffleId獲取shuffle數(shù)據(jù)。
下面來看?IndexShuffleBlockResolver的實(shí)現(xiàn)。
IndexShuffleBlockResolver
這個類負(fù)責(zé)shuffle數(shù)據(jù)的獲取和刪除,以及shuffle索引數(shù)據(jù)的更新和刪除。
類結(jié)構(gòu)如下:
blockManager是executor上的BlockManager類。
transportCpnf主要是包含了關(guān)于shuffle的一些參數(shù)配置。
NOOP_REDUCE_ID是0,因?yàn)榇藭r還不知道reduce的id。
核心方法如下:
1. 獲取shuffle數(shù)據(jù)文件,源碼如下,思路:根據(jù)blockManager的DiskBlockManager獲取shuffle的blockId對應(yīng)的物理文件。
2. 獲取shuffle索引文件,源碼如下,思路:根據(jù)blockManager的DiskBlockManager獲取shuffle索引的blockId對應(yīng)的物理文件。
3.根據(jù)mapId將shuffle數(shù)據(jù)移除,源碼如下,思路:根據(jù)shuffleId和mapId刪除shuffle數(shù)據(jù)和索引文件
4.校驗(yàn)shuffle索引和數(shù)據(jù),源碼如下。
從上面可以看出,文件里第一個long型數(shù)是占位符,必為0.
后面的保存的數(shù)據(jù)是每一個block的大小,可以看出來,每次讀的long型數(shù),是前面所有block的大小總和。
所以,當(dāng)前block的大小=這次讀取到的offset - 上次讀取到的offset
這種索引的設(shè)計(jì)非常巧妙。每一個block大小合起來就是整個文件的大小。每一個block的在整個文件中的offset也都記錄在索引文件中。
?
5. 寫索引文件,源碼如下。
思路:首先先獲取shuffle的數(shù)據(jù)文件并創(chuàng)建索引的臨時文件。
獲取索引文件的每一個block 的大小。如果索引存在,則更新新的索引數(shù)組,刪除臨時數(shù)據(jù)文件,返回。
若索引不存在,將新的數(shù)據(jù)的索引數(shù)據(jù)寫入臨時索引文件,最終刪除歷史數(shù)據(jù)文件和歷史索引文件,然后臨時數(shù)據(jù)文件和臨時數(shù)據(jù)索引文件重命名為新的數(shù)據(jù)和索引文件。
這樣的設(shè)計(jì),確保了數(shù)據(jù)索引隨著數(shù)據(jù)的更新而更新。
?
6. 根據(jù)shuffleId獲取block數(shù)據(jù),源碼如下。
?
思路:
先獲取shuffle數(shù)據(jù)的索引數(shù)據(jù),然后調(diào)用position位上,獲取block 的大小,然后初始化FileSegmentManagedBuffer,讀取文件的對應(yīng)segment的數(shù)據(jù)。
可以看出?reduceId就是block物理文件中的小的block(segment)的索引。
7. 停止blockResolver,空實(shí)現(xiàn)。
總結(jié),在這個類中,可以學(xué)習(xí)到spark shuffle索引的設(shè)計(jì)思路,在工作中需要設(shè)計(jì)File和FileSegment的索引文件,這也是一種參考思路。
Shuffle的寫數(shù)據(jù)前的準(zhǔn)備工作
直接來看?org.apache.spark.scheduler.ShuffleMapTask 的runTask的關(guān)鍵代碼如下:
這里的manager是SortShuffleManager,是ShuffleManager的唯一實(shí)現(xiàn)。
org.apache.spark.shuffle.sort.SortShuffleManager#getWriter 源碼如下:
其中,numMapsForShuffle 定義如下:
它保存了shuffleID和mapper數(shù)量的映射關(guān)系。
獲取ShuffleHandle
首先,先來了解一下ShuffleHandle類。
ShuffleHandle
下面大致了解一下ShuffleHandle的相關(guān)內(nèi)容。
類說明:
這個類是Spark內(nèi)部使用的一個類,包含了關(guān)于Shuffle的一些信息,主要給ShuffleManage 使用。本質(zhì)上來說,它是一個標(biāo)志位,除了包含一些用于shuffle的一些屬性之外,沒有其他額外的方法,用case class來實(shí)現(xiàn)更好一點(diǎn)。
類源碼如下:
繼承關(guān)系如下:
BaseShuffleHandle
全稱:org.apache.spark.shuffle.BaseShuffleHandle
類說明:
它是ShuffleHandle的基礎(chǔ)實(shí)現(xiàn)。
類源碼如下:
下面來看一下它的兩個子類實(shí)現(xiàn)。
BypassMergeSortShuffleHandle
全稱:org.apache.spark.shuffle.sort.BypassMergeSortShuffleHandle
類說明:
如果想用于序列化的shuffle實(shí)現(xiàn),可以使用這個標(biāo)志類。其源碼如下:
?
SerializedShuffleHandle
全稱:org.apache.spark.shuffle.sort.SerializedShuffleHandle
類說明:
used to identify when we've chosen to use the bypass merge sort shuffle path.
類源碼如下:?
獲取ShuffleHandle
在org.apache.spark.ShuffleDependency中有如下定義:
shuffleId是SparkContext生成的唯一全局id。
org.apache.spark.shuffle.sort.SortShuffleManager#registerShuffle 源碼如下:
可以看出,mapper的數(shù)量等于父RDD的分區(qū)的數(shù)量。
下面,看一下使用bypassMergeSort的條件,即org.apache.spark.shuffle.sort.SortShuffleWriter#shouldBypassMergeSort 源碼如下:
思路:首先如果父RDD沒有啟用mapSideCombine并且父RDD的結(jié)果分區(qū)數(shù)量小于bypassMergeSort閥值,則使用?bypassMergeSort。其中bypassMergeSort閥值 默認(rèn)是200,可以通過?spark.shuffle.sort.bypassMergeThreshold 參數(shù)設(shè)定。
使用serializedShuffle的條件,即org.apache.spark.shuffle.sort.SortShuffleManager#canUseSerializedShuffle 源碼如下:
思路:序列化類支持支持序列化對象的遷移,并且不使用mapSideCombine操作以及父RDD的分區(qū)數(shù)不大于?(1 << 24) 即可使用該模式的shuffle。
根據(jù)ShuffleHandle獲取ShuffleWriter
首先先對ShuffleWriter做一下簡單說明。
ShuffleWriter
類說明:它負(fù)責(zé)將map任務(wù)的輸出寫入到shuffle系統(tǒng)。其繼承關(guān)系如下,對應(yīng)著ShuffleHandle的三種shuffle實(shí)現(xiàn)標(biāo)志。
獲取ShuffleWriter
org.apache.spark.shuffle.sort.SortShuffleManager#getWriter源碼如下:
一個mapper對應(yīng)一個writer,一個writer往一個分區(qū)上的寫數(shù)據(jù)。
總結(jié)
本篇文章主要從Task 的差異和相同點(diǎn)出發(fā),引出spark shuffle的重要性,接著對Spark shuffle數(shù)據(jù)的類型以及spark shuffle的管理類做了剖析。最后介紹了三種shuffle類型的標(biāo)志位以及如何確定使用哪種類型的數(shù)據(jù)的。
接下來,正式進(jìn)入mapper寫數(shù)據(jù)部分。spark內(nèi)部有三種實(shí)現(xiàn),每一種寫方式會有一篇文章專門剖析,我們逐一來看其實(shí)現(xiàn)機(jī)制。
轉(zhuǎn)載于:https://www.cnblogs.com/johnny666888/p/11265502.html
總結(jié)
以上是生活随笔為你收集整理的spark shuffle的写操作之准备工作的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [算法]最小差值
- 下一篇: Quartz.Net - Lesson