Java并发57:Akka Actors并发框架浅谈及入门示例
[超級鏈接:Java并發學習系列-緒論]
本章對Actors并發框架進行初步的介紹和入門示例的演示,關于其更深層次的內容,以后會系統性的進行學習。
1.Actors并發模型簡介
Actors并發模型是計算機科學領域中的一個并行計算模型,它把actors當做通用的并行計算原語。
一個actor對接收到的消息做出響應,進行本地決策,可以創建更多的actor,或者發送更多的消息;同時準備接收下一條消息。
在Actor理論中,一切都被認為是actor,這和面向對象語言里一切都被看成對象很類似。
但包括面向對象語言在內的軟件通常是順序執行的,而Actor模型本質上則是并發的。
Actors模型對并發模型進行了更高的抽象,它是一種異步、非阻塞、高性能的輕量級事件驅動編程模型。所謂輕量級事件指的是這些acotr內存消耗極小,1GB內存可容納百萬級別個Actor。Actors模型適用于可以用于高并發、分布式場景。
JDK本身并沒有提供對Actors并發模型的實現,不過其他的開發庫中已經實現并且被廣泛應用,例如本章的學習對象:Akka。
2.Akka Actors模型簡介
Akka Actors官方文檔地址:https://doc.akka.io/docs/akka/current/index-actors.html
Akka Actors的模型圖如下:
總結:
- Actor與Actor之前只能用消息進行通信。
- 每個Actor都有一個郵箱。
- Actor與Actor之間并不是直接通過消息通信,而是將消息發送至此Actor的郵箱MailBox。
- 消息在郵箱MailBox是有序的。
- 每個Actor在處理單個消息時都是串行的。
- Actor中的消息是不可變的。
- 消息的傳遞不保證絕對可靠。
Akka Actors模型的優勢:
- 輕量級事件比線程的粒度小得多,意味著可以在程序中使用大量的Actor。
- Akka提供了一套容錯機制,允許在出錯時進行一些恢復或者重置操作。
- Akka依靠其透明的遠程Actor定位服務,保證了其分布式并發環境中的可用性。
3.Akka Actors重要概念
下面介紹Akka Actors模型的幾個重要概念:
1.Actor
Actor即角色,前面已經說過,Akka Actors模型將actors當做通用的并行計算原語,所以Actor是必不可少的。
Actor總結:
- Akka Actor的組織結構是一種樹形結。
- 因為Actor是樹形組織,所以Actor的路徑類似于文件的路徑。
- 每個Actor都有父級,有可能有子級當然也可能沒有。
- 父級Actor給其子級Actor分配資源,任務,并管理其的生命狀態(監管和監控)。
- 如果我們知道一個遠程Actor的具體位置,那么我們就可以向他發送消息。
- 一個本地Actor的路徑:akka://search-system/user/master
- 一個遠程Actor的路徑:akka.tcp://search-system@host.example.com:5678/user/master
2.ActorSystem
ActorSystem即角色系統,為了統一的調度和管理系統中的眾多actors,我們需要首先定義一個ActorSystem。
ActorSystem的主要功能有三個:
3.ActorRef
ActorRef即角色引用,每個Actor有唯一的ActorRef,Actor引用可以看成是Actor的代理,與Actor打交道都需要通過Actor引用。
4.Akka Actors入門編程流程
下面簡述一下Akka Actors的入門級的編程流程:
5.編程實例
下面以兩個簡單的例子來展示Actor并發編程的基本流程:
- Hello World
- 最快搜索:通過多個搜索引擎查詢某個條件,返回最快的查詢結果。
5.1.Hello World
場景說明:
- 定義一個消息類,用于存放被歡迎的內容。
- 定義一個角色類,用于歡迎接收到的消息。
實例代碼:
消息類:
/*** <p>Hello的消息類</p>* @author hanchao 2018/4/16 21:34**/ public class HelloMessage{/** 歡迎的對象 */private String name;//getter setter constructor toString }角色類:
/*** <p>Actor框架入門示例-HelloWorld的角色</p>** @author hanchao 2018/4/16 21:07**/ public class HelloActor extends UntypedAbstractActor {//定義日志,很重要LoggingAdapter log = Logging.getLogger(getContext().getSystem(),this);/*** <p>重寫接收方法</p>* @author hanchao 2018/4/16 21:31**/@Overridepublic void onReceive(Object message){log.info("HelloActor receive message : " + message);//如果消息類型是HelloMessage,則進行處理if (message instanceof HelloMessage){log.info("Hello " + ((HelloMessage) message).getName() + "!");}} }測試類:
/*** <p>Actor入門示例</p>* @author hanchao 2018/4/16 21:37**/ public static void main(String[] args) {//創建actor系統ActorSystem system = ActorSystem.create("hello-system");//定義Actor引用ActorRef helloActor = system.actorOf(Props.create(HelloActor.class),"hello-actor");//向HelloActor發送消息helloActor.tell(new HelloMessage("World"),null);helloActor.tell(new HelloMessage("Akka Actor"),null);//終止Actor系統system.terminate(); }運行結果:
[INFO] [04/18/2018 22:37:55.744] [hello-system-akka.actor.default-dispatcher-2] [akka://hello-system/user/hello-actor] HelloActor receive message : HelloMessage{name='World'} [INFO] [04/18/2018 22:37:55.744] [hello-system-akka.actor.default-dispatcher-2] [akka://hello-system/user/hello-actor] Hello World! [INFO] [04/18/2018 22:37:55.744] [hello-system-akka.actor.default-dispatcher-2] [akka://hello-system/user/hello-actor] HelloActor receive message : HelloMessage{name='Akka Actor'} [INFO] [04/18/2018 22:37:55.744] [hello-system-akka.actor.default-dispatcher-2] [akka://hello-system/user/hello-actor] Hello Akka Actor!5.2.最快搜索
場景說明:
通過多個搜索引擎查詢某個條件,返回最快的查詢結果。
- 定義一個消息類,用于存放搜索條件。
- 定義一個消息類,用于存放搜索結果。
- 定義一個角色類,用于接收搜索條件、搜索和返回搜索結果。
- 定義一個角色類,用于派發搜索任務和接收搜索結果。
實例代碼:
搜索引擎工具類EngineUtils :模擬搜索引擎列表和搜索過程。
/*** 搜索引擎工具類* Created by 韓超 on 2018/3/6.*/ public class EngineUtils {private final static Logger LOGGER = Logger.getLogger(EngineUtils.class);//搜索引擎列表private static List<String> engineList;static {engineList = new ArrayList<>();engineList.add("百度");engineList.add("Google");engineList.add("必應");engineList.add("搜狗");engineList.add("Redis");engineList.add("Solr");}/*** <p>Title: 模擬一個搜索引擎進行一次問題查詢</p>* @author 韓超 2018/3/6 11:20*/public static String searchByEngine(String question,String engine) throws InterruptedException {//獲取隨機的時間間隔int interval = RandomUtils.nextInt(1,5000); // LOGGER.info("搜索引擎[" + engine + "]正在查詢,預計用時" + interval + "毫秒...");//當前線程休眠指定時間,模擬搜索引擎用時Thread.sleep(interval);return "通過搜索引擎[" + engine + "],首先查到關于(" + question + ")問題的結果,用時 = " + interval + "毫秒!";}public static List<String> getEngineList() {return engineList;}public static void setEngineList(List<String> engineList) {EngineUtils.engineList = engineList;} }搜索條件消息類QueryTerms ,用于存儲搜索條件:
/** * <p>Title: 定義查詢條件類,用于傳遞消息</p>** @author 韓超 2018/3/6 16:16*/ static class QueryTerms {/*** 問題*/private String question;/*** 搜索引擎*/private String engine;//getter setter toString constructor }搜索結果消息類QueryResult ,用于存放搜索結果:
/** * <p>Title: 定義查詢結果類,用于消息傳遞</p>** @author 韓超 2018/3/6 16:17*/ static class QueryResult {/*** 查詢結果*/private String result;//getter setter toString constructor }搜索角色類SearchEngineAcotr ,用于接收搜索條件,調用搜索引擎進行搜索:
/*** <p>Title:搜索引擎Actor </br>* 繼承UntypedAbstractActor成為一個Actor</p>** @author 韓超 2018/3/6 14:42*/ static class SearchEngineAcotr extends UntypedAbstractActor {//定義Actor日志private LoggingAdapter log = Logging.getLogger(getContext().getSystem(),this);/*** <p>Title: Actor都需要重寫消息接收處理方法</p>** @author 韓超 2018/3/6 14:42*/@Overridepublic void onReceive(Object message) throws Throwable {//如果消息是指定的類型Message,則進行處理,否則不處理if (message instanceof QueryTerms) {log.info("接收到搜索條件:" + ((QueryTerms) message).getQuestion());//通過工具類進行一次搜索引擎查詢String result = EngineUtils.searchByEngine(((QueryTerms) message).getQuestion(), ((QueryTerms) message).getEngine());//通過getSender().tell(result,actor)將actor的 處理結果[result] 發送消息的發送者[getSender()]//通過getSender獲取消息的發送方//通過getSelf()獲取當前ActorgetSender().tell(new QueryResult(result), getSelf());} else {unhandled(message);}} }主角色類QuestionQuerier ,用于分發搜索任務和接收搜索結果:
/** * <p>Title: 問題查詢器Actor</br>* 繼承自UntypedAbstractActor</p>** @author 韓超 2018/3/6 16:31*/ static class QuestionQuerier extends UntypedAbstractActor {//定義Actor日志private LoggingAdapter log = Logging.getLogger(getContext().getSystem(),this);/*** 搜索引擎列表*/private List<String> engines;/*** 搜索結果*/private AtomicReference<String> result;/*** 問題*/private String question;public QuestionQuerier(String question, List<String> engines, AtomicReference<String> result) {this.question = question;this.engines = engines;this.result = result;}/*** <p>Title: Actor都需要重寫消息接收處理方法</p>** @author 韓超 2018/3/6 16:35*/@Overridepublic void onReceive(Object message) throws Throwable {//如果收到查詢結果,則對查詢結果進行處理if (message instanceof QueryResult) {//如果消息是指定的類型Result,則進行處理,否則不處理log.info("接收到搜索結果:" + ((QueryResult) message).getResult());//通過CAS設置原子引用的值result.compareAndSet(null, ((QueryResult) message).getResult());//如果已經查詢到了結果,則停止Actor//通過getContext()獲取ActorSystem的上下文環境//通過getContext().stop(self())停止當前ActorgetContext().stop(self());} else {//如果沒有收到處理結果,則創建搜索引擎Actor進行查詢log.info("開始創建搜索引擎進行查詢");//使用原子變量去測試Actor的創建是否有序AtomicInteger count = new AtomicInteger(1);//針對每一個搜索引擎,都創建一個Actorfor (String engine : engines) {log.info("為" + engine + "創建第" + count + "個搜索引擎Actor....");count.getAndIncrement();//通過actorOf(Props,name)創建Actor//通過Props.create(Actor.class)創建PropsActorRef fetcher = this.getContext().actorOf(Props.create(SearchEngineAcotr.class), "fetcher-" + engine.hashCode());//創建查詢條件QueryTerms msg = new QueryTerms(question, engine);//將查詢條件告訴Actorfetcher.tell(msg, self());}}} }測試代碼:
/*** <p>Title:通過多個搜索引擎查詢多個條件,并返回第一條查詢結果 </p>** @author 韓超 2018/3/6 14:15*/ public static void main(String[] args) {//通過工具類獲取搜索引擎列表List<String> engines = EngineUtils.getEngineList();//通過 Actor 進行并發查詢,獲取最先查到的答案String result = new FlavorActorDemo().getFirstResult("今天你吃了嗎?", engines);//打印結果 }/*** 通過多個搜索引擎查詢,并返回第一條查詢結果** @param question 查詢問題* @param engines 查詢條件數組* @return 最先查出的結果* @author 韓超 2018/3/6 16:44*/ @Override public String getFirstResult(String question, List<String> engines) {//創建一個Actor系統ActorSystem system = ActorSystem.create("search-system");//創建一個原子引用用于保存查詢結果AtomicReference<String> result = new AtomicReference<>();//通過靜態方法,調用Props的構造器,創建Props對象Props props = Props.create(QuestionQuerier.class, question, engines, result);//通過system.actorOf(props,name)創建一個 問題查詢器Actorfinal ActorRef querier = system.actorOf(props, "master");//告訴問題查詢器開始查詢querier.tell(new Object(), ActorRef.noSender());//通過while無限循環 等待actor進行查詢,知道產生結果while (null == result.get()) ;//關閉 Actor系統system.terminate();//返回結果return result.get(); }運行結果(某一次):
[INFO] [04/18/2018 23:10:11.949] [search-system-akka.actor.default-dispatcher-7] [akka://search-system/user/master] 開始創建搜索引擎進行查詢 [INFO] [04/18/2018 23:10:11.959] [search-system-akka.actor.default-dispatcher-7] [akka://search-system/user/master] 為百度創建第1個搜索引擎Actor.... [INFO] [04/18/2018 23:10:11.959] [search-system-akka.actor.default-dispatcher-7] [akka://search-system/user/master] 為Google創建第2個搜索引擎Actor.... [INFO] [04/18/2018 23:10:11.959] [search-system-akka.actor.default-dispatcher-7] [akka://search-system/user/master] 為必應創建第3個搜索引擎Actor.... [INFO] [04/18/2018 23:10:11.959] [search-system-akka.actor.default-dispatcher-6] [akka://search-system/user/master/fetcher-964584] 接收到搜索條件:今天你吃了嗎? [INFO] [04/18/2018 23:10:11.959] [search-system-akka.actor.default-dispatcher-7] [akka://search-system/user/master] 為搜狗創建第4個搜索引擎Actor.... [INFO] [04/18/2018 23:10:11.959] [search-system-akka.actor.default-dispatcher-7] [akka://search-system/user/master] 為Redis創建第5個搜索引擎Actor.... [INFO] [04/18/2018 23:10:11.959] [search-system-akka.actor.default-dispatcher-7] [akka://search-system/user/master] 為Solr創建第6個搜索引擎Actor.... [INFO] [04/18/2018 23:10:11.969] [search-system-akka.actor.default-dispatcher-7] [akka://search-system/user/master/fetcher-2138589785] 接收到搜索條件:今天你吃了嗎? [INFO] [04/18/2018 23:10:11.969] [search-system-akka.actor.default-dispatcher-2] [akka://search-system/user/master/fetcher-2582786] 接收到搜索條件:今天你吃了嗎? [INFO] [04/18/2018 23:10:11.969] [search-system-akka.actor.default-dispatcher-5] [akka://search-system/user/master/fetcher-784239] 接收到搜索條件:今天你吃了嗎? [INFO] [04/18/2018 23:10:11.969] [search-system-akka.actor.default-dispatcher-3] [akka://search-system/user/master/fetcher-78837083] 接收到搜索條件:今天你吃了嗎? [INFO] [04/18/2018 23:10:11.975] [search-system-akka.actor.default-dispatcher-4] [akka://search-system/user/master/fetcher-823867] 接收到搜索條件:今天你吃了嗎? [INFO] [04/18/2018 23:10:13.255] [search-system-akka.actor.default-dispatcher-11] [akka://search-system/user/master] 接收到搜索結果:通過搜索引擎[搜狗],首先查到關于(今天你吃了嗎?)問題的結果,用時 = 1279毫秒! [INFO] [04/18/2018 23:10:15.504] [search-system-akka.actor.default-dispatcher-5] [akka://search-system/user/master] Message [pers.hanchao.flavors.FlavorActorDemo$QueryResult] from Actor[akka://search-system/user/master/fetcher-2582786#-286481483] to Actor[akka://search-system/user/master#-1184484885] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [INFO] [04/18/2018 23:10:15.504] [search-system-akka.actor.default-dispatcher-5] [akka://search-system/user/master] Message [pers.hanchao.flavors.FlavorActorDemo$QueryResult] from Actor[akka://search-system/user/master/fetcher-964584#166806804] to Actor[akka://search-system/user/master#-1184484885] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [INFO] [04/18/2018 23:10:15.504] [search-system-akka.actor.default-dispatcher-5] [akka://search-system/user/master] Message [pers.hanchao.flavors.FlavorActorDemo$QueryResult] from Actor[akka://search-system/user/master/fetcher-2138589785#376820931] to Actor[akka://search-system/user/master#-1184484885] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [INFO] [04/18/2018 23:10:15.504] [search-system-akka.actor.default-dispatcher-5] [akka://search-system/user/master] Message [pers.hanchao.flavors.FlavorActorDemo$QueryResult] from Actor[akka://search-system/user/master/fetcher-78837083#-224726121] to Actor[akka://search-system/user/master#-1184484885] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [INFO] [04/18/2018 23:10:15.504] [search-system-akka.actor.default-dispatcher-5] [akka://search-system/user/master] Message [pers.hanchao.flavors.FlavorActorDemo$QueryResult] from Actor[akka://search-system/user/master/fetcher-784239#1622650486] to Actor[akka://search-system/user/master#-1184484885] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.參考文獻
[1] Akka系列(一):Akka簡介與Actor模型
[2] 漫談并發編程:Actor模型
[3] Java并發的四種風味:Thread、Executor、ForkJoin和Actor
總結
以上是生活随笔為你收集整理的Java并发57:Akka Actors并发框架浅谈及入门示例的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 视频属性介绍
- 下一篇: 高级程序员解决问题的思维模式和普通程序员