Spark file reading source code analysis - 2
Table of Contents
- 1. Source code analysis of when job1 is triggered
  - 1. DataSource.getOrInferFileFormatSchema()
  - 2. ParquetFileFormat.inferSchema
    - 1. Simplified code
    - 2. parquetOptions.mergeSchema defaults to false
    - 3. isParquetSchemaRespectSummaries defaults to false
    - 4. The contents of filesByType
  - 3. ParquetFileFormat.mergeSchemasInParallel
- 2. Summary of the job1 call chain
- 3. Summary of spark read
This article continues from the previous one and focuses on the source code analysis of job1.
1. Source code analysis of when job1 is triggered
job1 is triggered when the driver continues execution after job0 has finished, so let's first review the job0 flow.
DataFrameReader.load() -> DataFrameReader.loadV1Source() -> DataSource.resolveRelation() -> DataSource.getOrInferFileFormatSchema() -> new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache) -> InMemoryFileIndex.refresh0() -> InMemoryFileIndex.listLeafFiles() -> InMemoryFileIndex.bulkListLeafFiles()

The call that produces job1 is inside the DataSource.getOrInferFileFormatSchema() method.
Here is this method again:
1. DataSource.getOrInferFileFormatSchema()
```scala
private def getOrInferFileFormatSchema(
    format: FileFormat,
    fileStatusCache: FileStatusCache = NoopCache): (StructType, StructType) = {
  // the operations below are expensive therefore try not to do them if we don't need to, e.g.,
  // in streaming mode, we have already inferred and registered partition columns, we will
  // never have to materialize the lazy val below
  // job0: this is declared as a lazy val, so it is only initialized when it is actually used
  lazy val tempFileIndex = {
    val allPaths = caseInsensitiveOptions.get("path") ++ paths
    val hadoopConf = sparkSession.sessionState.newHadoopConf()
    val globbedPaths = allPaths.toSeq.flatMap { path =>
      val hdfsPath = new Path(path)
      val fs = hdfsPath.getFileSystem(hadoopConf)
      val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
      SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
    }.toArray
    // job0: the InMemoryFileIndex object is constructed here, which is where the first job comes from
    new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
  }
  val partitionSchema = if (partitionColumns.isEmpty) {
    // Try to infer partitioning, because no DataSource in the read path provides the partitioning
    // columns properly unless it is a Hive DataSource
    // job0: this is the first real use of the lazy tempFileIndex, which triggers the
    // initialization of InMemoryFileIndex
    combineInferredAndUserSpecifiedPartitionSchema(tempFileIndex)
  } else {
    // maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred
    // partitioning
    if (userSpecifiedSchema.isEmpty) {
      val inferredPartitions = tempFileIndex.partitionSchema
      inferredPartitions
    } else {
      val partitionFields = partitionColumns.map { partitionColumn =>
        userSpecifiedSchema.flatMap(_.find(c => equality(c.name, partitionColumn))).orElse {
          val inferredPartitions = tempFileIndex.partitionSchema
          val inferredOpt = inferredPartitions.find(p => equality(p.name, partitionColumn))
          if (inferredOpt.isDefined) {
            logDebug(
              s"""Type of partition column: $partitionColumn not found in specified schema
                 |for $format.
                 |User Specified Schema
                 |=====================
                 |${userSpecifiedSchema.orNull}
                 |
                 |Falling back to inferred dataType if it exists.
               """.stripMargin)
          }
          inferredOpt
        }.getOrElse {
          throw new AnalysisException(s"Failed to resolve the schema for $format for " +
            s"the partition column: $partitionColumn. It must be specified manually.")
        }
      }
      StructType(partitionFields)
    }
  }
  // at this point job0 has finished and its result has been returned
  val dataSchema = userSpecifiedSchema.map { schema =>
    StructType(schema.filterNot(f => partitionSchema.exists(p => equality(p.name, f.name))))
  }.orElse {
    // job1 starts from this call
    format.inferSchema(
      sparkSession,
      caseInsensitiveOptions,
      tempFileIndex.allFiles())
  }.getOrElse {
    throw new AnalysisException(
      s"Unable to infer schema for $format. It must be specified manually.")
  }
  // We just print a waring message if the data schema and partition schema have the duplicate
  // columns. This is because we allow users to do so in the previous Spark releases and
  // we have the existing tests for the cases (e.g., `ParquetHadoopFsRelationSuite`).
  // See SPARK-18108 and SPARK-21144 for related discussions.
  try {
    SchemaUtils.checkColumnNameDuplication(
      (dataSchema ++ partitionSchema).map(_.name),
      "in the data schema and the partition schema",
      equality)
  } catch {
    case e: AnalysisException => logWarning(e.getMessage)
  }
  (dataSchema, partitionSchema)
}
```
In this method, job0 is triggered first; after job0 returns, execution continues with the steps below. job0 is mainly responsible for analyzing the files: collecting how many files there are in total, the block information of each file, and so on.
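The decisive detail above is that tempFileIndex is a Scala lazy val: it is only constructed the first time it is referenced, which is why job0 fires at the first partition-schema access rather than at the definition. A tiny standalone illustration of that language feature (not Spark code):

```scala
object LazyDemo extends App {
  lazy val index = {
    println("building index ...") // runs only on first access
    Seq(1, 2, 3)
  }
  println("before first use")     // printed before "building index ..."
  println(index.sum)              // first access: the initializer runs here
  println(index.sum)              // already initialized, the initializer does not run again
}
```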
The main function of job1 is to extract the data schema. job1 is triggered in this part of the code: since no user schema is specified, execution falls into the orElse {} block.
The format here is of type ParquetFileFormat, so the call ends up in ParquetFileFormat.inferSchema().
2. ParquetFileFormat.inferSchema
```scala
/**
 * When possible, this method should return the schema of the given `files`. When the format
 * does not support inference, or no valid files are given should return None. In these cases
 * Spark will require that user specify the schema manually.
 */
override def inferSchema(
    sparkSession: SparkSession,
    parameters: Map[String, String],
    files: Seq[FileStatus]): Option[StructType] = {
  val parquetOptions = new ParquetOptions(parameters, sparkSession.sessionState.conf)

  // Should we merge schemas from all Parquet part-files?
  val shouldMergeSchemas = parquetOptions.mergeSchema

  val mergeRespectSummaries = sparkSession.sessionState.conf.isParquetSchemaRespectSummaries

  val filesByType = splitFiles(files)

  // Sees which file(s) we need to touch in order to figure out the schema.
  //
  // Always tries the summary files first if users don't require a merged schema. In this case,
  // "_common_metadata" is more preferable than "_metadata" because it doesn't contain row
  // groups information, and could be much smaller for large Parquet files with lots of row
  // groups. If no summary file is available, falls back to some random part-file.
  //
  // NOTE: Metadata stored in the summary files are merged from all part-files. However, for
  // user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know
  // how to merge them correctly if some key is associated with different values in different
  // part-files. When this happens, Parquet simply gives up generating the summary file. This
  // implies that if a summary file presents, then:
  //
  //   1. Either all part-files have exactly the same Spark SQL schema, or
  //   2. Some part-files don't contain Spark SQL schema in the key-value metadata at all (thus
  //      their schemas may differ from each other).
  //
  // Here we tend to be pessimistic and take the second case into account. Basically this means
  // we can't trust the summary files if users require a merged schema, and must touch all part-
  // files to do the merge.
  val filesToTouch =
    if (shouldMergeSchemas) {
      // Also includes summary files, 'cause there might be empty partition directories.

      // If mergeRespectSummaries config is true, we assume that all part-files are the same for
      // their schema with summary files, so we ignore them when merging schema.
      // If the config is disabled, which is the default setting, we merge all part-files.
      // In this mode, we only need to merge schemas contained in all those summary files.
      // You should enable this configuration only if you are very sure that for the parquet
      // part-files to read there are corresponding summary files containing correct schema.

      // As filed in SPARK-11500, the order of files to touch is a matter, which might affect
      // the ordering of the output columns. There are several things to mention here.
      //
      //  1. If mergeRespectSummaries config is false, then it merges schemas by reducing from
      //     the first part-file so that the columns of the lexicographically first file show
      //     first.
      //
      //  2. If mergeRespectSummaries config is true, then there should be, at least,
      //     "_metadata"s for all given files, so that we can ensure the columns of
      //     the lexicographically first file show first.
      //
      //  3. If shouldMergeSchemas is false, but when multiple files are given, there is
      //     no guarantee of the output order, since there might not be a summary file for the
      //     lexicographically first file, which ends up putting ahead the columns of
      //     the other files. However, this should be okay since not enabling
      //     shouldMergeSchemas means (assumes) all the files have the same schemas.

      val needMerged: Seq[FileStatus] =
        if (mergeRespectSummaries) {
          Seq.empty
        } else {
          filesByType.data
        }
      needMerged ++ filesByType.metadata ++ filesByType.commonMetadata
    } else {
      // Tries any "_common_metadata" first. Parquet files written by old versions or Parquet
      // don't have this.
      filesByType.commonMetadata.headOption
          // Falls back to "_metadata".
          .orElse(filesByType.metadata.headOption)
          // Summary file(s) not found, the Parquet file is either corrupted, or different part-
          // files contain conflicting user defined metadata (two or more values are associated
          // with a same key in different files). In either case, we fall back to any of the
          // first part-file, and just assume all schemas are consistent.
          .orElse(filesByType.data.headOption)
          .toSeq
    }
  ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)
}
```
Although this method looks long, most of it is comments. Its main job is to decide whether the schema of every Parquet part-file needs to be read in order to determine the schema, and how exactly that decision is made.
Let's simplify the code.
1. Simplified code
```scala
override def inferSchema(
    sparkSession: SparkSession,
    parameters: Map[String, String],
    files: Seq[FileStatus]): Option[StructType] = {
  val parquetOptions = new ParquetOptions(parameters, sparkSession.sessionState.conf)
  // Whether to merge the schema from all Parquet part-files; defaults to false
  val shouldMergeSchemas = parquetOptions.mergeSchema
  // When merging, whether all part-files are assumed to be consistent with the summary files;
  // defaults to false (i.e. not consistent). Only relevant when shouldMergeSchemas is true.
  val mergeRespectSummaries = sparkSession.sessionState.conf.isParquetSchemaRespectSummaries
  val filesByType = splitFiles(files)
  val filesToTouch =
    // false here
    if (shouldMergeSchemas) {
      val needMerged: Seq[FileStatus] =
        if (mergeRespectSummaries) {
          Seq.empty
        } else {
          filesByType.data
        }
      needMerged ++ filesByType.metadata ++ filesByType.commonMetadata
    } else {
      // execution falls into this branch
      filesByType.commonMetadata.headOption
        .orElse(filesByType.metadata.headOption)
        .orElse(filesByType.data.headOption)
        .toSeq
    }
  // and finally ends up calling this
  ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)
}
```
2. parquetOptions.mergeSchema defaults to false
The default value of parquetOptions.mergeSchema is false:
```scala
val PARQUET_SCHEMA_MERGING_ENABLED = buildConf("spark.sql.parquet.mergeSchema")
  .doc("When true, the Parquet data source merges schemas collected from all data files, " +
    "otherwise the schema is picked from the summary file or a random data file " +
    "if no summary file is available.")
  .booleanConf
  .createWithDefault(false)
```
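For completeness, schema merging can be enabled either session-wide through this config or per read via the Parquet mergeSchema option. A minimal sketch, assuming a SparkSession named spark and reusing the HDFS path from the example later in this article:

```scala
// Per-read: ParquetOptions.mergeSchema picks this option up and it overrides the session config.
val merged = spark.read
  .option("mergeSchema", "true")
  .parquet("hdfs:///user/daily/20200828/*.parquet")

// Session-wide alternative:
spark.conf.set("spark.sql.parquet.mergeSchema", "true")
```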
3. isParquetSchemaRespectSummaries defaults to false
The default value of sparkSession.sessionState.conf.isParquetSchemaRespectSummaries is false:
```scala
val PARQUET_SCHEMA_RESPECT_SUMMARIES = buildConf("spark.sql.parquet.respectSummaryFiles")
  .doc("When true, we make assumption that all part-files of Parquet are consistent with " +
    "summary files and we will ignore them when merging schema. Otherwise, if this is " +
    "false, which is the default, we will merge all part-files. This should be considered " +
    "as expert-only option, and shouldn't be enabled before knowing what it means exactly.")
  .booleanConf
  .createWithDefault(false)
```
4. The contents of filesByType
filesByType is a wrapper around all the listed files. The filesToTouch that is finally produced contains:
LocatedFileStatus{path=hdfs://test.com:9000/user/daily/20200828/part-00000-0e0dc5b5-5061-41ca-9fa6-9fb7b3e09e98-c000.snappy.parquet; isDirectory=false; length=193154555; replication=3; blocksize=134217728; modification_time=1599052749676; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}
That is, the first of all the files in filesByType. This single file is then passed as the argument to the ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession) method.
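For reference, splitFiles simply groups the listed files by file name into regular part-files and the two kinds of Parquet summary files mentioned in the comments above. The sketch below is an illustrative re-implementation, not the actual Spark source; the FileTypes shape mirrors how the result is used (data, metadata, commonMetadata):

```scala
import org.apache.hadoop.fs.FileStatus

// Illustrative only: the real splitFiles lives inside ParquetFileFormat.
case class FileTypes(
    data: Seq[FileStatus],           // regular part-files, e.g. part-00000-....snappy.parquet
    metadata: Seq[FileStatus],       // "_metadata" summary files
    commonMetadata: Seq[FileStatus]) // "_common_metadata" summary files

def isSummaryFile(f: FileStatus): Boolean = {
  val name = f.getPath.getName
  name == "_metadata" || name == "_common_metadata"
}

def splitFilesSketch(allFiles: Seq[FileStatus]): FileTypes = {
  val sorted = allFiles.sortBy(_.getPath.toString)
  FileTypes(
    data = sorted.filterNot(isSummaryFile),
    metadata = sorted.filter(_.getPath.getName == "_metadata"),
    commonMetadata = sorted.filter(_.getPath.getName == "_common_metadata"))
}
```

In the directory used here there are no summary files, so metadata and commonMetadata are empty and filesToTouch falls back to the first data file, as shown above.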
3. ParquetFileFormat.mergeSchemasInParallel
```scala
/**
 * Figures out a merged Parquet schema with a distributed Spark job.
 *
 * Note that locality is not taken into consideration here because:
 *
 *  1. For a single Parquet part-file, in most cases the footer only resides in the last block of
 *     that file. Thus we only need to retrieve the location of the last block. However, Hadoop
 *     `FileSystem` only provides API to retrieve locations of all blocks, which can be
 *     potentially expensive.
 *
 *  2. This optimization is mainly useful for S3, where file metadata operations can be pretty
 *     slow. And basically locality is not available when using S3 (you can't run computation on
 *     S3 nodes).
 */
def mergeSchemasInParallel(
    filesToTouch: Seq[FileStatus],
    sparkSession: SparkSession): Option[StructType] = {
  val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
  val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
  val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf())

  // !! HACK ALERT !!
  //
  // Parquet requires `FileStatus`es to read footers. Here we try to send cached `FileStatus`es
  // to executor side to avoid fetching them again. However, `FileStatus` is not `Serializable`
  // but only `Writable`. What makes it worse, for some reason, `FileStatus` doesn't play well
  // with `SerializableWritable[T]` and always causes a weird `IllegalStateException`. These
  // facts virtually prevents us to serialize `FileStatus`es.
  //
  // Since Parquet only relies on path and length information of those `FileStatus`es to read
  // footers, here we just extract them (which can be easily serialized), send them to executor
  // side, and resemble fake `FileStatus`es there.
  val partialFileStatusInfo = filesToTouch.map(f => (f.getPath.toString, f.getLen))

  // Set the number of partitions to prevent following schema reads from generating many tasks
  // in case of a small number of parquet files.
  val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
    sparkSession.sparkContext.defaultParallelism)

  val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles

  // Issues a Spark job to read Parquet schema in parallel.
  val partiallyMergedSchemas =
    sparkSession
      .sparkContext
      .parallelize(partialFileStatusInfo, numParallelism)
      .mapPartitions { iterator =>
        // Resembles fake `FileStatus`es with serialized path and length information.
        val fakeFileStatuses = iterator.map { case (path, length) =>
          new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path))
        }.toSeq

        // Reads footers in multi-threaded manner within each task
        val footers =
          ParquetFileFormat.readParquetFootersInParallel(
            serializedConf.value, fakeFileStatuses, ignoreCorruptFiles)

        // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
        val converter = new ParquetToSparkSchemaConverter(
          assumeBinaryIsString = assumeBinaryIsString,
          assumeInt96IsTimestamp = assumeInt96IsTimestamp)
        if (footers.isEmpty) {
          Iterator.empty
        } else {
          var mergedSchema = ParquetFileFormat.readSchemaFromFooter(footers.head, converter)
          footers.tail.foreach { footer =>
            val schema = ParquetFileFormat.readSchemaFromFooter(footer, converter)
            try {
              mergedSchema = mergedSchema.merge(schema)
            } catch { case cause: SparkException =>
              throw new SparkException(
                s"Failed merging schema of file ${footer.getFile}:\n${schema.treeString}", cause)
            }
          }
          Iterator.single(mergedSchema)
        }
      }.collect()

  if (partiallyMergedSchemas.isEmpty) {
    None
  } else {
    var finalSchema = partiallyMergedSchemas.head
    partiallyMergedSchemas.tail.foreach { schema =>
      try {
        finalSchema = finalSchema.merge(schema)
      } catch { case cause: SparkException =>
        throw new SparkException(
          s"Failed merging schema:\n${schema.treeString}", cause)
      }
    }
    Some(finalSchema)
  }
}
```
ParquetFileFormat.mergeSchemasInParallel takes two parameters: one is the SparkSession, the other is filesToTouch, which holds a list of files. As mentioned above, by default only one file is actually passed in.
What this method does is launch job1, which parses the schema information from the files passed in. The relevant pieces of job1 are:
The parallelism of job1, numParallelism:
```scala
val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
  sparkSession.sparkContext.defaultParallelism)
```
partialFileStatusInfo.size is the size of the list passed in; here it is 1.
sparkSession.sparkContext.defaultParallelism returns 1 here. Its actual value varies across environments, but it is never less than 1.
So numParallelism is 1, which means job1 has exactly one partition.
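Plugging the values of this run into the formula, as a quick sanity check:

```scala
val partialFileStatusInfoSize = 1 // only one file in filesToTouch
val defaultParallelism = 1        // value observed in this local run; clusters report larger values
val numParallelism = Math.min(Math.max(partialFileStatusInfoSize, 1), defaultParallelism)
// numParallelism == 1, so job1 consists of a single partition, i.e. a single task
```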
2. Summary of the job1 call chain
The method call chain is:
DataFrameReader.load() -> DataFrameReader.loadV1Source() -> DataSource.resolveRelation() -> DataSource.getOrInferFileFormatSchema() -> ParquetFileFormat.inferSchema() -> ParquetFileFormat.mergeSchemasInParallel()

3. Summary of spark read
Let's look at our code again.
```java
public class UserProfileTest {
    static String filePath = "hdfs:///user/daily/20200828/*.parquet";

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setMaster("local")
                .setAppName("user_profile_test")
                .set(ConfigurationOptions.ES_NODES, "")
                .set(ConfigurationOptions.ES_PORT, "")
                .set(ConfigurationOptions.ES_MAPPING_ID, "uid");
        // The main thing we want to investigate: why does this produce extra jobs?
        SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
        Dataset<Row> userProfileSource = sparkSession.read().parquet(filePath);
        userProfileSource.count();
        userProfileSource.write().parquet("hdfs:///user/daily/result2020082808/");
    }
}
```
When the number of paths matched by filePath exceeds 32, a separate job is launched to recursively list the files and collect the information of every matching file (including block sizes and so on), in preparation for the later schema inference and for partitioning the RDD operations.
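The threshold of 32 mentioned here corresponds, as far as I can tell, to spark.sql.sources.parallelPartitionDiscovery.threshold (default 32): once the number of paths to list exceeds it, InMemoryFileIndex switches from driver-side listing to a distributed listing job (job0). A hedged sketch of raising it so that listing stays on the driver (config name and cut-off behaviour as assumed above):

```scala
import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder()
  .master("local")
  .appName("user_profile_test")
  // Assumption: with a higher threshold, up to 1000 matched paths are listed on the driver,
  // so no separate listing job (job0) is launched.
  .config("spark.sql.sources.parallelPartitionDiscovery.threshold", "1000")
  .getOrCreate()
```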
After all the files have been found, if the data being read is Parquet, a new job is created to resolve the corresponding schema.
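Conversely, job1 can be skipped by supplying the schema up front: in getOrInferFileFormatSchema above, a non-empty userSpecifiedSchema means the orElse branch that calls format.inferSchema never runs. A minimal sketch, reusing the sparkSession from the snippet above; the column names here are made up for illustration:

```scala
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Hypothetical columns; replace with the real schema of the parquet files.
val userProfileSchema = StructType(Seq(
  StructField("uid", StringType),
  StructField("age", LongType)))

val userProfileSource = sparkSession.read
  .schema(userProfileSchema)                        // userSpecifiedSchema is now non-empty,
  .parquet("hdfs:///user/daily/20200828/*.parquet") // so format.inferSchema (job1) is not called
```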
Based on the analysis above, suppose we modify the filePath in the code above, for example setting it to:
```java
static String filePath = "hdfs://bj3-dev-search-01.tencn:9000/user/daily/20200828/part-00057-0e0dc5b5-5061-41ca-9fa6-9fb7b3e09e98-c000.snappy.parquet";
```
Then job0 no longer appears when the program runs.
- Previous: Spark file reading source code analysis - 1
- Next: Spark file reading source code analysis - 3