k8s kubebuilder系列开发 — 编写自定义资源和Reconciliation循环
原文轉自公眾號云原生CTO
這是 Kubernetes Operator Dev N 部分系列的第三部分。在本文中,我們將深入探討如何編寫自定義資源和operator/控制器的reconciliation協調循環。
我強烈建議您快速瀏覽文章/前面的一部分,因為它深入kubebuilder本文建議的這是一個先決條件,在前面的部分,我提出,引導一個operator叫做PostgresWriter在這篇文章中,我們將operator的reconciliation協調循環進行編碼。
但是,如果您感到繁瑣,我將在下一節為您總結PostgresWriter operator,這樣您就可以獲得足夠的上下文,而不必完全依賴上一篇文章了:)
讓我們設置一些上下文
如果您已經了解Reconciliation循環和PostgresWriter自定義資源/operator(我在前一部分中解釋過了),或者您已經閱讀了本系列文章的第一部分和第二部分,那么您完全可以跳過本部分。
我們的示例operator,即PostgresWriter同樣,如果您已經閱讀了本系列文章的第二部分(上一部分),可以隨意跳過這部分。
我們將要構建的operator將被稱為PostgresWriter。這個想法很簡單。比方說,我們在世界的某個角落有一個Postgres DB。我們的集群將有一個名為“postgresql -writer”的自定義資源。
與“postgresql -writer”資源相關的清單如下:
解析正在創建的傳入“postgres_writer”資源的spec,并識別table,name,age和country。
以<傳入的postgresql資源>的命名空間/<傳入的postgresql資源>的名稱(在本例中為default/sample-student)的格式形成一個與上面傳入的" postgresql -writer "資源對應的唯一id。
因為在Kubernetes中,對于某種資源類型(在我們的例子中是PostgresWriter),名稱空間/名稱組合在集群中總是唯一的。
在名為spec的表中,在Postgres DB中插入一個新行。表和相應的spec.name, spec.age和spec.country字段,主鍵將是我們形成的上面唯一的id(傳入資源的名稱空間/名稱)。
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-2gTDsEm7-1646976795925)()]
同時,每當PostgresWriterresource像上面的將被刪除,相應我們的operator將刪除的行對應資源從我們PostgresDB以保持我們的PostgresDB和PostgresWriter資源出現在我們的集群相互一致。
對于上面的示例,如果我們使用kubectl刪除上面的sample-studentPostgresWriter資源,那么operator將刪除id default/sample-student對應的行。
這將確保對于我們集群中的每個PostgresWriter資源,在我們的PostgresDB中都有一行,僅此而已。
Reconciliation 循環
Kubernetes本質上是一個系統,本質上是聲明性的,這意味著它由用戶提供一個所需的狀態/規范,并且它嘗試根據當前的現實來匹配所需的狀態。協調當前狀態和期望狀態之間的漂移的艱苦工作是由稱為控制器的組件完成的。
這些控制器具有“reconciliation循環”(也稱為“控制循環”),它會不斷運行并查看集群的當前狀態和期望的狀態,每當它們之間發生漂移時,它們就會嘗試采取某些行動將當前狀態恢復到期望的狀態。
在我們的例子中,在Kubernetes看來,我們的operator也只是一個控制器。它正在觀察一個名為PostgresWriter的自定義資源,捕獲傳入的資源規范(期望的狀態),并執行一些神奇的操作(在Postgres DB中寫入行)以匹配當前的現實和期望的狀態。
例如:我希望您知道Kubernetes資源Deployment。它的關鍵特性之一是在指定數量的Pods(副本)上運行工作負載/應用程序,并確保始終運行指定數量的Pods。因此,假設部署應該在它的保護傘下運行5個副本,并假設您刪除了它的兩個pod。作為Reconciliation循環的一部分,部署控制器當前狀態之間要注意這個漂移(3pods)和期望狀態(5pods),這是需要創建兩個新的pod的行動(因為3 + 2 = 5)匹配/協調當前狀態所需的狀態。
在這篇文章中我們要做什么?
在前一篇文章中,我們在kubebuilder的幫助下,用一些樣板代碼引導了PostgresWriter operator。因此,在本文中,我們將做以下事情: 我們將訪問api/v1/postgreswriter_types。然后設置與PostgresWriter資源對應的結構體。
我們將編寫所有與PostgresDB連接和插入/刪除相關的輔助函數,因為這是我們的operator將執行的核心功能。
最后,我們將深入到controllers/postgreswriter_controller.go,編輯Reconcile()方法,并編寫出我們operator背后的主要最終邏輯/代碼。
重要提示:在本文中,我不會編寫reconciliation循環來標識PostgresWriter資源的刪除,并足夠聰明地從Postgres DB中相應地刪除該行(對應于已刪除資源)。
我將在下一篇文章中專門討論這個問題,因為這需要深入解釋謂詞、終結器以及將多個協調器連接到同一個控制器/operator,我希望在單獨的一篇文章中順利地專門討論這些主題。所以,請原諒我,?
就讓我們一探究竟吧!
剖析我們的定制資源
首先,讓我們把我們期望從oeprator那里得到的一切。
我們的自定義資源的實例看起來像這樣:
所以這些是以下要求:
- 它的“Group”是demo.yash.com,“Version”是v1。
- 它的“kind”是PostgresWriter。
- 它下面有一個必需的節規范。
- spec部分下面有四個必需字段:table(string)、name(string)、age(整數)和country(string)。
我在前一部分即本系列文章的第二部分解釋了GVK即Group-Version-Kind。
轉到api/v1/postgreswriter_types.go。這個文件將包含與我們的自定義資源將要遵循的結構相對應的所有結構。我們去自上而下。讓我們首先在自定義資源的根中定義所需的spec字段。
//+kubebuilder:object:root=true //+kubebuilder:subresource:status// PostgresWriter is the Schema for the postgreswriters API type PostgresWriter struct {metav1.TypeMeta `json:",inline"`metav1.ObjectMeta `json:"metadata,omitempty"`Spec PostgresWriterSpec `json:"spec"`Status PostgresWriterStatus `json:"status,omitempty"` }現在,讓我們在spec下定義需要的table , name , age 和 country .
// PostgresWriterSpec defines the desired state of PostgresWriter //+kubebuilder:validation:Required type PostgresWriterSpec struct {//+kubebuilder:validation:Required//+kubebuilder:validation:Type=stringTable string `json:"table"`//+kubebuilder:validation:Required//+kubebuilder:validation:Type=integer//+kubebuilder:validation:Minimum=0Age int `json:"age"`//+kubebuilder:validation:Required//+kubebuilder:validation:Type=stringName string `json:"name"`//+kubebuilder:validation:Required//+kubebuilder:validation:Type=stringCountry string `json:"country"` }所有這些注釋看起來像//+kubebuilder:validation....被稱為CRD驗證標記,kubebuilder在生成我們的CRD時將識別這些標記,并相應地將相關過濾器和條件應用到我們的最終CRD。要查看所有其他CRD驗證標記
請參閱 :https://book.kubebuilder.io/reference/markers/crd-validation.html
我們已經定義了PostgresWriter自定義資源的結構,現在讓我們生成CRD。
運行如下命令生成CRD。
make manifests這將在config/CRD/bases/處生成你的CRD您將注意到生成的CRD已經指定了spec和age,country, name, table作為必需字段,并且它已經正確地定義了這些單獨字段的類型,甚至在age字段下,還指定了最小值:0。這都要感謝Kubebuilder的CRD驗證標記
編寫Postgres的代碼
本質上,我們的operator的核心是執行兩個任務:
- 當創建PostgresWriter實例時,在PostgresDB上運行INSERT查詢。
- 當PostgresWriter的一個實例被刪除時,在PostgresDB上運行一個DELETEquery。
為了讓它能夠執行與Postgres相關的功能,讓我們創建一個pkg/psql/psql.go文件和編碼出所有與Postgres相關的核心功能。
讓我們從聲明包并導入所有相關庫開始。
package psqlimport ("database/sql""fmt"_ "github.com/lib/pq" // no use on our end, just sets up the relevant drivers to be used by "database/sql" package )讓我們定義一個名為PostgresDBClient的結構體,它的實例將是我們的operator和DB之間的通信點。
type PostgresDBClient struct {host stringport intdbname stringuser stringpassword stringDbConnection *sql.DB }現在,除了DbConnection. 該對象(sql.DB實際上是指向該對象的指針)將用于與我們的數據庫對話并對其執行查詢。
但是為了在一開始設置它,甚至在需要時返回它(DbConnection),讓我們定義一個方法setupAndReturnDbConnection()
// If need be, setups a new DbConnection connection for a PostgresDBClient object (and returns it), else just returns the existing one functional one // Not setting up a new connection blindly everything this method is called so as to maintain idempotency func (pc *PostgresDBClient) setupAndReturnDbConnection() (*sql.DB, error) {// if the .DbConnection attribute is nil, then we clearly need to setup a new connection as none exists presently// else if it exists but has some other issues (basically, some error occurred while "Ping"ing the DB with current connection), then setup a new connection// PS: expiry of connection won't be a concern because .Ping() not only checks the connection but resets the connections too automatically with new connections, if need be.// Ref: https://cs.opensource.google/go/go/+/refs/tags/go1.17.1:src/database/sql/sql.go;l=868setupNewConnection := falseif pc.DbConnection == nil {setupNewConnection = true} else if err := pc.DbConnection.Ping(); err != nil {setupNewConnection = true}if setupNewConnection {psqlInfo := fmt.Sprintf("host=%s port=%d user=%s "+"password=%s dbname=%s sslmode=disable",pc.host, pc.port, pc.user, pc.password, pc.dbname)newDbConnection, err := sql.Open("postgres", psqlInfo)if err != nil {return nil, err}pc.DbConnection = newDbConnection}return pc.DbConnection, nil }這個方法是冪等的,因為它不會在每次調用時盲目地創建新的DB連接。這個方法計算PostgresDBClient對象并檢查其現有的DbConnection屬性,當且僅當該屬性不存在或有其他問題時,將建立一個新的連接并返回。因此,即使重復調用它也不會導致不必要地創建多個不同的結果/連接。
現在,讓我們定義一個用于執行INSERT查詢的方法。
// Insert inserts a row into the DB to which the receiver PostgresDBClient points func (pc *PostgresDBClient) Insert(id, table, name string, age int, country string) error {dbConnection, err := pc.setupAndReturnDbConnection()if err != nil {return err}insertQuery := fmt.Sprintf("INSERT INTO %s (id, name, age, country) VALUES ('%s', '%s', %d, '%s') ON CONFLICT (id) DO NOTHING;", table, id, name, age, country)if _, err := dbConnection.Exec(insertQuery); err != nil {return err}return nil }讓我們定義一個用于執行DELETE查詢的方法。
// Delete deletes row from the DB to which the receiver PostgresDBClient points func (pc *PostgresDBClient) Delete(id, table string) error {dbConnection, err := pc.setupAndReturnDbConnection()if err != nil {return err}deleteQuery := fmt.Sprintf("DELETE FROM %s WHERE id='%s';", table, id)if _, err := dbConnection.Exec(deleteQuery); err != nil {return err}return nil }最后,讓我們定義一個構造函數(kind),它將返回一個成熟的PostgresDBClient對象,并提供必要的參數。讓我們定義一個Close()方法,它將被用來優雅地清理PostgresDBClient對象,以避免任何潛在的內存泄漏。
// NewPostgresDBClient acts like a constructor to generating a new object of PostgresDBClient with a DbConnection as well func NewPostgresDBClient(host string, port int, dbname, user, password string) (*PostgresDBClient, error) {pc := &PostgresDBClient{host: host,port: port,dbname: dbname,user: user,password: password,}if _, err := pc.setupAndReturnDbConnection(); err != nil {return nil, err}return pc, nil }// Close is used to gracefully close and wrap up the DbConnection associated with the PostgresDBClient object so as to avoid any associated memory leaks func (pc *PostgresDBClient) Close() {pc.DbConnection.Close() }現在,我們已經完成了圍繞Postgres的核心功能的設置,可以開始編寫operator了。但在此之前,讓我們快速了解一下圍繞operator的現有引導代碼(僅是有用的部分)。
理解我們的引導控制器/operator
讓我們探索文件的controllers/postgreswriter_controller.go某些部分,main.go看看我們的operator是如何被實例化和執行的。
讓我們從controllers/postgreswriter_controller.go開始
你會注意到PostgresWriterReconciler結構。這個結構實際上代表了我們的operator。它將實現協調器接口和Reconciliation循環,當PostgresWritercustom資源發生什么事情時,Reconciliation循環將被觸發。它還能夠與PostgresDB通信,并運行相應的INSERT或DELETE查詢。根據其擁有的權限,它可以與集群通信、訪問甚至operator源。
// PostgresWriterReconciler reconciles a PostgresWriter object type PostgresWriterReconciler struct {client.ClientScheme *runtime.Scheme }- client.Client — 只要我們的operator (PostgresWriterReconciler) 想要與我們的 Kubernetes 集群對話并對其執行任何 CRUD,就會使用此屬性。因此,諸如getting/updating/deleting/creating之類的任何事情都將由client.Client.
- Scheme — 該屬性用于通過管理器將 struct/type 注冊PostgresWriter到其對應的 GVK(Group: demo.yash.com, V: v1, Kind: PostgresWriter)。它主要在幕后被不同的客戶端和緩存使用,以將 Go 類型/結構與其對應的 GVK 關聯和關聯。
現在,繼續進行,您會注意到該Reconcile()方法。該方法將包含我們operator的reconciliation循環背后的源代碼。這是我們將在需要時對我們的operator進行編程以運行 PostgresDB 查詢的地方。每當 PostgresWriter 自定義資源發生諸如創建/更新/刪除之類的事情時,就會觸發此方法。
但是誰能確保在我們的集群中當PostgresWriter資源發生問題時調用這個方法?
接下來是SetupWithManager方法。SetupWithManager(mgr ctrl.Manager)——仔細查看代碼中是如何定義的。這幾乎不言自明。它確保了無論什么時候PostgresWrite資源(For(&demov1.PostgresWriter{}))發生任何事情,調用我們的operator/PostgresWriterReconciler (Complete(r),其中r是*PostgresWriterReconciler)的Reconcile()方法。
// SetupWithManager sets up the controller with the Manager. func (r *PostgresWriterReconciler) SetupWithManager(mgr ctrl.Manager) error {return ctrl.NewControllerManagedBy(mgr).For(&demov1.PostgresWriter{}).Complete(r) }我們已經完成了controllers/postgreswriter_controller.go,所以讓我們深入了解一下main.go
func main() {......if err = (&controllers.PostgresWriterReconciler{Client: mgr.GetClient(),Scheme: mgr.GetScheme(),}).SetupWithManager(mgr); err != nil {setupLog.Error(err, "unable to create controller", "controller", "PostgresWriter")os.Exit(1)}//+kubebuilder:scaffold:builder...... }我們的控制器/operator ( PostgresWriterReconciler) 正在實例化并與管理器連接/設置。
因此,無論何時,它執行時,我們的operator也將作為它的一部分啟動。
與我們的operator一起設置PostgresDB客戶端策略
從高效的角度來看,我們想要的是,對于我們的 operator 的一個實例,PostgresDBClient應該只創建一個object實例并用于處理與我們的 PostgresDB的所有通信,而不是盲目地為每個協調循環創建一個 PostgresDBClient和連接.
因此,讓我們從修改引導operator的一段代碼開始,以符合上述每當operator被實例化時實例化一個 Postgres 客戶端的情況。
您可以在代碼中對 Postgres執行大量進一步調整,例如管理連接池、連接超時、空閑超時等。但我們不會深入研究這些內容。本文重點介紹 Kubernetes operator dev😃
修改我們的自舉控制器/operator
我們將對上述兩個文件進行一些修改,以使該operator更符合我們的用例并可操作。我們希望我們的operator在啟動時綁定到一個單獨的 PostgresDB 連接。我們不希望它在每次協調某些事情時都創建/銷毀/重新創建 Postgres 數據庫連接。
首先,讓我們去controllers/postgreswriter_controller.go編輯里面的PostgresWriterReconciler結構體,甚至有一個PostgresDBClient對象作為屬性,當我們的operator啟動時,它只會在開始時被實例化一次。這將確保每當我們的operator(控制器)的協調器運行時,它PostgresDBClient每次都會使用現有對象與我們的PostgresDB 主機對話,而不是每次都創建一個新PostgresDBClient對象和連接,這會導致響應時間變慢和潛在的內存泄漏。
// PostgresWriterReconciler reconciles a PostgresWriter object type PostgresWriterReconciler struct {client.Client // already existedScheme *runtime.Scheme // already existedPostgresDBClient *psql.PostgresDBClient // newly added by me, this will be used by our operator to talk }現在,讓我們回到main.go我們的operator被實例化的地方,讓我們擴充它以定義并附PostgresDBClient加到我們的operator(PostgresWriterReconciler 的)實例。
首先,讓我們從環境變量中解析 Postgres 相關的配置并捕獲它們。
var (psqlHost stringpsqlPort intpsqlDbname stringpsqlUser stringpsqlPassword string )func init() {utilruntime.Must(clientgoscheme.AddToScheme(scheme))utilruntime.Must(demoyashcomv1.AddToScheme(scheme))//+kubebuilder:scaffold:schemepsqlHost, psqlDbname, psqlUser, psqlPassword = os.Getenv("PSQL_HOST"), os.Getenv("PSQL_DBNAME"), os.Getenv("PSQL_USER"), os.Getenv("PSQL_PASSWORD")var err errorpsqlPort, err = strconv.Atoi(os.Getenv("PSQL_PORT"))if err != nil {setupLog.Error(err, "unable to setup the operator, error occurred while parsing the Postgres config.")os.Exit(1)} }現在,跳轉到main()我們PostgresWriterReconciler正在實例化并與管理器一起設置的函數部分。我們將編輯它以包含 PostgresDBClient 對象。
func main() {......//newly added sectionpostgresDbClient, err := psql.NewPostgresDBClient(psqlHost, psqlPort, psqlDbname, psqlUser, psqlPassword)if err != nil {setupLog.Error(err, "unable to setup the Postgres DB Client")os.Exit(1)}// cleanup the postgresDbClient object and connection, if our operator were to stop/crash for any reason.defer postgresDbClient.Close()// already existed section (except one line)if err = (&controllers.PostgresWriterReconciler{Client: mgr.GetClient(),Scheme: mgr.GetScheme(),PostgresDBClient: postgresDbClient, //newly added line}).SetupWithManager(mgr); err != nil {setupLog.Error(err, "unable to create controller", "controller", "PostgresWriter")os.Exit(1)}//+kubebuilder:scaffold:builder...... }現在,讓我們對operator的主要部分進行編碼
編碼我們operator的心臟(或大腦?)
在我們繼續之前,我想再次提到,在本文中,我將要編寫的operator部分將只處理INSERT在我們的 PostgresDB 上運行查詢。分別捕獲“Create”和“Delete”事件并為它們運行不同的代碼的方面需要我解釋謂詞、終結器和附加多個協調器的概念,我認為這篇文章一定已經很沉重了你們現在 不過別擔心,我將在本系列的下一篇文章中全面介紹所有這些內容:)
所以,如果你看一下controllers/postgreswriter_controllers.go文件,看看下一段代碼SetupWithManager在它的方法,你會發現這段代碼是確保一旦有事情有發生PostgresWriter的資源,包括它正在創建的Reconcile()方法PostgresWriterReconciler(我們的operator) 將被調用。驚人的!現在考慮到這一點,讓我們編寫Reconcile()具有每當PostgresWriter創建資源時需要執行的功能的方法,即
- 從傳入資源中獲取table, name, age,country字段PostgresWriter。
- 形成與該傳入資源相對應的唯一 id。namespace/name該傳入資源的。
- 將上述變量發送到PostgresDBClient.Insert()方法以在我們的數據庫上運行INSERT查詢。
我知道,我知道我在這里犯了一個錯誤
我知道當前編碼的方式SetupWithManager和Reconcile方法,它們會導致我們的operator盲目地捕獲任何類型的事件,例如使用PostgresWriter資源創建、更新或刪除,并運行該Reconcile()方法以響應該事件,該方法當前INSERT僅運行該操作。我知道這絕對是可怕的,因為INSERT即使在更新和刪除任何PostgresWriter資源時也會運行相同的操作。
理想情況下,我們的operator應該足夠聰明,只捕獲PostgresWriter資源的“創建”事件并INSERT在我們的數據庫中運行查詢代碼。并且我們的operator應該更聰明,能夠識別資源的“刪除”事件,PostgresWriter并相應地DELETE在我們的數據庫中運行查詢,以保持集群PostgresWriter資源和 Postgres 數據庫之間的一致性(正如我在本文開頭提到的)。
但是實現這些功能將涉及處理謂詞、終結器和附加多個協調器。正如我在上一節中提到的,我很樂意在下一篇文章中專門介紹這一點(并很快發布)。所以,請繼續關注它,它會非常有趣
讓我們運行它🤞
首先,根據您的 Postgres DB 的配置設置所有環境變量,例如主機、端口、數據庫、用戶名和密碼。
至于我自己,我主持一個PostgresDB叫postgres上54.166.146.81它可以通過用戶訪問postgres和密碼password
export PSQL_HOST=54.166.146.81 export PSQL_PORT=5432 export PSQL_DBNAME=postgres export PSQL_USER=postgres export PSQL_PASSWORD=password現在,雖然我們之前已經運行了以下命令,但再次運行它(以防萬一您錯過了),以便根據我們代碼的最新狀態設置 CRD。
make manifests現在,讓我們部署我們的 CRD。
make install最后,讓我們運行我們的operator。
make run您將開始看到如下日志:
為了測試,讓我們編寫以下內容 sample.yaml
apiVersion: demo.yash.com/v1 kind: PostgresWriter metadata:name: sample-studentnamespace: default spec:table: studentsname: alexage: 23country: canada在kubectl apply -ing 這個資源 YAML時,我們的operator的Reconcile()方法被觸發并導致在我們的Postgres數據庫中的表中插入一行,students其中包含 iddefault/sample-student和提供的名稱、age和/country。
因此,在應用上述示例之前。這是我的DB中的studentstable的樣子:
現在運行它
kubectl apply -f sample.yaml現在,如果我檢查我的數據庫
我們的operator插入的行在正確的值下可見
好極了!有用!
尾注
非常感謝您來到這里,我知道這是一篇很長的文章,但好吧,但這是 Kubernetes Operator dev 開發部分,文章會稍許長些,主要希望可以完成教大家如何運行
在下一篇文章中,我們將通過添加捕獲和處理PostgresWriter資源的“delete”事件的能力來完成“PostgresWriter”operator的開發。我們將探索終結器、謂詞以及將多個協調器與一個控制器/operator相關聯的概念。
5.3 參考資料
github[1]
參考資料
[1]
github: https://github.com/yashvardhan-kukreja
總結
以上是生活随笔為你收集整理的k8s kubebuilder系列开发 — 编写自定义资源和Reconciliation循环的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: k8s kubebuilder系列开发
- 下一篇: linux fedora35 buff/