flink 自定义 窗口_【Flink 精选】阐述 Watermark 机制,剖析 Watermark 的产生和传递流程...
本文闡述 Flink 的事件時間和 Watermark 機制,剖析 Watermark 產生和傳遞的流程。
1 Event time 和 Watermark 的關系
1.1 Event time 和 Processing time介紹
Event time 事件時間和Processing time 處理時間主要區別是產生時間不同,前者是事件的實際發生時間,后者是機器的系統處理時間,如下圖所示。
① Event time 事件時間:事件在其設備上發生的時間。
Event time 是事件在進入 Flink 之前已經嵌入到記錄的時間,其大小取決于事件本身,與網絡延時、系統時區等因素無關。② Processing time 處理時間:作業正在執行相應操作的機器系統時間。
Processing time 提供了最佳的性能和最低的延遲,但是不能提供確定性,即計算結果是不確定的。 例如,時間窗口為5min的求和統計,應用程序在 9:00 開始運行,則第一個時間窗口處理 [9:00, 9:05) 的事件,下一個窗口處理 [9:05, 9:10) 的事件,依此類推。通信延遲、作業故障重啟等問題,可能導致窗口的計算結果是不一樣的。如下圖所示,假設事件(事件時間, 數值) 遇到上述問題,場景一:事件 B 有網絡延遲落在[9:10, 9:15),場景二:作業故障重啟導致事件 A 和事件 B落在[9:10, 9:15)。1.2 Event time 和 Watermark
問題:Flink 支持事件時間,如何測量事件時間的進度?例如,5min 的事件時間窗口,當事件時間超過 5min 時,需要通知 Flink 觸發窗口計算。解答:Watermark 機制。Watermark 本質是時間戳,與業務數據一樣無差別地傳遞下去,目的是衡量事件時間的進度(通知 Flink 觸發事件時間相關的操作,例如窗口)。
說明: Watermark(T) 表示目前系統的時間事件是 T,即系統后續沒有 T'<T 的事件即 Event(T'<T)/*** 1.Watermark 是一個時間戳, 它表示小于該時間戳的事件都已經到達了。* 2.Watermark 一般情況在源位置產生(也可以在流圖中的其它節點產生), 通過流圖節點傳播。* 3.Watermark 也是 StreamElement, 和普通數據一起在算子之間傳遞。* 4.Watermark 可以觸發窗口計算, 時間戳為 Long.MAX_VALUE 表示算子后續沒有任何數據。*/ public final class Watermark extends StreamElement {// 省略.../*** The timestamp of the watermark in milliseconds.*/private final long timestamp;/*** Creates a new watermark with the given timestamp in milliseconds.*/public Watermark(long timestamp) {this.timestamp = timestamp;}/*** Returns the timestamp associated with this {@link Watermark} in milliseconds.*/public long getTimestamp() {return timestamp;}// 省略... }如下圖所示,事件 Event 是按照事件時間 EventTime 順序上報的。
如下圖所示,事件 Event 是不按照事件時間 EventTime 亂序上報的。
2 Watermark 的產生
2.1 Watermark 類型
說明:flink-1.12 支持 WatermarkStrategy 和 WatermarkGeneratorflink 采用 WatermarkStrategy 設置自定義 Watermark 類型,WatermarkGenerator 是 Watermark 的基類。flink 實現了 Punctuated Watermarks 從事件獲取事件的時間戳、Periodic Watermarks 周期獲取事件的時間戳。
/*** The {@code WatermarkGenerator} generates watermarks either based on events or* periodically (in a fixed interval).** <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the* {@code AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.*/ @Public public interface WatermarkGenerator<T> {/*** 從事件獲取事件的時間戳*/void onEvent(T event, long eventTimestamp, WatermarkOutput output);/*** 周期獲取事件的時間戳*/void onPeriodicEmit(WatermarkOutput output); }使用 WatermarkStrategy 的樣例,如下代碼。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> input = env.fromElements("data");// 使用 WatermarkStrategy 設置 Watermark 類型input.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(10)));2.2 Watermark 的產生
Watermark 是算子 TimestampsAndWatermarksOperator 產生的,WatermarkStrategy 相當于 UDFFunction(封裝于TimestampsAndWatermarksOperator 內部)。processElement 方法實現事件產生 Watermark,processWatermark 方法阻斷上游傳過來的 Watermark,onProcessingTime 方法實現周期產生 Watermark。
public class TimestampsAndWatermarksOperator<T>extends AbstractStreamOperator<T>implements OneInputStreamOperator<T, T>, ProcessingTimeCallback { // 省略...@Overridepublic void processElement(final StreamRecord<T> element) throws Exception {final T event = element.getValue();final long previousTimestamp = element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE;final long newTimestamp = timestampAssigner.extractTimestamp(event, previousTimestamp);element.setTimestamp(newTimestamp);output.collect(element);// 事件產生 WatermarkwatermarkGenerator.onEvent(event, newTimestamp, wmOutput);}// 阻斷上游傳過來的 watermark@Overridepublic void processWatermark(org.apache.flink.streaming.api.watermark.Watermark mark) throws Exception {// if we receive a Long.MAX_VALUE watermark we forward it since it is used// to signal the end of input and to not block watermark progress downstreamif (mark.getTimestamp() == Long.MAX_VALUE) {wmOutput.emitWatermark(Watermark.MAX_WATERMARK);}}@Overridepublic void onProcessingTime(long timestamp) throws Exception {// 采用定時器, 周期產生 WatermarkwatermarkGenerator.onPeriodicEmit(wmOutput);final long now = getProcessingTimeService().getCurrentProcessingTime();// 更新定時器getProcessingTimeService().registerTimer(now + watermarkInterval, this);} // 省略... }(1)Watermark 周期產生
public class TimePeriodicWatermarkGenerator implements WatermarkGenerator<MyEvent> {private final long maxTimeLag = 5000; // 5 seconds@Overridepublic void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {// don't need to do anything because we work on processing time}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));} }結合算子 TimestampsAndWatermarksOperator 和 TimePeriodicWatermarkGenerator,分析 Watermark 的產生流程。如下圖所示,橫軸表示 processing time,圓形表示事件,圓形中的時間 t 表示事件時間,圓形落在橫軸表示事件在算子中的處理,其中 Watermark 的產生周期為 60s 和允許延遲時間為 10s。以第一個周期 [0,60) 為例,獲取事件中的最大事件時間 max,向下游發送 watermark(最大事件時間 - 允許延遲時間 - 1)。(2)Watermark 事件產生
public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {@Overridepublic void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {if (event.hasWatermarkMarker()) {output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));}}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// don't need to do anything because we emit in reaction to events above} }3 Watermark 的傳遞
Watermark 的傳遞方式是廣播,即廣播方式發送到下游。Watermark 與業務數據一樣,無差別地傳遞下去。
例子:多并發的場景下,Watermark 是 source task 產生,經過 keyby 分組后觸發窗口計算。 說明:① Watermark 要單調遞增。② 如果算子有多個上游(廣播)即輸入多個 Watermark(T),則該算子取最小 Watermark 即 min(Watermark(T1), Watermark(T2))。從 WindowOperator 源碼分析窗口是如何傳遞 Watermark。 首先分析 WindowOperator 類圖,可知 WindowOperator 間接繼承AbstractStreamOperator,而 AbstractStreamOperator 實現了接口 Input 的 processWatermark 方法、接口 TwoInputStreamOperator 的 processWatermark1 方法 和 processWatermark2 方法。
接著分析一下 AbstractStreamOperator 實現的 processWatermark 、processWatermark1 和 processWatermark2。
// 省略 ....public void processWatermark(Watermark mark) throws Exception {if (timeServiceManager != null) {timeServiceManager.advanceWatermark(mark);}// 發送 watermarkoutput.emitWatermark(mark);}/*** 2個上游的watermark* 計算最小watermark, 并設置為當前算子的watermark*/public void processWatermark1(Watermark mark) throws Exception {input1Watermark = mark.getTimestamp();long newMin = Math.min(input1Watermark, input2Watermark);if (newMin > combinedWatermark) {combinedWatermark = newMin;processWatermark(new Watermark(combinedWatermark));}}/*** 2個上游的watermark* 計算最小watermark, 并設置為當前算子的watermark*/public void processWatermark2(Watermark mark) throws Exception {input2Watermark = mark.getTimestamp();long newMin = Math.min(input1Watermark, input2Watermark);if (newMin > combinedWatermark) {combinedWatermark = newMin;processWatermark(new Watermark(combinedWatermark));}} // 省略 ....總結
以上是生活随笔為你收集整理的flink 自定义 窗口_【Flink 精选】阐述 Watermark 机制,剖析 Watermark 的产生和传递流程...的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: vue 时间插件_Vue3 插件开发详解
- 下一篇: delphi cxgrid读取本地ima