java spark读写hdfs_Spark读取HDFS数据输出到不同的文件
最近有一個需求是這樣的:原來的數據是存儲在MySQL,然后通過Sqoop將MySQL的數據抽取到了HDFS集群上,抽取到HDFS上的數據都是純數據,字段值之間以\t分隔,現在需要將這部分數據還原為json格式的,因為這樣做的原因:一來是更清楚具體字段的含義;二來是后期的數據通過kafka直接消費存儲到HDFS,存的就是json數據,所以為了所有存儲數據格式一致,需要將歷史數據進行轉換。所以只能通過MR或者Spark進行一次數據清洗轉換了。因為需要根據每條數據中的一個時間字段將數據存儲到不同的文件中。比如一條純數據如下:
1 2019-04-26 00:32:09.0 null true 1025890 10004515
那么需要根據第二個字段信息來將數據分別存儲到不同的文件夾,分為4個時段,格式為:
/2019/04/26/00-06.txt,/2019/04/26/06-12.txt,/2019/04/26/12-18.txt,/2019/04/26/18-00.txt,
直接上spark代碼:
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.sql.SparkSession
/**
* spark版本將數據輸出到不同文件
* create date:2019-07-16
* author:ly
*/
object OutputToMultiFileApp {
def main(args: Array[String]): Unit = {
val inputPath = args(0)
val outputPath = args(1)
//val inputPath = "D:\\bigdata_workspace\\gey\\3\\in.txt"
//val outputPath = "D:\\bigdata_workspace\\gey\\3\\out"
val spark = SparkSession.builder().appName("OutputToMultiFileApp").master("local[*]").getOrCreate()
val data =spark.sparkContext.textFile(inputPath).map(item => {
val splits = item.toString.split("\t")
val str = "{\"id\":\"" + splits(4) + "\",\"uid\":\"" + splits(5) + "\",\"createTime\":\"" + splits(1) + "\",\"epochs\":\"1\"}"
//將時間字段作為key,包裝后的json作為value
(splits(1),str)
})
/**按Key保存到不同文件*/
data.saveAsHadoopFile(outputPath,
classOf[String],
classOf[String],
classOf[MyMultipleTextOutputFormat]
)
spark.stop()
}
}
class MyMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
//1)文件名:根據key生成我們自己的路徑
override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String ={
//2019-04-26 16:32:09.0
val splits: Array[String] = key.toString.split(" ")
//2019-04-26
val ymd: String = splits(0)
//16:32:09.0
val hms: String = splits(1)
//[2019,04,26]
val arr1: Array[String] = ymd.split("-")
//[16,32,09]
val arr2: Array[String] = hms.split(":")
var temp: String = ""
val h6: Int = 6
val h12: Int = 12
val h18: Int = 18
val h24: Int = 24
val h: Int = arr2(0).toInt
if(h >= 0 && h <=6) temp = "00-06"
if(h > h6 && h <= h12) temp = "06-12"
if(h > h12 && h <= h18) temp = "12-18"
if(h > h18 && h < h24) temp = "18-00"
val paths = arr1(0) + "/" + arr1(1) + "/" + arr1(2) + "/" + temp + ".txt"
paths
}
//2)文件內容:默認同時輸出key和value。這里指定不輸出key。
override def generateActualKey(key: Any, value: Any): String = {
null
}
}
上述代碼直接在IDEA上運行,筆者是在win10上搞了一個比較小的文件測試,測試結果如下:
年份:
result1.png
月份:
result2.png
日期:
result3.png
最終數據:
result4.png
妥妥的成功了。。直接打包放到集群上運行。但是數據量大一些的話,好像會丟失數據,目前還不知道為啥。。。
歡迎大家留言討論
內容將同步到微信公眾號,歡迎關注微信公眾號:LearnBigData
qrcode.jpg
總結
以上是生活随笔為你收集整理的java spark读写hdfs_Spark读取HDFS数据输出到不同的文件的全部內容,希望文章能夠幫你解決所遇到的問題。
                            
                        - 上一篇: java二叉树的深度优先遍历_二叉树的广
 - 下一篇: 去年受理消费者投诉等逾2940万件 挽回