【部署类】专题:消息队列MQ、进程守护Supervisor
目錄
1 背景需求
2 技術(shù)方案
2.1 消息隊列
2.2 進(jìn)程守護(hù)
3 源碼介紹
3.1 supervisor部分
3.1.1 supervisord.conf 內(nèi)容
3.1.2?MM3D.conf 和 MM3D_2.conf 內(nèi)容
3.2?算法程序(也就是我的主函數(shù))
1 背景需求
某C端產(chǎn)品,前端嵌入式(安卓)將采集的數(shù)據(jù)發(fā)送給后端,后端服務(wù)器(Java)要負(fù)責(zé)將數(shù)據(jù)交到算法服務(wù)器(python,C++),算法服務(wù)器收到數(shù)據(jù)并處理完后將結(jié)果再返回給后端,后端拿著結(jié)果二次加工后再發(fā)給前端顯示。
基本要求:
- 算法服務(wù)器有多臺,要充分利用,要滿足并發(fā)。
- 異常崩潰、關(guān)機(jī)等,算法要自啟動。
2 技術(shù)方案
- 應(yīng)對并發(fā)問題:后端和算法端采用RabbitMQ消息訂閱方案。
- 應(yīng)對異常自啟動問題:采用Supervisor進(jìn)程守護(hù)。
架構(gòu)圖如下:
2.1 消息隊列
消息隊列(Message queue)原理比較簡單(當(dāng)然細(xì)節(jié)很多),主要作用就是把所有生產(chǎn)者的數(shù)據(jù)放到一個隊列中,所有消費者從從這個隊列里取,確保每個數(shù)據(jù)僅被消費一次,互相不沖突。
詳細(xì)原理可參考:
消息隊列(mq)是什么? - 知乎
什么是消息隊列啊? - 知乎
RabbitMQ 入門系列(9)— Python 的 pika 庫常用函數(shù)及參數(shù)說明_wohu1104的專欄-CSDN博客
2.2 進(jìn)程守護(hù)
進(jìn)程守護(hù)的目的是讓異常崩潰的程序能自動重啟。
Supervisor是用Python開發(fā)的一套通用的進(jìn)程管理程序,能將一個普通的命令行進(jìn)程變?yōu)楹笈_daemon,并監(jiān)控進(jìn)程狀態(tài),異常退出時能自動重啟。
幾個要點的解釋:
- Supervisor為什么能啟動程序?
- 答:Supervisor自己本身是某種程序,它能在Linux系統(tǒng),通過自定義的配置去指定任意個子程序(每個子程序要定義一個唯一名稱),而每個子進(jìn)程被啟動后會去執(zhí)行一個shell文件(.sh文件),而你可以在這個shell文件中自定義任何命令行代碼,所以你能以任何方式去啟動任意位置的任意多個程序。
- Supervisor為什么能自動啟動崩潰的程序?
- 答:由于supervisor的子進(jìn)程會通過指定的shell腳本去啟動其他“孫”進(jìn)程(也就是你想啟動的程序),并且子進(jìn)程能和孫進(jìn)程通信,所以,當(dāng)你的程序崩潰時,其所屬的supervisor子進(jìn)程會重新執(zhí)行一次shell腳本,把這個崩潰的程序再啟動。這里重啟的規(guī)則和配置有很多方式,很靈活。
更多信息,我看了比較好的參考如下(推薦級分先后順序):
??????Supervisor使用詳解 - 浪淘沙& - 博客園
詳解Supervisor進(jìn)程守護(hù)監(jiān)控 - 請叫我頭頭哥 - 博客園
supervisor 使用詳解_11111-CSDN博客_supervisor
3 源碼介紹
算法服務(wù)器部分運行的邏輯是:
- 算法服務(wù)器開機(jī)。
- supervisor程序自動啟動,通過配置文件,自動開啟相應(yīng)的子進(jìn)程。每個子進(jìn)程啟動后再去調(diào)用一個shell文件,把算法程序逐一啟動起來。
- 眾多算法程序開始實時訂閱唯一的文件服務(wù)器消息。
- 某個算法程序從MQ隊列中拿到一個文件包路徑和名字后,就會通過FTP去文件服務(wù)器下載數(shù)據(jù)到算法服務(wù)器本地,然后算法模塊開始處理數(shù)據(jù)、返回數(shù)據(jù)給后端,然后重新監(jiān)聽。
3.1 supervisor部分
supervisor安裝好后,配置文件一般放在/etc/supervisor文件夾內(nèi),里面有如下兩個文件:
- supervisord.conf:supervisor的基本配置文件
- conf.d:一個文件夾,里面存放supervisor每個子進(jìn)程的配置文件。(我有個疑問是為什么一個文件夾要用.d起名字,看起來還以為是個文件)
- MM3D.conf:我定義的一個子進(jìn)程配置。這個conf文件的名字可以隨便取。
- MM3D_2.conf:我定義的第二個子進(jìn)程配置。
3.1.1 supervisord.conf 內(nèi)容
; supervisor config file[unix_http_server]
file=/var/run/supervisor.sock ; (the path to the socket file)
chmod=0700 ; sockef file mode (default 0700)[supervisord]
logfile=/var/log/supervisor/supervisord.log ; (main log file;default $CWD/supervisord.log)
pidfile=/var/run/supervisord.pid ; (supervisord pidfile;default supervisord.pid)
childlogdir=/var/log/supervisor ; ('AUTO' child log dir, default $TEMP); the below section must remain in the config file for RPC
; (supervisorctl/web interface) to work, additional interfaces may be
; added by defining them in separate rpcinterface: sections
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface[supervisorctl]
serverurl=unix:///var/run/supervisor.sock ; use a unix:// URL for a unix socket; The [include] section can just contain the "files" setting. This
; setting can list multiple files (separated by whitespace or
; newlines). It can also contain wildcards. The filenames are
; interpreted as relative to this file. Included files *cannot*
; include files themselves.[include]
files = /etc/supervisor/conf.d/*.conf
3.1.2?MM3D.conf 和 MM3D_2.conf 內(nèi)容
MM3D.conf內(nèi)容如下:
[program:MM3D]
directory=/mm3d/RV2_magic_mirror_algo/MM3D
command=sh /mm3d/RV2_magic_mirror_algo/MM3D/excute.sh
autostart=true
autorestart=true
startretries=100
redirect_stderr=true
stdout_logfile=/mm3d/RV2_magic_mirror_algo/MM3D/supervisor_out.log
MM3D_2.conf內(nèi)容如下:
[program:MM3D_2]
directory=/mm3d/RV2_magic_mirror_algo/MM3D
command=sh /mm3d/RV2_magic_mirror_algo/MM3D/excute.sh
autostart=true
autorestart=true
startretries=100
redirect_stderr=true
stdout_logfile=/mm3d/RV2_magic_mirror_algo/MM3D/supervisor_2out.log
注意:conf文件中,比較重要的參數(shù)感覺有兩個:
- 唯一的進(jìn)程名。也就是:[program:XXX]里面的XXX。后面使用supervisorctl 各種命令操控子進(jìn)程需要用到這些名字。
- 生成日志的位置和名字。多個子進(jìn)程不要把日志搞一起了。
3.2?算法程序(也就是我的主函數(shù))
算法程序主要包括兩塊:
- MQ通信模塊(包括FTP拉取數(shù)據(jù)流)。
- 算法處理模塊以及數(shù)據(jù)上傳模塊。
代碼如下:
(config_MQ.py就省略了,里面是一些SDK、模型等地址,以及MQ的IP地址和密碼等)
import os
import numpy as npfrom pathlib import Path
from config_MQ import Config
import time
from loguru import logger
import sysimport pika
from ftplib import FTP
import jsondef ftp_connect():try:"""連接ftp:return:"""ftp = FTP()logger.debug('config.ftp_host: {}', config.ftp_host)logger.debug('config.ftp_port: {}', config.ftp_port)ftp.connect(config.ftp_host, config.ftp_port) # 連接遠(yuǎn)程服務(wù)器IP地址ftp.encoding = 'utf-8'ftp.set_debuglevel(1) # 不開啟調(diào)試模式ftp.login(config.ftp_user, config.ftp_pwd) # 登錄ftp# print(ftp.getwelcome()) # ftp服務(wù)器歡迎語except Exception as e:#print(e)logger.exception('ftp_connect error: {}', e)return Noneelse:return ftpdef read_file(file_path, target_dir, filename):ftp = ftp_connect() # 連接ftp# ftp服務(wù)器上文件的路徑# 本地文件下載保存的路徑# 本地文件下載寫入的路徑文件# writefile = '%s/%s' % (write_path, filename)write_path = target_dir + '/%s' % (filename + '.zip')with open(write_path, "wb") as f:ftp.retrbinary('RETR %s' % file_path, f.write)ftp.close();def callbackTry(ch, method, properties, body):print(body.decode())ch.basic_ack(delivery_tag=method.delivery_tag)## 拿到消息轉(zhuǎn)jsonbodyJson = json.loads(body.decode())filepath = bodyJson['filepath']user_id = bodyJson['keypair']callback_url = bodyJson['callbackUrl'] # 回調(diào)云端地址sample_raw_dir = os.path.join(raw_data_root, user_id) #../../MM3D_RAW/B16XXXXXXXXsample_result_dir = os.path.join(result_data_root, user_id) # ../../MM3D_Result/B16XXXXXXX# 拿到ftp url下載文件并保存sample_raw_dirif not os.path.isdir(sample_raw_dir):try:os.mkdir(sample_raw_dir)except Exception as e:logger.exception('Fail to mkdir to raw data: {}', e)#print('Fail to mkdir to raw data', e)if not os.path.isdir(sample_result_dir):try:os.mkdir(sample_result_dir)except Exception as e:logger.exception('Fail to mkdir to result data: {}', e)#print('Fail to mkdir to result data', e)try:# zip_file = user_id + '.zip'# file.save(os.path.join(sample_raw_dir, zip_file))read_file(filepath, sample_raw_dir, user_id) #通過FTP拉取數(shù)據(jù)包并保存在本地except Exception as e:logger.exception('Fail to save raw data: {}', e)#print("Fail to save raw data", e)start_time = int(round(time.time() * 1000))sample_key_pair = sample_raw_dir.split('/')[-1]# 識別文件的路勁logger.debug("sample_raw_dir :{}", sample_raw_dir)logger.debug("callback_url :{}", callback_url)############################ 算法部分 ############################## TODO 調(diào)用算法程序識別def callback(ch, method, properties, body):try:callbackTry(ch, method, properties, body)except Exception as e:logger.exception('algo error: {}', e)#print("algo error:", e)def init_rabbitmq():# 創(chuàng)建連接時的登錄憑證。 username: MQ 賬號, password: MQ 密碼credentials = pika.PlainCredentials(config.rabbitmq_user, config.rabbitmq_pwd)# 阻塞式連接 MQ# parameters: 連接參數(shù)(包含主機(jī)/端口/虛擬主機(jī)/賬號/密碼等憑證信息)connection = pika.BlockingConnection(pika.ConnectionParameters(host=config.rabbitmq_host, port=config.rabbitmq_port, virtual_host='/',credentials=credentials))# 獲取與 rabbitmq 通信的通道channel = connection.channel()# 聲明交換器exchange = "algoExchange"channel.exchange_declare(exchange=exchange, durable=True, exchange_type='topic')# 聲明隊列ex_queue = "algoExchange_queue"channel.queue_declare(queue=ex_queue, durable=True, auto_delete=True)# 通過路由鍵將隊列和交換器綁定channel.queue_bind(exchange=exchange, queue=ex_queue, routing_key='algoConfigRoutingKey')# 從隊列中拿到消息開始消費#(當(dāng)要消費時,調(diào)用該回調(diào)函數(shù) callback)channel.basic_consume(ex_queue, callback,auto_ack=True) # auto_ack設(shè)置成 False,在調(diào)用callback函數(shù)時,未收到確認(rèn)標(biāo)識,消息會重回隊列。True,無論調(diào)用callback成功與否,消息都被消費掉# 處理 I/O 事件和 basic_consume 的回調(diào), 直到所有的消費者被取消# (開始循環(huán),直到發(fā)送退出消息)channel.start_consuming()if __name__ == "__main__":'''configue logger rotation="00:00:00",'''logger.add('../log/log-{time:YYYY-MM-DD}-PID='+ str(os.getpid()) +'.log', level="DEBUG",encoding="utf-8", colorize=True, format="<green>{time}</green> <level>{message}</level>" )config = Config()raw_data_root = config.raw_data_rootresult_data_root = config.result_data_rootraw_data_backup_root = config.raw_data_backupraw_data_backup_root_path = Path(raw_data_backup_root)if not raw_data_backup_root_path.is_dir():os.mkdir(config.raw_data_backup)############################ 算法初始化部分 ############################## TODO 調(diào)用初始化############################ rabbitmq部分 ############################init_rabbitmq()
總結(jié)
以上是生活随笔為你收集整理的【部署类】专题:消息队列MQ、进程守护Supervisor的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 原创:新四军司令遇袭,非但不痛恨,还营救
- 下一篇: CS131专题-2:高斯核、噪声、滤波