flink中的WaterMark调研和具体实例
一些基本概念介紹:
| Event Time | 事件時間是每個事件在其生產設備上發生的時間 |
| Ingestion Time | 攝取時間是數據進入Flink的時間 |
| Processing Time | 處理時間是是指正在執行相應算子操作的機器的系統時間,默認的時間屬性就是Processing Time |
[7]在Flink的流式處理中,絕大部分的業務都會使用EventTime,一般只在EventTime無法使用時,才會被迫使用ProcessingTime或者IngestionTime。默認情況下,Flink框架中處理的時間語義為ProcessingTime,如果要使用EventTime,那么需要引入EventTime的時間屬性,引入方式如下所示:
import org.apache.flink.streaming.api.TimeCharacteristic
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
上面的這個只是為了表示當前的環境中有EventTIme屬性,講大白話就是輸入的數據源中會有時間戳,不用想太多。
--------------------------------------------和時間相關的一些概念--------------------------------------------------------------------
假設nc -lk傳入數據,intelliji處理數據,那么其實流處理的大概有這么幾個時間相關的數據。
①終端輸入數據的時間
②該數據中包含的時間
③intellij處理數據的時間
④window中的窗口寬度,slidingWindow還包括滑動時間寬度
⑤水位線時間, 這個是從上一條數據中得到的時間戳,另外還有水位線延遲時間(用來對付遲到的數據)
首先,①≠②,整個實驗不關心①的時間,所以①我們不理會。
③我們一般也不關心(注意指的是一般情況下不關心,因為還是以數據源頭中的時間戳中的Event Time為主)
④和⑤是我們關心范疇
所以主要是②④⑤
---------------------------------------------------------WaterMark和各種參數之間的關系--------------------------------------------------------------
什么是WaterMark呢,其實翻譯過來就是水位線。
數據流好比水流(數據)從瀑布頂端傾瀉而下,
而瀑布下方的水位線(數據生成的時間,Event Time)隨著瀑布的流下而不斷上升。
------------------------------------------------------------------------------------------------------------------------------------------------
Watermarks是基于已經收集的消息來估算是否還有消息未到達,本質上是一個時間戳。時間戳反映的是事件發生的時間,而不是事件處理的時間
所謂的事件(Event),講人話其實就是一條數據。
1.設置StreamTime Characteristic為Event Time,即設置流式時間窗口
2.創建的DataStreamSource調用assignTimestampsAndWatermarks中設置WaterMarks種類(二選一):
①AssignerWithPeriodicWatermarks(周期性(一定時間間隔或者達到一定的記錄條數)生成水印)
②AssignerWithPunctuatedWatermarks(數據流中每一個遞增的?EventTime?都會產生一個?Watermark)
?
數據示例:
| 實驗類型(代碼) | nc -lk 3456輸入 | 實驗結果 | 實驗解析 |
| 窗口的生命周期 | ? tests | 測試::7> (tests,3) | 這個實驗必須輸入三個相同的字符串后,程序才會有反應 |
| 滾動窗口 | later,6565 | windows:>>>:7> window:[1599362540000-1599362545000]:{ (later,6565,1) } | window后面的是實際處理時間。 later后面手動輸入的時間戳被忽視 |
| 滑動窗口 | later,32234 | windows:>>>:7> window:[1599364140000-1599364145000]:{ (later,32234,1) } windows:>>>:7> window:[1599364143000-1599364148000]:{ (later,32234,1) } | 一條數據出現在兩個窗口中 |
| 會話窗口 | test,12312 test,235233 ? | windows:>>>:5> window:[1599366609562-1599366622057]:{ (test,12312,1),(test,235233,1) } | processtime間隔時間超過10s,就會輸出上一個窗口(含10s所有輸入內容) |
這里的例如1599366609562單位是ms
我們根據業務的需求還判斷使用哪個時間類型,一般來說使用Event Time更多,比如:在統計最近5分鐘的訂單總金額時,我們需要的是真實的訂單時間,而不是進入flink的時間或者是處理時間。
另外,生產環境中大多數情況下數據源頭都會帶有時間戳,時間戳經過flink處理提取得到Event Time。
上述三種窗口的事件事件寫法與處理時間寫法的對照(不包含窗口偏移設置)
| ? | 事件時間(Event Time) | 處理時間 |
| TumblingWindow | .window(TumblingEventTimeWindows.of(Time.seconds(5))) | .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) |
| SlidingWindow | .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) | .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) |
| SessionWindow | .window(EventTimeSessionWindows.withGap(Time.minutes(10))) | .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))) |
上述三種窗口的事件事件寫法與處理時間寫法的對照(包含窗口偏移設置)
| ? | 事件時間(Event Time) | 處理時間 |
| TumblingWindow | .window(TumblingEventTimeWindows.of(Time.seconds(5),Time.seconds(3))) | .window(TumblingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(3))) |
| SlidingWindow | .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5),Time.seconds(3))) | .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5),Time.seconds(3))) |
| SessionWindow | .window(EventTimeSessionWindows.withGap(Time.minutes(10),Time.seconds(3))) | .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10),Time.seconds(3))) |
系統時間可以理解為是intellij處理數據的時間。
而EventTimeWindows那就是讓窗口以事件中的時間戳(Event TIme)作為時間標尺了。
-----------------------------------------------下面是兩個看似矛盾的說法--------------------------------------------------------------------------
[1]WaterMark為上一條數據的Event Time,并非當前的WaterMark
[2]WaterMark為當前數據的Event Time(根據實驗輸出來看)
為什么會有這么兩種截然不同的說法呢?
這是因為:
[1]的WaterMark是在getCurrentWatermark()函數中輸出的。
[2]的WaterMark是在extractTimestamp()函數中輸出的。
所以其實不矛盾
-----------------------------------------------------對于包含WaterMark的程序的觸發條件--------------------------------------
①并行度大于1的時候,需要各個節點都滿足②③。
?在多并行度的情況下,Watermark 會有一個對齊機制,這個對齊機制會取所有 Channel中最小的 Watermark。
②window條件要滿足(例如每5條數據觸發)、
③算子和定時器相關,需要waterMark高于定時器設置的時間。例如ctx.timerService().registerEventTimeTimer(value.getBizTime());
---------------------------------------------------------------------------------------------------------------------------------------
接收到水位線以前的的消息是不可避免的,這就是所謂的遲到事件。實際上遲到事件是亂序事件的特例,和一般亂序事件不同的是它們的亂序程度超出了水位線的預計,導致窗口在它們到達之前已經關閉。[8]
遲到事件出現時窗口已經關閉并產出了計算結果,因此處理的方法有3種:[8]
- 重新激活已經關閉的窗口并重新計算以修正結果(Side Output)。
- 將遲到事件收集起來另外處理(Allowed Lateness)。
- 將遲到事件視為錯誤消息并丟棄。
Flink 默認的處理方式是第3種直接丟棄,其他兩種方式分別使用Side Output和Allowed Lateness。[8]
接收到水位線以前的的消息是不可避免的,這就是所謂的遲到事件。實際上遲到事件是亂序事件的特例,和一般亂序事件不同的是它們的亂序程度超出了水位線的預計,導致窗口在它們到達之前已經關閉。[8]
上面都是一些概念,實驗方面的記錄在[9]
Reference:
[1]Flink WaterMark實例(閱讀完畢)
[2]Flink系列之Time和WaterMark(代碼不完整)
[5]Flink EventTime和Watermarks案例分析(閱讀完畢)
[3]老板讓阿粉學習 flink 中的 Watermark,現在他出教程了(還存在些問題)
[4]Flink的WaterMark,及demo實例(閱讀完畢)
[7]Flink的window、時間語義,Watermark機制,多代ctx.timerService().registerEventTimeTimer(value.getBizTime());碼案例詳解,Flink學習入門(三)(閱讀完畢)
[8][白話解析] Flink的Watermark機制?(判斷遲到數據,閱讀完畢)
[9]Flink EventTime和Watermarks原理結合代碼分析(轉載+解決+精簡記錄)
?
總結
以上是生活随笔為你收集整理的flink中的WaterMark调研和具体实例的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: cad工作界面怎么设置为经典模式
- 下一篇: 正余弦函数的Talor近似