并发编程之多进程
一 multiprocessing模塊介紹
python中的多線程無法利用多核優勢,如果想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分情況需要使用多進程。Python提供了multiprocessing。
? ? multiprocessing模塊用來開啟子進程,并在子進程中執行我們定制的任務(比如函數),該模塊與多線程模塊threading的編程接口類似。
?multiprocessing模塊的功能眾多:支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。
? ? 需要再次強調的一點是:與線程不同,進程沒有任何共享狀態,進程修改的數據,改動僅限于該進程內。
二 Process類的介紹
創建進程的類:
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化得到的對象,
可用來開啟一個子進程強調:
1. 需要使用關鍵字的方式來指定參數
2. args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號
參數介紹:
group參數未使用,值始終為Nonetarget表示調用對象,即子進程要執行的任務args表示調用對象的位置參數元組,args=(1,2,'egon',)kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}name為子進程的名稱
方法介紹:
p.start():啟動進程,并調用該子進程中的p.run() p.run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法 p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵尸進程,
使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進而導致死鎖p.is_alive():如果p仍然運行,返回Truep.join([timeout]):主線程等待p終止(強調:是主線程處于等的狀態,而p是處于運行的狀態)。
timeout是可選的超時時間。
屬性介紹:
p.daemon:默認值為False,如果設為True,代表p為后臺運行的守護進程,當p的父進程
終止時,p也隨之終止,并且設定為True后,p不能創建自己的新進程,必須在p.start()之前設置p.name:進程的名稱p.pid:進程的pid
三,Process類的使用
注意:在windows中Process()必須放到# if __name__ == '__main__':下
Since Windows has no fork, the multiprocessing module starts a new Python
process and imports the calling module. If Process() gets called upon import, then this sets off an infinite succession of
new processes (or until your machine runs out of resources). This is the reason for hiding calls to Process() insideif __name__ == "__main__"since statements inside this if-statement will not get called upon import.由于Windows沒有fork,多處理模塊啟動一個新的Python進程并導入調用模塊。 如果在導入時調用Process(),那么這將啟動無限繼承的新進程(或直到機器耗盡資源)。
這是隱藏對Process()內部調用的原,使用if __name__ == “__main __”,這個if語句
中的語句將不會在導入時被調用。
創建并開啟子進程的方式一
import time
import random
from multiprocessing import Processdef piao(name):print('%s piaoing' %name)time.sleep(random.randrange(1,5))print('%s piao end' %name)if __name__ == '__main__':#實例化得到四個對象p1=Process(target=piao,args=('egon',)) #必須加,號p2=Process(target=piao,args=('alex',))p3=Process(target=piao,args=('wupeqi',))p4=Process(target=piao,args=('yuanhao',))#調用對象下的方法,開啟四個進程p1.start()p2.start()p3.start()p4.start()print('主')
創建并開啟子進程的方式二
import time
import random
from multiprocessing import Processclass Piao(Process):def __init__(self,name):super().__init__()self.name=namedef run(self):print('%s piaoing' %self.name)time.sleep(random.randrange(1,5))print('%s piao end' %self.name)if __name__ == '__main__':#實例化得到四個對象p1=Piao('egon')p2=Piao('alex')p3=Piao('wupeiqi')p4=Piao('yuanhao')#調用對象下的方法,開啟四個進程p1.start() #start會自動調用runp2.start()p3.start()p4.start()print('主')
進程直接的內存空間是隔離的
from multiprocessing import Process
n=100 #在windows系統中應該把全局變量定義在if __name__ == '__main__'之上就可以了
def work():global nn=0print('子進程內: ',n)if __name__ == '__main__':p=Process(target=work)p.start()print('主進程內: ',n)
僵尸進程與孤兒進程(了解)
參考博客:http://www.cnblogs.com/Anker/p/3271773.html一:僵尸進程(有害)僵尸進程:一個進程使用fork創建子進程,如果子進程退出,而父進程并沒有調用wait或waitpid獲取子進程的狀態信息,那么子進程的進程描述符仍然保存在系統中。這種進程稱之為僵死進程。詳解如下我們知道在unix/linux中,正常情況下子進程是通過父進程創建的,子進程在創建新的進程。子進程的結束和父進程的運行是一個異步過程,即父進程永遠無法預測子進程到底什么時候結束,如果子進程一結束就立刻回收其全部資源,那么在父進程內將無法獲取子進程的狀態信息。因此,UNⅨ提供了一種機制可以保證父進程可以在任意時刻獲取子進程結束時的狀態信息:
1、在每個進程退出的時候,內核釋放該進程所有的資源,包括打開的文件,占用的內存等。但是仍然為其保留一定的信息(包括進程號the process ID,退出狀態the termination status of the process,運行時間the amount of CPU time taken by the process等)
2、直到父進程通過wait / waitpid來取時才釋放. 但這樣就導致了問題,如果進程不調用wait / waitpid的話,那么保留的那段信息就不會釋放,其進程號就會一直被占用,但是系統所能使用的進程號是有限的,如果大量的產生僵死進程,將因為沒有可用的進程號而導致系統不能產生新的進程. 此即為僵尸進程的危害,應當避免。任何一個子進程(init除外)在exit()之后,并非馬上就消失掉,而是留下一個稱為僵尸進程(Zombie)的數據結構,等待父進程處理。這是每個子進程在結束時都要經過的階段。如果子進程在exit()之后,父進程沒有來得及處理,這時用ps命令就能看到子進程的狀態是“Z”。如果父進程能及時 處理,可能用ps命令就來不及看到子進程的僵尸狀態,但這并不等于子進程不經過僵尸狀態。 如果父進程在子進程結束之前退出,則子進程將由init接管。init將會以父進程的身份對僵尸狀態的子進程進行處理。二:孤兒進程(無害)孤兒進程:一個父進程退出,而它的一個或多個子進程還在運行,那么那些子進程將成為孤兒進程。孤兒進程將被init進程(進程號為1)所收養,并由init進程對它們完成狀態收集工作。孤兒進程是沒有父進程的進程,孤兒進程這個重任就落到了init進程身上,init進程就好像是一個民政局,專門負責處理孤兒進程的善后工作。每當出現一個孤兒進程的時候,內核就把孤 兒進程的父進程設置為init,而init進程會循環地wait()它的已經退出的子進程。這樣,當一個孤兒進程凄涼地結束了其生命周期的時候,init進程就會代表黨和政府出面處理它的一切善后工作。因此孤兒進程并不會有什么危害。我們來測試一下(創建完子進程后,主進程所在的這個腳本就退出了,當父進程先于子進程結束時,子進程會被init收養,成為孤兒進程,而非僵尸進程),文件內容import os
import sys
import timepid = os.getpid()
ppid = os.getppid()
print 'im father', 'pid', pid, 'ppid', ppid
pid = os.fork()
#執行pid=os.fork()則會生成一個子進程
#返回值pid有兩種值:
# 如果返回的pid值為0,表示在子進程當中
# 如果返回的pid值>0,表示在父進程當中
if pid > 0:print 'father died..'sys.exit(0)# 保證主線程退出完畢
time.sleep(1)
print 'im child', os.getpid(), os.getppid()執行文件,輸出結果:
im father pid 32515 ppid 32015
father died..
im child 32516 1看,子進程已經被pid為1的init進程接收了,所以僵尸進程在這種情況下是不存在的,存在只有孤兒進程而已,孤兒進程聲明周期結束自然會被init來銷毀。三:僵尸進程危害場景:例如有個進程,它定期的產 生一個子進程,這個子進程需要做的事情很少,做完它該做的事情之后就退出了,因此這個子進程的生命周期很短,但是,父進程只管生成新的子進程,至于子進程 退出之后的事情,則一概不聞不問,這樣,系統運行上一段時間之后,系統中就會存在很多的僵死進程,倘若用ps命令查看的話,就會看到很多狀態為Z的進程。 嚴格地來說,僵死進程并不是問題的根源,罪魁禍首是產生出大量僵死進程的那個父進程。因此,當我們尋求如何消滅系統中大量的僵死進程時,答案就是把產生大 量僵死進程的那個元兇槍斃掉(也就是通過kill發送SIGTERM或者SIGKILL信號啦)。槍斃了元兇進程之后,它產生的僵死進程就變成了孤兒進 程,這些孤兒進程會被init進程接管,init進程會wait()這些孤兒進程,釋放它們占用的系統進程表中的資源,這樣,這些已經僵死的孤兒進程 就能瞑目而去了。四:測試
#1、產生僵尸進程的程序test.py內容如下#coding:utf-8
from multiprocessing import Process
import time,osdef run():print('子',os.getpid())if __name__ == '__main__':p=Process(target=run)p.start()print('主',os.getpid())time.sleep(1000)#2、在unix或linux系統上執行
[root@vm172-31-0-19 ~]# python3 test.py &
[1] 18652
[root@vm172-31-0-19 ~]# 主 18652
子 18653[root@vm172-31-0-19 ~]# ps aux |grep Z
USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
root 18653 0.0 0.0 0 0 pts/0 Z 20:02 0:00 [python3] <defunct> #出現僵尸進程
root 18656 0.0 0.0 112648 952 pts/0 S+ 20:02 0:00 grep --color=auto Z[root@vm172-31-0-19 ~]# top #執行top命令發現1zombie
top - 20:03:42 up 31 min, 3 users, load average: 0.01, 0.06, 0.12
Tasks: 93 total, 2 running, 90 sleeping, 0 stopped, 1 zombie
%Cpu(s): 0.0 us, 0.3 sy, 0.0 ni, 99.7 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
KiB Mem : 1016884 total, 97184 free, 70848 used, 848852 buff/cache
KiB Swap: 0 total, 0 free, 0 used. 782540 avail Mem PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
root 20 0 29788 1256 988 S 0.3 0.1 0:01.50 elfin #3、
等待父進程正常結束后會調用wait/waitpid去回收僵尸進程
但如果父進程是一個死循環,永遠不會結束,那么該僵尸進程就會一直存在,僵尸進程過多,就是有害的
解決方法一:殺死父進程
解決方法二:對開啟的子進程應該記得使用join,join會回收僵尸進程
參考python2源碼注釋
class Process(object):def join(self, timeout=None):'''Wait until child process terminates'''assert self._parent_pid == os.getpid(), 'can only join a child process'assert self._popen is not None, 'can only join a started process'res = self._popen.wait(timeout)if res is not None:_current_process._children.discard(self)join方法中調用了wait,告訴系統釋放僵尸進程。discard為從自己的children中剔除解決方法三:http://blog.csdn.net/u010571844/article/details/50419798
查看進程與pid
from multiprocessing import Process
import time
import random
class Piao(Process):def __init__(self,name):# self.name=name# super().__init__() #Process的__init__方法會執行self.name=Piao-1,# #所以加到這里,會覆蓋我們的self.name=name#為我們開啟的進程設置名字的做法super().__init__()self.name=namedef run(self):print('%s is piaoing' %self.name)time.sleep(random.randrange(1,3))print('%s is piao end' %self.name)p=Piao('egon')
p.start()
print('開始')
print(p.pid) #查看pidname與pid
四,Process對象的join方法
在主進程運行過程中如果想要并發的執行其他任務,我們可以開啟子進程,此時主進程的任務和子進程的任務分為兩種情況
一種情況是:在主進程的任務與子進程的任務彼此獨立的情況下,主進程的任務先執行完畢后,主進程還需要等待子進程執行完畢,然后統一回收資源
一種情況是:如果主進程的任務在執行到某一個階段時,需要等待子進程執行完畢后才能繼續執行,就需要一種機制能夠讓主進程監測子進程是否運行完畢,在子進程執行完畢后才繼續執行,否則一直在原地阻塞,這就是join方法的作用。
from multiprocessing import Process
import time
import osdef task(name):print("%s is running ,parent is %s"%(name,os.getppid()))time.sleep(1)if __name__ == '__main__':p1 = Process(target=task,args=('子進程1',))# print(p1.is_alive())p2 = Process(target=task, args=('子進程2',))p3 = Process(target=task, args=('子進程3',))# print(p1.is_alive())p1.start()print(p1.is_alive())p2.start()p3.start()p1.join()p2.join()p3.join()print(p1.is_alive())print("主進程 %s is running ,parent is %s" % (os.getpid(), os.getppid()))print(p1.name)
有人會有疑問,既然join是等待進程結束,那么我像下面join()下去,進程不就變成串行了的嗎?
當然不是了,必須明確join是讓誰等?
詳細解析如下:進程只要start就會在開始運行了,所以p1-p4.start()時,系統中已經有四個并發的進程了而我們p1.join()是在等p1結束,沒錯p1只要不結束主線程就會一直卡在原地,這也是問題的關鍵join是讓主線程等,而p1-p4仍然是并發執行的,p1.join的時候,其余p2,p3,p4仍然在運行,等#p1.join結束,可能p2,p3,p4早已經結束了,這樣p2.join,p3.join.p4.join直接通過檢測,無需等待所以4個join花費的總時間仍然是耗費時間最長的那個進程運行的時間
練習題:改寫下面程序,分別實現下述打印效果
from multiprocessing import Process
import time
import randomdef task(n):time.sleep(random.randint(1,3))print('-------->%s' %n)if __name__ == '__main__':p1=Process(target=task,args=(1,))p2=Process(target=task,args=(2,))p3=Process(target=task,args=(3,))p1.start()p2.start()p3.start()print('-------->4')
效果一:保證最先輸出-------->4
-------->4
-------->1
-------->3
-------->2
程序:不用修改
# _*_ coding: utf-8 _*_
from multiprocessing import Process
import time
import randomdef task(n):time.sleep(random.randint(1,2))print("---------->%s"%n)if __name__ =='__main__':p1 =Process(target=task,args=(1,))p2 = Process(target=task, args=(2,))p3 = Process(target=task, args=(3,))p1.start()p2.start()p3.start()print("---------->4")
效果二:保證最后輸出-------->4
-------->2
-------->3
-------->1
-------->4
加上join
# _*_ coding: utf-8 _*_
from multiprocessing import Process
import time
import randomdef task(n):time.sleep(random.randint(1,2))print("---------->%s"%n)if __name__ =='__main__':p1 =Process(target=task,args=(1,))p2 = Process(target=task, args=(2,))p3 = Process(target=task, args=(3,))p1.start()p2.start()p3.start()p1.join()p2.join()p3.join()print("---------->4")
效果三:保證按順序輸出
-------->1
-------->2
-------->3
-------->4
# _*_ coding: utf-8 _*_
from multiprocessing import Process
import time
import randomdef task(n):time.sleep(random.randint(1,2))print("---------->%s"%n)if __name__ =='__main__':p1 =Process(target=task,args=(1,))p2 = Process(target=task, args=(2,))p3 = Process(target=task, args=(3,))'''這樣寫 是沒有意義的,只是練習一下串行,這樣寫程序就成穿行的了因為多進程的目的就是為了實現并發的效果'''p1.start()p1.join()p2.start()p2.join()p3.start()p3.join()print("---------->4")
思考題:判斷上述三種效果,哪種屬于并發,那種屬于串行?
前兩種屬于并發,第三種屬于串行
五,Process對象的其他屬性或方法
進程對象的其他方法一:terminate與is_alive
?
from multiprocessing import Process
import time
import randomdef task(name):print('%s is piaoing' %name)time.sleep(random.randrange(1,5))print('%s is piao end' %name)if __name__ == '__main__':p1=Process(target=task,args=('egon',))p1.start()p1.terminate()#關閉進程,不會立即關閉,所以is_alive立刻查看的結果可能還是存活print(p1.is_alive()) #結果為Trueprint('主')print(p1.is_alive()) #結果為False
?
進程對象的其他屬性:name與pid
?
from multiprocessing import Process
import time
import randomdef task(name):print('%s is piaoing' %name)time.sleep(random.randrange(1,5))print('%s is piao end' %name)if __name__ == '__main__':p1=Process(target=task,args=('egon',),name='子進程1') #可以用關鍵參數來指定進程名p1.start()print(p1.name,p1.pid,)
?六,守護進程
主進程創建子進程,然后將該進程設置成守護自己的進程,守護進程就好比崇禎皇帝身邊的老太監,崇禎皇帝已死老太監就跟著殉葬了。
關于守護進程需要強調兩點:
其一:守護進程會在主進程代碼執行結束后就終止其二:守護進程內無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes
are not allowed to have children
如果我們有兩個任務需要并發執行,那么開一個主進程和一個子進程分別去執行就ok了,如果子進程的任務在主進程任務結束后就沒有存在的必要了,那么該子進程應該在開啟前就被設置成守護進程。主進程代碼運行結束,守護進程隨即終止
?
from multiprocessing import Process
import time
import randomdef task(name):print('%s is piaoing' %name)time.sleep(random.randrange(1,3))print('%s is piao end' %name)if __name__ == '__main__':p=Process(target=task,args=('egon',))p.daemon=True #一定要在p.start()前設置,設置p為守護進程,禁止p創建子進程,并且父進程代碼執行結束,p即終止運行p.start()print('主') #只要終端打印出這一行內容,那么守護進程p也就跟著結束掉了
思考下面代碼的執行結果可能有那些情況,為什么?
?
from multiprocessing import Processimport time
def foo():print(123)time.sleep(1)print("end123")def bar():print(456)time.sleep(3)print("end456")if __name__ == '__main__':p1=Process(target=foo)p2=Process(target=bar)p1.daemon=Truep1.start()p2.start()print("main-------")
七,練習題
1,思考開啟進程的方式一和方式二各開啟了幾個進程?
2,進程之間的內存空間是共享的還是隔離的?下述代碼的執行結果是什么?
from multiprocessing import Processn=100 #在windows系統中應該把全局變量定義在if __name__ == '__main__'之上就可以了def work():global nn=0print('子進程內: ',n)if __name__ == '__main__':p=Process(target=work)p.start()print('主進程內: ',n)
答:隔離的
子進程 內 0
主進程 內 100
3,基于多進程實現的并發套接字通信?
客戶端
# _*_ coding: utf-8 _*_
# _*_ coding: utf-8 _*_
import socketip_port = ('127.0.0.1', 9999)
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.connect(ip_port)while True:try:cmd = input("請輸入:").strip()data = server.send(cmd.encode('utf-8'))if not data:breakexcept ConnectionResetError:breakrecv_data = server.recv(1024)print(recv_data)
server.close()
服務端
# _*_ coding: utf-8 _*_
import socket
from multiprocessing import Processdef talk(conn):while True:try:data = conn.recv(1024)if not data :breakconn.send(data.upper())except ConnectionResetError:breakconn.close()def server(ip_port):server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)server.bind(ip_port)server.listen(5)while True:conn, addr = server.accept()p =Process(target=talk,args=(conn,))p.start()server.close()if __name__ == '__main__':ip_port = ('127.0.0.1', 9999)print('啟動')server(ip_port)
4,思考每來一個客戶端,服務端就開啟一個新的進程來服務它,這種實現方式有沒有問題?
? 這種方式是有問題的。
八,互斥鎖
進程之間數據不共享,但是共享同一套文件系統,所以訪問同一個文件,或者打印終端是沒有問題的,但是帶來的是競爭,競爭帶來的結果是錯亂,如下:
#并發運行,效率高,但競爭同一打印終端,帶來了打印錯亂
from multiprocessing import Process
import os,time
def work():print('%s is running' %os.getpid())time.sleep(2)print('%s is done' %os.getpid())if __name__ == '__main__':for i in range(3):p=Process(target=work)p.start()
如何控制,就是加鎖處理。而互斥鎖的意思就是互相排斥,如果把多個進程比喻為多個人,互斥鎖的工作原理就是多個人都要去爭搶同一個資源:衛生間,一個人搶到衛生間后上一把鎖,其他人都要等著,等到這個完成任務后釋放鎖,其他人才有可能有一個搶到......所以互斥鎖的原理,就是把并發改成穿行,降低了效率,但保證了數據安全不錯亂
#由并發變成了串行,犧牲了運行效率,但避免了競爭
from multiprocessing import Process,Lock
import os,time
def work(lock):lock.acquire() #加鎖print('%s is running' %os.getpid())time.sleep(2)print('%s is done' %os.getpid())lock.release() #釋放鎖
if __name__ == '__main__':lock=Lock()for i in range(3):p=Process(target=work,args=(lock,))p.start()
模擬搶票練習
多個進程共享同一文件,我們可以把文件當數據庫,用多個進程模擬多個人執行搶票任務
#文件db.txt的內容為:{"count":1}
#注意一定要用雙引號,不然json無法識別
from multiprocessing import Process
import time,jsondef search(name):dic=json.load(open('db.txt'))time.sleep(1)print('\033[43m%s 查到剩余票數%s\033[0m' %(name,dic['count']))def get(name):dic=json.load(open('db.txt'))time.sleep(1) #模擬讀數據的網絡延遲if dic['count'] >0:dic['count']-=1time.sleep(1) #模擬寫數據的網絡延遲json.dump(dic,open('db.txt','w'))print('\033[46m%s 購票成功\033[0m' %name)def task(name):search(name)get(name)if __name__ == '__main__':for i in range(10): #模擬并發10個客戶端搶票name='<路人%s>' %ip=Process(target=task,args=(name,))p.start()
并發運行,效率高,但競爭寫同一文件,數據寫入錯亂,只有一張票,賣成功給了10個人
<路人0> 查到剩余票數1
<路人1> 查到剩余票數1
<路人2> 查到剩余票數1
<路人3> 查到剩余票數1
<路人4> 查到剩余票數1
<路人5> 查到剩余票數1
<路人6> 查到剩余票數1
<路人7> 查到剩余票數1
<路人8> 查到剩余票數1
<路人9> 查到剩余票數1
<路人0> 購票成功
<路人4> 購票成功
<路人1> 購票成功
<路人5> 購票成功
<路人3> 購票成功
<路人7> 購票成功
<路人2> 購票成功
<路人6> 購票成功
<路人8> 購票成功
<路人9> 購票成功
加鎖處理:購票行為由并發變成了串行,犧牲了運行效率,但保證了數據安全
#把文件db.txt的內容重置為:{"count":1}
from multiprocessing import Process,Lock
import time,jsondef search(name):dic=json.load(open('db.txt'))time.sleep(1)print('\033[43m%s 查到剩余票數%s\033[0m' %(name,dic['count']))def get(name):dic=json.load(open('db.txt'))time.sleep(1) #模擬讀數據的網絡延遲if dic['count'] >0:dic['count']-=1time.sleep(1) #模擬寫數據的網絡延遲json.dump(dic,open('db.txt','w'))print('\033[46m%s 購票成功\033[0m' %name)def task(name,lock):search(name)with lock: #相當于lock.acquire(),執行完自代碼塊自動執行lock.release()get(name)if __name__ == '__main__':lock=Lock()for i in range(10): #模擬并發10個客戶端搶票name='<路人%s>' %ip=Process(target=task,args=(name,lock))p.start()
執行結果:
<路人0> 查到剩余票數1
<路人1> 查到剩余票數1
<路人2> 查到剩余票數1
<路人3> 查到剩余票數1
<路人4> 查到剩余票數1
<路人5> 查到剩余票數1
<路人6> 查到剩余票數1
<路人7> 查到剩余票數1
<路人8> 查到剩余票數1
<路人9> 查到剩余票數1
<路人0> 購票成功
互斥鎖與join
使用join可以將并發變成串行,互斥鎖的原理也是將并發變成穿行,那我們直接使用join就可以了啊,為何還要互斥鎖,說到這里我趕緊試了一下
#把文件db.txt的內容重置為:{"count":1}
from multiprocessing import Process,Lock
import time,jsondef search(name):dic=json.load(open('db.txt'))print('\033[43m%s 查到剩余票數%s\033[0m' %(name,dic['count']))def get(name):dic=json.load(open('db.txt'))time.sleep(1) #模擬讀數據的網絡延遲if dic['count'] >0:dic['count']-=1time.sleep(1) #模擬寫數據的網絡延遲json.dump(dic,open('db.txt','w'))print('\033[46m%s 購票成功\033[0m' %name)def task(name,):search(name)get(name)if __name__ == '__main__':for i in range(10):name='<路人%s>' %ip=Process(target=task,args=(name,))p.start()p.join()
執行結果
<路人0> 查到剩余票數1
<路人0> 購票成功
<路人1> 查到剩余票數0
<路人2> 查到剩余票數0
<路人3> 查到剩余票數0
<路人4> 查到剩余票數0
<路人5> 查到剩余票數0
<路人6> 查到剩余票數0
<路人7> 查到剩余票數0
<路人8> 查到剩余票數0
<路人9> 查到剩余票數0
發現使用join將并發改成穿行,確實能保證數據安全,但問題是連查票操作也變成只能一個一個人去查了,很明顯大家查票時應該是并發地去查詢而無需考慮數據準確與否,此時join與互斥鎖的區別就顯而易見了,join是將一個任務整體串行,而互斥鎖的好處則是可以將一個任務中的某一段代碼串行,比如只讓task函數中的get任務串行
def task(name,):search(name) # 并發執行lock.acquire()get(name) #串行執行lock.release()
總結
加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行地修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然可以用文件共享數據實現進程間通信,但問題是:
1、效率低(共享數據基于文件,而文件是硬盤上的數據)
2、需要自己加鎖處理
因此我們最好找尋一種解決方案能夠兼顧:
1、效率高(多個進程共享一塊內存的數據)
2、幫我們處理好鎖問題。
這就是mutiprocessing模塊為我們提供的基于消息的IPC通信機制:隊列和管道。
隊列和管道都是將數據存放于內存中,而隊列又是基于(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來,因而隊列才是進程間通信的最佳選擇。
我們應該盡量避免使用共享數據,盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可獲展性。
十,隊列
進程彼此之間互相隔離,要實現進程間通信(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的
創建隊列的類(底層就是以管道和鎖定的方式實現):
Queue([maxsize]):創建共享的進程隊列,Queue是多進程安全的隊列,
可以使用Queue實現多進程之間的數據傳遞。
參數介紹:
maxsize是隊列中允許最大項數,省略則無大小限制。
但需要明確:1、隊列內存放的是消息而非大數據2、隊列占用的是內存空間,因而maxsize即便是無大小限制也受限于內存大小
主要方法介紹:
?
q.put方法用以插入數據到隊列中。
q.get方法可以從隊列讀取并且刪除一個元素。
?
隊列的使用:
from multiprocessing import Process,Queueq=Queue(3)#put ,get ,put_nowait,get_nowait,full,empty
q.put(1)
q.put(2)
q.put(3)
print(q.full()) #滿了
# q.put(4) #再放就阻塞住了print(q.get())
print(q.get())
print(q.get())
print(q.empty()) #空了
# print(q.get()) #再取就阻塞住了
?
轉載于:https://www.cnblogs.com/wj-1314/p/9042010.html
總結
- 上一篇: 求一个鞠婧祎QQ网名!
- 下一篇: “九月西风兴”下一句是什么