初见akka-02:rpc框架
1.RPC:簡單點說,就是多線程之間的通信,我們今天用了scala以及akka
來簡單的實現了
rpc框架的一些簡單的內容,一臉包括了,心跳,間隔時間,
注冊以及一些問題,
模式匹配的一些東西,雖然比較簡單,但是屬于麻雀雖小,五臟俱全
這個里面一共有有四個文件:
Master.scala
RemoteMessage.scala
Worker.scala
WorkerInfo
Master.scala
package cn.wj.rpcimport akka.actor.{Actor, ActorSystem} import akka.actor.Actor.Receive import com.typesafe.config.ConfigFactory import akka.actor.Props import scala.concurrent.duration._import scala.collection.mutable/*** Created by WJ on 2016/12/23.*/ class Master(val host:String,val port:Int ) extends Actor {// workerId -> WorkerInfoval idToWorker = new mutable.HashMap[String,WorkerInfo]()val workers = new mutable.HashSet[WorkerInfo]()//時間間隔時間,超時檢測的間隔val CHECK_INTERVAL = 15000//用于接收消息override def receive: Receive = {case RegisterWorker(id,memory,cores) => { // println("a client connected") // sender ! "reply" //往發送給他消息的人回復一個消息//判斷一下是不是已經注冊過了if(!(idToWorker.contains(id))){//把Worker的信息封裝以前,保存到內存當中val workerInfo = new WorkerInfo(id,memory,cores)idToWorker(id) = workerInfo //這個應該是scala的特定版本workers += workerInfosender ! RegisteredWorker(s"akka.tcp://MasterSystem@$host:$port/user/Master")}}case Heartbeat(id) =>{if(idToWorker.contains(id)) {val workerInfo = idToWorker(id)//報活//得到系統當前時間val currentTime = System.currentTimeMillis()workerInfo.lastHeartbeatTime = currentTime}}case CheckTimeOutWorker => {val currentTime = System.currentTimeMillis()val toRemove = workers.filter(x => currentTime - x.lastHeartbeatTime > CHECK_INTERVAL)for(w <- toRemove){workers -= widToWorker -= w.id}println(workers.size)}}override def preStart(): Unit = {println("prestart invoked")//導入隱式轉換的功能import context.dispatchercontext.system.scheduler.schedule(0 millis,CHECK_INTERVAL millis,self,CheckTimeOutWorker)} }object Master{def main(args: Array[String]): Unit = {val host = args(0)val port = args(1).toInt// 準備配置val configStr =s"""|akka.actor.provider = "akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname = "$host"|akka.remote.netty.tcp.port = "$port"""".stripMarginval config = ConfigFactory.parseString(configStr)//ActorSystem老大,輔助創建和監控下面的Actor,他是單例的val actorSystem = ActorSystem("MasterSystem",config )//創建Actorval master = actorSystem.actorOf(Props(new Master(host,port)),"Master")actorSystem.awaitTermination()} }Worker.scala
package cn.wj.rpcimport java.util.UUIDimport akka.actor.{Actor, ActorSelection, ActorSystem, Props} import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ /*** Created by WJ on 2016/12/23.*/ class Worker(val masterHost:String,val masterPort:Int,val memory:Int,val cores:Int) extends Actor {var master : ActorSelection = _val workerId = UUID.randomUUID().toStringval HEART_INTERVAL = 10000//preStart執行方法的時機:構造器之后,receive之前//與Master(Actor)建立連接override def preStart(): Unit = {//master已經是別的Master的引用了 ,這是跟master建立連接master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master")//向Master發送注冊消息master ! RegisterWorker(workerId,memory,cores)}override def receive: Receive = {case RegisteredWorker(masterUrl) => {println(masterUrl)//啟動定時器發送心跳import context.dispatchercontext.system.scheduler.schedule(0 millis,HEART_INTERVAL millis,self,SendHeartbeat)}case SendHeartbeat =>{println("send heartbeat to master")master ! Heartbeat(workerId)}} }object Worker{def main(args: Array[String]): Unit = {val host = args(0)val port = args(1).toIntval masterHost = args(2)val masterPort = args(3).toIntval memory = args(4).toIntval cores = args(5).toInt// 準備配置val configStr =s"""|akka.actor.provider = "akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname = "$host"|akka.remote.netty.tcp.port = "$port"""".stripMarginval config = ConfigFactory.parseString(configStr)//ActorSystem老大,輔助創建和監控下面的Actor,他是單例的val actorSystem = ActorSystem("WorkerSystem",config )//創建Actor,此時調用該(Actor)的prestart以及receive方法actorSystem.actorOf(Props(new Worker(masterHost,masterPort,memory,cores)),"Worker")actorSystem.awaitTermination()} }RemoteMessage.scala
package cn.wj.rpc/*** Created by WJ on 2016/12/25.*/ trait RemoteMessage extends Serializable//Worker->Master(這個表明當master接受這個worker時的信息,是receive)case class RegisterWorker (id:String, memory:Int, cores:Int) extends RemoteMessage//Master -> Worker(這個是master收到workerd的注冊信息,表明已經注冊過這條信息,是sender ! xxx時候出現的) case class RegisteredWorker(masterUrl:String) extends RemoteMessage//這是進程之間自己給自己發送消息,所以采用case object,并且不需要實現Serializable //Worker -> Worker(self) case object SendHeartbeat//這個是work向master發送定時器,其中的id是work的id,因為要向master說明,是哪一個work給他發送的心跳 //Worker -> Master case class Heartbeat(id:String) extends RemoteMessage//Master -> self case object CheckTimeOutWorkerWorkerInfo.scala
package cn.wj.rpc/*** Created by WJ on 2016/12/25.*/ class WorkerInfo(val id:String ,val memory :Int,val cores:Int) {//TODO 上一次心跳var lastHeartbeatTime:Long = _ }這個上面的四個就是簡單的實現了RPC框架,其實就是一個Master監控多個Worker,
當一個Worker創建了,他就是需要在Master注冊信息,其實這個Master個人感覺就像
是個Zookeeper,掌管Worker的信息,為其Worker分配一些資源,當Master接到Worker
的注冊信息的時候,他就在自己的注冊表添加上這個Worker,然后向Worker發送一個注冊
成功的信息,此時這個Worker的收到這個注冊信息,然后他就給Master發送心跳,這個的
作用是在告訴Master,我這個Worker是存活的(報活),當一個Worke發送心跳的時間間隔
過長,長過我們規定的時間,那么此時我們就需要主動殺死這個Worker,感覺hadoop的一些
分布式和這個原理差不多。
下面奉上原理圖一張:
其中的receive是用于接受信息,因為繼承Actor,
prestart這個方法是執行實在類實例之后,receive的方法之后
2.RPC的大概流程
?
首先定義了一個worker,一個master,master首先啟動了,
然后它在prestart()的方法里面
檢測超時的worker,那么在這個里面啟動了一個定時器,
那么我們自己是不是自己可以手寫一個定時器,
比如我們可以用線程來搞定時器,但是我們的akka
里面提供了一個超級簡單的定時器,
context.system.schedular.schedule
(0 millis,CHECK_INTERVAL millis,self,CheckTimeOutWorker)
其中第一個參數:延遲多少秒
第二個參數:時間間隔
第三個參數:把這個消息發給誰
第四個參數:發送什么消息
雖然它起了消息,但是他不能一下子就把消息發送出去
,它只能把消息先發送給自己的receive接收到這個消息,
然后在發送給我們master,這個里面有一個檢測,
檢測worker有多長時間沒有向我發送心跳了,
如果這個時間大過了我規定的范圍,
這樣,Master啟動完成檢測心跳,worker啟動完成后
,首先向master建立連接,然后發送注冊消息
,master接受到這個注冊消息,
把worker的信息保存到內存當中,然后向worker反饋一個消息,
說你注冊成功了,然后worker啟動一個定時器,
定時的向master發送心跳,就是這樣的流程
?
轉載于:https://www.cnblogs.com/wnbahmbb/p/6220528.html
總結
以上是生活随笔為你收集整理的初见akka-02:rpc框架的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: nginx subrequest演示示例
- 下一篇: jquery中的ajax方法参数——$.