Flink中的Time与Window
一、Time
在Flink的流式處理中,會(huì)涉及到時(shí)間的不同概念
Event Time:是事件創(chuàng)建的時(shí)間。它通常由事件中的時(shí)間戳描述,例如采集的日志數(shù)據(jù)中,每一條日志都會(huì)記錄自己的生成時(shí)間,Flink通過(guò)時(shí)間戳分配器訪問(wèn)事件時(shí)間戳
Ingestion Time:是數(shù)據(jù)進(jìn)入Flink的時(shí)間
Processing Time:是每一個(gè)執(zhí)行基于時(shí)間操作的算子的本地系統(tǒng)時(shí)間,與機(jī)器相關(guān),默認(rèn)的時(shí)間屬性就是Processing Time。
例如一條日志進(jìn)入Flink的時(shí)間為2017-11-12 10:00:00.123 到達(dá)window的系統(tǒng)時(shí)間為 2017-11-12 10:00:01.234,日志內(nèi)容如下:
2017-11-02 18:37:15.624 INFO Fair over to rm2
對(duì)于業(yè)務(wù)來(lái)說(shuō),要統(tǒng)計(jì)1min內(nèi)的故障日志個(gè)數(shù),哪個(gè)時(shí)間是最有意義的?----- eventTime,因?yàn)槲覀円鶕?jù)日志的生成時(shí)間進(jìn)行統(tǒng)計(jì)。
??
如果要想聚合,不可能對(duì)無(wú)解數(shù)據(jù)流進(jìn)行聚合。
?
二、Window
1、streaming流式計(jì)算是一種被設(shè)計(jì)用于處理處理無(wú)限數(shù)據(jù)集的數(shù)據(jù)處理引擎,而無(wú)限數(shù)據(jù)集是指一種不斷增長(zhǎng)的本質(zhì)上無(wú)限的數(shù)據(jù)集,而window是一種切割無(wú)限數(shù)據(jù)為有限塊進(jìn)行處理的手段。
Window是無(wú)限數(shù)據(jù)流處理的核心,Window將一個(gè)無(wú)限的stream拆分成有限大小的"buckets"桶,我們可以在這些桶上做計(jì)算操作。
共有兩類,五種時(shí)間窗口。
2、Window類型(兩類)
2.1、CountWindow:按照指定的數(shù)據(jù)條數(shù)生成一個(gè)window,與時(shí)間無(wú)關(guān)
2.2、TimeWindow:按照時(shí)間生成window。(按照Processing Time來(lái)劃分Window)
對(duì)于TimeWindow和CountWindow,可以根據(jù)窗口實(shí)現(xiàn)原理的不同分成三類:滾動(dòng)窗口(Tumbling Window)、滑動(dòng)窗口(Sliding Window)和會(huì)話窗口(Session Window)。
(1)滾動(dòng)窗口(Tumbling Windows)
將數(shù)據(jù)依據(jù)固定的窗口長(zhǎng)度對(duì)數(shù)據(jù)進(jìn)行切分。
特點(diǎn):時(shí)間對(duì)齊,窗口長(zhǎng)度固定,沒(méi)有重疊。
滾動(dòng)窗口分配器將每個(gè)元素分配到一個(gè)指定窗口大小的窗口中,滾動(dòng)窗口有一個(gè)固定的大小,并且不會(huì)出現(xiàn)重疊。
(2)滑動(dòng)窗口(Sliding Windows)
滑動(dòng)窗口是固定窗口的更廣義的一種形式,滑動(dòng)窗口由固定的窗口長(zhǎng)度和滑動(dòng)間隔組成。
特點(diǎn):時(shí)間對(duì)齊,窗口長(zhǎng)度固定,有重疊。
滑動(dòng)窗口分配器將元素分配到固定長(zhǎng)度的窗口中,與滾動(dòng)窗口類似,窗口的大小由窗口大小參數(shù)來(lái)配置,另一個(gè)窗口滑動(dòng)參數(shù)控制滑動(dòng)窗口開(kāi)始的頻率。
因此,滑動(dòng)窗口如果滑動(dòng)參數(shù)小于窗口大小的話,窗口是可以重疊的,在這種情況下元素會(huì)被分配到多個(gè)窗口中。
使用場(chǎng)景:對(duì)最近一個(gè)時(shí)間段內(nèi)的統(tǒng)計(jì)(求某接口最近5min的失敗率來(lái)決定是否要報(bào)警。)
(3)會(huì)話窗口(Session Windows)
由一系列事件組合一個(gè)指定時(shí)間長(zhǎng)度的timeout間隙組成。類似于web應(yīng)用的session,也就是一段時(shí)間沒(méi)有接收到新數(shù)據(jù)就會(huì)生成新的窗口。
特點(diǎn):時(shí)間無(wú)對(duì)齊。
session 窗口分配器通過(guò)session活動(dòng)來(lái)對(duì)元素進(jìn)行分組,session窗口跟滾動(dòng)窗口和滑動(dòng)窗口相比,不會(huì)有重疊和固定的開(kāi)始時(shí)間和結(jié)束時(shí)間的情況,相反,當(dāng)它在一個(gè)固定的
時(shí)間周期內(nèi)不再收到元素,即非活動(dòng)間隔產(chǎn)生,那這個(gè)窗口就會(huì)關(guān)閉。一個(gè)Session窗口通過(guò)一個(gè)session間隔來(lái)配置,這個(gè)session間隔定義了非活躍周期的長(zhǎng)度,當(dāng)這個(gè)非活躍
周期產(chǎn)生,那么當(dāng)前的session將關(guān)閉并且后續(xù)的元素將被分配到新的session窗口中去。
?
三、Window API
3.1、CountWindow
CountWindow根據(jù)窗口中相同key元素的數(shù)量來(lái)觸發(fā)執(zhí)行,執(zhí)行時(shí)只計(jì)算元素?cái)?shù)量達(dá)到窗口大小的key對(duì)應(yīng)的結(jié)果。
注意:CountWindow的window_size 指的是相同key的元素的個(gè)數(shù),不是輸入的所有元素的總數(shù)。
import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}/*** CountWindow 中的滾動(dòng)窗口(Tumbling Windows)* 將數(shù)據(jù)依據(jù)固定的窗口長(zhǎng)度對(duì)數(shù)據(jù)進(jìn)行切分。*/ object TimeAndWindow {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval stream: DataStream[String] = env.socketTextStream("localhost",11111)val streamKeyBy: KeyedStream[(String, Long), Tuple] = stream.map(item => (item,1L)).keyBy(0)//注意:CountWindow的window_size 指的是相同key的元素的個(gè)數(shù),不是輸入的所有元素的總數(shù)。val streamWindow: DataStream[(String, Long)] = streamKeyBy.countWindow(5).reduce((item1, item2)=>(item1._1,item1._2+item2._2))streamWindow.print()env.execute("TimeAndWindow")} }3.2
import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}/*** CountWindow 中的滑動(dòng)窗口(Sliding Windows)* 將數(shù)據(jù)依據(jù)固定的窗口長(zhǎng)度對(duì)數(shù)據(jù)進(jìn)行切分。*/ object TimeAndWindow {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval stream: DataStream[String] = env.socketTextStream("localhost",11111)val streamKeyBy: KeyedStream[(String, Long), Tuple] = stream.map(item => (item,1L)).keyBy(0)//注意:CountWindow的window_size 指的是相同key的元素的個(gè)數(shù),不是輸入的所有元素的總數(shù)。//滿足步長(zhǎng),就執(zhí)行一次,按第一個(gè)參數(shù)的長(zhǎng)度val streamWindow: DataStream[(String, Long)] = streamKeyBy.countWindow(5,2).reduce((item1, item2)=>(item1._1,item1._2+item2._2))streamWindow.print()env.execute("TimeAndWindow")} }四、EventTime與Window
1、EventTime的引入
在Flink的流式處理中,絕大部分的業(yè)務(wù)都會(huì)使用eventTime,一般只在eventTime無(wú)法使用時(shí),才會(huì)被迫使用ProcessingTime或者IngestionTime。
如果要使用EventTime,那么需要引入EventTime的時(shí)間戳,引入方式如下所示:
2、Watermark
概念:我們知道,流處理從事件產(chǎn)生,到流經(jīng)source,再到operator,中間是有一個(gè)過(guò)程和時(shí)間的,雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的
事件戳順序來(lái)的,但是也不排除由于網(wǎng)絡(luò)、背壓等原因,導(dǎo)致亂序的產(chǎn)生,所謂亂序,就是指Flink接收到的事件的先后順序不是嚴(yán)格按照事件的EventTime順序排列的。
Watermark是一種衡量Event Time進(jìn)展的機(jī)制,它是數(shù)據(jù)本身的一個(gè)隱藏屬性,數(shù)據(jù)本身攜帶著對(duì)應(yīng)的Watermark。
Watermark是用于處理亂序事件的,而正確的處理亂序事件,通常用Watermark機(jī)制結(jié)合window來(lái)實(shí)現(xiàn)。
數(shù)據(jù)流中的Watermark用于表示eventTime小于Watermark的數(shù)量,都已經(jīng)到達(dá)了,因此,window的執(zhí)行也是由Watermark觸發(fā)的。
Watermark可以理解成一個(gè)延遲觸發(fā)機(jī)制。我們可以設(shè)置Watermark的延時(shí)時(shí)長(zhǎng)t,每次系統(tǒng)會(huì)校驗(yàn)已經(jīng)到達(dá)的數(shù)據(jù)中最大的maxEventTime,然后認(rèn)定eventTime 小于
maxEventTime-t 的所有數(shù)據(jù)都已經(jīng)到達(dá)。如果有窗口的停止時(shí)間等于maxEventTime-t,那么這個(gè)窗口被觸發(fā)執(zhí)行。
滾動(dòng)窗口/滑動(dòng)窗口/會(huì)話窗口
import org.apache.flink.api.java.tuple.Tupleimport org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, SlidingEventTimeWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
/**
* TimeWindow
*/
object EventTimeAndWindow {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//開(kāi)啟watermark
//從調(diào)用時(shí)刻開(kāi)始給env創(chuàng)建的每一個(gè)stream追加時(shí)間特征。
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream: KeyedStream[(String, Long), Tuple] = env.socketTextStream("192.168.218.130", 1111).assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(3000)) {
override def extractTimestamp(element: String): Long = {
// event word eventTime是日志生成時(shí)間,我們從日志中解析EventTime
val eventTime = element.split(" ")(0).toLong
println(eventTime)
eventTime
}
}
).map(item => (item.split(" ")(1),1L)).keyBy(0)
//加上滾動(dòng)窗口,窗口大小是5s,調(diào)用window的api
// val streamWindow: WindowedStream[(String, Long), Tuple, TimeWindow] = stream.window(TumblingEventTimeWindows.of(Time.seconds(5)))
//滑動(dòng)窗口
// val streamWindow: WindowedStream[(String, Long), Tuple, TimeWindow] = stream.window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)))
//會(huì)話窗口
val streamWindow: WindowedStream[(String, Long), Tuple, TimeWindow] = stream.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
val streamReduce = streamWindow.reduce((item1,item2)=>(item1._1,item1._2+item2._2))
streamReduce.print()
env.execute("EventTimeAndWindow")
}
}
?
轉(zhuǎn)載于:https://www.cnblogs.com/ssqq5200936/p/11014296.html
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來(lái)咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)總結(jié)
以上是生活随笔為你收集整理的Flink中的Time与Window的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: FANUC服务器维修,FANUC系统40
- 下一篇: 常见显示器PPI备忘