Spark file-reading source code analysis (Part 1)
Table of Contents
- 1. Background
- 2. Test code
- 3. Generated DAG
  - 1. job0
  - 2. job1
- 4. Source code analysis of when job0 is created
  - 1. Calling DataFrameReader.load and DataFrameReader.loadV1Source
  - 2. Calling DataSource.resolveRelation
  - 3. Calling DataSource.getOrInferFileFormatSchema()
  - 4. InMemoryFileIndex initialization
  - 5. Calling InMemoryFileIndex.bulkListLeafFiles
    - 1. paths.size decides whether a job is generated
    - 2. job0 for listing files
      - 1. Setting the job description
      - 2. Creating and running the job
- 5. Call-chain summary
1. Background
While testing a Spark job, I noticed that reading multiple files under a directory and reading a single file produce a different number of jobs in the Spark DAG: reading multiple files under a directory produces one more job than reading a single file. Below is a simple analysis from the source-code perspective. Since the article is rather long, it is split into two parts: this first part walks through the source code behind job0, and the second part covers job1.
2. Test code
```java
public class UserProfileTest {
    static String filePath = "hdfs:///user/daily/20200828/*.parquet";

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setMaster("local")
                .setAppName("user_profile_test")
                .set(ConfigurationOptions.ES_NODES, "")
                .set(ConfigurationOptions.ES_PORT, "")
                .set(ConfigurationOptions.ES_MAPPING_ID, "uid");
        // the main question: why does this read produce extra jobs?
        SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
        Dataset<Row> userProfileSource = sparkSession.read().parquet(filePath);
        userProfileSource.count();
        userProfileSource.write().parquet("hdfs:///user/daily/result2020082808/");
    }
}
```
3. Generated DAG
Here we can see that the line

```java
Dataset<Row> userProfileSource = sparkSession.read().parquet(filePath);
```

produces two jobs, and these two jobs are the only ones we care about here.
Zooming in on the relevant part of the DAG above:
1. job0
The description of job0 is

```
Listing leaf files and directories for 100 paths: hdfs://hadoop-01:9000/user/daily/20200828/part-00000-0e0dc5b5-5061-41ca-9fa6-9fb7b3e09e98-c000.snappy.parquet, ... parquet at UserProfileTest.java:26
```

and this job has 100 partitions.
2. job1
The description of job1 is

```
parquet at UserProfileTest.java:26
```

We want to know when these two jobs are created and why they differ.
4. Source code analysis of when job0 is created
1. Calling DataFrameReader.load and DataFrameReader.loadV1Source
sparkSession.read().parquet(filePath) ends up in DataFrameReader.load. When the conditional checks are evaluated, execution falls into the final else branch, which calls loadV1Source.
```scala
/**
 * Loads input in as a `DataFrame`, for data sources that support multiple paths.
 * Only works if the source is a HadoopFsRelationProvider.
 *
 * @since 1.6.0
 */
@scala.annotation.varargs
def load(paths: String*): DataFrame = {
  if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
    throw new AnalysisException("Hive data source can only be used with tables, you can not " +
      "read files of Hive data source directly.")
  }

  val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
  if (classOf[DataSourceV2].isAssignableFrom(cls)) {
    val ds = cls.newInstance()
    val options = new DataSourceOptions((extraOptions ++
      DataSourceV2Utils.extractSessionConfigs(
        ds = ds.asInstanceOf[DataSourceV2],
        conf = sparkSession.sessionState.conf)).asJava)

    // Streaming also uses the data source V2 API. So it may be that the data source implements
    // v2, but has no v2 implementation for batch reads. In that case, we fall back to loading
    // the dataframe as a v1 source.
    val reader = (ds, userSpecifiedSchema) match {
      case (ds: ReadSupportWithSchema, Some(schema)) =>
        ds.createReader(schema, options)

      case (ds: ReadSupport, None) =>
        ds.createReader(options)

      case (ds: ReadSupportWithSchema, None) =>
        throw new AnalysisException(s"A schema needs to be specified when using $ds.")

      case (ds: ReadSupport, Some(schema)) =>
        val reader = ds.createReader(options)
        if (reader.readSchema() != schema) {
          throw new AnalysisException(s"$ds does not allow user-specified schemas.")
        }
        reader

      case _ => null // fall back to v1
    }

    if (reader == null) {
      loadV1Source(paths: _*)
    } else {
      Dataset.ofRows(sparkSession, DataSourceV2Relation(reader))
    }
  } else {
    // execution ends up here
    loadV1Source(paths: _*)
  }
}
```

This in turn calls the following method:

```scala
private def loadV1Source(paths: String*) = {
  // Code path for data source v1.
  sparkSession.baseRelationToDataFrame(
    DataSource.apply(
      sparkSession,
      paths = paths,
      userSpecifiedSchema = userSpecifiedSchema,
      className = source,
      options = extraOptions.toMap).resolveRelation())
}
```

In loadV1Source a DataSource object is constructed. The apply call works because DataSource is a case class, so the compiler generates a companion object that defines apply and unapply methods (a minimal sketch of this mechanism follows below).
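As an aside, here is a minimal, self-contained sketch (not Spark code; the class name is made up for illustration) of why a case class can be constructed via its companion object's apply, without `new`:

```scala
// The compiler generates a companion object with apply/unapply for every case class.
case class DataSourceLike(className: String, paths: Seq[String])

object CompanionApplyDemo {
  def main(args: Array[String]): Unit = {
    // Equivalent to DataSourceLike.apply("parquet", Seq("hdfs:///some/path"))
    val ds = DataSourceLike("parquet", Seq("hdfs:///some/path"))

    // unapply is what makes pattern matching on case classes work
    ds match {
      case DataSourceLike(cls, ps) => println(s"className=$cls, paths=$ps")
    }
  }
}
```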
It then calls the resolveRelation() method on the DataSource object.
2. Calling DataSource.resolveRelation
```scala
/**
 * Create a resolved [[BaseRelation]] that can be used to read data from or write data into this
 * [[DataSource]]
 *
 * @param checkFilesExist Whether to confirm that the files exist when generating the
 *                        non-streaming file based datasource. StructuredStreaming jobs already
 *                        list file existence, and when generating incremental jobs, the batch
 *                        is considered as a non-streaming file based data source. Since we know
 *                        that files already exist, we don't need to check them again.
 */
def resolveRelation(checkFilesExist: Boolean = true): BaseRelation = {
  val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
    // TODO: Throw when too much is given.
    case (dataSource: SchemaRelationProvider, Some(schema)) =>
      dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions, schema)
    case (dataSource: RelationProvider, None) =>
      dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
    case (_: SchemaRelationProvider, None) =>
      throw new AnalysisException(s"A schema needs to be specified when using $className.")
    case (dataSource: RelationProvider, Some(schema)) =>
      val baseRelation =
        dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
      if (baseRelation.schema != schema) {
        throw new AnalysisException(s"$className does not allow user-specified schemas.")
      }
      baseRelation

    // We are reading from the results of a streaming query. Load files from the metadata log
    // instead of listing them using HDFS APIs.
    case (format: FileFormat, _)
        if FileStreamSink.hasMetadata(
          caseInsensitiveOptions.get("path").toSeq ++ paths,
          sparkSession.sessionState.newHadoopConf()) =>
      val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
      val tempFileCatalog = new MetadataLogFileIndex(sparkSession, basePath, None)
      val fileCatalog = if (userSpecifiedSchema.nonEmpty) {
        val partitionSchema = combineInferredAndUserSpecifiedPartitionSchema(tempFileCatalog)
        new MetadataLogFileIndex(sparkSession, basePath, Option(partitionSchema))
      } else {
        tempFileCatalog
      }
      val dataSchema = userSpecifiedSchema.orElse {
        format.inferSchema(
          sparkSession,
          caseInsensitiveOptions,
          fileCatalog.allFiles())
      }.getOrElse {
        throw new AnalysisException(
          s"Unable to infer schema for $format at ${fileCatalog.allFiles().mkString(",")}. " +
            "It must be specified manually")
      }

      HadoopFsRelation(
        fileCatalog,
        partitionSchema = fileCatalog.partitionSchema,
        dataSchema = dataSchema,
        bucketSpec = None,
        format,
        caseInsensitiveOptions)(sparkSession)

    // This is a non-streaming file based datasource.
    // this is the case that is eventually hit
    case (format: FileFormat, _) =>
      val allPaths = caseInsensitiveOptions.get("path") ++ paths
      val hadoopConf = sparkSession.sessionState.newHadoopConf()
      val globbedPaths = allPaths.flatMap(
        DataSource.checkAndGlobPathIfNecessary(hadoopConf, _, checkFilesExist)).toArray

      val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
      // this is where the call chain continues
      val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)

      val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
          catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
        val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
        new CatalogFileIndex(
          sparkSession,
          catalogTable.get,
          catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
      } else {
        new InMemoryFileIndex(
          sparkSession, globbedPaths, options, Some(partitionSchema), fileStatusCache)
      }

      HadoopFsRelation(
        fileCatalog,
        partitionSchema = partitionSchema,
        dataSchema = dataSchema.asNullable,
        bucketSpec = bucketSpec,
        format,
        caseInsensitiveOptions)(sparkSession)

    case _ =>
      throw new AnalysisException(
        s"$className is not a valid Spark SQL Data Source.")
  }

  relation match {
    case hs: HadoopFsRelation =>
      SchemaUtils.checkColumnNameDuplication(
        hs.dataSchema.map(_.name),
        "in the data schema",
        equality)
      SchemaUtils.checkColumnNameDuplication(
        hs.partitionSchema.map(_.name),
        "in the partition schema",
        equality)
    case _ =>
      SchemaUtils.checkColumnNameDuplication(
        relation.schema.map(_.name),
        "in the data schema",
        equality)
  }

  relation
}
```

In the method above, the line

```scala
val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)
```

calls getOrInferFileFormatSchema.
3. Calling DataSource.getOrInferFileFormatSchema()
```scala
private def getOrInferFileFormatSchema(
    format: FileFormat,
    fileStatusCache: FileStatusCache = NoopCache): (StructType, StructType) = {
  // the operations below are expensive therefore try not to do them if we don't need to, e.g.,
  // in streaming mode, we have already inferred and registered partition columns, we will
  // never have to materialize the lazy val below
  // this is a lazy val, so it is only initialized when it is first used
  lazy val tempFileIndex = {
    val allPaths = caseInsensitiveOptions.get("path") ++ paths
    val hadoopConf = sparkSession.sessionState.newHadoopConf()
    val globbedPaths = allPaths.toSeq.flatMap { path =>
      val hdfsPath = new Path(path)
      val fs = hdfsPath.getFileSystem(hadoopConf)
      val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
      SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
    }.toArray
    // the InMemoryFileIndex is created here, which is what produces the first job
    new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
  }

  val partitionSchema = if (partitionColumns.isEmpty) {
    // Try to infer partitioning, because no DataSource in the read path provides the partitioning
    // columns properly unless it is a Hive DataSource
    // first real use of the lazy tempFileIndex, which triggers the InMemoryFileIndex initialization
    combineInferredAndUserSpecifiedPartitionSchema(tempFileIndex)
  } else {
    // maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred
    // partitioning
    if (userSpecifiedSchema.isEmpty) {
      val inferredPartitions = tempFileIndex.partitionSchema
      inferredPartitions
    } else {
      val partitionFields = partitionColumns.map { partitionColumn =>
        userSpecifiedSchema.flatMap(_.find(c => equality(c.name, partitionColumn))).orElse {
          val inferredPartitions = tempFileIndex.partitionSchema
          val inferredOpt = inferredPartitions.find(p => equality(p.name, partitionColumn))
          if (inferredOpt.isDefined) {
            logDebug(
              s"""Type of partition column: $partitionColumn not found in specified schema
                 |for $format.
                 |User Specified Schema
                 |=====================
                 |${userSpecifiedSchema.orNull}
                 |
                 |Falling back to inferred dataType if it exists.
               """.stripMargin)
          }
          inferredOpt
        }.getOrElse {
          throw new AnalysisException(s"Failed to resolve the schema for $format for " +
            s"the partition column: $partitionColumn. It must be specified manually.")
        }
      }
      StructType(partitionFields)
    }
  }

  val dataSchema = userSpecifiedSchema.map { schema =>
    StructType(schema.filterNot(f => partitionSchema.exists(p => equality(p.name, f.name))))
  }.orElse {
    format.inferSchema(
      sparkSession,
      caseInsensitiveOptions,
      tempFileIndex.allFiles())
  }.getOrElse {
    throw new AnalysisException(
      s"Unable to infer schema for $format. It must be specified manually.")
  }

  // We just print a waring message if the data schema and partition schema have the duplicate
  // columns. This is because we allow users to do so in the previous Spark releases and
  // we have the existing tests for the cases (e.g., `ParquetHadoopFsRelationSuite`).
  // See SPARK-18108 and SPARK-21144 for related discussions.
  try {
    SchemaUtils.checkColumnNameDuplication(
      (dataSchema ++ partitionSchema).map(_.name),
      "in the data schema and the partition schema",
      equality)
  } catch {
    case e: AnalysisException => logWarning(e.getMessage)
  }

  (dataSchema, partitionSchema)
}
```

Here, execution eventually reaches

```scala
new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
```

4. InMemoryFileIndex initialization
Next, let's look at the InMemoryFileIndex class.
```scala
class InMemoryFileIndex(
    sparkSession: SparkSession,
    rootPathsSpecified: Seq[Path],
    parameters: Map[String, String],
    partitionSchema: Option[StructType],
    fileStatusCache: FileStatusCache = NoopCache)
  extends PartitioningAwareFileIndex(sparkSession, parameters, partitionSchema, fileStatusCache) {

  // Filter out streaming metadata dirs or files such as "/.../_spark_metadata" (the metadata dir)
  // or "/.../_spark_metadata/0" (a file in the metadata dir). `rootPathsSpecified` might contain
  // such streaming metadata dir or files, e.g. when after globbing "basePath/*" where "basePath"
  // is the output of a streaming query.
  override val rootPaths =
    rootPathsSpecified.filterNot(FileStreamSink.ancestorIsMetadataDirectory(_, hadoopConf))

  @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
  @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
  @volatile private var cachedPartitionSpec: PartitionSpec = _

  // refresh0() is executed when this class is instantiated
  refresh0()

  ............
```

When this class is instantiated it executes the `refresh0` method:
```scala
private def refresh0(): Unit = {
  // this is where the call happens
  val files = listLeafFiles(rootPaths)
  cachedLeafFiles =
    new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
  cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
  cachedPartitionSpec = null
}
```

Inside refresh0, listLeafFiles(rootPaths) is called.
```scala
/**
 * List leaf files of given paths. This method will submit a Spark job to do parallel
 * listing whenever there is a path having more files than the parallel partition discovery
 * discovery threshold.
 *
 * This is publicly visible for testing.
 */
def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
  val output = mutable.LinkedHashSet[FileStatus]()
  val pathsToFetch = mutable.ArrayBuffer[Path]()
  for (path <- paths) {
    fileStatusCache.getLeafFiles(path) match {
      case Some(files) =>
        HiveCatalogMetrics.incrementFileCacheHits(files.length)
        output ++= files
      case None =>
        pathsToFetch += path
    }
    Unit // for some reasons scalac 2.12 needs this; return type doesn't matter
  }
  val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
  // this is where bulkListLeafFiles is called
  val discovered = InMemoryFileIndex.bulkListLeafFiles(pathsToFetch, hadoopConf, filter, sparkSession)
  discovered.foreach { case (path, leafFiles) =>
    HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
    fileStatusCache.putLeafFiles(path, leafFiles.toArray)
    output ++= leafFiles
  }
  output
}
```

This in turn calls InMemoryFileIndex.bulkListLeafFiles.
5. Calling InMemoryFileIndex.bulkListLeafFiles
```scala
/**
 * Lists a collection of paths recursively. Picks the listing strategy adaptively depending
 * on the number of paths to list.
 *
 * This may only be called on the driver.
 *
 * @return for each input path, the set of discovered files for the path
 */
private def bulkListLeafFiles(
    paths: Seq[Path],
    hadoopConf: Configuration,
    filter: PathFilter,
    sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {

  // If the number of paths is no more than 32 (the default of parallelPartitionDiscoveryThreshold),
  // this returns right here. If it is larger, a dedicated job is launched to discover the files,
  // so the driver is not stuck listing for a long time when there are too many paths.
  // Short-circuits parallel listing when serial listing is likely to be faster.
  if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
    return paths.map { path =>
      (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
    }
  }

  logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
  HiveCatalogMetrics.incrementParallelListingJobCount(1)

  val sparkContext = sparkSession.sparkContext
  val serializableConfiguration = new SerializableConfiguration(hadoopConf)
  val serializedPaths = paths.map(_.toString)
  val parallelPartitionDiscoveryParallelism =
    sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism

  // Set the number of parallelism to prevent following file listing from generating many tasks
  // in case of large #defaultParallelism.
  val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)

  val previousJobDescription = sparkContext.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
  val statusMap = try {
    // this is where the description "Listing leaf files and directories for 100 paths:" comes from
    val description = paths.size match {
      case 0 =>
        s"Listing leaf files and directories 0 paths"
      case 1 =>
        s"Listing leaf files and directories for 1 path:<br/>${paths(0)}"
      case s =>
        s"Listing leaf files and directories for $s paths:<br/>${paths(0)}, ..."
    }
    // set the job description
    sparkContext.setJobDescription(description)
    sparkContext
      .parallelize(serializedPaths, numParallelism)
      .mapPartitions { pathStrings =>
        val hadoopConf = serializableConfiguration.value
        pathStrings.map(new Path(_)).toSeq.map { path =>
          (path, listLeafFiles(path, hadoopConf, filter, None))
        }.iterator
      }.map { case (path, statuses) =>
        val serializableStatuses = statuses.map { status =>
          // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
          val blockLocations = status match {
            case f: LocatedFileStatus =>
              f.getBlockLocations.map { loc =>
                SerializableBlockLocation(
                  loc.getNames,
                  loc.getHosts,
                  loc.getOffset,
                  loc.getLength)
              }
            case _ =>
              Array.empty[SerializableBlockLocation]
          }
          SerializableFileStatus(
            status.getPath.toString,
            status.getLen,
            status.isDirectory,
            status.getReplication,
            status.getBlockSize,
            status.getModificationTime,
            status.getAccessTime,
            blockLocations)
        }
        (path.toString, serializableStatuses)
      // collect() is an action, so it triggers the creation of a job
      }.collect()
  } finally {
    sparkContext.setJobDescription(previousJobDescription)
  }

  // turn SerializableFileStatus back to Status
  statusMap.map { case (path, serializableStatuses) =>
    val statuses = serializableStatuses.map { f =>
      val blockLocations = f.blockLocations.map { loc =>
        new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
      }
      new LocatedFileStatus(
        new FileStatus(
          f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime,
          new Path(f.path)),
        blockLocations)
    }
    (new Path(path), statuses)
  }
}
```

The snippets below are excerpts from the InMemoryFileIndex.bulkListLeafFiles method above, analyzed piece by piece.
1. paths.size decides whether a job is generated
```scala
// Short-circuits parallel listing when serial listing is likely to be faster.
if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
  return paths.map { path =>
    (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
  }
}
```

This snippet checks how many top-level paths were passed in — here, how many paths match hdfs:///user/daily/20200828/*.parquet. At this point Spark does not assume a matched path is a file; it treats each one as a directory, because Spark supports discovering files through nested directory levels. If there are many such paths, listing them all on the driver could take a long time, so when the number of paths exceeds 32 Spark generates a job and lets multiple executors on the YARN cluster do the listing in parallel.
The value of sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold comes from the code below.
```scala
def parallelPartitionDiscoveryThreshold: Int =
  getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)

val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
  buildConf("spark.sql.sources.parallelPartitionDiscovery.threshold")
    .doc("The maximum number of paths allowed for listing files at driver side. If the number " +
      "of detected paths exceeds this value during partition discovery, it tries to list the " +
      "files with another Spark distributed job. This applies to Parquet, ORC, CSV, JSON and " +
      "LibSVM data sources.")
    .intConf
    .checkValue(parallel => parallel >= 0, "The maximum number of paths allowed for listing " +
      "files at driver side must not be negative")
    .createWithDefault(32)
```

Since hdfs:///user/daily/20200828/*.parquet matches 100 files, the if condition above does not hold, and execution continues to the code below that generates job0 to discover the files.
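To see why paths.size is 100 here: the glob pattern is expanded before the InMemoryFileIndex is built (via checkAndGlobPathIfNecessary / globPathIfNecessary shown earlier), one path per matching file. A minimal standalone sketch of that expansion using the Hadoop FileSystem API — the path and the count of 100 are from this article's example, not something you should expect on another cluster:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

object GlobExpansionDemo {
  def main(args: Array[String]): Unit = {
    // hypothetical pattern, matching the article's example
    val pattern = new Path("hdfs:///user/daily/20200828/*.parquet")
    val fs = pattern.getFileSystem(new Configuration())

    // globStatus expands the wildcard into one concrete path per matching file;
    // these expanded paths are what bulkListLeafFiles receives, so paths.size == 100 here
    val statuses: Array[FileStatus] =
      Option(fs.globStatus(pattern)).getOrElse(Array.empty[FileStatus])
    val matched = statuses.map(_.getPath)
    println(s"matched ${matched.length} paths")
    matched.take(3).foreach(println)
  }
}
```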
Note that when the path count does not exceed 32, the method called is listLeafFiles(path, hadoopConf, filter, Some(sparkSession)), which is not the listLeafFiles(paths: Seq[Path]) shown above.
```scala
/**
 * Lists a single filesystem path recursively. If a SparkSession object is specified, this
 * function may launch Spark jobs to parallelize listing.
 *
 * If sessionOpt is None, this may be called on executors.
 *
 * @return all children of path that match the specified filter.
 */
private def listLeafFiles(
    path: Path,
    hadoopConf: Configuration,
    filter: PathFilter,
    sessionOpt: Option[SparkSession]): Seq[FileStatus] = {
  ....
}
```

The method body is omitted here; the doc comment already tells us that it "Lists a single filesystem path recursively".
In other words, it recursively searches for files under one path, which means that when the number of top-level paths does not exceed 32, the driver itself recursively lists each path. Note that this method also belongs to InMemoryFileIndex, but it is not the same method as the listLeafFiles(paths: Seq[Path]) shown earlier.
2. job0 for listing files
Since the if branch above is not taken, the code that follows is what generates job0. There is still a fair amount of it, so we break it apart below; detailed comments are also included in the code.
```scala
val sparkContext = sparkSession.sparkContext
val parallelPartitionDiscoveryParallelism =
  sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism

// Set the number of parallelism to prevent following file listing from generating many tasks
// in case of large #defaultParallelism.
val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)

val previousJobDescription = sparkContext.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
val statusMap = try {
  // this is where the description "Listing leaf files and directories for 100 paths:" comes from
  val description = paths.size match {
    case 0 =>
      s"Listing leaf files and directories 0 paths"
    case 1 =>
      s"Listing leaf files and directories for 1 path:<br/>${paths(0)}"
    case s =>
      s"Listing leaf files and directories for $s paths:<br/>${paths(0)}, ..."
  }
  // set the job description
  sparkContext.setJobDescription(description)
  sparkContext
    .parallelize(serializedPaths, numParallelism)
    .mapPartitions { pathStrings =>
      val hadoopConf = serializableConfiguration.value
      pathStrings.map(new Path(_)).toSeq.map { path =>
        (path, listLeafFiles(path, hadoopConf, filter, None))
      }.iterator
    }.map { case (path, statuses) =>
      val serializableStatuses = statuses.map { status =>
        // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
        val blockLocations = status match {
          case f: LocatedFileStatus =>
            f.getBlockLocations.map { loc =>
              SerializableBlockLocation(
                loc.getNames,
                loc.getHosts,
                loc.getOffset,
                loc.getLength)
            }
          case _ =>
            Array.empty[SerializableBlockLocation]
        }
        SerializableFileStatus(
          status.getPath.toString,
          status.getLen,
          status.isDirectory,
          status.getReplication,
          status.getBlockSize,
          status.getModificationTime,
          status.getAccessTime,
          blockLocations)
      }
      (path.toString, serializableStatuses)
    // collect() is an action, so it triggers the creation of a job
    }.collect()
```

1. Setting the job description
In bulkListLeafFiles() the job description is set as follows:
```scala
val description = paths.size match {
  case 0 =>
    s"Listing leaf files and directories 0 paths"
  case 1 =>
    s"Listing leaf files and directories for 1 path:<br/>${paths(0)}"
  case s =>
    s"Listing leaf files and directories for $s paths:<br/>${paths(0)}, ..."
}
sparkContext.setJobDescription(description)
```

2. Creating and running the job
```scala
val parallelPartitionDiscoveryParallelism =
  sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism
val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)

sparkContext.setJobDescription(description)
sparkContext
  .parallelize(serializedPaths, numParallelism)
  .mapPartitions { pathStrings =>
    val hadoopConf = serializableConfiguration.value
    pathStrings.map(new Path(_)).toSeq.map { path =>
      (path, listLeafFiles(path, hadoopConf, filter, None))
    }.iterator
  }.map { case (path, statuses) =>
    val serializableStatuses = statuses.map { status =>
      // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
      val blockLocations = status match {
        case f: LocatedFileStatus =>
          f.getBlockLocations.map { loc =>
            SerializableBlockLocation(
              loc.getNames,
              loc.getHosts,
              loc.getOffset,
              loc.getLength)
          }
        case _ =>
          Array.empty[SerializableBlockLocation]
      }
      SerializableFileStatus(
        status.getPath.toString,
        status.getLen,
        status.isDirectory,
        status.getReplication,
        status.getBlockSize,
        status.getModificationTime,
        status.getAccessTime,
        blockLocations)
    }
    (path.toString, serializableStatuses)
  // collect() is an action, so it triggers the creation of a job
  }.collect()
```

Here we can see that the parallelism is set to Math.min(paths.size, parallelPartitionDiscoveryParallelism).
Debugging shows that sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism defaults to 10000.
So numParallelism = paths.size = 100 (there are 100 parquet files in the matched directory), which is why job0 has 100 partitions.
Moreover, what this parallel job ultimately does is recursively collect the block information of every file, which can be seen from the code above: the listLeafFiles(path, hadoopConf, filter, None) called inside mapPartitions is defined to recursively discover all files under a single path.
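As a side note, both knobs that appear in this analysis are ordinary Spark SQL configs, so their effect on job0 can be observed by changing them. The sketch below is only an illustration under the assumption that the property names from the SQLConf code above are set at session-build time; whether tuning them is a good idea depends on the workload:

```scala
import org.apache.spark.sql.SparkSession

object ListingConfigDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("listing_config_demo")
      // Raise the driver-side listing threshold above the number of matched paths (100 here),
      // so the short-circuit branch is taken and no separate "Listing leaf files..." job appears.
      .config("spark.sql.sources.parallelPartitionDiscovery.threshold", "200")
      // Caps the partition count of the listing job when it does run (default 10000).
      .config("spark.sql.sources.parallelPartitionDiscovery.parallelism", "100")
      .getOrCreate()

    // Hypothetical path, matching the example in this article.
    val df = spark.read.parquet("hdfs:///user/daily/20200828/*.parquet")
    println(df.count())
  }
}
```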
5. Call-chain summary
The call chain that leads to job0 is:

DataFrameReader.load()
→ DataFrameReader.loadV1Source()
→ DataSource.resolveRelation()
→ DataSource.getOrInferFileFormatSchema()
→ new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
→ InMemoryFileIndex.refresh0()
→ InMemoryFileIndex.listLeafFiles()
→ InMemoryFileIndex.bulkListLeafFiles()