Spark SQL读写方法
一、DataFrame:有列名的RDD
首先,我們知道SparkSQL的目的是用sql語(yǔ)句去操作RDD,和Hive類(lèi)似。SparkSQL的核心結(jié)構(gòu)是DataFrame,如果我們知道RDD里面的字段,也知道里面的數(shù)據(jù)類(lèi)型,就好比關(guān)系型數(shù)據(jù)庫(kù)里面的一張表。那么我們就可以寫(xiě)SQL,所以其實(shí)這兒我們是不能用面向?qū)ο蟮乃季S去編程的。我們最好的方式就是把抽象成為一張表,然后去用SQL語(yǔ)句去操作它。
DataFrame的存儲(chǔ)方式:它采用的存儲(chǔ)是類(lèi)似于數(shù)據(jù)庫(kù)的表的形式進(jìn)行存儲(chǔ)的。一個(gè)數(shù)據(jù)表有幾部分組成:1、數(shù)據(jù),這個(gè)數(shù)據(jù)是一行一行進(jìn)行存儲(chǔ)的,一條記錄就是一行,2、數(shù)據(jù)表的數(shù)據(jù)字典,包括表的名稱(chēng),表的字段和字段的類(lèi)型等元數(shù)據(jù)信息。那么DataFrame也是按照行進(jìn)行存儲(chǔ)的,這個(gè)類(lèi)是Row,一行一行的進(jìn)行數(shù)據(jù)存儲(chǔ)。一般情況下處理粒度是行粒度的,不需要對(duì)其行內(nèi)數(shù)據(jù)進(jìn)行操作。
二、SparkSQL的程序入口:
在Spark2.0之前,是有sqlContext和hiveContext的概念的,因?yàn)檫@兩個(gè)概念難以區(qū)分,Spark2.0之后統(tǒng)一稱(chēng)為SparkSession,除此之外SparkSession還封裝了SparkConf和SparkContext。
值得注意的一點(diǎn)是:Hive有很多依賴(lài)包,所以這些依賴(lài)包沒(méi)有包含在默認(rèn)的Spark包里面。如果Hive依賴(lài)的包能在classpath找到,Spark將會(huì)自動(dòng)加載它們。這些Hive依賴(lài)包必須復(fù)制到所有的工作節(jié)點(diǎn)上,因?yàn)樗鼈優(yōu)榱四軌蛟L問(wèn)存儲(chǔ)在Hive的數(shù)據(jù),會(huì)調(diào)用Hive的序列化和反序列化(SerDes)包。Hive的配置文件hive-site.xml、core-site.xml(security配置)和hdfs-site.xml(HDFS配置)是保存在conf目錄下面。
當(dāng)使用Hive時(shí),必須初始化一個(gè)支持Hive的SparkSession,用戶即使沒(méi)有部署一個(gè)Hive的環(huán)境仍然可以使用Hive。當(dāng)沒(méi)有配置hive-site.xml時(shí),Spark會(huì)自動(dòng)在當(dāng)前應(yīng)用目錄創(chuàng)建metastore_db和創(chuàng)建由spark.sql.warehouse.dir配置的目錄,如果沒(méi)有配置,默認(rèn)是當(dāng)前應(yīng)用目錄下的spark-warehouse目錄。
注意:從Spark 2.0.0版本開(kāi)始,hive-site.xml里面的hive.metastore.warehouse.dir屬性已經(jīng)被spark.sql.warehouse.dir替代,用于指定warehouse的默認(rèn)數(shù)據(jù)路徑(必須有寫(xiě)權(quán)限)。
于是SparkSQL在與Hive有交互的情況下,需要指定支持Hive:
val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}")
val spark = SparkSession.builder().config(conf).config("spark.sql.warehouse.dir",
"hdfs://hadoop1:9000/user/hive/warehouse").enableHiveSupport().getOrCreate()
回到正題,程序入口:
1.6版本:
val conf=new SparkConf()
conf.setAppName(s"${this.getClass.getSimpleName}").setMaster("local")
val sc=new SparkContext(conf)
val sqlContext = new SQLContext(sc)
2.0版本:
SparkSQL的程序入口縮減為一句
 val sparkSession=SparkSession.builder().appName(s"${this.getClass.getSimpleName}").master("local").getOrCreate()
