Python Celery和RabbitMQ实战教程
前言
Celery是一個(gè)異步任務(wù)隊(duì)列。它可以用于需要異步運(yùn)行的任何內(nèi)容。RabbitMQ是Celery廣泛使用的消息代理。在本這篇文章中,我將使用RabbitMQ來(lái)介紹Celery的基本概念,然后為一個(gè)小型演示項(xiàng)目設(shè)置Celery 。最后,設(shè)置一個(gè)Celery Web控制臺(tái)來(lái)監(jiān)視我的任務(wù)
基本概念
來(lái)!看圖說(shuō)話:
-
Broker
Broker(RabbitMQ)負(fù)責(zé)創(chuàng)建任務(wù)隊(duì)列,根據(jù)一些路由規(guī)則將任務(wù)分派到任務(wù)隊(duì)列,然后將任務(wù)從任務(wù)隊(duì)列交付給worker -
Consumer (Celery Workers)
Consumer是執(zhí)行任務(wù)的一個(gè)或多個(gè)Celery workers。可以根據(jù)用例啟動(dòng)許多workers -
Result Backend
后端用于存儲(chǔ)任務(wù)的結(jié)果。但是,它不是必需的元素,如果不在設(shè)置中包含它,就無(wú)法訪問(wèn)任務(wù)的結(jié)果
安裝Celery
首先,需要安裝好Celery,可以使用PyPI:
pip install celery選擇一個(gè)Broker:RabbitMQ
為什么我們需要broker呢這是因?yàn)镃elery本身并不構(gòu)造消息隊(duì)列,所以它需要一個(gè)額外的消息傳輸來(lái)完成這項(xiàng)工作。這里可以將Celery看作消息代理的包裝器
實(shí)際上,也可以從幾個(gè)不同的代理中進(jìn)行選擇,比如RabbitMQ、Redis或數(shù)據(jù)庫(kù)(例如Django數(shù)據(jù)庫(kù))
在這里使用RabbitMQ作為代理,因?yàn)樗δ芡暾⒎€(wěn)定,Celery推薦使用它。由于演示我的環(huán)境是在Mac OS中,安裝RabbitMQ使用Homebrew即可:
brew install rabbitmq #如果是Ubuntu的話使用apt-get安裝啟動(dòng)RabbitMQ
程序?qū)⒃?*/usr/local/sbin中安裝RabbitMQ,雖然有些系統(tǒng)可能會(huì)有所不同。可以將此路徑添加到環(huán)境變量路徑,以便以后方便地使用。例如,打開shell啟動(dòng)文件~/.bash_profile**添加:
PATH=$PATH:/usr/local/sbin現(xiàn)在,可以使用rabbitmq-server命令啟動(dòng)我們的RabbitMQ服務(wù)器。檢查RabbitMQ服務(wù)器成功啟動(dòng),將看到類似的輸出:
為Celery配置RabbitMQ
RabbitMQ使用Celery之前,需要對(duì)RabbitMQ進(jìn)行一些配置。簡(jiǎn)單地說(shuō),我們需要?jiǎng)?chuàng)建一個(gè)虛擬主機(jī)和用戶,然后設(shè)置用戶權(quán)限,以便它可以訪問(wèn)虛擬主機(jī)
# 添加用戶跟密碼 $ rabbitmqctl add_user test test123 # 添加虛擬主機(jī) $ rabbitmqctl add_vhost test_vhost # 為用戶添加標(biāo)簽 $ rabbitmqctl set_user_tags test test_tag # 設(shè)置用戶權(quán)限 $ rabbitmqctl set_permissions -p test_vhost test ".*" ".*" ".*"**敲黑板!**RabbitMQ中有三種操作:配置、寫入和讀取
上面命令末尾的字符串表示用戶test將擁有所有配置、寫入和讀取權(quán)限
演示項(xiàng)目
現(xiàn)在讓我們創(chuàng)建一個(gè)簡(jiǎn)單的項(xiàng)目來(lái)演示Celery的使用
在celery.py中添加以下代碼:
在這里,初始化了一個(gè)名為app的Celery實(shí)例,將用于創(chuàng)建一個(gè)任務(wù)。Celery的第一個(gè)參數(shù)只是項(xiàng)目包的名稱,即“test_celery”。
broker參數(shù)指定代理URL,對(duì)于RabbitMQ,傳輸是amqp。
后端參數(shù)指定后端URL。Celery中的后端用于存儲(chǔ)任務(wù)結(jié)果。因此,如果需要在任務(wù)完成時(shí)訪問(wèn)任務(wù)的結(jié)果,應(yīng)該為Celery設(shè)置一個(gè)后端。
rpc意味著將結(jié)果作為AMQP消息發(fā)送回去,這對(duì)本次演示來(lái)說(shuō)是一種可接受的格式
include參數(shù)指定了在Celery工作程序啟動(dòng)時(shí)要導(dǎo)入的模塊列表。我們?cè)谶@里添加了tasks模塊,以便找到我們的任務(wù)。
在tasks.py這個(gè)文件中,定義了我們的任務(wù)add_longtime:
from __future__ import absolute_import from test_celery.celery import app import time@app.task def add_longtime(a, b):print 'long time task begins'# sleep 5 secondstime.sleep(5)print 'long time task finished'return a + b可以看到,導(dǎo)入了在前面的Celery模塊中定義的應(yīng)用程序,并將其用作任務(wù)方法的裝飾器。另外注意!app.task只是一個(gè)裝飾器。此外,我們?cè)赼dd_longtime任務(wù)中休眠5秒,以模擬一個(gè)耗時(shí)較長(zhǎng)的Task
在設(shè)置好Celery之后,我們需要開始運(yùn)行任務(wù),它包含在runs_tasks.py:
from .tasks import add_longtime import timeif __name__ == '__main__':result = add_longtime.delay(1,2)#此時(shí),任務(wù)還未完成,它將返回Falseprint 'Task finished? ', result.ready()print 'Task result: ', result.result# 延長(zhǎng)到10秒以確保任務(wù)已經(jīng)完成time.sleep(10)# 現(xiàn)在任務(wù)完成,ready方法將返回Trueprint 'Task finished? ', result.ready()print 'Task result: ', result.result這里,我們使用delay方法調(diào)用任務(wù)add_longtime,如果我們想異步處理任務(wù),就需要使用delay方法。此外,保存任務(wù)的結(jié)果并打印一些信息。如果任務(wù)已經(jīng)完成,ready方法將返回True,否則返回False。result屬性是任務(wù)的結(jié)果,如果任務(wù)尚未完成,則返回None。
啟動(dòng)Celery
現(xiàn)在,可以使用下面的命令啟動(dòng)Celery(注:在項(xiàng)目文件夾中運(yùn)行):
celery -A test_celery worker --loglevel=infoCelery成功連接到RabbitMQ,你會(huì)看到這樣的東西:
運(yùn)行任務(wù)
再項(xiàng)目文件中輸入以下命令運(yùn)行它:
python -m test_celery.run_tasks查看Celery控制臺(tái),看到運(yùn)行任務(wù):
[2020-05-15 17:15:21,508: INFO/MainProcess] Received task: test_celery.tasks.add_longtime[25ba9c87-69a7-4383-b983-1cefdb32f8b3] [2020-05-15 17:15:21,508: WARNING/Worker-3] long time task begins [2020-05-15 17:15:31,510: WARNING/Worker-3] long time task finished [2020-05-15 17:15:31,512: INFO/MainProcess] Task test_celery.tasks.add_longtime[25ba9c87-69a7-4383-b983-1cefdb32f8b3] succeeded in 15.003732774s: 3當(dāng)Celery收到一個(gè)任務(wù),它打印出任務(wù)名稱與任務(wù)id(在括號(hào)中):
Received task: test_celery.tasks.add_longtime[7d942984-8ea6-4e4d-8097-225616f797d5]在這一行下面是我們的任務(wù)add_longtime打印的兩行,時(shí)間延遲為5秒:
long time task begins long time task finished最后一行顯示我們的任務(wù)在5秒內(nèi)完成,任務(wù)結(jié)果為3:
Task test_celery.tasks.add_longtime[7d942984-8ea6-4e4d-8097-225616f797d5] succeeded in 5.025242167s: 3在當(dāng)前控制臺(tái)中,您將看到以下輸出:
實(shí)時(shí)監(jiān)控Celery
Flower是一款基于網(wǎng)絡(luò)的Celery實(shí)時(shí)監(jiān)控軟件。使用Flower,可以輕松地監(jiān)視任務(wù)進(jìn)度和歷史記錄
使用pip來(lái)安裝Flower:
pip install flower要啟動(dòng)Flower web控制臺(tái),需要運(yùn)行以下命令:
celery -A test_celery flowerFlower將運(yùn)行具有默認(rèn)端口5555的服務(wù)器,可以通過(guò)http://localhost:5555訪問(wèn)web控制臺(tái)
好了,到這里又到了跟大家說(shuō)再見的時(shí)候了。我只是一個(gè)會(huì)寫爬蟲的段子手而已,一個(gè)希望有朝一日能夠?qū)崿F(xiàn)財(cái)富自由,能夠早日榮歸故里的游子罷了。希望我的文章能帶給您知識(shí),帶給您幫助,帶給您歡笑!同時(shí)也謝謝您能抽出寶貴的時(shí)間閱讀,創(chuàng)作不易,如果您喜歡的話,點(diǎn)個(gè)贊再走吧。您的支持是我創(chuàng)作的動(dòng)力,希望今后能帶給大家更多優(yōu)質(zhì)的文章
總結(jié)
以上是生活随笔為你收集整理的Python Celery和RabbitMQ实战教程的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 火狐浏览器中如何设置自动翻译网页
- 下一篇: oracle的left join和inn