Flink State 误用之痛,你中招了吗?
本文主要討論一個問題:ValueState 中存 Map 與 MapState 有什么區別?
如果不懂這兩者的區別,而且使用 ValueState 中存大對象,生產環境很可能會出現以下問題:
· CPU 被打滿
· 吞吐上不去
1、 結論
從性能和 TTL 兩個維度來描述區別。
性能
· RocksDB 場景,MapState 比 ValueState 中存 Map 性能高很多。
· 生產環境強烈推薦使用 MapState,不推薦 ValueState 中存大對象
· ValueState 中存大對象很容易使 CPU 打滿
· Heap State 場景,兩者性能類似。
TTL
Flink 中 State 支持設置 TTL:
· MapState 的 TTL 是基于 UK 級別的
· ValueState 的 TTL 是基于整個 key 的
舉一反三
能使用 ListState 的場景,不要使用 ValueState 中存 List。大佬們已經把 MapState 和 ListState 性能都做了很多優化,高性能不香嗎?下文會詳細分析 ValueState 和 MapState 底層的實現原理,通過分析原理得出上述結論。
2、 State 中要存儲哪些數據
ValueState 會存儲 key、namespace、value,縮寫為 。MapState 會存儲 key、namespace、userKey、userValue,縮寫為 。
解釋一下上述這些名詞。
Key
ValueState 和 MapState 都是 KeyedState,也就是 keyBy 后才能使用 ValueState 和 MapState。所以 State 中肯定要保存 key。
例如:按照 app 進行 keyBy,總共有兩個 app,分別是:app1 和 app2。那么狀態存儲引擎中肯定要存儲 app1 或 app2,用于區分當前的狀態數據到底是 app1 的還是 app2 的。
這里的 app1、app2 也就是所說的 key。
Namespace
Namespace 用于區分窗口。
假設需要統計 app1 和 app2 每個小時的 pv 指標,則需要使用小時級別的窗口。狀態引擎為了區分 app1 在 7 點和 8 點的 pv 值,就必須新增一個維度用來標識窗口。
Flink 用 Namespace 來標識窗口,這樣就可以在狀態引擎中區分出 app1 在 7 點和 8 點的狀態信息。
Value、UserKey、UserValue
ValueState 中存儲具體的狀態值。也就是上述例子中對應的 pv 值。MapState 類似于 Map 集合,存儲的是一個個 KV 鍵值對。為了與 keyBy 的 key 進行區分,所以 Flink 中把 MapState 的 key、value 分別叫 UserKey、UserValue。
下面講述狀態引擎是如何存儲這些數據的。
3、StateBackend 如何存儲和讀寫State 數據
Flink 支持三種 StateBackend,分別是:MemoryStateBackend、FsStateBackend 和 RocksDBStateBackend。
其中 MemoryStateBackend、FsStateBackend 兩種 StateBackend 在任務運行期間都會將 State 存儲在內存中,兩者在 Checkpoint 時將快照存儲的位置不同。RocksDBStateBackend 在任務運行期間將 State 存儲在本地的 RocksDB 數據庫中。
所以下文將 MemoryStateBackend、FsStateBackend 統稱為 heap 模式,RocksDBStateBackend 稱為 RocksDB 模式。
3.1 Heap 模式 ValueState 和 MapState 如何存儲
Heap 模式表示所有的狀態數據都存儲在 TM 的堆內存中,所有的狀態都存儲的原始對象,不會做序列化和反序列化。(注:Checkpoint 的時候會涉及到序列化和反序列化,數據的正常讀寫并不會涉及,所以這里先不討論。)
Heap 模式下,無論是 ValueState 還是 MapState 都存儲在 CopyOnWriteStateMap 中。
· key 、 Namespace 分別對應 CopyOnWriteStateMap 的 K、N。
· ValueState 的 value 對應 CopyOnWriteStateMap 的 V。
MapState 將會把整個 Map 作為 CopyOnWriteStateMap 的 V,相當于 Flink 引擎創建了一個 HashMap 用于存儲 MapState 的 KV 鍵值對。
具體 CopyOnWriteStateMap 是如何實現的,可以參考:萬字長文詳解 Flink 中的 CopyOnWriteStateTable。
回到正題:Heap 模式下,ValueState 中存 Map 與 MapState 有什么區別?
Heap 模式下沒有區別。
ValueState 中存 Map,相當于用戶手動創建了一個 HashMap 當做 V 放到了狀態引擎中。而 MapState 是 Flink 引擎幫用戶創建了一個 HashMap 當做 V 放到了狀態引擎中。
所以實質上 ValueState 中存 Map 與 MapState 都是一樣的,存儲結構都是 CopyOnWriteStateMap。區別在于 ValueState 是用戶手動創建 HashMap,MapState 是 Flink 引擎創建 HashMap。
3.2 RocksDB 模式 ValueState 和 MapState 如何存儲
RocksDB 模式表示所有的狀態數據存儲在 TM 本地的 RocksDB 數據庫中。RocksDB 是一個 KV 數據庫,且所有的 key 和 value 都是 byte 數組。所以無論是 ValueState 還是 MapState,存儲到 RocksDB 中都必須將對象序列化成二進制當前 kv 存儲在 RocksDB 中。
■ 3.2.1 ValueState 如何映射成 RocksDB 的 kv
ValueState 有 key、namespace、value 需要存儲,所以最簡單的思路:
1、將 ValueState 的 key 序列化成 byte 數組
2、將 ValueState 的 namespace 序列化成 byte 數組
3、將兩個 byte 數組拼接起來做為 RocksDB 的 key
4、將 ValueState 的 value 序列化成 byte 數組做為 RocksDB 的 value
然后就可以寫入到 RocksDB 中。
查詢數據也用相同的邏輯:將 key 和 namespace 序列化后拼接起來作為 RocksDB 的 key,去 RocksDB 中進行查詢,查詢到的 byte 數組進行反序列化就得到了 ValueState 的 value。
這就是 RocksDB 模式下,ValueState 的讀寫流程。
■ 3.2.2 MapState 如何映射成 RocksDB 的 kv
MapState 有 key、namespace、userKey、userValue 需要存儲,所以最簡單的思路:
1、將 MapState 的 key 序列化成 byte 數組
2、將 MapState 的 namespace 序列化成 byte 數組
3、將 MapState 的 userKey 序列化成 byte 數組
4、將三個 byte 數組拼接起來做為 RocksDB 的 key
5、將 MapState 的 value 序列化成 byte 數組做為 RocksDB 的 value
然后就可以寫入到 RocksDB 中。
查詢數據也用相同的邏輯:將 key、namespace、userKey 序列化后拼接起來作為 RocksDB 的 key,去 RocksDB 中進行查詢,查詢到的 byte 數組進行反序列化就得到了 MapState 的 userValue。
這就是 RocksDB 模式下,MapState 的讀寫流程。
3.3 RocksDB 模式下,ValueState 中存 Map 與 MapState 有什么區別?
■ 3.3.1 假設 Map 集合有 100 個 KV 鍵值對,具體兩種方案會如何存儲數據?
ValueState 中存 Map,Flink 引擎會把整個 Map 當做一個大 Value,存儲在 RocksDB 中。對應到 RocksDB 中,100 個 KV 鍵值對的 Map 集合會序列化成一個 byte 數組當做 RocksDB 的 value,存儲在 RocksDB 的 1 行數據中。
MapState 會根據 userKey,將 100 個 KV 鍵值對分別存儲在 RocksDB 的 100 行中。
■ 3.3.2 修改 Map 中的一個 KV 鍵值對的流程
ValueState 的情況,雖然要修改 Map 中的一個 KV 鍵值對,但需要將整個 Map 集合從 RocksDB 中讀出來。具體流程如下:
1、將 key、namespace 序列化成 byte 數組,生成 RocksDB 的 key
2、從 RocksDB 讀出 key 對應 value 的 byte 數組
3、將 byte 數組反序列化成整個 Map
4、堆內存中修改 Map 集合
5、將 Map 集合寫入到 RocksDB 中,需要將整個 Map 集合序列化成 byte 數組,再寫入
MapState 的情況,要修改 Map 中的一個 KV 鍵值對,根據 key、namespace、userKey 即可定位到要修改的那一個 KV 鍵值對。具體流程如下:
1、將 key、namespace、userKey 序列化成 byte 數組,生成 RocksDB 的 key
2、從 RocksDB 讀出 key 對應 value 的 byte 數組
3、將 byte 數組反序列化成 userValue
4、堆內存中修改 userValue 的值
5、將 userKey、userValue 寫入到 RocksDB 中,需要先序列化,再寫入
■ 3.3.3 結論
要修改 Map 中的一個 KV 鍵值對:
如果使用 ValueState 中存 Map,則每次修改操作需要序列化反序列化整個 Map 集合,每次序列化反序列大對象會非常耗 CPU,很容易將 CPU 打滿。
如果使用 MapState,每次修改操作只需要序列化反序列化 userKey 那一個 KV 鍵值對的數據,效率較高。
舉一反三:其他使用 ValueState、value 是大對象且 value 頻繁更新的場景,都容易將 CPU 打滿。
例如:ValueState 中存儲的位圖,如果每條數據都需要更新位圖,則可能導致 CPU 被打滿。
為了便于理解,上述忽略了一些實現細節,下面補充一下。
3.4 直接拼接 key 和 namespace 可能導致 RocksDB 的 key 沖突
假設 ValueState 中有兩個數據:
· key1 序列化后的二進制為 0x112233, namespace1 序列化后的二進制為0x4455
· key2 序列化后的二進制為 0x1122, namespace2 序列化后的二進制為0x334455
這兩個數據對應的 RocksDB key 都是 0x1122334455,這樣的話,兩個不同的 key、namespace 映射到 RocksDB 中變成了相同的數據,無法做區分。
解決方案:
在 key 和 namespace 中間寫入 key 的 byte 數組長度,在 namespace 后寫入 namespace 的 byte 長度。
寫入這兩個長度就不可能出現 key 沖突了,具體為什么,讀者可以自行思考。
3.5 RocksDB 的 key 中還會存儲 KeyGroupId
對 KeyGroup 不了解的同學可以參考:Flink 源碼:從 KeyGroup 到 Rescale。
加上 KeyGroupId 也比較簡單。只需要修改 RocksDB key 的拼接方式,在序列化 key 和 namespace 之前,先序列化 KeyGroupId 即可。
4. State TTL 簡述
Flink 中 TTL 的實現,都是將用戶的 value 封裝了一層,具體參考下面的 TtlValue 類:
public class TtlValue<T> implements Serializable {@Nullableprivate final T userValue;private final long lastAccessTimestamp; }TtlValue 類中有兩個字段,封裝了用戶的 value 且有一個時間戳字段,這個時間戳記錄了這條數據寫入的時間。
如果開啟了 TTL,則狀態中存儲的 value 就是 TtlValue 對象。時間戳字段也會保存到狀態引擎中,之后查詢數據時,就可以通過該時間戳判斷數據是否過期。
· ValueState 將 value 封裝為 TtlValue。
· MapState 將 userValue 封裝成 TtlValue。
· ListState 將 element 封裝成 TtlValue。
ValueState 中存 Map 與 MapState 有什么區別?
如果 ValueState 中存 Map,則整個 Map 被當做 value,只維護一個時間戳。所以要么整個 Map 過期,要么都不過期。
MapState 中如果存儲了 100 個 KV 鍵值對,則 100 個 KV 鍵值對都會存儲各自的時間戳。因此每個 KV 鍵值對的 TTL 是相互獨立的。
5.總結
本文從實現原理詳細分析了 ValueState 中存 Map 與 MapState 有什么區別?下面將從性能和 TTL 兩個維度來描述兩者的區別。
性能
· RocksDB 場景,MapState 比 ValueState 中存 Map 性能高很多,ValueState 中存大對象很容易使 CPU 打滿
· Heap State 場景,兩者性能類似
TTL
Flink 中 State 支持設置 TTL,TTL 只是將時間戳與 userValue 封裝起來。
· MapState 的 TTL 是基于 UK 級別的
· ValueState 的 TTL 是基于整個 key 的
不過,其實 ListState 的數據映射到 RocksDB 比較復雜,用到了 RocksDB 的 merge 特性,比較有意思,有興趣的同學可以閱讀 RocksDB wiki《Merge Operator Implementation》,鏈接:
https://github.com/facebook/rocksdb/wiki/Merge-Operator-Implementation
更多 Flink 技術交流可加入 Apache Flink 社區釘釘交流群:
原文鏈接:https://developer.aliyun.com/article/777445?
版權聲明:本文內容由阿里云實名注冊用戶自發貢獻,版權歸原作者所有,阿里云開發者社區不擁有其著作權,亦不承擔相應法律責任。具體規則請查看《阿里云開發者社區用戶服務協議》和《阿里云開發者社區知識產權保護指引》。如果您發現本社區中有涉嫌抄襲的內容,填寫侵權投訴表單進行舉報,一經查實,本社區將立刻刪除涉嫌侵權內容。總結
以上是生活随笔為你收集整理的Flink State 误用之痛,你中招了吗?的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 网易云音乐基于 Flink + Kafk
- 下一篇: 云湖共生,下一代数据湖来了?