?一、前述
SparkStreaming是流式處理框架,是Spark API的擴展,支持可擴展、高吞吐量、容錯的實時數據流處理,實時數據的來源可以是:Kafka, Flume, Twitter, ZeroMQ或者TCP sockets,并且可以使用高級功能的復雜算子來處理流數據。例如:map,reduce,join,window 。最終,處理后的數據可以存放在文件系統,數據庫等,方便實時展現。
二、SparkStreaming與Storm的區別
1、Storm是純實時的流式處理框架,SparkStreaming是準實時的處理框架(微批處理)。因為微批處理,SparkStreaming的吞吐量比Storm要高。
2、Storm?的事務機制要比SparkStreaming的要完善。
3、Storm支持動態資源調度。(spark1.2開始和之后也支持)
4、SparkStreaming擅長復雜的業務處理,Storm不擅長復雜的業務處理,擅長簡單的匯總型計算。
三、Spark初始
?
?
- receiver ?task是7*24小時一直在執行,一直接受數據,將一段時間內接收來的數據保存到batch中。假設batchInterval為5s,那么會將接收來的數據每隔5秒封裝到一個batch中,batch沒有分布式計算特性,這一個batch的數據又被封裝到一個RDD中,RDD最終封裝到一個DStream中。
例如:假設batchInterval為5秒,每隔5秒通過SparkStreamin將得到一個DStream,在第6秒的時候計算這5秒的數據,假設執行任務的時間是3秒,那么第6~9秒一邊在接收數據,一邊在計算任務,9~10秒只是在接收數據。然后在第11秒的時候重復上面的操作。
- 如果job執行的時間大于batchInterval會有什么樣的問題?
如果接受過來的數據設置的級別是僅內存,接收來的數據會越堆積越多,最后可能會導致OOM(如果設置StorageLevel包含disk, 則內存存放不下的數據會溢寫至disk, 加大延遲?)。
?
四、SparkStreaming代碼
?
代碼注意事項:
?
- 啟動socket server 服務器:nc –lk 9999
- receiver模式下接受數據,local的模擬線程必須大于等于2,一個線程用來receiver用來接受數據,另一個線程用來執行job。
- Durations時間設置就是我們能接收的延遲度。這個需要根據集群的資源情況以及任務的執行情況來調節。
- 創建JavaStreamingContext有兩種方式(SparkConf,SparkContext)
- 所有的代碼邏輯完成后要有一個output operation類算子觸發執行。
- JavaStreamingContext.start() Streaming框架啟動后不能再次添加業務邏輯。
- JavaStreamingContext.stop() 無參的stop方法將SparkContext一同關閉,stop(false),不會關閉SparkContext,方便后面使用
- JavaStreamingContext.stop()停止之后不能再調用start。
package com.spark.sparkstreaming;import java.util.Arrays;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;import scala.Tuple2;
/*** 1、local的模擬線程數必須大于等于2 因為一條線程被receiver(接受數據的線程)占用,另外一個線程是job執行* 2、Durations時間的設置,就是我們能接受的延遲度,這個我們需要根據集群的資源情況以及監控每一個job的執行時間來調節出最佳時間。* 3、 創建JavaStreamingContext有兩種方式 (sparkconf、sparkcontext)* 4、業務邏輯完成后,需要有一個output operator* 5、JavaStreamingContext.start()straming框架啟動之后是不能在次添加業務邏輯* 6、JavaStreamingContext.stop()無參的stop方法會將sparkContext一同關閉,stop(false) ,默認為true,會一同關閉* 7、JavaStreamingContext.stop()停止之后是不能在調用start *//*** foreachRDD 算子注意:* 1.foreachRDD是DStream中output operator類算子* 2.foreachRDD可以遍歷得到DStream中的RDD,可以在這個算子內對RDD使用RDD的Transformation類算子進行轉化,但是一定要使用rdd的Action類算子觸發執行。* 3.foreachRDD可以得到DStream中的RDD,在這個算子內,RDD算子外執行的代碼是在Driver端執行的,RDD算子內的代碼是在Executor中執行。**/
public class WordCountOnline {@SuppressWarnings("deprecation"
)public static void main(String[] args) {SparkConf conf =
new SparkConf().setMaster("
local[2]").setAppName("WordCountOnline"
);/*** 在創建streaminContext的時候 設置batch Interval*/JavaStreamingContext jsc =
new JavaStreamingContext(conf, Durations.seconds(5
));
// JavaSparkContext sc = new JavaSparkContext(conf);
// JavaStreamingContext jsc = new JavaStreamingContext(sc,Durations.seconds(5));//兩種辦法得到StreamingContext
// JavaSparkContext sparkContext = jsc.sparkContext();
JavaReceiverInputDStream<String> lines = jsc.socketTextStream("node04", 9999
);//監控socket端口9999JavaDStream<String> words = lines.flatMap(
new FlatMapFunction<String, String>
() {/*** */private static final long serialVersionUID = 1L
;@Overridepublic Iterable<String>
call(String s) {return Arrays.asList(s.split(" "
));}});JavaPairDStream<String, Integer> ones = words.mapToPair(
new PairFunction<String, String, Integer>
() {/*** */private static final long serialVersionUID = 1L
;@Overridepublic Tuple2<String, Integer>
call(String s) {return new Tuple2<String, Integer>(s, 1
);}});JavaPairDStream<String, Integer> counts = ones.reduceByKey(
new Function2<Integer, Integer, Integer>
() {/*** */private static final long serialVersionUID = 1L
;@Overridepublic Integer call(Integer i1, Integer i2) {return i1 +
i2;}});//outputoperator類的算子
counts.print();/*counts.foreachRDD(new VoidFunction<JavaPairRDD<String,Integer>>() {*//*** 這里寫的代碼是在Driver端執行的*//*private static final long serialVersionUID = 1L;@Overridepublic void call(JavaPairRDD<String, Integer> pairRDD) throws Exception {pairRDD.foreach(new VoidFunction<Tuple2<String,Integer>>() {*//*** *//*private static final long serialVersionUID = 1L;@Overridepublic void call(Tuple2<String, Integer> tuple)throws Exception {System.out.println("tuple ---- "+tuple );}});}});*/jsc.start();//等待spark程序被終止
jsc.awaitTermination();jsc.stop(false);}
} scala代碼:
?
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Durationsobject Operator_foreachRDD {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setMaster("local[2]").setAppName("foreachRDD")val sc = new SparkContext(conf)val jsc = new StreamingContext(sc,Durations.seconds(5))val socketDStream = jsc.socketTextStream("node5", 9999)socketDStream.foreachRDD { rdd => {rdd.foreach { elem => {println(elem)} }}}jsc.start()jsc.awaitTermination()jsc.stop(false)}
}
?
?
?
轉載于:https://www.cnblogs.com/LHWorldBlog/p/8435423.html
總結
以上是生活随笔為你收集整理的【Spark篇】---SparkStream初始与应用的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。