Python的multiprocessing多进程
https://www.cnblogs.com/tkqasn/p/5701230.html
由于GIL的存在,python中的多線程其實并不是真正的多線程,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多進程。Python提供了非常好用的多進程包multiprocessing,只需要定義一個函數,Python會完成其他所有事情。借助這個包,可以輕松完成從單進程到并發執行的轉換。multiprocessing支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。
multiprocessing包是Python中的多進程管理包。與threading.Thread類似,它可以利用multiprocessing.Process對象來創建一個進程。該進程可以運行在Python程序內部編寫的函數。該Process對象與Thread對象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類 (這些對象可以像多線程那樣,通過參數傳遞給各個進程),用以同步進程,其用法與threading包中的同名類一致。所以,multiprocessing的很大一部份與threading使用同一套API,只不過換到了多進程的情境。
但在使用這些共享API的時候,我們要注意以下幾點:
- 在UNIX平臺上,當某個進程終結之后,該進程需要被其父進程調用wait,否則進程成為僵尸進程(Zombie)。所以,有必要對每個Process對象調用join()方法 (實際上等同于wait)。對于多線程來說,由于只有一個進程,所以不存在此必要性。
- multiprocessing提供了threading包中沒有的IPC(比如Pipe和Queue),效率上更高。應優先考慮Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (因為它們占據的不是用戶進程的資源)。
- 多進程應該避免共享資源。在多線程中,我們可以比較容易地共享資源,比如使用全局變量或者傳遞參數。在多進程情況下,由于每個進程有自己獨立的內存空間,以上方法并不合適。此時我們可以通過共享內存和Manager的方法來共享資源。但這樣做提高了程序的復雜度,并因為同步的需要而降低了程序的效率。
Process.PID中保存有PID,如果進程還沒有start(),則PID為None。
window系統下,需要注意的是要想啟動一個子進程,必須加上那句if __name__ == “__main__”,進程相關的要寫在這句下面。
簡單創建多進程:
有兩種使用方法,直接傳入要運行的方法或從Process繼承并覆蓋run():
from multiprocessing import Process import threading import timedef foo(i):print 'say hi', iif __name__ == '__main__':for i in range(10):p = Process(target=foo, args=(i,))p.start() say hi 4 say hi 3 say hi 5 say hi 2 say hi 1 say hi 6 say hi 0 say hi 7 say hi 8 say hi 9Process finished with exit code 0可以看出多個進程隨機順序執行
from multiprocessing import Process import time class MyProcess(Process):def __init__(self, arg):super(MyProcess, self).__init__()self.arg = argdef run(self):print 'say hi', self.argtime.sleep(1)if __name__ == '__main__':for i in range(10):p = MyProcess(i)p.start()Process類
構造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
- group: 線程組,目前還沒有實現,庫引用中提示必須是None;
- target: 要執行的方法;
- name: 進程名;
- args/kwargs: 要傳入方法的參數。
實例方法:
-
is_alive():返回進程是否在運行。
-
join([timeout]):阻塞當前上下文環境的進程程,直到調用此方法的進程終止或到達指定的timeout(可選參數)。
-
start():進程準備就緒,等待CPU調度
-
run():strat()調用run方法,如果實例進程時未制定傳入target,這star執行t默認run()方法。
-
terminate():不管任務是否完成,立即停止工作進程
屬性:
-
authkey
-
daemon:和線程的setDeamon功能一樣
-
exitcode(進程在運行時為None、如果為–N,表示被信號N結束)
-
name:進程名字。
-
pid:進程號。
例子一:
from multiprocessing import Process import threading import timedef foo(i):print 'say hi',ifor i in range(10):p = Process(target=foo,args=(i,))p.start() say hi 0 say hi 3 say hi 6 say hi 1 say hi 8 say hi 2 say hi 5 say hi 4 say hi 7 say hi 9Process finished with exit code 0例子二:
def foo(i):time.sleep(1)print 'say hi', itime.sleep(1)if __name__ == '__main__':p_list=[]for i in range(10):p = Process(target=foo, args=(i,))p.daemon=Truep_list.append(p)for p in p_list:p.start()for p in p_list:p.join()print 'main process end' say hi 1 say hi 2 say hi 5 say hi 6 say hi 7 say hi 0 say hi 4 say hi 3 say hi 8 say hi 9 main process endProcess finished with exit code 0可以看出join()方法和deamon屬性的用法和多線程的基本一致。
Pool類
進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進進程,那么程序就會等待,直到進程池中有可用進程為止。進程池設置最好等于CPU核心數量
構造方法:
Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
- processes :使用的工作進程的數量,如果processes是None那么使用 os.cpu_count()返回的數量。
- initializer: 如果initializer是None,那么每一個工作進程在開始的時候會調用initializer(*initargs)。
- maxtasksperchild:工作進程退出之前可以完成的任務數,完成后用一個新的工作進程來替代原進程,來讓閑置的資源被釋放。- maxtasksperchild默認是None,意味著只要Pool存在工作進程就會一直存活。
- context: 用在制定工作進程啟動時的上下文,一般使用 multiprocessing.Pool() 或者一個context對象的Pool()方法來創建一個池,兩種方法都適當的設置了context
實例方法:
-
apply(func[, args[, kwds]]):同步進程池
-
apply_async(func[, args[, kwds[, callback[, error_callback]]]]) :異步進程池
-
close() : 關閉進程池,阻止更多的任務提交到pool,待任務完成后,工作進程會退出。
-
terminate() : 結束工作進程,不在處理未完成的任務
-
join() : wait工作線程的退出,在調用join()前,必須調用close() or terminate()。這樣是因為被終止的進程需要被父進程調用wait(join等價與wait),否則進程會成為僵尸進程。pool.join()必須使用在
例子一(異步進程池):
pool.close()或者pool.terminate()之后。其中close()跟terminate()的區別在于close()會等待池中的worker進程執行結束再關閉pool,而terminate()則是直接關閉。
# coding:utf-8 from multiprocessing import Pool import timedef Foo(i):time.sleep(2)return i + 100def Bar(arg):print argif __name__ == '__main__':t_start=time.time()pool = Pool(5)for i in range(10):pool.apply_async(func=Foo, args=(i,), callback=Bar)#維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去pool.close()pool.join() # 進程池中進程執行完畢后再關閉,如果注釋,那么程序直接關閉。pool.terminate()t_end=time.time()t=t_end-t_startprint 'the program time is :%s' %t 101 100 102 103 104 106 105 107 108 109 the program time is :4.22099995613Process finished with exit code 0例子二(同步進程池):
#!/usr/bin/env python # -*- coding:utf-8 -*- from multiprocessing import Process, Pool import timedef Foo(i):time.sleep(1)print i + 100if __name__ == '__main__':t_start=time.time()pool = Pool(5)for i in range(10):pool.apply(Foo, (i,))pool.close()pool.join() # 進程池中進程執行完畢后再關閉,如果注釋,那么程序直接關閉。t_end=time.time()t=t_end-t_startprint 'the program time is :%s' %t 100 101 102 103 104 105 106 107 108 109 the program time is :10.2409999371Process finished with exit code 0可以看出進程同步順序執行了,效率降低
例子三:異步進程池使用get()方法獲得進程執行結果值(錯誤使用get()方法獲取結果)
def Bar(arg):return argif __name__ == '__main__':t_start=time.time()pool = Pool(5)for i in range(10):res = pool.apply_async(func=Foo, args=(i,), callback=Bar)#維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去print res.get()pool.close()pool.join() # 進程池中進程執行完畢后再關閉,如果注釋,那么程序直接關閉。pool.terminate()t_end=time.time()t=t_end-t_startprint 'the program time is :%s' %t 100 101 102 103 104 105 106 107 108 109 the program time is :20.2850000858Process finished with exit code 0可以看出由于每個進程的get()方法,程序變成同步執行了
例子四(正確使用get()方法獲取結果)
# coding:utf-8 from multiprocessing import Pool import timedef Foo(i):time.sleep(2)return i + 100def Bar(arg):return argif __name__ == '__main__':res_list=[]t_start=time.time()pool = Pool(5)for i in range(10):res = pool.apply_async(func=Foo, args=(i,), callback=Bar)res_list.append(res)pool.close()pool.join()for res in res_list:print res.get()t_end=time.time()t=t_end-t_startprint 'the program time is :%s' %t 100 101 102 103 104 105 106 107 108 109 the program time is :4.22399997711Process finished with exit code 0進程數據共享
進程各自持有一份數據,默認無法共享數據
#!/usr/bin/env python # coding:utf-8from multiprocessing import Process li = []def foo(i):li.append(i)print 'say hi', li if __name__ == '__main__':for i in range(10):p = Process(target=foo, args=(i,))p.start()print 'ending', li期望輸出[0到10的隨機排列的列表]
say hi [1] say hi [0] say hi [2] say hi [3] say hi [4] say hi [5] ending [] say hi [6] say hi [7] say hi [8] say hi [9]Process finished with exit code 0方法一(使用Array):
Array(‘i’, range(10))中的‘i’參數C語言中的類型:
‘c’: ctypes.c_char ‘u’: ctypes.c_wchar ‘b’: ctypes.c_byte ‘B’: ctypes.c_ubyte ‘h’: ctypes.c_short ‘H’: ctypes.c_ushort ‘i’: ctypes.c_int ‘I’: ctypes.c_uint ‘l’: ctypes.c_long, ‘L’: ctypes.c_ulong ‘f’: ctypes.c_float ‘d’: ctypes.c_double from multiprocessing import Process, Arraydef f(a):for i in range(len(a)):a[i] = -a[i]if __name__ == '__main__':arr = Array('i', range(10))p = Process(target=f, args=(arr,))p.start()p.join()print(arr[:]) [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]方法二(使用Manager):
Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array類型的支持。
from multiprocessing import Process, Managerdef f(d, l):d[1] = '1'd['2'] = 2d[0.25] = Nonel.reverse()if __name__ == '__main__':with Manager() as manager:d = manager.dict()l = manager.list(range(10))p = Process(target=f, args=(d, l))p.start()p.join()print(d)print(l) {0.25: None, 1: '1', '2': 2} [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]Process finished with exit code 0 創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的Python的multiprocessing多进程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: win10微信多开
- 下一篇: 只让输入数字、字母、中文的输入框