Flask爱家租房--celery(发送验证短信)
生活随笔
收集整理的這篇文章主要介紹了
Flask爱家租房--celery(发送验证短信)
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
0.配置文件
# coding:utf-8BROKER_URL = "redis://127.0.0.1:6379/1" CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'1.啟動文件
# coding:utf-8from celery import Celery from ihome.tasks import config# 定義celery對象 celery_app = Celery("ihome")# 引入配置信息 celery_app.config_from_object(config)# 自動搜尋異步任務(wù) celery_app.autodiscover_tasks(["ihome.tasks.sms"])2.發(fā)送短信輔助類
# coding=utf-8from CCPRestSDK import REST# 主帳號 accountSid = '8aaf0708568d4143015697b0f4960888'# 主帳號Token accountToken = '42d3191f0e6745d6a9ddc6c795da0bed'# 應(yīng)用Id appId = '8aaf0708568d4143015697b0f56e088f'# 請求地址,格式如下,不需要寫http:// serverIP = 'app.cloopen.com'# 請求端口 serverPort = '8883'# REST版本號 softVersion = '2013-12-26'# 發(fā)送模板短信# @param to 手機(jī)號碼# @param datas 內(nèi)容數(shù)據(jù) 格式為列表 例如:['12','34'],如不需替換請?zhí)?''# @param $tempId 模板Idclass CCP(object):"""自己封裝的發(fā)送短信的輔助類"""# 用來保存對象的類屬性instance = Nonedef __new__(cls):# 判斷CCP類有沒有已經(jīng)創(chuàng)建好的對象,如果沒有,創(chuàng)建一個(gè)對象,并且保存# 如果有,則將保存的對象直接返回if cls.instance is None:obj = super(CCP, cls).__new__(cls)# 初始化REST SDKobj.rest = REST(serverIP, serverPort, softVersion)obj.rest.setAccount(accountSid, accountToken)obj.rest.setAppId(appId)cls.instance = objreturn cls.instancedef send_template_sms(self, to, datas, temp_id):""""""result = self.rest.sendTemplateSMS(to, datas, temp_id)# for k, v in result.iteritems():## if k == 'templateSMS':# for k, s in v.iteritems():# print '%s:%s' % (k, s)# else:# print '%s:%s' % (k, v)# smsMessageSid:ff75e0f84f05445ba08efdd0787ad7d0# dateCreated:20171125124726# statusCode:000000status_code = result.get("statusCode")if status_code == "000000":# 表示發(fā)送短信成功return 0else:# 發(fā)送失敗return -13.發(fā)送短信后端邏輯
# GET /api/v1.0/sms_codes/<mobile>?image_code=xxxx&image_code_id=xxxx @api.route("/sms_codes/<re(r'1[34578]\d{9}'):mobile>") def get_sms_code(mobile):"""獲取短信驗(yàn)證碼"""# 獲取參數(shù)image_code = request.args.get("image_code")image_code_id = request.args.get("image_code_id")# 校驗(yàn)參數(shù)if not all([image_code_id, image_code]):# 表示參數(shù)不完整return jsonify(errno=RET.PARAMERR, errmsg="參數(shù)不完整")# 業(yè)務(wù)邏輯處理# 從redis中取出真實(shí)的圖片驗(yàn)證碼try:real_image_code = redis_store.get("image_code_%s" % image_code_id)except Exception as e:current_app.logger.error(e)return jsonify(errno=RET.DBERR, errmsg="redis數(shù)據(jù)庫異常")# 判斷圖片驗(yàn)證碼是否過期if real_image_code is None:# 表示圖片驗(yàn)證碼沒有或者過期return jsonify(errno=RET.NODATA, errmsg="圖片驗(yàn)證碼失效")# 刪除redis中的圖片驗(yàn)證碼,防止用戶使用同一個(gè)圖片驗(yàn)證碼驗(yàn)證多次try:redis_store.delete("image_code_%s" % image_code_id)except Exception as e:current_app.logger.error(e)# 與用戶填寫的值進(jìn)行對比if real_image_code.lower() != image_code.lower():# 表示用戶填寫錯(cuò)誤return jsonify(errno=RET.DATAERR, errmsg="圖片驗(yàn)證碼錯(cuò)誤")# 判斷對于這個(gè)手機(jī)號的操作,在60秒內(nèi)有沒有之前的記錄,如果有,則認(rèn)為用戶操作頻繁,不接受處理try:send_flag = redis_store.get("send_sms_code_%s" % mobile)except Exception as e:current_app.logger.error(e)else:if send_flag is not None:# 表示在60秒內(nèi)之前有過發(fā)送的記錄return jsonify(errno=RET.REQERR, errmsg="請求過于頻繁,請60秒后重試")# 判斷手機(jī)號是否存在try:user = User.query.filter_by(mobile=mobile).first()except Exception as e:current_app.logger.error(e)else:if user is not None:# 表示手機(jī)號已存在return jsonify(errno=RET.DATAEXIST, errmsg="手機(jī)號已存在")# 如果手機(jī)號不存在,則生成短信驗(yàn)證碼sms_code = "%06d" % random.randint(0, 999999)# 保存真實(shí)的短信驗(yàn)證碼try:redis_store.setex("sms_code_%s" % mobile, constants.SMS_CODE_REDIS_EXPIRES, sms_code)# 保存發(fā)送給這個(gè)手機(jī)號的記錄,防止用戶在60s內(nèi)再次出發(fā)發(fā)送短信的操作redis_store.setex("send_sms_code_%s" % mobile, constants.SEND_SMS_CODE_INTERVAL, 1)except Exception as e:current_app.logger.error(e)return jsonify(errno=RET.DBERR, errmsg="保存短信驗(yàn)證碼異常")# 發(fā)送短信# 使用celery異步發(fā)送短信, delay函數(shù)調(diào)用后立即返回(非阻塞)#send_sms.delay(mobile, [sms_code, int(constants.SMS_CODE_REDIS_EXPIRES/60)], 1)# 返回異步任務(wù)的對象result_obj = send_sms.delay(mobile, [sms_code, int(constants.SMS_CODE_REDIS_EXPIRES/60)], 1)print(result_obj.id)# 通過異步任務(wù)對象的get方法獲取異步任務(wù)的結(jié)果, 默認(rèn)get方法是阻塞的ret = result_obj.get()print("ret=%s" % ret)# 返回值# 發(fā)送成功return jsonify(errno=RET.OK, errmsg="發(fā)送成功")4.client相關(guān)代碼
# coding:utf-8from celery import Celery from ihome.libs.yuntongxun.sms import CCP# 定義celery對象 celery_app = Celery("ihome", broker="redis://127.0.0.1:6379/1")@celery_app.task def send_sms(to, datas, temp_id):"""發(fā)送短信的異步任務(wù)"""ccp = CCP()ccp.send_template_sms(to, datas, temp_id)# celery開啟的命令 # celery -A ihome.tasks.task_sms worker -l info5.worker相關(guān)代碼
# coding:utf-8from ihome.tasks.main import celery_app from ihome.libs.yuntongxun.sms import CCP# # @celery_app.task # def send_sms(to, datas, temp_id): # """發(fā)送短信的異步任務(wù)""" # pass@celery_app.task def send_sms(to, datas, temp_id):"""發(fā)送短信的異步任務(wù)"""ccp = CCP()try:result = ccp.send_template_sms(to, datas, temp_id)except Exception as e:result = -2return result總結(jié)
以上是生活随笔為你收集整理的Flask爱家租房--celery(发送验证短信)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 递归和非递归分别实现求n的阶乘
- 下一篇: android fmod,Android