Python开发基础--- 进程间通信、进程池、协程
進(jìn)程間通信
進(jìn)程彼此之間互相隔離,要實(shí)現(xiàn)進(jìn)程間通信(IPC),multiprocessing模塊支持兩種形式:隊(duì)列和管道,這兩種方式都是使用消息傳遞的。
進(jìn)程隊(duì)列queue
不同于線程queue,進(jìn)程queue的生成是用multiprocessing模塊生成的。
在生成子進(jìn)程的時(shí)候,會(huì)將代碼拷貝到子進(jìn)程中執(zhí)行一遍,及子進(jìn)程擁有和主進(jìn)程內(nèi)容一樣的不同的名稱空間。
示例1:
1 import multiprocessing2 def foo():3 q.put([11,'hello',True])4 print(q.qsize())5 6 q=multiprocessing.Queue() #全局定義一個(gè)q進(jìn)程隊(duì)列,在產(chǎn)生子進(jìn)程時(shí)候會(huì)在子進(jìn)程里生成,可以指定最大數(shù),限制隊(duì)列長(zhǎng)度7 if __name__ == '__main__':8 p=multiprocessing.Process(target=foo,args=()) #因?yàn)槊Q空間不同,子進(jìn)程的主線程創(chuàng)建的q隊(duì)列,主進(jìn)程get不到,所以會(huì)阻塞住9 p.start() 10 # foo() #主進(jìn)程執(zhí)行一下函數(shù)就可以訪問(wèn)到了 11 print(q.get())?
示例2:
1 import multiprocessing2 3 def foo():4 q.put([11,'hello',True])5 print(q.qsize())6 7 if __name__ == '__main__':8 q = multiprocessing.Queue() #主進(jìn)程創(chuàng)建一個(gè)q進(jìn)程隊(duì)列9 p=multiprocessing.Process(target=foo,args=()) #因?yàn)槊Q空間不同,子進(jìn)程的主線程找不到q隊(duì)列,所以會(huì)報(bào)錯(cuò)提示沒有q 10 p.start() 11 print(q.get())?
示例3:
1 import multiprocessing2 3 def foo(argument): #定義函數(shù)處理進(jìn)程隊(duì)列4 argument.put([11,'hello',True])5 print(argument.qsize())6 q = multiprocessing.Queue() #全局定義一個(gè)進(jìn)程隊(duì)列7 print('test')8 9 if __name__ == '__main__': 10 x = multiprocessing.Queue() #主進(jìn)程定義一個(gè)進(jìn)程隊(duì)列 11 p=multiprocessing.Process(target=foo,args=(x,)) #主進(jìn)程把值傳給子進(jìn)程就可以處理了 12 p.start() 13 print(x.get()) 14 # foo(q) 15 # print(q.get())?
常用方法
q.put方法用以插入數(shù)據(jù)到隊(duì)列中,put方法還有兩個(gè)可選參數(shù):blocked和timeout。如果blocked為True(默認(rèn)值),并且timeout為正值,該方法會(huì)阻塞timeout指定的時(shí)間,直到該隊(duì)列有剩余的空間。如果超時(shí),會(huì)拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會(huì)立即拋出Queue.Full異常。 q.get方法可以從隊(duì)列讀取并且刪除一個(gè)元素。同樣,get方法有兩個(gè)可選參數(shù):blocked和timeout。如果blocked為True(默認(rèn)值),并且timeout為正值,那么在等待時(shí)間內(nèi)沒有取到任何元素,會(huì)拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個(gè)值可用,則立即返回該值,否則,如果隊(duì)列為空,則立即拋出Queue.Empty異常. q.get_nowait():同q.get(False) q.put_nowait():同q.put(False) q.empty():調(diào)用此方法時(shí)q為空則返回True,該結(jié)果不可靠,比如在返回True的過(guò)程中,如果隊(duì)列中又加入了項(xiàng)目。 q.full():調(diào)用此方法時(shí)q已滿則返回True,該結(jié)果不可靠,比如在返回True的過(guò)程中,如果隊(duì)列中的項(xiàng)目被取走。 q.qsize():返回隊(duì)列中目前項(xiàng)目的正確數(shù)量,結(jié)果也不可靠,理由同q.empty()和q.full()一樣?
其他方法
q.cancel_join_thread():不會(huì)在進(jìn)程退出時(shí)自動(dòng)連接后臺(tái)線程。可以防止join_thread()方法阻塞 q.close():關(guān)閉隊(duì)列,防止隊(duì)列中加入更多數(shù)據(jù)。調(diào)用此方法,后臺(tái)線程將繼續(xù)寫入那些已經(jīng)入隊(duì)列但尚未寫入的數(shù)據(jù),但將在此方法完成時(shí)馬上關(guān)閉。如果q被垃圾收集,將調(diào)用此方法。關(guān)閉隊(duì)列不會(huì)在隊(duì)列使用者中產(chǎn)生任何類型的數(shù)據(jù)結(jié)束信號(hào)或異常。例如,如果某個(gè)使用者正在被阻塞在get()操作上,關(guān)閉生產(chǎn)者中的隊(duì)列不會(huì)導(dǎo)致get()方法返回錯(cuò)誤。 q.join_thread():連接隊(duì)列的后臺(tái)線程。此方法用于在調(diào)用q.close()方法之后,等待所有隊(duì)列項(xiàng)被消耗。默認(rèn)情況下,此方法由不是q的原始創(chuàng)建者的所有進(jìn)程調(diào)用。調(diào)用q.cancel_join_thread方法可以禁止這種行為?
另一個(gè)創(chuàng)建進(jìn)程隊(duì)列的類
http://www.cnblogs.com/zero527/p/7211909.html
管道pipe
管道就是管道,就像生活中的管道,兩頭都能進(jìn)能出
默認(rèn)管道是全雙工的,如果創(chuàng)建管道的時(shí)候映射成False,左邊只能用于接收,右邊只能用于發(fā)送,類似于單行道
最簡(jiǎn)單的管道雙向通信示例:
1 import multiprocessing2 3 def foo(sk):4 sk.send('hello world')5 print(sk.recv())6 7 if __name__ == '__main__':8 conn1,conn2=multiprocessing.Pipe() #開辟兩個(gè)口,都是能進(jìn)能出,括號(hào)中如果False即單向通信9 p=multiprocessing.Process(target=foo,args=(conn1,)) #子進(jìn)程使用sock口,調(diào)用foo函數(shù) 10 p.start() 11 print(conn2.recv()) #主進(jìn)程使用conn口接收 12 conn2.send('hi son') #主進(jìn)程使用conn口發(fā)送?
常用方法
conn1.recv():接收conn2.send(obj)發(fā)送的對(duì)象。如果沒有消息可接收,recv方法會(huì)一直阻塞。如果連接的另外一端已經(jīng)關(guān)閉,那么recv方法會(huì)拋出EOFError。 conn1.send(obj):通過(guò)連接發(fā)送對(duì)象。obj是與序列化兼容的任意對(duì)象注意:send()和recv()方法使用pickle模塊對(duì)對(duì)象進(jìn)行序列化
?
其他方法
conn1.close():關(guān)閉連接。如果conn1被垃圾回收,將自動(dòng)調(diào)用此方法,不用的時(shí)候兩邊都要closeconn1.fileno():返回連接使用的整數(shù)文件描述符conn1.poll([timeout]):如果連接上的數(shù)據(jù)可用,返回True。timeout指定等待的最長(zhǎng)時(shí)限。如果省略此參數(shù),方法將立即返回結(jié)果。如果將timeout射成None,操作將無(wú)限期地等待數(shù)據(jù)到達(dá)。conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發(fā)送的一條完整的字節(jié)消息。maxlength指定要接收的最大字節(jié)數(shù)。如果進(jìn)入的消息,超過(guò)了這個(gè)最大值,將引發(fā)IOError異常,并且在連接上無(wú)法進(jìn)行進(jìn)一步讀取。如果連接的另外一端已經(jīng)關(guān)閉,再也不存在任何數(shù)據(jù),將引發(fā)EOFError異常。conn.send_bytes(buffer [, offset [, size]]):通過(guò)連接發(fā)送字節(jié)數(shù)據(jù)緩沖區(qū),buffer是支持緩沖區(qū)接口的任意對(duì)象,offset是緩沖區(qū)中的字節(jié)偏移量,而size是要發(fā)送字節(jié)數(shù)。結(jié)果數(shù)據(jù)以單條消息的形式發(fā)出,然后調(diào)用c.recv_bytes()函數(shù)進(jìn)行接收 conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節(jié)消息,并把它保存在buffer對(duì)象中,該對(duì)象支持可寫入的緩沖區(qū)接口(即bytearray對(duì)象或類似的對(duì)象)。offset指定緩沖區(qū)中放置消息處的字節(jié)位移。返回值是收到的字節(jié)數(shù)。如果消息長(zhǎng)度大于可用的緩沖區(qū)空間,將引發(fā)BufferTooShort異常。?
注意:生產(chǎn)者和消費(fèi)者都沒有使用管道的某個(gè)端點(diǎn),就應(yīng)該將其關(guān)閉,如在生產(chǎn)者中關(guān)閉管道的右端,在消費(fèi)者中關(guān)閉管道的左端。如果忘記執(zhí)行這些步驟,程序可能再消費(fèi)者中的recv()操作上掛起。管道是由操作系統(tǒng)進(jìn)行引用計(jì)數(shù)的,必須在所有進(jìn)程中關(guān)閉管道后才能生產(chǎn)EOFError異常。因此在生產(chǎn)者中關(guān)閉管道不會(huì)有任何效果,付費(fèi)消費(fèi)者中也關(guān)閉了相同的管道端點(diǎn)。
1 from multiprocessing import Process,Pipe2 3 import time,os4 def consumer(p,name):5 left,right=p6 left.close()7 while True:8 try:9 baozi=right.recv() 10 print('%s 收到包子:%s' %(name,baozi)) 11 except EOFError: 12 right.close() 13 break 14 def producer(seq,p): 15 left,right=p 16 right.close() 17 for i in seq: 18 left.send(i) 19 # time.sleep(1) 20 else: 21 left.close() 22 if __name__ == '__main__': 23 left,right=Pipe() 24 c1=Process(target=consumer,args=((left,right),'c1')) 25 c1.start() 26 seq=(i for i in range(10)) 27 producer(seq,(left,right)) 28 right.close() 29 left.close() 30 c1.join() 31 print('主進(jìn)程')1 from multiprocessing import Process,Pipe2 3 import time,os4 def consumer(p,name):5 left,right=p6 left.close()7 while True:8 try:9 baozi=right.recv() 10 print('%s 收到包子:%s' %(name,baozi)) 11 except EOFError: 12 right.close() 13 break 14 def producer(seq,p): 15 left,right=p 16 right.close() 17 for i in seq: 18 left.send(i) 19 # time.sleep(1) 20 else: 21 left.close() 22 if __name__ == '__main__': 23 left,right=Pipe() 24 c1=Process(target=consumer,args=((left,right),'c1')) 25 c1.start() 26 seq=(i for i in range(10)) 27 producer(seq,(left,right)) 28 right.close() 29 left.close() 30 c1.join() 31 print('主進(jìn)程')
?
?
共享數(shù)據(jù)manage
Queue和pipe只是實(shí)現(xiàn)了數(shù)據(jù)交互,并沒實(shí)現(xiàn)數(shù)據(jù)共享,即一個(gè)進(jìn)程去更改另一個(gè)進(jìn)程的數(shù)據(jù)。
注:進(jìn)程間通信應(yīng)該盡量避免使用共享數(shù)據(jù)的方式
?
共享數(shù)據(jù):列表
1 from multiprocessing import Manager,Process2 def foo(l,i):3 l.append(i**i)4 if __name__ == '__main__':5 man=Manager()6 ml=man.list([11,22,33])7 l=[]8 for i in range(5):9 p=Process(target=foo,args=(ml,i)) 10 p.start() 11 l.append(p) 12 for i in l: #必須要join,不然會(huì)執(zhí)行報(bào)錯(cuò),處理一個(gè)數(shù)據(jù)必須要一個(gè)個(gè)來(lái),不能同時(shí)處理一個(gè)數(shù)據(jù) 13 i.join() 14 print(ml)?
共享數(shù)據(jù):字典
1 from multiprocessing import Manager,Process2 def foo(d,k,v):3 d[k]=v4 if __name__ == '__main__':5 man=Manager()6 md=man.dict({'name':'bob'})7 l=[]8 for i in range(5):9 p=Process(target=foo,args=(md,i,'a')) 10 p.start() 11 l.append(p) 12 for i in l: #必須要join,不然會(huì)執(zhí)行報(bào)錯(cuò),處理一個(gè)數(shù)據(jù)必須要一個(gè)個(gè)來(lái),不能同時(shí)處理一個(gè)數(shù)據(jù) 13 i.join() 14 print(md)?
?
進(jìn)程池
開多進(jìn)程是為了并發(fā),通常有幾個(gè)cpu核心就開幾個(gè)進(jìn)程,但是進(jìn)程開多了會(huì)影響效率,主要體現(xiàn)在切換的開銷,所以引入進(jìn)程池限制進(jìn)程的數(shù)量。
進(jìn)程池內(nèi)部維護(hù)一個(gè)進(jìn)程序列,當(dāng)使用時(shí),則去進(jìn)程池中獲取一個(gè)進(jìn)程,如果進(jìn)程池序列中沒有可供使用的進(jìn)進(jìn)程,那么程序就會(huì)等待,直到進(jìn)程池中有可用進(jìn)程為止。
示例:
1 from multiprocessing import Pool2 import time3 4 def foo(n):5 print(n)6 time.sleep(1)7 8 if __name__ == '__main__':9 pool_obj=Pool(5) # 10 for i in range(47): 11 # pool_obj.apply_async(func=foo,args=(i,)) 12 pool_obj.apply(func=foo,args=(i,)) #子進(jìn)程的生成是靠進(jìn)程池對(duì)象維護(hù)的 13 # apply同步,子進(jìn)程一個(gè)個(gè)執(zhí)行 14 # apply_async異步,多個(gè)子進(jìn)程一起執(zhí)行 15 pool_obj.close() 16 pool_obj.join() 17 print('ending')?
常用方法:
pool_obj.apply(func [, args [, kwargs]]):在一個(gè)池工作進(jìn)程中執(zhí)行func(*args,**kwargs),然后返回結(jié)果。需要強(qiáng)調(diào)的是:此操作并不會(huì)在所有池工作進(jìn)程中并執(zhí)行func函數(shù)。如果要通過(guò)不同參數(shù)并發(fā)地執(zhí)行func函數(shù),必須從不同線程調(diào)用p.apply()函數(shù)或者使用p.apply_async() pool_obj.apply_async(func [, args [, kwargs]]):在一個(gè)池工作進(jìn)程中執(zhí)行func(*args,**kwargs),然后返回結(jié)果。此方法的結(jié)果是AsyncResult類的實(shí)例,callback是可調(diào)用對(duì)象,接收輸入?yún)?shù)。當(dāng)func的結(jié)果變?yōu)榭捎脮r(shí),將理解傳遞給callback。callback禁止執(zhí)行任何阻塞操作,否則將接收其他異步操作中的結(jié)果。 pool_obj.close():關(guān)閉進(jìn)程池,防止進(jìn)一步操作。如果所有操作持續(xù)掛起,它們將在工作進(jìn)程終止前完成 pool_obj.jion():等待所有工作進(jìn)程退出。此方法只能在close()或teminate()之后調(diào)用?
其他方法:
方法apply_async()和map_async()的返回值是AsyncResul的實(shí)例obj。實(shí)例具有以下方法 obj.get():返回結(jié)果,如果有必要?jiǎng)t等待結(jié)果到達(dá)。timeout是可選的。如果在指定時(shí)間內(nèi)還沒有到達(dá),將引發(fā)一場(chǎng)。如果遠(yuǎn)程操作中引發(fā)了異常,它將在調(diào)用此方法時(shí)再次被引發(fā)。 obj.ready():如果調(diào)用完成,返回True obj.successful():如果調(diào)用完成且沒有引發(fā)異常,返回True,如果在結(jié)果就緒之前調(diào)用此方法,引發(fā)異常 obj.wait([timeout]):等待結(jié)果變?yōu)榭捎谩?obj.terminate():立即終止所有工作進(jìn)程,同時(shí)不執(zhí)行任何清理或結(jié)束任何掛起工作。如果p被垃圾回收,將自動(dòng)調(diào)用此函數(shù)?
?
協(xié)程
協(xié)程:是單線程下的并發(fā),又稱微線程,纖程。英文名Coroutine。
一句話說(shuō)明什么是線程:協(xié)程是一種用戶態(tài)的輕量級(jí)線程,即協(xié)程是由用戶程序自己控制調(diào)度的。
協(xié)程能保留上一次調(diào)用時(shí)的狀態(tài)(即所有局部狀態(tài)的一個(gè)特定組合),每次過(guò)程重入時(shí),就相當(dāng)于進(jìn)入上一次調(diào)用的狀態(tài),換種說(shuō)法:進(jìn)入上一次離開時(shí)所處邏輯流的位置。
注意:
1. python的線程屬于內(nèi)核級(jí)別的,即由操作系統(tǒng)控制調(diào)度(如單線程一旦遇到io就被迫交出cpu執(zhí)行權(quán)限,切換其他線程運(yùn)行)
2. 單線程內(nèi)開啟協(xié)程,一旦遇到io,從應(yīng)用程序級(jí)別(而非操作系統(tǒng))控制切換
協(xié)程優(yōu)點(diǎn):
1. ?協(xié)程的切換開銷更小,屬于程序級(jí)別的切換,操作系統(tǒng)完全感知不到,因而更加輕量級(jí)
2. 單線程內(nèi)就可以實(shí)現(xiàn)并發(fā)的效果,最大限度地利用cpu
協(xié)程缺點(diǎn):
1.協(xié)程的本質(zhì)是單線程下,無(wú)法利用多核,可以是一個(gè)程序開啟多個(gè)進(jìn)程,每個(gè)進(jìn)程內(nèi)開啟多個(gè)線程,每個(gè)線程內(nèi)開啟協(xié)程
2.協(xié)程指的是單個(gè)線程,因而一旦協(xié)程出現(xiàn)阻塞,將會(huì)阻塞整個(gè)線程
yield實(shí)現(xiàn)協(xié)程并發(fā)
1 import time2 def consumer():3 r=''4 while True:5 n=yield r6 if not n:7 return8 print('[CONSUMER] ←← Consuming %s...' % n)9 time.sleep(1) 10 r='200 Ok' 11 12 def produce(c): 13 next(c) #1.啟動(dòng)生成器 14 n=0 15 while n < 5: 16 n=n+1 17 print('[PRODUCER] →→ Producing %s...' % n) 18 cr=c.send(n) 19 #2.將n傳入到consumer的對(duì)象,yield接收到傳入值開始執(zhí)行代碼,遇到y(tǒng)ield執(zhí)行代碼返回r的值 20 print('[PRODUCER] Consumer return: %s' % cr) 21 #3.produce沒有值了,關(guān)閉整個(gè)過(guò)程 22 c.close() 23 24 if __name__ == '__main__': 25 c=consumer() #生成生成器對(duì)象 26 produce(c) #執(zhí)行調(diào)用?
?
greenlet框架實(shí)現(xiàn)協(xié)程(封裝yield的基礎(chǔ)庫(kù))
greenlet機(jī)制的主要思想是:生成器函數(shù)或者協(xié)程函數(shù)中的yield語(yǔ)句掛起函數(shù)的執(zhí)行,直到稍后使用next()或send()操作進(jìn)行恢復(fù)為止。可以使用一個(gè)調(diào)度器循環(huán)在一組生成器函數(shù)之間協(xié)作多個(gè)任務(wù)。greentlet是python中實(shí)現(xiàn)我們所謂的"Coroutine(協(xié)程)"的一個(gè)基礎(chǔ)庫(kù)。
示例1:
1 from greenlet import greenlet2 def foo():3 print('ok1')4 g2.switch() #阻斷5 print('ok3')6 g2.switch()7 def bar():8 print('ok2')9 g1.switch() 10 print('ok4') 11 12 g1=greenlet(foo) #生成foo函數(shù)的greenlet對(duì)象 13 g2=greenlet(bar) #生成bar函數(shù)的greenlet對(duì)象 14 g1.switch() #1、執(zhí)行g(shù)1對(duì)象,打印ok1 15 #2、遇到g2.switch(),轉(zhuǎn)到g2執(zhí)行打印ok2 16 #3、遇到g1.switch(),轉(zhuǎn)到g1的阻斷處繼續(xù)執(zhí)行打印ok3 17 #4、遇到g2.switch(),轉(zhuǎn)到g2執(zhí)行打印ok4?
示例2:
1 def eat(name):2 print('%s eat food 1' %name)3 gr2.switch('bob')4 print('%s eat food 2' %name)5 gr2.switch()6 def play_phone(name):7 print('%s play 1' %name)8 gr1.switch()9 print('%s play 2' %name) 10 11 gr1=greenlet(eat) 12 gr2=greenlet(play_phone) 13 gr1.switch(name='natasha')#可以在第一次switch時(shí)傳入?yún)?shù),以后都不需要?
這種方法不會(huì)節(jié)省時(shí)間,因?yàn)椴皇莍o操作,而greenlet遇到io操作不會(huì)跳轉(zhuǎn),仍然要io阻斷
?
基于greenlet框架的高級(jí)庫(kù)gevent模塊
gevent是第三方庫(kù),通過(guò)greenlet實(shí)現(xiàn)協(xié)程,其基本思想是:
當(dāng)一個(gè)greenlet遇到IO操作時(shí),比如訪問(wèn)網(wǎng)絡(luò),就自動(dòng)切換到其他的greenlet,等到IO操作完成,再在適當(dāng)?shù)臅r(shí)候切換回來(lái)繼續(xù)執(zhí)行。由于IO操作非常耗時(shí),經(jīng)常使程序處于等待狀態(tài),有了gevent為我們自動(dòng)切換協(xié)程,就保證總有g(shù)reenlet在運(yùn)行,而不是等待IO。
由于切換是在IO操作時(shí)自動(dòng)完成,所以gevent需要修改Python自帶的一些標(biāo)準(zhǔn)庫(kù),這一過(guò)程在啟動(dòng)時(shí)通過(guò)monkey patch完成:
簡(jiǎn)單示例:
1 import gevent2 def foo():3 print('ok1')4 gevent.sleep(4) #模擬io操作5 print('ok3')6 def bar():7 print('ok2')8 gevent.sleep(2)9 print('ok4') 10 11 g1=gevent.spawn(foo) 12 g2=gevent.spawn(bar) 13 gevent.joinall([g1,g2]) #全部阻塞,或者單獨(dú)一個(gè)個(gè)join?
spawn括號(hào)內(nèi)第一個(gè)參數(shù)是函數(shù)名,如foo,后面可以有多個(gè)參數(shù),可以是位置實(shí)參或關(guān)鍵字實(shí)參,都是傳給函數(shù)foo的
注意:
gevent.sleep(4)模擬的是gevent可以識(shí)別的io阻塞,
而time.sleep(2)或其他的阻塞,gevent是不能直接識(shí)別的需要用下面一行代碼,打補(bǔ)丁,就可以識(shí)別了
1 #補(bǔ)丁 2 from gevent import monkey 3 monkey.patch_all()?
必須放到被打補(bǔ)丁者的前面,如time,socket模塊之前
或者我們干脆記憶成:要用gevent,需要將補(bǔ)丁放到文件的開頭
爬蟲示例:
1 from gevent import monkey;monkey.patch_all()2 import gevent3 import requests4 import time5 6 def get_page(url):7 print('GET: %s' %url)8 response=requests.get(url)9 if response.status_code == 200: 10 print('%d bytes received from %s' %(len(response.text),url)) 11 12 13 start_time=time.time() 14 gevent.joinall([ 15 gevent.spawn(get_page,'https://www.python.org/'), 16 gevent.spawn(get_page,'https://www.yahoo.com/'), 17 gevent.spawn(get_page,'https://github.com/'), 18 ]) 19 stop_time=time.time() 20 print('run time is %s' %(stop_time-start_time))?
gevent是一個(gè)基于協(xié)程(coroutine)的Python網(wǎng)絡(luò)函數(shù)庫(kù),通過(guò)使用greenlet提供了一個(gè)在libev事件循環(huán)頂部的高級(jí)別并發(fā)API。主要特性有以下幾點(diǎn):<1> 基于libev的快速事件循環(huán),Linux上面的是epoll機(jī)制<2> 基于greenlet的輕量級(jí)執(zhí)行單元<3> API復(fù)用了Python標(biāo)準(zhǔn)庫(kù)里的內(nèi)容<4> 支持SSL的協(xié)作式sockets<5> 可通過(guò)線程池或c-ares實(shí)現(xiàn)DNS查詢<6> 通過(guò)monkey patching功能來(lái)使得第三方模塊變成協(xié)作式gevent.spawn()方法spawn一些jobs,然后通過(guò)gevent.joinall將jobs加入到微線程執(zhí)行隊(duì)列中等待其完成,設(shè)置超時(shí)為2秒。執(zhí)行后的結(jié)果通過(guò)檢查gevent.Greenlet.value值來(lái)收集。===========================二 1、關(guān)于Linux的epoll機(jī)制:epoll是Linux內(nèi)核為處理大批量文件描述符而作了改進(jìn)的poll,是Linux下多路復(fù)用IO接口select/poll的 增強(qiáng)版本,它能顯著提高程序在大量并發(fā)連接中只有少量活躍的情況下的系統(tǒng)CPU利用率。epoll的優(yōu)點(diǎn):(1)支持一個(gè)進(jìn)程打開大數(shù)目的socket描述符。select的一個(gè)進(jìn)程所打開的FD由FD_SETSIZE的設(shè)置來(lái)限定,而epoll沒有這個(gè)限制,它所支持的FD上限是 最大可打開文件的數(shù)目,遠(yuǎn)大于2048。(2)IO效率不隨FD數(shù)目增加而線性下降:由于epoll只會(huì)對(duì)“活躍”的socket進(jìn)行操作,于是,只有”活躍”的socket才會(huì)主動(dòng)去調(diào)用 callback函數(shù),其他 idle狀態(tài)的socket則不會(huì)。(3)使用mmap加速內(nèi)核與用戶空間的消息傳遞。epoll是通過(guò)內(nèi)核于用戶空間mmap同一塊內(nèi)存實(shí)現(xiàn)的。(4)內(nèi)核微調(diào)。2、libev機(jī)制提供了指定文件描述符事件發(fā)生時(shí)調(diào)用回調(diào)函數(shù)的機(jī)制。libev是一個(gè)事件循環(huán)器:向libev注冊(cè)感興趣的事件,比如socket可讀事件,libev會(huì)對(duì)所注冊(cè)的事件 的源進(jìn)行管理,并在事件發(fā)生時(shí)觸發(fā)相應(yīng)的程序。===========================三‘’‘import geventfrom gevent import socketurls = [‘www.google.com.hk’,’www.example.com’, ‘www.python.org’ ]jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]gevent.joinall(jobs, timeout=2)[job.value for job in jobs][‘74.125.128.199’, ‘208.77.188.166’, ‘82.94.164.162’]’‘’gevent.spawn()方法spawn一些jobs,然后通過(guò)gevent.joinall將jobs加入到微線程執(zhí)行隊(duì)列中等待其完成,設(shè)置超時(shí)為2秒。執(zhí)行后的結(jié)果通過(guò)檢查gevent.Greenlet.value值來(lái)收集。gevent.socket.gethostbyname()函數(shù)與標(biāo)準(zhǔn)的socket.gethotbyname()有相同的接口,但它不會(huì)阻塞整個(gè)解釋器,因此會(huì)使得其他的greenlets跟隨著無(wú)阻的請(qǐng)求而執(zhí)行。Monket patchingPython的運(yùn)行環(huán)境允許我們?cè)谶\(yùn)行時(shí)修改大部分的對(duì)象,包括模塊、類甚至函數(shù)。雖然這樣做會(huì)產(chǎn)生“隱式的副作用”,而且出現(xiàn)問(wèn)題很難調(diào)試,但在需要修改Python本身的基礎(chǔ)行為時(shí),Monkey patching就派上用場(chǎng)了。Monkey patching能夠使得gevent修改標(biāo)準(zhǔn)庫(kù)里面大部分的阻塞式系統(tǒng)調(diào)用,包括socket,ssl,threading和select等模塊,而變成協(xié)作式運(yùn)行。from gevent import monkey ;monkey . patch_socket ()import urllib2通過(guò)monkey.patch_socket()方法,urllib2模塊可以使用在多微線程環(huán)境,達(dá)到與gevent共同工作的目的。事件循環(huán)不像其他網(wǎng)絡(luò)庫(kù),gevent和eventlet類似, 在一個(gè)greenlet中隱式開始事件循環(huán)。沒有必須調(diào)用run()或dispatch()的反應(yīng)器(reactor),在twisted中是有 reactor的。當(dāng)gevent的API函數(shù)想阻塞時(shí),它獲得Hub實(shí)例(執(zhí)行時(shí)間循環(huán)的greenlet),并切換過(guò)去。如果沒有集線器實(shí)例則會(huì)動(dòng)態(tài) 創(chuàng)建。libev提供的事件循環(huán)默認(rèn)使用系統(tǒng)最快輪詢機(jī)制,設(shè)置LIBEV_FLAGS環(huán)境變量可指定輪詢機(jī)制。LIBEV_FLAGS=1為select, LIBEV_FLAGS = 2為poll, LIBEV_FLAGS = 4為epoll,LIBEV_FLAGS = 8為kqueue。Libev的API位于gevent.core下。注意libev API的回調(diào)在Hub的greenlet運(yùn)行,因此使用同步greenlet的API。可以使用spawn()和Event.set()等異步API。?
轉(zhuǎn)載于:https://www.cnblogs.com/chenqizhou/p/7359689.html
總結(jié)
以上是生活随笔為你收集整理的Python开发基础--- 进程间通信、进程池、协程的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: WebBrowser,挖坑,跳坑,填坑
- 下一篇: vue框架的知识