码农技术炒股之路——实时交易信息、主力动向信息分库备份
一般來說,一支股票的信息應該保存在一張表中。但是由于我的機器資源有限,且我希望以盡可能高的頻率抓取數據,所以每天我將所有股票的實時交易信息放在daily_temp庫中一個以日期命名的表中,主力動向信息也是如此。但是盤後分析股票時,我們會以單只股票為單位進行分析,這就需要跨越很多天的數據,而這樣的設計將導致需要查詢若干個表,且隨著日期增加,需要讀取的表也會越來越多。我覺得這樣是不合適的。(轉載請指明出于breaksoftware的csdn博客)
目前我們系統繁忙的時間和交易時間同步。為了最大限度地利用資源,我決定在盤後對每日的數據按照股票代碼進行拆分備份。這樣我們只需查詢一張表,就可以得到該股票的所有歷史數據。
拆分備份實時交易信息
首先,我們要從當天的實時交易信息表(trade_info_<日期>)中讀取所有不重複的股票代碼。
def _get_all_share_ids(self):
    """Return the distinct share ids found in today's trade_info table.

    The table lives on the ``self._daily_temp_conn_name`` connection and is
    named after the local date, e.g. ``trade_info_2017_03_01``. The value is
    whatever ``fetch_data.get_data`` returns for a ``distinct`` select of the
    ``share_id`` column (rows are sequences; the id is column 0).
    """
    today_table = "trade_info_" + time.strftime('%Y_%m_%d')
    query = fetch_data.select_db(
        self._daily_temp_conn_name,
        today_table,
        ["share_id"],
        {},
        pre="distinct",
    )
    return fetch_data.get_data(query)
fetch_data是我用于封裝數據獲取的模塊,其中包含通過網絡獲取數據(query_http)、通過數據庫獲取數據(select_db)和通過正則表達式拆分數據(regular_split)三種方式。
class select_db:
    """Deferred SELECT against a named MySQL connection.

    The constructor only records the query. ``get_data`` runs
    ``conn.select(table_name, select_columns, conditions, pre, extend)`` on
    the connection returned by ``mysql_manager().get_mysql_conn(conn_name)``.
    """

    def __init__(self, conn_name, table_name, select_columns, conditions, pre="", extend=""):
        self._conn_name = conn_name
        self._table_name = table_name
        self._select_columns = select_columns
        self._conditions = conditions
        # Passed straight through to conn.select; callers use e.g. "distinct".
        self._pre = pre
        # Passed straight through to conn.select — semantics defined there.
        self._extend = extend

    def get_data(self):
        """Execute the stored select and return conn.select's result."""
        conn = mysql_manager().get_mysql_conn(self._conn_name)
        return conn.select(self._table_name, self._select_columns,
                           self._conditions, self._pre, self._extend)


class query_http:
    """Deferred HTTP GET of a URL; ``get_data`` returns the body or ""."""

    # Total attempts before giving up (one initial try plus one retry).
    _MAX_ATTEMPTS = 2
    # Timeout in seconds applied via socket.setdefaulttimeout.
    _TIMEOUT_SECONDS = 15

    def __init__(self, url):
        self._url = url

    def get_data(self):
        """Fetch the URL, retrying once on any failure.

        Returns:
            The response body, or "" when every attempt failed. Each failure
            is logged with LOG_ERROR; no exception escapes.
        """
        for _ in range(self._MAX_ATTEMPTS):
            try:
                # NOTE: this sets the process-wide default socket timeout,
                # not only this request's; kept for backward compatibility.
                socket.setdefaulttimeout(self._TIMEOUT_SECONDS)
                res_data = urllib2.urlopen(urllib2.Request(self._url))
                try:
                    return res_data.read()
                finally:
                    # Always release the connection; the previous version
                    # never closed the response object.
                    res_data.close()
            except Exception as e:
                LOG_ERROR("request error: %s %s" % (self._url, e))
        return ""


class regular_split:
    """Deferred regex split of ``data`` using a named pattern.

    ``get_data`` delegates to
    ``regular_split_manager().get_split_data(data, regular_name)``.
    """

    def __init__(self, regular_name, data):
        self._regular_name = regular_name
        self._data = data

    def get_data(self):
        """Return the split result produced by regular_split_manager."""
        return regular_split_manager().get_split_data(self._data, self._regular_name)


