python asyncio 并发编程_asyncio并发编程
一. 事件循環(huán)
1.注:
實(shí)現(xiàn)搭配:事件循環(huán)+回調(diào)(驅(qū)動(dòng)生成器【協(xié)程】)+epoll(IO多路復(fù)用),asyncio是Python用于解決異步編程的一整套解決方案;
基于asynico:tornado,gevent,twisted(Scrapy,django channels),tornado(實(shí)現(xiàn)了web服務(wù)器,可以直接部署,真正部署還是要加nginx),django,flask(uwsgi,gunicorn+nginx部署)
1 importasyncio2 importtime3 async defget_html(url):4 print('start get url')5 #不能直接使用time.sleep,這是阻塞的函數(shù),如果使用time在并發(fā)的情況有多少個(gè)就有多少個(gè)2秒
6 await asyncio.sleep(2)7 print('end get url')8 if __name__=='__main__':9 start_time=time.time()10 loop=asyncio.get_event_loop()11 task=[get_html('www.baidu.com') for i in range(10)]12 loop.run_until_complete(asyncio.wait(task))13 print(time.time()-start_time)
View Code
2.如何獲取協(xié)程的返回值(和線程池類似):
1 importasyncio2 importtime3 from functools importpartial4 async defget_html(url):5 print('start get url')6 await asyncio.sleep(2)7 print('end get url')8 return "HAHA"
9 #需要接收task,如果要接收其他的參數(shù)就需要用到partial(偏函數(shù)),參數(shù)需要放到前面
10 defcallback(url,future):11 print(url+'success')12 print('send email')13 if __name__=='__main__':14 loop=asyncio.get_event_loop()15 task=loop.create_task(get_html('www.baidu.com'))16 #原理還是獲取event_loop,然后調(diào)用create_task方法,一個(gè)線程只有一個(gè)loop
17 #get_future=asyncio.ensure_future(get_html('www.baidu.com'))也可以
18 #loop.run_until_complete(get_future)
19 #run_until_complete可以接收future類型,task類型(是future類型的一個(gè)子類),也可以接收可迭代類型
20 task.add_done_callback(partial(callback,'www.baidu.com'))21 loop.run_until_complete(task)22 print(task.result())
View Code
3.wait和gather的區(qū)別:
3.1wait簡單使用:
1 importasyncio2 importtime3 from functools importpartial4 async defget_html(url):5 print('start get url')6 await asyncio.sleep(2)7 print('end get url')8
9 if __name__=='__main__':10 loop=asyncio.get_event_loop()11 tasks=[get_html('www.baidu.com') for i in range(10)]12 #wait和線程的wait相似
13 loop.run_until_complete(asyncio.wait(tasks))
View Code
協(xié)程的wait和線程的wait相似,也有timeout,return_when(什么時(shí)候返回)等參數(shù)
3.2gather簡單使用:
1 importasyncio2 importtime3 from functools importpartial4 async defget_html(url):5 print('start get url')6 await asyncio.sleep(2)7 print('end get url')8
9 if __name__=='__main__':10 loop=asyncio.get_event_loop()11 tasks=[get_html('www.baidu.com') for i in range(10)]12 #gather注意加*,這樣就會(huì)變成參數(shù)
13 loop.run_until_complete(asyncio.gather(*tasks))
View Code
3.3gather和wait的區(qū)別:(定制性不強(qiáng)時(shí)可以優(yōu)先考慮gather)
gather更加高層,可以將tasks分組;還可以成批的取消任務(wù)
1 importasyncio2 importtime3 from functools importpartial4 async defget_html(url):5 print('start get url')6 await asyncio.sleep(2)7 print('end get url')8
9 if __name__=='__main__':10 loop=asyncio.get_event_loop()11 groups1=[get_html('www.baidu.com') for i in range(10)]12 groups2=[get_html('www.baidu.com') for i in range(10)]13 #gather注意加*,這樣就會(huì)變成參數(shù)
14 loop.run_until_complete(asyncio.gather(*groups1,*groups2))15 #這種方式也可以
16 #groups1 = [get_html('www.baidu.com') for i in range(10)]
17 #groups2 = [get_html('www.baidu.com') for i in range(10)]
18 #groups1=asyncio.gather(*groups1)
19 #groups2=asyncio.gather(*groups2)
20 #取消任務(wù)
21 #groups2.cancel()
22 #loop.run_until_complete(asyncio.gather(groups1,groups2))
View Code
二. 協(xié)程嵌套
1.run_util_complete()源碼:和run_forever()區(qū)別并不大,只是可以在運(yùn)行完指定的協(xié)程后可以把loop停止掉,而run_forever()不會(huì)停止
2.loop會(huì)被放在future里面,future又會(huì)放在loop中
3.取消future(task):
3.1子協(xié)程調(diào)用原理:
官網(wǎng)例子:
解釋:?await相當(dāng)于yield from,loop運(yùn)行協(xié)程print_sum(),print_sum又會(huì)去調(diào)用另一個(gè)協(xié)程compute,run_util_complete會(huì)把協(xié)程print_sum注冊(cè)到loop中。
1.event_loop會(huì)為print_sum創(chuàng)建一個(gè)task,通過驅(qū)動(dòng)task執(zhí)行print_sum(task首先會(huì)進(jìn)入pending【等待】的狀態(tài));
2.print_sum直接進(jìn)入字協(xié)程的調(diào)度,這個(gè)時(shí)候轉(zhuǎn)向執(zhí)行另一個(gè)協(xié)程(compute,所以print_sum變?yōu)閟uspended【暫停】狀態(tài));
3.compute這個(gè)協(xié)程首先打印,然后去調(diào)用asyncio的sleep(此時(shí)compute進(jìn)入suspende的狀態(tài)【暫停】),直接把返回值返回給Task(沒有經(jīng)過print_sum,相當(dāng)于yield from,直接在調(diào)用方和子生成器通信,是由委托方print_sum建立的通道);
4.Task會(huì)告訴Event_loop暫停,Event_loop等待一秒后,通過Task喚醒(越過print_sum和compute建立一個(gè)通道);
5.compute繼續(xù)執(zhí)行,變?yōu)闋顟B(tài)done【執(zhí)行完成】,然后拋一個(gè)StopIteration的異常,會(huì)被await語句捕捉到,然后提取出1+2=3的值,進(jìn)入print_sum,print_sum也被激活(因?yàn)閽伋隽薙topIteration的異常被print_sum捕捉),print_sum執(zhí)行完也會(huì)被標(biāo)記為done的狀態(tài),同時(shí)拋出StopIteration會(huì)被Task接收
三. call_soon、call_later、call_at、call_soon_threadsafe
1.call_soon:可以直接接收函數(shù),而不用協(xié)程
1 importasyncio2 #函數(shù)
3 defcallback(sleep_time):4 print('sleep {} success'.format(sleep_time))5 #通過該函數(shù)暫停
6 defstoploop(loop):7 loop.stop()8 if __name__=='__main__':9 loop=asyncio.get_event_loop()10 #可以直接傳遞函數(shù),而不用協(xié)程,call_soon其實(shí)就是調(diào)用的call_later,時(shí)間為0秒
11 loop.call_soon(callback,2)12 loop.call_soon(stoploop,loop)13 #不能用run_util_complete(因?yàn)椴皇菂f(xié)程),run_forever找到call_soon一直運(yùn)行
14 loop.run_forever()
View Code
2.call_later:可以指定多長時(shí)間后啟動(dòng)(實(shí)際調(diào)用call_at,時(shí)間不是傳統(tǒng)的時(shí)間,而是loop內(nèi)部的時(shí)間)
1 importasyncio2 #函數(shù)
3 defcallback(sleep_time):4 print('sleep {} success'.format(sleep_time))5 #通過該函數(shù)暫停
6 defstoploop(loop):7 loop.stop()8 if __name__=='__main__':9 loop=asyncio.get_event_loop()10 loop.call_later(3,callback,1)11 loop.call_later(1, callback, 2)12 loop.call_later(1, callback, 2)13 loop.call_later(1, callback, 2)14 loop.call_soon(callback,4)15 #loop.call_soon(stoploop,loop)
16 #不能用run_util_complete(因?yàn)椴皇菂f(xié)程),run_forever找到call_soon一直運(yùn)行
17 loop.run_forever()
View Code
3.call_at:指定某個(gè)時(shí)間執(zhí)行
1 importasyncio2 #函數(shù)
3 defcallback(sleep_time):4 print('sleep {} success'.format(sleep_time))5 #通過該函數(shù)暫停
6 defstoploop(loop):7 loop.stop()8 if __name__=='__main__':9 loop=asyncio.get_event_loop()10 now=loop.time()11 print(now)12 loop.call_at(now+3,callback,1)13 loop.call_at(now+1, callback, 0.5)14 loop.call_at(now+1, callback, 2)15 loop.call_at(now+1, callback, 2)16 #loop.call_soon(stoploop,loop)
17 #不能用run_util_complete(因?yàn)椴皇菂f(xié)程),run_forever找到call_soon一直運(yùn)行
18 loop.run_forever()
View Code
4.call_soon_threadsafe:
線程安全的方法,不僅能解決協(xié)程,也能解決線程,進(jìn)程,和call_soon幾乎一致,多了self._write_to_self(),和call_soon用法一致
四. ThreadPoolExecutor+asyncio(線程池和協(xié)程結(jié)合)
1.使用run_in_executor:就是把阻塞的代碼放進(jìn)線程池運(yùn)行,性能并不是特別高,和多線程差不多
1 #使用多線程,在協(xié)程中集成阻塞io
2 importasyncio3 importsocket4 from urllib.parse importurlparse5 from concurrent.futures importThreadPoolExecutor6 importtime7 defget_url(url):8 #通過socket請(qǐng)求html
9 url=urlparse(url)10 host=url.netloc11 path=url.path12 if path=="":13 path="/"
14 #建立socket連接
15 client=socket.socket(socket.AF_INET,socket.SOCK_STREAM)16 client.connect((host,80))17 #向服務(wù)器發(fā)送數(shù)據(jù)
18 client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))19 #將數(shù)據(jù)讀取完
20 data=b""
21 whileTrue:22 d=client.recv(1024)23 ifd:24 data+=d25 else:26 break
27 #會(huì)將header信息作為返回字符串
28 data=data.decode('utf8')29 print(data.split('\r\n\r\n')[1])30 client.close()31
32 if __name__=='__main__':33 start_time=time.time()34 loop=asyncio.get_event_loop()35 excutor=ThreadPoolExecutor()36 tasks=[]37 for i in range(20):38 task=loop.run_in_executor(excutor,get_url,'http://www.baidu.com')39 tasks.append(task)40 loop.run_until_complete(asyncio.wait(tasks))41 print(time.time()-start_time)
View Code
五. asyncio模擬http請(qǐng)求
注:asyncio目前沒有提供http協(xié)議的接口
1 #asyncio目前沒有提供http協(xié)議的接口
2 importasyncio3 from urllib.parse importurlparse4 importtime5
6
7 async defget_url(url):8 #通過socket請(qǐng)求html
9 url =urlparse(url)10 host =url.netloc11 path =url.path12 if path == "":13 path = "/"
14 #建立socket連接(比較耗時(shí)),非阻塞需要注冊(cè),都在open_connection中實(shí)現(xiàn)了
15 reader, writer = await asyncio.open_connection(host, 80)16 #向服務(wù)器發(fā)送數(shù)據(jù),unregister和register都實(shí)現(xiàn)了
17 writer.write("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))18 #讀取數(shù)據(jù)
19 all_lines =[]20 #源碼實(shí)現(xiàn)較復(fù)雜,有__anext__的魔法函數(shù)(協(xié)程)
21 async for line inreader:22 data = line.decode('utf8')23 all_lines.append(data)24 html = '\n'.join(all_lines)25 returnhtml26
27
28 async defmain():29 tasks =[]30 for i in range(20):31 url = "http://www.baidu.com/"
32 tasks.append(asyncio.ensure_future(get_url(url)))33 for task inasyncio.as_completed(tasks):34 result =await task35 print(result)36
37
38 if __name__ == '__main__':39 start_time =time.time()40 loop =asyncio.get_event_loop()41 #tasks=[get_url('http://www.baidu.com') for i in range(10)]
42 #在外部獲取結(jié)果,保存為future對(duì)象
43 #tasks = [asyncio.ensure_future(get_url('http://www.baidu.com')) for i in range(10)]
44 #loop.run_until_complete(asyncio.wait(tasks))
45 #for task in tasks:
46 #print(task.result())
47 #執(zhí)行完一個(gè)打印一個(gè)
48 loop.run_until_complete(main())49 print(time.time() - start_time)
View Code
六. future和task
1.future:協(xié)程中的future和線程池中的future相似
future中的方法,都和線程池中的相似
set_result方法
不像線程池中運(yùn)行完直接運(yùn)行代碼(這是單線程的,會(huì)調(diào)用call_soon方法)
2.task:是future的子類,是future和協(xié)程之間的橋梁
會(huì)首先啟動(dòng)_step方法
該方法會(huì)首先啟動(dòng)協(xié)程,把返回值(StopIteration的值)做處理,用于解決協(xié)程和線程不一致的地方
七. asyncio同步和通信
1.單線程協(xié)程不需要鎖:
1 importasyncio2 total=03 async defadd():4 globaltotal5 for i in range(1000000):6 total+=1
7
8
9 async defdecs():10 globaltotal11 for i in range(1000000):12 total-=1
13 if __name__=='__main__':14 loop=asyncio.get_event_loop()15 tasks=[add(),decs()]16 loop.run_until_complete(asyncio.wait(tasks))17 print(total)
View Code
2.某種情況需要鎖:
asyncio中的鎖(同步機(jī)制)
1 importasyncio,aiohttp2 #這是并沒有調(diào)用系統(tǒng)的鎖,只是簡單的自己實(shí)現(xiàn)(注意是非阻塞的),Queue也是非阻塞的,都用了yield from,不用用到condition【單線程】】
3 #Queue還可以限流,如果只需要通信還可以直接使用全局變量否則可以
4 from asyncio importLock,Queue5 catche={}6 lock=Lock()7 async defget_stuff():8 #實(shí)現(xiàn)了__enter__和__exit__兩個(gè)魔法函數(shù),可以用with
9 #with await lock:
10 #更明確的語法__aenter__和__await__
11 async with lock:12 #注意加await,是一個(gè)協(xié)程
13 #await lock.acquire()
14 for url incatche:15 returncatche[url]16 #異步的接收
17 stauff=aiohttp.request('Get',url)18 catche[url]=stauff19 returnstauff20 #release是一個(gè)簡單的函數(shù)
21 #lock.release()
22
23 async defparse_stuff():24 stuff=await get_stuff()25
26 async defuse_stuff():27 stuff=await get_stuff()28 #如果沒有同步機(jī)制,就會(huì)發(fā)起兩次請(qǐng)求(這里就可以加一個(gè)同步機(jī)制)
29 tasks=[parse_stuff(),use_stuff()]30 loop=asyncio.get_event_loop()31 loop.run_until_complete(asyncio.wait(tasks))
View Code
八. aiohttp實(shí)現(xiàn)高并發(fā)爬蟲
1 #asyncio去重url,入庫(異步的驅(qū)動(dòng)aiomysql)
2 importaiohttp3 importasyncio4 importre5 importaiomysql6 from pyquery importpyquery7
8 start_url = 'http://www.jobbole.com/'
9 waiting_urls =[]10 seen_urls =[]11 stopping =False12 #限制并發(fā)數(shù)
13 sem=asyncio.Semaphore(3)14
15
16 async deffetch(url, session):17 async with sem:18 await asyncio.sleep(1)19 try:20 async with session.get(url) as resp:21 print('url_status:{}'.format(resp.status))22 if resp.status in [200, 201]:23 data =await resp.text()24 returndata25 exceptException as e:26 print(e)27
28
29 defextract_urls(html):30 '''
31 解析無io操作32 '''
33 urls =[]34 pq =pyquery(html)35 for link in pq.items('a'):36 url = link.attr('href')37 if url and url.startwith('http') and url not inurls:38 urls.append(url)39 waiting_urls.append(url)40 returnurls41
42
43 async definit_urls(url, session):44 html =await fetch(url, session)45 seen_urls.add(url)46 extract_urls(html)47
48
49 async defhandle_article(url, session, pool):50 '''
51 處理文章52 '''
53 html =await fetch(url, session)54 seen_urls.append(url)55 extract_urls(html)56 pq =pyquery(html)57 title = pq('title').text()58 async with pool.acquire() as conn:59 async with conn.cursor() as cur:60 insert_sql = "insert into Test(title) values('{}')".format(title)61 await cur.execute(insert_sql)62
63
64 async defconsumer(pool):65 with aiohttp.CLientSession() as session:66 while notstopping:67 if len(waiting_urls) ==0:68 await asyncio.sleep(0.5)69 continue
70 url =waiting_urls.pop()71 print('start url:' + 'url')72 if re.match('http://.*?jobble.com/\d+/', url):73 if url not inseen_urls:74 asyncio.ensure_future(handle_article(url, session, pool))75 await asyncio.sleep(30)76 else:77 if url not inseen_urls:78 asyncio.ensure_future(init_urls(url, session))79
80
81 async defmain():82 #等待mysql連接好
83 pool = aiomysql.connect(host='localhost', port=3306, user='root',84 password='112358', db='my_aio', loop=loop, charset='utf8', autocommit=True)85 async with aiohttp.CLientSession() as session:86 html =await fetch(start_url, session)87 seen_urls.add(start_url)88 extract_urls(html)89 asyncio.ensure_future(consumer(pool))90
91 if __name__ == '__main__':92 loop =asyncio.get_event_loop()93 asyncio.ensure_future(loop)94 loop.run_forever(main(loop))
View Code
總結(jié)
以上是生活随笔為你收集整理的python asyncio 并发编程_asyncio并发编程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: oracle数据库物理结构包含,Orac
- 下一篇: java 不同包_Java项目中不同包的