Apache Spark中的自定义日志
您是否曾經(jīng)對(duì)運(yùn)行了幾個(gè)小時(shí)的Spark作業(yè)感到沮喪,但由于基礎(chǔ)設(shè)施問題而失敗了。
您會(huì)很晚才知道此故障,并浪費(fèi)了數(shù)小時(shí)的時(shí)間,當(dāng)Spark UI日志也無法用于事后檢查時(shí),它會(huì)更加痛苦。
你不是一個(gè)人!
在這篇文章中,我將介紹如何啟用與Spark logger搭配使用的自定義記錄器。
該定制記錄器將收集從被動(dòng)監(jiān)視到主動(dòng)監(jiān)視所需的所有信息。
無需為此設(shè)置額外的日志記錄。
Spark 2.X基于Slf4j抽象,并且使用了logback綁定。
讓我們從基本的日志記錄開始,即如何在Spark作業(yè)或應(yīng)用程序中獲取記錄器實(shí)例。
val _LOG = LoggerFactory.getLogger(this.getClass.getName)就是這么簡(jiǎn)單,現(xiàn)在您的應(yīng)用程序使用的是與Spark基于相同的日志庫和設(shè)置。
現(xiàn)在要做一些更有意義的事情,我們必須注入自定義記錄器,該記錄器將收集信息并將其寫入彈性搜索或發(fā)布到某些REST端點(diǎn)或發(fā)送警報(bào)。
讓我們一步一步去做
構(gòu)建自定義日志附加程序
由于spark 2.X是基于logback的,因此我們必須編寫logback logger。
自定義登錄記錄器的代碼段
class MetricsLogbackAppender extends UnsynchronizedAppenderBase[ILoggingEvent] {override def append(e: ILoggingEvent) = {//Send this message to elastic search or REST end pointmessageCount.compute(Thread.currentThread().getName, mergeValue)System.out.println(messageCount + " " + e)}val messageCount = new ConcurrentHashMap[String, AtomicInteger]()val mergeValue = new BiFunction[String, AtomicInteger, AtomicInteger] {def apply(key: String, currentValue: AtomicInteger) = {val nextValue = currentValue match {case null => new AtomicInteger(0)case _ => currentValue}nextValue.incrementAndGet()nextValue}}}這是一個(gè)非常簡(jiǎn)單的記錄器,它按線程統(tǒng)計(jì)消息,您要做的所有事情都將覆蓋附加函數(shù)。
這種類型的記錄器可以執(zhí)行任何操作,例如寫入數(shù)據(jù)庫或發(fā)送到REST端點(diǎn)或發(fā)出警報(bào)。
啟用記錄器
要使用新的記錄器,請(qǐng)創(chuàng)建logback.xml文件并為新的記錄器添加條目。
該文件可以打包在Shaded jar中,也可以指定為運(yùn)行時(shí)參數(shù)。
樣本logback.xml
<configuration><appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"><!-- encoders are assigned the typech.qos.logback.classic.encoder.PatternLayoutEncoder by default --><encoder><pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern></encoder></appender><appender name="METRICS" class="micro.logback.MetricsLogbackAppender"/><root level="info"><appender-ref ref="STDOUT" /></root><logger level="info" name="micro" additivity="true"><appender-ref ref="METRICS" /></logger><logger level="info" name="org.apache.spark.scheduler.DAGScheduler" additivity="true"><appender-ref ref="METRICS" /></logger></configuration>此配置文件將MetricsLogbackAppender添加為METRICS
<appender name="METRICS" class="micro.logback.MetricsLogbackAppender"/>接下來為應(yīng)使用此功能的包/類啟用它
<logger level="info" name="micro" additivity="true"> <appender-ref ref="METRICS" /></logger> <logger level="info" name="org.apache.spark.scheduler.DAGScheduler" additivity="true"> <appender-ref ref="METRICS" /></logger大功告成!
從'micro'包或DAGScheduler類記錄的任何消息都將使用new logger。
使用這種技術(shù),執(zhí)行者日志也可以被捕獲,當(dāng)Spark作業(yè)在成百上千的執(zhí)行者上運(yùn)行時(shí),這變得非常有用。
現(xiàn)在,它提供了許多讓BI實(shí)時(shí)顯示所有這些消息的選項(xiàng),允許團(tuán)隊(duì)提出一些有趣的問題或在情況不佳時(shí)訂閱變更。
警告:請(qǐng)確保此新記錄器減慢了應(yīng)用程序的執(zhí)行速度,建議使其異步。
在正確的時(shí)間獲取見解并將其付諸實(shí)踐
此博客中使用的代碼在github中的@sparkmicroservices回購中可用。
我有興趣知道您正在為Spark使用哪種日志記錄模式。
翻譯自: https://www.javacodegeeks.com/2018/05/custom-logs-in-apache-spark.html
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)總結(jié)
以上是生活随笔為你收集整理的Apache Spark中的自定义日志的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: hadoop emr_在Amazon E
- 下一篇: 皂苷怎么读 皂苷的拼音是什么