def get_data(query_item):
    """Run ``query_item.get_data()`` and return its result.

    Any object with a ``get_data`` attribute is accepted (select_db,
    query_http, regular_split, ...). Returns None when the attribute is
    missing.
    """
    if not hasattr(query_item, "get_data"):
        return None
    return query_item.get_data()
下一步,通過股票代碼查詢該股票當天的所有數據。
def _bak_trade_info(self, share_id):
    """Back up today's real-time trade rows for a single share.

    Selects the columns below for ``share_id`` from today's
    ``trade_info_<YYYY_mm_dd>`` table on the daily_temp connection, then
    passes the rows to ``self._bak_single_market_maker_info``.
    NOTE(review): despite its name, in the trade job that method is the one
    that normalises and stores trade rows.
    """
    # Order matters: the consumer reads the last two columns as the date
    # string and the time string.
    columns = [
        "today_open", "yesterday_close", "cur", "today_high", "today_low",
        "compete_buy_price", "compete_sale_price",
        "trade_num", "trade_price",
        "buy_1_num", "buy_1_price", "buy_2_num", "buy_2_price",
        "buy_3_num", "buy_3_price", "buy_4_num", "buy_4_price",
        "buy_5_num", "buy_5_price",
        "sale_1_num", "sale_1_price", "sale_2_num", "sale_2_price",
        "sale_3_num", "sale_3_price", "sale_4_num", "sale_4_price",
        "sale_5_num", "sale_5_price",
        "time_date_str", "time_str",
    ]
    source_table = "trade_info_" + time.strftime('%Y_%m_%d')
    temp_conn = mysql_manager().get_mysql_conn(self._daily_temp_conn_name)
    rows = temp_conn.select(source_table, columns, {"share_id": [share_id, "="]})
    self._bak_single_market_maker_info(share_id, rows)
由于抓取時間和數據源時間存在差異,我們可能會抓取到交易時間之外的數據,因此要對這些數據進行歸一化處理。比如我們有11:29、11:31和11:32三條數據,其中11:31和11:32處於交易時間之外:午間休市(11:30至13:00)內只保留第一條(11:31),作為上午11:30收盤時的數據保存,11:32的數據則被丟棄。收盤(15:00)之後的數據也按同樣方式處理,只保留第一條;時間戳重複的數據同樣只保留一條。
def _bak_single_market_maker_info(self, share_id, daily_data):
    """Normalise one share's trade rows and append them to its detail table.

    Each input row carries the trade columns followed by ``time_date_str``
    and ``time_str`` as its last two values. For every row kept, those two
    strings are dropped and replaced by a single epoch timestamp
    (``time.mktime``, i.e. local time) placed in front, matching
    ``keys_array``. The rows are written to ``trade_info_detail_<share_id>``
    on the shard connection returned by ``stock_conn_manager``.

    Rows outside trading hours are filtered because scraping can run ahead
    of the data source:
      * only the first row in [11:30, 13:00) (lunch break) is kept;
      * only the first row at or after 15:00 (close) is kept;
      * rows whose timestamp was already seen are dropped.

    NOTE(review): the name says "market_maker" but this variant handles trade
    rows; it is kept because ``_bak_trade_info`` calls it by this name.
    """
    time_format = '%Y-%m-%d %H:%M:%S'
    # day -> (11:30, 13:00, 15:00) epoch values, computed once per day
    # instead of three strptime/mktime calls for every row.
    boundaries_by_day = {}
    # A set gives O(1) duplicate checks (a list made this loop O(n^2)).
    seen_times = set()
    has_between_11_30_and_13_00 = False
    after_15_00 = False
    daily_data_list = []

    for item in daily_data:
        date_str = item[-2] + " " + item[-1]
        day = date_str[:date_str.find(" ")]
        bounds = boundaries_by_day.get(day)
        if bounds is None:
            bounds = tuple(
                time.mktime(time.strptime(day + suffix, time_format))
                for suffix in (" 11:30:00", " 13:00:00", " 15:00:00"))
            boundaries_by_day[day] = bounds
        today_11_30_int, today_13_00_int, today_15_00_int = bounds
        date_int = time.mktime(time.strptime(date_str, time_format))

        if today_11_30_int <= date_int < today_13_00_int:
            if has_between_11_30_and_13_00:
                continue
            has_between_11_30_and_13_00 = True
        if date_int >= today_15_00_int:
            if after_15_00:
                continue
            after_15_00 = True
        if date_int in seen_times:
            continue
        seen_times.add(date_int)

        # Prepend the timestamp and drop the trailing date/time strings.
        daily_data_list.append([date_int] + list(item[:-2]))

    keys_array = [
        "time", "today_open", "yesterday_close", "cur", "today_high", "today_low",
        "compete_buy_price", "compete_sale_price", "trade_num", "trade_price",
        "buy_1_num", "buy_1_price", "buy_2_num", "buy_2_price",
        "buy_3_num", "buy_3_price", "buy_4_num", "buy_4_price",
        "buy_5_num", "buy_5_price",
        "sale_1_num", "sale_1_price", "sale_2_num", "sale_2_price",
        "sale_3_num", "sale_3_price", "sale_4_num", "sale_4_price",
        "sale_5_num", "sale_5_price",
    ]
    share_trade_info_table_name = "trade_info_detail_" + share_id
    self._create_table_if_not_exist(share_id, share_trade_info_table_name)
    conn = stock_conn_manager().get_conn(share_id)
    conn.insert_data(share_trade_info_table_name, keys_array, daily_data_list)
