Common Spark RDD Operators - saveAsTextFile and saveAsObjectFile: saving to the local file system or HDFS
saveAsTextFile
Function prototype
def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit

saveAsTextFile stores an RDD in a file system as plain text files.
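The second overload compresses each output part file with the supplied Hadoop codec. Below is a minimal Java sketch of both overloads; the local output paths and the choice of GzipCodec are assumptions made for illustration (note that the target directories must not already exist):

import java.util.Arrays;

import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SaveAsTextFileCodecDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("saveAsTextFile-codec").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        JavaRDD<String> words = jsc.parallelize(Arrays.asList("dog", "cat", "gnu"), 2);

        // First overload: plain text output, one part file per partition.
        words.saveAsTextFile("file:///tmp/words_plain");

        // Second overload: each part file is compressed with the given codec (Gzip here).
        words.saveAsTextFile("file:///tmp/words_gzip", GzipCodec.class);

        jsc.stop();
    }
}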
As the source shows, saveAsTextFile is built on saveAsHadoopFile. Because saveAsHadoopFile operates on a PairRDD, saveAsTextFile first uses rddToPairRDDFunctions to convert the RDD into an RDD of (NullWritable, Text) pairs, and then performs the actual write through saveAsHadoopFile.
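To make that delegation concrete, here is a rough Java approximation of what the Scala source below does internally: map every element to a (NullWritable, Text) pair, then hand the pair RDD to saveAsHadoopFile with TextOutputFormat (which omits NullWritable keys, so only the values appear in the output). This is only an illustrative sketch with a placeholder path, not something a user would normally write:

import java.util.Arrays;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class ManualTextSaveDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("manual-text-save").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        JavaRDD<String> lines = jsc.parallelize(Arrays.asList("dog", "cat", "gnu"), 2);

        // Same shape as in the Scala source: (NullWritable, Text) pairs.
        JavaPairRDD<NullWritable, Text> pairs =
                lines.mapToPair(s -> new Tuple2<>(NullWritable.get(), new Text(s)));

        // The delegation target of saveAsTextFile: a Hadoop TextOutputFormat write.
        pairs.saveAsHadoopFile("file:///tmp/manual_text",
                NullWritable.class, Text.class, TextOutputFormat.class);

        jsc.stop();
    }
}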
Source code analysis
def saveAsTextFile(path: String): Unit = withScope {
  // https://issues.apache.org/jira/browse/SPARK-2075
  //
  // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit
  // Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]`
  // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an
  // Ordering for `NullWritable`. That's why the compiler will generate different anonymous
  // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.
  //
  // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate
  // same bytecodes for `saveAsTextFile`.
  val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
  val textClassTag = implicitly[ClassTag[Text]]
  val r = this.mapPartitions { iter =>
    val text = new Text()
    iter.map { x =>
      text.set(x.toString)
      (NullWritable.get(), text)
    }
  }
  RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
    .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}

/**
 * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
 * supporting the key and value types K and V in this RDD.
 */
def saveAsHadoopFile(
    path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: OutputFormat[_, _]],
    conf: JobConf = new JobConf(self.context.hadoopConfiguration),
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
  // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
  val hadoopConf = conf
  hadoopConf.setOutputKeyClass(keyClass)
  hadoopConf.setOutputValueClass(valueClass)
  // Doesn't work in Scala 2.9 due to what may be a generics bug
  // TODO: Should we uncomment this for Scala 2.10?
  // conf.setOutputFormat(outputFormatClass)
  hadoopConf.set("mapred.output.format.class", outputFormatClass.getName)
  for (c <- codec) {
    hadoopConf.setCompressMapOutput(true)
    hadoopConf.set("mapred.output.compress", "true")
    hadoopConf.setMapOutputCompressorClass(c)
    hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
    hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
  }
  // Use configured output committer if already set
  if (conf.getOutputCommitter == null) {
    hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
  }
  FileOutputFormat.setOutputPath(hadoopConf,
    SparkHadoopWriter.createPathFromString(path, hadoopConf))
  saveAsHadoopDataset(hadoopConf)
}

/**
 * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
 * that storage system. The JobConf should set an OutputFormat and any output paths required
 * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
 * MapReduce job.
 */
def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
  // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
  val hadoopConf = conf
  val wrappedConf = new SerializableConfiguration(hadoopConf)
  val outputFormatInstance = hadoopConf.getOutputFormat
  val keyClass = hadoopConf.getOutputKeyClass
  val valueClass = hadoopConf.getOutputValueClass
  if (outputFormatInstance == null) {
    throw new SparkException("Output format class not set")
  }
  if (keyClass == null) {
    throw new SparkException("Output key class not set")
  }
  if (valueClass == null) {
    throw new SparkException("Output value class not set")
  }
  SparkHadoopUtil.get.addCredentials(hadoopConf)

  logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
    valueClass.getSimpleName + ")")

  if (isOutputSpecValidationEnabled) {
    // FileOutputFormat ignores the filesystem parameter
    val ignoredFs = FileSystem.get(hadoopConf)
    hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
  }

  val writer = new SparkHadoopWriter(hadoopConf)
  writer.preSetup()

  val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
    val config = wrappedConf.value
    // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
    // around by taking a mod. We expect that no task will be attempted 2 billion times.
    val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt

    val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)

    writer.setup(context.stageId, context.partitionId, taskAttemptId)
    writer.open()
    var recordsWritten = 0L

    Utils.tryWithSafeFinally {
      while (iter.hasNext) {
        val record = iter.next()
        writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])

        // Update bytes written metric every few records
        maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
        recordsWritten += 1
      }
    } {
      writer.close()
    }
    writer.commit()
    bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
    outputMetrics.setRecordsWritten(recordsWritten)
  }

  self.context.runJob(self, writeToFile)
  writer.commitJob()
}

saveAsObjectFile
Function prototype
def saveAsObjectFile(path: String): Unit

saveAsObjectFile serializes the elements of the RDD and stores them in a file.
As the source shows, saveAsObjectFile is implemented on top of saveAsSequenceFile: the RDD is first converted into a PairRDD of type (NullWritable, BytesWritable), which is then written out by saveAsSequenceFile. Spark's Java API does not expose saveAsSequenceFile. Overall, this operator works much like saveAsTextFile.
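Because Utils.serialize in the source below relies on Java serialization, any custom element type saved with saveAsObjectFile has to implement java.io.Serializable. A small sketch follows; the Person class and the output path are assumptions made up for this example:

import java.io.Serializable;
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SaveAsObjectFileDemo {
    // The element type must be Java-serializable, since saveAsObjectFile uses Java serialization.
    public static class Person implements Serializable {
        public final String name;
        public final int age;
        public Person(String name, int age) { this.name = name; this.age = age; }
        @Override public String toString() { return name + ":" + age; }
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("saveAsObjectFile-demo").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        JavaRDD<Person> people = jsc.parallelize(
                Arrays.asList(new Person("alice", 30), new Person("bob", 25)), 2);

        // Elements are serialized (in groups, per the source below) and written as a SequenceFile.
        people.saveAsObjectFile("file:///tmp/people_obj");

        // Read back with objectFile; elements are restored via Java deserialization.
        JavaRDD<Person> restored = jsc.objectFile("file:///tmp/people_obj");
        restored.collect().forEach(p -> System.out.println(p));

        jsc.stop();
    }
}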
Source code analysis
def saveAsObjectFile(path: String): Unit = withScope {
  this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
    .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
    .saveAsSequenceFile(path)
}

def saveAsSequenceFile(
    path: String,
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
  def anyToWritable[U <% Writable](u: U): Writable = u

  // TODO We cannot force the return type of `anyToWritable` be same as keyWritableClass and
  // valueWritableClass at the compile time. To implement that, we need to add type parameters to
  // SequenceFileRDDFunctions. however, SequenceFileRDDFunctions is a public class so it will be a
  // breaking change.
  val convertKey = self.keyClass != keyWritableClass
  val convertValue = self.valueClass != valueWritableClass

  logInfo("Saving as sequence file of type (" + keyWritableClass.getSimpleName + "," +
    valueWritableClass.getSimpleName + ")" )
  val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
  val jobConf = new JobConf(self.context.hadoopConfiguration)
  if (!convertKey && !convertValue) {
    self.saveAsHadoopFile(path, keyWritableClass, valueWritableClass, format, jobConf, codec)
  } else if (!convertKey && convertValue) {
    self.map(x => (x._1, anyToWritable(x._2))).saveAsHadoopFile(
      path, keyWritableClass, valueWritableClass, format, jobConf, codec)
  } else if (convertKey && !convertValue) {
    self.map(x => (anyToWritable(x._1), x._2)).saveAsHadoopFile(
      path, keyWritableClass, valueWritableClass, format, jobConf, codec)
  } else if (convertKey && convertValue) {
    self.map(x => (anyToWritable(x._1), anyToWritable(x._2))).saveAsHadoopFile(
      path, keyWritableClass, valueWritableClass, format, jobConf, codec)
  }
}

Read and write example. The same code can also write to HDFS; only the path differs.
Write: javaRDD.saveAsTextFile, javaRDD.saveAsObjectFile
Read: javaSparkContext.textFile, javaSparkContext.objectFile
SparkConf sparkConf = new SparkConf().setAppName("spark-test-1").setMaster("local");
SparkContext sparkContext = SparkContext.getOrCreate(sparkConf);
sparkContext.setLogLevel("ERROR");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkContext);

List<String> l1 = new ArrayList<>();
l1.add("dog");
l1.add("cat");
l1.add("gnu");
l1.add("salmon");
l1.add("rabbit");
l1.add("turkey");
l1.add("wolf");
l1.add("bear");
l1.add("bee");
JavaRDD<String> javaRDD = javaSparkContext.parallelize(l1, 3);

// Write to the local file system
javaRDD.saveAsTextFile("file:///C:/rdd");
javaRDD.saveAsObjectFile("file:///C:/rdd2");

// Write to HDFS
javaRDD.saveAsTextFile("hdfs://kncloud02:8020/user/rdd");
javaRDD.saveAsObjectFile("hdfs://kncloud02:8020/user/rdd2");

// Read back from the local file system
JavaRDD<String> rdd_1 = javaSparkContext.textFile("file:///C:/rdd");
JavaRDD<Object> rdd_2 = javaSparkContext.objectFile("file:///C:/rdd2");

// Read back from HDFS
JavaRDD<String> rdd_3 = javaSparkContext.textFile("hdfs://kncloud02:8020/user/rdd");
JavaRDD<Object> rdd_4 = javaSparkContext.objectFile("hdfs://kncloud02:8020/user/rdd2");

Summary

saveAsTextFile stores an RDD as plain text files, while saveAsObjectFile serializes the elements and stores them as a SequenceFile. Both accept either a local path or an HDFS path, and the results can be read back with textFile and objectFile respectively.