spark sql的简单操作
生活随笔
收集整理的這篇文章主要介紹了
spark sql的简单操作
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
測試數(shù)據(jù) sparkStu.text zhangxs 24 chenxy
wangYr 21 teacher
wangx 26 teacher sparksql {
"name":"zhangxs","age":24,"job":"chengxy",
"name":"li","age":21,"job":"teacher",
"name":"tao","age":14,"job":"student"
} object CreateDataFream {
//創(chuàng)建student對象
case class Student(name:String,age:BigInt,job:String);def main(args: Array[String]){
//初始化sparkSession 這個sparkSession要用val關鍵字修飾
val spark = SparkSession
.builder()
.appName("Spark SQL Example")
.master("spark://服務器ip:7077")
.getOrCreate();
// runDataSetCreate(spark);
// runSarkOnFile(spark);
// applySchema(spark);
//loadParquet(spark);
//jsonFile(spark);
//銷毀sparkSession
spark.stop();
}} //對指定的列進行查詢
private def test1(spark :SparkSession){
//因為要使用變量,$符號,所以導入這個包
import spark.implicits._
//從hdfs上讀取json數(shù)據(jù)文件并創(chuàng)建dataFream
var dataFreamS= spark.read.json("hdfs://服務器ip:8020/tmp/dataTest/sparksql");
//顯示dataFream所有數(shù)據(jù)
dataFreamS.show();
//打印dataFrame結構
dataFreamS.printSchema();
//顯示指定列的數(shù)據(jù)
dataFreamS.select("name").show()
//查詢指定的列,并修改數(shù)據(jù)
dataFreamS.select($"name", $"age"+1).show();
//查詢年齡大于10的人
dataFreamS.select($"age" > 10).show();
//查看每個年齡段的人數(shù)
dataFreamS.groupBy("age").count();
//創(chuàng)建臨時視圖,如果這個視圖已經存在就覆蓋掉
dataFreamS.createOrReplaceTempView("zhangxsView");
}
?
//創(chuàng)建dataFrame并運行 private def runDataSetCreate(spark:SparkSession){ import spark.implicits._ //創(chuàng)建DataSets對象 類型是Student val dataStu = Seq(Student("Andy", 32,"baiLing")).toDS(); //顯示數(shù)據(jù)集信息 dataStu.show(); //創(chuàng)建數(shù)據(jù)的dataSet var dataArr=Seq(1,2,3).toDS(); //顯示數(shù)據(jù)集的信息 dataArr.show(); //對屬性進行簡單操作 print(dataArr.map (_ +1 ).collect()); //dataFrame能夠被轉換成自定義對象類型的dataSet, val dfStu=spark.read.json("hdfs://服務器ip:8020/tmp/dataTest/sparksql").as[Student]; dfStu.show(); //jsonFile支持嵌套表,讀入并注冊成表 spark.read.json("hdfs://服務器ip:8020/tmp/dataTest/sparksql").registerTempTable("student"); //根據(jù)sql查詢注冊的table val temsql=spark.sqlContext.sql("select name from student"); //顯示name的value print(temsql.show()) }?
//從hdfs上讀取數(shù)據(jù)文件并轉為student對象進行操作 private def runSarkOnFile(spark:SparkSession){ import spark.implicits._ //讀取數(shù)據(jù)文件 并生成rdd var rdd=spark.read.textFile("hdfs://服務器ip:8020/tmp/dataTest/sparkStu.txt"); //對獲取的rdd進行解析,并生成sutdent對象 var sturdd=rdd.map { x => x.split(" ")}.map { z => Student(z(0).toString(),z(1).toInt,z(2).toString())}; //顯示student對象 sturdd.show(); //將sutdent對象注冊成臨時表 student sturdd.registerTempTable("student"); //查詢臨時表中的數(shù)據(jù),并顯示 var sqlDF=spark.sql("select t.name,t.age,t.job from friend t where t.age>14 and t.age<26"); sqlDF.show(); }?
private def applySchema(spark:SparkSession){ import spark.implicits._ import org.apache.spark.sql._ import org.apache.spark.sql.types._ //確定schema名稱(列的名稱) var schemaString="name,age,job"; //解析schemaString,并生成StructType對象數(shù)組 var schemaType=StructType(schemaString.split(",").map { x => StructField(x,StringType,true)}) //從hdfs上讀取數(shù)據(jù)文件 var stuDS=spark.sparkContext.textFile(path); //使用Row對象,創(chuàng)建rowRdd var sDS=stuDS.map { x => x.split(" ")}.map(s => Row(s(0),s(1),s(2))) //創(chuàng)建schemaRDD var rowDF=spark.createDataFrame(sDS, schemaType); // var rowDF=spark.sqlContext.applySchema(sDS, schemaType); 這種方法已經過時//打印schemaRDD的結構 rowDF.printSchema(); //注冊Student table rowDF.createOrReplaceTempView("Student"); // rowDF.registerTempTable("Student"); 這種方法已經過時 //rowDF.collect().foreach {print(_) } //var resDS=spark.sql("select * from Student where age > 24"); var resDS=spark.sql("select name from Student"); resDS.show(); } //使用parquet文件的方式 private def loadParquet(spark:SparkSession){ import spark.implicits._ //確定schema 列名稱 var schemaString="name,age,job"; //解析schemaString,并生成StructType對象數(shù)組 var schemaType=StructType(schemaString.split(",").map { x => StructField(x,StringType,true)}) //創(chuàng)建rowRdd var stuDS=spark.sparkContext.textFile(path); var sDS=stuDS.map { x => x.split(" ")}.map(s => Row(s(0),s(1),s(2))) //將schemaRDD保存成parquet文件 var rowDF=spark.sqlContext.applySchema(sDS, schemaType); //將文件寫到hdfs://服務器ip:8020/tmp/dataTest/ rowDF.write.parquet("hdfs://服務器ip:8020/tmp/dataTest/student.parquet"); ------------------------------------------------------------------- //讀取parquet文件 var redParfile=spark.read.parquet("hdfs://服務器ip:8020/tmp/dataTest/student.parquet"); redParfile.createOrReplaceTempView("redParfilered"); var resultRdd=spark.sql("select * from redParfilered t where t.name='zhangxs'"); //DataFrame.rdd 可以將dataFrame轉為RDD類型 resultRdd.rdd.map { x => "name"+x(0) }.collect().foreach { print(_) } }/** * spark可以自動的識別一個json模式并加載成數(shù)據(jù)集, * 這種轉換可以使用SparkSession.read.json() 函數(shù) * 這個數(shù)據(jù)集的來源可以是一個rdd,也可以是一個json文件 * */ private def jsonFile(spark:SparkSession){ var jsonRdd=spark.read.json("hdfs://192.168.177.124:8020/tmp/dataTest/sparksql"); jsonRdd.createOrReplaceTempView("student"); var jfRdd= spark.sql("select * from student t where t.age >24"); jfRdd.show();?
/** * 使用Json類型的rdd加載json * * 如果加:: Nil,返回是一個char類型的rdd,加上則返回的是String類型的rdd */ var rdd=spark.sparkContext.makeRDD("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil); var rddre=spark.read.json(rdd); rddre.show(); }?
轉載于:https://www.cnblogs.com/zhangXingSheng/p/6512599.html
總結
以上是生活随笔為你收集整理的spark sql的简单操作的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Coursera在线学习---第十节.大
- 下一篇: 《Linux调优工具oprofile的演