Orleans 知多少 | 3. Hello Orleans
1. 引言
是的,Orleans v3.0.0 已經(jīng)發(fā)布了,并已經(jīng)完全支持 .NET Core 3.0。所以,Orleans 系列是時(shí)候繼續(xù)了,抱歉,讓大家久等了。萬丈高樓平地起,這一節(jié)我們就先來了解下Orleans的基本使用。
2. 模板項(xiàng)目講解
在上一篇文章中,我們了解到Orleans 作為.NET 分布式框架,其主要包括三個(gè)部分:Client、Grains、Silo Host(Server)。因此,為了方便講解,創(chuàng)建如下的項(xiàng)目結(jié)構(gòu)進(jìn)行演示:
這里有幾點(diǎn)需要說明:
Orleans.Grains:類庫項(xiàng)目,用于定義Grain的接口以及實(shí)現(xiàn),需要引用?Microsoft.Orleans.CodeGenerator.MSBuild和?Microsoft.Orleans.Core.Abstractions?NuGet包。
Orleans.Server:控制臺(tái)項(xiàng)目,為 Silo 宿主提供宿主環(huán)境,需要引用?Microsoft.Orleans.Server?和?Microsoft.Extensions.Hosting?NuGet包,以及?Orleans.Grains?項(xiàng)目。
Orleans.Client:控制臺(tái)項(xiàng)目,用于演示如何借助Orleans Client建立與Orleans Server的連接,需要引用?Microsoft.Orleans.Client?和?Microsoft.Extensions.Hosting?NuGet包,同時(shí)添加?Orleans.Grains項(xiàng)目引用。
3. 第一個(gè)Grain
Grain作為Orleans的第一公民,以及Virtual Actor的實(shí)際代言人,想吃透Orleans,那Grain就是第一道坎。先看一個(gè)簡單的Demo,我們來模擬統(tǒng)計(jì)網(wǎng)站的實(shí)時(shí)在線用戶。在 Orleans.Grains添加 ISessionControl接口,主要用戶登錄狀態(tài)的管理。
public interface ISessionControlGrain : IGrainWithStringKey { Task Login(string userId); Task Logout(string userId); Task<int> GetActiveUserCount(); }可以看見Grain的定義很簡單,只需要指定繼承自IGrain的接口就好。這里面繼承自 IGrainWithStringKey,說明該Grain 的Identity Key(身份標(biāo)識(shí))為 string類型。同時(shí)需要注意的是Grain 的方法申明,返回值必須是:Task、Task、ValueTask。緊接著定義 SessionControlGrain來實(shí)現(xiàn) ISessionControlGrain接口。
public class SessionControlGrain : Grain, ISessionControlGrain { private List<string> LoginUsers { get; set; } = new List<string>(); public Task Login(string userId) { //獲取當(dāng)前Grain的身份標(biāo)識(shí)(因?yàn)镮SessionControlGrain身份標(biāo)識(shí)為string類型,GetPrimaryKeyString()); var appName = this.GetPrimaryKeyString(); LoginUsers.Add(userId); Console.WriteLine($"Current active users count of {appName} is {LoginUsers.Count}"); return Task.CompletedTask; } public Task Logout(string userId) { //獲取當(dāng)前Grain的身份標(biāo)識(shí) var appName = this.GetPrimaryKey(); LoginUsers.Remove(userId); Console.WriteLine($"Current active users count of {appName} is {LoginUsers.Count}"); return Task.CompletedTask; } public Task<int> GetActiveUserCount() { return Task.FromResult(LoginUsers.Count); } }實(shí)現(xiàn)也很簡單,Grain的實(shí)現(xiàn)要繼承自 Grain基類。代碼中我們定義了一個(gè) List<string>集合用于保存登錄用戶。
4. 第一個(gè)Silo Host(Server)
定義一個(gè)Silo用于暴露Grain提供的服務(wù),在 Orleans.Server.Program中添加以下代碼用于啟動(dòng)Silo Host。
static Task Main(string[] args) { Console.Title = typeof(Program).Namespace; // define the cluster configuration return Host.CreateDefaultBuilder() .UseOrleans((builder) => { builder.UseLocalhostClustering() .AddMemoryGrainStorageAsDefault() .Configure<ClusterOptions>(options => { options.ClusterId = "Hello.Orleans"; options.ServiceId = "Hello.Orleans"; }) .Configure<EndpointOptions>(options => options.AdvertisedIPAddress = IPAddress.Loopback) .ConfigureApplicationParts(parts => parts.AddApplicationPart(typeof(ISessionControlGrain).Assembly).WithReferences()); } ) .ConfigureServices(services => { services.Configure<ConsoleLifetimeOptions>(options => { options.SuppressStatusMessages = true; }); }) .ConfigureLogging(builder => { builder.AddConsole(); }) .RunConsoleAsync(); }Host.CreateDefaultBuilder():創(chuàng)建泛型主機(jī)提供宿主環(huán)境。
UseOrleans:用來配置Oleans。
UseLocalhostClustering()?:用于在開發(fā)環(huán)境下指定連接到本地集群。
Configure<ClusterOptions>:用于指定連接到那個(gè)集群。
Configure<EndpointOptions>:用于配置silo與silo、silo與client之間的通信端點(diǎn)。開發(fā)環(huán)境下可僅指定回環(huán)地址作為集群間通信的IP地址。
ConfigureApplicationParts():用于指定暴露哪些Grain服務(wù)。
以上就是開發(fā)環(huán)境下,Orleans Server的基本配置。對(duì)于詳細(xì)的配置也可以先參考Orleans Server Configuration。后續(xù)也會(huì)有專門的一篇文章來詳解。
5. 第一個(gè)Client
客戶端的定義也很簡單,主要是創(chuàng)建 IClusterClient對(duì)象建立于Orleans Server的連接。因?yàn)?IClusterClient最好能在程序啟動(dòng)之時(shí)就建立連接,所以可以通過繼承 IHostedService來實(shí)現(xiàn)。在 Orleans.Client中定義 ClusterClientHostedService繼承自 IHostedService。
public class ClusterClientHostedService : IHostedService { public IClusterClient Client { get; } private readonly ILogger<ClusterClientHostedService> _logger; public ClusterClientHostedService(ILogger<ClusterClientHostedService> logger, ILoggerProvider loggerProvider) { _logger = logger; Client = new ClientBuilder() .UseLocalhostClustering() .Configure<ClusterOptions>(options => { options.ClusterId = "Hello.Orleans"; options.ServiceId = "Hello.Orleans"; }) .ConfigureLogging(builder => builder.AddProvider(loggerProvider)) .Build(); } public Task StartAsync(CancellationToken cancellationToken) { var attempt = 0; var maxAttempts = 100; var delay = TimeSpan.FromSeconds(1); return Client.Connect(async error => { if (cancellationToken.IsCancellationRequested) { return false; } if (++attempt < maxAttempts) { _logger.LogWarning(error, "Failed to connect to Orleans cluster on attempt {@Attempt} of {@MaxAttempts}.", attempt, maxAttempts); try { await Task.Delay(delay, cancellationToken); } catch (OperationCanceledException) { return false; } return true; } else { _logger.LogError(error, "Failed to connect to Orleans cluster on attempt {@Attempt} of {@MaxAttempts}.", attempt, maxAttempts); return false; } }); } public async Task StopAsync(CancellationToken cancellationToken) { try { await Client.Close(); } catch (OrleansException error) { _logger.LogWarning(error, "Error while gracefully disconnecting from Orleans cluster. Will ignore and continue to shutdown."); } } }代碼講解:
1.構(gòu)造函數(shù)中通過借助?ClientBuilder()?來初始化?IClusterClient。其中?UseLocalhostClustering()用于連接到開發(fā)環(huán)境中的localhost 集群。并通過?Configure<ClusterOptions>指定連接到哪個(gè)集群。(需要注意的是,這里的ClusterId必須與Orleans.Server中配置的保持一致。
Client = new ClientBuilder() .UseLocalhostClustering() .Configure<ClusterOptions>(options => { options.ClusterId = "Hello.Orleans"; options.ServiceId = "Hello.Orleans"; }) .ConfigureLogging(builder => builder.AddProvider(loggerProvider)) .Build();2. 在?StartAsync方法中通過調(diào)用?Client.Connect建立與Orleans Server的連接。同時(shí)定義了一個(gè)重試機(jī)制。
緊接著我們需要將 ClusterClientHostedService添加到Ioc容器,添加以下代碼到 Orleans.Client.Program中:
static Task Main(string[] args) { Console.Title = typeof(Program).Namespace; return Host.CreateDefaultBuilder() .ConfigureServices(services => { services.AddSingleton<ClusterClientHostedService>(); services.AddSingleton<IHostedService>(_ => _.GetService<ClusterClientHostedService>()); services.AddSingleton(_ => _.GetService<ClusterClientHostedService>().Client); services.AddHostedService<HelloOrleansClientHostedService>(); services.Configure<ConsoleLifetimeOptions>(options => { options.SuppressStatusMessages = true; }); }) .ConfigureLogging(builder => { builder.AddConsole(); }) .RunConsoleAsync(); }對(duì)于 ClusterClientHostedService,并沒有選擇直接通過 services.AddHostedService<T>的方式注入,是因?yàn)槲覀冃枰⑷朐摲?wù)中提供的 IClusterClient(單例),以供其他類去消費(fèi)。
緊接著,定義一個(gè) HelloOrleansClientHostedService用來消費(fèi)定義的 ISessionControlGrain。
public class HelloOrleansClientHostedService : IHostedService { private readonly IClusterClient _client; private readonly ILogger<HelloOrleansClientHostedService> _logger; public HelloOrleansClientHostedService(IClusterClient client, ILogger<HelloOrleansClientHostedService> logger) { _client = client; _logger = logger; } public async Task StartAsync(CancellationToken cancellationToken) { // 模擬控制臺(tái)終端用戶登錄 await MockLogin("Hello.Orleans.Console"); // 模擬網(wǎng)頁終端用戶登錄 await MockLogin("Hello.Orleans.Web"); } /// <summary> /// 模擬指定應(yīng)用的登錄 /// </summary> /// <param name="appName"></param> /// <returns></returns> public async Task MockLogin(string appName) { //假設(shè)我們需要支持不同端登錄用戶,則只需要將項(xiàng)目名稱作為身份標(biāo)識(shí)。 //即可獲取一個(gè)代表用來維護(hù)當(dāng)前項(xiàng)目登錄狀態(tài)的的單例Grain。 var sessionControl = _client.GetGrain<ISessionControlGrain>(appName); ParallelLoopResult result = Parallel.For(0, 10000, (index) => { var userId = $"User-{index}"; sessionControl.Login(userId); }); if (result.IsCompleted) { //ParallelLoopResult.IsCompleted 只是返回所有循環(huán)創(chuàng)建完畢,并不保證循環(huán)的內(nèi)部任務(wù)創(chuàng)建并執(zhí)行完畢 //所以,此處手動(dòng)延遲5秒后再去讀取活動(dòng)用戶數(shù)。 await Task.Delay(TimeSpan.FromSeconds(5)); var activeUserCount = await sessionControl.GetActiveUserCount(); _logger.LogInformation($"The Active Users Count of {appName} is {activeUserCount}"); } } public Task StopAsync(CancellationToken cancellationToken) { _logger.LogInformation("Closed!"); return Task.CompletedTask; ; } }代碼講解:這里定義了一個(gè) MockLogin用于模擬不同終端10000個(gè)用戶的并發(fā)登錄。
通過構(gòu)造函數(shù)注入需要的?IClusterClient。
通過指定Grain接口以及身份標(biāo)識(shí),就可以通過Client 獲取對(duì)應(yīng)的Grain,進(jìn)而消費(fèi)Grain中暴露的方法。?varsessionControl=_client.GetGrain<ISessionControlGrain>(appName);?這里需要注意的是,指定的身份標(biāo)識(shí)為終端應(yīng)用的名稱,那么在整個(gè)應(yīng)用生命周期內(nèi),將有且僅有一個(gè)代表這個(gè)終端應(yīng)用的Grain。
使用?Parallel.For?模擬并發(fā)
ParallelLoopResult.IsCompleted?只是返回所有循環(huán)任務(wù)創(chuàng)建完畢,并不代表循環(huán)的內(nèi)部任務(wù)執(zhí)行完畢。
6. 啟動(dòng)第一個(gè) Orleans 應(yīng)用
先啟動(dòng) Orleans.Server:再啟動(dòng) Orleans.Client:
從上面的運(yùn)行結(jié)果來看,模擬兩個(gè)終端10000個(gè)用戶的并發(fā)登錄,最終輸出的活動(dòng)用戶數(shù)量均為10000個(gè)。回顧整個(gè)實(shí)現(xiàn),并沒有用到諸如鎖、并發(fā)集合等避免并發(fā)導(dǎo)致的線程安全問題,但卻輸出正確的期望結(jié)果,這就正好說明了Orleans強(qiáng)大的并發(fā)控制特性。
public class SessionControlGrain : Grain, ISessionControlGrain { // 未使用并發(fā)集合 private List<string> LoginUsers { get; set; } = new List<string>(); public Task Login(string userId) { //獲取當(dāng)前Grain的身份標(biāo)識(shí)(因?yàn)镮SessionControlGrain身份標(biāo)識(shí)為string類型,GetPrimaryKeyString()); var appName = this.GetPrimaryKeyString(); LoginUsers.Add(userId);//未加鎖 Console.WriteLine($"Current active users count of {appName} is {LoginUsers.Count}"); return Task.CompletedTask; } .... }7. 小結(jié)
通過簡單的演示,想必你對(duì)Orleans的編程實(shí)現(xiàn)有了基本的認(rèn)知,并體會(huì)到其并發(fā)控制的強(qiáng)大之處。這只是簡單的入門演練,Orleans很多強(qiáng)大的特性,后續(xù)再結(jié)合具體場(chǎng)景進(jìn)行詳細(xì)闡述。源碼已上傳至GitHub:Hello.Orleans
總結(jié)
以上是生活随笔為你收集整理的Orleans 知多少 | 3. Hello Orleans的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: .NET如何将字符串分隔为字符
- 下一篇: .NET Core 3.0之深入源码理解