1.16.Flink Window和Time详解、TimeWindow的应用、Window聚合分类之全量聚合、全量聚合状态变化过程-求最大值、Time介绍、EventTime和Watermarks等
1.16.Flink Window和Time詳解
1.16.1.Window(窗口)
1.16.2.Window的類型
1.16.3.Window類型匯總
1.16.4.TimeWindow的應(yīng)用
1.16.5.CountWindow的應(yīng)用
1.16.6.Window聚合分類
1.16.7.Window聚合分類之增量聚合
1.16.7.1.增量聚合狀態(tài)變化過(guò)程-累加求和
1.16.7.2.reduce(reduceFunction)
1.16.7.3.aggregate(aggregateFunction)
1.16.8.Window聚合分類之全量聚合
1.16.8.1.全量聚合狀態(tài)變化過(guò)程-求最大值
1.16.8.2.apply(windowFunction)
1.16.8.3.process(processWindowFunction)
1.16.9.Time介紹
1.16.9.1.設(shè)置Time類型
1.16.9.2.EventTime和Watermarks
1.16.9.3.有序的流的watermarks
1.16.9.4.無(wú)序的流的watermarks
1.16.9.5.多并行度流的watermarks
1.16.9.6.watermarks的生成方式
1.16.9.7.Flink應(yīng)該如何設(shè)置最大亂序時(shí)間?
1.16.9.8.Flink應(yīng)該如何設(shè)置最大亂序時(shí)間?
1.16.Flink Window和Time詳解
1.16.1.Window(窗口)
?聚合事件(比如計(jì)數(shù)、求和)在流上的工作方式與批處理不同。
- ?比如,對(duì)流中的所有元素進(jìn)行計(jì)數(shù)是不可能的,因?yàn)橥ǔA魇菬o(wú)限的(無(wú)界的)。所以,流上的聚合需要由 window 來(lái)劃定范圍,比如 “計(jì)算過(guò)去的5分鐘” ,或者 “最后100個(gè)元素的和”。
- ?window是一種可以把無(wú)限數(shù)據(jù)切割為有限數(shù)據(jù)塊的手段。
?窗口可以是 時(shí)間驅(qū)動(dòng)的 【Time Window】(比如:每30秒)或者 數(shù)據(jù)驅(qū)動(dòng)的【Count Window】 (比如:每100個(gè)元素)。
1.16.2.Window的類型
?窗口通常被區(qū)分為不同的類型:
一:tumbling windows:滾動(dòng)窗口 【沒(méi)有重疊】
二:sliding windows:滑動(dòng)窗口 【有重疊】
三:session windows:會(huì)話窗口
1.16.3.Window類型匯總
TimeWindow和CountWindow都可以有tumbling windows和sliding wndows
1.16.4.TimeWindow的應(yīng)用
1.16.5.CountWindow的應(yīng)用
1.16.6.Window聚合分類
?增量聚合
?全量聚合
另外的Scala案例:
import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.windowing.time.Time/*** 滑動(dòng)窗口計(jì)算** 每隔1秒統(tǒng)計(jì)最近2秒內(nèi)的數(shù)據(jù),打印到控制臺(tái)** Created by xxxx on 2020/10/09 .*/ object SocketWindowWordCountScala {def main(args: Array[String]): Unit = {//獲取socket端口號(hào)val port: Int = try {ParameterTool.fromArgs(args).getInt("port")}catch {case e: Exception => {System.err.println("No port set. use default port 9000--scala")}9000}//獲取運(yùn)行環(huán)境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//鏈接socket獲取輸入數(shù)據(jù)val text = env.socketTextStream("hadoop100",port,'\n')//解析數(shù)據(jù)(把數(shù)據(jù)打平),分組,窗口計(jì)算,并且聚合求sum//注意:必須要添加這一行隱式轉(zhuǎn)行,否則下面的flatmap方法執(zhí)行會(huì)報(bào)錯(cuò)import org.apache.flink.api.scala._val windowCounts = text.flatMap(line => line.split("\\s"))//打平,把每一行單詞都切開(kāi).map(w => WordWithCount(w,1))//把單詞轉(zhuǎn)成word , 1這種形式.keyBy("word")//分組.timeWindow(Time.seconds(2),Time.seconds(1))//指定窗口大小,指定間隔時(shí)間.sum("count");// sum或者reduce都可以//.reduce((a,b)=>WordWithCount(a.word,a.count+b.count))//打印到控制臺(tái)windowCounts.print().setParallelism(1);//執(zhí)行任務(wù)env.execute("Socket window count");}case class WordWithCount(word: String,count: Long)} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment/*** Created by xxxx on 2020/10/09 on 2018/10/23.*/ object StreamingFromCollectionScala {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//隱式轉(zhuǎn)換import org.apache.flink.api.scala._val data = List(10,15,20)val text = env.fromCollection(data)//針對(duì)map接收到的數(shù)據(jù)執(zhí)行加1的操作val num = text.map(_+1)num.print().setParallelism(1)env.execute("StreamingFromCollectionScala")}}1.16.7.Window聚合分類之增量聚合
窗口中每進(jìn)入一條數(shù)據(jù),就進(jìn)行一次計(jì)算
reduce(reduceFunction) aggregate(aggregateFunction) sum(),min(),max()1.16.7.1.增量聚合狀態(tài)變化過(guò)程-累加求和
1.16.7.2.reduce(reduceFunction)
1.16.7.3.aggregate(aggregateFunction)
1.16.8.Window聚合分類之全量聚合
?全量聚合
- ?等屬于窗口的數(shù)據(jù)到齊,才開(kāi)始進(jìn)行聚合計(jì)算【可以實(shí)現(xiàn)對(duì)窗口內(nèi)的數(shù)據(jù)進(jìn)行排序等需求】
- ?apply(windowFunction)
- ?process(processWindowFunction)
?processWindowFunction比windowFunction提供了更多的上下文信息。
1.16.8.1.全量聚合狀態(tài)變化過(guò)程-求最大值
1.16.8.2.apply(windowFunction)
1.16.8.3.process(processWindowFunction)
1.16.9.Time介紹
?針對(duì)stream數(shù)據(jù)中的時(shí)間,可以分為以下三種
- ?Event Time:事件產(chǎn)生的時(shí)間,它通常由事件中的時(shí)間戳描述。
- ?Ingestion time:事件進(jìn)入Flink的時(shí)間
- ?Processing Time:事件被處理時(shí)當(dāng)前系統(tǒng)的時(shí)間。
?處理時(shí)間(processing time):處理時(shí)間是指執(zhí)行相應(yīng)操作的機(jī)器的系統(tǒng)時(shí)間。
當(dāng)流處理程序基于處理時(shí)間運(yùn)行時(shí),所有基于時(shí)間的操作(如時(shí)間窗口)將使用運(yùn)行相應(yīng)運(yùn)算符的機(jī)器的系統(tǒng)時(shí)鐘。每小時(shí)處理時(shí)間窗口將包括在系統(tǒng)時(shí)鐘指示整個(gè)小時(shí)之間到達(dá)特定運(yùn)算符的所有記錄。 例如,如果應(yīng)用程序在上午9:15開(kāi)始運(yùn)行,則第一個(gè)每小時(shí)處理時(shí)間窗口將包括在上午9:15到10:00之間處理的事件,下一個(gè)窗口將包括在上午10:00到11:00之間處理的事件,以此類推。
處理時(shí)間是最簡(jiǎn)單的時(shí)間概念,不需要流和機(jī)器之間的協(xié)調(diào)。 它提供最佳性能和最低延遲。 但是,在分布式和異步環(huán)境中,處理時(shí)間不提供確定性,因?yàn)樗菀资艿接涗浀竭_(dá)系統(tǒng)的速度(例如從消息隊(duì)列),記錄在系統(tǒng)內(nèi)的運(yùn)算符之間流動(dòng)的速度的影響,以及停電(計(jì)劃或其他)。
?事件時(shí)間(event time):事件時(shí)間是每個(gè)事件在其生產(chǎn)設(shè)備上發(fā)生的時(shí)間。此時(shí)間通常在進(jìn)入Flink之前嵌入記錄中,并且可以從每個(gè)記錄中提取該事件時(shí)間戳。 在事件時(shí)間,時(shí)間的進(jìn)展取決于數(shù)據(jù),而不是任何時(shí)鐘。 事件時(shí)間程序必須指定如何生成事件時(shí)間水印,這是表示事件時(shí)間進(jìn)度的機(jī)制。 該水印機(jī)制在下面的后面部分中描述。
在一個(gè)完美的世界中,事件時(shí)間處理將產(chǎn)生完全一致和確定的結(jié)果,無(wú)論事件何時(shí)到達(dá)或其它們的順序。 但是,除非事件已知按順序到達(dá)(按時(shí)間戳),否則事件時(shí)間處理會(huì)在等待無(wú)序事件時(shí)產(chǎn)生一些延遲。 由于只能等待一段有限的時(shí)間,因此限制了確定性事件時(shí)間應(yīng)用程序的運(yùn)行方式。
假設(shè)所有數(shù)據(jù)都已到達(dá),事件時(shí)間操作將按預(yù)期運(yùn)行,即使在處理無(wú)序或延遲事件或重新處理歷史數(shù)據(jù)時(shí)也會(huì)產(chǎn)生正確且一致的結(jié)果。 例如,每小時(shí)事件時(shí)間窗口將包含帶有落入該小時(shí)的事件時(shí)間戳的所有記錄,無(wú)論它們到達(dá)的順序如何,或者何時(shí)處理它們。 (有關(guān)更多信息,請(qǐng)參閱有關(guān)遲到事件的部分。)
請(qǐng)注意,有時(shí)基于事件時(shí)間的程序處理實(shí)時(shí)數(shù)據(jù)時(shí),它們將使用一些處理時(shí)間(processing time)操作,以保證它們及時(shí)進(jìn)行。
?進(jìn)入時(shí)間(Ingestion time): 進(jìn)入時(shí)間是事件進(jìn)入Flink的時(shí)間。 在源運(yùn)算符處,每個(gè)記錄將源的當(dāng)前時(shí)間作為時(shí)間戳,并且基于時(shí)間的操作(如時(shí)間窗口)引用該時(shí)間戳。
進(jìn)入時(shí)間在概念上位于事件時(shí)間和處理時(shí)間之間。與處理時(shí)間相比,它代價(jià)稍高,但可以提供更可預(yù)測(cè)的結(jié)果。 因?yàn)檫M(jìn)入時(shí)間使用穩(wěn)定的時(shí)間戳(在源處分配一次),所以對(duì)記錄的不同窗口操作將引用相同的時(shí)間戳,而在處理時(shí)間中,每個(gè)窗口操作符可以將記錄分配給不同的窗口(基于本地系統(tǒng)時(shí)鐘和 任何傳輸延誤)。
與事件時(shí)間相比,進(jìn)入時(shí)間程序無(wú)法處理任何無(wú)序事件或延遲數(shù)據(jù),但程序不必指定如何生成水印。
在內(nèi)部,攝取時(shí)間與事件時(shí)間非常相似,但具有自動(dòng)分配時(shí)間戳和自動(dòng)生成水印功能。
1.16.9.1.設(shè)置Time類型
?Flink中,默認(rèn)Time類似是ProcessingTime
?可以在代碼中設(shè)置
1.16.9.2.EventTime和Watermarks
?在使用eventTime的時(shí)候如何處理亂序數(shù)據(jù)?
?我們知道,流處理從事件產(chǎn)生,到流經(jīng)source,再到operator,中間是有一個(gè)過(guò)程和時(shí)間的。雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時(shí)間順序來(lái)的,但是也不排除由于網(wǎng)絡(luò)延遲等原因,導(dǎo)致亂序的產(chǎn)生,特別是使用kafka的話,多個(gè)分區(qū)的數(shù)據(jù)無(wú)法保證有序。所以在進(jìn)行window計(jì)算的時(shí)候,我們又不能無(wú)限期的等下去,必須要有個(gè)機(jī)制來(lái)保證一個(gè)特定的時(shí)間后,必須觸發(fā)window去進(jìn)行計(jì)算了。這個(gè)特別的機(jī)制,就是watermark,watermark是用于處理亂序事件的。
?watermark可以翻譯為水位線
1.16.9.3.有序的流的watermarks
1.16.9.4.無(wú)序的流的watermarks
1.16.9.5.多并行度流的watermarks
注意:多并行度的情況下,watermark對(duì)齊會(huì)取所有channel最小的watermark
1.16.9.6.watermarks的生成方式
?通常,在接收到source的數(shù)據(jù)后,應(yīng)該立刻生成watermark;但是,也可以在source后,應(yīng)用簡(jiǎn)單的map或者filter操作后,再生成watermark。
?注意:如果指定多次watermark,后面指定的會(huì)覆蓋前面的值。
?生成方式
-
?With Periodic Watermarks
1、周期性的觸發(fā)watermark的生成和發(fā)送,默認(rèn)是100ms
2、每隔N秒自動(dòng)向流里注入一個(gè)WATERMARK 時(shí)間間隔由ExecutionConfig.setAutoWatermarkInterval 決定. 每次調(diào)用getCurrentWatermark 方法, 如果得到的WATERMARK 不為空并且比之前的大就注入流中。
3、可以定義一個(gè)最大允許亂序的時(shí)間,這種比較常用
4、實(shí)現(xiàn)AssignerWithPeriodicWatermarks接口 -
?With Punctuated Watermarks
1、基于某些事件觸發(fā)watermark的生成和發(fā)送
2、基于事件向流里注入一個(gè)WATERMARK,每一個(gè)元素都有機(jī)會(huì)判斷是否生成一個(gè)WATERMARK. 如果得到的WATERMARK 不為空并且比之前的大就注入流中。
3、實(shí)現(xiàn)AssignerWithPunctuatedWatermarks接口
scala案例:
import java.text.SimpleDateFormatimport org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collectorimport scala.collection.mutable.ArrayBuffer import scala.util.Sorting/*** Watermark 案例* Created by xxxx on 2020/10/09*/ object StreamingWindowWatermarkScala {def main(args: Array[String]): Unit = {val port = 9000val env = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)env.setParallelism(1)val text = env.socketTextStream("hadoop100",port,'\n')val inputMap = text.map(line=>{val arr = line.split(",")(arr(0),arr(1).toLong)})val waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long)] {var currentMaxTimestamp = 0Lvar maxOutOfOrderness = 10000L// 最大允許的亂序時(shí)間是10sval sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");override def getCurrentWatermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness)override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long) = {val timestamp = element._2currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)val id = Thread.currentThread().getIdprintln("currentThreadId:"+id+",key:"+element._1+",eventtime:["+element._2+"|"+sdf.format(element._2)+"],currentMaxTimestamp:["+currentMaxTimestamp+"|"+ sdf.format(currentMaxTimestamp)+"],watermark:["+getCurrentWatermark().getTimestamp+"|"+sdf.format(getCurrentWatermark().getTimestamp)+"]")timestamp}})val window = waterMarkStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(3))) //按照消息的EventTime分配窗口,和調(diào)用TimeWindow效果一樣.apply(new WindowFunction[Tuple2[String, Long], String, Tuple, TimeWindow] {override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]) = {val keyStr = key.toStringval arrBuf = ArrayBuffer[Long]()val ite = input.iteratorwhile (ite.hasNext){val tup2 = ite.next()arrBuf.append(tup2._2)}val arr = arrBuf.toArraySorting.quickSort(arr)val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");val result = keyStr + "," + arr.length + "," + sdf.format(arr.head) + "," + sdf.format(arr.last)+ "," + sdf.format(window.getStart) + "," + sdf.format(window.getEnd)out.collect(result)}})window.print()env.execute("StreamingWindowWatermarkScala")} } import java.text.SimpleDateFormatimport org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment} import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collectorimport scala.collection.mutable.ArrayBuffer import scala.util.Sorting/*** Watermark 案例** sideOutputLateData 收集遲到的數(shù)據(jù)** Created by xxxx on 2020/10/09*/ object StreamingWindowWatermarkScala2 {def main(args: Array[String]): Unit = {val port = 9000val env = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)env.setParallelism(1)val text = env.socketTextStream("hadoop100",port,'\n')val inputMap = text.map(line=>{val arr = line.split(",")(arr(0),arr(1).toLong)})val waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long)] {var currentMaxTimestamp = 0Lvar maxOutOfOrderness = 10000L// 最大允許的亂序時(shí)間是10sval sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");override def getCurrentWatermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness)override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long) = {val timestamp = element._2currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)val id = Thread.currentThread().getIdprintln("currentThreadId:"+id+",key:"+element._1+",eventtime:["+element._2+"|"+sdf.format(element._2)+"],currentMaxTimestamp:["+currentMaxTimestamp+"|"+ sdf.format(currentMaxTimestamp)+"],watermark:["+getCurrentWatermark().getTimestamp+"|"+sdf.format(getCurrentWatermark().getTimestamp)+"]")timestamp}})val outputTag = new OutputTag[Tuple2[String,Long]]("late-data"){}val window = waterMarkStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(3))) //按照消息的EventTime分配窗口,和調(diào)用TimeWindow效果一樣//.allowedLateness(Time.seconds(2))//允許數(shù)據(jù)遲到2秒.sideOutputLateData(outputTag).apply(new WindowFunction[Tuple2[String, Long], String, Tuple, TimeWindow] {override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]) = {val keyStr = key.toStringval arrBuf = ArrayBuffer[Long]()val ite = input.iteratorwhile (ite.hasNext){val tup2 = ite.next()arrBuf.append(tup2._2)}val arr = arrBuf.toArraySorting.quickSort(arr)val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");val result = keyStr + "," + arr.length + "," + sdf.format(arr.head) + "," + sdf.format(arr.last)+ "," + sdf.format(window.getStart) + "," + sdf.format(window.getEnd)out.collect(result)}})val sideOutput: DataStream[Tuple2[String, Long]] = window.getSideOutput(outputTag)sideOutput.print()window.print()env.execute("StreamingWindowWatermarkScala")}}1.16.9.7.Flink應(yīng)該如何設(shè)置最大亂序時(shí)間?
這個(gè)要結(jié)合自己的業(yè)務(wù)以及數(shù)據(jù)情況去設(shè)置。如果maxOutOfOrderness設(shè)置的太小,而自身數(shù)據(jù)發(fā)送時(shí)由于網(wǎng)絡(luò)等原因?qū)е聛y序或者late太多,那么最終的結(jié)果就是會(huì)有很多單條的數(shù)據(jù)在window中被觸發(fā),數(shù)據(jù)的正確性影響太大。
對(duì)于嚴(yán)重亂序的數(shù)據(jù),需要嚴(yán)格統(tǒng)計(jì)數(shù)據(jù)最大延遲時(shí)間,才能保證計(jì)算的數(shù)據(jù)準(zhǔn)確,延時(shí)設(shè)置太小會(huì)影響數(shù)據(jù)準(zhǔn)確性,延時(shí)設(shè)置太大不僅影響數(shù)據(jù)的實(shí)時(shí)性,更加會(huì)加重Flink作業(yè)的負(fù)擔(dān),不是對(duì)eventTime要求特別嚴(yán)格的數(shù)據(jù),盡量不要采用eventTime方式來(lái)處理,會(huì)有丟數(shù)據(jù)的風(fēng)險(xiǎn)。
1.16.9.8.Flink應(yīng)該如何設(shè)置最大亂序時(shí)間?
這個(gè)要結(jié)合自己的業(yè)務(wù)以及數(shù)據(jù)情況去設(shè)置。如果maxOutOfOrderness設(shè)置的太小,而自身數(shù)據(jù)發(fā)送時(shí)由于網(wǎng)絡(luò)等原因?qū)е聛y序或者late太多,那么最終的結(jié)果就是會(huì)有很多單條的數(shù)據(jù)在window中被觸發(fā),數(shù)據(jù)的正確性影響太大。
對(duì)于嚴(yán)重亂序的數(shù)據(jù),需要嚴(yán)格統(tǒng)計(jì)數(shù)據(jù)最大延遲時(shí)間,才能保證計(jì)算的數(shù)據(jù)準(zhǔn)確,延時(shí)設(shè)置太小會(huì)影響數(shù)據(jù)準(zhǔn)確性,延時(shí)設(shè)置太大不僅影響數(shù)據(jù)的實(shí)時(shí)性,更加會(huì)加重Flink作業(yè)的負(fù)擔(dān),不是對(duì)eventTime要求特別嚴(yán)格的數(shù)據(jù),盡量不要采用eventTime方式來(lái)處理,會(huì)有丟數(shù)據(jù)的風(fēng)險(xiǎn)。
與50位技術(shù)專家面對(duì)面20年技術(shù)見(jiàn)證,附贈(zèng)技術(shù)全景圖總結(jié)
以上是生活随笔為你收集整理的1.16.Flink Window和Time详解、TimeWindow的应用、Window聚合分类之全量聚合、全量聚合状态变化过程-求最大值、Time介绍、EventTime和Watermarks等的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 开公司独资还是合资好 还是要看个人的判
- 下一篇: 1.17.Flink 并行度详解(Par