腾讯基于 Flink 的实时流计算平台演进之路
原文地址:https://www.infoq.cn/article/TjDeQDJQpKZ*NpG71pRW
大家好,我是來自騰訊大數據團隊的楊華(vinoyang),很高興能夠參加這次北京的 QCon,有機會跟大家分享一下騰訊實時流計算平臺的演進與這個過程中我們的一些實踐經驗。
這次分享主要包含四個議題,我會首先闡述一下騰訊在實時計算中使用 Flink 的歷程,然后會簡單介紹一下騰訊圍繞 Flink 的產品化實踐:我們打造了一個 Oceanus 平臺,同時騰訊云也早已提供基于 Flink 的實時流計算服務,接著我們會重點跟大家聊一聊我們對社區版 Flink 的一些擴展與改進、優化。
Flink 在騰訊實時計算概況簡介
首先,我們進入第一個議題。Flink 在騰訊正式被考慮替代 Storm 是在 2017 年。
17 年上半年,我們主要在調研 Flink 替換 Storm 的可行性、特性、性能是否能夠滿足我們的上線要求。在此之前,我們內部以 Storm 作為實時計算的基礎框架也已經有幾年的時間了,在使用的過程中也發現了 Storm 的一些痛點,比如,沒有內置狀態的支持,沒有提供完備的容錯能力,沒有內置的窗口 API,core API 無法提供 Exactly-once 的語義保證等等。
17 年下半年,我們從社區拉出當時最新的發布分支(1.3.2)作為我們內部的定制開發分支進行開發。作為一個試點,我們選擇了內部一個流量較大的業務來進行替換,這個業務在我們內部是以 standalone 的模式部署的,所以我們最初也使用的是 Flink 的 standalone 部署模式。
18 年上半年,我們開始圍繞 Flink 進行產品化,打造了一個全流程、一體化的實時流計算平臺——Oceanus,來簡化業務方構建實時應用的復雜度并降低運維成本,這也基本明確了后續我們主要的運行模式是 Flink on YARN。
18 年下半年,我們的 Oceanus 平臺已經有足夠的能力來構建常見的流計算應用,我們部門內部的一些實時流計算業務也已經在平臺上穩定運行,于是我們開始為騰訊云、騰訊其他事業群以及業務線提供流計算服務。同時,我們也將平臺整合進我們的大數據套件,為外部私有云客戶提供流計算服務。
19 年上半年,我們的主要目標是在 Oceanus 上沉淀并完善上層的場景化服務建設,比如提供在線機器學習、風控等場景化服務。另外,我們也在 Flink 批處理方向發力,利用 Flink 的計算能力來滿足跨數據中心,跨數據源的聯合分析需求。它可以做到:數據源 SQL 下推,避免集群帶寬資源浪費;單 DC 內 CBO(基于代價優化),生成最優的執行計劃;跨 DC CBO,根據 DC 負載和資源選擇最佳 DC 執行計算,從而獲得更好的資源利用和更快的查詢性能。以上就是騰訊使用 Flink 的整個歷程。
這幅圖展示了,Flink 目前在騰訊內部已經為一些我們耳熟能詳的產品提供實時計算的服務。這些產品,包括微信、支付、財付通、騰訊云、QQ、空間、音樂、游戲、K 歌等等。我們列舉其中幾個業務的使用案例,微信使用我們的平臺來統計朋友圈的實時瀏覽信息、小游戲種子用戶的 UV 計算、實時惡意流量分析判斷、看一看的紅點信息;支付用來計算商戶交易相關的統計;音樂用于實時點唱、熱門排行榜等等。
接下來我們來了解一下,目前 Flink 在騰訊使用的現狀。目前我們 Oceanus 平臺 YARN 集群的 vcore 總數目達到了 34 萬,累計的峰值計算能力接近 2.1 億 / 秒,日均處理消息規模近 20 萬億。到目前為止,騰訊內部除了廣告的在線訓練業務外,原先運行在 Storm 上的實時流計算業務都已逐步遷移到 Flink 引擎上,而廣告這塊的業務預計也會在今年下半年遷移完成。
Oceanus 平臺簡介
接下來,我們進入第二個議題:簡要介紹一下我們的 Oceanus 平臺。
首先,我們來看一下 Oceanus 平臺的整體技術架構。我們內部定制版的 Flink 引擎,稱之為 TDFLINK,它跟其他的一些大數據基礎設施框架交互并協同支撐了我們上層的 Oceanus 平臺,Oceanus 支持畫布、SQL 以及 Jar 三種形式構建應用,為了方便業務方降低整體成本,我們還提供了配置、測試、部署等完整配套的功能,在平臺之上我們提供了一些領域特定的場景化服務比如 ETL、監控、推薦廣告等。
下面我們來介紹 Oceanus 的幾個典型功能。首先這是某個用戶的應用列表頁。從列表中,我們可以看到應用的當前狀態、類型、迭代的版本,它歸屬于哪個場景等信息。在操作欄,我們可以一鍵對應用進行啟停、調試,查看它的指標信息等,除此之外我們還提供了很多便捷的操作,比如快速復制一個應用,他們都收納在“更多”菜單按鈕中。
這是我們的一個指標分鐘級統計的畫布應用詳情頁,我們為 ETL 類型的應用提供了一個通用的 transform 算子。它提供了很多功能細分的可插拔的便捷函數來簡化常見的事件解析與提取的復雜度。在圖中,多種不同類型的指標經過 split 算子分流后將相同的指標進行歸類,然后再對它們應用各自的統計邏輯,就像這里的窗口一樣,基本上每個算子都是配置化的。像這種類型的應用我們通過拖拽、配置就可以輕松完成它的構建。
這幅圖展示了我們的指標詳情頁檢查點的指標明細,為了讓業務人員更直觀地了解它們最關心的指標信息,我們將一些必要的指標進行了重新梳理并展示到我們的平臺上,這里有些指標直接使用了 Flink 提供的 REST API 接口,而有些指標則是我們內部擴展定制的。
最后,我們來介紹一下最近上線的在線機器學習模塊。這是我們一個模型訓練應用的詳情頁,同樣它也是畫布類型的,我們對常規的機器學習類型的應用進行了步驟拆分,包括了數據預處理、算法等相關步驟,每個步驟都可以進行拖拽,再加上配置的方式就可以完成一個 ML 類型的應用創建。
對于訓練得到的模型,我們也提供了模型服務功能,我們用模型服務組來管理每個模型的不同時間的版本,點擊右側的“評估報告”可以查看這個模型的 AUC 趨勢。
以上是對 Oceanus 平臺的介紹,如果大家有興趣可以掃描 PPT 最后的二維碼來進一步了解我們的平臺以及騰訊云上的流計算服務。
針對 Flink 的擴展與優化
接下來,我們進入下一個議題,介紹我們內部 Flink 版本在通過騰訊云對外提供服務時基于內部以及業務的相關需求對社區版的擴展與優化。
第一個改進是我們重構了 Flink Web UI,我們重構的原因是因為社區版的 Flink Web UI 在定位問題的時候不能提供足夠的信息,導致問題定位的效率不夠高。尤其是 Job 并行度非常大,YARN 的 container 數目非常多的時候,當 Job 發生失敗,很難快速去找到 container 和節點以查看進程的堆棧或者機器指標。所以,為了更高效地定位問題,我們對 Flink web UI 進行了重構并暴露了一些關鍵指標。
這是我們一個 TaskManager 的詳情頁,我們為它新增了一個“Threads” Tab,我們可以通過它看到 Task 相關的線程信息:線程名稱、CPU 消耗、狀態以及堆棧等。這樣一旦哪個算子的線程可能成為瓶頸時,我們可以快速定位到它阻塞在什么方法調用上。
接下來的這個改進是對 JobManager failover 的優化。大家應該都知道社區版的 Flink JobManager HA 在 Standalone 模式下有個很大的問題是:它的 standby 節點是冷備的,JobManager 的切換會導致它管理的所有 Job 都會被重啟恢復,這一行為在我們現網環境中是不可接受的。所以,我們首先定制的第一個大特性就是 JobManager 的 failover 優化,讓 standby 節點變成熱備,這使得 JobManager 的切換對 TaskManager 上已經正在運行的 Job 不產生影響。我們已經對 Standalone 以及 Flink on YARN 這兩種部署模式支持了這個特性,Flink on YARN 的支持還處于內部驗證階段。我們以對 Standalone 模式的優化為例來進行分析,它主要包含這么幾個步驟:
-
取消 JobManager 跟 TaskManager 因為心跳超時或 Leadership 變動就 cancel task 的行為;
-
對 ExecutionGraph 核心數據的快照;
-
通過 ExecutionGraphBuilder 重構空的 ExecutionGraph 加上快照重置來恢復出一個跟原先等價的 ExecutionGraph 對象;
-
TaskManager 跟新的 JobManager leader 建立連接后以心跳上報自己的狀態和必要的信息;
-
新的 JobManager 確認在 reconcile 階段 Job 的所有 task 是否正常運行;
接下來的這個改進已經在反饋社區的過程中,它就是對檢查點失敗處理的改進。在探討改進之前,我們先來了解一下社區版當前的處理機制。JobMaster 中,每個 Job 會對應一個 Checkpoint Coordinator,它用來管理并協調 Job 檢查點的執行。當到達一個檢查點的觸發周期,Coordinator 會對所有的 Source Task 下發 TriggerCheckpoint 消息,source task 會在自身完成快照后向下游廣播 CheckpointBarrier,作為下游 task 觸發的通知。其中,如果一個 task 在執行檢查點時失敗了,這取決于用戶是否容忍這個失敗(通過一個配置項),如果選擇不容忍那么這個失敗將變成一個異常導致 task 的失敗,與此同時 task 的失敗將會通知到 JobMaster,JobMaster 將會通知這個 Job 的其他 task 取消它們的執行?,F有的機制存在一些問題:
-
Coordinator 并不能控制 Job 是否容忍檢查點失敗,因為控制權在 task 端;
-
Coordinator 當前的失敗處理代碼邏輯混亂,區分出了觸發階段,卻忽略了執行階段;
-
無法實現容忍多少個連續的檢查點失敗則讓 Job 失敗的邏輯。
了解了現有的實現機制,我們再來看接下來的改進方案。首先,我們對源碼中 checkpoint package 下的相關類進行了重構,使得它不再區分觸發階段,引進了更多的檢查點失敗原因的枚舉并重構了相關的代碼。然后我們引入了 CheckpointFailureManager 組件,用來統一失敗管理,同時為了適配更靈活的容忍失敗的能力,我們引入了檢查點失敗計數器機制?,F在,當我們遇到檢查點失敗后,這個失敗信息會直接上報到 Coordinator,而是否要讓 Job 失敗具體的決策則由 CheckpointFailureManager 作出,這就使得 Coordinator 具有了完整的檢查點控制權,而決策權轉讓給 CheckpointFailureManager,則充分實現了邏輯解耦。
下面我們要看的這個特性是對 Flink 原生窗口的增強,所以我們稱之為 Enhanced window。大家都知道 Flink 的 EventTime 語義的窗口無法保證任意延遲到達的數據都能參與窗口計算,它只允許你設置一個容忍延遲的時間。但我們的應用場景里,數據的延遲可能非常高,甚至有時跨天的也會發生,但我們無法為常規的窗口設置這么長的延遲時間,并且我們的業務無法容忍延遲數據被丟棄的行為。因此針對這種場景,Flink 自帶的窗口無法滿足我們的需求。所以,我們對它做了一些改進,它能夠容忍任意延遲到來的事件,所有的事件都不會被丟棄,而是會加入一個新的窗口重新計算,新窗口跟老窗口毫無關系,所以最終可能針對一個窗口在用戶的目標表中會存在多條記錄,用戶只需自行聚合即可。
為了方便在上層使用這種窗口,我們為它定制了 SQL 關鍵字,這幅圖展示了我們在指標統計場景中使用它的一個示例。
這是我們根據業務需求所定制的另一個窗口——增量窗口。在業務中經常遇到這樣的需求:希望看到一個窗口周期內的增量變化,這個窗口周期可能會很長,比如一個天級別的窗口。比如我們希望看到一天內每個小時的 PV 增長趨勢,或者游戲中的一些虛擬物品的消耗趨勢。Flink 默認的翻滾窗口以及觸發器是沒有內置這種窗口內小批次觸發的功能。當然我們也可以通過一個個的小窗口來計算階段性的結果,然后再對數據進行二次處理,但這樣會比較麻煩。所以我們實現了大窗口內多次增量觸發的功能,擴展實現了一個窗口內多次觸發的 Trigger,并定制了相應的 SQL 語法來供業務使用。這里我們可以看到雖然是大窗口,但由于數據都在不斷地進行增量聚合,所以并不會 hold 住非常大的狀態集。
這幅圖展示了增量窗口的使用方式,通過新的關鍵字,底層會映射到我們自實現的觸發器。
接下來我們要看的這個特性是我們對 Flink keyBy 的優化,我們稱之為:LocalKeyBy。我們在使用 KeyBy 的時候都遇到過數據熱點的問題,也就是數據傾斜。數據傾斜主要是業務數據的 key 取值不夠離散,而 keyBy 背后是 hash 的 partition 方式,它根據一個 key 的 hash 值來決定數據要落到哪個節點分區。如果發生數據傾斜很容易造成計算資源利用不均以及反壓(back pressure)等問題產生。針對這一點,我們在保證計算語義的情況下對 keyBy 進行了優化,開發了 LocalKeyBy 功能。它的原理是通過本地預聚合來減少發送的數據量,但這里需要注意的一點是:使用這個算子的時候,需要對原有的實現代碼進行調整,因為它將原來的 keyBy 拆分為了兩步:預聚合以及合并。
我們在本地對 keyBy 與 LocalKeyBy 做了一個簡單的性能對比測試,發現在流量傾斜嚴重的情況下,使用 LocalKeyBy 整體性能并沒有受到太大的影響,但 Flink 原生的 keyBy 則隨著流量的傾斜而產生顯著的性能下降。
我們繼續來看一個特性:水位線算子定時檢測流分區空閑的功能。Flink 社區目前針對 Source 實現了定時的流 idle 檢測功能(雖然沒有開放),它主要針對的場景是 Kafka 某個分區空閑無數據從而造成對應的 subtask 無法正常提取 watermark,導致對下游的計算產生影響。
但我們的場景和社區略有差別,我們沒有將所有的邏輯都壓到 source 里,為了進行邏輯拆分我們引入了一個 transform 算子,它專門針對 ETL 的場景,所以我們的 watermark 很多情況下不在 source 算子上提取,而是在下游的某個算子上,在某些情況下,如果 watermark 的分配算子在 filter 之類的算子后面,則可能造成某個 pipeline 在中間斷流,也造成了無法正常提取 watermark 的情形。針對這種場景,我們在提取 watermark 的算子上也實現了定時檢測流 idle 的功能。這樣就算因為某個分區的數據都被過濾掉造成空閑,也不至于對下游的計算產生影響。
我們介紹的下一個特性是 Framework 與用戶業務日志的分離。這個特性其實最受益的是 Standalone 部署模式,因為這種模式下多 job 的 task 是混合部署在同一個 TaskManager 中的,而 TaskManager 本身只使用一個日志文件來記錄日志。所以,這導致排查業務問題非常麻煩。另外,我們對 Flink web UI 展示日志文件也做了一些改進,我們會列出 JobManager 以及 TaskManager 的日志文件夾中所有的文件列表。這是因為,隨著流應用長時間運行,累積的日志量會越來越大,我們通常都會對應用的日志配置滾動策略,除此之外我們還會輸出 GC 日志等,而 Flink 的 web UI 默認只能展示最新的那個日志文件,這對于我們定位問題很不方便。所以,我們引入了一個新的 tab,它能夠列出日志文件夾下的文件列表,點擊后再請求特定的日志文件。
在分析這個特性的實現之前,我們需要先了解 Flink 目前加載日志框架類的方式,它為了避免跟業務 Job 中可能包含的日志框架的依賴、配置文件產生沖突,日志相關類的加載都代理給平臺的類加載器,也就是 TaskManager 的類加載器,而 TaskManager 本身加載的這些類都是從 Flink 安裝包的 lib 底下加載的。而關于日志配置文件,Flink 通過 JVM 啟動參數來指定配置文件路徑以及日志文件路徑。這些機制共同保證了 Flink 不會受到用戶 job jar 的干擾。
所以,如果我們要實現日志分離,我們就需要打破 Flink 原先的實現機制,關鍵點在于:為不同 Job 的 Task 加載不同的日志類;為不同 Job 的 Task 指定不同的配置文件以及用戶日志文件的路徑。這意味著我們需要定制 Flink 自帶的 user classloader。針對第一點,我們不再將這些日志類的加載代理給平臺的加載器,而是將平臺類加載器中日志相關的 jar 的 classpath 加入到各個 task 自己的 classloader 中。關于配置文件,我們顯然也不能用 Flink 平臺的配置文件。我們會拿平臺使用的配置文件作為模板,對其內部的日志路徑進行動態修改,然后將內存中的這個配置文件傳遞給特定的日志框架。那么這里就有一個問題,內存中的配置文件二進制數據怎么被日志框架讀取。log4j 以及 logback 都可以接收配置文件的 URL 表示,而 URL 也可以接收一個 URLStreamHandler 的實現(它是所有流協議的處理器用于連接二進制的數據流與 URL),通過效仿 bytebuddy(一個動態修改 Java 二進制字節碼的類庫),我們實現了 ByteArrayUrlStreamHandler 來進行二進制的配置文件跟 URL 之間的銜接,這兩點完成后不同 Job 的 Task 的類加載器就保證了日志類加載和配置的完全獨立性。
目前,我們內部所定制優化的一些特性有些已逐步反饋給社區,還有一些比較大的改動也在跟社區商討合并計劃。我們歡迎有志于迎接萬億級數據規模挑戰以及參與 Flink 引擎研發的同學加入我們。
總結
以上就是所有的分享內容,由于表達與時間關系,可能很多點不能很好地闡述清楚,如果大家有想法我們可以再線下交流。也歡迎大家掃碼了解騰訊的大數據產品、騰訊云上的流計算服務以及 Oceanus 的功能。
轉載于:https://www.cnblogs.com/davidwang456/articles/10953252.html
總結
以上是生活随笔為你收集整理的腾讯基于 Flink 的实时流计算平台演进之路的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: “分库分表 ?选型和流程要慎重,否则会失
- 下一篇: MongoDB如何一次插入多条json数