python3 进程
1.開進程的兩種方式:
1. 使用內置的進程
#!/usr/bin/env python #coding:utf-8 #Created by Andy @ 2017/9/17from multiprocessing import Process import osdef get_id(name):print(name,"Main process:",os.getppid(),"current process;", os.getpid())P1 = Process(target=get_id, args=('andy',)) P2 = Process(target = get_id, args=("Jack", ))if __name__ == "__main__":P2.start()P1.start()print("主進程")?2. 自定義進程類:
from multiprocessing import Process import osclass Custom_Process(Process):def __init__(self, name):super().__init__()self.name = namedef run(self):print(self.name, "Main process:", os.getppid(), "current process;", os.getpid())if __name__ == "__main__":P1 = Custom_Process('andy')P2 = Custom_Process("jack")P1.start()P2.start()print("主進程")?事實上在調用P1.start時,系統調用了Process類的run方法,在我們直接調用Process類時,
我們需要指定target(即要進行的操作,參數args),那么定制后我們重寫了run方法,即重寫的
run方法。
在Custom_Process類中我用到了
super().__init__()?這是重寫父類的方法之一,另一種方法是:
Parent.__init__(self)在這里就是:Process.__init__()
關于super().__init__()事實上并不是調用父類,而是尋找繼承順序中的下一個
具體可以參考:Python’s super() considered super!
?
下面是一個應用進程的例子,之前在寫 cs模型 ? ?時有:
server.listen(5)# 設置可以接受的連接數量?雖然這里可以接受5個鏈接,但事實上由于功能上并未實現
所以每次只有一個鏈接可以正常進行通信,其他的鏈接都必須
等到之前的鏈接完成才行。
下面著手改進:
server
#!/usr/bin/env python #coding:utf-8 #Created by Andy @ 2017/9/16import socket,json, struct, subprocess from multiprocessing import ProcessBUFF_SIZE = 1024 IP_PORT = ("127.0.0.1", 8081)server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)# 重用端口 server.bind(IP_PORT) server.listen(5)# 設置可以接受的連接數量def communicate(conn, client_addr):while True:# 內層循環為通信循環msg = conn.recv(BUFF_SIZE)if not msg:breakpipes = subprocess.Popen(msg.decode("utf-8"),shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)error = pipes.stderr.read()if error:print("Error:",error)response_msg = errorelse:response_msg = pipes.stdout.read()header = {'data_size':len(response_msg)}# 數據長度header_json = json.dumps(header)#序列化header_json_byte = bytes(header_json,encoding="utf-8")conn.send(struct.pack('i',len(header_json_byte)))#先發送報頭長度,僅包含數據長度, 這里的i指int類型conn.send(header_json_byte)# 再發送報頭conn.sendall(response_msg)# 正式的信息print("Request from:",client_addr, "Command:",msg)conn.close() if __name__ == "__main__":while True:# 外層循環為鏈接循環conn, client_addr = server.accept()p = Process(target=communicate, args=(conn, client_addr))p.start()server.close()?client未變:
#!/usr/bin/env python #coding:utf-8 #Created by Andy @ 2017/9/16import socket, json, structBUFF_SIZE = 1024 IP_PORT = ("127.0.0.1", 8081)client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect(IP_PORT)while True:msg = input(">>:").strip().encode("utf-8")if not msg:breakclient.send(msg)header = client.recv(4)print("Header:",struct.unpack("i", header))header_length = struct.unpack('i', header)[0]print("Header_length:", header_length)header_json = json.loads(client.recv(header_length).decode("utf-8"))data_size = header_json['data_size']print("Data_size:",data_size)recv_size = 0recv_data = b''while recv_size < data_size:recv_data += client.recv(BUFF_SIZE)recv_size += len(recv_data)print(recv_data.decode("gbk"))client.close()?看下運行結果,這里只開了兩個客戶端,5個同樣的道理:
?
2.LOCK 互斥鎖:
import os, time from multiprocessing import Process, Lockdef work(mutex):mutex.acquire()print("%d is working..." % os.getpid())time.sleep(2)print("%d is done!" % os.getpid())mutex.release()if __name__ == "__main__":mutex = Lock()p1 = Process(target=work, args=(mutex,))p2 = Process(target=work, args=(mutex,))p1.start()p2.start()模擬 搶票系統:所有人都可以查看到還剩下多票,但是只有部分人能搶到票。
import json, random, time, os from multiprocessing import Process , Lockdef search():dic = json.load(open('db.txt',))print("%s查詢,車票剩余%s" % (os.getpid(),dic['count']))def get_ticket():dic = json.load(open('db.txt',))if dic['count'] > 0:dic['count'] -= 1time.sleep(random.randint(1,4))json.dump(dic,open('db.txt', 'w'))print('%s 購買成功' % os.getpid())print("車票剩下%s" % dic["count"])else:print("%s搶票失敗 " % os.getpid())def task(mutex):search()mutex.acquire()get_ticket()mutex.release()if __name__ == "__main__":mutex = Lock()for i in range(10):p = Process(target=task, args=(mutex,))p.start()?
?
?3.Join
1.join方法的作用是阻塞主進程(擋住,無法執行join以后的語句),專注執行多線程。
2.多線程多join的情況下,依次執行各線程的join方法,前頭一個結束了才能執行后面一個。
3.無參數,則等待到該線程結束,才開始執行下一個線程的join。
4.設置參數后,則等待該線程這么長時間就不管它了(而該線程并沒有結束)。不管的意思就是可以執行后面的主進程了。
看例子:
#!/usr/bin/env python #coding:utf-8 #Created by Andy @ 2017/9/19import os, time from multiprocessing import Process, Lockdef work(mutex, t):mutex.acquire()print("%s Running at %s\n" % (os.getpid(),time.strftime("%H:%M:%S")))time.sleep(t)mutex.release()print("%s Stop at %s\n" % (os.getpid(),time.strftime("%H:%M:%S")))if __name__ == "__main__":print("Main Process Running at %s\n" % time.strftime("%H:%M:%S"))mutex = Lock()p1 = Process(target=work, args=(mutex,5))p2 = Process(target=work, args=(mutex,3))p1.start()p2.start()p1.join()print("Join1 finish at %s!\n" % time.strftime("%H:%M:%S"))p2.join()print("Join2 finish at %s!\n" % time.strftime("%H:%M:%S"))print("Main Process Stop at %s\n" % time.strftime("%H:%M:%S"))?此時沒有指定join的時長,所以,第一個進程執行完了,第一個join也相應的結束了,
然后第二個進程執行完了,第二個join也結束了。
?
當指定時間后分兩種情況,當join的時間比進程需要執行的時間短時,它就不再等待該進行,直接執行
將join()修改為p2.join(2)
將p2.join()修改為p2.join(2)
?
可以看到,進程4768還未執行完時,join1等待2秒后直接不管它了,執行了后面的打印語句
接著執行了join2,等待2秒后,主進程自己結束了自己(這里應該是打印語句的原因,事實上并未直接的結束)
此時4768仍在運行,直到自己結束。然后才是進程11108
?
如果我將時間設置得比它需要的時間還長呢,那么它應該在進程運行完時也結束
將P1.join()修改為p1.join(6)
將p2.join()修改為p2.join(4)
?可以看到Join1,join2都是在兩個進行結束后自己結束了,并沒有等待設定的時間長度。
?
4.Daemon 守護進程
守護進程的作用:
一:守護進程會在主進程代碼執行結束后就終止
二:守護進程內無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children
下面看例子:
#!/usr/bin/env python #coding:utf-8 #Created by Andy @ 2017/9/19from multiprocessing import Process import timedef work():print("Running...")time.sleep(3)print("Finish!")if __name__ == "__main__":p = Process(target=work,)#p.daemon = Truep.start()print("Main finish!")運行結果:
?
將#p.daemon=True注釋掉,再運行:
可以看到,主進程結束了,子進程也結束了, 并不會等待它運行完。
?
守護進程為什么在主進程結束后就結束了呢?
首先,我們要明白守護進程的作用:守護主進程的一些功能,當主進程執行完了,
也就是說它的功能已經全部執行完了,那么,守護進程也就沒有繼續守護下去的
必要了,所以一旦主進程結束了,守護進程也就結束了。
?
?5.Semaphore 信號量
Semaphore制對共享資源的訪問數量,比如可以同時運行的子進程數量:
#!/usr/bin/env python #coding:utf-8 #Created by Andy @ 2017/9/19import multiprocessing import timedef worker(s):s.acquire()print(multiprocessing.current_process().name + "acquire");time.sleep(2)print(multiprocessing.current_process().name + "release\n");s.release()if __name__ == "__main__":s = multiprocessing.Semaphore(2)for i in range(5):p = multiprocessing.Process(target = worker, args=(s,))p.start()如上, 只有釋放一個進程才有新的進程進來
?
將信號量改成大于等于進程數:
s = multiprocessing.Semaphore(5)可以看到,所有進程一下全部啟動了。
?
進程間通信有一個人種方式,一種是隊列,一種是管道
6.隊列
?下面演示在一個進程中往隊列中傳入數據,用另一個進程取出來:
#!/usr/bin/env python #coding:utf-8 #Created by Andy @ 2017/9/19import random,os from multiprocessing import Queue,Processdef put_q(q):print("Put...")for i in range(5):n = random.randint(1,5)print(n)q.put(n)def get_q(q):print("\nGet...")while True:if not q.empty():print("%s" % os.getpid(),q.get())else:breakif __name__ == "__main__":q = Queue(8)p1 = Process(target=put_q,args=(q,))p2 = Process(target=get_q,args=(q,))p1.start()p1.join() # 防止進程2先啟動,隊列為空p2.start()?
這樣就實現了進程間的通信
?
7.管道
Pipe方法返回(conn1, conn2)代表一個管道的兩個端。Pipe方法有duplex參數, 如果duplex參數為True(默認值),那么這個管道是全雙工模式, 也就是說conn1和conn2均可收發。duplex為False,conn1只負責接受消息,conn2只負責發送消息。 send和recv方法分別是發送和接受消息的方法。例如,在全雙工模式下, 可以調用conn1.send發送消息,conn1.recv接收消息。如果沒有消息可接收, recv方法會一直阻塞。如果管道已經被關閉,那么recv方法會拋出EOFError。 事實上,管道的應用與上面的隊列基本一致,對上面的代碼稍作修改: #!/usr/bin/env python #coding:utf-8 #Created by Andy @ 2017/9/19import random,os from multiprocessing import Pipe,Processdef send_p(p):print("send...")for i in range(5):n = random.randint(1,5)print(n)p.send(n)def receive_p(p):print("\nReceive...")while True:print("%s" % os.getpid(),p.recv())if __name__ == "__main__":p = Pipe()p1 = Process(target=send_p,args=(p[0],))p2 = Process(target=receive_p,args=(p[1],))p1.start()p1.join() p2.start()運行:
?
8.Pool 進程池
?Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,
如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;
但如果池中的進程數已經達到規定最大值,那么該請求就會等待,
直到池中有進程結束,才會創建新的進程來它。
看例子:
#!/usr/bin/env python #coding:utf-8 #Created by Andy @ 2017/9/20import time from multiprocessing import Pool, Processdef work(msg):print(msg, 'is working\n')time.sleep(2)print(msg,'finish!\n')if __name__ == "__main__":pro = Process()pool = Pool(processes=3)for i in range(1,6):msg = "process %s" % ipool.apply_async(work,(msg,))pool.close()pool.join()# 阻塞主進程,等待子進程執行完?運行:
指定進程池只有3個進程,所以第四個進程只有前面結束一個進程時才能開始。
需要說明的是 pool.apply_async()是非阻塞的,pool.apply()則是阻塞的。看區別:
修改:
pool.apply(work,(msg,))?再次運行:
可以看到,子進程只能結束一個后都會運行下一個進程
?回調函數:
回調函數指:進程池中任何一個任務一旦處理完了,就立即告知主進程:
我好了額,你可以處理我的結果了。主進程則調用一個函數去處理該結果。
?
對上面的例子進行修改:
#!/usr/bin/env python #coding:utf-8 #Created by Andy @ 2017/9/20import time, os from multiprocessing import Pool, Processdef work(msg):print(msg, 'is working\n')time.sleep(2)print(msg,'finish!\n')return msgdef plus(msg):if msg:msg = msg + '*plus*'print(msg)if __name__ == "__main__":pro = Process()pool = Pool(processes=3)for i in range(1,6):msg = "process %s" % ipool.apply_async(work,(msg,), callback=plus)# 回調函數pool.close()pool.join()?運行:
可以看到一個進程結果后,在開啟一個新的進程到進程池后,
主進程又調用一個回調函數對該進程的結果進行了二次處理。
?
補充:
對于計算機來說,也并不能無限開啟進程,通常比較好的情況是
進程數等于計算機核數是比較好的,否則開多了可能會起到反作用
那么要怎么查看自己的計算機是幾核的呢?
?
posted on 2017-09-17 13:49 Andy_963 閱讀(...) 評論(...) 編輯 收藏轉載于:https://www.cnblogs.com/Andy963/p/7535378.html
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的python3 进程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 最全mysql的复制和读写分离
- 下一篇: luogu p1459 三值的排序