python下的mysql模块包装
生活随笔
收集整理的這篇文章主要介紹了
python下的mysql模块包装
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
#!/usr/bin/env python
# -*- coding: utf-8 -*-"""
設計db模塊的原因:1. 更簡單的操作數據庫一次數據訪問: 數據庫連接 => 游標對象 => 執行SQL => 處理異常 => 清理資源。db模塊對這些過程進行封裝,使得用戶僅需關注SQL執行。2. 數據安全用戶請求以多線程處理時,為了避免多線程下的數據共享引起的數據混亂,需要將數據連接以ThreadLocal對象傳入。
設計db接口:1.設計原則:根據上層調用者設計簡單易用的API接口2. 調用接口1. 初始化數據庫連接信息create_engine封裝了如下功能:1. 為數據庫連接 準備需要的配置信息2. 創建數據庫連接(由生成的全局對象engine的 connect方法提供)from transwarp import dbdb.create_engine(user='root',password='password',database='test',host='127.0.0.1',port=3306)2. 執行SQL DMLselect 函數封裝了如下功能:1.支持一個數據庫連接里執行多個SQL語句2.支持鏈接的自動獲取和釋放使用樣例:users = db.select('select * from user')# users =># [# { "id": 1, "name": "Michael"},# { "id": 2, "name": "Bob"},# { "id": 3, "name": "Adam"}# ]3. 支持事物transaction 函數封裝了如下功能:1. 事務也可以嵌套,內層事務會自動合并到外層事務中,這種事務模型足夠滿足99%的需求
"""import time
import uuid
import functools
import threading
import logging# global engine object:
engine = Nonedef next_id(t=None):"""生成一個唯一id 由 當前時間 + 隨機數(由偽隨機數得來)拼接得到"""if t is None:t = time.time()return '%015d%s000' % (int(t * 1000), uuid.uuid4().hex)def _profiling(start, sql=''):"""用于剖析sql的執行時間"""t = time.time() - startif t > 0.1:logging.warning('[PROFILING] [DB] %s: %s' % (t, sql))else:logging.info('[PROFILING] [DB] %s: %s' % (t, sql))def create_engine(user, password, database, host='127.0.0.1', port=3306, **kw):"""db模型的核心函數,用于連接數據庫, 生成全局對象engine,engine對象持有數據庫連接"""import mysql.connectorglobal engineif engine is not None:raise DBError('Engine is already initialized.')params = dict(user=user, password=password, database=database, host=host, port=port)defaults = dict(use_unicode=True, charset='utf8', collation='utf8_general_ci', autocommit=False)for k, v in defaults.iteritems():params[k] = kw.pop(k, v)params.update(kw)params['buffered'] = Trueengine = _Engine(lambda: mysql.connector.connect(**params))# test connection...logging.info('Init mysql engine <%s> ok.' % hex(id(engine)))def connection():"""db模塊核心函數,用于獲取一個數據庫連接通過_ConnectionCtx對 _db_ctx封裝,使得惰性連接可以自動獲取和釋放,也就是可以使用 with語法來處理數據庫連接_ConnectionCtx 實現with語法^|_db_ctx _DbCtx實例^|_DbCtx 獲取和釋放惰性連接^|_LasyConnection 實現惰性連接"""return _ConnectionCtx()def with_connection(func):"""設計一個裝飾器 替換with語法,讓代碼更優雅比如:@with_connectiondef foo(*args, **kw):f1()f2()f3()"""@functools.wraps(func)def _wrapper(*args, **kw):with _ConnectionCtx():return func(*args, **kw)return _wrapperdef transaction():"""db模塊核心函數 用于實現事物功能支持事物:with db.transaction():db.select('...')db.update('...')db.update('...')支持事物嵌套:with db.transaction():transaction1transaction2..."""return _TransactionCtx()def with_transaction(func):"""設計一個裝飾器 替換with語法,讓代碼更優雅比如:@with_transactiondef do_in_transaction():>>> @with_transaction... def update_profile(id, name, rollback):... u = dict(id=id, name=name, email='%s@test.org' % name, passwd=name, last_modified=time.time())... insert('user', **u)... update('update user set passwd=? where id=?', name.upper(), id)... if rollback:... raise StandardError('will cause rollback...')>>> update_profile(8080, 'Julia', False)>>> select_one('select * from user where id=?', 8080).passwdu'JULIA'>>> update_profile(9090, 'Robert', True)Traceback (most recent call last):...StandardError: will cause rollback..."""@functools.wraps(func)def _wrapper(*args, **kw):start = time.time()with _TransactionCtx():func(*args, **kw)_profiling(start)return _wrapper@with_connection
def _select(sql, first, *args):"""執行SQL,返回一個結果 或者多個結果組成的列表"""global _db_ctxcursor = Nonesql = sql.replace('?', '%s')logging.info('SQL: %s, ARGS: %s' % (sql, args))try:cursor = _db_ctx.connection.cursor()cursor.execute(sql, args)if cursor.description:names = [x[0] for x in cursor.description]if first:values = cursor.fetchone()if not values:return Nonereturn Dict(names, values)return [Dict(names, x) for x in cursor.fetchall()]finally:if cursor:cursor.close()def select_one(sql, *args):"""執行SQL 僅返回一個結果如果沒有結果 返回None如果有1個結果,返回一個結果如果有多個結果,返回第一個結果>>> u1 = dict(id=100, name='Alice', email='alice@test.org', passwd='ABC-12345', last_modified=time.time())>>> u2 = dict(id=101, name='Sarah', email='sarah@test.org', passwd='ABC-12345', last_modified=time.time())>>> insert('user', **u1)1>>> insert('user', **u2)1>>> u = select_one('select * from user where id=?', 100)>>> u.nameu'Alice'>>> select_one('select * from user where email=?', 'abc@email.com')>>> u2 = select_one('select * from user where passwd=? order by email', 'ABC-12345')>>> u2.nameu'Alice'"""return _select(sql, True, *args)def select_int(sql, *args):"""執行一個sql 返回一個數值,注意僅一個數值,如果返回多個數值將觸發異常>>> u1 = dict(id=96900, name='Ada', email='ada@test.org', passwd='A-12345', last_modified=time.time())>>> u2 = dict(id=96901, name='Adam', email='adam@test.org', passwd='A-12345', last_modified=time.time())>>> insert('user', **u1)1>>> insert('user', **u2)1>>> select_int('select count(*) from user')5>>> select_int('select count(*) from user where email=?', 'ada@test.org')1>>> select_int('select count(*) from user where email=?', 'notexist@test.org')0>>> select_int('select id from user where email=?', 'ada@test.org')96900>>> select_int('select id, name from user where email=?', 'ada@test.org')Traceback (most recent call last):...MultiColumnsError: Expect only one column."""d = _select(sql, True, *args)if len(d) != 1:raise MultiColumnsError('Expect only one column.')return d.values()[0]def select(sql, *args):"""執行sql 以列表形式返回結果>>> u1 = dict(id=200, name='Wall.E', email='wall.e@test.org', passwd='back-to-earth', last_modified=time.time())>>> u2 = dict(id=201, name='Eva', email='eva@test.org', passwd='back-to-earth', last_modified=time.time())>>> insert('user', **u1)1>>> insert('user', **u2)1>>> L = select('select * from user where id=?', 900900900)>>> L[]>>> L = select('select * from user where id=?', 200)>>> L[0].emailu'wall.e@test.org'>>> L = select('select * from user where passwd=? order by id desc', 'back-to-earth')>>> L[0].nameu'Eva'>>> L[1].nameu'Wall.E'"""return _select(sql, False, *args)@with_connection
def _update(sql, *args):"""執行update 語句,返回update的行數"""global _db_ctxcursor = Nonesql = sql.replace('?', '%s')logging.info('SQL: %s, ARGS: %s' % (sql, args))try:cursor = _db_ctx.connection.cursor()cursor.execute(sql, args)r = cursor.rowcountif _db_ctx.transactions == 0:# no transaction enviroment:logging.info('auto commit')_db_ctx.connection.commit()return rfinally:if cursor:cursor.close()def update(sql, *args):"""執行update 語句,返回update的行數>>> u1 = dict(id=1000, name='Michael', email='michael@test.org', passwd='123456', last_modified=time.time())>>> insert('user', **u1)1>>> u2 = select_one('select * from user where id=?', 1000)>>> u2.emailu'michael@test.org'>>> u2.passwdu'123456'>>> update('update user set email=?, passwd=? where id=?', 'michael@example.org', '654321', 1000)1>>> u3 = select_one('select * from user where id=?', 1000)>>> u3.emailu'michael@example.org'>>> u3.passwdu'654321'>>> update('update user set passwd=? where id=?', '***', '123')0"""return _update(sql, *args)def insert(table, **kw):"""執行insert語句>>> u1 = dict(id=2000, name='Bob', email='bob@test.org', passwd='bobobob', last_modified=time.time())>>> insert('user', **u1)1>>> u2 = select_one('select * from user where id=?', 2000)>>> u2.nameu'Bob'>>> insert('user', **u2)Traceback (most recent call last):...IntegrityError: 1062 (23000): Duplicate entry '2000' for key 'PRIMARY'"""cols, args = zip(*kw.iteritems())sql = 'insert into `%s` (%s) values (%s)' % (table, ','.join(['`%s`' % col for col in cols]), ','.join(['?' for i in range(len(cols))]))return _update(sql, *args)class Dict(dict):"""字典對象實現一個簡單的可以通過屬性訪問的字典,比如 x.key = value"""def __init__(self, names=(), values=(), **kw):super(Dict, self).__init__(**kw)for k, v in zip(names, values):self[k] = vdef __getattr__(self, key):try:return self[key]except KeyError:raise AttributeError(r"'Dict' object has no attribute '%s'" % key)def __setattr__(self, key, value):self[key] = valueclass DBError(Exception):passclass MultiColumnsError(DBError):passclass _Engine(object):"""數據庫引擎對象用于保存 db模塊的核心函數:create_engine 創建出來的數據庫連接"""def __init__(self, connect):self._connect = connectdef connect(self):return self._connect()class _LasyConnection(object):"""惰性連接對象僅當需要cursor對象時,才連接數據庫,獲取連接"""def __init__(self):self.connection = Nonedef cursor(self):if self.connection is None:_connection = engine.connect()logging.info('[CONNECTION] [OPEN] connection <%s>...' % hex(id(_connection)))self.connection = _connectionreturn self.connection.cursor()def commit(self):self.connection.commit()def rollback(self):self.connection.rollback()def cleanup(self):if self.connection:_connection = self.connectionself.connection = Nonelogging.info('[CONNECTION] [CLOSE] connection <%s>...' % hex(id(connection)))_connection.close()class _DbCtx(threading.local):"""db模塊的核心對象, 數據庫連接的上下文對象,負責從數據庫獲取和釋放連接取得的連接是惰性連接對象,因此只有調用cursor對象時,才會真正獲取數據庫連接該對象是一個 Thread local對象,因此綁定在此對象上的數據 僅對本線程可見"""def __init__(self):self.connection = Noneself.transactions = 0def is_init(self):"""返回一個布爾值,用于判斷 此對象的初始化狀態"""return self.connection is not Nonedef init(self):"""初始化連接的上下文對象,獲得一個惰性連接對象"""logging.info('open lazy connection...')self.connection = _LasyConnection()self.transactions = 0def cleanup(self):"""清理連接對象,關閉連接"""self.connection.cleanup()self.connection = Nonedef cursor(self):"""獲取cursor對象, 真正取得數據庫連接"""return self.connection.cursor()# thread-local db context:
_db_ctx = _DbCtx()class _ConnectionCtx(object):"""因為_DbCtx實現了連接的 獲取和釋放,但是并沒有實現連接的自動獲取和釋放,_ConnectCtx在 _DbCtx基礎上實現了該功能,因此可以對 _ConnectCtx 使用with 語法,比如:with connection():passwith connection():pass"""def __enter__(self):"""獲取一個惰性連接對象"""global _db_ctxself.should_cleanup = Falseif not _db_ctx.is_init():_db_ctx.init()self.should_cleanup = Truereturn selfdef __exit__(self, exctype, excvalue, traceback):"""釋放連接"""global _db_ctxif self.should_cleanup:_db_ctx.cleanup()class _TransactionCtx(object):"""事務嵌套比Connection嵌套復雜一點,因為事務嵌套需要計數,每遇到一層嵌套就+1,離開一層嵌套就-1,最后到0時提交事務"""def __enter__(self):global _db_ctxself.should_close_conn = Falseif not _db_ctx.is_init():# needs open a connection first:_db_ctx.init()self.should_close_conn = True_db_ctx.transactions += 1logging.info('begin transaction...' if _db_ctx.transactions == 1 else 'join current transaction...')return selfdef __exit__(self, exctype, excvalue, traceback):global _db_ctx_db_ctx.transactions -= 1try:if _db_ctx.transactions == 0:if exctype is None:self.commit()else:self.rollback()finally:if self.should_close_conn:_db_ctx.cleanup()def commit(self):global _db_ctxlogging.info('commit transaction...')try:_db_ctx.connection.commit()logging.info('commit ok.')except:logging.warning('commit failed. try rollback...')_db_ctx.connection.rollback()logging.warning('rollback ok.')raisedef rollback(self):global _db_ctxlogging.warning('rollback transaction...')_db_ctx.connection.rollback()logging.info('rollback ok.')if __name__ == '__main__':logging.basicConfig(level=logging.DEBUG)create_engine('www-data', 'www-data', 'test', '192.168.10.128')update('drop table if exists user')update('create table user (id int primary key, name text, email text, passwd text, last_modified real)')import doctestdoctest.testmod()
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎
總結
以上是生活随笔為你收集整理的python下的mysql模块包装的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【youcans 的 OpenCV 例程
- 下一篇: python里面的tuple与list对