python grequests极限_Python grequests闲话
前段時(shí)間看到這個(gè)grequests庫,感覺還是蠻有意思的,所以今天來對(duì)這個(gè)庫拆解拆解。這個(gè)庫是崇拜的大神kennethreitz寫的。Github地址:https://github.com/kennethreitz/grequests
首先看到文檔上給的示例:
import grequests
urls = [
'http://www.heroku.com',
'http://python-tablib.org',
'http://httpbin.org',
'http://python-requests.org',
'http://fakedomain/',
'http://kennethreitz.com'
]
# 創(chuàng)建沒有發(fā)送的request集合
rs = (grequests.get(u) for u in urls)
# 發(fā)送
grequests.map(rs)
# 為了防止超時(shí)和異常發(fā)生,可以指定一個(gè)異常處理器
def exception_handler(request, exception):
print("Request failed")
reqs = [
grequests.get('http://httpbin.org/delay/1', timeout=0.001),
grequests.get('http://fakedomain/'),
grequests.get('http://httpbin.org/status/500')]
grequests.map(reqs, exception_handler=exception_handler)
另外,可以使用imap來提高性能
根據(jù)這個(gè)示例,我們來看看源代碼
AsyncRequest
首先來看grequests.get:
get = partial(AsyncRequest, 'GET')
options = partial(AsyncRequest, 'OPTIONS')
它和其他諸多HTTP方法一樣,只是一個(gè)快捷方式,其本質(zhì)是調(diào)用了AsyncRequest,看到這個(gè)名字就應(yīng)該知道是異步的Request,所以應(yīng)該是對(duì)普通的Request做了封裝和修改:
class AsyncRequest(object):
""" 異步的Request,接收和Session.request相同的參數(shù),還有一些額外的參數(shù)
session: 發(fā)送請求的session
callback: 在返回對(duì)象上的回調(diào)函數(shù),和傳遞hooks={'response': callback}一樣
"""
def __init__(self, method, url, **kwargs):
#: Request method
self.method = method
#: URL to request
self.url = url
#: Associated ``Session``
self.session = kwargs.pop('session', None)
if self.session is None:
# requests里的Session對(duì)象
self.session = Session()
callback = kwargs.pop('callback', None)
if callback:
kwargs['hooks'] = {'response': callback}
#: The rest arguments for ``Session.request``
self.kwargs = kwargs
#: Resulting `` Response``
self.response = None
可以看到使用partial(AsyncRequest, 'GET')會(huì)使得method默認(rèn)為GET,然后再rs = (grequests.get(u) for u in urls)會(huì)形成一個(gè)生成器,里面生成AsyncRequest對(duì)象。下面來看看這些request是如何發(fā)送的:
map
之后程序會(huì)調(diào)用grequests.map(rs),那么我們來看看map:
def map(requests, stream=False, size=None, exception_handler=None, gtimeout=None):
"""并發(fā)的將Requests列表轉(zhuǎn)換成響應(yīng)
requests: Request對(duì)象的集合
stream: 如果為True,那么響應(yīng)內(nèi)容不會(huì)立即下載
size: 指定同時(shí)發(fā)起的請求數(shù)目,如果為None,就不會(huì)有限制
exception_handler: 回調(diào)函數(shù),當(dāng)異常發(fā)生的時(shí)候調(diào)用,參數(shù)是Request和Exception
gtimeout: Gevent合并所有的超時(shí)時(shí)間,單位為秒(與每個(gè)request的超時(shí)時(shí)間無關(guān))
"""
# 將生成器直接轉(zhuǎn)換成list
requests = list(requests)
# gevent的Pool對(duì)象,是時(shí)候研究一波gevent了
pool = Pool(size) if size else None
# 調(diào)用send函數(shù)來發(fā)送請求
jobs = [send(r, pool, stream=stream) for r in requests]
# 等待所有的greenlet處理單元結(jié)束運(yùn)行
gevent.joinall(jobs, timeout=gtimeout)
ret = []
# 處理所有的請求響應(yīng),并且處理異常
for request in requests:
if request.response is not None:
ret.append(request.response)
# 如果有異常處理器并且request有異常進(jìn)行處理
elif exception_handler and hasattr(request, 'exception'):
ret.append(exception_handler(request, request.exception))
else:
# 否則結(jié)果置為None
ret.append(None)
return ret
可以看到map函數(shù)很簡單,大體流程就是建立了greenlet處理器池,然后對(duì)每個(gè)request進(jìn)行調(diào)用,然后等待結(jié)束,最后得到響應(yīng)并且處理響應(yīng)。
# 如果給定size則創(chuàng)建greenlet池,否則為None
pool = Pool(size) if size else None
jobs = [send(r, pool, stream=stream) for r in requests]
gevent.joinall(jobs, timeout=gtimeout)
所以問題的關(guān)鍵還是在于gevent那幾行的調(diào)用,創(chuàng)建管理greenlet的池,用來限制并發(fā),創(chuàng)建好了之后,調(diào)用這個(gè)pool然后去發(fā)送請求,最后等待所有的greenlet結(jié)束。
Gevent的并發(fā)
無論創(chuàng)建還是沒有創(chuàng)建池,最終是要調(diào)用send方法的,來看看這個(gè)函數(shù):
def send(r, pool=None, stream=False):
"""使用指定的pool發(fā)送request對(duì)象,如果pool沒有指定,這個(gè)方法就會(huì)阻塞,Pools很有用,因?yàn)槟憧梢灾付úl(fā)限制"""
if pool is not None:
return pool.spawn(r.send, stream=stream)
return gevent.spawn(r.send, stream=stream)
gevent.spawn
gevent.spawn創(chuàng)建一個(gè)新的Greenlet對(duì)象,并且排定運(yùn)行調(diào)用function(*args, **kwargs),這個(gè)可以使用gevent.spawn或者是Greenlet.spawn,其實(shí)gevent.spawn就是Greenlet.spawn,并且最后會(huì)調(diào)用Greenlet的類方法,首先實(shí)例化一個(gè)對(duì)象,然后調(diào)用start方法,所以也相當(dāng)于調(diào)用Greenlet(*args, **kwargs)
這也是類方法的一個(gè)用法,另外的實(shí)例化對(duì)象的方法
@classmethod
def spawn(cls, *args, **kwargs):
g = cls(*args, **kwargs)
g.start()
return g
pool.spawn
這個(gè)方法使用給定的參數(shù)開始一個(gè)新的greenlet,通常是傳遞給Greenlet構(gòu)造函數(shù),并且將其加入這個(gè)pool管理的greenlets集合
Pool是Group的子類,提供了限制并發(fā)的方法,其spawn方法在greenlets數(shù)目達(dá)到上限的時(shí)候阻塞,直到有一個(gè)可用的greenlet。
這個(gè)方法也是使用pool實(shí)例為Greenlet創(chuàng)建一個(gè)實(shí)例,然后start it
def spawn(self, *args, **kwargs):
greenlet = self.greenlet_class(*args, **kwargs)
self.start(greenlet)
return greenlet
r.send
這個(gè)是AsyncRequest的send方法,比較簡單,就是發(fā)送請求,等待響應(yīng)。
def send(self, **kwargs):
merged_kwargs = {}
merged_kwargs.update(self.kwargs)
merged_kwargs.update(kwargs)
try:
self.response = self.session.request(self.method, self.url, **merged_kwargs)
except Exception as e:
self.exception = e
self.traceback = traceback.format_exc()
return self
imap
imap據(jù)說可以提高性能,快來看看吧:
def imap(requests, stream=False, size=2, exception_handler=None):
"""并發(fā)的將Request對(duì)象的生成器轉(zhuǎn)換成響應(yīng)的生成器。
requests: Request對(duì)象的生成器
stream: 如果為True,則不會(huì)立即自動(dòng)下載
size: 同時(shí)發(fā)起的請求數(shù),默認(rèn)為2
exception_handler: 當(dāng)發(fā)生異常時(shí)候回調(diào)
"""
pool = Pool(size)
def send(r):
return r.send(stream=stream)
for request in pool.imap_unordered(send, requests):
if request.response is not None:
yield request.response
elif exception_handler:
exception_handler(request, request.exception)
pool.join()
可以看到這個(gè)函數(shù)主要是使用了pool.imap_unordered,其實(shí)pool還有一個(gè)方法是imap
pool.imap
和itertools.imap()是一致的,itertools.imap()可以用于迭代無窮序列,比如itertools.imap(lambda x, y: x * y, [10, 20, 30], itertools.count(1)),如果兩個(gè)序列長短不一致,以短的為準(zhǔn),并且imap實(shí)現(xiàn)了惰性計(jì)算,類似生成器。
pool.imap可以并行運(yùn)行,按順序從迭代對(duì)象中取出元素迭代,應(yīng)用在函數(shù)上,然后收集結(jié)果。
如果限制了可以同時(shí)進(jìn)行的greenlets數(shù)量,那么最多只有這么多個(gè)任務(wù)同時(shí)進(jìn)行。
pool.imap_unordered
和imap一樣,返回的結(jié)果順序是隨意的,比起imap更加輕量級(jí),如果順序不重要的話,首先應(yīng)該選用這個(gè)。
join
等待這個(gè)group的greenlets都運(yùn)行完,如果這個(gè)group沒有g(shù)reenlet的話,立即返回
可以看到,如果不要求順序的話,imap_unordered會(huì)比imap更加高效,同時(shí)imap版本肯定比map版本性能好,因?yàn)閙ap版本必須全部運(yùn)行完才能拿數(shù)據(jù),但是imap版本只要有g(shù)reenlet有結(jié)果就可以取出來。
小結(jié)
這個(gè)庫就這么多內(nèi)容,其實(shí)主要是使用gevent封裝了一層requests,所以核心就是使用gevent,gevent怎么用,如何用,待我繼續(xù)研究。
不過通過看這個(gè)庫,也了解到了簡單的gevent的用法
總結(jié)
以上是生活随笔為你收集整理的python grequests极限_Python grequests闲话的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 电脑键盘上的Alt键有何妙用
- 下一篇: python css selector_