Airflow安装与使用
生活随笔
收集整理的這篇文章主要介紹了
Airflow安装与使用
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
# Airflow 1.10+安裝
本次安裝Airflow版本為1.10+,其需要依賴Python和DB,本次選擇的DB為Mysql。
本次安裝組件及版本如下:Airflow == 1.10.0
Python == 3.6.5
Mysql == 5.7
# 整體流程
1. 建表
2. 安裝
3. 配置
4. 運行
5. 配置任務(wù)
```
啟動schedule
airflow scheduler -D
啟動webserver
airflow webserver -D
ps -ef|grep -Ei '(airflow-webserver)'| grep master | awk '{print $2}'|xargs -i kill {}
ps -ef | grep -Ei 'airflow' | grep -v 'grep' | awk '{print $2}' | xargs -i kill {}
## 建庫、建用戶
```
庫名為airflow
'create database airflow;'
建用戶
用戶名為airflow,并且設(shè)置所有ip均可以訪問。
create user 'airflow'@'%' identified by 'airflow';
create user 'airflow'@'localhost' identified by 'airflow';
用戶授權(quán)
這里為新建的airflow用戶授予airflow庫的所有權(quán)限
grant all on airflow.* to 'airflow'@'%';
flush privileges
```
## Airflow安裝
```
這里通過 virtualenv 進行安裝。
----- 通過virtualenv安裝
$ mkdir /usr/local/virtual_env && cd /usr/local/virtual_env # 創(chuàng)建目錄
$ virtualenv --no-site-packages airflow --python=python # 創(chuàng)建虛擬環(huán)境
$ source /usr/local/virtual_env/airflow/bin/activate # 激活虛擬環(huán)境
----- 安裝指定版本或者默認
$ pip install apache-airflow -i https://pypi.douban.com/simple
在安裝完一堆的依賴后,就需要配置 AIRFLOW_HOME 環(huán)境變量,后續(xù)的 DAG 和 Plugin 都將以該目錄作為根目錄查找,如上,可以直接設(shè)置為 /tmp/project 。
報錯
ERROR: flask 1.1.1 has requirement Jinja2>=2.10.1, but you'll have jinja2 2.10 which is incompatible.
ERROR: flask 1.1.1 has requirement Werkzeug>=0.15, but you'll have werkzeug 0.14.1 which is incompatible.
執(zhí)行:pip3 install -U Flask==1.0.4
執(zhí)行:pip3 install -U pika==0.13.1
重新執(zhí)行 :pip install apache-airflow -i https://pypi.douban.com/simple
----- 設(shè)置環(huán)境變量
(airflow) $ export AIRFLOW_HOME=/tmp/airflow
----- 查看其版本信息
(airflow) $ airflow version
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
v1.8.0
執(zhí)行了上述的命令后,會生成 airflow.cfg 和 unittests.cfg 兩個文件,其中前者是一個配置文件 。
## airflow 配置
----- 修改Airflow DB配置
### 1. 安裝Mysql模塊
pip install "apache-airflow[mysql]"
這里可以簡單說下,airflow依賴的其他組件均可以此方式安裝。在之后安裝password組件同樣是通過此方式。
修改Airflow DB配置
修改${AIRFLOW_HOME}/airflow.cfg
sql_alchemy_conn = mysql+mysqldb://airflow:airflow@localhost:3306/airflow
參數(shù)的格式為mysql://帳號:密碼@ip:port/db
初始化db
新建airflow依賴的表。
airflow initdb
如報錯 Can't connect to local MySQL server through socket '/var/lib/mysql/mysql.sock' (2)
需改sql_alchemy_conn = mysql+mysqldb://airflow:airflow@127.0.0.1:3306/airflow
```
### 2. 用戶認證
```
本文采用的用戶認證方式為password方式,其他方式如LDAP同樣支持但是本文不會介紹。筆者在安裝時實驗過LDAP方式但是未成功過。
安裝passsword組件
pip install "apache-airflow[password]"
2. 修改 airflow.cfg
[webserver]
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth
3. 在python環(huán)境中執(zhí)行如下代碼以添加賬戶:
import airflow
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser
user = PasswordUser(models.User())
user.username = 'admin' # 用戶名
user.email = 'emailExample@163.com' # 用戶郵箱
user.password = 'password' # 用戶密碼
session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()
```
### 3. 配置郵件服務(wù)
此配置設(shè)置的是dag的task失敗或者重試時發(fā)送郵件的發(fā)送者。配置如下:
```
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
smtp_host = smtp.163.com
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = mailExample@163.com
smtp_password = password
smtp_port = 25
smtp_mail_from = mailExample@163.com
接下來簡單把dag的Python代碼列出來,以供參考:
default_args = {
'owner': 'ownerExample',
'start_date': datetime(2018, 9, 18),
'email': ['mailReceiver@163.com'], # 出問題時,發(fā)送報警Email的地址,可以填多個,用逗號隔開。
'email_on_failure': ['mailReceiver@163.com'], # 任務(wù)失敗且重試次數(shù)用完時發(fā)送Email。
'email_on_retry': True, # 任務(wù)重試時是否發(fā)送Email
'depends_on_past': False, # 是否依賴于過去。如果為True,那么必須要昨天的DAG執(zhí)行成功了,今天的DAG才能執(zhí)行。
'retries': 3,
'retry_delay': timedelta(minutes=3),
}
```
### 4、配置Executor
```
設(shè)置Executor
修改:airflow.cfg
executor = LocalExecutor
本文中由于只有單節(jié)點所以使用的是LocalExecutor模式。
```
### 5. 修改log地址
```
[core]
base_log_folder = /servers/logs/airflow
[scheduler]
child_process_log_directory = servers/logs/airflow/scheduler
```
### 6. 修改webserver地址
```
修改webserver地址
[webserver]
base_url = http://host:port
可以通過上面配置的地址訪問webserver。
```
### 7. 可選配置
```
(可選)修改Scheduler線程數(shù)
如果調(diào)度任務(wù)不多的話可以把線程數(shù)調(diào)小,默認為32。參數(shù)為:parallelism
(可選)不加載example dag
如果不想加載示例dag可以把load_examples配置改為False,默認為True。這個配置只有在第一次啟動airflow之前設(shè)置才有效。
如果此方法不生效,可以刪除${PYTHON_HOME}/site-packages/airflow/example_dags目錄,也是同樣的效果。
(可選)修改檢測新dag間隔
修改min_file_process_interval參數(shù)為10,每10s識別一次新的dag。默認為0,沒有時間間隔。
```
## 運行airflow
```
啟動schedule
airflow scheduler
啟動webserver
airflow webserver
```
## 安裝問題匯總
```
1. Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql
修改Mysql配置文件my.cnf,具體步驟如下:
查找my.cnf文件位置
mysql --help | grep my.cnf
下圖紅框處為my.cnf文件所在位置:
修改文件
explicit_defaults_for_timestamp=true
注意:必須寫在【mysqld】下
重啟Mysql
sudo systemctl restart mysqld.service
查看修改是否生效。執(zhí)行如下SQL,如果值為1則為生效。
2. pip install "apache-airflow[mysql]"報錯:
mysql_config not found
安裝mysql-devel:
首先查看是否有mysql_config文件。
find / -name mysql_config
如果沒有安裝mysql-devel
yum install mysql-devel
安裝之后再次查找,結(jié)果如圖:
3. 其他問題找我
```
## 配置任務(wù)
在 AirFlow 中,每個節(jié)點都是一個任務(wù),可以是一條命令行 (BashOperator),可以是一段 Python 腳本 (PythonOperator) 等等,然后這些節(jié)點根據(jù)依賴關(guān)系構(gòu)成了一條流程,一個圖,稱為一個 DAG 。
默認會到 ${AIRFLOW_HOME}/dags 目錄下查找,可以直接在該目錄下創(chuàng)建相應(yīng)的文件。
如下是一個簡單的示例。
```
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta, datetime
import pytz
# -------------------------------------------------------------------------------
# these args will get passed on to each operator
# you can override them on a per-task basis during operator initialization
default_args = {
'owner': 'qxy',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
tz = pytz.timezone('Asia/Shanghai')
# naive = datetime.strptime("2018-06-13 17:40:00", "%Y-%m-%d %H:%M:%S")
# local_dt = tz.localize(naive, is_dst=None)
# utc_dt = local_dt.astimezone(pytz.utc).replace(tzinfo=None)
dt = datetime(2019, 7, 16, 16, 30, tzinfo=tz)
utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None)
dag = DAG(
'airflow_interval_test',
default_args=default_args,
description='airflow_interval_test',
schedule_interval='35 17 * * *',
start_date=utc_dt
)
t1 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
dag=dag)
t2 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t1 >> t2
```
該文件創(chuàng)建一個簡單的 DAG,只有三個運算符,兩個 BaseOperator ,也就是執(zhí)行 Bash 命令分別打印日期以及休眠 5 秒;另一個為 PythonOperator 在執(zhí)行任務(wù)時調(diào)用 print_hello() 函數(shù)。
文件創(chuàng)建好后,放置到 ${AIRFLOW_HOME}/dags,airflow 自動讀取該DAG。
----- 測試是否正常,如果無報錯那么就說明正常
$ python /tmp/project/dags/hello_world.py
本次安裝Airflow版本為1.10+,其需要依賴Python和DB,本次選擇的DB為Mysql。
本次安裝組件及版本如下:Airflow == 1.10.0
Python == 3.6.5
Mysql == 5.7
# 整體流程
1. 建表
2. 安裝
3. 配置
4. 運行
5. 配置任務(wù)
```
啟動schedule
airflow scheduler -D
啟動webserver
airflow webserver -D
ps -ef|grep -Ei '(airflow-webserver)'| grep master | awk '{print $2}'|xargs -i kill {}
ps -ef | grep -Ei 'airflow' | grep -v 'grep' | awk '{print $2}' | xargs -i kill {}
## 建庫、建用戶
```
庫名為airflow
'create database airflow;'
建用戶
用戶名為airflow,并且設(shè)置所有ip均可以訪問。
create user 'airflow'@'%' identified by 'airflow';
create user 'airflow'@'localhost' identified by 'airflow';
用戶授權(quán)
這里為新建的airflow用戶授予airflow庫的所有權(quán)限
grant all on airflow.* to 'airflow'@'%';
flush privileges
```
## Airflow安裝
```
這里通過 virtualenv 進行安裝。
----- 通過virtualenv安裝
$ mkdir /usr/local/virtual_env && cd /usr/local/virtual_env # 創(chuàng)建目錄
$ virtualenv --no-site-packages airflow --python=python # 創(chuàng)建虛擬環(huán)境
$ source /usr/local/virtual_env/airflow/bin/activate # 激活虛擬環(huán)境
----- 安裝指定版本或者默認
$ pip install apache-airflow -i https://pypi.douban.com/simple
在安裝完一堆的依賴后,就需要配置 AIRFLOW_HOME 環(huán)境變量,后續(xù)的 DAG 和 Plugin 都將以該目錄作為根目錄查找,如上,可以直接設(shè)置為 /tmp/project 。
報錯
ERROR: flask 1.1.1 has requirement Jinja2>=2.10.1, but you'll have jinja2 2.10 which is incompatible.
ERROR: flask 1.1.1 has requirement Werkzeug>=0.15, but you'll have werkzeug 0.14.1 which is incompatible.
執(zhí)行:pip3 install -U Flask==1.0.4
執(zhí)行:pip3 install -U pika==0.13.1
重新執(zhí)行 :pip install apache-airflow -i https://pypi.douban.com/simple
----- 設(shè)置環(huán)境變量
(airflow) $ export AIRFLOW_HOME=/tmp/airflow
----- 查看其版本信息
(airflow) $ airflow version
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
v1.8.0
執(zhí)行了上述的命令后,會生成 airflow.cfg 和 unittests.cfg 兩個文件,其中前者是一個配置文件 。
## airflow 配置
----- 修改Airflow DB配置
### 1. 安裝Mysql模塊
pip install "apache-airflow[mysql]"
這里可以簡單說下,airflow依賴的其他組件均可以此方式安裝。在之后安裝password組件同樣是通過此方式。
修改Airflow DB配置
修改${AIRFLOW_HOME}/airflow.cfg
sql_alchemy_conn = mysql+mysqldb://airflow:airflow@localhost:3306/airflow
參數(shù)的格式為mysql://帳號:密碼@ip:port/db
初始化db
新建airflow依賴的表。
airflow initdb
如報錯 Can't connect to local MySQL server through socket '/var/lib/mysql/mysql.sock' (2)
需改sql_alchemy_conn = mysql+mysqldb://airflow:airflow@127.0.0.1:3306/airflow
```
### 2. 用戶認證
```
本文采用的用戶認證方式為password方式,其他方式如LDAP同樣支持但是本文不會介紹。筆者在安裝時實驗過LDAP方式但是未成功過。
安裝passsword組件
pip install "apache-airflow[password]"
2. 修改 airflow.cfg
[webserver]
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth
3. 在python環(huán)境中執(zhí)行如下代碼以添加賬戶:
import airflow
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser
user = PasswordUser(models.User())
user.username = 'admin' # 用戶名
user.email = 'emailExample@163.com' # 用戶郵箱
user.password = 'password' # 用戶密碼
session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()
```
### 3. 配置郵件服務(wù)
此配置設(shè)置的是dag的task失敗或者重試時發(fā)送郵件的發(fā)送者。配置如下:
```
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
smtp_host = smtp.163.com
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = mailExample@163.com
smtp_password = password
smtp_port = 25
smtp_mail_from = mailExample@163.com
接下來簡單把dag的Python代碼列出來,以供參考:
default_args = {
'owner': 'ownerExample',
'start_date': datetime(2018, 9, 18),
'email': ['mailReceiver@163.com'], # 出問題時,發(fā)送報警Email的地址,可以填多個,用逗號隔開。
'email_on_failure': ['mailReceiver@163.com'], # 任務(wù)失敗且重試次數(shù)用完時發(fā)送Email。
'email_on_retry': True, # 任務(wù)重試時是否發(fā)送Email
'depends_on_past': False, # 是否依賴于過去。如果為True,那么必須要昨天的DAG執(zhí)行成功了,今天的DAG才能執(zhí)行。
'retries': 3,
'retry_delay': timedelta(minutes=3),
}
```
### 4、配置Executor
```
設(shè)置Executor
修改:airflow.cfg
executor = LocalExecutor
本文中由于只有單節(jié)點所以使用的是LocalExecutor模式。
```
### 5. 修改log地址
```
[core]
base_log_folder = /servers/logs/airflow
[scheduler]
child_process_log_directory = servers/logs/airflow/scheduler
```
### 6. 修改webserver地址
```
修改webserver地址
[webserver]
base_url = http://host:port
可以通過上面配置的地址訪問webserver。
```
### 7. 可選配置
```
(可選)修改Scheduler線程數(shù)
如果調(diào)度任務(wù)不多的話可以把線程數(shù)調(diào)小,默認為32。參數(shù)為:parallelism
(可選)不加載example dag
如果不想加載示例dag可以把load_examples配置改為False,默認為True。這個配置只有在第一次啟動airflow之前設(shè)置才有效。
如果此方法不生效,可以刪除${PYTHON_HOME}/site-packages/airflow/example_dags目錄,也是同樣的效果。
(可選)修改檢測新dag間隔
修改min_file_process_interval參數(shù)為10,每10s識別一次新的dag。默認為0,沒有時間間隔。
```
## 運行airflow
```
啟動schedule
airflow scheduler
啟動webserver
airflow webserver
```
## 安裝問題匯總
```
1. Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql
修改Mysql配置文件my.cnf,具體步驟如下:
查找my.cnf文件位置
mysql --help | grep my.cnf
下圖紅框處為my.cnf文件所在位置:
修改文件
explicit_defaults_for_timestamp=true
注意:必須寫在【mysqld】下
重啟Mysql
sudo systemctl restart mysqld.service
查看修改是否生效。執(zhí)行如下SQL,如果值為1則為生效。
2. pip install "apache-airflow[mysql]"報錯:
mysql_config not found
安裝mysql-devel:
首先查看是否有mysql_config文件。
find / -name mysql_config
如果沒有安裝mysql-devel
yum install mysql-devel
安裝之后再次查找,結(jié)果如圖:
3. 其他問題找我
```
## 配置任務(wù)
在 AirFlow 中,每個節(jié)點都是一個任務(wù),可以是一條命令行 (BashOperator),可以是一段 Python 腳本 (PythonOperator) 等等,然后這些節(jié)點根據(jù)依賴關(guān)系構(gòu)成了一條流程,一個圖,稱為一個 DAG 。
默認會到 ${AIRFLOW_HOME}/dags 目錄下查找,可以直接在該目錄下創(chuàng)建相應(yīng)的文件。
如下是一個簡單的示例。
```
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta, datetime
import pytz
# -------------------------------------------------------------------------------
# these args will get passed on to each operator
# you can override them on a per-task basis during operator initialization
default_args = {
'owner': 'qxy',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
tz = pytz.timezone('Asia/Shanghai')
# naive = datetime.strptime("2018-06-13 17:40:00", "%Y-%m-%d %H:%M:%S")
# local_dt = tz.localize(naive, is_dst=None)
# utc_dt = local_dt.astimezone(pytz.utc).replace(tzinfo=None)
dt = datetime(2019, 7, 16, 16, 30, tzinfo=tz)
utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None)
dag = DAG(
'airflow_interval_test',
default_args=default_args,
description='airflow_interval_test',
schedule_interval='35 17 * * *',
start_date=utc_dt
)
t1 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
dag=dag)
t2 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t1 >> t2
```
該文件創(chuàng)建一個簡單的 DAG,只有三個運算符,兩個 BaseOperator ,也就是執(zhí)行 Bash 命令分別打印日期以及休眠 5 秒;另一個為 PythonOperator 在執(zhí)行任務(wù)時調(diào)用 print_hello() 函數(shù)。
文件創(chuàng)建好后,放置到 ${AIRFLOW_HOME}/dags,airflow 自動讀取該DAG。
----- 測試是否正常,如果無報錯那么就說明正常
$ python /tmp/project/dags/hello_world.py
轉(zhuǎn)載于:https://www.cnblogs.com/duanhaoxin/p/11211815.html
總結(jié)
以上是生活随笔為你收集整理的Airflow安装与使用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【快报】基于K2 BPM的新一代协同办公
- 下一篇: 妈妈的菜谱-豉油鸡