深入理解Spark 2.1 Core (九):迭代计算和Shuffle的原理与源码分析
Task有兩個子類,一個是非最后的Stage的Task,ShuffleMapTask;一個是最后的Stage的Task,ResultTask。它們都覆蓋了Task的runTask方法。
我們來看一下ShuffleMapTask的runTask方法中的部分代碼:
var writer: ShuffleWriter[Any, Any] = nulltry {//獲取 shuffleManagerval manager = SparkEnv.get.shuffleManager// 獲取 writerwriter = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)// 調用writer.write 開始計算RDD,// 這部分 我們會在后續博文講解writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])// 停止計算,并返回結果writer.stop(success = true).get}- 1
這篇博文,我們就來深入這部分源碼。
RDD迭代
調用棧如下:
- rdd.iterator
- rdd.computeOrReadCheckpoint
- rdd.MapPartitionsRDD.compute
- ……
- rdd.HadoopRDD.compute
- ……
- rdd.MapPartitionsRDD.compute
- rdd.computeOrReadCheckpoint
rdd.RDD.iterator
我們先來看writer.write傳入的參數:
rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]partition是該任務所在的分區,context為該任務的上下文。
rdd.iterator的方法如下:
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {// 此部分關于存儲,會在后續講解if (storageLevel != StorageLevel.NONE) {getOrCompute(split, context)} else {computeOrReadCheckpoint(split, context)}}- 2
rdd.RDD.computeOrReadCheckpoint
我們來看下上述的computeOrReadCheckpoint方法:
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] ={// 若Checkpointed 獲取結果if (isCheckpointedAndMaterialized) {firstParent[T].iterator(split, context)} else {// 否則計算compute(split, context)}}- 1
rdd.MapPartitionsRDD.compute
這里對compute實現的RDD是MapPartitionsRDD:
override def compute(split: Partition, context: TaskContext): Iterator[U] =f(context, split.index, firstParent[T].iterator(split, context))我們可以看到,這里還是會調用firstParent[T].iterator,這樣父RDD繼續調用MapPartitionsRDD.compute,這樣一層層的向上調用,直到最初的RDD。
rdd.HadoopRDD.compute
若是從HDFS讀取生成的最初的RDD,則經過層層調用,會調用到HadoopRDD.compute。下面我們來看下該方法:
override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {// iter 是NextIterator匿名類的一個對象val iter = new NextIterator[(K, V)] {//****************** 以下為NextIterator匿名類內容 *****************private val split = theSplit.asInstanceOf[HadoopPartition]logInfo("Input split: " + split.inputSplit)// hadoop的配置private val jobConf = getJobConf()// 用于計算字節讀取private val inputMetrics = context.taskMetrics().inputMetrics// 之前寫入的值private val existingBytesRead = inputMetrics.bytesRead// 設置 文件名的 線程本地值 split.inputSplit.value match {case fs: FileSplit => InputFileNameHolder.setInputFileName(fs.getPath.toString)case _ => InputFileNameHolder.unsetInputFileName()}// 用于返回該線程從文件讀取的字節數// 需要在RecordReader創建前創建// 因為RecordReader的構造函數可能需要讀取一些字節private val getBytesReadCallback: Option[() => Long] = split.inputSplit.value match {case _: FileSplit | _: CombineFileSplit =>SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()case _ => None}//對于 Hadoop 2.5以上的版本,我們從線程本地HDFS統計中得到輸入的字節數。// 如果我做一個合并操作的話,// 我們需要在同一個任務且同一個線程理計算多個分區。// 在這種情況下,我們需要去避免覆蓋之前分區中已經被寫入的值private def updateBytesRead(): Unit = {getBytesReadCallback.foreach { getBytesRead =>inputMetrics.setBytesRead(existingBytesRead + getBytesRead())}}private var reader: RecordReader[K, V] = null// 即 TextinputFormatprivate val inputFormat = getInputFormat(jobConf)// 添加hadoop相關任務配置HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(createTime),context.stageId, theSplit.index, context.attemptNumber, jobConf)// 創建RecordReaderreader =try {inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)} catch {case e: IOException if ignoreCorruptFiles =>logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)finished = truenull}// 注冊任務完成回調來關閉輸入流context.addTaskCompletionListener{ context => closeIfNeeded() }// key:LongWritableprivate val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()// v:Textprivate val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()// 對reader.next的代理override def getNext(): (K, V) = {try {finished = !reader.next(key, value)} catch {case e: IOException if ignoreCorruptFiles =>logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)finished = true}if (!finished) {inputMetrics.incRecordsRead(1)}if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {// 更新inputMetrics的BytesReadupdateBytesRead()}(key, value)}// 關閉override def close() {if (reader != null) {InputFileNameHolder.unsetInputFileName()try {reader.close()} catch {case e: Exception =>if (!ShutdownHookManager.inShutdown()) {logWarning("Exception in RecordReader.close()", e)}} finally {reader = null}if (getBytesReadCallback.isDefined) {updateBytesRead()} else if (split.inputSplit.value.isInstanceOf[FileSplit] ||split.inputSplit.value.isInstanceOf[CombineFileSplit]) {try {inputMetrics.incBytesRead(split.inputSplit.value.getLength)} catch {case e: java.io.IOException =>logWarning("Unable to get input size to set InputMetrics for task", e)}}}}}new InterruptibleIterator[(K, V)](context, iter)}- 1
InterruptibleIterator傳入參數iter,可以看成是NextIterator類的代理:
class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])extends Iterator[T] {def hasNext: Boolean = {if (context.isInterrupted) {throw new TaskKilledException} else {delegate.hasNext}}def next(): T = delegate.next() }- 1
- 2
迭代返回
當rdd.HadoopRDD.compute運算完畢后,生成的初始的RDD計算結果。退回到rdd.HadoopRDD.compute便可以調用函數f:
f(context, split.index, firstParent[T].iterator(split, context))f計算出第二個的RDD計算結果,以此類推,一層層的返回。最終回到writer.write:
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])- 1
ShuffleWriter是一個抽象類,它有子類SortShuffleWriter。SortShuffleWriter.write:
override def write(records: Iterator[Product2[K, V]]): Unit = {// 創建ExecutorSorter,// 用于Shuffle Map Task 輸出結果排序sorter = if (dep.mapSideCombine) {// 當計算結果需要combine,// 則外部排序進行聚合require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")new ExternalSorter[K, V, C](context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)} else {// 否則,外部排序不進行聚合new ExternalSorter[K, V, V](context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)}// 根據排序方式,對數據進行排序并寫入內存緩沖區。// 若排序中計算結果超出的閾值,// 則將其溢寫到磁盤數據文件sorter.insertAll(records)// 通過shuffle編號和map編號來獲取該數據文件val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)val tmp = Utils.tempFileWith(output)try {// 通過shuffle編號和map編號來獲取 ShuffleBlock 編號val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)// 在外部排序中,// 有部分結果可能在內存中// 另外部分結果在一個或多個文件中// 需要將它們merge成一個大文件val partitionLengths = sorter.writePartitionedFile(blockId, tmp)// 創建索引文件// 將每個partition在數據文件中的起始與結束位置寫入到索引文件shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)// 將元數據寫入mapStatus// 后續任務通過該mapStatus得到處理結果信息mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)} finally {if (tmp.exists() && !tmp.delete()) {logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")}}}- 1
Shuffle原理概要
MapReduce Shuffle原理 與 Spark Shuffle 原理
在這里我們重新講解下早起Spark Shuffle的過程:
-
map任務會給每個reduce任務分配一個bucket。假設有M個map任務,每個map任務有N個reduce任務,則map階段一共會創建M×R個bucket。
-
map 任務會將產生的中間結果按照partitione寫入到不同的bucket中
- reduce任務從本地或者遠端的map任務所在的BlockManager獲取相應的bucket作為輸入
MapReduce Shuffle 與 Spark Shuffle缺陷
MapReduce Shuffle缺陷
- map任務產生的結果排序后會寫入磁盤,reduce獲取map任務產生的結果會在磁盤上merge sort,產生很多磁盤I/O
- 當數量很小,但是map和reduce任務很多時,會產生很多網絡I/O
Spark Shuffle缺陷
- map任務產生的結果先寫入內存,當一個節點輸出的結果集很大是,容易內存緊張
- map任務數量與reduce數量大了,bucket數量容易變得非常大,這就帶來了兩個問題:
- 每打開一個文件(bucket為一個文件)都會暫用一定內存,容易內存緊張
- 若bucket本身很小,而對于系統來說遍歷多個文件是隨機讀取,那么磁盤I/O性能會變得非常差
Spark shuffle 的優化
- 把相同的partition的bucket放在一個文件中
- 使用緩存及聚合算法對map任務的輸出結果進行聚合
- 使用緩存及聚合算法對reduce從map拉取的輸出結果進行聚合
- 緩存超出閾值時,將數據寫入磁盤
- reduce任務將同一BlockManager地址的Block累計,減少網絡請求
總結
以上是生活随笔為你收集整理的深入理解Spark 2.1 Core (九):迭代计算和Shuffle的原理与源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 深入理解Spark 2.1 Core (
- 下一篇: 深入理解Spark 2.1 Core (