Building Batch and Streaming Applications with the Flink Table & SQL API (3): Using Flink SQL
From Flink's official documentation we know that Flink's programming model consists of four layers: SQL is the topmost API, the Table API is the middle layer, the DataStream/DataSet API is the core, and stateful stream processing is the lowest-level implementation.
Previously in this series:
"Flink DataSet API usage and internals" introduced the DataSet API.
"Flink DataStream API usage and internals" introduced the DataStream API.
"How to use timestamps in Flink --- Watermark usage and internals" introduced watermarks, the basis of the low-level implementation.
"Flink window example analysis" introduced the concept and mechanics of windows.
"State and fault tolerance in Flink" introduced the concept of state and the checkpoint/savepoint fault-tolerance mechanisms.
Part 1, "Building Batch and Streaming Applications with the Flink Table & SQL API (1): Basic Concepts of Table", introduced the basic concepts and usage of Table.
Part 2, "Building Batch and Streaming Applications with the Flink Table & SQL API (2): Table API Overview", gave an overview of the Table API.
This article looks at what Flink SQL offers and the principles behind it.
1. SQL features
SQL support surfaces in org.apache.flink.table.api.TableEnvironment; at present Flink supports only SELECT and INSERT operations.
(1) select
/**
 * Evaluates a SQL query on registered tables and retrieves the result as a {@link Table}.
 *
 * <p>All tables referenced by the query must be registered in the TableEnvironment.
 * A {@link Table} is automatically registered when its {@link Table#toString()} method is
 * called, for example when it is embedded into a String.
 * Hence, SQL queries can directly reference a {@link Table} as follows:
 *
 * <pre>
 * {@code
 *   Table table = ...;
 *   String tableName = table.toString();
 *   // the table is not registered to the table environment
 *   tEnv.sqlQuery("SELECT * FROM tableName");
 * }
 * </pre>
 *
 * @param query The SQL query to evaluate.
 * @return The result of the query as Table
 */
Table sqlQuery(String query);

(2) update (currently only INSERT is supported)
/**
 * Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL statement;
 * NOTE: Currently only SQL INSERT statements are supported.
 *
 * <p>All tables referenced by the query must be registered in the TableEnvironment.
 * A {@link Table} is automatically registered when its {@link Table#toString()} method is
 * called, for example when it is embedded into a String.
 * Hence, SQL queries can directly reference a {@link Table} as follows:
 *
 * <pre>
 * {@code
 *   // register the configured table sink into which the result is inserted.
 *   tEnv.registerTableSink("sinkTable", configuredSink);
 *   Table sourceTable = ...
 *   String tableName = sourceTable.toString();
 *   // sourceTable is not registered to the table environment
 *   tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM tableName");
 * }
 * </pre>
 *
 * @param stmt The SQL statement to evaluate.
 */
void sqlUpdate(String stmt);

/**
 * Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL statement;
 * NOTE: Currently only SQL INSERT statements are supported.
 *
 * <p>All tables referenced by the query must be registered in the TableEnvironment.
 * A {@link Table} is automatically registered when its {@link Table#toString()} method is
 * called, for example when it is embedded into a String.
 * Hence, SQL queries can directly reference a {@link Table} as follows:
 *
 * <pre>
 * {@code
 *   // register the configured table sink into which the result is inserted.
 *   tEnv.registerTableSink("sinkTable", configuredSink);
 *   Table sourceTable = ...
 *   String tableName = sourceTable.toString();
 *   // sourceTable is not registered to the table environment
 *   tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM tableName", config);
 * }
 * </pre>
 *
 * @param stmt The SQL statement to evaluate.
 * @param config The {@link QueryConfig} to use.
 */
void sqlUpdate(String stmt, QueryConfig config);
2. SQL parsing internals

Apache Calcite is a SQL engine for the Hadoop ecosystem that provides standard SQL, a range of query optimizations, and connectivity to many data sources; it also offers query engines for OLAP and stream processing. Since becoming an Apache incubator project in 2013 it has drawn growing attention in the Hadoop world and has been integrated by many projects: Flink, Storm, Drill, and Phoenix all rely on it for SQL parsing and optimization.
Let's start by running a demo and then trace what stages SQL parsing actually goes through.
(1) select
package org.apache.flink.table.examples.java;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.util.Arrays;

/**
 * Simple example for demonstrating the use of SQL on a Stream Table in Java.
 *
 * <p>This example shows how to:
 *  - Convert DataStreams to Tables
 *  - Register a Table under a name
 *  - Run a StreamSQL query on the registered Table
 */
public class StreamSQLExample {

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        // set up execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<Order> orderA = env.fromCollection(Arrays.asList(
            new Order(1L, "beer", 3),
            new Order(1L, "diaper", 4),
            new Order(3L, "rubber", 2)));

        DataStream<Order> orderB = env.fromCollection(Arrays.asList(
            new Order(2L, "pen", 3),
            new Order(2L, "rubber", 3),
            new Order(4L, "beer", 1)));

        // convert DataStream to Table
        Table tableA = tEnv.fromDataStream(orderA, "user, product, amount");
        // register DataStream as Table
        tEnv.registerDataStream("OrderB", orderB, "user, product, amount");

        // union the two tables
        Table result = tEnv.sqlQuery("SELECT * FROM " + tableA + " WHERE amount > 2 UNION ALL " +
            "SELECT * FROM OrderB WHERE amount < 2");

        tEnv.toAppendStream(result, Order.class).print();

        env.execute();
    }

    // *************************************************************************
    // USER DATA TYPES
    // *************************************************************************

    /**
     * Simple POJO.
     */
    public static class Order {
        public Long user;
        public String product;
        public int amount;

        public Order() {
        }

        public Order(Long user, String product, int amount) {
            this.user = user;
            this.product = product;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return "Order{" +
                "user=" + user +
                ", product='" + product + '\'' +
                ", amount=" + amount +
                '}';
        }
    }
}
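For reference, running this should print the two tableA orders with amount > 2 plus the single OrderB order with amount < 2; each line is prefixed with the printing subtask's index, and the arrival order may vary across runs, e.g.:

2> Order{user=1, product='beer', amount=3}
3> Order{user=1, product='diaper', amount=4}
1> Order{user=4, product='beer', amount=1}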
The implementation of sqlQuery is as follows:

override def sqlQuery(query: String): Table = {
  val planner = getFlinkPlanner
  // parse the sql query
  val parsed = planner.parse(query)
  if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
    // validate the sql query
    val validated = planner.validate(parsed)
    // transform to a relational tree
    val relational = planner.rel(validated)
    new TableImpl(this, new PlannerQueryOperation(relational.rel))
  } else {
    throw new TableException(
      "Unsupported SQL query! sqlQuery() only accepts SQL queries of type " +
      "SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.")
  }
}

>>parse the sql query
In Calcite the parse result is represented as a SqlNode; a SELECT statement, for example, is parsed into a SqlSelect:
public SqlSelect(SqlParserPos pos,
    SqlNodeList keywordList,
    SqlNodeList selectList,
    SqlNode from,
    SqlNode where,
    SqlNodeList groupBy,
    SqlNode having,
    SqlNodeList windowDecls,
    SqlNodeList orderBy,
    SqlNode offset,
    SqlNode fetch) {
  super(pos);
  this.keywordList = Objects.requireNonNull(keywordList != null
      ? keywordList : new SqlNodeList(pos));
  this.selectList = selectList;
  this.from = from;
  this.where = where;
  this.groupBy = groupBy;
  this.having = having;
  this.windowDecls = Objects.requireNonNull(windowDecls != null
      ? windowDecls : new SqlNodeList(pos));
  this.orderBy = orderBy;
  this.offset = offset;
  this.fetch = fetch;
}
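The parse stage can be exercised on its own with Calcite's parser; no catalog and no Flink environment is involved yet. A minimal sketch (the query text is made up for illustration and assumes calcite-core on the classpath):

import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;

public class ParseOnlyDemo {
    public static void main(String[] args) throws SqlParseException {
        // build an AST from the raw query string
        SqlParser parser = SqlParser.create(
            "SELECT product, amount FROM OrderB WHERE amount > 2");
        SqlNode node = parser.parseQuery();

        // the root node is a SqlSelect holding the clause sub-trees
        SqlSelect select = (SqlSelect) node;
        System.out.println(node.getKind());    // SELECT
        System.out.println(select.getFrom());  // the FROM sub-tree
        System.out.println(select.getWhere()); // the WHERE sub-tree
    }
}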
>>validate the sql query

SqlValidatorImpl then validates the SqlNode, resolving table names, columns, and types against the catalog:
public SqlNode validate(SqlNode topNode) {
  SqlValidatorScope scope = new EmptyScope(this);
  scope = new CatalogScope(scope, ImmutableList.of("CATALOG"));
  final SqlNode topNode2 = validateScopedExpression(topNode, scope);
  final RelDataType type = getValidatedNodeType(topNode2);
  Util.discard(type);
  return topNode2;
}
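Parsing only checks syntax; it is validation that rejects references that do not resolve. A hedged illustration against the demo environment above (the misspelled column is deliberate; in Flink 1.9 the Calcite validation error surfaces as a ValidationException):

// "amout" is syntactically a valid identifier, so parsing succeeds,
// but validation fails: the registered table OrderB has no such column
try {
    tEnv.sqlQuery("SELECT amout FROM OrderB");
} catch (org.apache.flink.table.api.ValidationException e) {
    // e.g. "SQL validation failed. ... Column 'amout' not found ..."
    System.out.println(e.getMessage());
}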
>>transform to a relational tree

SqlToRelConverter.java then converts the validated AST into a tree of relational operators (RelNode):
/**
 * Converts an unvalidated query's parse tree into a relational expression.
 *
 * @param query Query to convert
 * @param needsValidation Whether to validate the query before converting;
 *                        <code>false</code> if the query has already been
 *                        validated.
 * @param top Whether the query is top-level, say if its result
 *            will become a JDBC result set; <code>false</code> if
 *            the query will be part of a view.
 */
public RelRoot convertQuery(SqlNode query,
    final boolean needsValidation,
    final boolean top) {
  if (needsValidation) {
    query = validator.validate(query);
  }

  RelMetadataQuery.THREAD_PROVIDERS.set(
      JaninoRelMetadataProvider.of(cluster.getMetadataProvider()));
  RelNode result = convertQueryRecursive(query, top, null).rel;
  if (top) {
    if (isStream(query)) {
      result = new LogicalDelta(cluster, result.getTraitSet(), result);
    }
  }
  RelCollation collation = RelCollations.EMPTY;
  if (!query.isA(SqlKind.DML)) {
    if (isOrdered(query)) {
      collation = requiredCollation(result);
    }
  }
  checkConvertedType(query, result);

  if (SQL2REL_LOGGER.isDebugEnabled()) {
    SQL2REL_LOGGER.debug(
        RelOptUtil.dumpPlan("Plan after converting SqlNode to RelNode",
            result, SqlExplainFormat.TEXT,
            SqlExplainLevel.EXPPLAN_ATTRIBUTES));
  }

  final RelDataType validatedRowType = validator.getValidatedNodeType(query);
  return RelRoot.of(result, validatedRowType, query.getKind())
      .withCollation(collation);
}
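You rarely need to step through the converter to see its output. A small sketch against the demo above, assuming the 1.9-era API where TableEnvironment exposes explain(Table):

// inspect the plan produced for a query, without executing it
Table filtered = tEnv.sqlQuery("SELECT * FROM OrderB WHERE amount > 2");

// explain() prints the abstract syntax tree (the RelNode tree produced
// by SqlToRelConverter), the optimized plan, and the physical execution plan
System.out.println(tEnv.explain(filtered));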
(2) update
The implementation:
override def sqlUpdate(stmt: String): Unit = {
  sqlUpdate(stmt, this.queryConfig)
}

override def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
  val planner = getFlinkPlanner
  // parse the sql query
  val parsed = planner.parse(stmt)
  parsed match {
    case insert: SqlInsert =>
      // validate the SQL query
      val query = insert.getSource
      val validatedQuery = planner.validate(query)

      // get query result as Table
      val queryResult = new TableImpl(this,
        new PlannerQueryOperation(planner.rel(validatedQuery).rel))

      // get name of sink table
      val targetTablePath = insert.getTargetTable.asInstanceOf[SqlIdentifier].names

      // insert query result into sink table
      insertInto(queryResult, config, targetTablePath.asScala: _*)
    case _ =>
      throw new TableException(
        "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
  }
}

The steps mirror sqlQuery (parse, validate the INSERT's source query, convert it to a relational tree), so they are not repeated here.
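To round out the update path, here is a minimal end-to-end sqlUpdate sketch. It assumes the OrderB table registered in the StreamSQLExample demo above; the sink name and output path are made up for illustration:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;

// ... inside main(), after OrderB has been registered ...

// configure a CSV sink with the same schema as OrderB
TableSink<?> configuredSink = new CsvTableSink("/tmp/order-sink.csv", ",")
    .configure(
        new String[] {"user", "product", "amount"},
        new TypeInformation<?>[] {Types.LONG, Types.STRING, Types.INT});
tEnv.registerTableSink("SinkTable", configuredSink);

// only INSERT is accepted; the source query goes through the same
// parse/validate/convert pipeline as sqlQuery
tEnv.sqlUpdate("INSERT INTO SinkTable SELECT * FROM OrderB WHERE amount > 2");

env.execute();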
3. Summary
The Flink Table API & SQL provide a unified interface for relational queries over both streaming and static data, building on Calcite's SQL parser and query-optimization framework. The design sits on top of Flink's existing APIs: the DataStream API supplies low-latency, high-throughput stream processing with exactly-once semantics and event-time support, while the DataSet API offers stable, efficient in-memory operators and pipelined data exchange. Every improvement to Flink's core APIs and engine is automatically inherited by the Table API and SQL.
Reposted from: https://www.cnblogs.com/davidwang456/p/11205723.html