python全栈学习--day40()
上一章節內容匯總:
進程 ?multiprocess
Process ?------ 進程 在python中創建一個進程的模塊
start?
daemon 守護進程
join 等待子進程執行結束
鎖 Lock
acquire release
鎖是一個同步控制的工具
如果同一時刻有多個進程同時執行一段代碼,那么在內存中數據是不會發生沖突的。但是,如果涉及到文件,數據庫機會發生資源沖突的問題。我們就需要用鎖來把這段代碼鎖起來。任意一個進程執行了acquire之后,其他所有的進程都會在這里阻塞,等待一個release
信號量 ?semaphore
鎖 + 計數器
set clear is set 控制對象的狀態
wait ?根據狀態的不同執行效果也不同
狀態是True ----> pass
狀態是False ----> 阻塞
一般wait是和set clear 放在不同的進程中
set/clear 是負責控制狀態
我可以在一個進程中控制另外一個或多個進程的運行情況。
?
IPC通信
隊列 Queue
管道 PIPE
?
進程間通信——隊列和管道(multiprocess.Queue、multiprocess.Pipe)
隊列
from multiprocessing import Process,Queue Queue([maxsize]) 創建共享的進程隊列。 參數 :maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。 底層隊列使用管道和鎖定實現。 Queue([maxsize]) 創建共享的進程隊列。maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還需要運行支持線程以便隊列中的數據傳輸到底層管道中。 Queue的實例q具有以下方法:q.get( [ block [ ,timeout ] ] ) 返回q中的一個項目。如果q為空,此方法將阻塞,直到隊列中有項目可用為止。block用于控制阻塞行為,默認為True. 如果設置為False,將引發Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有項目變為可用,將引發Queue.Empty異常。q.get_nowait( ) 同q.get(False)方法。q.put(item [, block [,timeout ] ] ) 將item放入隊列。如果隊列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,默認為True。如果設置為False,將引發Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時后將引發Queue.Full異常。q.qsize() 返回隊列中目前項目的正確數量。此函數的結果并不可靠,因為在返回結果和在稍后程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引發NotImplementedError異常。q.empty() 如果調用此方法時 q為空,返回True。如果其他進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。q.full() 如果q已滿,返回為True. 由于線程的存在,結果也可能是不可靠的(參考q.empty()方法)。。方法介紹說明
隊列可以在創建的時候指定一個容量
如果在程序運行的過程中,隊列已經有了足夠的數據,在put就會發生阻塞。
如果隊列為空,在get就會發生阻塞。內存------制定容量
判斷隊列是否為空
from multiprocessing import Process,Queue q = Queue() print(q.empty())執行輸出:True
如果隊列已滿,在增加加值的操作,會被阻塞,知道隊列有空余的。
from multiprocessing import Process,Queue q = Queue(10) #創建一個只能放10個value的隊列 for i in range(10):q.put(i) #增加一個value print(q.qsize()) #返回隊列中目前的正確數量 print(q.full()) #如果q已滿,返回為True q.put(111) #在增加一個值 print(q.empty())
從結果中,可以看出,下面的操作q.put(111)之后的代碼被阻塞了。
總結:
隊列可以在創建的時候指定一個容量
如果在程序運行的過程中,隊列已經有了足夠的數據,在put就會發送阻塞
?如果隊列為空,在get就會發送阻塞
?為什么要向隊列的長度呢?是為了防止內存爆炸。
一個隊列,不能無限制的存儲。畢竟,內存是有限制的。
上面提到的put,get,full ,empty都是不準的。
因為在返回結果和在稍后程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引發NotImplementedError異常。
如果其他進程或線程正在往隊列中添加項目,可能是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。
import time from multiprocessing import Process,Queue def wahaha(q):print(q.get())q.put(2) #增加數字2if __name__ == '__main__':q = Queue()p = Process(target=wahaha,args=[q,])p.start()q.put(1) #增加數字1time.sleep(0.1)print(q.get())
先執行主進程的q.get(),在執行子進程的q.get()
在進程中使用隊列可以完成雙向通信
隊列是進程安全的,內置了鎖類保證隊列中的每一個數據都不會被多個進程重復取
?
在同一個時刻,只能有一個進程來取值,它內部有一個鎖的機制。那么另外一個進程就會被阻塞一會,但是阻塞的時間非常短隊列能保證數據安全,同一個數據,不能被多個進程獲取。
?
生產者消費者模型
解決數據供需不平衡的情況
from multiprocessing import Process,Queue def producer(q,name,food):for i in range(5):print('{}生產了{}{}'.format(name,food,i))if __name__ == '__main__':q = Queue()Process(target=producer,args=[q,'四川','螞蟻上樹']).start()Process(target=producer,args=[q,'廣東','龍虎斗']).start()
增加一個消費者
import time import random from multiprocessing import Process,Queue def producer(q,name,food):for i in range(5):time.sleep(random.random()) #模擬生產時間print('{}生產了{}{}'.format(name,food,i))q.put('{}{}'.format(food,i)) #放入隊列def consumer(q,name):for i in range(10):food = q.get() #獲取隊列time.sleep(random.random()) #模擬吃時間print('{}吃了{}'.format(name,food))if __name__ == '__main__':q = Queue()Process(target=producer,args=[q,'四川','麻婆豆腐']).start()Process(target=producer,args=[q,'廣東','八寶冬瓜盅']).start()Process(target=consumer,args=[q,'John']).start()
注意:必須將消費者的range(10)修改為5,否則程序會卡住。為什么呢?因為隊列已經是空的。在取機會阻塞這樣才能解決供需平衡。
那么問題來了,如果一個消費者,吃的比較快呢?
在修改range值?太low 了,能者多勞嘛。不能使用q.empty(),他是不準確的。看下圖,有可能一開始,隊列就空了。
看下圖的0.1更快
看下面的解決方案:
import time import random from multiprocessing import Process,Queue def producer(q,name,food):for i in range(5):time.sleep(random.random()) #模擬生產時間print('{}生產了{}{}'.format(name,food,i))q.put('{}{}'.format(food,i)) #放入隊列def consumer(q,name):for i in range(10):food = q.get() #獲取隊列time.sleep(random.random()) #模擬吃時間print('{}吃了{}'.format(name,food))if __name__ == '__main__':q = Queue()#創建隊列對象,如果不提供maxsize,則隊列數無限制p1 = Process(target=producer,args=[q,'四川','麻婆豆腐'])p2 = Process(target=producer,args=[q,'廣東','八寶冬瓜盅'])p1.start() #啟動進程p2.start() #啟動進程Process(target=consumer,args=[q,'John']).start()Process(target=consumer, args=[q, 'jenny']).start()p1.join()p2.join()q.put('鯽魚豆腐')q.put('大醬肘子')
?為什么要有2個done ?因為有兩個2消費者
為什么要有2個join ? 因為必須要等廚師做完菜才可以。
最后輸出2個done,表示通知2個顧客,菜已經上完了,顧客要結賬了。
2個消費者,都會執行break。通俗的來講,親,您一共消費了xx元,請付款!
?
上面的解決方案,代碼太長了,有一個消費者,就得done一次。
下面介紹JoinableQueue
JoinableQueue([maxsize])? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??
創建可連接的共享進程隊列。這就像是一個Queue對象,但隊列允許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。?
?
JoinableQueue的實例p除了與Queue對象相同的方法之外,還具有以下方法:q.task_done() 使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。如果調用此方法的次數大于從隊列中刪除的項目數量,將引發ValueError異常。q.join() 生產者將使用此方法進行阻塞,直到隊列中所有項目均被處理。阻塞將持續到為隊列中的每個項目均調用q.task_done()方法為止。 下面的例子說明如何建立永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,并等待它們被處理。?
JoinableQueue的實例p除了與Queue對象相同的方法之外,還具有以下方法:q.task_done() 使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。如果調用此方法的次數大于從隊列中刪除的項目數量,將引發ValueError異常。q.join() 生產者將使用此方法進行阻塞,直到隊列中所有項目均被處理。阻塞將持續到為隊列中的每個項目均調用q.task_done()方法為止。 下面的例子說明如何建立永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,并等待它們被處理。?JoinableQueue隊列實現消費之生產者模型
?
轉載于:https://www.cnblogs.com/haowen980/p/9036952.html
總結
以上是生活随笔為你收集整理的python全栈学习--day40()的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java的重要性
- 下一篇: @property、@sythesize