Flink window example analysis
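Windows are at the core of stream processing in Flink. Once you pick the window type you need, the window splits the incoming raw data stream into multiple buckets, and all computation takes place inside those windows.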
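To make the "buckets" idea concrete, here is a minimal plain-Java sketch (no Flink dependency) that assigns event timestamps to 5-second tumbling windows. The class and method names (`TumblingBuckets`, `windowStart`, `bucketize`) are hypothetical, and the start-of-window rule `timestamp - timestamp % size` assumes non-negative timestamps and no window offset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation (no Flink dependency) of a tumbling event-time window
// cutting a stream into buckets, keyed by each bucket's start timestamp.
final class TumblingBuckets {

    // Start of the window [start, start + size) containing `timestamp`
    // (assumes timestamp >= 0 and no window offset).
    static long windowStart(long timestamp, long size) {
        return timestamp - timestamp % size;
    }

    static Map<Long, List<Long>> bucketize(long[] timestamps, long size) {
        Map<Long, List<Long>> buckets = new TreeMap<>(); // sorted by window start
        for (long ts : timestamps) {
            buckets.computeIfAbsent(windowStart(ts, size), k -> new ArrayList<>()).add(ts);
        }
        return buckets;
    }

    public static void main(String[] args) {
        long[] events = {1000, 4999, 5000, 7200, 12001};
        // 5-second windows: {0=[1000, 4999], 5000=[5000, 7200], 10000=[12001]}
        System.out.println(bucketize(events, 5000));
    }
}
```

Each bucket is then computed independently, which is what "all computation takes place inside the window" means in practice.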
flink本身提供的實例程序TopSpeedWindowing.java
```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger;

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * An example of grouped stream windowing where different eviction and trigger
 * policies can be used. A source fetches events from cars every 100 msec
 * containing their id, their current speed (kmh), overall elapsed distance (m)
 * and a timestamp. The streaming example triggers the top speed of each car
 * every x meters elapsed for the last y seconds.
 */
public class TopSpeedWindowing {

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        final ParameterTool params = ParameterTool.fromArgs(args);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setGlobalJobParameters(params);

        @SuppressWarnings({"rawtypes", "serial"})
        DataStream<Tuple4<Integer, Integer, Double, Long>> carData;
        if (params.has("input")) {
            carData = env.readTextFile(params.get("input")).map(new ParseCarData());
        } else {
            System.out.println("Executing TopSpeedWindowing example with default input data set.");
            System.out.println("Use --input to specify file input.");
            carData = env.addSource(CarSource.create(2));
        }

        int evictionSec = 10;
        double triggerMeters = 50;
        DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData
                .assignTimestampsAndWatermarks(new CarTimestamp()) // 1. timestamps and watermarks
                .keyBy(0)
                .window(GlobalWindows.create()) // 2. window assigner
                .evictor(TimeEvictor.of(Time.of(evictionSec, TimeUnit.SECONDS))) // evictor
                .trigger(DeltaTrigger.of(triggerMeters, // 3. trigger
                        new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public double getDelta(
                                    Tuple4<Integer, Integer, Double, Long> oldDataPoint,
                                    Tuple4<Integer, Integer, Double, Long> newDataPoint) {
                                // delta = distance travelled since the reference data point
                                return newDataPoint.f2 - oldDataPoint.f2;
                            }
                        }, carData.getType().createSerializer(env.getConfig())))
                .maxBy(1); // 4. window function: record with the highest speed

        if (params.has("output")) {
            topSpeeds.writeAsText(params.get("output"));
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            topSpeeds.print();
        }

        env.execute("CarTopSpeedWindowingExample");
    }

    // *************************************************************************
    // USER FUNCTIONS
    // *************************************************************************

    private static class CarSource implements SourceFunction<Tuple4<Integer, Integer, Double, Long>> {

        private static final long serialVersionUID = 1L;
        private Integer[] speeds;
        private Double[] distances;

        private Random rand = new Random();

        private volatile boolean isRunning = true;

        private CarSource(int numOfCars) {
            speeds = new Integer[numOfCars];
            distances = new Double[numOfCars];
            Arrays.fill(speeds, 50);
            Arrays.fill(distances, 0d);
        }

        public static CarSource create(int cars) {
            return new CarSource(cars);
        }

        @Override
        public void run(SourceContext<Tuple4<Integer, Integer, Double, Long>> ctx) throws Exception {

            while (isRunning) {
                Thread.sleep(100);
                for (int carId = 0; carId < speeds.length; carId++) {
                    if (rand.nextBoolean()) {
                        speeds[carId] = Math.min(100, speeds[carId] + 5);
                    } else {
                        speeds[carId] = Math.max(0, speeds[carId] - 5);
                    }
                    distances[carId] += speeds[carId] / 3.6d;
                    Tuple4<Integer, Integer, Double, Long> record = new Tuple4<>(carId,
                            speeds[carId], distances[carId], System.currentTimeMillis());
                    ctx.collect(record);
                }
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    private static class ParseCarData extends RichMapFunction<String, Tuple4<Integer, Integer, Double, Long>> {
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple4<Integer, Integer, Double, Long> map(String record) {
            // input lines look like "(carId,speed,distance,timestamp)"
            String rawData = record.substring(1, record.length() - 1);
            String[] data = rawData.split(",");
            return new Tuple4<>(Integer.valueOf(data[0]), Integer.valueOf(data[1]),
                    Double.valueOf(data[2]), Long.valueOf(data[3]));
        }
    }

    private static class CarTimestamp extends AscendingTimestampExtractor<Tuple4<Integer, Integer, Double, Long>> {
        private static final long serialVersionUID = 1L;

        @Override
        public long extractAscendingTimestamp(Tuple4<Integer, Integer, Double, Long> element) {
            return element.f3;
        }
    }
}
```

In this program:
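1. Defining timestamps: this was covered in the previous article, "How are timestamps used in Flink? Watermark usage and principles", so it is not repeated here.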
2. Window type: a WindowAssigner defines how the elements of a stream are assigned to one or more windows. Its class hierarchy is as follows:
[Figure: WindowAssigner class hierarchy]
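A window assigner answers one question per element: which window(s) does it belong to? The sketch below is a simplified, hypothetical model (not Flink's classes; `AssignerSketch`, `Window` and `Assigner` are made-up names) of three assigner styles: a single global window in the spirit of GlobalWindows, a tumbling window, and a sliding window that returns several overlapping windows. It assumes non-negative timestamps, no window offset, and half-open windows [start, end). Note that GlobalWindows on its own never fires (its default trigger never fires), which is why TopSpeedWindowing attaches a DeltaTrigger.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified model of window assignment (not Flink's classes):
// an assigner maps an element's timestamp to the window(s) it belongs to.
final class AssignerSketch {

    // Half-open window [start, end); a Long.MIN_VALUE start marks the global window.
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return start == Long.MIN_VALUE ? "GlobalWindow" : "[" + start + ", " + end + ")";
        }
    }

    interface Assigner {
        List<Window> assign(long timestamp);
    }

    // Like GlobalWindows: every element lands in the same single window.
    static final Assigner GLOBAL =
            ts -> Collections.singletonList(new Window(Long.MIN_VALUE, Long.MAX_VALUE));

    // Tumbling: exactly one window of length `size` (assumes ts >= 0, no offset).
    static Assigner tumbling(long size) {
        return ts -> {
            long start = ts - ts % size;
            return Collections.singletonList(new Window(start, start + size));
        };
    }

    // Sliding: every window of length `size`, starting at a multiple of `slide`,
    // that contains ts (assumes ts >= 0, no offset).
    static Assigner sliding(long size, long slide) {
        return ts -> {
            List<Window> windows = new ArrayList<>();
            for (long start = ts - ts % slide; start > ts - size; start -= slide) {
                windows.add(new Window(start, start + size));
            }
            return windows;
        };
    }

    public static void main(String[] args) {
        System.out.println(GLOBAL.assign(12));         // [GlobalWindow]
        System.out.println(tumbling(10).assign(12));   // [[10, 20)]
        System.out.println(sliding(10, 5).assign(12)); // [[10, 20), [5, 15)]
    }
}
```

The sliding case is what "one or more windows" refers to: with size 10 and slide 5, each element belongs to two windows.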
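Evictor: removes elements from a window, before and/or after the window function is applied. Its class hierarchy is as follows:
[Figure: Evictor class hierarchy]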
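In the example, `TimeEvictor.of(Time.of(10, TimeUnit.SECONDS))` keeps only the most recent 10 seconds of data in each car's global window, so `maxBy` only sees recent speeds. The sketch below models the rule we understand TimeEvictor to apply: find the newest timestamp in the window and drop every element at or before "newest minus the keep interval". It is plain Java, and `TimeEvictionSketch`, `Event` and `evict` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Plain-Java sketch of time-based eviction modelled on the rule we understand
// Flink's TimeEvictor to apply: keep only elements newer than
// (newest timestamp in the window - keepMillis).
final class TimeEvictionSketch {

    static final class Event {
        final long timestamp;
        final int speed;

        Event(long timestamp, int speed) {
            this.timestamp = timestamp;
            this.speed = speed;
        }
    }

    static void evict(List<Event> window, long keepMillis) {
        if (window.isEmpty()) {
            return;
        }
        long newest = Long.MIN_VALUE;
        for (Event e : window) {
            newest = Math.max(newest, e.timestamp);
        }
        long cutoff = newest - keepMillis;
        for (Iterator<Event> it = window.iterator(); it.hasNext(); ) {
            if (it.next().timestamp <= cutoff) {
                it.remove(); // too old relative to the newest element
            }
        }
    }

    public static void main(String[] args) {
        List<Event> window = new ArrayList<>();
        window.add(new Event(0, 50));
        window.add(new Event(3000, 55));
        window.add(new Event(9000, 60));
        window.add(new Event(12000, 65));
        evict(window, 10_000); // comparable to keeping the last 10 seconds
        System.out.println(window.size()); // 3: the event at t=0 is evicted
    }
}
```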
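3. Trigger: the window trigger decides when a window is evaluated, that is, when the window function runs over the window's current contents. Its class hierarchy is as follows: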
[Figure: Trigger class hierarchy]
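The example fires on distance, not time: DeltaTrigger compares each new element with a stored reference element using the supplied DeltaFunction and fires when the delta exceeds the threshold (50 m here). The sketch below models our reading of that behaviour in plain Java: the first element only becomes the reference, and after each firing the current element becomes the new reference. `DeltaTriggerSketch` and its `Result` enum are hypothetical names, not Flink's API.

```java
import java.util.function.ToDoubleBiFunction;

// Plain-Java model of delta-based triggering (the idea behind Flink's DeltaTrigger):
// keep a reference element; FIRE when delta(reference, current) > threshold and
// then make the current element the new reference.
final class DeltaTriggerSketch<T> {

    enum Result { CONTINUE, FIRE }

    private final double threshold;
    private final ToDoubleBiFunction<T, T> delta;
    private T reference; // in Flink this would be keyed, per-window state

    DeltaTriggerSketch(double threshold, ToDoubleBiFunction<T, T> delta) {
        this.threshold = threshold;
        this.delta = delta;
    }

    Result onElement(T element) {
        if (reference == null) {   // first element only sets the reference
            reference = element;
            return Result.CONTINUE;
        }
        if (delta.applyAsDouble(reference, element) > threshold) {
            reference = element;   // measure the next delta from here
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }

    public static void main(String[] args) {
        // Elapsed distance (m) of one car; fire once it has moved more than 50 m.
        DeltaTriggerSketch<Double> trigger =
                new DeltaTriggerSketch<Double>(50.0, (oldD, newD) -> newD - oldD);
        double[] distances = {0, 20, 45, 60, 90, 111, 120};
        for (double d : distances) {
            System.out.println(d + " -> " + trigger.onElement(d));
        }
        // FIRE is printed for 60.0 and 111.0 only
    }
}
```

With the example's DeltaFunction (difference in elapsed distance), this corresponds to emitting a top-speed result each time a car has travelled more than 50 m since the last result.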
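4. Window function: defines the computation applied to the data in a window. Its class hierarchy is as follows:
[Figure: window function class hierarchy]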
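In the example, `maxBy(1)` emits, for each firing, the whole record with the highest value in tuple field 1 (speed), rather than just the maximum value. The sketch below models that as a plain pairwise reduction over the window's contents; `MaxBySketch` and `CarEvent` are hypothetical names, and keeping the earlier record on a tie is a choice made in this sketch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java sketch of what maxBy(1) yields for one window firing: the whole
// record whose field 1 (speed) is largest, computed as a pairwise reduction.
final class MaxBySketch {

    // Mirrors the example's Tuple4<carId, speed, distance, timestamp>.
    static final class CarEvent {
        final int carId;
        final int speed;
        final double distance;
        final long timestamp;

        CarEvent(int carId, int speed, double distance, long timestamp) {
            this.carId = carId;
            this.speed = speed;
            this.distance = distance;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "(" + carId + "," + speed + "," + distance + "," + timestamp + ")";
        }
    }

    // Keep the record with the higher speed; on a tie keep the earlier one.
    static final BinaryOperator<CarEvent> MAX_BY_SPEED = (a, b) -> b.speed > a.speed ? b : a;

    static CarEvent apply(List<CarEvent> window) {
        return window.stream()
                .reduce(MAX_BY_SPEED)
                .orElseThrow(() -> new IllegalArgumentException("empty window"));
    }

    public static void main(String[] args) {
        List<CarEvent> window = Arrays.asList(
                new CarEvent(0, 55, 100.0, 1000L),
                new CarEvent(0, 70, 150.0, 2000L),
                new CarEvent(0, 65, 190.0, 3000L));
        System.out.println(apply(window)); // (0,70,150.0,2000)
    }
}
```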
?參考資料
【1】https://www.jianshu.com/p/5302b48ca19b
Reposted from: https://www.cnblogs.com/davidwang456/p/11113237.html