Python: multiprocessing with sqlite3
I have a SQLite3 database and need to parse 10,000 files. I read some data from each file and then use that data to query the database for a result. My code works fine in a single process, but I get an error when I try to use a multiprocessing pool.
My approach without multiprocessing (works OK):
1. Open DB connection object
2. for f in files:
       foo(f, x1=x1, x2=x2, ..., db=DB)
3. Close DB
My approach with multiprocessing (does NOT work):
1. Open DB
2. pool = multiprocessing.Pool(processes=4)
3. pool.map(functools.partial(foo, x1=x1, x2=x2, ..., db=DB), files)
4. pool.close()
5. Close DB
I get the following error: sqlite3.ProgrammingError: Base Cursor.__init__ not called.
My DB class is implemented as follows:
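import logging
import sqlite3
import sys

# Assumed: the original does not show how `log` is created.
log = logging.getLogger(__name__)

def open_db(sqlite_file):
    """Open SQLite database connection.

    Args:
        sqlite_file -- File path
    Return:
        Connection
    """
    log.info('Open SQLite database %s', sqlite_file)
    try:
        conn = sqlite3.connect(sqlite_file)
    except sqlite3.Error as e:  # "except sqlite3.Error, e" is Python 2-only syntax
        log.error('Unable to open SQLite database %s', e.args[0])
        sys.exit(1)
    return conn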

def close_db(conn, sqlite_file):
    """Close SQLite database connection.

    Args:
        conn -- Connection
        sqlite_file -- File path (used for logging only)
    """
    if conn:
        log.info('Close SQLite database %s', sqlite_file)
        conn.close()

class MapDB:
    def __init__(self, sqlite_file):
        """Initialize.

        Args:
            sqlite_file -- File path
        """
        # 1. Open database.
        # 2. Return rows as sqlite3.Row so columns can be read by name.
        # 3. Get cursor to execute queries.
        self._sqlite_file = sqlite_file
        self._conn = open_db(sqlite_file)
        self._conn.row_factory = sqlite3.Row
        self._cursor = self._conn.cursor()

    def close(self):
        """Close DB connection."""
        if self._cursor:
            self._cursor.close()
        close_db(self._conn, self._sqlite_file)

    def check(self):
        ...

    def get_driver_net(self, net):
        ...

    def get_cell_id(self, net):
        ...
The function foo() looks like this:
def foo(f, x1, x2, db):
    # extract some data from file f
    r1 = db.get_driver_net(...)
    r2 = db.get_cell_id(...)
The overall implementation, which does not work, is as follows:
mapdb = MapDB(sqlite_file)
log.info('Create NetInfo objects')
pool = multiprocessing.Pool(processes=4)
files = [get list of files to process]
pool.map(functools.partial(foo, x1=x1, x2=x2, db=mapdb), files)
pool.close()
mapdb.close()
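To fix this, I think I need to create the MapDB() object inside each pool worker, so that there are 4 parallel, independent connections. But I don't know how to do that. Can someone show me how to achieve this with Pool?
Best answer: how about defining foo like this: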
def foo(f, x1, x2, db_path):
    mapdb = MapDB(db_path)
    # ... open mapdb (MapDB.__init__ above already opens the connection)
    # ... process data ...
    # ... close mapdb, e.g. mapdb.close()
Then change the pool.map call to:
pool.map(functools.partial(foo, x1=x1, x2=x2, db_path="path-to-sqlite3-db"), files)
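The version above opens a fresh connection for every file. If that turns out to be too costly, one possible variant, closer to what the question asks for (one connection per pool worker), is to open the connection in the `initializer` that `multiprocessing.Pool` runs once in each worker process. The following is only a sketch under assumptions: the table `nets(net, cell_id)`, the function names `init_worker`, `lookup` and `run`, and the query itself are made up for illustration and are not part of the original code.

```python
import multiprocessing
import sqlite3

# Per-process connection; set by init_worker() inside each pool worker.
_conn = None


def init_worker(db_path):
    """Run once in every worker process: open that worker's own connection."""
    global _conn
    _conn = sqlite3.connect(db_path)
    _conn.row_factory = sqlite3.Row


def lookup(net):
    """Hypothetical per-task query against an assumed table nets(net, cell_id)."""
    row = _conn.execute(
        "SELECT cell_id FROM nets WHERE net = ?", (net,)
    ).fetchone()
    return None if row is None else row["cell_id"]


def run(db_path, nets, processes=4):
    """Map lookup() over nets with one SQLite connection per worker process."""
    with multiprocessing.Pool(
        processes=processes, initializer=init_worker, initargs=(db_path,)
    ) as pool:
        return pool.map(lookup, nets)
```

Only the database path (a plain string) is sent to the workers, never a connection or cursor object. `run()` should be called from under an `if __name__ == "__main__":` guard so that it also works with start methods that re-import the main module. In this sketch the connections are not closed explicitly; they go away when the worker processes exit.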
Update
Another option is to manage the worker threads yourself and distribute the work through a queue.
from queue import Queue  # Python 2: from Queue import Queue
from threading import Thread

q = Queue()

def worker():
    mapdb = ...  # open the sqlite database
    while True:
        item = q.get()
        if item[0] == "file":
            file = item[1]
            # ... process file ...
            q.task_done()
        else:
            q.task_done()
            break
    # ... close sqlite connection ...

# Start up the workers
nworkers = 4
for i in range(nworkers):
    # Use a separate name so the worker() function is not overwritten.
    t = Thread(target=worker)
    t.daemon = True
    t.start()

# Place work on the Queue
for x in ...list of files...:
    q.put(("file", x))

# Place termination tokens onto the Queue
for i in range(nworkers):
    q.put(("end",))

# Wait for all work to be done.
q.join()
The termination tokens are there to make sure the sqlite connections get closed, in case that matters.
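For reference, here is a self-contained sketch of the queue-based approach that can actually be run. It rests on assumptions that are not in the original: the table `nets(net, cell_id)`, the names `run_threads` and `_STOP`, and a second queue for collecting results are all invented for illustration. Each thread opens its own connection, because by default a `sqlite3` connection may only be used from the thread that created it. The sketch joins the threads rather than the queue, so every `conn.close()` has run by the time `run_threads()` returns.

```python
import sqlite3
from queue import Queue
from threading import Thread

_STOP = object()  # sentinel playing the role of the ("end",) token


def worker(db_path, tasks, results):
    # One connection per thread, opened and closed inside the thread.
    conn = sqlite3.connect(db_path)
    try:
        while True:
            item = tasks.get()
            try:
                if item is _STOP:
                    break
                # Hypothetical query against an assumed table nets(net, cell_id).
                row = conn.execute(
                    "SELECT cell_id FROM nets WHERE net = ?", (item,)
                ).fetchone()
                results.put((item, row[0] if row else None))
            finally:
                tasks.task_done()
    finally:
        conn.close()


def run_threads(db_path, nets, nworkers=4):
    tasks, results = Queue(), Queue()
    threads = [
        Thread(target=worker, args=(db_path, tasks, results))
        for _ in range(nworkers)
    ]
    for t in threads:
        t.start()
    for net in nets:
        tasks.put(net)
    for _ in threads:
        tasks.put(_STOP)  # one termination token per worker
    for t in threads:
        t.join()  # after this, every worker has closed its connection
    out = {}
    while not results.empty():
        key, value = results.get()
        out[key] = value
    return out
```

Calling `run_threads("path-to-sqlite3-db", list_of_nets)` returns a dict that maps each input to its query result, or to `None` when no row matches.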
總結(jié)
以上是生活随笔為你收集整理的python sqlite数据库一对多_Python:使用sqlite3进行多处理的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 5号出账15号还款什么意思
- 下一篇: 磁盘剩余空间策略_如何无损扩展C盘空间大