瓜子云的任务调度系统
瓜子云的任務調度系統結合了Kubernetes的Job和Airflow。
Airflow是Airbnb出品的任務調度系統,支持DAG調度,這一點完美的彌補了Kubernetes Job的不足。借助Kubernetes的自動擴展,集群資源統一管理,Airflow將更具靈活性,更穩定。但是,把Airflow部署在Kubernetes上是一個很大的挑戰。
接下來我講詳細介紹一下瓜子云的任務調度系統搭建所遇到的問題和解決方案。
瓜子最早的時候,任務調度用的是Crontab,后來由于數據倉庫的復雜調度需求,我們引入了Airflow。Airflow支持DAG依賴,失敗重試,歷史狀態記錄,log收集等多種非常使用的功能。
Airflow有很多問題:
-
Airflow的Worker需要手動搭建,可擴展性不好。
-
Job代碼更新之后,需要手動部署到Worker上,非常繁瑣。
-
Airflow Worker的環境太多,由各個團隊自行維護,維護成本太高。
-
瓜子云平臺搭建之后,所有機器都會被回收,各業務線擁有的機器將會很少,Worker將會沒有地方部署。
此外,我們還希望調度系統有如下的功能:
-
DAG之間的依賴
因為數據倉庫的ETL非常復雜,沒有任何人能夠完全掌控整個流程,我們需要把整個ETL切成很多小的DAG,這些DAG之間是有互相依賴的。
-
自動擴容縮容
瓜子這樣的特點,晚上有大量批量任務需要跑,白天每個小時,每一分鐘都會有增量任務需要跑。
-
環境隔離
瓜子的語言多種多樣,每個團隊都有很多不同的Job在不同的環境上跑著,管理很混亂,還有可能互相影響。
介于這樣的問題,我們準備把調度系統部署到Kubernetes上,利用Kubernetes的環境隔離,自動擴容縮容的特性。
Airflow分為Master節點和Worker節點兩種。Master節點有Scheduler和Web兩種服務,Worker上有Executor一種服務。
我們從任務的調度過程來看看他們是怎么工作的:
Scheduler讀取DAG配置文件,將需要執行的Job信息發給RabbitMQ,并且在MySQL里面注冊Job信息。
RabbitMQ里面按照環境有很多channel,Scheduler的Job會根據需要執行的環境發到相應的channel里面。
Executor消費RabbitMQ相應的channel,進行執行,執行結果更新到MySQL中,并將log暴露到Executor的某個http端口上調用,并存入MySQL中。
Web讀取MySQL里面的Job信息,展示Job的執行結果,并從MySQL中獲取log的url,展示log。
Web上發現執行錯誤的Job可以點擊重試,直接發送Job給RabbitMQ里,并改變MySQL里面Job的狀態。
Airflow上云有很多問題,我們這里只列舉一些比較麻煩的問題來說一下。
Airflow不支持多個Scheduler,多個Scheduler一起啟動時會報錯,所有Scheduler都會掛掉。當我們在Kubernetes上滾動更新時,需要先啟動一個新的Scheduler,然后再干掉舊的Scheduler。這樣就不可避免會出現多個Scheduler并行的情況。
2、配置更新
Job配置更新后,所有組件自動更新最新配置的問題。Airflow中所有組件都需要拿到DAG配置才能正常工作。其實原理上大可不必,可能是Airflow設計的時候沒考慮到分別部署的情況吧。
3、Web訪問Worker
Web需要通過Worker的HOSTNAME來訪問Worker上的log,但是Kubernetes中不支持通過HOSTNAME來訪問。
4、Worker不同環境
Job需要在不同環境中執行,不可能在Kubernetes中為所有環境單獨搭建長期運行的Pod。
我們引入了ZooKeeper,在Airflow Scheduler啟動時去監聽ZooKeeper下的/airflow/scheduler。
如果發現下面有個running的key,就說明已經有Scheduler在運行了,然后一直監聽,直到running timeout。
如果發現沒有,就可以啟動Scheduler,然后在/airflow/scheduler下注冊running,把自己的信息,作為value。每隔5s注冊一下,該running timeout時間設為30s。
這樣就解決了HA的問題。
2、配置更新的問題
配置更新的配置流程為:
我們自己寫了一個Watcher的組件,通過連接Git的Webhooker,監聽git merge信息,一旦收到merge的信息,就會把Git的commit hash值存入etcd的/medusa/airflow/config 里面。
我們在Scheduler旁邊放一個sidecar —— Confd,兩個容器作為一個Pod,共享一個文件夾作為airflow的DAG配置文件夾。
Confd監聽etcd的/medusa/airflow/config key,發現更新就觸發git pull操作。
這樣子,我們就拿到了最新的配置文件。通過相同的方式部署Web和Worker即可。
3、Web訪問Worker的問題
這個問題,我們在Airflow源碼里面改了一點東西,用IP地址代替HOSTNAME解決了問題。
4、Worker不同環境
我們的解決方案是,不在Worker里面放任何環境,只負責由給定的image和script來生成Kubernetes job xml,并啟動Job和監控。我們將在下面重點介紹。
經過上述改動后,云上Airflow的架構就改成了下圖這樣:
整個任務調度流程為:
1、Scheduler讀取任務配置文件夾信息,發現有個任務需要執行。所有的執行命令都是:
這個樣子的,也就是所有任務都通過KJob來執行。
通過傳入的兩個參數image和script,生成job.yml
通過job.yml 啟動Kubernetes Job
一旦Job開始正常運行,監聽log
Job完成,獲取job的狀態并返回成功與否
這個樣子,我們就把環境依賴的事丟給開發者自行維護了。
這時的任務更新流程如下圖:
我們寫了一個med-sdk,其功能是把代碼打成Docker鏡像,并且push到Docker Registry里面。這里我就不詳細展開了,有興趣的可以看我的之前的分享。
詳細流程為:
如圖右側,任務代碼改動后,會自動觸發med-sdk構建Docker鏡像,并發布到Docker Registry里面,鏡像以latest作為version,確保每次都拉取最新版的鏡像。
如圖左側,Airflow配置改動后,Watcher會收到Git的merge信息,并更新ETCD。Scheduler,Worker會更新相應的配置文件。
Worker收到最新Job之后會拉取最新的鏡像部署服務。
整個Airflow上Kubernetes的難點算是處理完了。
Q:請問為什么要集成Kubernetes?
A:Airflow的Worker需要手動搭建,可擴展性不好;Job代碼更新之后,需要手動部署到Worker上,非常繁瑣;Airflow Worker的環境太多,由各個團隊自行維護,維護成本太高;云平臺搭建之后,所有機器都會被回收,各業務線擁有的機器將會很少,Worker將會沒有地方部署。
Q:Airflow處理的調度量是什么規模,也就是批量任務會不會阻塞,一次并發有多少Pod,多少容器實例,一套Kubernetes Master能否扛得住,方便給個數據量進行參考嗎?
A:目前瓜子每天有2000個任務。任務的執行地點都是在Kubernetes上的,不會阻塞。并發的Pod個數是由同時處理的Job數定的,Airflow的Worker有設置一個Worker可以同時跑幾個Job。我們并發Pod有20個。一套Kubernetes可以抗住我們的規模。數據量不好給,因為任務的計算量不好估算,有的大有的小。
Q:為什么不考慮Celery之類的任務隊列?
A:首先是我們之前選用的是Airflow,用Python寫的DAG,非常符合我們的需求,我們的DAG需求很大,比如數據倉庫,所以選擇了Airflow。
Q: 有做過類似軟件的對比么,差異在哪?
A:Kubernetes目前被Docker官方支持。Mesos用C寫的,不好運維。Rancher社區不夠大。其實功能大家都支持,主要是社區。
Q:并發的容器數量是多少,實際的Docker實例個數量級,20個Pod可大可小。方便給個參考嗎?謝謝!
A:我們測過每臺機的上限在100個,我們的機器是128G,24cores。我們Airflow的Worker有20個Pod。深入學習Kubernetes
本次培訓內容包含:Kubernetes架構、Kubernetes安裝、Kubernetes功能導覽、監控解決方案、Kubernetes高階——設計和實現、Kubernetes落地實踐等,點擊識別下方二維碼加微信好友了解具體培訓內容。
總結
以上是生活随笔為你收集整理的瓜子云的任务调度系统的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 硬盘对拷
- 下一篇: Python数据清洗与可视化——北京租房