Orleans解决并发之痛(四):Streams
Orleans 提供了 Stream擴展編程模型。此模型提供了一套API,使處理流更簡單和更健壯。Stream默認提供了兩種Provider,不同的流類型可能使用不同的Provider來處理,Simple Message Stream Provider 和 Azure Queue Stream Provider。Stream Providers兼容現有的隊列技術,比如: Event Hubs、ServiceBus、Azure Queues、Apache Kafka,不再需要編寫額外的代碼來配合這些隊列技術的使用。
關于為什么Orleans會提供Stream擴展編程模型?
當今已經有一系列技術可以來構建一個流處理系統。包括持久存儲流數據方面,如:Event Hubs、Kafka;數據流計算操作方面,如: Azure Stream Analytics、Apache Storm、Apache Spark Streaming, 而這些技術并不適合細粒度的自由格式的流數據計算, 或者支持的并不好,因為實際情況下可能需要對不同的數據流執行不同的操作,Orleans Streams目的就是解決這類問題,Stream編程模型和發布訂閱模式挺相似。
上述提到的一些技術我并沒有詳細學習,后面會了解并對比,如果已熟悉的可以先思考并給我普及普及。
Orleans Stream大概實現的步驟如下:
獲取 StreamProvider
獲取 IAsyncStream<T>
訂閱者訂閱一個Stream
發布者向某個Stream發布消息
Silo配置文件OrleansConfiguration.xml修改
在Globals節點中添加:
<StorageProviders><Provider Type="Orleans.Storage.MemoryStorage" Name="PubSubStore" /> </StorageProviders> <StreamProviders><Provider Type="Orleans.Providers.Streams.SimpleMessageStream.SimpleMessageStreamProvider" Name="SMSProvider"/> </StreamProviders>Name為PubSubStore的StorageProvider是必須的,Stream內部需要它來跟蹤所有流訂閱,記錄各個流的發布者和訂閱者的關系,本例中使用MemoryStorage,實際生產環境這是不對的。
Name為SMSProvider的StreamProvider指定了消息的發布形式,Orleans當前提供的兩種StreamProvider:Simple Message Stream Provider 和 Azure Queue Stream Provider 都是可靠的。
Simple Message Stream Provider:不保證可靠的交付,失敗的消息不會自動重新發送,但可以根據返回的Task狀態來判斷是否重新發送,事件執行順序遵循FIFO原則。
Azure Queue Stream Provider:事件被加入Azure Queue, 如果傳送或處理失敗,事件不會從隊列中刪除,并且稍后會自動重新被發送,因此事件執行順序不遵循FIFO原則。
獲取 StreamProvider
var streamProvider = this.GetStreamProvider("SMSProvider");SMSProvider 對應配置文件中Name為SMSProvider的StreamProvider
獲取 IAsyncStream<T>
var streamId = this.GetPrimaryKey(); var stream = streamProvider.GetStream<string>(streamId, "GrainStream");GetStream 需要兩個參數,通過兩個值定位唯一的Stream:
streamId:Guid類型,stream標識
streamNamespace:字符串,stream的命名空間
訂閱一個Stream
訂閱Stream分為隱式和顯式訂閱。
隱式訂閱
隱式訂閱的訂閱者是唯一的,不存在對一個Stream的多次訂閱,也不能取消訂閱。
Interface:
public interface IImplicitSubscriberGrain : IGrainWithGuidKey { }Grain:
[ImplicitStreamSubscription("GrainImplicitStream")] public class ImplicitSubscriberGrain : Grain, IImplicitSubscriberGrain, IAsyncObserver<string> {protected StreamSubscriptionHandle<string> streamHandle;public override async Task OnActivateAsync(){var streamId = this.GetPrimaryKey();var streamProvider = this.GetStreamProvider("SMSProvider");var stream = streamProvider.GetStream<string>(streamId, "GrainImplicitStream");streamHandle = await stream.SubscribeAsync(OnNextAsync);}public override async Task OnDeactivateAsync(){if (streamHandle != null)await streamHandle.UnsubscribeAsync();}public Task OnCompletedAsync(){return Task.CompletedTask;}public Task OnErrorAsync(Exception ex){return Task.CompletedTask;}public Task OnNextAsync(string item, StreamSequenceToken token = null){Console.WriteLine($"Received message:{item}");return Task.CompletedTask;} }在Grain上標記 ImplicitStreamSubscription 屬性,變量值為命名空間;
在Grain的OnActivateAsync方法體中調用SubscribeAsync;
實現IAsyncObserver接口,當發布者向Stream發送消息,訂閱者接到消息后將執行OnNextAsync;
隱式訂閱模式訂閱者自動由發布者創建;
顯式訂閱
Interface:
public interface IExplicitSubscriberGrain : IGrainWithGuidKey {Task<StreamSubscriptionHandle<string>> SubscribeAsync();Task ReceivedMessageAsync(string data); }Grain:
public class ExplicitSubscriberGrain : Grain, IExplicitSubscriberGrain {private IAsyncStream<string> stream;public async override Task OnActivateAsync(){var streamProvider = this.GetStreamProvider("SMSProvider");stream = streamProvider.GetStream<string>(this.GetPrimaryKey(), "GrainExplicitStream");var subscriptionHandles = await stream.GetAllSubscriptionHandles();if (subscriptionHandles.Count > 0){subscriptionHandles.ToList().ForEach(async x =>{await x.ResumeAsync((payload, token) => this.ReceivedMessageAsync(payload));});}}public async Task<StreamSubscriptionHandle<string>> SubscribeAsync(){return await stream.SubscribeAsync((payload, token) => this.ReceivedMessageAsync(payload));}public Task ReceivedMessageAsync(string data){Console.WriteLine($"Received message:{data}");return Task.CompletedTask;} }訂閱者通過調用SubscribeAsync方法完成訂閱,并返回StreamSubscriptionHandle,這個對象提供了UnsubscribeAsync方法,方便取消訂閱;
本例子中支持對同一個Stream被訂閱多次,被訂閱多次的結果是當向這個Stream發送消息的時候,ReceivedMessageAsync會執行多次。如果不希望對同一個Stream定義多次,在SubscribeAsync方法中可以通過GetAllSubscriptionHandles獲取當前訂閱者的個數,只有為0才執行訂閱;
訂閱者是一直存在的,除了被顯示調用了UnsubscribeAsync方法。在OnActivateAsync中我們加入了ResumeAsync操作, 當Grain由未激活狀態變為激活狀態的時候,通過GetAllSubscriptionHandles獲取這個Stream中存在的訂閱者,通過ResumeAsync可以把它們重新喚醒。(模擬方式:殺掉Silo,重新啟動即可,不過前提條件是PubSubStore不能使用MemoryStorage,因為使用MemoryStorage存儲一旦重啟后訂閱者和發布者的關系都會丟失)
發布消息
Interface:
public interface IPublisherGrain: IGrainWithGuidKey {Task PublishMessageAsync(string data); }Grain:
public class PublisherGrain : Grain, IPublisherGrain {private IAsyncStream<string> stream;public override Task OnActivateAsync(){var streamId = this.GetPrimaryKey();var streamProvider = this.GetStreamProvider("SMSProvider");this.stream = streamProvider.GetStream<string>(streamId, "GrainExplicitStream"); //隱式:GrainImplicitStreamreturn base.OnActivateAsync();}public async Task PublishMessageAsync(string data){Console.WriteLine($"Sending data: {data}");await this.stream.OnNextAsync(data);} }通過調用IAsyncStream的OnNextAsync發布消息即可。這里可以針對返回的Task狀態再作一些操作,如果不成功,重新發送或記錄日志等。
Client發布消息:
客戶端發布消息:
while (true) {Console.WriteLine("Press 'exit' to exit...");var input = Console.ReadLine();if (input == "exit") break;var publisherGrain = GrainClient.GrainFactory.GetGrain<IPublisherGrain>(Guid.Empty);publisherGrain.PublishMessageAsync(input); }發布消息
顯示訂閱下,需要增加另一個客戶端先完成訂閱:
var subscriberGrain = GrainClient.GrainFactory.GetGrain<IExplicitSubscriberGrain>(Guid.Empty); var streamHandle = subscriberGrain.SubscribeAsync().Result; Console.WriteLine("Press enter to exit..."); Console.ReadLine(); streamHandle.UnsubscribeAsync();顯示訂閱下發布消息
參考鏈接:
Actor模型
Orleans
案例Demo-OrleansStreams
相關文章:?
.NET的Actor模型:Orleans
微軟分布式云計算框架Orleans(1):Hello World
微軟分布式云計算框架Orleans(2):容災與集群(1)
Aaron Stannard談Akka.NET 1.1
使用Akka.net開發第一個分布式應用
Orleans入門例子
Orleans例子再進一步
Orleans稍微復雜的例子—互動
Orleans簡單配置
Orleans配置---持久化
Orleans—一些概念
Orleans的集群構建
Oleans集群之Consul再解釋
Orleans解決并發之痛(一):單線程
Orleans解決并發之痛(二):Grain狀態
Orleans解決并發之痛(三):集群
原文地址:http://www.jianshu.com/p/5f150b5a77e0
.NET社區新聞,深度好文,微信中搜索dotNET跨平臺或掃描二維碼關注
總結
以上是生活随笔為你收集整理的Orleans解决并发之痛(四):Streams的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2017(深圳) .NET技术分享交流会
- 下一篇: .NET Core 2.0应用程序大小减