Hadoop推测执行(以空间换取时间)
1. 背景
Speculative Task,又叫推測式任務,是指在分布式集群環境下,因為程序bug,負載不均衡或者資源分布不均,造成同一個job的多個task運行速度不一致,有的task運行速度明顯慢于其他task(比如:一個job的某個task進度只有10%,而其他所有task已經運行完畢),則這些task拖慢了作業的整體執行進度,為了避免這種情況發生,Hadoop會為該task啟動speculative task,讓該speculative task與原始task同時運行,哪個先運行完,則使用它的結果。
Speculative Task思路是以空間換時間的,同時啟動多個相同task,哪個完成的早,則采用哪個task的結果,這樣明顯可以提高任務計算速度,但是,這樣卻會占用更多的資源,在集群資源緊缺的情況下,合理的控制Speculative Task,可在多用少量資源情況下,減少大作業的計算時間。
本文主要介紹了Hadoop各個版本中Speculative Task設計思路,并指出了各自的不足及改進之處。
2. Hadoop 0.20.2和1.0.0中的算法
【算法流程】
如果一個task滿足以下條件,則會為該task啟動speculative task:
(1)該task還未啟動任何speculative task(TaskInProgress僅對應一個running的task)
(2)在60s內落后(同一個作業所有task的)平均進度20%
某個Task可能對應多個正在運行的attempt task,任何一個運行結束,則會kill(不是fail)其他task。
【相關代碼】
| 1 2 3 4 5 6 7 8 9 10 11 12 | booleanhasSpeculativeTask(longcurrentTime, doubleaverageProgress) { // these constants should be examined // in more depth eventually... // ??if(!skipping && activeTasks.size() <= MAX_TASK_EXECS && ?????(averageProgress - progress >= SPECULATIVE_GAP) && ?????????(currentTime - startTime >= SPECULATIVE_LAG) ????????????&& completes == 0&& !isOnlyCommitPending()) { ????returntrue; ??} ??returnfalse; } |
【存在問題】
以上算法可能造成以下幾個問題:
(1) 作業的某個task被卡在87.7%處(MAPREDUCE-94)
(2) 當作業將要運行結束時,總不會啟動speculative task
(3) ?各個參數不可配置(SPECULATIVE_GAP,SPECULATIVE_LAG),不夠靈活。
3. Hadoop 0.21.0中的算法
為了對Hadoop-0.20.2中的算法進行改進,Hadoop-0.21.0進行了以下優化:
(1)?添加了三個可配置選項
mapreduce.job.speculative.slownodethreshold,默認是1,用于判斷某個TaskTracker是否適合啟動某個task的speculative task
mapreduce.job.speculative.slowtaskthreshold,默認是1,用于判斷某個task是否可以啟動speculative task
mapreduce.job.speculative.speculativecap, 默認是0.1,用于限定某個job最多同時啟動的spculative task的數目
(2)?限定條件
如果一個tasktracker/job/task滿足以下條件,則會在該tasktracker上為該task啟動一個speculative task:
(1) ?Job已經啟動的specutive task數目小于SpeculativeCap
(2) 該TaskTracker上該Job的所有task平均進度不小于SlowNodeThreshold,具體判斷方法為:
tasktracker.?mean-job.progessRate >job.std*job. SlowNodeThreshold
其中,tasktracker.?Mean為該job在該tasktracker正在運行/已經運行完成的task的平均進度,job.progessRate為該作業的平均計算速度,job.std為該作業所有task計算速度的標準方差。
(3) ?按照Task剩余時間,對Task進行排序
Task剩余時間定義為:(1-progress) / progressRate,其中process為task當前進度,progressRate為task的平均計算速度:progressRate= progress/deltaTime,其中deltaTime為該task啟動以來所經歷的時間
(4) 選擇一個剩余時間最多,且?mean-progessRate >std*SlowTaskThreshold的task,并為該task啟動speculative task,,其中mean為所有task平均計算速度,std為所有task計算速度的標準方差。
(3) 存在問題
(1)MAPREDUCE-2062
當一個作業只有一個task時,所有task的計算速度的標準方差為0,因而,總會為這樣的作業啟動speculative task
如果一個作業僅有一個task正在運行,則所有task的標準方差仍未0,Hadoop很可能為其他所有task啟動speculative task。
(2)MAPREDUCE-3895
在Hadoop中,reduce task進度(對應上面的progress變量)計算很不合理,采用的方法是,將reduce task分為三個子過程:shuffle(copy),sort和reduce,各個階段占進度的1/3,比如,一個task的shuffle階段剛結束,它的進度應該是33.3%。 對于大部分作業,reduce task的進度變化并不是均勻的,一個task在某一時刻進度為33.3%,下一秒進度可能變為66.6%,這主要是因為sort階段所做工作非常少,時間很短。也正是由于以上原因,reduce task很容易被誤判速度過慢,進而啟動speculative task。一種可行的解決方案是將sort階段利用某個數學模型平滑處理。
4. 終極解決方案
拖后腿task出現的原因是系統負載不均衡和資源分配不均。尤其是在異構Hadoop集群中,拖后腿task會經常出現,而且最好不要打開speculative task功能,否則會出現大量的speculative task,造成嚴重的資源浪費,因為當前所有的speculative task解決方案均是假設集群是同構的。
為什么會造成這種問題?根本原因這種基于speculative task來解決拖后腿task是有問題的。拖后腿task最終應該通過調度器解決:每個TaskTracker通過heartbeat匯報自己機器上的資源分布情況和當前資源使用情況,比如CPU個數,CPU利用率,內存剩余量,IO負載等,然后由調度器根據當前資源使用情況,動態對任務進行調度(參考https://issues.apache.org/jira/browse/MAPREDUCE-1380),進而最大限度避免產生拖后腿task。
關于拖后腿task,還有一個需要解決的問題是,防止為某個task啟動speculative task后,該task又變成拖后腿task,比如:某個node上所有task均往一個磁盤上寫數據,且該磁盤負載很重,而此時又將一個speculative task調度到該節點上(也往該磁盤上寫數據),此時,該speculative task變得緩慢,因而有人提出了Hadoop disk scheduler,具體參考:https://issues.apache.org/jira/browse/MAPREDUCE-2636
5. 關于Speculative Task的Hadoop代碼
Hadoop中,關于推測式任務的相關代碼均在文件JobInProgress.java和TaskInProgress.java中,JobInProgress.java主要函數定義如下:
| 1 2 3 | protectedsynchronized TaskInProgress findSpeculativeTask( ??Collection<TaskInProgress> list, String taskTrackerName, ????String taskTrackerHost, TaskType taskType) {….} |
TaskInProgress.java中主要函數定義如下:
| 1 | booleancanBeSpeculated(longcurrentTime){…} |
原創文章,轉載請注明:?轉載自董的博客
本文鏈接地址:?http://dongxicheng.org/mapreduce/hadoop-speculative-task/
總結
以上是生活随笔為你收集整理的Hadoop推测执行(以空间换取时间)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Hadoop机架感知
- 下一篇: hadoop2.20+hive+sqoo