Celery的实践指南
生活随笔
收集整理的這篇文章主要介紹了
Celery的实践指南
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
Celery的實踐指南 celery原理: celery實際上是實現(xiàn)了一個典型的生產(chǎn)者-消費(fèi)者模型的消息處理/任務(wù)調(diào)度統(tǒng),消費(fèi)者(worker)和生產(chǎn)者(client)都可以有任意個,他們通過消息系統(tǒng)(broker)來通信。 典型的場景為: 客戶端啟動一個進(jìn)程(生產(chǎn)者),當(dāng)用戶的某些操作耗時較長或者比較頻繁時,考慮接入本消息系統(tǒng),發(fā)送一個task任務(wù)給broker。 后臺啟動一個worker進(jìn)程(消費(fèi)者),當(dāng)發(fā)現(xiàn)broker中保存有某個任務(wù)到了該執(zhí)行的時間,他就會拿過來,根據(jù)task類型和參數(shù)執(zhí)行。 實踐中的典型場景: 簡單的定時任務(wù): 替換crontab的celery寫法:
from?celery?import?Celery
from?celery.schedules?import?crontab
app?=?Celery("tasks",?backend="redis://localhost",?broker="redis://localhost")
app.conf.update(CELERYBEAT_SCHEDULE?=?{
? ? "add":?{
? ? ? ? "task":?"celery_demo.add",
? ? ? ? "schedule":?crontab(minute="*"),
? ? ? ? "args":?(16,?16)
? ? },
})
@app.task
def?add(x,?y):
? ? return?x?+?y
運(yùn)行celery的worker,讓他作為consumer運(yùn)行,自動從broker上獲得任務(wù)并執(zhí)行。 `celery?-A?celery_demo?worker` 運(yùn)行celery的client,讓其根據(jù)schedule,自動生產(chǎn)出task msg,并發(fā)布到broker上。 `celery?-A?celery_demo?beat` 安裝并運(yùn)行flower,方便監(jiān)控task的運(yùn)行狀態(tài) `celery?flower?-A?celery_demo` 或者設(shè)置登錄密碼 ` celery?flower?-A celery_demo --basic_auth=user1:password1,user2:password2
多同步任務(wù)-鏈?zhǔn)饺蝿?wù)- 失敗自動重試的task 失敗重試方法: 將task代碼函數(shù)參數(shù)增加self,同時綁定bind。 demo代碼: @app.task(bind=True,?default_retry_delay=300,?max_retries=5)
def?my_task_A(self):
? ? try:
? ? ? ? print("doing?stuff?here...")
? ? except?SomeNetworkException?as?e:
? ? ? ? print("maybe?do?some?clenup?here....")
? ? ? ? self.retry(e)
自動重試后,是否將任務(wù)重新入queue后排隊,還是等待指定的時間?可以通過self.retry()參數(shù)來指定。 派發(fā)到不同Queue隊列的task 一個task自動映射到多個queue中的方法, 通過配置task和queue的routing_key命名模式。 比如:把queue的exchange和routing_key配置成通用模式: 再定義task的routing_key的名稱: 可用的不同exchange策略: direct:直接根據(jù)定義routing_key topic:exchange會根據(jù)通配符來將一個消息推送到多個queue。 fanout:將消息拆分,分別推送到不同queue,通常用于超大任務(wù),耗時任務(wù)。 參考:http://celery.readthedocs.org/en/latest/userguide/routing.html#routers 高級配置 result是否保存 失敗郵件通知: 關(guān)閉rate limit: auto_reload方法(*nix系統(tǒng)): celery通過監(jiān)控源代碼目錄的改動,自動地進(jìn)行reload 使用方法:1.依賴inotify(Linux) 2. kqueue(OS X / BSD) 安裝依賴: $?pip?install?pyinotify
(可選) 指定fsNotify的依賴: $?env?CELERYD_FSNOTIFY=stat?celery?worker?-l?info?--autoreload
啟動: celery -A appname worker --autoreload auto-scale方法: 啟用auto-scale 臨時增加worker進(jìn)程數(shù)量(增加consumer): $?celery?-A?proj?control?add_consumer?foo?-d?worker1.local
臨時減少worker進(jìn)程數(shù)量(減少consumer): 將scheduled task的配置從app.conf變成DB的方法: 需要在啟動時指定custom schedule 類名,比如默認(rèn)的是: celery.beat.PersistentScheduler 。 celery?-A?proj?beat?-S?djcelery.schedulers.DatabaseScheduler
啟動停止worker的方法: 啟動 as daemon :?http://docs.celeryproject.org/en/latest/tutorials/daemonizing.html#daemonizing root用戶可以使用celeryd 非特權(quán)用戶:celery multi start worker1 -A appName ?—autoreload ?--pidfile="$HOME/run/celery/%n.pid"? --logfile="$HOME/log/celery/%n.log" 或者 celery worker —detach 停止 ps?auxww?|?grep?'celery?worker'?|?awk?'{print?$2}'?|?xargs?kill?-9
與Flask集成的方法 集成后flask將充當(dāng)producer來創(chuàng)建并發(fā)送task給broker,在celery啟動的獨(dú)立worker進(jìn)程將從broker中獲得task并執(zhí)行,同時將結(jié)果返回。 flask中異步地獲得task結(jié)果的方法:add.delay(x,y),有時需要對參數(shù)進(jìn)行命名后傳遞 或者 add.apply_async(args=(x,y), countdown=30) flask獲得 與flask集成后的啟動問題 由于celery的默認(rèn)routing_key是根據(jù)生產(chǎn)者在代碼中的import級別來設(shè)定的,所以worker端在啟動時應(yīng)該注意其啟動目錄應(yīng)該在項目頂級目錄上,否者會出現(xiàn)KeyError。 性能提升: eventlet 和 greenlet 官方參考:http://docs.celeryproject.org/en/latest/userguide/index.html
from?celery?import?Celery
from?celery.schedules?import?crontab
app?=?Celery("tasks",?backend="redis://localhost",?broker="redis://localhost")
app.conf.update(CELERYBEAT_SCHEDULE?=?{
? ? "add":?{
? ? ? ? "task":?"celery_demo.add",
? ? ? ? "schedule":?crontab(minute="*"),
? ? ? ? "args":?(16,?16)
? ? },
})
@app.task
def?add(x,?y):
? ? return?x?+?y
def?my_task_A(self):
? ? try:
? ? ? ? print("doing?stuff?here...")
? ? except?SomeNetworkException?as?e:
? ? ? ? print("maybe?do?some?clenup?here....")
? ? ? ? self.retry(e)
轉(zhuǎn)載于:https://www.cnblogs.com/ToDoToTry/p/5453149.html
總結(jié)
以上是生活随笔為你收集整理的Celery的实践指南的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 京东白条可以还信用卡吗?信用卡又能还京东
- 下一篇: 邮储银行信用卡附属卡怎么办理?这些事项要