Kubernetes监控之Heapster源码分析
源碼版本
heapster version: release-1.2
簡(jiǎn)介
Heapster是Kubernetes下的一個(gè)監(jiān)控項(xiàng)目,用于進(jìn)行容器集群的監(jiān)控和性能分析。
基本的功能及概念介紹可以回顧我之前的一篇文章:《Kubernetes監(jiān)控之Heapster介紹》。
隨著的Heapster的版本迭代,支持的功能越越來(lái)越多,比如新版本支持更多的后端數(shù)據(jù)存儲(chǔ)方式:OpenTSDB、Monasca、Kafka、Elasticsearch等等。看過(guò)低版本(如v0.18)的源碼,會(huì)發(fā)現(xiàn)v1.2版本的源碼架構(gòu)完全變了樣,架構(gòu)擴(kuò)展性越來(lái)越強(qiáng),源碼學(xué)無(wú)止境!
上面很多介紹這篇文章并不會(huì)涉及,我們還是會(huì)用到最流行的模式:Heapster + InfluxDB。
監(jiān)控系統(tǒng)架構(gòu)圖:
該圖很好的描述了監(jiān)控系統(tǒng)的關(guān)鍵組件,及數(shù)據(jù)流向。
在源碼分析之前我們先介紹Heapster的實(shí)現(xiàn)流程,由上圖可以看出Heapster會(huì)從各個(gè)Node上kubelet獲取相關(guān)的監(jiān)控信息,然后進(jìn)行匯總發(fā)送給后臺(tái)數(shù)據(jù)庫(kù)InfluxDB。
這里會(huì)涉及到幾個(gè)關(guān)鍵點(diǎn):
k8s集群會(huì)增刪Nodes,Heapster需要獲取這些sources并做相應(yīng)的操作
Heapster后端數(shù)據(jù)庫(kù)怎么存儲(chǔ)?是否支持多后端?
Heapster獲取到數(shù)據(jù)后推送給后端數(shù)據(jù)庫(kù),那么其提供了API的數(shù)據(jù)該從何處獲取?本地cache?
Heapster從kubelet獲取到的數(shù)據(jù)是否需要處理?還是能直接存儲(chǔ)到后端
等等..
一起分析完heapster源碼實(shí)現(xiàn),就能進(jìn)行解惑了。
啟動(dòng)命令
先列出我解析源碼時(shí)所用的命令,及參數(shù)使用,便于后面的理解。
# heapster --source=kubernetes:http://<master-ip>:8080?inClusterConfig=false\&useServiceAccount=false --sink=influxdb:http://<influxdb-ip>:8086啟動(dòng)流程
從Heapster的啟動(dòng)流程開(kāi)始分析其實(shí)現(xiàn),前面做了簡(jiǎn)單的分析,可以帶著問(wèn)題去看源碼會(huì)有更好的收獲。
main()
路徑: heapster/metrics/heapster.go
func main() {...// 根據(jù)--source參數(shù)的輸入來(lái)創(chuàng)建數(shù)據(jù)源// 我們這里會(huì)使用kubernetes,下面會(huì)根據(jù)k8s來(lái)解析sourceFactory := sources.NewSourceFactory()// 創(chuàng)建該sourceProvider時(shí),會(huì)創(chuàng)建Node的ListWatch,用于監(jiān)控k8s節(jié)點(diǎn)的增刪情況,因?yàn)檫@些才是數(shù)據(jù)的真實(shí)來(lái)源.// 該sourceProvider會(huì)包含nodeLister,還有kubeletClient,用于跟各個(gè)節(jié)點(diǎn)的kubelet通信,獲取cadvisor數(shù)據(jù)sourceProvider, err := sourceFactory.BuildAll(argSources)if err != nil {glog.Fatalf("Failed to create source provide: %v", err)}// 創(chuàng)建sourceManager,其實(shí)就是sourceProvider + ScrapeTimeout,用于超時(shí)獲取數(shù)據(jù)sourceManager, err := sources.NewSourceManager(sourceProvider, sources.DefaultMetricsScrapeTimeout)if err != nil {glog.Fatalf("Failed to create source manager: %v", err)}// 根據(jù)--sink創(chuàng)建數(shù)據(jù)存儲(chǔ)后端// 我們這里會(huì)使用influxDB,來(lái)作為數(shù)據(jù)的存儲(chǔ)后端sinksFactory := sinks.NewSinkFactory()// 創(chuàng)建sinks時(shí)會(huì)返回各類對(duì)象:// metricSink: 可以理解為本地的metrics數(shù)據(jù)池,Heapster API獲取到的數(shù)據(jù)都是從該對(duì)象中獲取的,默認(rèn)一定會(huì)創(chuàng)建// sinkList: Heapster在新版本中支持多后端數(shù)據(jù)存儲(chǔ),比如你可以指定多個(gè)不同的influxDB,也可以同時(shí)指定influxDB和Elasticsearch。// historicalSource: 需要配置,我們暫時(shí)沒(méi)有用到metricSink, sinkList, historicalSource := sinksFactory.BuildAll(argSinks, *argHistoricalSource)if metricSink == nil {glog.Fatal("Failed to create metric sink")}if historicalSource == nil && len(*argHistoricalSource) > 0 {glog.Fatal("Failed to use a sink as a historical metrics source")}for _, sink := range sinkList {glog.Infof("Starting with %s", sink.Name())}// 創(chuàng)建sinkManager,會(huì)根據(jù)之前的sinkList,創(chuàng)建對(duì)應(yīng)數(shù)量的協(xié)程,用于從sink的數(shù)據(jù)管道中獲取數(shù)據(jù),然后推送到對(duì)應(yīng)的后端sinkManager, err := sinks.NewDataSinkManager(sinkList, sinks.DefaultSinkExportDataTimeout, sinks.DefaultSinkStopTimeout)if err != nil {glog.Fatalf("Failed to created sink manager: %v", err)}// 創(chuàng)建對(duì)象,用于處理各個(gè)kubelet獲取到的metrics數(shù)據(jù)// 最終都會(huì)加入到dataProcessors,在最終的處理函數(shù)中會(huì)進(jìn)行遍歷并調(diào)用其process()metricsToAggregate := []string{core.MetricCpuUsageRate.Name,core.MetricMemoryUsage.Name,core.MetricCpuRequest.Name,core.MetricCpuLimit.Name,core.MetricMemoryRequest.Name,core.MetricMemoryLimit.Name,}metricsToAggregateForNode := []string{core.MetricCpuRequest.Name,core.MetricCpuLimit.Name,core.MetricMemoryRequest.Name,core.MetricMemoryLimit.Name,}// 速率計(jì)算對(duì)象dataProcessors := []core.DataProcessor{// Convert cumulaties to rateprocessors.NewRateCalculator(core.RateMetricsMapping),}kubernetesUrl, err := getKubernetesAddress(argSources)if err != nil {glog.Fatalf("Failed to get kubernetes address: %v", err)}kubeConfig, err := kube_config.GetKubeClientConfig(kubernetesUrl)if err != nil {glog.Fatalf("Failed to get client config: %v", err)}kubeClient := kube_client.NewOrDie(kubeConfig)// 會(huì)創(chuàng)建podLister、nodeLister、namespaceLister,用于從k8s watch各個(gè)資源的增刪情況// 防止獲取數(shù)據(jù)失敗podLister, err := getPodLister(kubeClient)if err != nil {glog.Fatalf("Failed to create podLister: %v", err)}nodeLister, err := getNodeLister(kubeClient)if err != nil {glog.Fatalf("Failed to create nodeLister: %v", err)}podBasedEnricher, err := processors.NewPodBasedEnricher(podLister)if err != nil {glog.Fatalf("Failed to create PodBasedEnricher: %v", err)}dataProcessors = append(dataProcessors, podBasedEnricher)namespaceBasedEnricher, err := processors.NewNamespaceBasedEnricher(kubernetesUrl)if err != nil {glog.Fatalf("Failed to create NamespaceBasedEnricher: %v", err)}dataProcessors = append(dataProcessors, namespaceBasedEnricher)// 這里的對(duì)象append順序會(huì)有一定的要求// 比如Pod的有些數(shù)據(jù)需要進(jìn)行containers數(shù)據(jù)的累加得到dataProcessors = append(dataProcessors,processors.NewPodAggregator(),&processors.NamespaceAggregator{MetricsToAggregate: metricsToAggregate,},&processors.NodeAggregator{MetricsToAggregate: metricsToAggregateForNode,},&processors.ClusterAggregator{MetricsToAggregate: metricsToAggregate,})nodeAutoscalingEnricher, err := processors.NewNodeAutoscalingEnricher(kubernetesUrl)if err != nil {glog.Fatalf("Failed to create NodeAutoscalingEnricher: %v", err)}dataProcessors = append(dataProcessors, nodeAutoscalingEnricher)// 這是整個(gè)Heapster功能的關(guān)鍵處// 根據(jù)sourceManger、sinkManager、dataProcessors來(lái)創(chuàng)建manager對(duì)象manager, err := manager.NewManager(sourceManager, dataProcessors, sinkManager, *argMetricResolution,manager.DefaultScrapeOffset, manager.DefaultMaxParallelism)if err != nil {glog.Fatalf("Failed to create main manager: %v", err)}// 開(kāi)始創(chuàng)建協(xié)程,從各個(gè)sources獲取metrics數(shù)據(jù),并經(jīng)過(guò)dataProcessors的處理,然后export到各個(gè)用于后端數(shù)據(jù)存儲(chǔ)的sinksmanager.Start()// 以下的就是創(chuàng)建Heapster server,用于提供各類API// 通過(guò)http.mux及go-restful進(jìn)行實(shí)現(xiàn)// 新版的heapster還支持TLShandler := setupHandlers(metricSink, podLister, nodeLister, historicalSource)addr := fmt.Sprintf("%s:%d", *argIp, *argPort)glog.Infof("Starting heapster on port %d", *argPort)mux := http.NewServeMux()promHandler := prometheus.Handler()if len(*argTLSCertFile) > 0 && len(*argTLSKeyFile) > 0 {if len(*argTLSClientCAFile) > 0 {authPprofHandler, err := newAuthHandler(handler)if err != nil {glog.Fatalf("Failed to create authorized pprof handler: %v", err)}handler = authPprofHandlerauthPromHandler, err := newAuthHandler(promHandler)if err != nil {glog.Fatalf("Failed to create authorized prometheus handler: %v", err)}promHandler = authPromHandler}mux.Handle("/", handler)mux.Handle("/metrics", promHandler)healthz.InstallHandler(mux, healthzChecker(metricSink))// If allowed users is set, then we need to enable Client Authenticationif len(*argAllowedUsers) > 0 {server := &http.Server{Addr: addr,Handler: mux,TLSConfig: &tls.Config{ClientAuth: tls.RequestClientCert},}glog.Fatal(server.ListenAndServeTLS(*argTLSCertFile, *argTLSKeyFile))} else {glog.Fatal(http.ListenAndServeTLS(addr, *argTLSCertFile, *argTLSKeyFile, mux))}} else {mux.Handle("/", handler)mux.Handle("/metrics", promHandler)healthz.InstallHandler(mux, healthzChecker(metricSink))glog.Fatal(http.ListenAndServe(addr, mux))} }介紹了Heapster的啟動(dòng)流程后,大致能明白了該啟動(dòng)過(guò)程分為幾個(gè)關(guān)鍵點(diǎn):
創(chuàng)建數(shù)據(jù)源對(duì)象
創(chuàng)建后端存儲(chǔ)對(duì)象list
創(chuàng)建處理metrics數(shù)據(jù)的processors
創(chuàng)建manager,并開(kāi)啟數(shù)據(jù)的獲取及export的協(xié)程
開(kāi)啟Heapster server,并支持各類API
下面進(jìn)行一一介紹。
創(chuàng)建數(shù)據(jù)源
先介紹下相關(guān)的結(jié)構(gòu)體,因?yàn)檫@才是作者的核心思想。
創(chuàng)建的sourceProvider是實(shí)現(xiàn)了MetricsSourceProvider接口的對(duì)象。
先看下MetricsSourceProvider:
每個(gè)最終返回的對(duì)象,都需要提供GetMetricsSources(),看字面意識(shí)就可以知道就是提供所有的獲取Metrics源頭的接口。
我們的參數(shù)--source=kubernetes,所以其實(shí)我們真實(shí)返回的結(jié)構(gòu)是kubeletProvider.
路徑: heapster/metrics/sources/kubelet/kubelet.go
結(jié)構(gòu)介紹完了,看下具體的創(chuàng)建過(guò)程,跟kubernetes相關(guān)的關(guān)鍵接口是NewKubeletProvider():
func NewKubeletProvider(uri *url.URL) (MetricsSourceProvider, error) {// 創(chuàng)建kubernetes master及kubelet client相關(guān)的配置kubeConfig, kubeletConfig, err := GetKubeConfigs(uri)if err != nil {return nil, err}// 創(chuàng)建kubeClient及kubeletClientkubeClient := kube_client.NewOrDie(kubeConfig)kubeletClient, err := NewKubeletClient(kubeletConfig)if err != nil {return nil, err}// 獲取下所有的Nodes,測(cè)試下創(chuàng)建的client是否能正常通訊if _, err := kubeClient.Nodes().List(kube_api.ListOptions{LabelSelector: labels.Everything(),FieldSelector: fields.Everything()}); err != nil {glog.Errorf("Failed to load nodes: %v", err)}// 監(jiān)控k8s的nodes變更// 這里會(huì)創(chuàng)建協(xié)程進(jìn)行watch,便于后面調(diào)用nodeLister.List()列出所有的nodes。// 該Watch的實(shí)現(xiàn),需要看下apiServer中的實(shí)現(xiàn),后面會(huì)進(jìn)行講解lw := cache.NewListWatchFromClient(kubeClient, "nodes", kube_api.NamespaceAll, fields.Everything())nodeLister := &cache.StoreToNodeLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}reflector := cache.NewReflector(lw, &kube_api.Node{}, nodeLister.Store, time.Hour)reflector.Run()// 結(jié)構(gòu)在前面介紹過(guò)return &kubeletProvider{nodeLister: nodeLister,reflector: reflector,kubeletClient: kubeletClient,}, nil }該過(guò)程會(huì)涉及到較多的技術(shù)點(diǎn),比如apiServer中的watch實(shí)現(xiàn),reflector的使用。這里不會(huì)進(jìn)行細(xì)講,該文章主要是針對(duì)heapster的源碼實(shí)現(xiàn),apiServer相關(guān)的實(shí)現(xiàn)后面會(huì)進(jìn)行單獨(dú)輸出。
這里需要注意的是創(chuàng)建了ListWath,需要關(guān)注后面哪里用到了nodeLister.List()進(jìn)行nodes的獲取。
創(chuàng)建后端服務(wù)
前面已經(jīng)提到后端數(shù)據(jù)存儲(chǔ)會(huì)有兩處,一個(gè)是metricSink,另一個(gè)是influxdbSink。所以這里會(huì)涉及到兩個(gè)結(jié)構(gòu):
type MetricSink struct {// 鎖lock sync.Mutex// 長(zhǎng)時(shí)間存儲(chǔ)metrics數(shù)據(jù),默認(rèn)時(shí)間是15minlongStoreMetrics []stringlongStoreDuration time.Duration// 短時(shí)間存儲(chǔ)metrics數(shù)據(jù),默認(rèn)時(shí)間是140sshortStoreDuration time.Duration// 短時(shí)存儲(chǔ)空間shortStore []*core.DataBatch// 長(zhǎng)時(shí)存儲(chǔ)空間longStore []*multimetricStore }該結(jié)構(gòu)就是用于heapster API調(diào)用時(shí)獲取的數(shù)據(jù)源,這里會(huì)分為兩種數(shù)據(jù)存儲(chǔ)方式:長(zhǎng)時(shí)存儲(chǔ)和短時(shí)存儲(chǔ)。所以集群越大時(shí),heapster占用內(nèi)存越多,需要考慮該問(wèn)題如何處理或者優(yōu)化。
type influxdbSink struct {// 連接后端influxDB數(shù)據(jù)庫(kù)的clientclient influxdb_common.InfluxdbClient// 鎖sync.RWMutexc influxdb_common.InfluxdbConfigdbExists bool }這個(gè)就是我們配置的InfluxDB的結(jié)構(gòu),是我們真正的數(shù)據(jù)存儲(chǔ)后端。
開(kāi)始介紹創(chuàng)建后端服務(wù)流程,從sinksFactory.BuildAll()接口直接入手。
路徑: heapster/metrics/sinks/factory.go
該接口流程比較簡(jiǎn)單,就是對(duì)傳入?yún)?shù)進(jìn)行判斷,然后調(diào)用this.Build()進(jìn)行創(chuàng)建,這里只需要注意即使沒(méi)有配置metric,也會(huì)進(jìn)行metricSink的創(chuàng)建。
func (this *SinkFactory) Build(uri flags.Uri) (core.DataSink, error) {switch uri.Key {。。。case "influxdb":return influxdb.CreateInfluxdbSink(&uri.Val)。。。case "metric":return metricsink.NewMetricSink(140*time.Second, 15*time.Minute, []string{core.MetricCpuUsageRate.MetricDescriptor.Name,core.MetricMemoryUsage.MetricDescriptor.Name}), nil。。。default:return nil, fmt.Errorf("Sink not recognized: %s", uri.Key)} }influxdb的創(chuàng)建其實(shí)就是根據(jù)傳入的參數(shù)然后創(chuàng)建一個(gè)config結(jié)構(gòu),用于后面創(chuàng)建連接influxDB的client;
metric的創(chuàng)建其實(shí)就是初始化了一個(gè)MetricSink結(jié)構(gòu),需要注意的是傳入的第三個(gè)參數(shù),因?yàn)檫@是用于指定哪些metrics需要進(jìn)行長(zhǎng)時(shí)間存儲(chǔ),默認(rèn)就是cpu/usage和memory/usage,因?yàn)檫@兩個(gè)參數(shù)用戶最為關(guān)心。
具體的創(chuàng)建接口就不在深入了,較為簡(jiǎn)單。
到這里BuildAll()就結(jié)束了,至于返回值前面已經(jīng)做過(guò)介紹,就不在累贅了。
其實(shí)沒(méi)那么簡(jiǎn)單,還有一步:sinkManager的創(chuàng)建。
進(jìn)入sinks.NewDataSinkManager()接口看下:
這里會(huì)為每個(gè)sink創(chuàng)建協(xié)程,等待數(shù)據(jù)的到來(lái)并最終將數(shù)據(jù)導(dǎo)入到對(duì)應(yīng)的后端數(shù)據(jù)庫(kù)。
這里需要帶個(gè)問(wèn)號(hào),既然channel有一端在收,總得有地方會(huì)發(fā)送,這會(huì)在后面才會(huì)揭曉。
go協(xié)程 + channel的方式,是golang最常見(jiàn)的方式,確實(shí)便用。
創(chuàng)建數(shù)據(jù)Processors
因?yàn)閏Advisor返回的原始數(shù)據(jù)就包含了nodes和containers的相關(guān)數(shù)據(jù),所以heapster需要?jiǎng)?chuàng)建各種processor,用于處理成不同類型的數(shù)據(jù),比如pod, namespace, cluster,node。
還有些數(shù)據(jù)需要計(jì)算出速率,有些數(shù)據(jù)需要進(jìn)行累加,不同類型擁有的metrics還不一樣等等情況。
看下源碼:
Processors的功能基本就是這樣了,相對(duì)有點(diǎn)復(fù)雜,數(shù)據(jù)處理的樣式和類別較多。
各個(gè)對(duì)象的Process()方法就不進(jìn)行一一介紹了,就是按照順序一個(gè)一個(gè)的填充core.DataBatch數(shù)據(jù)。有興趣的可以逐個(gè)看下,可以借鑒下實(shí)現(xiàn)的方式。
獲取源數(shù)據(jù)并存儲(chǔ)
前面的都是鋪墊,開(kāi)始介紹heapster的關(guān)鍵實(shí)現(xiàn),進(jìn)行源數(shù)據(jù)的獲取,并導(dǎo)出到后端存儲(chǔ)。
先介紹相關(guān)結(jié)構(gòu):
Manager是需要實(shí)現(xiàn)Start和stop方法的接口。而真實(shí)創(chuàng)建的對(duì)象其實(shí)是realManager:
type realManager struct {// 數(shù)據(jù)源source core.MetricsSource// 數(shù)據(jù)處理對(duì)象processors []core.DataProcessor// 后端存儲(chǔ)對(duì)象sink core.DataSink// 每次scrape數(shù)據(jù)的時(shí)間間隔resolution time.Duration// 創(chuàng)建多個(gè)scrape協(xié)程時(shí),需要sleep這點(diǎn)時(shí)間,防止異常scrapeOffset time.Duration// scrape 停止的管道stopChan chan struct{}// housekeepSemaphoreChan chan struct{}// 超時(shí)housekeepTimeout time.Duration }關(guān)鍵的代碼如下:
manager, err := manager.NewManager(sourceManager, dataProcessors, sinkManager, *argMetricResolution,manager.DefaultScrapeOffset, manager.DefaultMaxParallelism)if err != nil {glog.Fatalf("Failed to create main manager: %v", err)}manager.Start()首先會(huì)根據(jù)前面創(chuàng)建的sourceManager, dataProcessors, sinkManager對(duì)象,再創(chuàng)建manager。
路徑: heapster/metrics/manager/manager.go
前面介紹了該關(guān)鍵結(jié)構(gòu)readlManager,繼續(xù)進(jìn)入manager.Start():
func (rm *realManager) Start() {go rm.Housekeep() }func (rm *realManager) Housekeep() {for {// Always try to get the newest metricsnow := time.Now()// 獲取數(shù)據(jù)的時(shí)間段,默認(rèn)是1minstart := now.Truncate(rm.resolution)end := start.Add(rm.resolution)// 真正同步一次的時(shí)間間隔,默認(rèn)是1min + 5stimeToNextSync := end.Add(rm.scrapeOffset).Sub(now)select {case <-time.After(timeToNextSync):rm.housekeep(start, end)case <-rm.stopChan:rm.sink.Stop()return}} }繼續(xù)看rm.housekeep(start, end), 該接口就傳入了時(shí)間區(qū)間,其實(shí)cAdvisor就是支持時(shí)間區(qū)間來(lái)獲取metrics值。
func (rm *realManager) housekeep(start, end time.Time) {if !start.Before(end) {glog.Warningf("Wrong time provided to housekeep start:%s end: %s", start, end)return}select {case <-rm.housekeepSemaphoreChan:// ok, good to gocase <-time.After(rm.housekeepTimeout):glog.Warningf("Spent too long waiting for housekeeping to start")return}go func(rm *realManager) {defer func() { rm.housekeepSemaphoreChan <- struct{}{} }()// 從sources獲取數(shù)據(jù)data := rm.source.ScrapeMetrics(start, end)// 遍歷processors,然后進(jìn)行數(shù)據(jù)處理for _, p := range rm.processors {newData, err := process(p, data)if err == nil {data = newData} else {glog.Errorf("Error in processor: %v", err)return}}// 最終將數(shù)據(jù)導(dǎo)出到后端存儲(chǔ)rm.sink.ExportData(data)}(rm) }邏輯比較簡(jiǎn)單,會(huì)有三個(gè)關(guān)鍵:
源數(shù)據(jù)獲取
數(shù)據(jù)處理
導(dǎo)出到后端
先看下rm.source.ScrapeMetrics()接口實(shí)現(xiàn).
路徑: heapster/metrics/sources/manager.go
該接口的邏輯就是先通過(guò)nodeLister獲取k8s所有的nodes,這樣便能知道所有的kubelet信息,然后創(chuàng)建對(duì)應(yīng)數(shù)量的協(xié)程從各個(gè)kubelet中獲取對(duì)應(yīng)的cAdvisor監(jiān)控信息,進(jìn)行處理后再返回。
獲取到數(shù)據(jù)后,就需要調(diào)用各個(gè)processors的Process()接口進(jìn)行數(shù)據(jù)處理,接口太多就不一一介紹了,挑個(gè)node_aggregator.go進(jìn)行介紹:
基本流程就是這樣了,有需要的可以各個(gè)深入查看。
最后就是數(shù)據(jù)的后端存儲(chǔ)。
這里會(huì)涉及到兩部分:metricSink和influxdbSink。
從rm.sink.ExportData(data)接口入手:
路徑: heapster/metrics/sinks/manager.go
千辛萬(wàn)苦,你把數(shù)據(jù)丟入sh.dataBatchChannel完事了?
dataBatchChannel有點(diǎn)眼熟,因?yàn)橹皠?chuàng)建sinkManager的時(shí)候,也創(chuàng)建了協(xié)程并監(jiān)聽(tīng)了該管道,所以真正export數(shù)據(jù)是在之前就完成了,這里只需要把數(shù)據(jù)丟入管道即可。
所以golang中協(xié)程與協(xié)程之間的通信,channel才是王道啊!
ExportData有兩個(gè),一個(gè)一個(gè)講吧。
先來(lái)關(guān)鍵的influxDB.
路徑: heapster/metrics/sinks/influxdb/influxdb.go
該接口中有一處不太明白,metricSet中的LabeledMetrics和MetricsValue有何差別,為何要將filesystem的數(shù)據(jù)進(jìn)行區(qū)分對(duì)待,放入LabeldMetrics?
看代碼的過(guò)程中沒(méi)有得到答案,望大神指點(diǎn)迷津,多謝多謝!
有問(wèn)題,但也不影響繼續(xù)往下學(xué)習(xí),接著看下MetricSink:
func (this *MetricSink) ExportData(batch *core.DataBatch) {this.lock.Lock()defer this.lock.Unlock()now := time.Now()// 將數(shù)據(jù)丟入longStore和shortStore// 需要根據(jù)保存的時(shí)間將老數(shù)據(jù)丟棄this.longStore = append(popOldStore(this.longStore, now.Add(-this.longStoreDuration)),buildMultimetricStore(this.longStoreMetrics, batch))this.shortStore = append(popOld(this.shortStore, now.Add(-this.shortStoreDuration)), batch) }該邏輯比較簡(jiǎn)單,就是將數(shù)據(jù)丟入兩個(gè)Store中,然后把過(guò)期數(shù)據(jù)丟棄。
這里提醒一點(diǎn),heapster API調(diào)用時(shí)先會(huì)從longStore中匹配數(shù)據(jù),沒(méi)匹配上的話再?gòu)膕hortStore獲取,而longStore中存儲(chǔ)的數(shù)據(jù)類型前面已經(jīng)做過(guò)介紹。
終于結(jié)束了。。
Heapster API創(chuàng)建
前面的主流業(yè)務(wù)都介紹完了,Heapster本身也提供了API用于開(kāi)發(fā)者進(jìn)行使用與測(cè)試。
繼續(xù)分析代碼吧:
這里的關(guān)鍵是setupHandlers()接口,需要學(xué)習(xí)下里面如何使用go-restful進(jìn)行請(qǐng)求路由的。
k8s apiServer中也大量使用了go-restful,在學(xué)習(xí)該源碼時(shí)有進(jìn)行過(guò)分析
路徑: heapster/metrics/handlers.go
func setupHandlers(metricSink *metricsink.MetricSink, podLister *cache.StoreToPodLister, nodeLister *cache.StoreToNodeLister, historicalSource core.HistoricalSource) http.Handler {runningInKubernetes := true// 創(chuàng)建container,指定route類型為CurlyRouter// 這些都跟go-restful基礎(chǔ)有關(guān),有興趣的可以看下原理wsContainer := restful.NewContainer()wsContainer.EnableContentEncoding(true)wsContainer.Router(restful.CurlyRouter{})// 注冊(cè)v1版本相關(guān)的api,包括官方介紹的"/api/v1/model"a := v1.NewApi(runningInKubernetes, metricSink, historicalSource)a.Register(wsContainer)// 這個(gè)metricsApi注冊(cè)了"/apis/metrics/v1alpha1"的各類命令// 暫不關(guān)心m := metricsApi.NewApi(metricSink, podLister, nodeLister)m.Register(wsContainer)handlePprofEndpoint := func(req *restful.Request, resp *restful.Response) {name := strings.TrimPrefix(req.Request.URL.Path, pprofBasePath)switch name {case "profile":pprof.Profile(resp, req.Request)case "symbol":pprof.Symbol(resp, req.Request)case "cmdline":pprof.Cmdline(resp, req.Request)default:pprof.Index(resp, req.Request)}}// Setup pporf handlers.ws = new(restful.WebService).Path(pprofBasePath)ws.Route(ws.GET("/{subpath:*}").To(metrics.InstrumentRouteFunc("pprof", handlePprofEndpoint))).Doc("pprof endpoint")wsContainer.Add(ws)return wsContainer }關(guān)鍵在于v1版本的API注冊(cè),繼續(xù)深入a.Register(wsContainer):
func (a *Api) Register(container *restful.Container) {// 注冊(cè)"/api/v1/metric-export" API// 用于從shortStore中獲取所有的metrics信息ws := new(restful.WebService)ws.Path("/api/v1/metric-export").Doc("Exports the latest point for all Heapster metrics").Produces(restful.MIME_JSON)ws.Route(ws.GET("").To(a.exportMetrics).Doc("export the latest data point for all metrics").Operation("exportMetrics").Writes([]*types.Timeseries{}))// ws必須要add到container中才能生效container.Add(ws)// 注冊(cè)"/api/v1/metric-export-schema" API// 用于導(dǎo)出所有的metrics name,比如network-rx// 還會(huì)導(dǎo)出還有的labels,比如pod-namews = new(restful.WebService)ws.Path("/api/v1/metric-export-schema").Doc("Schema for metrics exported by heapster").Produces(restful.MIME_JSON)ws.Route(ws.GET("").To(a.exportMetricsSchema).Doc("export the schema for all metrics").Operation("exportmetricsSchema").Writes(types.TimeseriesSchema{}))container.Add(ws)// 注冊(cè)metircSink相關(guān)的API,即"/api/v1/model/"if a.metricSink != nil {glog.Infof("Starting to Register Model.")a.RegisterModel(container)}if a.historicalSource != nil {a.RegisterHistorical(container)} }官方資料中介紹heapster metric model,我們使用到這些API也會(huì)比較多。
進(jìn)入a.RegisterModel(container)看下:
繼續(xù)看addClusterMetricsRoutes():
func addClusterMetricsRoutes(a clusterMetricsFetcher, ws *restful.WebService) {。。。if a.isRunningInKubernetes() {// 列出所有namespaces的APIws.Route(ws.GET("/namespaces/").To(metrics.InstrumentRouteFunc("namespaceList", a.namespaceList)).Doc("Get a list of all namespaces that have some current metrics").Operation("namespaceList"))// 獲取指定namespaces的metricsws.Route(ws.GET("/namespaces/{namespace-name}/metrics").To(metrics.InstrumentRouteFunc("availableNamespaceMetrics", a.availableNamespaceMetrics)).Doc("Get a list of all available metrics for a Namespace entity").Operation("availableNamespaceMetrics").Param(ws.PathParameter("namespace-name", "The name of the namespace to lookup").DataType("string")))// 獲取namespace指定的metrics值ws.Route(ws.GET("/namespaces/{namespace-name}/metrics/{metric-name:*}").To(metrics.InstrumentRouteFunc("namespaceMetrics", a.namespaceMetrics)).Doc("Export an aggregated namespace-level metric").Operation("namespaceMetrics").Param(ws.PathParameter("namespace-name", "The name of the namespace to lookup").DataType("string")).Param(ws.PathParameter("metric-name", "The name of the requested metric").DataType("string")).Param(ws.QueryParameter("start", "Start time for requested metrics").DataType("string")).Param(ws.QueryParameter("end", "End time for requested metric").DataType("string")).Param(ws.QueryParameter("labels", "A comma-separated list of key:values pairs to use to search for a labeled metric").DataType("string")).Writes(types.MetricResult{}))。。。}。。。 }Heapster API的注冊(cè)基本就這樣了,在花點(diǎn)時(shí)間看下API的實(shí)現(xiàn)吧。
我們挑一個(gè)例子做下分析,獲取某個(gè)pod的指定的metrics值.
對(duì)應(yīng)的接口:heapster/metrics/api/v1/model_handler.go
根據(jù)URI的輸入?yún)?shù)并調(diào)用processMetricRequest()接口,獲取對(duì)應(yīng)的metric value:
func (a *Api) processMetricRequest(key string, request *restful.Request, response *restful.Response) {// 時(shí)間區(qū)間start, end, err := getStartEndTime(request)if err != nil {response.WriteError(http.StatusBadRequest, err)return}// 獲取metric Name,比如"/cpu/usage"metricName := request.PathParameter("metric-name")// 根據(jù)metricName進(jìn)行轉(zhuǎn)換,比如將cpu-usage轉(zhuǎn)換成cpu/usage_rate// 所以這里需要注意cpu-usage不等于/cpu/usage,一個(gè)表示cpu使用率,一個(gè)表示cpu使用量convertedMetricName := convertMetricName(metricName)// 獲取請(qǐng)求中的labels,根據(jù)是否有指定labels來(lái)調(diào)用不同的接口labels, err := getLabels(request)if err != nil {response.WriteError(http.StatusBadRequest, err)return}var metrics map[string][]core.TimestampedMetricValueif labels != nil {// 該接口從metricSet.LabeledMetrics中獲取對(duì)應(yīng)的valuemetrics = a.metricSink.GetLabeledMetric(convertedMetricName, labels, []string{key}, start, end)} else {// 該接口先從longStoreMetrics中進(jìn)行匹配,匹配不到的話再?gòu)膕hortStore中獲取對(duì)應(yīng)的metricValuemetrics = a.metricSink.GetMetric(convertedMetricName, []string{key}, start, end)}// 將獲取到的metricValue轉(zhuǎn)換成MetricPoint格式的值,會(huì)有多組"時(shí)間戳+value"converted := exportTimestampedMetricValue(metrics[key])// 將結(jié)果進(jìn)行responseresponse.WriteEntity(converted) }OK,大功告成!API的實(shí)現(xiàn)也講完了,很多API都是相通的,最終都會(huì)調(diào)用相同的接口,所以不一一介紹了。
這里需要注意heapster的API的URI還有多種寫(xiě)法,比如/api/v1/model/cpu-usage,等價(jià)于/api/v1/model/cpu/usage_rate/,別誤理解成/cpu/usage了,這兩個(gè)概念不一樣,一個(gè)是cpu使用率,一個(gè)是cpu使用量。
上面的提醒告訴我們,沒(méi)事多看源碼,很多誤解自然而然就解除了!
筆者能力有限,看源碼也在于學(xué)習(xí)提升能力,當(dāng)然也會(huì)有較多不理解或者理解不當(dāng)?shù)牡胤?#xff0c;希望各位能予以矯正,多謝多謝!
擴(kuò)展
上面的介紹完了Heapster的實(shí)現(xiàn),我們可以思考下是否可以動(dòng)手修改源碼,比如增加一些對(duì)象的metrics信息。
筆者考慮是否可以直接支持RC/RS/Deployment的metrics信息,讓業(yè)務(wù)層可以直接拿到服務(wù)的整體信息。
參考資料
Heapster官方資料:https://github.com/kubernetes...
InfluxDB github: https://github.com/influxdata...
總結(jié)
以上是生活随笔為你收集整理的Kubernetes监控之Heapster源码分析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: [Miller-Rabin Polla
- 下一篇: 使用supervisor启动hbase