Akka in Schedulerx2.0
1. 前言
Schedulerx2.0是阿里中間件自研的基于akka架構的新一代分布式任務調度平臺,提供定時、任務編排、分布式跑批等功能,具有高可靠、海量任務、秒級調度等能力。
本篇文章以Schedulerx2.0為例子,介紹akka的應用場景,希望能給同樣從事分布式系統(tǒng)開發(fā)的同學一些啟發(fā)。這里不詳細介紹akka,初學者可以直接閱讀官方文檔。
2. Reactive
說到近幾年火熱的反應式編程,誰都能說幾句“異步、并發(fā)、非阻塞、高性能”等等,說到有代表性的項目,大家都知道RxJava、Akka、Reactor。
Why Reactive?
——因為Schedulerx2.0作為任務調度平臺,支持海量任務調度,提供任務狀態(tài)機感知任務狀態(tài)變化,需要Reactive的特性。
Why Akka?
——首先akka很簡單,每個actor只需要實現一個onReceive方法。其次,Akka真的非常強大!我們可以看下官方文檔,Akka幾乎提供了一整套解決方案,使用akka可以很方便的實現一套高可靠、高并發(fā)、高性能的分布式系統(tǒng)。Schedulerx2.0也只用到了akka生態(tài)圈里的一小部分功能:
- akka-actor
- akka-eventbus:實現高性能工作流引擎
- akka-remoting:實現進程間通信
- akka-persistence:實現消息的At-Least-Once Delivery
3. Akka-actor in Schedulerx2.0
Schedulerx2.0支持百萬級別任務,一天上億次調度,從架構上來說,主要是
server無狀態(tài),可水平擴展
基于akka-actor模型,單機性能高
Schedulerx2.0提供任務狀態(tài)機,如下圖
當有海量任務匯報任務狀態(tài),單線程肯定是處理不過來的。如果用線程池又會遇到并發(fā)問題,比如當前按順序收到如下消息:
msg1: Instance=100 running
msg2: Instance=101 running
msg3: Instance=102 failed
msg4: Instance=101 success
msg5: Instance=100 failed
有可能instance=100先變成failed,最后變成running,導致狀態(tài)機錯誤。
通過Akka-actor架構的模型,可以很容易處理這種場景:
如上圖所示,JobInstanceRoutingActor作為路由actor,用來轉發(fā)消息。下面掛載了很多jobInstanceActor,用來真實處理消息。
所有instance狀態(tài)的消息都發(fā)給JobInstanceRoutingActor,路由actor會把同一個instanceId的消息發(fā)給同一個jobInstanceActor,akka能保證一個actor按照消息接收的順序來處理消息,以此又能保證整個狀態(tài)機消息的順序性。
Schedulerx2.0中,大量采用了上面這種模型,來支撐job/workflow/instance等消息的傳遞。
4. 基于Akka-eventbus的Pub-Sub模式
在異步處理場景中,當然少不了Pub-Sub模式。相信很多人都用過guava的eventbus,可以很簡單很優(yōu)雅的實現一套基于事件驅動的解決方案。通過@Subscribe注解就能注冊要訂閱的事件,通過@AllowConcurrentEvents注解還能設置并發(fā)消費事件。但是guava-eventbus在實現并發(fā)消費事件的時候非常暴力,公用一個線程池。這在Schedulerx2.0的應用場景中不太合適,比如某個job觸發(fā)頻率特別高,可能整個線程池都被他占滿了,造成其他job餓死。
在項目中大量使用actor模型之后,如果使用原生的actor通信會發(fā)現很困難,因為得知道actor的地址才能和他通信。如果有些actor要給多個actor發(fā)送消息,你的項目就會變成一個網狀的結構,新增一個actor經常會漏掉一些通信。這個時候我們就會想到Pub-Sub模式,所有actor通信只需要給事件總線發(fā)送消息,每個actor只需要訂閱自己的事件就好了。
如上圖所示,定時調度器、工作流引擎、任務狀態(tài)機等大部分模塊,都由akka-eventbus進行管理,每個模塊都是第四節(jié)定義的路由actor+業(yè)務actor的模型。通過該模型,相同的job交給同一個actor處理,不會堵塞其他actor,同樣解決了上文提到的guava-eventbus公用線程池的問題。實現類圖如下:
5. 兩行代碼實現進程間通信
Schedulerx2.0是Server-Worker的架構,server和worker,worker和worker都需要進行通信,使用akka-remoting可以很容易實現任意2個進程之間的通信。
Akka-remoting是peer-to-peer的通信方式,每個節(jié)點都會暴露一個遠程地址,其他節(jié)點只要知道地址,就能進行遠程通信。Akka-remoting也抽象成一個actor,會讓你的程序保持高度的一致,只不過這個actor的地址是遠程的地址而已。Akka-remoting支持多種協(xié)議,使用起來非常簡單,以netty-tcp為例,首先我們在server端定義一個配置文件akka-server.conf
akka {actor {provider = "akka.remote.RemoteActorRefProvider"}remote {enabled-transports = ["akka.remote.netty.tcp"]netty.tcp {port = 52014}} }Server只需要2行代碼就可以起一個remote actor
ActorSystem actorSystem = ActorSystem.create("server", akkaConfig); actorSystem.actorOf(HelloActor.props(), "hello");Worker也只需要2行代碼就能實現和server通信
ActorSelection helloSelection = context.actorSelection("akka.tcp://server@xx.xx.xx.xx:52014/user/hello"); helloSelection.tell("hello",getSelf());對比Schedulerx1.0使用原生netty框架通信需要如下這么多代碼
怎么樣,使用akka進行遠程通信,是不是非常簡單和優(yōu)雅^^
6. 消息At-Least-Once Delivery
Akka默認的消息傳遞是最多傳遞一次,即通過tell,如果發(fā)送失敗,不會重發(fā)。At-Least-Once Delivery,提供了一個消息至少傳遞一次的語義,即保證不丟!這在Schedulerx2.0中很多場景是非常需要的,比如某個實例在worker執(zhí)行成功了,匯報成功的時候server正好重啟了導致匯報失敗,會造成工作流下游都卡住沒法繼續(xù)執(zhí)行。
使用At-Least-Once Delivery要繼承UntypedPersistentActorWithAtLeastOnceDelivery(akka-2.4.x)或者AbstractPersistentActorWithAtLeastOnceDelivery(akka-2.5.x)。Akka在2.5.x為了擁抱函數式編程,只支持java8,并用了很多stream的接口,所以接口和2.4.x已經大大不一樣了。在Schedulerx2.0中,worker主要是給用戶用的,為了兼容低版本的jdk,所以用了2.4.x版本的UntypedPersistentActorWithAtLeastOnceDelivery。
UntypedPersistentActorWithAtLeastOnceDelivery繼承UntypedPersistentActor和AtLeastOnceDelivery。
- UntypedPersistentActor:提供了持久化的actor,對消息持久化、恢復等能力。
- AtLeastOnceDelivery:主要是deliver、confirmDelivery(long deliveryId)兩個接口。
AtLeastOnceDelivery的原理非常簡單,worker向server匯報狀態(tài)的時候,tell改為deliver,deliver會自動生成一個deliveryId,封裝進request發(fā)送給server,server需要實現把deliveryId封裝到response中并返回給worker,worker收到response的時候調用confiremDelivery,會從unconfirmed列表中移除這個deliveryId的request,否則AtLeastOnceDelivery會有一個timer,定期重試這條request。如下圖
原文鏈接
本文為云棲社區(qū)原創(chuàng)內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的Akka in Schedulerx2.0的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 解读 Knative Eventing
- 下一篇: CICD联动阿里云容器服务Kuberne