grpc 传递上下文_grpc 源码笔记 02:ClientConn
上篇筆記中梳理了一把 resolver 和 balancer,這里順著前面的流程走一遍入口的 ClientConn 對象。
ClientConn
// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
type ClientConn struct {ctx context.Contextcancel context.CancelFunctarget stringparsedTarget resolver.Targetauthority stringdopts dialOptionscsMgr *connectivityStateManagerbalancerBuildOpts balancer.BuildOptionsblockingpicker *pickerWrappermu sync.RWMutexresolverWrapper *ccResolverWrappersc *ServiceConfigconns map[*addrConn]struct{}// Keepalive parameter can be updated if a GoAway is received.mkp keepalive.ClientParameterscurBalancerName stringbalancerWrapper *ccBalancerWrapperretryThrottler atomic.ValuefirstResolveEvent *grpcsync.EventchannelzID int64 // channelz unique identification numberczData *channelzData
}首先是 ctx 和 cancel 兩個字段,之前好像有看到什么最佳實戰說不要把 context 字段放在 struct 里傳遞而要放在 func 里傳遞,但是這里確實屬于一個非常合理的場景:管理連接的生命周期,這個 ctx 和 cancel 都是來自建立連接時的 DialContext,標準庫的 net.Conn 的結構體中也有同樣的兩個字段,這樣請求上下文中建立的連接,可以在請求結束時安全釋放掉。ClientConn 中派生出的 goroutine,也能通過 cancel 函數安全地關閉掉。
target、parsedTarget、authority、dopts 似乎都屬于比較原始的參數。
csMgr 用于管理 ClientConn 總體的連接狀態,先放一下,后面詳細看。
resolverWrapper、conns、curBalancerName、balancerWrapper、firstResolveEvent 跟名字解析、負載均衡相關,上一篇筆記中簡單看過一點。retryThrottler 大約是重試的退避策略,還沒有了解過。
sc *ServiceConfig 是服務端給出的服務參數信息,大約是 maxRequestMessageBytes、timeout 之類的控制信息,可以具體到接口級別。mkp keepalive.ClientParameters 也是參數信息,與 keepalive 相關。
channelzID 和 czData 與 channelz 的信息相關,channelz 是 grpc 內部的一些埋點監控性質的信息,大體上是一個異步的 AddTraceEvent 然后匯聚數值,看代碼的時候應該可以忽略這部分。
ClientConn 與 resolverWrapper / balancerWrapper 的交互
clientConn 與 resolver / balancer 之間的交互在上一篇筆記中簡單梳理過,好處是接口比較明確,所以交互比較清晰。clientConn 與 resolverWrapper / balancerWrapper 之間的交互都是具體的方法,手工梳理一下。
resolverWrapper 對 clientConn 的調用有 updateResolverState。
clientConn 對 resolverWrapper 的調用有 resolveNow。
clientConn 對 balancerWrapper 的調用有:
- resolveError:調用來自 clientConn 的 updateResolverState 方法,該方法是被 resolverWrapper 所調用的。
- handleSubConnStateChange,調用來自 clientConn 的 handleSubConnStateChange 方法,該方法又是被 addrConn 的 updateConnectivityState 調用的。
- updateClientConnState,調用來自 clientConn 的 updateResolverState,用于傳遞名字解析的更新。
balancerWrapper 對 clientConn 的調用有:
- newAddrConn、removeAddrConn:大體上與 NewSubConn 和 RemoveSubConn 相映射,addrConn 是具體的 SubConn 的實現。
- blockingPicker.updatePicker、csMgr.updateState:皆在 UpdateBalancerState 時調用,將 balancer.State 中的 picker 與總連接狀態設置給 clientConn。
- resolveNow:來自 ResolveNow,向 clientConn 發起 resolver 的解析。
畫一張圖:
交互的過程感覺有點像 k8s 那種偵聽結構體的字段變動做收斂邏輯的意思,比如 resolver 給出后端地址、ServiceConfig、附加元信息的 State 結構體,ClientConn 跟 balancer 都拿這一個結構體中自己關心的字段做自己的邏輯,整個流程都異步做。
這張圖里只有 handleSubConnStateChange 的來源沒標注。它是來自 addrConn 的回調,后面再展開梳理。
ClientConn 的初始化
名字解析與負載均衡都是持續動態刷新的過程,那么整個流程是怎樣啟動的?裁剪一下 DialContext 函數:
// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// The target name syntax is defined in
// <https://github.com/grpc/grpc/blob/master/doc/naming.md>.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {cc := &ClientConn{target: target,csMgr: &connectivityStateManager{},conns: make(map[*addrConn]struct{}),dopts: defaultDialOptions(),blockingpicker: newPickerWrapper(),czData: new(channelzData),firstResolveEvent: grpcsync.NewEvent(),}cc.retryThrottler.Store((*retryThrottler)(nil))cc.ctx, cc.cancel = context.WithCancel(context.Background())for _, opt := range opts {opt.apply(&cc.dopts)}// 好像是初始化什么鉤子chainUnaryClientInterceptors(cc)chainStreamClientInterceptors(cc)defer func() {if err != nil {cc.Close()}}()if channelz.IsOn() {// ... 初始化 channelz}if !cc.dopts.insecure {// ... tlz 相關參數檢查}if cc.dopts.defaultServiceConfigRawJSON != nil {// ... 解析參數指定的默認 ServiceConfig 的 JSON}cc.mkp = cc.dopts.copts.KeepaliveParamsif cc.dopts.copts.Dialer == nil {// ... 默認 Dialer 函數}if cc.dopts.copts.UserAgent != "" {cc.dopts.copts.UserAgent += " " + grpcUA} else {cc.dopts.copts.UserAgent = grpcUA}// 配置 Dial 的超時if cc.dopts.timeout > 0 {var cancel context.CancelFuncctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)defer cancel()}// 退出函數時,如果 DialContext 的 ctx 如果中途撤銷或者超時了,則返回 ctx.Err()defer func() {select {case <-ctx.Done():conn, err = nil, ctx.Err()default:}}()// 從 scChan 中偵聽接收 serviceConfig 信息scSet := falseif cc.dopts.scChan != nil {// Try to get an initial service config.select {case sc, ok := <-cc.dopts.scChan:if ok {cc.sc = &scscSet = true}default:}}// 默認取指數退避if cc.dopts.bs == nil {cc.dopts.bs = backoff.DefaultExponential}// 根據名字的 Scheme 選擇 resolverBuilder// Determine the resolver to use.cc.parsedTarget = parseTarget(cc.target)grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)if resolverBuilder == nil {// .. 如果沒有找到則按默認的 resolverBuilder}creds := cc.dopts.copts.TransportCredentials// .. 初始化 cc.authority// 阻塞等待 scChanif cc.dopts.scChan != nil && !scSet {// Blocking wait for the initial service config.select {case sc, ok := <-cc.dopts.scChan:if ok {cc.sc = &sc}case <-ctx.Done():return nil, ctx.Err()}}if cc.dopts.scChan != nil {go cc.scWatcher()}// 初始化 balancervar credsClone credentials.TransportCredentialsif creds := cc.dopts.copts.TransportCredentials; creds != nil {credsClone = creds.Clone()}cc.balancerBuildOpts = balancer.BuildOptions{DialCreds: credsClone,CredsBundle: cc.dopts.copts.CredsBundle,Dialer: cc.dopts.copts.Dialer,ChannelzParentID: cc.channelzID,Target: cc.parsedTarget,}// Build the resolver.rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)if err != nil {return nil, fmt.Errorf("failed to build resolver: %v", err)}cc.mu.Lock()cc.resolverWrapper = rWrappercc.mu.Unlock()// A blocking dial blocks until the clientConn is ready.if cc.dopts.block {for {s := cc.GetState()if s == connectivity.Ready {break} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {if err = cc.blockingpicker.connectionError(); err != nil {terr, ok := err.(interface {Temporary() bool})if ok && !terr.Temporary() {return nil, err}}}if !cc.WaitForStateChange(ctx, s) {// ctx got timeout or canceled.return nil, ctx.Err()}}}return cc, nil
}cc.dopts.scChan 這里有一些邏輯,再就是在 dopts.block 時,有主動等連接的邏輯。
順著 cc.dopts.scChan 找過去,發現參數定義的 dialoptions 里面有這一段:
// WithServiceConfig returns a DialOption which has a channel to read the
// service configuration.
//
// Deprecated: service config should be received through name resolver or via
// WithDefaultServiceConfig, as specified at
// <https://github.com/grpc/grpc/blob/master/doc/service_config.md>. Will be
// removed in a future 1.x release.
func WithServiceConfig(c <-chan ServiceConfig) DialOption {return newFuncDialOption(func(o *dialOptions) {o.scChan = c})
}說 scChan 這個字段要廢棄了,要么換 WithDefaultServiceConfig 傳一個默認的 json,要么通過 resolver 的 UpdateState 中 State 結構體里的 ServiceConfig 字段去動態拿。
ServiceConfig 比想象中更神通廣大一點,ClientConn 中有個 applyServiceConfigAndBalancer 方法,甚至會根據動態下發的 ServiceConfig 來調用 switchBalancer 動態切換 balancer 策略。
csMgr 與 WaitForStateChange
回去單獨看一下 cc.dopts.block 的邏輯:
// A blocking dial blocks until the clientConn is ready.if cc.dopts.block {for {s := cc.GetState()if s == connectivity.Ready {break} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {if err = cc.blockingpicker.connectionError(); err != nil {terr, ok := err.(interface {Temporary() bool})if ok && !terr.Temporary() {return nil, err}}}if !cc.WaitForStateChange(ctx, s) {// ctx got timeout or canceled.return nil, ctx.Err()}}}大約是一個死循環連接狀態直到 Ready 為止,ClientConn 的連接狀態來自 cc.csMgr 做管理,而 csMgr 中的連接狀態來自 balancer 對 ClientConn 的 UpdateState 的回調。balancer 的連接狀態是對多個連接的連接狀態的匯聚,大約是只要有一個連接 Ready,便將 balancer 的連接狀態視為 Ready。之前看 balancer 做匯聚連接狀態還不大清楚這個的用處,現在看應該主要是為 WaitForStateChange 這個方法服務的,而且這個方法是公共方法,是 ClientConn 的對外 API。
工程上如果開啟 cc.dopts.block,似乎配合一個 cc.dopts.timeout 比較好,這樣能超時退出。
csMgr 主要做的事情是輔助 ClientConn 實現 connectivity.Reporter 接口,尤其是 WaitForStateChange 方法:
// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {mu sync.Mutexstate connectivity.StatenotifyChan chan struct{}channelzID int64
}// ...// updateState updates the connectivity.State of ClientConn.
// If there's a change it notifies goroutines waiting on state change to
// happen.
func (csm *connectivityStateManager) updateState(state connectivity.State) {csm.mu.Lock()defer csm.mu.Unlock()if csm.state == connectivity.Shutdown {return}if csm.state == state {return}csm.state = stateif channelz.IsOn() {// ...}if csm.notifyChan != nil {// There are other goroutines waiting on this channel.close(csm.notifyChan)csm.notifyChan = nil}
}func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {csm.mu.Lock()defer csm.mu.Unlock()if csm.notifyChan == nil {csm.notifyChan = make(chan struct{})}return csm.notifyChan
}// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
// ctx expires. A true value is returned in former case and false in latter.
// This is an EXPERIMENTAL API.
func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {ch := cc.csMgr.getNotifyChan()if cc.csMgr.getState() != sourceState {return true}select {case <-ctx.Done():return falsecase <-ch:return true}
}notifyChan 這個 channel 僅通過 close 做廣播性的通知。每當 state 狀態變化會惰性產生新的 notifyChan,當這個 notifyChan 被關閉時就意味著狀態有變化了,起到一個類似條件變量的作用。
blockingpicker
除了 balancerWrapper、resolverWrapper,ClientConn 中還有一個 pickerWrapper 類型的 blockingPicker 字段,本體也是同樣主要是并發同步為主。
// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
// actions and unblock when there's a picker update.
type pickerWrapper struct {mu sync.Mutexdone boolblockingCh chan struct{}picker balancer.V2Picker// The latest connection error. TODO: remove when V1 picker is deprecated;// balancer should be responsible for providing the error.*connErr
}type connErr struct {mu sync.Mutexerr error
}大約是初始化時生成一個 blockingCh,隨后每當 updatePickerV2 改動 picker 時,則關閉舊 blockingCh 同時生成一個新的 blockingCh。
pickerWrapper 對外的主要功能入口是 pick 方法,先看它的注釋:
// pick returns the transport that will be used for the RPC.
// It may block in the following cases:
// - there's no picker
// - the current picker returns ErrNoSubConnAvailable
// - the current picker returns other errors and failfast is false.
// - the subConn returned by the current picker is not READY
// When one of these situations happens, pick blocks until the picker gets updated.
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {// ...這些阻塞唯有 balancer 生成新的 picker 對象交給 ClientConn 才能解除。實現風格上,與 WaitForStateChange 類似,每當狀態變化時關閉舊 chan、生成新 chan,上鎖確保狀態變化與更替 chan 兩步操作的原子性,對方阻塞等待 chan 的關閉。
picker.Pick() 方法本身是線程安全的,不是很清楚每個 SubConn 能否被多個 goroutine 使用,后面再確認一下這點。
先看到這里,下面是 addrConn,也就是 SubConn 的實現。
總結
以上是生活随笔為你收集整理的grpc 传递上下文_grpc 源码笔记 02:ClientConn的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python提高照片分辨率怎么调_实拍1
- 下一篇: php读取西门子plc_基于Socket