Python 操作 pymysql 批量 增、删、改、查
github:https://github.com/PyMySQL/PyMySQL
Python3 MySQL 數據庫連接 - PyMySQL 驅動:Python3 MySQL 數據庫連接 – PyMySQL 驅動 | 菜鳥教程
pymysql 是線程安全的( 搜索 thread,可以看到 thread_safe=1,同時函數 thread_safe() 返回 True?):https://github.com/PyMySQL/PyMySQL/blob/main/pymysql/__init__.py
Mysql? 如果數據存在則更新,不存在則插入
:Mysql:如果數據存在則更新,不存在則插入_飛蛾逐月-CSDN博客_mysql 存在更新不存在寫入
1、PyMySQL 安裝
在使用 PyMySQL 之前,我們需要確保 PyMySQL 已安裝。
PyMySQL 下載地址:GitHub - PyMySQL/PyMySQL: Pure Python MySQL Client
安裝?PyMySQL 的?Python 包:pip3 install PyMySQL
2、數據庫連接
連接數據庫前,請先確認以下事項:
- 已經創建了數據庫 TESTDB.
- 在 TESTDB 數據庫中您已經創建了表 EMPLOYEE
- EMPLOYEE 表字段為 FIRST_NAME, LAST_NAME, AGE, SEX 和 INCOME。
- 連接數據庫 TESTDB 使用的用戶名為 "testuser" ,密碼為 "test123",你可以可以自己設定或者直接使用 root 用戶名及其密碼,Mysql 數據庫用戶授權請使用 Grant 命令。
- 已經安裝了 Python MySQLdb 模塊。
- 如果您對sql語句不熟悉,可以訪問?SQL基礎教程
示? 例:
鏈接 Mysql 的 TESTDB 數據庫:
import pymysql# 打開數據庫連接 db = pymysql.connect("localhost","testuser","test123","TESTDB" )# 使用 cursor() 方法創建一個游標對象 cursor cursor = db.cursor()# 使用 execute() 方法執行 SQL 查詢 cursor.execute("SELECT VERSION()")# 使用 fetchone() 方法獲取單條數據. data = cursor.fetchone()print ("Database version : %s " % data)# 關閉數據庫連接 db.close()3、使用
創建數據庫表
如果數據庫連接存在我們可以使用execute()方法來為數據庫創建表,如下所示創建表EMPLOYEE:
import pymysql# 打開數據庫連接 db = pymysql.connect("localhost","testuser","test123","TESTDB" )# 使用 cursor() 方法創建一個游標對象 cursor cursor = db.cursor()# 使用 execute() 方法執行 SQL,如果表存在則刪除 cursor.execute("DROP TABLE IF EXISTS EMPLOYEE")# 使用預處理語句創建表 sql = """CREATE TABLE EMPLOYEE (FIRST_NAME CHAR(20) NOT NULL,LAST_NAME CHAR(20),AGE INT, SEX CHAR(1),INCOME FLOAT )"""cursor.execute(sql)# 關閉數據庫連接 db.close()查詢? 數據
Python 查詢 Mysql 使用 fetchone() 方法獲取單條數據,使用 fetchall() 方法獲取多條數據。
- fetchone():? 該方法獲取下一個查詢結果集。結果集是一個對象
- fetchall():??接收全部的返回結果行.
- rowcount:? 這是一個只讀屬性,并返回執行execute()方法后影響的行數。
查詢 EMPLOYEE 表中 salary(工資)字段大于 1000 的所有數據:
import pymysql# 打開數據庫連接 db = pymysql.connect("localhost","testuser","test123","TESTDB" )# 使用cursor()方法獲取操作游標 cursor = db.cursor()# SQL 查詢語句 sql = "SELECT * FROM EMPLOYEE \WHERE INCOME > %s" % (1000) try:# 執行SQL語句cursor.execute(sql)# 獲取所有記錄列表results = cursor.fetchall()for row in results:fname = row[0]lname = row[1]age = row[2]sex = row[3]income = row[4]# 打印結果print ("fname=%s,lname=%s,age=%s,sex=%s,income=%s" % \(fname, lname, age, sex, income )) except:print ("Error: unable to fetch data")# 關閉數據庫連接 db.close()示例:
import pymysqlclass DB():def __init__(self, host='localhost', port=3306, db='', user='root', passwd='root', charset='utf8'):# 建立連接 self.conn = pymysql.connect(host=host, port=port, db=db, user=user, passwd=passwd, charset=charset)# 創建游標,操作設置為字典類型 self.cur = self.conn.cursor(cursor = pymysql.cursors.DictCursor)def __enter__(self):# 返回游標 return self.curdef __exit__(self, exc_type, exc_val, exc_tb):# 提交數據庫并執行 self.conn.commit()# 關閉游標 self.cur.close()# 關閉數據庫連接 self.conn.close()if __name__ == '__main__':with DB(host='192.168.68.129',user='root',passwd='zhumoran',db='text3') as db:db.execute('select * from course')print(db)for i in db:print(i)插入? 數據
執行 SQL INSERT 語句向表 EMPLOYEE 插入記錄:
import pymysql# 打開數據庫連接 db = pymysql.connect("localhost","testuser","test123","TESTDB" )# 使用cursor()方法獲取操作游標 cursor = db.cursor()# SQL 插入語句 sql = """INSERT INTO EMPLOYEE(FIRST_NAME,LAST_NAME, AGE, SEX, INCOME)VALUES ('Mac', 'Mohan', 20, 'M', 2000)""" try:# 執行sql語句cursor.execute(sql)# 提交到數據庫執行db.commit() except:# 如果發生錯誤則回滾db.rollback()# 關閉數據庫連接 db.close()以上例子也可以寫成如下形式:
import pymysql# 打開數據庫連接 db = pymysql.connect("localhost","testuser","test123","TESTDB" )# 使用cursor()方法獲取操作游標 cursor = db.cursor()# SQL 插入語句 sql = "INSERT INTO EMPLOYEE(FIRST_NAME, \LAST_NAME, AGE, SEX, INCOME) \VALUES ('%s', '%s', %s, '%s', %s)" % \('Mac', 'Mohan', 20, 'M', 2000) try:# 執行sql語句cursor.execute(sql)# 執行sql語句db.commit() except:# 發生錯誤時回滾db.rollback()# 關閉數據庫連接 db.close()以下代碼使用變量向SQL語句中傳遞參數:
.................................. user_id = "test123" password = "password"con.execute('insert into Login values( %s, %s)' % \(user_id, password)) ..................................單條插入數據:
#!/usr/bin/python3import pymysql# 打開數據庫連接 db = pymysql.connect("localhost","testuser","test123","TESTDB" )# 使用cursor()方法獲取操作游標 cursor = db.cursor()# SQL 插入語句 里面的數據類型要對應 sql = "INSERT INTO EMPLOYEE(FIRST_NAME, \LAST_NAME, AGE, SEX, INCOME) \VALUES ('%s', '%s', %s, '%s', %s)" % \('Mac', 'Mohan', 20, 'M', 2000) try:# 執行sql語句cursor.execute(sql)# 執行sql語句db.commit() except:# 發生錯誤時回滾db.rollback()# 關閉數據庫連接 db.close()批量插入數據:
注意:批量插入數據 與 單條插入數據?的區別:
- 批量插入:VALUES (%s, %s, %s, %s, %s,) 里面 不用引號
- 單條插入:VALUES ('%s', '%s',?'%s',?'%s', '%s') 里面 需要引號
更新? 數據
更新操作用于更新數據表的數據,以下實例將 TESTDB 表中 SEX 為 'M' 的 AGE 字段遞增 1:
import pymysql# 打開數據庫連接 db = pymysql.connect("localhost","testuser","test123","TESTDB" )# 使用cursor()方法獲取操作游標 cursor = db.cursor()# SQL 更新語句 sql = "UPDATE EMPLOYEE SET AGE = AGE + 1 WHERE SEX = '%c'" % ('M') try:# 執行SQL語句cursor.execute(sql)# 提交到數據庫執行db.commit() except:# 發生錯誤時回滾db.rollback()# 關閉數據庫連接 db.close()批量 更新
使用 pymysql 的?course.executemany(sql, update_list)?進行批量更新
- sql:更新一條的 sql 語句模板;
- update_list:一個列表套元組的結構;
示? 例:
db = pymysql.connect(user='root', password='mysql', database='test', host='127.0.0.1', port=3306, charset='utf8mb4')name_list = ["re", "gh", "ds", "D"] # 存儲name的值 age_list = ["10", "20", "30", "40"] # 存儲age的值 id_list = ["1", "2", "3", "4"] # 存儲id的值 val_list = [[name_list[i], age_list[i], id_list[i]] for i in range(len(id_list))] print(val_list) # [['re', '10', '1'], ['gh', '20', '2'], ['ds', '30', '3'], ['D', '40', '4']]with db.cursor() as cursor:try:sql = "UPDATE test SET name=(%s), age=(%s) WHERE id=(%s)"cursor.executemany(sql, val_list)db.commit()except:db.rollback() db.close()pymysql 批量 --- 增、刪、改、查
注意:插入數字也是 %s
# coding=utf-8import time import pymysql.cursorsconn= pymysql.connect(host='rm-xxx.mysql.rds.aliyuncs.com',port=3306,user='dba',password='xxxxx',db='app',charset='utf8') cursor= conn.cursor() # conn.ping(reconnect=True)count= 0 posts=[] for postin posts:try:sql= 'DELETE FROM user_like WHERE user_id=%s and like_post_id=%s'ret= cursor.executemany(sql, ((1,2), (3,4), (5,6)))conn.commit()except Exception as e:print("batch Exception:", e)count+=1cursor.close() conn.close()# 基本sql語句寫法 # INSERT INTO star(name,gender) VALUES(“XX”, 20) # SELECT * FROM app.user_post WHERE post_id LIKE '%xxxx%'; # UPDATE app.user_post SET post_id=replace(post_id,'\'','’); # UPDATE app.user_post SET province = ‘xxx', city =‘xxx'; # DELETE FROM app.user_post where updated_at = '0000-00-00 00:00:00’;# 帶參數構造語句的基本寫法 # sql = 'select user_id, post_id from user_post where user_id="{user_id}" and post_id="{post_id}"'.format(user_id=user_id, post_id=post_id) # sql = 'SELECT count(*) FROM user_like where like_post_id = "%s"' % ("xxx") # sql = 'update star set gender="{gender}", height="{height}" where star_id="{star_id}"'.format(gender='M', height=180, star_id=123456789)刪除? 數據
刪除操作用于刪除數據表中的數據,以下實例演示了刪除數據表 EMPLOYEE 中 AGE 大于 20 的所有數據:
import pymysql# 打開數據庫連接 db = pymysql.connect("localhost","testuser","test123","TESTDB" )# 使用cursor()方法獲取操作游標 cursor = db.cursor()# SQL 刪除語句 sql = "DELETE FROM EMPLOYEE WHERE AGE > %s" % (20) try:# 執行SQL語句cursor.execute(sql)# 提交修改db.commit() except:# 發生錯誤時回滾db.rollback()# 關閉連接 db.close()執行 事務
事務機制可以確保數據一致性。
對于支持事務的數據庫, 在 Python 數據庫編程中,當游標建立之時,就自動開始了一個隱形的數據庫事務。commit()方法游標的所有更新操作,rollback()方法回滾當前游標的所有操作。每一個方法都開始了一個新的事務。
事務應該具有4個屬性:原子性、一致性、隔離性、持久性。這四個屬性通常稱為ACID特性。
- 原子性(atomicity)。一個事務是一個不可分割的工作單位,事務中包括的諸操作要么都做,要么都不做。
- 一致性(consistency)。事務必須是使數據庫從一個一致性狀態變到另一個一致性狀態。一致性與原子性是密切相關的。
- 隔離性(isolation)。一個事務的執行不能被其他事務干擾。即一個事務內部的操作及使用的數據對并發的其他事務是隔離的,并發執行的各個事務之間不能互相干擾。
- 持久性(durability)。持續性也稱永久性(permanence),指一個事務一旦提交,它對數據庫中數據的改變就應該是永久性的。接下來的其他操作或故障不應該對其有任何影響。
示例:
# SQL刪除記錄語句 sql = "DELETE FROM EMPLOYEE WHERE AGE > %s" % (20) try:# 執行SQL語句cursor.execute(sql)# 向數據庫提交db.commit() except:# 發生錯誤時回滾db.rollback()pymysqlpool
線程安全 pymysqlpool
# -*-coding: utf-8-*- # Author : Christopher Lee # License: Apache License # File : test_example.py # Date : 2017-06-18 01-23 # Version: 0.0.1 # Description: simple test.import logging import string import threadingimport pandas as pd import randomfrom pymysqlpool import ConnectionPoolconfig = {'pool_name': 'test','host': 'localhost','port': 3306,'user': 'root','password': 'chris','database': 'test','pool_resize_boundary': 50,'enable_auto_resize': True,# 'max_pool_size': 10 }logging.basicConfig(format='[%(asctime)s][%(name)s][%(module)s.%(lineno)d][%(levelname)s] %(message)s',datefmt='%Y-%m-%d %H:%M:%S',level=logging.DEBUG)def connection_pool():# Return a connection pool instancepool = ConnectionPool(**config)# pool.connect()return pooldef test_pool_cursor(cursor_obj=None):cursor_obj = cursor_obj or connection_pool().cursor()with cursor_obj as cursor:print('Truncate table user')cursor.execute('TRUNCATE user')print('Insert one record')result = cursor.execute('INSERT INTO user (name, age) VALUES (%s, %s)', ('Jerry', 20))print(result, cursor.lastrowid)print('Insert multiple records')users = [(name, age) for name in ['Jacky', 'Mary', 'Micheal'] for age in range(10, 15)]result = cursor.executemany('INSERT INTO user (name, age) VALUES (%s, %s)', users)print(result)print('View items in table user')cursor.execute('SELECT * FROM user')for user in cursor:print(user)print('Update the name of one user in the table')cursor.execute('UPDATE user SET name="Chris", age=29 WHERE id = 16')cursor.execute('SELECT * FROM user ORDER BY id DESC LIMIT 1')print(cursor.fetchone())print('Delete the last record')cursor.execute('DELETE FROM user WHERE id = 16')def test_pool_connection():with connection_pool().connection(autocommit=True) as conn:test_pool_cursor(conn.cursor())def test_with_pandas():with connection_pool().connection() as conn:df = pd.read_sql('SELECT * FROM user', conn)print(df)def delete_users():with connection_pool().cursor() as cursor:cursor.execute('TRUNCATE user')def add_users(users, conn):def execute(c):c.cursor().executemany('INSERT INTO user (name, age) VALUES (%s, %s)', users)c.commit()if conn:execute(conn)returnwith connection_pool().connection() as conn:execute(conn)def add_user(user, conn=None):def execute(c):c.cursor().execute('INSERT INTO user (name, age) VALUES (%s, %s)', user)c.commit()if conn:execute(conn)returnwith connection_pool().connection() as conn:execute(conn)def list_users():with connection_pool().cursor() as cursor:cursor.execute('SELECT * FROM user ORDER BY id DESC LIMIT 5')print('...')for x in sorted(cursor, key=lambda d: d['id']):print(x)def random_user():name = "".join(random.sample(string.ascii_lowercase, random.randint(4, 10))).capitalize()age = random.randint(10, 40)return name, agedef worker(id_, batch_size=1, explicit_conn=True):print('[{}] Worker started...'.format(id_))def do(conn=None):for _ in range(batch_size):add_user(random_user(), conn)if not explicit_conn:do()returnwith connection_pool().connection() as c:do(c)print('[{}] Worker finished...'.format(id_))def bulk_worker(id_, batch_size=1, explicit_conn=True):print('[{}] Bulk worker started...'.format(id_))def do(conn=None):add_users([random_user() for _ in range(batch_size)], conn)time.sleep(3)if not explicit_conn:do()returnwith connection_pool().connection() as c:do(c)print('[{}] Worker finished...'.format(id_))def test_with_single_thread(batch_number, batch_size, explicit_conn=False, bulk_insert=False):delete_users()wk = worker if not bulk_insert else bulk_workerfor i in range(batch_number):wk(i, batch_size, explicit_conn)list_users()def test_with_multi_threads(batch_number=1, batch_size=1000, explicit_conn=False, bulk_insert=False):delete_users()wk = worker if not bulk_insert else bulk_workerthreads = []for i in range(batch_number):t = threading.Thread(target=wk, args=(i, batch_size, explicit_conn))threads.append(t)t.start()[t.join() for t in threads]list_users()if __name__ == '__main__':import timestart = time.perf_counter()test_pool_cursor()test_pool_connection()test_with_pandas()test_with_multi_threads(20, 10, True, bulk_insert=True)test_with_single_thread(1, 10, True, bulk_insert=True)elapsed = time.perf_counter() - startprint('Elapsed time is: "{}"'.format(elapsed))總結
以上是生活随笔為你收集整理的Python 操作 pymysql 批量 增、删、改、查的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Linux 中 VIM 的使用
- 下一篇: sqlmap 详解