Building Batch and Streaming Applications with the Flink Table and SQL APIs (1): Basic Table Concepts
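According to the official Flink documentation, Flink's programming model has four layers: SQL is the highest-level API, the Table API is the middle layer, the DataStream/DataSet API is the core, and stateful stream processing is the lowest-level building block.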
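Earlier posts in this series covered the lower layers:
"Flink DataSet API: usage and internals" introduced the DataSet API.
"Flink DataStream API: usage and internals" introduced the DataStream API.
"How to use timestamps in Flink: Watermark usage and internals" introduced Watermarks, the foundation of the low-level implementation.
"Flink window examples" introduced the concept of windows and how they are used.
"State and fault tolerance in Flink" introduced State and the checkpoint and savepoint fault-tolerance mechanisms.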
0. Basic concepts
0.1 TableEnvironment
TableEnvironment is the central concept of the Table API and SQL integration. It is responsible for:
1. Registering a Table in the internal catalog
2. Registering an external catalog
3. Executing SQL queries
4. Registering a user-defined function (UDF)
5. Converting a DataStream or DataSet into a Table
6. Holding a reference to an ExecutionEnvironment or StreamExecutionEnvironment
The Javadoc of the base class reads:
    /**
     * The base class for batch and stream TableEnvironments.
     *
     * <p>The TableEnvironment is a central concept of the Table API and SQL integration. It is
     * responsible for:
     *
     * <ul>
     *     <li>Registering a Table in the internal catalog</li>
     *     <li>Registering an external catalog</li>
     *     <li>Executing SQL queries</li>
     *     <li>Registering a user-defined scalar function. For the user-defined table and aggregate
     *     function, use the StreamTableEnvironment or BatchTableEnvironment</li>
     * </ul>
     */
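0.2 Catalog
Catalog: all metadata about databases and tables is stored in Flink's internal catalog structure. It holds all Table-related metadata inside Flink, including table schemas and data source information.
    /**
     * This interface is responsible for reading and writing metadata such as database/table/views/UDFs
     * from a registered catalog. It connects a registered catalog and Flink's Table API.
     */
Its structure is as follows:
[Figure: structure of the Catalog interface]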
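To make the idea of a catalog as a metadata store more concrete, here is a minimal plain-Java sketch. It is not Flink's Catalog interface: the class ToyCatalog and its methods registerTable and getTable are hypothetical names invented for illustration, and a table's metadata is reduced to a map from column name to type name.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Toy in-memory catalog (hypothetical, NOT Flink's Catalog): table name -> (column -> type). */
final class ToyCatalog {
    private final Map<String, Map<String, String>> tables = new LinkedHashMap<>();

    /** Registers a table schema under a name; duplicate names are rejected. */
    void registerTable(String name, Map<String, String> schema) {
        if (tables.containsKey(name)) {
            throw new IllegalArgumentException("Table already registered: " + name);
        }
        // Copy the schema so later changes by the caller do not leak into the catalog.
        tables.put(name, new LinkedHashMap<>(schema));
    }

    /** Looks up the schema of a registered table; empty if the name is unknown. */
    Optional<Map<String, String>> getTable(String name) {
        return Optional.ofNullable(tables.get(name));
    }

    public static void main(String[] args) {
        ToyCatalog catalog = new ToyCatalog();
        Map<String, String> schema = new LinkedHashMap<>();
        schema.put("word", "VARCHAR");
        schema.put("frequency", "BIGINT");
        catalog.registerTable("WordCount", schema);

        System.out.println(catalog.getTable("WordCount"));
        System.out.println(catalog.getTable("Missing"));
    }
}
```

The point of the sketch is only that queries refer to tables by name and the catalog resolves that name to metadata; a real catalog also tracks databases, views and functions, as the Javadoc above says.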
0.3 TableSource
When using the Table API, an external data source can be registered directly as a Table. Such a structure is called a TableSource.
    /**
     * Defines an external table with the schema that is provided by {@link TableSource#getTableSchema}.
     *
     * <p>The data of a {@link TableSource} is produced as a {@code DataSet} in case of a {@code BatchTableSource}
     * or as a {@code DataStream} in case of a {@code StreamTableSource}. The type of the produced
     * {@code DataSet} or {@code DataStream} is specified by the {@link TableSource#getProducedDataType()} method.
     *
     * <p>By default, the fields of the {@link TableSchema} are implicitly mapped by name to the fields of
     * the produced {@link DataType}. An explicit mapping can be defined by implementing the
     * {@link DefinedFieldMapping} interface.
     *
     * @param <T> The return type of the {@link TableSource}.
     */
0.4 TableSink
Once the data has been processed, the result has to be written to external storage. The Table API has a corresponding sink module for this, the TableSink.
    /**
     * A {@link TableSink} specifies how to emit a table to an external
     * system or location.
     *
     * <p>The interface is generic such that it can support different storage locations and formats.
     *
     * @param <T> The return type of the {@link TableSink}.
     */
0.5 Table Connector
Since Flink 1.6, Flink has offered the Table Connector concept so that the Table API can connect to external systems through configuration and so that the same connections can be used from the SQL Client. Its main purpose is to separate the definition of Table Sources and Table Sinks from their use.
A Table Connector wraps the various built-in Table Sources and Table Sinks into configurable components that can be used from both the Table API and the SQL Client.
    /**
     * Creates a table source and/or table sink from a descriptor.
     *
     * <p>Descriptors allow for declaring the communication to external systems in an
     * implementation-agnostic way. The classpath is scanned for suitable table factories that match
     * the desired configuration.
     *
     * <p>The following example shows how to read from a connector using a JSON format and
     * register a table source as "MyTable":
     *
     * <pre>
     * {@code
     *
     * tableEnv
     *   .connect(
     *     new ExternalSystemXYZ()
     *       .version("0.11"))
     *   .withFormat(
     *     new Json()
     *       .jsonSchema("{...}")
     *       .failOnMissingField(false))
     *   .withSchema(
     *     new Schema()
     *       .field("user-name", "VARCHAR").from("u_name")
     *       .field("count", "DECIMAL"))
     *   .registerSource("MyTable");
     * }
     * </pre>
     *
     * @param connectorDescriptor connector descriptor describing the external system
     */
    TableDescriptor connect(ConnectorDescriptor connectorDescriptor);
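The Javadoc above says that the classpath is scanned for table factories that match the desired configuration. The following plain-Java sketch illustrates that matching idea only. It does not use Flink: ToyFactoryLookup, ToyFactory, matches and find are hypothetical names, and the property keys are made up for the example, on the assumption that a descriptor can be flattened into string key/value pairs.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Toy factory lookup (hypothetical, NOT Flink's factory service). */
final class ToyFactoryLookup {

    /** A factory declares the properties that must be present for it to apply. */
    static final class ToyFactory {
        final String name;
        final Map<String, String> requiredContext;

        ToyFactory(String name, Map<String, String> requiredContext) {
            this.name = name;
            this.requiredContext = requiredContext;
        }

        /** True if every required key/value pair appears in the given properties. */
        boolean matches(Map<String, String> properties) {
            return properties.entrySet().containsAll(requiredContext.entrySet());
        }
    }

    /** Returns the first factory whose required context is satisfied by the properties. */
    static Optional<ToyFactory> find(List<ToyFactory> factories, Map<String, String> properties) {
        return factories.stream().filter(f -> f.matches(properties)).findFirst();
    }

    public static void main(String[] args) {
        Map<String, String> xyzContext = new HashMap<>();
        xyzContext.put("connector.type", "xyz");
        xyzContext.put("connector.version", "0.11");

        Map<String, String> otherContext = new HashMap<>();
        otherContext.put("connector.type", "other");

        List<ToyFactory> factories = Arrays.asList(
                new ToyFactory("other-factory", otherContext),
                new ToyFactory("xyz-factory", xyzContext));

        // Illustrative flattened form of a descriptor: connector settings plus a format.
        Map<String, String> properties = new HashMap<>();
        properties.put("connector.type", "xyz");
        properties.put("connector.version", "0.11");
        properties.put("format.type", "json");

        System.out.println(find(factories, properties).map(f -> f.name).orElse("no match"));
    }
}
```

Because the configuration is plain key/value data, the same table definition can come from Java code or from a configuration file, which is what makes it usable in both the Table API and the SQL Client.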
This post focuses on SQL and the Table API.
1. SQL
1.1 SQL on top of the DataSet API
Example:
    package org.apache.flink.table.examples.java;

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.BatchTableEnvironment;

    /**
     * Simple example that shows how the Batch SQL API is used in Java.
     *
     * <p>This example shows how to:
     *  - Convert DataSets to Tables
     *  - Register a Table under a name
     *  - Run a SQL query on the registered Table
     */
    public class WordCountSQL {

        // *************************************************************************
        //     PROGRAM
        // *************************************************************************

        public static void main(String[] args) throws Exception {

            // set up execution environment
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

            DataSet<WC> input = env.fromElements(
                new WC("Hello", 1),
                new WC("Ciao", 1),
                new WC("Hello", 1));

            // register the DataSet as table "WordCount"
            tEnv.registerDataSet("WordCount", input, "word, frequency");

            // run a SQL query on the Table and retrieve the result as a new Table
            Table table = tEnv.sqlQuery(
                "SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");

            DataSet<WC> result = tEnv.toDataSet(table, WC.class);

            result.print();
        }

        // *************************************************************************
        //     USER DATA TYPES
        // *************************************************************************

        /**
         * Simple POJO containing a word and its respective count.
         */
        public static class WC {
            public String word;
            public long frequency;

            // public constructor to make it a Flink POJO
            public WC() {}

            public WC(String word, long frequency) {
                this.word = word;
                this.frequency = frequency;
            }

            @Override
            public String toString() {
                return "WC " + word + " " + frequency;
            }
        }
    }
Here, BatchTableEnvironment is documented as:
    /**
     * The {@link TableEnvironment} for a Java batch {@link ExecutionEnvironment} that works
     * with {@link DataSet}s.
     *
     * <p>A TableEnvironment can be used to:
     * <ul>
     *     <li>convert a {@link DataSet} to a {@link Table}</li>
     *     <li>register a {@link DataSet} in the {@link TableEnvironment}'s catalog</li>
     *     <li>register a {@link Table} in the {@link TableEnvironment}'s catalog</li>
     *     <li>scan a registered table to obtain a {@link Table}</li>
     *     <li>specify a SQL query on registered tables to obtain a {@link Table}</li>
     *     <li>convert a {@link Table} into a {@link DataSet}</li>
     *     <li>explain the AST and execution plan of a {@link Table}</li>
     * </ul>
     */
BatchTableSource
    /**
     * Defines an external batch table and provides access to its data.
     *
     * @param <T> Type of the {@link DataSet} created by this {@link TableSource}.
     */
BatchTableSink
    /**
     * Defines an external {@link TableSink} to emit a batch {@link Table}.
     *
     * @param <T> Type of {@link DataSet} that this {@link TableSink} expects and supports.
     */
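For readers less familiar with SQL, the query in the WordCountSQL example (SELECT word, SUM(frequency) ... GROUP BY word) computes the same thing as the plain-Java loop below. This is a sketch without Flink, intended only to show what result to expect; the class WordCountPlainJava and the method sumByWord are hypothetical names, and the input is the same three rows as in the example.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java equivalent of the GROUP BY / SUM query (illustration only, no Flink involved). */
final class WordCountPlainJava {

    /** Same shape as the WC POJO in the Flink example. */
    static final class WC {
        final String word;
        final long frequency;

        WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }
    }

    /** SELECT word, SUM(frequency) AS frequency FROM WordCount GROUP BY word */
    static Map<String, Long> sumByWord(List<WC> rows) {
        Map<String, Long> totals = new LinkedHashMap<>();
        for (WC row : rows) {
            // Add this row's frequency to the running total for its word.
            totals.merge(row.word, row.frequency, Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<WC> input = Arrays.asList(
                new WC("Hello", 1),
                new WC("Ciao", 1),
                new WC("Hello", 1));

        // prints {Hello=2, Ciao=1}
        System.out.println(sumByWord(input));
    }
}
```

The Flink program should therefore produce one row for Hello with frequency 2 and one row for Ciao with frequency 1, although the order in which Flink prints them is not guaranteed.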
1.2 SQL on top of the DataStream API
Example code:
    package org.apache.flink.table.examples.java;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;

    import java.util.Arrays;

    /**
     * Simple example for demonstrating the use of SQL on a Stream Table in Java.
     *
     * <p>This example shows how to:
     *  - Convert DataStreams to Tables
     *  - Register a Table under a name
     *  - Run a StreamSQL query on the registered Table
     */
    public class StreamSQLExample {

        // *************************************************************************
        //     PROGRAM
        // *************************************************************************

        public static void main(String[] args) throws Exception {

            // set up execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            DataStream<Order> orderA = env.fromCollection(Arrays.asList(
                new Order(1L, "beer", 3),
                new Order(1L, "diaper", 4),
                new Order(3L, "rubber", 2)));

            DataStream<Order> orderB = env.fromCollection(Arrays.asList(
                new Order(2L, "pen", 3),
                new Order(2L, "rubber", 3),
                new Order(4L, "beer", 1)));

            // convert DataStream to Table
            Table tableA = tEnv.fromDataStream(orderA, "user, product, amount");
            // register DataStream as Table
            tEnv.registerDataStream("OrderB", orderB, "user, product, amount");

            // union the two tables
            Table result = tEnv.sqlQuery("SELECT * FROM " + tableA + " WHERE amount > 2 UNION ALL " +
                "SELECT * FROM OrderB WHERE amount < 2");

            tEnv.toAppendStream(result, Order.class).print();

            env.execute();
        }

        // *************************************************************************
        //     USER DATA TYPES
        // *************************************************************************

        /**
         * Simple POJO.
         */
        public static class Order {
            public Long user;
            public String product;
            public int amount;

            public Order() {}

            public Order(Long user, String product, int amount) {
                this.user = user;
                this.product = product;
                this.amount = amount;
            }

            @Override
            public String toString() {
                return "Order{" +
                    "user=" + user +
                    ", product='" + product + '\'' +
                    ", amount=" + amount +
                    '}';
            }
        }
    }
Here, StreamTableEnvironment is documented as:
    /**
     * The {@link TableEnvironment} for a Java {@link StreamExecutionEnvironment} that works with
     * {@link DataStream}s.
     *
     * <p>A TableEnvironment can be used to:
     * <ul>
     *     <li>convert a {@link DataStream} to a {@link Table}</li>
     *     <li>register a {@link DataStream} in the {@link TableEnvironment}'s catalog</li>
     *     <li>register a {@link Table} in the {@link TableEnvironment}'s catalog</li>
     *     <li>scan a registered table to obtain a {@link Table}</li>
     *     <li>specify a SQL query on registered tables to obtain a {@link Table}</li>
     *     <li>convert a {@link Table} into a {@link DataStream}</li>
     *     <li>explain the AST and execution plan of a {@link Table}</li>
     * </ul>
     */
StreamTableSource
    /**
     * Defines an external stream table and provides read access to its data.
     *
     * @param <T> Type of the {@link DataStream} created by this {@link TableSource}.
     */
StreamTableSink
    /**
     * Defines an external stream table and provides write access to its data.
     *
     * @param <T> Type of the {@link DataStream} created by this {@link TableSink}.
     */
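As a cross-check of what the UNION ALL query in StreamSQLExample selects, here is a plain-Java sketch over the same six orders. It does not use Flink; the class UnionAllPlainJava and the method query are hypothetical names, and the two lists stand in for the two streams.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java equivalent of the UNION ALL query (illustration only, no Flink involved). */
final class UnionAllPlainJava {

    /** Same fields as the Order POJO in the Flink example. */
    static final class Order {
        final long user;
        final String product;
        final int amount;

        Order(long user, String product, int amount) {
            this.user = user;
            this.product = product;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return "Order{user=" + user + ", product='" + product + "', amount=" + amount + "}";
        }
    }

    /** SELECT * FROM a WHERE amount > 2 UNION ALL SELECT * FROM b WHERE amount < 2 */
    static List<Order> query(List<Order> a, List<Order> b) {
        List<Order> result = new ArrayList<>();
        for (Order o : a) {
            if (o.amount > 2) {
                result.add(o);
            }
        }
        for (Order o : b) {
            if (o.amount < 2) {
                result.add(o);
            }
        }
        // UNION ALL keeps duplicates, so nothing is de-duplicated here.
        return result;
    }

    public static void main(String[] args) {
        List<Order> orderA = Arrays.asList(
                new Order(1L, "beer", 3), new Order(1L, "diaper", 4), new Order(3L, "rubber", 2));
        List<Order> orderB = Arrays.asList(
                new Order(2L, "pen", 3), new Order(2L, "rubber", 3), new Order(4L, "beer", 1));

        for (Order o : query(orderA, orderB)) {
            System.out.println(o);
        }
    }
}
```

Three orders pass the filters: beer (3) and diaper (4) from orderA, and beer (1) from orderB. In the streaming job the rows may arrive in a different order, since the two inputs are processed independently.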
2. Table API
Example:
    package org.apache.flink.table.examples.java;

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.BatchTableEnvironment;

    /**
     * Simple example for demonstrating the use of the Table API for a Word Count in Java.
     *
     * <p>This example shows how to:
     *  - Convert DataSets to Tables
     *  - Apply group, aggregate, select, and filter operations
     */
    public class WordCountTable {

        // *************************************************************************
        //     PROGRAM
        // *************************************************************************

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
            BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

            DataSet<WC> input = env.fromElements(
                new WC("Hello", 1),
                new WC("Ciao", 1),
                new WC("Hello", 1));

            Table table = tEnv.fromDataSet(input);

            Table filtered = table
                .groupBy("word")
                .select("word, frequency.sum as frequency")
                .filter("frequency = 2");

            DataSet<WC> result = tEnv.toDataSet(filtered, WC.class);

            result.print();
        }

        // *************************************************************************
        //     USER DATA TYPES
        // *************************************************************************

        /**
         * Simple POJO containing a word and its respective count.
         */
        public static class WC {
            public String word;
            public long frequency;

            // public constructor to make it a Flink POJO
            public WC() {}

            public WC(String word, long frequency) {
                this.word = word;
                this.frequency = frequency;
            }

            @Override
            public String toString() {
                return "WC " + word + " " + frequency;
            }
        }
    }
3. Data conversion
3.1 Converting between DataSet and Table
DataSet --> Table
By registration:
    // register the DataSet as table "WordCount"
    tEnv.registerDataSet("WordCount", input, "word, frequency");
By conversion:
    Table table = tEnv.fromDataSet(input);
Table --> DataSet
    DataSet<WC> result = tEnv.toDataSet(filtered, WC.class);
3.2 Converting between DataStream and Table
DataStream --> Table
By registration:
    tEnv.registerDataStream("OrderB", orderB, "user, product, amount");
By conversion:
    Table tableA = tEnv.fromDataStream(orderA, "user, product, amount");
Table --> DataStream
    DataStream<Order> stream = tEnv.toAppendStream(result, Order.class);
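References
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/concepts/programming-model.html
[2] Flink原理、實戰與性能優化 (book; the title translates roughly as "Flink: Principles, Practice, and Performance Optimization")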
Reposted from: https://www.cnblogs.com/davidwang456/p/11161621.html