快手基于 Apache Flink 的优化实践
本次由快手劉建剛老師分享,內容主要分為三部分。首先介紹流式計算的基本概念, 然后介紹 Flink 的關鍵技術,最后講講 Flink 在快手生產實踐中的一些應用,包括實時指標計算和快速 failover。
?
一、流式計算的介紹
?
流式計算主要針對 unbounded data(無界數據流)進行實時的計算,將計算結果快速的輸出或者修正。
?
這部分將分為三個小節來介紹。第一,介紹大數據系統發展史,包括初始的批處理到現在比較成熟的流計算;第二,為大家簡單對比下批處理和流處理的區別;第三,介紹流式計算里面的關鍵問題,這是每個優秀的流式計算引擎所必須面臨的問題。
?
1、大數據系統發展史
?
?
上圖是 2003 年到 2018 年大數據系統的發展史,看看是怎么一步步走到流式計算的。
?
2003 年,Google 的 MapReduce 橫空出世,通過經典的 Map&Reduce 定義和系統容錯等保障來方便處理各種大數據。很快就到了 Hadoop,被認為是開源版的 ?MapReduce, 帶動了整個apache開源社區的繁榮。再往后是谷歌的 Flume,通過算子連接等 pipeline 的方式解決了多個 MapReduce 作業連接處理低效的問題。
?
流式系統的開始以 Storm 來介紹。Storm 在2011年出現, 具備延時短、性能高等特性, 在當時頗受喜愛。但是 Storm 沒有提供系統級別的 failover 機制,無法保障數據一致性。那時的流式計算引擎是不精確的,lamda 架構組裝了流處理的實時性和批處理的準確性,曾經風靡一時,后來因為難以維護也逐漸沒落。
?
接下來出現的是 Spark Streaming,可以說是第一個生產級別的流式計算引擎。Spark Streaming 早期的實現基于成熟的批處理,通過 mini batch 來實現流計算,在 failover 時能夠保障數據的一致性。
?
Google 在流式計算方面有很多探索,包括 MillWheel、Cloud Dataflow、Beam,提出了很多流式計算的理念,對其他的流式計算引擎影響很大。
?
再來看 Kafka。Kafka 并非流式計算引擎,但是對流式計算影響特別大。Kafka 基于log 機制、通過 partition 來保存實時數據,同時也能存儲很長時間的歷史數據。流式計算引擎可以無縫地與kafka進行對接,一旦出現 Failover,可以利用 Kafka 進行數據回溯,保證數據不丟失。另外,Kafka 對 table 和 stream 的探索特別多,對流式計算影響巨大。
?
Flink 的出現也比較久,一直到 2016 年左右才火起來的。Flink 借鑒了很多 Google 的流式計算概念,使得它在市場上特別具有競爭力。后面我會詳細介紹 Flink 的一些特點。
?
2、批處理與流計算的區別
?
批處理和流計算有什么樣的區別,這是很多同學有疑問的地方。我們知道 MapReduce 是一個批處理引擎,Flink 是一個流處理引擎。我們從四個方面來進行一下對比:
?
1)使用場景
?
MapReduce 是大批量文件處理,這些文件都是 bounded data,也就是說你知道這個文件什么時候會結束。相比而言,Flink 處理的是實時的 unbounded data,數據源源不斷,可能永遠都不會結束,這就給數據完備性和 failover 帶來了很大的挑戰。
?
2)容錯
?
MapReduce 的容錯手段包括數據落盤、重復讀取、最終結果可見等。文件落盤可以有效保存中間結果,一旦 task 掛掉重啟就可以直接讀取磁盤數據,只有作業成功運行完了,最終結果才對用戶可見。這種設計的哲理就是你可以通過重復讀取同一份數據來產生同樣的結果,可以很好的處理 failover。
?
Flink 的容錯主要通過定期快照和數據回溯。每隔一段時間,Flink就會插入一些 barrier,barrier 從 source 流動到 sink,通過 barrier 流動來控制快照的生成。快照制作完就可以保存在共享引擎里。一旦作業出現問題,就可以從上次快照進行恢復,通過數據回溯來重新消費。
?
3)性能
?
MapReduce 主要特點是高吞吐、高延時。高吞吐說明處理的數據量非常大;高延時就是前面說到的容錯問題,它必須把整個作業處理完才對用戶可見。
?
Flink 主要特點是高吞吐、低延時。在流式系統里,Flink 的吞吐是很高的。同時,它也可以做到實時處理和輸出,讓用戶快速看到結果。
?
4)計算過程
?
MapReduce 主要通過 Map 和 reduce 來計算。Map 負責讀取數據并作基本的處理, reduce 負責數據的聚合。用戶可以根據這兩種基本算子,組合出各種各樣的計算邏輯。
?
Flink 為用戶提供了 pipeline 的 API 和批流統一的 SQL。通過 pipeline ?的 API, 用戶可以方便地組合各種算子構建復雜的應用;Flink SQL 是一個更高層的 API 抽象,極大地降低了用戶的使用門檻。
?
3、流式計算的關鍵問題
?
這部分主要通過四個問題給大家解答流式計算的關鍵問題,也是很多計算引擎需要考慮的問題。
?
1)What
?
What 是指通過什么樣的算子來進行計算。主要包含三個方面的類型,element-wise 表示一對一的計算,aggregating 表示聚合操作,composite 表示多對多的計算。
?
2)Where
?
aggregating 會進行一些聚合的計算, 主要是在各種 window 里進行計算。窗口包含滑動窗口、滾動窗口、會話窗口。窗口會把無界的數據切分成有界的一個個數據塊進行處理,后面我們會詳細介紹這點。
?
3)When
?
When 就是什么時候觸發計算。窗口里面有數據,由于輸入數據是無窮無盡的,很難知道一個窗口的數據是否全部到達了。流式計算主要通過 watermark 來保障數據的完備性,通過 trigger 來決定何時觸發。當接收到數值為 X 的 Watermark 時,可以認為所有時間戳小于等于X的事件全部到達了。一旦 watermark 跨過窗口結束時間,就可以通過 trigger 來觸發計算并輸出結果。
?
4)How
?
How 主要指我們如何重新定義同一窗口的多次觸發結果。前面也說了 trigger 是用來觸發窗口的, 一個窗口可能會被觸發多次,比如1分鐘的窗口每 10 秒觸發計算一次。處理方式主要包含三種:
?
- Discarding,丟棄之前的狀態重新計算。這種方式每次的觸發結果都是互不關聯的,多次觸發結果的組合反映了全部的窗口內容,下游一般會再次聚合;
- Accumulating,這個就是一個聚合的狀態,比如說第二次觸發的時候是在第一次的結果上進行計算的,下游只需要保存最新的結果即可;
- Accumulating 和 retracting,這個主要在 Accumulating 的基礎上加了一個 retracting,retracting 的意思就是撤銷。窗口再次觸發時,會告訴下游撤銷上一次的計算結果,并告知最新的結果。Flink SQL 的聚合就使用了這種 retract的模式。
?
二、Flink 關鍵技術
?
1、Flink 簡介
?
Flink 是一款分布式計算引擎, 既可以進行流式計算,也可以進行批處理。下圖是官網對 Flink 的介紹:
?
?
Flink 可以運行在 k8s、yarn、mesos 等資源調度平臺上,依賴 hdfs 等文件系統,輸入包含事件和各種其他數據,經過 Flink 引擎計算后再輸出到其他中間件或者數據庫等。
?
Flink 有兩個核心概念:
?
- State:Flink 可以處理有狀態的數據,通過自身的 state 機制來保障作業failover時數據不丟失;
- Event Time:允許用戶按照事件時間來處理數據,通過 watermark 來推動時間前進,這個后面還會詳細介紹。主要是系統的時間和事件的時間。
?
Flink 主要通過上面兩個核心技術來保證 exactly-once, 比如說作業 Failover 的時候狀態不丟失,就好像沒發生故障一樣。
?
2、快照機制
?
Flink 的快照機制主要是為了保障作業 failover 時不丟失狀態。Flink 提供了一種輕量級的快照機制,不需要停止作業就可以幫助用戶持久化內存中的狀態數據。
?
?
上圖中的 markers(與 barrier 語義相同)通過流動來觸發快照的制作,每一個編號都代表了一次快照,比如編號為 n 的 markers 從最上游流動到最下游就代表了一次快照的制作過程。簡述如下:
?
- 系統發送編號為 n 的 markers 到最上游的算子,markers 隨著數據往下游流動;
- 當下游算子收到 marker 后,就開始將自身的狀態保存到共享存儲中;
- 當所有最下游的算子接收到 marker 并完成算子快照后,本次作業的快照制作完成。
?
一旦作業失敗,重啟時就可以從快照恢復。
?
下面為一個簡單的 demo 說明(barrier 等同于 marker)。
?
?
- barrier 到達 Source,將狀態 offset=7 存儲到共享存儲;
- barrier 到達 Task,將狀態 sum=21 存儲到共享存儲;
- barrier 到達 Sink,commit 本次快照,標志著快照的成功制作。
?
?
這時候突然間作業也掛掉, 重啟時 Flink 會通過快照恢復各個狀態。Source 會將自身的 offset 置為 7,Task 會將自身的 sum 置為 21。現在我們可以認為 1、2、3、4、5、6 這 6 個數字的加和結果并沒有丟失。這個時候,offset 從 7 開始消費,跟作業失敗前完全對接了起來,確保了 exactly-once。
?
3、事件時間
?
時間類型分為兩種:
?
- Event time(事件時間),指事件發生的時間,比如采集數據時的時間;
- Processing time(系統時間),指系統的時間,比如處理數據時的時間。
?
如果你對數據的準確性要求比較高的話,采用 Event time 能保障 exactly-once。Processing Time 一般用于實時消費、精準性要求略低的場景,主要是因為時間生成不是 deterministic。
?
我們可以看下面的關系圖, X 軸是 Event time,Y 軸是 Processing time。理想情況下 Event time 和 Processing time 是相同的,就是說只要有一個事件發生,就可以立刻處理。但是實際場景中,事件發生后往往會經過一定延時才會被處理,這樣就會導致我們系統的時間往往會滯后于事件時間。這里它們兩個的差 Processing-time lag 表示我們處理事件的延時。
?
?
事件時間常用在窗口中,使用 watermark 來確保數據完備性,比如說 watermarker 值大于 window 末尾時間時,我們就可以認為 window 窗口所有數據都已經到達了,就可以觸發計算了。
?
?
比如上面 [0-10] 的窗口,現在 watermark 走到了 10,已經到達了窗口的結束,觸發計算 SUM=21。如果要是想對遲到的數據再進行觸發,可以再定義一下后面 late data 的觸發,比如說后面來了個 9,我們的 SUM 就等于 30。
?
4、窗口機制
?
窗口機制就是把無界的數據分成數據塊來進行計算,主要有三種窗口。
?
- 滾動窗口:固定大小的窗口,相鄰窗口沒有交集;
- 滑動窗口:每個窗口的大小是一樣的,但是兩個窗口之間會有重合;
- 會話窗口:根據活躍時間聚合而成的窗口, 比如活躍時間超過3分鐘新起一個窗口。窗口之間留有一定的間隔。
?
?
窗口會自動管理狀態和觸發計算,Flink 提供了豐富的窗口函數來進行計算。主要包括以下兩種:
?
- ProcessWindowFunction,全量計算會把所有數據緩存到狀態里,一直到窗口結束時統一計算。相對來說,狀態會比較大,計算效率也會低一些;
- AggregateFunction,增量計算就是來一條數據就算一條,可能我們的狀態就會特別的小,計算效率也會比 ProcessWindowFunction 高很多,但是如果狀態存儲在磁盤頻繁訪問狀態可能會影響性能。
?
?
三、快手 Flink 實踐
?
1、應用概括
?
快手應用概括主要是分為數據接入、Flink 實時計算、數據應用、數據展示四個部分。各層各司其職、銜接流暢,為用戶提供一體化的數據服務流程。
?
?
2、實時指標計算
?
常見的實時指標計算包括 uv、pv 和 sum。這其中 uv 的計算最為復雜也最為經典。下面我將重點介紹 uv。
?
uv 指的是不同用戶的個數,我們這邊計算的就是不同 deviceld 的個數,主要的挑戰來自三方面:
?
- 用戶數多,數據量大。活動期間的 QPS 經常在千萬級別,實際計算起來特別復雜;
- 實時性要求高,通常為幾秒到分鐘結果的輸出;
- 穩定性要求高,比如說我們在做春晚活動時候要求故障時間需要低于2%或更少。
?
針對各種各樣的 uv 計算,我們提供了一套成熟的計算流程。主要包含了三方面:
?
- 字典方案:將 string 類型的 deviceld 轉成 long 類型,方便后續的 uv 計算;
- 傾斜處理:比如某些大 V 會導致數據嚴重傾斜,這時候就需要打散處理;
- 增量計算:比如計算 1 天的 uv,每分鐘輸出一次結果。
?
字典方案需要確保任何兩個不同的 deviceId 不能映射到相同的 long 類型數字上。快手內部主要使用過以下三種方案:
?
?
- HBase, 基于 partition 分區建立 deviceld 到 id 的映射, 通過緩存和批量訪問來加速;
- Redis, 這種方案嚴格來說不屬于字典,主要通過 key-value 來判斷數據是否首次出現,基于首次數據來計算 uv,這樣就會把 pv 和 uv 的計算進行統一;
- 最后就是一個 Flink 內部自建的全局字典實現 deviceld 到 id 的轉換,之后計算UV。
?
這三種方案里面,前兩種屬于外部存儲的字典方案,優點是可以做到多個作業共享 1 份數據, 缺點是外部訪問慢而且不太穩定。最后一種 Flink 字典方案基于 state,不依賴外部存儲, 性能高但是無法多作業共享。
?
接下來我們重點介紹基于Flink自身的字典方案,下圖主要是建立一個 deviceld 到 id 的映射:
?
?
主要分成三步走:
?
1)建立 Partition 分區, 指定一個比較大的 Partition 分區個數,該個數比較大并且不會變,根據 deviceld 的哈希值將其映射到指定 partition。
?
2)建立 id 映射。每個 Partition 都有自己負責的 id 區間,確保 Partition 之間的long 類型的 id 不重復, partition 內部通過自增 id 來確保每個 deviceId 對應一個 id。
?
3)使用 keyed state 保存 id 映射。這樣我們的作業出現并發的大改變時,可以方便的 rescale,不需要做其他的操作。
?
除了 id 轉換,后面就是一個實時指標計算的常見問題,就是數據傾斜。業界常見的解決數據傾斜處理方案主要是兩種:
?
- 打散再聚合:先將傾斜的數據打散計算,然后再聚合計算結果;
- Local-aggregate:先在本地計算預聚合,這樣會大大減少下游的數據壓力。
?
二者的本質是一樣的,都是先預聚合再匯總,從而避免單點性能問題。
?
?
上圖為計算最小值的熱點問題,紅色數據為熱點數據。如果直接將它們打到同一個分區,會出現性能問題。為了解決傾斜問題,我們通過hash策略將數據分成小的 partition 來計算,如上圖的預計算,最后再將中間結果匯總計算。
?
當一切就緒后,我們來做增量的 UV 計算,比如計算 1 天 uv,每分鐘輸出 1 次結果。計算方式既可以采用 API,也可以采用 SQL。
?
針對 API,我們選擇了 global state+bitmap 的組合,既嚴格遵循了 Event Time 又減少了 state 大小:
?
?
下面為計算流程(需要注意時區問題):
?
- 定義跟觸發間隔一樣大小的 window(比如 1 分鐘);
- Global state 用來保存跨窗口的狀態,我們采用 bitmap 來存儲狀態;
- 每隔一個 window 觸發一次,輸出起始至今的 UV;
- 當前作用域(比如 1 天)結束,清空狀態重新開始。
?
針對 SQL,增量計算支持的還不是那么完善,但是可以利用 early-fire 的參數來提前觸發窗口。
?
配置如下:
?
table.exec.emit.early-fire.enabled: truetable.exec.emit.early-fire.delay:60 s?
early-fire.delay 就是每分鐘輸出一次結果的意思。
SQL 如下:
?
SELECT TUMBLE_ROWTIME(eventTime, interval ‘1’ day) AS rowtime, dimension, count(distinct id) as uv FROM person GROUP BY TUMBLE(eventTime, interval '1' day), dimension?
如果遇到傾斜,可以參考上一步來處理。
?
3、快速 failover
?
最后看下我們部門最近發力的一個方向,如何快速 failover。
?
Flink 作業都是 long-running 的在線作業,很多對可用性的要求特別高,尤其是跟公司核心業務相關的作業,SLA 要求 4 個 9 甚至更高。當作業遇到故障時,如何快速恢復對我們來說是一個巨大的挑戰。
?
下面分三個方面來展開:
?
- Flink 當前已有的快速恢復方案;
- 基于 container 宕掉的快速恢復;
- 基于機器宕掉的快速恢復。
?
1)Flink 當前已有的快速恢復方案
?
Flink 當前已有的快速恢復方案主要包括以下兩種:
?
- region failover。如果流式作業的 DAG 包含多個子圖或者 pipeline,那么 task 失敗時只會影響其所屬的子圖或者 pipeline ,而不用整個 DAG 都重新啟動;
- local recovery。在 Flink 將快照同步到共享存儲的同時,在本地磁盤也保存一份快照。作業失敗恢復時,可以調度到上次部署的位置,并從 local disk 進行快照恢復。
?
2)基于 container 宕掉的快速恢復
?
實際環境中, container 宕掉再申請有時會長達幾十秒,比如因為 hdfs 慢、yarn 慢等原因,嚴重影響恢復速度。為此,我們做了如下優化:
?
- 冗余資源。維持固定個數的冗余 container,一旦 container 宕掉,冗余 container 立刻候補上來,省去了繁雜的資源申請流程;
- 提前申請。一旦發現作業因為 container 宕掉而失敗,立刻申請新的 container 。
?
?
以上優化覆蓋了很大一部分場景,恢復時間從 30s-60s 降到 20s 以內。
?
3)基于機器宕掉的快速恢復
?
機器宕掉時,flink on yarn 的恢復時間超過 3 分鐘,這對重要作業顯然是無法容忍的!為了做到快速恢復,我們需要做到快速感知和恢復:
?
- 冗余資源并打散分配,確保兩個冗余資源不在一個 container,redundantContainerNum=max(containerNumOfHost) + 1;
- 作業宕機,Hawk 監測系統 5 秒內發現;
- 冗余資源快速候補,免去申請資源的流程。
?
?
通過這種方案,我們可以容忍任意一臺機器的宕機,并將宕機恢復時間由原先的 3 分鐘降低到 30 秒以內。
?
四、總結
?
本文從大數據系統的發展入手,進而延伸出流式系統的關鍵概念,之后介紹了 Flink的關鍵特性,最后講解了快手內部的實時指標計算和快速 failover,希望對大家有所幫助。
?
五、Q&A
?
Q1:打算做實時計算,可以跳過 Storm、Spark 直接上手 Flink 嗎?
?
A:可以直接使用 Flink。Storm 在 failover 時會丟失數據,無法做到 exactly-once;spark streaming 是 Flink 的競爭者,是在批處理的基礎上實現流計算,相比而言,Flink 的底層是流處理,更加適合流計算。
?
Q2:一般怎么處理 taskmanager heartbeat timeout?
?
A:默認 10 秒匯報一次心跳,心跳超時為 50 秒,這個時候作業會失敗,如果配置了高可用那么會重啟。
?
Q3:如何保證 2 天大時間跨度延遲消息的窗口計算?
?
A:這里主要的挑戰在于時間長、狀態大,建議 stateBakend 使用 Rocksdb(可以利用磁盤存儲大狀態),窗口計算建議使用增量計算來減少狀態的大小。
?
Q4:Flink on Yarn,Yarn 重啟會自動拉起 Flink 任務嗎,說不能拉起怎么處理,手動啟動嗎?
?
A:如果配置了高可用(依賴 zookeeper),作業失敗了就可以自動拉起。
?
Q5:Kafka 目前多用作數據中轉平臺,Flink 相當于替代了 Kafka Stream 嗎?
?
?A:Kafka的核心功能是消息中間件,kafka stream 可以跟 kafka 很好的集成,但并不是一個專業的計算引擎。相比而言,flink 是一個分布式的流式計算引擎,功能上更加強大。
?
Q6:你們怎么看待 Apache Beam?
?
A:Apache Beam 在上層進行了抽象,可以類比 SQL,只定義規范,底層可以接入各種計算引擎。 ? ?
?
原文鏈接
本文為阿里云原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的快手基于 Apache Flink 的优化实践的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【产品动态】一文详细解读智能数据构建产品
- 下一篇: 【详谈 Delta Lake 】系列技术