pyhive数据库连接池使用
生活随笔
收集整理的這篇文章主要介紹了
pyhive数据库连接池使用
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
Python 連接 Hive 的工具可以用 pyhive 或 impyla,不管用哪個,配置都比較麻煩,需要的依賴包也比較多。
- https://github.com/cloudera/impyla
- https://github.com/dropbox/PyHive
pyhive 模塊沒有提供數據庫連接池的 API,所以我參考 mysql-connector-python 模塊的連接池實現,改寫了一個 pyhive 的連接池;複用連接後,效率會提升不少。
連接池介紹
# hive_pool.py
"""A small connection pool for PyHive.

The design is adapted from the pooling module of mysql-connector-python:
a bounded queue holds ready ``hive.Connection`` objects, callers check one
out wrapped in a ``PooledHiveConnection`` and hand it back by calling
``close()`` on the wrapper.
"""
import re
import threading

from pyhive import hive

try:
    import queue
except ImportError:
    import Queue as queue

# One re-entrant lock guards every pool mutation in this module.
CONNECTION_POOL_LOCK = threading.RLock()
CNX_POOL_MAXSIZE = 32
CNX_POOL_MAXNAMESIZE = 64
# Matches any character that is NOT allowed in a pool name.
CNX_POOL_NAMEREGEX = re.compile(r'[^a-zA-Z0-9._:\-*$#]')


class PoolError(Exception):
    """Raised for pool-level failures (exhausted, full, misconfigured).

    Derives from ``Exception`` rather than ``BaseException`` so that ordinary
    ``except Exception`` handlers see it, like any other application error.
    """


def generate_pool_name(**kwargs):
    """Build a pool name from the ``host``, ``port`` and ``database`` settings.

    The values that are present are joined with ``_`` in that order.

    Raises:
        PoolError: if none of the three keys is supplied.
    """
    parts = [
        str(kwargs[key])
        for key in ('host', 'port', 'database')
        if key in kwargs
    ]
    if not parts:
        raise PoolError("Failed generating pool name; specify pool_name")
    return '_'.join(parts)


class PooledHiveConnection(object):
    """Wrapper around a ``hive.Connection`` checked out of a pool.

    Unknown attributes are forwarded to the wrapped connection; ``close()``
    returns the connection to its pool instead of closing it.
    """

    def __init__(self, pool, cnx):
        """Wrap ``cnx``, remembering the ``pool`` it has to go back to.

        Raises:
            AttributeError: if ``pool`` is not a ``HiveConnectionPool`` or
                ``cnx`` is not a ``hive.Connection``.
        """
        if not isinstance(pool, HiveConnectionPool):
            raise AttributeError("pool should be a HiveConnectionPool")
        if not isinstance(cnx, hive.Connection):
            raise AttributeError("cnx should be a hive.Connection")
        self._cnx_pool = pool
        self._cnx = cnx

    def __getattr__(self, attr):
        """Forward attribute lookups to the wrapped connection."""
        # __getattr__ only runs when normal lookup fails. If our own fields
        # are missing (partially constructed instance), reading self._cnx
        # below would re-enter this method forever.
        if attr in ('_cnx', '_cnx_pool'):
            raise AttributeError(attr)
        return getattr(self._cnx, attr)

    def close(self):
        """Return the wrapped connection to the pool.

        Calling ``close()`` again is a no-op. Without the guard a second call
        would run ``add_connection(None)``, which opens a brand-new
        connection and queues it.
        """
        cnx = self._cnx
        if cnx is None:
            return
        self._cnx_pool.add_connection(cnx)
        self._cnx = None

    @property
    def pool_name(self):
        """Name of the pool this connection belongs to."""
        return self._cnx_pool.pool_name


