gj13 asyncio并发编程
13.1 事件循環(huán)
asyncio
包含各種特定系統(tǒng)實(shí)現(xiàn)的模塊化事件循環(huán)
傳輸和協(xié)議抽象
對TCP、UDP、SSL、子進(jìn)程、延時(shí)調(diào)用以及其他的具體支持
模仿futures模塊但適用于事件循環(huán)使用的Future類
基于 yield from 的協(xié)議和任務(wù),可以讓你用順序的方式編寫并發(fā)代碼
必須使用一個(gè)將產(chǎn)生阻塞IO的調(diào)用時(shí),有接口可以把這個(gè)事件轉(zhuǎn)移到線程池
模仿threading模塊中的同步原語、可以用在單線程內(nèi)的協(xié)程之間
事件循環(huán)+回調(diào)(驅(qū)動生成器)+epoll(IO多路復(fù)用)
asyncio是python用于解決異步io編程的一整套解決方案
tornado、gevent、twisted(scrapy, django channels)
torando(實(shí)現(xiàn)web服務(wù)器), django+flask(uwsgi, gunicorn+nginx)
tornado可以直接部署, nginx+tornado
13.2 協(xié)程嵌套
13.3 call_soon、call_later、call_at、call_soon_threadsafe
import asynciodef callback(sleep_times, loop):print("success time {}".format(loop.time()))def stoploop(loop):loop.stop()# call_later, call_at if __name__ == "__main__":loop = asyncio.get_event_loop()# 馬上執(zhí)行隊(duì)列里面的task# loop.call_soon(callback, 4, loop)# loop.call_soon(stoploop, loop)# call_later() 等待多少秒后執(zhí)行# loop.call_later(2, callback, 2, loop)# loop.call_later(1, callback, 1, loop)# loop.call_later(3, callback, 3, loop)# call_at() 在某一時(shí)刻執(zhí)行now = loop.time()loop.call_at(now+2, callback, 2, loop)loop.call_at(now+1, callback, 1, loop)loop.call_at(now+3, callback, 3, loop)loop.run_forever()# loop.call_soon_threadsafe() view13.4 ThreadPoolExecutor+asyncio
# 使用多線程:在協(xié)程中集成阻塞io # 數(shù)據(jù)庫等阻塞式IO import asyncio from concurrent.futures import ThreadPoolExecutor import socket from urllib.parse import urlparsedef get_url(url):# 通過socket請求htmlurl = urlparse(url)host = url.netlocpath = url.pathif path == "":path = "/"# 建立socket連接client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# client.setblocking(False)client.connect((host, 80)) # 阻塞不會消耗cpu# 不停的詢問連接是否建立好, 需要while循環(huán)不停的去檢查狀態(tài)# 做計(jì)算任務(wù)或者再次發(fā)起其他的連接請求client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))data = b""while True:d = client.recv(1024)if d:data += delse:breakdata = data.decode("utf8")html_data = data.split("\r\n\r\n")[1]print(html_data)client.close()if __name__ == "__main__":import timestart_time = time.time()loop = asyncio.get_event_loop()executor = ThreadPoolExecutor(3) # 線程池tasks = []for url in range(20):url = "http://www.baidu.com/s?wd={}/".format(url)task = loop.run_in_executor(executor, get_url, url) # 將阻塞的放到執(zhí)行器里面tasks.append(task)loop.run_until_complete(asyncio.wait(tasks))print("last time:{}".format(time.time() - start_time))# 將線程池直接應(yīng)用到協(xié)程里面13.5 asyncio模擬http請求
# coding=utf-8 # asyncio 沒有提供http協(xié)議的接口 aiohttp import asyncio from urllib.parse import urlparseasync def get_url(url):# 通過socket請求htmlurl = urlparse(url)host = url.netlocpath = url.pathif path == "":path = "/"# 建立socket連接reader, writer = await asyncio.open_connection(host, 80)writer.write("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))all_lines = []async for raw_line in reader:data = raw_line.decode("utf8")all_lines.append(data)html = "\n".join(all_lines)return htmlasync def main():tasks = []for url in range(20):url = "http://www.baidu.com/s?wd={}/".format(url)tasks.append(asyncio.ensure_future(get_url(url)))for task in asyncio.as_completed(tasks):result = await taskprint(result)if __name__ == "__main__":import timestart_time = time.time()loop = asyncio.get_event_loop()loop.run_until_complete(main())print('last time:{}'.format(time.time() - start_time))13.6 future和task
future 結(jié)果容器
task 是 future 的子類,協(xié)程和future之間的橋梁,啟動協(xié)程
13.7 asyncio同步和通信
total = 0async def add():# 1,dosomething1# 2.io操作# 1.dosomething3global totalfor i in range(100000):total += 1async def desc():global totalfor i in range(100000):total -= 1if __name__ == "__main__":import asynciotasks = [add(), desc()]loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(tasks))print(total) # 不需要鎖的情況13.8 aiohttp實(shí)現(xiàn)高并發(fā)爬蟲
與50位技術(shù)專家面對面20年技術(shù)見證,附贈技術(shù)全景圖總結(jié)
以上是生活随笔為你收集整理的gj13 asyncio并发编程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: gj12-2 协程和异步io
- 下一篇: drf1 rest restful规范