Celery-分布式任务队列
一、介紹
官方文檔:http://docs.celeryproject.org/en/latest/index.html
pip3 install celeryCelery是一個專注于實時處理和任務調度的分布式任務隊列,通過它可以輕松的實現任務的異步處理。
使用Celery的常見場景:
- Web應用。當用戶觸發的一個操作需要較長時間才能執行完成時,可以把它作為任務交給Celery去異步執行,執行完再返回給用戶。這段時間用戶不需要等待,提高了網站的整體吞吐量和響應時間。
- 定時任務。生產環境經常會跑一些定時任務。假如你有上千臺的服務器、上千種任務,定時任務的管理很困難,Celery可以幫助我們快速在不同的機器設定不同種任務。
- 同步完成的附加工作都可以異步完成。比如發送短信/郵件、推送消息、清理/設置緩存等。
Celery包含如下組件:
- Celery Beat:任務調度器,Beat進程會讀取配置文件的內容,周期性地將配置中到期需要執行的任務發送給任務隊列。
- Celery Worker:執行任務的消費者,通常會在多臺服務器運行多個消費者來提高執行效率。
- Broker:消息代理,或者叫作消息中間件,接受任務生產者發送過來的任務消息,存進隊列再按序分發給任務消費方(通常是消息隊列或者數據庫)。
- Producer:調用了Celery提供的API、函數或者裝飾器而產生任務并交給任務隊列處理的都是任務生產者。
- Result Backend:任務處理完后保存狀態信息和結果,以供查詢。
二、簡單示例
創建一個tasks.py:
from celery import Celeryapp = Celery("tasks", broker="amqp://pd:123456@localhost:5672//",backend="redis://:123456@localhost:6379/0")@app.task def add(x, y):return x+y啟動Celery Worker來開始監聽并執行任務:
celery -A tasks worker -l info更多有關命令:
celery worker --help再打開一個終端, 進行命令行模式,調用任務:
>>> from tasks import add >>> relt = add.delay(10, 10) >>> relt.ready() # 檢查任務是否已經完成 True >>> relt.get() # 獲取任務結果,可設置timeout超時 20 >>> relt <AsyncResult: 470d5f45-42eb-4b0c-bd38-06b85fa5599b> >>> relt.id '470d5f45-42eb-4b0c-bd38-06b85fa5599b' >>> relt.result 20 >>> relt.status 'SUCCESS' from celery import Celery from celery.result import AsyncResultapp = Celery("tasks", broker="amqp://pd:123456@localhost:5672/pdvhost",backend="redis://:123456@localhost:6379/0")result = AsyncResult(id="470d5f45-42eb-4b0c-bd38-06b85fa5599b", app=app) print(result.get()) # 20 View Code三、配置
官方文檔,配置:http://docs.celeryproject.org/en/latest/userguide/configuration.html#configuration
像上面簡單示例中,要想添加配置,則可以直接在應用程序設置配置:
app.conf.task_serializer = "json"如果您一次配置多個設置,則:
app.conf.update(task_serializer="json",accept_content=["json"],result_serializer="json",timezone="Europe/Oslo",enable_utc=True, )對于大型項目,建議使用專用配置模塊。因為項目復雜,最好做到程序的解耦,所以將配置保存在集中位置是一個非常好的選擇,一般默認 celeryconfig.py 模塊是用來保存配置的,你也可以使用自己定義的名字,然后通過調用 app.config_from_object() 方法告訴 Celery 實例使用配置模塊:
app.config_from_object("celeryconfig") # 或者 from . import celeryconfig app.config_from_object(celeryconfig)四、在項目中使用Celery
項目布局:
方案選擇:
五、在后臺運行worker
在生產中,我們需要在后臺運行worker,官方文檔daemonization教程中有詳細描述。
守護程序腳本使用celery multi命令在后臺啟動一個或多個worker:
# 啟動worker后臺運行 celery multi start w1 -A proj.celeryapp -l info celery multi start w2 -A proj.celeryapp -l info PS:如果使用的是默認的celery.py,那么直接proj即可# 重啟 celery multi restart w1 -A proj -l info# 停止 celery multi stop w1 -A proj -l info# 確保退出之前完成所有當前正在執行的任務 celery multi stopwait w1 -A proj -l info默認情況下,它會在當前目錄下創建的pid和日志文件,為了防止多個worker在彼此之上啟動,最好將這些文件放在專用目錄中:
mkdir /var/run/celery mkdir /var/log/celery celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid --logfile=/var/log/celery/%n%I.log六、指定隊列傳送任務
官方文檔:https://celery.readthedocs.io/en/latest/userguide/routing.html#guide-routing
在 celeryconfig.py 中加入以下配置:
# 路由鍵以 task. 開頭的消息都進default隊列 # 路由鍵以 web. 開頭的消息都進web_tasks隊列 task_queues = (Queue("default", routing_key="task.#"),Queue("web_tasks", routing_key="web.#"), ) # 默認的交換機名字為tasks task_default_exchange = "tasks" # 設置默認交換類型為topic task_default_exchange_type = "topic" # 默認的路由鍵是 task.default task_default_routing_key = "task.default" # 要將任務路由到web_tasks隊列,可以在task_routes設置中添加條目 task_routes = {# tasks.add的消息會進入web_tasks隊列"proj.tasks.add": {"queue": "web_tasks","routing_key": "web.add",}, }其他代碼與上面 四 中的相同。
啟動worker,指定該worker工作于哪個隊列:
# 該worker只會執行web_tasks隊列中的任務 celery -A proj.celeryapp worker -Q web_tasks -l info七、定時任務
官方文檔:https://celery.readthedocs.io/en/latest/userguide/periodic-tasks.html
Celery支持定時任務,設定好任務的執行時間,Celery就會定時自動幫你執行, 這個定時任務模塊叫 celery beat。
函數版tasks.py:
from celery import Celery from celery.schedules import crontabapp = Celery("tasks", broker="amqp://pd:123456@localhost:5672//", backend="redis://:123456@localhost:6379/0") app.conf.timezone = "Asia/Shanghai"@app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs):# 每5秒執行一次 test("Hello")sender.add_periodic_task(5.0, test.s("Hello"), name="every-5s")# 每10秒執行一次 test("World")sender.add_periodic_task(10.0, test.s("World"), name="every-10s", expires=5)# 每周一早上 7:30 執行一次 test("Happy Mondays!") sender.add_periodic_task(crontab(hour=7, minute=30, day_of_week=1),test.s("Happy Mondays!"),)@app.task def test(arg):print(arg) View Code celery -A tasks worker -l info celery -A tasks beat -l info配置版:
########## celeryapp.py ########## from celery import Celery from . import celeryconfigapp = Celery("proj.celeryapp", include=["proj.tasks"]) app.config_from_object(celeryconfig)if __name__ == "__main__":app.start()########## celeryconfig.py ########## broker_url = "amqp://pd:123456@114.116.50.214:5672//" result_backend = "redis://:123456@114.116.50.214:6379/0" task_serializer = "msgpack" result_serializer = "json" result_expires = 60*60*24 accept_content = ["json", "msgpack"] timezone = "Asia/Shanghai"from celery.schedules import crontab beat_schedule = {"every-10s": {"task": "proj.tasks.add","schedule": 10.0,"args": (10, 10)},"every-monday-morning-7:30": {"task": "proj.tasks.mul","schedule": crontab(hour=7, minute=30, day_of_week=1),"args": (10, 10)} }########## tasks.py ########## from .celeryapp import app@app.task def add(x, y):return x+y@app.task def mul(x, y):return x*y View Code celery -A proj.celeryapp worker -l info celery -A proj.celeryapp beat -l info八、在Django中使用celery
發布任務
https://celery.readthedocs.io/en/latest/django/first-steps-with-django.html#extensions
項目布局:
import os from celery import Celeryos.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings") app = Celery("mysite") app.config_from_object("django.conf:settings", namespace="CELERY") app.autodiscover_tasks()@app.task(bind=True) def debug_task(self):print("Request: {0!r}".format(self.request)) celeryapp.py from .celeryapp import app as celery_app __all__ = ["celery_app"] __init__.pysettings.py,更多設置參考:https://celery.readthedocs.io/en/latest/userguide/configuration.html
#for celery CELERY_BROKER_URL = "amqp://pd:123456@114.116.50.214:5672//" CELERY_RESULT_BACKEND = "redis://:123456@114.116.50.214:6379/0"在app里的tasks.py里編寫任務:
from celery import shared_task@shared_task def add(x, y):return x+y@shared_task def mul(x, y):return x*y在views里調用celery task:
from django.shortcuts import HttpResponse from app01 import tasksdef test(request):result = tasks.add.delay(100, 100)return HttpResponse(result.get())定時任務
https://celery.readthedocs.io/en/latest/userguide/periodic-tasks.html#using-custom-scheduler-classes
1、安裝?django-celery-beat
pip3 install django-celery-beat2、在settings.py中設置
INSTALLED_APPS = [...,'django_celery_beat', ]3、進行數據庫遷移,以便創建定時任務所需的表
python3 manage.py migrate4、開始監測定時任務
celery -A mysite.celeryapp beat -l info -S django5、在django-admin界面設置定時任務
?
轉載于:https://www.cnblogs.com/believepd/p/10643392.html
總結
以上是生活随笔為你收集整理的Celery-分布式任务队列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [20190401]跟踪dbms_loc
- 下一篇: springboot启动出错,