Understanding Spark 2.1 Core In Depth (10): Shuffle Map-Side Principles and Source Code Analysis
Let's first look at the map side from a high level. Depending on whether an aggregation function is defined (aggregator.isDefined) and whether an ordering is defined (ordering.isDefined), there are three cases:
- No aggregation and no ordering: records are first written to separate files by partition, and then merged into a single file in partition order. This suits a small number of partitions; merging multiple buckets into one file reduces the number of map output files, saves disk I/O, and improves performance.
- No aggregation but with ordering: records are first sorted in an in-memory buffer by partition (and possibly also by key), and then merged into a single file in partition order. This suits a larger number of partitions; again, merging multiple buckets into one file reduces the number of map output files, saves disk I/O, and improves performance. When memory usage exceeds a threshold, the data is spilled to disk.
- With aggregation and ordering: records are first aggregated by key in an in-memory map, then sorted by partition (and possibly also by key), and finally merged into a single file in partition order. Merging multiple buckets into one file reduces the number of map output files, saves disk I/O, and improves performance. When memory usage exceeds a threshold, the data is spilled to disk. Records are read and aggregated one at a time, which keeps memory usage low.
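As a rough, hedged sketch of how these three paths relate to the two flags (illustrative only; the names below are not Spark's), the choice boils down to:

```scala
// Minimal sketch (not Spark source): which in-memory path the map side takes,
// mirroring the three cases above. All names here are illustrative only.
object MapSidePathSketch {
  sealed trait Plan
  case object BufferThenConcatByPartition   extends Plan // no aggregation, no ordering
  case object BufferThenSortPartitionAndKey extends Plan // ordering only
  case object AggregateInMapThenSort        extends Plan // aggregation defined

  def choose(aggregatorDefined: Boolean, orderingDefined: Boolean): Plan =
    if (aggregatorDefined) AggregateInMapThenSort
    else if (orderingDefined) BufferThenSortPartitionAndKey
    else BufferThenConcatByPartition

  def main(args: Array[String]): Unit =
    println(choose(aggregatorDefined = true, orderingDefined = false)) // AggregateInMapThenSort
}
```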
Let's start by looking at insertAll in depth:
```scala
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
  // shouldCombine is true if an aggregation function is defined
  val shouldCombine = aggregator.isDefined
  // Does this external sort need to aggregate?
  if (shouldCombine) {
    // mergeValue merges a new value into an existing combined value
    val mergeValue = aggregator.get.mergeValue
    // createCombiner creates a combined value from a single value
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    // update closure:
    //   if a value already exists, merge the old value with the new value kv._2;
    //   otherwise create a combined value from kv._2
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    while (records.hasNext) {
      addElementsRead()
      kv = records.next()
      // First aggregate the values in memory using our AppendOnlyMap
      map.changeValue((getPartition(kv._1), kv._1), update)
      // Spill to disk when the threshold is exceeded
      maybeSpillCollection(usingMap = true)
    }
  } else {
    // Insert the values directly into the buffer
    while (records.hasNext) {
      addElementsRead()
      val kv = records.next()
      buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
      maybeSpillCollection(usingMap = false)
    }
  }
}
```

Here createCombiner can be seen as producing a combined value from a single kv._2, while mergeValue plays the role of the combiner in MapReduce — a map-side reduce that aggregates the values of identical keys first.
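To make createCombiner and mergeValue concrete, here is a standalone sketch (plain Scala, not Spark source) of a word-count-style aggregation where both V and C are Int; the fold mimics what insertAll's update closure does for each record.

```scala
// A standalone sketch of how createCombiner / mergeValue behave for a
// word-count style aggregation, where V = Int and C = Int.
object CombinerSketch {
  def main(args: Array[String]): Unit = {
    val createCombiner: Int => Int = v => v             // first value for a key becomes the combiner
    val mergeValue: (Int, Int) => Int = (c, v) => c + v // merge a new value into the existing combiner

    val records = Iterator("a" -> 1, "b" -> 1, "a" -> 1, "a" -> 1)
    val combined = scala.collection.mutable.Map.empty[String, Int]

    // Mirrors insertAll's update closure: create on first sight, merge afterwards.
    for ((k, v) <- records) {
      combined(k) = combined.get(k) match {
        case Some(old) => mergeValue(old, v)
        case None      => createCombiner(v)
      }
    }
    println(combined) // Map(a -> 3, b -> 1)
  }
}
```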
The aggregation algorithm
Now let's dive into the aggregation part.
Call stack:
- util.collection.SizeTrackingAppendOnlyMap.changeValue
- util.collection.AppendOnlyMap.changeValue
- util.collection.AppendOnlyMap.incrementSize
- util.collection.AppendOnlyMap.growTable
- util.collection.AppendOnlyMap.incrementSize
- util.collection.SizeTracker.afterUpdate
- util.collection.SizeTracker.takeSample
- util.collection.AppendOnlyMap.changeValue
We start with SizeTrackingAppendOnlyMap.changeValue, which wraps AppendOnlyMap's changeValue:
util.collection.SizeTrackingAppendOnlyMap.changeValue
```scala
override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
  // Apply the aggregation logic to get newValue
  val newValue = super.changeValue(key, updateFunc)
  // Update the size-sampling statistics of the AppendOnlyMap
  super.afterUpdate()
  // Return the result
  newValue
}
```

util.collection.AppendOnlyMap.changeValue
The aggregation algorithm itself:
```scala
def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
  assert(!destroyed, destructionMessage)
  val k = key.asInstanceOf[AnyRef]
  if (k.eq(null)) {
    if (!haveNullValue) {
      incrementSize()
    }
    nullValue = updateFunc(haveNullValue, nullValue)
    haveNullValue = true
    return nullValue
  }
  // pos is obtained by rehashing k's hashCode and masking it:
  //   data(2 * pos)     is the slot for the key k
  //   data(2 * pos + 1) is the slot for k's value
  var pos = rehash(k.hashCode) & mask
  var i = 1
  while (true) {
    // curKey is whatever currently occupies k's slot
    val curKey = data(2 * pos)
    if (curKey.eq(null)) {
      // The slot is empty: build newValue from the single new value kv._2
      val newValue = updateFunc(false, null.asInstanceOf[V])
      data(2 * pos) = k
      data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
      // Grow the table if needed
      incrementSize()
      return newValue
    } else if (k.eq(curKey) || k.equals(curKey)) {
      // The keys match: merge the old value data(2 * pos + 1) with the new value kv._2
      val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
      data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
      return newValue
    } else {
      // curKey is neither null nor equal to k: a hash collision.
      // Keep probing forward until one of the two cases above is hit.
      val delta = i
      pos = (pos + delta) & mask
      i += 1
    }
  }
  null.asInstanceOf[V] // never reached; needed to satisfy the compiler
}
```
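Note how the collision branch advances pos by an increasing delta (1, then 2, then 3, ...), so the probe offsets from the original slot are 1, 3, 6, 10, ... — quadratic probing over triangular numbers. Below is a standalone sketch of this open-addressing update with a fixed power-of-two capacity (illustrative only, not Spark source; Spark's map also grows and tracks its size):

```scala
// Standalone sketch of AppendOnlyMap-style open addressing with quadratic probing.
// Keys are Strings, values are Ints, and the capacity is fixed for simplicity.
object ProbingSketch {
  val capacity = 8
  val mask = capacity - 1
  val data = new Array[AnyRef](2 * capacity) // slot 2*pos = key, slot 2*pos+1 = value

  def changeValue(key: String, update: (Boolean, Int) => Int): Int = {
    // Spark uses a rehash of hashCode here; masking the non-negative hash is enough for the sketch.
    var pos = (key.hashCode & 0x7fffffff) & mask
    var i = 1
    while (true) {
      val curKey = data(2 * pos)
      if (curKey eq null) {
        val v = update(false, 0)
        data(2 * pos) = key
        data(2 * pos + 1) = Int.box(v)
        return v
      } else if (curKey == key) {
        val v = update(true, data(2 * pos + 1).asInstanceOf[Int])
        data(2 * pos + 1) = Int.box(v)
        return v
      } else {
        pos = (pos + i) & mask // hash collision: probe further (offsets 1, 3, 6, 10, ...)
        i += 1
      }
    }
    0 // unreachable
  }

  def main(args: Array[String]): Unit = {
    Seq("a", "b", "a", "c", "a").foreach { k =>
      changeValue(k, (had, old) => if (had) old + 1 else 1)
    }
    println(data.grouped(2).filter(_(0) != null).map(p => p(0) -> p(1)).toList)
  }
}
```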
util.collection.AppendOnlyMap.incrementSize
Next, let's look at how the size counter and table growth work:

```scala
private def incrementSize() {
  curSize += 1
  // When curSize exceeds the threshold growThreshold, call growTable()
  if (curSize > growThreshold) {
    growTable()
  }
}
```
util.collection.AppendOnlyMap.growTable
```scala
protected def growTable() {
  // Allocate newData with double the capacity
  val newCapacity = capacity * 2
  require(newCapacity <= MAXIMUM_CAPACITY, s"Can't contain more than ${growThreshold} elements")
  val newData = new Array[AnyRef](2 * newCapacity)
  // Compute the new mask
  val newMask = newCapacity - 1
  var oldPos = 0
  while (oldPos < capacity) {
    // Re-hash every entry of the old array with newMask
    // and copy it into the new array
    if (!data(2 * oldPos).eq(null)) {
      val key = data(2 * oldPos)
      val value = data(2 * oldPos + 1)
      var newPos = rehash(key.hashCode) & newMask
      var i = 1
      var keepGoing = true
      while (keepGoing) {
        val curKey = newData(2 * newPos)
        if (curKey.eq(null)) {
          newData(2 * newPos) = key
          newData(2 * newPos + 1) = value
          keepGoing = false
        } else {
          val delta = i
          newPos = (newPos + delta) & newMask
          i += 1
        }
      }
    }
    oldPos += 1
  }
  // Switch over to the new table
  data = newData
  capacity = newCapacity
  mask = newMask
  growThreshold = (LOAD_FACTOR * newCapacity).toInt
}
```
util.collection.SizeTracker.afterUpdate
Now let's come back to super.afterUpdate() in SizeTrackingAppendOnlyMap.changeValue, which updates the size sampling of the AppendOnlyMap. "Size sampling" here means estimating how much the AppendOnlyMap grows per update. Computing the exact size after every insert or update would hurt performance badly, so a sampling-based estimate is used instead:

```scala
protected def afterUpdate(): Unit = {
  numUpdates += 1
  // When numUpdates reaches the threshold, take a sample
  if (nextSampleNum == numUpdates) {
    takeSample()
  }
}
```
util.collection.SizeTracker.takeSample
```scala
private def takeSample(): Unit = {
  samples.enqueue(Sample(SizeEstimator.estimate(this), numUpdates))
  // Only keep the two most recent samples
  if (samples.size > 2) {
    samples.dequeue()
  }
  val bytesDelta = samples.toList.reverse match {
    // Estimate the size change per update from the last two samples
    case latest :: previous :: tail =>
      (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)
    // With fewer than two samples, assume no change
    case _ => 0
  }
  // Update the per-update estimate
  bytesPerUpdate = math.max(0, bytesDelta)
  // Grow the threshold for the next sample
  nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
}
```
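As a hedged illustration of this scheme, the sketch below (not Spark source) samples the real size only at exponentially spaced update counts and extrapolates in between, in the spirit of takeSample and the estimateSize shown next. SAMPLE_GROWTH_RATE is assumed to be 1.1 here.

```scala
// Standalone sketch of SizeTracker-style size estimation: sample rarely, extrapolate in between.
object SizeSamplingSketch {
  final case class Sample(size: Long, numUpdates: Long)

  def main(args: Array[String]): Unit = {
    val growthRate = 1.1 // assumed SAMPLE_GROWTH_RATE
    val samples = scala.collection.mutable.Queue.empty[Sample]
    var numUpdates = 0L
    var nextSampleNum = 1L
    var bytesPerUpdate = 0.0

    // Pretend every update really adds 48 bytes; measureSize stands in for SizeEstimator.estimate.
    def measureSize(updates: Long): Long = 1000L + 48L * updates

    for (_ <- 1 to 1000) {
      numUpdates += 1
      if (numUpdates == nextSampleNum) {
        samples.enqueue(Sample(measureSize(numUpdates), numUpdates))
        if (samples.size > 2) samples.dequeue()
        bytesPerUpdate = samples.toList.reverse match {
          case latest :: previous :: _ =>
            math.max(0.0,
              (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates))
          case _ => 0.0
        }
        nextSampleNum = math.ceil(numUpdates * growthRate).toLong
      }
    }

    // estimateSize: last measured size plus extrapolated growth since that sample
    val last = samples.last
    val estimated = (last.size + bytesPerUpdate * (numUpdates - last.numUpdates)).toLong
    println(s"estimated = $estimated, actual = ${measureSize(numUpdates)}")
  }
}
```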
Finally, the function that estimates the AppendOnlyMap's size:

```scala
def estimateSize(): Long = {
  assert(samples.nonEmpty)
  // Extrapolate the total growth since the last sample
  val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
  // Previous sampled size plus the extrapolated growth
  (samples.last.size + extrapolatedDelta).toLong
}
```

Writing to the buffer
Now let's return to insertAll and look at how values are inserted directly into the buffer.
Call stack:
- util.collection.PartitionedPairBuffer.insert
- util.collection.PartitionedPairBuffer.growArray
util.collection.PartitionedPairBuffer.insert
```scala
def insert(partition: Int, key: K, value: V): Unit = {
  // When the buffer is full, call growArray()
  if (curSize == capacity) {
    growArray()
  }
  data(2 * curSize) = (partition, key.asInstanceOf[AnyRef])
  data(2 * curSize + 1) = value.asInstanceOf[AnyRef]
  curSize += 1
  afterUpdate()
}
```
util.collection.PartitionedPairBuffer.growArray
```scala
private def growArray(): Unit = {
  if (capacity >= MAXIMUM_CAPACITY) {
    throw new IllegalStateException(s"Can't insert more than ${MAXIMUM_CAPACITY} elements")
  }
  val newCapacity =
    if (capacity * 2 < 0 || capacity * 2 > MAXIMUM_CAPACITY) { // Overflow
      MAXIMUM_CAPACITY
    } else {
      capacity * 2
    }
  // Allocate newArray with double the capacity
  val newArray = new Array[AnyRef](2 * newCapacity)
  // Copy the old contents over
  System.arraycopy(data, 0, newArray, 0, 2 * capacity)
  data = newArray
  capacity = newCapacity
  resetSamples()
}
```
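The `capacity * 2 < 0` guard catches Int overflow: once capacity passes 2^30, doubling wraps around to a negative number. A quick illustration:

```scala
// Int overflow illustration for the capacity * 2 < 0 check above.
object OverflowSketch {
  def main(args: Array[String]): Unit = {
    val capacity = 1 << 30 // 1073741824
    println(capacity * 2)  // -2147483648: doubling overflows Int and goes negative
  }
}
```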
Spilling
Now let's return to insertAll once more and see how data is spilled to disk when the threshold is exceeded.
Call stack:
- util.collection.ExternalSorter.maybeSpillCollection
- util.collection.Spillable.maybeSpill
- util.collection.Spillable.spill
- util.collection.ExternalSorter.spillMemoryIteratorToDisk
- util.collection.Spillable.spill
- util.collection.Spillable.maybeSpill
util.collection.ExternalSorter.maybeSpillCollection
```scala
private def maybeSpillCollection(usingMap: Boolean): Unit = {
  var estimatedSize = 0L
  if (usingMap) {
    estimatedSize = map.estimateSize()
    if (maybeSpill(map, estimatedSize)) {
      map = new PartitionedAppendOnlyMap[K, C]
    }
  } else {
    estimatedSize = buffer.estimateSize()
    if (maybeSpill(buffer, estimatedSize)) {
      buffer = new PartitionedPairBuffer[K, C]
    }
  }
  if (estimatedSize > _peakMemoryUsedBytes) {
    _peakMemoryUsedBytes = estimatedSize
  }
}
```
util.collection.Spillable.maybeSpill
```scala
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
  var shouldSpill = false
  if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
    // Memory usage is above the threshold:
    // amountToRequest is how much extra memory to ask for
    val amountToRequest = 2 * currentMemory - myMemoryThreshold
    val granted = acquireMemory(amountToRequest)
    myMemoryThreshold += granted
    // If too little memory was granted (acquireMemory may return less than
    // requested, even 0), currentMemory may still be >= myMemoryThreshold,
    // in which case we should spill
    shouldSpill = currentMemory >= myMemoryThreshold
  }
  // Also spill when the number of elements read exceeds the force-spill threshold
  shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
  if (shouldSpill) {
    // Update the spill count
    _spillCount += 1
    logSpillage(currentMemory)
    // Perform the spill
    spill(collection)
    // Reset the element-read counter
    _elementsRead = 0
    // Account for the spilled memory and release it
    _memoryBytesSpilled += currentMemory
    releaseMemory()
  }
  shouldSpill
}
```
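A worked example of the request arithmetic, as a hedged sketch: with 80 MB currently used and a 64 MB threshold, the sorter asks for 2 × 80 − 64 = 96 MB more, aiming for a new threshold of twice the current usage; if the memory manager grants less, currentMemory can still exceed the threshold and a spill is triggered.

```scala
// Sketch (not Spark source) of maybeSpill's memory-request arithmetic.
object MaybeSpillArithmetic {
  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024L
    val currentMemory = 80 * mb
    var myMemoryThreshold = 64 * mb

    // Ask for enough to raise the threshold to 2 * currentMemory.
    val amountToRequest = 2 * currentMemory - myMemoryThreshold
    println(s"requested: ${amountToRequest / mb} MB") // 96 MB

    // Suppose the memory manager only grants 10 MB.
    val granted = 10 * mb
    myMemoryThreshold += granted
    val shouldSpill = currentMemory >= myMemoryThreshold
    println(s"new threshold: ${myMemoryThreshold / mb} MB, shouldSpill = $shouldSpill") // 74 MB, true
  }
}
```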
util.collection.Spillable.spill
This spills the in-memory collection to a sorted file. Later, SortShuffleWriter.write calls sorter.writePartitionedFile to merge these spill files.

```scala
override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
  // Build a destructive, sorted iterator over the in-memory collection
  // (we will look at this in detail later)
  val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
  // Write it out as a spill file and add the file to the spills array
  val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
  spills += spillFile
}
```
util.collection.ExternalSorter.spillMemoryIteratorToDisk
```scala
private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator)
  : SpilledFile = {
  // Create a temporary file and its blockId
  val (blockId, file) = diskBlockManager.createTempShuffleBlock()

  // These values are reset after every flush
  var objectsWritten: Long = 0
  val spillMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics
  val writer: DiskBlockObjectWriter =
    blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics)

  // Sizes of the serialized batches, in the order they are written to disk
  val batchSizes = new ArrayBuffer[Long]

  // Number of elements written for each partition
  val elementsPerPartition = new Array[Long](numPartitions)

  // Flush the writer's contents to disk and update the bookkeeping variables
  def flush(): Unit = {
    val segment = writer.commitAndGet()
    batchSizes += segment.length
    _diskBytesSpilled += segment.length
    objectsWritten = 0
  }

  var success = false
  try {
    // Iterate over the in-memory collection
    while (inMemoryIterator.hasNext) {
      val partitionId = inMemoryIterator.nextPartition()
      require(partitionId >= 0 && partitionId < numPartitions,
        s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})")
      inMemoryIterator.writeNext(writer)
      elementsPerPartition(partitionId) += 1
      objectsWritten += 1

      // Flush once the number of written elements reaches the serialization batch size
      if (objectsWritten == serializerBatchSize) {
        flush()
      }
    }
    if (objectsWritten > 0) {
      // Flush whatever is left after the loop
      flush()
    } else {
      writer.revertPartialWritesAndClose()
    }
    success = true
  } finally {
    if (success) {
      writer.close()
    } else {
      writer.revertPartialWritesAndClose()
      if (file.exists()) {
        if (!file.delete()) {
          logWarning(s"Error deleting ${file}")
        }
      }
    }
  }

  SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)
}
```
Sorting
Let's go back to SortShuffleWriter.write:

```scala
// In an external sort, part of the result may still be in memory
// while the rest sits in one or more spill files;
// they must be merged into a single large file
val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
```

Call stack:
- util.collection.ExternalSorter.writePartitionedFile
- util.collection.ExternalSorter.destructiveSortedWritablePartitionedIterator
- util.collection.ExternalSorter.partitionedIterator
- partitionedDestructiveSortedIterator
util.collection.ExternalSorter.writePartitionedFile
Let's first dive into writePartitionedFile, which writes all of the data added to this ExternalSorter into a single file on disk:

```scala
def writePartitionedFile(
    blockId: BlockId,
    outputFile: File): Array[Long] = {

  // Track the length of each partition in the output file
  val lengths = new Array[Long](numPartitions)
  val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
    context.taskMetrics().shuffleWriteMetrics)

  if (spills.isEmpty) {
    // All data is still in memory
    val collection = if (aggregator.isDefined) map else buffer
    val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
    while (it.hasNext) {
      val partitionId = it.nextPartition()
      while (it.hasNext && it.nextPartition() == partitionId) {
        it.writeNext(writer)
      }
      val segment = writer.commitAndGet()
      lengths(partitionId) = segment.length
    }
  } else {
    // Otherwise a merge-sort is needed: obtain a per-partition iterator
    // and write everything out directly
    for ((id, elements) <- this.partitionedIterator) {
      if (elements.hasNext) {
        for (elem <- elements) {
          writer.write(elem._1, elem._2)
        }
        val segment = writer.commitAndGet()
        lengths(id) = segment.length
      }
    }
  }

  writer.close()
  context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
  context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
  context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)

  lengths
}
```
util.collection.ExternalSorter.destructiveSortedWritablePartitionedIterator
writePartitionedFile uses destructiveSortedWritablePartitionedIterator to create the iterator:

```scala
val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
```

As we saw above, util.collection.Spillable.spill uses it as well:

```scala
val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
```
Let's look at destructiveSortedWritablePartitionedIterator:

```scala
def destructiveSortedWritablePartitionedIterator(keyComparator: Option[Comparator[K]])
  : WritablePartitionedIterator = {
  // Build the underlying iterator
  val it = partitionedDestructiveSortedIterator(keyComparator)
  new WritablePartitionedIterator {
    private[this] var cur = if (it.hasNext) it.next() else null

    def writeNext(writer: DiskBlockObjectWriter): Unit = {
      writer.write(cur._1._2, cur._2)
      cur = if (it.hasNext) it.next() else null
    }

    def hasNext(): Boolean = cur != null

    def nextPartition(): Int = cur._1._1
  }
}
```
As you can see, WritablePartitionedIterator is essentially a proxy over the iterator returned by partitionedDestructiveSortedIterator. Instead of returning values, writeNext takes a DiskBlockObjectWriter and writes each record through it. Let's set partitionedDestructiveSortedIterator aside for now and keep going.
util.collection.ExternalSorter.partitionedIterator
Unlike the other branch, this branch calls partitionedIterator to obtain a per-partition iterator and writes all the data directly. Let's dive into partitionedIterator:

```scala
def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
  val usingMap = aggregator.isDefined
  val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
  if (spills.isEmpty) {
    // No spills: with the flow we followed above, we would not reach this branch
    if (!ordering.isDefined) {
      // No key ordering is required: sort by partition only
      groupByPartition(destructiveIterator(
        collection.partitionedDestructiveSortedIterator(None)))
    } else {
      // Otherwise sort by both partition and key
      groupByPartition(destructiveIterator(
        collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
    }
  } else {
    // There are spills: merge the spilled temp files with the in-memory data
    merge(spills, destructiveIterator(
      collection.partitionedDestructiveSortedIterator(comparator)))
  }
}
```
Let's first look at the two sorting modes in the spills.isEmpty case:
- Sorting by partition only:
Here None is passed to partitionedDestructiveSortedIterator, meaning keys are not sorted. Sorting by partition is always done inside partitionedDestructiveSortedIterator; we will cover that later.
After sorting by partition, the records are grouped by partition:

```scala
private def groupByPartition(data: Iterator[((Int, K), C)])
  : Iterator[(Int, Iterator[Product2[K, C]])] = {
  val buffered = data.buffered
  (0 until numPartitions).iterator.map(p => (p, new IteratorForPartition(p, buffered)))
}
```
IteratorForPartition is simply an iterator over a single partition:
```scala
private[this] class IteratorForPartition(partitionId: Int, data: BufferedIterator[((Int, K), C)])
  extends Iterator[Product2[K, C]] {

  override def hasNext: Boolean = data.hasNext && data.head._1._1 == partitionId

  override def next(): Product2[K, C] = {
    if (!hasNext) {
      throw new NoSuchElementException
    }
    val elem = data.next()
    (elem._1._2, elem._2)
  }
}
```
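To see how the shared BufferedIterator makes this work, here is a standalone sketch (not Spark source): records are already sorted by partition id, and each per-partition iterator only consumes elements while the head of the shared iterator still belongs to its partition.

```scala
// Standalone sketch of groupByPartition / IteratorForPartition over a shared BufferedIterator.
// Records must already be sorted by partition id, exactly as in ExternalSorter.
object GroupByPartitionSketch {
  class IteratorForPartition(partitionId: Int, data: BufferedIterator[((Int, String), Int)])
      extends Iterator[(String, Int)] {
    override def hasNext: Boolean = data.hasNext && data.head._1._1 == partitionId
    override def next(): (String, Int) = {
      val elem = data.next()
      (elem._1._2, elem._2)
    }
  }

  def main(args: Array[String]): Unit = {
    val numPartitions = 3
    // ((partitionId, key), value), sorted by partition id
    val sorted = Iterator(((0, "a"), 1), ((0, "b"), 2), ((1, "c"), 3), ((2, "d"), 4))
    val buffered = sorted.buffered

    val grouped = (0 until numPartitions).iterator.map(p => (p, new IteratorForPartition(p, buffered)))
    for ((p, it) <- grouped) {
      println(s"partition $p -> ${it.toList}")
    }
    // partition 0 -> List((a,1), (b,2))
    // partition 1 -> List((c,3))
    // partition 2 -> List((d,4))
  }
}
```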
- Sorting by both partition and key:
Here keyComparator is passed to partitionedDestructiveSortedIterator:
```scala
private val keyComparator: Comparator[K] = ordering.getOrElse(new Comparator[K] {
  override def compare(a: K, b: K): Int = {
    val h1 = if (a == null) 0 else a.hashCode()
    val h2 = if (b == null) 0 else b.hashCode()
    if (h1 < h2) -1 else if (h1 == h2) 0 else 1
  }
})
```

If no ordering is supplied, keys are first ordered by their hashCode, and groupByPartition is then called to group the result by partition.
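Note that this fallback comparator orders keys only by hashCode: it is enough to bring equal keys next to each other (which is what the merge/aggregation path relies on), but it is not a semantically meaningful sort for the user. A small hedged illustration:

```scala
// Illustration of the hashCode-based fallback comparator: equal keys end up adjacent,
// but the overall order follows hash codes rather than any natural key order.
import java.util.Comparator

object HashComparatorSketch {
  val byHash: Comparator[String] = new Comparator[String] {
    override def compare(a: String, b: String): Int = {
      val h1 = if (a == null) 0 else a.hashCode
      val h2 = if (b == null) 0 else b.hashCode
      if (h1 < h2) -1 else if (h1 == h2) 0 else 1
    }
  }

  def main(args: Array[String]): Unit = {
    val keys = Array("pear", "apple", "pear", "banana")
    java.util.Arrays.sort(keys, byHash)
    // Duplicate keys ("pear") are now adjacent; the overall order depends on hash codes,
    // not on alphabetical order.
    println(keys.toList)
  }
}
```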
When there are spills, comparator is used instead:

```scala
private def comparator: Option[Comparator[K]] = {
  // A key comparator is needed when we must sort or aggregate
  if (ordering.isDefined || aggregator.isDefined) {
    Some(keyComparator)
  } else {
    None
  }
}
```
partitionedDestructiveSortedIterator
Now let's finally look at partitionedDestructiveSortedIterator. It is declared in the trait WritablePartitionedPairCollection, which is implemented by both PartitionedAppendOnlyMap and PartitionedPairBuffer. As seen in partitionedIterator:

```scala
val usingMap = aggregator.isDefined
val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
```

PartitionedAppendOnlyMap is used when aggregation is needed; otherwise PartitionedPairBuffer is used.
util.collection.PartitionedPairBuffer.partitionedDestructiveSortedIterator
Let's start with the simpler one, PartitionedPairBuffer.partitionedDestructiveSortedIterator:

```scala
override def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
  : Iterator[((Int, K), V)] = {
  val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
  // Sort the data in place
  new Sorter(new KVArraySortDataFormat[(Int, K), AnyRef]).sort(data, 0, curSize, comparator)
  iterator
}
```
Note this line:

```scala
val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
```

The key comparator is wrapped by partitionKeyComparator, which performs a secondary sort on (partition, key). If the keyComparator passed in is None, partitionComparator is used instead and only the partition is sorted:
```scala
def partitionKeyComparator[K](keyComparator: Comparator[K]): Comparator[(Int, K)] = {
  new Comparator[(Int, K)] {
    override def compare(a: (Int, K), b: (Int, K)): Int = {
      val partitionDiff = a._1 - b._1
      if (partitionDiff != 0) {
        partitionDiff
      } else {
        keyComparator.compare(a._2, b._2)
      }
    }
  }
}
```
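A quick usage sketch (a self-contained copy of the comparator above, for illustration only): sorting (partition, key) pairs with it orders primarily by partition id and breaks ties with the key comparator.

```scala
// Usage sketch for a partition-then-key comparator over (Int, String) pairs.
import java.util.Comparator

object PartitionKeyComparatorSketch {
  def partitionKeyComparator[K](keyComparator: Comparator[K]): Comparator[(Int, K)] =
    new Comparator[(Int, K)] {
      override def compare(a: (Int, K), b: (Int, K)): Int = {
        val partitionDiff = a._1 - b._1
        if (partitionDiff != 0) partitionDiff else keyComparator.compare(a._2, b._2)
      }
    }

  def main(args: Array[String]): Unit = {
    val byKey: Comparator[String] = Comparator.naturalOrder[String]()
    val pairs = Array((1, "b"), (0, "z"), (1, "a"), (0, "c"))
    java.util.Arrays.sort(pairs, partitionKeyComparator(byKey))
    println(pairs.toList) // List((0,c), (0,z), (1,a), (1,b))
  }
}
```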
The data is then sorted with Sorter, which uses TimSort internally; we will cover that in a later post.
Finally, the iterator is returned; it simply walks the data array one (key, value) pair at a time:
```scala
private def iterator(): Iterator[((Int, K), V)] = new Iterator[((Int, K), V)] {
  var pos = 0

  override def hasNext: Boolean = pos < curSize

  override def next(): ((Int, K), V) = {
    if (!hasNext) {
      throw new NoSuchElementException
    }
    val pair = (data(2 * pos).asInstanceOf[(Int, K)], data(2 * pos + 1).asInstanceOf[V])
    pos += 1
    pair
  }
}
```
util.collection.PartitionedAppendOnlyMap.partitionedDestructiveSortedIterator
```scala
def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
  : Iterator[((Int, K), V)] = {
  val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
  destructiveSortedIterator(comparator)
}
```
util.collection.PartitionedAppendOnlyMap.destructiveSortedIterator
```scala
def destructiveSortedIterator(keyComparator: Comparator[K]): Iterator[(K, V)] = {
  destroyed = true
  // Compact the entries towards the left of the array
  var keyIndex, newIndex = 0
  while (keyIndex < capacity) {
    if (data(2 * keyIndex) != null) {
      data(2 * newIndex) = data(2 * keyIndex)
      data(2 * newIndex + 1) = data(2 * keyIndex + 1)
      newIndex += 1
    }
    keyIndex += 1
  }
  assert(curSize == newIndex + (if (haveNullValue) 1 else 0))

  new Sorter(new KVArraySortDataFormat[K, AnyRef]).sort(data, 0, newIndex, keyComparator)

  // Return a new iterator over the sorted, compacted data
  new Iterator[(K, V)] {
    var i = 0
    var nullValueReady = haveNullValue
    def hasNext: Boolean = (i < newIndex || nullValueReady)
    def next(): (K, V) = {
      if (nullValueReady) {
        nullValueReady = false
        (null.asInstanceOf[K], nullValue)
      } else {
        val item = (data(2 * i).asInstanceOf[K], data(2 * i + 1).asInstanceOf[V])
        i += 1
        item
      }
    }
  }
}
```