1.13.、1.14.Flink 支持的DataType和序列化、Flink Broadcast Accumulators Counters Distributed Cache
1.13.Flink 支持的DataType和序列化
1.13.1.Flink支持的DataType
1.13.2.Flink的序列化
1.14.Flink Broadcast & Accumulators & Counters &Distributed Cache
1.14.1.DataStreaming中的Broadcast
1.14.2.Flink Broadcast(廣播變量)
1.14.3.Flink Accumulators & Counters
1.14.4.Flink Broadcast和Accumulators的區(qū)別
1.14.5.Flink Distributed Cache(分布式緩存)
1.13.Flink 支持的DataType和序列化
1.13.1.Flink支持的DataType
?Java Tuple 和 Scala case class
?Java POJOs:java實體類
?Primitive Types
默認支持java和scala基本類型
?General Class Types
默認支持大多數(shù)java和scala class
?Hadoop Writables
支持hadoop中實現(xiàn)了org.apache.hadoop.Writable的數(shù)據類型。
?Special Types
例如scala中的Either Option和Try
1.13.2.Flink的序列化
?Flink自帶了針對諸如int,long,String等標準類型的序列化器
?針對Flink無法實現(xiàn)序列化的數(shù)據類型,我們可以交給Avro和Kryo
使用方法:ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
使用avro序列化:env.getConfig().enableForceAvro();
使用kryo序列化:env.getConfig().enableForceKryo();
使用自定義序列化:env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)
1.14.Flink Broadcast & Accumulators & Counters &Distributed Cache
1.14.1.DataStreaming中的Broadcast
?把元素廣播給所有的分區(qū),數(shù)據會被重復處理
一、類似于storm中的allGrouping
二、dataStream.broadcast()
1.14.2.Flink Broadcast(廣播變量)
廣播變量允許編程人員在每臺機器上保持1個只讀的緩存變量,而不是傳送變量的副本給tasks
廣播變量創(chuàng)建后,它可以運行在集群中的任何function上,而不需要多次傳遞給集群節(jié)點。另外需要記住,不應該修改廣播變量,這樣才能確保每個節(jié)點獲取到的值都是一致的。
一句話解釋,可以理解為是一個公共的共享變量,我們可以把一個dataset數(shù)據集廣播出去,然后不同的task在節(jié)點上都能夠獲取到,這個數(shù)據在每個節(jié)點上只會存在一份。如果不使用broadcast,則在每個節(jié)點中的每個task中都需要拷貝一份dataset數(shù)據集,比較浪費內存(也就是一個節(jié)點中可能會存在多份dataset數(shù)據)。
用法:
1:初始化數(shù)據
DataSet toBroadcast = env.fromElements(1, 2, 3)
2:廣播數(shù)據
.withBroadcastSet(toBroadcast,”broadcastSetName”)
3:獲取數(shù)據
Collection broadcastSet = getRuntimeContext().getBroadcastVariable(“broadcastSetName”);
注意:
1:廣播出去的變量存在于每個節(jié)點的內存中,所以這個數(shù)據集不能太大。因為廣播出去的數(shù)據,會常駐內存,除非程序執(zhí)行結束。
2:廣播變量在初始化廣播出去以后不支持修改,這樣才能保證每個節(jié)點的數(shù)據都是一致的。
1.14.3.Flink Accumulators & Counters
?Accumulator即累加器,與Mapreduce counter的應用場景差不多,都能很好地觀察task在運行期間的數(shù)據變化。
?可以在Flink job任務中的算子函數(shù)中操作累加器,但是只能在任務執(zhí)行結束之后才能獲得累加器的最終結果。
?Counter是一個具體的累加器(Accumulator)實現(xiàn)
IntCounter, LongCounter 和 DoubleCounter
用法:
1:創(chuàng)建累加器
private IntCounter numLines = new IntCounter();
2:注冊累加器
getRuntimeContext().addAccummulator(“num-lines”,this.numLines);
3:使用累加器
this.numLines.add(1);
4:獲取累加器的結果
myJobExecutionResult.getAccumulatorResult(“num-lines”)
案例:
import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.accumulators.IntCounter; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.configuration.Configuration;/*** 全局累加器** counter 計數(shù)器** 需求:* 計算map函數(shù)中處理了多少數(shù)據** 注意:只有在任務執(zhí)行結束后,才能獲取到累加器的值** Created by xxx.xxx on 2018/10/8.*/ public class BatchDemoCounter {public static void main(String[] args) throws Exception{//獲取運行環(huán)境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSource<String> data = env.fromElements("a", "b", "c", "d", "e");DataSet<String> result = data.map(new RichMapFunction<String, String>() {//1:創(chuàng)建累加器private IntCounter numLines = new IntCounter();@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);//2:注冊累加器getRuntimeContext().addAccumulator("num-lines",this.numLines);}//int sum = 0;@Overridepublic String map(String value) throws Exception {//如果并行度為1,使用普通的累加求和即可,但是設置多個并行度,則普通的累加求和結果就不準了//sum++;//System.out.println("sum:"+sum);this.numLines.add(1);return value;}}).setParallelism(8);//result.print();result.writeAsText("d:\\data\\count10");JobExecutionResult jobResult = env.execute("counter");//3:獲取累加器int num = jobResult.getAccumulatorResult("num-lines");System.out.println("num:"+num);}} import org.apache.flink.api.common.accumulators.IntCounter import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.configuration.Configuration/*** counter 累加器* Created by xxxx on 2020/10/09 on 2018/10/30.*/ object BatchDemoCounterScala {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._val data = env.fromElements("a","b","c","d")val res = data.map(new RichMapFunction[String,String] {//1:定義累加器val numLines = new IntCounteroverride def open(parameters: Configuration): Unit = {super.open(parameters)//2:注冊累加器getRuntimeContext.addAccumulator("num-lines",this.numLines)}override def map(value: String) = {this.numLines.add(1)value}}).setParallelism(4)res.writeAsText("d:\\data\\count21")val jobResult = env.execute("BatchDemoCounterScala")//3:獲取累加器val num = jobResult.getAccumulatorResult[Int]("num-lines")println("num:"+num)}}1.14.4.Flink Broadcast和Accumulators的區(qū)別
?Broadcast(廣播變量)允許程序將一個只讀的變量緩存在每臺機器上,而不用在任務之間傳遞變量。廣播變量可以進行共享,但是不可以進行修改。
?Accumulators(累加器)是可以在不同任務中對同一個變量進行累加操作。
1.14.5.Flink Distributed Cache(分布式緩存)
Flink提供了一個分布式緩存,類似于hadoop,可以使用戶在并行函數(shù)中很方便的讀取本地文件
此緩存的工作機制如下:程序注冊一個文件或者目錄(本地或遠程文件系統(tǒng),例如hdfs或者s3),通過ExecutionEnvironment注冊緩存文件并為它起一個名字。當程序執(zhí)行,Flink自動將文件或目錄復制到所有taskmanager節(jié)點的本地文件系統(tǒng),用戶可以通過這個指定的名稱查找文件或者目錄,然后從taskmanager節(jié)點的本地文件系統(tǒng)訪問它
用戶:
1:注冊一個文件
env.registerCachedFile(“hdfs:///path/to/your/file”, “hdfsFile”)
2、訪問數(shù)據
File myFile = getRuntimeContext().getDistributedCache().getFile(“hdfsFile”);
案例:
import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.configuration.Configuration;import java.io.File; import java.util.ArrayList; import java.util.List;/*** Distributed Cache** Created by xxxx on 2020/10/09 .*/ public class BatchDemoDisCache {public static void main(String[] args) throws Exception{//獲取運行環(huán)境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//1:注冊一個文件,可以使用hdfs或者s3上的文件env.registerCachedFile("d:\\data\\file\\a.txt","a.txt");DataSource<String> data = env.fromElements("a", "b", "c", "d");DataSet<String> result = data.map(new RichMapFunction<String, String>() {private ArrayList<String> dataList = new ArrayList<String>();@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);//2:使用文件File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt");List<String> lines = FileUtils.readLines(myFile);for (String line : lines) {this.dataList.add(line);System.out.println("line:" + line);}}@Overridepublic String map(String value) throws Exception {//在這里就可以使用dataListreturn value;}});result.print();} } import org.apache.commons.io.FileUtils import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.configuration.Configuration/*** Distributed Cache* Created by xxxx on 2020/10/09*/ object BatchDemoDisCacheScala {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._//1:注冊文件env.registerCachedFile("d:\\data\\file\\a.txt","b.txt")val data = env.fromElements("a","b","c","d")val result = data.map(new RichMapFunction[String,String] {override def open(parameters: Configuration): Unit = {super.open(parameters)val myFile = getRuntimeContext.getDistributedCache.getFile("b.txt")val lines = FileUtils.readLines(myFile)val it = lines.iterator()while (it.hasNext){val line = it.next();println("line:"+line)}}override def map(value: String) = {value}})result.print()} }總結
以上是生活随笔為你收集整理的1.13.、1.14.Flink 支持的DataType和序列化、Flink Broadcast Accumulators Counters Distributed Cache的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 网贷为什么要开通存管账户
- 下一篇: 北邮国安是国企吗