Kubernetes:kube-apiserver 之启动流程(二)
接著 Kubernetes:kube-apiserver 之啟動流程(一) 加以介紹。
1.2.2 創建 APIExtensions Server
創建完通用 APIServer 后繼續創建 APIExtensions Server。
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
	genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
	s := &CustomResourceDefinitions{
		GenericAPIServer: genericServer,
	}
    // 存儲建立 REST API 到資源實體的信息
    apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)
    // 資源實體
	storage := map[string]rest.Storage{}
	// customresourcedefinitions
	if resource := "customresourcedefinitions"; apiResourceConfig.ResourceEnabled(v1.SchemeGroupVersion.WithResource(resource)) {
        // 創建資源實體
		customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
		if err != nil {
			return nil, err
		}
		storage[resource] = customResourceDefinitionStorage
		storage[resource+"/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)
	}
	if len(storage) > 0 {
		apiGroupInfo.VersionedResourcesStorageMap[v1.SchemeGroupVersion.Version] = storage
	}
	if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
		return nil, err
	}
APIGroupInfo 對象用于描述資源組信息,storage 存儲資源到資源實體的對應關系。
資源實體,通過 NewREST() 函數創建。
# kubernetes/vendor/k8s.io/apiextensions-apiserver/pkg/registry/customresourcedefinition/etcd.go
package customresourcedefinition
// NewREST returns a RESTStorage object that will work against API services.
func NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) (*REST, error) {
	strategy := NewStrategy(scheme)
	store := &genericregistry.Store{
		NewFunc:                   func() runtime.Object { return &apiextensions.CustomResourceDefinition{} },
		NewListFunc:               func() runtime.Object { return &apiextensions.CustomResourceDefinitionList{} },
		PredicateFunc:             MatchCustomResourceDefinition,
		DefaultQualifiedResource:  apiextensions.Resource("customresourcedefinitions"),
		SingularQualifiedResource: apiextensions.Resource("customresourcedefinition"),
		CreateStrategy:      strategy,
		UpdateStrategy:      strategy,
		DeleteStrategy:      strategy,
		ResetFieldsStrategy: strategy,
		// TODO: define table converter that exposes more than name/creation timestamp
		TableConvertor: rest.NewDefaultTableConvertor(apiextensions.Resource("customresourcedefinitions")),
	}
	options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: GetAttrs}
	if err := store.CompleteWithOptions(options); err != nil {
		return nil, err
	}
	return &REST{store}, nil
}
可以看到,資源實體是在資源包 customresourcedefinition 的 etcd.go 中創建的,創建的資源實體負責和 etcd 交互。
(關于 etcd 交互的部分先不講,后續會專門介紹。)
創建完資源實體后,通過 apiGroupInfo.VersionedResourcesStorageMap[v1.SchemeGroupVersion.Version] = storage 將資源實體存儲到 apiGroupInfo。
繼續調用 InstallAPIGroup(apiGroupInfo *APIGroupInfo) 安裝 REST API。
# kubernetes/vendor/k8s.io/apiserver/pkg/server/genericapiserver.go
func (s *GenericAPIServer) InstallAPIGroup(apiGroupInfo *APIGroupInfo) error {
	return s.InstallAPIGroups(apiGroupInfo)
}
func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {
    for _, apiGroupInfo := range apiGroupInfos {
        // 調用 installAPIResources
		if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels); err != nil {
			return fmt.Errorf("unable to install api resources: %v", err)
		}
    }
    s.DiscoveryGroupManager.AddGroup(apiGroup)
	s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup).WebService())
}
// installAPIResources is a private method for installing the REST storage backing each api groupversionresource
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, typeConverter managedfields.TypeConverter) error {
	for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
		apiGroupVersion, err := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)
		if err != nil {
			return err
		}
        // 調用 InstallREST
		discoveryAPIResources, r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)
    }
}
# kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/groupversion.go
func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]apidiscoveryv2beta1.APIResourceDiscovery, []*storageversion.ResourceInfo, error) {
	prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
	installer := &APIInstaller{
		group:             g,
		prefix:            prefix,
		minRequestTimeout: g.MinRequestTimeout,
	}
    // 調用 Install
	apiResources, resourceInfos, ws, registrationErrors := installer.Install()
	container.Add(ws)
	return aggregatedDiscoveryResources, removeNonPersistedResources(resourceInfos), utilerrors.NewAggregate(registrationErrors)
}
# kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/installer.go
// Install handlers for API resources.
func (a *APIInstaller) Install() ([]metav1.APIResource, []*storageversion.ResourceInfo, *restful.WebService, []error) {
	for _, path := range paths {
        // 注冊資源 Handler
		apiResource, resourceInfo, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
		if err != nil {
			errors = append(errors, fmt.Errorf("error in registering resource: %s, %v", path, err))
		}
		if apiResource != nil {
			apiResources = append(apiResources, *apiResource)
		}
		if resourceInfo != nil {
			resourceInfos = append(resourceInfos, resourceInfo)
		}
	}
	return apiResources, resourceInfos, ws, errors
}
如上例所示,注冊資源 REST API 的調用鏈很長,通過逐層調用,走到 registerResourceHandlers 注冊資源 handler。
registerResourceHandlers 函數非常的長,主要抓一點:注冊 RESTful API 的資源 handler 需要什么?回答好這個問題基本上就能抓住 registerResourceHandlers 的精髓了。
注冊資源 handler 需要知道資源的 API path 和資源實體(指和 etcd 交互的資源實例)的對應關系,接著需要知道哪些 action 可以訪問 API path。
圍繞這兩塊看資源 handler 的注冊過程。
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
	// what verbs are supported by the storage, used to know what verbs we support per path
	creater, isCreater := storage.(rest.Creater)
	namedCreater, isNamedCreater := storage.(rest.NamedCreater)
	lister, isLister := storage.(rest.Lister)
	getter, isGetter := storage.(rest.Getter)
	getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
	gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
	collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter)
	updater, isUpdater := storage.(rest.Updater)
	patcher, isPatcher := storage.(rest.Patcher)
	watcher, isWatcher := storage.(rest.Watcher)
	connecter, isConnecter := storage.(rest.Connecter)
	storageMeta, isMetadata := storage.(rest.StorageMetadata)
	storageVersionProvider, isStorageVersionProvider := storage.(rest.StorageVersionProvider)
	gvAcceptor, _ := storage.(rest.GroupVersionAcceptor)
	// Get the list of actions for the given scope.
	switch {
	case !namespaceScoped:
		...
	default:
		// Handler for standard REST verbs (GET, PUT, POST and DELETE).
		actions := []action{}
		actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
		actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
		actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
		// DEPRECATED in 1.11
		actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList)
		actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
		if getSubpath {
			actions = appendIf(actions, action{"GET", itemPath + "/{path:*}", proxyParams, namer, false}, isGetter)
		}
		actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater)
		actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher)
		actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isGracefulDeleter)
		// DEPRECATED in 1.11
		actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer, false}, isWatcher)
		actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer, false}, isConnecter)
		actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer, false}, isConnecter && connectSubpath)
	}
	for _, action := range actions {
		switch action.Verb {
		case "GET": // Get a resource.
			var handler restful.RouteFunction
			if isGetterWithOptions {
				handler = restfulGetResourceWithOptions(getterWithOptions, reqScope, isSubresource)
			} else {
				handler = restfulGetResource(getter, reqScope)
			}
			route := ws.GET(action.Path).To(handler).
				Doc(doc).
				Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
				Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix).
				Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
				Returns(http.StatusOK, "OK", producedObject).
				Writes(producedObject)
			routes = append(routes, route)
		}
		for _, route := range routes {
			route.Metadata(ROUTE_META_GVK, metav1.GroupVersionKind{
				Group:   reqScope.Kind.Group,
				Version: reqScope.Kind.Version,
				Kind:    reqScope.Kind.Kind,
			})
			route.Metadata(ROUTE_META_ACTION, strings.ToLower(action.Verb))
			ws.Route(route)
		}
	}
}
可以看到,通過資源實體 storage 支持的接口類型可以反射出資源支持的方法。接著將支持的方法加入 actions。actions 存儲的是 action.Verb 和支持的資源 API path 信息。
拿到 actions 后,通過匹配 actions.Verb 建立 action.Verb -> action.Path -> handler 的路由。其中,創建 handler 需要用到 storage,因為 handler 是通過 storage 和 etcd 交互的。
通過 go-restful 庫建立上述路由,接著 ws.Route(route) 將 route 加入到 restful.WebService 中。
回頭看 InstallREST。
func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]apidiscoveryv2beta1.APIResourceDiscovery, []*storageversion.ResourceInfo, error) {
	apiResources, resourceInfos, ws, registrationErrors := installer.Install()
	container.Add(ws)
	return aggregatedDiscoveryResources, removeNonPersistedResources(resourceInfos), utilerrors.NewAggregate(registrationErrors)
}
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, typeConverter managedfields.TypeConverter) error {
	discoveryAPIResources, r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)
}
通過 container.Add(ws) 將 ws 添加到 go-restful 的 container 中,該 container 即為 APIExtensions Server 的 Handler 所提供的 GoRestfulContainer。
這里介紹了 APIExtensions Server 的 REST API 創建過程。對于 KubeAPIServer 和 AggregatorServer 的創建過程與之類似,不過多介紹。
REST API 創建好以后,下一步就到如何運行 kube-apiserver了。
1.3 運行 kube-apiserver
kube-apiserver 作為提供 RESTful API 的組件,其運行主要是監聽端口和啟動服務。理清了這點,就能在復雜的運行代碼中找出頭緒。
調用 APIAggregator.PrepareRun 和 preparedAPIAggregator.Run 運行 kube-apiserver。
func Run(opts options.CompletedOptions, stopCh <-chan struct{}) error {
	prepared, err := server.PrepareRun()
	if err != nil {
		return err
	}
	return prepared.Run(stopCh)
}
啟動過程在 PrepareRun 中。
func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) {
	prepared := s.GenericAPIServer.PrepareRun()
	return preparedAPIAggregator{APIAggregator: s, runnable: prepared}, nil
}
運行 prepared.Run(stopCh) 實際調用的是 preparedGenericAPIServer.Run 方法。
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
	// 調用 preparedGenericAPIServer.NonBlockingRun
	stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(stopHttpServerCh, shutdownTimeout)
	if err != nil {
		return err
	}
}
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}, shutdownTimeout time.Duration) (<-chan struct{}, <-chan struct{}, error) {
	if s.SecureServingInfo != nil && s.Handler != nil {
		var err error
		// 調用 SecureServingInfo.Serve
		stoppedCh, listenerStoppedCh, err = s.SecureServingInfo.Serve(s.Handler, shutdownTimeout, internalStopCh)
		if err != nil {
			close(internalStopCh)
			return nil, nil, err
		}
	}
}
func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) (<-chan struct{}, <-chan struct{}, error) {
	// 組 http.Server
	secureServer := &http.Server{
		Addr:           s.Listener.Addr().String(),
		Handler:        handler,
		MaxHeaderBytes: 1 << 20,
		TLSConfig:      tlsConfig,
		IdleTimeout:       90 * time.Second, // matches http.DefaultTransport keep-alive timeout
		ReadHeaderTimeout: 32 * time.Second, // just shy of requestTimeoutUpperBound
	}
	return RunServer(secureServer, s.Listener, shutdownTimeout, stopCh)
}
func RunServer(
	server *http.Server,
	ln net.Listener,
	shutDownTimeout time.Duration,
	stopCh <-chan struct{},
) (<-chan struct{}, <-chan struct{}, error) {
	go func() {
		// 調用 http 的 Server.Serve 提供 `RESTful API` 服務
		err := server.Serve(listener)
		msg := fmt.Sprintf("Stopped listening on %s", ln.Addr().String())
	}
}
可以看到,最終調用 http 包的 Server.Serve 提供 RESTful API 服務。
至此,已介紹完 kube-apiserver 從啟動到運行的核心邏輯。下一篇,將重點介紹 kube-apiserver 是怎么和 etcd 進行交互的。
總結
以上是生活随笔為你收集整理的Kubernetes:kube-apiserver 之启动流程(二)的全部內容,希望文章能夠幫你解決所遇到的問題。
                            
                        - 上一篇: Go 方法集合与选择receiver类型
 - 下一篇: Hundred Finance 攻击事件