Spark HistoryServer日志解析清理异常
Spark HistoryServer日志解析&清理異常
一、背景介紹
用戶在使用 Spark 提交任務時,經常會出現任務完成后在 HistoryServer(Spark 1.6 和 Spark 2.1 HistoryServer 合并,統一由 Spark 2.1 HistoryServer 管控,因此本文的代碼分析都是基于 Spark 2.1 版本的代碼展開的)中找不到 appid 信息,尤其是對于失敗的任務,用戶無法查看日志分析任務失敗的原因。為此,特地對 Spark 2.1 HistoryServer 進行了研究,發現根本問題出在內部的兩個核心數據結構的使用存在異常導致的。
二、eventLog 日志文件及相關參數
2.1 eventLog 日志文件介紹
eventLog 是 Spark 任務在運行過程中,調用 EventLoggingListener#logEvent() 方法來輸出 eventLog 內容,Spark 中定義各種類型的事件,一旦某個事件被觸發,就會構造一個類型的 Event,然后獲取相應的運行信息并設置進去,最終將該 event 對象序列化成 json 字符串,追加到 eventLog 日志文件中。
Spark 中 eventLog 默認是不開啟的,由參數 ‘spark.history.fs.cleaner.enabled’ 來控制,開啟這個配置后,任務運行的信息就會寫到 eventLog 日志文件中,日志文件具體保存在參數 ‘spark.eventLog.dir’ 配置的目錄下。
2.2 相關配置參數
一般這些配置放在 /etc/spark2/conf/spark-defaults.conf 中。
注:但在實際自定義修改 Spark HistoryServer 配置時,spark-defaults.conf 中并沒有寫入(具體原因待看)。但可以通過查看 HistoryServer 進程使用的 spark-history-server.conf 配置查看,在 Spark HistoryServer 所在機器上,通過 ‘ps -ef |grep HistoryServer’ 查看具體配置 ‘–properties-file /run/cloudera-scm-agent/process/136253-spark2_on_yarn-SPARK2_YARN_HISTORY_SERVER/spark2-conf/spark-history-server.conf’,這里會使用自定義更新的 HistoryServer 參數。
| spark.history.retainedApplications | 50 | 在內存中保存 Application 歷史記錄的個數,如果超過這個值,舊的應用程序信息將被刪除,當再次訪問已被刪除的應用信息時需要重新構建頁面。 |
| spark.history.fs.update.interval | 10s | 指定刷新日志的時間,更短的時間可以更快檢測到新的任務以及任務執行情況,但過快會加重服務器負載。 |
| spark.history.ui.maxApplication | Int.MaxValue | 顯示在總歷史頁面中的程序的數量。如果總歷史頁面未顯示,程序 UI 仍可通過訪問其 URL 來顯示。 |
| spark.history.ui.port | 18089(Spark2.1) | 指定history-server的網頁UI端口號 |
| spark.history.fs.cleaner.enabled | false | 指定history-server的日志是否定時清除,true為定時清除,false為不清除。這個值一定設置成true啊,不然日志文件會越來越大。 |
| spark.history.fs.cleaner.interval | 1d | 定history-server的日志檢查間隔,默認每一天會檢查一下日志文件 |
| spark.history.fs.cleaner.maxAge | 7d | 指定history-server日志生命周期,當檢查到某個日志文件的生命周期為7d時,則會刪除該日志文件 |
| spark.eventLog.compress | false | 設置history-server產生的日志文件是否使用壓縮,true為使用,false為不使用。這個參數務可以成壓縮哦,不然日志文件歲時間積累會過大 |
| spark.history.retainedApplications | 50 | 在內存中保存Application歷史記錄的個數,如果超過這個值,舊的應用程序信息將被刪除,當再次訪問已被刪除的應用信息時需要重新構建頁面。 |
| spark.history.fs.numReplayThreads | ceil(cpu核數/4) | 解析 eventLog 的線程數量 |
三、eventLog 日志解析及日志清理原理
3.1 兩個定時任務
FsHistoryProvider 類在初始化時,會調用 startPolling() 方法,來啟動兩個定時任務,即日志文件解析任務和日志文件清理任務,兩個任務均是由獨立線程執行。當然,日志文件清理任務是否開啟是由參數 spark.history.fs.cleaner.enabled 控制(默認為 false,線上環境為 true,即開啟了日志文件清理任務)。
//位置:core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scalaprivate[history] def initialize(): Thread = {if (!isFsInSafeMode()) {// 兩個定時任務啟動入口startPolling()null} else {startSafeModeCheckThread(None)}}private def startPolling(): Unit = {// Validate the log directory.val path = new Path(logDir)// Disable the background thread during tests.if (!conf.contains("spark.testing")) {// A task that periodically checks for event log updates on disk.logDebug(s"Scheduling update thread every $UPDATE_INTERVAL_S seconds")// 日志文件解析線程pool.scheduleWithFixedDelay(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS)if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {// A task that periodically cleans event logs on disk.// 日志文件清理線程pool.scheduleWithFixedDelay(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)}} else {logDebug("Background update thread disabled for testing")}}3.2 eventLog 日志文件解析原理
3.2.1 關鍵數據結構
在介紹日志解析前,先來看看兩個關鍵的數據結構。fileToAppInfo 和 applications。
fileToAppInfo 結構用于保存日志目錄 /user/spark/spark2ApplicationHistory/ 下每一條 eventLog 日志文件。每次 HDFS 目錄下新生成的文件都會更新到該數據結構。
val fileToAppInfo = new mutable.HashMap[Path, FsApplicationAttemptInfo]()applications 結構用于保存每個 App 對應的所有 AppAttempt 運行或完成的日志信息。
@volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo] = new mutable.LinkedHashMap()舉個例子:HDFS 日志目錄下有同一個 App 的兩個 eventLog 文件。
/user/spark/spark2ApplicationHistory/application_1599034722009_10003548_1 /user/spark/spark2ApplicationHistory/application_1599034722009_10003548_2此時,fileToAppInfo 保存的數據格式為:(兩條記錄)
<'/user/spark/spark2ApplicationHistory/application_1599034722009_10003548_1', AttemptInfo> <'/user/spark/spark2ApplicationHistory/application_1599034722009_10003548_2', AttemptInfo>而 applications 保存的數據格式為:(一條記錄)
<'application_1599034722009_10003548', HistoryInfo<Attemp1, Attempt2>>3.2.2 日志文件解析流程
eventLog 日志文件一次完整解析的流程大概分為以下幾個步驟:
源碼分析如下:
這段代碼主要是前兩個步驟的介紹,定期掃描日志目錄(定期時間由參數 spark.history.fs.update.interval 控制,線上環境為 30s),將文件大小有增加和新生成的文件保存在 logInfos 對象中。然后將新文件放到
replayExecutor 線程池中執行,該線程池大小默認為 機器cpu核數/4,由參數 spark.history.fs.numReplayThreads 控制,線上環境為 50。
//位置:core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scalaprivate[history] def checkForLogs(): Unit = {try {val newLastScanTime = getNewLastScanTime()logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Seq[FileStatus]())// logInfos 保存所有新的 eventLog 文件(包括大小增加的和新生成的文件)// filter:過濾出新的日志文件// flatMap:過濾空的entry對象// sortWith:根據日志文件更新時間降序排序val logInfos: Seq[FileStatus] = statusList.filter { entry =>try {val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)!entry.isDirectory() &&!entry.getPath().getName().startsWith(".") &&prevFileSize < entry.getLen()} catch {case e: AccessControlException =>logDebug(s"No permission to read $entry, ignoring.")false}}.flatMap { entry => Some(entry) }.sortWith { case (entry1, entry2) =>entry1.getModificationTime() >= entry2.getModificationTime()}if (logInfos.nonEmpty) {logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")}var tasks = mutable.ListBuffer[Future[_]]()try {for (file <- logInfos) {// 對掃描出來的文件進行解析tasks += replayExecutor.submit(new Runnable {override def run(): Unit = mergeApplicationListing(file)})}} catch {case e: Exception =>logError(s"Exception while submitting event log for replay", e)}... //省略}第三步流程主要在 mergeApplicationListing() 方法中處理。先來看看 fileToAppInfo 結構如何更新,這里的關鍵是 replay() 方法,這里會對 eventLog 進行初步解析,然后將解析后的內容更新到 fileToAppInfo 中。
//位置:core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scalaprivate def mergeApplicationListing(fileStatus: FileStatus): Unit = {// 函數監聽兩個事件:作業開始和作業結束val newAttempts = try {val eventsFilter: ReplayEventsFilter = { eventString =>eventString.startsWith(APPL_START_EVENT_PREFIX) ||eventString.startsWith(APPL_END_EVENT_PREFIX)}val logPath = fileStatus.getPath()val appCompleted = isApplicationCompleted(fileStatus)// UI 查看的關鍵,對 eventLog 日志文件進行解析回放val appListener = replay(fileStatus, appCompleted, new ReplayListenerBus(), eventsFilter)// 根據解析的結果構建 FsApplicationAttemptInfo 對象if (appListener.appId.isDefined) {val attemptInfo = new FsApplicationAttemptInfo(logPath.getName(),appListener.appName.getOrElse(NOT_STARTED),appListener.appId.getOrElse(logPath.getName()),appListener.appAttemptId,appListener.startTime.getOrElse(-1L),appListener.endTime.getOrElse(-1L),fileStatus.getModificationTime(),appListener.sparkUser.getOrElse(NOT_STARTED),appCompleted,fileStatus.getLen())// 更新 fileToAppInfo 結構fileToAppInfo(logPath) = attemptInfologDebug(s"Application log ${attemptInfo.logPath} loaded successfully: $attemptInfo")Some(attemptInfo)} else {logWarning(s"Failed to load application log ${fileStatus.getPath}. " +"The application may have not started.")None}}... // 省略 }那 applications 結構又是如何更新的呢?主要是先找出新的 App 對象,將舊的 App 列表和新的 App 列表進行合并,生成新的對象,并更新到 applications 中。
//位置:core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scalaprivate def mergeApplicationListing(fileStatus: FileStatus): Unit = {... // 省略val newAppMap = new mutable.HashMap[String, FsApplicationHistoryInfo]()// 多線程同時更新 applications 對象,這里用 synchronized 實現同步訪問該對象applications.synchronized {// newAttempts 對象是剛才解析 eventLog 構造的 FsApplicationAttemptInfo 對象列表// 這一步的目的就是要過濾出剛才新生成的App對象,并更新已存在但大小有增加的App對象newAttempts.foreach { attempt =>val appInfo = newAppMap.get(attempt.appId).orElse(applications.get(attempt.appId)).map { app =>val attempts =app.attempts.filter(_.attemptId != attempt.attemptId) ++ List(attempt)new FsApplicationHistoryInfo(attempt.appId, attempt.name,attempts.sortWith(compareAttemptInfo))}.getOrElse(new FsApplicationHistoryInfo(attempt.appId, attempt.name, List(attempt)))newAppMap(attempt.appId) = appInfo}val newApps = newAppMap.values.toSeq.sortWith(compareAppInfo)val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {if (!mergedApps.contains(info.id)) {mergedApps += (info.id -> info)}}// mergedApps 對象用于保存已有App對象和新生成的App對象進行合并后結果,產生最新的 applications 對象val newIterator = newApps.iterator.bufferedval oldIterator = applications.values.iterator.bufferedwhile (newIterator.hasNext && oldIterator.hasNext) {if (newAppMap.contains(oldIterator.head.id)) {oldIterator.next()} else if (compareAppInfo(newIterator.head, oldIterator.head)) {addIfAbsent(newIterator.next())} else {addIfAbsent(oldIterator.next())}}newIterator.foreach(addIfAbsent)oldIterator.foreach(addIfAbsent)applications = mergedApps} }3.3 eventLog 日志清理原理
了解了前面 fileToAppInfo 和 applications 數據結構,日志清理的原理相對而言就簡單很多,主要是對 applications 對象進行處理。
日志清理大致流程如下:
源碼分析:
//位置:core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scalaprivate[history] def cleanLogs(): Unit = {try {// 1、獲取 eventLog 保存的生命周期時間val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000val now = clock.getTimeMillis()val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()// 判斷函數:超過生命周期并完成(后綴不是 .inprogress 結束)的任務可以正常清理def shouldClean(attempt: FsApplicationAttemptInfo): Boolean = {now - attempt.lastUpdated > maxAge && attempt.completed}// 2、掃描 applications 對象,將超過生命周期待清理的 eventLog 保存在 attemptsToClean 對象中,未超過的保存在 appsToRetain 對象中applications.values.foreach { app =>val (toClean, toRetain) = app.attempts.partition(shouldClean)attemptsToClean ++= toCleanif (toClean.isEmpty) {appsToRetain += (app.id -> app)} else if (toRetain.nonEmpty) {appsToRetain += (app.id ->new FsApplicationHistoryInfo(app.id, app.name, toRetain.toList))}}// 3、更新 applications 對象applications = appsToRetainval leftToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]// 4、調用 HDFS api 執行真正的清理操作attemptsToClean.foreach { attempt =>try {fs.delete(new Path(logDir, attempt.logPath), true)} catch {case e: AccessControlException =>logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")case t: IOException =>logError(s"IOException in cleaning ${attempt.logPath}", t)leftToClean += attempt}}// 沒有正常清理的對象重新更新到 attemptsToClean 中attemptsToClean = leftToClean} catch {case t: Exception => logError("Exception in cleaning logs", t)}}四、原因分析&解決方案
上面日志解析和日志清理的邏輯都依賴 fileToAppInfo 和 applications 對象,Spark HistoryServer UI 界面展示的內容也是依賴這兩個對象,所以,UI 無法加載任務信息也是由于這里的數據結構出現了多線程訪問的線程安全問題。
4.1 HashMap 線程同步問題&解決方案
4.1.1 原因分析
fileToAppInfo 對象是 FsHistoryProvider 類的一個對象,數據結構采用 HashMap,是線程不安全的對象,但在多線程調用 mergeApplicationListing() 方法操作 fileToAppInfo 對象并不是同步訪問,導致每次載入所有 eventLog 日志文件,會出現不能保證所有文件都能被正常加載。那為什么會出現這種情況呢?其實就是多線程訪問同一個對象時經常出現的一個問題。
下圖是多線程訪問同一對象帶來的線程安全問題的一個簡單例子:
- 當線程 1 執行 x++ 后將結果更新到內存中,內存中此時 x=1,沒有問題。
- 但由于線程 1 在讀內存數據時線程 2 同時也讀取內存中 x 的值,當線程 2 執行 x++ 后,將結果更新到內存中,此時內存中 x 的值還是 1。
- 而預期的結果是 x = 2,這種情況便是多線程訪問同一對象的線程安全問題。
多線程訪問同一對象帶來的線程安全問題
4.1.2 解決方案
HashMap 對象帶來的線程安全問題,解決方法比較簡單,用 ConcurrentHashMap 替代即可。參考 patch:SPARK-21223。
var fileToAppInfo = new ConcurrentHashMap[Path, FsApplicationAttemptInfo]()4.2 Synchronized 鎖同步問題
4.2.1 原因分析
在 Spark HistoryServer 中,applications 更新的玩法是這樣的:
//位置:core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]= new mutable.LinkedHashMap()applications.synchronized {... // 省略val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()... // 省略更新 mergedApps 的值applications = mergedApps }咋一看,這樣使用 synchronized 鎖住 applications 對象似乎沒什么問題。但其實是有問題的,我們先來看一個例子。
class Synchronized {private List aList = new ArrayList();public void anyObject1() {// 和 HistoryServer 玩法一致,鎖住 aList 對象,代碼塊中用 aList2 更新 aList 對象值synchronized (aList) {List aList2 = new ArrayList();for (int i = 0; i < 10; i++) {System.out.println("anyObject" + "-" + Thread.currentThread());aList2.add(1);}aList = aList2;System.out.println("aList =" + aList.size());}} }public class SynchronizedDemo01 {public static void main(String[] args) {SynchronizedDemo01 syn = new SynchronizedDemo01();syn.anyObjTest();}public void anyObjTest() {final Synchronized syn = new Synchronized();// 啟動5個線程去操作aList對象,每次打印10條記錄for (int i = 0; i < 5; i++) {new Thread() {@Overridepublic void run() {syn.anyObject1();}}.start();}} }運行結果:(隨機多運行幾次) anyObject-Thread[Thread-3,5,main] anyObject-Thread[Thread-2,5,main] anyObject-Thread[Thread-3,5,main] anyObject-Thread[Thread-2,5,main] anyObject-Thread[Thread-3,5,main] aList =10 anyObject-Thread[Thread-3,5,main] anyObject-Thread[Thread-3,5,main] anyObject-Thread[Thread-3,5,main] anyObject-Thread[Thread-3,5,main] anyObject-Thread[Thread-3,5,main] aList =10 anyObject-Thread[Thread-4,5,main] anyObject-Thread[Thread-4,5,main] anyObject-Thread[Thread-4,5,main]通過這個例子,可以看出 Thread-3 在 Thread-2 線程中打印了信息,也就是說通過這種方式鎖住 synchronized(aList 對象)(非 this 對象)是有問題的,線程并沒有真正的鎖住 aList 對象。那為什么會出現這種情況呢?我們接著看。
https://blog.csdn.net/weixin_42762133/article/details/103241439 這篇文章給出了 Synchronized 鎖幾種使用場景。
| 方法 | 實例方法 | 當前對象實例(即方法調用者) |
| 靜態方法 | 類對象 | |
| 代碼塊 | this | 當前對象實例(即方法調用者) |
| class 對象 | 類對象 | |
| 任意 Object 對象 | 當前對象實例(即方法調用者) |
這里重點介紹下 synchronized 修飾目標為 this 和任意 Object 對象這兩種情況。要理解他們之間的區別,就需要搞清楚 synchronized 到底鎖住的是什么?在 https://juejin.im/post/6844903872431915022 這篇文章中,介紹了 synchronized 鎖住的內容有兩種,一種是類,另一種是對象實例。這里的關鍵就在于第二種情況,當使用 synchronized 鎖住的是對象實例時,HistoryServer 和上面 aList 的例子那就有問題了,怎么說呢?我們來看看下面這張圖。
Synchronized 鎖住的對象示意圖
通過這張圖就一目了然,synchronized(aList) 代碼塊鎖住的是 aList 對象指向的堆中的對象實例,當在代碼塊中通過 aList = aList2 賦值后,aList 便指向的新的對象實例,導致原來的對象實例變成了無主狀態,synchronized(aList) 代碼塊的鎖其實也就失去了意義。所以才會出現線程安全的問題。
上面那段測試代碼如果采用 synchronized(this) 則不會出現多線程錯亂打印的情況,為什么呢?通過上表中我們知道 synchronized(this) 的鎖是當前對象實例,即方法的調用者,在測試代碼中也就是 "SynchronizedDemo01 syn = new SynchronizedDemo01(); " 這里創建 syn 對象實例,在內存中的表現為:
Synchronized 對象堆內表現示意圖
使用 synchronized(this) 之所以不會出問題,是由于不管 aList 指向哪個對象實例,this 對象(即 syn 對象)指向的對象實例始終沒有變,所以多線程訪問 aList 不會出現線程安全問題。
至此,HistoryServer 中的那段代碼塊是有問題的,并不能實現 applications 對象的多線程安全訪問。
4.2.2 解決方案
分析清楚了具體原因后,解決方法就比較容易了,將那段代碼的 synchronized 鎖住的對象從 applications 對象改成 this 對象即可。
//位置:core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]= new mutable.LinkedHashMap()this.synchronized {... // 省略val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()... // 省略更新 mergedApps 的值applications = mergedApps }4.3.3 一點小擴展
上面解決了 synchronized 鎖住 applications 非 this 對象的問題,那 Spark 中為什么不直接用 this 對象呢?這里還是有一點小竅門的。那就是 synchronzied(this) 比 Synchronized(非this) 的效率要低一些,為什么這么說呢?來看兩個例子。
例子1:兩個線程使用同一個對象分別訪問 synchronized 方法和 synchronized(str) 代碼塊。
結論:兩個線程是異步執行的,Thread1 鎖住的 ‘str’ Object 對象實例,而 Thread2 鎖住的是 service 對象實例,互不影響。
public class SynchronizedDemo02 {static Service service = new Service();public static void main(String[] args) {new Thread () {@Overridepublic void run() {service.method1();}}.start();new Thread () {@Overridepublic void run() {service.method2();}}.start();} }class Service {String str = "test";public void method1() {synchronized (str) {System.out.println("method1 begin");try {Thread.sleep(1000);}catch (Exception e) {e.printStackTrace();}System.out.println("method1 end");}}public synchronized void method2() {System.out.println("method2 begin");try {Thread.sleep(1000);}catch (Exception e) {e.printStackTrace();}System.out.println("method2 end");} }結果輸出: method1 begin method2 begin method1 end method2 end例子2:兩個線程使用同一個對象分別訪問 synchronized 方法和 synchronized(this) 代碼塊。
結論:兩個線程同步執行,鎖住的是同一個 this 對象(即 service 對象),必須一個線程執行完才能執行另一個線程。
public class SynchronizedDemo02 {static Service service = new Service();public static void main(String[] args) {new Thread () {@Overridepublic void run() {service.method1();}}.start();new Thread () {@Overridepublic void run() {service.method2();}}.start();} }class Service {String str = "test";public void method1() {synchronized (this) {System.out.println("method1 begin");try {Thread.sleep(1000);}catch (Exception e) {e.printStackTrace();}System.out.println("method1 end");}}public synchronized void method2() {System.out.println("method2 begin");try {Thread.sleep(1000);}catch (Exception e) {e.printStackTrace();}System.out.println("method2 end");} }結果輸出: method1 begin method1 end method2 begin method2 end所以,采用 synchronized(非 this 對象) 會減少當前對象鎖與其他 synchorinzed(this) 代碼塊或 synchronized 方法之間的鎖競爭,與其他 synchronized 代碼異步執行,互不影響,會提高代碼的執行效率。
【參考資料】
- https://blog.csdn.net/u013332124/article/details/88350345 (HistoryServer 原理介紹)
- https://issues.apache.org/jira/browse/SPARK-21223 (fileToAppInfo HashMap 線程安全解決 patch)
- https://blog.csdn.net/winterking3/article/details/83858782 (Synchronized 非 this 對象同步代碼塊)
- https://juejin.im/post/6844903872431915022 (Synchronized 到底鎖的什么?)
總結
以上是生活随笔為你收集整理的Spark HistoryServer日志解析清理异常的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 解决Exception in threa
- 下一篇: Spark常用RDD算子 - saveA