Spark Streaming Backpressure分析
轉載自:http://www.cnblogs.com/barrenlake/p/5349949.html#
1、為什么引入Backpressure
????????????????默認情況下,Spark Streaming通過Receiver以生產者生產數據的速率接收數據,計算過程中會出現 batch processing time > batch interval 的情況,其中 batch processing time?為實際計算一個批次花費時間,?batch interval為Streaming應用設置的批處理間隔。這意味著Spark Streaming的數據接收速率高于Spark從隊列中移除數據的速率,也就是數據處理能力低,在設置間隔內不能完全處理當前接收速率接收的數據。如果這種情況持續過長的時間,會造成數據在內存中堆積,導致Receiver所在Executor內存溢出等問題(如果設置StorageLevel包含disk,?則內存存放不下的數據會溢寫至disk,?加大延遲)。Spark 1.5以前版本,用戶如果要限制Receiver的數據接收速率,可以通過設置靜態配制參數“spark.streaming.receiver.maxRate”的值來實現,此舉雖然可以通過限制接收速率,來適配當前的處理能力,防止內存溢出,但也會引入其它問題。比如:producer數據生產高于maxRate,當前集群處理能力也高于maxRate,這就會造成資源利用率下降等問題。為了更好的協調數據接收速率與資源處理能力,Spark Streaming?從v1.5開始引入反壓機制(back-pressure),通過動態控制數據接收速率來適配集群數據處理能力。
2、Backpressure
????????????????Spark Streaming Backpressure:??根據JobScheduler反饋作業的執行信息來動態調整Receiver數據接收率。通過屬性“spark.streaming.backpressure.enabled”來控制是否啟用backpressure機制,默認值false,即不啟用。
2.1 Streaming架構如下圖所示(詳見Streaming數據接收過程文檔和Streaming?源碼解析)
2.2 BackPressure執行過程如下圖所示:
在原架構的基礎上加上一個新的組件RateController,這個組件負責監聽“OnBatchCompleted”事件,然后從中抽取processingDelay 及schedulingDelay信息. ?Estimator依據這些信息估算出最大處理速度(rate),最后由基于Receiver的Input Stream將rate通過ReceiverTracker與ReceiverSupervisorImpl轉發給BlockGenerator(繼承自RateLimiter).
??
3、BackPressure?源碼解析
3.1 RateController類體系
????????????????RatenController?繼承自StreamingListener.?用于處理BatchCompleted事件。核心代碼為:
*** A StreamingListener that receives batch completion updates, and maintains* an estimate of the speed at which this stream should ingest messages,* given an estimate computation from a `RateEstimator`*/ private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator) extends StreamingListener with Serializable { …… …… /*** Compute the new rate limit and publish it asynchronously.*/private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =Future[Unit] {val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)newRate.foreach { s =>rateLimit.set(s.toLong)publish(getLatestRate())}}def getLatestRate(): Long = rateLimit.get()override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {val elements = batchCompleted.batchInfo.streamIdToInputInfofor {processingEnd <- batchCompleted.batchInfo.processingEndTimeworkDelay <- batchCompleted.batchInfo.processingDelaywaitDelay <- batchCompleted.batchInfo.schedulingDelayelems <- elements.get(streamUID).map(_.numRecords)} computeAndPublish(processingEnd, elems, workDelay, waitDelay)} }3.2 RateController的注冊
??????????????? JobScheduler啟動時會抽取在DStreamGraph中注冊的所有InputDstream中的rateController,并向ListenerBus注冊監聽. 此部分代碼如下:
def start(): Unit = synchronized {if (eventLoop != null) return // scheduler has already been started logDebug("Starting JobScheduler")eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)}eventLoop.start()// attach rate controllers of input streams to receive batch completion updatesfor {inputDStream <- ssc.graph.getInputStreamsrateController <- inputDStream.rateController} ssc.addStreamingListener(rateController)listenerBus.start()receiverTracker = new ReceiverTracker(ssc)inputInfoTracker = new InputInfoTracker(ssc)receiverTracker.start()jobGenerator.start()logInfo("Started JobScheduler")}3.3 BackPressure執行過程分析
??????????????? BackPressure 執行過程分為BatchCompleted事件觸發時機和事件處理兩個過程
3.3.1 BatchCompleted觸發過程
??????????????? 對BatchedCompleted的分析,應該從JobGenerator入手,因為BatchedCompleted是批次處理結束的標志,也就是JobGenerator產生的作業執行完成時觸發的,因此進行作業執行分析。
??????????????? Streaming 應用中JobGenerator每個Batch Interval都會為應用中的每個Output Stream建立一個Job, 該批次中的所有Job組成一個Job Set.使用JobScheduler的submitJobSet進行批量Job提交。此部分代碼結構如下所示
/** Generate jobs and perform checkpoint for the given `time`. */ private def generateJobs(time: Time) {// Set the SparkEnv in this thread, so that job generation code can access the environment// Example: BlockRDDs are created in this thread, and it needs to access BlockManager// Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.SparkEnv.set(ssc.env)// Checkpoint all RDDs marked for checkpointing to ensure their lineages are// truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")Try {jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batchgraph.generateJobs(time) // generate jobs using allocated block } match {case Success(jobs) =>val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))case Failure(e) =>jobScheduler.reportError("Error generating jobs for time " + time, e)}eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false)) }?其中,sumitJobSet會創建固定數量的后臺線程(具體由“spark.streaming.concurrentJobs”指定),去處理Job Set中的Job.?具體實現邏輯為:
def submitJobSet(jobSet: JobSet) {if (jobSet.jobs.isEmpty) {logInfo("No jobs added for time " + jobSet.time)} else {listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))jobSets.put(jobSet.time, jobSet)jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))logInfo("Added jobs for time " + jobSet.time)} }其中JobHandler用于執行Job及處理Job執行結果信息。當Job執行完成時會產生JobCompleted事件. JobHandler的具體邏輯如下面代碼所示:
private class JobHandler(job: Job) extends Runnable with Logging {import JobScheduler._def run() {try {val formattedTime = UIUtils.formatBatchTime(job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"ssc.sc.setJobDescription(s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)// Checkpoint all RDDs marked for checkpointing to ensure their lineages are// truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")// We need to assign `eventLoop` to a temp variable. Otherwise, because// `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then// it's possible that when `post` is called, `eventLoop` happens to null.var _eventLoop = eventLoopif (_eventLoop != null) {_eventLoop.post(JobStarted(job, clock.getTimeMillis()))// Disable checks for existing output directories in jobs launched by the streaming// scheduler, since we may need to write output to an existing directory during checkpoint// recovery; see SPARK-4835 for more details.PairRDDFunctions.disableOutputSpecValidation.withValue(true) {job.run()}_eventLoop = eventLoopif (_eventLoop != null) {_eventLoop.post(JobCompleted(job, clock.getTimeMillis()))}} else {// JobScheduler has been stopped. }} finally {ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)}}} }當Job執行完成時,向eventLoop發送JobCompleted事件。EventLoop事件處理器接到JobCompleted事件后將調用handleJobCompletion 來處理Job完成事件。handleJobCompletion使用Job執行信息創建StreamingListenerBatchCompleted事件并通過StreamingListenerBus向監聽器發送。實現如下:
private def handleJobCompletion(job: Job, completedTime: Long) {val jobSet = jobSets.get(job.time)jobSet.handleJobCompletion(job)job.setEndTime(completedTime)listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)if (jobSet.hasCompleted) {jobSets.remove(jobSet.time)jobGenerator.onBatchCompletion(jobSet.time)logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(jobSet.totalDelay / 1000.0, jobSet.time.toString,jobSet.processingDelay / 1000.0))listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))}job.result match {case Failure(e) =>reportError("Error running job " + job, e)case _ =>}}3.3.2、BatchCompleted事件處理過程
??????????????? StreamingListenerBus將事件轉交給具體的StreamingListener,因此BatchCompleted將交由RateController進行處理。RateController接到BatchCompleted事件后將調用onBatchCompleted對事件進行處理。
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {val elements = batchCompleted.batchInfo.streamIdToInputInfofor {processingEnd <- batchCompleted.batchInfo.processingEndTimeworkDelay <- batchCompleted.batchInfo.processingDelaywaitDelay <- batchCompleted.batchInfo.schedulingDelayelems <- elements.get(streamUID).map(_.numRecords)} computeAndPublish(processingEnd, elems, workDelay, waitDelay) }onBatchCompleted會從完成的任務中抽取任務的執行延遲和調度延遲,然后用這兩個參數用RateEstimator(目前存在唯一實現PIDRateEstimator,proportional-integral-derivative (PID) controller,?PID控制器)估算出新的rate并發布。代碼如下:
/*** Compute the new rate limit and publish it asynchronously.*/private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =Future[Unit] {val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)newRate.foreach { s =>rateLimit.set(s.toLong)publish(getLatestRate())}}其中publish()由RateController的子類ReceiverRateController來定義。具體邏輯如下(ReceiverInputDStream中定義):
/*** A RateController that sends the new rate to receivers, via the receiver tracker.*/private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator)extends RateController(id, estimator) {override def publish(rate: Long): Unit =ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)}publish的功能為新生成的rate?借助ReceiverTracker進行轉發。ReceiverTracker將rate包裝成UpdateReceiverRateLimit事交ReceiverTrackerEndpoint
/** Update a receiver's maximum ingestion rate */ def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {if (isTrackerStarted) {endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))} }ReceiverTrackerEndpoint接到消息后,其將會從receiverTrackingInfos列表中獲取Receiver注冊時使用的endpoint(實為ReceiverSupervisorImpl),再將rate包裝成UpdateLimit發送至endpoint.其接到信息后,使用updateRate更新BlockGenerators(RateLimiter子類),來計算出一個固定的令牌間隔。
/** RpcEndpointRef for receiving messages from the ReceiverTracker in the driver */ private val endpoint = env.rpcEnv.setupEndpoint("Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {override val rpcEnv: RpcEnv = env.rpcEnvoverride def receive: PartialFunction[Any, Unit] = {case StopReceiver =>logInfo("Received stop signal")ReceiverSupervisorImpl.this.stop("Stopped by driver", None)case CleanupOldBlocks(threshTime) =>logDebug("Received delete old batch signal")cleanupOldBlocks(threshTime)case UpdateRateLimit(eps) =>logInfo(s"Received a new rate limit: $eps.")registeredBlockGenerators.asScala.foreach { bg =>bg.updateRate(eps)}}})其中RateLimiter的updateRate實現如下:
/*** Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by* {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that.** @param newRate A new rate in events per second. It has no effect if it's 0 or negative.*/private[receiver] def updateRate(newRate: Long): Unit =if (newRate > 0) {if (maxRateLimit > 0) {rateLimiter.setRate(newRate.min(maxRateLimit))} else {rateLimiter.setRate(newRate)}}?setRate的實現?如下:
public final void setRate(double permitsPerSecond) {Preconditions.checkArgument(permitsPerSecond > 0.0&& !Double.isNaN(permitsPerSecond), "rate must be positive");synchronized (mutex) {resync(readSafeMicros());double stableIntervalMicros = TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond; //固定間隔this.stableIntervalMicros = stableIntervalMicros;doSetRate(permitsPerSecond, stableIntervalMicros);}}到此,backpressure反壓機制調整rate結束。
4.流量控制點
當Receiver開始接收數據時,會通過supervisor.pushSingle()方法將接收的數據存入currentBuffer等待BlockGenerator定時將數據取走,包裝成block. 在將數據存放入currentBuffer之時,要獲取許可(令牌)。如果獲取到許可就可以將數據存入buffer, 否則將被阻塞,進而阻塞Receiver從數據源拉取數據。
/*** Push a single data item into the buffer.*/ def addData(data: Any): Unit = {if (state == Active) {waitToPush() //獲取令牌 synchronized {if (state == Active) {currentBuffer += data} else {throw new SparkException("Cannot add data as BlockGenerator has not been started or has been stopped")}}} else {throw new SparkException("Cannot add data as BlockGenerator has not been started or has been stopped")} }??????其令牌投放采用令牌桶機制進行, 原理如下圖所示:
令牌桶機制: 大小固定的令牌桶可自行以恒定的速率源源不斷地產生令牌。如果令牌不被消耗,或者被消耗的速度小于產生的速度,令牌就會不斷地增多,直到把桶填滿。后面再產生的令牌就會從桶中溢出。最后桶中可以保存的最大令牌數永遠不會超過桶的大小。當進行某操作時需要令牌時會從令牌桶中取出相應的令牌數,如果獲取到則繼續操作,否則阻塞。用完之后不用放回。
Streaming 數據流被Receiver接收后,按行解析后存入iterator中。然后逐個存入Buffer,在存入buffer時會先獲取token,如果沒有token存在,則阻塞;如果獲取到則將數據存入buffer.? 然后等價后續生成block操作。
?
轉載于:https://www.cnblogs.com/itboys/p/7797516.html
總結
以上是生活随笔為你收集整理的Spark Streaming Backpressure分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: jquery ajax示例
- 下一篇: 奇偶排序