Python multiprocess 多进程模块
轉(zhuǎn)發(fā):http://www.langzi.fun/Python multiprocess 多進(jìn)程模塊.html
需要注意的是,如果使用多線程,用法一定要加上if __name__=='__main__':(Python中的multiprocess提供了Process類,實(shí)現(xiàn)進(jìn)程相關(guān)的功能。但是它基于fork機(jī)制,因此不被windows平臺支持。想要在windows中運(yùn)行,必須使用if __name__=='__main__':的方式),但是我有另一種方法在使用線程池的時候可以不使用name_mian,最下面說。
并且多線程就是開啟多個線程,每個線程之間是不會互相通信互相干擾的,適用于密集計算。
案例一 基礎(chǔ)用法
多進(jìn)程的使用方法和多線程使用方法基本一樣,所以如果你會多線程用法多進(jìn)程也就懂了,有一點(diǎn)要注意,定義多進(jìn)程,然后傳遞參數(shù)的時候,如果是有一個參數(shù)就是用args=(i,)一定要加上逗號,如果有兩個或者以上的參數(shù)就不用這樣。
import sys
import multiprocessing
reload(sys)
sys.setdefaultencoding('utf-8')
def fun(i):print sys.pathprint sys.version_infoprint sys.platformprint sys.long_infoif __name__ == '__main__':m = multiprocessing.Process(target=fun,args=(1,))m.start()
運(yùn)行結(jié)果:
['E:\\python27\\python study', 'E:\\python27', 'C:\\windows\\SYSTEM32\\python27.zip', 'F:\\Python27\\DLLs', 'F:\\Python27\\lib', 'F:\\Python27\\lib\\plat-win', 'F:\\Python27\\lib\\lib-tk', 'F:\\Python27', 'F:\\Python27\\lib\\site-packages', 'F:\\Python27\\lib\\site-packages\\certifi-2017.7.27.1-py2.7.egg', 'F:\\Python27\\lib\\site-packages\\idna-2.6-py2.7.egg', 'F:\\Python27\\lib\\site-packages\\pypiwin32-219-py2.7-win-amd64.egg', 'F:\\Python27\\lib\\site-packages\\future-0.16.0-py2.7.egg', 'F:\\Python27\\lib\\site-packages\\dis3-0.1.1-py2.7.egg', 'F:\\Python27\\lib\\site-packages\\macholib-1.8-py2.7.egg', 'F:\\Python27\\lib\\site-packages\\pefile-2017.9.3-py2.7.egg', 'F:\\Python27\\lib\\site-packages\\altgraph-0.14-py2.7.egg', 'F:\\Python27\\lib\\site-packages\\beautifulsoup4-4.6.0-py2.7.egg', 'F:\\Python27\\lib\\site-packages\\chardet-3.0.4-py2.7.egg']
sys.version_info(major=2, minor=7, micro=14, releaselevel='final', serial=0)
win32
sys.long_info(bits_per_digit=30, sizeof_digit=4)
案例二 數(shù)據(jù)通信
ipc:就是進(jìn)程間的通信模式,常用的一般是socke,rpc,pipe和消息隊(duì)列等。
multiprocessing提供了threading包中沒有的IPC(比如Pipe和Queue),效率上更高。應(yīng)優(yōu)先考慮Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (因?yàn)樗鼈冋紦?jù)的不是用戶進(jìn)程的資源)。
使用Array共享數(shù)據(jù)
對于Array數(shù)組類,括號內(nèi)的“i”表示它內(nèi)部的元素全部是int類型,而不是指字符“i”,數(shù)組內(nèi)的元素可以預(yù)先指定,也可以只指定數(shù)組的長度。Array類在實(shí)例化的時候必須指定數(shù)組的數(shù)據(jù)類型和數(shù)組的大小,類似temp = Array(‘i’, 5)。對于數(shù)據(jù)類型有下面的對應(yīng)關(guān)系:
'c': ctypes.c_char, 'u': ctypes.c_wchar,
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
'h': ctypes.c_short, 'H': ctypes.c_ushort,
'i': ctypes.c_int, 'I': ctypes.c_uint,
'l': ctypes.c_long, 'L': ctypes.c_ulong,
'f': ctypes.c_float, 'd': ctypes.c_double
代碼實(shí)例:
from multiprocessing import Process
from multiprocessing import Arraydef func(i,temp):temp[0] += 100print("進(jìn)程%s " % i, ' 修改數(shù)組第一個元素后----->', temp[0])if __name__ == '__main__':temp = Array('i', [1, 2, 3, 4])for i in range(10):p = Process(target=func, args=(i, temp))p.start()
運(yùn)行結(jié)果:
進(jìn)程2 修改數(shù)組第一個元素后-----> 101
進(jìn)程4 修改數(shù)組第一個元素后-----> 201
進(jìn)程5 修改數(shù)組第一個元素后-----> 301
進(jìn)程3 修改數(shù)組第一個元素后-----> 401
進(jìn)程1 修改數(shù)組第一個元素后-----> 501
進(jìn)程6 修改數(shù)組第一個元素后-----> 601
進(jìn)程9 修改數(shù)組第一個元素后-----> 701
進(jìn)程8 修改數(shù)組第一個元素后-----> 801
進(jìn)程0 修改數(shù)組第一個元素后-----> 901
進(jìn)程7 修改數(shù)組第一個元素后-----> 1001
使用Manager共享數(shù)據(jù)
通過Manager類也可以實(shí)現(xiàn)進(jìn)程間數(shù)據(jù)的共享,主要用于線程池之間通信,Manager()返回的manager對象提供一個服務(wù)進(jìn)程,使得其他進(jìn)程可以通過代理的方式操作Python對象。manager對象支持 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value ,Array等多種格式。
代碼實(shí)例:
from multiprocessing import Process
from multiprocessing import Managerdef func(i, dic):dic["num"] = 100+iprint(dic.items())if __name__ == '__main__':dic = Manager().dict()for i in range(10):p = Process(target=func, args=(i, dic))p.start()p.join()
使用queues的Queue類共享數(shù)據(jù)
multiprocessing是一個包,它內(nèi)部有一個queues模塊,提供了一個Queue隊(duì)列類,可以實(shí)現(xiàn)進(jìn)程間的數(shù)據(jù)共享,如下例所示:
import multiprocessing
from multiprocessing import Process
from multiprocessing import queuesdef func(i, q):ret = q.get()print("進(jìn)程%s從隊(duì)列里獲取了一個%s,然后又向隊(duì)列里放入了一個%s" % (i, ret, i))q.put(i)if __name__ == "__main__":lis = queues.Queue(20, ctx=multiprocessing)lis.put(0)for i in range(10):p = Process(target=func, args=(i, lis,))p.start()
運(yùn)行結(jié)果:
進(jìn)程1從隊(duì)列里獲取了一個0,然后又向隊(duì)列里放入了一個1
進(jìn)程4從隊(duì)列里獲取了一個1,然后又向隊(duì)列里放入了一個4
進(jìn)程2從隊(duì)列里獲取了一個4,然后又向隊(duì)列里放入了一個2
進(jìn)程6從隊(duì)列里獲取了一個2,然后又向隊(duì)列里放入了一個6
進(jìn)程0從隊(duì)列里獲取了一個6,然后又向隊(duì)列里放入了一個0
進(jìn)程5從隊(duì)列里獲取了一個0,然后又向隊(duì)列里放入了一個5
進(jìn)程9從隊(duì)列里獲取了一個5,然后又向隊(duì)列里放入了一個9
進(jìn)程7從隊(duì)列里獲取了一個9,然后又向隊(duì)列里放入了一個7
進(jìn)程3從隊(duì)列里獲取了一個7,然后又向隊(duì)列里放入了一個3
進(jìn)程8從隊(duì)列里獲取了一個3,然后又向隊(duì)列里放入了一個8
例如來跑多進(jìn)程對一批IP列表進(jìn)行運(yùn)算,運(yùn)算后的結(jié)果都存到Queue隊(duì)列里面,這個就必須使用multiprocessing提供的Queue來實(shí)現(xiàn)
關(guān)于queue和Queue,在Python庫中非常頻繁的出現(xiàn),很容易就搞混淆了。甚至是multiprocessing自己還有一個Queue類(大寫的Q)和的Manager類中提供的Queue方法,一樣能實(shí)現(xiàn)消息隊(duì)列queues.Queue的功能,導(dǎo)入方式是from multiprocessing import Queue,前者Queue用于多個進(jìn)程間通信,和queues.Queue()差不多,后者Manager().queue用于進(jìn)程池之間通信。
使用pipe實(shí)現(xiàn)進(jìn)程間通信
pipe只能適用于兩個進(jìn)程間通信,queue則沒這個限制,他有兩個方法
receive_pi = Pipe()
#定義變量,用來獲取數(shù)據(jù)
send_pi = Pipe()
#用來發(fā)送數(shù)據(jù)
具體例子如下:
from multiprocessing import Pipe,Process
import time
def produce(pipe):pipe.send('666')time.sleep(1)
def consumer(pipe):print(pipe.recv())# 有些類似socket的recv方法
if __name__ == '__main__':send_pi,recv_pi = Pipe()my_pro = Process(target=produce,args=(send_pi,))my_con = Process(target=consumer,args=(recv_pi,))my_pro.start()my_con.start()my_pro.join()my_con.join()
pipe相當(dāng)于queue的一個子集,只能服務(wù)兩個進(jìn)程,pipe的性能高于queue。
案例三 進(jìn)程鎖
一般來說每個進(jìn)程使用單獨(dú)的空間,不必加進(jìn)程鎖的,但是如果你需要先實(shí)現(xiàn)進(jìn)程數(shù)據(jù)共享,使用案例二中的代碼,又害怕造成數(shù)據(jù)搶奪和臟數(shù)據(jù)的問題。就可以設(shè)置進(jìn)程鎖,與threading類似,在multiprocessing里也有同名的鎖類RLock,Lock,Event,Condition和 Semaphore,連用法都是一樣樣的。
代碼實(shí)例:
from multiprocessing import Process
from multiprocessing import Array
from multiprocessing import RLock, Lock, Event, Condition, Semaphore
import timedef func(i,lis,lc):lc.acquire()lis[0] = lis[0] - 1time.sleep(1)print('say hi', lis[0])lc.release()if __name__ == "__main__":array = Array('i', 1)array[0] = 10lock = RLock()for i in range(10):p = Process(target=func, args=(i, array, lock))p.start()
運(yùn)行結(jié)果:
say hi 9
say hi 8
say hi 7
say hi 6
say hi 5
say hi 4
say hi 3
say hi 2
say hi 1
say hi 0
案例四 進(jìn)程池
from multiprocessing import Pool導(dǎo)入就行,非常容易使用的。進(jìn)程池內(nèi)部維護(hù)了一個進(jìn)程序列,需要時就去進(jìn)程池中拿取一個進(jìn)程,如果進(jìn)程池序列中沒有可供使用的進(jìn)程,那么程序就會等待,直到進(jìn)程池中有可用進(jìn)程為止。
apply() 同步執(zhí)行(串行)
apply_async() 異步執(zhí)行(并行)
terminate() 立刻關(guān)閉進(jìn)程池
join() 主進(jìn)程等待所有子進(jìn)程執(zhí)行完畢。必須在close或terminate()之后。
close() 等待所有進(jìn)程結(jié)束后,才關(guān)閉進(jìn)程池。
代碼實(shí)例:
from multiprocessing import Pool
import time
def func(args):time.sleep(1)print("正在執(zhí)行進(jìn)程 ", args)
if __name__ == '__main__':p = Pool(5) # 創(chuàng)建一個包含5個進(jìn)程的進(jìn)程池for i in range(30):# 有30個任務(wù)p.apply_async(func=func, args=(i,))# 異步執(zhí)行,并發(fā)。這里不用target,要用funcp.close() # 等子進(jìn)程執(zhí)行完畢后關(guān)閉進(jìn)程池# time.sleep(2)# p.terminate() # 立刻關(guān)閉進(jìn)程池p.join()
from multiprocessing.dummy import Pool as ThreadPool 是多線程進(jìn)程池,綁定一個cpu核心。from multiprocessing import Pool多進(jìn)程,運(yùn)行于多個cpu核心。multiprocessing 是多進(jìn)程模塊, 而multiprocessing.dummy是以相同API實(shí)現(xiàn)的多線程模塊。
沒有繞過GIL情況下,多線程一定受GIL限制。
代碼實(shí)例:
from multiprocessing.dummy import Pool as tp
def fun(i):print i+i+i+ilist_i=[range(100)]px = tp(processes=8)
# 開啟8個線程池
px.map(fun,list_i)
px.close()
px.join()
使用dummy方法可以不用__name__=’__main__‘,并且用法很簡單,開啟線程池用法一樣,需要注意的是導(dǎo)入的參數(shù),要在一個列表中導(dǎo)入。比如你有一批數(shù)據(jù)要放進(jìn)這個線程池,就直接把這批數(shù)據(jù)放在一個列表中。
各模塊作用
Process介紹
構(gòu)造方法:
Process([group [, target [, name [, args [, kwargs]]]]])group: 線程組,目前還沒有實(shí)現(xiàn),庫引用中提示必須是None;
target: 要執(zhí)行的方法;
name: 進(jìn)程名;
args/kwargs: 要傳入方法的參數(shù)。
實(shí)例方法:
is_alive():返回進(jìn)程是否在運(yùn)行。
join([timeout]):阻塞當(dāng)前上下文環(huán)境的進(jìn)程程,直到調(diào)用此方法的進(jìn)程終止或到達(dá)指定的3. timeout(可選參數(shù))。
start():進(jìn)程準(zhǔn)備就緒,等待CPU調(diào)度。
run():strat()調(diào)用run方法,如果實(shí)例進(jìn)程時未制定傳入target,這star執(zhí)行t默認(rèn)run()方法。
terminate():不管任務(wù)是否完成,立即停止工作進(jìn)程。
屬性:
authkey
daemon:和線程的setDeamon功能一樣(將父進(jìn)程設(shè)置為守護(hù)進(jìn)程,當(dāng)父進(jìn)程結(jié)束時,子進(jìn)程也結(jié)束)。
exitcode(進(jìn)程在運(yùn)行時為None、如果為–N,表示被信號N結(jié)束)。
name:進(jìn)程名字。
pid:進(jìn)程號。
Pool介紹
Multiprocessing.Pool可以提供指定數(shù)量的進(jìn)程供用戶調(diào)用,當(dāng)有新的請求提交到pool中時,如果池還沒有滿,那么就會創(chuàng)建一個新的進(jìn)程用來執(zhí)行該請求;但如果池中的進(jìn)程數(shù)已經(jīng)達(dá)到規(guī)定最大值,那么該請求就會等待,直到池中有進(jìn)程結(jié)束,才會創(chuàng)建新的進(jìn)程來執(zhí)行它。在共享資源時,只能使用Multiprocessing.Manager類,而不能使用Queue或者Array。Pool類用于需要執(zhí)行的目標(biāo)很多,而手動限制進(jìn)程數(shù)量又太繁瑣時,如果目標(biāo)少且不用控制進(jìn)程數(shù)量則可以用Process類。
構(gòu)造方法:
Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
processes :使用的工作進(jìn)程的數(shù)量,如果processes是None那么使用 os.cpu_count()返回的數(shù)量。
initializer: 如果initializer是None,那么每一個工作進(jìn)程在開始的時候會調(diào)用initializer(*initargs)。
maxtasksperchild:工作進(jìn)程退出之前可以完成的任務(wù)數(shù),完成后用一個新的工作進(jìn)程來替代原進(jìn)程,來讓閑置的資源被釋放。maxtasksperchild默認(rèn)是None,意味著只要Pool存在工作進(jìn)程就會一直存活。
context: 用在制定工作進(jìn)程啟動時的上下文,一般使用 multiprocessing.Pool() 或者一個context對象的Pool()方法來創(chuàng)建一個池,兩種方法都適當(dāng)?shù)脑O(shè)置了context。
實(shí)例方法:
apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞。
apply(func[, args[, kwds]])是阻塞的
close() 關(guān)閉pool,使其不在接受新的任務(wù)。
terminate() 關(guān)閉pool,結(jié)束工作進(jìn)程,不在處理未完成的任務(wù)。
join() 主進(jìn)程阻塞,等待子進(jìn)程的退出, join方法要在close或terminate之后使用。
Pool使用方法
Pool+map函數(shù)
說明:此寫法缺點(diǎn)在于只能通過map向函數(shù)傳遞一個參數(shù)。
from multiprocessing import Pool
def test(i):print i
if __name__=="__main__":lists=[1,2,3]pool=Pool(processes=2) #定義最大的進(jìn)程數(shù)pool.map(test,lists) #p必須是一個可迭代變量。pool.close()pool.join()
異步進(jìn)程池(非阻塞)
from multiprocessing import Pool
def test(i):print i
if __name__=="__main__":pool = Pool(processes=10)for i in xrange(500):'''For循環(huán)中執(zhí)行步驟:(1)循環(huán)遍歷,將500個子進(jìn)程添加到進(jìn)程池(相對父進(jìn)程會阻塞)(2)每次執(zhí)行10個子進(jìn)程,等一個子進(jìn)程執(zhí)行完后,立馬啟動新的子進(jìn)程。(相對父進(jìn)程不阻塞)apply_async為異步進(jìn)程池寫法。異步指的是啟動子進(jìn)程的過程,與父進(jìn)程本身的執(zhí)行(print)是異步的,而For循環(huán)中往進(jìn)程池添加子進(jìn)程的過程,與父進(jìn)程本身的執(zhí)行卻是同步的。'''pool.apply_async(test, args=(i,)) #維持執(zhí)行的進(jìn)程總數(shù)為10,當(dāng)一個進(jìn)程執(zhí)行完后啟動一個新進(jìn)程. print “test”pool.close()pool.join()
執(zhí)行順序:For循環(huán)內(nèi)執(zhí)行了2個步驟,第一步:將500個對象放入進(jìn)程池(阻塞)。第二步:同時執(zhí)行10個子進(jìn)程(非阻塞),有結(jié)束的就立即添加,維持10個子進(jìn)程運(yùn)行。(apply_async方法的會在執(zhí)行完for循環(huán)的添加步驟后,直接執(zhí)行后面的print語句,而apply方法會等所有進(jìn)程池中的子進(jìn)程運(yùn)行完以后再執(zhí)行后面的print語句)
注意:調(diào)用join之前,先調(diào)用close或者terminate方法,否則會出錯。執(zhí)行完close后不會有新的進(jìn)程加入到pool,join函數(shù)等待所有子進(jìn)程結(jié)束。
同步進(jìn)程池(阻塞)
from multiprocessing import Pool
def test(p):print ptime.sleep(3)
if __name__=="__main__":pool = Pool(processes=10)for i in xrange(500):'''實(shí)際測試發(fā)現(xiàn),for循環(huán)內(nèi)部執(zhí)行步驟:(1)遍歷500個可迭代對象,往進(jìn)程池放一個子進(jìn)程(2)執(zhí)行這個子進(jìn)程,等子進(jìn)程執(zhí)行完畢,再往進(jìn)程池放一個子進(jìn)程,再執(zhí)行。(同時只執(zhí)行一個子進(jìn)程)for循環(huán)執(zhí)行完畢,再執(zhí)行print函數(shù)。'''pool.apply(test, args=(i,)) #維持執(zhí)行的進(jìn)程總數(shù)為10,當(dāng)一個進(jìn)程執(zhí)行完后啟動一個新進(jìn)程.print “test”pool.close()pool.join()
說明:for循環(huán)內(nèi)執(zhí)行的步驟順序,往進(jìn)程池中添加一個子進(jìn)程,執(zhí)行子進(jìn)程,等待執(zhí)行完畢再添加一個子進(jìn)程……等500個子進(jìn)程都執(zhí)行完了,再執(zhí)行print “test”。(從結(jié)果來看,并沒有多進(jìn)程并發(fā))
子進(jìn)程返回值
在實(shí)際使用多進(jìn)程的時候,可能需要獲取到子進(jìn)程運(yùn)行的返回值。如果只是用來存儲,則可以將返回值保存到一個數(shù)據(jù)結(jié)構(gòu)中;如果需要判斷此返回值,從而決定是否繼續(xù)執(zhí)行所有子進(jìn)程,則會相對比較復(fù)雜。另外在Multiprocessing中,可以利用Process與Pool創(chuàng)建子進(jìn)程,這兩種用法在獲取子進(jìn)程返回值上的寫法上也不相同。這篇中,我們直接上代碼,分析多進(jìn)程中獲取子進(jìn)程返回值的不同用法,以及優(yōu)缺點(diǎn)。
初級用法(Pool)
目的:存儲子進(jìn)程返回值
說明:如果只是單純的存儲子進(jìn)程返回值,則可以使用Pool的apply_async異步進(jìn)程池;當(dāng)然也可以使用Process,用法與threading中的相同,這里只介紹前者。
實(shí)例:當(dāng)進(jìn)程池中所有子進(jìn)程執(zhí)行完畢后,輸出每個子進(jìn)程的返回值。
from multiprocessing import Pool
def test(p): return p
if __name__=="__main__":pool = Pool(processes=10)result=[]for i in xrange(50000):'''for循環(huán)執(zhí)行流程:(1)添加子進(jìn)程到pool,并將這個對象(子進(jìn)程)添加到result這個列表中。(此時子進(jìn)程并沒有運(yùn)行)(2)執(zhí)行子進(jìn)程(同時執(zhí)行10個)'''result.append(pool.apply_async(test, args=(i,)))#維持執(zhí)行的進(jìn)程總數(shù)為10,當(dāng)一個進(jìn)程執(zhí)行完后添加新進(jìn)程. pool.join()'''遍歷result列表,取出子進(jìn)程對象,訪問get()方法,獲取返回值。(此時所有子進(jìn)程已執(zhí)行完畢)'''for i in result:print i.get()
錯誤寫法:
for i in xrange(50000):t=pool.apply_async(test, args=(i,)))print t.get()
說明:這樣會造成阻塞,因?yàn)間et()方法只能等子進(jìn)程運(yùn)行完畢后才能調(diào)用成功,否則會一直阻塞等待。如果寫在for循環(huán)內(nèi)容,相當(dāng)于變成了同步,執(zhí)行效率將會非常低。
高級用法(Pool)
目的:父進(jìn)程實(shí)時獲取子進(jìn)程返回值,以此為標(biāo)記結(jié)束所有進(jìn)程。
實(shí)例(一)
執(zhí)行子進(jìn)程的過程中,不斷獲取返回值并校驗(yàn),如果返回值為True則結(jié)果所有進(jìn)程。
from multiprocessing import Pool
import Queue
import time
def test(p):time.sleep(0.001)if p==10000:return Trueelse:return False
if __name__=="__main__":pool = Pool(processes=10)q=Queue.Queue()for i in xrange(50000):'''將子進(jìn)程對象存入隊(duì)列中。'''q.put(pool.apply_async(test, args=(i,)))#維持執(zhí)行的進(jìn)程總數(shù)為10,當(dāng)一個進(jìn)程執(zhí)行完后添加新進(jìn)程. '''因?yàn)檫@里使用的為pool.apply_async異步方法,因此子進(jìn)程執(zhí)行的過程中,父進(jìn)程會執(zhí)行while,獲取返回值并校驗(yàn)。'''while 1:if q.get().get():pool.terminate() #結(jié)束進(jìn)程池中的所有子進(jìn)程。breakpool.join()
說明:總共要執(zhí)行50000個子進(jìn)程(并發(fā)數(shù)量為10),當(dāng)其中一個子進(jìn)程返回True時,結(jié)束進(jìn)程池。因?yàn)槭褂昧薬pply_async為異步進(jìn)程,因此在執(zhí)行完for循環(huán)的添加子進(jìn)程操作后(只是添加并沒有執(zhí)行完所有的子進(jìn)程),可以直接執(zhí)行while代碼,實(shí)時判斷子進(jìn)程返回值是否有True,有的話結(jié)束所有進(jìn)程。
優(yōu)點(diǎn):不必等到所有子進(jìn)程結(jié)束再結(jié)束程序,只要得到想要的結(jié)果就可以提前結(jié)束,節(jié)省資源。
不足:當(dāng)需要執(zhí)行的子進(jìn)程非常大時,不適用,因?yàn)閒or循環(huán)在添加子進(jìn)程時,要花費(fèi)很長的時間,雖然是異步,但是也需要等待for循環(huán)添加子進(jìn)程操作結(jié)束才能執(zhí)行while代碼,因此會比較慢。
實(shí)例(二)
多線程+多進(jìn)程,添加執(zhí)行子進(jìn)程的過程中,不斷獲取返回值并校驗(yàn),如果返回值為True則結(jié)果所有進(jìn)程。
from multiprocessing import Pool
import Queue
import threading
import time
def test(p):time.sleep(0.001)if p==10000:return Trueelse:return False
if __name__=="__main__":result=Queue.Queue() #隊(duì)列pool = Pool()def pool_th():for i in xrange(50000000): ##這里需要創(chuàng)建執(zhí)行的子進(jìn)程非常多try:result.put(pool.apply_async(test, args=(i,)))except:breakdef result_th():while 1:a=result.get().get() #獲取子進(jìn)程返回值if a:pool.terminate() #結(jié)束所有子進(jìn)程break'''利用多線程,同時運(yùn)行Pool函數(shù)創(chuàng)建執(zhí)行子進(jìn)程,以及運(yùn)行獲取子進(jìn)程返回值函數(shù)。'''t1=threading.Thread(target=pool_th)t2=threading.Thread(target=result_th)t1.start()t2.start()t1.join()t2.join()pool.join()
執(zhí)行流程:利用多線程,創(chuàng)建一個執(zhí)行pool_th函數(shù)線程,一個執(zhí)行result_th函數(shù)線程,pool_th函數(shù)用來添加進(jìn)程池,開啟進(jìn)程執(zhí)行功能函數(shù)并將子進(jìn)程對象存入隊(duì)列,而result_th()函數(shù)用來不停地從隊(duì)列中取子進(jìn)程對象,調(diào)用get()方法獲取返回值。等發(fā)現(xiàn)其中存在子進(jìn)程的返回值為True時,結(jié)束所有進(jìn)程,最后結(jié)束線程。
優(yōu)點(diǎn):彌補(bǔ)了實(shí)例(一)的不足,即使for循環(huán)的子進(jìn)程數(shù)量很多,也能提高性能,因?yàn)閒or循環(huán)與判斷子進(jìn)程返回值同時進(jìn)行。
總結(jié)
以上是生活随笔為你收集整理的Python multiprocess 多进程模块的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 利用Procdump+Mimikatz获
- 下一篇: Python多进程与进程锁的基本使用