初学Flink,对Watermarks的一些理解和感悟(透彻2)
2019獨角獸企業重金招聘Python工程師標準>>>
官方文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_time.html
翻譯:https://www.jianshu.com/p/68ab40c7f347
1. 幾個重要的概念簡述:
-
Window:Window是處理無界流的關鍵,Windows將流拆分為一個個有限大小的buckets,可以可以在每一個buckets中進行計算
-
start_time,end_time:當Window時時間窗口的時候,每個window都會有一個開始時間和結束時間(前開后閉),這個時間是系統時間
-
event-time: 事件發生時間,是事件發生所在設備的當地時間,比如一個點擊事件的時間發生時間,是用戶點擊操作所在的手機或電腦的時間
-
Watermarks:可以把他理解為一個水位線,這個Watermarks在不斷的變化,一旦Watermarks大于了某個window的end_time,就會觸發此window的計算,Watermarks就是用來觸發window計算的。
2.如何使用Watermarks處理亂序的數據流
什么是亂序呢?可以理解為數據到達的順序和他的event-time排序不一致。導致這的原因有很多,比如延遲,消息積壓,重試等等
因為Watermarks是用來觸發window窗口計算的,我們可以根據事件的event-time,計算出Watermarks,并且設置一些延遲,給遲到的數據一些機會。
假如我們設置10s的時間窗口(window),那么0~10s,10~20s都是一個窗口,以0~10s為例,0位start-time,10為end-time。假如有4個數據的event-time分別是8(A),12.5(B),9(C),13.5(D),我們設置Watermarks為當前所有到達數據event-time的最大值減去延遲值3.5秒
當A到達的時候,Watermarks為max{8}-3.5=8-3.5 = 4.5 < 10,不會觸發計算
當B到達的時候,Watermarks為max(12.8,5)-3.5=12.5-3.5 = 9 < 10,不會觸發計算
當C到達的時候,Watermarks為max(12.5,8,9)-3.5=12.5-3.5 = 9 < 10,不會觸發計算
當D到達的時候,Watermarks為max(13.5,12.5,8,9)-3.5=13.5-3.5 = 10 = 10,觸發計算
觸發計算的時候,會將AC(因為他們都小于10)都計算進去
通過上面這種方式,我們就將遲到的C計算進去了
這里的延遲3.5s是我們假設一個數據到達的時候,比他早3.5s的數據肯定也都到達了,這個是需要根據經驗推算的,加入D到達以后有到達了一個E,event-time=6,但是由于0~10的時間窗口已經開始計算了,所以E就丟了。
3.看一個代碼的實際例子
下面代碼中的BoundedOutOfOrdernessGenerator就是一個典型的Watermarks實例
package xuwei.tech;import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.meituan.flink.common.conf.FlinkConf; import com.meituan.flink.common.kafka.MTKafkaConsumer08; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.text.SimpleDateFormat; import java.util.Date;/*** Created by smile on 14/11/2017. 統計每 10 秒內每種操作有多少個*/ public class EventTimeWindowCount {private static final Logger logger = LoggerFactory.getLogger(EventTimeWindowCount.class);public static void main(String[] args) throws Exception { // 獲取作業名String jobName = FlinkConf.getJobName(args); // 獲取執行環境final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 設置使用 EventTime// 作為時間戳(默認是// ProcessingTime)env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 開啟 Checkpoint(每 10 秒保存一次檢查點,模式為 Exactly Once)env.enableCheckpointing(10000);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 設置從 Kafka 的 topic// "log.orderlog" 中讀取數據MYKafkaConsumer08 consumer = new MYKafkaConsumer08(jobName);DataStream<String> stream = env.addSource(consumer.getInstance("log.orderlog", new SimpleStringSchema())); // 默認接上次開始消費,以下的寫法(setStartFromLatest)可以從最新開始消費,相應的還有(setStartFromEarliest// 從最舊開始消費)// DataStream<String> stream =// env.addSource(consumer.getInstance("log.orderlog", new// SimpleStringSchema()).setStartFromLatest());DataStream<String> orderAmount = // 將讀入的字符串轉化為 OrderRecord 對象stream.map(new ParseOrderRecord()) // 設置從 OrderRecord 對象中提取時間戳的方式,下文 BoundedOutOfOrdernessGenerator// 類中具體實現該方法.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator()) // 用 OrderRecord 對象的 action// 字段進行分流(相同 action// 的進入相同流,不同 action// 的進入不同流).keyBy("action") // 觸發 10s 的滾動窗口,即每十秒的數據進入同一個窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))) // 將同一窗口的每個 OrderRecord 對象的 count// 字段加起來(其余字段只保留第一個進入該窗口的,后進去的丟棄).sum("count") // 將結果從 OrderRecord 對象轉換為 String,每十萬條輸出一條.flatMap(new ParseResult()); // 如果想每條都輸出來,那就輸得慢一點,每 10 秒輸出一條數據(請將上一行的 flatMap 換成下一行的 map)// .map(new ParseResultSleep());// 輸出結果(然后就可以去 Task Manage 的 Stdout 里面看)// 小數據量測試的時候可以這么寫,正式上線的時候不要這么寫!數據量大建議還是寫到 Kafka Topic 或者其他的下游里面去orderAmount.print();env.execute(jobName);}public static class ParseOrderRecord implements MapFunction<String, OrderRecord> {@Overridepublic OrderRecord map(String s) throws Exception {JSONObject jsonObject = JSON.parseObject(s);long id = jsonObject.getLong("id");int dealId = jsonObject.getInteger("dealid");String action = jsonObject.getString("_mt_action");double amount = jsonObject.getDouble("amount");String timestampString = jsonObject.getString("_mt_datetime"); // 將字符串格式的時間戳解析為 long 類型,單位毫秒SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date timestampDate = simpleDateFormat.parse(timestampString);long timestamp = timestampDate.getTime();return new OrderRecord(id, dealId, action, amount, timestamp);}}public static class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<OrderRecord> {private final long maxOutOfOrderness = 3500; // 3.5 secondsprivate long currentMaxTimestamp;@Overridepublic long extractTimestamp(OrderRecord record, long previousElementTimestamp) { // 將數據中的時間戳字段(long 類型,精確到毫秒)賦給// timestamp 變量,此處是// OrderRecord 的 timestamp// 字段long timestamp = record.timestamp;currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);return timestamp;}@Overridepublic Watermark getCurrentWatermark() { // return the watermark as current highest timestamp minus the// out-of-orderness boundreturn new Watermark(currentMaxTimestamp - maxOutOfOrderness);}}public static class ParseResult implements FlatMapFunction<OrderRecord, String> {private static long msgCount = 0;@Overridepublic void flatMap(OrderRecord record, Collector<String> out) throws Exception { // 每十萬條輸出一條,防止輸出太多在 Task// Manage 的 Stdout 里面刷新不出來if (msgCount == 0) {out.collect("Start from: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(record.timestamp)+ " action: " + record.action + " count = " + record.count);msgCount = 0;}msgCount++;msgCount %= 100000;}}public static class ParseResultSleep implements MapFunction<OrderRecord, String> {@Overridepublic String map(OrderRecord record) throws Exception { // 每 10 秒輸出一條數據,防止輸出太多在 Task Manage 的 Stdout 里面刷新不出來// 正式上線的時候不要這么寫!Thread.sleep(10000);return "Start from: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(record.timestamp) + " action: "+ record.action + " count = " + record.count;}}public static class OrderRecord { public long id; public int dealId; public String action; public double amount; public long timestamp; public long count; public OrderRecord() {} public OrderRecord(long id, int dealId, String action, double amount, long timestamp) { this.id = id; this.dealId = dealId; this.action = action; this.amount = amount; this.timestamp = timestamp; this.count = 1;}} }
?
轉載于:https://my.oschina.net/xiaominmin/blog/3057628
總結
以上是生活随笔為你收集整理的初学Flink,对Watermarks的一些理解和感悟(透彻2)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JMeter + influxdb +
- 下一篇: shiro登陆流程源码详解