使用Golang自定义Kubernetes Ingress Controller
在Kubernetes中通過Ingress來暴露服務到集群外部,這個已經是很普遍的方式了,而真正扮演請求轉發的角色是背后的Ingress Controller,比如我們經常使用的traefik、ingress-nginx等就是一個Ingress Controller。本文我們將通過golang來實現一個簡單的自定義的Ingress Controller,可以加深我們對Ingress的理解。
概述
我們在 Kubernetes 集群上往往會運行很多無狀態的 Web 應用,一般來說這些應用是通過一個Deployment和一個對應的Service組成,比如我們在集群上運行一個whoami的應用,對應的資源清單如下所示:(whoami.yaml)
apiVersion: apps/v1 kind: Deployment metadata:name: whoamilabels:app: whoami spec:replicas: 1selector:matchLabels:app: whoamitemplate:metadata:labels:app: whoamispec:containers:- name: whoamiimage: cnych/whoamiports:- containerPort: 80--- kind: Service apiVersion: v1 metadata:name: whoami spec:selector:app: whoamiports:- protocol: TCPport: 80targetPort: 80可以直接使用上面的資源清單部署該應用:
$ kubectl apply -f whoami.yaml
通過部署該應用,在Kubernetes集群內部我們可以通過地址whoami.default.svc.cluster.local來訪問該Web應用,但是在集群外部的用戶應該如何來訪問呢?當然我們可以使用NodePort類型的Service來進行訪問,但是當我們應用越來越多的時候端口的管理也是一個很大的問題,所以一般情況下不采用該方式,之前我們的方法使用DaemonSet在每個邊緣節點上運行一個Nginx應用:
spec:hostNetwork: truecontainers:- iamge: nginx:1.15.3-alpinename: nginxports:- name: httpcontainerPort: 80hostPort: 80通過設置hostNetwork:true,容器將綁定節點的80端口,而不僅僅是容器,這樣我們就可以通過節點的公共IP地址的80端口訪問到Nginx應用了。這種方法理論上肯定是有效的,但是有一個最大的問題就是需要創建一個Nginx配置文件,如果應用有變更,還需要手動修改配置,不能自動發現和熱更新,這對于大量的應用維護的成本顯然太大。這個時候我們就可以用另外一個Kubernets提供的方案了:Ingress。
Ingress 對象
Kubernetes內置就支持通過Ingress對象將外部的域名映射到集群內部服務,我們可以通過如下的Ingress對象來對外暴露服務:
apiVersion: extensions/v1beta1 kind: Ingress metadata:name: whoami spec:tls:- hosts:- "*.qikaqiak.com"secretName: qikqiak-tlsrules:- host: who.qikqiak.comhttp:paths:- path: /backend:serviceName: whoamiservicePort: 80該資源清單盛行了如何將HTTP請求路由到后端服務:
- 任何到域名who.qikqiak.com的請求都將被路由到whoami服務后面的Pod列表中去。
- 如果是HTTPS請求,并且域名匹配*.qikqiak.com,則對請求使用qikqiak - tls這個證書。
這個配置顯然比我們去手動維護Nginx的配置要方便太多了,完全就是自動化的。
Ingress Controllers
上面我們聲明的 Ingress 對象,只是一個集群的資源對象而已,并不會去真正處理我們的請求,這個時候我們還必須安裝一個Ingress Controller,該控制器負責讀取Ingress對象的規則并進行真正的請求處理,簡單來說就是Ingress對象只是一個聲明,Ingress Controllers就是真正的實現。
對于Ingress Controller有很多種選擇,比如我們前面文章大量提到的traefik、或者ingress-nginx等等,我們可以根據自己的需求選擇合適的Ingress Controller安裝即可。
但是實際上, 自定義一個Ingress Controller也是非常簡單的(當然要支持各種請求特性就需要大量的工作了)。
自定義 Ingress Controller
這里我們將用 Golang 來自定義一個簡單的 Ingress Controller,自定義的控制器主要需要實現以下幾個功能:
- 通過Kubernets API查詢和監聽Service、Ingress以及Secret這些對象
- 加載TLS證書用于HTTPS請求
- 根據加載的Kubernetes數據構造一個用于HTTP服務的路由,當然該路由需要非常高效,因為所有傳入的流量都將通過該路由
- 在80和443端口上監聽傳入的HTTP請求,然后根據路由查找對應的后端服務,然后代理請求和響應。443端口將使用TLS證書進行安全連接。
下面我們將來依次介紹上面的實現。
Kubernetes 對象查詢
我們可以通過一個 rest 配置然后調用 NewForConfig 來創建一個 Kubernetes 客戶端,由于我們要通過集群內部的 Service 進行服務的訪問,所以不能在集群外部使用,所以不能使用 kubeconfig 的方式來獲取 Config:
config, err := rest.InClusterConfig() if err != nil {log.Fatabl().Err(err).Msg("get kubernetes configuration failed") } client, err := kubernetes.NewForConfig(config) if err != nil {log.Fatal().Err(er).Msg("create kubernetes client failed") }然后我們創建一個Watcher和Payload,Watcher是來負責查詢Kubernetes和創建Payloads的,Payloads包含了滿足HTTP請求所需要的所有的Kubernetes數據:
// 通過Watcher加載的Kubernetes的數據集合。 type Payload struct {Ingresses []IngressPayloadTLSCertificates map[string]*tls.Certificate }//一個IngressPayload是一個Ingress加上他的服務端口 type IngressPayload struct {Ingress *extensionsv1beta1.IngressServicePorts map[string]map[string]int }另外需要注意除了端口外,Ingress還可以通過端口名稱來引用后端服務的端口,所以我們可以通過查詢相應的Service來填充改數據。
Watcher主要用來監聽Ingress、Service、Secret的變化:
// 在Kubernetes 集群中監聽Ingress對象的Watcher type Watcher struct {client kubernetes.InterfaceonChange func(*Payload) }只要我們檢測到某些變化,就會調用onChange函數。為了實現上面的監聽功能,我們需要使用K8s.io/client-go/informers這個包,該包提供餓了一種類型安全、高效的機制來查詢、list和watch Kubernetes對象,我們只需要為需要的每個對象創建一個SharedInformerFactory以及Listers即可:
func (w *Watcher) Run(ctx context.Context) error {factory := informers.NewSharedInformerFactory(w.client, time.Minute)secretLister := factory.Core().V1().Secrets().Lister()serviceLister := factory.Core().V1().Services().Lister()ingressLister := factory.Extensions().V1beta1().Ingresses().Lister() ... }然后定一個onChange的本地函數,該函數在檢測到變更時隨時調用。我們這里在每種類型的變更時每次都從頭開始重新構建所有的內容,暫時還未考慮性能問題。因為Watcher和HTTP處理都在不同的goroutine中運行,所有我們基本上可以構建一個有效的負載,而不會影響任何正在進行的請求,當然這是一種簡單粗暴的做法。
我們可以通過從listing ingresses對象開始:
ingresses, err := ingressLister.List(labels.Everything()) if err != nil {log.Error().Err(err).Msg("failed to list ingresses")return }對于每個ingress對象,如果有TLS規則,則從secrets對象中加載證書:
for _, rec := range ingress.Spec.TLS {if rec.SecretName != "" {secret, err := secretLister.Secrets(ingress.Namespace).Get(rec.SecretName)if err != nil {log.Error().Err(err).Str("namespace", ingress.Namespace).Str("name", rec.SecretName).Msg("unknown secret")continue}cert, err := tls.X509KyePair(secret.Data["tls.crt"], secret.Data["tls.key"])if err != nil {log.Error().Err(err).Str("namespace", ingress.Namespace).Str("name", rec.SecretName).Msg("invalid tls certificate")continue}payload.TLSCertificates[rec.SecretName] = &cert} }Go語言已經內置了一些和加密相關的包,可以很簡單的處理TLS證書,對于實際的HTTP規則, 這里我們添加了一個addBackend的輔助函數:
addBackend := func(ingressPayload *IngressPayload, backend extensionsv1beta1.IngressBackend) {svc, err := serviceLister.Services(ingressPayload.Ingress.Namespace).Get(backend.ServiceName)if err != nil {log.Error().Err(err).Str("namespace", ingressPayload.Ingress.Namespace).Str("name", backend.ServiceName).Msg("unknown service")} else {m := make(map[string]int)for _, port := range svc.Spec.Ports {m[port.Name] = int(port.Port)}ingressPayload.ServicePorts[svc.Name] = m} }每個HTTP規則和可選的默認規則都會調用該方法:
if ingress.Spec.Backend != nil {addBackend(&ingressPayload, *ingress.Spec.Backend) } for _, rule := range ingress.Spec.Rules {if rule.HTTP != nil {continue}for _, path := range rule.HTTP.Paths {addBackend(&ingressPayload, path.Backend)} }然后調用onChange回調:
w.onChange(payload)
每當發生更改時,都會調用本地onChange函數,最后一步就是啟動我們的informers:
var wg sync.WaitGroup wg.Add(1) go func() {informer := factory.Core().V1().Secrets().Informer()informer.AddEventHandler(handler)informer.Run(ctx.Done())wg.Done() }()wg.Add(1) go func() {informer := factory.Extensions().V1Beta1().Ingresses().Informer()informer.AddEventHandler(handler)informer.Run(ctx.Done())wg.Done() }()wg.Add(1) go func() {informer := factory.Core().V1().Services().Informer()informer.AddEventHandler(handler)informer.Run(ctx.Done())wg.Done() }()wg.Wait()我們這里每個informer都使用同一個handler:
debounced := debounce.New(time.Second) handler := cache.ResourceEventHandlerFuncs {AddFunc: func(obj interface{}) {debounced(onChange)},UpdateFunc: func(oldObj, newObj interface{}) {debounced(onChange)},DeleteFunc: func(obj interface{}) {debounced(onChange)}, }Debouncing(防抖動)是一種避免事件重復的方法,我們設置一個小的延遲,如果在達到延遲之前發生了其他事件,則重啟計時器。
路由表
路由表的目標是通過預先計算大部分查詢相關信息來提高查詢效率,這里我們就需要使用一些高效的數據結構來進行存儲,由于在集群中有大量的路由規則,所以要實現映射查詢既高效又容易理解的最簡單的方法我們能想到的就是使用Map,Map可以為我們提供O(1)效率的查詢,我們這里使用Map進行初始化查找,如果在后面找到了多個規則,則使用切片來存儲這些規則。
一個路由表由兩個Map構成,一個是根據域名映射的證書,一個就是根據域名映射的后端路由表:
type RoutingTable struct {certificatesByHost map[string]map[string]*tls.CertificatebackendsByHost map[string][]routingTableBackend }//NewRoutingTable 創建一個新的路由表 func NewRoutingTable(payload *watcher.Payload) *RoutingTable {rt := &RoutingTable {certificatesByHost: make(map[string]map[string]*tls.Certificate),backendsByHost: make(map[string][]routingTableBackend),}rt.init(payload)return rt }此外路由表下面還有兩個主要的方法:
// GetCertificate 獲得一個證書 func (rt *RoutingTable) GetCertificate(sni string)(*tls.Certificate, error) {hostCerts, ok := rt.certificatesByHost[sni]if ok {for h, cert := range hostCerts {if rt.matches(sni, h) {return cert, nil}}}return nil, errors.New("certificate not found") }// GetBackend 通過給定的 host 和 path 獲取后端程序 func (rt *RoutingTable) GetBackend(host, path string) (*url.URL, error) {// strip the portif idx := strings.IndexByte(host, ':'); idx > 0 {host = host[:idx]}backends := rt.backendsByHost[host]for _, backend := range backends {if backend.matches(path) {return backend.url, nil}}return nil, errors.New("backend not found") }其中GetCertificate來獲取用于安全連接的TLS證書。HTTP處理程序使用GetBackend將請求代理到后端,對于TLS證書,我們還有一個matches方法來處理通配符證書:
func (rt *RoutingTable) matches(sni string, certHost string) bool {for strings.HasPrefix(certHost, "*.") {if idx := strings.IndexByte(sni, '.'); idx >= 0 {sni = sni[idx+1:]} else {return false}certHost = certHost[2:]}return sni == certHost }其實對于后端應用來說,matches方法實際上就是一個正則表達式匹配(因為Ingress對象的path字段定義的是一個正則表達式):
type routingTableBackend struct {pathRE *regexp.Regexpurl *url.URL }func (rtb routingTableBackend) matches(path string) bool {if rtb.pathRE == nil {return true}return rtb.pathRE.MatchString(path) }HTTP Server
最后我們需要來實現一個 HTTP Server,用來接收網絡入口的請求,首先定義一個私有的config結構體:
type config struct {host stringport inttlsPort int }定義一個Option類型:
// config的修改器 type Option func(*config)定義一個設置Option的函數:
// WithHost設置host綁定到config上。 func WithHost(host string) Option {return func(cfg *config) {cfg.host = host} }服務的結構體和構造器如下所示
// 代理HTTP請求 type Server struct {cfg *configroutingTable atomic.Valueready *Event }// New創建一個新的服務 func New(options ...Option) *Server {cfg := defaultConfig()for _, o := range options {o(cfg);}s := &Server {cfg: cfg,raady: NewEvent(),}s.routingTable.Store(NewRoutingTable(nil))return s }通過使用一個合適的默認值,上面的初始化方法可以使大多數客戶端使用變得非常容易,同時還可以根據需要進行靈活的更改,這種API方法在Go語言中是非常普遍的,有很多實際示例,比如gRPC的Dail方法。
除了配置之外,我們的服務器還有指向路由表的指針和一個就緒的時間,用于在第一次設置payload時發出信號。但是需要注意的是,我們這里使用的是atomic.Value來存儲路由表,這是為什么呢?
由于這里我們的應用不是線程安全的,如果在HTTP處理程序嘗試讀取路由表的同時對其進行了修改,則可能導致狀態錯亂或者程序崩潰。所以,我們需要防止同時讀取和寫入這個共享的數據結構,當然有多種方法可以實現該需求:
- 第一種就是我們這里使用的atomic.Value,該類型提供了一個Load和Store的方法,可以允許我們自動讀取/寫入該值。由于我們在每次更改時都會重新構建路由表,所以我們可以在一次操作中安全地交換新舊路由表,這和文檔中的ReadMostly示例非常相似:
不過這種方法的一個缺點是必須在運行時聲明存儲的值類型:
s.routingTable.Load().(*RoutingTable).GetBackend(r.Host, r.URL.Path)
- 另外我們也可以使用Mutex或RWMutex來控制對關鍵區域代碼的訪問:
- 還有一種方法就是讓路由表本身稱為線程安全的,使用sync.Map來代替Map并添加方法來動態更新路由表。一般來說,我會避免使用這種方法,它使代碼更難于理解和維護了,而且如果你實際上最終沒有多個goroutine訪問數據結構的話,就會增加不必要的開銷了。
真正的處理服務的ServeHTTP方法如下所示:
// ServeHTTP 處理 HTTP請求 func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {// 根據請求的域名和Path路徑獲取背后真實的后端地址backendURL, err := s.routingTable.Load().(*RoutingTable).GetBackend(r.Host, r.URL.Path)if err != nil {http.Error(w, "upstream server not found", http.StatusNotFound)return}log.Info().Str("host", r.Host).Str("path", r.URL.Path).Str("backend", backendURL.String()).Msg("proxying request")//對后端真實URL發起代理請求p := httputil.NewSingleHostReverseProxy(backendURL)p.ErrorLog = stdlog.New(log.Logger, "", 0)p.ServeHTTP(w, r) }這里我們使用了httpuril這個包,該包具有反向代理的一些實現方法,我們可以將其用于HTTP服務,它可以將請求轉發到指定的URL上,然后將響應發送回客戶端。
Main 函數
將所有組件組合在一起,然后通過main方法提供入口,我們之類使用flag包來提供一些命令行參數:
func main() {flag.StringVar(&host, "host", "0.0.0.0", "the host to bind")flag.IntVar(&port, "port", 80, "the insecure http port")flag.IntVar(&tlsPort, "tls-port", 443, "the secure https port")flag.Parse()client, err := kubernetes.NewForConfig(getKubernetesConfig())if err != nil {log.Fatal().Err(err).Msg("failed to create kuebernetes client")}s := server.New(server.WithHost(host), server.WithPort(port), server.WithTLSPort(tlsPort))w := watcher.New(client, func(payload *watcher.Payload) {s.Update(payload)})var eg errgroup.Groupeg.Go(func() error {return s.Run(context.TODO())})eg.Go(func() error {return w.Run(context.TODO())})if err := eg.Wait(); err != nil { log.Fatal().Err(err).Send()} }Kubernetes 配置
有了服務器代碼,現在我們就可以在Kubernetes上用DaemonSet控制器來運行我們的Ingress Controller:(k8s-ingress-controller.yaml)
apiVersion: v1 kind: ServiceAccount metadata:name: k8s-simple-ingress-controllernamespace: default--- kind: ClusterRole apiVersion: rbac.authorization.k8s.io/v1beta1 metadata:name: k8s-simple-ingress-controller rules:- apiGroups:- ""resources:- services- endpoints- secretsverbs:- get- list- watch- apiGroups:- extensionsresources:- ingressesverbs:- get- list- watch--- kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1beta1 metadata:name: k8s-simple-ingress-controller roleRef:apiGroup:rbac.authorization.k8s.iokind: ClusterRolename: k8s-simple-ingress-controller subjects: - kind: ServiceAccountname:k8s-simple-ingress-controllernamespace: default--- apiVersion: extensions/v1beta1 kind: DaemonSet metadata:name: k8s-simple-ingress-controllerlabels:app: ingress-controller spec:selector:matchLabels:app: ingress-controllertemplate:metadata:labels:app: ingress-controllerspec:hostNetwork: truednsPolicy: ClusterFirstWithHostNetserviceAccountName: k8s-simple-ingress-controllercontainers:- name: k8s-simple-ingress-controllerimage: cnych/k8s-simple-ingress-controller:v0.1ports:- name: httpcontainerPort: 80- name: httpscontainerPort: 443由于我們要在應用中監聽Ingress、Service、Secret這些資源對象,所以需要聲明對應的RBAC權限,這樣當我們的請求到達Ingress? Controller的節點后,然后根據Ingress對象的規則,將請求轉到對應的Service上就完成了服務暴露的整個規程。
直接創建上面我們自定義的Ingress Controller的資源清單:
$ kubectl apply -f k8s-ingress-controller.yaml $ kubectl get pods -l app=ingress-controller NAME READY StatuS RESTARTS AGE k8s-simple-ingress-controller-694df987c7-h2qlc 1/1 Running 0 8m59s然后為我們最開始的whoami服務創建一個Ingress對象:(whoami-ingress.yaml)
apiVersion: extensions/v1beta1 kind: Ingress metadata:name: whoami spec:rules:- host: who.qikqiak.comhttp:paths:- path: /backend:serviceName: whoamiservicePort: 80kubectl apply -f whoami-ingress.yaml
然后將域名who.qikqiak.com解析到我們不熟的Ingress Controller的Pod節點上,就可以直接訪問了:
$ kubectl logs -f k8s-simple-ingress-controller-694df987c7-h2qlc
5:37AM INF starting secure HTTP server addr=0.0.0.0:443
5:37AM INF starting insecure HTTP server addr=0.0.0.0:80
5:39AM INF proxying request backend=http://whoami:80 host=who.qikqiak.com path=/
到這里我們就完成了自定義一個簡單的Ingress?Controller,當然這只是一個最基礎的功能,在實際使用中還會有更多的需求,比如TCP的支持、對請求進行一些修改之類的,這就需要花更多的時間去實現了。
本文相關代碼都整理到了 GitHub 上,地址:https://github.com/cnych/kubernetes-simple-ingress-controller。
參考鏈接
-
https://kubernetes.io/docs/concepts/services-networking/ingress-controllers/
-
http://www.doxsey.net/blog/how-to-build-a-custom-kubernetes-ingress-controller-in-go
總結
以上是生活随笔為你收集整理的使用Golang自定义Kubernetes Ingress Controller的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 渗透测试技巧之Redis漏洞利用总结
- 下一篇: Kubernetes NodePort