Celery + Flower + FastAPI + RabbitMQ ,Python实现异步消息队列和监控
@Author:Runsen
消息隊列
消息隊列讓應用程序在用戶請求之外異步執行稱為任務的工作。如果應用程序需要在后臺執行工作,它會將任務添加到任務隊列中。這些任務稍后由工作服務執行。
Celery
Celery 是一個分布式任務隊列,可幫助在后臺異步執行大量進程/消息并進行實時處理。芹菜由三個主要成分組成:
- Celery 客戶端
- 消息代理
- Celery 工人
下圖顯示了組件之間如何交互的簡化圖。我們將使用 FastAPI 作為我們的 Celery 客戶端和 RabbitMQ 作為消息代理。
-
將Celery 客戶將運行FastAPI應用程序,并會發出消息/后臺作業的RabbitMQ。
-
RabbitMQ 將作為消息代理來調解客戶端和工作線程之間的消息。
-
RabbitMQ 收到客戶端的消息后,會通過將消息發送給 celery worker 來啟動客戶端任務。
-
一個celery 工人被認為是將實現在任何Web服務器的請求的異步后臺任務。可以有多個工人一次執行/完成許多任務。
-
celery 將確保每個 worker 一次只執行一個任務,并且每個任務只由一個 worker 分配。
FastAPI 是一個現代、快速(高性能)的 Web 框架,
安裝 fastapi包
pip install fastapi安裝 celery 包
pip install celery我們還需要安裝 ASGI 服務器來運行我們的 FastAPI 應用程序。
pip install uvicorn在我們的本地機器上運行 RabbitMQ 的最簡單方法之一是使用 Docker。
Docker安裝查看:https://docs.docker.com/get-docker/
運行以下命令即可通過終端中的 docker 啟動 RabbitMQ 映像。
docker run -d --name some-rabbit -p 4369:4369 -p 5671:5671 -p 5672:5672 -p 15672:15672 rabbitmq:3如果沒有安裝RabbitMQ 映像,會直接下載安裝。
創建 Celery Worker 任務
現在,一旦消息代理RabbitMQ 運行,就可以創建Worker 程序。
這是因為 celery Worker 會偵聽消息代理以執行排隊的任務。
在這一部分,我們將為 celery worker 創建任務。創建一個文件celery_worker.py:
from time import sleep from celery import Celery from celery.utils.log import get_task_logger #初始Celery celery = Celery('tasks', broker='amqp://guest:guest@127.0.0.1:5672//') #創建記錄器-啟用以在任務記錄器上顯示消息 celery_log = get_task_logger(__name__) #創建訂單-與芹菜異步運行 #長時間運行任務的示例流程 @celery.task def create_order(name, quantity):# 每1個訂單5秒complete_time_per_item = 5# 根據訂購的物品數量不斷增加sleep(complete_time_per_item * quantity)celery_log.info(f"Order Complete!")return {"message": f"Hi {name}, Your order has completed!","order_quantity": quantity}新建一個model.py
from pydantic import BaseModel #請求主體的訂單類模型 class Order(BaseModel):customer_name: strorder_quantity: int然后,創建一個名為main.py的新文件:
from fastapi import FastAPI from celery_worker import create_order from model import Order # 創建FastAPI應用程序 app = FastAPI() # 創建訂單 @app.post('/order') def add_order(order: Order):# 使用delay=() 方法調用芹菜任務create_order.delay(order.customer_name, order.order_quantity)return {"message": "Order Received! Thank you for your patience."}運行 FastAPI 應用程序:
uvicorn main:app --reload訪問http://localhost:8000/docs
以查看 Swagger 文檔中正在運行的 FastAPI。
celery -A celery_worker.celery worker --loglevel=info可以通過插入請求正文輸入來測試我們的端點。這是請求正文的示例輸入:
單擊“Execute”以從端點獲取響應,將看到以下結果:
有沒有什么有效的方法來監控后臺任務?
我們可以使用Flower 監控工具來監控我們所有的Celery 工人。
pip install flower然后,flower在我們的本地機器上運行服務器:
celery flower -A celery_worker.celery --broker:amqp://localhost//訪問http://localhost:5555/。
Flower 服務器并單擊導航欄上的“Tasks”選項卡來監控我們的 celery 工人。
總結
以上是生活随笔為你收集整理的Celery + Flower + FastAPI + RabbitMQ ,Python实现异步消息队列和监控的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 深度学习和目标检测系列教程 21-300
- 下一篇: 进账单是什么