java akka 实战_Akka实战:分散、聚合模式
分散與聚合:簡(jiǎn)單說(shuō)就是一個(gè)任務(wù)需要拆分成多個(gè)小任務(wù),每個(gè)小任務(wù)執(zhí)行完后再把結(jié)果聚合在一起返回。
實(shí)例背景
本實(shí)例來(lái)自一個(gè)真實(shí)的線上產(chǎn)品,現(xiàn)將其需求簡(jiǎn)化如下:
傳入一個(gè)關(guān)鍵詞:key,根據(jù)key從網(wǎng)上抓取相關(guān)新聞
可選傳入一個(gè)超時(shí)參數(shù):duration,設(shè)置任務(wù)到期時(shí)必須反回?cái)?shù)據(jù)(返回實(shí)際已抓取數(shù)據(jù))
若超時(shí)到返回實(shí)際已爬取數(shù)據(jù),則任務(wù)應(yīng)繼續(xù)運(yùn)行直到所以數(shù)據(jù)抓取完成,并存庫(kù)
設(shè)計(jì)
根據(jù)需求,一個(gè)簡(jiǎn)化的分散、聚合模式可以使用兩個(gè)actor來(lái)實(shí)現(xiàn)。
NewsTask:接收請(qǐng)求,并設(shè)置超時(shí)時(shí)間
SearchPageTask:執(zhí)行實(shí)際的新聞抓取操作(本實(shí)例將使用TimeUnit模擬抓取耗時(shí))
實(shí)現(xiàn)
NewsTask
override def metricPreStart(): Unit = {
context.system.scheduler.scheduleOnce(doneDuration, self, TaskDelay)
}
override def metricReceive: Receive = {
case StartFetchNews =>
_receipt = sender()
(0 until NewsTask.TASK_SIZE).foreach { i =>
context.actorOf(SearchPageTask.props(self), "scatter-" + i) ! SearchPage(key)
}
case GetNewsItem(newsItem) =>
_newses ::= newsItem
if (_newses.size == NewsTask.TASK_SIZE) {
logger.debug(s"分散任務(wù),${NewsTask.TASK_SIZE}個(gè)已全部完成")
if (_receipt != null) {
_receipt ! NewsResult(key, _newses)
_receipt = null
}
self ! PoisonPill
}
case TaskDelay =>
if (_receipt != null) {
_receipt ! NewsResult(key, _newses)
_receipt = null
}
}
metricPreStart方法中設(shè)置定時(shí)方法,調(diào)用時(shí)間為從代碼運(yùn)行開(kāi)始到doneDuration時(shí)間為止。定時(shí)被觸發(fā)時(shí)將向當(dāng)前Actor發(fā)送一個(gè)TaskDelay消息。
在metricReceive方法中,分別對(duì)StartFetchNews、GetNewsItem、TaskDelay三個(gè)消息進(jìn)行操作。
在收到StartFetchNews消息時(shí),actor首先保存發(fā)送者actor的引用(結(jié)果將返回到此actor)。再根據(jù)TASK_SIZE生成相應(yīng)子任務(wù)
GetNewsItem消息的處理中,每收到一個(gè)消息就將其添加到_newses列表中。并判斷當(dāng)_newses個(gè)數(shù)等于TASK_SIZE時(shí)(所有子任務(wù)已完成)將結(jié)果發(fā)送給_receipt。
self ! PoisonPill,這句代碼停止actor自身。它將把“毒藥”發(fā)送到NewsTask Actor的接收郵箱隊(duì)列中。
TaskDelay消息被觸發(fā)時(shí),將直接返回已完成的新聞_newses。返回?cái)?shù)據(jù)后并不終止當(dāng)前還未運(yùn)行完任務(wù)。
SearchPageTask
override def metricReceive: Receive = {
case SearchPage(key) =>
// XXX 模擬抓取新聞時(shí)間
TimeUtils.sleep(Random.nextInt(20).seconds)
val item = NewsItem(
"http://newssite/news/" + self.path.name,
"測(cè)試新聞" + self.path.name,
self.path.name,
TimeUtils.now().toString,
"內(nèi)容簡(jiǎn)介", "新聞?wù)?#34;)
taskRef ! GetNewsItem(item)
context.stop(self)
}
SearchPageTask的代碼邏輯就比較易懂了,這里使用sleep來(lái)模擬實(shí)際抓取新聞時(shí)的耗時(shí)。生成結(jié)果后返回?cái)?shù)據(jù)給`taskRef`,并終止自己。
執(zhí)行測(cè)試
./sbt
akka-action > testOnly me.yangbajing.akkaaction.scattergather.ScatterGatherTest
總結(jié)
這是一個(gè)簡(jiǎn)單的Akka實(shí)例,實(shí)現(xiàn)了任務(wù)分發(fā)與結(jié)果聚合。提供了一種在指定時(shí)間內(nèi)返回部份有效數(shù)據(jù),同時(shí)任務(wù)繼續(xù)執(zhí)行的方式。這種分散、聚合的模式在實(shí)際生產(chǎn)中很常用,比如對(duì)多種數(shù)據(jù)源的整合,或某些需要長(zhǎng)時(shí)間運(yùn)行同時(shí)對(duì)返回?cái)?shù)據(jù)完整性無(wú)強(qiáng)制要求的情況等。
MetricActor演示了怎么自定義Actor,并為其提供一些偵測(cè)點(diǎn)的方式。以后有時(shí)間會(huì)寫(xiě)篇詳文介紹。
總結(jié)
以上是生活随笔為你收集整理的java akka 实战_Akka实战:分散、聚合模式的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 学习笔记 Keras:常见问题
- 下一篇: Python学习:深入Python流程控