从源码透析gRPC调用原理
導語
gRPC是什么,不用多說了。
gRPC如何用,也不用多說了 。
但是,gRPC是如何work的,清楚的理解其調用邏輯,對于我們更好、更深入的使用gRPC很有必要。因此我們必須深度解析下gRPC的實現邏輯,在本文中,將分別從客戶端和服務端來說明gRPC的實現原理。
準備條件
本文將以gRPC Github上helloword代碼作為一個完整的項目示例作為介紹的基礎,在展開分析之前,簡單介紹下作為gRPC的文件結構:
greeter_client greeter_server helloworld mock_helloworld在這里,我們只需要關注前三個文件夾的內容。
其中,greet_client和greet_server文件中分別是grpc客戶端和服務端的業務調用代碼,包含了一個標準的gRPC調用過程。helloworld中包含了是protobuf的協議文件和生成的helloworld.pb.go文件(至于pb的協議和*.pb.go文件的生成等內容,不作為本文的介紹范圍,不再贅述)。
客戶端
首先,我們以Github官網上的example為示例來一覽gRPC client端的使用,從而跟蹤其調用的邏輯個原理。總的來看,調用的過程基本就是分為三步:
- 創建connection
- 創建業務客戶端實例
- 調用RPC接口
創建connection
通過grpc.Dial()接口創建了一個ClientConn類型實例。
Dial()函數的第一個參數作為endpoint我們就不多說了,同時Dial()還接受變長參數DialOption。DialOption是一個接口類型,在grpc中存在著多種返回了DialOption類型的函數,這些返回了DialOption類型的函數,例如編解碼、負載均衡策略等,一些函數聲明示例如下:
func WithBalancer() DialOption func WithInsecure() DialOption func WithCodec() DialOption根據client的需求,調用方在調用Dial()的時候可以將這些函數作為參數傳入Dial()中。
在Dial()中,首先是會根據參數進行一系列的初始化和賦值操作,就不在這里列出說明,而對于這些DailOption參數,在Dial()中最終實現對grpc.ClientConn的成員變量dopts中的CallOption進行了賦值。
通過Dial()的調用,grpc已經建立了到服務端的鏈接,同時也會附帶一些諸如負載均衡、證書檢查、Backoff等策略的執行(如果有進行配置的話)。
創建客戶端實例
創建業務client實例,在使用gRPC的時候,我們都知道其傳遞協議是protobuf。
而NewGreeterClient()則是通過對pb協議生成的代碼接口,存在于helloworld.pb.go中,該函數主要是返回了一個greeterClient類型的實例。
調用RPC請求
SayHello()中的RPC接口也是存在于根據pb協議生成的helloworld.pb.go文件中。
SayHello()除了接受一個context存儲上下文信息和一個request類型參數,同時也支持一個CallOption類型的變量。關于CallOption在上文中有提到,其本身也是一個接口,其中before()用于在請求發送之前設置參數,而after()則是在請求調用完畢之后提取信息。通過對這兩個函數的調用,方便的實現了在請求前后的一些參數設置的功能:
type CallOption interface {before(*callInfo) errorafter(*callInfo) }任何一個我們我們上文說到了返回值為DialOption的函數,大部分都有一個對應的結構實現了CallOption,諸如上面的WithCodec(),其對應的結構為:
type CustomCodecCallOption struct {Codec Codec }func (o CustomCodecCallOption) before(c *callInfo) error {c.codec = o.Codecreturn nil } func (o CustomCodecCallOption) after(c *callInfo) {}回到SayHello()函數的邏輯中來,該函數最終會調用grpc中的call.go中的invoke函數來執行具體的操作。
在invoke()函數中,newClientStream()會首先獲取傳輸層Trasport結構的實例并包裝到一個ClientStream實例中返回,隨后將RPC請求通過SendMsg()接口發送出去,注意,由于SendMsg()并不會等待服務端收到數據,因此還需要通過RecvMsg()同步接收收到的回復消息(關于SendMsg()和RecvMsg()中的具體發送和接收數據邏輯,不在贅述,可以去源碼再詳細了解)。
// pb.go文件 func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {out := new(HelloReply)err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)if err != nil {return nil, err}return out, nil }...// grpc/grpc.go/call.go文件 func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)if err != nil {return err}if err := cs.SendMsg(req); err != nil {return err}return cs.RecvMsg(reply) }服務端
對于Server端,我們同樣地根據Github上的官網示例來展開說明。總的來看,grpc在server端的調用邏輯如下,基本就是分為四步:
- 創建端口監聽listener
- 創建server實例
- 注冊服務(并未真正開始服務)
- 啟動服務端
創建監聽端口
創建listener,不用多介紹了,就是創建了一個監聽tcp端口的Listener實例。
創建服務端實例
NewServer()方法創建了一個grpc.Server實例,其函數內部會對該實例進行一系列初始化賦值操作。該接口與客戶端中的Dial()接口類似,可以接受多個ServerOption入參,在helloworld的示例中并未傳入任務參數,一個簡單那的示例如下:
svr := grpc.NewServer(grpc.CustomCodec(proxy.Codec()))在grpc中,也存在了多種類似于CustomCodec()這樣返回值類型為ServerOption的函數,從而滿足調用方在需要求進行傳參賦值:
func CustomCodec() ServerOption func MaxConcurrentStreams() ServerOption func UnknownServiceHandler() ServerOption服務注冊
RegisterGreeterServer()是由helloworld.pb.go生成的接口,其主要調用了grpc的RegisterService() 來注冊當前service及其實現。
grpc.RegisterService()接收一個參數類型為ServiceDesc的實例_Greeter_serviceDesc用以歲service的描述說明,同時接收一個service實例作注冊進來。其中_Greeter_serviceDesc是由pb生成的對業務RPC接口的描述,如下所示:
// helloworld.pb.go func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) {s.RegisterService(&_Greeter_serviceDesc, srv) }var _Greeter_serviceDesc = grpc.ServiceDesc{ServiceName: "helloworld.Greeter",HandlerType: (*GreeterServer)(nil),Methods: []grpc.MethodDesc{{MethodName: "SayHello",Handler: _Greeter_SayHello_Handler,},},Streams: []grpc.StreamDesc{},Metadata: "helloworld.proto", }我們可以看到,在grpc.ServiceDesc中對Methods變量進行了賦值。其中Methods包含了一個RPC接口名到handler的映射數組,描述了當前service支持的所有的方法,MethodName即為調用的RPC接口名,而handler的值_Greeter_SayHello_Handler()也是由pb生成的方法,在其內部通過注冊進來的service實例,實現了對我們的業務函數SayHello()進行了調用:
func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {in := new(HelloRequest)if err := dec(in); err != nil {return nil, err}if interceptor == nil {return srv.(GreeterServer).SayHello(ctx, in)}info := &grpc.UnaryServerInfo{Server: srv,FullMethod: "/helloworld.Greeter/SayHello",}handler := func(ctx context.Context, req interface{}) (interface{}, error) {return srv.(GreeterServer).SayHello(ctx, req.(*HelloRequest))}return interceptor(ctx, in, info, handler) }啟動服務
Serve函數中開始接收來到listener的請求(實際上也就是對listener進行了Accept()),并為每一個請求創建一個go程來服務。
Serve函數的邏輯判斷比較復雜,但其實真正的調用邏輯過程十分簡單,在下面列出,從而有助于我們的理解。
func (s *Server) Serve(lis net.Listener) error {...for {// 開始接受服務rawConn, err := lis.Accept()...// 為每一個請求啟動一個go程來處理鏈接s.serveWG.Add(1)go func() {s.handleRawConn(rawConn)s.serveWG.Done()}()} }func (s *Server) handleRawConn(rawConn net.Conn) {// 鑒權操作conn, authInfo, err := s.useTransportAuthenticator(rawConn)...// 基于HTTP2,創建一個ServerTransportst := s.newHTTP2Transport(conn, authInfo)...go func() {s.serveStreams(st)s.removeConn(st)}() }其中,newHTTP2Transport()的代碼主要部分有一些關于HTTP2的賦值和初始化操作,存在于internal/transport/http2_server.go中,這兒就不再進入介紹http2的具體實現方式了。而serveStreams()中則主要是調用了HandleStreams()接口去真正的接受請求流。
func (s *Server) serveStreams(st transport.ServerTransport) {defer st.Close()var wg sync.WaitGroupst.HandleStreams(func(stream *transport.Stream) {wg.Add(1)go func() {defer wg.Done()s.handleStream(st, stream, s.traceInfo(st, stream))}()}, func(ctx context.Context, method string) context.Context {if !EnableTracing {return ctx}tr := trace.New("grpc.Recv."+methodFamily(method), method)return trace.NewContext(ctx, tr)})wg.Wait() }HandleStreams()中的實現在grpc-go/internal/transport/handler_server.go文件中。
在HandleStreams()實現中前面一大部分是對數據流Stream的初始化,數據接收以及賦值,詳細的處理過程大家可以去文件中詳細的看代碼,這里我們只做邏輯流程的分析。在數據流stream接收完畢后,通過注冊進來的server的startStream()來處理數據流。
注冊進來的startStream()最終調用了Server中的startStream()函數,區分出是unary請求還是stream請求,并分別通過processUnaryRPC()和processStreamingRPC()進行區分處理。對于兩個主要的處理函數processUnaryRPC()和processStreamingRPC(),基本上是一些具體的數據接收、編解碼等操作,就不在浪費篇幅貼出代碼了。
func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) {...// 數據流Stream的接受和賦值startStream(s)ht.runStream()close(requestOver)// 等待數據讀取完畢req.Body.Close()<-readerDone }func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {...// 判斷Unary RPC還是Streaming RPCif md, ok := srv.md[method]; ok {s.processUnaryRPC(t, stream, srv, md, trInfo)return}if sd, ok := srv.sd[method]; ok {s.processStreamingRPC(t, stream, srv, sd, trInfo)return}...if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)return}... }最后,簡單以一個圖示來展示grpc服務端的調用流程:
gRPC Server簡化調用流程
總結
上面的就是關于gRPC調用邏輯的分析,gRPC中的代碼十分復雜,本文只涉及了其調用邏輯的分析,在分析展示源碼時,省略的一些錯誤處理或者數據處理的代碼,而側重于邏輯調用的過程,從而在使用gRPC的時候可以更好的理解其原理。
https://cloud.tencent.com/developer/article/1189548
總結
以上是生活随笔為你收集整理的从源码透析gRPC调用原理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Shell 企业29道面试题 [转]
- 下一篇: Tensorflow Lite之编译生成