1.11. Flink DataSet API: Data Sources, Transformations, and Sinks Explained
1.11. Flink DataSet API
1.11.1. DataSet API: Data Sources
1.11.2. DataSet API: Transformations
1.11.3. DataSet Sinks
1.11. Flink DataSet API
1.11.1. DataSet API: Data Sources
● File-based
readTextFile(path): reads a text file line by line and returns each line as a String element.
● Collection-based
fromCollection(Collection): builds a DataSet from an in-memory Java collection.
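A minimal sketch of both source types (the input path and the sample strings are placeholders, not from the original article):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

import java.util.Arrays;

public class BatchDemoSource {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // file-based source: each line of the (placeholder) file becomes one String element
        DataSet<String> fileText = env.readTextFile("file:///tmp/flink/input.txt");
        fileText.first(3).print();

        // collection-based source: build a DataSet from an in-memory Java collection
        DataSet<String> collectionText = env.fromCollection(Arrays.asList("hello you", "hello me"));
        collectionText.print();
    }
}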
1.11.2. DataSet API: Transformations
● Map: takes one element and returns exactly one element; cleansing and conversion logic can be applied in between.
● FlatMap: takes one element and can return zero, one, or many elements. (A sketch of both operators follows below.)
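A minimal sketch of Map and FlatMap (the sample sentences are made up for illustration):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;

public class BatchDemoMapFlatMap {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> text = env.fromElements("hello you", "hello me");

        // Map: exactly one output element per input element (simple cleanup here)
        DataSet<String> upper = text.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return value.trim().toUpperCase();
            }
        });
        upper.print();

        // FlatMap: zero, one, or many output elements per input element (split into words)
        DataSet<String> words = text.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                for (String word : value.split("\\W+")) {
                    out.collect(word);
                }
            }
        });
        words.print();
    }
}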
● MapPartition: similar to map, but processes one whole partition per call. Prefer MapPartition when the processing needs an external resource such as a database connection, so the connection is opened once per partition instead of once per record.
For example:
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Iterator;

/**
 * Created by xxxx on 2020/10/09 .
 */
public class BatchDemoMapPartition {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<String> data = new ArrayList<>();
        data.add("hello you");
        data.add("hello me");
        DataSource<String> text = env.fromCollection(data);

        /*text.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                // open a database connection -- note: a connection would be opened for every single record
                // process the record
                // close the connection
                return value;
            }
        });*/

        DataSet<String> mapPartitionData = text.mapPartition(new MapPartitionFunction<String, String>() {
            @Override
            public void mapPartition(Iterable<String> values, Collector<String> out) throws Exception {
                // open a database connection -- note: only one connection per partition is opened here
                // values holds all records of one partition
                // process the records
                Iterator<String> it = values.iterator();
                while (it.hasNext()) {
                    String next = it.next();
                    String[] split = next.split("\\W+");
                    for (String word : split) {
                        out.collect(word);
                    }
                }
                // close the connection
            }
        });
        mapPartitionData.print();
    }
}

The Scala version:

import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable.ListBuffer

/**
 * Created by xxxx on 2020/10/09
 */
object BatchDemoMapPartitionScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val data = ListBuffer[String]()
    data.append("hello you")
    data.append("hello me")
    val text = env.fromCollection(data)

    text.mapPartition(it => {
      // open the database connection here; ideally wrap this block in a try-catch
      val res = ListBuffer[String]()
      while (it.hasNext) {
        val line = it.next()
        val words = line.split("\\W+")
        for (word <- words) {
          res.append(word)
        }
      }
      res
      // close the connection
    }).print()
  }
}

● Filter: applies a predicate to every record; only the records that satisfy the condition are kept.
● Reduce: aggregates the data by combining the current element with the value returned by the previous reduce call, producing a new value.
● Aggregation: built-in aggregations such as sum, max, and min.
● Distinct: returns the deduplicated elements of a data set, e.g. data.distinct(). (A combined sketch of Filter, Reduce, Aggregation, and Distinct follows below.)
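A combined sketch of Filter, Reduce, Aggregation, and Distinct on a small made-up word-count data set:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class BatchDemoFilterReduce {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> wordCounts = env.fromElements(
                new Tuple2<>("hello", 1), new Tuple2<>("you", 1),
                new Tuple2<>("hello", 1), new Tuple2<>("me", 1));

        // Filter: keep only the records that satisfy the predicate
        DataSet<Tuple2<String, Integer>> filtered = wordCounts.filter(new FilterFunction<Tuple2<String, Integer>>() {
            @Override
            public boolean filter(Tuple2<String, Integer> value) throws Exception {
                return value.f0.startsWith("h");
            }
        });
        filtered.print();

        // Reduce: combine the current element with the previously reduced value
        DataSet<Tuple2<String, Integer>> reduced = wordCounts.groupBy(0)
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> v1, Tuple2<String, Integer> v2) throws Exception {
                        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
                    }
                });
        reduced.print();

        // Aggregation: built-in sum on field 1 (max/min are used the same way)
        wordCounts.groupBy(0).sum(1).print();

        // Distinct: remove duplicate records
        wordCounts.distinct().print();
    }
}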
● Join: inner join
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

import java.util.ArrayList;

/**
 * Created by xxxx on 2020/10/09 .
 */
public class BatchDemoJoin {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Tuple2<user id, user name>
        ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>();
        data1.add(new Tuple2<>(1, "zs"));
        data1.add(new Tuple2<>(2, "ls"));
        data1.add(new Tuple2<>(3, "ww"));

        // Tuple2<user id, user city>
        ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>();
        data2.add(new Tuple2<>(1, "beijing"));
        data2.add(new Tuple2<>(2, "shanghai"));
        data2.add(new Tuple2<>(3, "guangzhou"));

        DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1);
        DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2);

        text1.join(text2)
                .where(0)   // index of the join field in the first data set
                .equalTo(0) // index of the join field in the second data set
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second)
                            throws Exception {
                        return new Tuple3<>(first.f0, first.f1, second.f1);
                    }
                }).print();

        System.out.println("==================================");

        // Note: using map here produces exactly the same result as with above.
        /*text1.join(text2)
                .where(0)   // index of the join field in the first data set
                .equalTo(0) // index of the join field in the second data set
                .map(new MapFunction<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> map(Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>> value) throws Exception {
                        return new Tuple3<>(value.f0.f0, value.f0.f1, value.f1.f1);
                    }
                }).print();*/
    }
}

The Scala version:

import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable.ListBuffer

/**
 * Created by xxxx on 2020/10/09
 */
object BatchDemoJoinScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val data1 = ListBuffer[Tuple2[Int, String]]()
    data1.append((1, "zs"))
    data1.append((2, "ls"))
    data1.append((3, "ww"))

    val data2 = ListBuffer[Tuple2[Int, String]]()
    data2.append((1, "beijing"))
    data2.append((2, "shanghai"))
    data2.append((3, "guangzhou"))

    val text1 = env.fromCollection(data1)
    val text2 = env.fromCollection(data2)

    text1.join(text2).where(0).equalTo(0).apply((first, second) => {
      (first._1, first._2, second._2)
    }).print()
  }
}

● OuterJoin: outer join
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

import java.util.ArrayList;

/**
 * Outer joins:
 *
 * left outer join
 * right outer join
 * full outer join
 *
 * Created by xxxx on 2020/10/09 .
 */
public class BatchDemoOuterJoin {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Tuple2<user id, user name>
        ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>();
        data1.add(new Tuple2<>(1, "zs"));
        data1.add(new Tuple2<>(2, "ls"));
        data1.add(new Tuple2<>(3, "ww"));

        // Tuple2<user id, user city>
        ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>();
        data2.add(new Tuple2<>(1, "beijing"));
        data2.add(new Tuple2<>(2, "shanghai"));
        data2.add(new Tuple2<>(4, "guangzhou"));

        DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1);
        DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2);

        /**
         * Left outer join
         *
         * Note: the second tuple may be null
         */
        text1.leftOuterJoin(text2)
                .where(0)
                .equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                        if (second == null) {
                            return new Tuple3<>(first.f0, first.f1, "null");
                        } else {
                            return new Tuple3<>(first.f0, first.f1, second.f1);
                        }
                    }
                }).print();

        System.out.println("=============================================================================");

        /**
         * Right outer join
         *
         * Note: the first tuple may be null
         */
        text1.rightOuterJoin(text2)
                .where(0)
                .equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                        if (first == null) {
                            return new Tuple3<>(second.f0, "null", second.f1);
                        }
                        return new Tuple3<>(first.f0, first.f1, second.f1);
                    }
                }).print();

        System.out.println("=============================================================================");

        /**
         * Full outer join
         *
         * Note: both first and second may be null
         */
        text1.fullOuterJoin(text2)
                .where(0)
                .equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                        if (first == null) {
                            return new Tuple3<>(second.f0, "null", second.f1);
                        } else if (second == null) {
                            return new Tuple3<>(first.f0, first.f1, "null");
                        } else {
                            return new Tuple3<>(first.f0, first.f1, second.f1);
                        }
                    }
                }).print();
    }
}

The Scala version:

import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable.ListBuffer

/**
 * Created by xxxx on 2020/10/09
 */
object BatchDemoOuterJoinScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val data1 = ListBuffer[Tuple2[Int, String]]()
    data1.append((1, "zs"))
    data1.append((2, "ls"))
    data1.append((3, "ww"))

    val data2 = ListBuffer[Tuple2[Int, String]]()
    data2.append((1, "beijing"))
    data2.append((2, "shanghai"))
    data2.append((4, "guangzhou"))

    val text1 = env.fromCollection(data1)
    val text2 = env.fromCollection(data2)

    text1.leftOuterJoin(text2).where(0).equalTo(0).apply((first, second) => {
      if (second == null) {
        (first._1, first._2, "null")
      } else {
        (first._1, first._2, second._2)
      }
    }).print()

    println("===============================")

    text1.rightOuterJoin(text2).where(0).equalTo(0).apply((first, second) => {
      if (first == null) {
        (second._1, "null", second._2)
      } else {
        (first._1, first._2, second._2)
      }
    }).print()

    println("===============================")

    text1.fullOuterJoin(text2).where(0).equalTo(0).apply((first, second) => {
      if (first == null) {
        (second._1, "null", second._2)
      } else if (second == null) {
        (first._1, first._2, "null")
      } else {
        (first._1, first._2, second._2)
      }
    }).print()
  }
}

● Cross: builds the Cartesian product of two data sets.
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.CrossOperator;
import org.apache.flink.api.java.operators.DataSource;

import java.util.ArrayList;

/**
 * Builds the Cartesian product.
 *
 * Created by xxxx on 2020/10/09 .
 */
public class BatchDemoCross {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // user names
        ArrayList<String> data1 = new ArrayList<>();
        data1.add("zs");
        data1.add("ww");

        // user ids
        ArrayList<Integer> data2 = new ArrayList<>();
        data2.add(1);
        data2.add(2);

        DataSource<String> text1 = env.fromCollection(data1);
        DataSource<Integer> text2 = env.fromCollection(data2);

        CrossOperator.DefaultCross<String, Integer> cross = text1.cross(text2);
        cross.print();

        /**
         * Output:
         * (zs,1)
         * (zs,2)
         * (ww,1)
         * (ww,2)
         */
    }
}

The Scala version:

import org.apache.flink.api.scala.ExecutionEnvironment

/**
 * Created by xxxx on 2020/10/09
 */
object BatchDemoCrossScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val data1 = List("zs", "ww")
    val data2 = List(1, 2)

    val text1 = env.fromCollection(data1)
    val text2 = env.fromCollection(data2)

    text1.cross(text2).print()
  }
}

● Union: returns the union of two data sets; the element types must be identical.
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.UnionOperator;
import org.apache.flink.api.java.tuple.Tuple2;

import java.util.ArrayList;

/**
 * Created by xxxx on 2020/10/09 .
 */
public class BatchDemoUnion {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>();
        data1.add(new Tuple2<>(1, "zs"));
        data1.add(new Tuple2<>(2, "ls"));
        data1.add(new Tuple2<>(3, "ww"));

        ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>();
        data2.add(new Tuple2<>(1, "lili"));
        data2.add(new Tuple2<>(2, "jack"));
        data2.add(new Tuple2<>(3, "jessic"));

        DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1);
        DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2);

        UnionOperator<Tuple2<Integer, String>> union = text1.union(text2);
        union.print();
    }
}

The Scala version:

import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable.ListBuffer

/**
 * Created by xxxx on 2020/10/09
 */
object BatchDemoUnionScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val data1 = ListBuffer[Tuple2[Int, String]]()
    data1.append((1, "zs"))
    data1.append((2, "ls"))
    data1.append((3, "ww"))

    val data2 = ListBuffer[Tuple2[Int, String]]()
    data2.append((1, "jack"))
    data2.append((2, "lili"))
    data2.append((3, "jessic"))

    val text1 = env.fromCollection(data1)
    val text2 = env.fromCollection(data2)

    text1.union(text2).print()
  }
}

● First-n: returns the first N elements of a data set.
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;

import java.util.ArrayList;

/**
 * Returns the first N elements of a data set.
 * Created by xxxx on 2020/10/09 .
 */
public class BatchDemoFirstN {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
        data.add(new Tuple2<>(2, "zs"));
        data.add(new Tuple2<>(4, "ls"));
        data.add(new Tuple2<>(3, "ww"));
        data.add(new Tuple2<>(1, "xw"));
        data.add(new Tuple2<>(1, "aw"));
        data.add(new Tuple2<>(1, "mw"));

        DataSource<Tuple2<Integer, String>> text = env.fromCollection(data);

        // take the first 3 records in insertion order
        text.first(3).print();
        System.out.println("==============================");

        // group by the first field and take the first 2 records of each group
        text.groupBy(0).first(2).print();
        System.out.println("==============================");

        // group by the first field, sort each group by the second field (ascending), and take the first 2 records of each group
        text.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print();
        System.out.println("==============================");

        // no grouping: sort globally (first field ascending, second field descending) and take the first 3 records
        text.sortPartition(0, Order.ASCENDING).sortPartition(1, Order.DESCENDING).first(3).print();
    }
}

The Scala version:

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable.ListBuffer

/**
 * Created by xxxx on 2020/10/09
 */
object BatchDemoFirstNScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val data = ListBuffer[Tuple2[Int, String]]()
    data.append((2, "zs"))
    data.append((4, "ls"))
    data.append((3, "ww"))
    data.append((1, "xw"))
    data.append((1, "aw"))
    data.append((1, "mw"))

    val text = env.fromCollection(data)

    // take the first 3 records in insertion order
    text.first(3).print()
    println("==============================")

    // group by the first field and take the first 2 records of each group
    text.groupBy(0).first(2).print()
    println("==============================")

    // group by the first field, sort each group by the second field (ascending), and take the first 2 records of each group
    text.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print()
    println("==============================")

    // no grouping: sort globally and take the first 3 records
    text.sortPartition(0, Order.ASCENDING).sortPartition(1, Order.DESCENDING).first(3).print()
  }
}

● Sort Partition: sorts all partitions of a data set locally; sorting on multiple fields is done by chaining sortPartition() calls.
● Rebalance: rebalances (repartitions) a data set to eliminate data skew.
● Hash-Partition: partitions a data set by the hash of the given key (e.g. partitionByHash()).
● Range-Partition: partitions a data set into ranges of the given key (e.g. partitionByRange()). (A sketch of these repartitioning operators follows below.)
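A minimal sketch of Rebalance, Hash-Partition, and Range-Partition (the parallelism and the sample data are assumptions made for the demo):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class BatchDemoPartition {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2); // assumed parallelism so that the repartitioning is visible

        DataSet<Tuple2<Integer, String>> data = env.fromElements(
                new Tuple2<>(1, "a"), new Tuple2<>(1, "b"),
                new Tuple2<>(2, "c"), new Tuple2<>(3, "d"));

        // Rebalance: round-robin redistribution to even out data skew
        data.rebalance().print();

        // Hash-Partition: records with the same value in field 0 end up in the same partition
        data.partitionByHash(0).print();

        // Range-Partition: each partition holds a contiguous range of field 0
        data.partitionByRange(0).print();
    }
}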
● Custom Partitioning: user-defined partitioning rules
1. A custom partitioner must implement the Partitioner interface.
2. partitionCustom(partitioner, "someKey")
3. Or partitionCustom(partitioner, 0)
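A minimal sketch of a custom partitioner (the odd/even routing rule and the sample data are made up for illustration):

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class BatchDemoCustomPartition {

    // Step 1: implement the Partitioner interface for the key type (Integer here)
    public static class OddEvenPartitioner implements Partitioner<Integer> {
        @Override
        public int partition(Integer key, int numPartitions) {
            // even keys go to partition 0, odd keys to the last partition
            return key % 2 == 0 ? 0 : numPartitions - 1;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2); // assumed parallelism for the demo

        DataSet<Tuple2<Integer, String>> data = env.fromElements(
                new Tuple2<>(1, "zs"), new Tuple2<>(2, "ls"),
                new Tuple2<>(3, "ww"), new Tuple2<>(4, "zl"));

        // Steps 2/3: partitionCustom(partitioner, 0) uses tuple field 0 as the key;
        // partitionCustom(partitioner, "someKey") would address a named POJO field instead
        data.partitionCustom(new OddEvenPartitioner(), 0).print();
    }
}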
1.11.3. DataSet Sinks
writeAsText(): writes the elements line by line as strings, obtained by calling toString() on each element.
writeAsCsv(): writes tuples as comma-separated values to a file; the row and field delimiters are configurable. Each field value comes from the object's toString() method.
print(): prints the toString() value of each element to standard output or standard error.
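A minimal sink sketch (the output paths are placeholders; WriteMode.OVERWRITE is used only so the demo can be re-run):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;

public class BatchDemoSink {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<Integer, String>> data = env.fromElements(
                new Tuple2<>(1, "zs"), new Tuple2<>(2, "ls"), new Tuple2<>(3, "ww"));

        // writeAsText: one toString() per line (placeholder path)
        data.writeAsText("file:///tmp/flink/text_out", FileSystem.WriteMode.OVERWRITE);

        // writeAsCsv: rows separated by "\n", fields by "," (placeholder path)
        data.writeAsCsv("file:///tmp/flink/csv_out", "\n", ",", FileSystem.WriteMode.OVERWRITE);

        // the write* sinks are lazy, so an explicit execute() is required to run them
        env.execute("sink demo");

        // print() is also a sink; it defines its own sink and triggers execution by itself
        data.print();
    }
}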
總結
以上是生活随笔為你收集整理的1.11.Flink DataSetAPI、DataSet API之Data Sources、DataSet API之Transformations、DataSet Sink部分详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 公积金买房贷款最多贷多少
- 下一篇: 160Hz的屏幕刷新率即将到来 160W