Python之进程 3 - 进程池和multiprocess.Poll
一、為什么要有進程池?
在程序實際處理問題過程中,忙時會有成千上萬的任務需要被執行,閑時可能只有零星任務。那么在成千上萬個任務需要被執行的時候,我們就需要去創建成千上萬個進程么?首先,創建進程需要消耗時間,銷毀進程(空間,變量,文件信息等等的內容)也需要消耗時間。第二,即便開啟了成千上萬的進程,操作系統也不能讓他們同時執行,維護一個很大的進程列表的同時,調度的時候,還需要進行切換并且記錄每個進程的執行節點,也就是記錄上下文(各種變量等等亂七八糟的東西,雖然你看不到,但是操作系統都要做),這樣反而會影響程序的效率。因此我們不能無限制的根據任務開啟或者結束進程。就看我們上面的一些代碼例子,你會發現有些程序是不是執行的時候比較慢才出結果,就是這個原因,那么我們要怎么做呢?
在這里,要給大家介紹一個進程池的概念,定義一個池子,在里面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等到處理完畢,進程并不關閉,而是將進程再放回進程池中繼續等待任務。如果有很多任務需要執行,池中的進程數量不夠,任務就要等待之前的進程執行任務完畢歸來,拿到空閑進程才能繼續執行。也就是說,池中進程的數量是固定的,那么同一時間最多有固定數量的進程在運行。這樣不會增加操作系統的調度難度,還節省了開閉進程的時間,也一定程度上能夠實現并發效果
二、multiprocess.Pool模塊
2.1 模塊使用
Pool([numprocess [,initializer [, initargs]]]): # 創建進程池創建進程池的類:如果指定numprocess為3,則進程池會從無到有創建三個進程,然后自始至終使用這三個進程去執行所有任務(高級一些的進程池可以根據你的并發量,搞成動態增加或減少進程池中的進程數量的操作),不會開啟其他進程,提高操作系統效率,減少空間的占用等。
參數介紹:
numprocess: 要創建的進程數,如果省略,將默認使用cpu_count()的值 initializer:是每個工作進程啟動時要執行的可調用對象,默認為None initargs:是要傳給initializer的參數組主要方法:
p.apply(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然后返回結果。 '''需要強調的是:此操作并不會在所有池工作進程中并執行func函數。如果要通過不同參數并發地執行func函數,必須從不同線程調用p.apply()函數或者使用p.apply_async()'''p.apply_async(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然后返回結果。 '''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他異步操作中的結果。'''p.close():關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成P.jion():等待所有工作進程退出。此方法只能在close()或teminate()之后調用其他方法(了解):
方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具有以下方法:
obj.get():返回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間內還沒有到達,將引發一場。如果遠程操作中引發了異常,它將在調用此方法時再次被引發。obj.ready():如果調用完成,返回Trueobj.successful():如果調用完成且沒有引發異常,返回True,如果在結果就緒之前調用此方法,引發異常obj.wait([timeout]):等待結果變為可用。obj.terminate():立即終止所有工作進程,同時不執行任何清理或結束任何掛起工作。如果p被垃圾回收,將自動調用此函數2.2 進程池的簡單應用
進程池與多進程的效率對比:
import time from multiprocessing import Pool, Process# 針對range(100)這種參數的 def func(n):for i in range(3):print(n + 1)def func(n):print(n)# 結果:# (1, 2)# alexdef func2(n):for i in range(3):print(n - 1)if __name__ == '__main__':# 1.進程池的模式s1 = time.time() # 我們計算一下開多進程和進程池的執行效率poll = Pool(5) # 創建含有5個進程的進程池# poll.map(func,range(100)) #異步調用進程,開啟100個任務,map自帶join的功能poll.map(func, [(1, 2), 'alex']) # 異步調用進程,開啟100個任務,map自帶join的功能poll.map(func2, range(100)) # 如果想讓進程池完成不同的任務,可以直接這樣搞# map只限于接收一個可迭代的數據類型參數,列表啊,元祖啊等等,如果想做其他的參數之類的操作,需要用后面我們要學的方法。t1 = time.time() - s1# 2.多進程的模式s2 = time.time()p_list = []for i in range(100):p = Process(target=func, args=(i,))p_list.append(p)p.start()[pp.join() for pp in p_list]t2 = time.time() - s2print('t1>>', t1) # 結果:0.034856319427490234 進程池的效率高print('t2>>', t2) # 結果:0.38016200065612793有一點,map是異步執行的,并且自帶close和join。
一般約定俗成的是進程池中的進程數量為CPU的數量,工作中要看具體情況來考量。
2.3 同步與異步
進程池的同步調用:
import os, time from multiprocessing import Pooldef work(n):print('%s run' % os.getpid())time.sleep(1)return n ** 2if __name__ == '__main__':p = Pool(3) # 進程池中從無到有創建三個進程,以后一直是這三個進程在執行任務res_l = []for i in range(10):res = p.apply(work, args=(i,)) # 同步調用,直到本次任務執行完畢拿到res,等待任務work執行的過程中可能有阻塞也可能沒有阻塞# 但不管該任務是否存在阻塞,同步調用都會在原地等著res_l.append(res)print(res_l)進程池的異步調用:
import os import time import random from multiprocessing import Pooldef work(n):print('%s run' %os.getpid())time.sleep(random.random())return n**2if __name__ == '__main__':p=Pool(3) #進程池中從無到有創建三個進程,以后一直是這三個進程在執行任務res_l=[]for i in range(10):res=p.apply_async(work,args=(i,)) # 異步運行,根據進程池中有的進程數,每次最多3個子進程在異步執行,并且可以執行不同的任務,傳送任意的參數了。# 返回結果之后,將結果放入列表,歸還進程,之后再執行新的任務# 需要注意的是,進程池中的三個進程不會同時開啟或者同時結束# 而是執行完一個就釋放一個進程,這個進程就去接收新的任務。 res_l.append(res)# 異步apply_async用法:如果使用異步提交的任務,主進程需要使用join,等待進程池內任務都處理完,然后可以用get收集結果# 否則,主進程結束,進程池可能還沒來得及執行,也就跟著一起結束了p.close() #不是關閉進程池,而是結束進程池接收任務,確保沒有新任務再提交過來。p.join() #感知進程池中的任務已經執行結束,只有當沒有新的任務添加進來的時候,才能感知到任務結束了,所以在join之前必須加上close方法for res in res_l:print(res.get()) #使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get詳解:apply_async和apply:
一:使用進程池(異步調用apply_async)
from multiprocessing import Process, Pool import timedef func(msg):print("msg:", msg)time.sleep(1)return msgif __name__ == "__main__":pool = Pool(processes=3)res_l = []for i in range(10):msg = "hello %d" % (i)res = pool.apply_async(func, (msg,)) # 維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去res_l.append(res)# s = res.get() #如果直接用res這個結果對象調用get方法獲取結果的話,這個程序就變成了同步,因為get方法直接就在這里等著你創建的進程的結果,第一個進程創建了,并且去執行了,那么get就會等著第一個進程的結果,沒有結果就一直等著,那么主進程的for循環是無法繼續的,所以你會發現變成了同步的效果print("==============================>") # 沒有后面的join,或get,則程序整體結束,進程池中的任務還沒來得及全部執行完也都跟著主進程一起結束了pool.close() # 關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成pool.join() # 調用join之前,先調用close函數,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束print(res_l) # 看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>對象組成的列表,而非最終的結果,但這一步是在join后執行的,證明結果已經計算完畢,剩下的事情就是調用每個對象下的get方法去獲取結果for i in res_l:print(i.get()) # 使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get二:使用進程池(同步調用apply)
from multiprocessing import Process, Pool import timedef func(msg):print("msg:", msg)time.sleep(0.1)return msgif __name__ == "__main__":pool = Pool(processes=3)res_l = []for i in range(10):msg = "hello %d" % (i)res = pool.apply(func, (msg,)) # 維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去res_l.append(res) # 同步執行,即執行完一個拿到結果,再去執行另外一個print("==============================>")pool.close()pool.join() # 調用join之前,先調用close函數,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束print(res_l) # 看到的就是最終的結果組成的列表for i in res_l: # apply是同步的,所以直接得到結果,沒有get()方法print(i)2.4 進程池版的socket并發聊天
server端:tcp_server.py
# Pool內的進程數默認是cpu核數,假設為4(查看方法os.cpu_count()) # 開啟6個客戶端,會發現2個客戶端處于等待狀態 # 在每個進程內查看pid,會發現pid使用為4個,即多個客戶端公用4個進程 from socket import * from multiprocessing import Pool import osserver = socket(AF_INET, SOCK_STREAM) server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) server.bind(('127.0.0.1', 8080)) server.listen(5)def talk(conn):print('進程pid: %s' % os.getpid())while True:try:msg = conn.recv(1024)if not msg: breakconn.send(msg.upper())except Exception:breakif __name__ == '__main__':p = Pool(4)while True:conn, *_ = server.accept()p.apply_async(talk, args=(conn,))# p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問client端:tcp_client.py
from socket import *client = socket(AF_INET, SOCK_STREAM) client.connect(('127.0.0.1', 8080))while True:msg = input('>>: ').strip()if not msg: continueclient.send(msg.encode('utf-8'))msg = client.recv(1024)print(msg.decode('utf-8'))發現:并發開啟多個客戶端,服務端同一時間只有4個不同的pid,只能結束一個客戶端,另外一個客戶端才會進來.
同時最多和4個人進行聊天,因為進程池中只有4個進程可供調用,那如果,我們這么多人想同時聊天怎么辦,又不讓用多進程,進程池也不能開太多的進程,那咋整啊,后面我們會學到多線程,到時候大家就知道了
2.5 回調函數
需要回調函數的場景:
進程池中任何一個任務一旦處理完了,就立即告知主進程:我好了額,你可以處理我的結果了。主進程則調用一個函數去處理該結果,該函數即回調函數,這是進程池特有的,普通進程沒有這個機制,但是我們也可以通過進程通信來拿到返回值,進程池的這個回調也是進程通信的機制完成的。我們可以把耗時間(阻塞)的任務放到進程池中,然后指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。
回調函數的簡單使用:
import os from multiprocessing import Pooldef func1(n):print('func1>>', os.getpid())print('func1')return n * ndef func2(nn):print('func2>>', os.getpid())print('func2')print(nn)# import time# time.sleep(0.5)if __name__ == '__main__':print('主進程:', os.getpid())p = Pool(5)# args里面的10給了func1,func1的返回值作為回調函數的參數給了callback對應的函數,不能直接給回調函數直接傳參數,他只能是你任務函數func1的函數的返回值# for i in range(10,20): #如果是多個進程來執行任務,那么當所有子進程將結果給了回調函數之后,回調函數又是在主進程上執行的,那么就會出現打印結果是同步的效果。我們上面func2里面注銷的時間模塊打開看看# p.apply_async(func1,args=(i,),callback=func2)p.apply_async(func1, args=(10,), callback=func2)p.close()p.join()# 結果 # 主進程: 11852 #發現回調函數是在主進程中完成的,其實如果是在子進程中完成的,那我們直接將代碼寫在子進程的任務函數func1里面就行了,對不對,這也是為什么稱為回調函數的原因。 # func1>> 17332 # func1 # func2>> 11852 # func2 # 100回調函數在寫的時候注意一點,回調函數的形參執行有一個,如果你的執行函數有多個返回值,那么也可以被回調函數的這一個形參接收,接收的是一個元祖,包含著你執行函數的所有返回值。
注意:如果在主進程中等待進程池中所有任務都執行完畢后,再統一處理結果,則無需回調函數:
from multiprocessing import Pool import time, random, osdef work(n):time.sleep(1)return n ** 2if __name__ == '__main__':p = Pool()res_l = []for i in range(10):res = p.apply_async(work, args=(i,))res_l.append(res)p.close()p.join() # 等待進程池中所有進程執行完畢nums = []for res in res_l:nums.append(res.get()) # 拿到所有結果print(nums) # 主進程拿到所有的處理結果,可以在主進程中進行統一進行處理2.6 使用進程池進行爬蟲
? 使用進程池來搞爬蟲的時候,最耗時間的是請求地址的網絡請求延遲,那么如果我們在將處理數據的操作加到每個子進程中,那么所有在進程池后面排隊的進程就需要等更長的時間才能獲取進程池里面的執行進程來執行自己,所以一般我們就將請求作成一個執行函數,通過進程池去異步執行,剩下的數據處理的內容放到另外一個進程或者主進程中去執行,將網絡延遲的時間也利用起來,效率更高。
爬蟲相關的requests模塊簡單使用:
import requestsresponse = requests.get('http://www.baidu.com') print(response) print(response.status_code) # 200正常,404找不到網頁,503等5開頭的是人家網站內部錯誤 print(response.content.decode('utf-8'))requests這個模塊的get方法請求頁面,就和我們在瀏覽器上輸入一個網址然后回車去請求別人的網站的效果是一樣的。
使用多進程請求多個url來減少網絡等待浪費的時間
from multiprocessing import Pool import requests import json import osdef get_page(url):print('<進程%s> get %s' % (os.getpid(), url))respone = requests.get(url)if respone.status_code == 200:return {'url': url, 'text': respone.text}def pasrse_page(res):print('<進程%s> parse %s' % (os.getpid(), res['url']))parse_res = 'url:<%s> size:[%s]\n' % (res['url'], len(res['text']))with open('db.txt', 'a') as f:f.write(parse_res)if __name__ == '__main__':urls = ['https://www.baidu.com','https://www.python.org','https://www.openstack.org','https://help.github.com/','http://www.sina.com.cn/']p = Pool(3)res_l = []for url in urls:res = p.apply_async(get_page, args=(url,), callback=pasrse_page)res_l.append(res)p.close()p.join()# print([res.get() for res in res_l]) # 拿到的是get_page的結果,其實完全沒必要拿該結果,該結果已經傳給回調函數處理了# 打印結果: <進程89815> get https://www.baidu.com <進程89816> get https://www.python.org <進程89817> get https://www.openstack.org <進程89815> get https://help.github.com/ <進程89814> parse https://www.baidu.com <進程89816> get http://www.sina.com.cn/ <進程89814> parse https://www.python.org <進程89814> parse http://www.sina.com.cn/ <進程89814> parse https://www.openstack.org <進程89814> parse https://help.github.com/爬蟲示例
from multiprocessing import Pool import time, random import requests import redef get_page(url, pattern):response = requests.get(url)if response.status_code == 200:return (response.text, pattern)def parse_page(info):page_content, pattern = infores = re.findall(pattern, page_content)for item in res:dic = {'index': item[0],'title': item[1],'actor': item[2].strip()[3:],'time': item[3][5:],'score': item[4] + item[5]}print(dic)if __name__ == '__main__':pattern1 = re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S)url_dic = {'http://maoyan.com/board/7': pattern1,}p = Pool()res_l = []for url, pattern in url_dic.items():res = p.apply_async(get_page, args=(url, pattern), callback=parse_page)res_l.append(res)for i in res_l:i.get()# res=requests.get('http://maoyan.com/board/7')# print(re.findall(pattern,res.text))# 打印結果:#2019-4-15 {'index': '1', 'title': '波西米亞狂想曲', 'actor': '拉米·馬雷克,本·哈迪,約瑟夫·梅澤羅', 'time': '2019-03-22', 'score': '9.5'} {'index': '2', 'title': '綠皮書', 'actor': '維果·莫騰森,馬赫沙拉·阿里,琳達·卡德里尼', 'time': '2019-03-01', 'score': '9.5'} {'index': '3', 'title': '老師·好', 'actor': '于謙,湯夢佳,王廣源', 'time': '2019-03-22', 'score': '9.3'} {'index': '4', 'title': '調音師', 'actor': '阿尤斯曼·庫拉納,塔布,拉迪卡·艾普特', 'time': '2019-04-03', 'score': '9.2'} {'index': '5', 'title': '我的英雄學院:兩位英雄', 'actor': '山下大輝,三宅健太,志田未來', 'time': '2019-03-15', 'score': '9.2'} {'index': '6', 'title': '反貪風暴4', 'actor': '古天樂,鄭嘉穎,林峯', 'time': '2019-04-04', 'score': '9.1'} {'index': '7', 'title': '祈禱落幕時', 'actor': '阿部寬,松島菜菜子,溝端淳平', 'time': '2019-04-12', 'score': '9.1'} {'index': '8', 'title': '小飛象', 'actor': '科林·法瑞爾,邁克爾·基頓,丹尼·德維托', 'time': '2019-03-29', 'score': '9.0'} {'index': '9', 'title': '馴龍高手3', 'actor': '杰伊·巴魯切爾,劉昊然,亞美莉卡·費雷拉', 'time': '2019-03-01', 'score': '9.0'} {'index': '10', 'title': '阿麗塔:戰斗天使', 'actor': '羅莎·薩拉查,克里斯托弗·沃爾茲,基恩·約翰遜', 'time': '2019-02-22', 'score': '9.0'}2.7 進程池和信號量的區別
? 進程池是多個需要被執行的任務在進程池外面排隊等待獲取進程對象去執行自己,而信號量是一堆進程等待著去執行一段邏輯代碼。
信號量不能控制創建多少個進程,但是可以控制同時多少個進程能夠執行,但是進程池能控制你可以創建多少個進程。
舉例:就像那些開大車拉煤的,信號量是什么呢,就好比我只有五個車道,你每次只能過5輛車,但是不影響你創建100輛車,但是進程池相當于什么呢?相當于你只有5輛車,每次5個車拉東西,拉完你再把車放回來,給別的人拉煤用。
其他語言里面有更高級的進程池,在設置的時候,可以將進程池中的進程動態的創建出來,當需求增大的時候,就會自動在進程池中添加進程,需求小的時候,自動減少進程,并且可以設置進程數量的上線,最多為多,python里面沒有。
進程池的其他實現方式:https://docs.python.org/dev/library/concurrent.futures.html
轉載于:https://www.cnblogs.com/russellyoung/p/Python-zhi-jin-cheng-3--jin-cheng-chi-hemultiproce.html
總結
以上是生活随笔為你收集整理的Python之进程 3 - 进程池和multiprocess.Poll的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2023最新SSM计算机毕业设计选题大全
- 下一篇: NRC词语情绪词典和词语色彩词典