python dag调度系统开发_基于机器学习的DAG调度平台
什么是DAG?
有向無環(huán)圖
樹形結(jié)構(gòu):除根節(jié)點,每個節(jié)點有且僅有一個上級節(jié)點,下級節(jié)點不限。根節(jié)點沒有上級節(jié)點。
圖結(jié)構(gòu):每個節(jié)點上級、下級節(jié)點數(shù)不限。
DAG調(diào)度平臺的定義及場景
任務(wù)調(diào)度是在各行各業(yè)是個基礎(chǔ)問題,當(dāng)任務(wù)復(fù)雜同時存在任務(wù)復(fù)雜依賴的時候,就需要DAG調(diào)度。如:機(jī)器學(xué)習(xí)的可視化建模(PAI平臺、第四范式),數(shù)據(jù)的抽取、轉(zhuǎn)換、加載(ETL),在業(yè)務(wù)復(fù)雜情況下就需要DAG的調(diào)度管理等
接下來說說基于機(jī)器學(xué)習(xí)的DAG調(diào)度平臺
系統(tǒng)架構(gòu)
構(gòu)圖:
系統(tǒng)架構(gòu)說明
DAG調(diào)度平臺主要的職責(zé)是:
1.接受機(jī)器學(xué)習(xí)web傳過來的yaml文件(dag定義文件)
2.解析yaml文件,變成結(jié)構(gòu)化數(shù)據(jù)存儲到mysql數(shù)據(jù)庫
3.開始調(diào)度dag定義各個算子任務(wù)
4.算子執(zhí)行引擎根據(jù)算子類型分發(fā)到各個環(huán)境進(jìn)行執(zhí)行
名詞說明
yaml:類型XML的數(shù)據(jù)描述語言,語法更加簡單
算子:機(jī)器學(xué)習(xí)的DAG中各個節(jié)點即為算子,在算子執(zhí)行引擎中稱為算子任務(wù)。算子背后是python實現(xiàn)的一些算法組件
1.機(jī)器學(xué)習(xí)前端交互
機(jī)器學(xué)習(xí)平臺前端主要是將機(jī)器學(xué)習(xí)的流程裝成一個dag,定義各個算子的出入?yún)?#xff0c;以及算子的配置參數(shù),組裝成一個yaml文件,傳給DAG調(diào)圖平臺(Azkaban是zip方式交互,Airflow是通過py文件定義,Oozie通過xml)。
一個完整的DAG定義應(yīng)包含以下算子:
數(shù)據(jù)讀取/數(shù)據(jù)預(yù)處理
特征功能
模型訓(xùn)練
模型預(yù)測
模型評估
模型部署
下圖是個簡化版的DAG定義,除去了模型部署算子
2.DAG調(diào)度平臺各模塊介紹
dag engine(圖引擎):
負(fù)責(zé)解析傳入的yaml文件。根據(jù)yaml的配置生成算子的出入?yún)⒁约斑\行配置信息保存到數(shù)據(jù)庫。同時負(fù)責(zé)任務(wù)的調(diào)用。
opertor engine(算子執(zhí)行引擎):
負(fù)責(zé)算子執(zhí)行,根據(jù)算子類型分發(fā)到不同的執(zhí)行器中。統(tǒng)一的啟停接口,日志查詢接口,任務(wù)狀態(tài)查詢接口
executor(執(zhí)行器):
local executor(本地執(zhí)行器):
執(zhí)行單機(jī)的python任務(wù),執(zhí)行單機(jī)文件方式的機(jī)器學(xué)習(xí)算法。當(dāng)沒有大數(shù)據(jù)平臺的時候,只能通過本地執(zhí)行器執(zhí)行DAG
dc executor(分布式計算平臺執(zhí)行器):
將python算法發(fā)送至大數(shù)據(jù)計算平臺,使用大數(shù)據(jù)平臺資源運行算子。
base executor (執(zhí)行器接口):
以后的執(zhí)行器實現(xiàn)需要實現(xiàn)這個基類,方便拓展。
3.分布式計算平臺交互
針對不同的的計算平臺實現(xiàn)base executor去自定義擴(kuò)充。本系統(tǒng)通過dc executor實現(xiàn),
分布式計算平臺需要將python code通過http接口發(fā)送過去進(jìn)行執(zhí)行。
部署架構(gòu)圖
separation方式
mixture方式
實現(xiàn)細(xì)節(jié)
yaml定義格式
dag:
operator_list: [algo_local_read_file_45_1517360824080,algo_local_split_data_45_1517360836712,algo_local_model_2c_l_45_1517362008544,algo_local_model_predict_45_1517362016532,algo_local_model_2c_eval_45_1517362022452,algo_local_model_gbdt_111_1517801573063]
operator_rels:
algo_local_read_file_45_1517360824080: [{"target":"algo_local_split_data_45_1517360836712","source_index":0,"target_index":0}]
algo_local_split_data_45_1517360836712: [{"target":"algo_local_model_2c_l_45_1517362008544","source_index":0,"target_index":0},{"target":"algo_local_model_gbdt_111_1517801573063","source_index":1,"target_index":0}]
algo_local_model_predict_45_1517362016532: [{"target":"algo_local_model_2c_eval_45_1517362022452","source_index":0,"target_index":0}]
algo_local_model_gbdt_111_1517801573063: [{"target":"algo_local_model_predict_45_1517362016532","source_index":0,"target_index":0}]
algo_local_model_2c_l_45_1517362008544: [{"target":"algo_local_model_predict_45_1517362016532","source_index":0,"target_index":1}]
operator_details:
algo_local_read_file_45_1517360824080:
algo_name: algo_local_read_file
data_type: 本地python
type: 數(shù)據(jù)源
cn_name: 讀文件
coordinate:
x: 137
y: 69
params:
data_id: 40
algo_local_split_data_45_1517360836712:
algo_name: algo_local_split_data
data_type: 本地python
type: 數(shù)據(jù)預(yù)處理
cn_name: 拆分組件
coordinate:
x: 226
y: 164
params:
split_type: 1
ext1: 0.8
ext2: null
algo_local_model_2c_l_45_1517362008544:
algo_name: algo_local_model_2c_l
data_type: 本地python
type: 模型算法
cn_name: 邏輯回歸二分類
coordinate:
x: 130
y: 262
params:
x_cols: [LIMIT_BAL,SEX,EDUCATION,MARRIAGE,AGE,PAY_0,PAY_2,PAY_3,PAY_4,PAY_5,PAY_6,BILL_AMT1,BILL_AMT2,BILL_AMT3,BILL_AMT4,BILL_AMT5,BILL_AMT6,PAY_AMT1,PAY_AMT2,PAY_AMT3,PAY_AMT4,PAY_AMT5,PAY_AMT6]
y_col: next_month
pre_value: 1
penalty: l2
C: 1
max_iter: 100
senior: true
class_weight: null
dual: false
fit_intercept: true
intercept_scaling: 1
multi_class: ovr
n_jobs: 1
random_state: null
solver: liblinear
tol: 0.0001
verbose: 0
warm_start: false
algo_local_model_predict_45_1517362016532:
algo_name: algo_local_model_predict
data_type: 本地python
type: 模型預(yù)測
cn_name: 模型預(yù)測
coordinate:
x: 258
y: 396
params:
x_cols: [LIMIT_BAL,SEX,EDUCATION,MARRIAGE,AGE,PAY_0,PAY_2,PAY_3,PAY_4,PAY_5,PAY_6,BILL_AMT1,BILL_AMT2,BILL_AMT3,BILL_AMT4,BILL_AMT5,BILL_AMT6,PAY_AMT1,PAY_AMT2,PAY_AMT3,PAY_AMT4,PAY_AMT5,PAY_AMT6]
algo_local_model_2c_eval_45_1517362022452:
algo_name: algo_local_model_2c_eval
data_type: 本地python
type: 模型評估
cn_name: 二分類評估
coordinate:
x: 270
y: 503
params:
y_col: next_month
pre_col: predict_result
pre_value: 1
algo_local_model_gbdt_111_1517801573063:
algo_name: algo_local_model_gbdt
data_type: 本地python
type: 模型算法
cn_name: GBDT
coordinate:
x: 432.1111111111111
y: 295.3333333333333
params:
x_cols: [LIMIT_BAL,SEX,EDUCATION,MARRIAGE,AGE,PAY_0,PAY_2,PAY_3,PAY_4,PAY_5,PAY_6,BILL_AMT1,BILL_AMT2,BILL_AMT3,BILL_AMT4,BILL_AMT5,BILL_AMT6,PAY_AMT1,PAY_AMT2,PAY_AMT3,PAY_AMT4,PAY_AMT5,PAY_AMT6]
y_col: next_month
pre_value: 1
n_estimators: 10
max_depth: 5
senior: true
criterion: friedman_mse
init: null
learning_rate: 0.1
loss: deviance
max_features: null
max_leaf_nodes: null
min_impurity_decrease: 0
min_impurity_split: null
min_samples_leaf: 1
min_samples_split: 2
min_weight_fraction_leaf: 0
presort: auto
random_state: null
subsample: 1
verbose: 0
warm_start: false
params:
translate: [41,-20]
scale: 0.9
dag engine實現(xiàn)邏輯
1.當(dāng)前節(jié)點,采用廣度優(yōu)先遍歷獲取所有需要執(zhí)行的算子(節(jié)點)信息。
2.輪詢所有算子(節(jié)點),判斷上算子(節(jié)點)是否全部執(zhí)行完成,執(zhí)行完成開始執(zhí)行當(dāng)前算子(節(jié)點)。
3.發(fā)送請求到operator engine開始執(zhí)行當(dāng)前算子(節(jié)點)任務(wù)。
operator engine實現(xiàn)邏輯
1.主進(jìn)程接受task請求,添加任務(wù)執(zhí)行隊列、任務(wù)監(jiān)聽隊列。
2.任務(wù)執(zhí)行進(jìn)程輪詢接受到的隊列,根據(jù)不同任務(wù)類型調(diào)用不同executor
3.任務(wù)監(jiān)聽進(jìn)程輪詢接受到的隊列,調(diào)用不同executor查詢?nèi)蝿?wù)執(zhí)行狀態(tài),是任務(wù)執(zhí)行的最終狀態(tài)(成功、失敗)回調(diào)dag engine
local executor實現(xiàn)邏輯
1.local executor接受任務(wù),發(fā)送到隊列中。
2.local worker進(jìn)程池(cpu數(shù)*2個進(jìn)程),輪詢獲取隊列中任務(wù),使用importlib的python去執(zhí)行對應(yīng)算子。
總結(jié)
以上是生活随笔為你收集整理的python dag调度系统开发_基于机器学习的DAG调度平台的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python调用matlab函数_从py
- 下一篇: 《数学分析》里的人生