Flink的时间语义和Watermark
1 時(shí)間語義
?? 數(shù)據(jù)遲到的概念是:數(shù)據(jù)先產(chǎn)生,但是處理的時(shí)候滯后了
?? 在Flink的流式處理中,會(huì)涉及到時(shí)間的不同概念,如下圖所示:
?? Event Time:是事件創(chuàng)建的時(shí)間。它通常由事件中的時(shí)間戳描述,例如采集的日志數(shù)據(jù)中,每一條日志都會(huì)記錄自己的生成時(shí)間,Flink通過時(shí)間戳分配器訪問事件時(shí)間戳。
?? Ingestion Time:是數(shù)據(jù)進(jìn)入Flink的時(shí)間。
?? Processing Time:是每一個(gè)執(zhí)行基于時(shí)間操作的算子的本地系統(tǒng)時(shí)間,與機(jī)器相關(guān),默認(rèn)的時(shí)間屬性就是Processing Time。
?? 在Flink的流式處理中,絕大部分的業(yè)務(wù)都會(huì)使用eventTime,一般只在eventTime無法使用時(shí),才會(huì)被迫使用ProcessingTime或者IngestionTime。引入EventTime的時(shí)間屬性如下:
val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val stream: DataStream[MyEvent] = env.addSource(new FlinkKafkaConsumer[MyEvent](topic, schema, props))stream.keyBy( _.getUser ).timeWindow(Time.hours(1)).reduce( (a, b) => a.add(b) ).addSink(...)?? 設(shè)置了EventTime后后面處理底層會(huì)判斷
?? 注意:設(shè)置了事件時(shí)間,但是并不知道事件時(shí)間,Event Time 的使用一定要指定數(shù)據(jù)源中的時(shí)間戳,通過assignTimestampsAndWatermarks指定,時(shí)間戳要是ms單位。
val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val stream: DataStream[MyEvent] = env.readFile(myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,FilePathFilter.createDefaultFilter())val withTimestampsAndWatermarks: DataStream[MyEvent] = stream.filter( _.severity == WARNING ).assignTimestampsAndWatermarks(<watermark strategy>)withTimestampsAndWatermarks.keyBy( _.getGroup ).timeWindow(Time.seconds(10)).reduce( (a, b) => a.add(b) ).addSink(...)?? 對于排序好的數(shù)據(jù),不需要延遲觸發(fā),可以只指定時(shí)間戳就行了
val withTimestampsAndWatermarks: DataStream[MyEvent] = stream.assignTimestampsAndWatermarks(_.timestamp)?? 對于亂序數(shù)據(jù)調(diào)用 assignTimestampAndWatermarks 方法,傳入一個(gè) BoundedOutOfOrdernessTimestampExtractor,就可以指定 watermark
val withTimestampsAndWatermarks: DataStream[MyEvent] = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[WC](Time.milliseconds(1000)){override def extractTimestamp(element: WC): Long = {element.timestamp * 1000}}2 WaterMark
2.1 什么是WaterMark
?? 我們的數(shù)據(jù)從采集經(jīng)過kafka,etl等操作要耗時(shí)的,再到流經(jīng)source,到operator,中間是有一個(gè)過程和時(shí)間的,雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時(shí)間順序來的,但是也不排除由于網(wǎng)絡(luò)、分布式等原因,導(dǎo)致亂序的產(chǎn)生。
?? 遲到數(shù)據(jù)是因?yàn)橛醒舆t,簡單的想法就多等一下。不要5秒的事件到了就關(guān)閉窗口,多等一會(huì)。我們要考慮的是當(dāng)前事件的時(shí)間進(jìn)展到底要按照什么時(shí)間算,也就是說假設(shè)現(xiàn)在5秒的窗口要關(guān)閉,設(shè)置延遲為2秒,那么5秒的數(shù)據(jù)來了就多等2秒,5秒的事件來了就相當(dāng)于還沒有進(jìn)展到5秒,是進(jìn)展到了5-2=3秒,也就是時(shí)間才進(jìn)展到3秒。按照這種多等2秒的方式的話要等到時(shí)間戳是7的數(shù)據(jù)來了之后7-2=5才關(guān)閉5秒的窗口。這就提出了Watermark
?? 亂序,其實(shí)就是指Flink接收到的事件的先后順序不是嚴(yán)格按照事件的Event Time順序排列的。一旦出現(xiàn)亂序,如果只根據(jù)eventTime決定window的運(yùn)行,我們不能明確數(shù)據(jù)是否全部到位,但又不能無限期的等下去,此時(shí)必須要有個(gè)機(jī)制來保證一個(gè)特定的時(shí)間后,必須觸發(fā)window去進(jìn)行計(jì)算了,這個(gè)特別的機(jī)制,就是Watermark。
?? Watermark可以從以下幾個(gè)方面理解:①Watermark是一種衡量Event Time進(jìn)展的機(jī)制。②Watermark是用于處理亂序事件的,而正確的處理亂序事件,通常用Watermark機(jī)制結(jié)合window來實(shí)現(xiàn)。③數(shù)據(jù)流中的Watermark用于表示timestamp小于Watermark的數(shù)據(jù),都已經(jīng)到達(dá)了,因此,window的執(zhí)行也是由Watermark觸發(fā)的。④Watermark可以理解成一個(gè)延遲觸發(fā)機(jī)制,我們可以設(shè)置Watermark的延時(shí)時(shí)長t,每次系統(tǒng)會(huì)校驗(yàn)已經(jīng)到達(dá)的數(shù)據(jù)中最大的maxEventTime,然后認(rèn)定eventTime小于maxEventTime - t的所有數(shù)據(jù)都已經(jīng)到達(dá),如果有窗口的停止時(shí)間等于maxEventTime – t,那么這個(gè)窗口被觸發(fā)執(zhí)行
?? Watermark延遲時(shí)間的設(shè)置一般根據(jù)數(shù)據(jù)的亂序情況定義,通常設(shè)置成最大亂序程度
2.2 Watermark傳遞
?? 真正的Watermark其實(shí)就是一條特殊的記錄,可以認(rèn)為是插入數(shù)據(jù)流里面的一個(gè)特殊數(shù)據(jù),Watermark可以理解為是一個(gè)有時(shí)間戳的特殊數(shù)據(jù)結(jié)構(gòu),就和數(shù)據(jù)一樣一條一條來,后面處理數(shù)據(jù)如果是正常數(shù)據(jù)就正常處理,如果是Watermark就按照對于時(shí)間的操作該關(guān)閉窗口就關(guān)閉窗口。
?? Watermark必須單調(diào)遞增,既然表示當(dāng)前事件時(shí)間的進(jìn)展,時(shí)間只能朝前不停的推進(jìn),另外總和當(dāng)前數(shù)據(jù)的時(shí)間戳相關(guān),數(shù)據(jù)的時(shí)間戳就應(yīng)該是當(dāng)前的事件時(shí)間。
?? 當(dāng)Flink接收到數(shù)據(jù)時(shí),會(huì)按照一定的規(guī)則去生成Watermark。Watermark要求單調(diào)遞增的話就選取所有當(dāng)前已經(jīng)來的數(shù)據(jù)里面最大的時(shí)間戳作為當(dāng)前的事件時(shí)間,要多等一會(huì)的話在當(dāng)前最大的時(shí)間戳基礎(chǔ)上再減去一個(gè)延遲時(shí)間就可以了,即maxEventTime - 延遲時(shí)長。所以Watermark是基于數(shù)據(jù)攜帶的時(shí)間戳生成的,如果Watermark比當(dāng)前未觸發(fā)的窗口的停止時(shí)間要晚,那么就會(huì)觸發(fā)相應(yīng)窗口的執(zhí)行,如果運(yùn)行過程中無法獲取新的數(shù)據(jù),那么沒有被觸發(fā)的窗口將永遠(yuǎn)都不被觸發(fā)。
?? 有序流的Watermarker(最大延遲時(shí)間為0)如下圖所示:
?? 亂序流的Watermarker(最大延遲時(shí)間為4)如下圖所示:
?? 上圖中,采用周期性插入Watermark的生成策略,默認(rèn)每200ms系統(tǒng)插入Watermark。我們設(shè)置的允許最大延遲到達(dá)時(shí)間為4s,當(dāng)系統(tǒng)要插入第一個(gè)Watermark時(shí)查看此時(shí)數(shù)據(jù)中的最大事件時(shí)間為15,所以插入的Watermark是11s。過了200ms后到了第二次插入watermark的時(shí)候,此時(shí)數(shù)據(jù)中的最大事件時(shí)間為22,所以插入Watermark是18s。果我們的窗口1是1s-10s,窗口2是10s-20s,那么Watermarker為11到達(dá)之后需要觸發(fā)窗口1。一旦觸發(fā)以當(dāng)前時(shí)刻為準(zhǔn)在窗口范圍內(nèi)的所有所有數(shù)據(jù)都會(huì)收入窗中。
2.3 Watermark的傳遞
?? Watermark的傳遞如上圖所示。
?? Flink 的傳遞策略基本上遵循三點(diǎn):①watermark 會(huì)以廣播的形式在算子之間進(jìn)行傳播。并行任務(wù)沒有數(shù)據(jù)交互不考慮,只要考慮上游有多少個(gè)任務(wù)給他發(fā)數(shù)據(jù),下游要發(fā)送多少個(gè)數(shù)據(jù)到別的任務(wù)。②如果在程序里面收到了一個(gè) Long.MAX_VALUE這個(gè)數(shù)值的 watermark,就表示對應(yīng)的那一條流的一個(gè)部分不會(huì)再有數(shù)據(jù)發(fā)過來了,它相當(dāng)于就是一個(gè)終止的標(biāo)志。③單流輸入取其大,多流輸入取小。不同的上游任務(wù)發(fā)來的Watermark不一樣,不能按照上游所有的Watermark中最大的Watermark來判定當(dāng)前的事件時(shí)間,而是應(yīng)該按照最小的那個(gè)來判定,因?yàn)閃atermark代表的數(shù)據(jù)是他之前的數(shù)據(jù)都到期了,如果只接收到一個(gè)分區(qū)的Watermark是29表示這個(gè)分區(qū)29之前數(shù)據(jù)已經(jīng)到齊了,但是不能保證當(dāng)前任務(wù)不在接收29之前的數(shù)據(jù),因?yàn)橹皠e的Watermark可能還沒進(jìn)展到29,所以應(yīng)該按照最小的。
?? 底層實(shí)現(xiàn):上游有2個(gè)分區(qū)就會(huì)對每一個(gè)分區(qū)都去創(chuàng)建一個(gè)分區(qū)的Watermark(PARTITION Watermark),分別是29,14所以當(dāng)前任務(wù)的事件時(shí)間是14,那么下游的子任務(wù)廣播出去也是14,14之前的數(shù)據(jù)都到齊了。接下來一個(gè)分區(qū)來了一個(gè)新的Watermark是17,相當(dāng)于這個(gè)分區(qū)的時(shí)間進(jìn)展為17之前的都到齊,那么首先更新當(dāng)前的Watermark,然后觀察現(xiàn)在所有分區(qū)的Watermark最小值是否改變,如果改變那么事件時(shí)間就朝前進(jìn)展,事件時(shí)間更新就往下游廣播。
2.4 WaterMark使用
?? watermark對于有序數(shù)據(jù),最常見的引用方式如下:
dataStream.assignTimestampsAndWatermarks(_.timestamp)?? 升序數(shù)據(jù)不用管Watermark,本身數(shù)據(jù)來就帶有時(shí)間戳
?? watermark對于亂序數(shù)據(jù),最常見的引用方式如下:
dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Element](Time.seconds(allowDealy)) {override def extractTimestamp(element: Element): Long = element.ds})?? Watermark就是在assignTimestampsAndWatermarks里面定義出來的,BoundedOutOfOrdernessTimestampExtractor 是Flink內(nèi)置提供的允許亂序最大延時(shí)的watermark生成方式,只需要重寫其extractTimestamp方法?,F(xiàn)在kafka源也支持直接生成Watermark,所以etl的時(shí)候可以把Watermark也產(chǎn)生。不過我們一般是在Flink把數(shù)據(jù)讀進(jìn)來做了轉(zhuǎn)換之后馬上分配一個(gè)Watermark。Watermark要保證正確性,延遲時(shí)間一般定義成最大的亂序程度(從數(shù)據(jù)里面提煉出來的參數(shù))。同個(gè)分區(qū)數(shù)據(jù)可能會(huì)亂序,Watermark不會(huì)亂序(單調(diào)遞增,取最大的時(shí)間戳減去延遲時(shí)間)
2.5 自定義WaterMark
?? watermark的生成策略有兩種:一種是AssignerWithPeriodicWatermarks周期性生成(隔一段時(shí)間系統(tǒng)自動(dòng)插入),另外一種是AssignerWithPunctuatedWatermarks根據(jù)特定標(biāo)記生成。這兩個(gè)接口都是Flink暴露了TimestampAssigner接口的子類型。實(shí)際生成中大量密集數(shù)據(jù)比較多,稀疏較少,所以一般使用周期性AssignerWithPeriodicWatermarks方式。
?? 周期性的生成watermark系統(tǒng)會(huì)周期性的將watermark插入到流中。默認(rèn)周期是200毫秒??梢允褂肊xecutionConfig.setAutoWatermarkInterval(watermarkInterval)方法進(jìn)行設(shè)置。每隔watermarkInterval,Flink會(huì)調(diào)用AssignerWithPeriodicWatermarks的getCurrentWatermark(watermarkInterval)方法。如果方法返回的watermark大于之前的watermark,新的watermark會(huì)被插入到流中。這個(gè)檢查保證了watermark是單調(diào)遞增的。如果方法返回的時(shí)間戳小于等于之前watermark,則不會(huì)產(chǎn)生新的watermark。
?? 自定義一個(gè)周期性的時(shí)間戳抽取:
class MyPeriodicAssigner extends AssignerWithPeriodicWatermarks[Element] {val bound: Long = 60 * 1000 // 延時(shí)為1分鐘var maxTs: Long = Long.MinValue // 觀察到的最大時(shí)間戳override def getCurrentWatermark: Watermark = {new Watermark(maxTs - bound)}override def extractTimestamp(r: Element, previousTS: Long) = {maxTs = maxTs.max(r.timestamp)r.timestamp} }?? 間斷式地生成watermark。和周期性生成的方式不同,這種方式不是固定時(shí)間的,而是可以根據(jù)需要對每條數(shù)據(jù)進(jìn)行篩選和處理,自定義一個(gè)間斷式地生成watermar:
class MyPunctuatedAssigner extends AssignerWithPunctuatedWatermarks[Element] {val bound: Long = 60 * 1000override def checkAndGetNextWatermark(r: Element, extractedTS: Long): Watermark = {if (r.status == "sucess") {new Watermark(extractedTS - bound)} else {null}}override def extractTimestamp(r: Element, previousTS: Long): Long = {r.timestamp} }總結(jié)
以上是生活随笔為你收集整理的Flink的时间语义和Watermark的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Jupyter Notebook安装 n
- 下一篇: MYSQL中有时候不得不使用replac