《ASP.NET Core 微服务实战》-- 读书笔记(第11章)
第 11 章 開發實時應用和服務
在本章,我們將討論“實時”的準確含義,以及在大部分消費者看來應該屬于這一范疇的應用類型
接著,我們將探討 WebSocket,并分析為什么傳統的 WebSocket 與云環境完全不相適應,最后我們將構建一個實時應用的示例,用于展示向一個事件溯源系統添加實時消息的強大功能
實時應用的定義
我認為,實時系統的定義可以稍微寬泛一點,只要是事件的接收與處理過程之間只有少許延遲,或者完全沒有延遲都可以認為是實時系統
下面是真正的實時系統中區分出非實時系統的幾個特點:
應用收集輸入數據后,在生成輸出前,有明顯的等待
應用只按照固定間隔或者基于某種按計劃或隨機觸發的外部信號生成輸出
實時系統有一個真正常見的跡象和特征,即當相關方關注的事件發生時,它們會收到推送通知,而不是由相關方以掛起等待或者間隔查詢的方式來檢查新狀態
云環境中的 WebSocket
WebSocket 協議
WebSocket 協議始于 2008 年,它定義了瀏覽器和服務器之間建立持久的雙向 Socket 連接的標準
這讓服務器向運行于瀏覽器中的 Web 應用發送數據稱為可能,期間不需要由 Web 應用執行“輪詢”
在底層實現中,瀏覽器向服務器請求連接進行升級
握手完成后,瀏覽器和服務器將切換為單獨的二進制 TCP 連接,以實現雙向通信
部署模式
假如所有服務器都運行在亞馬遜云的彈性計算服務環境中
當虛擬機被托管在云基礎設施中時,它們就可能隨時被搬移、銷毀并重建
這原本是一件好事,旨在讓應用近乎不受限制地伸縮
不過,這也意味著這種“實時” WebSocket 連接可能被切斷或者嚴重延遲,并在不知不覺中失去響應
此處的解決方案通常是將對 WebSocket 的使用獨立出去--把管理 WebSocket 連接和數據傳輸工作轉移到應用的代碼之外的位置
簡單地說,相比于在自己的應用中管理 WebSocket,我們應該選用一種基于云的消息服務,讓更專業的人來完成這項工作
使用云消息服務
我們的應用需要擁有實時通信的能力
我們希望微服務能夠向客戶端推送數據,但客戶端無法建立到微服務的持續 TCP 連接
我們還希望能夠使用相同類似的消息機制向后端服務發送消息
為讓微服務遵循云原生特性、保留可伸縮的能力,并在云環境中自由地搬移,我們需要挑選一種消息服務,把一定的實時通信能力提取到進程之外
下面列舉一些廠商,他們提供的云消息服務有的是獨立產品,有的則是大型服務套件中的一部分:
Apigee (API 網關與實時消息通信)
PubNub (實時消息通信與活躍度監控)
Pusher(實時消息通信活躍度監控)
Kaazing(實時消息通信)
Mashery(API 網關與實時消息通信)
Google (Google 云消息通信)
ASP.NET SinglR (Azure 托管的實時消息通信服務)
Amazon (簡單通知服務)
無論選擇哪種機制,我們都應該投入一定的時間讓代碼與具體的消息服務相隔離,從而在更換服務商時,不至于產生太大的影響
開發位置接近監控服務
現在,我們要做的就是開發一個每當后端系統檢測到接近事件時,就能夠實時更新的監視器
我們可以生成一張地圖,在上面繪出兩個團隊成員的位置,當系統檢測到他們相互接近時,就讓他們的頭像跳動,或者生成一個動畫
這些團隊成員的移動設備可能還會在同一時刻收到通知
創建接近監控服務
我們的示例監控服務將包含一系列不同的組件
首先,我們需要消費由第 6 章編寫的服務生成并放入隊列的 ProximityDetectedEvent 事件
此后,我們要提取事件中的原始信息,調用團隊服務以獲取可供用戶讀取識別的信息
獲取這些補充信息后,最后要在實時消息系統上發出一條消息
GitHub鏈接:https://github.com/microservices-aspnetcore/es-proximitymonitor
以下是我們接近監控服務背后的上層協調邏輯
using System; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using StatlerWaldorfCorp.ProximityMonitor.Queues; using StatlerWaldorfCorp.ProximityMonitor.Realtime; using StatlerWaldorfCorp.ProximityMonitor.TeamService;namespace StatlerWaldorfCorp.ProximityMonitor.Events {public class ProximityDetectedEventProcessor : IEventProcessor{private ILogger logger;private IRealtimePublisher publisher;private IEventSubscriber subscriber;private PubnubOptions pubnubOptions;public ProximityDetectedEventProcessor(ILogger<ProximityDetectedEventProcessor> logger,IRealtimePublisher publisher,IEventSubscriber subscriber,ITeamServiceClient teamClient,IOptions<PubnubOptions> pubnubOptions){this.logger = logger;this.pubnubOptions = pubnubOptions.Value;this.publisher = publisher;this.subscriber = subscriber;logger.LogInformation("Created Proximity Event Processor.");subscriber.ProximityDetectedEventReceived += (pde) => {Team t = teamClient.GetTeam(pde.TeamID);Member sourceMember = teamClient.GetMember(pde.TeamID, pde.SourceMemberID);Member targetMember = teamClient.GetMember(pde.TeamID, pde.TargetMemberID);ProximityDetectedRealtimeEvent outEvent = new ProximityDetectedRealtimeEvent{TargetMemberID = pde.TargetMemberID,SourceMemberID = pde.SourceMemberID,DetectionTime = pde.DetectionTime,SourceMemberLocation = pde.SourceMemberLocation,TargetMemberLocation = pde.TargetMemberLocation,MemberDistance = pde.MemberDistance,TeamID = pde.TeamID,TeamName = t.Name,SourceMemberName = $"{sourceMember.FirstName} {sourceMember.LastName}",TargetMemberName = $"{targetMember.FirstName} {targetMember.LastName}"};publisher.Publish(this.pubnubOptions.ProximityEventChannel, outEvent.toJson());};}public void Start(){subscriber.Subscribe();}public void Stop(){subscriber.Unsubscribe();}} }在這個代碼清單中,首先要注意的是從 DI 向構造函數注入的一連串依賴:
日志記錄工具
實時事件發布器
事件訂閱器
團隊服務客戶端
PubNub 選項
創建實時事件發布器類實現類
using Microsoft.Extensions.Logging; using PubnubApi;namespace StatlerWaldorfCorp.ProximityMonitor.Realtime {public class PubnubRealtimePublisher : IRealtimePublisher{private ILogger logger;private Pubnub pubnubClient;public PubnubRealtimePublisher(ILogger<PubnubRealtimePublisher> logger,Pubnub pubnubClient){logger.LogInformation("Realtime Publisher (Pubnub) Created.");this.logger = logger;this.pubnubClient = pubnubClient;}public void Validate(){pubnubClient.Time().Async(new PNTimeResultExt((result, status) => {if (status.Error) {logger.LogError($"Unable to connect to Pubnub {status.ErrorData.Information}");throw status.ErrorData.Throwable;} else {logger.LogInformation("Pubnub connection established.");}}));}public void Publish(string channelName, string message){pubnubClient.Publish().Channel(channelName).Message(message).Async(new PNPublishResultExt((result, status) => {if (status.Error) {logger.LogError($"Failed to publish on channel {channelName}: {status.ErrorData.Information}");} else {logger.LogInformation($"Published message on channel {channelName}, {status.AffectedChannels.Count} affected channels, code: {status.StatusCode}");}}));}} }注入實時通信類
在 Startup 類中配置 DI 來提供 PubNub 客戶端和其他相關類
using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using StatlerWaldorfCorp.ProximityMonitor.Queues; using StatlerWaldorfCorp.ProximityMonitor.Realtime; using RabbitMQ.Client.Events; using StatlerWaldorfCorp.ProximityMonitor.Events; using Microsoft.Extensions.Options; using RabbitMQ.Client; using StatlerWaldorfCorp.ProximityMonitor.TeamService;namespace StatlerWaldorfCorp.ProximityMonitor {public class Startup{public Startup(IHostingEnvironment env, ILoggerFactory loggerFactory){loggerFactory.AddConsole();loggerFactory.AddDebug();var builder = new ConfigurationBuilder().SetBasePath(env.ContentRootPath).AddJsonFile("appsettings.json", optional: false, reloadOnChange: false).AddEnvironmentVariables();Configuration = builder.Build();}public IConfigurationRoot Configuration { get; }public void ConfigureServices(IServiceCollection services){services.AddMvc();services.AddOptions();services.Configure<QueueOptions>(Configuration.GetSection("QueueOptions"));services.Configure<PubnubOptions>(Configuration.GetSection("PubnubOptions"));services.Configure<TeamServiceOptions>(Configuration.GetSection("teamservice"));services.Configure<AMQPOptions>(Configuration.GetSection("amqp"));services.AddTransient(typeof(IConnectionFactory), typeof(AMQPConnectionFactory));services.AddTransient(typeof(EventingBasicConsumer), typeof(RabbitMQEventingConsumer));services.AddSingleton(typeof(IEventSubscriber), typeof(RabbitMQEventSubscriber));services.AddSingleton(typeof(IEventProcessor), typeof(ProximityDetectedEventProcessor));services.AddTransient(typeof(ITeamServiceClient),typeof(HttpTeamServiceClient));services.AddRealtimeService();services.AddSingleton(typeof(IRealtimePublisher), typeof(PubnubRealtimePublisher));}// Singletons are lazy instantiation.. so if we don't ask for an instance during startup,// they'll never get used.public void Configure(IApplicationBuilder app,IHostingEnvironment env,ILoggerFactory loggerFactory,IEventProcessor eventProcessor,IOptions<PubnubOptions> pubnubOptions,IRealtimePublisher realtimePublisher){realtimePublisher.Validate();realtimePublisher.Publish(pubnubOptions.Value.StartupChannel, "{'hello': 'world'}");eventProcessor.Start();app.UseMvc();}} }我們嘗試為類提供預先創建好的 PubNub API 實例
為整潔地實現這一功能,并繼續以注入方式獲取配置信息,包括 API 密鑰,我們需要向 DI 中注冊一個工廠
工廠類的職責是向外提供裝配完成的 PubNub 實例
using System; using Microsoft.Extensions.Options; using PubnubApi; using System.Linq; using Microsoft.Extensions.Logging;namespace StatlerWaldorfCorp.ProximityMonitor.Realtime {public class PubnubFactory{private PNConfiguration pnConfiguration;private ILogger logger;public PubnubFactory(IOptions<PubnubOptions> pubnubOptions,ILogger<PubnubFactory> logger){this.logger = logger;pnConfiguration = new PNConfiguration();pnConfiguration.PublishKey = pubnubOptions.Value.PublishKey;pnConfiguration.SubscribeKey = pubnubOptions.Value.SubscribeKey;pnConfiguration.Secure = false;logger.LogInformation($"Pubnub Factory using publish key {pnConfiguration.PublishKey}");}public Pubnub CreateInstance(){return new Pubnub(pnConfiguration);}} }將工廠注冊到 DI 時使用的擴展方法機制
using System; using Microsoft.Extensions.DependencyInjection; using PubnubApi;namespace StatlerWaldorfCorp.ProximityMonitor.Realtime {public static class RealtimeServiceCollectionExtensions{public static IServiceCollection AddRealtimeService(this IServiceCollection services){services.AddTransient<PubnubFactory>();return AddInternal(services, p => p.GetRequiredService<PubnubFactory>(), ServiceLifetime.Singleton);}private static IServiceCollection AddInternal(this IServiceCollection collection,Func<IServiceProvider, PubnubFactory> factoryProvider,ServiceLifetime lifetime){Func<IServiceProvider, object> factoryFunc = provider =>{var factory = factoryProvider(provider);return factory.CreateInstance();};var descriptor = new ServiceDescriptor(typeof(Pubnub), factoryFunc, lifetime);collection.Add(descriptor);return collection;}} }上面代碼的關鍵功能是創建了一個 lambda 函數,接收 IServiceProvider 作為輸入,并返回一個對象作為輸出
它正是我們注冊工廠時向服務描述對象中傳入的工廠方法
匯總所有設計
要立即查看效果,從而確保一切工作正常,我們可模擬由第 6 章的服務輸出的信息
只需要手動向 proximitydetected 隊列中放入表示 ProximityDetectedEvent 對象的 JSON 字符串
在這個過程中,如果我們的監控服務處于運行之中、訂閱了隊列,而且團隊服務處于運行之中、擁有正確的數據,那么接近監控服務將取出事件、補充必要的數據,并通過 PubNub 發送一個實時事件
利用 PubNub 調試控制臺,我們可以立即看到這一處理過程生成的輸出
為實時接近監控服務創建界面
為簡化工作,同時掩蓋我缺乏藝術細胞的真相,我將用一個不包含圖形元素的簡單 HTML 頁面,它不需要托管在專門的 Web 服務器上
它實時地監聽接近事件,并將攜帶的信息動態添加到新的 div 元素中
realtimetest.html
<html><head><title>RT page sample</title><script src="https://cdn.pubnub.com/sdk/javascript/pubnub.4.4.0.js"></script><script>var pubnub = new PubNub({subscribeKey: "yoursubkey",publishKey: "yourprivatekey",ssl: true});pubnub.addListener({message: function(m) {// handle messagevar channelName = m.channel; // The channel for which the message belongsvar channelGroup = m.subscription; // The channel group or wildcard subscription match (if exists)var pubTT = m.timetoken; // Publish timetokenvar msg = JSON.parse(m.message); // The Payloadconsole.log("New Message!!", msg);var newDiv = document.createElement('div')var newStr = "** (" + msg.TeamName + ") " + msg.SourceMemberName + " moved within " + msg.MemberDistance + "km of " + msg.TargetMemberName;newDiv.innerHTML = newStrvar oldDiv = document.getElementById('chatLog')oldDiv.appendChild(newDiv)},presence: function(p) {// handle presencevar action = p.action; // Can be join, leave, state-change or timeoutvar channelName = p.channel; // The channel for which the message belongsvar occupancy = p.occupancy; // No. of users connected with the channelvar state = p.state; // User Statevar channelGroup = p.subscription; // The channel group or wildcard subscription match (if exists)var publishTime = p.timestamp; // Publish timetokenvar timetoken = p.timetoken; // Current timetokenvar uuid = p.uuid; // UUIDs of users who are connected with the channel},status: function(s) {// handle status}});console.log("Subscribing..");pubnub.subscribe({channels: ['proximityevents']});</script></head><body><h1>Proximity Monitor</h1><p>Proximity Events listed below.</p><div id="chatLog"></div></body> </html>值得指出的是,這個文件并不需要托管在服務器上
在任何瀏覽器中打開,其中的 JavaScript 都可以運行
總結
以上是生活随笔為你收集整理的《ASP.NET Core 微服务实战》-- 读书笔记(第11章)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Magicodes.IE 2.0发布
- 下一篇: 【在路上5】实时计算助力派件管控