Spark Executor内幕
Spark Executor工作原理
ExecutorBackend注冊
Executor實例化
Executor具體是如何工作的?
?
CoarseGrainedExecutorBackend,粗粒度的ExecutorBackend進程。
Worker為什么要啟動另外一個進程?
Worker本身是管理當前機器上的資源,變動資源的時候向Master匯報。有很多應用程序,就需要很多Executor。這樣程序之間不會一個奔潰導致所有的都奔潰。
1.在CoarseGrainedExecutorBackend啟動時,向Driver注冊Executor其實質是注冊ExecutorBackend實例,和Executor實例之間沒有直接的關系!!!
2.CoarseGrainedExecutorBackend是Executor運行所在的進程名稱,Executor才是真正在處理Task的對象,Executor內部是通過線程池的方式來完成Task的計算的。
3. CoarseGrainedExecutorBackend和Executor是一一對應的。
4. CoarseGrainedExecutorBackend是一個消息通信體(其實現了ThreadSafeRpcEndpoint)。可以發送信息給Driver,并可以接收Driver中發過來的指令,例如啟動Task等。
5.在Driver進程中,有兩個至關重要的Endpoint,
a)第一個就是ClientEndpoint,主要負責向Master注冊當前的程序;是AppClient的內部成員。
b)另外一個就是DriverEndpoint,這是整個程序運行時候的驅動器!!是CoarseGrainedExecutorBackend的內部成員。
6.在Driver中通過ExecutorData封裝并注冊ExecutorBackend的信息到Driver的內存數據結構ExecutorMapData中。ExecutorMapData是CoarseGrainedSchedulerBackend的成員。最終是注冊給CoarseGrainedSchedulerBackend。
private[cluster]?class?ExecutorData(
???val?executorEndpoint: RpcEndpointRef,
???val?executorAddress: RpcAddress,
???override val?executorHost:?String,
???var?freeCores:?Int,
???override val?totalCores:?Int,
???override val?logUrlMap:?Map[String,?String]
)?extends?ExecutorInfo(executorHost,?totalCores,?logUrlMap)
?
7.實際在執行的時候,DriverEndpoint會把信息寫入CoarseGrainedSchedulerBackend的內存數據結構ExecutorMapData中,所以說最終是注冊給CoarseGrainedSchedulerBackend,也就是說CoarseGrainedSchedulerBackend掌握了為當前程序分配的所有的ExucutorBackend進程,而在每一個ExecutorBackend進程實例中會通過Executor對象來負責具體Task的運行。在欲行的時候使用syschronized關鍵字來保證ExecutorMapData安全的并發寫操作。
8.CoarseGrainedExecutorBackend收到DriverEndpoint發送過來的RegisteredExecutor消息后會啟動Executor實例對象,而Executor實例對象是事實上負責真正Task計算的。
?
Executor是如何工作的?
1.當Driver發送過來Task的時候,其實是發送給了CoarseGrainedExecutorBackend這個RpcEndpoint,而不是直接發送給了Executor(Executor由于不是消息循環體,所以永遠無法直接接收遠程發送過來的信息)。
case?LaunchTask(data) =>??if?(executor?==?null) {
????logError("Received LaunchTask command but executor was null")
????System.exit(1)
??}?else?{
????val?taskDesc =?ser.deserialize[TaskDescription](data.value)
????logInfo("Got assigned task "?+ taskDesc.taskId)
????executor.launchTask(this,?taskId = taskDesc.taskId,?attemptNumber = taskDesc.attemptNumber,
??????taskDesc.name,?taskDesc.serializedTask)
??}
?
2.ExecutorBackend在收到Driver中發送過來的消息后,會通過調用launchTask來交給Executor去執行。
case?LaunchTask(data) =>??if?(executor?==?null) {
????logError("Received LaunchTask command but executor was null")
????System.exit(1)
??}?else?{
????val?taskDesc =?ser.deserialize[TaskDescription](data.value)
????logInfo("Got assigned task "?+ taskDesc.taskId)
????executor.launchTask(this,?taskId = taskDesc.taskId,?attemptNumber = taskDesc.attemptNumber,
??????taskDesc.name,?taskDesc.serializedTask)
??}
??
轉載于:https://www.cnblogs.com/haoyy/p/6177974.html
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的Spark Executor内幕的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Google Chrome调试js入门
- 下一篇: ASP.NET MVC4 路由的配置