python mutilprocessing多进程编程
??`為了更好的理解本文內容,請務必先了解Synchronization、Asynchronization、Concurrent、Mutex等基本概念
??multiprocessing是一個類似于Threading模塊的由API產生進程的包,關于Threading模塊可以參考我的博客文章。multiprocessing能夠 提供本地和遠程兩種并發模式,通過使用子進程而不是線程有效地避開了GIL。因此,multiprocessing允許程序員充分利用機器上的多個處理器,且該包支持在Unix系統和Windows系統上運行。
??mutilprocessing還引入了在Threading模塊中沒有相類似的API。比如Pool對象,Pool對象提供了一種方便的方法,可以跨多個輸入值并行化函數的執行,跨進程分配輸入數據(數據并行)。使用方法可以看看下面的例子:
from multiprocessing import Pooldef f(x):return x * xif __name__ == '__main__':with Pool(5) as p:print(p.map(f, [1, 2, 3, 4, 5, 6, 7]))# [1, 4, 9, 16, 25, 36, 49]Process類
class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={},daemon=None)
?group必須為None,設置該參數僅僅是為了與Threading模塊保持一致
?target是run()方法調用的可調用對象
?name是指進程名
?daemon指示是否設置為守護進程
-
run()
??表示進程活動的方法,可在子類中重寫此方法。標準run()方法調用傳遞給對象構造函數的可調用對象作為目標參數(如果有),分別使用args和kwargs參數中的順序和關鍵字參數。
-
start()
??啟動進程的活動,每個進程對象最多只能調用一次,在一個單獨的進程中調用對象的run()方法
-
join([timeout])
??如果可選參數timeout為None(缺省值),則該方法將阻塞,直到調用其join()方法的進程終止。如果timeout是一個正數,它最多會阻塞timeout秒。請注意,如果方法的進程終止或方法超時,則該方法返回None。檢查進程的exitcode以確定它是否終止。
-
name
??進程名
-
is_alive()
??指示進程是否還活著
-
daemon
??daemon flag, a Boolean value, 必須在進程start之前設置
-
pid
??process ID
-
exitcode
??負值-N表示孩子被信號N終止,默認為None,表示進程未被終止
-
authkey
??The process’s authentication key (a byte string)
-
sentinel
?系統對象的數字句柄,當進程結束時將變為“ready”?
-
terminate()
??終止進程,但注意子進程不會被終止,只是會成孤兒
請注意,start(),join(),is_alive(),terminate()和exitcode方法只應由創建過程對象的進程調用。
>>> import multiprocessing, time, signal >>> p = multiprocessing.Process(target=time.sleep, args=(1000,)) >>> print(p, p.is_alive()) <Process(Process-1, initial)> False >>> p.start() >>> print(p, p.is_alive()) <Process(Process-1, started)> True >>> p.terminate() >>> time.sleep(0.1) >>> print(p, p.is_alive()) <Process(Process-1, stopped[SIGTERM])> False >>> p.exitcode == -signal.SIGTERM True??在multiprocessing中,通過創建Process對象然后調用其start()方法來生成進程,其使用方法和threading.Thread一樣。我們看下面的例子:
from multiprocessing import Processdef f(name):print('hello', name)if __name__ == '__main__': # 這句話是必要的,不可去掉p = Process(target=f, args=('bob',))p.start()p.join()我們可以通過進程號來區分不同的進程:
from multiprocessing import Process import osdef info(title):print(title)print('module name:', __name__)print('parent process:', os.getppid())print('process id:', os.getpid(), '\n')def f(name):info('function f')print('hello', name)if __name__ == '__main__':info('main line')p = Process(target=f, args=('bob',)) # 創建新進程p.start() # 啟動進程p.join()進程啟動
??根據平臺的不同,multiprocessing支持三種啟動進程的方法。這些啟動方法是:
-
spawn
spawn
?調用改方法,父進程會啟動一個新的python進程,子進程只會繼承運行進程對象run()方法所需的那些資源。特別地,子進程不會繼承父進程中不必要的文件描述符和句柄。與使用fork或forkserver相比,使用此方法啟動進程相當慢。
? Available on Unix and Windows. The default on Windows.
-
fork
?父進程使用os.fork()來fork Python解釋器。子進程在開始時實際上與父進程相同,父進程的所有資源都由子進程繼承。請注意,安全創建多線程進程尚存在一定的問題。
?Available on Unix only. The default on Unix.
-
forkserver
?當程序啟動并選擇forkserverstart方法時,將啟動服務器進程。從那時起,每當需要一個新進程時,父進程就會連接到服務器并請求它fork一個新進程。 fork服務器進程是單線程的,因此使用os.fork()是安全的。沒有不必要的資源被繼承。
?Available on Unix platforms which support passing file descriptors over Unix pipes.
??要選擇以上某一種start方法,請在主模塊的if __name__ == '__ main__'子句中使用mp.set_start_method()。
并且mp.set_start_method()在一個程序中僅僅能使用一次。
import multiprocessing as mpdef foo(q):q.put('hello')if __name__ == '__main__':mp.set_start_method('spawn')q = mp.Queue()p = mp.Process(target=foo, args=(q,))p.start()print(q.get())p.join()??或者,您可以使用get_context()來獲取上下文對象。上下文對象與多處理模塊具有相同的API,并允許在同一程序中使用多個啟動方法。
import multiprocessing as mpdef foo(q):q.put('hello')if __name__ == '__main__':ctx = mp.get_context('spawn')q = ctx.Queue()p = ctx.Process(target=foo, args=(q,))p.start()print(q.get())p.join()?注意,與一個context相關的對象可能與不同context的進程不兼容。特別是,使用fork context創建的鎖不能傳遞給使用spawn或forkserver start方法啟動的進程。
進程通信
??當使用多個進程時,通常使用消息傳遞來進行進程之間的通信,并避免必須使用任何synchronization primitives(如鎖)。對于傳遞消息,可以使用Pipe(用于兩個進程之間的連接)或Queue(允許多個生產者和消費者)。
Queues
??class multiprocessing.Queue([maxsize])
???Queue實現queue.Queue的所有方法,但task_done()和join()除外。Queue是進程、線程安全的模型
from multiprocessing import Process, Queuedef f(q):q.put([42, None, 'hello'])if __name__ == '__main__':q = Queue()p = Process(target=f, args=(q,))p.start()print(q.get()) # prints "[42, None, 'hello']"p.join()Pipes
??Class multiprocessing.Pipe([duplex])
??返回一對(conn1, conn2) of Connection 對象代表pipe的兩端。如果duplex為True(默認值),則管道是雙向的;如果duplex為False,則管道是單向的:conn1只能用于接收消息,conn2只能用于發送消息。Pipe()`函數返回一個由Pipe連接的連接對象,默認情況下是全雙工雙向通信(duplex)。例如:
from multiprocessing import Process, Pipedef f(conn):conn.send([42, None, 'hello'])conn.close()if __name__ == '__main__':parent_conn, child_conn = Pipe()p = Process(target=f, args=(child_conn,))p.start()print(parent_conn.recv()) # prints "[42, None, 'hello']"p.join()??Pipe()返回的兩個連接對象代表管道的兩端,每個連接對象都有send()和recv()方法。需要注意的是,管道中的數據可能會不一致或被破壞,如當兩個進程(或線程)嘗試同時讀取或寫入管道的同一端。當然,同時使用管道的不同端部的過程不存在損壞的風險。
進程共享狀態
??在進行并發編程時,通常最好避免使用共享狀態,但是,如果你確實需要使用某些共享數據,那么multiprocessing提供了以下兩種方法:
Shared Memory
?可以使用Value或Array將數據存儲在共享內存的map(映射)中。例如,以下代碼:
from multiprocessing import Process, Value, Arraydef f(n, a):n.value = 3.1415927for i in range(len(a)):a[i] = -a[i]if __name__ == '__main__':num = Value('d', 0.0)arr = Array('i', range(10))p = Process(target=f, args=(num, arr))p.start()p.join()print(num.value)print(arr[:])# 3.1415927# [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]??創建num和arr時使用的’d’和’i’參數是array module使用的類型的類型代碼:'d’表示雙精度浮點數,'i’表示有符號整數。這些共享對象將是進程和線程安全的。為了更靈活地使用共享內存,可以使用multiprocessing.sharedctypes模塊,該模塊支持創建從共享內存分配的任意ctypes對象。但還是那句話,在進行并發編程時,通常最好避免使用共享狀態。
Server Process
??Manager()返回的Manager對象控制一個服務器進程(server process),該進程保存Python對象并允許其他進程使用代理操作它們。Manager對象支持的對象包括list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value 以及 Array。Managers提供了一種創建可在不同進程之間共享的數據的方法,包括在不同計算機上運行的進程之間通過網絡共享。管理器對象控制管理共享對象的服務器進程。其他進程可以使用代理訪問共享對象。
from multiprocessing import Process, Managerdef f(d, l):d[1] = '1'd['2'] = 2d[0.25] = Nonel.reverse()if __name__ == '__main__':with Manager() as manager:d = manager.dict()l = manager.list(range(10))p = Process(target=f, args=(d, l))p.start()p.join()print(d)print(l)#{0.25: None, 1: '1', '2': 2} #[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]Proxy
??代理是一個對象,它指的是(可能)在不同的進程中存在的共享對象。共享對象被認為是代理的指示對象。多個代理對象可能具有相同的指示對象。代理對象具有調用其引用對象的相應方法的方法。代理對象的一個重要特性是它們是pickable的,因此它們可以在進程之間傳遞。
>>> from multiprocessing import Manager >>> manager = Manager() >>> l = manager.list([i*i for i in range(10)]) >>> print(l) # l即是一個代理對象 [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] >>> print(repr(l)) <ListProxy object, typeid 'list' at 0x...> >>> l[4] 16 >>> l[2:5] [4, 9, 16]Connection
?&esmp;connection對象允許發送和接收可序列化對象或字符串。它們可以被認為是面向消息的連接套接字,我們再上面介紹Pipe的時候所實例化的對象就是connection對象。
-
send(obj)
將對象發送到連接的另一端,應使用recv()讀取,且該對象必須是pickable的,>32 MB的對象可能會引發ValueError異常。
-
recv()
返回從連接另一端發送的對象。阻塞直到接收到東西。如果沒有剩余要接收和另一端被關閉,則引發EOFError。
-
fileno()
返回conn所使用的文件描述符或句柄
-
close()
關閉連接
-
poll([timeout])
返回是否有可供讀取的數據,如果未指定超時,則會立即返回;如果timeout是一個數字,則指定阻止的最長時間(以秒為單位);如果timeout為None,則使用無限超時。
-
send_bytes(buffer[, offset[, size]])
發送字節數據
-
recv_bytes([maxlength])
接受字節數據
-
recv_bytes_into(buffer[, offset])
讀取從連接另一端發送的字節數據的完整消息到buffer,并返回消息中的字節數。
>>> from multiprocessing import Pipe >>> a, b = Pipe() >>> a.send([1, 'hello', None]) >>> b.recv() [1, 'hello', None] >>> b.send_bytes(b'thank you') >>> a.recv_bytes() b'thank you' >>> import array >>> arr1 = array.array('i', range(5)) >>> arr2 = array.array('i', [0] * 10) >>> a.send_bytes(arr1) >>> count = b.recv_bytes_into(arr2) >>> assert count == len(arr1) * arr1.itemsize >>> arr2 array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
summary
??Server process Manager比使用共享內存對象更靈活,因為它們可以支持任意對象類型。此外,單個管理器可以通過網絡在不同計算機上的進程共享。但它比使用共享內存慢。
Synchronization
??同步原語和Threading模塊幾乎一致,具體請參考Python Threading 多線程編程
Lock
from multiprocessing import Process, Lockdef f(l, i):"""保證同一時間只有一個標準輸出流"""l.acquire()try:print('hello world', i)finally:l.release()if __name__ == '__main__':lock = Lock()for num in range(10):Process(target=f, args=(lock, num)).start()# output hello world 1 hello world 0 hello world 2 hello world 4 hello world 3 hello world 6 hello world 9 hello world 5 hello world 8 hello world 7Pool類
??Pool類用于創建進程池
主要方法有,具體例子見代碼,并請注意,pool對象的方法只能由創建它的進程使用:
- pool.map()
- pool.imap() Equivalent of map() – can be MUCH slower than Pool.map().
- pool.starmap() Like map() method but the elements of the iterable are expected to be iterables as well and will be unpacked as arguments.
- pool.starmap_async Asynchronous version of starmap() method
- pool.map_async Asynchronous version of map() method.
- pool.imap_unordered()
- pool.apply()
- pool.apply_async() Asynchronous version of apply() method.
Miscellaneous
-
multiprocessing.active_children()
返回當前進程的所有活子進 程的列表
-
multiprocessing.cpu_count()
返回系統中的CPU數量,此數字不等于當前進程可以使用的CPU數量。可以使用len(os.sched_getaffinity(0))獲得可用CPU的數量
-
multiprocessing.current_process()
返回與當前進程對應的Process對象
-
multiprocessing.freeze_support()
為程序打包成exe可執行文件提供支持,在Windows以外的任何操作系統上調用時,調用freeze_support()無效。此外,如果模塊由Windows上的Python解釋器正常運行(程序尚未凍結),則freeze_support()無效
from multiprocessing import Process, freeze_supportdef f():print('hello world!')if __name__ == '__main__':freeze_support()Process(target=f).start() -
multiprocessing.get_all_start_methods()
返回支持的start方法列表,第一個是默認方法。可能的啟動方法是’fork’,‘spawn’和’forkserver’。在Windows上只有“spawn”可用。在Unix上’fork’和’spawn’總是受支持,'fork’是默認值。
-
multiprocessing.get_context(method=None)
返回與multiprocessing模塊具有相同屬性的上下文對象,具體用法前面已經有過例子
-
multiprocessing.get_start_method(allow_none=False)
返回用于啟動進程的start方法的名稱,返回值可以是’fork’,‘spawn’,'forkserver’或None。 'fork’是Unix上的默認值,而’spawn’是Windows上的默認值。
-
multiprocessing.set_executable()
設置啟動子進程時要使用的Python解釋器的路徑
-
multiprocessing.set_start_method(method)
設置用于啟動子進程的方法。方法可以是’fork’,‘spawn’或’forkserver’。請注意,改法最多調用一次,并且應該寫在主模塊的if name ==’__ main__'子句中。
總結
以上是生活随笔為你收集整理的python mutilprocessing多进程编程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Python Threading 多线程
- 下一篇: python pip修改安装镜像源