Kubernetes的Device Plugin机制源码解析
簡介:?Kubernetes 1.8 引入的Device Plugin機制,通過擴展的方式實現支持GPU、FPGA、高性能 NIC、InfiniBand等各種設備的集成。而Device Manager正是Kubelet內負責Device Plugin交互和設備生命周期管理的模塊,在了解其基本設計后,本文對Device Manager的源碼分析,理解其運作方式。
Kubernetes 1.8 引入的Device Plugin機制,通過擴展的方式實現支持GPU、FPGA、高性能 NIC、InfiniBand等各種設備的集成。而Device Manager正是Kubelet內負責Device Plugin交互和設備生命周期管理的模塊,在了解其基本設計后,我們需要通過對Device Manager的源碼分析,理解其運作方式。
基本原則
首先明確目標:
并不是搞懂Kubelet的所有實現,而是希望理解Device Manager如何在資源發現,Pod創建,設備健康檢查過程中所作的工作以及其如何與Kubelet交互,所以我們會忽略掉與Device Manager無關的操作。
這里是我閱讀代碼的原則和一些體會:
- 理解接口,搞清楚和外部模塊的交互
- 理解實現接口的結構體
- 從用戶場景的角度將方法調用和數據結構關聯起來,好比將劇情和人物串聯起來,了解了任務設定后,就可以更快速切入代碼的調用過程;而代碼調用的閱讀也可以加深對數據結構設計的理解
- Kubernetes的代碼比較復雜,很難一下就搞清楚每一個數據結構定義的目的和用途,這時我們可以把問題和假設記下來,不要過分糾結,可以在后面求證。書讀百變其義自見,代碼也是一樣,當你逐漸熟悉了代碼的脈絡的時候,有些問題也會迎刃而解
- 由于Device Manager工作在Kubelet中,對于Kubelet的源碼通篇的了解是理解具體模塊運作機制的基礎
P.S. 本文解析的Kubernetes源碼版本是1.9.3
DeviceManager的核心代碼都在?pkg/kubelet/cm/deviceplugin下
DeviceManager接口定義
所在的文件
pkg/kubelet/cm/deviceplugin/types.go
具體定義:
// Manager manages all the Device Plugins running on a node. type Manager interface {// Start starts device plugin registration service.Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error// Devices is the map of devices that have registered themselves// against the manager.// The map key is the ResourceName of the device plugins.Devices() map[string][]pluginapi.Device// Allocate configures and assigns devices to pods. The pods are provided// through the pod admission attributes in the attrs argument. From the// requested device resources, Allocate will communicate with the owning// device plugin to allow setup procedures to take place, and for the// device plugin to provide runtime settings to use the device (environment// variables, mount points and device files). The node object is provided// for the device manager to update the node capacity to reflect the// currently available devices.Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error// Stop stops the manager.Stop() error// GetDeviceRunContainerOptions checks whether we have cached containerDevices// for the passed-in <pod, container> and returns its DeviceRunContainerOptions// for the found one. An empty struct is returned in case no cached state is found.GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions// GetCapacity returns the amount of available device plugin resource capacity// and inactive device plugin resources previously registered on the node.GetCapacity() (v1.ResourceList, []string) }從注釋中可以看到DeviceManager負責管理節點上運行的所有設備插件,這里分別定義了可以和外界交互的6個方法:
- Start()和stop()分別是啟動設備插件注冊和停止服務,這其實K8S中的常見套路
- Devices()以map的形式列出device列表
以下3個方法是比較核心的工作:
- Allocate()為Pod分配可用的設備,并且調用設備插件進行所需的設備初始化
- GetDeviceRunContainerOptions()獲得為容器配置設備所需要的的參數,比如Environment,Volume和Device,這個方法會用于創建容器的過程中
- GetCapacity()用于節點向API Server上報Extended Resource的數量
當然要更清楚的理解,還需要結合具體場景中的調用鏈路進行理解。這里DeviceManager接口有兩個實現分別是:MangerImpl?和?ManagerStub, ManagerStub實際上是一個空實現,無需細看。下面簡單了解一下?MangerImpl的實現
DeviceManager接口實現
所在的文件
pkg/kubelet/cm/deviceplugin/manager.go具體定義:
// ManagerImpl is the structure in charge of managing Device Plugins. type ManagerImpl struct {socketname stringsocketdir stringendpoints map[string]endpoint // Key is ResourceNamemutex sync.Mutexserver *grpc.Server// activePods is a method for listing active pods on the node// so the amount of pluginResources requested by existing pods// could be counted when updating allocated devicesactivePods ActivePodsFunc// sourcesReady provides the readiness of kubelet configuration sources such as apiserver update readiness.// We use it to determine when we can purge inactive pods from checkpointed state.sourcesReady config.SourcesReady// callback is used for updating devices' states in one time call.// e.g. a new device is advertised, two old devices are deleted and a running device fails.callback monitorCallback// allDevices contains all of registered resourceNames and their exported device IDs.allDevices map[string]sets.String// allocatedDevices contains allocated deviceIds, keyed by resourceName.allocatedDevices map[string]sets.String// podDevices contains pod to allocated device mapping.podDevices podDevices }在ManagerImpl的定義和注釋中,可以大致猜測它在做三件事:
- 提供grpc的服務,支持多個Device Plugin的注冊
- 為Device Plugin提供回調函數monitorCallback,當設備的狀態發生變化時,可以讓Device Manager被通知,從而做一些相應的處理。比如當某個設備無法正常工作時,就需要將節點上可用資源總數減去一個
- 設備的分配和管理,具體講就是記錄某種設備一共有哪幾個,已經分配出去的是哪幾個。從這里看,Device Plugin需要為每個設備提供一個UUID, 這個UUID需要在本節點唯一并且不可改變,而Device Manager要做的事情就是維護這個UUID的集合,并且負責設備更新和分配
場景分類
這里主要涉及五個場景:
- Device Manager的初始化和啟動
- 接收Device Plugin的endpoint注冊,并且向Endpoint查詢Device ID列表
- 定時上報節點上的設備信息
- 創建Pod時,將設備信息與Pod結合,生成創建容器所需要的配置(Environment, Device, Volume)
- 當設備狀態不健康的時候,通知Kubelet更新可用設備的狀態
本文首先分析場景一:Device Manager的初始化和啟動過程
Device Manager的初始化和啟動過程
Kubernetes的代碼量巨大,但是細看每個模塊的啟動流程都有比較相似的套路,以Kubelet為例:
而DeviceManger的初始化就是發生在步驟3和步驟4
- app.kubelet對應的是cmd/kubelet/kubelet.go
- server對應的是cmd/kubelet/app/server.go
- kubelet對應的是pkg/kubelet/kubelet.go
- container_manager_linux對應的是pkg/kubelet/cm/container_manager_linux.go
- device.manager對應的是pkg/kubelet/cm/deviceplugin/manager.go
以上時序圖就是Kubelet如何初始化和啟動DeviceManager的流程(為了方便理解,這里會忽略和DeviceManager無關的方法)
可以看到server中run()方法做兩件事情:NewMainKubelet和startKubelet,而Device Manager的初始化與啟動也是在這兩個步驟中完成,同時啟動grpc注冊服務,這時Device Plugin就可以注冊進來。
實際定義在:pkg/kubelet/cm/container_manager_linux.go
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, devicePluginEnabled bool, recorder record.EventRecorder) (ContainerManager, error) { ...glog.Infof("Creating device plugin manager: %t", devicePluginEnabled)if devicePluginEnabled {cm.devicePluginManager, err = deviceplugin.NewManagerImpl()} else {cm.devicePluginManager, err = deviceplugin.NewManagerStub()}... }由于這個功能目前還比較新,需要通過feature gate打開, 即配置?--feature-gates=DevicePlugins=true,默認該功能是關閉的。當該功能打開時會調用deviceplugin.NewManagerImpl(),否則會有stub實現,不作任何事情。
deviceplugin.NewManagerImpl()定義在pkg/kubelet/cm/deviceplugin/manager.go內,
// NewManagerImpl creates a new manager. func NewManagerImpl() (*ManagerImpl, error) {return newManagerImpl(pluginapi.KubeletSocket) }實際上真正做初始的工作都是在下列方法完成的
func newManagerImpl(socketPath string) (*ManagerImpl, error) {glog.V(2).Infof("Creating Device Plugin manager at %s", socketPath)if socketPath == "" || !filepath.IsAbs(socketPath) {return nil, fmt.Errorf(errBadSocket+" %v", socketPath)}dir, file := filepath.Split(socketPath)manager := &ManagerImpl{endpoints: make(map[string]endpoint),socketname: file,socketdir: dir,allDevices: make(map[string]sets.String),allocatedDevices: make(map[string]sets.String),podDevices: make(podDevices),}manager.callback = manager.genericDeviceUpdateCallback// The following structs are populated with real implementations in manager.Start()// Before that, initializes them to perform no-op operations.manager.activePods = func() []*v1.Pod { return []*v1.Pod{} }manager.sourcesReady = &sourcesReadyStub{}return manager, nil }這里只是做ManagerImpl的初始化,有意義的工作只有兩個
- 設置DeviceManager內置grpc服務的監聽文件?socketPath, 由于DeviceManager和Device Plugin部署在同一個節點,所以只需要利用Unix Socket的模式通信
- 設置設備狀態的回調函數?genericDeviceUpdateCallback
就像注釋中提到?The following structs are populated with real implementations in manager.Start()的一樣,實際上在初始化階段,并沒有
這里會把活躍的pod列表以及pod元數據的來源(FILE, URL, api-server)作為輸入用來啟動DeviceManager, 這兩個參數在啟動的時候并沒有用到
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {glog.V(2).Infof("Starting Device Plugin manager")m.activePods = activePodsm.sourcesReady = sourcesReady// Loads in allocatedDevices information from disk.err := m.readCheckpoint()if err != nil {glog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err)}socketPath := filepath.Join(m.socketdir, m.socketname)os.MkdirAll(m.socketdir, 0755)// Removes all stale sockets in m.socketdir. Device plugins can monitor// this and use it as a signal to re-register with the new Kubelet.if err := m.removeContents(m.socketdir); err != nil {glog.Errorf("Fail to clean up stale contents under %s: %+v", m.socketdir, err)}s, err := net.Listen("unix", socketPath)if err != nil {glog.Errorf(errListenSocket+" %+v", err)return err}m.server = grpc.NewServer([]grpc.ServerOption{}...)pluginapi.RegisterRegistrationServer(m.server, m)go m.server.Serve(s)glog.V(2).Infof("Serving device plugin registration server on %q", socketPath)return nil }Start主要核心做兩件事情:
- m.readCheckpoint()?負責從本地checkpoint(/var/lib/kubelet/device-plugins/kubelet_internal_checkpoint)中獲取已經注冊和分配了的設備信息,為什么要這樣做呢?這主要是因為Kubelet負責設備的分配和管理工作, 這些信息只存在于Kubelet的內存中。一旦Kubelet重啟了之后,哪些設備已經分配了出去,以及這些分配出去的設備具體和哪個Pod關聯
DeviceManager在每次分配設備給Pod后會將Pod和設備的映射關系以json格式記錄到本地的一個文件
- go m.server.Serve(s)?以后臺grouting的方式啟動grpc服務,這樣就可以完成Device Plugin的注冊,我們會在后面介紹grpc開放的服務如何與Device Plugin進行交互。
小結:
閱讀開源源代碼可以幫助我們提升技術水平, 不但能深入技術底層原理,快速理解技術架構;同樣也可以幫助我們學習優秀的代碼風格和設計模式。本文這里只是拋磚引玉,對Device Manager初始化場景進行了分析,后續我們也會對其他場景繼續研究,加深對Kubernetes的Device Plugin機制的理解。
總結
以上是生活随笔為你收集整理的Kubernetes的Device Plugin机制源码解析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: cookie,sessionStorag
- 下一篇: 本地存储localStorage用法详解