Apache Flink 在汽车之家的应用与实践
簡介:?汽車之家如何基于 Flink 上線了 AutoStream 平臺并持續打磨。
本文整理自汽車之家實時計算平臺負責人邸星星在 Flink Forward Asia 2020 分享的議題《Apache Flink 在汽車之家的應用及實踐》。主要內容包括:
一、背景及現狀
1. 第一階段
在 2019 年之前,汽車之家的大部分實時業務都是運行在 Storm 之上的。Storm 作為早期主流的實時計算引擎,憑借簡單的 Spout 和 Bolt 編程模型以及集群本身的穩定性,俘獲了大批用戶,我們在 2016 年搭建了 Storm 平臺。
隨著實時計算的需求日漸增多,數據規模逐步增大,Storm 在開發及維護成本上都凸顯了不足,這里列舉幾個痛點:
我們一直是用的 Lambda 架構,會用 T+1 的離線數據修正實時數據,即最終以離線數據為準,所以計算口徑實時要和離線完全保持一致,實時數據開發的需求文檔就是離線的 SQL,實時開發人員的核心工作就是把離線的 SQL 翻譯成 Storm 代碼,期間雖然封裝了一些通用的 Bolt 來簡化開發,但把離線動輒幾百行的 SQL 精準地翻譯成代碼還是很有挑戰的,并且每次運行都要經過打包、上傳、重啟的一系列的繁瑣操作,調試成本很高。
Storm 對狀態支持的不好,通常需要借助 Redis、HBase 這類 kv 存儲維護中間狀態,我們之前是強依賴 Redis。比如常見的計算 UV 的場景,最簡單的辦法是使用 Redis 的 sadd 命令判斷 uid 是否為已經存在,但這種方法會帶來很高的網絡 IO,同時如果沒有提前報備的大促或搞活動導致流量翻倍的情況,很容易把 Redis 內存搞滿,運維同學也會被殺個措手不及。同時 Redis 的吞吐能力也限制了整個作業的吞吐量。
由于采用編寫 Storm 代碼方式開發,難以分析元數據及血緣關系,同時可讀性差,計算口徑不透明,業務交接成本很高。
數據倉庫團隊是直接對接業務需求的團隊,他們更熟悉基于 Hive 的 SQL 開發模式,通常都不擅長 Storm 作業的開發,這導致一些原本是實時的需求,只能退而求其次選擇 T+1 的方式給出數據。
在這個階段,我們支持了最基本的實時計算需求,因為開發門檻比較高,很多實時業務都是由我們平臺開發來完成,既做平臺,又做數據開發,精力分散很嚴重。
2. 第二階段
我們從 2018 年開始調研 Flink 引擎,其相對完備的 SQL 支持,天生對狀態的支持吸引了我們,在經過學習調研后,2019 年初開始設計開發 Flink SQL 平臺,并于 2019 年中上線了 AutoStream 1.0 平臺。平臺上線之初就在倉庫團隊、監控團隊和運維團隊得以應用,能夠快速被用戶主要得益于以下幾點:
痛點:
隨著平臺作業量的高速增長,平臺 on-call 成本非常高。首先我們經常面對一些新用戶的基礎問題:
還有一些不太容易快速給出答案的問題:
尤其是延遲問題,我們常見的數據傾斜,GC,反壓問題可以直接引導用戶去 Flink UI 和監控圖表上去查看,但有時候還是需要手動去服務器上查看 jmap、jstack 等信息,有時候還需要生成火焰圖來幫助用戶定位性能問題。
初期我們沒有和運營團隊合作,完全是我們開發人員直接對接處理這些問題,雖然期間補充了大量的文檔,但是整體上 on-call 成本還是很高。
在 AutoStream1.0 平臺這個階段,基于 SQL 開發的方式極大地降低了實時開發的門檻,各業務方可以自己實現實時業務的開發,同時數倉同學經過簡單的學習后,就開始對接實時業務,將我們平臺方從大量的業務需求中釋放出來,讓我們可以專心做平臺方面的工作。
3. 當前階段
針對上面的幾個方面,我們有針對性行的做了以下幾點升級:
目前用戶對平臺的使用已經趨于熟悉,同時自助健康檢查和自助診斷等功能的上線,我們平臺方的日常 on-call 頻率在逐步降低,開始逐漸進入平臺建設的良性循環階段。
4. 應用場景
汽車之家用于做實時計算的數據主要分為三類:
以上這三類數據都會實時寫入 Kafka 集群,在 Flink 集群中針對不同場景進行計算,結果數據寫入到 Redis、MySQL、Elasticsearch、HBase、Kafka、Kylin 等引擎中,用于支持上層應用。
下面列舉了一些應用場景:
5. 集群規模
目前 Flink 集群服務器 400+,部署模式為 YARN (80%) 和 Kubernetes,運行作業數 800+,日計算量 1 萬億,峰值每秒處理數據 2000 萬條。
二、AutoStream 平臺
1. 平臺架構
上面是 AutoStream 平臺目前的整體架構,主要是以下幾部分內容:
這是我們平臺的核心服務,負責對元數據服務、Flink 客戶端服務、Jar 管理服務及交互結果查詢服務進行整合,通過前端頁面把平臺功能暴露給用戶。
主要包括 SQL 和 Jar 作業的管理、庫表信息的管理、UDF 管理、操作記錄及歷史版本的管理、健康檢查、自助診斷、報警管理等模塊,同時提供對接外部系統的能力,支持其他系統通過接口方式管理庫表信息、SQL 作業信息及作業啟停操作等。基于 Akka 任務的生命周期管理和調度系統提供了高效,簡單,低延遲的操作保障,提升了用戶使用的效率和易用性。
主要對應 Flink Catalog 的后端實現,除了支持基本的庫表信息管理外,還支持庫表粒度的權限控制,結合我們自身的特點,支持用戶組級別的授權。
底層我們提供了 Plugin Catalog 機制,既可以用于和 Flink 已有的 Catalog 實現做集成,也可以方便我們嵌入自定義的 Catalogs,通過 Plugin 機制可以很容易的重用 HiveCatalog,JdbcCatalog 等,從而保證了庫表的周期的一致性。
同時元數據服務還負責對用戶提交的 DML 語句進行解析,識別當前作業的依賴的表信息,用于作業的分析及提交過程,同時可以記錄血緣關系。
平臺提供的各類 SDK 在 Jar Service 上進行統一管理,同時用戶也可以在平臺上把自定義 Jar、UDF jar 等提交到 Jar Service 上統一管理,然后在作業中通過配置或 DDL 引用。
負責把平臺上的作業轉化成 Flink Job 提交到 Yarn 或 Kubernetes 上,我們在這一層針對 Yarn 和 Kubernetes 做了抽象,統一兩種調度框架的行為,對外暴露統一接口及規范化的參數,弱化 Yarn 和 Kubernetes 的差異,為 Flink 作業在兩種框架上無縫切換打下了良好的基礎。
每個作業的依賴不盡相同,我們除了對基礎依賴的管理以外,還需要支持個性化的依賴。比如不同版本的 SQL SDK,用戶自助上傳的 Jar、UDF 等,所以不同作業的提交階段需要做隔離。
我們采用的是 Jar service + 進程隔離的方式,通過和 Jar Service 對接,根據作業的類型和配置,選用相應的 Jar,并且提交單獨的進程中執行,實現物理隔離。
是一個簡易的緩存服務,用于 SQL 作業開發階段的在線調試場景。當我們分析出用戶的 SQL 語句,將 Select 語句的結果集存入緩存服務中;然后用戶可以在平臺上通過選擇 SQL 序號 (每個完整的 SELECT 語句對應一個序號),實時查看 SQL 對應的結果數據,方便用戶開發與分析問題。
最右側的部分主要是各種 Source、Sink 的實現,有一些是重用 Flink 提供的 connector,有一些是我們自己開發的 connector。
針對每一種 connector 我們都添加了必要 Metric,并配置成單獨的監控圖表,方便用戶了解作業運行情況,同時也為定位問題提供數據依據。
2. 基于 SQL 的開發流程
在平臺提供以上功能的基礎上,用戶可以快速的實現 SQL 作業的開發:
平臺默認會保存 SQL 每一次的變更記錄,用戶可以在線查看歷史版本,同時我們會記錄針對作業的各種操作,在作業維護階段可以幫助用戶追溯變更歷史,定位問題。
下面是一個 Demo,用于統計當天的 PV、UV 數據:
3. 基于 Catalog 的元數據管理
元數據管理的主要內容:
新老版本完全兼容:由于在 AutoStream 1.0 的時候,我們沒有單獨引入 Metastore 服務,此外 1.0 時期的 DDL SQL 解析模塊是自研的組件。所以在建設 MetaStore 服務時,需要考慮歷史作業和歷史庫表信息兼容的問題。
下面是幾個模塊和 Metastore 交互的示意圖:
4. UDXF 管理
我們引入了 Jar Service 服務用來管理各種 Jar,包括用戶自定義作業、平臺內部 SDK 組件、UDXF 等,在 Jar Service 基礎上我們可以很容易的實現 UDXF 的自助管理,在 On k8s 的場景下,我們提供了統一的鏡像,Pod 啟動后會從 Jar Service 下載對應的 Jar 到容器內部,用于支持作業的啟動。
用戶提交的 SQL 中如果包含 Function DDL,我們會在 Job Client Service 中會解析 DDL,下載對應的 Jar 到本地。
為了避免和其他作業有依賴沖突,我們每次都會單獨啟動一個子進程來完成作業提交的操作。UDXF Jar 會被并加入到 classpath 中,我們對 Flink 做了一些修改,作業提交時會把這個 Jar 一并上傳到 HDFS 中;同時 AutoSQL SDK 會根據函數名稱和類名為當前作業注冊 UDF。
5. 監控報警及日志收集
得益于 Flink 完善的 Metric 機制,我們可以方便的添加 Metric,針對 Connector,我們內嵌了豐富的 Metric,并配置了默認的監控看板,通過看板可以查看 CPU、內存、JVM、網絡傳輸、Checkpoint、各種 Connector 的監控圖表。同時平臺和公司的云監控系統對接,自動生成默認的報警策略,監控存活狀態、消費延遲等關鍵指標。同時用戶可以在云監控系統修改默認的報警策略,添加新的報警項實現個性化監控報警。
日志通過云 Filebeat 組件寫入到 Elasticsearch 集群,同時開放 Kibana 供用戶查詢。
整體的監控報警及日志收集架構如下:
6. 健康檢查機制
隨著作業數的高速增長,出現了很多資源使用不合理的情況,比如前面提到的資源浪費的情況。用戶大多時候都是在對接新需求,支持新業務,很少回過頭來評估作業的資源配置是否合理,優化資源使用。所以平臺規劃了一版成本評估的模型,也就是現在說的健康檢查機制,平臺每天會針對作業做多維度的健康評分,用戶可以隨時在平臺上查看單個作業的得分情況及最近 30 天的得分變化曲線。
低分作業會在用戶登錄平臺時進行提示,并且定期發郵件提醒用戶進行優化、整改,在優化作業后用戶可以主動觸發重新評分,查看優化效果。
我們引入了多維度,基于權重的評分策略,針對 CPU、內存使用率、是否存在空閑 Slot、GC 情況、Kafka 消費延遲、單核每秒處理數據量等多個維度的指標結合計算拓補圖進行分析評估,最終產生一個綜合分。
每個低分項都會顯示低分的原因及參考范圍,并顯示一些指導建議,輔助用戶進行優化。
我們新增了一個 Metric,用一個 0%~100% 的數字體現 TaskManagner CPU 利用率。這樣用戶可以直觀的評估 CPU 是否存在浪費的情況。
下面是作業評分的大致流程:首先我們會收集和整理運行作業的基本信息和 Metrics 信息。然后應用我們設定好的規則,得到基本評分和基礎建議信息。最后將得分信息和建議整合,綜合評判,得出綜合得分和最終的報告。用戶可以通過平臺查看報告。對于得分較低的作業,我們會發送報警給作業的歸屬用戶。
7. 自助診斷
如之前提到的痛點,用戶定位線上問題時,只能求助于我們平臺方,造成我們 on-call 工作量很大,同時用戶體驗也不好,鑒于此,所以我們上線了以下功能:
8. 基于 Checkpoint 復制的快速容災
當實時計算應用在重要業務場景時,單個 Yarn 集群一旦出現故障且短期內不可恢復,那么可能會對業務造成較大影響。
在此背景下,我們建設了 Yarn 多集群架構,兩個獨立的 Yarn 各自對應一套獨立的 HDFS 環境,checkpoint 數據定期在兩個 HDFS 間相互復制。目前 checkpoint 復制的延遲穩定在 20 分鐘內。
同時,在平臺層面,我們把切換集群的功能直接開放給用戶,用戶可以在線查看 checkpoint 的復制情況,選擇合適的 checkpoint 后 (當然也可以選擇不從 checkpoint 恢復) 進行集群切換,然后重啟作業,實現作業在集群間的相對平滑的遷移。
三、基于 Flink 的實時生態建設
AutoStream 平臺的核心場景是支持實時計算開發人員的使用,使實時計算開發變得簡單高效、可監控、易運維。同時隨著平臺的逐步完善,我們開始摸索如何對 AutoStream 平臺進行重用,如何讓 Flink 應用在更多場景下。重用 AutoStream 有以下幾點優勢:
基于以上幾點,我們在建設其他系統時,優先重用 AutoStream 平臺,以接口調用的方式進行對接,將 Flink 作業全流程的生命周期,完全托管給 AutoStream 平臺,各系統優先考慮實現自身的業務邏輯即可。
我們團隊內的 AutoDTS (接入及分發任務) 和 AutoKafka (Kafka 集群復制) 系統目前就是依托于 AutoStream 建設的。簡單介紹一下集成的方式,以 AutoDTS 為例:
1. AutoDTS 數據接入分發平臺
AutoDTS 系統主要包含兩部分功能:
1.1 AutoDTS 數據接入
下面是數據接入的架構圖:
我們維護了基于 Flink 的數據接入 SDK 并定義了統一的 JSON 數據格式,也就是說 MySQL Binlog,SQL Server、 TiDB 的變更數據接入到 Kafka 后,數據格式是一致的,下游業務使用時,基于統一格式做開發,無需關注原始業務庫的類型。
數據接入到 Kafka Topic 的同時,Topic 會自動注冊為一張 AutoStream 平臺上的流表,方便用戶使用。
數據接入基于 Flink 建設還有一個額外的好處,就是可以基于 Flink 的精確一次語義,低成本的實現精確一次數據接入,這對支持數據準確性要求很高的業務來說,是一個必要條件。
目前我們在做把業務表中的全量數據接入 Kafka Topic 中,基于 Kafka 的 compact 模式,可以實現 Topic 中同時包含存量數據和增量數據。這對于數據分發場景來說是十分友好的,目前如果想把數據實時同步到其他存儲引擎中,需要先基于調度系統,接入一次全量數據,然后再開啟實時分發任務,進行變更數據的實時分發。有了 Compact Topic 后,可以省去全量接入的操作。Flink1.12 版本已經對 Compact Topic 做支持,引入 upsert-kafka Connector [1]
[1]?FLIP-149: Introduce the upsert-kafka Connector - Apache Flink - Apache Software Foundation
下面是一條樣例數據:
默認注冊到平臺上的流表是 Schemaless 的,用戶可以用 JSON 相關的 UDF 獲取其中的字段數據。
下面是使用流表的示例:
1.2 AutoDTS 數據分發
我們已經知道,接入到 Kafka 中的數據是可以當做一張流表來使用的,而數據分發任務本質上是把這個流表的數據寫入到其他存儲引擎,鑒于 AutoStream 平臺已經支持多種 Table Sink (Connector),我們只需要根據用戶填寫的下游存儲的類型和地址等信息,就可以通過拼裝 SQL 來實現數據的分發。
通過直接重用 Connector 的方式,最大化的避免了重復開發的工作。
下面是一個分發任務對應的 SQL 示例:
2. Kaka 多集群架構
Kafka 在實際應用中,有些場景是需要做 Kafka 多集群架構支持的,下面列舉幾個常見的場景:
- 數據冗余災備,實時復制數據到另一個備用集群,當一個 Kafka 集群不可用時,可以讓應用切換到備用集群,快速恢復業務;
- 集群遷移,當機房合同到期,或者上云時,都需要做集群的遷移,此時需要把集群數據整體復制到新機房的集群,讓業務相對平滑遷移;
- 讀寫分離場景,使用 Kafka 時,大多數情況都是讀多寫少,為保證數據寫入的穩定性,可以選擇建設 Kafka 讀寫分離集群。
我們目前建設了 Kafka 多集群架構,和 Flink 相關的主要有兩塊內容:
2.1 整體架構
先來看一下 Kafka 集群間的數據復制,這是建設多集群架構的基礎。我們是使用 MirrorMaker2 來實現數據復制的,我們把 MirrorMaker2 改造成普通的 Flink 作業,運行在 Flink 集群中。
我們引入了 Route Service 和 Kafka SDK,實現客戶端快速切換訪問的 Kafka 集群。
客戶端需要依賴我們自己發布的 Kafka SDK,并且配置中不再指定 bootstrap.servers 參數,而是通過設置 cluster.code 參數來聲明自己要訪問的集群。 SDK 會根據 cluster.code 參數,訪問 Route Service 獲取集群真正的地址,然后創建 Producer/Consumer 開始生產/消費數據。
SDK 會監聽路由規則的變化,當需要切換集群時,只需要在 Route Service 后臺切換路由規則,SDK 發現路由集群發生變化時,會重啟 Producer/Consumer 實例,切換到新集群。
如果是消費者發生了集群切換,由于 Cluster1 和 Cluster2 中 Topic 的 offset 是不同的,需要通過 Offset Mapping Service 來獲取當前 Consumer Group 在 Cluster2 中的 offset,然后從這些 Offset 開始消費,實現相對平滑的集群切換。
2.2 Kafka 集群間的數據復制
我們使用 MirrorMaker2 來實現集群間的數據復制,MirrorMaker2 是 Kafka 2.4 版本引入的,具體以下特性:
- 自動識別新的 Topic 和 Partition;
- 自動同步 Topic 配置:Topic 的配置會自動同步到目標集群;
- 自動同步 ACL;
- 提供 Offset 的轉換工具:支持根據源集群、目標集群及 Group 信息,獲取到該 Group 在目標集群的中對應的 Offset 信息;
- 支持擴展黑白名單策略:可以靈活定制,動態生效。
clusters = primary, backup
primary.bootstrap.servers = vip1:9091
backup.bootstrap.servers = vip2:9092
primary->backup.enabled = true
backup->primary.enabled = true
這段配置完成 primary 到 backup 集群的雙向數據復制,primary 集群中的 topic1 中的數據會復制到 backup 集群中的 primary.topic1 這個 Topic 中,目標集群的Topic 命名規則是 sourceCluster.sourceTopicName,可以通過實現 ReplicationPolicy 接口來自定義命名策略。
2.3 MirrorMaker2 相關的 Topic 介紹
- 源集群中的 Topic
heartbeats:存儲心跳數據;
mm2-offset-syncs.targetCluster.internal:存儲源集群 (upstreamOffset) 和目標集群的 offset(downstreamOffset) 對應關系。
- 目標集群中的 Topic
mm2-configs.sourceCluster.internal:connect 框架自帶,用來存儲配置;
mm2-offsets.sourceCluster.internal:connect 框架自帶,用來存儲 WorkerSourceTask 當前處理的 offset,mm2 場景下是為了當前數據同步到源集群 topic partition 的哪一個 offset,這個更像是 Flink 的 checkpoint 概念;
mm2-status.sourceCluster.internal:connect 框架自帶,用來存儲 connector 狀態。
上面三個用的都是 connect runtime 模塊中的 KafkaBasedLog 工具類,這個工具類可以讀寫一個 compact 模式的 topic 數據,此時 MirrorMaker2 把 topic 當作 KV 存儲使用。
sourceCluster.checkpoints.internal:記錄 sourceCluster consumer group 在當前集群對應的 offset,mm2 會定期從源 kafka 集群讀取 topic 對應的 consumer group 提交的 offset, 并寫到目標集群的 sourceCluster.checkpoints.internal topic 中。
2.4 MirrorMaker2 的部署
下面是 MirrorMaker2 作業運行的流程,在 AutoKafka 平臺上創建一個數據復制作業,會調用 AutoStream 平臺接口,相應的創建一個 MM2 類型的作業。啟動作業時,會調用 AutoStream 平臺的接口把 MM2 作業提交到 Flink 集群中運行。
2.5 路由服務
Route Service 負責處理客戶端的路由請求,根據客戶端的信息匹配合適的路由規則,將最終路由結果,也就是集群信息返回給客戶端。
支持基于集群名稱、Topic、Group、ClientID 以及客戶端自定義的參數靈活配置路由規則。
下面的例子就是將 Flink 作業 ID 為 1234 的消費者,路由到 cluster_a1 集群。
2.6 Kafka SDK
使用原生的 kafka-clients 是無法和 Route Service 進行通信的,客戶端需要依賴我們提供的 Kafka SDK (汽車之家內部開發的 SDK) 能和 Route Service 通信,實現動態路由的效果。
Kafka SDK 實現了 Producer、Consumer 接口,本質是 kafka-clients 的代理,業務做較少的改動就可以引入 Kafka SDK。
業務依賴 Kafka SDK 后,Kafka SDK 會負責和 Route Service 通信,監聽路由變化,當發現路由的集群發生變化時,會 close 當前的 Producer/Consumer,創建新的 Producer/Consumer,訪問新的集群。
此外 Kafka SDK 還負責將 Producer、Consumer 的 metric 統一上報到云監控系統的 prometheus,通過查看平臺預先配置好的儀表盤,可以清晰的看到業務的生產、消費情況。
同時 SDK 會收集一些信息,比如應用名稱、IP 端口、進程號等,這些信息可以在 AutoKafka 平臺上查到,方便我們和用戶共同定位問題。
2.7 Offset Mapping Service
當 Consumer 的路由發生變化并切換集群時,情況有一些復雜,因為目前 MirrorMaker2 是先把數據從源集群消費出來,再寫入到目標集群的,同一條數據可以確保寫入到目標 topic 的相同分區,但是 offset 和源集群是不同的。
針對這種 offset 不一致的情況,MirrorMaker2 會消費源集群的 __consumer_offsets 數據,加上目標集群對應的 offset,寫入到目標集群的 sourceCluster.checkpoints.internal topic 中。
同時,源集群的 mm2-offset-syncs.targetCluster.internal topic 記錄了源集群和目標集群 offset 的映射關系,結合這兩個 topic,我們建設了 Offset Mapping Service 來完成目標集群的 offset 的轉換工作。
所以當 Consumer 需要切換集群時,會調用 Offset Mapping Service 的接口,獲取到目標集群的 offsets,然后主動 seek 到這些位置開始消費,這樣實現相對平滑的集群切換工作。
2.8 Flink 與 Kafka 多集群架構的集成
由于 Kafka SDK 兼容 kafka-clients 的用法,用戶只需要更換依賴,然后設置 cluster.code、Flink.id 等參數即可。
當 Producer/Consumer 發生集群切換后,由于創建了新的 Producer/Consumer 實例,Kafka 的 metric 數據沒有重新注冊,導致 metric 數據無法正常上報。我們在 AbstractMetricGroup 類中增加了 unregister 方法,在監聽 Producer/Consumer 的切換事件時,重新注冊 kafka metrics 就可以了。
至此我們完成了 Flink 對 Kafka 多集群架構的支持。
四、后續規劃
原文鏈接
本文為阿里云原創內容,未經允許不得轉載。?
總結
以上是生活随笔為你收集整理的Apache Flink 在汽车之家的应用与实践的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何构建流量无损的在线应用架构 | 专题
- 下一篇: Quick BI电子表格: 新手亦可表格