A Coder's Road to Technical Stock Trading: Database Manager and Regular Expression Manager
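I chose MySQL as my database because it meets my needs and is well documented. As with any third-party tool, some configuration issues are unavoidable, so this article also covers a few issues related to MySQL configuration and development. (When reposting, please credit breaksoftware's CSDN blog.)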
Database Manager
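I won't cover installing MySQL. First, an issue that comes from my own working habits: I want Navicat for MySQL on my Windows machine to connect to the MySQL server on my Ubuntu machine. For the solution, see "Allowing Remote Connections to MySQL on Ubuntu".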
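Next, prepare the environment for MySQL development in Python:

```shell
apt-get install python-dev
apt-get install libmysqld-dev
apt-get install libmysqlclient-dev
# refresh the locate database, then confirm mysql_config is installed
# (the MySQL-python build uses it to find the MySQL client library)
updatedb
locate mysql_config
# install the MySQLdb driver from the Douban PyPI mirror
pip install MySQL-python -i http://pypi.douban.com/simple
```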
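Because we are going to shard the data across many databases, the maximum number of database connections has to be raised. Set it in the [mysqld] section of the MySQL configuration file, then restart the MySQL service so the setting takes effect:

```ini
[mysqld]
max_connections=1000
```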
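As a rough check of why 1000 is enough, the arithmetic below counts pools and connections. It assumes one PooledDB pool per database with the mincached=1 and maxconnections=3 settings used in re_connect, plus the 300 stock_part databases and the two standalone databases (stock and daily_temp) configured later in this article. It is a back-of-the-envelope sketch, not a measurement; for comparison, MySQL's default max_connections is 151.

```python
# Back-of-the-envelope count of MySQL connections opened by the connection pools.
# Assumptions: one PooledDB per database, pool settings as in re_connect().
NUM_STOCK_PART_DBS = 300   # stock_part_0 ... stock_part_299
NUM_OTHER_DBS = 2          # stock and daily_temp
MINCACHED = 1              # idle connections each pool opens at startup
MAXCONNECTIONS = 3         # upper bound on connections per pool

pools = NUM_STOCK_PART_DBS + NUM_OTHER_DBS
startup_connections = pools * MINCACHED
peak_connections = pools * MAXCONNECTIONS

print("pools:", pools)
print("connections at startup:", startup_connections)
print("worst-case connections:", peak_connections)
```

Both figures (302 at startup, 906 in the worst case) are well above the default of 151, while 906 still leaves some headroom under 1000 for other clients such as Navicat.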
With the basic environment configured, we can start designing and implementing the database manager.
Database Connection Class
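For database connections we use the PooledDB connection pool. Its biggest advantage is that we don't have to deal with much of the low-level reconnection and multithreading logic ourselves.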
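```python
import re

import MySQLdb
# DBUtils 1.x import path; DBUtils 2.x uses "from dbutils.pooled_db import PooledDB"
from DBUtils.PooledDB import PooledDB

# LOG_INFO / LOG_WARNING / LOG_ERROR_SQL are the project's own logging helpers.
class mysql_conn():
    def __init__(self, host_name, port_num, user_name, password, db_name, charset_name="utf8"):
        self._host = host_name
        self._port = int(port_num)
        self._user = user_name
        self._passwd = password
        self._db = db_name
        self._charset = charset_name
        self._pool = None        # PooledDB instance, created by re_connect
        self._table_info = {}    # {table: {column: {"type": ..., "length": ...}}}
        self.re_connect()
```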
The re_connect method has to handle the case where the database does not exist yet.
```python
def re_connect(self):
    self._try_close_connect()
    try:
        # mincached=1 opens a connection immediately, so a missing database raises here
        self._pool = PooledDB(creator=MySQLdb, mincached=1, maxcached=20, maxconnections=3,
                              host=self._host, port=self._port, user=self._user,
                              passwd=self._passwd, db=self._db, charset=self._charset)
        LOG_INFO("connect %s success" % (self._db))
        self.refresh_tables_info()
        return
    except MySQLdb.Error as e:
        if e.args[0] == 1049:
            # 1049: unknown database; create it, then fall through to the second attempt
            self._create_db()
        else:
            LOG_WARNING("%s connect error %s" % (self._db, str(e)))
            return
    except Exception as e:
        LOG_WARNING("connect mysql %s:%d %s error" % (self._host, self._port, self._db))
        return
```
If the database does not exist, the MySQLdb.Error raised carries error code 1049 (in e.args[0]), and in that case we create the database. Any other error is simply logged.
```python
def _create_db(self):
    conn = None
    cursor = None
    # build the statement before the try block so the except branch can always log it
    sql = "create database if not exists %s" % (self._db)
    try:
        # connect without selecting a database, since the target one does not exist yet
        conn = MySQLdb.connect(host=self._host, port=self._port, user=self._user, passwd=self._passwd)
        cursor = conn.cursor()
        cursor.execute(sql)
        conn.select_db(self._db)
        conn.commit()
    except MySQLdb.Error as e:
        LOG_WARNING("%s execute error %s" % (sql, str(e)))
    finally:
        try:
            if cursor:
                cursor.close()
            if conn:
                conn.close()
        except Exception:
            pass
```
After the database has been created, the connection used to create it is closed. re_connect then runs the connection process once more, this time without checking whether the database exists:
```python
    # rest of re_connect, reached after _create_db()
    try:
        self._pool = PooledDB(creator=MySQLdb, mincached=1, maxcached=20, maxconnections=3,
                              host=self._host, port=self._port, user=self._user,
                              passwd=self._passwd, db=self._db, charset=self._charset)
        LOG_INFO("connect %s success" % (self._db))
        self.refresh_tables_info()
        return
    except Exception as e:
        LOG_WARNING("connect mysql %s:%d %s error" % (self._host, self._port, self._db))
        return
```
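The reconnect flow (connect, create the database on error 1049, then connect exactly once more) can be exercised without a MySQL server. The sketch below is a stand-in under stated assumptions: FakeDBError is a hypothetical imitation of MySQLdb.Error that carries the error code in args[0], and fake_connect / fake_create_db are hypothetical callables replacing PooledDB and _create_db.

```python
ER_BAD_DB_ERROR = 1049  # MySQL error code for "Unknown database"


class FakeDBError(Exception):
    """Hypothetical stand-in for MySQLdb.Error: args[0] is the MySQL error code."""


def connect_or_create(connect, create_db, db_name):
    """Connect; if the database is unknown, create it and try exactly once more."""
    try:
        return connect(db_name)
    except FakeDBError as e:
        if e.args[0] != ER_BAD_DB_ERROR:
            return None              # any other error: report failure
    create_db(db_name)               # the database was missing: create it
    try:
        return connect(db_name)      # second attempt, no 1049 special case
    except FakeDBError:
        return None


# A toy "server" that only knows the databases in this set.
existing_dbs = set()


def fake_connect(db_name):
    if db_name not in existing_dbs:
        raise FakeDBError(ER_BAD_DB_ERROR, "Unknown database '%s'" % db_name)
    return "connection to " + db_name


def fake_create_db(db_name):
    existing_dbs.add(db_name)


print(connect_or_create(fake_connect, fake_create_db, "stock_part_0"))
```

Limiting the retry to a single attempt keeps a persistent failure (for example, missing CREATE privileges) from turning into an endless loop.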
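After connecting, we use refresh_tables_info to fetch information about the tables in this database. Why do we need it? I want mysql_conn to already know the type and length of each column whenever a database operation is called. It can then format the data passed in by the caller accordingly, and callers don't have to care much about column types.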
```python
def refresh_tables_info(self):
    self._table_info = self._get_tables_info()

def _get_tables_info(self):
    tables_info = {}
    tables_name = self.execute("show tables", select=True)
    for table_name_item in tables_name:
        table_name = table_name_item[0]
        if 0 == len(table_name):
            continue
        # each row: (Field, Type, Null, Key, Default, Extra)
        table_info = self.execute("show columns from " + table_name, select=True)
        columns_info = self._get_table_info(table_info)
        if len(columns_info):
            tables_info[table_name] = columns_info
    return tables_info

def _get_table_info(self, table_info):
    columns_info = {}
    for item in table_info:
        column_name = item[0]
        column_type_info = item[1]   # e.g. "varchar(16)" or "double(10,2)"
        (column_type, column_len) = self._get_column_type_info(column_type_info)
        columns_info[column_name] = {"type": column_type, "length": column_len}
    return columns_info

def _get_column_type_info(self, type_info):
    # "varchar(16)" -> ("varchar", "16"); types without parentheses, such as "date", -> (None, None)
    re_str = r'(\w*)\((\d*),?.*\)'
    kw = re.findall(re_str, type_info)
    if len(kw):
        if len(kw[0]) > 1:
            return (kw[0][0], kw[0][1])
    return (None, None)
```
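To see what ends up in _table_info, the sketch below applies the same regular expression to rows shaped like SHOW COLUMNS output, without a database. The table definition is hypothetical and only for illustration.

```python
import re

# Same pattern as _get_column_type_info, written as a raw string.
COLUMN_TYPE_RE = r'(\w*)\((\d*),?.*\)'


def parse_column_type(type_info):
    """'varchar(16)' -> ('varchar', '16'); types without parentheses -> (None, None)."""
    kw = re.findall(COLUMN_TYPE_RE, type_info)
    if kw:
        return (kw[0][0], kw[0][1])
    return (None, None)


def columns_info_from_rows(rows):
    """rows imitate SHOW COLUMNS output: (Field, Type, Null, Key, Default, Extra)."""
    columns_info = {}
    for row in rows:
        column_type, column_len = parse_column_type(row[1])
        columns_info[row[0]] = {"type": column_type, "length": column_len}
    return columns_info


# Hypothetical table definition, for illustration only.
rows = [
    ("code", "varchar(16)", "NO", "PRI", None, ""),
    ("close", "double(10,2)", "YES", "", None, ""),
    ("volume", "bigint(20) unsigned", "YES", "", None, ""),
    ("trade_date", "date", "YES", "", None, ""),
]
for name, info in columns_info_from_rows(rows).items():
    print(name, info)
```

A few properties follow from the pattern: the length is captured as a string, only the first number of a precision/scale pair is kept, and types without parentheses (date here, and integer columns on newer MySQL 8.0 releases, which no longer display an integer width) come back as None. _conv_data has no branch for a None type, so such columns cannot be formatted into conditions.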
Once connected, we need to perform a range of operations on tables, such as queries:
```python
def select(self, table_name, fields_array, conditions, pre="", extend=""):
    fields_str = ",".join(fields_array)
    conds = []
    for (column_name, column_data_info) in conditions.items():
        column_type = self._get_column_type(table_name, column_name)
        column_data = column_data_info[0]   # right-hand operand
        operation = column_data_info[1]     # operator, e.g. ">", "=", "in"
        if isinstance(column_data, list):
            # list operand, e.g. a in (1,2,3)
            new_datas = []
            for item in column_data:
                new_data = self._conv_data(item, column_type)
                if new_data is None:
                    LOG_WARNING("%s %s conv error" % (item, column_type))
                    continue
                new_datas.append(new_data)
            temp_str = ",".join(new_datas)
            cond = column_name + " " + operation + " (" + temp_str + ")"
            conds.append(cond)
        else:
            new_data = self._conv_data(column_data, column_type)
            if new_data is None:
                LOG_WARNING("%s %s conv error" % (column_data, column_type))
                continue
            cond = column_name + " " + operation + " " + new_data
            conds.append(cond)
    conds_str = " and ".join(conds)
    sql = "select " + pre + " " + fields_str + " from " + table_name
    if len(conds_str) > 0:
        sql = sql + " where " + conds_str
    if len(extend) > 0:
        sql = sql + " " + extend
    data_info = self.execute(sql, select=True)
    return data_info
```
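In select, table_name is the table name and fields_array is the list of columns to return. conditions is a dict of key/value pairs describing the query conditions: each key is a column name, and each value is a list whose first element is the right-hand operand and whose second element is the operator. For example, for the condition a > 1 and b < 2, conditions is {"a": ["1", ">"], "b": ["2", "<"]}. The right-hand operand may also be a list, as in a in (1,2,3), so building the conditions handles that case separately. pre is inserted right after select (for example distinct), and extend is appended to the end of the statement (for example order by or limit).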
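The condition-building part of select can be tried on its own. The sketch below takes the column types as a plain dict instead of looking them up with _get_column_type, and conv_value is a simplified stand-in for _conv_data; the table and column names are hypothetical.

```python
def conv_value(value, column_type):
    """Simplified _conv_data: quote strings, format numbers, None for unsupported types."""
    if column_type in ("varchar", "char"):
        return '"%s"' % value
    if column_type in ("float", "double"):
        return "%.8f" % float(value)
    if column_type in ("tinyint", "int", "bigint"):
        return "%d" % int(value)
    return None


def build_select(table_name, fields_array, conditions, column_types, pre="", extend=""):
    conds = []
    for column_name, (column_data, operation) in conditions.items():
        column_type = column_types.get(column_name)
        if isinstance(column_data, list):
            # list operand, e.g. code in ("000001","600000")
            values = [conv_value(item, column_type) for item in column_data]
            values = [v for v in values if v is not None]
            conds.append("%s %s (%s)" % (column_name, operation, ",".join(values)))
        else:
            value = conv_value(column_data, column_type)
            if value is not None:
                conds.append("%s %s %s" % (column_name, operation, value))
    sql = "select " + pre + " " + ",".join(fields_array) + " from " + table_name
    if conds:
        sql += " where " + " and ".join(conds)
    if extend:
        sql += " " + extend
    return sql


column_types = {"code": "varchar", "close": "double"}   # hypothetical table layout
print(build_select("daily", ["code", "close"],
                   {"code": [["000001", "600000"], "in"], "close": ["10", ">"]},
                   column_types, extend="limit 10"))
```

As in the original, an empty pre leaves two spaces after select, which MySQL accepts. Values are spliced into the SQL text, so this approach suits trusted input only; MySQLdb's cursor.execute(sql, params) form is the usual way to pass untrusted values.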
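Whenever we handle table data, for example when a query condition refers to a column, or when an update or insert statement carries column values, we call _get_column_type to look up the column's type and then format the value with _conv_data. This function does not yet cover every column type.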
```python
def _get_column_type(self, table_name, column_name):
    if 0 == len(self._table_info):
        self.refresh_tables_info()
    if table_name not in self._table_info:
        LOG_WARNING("table %s info does not exist" % (table_name))
        return None
    if column_name not in self._table_info[table_name]:
        LOG_WARNING("column name %s is not in table %s" % (column_name, table_name))
        return None
    return self._table_info[table_name][column_name]["type"]

def _conv_data(self, data, column_type):
    if column_type == "varchar" or column_type == "char":
        return '"%s"' % (data)
    elif column_type == "float" or column_type == "double":
        try:
            conv_data = float(data)
            return "%.8f" % (conv_data)
        except Exception as e:
            LOG_WARNING("conv %s to %s error" % (data, column_type))
            return "0"
    elif column_type == "tinyint" or column_type == "bigint" or column_type == "int":
        return "%d" % (int(data))
    # other types (date, text, ...) are not handled yet; callers receive None
    return None
```
I won't paste the code for the update and insert operations here; you can find it in the source code, which will be published later.
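Since the update and insert code is not shown, here is a hypothetical sketch of how an insert statement could reuse the same type-aware formatting. It is not the author's unpublished code: build_insert and conv_value (a simplified _conv_data) are illustrative names, and the column types are passed in as a dict.

```python
def conv_value(value, column_type):
    """Simplified _conv_data: quote strings, format numbers, None for unsupported types."""
    if column_type in ("varchar", "char"):
        return '"%s"' % value
    if column_type in ("float", "double"):
        return "%.8f" % float(value)
    if column_type in ("tinyint", "int", "bigint"):
        return "%d" % int(value)
    return None


def build_insert(table_name, row, column_types):
    """Hypothetical: build one INSERT from {column: value}, formatting values by column type."""
    columns = []
    values = []
    for column_name, value in row.items():
        converted = conv_value(value, column_types.get(column_name))
        if converted is None:
            continue   # unknown column or unsupported type: leave it out
        columns.append(column_name)
        values.append(converted)
    return "insert into %s (%s) values (%s)" % (
        table_name, ",".join(columns), ",".join(values))


print(build_insert("daily",
                   {"code": "000001", "close": "10.5", "volume": "100"},
                   {"code": "varchar", "close": "double", "volume": "bigint"}))
```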
Finally, the method that executes statements:
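```python
def execute(self, sql, select=False, commit=False):
    data = ()
    conn = None
    cursor = None
    try:
        conn = self._pool.connection()
        cursor = conn.cursor()
        data = cursor.execute(sql)     # MySQLdb returns the number of affected rows
        if select:
            data = cursor.fetchall()   # result rows as a tuple of tuples
        if commit:
            conn.commit()
    except Exception as e:
        LOG_WARNING("execute sql error %s" % (str(e)))
        LOG_ERROR_SQL("%s" % (sql))
    finally:
        # guard against failures that happened before the cursor or connection existed
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()               # returns the connection to the pool
    return data
```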
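Some operations must take effect in the database right away, such as creating a database or a table, because we use or query the new object immediately afterwards; if they did not take effect in time, later results would be wrong. For these operations we set the commit parameter to True. Don't overuse it, though, or database performance will suffer.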
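For other operations, such as select statements, we care about the result; in that case we fetch all rows with fetchall and return them. Operations such as creating a table have no result set, so they don't need fetchall.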
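To see the select and commit flags in action without a MySQL server, the sketch below swaps in Python's built-in sqlite3 module (an assumption for illustration; the real class uses MySQLdb through PooledDB). It keeps the shape of execute, including closing the cursor and connection only if they were actually created, and it shows why the commit flag matters: sqlite3 does not commit on close(), so an uncommitted insert disappears.

```python
import os
import sqlite3
import tempfile


class sqlite_conn(object):
    """Stand-in for mysql_conn.execute built on sqlite3 instead of MySQLdb + PooledDB."""

    def __init__(self, path):
        self._path = path

    def execute(self, sql, select=False, commit=False):
        data = ()
        conn = None
        cursor = None
        try:
            conn = sqlite3.connect(self._path)   # fresh connection per call, like borrowing from a pool
            cursor = conn.cursor()
            cursor.execute(sql)
            data = cursor.rowcount               # affected rows for insert/update/delete
            if select:
                data = cursor.fetchall()         # list of row tuples
            if commit:
                conn.commit()
        except sqlite3.Error as e:
            print("execute sql error %s: %s" % (e, sql))
        finally:
            if cursor is not None:
                cursor.close()
            if conn is not None:
                conn.close()                     # close() without commit() discards pending changes
        return data


db = sqlite_conn(os.path.join(tempfile.mkdtemp(), "demo.db"))
db.execute("create table daily (code text, close real)", commit=True)
db.execute("insert into daily values ('000001', 10.5)")               # not committed
print(db.execute("select * from daily", select=True))                 # []
db.execute("insert into daily values ('000001', 10.5)", commit=True)
print(db.execute("select * from daily", select=True))                 # [('000001', 10.5)]
```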
Connection Manager Class
Because our data is split across multiple databases, and each connection object above handles a single database, we need a connection manager to manage all of these connections.
The connection manager is a singleton. It connects to each database through its modify_conns method:
```python
import json

@singleton
class mysql_manager():
    def __init__(self):
        self._conns = {}   # conn_name -> mysql_conn_info (conns_dict plus valid flag)

    def modify_conns(self, conns_info):
        for (conn_name, conn_info) in conns_info.items():
            # skip a connection whose configuration lacks a required key
            missing_keys = [key for key in conf_keys.mysql_conn_keys if key not in conn_info]
            if missing_keys:
                LOG_WARNING("%s lacks %s" % (conn_name, ",".join(missing_keys)))
                continue
            conn_info_hash = frame_tools.hash(json.dumps(conn_info))
            if conn_name in self._conns:
                if conn_info_hash in self._conns[conn_name].conns_dict:
                    continue   # this exact configuration is already connected
            else:
                self._conns[conn_name] = mysql_conn_info()
            conn_obj = mysql_conn(conn_info["host"], conn_info["port"], conn_info["user"],
                                  conn_info["passwd"], conn_info["db"], conn_info["charset"])
            self._conns[conn_name].conns_dict[conn_info_hash] = conn_obj
            self._conns[conn_name].valid = 1
        self._print_conns()
```
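The @singleton decorator itself is not shown in this article. One common way to write such a decorator is sketched below; it is an assumption about how it might work, not the project's actual code. It caches the first instance of each decorated class and returns that instance on every later call.

```python
def singleton(cls):
    """Class decorator: every call returns the same, lazily created instance."""
    instances = {}

    def get_instance(*args, **kwargs):
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)   # the first call creates the instance
        return instances[cls]

    return get_instance


@singleton
class demo_manager(object):
    def __init__(self):
        self._conns = {}


a = demo_manager()
b = demo_manager()
print(a is b)   # True
```

Because the decorator replaces the class with a function, the decorated name can no longer be used in isinstance checks. This simple version is also not thread-safe on first creation; a lock around the check would be needed if several threads could create the manager concurrently.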
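If a database's connection information changes, the connection under the changed configuration has to be marked invalid; the database is then reconnected with the new configuration and recorded: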
```python
def add_conns(self, conns_info):
    self.modify_conns(conns_info)

def remove_conns(self, conns_info):
    for (conn_name, conn_info) in conns_info.items():
        conn_info_hash = frame_tools.hash(json.dumps(conn_info))
        if conn_name in self._conns:
            if conn_info_hash in self._conns[conn_name].conns_dict:
                # mark the group invalid; get_mysql_conn then returns None for it
                self._conns[conn_name].valid = 0
    self._print_conns()
```
The connection manager exposes connection objects through its get_mysql_conn method:
```python
def get_mysql_conn(self, conn_name):
    if conn_name not in self._conns:
        return None
    if 0 == self._conns[conn_name].valid:
        return None
    conns_dict_keys = list(self._conns[conn_name].conns_dict.keys())
    if len(conns_dict_keys) == 0:
        return None
    # note: Python 2 dicts are unordered, so the last key is not guaranteed
    # to belong to the most recently added configuration
    key = conns_dict_keys[-1]
    return self._conns[conn_name].conns_dict[key]
```
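It also exposes a method that refreshes the table information of every database. A system task calls it periodically to keep the cached column information in memory up to date, so that data keeps being written correctly.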
```python
def refresh_all_conns_tables_info(self):
    for (conn_name, conn_info) in self._conns.items():
        conn = self.get_mysql_conn(conn_name)
        if conn is not None:
            conn.refresh_tables_info()
```
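The bookkeeping in modify_conns, remove_conns and get_mysql_conn can be tested with a fake connection factory. In the sketch below, conf_hash stands in for the project's frame_tools.hash (md5 over JSON with sorted keys is an assumption), conn_entry is an assumed shape for mysql_conn_info, and the factory replaces mysql_conn. Unlike the original, it records the most recent configuration explicitly instead of relying on dict order, and re-adding a known configuration marks it valid again.

```python
import hashlib
import json


def conf_hash(conn_info):
    # Assumption: stands in for frame_tools.hash; sort_keys makes equal configs hash equally.
    text = json.dumps(conn_info, sort_keys=True)
    return hashlib.md5(text.encode("utf-8")).hexdigest()


class conn_entry(object):
    """Assumed shape of mysql_conn_info: connections keyed by config hash plus a valid flag."""
    def __init__(self):
        self.conns_dict = {}
        self.latest = None   # hash of the most recently added configuration
        self.valid = 0


class conn_manager(object):
    def __init__(self, factory):
        self._factory = factory   # builds a connection object from one conn_info dict
        self._conns = {}

    def modify_conns(self, conns_info):
        for conn_name, conn_info in conns_info.items():
            h = conf_hash(conn_info)
            entry = self._conns.setdefault(conn_name, conn_entry())
            if h not in entry.conns_dict:
                entry.conns_dict[h] = self._factory(conn_info)   # connect only for new configs
            entry.latest = h
            entry.valid = 1

    def remove_conns(self, conns_info):
        for conn_name, conn_info in conns_info.items():
            entry = self._conns.get(conn_name)
            if entry is not None and conf_hash(conn_info) in entry.conns_dict:
                entry.valid = 0

    def get_conn(self, conn_name):
        entry = self._conns.get(conn_name)
        if entry is None or not entry.valid or entry.latest is None:
            return None
        return entry.conns_dict[entry.latest]


created = []


def fake_factory(conn_info):
    created.append(conn_info["db"])
    return "conn:" + conn_info["db"]


manager = conn_manager(fake_factory)
manager.modify_conns({"stock_db": {"host": "127.0.0.1", "db": "stock"}})
manager.modify_conns({"stock_db": {"host": "127.0.0.1", "db": "stock"}})     # same config: no new connection
manager.modify_conns({"stock_db": {"host": "127.0.0.1", "db": "stock_v2"}})  # changed config: new connection
print(created, manager.get_conn("stock_db"))
manager.remove_conns({"stock_db": {"host": "127.0.0.1", "db": "stock_v2"}})
print(manager.get_conn("stock_db"))
```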
Connection Manager Configuration
I designed three kinds of databases in total. The first stores basic stock data; its configuration is:

```ini
[stock_db]
host=127.0.0.1
port=3306
user=root
passwd=fangliang
db=stock
charset=utf8
```
The second stores each day's real-time data:

```ini
[daily_temp]
host=127.0.0.1
port=3306
user=root
passwd=fangliang
db=daily_temp
charset=utf8
```
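The last kind is a group of databases partitioned by stock code; there are 300 of them. I explained the reasons for this design in "A Coder's Road to Technical Stock Trading: Architecture and Design".

```ini
[stock_part]
host=127.0.0.1
port=3306
user=root
passwd=fangliang
db=stock_part
charset=utf8
range_max=300
```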
Note the range_max parameter: if a section contains it, the section describes a group of databases, which the parser expands into range_max entries named with the suffixes _0 through _(range_max - 1).

```python
import copy
import ConfigParser

class mysql_conf_parser:
    def parse(self, job_conf_path):
        cp = ConfigParser.SafeConfigParser()
        cp.read(job_conf_path)
        sections = cp.sections()
        conns_info = {}
        for section in sections:
            conn_info = {}
            for key in conf_keys.mysql_conn_keys:
                if False == cp.has_option(section, key):
                    LOG_WARNING("section %s has no option %s" % (section, key))
                    continue
                conn_info[key] = cp.get(section, key)
            if cp.has_option(section, "range_max"):
                # a database group: expand into <db>_0 ... <db>_<range_max-1>
                range_max = int(cp.get(section, "range_max"))
                db_name_base = conn_info["db"]
                for index in range(0, range_max):
                    conn_info["db"] = db_name_base + "_" + str(index)
                    section_index_name = section + "_" + str(index)
                    conns_info[section_index_name] = copy.deepcopy(conn_info)
            else:
                conns_info[section] = conn_info
        return conns_info
```
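The range_max expansion can be checked against an in-memory configuration. The sketch below uses Python 3's configparser with read_string instead of Python 2's ConfigParser reading a file, and MYSQL_CONN_KEYS is an assumption about the contents of conf_keys.mysql_conn_keys. The password in the sample is a placeholder.

```python
import configparser
import copy

# Assumption: the required keys listed in the project's conf_keys.mysql_conn_keys.
MYSQL_CONN_KEYS = ["host", "port", "user", "passwd", "db", "charset"]


def parse_mysql_conf(text):
    cp = configparser.ConfigParser(interpolation=None)
    cp.read_string(text)
    conns_info = {}
    for section in cp.sections():
        conn_info = {key: cp.get(section, key)
                     for key in MYSQL_CONN_KEYS if cp.has_option(section, key)}
        if cp.has_option(section, "range_max"):
            # database group: <section>_0 ... <section>_<range_max-1>, same suffix on db
            db_name_base = conn_info["db"]
            for index in range(cp.getint(section, "range_max")):
                item = copy.deepcopy(conn_info)
                item["db"] = "%s_%d" % (db_name_base, index)
                conns_info["%s_%d" % (section, index)] = item
        else:
            conns_info[section] = conn_info
    return conns_info


SAMPLE = """
[stock_db]
host=127.0.0.1
port=3306
user=root
passwd=secret
db=stock
charset=utf8

[stock_part]
host=127.0.0.1
port=3306
user=root
passwd=secret
db=stock_part
charset=utf8
range_max=300
"""

conns = parse_mysql_conf(SAMPLE)
print(len(conns), conns["stock_part_299"]["db"])
```

Copying the dict before changing db keeps every generated entry independent, which is what the original achieves with copy.deepcopy after mutating the shared dict.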
In the end we will build the databases shown in the figure below: stock, daily_temp, and stock_part_0 through stock_part_299.
[Figure: the resulting databases]
Regular Expression Manager
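After fetching data from a data source, we apply a series of regular expressions to turn the raw data into a set of records, which can then be written to the database. As an example, let's first look at the regular expression manager's configuration: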
```ini
[string_comma_regular]
regular_expression_0 = data:\[(.*)\]
regular_expression_1 = "[^"]+"
regular_expression_2 = [^,"]+

[hq_sinajs_cn_list]
regular_expression_0 = var hq_str_([^;]*);
regular_expression_1 = ([^,="shz]+)

[quotes_money_163]
regular_expression_0 = ([^\r\n]+)
regular_expression_1 = ([^,'\r\n]+)
```
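Each section is the name of a regular expression chain, and every key in it starts with "regular_expression_". The expressions are applied in ascending order of their index, and reading stops at the first missing index, so the indices must run consecutively from 0. The configuration is read by the following parser: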
```python
import ConfigParser

class regulars_manager_conf_parser:
    def parse(self, regulars_conf_path):
        # RawConfigParser: no %-interpolation, so a "%" inside a regex stays literal
        cp = ConfigParser.RawConfigParser()
        cp.read(regulars_conf_path)
        sections = cp.sections()
        regulars_info = {}
        for section in sections:
            regular_info = []
            regular_name_pre = "regular_expression_"
            # read regular_expression_0, _1, ... in order; stop at the first gap
            for index in range(0, len(cp.options(section))):
                regular_name = regular_name_pre + str(index)
                if cp.has_option(section, regular_name):
                    regular_info.append(cp.get(section, regular_name))
                else:
                    break
            regulars_info[section] = regular_info
        return regulars_info
```
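The same parsing logic can be tried on an in-memory copy of part of the configuration. This sketch uses Python 3's configparser and read_string in place of the Python 2 module and a file path; interpolation=None plays the role of RawConfigParser.

```python
import configparser


def parse_regulars(text):
    """Read each section's regular_expression_0, _1, ... in order, stopping at the first gap."""
    # interpolation=None keeps '%' characters in regexes literal.
    cp = configparser.ConfigParser(interpolation=None)
    cp.read_string(text)
    regulars_info = {}
    for section in cp.sections():
        regular_info = []
        index = 0
        while cp.has_option(section, "regular_expression_%d" % index):
            regular_info.append(cp.get(section, "regular_expression_%d" % index))
            index += 1
        regulars_info[section] = regular_info
    return regulars_info


SAMPLE_CONF = r"""
[hq_sinajs_cn_list]
regular_expression_0 = var hq_str_([^;]*);
regular_expression_1 = ([^,="shz]+)

[quotes_money_163]
regular_expression_0 = ([^\r\n]+)
regular_expression_1 = ([^,'\r\n]+)
"""

print(parse_regulars(SAMPLE_CONF))
```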
The regular expression manager maintains this information through the following methods:
```python
@singleton
class regular_split_manager():
    def __init__(self):
        self._regulars = {}   # regular name -> list of regexes, applied in order

    def modify_regulars(self, regulars_info):
        for (regular_name, regular_info) in regulars_info.items():
            self._regulars[regular_name] = regular_info

    def add_regulars(self, regulars_info):
        for (regular_name, regular_info) in regulars_info.items():
            self._regulars[regular_name] = regular_info

    def remove_regulars(self, regulars_info):
        for (regular_name, regular_info) in regulars_info.items():
            if regular_name in self._regulars:
                del self._regulars[regular_name]
```
The get_split_data method splits data with the named regular expression chain, applying every step down to the last one:
```python
def get_split_data(self, data, regular_name):
    data_array = []
    self._recursion_regular(data, regular_name, 0, data_array)
    return data_array

def _get_regular(self, regular_name, deep):
    if regular_name not in self._regulars:
        LOG_WARNING("regular manager has no %s" % (regular_name))
        return ""
    if deep >= len(self._regulars[regular_name]):
        return ""
    return self._regulars[regular_name][deep]

def _recursion_regular(self, data, regular_name, deep, data_array):
    regular_str = self._get_regular(regular_name, deep)
    if len(regular_str) == 0:
        # unknown name or empty chain: re.findall("") would only produce empty matches
        return
    split_data = re.findall(regular_str, data)
    regular_next_str = self._get_regular(regular_name, deep + 1)
    if len(regular_next_str) > 0:
        # not the last step yet: feed every match into the next regex
        for item in split_data:
            self._recursion_regular(item, regular_name, deep + 1, data_array)
    elif len(split_data) > 0:
        # last step: this match list is one record
        data_array.append(list(split_data))
```
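The recursion is easiest to follow on a concrete input. The sketch below reimplements it as a standalone function over a list of regexes and runs the hq_sinajs_cn_list chain from the configuration above on a made-up sample in the same shape as that quote feed (the prices are invented).

```python
import re


def split_by_chain(data, regulars, deep=0, data_array=None):
    """Apply regulars[deep] with re.findall; recurse on each match until the last regex,
    whose match list is appended to data_array as one record."""
    if data_array is None:
        data_array = []
    if deep >= len(regulars):
        return data_array
    matches = re.findall(regulars[deep], data)
    if deep + 1 < len(regulars):
        for item in matches:
            split_by_chain(item, regulars, deep + 1, data_array)
    elif matches:
        data_array.append(matches)
    return data_array


# The hq_sinajs_cn_list chain from the configuration above.
HQ_CHAIN = [r'var hq_str_([^;]*);', r'([^,="shz]+)']

# Made-up sample data.
raw = ('var hq_str_sh601006="大秦铁路,7.55,7.52";\n'
       'var hq_str_sz000001="平安银行,10.10,10.20";\n')

for record in split_by_chain(raw, HQ_CHAIN):
    print(record)
```

The first regex yields one string per stock; the second excludes the letters s, h and z along with the separators, which is how it drops the sh/sz market prefix. The same exclusion means any field containing those letters would be split apart.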
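With these managers in place, the main preparation work is done. In the next article I will introduce the core component, the task scheduling manager, which is the ultimate user of all the managers above.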