Spark notes 16: BlockManager
First, some background reading on the storage module: http://jerryshao.me/architecture/2013/10/08/spark-storage-module-analysis/ . Spark's storage also borrows the idea of a segmented file (http://en.wikipedia.org/wiki/Segmented_file_transfer): a file is split into multiple segments stored on different servers, and a reader fetches those segments from the servers in parallel (this is also the basis of BitTorrent). The earlier analysis of the shuffle call chain already touched much of the BlockManager flow, but it is still worth walking through its code systematically. getLocalFromDisk, where the ShuffleManager walkthrough ended, is where BlockManager begins: even a remote block fetch just sends a message to the remote server, runs getLocalFromDisk there, and ships the result back.
============================BlockManager============================
-> BlockManager::getLocalFromDisk
  -> diskStore.getValues(blockId, serializer)
    -> getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes, serializer))
      -> val segment = diskManager.getBlockLocation(blockId)  -- DiskBlockManager method: locate the block as a segment inside some file
        -> if blockId.isShuffle and env.shuffleManager.isInstanceOf[SortShuffleManager]  -- sort-based shuffle: let the sort shuffle manager figure out its blocks
          -> sortShuffleManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId], this)
        -> else if blockId.isShuffle and shuffleBlockManager.consolidateShuffleFiles  -- hash-based shuffle with consolidated files
          -> shuffleBlockManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId])
            -> val shuffleState = shuffleStates(id.shuffleId)
            -> for (fileGroup <- shuffleState.allFileGroups)
              -> val segment = fileGroup.getFileSegmentFor(id.mapId, id.reduceId)  -- this function is analyzed separately below
              -> if (segment.isDefined) { return segment.get }
        -> else  -- ordinary case: one file per block
          -> val file = getFile(blockId.name)  -- getFile(filename: String): File
            -> val hash = Utils.nonNegativeHash(filename)
            -> val dirId = hash % localDirs.length
            -> val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
            -> var subDir = subDirs(dirId)(subDirId)
            -> new File(subDir, filename)
          -> new FileSegment(file, 0, file.length())
      -> val channel = new RandomAccessFile(segment.file, "r").getChannel
      -> if (segment.length < minMemoryMapBytes)  -- small segments are read directly into a buffer
        -> channel.position(segment.offset)
        -> channel.read(buf)
        -> return buf
      -> else  -- large segments are memory-mapped
        -> return Some(channel.map(MapMode.READ_ONLY, segment.offset, segment.length))
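The directory-hashing step inside getFile is easy to reproduce. A minimal sketch, assuming localDirs and subDirsPerLocalDir come from the DiskBlockManager's configuration; the non-negative hash is an approximation of Utils.nonNegativeHash, the "%02x" sub-directory naming is an assumption here, and the real getFile also creates and caches sub-directories lazily:

import java.io.File

// Sketch only: map a block's file name to a deterministic path under one of
// the configured local directories, mirroring the trace above.
def getFileSketch(filename: String, localDirs: Array[File], subDirsPerLocalDir: Int): File = {
  val hash = filename.hashCode & Integer.MAX_VALUE               // non-negative hash
  val dirId = hash % localDirs.length                            // which spark.local.dir root
  val subDirId = (hash / localDirs.length) % subDirsPerLocalDir  // which sub-directory under that root
  val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
  new File(subDir, filename)
}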
ShuffleFileGroup: how a (mapId, reduceId) pair is turned into data inside ShuffleBlockManager -- the getFileSegmentFor function (the full class is reconstructed near the end of this note):
-> use reduceId to pick the reducer's file handle fd from the ShuffleFileGroup field val files: Array[File]
  -> use mapId to look up index in mapIdToIndex
    -> use reduceId to pick the blockOffsets vector and blockLengths vector for that file
      -> use index to read offset and len from those vectors
        -> finally read the required bytes from fd at (offset, len)
Fetching data from the local BlockManager
-> BlockManager::doGetLocal
  -> val info = blockInfo.get(blockId).orNull
  -> val level = info.level
  -> if (level.useMemory)  -- look for the block in memory
    -> val result = if (asBlockResult)
      -> memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))
    -> else
      -> memoryStore.getBytes(blockId)
  -> if (level.useOffHeap)  -- look for the block in Tachyon
    -> tachyonStore.getBytes(blockId)
  -> if (level.useDisk)
    -> val bytes: ByteBuffer = diskStore.getBytes(blockId)
    -> if (!level.useMemory)  -- the block should not be cached in memory, so just return it
      -> if (asBlockResult)
        -> return Some(new BlockResult(dataDeserialize(blockId, bytes), DataReadMethod.Disk, info.size))
      -> else
        -> return Some(bytes)
    -> else  -- otherwise, also put something into the memory store
      -> if (!level.deserialized || !asBlockResult)  -- store the serialized bytes in memory: the level is "memory serialized", or the caller only asked for bytes
        -> val copyForMemory = ByteBuffer.allocate(bytes.limit)
        -> copyForMemory.put(bytes)
        -> memoryStore.putBytes(blockId, copyForMemory, level)
        -> if (!asBlockResult)
          -> return Some(bytes)
      -> else  -- deserialize first, then cache the values in memory
        -> val values = dataDeserialize(blockId, bytes)
        -> if (level.deserialized)  -- cache the values before returning them
          -> val putResult = memoryStore.putIterator(blockId, values, level, returnValues = true, allowPersistToDisk = false)
          -> putResult.data match { case Left(it) => return Some(new BlockResult(it, DataReadMethod.Disk, info.size)) }
        -> else
          -> return Some(new BlockResult(values, DataReadMethod.Disk, info.size))

Fetching data from a remote BlockManager
-> BlockManager::doGetRemote
  -> val locations = Random.shuffle(master.getLocations(blockId))  -- shuffle the candidate locations
  -> for (loc <- locations)  -- try every location in turn
    -> val data = BlockManagerWorker.syncGetBlock(GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
      -> val blockMessage = BlockMessage.fromGetBlock(msg)
        -> val newBlockMessage = new BlockMessage()
        -> newBlockMessage.set(getBlock)
          -> typ = BlockMessage.TYPE_GET_BLOCK
          -> id = getBlock.id
      -> val blockMessageArray = new BlockMessageArray(blockMessage)
      -> val responseMessage = Try(Await.result(connectionManager.sendMessageReliably(toConnManagerId, blockMessageArray.toBufferMessage), Duration.Inf))
      -> responseMessage match { case Success(message) => val bufferMessage = message.asInstanceOf[BufferMessage] }
        -> logDebug("Response message received " + bufferMessage)
        -> BlockMessageArray.fromBufferMessage(bufferMessage).foreach(blockMessage =>
          -> logDebug("Found " + blockMessage)
          -> return blockMessage.getData
    -> return Some(data)
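To summarize the local read path, here is a condensed, self-contained sketch of the store priority -- memory, then off-heap, then disk, with an optional copy back into memory. The store types are simplified stand-ins, not Spark's real MemoryStore/TachyonStore/DiskStore interfaces, and it keeps only the shape of the decision (the real doGetLocal also handles deserialization and BlockResult wrapping):

import java.nio.ByteBuffer
import scala.collection.mutable

// Simplified stand-in for a block store keyed by block id.
class StoreSketch {
  private val data = mutable.Map[String, ByteBuffer]()
  def getBytes(id: String): Option[ByteBuffer] = data.get(id)
  def putBytes(id: String, bytes: ByteBuffer): Unit = data(id) = bytes
}

// Only the storage-level flags that matter for the read path.
case class LevelSketch(useMemory: Boolean, useOffHeap: Boolean, useDisk: Boolean)

class LocalReadSketch(memory: StoreSketch, offHeap: StoreSketch, disk: StoreSketch) {
  def get(id: String, level: LevelSketch): Option[ByteBuffer] = {
    if (level.useMemory) {
      val hit = memory.getBytes(id)
      if (hit.isDefined) return hit                    // 1. memory hit
    }
    if (level.useOffHeap) {
      val hit = offHeap.getBytes(id)
      if (hit.isDefined) return hit                    // 2. off-heap (Tachyon) hit
    }
    if (level.useDisk) {
      disk.getBytes(id).map { bytes =>
        if (level.useMemory) {
          // 3. disk hit: if memory is allowed, copy the bytes back into the
          // memory store so later reads are served from memory (the
          // copyForMemory/putBytes step in the trace above).
          val copy = ByteBuffer.allocate(bytes.limit())
          copy.put(bytes.duplicate())
          copy.flip()
          memory.putBytes(id, copy)
        }
        bytes
      }
    } else {
      None
    }
  }
}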
===========================end=================================
Referencing this figure again: several map tasks can share one file, each map owning certain segments of it; this is done to reduce the number of shuffle files. (Image source: http://jerryshao.me/architecture/2014/01/04/spark-shuffle-detail-investigation/)
The data structure returned when fetching block data:
/* Class for returning a fetched block and associated metrics. */
private[spark] class BlockResult(
val data: Iterator[Any],
readMethod: DataReadMethod.Value,
bytes: Long) {
val inputMetrics = new InputMetrics(readMethod)
inputMetrics.bytesRead = bytes
}
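As a quick illustration of how a caller might consume this -- a hypothetical snippet, not a quote from Spark's CacheManager; that blockManager.get returns Option[BlockResult] is an assumption here:

// Hypothetical caller: fetch a block, read its values, and use the metrics.
blockManager.get(blockId) match {
  case Some(result) =>
    val values: Iterator[Any] = result.data              // deserialized block contents
    val bytesRead: Long = result.inputMetrics.bytesRead  // bytes accounted for this read
    values.foreach(println)                              // consume the iterator
  case None =>
    // block not found locally or remotely
}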
private[spark] class BlockManager(
executorId: String,
actorSystem: ActorSystem,
val master: BlockManagerMaster,
defaultSerializer: Serializer,
maxMemory: Long,
val conf: SparkConf,
securityManager: SecurityManager,
mapOutputTracker: MapOutputTracker,
shuffleManager: ShuffleManager)
extends BlockDataProvider with Logging {
ShuffleState is the per-shuffle state. Its main fields are the unusedFileGroups and allFileGroups queues, recording which ShuffleFileGroups are currently free and all that have been created for the shuffle.
/**
* Contains all the state related to a particular shuffle. This includes a pool of unused
* ShuffleFileGroups, as well as all ShuffleFileGroups that have been created for the shuffle.
*/
private class ShuffleState(val numBuckets: Int) {
val nextFileId = new AtomicInteger(0)
val unusedFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
val allFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
/**
* The mapIds of all map tasks completed on this Executor for this shuffle.
* NB: This is only populated if consolidateShuffleFiles is FALSE. We don't need it otherwise.
*/
val completedMapTasks = new ConcurrentLinkedQueue[Int]()
}
shuffleStates is a timestamp-based hash table:
private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState]
private val metadataCleaner =
new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup, conf)
Used by sort-based shuffle: when a map task finishes, its result is registered into shuffleStates:
/**
* Register a completed map without getting a ShuffleWriterGroup. Used by sort-based shuffle
* because it just writes a single file by itself.
*/
def addCompletedMap(shuffleId: Int, mapId: Int, numBuckets: Int): Unit = {
shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
val shuffleState = shuffleStates(shuffleId)
shuffleState.completedMapTasks.add(mapId)
}
Registering itself with the master:
/**
* Initialize the BlockManager. Register to the BlockManagerMaster, and start the
* BlockManagerWorker actor.
*/
private def initialize(): Unit = {
master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
BlockManagerWorker.startBlockManagerWorker(this)
}
Fetching a block's data from the local disk (a short-circuit provided for convenience):
/**
* A short-circuited method to get blocks directly from disk. This is used for getting
* shuffle blocks. It is safe to do so without a lock on block info since disk store
* never deletes (recent) items.
*/
def getLocalFromDisk(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
diskStore.getValues(blockId, serializer).orElse {
throw new BlockException(blockId, s"Block $blockId not found on disk, though it should be")
}
}
ShuffleWriterGroup: every ShuffleMapTask gets a group of shuffle writers, one writer per reducer. Currently only hash-based shuffle uses it; the only place it is instantiated is the object returned by forMapTask, which HashShuffleWriter keeps in its shuffle field:
/** A group of writers for a ShuffleMapTask, one writer per reducer. */
private[spark] trait ShuffleWriterGroup {
val writers: Array[BlockObjectWriter]
/** @param success Indicates all writes were successful. If false, no blocks will be recorded. */
def releaseWriters(success: Boolean)
}
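A rough sketch of how HashShuffleWriter uses this group -- the method and field names are recalled from the 1.1-era code, so treat the exact signatures as assumptions:

// Hedged sketch: obtain one writer per reducer for this map task, write each
// record to the bucket chosen by the partitioner, then release the writers.
val shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser, writeMetrics)
for (elem <- records) {
  val bucketId = dep.partitioner.getPartition(elem._1)
  shuffle.writers(bucketId).write(elem)
}
shuffle.releaseWriters(success = true)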
/**
* Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file
* per reducer (this set of files is called a ShuffleFileGroup).
*
* As an optimization to reduce the number of physical shuffle files produced, multiple shuffle
* blocks are aggregated into the same file. There is one "combined shuffle file" per reducer
* per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle
* files, it releases them for another task.
* Regarding the implementation of this feature, shuffle files are identified by a 3-tuple:
* - shuffleId: The unique id given to the entire shuffle stage.
* - bucketId: The id of the output partition (i.e., reducer id)
* - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a
* time owns a particular fileId, and this id is returned to a pool when the task finishes.
* Each shuffle file is then mapped to a FileSegment, which is a 3-tuple (file, offset, length)
* that specifies where in a given file the actual block data is located.
*
* Shuffle file metadata is stored in a space-efficient manner. Rather than simply mapping
* ShuffleBlockIds directly to FileSegments, each ShuffleFileGroup maintains a list of offsets for
* each block stored in each file. In order to find the location of a shuffle block, we search the
* files within a ShuffleFileGroups associated with the block's reducer.
*/
// TODO: Factor this into a separate class for each ShuffleManager implementation
private[spark]
class ShuffleBlockManager(blockManager: BlockManager,
shuffleManager: ShuffleManager) extends Logging {
ShuffleFileGroup is a group of files, one per reducer. Each map task is assigned one such group (and several map tasks can share the same group). When several maps share a group, each map writes its output as its own segment of every file; the segment for a given (mapId, reduceId) is recovered through getFileSegmentFor:
private[spark] object ShuffleBlockManager {
  /**
   * A group of shuffle files, one per reducer. A particular mapper will be assigned a single
   * ShuffleFileGroup to write its output to.
   */
  private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File]) {
    private var numBlocks: Int = 0

    /**
     * Stores the absolute index of each mapId in the files of this group. For instance,
     * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
     */
    private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()

    /**
     * Stores consecutive offsets and lengths of blocks into each reducer file, ordered by
     * position in the file. Note: mapIdToIndex(mapId) returns the same index into the vectors
     * of every reducer.
     */
    private val blockOffsetsByReducer = Array.fill(files.length)(new PrimitiveVector[Long]())
    private val blockLengthsByReducer = Array.fill(files.length)(new PrimitiveVector[Long]())

    def apply(bucketId: Int): File = files(bucketId)

    def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]) {
      assert(offsets.length == lengths.length)
      mapIdToIndex(mapId) = numBlocks
      numBlocks += 1
      for (i <- 0 until offsets.length) {
        blockOffsetsByReducer(i) += offsets(i)
        blockLengthsByReducer(i) += lengths(i)
      }
    }

    /** Returns the FileSegment associated with the given map task, or None if no entry exists. */
    def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = {
      val file = files(reducerId)
      val blockOffsets = blockOffsetsByReducer(reducerId)
      val blockLengths = blockLengthsByReducer(reducerId)
      val index = mapIdToIndex.getOrElse(mapId, -1)
      if (index >= 0) {
        Some(new FileSegment(file, blockOffsets(index), blockLengths(index)))
      } else {
        None
      }
    }
  }
}
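To make the bookkeeping concrete with hypothetical numbers: suppose maps 7 and 3 call recordMapOutput on this group in that order, so mapIdToIndex(7) = 0 and mapIdToIndex(3) = 1. If map 3 wrote 120 bytes for reducer 2 starting at offset 300 of files(2), then blockOffsetsByReducer(2)(1) = 300 and blockLengthsByReducer(2)(1) = 120, and getFileSegmentFor(3, 2) returns Some(FileSegment(files(2), 300, 120)).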
Reposted from: https://www.cnblogs.com/zwCHAN/p/4253287.html