python学习点滴记录-Day09
網絡編程之基于udp協議的套接字
基于UDP協議的套接字
udp是無鏈接的,面向消息的(有報頭有數據),先啟動哪一端都不會報錯,也不會發生粘包現象。
udp服務端
1 ss = socket() #創建一個服務器的套接字 2 ss.bind() #綁定服務器套接字 3 inf_loop: #服務器無限循環 4 cs = ss.recvfrom()/ss.sendto() # 對話(接收與發送) 5 ss.close() # 關閉服務器套接字udp客戶端
cs = socket() # 創建客戶套接字 comm_loop: # 通訊循環cs.sendto()/cs.recvfrom() # 對話(發送/接收) cs.close() # 關閉客戶套接字 #_*_coding:utf-8_*_ __author__ = 'Linhaifeng' import socket ip_port=('127.0.0.1',9000) BUFSIZE=1024 udp_server_client=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)udp_server_client.bind(ip_port)while True:msg,addr=udp_server_client.recvfrom(BUFSIZE)print(msg,addr)udp_server_client.sendto(msg.upper(),addr) 簡單示例:udp服務端 #_*_coding:utf-8_*_ __author__ = 'Linhaifeng' import socket ip_port=('127.0.0.1',9000) BUFSIZE=1024 udp_server_client=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)while True:msg=input('>>: ').strip()if not msg:continueudp_server_client.sendto(msg.encode('utf-8'),ip_port)back_msg,addr=udp_server_client.recvfrom(BUFSIZE)print(back_msg.decode('utf-8'),addr) 簡單示例:udp客戶端udp套接字和tcp套接字比較的話,少了鏈接循環,發送和接受從from/send變成了recvfrom/sendto,每一條消息都是完整的。
基于socketserver的udp并發示例
import socketserver class MyUDPhandler(socketserver.BaseRequestHandler):def handle(self):print(self.request)self.request[1].sendto(self.request[0].upper(),self.client_address)if __name__ == '__main__':s=socketserver.ThreadingUDPServer(('127.0.0.1',8080),MyUDPhandler)s.serve_forever() 服務端 from socket import *udp_client=socket(AF_INET,SOCK_DGRAM)while True:msg=input('>>: ').strip()udp_client.sendto(msg.encode('utf-8'),('127.0.0.1',8080))data,server_addr=udp_client.recvfrom(1024)print(data.decode('utf-8')) 客戶端(可復制為多個模擬并發)?進程
? ? 進程:正在進行的一個過程或者說一個任務。而負責執行任務則是cpu。
理論基礎:#一 操作系統的作用:1:隱藏丑陋復雜的硬件接口,提供良好的抽象接口2:管理、調度進程,并且將多個進程對硬件的競爭變得有序#二 多道技術:1.產生背景:針對單核,實現并發ps:現在的主機一般是多核,那么每個核都會利用多道技術有4個cpu,運行于cpu1的某個程序遇到io阻塞,會等到io結束再重新調度,會被調度到4個cpu中的任意一個,具體由操作系統調度算法決定。2.空間上的復用:如內存中同時有多道程序3.時間上的復用:復用一個cpu的時間片強調:遇到io切,占用cpu時間過長也切,核心在于切之前將進程的狀態保存下來,這樣才能保證下次切換回來時,能基于上次切走的位置繼續運行
進程和程序
程序僅僅只是一堆代碼而已,而進程指的是程序的運行過程。同一個程序執行兩次,那也是兩個進程,比如打開暴風影音,雖然都是同一個軟件,但是一個可以播放蒼井空,一個可以播放飯島愛/斜眼笑
并發與并行
無論是并行還是并發,在用戶看來都是'同時'運行的,不管是進程還是線程,都只是一個任務而已,真是干活的是cpu,cpu來做這些任務,而一個cpu同一時刻只能執行一個任務
? ? ? 一 并發:是偽并行,即看起來是同時運行。單個cpu+多道技術就可以實現并發,(并行也屬于并發)
? ? ? 二 并行:同時運行,只有具備多個cpu才能實現并行
? ? ? ? ?單核下,可以利用多道技術,多個核,每個核也都可以利用多道技術(多道技術是針對單核而言的)
? ? ? ? ?有四個核,六個任務,這樣同一時間有四個任務被執行,假設分別被分配給了cpu1,cpu2,cpu3,cpu4,
? ? ? ? ?一旦任務1遇到I/O就被迫中斷執行,此時任務5就拿到cpu1的時間片去執行,這就是單核下的多道技術
? ? ? ? ?而一旦任務1的I/O結束了,操作系統會重新調用它(需知進程的調度、分配給哪個cpu運行,由操作系統說了算),可能被分配給四個cpu中的任意一個去執行
所有現代計算機經常會在同一時間做很多件事,一個用戶的PC(無論是單cpu還是多cpu),都可以同時運行多個任務(一個任務可以理解為一個進程)。
啟動一個進程來殺毒(360軟件)
啟動一個進程來看電影(暴風影音)
啟動一個進程來聊天(騰訊QQ)
?所有的這些進程都需被管理,于是一個支持多進程的多道程序系統是至關重要的
? ? ? 多道技術:內存中同時存入多道(多個)程序,cpu從一個進程快速切換到另外一個,使每個進程各自運行幾十或幾百毫秒,這樣,雖然在某一個瞬間,一個cpu只能執行一個任務,但在1秒內,cpu卻可以運行多個進程,這就給人產生了并行的錯覺,即偽并發,以此來區分多處理器操作系統的真正硬件并行(多個cpu共享同一個物理內存)
同步執行與異步執行
同步執行:一個進程在執行某個任務時,另外一個進程必須等待其執行完畢,才能繼續執行
異步執行:一個進程在執行某個任務時,另外一個進程無需等待其執行完畢,就可以繼續執行,當有消息返回時,系統會通知后者進行處理,這樣可以提高執行效率
? ? 舉個例子,打電話時就是同步通信,發短息時就是異步通信。
進程的創建
但凡是硬件,都需要有操作系統去管理,只要有操作系統,就有進程的概念,就需要有創建進程的方式,一些操作系統只為一個應用程序設計,比如微波爐中的控制器,一旦啟動微波爐,所有的進程都已經存在。
而對于通用系統(跑很多應用程序),需要有系統運行過程中創建或撤銷進程的能力,主要分為4中形式創建新的進程
1. 系統初始化(查看進程linux中用ps命令,windows中用任務管理器,前臺進程負責與用戶交互,后臺運行的進程與用戶無關,運行在后臺并且只在需要時才喚醒的進程,稱為守護進程,如電子郵件、web頁面、新聞、打印)
2. 一個進程在運行過程中開啟了子進程(如nginx開啟多進程,os.fork,subprocess.Popen等)
3. 用戶的交互式請求,而創建一個新進程(如用戶雙擊暴風影音)
4. 一個批處理作業的初始化(只在大型機的批處理系統中應用)
無論哪一種,新進程的創建都是由一個已經存在的進程執行了一個用于創建進程的系統調用而創建的:
1. 在UNIX中該系統調用是:fork,fork會創建一個與父進程一模一樣的副本,二者有相同的存儲映像、同樣的環境字符串和同樣的打開文件(在shell解釋器進程中,執行一個命令就會創建一個子進程)
2. 在windows中該系統調用是:CreateProcess,CreateProcess既處理進程的創建,也負責把正確的程序裝入新進程。
關于創建的子進程,UNIX和windows
1.相同的是:進程創建后,父進程和子進程有各自不同的地址空間(多道技術要求物理層面實現進程之間內存的隔離),任何一個進程的在其地址空間中的修改都不會影響到另外一個進程。
2.不同的是:在UNIX中,子進程的初始地址空間是父進程的一個副本,提示:子進程和父進程是可以有只讀的共享內存區的。但是對于windows系統來說,從一開始父進程與子進程的地址空間就是不同的。
?
進程的終止?
1. 正常退出(自愿,如用戶點擊交互式頁面的叉號,或程序執行完畢調用發起系統調用正常退出,在linux中用exit,在windows中用ExitProcess)
2. 出錯退出(自愿,python a.py中a.py不存在)
3. 嚴重錯誤(非自愿,執行非法指令,如引用不存在的內存,1/0等,可以捕捉異常,try...except...)
4. 被其他進程殺死(非自愿,如kill -9)
進程的層次結構
無論UNIX還是windows,進程只有一個父進程,不同的是:
1. 在UNIX中所有的進程,都是以init進程為根,組成樹形結構。父子進程共同組成一個進程組,這樣,當從鍵盤發出一個信號時,該信號被送給當前與鍵盤相關的進程組中的所有成員。
2. 在windows中,沒有進程層次的概念,所有的進程都是地位相同的,唯一類似于進程層次的暗示,是在創建進程時,父進程得到一個特別的令牌(稱為句柄),該句柄可以用來控制子進程,但是父進程有權把該句柄傳給其他子進程,這樣就沒有層次了。
進程的狀態
tail -f access.log |grep '404'
執行程序tail,開啟一個子進程,執行程序grep,開啟另外一個子進程,兩個進程之間基于管道'|'通訊,將tail的結果作為grep的輸入。
進程grep在等待輸入(即I/O)時的狀態稱為阻塞,此時grep命令都無法運行
其實在兩種情況下會導致一個進程在邏輯上不能運行,
1. 進程掛起是自身原因,遇到I/O阻塞,便要讓出CPU讓其他進程去執行,這樣保證CPU一直在工作
2. 與進程無關,是操作系統層面,可能會因為一個進程占用時間過多,或者優先級等原因,而調用其他的進程去使用CPU。
因而一個進程由三種狀態
進程并發的實現
進程并發的實現在于,硬件中斷一個正在運行的進程,把此時進程運行的所有狀態保存下來,為此,操作系統維護一張表格,即進程表(process table),每個進程占用一個進程表項(這些表項也稱為進程控制塊)
該表存放了進程狀態的重要信息:程序計數器、堆棧指針、內存分配狀況、所有打開文件的狀態、帳號和調度信息,以及其他在進程由運行態轉為就緒態或阻塞態時,必須保存的信息,從而保證該進程在再次啟動時,就像從未被中斷過一樣。
?開啟進程示例
from multiprocessing import Process import time def work(name):print('task <%s> is runing' %name)time.sleep(2)print('task <%s> is done' % name)if __name__ == '__main__':# Process(target=work,kwargs={'name':'egon'})p1=Process(target=work,args=('egon',))p2=Process(target=work,args=('alex',))p1.start()p2.start()print('主') 方法一 方法二基于tcp協議的并發套接字通信示例(通過開啟子進程的方式實現t)
from multiprocessing import Process from socket import * s=socket(AF_INET,SOCK_STREAM) s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) s.bind(('127.0.0.1',8080)) s.listen(5) def talK(conn,addr):while True:try:data=conn.recv(1024)if not data:breakconn.send(data.upper())except Exception:breakconn.close()if __name__ == '__main__':while True:conn,addr=s.accept()p=Process(target=talK,args=(conn,addr))p.start()s.close() 服務端 from socket import * c=socket(AF_INET,SOCK_STREAM) c.connect(('127.0.0.1',8080))while True:msg=input('>>: ').strip()if not msg:continuec.send(msg.encode('utf-8'))data=c.recv(1024)print(data.decode('utf-8'))c.close() 客戶端join方法(主進程需要等待子進程結束后再繼續)
from multiprocessing import Process import time def work(name):print('task <%s> is runing' %name)time.sleep(3)print('task <%s> is done' % name)if __name__ == '__main__':p1=Process(target=work,args=('egon',))p2=Process(target=work,args=('alex',))p3=Process(target=work,args=('yuanhao',))# p1.start()# p2.start()# p3.start()# # p1.join() #主進程等,等待p1運行結束# p2.join() #主進程等,等待p2運行結束# p3.join() #主進程等,等待p3運行結束 p_l = [p1, p2, p3]for p in p_l:p.start()for p in p_l:p.join()print('主')# p_l = [p1, p2, p3]# for p in p_l:# p.start()# p.join()# p1.start()# p1.join()# p2.start()# p2.join()# p3.start()# p3.join()# print('主') join示例Process的其他方法
from multiprocessing import Process import time,os def work():print('parent:%s task <%s> is runing' %(os.getppid(),os.getpid()))time.sleep(1000)print('parent:%s task <%s> is done' %(os.getppid(),os.getpid()))if __name__ == '__main__':p1=Process(target=work)p1.start()# p1.terminate()# time.sleep(3)# print(p1.is_alive())# print(p1.name)# print(p1.pid)print('主',os.getpid(),os.getppid())time.sleep(10000) terminate/is_alive/name/pid/getpid/getppid?守護進程
# from multiprocessing import Process # import time # def work(name): # print('task <%s> is runing' %name) # time.sleep(2) # print('task <%s> is done' % name) # # if __name__ == '__main__': # p1=Process(target=work,args=('egon',)) # p1.daemon = True # p1.start() # # print('主')#主進程代碼運行完畢,守護進程就會結束 from multiprocessing import Process import time def foo():print(123)time.sleep(1)print("end123")def bar():print(456)time.sleep(3)print("end456") if __name__ == '__main__':p1=Process(target=foo)p2=Process(target=bar)p1.daemon=Truep1.start()p2.start()print("main-------") #打印該行則主進程代碼結束,則守護進程p1應該被終止,可能會有p1任務執行的打印信息123,因為主進程打印main----時,p1也執行了,但是隨即被終止 View Code進程同步鎖
進程之間數據不共享,但是共享同一套文件系統,所以訪問同一個文件,或同一個打印終端,是沒有問題的,
競爭帶來的結果就是錯亂,如何控制,就是加鎖處理
part1:多個進程共享同一打印終端
#并發運行,效率高,但競爭同一打印終端,帶來了打印錯亂 from multiprocessing import Process import os,time def work():print('%s is running' %os.getpid())time.sleep(2)print('%s is done' %os.getpid())if __name__ == '__main__':for i in range(3):p=Process(target=work)p.start()并發運行,效率高,但競爭同一打印終端,帶來了打印錯亂 #由并發變成了串行,犧牲了運行效率,但避免了競爭 from multiprocessing import Process,Lock import os,time def work(lock):lock.acquire()print('%s is running' %os.getpid())time.sleep(2)print('%s is done' %os.getpid())lock.release() if __name__ == '__main__':lock=Lock()for i in range(3):p=Process(target=work,args=(lock,))p.start()加鎖:由并發變成了串行,犧牲了運行效率,但避免了競爭part2:多個進程共享同一文件
文件當數據庫,模擬搶票
#文件db的內容為:{"count":1} #注意一定要用雙引號,不然json無法識別 from multiprocessing import Process,Lock import time,json,random def search():dic=json.load(open('db.txt'))print('\033[43m剩余票數%s\033[0m' %dic['count'])def get():dic=json.load(open('db.txt'))time.sleep(0.1) #模擬讀數據的網絡延遲if dic['count'] >0:dic['count']-=1time.sleep(0.2) #模擬寫數據的網絡延遲json.dump(dic,open('db.txt','w'))print('\033[43m購票成功\033[0m')def task(lock):search()get() if __name__ == '__main__':lock=Lock()for i in range(100): #模擬并發100個客戶端搶票p=Process(target=task,args=(lock,))p.start()并發運行,效率高,但競爭寫同一文件,數據寫入錯亂 #文件db的內容為:{"count":1} #注意一定要用雙引號,不然json無法識別 from multiprocessing import Process,Lock import time,json,random def search():dic=json.load(open('db.txt'))print('\033[43m剩余票數%s\033[0m' %dic['count'])def get():dic=json.load(open('db.txt'))time.sleep(0.1) #模擬讀數據的網絡延遲if dic['count'] >0:dic['count']-=1time.sleep(0.2) #模擬寫數據的網絡延遲json.dump(dic,open('db.txt','w'))print('\033[43m購票成功\033[0m')def task(lock):search()lock.acquire()get()lock.release() if __name__ == '__main__':lock=Lock()for i in range(100): #模擬并發100個客戶端搶票p=Process(target=task,args=(lock,))p.start()加鎖:購票行為由并發變成了串行,犧牲了運行效率,但保證了數據安全0總結:
加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然可以用文件共享數據實現進程間通信,但問題是:
1.效率低
2.需要自己加鎖處理
?
為此mutiprocessing模塊為我們提供了基于消息的IPC通信機制:隊列和管道。
1 隊列和管道都是將數據存放于內存中
2 隊列又是基于(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來,
我們應該盡量避免使用共享數據,盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可獲展性。
?
隊列(Queue)
進程彼此之間互相隔離,要實現進程間通信(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的
創建隊列的類(底層就是以管道和鎖定的方式實現):
1 Queue([maxsize]):創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。?? ??參數介紹:
1 maxsize是隊列中允許最大項數,省略則無大小限制。 from multiprocessing import Queue#先進先出 # q=Queue(3) # # q.put('first') # q.put('second') # q.put('third') # # q.put('fourth') # # print(q.get()) # print(q.get()) # print(q.get()) # print(q.get())#了解 q=Queue(3) #block默認是True,也就是會鎖定,等待隊列的進出 q.put('first',block=False) q.put('second',block=False) q.put('third',block=False) # q.put_nowait('fourth') #q.put('fourth',block=False) q.put('fourth',timeout=3) #timeout可以指定超時時間 簡單的隊列示例? 方法介紹:
主要方法
1 q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。如果blocked為True(默認值),并且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。 2 q.get方法可以從隊列讀取并且刪除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。如果blocked為True(默認值),并且timeout為正值,那么在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常. 3 4 q.get_nowait():同q.get(False) 5 q.put_nowait():同q.put(False) 6 7 q.empty():調用此方法時q為空則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中又加入了項目。 8 q.full():調用此方法時q已滿則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中的項目被取走。 9 q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣其他方法: 1 q.cancel_join_thread():不會在進程退出時自動連接后臺線程。可以防止join_thread()方法阻塞 2 q.close():關閉隊列,防止隊列中加入更多數據。調用此方法,后臺線程將繼續寫入那些已經入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將調用此方法。關閉隊列不會在隊列使用者中產生任何類型的數據結束信號或異常。例如,如果某個使用者正在被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。 3 q.join_thread():連接隊列的后臺線程。此方法用于在調用q.close()方法之后,等待所有隊列項被消耗。默認情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread方法可以禁止這種行為
應用 ''' multiprocessing模塊支持進程間通信的兩種主要形式:管道和隊列 都是基于消息傳遞實現的,但是隊列接口 '''from multiprocessing import Process,Queue import time q=Queue(3)#put ,get ,put_nowait,get_nowait,full,empty q.put(3) q.put(3) q.put(3) print(q.full()) #滿了print(q.get()) print(q.get()) print(q.get()) print(q.empty()) #空了 View Code
生產者消費者模型(詳見老師博客http://www.cnblogs.com/linhaifeng/articles/7428874.html#_label5)
在并發編程中使用生產者和消費者模式能夠解決絕大多數并發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。
? ? 為什么要使用生產者和消費者模式
在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大于生產者,那么消費者就必須等待生產者。為了解決這個問題于是引入了生產者和消費者模式。
? ? 什么是生產者消費者模式
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區,平衡了生產者和消費者的處理能力。
基于隊列實現生產者消費者模型
#JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列允許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。#參數介紹: maxsize是隊列中允許最大項數,省略則無大小限制。 #方法介紹: JoinableQueue的實例p除了與Queue對象相同的方法之外還具有:q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大于從隊列中刪除項目的數量,將引發ValueError異常q.join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止 from multiprocessing import Process,JoinableQueue import time,random,os def consumer(q):while True:res=q.get()time.sleep(random.randint(1,3))print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))q.task_done() #向q.join()發送一次信號,證明一個數據已經被取走了def producer(name,q):for i in range(10):time.sleep(random.randint(1,3))res='%s%s' %(name,i)q.put(res)print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))q.join()if __name__ == '__main__':q=JoinableQueue()#生產者們:即廚師們p1=Process(target=producer,args=('包子',q))p2=Process(target=producer,args=('骨頭',q))p3=Process(target=producer,args=('泔水',q))#消費者們:即吃貨們c1=Process(target=consumer,args=(q,))c2=Process(target=consumer,args=(q,))c1.daemon=Truec2.daemon=True#開始p_l=[p1,p2,p3,c1,c2]for p in p_l:p.start()p1.join()p2.join()p3.join()print('主') #主進程等--->p1,p2,p3等---->c1,c2#p1,p2,p3結束了,證明c1,c2肯定全都收完了p1,p2,p3發到隊列的數據#因而c1,c2也沒有存在的價值了,應該隨著主進程的結束而結束,所以設置成守護進程 JoinableQueue?
?進程池
在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程控制多臺主機,并行操作可以節約大量的時間。多進程是實現并發的手段之一,需要注意的問題是:
例如當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但如果是上百個,上千個。。。手動的去限制進程數量卻又太過繁瑣,此時可以發揮進程池的功效。
我們就可以通過維護一個進程池來控制進程數目,比如httpd的進程模式,規定最小進程數和最大進程數...?
ps:對于遠程過程調用的高級應用程序而言,應該使用進程池,Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那么該請求就會等待,直到池中有進程結束,就重用進程池中的進程。
?創建進程池的類:如果指定numprocess為3,則進程池會從無到有創建三個進程,然后自始至終使用這三個進程去執行所有任務,不會開啟其他進程
1 Pool([numprocess [,initializer [, initargs]]]):創建進程池?? ??參數介紹:
1 numprocess:要創建的進程數,如果省略,將默認使用cpu_count()的值 2 initializer:是每個工作進程啟動時要執行的可調用對象,默認為None 3 initargs:是要傳給initializer的參數組?方法介紹:
主要方法: p.apply(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然后返回結果。需要強調的是:此操作并不會在所有池工作進程中并執行func函數。如果要通過不同參數并發地執行func函數,必須從不同線程調用p.apply()函數或者使用p.apply_async() p.apply_async(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然后返回結果。此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他異步操作中的結果。p.close():關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成 P.jion():等待所有工作進程退出。此方法只能在close()或teminate()之后調用其他方法:
方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具有以下方法 obj.get():返回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間內還沒有到達,將引發一場。如果遠程操作中引發了異常,它將在調用此方法時再次被引發。 obj.ready():如果調用完成,返回True obj.successful():如果調用完成且沒有引發異常,返回True,如果在結果就緒之前調用此方法,引發異常 obj.wait([timeout]):等待結果變為可用。 obj.terminate():立即終止所有工作進程,同時不執行任何清理或結束任何掛起工作。如果p被垃圾回收,將自動調用此函數應用
from multiprocessing import Pool import os,time def work(n):print('%s run' %os.getpid())time.sleep(3)return n**2if __name__ == '__main__':p=Pool(3) #進程池中從無到有創建三個進程,以后一直是這三個進程在執行任務res_l=[]for i in range(10):res=p.apply(work,args=(i,)) #同步運行,阻塞、直到本次任務執行完畢拿到res res_l.append(res)print(res_l) apply同步執行:阻塞式 from multiprocessing import Pool import os,time def work(n):print('%s run' %os.getpid())time.sleep(3)return n**2if __name__ == '__main__':p=Pool(3) #進程池中從無到有創建三個進程,以后一直是這三個進程在執行任務res_l=[]for i in range(10):res=p.apply_async(work,args=(i,)) #同步運行,阻塞、直到本次任務執行完畢拿到res res_l.append(res)#異步apply_async用法:如果使用異步提交的任務,主進程需要使用jion,等待進程池內任務都處理完,然后可以用get收集結果,否則,主進程結束,進程池可能還沒來得及執行,也就跟著一起結束了 p.close()p.join()for res in res_l:print(res.get()) #使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get apply_async異步執行:非阻塞 #一:使用進程池(非阻塞,apply_async) #coding: utf-8 from multiprocessing import Process,Pool import timedef func(msg):print( "msg:", msg)time.sleep(1)return msgif __name__ == "__main__":pool = Pool(processes = 3)res_l=[]for i in range(10):msg = "hello %d" %(i)res=pool.apply_async(func, (msg, )) #維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去 res_l.append(res)print("==============================>") #沒有后面的join,或get,則程序整體結束,進程池中的任務還沒來得及全部執行完也都跟著主進程一起結束了 pool.close() #關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成pool.join() #調用join之前,先調用close函數,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束print(res_l) #看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>對象組成的列表,而非最終的結果,但這一步是在join后執行的,證明結果已經計算完畢,剩下的事情就是調用每個對象下的get方法去獲取結果for i in res_l:print(i.get()) #使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get#二:使用進程池(阻塞,apply) #coding: utf-8 from multiprocessing import Process,Pool import timedef func(msg):print( "msg:", msg)time.sleep(0.1)return msgif __name__ == "__main__":pool = Pool(processes = 3)res_l=[]for i in range(10):msg = "hello %d" %(i)res=pool.apply(func, (msg, )) #維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去res_l.append(res) #同步執行,即執行完一個拿到結果,再去執行另外一個print("==============================>")pool.close()pool.join() #調用join之前,先調用close函數,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束print(res_l) #看到的就是最終的結果組成的列表for i in res_l: #apply是同步的,所以直接得到結果,沒有get()方法print(i) 詳解:apply_async與apply?練習:
#Pool內的進程數默認是cpu核數,假設為4(查看方法os.cpu_count()) #開啟6個客戶端,會發現2個客戶端處于等待狀態 #在每個進程內查看pid,會發現pid使用為4個,即多個客戶端公用4個進程 from socket import * from multiprocessing import Pool import osserver=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5)def talk(conn,client_addr):print('進程pid: %s' %os.getpid())while True:try:msg=conn.recv(1024)if not msg:breakconn.send(msg.upper())except Exception:breakif __name__ == '__main__':p=Pool()while True:conn,client_addr=server.accept()p.apply_async(talk,args=(conn,client_addr))# p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問 server端 from socket import *client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080))while True:msg=input('>>: ').strip()if not msg:continueclient.send(msg.encode('utf-8'))msg=client.recv(1024)print(msg.decode('utf-8')) 客戶端發現:并發開啟多個客戶端,服務端同一時間只有3個不同的pid,干掉一個客戶端,另外一個客戶端才會進來,被3個進程之一處理
回調函數:
需要回調函數的場景:進程池中任何一個任務一旦處理完了,就立即告知主進程:我好了額,你可以處理我的結果了。主進程則調用一個函數去處理該結果,該函數即回調函數
我們可以把耗時間(阻塞)的任務放到進程池中,然后指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。
這個時候就需要回調函數了
from multiprocessing import Pool import requests import json import osdef get_page(url):print('<進程%s> get %s' %(os.getpid(),url))respone=requests.get(url)if respone.status_code == 200:return {'url':url,'text':respone.text}def pasrse_page(res):print('<進程%s> parse %s' %(os.getpid(),res['url']))parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))with open('db.txt','a') as f:f.write(parse_res)if __name__ == '__main__':urls=['https://www.baidu.com','https://www.python.org','https://www.openstack.org','https://help.github.com/','http://www.sina.com.cn/']p=Pool(3)res_l=[]for url in urls:res=p.apply_async(get_page,args=(url,),callback=pasrse_page)res_l.append(res)p.close()p.join()print([res.get() for res in res_l]) #拿到的是get_page的結果,其實完全沒必要拿該結果,該結果已經傳給回調函數處理了''' 打印結果: <進程3388> get https://www.baidu.com <進程3389> get https://www.python.org <進程3390> get https://www.openstack.org <進程3388> get https://help.github.com/ <進程3387> parse https://www.baidu.com <進程3389> get http://www.sina.com.cn/ <進程3387> parse https://www.python.org <進程3387> parse https://help.github.com/ <進程3387> parse http://www.sina.com.cn/ <進程3387> parse https://www.openstack.org [{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}] ''' View Code如果在主進程中等待進程池中所有任務都執行完畢后,再統一處理結果,則無需回調函數
from multiprocessing import Pool import time,random,osdef work(n):time.sleep(1)return n**2 if __name__ == '__main__':p=Pool()res_l=[]for i in range(10):res=p.apply_async(work,args=(i,))res_l.append(res)p.close()p.join() #等待進程池中所有進程執行完畢 nums=[]for res in res_l:nums.append(res.get()) #拿到所有結果print(nums) #主進程拿到所有的處理結果,可以在主進程中進行統一進行處理 View Code?
?
?
?
產生一個進程池
轉載于:https://www.cnblogs.com/tianleblog/p/7450113.html
總結
以上是生活随笔為你收集整理的python学习点滴记录-Day09的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 孟连边防门,离缅北多少公里
- 下一篇: 部队文职一年有几次报名