Flink的Table API 与SQL介绍及调用
1 概述
? ?DataSetAPI和DateStreamAPI是基于整個Flink的運行時環境做操作處理的,Table API和SQL是在DateStreamAPI上又包了一層。對于新版本的Blink在DateStream基礎上又包了一層實現了批流統一,上層執行環境都是基于流處理,做批流統一的查詢。Table API是流處理和批處理通用的關系型API,與常規SQL語言中將查詢指定為字符串不同,Table API查詢是以Java或Scala中的語言嵌入樣式來定義。 是一套內嵌在 Java 和 Scala 語言中的查詢API,它允許以非常直觀的方式組合來自一些關系運算符的查詢。
??從Flink 1.9開始,Flink為Table 和SQL API程序提供了兩種不同的planner :Blink planner 和the old planner。Blink planner 和the old planner的區別如下:
??(1)批流統一:Blink將批處理作業,視為流式處理的特殊情況。所以,blink不支持表和DataSet之間的轉換,批處理作業將不轉換為DataSet應用程序,而是跟流處理一樣,轉換為DataStream程序來處理。
??(2)因為批流統一,Blink planner也不支持BatchTableSource,而使用有界的StreamTableSource代替
??(3)Blink planner只支持全新的目錄,不支持已棄用的ExternalCatalog
??(4)old planner和Blink planner的FilterableTableSource實現不兼容。舊的planner會把PlannerExpressions下推到filterableTableSource中,而blink planner則會把Expressions下推
??(5)基于字符串的鍵值配置選項僅適用于Blink planner
??(6)PlannerConfig在兩個planner中的實現不同
??(7) Blink planner會將多個sink優化在一個DAG中(僅在TableEnvironment上受支持,而在StreamTableEnvironment上不受支持)。而舊planner的優化總是將每一個sink放在一個新的DAG中,其中所有DAG彼此獨立
??(8)舊的planner不支持目錄統計,而Blink planner支持
??需要添加的依賴
??根據目標編程語言的不同,您需要將Java或ScalaAPI添加到項目中,以便使用TableAPI&SQL來定義管道:
<!-- Either... --> <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>1.10.0</version><scope>provided</scope> </dependency> <!-- or... --> <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.11</artifactId><version>1.10.0</version><scope>provided</scope> </dependency>??您想在IDE中本地運行TableAPI&SQL程序,則必須添加以下一組模塊,具體取決于使用的planner:
<!-- Either... (for the old planner that was available before Flink 1.9) --> <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>1.10.0</version><scope>provided</scope> </dependency> <!-- or.. (for the new Blink planner) --> <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.11</artifactId><version>1.10.0</version><scope>provided</scope> </dependency>???在內部,表生態系統的一部分是在Scala中實現的。因此確保為批處理和流應用程序添加以下依賴項:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>1.10.0</version><scope>provided</scope> </dependency>2 Table API 和 SQL 的程序結構
??所有用于批處理和流處理的Table API和SQL程序都遵循相同的結構,與流式處理的程序結構類似
//1 創建表的執行環境 // create a TableEnvironment for specific planner batch or streaming val tableEnv = ... // see "Create a TableEnvironment" section//2 創建表,讀取數據 tableEnv.connect(...).createTemporaryTable("table1")//3 注冊表,用于輸出計算結果 tableEnv.connect(...).createTemporaryTable("outputTable")//4.1 通過Table API查詢得到結果表 val tapiResult = tableEnv.from("table1").select(...)//4.2 通過SQL查詢得到結果表 val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table1 ...")//5 將結果表寫入到輸出表 tapiResult.insertInto("outputTable")// execute tableEnv.execute("scala_job")3 創建TableEnvironment
??創建表的執行環境,需要將 flink 流處理的執行環境傳入
val tableEnv = StreamTableEnvironment.create(env)??TableEnvironment 是 flink 中集成 Table API 和 SQL 的核心概念,所有對表的操作都基于 TableEnvironment :注冊 Catalog(可以認為是對表的管理的結構);在 Catalog 中注冊表;執行 SQL 查詢;注冊用戶自定義函數(UDF);轉換DataStream或DataSet變成Table;保存對 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用
??在老版本Table總是綁定到特定的TableEnvironment。不可能在同一個查詢中組合不同TableEnvironment的表,例如加入或合并它們。
??TableEnvironment創建通過調用靜態的BatchTableEnvironment.create()或StreamTableEnvironment.create()方法的StreamExecutionEnvironment或者ExecutionEnvironment還有一個可選的TableConfig。這個TableConfig可用于配置TableEnvironment或自定義查詢優化和轉換過程
??如果兩個planner JAR都位于類路徑(默認行為)上,則應顯式設置在當前程序中使用的planner。
// ********************** // FLINK STREAMING QUERY 配置老版本 planner 的流式查詢 // ********************** import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.scala.StreamTableEnvironmentval fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build() val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment val fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings) // or val fsTableEnv = TableEnvironment.create(fsSettings)// ****************** // FLINK BATCH QUERY 配置老版本 planner 的批式查詢 // ****************** import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.table.api.scala.BatchTableEnvironmentval fbEnv = ExecutionEnvironment.getExecutionEnvironment val fbTableEnv = BatchTableEnvironment.create(fbEnv)// ********************** // BLINK STREAMING QUERY 配置 blink planner 的流式查詢 // ********************** import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.scala.StreamTableEnvironmentval bsEnv = StreamExecutionEnvironment.getExecutionEnvironment val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings) // or val bsTableEnv = TableEnvironment.create(bsSettings)// ****************** // BLINK BATCH QUERY 配置 blink planner 的批式查詢 // ****************** import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build() val bbTableEnv = TableEnvironment.create(bbSettings)??注意:老版本可以在代碼里面定義不同的環境,老版本處理過程中可以有批處理和流處理環境。一張批處理環境,一張流處理環境是不能做join查詢的,所有針對表的操作,必須是基于同一個執行環境的。
??如果只有一個Planner/lib目錄,可以使用useAnyPlanner (use_any_planner(用于python)創建特定的EnvironmentSettings.
4 在Catalog中創建表
??一個TableEnvironment維護使用標識符創建的表的Catalog的map,也就是TableEnvironment 可以注冊目錄 Catalog,并可以基于 Catalog 注冊表。表(Table)是由一個“標識符”(identifier)來指定的,由3部分組成:Catalog名、數據庫(database)名和對象名
??表可以是常規的,也可以是虛擬的視圖,View。常規表(Table):一般可以用來描述外部數據,比如文件、數據庫表或消息隊列的數據,也可以直接從 DataStream轉換而來。視圖(View):可以從現有的表中創建,通常是 table API 或者 SQL 查詢的一個結果集
??表可以是臨時的,并且與單個Flink會話的生命周期相關聯,或者是永久的,并且跨多個Flink會話和集群可見。永久表:需要Catalog(如Hive Metastore)來維護有關表的元數據。一旦創建了永久表,它對連接到目錄的任何Flink會話都是可見的,并且將繼續存在,直到該表顯式刪除為止。臨時表:總是存儲在內存中,并且僅在Flink會話期間才存在。這些表在其他會話中不可見。它們不綁定到任何目錄或數據庫,但可以在其中的名稱空間中創建。如果刪除臨時表的相應數據庫,則不會刪除它們。
4.1 創建表
4.1.1 Virtual Tables
??Table API對應于虛擬的表,它封裝了一個邏輯查詢計劃。它可以在Catalog中創建,如下所示:
// get a TableEnvironment val tableEnv = ... // see "Create a TableEnvironment" section// table is the result of a simple projection query val projTable: Table = tableEnv.from("X").select(...)// register the Table projTable as table "projectedTable" tableEnv.createTemporaryView("projectedTable", projTable)??注意:Table對象類似于關系數據庫系統的VIEW,即定義Table未優化,當另一個查詢引用已注冊的Table查詢時,將內聯。如果多個查詢引用同一個已注冊的Table,它將為每個引用查詢內聯并多次執行,即注冊的結果Table不被分享。
4.1.2 Connector Tables
??可以創建一個TABLE從關系數據庫中所知道的連接器申報。連接器描述存儲表數據的外部系統,可以在這里聲明諸如Kafka之類的存儲系統或常規文件系統。其實就是TableEnvironment 可以調用 .connect() 方法,連接外部系統,并調用 .createTemporaryTable() 方法,在 Catalog 中注冊表
tableEnvironment.connect(...) // 定義表的數據來源,與外部系統建立連接.withFormat(...) // 定義數據格式化方法.withSchema(...) // 定義表結構.inAppendMode().createTemporaryTable("MyTable") // 創建臨時表??創建 Table 從文件中讀取如下:
tableEnv.connect(new FileSystem().path(“YOUR_FILE_PATH”)) // 定義到文件系統的連接.withFormat(new Csv()) // 定義以csv格式進行數據格式化.withSchema(new Schema() // 定義表結構.field("id", DataTypes.STRING()).field("timestamp", DataTypes.BIGINT()).field("temperature", DataTypes.DOUBLE())) .createTemporaryTable("sensorTable") // 創建臨時表4.2 表的標識符identifier
??Flink中表由一個identifier指定,identifier由Catalog名、數據庫(database)名和對象名(表名)組成。用戶可以將其中的一Catalog和一個database設置為“當前目錄”和“當前數據庫”。使用它們,上面提到的3部分標識符中的前兩部分可以是可選的,如果不提供它們,則將引用當前目錄和當前數據庫。用戶可以通過表API或SQL切換當前目錄和當前數據庫。
??標識符遵循SQL要求,這意味著可以使用回勾字符(`)。此外,必須轉義所有SQL保留關鍵字。
Scala // get a TableEnvironment val tEnv: TableEnvironment = ...; tEnv.useCatalog("custom_catalog") tEnv.useDatabase("custom_database")val table: Table = ...;// register the view named 'exampleView' in the catalog named 'custom_catalog' // in the database named 'custom_database' tableEnv.createTemporaryView("exampleView", table)// register the view named 'exampleView' in the catalog named 'custom_catalog' // in the database named 'other_database' tableEnv.createTemporaryView("other_database.exampleView", table)// register the view named 'View' in the catalog named 'custom_catalog' in the // database named 'custom_database'. 'View' is a reserved keyword and must be escaped. tableEnv.createTemporaryView("`View`", table)// register the view named 'example.View' in the catalog named 'custom_catalog' // in the database named 'custom_database' tableEnv.createTemporaryView("`example.View`", table)// register the view named 'exampleView' in the catalog named 'other_catalog' // in the database named 'other_database' tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table)5 表的查詢
5.1 Table API
??Table API 是集成在 Scala 和 Java 語言內的查詢 API。
??Table API 基于代表“表”的 Table 類,并提供一整套操作處理的方法 API;這些方法會返回一個新的 Table 對象,表示對輸入表應用轉換操作的結果,有些關系型轉換操作,可以由多個方法調用組成,構成鏈式調用結構
??Table API 文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/tableApi.html
// get a TableEnvironment val tableEnv = ... // see "Create a TableEnvironment" section// register Orders table// scan registered Orders table val orders = tableEnv.from("Orders") // compute revenue for all customers from France val revenue = orders.filter('cCountry === "FRANCE").groupBy('cID, 'cName).select('cID, 'cName, 'revenue.sum AS 'revSum)// emit or convert Table // execute query注意:Scala Table API使用Scala符號,前面加了一個單引號’,這是Table API中定義的Expression類型的寫法,可以很方便地表示一個表中的字段。字段可以直接全部用雙引號引起來,也可以用半邊單引號+字段名的方式。Table API使用Scala實現。確保導入org.apache.flink.api.scala._和org.apache.flink.table.api.scala._為了使用Scala隱式轉換。
5.2 SQL
??Flink 的 SQL 集成,基于實現 了SQL 標準的 Apache Calcite,在 Flink 中,用常規字符串來定義 SQL 查詢語句,SQL 查詢的結果,也是一個新的 Table。官方Flink對流表和批處理表的SQL支持地址:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/index.html
??指定查詢并將結果作為Table如下:
// get a TableEnvironment val tableEnv = ... // see "Create a TableEnvironment" section// register Orders table// compute revenue for all customers from France val revenue = tableEnv.sqlQuery("""|SELECT cID, cName, SUM(revenue) AS revSum|FROM Orders|WHERE cCountry = 'FRANCE'|GROUP BY cID, cName""".stripMargin)// emit or convert Table // execute query??指定將其結果插入到已注冊表中的更新查詢如下:
// get a TableEnvironment val tableEnv = ... // see "Create a TableEnvironment" section// register "Orders" table // register "RevenueFrance" output table// compute revenue for all customers from France and emit to "RevenueFrance" tableEnv.sqlUpdate("""|INSERT INTO RevenueFrance|SELECT cID, cName, SUM(revenue) AS revSum|FROM Orders|WHERE cCountry = 'FRANCE'|GROUP BY cID, cName""".stripMargin)// execute query??Table API和SQL查詢可以很容易地混合在一起使用,因為兩者都返回Table對象:Table API查詢可以基于SQL查詢返回的Table對象;可以根據Table API查詢的結果定義SQL查詢注冊結果表在TableEnvironment并在FROM子句的SQL查詢。
6 輸出表
6.1 表的輸出
??表的輸出,是通過將數據寫入 TableSink 來實現的,TableSink 是一個通用接口,可以支持不同的文件格式(如CSV、Apache Parquet、Apache Avro)、存儲數據庫(如如JDBC、Apache HBASE、Apache Cassandra、Elasticsearch)和消息隊列(如如Apache Kafka、RabbitMQ)。
??一個batch Table只能寫入BatchTableSink,而Streaming Table需要一個AppendStreamTableSink或RetractStreamTableSink或UpsertStreamTableSink。
??輸出表最直接的方法,就是通過 Table.insertInto() 方法將一個 Table 寫入注冊過的 TableSink 中
// get a TableEnvironment val tableEnv = ... // see "Create a TableEnvironment" section// create an output Table val schema = new Schema().field("a", DataTypes.INT()).field("b", DataTypes.STRING()).field("c", DataTypes.LONG())tableEnv.connect(new FileSystem("/path/to/file")).withFormat(new Csv().fieldDelimiter('|').deriveSchema()).withSchema(schema).createTemporaryTable("CsvSinkTable")// compute a result Table using Table API operators and/or SQL queries val result: Table = ...// emit the result Table to the registered TableSink result.insertInto("CsvSinkTable")// execute the program6.2 更新模式
??對于流式查詢,需要聲明如何在表和外部連接器之間執行轉換,與外部系統交換的消息類型,由更新模式(Update Mode)指定。有追加(Append),撤回(Retract),更新插入(Upsert)三種模式
??追加(Append)模式:表只做插入操作,和外部連接器只交換插入(Insert)消息。以前發出的結果永遠不會更新,如果更新或刪除操作使用追加模式會失敗報錯。
??撤回(Retract)模式:表和外部連接器交換添加(Add)和撤回(Retract)消息,插入操作(Insert)編碼為 Add 消息;刪除(Delete)編碼為 Retract 消息;更新(Update)編碼為上一條的 Retract 和下一條的 Add 消息。返回值是boolean類型。它用true或false來標記數據的插入和撤回,返回true代表數據插入,false代表數據的撤回。
??更新插入(Upsert)模式:更新和插入都被編碼為 Upsert 消息;刪除編碼為 Delete 消息
??撤回(Retract)和更新插入(Upsert)的區別:
??輸出到文件如下:Retract不能定義key,這一點跟upsert模式完全不同;Update操作需要一個唯一的key,通過這個key可以傳遞更新消息。為了正確應用消息,外部連接器需要知道這個唯一key的屬性,是用單個消息編碼的,所以效率會更高。
6.3 輸出到文件
tableEnv.connect(new FileSystem().path("/path/to/file") // 定義到文件系統的連接.withFormat(new Csv()).withSchema(new Schema().field("id", DataTypes.STRING()).field("temp", DataTypes.Double())) .createTemporaryTable("outputTable") // 創建臨時表resultTable.insertInto("outputTable") // 輸出表6.4 輸出到Kafka
??可以創建 Table 來描述 kafka 中的數據,作為輸入或輸出的 TableSink
tableEnv.connect(new Kafka().version("0.11").topic("sinkTest").property("zookeeper.connect", "localhost:2181").property("bootstrap.servers", "localhost:9092") ).withFormat( new Csv() ).withSchema( new Schema().field("id", DataTypes.STRING()).field("temp", DataTypes.DOUBLE())).createTemporaryTable("kafkaOutputTable")resultTable.insertInto("kafkaOutputTable")6.5 輸出到ElasticSearch
??ElasticSearch的connector可以在upsert(update+insert,更新插入)模式下操作,這樣就可以使用Query定義的鍵(key)與外部系統交換UPSERT/DELETE消息。對于“僅追加”(append-only)的查詢,connector還可以在append 模式下操作,這樣就可以與外部系統只交換insert消息。es目前支持的數據格式,只有Json,而flink本身并沒有對應的支持,所以還需要引入依賴:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.10.0</version> </dependency> tableEnv.connect(new Elasticsearch().version("6").host("localhost", 9200, "http").index("test").documentType("temp") ).inUpsertMode() // 指定是 Upsert 模式.withFormat(new Json()).withSchema( new Schema().field("id", DataTypes.STRING()).field("count", DataTypes.BIGINT())).createTemporaryTable("esOutputTable")aggResultTable.insertInto("esOutputTable")6.6 輸出到MySql
??Flink專門為Table API的jdbc連接提供了flink-jdbc連接器,我們需要先引入依賴:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_2.11</artifactId><version>1.10.0</version> </dependency>??jdbc連接的代碼實現比較特殊,因為沒有對應的java/scala類實現ConnectorDescriptor,所以不能直接tableEnv.connect()。不過Flink SQL留下了執行DDL的接口:tableEnv.sqlUpdate()
val sinkDDL: String ="""|create table jdbcOutputTable (| id varchar(20) not null,| cnt bigint not null|) with (| 'connector.type' = 'jdbc',| 'connector.url' = 'jdbc:mysql://localhost:3306/test',| 'connector.table' = 'sensor_count',| 'connector.driver' = 'com.mysql.jdbc.Driver',| 'connector.username' = 'root',| 'connector.password' = '123456'|)""".stripMargintableEnv.sqlUpdate(sinkDDL) aggResultSqlTable.insertInto("jdbcOutputTable")7 Query的解釋和執行
??Table API提供了一種機制來解釋(Explain)計算表的邏輯和優化查詢計劃。這是通過TableEnvironment.explain(table)方法或TableEnvironment.explain()方法完成的。
??explain方法會返回一個字符串,描述三個計劃:①未優化的邏輯查詢計劃②優化后的邏輯查詢計劃③實際執行計劃
??查看執行計劃如下
val explaination: String = tableEnv.explain(resultTable) println(explaination)??Query的解釋和執行過程,老planner和blink planner是不一樣的。整體來講,Query都會表示成一個邏輯查詢計劃,然后分兩步解釋:①優化查詢計劃②解釋成 DataStream 或者 DataSet程序
??而Blink版本是批流統一的,所以所有的Query,只會被解釋成DataStream程序;另外在批處理環境TableEnvironment下,Blink版本要到tableEnv.execute()執行調用才開始解釋。
8 Table與DataStream,DataSet的集成
??兩種planners都可以與DataStream API集成,只有老的planner才能與DataSet API集成。Scala Table API提供了DataSet、DataStream和Table的隱式轉換,通過導入org.apache.flink.table.api.scala._
8.1 從DataStream或DataSet創建視圖
??DataStream或DataSet可以在TableEnvironment作為一個視圖,只能注冊為臨時視圖
// get TableEnvironment // registration of a DataSet is equivalent val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" sectionval stream: DataStream[(Long, String)] = ...// register the DataStream as View "myTable" with fields "f0", "f1" tableEnv.createTemporaryView("myTable", stream)// register the DataStream as View "myTable2" with fields "myLong", "myString" tableEnv.createTemporaryView("myTable2", stream, 'myLong, 'myString)8.2 將DataStream或DataSet轉換成表
??DataStream或DataSet可以直接轉換為Table,而不是注冊在TableEnvironment。
// get TableEnvironment // registration of a DataSet is equivalent val tableEnv = ... // see "Create a TableEnvironment" sectionval stream: DataStream[(Long, String)] = ...// convert the DataStream into a Table with default fields '_1, '_2 val table1: Table = tableEnv.fromDataStream(stream)// convert the DataStream into a Table with fields 'myLong, 'myString val table2: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)??可以基于一個DataStream,先流式地讀取數據源,然后map成樣例類,再把它轉成Table。Table的列字段(column fields),就是樣例類里的字段,這樣就不用再定義schema。
??代碼中實現非常簡單,直接用tableEnv.fromDataStream()就可以了。默認轉換后的 Table schema 和 DataStream 中的字段定義一一對應,也可以單獨指定出來。
??這就允許我們更換字段的順序、重命名,或者只選取某些字段出來,相當于做了一次map操作(或者Table API的 select操作)
case class WC(id: String, timestamp: Long, count: Double)val inputStream: DataStream[String] = env.readTextFile("/path/to/file") val dataStream: DataStream[WC] = inputStream.map(data => {val dataArray = data.split(",")SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)})val wcTable: Table = tableEnv.fromDataStream(dataStream)??數據類型與 Table schema的對應:上面DataStream 中的數據類型,與表的 Schema 之間的對應關系,是按照樣例類中的字段名來對應的(name-based mapping),所以還可以用as做重命名。另外一種對應方式是,直接按照字段的位置來對應(position-based mapping),對應的過程中,就可以直接指定新的字段名了。
??基于名稱的對應:
val wcTable = tableEnv.fromDataStream(dataStream, 'timestamp as 'ts, 'id as 'myId, 'temperature)??基于位置的對應:
val wcTable = tableEnv.fromDataStream(dataStream, 'myId, 'ts)??Flink的DataStream和 DataSet API支持多種類型。組合類型,比如元組(內置Scala和Java元組)、POJO、Scala case類和Flink的Row類型等,允許具有多個字段的嵌套數據結構,這些字段可以在Table的表達式中訪問。其他類型,則被視為原子類型。
??元組類型和原子類型,一般用位置對應會好一些;用名稱對應的話:元組類型,默認的名稱是 “_1”, “_2”;而原子類型,默認名稱是 ”f0”。
8.3 將表轉換成DataStream
??表可以轉換為DataStream或DataSet。這樣,自定義流處理或批處理程序就可以繼續在 Table API或SQL查詢的結果上運行了。
??將表轉換為DataStream或DataSet時,需要指定生成的數據類型,即要將表的每一行轉換成的數據類型。通常,最方便的轉換類型就是Row。當然,因為結果的所有字段類型都是明確的,我們也經常會用元組類型來表示。
??表作為流式查詢的結果,是動態更新的。所以,將這種動態查詢轉換成的數據流,同樣需要對表的更新操作進行編碼,進而有不同的轉換模式。Table API中表到DataStream有兩種模式:①追加模式(Append Mode):用于表只會被插入(Insert)操作更改的場景②撤回模式(Retract Mode):用于任何場景。有些類似于更新模式中Retract模式,它只有Insert和Delete兩類操作。得到的數據會增加一個Boolean類型的標識位(返回的第一個字段),用它來表示到底是新增的數據(Insert),還是被刪除的數據(老數據, Delete)。
// get TableEnvironment. // registration of a DataSet is equivalent val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section// Table with two fields (String name, Integer age) val table: Table = ...// convert the Table into an append DataStream of Row val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)// convert the Table into an append DataStream of Tuple2[String, Int] val dsTuple: DataStream[(String, Int)] dsTuple = tableEnv.toAppendStream[(String, Int)](table)// convert the Table into a retract DataStream of Row. // A retract stream of type X is a DataStream[(Boolean, X)]. // The boolean field indicates the type of the change. // True is INSERT, false is DELETE. val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)??注意:一般沒有經過groupby之類聚合操作,可以直接用 toAppendStream 來轉換;而如果經過了聚合,有更新操作,一般就必須用 toRetractDstream
總結
以上是生活随笔為你收集整理的Flink的Table API 与SQL介绍及调用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 非监督学习
- 下一篇: Codeforces Round #70