进程与multiprocessing模块
一 進程?
進程(Process)是計算機中的程序關于某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操作系統結構的基礎。在早期面向進程設計的計算機結構中,進程是程序的基本執行實體;在當代面向線程設計的計算機結構中,進程是線程的容器。程序是指令、數據及其組織形式的描述,進程是程序的實體。它是操作系統動態執行的基本單元,在傳統的操作系統中,進程既是基本的分配單元,也是基本的執行單元。 ?————百度百科
PS ? os模塊的getpid方法就是獲取當前進程的進程號(id)。
多道技術產生的背景:針對單核,實現并發。
多路復用分為時間上的復用和空間上的復用。
空間上的復用:將內存分為幾個部分,互不干擾。
時間上的復用:
1 遇到I/O阻塞時切換任務。
2 任務執行固定時間后主動切換。
?
壹:Process類,創建子進程
一 創建子進程
創建子進程的方法一
import multiprocessing import time import os def foo():time.sleep(1)print('子進程 %s 父進程 %s' %(os.getpid(),os.getppid())) if __name__ == '__main__': #在windows下必須加上這一句代碼 p1=multiprocessing.Process(target=foo) p2=multiprocessing.Process(target=foo) p1.start() p2.start() p1.join() #主進程等待子進程完成,在執行主進程 p2.join() print('主進程 %s 主進程的父進程是 %s,這是pycharm的進程'%(os.getpid(),os.getppid()))輸出:
子進程 1260 父進程 12808 子進程 2256 父進程 12808 主進程 12808 主進程的父進程是 8804,這是pycharm的進程創建子進程的方法二
import multiprocessing import os class Pro(multiprocessing.Process):def __init__(self,name):super().__init__()self.name=name def run(self): print('進行姓名',self.name) print('子進程 %s 父進程 %s'%(os.getpid(),os.getppid())) if __name__ == '__main__': p=Pro('egon') p.start() print('主進程 %s' %os.getpid())輸出:
主進程 12632 進行姓名 egon 子進程 10300 父進程 12632
創建子進程方法二的應用
import multiprocessing import socket server = socket.socket() server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) server.bind(('127.0.0.1', 8080)) server.listen(5) class Myprocess(multiprocessing.Process):def __init__(self,conn):super().__init__()self.conn = conndef run(self):while True:try:while True:date=self.conn.recv(1024)if date==b'q':breakself.conn.send(date.upper())except Exception:break if __name__ == '__main__':while True:conn,addr=server.accept()print('conn',conn,'addr',addr)s=Myprocess(conn)s.start() server.close()在服務端,用multiprocessing模塊開啟多個子進程時,格式是這樣的:
重要!
import multiprocessing import socket class Myprocess(multiprocessing.Process):def __init__(self,conn):self.conn=conndef run(self):pass #核心代碼 if __name__ == '__main__':server=socket.socket() # server.bind(('127.0.0.1',8080)) # server.listen() #這三行代碼固定不動while True: conn,addr=server.accept() #其實一個子進程就是一個connp=Myprocess(conn) #服務端創建多個子進程本應該就是把conn當做參數傳給Myprocessp.start() #生成p對象,p.start()子進程開啟,conn有了一個屬于自己的子進程。?
?
?
二 Process類的其他方法
1 ?join方法
官方文檔的意思是:阻塞當前進程,直到調用join方法的那個進程執行完,再繼續執行當前進程。
import multiprocessing class Myprocess(multiprocessing.Process):def __init__(self,x):super().__init__()self.x=xdef run(self):print('子進程','----') if __name__ == '__main__':p1=Myprocess(1)p1.start()print('主進程','====')輸出:
主進程 ==== 子進程 ----在加入p1.join()代碼之后,p1子進程會先執行完,在執行主進程。
import multiprocessing class Myprocess(multiprocessing.Process):def __init__(self,x):super().__init__()self.x=xdef run(self):print('子進程','----') if __name__ == '__main__':p1=Myprocess(1)p1.start()p1.join()print('主進程','====')輸出:
子進程 ---- 主進程 ====2 daemon() 守護進程
守護進程(daemon)是一類在后臺運行的特殊進程,用于執行特定的系統任務。很多守護進程在系統引導的時候啟動,并且一直運行直到系統關閉。
守護進程會在主進程代碼執行完畢后終止。
守護進程內無法再開啟子進程,如果這樣做,會報錯。
使用方法
p1.daemon=True,放在start()方法之前。
import multiprocessing import time class Myprocess(multiprocessing.Process):def __init__(self,x):super().__init__()self.x=xdef run(self):print('子進程{}'.format(self.x),'----')time.sleep(2)print('子進程{}'.format(self.x),'=====') if __name__ == '__main__':p1=Myprocess(1)p2=Myprocess(2)p1.daemon=True #p1是守護進程,主進程代碼執行完畢后,立馬結束。 p1.start()p2.start()time.sleep(1) #一秒鐘,足骨歐p1,p2開啟子進程print('主進程','====')輸出:
子進程1 ---- 子進程2 ---- 主進程 ==== 子進程2 ===== #因為p1是守護進程,主進程代碼執行完畢后,就立馬結束了。所以沒有打印‘子進程1 ===’?
貳: ?Lock類,創建互斥鎖。只能一次acquire,然后release才能使用。
Rlock類,創建遞歸所。解決死鎖問題。遞歸所有個引用計數,可以多次acquire,release。
同步能夠保證多個線程安全訪問競爭資源,最簡單的同步機制是引入互斥鎖。互斥鎖為資源引入一個狀態:鎖定/非鎖定。某個線程要更改共享數據時,先將其鎖定,此時資源的狀態為“鎖定”,其他線程不能更改;直到該釋放資源,將資源的狀態變成“非鎖定”,其他的線程才能再次鎖定該資源。互斥鎖保證了每次只有一個進行寫入操作,從而保證了多情況下數據的正確性。
與join方法類似,作用是并發變為串行。
應用:
搶票的過程,分為查票的余量和買票。查票的余量應該是并發,買票應該是串行。
import json import time import random import multiprocessing def search():date=json.load(open('db.txt','r'))print('票數:{}'.format(date['count'])) def get(i):date = json.load(open('db.txt', 'r'))if date['count']>0:date['count']-=1time.sleep(random.randint(1,3)) #模擬網絡延遲json.dump(date,open('db.txt','w'))print('{} 搶票成功!'.format(i))def rob_ticket(i,lock): #其實可以沒有get,search函數。完全可以合并在一起。但是,分為兩個小函數,邏輯非常清晰。加鎖也變得更加容易,不出錯。search()lock.acquire() #加鎖 #with lock: 和文件操作一樣,也可以簡化。 get(i) # get(i)lock.release() #釋放鎖,解鎖if __name__ == '__main__':lock = multiprocessing.Lock()print('lock',lock) #主進程創建了一個互斥鎖,作為參數傳給子進程。for i in range(1,21): #創建20個子進程p=multiprocessing.Process(target=rob_ticket,args=(i,lock))p.start()輸出:
lock <Lock(owner=None)> 票數:2 票數:2 票數:2 票數:2 票數:2 票數:2 票數:2 票數:2 票數:2 票數:2 票數:2 票數:2 票數:2 票數:2 票數:2 票數:2 票數:2 票數:2 票數:2 票數:2 1 搶票成功! 3 搶票成功!總結:互斥鎖限定了在同一時刻只有一個進程能夠對共享資源進行修改。弊端是涉及到文件的修改,文件在硬盤上,效率不可避免的會很低。
? 而且還需要自己進行加鎖解鎖處理。所以,如果可以,盡量尋求更好的方法。IPC機制便是答案。
?
?
叁:進程間通信(IPC,Inter-Process Communication)指至少兩個進程或線程間傳送數據或信號的一些技術或方法。
python中提供了隊列(Queue)和管道(Pipe)兩種方法。
1 隊列和管道都是將數據存放在內存中,比起硬盤,速度會快很多。
2 隊列是基于 管道+鎖 實現的,可以幫我們從加鎖的繁瑣代碼中解脫出來。推薦使用
?
肆 Queue類。隊列。
Queue([maxsize]),?創建共享的進程隊列。maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。
put方法 ?put_mowait()方法
get方法 ?get_nowait()方法
import multiprocessingq=multiprocessing.Queue(3) print(q,) q.put('1') q.put('2') q.put('3') print(q.get()) print(q.get()) print(q.get())輸出:
<multiprocessing.queues.Queue object at 0x000002132885A048> 1 2 3?
伍 ?JoinableQueue類
q=JoinableQueue()
提供了Queue類兩個沒有的方法。
join():阻塞,直到隊列q中沒有item。
task_done():必須跟在get()方法后面。
from multiprocessing import JoinableQueueq = JoinableQueue()q.task_done() # Signal task completionq.join() # Wait for completion。
JoinableQueue, a Queue subclass, is a queue which additionally has task_done() and join() methods.
task_done()
Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises a ValueError if called more times than there were items placed in the queue.
join()
Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks
from multiprocessing import Process,JoinableQueue import time,random def consumer(name,q):while True:time.sleep(random.randint(1,2))res=q.get()if res is None:breakprint('%s 吃了 %s'%(name,res))q.task_done() #一個get()跟著一個task_done() q.task_done()是放在消費者模型這邊的。 def produce(name,q):for i in range(10):time.sleep(random.randint(1,2))res='包子%s'%iq.put(res)print('%s 生產了 %s '%(name,res))q.join() #如果注釋掉,最后顯示的時間是15秒左右,因為p1,p2代碼執行完后,不管隊列q中 if __name__ == '__main__': #有沒有item,p1,p2的完成代表著主進程的完成,c又是守護進程。c盡管沒有消費完所有數據,也會終結。 q.join()是放在生產者模型這邊的。start_time=time.time() #加上join,便是阻塞狀態,知道q隊列中的item被c進程全部完,這樣主進程代碼執行完畢。c作為守護進程,也會隨之終結。用時大約30秒q=JoinableQueue()p1=Process(target=produce,args=('egon',q))p2=Process(target=produce,args=('wupeoqi',q))c=Process(target=consumer,args=('alex',q))c.daemon=Truec.start()p1.start()p2.start()p1.join()p2.join()print(time.time()-start_time)?
陸 Manager()
Python中進程間共享數據,處理基本的queue,pipe和value+array外,還提供了更高層次的封裝。使用multiprocessing.Manager可以簡單地使用這些高級接口。?
Manager()返回的manager對象控制了一個server進程,此進程包含的python對象可以被其他的進程通過proxies來訪問。從而達到多進程間數據通信且安全。Manager支持的類型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。?
Manager()也是創建的內存中的空間。
import time def foo(d,lock):with lock:temp=d['x']time.sleep(0.001)d['x']=temp-1from multiprocessing import Manager,Process,Lock if __name__ == '__main__':m=Manager()lock=Lock()d=m.dict({'x':10})l=[]for i in range(10):p=Process(target=foo,args=(d,lock))l.append(p)p.start()for p in l:p.join()print(d)?
柒 Pool() ? ? ? ? !!!!!http://www.cnblogs.com/Tour/p/4564710.html ?!!!很好的博客地址
? 在使用Python進行系統管理時,特別是同時操作多個文件目錄或者遠程控制多臺主機,并行操作可以節約大量的時間。如果操作的對象數目不大時,還可以直接使用Process類動態的生成多個進程,十幾個還好,但是如果上百個甚至更多,那手動去限制進程數量就顯得特別的繁瑣,此時進程池就派上用場了。?
Pool類可以提供指定數量的進程供用戶調用,當有新的請求提交到Pool中時,如果池還沒有滿,就會創建一個新的進程來執行請求。如果池滿,請求就會告知先等待,直到池中有進程結束,才會創建新的進程來執行這些請求。?
下面介紹一下multiprocessing 模塊下的Pool類下的幾個方法。
apply():
該函數用于傳遞不定參數,主進程會被阻塞直到函數執行結束,不建議使用。同步調用
apply_async():
apply_async(func[, args=()[, kwds={}[, callback=None]]])
非阻塞且支持結果返回進行回調
首先來看apply_async方法,源碼如下:
def apply_async(self, func, args=(), kwds={}, callback=None):assert self._state == RUNresult = ApplyResult(self._cache, callback)self._taskqueue.put(([(result._job, None, func, args, kwds)], None))return resultfunc表示執行此任務的方法
args、kwds分別表func的位置參數和關鍵字參數
callback表示一個單參數的方法,當有結果返回時,callback方法會被調用,參數即為任務執行后的結果
每調用一次apply_result方法,實際上就向_taskqueue中添加了一條任務,注意這里采用了非阻塞(異步)的調用方式,即apply_async方法中新建的任務只是被添加到任務隊列中,還并未執行,不需要等待,直接返回創建的ApplyResult對象,注意在創建ApplyResult對象時,將它放入進程池的緩存_cache中。
任務隊列中有了新創建的任務,那么根據上節分析的處理流程,進程池的_task_handler線程,將任務從taskqueue中獲取出來,放入_inqueue中,觸發worker進程根據args和kwds調用func,運行結束后,將結果放入_outqueue,再由進程池中的_handle_results線程,將運行結果從_outqueue中取出,并找到_cache緩存中的ApplyResult對象,_set其運行結果,等待調用端獲取。
close():
關閉進程池,使其不再接收新的任務。
join():
主進程阻塞等待子進程的退出,join方法必須用在close()方法之后,兩者搭配使用。
PS?回調函數:
回調函數就是一個通過函數指針調用的函數。如果你把函數的指針(地址)作為參數傳遞給另一個函數,當這個指針被用來調用其所指向的函數時,我們就說這是回調函數。回調函數不是由該函數的實現方直接調用,而是在特定的事件或條件發生時由另外的一方調用的,用于對該事件或條件進行響應。 ? ? ? ? ? ? ? ?——百度百科
你到一個商店買東西,剛好你要的東西沒有貨,于是你在店員那里留下了你的電話,過了幾天店里有貨了,店員就打了你的電話,然后你接到電話后就到店里去取了貨。在這個例子里,你的電話號碼就叫回調函數,你把電話留給店員就叫登記回調函數,店里后來有貨了叫做觸發了回調關聯的事件,店員給你打電話叫做調用回調函數,你到店里去取貨叫做響應回調事件。 ?——知乎回答callback函數是一個以參數形式傳遞給另一個函數的函數,并且該函數(指callback函數)必須等另一個函數執行完才會被調用!(當被調用時,另一個函數就是callback函數的父函數)。
理解起來可能有點繞,通俗點的例子:
函數a有一個參數,這個參數是個函數b,當函數a執行完以后執行函數b。那么這個過程就叫回調。
這里必須強調的一點:函數b是你以參數形式傳給函數a的,那么函數b被調用時就叫回調函數。
PS 同步與異步
同步和異步關注的是消息通信機制 (synchronous communication/ asynchronous communication)
所謂同步,就是在發出一個*調用*時,在沒有得到結果之前,該*調用*就不返回。但是一旦調用返回,就得到返回值了。
換句話說,就是由*調用者*主動等待這個*調用*的結果。
而異步則是相反,*調用*在發出之后,這個調用就直接返回了,所以沒有返回結果。換句話說,當一個異步過程調用發出后,調用者不會立刻得到結果。而是在*調用*發出后,*被調用者*通過狀態、通知來通知調用者,或通過回調函數處理這個調用。
同步I/O操作:導致請求進程阻塞,直到I/O操作完成;
異步I/O操作:不導致請求進程阻塞。
from multiprocessing import Process,Pool import time,os def foo(n):print(n)time.sleep(5)print('%s is working '%os.getpid())return n**2if __name__ == '__main__':p=Pool(5)objs=[]for i in range(10):obj=p.apply_async(foo,args=(i,))objs.append(obj)p.close()p.join()print(objs)for obj in objs:print(obj.get())輸出:?
0 1 2 3 4 14344 is working 5 15624 is working 6 5312 is working 7 16000 is working 8 11868 is working 9 14344 is working 15624 is working 5312 is working 16000 is working 11868 is working [<multiprocessing.pool.ApplyResult object at 0x000001B9953AF860>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AF908>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AF9B0>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AFA90>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AFB70>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AFC50>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AFD30>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AFE10>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AFEF0>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AFFD0>] 0 1 4 9 16 25 36 49 64 81注意:看obj是什么,用get()方法取其值。
?
Pool類的異步以及回調函數。回調函數可以用在爬蟲上。
import requests,os from multiprocessing import Pool,Process def get(url):r=requests.get(url)print('進程%s get %s'%(os.getpid(),url))return {'url':url,'text':len(r.text)} def search(dic):with open('db.txt','a')as f: # a 模式 也可以創建不存在的文件名date='url: %s lenth: %s\n'%(dic['url'],dic['text'])f.write(date) if __name__ == '__main__':p=Pool(3)l=[]url_l=['http://cn.bing.com/','http://www.cnblogs.com/wupeiqi/','http://www.cnblogs.com/654321cc/','https://www.cnblogs.com/','http://society.people.com.cn/n1/2017/1012/c1008-29581930.html','http://www.xilu.com/news/shaonianxinzangyou5gedong.html',]for url in url_l:obj=p.apply_async(get,(url,),callback=search) #在這里,apply_async,創建了進程。search是回調函數,有且唯一參數是get函數的返回值,l.append(obj) #obj一直是ApplyResult object p.close()p.join()print(l)for obj in l:print(obj.get()) #obj.get()一直是get()函數的返回值,不管有沒有回調函數。輸出:
進程14044 get http://www.cnblogs.com/wupeiqi/ 進程13000 get http://www.cnblogs.com/654321cc/ 進程15244 get http://cn.bing.com/ 進程15244 get http://www.xilu.com/news/shaonianxinzangyou5gedong.html 進程14044 get https://www.cnblogs.com/ 進程13000 get http://society.people.com.cn/n1/2017/1012/c1008-29581930.html [<multiprocessing.pool.ApplyResult object at 0x0000027D4C893BE0>, <multiprocessing.pool.ApplyResult object at 0x0000027D4C893C88>, <multiprocessing.pool.ApplyResult object at 0x0000027D4C893D30>, <multiprocessing.pool.ApplyResult object at 0x0000027D4C893DD8>, <multiprocessing.pool.ApplyResult object at 0x0000027D4C893E80>, <multiprocessing.pool.ApplyResult object at 0x0000027D4C893F60>] {'url': 'http://cn.bing.com/', 'text': 127210} {'url': 'http://www.cnblogs.com/wupeiqi/', 'text': 21292} {'url': 'http://www.cnblogs.com/654321cc/', 'text': 13268} {'url': 'https://www.cnblogs.com/', 'text': 40331} {'url': 'http://society.people.com.cn/n1/2017/1012/c1008-29581930.html', 'text': 23641} {'url': 'http://www.xilu.com/news/shaonianxinzangyou5gedong.html', 'text': 51247}?
轉載于:https://www.cnblogs.com/654321cc/p/7650190.html
總結
以上是生活随笔為你收集整理的进程与multiprocessing模块的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: WPF ComboBox下拉绑定Tree
- 下一篇: js中push(),pop(),unsh