react 消息队列_具有AkkaReact流的React队列
react 消息隊列
React性流是最近宣布的一項計劃,旨在在JVM上為具有內置背壓的異步流處理創建標準。 該工作組由Typesafe,Red Hat,Oracle,Netflix等公司組成。
早期的實驗性實現之一是基于Akka的 。 預覽版0.3包括演員生產者和消費者,這為集成提供了新的可能性。
為了測試新技術,我實現了一個非常簡單的Reactive Message Queue 。 該代碼處于PoC階段,缺乏錯誤處理等功能,但如果使用正確,則可以正常工作!
隊列是響應式的,這意味著消息將在需要時傳遞給感興趣的各方,而無需輪詢。 在發送消息時(以便發送者不會使代理不堪重負)和在接收消息時(以便代理僅發送與接收者可以使用的消息一樣多的消息)都會施加反壓。
讓我們看看它是如何工作的!
隊列
首先,隊列本身是一個參與者,對(React式)流一無所知。 該代碼位于com.reactmq.queue包中。 actor接受以下actor消息(“ message”一詞在此處已重載,因此我將使用普通的“ message”來表示我們發送到隊列和從隊列中接收的消息,而“ actor-messages”則為Scala)。類實例發送給演員):
- SendMessage(content) –發送具有指定String內容的消息。 回復( SentMessage(id) )被發送回帶有消息ID的發件人
- ReceiveMessages(count) –表示發件人(演員)想接收最多郵件count信號。 該計數與先前發出信號的需求累加。
- DeleteMessage(id) –毫不奇怪,刪除一條消息
隊列實現是ElasticMQ的簡化版本。 收到消息后,如果在10秒鐘內未將其刪除(確認),則可以再次接收。
當一個actor發出對消息的需求信號時(通過將ReceiveMessages發送到隊列actor),它應該期望有任意數量的ReceivedMessages(msgs) actor-message答復,其中包含接收到的數據。
變得被動
要創建和測試我們的React式隊列,我們??需要三個應用程序:
- Sender
- 中央Broker
- Receiver
我們可以運行任何數量的Senders和Receivers ,但是當然我們應該只運行一個Broker 。
我們需要做的第一件事是通過網絡將Sender與Broker連接,將Receiver與Broker連接。 我們可以使用Akka IO擴展和React式TCP擴展來做到這一點。 使用connect和bind對,我們在綁定端獲得了一個連接流:
// sender: val connectFuture = IO(StreamTcp) ? StreamTcp.Connect(settings, sendServerAddress)connectFuture.onSuccess {case binding: StreamTcp.OutgoingTcpConnection =>logger.info("Sender: connected to broker")// per-connection logic }// broker: val bindSendFuture = IO(StreamTcp) ? StreamTcp.Bind(settings, sendServerAddress)bindSendFuture.onSuccess {case serverBinding: StreamTcp.TcpServerBinding =>logger.info("Broker: send bound")Flow(serverBinding.connectionStream).foreach { conn =>// per-connection logic}.consume(materializer) }有一個用于發送和接收消息的地址。
寄件人
首先讓我們看一下Sender的每個連接邏輯。
Flow(1.second, () => { idx += 1; s"Message $idx from $senderName" }).map { msg =>logger.debug(s"Sender: sending $msg")createFrame(msg)}.toProducer(materializer).produceTo(binding.outputStream)我們正在創建一個滴答流,它每秒產生一個新消息(非常方便測試)。 使用map流轉換器,我們用消息創建了一個字節幀(稍后會詳細介紹)。 但這僅是我們(非常簡單)流的外觀的描述; 它需要使用物化 toProducer方法,該方法將提供流變換節點的具體實現。 當前只有一個FlowMaterializer ,這同樣令人驚訝地使用引擎蓋下的Akka actor來實際創建流和流。
最后,我們將剛剛創建的生產者連接到TCP綁定的outputStream ,而恰好是消費者。 現在,我們有了一個React性的網絡上的消息流,這意味著僅當Broker可以接受消息時才發送消息。 否則,反壓將一直施加到滴答聲產生器。
代理:發送消息
在網絡的另一端是Broker 。 讓我們看看消息到達時會發生什么。
Flow(serverBinding.connectionStream).foreach { conn =>logger.info(s"Broker: send client connected (${conn.remoteAddress})")val sendToQueueConsumer = ActorConsumer[String](system.actorOf(Props(new SendToQueueConsumer(queueActor))))// sending messages to the queue, receiving from the clientval reconcileFrames = new ReconcileFrames()Flow(conn.inputStream).mapConcat(reconcileFrames.apply).produceTo(materializer, sendToQueueConsumer) }.consume(materializer)首先,我們創建了一個Flow ,那將是字節輸入流-從連接的輸入流。 接下來,我們重新構造使用框架發送的String實例,最后將流定向到發送到隊列的使用者。
SendToQueueConsumer是到主隊列SendToQueueConsumer的每個連接的橋。 它使用Akka的Reactive Streams實施中的ActorConsumer特性來自動管理應該在上游發出信號的需求。 利用該特征,我們可以創建一個由演員支持的React流Consumer[_] ,從而實現完全可定制的接收器。
class SendToQueueConsumer(queueActor: ActorRef) extends ActorConsumer {private var inFlight = 0override protected def requestStrategy = new MaxInFlightRequestStrategy(10) {override def inFlightInternally = inFlight}override def receive = {case OnNext(msg: String) =>queueActor ! SendMessage(msg)inFlight += 1case SentMessage(_) => inFlight -= 1} }需要提供給ActorConsumer是一種測量當前正在處理的流項目的方法。 在這里,我們正在計算已發送到隊列但尚未收到ID的消息數(因此,隊列正在處理它們)。
消費者收到包裝在OnNext actor消息中的新消息; 因此, OnNext由流發送給SentMessage ,而SentMessage被隊列SentMessage發送以回復SendMessage 。
接收
接收部分以類似的方式完成,盡管它需要一些額外的步驟。 首先,如果您查看Receiver ,您將看到我們正在從輸入流中讀取字節,從幀中重構消息,并發回ID,從而確認消息。 實際上,我們將在接收消息和發送回ID之間運行一些消息處理邏輯。
在Broker方,我們為每個連接創建兩個流。
一個是發送給接收者的消息流,另一個是來自接收者的已確認消息ID的流,這些流被簡單地轉換為將DeleteMessage消息發送給隊列actor。
與使用者類似,我們需要從隊列參與者到流的每個連接的接收橋。 這是在ReceiveFromQueueProducer實現的。 在這里,我們擴展了ActorProducer特性,它使您可以完全控制實際創建流中消息的過程。
在此參與者中,流正在發送Request參與者消息,以發出需求信號。 有需求時,我們從隊列中請求消息。 隊列最終將以一個或多個ReceivedMessages actor消息進行響應(當隊列中有任何消息時); 由于消息的數量永遠不會超出信號需求,因此我們可以安全地調用ActorProducer.onNext方法,該方法將給定的項目發送到下游。
構圖
一個小細節是我們需要一個自定義的框架協議(感謝Roland Kuhn的澄清 ),因為TCP流只是一個字節流,因此我們可以獲得數據的任意片段,以后需要重新組合。 幸運的是,實現這樣的框架非常簡單–請參閱Framing類。 每個幀都由消息的大小和消息本身組成。
加起來
使用React式流和Akka實施,可以輕松創建具有端到端背壓的React式應用程序。 上面的隊列雖然缺少很多功能和證明,但不允許Senders使Broker過載,而另一方面, Broker會使Receivers過載。 所有這些,而無需實際編寫任何背壓處理代碼!
翻譯自: https://www.javacodegeeks.com/2014/06/reactive-queue-with-akka-reactive-streams.html
react 消息隊列
總結
以上是生活随笔為你收集整理的react 消息队列_具有AkkaReact流的React队列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 构建Spring微服务并对其进行Dock
- 下一篇: 机械能如何转化为内能 机械能转化为内能的