Flink EventTime和Watermarks原理结合代码分析(转载+解决+精简记录)
Apache Flink 框架保證Watermark單調遞增,算子接收到一個Watermark時候,框架知道不會再有任何小于該Watermark的時間戳的數據元素到來了,所以Watermark可以看做是告訴Apache Flink框架數據流已經處理到什么位置(時間維度)的方式。[3]
?
這個博客整來自[1]的內容
①代碼
②輸入數據如下:
nc -lk 3456
(下面的表格在復習溫故的時候記得從①②③④⑤的順序來看,數據->時間戳->水位線->水位線和窗口的比較->決定是否觸發)
| 輸入數據① | 輸入數據的對應時間② | 窗口區間④ [window_start_time, window_end_time) | WaterMark③ | 備注⑤ | 
| 0001,1538359882000 | 2018-10-01 10:11:22 | ? | 2018-10-01 10:11:12.000 | ? | 
| 0001,1538359886000 | 2018-10-01 10:11:26 | ? | 2018-10-01 10:11:16.000 | 因為有亂序設置,所以有10s差距 | 
| 0001,1538359892000 | 2018-10-01 10:11:32 | ? | 2018-10-01 10:11:22.000 | ? | 
| 0001,1538359893000 | 2018-10-01 10:11:33 | ? | 2018-10-01 10:11:23.000 | ? | 
| 0001,1538359894000 | 2018-10-01 10:11:34 | 2018-10-01 10:11:22.000(輸入數據的時間戳) ? 2018-10-01 10:11:24.000 | 2018-10-01 10:11:24.000 遲到數據可以理解為,我本來想要34的水位線,但是有10s左右的遲到數據,所以實際水位線是24 | 觸發waterMark計算 10:11:34-10=10:11:24 ? final Long maxOutOfOrderness = 10000L; | 
| 0001,1538359896000 | 2018-10-01 10:11:36 | ? | 2018-10-01 10:11:26.000 | ? | 
| 0001,1538359897000 | 2018-10-01 10:11:37 | 2018-10-01 10:11:24.000 2018-10-01 10:11:27.000 | 2018-10-01 10:11:27.000 | 觸發waterMark計算 | 
| 0001,1538359899000 | 2018-10-01 10:11:39 | ? | 2018-10-01 10:11:29.000 | ? | 
| 0001,1538359891000(亂序數據) | 2018-10-01 10:11:31 | ? | 2018-10-01 10:11:29.000 | ? | 
| 0001,1538359903000(亂序數據) | 2018-10-01 10:11:43 | 2018-10-01 10:11:30.000 2018-10-01 10:11:33.000 | 2018-10-01 10:11:33.000 | 觸發waterMark計算 | 
| 0001,1538359890000(延遲數據) | 2018-10-01 10:11:30 | 2018-10-01 10:11:30.000 2018-10-01 10:11:33.000 | 2018-10-01 10:11:33.000 | 觸發waterMark計算 | 
| 0001,1538359903000(延遲數據) | 2018-10-01 10:11:43 | ? | 2018-10-01 10:11:33.000 | ? | 
| 0001,1538359891000(延遲數據) | 2018-10-01 10:11:31 | 2018-10-01 10:11:30.000 2018-10-01 10:11:33.000 | 2018-10-01 10:11:33.000 | 觸發waterMark計算 | 
| 0001,1538359892000(延遲數據) | 2018-10-01 10:11:32 | 2018-10-01 10:11:30.000 2018-10-01 10:11:33.000 | 2018-10-01 10:11:33.000 | 觸發waterMark計算 | 
讀表格中的解釋:
1.由于亂序時間maxOutOfOrderness的設置,導致waterMark的數值落后于最新且最大的數據大約10s
2.延遲數據/遲到數據 指的是時間戳小于水位線,但是比前一條數據來得晚的被接收的。
3.上面表格的第三列就是[window_start_time,window_end_time)
4.如果window大小是3秒,對應代碼是:
TumblingEventTimeWindows.of(Time.seconds(3))
那么1分鐘的區間內,會把window劃分為如下的形式【左閉右開】
[window_start_time,window_end_time] [00:00:00,00:00:03) [00:00:03,00:00:06) [00:00:06,00:00:09) [00:00:09,00:00:12) [00:00:12,00:00:15) [00:00:15,00:00:18) [00:00:18,00:00:21) [00:00:21,00:00:24) [00:00:24,00:00:27) [00:00:27,00:00:30) [00:00:30,00:00:33) [00:00:33,00:00:36) [00:00:36,00:00:39) [00:00:39,00:00:42) [00:00:42,00:00:45) [00:00:45,00:00:48) [00:00:48,00:00:51) [00:00:51,00:00:54) [00:00:54,00:00:57) [00:00:57,00:01:00)在滿足:
一,watermark時間 >= 上面的window_end_time,
二,在[window_start_time,window_end_time)中有數據存在
才會觸發WaterMark流計算。
亂序參數maxOutOfOrderness修改的是水位線
延遲參數修改的是延遲數據
--------------------------------------------------------遲到數據處理--------------------------------------------------------------------------------------
①代碼
②對于遲到的數據,都通過sideOutputLateData保存到了outputTag中
----------------------------------------------------------一圖解千言---------------------------------------------------------------------------------------
對于上面的概念,我畫了一張圖:
這個圖的大意如下:
當WaterMark上升至window_end_time的時候,觸發計算。
EventTime表示數據還在水平面(WaterMark)的上方,沒有最終落入水面以下。
所以有WaterMark+maxOutOfOrderness=Event TIme
在觸發計算的同時,還有一部分數據沒有到位,這些數據是延遲數據/遲到數據。
在延遲數據傳播的同時,WaterMark也在不斷上升,當WaterMark上升至window_end_time+allowedLateness(Time.seconds(2))的時候,即使遲到的數據到達,也會被丟棄。
?
延遲數據打個比方:
木塊(流數據中的水滴)投入水位很淺的臉盆,可以到底部(觸發計算)。
木塊(流數據中的水滴)投入水位很高的臉盆,難以到底部(數據丟棄)。
?
?
Reference:
[1]Flink EventTime和Watermarks案例分析
[2]在線工具
[3]Apache Flink 漫談系列(03) - Watermark
總結
以上是生活随笔為你收集整理的Flink EventTime和Watermarks原理结合代码分析(转载+解决+精简记录)的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: mysql怎么查看有没有死锁
- 下一篇: cad工作界面怎么设置为经典模式
