SmartNews:基于 Flink 加速 Hive 日表生产的实践
簡介:?將 Flink 無縫地集成到以 Airflow 和 Hive 為主的批處理系統的技術挑戰和應對方案。
本文介紹了 SmartNews 利用 Flink 加速 Hive 日表的生產,將 Flink 無縫地集成到以 Airflow 和 Hive 為主的批處理系統的實踐。詳細介紹過程中遇到的技術挑戰和應對方案,以供社區分享。主要內容為:
一、項目背景
SmartNews 是一家機器學習驅動的互聯網公司。自 2012 年于日本東京成立,并在美國和中國設有辦公室。經過 8 年多的發展,SmartNews 已經成長為日本排名第一,美國成長最快的新聞類應用,覆蓋全球超過 150 多個國家市場。據 2019 年初統計,SmartNews 的 iOS 和 Android 版本全球累計下載量已經超過 5000 萬次。
SmartNews 在過去 9 年的時間,基于 Airflow, Hive, EMR 等技術棧構建了大量的數據集。隨著數據量的增長,這些離線表的處理時間在逐漸拉長。另外,隨著業務方迭代節奏的加快,對表的實時性也提出了更高的要求。因此,SmartNews 內部發起了 Speedy Batch 的項目,以加快現有離線表生產效率。
本次分享便是 Speedy Batch 項目中的一個例子,加速用戶行為 (actions) 表的實踐。
APP 端上報的用戶行為日志,每日通過 Hive 作業生成日表,這個表是許多其他表的源頭,至關重要。這個作業需要運行 3 個小時,進而拉高了許多下游表的延遲 (Latency),明顯影響數據科學家、產品經理等用戶的使用體驗。因此我們需要對這些作業進行提速,讓各個表能更早可用。
公司業務基本上都在公有云上,服務器的原始日志以文件形式上傳至云存儲,按日分區;目前的作業用 Airflow 調度到 EMR 上運行,生成 Hive 日表,數據存儲在云存儲。
二、問題的定義
1. 輸入
新聞服務器每隔 30 秒上傳一個原始日志文件,文件上傳至相應日期和小時的云存儲目錄。
2. 輸出
原始日志經過 ETL 處理之后,按日 (dt) 和行為 (action) 兩級分區輸出。action 種類約 300 個,不固定,常有增減。
3. 用戶
對這個表的使用是廣泛的,多途徑的。有從 Hive 里查詢,也有從 Presto,Jupyter 和 Spark 里查詢,我們甚至不能確定以上就是全部的訪問途徑。
三、項目的目標
對下游用戶保持透明。透明又分兩個方面:
- 功能方面:用戶無需修改任何代碼,做到完全無感
- 性能方面:新項目產生的表,不應該導致下游讀取時的性能下降
四、技術選型
在本項目之前,同事已經對該作業做了多輪次改進,效果不是很顯著。
嘗試過的方案包括增加資源,投入更多的機器,但遇到了云存儲的 IOPS 限制:每個 prefix 最多支持 3000 個并發讀寫,這個問題在輸出階段尤為明顯,即多個 reducer 同時向同一個 action 子目錄輸出的時候,容易碰到這個限制。另外還嘗試了按小時預處理,然后到每日凌晨再合并成日表,但合并過程亦耗時較多,整體時延還是在 2.5 小時左右,效果不夠顯著。
鑒于服務器端的日志是近實時上傳至云存儲,團隊提出了流式處理的思路,摒棄了批作業等待一天、處理 3 小時的模式,而是把計算分散在一整天,進而降低當天結束后的處理用時。團隊對 Flink 有比較好的背景,加上 Flink 近期對 Hive 的改進較多,因此決定采用基于 Flink 的方案。
五、技術挑戰
挑戰是多方面的。
1. 輸出 RC 文件格式
當前 Hive 表的文件格式為 RCFile,為了保證對用戶的透明,我們只能在現有的 Hive 表上做 in-place 的 upgrade,也就是我們得重用當前表,那么 Flink 輸出的文件格式也得符合 RCFile 格式,因為一張 Hive 表只能有一個格式。
RCFile 屬于 bulk format (相對應的是 row format),在每次 checkpoint 時必須一次性輸出。如果我們選擇 5 分鐘一次 checkpoint,那么每個 action 每 5 分鐘必須輸出一個文件,這會大量增加結果文件數,進而影響下游的讀取性能。特別是對于低頻 action,文件數會上百倍的增加。我們了解了 Flink 的文件合并功能,但那是在一個 checkpoint 內多個 sink 數據的合并,這并不能解決我們的問題,我們需要的是跨 checkpoint 的文件合并。
團隊考慮過以 row format (e.g. CSV) 輸出,然后實現自定義的 Hive SerDe,使之兼容 RCFile 和 CSV。但很快我們放棄了這個設想,因為那樣的話,需要為每個查詢場景實現這個 Hybrid 的 SerDe,例如需要為 Presto 實現,為 Spark 實現,等等。
- 一方面我們沒法投入這么多資源;
- 另一方面那種方案也是用戶有感的,畢竟用戶還是需要安裝這個自定義的 SerDe。
我們之前提出了生成一個新格式的表,但也因為對用戶不夠透明而被否決。
2. Partition 的可感知性和完整性
如何讓下游作業能感知到當天這個 partition 已經 ready?actions 表分兩級 partition, dt 和 action。action 屬于 Hive 的 dynamic partition,數量多且不固定。當前 Airflow 下游作業是等待 insert_actions 這個 Hive 任務完成后,再開始執行的。這個沒問題,因為 insert_actions 結束時,所有 action 的 partition 都已經 ready 了。但對于 Flink 作業來說,沒有結束的信號,它只能往 Hive 里面提交一個個的 partition,如 dt=2021-05-29/action=refresh。因為 action 數量多,提交 partition 的過程可能持續數分鐘,因此我們也不能讓 Airflow 作業去感知 dt 級別的 partition,那樣很可能在只有部分 action 的情況下觸發下游。
3. 流式讀取云存儲文件
項目的輸入是不斷上傳的云存儲文件,并非來自 MQ (message queue)。Flink 支持 FileStreamingSource,可以流式的讀入文件,但那是基于定時 list 目錄以發現新的文件。但這個方案不適合我們的場景,因為我們的目錄太大,云存儲 list 操作根本無法完成。
4. Exactly Once 保證
鑒于 actions 表的重要性,用戶無法接受任何的數據丟失或者重復,因此整個方案需要保證恰好一次的處理。
六、整體方案及挑戰應對
1. 輸出 RCFile 并且避免小文件
我們最終選擇的方案是分兩步走,第一個 Flink 作業以 json (row format) 格式輸出,然后用另外一個 Flink 作業去做 Json 到 RC 格式的轉化。以此解決 Flink 不能愉快的輸出合適大小 RC 文件的問題。
輸出 json 的中間結果,這樣我們可以通過 Rolling Policy 控制輸出文件的大小,可以跨多個 checkpoint 攢成足夠大,或者時間足夠長,然后再輸出到云存儲。這里 Flink 其實利用的是云存儲的 Multi Part Upload (MPU) 的功能,即每次 checkpoint Flink 也是把當前 checkpoint 攢下來的數據上傳至 云存儲,但輸出的不是文件,而是一個 part。最后當多個 part 達到大小或者時間要求,就可以調用云存儲的接口將多個 part 合并成一個文件,這個合并操作在云存儲端完成,應用端無需再次讀取這個 part 到本地合并然后再上傳。而 Bulk format 均需要一次性全局處理,因此無法分段上傳然后合并,必須一次性全部上傳。
當第二個作業感知到一個新的 json 文件上傳后,加載它,轉化成 RCFile,然后上傳到最終的路徑。這個過程帶來的延遲較小,一個文件可以控制在 10s 以內,這是可以接受的。
2. 優雅的感知輸入文件
輸入端,沒有采用 Flink 的 FileStreamingSource,而是采用云存儲的 event notification 來感知新文件的產生,接受到這個通知后再主動去加載文件。
3. Partition 的可感知性和完整性
輸出端,我們輸出 dt 級別的 success file,來讓下游可靠地感知日表的 ready。我們實現自定義的 StreamingFileWriter,使之輸出 partitionCreated 和 partitionInactive 的信號,并且通過實現自定義的 PartitionCommitter,來基于上述信號判斷日表的結束。
其機制如下,每個云存儲 writer 開始寫某個 action,會發出一個 partitionCreated 信號,當它結束時又發出 partitionInactive 信號。PartitionCommitter 判斷某一天之內是否所有的 partittion 都 inactive 了,如果是,則一天的數據都處理了,輸出 dt 級別的 success file,在 Airflow 通過感知這個文件來判斷 Flink 是否完成了日表的處理。
4. Exactly Once
云存儲的 event notification 提供 At Least once 保證。Flink 作業內對文件級別進行去重,作業采用 Exactly Once 的 checkpoint 設定,云存儲文件輸出基于 MPU 機制等價于支持 truncate,因此云存儲輸出等價于冪等,因此等價于端到端的 Exactly Once。
七、項目成果和展望
項目已經上線,時延維持在 34 分鐘上下,其中包括 15 分鐘的等待遲到文件。
- 第一個 Flink 作業需要 8 分鐘左右完成 checkpoint 和輸出,json 轉 rc 作業需要 12 分鐘完成全部處理。我們可以把這個時間繼續壓縮,但是綜合時效性和成本,我們選擇當前的狀態。
- json 轉 rc 作業耗時比當初的預想的要大,因為上游作業最后一個 checkpoint 輸出太多的文件,導致整體耗時長,這個可以通過增加作業的并發度線性的下降。
- 輸出的文件數比批作業輸出的文件數有所增加,增加 50% 左右。這是流式處理于批處理的劣勢,流式處理需要在時間到達時就輸出一個文件,而此時文件大小未必達到預期。好在這個程度的文件數增加不明顯影響下游的性能。
- 做到了下游的完全透明,整個上線前后,沒有收到任何用戶異常反饋。
該項目讓我們在生產環境驗證了利用流式處理框架 Flink 來無縫介入批處理系統,實現用戶無感的局部改進。將來我們將利用同樣的技術,去加速更多其他的 Hive 表的生產,并且廣泛提供更細粒度 Hive 表示的生產,例如小時級。另一方面,我們將探索利用 data lake 來管理批流一體的數據,實現技術棧的逐步收斂。
八、后記
由于采用完全不同的計算框架,且需要與批處理系統完全保持一致,團隊踩過不少的坑,限于篇幅,無法一一列舉。因此我們挑選幾個有代表的問題留給讀者思考:
- 為了驗證新作業產出的結果與原來 Hive 產出一致,我們需要對比兩者的輸出。那么,如何才能高效的比較兩個 Hive 表的一致性呢?特別是每天有百億級數據,每條有數百個字段,當然也包含復雜類型 (array, map, array等)。
- 兩個 Flink 作業的 checkpoint 模式都必須是 Exactly Once 嗎?哪個可以不是,哪個必須是?
- StreamFileWriter 只有在 checkpoint 時才接受到 partitionCreated 和 partitionInactive 信號,那么我們可以在它的 snapshotState() 函數里面輸出給下游 (下游會保存到 state) 嗎?
- 最后一問:你們有更好的方案可供我們參考嗎?
原文鏈接
本文為阿里云原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的SmartNews:基于 Flink 加速 Hive 日表生产的实践的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 简单、有效、全面的Kubernetes监
- 下一篇: 阿里云云效技术专家分享:云原生开发、调测