gRPC的那些事 - streaming
gRPC是一個高性能、通用的開源RPC框架,其由Google主要面向移動應用開發并基于HTTP/2協議標準而設計,基于ProtoBuf(Protocol Buffers)序列化協議開發,且支持眾多開發語言。 gRPC提供了一種簡單的方法來精確地定義服務和為iOS、Android和后臺支持服務自動生成可靠性很強的客戶端功能庫。 客戶端充分利用高級流和鏈接功能,從而有助于節省帶寬、降低的TCP鏈接次數、節省CPU使用、和電池壽命。
gRPC具有以下重要特征:
強大的IDL特性 RPC使用ProtoBuf來定義服務,ProtoBuf是由Google開發的一種數據序列化協議,性能出眾,得到了廣泛的應用。
支持多種語言 支持C++、Java、Go、Python、Ruby、C#、Node.js、Android Java、Objective-C、PHP等編程語言。 3.基于HTTP/2標準設計
gRPC已經應用在Google的云服務和對外提供的API中。
gRPC開發起來非常的簡單,你可以閱讀 一個?helloworld 的例子來了解它的基本開發流程 (本系列文章以Go語言的開發為例)。
最基本的開發步驟是定義?proto?文件, 定義請求 Request 和 響應 Response 的格式,然后定義一個服務 Service, Service可以包含多個方法。
基本的gRPC開發很多文章都介紹過了,官方也有相關的文檔,這個系列的文章也就不介紹這些基礎的開發,而是想通過代碼演示gRPC更深入的開發。 作為這個系列的第一篇文章,想和大家分享一下gRPC流式開發的知識。
gRPC的流可以分為三類, 客戶端流式發送、服務器流式返回以及客戶端/服務器同時流式處理, 也就是單向流和雙向流。 下面針對這三種情況分別通過例子介紹。
服務器流式響應
通過使用流(streaming),你可以向服務器或者客戶端發送批量的數據, 服務器和客戶端在接收這些數據的時候,可以不必等所有的消息全收到后才開始響應,而是接收到第一條消息的時候就可以及時的響應, 這顯然比以前的類HTTP 1.1的方式更快的提供響應,從而提高性能。
比如有一批記錄個人收入數據,客戶端流式發送給服務器,服務器計算出每個人的個人所得稅,將結果流式發給客戶端。這樣客戶端的發送可以和服務器端的計算并行之行,從而減少服務的延遲。這只是一個簡單的例子,你可以利用流來實現RPC調用的異步執行,將客戶端的調用和服務器端的執行并行的處理,
當前gRPC通過 HTTP2 協議傳輸,可以方便的實現 streaming 功能。 如果你對gRPC如何通過 HTTP2 傳輸的感興趣, 你可以閱讀這篇文章?gRPC over HTTP2, 它描述了 gRPC 通過 HTTP2 傳輸的低層格式。Request 和 Response 的格式如下:
- Request → Request-Headers *Length-Prefixed-Message EOS
- Response → (Response-Headers *Length-Prefixed-Message Trailers) / Trailers-Only
要實現服務器的流式響應,只需在proto中的方法定義中將響應前面加上stream標記, 如下圖中SayHello1方法,HelloReply前面加上stream標識。
| 123456789101112131415161718192021 | syntax = "proto3";package pb;import "github.com/gogo/protobuf/gogoproto/gogo.proto";// The greeting service definition.service Greeter {// Sends a greetingrpc SayHello1 (HelloRequest) returns (stream HelloReply) {}}// The request message containing the user's name.message HelloRequest {string name = 1;}// The response message containing the greetingsmessage HelloReply {string message = 1;} |
這個例子中我使用gogo來生成更有效的protobuf代碼,當然你也可以使用原生的工具生成。
| 12 | GOGO_ROOT=${GOPATH}/src/github.com/gogo/protobufprotoc -I.:${GOPATH}/src --gogofaster_out=plugins=grpc:. helloworld.proto |
生成的代碼就已經包含了流的處理,所以和普通的gRPC代碼差別不是很大, 只需要注意的服務器端代碼的實現要通過流的方式發送響應。
| 12345678 | func (s *server) SayHello1(in *pb.HelloRequest, gs pb.Greeter_SayHello1Server) error {name := in.Name for i := 0; i < 100; i++ {gs.Send(&pb.HelloReply{Message: "Hello " + name + strconv.Itoa(i)})} return nil} |
和普通的gRPC有什么區別?
普通的gRPC是直接返回一個HelloReply對象,而流式響應你可以通過Send方法返回多個HelloReply對象,對象流序列化后流式返回。
查看它低層的實現其實是使用ServerStream.SendMsg實現的。
| 123456789 | type Greeter_SayHello1Server interface {Send(*HelloReply) errorgrpc.ServerStream}func (x *greeterSayHello1Server) Send(m *HelloReply) error { return x.ServerStream.SendMsg(m)} |
對于客戶端,我們需要關注兩個方面有沒有變化, 一是發送請求,一是讀取響應。下面是客戶端的代碼:
| 123456789101112131415161718192021222324 | conn, err := grpc.Dial(*address, grpc.WithInsecure()) if err != nil {log.Fatalf("faild to connect: %v", err)} defer conn.Close()c := pb.NewGreeterClient(conn)stream, err := c.SayHello1(context.Background(), &pb.HelloRequest{Name: *name})if err != nil {log.Fatalf("could not greet: %v", err)}for {reply, err := stream.Recv() if err == io.EOF { break} if err != nil {log.Printf("failed to recv: %v", err)}log.Printf("Greeting: %s", reply.Message)} |
發送請求看起來沒有太大的區別,只是返回結果不再是一個單一的HelloReply對象,而是一個Stream。這和服務器端代碼正好對應,通過調用stream.Recv()返回每一個HelloReply對象, 直到出錯或者流結束(io.EOF)。
可以看出,生成的代碼提供了往/從流中方便的發送/讀取對象的能力,而這一切, gRPC都幫你生成好了。
客戶端流式發送
客戶端也可以流式的發送對象,當然這些對象也和上面的一樣,都是同一類型的對象。
首先還是要在proto文件中定義,與上面的定義類似,在請求的前面加上stream標識。
| 12345678910111213141516171819202122 | syntax = "proto3";package pb;import "github.com/gogo/protobuf/gogoproto/gogo.proto";option (gogoproto.unmarshaler_all) = true;// The greeting service definition.service Greeter {rpc SayHello2 (stream HelloRequest) returns (HelloReply) {}}// The request message containing the user's name.message HelloRequest {string name = 1;}// The response message containing the greetingsmessage HelloReply {string message = 1;} |
注意這里我們只標記了請求是流式的, 響應還是以前的樣子。
生成相關的代碼后, 客戶端的代碼為:
| 12345678910111213141516171819 | func sayHello2(c pb.GreeterClient) { var err errorstream, err := c.SayHello2(context.Background()) for i := 0; i < 100; i++ { if err != nil {log.Printf("failed to call: %v", err) break}stream.Send(&pb.HelloRequest{Name: *name + strconv.Itoa(i)})}reply, err := stream.CloseAndRecv() if err != nil {fmt.Printf("failed to recv: %v", err)}log.Printf("Greeting: %s", reply.Message)} |
這里的調用c.SayHello2并沒有直接穿入請求參數,而是返回一個stream,通過這個stream的Send發送,我們可以將對象流式發送。這個例子中我們發送了100個請求。
客戶端讀取的方法是stream.CloseAndRecv(),讀取完畢會關閉這個流的發送,這個方法返回最終結果。注意客戶端只負責關閉流的發送。
服務器端的代碼如下:
| 123456789101112131415161718 | func (s *server) SayHello2(gs pb.Greeter_SayHello2Server) error { var names []string for {in, err := gs.Recv() if err == io.EOF {gs.SendAndClose(&pb.HelloReply{Message: "Hello " + strings.Join(names, ",")}) return nil} if err != nil {log.Printf("failed to recv: %v", err) return err}names = append(names, in.Name)} return nil} |
服務器端收到每條消息都進行了處理,這里的處理簡化為增加到一個slice中。一旦它檢測的客戶端關閉了流的發送,它則把最終結果發送給客戶端,通過關閉這個流。流的關閉通過io.EOF這個error來區分。
雙向流
將上面兩個例子整合,就是雙向流的例子。 客戶端流式發送,服務器端流式響應,所有的發送和讀取都是流式處理的。
proto中的定義如下, 請求和響應的前面都加上了stream標識:
| 123456789101112131415161718192021 | syntax = "proto3";package pb;import "github.com/gogo/protobuf/gogoproto/gogo.proto";// The greeting service definition.service Greeter {rpc SayHello3 (stream HelloRequest) returns (stream HelloReply) {}}// The request message containing the user's name.message HelloRequest {string name = 1;}// The response message containing the greetingsmessage HelloReply {string message = 1;} |
客戶端的代碼:
| 12345678910111213141516171819202122232425 | func sayHello3(c pb.GreeterClient) { var err errorstream, err := c.SayHello3(context.Background()) if err != nil {log.Printf("failed to call: %v", err) return} var i int64 for {stream.Send(&pb.HelloRequest{Name: *name + strconv.FormatInt(i, 10)}) if err != nil {log.Printf("failed to send: %v", err) break}reply, err := stream.Recv() if err != nil {log.Printf("failed to recv: %v", err) break}log.Printf("Greeting: %s", reply.Message)i++}} |
通過stream.Send發送請求,通過stream.Recv讀取響應。客戶端可以通過CloseSend方法關閉發送流。
服務器端代碼也是通過Send發送響應,通過Recv響應:
| 12345678910111213141516 | func (s *server) SayHello3(gs pb.Greeter_SayHello3Server) error { for {in, err := gs.Recv() if err == io.EOF { return nil} if err != nil {log.Printf("failed to recv: %v", err) return err}gs.Send(&pb.HelloReply{Message: "Hello " + in.Name})} return nil} |
這基本上"退化"成一個TCP的client和server的架構。
在實際的應用中,你可以根據你的場景來使用單向流還是雙向流。
http://colobu.com/2017/04/06/dive-into-gRPC-streaming/
總結
以上是生活随笔為你收集整理的gRPC的那些事 - streaming的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: TENSORFLOW GUIDE: EX
- 下一篇: gRPC初体验