Flink的Window
1 Window概述
? ? streaming流式計算是一種被設計用于處理無限數據集的數據處理引擎,而無限數據集是指一種不斷增長的本質上無限的數據集,而window是一種切割無限數據為有限塊進行處理的手段。
? ? Window是無限數據流處理的核心,Window將一個無限的stream拆分成有限大小的”buckets”桶,我們可以在這些桶上做計算操作。
? ? 注意:window一般在keyBy(KeyedStram)后。如果實在DataStream后的話是windowAll(不建議使用,會將所有數據匯總到一個分區計算)
? ? window assigner確定了數據屬于哪個窗口丟到正確的桶里面,還沒有做計算。真正做計算是在window assigner后面的window function。下面兩步和起來才是一個完整的窗口操作
.window(<window assigner>).aggregate(new AverageAggregate)2 Window的類型
? ?Window可以分成兩類:TimeWindow(按照時間生成Window)和CountWindow(按照指定的數據條數生成一個Window,與時間無關)。
2.1 TimeWindow
? ?對于TimeWindow,可以根據窗口實現原理的不同分成三類:滾動窗口(Tumbling Window)、滑動窗口(Sliding Window)和會話窗口(Session Window)。
2.1.1 Tumbling Window
? ?將數據依據固定的窗口長度對數據進行切片,滾動窗口分配器將每個元素分配到一個指定窗口大小的窗口中,滾動窗口有一個固定的大小,并且不會出現重疊。特點:時間對齊,窗口長度固定,沒有重疊。適合做BI統計等(做每個時間段的聚合計算)
2.1.2 Sliding Window
? ?滑動窗口由固定的窗口長度和滑動間隔組成。滑動窗口分配器將元素分配到固定長度的窗口中,如果滑動參數小于窗口大小的話,窗口是可以重疊的,在這種情況下元素會被分配到多個窗口中。適用場景:對最近一個時間段內的統計(求某接口最近5min的失敗率來決定是否要報警)。特點:時間對齊,窗口長度固定,可以有重疊。
2.1.3 Session Window
? ?由一系列事件組合一個指定時間長度的timeout間隙組成,類似于web應用的session,也就是一段時間沒有接收到新數據就會生成新的窗口。不會有重疊和固定的開始時間和結束時間的情況,相反,當它在一個固定的時間周期內不再收到元素,即非活動間隔產生,那個這個窗口就會關閉。特點:時間無對齊。
3 WindowAPI
3.0 窗口分配器
??窗口分配器 即 window() 方法,我們可以用 .window() 來定義一個窗口,然后基于這個 window 去做一些聚合或者其它處理操作。注意 window () 方法必須在 keyBy 之后才能用。Flink 提供了更加簡單的 .timeWindow 和 .countWindow 方法,用于定義時間窗口和計數窗口。
??window() 方法接收的輸入參數是一個 WindowAssigner(窗口分配器),WindowAssigner 負責將每條輸入的數據分發到正確的 window 中。Flink 提供了通用的 WindowAssigner:①滾動時間窗口( .timeWindow(Time.seconds(5)))②滑動時間窗口(.timeWindow(Time.seconds(15), Time.seconds(5)))③會話窗口( .window(EventTimeSessionWindows.withGap(Time.minutes(1)))④滾動計數窗口(.countWindow(5))⑤滑動計數窗口(.countWindow(20,10))
3.1 TimeWindow
? ?TimeWindow將指定時間范圍內的所有數據組成一個window,一次對一個window里面的所有數據進行計算。默認的時間窗口根據Processing Time 進行窗口的劃分,將Flink獲取到的數據根據進入Flink的時間劃分到不同的窗口中。
? ?(1)滾動窗口
? ?時間間隔參數可以通過Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等指定。
val timeWindowStream = dataStream.map(r => (r.id, r.temperature)).keyBy(_._1).timeWindow(Time.seconds(5)).reduce((r1, r2) => (r1._1, r1._2.max(r2._2)))? ?(2)滑動窗口
? ?在傳參數時需要傳入兩個參數,一個是window_size,一個是sliding_size。每5s就計算輸出結果一次,每一次計算的window范圍是1分鐘內的所有元素如下:
val timeWindowStream: DataStream[(String, Double)] = dataStream.map(r => (r.id, r.temperature)).keyBy(_._1).timeWindow(Time.seconds(15), Time.seconds(5)).reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))? ?或者
val timeWindowStream: DataStream[(String, Double)] = dataStream.map(r => (r.id, r.temperature)).keyBy(_._1).window(SlidingEventTimeWindows.of(Time.minutes(1),Time.seconds(5)).reduce((r1, r2) => (r1._1, r1._2.max(r2._2)))3.2 CountWindow
? ?CountWindow根據窗口中相同key元素的數量來觸發執行,執行時只計算元素數量達到窗口大小的key對應的結果。
? ?注意:CountWindow的window_size指的是相同Key的元素的個數,不是輸入的所有元素的總數。
? ?(1)滾動窗口
? ?指定窗口大小即可,當元素數量達到窗口大小時,就會觸發窗口的執行。
val countWindowStream: DataStream[(String, Double)] = dataStream.map(r => (r.id, r.temperature)).keyBy(_._1).countWindow(5).reduce((r1, r2) => (r1._1, r1._2.max(r2._2)))? ?(2)滑動窗口
? ?在傳參數時需要傳入兩個參數,一個是window_size,一個是sliding_size。每當某一個key的個數達到10的時候,觸發計算,計算最近該key最近20個元素的內容如下
val keyedStream: KeyedStream[(String, Int), Tuple] = dataStream.map(r => (r.id, r.temperature)).keyBy(0).countWindow(20,10).sum(1)3.3 window function
? ?window function 定義了要對窗口中收集的數據做的計算操作,主要可以分為兩類:①增量聚合函數:每條數據到來就進行計算,保持一個簡單的狀態。典型的增量聚合函數有ReduceFunction, AggregateFunction。②全窗口函數:先把窗口所有數據收集起來,等到計算的時候會遍歷所有數據。ProcessWindowFunction就是一個全窗口函數
? ?(1)ReduceFunction
? ?下面的示例的展示了如何將遞增的ReduceFunction與ProcessWindowFunction結合使用,以返回窗口中的最小事件以及窗口的開始時間。
val input: DataStream[SensorReading] = ...input.keyBy(<key selector>).timeWindow(<duration>).reduce((r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },( key: String,context: ProcessWindowFunction[_, _, _, TimeWindow]#Context,minReadings: Iterable[SensorReading],out: Collector[(Long, SensorReading)] ) =>{val min = minReadings.iterator.next()out.collect((context.window.getStart, min))})? ?(2)AggregateFunction
? ?下面的示例展示了如何將遞增的AggregateFunction與ProcessWindowFunction結合起來計算平均值,并同時發出鍵和窗口以及平均值。
val input: DataStream[(String, Long)] = ...input.keyBy(<key selector>).timeWindow(<duration>).aggregate(new AverageAggregate(), new MyProcessWindowFunction())// Function definitions/*** The accumulator is used to keep a running sum and a count. The [getResult] method* computes the average.*/ class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {override def createAccumulator() = (0L, 0L)override def add(value: (String, Long), accumulator: (Long, Long)) =(accumulator._1 + value._2, accumulator._2 + 1L)override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2override def merge(a: (Long, Long), b: (Long, Long)) =(a._1 + b._1, a._2 + b._2) }class MyProcessWindowFunction extends ProcessWindowFunction[Double, (String, Double), String, TimeWindow] {def process(key: String, context: Context, averages: Iterable[Double], out: Collector[(String, Double)]) = {val average = averages.iterator.next()out.collect((key, average))} }? ?(3)FoldFunction
? ?下面的示例顯示如何將遞增的FoldFunction與ProcessWindowFunction結合使用,以提取窗口中的事件數量,并返回窗口的鍵和結束時間。
val input: DataStream[SensorReading] = ...input.keyBy(<key selector>).timeWindow(<duration>).fold (("", 0L, 0),(acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },( key: String,window: TimeWindow,counts: Iterable[(String, Long, Int)],out: Collector[(String, Long, Int)] ) =>{val count = counts.iterator.next()out.collect((key, window.getEnd, count._3))})? ?(4)ProcessWindowFunction
? ?除了訪問鍵控狀態(任何富函數都可以),ProcessWindowFunction還可以使用范圍限定在函數當前處理的窗口的鍵控狀態。
val input: DataStream[(String, Long)] = ...input.keyBy(<key selector>).window(<window assigner>).apply(new MyWindowFunction())3.4 trigger
? ?觸發器trigger定義 window 什么時候關閉,觸發計算并輸出結果。觸發器確定窗口(由窗口分配者形成)何時準備好由窗口函數處理。每個WindowAssigner都有一個默認的觸發器。如果默認觸發器不符合需要,可以自定義觸發器。
? ?觸發器接口有五個方法,允許觸發器對不同的事件作出反應:
? ?①onElement()方法對添加到窗口的每個元素調用。
? ?②當一個已注冊的事件時間計時器觸發時,將調用onEventTime()方法。
? ?③當觸發注冊的處理時間計時器時,將調用onProcessingTime()方法。
? ?④onMerge()方法與有狀態觸發器相關,當兩個觸發器對應的窗口合并時,將它們的狀態合并起來,例如在使用會話窗口時。
? ?⑤clear()方法在刪除相應窗口時執行所需的任何操作。
? ?關于上述方法,有兩點需要注意:
? ?①前三個函數通過返回一個TriggerResult來決定如何處理它們的調用事件。動作可以是下列動作之一:CONTINUE:什么也不做;FIRE:觸發計算;PURGE:清除窗口中的元素;FIRE_AND_PURGE:觸發計算并隨后清除窗口中的元素。
? ?②這些方法中的任何一個都可以用于為將來的操作注冊處理或事件時間計時器。
3.5 evitor
? ?移除器evitor可以窗口觸發前或觸發后,定義移除某些數據的邏輯。一般和global window一起用,要自定義trigger和evitor,因為把所有的數據都存下來了,不用的數據丟棄。evitor接口有2個方法如下
/*** Optionally evicts elements. Called before windowing function.** @param elements The elements currently in the pane.* @param size The current number of elements in the pane.* @param window The {@link Window}* @param evictorContext The context for the Evictor*/ void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);/*** Optionally evicts elements. Called after windowing function.** @param elements The elements currently in the pane.* @param size The current number of elements in the pane.* @param window The {@link Window}* @param evictorContext The context for the Evictor*/ void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);??evictBefore()包含要在窗口函數之前應用的驅逐邏輯,而evictAfter()包含要在窗口函數之后應用的驅逐邏輯。
? ?Flink提供了三個預先實現的驅逐器:①CountEvictor:保持窗口中元素的用戶指定數量,并丟棄從窗口緩沖區開始的剩余元素。②DeltaEvictor:獲取delta函數和閾值,計算窗口緩沖區中最后一個元素與每個剩余元素之間的增量,并刪除增量大于或等于閾值的元素。③TimeEvictor:以毫秒為單位的時間間隔作為參數,對于給定的窗口,它在元素中查找最大時間戳max_ts,并刪除所有時間戳小于max_ts - interval的元素。
3.6 allowedLateness
? ?允許處理遲到的數據,分布式計算數據可能是亂序的,開了時間窗口之后,可能屬于他的數據姍姍來遲。假設正在是10點關閉窗口,允許1分鐘的遲到數據,到10點不關但是要觸發一次計算輸出一個計算結果,后面一分鐘再來的數據可以在這個基礎上在做疊加觸發一次計算再輸出一個結果。也就是先輸出結果后面更新。
? ?注意:這些處理遲到數據的必須在數據自帶的時間處理才有意義
val input: DataStream[T] = ...input.keyBy(<key selector>).window(<window assigner>).allowedLateness(<time>).<windowed transformation>(<window function>)3.7 sideOutputLateData和getSideOutput
? ?sideOutputLateData將遲到的數據放入側輸出流,getSideOutput獲取側輸出流
val lateOutputTag = OutputTag[T]("late-data")val input: DataStream[T] = ...val result = input.keyBy(<key selector>).window(<window assigner>).allowedLateness(<time>).sideOutputLateData(lateOutputTag).<windowed transformation>(<window function>)val lateStream = result.getSideOutput(lateOutputTag)3.8 window API 總覽
Keyed Windowsstream.keyBy(...) <- keyed versus non-keyed windows.window(...) <- required: "assigner"[.trigger(...)] <- optional: "trigger" (else default trigger)[.evictor(...)] <- optional: "evictor" (else no evictor)[.allowedLateness(...)] <- optional: "lateness" (else zero)[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data).reduce/aggregate/fold/apply() <- required: "function"[.getSideOutput(...)] <- optional: "output tag" Non-Keyed Windowsstream.windowAll(...) <- required: "assigner"[.trigger(...)] <- optional: "trigger" (else default trigger)[.evictor(...)] <- optional: "evictor" (else no evictor)[.allowedLateness(...)] <- optional: "lateness" (else zero)[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data).reduce/aggregate/fold/apply() <- required: "function"[.getSideOutput(...)] <- optional: "output tag"總結
以上是生活随笔為你收集整理的Flink的Window的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SQL行列问题
- 下一篇: Java什么是重用_深度解析:java必