一文看懂Python多进程与多线程编程(工作学习面试必读)
進程(process)和線程(thread)是非常抽象的概念, 也是程序員必需掌握的核心知識。多進程和多線程編程對于代碼的并發執行,提升代碼效率和縮短運行時間至關重要。小編我今天就來嘗試下用一文總結下Python多進程和多線程的概念和區別, 并詳細介紹如何使用python的multiprocess和threading模塊進行多線程和多進程編程。
?
重要知識點 - 什么是進程(process)和線程(thread)
?
進程是操作系統分配資源的最小單元, 線程是操作系統調度的最小單元。
一個應用程序至少包括1個進程,而1個進程包括1個或多個線程,線程的尺度更小。
每個進程在執行過程中擁有獨立的內存單元,而一個線程的多個線程在執行過程中共享內存。
?
網上有篇阮一峰的博客曾對進程和線程做出了一個非常淺顯的解釋,我在這里貼出來方便大家理解。
?計算機的核心是CPU,它承擔了所有的計算任務。它就像一座工廠,時刻在運行。
假定工廠的電力有限,一次只能供給一個車間使用。也就是說,一個車間開工的時候,其他車間都必須停工。背后的含義就是,單個CPU一次只能運行一個任務。編者注: 多核的CPU就像有了多個發電廠,使多工廠(多進程)實現可能。
進程就好比工廠的車間,它代表CPU所能處理的單個任務。任一時刻,CPU總是運行一個進程,其他進程處于非運行狀態。
一個車間里,可以有很多工人。他們協同完成一個任務。
線程就好比車間里的工人。一個進程可以包括多個線程。
車間的空間是工人們共享的,比如許多房間是每個工人都可以進出的。這象征一個進程的內存空間是共享的,每個線程都可以使用這些共享內存。
可是,每間房間的大小不同,有些房間最多只能容納一個人,比如廁所。里面有人的時候,其他人就不能進去了。這代表一個線程使用某些共享內存時,其他線程必須等它結束,才能使用這一塊內存。
一個防止他人進入的簡單方法,就是門口加一把鎖。先到的人鎖上門,后到的人看到上鎖,就在門口排隊,等鎖打開再進去。這就叫"互斥鎖"(Mutual exclusion,縮寫 Mutex),防止多個線程同時讀寫某一塊內存區域。
還有些房間,可以同時容納n個人,比如廚房。也就是說,如果人數大于n,多出來的人只能在外面等著。這好比某些內存區域,只能供給固定數目的線程使用。
這時的解決方法,就是在門口掛n把鑰匙。進去的人就取一把鑰匙,出來時再把鑰匙掛回原處。后到的人發現鑰匙架空了,就知道必須在門口排隊等著了。這種做法叫做"信號量"(Semaphore),用來保證多個線程不會互相沖突。
不難看出,mutex是semaphore的一種特殊情況(n=1時)。也就是說,完全可以用后者替代前者。但是,因為mutex較為簡單,且效率高,所以在必須保證資源獨占的情況下,還是采用這種設計。
?
原文地址見
http://www.ruanyifeng.com/blog/2013/04/processes_and_threads.html
?
Python的多進程編程與multiprocess模塊
?
python的多進程編程主要依靠multiprocess模塊。我們先對比兩段代碼,看看多進程編程的優勢。我們模擬了一個非常耗時的任務,計算8的20次方,為了使這個任務顯得更耗時,我們還讓它sleep 2秒。第一段代碼是單進程計算(代碼如下所示),我們按順序執行代碼,重復計算2次,并打印出總共耗時。
import time
import os
def long_time_task():
????print('當前進程: {}'.format(os.getpid()))
????time.sleep(2)
????print("結果: {}".format(8 ** 20))
if __name__ == "__main__":
????print('當前母進程: {}'.format(os.getpid()))
????start = time.time()
????for i in range(2):
????????long_time_task()
????end = time.time()
????print("用時{}秒".format((end-start)))
輸出結果如下,總共耗時4秒,至始至終只有一個進程14236。看來電腦計算8的20次方基本不費時。
當前母進程: 14236
當前進程: 14236
結果: 1152921504606846976
當前進程: 14236
結果: 1152921504606846976
用時4.01080060005188秒
?
第2段代碼是多進程計算代碼。我們利用multiprocess模塊的Process方法創建了兩個新的進程p1和p2來進行并行計算。Process方法接收兩個參數, 第一個是target,一般指向函數名,第二個時args,需要向函數傳遞的參數。對于創建的新進程,調用start()方法即可讓其開始。我們可以使用os.getpid()打印出當前進程的名字。
?
from multiprocessing import Process
import os
import time
def long_time_task(i):
????print('子進程: {} - 任務{}'.format(os.getpid(), i))
????time.sleep(2)
????print("結果: {}".format(8 ** 20))
if __name__=='__main__':
????print('當前母進程: {}'.format(os.getpid()))
????start = time.time()
????p1 = Process(target=long_time_task, args=(1,))
????p2 = Process(target=long_time_task, args=(2,))
????print('等待所有子進程完成。')
????p1.start()
????p2.start()
????p1.join()
????p2.join()
????end = time.time()
????print("總共用時{}秒".format((end - start)))
輸出結果如下所示,耗時變為2秒,時間減了一半,可見并發執行的時間明顯比順序執行要快很多。你還可以看到盡管我們只創建了兩個進程,可實際運行中卻包含里1個母進程和2個子進程。之所以我們使用join()方法就是為了讓母進程阻塞,等待子進程都完成后才打印出總共耗時,否則輸出時間只是母進程執行的時間。
當前母進程: 6920
等待所有子進程完成。
子進程: 17020 - 任務1
子進程: 5904 - 任務2
結果: 1152921504606846976
結果: 1152921504606846976
總共用時2.131091356277466秒
?
知識點:
新創建的進程與進程的切換都是要耗資源的,所以平時工作中進程數不能開太大。
同時可以運行的進程數一般受制于CPU的核數。
除了使用Process方法,我們還可以使用Pool類創建多進程。
?
利用multiprocess模塊的Pool類創建多進程
?
很多時候系統都需要創建多個進程以提高CPU的利用率,當數量較少時,可以手動生成一個個Process實例。當進程數量很多時,或許可以利用循環,但是這需要程序員手動管理系統中并發進程的數量,有時會很麻煩。這時進程池Pool就可以發揮其功效了。可以通過傳遞參數限制并發進程的數量,默認值為CPU的核數。?
?
Pool類可以提供指定數量的進程供用戶調用,當有新的請求提交到Pool中時,如果進程池還沒有滿,就會創建一個新的進程來執行請求。如果池滿,請求就會告知先等待,直到池中有進程結束,才會創建新的進程來執行這些請求。?
?
下面介紹一下multiprocessing 模塊下的Pool類的幾個方法:
1.apply_async
函數原型:apply_async(func[, args=()[, kwds={}[, callback=None]]])
其作用是向進程池提交需要執行的函數及參數,?各個進程采用非阻塞(異步)的調用方式,即每個子進程只管運行自己的,不管其它進程是否已經完成。
2.map()
函數原型:map(func, iterable[, chunksize=None])
Pool類中的map方法,與內置的map函數用法行為基本一致,它會使進程阻塞直到結果返回。?注意:雖然第二個參數是一個迭代器,但在實際使用中,必須在整個隊列都就緒后,程序才會運行子進程。
3.map_async()
函數原型:map_async(func, iterable[, chunksize[, callback]])
與map用法一致,但是它是非阻塞的。其有關事項見apply_async。
4.close()
關閉進程池(pool),使其不在接受新的任務。
5. terminate()
結束工作進程,不在處理未處理的任務。
6.join()
主進程阻塞等待子進程的退出, join方法要在close或terminate之后使用。
?
下例是一個簡單的multiprocessing.Pool類的實例。因為小編我的CPU是4核的,一次最多可以同時運行4個進程,所以我開啟了一個容量為4的進程池。4個進程需要計算5次,你可以想象4個進程并行4次計算任務后,還剩一次計算任務(任務4)沒有完成,系統會等待4個進程完成后重新安排一個進程來計算。
?
from multiprocessing import Pool, cpu_count
import os
import time
def long_time_task(i):
????print('子進程: {} - 任務{}'.format(os.getpid(), i))
????time.sleep(2)
????print("結果: {}".format(8 ** 20))
if __name__=='__main__':
????print("CPU內核數:{}".format(cpu_count()))
????print('當前母進程: {}'.format(os.getpid()))
????start = time.time()
????p = Pool(4)
????for i in range(5):
????????p.apply_async(long_time_task, args=(i,))
????print('等待所有子進程完成。')
????p.close()
????p.join()
????end = time.time()
????print("總共用時{}秒".format((end - start)))
知識點:??
對Pool對象調用join()方法會等待所有子進程執行完畢,調用join()之前必須先調用close()或terminate()方法,讓其不再接受新的Process了。
?
輸出結果如下所示,5個任務(每個任務大約耗時2秒)使用多進程并行計算只需4.37秒,, 耗時減少了60%。
CPU內核數:4
當前母進程: 2556
等待所有子進程完成。
子進程: 16480 - 任務0
子進程: 15216 - 任務1
子進程: 15764 - 任務2
子進程: 10176 - 任務3
結果: 1152921504606846976
結果: 1152921504606846976
子進程: 15216 - 任務4
結果: 1152921504606846976
結果: 1152921504606846976
結果: 1152921504606846976
總共用時4.377134561538696秒
?
相信大家都知道python解釋器中存在GIL(全局解釋器鎖), 它的作用就是保證同一時刻只有一個線程可以執行代碼。由于GIL的存在,很多人認為python中的多線程其實并不是真正的多線程,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多進程。然而這并意味著python多線程編程沒有意義哦,請繼續閱讀下文。
?
?多進程間的數據共享與通信
?
通常,進程之間是相互獨立的,每個進程都有獨立的內存。通過共享內存(nmap模塊),進程之間可以共享對象,使多個進程可以訪問同一個變量(地址相同,變量名可能不同)。多進程共享資源必然會導致進程間相互競爭,所以應該盡最大可能防止使用共享狀態。還有一種方式就是使用隊列queue來實現不同進程間的通信或數據共享,這一點和多線程編程類似。
?
下例這段代碼中中創建了2個獨立進程,一個負責寫(pw), 一個負責讀(pr), 實現了共享一個隊列queue。
from multiprocessing import Process, Queue
import os, time, random
# 寫數據進程執行的代碼:
def write(q):
????print('Process to write: {}'.format(os.getpid()))
????for value in ['A', 'B', 'C']:
????????print('Put %s to queue...' % value)
????????q.put(value)
????????time.sleep(random.random())
# 讀數據進程執行的代碼:
def read(q):
????print('Process to read:{}'.format(os.getpid()))
????while True:
????????value = q.get(True)
????????print('Get %s from queue.' % value)
if __name__=='__main__':
????# 父進程創建Queue,并傳給各個子進程:
???q = Queue()
????pw = Process(target=write, args=(q,))
????pr = Process(target=read, args=(q,))
????# 啟動子進程pw,寫入:
????pw.start()
????# 啟動子進程pr,讀取:
????pr.start()
????# 等待pw結束:
????pw.join()
????# pr進程里是死循環,無法等待其結束,只能強行終止:
????pr.terminate()
輸出結果如下所示:
Process to write: 3036
Put A to queue...
Process to read:9408
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
?
Python的多線程編程與threading模塊
?
python 3中的多進程編程主要依靠threading模塊。創建新線程與創建新進程的方法非常類似。threading.Thread方法可以接收兩個參數, 第一個是target,一般指向函數名,第二個時args,需要向函數傳遞的參數。對于創建的新線程,調用start()方法即可讓其開始。我們還可以使用current_thread().name打印出當前線程的名字。 下例中我們使用多線程技術重構之前的計算代碼。
import threading
import time
def long_time_task(i):
????print('當前子線程: {} - 任務{}'.format(threading.current_thread().name, i))
????time.sleep(2)
????print("結果: {}".format(8 ** 20))
if __name__=='__main__':
????start = time.time()
????print('這是主線程:{}'.format(threading.current_thread().name))
????t1 = threading.Thread(target=long_time_task, args=(1,))
????t2 = threading.Thread(target=long_time_task, args=(2,))
????t1.start()
????t2.start()
????end = time.time()
????print("總共用時{}秒".format((end - start)))
?
下面是輸出結果。為什么總耗時居然是0秒??我們可以明顯看到主線程和子線程其實是獨立運行的,主線程根本沒有等子線程完成,而是自己結束后就打印了消耗時間。主線程結束后,子線程仍在獨立運行,這顯然不是我們想要的。
這是主線程:MainThread
當前子線程: Thread-1 - 任務1
當前子線程: Thread-2 - 任務2
總共用時0.0017192363739013672秒
結果: 1152921504606846976
結果: 1152921504606846976
?
如果要實現主線程和子線程的同步,我們必需使用join方法(代碼如下所示)。
import threading
import time
def long_time_task(i):
????print('當前子線程: {} 任務{}'.format(threading.current_thread().name, i))
????time.sleep(2)
????print("結果: {}".format(8 ** 20))
if __name__=='__main__':
????start = time.time()
????print('這是主線程:{}'.format(threading.current_thread().name))
????thread_list = []
????for i in range(1, 3):
????????t = threading.Thread(target=long_time_task, args=(i, ))
????????thread_list.append(t)
????for t in thread_list:
????????t.start()
????for t in thread_list:
????????t.join()
????end = time.time()
????print("總共用時{}秒".format((end - start)))
修改代碼后的輸出如下所示。這時你可以看到主線程在等子線程完成后才答應出總消耗時間(2秒),比正常順序執行代碼(4秒)還是節省了不少時間。
這是主線程:MainThread
當前子線程: Thread - 1 任務1
當前子線程: Thread - 2 任務2
結果: 1152921504606846976
結果: 1152921504606846976
總共用時2.0166890621185303秒
?
當我們設置多線程時,主線程會創建多個子線程,在python中,默認情況下主線程和子線程獨立運行互不干涉。如果希望讓主線程等待子線程實現線程的同步,我們需要使用join()方法。如果我們希望一個主線程結束時不再執行子線程,我們應該怎么辦呢? 我們可以使用t.setDaemon(True),代碼如下所示。
import threading
import time
def long_time_task():
????print('當子線程: {}'.format(threading.current_thread().name))
????time.sleep(2)
????print("結果: {}".format(8 ** 20))
if __name__=='__main__':
????start = time.time()
????print('這是主線程:{}'.format(threading.current_thread().name))
????for i in range(5):
????????t = threading.Thread(target=long_time_task, args=())
????????t.setDaemon(True)
????????t.start()
????end = time.time()
????print("總共用時{}秒".format((end - start)))
?
通過繼承Thread類重寫run方法創建新進程
?
除了使用Thread()方法創建新的線程外,我們還可以通過繼承Thread類重寫run方法創建新的線程,這種方法更靈活。下例中我們自定義的類為MyThread, 隨后我們通過該類的實例化創建了2個子線程。
#-*- encoding:utf-8 -*-
import threading
import time
def long_time_task(i):
????time.sleep(2)
????return 8**20
class MyThread(threading.Thread):
????def __init__(self, func, args , name='', ):
????????threading.Thread.__init__(self)
????????self.func = func
????????self.args = args
????????self.name = name
????????self.result = None
????def run(self):
????????print('開始子進程{}'.format(self.name))
????????self.result = self.func(self.args[0],)
????????print("結果: {}".format(self.result))
????????print('結束子進程{}'.format(self.name))
if __name__=='__main__':
????start = time.time()
????threads = []
????for i in range(1, 3):
????????t = MyThread(long_time_task, (i,), str(i))
????????threads.append(t)
????for t in threads:
????????t.start()
????for t in threads:
????????t.join()
????end = time.time()
????print("總共用時{}秒".format((end - start)))
?
輸出結果如下所示:
開始子進程1
開始子進程2
結果: 1152921504606846976
結果: 1152921504606846976
結束子進程1
結束子進程2
總共用時2.005445718765259秒
?
不同線程間的數據共享
?
一個進程所含的不同線程間共享內存,這就意味著任何一個變量都可以被任何一個線程修改,因此線程之間共享數據最大的危險在于多個線程同時改一個變量,把內容給改亂了。如果不同線程間有共享的變量,其中一個方法就是在修改前給其上一把鎖lock,確保一次只有一個線程能修改它。threading.lock()方法可以輕易實現對一個共享變量的鎖定,修改完后release供其它線程使用。比如下例中賬戶余額balance是一個共享變量,使用lock可以使其不被改亂。
# -*- coding: utf-8 -*
import threading
class Account:
????def __init__(self):
????????self.balance = 0
????def add(self, lock):
????????# 獲得鎖
????????lock.acquire()
????????for i in range(0, 100000):
????????????self.balance += 1
????????# 釋放鎖
????????lock.release()
????def delete(self, lock):
????????# 獲得鎖
????????lock.acquire()
????????for i in range(0, 100000):
????????????self.balance -= 1
????????????# 釋放鎖
????????lock.release()
if __name__ == "__main__":
????account = Account()
????lock = threading.Lock()
????# 創建線程
???thread_add = threading.Thread(target=account.add, args=(lock,), name='Add')
????thread_delete = threading.Thread(target=account.delete, args=(lock,), name='Delete')
????# 啟動線程
???thread_add.start()
????thread_delete.start()
????# 等待線程結束
???thread_add.join()
????thread_delete.join()
????print('The final balance is: {}'.format(account.balance))
另一種實現不同線程間數據共享的方法就是使用消息隊列queue。不像列表,queue是線程安全的,可以放心使用,見下文。
?
使用queue隊列通信-經典的生產者和消費者模型
下例中創建了兩個線程,一個負責生成,一個負責消費,所生成的產品存放在queue里,實現了不同線程間溝通。
from queue import Queue
import random, threading, time
# 生產者類
class Producer(threading.Thread):
????def __init__(self, name, queue):
????????threading.Thread.__init__(self, name=name)
????????self.queue = queue
????def run(self):
????????for i in range(1, 5):
????????????print("{} is producing {} to the queue!".format(self.getName(), i))
????????????self.queue.put(i)
????????????time.sleep(random.randrange(10) / 5)
????????print("%s finished!" % self.getName())
# 消費者類
class Consumer(threading.Thread):
????def __init__(self, name, queue):
????????threading.Thread.__init__(self, name=name)
????????self.queue = queue
????def run(self):
????????for i in range(1, 5):
????????????val = self.queue.get()
????????????print("{} is consuming {} in the queue.".format(self.getName(), val))
????????????time.sleep(random.randrange(10))
????????print("%s finished!" % self.getName())
def main():
????queue = Queue()
????producer = Producer('Producer', queue)
????consumer = Consumer('Consumer', queue)
????producer.start()
????consumer.start()
????producer.join()
????consumer.join()
????print('All threads finished!')
if __name__ == '__main__':
????main()
?
隊列queue的put方法可以將一個對象obj放入隊列中。如果隊列已滿,此方法將阻塞至隊列有空間可用為止。queue的get方法一次返回隊列中的一個成員。如果隊列為空,此方法將阻塞至隊列中有成員可用為止。queue同時還自帶emtpy(), full()等方法來判斷一個隊列是否為空或已滿,但是這些方法并不可靠,因為多線程和多進程,在返回結果和使用結果之間,隊列中可能添加/刪除了成員。
?
Python多進程和多線程哪個快?
?
由于GIL的存在,很多人認為Python多進程編程更快,針對多核CPU,理論上來說也是采用多進程更能有效利用資源。網上很多人已做過比較,我直接告訴你結論吧。
對CPU密集型代碼(比如循環計算) - 多進程效率更高
對IO密集型代碼(比如文件操作,網絡爬蟲) - 多線程效率更高。
?
為什么是這樣呢?其實也不難理解。對于IO密集型操作,大部分消耗時間其實是等待時間,在等待時間中CPU是不需要工作的,那你在此期間提供雙CPU資源也是利用不上的,相反對于CPU密集型代碼,2個CPU干活肯定比一個CPU快很多。那么為什么多線程會對IO密集型代碼有用呢?這時因為python碰到等待會釋放GIL供新的線程使用,實現了線程間的切換。
?
小結
?
本文總結了多進程和多線程的概念和區別, 并詳細介紹如何使用python的multiprocess和threading模塊進行多線程和多進程編程。我們還簡單介紹了不同進程和線程間的通信和數據共享。如果您能熟練掌握本文中的所有知識點,那么你已經足以應付大部分面試和工作需求了。如果喜歡本文,就加入微信收藏常來看看吧。
————————————————
版權聲明:本文為CSDN博主「大江狗」的原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/weixin_42134789/article/details/82992326
總結
以上是生活随笔為你收集整理的一文看懂Python多进程与多线程编程(工作学习面试必读)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: re搜索字符串与find字符串不一样的结
- 下一篇: 在长文本中当中使用正则表达式匹配限定长度