[Original] Uncle's Troubleshooting Notes (12): Why does Spark save text-format files (text, csv, json, etc.) to HDFS in compressed form?
Problem reproduction
After writing a file with rdd.repartition(1).write.csv(outPath), the resulting output files turn out to be compressed.
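A minimal way to reproduce this (a sketch only: spark is an existing SparkSession, outPath is a placeholder HDFS directory, and the cluster's Hadoop configuration is assumed to enable output compression):

// Assumed: a SparkSession named "spark" and a writable directory "outPath".
val df = spark.range(0, 100).toDF("id")
df.repartition(1).write.csv(outPath)
// If the cluster-wide Hadoop configuration enables output compression, the part file
// under outPath carries the codec's extension, e.g. part-00000-...csv.deflate.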
During write, Spark first obtains a hadoopConf and reads from it whether the output should be compressed and with which codec:
org.apache.spark.sql.execution.datasources.DataSource
  def write(...)   // ends up executing an InsertIntoHadoopFsRelationCommand

org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand

  val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options)

org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
      throws IOException, InterruptedException {
    Configuration conf = job.getConfiguration();
    boolean isCompressed = getCompressOutput(job);
    String keyValueSeparator = conf.get(SEPERATOR, "\t");
    CompressionCodec codec = null;
    String extension = "";
    if (isCompressed) {
      Class<? extends CompressionCodec> codecClass =
          getOutputCompressorClass(job, GzipCodec.class);
      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
      extension = codec.getDefaultExtension();
    }
    ...

isCompressed is read from mapreduce.output.fileoutputformat.compress, and codecClass from mapreduce.output.fileoutputformat.compress.codec.
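The same decision can be inspected up front with FileOutputFormat's static helpers, which read exactly those two keys (a sketch; the Hadoop Job below is built only to carry spark.sparkContext.hadoopConfiguration, and "spark" is assumed to be an existing SparkSession):

import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat

// Wrap the Hadoop configuration that Spark will hand to the output format.
val job = Job.getInstance(spark.sparkContext.hadoopConfiguration)

// getCompressOutput reads mapreduce.output.fileoutputformat.compress;
// getOutputCompressorClass reads mapreduce.output.fileoutputformat.compress.codec,
// falling back to GzipCodec just like TextOutputFormat does.
val isCompressed = FileOutputFormat.getCompressOutput(job)
val codecClass = FileOutputFormat.getOutputCompressorClass(job, classOf[GzipCodec])
println(s"compress=$isCompressed codec=${codecClass.getName}")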
The hadoopConf is initialized as follows:
org.apache.spark.sql.internal.SessionState
  def newHadoopConf(): Configuration = {
    val hadoopConf = new Configuration(sparkSession.sparkContext.hadoopConfiguration)
    ...

org.apache.spark.SparkContext

  _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)

org.apache.spark.deploy.SparkHadoopUtil

  def newConfiguration(conf: SparkConf): Configuration = {
    val hadoopConf = new Configuration()
    appendS3AndSparkHadoopConfigurations(conf, hadoopConf)
    hadoopConf
  }

  def appendS3AndSparkHadoopConfigurations(conf: SparkConf, hadoopConf: Configuration): Unit = {
    ...
    conf.getAll.foreach { case (key, value) =>
      if (key.startsWith("spark.hadoop.")) {
        hadoopConf.set(key.substring("spark.hadoop.".length), value)
      }
    }
    ...
  }
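In other words, every SparkConf entry whose key starts with spark.hadoop. is copied into the Hadoop configuration with that prefix stripped. A quick self-contained sketch of this behaviour (the key foo.bar is arbitrary and used only for illustration):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
  .setMaster("local[*]").setAppName("prefix-demo")
  // "spark.hadoop." will be stripped when SparkContext builds its hadoopConfiguration.
  .set("spark.hadoop.foo.bar", "42")

val spark = SparkSession.builder().config(conf).getOrCreate()
// The stripped key is now visible on the Hadoop side:
println(spark.sparkContext.hadoopConfiguration.get("foo.bar"))   // expected: 42
spark.stop()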
By default, hadoopConf also loads every Hadoop configuration file found on the classpath, which is easy to verify from spark-shell:
scala> val hc = spark.sparkContext.hadoopConfiguration
hc: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml
scala> println(hc.get("mapreduce.output.fileoutputformat.compress"))
true
scala> println(hc.get("mapreduce.output.fileoutputformat.compress.codec"))
org.apache.hadoop.io.compress.DefaultCodec
In summary, setting spark.hadoop.mapreduce.output.fileoutputformat.compress=false when creating the SparkConf is enough to disable compression:

val sparkConf = new SparkConf().set("spark.hadoop.mapreduce.output.fileoutputformat.compress", "false")

Alternatively, compression can be controlled per write through the writer option:

rdd.repartition(1).write.option("compression", "none").csv(outPath)
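A compact end-to-end sketch of both approaches (the output paths and the demo DataFrame built with spark.range are placeholders, not from the original post):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Approach 1: override the inherited Hadoop setting for the whole application.
val sparkConf = new SparkConf()
  .setMaster("local[*]").setAppName("no-compress-demo")
  .set("spark.hadoop.mapreduce.output.fileoutputformat.compress", "false")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()

val outPath1 = "/tmp/no_compress_demo_1"    // placeholder output directories
val outPath2 = "/tmp/no_compress_demo_2"

val df = spark.range(0, 100).toDF("id")     // stand-in for the real data
df.repartition(1).write.csv(outPath1)       // part files are plain .csv now

// Approach 2: control compression per write with the writer option.
df.repartition(1).write.option("compression", "none").csv(outPath2)

spark.stop()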
Reposted from: https://www.cnblogs.com/barneywill/p/10109568.html