Analysis of the Disk Files Produced in the Spark Shuffle Write Phase
Process Analysis
The entry point is:

org.apache.spark.scheduler.ShuffleMapTask.runTask

Inside runTask, the manager is obtained from SparkEnv (SparkEnv.get.shuffleManager); with the default configuration this is SortShuffleManager.
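For orientation, the shuffle-related part of runTask looks roughly like this. This is a paraphrased sketch of the Spark 2.x source, trimmed to the relevant lines, not a verbatim quote:

// Paraphrased sketch of ShuffleMapTask.runTask (Spark 2.x), shuffle-related lines only.
val manager = SparkEnv.get.shuffleManager
var writer: ShuffleWriter[Any, Any] = null
// dep is the ShuffleDependency recovered from the serialized task binary.
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
writer.stop(success = true).get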
First look at private[spark] trait ShuffleManager. It is an interface (a Scala trait), and SortShuffleManager implements it:

private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager with Logging

Let's see how it hands out the writer that will eventually write to disk.
override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Int,
    context: TaskContext): ShuffleWriter[K, V] = {
  numMapsForShuffle.putIfAbsent(
    handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
  val env = SparkEnv.get
  handle match {
    case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      new UnsafeShuffleWriter(
        env.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        context.taskMemoryManager(),
        unsafeShuffleHandle,
        mapId,
        context,
        env.conf)
    case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      new BypassMergeSortShuffleWriter(
        env.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        bypassMergeSortHandle,
        mapId,
        context,
        env.conf)
    case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
  }
}

Two handle subclasses are matched specially here, in addition to the base case:

/**
 * Subclass of [[BaseShuffleHandle]], used to identify when we've chosen to use the
 * serialized shuffle.
 */
private[spark] class SerializedShuffleHandle[K, V](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, V])
  extends BaseShuffleHandle(shuffleId, numMaps, dependency) {
}

/**
 * Subclass of [[BaseShuffleHandle]], used to identify when we've chosen to use the
 * bypass merge sort shuffle path.
 */
private[spark] class BypassMergeSortShuffleHandle[K, V](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, V])
  extends BaseShuffleHandle(shuffleId, numMaps, dependency) {
}

Their common parent, BaseShuffleHandle, simply captures the parameters of registerShuffle:

/**
 * A basic ShuffleHandle implementation that just captures registerShuffle's parameters.
 */
private[spark] class BaseShuffleHandle[K, V, C](
    shuffleId: Int,
    val numMaps: Int,
    val dependency: ShuffleDependency[K, V, C])
  extends ShuffleHandle(shuffleId)

And ShuffleHandle itself is an abstract class that extends Serializable:

abstract class ShuffleHandle(val shuffleId: Int) extends Serializable {}
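How Spark decides which of the three handles to register happens earlier, in SortShuffleManager.registerShuffle. The following is a paraphrased sketch of that method in Spark 2.x, not a verbatim quote, included only to show when each writer gets picked:

// Paraphrased sketch of SortShuffleManager.registerShuffle (Spark 2.x), not verbatim source.
// - Bypass path: no map-side combine and at most spark.shuffle.sort.bypassMergeThreshold
//   (200 by default) reduce partitions.
// - Serialized ("tungsten-sort") path: the serializer supports relocation of serialized
//   objects, there is no aggregator, and the partition count is small enough to be
//   encoded in the packed record pointer.
// - Otherwise: the plain BaseShuffleHandle, which leads to SortShuffleWriter -- the path
//   analyzed in the rest of this article.
override def registerShuffle[K, V, C](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    new SerializedShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    new BaseShuffleHandle(shuffleId, numMaps, dependency)
  }
}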
We follow the SortShuffleWriter branch: new SortShuffleWriter(shuffleBlockResolver, other, mapId, context). The class is defined as:

private[spark] class SortShuffleWriter[K, V, C](
    shuffleBlockResolver: IndexShuffleBlockResolver,
    handle: BaseShuffleHandle[K, V, C],
    mapId: Int,
    context: TaskContext)
  extends ShuffleWriter[K, V] with Logging

Then comes the write operation:
/** Write a bunch of records to this task's output */
override def write(records: Iterator[Product2[K, V]]): Unit = {
  sorter = if (dep.mapSideCombine) {
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  sorter.insertAll(records)

  // Don't bother including the time to open the merged output file in the shuffle write time,
  // because it just opens a single file, so is typically too fast to measure accurately
  // (see SPARK-3570).
  val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  val tmp = Utils.tempFileWith(output)
  try {
    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
    val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
    shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
  } finally {
    if (tmp.exists() && !tmp.delete()) {
      logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
    }
  }
}

For the rest of the analysis we assume map-side combine is required, i.e. dep.mapSideCombine is true, so the first branch is taken and the ExternalSorter is created with the aggregator, the partitioner, and the key ordering.
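Whether dep.mapSideCombine is true depends on the operator that created the ShuffleDependency: reduceByKey sets an Aggregator with map-side combine, while sortByKey shuffles raw records. A minimal runnable illustration; the application itself (object name, master, data) is made up for the example:

import org.apache.spark.{SparkConf, SparkContext}

object ShuffleWriteDemo {
  def main(args: Array[String]): Unit = {
    // Local demo only; in a real deployment the shuffle files land under spark.local.dir.
    val conf = new SparkConf().setAppName("shuffle-write-demo").setMaster("local[2]")
    val sc = new SparkContext(conf)

    val pairs = sc.parallelize(1 to 100000).map(i => (i % 10, 1))

    // reduceByKey creates a ShuffleDependency with an Aggregator and mapSideCombine = true,
    // so its map tasks go through the combine branch of SortShuffleWriter.write shown above.
    pairs.reduceByKey(_ + _).count()

    // sortByKey shuffles without an Aggregator (no map-side combine); with this few reduce
    // partitions it can even take the bypass-merge-sort path instead of SortShuffleWriter.
    pairs.sortByKey().count()

    sc.stop()
  }
}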
Next, the map output is fed into the sorter:
sorter.insertAll(records)

// For reference, sorter is declared in SortShuffleWriter as:
// private var sorter: ExternalSorter[K, V, _] = null
// import org.apache.spark.util.collection.ExternalSorter

def insertAll(records: Iterator[Product2[K, V]]): Unit = {
  // TODO: stop combining if we find that the reduction factor isn't high
  val shouldCombine = aggregator.isDefined

  if (shouldCombine) {
    // Combine values in-memory first using our AppendOnlyMap
    val mergeValue = aggregator.get.mergeValue
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    while (records.hasNext) {
      addElementsRead()
      kv = records.next()
      map.changeValue((getPartition(kv._1), kv._1), update)
      maybeSpillCollection(usingMap = true)
    }
  } else {
    // Stick values into our buffer
    while (records.hasNext) {
      addElementsRead()
      val kv = records.next()
      buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
      maybeSpillCollection(usingMap = false)
    }
  }
}
The flow inside insertAll (combine branch) is as follows:
while (records.hasNext) {
  addElementsRead()
  kv = records.next()
  map.changeValue((getPartition(kv._1), kv._1), update)
  maybeSpillCollection(usingMap = true)
}

private val fileBufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024

// Size of object batches when reading/writing from serializers.
//
// Objects are written in batches, with each batch using its own serialization stream. This
// cuts down on the size of reference-tracking maps constructed when deserializing a stream.
//
// NOTE: Setting this too low can cause excessive copying when serializing, since some serializers
// grow internal data structures by growing + copying every time the number of objects doubles.
private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 10000)

// Data structures to store in-memory objects before we spill. Depending on whether we have an
// Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
// store them in an array buffer.
@volatile private var map = new PartitionedAppendOnlyMap[K, C]
@volatile private var buffer = new PartitionedPairBuffer[K, C]

The map here is a PartitionedAppendOnlyMap, an entirely in-memory structure. Only when it fills up is a spill triggered; maybeSpillCollection, which performs that check, is called after every update to the PartitionedAppendOnlyMap.
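To make the update closure above concrete, here is a small self-contained sketch. It is not Spark code, just an illustration of the same createCombiner/mergeValue pattern keyed by (partitionId, key); the data and partitioner are made up:

// Standalone illustration of the combine pattern used by insertAll: values are merged
// in memory, keyed by (partitionId, key). Word-count style aggregation, sums as combiners.
import scala.collection.mutable

object CombinePatternSketch {
  def main(args: Array[String]): Unit = {
    val numPartitions = 4
    def getPartition(key: String): Int = math.abs(key.hashCode) % numPartitions

    val createCombiner: Int => Int = v => v     // first value seen for a key
    val mergeValue: (Int, Int) => Int = _ + _   // fold further values into the combiner

    val records = Iterator("a" -> 1, "b" -> 1, "a" -> 1, "c" -> 1, "b" -> 1)
    val map = mutable.Map.empty[(Int, String), Int]  // stands in for PartitionedAppendOnlyMap

    for ((k, v) <- records) {
      val key = (getPartition(k), k)
      map.get(key) match {
        case Some(old) => map(key) = mergeValue(old, v)  // hadValue == true
        case None      => map(key) = createCombiner(v)   // hadValue == false
      }
      // In Spark, maybeSpillCollection would be checked here after every update.
    }

    println(map) // e.g. Map((p,"a") -> 2, (p,"b") -> 2, (p,"c") -> 1)
  }
}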
Once a spill happens, the file it produces is named:

"temp_shuffle_" + id

The logic lives here:

val (blockId, file) = diskBlockManager.createTempShuffleBlock()

def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
  var blockId = new TempShuffleBlockId(UUID.randomUUID())
  while (getFile(blockId).exists()) {
    blockId = new TempShuffleBlockId(UUID.randomUUID())
  }
  (blockId, getFile(blockId))
}

All spill files produced this way are recorded in an array:
private val spills = new ArrayBuffer[SpilledFile]

After the task has finished iterating over its partition's data, a merge is performed: the on-disk spill files and the in-memory collection are processed together, yielding a new iterator whose elements look like this:

(p, mergeWithAggregation(iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))

where p is the partitionId of a reduce task, and all the data belonging to p is contained in its corresponding iterator.
Next, the final output file is obtained:

val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)

Its name follows this format:

"shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data"

where reduceId is the fixed value NOOP_REDUCE_ID, which is 0.
Then the actual write to the file begins:

val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)

The write proceeds like this:

for ((id, elements) <- this.partitionedIterator) {
  if (elements.hasNext) {
    val writer = blockManager.getDiskWriter(
      blockId, outputFile, serInstance, fileBufferSize,
      context.taskMetrics.shuffleWriteMetrics.get)
    for (elem <- elements) {
      writer.write(elem._1, elem._2)
    }
    writer.commitAndClose()
    val segment = writer.fileSegment()
    lengths(id) = segment.length
  }
}

As noted above, this.partitionedIterator yields (reduce partitionId -> iterator of the actual records), so the loop simply writes each partition's records in sequence. Each partition that has data becomes one FileSegment, and its length is recorded, which is the offset information that later lets every reducer fetch exactly the byte range it needs. The file being written is the one named earlier:
"shuffle_" + shuffleId + "_" + mapId + "_" + NOOP_REDUCE_ID + ".data"剛剛我們說偏移量,其實是存在內存里的,所以接著要持久化,通過下面的writeIndexFile來完成:
shuffleBlockResolver.writeIndexFile(dep.shuffleId,mapId, partitionLengths)具體的文件名是:
"shuffle_" + shuffleId + "_" + mapId + "_" + NOOP_REDUCE_ID + ".index"至此,一個task的寫入操作完成,對應一個文件。
Final Conclusion
So the final conclusion: the number of shuffle files an executor ultimately ends up with is

MapNum

that is, the number of map tasks that ran on it (note: index files not counted). The number of files it holds open and writes to at the same time is at most:
CoreNum
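For example, under these rules an executor with 8 cores that runs 1,000 map tasks of a stage ends up with 1,000 shuffle data files (plus 1,000 index files) on its local disks, while at any given moment at most 8 of them are being written, one per concurrently running task.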