Kubernetes Controller Manager 工作原理
原文連接:https://blog.ihypo.net/15763910382218.html
本文基于對 Kubernetes v1.16 的源碼閱讀,文章有一定的源碼,但我會(huì)通過配圖盡量描述清晰
在 Kubernetes Master 節(jié)點(diǎn)中,有三個(gè)重要組件:ApiServer、ControllerManager、Scheduler,它們一起承擔(dān)了整個(gè)集群的管理工作。本文嘗試梳理清楚 ControllerManager 的工作流程和原理。
什么是 Controller Manager
根據(jù)官方文檔的說法:kube-controller-manager 運(yùn)行控制器,它們是處理集群中常規(guī)任務(wù)的后臺(tái)線程。
說白了,Controller Manager 就是集群內(nèi)部的管理控制中心,由負(fù)責(zé)不同資源的多個(gè) Controller 構(gòu)成,共同負(fù)責(zé)集群內(nèi)的 Node、Pod 等所有資源的管理,比如當(dāng)通過 Deployment 創(chuàng)建的某個(gè) Pod 發(fā)生異常退出時(shí),RS Controller 便會(huì)接受并處理該退出事件,并創(chuàng)建新的 Pod 來維持預(yù)期副本數(shù)。
幾乎每種特定資源都有特定的 Controller 維護(hù)管理以保持預(yù)期狀態(tài),而 Controller Manager 的職責(zé)便是把所有的 Controller 聚合起來:
可以這么說,Controller 保證集群內(nèi)的資源保持預(yù)期狀態(tài),而 Controller Manager 保證了 Controller 保持在預(yù)期狀態(tài)。
Controller 工作流程
在講解 Controller Manager 怎么為 Controller 提供基礎(chǔ)設(shè)施和運(yùn)行環(huán)境之前,我們先了解一下 Controller 的工作流程是什么樣子的。
從比較高維度的視角看,Controller Manager 主要提供了一個(gè)分發(fā)事件的能力,而不同的 Controller 只需要注冊對應(yīng)的 Handler 來等待接收和處理事件。
以 Deployment Controller 舉例,在 pkg/controller/deployment/deployment_controller.go 的 NewDeploymentController 方法中,便包括了 Event Handler 的注冊,對于 Deployment Controller 來說,只需要根據(jù)不同的事件實(shí)現(xiàn)不同的處理邏輯,便可以實(shí)現(xiàn)對相應(yīng)資源的管理。
dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: dc.addDeployment,UpdateFunc: dc.updateDeployment,// This will enter the sync loop and no-op, because the deployment has been deleted from the store.DeleteFunc: dc.deleteDeployment, }) rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: dc.addReplicaSet,UpdateFunc: dc.updateReplicaSet,DeleteFunc: dc.deleteReplicaSet, }) podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{DeleteFunc: dc.deletePod, })可以看到,在 Controller Manager 的幫助下,Controller 的邏輯可以做的非常純粹,只需要實(shí)現(xiàn)相應(yīng)的 EventHandler 即可,那么 Controller Manager 都做了哪些具體的工作呢?
Controller Manager 架構(gòu)
輔助 Controller Manager 完成事件分發(fā)的是 client-go,而其中比較關(guān)鍵的模塊便是 informer。
kubernetes 在 github 上提供了一張 client-go 的架構(gòu)圖,從中可以看出,Controller 正是下半部分(CustomController)描述的內(nèi)容,而 Controller Manager 主要完成的是上半部分。
Informer 工廠
從上圖可以看到 Informer 是一個(gè)非常關(guān)鍵的 “橋梁” 作用,因此對 Informer 的管理便是 Controller Manager 要做的第一件事。
在 Controller Manager 啟動(dòng)時(shí),便會(huì)創(chuàng)建一個(gè)名為 SharedInformerFactory 的單例工廠,因?yàn)槊總€(gè) Informer 都會(huì)與 Api Server 維持一個(gè) watch 長連接,所以這個(gè)單例工廠通過為所有 Controller 提供了唯一獲取 Informer 的入口,來保證每種類型的 Informer 只被實(shí)例化一次。
該單例工廠的初始化邏輯:
// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options. func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {factory := &sharedInformerFactory{client: client,namespace: v1.NamespaceAll,defaultResync: defaultResync,informers: make(map[reflect.Type]cache.SharedIndexInformer),startedInformers: make(map[reflect.Type]bool),customResync: make(map[reflect.Type]time.Duration),}// Apply all optionsfor _, opt := range options {factory = opt(factory)}return factory }從上面的初始化邏輯中可以看到,sharedInformerFactory 中最重要的是名為 informers 的 map,其中 key 為資源類型,而 value 便是關(guān)注該資源類型的 Informer。每種類型的 Informer 只會(huì)被實(shí)例化一次,并存儲(chǔ)在 map 中,不同 Controller 需要相同資源的 Informer 時(shí)只會(huì)拿到同一個(gè) Informer 實(shí)例。
對于 Controller Manager 來說,維護(hù)所有的 Informer 使其正常工作,是保證所有 Controller 正常工作的基礎(chǔ)條件。sharedInformerFactory 通過該 map 維護(hù)了所有的 informer 實(shí)例,因此,sharedInformerFactory 也承擔(dān)了提供統(tǒng)一啟動(dòng)入口的職責(zé):
// Start initializes all requested informers. func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {f.lock.Lock()defer f.lock.Unlock()for informerType, informer := range f.informers {if !f.startedInformers[informerType] {go informer.Run(stopCh)f.startedInformers[informerType] = true}} }當(dāng) Controller Manager 啟動(dòng)時(shí),最重要的就是通過該工廠的 Start 方法,將所有的 Informer 運(yùn)行起來。
Informer 的創(chuàng)建
下面看下這些 Informer 是怎么被創(chuàng)建的。Controller Manager 在 cmd/kube-controller-manager/app/controllermanager.go 的 NewControllerInitializers 函數(shù)中初識(shí)化了所有的 Controller,因?yàn)榇a冗長,這里僅拿 Deployment Controller 舉例子。
初始化 Deployment Controller 的邏輯在 cmd/kube-controller-manager/app/apps.go 的 startDeploymentController 的函數(shù)中:
func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {return nil, false, nil}dc, err := deployment.NewDeploymentController(ctx.InformerFactory.Apps().V1().Deployments(),ctx.InformerFactory.Apps().V1().ReplicaSets(),ctx.InformerFactory.Core().V1().Pods(),ctx.ClientBuilder.ClientOrDie("deployment-controller"),)if err != nil {return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)}go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)return nil, true, nil }最關(guān)鍵的邏輯在 deployment.NewDeploymentController 上,該函數(shù)真正創(chuàng)建了 Deployment Controller,而該創(chuàng)建函數(shù)的前三個(gè)參數(shù)分別為 Deployment、ReplicaSet、Pod 的 Informer。可以看到,Informer 的單例工廠以 ApiGroup 為路徑提供了不同資源的 Informer 創(chuàng)建入口。
不過要注意的是,.Apps().V1().Deployments() 雖然返回的是 deploymentInformer 類型的實(shí)例,但是,deploymentInformer 其實(shí)并不是一個(gè)真正的 Informer(盡管他以 Informer 命名),它只是一個(gè)模板類,主要功能是提供關(guān)注 Deployment 這一特定資源 Informer 的創(chuàng)建模板:
// Deployments returns a DeploymentInformer. func (v *version) Deployments() DeploymentInformer {return &deploymentInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} }真正創(chuàng)建 Informer 的邏輯是在 deploymentInformer.Informer() 中(client-go/informers/apps/v1/deployment.go),f.defaultInformer 是默認(rèn)的 Deployment Informer 創(chuàng)建模板方法,通過將資源實(shí)例和該模板方法傳入 Informer 工廠的 InformerFor 方法,來創(chuàng)建僅關(guān)注 Deployment 資源的 Informer:
func (f *deploymentInformer) Informer() cache.SharedIndexInformer {return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer) }簡單梳理一下:
這里用到了模板方法(設(shè)計(jì)模式),雖然有一點(diǎn)繞口,但可以參考下圖梳理一下,理解關(guān)鍵在于 Informer 的 差異化的創(chuàng)建邏輯下放給了模板類:
最后,名為 sharedIndexInformer 的結(jié)構(gòu)體將被實(shí)例化,并真正的承擔(dān) Informer 的職責(zé)。被注冊到 Informer 工廠 map 中的也是該實(shí)例。
Informer 的運(yùn)行
因?yàn)檎嬲?Informer 實(shí)例是一個(gè) sharedIndexInformer 類型的對象,當(dāng) Informer 工廠啟動(dòng)時(shí)(執(zhí)行 Start 方法),被真正運(yùn)行起來的是 sharedIndexInformer。
sharedIndexInformer 是 client-go 里的組件,它的 Run 方法雖然短短幾十行,但是卻承擔(dān)了很多工作。到這里,才到了 Controller Manager 最有趣的部分。
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {defer utilruntime.HandleCrash()fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)cfg := &Config{Queue: fifo,ListerWatcher: s.listerWatcher,ObjectType: s.objectType,FullResyncPeriod: s.resyncCheckPeriod,RetryOnError: false,ShouldResync: s.processor.shouldResync,Process: s.HandleDeltas,}func() {s.startedLock.Lock()defer s.startedLock.Unlock()s.controller = New(cfg)s.controller.(*controller).clock = s.clocks.started = true}()// Separate stop channel because Processor should be stopped strictly after controllerprocessorStopCh := make(chan struct{})var wg wait.Groupdefer wg.Wait() // Wait for Processor to stopdefer close(processorStopCh) // Tell Processor to stopwg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)wg.StartWithChannel(processorStopCh, s.processor.run)defer func() {s.startedLock.Lock()defer s.startedLock.Unlock()s.stopped = true // Don't want any new listeners}()s.controller.Run(stopCh) }sharedIndexInformer 的啟動(dòng)邏輯主要做了下面幾件事:
這幾個(gè)名詞(或者說組件)前文并沒有提到過,而這四件事情是 Controller Manager 工作的核心內(nèi)容,因此下面我會(huì)分別介紹。
sharedIndexInformer
sharedIndexInformer 是一個(gè)共享的 Informer 框架,不同的 Controller 只需要提供一個(gè)模板類(比如上文提到的 deploymentInformer ),便可以創(chuàng)建一個(gè)符合自己需求的特定 Informer。
sharedIndexInformer 包含了一堆工具來完成 Informer 的任務(wù),其主要代碼在 client-go/tools/cache/shared_informer.go 中。其創(chuàng)建邏輯也在其中:
// NewSharedIndexInformer creates a new instance for the listwatcher. func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {realClock := &clock.RealClock{}sharedIndexInformer := &sharedIndexInformer{processor: &sharedProcessor{clock: realClock},indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),listerWatcher: lw,objectType: objType,resyncCheckPeriod: defaultEventHandlerResyncPeriod,defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)),clock: realClock,}return sharedIndexInformer }在創(chuàng)建邏輯中,有幾個(gè)東西需要留意:
除此之外,還包含了上文啟動(dòng)邏輯中提到了 DeltaFIFO 隊(duì)列和 controller,下面就分別介紹。
sharedProcessor
processor 是 sharedIndexInformer 中一個(gè)非常有趣的組件,Controller Manager 通過一個(gè) Informer 單例工廠來保證不同的 Controller 共享了同一個(gè) Informer,但是不同的 Controller 對該共享的 Informer 注冊的 Handler 不同,那么 Informer 應(yīng)該怎么管理被注冊的 Handler 呢?
processor 便是用來管理被注冊的 Handler 以及將事件分發(fā)給不同 Handler 的組件。
type sharedProcessor struct {listenersStarted boollistenersLock sync.RWMutexlisteners []*processorListenersyncingListeners []*processorListenerclock clock.Clockwg wait.Group }sharedProcessor 的工作核心是圍繞著 listeners 這個(gè) Listener 切片展開的。
當(dāng)我們注冊一個(gè) Handler 到 Informer 時(shí),最終會(huì)被轉(zhuǎn)換為一個(gè)名為 processorListener 結(jié)構(gòu)體的實(shí)例:
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {ret := &processorListener{nextCh: make(chan interface{}),addCh: make(chan interface{}),handler: handler,pendingNotifications: *buffer.NewRingGrowing(bufferSize),requestedResyncPeriod: requestedResyncPeriod,resyncPeriod: resyncPeriod,}ret.determineNextResync(now)return ret }該實(shí)例主要包含兩個(gè) channel 和外面注冊的 Handler 方法。而此處被實(shí)例化的 processorListener 對象最終會(huì)被添加到 sharedProcessor.listeners 列表中:
func (p *sharedProcessor) addListener(listener *processorListener) {p.listenersLock.Lock()defer p.listenersLock.Unlock()p.addListenerLocked(listener)if p.listenersStarted {p.wg.Start(listener.run)p.wg.Start(listener.pop)} }如圖所示,Controller 中的 Handler 方法最終會(huì)被添加到 Listener 中,而 Listener 將會(huì)被 append 到 sharedProcessor 的 Listeners 切片中。
前文提到,sharedIndexInformer 啟動(dòng)時(shí)會(huì)將 sharedProcessor 運(yùn)行起來,而 sharedProcessor 的啟動(dòng)邏輯便是和這些 listener 有關(guān):
func (p *sharedProcessor) run(stopCh <-chan struct{}) {func() {p.listenersLock.RLock()defer p.listenersLock.RUnlock()for _, listener := range p.listeners {p.wg.Start(listener.run)p.wg.Start(listener.pop)}p.listenersStarted = true}()<-stopChp.listenersLock.RLock()defer p.listenersLock.RUnlock()for _, listener := range p.listeners {close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop}p.wg.Wait() // Wait for all .pop() and .run() to stop }可以看到,sharedProcessor 啟動(dòng)時(shí)會(huì)依次執(zhí)行 listener 的 run 和 pop 方法,我們現(xiàn)在看下這兩個(gè)方法。
listener 的啟動(dòng)
因?yàn)?listener 包含了 Controller 注冊進(jìn)來的 Handler 方法,因此 listener 最重要的職能就是當(dāng)事件發(fā)生時(shí)來觸發(fā)這些方法,而 listener.run 就是不停的從 nextCh 這個(gè) channel 中拿到事件并執(zhí)行對應(yīng)的 handler:
func (p *processorListener) run() {// this call blocks until the channel is closed. When a panic happens during the notification// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)// the next notification will be attempted. This is usually better than the alternative of never// delivering again.stopCh := make(chan struct{})wait.Until(func() {// this gives us a few quick retries before a long pause and then a few more quick retrieserr := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {for next := range p.nextCh {switch notification := next.(type) {case updateNotification:p.handler.OnUpdate(notification.oldObj, notification.newObj)case addNotification:p.handler.OnAdd(notification.newObj)case deleteNotification:p.handler.OnDelete(notification.oldObj)default:utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))}}// the only way to get here is if the p.nextCh is empty and closedreturn true, nil})// the only way to get here is if the p.nextCh is empty and closedif err == nil {close(stopCh)}}, 1*time.Minute, stopCh) }可以看到,listener.run 不停的從 nextCh 這個(gè) channel 中拿到事件,但是 nextCh 這個(gè) channel 里的事件又是從哪來的呢?listener.pop 的職責(zé)便是將事件放入 nextCh 中。
listener.pop 是一段非常精巧和有趣的邏輯:
func (p *processorListener) pop() {defer utilruntime.HandleCrash()defer close(p.nextCh) // Tell .run() to stopvar nextCh chan<- interface{}var notification interface{}for {select {case nextCh <- notification:// Notification dispatchedvar ok boolnotification, ok = p.pendingNotifications.ReadOne()if !ok { // Nothing to popnextCh = nil // Disable this select case}case notificationToAdd, ok := <-p.addCh:if !ok {return}if notification == nil { // No notification to pop (and pendingNotifications is empty)// Optimize the case - skip adding to pendingNotificationsnotification = notificationToAddnextCh = p.nextCh} else { // There is already a notification waiting to be dispatchedp.pendingNotifications.WriteOne(notificationToAdd)}}} }listener 之所以包含了兩個(gè) channel:addCh 和 nextCh,是因?yàn)?Informer 無法預(yù)知 listener.handler 的事件消費(fèi)的速度是否大于事件生產(chǎn)的速度,因此添加了一個(gè)名為 pendingNotifications 的緩沖隊(duì)列來保存未來得及消費(fèi)的事件。
pop 方法一方面會(huì)不停的從 addCh 中獲得最新事件,以保證不會(huì)讓生產(chǎn)方阻塞。然后判斷是否存在 buffer,如果存在則把事件添加到 buffer 中,如果不存在則嘗試推給 nextCh。
而另一方面,會(huì)判斷 buffer 中是否還有事件,如果還有存量,則不停的傳遞給 nextCh。
pop 方法實(shí)現(xiàn)了一個(gè)帶 buffer 的分發(fā)機(jī)制,使得事件可以源源不斷的從 addCh 到 nextCh。但是問題來了,那 addCh 的事件從哪來呢。
其實(shí)來源非常簡單,listener 有一個(gè) add 方法,入?yún)⑹且粋€(gè)事件,該方法會(huì)將新事件推入 addCh 中。而調(diào)用該 add 方法的是管理所有 listener 的 sharedProcessor。
上面提到過,sharedProcessor 的職責(zé)便是管理所有的 Handler 以及分發(fā)事件,而真正做分發(fā)工作的是 distribute 方法:
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {p.listenersLock.RLock()defer p.listenersLock.RUnlock()if sync {for _, listener := range p.syncingListeners {listener.add(obj)}} else {for _, listener := range p.listeners {listener.add(obj)}} }到目前為止,我們有一部分比較清晰了:
那么剩下的問題就是 Informer 的事件從哪來呢?
DeltaFIFO
在分析 Informer 獲取事件之前,需要提前講一個(gè)非常有趣的小工具,就是在 sharedIndexInformer.Run 的時(shí)候創(chuàng)建的 fifo 隊(duì)列:
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)DeltaFIFO 是一個(gè)非常有趣的隊(duì)列,相關(guān)代碼定義在 client-go/tools/cache/delta_fifo.go 中。對于一個(gè)隊(duì)列來說,最重要的肯定是 Add 方法和 Pop 方法,DeltaFIFO 提供了多個(gè) Add 方法,雖然根據(jù)不同的事件類型(add/update/delete/sync)區(qū)分不同的方法,但是最終都會(huì)執(zhí)行 queueActionLocked:
// queueActionLocked appends to the delta list for the object. // Caller must lock first. func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {id, err := f.KeyOf(obj)if err != nil {return KeyError{obj, err}}// If object is supposed to be deleted (last event is Deleted),// then we should ignore Sync events, because it would result in// recreation of this object.if actionType == Sync && f.willObjectBeDeletedLocked(id) {return nil}newDeltas := append(f.items[id], Delta{actionType, obj})newDeltas = dedupDeltas(newDeltas)if len(newDeltas) > 0 {if _, exists := f.items[id]; !exists {f.queue = append(f.queue, id)}f.items[id] = newDeltasf.cond.Broadcast()} else {// We need to remove this from our map (extra items in the queue are// ignored if they are not in the map).delete(f.items, id)}return nil }queueActionLocked 方法的第一個(gè)參數(shù) actionType 便是事件類型:
const (Added DeltaType = "Added" // watch api 獲得的創(chuàng)建事件Updated DeltaType = "Updated" // watch api 獲得的更新事件Deleted DeltaType = "Deleted" // watch api 獲得的刪除事件Sync DeltaType = "Sync" // 觸發(fā)了 List Api,需要刷新緩存 )從事件類型以及入隊(duì)列方法可以看出,這是一個(gè)帶有業(yè)務(wù)功能的隊(duì)列,并不是單純的“先入先出”,入隊(duì)列方法中有兩個(gè)非常精巧的設(shè)計(jì):
第二點(diǎn)比較好理解,如果觸發(fā)了 List 請求,而且發(fā)現(xiàn)要被處理的資源已經(jīng)被刪除了,則就不需要再入隊(duì)列處理。而第一點(diǎn)需要結(jié)合出隊(duì)列方法一起來看:
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {f.lock.Lock()defer f.lock.Unlock()for {for len(f.queue) == 0 {// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.// When Close() is called, the f.closed is set and the condition is broadcasted.// Which causes this loop to continue and return from the Pop().if f.IsClosed() {return nil, ErrFIFOClosed}f.cond.Wait()}id := f.queue[0]f.queue = f.queue[1:]if f.initialPopulationCount > 0 {f.initialPopulationCount--}item, ok := f.items[id]if !ok {// Item may have been deleted subsequently.continue}delete(f.items, id)err := process(item)if e, ok := err.(ErrRequeue); ok {f.addIfNotPresent(id, item)err = e.Err}// Don't need to copyDeltas here, because we're transferring// ownership to the caller.return item, err} }DeltaFIFO 的 Pop 方法有一個(gè)入?yún)?#xff0c;即是處理函數(shù),出隊(duì)列時(shí),DeltaFIFO 會(huì)先根據(jù)資源 id 獲得該資源 所有的事件,然后交給處理函數(shù)。
工作流程如圖所示:
總體來看,DeltaFIFO 的入隊(duì)列方法,會(huì)先判斷該資源是否已經(jīng)在 items 中, 如果已經(jīng)存在,說明該資源還沒有被消費(fèi)(還在 queue 中排隊(duì)),則直接將事件 append 到 items[resource_id] 中即可。如果發(fā)現(xiàn)不在 items 中,便會(huì)創(chuàng)建 items[resource_id],并將資源 id append 到 queue 中。
而 DeltaFIFO 出隊(duì)列方法,會(huì)從 queue 中拿到隊(duì)列最前面的資源 id,然后從 items 中拿走該資源所有的事件,最后調(diào)用 Pop 方法傳入的 PopProcessFunc 類型的處理函數(shù)。
因此,DeltaFIFO 的特點(diǎn)在于,入隊(duì)列的是(資源的)事件,而出隊(duì)列時(shí)是拿到的是最早入隊(duì)列的資源的所有事件。這樣的設(shè)計(jì)保證了不會(huì)因?yàn)橛心硞€(gè)資源瘋狂的制造事件,導(dǎo)致其他資源沒有機(jī)會(huì)被處理而產(chǎn)生饑餓。
controller
DeltaFIFO 是一個(gè)非常重要的組件,真正讓他發(fā)揮價(jià)值的,便是 Informer 的 controller。
雖然 K8s 源碼中的確用的是 controller 這個(gè)詞,但是此 controller 并不是 Deployment Controller 這種資源控制器。而是一個(gè)承上啟下的事件控制器(從 API Server 拿到事件,下發(fā)給 Informer 進(jìn)行處理)。
controller 的職責(zé)就兩個(gè):
controller 的定義非常簡單,它的核心就是 Reflector:
type controller struct {config Configreflector *ReflectorreflectorMutex sync.RWMutexclock clock.Clock }Reflector 的代碼比較繁瑣但是功能比較簡單,就是通過 sharedIndexInformer 里定義的 listerWatcher 進(jìn)行 List-Watch,并將獲得的事件推入 DeltaFIFO 中。
controller 啟動(dòng)之后會(huì)先將 Reflector 啟動(dòng),然后在執(zhí)行 processLoop,通過一個(gè)死循環(huán),不停的將從 DeltaFIFO 讀出需要處理的資源事件,并交給 sharedIndexInformer 的 HandleDeltas 方法(創(chuàng)建 controller 時(shí)賦值給了 config.Process)。
func (c *controller) processLoop() {for {obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))if err != nil {if err == ErrFIFOClosed {return}if c.config.RetryOnError {// This is the safe way to re-enqueue.c.config.Queue.AddIfNotPresent(obj)}}} }如果我們再查看下 sharedIndexInformer 的 HandleDeltas 方法,就會(huì)發(fā)現(xiàn)整個(gè)事件消費(fèi)流程被打通了:
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {s.blockDeltas.Lock()defer s.blockDeltas.Unlock()// from oldest to newestfor _, d := range obj.(Deltas) {switch d.Type {case Sync, Added, Updated:isSync := d.Type == Syncs.cacheMutationDetector.AddObject(d.Object)if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {if err := s.indexer.Update(d.Object); err != nil {return err}s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)} else {if err := s.indexer.Add(d.Object); err != nil {return err}s.processor.distribute(addNotification{newObj: d.Object}, isSync)}case Deleted:if err := s.indexer.Delete(d.Object); err != nil {return err}s.processor.distribute(deleteNotification{oldObj: d.Object}, false)}}return nil }前面我們知道了 processor.distribute 方法可以將事件分發(fā)給所有 listener,而 controller 會(huì)使用 Reflector 從 ApiServer 拿到事件,并入隊(duì)列,然后通過 processLoop 從隊(duì)列中拿出要處理的資源的所有事件,最后通過 sharedIndexInformer 的 HandleDeltas 方法,調(diào)用了 processor.distribute。
因此,我們可以將整個(gè)事件流向整理為下圖:
Indexer
以上,我們將事件從接收到分發(fā),中間所有的邏輯已經(jīng)梳理了一遍,但是在 sharedIndexInformer 的 HandleDeltas 方法中,還有一些邏輯比較令人注意,就是所有的事件都會(huì)先對 s.indexer 進(jìn)行更新,然后在分發(fā)。
前面提到 Indexer 是一個(gè)線程安全的存儲(chǔ),作為緩存使用,為了減輕資源控制器(Controller)查詢資源時(shí)對 ApiServer 的壓力。
當(dāng)有任何事件更新時(shí),會(huì)先刷新 Indexer 里的緩存,然后再將事件分發(fā)給資源控制器,資源控制器在需要獲得資源詳情的時(shí)候,優(yōu)先從 Indexer 獲得,就可以減少對 APIServer 不必要的查詢請求。
Indexer 存儲(chǔ)的具體實(shí)現(xiàn)在 client-go/tools/cache/thread_safe_store.go 中,數(shù)據(jù)存儲(chǔ)在 threadSafeMap 中:
type threadSafeMap struct {lock sync.RWMutexitems map[string]interface{}// indexers maps a name to an IndexFuncindexers Indexers// indices maps a name to an Indexindices Indices }從本質(zhì)上講,threadSafeMap 就是加了一個(gè)讀寫鎖的 map。除此之外,還可以定義索引,索引的實(shí)現(xiàn)非常有趣,通過兩個(gè)字段完成:
相關(guān)邏輯比較簡單,可以參考下圖:
MutationDetector
sharedIndexInformer 的 HandleDeltas 方法中,除了向 s.indexer 更新的數(shù)據(jù)之外,還向 s.cacheMutationDetector 更新了數(shù)據(jù)。
在一開始講到 sharedIndexInformer 啟動(dòng)時(shí)還會(huì)啟動(dòng)一個(gè) cacheMutationDetector,來監(jiān)控 indexer 的緩存。
因?yàn)?indexer 緩存的其實(shí)是一個(gè)指針,多個(gè) Controller 訪問 indexer 緩存的資源,其實(shí)獲得的是同一個(gè)資源實(shí)例。如果有一個(gè) Controller 并不本分,修改了資源的屬性,勢必會(huì)影響到其他 Controller 的正確性。
MutationDetector 的作用正是定期檢查有沒有緩存被修改,當(dāng) Informer 接收到新事件時(shí),MutationDetector 會(huì)保存該資源的指針(和 indexer 一樣),以及該資源的深拷貝。通過定期檢查指針指向的資源和開始存儲(chǔ)的深拷貝是否一致,便知道被緩存的資源是否被修改。
不過,具體是否啟用監(jiān)控是受到環(huán)境變量 KUBE_CACHE_MUTATION_DETECTOR 影響的,如果不設(shè)置該環(huán)境變量,sharedIndexInformer 實(shí)例化的是 dummyMutationDetector,在啟動(dòng)后什么事情也不做。
如果 KUBE_CACHE_MUTATION_DETECTOR 為 true,則 sharedIndexInformer 實(shí)例化的是 defaultCacheMutationDetector,該實(shí)例會(huì)以 1s 為間隔,定期執(zhí)行檢查緩存,如果發(fā)現(xiàn)緩存被修改,則會(huì)觸發(fā)一個(gè)失敗處理函數(shù),如果該函數(shù)沒被定義,則會(huì)觸發(fā)一個(gè) panic。
總結(jié)
本文講解的應(yīng)該算是狹義的 Controller Manager,畢竟沒有包含具體的資源管理器(Controller),而只是講解 Controller Manager 是怎么 “Manage Controller” 的。
可以看到 Controller Manager 做了很多工作來保證 Controller 可以只專注于處理自己關(guān)心的事件,而這些工作的核心就是 Informer,當(dāng)理解了 Informer 是如何與其他組件協(xié)同工作,那么 Controller Manager 為資源管理器鋪墊了什么也就了然了。
拓展
- 《圖解 Deployment Controller 工作流程》
總結(jié)
以上是生活随笔為你收集整理的Kubernetes Controller Manager 工作原理的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Kubernetes API 聚合开发汇
- 下一篇: 什么是informer机制