使用 Akka 实现 Master 与 Worker 之间的通信
生活随笔
收集整理的這篇文章主要介紹了
使用 Akka 实现 Master 与 Worker 之间的通信
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
MessageProtocol.scala
package top.gldwolf.scala.akkademo.sparkmasterandworker.common/*** @author: Gldwolf* @email: ZengqiangZhao@sina.com* @date: 2020/4/17 10:54*//*** 用于 Work 注冊時發送注冊信息*/ case class WorkerRegisterInfo(id: String, cpu: Int, ram: Int) {}/*** 用于保存到 Master 的 HashMap 中*/ class WorkerInfo(var id: String, cpu: Int, ram: Int) {var lastHeartBeatTime = System.currentTimeMillis() }/*** 注冊成功后,Master 回應此類型的消息,表示注冊成功* Worker 接收到后,啟動心跳機制*/ case object RegisteredInfo/*** worker 每隔一定時間由定時器發給自己的一個消息,用于觸發自己給 Master 發送消息*/ case object SendHeartBeat/*** 由自己的消息觸發,然后給 Master 發送 HeartBeat 消息,消息要帶上自己的 id*/ case class HeartBeat(id: String)/*** Master 給自己發送一個觸發檢查超時 Worker 的消息,定時獲取已離線的 Worker*/ case object StartCheckTimeOutWorker/*** Master 給自己發消息,檢測 Worker 是否已離線,如果已離線,則移除*/ case object RemoveTimeOutWorkerSparkWorker.scala
package top.gldwolf.scala.akkademo.sparkmasterandworker.workerimport akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props} import com.typesafe.config.{Config, ConfigFactory} import top.gldwolf.scala.akkademo.sparkmasterandworker.common.{HeartBeat, RegisteredInfo, SendHeartBeat, WorkerRegisterInfo}import scala.concurrent.duration.FiniteDuration/*** @author: Gldwolf* @email: ZengqiangZhao@sina.com* @date: 2020/4/17 10:03*/object SparkWorker {def main(args: Array[String]): Unit = {if (args.length < 6) {println("參數個數不正確:host, port, workerName, masterName, masterHost, masterPort...")System.exit(-1)}val host = args(0)val port = args(1).toIntval workerName = args(2)val masterName = args(3)val masterHost = args(4)val masterPort = args(5).toIntval config: Config = ConfigFactory.parseString(s"""|akka.actor.provider="akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname=$host|akka.remote.netty.tcp.port=$port|""".stripMargin)val workerFactory: ActorSystem = ActorSystem("WorkerFactory", config)val workerRef: ActorRef = workerFactory.actorOf(Props(new SparkWorker(masterHost, masterPort, masterName)), workerName)workerRef ! "start"} }class SparkWorker(masterHost: String, masterPort: Int, masterName: String) extends Actor {var masterRef: ActorSelection = _var id = java.util.UUID.randomUUID().toStringoverride def preStart(): Unit = {masterRef = context.actorSelection("akka.tcp://MasterFactory@" +s"${masterHost}:${masterPort}/user/${masterName}")}override def receive: Receive = {case "start" => {println("Worker " + "已上線~~~")masterRef ! WorkerRegisterInfo(id, 4, 4096)}// 接收到這個消息表示注冊成功,然后會給定時給自己發送 SendHeartBeat 消息,觸發心跳機制case RegisteredInfo => {println("workerId: " + id + " 注冊成功!")import context.dispatcher// 說明:// 1. 0 millis 表示不延時,立即執行定時器// 2. 3000 millis 表示每隔 3 秒執行一次// 3. self 表示發送給自己// 4. SendHeartBeat 表示發送的內容context.system.scheduler.schedule(FiniteDuration(0, "millis"), FiniteDuration(3000, "millis"), self, SendHeartBeat)}// 接收到自己的定時器發送給自己的消息時,觸發下面內容,給 Master 發送心跳信息case SendHeartBeat => {println("worker: " + id + " 發送心跳信息~~~")masterRef ! HeartBeat(id)}} }SparkMaster.scala
package top.gldwolf.scala.akkademo.sparkmasterandworker.masterimport akka.actor.{Actor, ActorRef, ActorSystem, Props} import com.typesafe.config.{Config, ConfigFactory} import top.gldwolf.scala.akkademo.sparkmasterandworker.common._import scala.collection.mutable import scala.concurrent.duration.FiniteDuration/*** @author: Gldwolf* @email: ZengqiangZhao@sina.com* @date: 2020/4/17 9:56*/object SparkMaster {def main(args: Array[String]): Unit = {if (args.length < 3) {println("參數不夠,請轉入 host, port, masterName...")System.exit(-1)}val host = args(0)val port = args(1)val masterName = args(2)val config: Config = ConfigFactory.parseString(s"""|akka.actor.provider="akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname=$host|akka.remote.netty.tcp.port=$port|""".stripMargin)val masterFactory: ActorSystem = ActorSystem("MasterFactory", config)val masterRef: ActorRef = masterFactory.actorOf(Props[SparkMaster], masterName)masterRef ! "start"} }class SparkMaster extends Actor {val workers: mutable.HashMap[String, WorkerInfo] = mutable.HashMap()override def receive: Receive = {case "start" => {// 這里開始啟動 Master 定時檢測任務,判斷 Worker 有沒有離線println("Master 已上線~~~")self ! StartCheckTimeOutWorker // 給自己發一個開始檢測的消息}case WorkerRegisterInfo(id, cpu, ram) => { // 如果是注冊信息,則將注冊信息管理起來val workerInfo = new WorkerInfo(id, cpu, ram)if (!workers.contains(id)) { // 判斷是否已經添加這個 workerworkers += ((id, workerInfo)) // 如果沒有則添加進來println("worker:" + id + " 注冊成功~~~")println("目前已有的 Workers: " + workers)// 回復注冊成功消息sender() ! RegisteredInfo}}case HeartBeat(id) => { // 接收到 worker 的心跳信息val workerInfo = workers(id)val lastHeartBeatTime: Long = System.currentTimeMillis()workerInfo.lastHeartBeatTime = lastHeartBeatTimeprintln("Master 更新了 " + id + " 的心跳時間: " + lastHeartBeatTime)}case StartCheckTimeOutWorker => {// 獲取到消息后開始定時檢測離線的 Workerprintln("----- Master 開啟定時任務,檢測已離線的 Worker -----")import context.dispatchercontext.system.scheduler.schedule(FiniteDuration(0, "millis"), FiniteDuration(9000, "millis"), self, RemoveTimeOutWorker)}// 檢測哪些 Worker 超時了,并從 Worker 中刪除case RemoveTimeOutWorker => {val now: Long = System.currentTimeMillis// 如果最后一次心跳距離現在有 6 秒,那么就代表離線了,則刪除這個 Worker // for ((id, workerInfo) <- workers) { // if ((now - workerInfo.lastHeartBeatTime) / 1000 > 6) { // workers.remove(id) // println("worker: " + id + " 已離線,將其移除!") // } // }// 也可以使用函數式編程將其移除workers.values.filter(worker => now - worker.lastHeartBeatTime > 6000).foreach(worker => {println("Worker: " + worker.id + " 離線,已將其移除!")workers.remove(worker.id)})println("當前有 " + workers.size + " 個 Worker 存活!")}} }總結
以上是生活随笔為你收集整理的使用 Akka 实现 Master 与 Worker 之间的通信的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 学习分布式不得不会的CAP理论
- 下一篇: 简析.NET Core 以及与 .NET