python协程实时输出_python协程
不知道你有沒(méi)有被問(wèn)到過(guò)有沒(méi)有使用過(guò)的python協(xié)程?
協(xié)程是什么?
協(xié)程是一種用戶態(tài)輕量級(jí),是實(shí)現(xiàn)并發(fā)編程的一種方式。說(shuō)到并發(fā),就能想到了多線程 / 多進(jìn)程模型,是解決并發(fā)問(wèn)題的經(jīng)典模型之一。
但是隨刻客戶端數(shù)量達(dá)到一定量級(jí),進(jìn)程上下文切換占用了大量的資源,線程也頂不住如此巨大的壓力,對(duì)于多線程應(yīng)用,CPU通過(guò)切片的方式來(lái)切換線程間的執(zhí)行,
線程切換時(shí)需要耗時(shí)(保存狀態(tài),下次繼續(xù))。協(xié)程,則只使用一個(gè)線程,在一個(gè)線程中規(guī)定某個(gè)代碼塊執(zhí)行順序。線程是搶占式的調(diào)度,而協(xié)程是協(xié)同式的調(diào)度,也就是說(shuō),協(xié)程需要自己做調(diào)度。
協(xié)程有什么用?
在別的語(yǔ)言中協(xié)程意義不大,多線程可解決問(wèn)題,但是python因?yàn)樗蠫IL(Global Interpreter Lock 全局解釋器鎖 )在同一時(shí)間只有一個(gè)線程在工作,如果一個(gè)線程里面I/O操作特別多,協(xié)程就比較適用;
在python中多線程的執(zhí)行情況如下圖:
怎么實(shí)現(xiàn)協(xié)程?
python2.7中用生成器實(shí)現(xiàn)協(xié)程
在python3.7以后使用asyncio 和 async / await 的方法實(shí)現(xiàn)協(xié)程,實(shí)現(xiàn)過(guò)程就變得很簡(jiǎn)單
具體說(shuō)明:簡(jiǎn)單的代碼示例
import asyncio
import time
async def sub_function(str):
print(' {}'.format(str))
sleep_time = int(str.split('_')[-1])
await asyncio.sleep(sleep_time)
print('OK {}'.format(str))
async def main(strs):
for str in strs:
await sub_function(str)
# asyncio.run(main()) 作為主程序的入口函數(shù),在程序運(yùn)行周期內(nèi),只調(diào)用一次 asyncio.run
t0 = time.time()
asyncio.run(main(['str_1', 'str_2', 'str_3', 'str_4']))
t1 = time.time()
print("Total time running: %s seconds" %(str(t1 - t0)))
輸出結(jié)果:一共是 10s 和我們順序分析 分別等待 1 2 3 4 秒 好像沒(méi)有什么提升作用,繼續(xù)往下看
執(zhí)行協(xié)程的方法有三種:
1. 入上例所示,用await 來(lái)調(diào)用實(shí)現(xiàn),但是await 執(zhí)行的效果,和 Python 正常執(zhí)行是一樣的,也就是說(shuō)程序會(huì)阻塞在這里,進(jìn)入被調(diào)用的協(xié)程函數(shù),執(zhí)行完畢返回后再繼續(xù),而這也是 await 的字面意思。
await 是同步調(diào)用,相當(dāng)于我們用異步接口寫了個(gè)同步代碼,所以運(yùn)行時(shí)間沒(méi)有得到提升。
2. 上栗的異步想過(guò)沒(méi)有體現(xiàn)出來(lái),接下來(lái)我們使用另一個(gè)概念 task
async defsub_function(str):print('{}'.format(str))
sleep_time= int(str.split('_')[-1])
await asyncio.sleep(sleep_time)print('OK {}'.format(str))
asyncdefmain(strs):
#
tasks= [asyncio.create_task(sub_function(str)) for str instrs]for task intasks:
await task#asyncio.run(main()) 作為主程序的入口函數(shù),在程序運(yùn)行周期內(nèi),只調(diào)用一次 asyncio.run
t0 =time.time()
asyncio.run(main(['str_1', 'str_2', 'str_3', 'str_4']))
t1=time.time()print("Total time running: %s seconds" %(str(t1 - t0)))
們有了協(xié)程對(duì)象后,便可以通過(guò) asyncio.create_task 來(lái)創(chuàng)建任務(wù)。任務(wù)創(chuàng)建后很快就會(huì)被調(diào)度執(zhí)行,
這樣,我們的代碼也不會(huì)阻塞在任務(wù)這里。用for task in tasks: await task 即可。這次,你就看到效果了吧,結(jié)果顯示,運(yùn)行總時(shí)長(zhǎng)等于運(yùn)行時(shí)間最長(zhǎng)一句。
運(yùn)行結(jié)果:
3. 上面的task也可以用下面的方式寫:
async defmain(strs):
tasks= [asyncio.create_task(sub_function(str)) for str instrs]
await asyncio.gather(*tasks)
唯一要注意的是,*tasks 解包列表,將列表變成了函數(shù)的參數(shù);與之對(duì)應(yīng)的是, ** dict 將字典變成了函數(shù)的參數(shù)。
在實(shí)際中,我們會(huì)遇到接口超時(shí),我們就需要取消的情況,這種情況該怎么處理呢?再進(jìn)一步,如果某些協(xié)程運(yùn)行時(shí)出現(xiàn)錯(cuò)誤,又該怎么處理呢?
importtimeimportasyncio
asyncdefworker_1():
await asyncio.sleep(1)return 1asyncdefworker_2():
await asyncio.sleep(2)return 2 /0
asyncdefworker_3():
await asyncio.sleep(3)return 3asyncdefmain():
task_1=asyncio.create_task(worker_1())
task_2=asyncio.create_task(worker_2())
task_3=asyncio.create_task(worker_3())
await asyncio.sleep(2)
task_3.cancel()
res= await asyncio.gather(task_1, task_2, task_3, return_exceptions=True)print(res)
t0=time.time()
asyncio.run(main())
t1=time.time()print("Total time running: %s seconds" %(str(t1 - t0)))
要注意return_exceptions=True這行代碼。這個(gè)參數(shù)默認(rèn)值為False, 如果不設(shè)置這個(gè)參數(shù),錯(cuò)誤就會(huì)完整地 throw 到我們這個(gè)執(zhí)行層,從而需要 try except 來(lái)捕捉,
這也就意味著其他還沒(méi)被執(zhí)行的任務(wù)會(huì)被全部取消掉。為了避免這個(gè)局面,我們將 return_exceptions 設(shè)置為 True 即可。
線程能實(shí)現(xiàn)的,協(xié)程都能做到.
總結(jié)
以上是生活随笔為你收集整理的python协程实时输出_python协程的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: python 文本分析库_Python有
- 下一篇: c 输出空格_Python编程:案例详解