揭秘 Flink 1.9 新架构,Blink Planner 你会用了吗?
本文為 Apache Flink 新版本重大功能特性解讀之 Flink SQL 系列文章的開篇,Flink SQL 系列文章由其核心貢獻者們分享,涵蓋基礎知識、實踐、調優、內部實現等各個方面,帶你由淺入深地全面了解 Flink SQL。
1. 發展歷程
今年的8月22日 Apache Flink 發布了1.9.0 版本(下文簡稱1.9),在 Flink 1.9 中,Table 模塊迎來了核心架構的升級,引入了阿里巴巴Blink團隊貢獻的諸多功能,本文對Table 模塊的架構進行梳理并介紹如何使用 Blink Planner。
Flink 的 Table 模塊 包括 Table API 和 SQL,Table API 是一種類SQL的API,通過Table API,用戶可以像操作表一樣操作數據,非常直觀和方便;SQL作為一種聲明式語言,有著標準的語法和規范,用戶可以不用關心底層實現即可進行數據的處理,非常易于上手,Flink Table API 和 SQL 的實現上有80%左右的代碼是公用的。作為一個流批統一的計算引擎,Flink 的 Runtime 層是統一的,但在 Flink 1.9 之前,Flink API 層 一直分為DataStream API 和 DataSet API, Table API & SQL 位于 DataStream API 和 DataSet API 之上。
Flink 1.8 Table 架構
在 Flink 1.8 架構里,如果用戶需要同時流計算、批處理的場景下,用戶需要維護兩套業務代碼,開發人員也要維護兩套技術棧,非常不方便。 Flink 社區很早就設想過將批數據看作一個有界流數據,將批處理看作流計算的一個特例,從而實現流批統一,阿里巴巴的 Blink 團隊在這方面做了大量的工作,已經實現了 Table API & SQL 層的流批統一。 幸運的是,阿里巴巴已經將 Blink 開源回饋給 Flink 社區。為了實現 Flink 整個體系的流批統一,在結合 Blink 團隊的一些先行經驗的基礎上,Flink 社區的開發人員在多輪討論后,基本敲定了Flink 未來的技術架構。
Flink 未來架構
在Flink 的未來架構中,DataSet API將被廢除,面向用戶的API只有 DataStream API 和 Table API & SQL,在實現層,這兩個API共享相同的技術棧,使用統一的 DAG 數據結構來描述作業,使用統一的 StreamOperator 來編寫算子邏輯,以及使用統一的流式分布式執行引擎,實現徹底的流批統一。 這兩個API都提供流計算和批處理的功能,DataStream API 提供了更底層和更靈活的編程接口,用戶可以自行描述和編排算子,引擎不會做過多的干涉和優化;Table API & SQL 則提供了直觀的Table API、標準的SQL支持,引擎會根據用戶的意圖來進行優化,并選擇最優的執行計劃。
2.Flink 1.9 Table 架構
Blink 的 Table 模塊的架構在開源時就已經實現了流批統一,向著 Flink 的未來架構邁進了第一步,走在了 Flink 社區前面。 因此在 Flink 1.9 合入 Blink Table 代碼時,為了保證 Flink Table 已有架構和 Blink Table的架構能夠并存并朝著 Flink 未來架構演進,社區的開發人員圍繞FLIP-32(FLIP 即 Flink Improvement Proposals,專門記錄一些對Flink 做較大修改的提議。FLIP-32是:Restructure flink-table for future contributions) 進行了重構和優化,從而使得 Flink Table 的新架構具備了流批統一的能力,可以說 Flink 1.9 是 Flink 向著流批徹底統一這個未來架構邁出的第一步。
Flink 1.9 Table 架構
在 Flink Table 的新架構中,有兩個查詢處理器:Flink Query Processor 和 Blink Query Processor,分別對應兩個Planner,我們稱之為 Old Planner 和 Blink Planner。查詢處理器是 Planner 的具體實現, 通過parser(解析器)、optimizer(優化器)、codegen(代碼生成技術)等流程將 Table API & SQL作業轉換成 Flink Runtime 可識別的 Transformation DAG (由Transformation組成的有向無環圖,表示作業的轉換邏輯),最終由 Flink Runtime 進行作業的調度和執行。
Flink 的查詢處理器針對流計算和批處理作業有不同的分支處理,流計算作業底層的 API 是 DataStream API, 批處理作業底層的 API 是 DataSet API;而 Blink 的查詢處理器則實現流批作業接口的統一,底層的 API 都是Transformation。
3.Flink Planner 與 Blink Planner
Flink Table 的新架構實現了查詢處理器的插件化,社區完整保留原有 Flink Planner (Old Planner),同時又引入了新的 Blink Planner,用戶可以自行選擇使用 Old Planner 還是 Blink Planner。
在模型上,Old Planner 沒有考慮流計算作業和批處理作業的統一,針對流計算作業和批處理作業的實現不盡相同,在底層會分別翻譯到 DataStream API 和 DataSet API 上。而 Blink Planner 將批數據集看作 bounded DataStream (有界流式數據) ,流計算作業和批處理作業最終都會翻譯到 Transformation API 上。 在架構上,Blink Planner 針對批處理和流計算,分別實現了BatchPlanner 和 StreamPlanner ,兩者共用了大部分代碼,共享了很多優化邏輯。 Old Planner 針對批處理和流計算的代碼實現的是完全獨立的兩套體系,基本沒有實現代碼和優化邏輯復用。
除了模型和架構上的優點外,Blink Planner 在阿里巴巴集團內部的海量業務場景下沉淀了許多實用功能,集中在三個方面:
- Blink Planner 對代碼生成機制做了改進、對部分算子進行了優化,提供了豐富實用的新功能,如維表 join、Top N、MiniBatch、流式去重、聚合場景的數據傾斜優化等新功能。
- Blink Planner 的優化策略是基于公共子圖的優化算法,包含了基于成本的優化(CBO)和基于規則的優化(CRO)兩種策略,優化更為全面。同時,Blink Planner 支持從 catalog 中獲取數據源的統計信息,這對CBO優化非常重要。
- Blink Planner 提供了更多的內置函數,更標準的 SQL 支持,在 Flink 1.9 版本中已經完整支持 TPC-H ,對高階的 TPC-DS 支持也計劃在下一個版本實現。
整體看來,Blink 查詢處理器在架構上更為先進,功能上也更為完善。出于穩定性的考慮,Flink 1.9 默認依然使用 Flink Planner,用戶如果需要使用 Blink Planner,可以作業中顯式指定。
4.如何啟用 Blink Planner
在IDE環境里,只需要引入兩個 Blink Planner 的相關依賴,就可以啟用 Blink Planner。
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.11</artifactId> <version>1.9.0</version> </dependency><dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.11</artifactId> <version>1.9.0</version> </dependency>對于流計算作業和批處理作業的配置非常類似,只需要在 EnvironmentSettings 中設置 StreamingMode 或 BatchMode 即可,流計算作業的設置如下:
// ********************** // BLINK STREAMING QUERY // ********************** import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.java.StreamTableEnvironment;StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); // or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);bsTableEnv.sqlUpdate(…); bsTableEnv.execute();批處理作業的設置如下 :
// ****************** // BLINK BATCH QUERY // ****************** import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment;EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings); bbTableEnv.sqlUpdate(…) bbTableEnv.execute()如果作業需要運行在集群環境,打包時將 Blink Planner 相關依賴的 scope 設置為 provided,表示這些依賴由集群環境提供。這是因為 Flink 在編譯打包時, 已經將 Blink Planner 相關的依賴打包,不需要再次引入,避免沖突。
5. 社區長遠計劃
目前,TableAPI & SQL 已經成為 Flink API 的一等公民,社區也將投入更大的精力在這個模塊。在不遠的將來,待 Blink Planner 穩定之后,將會作為默認的 Planner ,而 Old Planner 也將會在合適的時候退出歷史的舞臺。目前社區也在努力賦予 DataStream 批處理的能力,從而統一流批技術棧,屆時 DataSet API 也將退出歷史的舞臺。
原文鏈接
本文為云棲社區原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的揭秘 Flink 1.9 新架构,Blink Planner 你会用了吗?的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 解密 云HBase 冷热分离技术原理
- 下一篇: 分布式服务架构下的混沌工程实践