Spark SQL: SQLContext (Part 2)
1. Creating a SQLContext
SQLContext is the entry point for structured data processing in Spark SQL; it is used to create DataFrames and execute SQL. It is created as follows:
```scala
// sc is an existing SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
```
The corresponding source code is:
```scala
def this(sparkContext: SparkContext) = {
  this(sparkContext, new CacheManager, SQLContext.createListenerAndUI(sparkContext), true)
}
```

This auxiliary constructor delegates to the private primary constructor:
```scala
// 1. The CacheManager parameter of the primary constructor caches query results;
//    subsequent queries automatically read data from the cache.
// 2. SQLListener listens to Spark scheduler events; it extends SparkListener.
// 3. isRootContext indicates whether this is the root SQLContext.
class SQLContext private[sql](
    @transient val sparkContext: SparkContext,
    @transient protected[sql] val cacheManager: CacheManager,
    @transient private[sql] val listener: SQLListener,
    val isRootContext: Boolean)
  extends org.apache.spark.Logging with Serializable {
```
When spark.sql.allowMultipleContexts is set to true, multiple SQLContexts/HiveContexts may be created; additional ones are created with newSession:
```scala
def newSession(): SQLContext = {
  new SQLContext(
    sparkContext = sparkContext,
    cacheManager = cacheManager,
    listener = listener,
    isRootContext = false)
}
```
Here isRootContext is set to false, because only one root SQLContext is allowed; otherwise an exception is thrown. The other SQLContexts share the SparkContext, CacheManager, and SQLListener with the root SQLContext. If spark.sql.allowMultipleContexts is false, only a single SQLContext may exist.
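A minimal sketch of how an extra session might be created, assuming a Spark 1.5/1.6 application; the app name, master and table name are illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf()
  .setAppName("multi-sqlcontext-sketch")
  .setMaster("local[*]")
  .set("spark.sql.allowMultipleContexts", "true")
val sc = new SparkContext(conf)

val rootContext = new SQLContext(sc)    // isRootContext = true
val session = rootContext.newSession()  // isRootContext = false; shares sc, CacheManager, SQLListener

// Each SQLContext instance has its own catalog (see section 2), so a temporary
// table registered in one session is not visible from the other.
rootContext.range(0, 10).registerTempTable("nums")
println(rootContext.tableNames().contains("nums"))  // true
println(session.tableNames().contains("nums"))      // false
```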
2. Core member variable: catalog
```scala
protected[sql] lazy val catalog: Catalog = new SimpleCatalog(conf)
```

The catalog is used to register tables, unregister tables, check whether a table exists, and so on. For example, when registerTempTable is called on a DataFrame:
```scala
val people = sc.textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))
  .toDF()
people.registerTempTable("people")
```

registerTempTable invokes sqlContext's registerDataFrameAsTable method:
```scala
def registerTempTable(tableName: String): Unit = {
  sqlContext.registerDataFrameAsTable(this, tableName)
}
```
sqlContext.registerDataFrameAsTable in turn calls the catalog's registerTable method:
```scala
private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = {
  catalog.registerTable(TableIdentifier(tableName), df.logicalPlan)
}
```
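Once registered, the table name can be resolved through the catalog by an ordinary SQL query. A small sketch (the data and table name are illustrative):

```scala
// A small DataFrame registered as a temporary table.
val people = sqlContext.createDataFrame(Seq(("Andy", 30), ("Justin", 19)))
  .toDF("name", "age")
people.registerTempTable("people")

// During analysis of this query, the UnresolvedRelation "people" is resolved by
// catalog.lookupRelation, which returns the logical plan registered above.
sqlContext.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19").show()
```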
The complete source of SimpleCatalog is as follows:
```scala
class SimpleCatalog(val conf: CatalystConf) extends Catalog {
  private[this] val tables = new ConcurrentHashMap[String, LogicalPlan]

  override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
    tables.put(getTableName(tableIdent), plan)
  }

  override def unregisterTable(tableIdent: TableIdentifier): Unit = {
    tables.remove(getTableName(tableIdent))
  }

  override def unregisterAllTables(): Unit = {
    tables.clear()
  }

  override def tableExists(tableIdent: TableIdentifier): Boolean = {
    tables.containsKey(getTableName(tableIdent))
  }

  override def lookupRelation(
      tableIdent: TableIdentifier,
      alias: Option[String] = None): LogicalPlan = {
    val tableName = getTableName(tableIdent)
    val table = tables.get(tableName)
    if (table == null) {
      throw new NoSuchTableException
    }
    val tableWithQualifiers = Subquery(tableName, table)

    // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
    // properly qualified with this alias.
    alias.map(a => Subquery(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
  }

  override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
    tables.keySet().asScala.map(_ -> true).toSeq
  }

  override def refreshTable(tableIdent: TableIdentifier): Unit = {
    throw new UnsupportedOperationException
  }
}
```

3. Core member variable: sqlParser
sqlParser is defined in SQLContext as:
```scala
protected[sql] val sqlParser = new SparkSQLParser(getSQLDialect().parse(_))
```

SparkSQLParser is the top-level Spark SQL parser; it parses the SQL syntax that Spark SQL supports. It is defined as:
```scala
private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends AbstractSparkSQLParser
```

The fallback function is used to parse statements that SparkSQLParser does not handle itself; they are handed off to the configured SQL dialect (here getSQLDialect().parse).
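A hypothetical, simplified illustration of this fallback pattern (not Spark's actual classes): the top-level parser recognizes a few auxiliary commands itself and hands every other statement to a fallback function.

```scala
object FallbackParserSketch {
  sealed trait Plan
  case class SetCommand(keyValue: String) extends Plan
  case class DialectPlan(sql: String) extends Plan

  class TopLevelParser(fallback: String => Plan) {
    def parse(input: String): Plan = {
      val trimmed = input.trim
      if (trimmed.toUpperCase.startsWith("SET ")) {
        SetCommand(trimmed.drop(4)) // handled by the top-level parser itself
      } else {
        fallback(input)             // everything else goes to the SQL dialect
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val parser = new TopLevelParser(sql => DialectPlan(sql))
    println(parser.parse("SET spark.sql.shuffle.partitions=10"))
    println(parser.parse("SELECT name FROM people"))
  }
}
```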
The keywords SparkSQLParser supports include:
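For reference, the keyword set defined by SparkSQLParser in the Spark 1.5/1.6 source looks roughly like the following (reconstructed from that version of the source; the exact set may differ in other releases):

```scala
protected val AS = Keyword("AS")
protected val CACHE = Keyword("CACHE")
protected val CLEAR = Keyword("CLEAR")
protected val DESCRIBE = Keyword("DESCRIBE")
protected val EXTENDED = Keyword("EXTENDED")
protected val FUNCTION = Keyword("FUNCTION")
protected val FUNCTIONS = Keyword("FUNCTIONS")
protected val IN = Keyword("IN")
protected val LAZY = Keyword("LAZY")
protected val SET = Keyword("SET")
protected val SHOW = Keyword("SHOW")
protected val TABLE = Keyword("TABLE")
protected val TABLES = Keyword("TABLES")
protected val UNCACHE = Keyword("UNCACHE")
```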
4. Core member variable: ddlParser
It is used to parse DDL (Data Definition Language) statements.
```scala
protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_))
```

The keywords it supports are:
```scala
protected val CREATE = Keyword("CREATE")
protected val TEMPORARY = Keyword("TEMPORARY")
protected val TABLE = Keyword("TABLE")
protected val IF = Keyword("IF")
protected val NOT = Keyword("NOT")
protected val EXISTS = Keyword("EXISTS")
protected val USING = Keyword("USING")
protected val OPTIONS = Keyword("OPTIONS")
protected val DESCRIBE = Keyword("DESCRIBE")
protected val EXTENDED = Keyword("EXTENDED")
protected val AS = Keyword("AS")
protected val COMMENT = Keyword("COMMENT")
protected val REFRESH = Keyword("REFRESH")
```
The DDL parser does three main things: create a table, describe a table, and refresh a table:
```scala
protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable | refreshTable
```

The createTable rule is shown below (see its doc comment for the statement forms it supports):
```scala
/**
 * `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
 * USING org.apache.spark.sql.avro
 * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
 * or
 * `CREATE [TEMPORARY] TABLE avroTable(intField int, stringField string...) [IF NOT EXISTS]
 * USING org.apache.spark.sql.avro
 * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
 * or
 * `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
 * USING org.apache.spark.sql.avro
 * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
 * AS SELECT ...
 */
protected lazy val createTable: Parser[LogicalPlan] = {
  // TODO: Support database.table.
  (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ tableIdentifier ~
    tableCols.? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ {
      case temp ~ allowExisting ~ tableIdent ~ columns ~ provider ~ opts ~ query =>
        if (temp.isDefined && allowExisting.isDefined) {
          throw new DDLException(
            "a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
        }

        val options = opts.getOrElse(Map.empty[String, String])
        if (query.isDefined) {
          if (columns.isDefined) {
            throw new DDLException(
              "a CREATE TABLE AS SELECT statement does not allow column definitions.")
          }
          // When IF NOT EXISTS clause appears in the query, the save mode will be ignore.
          val mode = if (allowExisting.isDefined) {
            SaveMode.Ignore
          } else if (temp.isDefined) {
            SaveMode.Overwrite
          } else {
            SaveMode.ErrorIfExists
          }

          val queryPlan = parseQuery(query.get)
          CreateTableUsingAsSelect(tableIdent,
            provider,
            temp.isDefined,
            Array.empty[String],
            mode,
            options,
            queryPlan)
        } else {
          val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
          CreateTableUsing(
            tableIdent,
            userSpecifiedSchema,
            provider,
            temp.isDefined,
            options,
            allowExisting.isDefined,
            managedIfNoPath = false)
        }
    }
}
```
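A minimal usage sketch of a statement handled by this rule, assuming the spark-avro data source is on the classpath and the path points to an existing Avro file (the provider class, path, and table name are illustrative):

```scala
// Parsed by createTable into a CreateTableUsing plan with temp = true
// and the OPTIONS pairs collected into the options map.
sqlContext.sql(
  """CREATE TEMPORARY TABLE episodes
    |USING com.databricks.spark.avro
    |OPTIONS (path "/tmp/episodes.avro")
  """.stripMargin)

// The registered table can then be queried like any other temporary table.
sqlContext.sql("SELECT * FROM episodes LIMIT 5").show()
```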
The describeTable and refreshTable code is as follows:
```scala
/**
 * describe [extended] table avroTable
 * This will display all columns of table `avroTable` includes column_name,column_type,comment
 */
protected lazy val describeTable: Parser[LogicalPlan] =
  (DESCRIBE ~> opt(EXTENDED)) ~ tableIdentifier ^^ {
    case e ~ tableIdent =>
      DescribeCommand(UnresolvedRelation(tableIdent, None), e.isDefined)
  }

protected lazy val refreshTable: Parser[LogicalPlan] =
  REFRESH ~> TABLE ~> tableIdentifier ^^ {
    case tableIndet =>
      RefreshTable(tableIndet)
  }
```
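These two rules back the corresponding SQL statements. A short sketch, assuming the episodes data-source table from the previous sketch:

```scala
// Parsed by describeTable: lists each column's name, type, and comment.
sqlContext.sql("DESCRIBE episodes").show()

// Parsed by refreshTable. Note that SimpleCatalog.refreshTable throws
// UnsupportedOperationException (see section 2), so the statement is only
// meaningful with a catalog that caches table metadata, such as HiveContext's.
// sqlContext.sql("REFRESH TABLE episodes")
```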