05 Python 并发编程(管道,事件,信号量,进程池)
管道
Conn1,conn2 = Pipe()
Conn1.recv()
Conn1.send()
數(shù)據(jù)接收一次就沒有了
from multiprocessing import Process,Pipedef f1(conn):from_zhujincheng = conn.recv()print('子進(jìn)程')print('來自主進(jìn)程的消息:',from_zhujincheng)if __name__ == '__main__':conn1,conn2 = Pipe() #創(chuàng)建一個(gè)管道對象,全雙工,返回管道的兩端,但是一端發(fā)送的消息,只能另外一端接收,自己這一端是不能接收的p1 = Process(target=f1,args=(conn2,))p1.start()conn1.send('出來吧')print('主進(jìn)程')事件
E = Event()? #初識狀態(tài)是false
E.wait()? 當(dāng)事件對象e的狀態(tài)為false的時(shí)候,在wait的地方會阻塞程序,當(dāng)對象狀態(tài)為true的時(shí)候,直接在這個(gè)wait地方繼續(xù)往下執(zhí)行
E.set()? 將事件對象的狀態(tài)改為true,
E.is_set() 查看狀態(tài)
E.clear()? 將事件對象的狀態(tài)改為false
from multiprocessing import Process,Evente = Event() #創(chuàng)建事件對象,這個(gè)對象的初識狀態(tài)為False print('e的狀態(tài)是:',e.is_set()) # Falseprint('進(jìn)程運(yùn)行到這里了') e.set() #將e的狀態(tài)改為True print('e的狀態(tài)是:',e.is_set()) # True e.clear() #將e的狀態(tài)改為False e.wait() #e這個(gè)事件對象如果值為False,就在我加wait的地方等待print('進(jìn)程過了wait')信號量
S = semphore(數(shù)字),內(nèi)部維護(hù)了一個(gè)計(jì)數(shù)器,acquire-1,release+1,為0的時(shí)候,其他的進(jìn)程都要在acquire之前等待
S.acquire()
需要鎖住的代碼
S.release()
import time,random from multiprocessing import Process,Semaphoredef f1(i,s):s.acquire()print('%s男嘉賓到了'%i)time.sleep(random.randint(1,3))s.release()if __name__ == '__main__':s = Semaphore(4) #計(jì)數(shù)器4,acquire一次減一,為0 ,其他人等待,release加1for i in range(10):p = Process(target=f1,args=(i,s))p.start()進(jìn)程池
進(jìn)程的創(chuàng)建和銷毀是很有消耗的,影響代碼執(zhí)行效率
在有進(jìn)程池的代碼中,主進(jìn)程運(yùn)行結(jié)束,進(jìn)程池里面的任務(wù)全部停止,不會等待進(jìn)程池里面的任務(wù)
pl = Pool(數(shù)字) ? 這個(gè)數(shù)字一般是電腦的cpu數(shù)
pl的方法:
Map:異步提交任務(wù),并且傳參需要可迭代類型的數(shù)據(jù),自帶close和join功能
import time from multiprocessing import Process,Pool#對比多進(jìn)程和進(jìn)程池的效率 def f1(n):for i in range(5):n = n + iif __name__ == '__main__':#統(tǒng)計(jì)進(jìn)程池執(zhí)行100個(gè)任務(wù)的時(shí)間s_time = time.time()pool = Pool(4) pool.map(f1,range(100)) e_time = time.time()dif_time = e_time - s_time#統(tǒng)計(jì)100個(gè)進(jìn)程,來執(zhí)行100個(gè)任務(wù)的執(zhí)行時(shí)間p_s_t = time.time() #多進(jìn)程起始時(shí)間p_list = []for i in range(100):p = Process(target=f1,args=(i,))p.start()p_list.append(p)[pp.join() for pp in p_list]p_e_t = time.time()p_dif_t = p_e_t - p_s_tprint('進(jìn)程池的時(shí)間:',dif_time)print('多進(jìn)程的執(zhí)行時(shí)間:',p_dif_t)# 結(jié)果: 進(jìn)程池的時(shí)間: 0.40102291107177734 多進(jìn)程的執(zhí)行時(shí)間: 9.247529029846191 # 可以看出進(jìn)程池運(yùn)行效率遠(yuǎn)遠(yuǎn)大于創(chuàng)建多進(jìn)程
Close : 鎖住進(jìn)程池,防止有其他的新的任務(wù)在提交給進(jìn)程池
Join : 等待著進(jìn)程池將自己里面的任務(wù)都執(zhí)行完
Res = Apply(f1,args=(i,))? #同步執(zhí)行任務(wù),必須等任務(wù)執(zhí)行結(jié)束才能給進(jìn)程池提交下一個(gè)任務(wù),可以直接拿到返回結(jié)果res
import time from multiprocessing import Process,Pooldef f1(n):time.sleep(1)return n*nif __name__ == '__main__':pool = Pool(4)for i in range(10):res = pool.apply(f1,args=(i,))print(res)Res_obj = Apply_async(f1,args=(i,))? #異步提交任務(wù),可以直接拿到結(jié)果對象,從結(jié)果對象里面拿結(jié)果,要用get方法,get方法會阻塞程序,沒有拿到結(jié)果會一直等待
import time from multiprocessing import Process,Pooldef f1(n):time.sleep(0.5)return n*nif __name__ == '__main__':pool = Pool(4)res_list = []for i in range(10):res = pool.apply_async(f1,args=(i,)) # 不能直接打印返回值,因?yàn)橹苯臃祷亟Y(jié)果對象,進(jìn)程還沒執(zhí)行完,結(jié)果對象里沒有數(shù)據(jù) res_list.append(res)pool.close() pool.join()#打印結(jié)果,異步提交之后的結(jié)果對象for i in res_list:print(i.get())回調(diào)函數(shù):
?Apply_async(f1,args=(i,),callback=function)? #將前面f1這個(gè)任務(wù)的返回結(jié)果作為參數(shù)傳給callback指定的那個(gè)function函數(shù)
import os from multiprocessing import Pool,Processdef f1(n):print('傳入的函數(shù)',n)return n*ndef call_back_func(asdf):print('回調(diào)函數(shù)',asdf)if __name__ == '__main__':pool = Pool(4)res = pool.apply_async(f1,args=(5,),callback=call_back_func)pool.close()pool.join()?
轉(zhuǎn)載于:https://www.cnblogs.com/a2534786642/p/10267312.html
總結(jié)
以上是生活随笔為你收集整理的05 Python 并发编程(管道,事件,信号量,进程池)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RedisClient的安装及基本使用
- 下一篇: Python爬虫QQ说说并分析朋友状况