celery 学习笔记 01-介绍
celery 學習筆記 01-介紹
celery 是 python 中的常用的任務隊列框架,經常用于異步調用、后臺任務等工作。celery 本身以 python 寫,但協議可在不同的語言中實現,其它語言也可以用 celery 執行相應的任務。在 web 應用,為提高系統響應速度,發送郵件、數據整理等需要長時間執行的任務,通常以異步任務的方式執行,這時就需要用到像 celery 類的框架。另一種常見的場景是大型系統的分布式處理,為了提升系統性能,各個組件通常以多個實例運行不同主機上,而組件之間的調用就需要用到 celery 這樣的框架。使用 celery (或消息隊列),有助于降低系統組件之間的耦合,有助于實現灰度發布、實現服務的分布式、實現水平擴展,最終提升系統健壯性和處理性能。
celery (和類似框架)的核心是任務隊列。用戶發起任務,celery 負責把任務排隊和整理,然后交到任務執行器 worker 中。 worker 監視任務隊列,獲取新任務并執行。在 celery 內部,以消息機制協調各個組件工作,消息需要借助一個中間人 broker 進行,如下 ::
client → celery task → broker → celery worker↑ ↓← ← ← ← result backendclient 發起任務時,一般是以異步方式(除非必要的同步 rpc ),獲得一個任務的 id 并保存下來,后續可通過 id 到 result backend 中查詢任務執行結果。broker 是第三方組件,可使用消息隊列( rabbitmq 等)、redis、數據庫等,只要能實現消息的存儲和分發理論上都能使用。 worker 以線程或進程的形式運行,從 broker 中取任務執行,然后把結果保存到 result backend 。
目前 rabbitmq 的 broker 實現的功能最完備,在開發環境中也可以使用 sqlite 等比較方便的方式,但性能會很差,不能用在生產環境上。
另外需要注意的是,由于不同操作系統的進程模型的差異,celery 會在 windows 上產生一些配置方面的怪異問題。
celery 可直接通過 pip 安裝,在 virtualenv 下,直接運行 ::
pip install celery再安裝 broker 所需要的驅動,例如使用 rabbitmq ,則安裝 ::
pip install amqp同時安裝好 rabbitmq (建議通過 docker 安裝,使用 rabbitmq:management 鏡像,可在 15672 端口查看管理控制臺)。
然后使用下面的代碼示例(摘錄來自: Ask Solem. “Celery Manual, Version 3.1“) ::
# hello.py from celery import Celeryapp = Celery('hello', broker='amqp://guest:guest@localhost//')@app.task def hello():return 'hello world'if __name__ == '__main__':r = hello.delay()然后,啟動 worker ::
celery -A hello worker --loglevel=infoclient 執行任務 ::
python hello.pyapp.task 裝飾器標記一個函數為 celery 任務,client 用 delay 方法執行時。 delay 調用 apply_async() 進行異步執行, apply_async 還可配置如隊列、countdown 等執行選項。 celery 返回一個 AsyncResult 對象,如果 result backend 配置正確,client 可暫時把對象中的任務 id 保存到數據庫,后面再通過這個 id 獲取異步執行的結果。
上面的簡單例子是沒有參數的,如果增加參數,如下 ::
# add.py from celery import Celeryapp = Celery('add', broker='amqp://guest:guest@localhost//',backend='db+sqlite:///celery_result.db')@app.task def add(x, y):return x+yif __name__ == '__main__':r = add.delay(1, 2)print(r.wait())啟動 worker ::
celery -A add worker --l info調用 ::
python add.py當任務結果用 amqp 保存時,結果只能取一次, 因此無法在后續調用中查詢任務結果。這個例子用 sqlite 保存了任務執行結果,因此 client 可在 r.wait() 查詢任務的結果、任務的狀態等等很多信息,可把 r.id 保存到數據庫,然后未來查詢任務的 AsyncResult ::
r2 = app.AsyncResult(r.id) print(r2.wait()) print(r2.successful())add.py 中使用了兩個參數 x y ,而 celery 需要通過 broker 傳遞這兩個參數,這時需要對數據進行序列化,將 x y 對象轉換為無結構的數據,然后 worker 接收到后再把數據還原為 x y 對象。 celery 內置的序列化方法包括 pickle 、 json 等等,如果對象比較復雜,需要自己定義序列化方法。
如果不想立即執行任務,而是把任務傳遞到其它地方,通過 celery 的 subtask 支持。 subtask 是對 task 的調用參數和執行選項的一個封裝,如 ::
add.subtask((2,2), countdown=10) add.s(2,2)subtask 或 s 返回的是一個 task 的簽名(celery.canvas.Signature),它可實現工作流、偏函數等效果。subtask 支持和 task 同樣的調用方法,如 ::
s = add.s(2) # subtask ,partial s.delay(2) # 發送消息開始異步執行在 celery 工作流中組織 subtask 的方式有 group / chain / chord 等等, group 中任務并發執行,chain 中任務順序執行,chord 中進行回調。而這些組織方式本身也是 subtask ,可嵌套使用 ::
# workflow.py from celery import Celery, group, chainapp = Celery('add', broker='amqp://guest:guest@localhost//',backend='db+sqlite:///celery_result.db')@app.task def add(x, y):return x+yif __name__ == '__main__':g = group((add.s(i, i) for i in range(10)))r = g.delay()print(r.get())c = chain(add.s(1, 2) | add.s(3))r2 = c.delay()print(r2.get())celery 的任務調用通過網絡發送任務的名字和參數,不發送任務代碼, worker 收到任務后根據任務名和參數執行相應的代碼。因此不同 worker 中的代碼版本不一樣時,會有不同的處理結果。如果 worker 中不能處理相應的任務名,就會報錯。
轉載于:https://www.cnblogs.com/fengyc/p/5655287.html
總結
以上是生活随笔為你收集整理的celery 学习笔记 01-介绍的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 正在或即将被使用的Go依赖包管理方法:G
- 下一篇: HTML5实现屏幕手势解锁(转载)