Dapr + .NET 实战(五)Actor
什么是Actor模式
Actors 為最低級別的“計算單元”
以上解釋來自官方文檔,看起來“晦澀難懂”。大白話就是說Actors模式是一段需要單線程執行的代碼塊。
實際開發中我們經常會有一些邏輯不能并發執行,我們常用的做法就是加鎖,例如:
lock(obj) {//dosomething... }或者用Redis等中間件,為分布式應用加一些分布式鎖。遺憾的是,使用顯式鎖定機制容易出錯。它們很容易導致死鎖,并可能對性能產生嚴重影響。Actors模式為單線程邏輯提供了一種更好的選擇。
什么時候用Actors
需要單線程執行,比如需要加lock
邏輯可以被劃分為小的執行單元
工作原理
Dapr啟動app時,Sidecar調用Actors獲取配置信息,之后Sidecar將Actors的信息發送到安置服務(Placement Service),安置服務會將不同的Actor類型根據其Id和Actor類型分區,并將Actor信息廣播到所有dapr實例。
?在客戶端調用某個Actor時,安置服務會根據其Id和Actor類型,找到其所在的dapr實例,并執行其方法。
調用Actors方法
POST/GET/PUT/DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/method/<method><actorType>:執行組件類型。
<actorId>:要調用的特定參與者的 ID。
<method>:要調用的方法
計時器Timers和提醒器Reminders
Actor可以設置timer和reminder設置執行Actor的時間,有點像我們常用的定時任務。但是timer和reminder也存在不同。
timer只作用于激活狀態的Actor。一個Actor長期不被調用,其自己的空閑計時器會逐漸累積,到一定時間后會被Dapr銷毀,timer沒法作用于已銷毀的Actor。
reminder則可以作用于所有狀態的Actor。主要方式是重置空閑計時器,使其處于活躍狀態
操作timer
POST/PUT http://localhost:3500/v1.0/actors/<actorType>/<actorId>/timers/<name>到期時間(due time)表示注冊后 timer 將首次觸發的時間。?period?表示timer在此之后觸發的頻率。到期時間為0表示立即執行。負 due times 和負 periods 都是無效。
下面的請求體配置了一個 timer,?dueTime?9秒,?period?3秒。這意味著它將在9秒后首次觸發,然后每3秒觸發一次。
{"dueTime":"0h0m9s0ms","period":"0h0m3s0ms" }下面的請求體配置了一個 timer,?dueTime?0秒,?period?3秒。這意味著它將在注冊之后立即觸發,然后每3秒觸發一次。
{"dueTime":"0h0m0s0ms","period":"0h0m3s0ms" }操作reminder
POST/PUT/GET/DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/reminders/<name>?到期時間(due time)表示注冊后 reminders將首次觸發的時間。?period?表示在此之后 reminders 將觸發的頻率。到期時間為0表示立即執行。負 due times 和負 periods 都是無效。若要注冊僅觸發一次的 reminders ,請將 period 設置為空字符串。
下面的請求體配置了一個 reminders,?dueTime?9秒,?period?3秒。這意味著它將在9秒后首次觸發,然后每3秒觸發一次。
{"dueTime":"0h0m9s0ms","period":"0h0m3s0ms" }下面的請求體配置了一個 reminders,?dueTime?0秒,?period?3秒。這意味著它將在注冊之后立即觸發,然后每3秒觸發一次。
{"dueTime":"0h0m0s0ms","period":"0h0m3s0ms" }下面的請求體配置了一個 reminders,?dueTime?15秒,?period?空字符串。這意味著它將在15秒后首次觸發,之后就不再被觸發。
{"dueTime":"0h0m15s0ms","period":"" }數據持久化
使用 Dapr?狀態管理構建塊保存執行組件狀態。由于執行組件可以一輪執行多個狀態操作,因此狀態存儲組件必須支持多項事務。撰寫本文時,以下狀態存儲支持多項事務:
Azure Cosmos DB
MongoDB
MySQL
PostgreSQL
Redis
RethinkDB
SQL Server
若要配置要與執行組件一起使用的狀態存儲組件,需要將以下元數據附加到狀態存儲配置:
- name: actorStateStorevalue: "true"win10自承載模式下已默認設置此項 C:\Users\<username>\.dapr\components\statestore.yaml
項目實例
Actor操作
下面將通過一個審核流程的例子來演示。
還是用前面的FrontEnd項目,引入nuget包Dapr.Actors和Dapr.Actors.AspNetCore。
?定義IOrderStatusActor接口,需要繼承自IActor
using Dapr.Actors;using System.Threading.Tasks;namespace FrontEnd.ActorDefine {public interface IOrderStatusActor : IActor{Task<string> Paid(string orderId);Task<string> GetStatus(string orderId);} }定義OrderStatusActor實現IOrderStatusActor,并繼承自Actor
using Dapr.Actors.Runtime;using System.Threading.Tasks;namespace FrontEnd.ActorDefine {public class OrderStatusActor : Actor, IOrderStatusActor{public OrderStatusActor(ActorHost host) : base(host){}public async Task<string> Paid(string orderId){// change order status to paidawait StateManager.AddOrUpdateStateAsync(orderId, "init", (key, currentStatus) => "paid");return orderId;}public async Task<string> GetStatus(string orderId){return await StateManager.GetStateAsync<string>(orderId);}} }需要注意的是,執行組件方法的返回類型必須為 Task 或 Task<T>?。此外,執行組件方法最多只能有一個參數。返回類型和參數都必須可 System.Text.Json 序列化。
Actor的api是必需的,因為 Dapr 挎斗調用應用程序來承載和與執行組件實例進行交互,所以在Startup的Configure中配置
app.UseEndpoints(endpoints =>{endpoints.MapActorsHandlers();// .......});Startup類是用于注冊特定執行組件類型的位置。在ConfigureServices?注冊?services.AddActors?:
services.AddActors(options =>{options.Actors.RegisterActor<OrderStatusActor>();});為測試這個Actor,需要定義一個接口調用,新增ActorController
using Dapr.Actors; using Dapr.Actors.Client;using FrontEnd.ActorDefine;using Microsoft.AspNetCore.Mvc;using System.Threading.Tasks;namespace FrontEnd.Controllers {[Route("[controller]")][ApiController]public class ActorController : ControllerBase{[HttpGet("paid/{orderId}")]public async Task<ActionResult> PaidAsync(string orderId){var actorId = new ActorId("myid-"+orderId);var proxy = ActorProxy.Create<IOrderStatusActor>(actorId, "OrderStatusActor");var result = await proxy.Paid(orderId);return Ok(result);}} }ActorProxy.Create?為創建代理實例。?Create方法采用兩個參數:標識特定執行組件和執行組件?ActorId?類型。它還具有一個泛型類型參數,用于指定執行組件類型所實現的執行組件接口。由于服務器和客戶端應用程序都需要使用執行組件接口,它們通常存儲在單獨的共享項目中。
啟動FrontEnd
dapr run --dapr-http-port 3501 --app-port 5001 --app-id frontend dotnet .\FrontEnd\bin\Debug\net5.0\FrontEnd.dll下面通過postman測試下,調用成功
?查看redis中的數據
127.0.0.1:6379> keys *1) "test_topic"2) "frontend||guid"3) "frontend||name"5) "newOrder"6) "frontend||OrderStatusActor||myid-123||123"7) "myapp2||key2"8) "myapp2||key1"9) "deathStarStatus" 10) "myapp||name" 127.0.0.1:6379> hgetall frontend||OrderStatusActor||myid-123||123 1) "data" 2) "\"init\"" 3) "version" 4) "1"可以發現actor數據的命名規則是appName||ActorName||ActorId||key
同樣可以使用注入的方式創建proxy,ActorController中注入IActorProxyFactory
private readonly IActorProxyFactory _actorProxyFactory;public ActorController(IActorProxyFactory actorProxyFactory){_actorProxyFactory = actorProxyFactory;}新增獲取數據接口
[HttpGet("get/{orderId}")]public async Task<ActionResult> GetAsync(string orderId){var proxy = _actorProxyFactory.CreateActorProxy<IOrderStatusActor>(new ActorId("myid-" + orderId),"OrderStatusActor");return Ok(await proxy.GetStatus(orderId));}重新啟動FrontEnd
dapr run --dapr-http-port 3501 --app-port 5001 --app-id frontend dotnet .\FrontEnd\bin\Debug\net5.0\FrontEnd.dllpostman測試
?Timer操作
使用Actor基類的?RegisterTimerAsync?方法計劃計時器。在OrderStatusActor類中新增方法
public Task StartTimerAsync(string name, string text){return RegisterTimerAsync(name,nameof(TimerCallbackAsync),Encoding.UTF8.GetBytes(text),TimeSpan.Zero,TimeSpan.FromSeconds(3));}public Task TimerCallbackAsync(byte[] state){var text = Encoding.UTF8.GetString(state);_logger.LogInformation($"Timer fired: {text}");return Task.CompletedTask;}StartTimerAsync方法調用?RegisterTimerAsync?來計劃計時器。?RegisterTimerAsync?采用五個參數:
計時器的名稱。
觸發計時器時要調用的方法的名稱。
要傳遞給回調方法的狀態。
首次調用回調方法之前要等待的時間。
回調方法調用之間的時間間隔??梢灾付?以?TimeSpan.FromMilliseconds(-1)?禁用定期信號。
在OrderStatusActor構造方法中調用StartTimerAsync
StartTimerAsync("test-timer", "this is a test timer").ConfigureAwait(false).GetAwaiter().GetResult();重新啟動FrontEnd
dapr run --dapr-http-port 3501 --app-port 5001 --app-id frontend dotnet .\FrontEnd\bin\Debug\net5.0\FrontEnd.dll通過調用paid接口實例化一個Actor,即可開啟timer
?查看控制臺,timer觸發成功
== APP == info: FrontEnd.ActorDefine.OrderStatusActor[0] == APP == Timer fired: this is a test timerTimerCallbackAsync方法以二進制形式接收用戶狀態。在示例中,回調在將狀態寫入日志之前將狀態?string?解碼回 。
可以通過調用 來停止計時器?UnregisterTimerAsync?:
public Task StopTimerAsync(string name){return UnregisterTimerAsync(name);}Reminder操作
使用Actor基類的 RegisterReminderAsync 方法計劃計時器。在OrderStatusActor類中新增方法
public Task SetReminderAsync(string text){return RegisterReminderAsync("test-reminder",Encoding.UTF8.GetBytes(text),TimeSpan.Zero,TimeSpan.FromSeconds(1));}public Task ReceiveReminderAsync(string reminderName, byte[] state,TimeSpan dueTime, TimeSpan period){if (reminderName == "test-reminder"){var text = Encoding.UTF8.GetString(state);Logger.LogWarning($"reminder fired: {text}");}return Task.CompletedTask;}RegisterReminderAsync方法類似于?RegisterTimerAsync?,但不必顯式指定回調方法。如上面的示例所示,實現?IRemindable.ReceiveReminderAsync?以處理觸發的提醒。
public class OrderStatusActor : Actor, IOrderStatusActor, IRemindableReceiveReminderAsync觸發提醒時調用 方法。它采用 4 個參數:
提醒的名稱。
注冊期間提供的用戶狀態。
注冊期間提供的調用到期時間。
注冊期間提供的調用周期。
在OrderStatusActor構造方法中調用SetReminderAsync
SetReminderAsync("this is a test reminder").ConfigureAwait(false).GetAwaiter().GetResult();重新啟動FrontEnd
dapr run --dapr-http-port 3501 --app-port 5001 --app-id frontend dotnet .\FrontEnd\bin\Debug\net5.0\FrontEnd.dll通過調用paid接口實例化一個Actor,即可開啟reminder
?查看控制臺,reminder觸發成功
== APP == warn: FrontEnd.ActorDefine.OrderStatusActor[0] == APP == reminder fired: this is a test reminder 相關文章:Dapr實戰(一) 基礎概念與環境搭建
Dapr + .NET Core實戰(二) 服務調用
Dapr + .NET Core實戰(三)狀態管理
Dapr + .NET 實戰(四)發布和訂閱
總結
以上是生活随笔為你收集整理的Dapr + .NET 实战(五)Actor的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何优化 .NET Core 中的 la
- 下一篇: .NET 排序 Array.SortT