python speed为0但是速度过快_通过并发加速你的 python 程序
本文翻譯至 realpython 上題為 Speed Up Your Python Program With Concurrency 的教程,教程對怎么利用并發加速 python 程序分析非常全面到位。
什么是并發(concurrency)?
從字典定義上講,并發(concurrency)就是同時發生,在 python 語言里面,這些同時發生的事物被不同命名(thread、task、process),但是站在一個高層級(high level)上看,他們都指的是一系列按照順序執行的指令。
我喜歡將它們視為不同的思路,每一條思路可以在某個特定的點上停止下來,執行他們的 CPU 或大腦可以切換到不同的思路上面,每個思路的狀態(state)都會被保存,以至于它被打斷后還可以正確的重新啟動。
你可能很好奇為什么在 python 領域用不同的詞匯來闡述相同的概念,事實證明在一個高層級(high level)上面思考,他們(thread, task, process)是一樣的。一旦你開始往細節深處挖掘,它們都代表略有不同的事物。通過這些示例,您將了解更多有關它們的區別。
現在,讓我們談談該定義的同時。 您必須謹慎一些,因為當您深入細節時,就會知道實際上只有 multiprocessing 是在同一時間運行這些思路。 threading 和 asyncio 都是在單個處理器上運行,因此一次只能運行一個。 他們只是巧妙地找到了輪流加快總體流程的方法。 即使他們不會同時運行不同的思路,我們仍然稱其為并發(concurrency)。
線程或任務輪流運行的方式是 threading 和 asyncio 之間的最大區別。 在線程中,操作系統實際上了解每個線程,并可以隨時中斷它以開始運行其他線程。 這被稱為搶先式多任務處理,因為操作系統可以搶占您的線程以進行切換。
搶先式多任務處理非常方便,因為線程中的代碼不需要執行任何操作即可進行切換。 由于存在“隨時”一詞,這也可能很困難。 此切換可能發生在單個 Python 語句的中間,甚至是 x = x + 1 之類的瑣碎語句。
另一方面,asyncio 使用協作式多任務處理。 這些任務必須通過宣布何時準備好退出來進行協作。 這意味著任務中的代碼必須稍作更改才能實現。
預先進行此額外工作的好處是,您始終知道任務將在哪里調出。 除非標記了該語句,否則它不會在 Python 語句中間交換出來。 稍后您將看到這如何簡化設計的各個部分。
什么是并行(Parallelism)?
到目前為止,您已經研究了在單個處理器上發生的并發。那你的酷炫新筆記本電腦擁有的所有這些 CPU 核心用來干嘛,您如何利用它們? multiprocessing 就是答案。
通過 multiprocessing,Python 創建了新的進程。一個進程可以視為幾乎完全不同的程序,盡管從技術上講,它們通常被定義為資源的集合,其中資源包括內存,文件句柄等。考慮它的一種方法是,每個進程都在自己的 Python 解釋器中運行。
由于它們是不同的進程,因此 multiprocessing 程序中的每個思路都可以在不同的核心上運行。在不同的內核上運行意味著它們實際上可以同時運行,這真是太好了。這樣做會帶來一些復雜性,但是 Python 在大多數情況下都可以很好地平滑它們。
現在您已經了解了 concurrency 和 Parallelism 是什么,讓我們回顧一下它們之間的差異,然后我們來看一下它們為何有用的原因:
什么時候并發有用
并發可以對兩種類型的問題產生很大的影響,這些通常稱為:CPU 約束
I/O 約束
受 I/O 約束的問題會導致程序運行緩慢,因為它經常必須等待某些外部資源的輸入/輸出(I/O)。 當您的程序使用比 CPU 慢得多的東西時,它們經常出現。
比 CPU 慢的例子很多,但值得慶幸的是,您的程序并未與其中大多數交互。 程序最常與之交互的較慢事物是文件系統(file system)和網絡連接(network connections)。
讓我們看看它是什么樣的:
在上圖中,藍色框顯示了程序執行工作的時間,紅色框是等待 I/O 操作完成所花費的時間。 該圖未按比例繪制,因為 Internet 上的請求可能比 CPU 指令花費幾個數量級,因此您的程序最終可能會花費大部分時間等待。 這是您的瀏覽器大部分時間都在執行的操作。
另一方面,有些程序類可以進行大量計算,而無需與網絡交談或訪問文件。 這些是與 CPU 約束的程序,因為限制程序速度的資源是 CPU,而不是網絡或文件系統。
這就是 CPU 約束程序的對應圖:
在閱讀下一節中的示例時,您會發現不同形式的并發在受 CPU 約束和受 I/O 約束的程序中表現更好或更差。 在您的程序中增加并發性會增加代碼和復雜性,因此您需要確定潛在的速度提升是否值得付出額外的努力。 在本文末尾,您應該有足夠的信息來開始做出此決定。
以下是簡要說明,以闡明這一概念:
接下來您將首先看到 I/O 約束程序。然后您將看到一些處理與 CPU 約束的程序的代碼。
怎么加速一個 I/O 約束的程序
讓我們從關注 I/O 約束程序和一個常見問題開始:通過網絡下載內容。 對于我們的示例,您將從幾個站點下載網頁,但實際上可能是任何網絡流量。 可視化和設置網頁更加容易。
Synchronous 版本
我們將從該任務的非并行版本開始。 請注意,此程序需要 requests 模塊。 您應該在運行它之前用 pip 安裝好 requests 模塊,可以使用 virtualenv。 此版本完全不使用并發:
import requests
import time
def download_site(url, session):
with session.get(url) as response:
print(f"Read {len(response.content)} from{url}")
def download_all_sites(sites):
with requests.Session() as session:
for url in sites:
download_site(url, session)
if __name__ == "__main__":
sites = [
"https://www.jython.org",
"http://olympus.realpython.org/dice",
] * 80
start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time
print(f"Downloaded {len(sites)} in{duration}seconds")
如您所見,這是一個相當短的程序。 download_site() 只是從 URL 下載內容并打印大小。 需要指出的一點是,我們正在使用來自 requests 的 Session 對象。
可以直接從請求中簡單地使用 get(),但是創建 Session 對象可以使請求做一些花哨的網絡技巧并真正加快速度。
download_all_sites() 創建會話,然后瀏覽站點列表,依次下載每個站點。 最后,它打印出此過程花費了多長時間,因此您可以在以下示例中看到并發對我們的幫助,這使您感到滿意。
該程序的處理圖與上一節中的 I/O 約束圖非常相似。注意: 網絡流量取決于許多因素,這些因素可能會每秒變化。 我已經看到由于網絡問題,這些測試的次數從一次運行到另一次運行翻了一番。
Synchronous 版本優點
這個版本的代碼最棒的是: 很容易。 編寫和調試相對容易。 考慮起來也更加簡單明了。 貫穿其中只有一條思路,因此您可以預測下一步是什么以及其行為方式。
Synchronous 版本存在的問題
這里最大的問題是,與我們將提供的其他解決方案相比,它相對較慢。這是最終輸出在我的計算機上給出的示例:
$ ./io_non_concurrent.py
[most output skipped]
Downloaded 160 in 14.289619207382202 seconds注意: 您的結果可能會有很大差異。 運行此腳本時,我看到時間從14.2秒到21.9秒不等。 在本文中,我以最快的速度運行了三個時間。 兩種方法之間的差異仍然很明顯。
但是,變慢并不總是一個大問題。 如果您正在運行的程序使用同步版本僅需2秒鐘,而很少運行,則可能不值得增加并發性。 你可以在這里停下來。
如果您的程序經常運行該怎么辦? 如果要花幾個小時運行怎么辦? 讓我們繼續進行并發,方法是使用線程重寫此程序。
threading 版本
您可能已經猜到了,編寫線程程序會花費更多的精力。 但是,對于簡單的案例,您花費很少的額外精力可能會感到驚訝。 這是帶有線程的相同程序的外觀:
import concurrent.futures
import requests
import threading
import time
thread_local = threading.local()
def get_session():
if not hasattr(thread_local, "session"):
thread_local.session = requests.Session()
return thread_local.session
def download_site(url):
session = get_session()
with session.get(url) as response:
print(f"Read {len(response.content)} from{url}")
def download_all_sites(sites):
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
executor.map(download_site, sites)
if __name__ == "__main__":
sites = [
"https://www.jython.org",
"http://olympus.realpython.org/dice",
] * 80
start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time
print(f"Downloaded {len(sites)} in{duration}seconds")
添加線程時,整體結構相同,只需要進行一些更改。 download_all_sites() 從每個站點調用一次功能更改為更復雜的結構。
在此版本中,您正在創建 ThreadPoolExecutor,這似乎很復雜。讓我們來分解一下:ThreadPoolExecutor = 線程 + 池 + 執行器。
您已經了解線程部分。那只是我們前面提到的思路。池部分開始變得有趣。該對象將創建一個線程池,每個線程可以同時運行。最后,執行程序是控制池中每個線程運行方式和時間的部分。它將在池中執行請求。
很有幫助的是,標準庫將 ThreadPoolExecutor 實現為上下文管理器,因此您可以使用 with 語法管理創建和釋放線程池。
一旦有了 ThreadPoolExecutor,就可以使用它的便捷 .map() 方法。此方法在列表中的每個站點上運行傳遞的功能。最重要的是,它使用正在管理的線程池自動并發運行它們。
那些來自其他語言甚至 Python 2 的人可能想知道普通對象和函數在哪里,這些對象和函數管理著處理線程時所習慣的細節,例如 Thread.start(),Thread.join() 和隊列。
這些都仍然存在,您可以使用它們來實現對線程運行方式的細粒度控制。但是,從 Python 3.2 開始,標準庫添加了一個稱為 Executors 的更高級別的抽象,如果您不需要細粒度的控件,則可以為您管理許多細節。
我們的示例中另一個有趣的變化是,每個線程都需要創建自己的 request.Session() 對象。當您查看有關請求的文檔時,說起來不一定容易,但是閱讀此問題后,很顯然,每個線程都需要一個單獨的 Session。
這是線程有趣和棘手的問題之一。因為操作系統控制著您的任務何時被中斷以及另一個任務的啟動時間,所??以線程之間共享的任何數據都需要受到保護,或者是線程安全的。不幸的是,requests.Session() 不是線程安全的。
根據數據是什么以及如何使用數據,有幾種使數據訪問線程安全的策略。其中之一是使用線程安全的數據結構,例如 Python 隊列模塊中的 Queue。
這些對象使用諸如 threading.Lock 之類的低級原語,以確保只有一個線程可以同時訪問一段代碼或一部分內存。您正在通過 ThreadPoolExecutor 對象間接使用此策略。
這里使用的另一種策略是稱為線程本地存儲。 Threading.local() 創建一個看起來像全局對象的對象,但該對象特定于每個單獨的線程。在您的示例中,這是通過 threadLocal 和 get_session() 完??成的:
threadLocal = threading.local()
def get_session():
if not hasattr(threadLocal, "session"):
threadLocal.session = requests.Session()
return threadLocal.session
ThreadLocal 在線程模塊中專門用于解決此問題。看起來有些奇怪,但是您只想創建這些對象之一,而不是為每個線程創建一個。對象本身負責將來自不同線程的訪問分離到不同的數據。
調用 get_session() 時,它查找的會話是特定于其正在運行的特定線程的。因此,每個線程在首次調用 get_session() 時都會創建一個會話,然后在整個生命周期中僅在隨后的每次調用中使用該會話。
最后,簡要介紹一下選擇線程數的方法。您可以看到示例代碼使用 5 個線程。隨意使用此數字,看看整體時間如何變化。您可能希望每次下載只有一個線程是最快的,但至少在我的系統上不是。我發現最快的結果在 5 到 10 個線程之間。如果您的要求更高,那么創建和銷毀線程的額外開銷將消除任何時間節省。
此處的難題是,正確的線程數并不是一項任務到另一項任務的常數。需要一些實驗。
線程版本的優點
它很快!這是我測試的最快速度。請記住,非并行版本花費了超過14秒的時間:
$ ./io_threading.py
[most output skipped]
Downloaded 160 in 3.7238826751708984 seconds
它的執行時序圖如下所示:
它使用多個線程來同時向網站發出多個打開的請求,從而使您的程序可以重疊等待時間并更快地獲得最終結果! 對的!那是目標。
線程版本的問題
然而,正如您從示例中看到的那樣,這需要花費更多的代碼才能實現,并且您實際上必須考慮一下線程之間共享的數據。
線程可以以微妙且難以檢測的方式進行交互。 這些相互作用會導致競態條件,從而經常導致隨機,間歇性的錯誤,這些錯誤很難發現。 那些不熟悉比賽條件概念的人可能想擴大閱讀下面的部分。注意: 擴展部分未翻譯
asyncio 版本
在深入研究 asyncio 示例代碼之前,讓我們先詳細介紹一下 asyncio 的工作原理。
asyncio 基礎
這將是 asyncio 的簡化版本。這里掩蓋了許多細節,但仍然傳達了其工作原理的想法。
asyncio 的一般概念是單個 Python 對象(稱為事件循環)控制每個任務的運行方式和時間。事件循環知道每個任務并知道其處于什么狀態。實際上,任務可以處于許多狀態,但現在讓我們想象一個簡化的事件循環,其中只有兩個狀態。
就緒狀態將指示任務有工作要做并且已準備好運行,而等待狀態意味著任務正在等待某些外部事物完成,例如網絡操作。
簡化的事件循環維護兩個任務列表,每個狀態一個列表。它選擇一個就緒任務,然后將其重新啟動運行。該任務處于完全控制狀態,直到它協同將控制權移交給事件循環為止。
當正在運行的任務將控制權交還給事件循環時,事件循環將該任務放入就緒列表或等待列表中,然后遍歷等待列表中的每個任務以查看其是否已通過 I/O 操作準備就緒完成。它知道就緒列表中的任務仍處于就緒狀態,因為它知道它們尚未運行。
一旦將所有任務重新分類到正確的列表中,事件循環就會選擇要運行的下一個任務,然后重復該過程。簡化的事件循環將選擇等待時間最長的任務并運行它。重復此過程,直到事件循環完成。
asyncio 的重要一點是,任務不會在無意的情況下放棄控制。他們從不會在手術過程中被打斷。這使我們可以比 threading 方式更輕松地共享資源。您不必擔心要使你的代碼成為線程安全的。
這是 asyncio 發生情況的高級視圖。如果您想了解更多細節,如果您想更深入地了解,這個 StackOverflow 答案提供了一些很好的細節。
async and await
現在,我們來談談添加到Python中的兩個新關鍵字:async 和 await。根據上面的討論,您可以將 await 看作是一種魔術,它使任務可以將控制權交還給事件循環。當您的代碼等待函數調用時,這表明該調用可能要花一些時間,并且該任務應該放棄控制。
最好將 async 視為 Python 的標志,告訴它即將定義的函數使用了 await。在某些情況下,這并不是嚴格意義上的要求,例如異步生成器,但它在很多情況下都適用,并在您入門時為您提供了一個簡單的模型。
在下一個代碼中將看到的一個例外是 async with 語句,該語句從通常等待的對象創建上下文管理器。盡管語義有所不同,但思路是相同的:將上下文管理器標記為可以交換的內容。
您一定可以想象,管理事件循環和任務之間的交互會有些復雜。對于剛開始使用 asyncio 的開發人員來說,這些細節并不重要,但是您需要記住,任何調用 await 的函數都必須標記為 async。否則,您會收到語法錯誤。
回到代碼
現在,您已經對什么是 asyncio 有了基本的了解,讓我們看一下示例代碼的 asyncio 版本,并弄清楚它是如何工作的。 請注意,此版本添加了 aiohttp。 您應先運行 pip install aiohttp,然后再運行它:
import asyncio
import time
import aiohttp
async def download_site(session, url):
async with session.get(url) as response:
print("Read{0}from{1}".format(response.content_length, url))
async def download_all_sites(sites):
async with aiohttp.ClientSession() as session:
tasks = []
for url in sites:
task = asyncio.ensure_future(download_site(session, url))
tasks.append(task)
await asyncio.gather(*tasks, return_exceptions=True)
if __name__ == "__main__":
sites = [
"https://www.jython.org",
"http://olympus.realpython.org/dice",
] * 80
start_time = time.time()
asyncio.get_event_loop().run_until_complete(download_all_sites(sites))
duration = time.time() - start_time
print(f"Downloaded {len(sites)} sites in{duration}seconds")
這個版本比前兩個版本復雜一些。 它具有類似的結構,但是設置任務要比創建 ThreadPoolExecutor 多得多。 讓我們從示例頂部開始。
download_site()
頂部的 download_site() 與線程版本幾乎完全相同,但函數定義行上的 async 關鍵字和實際調用 session.get() 時的 async with關鍵字除外。 稍后您將看到為什么可以在此處傳遞 Session 而不使用 thread-local 存儲的原因。
download_all_sites()
download_all_sites() 是您從線程示例中看到的最大變化。
您可以在所有任務之間共享會話,因此該會話在此處創建為上下文管理器。這些任務可以共享會話,因為它們都在同一線程上運行。當會話處于不良狀態時,一個任務不可能中斷另一個任務。
在該上下文管理器內部,它使用 asyncio.ensure_future() 創建任務列表,該列表還負責啟動任務。創建所有任務后,此函數將使用 asyncio.gather() 使會話上下文保持活動狀態,直到完成所有任務。
線程代碼執行的操作與此相似,但是細節可以在 ThreadPoolExecutor 中方便地處理。當前沒有 AsyncioPoolExecutor 類。
但是,這里的細節中隱藏著一個小而重要的變化。還記得我們是如何談論要創建的線程數的嗎?在線程示例中,最佳線程數并不明顯。
asyncio 的很酷的優點之一是它的伸縮性遠勝于線程。與線程相比,每個任務花費的資源更少,創建時間也更少,因此創建和運行更多任務的效果很好。這個例子只是為每個站點創建一個單獨的任務供下載,效果很好。
__main__
最后,asyncio 的性質意味著您必須啟動事件循環并告訴它要運行哪些任務。 文件底部的 __main__ 部分包含 get_event_loop() 和 run_until_complete() 的代碼。 如果沒有別的,他們在命名這些功能方面做得很好。
如果您已更新至 Python 3.7,則 Python 核心開發人員會為您簡化此語法。 可以使用 asyncio.run() 代替 asyncio. get_event_loop() 。run_until_complete() 繞口令。
asyncio 版本的優點
真的很快!在我的機器上的測試中,這是最快的代碼版本:
$ ./io_asyncio.py
[most output skipped]
Downloaded 160 in 2.5727896690368652 seconds
執行時序圖看起來與線程示例中的情況非常相似。只是I / O請求都由同一線程完成:
缺少像 ThreadPoolExecutor 這樣的好的包裝器,使得此代碼比線程示例要復雜一些。在這種情況下,您必須做一些額外的工作才能獲得更好的性能。
還有一個普遍的論點,就是必須在適當的位置添加異步并等待,這是一個額外的麻煩。在一定程度上,這是對的。該論點的另一面是,它迫使您考慮何時換出給定的任務,這可以幫助您創建更好,更快的設計。
伸縮問題在這里也很明顯。在一個線程上用 threading 示例去獲取每個站點是明顯比使用少數線程運行它更慢。運行帶有數百個任務的 asyncio 示例并不會降低它的運行速度。
asyncio 版本的問題
此時,asyncio 存在幾個問題。您需要特殊的異步版本的庫才能充分利用 asyncio。如果您只是使用請求來下載網站,則速度會慢得多,因為請求并非旨在通知事件循環其已被阻止。隨著時間的流逝,這個問題越來越小,越來越多的庫開始采用異步技術。
另一個更微妙的問題是,如果其中一項任務不合作,則協作多任務處理的所有優勢都會被丟棄。代碼中的小錯誤會導致任務運行并長時間占用處理器,從而使其他需要運行的任務餓死。如果任務沒有將控制權交還給它,則無法中斷事件循環。
考慮到這一點,讓我們著手采用根本不同的并發: multiprocessing。
multiprocessing 版本
與以前的方法不同,multiprocessing 版本可以充分利用您的全新酷炫計算機具有的多個 CPU。 或者,就我而言,那是我笨拙的舊筆記本電腦。 讓我們從代碼開始:
import requests
import multiprocessing
import time
session = None
def set_global_session():
global session
if not session:
session = requests.Session()
def download_site(url):
with session.get(url) as response:
name = multiprocessing.current_process().name
print(f"{name}:Read {len(response.content)} from{url}")
def download_all_sites(sites):
with multiprocessing.Pool(initializer=set_global_session) as pool:
pool.map(download_site, sites)
if __name__ == "__main__":
sites = [
"https://www.jython.org",
"http://olympus.realpython.org/dice",
] * 80
start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time
print(f"Downloaded {len(sites)} in{duration}seconds")
它比 asyncio 示例要短得多,并且實際上看起來與線程示例非常相似,但是在深入研究代碼之前,讓我們快速瀏覽一下 multiprocessing 為您提供的功能。
multiprocessing 總結
到目前為止,本文中的所有并發示例僅在計算機上的單個 CPU 或內核上運行。這樣做的原因與 CPython 的當前設計以及稱為全局解釋器鎖(GIL)的東西有關。
本文不會深入探討 GIL 的方式和原因。現在對你來講已經足夠知道關于 synchronous,threading 和 asyncio 版本都在單個CPU上運行。
標準庫中的 multiprocessing 旨在消除這一障礙,并在多個CPU上運行您的代碼。從高層次上講,它是通過創建一個新的 Python 解釋器實例以在每個 CPU 上運行,然后將一部分程序進行運行來實現的。
可以想象,建立一個單獨的 Python 解釋器并不像在當前的 Python 解釋器中啟動新線程那樣快。這是一項重量級的操作,并且有一些限制和困難,但是對于正確的問題,它可以帶來很大的不同。
multiprocessing 代碼
該代碼與我們的 synchronous 版本相比有一些小的變化。第一個在 download_all_sites()中。它創建一個 multiprocessing.Pool 對象,并將 download_site .map 到可迭代的站點,而不是簡單地反復調用 download_site()。從線程示例中應該看起來很熟悉。
這里發生的是,池創建了許多單獨的 Python 解釋器進程,并且每個進程都在 iterable 中的某些項目(在我們的情況下是站點列表)上運行指定的功能。主流程與其他流程之間的通信由您的 multiprocessing 模塊處理。
創建 Pool 的行值得您注意。首先,它沒有指定在池中創建多少個進程,盡管這是一個可選參數。默認情況下,multiprocessing.Pool() 將確定計算機中的 CPU 數量并進行匹配。這通常是最好的答案,就我們而言。
對于此問題,增加進程數量并不能使事情變得更快。實際上,它減慢了速度,因為建立和拆除所有這些進程的成本大于并行執行 I/O 請求的收益。
接下來,我們進行該調用的 initializer = set_global_session 部分。請記住,我們池中的每個進程都有其自己的內存空間。這意味著他們無法共享諸如 Session 對象之類的東西。您不想每次調用該函數都創建一個新的 Session,而是想為每個進程創建一個 Session。
initializer 函數參數就是針對這種情況而構建的。沒有辦法將返回值從初始化程序傳遞回由進程 download_site() 調用的函數,但是您可以初始化一個全局 session 變量來為每個進程保留單個 session。因為每個進程都有其自己的內存空間,所以每個進程的全局空間將有所不同。
這就是全部。其余代碼與您之前所見非常相似。
multiprocessing 版本的優點
此示例的 multiprocessing 版本很棒,因為它相對容易設置,并且幾乎不需要任何額外的代碼。它還充分利用了計算機中的 CPU 功能。該代碼的執行時序圖如下所示:
multiprocessing 版本的問題
此版本的示例確實需要一些額外的設置,并且全局 session 對象很奇怪。您必須花一些時間考慮在每個過程中將訪問哪些變量。
最后,在此示例中,它顯然比 asyncio 和 threading 版本慢:
$ ./io_mp.py
[most output skipped]
Downloaded 160 in 5.718175172805786 seconds
這不足為奇,因為與 I/O 約束的問題并不是 multiprocessing 真正存在的原因。進入下一部分,你將看到更多受 CPU 限制的示例。
怎么加速一個 CPU 約束程序
讓我們在這里換檔。 到目前為止,所有示例都處理了與 I/O 約束的問題。 現在,您將研究 CPU 受限的問題。 如您所見,一個受 I/O 約束的問題花費了大部分時間來等待外部操作(如網絡調用)完成。 另一方面,受 CPU 限制的問題很少執行 I/O 操作,其總體執行時間是可以處理所需數據的速度的一個因素。
就我們的示例而言,我們將使用一個有點愚蠢的函數來創建一些需要很長時間才能在 CPU 上運行的函數。 此函數計算從 0 到傳入值的每個數字的平方和:
def cpu_bound(number):
return sum(i * i for i in range(number))
您將傳遞大量的數字,因此需要一段時間。 請記住,這只是您的代碼的占位符,它實際上在做有用的事情,并且需要大量的處理時間,例如計算方程式的根或對大型數據結構進行排序。
Synchronous 版本
現在讓我們看一下該示例的非并行版本:
import time
def cpu_bound(number):
return sum(i * i for i in range(number))
def find_sums(numbers):
for number in numbers:
cpu_bound(number)
if __name__ == "__main__":
numbers = [5_000_000 + x for x in range(20)]
start_time = time.time()
find_sums(numbers)
duration = time.time() - start_time
print(f"Duration{duration}seconds")
此代碼調用 cpu_bound() 20 次,每次均使用不同的大的數字。它在單個 CPU 上的單個進程中的單個線程上完成所有這些操作。執行時序圖如下所示:
與受 I/O 約束的示例不同,受 CPU 約束的示例在運行時間上通常相當一致。這在我的機器上大約需要 7.8 秒:
$ ./cpu_non_concurrent.py
Duration 7.834432125091553 seconds
顯然,我們可以做得更好。所有這些都在沒有并發的單個CPU上運行。讓我們看看如何做才能更好。
threading 和 asyncio 版本
您認為使用線程或asyncio重寫此代碼將加快多少速度?
如果您回答“完全不”,請給自己一個小甜點。 如果您回答“它將減慢速度”,請給自己兩個甜點。
原因如下:在上面的 I/O 約束示例中,大部分時間都花在了等待緩慢的操作完成上。 線程和異步加速了此過程,它允許您重疊等待的時間,而不是依次執行。
但是,在受 CPU 限制的問題上,無需等待。 CPU 正在盡可能快地啟動以解決問題。 在 Python 中,線程和任務都在同一進程中在同一 CPU 上運行。 這意味著一個 CPU 負責完成非并行代碼的所有工作以及設置線程或任務的額外工作。 耗時超過 10 秒:
$ ./cpu_threading.py
Duration 10.407078266143799 seconds
我已經編寫了該代碼的線程版本,并將其與其他示例代碼一起放置在 GitHub repo 中,以便您可以自己進行測試。但是我們現在不看。
multiprocessing 版本
現在您終于到達了真正意義上的 multiprocessing 的地方。與其他并發庫不同, multiprocessing 被明確設計為在多個 CPU 之間共享繁重的 CPU 工作負載。它的執行時序圖如下所示:
代碼如下所示:
import multiprocessing
import time
def cpu_bound(number):
return sum(i * i for i in range(number))
def find_sums(numbers):
with multiprocessing.Pool() as pool:
pool.map(cpu_bound, numbers)
if __name__ == "__main__":
numbers = [5_000_000 + x for x in range(20)]
start_time = time.time()
find_sums(numbers)
duration = time.time() - start_time
print(f"Duration{duration}seconds")
此代碼幾乎不需要與非并行版本進行更改。您必須導入 multiprocessing,然后才從遍歷數字轉變為創建 multiprocessing.Pool 對象,并使用其 .map() 方法將單個數字發送給工作進程,以免它們變為空閑狀態。
這就是您對 I/O 約束的 multiprocessing 代碼所做的事情,但是在這里,您不必擔心 Session 對象。
如上所述,multiprocessing.Pool() 構造函數的 process 可選參數值得關注。您可以指定要在池中創建和管理的 Process 對象數量。默認情況下,它將確定計算機中有多少個 CPU,并為每個 CPU 創建一個進程。盡管這對于我們的簡單示例非常有用,但您可能希望在生產環境中擁有更多控制權。
而且,正如我們在第一部分中關于線程的提到的那樣,multiprocessing.Pool 代碼是建立在諸如 Queue 和 Semaphore 之類的構建塊上的,這些人對使用其他語言完成多線程和 multiprocessing 代碼的人很熟悉。
multiprocessing 版本的優點
此示例的 multiprocessing 版本很棒,因為它相對容易設置,并且幾乎不需要任何額外的代碼。 它還充分利用了計算機中的 CPU 功能。
嘿,這就是我上次討論 multiprocessing 時所說的。 最大的區別是這次顯然是最好的選擇。 在我的機器上花費 2.5 秒:
$ ./cpu_mp.py
Duration 2.5175397396087646 seconds
這比我們在其他選項中看到的要好得多。
multiprocessing 版本的問題
使用 multiprocessing 有一些缺點。 在這個簡單的示例中并沒有真正顯示它們,但是有時很難將問題分解成每個處理器可以獨立工作。
同樣,許多解決方案要求流程之間進行更多的交流。 這會給您的解決方案增加一些復雜性,從而使非并行程序無需處理。
什么時候使用并發
您已經在此處看到了很多介紹,因此讓我們回顧一些關鍵思想,然后討論一些決策點,這些決策點將幫助您確定要在項目中使用哪個并發模塊(如果有)。
此過程的第一步是確定是否應使用并發模塊。盡管這里的示例使每個庫看起來都非常簡單,但是并發總是帶來額外的復雜性,并且常常會導致難以發現的錯誤。
堅持添加并發,直到遇到已知的性能問題,然后確定所需的并發類型。正如 Donald Knuth 所說:“過早的優化是編程中所有(或至少大部分)邪惡的根源。”
決定優化程序后,下一步是確定程序是 CPU 約束的還是 I/O 約束的。請記住,與 I/O 約束的程序是那些花費大量時間等待事情發生的程序,而與 CPU 約束的程序則花費時間盡可能快地處理數據或處理數字。
如您所見,使用 multiprocessing 只能真正解決CPU受限的問題。線程和異步根本沒有幫助這類問題。
對于受 I/O 約束的問題,Python 社區中有一條通用的經驗法則:"能使用 asyncio 的時候 asyncio,必須使用 threading 的時候使用 threading ", asyncio 可以為此類程序提供最快的速度,但是有時您會需要尚未移植的關鍵庫以利用 asyncio。請記住,任何不放棄對事件循環的控制的任務都將阻止所有其他任務。
結論
現在,您已經了解了Python中可用的基本并發類型:threading
asyncio
multiprocessing
您已經了解了可以針對特定問題使用哪種并發方法,或者是否應該使用任何并發方法! 此外,您對使用并發時可能出現的一些問題有了更好的了解。
希望您從本文中學到了很多知識,并希望您在自己的項目中發現并發的重要用處!
總結
以上是生活随笔為你收集整理的python speed为0但是速度过快_通过并发加速你的 python 程序的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 什么是供应链管理?供应链管理的流程
- 下一篇: ModelSim 相关实用设置