Python Gevent – 高性能的 Python 并发框架
?
From:http://www.xuebuyuan.com/1604603.html
Gevent 指南(英文):http://sdiehl.github.io/gevent-tutorial
Gevent 指南(中文):http://xlambda.com/gevent-tutorial
Gevent 指南(中文)下載地址:http://download.csdn.net/download/freeking101/9924351
初試Gevent – 高性能的Python并發框架:http://python.jobbole.com/87041
gevent 官網文檔:http://www.gevent.org/contents.html
gevent For the Working Python Developer:http://sdiehl.github.io/gevent-tutorial/
Python 的協程庫 greenlet 和 gevent:https://blog.csdn.net/freeking101/article/details/97276736
?
? ? ? ? 在 Python 里,按照官方解釋 greenlet 是輕量級的并行編程,gevent 就是利用 greenlet 實現的基于協程(coroutine)的 python 的網絡 library,通過使用greenlet提供了一個在libev事件循環頂部的高級別并發API。即 gevent 是對 greenlet 的高級封裝。
主要特性有以下幾點:
libevent 是一個事件分發引擎,greenlet 提供了輕量級線程的支持,gevent 就是基于這兩個的一個專門處理網絡邏輯的并行庫。
原理:
? ? ? ? 程序的重要部分是將任務函數封裝到 gevent.spawn。初始化的 greenlet 列表存放在數組 threads 中,此數組被傳給 gevent.joinall 函數,gevent.joinall 會阻塞當前流程,并執行所有給定的 greenlet,執行流程只會在所有greenlet執行完后才會繼續向下走。?gevent 實現了python 標準庫里面大部分的阻塞式系統調用,包括 socket、ssl、threading 和 select 等模塊,而將這些阻塞式調用變為協作式運行(參見猴子補丁部分)。
?
猴子補丁 Monkey Patch:?
- (1)猴子補丁的由來 。猴子補丁的這個叫法起源于 Zope 框架,大家在修正 Zope 的 Bug 的時候經常在程序后面追加更新部分,這些被稱作是 “雜牌軍補丁(guerillapatch)”,后來 guerilla 就漸漸的寫成了 gorllia(猩猩),再后來就寫了 monkey(猴子),所以猴子補丁的叫法是這么莫名其妙的得來的。?后來在動態語言中,不改變源代碼而對功能進行追加和變更,統稱為“猴子補丁”。所以猴子補丁并不是 Python 中專有的。猴子補丁這種東西充分利用了動態語言的靈活性,可以對現有的語言Api 進行追加,替換,修改 Bug,甚至性能優化等等。?使用猴子補丁的方式,gevent 能夠修改標準庫里面大部分的阻塞式系統調用,包括 socket、ssl、threading 和 select 等模塊,而變為協作式運行。也就是通過猴子補丁的 monkey.patch_xxx() 來將 python 標準庫中 模塊 或 函數 改成 gevent 中的響應的具有協程的協作式對象。這樣在不改變原有代碼的情況下,將應用的阻塞式方法,變成協程式的。?
- (2)猴子補丁使用時的注意事項 。猴子補丁的功能很強大,但是也帶來了很多的風險,尤其是像 gevent 這種直接進行 API替換的補丁,整個 Python 進程所使用的模塊都會被替換,可能自己的代碼能 hold 住,但是其它第三方庫,有時候問題并不好排查,即使排查出來也是很棘手,所以,就像松本建議的那樣,如果要使用猴子補丁,那么只是做功能追加,盡量避免大規模的 API 覆蓋。?雖然猴子補丁仍然是邪惡的(evil),但在這種情況下它是 “有用的邪惡(useful evil)”。
?
1、關于Linux的 epoll 機制:
epoll是Linux內核為處理大批量文件描述符而作了改進的poll,是Linux下多路復用IO接口select/poll的增強版本,它能顯著提高程序在大量并發連接中只有少量活躍的情況下的系統CPU利用率。epoll的優點:
支持一個進程打開大數目的socket描述符。select的一個進程所打開的FD由FD_SETSIZE的設置來限定,而epoll沒有這個限制,它所支持的FD上限是最大可打開文件的數目,遠大于2048。
IO效率不隨FD數目增加而線性下降:由于epoll只會對“活躍”的socket進行操作,于是,只有”活躍”的socket才會主動去調用 callback函數,其他idle狀態的socket則不會。
使用mmap加速內核與用戶空間的消息傳遞。epoll是通過內核于用戶空間mmap同一塊內存實現的。
內核微調。
2、libev機制
提供了指定文件描述符事件發生時調用回調函數的機制。libev是一個事件循環器:向libev注冊感興趣的事件,比如socket可讀事件,libev會對所注冊的事件的源進行管理,并在事件發生時觸發相應的程序。
示例:
import gevent from gevent import socketurls = ['www.baidu.com', 'www.example.com', 'www.python.org']jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]gevent.joinall(jobs, timeout=2)result = [job.value for job in jobs] print(result)結果:['61.135.169.125', '93.184.216.34', '151.101.228.223']
注解:gevent.spawn() 方法 spawn 一些 jobs,然后通過 gevent.joinall 將 jobs 加入到 微線程 執行隊列中等待其完成,設置超時為 2 秒。執行后的結果通過檢查 gevent.Greenlet.value 值來收集。gevent.socket.gethostbyname() 函數與標準的socket.gethotbyname() 有相同的接口,但它不會阻塞整個解釋器,因此會使得其他的 greenlets 跟隨著無阻的請求而執行。
?
? ? ? ? 協程,gevent,greenlet,eventlet 不了解的可以上網查找資料了解下。至于 協程,進程 和 線程 大家平時了解的都比較多,而協程算是一種輕量級進程,但又不能叫進程,因為操作系統并不知道它的存在。什么意思呢,就是說,協程像是一種在程序級別來模擬系統級別的進程,由于是單進程,并且少了上下文切換,于是相對來說系統消耗很少,而且網上的各種測試也表明,協程確實擁有驚人的速度。并且在實現過程中,協程可以用以前同步思路的寫法,而運行起來確是異步的,也確實很有意思。話說有一種說法就是說進化歷程是:多進程->多線程->異步->協程,暫且不論說的對不對,單從諸多贊譽來看,協程還是有必要理解一下的。
比較慚愧,greenlet 沒怎么看就直接看 gevent,官方文檔還是可以看看的,尤其是源碼里的 examples 都相當不錯,有助于理解gevent 的使用。
gevent 封裝了很多很方便的接口,其中一個就是 monkey
from gevent import monkey monkey.patch_all()這樣兩行,就可以使用 python 以前的 socket 之類的,因為 gevent 已經給你自動轉化了,真是超級方便阿。
而且安裝 gevent 也是很方便,首先安裝依賴 libevent 和 greenlet,再利用 pypi 安裝即可
? ? ? ? ? ? ? ? 安裝 libevent:sudo apt-get install libevent-dev
? ? ? ? ? ? ? ? 安裝 python-dev:sudo apt-get install python-dev
? ? ? ? ? ? ? ? 安裝 gevent:sudo pip install gevent
? ? ? ? ? ? ? ? 安裝 greenlet:sudo pip install greenlet
示例代碼:
import gevent from gevent import monkey# 切換是在 IO 操作時自動完成,所以gevent需要修改Python自帶的一些標準庫 # 這一過程在啟動時通過monkey patch完成 monkey.patch_all()def func_a():while 1:print('-------A-------')# 用來模擬一個耗時操作,注意不是time模塊中的sleep# 每當碰到耗時操作,會自動跳轉至其他協程gevent.sleep(1)def func_b():while 1:print('-------B-------')gevent.sleep(0.5)# gevent.joinall([gevent.spawn(fn) g1 = gevent.spawn(func_a) # 創建一個協程 g2 = gevent.spawn(func_b) g1.join() # 等待協程執行結束 g2.join()select() 函數通常是對各種文件描述符進行輪詢的阻塞調用。
from gevent import select ... select.select([], [], [], 2)gevent 池
示例代碼,測試 gevent 的 任務池
from gevent import poolg_pool = pool.Pool()def a():for i in range(100):g_pool.spawn(b)def b():print('b')g_pool.spawn(a) g_pool.join()示例代碼。程序及注釋如下:
# -*- coding: utf-8 -*- # @Author : # @File : test_gevent.py # @Software: PyCharm # @description : XXXimport gevent import time from gevent import event # 調用 gevent 的 event 子模塊# 三個進程需要定義三個事件 event1,event2,event3,來進行12,23,31循環機制,即進程一,進程二,進程三順序執行def fun1(num, event1, event2): # 固定格式i = 0while i < 10: # 設置循環10次i += 1time.sleep(1) # 睡眠1秒print('進程一:111111111')event2.set() # 將event2值設為Trueevent1.clear() # 將event1值設為Falseevent1.wait() # event1等待,其值為True時才執行def fun2(num, event2, event3):i = 0while i < 10:i += 1time.sleep(1)print('進程二:222222222')event3.set() # 將event3值設為Trueevent2.clear() # 將event2值設為Falseevent2.wait() # event2等待,其值為True時才執行def fun3(num, event3, event1):i = 0while i < 10:i += 1time.sleep(1)print('進程三:333333333')event1.set()event3.clear()event3.wait()if __name__ == "__main__": # 執行調用格式act1 = gevent.event.Event() # 調用event中的Event類,用act1表示act2 = gevent.event.Event()act3 = gevent.event.Event()# 三個進程,act1,act2,act3gevent_list = [] # 建立一個數列,用來存和管理進程# 調用gevent中的Greenlet子模塊,用Greenlet創建進程一g = gevent.Greenlet(fun1, 1, act1, act2)g.start()gevent_list.append(g) # 將進程一加入到Gevents數列print('進程一啟動:')g = gevent.Greenlet(fun2, 2, act2, act3)g.start()gevent_list.append(g)print('進程二啟動:')g = gevent.Greenlet(fun3, 3, act3, act1)g.start()gevent_list.append(g)print('進程三啟動:')print('所有進程都已啟動!')# 調用Greenlet中的joinall函數,將Gevents的進程收集排列gevent.joinall(gevent_list)?
##################################
看看 Gevent :http://www.gevent.org/?
您可以創建幾個 Greenlet 對象為幾個任務。
每個 greenlet 是 綠色的線程 :https://en.wikipedia.org/wiki/Green_threads。
示例代碼:
from gevent import monkey; monkey.patch_all() import gevent import requestsdef get_url(url):res = requests.get(url)print(url, res.status_code, len(res.text))url_l = ['http://www.baidu.com','http://www.python.org','http://www.cnblogs.com' ] g_l = [] for i in url_l:g_l.append(gevent.spawn(get_url, i)) gevent.joinall(g_l)示例代碼( 利用 gevent 并發抓取?):
from gevent import monkeymonkey.patch_all()import requests import gevent import io import sys# 解決console顯示亂碼的編碼問題 sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')class Douban(object):"""A class containing interface test method of Douban object"""def __init__(self):self.host = 'movie.douban.com'self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:61.0) Gecko/20100101 Firefox/61.0','Referer': 'https://movie.douban.com/',}def get_response(self, url, data):resp = requests.post(url=url, data=data, headers=self.headers).content.decode('utf-8')return respdef test_search_tags_movie(self):method = 'search_tags'url = 'https://%s/j/%s' % (self.host, method)post_data = {'type': 'movie','source': 'index'}resp = self.get_response(url=url, data=post_data)print(resp)return respif __name__ == '__main__':douban = Douban()threads = []for i in range(6):thread = gevent.spawn(douban.test_search_tags_movie)threads.append(thread)gevent.joinall(threads)并發爬圖片:
from gevent import monkey monkey.patch_all() import requests import gevent from lxml import etreedef downloader(img_name, img_url):req = requests.get(img_url)img_content = req.contentwith open(img_name, "wb") as f:f.write(img_content)def main():r = requests.get('http://www.nsgirl.com/portal.php')if r.status_code == 200:img_src_xpath = '//div[@id="frameXWswSe"]//div[@class="portal_block_summary"]//li//img/@src's_html = etree.HTML(text=r.text)all_img_src = s_html.xpath(img_src_xpath)count = 0for img_src in all_img_src:count += 1# print(img_src)# http://www.nsgirl.com/forum.php?mod=image&aid=342&size=218x285&key=cd6828baf05c305curl = 'http://www.nsgirl.com/' + img_srcgevent.joinall([gevent.spawn(downloader, f"{count}.jpg", url), ])if __name__ == '__main__':main()?
?
?
gevent?程序員指南
?
由 Gevent 社區編寫
gevent是一個基于libev的并發庫。它為各種并發和網絡相關的任務提供了整潔的API。?
介紹
?
本指南假定讀者有中級Python水平,但不要求有其它更多的知識,不期待讀者有 并發方面的知識。本指南的目標在于給予你需要的工具來開始使用gevent,幫助你 馴服現有的并發問題,并從今開始編寫異步應用程序。
?
貢獻者
按提供貢獻的時間先后順序列出如下:?Stephen Diehl?Jérémy Bethmont?sww?Bruno Bigras?David Ripton?Travis Cline?Boris Feld?youngsterxyf?Eddie Hebert?Alexis Metaireau?Daniel Velkov
同時感謝Denis Bilenko寫了gevent和相應的指導以形成本指南。
這是一個以MIT許可證發布的協作文檔。你想添加一些內容?或看見一個排版錯誤? Fork一個分支發布一個request到?Github. 我們歡迎任何貢獻。
本頁也有日文版本。
?
?
核心部分
?
Greenlets
在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet全部運行在主程序操作系統進程的內部,但它們被協作式地調度。
在任何時刻,只有一個協程在運行。
這與multiprocessing或threading等提供真正并行構造的庫是不同的。 這些庫輪轉使用操作系統調度的進程和線程,是真正的并行。
?
同步和異步執行
?
并發的核心思想在于,大的任務可以分解成一系列的子任務,后者可以被調度成 同時執行或異步執行,而不是一次一個地或者同步地執行。兩個子任務之間的 切換也就是上下文切換。
在gevent里面,上下文切換是通過yielding來完成的. 在下面的例子里, 我們有兩個上下文,通過調用gevent.sleep(0),它們各自yield向對方。
import geventdef foo():print('Running in foo')gevent.sleep(0)print('Explicit context switch to foo again')def bar():print('Explicit context to bar')gevent.sleep(0)print('Implicit context switch back to bar')gevent.joinall([gevent.spawn(foo),gevent.spawn(bar), ])結果
Running in foo Explicit context to bar Explicit context switch to foo again Implicit context switch back to bar下圖將控制流形象化,就像在調試器中單步執行整個程序,以說明上下文切換如何發生。
當我們在受限于網絡或IO的函數中使用gevent,這些函數會被協作式的調度, gevent的真正能力會得到發揮。Gevent處理了所有的細節, 來保證你的網絡庫會在可能的時候,隱式交出greenlet上下文的執行權。 這樣的一種用法是如何強大,怎么強調都不為過。或者我們舉些例子來詳述。
下面例子中的select()函數通常是一個在各種文件描述符上輪詢的阻塞調用。
import time import gevent from gevent import selectstart = time.time() tic = lambda: 'at %1.1f seconds' % (time.time() - start)def gr1():# Busy waits for a second, but we don't want to stick around...print('Started Polling: %s' % tic())select.select([], [], [], 2)print('Ended Polling: %s' % tic())def gr2():# Busy waits for a second, but we don't want to stick around...print('Started Polling: %s' % tic())select.select([], [], [], 2)print('Ended Polling: %s' % tic())def gr3():print("Hey lets do some stuff while the greenlets poll, %s" % tic())gevent.sleep(1)gevent.joinall([gevent.spawn(gr1),gevent.spawn(gr2),gevent.spawn(gr3), ])結果
Started Polling: at 0.0 seconds Started Polling: at 0.0 seconds Hey lets do some stuff while the greenlets poll, at 0.0 seconds Ended Polling: at 2.0 seconds Ended Polling: at 2.0 seconds下面是另外一個多少有點人造色彩的例子,定義一個非確定性的(non-deterministic)?的task函數(給定相同輸入的情況下,它的輸出不保證相同)。 此例中執行這個函數的副作用就是,每次task在它的執行過程中都會隨機地停某些秒。
import gevent import randomdef task(pid):"""Some non-deterministic task"""gevent.sleep(random.randint(0,2)*0.001)print('Task %s done' % pid)def synchronous():for i in range(1,10):task(i)def asynchronous():threads = [gevent.spawn(task, i) for i in xrange(10)]gevent.joinall(threads)print('Synchronous:') synchronous()print('Asynchronous:') asynchronous()結果
Synchronous: Task 1 done Task 2 done Task 3 done Task 4 done Task 5 done Task 6 done Task 7 done Task 8 done Task 9 done Asynchronous: Task 3 done Task 7 done Task 9 done Task 2 done Task 4 done Task 1 done Task 8 done Task 6 done Task 0 done Task 5 done上例中,在同步的部分,所有的task都同步的執行, 結果當每個task在執行時主流程被阻塞(主流程的執行暫時停住)。
程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn。 初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall?函數,后者阻塞當前流程,并執行所有給定的greenlet。執行流程只會在 所有greenlet執行完后才會繼續向下走。
要重點留意的是,異步的部分本質上是隨機的,而且異步部分的整體運行時間比同步 要大大減少。事實上,同步部分的最大運行時間,即是每個task停0.002秒,結果整個 隊列要停0.02秒。而異步部分的最大運行時間大致為0.002秒,因為沒有任何一個task會 阻塞其它task的執行。
一個更常見的應用場景,如異步地向服務器取數據,取數據操作的執行時間 依賴于發起取數據請求時遠端服務器的負載,各個請求的執行時間會有差別。
import gevent.monkey gevent.monkey.patch_socket()import gevent import urllib2 import simplejson as jsondef fetch(pid):response = urllib2.urlopen('http://json-time.appspot.com/time.json')result = response.read()json_result = json.loads(result)datetime = json_result['datetime']print('Process %s: %s' % (pid, datetime))return json_result['datetime']def synchronous():for i in range(1,10):fetch(i)def asynchronous():threads = []for i in range(1,10):threads.append(gevent.spawn(fetch, i))gevent.joinall(threads)print('Synchronous:') synchronous()print('Asynchronous:') asynchronous()?
確定性
就像之前所提到的,greenlet具有確定性。在相同配置相同輸入的情況下,它們總是 會產生相同的輸出。下面就有例子,我們在multiprocessing的pool之間執行一系列的 任務,與在gevent的pool之間執行作比較。
import timedef echo(i):time.sleep(0.001)return i# Non Deterministic Process Poolfrom multiprocessing.pool import Poolp = Pool(10) run1 = [a for a in p.imap_unordered(echo, xrange(10))] run2 = [a for a in p.imap_unordered(echo, xrange(10))] run3 = [a for a in p.imap_unordered(echo, xrange(10))] run4 = [a for a in p.imap_unordered(echo, xrange(10))]print(run1 == run2 == run3 == run4)# Deterministic Gevent Poolfrom gevent.pool import Poolp = Pool(10) run1 = [a for a in p.imap_unordered(echo, xrange(10))] run2 = [a for a in p.imap_unordered(echo, xrange(10))] run3 = [a for a in p.imap_unordered(echo, xrange(10))] run4 = [a for a in p.imap_unordered(echo, xrange(10))]print(run1 == run2 == run3 == run4)結果
False True即使gevent通常帶有確定性,當開始與如socket或文件等外部服務交互時, 不確定性也可能溜進你的程序中。因此盡管gevent線程是一種“確定的并發”形式, 使用它仍然可能會遇到像使用POSIX線程或進程時遇到的那些問題。
涉及并發長期存在的問題就是競爭條件(race condition)。簡單來說, 當兩個并發線程/進程都依賴于某個共享資源同時都嘗試去修改它的時候, 就會出現競爭條件。這會導致資源修改的結果狀態依賴于時間和執行順序。 這是個問題,我們一般會做很多努力嘗試避免競爭條件, 因為它會導致整個程序行為變得不確定。
最好的辦法是始終避免所有全局的狀態。全局狀態和導入時(import-time)副作用總是會 反咬你一口!
?
創建 Greenlets
gevent 對 Greenlet 初始化提供了一些封裝,最常用的使用模板之一有
import gevent from gevent import Greenletdef foo(message, n):"""Each thread will be passed the message, and n argumentsin its initialization."""gevent.sleep(n)print(message)# Initialize a new Greenlet instance running the named function # foo thread1 = Greenlet.spawn(foo, "Hello", 1)# Wrapper for creating and running a new Greenlet from the named # function foo, with the passed arguments thread2 = gevent.spawn(foo, "I live!", 2)# Lambda expressions thread3 = gevent.spawn(lambda x: (x+1), 2)threads = [thread1, thread2, thread3]# Block until all threads complete. gevent.joinall(threads)結果:
Hello I live!除使用基本的Greenlet類之外,你也可以子類化Greenlet類,重載它的_run方法。
import gevent from gevent import Greenletclass MyGreenlet(Greenlet):def __init__(self, message, n):Greenlet.__init__(self)self.message = messageself.n = ndef _run(self):print(self.message)gevent.sleep(self.n)g = MyGreenlet("Hi there!", 3) g.start() g.join()結果
Hi there!?
Greenlet狀態
就像任何其他成段代碼,Greenlet也可能以不同的方式運行失敗。 Greenlet可能未能成功拋出異常,不能停止運行,或消耗了太多的系統資源。
一個greenlet的狀態通常是一個依賴于時間的參數。在greenlet中有一些標志, 讓你可以監視它的線程內部狀態:
- started?-- Boolean, 指示此Greenlet是否已經啟動
- ready()?-- Boolean, 指示此Greenlet是否已經停止
- successful()?-- Boolean, 指示此Greenlet是否已經停止而且沒拋異常
- value?-- 任意值, 此Greenlet代碼返回的值
- exception?-- 異常, 此Greenlet內拋出的未捕獲異常
結果
True True You win! None True True True False You fail at failing.?
程序停止
當主程序(main program)收到一個SIGQUIT信號時,不能成功做yield操作的 Greenlet可能會令意外地掛起程序的執行。這導致了所謂的僵尸進程, 它需要在Python解釋器之外被kill掉。
對此,一個通用的處理模式就是在主程序中監聽SIGQUIT信號,在程序退出 調用gevent.shutdown。
import gevent import signaldef run_forever():gevent.sleep(1000)if __name__ == '__main__':gevent.signal(signal.SIGQUIT, gevent.shutdown)thread = gevent.spawn(run_forever)thread.join()超時
超時是一種對一塊代碼或一個Greenlet的運行時間的約束。
import gevent from gevent import Timeoutseconds = 10timeout = Timeout(seconds) timeout.start()def wait():gevent.sleep(10)try:gevent.spawn(wait).join() except Timeout:print('Could not complete')超時類也可以用在上下文管理器(context manager)中, 也就是with語句內。
import gevent from gevent import Timeouttime_to_wait = 5 # secondsclass TooLong(Exception):passwith Timeout(time_to_wait, TooLong):gevent.sleep(10)另外,對各種Greenlet和數據結構相關的調用,gevent也提供了超時參數。 例如:
import gevent from gevent import Timeoutdef wait():gevent.sleep(2)timer = Timeout(1).start() thread1 = gevent.spawn(wait)try:thread1.join(timeout=timer) except Timeout:print('Thread 1 timed out')# --timer = Timeout.start_new(1) thread2 = gevent.spawn(wait)try:thread2.get(timeout=timer) except Timeout:print('Thread 2 timed out')# --try:gevent.with_timeout(1, wait) except Timeout:print('Thread 3 timed out')結果
Thread 1 timed out Thread 2 timed out Thread 3 timed out?
猴子補丁(Monkey patching)
我們現在來到 gevent 的死角了. 在此之前,我已經避免提到猴子補丁(monkey patching) 以嘗試使gevent這個強大的協程模型變得生動有趣,但現在到了討論猴子補丁的黑色藝術 的時候了。你之前可能注意到我們提到了monkey.patch_socket()這個命令,這個 純粹副作用命令是用來改變標準socket庫的。
import socket print(socket.socket)print("After monkey patch") from gevent import monkey monkey.patch_socket() print(socket.socket)import select print(select.select) monkey.patch_select() print("After monkey patch") print(select.select)結果:
class 'socket.socket' After monkey patch class 'gevent.socket.socket'built-in function select After monkey patch function select at 0x1924de8Python的運行環境允許我們在運行時修改大部分的對象,包括模塊,類甚至函數。 這是個一般說來令人驚奇的壞主意,因為它創造了“隱式的副作用”,如果出現問題 它很多時候是極難調試的。雖然如此,在極端情況下當一個庫需要修改Python本身 的基礎行為的時候,猴子補丁就派上用場了。在這種情況下,gevent能夠 修改標準庫里面大部分的阻塞式系統調用,包括socket、ssl、threading和?select等模塊,而變為協作式運行。
例如,Redis的python綁定一般使用常規的tcp socket來與redis-server實例通信。 通過簡單地調用gevent.monkey.patch_all(),可以使得redis的綁定協作式的調度 請求,與gevent棧的其它部分一起工作。
這讓我們可以將一般不能與gevent共同工作的庫結合起來,而不用寫哪怕一行代碼。 雖然猴子補丁仍然是邪惡的(evil),但在這種情況下它是“有用的邪惡(useful evil)”。
?
?
數據結構
?
事件
事件(event)是一個在Greenlet之間異步通信的形式。
import gevent from gevent.event import Event''' Illustrates the use of events '''evt = Event()def setter():'''After 3 seconds, wake all threads waiting on the value of evt'''print('A: Hey wait for me, I have to do something')gevent.sleep(3)print("Ok, I'm done")evt.set()def waiter():'''After 3 seconds the get call will unblock'''print("I'll wait for you")evt.wait() # blockingprint("It's about time")def main():gevent.joinall([gevent.spawn(setter),gevent.spawn(waiter),gevent.spawn(waiter),gevent.spawn(waiter),gevent.spawn(waiter),gevent.spawn(waiter)])if __name__ == '__main__':main()事件對象的一個擴展是AsyncResult,它允許你在喚醒調用上附加一個值。 它有時也被稱作是future或defered,因為它持有一個指向將來任意時間可設置 為任何值的引用。
import gevent from gevent.event import AsyncResult a = AsyncResult()def setter():"""After 3 seconds set the result of a."""gevent.sleep(3)a.set('Hello!')def waiter():"""After 3 seconds the get call will unblock after the setterputs a value into the AsyncResult."""print(a.get())gevent.joinall([gevent.spawn(setter),gevent.spawn(waiter), ])?
隊列
?
隊列是一個排序的數據集合,它有常見的put?/?get操作, 但是它是以在Greenlet之間可以安全操作的方式來實現的。
舉例來說,如果一個Greenlet從隊列中取出一項,此項就不會被 同時執行的其它Greenlet再取到了。
import gevent from gevent.queue import Queuetasks = Queue()def worker(n):while not tasks.empty():task = tasks.get()print('Worker %s got task %s' % (n, task))gevent.sleep(0)print('Quitting time!')def boss():for i in xrange(1,25):tasks.put_nowait(i)gevent.spawn(boss).join()gevent.joinall([gevent.spawn(worker, 'steve'),gevent.spawn(worker, 'john'),gevent.spawn(worker, 'nancy'), ])結果
Worker steve got task 1 Worker john got task 2 Worker nancy got task 3 Worker steve got task 4 Worker nancy got task 5 Worker john got task 6 Worker steve got task 7 Worker john got task 8 Worker nancy got task 9 Worker steve got task 10 Worker nancy got task 11 Worker john got task 12 Worker steve got task 13 Worker john got task 14 Worker nancy got task 15 Worker steve got task 16 Worker nancy got task 17 Worker john got task 18 Worker steve got task 19 Worker john got task 20 Worker nancy got task 21 Worker steve got task 22 Worker nancy got task 23 Worker john got task 24 Quitting time! Quitting time! Quitting time!如果需要,隊列也可以阻塞在put或get操作上。
put和get操作都有非阻塞的版本,put_nowait和get_nowait不會阻塞, 然而在操作不能完成時拋出gevent.queue.Empty或gevent.queue.Full異常。
在下面例子中,我們讓boss與多個worker同時運行,并限制了queue不能放入多于3個元素。 這個限制意味著,直到queue有空余空間之間,put操作會被阻塞。相反地,如果隊列中 沒有元素,get操作會被阻塞。它同時帶一個timeout參數,允許在超時時間內如果 隊列沒有元素無法完成操作就拋出gevent.queue.Empty異常。
import gevent from gevent.queue import Queue, Emptytasks = Queue(maxsize=3)def worker(n):try:while True:task = tasks.get(timeout=1) # decrements queue size by 1print('Worker %s got task %s' % (n, task))gevent.sleep(0)except Empty:print('Quitting time!')def boss():"""Boss will wait to hand out work until a individual worker isfree since the maxsize of the task queue is 3."""for i in xrange(1,10):tasks.put(i)print('Assigned all work in iteration 1')for i in xrange(10,20):tasks.put(i)print('Assigned all work in iteration 2')gevent.joinall([gevent.spawn(boss),gevent.spawn(worker, 'steve'),gevent.spawn(worker, 'john'),gevent.spawn(worker, 'bob'), ])結果:
Worker steve got task 1 Worker john got task 2 Worker bob got task 3 Worker steve got task 4 Worker bob got task 5 Worker john got task 6 Assigned all work in iteration 1 Worker steve got task 7 Worker john got task 8 Worker bob got task 9 Worker steve got task 10 Worker bob got task 11 Worker john got task 12 Worker steve got task 13 Worker john got task 14 Worker bob got task 15 Worker steve got task 16 Worker bob got task 17 Worker john got task 18 Assigned all work in iteration 2 Worker steve got task 19 Quitting time! Quitting time! Quitting time!?
組和池
?
組(group)是一個運行中greenlet的集合,集合中的greenlet像一個組一樣 會被共同管理和調度。 它也兼飾了像Python的multiprocessing庫那樣的 平行調度器的角色。
import gevent from gevent.pool import Groupdef talk(msg):for i in xrange(3):print(msg)g1 = gevent.spawn(talk, 'bar') g2 = gevent.spawn(talk, 'foo') g3 = gevent.spawn(talk, 'fizz')group = Group() group.add(g1) group.add(g2) group.join()group.add(g3) group.join()結果
bar bar bar foo foo foo fizz fizz fizz在管理異步任務的分組上它是非常有用的。
就像上面所說,Group也以不同的方式為分組greenlet/分發工作和收集它們的結果也提供了API。
import gevent from gevent import getcurrent from gevent.pool import Groupgroup = Group()def hello_from(n):print('Size of group %s' % len(group))print('Hello from Greenlet %s' % id(getcurrent()))group.map(hello_from, xrange(3))def intensive(n):gevent.sleep(3 - n)return 'task', nprint('Ordered')ogroup = Group() for i in ogroup.imap(intensive, xrange(3)):print(i)print('Unordered')igroup = Group() for i in igroup.imap_unordered(intensive, xrange(3)):print(i)結果
Size of group 3 Hello from Greenlet 31048720 Size of group 3 Hello from Greenlet 31049200 Size of group 3 Hello from Greenlet 31049040 Ordered ('task', 0) ('task', 1) ('task', 2) Unordered ('task', 2) ('task', 1) ('task', 0)池(pool)是一個為處理數量變化并且需要限制并發的greenlet而設計的結構。 在需要并行地做很多受限于網絡和IO的任務時常常需要用到它。
import gevent from gevent.pool import Poolpool = Pool(2)def hello_from(n):print('Size of pool %s' % len(pool))pool.map(hello_from, xrange(3))結果
Size of pool 2 Size of pool 2 Size of pool 1當構造gevent驅動的服務時,經常會將圍繞一個池結構的整個服務作為中心。 一個例子就是在各個socket上輪詢的類。
from gevent.pool import Poolclass SocketPool(object):def __init__(self):self.pool = Pool(1000)self.pool.start()def listen(self, socket):while True:socket.recv()def add_handler(self, socket):if self.pool.full():raise Exception("At maximum pool size")else:self.pool.spawn(self.listen, socket)def shutdown(self):self.pool.kill()?
鎖和信號量
?
信號量是一個允許greenlet相互合作,限制并發訪問或運行的低層次的同步原語。 信號量有兩個方法,acquire和release。在信號量是否已經被 acquire或release,和擁有資源的數量之間不同,被稱為此信號量的范圍 (the bound of the semaphore)。如果一個信號量的范圍已經降低到0,它會 阻塞acquire操作直到另一個已經獲得信號量的greenlet作出釋放。
from gevent import sleep from gevent.pool import Pool from gevent.coros import BoundedSemaphoresem = BoundedSemaphore(2)def worker1(n):sem.acquire()print('Worker %i acquired semaphore' % n)sleep(0)sem.release()print('Worker %i released semaphore' % n)def worker2(n):with sem:print('Worker %i acquired semaphore' % n)sleep(0)print('Worker %i released semaphore' % n)pool = Pool() pool.map(worker1, xrange(0,2)) pool.map(worker2, xrange(3,6))結果
Worker 0 acquired semaphore Worker 1 acquired semaphore Worker 0 released semaphore Worker 1 released semaphore Worker 3 acquired semaphore Worker 4 acquired semaphore Worker 3 released semaphore Worker 4 released semaphore Worker 5 acquired semaphore Worker 5 released semaphore范圍為1的信號量也稱為鎖(lock)。它向單個greenlet提供了互斥訪問。 信號量和鎖常常用來保證資源只在程序上下文被單次使用。
?
線程局部變量
Gevent也允許你指定局部于greenlet上下文的數據。 在內部,它被實現為以greenlet的getcurrent()為鍵, 在一個私有命名空間尋址的全局查找。
import gevent from gevent.local import localstash = local()def f1():stash.x = 1print(stash.x)def f2():stash.y = 2print(stash.y)try:stash.xexcept AttributeError:print("x is not local to f2")g1 = gevent.spawn(f1) g2 = gevent.spawn(f2)gevent.joinall([g1, g2])結果
1 2 x is not local to f2很多集成了gevent的web框架將HTTP會話對象以線程局部變量的方式存儲在gevent內。 例如使用Werkzeug實用庫和它的proxy對象,我們可以創建Flask風格的請求對象。
from gevent.local import local from werkzeug.local import LocalProxy from werkzeug.wrappers import Request from contextlib import contextmanagerfrom gevent.wsgi import WSGIServer_requests = local() request = LocalProxy(lambda: _requests.request)@contextmanager def sessionmanager(environ):_requests.request = Request(environ)yield_requests.request = Nonedef logic():return "Hello " + request.remote_addrdef application(environ, start_response):status = '200 OK'with sessionmanager(environ):body = logic()headers = [('Content-Type', 'text/html')]start_response(status, headers)return [body]WSGIServer(('', 8000), application).serve_forever()Flask系統比這個例子復雜一點,然而使用線程局部變量作為局部的會話存儲, 這個思想是相同的。
?
子進程
自gevent 1.0起,gevent.subprocess,一個Python?subprocess模塊 的修補版本已經添加。它支持協作式的等待子進程。
import gevent from gevent.subprocess import Popen, PIPEdef cron():while True:print("cron")gevent.sleep(0.2)g = gevent.spawn(cron) sub = Popen(['sleep 1; uname'], stdout=PIPE, shell=True) out, err = sub.communicate() g.kill() print(out.rstrip()) cron cron cron cron cron Linux很多人也想將gevent和multiprocessing一起使用。最明顯的挑戰之一 就是multiprocessing提供的進程間通信默認不是協作式的。由于基于?multiprocessing.Connection的對象(例如Pipe)暴露了它們下面的 文件描述符(file descriptor),gevent.socket.wait_read和wait_write?可以用來在直接讀寫之前協作式的等待ready-to-read/ready-to-write事件。
import gevent from multiprocessing import Process, Pipe from gevent.socket import wait_read, wait_write# To Process a, b = Pipe()# From Process c, d = Pipe()def relay():for i in xrange(10):msg = b.recv()c.send(msg + " in " + str(i))def put_msg():for i in xrange(10):wait_write(a.fileno())a.send('hi')def get_msg():for i in xrange(10):wait_read(d.fileno())print(d.recv())if __name__ == '__main__':proc = Process(target=relay)proc.start()g1 = gevent.spawn(get_msg)g2 = gevent.spawn(put_msg)gevent.joinall([g1, g2], timeout=1)然而要注意,組合multiprocessing和gevent必定帶來 依賴于操作系統(os-dependent)的缺陷,其中有:
-
在兼容POSIX的系統創建子進程(forking)之后, 在子進程的gevent的狀態是不適定的(ill-posed)。一個副作用就是,?multiprocessing.Process創建之前的greenlet創建動作,會在父進程和子進程兩 方都運行。
-
上例的put_msg()中的a.send()可能依然非協作式地阻塞調用的線程:一個 ready-to-write事件只保證寫了一個byte。在嘗試寫完成之前底下的buffer可能是滿的。
-
上面表示的基于wait_write()/wait_read()的方法在Windows上不工作 (IOError: 3 is not a socket (files are not supported)),因為Windows不能監視 pipe事件。
Python包gipc以大體上透明的方式在 兼容POSIX系統和Windows上克服了這些挑戰。它提供了gevent感知的基于multiprocessing.Process的子進程和gevent基于pipe的協作式進程間通信。
?
Actors
actor模型是一個由于Erlang變得普及的更高層的并發模型。 簡單的說它的主要思想就是許多個獨立的Actor,每個Actor有一個可以從 其它Actor接收消息的收件箱。Actor內部的主循環遍歷它收到的消息,并 根據它期望的行為來采取行動。
Gevent沒有原生的Actor類型,但在一個子類化的Greenlet內使用隊列, 我們可以定義一個非常簡單的。
import gevent from gevent.queue import Queueclass Actor(gevent.Greenlet):def __init__(self):self.inbox = Queue()Greenlet.__init__(self)def receive(self, message):"""Define in your subclass."""raise NotImplemented()def _run(self):self.running = Truewhile self.running:message = self.inbox.get()self.receive(message)下面是一個使用的例子:
import gevent from gevent.queue import Queue from gevent import Greenletclass Pinger(Actor):def receive(self, message):print(message)pong.inbox.put('ping')gevent.sleep(0)class Ponger(Actor):def receive(self, message):print(message)ping.inbox.put('pong')gevent.sleep(0)ping = Pinger() pong = Ponger()ping.start() pong.start()ping.inbox.put('start') gevent.joinall([ping, pong])?
?
真實世界的應用
?
Gevent ZeroMQ
ZeroMQ?被它的作者描述為 “一個表現得像一個并發框架的socket庫”。 它是一個非常強大的,為構建并發和分布式應用的消息傳遞層。
ZeroMQ提供了各種各樣的socket原語。最簡單的是請求-應答socket對 (Request-Response socket pair)。一個socket有兩個方法send和recv, 兩者一般都是阻塞操作。但是Travis Cline?的一個杰出的庫彌補了這一點,這個庫使用gevent.socket來以非阻塞的方式 輪詢ZereMQ socket。通過命令:
pip install gevent-zeromq
你可以從PyPi安裝gevent-zeremq。
# Note: Remember to ``pip install pyzmq gevent_zeromq`` import gevent from gevent_zeromq import zmq# Global Context context = zmq.Context()def server():server_socket = context.socket(zmq.REQ)server_socket.bind("tcp://127.0.0.1:5000")for request in range(1,10):server_socket.send("Hello")print('Switched to Server for %s' % request)# Implicit context switch occurs hereserver_socket.recv()def client():client_socket = context.socket(zmq.REP)client_socket.connect("tcp://127.0.0.1:5000")for request in range(1,10):client_socket.recv()print('Switched to Client for %s' % request)# Implicit context switch occurs hereclient_socket.send("World")publisher = gevent.spawn(server) client = gevent.spawn(client)gevent.joinall([publisher, client])結果
Switched to Server for 1 Switched to Client for 1 Switched to Server for 2 Switched to Client for 2 Switched to Server for 3 Switched to Client for 3 Switched to Server for 4 Switched to Client for 4 Switched to Server for 5 Switched to Client for 5 Switched to Server for 6 Switched to Client for 6 Switched to Server for 7 Switched to Client for 7 Switched to Server for 8 Switched to Client for 8 Switched to Server for 9 Switched to Client for 9?
簡單server
?
# On Unix: Access with ``$ nc 127.0.0.1 5000`` # On Window: Access with ``$ telnet 127.0.0.1 5000``from gevent.server import StreamServerdef handle(socket, address):socket.send("Hello from a telnet!\n")for i in range(5):socket.send(str(i) + '\n')socket.close()server = StreamServer(('127.0.0.1', 5000), handle) server.serve_forever()?
WSGI Servers
?
Gevent為HTTP內容服務提供了兩種WSGI server。從今以后就稱為?wsgi和pywsgi:
- gevent.wsgi.WSGIServer
- gevent.pywsgi.WSGIServer
在1.0.x之前更早期的版本里,gevent使用libevent而不是libev。 Libevent包含了一個快速HTTP server,它被用在gevent的wsgi?server。
在gevent 1.0.x版本,沒有包括http server了。作為替代,gevent.wsgi?現在是純Python server?gevent.pywsgi的一個別名。
?
流式server
這個章節不適用于gevent 1.0.x版本
熟悉流式HTTP服務(streaming HTTP service)的人知道,它的核心思想 就是在頭部(header)不指定內容的長度。反而,我們讓連接保持打開, 在每塊數據前加一個16進制字節來指示數據塊的長度,并將數據刷入pipe中。 當發出一個0長度數據塊時,流會被關閉。
HTTP/1.1 200 OK Content-Type: text/plain Transfer-Encoding: chunked8 <p>Hello9 World</p>0上述的HTTP連接不能在wsgi中創建,因為它不支持流式。 請求只有被緩沖(buffered)下來。
from gevent.wsgi import WSGIServerdef application(environ, start_response):status = '200 OK'body = '<p>Hello World</p>'headers = [('Content-Type', 'text/html')]start_response(status, headers)return [body]WSGIServer(('', 8000), application).serve_forever()然而使用pywsgi我們可以將handler寫成generator,并以塊的形式yield出結果。
from gevent.pywsgi import WSGIServerdef application(environ, start_response):status = '200 OK'headers = [('Content-Type', 'text/html')]start_response(status, headers)yield "<p>Hello"yield "World</p>"WSGIServer(('', 8000), application).serve_forever()但無論如何,與其它Python server相比gevent server性能是顯勝的。 Libev是得到非常好審查的技術,由它寫出的server在大規模上表現優異為人熟知。
為了測試基準,試用Apache Benchmark?ab或瀏覽?Benchmark of Python WSGI Servers?來與其它server作對比。
$ ab -n 10000 -c 100 http://127.0.0.1:8000/?
Long Polling
import gevent from gevent.queue import Queue, Empty from gevent.pywsgi import WSGIServer import simplejson as jsondata_source = Queue()def producer():while True:data_source.put_nowait('Hello World')gevent.sleep(1)def ajax_endpoint(environ, start_response):status = '200 OK'headers = [('Content-Type', 'application/json')]start_response(status, headers)while True:try:datum = data_source.get(timeout=5)yield json.dumps(datum) + '\n'except Empty:passgevent.spawn(producer)WSGIServer(('', 8000), ajax_endpoint).serve_forever()?
Websockets
運行Websocket的例子需要gevent-websocket包。
# Simple gevent-websocket server import json import randomfrom gevent import pywsgi, sleep from geventwebsocket.handler import WebSocketHandlerclass WebSocketApp(object):'''Send random data to the websocket'''def __call__(self, environ, start_response):ws = environ['wsgi.websocket']x = 0while True:data = json.dumps({'x': x, 'y': random.randint(1, 5)})ws.send(data)x += 1sleep(0.5)server = pywsgi.WSGIServer(("", 10000), WebSocketApp(),handler_class=WebSocketHandler) server.serve_forever()HTML Page:
<html><head><title>Minimal websocket application</title><script type="text/javascript" src="jquery.min.js"></script><script type="text/javascript">$(function() {// Open up a connection to our servervar ws = new WebSocket("ws://localhost:10000/");// What do we do when we get a message?ws.onmessage = function(evt) {$("#placeholder").append('<p>' + evt.data + '</p>')}// Just update our conn_status field with the connection statusws.onopen = function(evt) {$('#conn_status').html('<b>Connected</b>');}ws.onerror = function(evt) {$('#conn_status').html('<b>Error</b>');}ws.onclose = function(evt) {$('#conn_status').html('<b>Closed</b>');}});</script></head><body><h1>WebSocket Example</h1><div id="conn_status">Not Connected</div><div id="placeholder" style="width:600px;height:300px;"></div></body> </html>?
聊天 server
?
最后一個生動的例子,實現一個實時聊天室。運行這個例子需要?Flask?(你可以使用Django, Pyramid等,但不是必須的)。 對應的Javascript和HTML文件可以在?這里找到。
# Micro gevent chatroom. # ----------------------from flask import Flask, render_template, requestfrom gevent import queue from gevent.pywsgi import WSGIServerimport simplejson as jsonapp = Flask(__name__) app.debug = Truerooms = {'topic1': Room(),'topic2': Room(), }users = {}class Room(object):def __init__(self):self.users = set()self.messages = []def backlog(self, size=25):return self.messages[-size:]def subscribe(self, user):self.users.add(user)def add(self, message):for user in self.users:print(user)user.queue.put_nowait(message)self.messages.append(message)class User(object):def __init__(self):self.queue = queue.Queue()@app.route('/') def choose_name():return render_template('choose.html')@app.route('/<uid>') def main(uid):return render_template('main.html',uid=uid,rooms=rooms.keys())@app.route('/<room>/<uid>') def join(room, uid):user = users.get(uid, None)if not user:users[uid] = user = User()active_room = rooms[room]active_room.subscribe(user)print('subscribe %s %s' % (active_room, user))messages = active_room.backlog()return render_template('room.html',room=room, uid=uid, messages=messages)@app.route("/put/<room>/<uid>", methods=["POST"]) def put(room, uid):user = users[uid]room = rooms[room]message = request.form['message']room.add(':'.join([uid, message]))return ''@app.route("/poll/<uid>", methods=["POST"]) def poll(uid):try:msg = users[uid].queue.get(timeout=10)except queue.Empty:msg = []return json.dumps(msg)if __name__ == "__main__":http = WSGIServer(('', 5000), app)http.serve_forever()?
?
?
?
?
總結
以上是生活随笔為你收集整理的Python Gevent – 高性能的 Python 并发框架的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: p1和p7签名的区别
- 下一篇: SpringBoot 自带工具类~Aop