此處我們並沒有直接檢查並創建表,而是使用了_create_table_if_not_exist方法。
def _create_table_if_not_exist(self, share_id, table_name):
    """Prepare ``table_name`` in the shard database that stores ``share_id``.

    The shard connection name comes from ``stock_conn_manager``. The actual
    table work is done by ``prepare_table(..., "trade_info").prepare``;
    judging by this method's name it creates the table when missing —
    confirm in prepare_table.
    """
    shard_conn_name = stock_conn_manager().get_conn_name(share_id)
    prepare_table(shard_conn_name, "trade_info").prepare(table_name)
為什麼要這麼做?因為我們要將三千多支股票的信息分片保存到300個不同的數據庫中,某支股票具體存放在哪個庫中,需要由一個中間層來代理管理。
@singleton
class stock_conn_manager():
    """Route a share id to one of 300 shard databases.

    The shard number is ``int(share_id) % 300`` and the connection name is
    ``stock_part_<shard number>``; connections are obtained from
    ``mysql_manager``.
    """

    def __init__(self):
        pass

    def get_conn(self, share_id):
        """Return the MySQL connection of the shard holding ``share_id``."""
        return mysql_manager().get_mysql_conn(self.get_conn_name(share_id))

    def get_conn_name(self, share_id):
        """Return the shard connection name, e.g. "000001" -> "stock_part_1"."""
        shard_no = int(share_id) % 300
        return "stock_part_" + str(shard_no)
