分布式应用框架Akka快速入门
轉(zhuǎn)自:分布式應用框架Akka快速入門_jmppok的專欄-CSDN博客_akka
本文結(jié)合網(wǎng)上一些資料,對他們進行整理,摘選和翻譯而成,對Akka進行簡要的說明。引用資料在最后列出。
1.什么是Akka
Akka 是一個用 Scala 編寫的庫,用于簡化編寫容錯的、高可伸縮性的 Java 和 Scala 的 Actor 模型應用。
官方網(wǎng)站 (http://akka.io/)的介紹是:
Akka is a toolkit and runtime for building highly concurrent, distributed, and fault tolerant event-driven applications on the JVM.
Build powerful concurrent & distributed applications?more easily.
翻譯成中文就是:Akka是一個開發(fā)庫和運行環(huán)境,可以用于構(gòu)建高并發(fā)、分布式、可容錯、事件驅(qū)動的基于JVM的應用。使構(gòu)建高并發(fā)的分布式應用更加容易。
Akka可以以兩種不同的方式來使用
- 以庫的形式:在web應用中使用,放到?WEB-INF/lib?中或者作為一個普通的Jar包放進classpath。
- 以微內(nèi)核的形式:你可以將應用放進一個獨立的內(nèi)核。
2.Akka的五大特性
1)易于構(gòu)建并行和分布式應用 (Simple Concurrency & Distribution)
????? Akka在設(shè)計時采用了異步通訊和分布式架構(gòu),并對上層進行抽象,如Actors、Futures ,STM等。
2)可靠性(Resilient by Design)
???? 系統(tǒng)具備自愈能力,在本地/遠程都有監(jiān)護。
3)高性能(High Performance)
??? 在單機中每秒可發(fā)送50000000個消息。內(nèi)存占用小,1GB內(nèi)存中可保存2500000個actors。
4)彈性,無中心(Elastic — Decentralized)
?? 自適應的負責均衡,路由,分區(qū),配置
5)可擴展(Extensible)
? 可以使用Akka 擴展包進行擴展。
3.什么場景下特別適合使用Akka?
我們看到Akka被成功運用在眾多行業(yè)的眾多大企業(yè),從投資業(yè)到商業(yè)銀行、從零售業(yè)到社會媒體、仿真、游戲和賭博、汽車和交通系統(tǒng)、數(shù)據(jù)分析等等等等。任何需要高吞吐率和低延遲的系統(tǒng)都是使用Akka的候選。
Actor使你能夠進行服務失敗管理(監(jiān)管者),負載管理(緩和策略、超時和隔離),水平和垂直方向上的可擴展性(增加cpu核數(shù)和/或增加更多的機器)管理。
下面的鏈接中有一些Akka用戶關(guān)于他們?nèi)绾问褂肁kka的描述:java - What are the best use cases for Akka framework - Stack Overflow
所有以上這些都在這個Apache2許可的開源軟件中。
以下是Akka被部署到生產(chǎn)環(huán)境中的領(lǐng)域
事務處理 (在線游戲,金融/銀行業(yè),貿(mào)易,統(tǒng)計,賭博,社會媒體,電信):垂直擴展,水平擴展,容錯/高可用性
服務后端 (任何行業(yè),任何應用):提供REST, SOAP, Cometd, WebSockets 等服務 作為消息總線/集成層 垂直擴展,水平擴展,容錯/高可用性
并發(fā)/并行 (任何應用):運行正確,方便使用,只需要將jar包添加到現(xiàn)有的JVM項目中(使用Scala,java, Groovy或jruby)
仿真:主/從,計算網(wǎng)格,MaReduce等等.
批處理 (任何行業(yè)):Camel集成來連接批處理數(shù)據(jù)源 Actor來分治地批處理工作負載
通信Hub (電信, Web媒體, 手機媒體):垂直擴展,水平擴展,容錯/高可用性
游戲與賭博 (MOM, 在線游戲, 賭博):垂直擴展,水平擴展,容錯/高可用性
商業(yè)智能/數(shù)據(jù)挖掘/通用數(shù)據(jù)處理:垂直擴展,水平擴展,容錯/高可用性
復雜事件流處理:垂直擴展,水平擴展,容錯/高可用性
4. Scala語言
Scala是一種多范式的編程語言,設(shè)計初衷是要集成面向?qū)ο缶幊毯秃瘮?shù)式編程的各種特性。
百度百科,Scala語言介紹:
Scala(Scala)_百度百科
百度文庫,Scala編程入門:
http://wenku.baidu.com/view/27cbf218964bcf84b9d57b83.html
5.Actors模型
Actor模型并非什么新鮮事物,它由Carl Hewitt于上世紀70年代早期提出,目的是為了解決分布式編程中一系列的編程問題。其特點如下:
系統(tǒng)中的所有事物都可以扮演一個Actor
Actor之間完全獨立
在收到消息時Actor所采取的所有動作都是并行的,在一個方法中的動作沒有明確的順序
Actor由標識和當前行為描述
Actor可能被分成原始(primitive)和非原始(non primitive)類別
很多開發(fā)語言都提供了原生的Actor模型。例如erlang,scala等
?
Actor,可以看作是一個個獨立的實體,他們之間是毫無關(guān)聯(lián)的。但是,他們可以通過消息來通信。
一個Actor收到其他Actor的信息后,它可以根據(jù)需要作出各種相應。消息的類型可以是任意的,消息的內(nèi)容也可以是任意的。這點有點像webservice了。只提供接口服務,你不必了解我是如何實現(xiàn)的。
一個Actor如何處理多個Actor的請求呢?它先建立一個消息隊列,每次收到消息后,就放入隊列,而它每次也從隊列中取出消息體來處理。通常我們都使得這個過程是循環(huán)的。讓Actor可以時刻處理發(fā)送來的消息。
6.示例(http://www.th7.cn/Program/java/2012/03/29/67015.shtml)
應用場景:服務端要處理大量的客戶端的請求,并且處理請求耗費較長的時間。這時就需要使用并發(fā)處理。多線程是一種方法,這里使用Akka框架處理并發(fā)。(以下代碼在Groovy1.7.5、akka-actors-1.2下運行成功)
這里有三個角色:Client、Master、Worker?
Client傻乎乎地發(fā)同步請求給Master,一直等到結(jié)果返回客戶端才離開。?
Master接收客戶端發(fā)來的請求,然后將請求交給Worker處理,處理完成之后將結(jié)果返回給Client。?
Worker負責具體的業(yè)務處理,它耗費的事件比較長。
所以這里的關(guān)鍵在于Master,如果Master線性地“接收請求――調(diào)用Worker處理得到返回結(jié)果――將結(jié)果返回”,這樣的系統(tǒng)必將歇菜。?
使用Akka可以方便地將它變成并行地。
先看看Client,模擬同時多個客戶端給Master發(fā)請求?
import akka.actor.ActorRef?
import static akka.actor.Actors.remote
class HelloClient implements Runnable {?
??? int seq?
??? String serviceName
??? HelloClient(int seq, String serviceName) {?
??????? this.seq = seq?
??????? this.serviceName = serviceName?
??? }
??? void run() {?
??????? ActorRef actor = remote().actorFor(serviceName, "10.68.15.113", 9999);?
??????? String str = "Hello--" + seq?
??????? println "請求-----${str}"?
??????? Object res = actor.sendRequestReply(str)?
??????? println "返回-----${res}"?
??? }
??? public static void main(String[] args) {?
??????? for (int i = 0; i < 5; i++) {?
??????????? Thread thread = new Thread(new HelloClient(i, "hello-service"))?
??????????? thread.start()??????? //同時啟動5個客戶端請求Master?
??????? }?
??? }?
}
真正干活的Worker:?
import akka.actor.UntypedActor
class HelloWorker extends UntypedActor {??? //Worker是一個Actor,需要實現(xiàn)onReceive方法?
??? @Override?
??? void onReceive(Object o) {?
??????? println "Worker 收到消息----" + o?
??????? if (o instanceof String) {?
??????????? String result = doWork(o)??????? //調(diào)用真實的處理方法?
??????????? getContext().replyUnsafe(result)//將結(jié)果返回給Master?
??????? }?
??? }?
??? //Worker處理其實很簡單,僅僅將參數(shù)字符串改造一下而已。只不過使其sleep了20秒,讓它變得“耗時較長”?
??? String doWork(String str) {?
??????? Thread.sleep(1000 * 20)?
??????? return "result----" + str + " 。"?
??? }?
}
負責并發(fā)調(diào)度的Master:?
import akka.actor.ActorRef?
import akka.actor.Actors?
import akka.actor.UntypedActor?
import akka.actor.UntypedActorFactory?
import akka.dispatch.Future?
import akka.dispatch.Futures?
import java.util.concurrent.Callable
class HelloMaster extends UntypedActor {?
??? @Override?
??? void onReceive(Object o) {?
??????? println "Master接收到Work消息:" + o?
??????? def clientChannel = getContext().channel()??? //客戶端鏈接Channel?
??????? //啟動worker actor?
??????? ActorRef worker = Actors.actorOf(new UntypedActorFactory() {?
??????????? public UntypedActor create() {?
??????????????? return new HelloWorker();?
??????????? }?
??????? }).start();
??????? //這里實現(xiàn)真正的并發(fā)?
??????? Future f1 = Futures.future(new Callable() {?
??????????? Object call() {?
??????????????? def result = worker.sendRequestReply(o)??????????? //將消息發(fā)給worker actor,讓Worker處理業(yè)務,同時得到返回結(jié)果
??????????????? worker.stop()?
??????????????? println "Worker Return----" + result?
??????????????? clientChannel.sendOneWay(result)??????????????? //將結(jié)果返回給客戶端?
??????????????? return result?
??????????? }?
??????? })
??????? println "Future call over"?
??? }
??? public static void main(String[] args) {??? //啟動Master進程,綁定IP、端口和服務?
??????? Actors.remote().start("10.68.15.113", 9999).register(?
??????????????? "hello-service",?
??????????????? Actors.actorOf(HelloMaster.class));?
??? }?
}
看看客戶端的調(diào)用日志?
請求-----Hello--4?
請求-----Hello--1?
請求-----Hello--3?
請求-----Hello--0?
請求-----Hello--2?
[GENERIC] [11-10-6 下午9:49] [RemoteClientConnected(akka.remote.netty.NettyRemoteSupport@195b6aad,/10.68.15.113:9999)]
[GENERIC] [11-10-6 下午9:49] [RemoteClientStarted(akka.remote.netty.NettyRemoteSupport@195b6aad,/10.68.15.113:9999)]
返回-----result----Hello--0 。?
返回-----result----Hello--1 。?
返回-----result----Hello--2 。?
返回-----result----Hello--4 。?
返回-----result----Hello--3 。
服務端的日志:?
[GENERIC] [11-10-6 下午9:49] [RemoteServerClientConnected(akka.remote.netty.NettyRemoteSupport@5a4fdf11,Some(/10.68.15.113:53462))]
Master接收到Work消息:Hello--1?
Future call over?
Master接收到Work消息:Hello--2?
Future call over?
Worker 收到消息----Hello--1?
[DEBUG]?? [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-13] [HelloWorker] started
[DEBUG]?? [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-13] [HelloWorker] started
Worker 收到消息----Hello--2?
Master接收到Work消息:Hello--0?
Future call over?
Master接收到Work消息:Hello--3?
Worker 收到消息----Hello--0?
[DEBUG]?? [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-7] [HelloWorker] started
Future call over?
Master接收到Work消息:Hello--4?
[DEBUG]?? [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-7] [HelloWorker] started
Worker 收到消息----Hello--3?
Future call over?
Worker 收到消息----Hello--4?
[DEBUG]?? [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-7] [HelloWorker] started
Worker 將消息Hello--1處理完成?
Worker 將消息Hello--2處理完成?
Worker Return----result----Hello--2 。?
Worker Return----result----Hello--1 。?
[DEBUG]?? [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-13] [HelloWorker] stopping
Worker 將消息Hello--0處理完成?
[DEBUG]?? [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-3] [HelloWorker] stopping
Worker Return----result----Hello--0 。?
[DEBUG]?? [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-23] [HelloWorker] stopping
Worker 將消息Hello--4處理完成?
Worker 將消息Hello--3處理完成?
Worker Return----result----Hello--4 。?
Worker Return----result----Hello--3 。?
[DEBUG]?? [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-11] [HelloWorker] stopping
[DEBUG]?? [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-10] [HelloWorker] stopping
可以從服務端日志看到,Master接收到Work消息后onReceive就結(jié)束了(函數(shù)最后打印Future call over),一連接收了5個消息,然后Worker才收到消息并處理。最后消息處理完成好后f1的call才收到Worker Return的消息。
這里使用Future實現(xiàn)并發(fā)。
如果不使用Future:?
def result = worker.sendRequestReply(o)?????? //將消息發(fā)給worker actor?
println "Worker Return----" + result?
getContext().replyUnsafe(result)????? // 將worker返回的消息回復給客戶端?
這就成了同步處理(第一個消息處理完后才接收并處理第二個消息)。
如果在Future后調(diào)用了f1.await()或f1.get(),也成同步的了,因為await將等待worker返回后再繼續(xù)往下執(zhí)行。????????
Future f1 = Futures.future(new Callable() {?
??? Object call() {?
??????? def result = worker.sendRequestReply(o)?????? //將消息發(fā)給worker actor?
??????? worker.stop()?
??????? println "Worker Return----" + result?
??????? clientChannel.sendOneWay(result)?
??????? return result?
??? }?
})
println "Future call over" + f1.get()
服務器日志如下:?
[GENERIC] [11-10-6 下午10:06] [RemoteServerStarted(akka.remote.netty.NettyRemoteSupport@7e566633)]
[DEBUG]?? [11-10-6 下午10:06] [main] [HelloMaster] started?
[GENERIC] [11-10-6 下午10:07] [RemoteServerClientConnected(akka.remote.netty.NettyRemoteSupport@7e566633,Some(/10.68.15.113:53571))]
Master接收到Work消息:Hello--0?
[DEBUG]?? [11-10-6 下午10:07] [akka:event-driven:dispatcher:global-3] [HelloWorker] started
Worker 收到消息----Hello--0?
Worker 將消息Hello--0處理完成?
[DEBUG]?? [11-10-6 下午10:07] [akka:event-driven:dispatcher:global-5] [HelloWorker] stopping
Worker Return----result----Hello--0 。?
Future call overresult----Hello--0 。?
Master接收到Work消息:Hello--2?
Worker 收到消息----Hello--2?
[DEBUG]?? [11-10-6 下午10:07] [akka:event-driven:dispatcher:global-3] [HelloWorker] started
Worker 將消息Hello--2處理完成?
Worker Return----result----Hello--2 。?
Future call overresult----Hello--2 。?
Master接收到Work消息:Hello--3?
[DEBUG]?? [11-10-6 下午10:07] [akka:event-driven:dispatcher:global-10] [HelloWorker] stopping
[DEBUG]?? [11-10-6 下午10:07] [akka:event-driven:dispatcher:global-3] [HelloWorker] started
Worker 收到消息----Hello--3?
Worker 將消息Hello--3處理完成?
Worker Return----result----Hello--3 。?
Future call overresult----Hello--3 。?
Master接收到Work消息:Hello--4?
[DEBUG]?? [11-10-6 下午10:08] [akka:event-driven:dispatcher:global-14] [HelloWorker] stopping
[DEBUG]?? [11-10-6 下午10:08] [akka:event-driven:dispatcher:global-3] [HelloWorker] started
Worker 收到消息----Hello--4?
Worker 將消息Hello--4處理完成?
Worker Return----result----Hello--4 。?
Future call overresult----Hello--4 。?
Master接收到Work消息:Hello--1?
[DEBUG]?? [11-10-6 下午10:08] [akka:event-driven:dispatcher:global-18] [HelloWorker] stopping
[DEBUG]?? [11-10-6 下午10:08] [akka:event-driven:dispatcher:global-3] [HelloWorker] started
Worker 收到消息----Hello--1?
Worker 將消息Hello--1處理完成?
Worker Return----result----Hello--1 。?
Future call overresult----Hello--1 。?
[DEBUG]?? [11-10-6 下午10:08] [akka:event-driven:dispatcher:global-21] [HelloWorker] stopping
Master接收到Work消息:Hello--6?
[DEBUG]?? [11-10-6 下午10:08] [akka:event-driven:dispatcher:global-24] [HelloWorker] started
Worker 收到消息----Hello--6?
Worker 將消息Hello--6處理完成?
Worker Return----result----Hello--6 。?
Future call overresult----Hello--6 。?
Master接收到Work消息:Hello--5?
[DEBUG]?? [11-10-6 下午10:09] [akka:event-driven:dispatcher:global-26] [HelloWorker] stopping
Worker 收到消息----Hello--5?
[DEBUG]?? [11-10-6 下午10:09] [akka:event-driven:dispatcher:global-24] [HelloWorker] started
需要注意的是,Akka默認使用環(huán)境變量%AKKA_HOME%/config/akka.conf配置,默認配置是client的read-timeout = 10(客戶端連接10秒后將自動斷開,這時服務端再給客戶端發(fā)消息就發(fā)布了了。報RemoteServerWriteFailed異常),可以將值設(shè)為0,將一直連著不斷開。
actor的timeout默認為5秒,也太短了,延長(不能設(shè)為0,0為總是超時).
7.參考文檔
1)?akka官網(wǎng)??? http://akka.io/?????????
2)akka簡介??????? http://blog.chinaunix.net/uid-25885064-id-3400549.html
3)Scala語言:集成面向?qū)ο蠛秃瘮?shù)式編程的特性
4)actors模型?????????? http://janeky.iteye.com/blog/1504125
5)示例????????? http://www.th7.cn/Program/java/2012/03/29/67015.shtml
6)akka資料? http://www.jdon.com/tags/10531
?
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅持創(chuàng)作打卡瓜分現(xiàn)金大獎總結(jié)
以上是生活随笔為你收集整理的分布式应用框架Akka快速入门的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RTA 广告产品能力详解
- 下一篇: Akka框架