Scala Actor,receive不断接收消息,react复用线程,结合case class的actor,Future使用,使用Actor进行wordCount
Scala Actor
二、 什么是Scala Actor
1. 概念
Scala中的Actor能夠?qū)崿F(xiàn)并行編程的強大功能,它是基于事件模型的并發(fā)機制,Scala是運用消息(message)的發(fā)送、接收來實現(xiàn)多線程的。使用Scala能夠更容易地實現(xiàn)多線程應用的開發(fā)。
2. 傳統(tǒng)java并發(fā)編程與Scala Actor編程的區(qū)別
對于Java,我們都知道它的多線程實現(xiàn)需要對共享資源(變量、對象等)使用synchronized 關鍵字進行代碼塊同步、對象鎖互斥等等。而且,常常一大塊的try…catch語句塊中加上wait方法、notify方法、notifyAll方法是讓人很頭疼的。原因就在于Java中多數(shù)使用的是可變狀態(tài)的對象資源,對這些資源進行共享來實現(xiàn)多線程編程的話,控制好資源競爭與防止對象狀態(tài)被意外修改是非常重要的,而對象狀態(tài)的不變性也是較難以保證的。 而在Scala中,我們可以通過復制不可變狀態(tài)的資源(即對象,Scala中一切都是對象,連函數(shù)、方法也是)的一個副本,再基于Actor的消息發(fā)送、接收機制進行并行編程
3. Actor方法執(zhí)行順序
1.首先調(diào)用start()方法啟動Actor
2.調(diào)用start()方法后其act()方法會被執(zhí)行
Scala Actor向Actor發(fā)送消息
4. 發(fā)送消息的方式
! 發(fā)送異步消息,沒有返回值。
!? 發(fā)送同步消息,等待返回值。
!! 發(fā)送異步消息,返回值是 Future[Any]。
三、 Actor實戰(zhàn)
如果發(fā)現(xiàn)Actor的包引入不了,解決辦法是:
選中項目右鍵—>Open Module Settings—>Library–>添加jar(scala-2.10.6.zip中的\lib*.jar)
上面的右側(cè)就是我們引入的jar包
1. 第一個例子
說明:上面分別調(diào)用了兩個單例對象的start()方法,他們的act()方法會被執(zhí)行,相同與在java中開啟了兩個線程,線程的run()方法會被執(zhí)行
注意:這兩個Actor是并行執(zhí)行的,act()方法中的for循環(huán)執(zhí)行完成后actor程序就退出了
2. 第二個例子(可以不斷地接收消息)
receive相當于是創(chuàng)建線程和銷毀線程的過程。
說明:在act()方法中加入了while (true) 循環(huán),就可以不停的接收消息
注意:發(fā)送start消息和stop的消息是異步的,但是Actor接收到消息執(zhí)行的過程是同步的按順序執(zhí)行
3. 第三個例子(react方式會復用線程,比receive更高效)
react類似線程池機制,可以復用線程。
說明: react 如果要反復執(zhí)行消息處理,react外層要用loop,不能用while
4. 第四個例子(結(jié)合case class發(fā)送消息)
package cn.toto.scala.day2import scala.actors.Actorcase class SyncMsg(id:Int,msg:String) case class AsyncMsg(id:Int,msg:String) case class ReplyMsg(id:Int,msg:String)/*** Created by toto on 2017/7/1.*/ class AppleActor extends Actor {def act(): Unit = {while (true) {receive {case "start" => println("starting ...")case SyncMsg(id, msg) => {println(id + ",sync " + msg)Thread.sleep(5000)//這里又開了一個子線程,這個在下面的方法中執(zhí)行//sender也是一個方法。看可以通過消息的發(fā)送者發(fā)送sender ! ReplyMsg(3,"finished")}case AsyncMsg(id, msg) => {println(id + ",async " + msg)Thread.sleep(5000)}}}} }object AppleActor {/*** 總體的運行結(jié)果是:* 異步消息發(fā)送完成* 1,async hello actor* None* false* 123* 2,sync hello actor* 2,sync hello actor* true* ReplyMsg(3,finished)* @param args*/def main(args: Array[String]): Unit = {val a = new AppleActora.start()//異步消息a ! AsyncMsg(1,"hello actor")println("異步消息發(fā)送完成")//同步消息(因為這里的最長時間是1秒,但是SyncMsg中休眠的時間更長,為5秒,所以這里是None)//同步消息,表示需要等待val content = a.!? (1000,SyncMsg(2,"hello actor"))println(content)//異步但是返回結(jié)果,這個結(jié)果會被放到future中。然后進行返回。val reply = a !! SyncMsg(2,"hello actor")//replay是否已經(jīng)完成通過isSet類判斷println(reply.isSet);println("123")//通過apply方法拿到里面的值,這個apply是feture中的方法。val c = reply.apply()//拿到之后,reply中的值相當于被設置了。所以c為trueprintln(reply.isSet)println(c)} }Future的使用
package cn.toto.scala.day2;import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future;import java.util.concurrent.*;/*** Created by ZhaoXing on 2016/6/27.* Feture相當于給出了一種期望,就是說可能立即不會有結(jié)果,但是執(zhí)行完畢或過一段時間后會有結(jié)果*/ public class FutureDemo {/*** 運行結(jié)果:* main false* 正在查找數(shù)據(jù)* pool-1-thread-1 數(shù)據(jù)查找完畢* true* 9527* @param args* @throws Exception*/public static void main(String[] args) throws Exception{ExecutorService threadPool = Executors.newCachedThreadPool();Future<Integer> future = threadPool.submit(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {System.out.println("正在查找數(shù)據(jù)");Thread.sleep(4000);System.out.println(Thread.currentThread().getName() + " 數(shù)據(jù)查找完畢");return 9527;}});System.out.println(Thread.currentThread().getName() + " " + future.isDone());//System.out.println(future.get(1000, TimeUnit.MILLISECONDS));Thread.sleep(6000);System.out.println(future.isDone());System.out.println(future.get());} }使用Actor進行wordCount
package cn.toto.scala.day2import scala.actors.{Actor, Future} import scala.collection.mutable.ListBuffer import scala.collection.parallel.Task import scala.io.Source/*** Created by toto on 2017/7/1.** 使用線程的方式計算單詞出現(xiàn)的頻率*/ case class SbTask(fn:String) case object StopTaskclass WordCountByActorTask extends Actor {override def act(): Unit = {while(true) {receive {case SbTask(f) => {//通過Source.fromFile(f)讀取文件,獲取每行并且把它轉(zhuǎn)換成Listval lines : List[String] = Source.fromFile(f).getLines().toList;//將lines list里面的內(nèi)容合并,然后并且將它split.val words : List[String] = lines.flatMap(_.split(" "))//將word這些單詞變成map的并且是元組類型的,每個都是1,----,記著對他們進行過分組,接著就是計算單詞val result: Map[String, Int]= words.map((_, 1)).groupBy(_._1).mapValues(_.size)sender ! result}case StopTask => {exit()}}}} }object WordCountByActor {def main(args: Array[String]): Unit = {val replys = new ListBuffer[Future[Any]]val results = new ListBuffer[Map[String,Int]]val files = Array("E://wordcount//input//a.txt","E://wordcount//input//b.txt","E://wordcount//input//c.txt")//每個文件就啟動多少個actor,并將每個actor處理后得到的結(jié)果存儲到List中for(f <- files) {val t = new WordCountByActorTaskt.start()//通過SbTask的方式發(fā)送文件名val reply:Future[Any] = t !! SbTask(f)//將處理到的結(jié)果放到ListBuffer中。replys += reply}//對獲得的replay的值進行計算。while (replys.size > 0) {val dones: ListBuffer[Future[Any]] = replys.filter(_.isSet)for(f <- dones) {//從relay中拿值,并將它轉(zhuǎn)成mapresults += f.apply().asInstanceOf[Map[String, Int]]//計算完成之后,將這個replay移除掉。replys -= f}Thread.sleep(500)}println(results.flatten.groupBy(_._1).mapValues(_.foldLeft(0)(_+_._2)))} }總結(jié)
以上是生活随笔為你收集整理的Scala Actor,receive不断接收消息,react复用线程,结合case class的actor,Future使用,使用Actor进行wordCount的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 发动机万有特性标定属于什么标定?
- 下一篇: Scala柯里化