Spark SQL之External DataSource外部数据源(二)源代码分析
? ? 上周Spark1.2剛公布,周末在家沒事,把這個特性給了解一下,順便分析下源代碼,看一看這個特性是怎樣設(shè)計及實現(xiàn)的。
? ??/**?Spark SQL源代碼分析系列文章*/
??(Ps: External DataSource使用篇地址:Spark SQL之External DataSource外部數(shù)據(jù)源(一)演示樣例?http://blog.csdn.net/oopsoom/article/details/42061077)
一、Sources包核心
? ? Spark SQL在Spark1.2中提供了External DataSource API。開發(fā)人員能夠依據(jù)接口來實現(xiàn)自己的外部數(shù)據(jù)源,如avro, csv, json, parquet等等。
? ? 在Spark SQL源代碼的org/spark/sql/sources文件夾下,我們會看到關(guān)于External DataSource的相關(guān)代碼。
這里特別介紹幾個:
? ? 1、DDLParser?
? ? 專門負責解析外部數(shù)據(jù)源SQL的SqlParser。解析create?temporary?table xxx using options (key 'value', key 'value') 創(chuàng)建載入外部數(shù)據(jù)源表的語句。
protected lazy val createTable: Parser[LogicalPlan] =CREATE ~ TEMPORARY ~ TABLE ~> ident ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {case tableName ~ provider ~ opts =>CreateTableUsing(tableName, provider, opts)}? ? 2、CreateTableUsing
? ?一個RunnableCommand。通過反射從外部數(shù)據(jù)源lib中實例化Relation。然后注冊到為temp table。
private[sql] case class CreateTableUsing(tableName: String,provider: String, // org.apache.spark.sql.json options: Map[String, String]) extends RunnableCommand {def run(sqlContext: SQLContext) = {val loader = Utils.getContextOrSparkClassLoaderval clazz: Class[_] = try loader.loadClass(provider) catch { //do reflectioncase cnf: java.lang.ClassNotFoundException =>try loader.loadClass(provider + ".DefaultSource") catch {case cnf: java.lang.ClassNotFoundException =>sys.error(s"Failed to load class for data source: $provider")}}val dataSource = clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.RelationProvider] //json包DefaultDataSourceval relation = dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))//創(chuàng)建JsonRelationsqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName)//注冊Seq.empty} }? ? 2、DataSourcesStrategy
? ? 在 Strategy 一文中。我已講過Streategy的作用,用來Plan生成物理計劃的。
這里提供了一種專門為了解析外部數(shù)據(jù)源的策略。
? ? 最后會依據(jù)不同的BaseRelation生產(chǎn)不同的PhysicalRDD。
不同的BaseRelation的scan策略下文會介紹。
private[sql] object DataSourceStrategy extends Strategy {def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: CatalystScan)) =>pruneFilterProjectRaw(l,projectList,filters,(a, f) => t.buildScan(a, f)) :: Nil......case l @ LogicalRelation(t: TableScan) =>execution.PhysicalRDD(l.output, t.buildScan()) :: Nilcase _ => Nil} ? ?3、interfaces.scala?? ? 該文件定義了一系列可擴展的外部數(shù)據(jù)源接口,對于想要接入的外部數(shù)據(jù)源,我們僅僅需實現(xiàn)該接口就可以。
里面比較重要的trait?RelationProvider 和?BaseRelation,下文會具體介紹。
? ? 4、filters.scala
? ? 該Filter定義了怎樣在載入外部數(shù)據(jù)源的時候,就進行過濾。注意哦,是載入外部數(shù)據(jù)源到Table里的時候,而不是Spark里進行filter。
這個有點像hbase的coprocessor,查詢過濾在Server上就做了,不在Client端做過濾。
? ?5、LogicalRelation
? ?封裝了baseRelation,繼承了catalyst的LeafNode,實現(xiàn)MultiInstanceRelation。
? ??? ??
二、External DataSource注冊流程
用spark sql下sql/json來做演示樣例, 畫了一張流程圖,例如以下:注冊外部數(shù)據(jù)源的表的流程:1、提供一個外部數(shù)據(jù)源文件,比方j(luò)son文件。2、提供一個實現(xiàn)了外部數(shù)據(jù)源所須要的interfaces的類庫。比方sql下得json包,在1.2版本號后改為了External Datasource實現(xiàn)。3、引入SQLContext。使用DDL創(chuàng)建表,如create?temporary?table xxx using options (key 'value', key 'value')?4、External Datasource的DDLParser將對該SQL進行Parse5、Parse后封裝成為一個CreateTableUsing類的對象。
該類是一個RunnableCommand,其run方法會直接運行創(chuàng)建表語句。
6、該類會通過反射來創(chuàng)建一個org.apache.spark.sql.sources.RelationProvider。該trait定義要createRelation。如json。則創(chuàng)建JSONRelation,若avro,則創(chuàng)建AvroRelation。7、得到external releation后,直接調(diào)用SQLContext的baseRelationToSchemaRDD轉(zhuǎn)換為SchemaRDD8、最后registerTempTable(tableName) 來注冊為Table。能夠用SQL來查詢了。三、External DataSource解析流程
先看圖,圖例如以下:Spark SQL解析SQL流程例如以下:1、Analyzer通過Rule解析,將UnresolvedRelation解析為JsonRelation。2、通過Parse。Analyzer,Optimizer最后得到JSONRelation(file:///path/to/shengli.json,1.0) ?3、通過sources下得DataSourceStrategy將LogicalPlan映射到物理計劃PhysicalRDD。4、PhysicalRDD里包括了怎樣查詢外部數(shù)據(jù)的規(guī)則。能夠調(diào)用execute()方法來運行Spark查詢。
四、External Datasource Interfaces
在第一節(jié)我已經(jīng)介紹過,基本的interfaces,主要看一下BaseRelation和RelationProvider。假設(shè)我們要實現(xiàn)一個外部數(shù)據(jù)源,比方avro數(shù)據(jù)源,支持spark sql操作avro file。那么久必須定義AvroRelation來繼承BaseRelation。同一時候也要實現(xiàn)一個RelationProvider。
BaseRelation:是外部數(shù)據(jù)源的抽象,里面存放了schema的映射。和怎樣scan數(shù)據(jù)的規(guī)則。abstract class BaseRelation {def sqlContext: SQLContextdef schema: StructTypeabstract class PrunedFilteredScan extends BaseRelation {def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] }
1、schema我們假設(shè)自己定義Relation,必須重寫schema,就是我們必須描寫敘述對于外部數(shù)據(jù)源的Schema。2、buildScan我們定義怎樣查詢外部數(shù)據(jù)源。提供了4種Scan的策略。相應4種BaseRelation。
我們支持4種BaseRelation。分為TableScan, PrunedScan。PrunedFilterScan,CatalystScan。1、TableScan:
默認的Scan策略。2、PrunedScan:這里能夠傳入指定的列。requiredColumns。列裁剪,不須要的列不會從外部數(shù)據(jù)源載入。3、PrunedFilterScan:在列裁剪的基礎(chǔ)上,而且增加Filter機制。在載入數(shù)據(jù)也的時候就進行過濾。而不是在client請求返回時做Filter。4、CatalystScan:Catalyst的支持傳入expressions來進行Scan。支持列裁剪和Filter。
RelationProvider:我們要實現(xiàn)這個,接受Parse后傳入的參數(shù)。來生成相應的External Relation,就是一個反射生產(chǎn)外部數(shù)據(jù)源Relation的接口。trait RelationProvider {/*** Returns a new base relation with the given parameters.* Note: the parameters' keywords are case insensitive and this insensitivity is enforced* by the Map that is passed to the function.*/def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation }
五、External Datasource定義演示樣例
在Spark1.2之后,json和parquet也改為通過實現(xiàn)External API來進行外部數(shù)據(jù)源查詢的。以下以json的外部數(shù)據(jù)源定義為演示樣例,說明是怎樣實現(xiàn)的:1、JsonRelation
定義處理對于json文件的,schema和Scan策略,均基于JsonRDD,細節(jié)能夠自行閱讀JsonRDD。
private[sql] case class JSONRelation(fileName: String, samplingRatio: Double)(@transient val sqlContext: SQLContext)extends TableScan {private def baseRDD = sqlContext.sparkContext.textFile(fileName) //讀取json fileoverride val schema =JsonRDD.inferSchema( // jsonRDD的inferSchema方法。能自己主動識別json的schema。和類型type。baseRDD,samplingRatio,sqlContext.columnNameOfCorruptRecord)override def buildScan() =JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.columnNameOfCorruptRecord) //這里還是JsonRDD,調(diào)用jsonStringToRow查詢返回Row }
2、DefaultSourceparameters中能夠獲取到options中傳入的path等自己定義參數(shù)。
這里接受傳入的參數(shù),來狗仔JsonRelation。private[sql] class DefaultSource extends RelationProvider {/** Returns a new base relation with the given parameters. */override def createRelation(sqlContext: SQLContext,parameters: Map[String, String]): BaseRelation = {val fileName = parameters.getOrElse("path", sys.error("Option 'path' not specified"))val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)JSONRelation(fileName, samplingRatio)(sqlContext)} }
六、總結(jié)External DataSource源代碼分析下來。能夠總結(jié)為3部分。1、外部數(shù)據(jù)源的注冊流程2、外部數(shù)據(jù)源Table查詢的計劃解析流程3、怎樣自己定義一個外部數(shù)據(jù)源,重寫B(tài)aseRelation定義外部數(shù)據(jù)源的schema和scan的規(guī)則。定義RelationProvider。怎樣生成外部數(shù)據(jù)源Relation。External Datasource此部分API還有可能在興許的build中修改,眼下僅僅是涉及到了查詢。關(guān)于其他的操作還未涉及。——EOF——
原創(chuàng)文章。轉(zhuǎn)載請注明:
轉(zhuǎn)載自:OopsOutOfMemory盛利的Blog。作者:?OopsOutOfMemory
本文鏈接地址:http://blog.csdn.net/oopsoom/article/details/42064075??
注:本文基于署名-非商業(yè)性使用-禁止演繹 2.5 中國大陸(CC BY-NC-ND 2.5 CN)協(xié)議,歡迎轉(zhuǎn)載、轉(zhuǎn)發(fā)和評論,可是請保留本文作者署名和文章鏈接。如若須要用于商業(yè)目的或者與授權(quán)方面的協(xié)商,請聯(lián)系我。
轉(zhuǎn)載于:https://www.cnblogs.com/yangykaifa/p/6828063.html
總結(jié)
以上是生活随笔為你收集整理的Spark SQL之External DataSource外部数据源(二)源代码分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: u盘启动的工具怎么用 如何使用U盘启动工
- 下一篇: 怎么读取不到u盘启动 如何让电脑从U盘启