Flink countWindow窗口
窗口在處理數據前,會對數據做分流,有兩種控制流的方式,按照數據流劃分:Keyed和Non-Keyed Windows
Keyed Windows:就是有按照某個字段分組的數據流使用的窗口,可以理解為按照原始數據流中的某個key進行分類,擁有同一個key值的數據流將為進入同一個window,多個窗口并行的邏輯流。
Non-Keyed Windows:沒有進行按照某個字段分組的數據使用的窗口
stream.windowAll(...) <- required: "assigner"[.trigger(...)] <- optional: "trigger" (else default trigger)[.evictor(...)] <- optional: "evictor" (else no evictor)[.allowedLateness(...)] <- optional: "lateness" (else zero)[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data).reduce/aggregate/fold/apply() <- required: "function"[.getSideOutput(...)] <- optional: "output tag"Keyed和Non-Keyed Windows的區別
在定義窗口之前,要指定的第一件事是流是否需要Keyed,使用keyBy(…)將無界流分成邏輯的keyed stream。 如果未調用keyBy(…),則表示流不是keyed stream。
實戰
目前原始數據內容如下:
1 100 2 200 1 101 2 201 3 300 4 401 3 302 1 103 5 501 6 602 6 601 4 402 6 600我們使用keyBy(0),根據第一個字段進行分組,然后使用countWindow(2)來進行觸發,代碼如下:
public class Test3 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> dataStreamSource = env.readTextFile("E:/test/haha.txt");dataStreamSource.map(new MapFunction<String, Tuple2<String, String>>() {@Overridepublic Tuple2<String, String> map(String s) throws Exception {String[] split = s.split("\t");return Tuple2.of(split[0], split[1]);}}).keyBy(0).countWindow(2).apply(new WindowFunction<Tuple2<String, String>, Object, Tuple, GlobalWindow>() {@Overridepublic void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<String, String>> input, Collector<Object> out) throws Exception {Iterator<Tuple2<String, String>> iterator = input.iterator();while (iterator.hasNext()) {Tuple2<String, String> next = iterator.next();System.out.println("執行操作:" + next.f0 + ", " + next.f1);out.collect( next.f1 + "======");}}}).print();env.execute("Test3");} }輸出結果如下:
執行操作:2, 200 執行操作:2, 201 執行操作:1, 100 執行操作:1, 101 執行操作:3, 300 執行操作:3, 302 執行操作:6, 602 執行操作:6, 601 執行操作:4, 401 執行操作:4, 402原始數據有13條,而輸出結果只有10條,這是為什么?
原因是我們使用的是countWindow(2),也就是當根據keyBy(0)分組之后,數據的數量達到2時進行輸出。
而我們的數據中id為1的有3條,因此其中一條數據將不會被觸發,id為6的有3條,其中一條數據沒有達到countWindow(2)也不會觸發,id為5的有1條,沒有達到countWindow(2)也不會被觸發,因此輸出結果少了3條數據。
因此countWindow是根據分組之后的數據條數來確定是否執行后面的運算。
當把countWindow(2)改為countWindow(1)時,每一條數據都會被處理輸出。
總結
以上是生活随笔為你收集整理的Flink countWindow窗口的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Flink的重启策略
- 下一篇: linux 根据进程号查看对应的进程