flink 自定义 窗口_Flink入门实战 (下)
一、 時間語義與 Wartermark
1、 Flink 中的時間語義
在 Flink 的流式處理中,會涉及到時間的不同概念,如下圖所示:
Event Time:是事件創建的時間。它通常由事件中的時間戳描述,例如采集的
日志數據中,每一條日志都會記錄自己的生成時間,Flink 通過時間戳分配器訪問事
件時間戳。 Ingestion Time:是數據進入 Flink 的時間。 Processing Time:是每一個執行基于時間操作的算子的本地系統時間,與機器
相關,默認的時間屬性就是 Processing Time。
一個例子——電影《星球大戰》:
例如,一條日志進入 Flink 的時間為 2017-11-12 10:00:00.123,到達 Window 的
系統時間為 2017-11-12 10:00:01.234,日志的內容如下:
2017對于業務來說,要統計 1min 內的故障日志個數,哪個時間是最有意義的?——
eventTime,因為我們要根據日志的生成時間進行統計。
2、 EventTime 的引入
在 Flink 的流式處理中,絕大部分的業務都會使用 eventTime,一般只在
eventTime 無法使用時,才會被迫使用 ProcessingTime 或者 IngestionTime。
如果要使用 EventTime,那么需要引入 EventTime 的時間屬性,引入方式如下所
示:
val 3、Watermark
3.1、基本概念
我們知道,流處理從事件產生,到流經 source,再到 operator,中間是有一個過
程和時間的,雖然大部分情況下,流到 operator 的數據都是按照事件產生的時間順
序來的,但是也不排除由于網絡、分布式等原因,導致亂序的產生,所謂亂序,就
是指 Flink 接收到的事件的先后順序不是嚴格按照事件的 Event Time 順序排列的。
那么此時出現一個問題,一旦出現亂序,如果只根據 eventTime 決定 window 的
運行,我們不能明確數據是否全部到位,但又不能無限期的等下去,此時必須要有
個機制來保證一個特定的時間后,必須觸發 window 去進行計算了,這個特別的機
制,就是 Watermark。
- Watermark 是一種衡量 Event Time 進展的機制。
- Watermark 是用于處理亂序事件的,而正確的處理亂序事件,通常用
Watermark 機制結合 window 來實現。
- 數據流中的 Watermark 用于表示 timestamp 小于 Watermark 的數據,都已經
到達了,因此,window 的執行也是由 Watermark 觸發的。
- Watermark 可以理解成一個延遲觸發機制,我們可以設置 Watermark 的延時
時長 t,每次系統會校驗已經到達的數據中最大的 maxEventTime,然后認定 eventTime
小于 maxEventTime - t 的所有數據都已經到達,如果有窗口的停止時間等于
maxEventTime – t,那么這個窗口被觸發執行。
有序流的 Watermarker 如下圖所示:(Watermark 設置為 0)
亂序流的 Watermarker 如下圖所示:(Watermark 設置為 2)
當 Flink 接收到數據時,會按照一定的規則去生成 Watermark,這條 Watermark
就等于當前所有到達數據中的 maxEventTime - 延遲時長,也就是說,Watermark 是
由數據攜帶的,一旦數據攜帶的 Watermark 比當前未觸發的窗口的停止時間要晚,
那么就會觸發相應窗口的執行。由于 Watermark 是由數據攜帶的,因此,如果運行
過程中無法獲取新的數據,那么沒有被觸發的窗口將永遠都不被觸發。
上圖中,我們設置的允許最大延遲到達時間為 2s,所以時間戳為 7s 的事件對應
的 Watermark 是 5s,時間戳為 12s 的事件的 Watermark 是 10s,如果我們的窗口 1
是 1s~5s,窗口 2 是 6s~10s,那么時間戳為 7s 的事件到達時的 Watermarker 恰好觸
發窗口 1,時間戳為 12s 的事件到達時的 Watermark 恰好觸發窗口 2。
Watermark 就是觸發前一窗口的“關窗時間”,一旦觸發關門那么以當前時刻
為準在窗口范圍內的所有所有數據都會收入窗中。
只要沒有達到水位那么不管現實中的時間推進了多久都不會觸發關窗。
3.2、Watermark 的引入
watermark 的引入很簡單,對于亂序數據,最常見的引用方式如下:
dataStreamEvent Time 的使用一定要指定數據源中的時間戳。否則程序無法知道事件的事
件時間是什么(數據源里的數據沒有時間戳的話,就只能使用 Processing Time 了)。
我們看到上面的例子中創建了一個看起來有點復雜的類,這個類實現的其實就
是分配時間戳的接口。Flink 暴露了 TimestampAssigner 接口供我們實現,使我們可
以自定義如何從事件數據中抽取時間戳。
val MyAssigner 有兩種類型
- AssignerWithPeriodicWatermarks
- AssignerWithPunctuatedWatermarks
以上兩個接口都繼承自 TimestampAssigner。
Assigner with periodic watermarks
周期性的生成 watermark:系統會周期性的將 watermark 插入到流中(水位線也
是一種特殊的事件!)。默認周期是 200 毫秒。可以使用
ExecutionConfig.setAutoWatermarkInterval()方法進行設置。
val 產生 watermark 的邏輯:每隔 5 秒鐘,Flink 會調用
AssignerWithPeriodicWatermarks 的 getCurrentWatermark()方法。如果方法返回一個
時間戳大于之前水位的時間戳,新的 watermark 會被插入到流中。這個檢查保證了
水位線是單調遞增的。如果方法返回的時間戳小于等于之前水位的時間戳,則不會
產生新的 watermark。
例子,自定義一個周期性的時間戳抽取:
class 一種簡單的特殊情況是,如果我們事先得知數據流的時間戳是單調遞增的,也
就是說沒有亂序,那我們可以使用 assignAscendingTimestamps,這個方法會直接使
用數據的時間戳生成 watermark。
val 而對于亂序數據流,如果我們能大致估算出數據流中的事件的最大延遲時間,
就可以使用如下代碼:
val Assigner with punctuated watermarks
間斷式地生成 watermark。和周期性生成的方式不同,這種方式不是固定時間的,
而是可以根據需要對每條數據進行篩選和處理。直接上代碼來舉個例子,我們只給
sensor_1 的傳感器的數據流插入 watermark:
class 4、 EvnetTime 在 window 中的使用
案例一:Flink窗口操作之簡單測試
4.1、滾動窗口(TumblingEventTimeWindows)
代碼具體實現:
package 啟動程序后,視頻演示:表示10秒之內統計數據
Flink的滾動窗口實現方式https://www.zhihu.com/video/1241477585970135040案例二:Flink窗口操作之事件時間測試
代碼具體實現:
package 啟動程序后,視頻演示:事件時間測試表示執行多少個才能把窗口關閉,由于這里簡單測試沒遇到窗口關閉
事件時間測試 https://www.zhihu.com/video/1241492597937111040案例三:Flink窗口操作之Window起始點
視頻演示:
Window起始點https://www.zhihu.com/video/1241685277611372544二、ProcessFunction API(底層 API)
我們之前學習的轉換算子是無法訪問事件的時間戳信息和水位線信息的。而這
在一些應用場景下,極為重要。例如 MapFunction 這樣的 map 轉換算子就無法訪問
時間戳或者當前事件的事件時間。
基于此,DataStream API 提供了一系列的 Low-Level 轉換算子。可以訪問時間 戳、watermark 以及注冊定時事件。還可以輸出特定的一些事件,例如超時事件等。
Process Function 用來構建事件驅動的應用以及實現自定義的業務邏輯(使用之前的
window 函數和轉換算子無法實現)。例如,Flink SQL 就是使用 Process Function 實
現的。
Flink 提供了 8 個 Process Function:
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- ProcessJoinFunction
- BroadcastProcessFunction
- KeyedBroadcastProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
1、KeyedProcessFunction
這里我們重點介紹 KeyedProcessFunction。
KeyedProcessFunction 用來操作 KeyedStream。KeyedProcessFunction 會處理流
的每一個元素,輸出為 0 個、1 個或者多個元素。所有的 Process Function 都繼承自
RichFunction 接口,所以都有 open()、close()和 getRuntimeContext()等方法。
而 KeyedProcessFunction[KEY, IN, OUT]還額外提供了兩個方法:
- processElement(v: IN, ctx: Context, out: Collector[OUT]),
流中的每一個元素 都會調用這個方法,調用結果將會放在 Collector 數據類型中輸出。Context 可以訪問元素的時間戳,元素的 key,以及 TimerService 時間服務。Context
還可以將結果輸出到別的流(side outputs)。
- onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT])
是一個回 調函數。當之前注冊的定時器觸發時調用。參數 timestamp 為定時器所設定
的觸發的時間戳。Collector 為輸出結果的集合。OnTimerContext 和
processElement 的 Context 參數一樣,提供了上下文的一些信息,例如定時器
觸發的時間信息(事件時間或者處理時間)。
2、TimerService 和 定時器(Timers)
Context 和 OnTimerContext 所持有的 TimerService 對象擁有以下方法:
- currentProcessingTime(): Long 返回當前處理時間
- ? currentWatermark(): Long 返回當前 watermark 的時間戳
- ? registerProcessingTimeTimer(timestamp: Long): Unit 會注冊當前 key 的
processing time 的定時器。當 processing time 到達定時時間時,觸發 timer。
- registerEventTimeTimer(timestamp: Long): Unit 會注冊當前 key 的 event time
定時器。當水位線大于等于定時器注冊的時間時,觸發定時器執行回調函數。
- deleteProcessingTimeTimer(timestamp: Long): Unit 刪除之前注冊處理時間定
時器。如果沒有這個時間戳的定時器,則不執行。
- deleteEventTimeTimer(timestamp: Long): Unit 刪除之前注冊的事件時間定時
器,如果沒有此時間戳的定時器,則不執行。
當定時器 timer 觸發時,會執行回調函數 onTimer()。注意定時器 timer 只能在
keyed streams 上面使用。
下面舉個例子說明 KeyedProcessFunction 如何操作 KeyedStream。
需求:監控溫度傳感器的溫度值,如果溫度值在一秒鐘之內(processing time)連
續上升,則報警。
val 看一下 TempIncreaseAlertFunction 如何實現, 程序中使用了 ValueState 這樣一個
狀態變量。
具體代碼實現:
package 啟動程序,控制臺打印數據
3、側輸出流(SideOutput)
大部分的 DataStream API 的算子的輸出是單一輸出,也就是某種數據類型的流。
除了 split 算子,可以將一條流分成多條流,這些流的數據類型也都相同。process
function 的 side outputs 功能可以產生多條流,并且這些流的數據類型可以不一樣。
一個 side output 可以定義為 OutputTag[X]對象,X 是輸出流的數據類型。process
function 可以通過 Context 對象發射一個事件到一個或者多個 side outputs。
下面是一個示例程序:
val 接下來我們實現 FreezingMonitor 函數,用來監控傳感器溫度值,將溫度值低于
32F 的溫度輸出到 side output。
具體代碼實現:
package 啟動程序,控制臺打印數據
冰點低溫輸出流https://www.zhihu.com/video/12417521135516467204、CoProcessFunction
對于兩條輸入流,DataStream API 提供了 CoProcessFunction 這樣的 low-level
操作。CoProcessFunction 提供了操作每一個輸入流的方法: processElement1()和
processElement2()。
類似于 ProcessFunction,這兩種方法都通過 Context 對象來調用。這個 Context
對象可以訪問事件數據,定時器時間戳,TimerService,以及 side outputs。
CoProcessFunction 也提供了 onTimer()回調函數。
總結
以上是生活随笔為你收集整理的flink 自定义 窗口_Flink入门实战 (下)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 求一个qq网名三字女生。
- 下一篇: 崇开头的成语有哪些啊?