Scala-Actor并行wordcount
生活随笔
收集整理的這篇文章主要介紹了
Scala-Actor并行wordcount
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
scala-2.10.6
文件:
"d://word.txt", "d://word.log"文件內(nèi)容:
代碼:
package cn.zengmg.day26.actorimport scala.actors.{Actor, Future} import scala.collection.mutable.ListBuffer import scala.io.Sourcecase class SubmitFile(filePath: String)class ActorWordCount extends Actor {override def act(): Unit = {loop {react {case SubmitFile(filePath) => {val words = Source.fromFile(filePath).getLines()val result = words.toList.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2))sender ! result}}}} }object ActorWordCount {def main(args: Array[String]): Unit = {//發(fā)送異步消息,返回值是 Future[Any]。val futures = new ListBuffer[Future[Any]]//存放每個文件統(tǒng)計得結(jié)果val results = new ListBuffer[Map[String, Int]]val files = Array("d://word.txt", "d://word.log")for (f <- files) {val t = new ActorWordCountt.start()//調(diào)用就會返回值,但該值不是真正的計算結(jié)果,可以理解為放結(jié)果的一個容器,isSet的值為false。val future=t !! SubmitFile(f)futures+=future}while(futures.size>0){val validResult=futures.filter(_.isSet)if(validResult.size>0){//result 類型 Future[Any]。for(result<-validResult){results+=result.apply().asInstanceOf[Map[String, Int]]futures-=result}}}val finalRsult = results.flatten.groupBy(_._1).mapValues(_.foldLeft(0)(_+_._2))println("未排序:"+finalRsult)val sortFinalRsult=finalRsult.toList.sortBy(_._2).reverseprintln("排序:"+sortFinalRsult)}}運行結(jié)果:
未排序:Map(wecki -> 7, ?-> 1, rerex -> 7, kljk -> 7, oppyed -> 7, weopc -> 7, tom -> 8, aadkj; -> 7, sdlkjfsfsad -> 7, xe;q -> 7, kl;xjljks -> 7, djkdsfsadf -> 7, hello -> 16, jerry -> 8, weweioe -> 7, wiot -> 7)
排序:List((hello,16), (jerry,8), (tom,8), (wiot,7), (weweioe,7), (djkdsfsadf,7), (kl;xjljks,7), (xe;q,7), (sdlkjfsfsad,7), (aadkj;,7), (weopc,7), (oppyed,7), (kljk,7), (rerex,7), (wecki,7), (,1))
總結(jié)
以上是生活随笔為你收集整理的Scala-Actor并行wordcount的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 第一个Spark实例:求PI值
- 下一篇: ManicTime软件破解