python 任务调度 celery_python任务调度模块celery(二)
關于celery的的基礎介紹及安裝使用參見python任務調度模塊celery。
多worker和多隊列
首先是多worker和多隊列的原理及流程圖。
一般情況下對于多worker和多隊列的配置文件單獨寫在一個配置文件,方便管理和配置。
定義任務列表
multique.py1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16from celery import Celery
app = Celery()
app.config_from_object("celeryconfig")
def (x, y):
return x*y
def taskB(x, y, z):
return x+y+z
def add(x, y):
return x+y
配置文件
celeryconfig.py1
2
3
4
5
6
7
8
9
10
11
12
13
14
15from kombu import Queue, Exchange
BROKER_URL = "redis://118.24.18.158:6380/1"
CELERY_RESULT_BACKEND = "redis://118.24.18.158:6380/2"
CELERY_QUEUES = {
Queue("default", Exchange("default"), routing_key="default"),
Queue("for_task_A", Exchange("for_task_A"), routing_key="for_task_A"),
Queue("for_task_B", Exchange("for_task_B"), routing_key="for_task_B")
}
CELERY_ROUTES = {
"multique.taskA": {"queue": "for_task_A", "routing_key": "for_task_A"},
"multique.taskB": {"queue": "for_task_B", "routing_key": "for_task_B"}
}
啟動celery worker監聽1
2celery -A multique worker -l=info -n workerA.%h -Q for_task_A
celery -A multique worker -l=info -n workerB.%h -Q for_task_B
調用任務
multicelery.py1
2
3
4
5
6
7
8
9
10
11import time
from queue1.multique import *
re1 = taskA.delay(10, 20)
re2 = taskB.delay(100, 200, 300)
re3 = add.delay(1000, 2000)
time.sleep(2)
print(re1.result)
print(re2.result) #輸出結果:600
print(re3.status) #輸出結果:PENDING
print(re3.result) #輸出結果:None
我們看到狀態是PENDING,表示沒有執行,這個是因為沒有celeryconfig.py文件中指定改route到哪一個Queue中,所以會被發動到默認的名字celery的Queue中,但是我們還沒有啟動worker執行celery中的任務。下面,我們來啟動一個worker來執行celery隊列中的任務。1celery -A multique worker -l info -n worker.%h -Q celery
再次調用任務,狀態應該為SUCCESS,結果為3000。
celery定時任務
celery定時任務,Celery Beat進程通過讀取配置文件的內容,周期性的將定時任務發往任務隊列。
以上面多worker的異步任務為例,配置文件celeryconfig.py 中添加CELERYBEAT_SCHEDULE變量,添加內容如下1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18CELERY_TIMEZONE = 'UTC' #指定時區,默認為UTC
CELERYBEAT_SCHEDULE = {
'taskA_schedule': {
'task': 'multique.taskA',
'schedule': 2, #每2s執行一次
'args': (5, 6) #傳遞函數參數
},
'taskB_scheduler': {
'task': "multique.taskB",
"schedule": 10,
"args":(10, 20, 30)
},
'add_schedule': {
"task": "multique.add",
"schedule": 5,
"args": (1, 2)
}
}
參數說明task
指定任務的名字
schedule
設定任務的調度方式(設定任務如何重復執行),可以是一個表示秒的整數,也可以是一個 timedelta 對象,或者是一個 crontab 對象
args
任務的參數列表
kwargs
任務的參數字典
options
所有 apply_async 所支持的參數
啟動celery worker進程在項目根目錄執行命令1celery -A celeryapp worker -l=info #celeryapp為項目文件所在的package名稱
啟動celery beat進程
啟動Celery Beat進程,定時將任務發送到Broker,在項目根目錄執行下面命令1celery beat -A celeryapp
之后在啟動的worker窗口可以看到任務定時執行的情況。
啟動worker和beat進程也可以放在同一個命令中執行1celery -B -A celeryapp worker --loglevel=info
更多celery定時任務相關內容點擊Periodic Tasks查看官方介紹。
總結
以上是生活随笔為你收集整理的python 任务调度 celery_python任务调度模块celery(二)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 不刷新页面的tab_SwiftUI小技巧
- 下一篇: 时间控件_Selenium时间控件的处理