flink DataStream API使用及原理
傳統的大數據處理方式一般是批處理式的,也就是說,今天所收集的數據,我們明天再把今天收集到的數據算出來,以供大家使用,但是在很多情況下,數據的時效性對于業務的成敗是非常關鍵的。
Spark 和 Flink 都是通用的開源大規模處理引擎,目標是在一個系統中支持所有的數據處理以帶來效能的提升。兩者都有相對比較成熟的生態系統。是下一代大數據引擎最有力的競爭者。
Spark 的生態總體更完善一些,在機器學習的集成和易用性上暫時領先。
Flink 在流計算上有明顯優勢,核心架構和模型也更透徹和靈活一些。
本文主要通過實例來分析flink的流式處理過程,并通過源碼的方式來介紹流式處理的內部機制。
DataStream整體概述
主要分5部分,下面我們來分別介紹:
?1.運行環境StreamExecutionEnvironment
StreamExecutionEnvironment是個抽象類,是流式處理的容器,實現類有兩個,分別是
LocalStreamEnvironment: RemoteStreamEnvironment: /*** The StreamExecutionEnvironment is the context in which a streaming program is executed. A* {@link LocalStreamEnvironment} will cause execution in the current JVM, a* {@link RemoteStreamEnvironment} will cause execution on a remote setup.** <p>The environment provides methods to control the job execution (such as setting the parallelism* or the fault tolerance/checkpointing parameters) and to interact with the outside world (data access).** @see org.apache.flink.streaming.api.environment.LocalStreamEnvironment* @see org.apache.flink.streaming.api.environment.RemoteStreamEnvironment*/2.數據源DataSource數據輸入
包含了輸入格式InputFormat
/*** Creates a new data source.** @param context The environment in which the data source gets executed.* @param inputFormat The input format that the data source executes.* @param type The type of the elements produced by this input format.*/public DataSource(ExecutionEnvironment context, InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> type, String dataSourceLocationName) {super(context, type);this.dataSourceLocationName = dataSourceLocationName;if (inputFormat == null) {throw new IllegalArgumentException("The input format may not be null.");}this.inputFormat = inputFormat;if (inputFormat instanceof NonParallelInput) {this.parallelism = 1;}}?flink將數據源主要分為內置數據源和第三方數據源,內置數據源有 文件,網絡socket端口及集合類型數據;第三方數據源實用Connector的方式來連接如kafka Connector,es connector等,自己定義的話,可以實現SourceFunction,封裝成Connector來做。
?
3.DataStream轉換
DataStream:同一個類型的流元素,DataStream可以通過transformation轉換成另外的DataStream,示例如下
@link DataStream#map
@link DataStream#filter
?StreamOperator:流式算子的基本接口,三個實現類
AbstractStreamOperator:
OneInputStreamOperator:
TwoInputStreamOperator:
/*** Basic interface for stream operators. Implementers would implement one of* {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} or* {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator} to create operators* that process elements.** <p>The class {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator}* offers default implementation for the lifecycle and properties methods.** <p>Methods of {@code StreamOperator} are guaranteed not to be called concurrently. Also, if using* the timer service, timer callbacks are also guaranteed not to be called concurrently with* methods on {@code StreamOperator}.** @param <OUT> The output type of the operator*/?4.DataStreamSink輸出
/*** Adds the given sink to this DataStream. Only streams with sinks added* will be executed once the {@link StreamExecutionEnvironment#execute()}* method is called.** @param sinkFunction* The object containing the sink's invoke function.* @return The closed DataStream.*/public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {// read the output type of the input Transform to coax out errors about MissingTypeInfo transformation.getOutputType();// configure the type if neededif (sinkFunction instanceof InputTypeConfigurable) {((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());}StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);getExecutionEnvironment().addOperator(sink.getTransformation());return sink;}5.執行
/*** Executes the JobGraph of the on a mini cluster of ClusterUtil with a user* specified name.** @param jobName* name of the job* @return The result of the job execution, containing elapsed time and accumulators.*/@Overridepublic JobExecutionResult execute(String jobName) throws Exception {// transform the streaming program into a JobGraphStreamGraph streamGraph = getStreamGraph();streamGraph.setJobName(jobName);JobGraph jobGraph = streamGraph.getJobGraph();jobGraph.setAllowQueuedScheduling(true);Configuration configuration = new Configuration();configuration.addAll(jobGraph.getJobConfiguration());configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");// add (and override) the settings with what the user definedconfiguration.addAll(this.configuration);if (!configuration.contains(RestOptions.BIND_PORT)) {configuration.setString(RestOptions.BIND_PORT, "0");}int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder().setConfiguration(configuration).setNumSlotsPerTaskManager(numSlotsPerTaskManager).build();if (LOG.isInfoEnabled()) {LOG.info("Running job on local embedded Flink mini cluster");}MiniCluster miniCluster = new MiniCluster(cfg);try {miniCluster.start();configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());return miniCluster.executeJobBlocking(jobGraph);}finally {transformations.clear();miniCluster.close();}}6.總結
Flink的執行方式類似于管道,它借鑒了數據庫的一些執行原理,實現了自己獨特的執行方式。
7.展望
Stream涉及的內容還包括Watermark,window等概念,因篇幅限制,這篇僅介紹flink DataStream API使用及原理。
下篇將介紹Watermark,下下篇是windows窗口計算。
參考資料
【1】https://baijiahao.baidu.com/s?id=1625545704285534730&wfr=spider&for=pc
【2】https://blog.51cto.com/13654660/2087705
轉載于:https://www.cnblogs.com/davidwang456/p/11046857.html
總結
以上是生活随笔為你收集整理的flink DataStream API使用及原理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: A Step By Step Guide
- 下一篇: 四张图带你了解Tomcat系统架构--让