A Deep Dive into the Internals of Spark's PersistenceEngine and Leader Election Agent - Spark in Production
This blog series distills cases from real production environments, sharing Spark source-code walkthroughs together with practical guidance; please follow the series. Copyright notice: this series of Spark source-code readings and production case studies belongs to the author (Qin Kaixin). Reproduction is prohibited; learning from it is welcome.
- Spark in Production: Spark's built-in RPC framework and the RpcEnv infrastructure
- Spark in Production: analysis of the Spark event listener bus flow
- Spark in Production: the underlying architecture of Spark's storage system
- Spark in Production: execution flow of Spark's multiple MessageLoop threads
- Spark in Production: a source-level deep dive into Spark's first-level resource scheduling and the SpreadOut mode
- Spark in Production: Spark's second-level scheduling system, the Stage partitioning algorithm and task scheduling details
- Spark in Production: Spark delay scheduling and the scheduling Pool architecture
- Spark in Production: a detailed look at AppendOnlyMap, the task-level cache/aggregate/sort structure
- Spark in Production: the design of the ExternalSorter in the Spark Shuffle process
- Spark in Production: the design of the ShuffleExternalSorter in the Spark Shuffle process
- Spark in Production: the design of SortShuffleWriter, a memory buffer in Spark's ShuffleManager
- Spark in Production: the design of UnsafeShuffleWriter, a memory buffer in Spark's ShuffleManager
- Spark in Production: the design of BypassMergeSortShuffleWriter, a memory buffer in Spark's ShuffleManager
- Spark in Production: internals of BlockStoreShuffleReader, a core Spark Shuffle component
- Spark in Production: internals of the SortShuffleManager shuffle manager
- Spark in Production: internals of the PersistenceEngine high-availability mechanism
- Spark in Production: StreamingContext startup flow and DStream template source analysis
- Spark in Production: ReceiverTracker startup and receiver RDD task submission
- Spark in Production: how Spark Streaming data is periodically turned from Batch into Block
- Spark in Production: JobGenerator's periodic data-processing logic in Spark Streaming
- [Spark in Production: iteration over the Spark Streaming Graph processing chain]
- [Spark in Production: the JobGenerator data clean-up flow]
- [Spark in Production: Spark Streaming fault-tolerance internals]
- [Spark in Production: Spark Streaming No-Receiver Kafka pull internals]
- [Spark in Production: Spark Streaming back-pressure rate control internals]
1 The PersistenceEngine
1.1 Starting the PersistenceEngine
- The failure-recovery mode is chosen via configuration; the main options are ZOOKEEPER and FILESYSTEM:

```scala
private val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
```

- The PersistenceEngine is initialized in the Master's onStart() method:
```scala
val serializer = new JavaSerializer(conf)
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
  case "ZOOKEEPER" =>
    logInfo("Persisting recovery state to ZooKeeper")
    val zkFactory = new ZooKeeperRecoveryModeFactory(conf, serializer)
    (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
  case "FILESYSTEM" =>
    val fsFactory = new FileSystemRecoveryModeFactory(conf, serializer)
    (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
  case "CUSTOM" =>
    val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
    val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
      .newInstance(conf, serializer)
      .asInstanceOf[StandaloneRecoveryModeFactory]
    (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
  case _ =>
    (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
persistenceEngine = persistenceEngine_
leaderElectionAgent = leaderElectionAgent_
```
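The mode dispatch above can be mimicked outside Spark. The following Java sketch shows the same select-by-configuration pattern; the engine and agent names here are illustrative stand-ins, not Spark's actual factory types:

```java
// Sketch of Master.onStart's recovery-mode dispatch. Returns a
// {persistenceEngine, leaderElectionAgent} pair of descriptive names.
class RecoveryModeFactory {
    static String[] create(String mode) {
        switch (mode) {
            case "ZOOKEEPER":
                return new String[] {"ZooKeeperPersistenceEngine", "ZooKeeperLeaderElectionAgent"};
            case "FILESYSTEM":
                return new String[] {"FileSystemPersistenceEngine", "MonarchyLeaderAgent"};
            default: // "NONE" or anything unrecognized: no-op persistence, single master
                return new String[] {"BlackHolePersistenceEngine", "MonarchyLeaderAgent"};
        }
    }
}
```

Note the default branch: with no recovery mode configured, the Master still gets a (do-nothing) engine and a trivially self-electing agent, so the rest of the code never has to null-check.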
1.2 What the PersistenceEngine does
- The PersistenceEngine is mainly used to read back the persisted details of Applications, Workers, and Drivers when the Master fails over.
- The PersistenceEngine is likewise responsible for writing those Application, Worker, and Driver details out.
(1) When the PersistenceEngine is invoked:
- before a new Application is registered;
- before a new Worker is registered;
- when removeApplication and removeWorker are called.
For example:
```scala
persistenceEngine.removeWorker(worker)
```

1.3 The PersistenceEngine abstract template, which also shows its call points
```scala
abstract class PersistenceEngine {

  /**
   * Defines how the object is serialized and persisted. Implementation will
   * depend on the store used.
   */
  def persist(name: String, obj: Object): Unit

  /**
   * Defines how the object referred by its name is removed from the store.
   */
  def unpersist(name: String): Unit

  /**
   * Gives all objects, matching a prefix. This defines how objects are
   * read/deserialized back.
   */
  def read[T: ClassTag](prefix: String): Seq[T]

  final def addApplication(app: ApplicationInfo): Unit = {
    persist("app_" + app.id, app)
  }

  final def removeApplication(app: ApplicationInfo): Unit = {
    unpersist("app_" + app.id)
  }

  final def addWorker(worker: WorkerInfo): Unit = {
    persist("worker_" + worker.id, worker)
  }

  final def removeWorker(worker: WorkerInfo): Unit = {
    unpersist("worker_" + worker.id)
  }

  final def addDriver(driver: DriverInfo): Unit = {
    persist("driver_" + driver.id, driver)
  }

  final def removeDriver(driver: DriverInfo): Unit = {
    unpersist("driver_" + driver.id)
  }

  /**
   * Returns the persisted data sorted by their respective ids (which implies that they're
   * sorted by time of creation).
   */
  final def readPersistedData(
      rpcEnv: RpcEnv): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
    rpcEnv.deserialize { () =>
      (read[ApplicationInfo]("app_"), read[DriverInfo]("driver_"), read[WorkerInfo]("worker_"))
    }
  }

  def close() {}
}
```

1.4 File-system-based and ZooKeeper-based persistence
- File-system-based persistence: FileSystemPersistenceEngine
```scala
private def serializeIntoFile(file: File, value: AnyRef) {
  val created = file.createNewFile()
  if (!created) { throw new IllegalStateException("Could not create file: " + file) }
  val fileOut = new FileOutputStream(file)
  var out: SerializationStream = null
  Utils.tryWithSafeFinally {
    out = serializer.newInstance().serializeStream(fileOut)
    out.writeObject(value)
  } {
    fileOut.close()
    if (out != null) {
      out.close()
    }
  }
}
```
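The write path above boils down to "create the file fresh, then stream the serialized object into it". A minimal, self-contained Java sketch of the same idea follows; it uses plain JDK serialization in place of Spark's pluggable serializer, and the class and helper names are illustrative:

```java
import java.io.*;

// Stand-alone sketch of FileSystemPersistenceEngine.serializeIntoFile:
// refuse to overwrite an existing file, then stream the object to disk.
class FileSerializerSketch {
    static void serializeIntoFile(File file, Serializable value) throws IOException {
        if (!file.createNewFile()) {
            throw new IllegalStateException("Could not create file: " + file);
        }
        try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(file))) {
            out.writeObject(value);
        }
    }

    static Object deserializeFromFile(File file) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(file))) {
            return in.readObject();
        }
    }

    // Round-trip helper: write to a fresh temp file, read back, clean up.
    static Object roundTrip(Serializable value) throws Exception {
        File f = File.createTempFile("persist_", ".bin");
        f.delete(); // serializeIntoFile must be the one to create it
        try {
            serializeIntoFile(f, value);
            return deserializeFromFile(f);
        } finally {
            f.delete();
        }
    }
}
```

The createNewFile guard mirrors the original's intent: a name collision means a stale record was never unpersisted, and failing loudly is safer than silently overwriting it.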
- ZooKeeper-based persistence: ZooKeeperPersistenceEngine
Curator is Netflix's open-source ZooKeeper client. Note that ZooKeeperPersistenceEngine stores data such as ApplicationInfo, WorkerInfo, and DriverInfo on separate znodes in ZooKeeper.
Can ZooKeeper really bear this load? An open question.
```scala
private val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
private val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)

private def serializeIntoFile(path: String, value: AnyRef) {
  val serialized = serializer.newInstance().serialize(value)
  val bytes = new Array[Byte](serialized.remaining())
  serialized.get(bytes)
  zk.create().withMode(CreateMode.PERSISTENT).forPath(path, bytes)
}
```
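Both back ends implement the same contract: persist/unpersist by name, and read everything back by prefix. A minimal in-memory Java sketch of that contract (illustrative only, standing in for a real file or ZooKeeper store):

```java
import java.util.*;

// In-memory sketch of the PersistenceEngine contract. A TreeMap keeps
// entries sorted by name, echoing readPersistedData's id ordering.
class InMemoryPersistenceEngine {
    private final TreeMap<String, Object> store = new TreeMap<>();

    void persist(String name, Object obj) { store.put(name, obj); }
    void unpersist(String name) { store.remove(name); }

    // Return every stored object whose name starts with the given prefix.
    List<Object> read(String prefix) {
        List<Object> out = new ArrayList<>();
        for (Map.Entry<String, Object> e : store.entrySet()) {
            if (e.getKey().startsWith(prefix)) out.add(e.getValue());
        }
        return out;
    }

    // Wrappers analogous to the final addApplication/addWorker helpers.
    void addApplication(String id) { persist("app_" + id, id); }
    void addWorker(String id) { persist("worker_" + id, id); }
}
```

The "app_"/"worker_"/"driver_" prefixes are the whole indexing scheme: recovery simply reads three prefix ranges, which is why the abstract class can fix the naming in final methods and leave only persist/unpersist/read to the back end.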
2 The leader election mechanism

The election mechanism is essentially a registered watch: once a listener detects that the Master is down, its callback is invoked.
The main implementations are:
- ZooKeeperLeaderElectionAgent
- MonarchyLeaderAgent
The rest of this section takes ZooKeeperLeaderElectionAgent as the example.
2.1 Borrowing a hen to lay eggs: letting ZooKeeper do the electing
- Election is watched through the /leader_election directory:
```scala
val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"

private def start() {
  logInfo("Starting ZooKeeper LeaderElection agent")
  zk = SparkCuratorUtil.newClient(conf)
  leaderLatch = new LeaderLatch(zk, WORKING_DIR)
  leaderLatch.addListener(this)
  leaderLatch.start()
}

private def updateLeadershipStatus(isLeader: Boolean) {
  if (isLeader && status == LeadershipStatus.NOT_LEADER) {
    status = LeadershipStatus.LEADER
    masterInstance.electedLeader()
  } else if (!isLeader && status == LeadershipStatus.LEADER) {
    status = LeadershipStatus.NOT_LEADER
    masterInstance.revokedLeadership()
  }
}
```
- The election itself is driven by callbacks on the watched /leader_election directory:
```scala
override def isLeader() {
  synchronized {
    // could have lost leadership by now.
    if (!leaderLatch.hasLeadership) {
      return
    }
    logInfo("We have gained leadership")
    updateLeadershipStatus(true)
  }
}

override def notLeader() {
  synchronized {
    // could have gained leadership by now.
    if (leaderLatch.hasLeadership) {
      return
    }
    logInfo("We have lost leadership")
    updateLeadershipStatus(false)
  }
}
```
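Both callbacks funnel into updateLeadershipStatus, whose guard conditions make repeated notifications idempotent: a transition only fires on an actual change of status. A Java sketch of that small state machine, with counters standing in for the electedLeader()/revokedLeadership() RPCs:

```java
// Sketch of ZooKeeperLeaderElectionAgent.updateLeadershipStatus.
// Counters stand in for calls back into the Master instance.
class LeadershipTracker {
    enum Status { LEADER, NOT_LEADER }
    private Status status = Status.NOT_LEADER;
    int elections = 0;
    int revocations = 0;

    void updateLeadershipStatus(boolean isLeader) {
        if (isLeader && status == Status.NOT_LEADER) {
            status = Status.LEADER;
            elections++;        // masterInstance.electedLeader()
        } else if (!isLeader && status == Status.LEADER) {
            status = Status.NOT_LEADER;
            revocations++;      // masterInstance.revokedLeadership()
        }
        // Any other combination is a duplicate notification: do nothing.
    }
}
```

This is why the latch callbacks can afford to fire under races: even if isLeader() and notLeader() arrive out of order or twice, the Master sees at most one election and one revocation per real transition.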
3 What the Master does once elected

The Master sends a message to itself and begins the recovery procedure:
- The Master extends LeaderElectable and therefore implements the electedLeader method:
```scala
override def electedLeader() {
  self.send(ElectedLeader)
}
```
- The Master in action: beginRecovery and completeRecovery
```scala
override def receive: PartialFunction[Any, Unit] = {
  case ElectedLeader =>
    val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
    state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
      RecoveryState.ALIVE
    } else {
      RecoveryState.RECOVERING
    }
    logInfo("I have been elected leader! New state: " + state)
    if (state == RecoveryState.RECOVERING) {
      beginRecovery(storedApps, storedDrivers, storedWorkers)  // <= the key move
      recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          self.send(CompleteRecovery)  // <= the key move
        }
      }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
    }
```
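The first decision after winning the election is whether there is anything to recover at all. A Java sketch of that check, with plain strings standing in for the RecoveryState values:

```java
// Sketch of the ElectedLeader branch: an empty persisted store means the
// newly elected master can go straight to ALIVE; any surviving app,
// driver, or worker record forces a RECOVERING pass first.
class ElectionDecision {
    static String stateAfterElection(int storedApps, int storedDrivers, int storedWorkers) {
        return (storedApps == 0 && storedDrivers == 0 && storedWorkers == 0)
                ? "ALIVE" : "RECOVERING";
    }
}
```

Only in the RECOVERING case does the Master schedule the delayed CompleteRecovery message, giving workers and drivers one WORKER_TIMEOUT_MS window to reconnect before clean-up.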
- The Master in action: beginRecovery
```scala
private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
    storedWorkers: Seq[WorkerInfo]) {
  for (app <- storedApps) {
    logInfo("Trying to recover app: " + app.id)
    try {
      registerApplication(app)
      app.state = ApplicationState.UNKNOWN
      app.driver.send(MasterChanged(self, masterWebUiUrl))
    } catch {
      case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
    }
  }
```
- The Master in action: completeRecovery
```scala
private def completeRecovery() {
  // Ensure "only-once" recovery semantics using a short synchronization period.
  if (state != RecoveryState.RECOVERING) { return }
  state = RecoveryState.COMPLETING_RECOVERY

  // Kill off any workers and apps that didn't respond to us.
  workers.filter(_.state == WorkerState.UNKNOWN).foreach(
    removeWorker(_, "Not responding for recovery"))
  apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

  // Update the state of recovered apps to RUNNING
  apps.filter(_.state == ApplicationState.WAITING).foreach(_.state = ApplicationState.RUNNING)

  // Reschedule drivers which were not claimed by any workers
  drivers.filter(_.worker.isEmpty).foreach { d =>
    logWarning(s"Driver ${d.id} was not found after master recovery")
    if (d.desc.supervise) {
      logWarning(s"Re-launching ${d.id}")
      relaunchDriver(d)
    } else {
      removeDriver(d.id, DriverState.ERROR, None)
      logWarning(s"Did not re-launch ${d.id} because it was not supervised")
    }
  }
```
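completeRecovery is essentially a few filters over the recovered state. A Java sketch of the application-side clean-up, with states as plain strings for illustration (the sample data is hypothetical):

```java
import java.util.*;

// Sketch of completeRecovery's application clean-up: apps still UNKNOWN
// after the timeout are finished (dropped), and recovered WAITING apps
// are promoted to RUNNING.
class RecoveryCleanup {
    static Map<String, String> completeRecovery(Map<String, String> appStates) {
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, String> e : appStates.entrySet()) {
            if (e.getValue().equals("UNKNOWN")) continue;  // finishApplication
            out.put(e.getKey(), e.getValue().equals("WAITING") ? "RUNNING" : e.getValue());
        }
        return out;
    }

    // Illustrative sample: one unresponsive app, one recovered, one running.
    static Map<String, String> sample() {
        Map<String, String> m = new HashMap<>();
        m.put("app-1", "UNKNOWN");
        m.put("app-2", "WAITING");
        m.put("app-3", "RUNNING");
        return m;
    }
}
```

The same pattern repeats for workers (drop UNKNOWN ones) and drivers (relaunch supervised, otherwise remove), which is why the guard at the top enforces that this pass runs exactly once per recovery.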
4 Summary
Qin Kaixin, Shenzhen