网络编程中的锁与队列
進(jìn)程的其他方法:
1 import os,time 2 from multiprocessing import Process 3 def f1(): 4 print('子進(jìn)程的pid',os.getpid()) #查看當(dāng)前子進(jìn)程的id 5 print('子進(jìn)程的父進(jìn)程的pid',os.getppid()) 6 print(123) 7 8 def f2(): 9 print('子進(jìn)程的pid',) 10 print('父進(jìn)程的pid',) 11 print(123) 12 13 if __name__ == '__main__': 14 p1 = Process(target=f1,name='洪七') 15 p2 = Process(target=f2) 16 p1.start() 17 p2.start() 18 print(p1.name) #查看當(dāng)前子進(jìn)程的name,可以進(jìn)行賦值更改name 19 print('子進(jìn)程的pid',p1.pid) #查看當(dāng)前子進(jìn)程的id 20 print('父進(jìn)程的pid',os.getpid()) 21 def f3(): 22 time.sleep(5) 23 print('子進(jìn)程2') 24 if __name__ == '__main__': 25 p = Process(target=f3,) 26 p.start() 27 print(p.is_alive()) #判斷子進(jìn)程是否還活著,是否還在運(yùn)行 28 p.terminate() #給操作系統(tǒng)發(fā)送一個(gè)結(jié)束進(jìn)程的信號(hào) 29 print(p.is_alive())驗(yàn)證進(jìn)程之間是空間隔離的:
?
1 from multiprocessing import Process 2 3 num = 100 4 5 def f1(): 6 global num 7 num = 3 8 print('子進(jìn)程中的num',num) 9 10 print('>>>>>',num) 11 if __name__ == '__main__': 12 p = Process(target=f1,) 13 p.start() 14 p.join() 15 print('主進(jìn)程中的num',num)?
僵尸進(jìn)程與孤兒進(jìn)程(簡(jiǎn)單了解 一下)
1 一:僵尸進(jìn)程(有害) 2 僵尸進(jìn)程:一個(gè)進(jìn)程使用fork創(chuàng)建子進(jìn)程,如果子進(jìn)程退出,而父進(jìn)程并沒(méi)有調(diào)用wait或waitpid獲取子進(jìn)程的狀態(tài)信息,那么子進(jìn)程的進(jìn)程描述符仍然保存在系統(tǒng)中。這種進(jìn)程稱之為僵死進(jìn)程。詳解如下 3 4 我們知道在unix/linux中,正常情況下子進(jìn)程是通過(guò)父進(jìn)程創(chuàng)建的,子進(jìn)程在創(chuàng)建新的進(jìn)程。子進(jìn)程的結(jié)束和父進(jìn)程的運(yùn)行是一個(gè)異步過(guò)程,即父進(jìn)程永遠(yuǎn)無(wú)法預(yù)測(cè)子進(jìn)程到底什么時(shí)候結(jié)束,如果子進(jìn)程一結(jié)束就立刻回收其全部資源,那么在父進(jìn)程內(nèi)將無(wú)法獲取子進(jìn)程的狀態(tài)信息。 5 6 因此,UNⅨ提供了一種機(jī)制可以保證父進(jìn)程可以在任意時(shí)刻獲取子進(jìn)程結(jié)束時(shí)的狀態(tài)信息: 7 1、在每個(gè)進(jìn)程退出的時(shí)候,內(nèi)核釋放該進(jìn)程所有的資源,包括打開(kāi)的文件,占用的內(nèi)存等。但是仍然為其保留一定的信息(包括進(jìn)程號(hào)the process ID,退出狀態(tài)the termination status of the process,運(yùn)行時(shí)間the amount of CPU time taken by the process等) 8 2、直到父進(jìn)程通過(guò)wait / waitpid來(lái)取時(shí)才釋放. 但這樣就導(dǎo)致了問(wèn)題,如果進(jìn)程不調(diào)用wait / waitpid的話,那么保留的那段信息就不會(huì)釋放,其進(jìn)程號(hào)就會(huì)一直被占用,但是系統(tǒng)所能使用的進(jìn)程號(hào)是有限的,如果大量的產(chǎn)生僵死進(jìn)程,將因?yàn)闆](méi)有可用的進(jìn)程號(hào)而導(dǎo)致系統(tǒng)不能產(chǎn)生新的進(jìn)程. 此即為僵尸進(jìn)程的危害,應(yīng)當(dāng)避免。 9 10 任何一個(gè)子進(jìn)程(init除外)在exit()之后,并非馬上就消失掉,而是留下一個(gè)稱為僵尸進(jìn)程(Zombie)的數(shù)據(jù)結(jié)構(gòu),等待父進(jìn)程處理。這是每個(gè)子進(jìn)程在結(jié)束時(shí)都要經(jīng)過(guò)的階段。如果子進(jìn)程在exit()之后,父進(jìn)程沒(méi)有來(lái)得及處理,這時(shí)用ps命令就能看到子進(jìn)程的狀態(tài)是“Z”。如果父進(jìn)程能及時(shí) 處理,可能用ps命令就來(lái)不及看到子進(jìn)程的僵尸狀態(tài),但這并不等于子進(jìn)程不經(jīng)過(guò)僵尸狀態(tài)。 如果父進(jìn)程在子進(jìn)程結(jié)束之前退出,則子進(jìn)程將由init接管。init將會(huì)以父進(jìn)程的身份對(duì)僵尸狀態(tài)的子進(jìn)程進(jìn)行處理。 11 12 二:孤兒進(jìn)程(無(wú)害) 13 14 孤兒進(jìn)程:一個(gè)父進(jìn)程退出,而它的一個(gè)或多個(gè)子進(jìn)程還在運(yùn)行,那么那些子進(jìn)程將成為孤兒進(jìn)程。孤兒進(jìn)程將被init進(jìn)程(進(jìn)程號(hào)為1)所收養(yǎng),并由init進(jìn)程對(duì)它們完成狀態(tài)收集工作。 15 16 孤兒進(jìn)程是沒(méi)有父進(jìn)程的進(jìn)程,孤兒進(jìn)程這個(gè)重任就落到了init進(jìn)程身上,init進(jìn)程就好像是一個(gè)民政局,專門負(fù)責(zé)處理孤兒進(jìn)程的善后工作。每當(dāng)出現(xiàn)一個(gè)孤兒進(jìn)程的時(shí)候,內(nèi)核就把孤 兒進(jìn)程的父進(jìn)程設(shè)置為init,而init進(jìn)程會(huì)循環(huán)地wait()它的已經(jīng)退出的子進(jìn)程。這樣,當(dāng)一個(gè)孤兒進(jìn)程凄涼地結(jié)束了其生命周期的時(shí)候,init進(jìn)程就會(huì)代表黨和政府出面處理它的一切善后工作。因此孤兒進(jìn)程并不會(huì)有什么危害。 17 18 我們來(lái)測(cè)試一下(創(chuàng)建完子進(jìn)程后,主進(jìn)程所在的這個(gè)腳本就退出了,當(dāng)父進(jìn)程先于子進(jìn)程結(jié)束時(shí),子進(jìn)程會(huì)被init收養(yǎng),成為孤兒進(jìn)程,而非僵尸進(jìn)程),文件內(nèi)容 19 20 import os 21 import sys 22 import time 23 24 pid = os.getpid() 25 ppid = os.getppid() 26 print 'im father', 'pid', pid, 'ppid', ppid 27 pid = os.fork() 28 #執(zhí)行pid=os.fork()則會(huì)生成一個(gè)子進(jìn)程 29 #返回值pid有兩種值: 30 # 如果返回的pid值為0,表示在子進(jìn)程當(dāng)中 31 # 如果返回的pid值>0,表示在父進(jìn)程當(dāng)中 32 if pid > 0: 33 print 'father died..' 34 sys.exit(0) 35 36 # 保證主線程退出完畢 37 time.sleep(1) 38 print 'im child', os.getpid(), os.getppid() 39 40 執(zhí)行文件,輸出結(jié)果: 41 im father pid 32515 ppid 32015 42 father died.. 43 im child 32516 1 44 45 看,子進(jìn)程已經(jīng)被pid為1的init進(jìn)程接收了,所以僵尸進(jìn)程在這種情況下是不存在的,存在只有孤兒進(jìn)程而已,孤兒進(jìn)程聲明周期結(jié)束自然會(huì)被init來(lái)銷毀。 46 47 48 三:僵尸進(jìn)程危害場(chǎng)景: 49 50 例如有個(gè)進(jìn)程,它定期的產(chǎn) 生一個(gè)子進(jìn)程,這個(gè)子進(jìn)程需要做的事情很少,做完它該做的事情之后就退出了,因此這個(gè)子進(jìn)程的生命周期很短,但是,父進(jìn)程只管生成新的子進(jìn)程,至于子進(jìn)程 退出之后的事情,則一概不聞不問(wèn),這樣,系統(tǒng)運(yùn)行上一段時(shí)間之后,系統(tǒng)中就會(huì)存在很多的僵死進(jìn)程,倘若用ps命令查看的話,就會(huì)看到很多狀態(tài)為Z的進(jìn)程。 嚴(yán)格地來(lái)說(shuō),僵死進(jìn)程并不是問(wèn)題的根源,罪魁禍?zhǔn)资钱a(chǎn)生出大量僵死進(jìn)程的那個(gè)父進(jìn)程。因此,當(dāng)我們尋求如何消滅系統(tǒng)中大量的僵死進(jìn)程時(shí),答案就是把產(chǎn)生大 量僵死進(jìn)程的那個(gè)元兇槍斃掉(也就是通過(guò)kill發(fā)送SIGTERM或者SIGKILL信號(hào)啦)。槍斃了元兇進(jìn)程之后,它產(chǎn)生的僵死進(jìn)程就變成了孤兒進(jìn) 程,這些孤兒進(jìn)程會(huì)被init進(jìn)程接管,init進(jìn)程會(huì)wait()這些孤兒進(jìn)程,釋放它們占用的系統(tǒng)進(jìn)程表中的資源,這樣,這些已經(jīng)僵死的孤兒進(jìn)程 就能瞑目而去了。 51 52 四:測(cè)試 53 #1、產(chǎn)生僵尸進(jìn)程的程序test.py內(nèi)容如下 54 55 #coding:utf-8 56 from multiprocessing import Process 57 import time,os 58 59 def run(): 60 print('子',os.getpid()) 61 62 if __name__ == '__main__': 63 p=Process(target=run) 64 p.start() 65 66 print('主',os.getpid()) 67 time.sleep(1000) 68 69 70 #2、在unix或linux系統(tǒng)上執(zhí)行 71 [root@vm172-31-0-19 ~]# python3 test.py & 72 [1] 18652 73 [root@vm172-31-0-19 ~]# 主 18652 74 子 18653 75 76 [root@vm172-31-0-19 ~]# ps aux |grep Z 77 USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND 78 root 18653 0.0 0.0 0 0 pts/0 Z 20:02 0:00 [python3] <defunct> #出現(xiàn)僵尸進(jìn)程 79 root 18656 0.0 0.0 112648 952 pts/0 S+ 20:02 0:00 grep --color=auto Z 80 81 [root@vm172-31-0-19 ~]# top #執(zhí)行top命令發(fā)現(xiàn)1zombie 82 top - 20:03:42 up 31 min, 3 users, load average: 0.01, 0.06, 0.12 83 Tasks: 93 total, 2 running, 90 sleeping, 0 stopped, 1 zombie 84 %Cpu(s): 0.0 us, 0.3 sy, 0.0 ni, 99.7 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st 85 KiB Mem : 1016884 total, 97184 free, 70848 used, 848852 buff/cache 86 KiB Swap: 0 total, 0 free, 0 used. 782540 avail Mem 87 88 PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND 89 root 20 0 29788 1256 988 S 0.3 0.1 0:01.50 elfin 90 91 92 #3、 93 等待父進(jìn)程正常結(jié)束后會(huì)調(diào)用wait/waitpid去回收僵尸進(jìn)程 94 但如果父進(jìn)程是一個(gè)死循環(huán),永遠(yuǎn)不會(huì)結(jié)束,那么該僵尸進(jìn)程就會(huì)一直存在,僵尸進(jìn)程過(guò)多,就是有害的 95 解決方法一:殺死父進(jìn)程 96 解決方法二:對(duì)開(kāi)啟的子進(jìn)程應(yīng)該記得使用join,join會(huì)回收僵尸進(jìn)程 97 參考python2源碼注釋 98 class Process(object): 99 def join(self, timeout=None): 100 ''' 101 Wait until child process terminates 102 ''' 103 assert self._parent_pid == os.getpid(), 'can only join a child process' 104 assert self._popen is not None, 'can only join a started process' 105 res = self._popen.wait(timeout) 106 if res is not None: 107 _current_process._children.discard(self) 108 109 join方法中調(diào)用了wait,告訴系統(tǒng)釋放僵尸進(jìn)程。discard為從自己的children中剔除?
守護(hù)進(jìn)程:(**)
主進(jìn)程創(chuàng)建守護(hù)進(jìn)程
其一:守護(hù)進(jìn)程會(huì)在主進(jìn)程代碼執(zhí)行結(jié)束后就終止
其二:守護(hù)進(jìn)程內(nèi)無(wú)法再開(kāi)啟子進(jìn)程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children
注意:進(jìn)程之間是互相獨(dú)立的,主進(jìn)程代碼運(yùn)行結(jié)束,守護(hù)進(jìn)程隨即終止
守護(hù)進(jìn)程代碼展示:
1 import time 2 from multiprocessing import Process 3 4 def f1(): 5 time.sleep(3) 6 print('xxxx') 7 8 def f2(): 9 time.sleep(5) 10 print('普通子進(jìn)程的代碼') 11 if __name__ == '__main__': 12 p = Process(target=f1,) 13 p.daemon = True #將該進(jìn)程設(shè)置為守護(hù)進(jìn)程,必須寫(xiě)在start之前,意思如果我的主進(jìn)程代碼運(yùn)行結(jié)束了,你這個(gè)子進(jìn)程不管運(yùn)行到什么地方,都直接結(jié)束 14 p.start() 15 16 #開(kāi)啟一個(gè)普通的子進(jìn)程來(lái)驗(yàn)證一下守護(hù)進(jìn)程的結(jié)束只和主進(jìn)程的代碼運(yùn)行結(jié)束有關(guān)系,而整個(gè)程序的結(jié)束需要主進(jìn)程和普通的子進(jìn)程的代碼都運(yùn)行結(jié)束才結(jié)束 17 p2 = Process(target=f2,) 18 p2.start() 19 #等待2號(hào)普通進(jìn)程的結(jié)束,才繼續(xù)執(zhí)行下面主進(jìn)程中的代碼 20 # p2.join() 21 #守護(hù)進(jìn)程會(huì)跟跟著父進(jìn)程的代碼運(yùn)行結(jié)束,就結(jié)束 22 print('主進(jìn)程結(jié)束')進(jìn)程鎖(同步鎖/互斥鎖)? *****
進(jìn)程之間數(shù)據(jù)不共享,但是共享同一套文件系統(tǒng),所以訪問(wèn)同一個(gè)文件,或同一個(gè)打印終端,是沒(méi)有問(wèn)題的,而共享帶來(lái)的是競(jìng)爭(zhēng),競(jìng)爭(zhēng)帶來(lái)的結(jié)果就是錯(cuò)亂,如何控制,就是加鎖處理。
簡(jiǎn)單的進(jìn)程鎖代碼展示
1 # 互斥鎖/進(jìn)程鎖/同步鎖 2 # import json 3 import time 4 from multiprocessing import Process,Lock 5 6 def show_t(i): 7 8 with open('ticket','r',encoding='utf-8') as f: 9 ticket_data = f.read() 10 # print(ticket_data) 11 t_data = eval(ticket_data) 12 # print(t_data,type(t_data)) 13 print('%s查詢剩余票數(shù)為%s'%(i,t_data['count'])) 14 15 def get_t(i,l1): 16 l1.acquire() 17 with open('ticket', 'r', encoding='utf-8') as f: 18 ticket_data = f.read() 19 # print(ticket_data) 20 t_data = eval(ticket_data) 21 # print(t_data,type(t_data)) 22 # print('%s查詢剩余票數(shù)為%s' % (i, t_data['count'])) 23 if t_data['count'] > 0: 24 t_data['count'] -= 1 25 print('%s搶票成功'%i) 26 time.sleep(0.2) 27 with open('ticket', 'w') as f: 28 f.write(str(t_data)) 29 30 else: 31 print('沒(méi)票了!!!') 32 l1.release() 33 34 if __name__ == '__main__': 35 l1 = Lock() 36 for i in range(10): 37 p1 = Process(target=show_t,args=(i,)) 38 p1.start() 39 for i in range(10): 40 p2 = Process(target=get_t,args=(i,l1) ) 41 p2.start()數(shù)據(jù)共享:
1 import time 2 from multiprocessing import Process,Manager,Lock 3 4 def f1(m_d,l2): 5 # m_d['num'] -= 1 # 6 with l2: 7 # l2.acquire() 8 tmp = m_d['num'] 9 tmp -= 1 10 time.sleep(0.1) 11 m_d['num'] = tmp 12 # l2.release() 13 14 if __name__ == '__main__': 15 m = Manager() 16 l2 = Lock() 17 m_d = m.dict({'num':100}) 18 p_list = [] 19 for i in range(10): 20 p = Process(target=f1,args=(m_d,l2)) 21 p.start() 22 p_list.append(p) 23 24 [pp.join() for pp in p_list] 25 26 print(m_d['num'])for?循環(huán)創(chuàng)建多進(jìn)程:
1 import time 2 from multiprocessing import Process 3 4 5 def f1(): 6 time.sleep(0.5) 7 print('xxx') 8 9 if __name__ == '__main__': 10 p_list = [] 11 #for循環(huán)創(chuàng)建子進(jìn)程,并且完成主進(jìn)程等待所有子進(jìn)程執(zhí)行結(jié)束,才繼續(xù)執(zhí)行 12 for i in range(10): 13 p = Process(target=f1,) 14 p.start() 15 p_list.append(p) 16 p.join() 17 # for pp in p_list: 18 # pp.join() 19 20 print('主進(jìn)程結(jié)束')隊(duì)列(Queue):
進(jìn)程彼此之間互相隔離,要實(shí)現(xiàn)進(jìn)程間通信(IPC),multiprocessing模塊支持兩種形式:隊(duì)列和管道,這兩種方式都是使用消息傳遞的。隊(duì)列就像一個(gè)特殊的列表,但是可以設(shè)置固定長(zhǎng)度,并且從前面插入數(shù)據(jù),從后面取出數(shù)據(jù),先進(jìn)先出。
Queue([maxsize]) 創(chuàng)建共享的進(jìn)程隊(duì)列。參數(shù) :maxsize是隊(duì)列中允許的最大項(xiàng)數(shù)。如果省略此參數(shù),則無(wú)大小限制。
底層隊(duì)列使用管道和鎖實(shí)現(xiàn)。
基于隊(duì)列的進(jìn)程通信: 1 from multiprocessing import Process,Queue 2 3 def f1(q): 4 q.put('約嗎?') 5 6 if __name__ == '__main__': 7 q = Queue(3) 8 9 p = Process(target=f1,args=(q,)) 10 p.start() 11 12 son_p_msg = q.get() 13 14 print('來(lái)自子進(jìn)程的消息:',son_p_msg)
利用隊(duì)列實(shí)現(xiàn)一個(gè)生產(chǎn)消費(fèi)模型:
1 import time 2 from multiprocessing import Process,JoinableQueue 3 #生產(chǎn)者 4 def producer(q): 5 for i in range(10): 6 time.sleep(0.2) 7 s = '大包子%s號(hào)'%i 8 print(s+'新鮮出爐,拿去用') 9 q.put(s) 10 q.join() #就等著task_done()信號(hào)的數(shù)量,和我put進(jìn)去的數(shù)量相同時(shí),才繼續(xù)執(zhí)行 11 print('所有的任務(wù)都被處理了,繼續(xù)潛行吧騷年們') 12 13 def consumer(q): 14 while 1: 15 time.sleep(0.5) 16 baozi = q.get() 17 18 print(baozi+'被吃了') 19 q.task_done() #給隊(duì)列發(fā)送一個(gè)取出的這個(gè)任務(wù)已經(jīng)處理完畢的信號(hào) 20 21 if __name__ == '__main__': 22 # q = Queue(30) 23 q = JoinableQueue(30) #同樣是一個(gè)長(zhǎng)度為30的隊(duì)列 24 25 pro_p = Process(target=producer,args=(q,)) 26 con_p = Process(target=consumer,args=(q,)) 27 pro_p.start() 28 con_p.daemon = True 29 con_p.start() 30 31 32 pro_p.join() 33 print('主進(jìn)程結(jié)束')?
進(jìn)程隊(duì)列? ***** Queue()
Q = Queue(5)
Q.put()? #滿了會(huì)等待
Q.get()? #沒(méi)有數(shù)據(jù)了會(huì)等待
Q.qsize()
Q.empty()? 不可靠
Q.full()不可靠?
Q.get_nowait()? #不等待,但是報(bào)錯(cuò)
Q.put_nowait()?? #不等待,但是報(bào)錯(cuò)
轉(zhuǎn)載于:https://www.cnblogs.com/Godisgirl/p/10246522.html
《新程序員》:云原生和全面數(shù)字化實(shí)踐50位技術(shù)專家共同創(chuàng)作,文字、視頻、音頻交互閱讀總結(jié)
以上是生活随笔為你收集整理的网络编程中的锁与队列的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 【J2EE】第四章 SpringMVC
- 下一篇: Docker for Linux 安装