stock_conn_manager類將股票代碼轉換為整數後對300取余數,得出分片ID,然後連接該ID對應的庫(stock_part_<分片ID>)。這層設計非常重要,因為不僅此處備份數據要用到它,之後對全部股票進行分析時也要用到它。
拆分備份主力動向信息
主要邏輯與實時交易信息相同,故只貼出代碼。
class bak_today_market_maker(job_base):
    """Job: split today's main-force (market maker) flow data per share.

    For every share id found in today's trade_info table, copies that
    share's rows from today's ``market_maker_<YYYY_mm_dd>`` table (daily_temp
    connection) into ``market_maker_detail_<share_id>`` on the shard
    database chosen by ``stock_conn_manager``.
    """

    def __init__(self):
        # NOTE(review): _db_manager is not read by the methods below; kept in
        # case job_base or other code relies on it.
        self._db_manager = mysql_manager()
        self._daily_temp_conn_name = "daily_temp"

    def run(self):
        """Back up every share listed for today, then log completion."""
        share_ids = self._get_all_share_ids()
        for share_id in share_ids:
            # Rows from the distinct select; the id is column 0.
            self._bak_market_maker_info(share_id[0])
        LOG_INFO("run bak_today_market_maker")

    def _bak_market_maker_info(self, share_id):
        """Read today's market-maker rows for ``share_id`` and back them up."""
        date_info = time.strftime('%Y_%m_%d')
        table_name = "market_maker_%s" % (date_info)
        # time_str must stay first: _bak_single_market_maker_info parses item[0].
        fields_array = [
            "time_str", "price", "up_percent",
            "market_maker_net_inflow", "market_maker_net_inflow_per",
            "huge_inflow", "huge_inflow_per",
            "large_inflow", "large_inflow_per",
            "medium_inflow", "medium_inflow_per",
            "small_inflow", "small_inflow_per",
        ]
        daily_data = fetch_data.get_data(fetch_data.select_db(
            self._daily_temp_conn_name, table_name, fields_array,
            {"share_id": [share_id, "="]}))
        self._bak_single_market_maker_info(share_id, daily_data)

    def _bak_single_market_maker_info(self, share_id, daily_data):
        """Normalise one share's market-maker rows and append them to its table.

        ``item[0]`` of each row is a "YYYY-mm-dd HH:MM:SS" string; in every
        kept row it is replaced by its epoch value (``time.mktime``, local
        time). Filtering rules:
          * only the first row in [11:30, 13:00) (lunch break) is kept;
          * only the first row at or after 15:00 (close) is kept;
          * rows whose timestamp was already seen are dropped.
        """
        time_format = '%Y-%m-%d %H:%M:%S'
        # day -> (11:30, 13:00, 15:00) epoch values, computed once per day
        # instead of three strptime/mktime calls for every row.
        boundaries_by_day = {}
        # A set gives O(1) duplicate checks (a list made this loop O(n^2)).
        seen_times = set()
        has_between_11_30_and_13_00 = False
        after_15_00 = False
        daily_data_list = []

        for item in daily_data:
            date_str = item[0]
            day = date_str[:date_str.find(" ")]
            bounds = boundaries_by_day.get(day)
            if bounds is None:
                bounds = tuple(
                    time.mktime(time.strptime(day + suffix, time_format))
                    for suffix in (" 11:30:00", " 13:00:00", " 15:00:00"))
                boundaries_by_day[day] = bounds
            today_11_30_int, today_13_00_int, today_15_00_int = bounds
            date_int = time.mktime(time.strptime(date_str, time_format))

            if today_11_30_int <= date_int < today_13_00_int:
                if has_between_11_30_and_13_00:
                    continue
                has_between_11_30_and_13_00 = True
            if date_int >= today_15_00_int:
                if after_15_00:
                    continue
                after_15_00 = True
            if date_int in seen_times:
                continue
            seen_times.add(date_int)

            # Replace the time string with its epoch value.
            daily_data_list.append([date_int] + list(item[1:]))

        keys_array = [
            "time", "price", "up_percent",
            "market_maker_net_inflow", "market_maker_net_inflow_per",
            "huge_inflow", "huge_inflow_per",
            "large_inflow", "large_inflow_per",
            "medium_inflow", "medium_inflow_per",
            "small_inflow", "small_inflow_per",
        ]
        share_market_maker_table_name = "market_maker_detail_" + share_id
        self._create_table_if_not_exist(share_id, share_market_maker_table_name)
        conn = stock_conn_manager().get_conn(share_id)
        conn.insert_data(share_market_maker_table_name, keys_array, daily_data_list)

    def _get_all_share_ids(self):
        """Return the distinct share ids in today's trade_info table."""
        date_info = time.strftime('%Y_%m_%d')
        trade_table_name = "trade_info_%s" % (date_info)
        share_ids = fetch_data.get_data(fetch_data.select_db(
            self._daily_temp_conn_name, trade_table_name, ["share_id"], {},
            pre="distinct"))
        return share_ids

    def _create_table_if_not_exist(self, share_id, table_name):
        """Prepare ``table_name`` (market_maker template) on the share's shard."""
        conn_name = stock_conn_manager().get_conn_name(share_id)
        prepare_table_obj = prepare_table(conn_name, "market_maker")
        prepare_table_obj.prepare(table_name)
實時交易和主力動向拆分備份的任務配置如下。因為這兩類數據的量比較大,我給每個任務留了一個小時的處理時間:實時交易信息的備份(bak_today_trade)在15:50開始,主力動向信息的備份(bak_today_market_maker)在16:50開始。
[bak_today_market_maker]
type=cron
class=bak_today_market_maker
day_of_week=1-5
hour=16
minute=50
timezone = Asia/Shanghai

[bak_today_trade]
type=cron
class=bak_today_trade
day_of_week=1-5
hour=15
minute=50
timezone = Asia/Shanghai
總結
以上是生活随笔為你收集整理的码农技术炒股之路——实时交易信息、主力动向信息分库备份的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 码农技术炒股之路——抓取股票基本信息、实
- 下一篇: 码农技术炒股之路——抓取日线数据、计算均