Spark ListenerBus 和 MetricsSystem 体系分析
生活随笔
收集整理的這篇文章主要介紹了
Spark ListenerBus 和 MetricsSystem 体系分析
小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
轉(zhuǎn)載自:https://yq.aliyun.com/articles/60196
摘要:?Spark 事件體系的中樞是ListenerBus,由該類接受Event并且分發(fā)給各個(gè)Listener。MetricsSystem 則是一個(gè)為了衡量系統(tǒng)的各種指標(biāo)的度量系統(tǒng)。Listener可以是MetricsSystem的信息來(lái)源之一。他們之間總體是一個(gè)互相補(bǔ)充的關(guān)系。
前言
監(jiān)控是一個(gè)大系統(tǒng)完成后最重要的一部分。Spark整個(gè)系統(tǒng)運(yùn)行情況是由ListenerBus以及MetricsSystem 來(lái)完成的。這篇文章重點(diǎn)分析他們之間的工作機(jī)制以及如何通過(guò)這兩個(gè)系統(tǒng)完成更多的指標(biāo)收集。ListenerBus 是如何工作的
Spark的事件體系是如何工作的呢?我們先簡(jiǎn)要描述下,讓大家有個(gè)大概的了解。 首先,大部分類都會(huì)引入一個(gè)對(duì)象叫l(wèi)istenerBus,這個(gè)類具體是什么得看實(shí)現(xiàn),但是都一定繼承自org.apache.spark.util.ListenerBus. 假設(shè)我們要提交一個(gè)任務(wù)集。這個(gè)動(dòng)作可能會(huì)很多人關(guān)心,我就是使用listenerBus把Event發(fā)出去,類似下面的第二行代碼。 def submitJobSet(jobSet: JobSet) {listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo)) jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))logInfo("Added jobs for time " + jobSet.time)} listenerBus里已經(jīng)注冊(cè)了很多監(jiān)聽(tīng)者,我們叫l(wèi)istener,通常listenerBus 會(huì)啟動(dòng)一個(gè)線程異步的調(diào)用這些listener去消費(fèi)這個(gè)Event。而所謂的消費(fèi),其實(shí)就是觸發(fā)事先設(shè)計(jì)好的回調(diào)函數(shù)來(lái)執(zhí)行譬如信息存儲(chǔ)等動(dòng)作。? 這就是整個(gè)listenerBus的工作方式。這里我們看到,其實(shí)類似于埋點(diǎn),這是有侵入性的,每個(gè)你需要關(guān)注的地方,如果想讓人知曉,就都需要發(fā)出一個(gè)特定的Event。 ListenerBus 分析
特定實(shí)現(xiàn) < AsynchronousListenerBus < ListenerBus
特定實(shí)現(xiàn) < SparkListenerBus < ListenerBus 這里的特定實(shí)現(xiàn)有:
* StreamingListenerBus extends AsynchronousListenerBus * LiveListenerBus extends AsynchronousListenerBus with SparkListenerBus* ReplayListenerBus extends SparkListenerBus AsynchronousListenerBus 內(nèi)部維護(hù)了一個(gè)queue,事件都會(huì)先放到這個(gè)queue,然后通過(guò)一個(gè)線程來(lái)讓Listener處理Event。 SparkListenerBus 也是一個(gè)trait,但是里面有個(gè)具體的實(shí)現(xiàn),預(yù)先定義了onPostEvent 方法對(duì)一些特定的事件做了處理。 其他更下面的類則根據(jù)需要混入或者繼承SparkListenerBus ,AsynchronousListenerBus來(lái)完成他們需要的功能。 不同的ListenerBus 需要不同的Event 集 和Listener,比如你看StreamingListenerBus的簽名,就知道所有的Event都必須是StreamingListenerEvent,所有的Listener都必須是StreamingListener。 StreamingListenerBus extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent] Listener(監(jiān)聽(tīng)器)
通常而言,Listener 是有狀態(tài)的,一般接受到一個(gè)Event后,可能就會(huì)更新內(nèi)部的某個(gè)數(shù)據(jù)結(jié)構(gòu)。以 org.apache.spark.streaming.ui.StreamingJobProgressListener為例,他是一個(gè)StreamingListener,內(nèi)部就含有一些存儲(chǔ)結(jié)構(gòu),譬如: private val waitingBatchUIData = new HashMap[Time, BatchUIData]private val runningBatchUIData = new HashMap[Time, BatchUIData] 看申明都是普通的 HashMap ,所以操作是需要做synchronized操作。如下
override def onReceiverError(receiverError: StreamingListenerReceiverError) {synchronized {receiverInfos(receiverError.receiverInfo.streamId) = receiverError.receiverInfo}} MetricsSystem介紹
MetricsSystem 比較好理解,一般是為了衡量系統(tǒng)的各種指標(biāo)的度量系統(tǒng)。算是一個(gè)key-value形態(tài)的東西。舉個(gè)比較簡(jiǎn)單的例子,我怎么把當(dāng)前JVM相關(guān)信息展示出去呢?做法自然很多,通過(guò)MetricsSystem就可以做的更標(biāo)準(zhǔn)化些,具體方式如下:- Source 。數(shù)據(jù)來(lái)源。比如對(duì)應(yīng)的有org.apache.spark.metrics.source.JvmSource
- Sink。 ?數(shù)據(jù)發(fā)送到哪去。有被動(dòng)和主動(dòng)。一般主動(dòng)的是通過(guò)定時(shí)器來(lái)完成輸出,譬如CSVSink,被動(dòng)的如MetricsServlet等需要被用戶主動(dòng)調(diào)用。
- 橋接Source 和Sink的則是MetricRegistry了。
如何配置MetricsSystem
MetricsSystem的配置有兩種,第一種是 metrics.properties 配置文件的形態(tài)。第二種是通過(guò)spark conf完成,參數(shù)以spark.metrics.conf.開(kāi)頭 。 我這里簡(jiǎn)單介紹下第二種方式。 比如我想查看JVM的信息,包括GC和Memory的使用情況,則我通過(guò)類似? conf.set("spark.metrics.conf.driver.source.jvm.class","org.apache.spark.metrics.source.JvmSource") 默認(rèn)情況下,MetricsSystem 配置了一個(gè)全局的Sink,MetricsServlet。所以你添加的任何Source 都可以通過(guò)一個(gè)path /metrics/json獲取到。如果你的程序設(shè)置做了上面的設(shè)置,把你的spark-ui的路徑換成/metrics/json,就能看到j(luò)vm源的一些信息了。 通常,如果你要實(shí)現(xiàn)一個(gè)自定義的Source,可以遵循如下步驟(這里以JvmSource為例)。 -- 創(chuàng)建一個(gè)Source private[spark] class JvmSource extends Source {override val sourceName = "jvm"override val metricRegistry = new MetricRegistry()metricRegistry.registerAll(new GarbageCollectorMetricSet)metricRegistry.registerAll(new MemoryUsageGaugeSet)
} 其中 sourceName 是為了給配置用的,比如上面我們?cè)O(shè)置
spark.metrics.conf.driver.source.jvm.class 里面的jvm 就是JvmSource里設(shè)置的sourceName 每個(gè)Source 一般會(huì)自己構(gòu)建一個(gè)MetricRegistry。上面的例子,具體的數(shù)據(jù)收集工作是由GarbageCollectorMetricSet,MemoryUsageGaugeSet完成的。 具體就是寫一個(gè)類繼承com.codahale.metrics.MetricSet,然后實(shí)現(xiàn)Map<String, Metric> getMetrics() 方法就好。 接著通過(guò)metricRegistry.registerAll將寫好的MetricSet注冊(cè)上就行。 -- 添加配置 conf.set("spark.metrics.conf.driver.source.jvm.class","org.apache.spark.metrics.source.JvmSource") -- 調(diào)用結(jié)果 將Spark UI 的地址換成/metrics/json,就能看到輸出結(jié)果了。當(dāng)然,這里是因?yàn)槟J(rèn)系統(tǒng)默認(rèn)提供了一個(gè)Sink實(shí)現(xiàn):org.apache.spark.metrics.sink.MetricsServlet,你可以自己實(shí)現(xiàn)一個(gè)。 如何定制更多的監(jiān)控指標(biāo)
通過(guò)之前我寫的Spark UI (基于Yarn) 分析與定制,你應(yīng)該學(xué)會(huì)了如何添加新的頁(yè)面到Spark UI上。 而這通過(guò)這一片文章,你應(yīng)該了解了數(shù)據(jù)來(lái)源有兩個(gè):- 各個(gè)Listener
- MetricsSystem
- 你需要監(jiān)控新的事件,那么你需要添加新的ListenerBus,Listener,Event,然后到你需要的地方去埋點(diǎn)(post事件)。這肯定需要修改spark-core里的代碼了。
- 你需要呈現(xiàn)現(xiàn)有的listener或者已知對(duì)象的變量,則使用MetricsSystem,定義一個(gè)新的Source 即可。
轉(zhuǎn)載于:https://www.cnblogs.com/itboys/p/9153091.html
總結(jié)
以上是生活随笔為你收集整理的Spark ListenerBus 和 MetricsSystem 体系分析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。