akka 消息发送接收_Akka型演员:探索接收器模式
akka 消息發(fā)送接收
在上一篇文章中,我們研究了Akka Typed提供的一些基本功能。 在本文和下一篇文章中,我們將更進(jìn)一步地了解一些其他功能,并通過查看Akka Typed提供的兩種不同模式來做到這一點(diǎn):Receiver和Receptionist模式。 如果您是Akka Typed的新手,那么最好先閱讀上一篇文章,因?yàn)檫@將使您對Akka Typed有所了解。 因此,對于本系列中的Akka型文章,我們將研究Receiver模式。
- 與往常一樣,您可以在Github Gist中找到此示例的代碼: https : //gist.github.com/josdirksen/77e59d236c637d46ab32
 
接收方模式
在Akka Typed發(fā)行版中,有一個(gè)名為akka.typed.patterns的包。 在此程序包中,有兩種不同的模式,即接收方模式和接收方模式。 坦白說,為什么這兩種模式足夠重要以增加發(fā)行版,但我確實(shí)不知道,但是它們確實(shí)為在Akka Typed之后引入更多概念和想法提供了一個(gè)很好的方法。
因此,讓我們看一下Receiver模式,在下一篇文章中我們將做Receptionist模式。 要了解Receiver模式的功能,只需看一下我們可以發(fā)送給它的消息:
/*** Retrieve one message from the Receiver, waiting at most for the given duration.*/final case class GetOne[T](timeout: FiniteDuration)(val replyTo: ActorRef[GetOneResult[T]]) extends Command[T]/*** Retrieve all messages from the Receiver that it has queued after the given* duration has elapsed.*/final case class GetAll[T](timeout: FiniteDuration)(val replyTo: ActorRef[GetAllResult[T]]) extends Command[T]/*** Retrieve the external address of this Receiver (i.e. the side at which it* takes in the messages of type T.*/final case class ExternalAddress[T](replyTo: ActorRef[ActorRef[T]]) extends Command[T]從這些消息中可以看到,Receiver的工作是將T類型的消息排隊(duì),并提供其他命令以在等待特定時(shí)間的同時(shí)獲取這些消息中的一個(gè)或多個(gè)。 要使用接收器,我們需要獲取ExternalAddress,以便我們可以向其發(fā)送類型為T的消息。 并且可以從其他參與者發(fā)送get GetOne和GetAll消息,以查看接收器中是否有任何消息在等待。
對于我們的示例,我們將創(chuàng)建以下參與者:
- 生產(chǎn)者,它向接收者發(fā)送類型為T的消息。
 - 可以從此接收器檢索類型T消息的使用者。
 - 根角色,運(yùn)行此方案。
 
