Spark SQL 加载数据
第一種方式:Spark SQL可以將數(shù)據(jù)文件加載成RDD方式,然后將RDD轉(zhuǎn)成DataFrame或者DataSet。
第二種方式:從本地或者Cloud(hdfs hive S3等)
將文件加載成RDD
首先打開(kāi)控制臺(tái),輸入命令打開(kāi)spark-shell:
./spark-shell --master local[2] --jars /home/iie4bu/software/mysql-connector-java-5.1.35.jar
然后加載本地文件:
val masterlog = sc.textFile("file:///home/iie4bu/app/spark-2.4.5-bin-2.6.0-cdh5.15.1/logs/spark-iie4bu-org.apache.spark.deploy.master.Master-1-manager.out.2")
此時(shí)已經(jīng)將本地文件加載成RDD了,還無(wú)法使用SQL進(jìn)行查詢,因?yàn)闆](méi)有轉(zhuǎn)成DataFrame。
將RDD轉(zhuǎn)成DataFrame
import org.apache.spark.sql.Row val masterRDD = masterlog.map(x => Row(x))import org.apache.spark.sql.types._ val schemaString = "line"val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))val schema = StructType(fields)val masterDF = spark.createDataFrame(masterRDD, schema) masterDF.printSchema masterDF.show
這樣就可以使用DataFrame中的相關(guān)查詢了。也可以將DataFrame注冊(cè)成一張表。
將DataFrame注冊(cè)成表
masterDF.createOrReplaceTempView("master_logs") spark.sql("select * from master_logs limit 10").show(false)如果使用JSON或者Parquet,schema信息不需要手寫(xiě),spark可以進(jìn)行推測(cè)
使用Spark導(dǎo)入外部數(shù)據(jù)源,這種方式不需要手動(dòng)寫(xiě)schema了:
val usersDF = spark.read.format("parquet").load("file:///home/iie4bu/app/spark-2.4.5-bin-2.6.0-cdh5.15.1/examples/src/main/resources/users.parquet") usersDF.printSchema
生成DataFrame之后,也可以使用創(chuàng)建表來(lái)執(zhí)行sql:
更簡(jiǎn)單的實(shí)現(xiàn)方式
spark.sql("select * from parquet.`file:///home/iie4bu/app/spark-2.4.5-bin-2.6.0-cdh5.15.1/examples/src/main/resources/users.parquet` where name = 'Ben'").show讀取hdfs或者s3數(shù)據(jù)
讀取成RDD:
val hdfsRDD = sc.textFile("hdfs://path/file") val s3RDD = sc.textFile("s3a://bucket/object")直接讀取成DataFrame:
spark.read.format("text").load("hdfs://path/file") spark.read.format("text").load("s3a://bucket/object")總結(jié)
以上是生活随笔為你收集整理的Spark SQL 加载数据的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Mysql字符集之utf8和utf8mb
- 下一篇: 原创:4个新一线城市遭大学生追捧,热度赶