class HiveConnectionPool(object):
    """Fixed-size pool of ``hive.Connection`` objects."""

    def __init__(self, pool_size=5, pool_name=None, **kwargs):
        """Create the pool and, when settings are given, fill it.

        Args:
            pool_size: number of connections to hold, 1..CNX_POOL_MAXSIZE.
            pool_name: optional name; generated from ``kwargs`` when omitted.
            **kwargs: keyword arguments passed to ``hive.Connection``.

        Raises:
            AttributeError: for an invalid pool size or pool name.
            PoolError: if no name can be generated or the settings are
                rejected.
        """
        self._pool_size = None
        self._pool_name = None
        self._set_pool_size(pool_size)
        self._set_pool_name(pool_name or generate_pool_name(**kwargs))
        self._cnx_config = {}
        self._cnx_queue = queue.Queue(self._pool_size)

        if kwargs:
            self.set_config(**kwargs)
            for _ in range(self._pool_size):
                self.add_connection()

    @property
    def pool_name(self):
        """Name of this pool."""
        return self._pool_name

    @property
    def pool_size(self):
        """Maximum number of connections this pool holds."""
        return self._pool_size

    def set_config(self, **kwargs):
        """Store the ``hive.Connection`` keyword arguments for new connections.

        A probe connection is constructed from ``kwargs`` to validate them
        and is closed straight away, so it does not linger as an unused open
        connection. An empty ``kwargs`` leaves the configuration untouched.

        Raises:
            PoolError: if constructing the probe raises ``AttributeError``.
        """
        if not kwargs:
            return
        with CONNECTION_POOL_LOCK:
            try:
                probe = hive.Connection(**kwargs)
            except AttributeError as err:
                raise PoolError(
                    "Connection configuration not valid: {0}".format(err))
            probe.close()
            self._cnx_config = kwargs

    def _set_pool_size(self, pool_size):
        """Validate and store the pool size (1..CNX_POOL_MAXSIZE)."""
        if pool_size <= 0 or pool_size > CNX_POOL_MAXSIZE:
            raise AttributeError(
                "Pool size should be higher than 0 and "
                "lower or equal to {0}".format(CNX_POOL_MAXSIZE))
        self._pool_size = pool_size

    def _set_pool_name(self, pool_name):
        """Validate and store the pool name (allowed characters, max length)."""
        if CNX_POOL_NAMEREGEX.search(pool_name):
            raise AttributeError(
                "Pool name '{0}' contains illegal characters".format(pool_name))
        if len(pool_name) > CNX_POOL_MAXNAMESIZE:
            raise AttributeError(
                "Pool name '{0}' is too long".format(pool_name))
        self._pool_name = pool_name

    def _queue_connection(self, cnx):
        """Put ``cnx`` on the queue without blocking.

        Raises:
            PoolError: if ``cnx`` is not a ``hive.Connection`` or the queue
                is already full.
        """
        if not isinstance(cnx, hive.Connection):
            raise PoolError(
                "Connection instance not subclass of hive.Connection.")
        try:
            self._cnx_queue.put(cnx, block=False)
        except queue.Full:
            raise PoolError("Failed adding connection; queue is full")

    def add_connection(self, cnx=None):
        """Add ``cnx`` to the pool, or open a new connection when it is None.

        Raises:
            PoolError: if no configuration is stored, the queue is full, or
                ``cnx`` is not a ``hive.Connection``.
        """
        with CONNECTION_POOL_LOCK:
            if not self._cnx_config:
                raise PoolError("Connection configuration not available")
            if self._cnx_queue.full():
                raise PoolError("Failed adding connection; queue is full")
            if cnx is None:
                cnx = hive.Connection(**self._cnx_config)
            # _queue_connection performs the isinstance check.
            self._queue_connection(cnx)

    def get_connection(self):
        """Check a connection out of the pool without blocking.

        Returns:
            A ``PooledHiveConnection`` wrapping the queued connection.

        Raises:
            PoolError: if the queue is empty.
        """
        with CONNECTION_POOL_LOCK:
            try:
                cnx = self._cnx_queue.get(block=False)
            except queue.Empty:
                raise PoolError("Failed getting connection; pool exhausted")
            return PooledHiveConnection(self, cnx)

    def _remove_connections(self):
        """Close every connection currently queued and return how many."""
        with CONNECTION_POOL_LOCK:
            closed = 0
            cnxq = self._cnx_queue
            while cnxq.qsize():
                try:
                    cnx = cnxq.get(block=False)
                except queue.Empty:
                    break
                cnx.close()
                closed += 1
            return closed


