Spark Streaming Source Code Analysis – DStream
A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous sequence of RDDs (of the same type) representing a continuous stream of data.
A DStream is essentially a discretized stream: the stream is chopped up into a list of RDDs, so the basic operations are still built on top of RDDs.
Below is the basic definition of DStream. Compared with an ordinary RDD, time is a far more important factor for a DStream:
the interval at which the stream is sliced into RDDs, the time at which the stream starts, how long the DStream keeps the RDDs it creates, the time key that each RDD corresponds to, and so on.
The DStream abstract definition
/**
 * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
 * sequence of RDDs (of the same type) representing a continuous stream of data (see
 * org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs).
 * DStreams can either be created from live data (such as, data from TCP sockets, Kafka, Flume,
 * etc.) using a [[org.apache.spark.streaming.StreamingContext]] or it can be generated by
 * transforming existing DStreams using operations such as `map`,
 * `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream
 * periodically generates a RDD, either from live data or by transforming the RDD generated by a
 * parent DStream.
 *
 * This class contains the basic operations available on all DStreams, such as `map`, `filter` and
 * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains
 * operations available only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and
 * `join`. These operations are automatically available on any DStream of pairs
 * (e.g., DStream[(Int, Int)]) through implicit conversions when
 * `org.apache.spark.streaming.StreamingContext._` is imported.
 *
 * DStreams internally is characterized by a few basic properties:
 *  - A list of other DStreams that the DStream depends on
 *  - A time interval at which the DStream generates an RDD
 *  - A function that is used to generate an RDD after each time interval
 */
abstract class DStream[T: ClassTag] (
    @transient private[streaming] var ssc: StreamingContext
  ) extends Serializable with Logging {

  // =======================================================================
  // Methods that should be implemented by subclasses of DStream
  // =======================================================================

  /** Time interval after which the DStream generates a RDD */
  def slideDuration: Duration // the interval at which the stream is sliced into RDDs

  /** List of parent DStreams on which this DStream depends on */
  def dependencies: List[DStream[_]] // like RDDs, DStreams also have dependencies on each other

  /** Method that generates a RDD for the given time */
  def compute (validTime: Time): Option[RDD[T]] // the logic that generates each RDD

  // =======================================================================
  // Methods and fields available on all DStreams
  // =======================================================================

  // RDDs generated, marked as private[streaming] so that testsuites can access it
  @transient
  private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] () // the core structure: a DStream is essentially a HashMap of RDDs keyed by time

  // Time zero for the DStream
  private[streaming] var zeroTime: Time = null // the time at which the stream starts

  // Duration for which the DStream will remember each RDD created
  private[streaming] var rememberDuration: Duration = null // the stream is unbounded and a DStream cannot keep every RDD, so it only remembers RDDs for this duration

  // Storage level of the RDDs in the stream
  private[streaming] var storageLevel: StorageLevel = StorageLevel.NONE

  // Checkpoint details
  private[streaming] val mustCheckpoint = false
  private[streaming] var checkpointDuration: Duration = null
  private[streaming] val checkpointData = new DStreamCheckpointData(this)

  // Reference to whole DStream graph
  private[streaming] var graph: DStreamGraph = null // DStreamGraph

  // Duration for which the DStream requires its parent DStream to remember each RDD created
  private[streaming] def parentRememberDuration = rememberDuration

  /** Return the StreamingContext associated with this DStream */
  def context = ssc

  /** Persist the RDDs of this DStream with the given storage level */
  def persist(level: StorageLevel): DStream[T] = {
    this.storageLevel = level
    this
  }

  /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
  def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER)

  /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
  def cache(): DStream[T] = persist()

  /**
   * Enable periodic checkpointing of RDDs of this DStream
   * @param interval Time interval after which generated RDD will be checkpointed
   */
  def checkpoint(interval: Duration): DStream[T] = {
    persist()
    checkpointDuration = interval
    this
  }
}
getOrCompute
Note that getOrCompute only produces the RDD object; no actual computation happens here. The real computation is triggered only when the job is run.
A Spark RDD itself holds no concrete data; it merely defines the workflow (the dependency relationships) and the processing logic. A simplified sketch follows.
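The code block itself is not reproduced in this post, so here is a simplified sketch of getOrCompute, paraphrased from the Spark Streaming source of this era (logging and the time-validity check are omitted): it first looks the batch time up in generatedRDDs, and only on a miss calls compute, persists/checkpoints the new RDD as configured, and caches it in the map.

  private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
    // If an RDD has already been generated for this batch time, return it from the HashMap
    generatedRDDs.get(time).orElse {
      // Otherwise ask the subclass to build the RDD object for this interval
      compute(time) match {
        case Some(newRDD) =>
          // Apply the configured storage level, if any
          if (storageLevel != StorageLevel.NONE) {
            newRDD.persist(storageLevel)
          }
          // Mark the RDD for checkpointing at checkpointDuration boundaries
          if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
            newRDD.checkpoint()
          }
          generatedRDDs.put(time, newRDD)
          Some(newRDD)
        case None =>
          None
      }
    }
  }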
generateJob
The RDD objects produced by getOrCompute need to be wrapped into Jobs.
The key part of a Job is its jobFunc, which essentially submits a job to the Spark cluster.
In the base DStream only an emptyFunc is used here; the concrete output logic has to be overridden by the specific output DStream, as in the sketch below.
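For reference, a sketch of DStream.generateJob paraphrased from the Spark source of this era; the jobFunc simply runs the RDD on the cluster with an empty output function, which is why output DStreams such as ForEachDStream override generateJob with real output logic.

  private[streaming] def generateJob(time: Time): Option[Job] = {
    getOrCompute(time) match {
      case Some(rdd) => {
        val jobFunc = () => {
          // The base class submits the RDD with an empty function; concrete output
          // DStreams override generateJob to write the results somewhere useful
          val emptyFunc = { (iterator: Iterator[T]) => {} }
          context.sparkContext.runJob(rdd, emptyFunc)
        }
        Some(new Job(time, jobFunc))
      }
      case None => None
    }
  }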
clearMetadata
Clears out stale RDD objects: it unpersists them and also calls clearMetadata on the dependencies, as sketched below.
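A simplified sketch of clearMetadata, paraphrased from the Spark source of this era (the real code additionally guards the unpersist behind the spark.streaming.unpersist setting and adds logging):

  private[streaming] def clearMetadata(time: Time) {
    // Drop every generated RDD that falls outside the remember window
    val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
    generatedRDDs --= oldRDDs.keys
    // Unpersist the dropped RDDs so their cached blocks can be released
    oldRDDs.values.foreach(_.unpersist(false))
    // Recursively clear the parent DStreams as well
    dependencies.foreach(_.clearMetadata(time))
  }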
Definitions of concrete DStreams
FilteredDStream
package org.apache.spark.streaming.dstream

private[streaming]
class FilteredDStream[T: ClassTag](
    parent: DStream[T],
    filterFunc: T => Boolean
  ) extends DStream[T](parent.ssc) {

  override def dependencies = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[T]] = {
    parent.getOrCompute(validTime).map(_.filter(filterFunc))
  }
}
WindowedDStream
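The code block that belonged here appears to have been lost; below is a sketch of WindowedDStream paraphrased from the Spark source of this era. Its compute simply slices out the parent RDDs that fall inside the current window and unions them into a single RDD:

private[streaming]
class WindowedDStream[T: ClassTag](
    parent: DStream[T],
    _windowDuration: Duration,
    _slideDuration: Duration)
  extends DStream[T](parent.ssc) {

  def windowDuration: Duration = _windowDuration

  override def dependencies = List(parent)

  // The windowed stream slides at the window's slide interval, not the parent's
  override def slideDuration: Duration = _slideDuration

  // The parent must remember enough RDDs to cover a whole window
  override def parentRememberDuration: Duration = rememberDuration + windowDuration

  override def compute(validTime: Time): Option[RDD[T]] = {
    // Take all parent RDDs that fall into the current window and union them
    val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
    val rddsInWindow = parent.slice(currentWindow)
    Some(new UnionRDD(ssc.sc, rddsInWindow))
  }
}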
ShuffledDStream
private[streaming]
class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag](
    parent: DStream[(K, V)],
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiner: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true
  ) extends DStream[(K, C)] (parent.ssc) {

  override def dependencies = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[(K, C)]] = {
    parent.getOrCompute(validTime) match {
      case Some(rdd) => Some(rdd.combineByKey[C](
        createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
      case None => None
    }
  }
}
PairDStreamFunctions
Take groupByKey as an example: just as in core Spark, it is implemented on top of combineByKey (see the sketch below).
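For reference, a sketch of groupByKey and combineByKey from PairDStreamFunctions, paraphrased from the Spark source of this era; combineByKey is what wraps the stream into the ShuffledDStream shown above:

  def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = {
    val createCombiner = (v: V) => ArrayBuffer[V](v)
    val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v)
    val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2)
    combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner)
      .asInstanceOf[DStream[(K, Seq[V])]]
  }

  def combineByKey[C: ClassTag](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiner: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true): DStream[(K, C)] = {
    // All the key-based aggregations eventually go through a ShuffledDStream
    new ShuffledDStream[K, V, C](self, createCombiner, mergeValue,
      mergeCombiner, partitioner, mapSideCombine)
  }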
What is more distinctive is groupByKeyAndWindow: it essentially uses a WindowedDStream to union the RDDs inside the window first, and then applies combineByKey.
groupByKeyAndWindow
/**
 * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
 * Similar to `DStream.groupByKey()`, but applies it over a sliding window.
 * @param windowDuration width of the window; must be a multiple of this DStream's
 *                       batching interval
 * @param slideDuration  sliding interval of the window (i.e., the interval after which
 *                       the new DStream will generate RDDs); must be a multiple of this
 *                       DStream's batching interval
 * @param partitioner    partitioner for controlling the partitioning of each RDD in the new
 *                       DStream.
 */
def groupByKeyAndWindow(
    windowDuration: Duration,
    slideDuration: Duration,
    partitioner: Partitioner
  ): DStream[(K, Seq[V])] = {
  val createCombiner = (v: Seq[V]) => new ArrayBuffer[V] ++= v
  val mergeValue = (buf: ArrayBuffer[V], v: Seq[V]) => buf ++= v
  val mergeCombiner = (buf1: ArrayBuffer[V], buf2: ArrayBuffer[V]) => buf1 ++= buf2
  self.groupByKey(partitioner)
      .window(windowDuration, slideDuration) // DStream.window wraps the current DStream into a WindowedDStream, see the code below
      .combineByKey[ArrayBuffer[V]](createCombiner, mergeValue, mergeCombiner, partitioner)
      .asInstanceOf[DStream[(K, Seq[V])]]
}

/**
 * Return a new DStream in which each RDD contains all the elements in seen in a
 * sliding window of time over this DStream.
 * @param windowDuration width of the window; must be a multiple of this DStream's
 *                       batching interval
 * @param slideDuration  sliding interval of the window (i.e., the interval after which
 *                       the new DStream will generate RDDs); must be a multiple of this
 *                       DStream's batching interval
 */
def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = {
  new WindowedDStream(this, windowDuration, slideDuration)
}
updateStateByKey
/**
 * Return a new "state" DStream where the state for each key is updated by applying
 * the given function on the previous state of the key and the new values of each key.
 * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
 * @param updateFunc State update function. If `this` function returns None, then
 *                   corresponding state key-value pair will be eliminated. Note, that
 *                   this function may generate a tuple with a different key than the
 *                   input key. It is up to the developer to decide whether to
 *                   remember the partitioner despite the key being changed.
 * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
 *                    DStream
 * @param rememberPartitioner Whether to remember the partitioner object in the generated RDDs.
 * @tparam S State type
 */
def updateStateByKey[S: ClassTag](
    updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
    partitioner: Partitioner,
    rememberPartitioner: Boolean
  ): DStream[(K, S)] = {
  new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner)
}

StateDStream
An ordinary DStream derives the current RDD directly from its parent RDD via compute.
What makes StateDStream special is that, besides the parent RDD, it also has to look at the previous RDD. This only exists in the streaming scenario, the only setting in which RDDs have a temporal relationship to one another.
PreviousRDD = getOrCompute(validTime - slideDuration), i.e. the RDD at the previous time interval in this DStream's generatedRDDs.
The processing function, val finalFunc = (iterator: Iterator[(K, (Seq[V], Seq[S]))]) => { ... }, works with three pieces of information: the key, the values from the parent RDD, and the state from the previous RDD.
The processing function has to handle the cases where either the parent RDD or the previous RDD is missing.
Note that StateDStream is persisted and checkpointed by default.
private[streaming]
class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
    parent: DStream[(K, V)],
    updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
    partitioner: Partitioner,
    preservePartitioning: Boolean
  ) extends DStream[(K, S)](parent.ssc) {

  super.persist(StorageLevel.MEMORY_ONLY_SER) // RDDs are persisted in memory by default, because the next interval's RDD needs them

  override def dependencies = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override val mustCheckpoint = true // checkpointing is mandatory by default, since the state has to be preserved

  override def compute(validTime: Time): Option[RDD[(K, S)]] = {

    // Try to get the previous state RDD
    getOrCompute(validTime - slideDuration) match {

      case Some(prevStateRDD) => { // If previous state RDD exists
        // Try to get the parent RDD
        parent.getOrCompute(validTime) match { // the case where both the previous RDD and the parent RDD exist
          case Some(parentRDD) => { // If parent RDD exists, then compute as usual
            // Define the function for the mapPartition operation on cogrouped RDD;
            // first map the cogrouped tuple to tuples of required type,
            // and then apply the update function
            val updateFuncLocal = updateFunc
            val finalFunc = (iterator: Iterator[(K, (Seq[V], Seq[S]))]) => {
              val i = iterator.map(t => {
                (t._1, t._2._1, t._2._2.headOption)
              })
              updateFuncLocal(i)
            }
            val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner) // `(k, a) cogroup (k, b)` produces k -> Seq(ArrayBuffer as, ArrayBuffer bs)
            val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning)
            Some(stateRDD)
          }
          case None => { // If parent RDD does not exist
            // Re-apply the update function to the old state RDD
            val updateFuncLocal = updateFunc
            val finalFunc = (iterator: Iterator[(K, S)]) => {
              val i = iterator.map(t => (t._1, Seq[V](), Option(t._2))) // the parent values are simply left empty, Seq[V]()
              updateFuncLocal(i)
            }
            val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning)
            Some(stateRDD)
          }
        }
      }

      case None => { // If previous session RDD does not exist (first input data)
        // Try to get the parent RDD
        parent.getOrCompute(validTime) match {
          case Some(parentRDD) => { // If parent RDD exists, then compute as usual; no previous RDD means this is the first state RDD
            // Define the function for the mapPartition operation on grouped RDD;
            // first map the grouped tuple to tuples of required type,
            // and then apply the update function
            val updateFuncLocal = updateFunc
            val finalFunc = (iterator: Iterator[(K, Seq[V])]) => {
              updateFuncLocal(iterator.map(tuple => (tuple._1, tuple._2, None))) // the previous state is passed as None
            }
            val groupedRDD = parentRDD.groupByKey(partitioner)
            val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning)
            // logDebug("Generating state RDD for time " + validTime + " (first)")
            Some(sessionRDD)
          }
          case None => { // If parent RDD does not exist, then nothing to do! With neither previous nor parent RDD, nothing can be computed
            // logDebug("Not generating state RDD (no previous state, no parent)")
            None
          }
        }
      }
    }
  }
}
TransformedDStream
First of all, this is a fairly general operation: a user-defined transformFunc computes the current RDD from a set of parent RDDs.
Note that these parent DStreams must belong to the same StreamingContext and have the same slideDuration.
On the DStream interface this is exposed as both transform and transformWith; see the sketch below.
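A sketch of TransformedDStream, paraphrased from the Spark source of this era; note the require checks enforcing that all parents share one StreamingContext and one slideDuration:

private[streaming]
class TransformedDStream[U: ClassTag] (
    parents: Seq[DStream[_]],
    transformFunc: (Seq[RDD[_]], Time) => RDD[U]
  ) extends DStream[U](parents.head.ssc) {

  require(parents.length > 0, "List of DStreams to transform is empty")
  require(parents.map(_.ssc).distinct.size == 1, "Some of the DStreams have different contexts")
  require(parents.map(_.slideDuration).distinct.size == 1,
    "Some of the DStreams have different slide durations")

  override def dependencies = parents.toList

  override def slideDuration: Duration = parents.head.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    // Generate (or fetch) every parent's RDD for this batch time,
    // then hand them all to the user-supplied transform function
    val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
    Some(transformFunc(parentRDDs, validTime))
  }
}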
Summary
A DStream boils down to a time-keyed HashMap of RDDs plus the three things a subclass must provide: its dependencies, its slideDuration, and a compute function that builds the RDD for a given batch time. getOrCompute caches the generated RDDs, generateJob wraps them into jobs submitted to the cluster, and clearMetadata drops RDDs that fall outside the remember window. The concrete DStreams (Filtered, Windowed, Shuffled, State, Transformed) differ only in how compute derives the current RDD from the parent RDDs, with StateDStream additionally depending on its own previous RDD.