python asyncio理解_我实在不懂Python的Asyncio
這是Flask,Sentry的作者Armin Ronacher的一篇博客,這篇文章的影響很大,后來asyncio的文檔重寫就是受這篇文章影響。這篇文章寫于2016.10.30。而Asyncio的一個重要的PEP525(加入了async/await語法),是2016.7.28出臺的。也就是說,在PEP525之后,本文作者決定學習一下Asyncio,但是卻覺得是一個大坑。
最近我詳細地看了一遍Python的asyncio模塊。原因是,我想要使用事件IO來做一些工作,我決定試一下Python世界最近很火的新東東。我最初感受到的是,這個asyncio系統比我預期中的要復雜的多?,F在我十分確定的是,我不知道如何正確地使用它。
它的概念并不是很難理解,畢竟它從Twisted中借鑒了很多。但是它的很多細節,我很難搞清楚到底是什么。也許是我不夠聰明,不過我還是想分享一下哪些東西讓我很困惑。
原語
asyncio被設計于,通過協程來實現異步IO。最初,是通過yield和yield from表達式來實現的,不過現在它變得十分復雜。
下面是目前我必須了解的概念:
事件循環(event loop)
事件循環政策(event loop policy)
可等待對象(awaitable)
協程函數(coroutine function)
舊式協程函數(old style coroutine function)
協程(coroutine)
協程封裝器(coroutine wrapper)
生成器(generator)
futures
concurrent futures
tasks
handles
executors
transports
protocols
除此之外,語言中還增加了下面這些特殊方法:
__aenter__和__aexit__,用來實現異步的with語句塊.
__aiter__和__anext__,用來實現異步的迭代器(異步循環,和異步解析式).另外這個協議更改過。在3.5中,它返回awaitable。在3.6中,它返回異步生成器。
__await__,用來定義自定義awaitable。
文檔中涵蓋的這些知識也太多啦。不過我做了一些筆記,讓一些東西可以更好理解。
事件循環(Event Loop)
asyncio中的事件循環,和你乍看之下所期望的那個事件循環有很大的不同。
表面看起來,每個線程都有一個事件循環,但是實際上它不是這么工作的。
下面是我猜想它如何工作的:
如果你在主線程,那么事件循環會在你調用asyncio.get_event_loop()的時候被創建。
如果你在其它線程中調用asyncio.get_event_loop(),那么會拋出一個RuntimeError。
你可以在任何時候,通過asyncio.set_event_loop(),來將一個事件循環和當前的線程綁定起來。
事件循環,也可以在不綁定與當前線程的時候工作。
asyncio.get_event_loop()返回與線程綁定的事件循環,并不是返回當前運行的那個事件循環。
這些行為組合起來,非常地讓人困擾。
首先,你要知道底層的事件循環政策,這樣才能明白具體的行為。默認情況下,事件循環被綁定到了線程。另外,從理論上來說,事件循環可以被綁定到greelet或者類似的東西上面。不過重要的是,庫代碼不能控制政策,asyncio也沒有理由和線程扯上關系。
其次,asyncio并沒有要求事件循環通過政策來綁定上下文。事件循環完全可以在一個隔離環境中良好地運行。這是庫代碼中協程,或者類似東西遇到的第一個問題,因為它們不知道由哪個事件循環來負責規劃自己。這意味著,你在一個協程中調用asyncio.get_evenet_loop(),你并不知道返回的事件循環是哪個。這也是為什么所有的API都會需要一個可選的loop參數的原因。
舉例來說,想要知道目前哪個協程正在運行,你不可以像直接調用Task.get_current來得到,除非你顯式地傳入loop:
def get_task():
loop = asyncio.get_event_loop()
try:
return asyncio.Task.get_current(loop)
except RuntimeError:
return None
也就是說,在庫代碼中,你需要在任何地方都顯式地傳入loop,否則可能會發生非常古怪的行為。我不確定這樣設計背后的考量,但是如果這里沒有被修改(get_event_loop()返回當前運行的事件循環),那么就有必要在其它地方作出修改,比如要求必須傳入loop參數,要求loop綁定當前上下文(比如線程)。
由于事件循環政策沒有為當前上下文提供一個標志符,所以庫代碼可能在任何地方為當前上下文作出標識。另外,在上下文結束的時候,也沒有callback可以設定。
Awaitables和Coroutines
就我個人的淺見,Python設計上的一個最大失誤就是讓迭代器攜帶了太多功能。它不僅可以用來迭代,還可以用來支持各種協程。
Python迭代器中的一個最大錯誤就是,如果沒有捕獲,StopIteration會持續冒泡。這樣會在生成器或者協程終止的時候,產生很大的底層異常。Jinja開發過程中,和這個問題戰斗了很久。模版引擎內部渲染原理可以看作是一個生成器,如果模版中因為某種原因出現了StopIteration,那么渲染就會結束。
Python從這個過載系統中學到的教訓很少。在3.x初始版本中,asyncio還沒有得到語言層面支持,所以需要使用裝飾器+生成器的方式來編寫協程。為了實現yield from, StopIteration會過載多次。這會導致怪異的行為:
>>> def foo(n):
... if n in (0, 1):
... return [1]
... for item in range(n):
... yield item * 2
...
>>> list(foo(0))
[]
>>> list(foo(1))
[]
>>> list(foo(2))
[0, 2]
沒有錯誤,沒有警告,但是我想結果出乎大家的意料。這是因為,在生成器函數中的return,實際上是拋出了一個StopIteration異常,并且攜帶一個參數值代表返回值。這個異常不會被迭代器協議抓取,只會被協程代碼獲取。
在3.5和3.6版本中有巨大的改變,因為現在除了生成器我們還有協程對象。可以通過在定義函數式加入前綴async來實現。例如async def x()會制造一個協程。在3.6中,異步生成器現在還會拋出AsyncStopIteration。在3.5版本,如果使用future import(generator_stop),那么如果在迭代中拋出StopIteration,它會被替換為RuntimeError。
為什么我提到上面這些?因為那些舊東西未曾離開。生成器仍然有send和throw,協程很大程度上仍然像是生成器。
為了區分那些重復之處,python引入了一些新的概念:
awaitable: 一個擁有__await__方法的對象??梢允窃鷧f程,舊式協程,或者其它對象。
coroutinefunction: 一個返回原生協程的函數。請不要搞混淆,這不是一個返回協程的函數。
coroutine:原生協程。注意,在目前為止,文檔中并沒有把舊式的asyncio協程看作是協程。最少insepect.iscoroutine并沒有把它們看作是協程。那些舊式協程,可以看作是future/awaitable這些分支。
另外特別讓人困惑的是,asyncio.iscoroutinefunction和inspect.iscoroutinefunction竟然含義不同。inspect.iscoroutine和inspect.iscoroutinefunction是相同的。
Coroutine Wrappers
在python看到async def的時候,它會調用一個thread local的協程封裝器。它通過sys.set_coroutine_wrapper來進行調用,被封裝的對象是函數??雌饋硐裣旅孢@樣:
>>> import sys
>>> sys.set_coroutine_wrapper(lambda x: 42)
>>> async def foo():
... pass
...
>>> foo()
__main__:1: RuntimeWarning: coroutine 'foo' was never awaited
42
在上面例子中,我沒有調用開始的匿名函數,這樣的示例應該可以讓你看出coroutine wrapper干了什么。另外這個coroutine wrapper是thread local的,也就是說如果你調換了事件循環政策,你需要重新設定這個wrapper。新的線程也不會從父線程中繼承這個。
Awaitables and Futures
一些東西是awaitable的。就目前為止,我看到下面這些都是awaitable:
原生協程
加入了偽造CO_ITERABLE_COROUTINE flag的生成器
擁有__await__方法的對象
這些對象都有__await__方法,除了生成器因為歷史原因而沒有。所以CO_ITERABLE_COROUTINE這個flag是什么?它來自于coroutine wrapper(不要和sys.set_coroutine_wrapper搞混),這個wrapper是@asyncio.coroutine。這會間接地將生成器使用types.coroutine(不要和types.CoroutineType或者asyncio.coroutine混淆)來封裝,它會重新創建內部的對象,并且加入一個額外的flag: CO_ITERABLE_COROUTINE.
那么什么是future呢?首先,我們要搞明白一件事:在Python3中,有兩種類型的future,并且完全不兼容。包括asyncio.futures.Future和concurrent.futures.Future。它們不是同時誕生的,但是可以同時在asyncio中使用。例如,asyncio.run_coroutine_threadsafe()會將一個協程下方到另一個線程的事件循環中,并返回一個concurrent.futures.Future,而不是一個asyncio.futures.Future對象。這講得通,因為concurrent.futures.Future是線程安全的。
現在我們知道在asyncio有兩種不兼容的future了。老實說,我不知道它們的作用,但是先可以把它們叫做“最終要發生的”。這是一個對象,最后會持有一個值,讓你可以處理,但是目前這個值可能還在計算中。一些這種東西的變種叫做deferred, promises。它們之間有什么不同,老實說我也不知道。
你可以對future做什么?你可以對它加上一個callback,在future完成的時候被調用;或者加上另一個callback,在future失敗的時候被調用。另外你可以對它使用await(這會實現__await__方法,所以這也是一個awaitable)。另外任何future都可以被取消。
那么你如何得到一個future呢?你可以對一個awaitable對象調用asyncio.ensure_future。這樣可以把一個舊式的協程轉換為future。
不過,如果你閱讀了文檔,你會發現asyncio.ensure_future實際返回的是一個Task。那么什么是Task呢?
Tasks
Task是一種future,它用一種特別的方式封裝了一個協程。它可以像一個future一樣工作,但是它還有一些額外的方法,可以用來提取協程包含的當前棧信息。我們之前提到過task,因為它有唯一一個可以用來獲取當前事件循環的方法,也就是Task.get_current。
另外,future和task取消的方式也有不同,但是這里不再提。如果你在編寫一個協程的時候,你想要知道這個協程何時在運行,你可以通過Task.get_current來知道,不過你需要另外知道你分派的事件循環綁定在哪個線程。
不太可能知道哪個協程由哪個事件循環來運行。Task也沒有提供公共API來提供這個功能。不過,如果你能過處理一個task,那么你可以通過task._loop這個屬性來訪問到事件循環。
Handles
Handles是一個難懂的對象,是一個用來處理待執行,不可await,但是可以取消的對象。
詳細來講,如果你通過call_soon或者call_soon_threadsafe等來規劃執行,你就獲得一個handle,你可以用來取消執行,但是不可以用它來等待執行完成。
Executors
你如何通知其他的線程來完成一些事情呢?你不可以在另一個線程中為當前的事件循環規劃回調函數,然后獲得結果。所以你需要executors。
Executors來自于concurrent.futures,它允許你將非事件型的工作交給線程完成。比如,如果你在一個事件循環中使用run_in_executor來規劃一個函數。結果會以asyncio協程的方式來返回,而不是像run_coroutine_threadsafe一樣返回concurrent協程。我沒有足夠的心力來理解為什么存在這些API,不知道何時使用哪個API。文檔中建議,executor可以用來執行多進程的事情。
Transport and Protocols
這些東西基本拷貝自twisted,如果你需要理解它們,就去閱讀文檔吧。
如何使用Asyncio
現在我們粗略的理解了asyncio,另外我找到一些人們編寫asyncio代碼的常見模式:
將loop傳入所有的協程。社區中相當一部分的人都是這么做的。讓協程知道自己被哪個loop來規劃,讓協程可以做類似task的事情。
另外,你可以要求loop綁定線程。理想情況下這是一個好辦法,不過可惜社區存在割裂。
如果你想要使用上下文數據(類似thread local),現在沒有什么好辦法。最受歡迎的實現方式是第三方庫aiolocals,但是它需要你手動將信息傳播,因為解釋器現在還不支持。
忘記Python中存在的舊式協程。請使用Python3.5以上版本,比只使用async/await關鍵字。使用新的協程,可以使用異步上下文管理器,這對于資源管理來說相當有用。
學會重啟loop來清理。這里我花了很長時間才明白,它不是我意料之中的方式,但是是現在最有用的方法,定時地將loop重啟,可以清除那些遺留下來沒有執行的協程。
使用subprocess的方式不清晰。你需要有一個loop運行在主線程(我認為是用來監聽signal事件的),然后把subprocess分派給其他的loop。用如下的方式asyncio.get_child_watcher().attach_loop(...).
想要同時編寫異步和同步代碼,注定是要失敗的。另外如果要對對象同時支持with和async with也是很危險的。
如果你想要給一個協程設置名稱,用來在調試的時候知道為什么它沒有被await。設置__name__是沒有用的,你需要使用__qualname__。
有時候內部類型轉換會讓你發瘋。
上下文數據
除了異常的復雜度,我思考使用asycio編寫好的API,還缺少一個東西,就是context local數據。這個東西已經被node社區學會了。
有一個continuation-local-storage已經被接受,但是實現地太晚了。
令人失望的是,在python中目前還沒有任何store可以用。我一直在關注,因為我一直想要使用asyncio來支持Sentry的breadcrumbs,但是還沒有看到好的辦法。asyncio中沒有context的概念,因為如果不使用monkeypatch,從代碼中看不出你使用的是哪個loop,也就不能獲取信息。
Node目前一直在想要為這個問題找到一個長期的處理方法。這個問題對于任何生態都是不可忽略的。這個問題叫做named async context propagation,解決方式有各種名字。在Go中,需要使用context包,并且顯示地傳入所有的goroutine中(不是一個很好的方式,但是最少也提供了解決方案)。.NET對于local context有著最佳解決方案。它可以是一個線程上下文,一個web請求上下文,或者類似的東西,它們都會自動向上傳播除非你抑制它。微軟為了解決這個問題,我相信已經花了15年的時間。
我不知道asyncio生態是否足夠年輕,可以從邏輯上讓context加入,但是我認為應該現在開始做。
個人想法
asycnio已經很復雜,并且會變得更加復雜。我沒有足夠的心智能力來使用asyncio做日常工作。理解它需要不斷地知道語言改動,并且它對語言帶來了巨大的復雜性。也許它還需要數年時間,才可以帶來享受并且穩定的開發體驗。
總結
以上是生活随笔為你收集整理的python asyncio理解_我实在不懂Python的Asyncio的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 两个listmap合并去重_Excel
- 下一篇: python正则表达式在线_python