class ReallyHiveConnectionPool(HiveConnectionPool):
    """Pool whose ``get_connection`` blocks until a connection is free.

    A semaphore with one permit per pooled connection makes callers wait
    instead of receiving "pool exhausted". Connections must be handed back
    through ``put_connection`` so the permit is released.
    """

    def __init__(self, **hive_config):
        """Create the pool; ``pool_size`` defaults to 10.

        The default is written back into ``hive_config`` so the parent pool
        and the semaphore always agree. Otherwise the parent would fall back
        to its own default of 5 while the semaphore admitted 10 callers.
        """
        hive_config.setdefault('pool_size', 10)
        super().__init__(**hive_config)
        # Built after the parent validated the size, so an invalid
        # pool_size is reported by _set_pool_size.
        self._semaphore = threading.Semaphore(self.pool_size)

    def get_connection(self):
        """Block until a connection is available, then check it out."""
        self._semaphore.acquire()
        try:
            return super().get_connection()
        except BaseException:
            # Give the permit back, or a failed checkout would shrink the
            # pool's capacity permanently.
            self._semaphore.release()
            raise

    def put_connection(self, con):
        """Return ``con`` to the pool and release its semaphore permit."""
        if isinstance(con, PooledHiveConnection) and con._cnx is None:
            # Already returned: releasing again would hand out more permits
            # than there are pooled connections.
            return
        try:
            con.close()
        finally:
            self._semaphore.release()

# --- Connection pool code sample (連接池代碼樣例) ---
連接池如何使用,樣例如下。
"""Usage sample for hive_pool: run Hive queries through a shared pool."""
from contextlib import contextmanager

from hive_pool import ReallyHiveConnectionPool

hive_config = {
    'host': '***.***.***.***',
    'port': '10000',
    'database': 'default',
}

# One pool for the whole module, created at import time.
conxpool = ReallyHiveConnectionPool(pool_size=10, pool_name='myhive', **hive_config)


@contextmanager
def get_cursor():
    """Yield a cursor on a pooled connection and clean up afterwards.

    The connection is checked out before the ``try`` blocks begin, so a
    failure in ``get_connection()`` or ``con.cursor()`` propagates as
    itself. Cleanup never touches a name that was not bound. The cursor is
    closed first, then the connection goes back to the pool.
    """
    # Without the pool this would be: con = hive.Connection(**hive_config)
    con = conxpool.get_connection()
    try:
        cursor = con.cursor()
        try:
            yield cursor
        finally:
            cursor.close()
    finally:
        # Returns the connection to the pool instead of closing it.
        conxpool.put_connection(con)


class MYPyHive(object):
    """Helper class for running Hive queries from Python."""

    @staticmethod
    def get_all(sql):
        """Execute ``sql`` and return every row of the result."""
        with get_cursor() as cursor:
            cursor.execute(sql)
            return cursor.fetchall()


if __name__ == '__main__':
    import time
    from concurrent.futures import ThreadPoolExecutor

    def t(n):
        """Run one sample query and print its result prefixed by ``n``."""
        ph = MYPyHive
        hive_query = "show tables"
        r = ph.get_all(hive_query)
        print(str(n) + str(r))

    s = time.time()
    # Sequential baseline for comparison: call t(i) for i in range(20).
    with ThreadPoolExecutor(max_workers=15) as pool:
        futures = [pool.submit(t, i) for i in range(20)]
    print(time.time() - s)

    # Surface worker failures; an unchecked future hides its exception.
    for future in futures:
        future.result()
總結
以上是生活随笔為你收集整理的pyhive数据库连接池使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 机器学习数据导入方法总结
- 下一篇: 飞行管理计算机在ATA章节,民航ATA章