asyncio并发数_Python Futures并发编程详解
無論哪門編程語言,并發(fā)編程都是一項很常用很重要的技巧。例如,爬蟲就被廣泛應(yīng)用在工業(yè)界的各個領(lǐng)域,我們每天在各個網(wǎng)站、各個 App 上獲取的新聞信息,很大一部分便是通過并發(fā)編程版的爬蟲獲得。正確合理地使用并發(fā)編程,無疑會給程序帶來極大的性能提升。因此,本節(jié)就帶領(lǐng)大家一起學(xué)習(xí) Python 中的 Futures 并發(fā)編程。首先,先帶領(lǐng)大家從代碼的角度來理解并發(fā)編程中的 Futures,并進(jìn)一步來比較其與單線程的性能區(qū)別。假設(shè)有這樣一個任務(wù),要下載一些網(wǎng)站的內(nèi)容并打印,如果用單線程的方式,它的代碼實現(xiàn)如下所示(為了突出主題,對代碼做了簡化,忽略了異常處理):
import requestsimport timedef download_one(url): resp = requests.get(url) print('Read {} from {}'.format(len(resp.content), url)) def download_all(sites): for site in sites: download_one(site)def main(): sites = [ 'http://c.biancheng.net', 'http://c.biancheng.net/c', 'http://c.biancheng.net/python' ] start_time = time.perf_counter() download_all(sites) end_time = time.perf_counter() print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time)) if __name__ == '__main__': main()輸出結(jié)果為:
Read 52053 from http://c.biancheng.net
Read 30718 from http://c.biancheng.net/c
Read 34470 from http://c.biancheng.net/python
Download 3 sites in 0.3537296 seconds
注意,此程序中,requests 模塊需單獨安裝,可通過執(zhí)行 pip install requests 命令進(jìn)行安裝。
這種方式應(yīng)該是最直接也最簡單的:
先是遍歷存儲網(wǎng)站的列表;
然后對當(dāng)前網(wǎng)站執(zhí)行下載操作;
等到當(dāng)前操作完成后,再對下一個網(wǎng)站進(jìn)行同樣的操作,一直到結(jié)束。
可以看到,總共耗時約 0.35s。單線程的優(yōu)點是簡單明了,但是明顯效率低下,因為上述程序的絕大多數(shù)時間都浪費在了 I/O 等待上。程序每次對一個網(wǎng)站執(zhí)行下載操作,都必須等到前一個網(wǎng)站下載完成后才能開始。如果放在實際生產(chǎn)環(huán)境中,我們需要下載的網(wǎng)站數(shù)量至少是以萬為單位的,不難想象,這種方案根本行不通。接著再來看多線程版本的代碼實現(xiàn):
import concurrent.futuresimport requestsimport threadingimport timedef download_one(url): resp = requests.get(url) print('Read {} from {}'.format(len(resp.content), url))def download_all(sites): with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: executor.map(download_one, sites)def main(): sites = [ 'http://c.biancheng.net', 'http://c.biancheng.net/c', 'http://c.biancheng.net/python' ] start_time = time.perf_counter() download_all(sites) end_time = time.perf_counter() print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))if __name__ == '__main__': main()運行結(jié)果為:
Read 52053 from http://c.biancheng.net
Read 30718 from http://c.biancheng.net/c
Read 34470 from http://c.biancheng.net/python
Download 3 sites in 0.1606366 seconds
可以看到,總耗時是 0.2s 左右,效率一下子提升了很多。
注意,雖然線程的數(shù)量可以自己定義,但是線程數(shù)并不是越多越好,因為線程的創(chuàng)建、維護(hù)和刪除也會有一定的開銷,所以如果設(shè)置的很大,反而可能會導(dǎo)致速度變慢。我們往往需要根據(jù)實際的需求做一些測試,來尋找最優(yōu)的線程數(shù)量。
上面兩段代碼中,多線程版本和單線程版的主要區(qū)別在于如下代碼:
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
executor.map(download_one, sites)
這里創(chuàng)建了一個線程池,總共有 5 個線程可以分配使用。executer.map() 與前面所講的 Python 內(nèi)置的 map() 函數(shù)類似,表示對 sites 中的每一個元素并發(fā)地調(diào)用函數(shù) download_one()。在 download_one() 函數(shù)中使用的 requests.get() 方法是線程安全的,在多線程的環(huán)境下也可以安全使用,不會出現(xiàn)條件競爭(多個線程同時競爭使用同一資源)的情況。當(dāng)然,也可以用并行的方式去提高程序運行效率,只需要在 download_all() 函數(shù)中做出下面的變化即可:
with futures.ThreadPoolExecutor(workers) as executor
#=>
with futures.ProcessPoolExecutor() as executor:
這部分代碼中,函數(shù) ProcessPoolExecutor() 表示創(chuàng)建進(jìn)程池,使用多個進(jìn)程并行的執(zhí)行程序。不過,這里通常省略參數(shù) workers,因為系統(tǒng)會自動返回 CPU 的數(shù)量作為可以調(diào)用的進(jìn)程數(shù)。但是,并行的方式一般用在 CPU heavy 的場景中,因為對于 I/O heavy 的操作,多數(shù)時間都會用于等待,相比于多線程,使用多進(jìn)程并不會提升效率。反而很多時候,因為 CPU 數(shù)量的限制,會導(dǎo)致其執(zhí)行效率不如多線程版本。
什么是Futures?
Python Futures 模塊,位于 concurrent.futures 和 asyncio 中,它們都表示帶有延遲的操作。Futures 會將處于等待狀態(tài)的操作包裹起來放到隊列中,這些操作的狀態(tài)隨時可以查詢,當(dāng)然它們的結(jié)果(或是異常)也能夠在操作完成后被獲取。通常來說,用戶不用考慮如何去創(chuàng)建 Futures,這些 Futures 底層都會幫我們處理好,唯一要做的只是去設(shè)定這些 Futures 的執(zhí)行。比如,Futures 中的 Executor 類,當(dāng)執(zhí)行 executor.submit(func) 時,它便會安排里面的 func() 函數(shù)執(zhí)行,并返回創(chuàng)建好的 future 實例,以便之后查詢調(diào)用。這里再介紹一些常用的函數(shù)。比如 Futures 中的方法 done(),表示相對應(yīng)的操作是否完成,返回 True 表示完成;返回 False 表示沒有完成。不過要注意的是,done() 是非阻塞的,會立即返回結(jié)果。相對應(yīng)的 add_done_callback(fn),則表示 Futures 完成后,相對應(yīng)的參數(shù)函數(shù) fn 會被通知并執(zhí)行調(diào)用。Futures 中還有一個重要的函數(shù) result(),它表示當(dāng) future 完成后,返回其對應(yīng)的結(jié)果或異常。而 as_completed(fs),則是針對給定的 future 迭代器 fs,在其完成后返回完成后的迭代器。所以,上述例子也可以寫成下面的形式:
import concurrent.futuresimport requestsimport timedef download_one(url): resp = requests.get(url) print('Read {} from {}'.format(len(resp.content), url))def download_all(sites): with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: to_do = [] for site in sites: future = executor.submit(download_one, site) to_do.append(future) for future in concurrent.futures.as_completed(to_do): future.result()def main(): sites = [ 'http://c.biancheng.net', 'http://c.biancheng.net/c', 'http://c.biancheng.net/python' ] start_time = time.perf_counter() download_all(sites) end_time = time.perf_counter() print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))if __name__ == '__main__': main()運行結(jié)果為:
Read 52053 from http://c.biancheng.net
Read 34470 from http://c.biancheng.net/python
Read 30718 from http://c.biancheng.net/c
Download 3 sites in 0.2275894 seconds
此程序中,首先調(diào)用 executor.submit(),將下載每一個網(wǎng)站的內(nèi)容都放進(jìn) future 隊列 to_do 等待執(zhí)行。然后是 as_completed() 函數(shù)在 future 完成后便輸出結(jié)果。不過,這里要注意,future 列表中每個 future 完成的順序和它在列表中的順序并不一定完全一致。到底哪個先完成、哪個后完成,取決于系統(tǒng)的調(diào)度和每個 future 的執(zhí)行時間。
《新程序員》:云原生和全面數(shù)字化實踐50位技術(shù)專家共同創(chuàng)作,文字、視頻、音頻交互閱讀總結(jié)
以上是生活随笔為你收集整理的asyncio并发数_Python Futures并发编程详解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 息烽小寨坝附近租房一室一厅500元一个月
- 下一篇: lisp 焊接符号标注_焊接符号标注大全