Spark SQL: Executing Spark SQL Queries Programmatically (Inferring the Schema via Reflection, and Specifying the Schema Directly with StructType)
1. Writing a Spark SQL query program
Before starting, create a Maven project; the steps are described at http://blog.csdn.net/tototuzuoquan/article/details/74571374.
The post at http://blog.csdn.net/tototuzuoquan/article/details/74907124 shows how to run SQL queries from the Spark shell. Below, we instead write the Spark SQL query in our own program. First, add the Spark SQL dependency to the Maven project's pom.xml:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.5.2</version>
</dependency>

The final pom.xml looks like this:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.toto.spark</groupId>
    <artifactId>bigdata</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.10.6</scala.version>
        <spark.version>1.6.2</spark.version>
        <hadoop.version>2.6.4</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>1.5.2</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-make:transitive</arg>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2. Preparing the run parameters
The content of person.txt is as follows:
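Based on how the code below parses each line (split on a single space into an Int id, a String name, and an Int age), a file in the expected format would look something like this hypothetical sample:

1 zhangsan 19
2 lisi 20
3 wangwu 28
4 zhaoliu 26
5 tianqi 24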
3. Inferring the Schema via reflection
package cn.toto.spark

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by toto on 2017/7/10.
  */
object InferringSchema {
  def main(args: Array[String]): Unit = {
    //Create SparkConf() and set the application name (add setMaster("local") only when running locally; drop it otherwise)
    val conf = new SparkConf().setAppName("SQL-1").setMaster("local")
    //SQLContext depends on SparkContext
    val sc = new SparkContext(conf)
    //Create the SQLContext
    val sqlContext = new SQLContext(sc)
    //Create an RDD from the given path
    val lineRDD = sc.textFile(args(0)).map(_.split(" "))
    //Associate the RDD with the Person case class (defined at the bottom of this file)
    val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
    //Import implicit conversions; without them the RDD cannot be converted to a DataFrame
    import sqlContext.implicits._
    //Convert the RDD into a DataFrame
    val personDF = personRDD.toDF
    //Register a temporary table
    personDF.registerTempTable("t_person")
    //Run the SQL
    val df = sqlContext.sql("select * from t_person order by age desc limit 2")
    //Write the result to the given path as JSON
    df.write.json(args(1))
    //Stop the SparkContext
    sc.stop()
  }
}

//The case class must be defined outside the object
case class Person(id: Int, name: String, age: Int)

Run configuration:
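The program expects two program arguments: the input file and an output directory that must not already exist (Spark's default save mode fails if it does). In an IDE run configuration the arguments could look, for example, like this (hypothetical local paths):

E:\wordcount\person.txt E:\wordcount\out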
Run the program; the result is as follows:
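df.write.json writes the result as a directory of part files, each holding one JSON object per line. With the hypothetical sample data above, the two oldest people are selected, so the output would look roughly like this:

{"id":3,"name":"wangwu","age":28}
{"id":4,"name":"zhaoliu","age":26}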
Package the program into a jar, upload it to the Spark cluster, and submit the Spark job (remember to remove setMaster("local") from the code first):
[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# cd $SPARK_HOME
[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# bin/spark-submit \
  --class cn.toto.spark.InferringSchema \
  --master spark://hadoop1:7077,hadoop2:7077 \
  /home/tuzq/software/sparkdata/bigdata-1.0-SNAPSHOT.jar \
  hdfs://mycluster/person.txt hdfs://mycluster/out
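Once the job finishes, the JSON result lands in the HDFS output directory passed as the last argument. A quick way to inspect it (assuming the output path used above):

hdfs dfs -ls hdfs://mycluster/out
hdfs dfs -cat hdfs://mycluster/out/part-*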
4. Specifying the Schema directly with StructType

The code is as follows:
package cn.toto.spark

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by toto on 2017/7/10.
  */
object SpecifyingSchema {
  def main(args: Array[String]): Unit = {
    //Create SparkConf() and set the application name
    val conf = new SparkConf().setAppName("SQL-2").setMaster("local")
    //SQLContext depends on SparkContext
    val sc = new SparkContext(conf)
    //Create the SQLContext
    val sqlContext = new SQLContext(sc)
    //Create an RDD from the given path
    val personRDD = sc.textFile(args(0)).map(_.split(" "))
    //Specify each field's schema directly with StructType; this is essentially the table's description
    val schema = StructType(List(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)
    ))
    //Map the RDD to an RDD of Row
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    //Apply the schema to the row RDD
    val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    //Register a temporary table
    personDataFrame.registerTempTable("t_person")
    //Execute the SQL
    val df = sqlContext.sql("select * from t_person order by age desc limit 4")
    //Write the result to the given path as JSON
    df.write.json(args(1))
    //Stop the SparkContext
    sc.stop()
  }
}

Run configuration:
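This program takes the same two program arguments as InferringSchema: the input file and an output directory that does not yet exist. As a quick sanity check, the JSON output can be read back into a DataFrame; a minimal sketch (the path is a hypothetical example matching the run arguments above):

//Read the JSON output back and inspect it
val result = sqlContext.read.json("E:/wordcount/out")
result.printSchema()   //shows the id, name and age columns inferred from the JSON
result.show()          //prints the rows that were written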
The result after running:
Summary

This article showed two ways to build a DataFrame and run Spark SQL queries programmatically: inferring the schema from a case class via reflection, and specifying the schema explicitly with StructType. In both cases the DataFrame is registered as a temporary table, queried with SQL, and the result written out as JSON.