兩個(gè)版本一個(gè)獲得sqlContext(或者h(yuǎn)iveContext),一個(gè)獲得sparkSession。
三、算了,還是放在一起寫(xiě)吧。。
case  class Person(var name:String,var age:Int)
object Test {
  def main(args: Array[String]): Unit = {
    //1.6版本入口
    val conf=new SparkConf()
    conf.setAppName(s"${this.getClass.getSimpleName}").setMaster("local")
    val sc=new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
//第一種創(chuàng)建DataFrame的方式:直接讀取列式存儲(chǔ)的格式,可以直接形成DataFrame(后續(xù)怎么操作呢?)
    val df: DataFrame = sqlContext.read.json("")
    //第二種創(chuàng)建DataFrame的方式:因?yàn)閞dd沒(méi)有toDF()方法,需要進(jìn)行隱式轉(zhuǎn)化,通過(guò)map后形成一個(gè)數(shù)組
    import sqlContext.implicits._
    val df: DataFrame = sc.textFile("C:\Users\wangyongxiang\Desktop\plan\person.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
//第二種方法的另一種形態(tài),用sqlContext或者sparkSession的createDataFrame(),其實(shí)和toDF()方法是雷同的
    val rdd: RDD[Person] = sc.textFile("C:\Users\wangyongxiang\Desktop\plan\person.txt")
      .map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
    val df: DataFrame = sqlContext.createDataFrame(rdd)
    //第三種創(chuàng)建DataFrame:生成一個(gè)RowRDD,然后給出構(gòu)造的描述
    val rdd=sc.textFile("C:\Users\wangyongxiang\Desktop\plan\person.txt")
    val rowRDD: RDD[Row] = rdd.map(_.split(",")).map(p=>Row(p(0),p(1).trim.toInt))
    val schame=StructType(
      StructField("name",StringType,true)::
      StructField("age",IntegerType,true)::Nil
    )
    val df: DataFrame = sqlContext.createDataFrame(rowRDD,schame)
 
//后續(xù)代碼,可以創(chuàng)建臨時(shí)視圖作為查詢,與mysql互操作要?jiǎng)?chuàng)建臨時(shí)視圖才能做查詢
//用hiveContext則直接在hive中創(chuàng)建表,然后將數(shù)據(jù)load到hive表中,可以直接進(jìn)行條件查詢,無(wú)需創(chuàng)建臨時(shí)視圖,后面與hive集成會(huì)有說(shuō)明
    df.registerTempTable("person")
    sqlContext.sql("select * from person where age>21").show()
//將處理后的數(shù)據(jù)用jdbc保存到mysql數(shù)據(jù)庫(kù)中成為一張表,注意這里要使用user而不能使用username,因?yàn)橄到y(tǒng)也有一個(gè)username,會(huì)覆蓋你的用戶名
    val properties=new Properties()
    properties.put("user","root")
    properties.put("password","root")
    df.write.mode(SaveMode.Overwrite)jdbc("jdbc:mysql://localhost:3306/test","test",properties)
  }
}
四、load和save操作。
object saveAndLoadTest {
  def main(args: Array[String]): Unit = {
    val conf =new SparkConf().setAppName("").setMaster("local")
    val sc=new SparkContext(conf)
    val sqlContext=new SQLContext(sc)
 
    //read,load:讀取
    sqlContext.read.json("")
//  sqlContext.read.jdbc("url","table",properties)
    sqlContext.read.load("parquet路徑")
    sqlContext.read.format("json").load("路徑")
    val df: DataFrame = sqlContext.read.format("parquet").load("路徑")
 
    //write,save保存
    df.write.parquet("路徑.parquet")
    df.write.json("路徑.json")
//  df.write.jdbc("url","table",properties)
    df.write.format("parquet").save("路徑.parquet")
    df.write.format(("json")).save("路徑.json")
    //保存模式可選擇覆蓋,追加等
    df.write.mode(SaveMode.Overwrite).save("")
  }
}
個(gè)人理解是read和load都是讀取的作用,write和save都是保存的作用,通過(guò)上述的代碼,我們可以完成文件格式轉(zhuǎn)換的工作,將效率低的一些格式轉(zhuǎn)化成parquet這種sparksql原生支持的文件類(lèi)型
總結(jié)
以上是生活随笔為你收集整理的Spark SQL读写方法的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
                            
                        - 上一篇: 【JS】通过JS实现超市小票打印功能——
 - 下一篇: Android动态方式破解apk终极篇(