Python Celery: multiple workers, multiple queues, and scheduled tasks
Multiple workers and multiple queues
Celery is a distributed task scheduling module. How does it actually provide this distributed behavior? Celery can have several different machines execute different tasks, or the same task.
To use Celery in a distributed way you need to understand its message-routing mechanism, which follows the AMQP protocol.
A simple way to picture it:
There can be several message queues (Message Queue), and different messages can be directed to different queues.
This is done through an Exchange: when a message is published, a routing_key can be specified, and the Exchange uses that routing_key to route the message to the appropriate queue.
In this setup each exchange is bound to one queue, so the routing mechanism maps an exchange to a queue, and each queue is consumed by its own worker.
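Besides the static routing configured later in celeryconfig.py, a caller can also pick the queue per call with apply_async. The snippet below is a minimal sketch (not from the original article) that assumes the taskA task and the for_task_A queue defined in the example that follows:

# Sketch: route a single call explicitly instead of relying on CELERY_ROUTES.
# Assumes tasks.py and celeryconfig.py from the example below.
from tasks import taskA

# A worker started with "-Q for_task_A" will pick this message up.
result = taskA.apply_async(args=(100, 200),
                           queue="for_task_A",
                           routing_key="for_task_A")
print(result.get(timeout=10))  # -> 300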
Let's look at an example:
vi tasks.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from celery import Celery

app = Celery()
app.config_from_object("celeryconfig")  # load settings from celeryconfig.py

@app.task
def taskA(x, y):
    return x + y

@app.task
def taskB(x, y, z):
    return x + y + z

@app.task
def add(x, y):
    return x + y

Next, write the configuration. The configuration is usually kept in a separate file.
vi celeryconfig.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from kombu import Exchange, Queue

BROKER_URL = "redis://47.106.106.220:5000/1"
CELERY_RESULT_BACKEND = "redis://47.106.106.220:5000/2"

CELERY_QUEUES = (
    Queue("default", Exchange("default"), routing_key="default"),
    Queue("for_task_A", Exchange("for_task_A"), routing_key="for_task_A"),
    Queue("for_task_B", Exchange("for_task_B"), routing_key="for_task_B"),
)

# Routing: send taskA and taskB to their own queues
CELERY_ROUTES = {
    'tasks.taskA': {"queue": "for_task_A", "routing_key": "for_task_A"},
    'tasks.taskB': {"queue": "for_task_B", "routing_key": "for_task_B"},
}

Write a test script on the remote client.
vi test.py

from tasks import *

re1 = taskA.delay(100, 200)
print(re1.result)
re2 = taskB.delay(1, 2, 3)
print(re2.result)
re3 = add.delay(1, 2)
print(re3.status)

Start two workers, one handling taskA and one handling taskB, by running the following commands in two separate terminal windows.
celery -A tasks worker -l info -n workerA.%h -Q for_task_A
celery -A tasks worker -l info -n workerB.%h -Q for_task_B

Running the test script on the remote client then prints:
python test.py
300
6
PENDING

In the window of the taskA worker you can see output like this:
.......
.......
.......
[tasks]
  . tasks.add
  . tasks.taskA
  . tasks.taskB

[2018-05-27 19:23:49,235: INFO/MainProcess] Connected to redis://47.106.106.220:5000/1
[2018-05-27 19:23:49,253: INFO/MainProcess] mingle: searching for neighbors
[2018-05-27 19:23:50,293: INFO/MainProcess] mingle: all alone
[2018-05-27 19:23:50,339: INFO/MainProcess] celery@workerA.izwz920j4zsv1q15yhii1qz ready.
[2018-05-27 19:23:56,051: INFO/MainProcess] sync with celery@workerB.izwz920j4zsv1q15yhii1qz
[2018-05-27 19:24:28,855: INFO/MainProcess] Received task: tasks.taskA[8860e78a-b82b-4715-980c-ae125dcab2f9]
[2018-05-27 19:24:28,872: INFO/ForkPoolWorker-1] Task tasks.taskA[8860e78a-b82b-4715-980c-ae125dcab2f9] succeeded in 0.0162177120219s: 300

In the window of the taskB worker you can see output like this:
.......
.......
.......
[tasks]
  . tasks.add
  . tasks.taskA
  . tasks.taskB

[2018-05-27 19:23:56,012: INFO/MainProcess] Connected to redis://47.106.106.220:5000/1
[2018-05-27 19:23:56,022: INFO/MainProcess] mingle: searching for neighbors
[2018-05-27 19:23:57,064: INFO/MainProcess] mingle: sync with 1 nodes
[2018-05-27 19:23:57,064: INFO/MainProcess] mingle: sync complete
[2018-05-27 19:23:57,112: INFO/MainProcess] celery@workerB.izwz920j4zsv1q15yhii1qz ready.
[2018-05-27 19:24:33,885: INFO/MainProcess] Received task: tasks.taskB[5646d0b7-3dd5-4b7f-8994-252c5ef03973]
[2018-05-27 19:24:33,910: INFO/ForkPoolWorker-1] Task tasks.taskB[5646d0b7-3dd5-4b7f-8994-252c5ef03973] succeeded in 0.0235358460341s: 6

Notice that the status of the add task is PENDING, meaning it has not been executed. That is because celeryconfig.py does not route add to any particular queue, so it is sent to the default queue, which is named celery, and we have not yet started a worker that consumes that queue. Let's start one now to execute the tasks waiting in the celery queue.
celery -A tasks worker -l info -n worker.%h -Q celery
Run test.py on the remote client again: this time all three tasks succeed, and the window of the newly started worker shows tasks.add being received and executed.
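A hedged side note (not part of the original walkthrough): the earlier PENDING result does not have to be resubmitted; once a worker is consuming the celery queue, a result can also be looked up again by its task id, for example:

# Sketch: re-check a result by task id once the "celery"-queue worker is up.
from celery.result import AsyncResult
from tasks import app, add

re3 = add.delay(1, 2)                  # goes to the default "celery" queue
result = AsyncResult(re3.id, app=app)  # the same result, looked up by id
print(result.get(timeout=10))          # -> 3
print(result.status)                   # -> SUCCESS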
Celery and scheduled tasks
Running periodic (scheduled) tasks with Celery is very simple: you only need to set the CELERYBEAT_SCHEDULE option in the Celery configuration.
Next, add a CELERYBEAT_SCHEDULE variable to celeryconfig.py:
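The schedule itself is missing from this copy of the article, so the block below is only a plausible sketch: the task names match tasks.py, the arguments are chosen so the results match the worker logs further down (11, 60 and 3), and the intervals are assumptions.

# Hypothetical reconstruction of the missing CELERYBEAT_SCHEDULE block.
# Intervals and arguments are assumptions, not the author's exact values.
from datetime import timedelta

CELERYBEAT_SCHEDULE = {
    'taskA_schedule': {
        'task': 'tasks.taskA',
        'schedule': timedelta(seconds=20),  # assumed interval
        'args': (5, 6),                     # 5 + 6 = 11
    },
    'taskB_schedule': {
        'task': 'tasks.taskB',
        'schedule': timedelta(seconds=10),  # assumed interval
        'args': (10, 20, 30),               # 10 + 20 + 30 = 60
    },
    'add_schedule': {
        'task': 'tasks.add',
        'schedule': timedelta(seconds=10),  # assumed interval
        'args': (1, 2),                     # 1 + 2 = 3
    },
}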
Start the same three workers as before:
celery -A tasks worker -l info -n workerA.%h -Q for_task_A
celery -A tasks worker -l info -n workerB.%h -Q for_task_B
celery -A tasks worker -l info -n worker.%h -Q celery

Then start the beat scheduler:
[root@izwz920j4zsv1q15yhii1qz scripts]# celery -A tasks beat
celery beat v4.1.1 (latentcall) is starting.
LocalTime -> 2018-05-27 19:39:29
Configuration ->
    . broker -> redis://47.106.106.220:5000/1
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%WARNING
    . maxinterval -> 5.00 minutes (300s)

In each of the three worker windows started earlier you can see the periodic tasks running:
celery -A tasks worker -l info -n workerA.%h -Q for_task_A

[2018-05-27 19:41:27,432: INFO/ForkPoolWorker-1] Task tasks.taskA[60f41780-c9a2-477b-be46-6620ef07631f] succeeded in 0.00289130600868s: 11
[2018-05-27 19:41:29,428: INFO/MainProcess] Received task: tasks.taskA[27220f52-dde2-471a-a87c-3f533d67217c]
......
......

celery -A tasks worker -l info -n workerB.%h -Q for_task_B

[2018-05-27 19:41:18,420: INFO/ForkPoolWorker-1] Task tasks.taskB[b6f9aee3-e6b4-4f10-9428-457d9bb844cf] succeeded in 0.00282042898471s: 60
[2018-05-27 19:41:28,416: INFO/MainProcess] Received task: tasks.taskB[44dfea0b-b725-4874-bea2-9b66e8da573b]
......
......

celery -A tasks worker -l info -n worker.%h -Q celery

[2018-05-27 19:41:23,428: INFO/ForkPoolWorker-1] Task tasks.add[315a9cca-3c95-4517-9289-2ece15cd46a4] succeeded in 0.00355823297286s: 3
[2018-05-27 19:41:28,423: INFO/MainProcess] Received task: tasks.add[c4a1b2c7-ecb7-4af4-85c1-a341b3ec6726]
......
......

Summary
With CELERY_QUEUES and CELERY_ROUTES each task can be bound to its own queue and served by its own worker (possibly on another machine), unrouted tasks fall back to the default celery queue, and CELERYBEAT_SCHEDULE together with celery beat turns any of these tasks into periodic jobs.