gRPC 流式调用
gRPC 使用 Protocol buffers 作為接口定義語言(IDL)來描述服務接口和輸入輸出消息的結構,目前支持 4 種定義服務方法類型:
| 簡單 RPC | 客戶端傳入一個請求對象,服務端返回一個結果對象 |
| 客戶端流式 RPC | 客戶端傳入多個請求對象,服務端返回一個結果對象 |
| 服務端流式 RPC | 客戶端傳入一個請求對象,服務端返回多個結果對象 |
| 雙向流式 RPC | 客戶端傳入多個請求對象,服務端返回多個結果對象 |
RPC 定義
簡單 RPC:一般這種方式使用較多,如下:定義?SayHello?方法,輸入?HelloRequest,返回?HelloResponse?。
| 1 2 3 4 5 6 7 8 9 10 11 | service HelloService { rpc SayHello (HelloRequest) returns (HelloResponse); } message HelloRequest { string greeting = 1; } message HelloResponse { string reply = 1; } |
而流式 RPC 定義與 簡單 RPC 的區別只是在請求或返回參數前增加了?stream?關鍵詞,如下:
| 1 2 3 4 5 6 7 8 | service HelloService { // 客戶端流式 RPC rpc SayHello1 (stream HelloRequest) returns (HelloResponse); // 服務端流式 RPC rpc SayHello2 (HelloRequest) returns (stream HelloResponse); // 雙向流式 RPC rpc SayHello3 (stream HelloRequest) returns (stream HelloResponse); } |
gRPC 能支持流式調用本質是因為 gRPC 通信是基于 HTTP/2 實現的,HTTP/2 具有流的概念,流是為了實現 HTTP/2 的多路復用。流是服務器和客戶端在 HTTP/2 連接內用于交換幀數據的獨立雙向序列,邏輯上可看做一個較為完整的交互處理單元,即表達一次完整的資源請求、響應數據交換流程。
使用場景
在 gRPC 中消息接收大小?GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH?默認是 4M,如果大于該值,則會提示:Error: grpc: received message larger than max (xxxxxx vs. 4194304),當然我們可以修改默認值解決問題,但如果默認值支持過大對服務器資源也是一種消耗,這時候其實應該考慮使用流式調用,有效將數據進行分批處理,提高性能。
示例
這里主要介紹一下雙向流式 RPC(客戶端和服務端流式 RPC 類似),完整代碼請?前往這里查看?。雙向流模擬功能是客戶端流式輸入文件路徑,服務端針對每個文件每次最多讀取 1M 的數據返回,客戶端拿到數據后生成新文件。
接口定義
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | syntax = "proto3"; package GrpcStream; service StreamTest { // 雙向流程 RPC rpc BidirectionalStream(stream BidirectionalStreamRequest) returns (stream BidirectionalStreamResponse) {} } message BidirectionalStreamRequest { // 文件路徑 string file_path = 1; } message BidirectionalStreamResponse { // 文件路徑 string file_path = 1; // 數據 bytes data = 2; } |
代碼實現
這里是基于 .NET Core 3.0 使用 gRPC,可以通過 VS 預置的?gRPC 服務?模板來創建服務端,創建后將默認的 porto 文件替換成上面的內容。
服務端代碼實現:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | public override async Task BidirectionalStream(IAsyncStreamReader<BidirectionalStreamRequest> requestStream, IServerStreamWriter<BidirectionalStreamResponse> responseStream, ServerCallContext context) { var i = 0; // 監聽客戶端數據輸入 while (await requestStream.MoveNext()) { // 打印次數 Console.WriteLine(i++); using var fs = File.Open(requestStream.Current.FilePath, FileMode.Open); var leftSize = fs.Length; // 1M var buff = new byte[1048576]; while (leftSize > 0) { var len = await fs.ReadAsync(buff); leftSize -= len; Console.WriteLine($"response {requestStream.Current.FilePath} {len} bytes"); // 流式返回數據 await responseStream.WriteAsync(new BidirectionalStreamResponse { FilePath = requestStream.Current.FilePath, Data = ByteString.CopyFrom(buff, 0, len) }); } } } |
客戶端代碼實現:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 | // 測試文件,key 是已存在的文件,value 是需要生成的文件 static readonly Dictionary<string, string> fileDic = new Dictionary<string, string>() { {@"d:\dapr\daprd_windows_amd64.zip", @"d:\dapr\daprd_windows_amd64_new.zip" }, {@"d:\dapr\injector_windows_amd64.zip", @"d:\dapr\injector_windows_amd64_new.zip" }, }; static StreamTest.StreamTestClient client; static async Task Main(string[] args) { // 連接 gRPC 服務 var channel = GrpcChannel.ForAddress("https://localhost:5001"); client = new StreamTest.StreamTestClient(channel); await BidirectionalStreamTestAsync(); Console.ReadKey(); } static async Task BidirectionalStreamTestAsync() { using var call = client.BidirectionalStream(); var responseTask = Task.Run(async () => { // 接收返回值 var iterator = call.ResponseStream; // 監聽服務端數據返回 while (await iterator.MoveNext()) { Console.WriteLine($"write to new file {fileDic[iterator.Current.FilePath]} {iterator.Current.Data.Length} bytes"); // 寫入新文件 using var fs = new FileStream(fileDic[iterator.Current.FilePath], FileMode.Append); iterator.Current.Data.WriteTo(fs); } }); var rand = new Random(); foreach (var item in fileDic) { // 流式輸入 await call.RequestStream.WriteAsync(new BidirectionalStreamRequest { FilePath = item.Key }); await Task.Delay(rand.Next(200)); } await call.RequestStream.CompleteAsync(); await responseTask; } |
執行結果:
參考資料
gRPC Concepts
總結
- 上一篇: 【WPF on .NET Core 3.
- 下一篇: .NET如何将字符串分隔为字符