進程之間的數據共享 基于消息傳遞的并發編程是大勢所趨, 即便是使用線程,推薦做法也是將程序設計為大量獨立的線程集合,通過消息隊列交換數據。 這樣極大地減少了對使用鎖和其他同步手段的需求,還可以擴展到分布式系統中。 但進程間應該盡量避免通信,即便需要通信,也應該選擇進程安全的工具來避免加鎖帶來的問題。 以后我們會嘗試使用數據庫來解決現在進程之間的數據共享問題。
進程間數據是獨立的,可以借助于隊列或管道實現通信,二者都是基于消息傳遞的。 雖然進程間數據獨立,但可以通過Manager實現數據共享,事實上Manager的功能遠不止于此。
A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
Manager支持的類型也很多。
例:
from multiprocessing import Manager,Process,Lock
def work(d,lock):
# with lock 就相當于一組 lock.acquire() lock.release()
# 上下文管理 :必須有一個開始動作 和 一個結束動作的時候with lock: #不加鎖而操作共享的數據,肯定會出現數據錯亂d['count']-=1if __name__ == '__main__':lock=Lock()with Manager() as m:dic=m.dict({'count':100})p_l=[]for i in range(98):p=Process(target=work, args=(dic, lock))p_l.append(p)p.start()for p in p_l:p.join()print(dic)
進程池和multiprocess.Pool模塊 進程池 為什么要有進程池?進程池的概念。 在程序實際處理問題過程中,忙時會有成千上萬的任務需要被執行,閑時可能只有零星任務。 那么在成千上萬個任務需要被執行的時候,我們就需要去創建成千上萬個進程么? 首先,創建進程需要消耗時間,銷毀進程也需要消耗時間。 第二即便開啟了成千上萬的進程,操作系統也不能讓他們同時執行,這樣反而會影響程序的效率。 因此我們不能無限制的根據任務開啟或者結束進程。 那么我們要怎么做呢? 在這里,要給大家介紹一個進程池的概念,定義一個池子,在里面放上固定數量的進程, 有需求來了,就拿一個池中的進程來處理任務,等到處理完畢,進程并不關閉, 而是將進程再放回進程池中繼續等待任務。 如果有很多任務需要執行,池中的進程數量不夠, 任務就要等待之前的進程執行任務完畢歸來,拿到空閑進程才能繼續執行。 也就是說,池中進程的數量是固定的,那么同一時間最多有固定數量的進程在運行。 這樣不會增加操作系統的調度難度,還節省了開閉進程的時間,也一定程度上能夠實現并發效果。
multiprocess.Pool模塊 概念介紹 Pool([numprocess [,initializer [, initargs]]]):創建進程池
參數介紹: 1 numprocess:要創建的進程數,如果省略,將默認使用cpu_count()的值 2 initializer:是每個工作進程啟動時要執行的可調用對象,默認為None 3 initargs:是要傳給initializer的參數組
主要方法:
p.apply(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然后返回結果。
'''需要強調的是:此操作并不會在所有池工作進程中并執行func函數。
如果要通過不同參數并發地執行func函數,必須從不同線程調用p.apply()函數或者使用p.apply_async()'''p.apply_async(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然后返回結果。
'''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他異步操作中的結果。'''p.close():關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成。P.jion():等待所有工作進程退出。此方法只能在close()或teminate()之后調用。方法apply_async()和map_async()的返回值是AsyncResul的實例obj。
實例具有以下方法
obj.get():返回結果,如果有必要則等待結果到達。
timeout是可選的。如果在指定時間內還沒有到達,將引發異常。
如果遠程操作中引發了異常,它將在調用此方法時再次被引發。
obj.ready():如果調用完成,返回True。
obj.successful():如果調用完成且沒有引發異常,返回True,如果在結果就緒之前調用此方法,引發異常。
obj.wait([timeout]):等待結果變為可用。
obj.terminate():立即終止所有工作進程,同時不執行任何清理或結束任何掛起工作。
如果p被垃圾回收,將自動調用此函數。例:進程池的同步調用
import os,time
from multiprocessing import Pooldef work(n):print('%s run' %os.getpid())time.sleep(2)return n**2if __name__ == '__main__':p=Pool(3) #進程池中從無到有創建三個進程,以后一直是這三個進程在執行任務res_l=[]for i in range(10):res_l.append(p.apply(work,args=(i,)))# 同步調用,直到本次任務執行完畢拿到return,# 等待任務work執行的過程中可能有阻塞也可能沒有阻塞# 但不管該任務是否存在阻塞,同步調用都會在原地等著print(res_l)972 run
8036 run
892 run
972 run
8036 run
892 run
972 run
8036 run
892 run
972 run
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]例:進程池的異步調用
import os
import time
import random
from multiprocessing import Pooldef work(n):print('%s run' %os.getpid())time.sleep(random.random())return n**2if __name__ == '__main__':p=Pool(3) #進程池中從無到有創建三個進程,以后一直是這三個進程在執行任務res_l=[]for i in range(10):res = p.apply_async(work, args=(i,))# 異步運行,根據進程池中有的進程數,每次最多3個子進程在異步執行# 返回結果之后,將結果放入列表,歸還進程,之后再執行新的任務# 需要注意的是,進程池中的三個進程不會同時開啟或者同時結束# 而是執行完一個就釋放一個進程,這個進程就去接收新的任務。res_l.append(res)# 異步apply_async用法:如果使用異步提交的任務,主進程需要使用jion,# 等待進程池內任務都處理完,然后可以用get收集結果# 否則,主進程結束,進程池可能還沒來得及執行,也就跟著一起結束了p.close()p.join()for res in res_l:print(res.get())# 使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,# 因為apply是同步執行,立刻獲取結果,也根本無需get。例:server:進程池版socket并發聊天#!/usr/bin/env python
# _*_ coding: utf-8 _*_# Pool內的進程數默認是cpu核數,假設為4(查看方法os.cpu_count())
# 開啟6個客戶端,會發現2個客戶端處于等待狀態
# 在每個進程內查看pid,會發現pid使用為4個,即多個客戶端公用4個進程
from socket import *
from multiprocessing import Pool
import osserver = socket(AF_INET, SOCK_STREAM)
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 9527))
server.listen(5)def talk(conn):print('進程pid: %s' % os.getpid())while True:try:msg = conn.recv(1024)if not msg: breakconn.send(msg.upper())except Exception:breakif __name__ == '__main__':p = Pool(4)while True:conn, *_ = server.accept()p.apply_async(talk, args=(conn,))client端
#!/usr/bin/env python
# _*_ coding: utf-8 _*_from socket import *client = socket(AF_INET, SOCK_STREAM)
client.connect(('127.0.0.1', 9527))while True:msg=input('>>: ').strip()if not msg:continueclient.send(msg.encode('utf-8'))msg=client.recv(1024)print(msg.decode('utf-8'))效果:
進程pid: 7980
進程pid: 6252
進程pid: 7156
進程pid: 7564
進程pid: 7980同時只能開啟4個進程,開第五個進程的時候會被阻塞,直到手動關掉前四個中的其中一個,第五個才能進到池子里運行。
并發開啟多個客戶端,服務端同一時間只有4個不同的pid,只能結束一個客戶端,另外一個客戶端才會進來.回調函數 callback=
需要回調函數的場景:進程池中任何一個任務一旦處理完了,就立即告知主進程:我好了,你可以處理我的結果了。主進程則調用一個函數去處理該結果,該函數即回調函數我們可以把耗時間(阻塞)的任務放到進程池中,然后指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。常用于爬蟲場景。例:使用多進程請求多個url來減少網絡等待浪費的時間
#!/usr/bin/env python
# _*_ coding: utf-8 _*_from multiprocessing import Pool
import requests
import osdef get_page(url):print('<進程%s> get %s' % (os.getpid(), url))respone = requests.get(url)if respone.status_code == 200:return {'url': url, 'text': respone.text}def pasrse_page(res):print('<進程%s> parse %s' % (os.getpid(), res['url']))parse_res = 'url:<%s> size:[%s]\n' % (res['url'], len(res['text']))with open('db.txt', 'a') as f:f.write(parse_res)if __name__ == '__main__':urls = ['https://www.baidu.com','https://www.python.org','https://www.openstack.org','https://help.github.com/','http://www.sina.com.cn/']p = Pool(3)res_l = []for url in urls:res = p.apply_async(get_page, args=(url,), callback=pasrse_page)res_l.append(res)p.close()p.join()print([res.get() for res in res_l]) # 拿到的是get_page的結果,其實完全沒必要拿該結果,該結果已經傳給回調函數處理了爬蟲實例:
#!/usr/bin/env python
# _*_ coding: utf-8 _*_
import re
from urllib.request import urlopen
from multiprocessing import Pooldef get_page(url, pattern):response = urlopen(url).read().decode('utf-8')return pattern, responsedef parse_page(info):pattern, page_content = infores = re.findall(pattern, page_content)for item in res:dic = {'index': item[0].strip(),'title': item[1].strip(),'actor': item[2].strip(),'time': item[3].strip(),}print(dic)if __name__ == '__main__':regex = r'<dd>.*?<.*?class="board-index.*?>(\d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>'pattern1 = re.compile(regex, re.S)url_dic = {'http://maoyan.com/board/7': pattern1,}p = Pool()res_l = []for url, pattern in url_dic.items():res = p.apply_async(get_page, args=(url, pattern), callback=parse_page)res_l.append(res)for i in res_l:i.get()效果:
{'index': '1', 'actor': '主演:泰爾·謝里丹,奧利維亞·庫克,本·門德爾森', 'time': '上映時間:2018-03-30', 'title': '頭號玩家'}
{'index': '2', 'actor': '主演:道恩·強森,娜奧米·哈里斯,杰弗里·迪恩·摩根', 'time': '上映時間:2018-04-13', 'title': '狂暴巨獸'}
{'index': '3', 'actor': '主演:帕拉巴斯,拉納·達格巴帝,安努舒卡·謝蒂', 'time': '上映時間:2018-05-04', 'title': '巴霍巴利王2:終結'}
{'index': '4', 'actor': '主演:小羅伯特·唐尼,克里斯·海姆斯沃斯,馬克·魯法洛', 'time': '上映時間:2018-05-11', 'title': '復仇者聯盟3:無限戰爭'}
{'index': '5', 'actor': '主演:奧古斯特·迪赫,史特凡·柯納斯克,薇姬·克里普斯', 'time': '上映時間:2018-05-05', 'title': '青年馬克思'}
{'index': '6', 'actor': '主演:閆妮,鄒元清,吳若甫', 'time': '上映時間:2018-05-11', 'title': '我是你媽'}
{'index': '7', 'actor': '主演:凱特·瑪拉,湯姆·費爾頓,布萊德利·惠特福德', 'time': '上映時間:2018-05-11', 'title': '戰犬瑞克斯'}
{'index': '8', 'actor': '主演:郭京飛,迪麗熱巴,大鵬', 'time': '上映時間:2018-04-20', 'title': '21克拉'}
{'index': '9', 'actor': '主演:杰森·格里菲,勞里·海梅斯,迪·布拉雷·貝克爾', 'time': '上映時間:2018-04-05', 'title': '冰雪女王3:火與冰'}
{'index': '10', 'actor': '主演:井柏然,周冬雨,田壯壯', 'time': '上映時間:2018-04-28', 'title': '后來的我們'}如果在主進程中等待進程池中所有任務都執行完畢后,再統一處理結果,則無需回調函數。例:
#!/usr/bin/env python
# _*_ coding: utf-8 _*_from multiprocessing import Pool
import timedef work(n):time.sleep(1)return n ** 2if __name__ == '__main__':p = Pool()res_l = []for i in range(10):res = p.apply_async(work, args=(i,))res_l.append(res)p.close()p.join() # 等待進程池中所有進程執行完畢nums = []for res in res_l:nums.append(res.get()) # 拿到所有結果print(nums) # 主進程拿到所有的處理結果,可以在主進程中進行統一進行處理[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
end
轉載于:https://www.cnblogs.com/tielemao/p/9043008.html
總結
以上是生活随笔 為你收集整理的铁乐学python_Day40_进程池 的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。