《ASP.NET Core 微服务实战》-- 读书笔记(第6章)
第 6 章 事件溯源與 CQRS
在本章,我們來了解一下隨著云平臺一同出現的設計模式
我們先探討事件溯源和命令查詢職責分離(CQRS)背后的動機與哲學
事件溯源簡介
事實由事件溯源而來
我們大腦就是一種事件溯源系統,接收感官多種形式刺激,大腦負責對這些刺激進行合適排序,大約每隔幾百毫秒,對刺激構成的流進行運算,而運算的結果,就是我們所說的事實
事件溯源的定義
傳統應用中,狀態由一系列零散的數據所管理,如果客戶端向我們發送 PUT 或 POST 請求,狀態就會改變
這種方式很好地給出了系統當前狀態,卻不能指示在當前狀態之前,系統是如何變化的
事件溯源可以解決這個問題,因為它把狀態管理的職責與接收導致狀態變更的刺激的職責區分開來
基于事件溯源的系統需要滿足一系列要求
有序:有序事件流
冪等:等價多個有序事件流的操作結果相同
獨立:不依賴外部信息
過去式:事件發生在過去
流行的區塊鏈技術的基礎就是發生在特定私有資源上的安全、可信的事件序列
擁抱最終一致性
一種我們每天都在用的最終一致性的應用,就是社區網絡應用
有時你從一個設備發出的評論要花幾分鐘才能展示在朋友的瀏覽器或者其他設備上
這是因為,應用的架構人員做了妥協:通過放棄同步操作的即時一致性,在可接受的范圍內增加一定的反饋延遲,就能讓應用支持巨大的規模與流量
CQRS 模式
如果把我們討論的模式直接套用到系統中,很快會發現系統必須對輸入命令和查詢加以區分,這也被稱為命令查詢職責分離(CQRS)
我們用一個例子來說明這種模式的實際應用
租戶通過一個門戶網站查看用電情況,每當用戶刷新門戶頁面時,就調用某種數據服務并請求,匯總一段時間內所有度量事件
但這種對于云規模的現代軟件開發來說是不可接受的,如果將計算職責推卸給數據庫,很快會造成數據庫瓶頸
掌握了大多數客戶的使用模式,讓我們能夠利用事件溯源來構建一個合理的 CQRS 實現。
事件處理器每次收到新事件時重新計算已緩存的度量總和
利用這種機制,在查詢時,門戶上的用戶所期望的結果已經存在于數據庫或者緩存中
不需要復制的計算,也沒有臨時的聚合與繁雜的匯總,只需要一個簡單的查詢
事件溯源于 CQRS 實戰--附件的團隊成員
接下來要開發的新版實例中,我們將檢測成員彼此相距一個較小距離的時刻
系統將支持對這些接近的結果予以響應
例如我們可能希望向附近的團隊成員的移動設備發送推送通知,以提醒他們可以約見對方
為了實現這一功能,我們把系統職責劃分為以下四個組件:
位置報送服務(命令)
事件處理器(對事件進行溯源)
事實服務(查詢)
位置接近監控器(對事件進行溯源)
位置報送服務
收到新報送的位置后,執行下列操作:
驗證上報數據
將命令轉換為事件
生成事件,并用消息隊列發送出去
GitHub 鏈接:https://github.com/microservices-aspnetcore/es-locationreporter
創建位置報送控制器
using System; using Microsoft.AspNetCore.Mvc; using StatlerWaldorfCorp.LocationReporter.Events; using StatlerWaldorfCorp.LocationReporter.Models; using StatlerWaldorfCorp.LocationReporter.Services;namespace StatlerWaldorfCorp.LocationReporter.Controllers {[Route("/api/members/{memberId}/locationreports")]public class LocationReportsController : Controller{private ICommandEventConverter converter;private IEventEmitter eventEmitter;private ITeamServiceClient teamServiceClient;public LocationReportsController(ICommandEventConverter converter,IEventEmitter eventEmitter,ITeamServiceClient teamServiceClient) {this.converter = converter;this.eventEmitter = eventEmitter;this.teamServiceClient = teamServiceClient;}[HttpPost]public ActionResult PostLocationReport(Guid memberId, [FromBody]LocationReport locationReport){MemberLocationRecordedEvent locationRecordedEvent = converter.CommandToEvent(locationReport);locationRecordedEvent.TeamID = teamServiceClient.GetTeamForMember(locationReport.MemberID);eventEmitter.EmitLocationRecordedEvent(locationRecordedEvent);return this.Created($"/api/members/{memberId}/locationreports/{locationReport.ReportID}", locationReport);}} }創建 AMQP 事件生成器
using System; using System.Linq; using System.Text; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using RabbitMQ.Client; using StatlerWaldorfCorp.LocationReporter.Models;namespace StatlerWaldorfCorp.LocationReporter.Events {public class AMQPEventEmitter : IEventEmitter{private readonly ILogger logger;private AMQPOptions rabbitOptions;private ConnectionFactory connectionFactory;public AMQPEventEmitter(ILogger<AMQPEventEmitter> logger,IOptions<AMQPOptions> amqpOptions){this.logger = logger;this.rabbitOptions = amqpOptions.Value;connectionFactory = new ConnectionFactory();connectionFactory.UserName = rabbitOptions.Username;connectionFactory.Password = rabbitOptions.Password;connectionFactory.VirtualHost = rabbitOptions.VirtualHost;connectionFactory.HostName = rabbitOptions.HostName;connectionFactory.Uri = rabbitOptions.Uri;logger.LogInformation("AMQP Event Emitter configured with URI {0}", rabbitOptions.Uri);}public const string QUEUE_LOCATIONRECORDED = "memberlocationrecorded";public void EmitLocationRecordedEvent(MemberLocationRecordedEvent locationRecordedEvent){using (IConnection conn = connectionFactory.CreateConnection()) {using (IModel channel = conn.CreateModel()) {channel.QueueDeclare(queue: QUEUE_LOCATIONRECORDED,durable: false,exclusive: false,autoDelete: false,arguments: null);string jsonPayload = locationRecordedEvent.toJson();var body = Encoding.UTF8.GetBytes(jsonPayload);channel.BasicPublish(exchange: "",routingKey: QUEUE_LOCATIONRECORDED,basicProperties: null,body: body);}}}} }配置并啟動服務
using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using System; using Microsoft.Extensions.Logging; using System.Linq; using StatlerWaldorfCorp.LocationReporter.Models; using StatlerWaldorfCorp.LocationReporter.Events; using StatlerWaldorfCorp.LocationReporter.Services;namespace StatlerWaldorfCorp.LocationReporter {public class Startup{public Startup(IHostingEnvironment env, ILoggerFactory loggerFactory){loggerFactory.AddConsole();loggerFactory.AddDebug();var builder = new ConfigurationBuilder().SetBasePath(env.ContentRootPath).AddJsonFile("appsettings.json", optional: false, reloadOnChange: false).AddEnvironmentVariables();Configuration = builder.Build();}public IConfigurationRoot Configuration { get; }public void ConfigureServices(IServiceCollection services){services.AddMvc();services.AddOptions();services.Configure<AMQPOptions>(Configuration.GetSection("amqp"));services.Configure<TeamServiceOptions>(Configuration.GetSection("teamservice"));services.AddSingleton(typeof(IEventEmitter), typeof(AMQPEventEmitter));services.AddSingleton(typeof(ICommandEventConverter), typeof(CommandEventConverter));services.AddSingleton(typeof(ITeamServiceClient), typeof(HttpTeamServiceClient));}public void Configure(IApplicationBuilder app,IHostingEnvironment env,ILoggerFactory loggerFactory,ITeamServiceClient teamServiceClient,IEventEmitter eventEmitter){// Asked for instances of singletons during Startup// to force initialization early.app.UseMvc();}} }對 Configure 的兩次調用讓配置子系統把分別從 amqp 和 teamservice 節加載的配置選項以依賴注入的方式提供出來
這些配置可以由 appsettings.json 文件提供,也可以用環境變量覆蓋
{"amqp": {"username": "guest","password": "guest","hostname": "localhost","uri": "amqp://localhost:5672/","virtualhost": "/"},"teamservice": {"url": "http://localhost:5001"} }消費團隊服務
using System; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System.Linq; using System.Net.Http; using System.Net.Http.Headers; using Newtonsoft.Json; using StatlerWaldorfCorp.LocationReporter.Models;namespace StatlerWaldorfCorp.LocationReporter.Services {public class HttpTeamServiceClient : ITeamServiceClient{private readonly ILogger logger;private HttpClient httpClient;public HttpTeamServiceClient(IOptions<TeamServiceOptions> serviceOptions,ILogger<HttpTeamServiceClient> logger){this.logger = logger;var url = serviceOptions.Value.Url;logger.LogInformation("Team Service HTTP client using URL {0}", url);httpClient = new HttpClient();httpClient.BaseAddress = new Uri(url);}public Guid GetTeamForMember(Guid memberId){httpClient.DefaultRequestHeaders.Accept.Clear();httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));HttpResponseMessage response = httpClient.GetAsync(String.Format("/members/{0}/team", memberId)).Result;TeamIDResponse teamIdResponse;if (response.IsSuccessStatusCode) {string json = response.Content.ReadAsStringAsync().Result;teamIdResponse = JsonConvert.DeserializeObject<TeamIDResponse>(json);return teamIdResponse.TeamID;}else {return Guid.Empty;}}}public class TeamIDResponse{public Guid TeamID { get; set; }} }這個例子中,我們使用 .Result 屬性在等待異步方法響應期間強行阻塞了線程
在生產級質量的代碼里,很可能對此進行重構,確保在服務邊界之內整個調用鏈都傳遞異步結果
運行位置報送服務
RabbitMQ 已經啟動運行,默認的配置也指向了本地的 RabbitMQ 實例
此時可以使用以下方式啟動位置報送服務
(確保位于 src/StatlerWaldorfCorp.LocationReporter 子目錄中)
$ dotnet restore $ dotnet build $ dotnet run --server.urls=http://0.0.0.0:9090服務運行后,只要向服務提交請求,就可以體驗其功能了
$ curl -X POST -d \ '{"reportID":"...", \ "origin":"...", "latitude":10, "longtitude":20, \ "memberID":"..."}' \ http://...le2 \ /locationreports提交完成后,應該能從服務獲得一個 HTTP 201 響應
事件處理器
它的職責是消費來自流的事件,并執行合適的操作
為確保代碼整潔、可測試,我們把事件處理的職責劃分為如下部分:
訂閱隊列并從事件流中獲取新的消息
將消息寫入事件存儲
處理事件流(檢測附近的隊友)
作為流的處理結果,生成新的消息并發送到隊列
作為流的處理結果,向事實服務的服務器 / 緩存提交狀態變更情況
GitHub 鏈接:https://github.com/microservices-aspnetcore/es-eventprocessor
檢測附近隊友的基于 GPS 工具類的檢測器
using System.Collections.Generic; using StatlerWaldorfCorp.EventProcessor.Location; using System.Linq; using System;namespace StatlerWaldorfCorp.EventProcessor.Events {public class ProximityDetector{/** This method assumes that the memberLocations collection only* applies to members applicable for proximity detection. In other words,* non-team-mates must be filtered out before using this method.* distance threshold is in Kilometers.*/public ICollection<ProximityDetectedEvent> DetectProximityEvents(MemberLocationRecordedEvent memberLocationEvent,ICollection<MemberLocation> memberLocations,double distanceThreshold){GpsUtility gpsUtility = new GpsUtility();GpsCoordinate sourceCoordinate = new GpsCoordinate() {Latitude = memberLocationEvent.Latitude,Longitude = memberLocationEvent.Longitude};return memberLocations.Where(ml => ml.MemberID != memberLocationEvent.MemberID &&gpsUtility.DistanceBetweenPoints(sourceCoordinate, ml.Location) < distanceThreshold).Select( ml => {return new ProximityDetectedEvent() {SourceMemberID = memberLocationEvent.MemberID,TargetMemberID = ml.MemberID,TeamID = memberLocationEvent.TeamID,DetectionTime = DateTime.UtcNow.Ticks,SourceMemberLocation = sourceCoordinate,TargetMemberLocation = ml.Location,MemberDistance = gpsUtility.DistanceBetweenPoints(sourceCoordinate, ml.Location)};}).ToList();}} }接著我們就可以用這個方法的結果來產生對應的額外效果,例如可能需要發出一個 ProximityDetectorEvent 事件,并將事件寫入事件存儲
作為主體的事件處理器代碼
using System; using System.Collections.Generic; using Microsoft.Extensions.Logging; using StatlerWaldorfCorp.EventProcessor.Location; using StatlerWaldorfCorp.EventProcessor.Queues;namespace StatlerWaldorfCorp.EventProcessor.Events {public class MemberLocationEventProcessor : IEventProcessor{private ILogger logger;private IEventSubscriber subscriber;private IEventEmitter eventEmitter;private ProximityDetector proximityDetector;private ILocationCache locationCache;public MemberLocationEventProcessor(ILogger<MemberLocationEventProcessor> logger,IEventSubscriber eventSubscriber,IEventEmitter eventEmitter,ILocationCache locationCache){this.logger = logger;this.subscriber = eventSubscriber;this.eventEmitter = eventEmitter;this.proximityDetector = new ProximityDetector();this.locationCache = locationCache;this.subscriber.MemberLocationRecordedEventReceived += (mlre) => {var memberLocations = locationCache.GetMemberLocations(mlre.TeamID);ICollection<ProximityDetectedEvent> proximityEvents =proximityDetector.DetectProximityEvents(mlre, memberLocations, 30.0f);foreach (var proximityEvent in proximityEvents) {eventEmitter.EmitProximityDetectedEvent(proximityEvent);}locationCache.Put(mlre.TeamID, new MemberLocation { MemberID = mlre.MemberID, Location = new GpsCoordinate {Latitude = mlre.Latitude, Longitude = mlre.Longitude} });};}public void Start(){this.subscriber.Subscribe();}public void Stop(){this.subscriber.Unsubscribe();}} }事件處理服務唯一的額外職責是需要將收到的每個事件都寫入事件存儲
這樣做到原因有很多,包括向其他服務提供可供搜索的歷史記錄
如果緩存崩潰、數據丟失、事件存儲也可用于重建事實緩存
請記住,緩存在架構里僅提供便利性,我們不應該在緩存中存儲任何無法從其他位置重建的數據
我們要給服務里每一個團隊創建一個 Redis 哈希(hash)
在哈希中,把團隊成員的位置經序列化得到的 JSON 正文存儲為字段(團隊成員的 ID 用作鍵)
這樣就能輕松地并發更新多個團隊成員地位置而不會覆蓋數據,同時也很容易查詢給定的任意團隊的位置列表,因為團隊就是一個個哈希
事實服務
事實服務負責維護每個團隊成員的位置,不過這些位置只代表最近從一些應用那里收到的位置
關于事實服務的這類服務,有兩條重要的提醒需要記住:
事實服務并不是事件存儲
事實服務是不可依賴服務
位置接近監控器
位置接近監控器的代碼包括
基本的微服務結構
一個隊列消費端,訂閱 ProximityDetectedEvent 事件到達的消息
調用一些第三方或云上的服務來發送推動通知
運行示例項目
下面列出運行本章示例的依賴項:
RabbitMQ 服務器
Redis 服務器
所有依賴項都啟動運行后,可從 GitHub 拉取 es-locationreporter 和 es-eventprocessor 兩個服務的代碼
此外需要一份 teamservice 服務
請確保獲取的是 master 分支,因為在測試期間只需要用到內存存儲
要啟動團隊服務,在命令行中轉到 src/StatlerWaldorfCorp.TeamService 目錄并運行以下命令
$ dotnet run --server.urls=http://0.0.0.:5001要啟動位置報送服務,在命令行中轉到 src/StatlerWaldorfCorp.LocationReporter 目錄下并運行以下命令
$ dotnet run --server.urls=http://0.0.0:5002啟動事件處理器(從 src/StatlerWaldorfCorp.EventProcessor 目錄運行)
$ dotnet run --server.urls=http://0.0.0.:5003可用下列步驟端到端地檢驗整個事件溯源/CQRS系統:
(1)向 http://localhost:5001/teams 發送一個 POST 請求,創建一個新團隊
(2)向 http://localhost:5001/teams/
/members 發送一個 POST 請求,往團隊中添加一個成員(3)向 http://localhost:5002/api/members/
/locationreports 發送一個 POST 請求,報送團隊成員位置(4)觀察由報送的位置轉換而成、被放到對應隊列中的 MemberLocationReportedEvent 事件
(5)再重復幾次第 3 步,添加一些相距較遠的位置,確保不會觸發并被檢測到位置接近事件
(6)重復第 2 步,往第一名測試成員所在團隊添加一名新成員
(7)為第二名成員再次重復第 3 步,添加一個于第一名成員最近的位置相距幾公里以內的位置
(8)現在應該能夠在 proximitydetected 隊列中看到一條新消息
(9)可用直接查詢 Redis 緩存,也可以利用事實服務來查看各團隊成員最新的位置狀態
手動操作幾次后,大多數團隊會花些時間把這一過程自動化
借助 docker compose 之類的工具,或者創建 Kubernetes 部署,或者其他容器編排環境,可自動將所有服務部署到集成測試環境
接著用腳本發送 REST 請求
待測試運行完成后,斷言出現了正確的接近檢測的次數,值也是正確的
總結
以上是生活随笔為你收集整理的《ASP.NET Core 微服务实战》-- 读书笔记(第6章)的全部內容,希望文章能夠幫你解決所遇到的問題。
                            
                        - 上一篇: C#如何安全、高效地玩转任何种类的内存之
 - 下一篇: 编译调试 .NET Core 5.0 P