python如何封装成可调用的库_在python中如何以异步的方式调用第三方库提供的同步API...
在關于asyncio的基本用法中提到,asyncio并不是多線程。在協程中調用同步(阻塞函數),都占用同一線程的CPU時間,即當前線程會被阻塞(即協程只會在等待一個協程時可能出讓CPU,如果是普通函數,它是不會出讓CPU的,會一直執行直到完成,或者被其它線程中斷)。
如果我們依賴的某個第三方庫并不是異步的,那么對其API的調用也會阻塞住。如果這個第三方庫是網絡IO請求密集型的,那么是可以通過多線程甚至多進程封裝,從而將其改造成異步庫的。
本文提供了通過concurrent.futures庫來實現多線程異步封裝的思路和實現。
concurrent.futures
這個包提供了線程池和進程池的實現。從Python 3.5以后,asyncio提供了loop.run_in_executor的實現,將asyncio的協程與concurrent.futures的future連接起來的方法。這樣我們自己就不用去實現線程池,信號機制、返回值的傳遞機制了。
我們這里不仔細分析兩者的連接及內部機制,只通過一個例子來展示如何使用:
from concurrent.futures import ThreadPoolExecutor
import time
import asyncio
def work():
time.sleep(5)
return 'done'
async def main(loop):
executor = ThreadPoolExecutor()
result = await loop.run_in_executor(executor, work)
print(result)
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
上面的代碼已經很清楚了。代碼定義了一個線程池executor,通過loop.run_in_executor,將同步調用work轉化成異步調用,并且work的返回值也一并傳遞出來。
整個代碼段都是異步函數風格的。如果你多調用幾次await loop.run_in_executor(executor, work),就會發現代碼的執行也確實是異步行為。
通過代理機制封裝
明白了通過concurrent.futures來實現同步轉異步的原理,理論上我們就可以依照上面的方式,將任何一個同步調用(比如上面的work),轉化成異步調用了。
但如果第三方庫提供了非常多的API,我們就得考慮更優美的實現方式,以減少重復代碼量。這里我們使用代理機制。
首先我們來看一個特別的函數, getattr(self, name)。如果我們有一個類對象foo,通過foo來引用其屬性bar時,如果bar不存在,python就會調用getattr來繼續查找這個bar,如果getattr沒有被我們改寫,則結果仍然會是找不到,此時就會拋出熟悉的AttributeError:
AttributeError: 'Foo' object has no attribute 'bar'
我們可以利用這個特性來實現Python的對象代理。假設被代理的庫名為somelib,其中提供了一個同步的網絡函數send,則我們可以通過代理技術來實現一個mylib,當調用mylib.send時,最終仍然通過somelib.send來完成功能,但它是異步的。
import asyncio
from concurrent.futures import ThreadPoolExecutor
class AsyncWrapper:
def __init__(self, subject, loop=None, max_workers=None):
self.subject = subject
self.loop = loop or asyncio.get_event_loop()
self.executor = ThreadPoolExecutor(max_workers=max_workers)
def __getattr__(self, name):
origin = getattr(self.subject, name)
if callable(origin):
def foo(*args, **kwargs):
return self.run(origin, *args, **kwargs)
# cache the function we built right now, to avoid later lookup
self.__dict__[name] = foo
return foo
else:
return origin
async def run(self, origin_func, *args, **kwargs):
def wrapper():
return origin_func(*args, **kwargs)
return await self.loop.run_in_executor(self.executor, wrapper)
這里我實現了一個非常簡單的異步封裝器AsynWrapper。構造函數接受三個參數,第一個為要代理的對象主體,在我們的例子中即為somelib。第二個是event loop對象,如果不提供,則會自動生成。第三個是初始化線程池所需要的。
這里要注意event loop對象盡管是可選的,但如果你的程序是多線程的,則必須在主線程中獲取event loop對象并將其傳遞過來。因為每個線程都有自己的event loop,它們之間無法同步。
改寫的getattr是我們實現魔法的地方。假設我們通過AsyncWrapper生成了一個對象foo,則在foo上調用send函數時:
await foo.send(...)
當foo.send()被調用時,究竟發生了什么?可以認為這里發生了兩件事,第一件事是要找到foo.send這個函數對象,其次是要對它進行調用。看起來比較啰嗦,但卻是理解我們封裝的關鍵。
我們先看查找。
由于foo本身是沒有send這個屬性的,因此getattr被調用,并且傳入了name = 'send'。我們先檢查這個send是否是原來lib中的一個函數,因為我們沒有必要也不應該攔截屬性:
origin = getattr(self.subject, name)
if callable(origin):
#替換
else:
return origin
因此如果send是somelib中的一個屬性(比如常量),我們直接返回其值。但如果它是一個可執行對象,那么我們將其封裝成一個異步函數。
如果send是一個函數呢?我們當然不能直接返回它,而應該返回另一個函數,在這個函數里,它將在executor中執行origin,從而實現異步化。這個函數就是self.run:
async def run(self, origin_func, *args, **kwargs):
def wrapper():
return origin_func(*args, **kwargs)
return await self.loop.run_in_executor(self.executor, wrapper)
這里的內聯函數wrapper只是為了將參數封裝,因為run_in_executor只接受位置參數(args),而不接受可選參考(*kwargs)。
現在問題來了,如何在getattr中返回run對象,并且這個run對象知道應該執行哪一個origin函數呢?這就是內聯函數foo的作用。它將origin原本應該有的參數,以及origin本身一起打包:
def foo(*args, **kwargs):
return self.run(origin, *args, **kwargs)
最后要提到的就是這一行:
self.__dict__[name] = foo
這是一種優化。如此以來,下一次我們再調用foo.send時,getattr就不會再調用了,因為send已經成為foo的一個方法。
Demo
import somelib
async main():
foo = AsyncWrapper(somelib)
await foo.send("hello world!")
其它
除了getattr外,python還提供了getattribute函數。兩者的區別是,后者無論如何(即在foo中有send屬性時)都會被調用。考慮到我們的目的,這里當然使用getattr。
總結
以上是生活随笔為你收集整理的python如何封装成可调用的库_在python中如何以异步的方式调用第三方库提供的同步API...的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python如何做一个数据库_Pytho
- 下一篇: vantui框架switch上显示提示文