基于 gRPC 和 .NET Core 的服务器流
原文:https://bit.ly/3lpz8Ll
作者:Chandan Rauniyar
翻譯:精致碼農-王亮
早在 2019 年,我寫過《用 Mapbox 繪制位置數據》一文,詳細介紹了我如何通過簡單的文件上傳,用 Mapbox 繪制約 230 萬個位置點。本文介紹我是如何通過使用 gRPC 和 .NET Core 的服務器流來快速獲取所有位置歷史數據的。
什么是 gRPC
gRPC 是一個現代開源的高性能 RPC 框架,可以在任何環境下運行。它可以有效地連接數據中心內和跨數據中心的服務,并對負載平衡、跟蹤、健康檢查和認證提供可插拔的支持。gRPC 最初是由谷歌創建的,該公司使用一個名為 Stubby 的單一通用 RPC 基礎設施來連接其數據中心內和跨數據中心運行的大量微服務,使用已經超過十年。2015 年 3 月,谷歌決定建立 Stubby 的下一個版本,并將其開源,結果就是現在的 gRPC,被許多企業或組織使用。
https://grpc.io/gRPC 服務器流
服務器流式(Server Streaming)RPC,客戶端向服務器發送請求,并獲得一個流來讀取一連串的消息。客戶端從返回的流中讀取信息,直到沒有消息為止。gRPC 保證在單個 RPC 調用中的信息是有序的。
rpc GetLocationData (GetLocationRequest) returns (stream GetLocationResponse);協議緩沖區(Protobuf)
gRPC 使用協議緩沖區(protocol buffers)作為接口定義語言(IDL)來定義客戶端和服務器之間的契約。在下面的 proto 文件中,定義了一個 RPC 方法 GetLocations,它接收 GetLocationsRequest 消息類型并返回 GetLocationsResponse 消息類型。響應消息類型前面的 stream 關鍵字表示響應是流類型,而不是單個響應。
syntax = "proto3";option csharp_namespace = "GPRCStreaming";package location_data;service LocationData {rpc GetLocations (GetLocationsRequest) returns (stream GetLocationsResponse); }message GetLocationsRequest {int32 dataLimit = 1; }message GetLocationsResponse {int32 latitudeE7 = 1;int32 longitudeE7 = 2; }創建 gRPC 服務
我們可以使用 dotnet new grpc -n threemillion 命令輕松創建一個 .NET gRPC 服務。更多關于在 ASP.NET Core 中創建 gRPC 服務器和客戶端的信息可在微軟文檔中找到。
Create a gRPC client and server in ASP.NET Core https://docs.microsoft.com/en-us/aspnet/core/tutorials/grpc/grpc-start?view=aspnetcore-5.0&tabs=visual-studio-code在添加了 proto 文件并生成了 gRPC 服務資源文件后,接下來我添加了 LocationService 類。在下面的代碼片段中,我有一個 LocationService 類,它繼承了從 Location.proto 文件中生成的 LocationDataBase 類型。客戶端可以通過 Startup.cs 文件中 Configure 方法中的 endpoints.MapGrpcService<LocationService>() 來訪問 LocationService。當服務器收到 GetLocations 請求時,它首先通過 GetLocationData 方法調用讀取 Data 文件夾中 LocationHistory.json 文件中的所有數據(未包含在源代碼庫)。該方法返回 RootLocation 類型,其中包含 List<Location> 類型的 Location 屬性。Location 類由兩個內部屬性 Longitude 和 Latitude 組成。接下來,我循環瀏覽每個位置,然后將它們寫入 responseStream 中,返回給客戶端。服務器將消息寫入流中,直到客戶在 GetLocationRequest 對象中指定的 dataLimit。
using System.Threading.Tasks; using Grpc.Core; using Microsoft.Extensions.Logging; using System.IO; using System; using System.Linq;namespace?GPRCStreaming {public?class?LocationService : LocationData.LocationDataBase{private?readonly FileReader _fileReader;private?readonly ILogger<LocationService> _logger;public?LocationService(FileReader fileReader, ILogger<LocationService> logger){_fileReader = fileReader;_logger = logger;}public?override?async Task GetLocations(GetLocationsRequest request,IServerStreamWriter<GetLocationsResponse> responseStream,ServerCallContext context){try{_logger.LogInformation("Incoming request for GetLocationData");var locationData = await GetLocationData();var locationDataCount = locationData.Locations.Count;var dataLimit = request.DataLimit > locationDataCount ? locationDataCount : request.DataLimit;for (var i = 0; i <= dataLimit - 1; i++){var item = locationData.Locations[i];await responseStream.WriteAsync(new GetLocationsResponse{LatitudeE7 = item.LatitudeE7,LongitudeE7 = item.LongitudeE7});}}catch (Exception exception){_logger.LogError(exception, "Error occurred");throw;}}private?async Task<RootLocation> GetLocationData(){var currentDirectory = Directory.GetCurrentDirectory();var filePath = $"{currentDirectory}/Data/Location_History.json";var locationData = await _fileReader.ReadAllLinesAsync(filePath);return locationData;}} }現在,讓我們運行該服務并發送一個請求。我將使用一個叫 grpcurl 的命令行工具,它可以讓你與 gRPC 服務器交互。它基本上是針對 gRPC 服務器的 curl。
https://github.com/fullstorydev/grpcurl通過 grpcurl 與 gRPC 端點(endpoint)交互只有在 gRPC 反射服務被啟用時才可用。這允許服務可以被查詢,以發現服務器上的 gRPC 服務。擴展方法 MapGrpcReflectionService 需要引入 Microsoft.AspNetCore.Builder 的命名空間:
public?void?Configure(IApplicationBuilder app, IWebHostEnvironment env) {app.UseEndpoints(endpoints =>{endpoints.MapGrpcService<LocationService>();if (env.IsDevelopment()){endpoints.MapGrpcReflectionService();}endpoints.MapGet("/", async context =>{await context.Response.WriteAsync("Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909");});}); }grpcurl -plaintext -d '{"dataLimit": "100000"}' localhost:80 location_data.LocationData/GetLocations一旦服務器收到請求,它就會讀取文件,然后在位置列表中循環,直到達到 dataLimit 計數,并將位置數據返回給客戶端。
接下來,讓我們創建一個 Blazor 客戶端來調用 gRPC 服務。我們可以使用 IServiceCollection 接口上的 AddGrpcClient 擴展方法設置一個 gRPC 客戶端:
public?void?ConfigureServices(IServiceCollection services) {services.AddRazorPages();services.AddServerSideBlazor();services.AddSingleton<WeatherForecastService>();services.AddGrpcClient<LocationData.LocationDataClient>(client =>{client.Address = new Uri("http://localhost:80");}); }我使用 Virtualize Blazor 組件來渲染這些位置。Virtualize 組件不是一次性渲染列表中的每個項目,只有當前可見的項目才會被渲染。
ASP.NET Core Blazor component virtualization https://docs.microsoft.com/en-us/aspnet/core/blazor/components/virtualization?view=aspnetcore-5.0相關代碼:
@page "/locationdata"@using Grpc.Core @using GPRCStreaming @using threemillion.Data @using System.Diagnostics @using Microsoft.AspNetCore.Components.Web.Virtualization@inject IJSRuntime JSRuntime; @inject System.Net.Http.IHttpClientFactory _clientFactory @inject GPRCStreaming.LocationData.LocationDataClient _locationDataClient<table class="tableAction"><tbody><tr><td><div class="data-input"><label for="dataLimit">No of records to fetch</label><input id="dataLimit" type="number" @bind="_dataLimit" /><button @onclick="FetchData"?class="btn-submit">Call gRPC</button></div></td><td><p class="info">Total records: <span class="count">@_locations.Count</span></p><p class="info">Time taken: <span class="time">@_stopWatch.ElapsedMilliseconds</span> milliseconds</p></td></tr></tbody> </table><div class="tableFixHead"><table class="table"><thead><tr><th>Longitude</th><th>Latitude</th></tr></thead><tbody><Virtualize Items="@_locations" Context="locations"><tr><td>@locations.LongitudeE7</td><td>@locations.LatitudeE7</td></tr></Virtualize></tbody></table> </div>@code {private?int _dataLimit = 1000;private?List<Location> _locations = new List<Location>();private Stopwatch _stopWatch = new Stopwatch();protected?override?async Task OnInitializedAsync(){await FetchData();}private?async Task FetchData(){ResetState();_stopWatch.Start();using (var call = _locationDataClient.GetLocations(new GetLocationsRequest { DataLimit = _dataLimit })){await?foreach (var response in call.ResponseStream.ReadAllAsync()){_locations.Add(new Location { LongitudeE7 = response.LongitudeE7, LatitudeE7 = response.LatitudeE7 });StateHasChanged();}}_stopWatch.Stop();}private?void?ResetState(){_locations.Clear();_stopWatch.Reset();StateHasChanged();} }通過在本地運行的流調用,從 gRPC 服務器接收 2,876,679 個單獨的響應大約需要 8 秒鐘。讓我們也在 Mapbox 中加載數據:
@page "/mapbox"@using Grpc.Core @using GPRCStreaming @using System.Diagnostics@inject IJSRuntime JSRuntime; @inject System.Net.Http.IHttpClientFactory _clientFactory @inject GPRCStreaming.LocationData.LocationDataClient LocationDataClient<table class="tableAction"><tbody><tr><td><div class="data-input"><label for="dataLimit">No of records to fetch</label><input id="dataLimit" type="number" @bind="_dataLimit" /><button @onclick="LoadMap"?class="btn-submit">Load data</button></div></td><td><p class="info">Total records: <span class="count">@_locations.Count</span></p><p class="info">Time taken: <span class="time">@_stopWatch.ElapsedMilliseconds</span> milliseconds</p></td></tr></tbody> </table><div id='map' style="width: 100%; height: 90vh;"></div>@code {private?int _dataLimit = 100;private?List<object> _locations = new List<object>();private Stopwatch _stopWatch = new Stopwatch();protected?override?async Task OnAfterRenderAsync(bool firstRender){if (!firstRender){return;}await JSRuntime.InvokeVoidAsync("mapBoxFunctions.initMapBox");}private?async Task LoadMap(){ResetState();_stopWatch.Start();using (var call = LocationDataClient.GetLocations(new GetLocationsRequest { DataLimit = _dataLimit })){await?foreach (var response in call.ResponseStream.ReadAllAsync()){var pow = Math.Pow(10, 7);var longitude = response.LongitudeE7 / pow;var latitude = response.LatitudeE7 / pow;_locations.Add(new{type = "Feature",geometry = new{type = "Point",coordinates = new?double[] { longitude, latitude }}});StateHasChanged();}_stopWatch.Stop();await JSRuntime.InvokeVoidAsync("mapBoxFunctions.addClusterData", _locations);}}private?void?ResetState(){JSRuntime.InvokeVoidAsync("mapBoxFunctions.clearClusterData");_locations.Clear();_stopWatch.Reset();StateHasChanged();} }源代碼在我的 GitHub 上 ????:
https://github.com/Chandankkrr/threemillion總結
以上是生活随笔為你收集整理的基于 gRPC 和 .NET Core 的服务器流的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 设计模式之组合
- 下一篇: 为什么我的 Func 如此之慢?