python访问数据库如何解决高并发_使用 Python 和 Oracle 数据库实现高并发性
隨著趨勢發展的核心轉向更多而不是更快發展,最大限度地提高并發性的重要性日益凸顯。并發性使得編程模式發生了新的轉變,可以編寫異步代碼,從而將多個任務分散到一組線程或進程中并行工作。如果您不是編程新手并且很熟悉 C 或 C++,您可能已經對線程和進程有所了解,并且知道它們之間的區別。在進行并發編程時,線程提供了進程的輕量級替代物,在大多數情況下多線程較多進程更受青睞。因此,本文將討論如何通過多線程來實現并發性。
與很多其他編程語言一樣,在使用多 CPU 計算機時將占用大量 CPU 的任務分散到 Python 中的多個線程中(可以使用 Python 標準庫中的多進程模塊實現)可以提高性能。對于單處理器計算機,這樣確實可以并行運行多個操作,而不是只能在任務間切換且在任何指定時間只能執行一個任務。相反,在將多線程的 Python 程序移到一個多 CPU 計算機時,由于全局解釋器鎖 (GIL) 的原因您不會注意到任何性能提升,Python 使用 GIL 保護內部數據結構,確保在一次只有一個線程運行 CPython 虛擬機。
但是,您可能仍然有興趣向支持數據庫的 Python 程序中添加線程以加快其速度。關鍵是 Python 與之交互的底層數據庫很可能安裝在并行處理提交的查詢的高性能服務器上。這意味著您可以從提交多個查詢到數據庫服務器并在單獨的線程中并行進行的操作中受益,而不是在一個線程中一個接一個地按順序發出查詢。
要注意的是:盡管利用任務自身的并行性可以顯著提升應用程序性能,但是我們必須認識到,不是所有任務都可并行執行。例如,在客戶請求的操作(例如轉賬)完成之前,您無法向客戶發出確認電子郵件。很顯然,此類任務必須按特定順序執行。
另外,構建多線程代碼時還要記住,某些并行運行的線程可能同時嘗試更改共享的對象,這可能導致數據丟失、數據殘缺,甚至損壞正在更改的對象。要避免此問題,應該控制對共享對象的訪問,使得一個線程一次只能使用一個此類對象。幸運的是,利用 Python 可以實施一個鎖定機制來同步對共享對象的訪問(利用線程模塊中的鎖定工具)。
使用鎖定的缺點是損失了可擴展性。設計可擴展性時,不要忘記,對一個線程內的某個資源進行鎖定將使該資源在所有其他正在運行的線程和進程中不可用,直至該鎖定被釋放為止。因此,要確保高效的資源管理,不應過多地使用鎖定,盡可能避免鎖定,如果需要使用鎖定也要盡可能早地釋放該鎖定。
幸運的是,當您處理存儲在 Oracle 數據庫中的資源時不必擔心鎖定。這是因為,在并發環境中對共享數據提供訪問時,Oracle 數據庫將使用其自身的后臺鎖定機制。因此,通常較好的做法是將共享數據存儲在 Oracle 數據庫中,從而由 Oracle 數據庫處理并發性問題。
異步執行操作也是實現可擴展性和受益于并發性的較好方式。在異步編程中,阻塞代碼排隊等待稍后單獨的線程完成,從而確保您的應用程序可以繼續執行其他任務。使用異步框架(如 Twisted)可以極大地簡化構建異步應用程序的任務。
本文將簡單介紹如何使用 Python 和 Oracle 數據庫構建并發應用程序,描述如何使用 Python 代碼利用線程與 Oracle 數據庫交互,并解釋如何將 SQL 查詢并行提交到數據庫服務器而不是依次處理。您還將了解如何讓 Oracle 數據庫處理并發性問題以及如何利用 Python 事件驅動的框架 Twisted。
Python 中的多線程編程
線程是并行處理中的一個非常有用的特性。如果您的一個程序正在執行耗時的操作并且可以將其分成若干個獨立的任務并行執行,那么使用線程可以幫助您構建更加高效、快速的代碼。多線程的另一個有趣的用處是可以提高應用程序的響應能力 — 在后臺執行耗時操作的同時,主程序仍然可以做出響應。
當長時間運行的 SQL 語句彼此并無關聯并且可以并行執行時,將這些語句封裝到 Python 的不同線程中是不錯的做法。例如,如果 Web 頁面將初始的 SQL 查詢并行提交到數據庫服務器而不是按順序處理它們(使它們一個接一個地排隊等待),則可顯著減少 Web 頁面的加載時間。
當您需要將某些大對象 (LOB) 上載到數據庫時,也會發現線程很有用。以并行方式執行此操作不僅可以減少將 LOB 上載到數據庫所需的整體時間,還可以在后臺進行并行上載的同時保持程序主線程的響應能力。
假設您需要將幾個二進制大對象 (BLOB) 上載到數據庫并將其保存到 blob_tab 表(您可能已經在自定義數據庫模式中創建了該表),如下所示:
CREATE TABLE blob_tab(
id NUMBER PRIMARY KEY,
blobdoc BLOB
);
CREATE SEQUENCE blob_seq;
首先,我們來了解一下如何不利用線程將 BLOB 一個接一個地存儲到 blob_tab 表中。以下 Python 腳本可以完成該任務,永久保存分別使用文件名和 URL 獲得的兩個輸入圖像。該示例假設您已經在 usr/pswd 自定義數據庫模式中創建了 blob_tab 表和 blob_seq 序列:
#File: singlethread.py
#Storing BLOBs in a single thread sequentially, one after another
import cx_Oracle
from urllib import urlopen
inputs = []
#if you?ˉre a Windows user, the path could be 'c:/temp/figure1.bmp'
inputs.append(open('/tmp/figure1.bmp', 'rb'))
inputs.append(urlopen('http://localhost/mypictures/figure2.bmp', 'rb'))
#obtaining a connection and predefining a memory area for a BLOB
dbconn = cx_Oracle.connect('usr', 'pswd', '127.0.0.1/XE')
dbconn.autocommit = True
cur = dbconn.cursor()
cur.setinputsizes(blobdoc=cx_Oracle.BLOB)
#executing INSERT statements saving BLOBs to the database
for input in inputs:
blobdoc = input.read()
cur.execute("INSERT INTO blob_tab (ID, BLOBDOC) VALUES(blob_seq.NEXTVAL, :blobdoc)", {'blobdoc':blobdoc})
input.close()
dbconn.close()
盡管獲取和存儲 figure1.bmp 和 figure2.bmp 的任務在此處一個接一個地進行,但是,您可能已經猜到,這些任務實際上并不存在順序上的先后關聯性。因此,您可以重構上述代碼,使其在單個線程中讀取和存儲每個圖像,從而通過并行處理提升性能。在這種特殊的情況下值得一提的是,您不必協調并行運行的線程,從而可以極大地簡化編碼。
以下示例顯示了如何利用面向對象的方法重新編寫上述腳本以使用線程。具體來說,該示例說明了如何從 threading 模塊擴展 Thread 類,針對特定任務對其進行自定義。
#File: multithread.py
#Storing BLOBs in separate threads in parallel
import cx_Oracle
import threading
from urllib import urlopen
#subclass of threading.Thread
class AsyncBlobInsert(threading.Thread):
def __init__(self, cur, input):
threading.Thread.__init__(self)
self.cur = cur
self.input = input
def run(self):
blobdoc = self.input.read()
self.cur.execute("INSERT INTO blob_tab (ID, BLOBDOC) VALUES(blob_seq.NEXTVAL, :blobdoc)", {'blobdoc':blobdoc})
self.input.close()
self.cur.close()
#main thread starts here
inputs = []
inputs.append(open('/tmp/figure1.bmp', 'rb'))
inputs.append(urlopen('http://localhost/_figure2.bmp', 'rb'))
dbconn = cx_Oracle.connect('usr', 'pswd', '127.0.0.1/XE',threaded=True)
dbconn.autocommit = True
for input in inputs:
cur = dbconn.cursor()
cur.setinputsizes(blobdoc=cx_Oracle.BLOB)
th = AsyncBlobInsert(cur, input)
th.start()
在上述代碼中,注意 threaded 屬性的使用,該屬性作為參數傳遞到 cx_Oracle.connect 方法。通過將其設置為 true,您指示 Oracle 數據庫使用 OCI_THREADED 模式(又稱為 threaded 模式),從而指明應用程序正在多線程環境中運行。請注意,在此處針對單線程應用程序使用 threaded 模式并不是一種好的做法。根據 cx_Oracle 文檔,在單線程應用程序中將 threaded 參數設置為 true 將使性能下降 10% 到 15%。
在本示例中,您將在兩個線程間共享一個連接,但是將為每個線程創建一個單獨的游標對象。此處,讀取 BLOB 然后將其插入數據庫的操作是在 threading.Thread 標準 Python 類中 AsyncBlobInsert 自定義子類的改寫的 run 方法中實現的。因此,要在單獨的線程中開始上載 BLOB,您只需創建一個 AsyncBlobInsert 實例,然后調用其 start 方法。
這里要討論一個與腳本有關的問題。執行時,它不會等到正在啟動的線程完成 — 啟動子線程后主線程將結束,不會等到子線程完成。如果您并不希望這樣而是希望程序僅在所有線程都完成后再結束,那么您可以在腳本末尾調用每個 AsyncBlobInsert 實例的 join 方法。這將阻塞主線程,使其等待子線程的完成。對前面的腳本進行修改,使其等待 for 循環中啟動的所有線程完成,如下所示:
...
th = []
for i, input in enumerate(inputs):
cur = dbconn.cursor()
cur.setinputsizes(blobdoc=cx_Oracle.BLOB)
th.append(AsyncBlobInsert(cur, input))
th[i].start()
#main thread waits until all child threads are done
for t in th:
t.join()
下一節中提供了需要強制主線程等待子線程完成的示例。
同步對共享資源的訪問
前面的示例顯示了一個多線程的 Python 應用程序,該程序處理幾個彼此并無關聯的任務,因此很容易分離并放到不同的線程中進行并行處理。但是在實際中,您經常需要處理彼此相互關聯的操作,并且需要在某個時刻進行同步。
作為單個進程的一部分,線程共享相同的全局內存,因此可以通過共享資源(如變量、類實例、流和文件)在彼此之間傳遞信息。但是,這種在線程間交換信息的簡單方法是有條件的 — 當修改的對象可以同時在另一線程中訪問和/或修改時,您確實要非常謹慎。因此,如果能夠避免沖突,使用一個機制來同步對共享數據的訪問,這將是很有用的。
為幫助解決這一問題,Python 允許您指定鎖定,然后可以由某個線程取得該鎖定以確保對該線程中您所使用的數據結構進行獨占訪問。Threading 模塊附帶有 Lock 方法,您可以使用該方法指定鎖定。但是請注意,使用 threading.Lock 方法指定的鎖定最初處于未鎖定狀態。要鎖定一個分配的鎖,您需要顯式調用該鎖定對象的 acquire 方法。之后,可以對需要鎖定的對象執行操作。例如,當向線程中的 stdout 標準輸出流進行寫入時,您可能需要使用鎖,以免其他使用 stdout 的線程發生重疊。進行此操作后,您需要使用鎖定對象的 release 方法釋放該鎖,以使釋放的數據結構可用于其他線程中的進一步處理。
關于鎖定要注意的是,它們并不綁定到單個線程。在一個線程中指定的鎖,可以由另一個線程獲得,并由第三個線程釋放。以下腳本例舉了實際操作中的一個簡單的鎖。此處,為在子線程中進行使用,您在主線程中指定了一個鎖,在向 DOM 文檔寫入之前獲得它,然后立即釋放。
#File: synchmultithread.py
#Using locks for synchronization in a multithreaded script
import sys
import cx_Oracle
import threading
from xml.dom.minidom import parseString
from urllib import urlopen
#subclass of threading.Thread
class SynchThread(threading.Thread):
def __init__(self, cur, query, dom):
threading.Thread.__init__(self)
self.cur = cur
self.query = query[1]
self.tag = query[0]
self.dom = dom
def run(self):
self.cur.execute(self.query)
rslt = self.cur.fetchone()[0]
self.cur.close()
mutex.acquire()
sal = self.dom.getElementsByTagName('salary')[0]
newtag = self.dom.createElement(self.tag)
newtext = self.dom.createTextNode('%s'%rslt)
newtag.appendChild(newtext)
sal.appendChild(newtag)
mutex.release()
#main thread starts here
domdoc = parseString('')
dbconn = cx_Oracle.connect('hr', 'hr', '127.0.0.1/XE',threaded=True)
mutex = threading.Lock()
queries = {}
queries['avg'] = "SELECT AVG(salary) FROM employees"
queries['max'] = "SELECT MAX(salary) FROM employees"
th = []
for i, query in enumerate(queries.items()):
cur = dbconn.cursor()
th.append(SynchThread(cur, query, domdoc))
th[i].start()
#forcing the main thread to wait until all child threads are done
for t in th:
t.join()
#printing out the result xml document
domdoc.writexml(sys.stdout)
在上面的腳本中,您首先在主線程中創建了一個文檔對象模型 (DOM) 文檔對象,然后在并行運行的子線程中修改該文檔,添加包含從數據庫獲取的信息的標簽。此處,您將針對 HR 演示模式中的 employees 表使用了兩個簡單的查詢。為避免在向 DOM 對象并行寫入期間發生沖突,您需要在每個子線程中獲取在主線程中指定的鎖。一個子線程獲得該鎖后,另一個子線程將無法修改此處處理的 DOM 對象,直至第一個線程釋放該鎖。
然后,您可以使用主線程同步在各子線程中對 DOM 對象所做的更新,在主線程中調用每個子線程對象的 join 方法。之后,您可以在主流中對 DOM 文檔對象進行進一步處理。在該特定示例中,您只是將其寫入 stdout 標準輸出流。
因此,您可能已經注意到,此處展示的示例實際上并沒有討論如何鎖定數據庫訪問操作,例如,發出查詢或針對并行線程中的同一數據庫表進行更新。實際上,Oracle 數據庫有自己的強大鎖定機制,可確保并發環境中的數據完整性。而您的任務是正確使用這些機制。下一節中,我們將討論如何利用 Oracle 數據庫特性控制對共享數據的并發訪問,從而讓數據庫處理并發性問題。
使 Oracle 數據庫管理并發性
如上所述,當對存儲在 Oracle 數據庫中的共享數據進行訪問或操作時,您不必在 Python 代碼中手動實施資源鎖定。為解決并發性問題,Oracle 數據庫根據事務概念在后臺使用不同類型的鎖和多版本并發性控制系統。在實際操作中,這意味著,您只需考慮如何正確利用事務以確保正確訪問、更新或更改數據庫數據。具體來說,您必須謹慎地在自動提交事務模式和手動提交事務模式之間做出選擇,將多個 SQL 語句組合到一個事務中時也需小心仔細。最后,必須避免發生并發事務間的破壞性交互。
在這里,需要記住的是,您在 Python 代碼中使用的事務與連接而非游標相關聯,這意味著您可以輕松地按照邏輯將使用不同游標但通過相同連接執行的語句組合到一個事務中。但是,如果您希望實施兩個并發事務,則需要創建兩個不同的連接對象。
在前面的“Python 中的多線程編程”一節中討論的多線程示例中,您將連接對象的 autocommit 模式設置為 true,從而指示 cx_Oracle 模塊在每個 INSERT 語句后隱式執行 COMMIT。在這種特定情況下,使用自動提交模式是合理的,因為這樣可以避免子線程和主線程間的同步,從而可以在主線程中手動執行 COMMIT,如下所示:
...
#main thread waits until all child threads are done
for t in th:
t.join()
#and then issues a commit
dbconn.commit()
但是,在有些情況下,您需要用到上述方案。考慮以下示例。假設您在兩個并行線程中分別執行以下兩個操作。在一個線程中,您將采購訂單文檔保存到數據庫中,包括訂單詳細信息。在另一個線程中,您對包含該訂單中涉及產品的相關信息的表進行修改,更新可供購買的產品數量。
很顯然,上述兩個操作必須封裝到一個事務中。為此,您必須關閉 autocommit 模式,該模式為默認模式。此外,您還將需要使用主線程同步并行線程,然后顯式執行 COMMIT,如上述代碼段所示。
雖然上述方案可以輕松實現,但在實際中,您可能最希望在數據庫中實施第二個操作,即更新可供購買的產品的數量,將 BEFORE INSERT 觸發器放到存儲訂單詳細信息的表上,這樣它可以自動更新包含相關產品信息的表中的相應記錄。這將簡化 Python 端的代碼并消除編寫多線程 Python 腳本的需求,讓 Oracle 數據庫來處理數據完整性問題。實際上,如果在放入 details 表的 BEFORE INSERT 觸發器中更新產品表時出現問題,Oracle 數據庫將自動回滾將新行插入到 details 表的操作。在 Python 端,需要進行的操作僅是將用于保存訂單詳細信息的所有 INSERT 封裝到一個事務中,如下所示:
...
dbconn = cx_Oracle.connect('hr', 'hr', '127.0.0.1/XE',threaded=True)
dbconn.autocommit = False
cur = dbconn.cursor()
...
for detail in details:
id = detail['id']
quantity = person['quantity']
cur.execute("INSERT INTO details(id, quantity) VALUES(:id, :quantity)", {'id':id, 'quantity':quantity})
dbconn.commit()
...
使用 Python 事件驅動的框架 Twisted
Twisted 提供了一種不增加復雜性的編碼事件驅動應用程序的好方法,使 Python 中的多線程編程更加簡單、安全。Twisted 并發性模式基于無阻塞調用概念。您調用一個函數來請求某些數據并指定一個在請求數據就緒時調用的回調函數。而于此同時,程序可以繼續執行其他任務。
twisted.enterprise.adbapi 模塊是一個異步封裝程序,可用于任何 DB-API 兼容的 Python 模塊,使您可以以無阻塞模式執行數據庫相關任務。例如,使用它,您的應用程序不必等待數據的連接建立或查詢完成,而是并行執行其他任務。本節將介紹幾個與 Oracle 數據庫交互的 Twisted 應用程序的簡單示例。
Twisted 不隨 Python 提供,需要下載并在裝有 Python 的系統中安裝。您可以從 Twisted Matrix Labs Web 站點?http://twistedmatrix.com?下載適合您 Python 版本的 Twisted 安裝程序包。下載程序包之后,只需在 Twisted 設置向導中進行幾次點擊即可完成安裝,安裝大約需要一分鐘的時間。
Twisted 是一個事件驅動的框架,因此,其事件循環一旦啟動即持續運行,直到事件完成。在 Twisted 中,事件循環使用名為 reactor 的對象進行實施。使用 reactor.run 方法啟動 Twisted 事件循環,使用 reactor.stop 停止該循環。而另一個名為 Deferred 的 Twisted 對象用于管理回調。以下是簡化了的現實中的 Twisted 事件循環和回調示例。__name__ 測試用于確保解決方案將僅在該模塊作為主腳本調用但不導入時(即,必須從命令行、使用 IDLE Python GUI 或通過單擊圖標調用該解決方案)運行。
#File: twistedsimple.py
#A simple example of a Twisted app
from twisted.internet import reactor
from twisted.enterprise import adbapi
def printResult(rslt):
print rslt[0][0]
reactor.stop()
if __name__ == "__main__":
dbpool = adbapi.ConnectionPool('cx_Oracle', user='hr', password ='hr', dsn='127.0.0.1/XE')
empno = 100
deferred = dbpool.runQuery("SELECT last_name FROM employees WHERE employee_id = :empno", {'empno':empno})
deferred.addCallback(printResult)
reactor.run()
請注意,twisted.enterprise.adbapi 模塊基于標準 DB-API 接口構建,并在后臺使用您在調用 adbapi.ConnectionPool 方法時指定的 Python 數據庫模塊。甚至您在指定 adbapi.ConnectionPool 輸入參數時可以使用的一組關鍵字也取決于您使用的數據庫模塊類型。
盡管與不同的 Python 數據庫模塊結合使用時語法上有一些不同,但是通過 twisted.enterprise.adbapi,您可以編寫異步代碼,從而可以在后臺安全處理數據庫相關任務的同時,繼續執行您的程序流。以下示例展示了一個以異步方式查詢數據庫的簡單 Twisted Web 應用程序。該示例假設您已經創建了 blob_tab 表并為其填充了數據(如本文開始部分“Python 中的多線程編程”一節中所述)。
#File: twistedTCPServer.py
#Querying database asynchronously with Twisted
from twisted.web import resource, server
from twisted.internet import reactor
from twisted.enterprise import adbapi
class BlobLoads(resource.Resource):
def __init__(self, dbconn):
self.dbconn = dbconn
resource.Resource.__init__(self)
def _getBlobs(self, txn, query):
txn.execute(query)
return txn.fetchall()
def render_GET(self, request):
query = "select id, blobdoc from blob_tab"
self.dbconn.runInteraction(self._getBlobs, query).addCallback(
self._writeBlobs, request).addErrback(
self._exception, request)
return server.NOT_DONE_YET
def _writeBlobs(self, results, request):
request.write("""
BLOBs manipulatingWriting BLOBs from the database to your disk
""")
for id, blobdoc in results:
request.write("/tmp/picture%s.bmp
" % id)
blob = blobdoc.read()
output = open("/tmp/picture%s.bmp" % id, 'wb')
output.write(blob)
output.close()
request.write("""
Operation completed
""")
request.finish( )
def _exception(self, error, request):
request.write("Error obtaining BLOBs: %s" % error.getErrorMessage())
request.write("""
Could not complete operation
總結
以上是生活随笔為你收集整理的python访问数据库如何解决高并发_使用 Python 和 Oracle 数据库实现高并发性的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: float去掉小数点之后_float类型
- 下一篇: 在python语言中不能作为变量名的是什