码农技术炒股之路——任务管理器
? ? ? ? 系統(tǒng)任務(wù)和普通任務(wù)都是通過(guò)任務(wù)管理器調(diào)度的。它們的區(qū)別是:系統(tǒng)任務(wù)在程序運(yùn)行后即不會(huì)被修改,而普通任務(wù)則會(huì)被修改。(轉(zhuǎn)載請(qǐng)指明出于breaksoftware的csdn博客)
? ? ? ? 為什么要有這樣的設(shè)計(jì)?因?yàn)槲蚁M且粋€(gè)可以不用停止服務(wù)就可以更新相關(guān)配置的系統(tǒng)。比如我們現(xiàn)在要加一個(gè)普通任務(wù),我們只要修改下普通任務(wù)配置文件即可。再比如我們需要修改數(shù)據(jù)庫(kù)中表結(jié)構(gòu),我們也不用停止服務(wù)修改代碼來(lái)保證數(shù)據(jù)格式的一致性。
? ? ? ? 我們程序需要知道配置文件是否被修改。如何去做?一種方法是借用一些系統(tǒng)方法監(jiān)聽(tīng)相應(yīng)配置文件的修改,一旦文件有變化,馬上通知我們的主程序去處理。另一種則是采用輪詢檢查機(jī)制,即定期去生成差異結(jié)果。為了不讓這個(gè)系統(tǒng)更加復(fù)雜,我選擇后者。而它就是我所謂的系統(tǒng)任務(wù)。
? ? ? ? 不管是系統(tǒng)任務(wù)還是普通任務(wù),實(shí)現(xiàn)的類都要繼承于job_base
from abc import ABCMeta,abstractmethod
class job_base:__metaclass__ = ABCMeta@abstractmethoddef run(self):pass
? ? ? ? 有這個(gè)限制主要是為了保證每個(gè)任務(wù)都是run方法。調(diào)度框架將執(zhí)行該方法以完成任務(wù)執(zhí)行。
? ? ? ? 在《碼農(nóng)技術(shù)炒股之路——架構(gòu)和設(shè)計(jì)》一文中,介紹了我們將基于APScheduler實(shí)現(xiàn)任務(wù)調(diào)度功能。首先我們需要啟動(dòng)BackgroundScheduler對(duì)象
from apscheduler.schedulers.background import BackgroundScheduler@singleton
class job_center():def __init__(self):self._sched = Noneself._job_conf_path = ""self._job_id_handle = {}self._static_job_id_handle = {}def start(self):self._sched = BackgroundScheduler()self._sched.start()
? ? ? ? 當(dāng)我們需要加入任務(wù)時(shí),則調(diào)用下面這個(gè)方法
def add_jobs(self, jobs_info, is_static = False):if None == self._sched:LOG_WARNING("job center must call start() first")returnfor (job_name,job_info) in jobs_info.items():if is_static and job_name in self._static_job_id_handle.keys():continuejob_type = job_info["type"]class_name = job_info["class"]job_handle = self._get_obj(class_name)if is_static:self._static_job_id_handle[job_name] = job_handleelse:self._job_id_handle[job_name] = job_handlecmd = "self._sched.add_job(job_handle.run, job_type, id = job_name"params = self._join_params(job_info)if 0 != len(params):cmd += " , "cmd += paramscmd += ")"#print cmdeval(cmd)
? ? ? ? jobs_info保存的是任務(wù)配置文件中任務(wù)信息,我們看個(gè)樣例
[update_share_base_info]
type=cron
class=update_stock_base_info
day_of_week=1-5
hour=9
minute=30
second=10
timezone = Asia/Shanghai
? ? ? ? 這個(gè)配置中除了type和class,其他都是APScheduler框架中add_job方法中的參數(shù)。上面配置意思是:以上海時(shí)間,從周一到周五,早上9點(diǎn)30分10秒執(zhí)行一次。
? ? ? ??add_jobs中通過(guò)class字段,獲取該class對(duì)應(yīng)的一個(gè)對(duì)象
def _get_obj(self, _cls_name): _packet_name = _cls_name _module_home = __import__(_packet_name,globals(),locals(),[_cls_name])obj = getattr(_module_home,_cls_name) class_obj = obj()return class_obj
? ? ? ? 這兒又要提到我之前特別強(qiáng)調(diào)過(guò)的單例使用方式。經(jīng)過(guò)測(cè)試,《碼農(nóng)技術(shù)炒股之路——配置管理器、日志管理器》中單例的實(shí)現(xiàn)可以保證上面這個(gè)方法獲取的是同一個(gè)對(duì)象,而網(wǎng)上其他單例模式則不行。
? ? ? ? 獲取對(duì)象后,我們要組裝出要執(zhí)行的命令。cmd = "self._sched.add_job(job_handle.run, job_type, id = job_name"中job_handle就是上面獲取的對(duì)象,而run則是每個(gè)job都要有的方法。這也是為什么要求每個(gè)任務(wù)類都要繼承于job_base的原因。
? ? ? ? 之后調(diào)用_join_params將配置文件中其他信息組裝成參數(shù)拼接出完整命令
def _join_params(self, job_info):params = ""param = ""job_type = job_info["type"]for key in job_info.keys():if key in conf_keys.job_conf_info_dict[job_type]:if 0 != len(params):params += ' , 'value = job_info[key]if value.isdigit():param = key + " = " + valueelse:param = key + " = '" + value + "'"if 0 != len(param):params += paramreturn params
? ? ? ? 如此我們配置中的任務(wù)就會(huì)被加入到APScheduler調(diào)度隊(duì)列中。
? ? ? ? 我們?cè)倏聪氯绾蝿h除一個(gè)任務(wù)
def remove_jobs(self, jobs_info):if None == self._sched:LOG_WARNING("job center must call start() first")returnfor job_name in jobs_info.keys():self._sched.remove_job(job_name)self._job_id_handle.pop(job_name)
? ? ? ? 第7行通過(guò)任務(wù)名稱在APScheduler中把任務(wù)刪除。第8行將任務(wù)對(duì)應(yīng)的對(duì)象從列表中刪除。為什么要使用_job_id_handle去保存這些任務(wù)對(duì)象呢?因?yàn)槿绻辉谝粋€(gè)更大的生命周期內(nèi)保存它,它就會(huì)被認(rèn)為是一個(gè)局部變量,從而被釋放,導(dǎo)致之后APScheduler再也調(diào)用不了它。
? ? ? ? 我們看個(gè)管理普通任務(wù)的系統(tǒng)任務(wù)代碼
@singleton
class j_load_job_conf(job_base):def __init__(self):self._pre_jobs_info = {}self._frame_conf_inst = scheduler_frame_conf_inst()self._job_center = job_center()def run(self):section_name = "strategy_job"option_name = "conf_path"if False == self._frame_conf_inst.has_option(section_name, option_name):LOG_WARNING("no %s %s" % (section_name, option_name))returnconf_path = self._frame_conf_inst.get(section_name, option_name)LOG_DEBUG("Load %s %s %s" % (section_name, option_name, conf_path))job_conf_parser_obj = job_conf_parser()jobs_info = job_conf_parser_obj.parse(conf_path)self._execute_jobs(jobs_info)def _execute_jobs(self, jobs_info):add_dict = {}remove_dict = {}modify_dict = {}frame_tools.dict_diff(jobs_info, self._pre_jobs_info, add_dict, remove_dict, modify_dict)add_jobs_info = dict(add_dict, **modify_dict)remove_jobs_info = {}for item in modify_dict.keys():remove_jobs_info[item] = self._pre_jobs_info[item]LOG_INFO("add jobs %s" % (json.dumps(add_jobs_info)))LOG_INFO("remove jobs %s" % (json.dumps(remove_jobs_info)))if 0 == len(add_jobs_info) and 0 == len(remove_jobs_info):returnself._pre_jobs_info = jobs_infoself._job_center.remove_jobs(remove_jobs_info)self._job_center.add_jobs(add_jobs_info)
? ? ? ? run方法將會(huì)定期執(zhí)行。它會(huì)從固定目錄讀取普通任務(wù)配置文件信息。然后在_execute_jobs方法中,通過(guò)和上一次讀取的任務(wù)信息對(duì)比,生成三個(gè)字典:需要?jiǎng)h除的任務(wù)、需要新增的任務(wù)和需要修改的任務(wù)。需要修改的任務(wù)將變成先刪除后新增的方式實(shí)現(xiàn)修改。所以最后操作的將是兩個(gè)字段信息。
? ? ? ? 普通任務(wù)本文就不介紹了,之后介紹的每個(gè)抓取和離線計(jì)算業(yè)務(wù)都是普通任務(wù)。
總結(jié)
以上是生活随笔為你收集整理的码农技术炒股之路——任务管理器的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 码农技术炒股之路——数据库管理器、正则表
- 下一篇: 码农技术炒股之路——抓取股票基本信息、实