我們將從生產(chǎn)者開始,如下所示:
/*** Producer object containing the protocol and the behavior. This is a very simple* actor that produces messages using a schedule. To start producing messages* we need to send an initial message*/object Producer {// a simple protocol defining the messages that can be sentsealed trait ProducerMsgfinal case class registerReceiverMsgIn(msgIn: ActorRef[HelloMsg]) extends ProducerMsgfinal case class addHelloWorldMsg(msg: HelloMsg) extends ProducerMsg// the producer, which first waits for a registerReceiver message, after which// it changes behavior, to send messages.val producer = Full[ProducerMsg] {// if we receive a register message, we know where to send messages tocase Msg(ctx, registerReceiverMsgIn(msgConsumer)) =>println("Producer: Switching behavior")// simple helper function which sends a message to self.def scheduleMessage() = ctx.schedule(500 millisecond, ctx.self, addHelloWorldMsg(Hello(s"hello @ ${System.currentTimeMillis()}")))// schedule the first one, the rest will be triggered through the behavior.scheduleMessage()Static {// add a message to the receiver and schedule a new onecase addHelloWorldMsg(msg) => {println(s"Producer: Adding new '$msg' to receiver: $msgConsumer") ;msgConsumer ! msg; scheduleMessage()}}// don't switch behavior on any of the other messagescase _ => Same}}在此對象中,我們定義了可以發(fā)送給角色的消息以及行為。 registerReceiverMsgIn消息為actor提供了應(yīng)該向其發(fā)送消息的目的地(稍后會(huì)對此進(jìn)行詳細(xì)介紹),并且addHelloWorldMsg告訴行為將什么消息發(fā)送到registerReceiverMsgIn消息提供的地址。 如果您查看此行為,則可以看到我們使用Full [T]行為。 對于這種行為,我們必須為所有消息和信號提供匹配器,此外,我們還可以訪問actor ctx。 在其初始狀態(tài)下,此行為僅響應(yīng)registerReceiverMsgIn消息。 當(dāng)它收到這樣的消息時(shí),它會(huì)做兩件事:
因此,當(dāng)我們發(fā)送初??始的registerReceiverMessage時(shí),它將導(dǎo)致actor每500 ms向接收者發(fā)送一條新消息。 現(xiàn)在讓我們看看另一面:消費(fèi)者。
對于消費(fèi)者,我們還將所有內(nèi)容包裝在一個(gè)對象中,如下所示:
object Consumer {val consumer = Total[HelloMsg] {// in the case of a registerReceiver message, we change the implementation// since we're ready to receive other message.case registerReceiverCmdIn(commandAddress) => {println("Consumer: Switching behavior")// return a static implementation which closes over actorRefs// all messages we receive we pass to the receiver, which will queue// them. We have a specific message that prints out the received messagesContextAware { ctx =>Static[HelloMsg] {// printmessages just prints out the list of messages we've receivedcase PrintMessages(msgs) => println(s"Consumer: Printing messages: $msgs") ;msgs.foreach { hw => println(s" $hw")}// if we get the getAllMessages request, we get all the messages from// the receiver.case GetAllMessages() => {println("Consumer: requesting all messages")val wrap = ctx.spawnAdapter[GetAllResult[HelloMsg]] {case msgs:GetAllResult[HelloMsg] => println(s"Consumer: Received ${msgs.msgs.length} messages"); PrintMessages(msgs.msgs)}commandAddress ! GetAll(2 seconds)(wrap)}}}}// for all the other cases return the existing implementation, in essence// we're just ignoring other messages till we change statecase _ => Same} }在此對象中,我們定義了一個(gè)行為,該行為在接收到第一條消息后也會(huì)切換其實(shí)現(xiàn)。 在這種情況下,第一個(gè)消息稱為registerReceiverCmdIn。 通過此消息,我們可以訪問(接收方的)actorRef,將GetAll和getOne消息發(fā)送至該消息。 切換行為后,我們將處理自己的自定義GetAllMessages消息,這將觸發(fā)將GetAll消息發(fā)送到接收器。 由于未針對從Receiver收到的響應(yīng)類型鍵入我們自己的行為,因此我們使用適配器(ctx.spawnAdapter)。 該適配器將接收來自接收器的響應(yīng)并打印出消息。
最后一個(gè)消息部分是一個(gè)啟動(dòng)此行為的參與者:
// Simple root actor, which we'll use to start the other actorsval scenario1 = {Full[Unit] {case Sig(ctx, PreStart) => {import Producer._import Consumer._println("Scenario1: Started, now lets start up a number of child actors to do our stuff")// first start the two actors, one implements the receiver pattern, and// the other is the one we control directly.val receiverActor = ctx.spawn(Props(Receiver.behavior[HelloMsg]), "receiver")val consumerActor = ctx.spawn(Props(consumer), "adder")val producerActor = ctx.spawn(Props(producer), "producer")// our producerActor first needs the actorRef it can use to add messages to the receiver// for this we use a wrapper, this wrapper creates a child, which we use to get the// address, to which we can send messages.val wrapper = ctx.spawnAdapter[ActorRef[HelloMsg]] {case p: ActorRef[HelloMsg] => producerActor ! registerReceiverMsgIn(p)}// now send the message to get the external address, the response will be sent// to our own actor as a registerReceiver message, through the adapterreceiverActor ! ExternalAddress(wrapper)// our printing actor needs to now the address of the receiver so send it to himconsumerActor ! registerReceiverCmdIn(receiverActor)// by calling getAllMessages we get the messages within a time period.println("Scenario1: Get all the messages")consumerActor ! GetAllMessages()Thread.sleep(3000)consumerActor ! GetAllMessages()Thread.sleep(5000)consumerActor ! GetAllMessages()Same}}}這里沒什么特別的。 在這種情況下,我們將創(chuàng)建各種角色,并使用ctx.spawnAdapter來獲取接收者的外部地址,并將其傳遞給producerActor。 接下來,我們將接收者參與者的地址傳遞給消費(fèi)者。 現(xiàn)在,我們在使用者地址上調(diào)用GetAllMessages,該地址將從接收方獲取消息并打印出來。
因此,總結(jié)一下將在此示例中執(zhí)行的步驟:
這種情況的結(jié)果如下所示:
Scenario1: Started, now lets start up a number of child actors to do our stuff Scenario1: Get all the messages Consumer: Switching behavior Consumer: requesting all messages Producer: Switching behavior Producer: Adding new 'Hello(hello @ 1446277162929)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277163454)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277163969)' to receiver: Actor[akka://Root/user/receiver#1097367365] Consumer: Received 3 messages Consumer: Printing messages: Vector(Hello(hello @ 1446277162929), Hello(hello @ 1446277163454), Hello(hello @ 1446277163969))Hello(hello @ 1446277162929)Hello(hello @ 1446277163454)Hello(hello @ 1446277163969) Producer: Adding new 'Hello(hello @ 1446277164488)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277165008)' to receiver: Actor[akka://Root/user/receiver#1097367365] Consumer: requesting all messages Producer: Adding new 'Hello(hello @ 1446277165529)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277166049)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277166569)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277167089)' to receiver: Actor[akka://Root/user/receiver#1097367365] Consumer: Received 6 messages Consumer: Printing messages: Vector(Hello(hello @ 1446277164488), Hello(hello @ 1446277165008), Hello(hello @ 1446277165529), Hello(hello @ 1446277166049), Hello(hello @ 1446277166569), Hello(hello @ 1446277167089))Hello(hello @ 1446277164488)Hello(hello @ 1446277165008)Hello(hello @ 1446277165529)Hello(hello @ 1446277166049)Hello(hello @ 1446277166569)Hello(hello @ 1446277167089) Producer: Adding new 'Hello(hello @ 1446277167607)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277168129)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277168650)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277169169)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277169690)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277170210)' to receiver: Actor[akka://Root/user/receiver#1097367365] Consumer: requesting all messages Producer: Adding new 'Hello(hello @ 1446277170729)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277171249)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277171769)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277172289)' to receiver: Actor[akka://Root/user/receiver#1097367365] Consumer: Received 10 messages Consumer: Printing messages: Vector(Hello(hello @ 1446277167607), Hello(hello @ 1446277168129), Hello(hello @ 1446277168650), Hello(hello @ 1446277169169), Hello(hello @ 1446277169690), Hello(hello @ 1446277170210), Hello(hello @ 1446277170729), Hello(hello @ 1446277171249), Hello(hello @ 1446277171769), Hello(hello @ 1446277172289))Hello(hello @ 1446277167607)Hello(hello @ 1446277168129)Hello(hello @ 1446277168650)Hello(hello @ 1446277169169)Hello(hello @ 1446277169690)Hello(hello @ 1446277170210)Hello(hello @ 1446277170729)Hello(hello @ 1446277171249)Hello(hello @ 1446277171769)Hello(hello @ 1446277172289) Producer: Adding new 'Hello(hello @ 1446277172808)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277173328)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277173849)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277174369)' to receiver: Actor[akka://Root/user/receiver#1097367365]酷吧! 從消息序列中可以看到,我們的生產(chǎn)者將消息發(fā)送到接收者,接收者將它們排隊(duì)。 接下來,我們有一個(gè)使用者,它請求到目前為止已收到的所有消息并打印出來。
這是關(guān)于Akka-Typed的文章的內(nèi)容,在下一篇文章中,我們將介紹同樣在Akka-Typed中呈現(xiàn)的接待員模式。
翻譯自: https://www.javacodegeeks.com/2015/11/akka-typed-actors-exploring-the-receiver-pattern.html
akka 消息發(fā)送接收
總結(jié)
以上是生活随笔為你收集整理的akka 消息发送接收_Akka型演员:探索接收器模式的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
                            
                        - 上一篇: 电脑开机后得修复网络才能上网是怎么回事(
 - 下一篇: activemq和jms_带有Activ