用 Celery 实现邮件推送系统
2019獨角獸企業重金招聘Python工程師標準>>>
系統需求
本文以Celery 實現分布式任務隊列為基礎,簡述了一個郵件推送系統的模型。
Celery 是 Distributed Task Queue,分布式任務隊列,分布式決定了可以有多個 worker 的存在,隊列表示其是異步操作,即存在一個產生任務提出需求的工頭,和一群等著被分配工作的碼農。
需求:
1.在郵件推送系統中,我們需要對成千上萬的用戶發送郵件,發送郵件具有時效性,即不能說今天開始發郵件,要等到明天才能發送完畢。
2.發送郵件過程中,可能會遇到過于頻繁,郵件服務器上信件堆積無法及時接受新信件而產生的拒信,或者郵件服務器將我們的郵件判決為垃圾郵件。
3.郵件發送的 I/O 時間較長,不能讓程序在等待郵件服務器返回消息上浪費時間。
所以我們的推送系統要有以下特性:1.分布式處理作業;2.閉環監控;3.異步式分發作業
系統框圖
前端通過 ajax 調用 views 中的 callpush 接口,該接口將被推送用戶的篩選條件傳入 service,然后 service 請求數據庫,將返回數據作為參數調用 celery 接口中 addtask 函數。celery 接口中 addtask 根據 action 參數來判斷所要添加的任務類型,根據不同的類型分別進行處理,放入隊列。
系統的另外一頭,worker 從隊列中取出任務,用 mail 函數推送郵件,如果發送失敗就調用 error_handler 進行異常處理,此處我們將所有 task 的執行情況放入 redis 中,給每個任務進行標記,如果成功則標記為 1,失敗則 0.
前端可以通過 ajax 調用 pushstatus 來向 redis 中讀取任務執行情況,此處我們返回了成功和失敗任務的個數。
偽代碼實現
# Controller from redis import StrictRedis red = StrictRedis(host='localhost', port=6379, db=0)def callpush(request):area = request.POST.get('area')return HttpResponse(str(mailpush(area)))def pushstatus(request):failure = red.scard('status:0:task')success = red.scard('status:1:task')return HttpResponse('Failures: ' + str(failure) + '\nSuccess: ' + str(success))# Service def mailpush(**kargs):targets = MtUser.objects.filter(kargs).values('username', 'address')addtask(action='mailpush', data=targets, content='Hello %s!', subject='Greetings')return len(targets)# Celery Interface (Dispatcher) from celery import Celeryapp = Celery() app.config_from_object('celeryconfig')def addtask(action, data, **kargs):if action == 'mailpush':for (address, username) in data:app.send_task('worker.mail', args=[kargs['subject'], kargs['content'] % username, address], link_error=app.signature('worker.error_handler'))elif action == 'messagepush':passelse:pass# Celery Backend (Worker) from celery import Celery from celery import Task from redis import StrictRedisapp = Celery() app.config_from_object('celeryconfig') red = StrictRedis(host='localhost', port=6379, db=0)@app.task(bind=True) def mail(self, subject, content, address):from django.core.mail import EmailMessagemsg = EmailMessage(subject, content, 'admin@admin.com', address)msg.content_subtype = 'html'msg.send()red.sadd('status:1:task', self.request.id)# Overwrite the on_failure function in trace.py @app.task def error_handler(uuid, args):print uuidprint argsred.set(uuid, args)red.sadd('status:0:task', uuid)red.srem('status:1:task', uuid)轉載于:https://my.oschina.net/shinedev/blog/500554
總結
以上是生活随笔為你收集整理的用 Celery 实现邮件推送系统的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SSH 安全性和配置入门
- 下一篇: DBMS_STATS.GATHER_TA