python 多进程 调用模块内函数_Python进程池multiprocessing.Pool的用法
一、multiprocessing模塊
multiprocessing模塊提供了一個(gè)Process類來代表一個(gè)進(jìn)程對(duì)象,multiprocessing模塊像線程一樣管理進(jìn)程,這個(gè)是multiprocessing的核心,它與threading很相似,對(duì)多核CPU的利用率會(huì)比threading好的多
看一下Process類的構(gòu)造方法:
__init__(self, group=None, target=None, name=None, args=(), kwargs={})
參數(shù)說明:
group:進(jìn)程所屬組(基本不用)
target:表示調(diào)用對(duì)象
args:表示調(diào)用對(duì)象的位置參數(shù)元組
name:別名
kwargs:表示調(diào)用對(duì)象的字典
示例:
importmultiprocessingdef do(n): #參數(shù)n由args=(1,)傳入
name = multiprocessing.current_process().name #獲取當(dāng)前進(jìn)程的名字
print(name, 'starting')print("worker", n)return
if __name__ == '__main__':
numList=[]for i in range(5):
p= multiprocessing.Process(target=do, args=(i,)) #(i,)中加入","表示元祖
numList.append(p)print(numList)
p.start()#用start()方法啟動(dòng)進(jìn)程,執(zhí)行do()方法
p.join() #等待子進(jìn)程結(jié)束以后再繼續(xù)往下運(yùn)行,通常用于進(jìn)程間的同步
print("Process end.")
運(yùn)行結(jié)果:
[]
Process-1starting
worker 0
Process end.
[, ]
Process-2starting
worker1Process end.
[, , ]
Process-3starting
worker2Process end.
[, , , ]
Process-4starting
worker3Process end.
[, , , , ]
Process-5starting
worker4Process end.
通過打印numList可以看出當(dāng)前進(jìn)程結(jié)束后,再開始下一個(gè)進(jìn)程
注意:
在Windows上要想使用進(jìn)程模塊,就必須把有關(guān)進(jìn)程的代碼寫在當(dāng)前.py文件的if __name__ == ‘__main__’ :語(yǔ)句的下面,才能正常使用Windows下的進(jìn)程模塊。Unix/Linux下則不需要
二、Pool類
Pool類可以提供指定數(shù)量的進(jìn)程供用戶調(diào)用,當(dāng)有新的請(qǐng)求提交到Pool中時(shí),如果池還沒有滿,就會(huì)創(chuàng)建一個(gè)新的進(jìn)程來執(zhí)行請(qǐng)求。如果池滿,請(qǐng)求就會(huì)告知先等待,直到池中有進(jìn)程結(jié)束,才會(huì)創(chuàng)建新的進(jìn)程來執(zhí)行這些請(qǐng)求
下面介紹一下multiprocessing 模塊下的Pool類下的幾個(gè)方法:
1.apply()
函數(shù)原型:apply(func[, args=()[, kwds={}]])
該函數(shù)用于傳遞不定參數(shù),同python中的apply函數(shù)一致,主進(jìn)程會(huì)被阻塞直到函數(shù)執(zhí)行結(jié)束(不建議使用,并且3.x以后不再出現(xiàn))
2.apply_async
函數(shù)原型:apply_async(func[, args=()[, kwds={}[, callback=None]]])
與apply用法一致,但它是非阻塞的且支持結(jié)果返回后進(jìn)行回調(diào)
3.map()
函數(shù)原型:map(func, iterable[, chunksize=None])
Pool類中的map方法,與內(nèi)置的map函數(shù)用法行為基本一致,它會(huì)使進(jìn)程阻塞直到結(jié)果返回
注意:雖然第二個(gè)參數(shù)是一個(gè)迭代器,但在實(shí)際使用中,必須在整個(gè)隊(duì)列都就緒后,程序才會(huì)運(yùn)行子進(jìn)程
4.map_async()
函數(shù)原型:map_async(func, iterable[, chunksize[, callback]])
與map用法一致,但是它是非阻塞的
5.close()
關(guān)閉進(jìn)程池(pool),使其不再接受新的任務(wù)
6.terminal()
結(jié)束工作進(jìn)程,不再處理未處理的任務(wù)
7.join()
主進(jìn)程阻塞等待子進(jìn)程的退出, join方法要在close或terminate之后使用
示例1--使用map()函數(shù)
importtimefrom multiprocessing importPooldefrun(fn):#fn: 函數(shù)參數(shù)是數(shù)據(jù)列表的一個(gè)元素
time.sleep(1)print(fn *fn)if __name__ == "__main__":
testFL= [1, 2, 3, 4, 5, 6]print('shunxu:') #順序執(zhí)行(也就是串行執(zhí)行,單進(jìn)程)
s =time.time()for fn intestFL:
run(fn)
t1=time.time()print("順序執(zhí)行時(shí)間:", int(t1 -s))print('concurrent:') #創(chuàng)建多個(gè)進(jìn)程,并行執(zhí)行
pool = Pool(3) #創(chuàng)建擁有3個(gè)進(jìn)程數(shù)量的進(jìn)程池
#testFL:要處理的數(shù)據(jù)列表,run:處理testFL列表中數(shù)據(jù)的函數(shù)
pool.map(run, testFL)
pool.close()#關(guān)閉進(jìn)程池,不再接受新的進(jìn)程
pool.join() #主進(jìn)程阻塞等待子進(jìn)程的退出
t2 =time.time()print("并行執(zhí)行時(shí)間:", int(t2 - t1))
運(yùn)行結(jié)果:
1、map函數(shù)中testFL為可迭代對(duì)象--列表
2、當(dāng)創(chuàng)建3個(gè)進(jìn)程時(shí),會(huì)一次打印出3個(gè)結(jié)果“1,4,9”,當(dāng)當(dāng)創(chuàng)建2個(gè)進(jìn)程時(shí),會(huì)一次打印出2個(gè)結(jié)果“1,4”,以此類推,當(dāng)創(chuàng)建多余6個(gè)進(jìn)程時(shí),會(huì)一次打印出所有結(jié)果
3、如果使用Pool(),不傳入?yún)?shù),可以創(chuàng)建一個(gè)動(dòng)態(tài)控制大小的進(jìn)程池
從結(jié)果可以看出,并發(fā)執(zhí)行的時(shí)間明顯比順序執(zhí)行要快很多,但是進(jìn)程是要耗資源的,所以平時(shí)工作中,進(jìn)程數(shù)也不能開太大。 對(duì)Pool對(duì)象調(diào)用join()方法會(huì)等待所有子進(jìn)程執(zhí)行完畢,調(diào)用join()之前必須先調(diào)用close(),讓其不再接受新的Process了
示例2--使用map()_async函數(shù)
print('concurrent:') #創(chuàng)建多個(gè)進(jìn)程,并行執(zhí)行
pool = Pool(3) #創(chuàng)建擁有3個(gè)進(jìn)程數(shù)量的進(jìn)程池
#testFL:要處理的數(shù)據(jù)列表,run:處理testFL列表中數(shù)據(jù)的函數(shù)
pool.map_async(run, testFL)
pool.close() #關(guān)閉進(jìn)程池,不再接受新的進(jìn)程
pool.join() #主進(jìn)程阻塞等待子進(jìn)程的退出
t2 =time.time()print("并行執(zhí)行時(shí)間:", int(t2 - t1))
運(yùn)行結(jié)果:
從結(jié)果可以看出,map_async()和map()用時(shí)相同。目前還沒有看出兩者的區(qū)別,后面知道后再完善
示例3--使用apply()函數(shù)
print('concurrent:') #創(chuàng)建多個(gè)進(jìn)程,并行執(zhí)行
pool = Pool(3) #創(chuàng)建擁有3個(gè)進(jìn)程數(shù)量的進(jìn)程池
#testFL:要處理的數(shù)據(jù)列表,run:處理testFL列表中數(shù)據(jù)的函數(shù)
for fn intestFL:
pool.apply(run, (fn,))
pool.close()#關(guān)閉進(jìn)程池,不再接受新的進(jìn)程
pool.join() #主進(jìn)程阻塞等待子進(jìn)程的退出
t2 =time.time()print("并行執(zhí)行時(shí)間:", int(t2 - t1))
運(yùn)行結(jié)果:
可見,使用apply()方法,并行執(zhí)行和順序執(zhí)行用時(shí)相同,經(jīng)過試驗(yàn),進(jìn)程數(shù)目增大也不會(huì)減少并行執(zhí)行的時(shí)間
原因:以阻塞的形式產(chǎn)生進(jìn)程任務(wù),生成1個(gè)任務(wù)進(jìn)程并等它執(zhí)行完出池,第2個(gè)進(jìn)程才會(huì)進(jìn)池,主進(jìn)程一直阻塞等待,每次只執(zhí)行1個(gè)進(jìn)程任務(wù)
示例4--使用apply_async()函數(shù)
print('concurrent:') #創(chuàng)建多個(gè)進(jìn)程,并行執(zhí)行
pool = Pool(3) #創(chuàng)建擁有3個(gè)進(jìn)程數(shù)量的進(jìn)程池
#testFL:要處理的數(shù)據(jù)列表,run:處理testFL列表中數(shù)據(jù)的函數(shù)
for fn intestFL:pool.apply_async(run, (fn,))
pool.close()#關(guān)閉進(jìn)程池,不再接受新的進(jìn)程
pool.join() #主進(jìn)程阻塞等待子進(jìn)程的退出
t2 =time.time()print("并行執(zhí)行時(shí)間:", int(t2 - t1))
運(yùn)行結(jié)果:
可見,使用apply_async()方法,并行執(zhí)行時(shí)間與使用map()、map_async()方法相同
注意:
map_async()和map()方法,第2個(gè)參數(shù)可以是列表也可以是元祖,如下圖:
而使用apply()和apply_async()方法時(shí),第2個(gè)參數(shù)只能傳入元祖,傳入列表進(jìn)程不會(huì)被執(zhí)行,如下圖:
三、apply_async()方法callback參數(shù)的用法
示例:
from multiprocessing importPoolimporttimedeffun_01(i):
time.sleep(2)print('start_time:', time.ctime())return i + 100
deffun_02(arg):print('end_time:', arg, time.ctime())if __name__ == '__main__':
pool= Pool(3)for i in range(4):
pool.apply_async(func=fun_01, args=(i,), callback=fun_02) #fun_02的入?yún)閒un_01的返回值
#pool.apply_async(func=fun_01, args=(i,))
pool.close()
pool.join()print('done')
運(yùn)行結(jié)果:
start_time: Thu Nov 14 16:31:41 2019end_time:100 Thu Nov 14 16:31:41 2019start_time: Thu Nov14 16:31:41 2019end_time:101 Thu Nov 14 16:31:41 2019start_time: Thu Nov14 16:31:41 2019end_time:102 Thu Nov 14 16:31:41 2019start_time: Thu Nov14 16:31:43 2019end_time:103 Thu Nov 14 16:31:43 2019done
map_async()方法callback參數(shù)的用法與apply_async()相同
四、使用進(jìn)程池并關(guān)注結(jié)果
importmultiprocessingimporttimedeffunc(msg):print('hello :', msg, time.ctime())
time.sleep(2)print('end', time.ctime())return 'done' +msgif __name__ == '__main__':
pool= multiprocessing.Pool(2)
result=[]for i in range(3):
msg= 'hello %s' %i
result.append(pool.apply_async(func=func, args=(msg,)))
pool.close()
pool.join()for res inresult:print('***:', res.get()) #get()函數(shù)得出每個(gè)返回結(jié)果的值
print('All end--')
運(yùn)行結(jié)果:
五、多進(jìn)程執(zhí)行多個(gè)函數(shù)
使用apply_async()或者apply()方法,可以實(shí)現(xiàn)多進(jìn)程執(zhí)行多個(gè)方法
示例:
importmultiprocessingimporttimeimportosdefLee():print('\nRun task Lee--%s******ppid:%s' % (os.getpid(), os.getppid()), '~~~~', time.ctime())
start=time.time()
time.sleep(5)
end=time.time()print('Task Lee,runs %0.2f seconds.' % (end - start), '~~~~', time.ctime())defMarlon():print("\nRun task Marlon-%s******ppid:%s" % (os.getpid(), os.getppid()), '~~~~', time.ctime())
start=time.time()
time.sleep(10)
end=time.time()print('Task Marlon runs %0.2f seconds.' % (end - start), '~~~~', time.ctime())defAllen():print("\nRun task Allen-%s******ppid:%s" % (os.getpid(), os.getppid()), '~~~~', time.ctime())
start=time.time()
time.sleep(15)
end=time.time()print('Task Allen runs %0.2f seconds.' % (end - start), '~~~~', time.ctime())defFrank():print("\nRun task Frank-%s******ppid:%s" % (os.getpid(), os.getppid()), '~~~~', time.ctime())
start=time.time()
time.sleep(20)
end=time.time()print('Task Frank runs %0.2f seconds.' % (end - start), '~~~~', time.ctime())if __name__ == '__main__':
func_list=[Lee, Marlon, Allen, Frank]print('parent process id %s' %os.getpid())
pool= multiprocessing.Pool(4)for func infunc_list:
pool.apply_async(func)print('Waiting for all subprocesses done...')
pool.close()
pool.join()print('All subprocesses done.')
運(yùn)行結(jié)果:
parent process id 84172Waitingforall subprocesses done...
Run task Lee--84868******ppid:84172 ~~~~ Thu Nov 14 17:44:14 2019Run task Marlon-84252******ppid:84172 ~~~~ Thu Nov 14 17:44:14 2019Run task Allen-85344******ppid:84172 ~~~~ Thu Nov 14 17:44:14 2019Run task Frank-85116******ppid:84172 ~~~~ Thu Nov 14 17:44:14 2019Task Lee,runs5.00 seconds. ~~~~ Thu Nov 14 17:44:19 2019Task Marlon runs10.00 seconds. ~~~~ Thu Nov 14 17:44:24 2019Task Allen runs15.00 seconds. ~~~~ Thu Nov 14 17:44:29 2019Task Frank runs20.00 seconds. ~~~~ Thu Nov 14 17:44:34 2019All subprocesses done.
六、其他
1、獲取當(dāng)前計(jì)算機(jī)的CPU數(shù)量
總結(jié)
以上是生活随笔為你收集整理的python 多进程 调用模块内函数_Python进程池multiprocessing.Pool的用法的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 教育考试专用WPS教育考试专用表
- 下一篇: 超实用PS教程PS实用教程