Istio Pilot 源码分析(二)
By Zhang Haidong, cloud-native development engineer at Dmall (Chengdu).
This article covers the ServiceEntryStore in the Pilot source code and the flow through which it pushes xDS.
This is the second article in the Istio Pilot source code analysis series.
Istio Pilot Source Code Analysis (Part 1)
Having covered the basic structure and startup flow of the Pilot source code, we can now dig into how Pilot actually pushes the xDS protocol down and how that configuration is generated. You probably share these questions: What exactly does the interaction between the control plane and the data plane look like? When does an incremental push happen, and what is the logic that decides it? How does a service that is not Kubernetes-native (a service running on a VM, a Dubbo service, and so on) get registered and then, through a series of transformations, pushed down to the data plane?
With these questions in mind, let's start today's exploration of Pilot.
Note: this analysis is based on the istio release-1.7 branch; the code structure differs in other versions.
ServiceEntryStore
在多點(diǎn)落地?ServiceMesh?的過程中,大量的用到了?ServiceEntry?,每一個?Dubbo?服務(wù)都會映射一個?ServiceEntry?創(chuàng)建在?Kubernetes?里。?ServiceEntry?的作用就是將集群外部的服務(wù)注冊到?Pilot?中,再統(tǒng)一由?ServiceController?進(jìn)行管理。相應(yīng)的,管理外部服務(wù)實(shí)例的對象為?WorkloadEntry?,?ServiceEntry?可以通過?LabelSelector?篩選出自身對應(yīng)的實(shí)例。
ServiceEntry?是作為 CR (Custome Resource) 保存在?Kubernetes?集群里的(也可以通過 MCP 服務(wù)直接發(fā)送給?Pilot?),暫時只討論在集群中創(chuàng)建 CR 的情況。在上一篇源碼分析中我們介紹到,?Pilot?是通過?ConfigController?來監(jiān)聽創(chuàng)建在集群中的 CR 的,?ServiceEntry?也不例外,保存這些 CR 的?ConfigStore?會被轉(zhuǎn)化為?ServiceEntryStore?中的?store?(轉(zhuǎn)化的詳情見上一篇源碼分析),這就是最終?Pilot?存儲?ServiceEntry?的地方。當(dāng)監(jiān)聽的資源推送更改的事件時,會觸發(fā)?ServiceEntryStore?對應(yīng)的?handler?處理后續(xù)的流程。
我們先來看一下?ServiceEntryStore?的結(jié)構(gòu)和它提供的方法:
```go
// istio/pilot/pkg/serviceregistry/serviceentry/servicediscovery.go:61

// ServiceEntryStore communicates with ServiceEntry CRDs and monitors for changes
type ServiceEntryStore struct {
	// Interface to the EnvoyXdsServer, used mainly to push the corresponding xDS update requests
	XdsUpdater model.XDSUpdater
	// Where the ServiceEntry resources are stored
	store model.IstioConfigStore
	// Lock taken when reading or writing store and the caches below
	storeMutex sync.RWMutex

	// Service instance table indexed by hostname/namespace and by kind (service or instance)
	instances map[instancesKey]map[configKey][]*model.ServiceInstance
	// seWithSelectorByNamespace caches all ServiceEntries in each namespace,
	// and also serves as an index for the handlers
	seWithSelectorByNamespace map[string][]servicesWithEntry
	refreshIndexes            bool
	...
}
```

Apart from the two essential fields, XdsUpdater and store, most of the remaining fields are caches and indexes over the same resources (keyed differently) that make the handlers' work easier. Besides the structure itself, two important handlers deserve attention:

```go
// Handles changes to WorkloadEntry resources
func (s *ServiceEntryStore) workloadEntryHandler(old, curr model.Config, event model.Event) {}

// Handles changes to ServiceEntry resources
func (s *ServiceEntryStore) serviceEntryHandler(old, curr model.Config, event model.Event) {}
```

We will discuss the business logic of these two handlers in detail later; first, let's recall how the ServiceEntryStore is initialized:
When the Server initializes the ServiceController, it calls NewServiceDiscovery() to create the ServiceEntryStore. Besides wiring the EnvoyXdsServer and the IstioConfigStore into the ServiceEntryStore, the most important step is registering the event handlers for ServiceEntry and WorkloadEntry with the ConfigController:

```go
func NewServiceDiscovery(configController model.ConfigStoreCache, store model.IstioConfigStore,
	xdsUpdater model.XDSUpdater) *ServiceEntryStore {
	s := &ServiceEntryStore{
		XdsUpdater:            xdsUpdater,
		store:                 store,
		ip2instance:           map[string][]*model.ServiceInstance{},
		instances:             map[instancesKey]map[configKey][]*model.ServiceInstance{},
		workloadInstancesByIP: map[string]*model.WorkloadInstance{},
		refreshIndexes:        true,
	}
	if configController != nil {
		configController.RegisterEventHandler(gvk.ServiceEntry, s.serviceEntryHandler)
		configController.RegisterEventHandler(gvk.WorkloadEntry, s.workloadEntryHandler)
	}
	return s
}
```

This way, when the ConfigController observes a resource change, it invokes serviceEntryHandler or workloadEntryHandler to process the event. Both handlers exist to push the corresponding xDS changes to the EnvoyXdsServer.
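The register-then-dispatch pattern above can be sketched with a toy controller. This is an illustrative stand-in, not Istio's actual ConfigController: the type names below (controller, Handler, Config, Event) are deliberately pared down, and the real machinery is driven by Kubernetes informers rather than a direct notify call.

```go
package main

import "fmt"

// Simplified stand-ins for Istio's richer types.
type Event string
type Config struct{ Name string }
type Handler func(old, curr Config, event Event)

// controller mimics RegisterEventHandler/dispatch: handlers are
// registered per resource kind and invoked on every change of that kind.
type controller struct {
	handlers map[string][]Handler
}

func (c *controller) RegisterEventHandler(kind string, h Handler) {
	c.handlers[kind] = append(c.handlers[kind], h)
}

// notify is what the watch machinery would call when a resource changes.
func (c *controller) notify(kind string, old, curr Config, ev Event) {
	for _, h := range c.handlers[kind] {
		h(old, curr, ev)
	}
}

func main() {
	c := &controller{handlers: map[string][]Handler{}}
	c.RegisterEventHandler("ServiceEntry", func(old, curr Config, ev Event) {
		fmt.Printf("ServiceEntry %s: %s\n", curr.Name, ev)
	})
	c.notify("ServiceEntry", Config{}, Config{Name: "reviews"}, "add")
}
```

The point is only the shape of the wiring: NewServiceDiscovery hands its two methods to the controller once, and from then on every matching event flows into them.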
workloadEntryHandler
Let's first analyze how an update to a WorkloadEntry — a service instance — is turned into an xDS push:
As the comments in the ServiceEntryStore structure above describe, seWithSelectorByNamespace caches all the ServiceEntries in each namespace, while instances is the cache of all service endpoints (WorkloadEntries).
當(dāng)有新的?WorkloadEntry?變化時,先從?seWithSelectorByNamespace?中讀取同一?namespace?中的?ServiceEntry?,遍歷它們并與?WorkloadEntry?的?Label?進(jìn)行比對,確定是關(guān)聯(lián)的服務(wù)后,依據(jù)獲取的服務(wù)創(chuàng)建?ServiceInstance?。?ServiceInstance?是?Pilot?抽象出的描述具體服務(wù)對應(yīng)實(shí)例的結(jié)構(gòu):
type ServiceInstance struct {Service *Service `json:"service,omitempty"`ServicePort *Port `json:"servicePort,omitempty"`Endpoint *IstioEndpoint `json:"endpoint,omitempty"` }創(chuàng)建了新的?ServiceInstance?后,需要及時更新實(shí)例的索引表?s.instances?:
```go
if event != model.EventDelete {
	s.updateExistingInstances(key, instances)
} else {
	s.deleteExistingInstances(key, instances)
}
```

The newly created ServiceInstances are then passed to s.edsUpdate(), the ServiceEntryStore function dedicated to EDS. Before doing any further work it refreshes the index tables once more by calling maybeRefreshIndexes(), so that work done by other goroutines cannot leave them stale; it then takes a read lock and looks up the instances to process in the index table s.instances. For a delete event, the instances were already removed when the index was updated above, so no allInstances are found here and a request to remove the EDS entries is sent to the EnvoyXdsServer directly.
```go
// edsUpdate triggers an EDS update for the given instances
func (s *ServiceEntryStore) edsUpdate(instances []*model.ServiceInstance) {
	allInstances := []*model.ServiceInstance{}

	// Find all keys we need to lookup
	keys := map[instancesKey]struct{}{}
	for _, i := range instances {
		keys[makeInstanceKey(i)] = struct{}{}
	}

	s.maybeRefreshIndexes()

	s.storeMutex.RLock()
	for key := range keys {
		for _, i := range s.instances[key] {
			allInstances = append(allInstances, i...)
		}
	}
	s.storeMutex.RUnlock()

	// This was a delete
	if len(allInstances) == 0 {
		for k := range keys {
			_ = s.XdsUpdater.EDSUpdate(s.Cluster(), string(k.hostname), k.namespace, nil)
		}
		return
	}
	...
}
```

If the instances were updated instead, an EDS update request is sent directly:
```go
// edsUpdate triggers an EDS update for the given instances
func (s *ServiceEntryStore) edsUpdate(instances []*model.ServiceInstance) {
	...
	endpoints := make(map[instancesKey][]*model.IstioEndpoint)
	for _, instance := range allInstances {
		port := instance.ServicePort
		key := makeInstanceKey(instance)
		endpoints[key] = append(endpoints[key],
			&model.IstioEndpoint{
				Address:         instance.Endpoint.Address,
				EndpointPort:    instance.Endpoint.EndpointPort,
				ServicePortName: port.Name,
				Labels:          instance.Endpoint.Labels,
				UID:             instance.Endpoint.UID,
				ServiceAccount:  instance.Endpoint.ServiceAccount,
				Network:         instance.Endpoint.Network,
				Locality:        instance.Endpoint.Locality,
				LbWeight:        instance.Endpoint.LbWeight,
				TLSMode:         instance.Endpoint.TLSMode,
			})
	}

	for k, eps := range endpoints {
		_ = s.XdsUpdater.EDSUpdate(s.Cluster(), string(k.hostname), k.namespace, eps)
	}
}
```

The complete workloadEntryHandler() is as follows:
```go
func (s *ServiceEntryStore) workloadEntryHandler(old, curr model.Config, event model.Event) {
	wle := curr.Spec.(*networking.WorkloadEntry)
	key := configKey{
		kind:      workloadEntryConfigType,
		name:      curr.Name,
		namespace: curr.Namespace,
	}
	...
	s.storeMutex.RLock()
	// We will only select entries in the same namespace
	entries := s.seWithSelectorByNamespace[curr.Namespace]
	s.storeMutex.RUnlock()

	// if there are no service entries, return now to avoid taking unnecessary locks
	if len(entries) == 0 {
		return
	}
	log.Debugf("Handle event %s for workload entry %s in namespace %s", event, curr.Name, curr.Namespace)
	instances := []*model.ServiceInstance{}
	for _, se := range entries {
		workloadLabels := labels.Collection{wle.Labels}
		if !workloadLabels.IsSupersetOf(se.entry.WorkloadSelector.Labels) {
			// Not a match, skip this one
			continue
		}
		instance := convertWorkloadEntryToServiceInstances(wle, se.services, se.entry)
		instances = append(instances, instance...)
	}

	if event != model.EventDelete {
		s.updateExistingInstances(key, instances)
	} else {
		s.deleteExistingInstances(key, instances)
	}

	s.edsUpdate(instances)
}
```

Next, the EnvoyXdsServer handles this EDS update request. It first determines whether the update requires a full push or an incremental one, then builds a PushRequest and sends it to pushChannel, the single channel on which the EnvoyXdsServer receives all push requests.
```go
func (s *DiscoveryServer) EDSUpdate(clusterID, serviceName string, namespace string,
	istioEndpoints []*model.IstioEndpoint) error {
	inboundEDSUpdates.Increment()
	// Decide whether this requires a full push
	fp := s.edsUpdate(clusterID, serviceName, namespace, istioEndpoints)
	s.ConfigUpdate(&model.PushRequest{
		Full: fp,
		ConfigsUpdated: map[model.ConfigKey]struct{}{{
			Kind:      gvk.ServiceEntry,
			Name:      serviceName,
			Namespace: namespace,
		}: {}},
		Reason: []model.TriggerReason{model.EndpointUpdate},
	})
	return nil
}
```

How pushChannel is consumed afterwards, and how the full-versus-incremental decision for EDS is made, will be analyzed when we discuss the EnvoyXdsServer, so we won't dwell on it here.
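The essence of the pushChannel mechanism — requests queue up and a consumer merges everything pending before triggering a single push — can be illustrated with a toy model. The types and the merge rule below are simplified assumptions for the sketch, not Istio's actual implementation (the real debounce logic is also time-window based):

```go
package main

import "fmt"

// Pared-down stand-ins; the real model.PushRequest carries more fields.
type ConfigKey struct{ Kind, Name, Namespace string }
type PushRequest struct {
	Full           bool
	ConfigsUpdated map[ConfigKey]struct{}
}

// merge combines two queued push requests: a full push absorbs partial
// ones, and the sets of updated configs are unioned.
func merge(a, b *PushRequest) *PushRequest {
	out := &PushRequest{Full: a.Full || b.Full, ConfigsUpdated: map[ConfigKey]struct{}{}}
	for k := range a.ConfigsUpdated {
		out.ConfigsUpdated[k] = struct{}{}
	}
	for k := range b.ConfigsUpdated {
		out.ConfigsUpdated[k] = struct{}{}
	}
	return out
}

func main() {
	pushChannel := make(chan *PushRequest, 10)
	pushChannel <- &PushRequest{ConfigsUpdated: map[ConfigKey]struct{}{{"ServiceEntry", "a", "ns"}: {}}}
	pushChannel <- &PushRequest{Full: true, ConfigsUpdated: map[ConfigKey]struct{}{{"ServiceEntry", "b", "ns"}: {}}}
	close(pushChannel)

	// Drain and merge everything queued before actually pushing once.
	merged := &PushRequest{ConfigsUpdated: map[ConfigKey]struct{}{}}
	for req := range pushChannel {
		merged = merge(merged, req)
	}
	fmt.Println(merged.Full, len(merged.ConfigsUpdated)) // true 2
}
```

This is why many rapid-fire EDS updates do not each cause a separate push: the consumer collapses them into one request covering the union of changed configs.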
serviceEntryHandler
Now that we know how WorkloadEntry updates are handled, let's look at how serviceEntryHandler processes a ServiceEntry:
serviceEntryHandler converts the ServiceEntry into a set of Pilot's internal abstract services — each distinct Hosts and Address corresponds to one Service. It initializes a map named configsUpdated to record which ServiceEntries need an update, and creates several slices to hold the added, deleted, updated, and unchanged services respectively:
```go
func (s *ServiceEntryStore) serviceEntryHandler(old, curr model.Config, event model.Event) {
	cs := convertServices(curr)
	configsUpdated := map[model.ConfigKey]struct{}{}
	var addedSvcs, deletedSvcs, updatedSvcs, unchangedSvcs []*model.Service
	...
}
```

Depending on the event type, the corresponding slices are populated:
```go
switch event {
case model.EventUpdate:
	os := convertServices(old)
	if selectorChanged(old, curr) {
		// Consider all services are updated.
		mark := make(map[host.Name]*model.Service, len(cs))
		for _, svc := range cs {
			mark[svc.Hostname] = svc
			updatedSvcs = append(updatedSvcs, svc)
		}
		for _, svc := range os {
			if _, f := mark[svc.Hostname]; !f {
				updatedSvcs = append(updatedSvcs, svc)
			}
		}
	} else {
		addedSvcs, deletedSvcs, updatedSvcs, unchangedSvcs = servicesDiff(os, cs)
	}
case model.EventDelete:
	deletedSvcs = cs
case model.EventAdd:
	addedSvcs = cs
default:
	// this should not happen
	unchangedSvcs = cs
}
```

The update event is the interesting case: the new list is compared against the old list of Services. The handler first checks whether any service's Selector changed; if it did, all services from both the old and the new list are added to the updated list. If the Selector did not change, servicesDiff() compares the old and new lists service by service and sorts the results into the added, deleted, updated, and unchanged slices.
After the services are categorized, every service that needs a change is written into configsUpdated:
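To make the diffing step concrete, here is a toy servicesDiff over a pared-down Service type. This is only a sketch of the classification: the real model.Service comparison covers many more fields, and the single "Ports" field here stands in for everything that can change on a service.

```go
package main

import "fmt"

// Hypothetical, pared-down Service for illustration.
type Service struct {
	Hostname string
	Ports    int
}

// servicesDiff classifies services by hostname into added, deleted,
// updated and unchanged, mirroring the role of Istio's servicesDiff.
func servicesDiff(os, ns []*Service) (added, deleted, updated, unchanged []*Service) {
	oldMap := map[string]*Service{}
	for _, s := range os {
		oldMap[s.Hostname] = s
	}
	seen := map[string]bool{}
	for _, s := range ns {
		seen[s.Hostname] = true
		old, ok := oldMap[s.Hostname]
		switch {
		case !ok:
			added = append(added, s) // hostname only in the new list
		case old.Ports != s.Ports:
			updated = append(updated, s) // present in both, content changed
		default:
			unchanged = append(unchanged, s) // present in both, identical
		}
	}
	for _, s := range os {
		if !seen[s.Hostname] {
			deleted = append(deleted, s) // hostname only in the old list
		}
	}
	return
}

func main() {
	oldSvcs := []*Service{{"a.com", 80}, {"b.com", 80}}
	newSvcs := []*Service{{"b.com", 8080}, {"c.com", 80}}
	a, d, u, un := servicesDiff(oldSvcs, newSvcs)
	fmt.Println(len(a), len(d), len(u), len(un)) // 1 1 1 0
}
```
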
```go
for _, svcs := range [][]*model.Service{addedSvcs, deletedSvcs, updatedSvcs} {
	for _, svc := range svcs {
		configsUpdated[model.ConfigKey{
			Kind:      gvk.ServiceEntry,
			Name:      string(svc.Hostname),
			Namespace: svc.Attributes.Namespace}] = struct{}{}
	}
}
```

Because servicesDiff() only compares the Service structures and does not check whether the Endpoints changed, the services in unchangedSvcs may still need an xDS update — either incremental (EDS only) or full. When is a full update needed? When the service's Resolution is DNS (see the Resolution documentation[1]): the endpoint addresses are then fully-qualified domain names, so the CDS must be updated as well.
```go
if len(unchangedSvcs) > 0 {
	// If this service entry had endpoints with IPs (i.e. resolution STATIC), then we do EDS update.
	// If the service entry had endpoints with FQDNs (i.e. resolution DNS), then we need to do
	// full push (as fqdn endpoints go via strict_dns clusters in cds).
	currentServiceEntry := curr.Spec.(*networking.ServiceEntry)
	oldServiceEntry := old.Spec.(*networking.ServiceEntry)
	if currentServiceEntry.Resolution == networking.ServiceEntry_DNS {
		if !reflect.DeepEqual(currentServiceEntry.Endpoints, oldServiceEntry.Endpoints) {
			// fqdn endpoints have changed. Need full push
			for _, svc := range unchangedSvcs {
				configsUpdated[model.ConfigKey{
					Kind:      gvk.ServiceEntry,
					Name:      string(svc.Hostname),
					Namespace: svc.Attributes.Namespace}] = struct{}{}
			}
		}
	}
}
```

When the Resolution of the unchangedSvcs is STATIC, an incremental EDS update is enough:
```go
if len(unchangedSvcs) > 0 && !fullPush {
	// IP endpoints in a STATIC service entry has changed. We need EDS update
	// If will do full-push, leave the edsUpdate to that.
	// XXX We should do edsUpdate for all unchangedSvcs since we begin to calculate service
	// data according to this "configsUpdated" and thus remove the "!willFullPush" condition.
	instances := convertInstances(curr, unchangedSvcs)
	key := configKey{
		kind:      serviceEntryConfigType,
		name:      curr.Name,
		namespace: curr.Namespace,
	}
	// If only instances have changed, just update the indexes for the changed instances.
	s.updateExistingInstances(key, instances)
	s.edsUpdate(instances)
	return
}
```

If configsUpdated is non-empty, a fullPush is required: the EDS for these services is updated first, and then a fullPush PushRequest is sent to the pushChannel:
```go
if fullPush {
	// When doing a full push, for added and updated services trigger an eds update
	// so that endpoint shards are updated.
	var instances []*model.ServiceInstance
	if len(addedSvcs) > 0 {
		instances = append(instances, convertInstances(curr, addedSvcs)...)
	}
	if len(updatedSvcs) > 0 {
		instances = append(instances, convertInstances(curr, updatedSvcs)...)
	}
	if len(unchangedSvcs) > 0 {
		currentServiceEntry := curr.Spec.(*networking.ServiceEntry)
		oldServiceEntry := old.Spec.(*networking.ServiceEntry)
		// Non DNS service entries are sent via EDS. So we should compare and update if such endpoints change.
		if currentServiceEntry.Resolution != networking.ServiceEntry_DNS {
			if !reflect.DeepEqual(currentServiceEntry.Endpoints, oldServiceEntry.Endpoints) {
				instances = append(instances, convertInstances(curr, unchangedSvcs)...)
			}
		}
	}
	s.edsUpdate(instances)

	// If service entry is deleted, cleanup endpoint shards for services.
	for _, svc := range deletedSvcs {
		s.XdsUpdater.SvcUpdate(s.Cluster(), string(svc.Hostname), svc.Attributes.Namespace, model.EventDelete)
	}

	pushReq := &model.PushRequest{
		Full:           true,
		ConfigsUpdated: configsUpdated,
		Reason:         []model.TriggerReason{model.ServiceUpdate},
	}
	s.XdsUpdater.ConfigUpdate(pushReq)
}
```

This concludes how the ServiceEntryStore handles ServiceEntry and WorkloadEntry. Other usages — such as a ServiceEntry selecting Pods inside the cluster, or a native Kubernetes Service selecting WorkloadEntries — are left for interested readers to explore in the source.
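As a side note, the selector matching that both handlers depend on — workloadLabels.IsSupersetOf(se.entry.WorkloadSelector.Labels) in workloadEntryHandler — boils down to checking that the workload's labels contain every key/value pair required by the selector. A minimal sketch (not Istio's labels.Collection implementation):

```go
package main

import "fmt"

// isSupersetOf reports whether workload labels contain every key/value
// pair required by a selector; an empty selector matches everything.
func isSupersetOf(workload, selector map[string]string) bool {
	for k, v := range selector {
		if workload[k] != v {
			return false
		}
	}
	return true
}

func main() {
	selector := map[string]string{"app": "reviews"}
	wle := map[string]string{"app": "reviews", "version": "v1", "vm": "true"}
	fmt.Println(isSupersetOf(wle, selector))                              // true
	fmt.Println(isSupersetOf(map[string]string{"app": "ratings"}, selector)) // false
}
```

Extra labels on the workload never prevent a match; only a missing or mismatched selector key does.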
For the handling logic of the other registries, such as kube and mcp, stay tuned for the other articles in this series. Readers can also try walking through the code themselves:

```
// Related source directories
kube: pilot/pkg/serviceregistry/kube
mcp:  pilot/pkg/serviceregistry/mcp
```

Next, we'll introduce the core of the Pilot Server: the EnvoyXdsServer.
References
[1] Resolution: https://istio.io/latest/docs/reference/config/networking/service-entry/#ServiceEntry-Resolution