python asyncio 异步编程-协程 2
asyncio 異步編程
官方文檔:
- 中文版:https://docs.python.org/zh-cn/3.8/library/asyncio.html
- 英文本:https://docs.python.org/3.8/library/asyncio.html
1. 事件循環(huán)
事件循環(huán) 是指主線程每次將執(zhí)行序列中的任務(wù)清空后,就去事件隊列中檢查是否有等待執(zhí)行的任務(wù),如果有則每次取出一個推到執(zhí)行序列中執(zhí)行,這個過程是循環(huán)往復(fù)的。
理解成一個死循環(huán),去檢測并執(zhí)行某些代碼。
事件循環(huán)是每個 asyncio 應(yīng)用的核心。 事件循環(huán)會運行異步任務(wù)和回調(diào),執(zhí)行網(wǎng)絡(luò) IO 操作,以及運行子進(jìn)程。
應(yīng)用開發(fā)者通常應(yīng)當(dāng)使用高層級的 asyncio 函數(shù),例如 asyncio.run(),應(yīng)當(dāng)很少有必要引用循環(huán)對象或調(diào)用其方法。
# 偽代碼任務(wù)列表 = [ 任務(wù)1, 任務(wù)2, 任務(wù)3,... ]while True:可執(zhí)行的任務(wù)列表,已完成的任務(wù)列表 = 去任務(wù)列表中檢查所有的任務(wù),將'可執(zhí)行'和'已完成'的任務(wù)返回for 就緒任務(wù) in 已準(zhǔn)備就緒的任務(wù)列表:執(zhí)行已就緒的任務(wù)for 已完成的任務(wù) in 已完成的任務(wù)列表:在任務(wù)列表中移除 已完成的任務(wù)如果 任務(wù)列表 中的任務(wù)都已完成,則終止循環(huán) import asyncio# 去生成或獲取一個事件循環(huán) loop = asyncio.get_event_loop()# 將任務(wù)放到 '任務(wù)列表' loop.run_until_complete(任務(wù))2. 詳解
2.1 async
協(xié)程函數(shù),定義函數(shù)時候 async def 函數(shù)名
協(xié)程對象,執(zhí)行 協(xié)程函數(shù)() 得到的協(xié)程對象
async def func(): # 定義一個協(xié)程函數(shù)passresult = func() # 獲取協(xié)程對象(此時函數(shù)內(nèi)部代碼不會被執(zhí)行)注:加上事件循環(huán)才能執(zhí)行協(xié)程函數(shù)內(nèi)部代碼,即將協(xié)程對象交給事件循環(huán)來處理
import asyncioasync def func():print('人生苦短,我用python')result = func()# loop = asyncio.get_event_loop() # loop.run_until_complete(result) asyncio.run(result) # 等價與上面兩行,python3.7之后引入的便捷方式2.2 await
await + 可等待對象(協(xié)程對象、Future、Task對象 —> IO等待)
2.2.1 協(xié)程對象
協(xié)程對象,執(zhí)行 協(xié)程函數(shù)() 得到的是協(xié)程對象。
示例1:
import asyncioasync def func():print('人生苦短,')response = await asyncio.sleep(2)print('我用python', response)asyncio.run(func()).# 結(jié)果: """ 人生苦短, 我用python None """示例2:
import asyncioasync def others():print('人生苦短,')await asyncio.sleep(2)print('我用python')return '返回值'async def func():print('執(zhí)行協(xié)程函數(shù)內(nèi)部代碼--->')# 遇到IO操作掛起當(dāng)前協(xié)程任務(wù),等IO操作完成之后再繼續(xù)往下執(zhí)行,當(dāng)前協(xié)程掛起時,事件循環(huán)可以去執(zhí)行其他協(xié)程任務(wù)response = await others()print('IO請求結(jié)束,結(jié)果為:', response)asyncio.run(func())""" 執(zhí)行協(xié)程函數(shù)內(nèi)部代碼---> 人生苦短, 我用python IO請求結(jié)束,結(jié)果為: 返回值 """示例3:
import asyncioasync def others():print('人生苦短,')await asyncio.sleep(2)print('我用python')return '返回值'async def func():print('執(zhí)行協(xié)程函數(shù)內(nèi)部代碼--->')# 遇到IO操作掛起當(dāng)前協(xié)程任務(wù),等IO操作完成之后再繼續(xù)往下執(zhí)行,當(dāng)前協(xié)程掛起時,事件循環(huán)可以去執(zhí)行其他協(xié)程任務(wù)response1 = await others()print('IO請求結(jié)束,結(jié)果為:', response1)response2 = await others()print('IO請求結(jié)束,結(jié)果為:', response2)asyncio.run(func()) """ 執(zhí)行協(xié)程函數(shù)內(nèi)部代碼---> 人生苦短, 我用python IO請求結(jié)束,結(jié)果為: 返回值 人生苦短, 我用python IO請求結(jié)束,結(jié)果為: 返回值 """await 就是 等待對象的值得到結(jié)果之后在繼續(xù)往下執(zhí)行。
2.2.2 Task對象
在事件循環(huán)中添加多個任務(wù)。
Tasks 用于并發(fā)調(diào)度協(xié)程,通過 asyncio.create_task(協(xié)程對象) 的方式創(chuàng)建Task對象,這樣可以讓協(xié)程加入事件循環(huán)中等待被調(diào)度執(zhí)行。除了使用 asyncio.create_task() 函數(shù)以外,還可以用低層級的 loop.create_task() 或 ensure_future() 函數(shù)。不建議手動實例化 Task 對象。
本質(zhì)上是將協(xié)程對象封裝成task對象,并將協(xié)程立即加入事件循環(huán),同時追蹤協(xié)程的狀態(tài)。
注意:asyncio.create_task() 函數(shù)在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低層級的 asyncio.ensure_future() 函數(shù)
示例1:
import asyncioasync def func():print(1)await asyncio.sleep(2)print(2)return "返回值"async def main():print("main開始")# 創(chuàng)建協(xié)程,將協(xié)程封裝到一個Task對象中并立即添加到事件循環(huán)的任務(wù)列表中,等待事件循環(huán)去執(zhí)行(默認(rèn)是就緒狀態(tài))。task1 = asyncio.create_task(func())# 創(chuàng)建協(xié)程,將協(xié)程封裝到一個Task對象中并立即添加到事件循環(huán)的任務(wù)列表中,等待事件循環(huán)去執(zhí)行(默認(rèn)是就緒狀態(tài))。task2 = asyncio.create_task(func())print("main結(jié)束")# 當(dāng)執(zhí)行某協(xié)程遇到IO操作時,會自動化切換執(zhí)行其他任務(wù)。# 此處的await是等待相對應(yīng)的協(xié)程全都執(zhí)行完畢并獲取結(jié)果ret1 = await task1ret2 = await task2print(ret1, ret2)asyncio.run(main())""" 結(jié)果:main開始 main結(jié)束 1 1 2 2 返回值 返回值 """示例2(簡化示例1):
import asyncioasync def func():print(1)await asyncio.sleep(2)print(2)return "返回值"async def main():print("main開始")# 創(chuàng)建協(xié)程,將協(xié)程封裝到Task對象中并添加到事件循環(huán)的任務(wù)列表中,等待事件循環(huán)去執(zhí)行(默認(rèn)是就緒狀態(tài))。# 在調(diào)用task_list = [asyncio.create_task(func(), name="n1"),asyncio.create_task(func(), name="n2")]print("main結(jié)束")# 當(dāng)執(zhí)行某協(xié)程遇到IO操作時,會自動化切換執(zhí)行其他任務(wù)。# 此處的await是等待所有協(xié)程執(zhí)行完畢,并將所有協(xié)程的返回值保存到done# 如果設(shè)置了timeout值,則意味著此處最多等待的秒,完成的協(xié)程返回值寫入到done中,未完成則寫到pending中。done, pending = await asyncio.wait(task_list, timeout=None)print(done, pending)asyncio.run(main())""" 結(jié)果:main開始 main結(jié)束 1 1 2 2 {<Task finished name='n1' coro=<func() done, defined at D:\coding\tronado_test\03_tasks.py:31> result='返回值'>, <Task finished name='n2' coro=<func() done, defined at D:\coding\tronado_test\03_tasks.py:31> result='返回值'>} set() """示例3:
import asyncioasync def func():print("執(zhí)行協(xié)程函數(shù)內(nèi)部代碼")# 遇到IO操作掛起當(dāng)前協(xié)程(任務(wù)),等IO操作完成之后再繼續(xù)往下執(zhí)行。當(dāng)前協(xié)程掛起時,事件循環(huán)可以去執(zhí)行其他協(xié)程(任務(wù))。response = await asyncio.sleep(2)print("IO請求結(jié)束,結(jié)果為:", response)# 錯誤:coroutine_list = [ asyncio.create_task(func()), asyncio.create_task(func()) ] # 此處不能直接 asyncio.create_task,因為將Task立即加入到事件循環(huán)的任務(wù)列表,但此時事件循環(huán)還未創(chuàng)建,所以會報錯。 # 所以可以將協(xié)程對象添加致列表來使用。 coroutine_list = [func(),func() ]# 使用asyncio.wait將列表封裝為一個協(xié)程,并調(diào)用asyncio.run實現(xiàn)執(zhí)行兩個協(xié)程 # asyncio.wait內(nèi)部會對列表中的每個協(xié)程執(zhí)行ensure_future,封裝為Task對象。 done,pending = asyncio.run( asyncio.wait(coroutine_list) )""" 結(jié)果: 執(zhí)行協(xié)程函數(shù)內(nèi)部代碼 執(zhí)行協(xié)程函數(shù)內(nèi)部代碼 IO請求結(jié)束,結(jié)果為: None IO請求結(jié)束,結(jié)果為: None """2.2.3 async Future對象
A Futureis a special low-level awaitable object that represents an eventual result of an asynchronous operation.
future是一個特殊的低層次可等待對象,它表示異步操作的最終結(jié)果。.
Task繼承Future, Task對象內(nèi)部await結(jié)果的處理基于Future對象來的。
asyncio中的Future對象是一個相對更偏向底層的可對象,通常我們不會直接用到這個對象,而是直接使用Task對象來完成任務(wù)的并和狀態(tài)的追蹤。( Task 是 Futrue的子類 )
Future為我們提供了異步編程中的 最終結(jié)果 的處理(Task類也具備狀態(tài)處理的功能)。
示例1:
import asyncioasync def main():# 獲取當(dāng)前事件循環(huán)loop = asyncio.get_running_loop()# # 創(chuàng)建一個任務(wù)(Future對象),這個任務(wù)什么都不干。fut = loop.create_future()# 等待任務(wù)最終結(jié)果(Future對象),沒有結(jié)果則會一直等下去。await futasyncio.run(main())# 結(jié)果: 會一直等待示例2 :
import asyncioasync def set_after(fut):await asyncio.sleep(2)fut.set_result("666")async def main():# 獲取當(dāng)前事件循環(huán)loop = asyncio.get_running_loop()# 創(chuàng)建一個任務(wù)(Future對象),沒綁定任何行為,則這個任務(wù)永遠(yuǎn)不知道什么時候結(jié)束。fut = loop.create_future()# 創(chuàng)建一個任務(wù)(Task對象),綁定了set_after函數(shù),函數(shù)內(nèi)部在2s之后,會給fut賦值。# 即手動設(shè)置future任務(wù)的最終結(jié)果,那么fut就可以結(jié)束了。await loop.create_task(set_after(fut))# 等待 Future對象獲取 最終結(jié)果,否則一直等下去data = await futprint(data)asyncio.run(main())# 結(jié)果:666Future對象本身函數(shù)進(jìn)行綁定,所以想要讓事件循環(huán)獲取Future的結(jié)果,則需要手動設(shè)置。而Task對象繼承了Future對象,其實就對Future進(jìn)行擴(kuò)展,他可以實現(xiàn)在對應(yīng)綁定的函數(shù)執(zhí)行完成之后,自動執(zhí)行set_result,從而實現(xiàn)自動結(jié)束。
雖然,平時使用的是Task對象,但對于結(jié)果的處理本質(zhì)是基于Future對象來實現(xiàn)的。
2.3 運行 asyncio 程序
asyncio.run(*coro*, ***, *debug=False*)
-
執(zhí)行 coroutine coro 并返回結(jié)果。
-
此函數(shù)會運行傳入的協(xié)程,負(fù)責(zé)管理 asyncio 事件循環(huán),終結(jié)異步生成器,并關(guān)閉線程池。
-
當(dāng)有其他 asyncio 事件循環(huán)在同一線程中運行時,此函數(shù)不能被調(diào)用。
-
如果 debug 為 True,事件循環(huán)將以調(diào)試模式運行。
-
此函數(shù)總是會創(chuàng)建一個新的事件循環(huán)并在結(jié)束時關(guān)閉。它應(yīng)當(dāng)被用作 asyncio 程序的主入口點,理想情況下應(yīng)當(dāng)只被調(diào)用一次。
2.4 創(chuàng)建任務(wù)
asyncio.create_task(coro, *, name=None)
-
將 coro 協(xié)程封裝為一個 Task 并調(diào)度其執(zhí)行。返回 Task 對象。
-
name 不為 None,它將使用 Task.set_name() 來設(shè)為任務(wù)的名稱。
-
該任務(wù)會在 get_running_loop() 返回的循環(huán)中執(zhí)行,如果當(dāng)前線程沒有在運行的循環(huán)則會引發(fā) RuntimeError。
-
此函數(shù) 在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低層級的 asyncio.ensure_future() 函數(shù)。
2.5 休眠
coroutine asyncio.sleep(delay, result=None, *, loop=None)
-
阻塞 delay 指定的秒數(shù)。
-
如果指定了 result,則當(dāng)協(xié)程完成時將其返回給調(diào)用者。
-
sleep() 總是會掛起當(dāng)前任務(wù),以允許其他任務(wù)運行。
-
將 delay 設(shè)為 0 將提供一個經(jīng)優(yōu)化的路徑以允許其他任務(wù)運行。 這可供長期間運行的函數(shù)使用以避免在函數(shù)調(diào)用的全過程中阻塞事件循環(huán)。
Deprecated since version 3.8, will be removed in version 3.10: loop 形參。
棄用自3.8版本,3.10版本將被刪除: loop形參
以下協(xié)程示例運行 5 秒,每秒顯示一次當(dāng)前日期:
import asyncio import datetimeasync def display_date():loop = asyncio.get_running_loop()end_time = loop.time() + 5.0while True:print(datetime.datetime.now())if (loop.time() + 1.0) >= end_time:breakawait asyncio.sleep(1)asyncio.run(display_date())""" 2021-06-16 09:45:57.199182 2021-06-16 09:45:58.199578 2021-06-16 09:45:59.200463 2021-06-16 09:46:00.200563 2021-06-16 09:46:01.201021 """3. 擴(kuò)展
3.1 concurrent.futures.Future對象
使用 線程池、進(jìn)程池 實現(xiàn)異步操作時用到的對象。
示例:
import time from concurrent.futures import Future from concurrent.futures.thread import ThreadPoolExecutor from concurrent.futures.process import ProcessPoolExecutordef func(value):time.sleep(1)print('------>', value)pool = ThreadPoolExecutor(max_workers=5)# 或 pool = ProcessPoolExecutor(max_workers=5)for i in range(10):fut = pool.submit(func, i)print(fut)""" 結(jié)果: <Future at 0x1f178d68730 state=running> <Future at 0x1f1790dca00 state=running> <Future at 0x1f1790dcd60 state=running> <Future at 0x1f1790f7130 state=running> <Future at 0x1f1790f74c0 state=running> <Future at 0x1f1790f7850 state=pending> <Future at 0x1f1790f7970 state=pending> <Future at 0x1f1790f7a90 state=pending> <Future at 0x1f1790f7bb0 state=pending> <Future at 0x1f1790f7cd0 state=pending> ------>------> 01------> 3------>------>42 ------> ------>6 5 ------> ------>------> 87 9 """上述兩種 future 可能會存在交叉使用的時候,所有在此以作對比。
使用場景:協(xié)程的使用必須有第三方庫的支持,爬蟲使用 asyncIO 有 aiohttp 庫支持,可以使用協(xié)程來做異步編程,但是在使用有些不支持協(xié)程的第三方庫的時候,就要使用 進(jìn)程池、線程池 的方式來實現(xiàn)異步編程。
import time import asyncio import concurrent.futuresdef func1():# 某個耗時操作time.sleep(2)return "SB"async def main():loop = asyncio.get_running_loop()# 1. Run in the default loop's executor ( 默認(rèn)ThreadPoolExecutor )# 第一步:內(nèi)部會先調(diào)用 ThreadPoolExecutor 的 submit 方法去線程池中申請一個線程去執(zhí)行func1函數(shù),并返回一個concurrent.futures.Future對象# 第二步:調(diào)用asyncio.wrap_future將concurrent.futures.Future對象包裝為asycio.Future對象。# 因為concurrent.futures.Future對象不支持await語法,所以需要包裝為 asycio.Future對象 才能使用。fut = loop.run_in_executor(None, func1)result = await futprint('default thread pool', result)# 2. Run in a custom thread pool:# with concurrent.futures.ThreadPoolExecutor() as pool:# result = await loop.run_in_executor(# pool, func1)# print('custom thread pool', result)# 3. Run in a custom process pool:# with concurrent.futures.ProcessPoolExecutor() as pool:# result = await loop.run_in_executor(# pool, func1)# print('custom process pool', result)asyncio.run(main())# 結(jié)果:default thread pool SB示例2(async + 不支持異步編程的模塊):
import asyncio import requestsasync def download_image(url):# 發(fā)送網(wǎng)絡(luò)請求,下載圖片(遇到網(wǎng)絡(luò)下載圖片的IO請求,自動化切換到其他任務(wù))print("開始下載:", url)loop = asyncio.get_event_loop()# requests模塊默認(rèn)不支持異步操作,所以就使用線程池來配合實現(xiàn)了。future = loop.run_in_executor(None, requests.get, url)response = await futureprint('下載完成')# 圖片保存到本地文件file_name = url.rsplit('_')[-1]with open(file_name, mode='wb') as file_object:file_object.write(response.content)if __name__ == '__main__':url_list = ['https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg', 'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg', 'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg']tasks = [download_image(url) for url in url_list]loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(tasks))""" 結(jié)果:開始下載: https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg 開始下載: https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg 開始下載: https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg 下載完成 下載完成 下載完成 """3.2 異步迭代器
什么是異步迭代器
實現(xiàn)了 aiter() 和 anext() 方法的對象。anext 必須返回一個 awaitable 對象。async for 會處理異步迭代器的 anext() 方法所返回的可等待對象,直到其引發(fā)一個 StopAsyncIteration 異常。由 PEP 492 引入。
什么是異步可迭代對象?
可在 async for 語句中被使用的對象。必須通過它的 aiter() 方法返回一個 asynchronous iterator。由 PEP 492 引入。
異步迭代器其實沒什么太大的作用,只是支持了async for語法而已。
import asyncioclass Reader(object):""" 自定義異步迭代器(同時也是異步可迭代對象) """def __init__(self):self.count = 0async def readline(self):# await asyncio.sleep(1)self.count += 1if self.count == 100:return Nonereturn self.countdef __aiter__(self):return selfasync def __anext__(self):val = await self.readline()if val == None:raise StopAsyncIterationreturn valasync def func():# 創(chuàng)建異步可迭代對象async_iter = Reader()# async for 必須要放在async def函數(shù)內(nèi),否則語法錯誤。async for item in async_iter:print(item)asyncio.run(func())3.3 異步上下文管理器
此種對象通過定義 aenter() 和 aexit() 方法來對 async with 語句中的環(huán)境進(jìn)行控制。由 PEP 492 引入。
import asyncioclass AsyncContextManager:def __init__(self):self.conn = Noneasync def do_something(self):# 異步操作數(shù)據(jù)庫return 666async def __aenter__(self):# 異步鏈接數(shù)據(jù)庫self.conn = await asyncio.sleep(1)return selfasync def __aexit__(self, exc_type, exc, tb):# 異步關(guān)閉數(shù)據(jù)庫鏈接await asyncio.sleep(1)async def func():async with AsyncContextManager() as f:result = await f.do_something()print(result)asyncio.run(func())# 結(jié)果:6663.4 uvloop
是asyncio 事件循環(huán)的替代方案,它的效率高于默認(rèn)asyncio的事件循環(huán)。
安裝:
pip install uvloop在使用前必須添加以下代碼:
import asyncio import uvloop # 將asyncio的默認(rèn)事件方案替換為uvloop asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())# 編寫asyncio的代碼,與之前寫的代碼一致。 # 內(nèi)部的事件循環(huán)自動化會變?yōu)閡vloop asyncio.run(...)總結(jié)
以上是生活随笔為你收集整理的python asyncio 异步编程-协程 2的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SSD300网络结构(pytorch)+
- 下一篇: 模式识别与机器学习笔记(一)