Flink: Reading a Text File and Aggregating Names by uid
The text file contains roughly 300,000 lines, tab-separated, in the following format:
```
001	jack
001	jack
001	rose
004	tom
004	jerry
001	sofia
005	natasha
006	catalina
006	jennifer
```

The required output is:
```
001	[jack,rose,sofia]
004	[tom,jerry]
005	[natasha]
006	[catalina,jennifer]
```

First, convert each line of the file into a more convenient format:
```java
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Test2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.readTextFile("E:/test/uid_person.txt");
        // Turn each "uid\tname" line into a (uid, {name}) tuple
        SingleOutputStreamOperator<Tuple2<String, Set<String>>> map = dataStreamSource.map(
                new MapFunction<String, Tuple2<String, Set<String>>>() {
                    @Override
                    public Tuple2<String, Set<String>> map(String s) throws Exception {
                        String[] split = s.split("\t");
                        String uid = split[0];
                        String name = split[1];
                        Set<String> set = new HashSet<>();
                        set.add(name);
                        return Tuple2.of(uid, set);
                    }
                });
        map.writeAsText("E:/test/mytest.txt").setParallelism(1);
        env.execute("Test");
    }
}
```

The output file contains:
```
(004,[tom])
(004,[jerry])
(001,[sofia])
(001,[jack])
(001,[jack])
(001,[rose])
(006,[jennifer])
(005,[natasha])
(006,[catalina])
```

Each line is now a `Tuple2<String, Set<String>>`. The `reduce` operation we apply next combines two values of the same type into a single value of that type: the first argument is the accumulated result of the previous reduces, and the second argument is the current element. Note that `reduce` can only process elements of the same type; it merges them and returns a single result value.
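To make those reduce semantics concrete before looking at the Flink code, here is a minimal plain-Java sketch (no Flink involved) of the same pairwise set merging; the class and the `mergeByUid` helper are hypothetical names for illustration, not part of the article's code:

```java
import java.util.*;

public class ReduceSketch {
    // Merge two name sets the same way the ReduceFunction below does:
    // add the current element's names into the accumulator and return it.
    static Set<String> merge(Set<String> acc, Set<String> cur) {
        acc.addAll(cur);
        return acc;
    }

    // Group (uid, name) records by uid, reducing pairwise in arrival order.
    static Map<String, Set<String>> mergeByUid(List<String[]> records) {
        Map<String, Set<String>> state = new HashMap<>();
        for (String[] rec : records) {
            Set<String> single = new HashSet<>(Collections.singleton(rec[1]));
            state.merge(rec[0], single, ReduceSketch::merge);
        }
        return state;
    }

    public static void main(String[] args) {
        List<String[]> records = Arrays.asList(
                new String[]{"001", "jack"}, new String[]{"001", "rose"},
                new String[]{"004", "tom"}, new String[]{"001", "sofia"});
        System.out.println(mergeByUid(records));
    }
}
```

Flink's keyed `reduce` does essentially this, except the per-key state lives inside the operator and an updated tuple is emitted after every element.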
Then apply a keyBy-reduce to the stream:
```java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<String> dataStreamSource = env.readTextFile("E:/test/uid_person.txt");
    SingleOutputStreamOperator<Tuple2<String, Set<String>>> map = dataStreamSource.map(
            new MapFunction<String, Tuple2<String, Set<String>>>() {
                @Override
                public Tuple2<String, Set<String>> map(String s) throws Exception {
                    String[] split = s.split("\t");
                    String uid = split[0];
                    String name = split[1];
                    Set<String> set = new HashSet<>();
                    set.add(name);
                    return Tuple2.of(uid, set);
                }
            });
    // Key by the uid (tuple field 0), then merge the name sets per key
    map.keyBy(0)
       .reduce(new ReduceFunction<Tuple2<String, Set<String>>>() {
           @Override
           public Tuple2<String, Set<String>> reduce(Tuple2<String, Set<String>> stringSetTuple2,
                                                     Tuple2<String, Set<String>> t1) throws Exception {
               stringSetTuple2.f1.addAll(t1.f1);
               return Tuple2.of(stringSetTuple2.f0, stringSetTuple2.f1);
           }
       })
       .writeAsText("E:/test/mytest.txt").setParallelism(1);
    env.execute("Test");
}
```

The output is:
```
(001,[sofia])
(001,[sofia, jack])
(001,[sofia, jack])
(001,[sofia, rose, jack])
(006,[catalina])
(006,[jennifer, catalina])
(005,[natasha])
(004,[tom])
(004,[tom, jerry])
```

Because a keyed `reduce` on a DataStream emits an updated tuple for every element it processes, the last record emitted for each uid is the complete one.
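If only the final set per uid is wanted, one option (not shown in the article) is a small post-processing pass over the output file that keeps the last record for each uid; the class name and parsing below are illustrative assumptions based on the `(uid,[a, b])` line format shown above:

```java
import java.util.*;

public class KeepLast {
    // Keep only the last emitted line per uid, preserving first-seen key order.
    static Map<String, String> lastPerUid(List<String> lines) {
        Map<String, String> last = new LinkedHashMap<>();
        for (String line : lines) {
            // Lines look like "(001,[sofia, jack])": the uid sits between '(' and the first ','.
            String uid = line.substring(1, line.indexOf(','));
            last.put(uid, line); // later lines overwrite earlier partial results
        }
        return last;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "(001,[sofia])", "(001,[sofia, jack])",
                "(004,[tom])", "(004,[tom, jerry])");
        System.out.println(lastPerUid(lines).values());
    }
}
```

With a bounded input like this file, switching to Flink's batch API (grouping on a DataSet) would also avoid the intermediate results entirely.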
Summary
The above, collected by 生活随笔, is the full walkthrough of reading a text file with Flink and aggregating the names for each uid; hopefully it helps you solve the problem you ran into.