gevent-tutorial翻译和解读
原文在[1],翻譯的在[2],這篇博客進一步解讀一些代碼,以及加了一些注釋,并且所有代碼都已經改成python3.x:
(斜線表示原文引用)
--------------------------------------------------------------------------------------------------------------------------------
Synchronous & Asynchronous Execution 同步&異步執行
The core idea of concurrency is that a larger task can be broken down into a collection of subtasks whose operation does not depend on the other tasks and thus can be run?asynchronously?instead of one at a time?synchronously. A switch between the two executions is known as a?context switch.
A context switch in gevent done through?yielding. In this case example we have two contexts which yield to each other through invoking?gevent.sleep(0).
并發的核心思想是一個更大的任務可以分解成多個子任務,其運行不依賴于其他任務的集合,因此可以異步運行?,而不是一個在時間?同步。兩個執行程序間的轉換是一個關聯轉換。
在gevent中一個關聯轉換可以通過?yielding?來實現.在這個例子,兩個程序的轉換是通過調用?gevent.sleep(0).
import geventdef foo():print('Running in foo-foo函數')gevent.sleep(0)print('Explicit context switch to foo again-foo函數')def bar():print('Explicit context to bar-bar函數')gevent.sleep(0)print('Implicit context switch back to bar-bar函數')gevent.joinall([gevent.spawn(foo),gevent.spawn(bar), ])It is illuminating to visualize the control flow of the program or walk through it with a debugger to see the context switches as they occur.
在調解器里面清楚地看到程序在兩個轉換之間是怎么運行的.
The real power of gevent comes when we use it for network and IO bound functions which can be cooperatively scheduled. Gevent has taken care of all the details to ensure that your network libraries will implicitly yield their greenlet contexts whenever possible. I cannot stress enough what a powerful idiom this is. But maybe an example will illustrate.
gevent真正的能力在于我們把它用于網絡和IO相關的功能會很好的合作安排.
下面這個例子其實不是太好,select用法很不靠譜(略過)
import time import gevent from gevent import selectstart = time.time() tic = lambda: 'at %1.1f seconds' % (time.time() - start)def gr1():# Busy waits for a second, but we don't want to stick around...print('Started Polling: ', tic())select.select([], [], [], 2)print('Ended Polling: ', tic())def gr2():# Busy waits for a second, but we don't want to stick around...print('Started Polling: ', tic())select.select([], [], [], 2)print('Ended Polling: ', tic())def gr3():print("Hey lets do some stuff while the greenlets poll, at", tic())gevent.sleep(1)gevent.joinall([gevent.spawn(gr1),gevent.spawn(gr2),gevent.spawn(gr3), ])A somewhat synthetic example defines a?task?function which is?non-deterministic?(i.e. its output is not guaranteed to give the same result for the same inputs). In this case the side effect of running the function is that the task pauses its execution for a random number of seconds.
一個比較綜合的例子,定義一個task函數,它是不確定的(并不能保證相同的輸入輸出).在這種情況運行task函數的作用只是暫停其執行幾秒鐘的隨機數.
import gevent import random import time def task(pid):"""Some non-deterministic task"""gevent.sleep(random.randint(0,2)*0.001)print('Task', pid, 'done')def synchronous():for i in range(1,10):task(i)def asynchronous():threads = [gevent.spawn(task, i) for i in range(10)]gevent.joinall(threads)print('Synchronous:') start=time.time() synchronous() end=time.time() print("time interval=",end-start) print('------------------------------------') print('Asynchronous:') start=time.time() asynchronous() end=time.time() print("time interval=",end-start)輸出結果是:
Synchronous:
Task 1 done
Task 2 done
Task 3 done
Task 4 done
Task 5 done
Task 6 done
Task 7 done
Task 8 done
Task 9 done
time interval= 0.010333538055419922
------------------------------------
Asynchronous:
Task 2 done
Task 5 done
Task 6 done
Task 8 done
Task 0 done
Task 1 done
Task 3 done
Task 4 done
Task 7 done
Task 9 done
time interval= 0.0026671886444091797
[Finished in 0.1s]
注意,這個異步的亂序在不同機子上是不一樣的,僅僅在單機上是確定的。
In the synchronous case all the tasks are run sequentially, which results in the main programming?blocking?( i.e. pausing the execution of the main program ) while each task executes.
在同步的情況所有任務都會順序的運行,當每個任務執行的時候導致主程序?blocking.
The important parts of the program are the?gevent.spawn?which wraps up the given function inside of a Greenlet thread. The list of initialized greenlets are stored in the array?threads?which is passed to the?gevent.joinall?function which blocks the current program to run all the given greenlets. The execution will step forward only when all the greenlets terminate.
程序重要的部分是包裝起來的函數gevent.spawn?, 它是Greenlet的線程. 初始化的greenlets儲存在一個數組threads?,然后提交給?gevent.joinall函數,然后阻塞當前的程序去運行所有greenlets.只有當所有greenlets停止的時候程序才會繼續運行.
The important fact to notice is that the order of execution in the async case is essentially random and that the total execution time in the async case is much less than the sync case. In fact the maximum time for the synchronous case to complete is when each tasks pauses for 2 seconds resulting in a 20 seconds for the whole queue. In the async case the maximum runtime is roughly 2 seconds since none of the tasks block the execution of the others.
要注意的是異步的情況程序是無序的,異步的執行時間是遠少于同步的.事實上同步去完成每個任務停止2秒的話,結果是要20秒才能完成整個隊列.在異步的情況最大的運行時間大概就是2秒,因為每個任務的執行都不會阻塞其他的任務.
A more common use case, fetching data from a server asynchronously, the runtime of?fetch()?will differ between requests given the load on the remote server.
一個更常見的情況,是從服務器上異步獲取數據,請求之間?fetch()?的運行時間會給服務器帶來不同的負載.
import gevent.monkey gevent.monkey.patch_socket() import time import gevent import urllib3 # import simplejson as json from urllib import request import urllib import json # resp = request.urlopen('http://www.baidu.com')# 這個是在發送http請求 def fetch(pid):response = request.urlopen('http://quan.suning.com/getSysTime.do')# resp = urllib.request.urlopen(url)ele_json = json.loads(response.read())datetime = ele_json['sysTime1']print("Process", pid, datetime)return ele_json['sysTime1']def synchronous():for i in range(1,10):fetch(i)def asynchronous():threads = []for i in range(1,10):threads.append(gevent.spawn(fetch, i))#傳入參數i給函數fetchgevent.joinall(threads)print('------------------Synchronous------------------') start=time.time() synchronous() end=time.time() print("同步運行的時間間隔=",end-start) print('------------------Asynchronous------------------') start=time.time() asynchronous() end=time.time() print("異步運行的時間間隔=",end-start)運行結果是:
------------------Synchronous------------------
Process 1 20191126215800
Process 2 20191126215800
Process 3 20191126215800
Process 4 20191126215801
Process 5 20191126215800
Process 6 20191126215801
Process 7 20191126215801
Process 8 20191126215801
Process 9 20191126215801
同步運行的時間間隔= 0.420764684677124
------------------Asynchronous------------------
Process 6 20191126215801
Process 8 20191126215801
Process 5 20191126215801
Process 1 20191126215801
Process 2 20191126215801
Process 9 20191126215801
Process 3 20191126215801
Process 7 20191126215801
Process 4 20191126215801
異步運行的時間間隔= 0.0831596851348877
[Finished in 0.8s]
?
?
Determinism 確定性
As mentioned previously, greenlets are deterministic. Given the same inputs and they always produce the same output. For example lets spread a task across a multiprocessing pool compared to a gevent pool.
正如之前提到的,greenlets是確定性的.給相同的輸入就總會提供相同的輸出.例如展開一個任務來比較一個multiprocessing pool和一個gevent pool.
import timedef echo(i):time.sleep(0.001)return i# Non Deterministic Process Poolfrom multiprocessing.pool import Poolp = Pool(10) run1 = [a for a in p.imap_unordered(echo, xrange(10))] run2 = [a for a in p.imap_unordered(echo, xrange(10))] run3 = [a for a in p.imap_unordered(echo, xrange(10))] run4 = [a for a in p.imap_unordered(echo, xrange(10))]print( run1 == run2 == run3 == run4 )# Deterministic Gevent Poolfrom gevent.pool import Poolp = Pool(10) run1 = [a for a in p.imap_unordered(echo, xrange(10))] run2 = [a for a in p.imap_unordered(echo, xrange(10))] run3 = [a for a in p.imap_unordered(echo, xrange(10))] run4 = [a for a in p.imap_unordered(echo, xrange(10))]print( run1 == run2 == run3 == run4 )這段代碼中的imap_unordered表示:
不保證返回的結果順序與進程添加的順序一致。
[3]中提到了上面代碼中的非阻塞式版本
Pool.apply_async()和Pool.map_async()?
這段代碼的意思就是示范下異步的不確定性。
?
Even though gevent is normally deterministic, sources of non-determinism can creep into your program when you begin to interact with outside services such as sockets and files. Thus even though green threads are a form of "deterministic concurrency", they still can experience some of the same problems that POSIX threads and processes experience.
The perennial problem involved with concurrency is known as a?race condition. Simply put is when two concurrent threads / processes depend on some shared resource but also attempt to modify this value. This results in resources whose values become time-dependent on the execution order. This is a problem, and in general one should very much try to avoid race conditions since they result program behavior which is globally non-deterministic.*
The best approach to this is to simply avoid all global state all times. Global state and import-time side effects will always come back to bite you!
Spawning Threads(下面開始的一些東西的意思是你可以像使用gevent一樣使用Greenlet)
gevent provides a few wrappers around Greenlet initialization. Some of the most common patterns are:
gevent提供了一些Greenlet初始化的封裝.部分比較常用的模塊是:
import gevent from gevent import Greenletdef foo(message, n):"""Each thread will be passed the message, and n argumentsin its initialization."""gevent.sleep(n)print(message)# Initialize a new Greenlet instance running the named function # foo thread1 = Greenlet.spawn(foo, "Hello", 1)# Wrapper for creating and runing a new Greenlet from the named # function foo, with the passed arguments thread2 = gevent.spawn(foo, "I live!", 2)# Lambda expressions thread3 = gevent.spawn(lambda x: (x+1), 2)threads = [thread1, thread2, thread3]# Block until all threads complete. gevent.joinall(threads)?
Hello I live!?
In addition to using the base Greenlet class, you may also subclass Greenlet class and overload the?_run?method.
除了用Greenlet的基類,你也可以用Greenlet的子類,重載_run?方法.
from gevent import Greenletclass MyGreenlet(Greenlet):def __init__(self, message, n):Greenlet.__init__(self)self.message = messageself.n = ndef _run(self):print(self.message)gevent.sleep(self.n)g = MyGreenlet("Hi there!", 3) g.start() g.join()運行結果是:
Hi there!
?
Greenlet State 狀態
Like any other segment of code, Greenlets can fail in various ways. A greenlet may fail to throw an exception, fail to halt or consume too many system resources.
?
像其他編程,Greenlets會以不同的方式失敗.一個greenlet可能會拋出一個異常, 失敗會使程序停止或者消耗系統很多資源.
The internal state of a greenlet is generally a time-dependent parameter. There are a number of flags on greenlets which let you monitor the state of the thread
greenlet內部的狀態通常是一個按時間變化的參數.以下幾個狀態讓你可以監聽線程的狀態.
- started?-- Boolean, indicates whether the Greenlet has been started. 表明是否Greenlet已經開始
- ready()?-- Boolean, indicates whether the Greenlet has halted. 表明是否Greenlet已經停止
- successful()?-- Boolean, indicates whether the Greenlet has halted and not thrown an exception. 表明是否Greenlet已經停止并且沒有拋出異常
- value?-- arbitrary, the value returned by the Greenlet. 任意,Greenlet返回的值
- exception?-- exception, uncaught exception instance thrown inside the greenlet 異常,greenlet內部實例沒有被捕抓的異常
實驗結果:
True
True
You win!
None
True
True
True
False
You fail at failing.
Program Shutdown 程序關閉
Greenlets that fail to yield when the main program receives a SIGQUIT may hold the program's execution longer than expected. This results in so called "zombie processes" which need to be killed from outside of the Python interpreter.
當主程序接受到一個SIGQUIT的時候,Greenlets的失敗可能會讓程序的執行比預想中長時間.這樣的結果稱為"zombie processes" ,需要讓Python解析器以外的程序殺掉.
A common pattern is to listen SIGQUIT events on the main program and to invoke?gevent.shutdown?before exit.
一個常用的模塊是在主程序中監聽SIGQUIT事件和退出前調用?gevent.shutdown?.
import gevent import signaldef run_forever():gevent.sleep(1000)if __name__ == '__main__':gevent.signal(signal.SIGQUIT, gevent.shutdown)#這行代碼是對協程運行失敗進行監聽,這行代碼已經無法運行了,因為不存在signal.SIGQUITthread = gevent.spawn(run_forever)thread.join()Timeouts 超時設定
Timeouts are a constraint on the runtime of a block of code or a Greenlet.
超時是對一推代碼或者一個Greenlet運行時間的一種約束.
import gevent from gevent import Timeoutseconds = 10timeout = Timeout(seconds) timeout.start()#下面的excetp Timeout會監聽上面的timeout設定 #------------------------------------------------- def wait():gevent.sleep(10)try:gevent.spawn(wait).join() except Timeout:print 'Could not complete'Or with a context manager in a with?a statement.
或者是帶著一個語境的管理在一個with的狀態.
import gevent from gevent import Timeouttime_to_wait = 5 # secondsclass TooLong(Exception):passwith Timeout(time_to_wait, TooLong):gevent.sleep(10)In addition, gevent also provides timeout arguments for a variety of Greenlet and data stucture related calls. For example:
另外,gevent同時也提供timeout的參數給各種Greenlet和數據結構相關的調用.例如:
import gevent from gevent import Timeoutdef wait():gevent.sleep(2)timer = Timeout(1).start() thread1 = gevent.spawn(wait)#---------------上面的timer被下面的try使用,然后except補充try中的timer------------------------------- try:thread1.join(timeout=timer) except Timeout:print('Thread 1 timed out')# --timer = Timeout.start_new(1)#換個定時器 thread2 = gevent.spawn(wait) #---------------------------------------------- try:thread2.get(timeout=timer) except Timeout:print('Thread 2 timed out')# --try:gevent.with_timeout(1, wait) except Timeout:print('Thread 3 timed out')Thread 1 timed out
Thread 2 timed out
Thread 3 timed out
Data Structures 數據結構
?
Events 事件(根據[4][6]中的說法,AsyncResult不實用,可以用于協程之間的通信)
Events are a form of asynchronous communication between Greenlets.
事件是Greenlets內部一種異步通訊的形式.
import gevent from gevent.event import AsyncResulta = AsyncResult()def setter():"""After 3 seconds set wake all threads waiting on the value ofa."""gevent.sleep(3)a.set()def waiter():"""After 3 seconds the get call will unblock."""a.get() # blockingprint 'I live!'gevent.joinall([gevent.spawn(setter),gevent.spawn(waiter), ])A extension of the Event object is the AsyncResult which allows you to send a value along with the wakeup call. This is sometimes called a future or a deferred, since it holds a reference to a future value that can be set on an arbitrary time schedule.
Event對象的一個擴展AsyncResult,可以讓你發送一個值連同喚醒調用.這樣有時候調用一個將來或者一個延遲,然后它就可以保存涉及到一個將來的值可以用于任意時間表.
import gevent from gevent.event import AsyncResult a = AsyncResult()def setter():"""After 3 seconds set the result of a."""gevent.sleep(3)a.set('Hello!')def waiter():"""After 3 seconds the get call will unblock after the setterputs a value into the AsyncResult."""print a.get()gevent.joinall([gevent.spawn(setter),gevent.spawn(waiter), ])Queues 隊列(關于Queues的用法可以參考[5])
Queues are ordered sets of data that have the usual?put?/?get?operations but are written in a way such that they can be safely manipulated across Greenlets.
Queues是一組數據的排序,有常用的?put?/?get操作,但也可以以另一種方式寫入,就是當他們在Greenlets之間可以安全地操作.
For example if one Greenlet grabs an item off of the queue, the same item will not grabbed by another Greenlet executing simultaneously.
import gevent from gevent.queue import Queuetasks = Queue()#消耗隊列中的數據 def worker(n):while not tasks.empty():task = tasks.get()print('Worker %s got task %s' % (n, task))gevent.sleep(0)print('Quitting time!')#放入隊列中 def boss():for i in xrange(1,25):tasks.put_nowait(i)gevent.spawn(boss).join()gevent.joinall([gevent.spawn(worker, 'steve'),gevent.spawn(worker, 'john'),gevent.spawn(worker, 'nancy'), ])例如如果一個Greenlet在隊列中取出一個元素,同樣的元素就不會被另一個正在執行的Greenlet取出.
?
實驗結果:
Worker steve got task 1
Worker john got task 2
Worker nancy got task 3
Worker steve got task 4
Worker nancy got task 5
Worker john got task 6
Worker steve got task 7
Worker john got task 8
Worker nancy got task 9
Worker steve got task 10
Worker nancy got task 11
Worker john got task 12
Worker steve got task 13
Worker john got task 14
Worker nancy got task 15
Worker steve got task 16
Worker nancy got task 17
Worker john got task 18
Worker steve got task 19
Worker john got task 20
Worker nancy got task 21
Worker steve got task 22
Worker nancy got task 23
Worker john got task 24
Quitting time!
Quitting time!
Quitting time!
Queues can also block on either?put?or?get?as the need arises.
Queues也可以在?put或者get的時候阻塞,如果有必要的話.
Each of the?put?and?get?operations has a non-blocking counterpart,?put_nowait?and?get_nowait?which will not block, but instead raise either?gevent.queue.Emptyor?gevent.queue.Full?in the operation is not possible.
每個put和get操作都有一個對應的非阻塞的函數:put_nowait和get_nowait,但在操作中拋出gevent.queue.Empty或者gevent.queue.Full是不可能的.
In this example we have the boss running simultaneously to the workers and have a restriction on the Queue that it can contain no more than three elements. This restriction means that the?put?operation will block until there is space on the queue. Conversely the?get?operation will block if there are no elements on the queue to fetch, it also takes a timeout argument to allow for the queue to exit with the exception?gevent.queue.Empty?if no work can found within the time frame of the Timeout.
在這個例子中,我們有一個boss同時給工人任務,有一個限制是說隊列中不能超過3個元素(這個3應該是受限制于worker數量),這個限制意味著:當隊伍中沒有空間,put操作會阻塞.相反的,如果隊列中沒有元素可取,get操作會阻塞,也可以加入一個timeout的參數來允許隊列帶著一個異常gevent.queue.Empty退出,如果在Timeout時間范圍內沒有工作.
import gevent from gevent.queue import Queue, Emptytasks = Queue(maxsize=3)def worker(n):try:while True:task = tasks.get(timeout=1) # decrements queue size by 1print('Worker %s got task %s' % (n, task))gevent.sleep(0)except Empty:print('Quitting time!')def boss():"""Boss will wait to hand out work until a individual worker isfree since the maxsize of the task queue is 3."""for i in xrange(1,10):tasks.put(i)print('Assigned all work in iteration 1')for i in xrange(10,20):tasks.put(i)print('Assigned all work in iteration 2')gevent.joinall([gevent.spawn(boss),gevent.spawn(worker, 'steve'),gevent.spawn(worker, 'john'),gevent.spawn(worker, 'bob'), ])Worker steve got task 1
Worker john got task 2
Worker bob got task 3
Worker steve got task 4
Worker bob got task 5
Worker john got task 6
Assigned all work in iteration 1
Worker steve got task 7
Worker john got task 8
Worker bob got task 9
Worker steve got task 10
Worker bob got task 11
Worker john got task 12
Worker steve got task 13
Worker john got task 14
Worker bob got task 15
Worker steve got task 16
Worker bob got task 17
Worker john got task 18
Assigned all work in iteration 2
Worker steve got task 19
Quitting time!
Quitting time!
Quitting time!
(下面這兩段代碼沒什么用)
import geventclass Actor(gevent.Greenlet):def __init__(self):self.inbox = queue.Queue()Greenlet.__init__(self)def receive(self, message):"""Define in your subclass."""raise NotImplemented()def _run(self):self.running = Truewhile self.running:message = self.inbox.get()self.receive(message)In a use case:
import gevent from gevent.queue import Queue from gevent import Greenletclass Pinger(Actor):def receive(self, message):print messagepong.inbox.put('ping')gevent.sleep(0)class Ponger(Actor):def receive(self, message):print messageping.inbox.put('pong')gevent.sleep(0)ping = Pinger() pong = Ponger()ping.start() pong.start()ping.inbox.put('start') gevent.joinall([ping, pong])Real World Applications
Gevent ZeroMQ
ZeroMQ?is described by its authors as "a socket library that acts as a concurrency framework". It is a very powerful messaging layer for building concurrent and distributed applications.
ZeroMQ根據其作者的描述是"一個socket庫作為一個并發性的框架".它是非常強大的消息傳送層在建立并發性結構和分布式應用的時候.
ZeroMQ provides a variety of socket primitives, the simplest of which being a Request-Response socket pair. A socket has two methods of interest?send?and?recv, both of which are normally blocking operations. But this is remedied by a briliant library by?Travis Cline?which uses gevent.socket to poll ZeroMQ sockets in a non-blocking manner. You can install gevent-zeromq from PyPi via:?pip install gevent-zeromq
ZeroMQ提供了各種socket基元,最簡單的就是一對Request-Response socket. 一個socket有2個有用方法?send和recv,兩者一般都會有阻塞操作.但這已經被一個作者叫Travis Cline,基于gevent 寫的briliant庫補救了.socket 屬于 ZeroMQ sockets 是一種不會阻塞的方式.你可以安裝 gevent-zeromq 從 PyPi 取到:?pip install gevent-zeromq
(下面這個代碼沒有太大價值,留意bind和connect的成對寫法)
# Note: Remember to ``pip install pyzmq gevent_zeromq`` import gevent from gevent_zeromq import zmq# Global Context context = zmq.Context()def server():server_socket = context.socket(zmq.REQ)server_socket.bind("tcp://127.0.0.1:5000")for request in range(1,10):server_socket.send("Hello")print('Switched to Server for ', request)# Implicit context switch occurs hereserver_socket.recv()def client():client_socket = context.socket(zmq.REP)client_socket.connect("tcp://127.0.0.1:5000")for request in range(1,10):client_socket.recv()print('Switched to Client for ', request)# Implicit context switch occurs hereclient_socket.send("World")publisher = gevent.spawn(server) client = gevent.spawn(client)gevent.joinall([publisher, client])Switched to Server for ?1
Switched to Client for ?1
Switched to Server for ?2
Switched to Client for ?2
Switched to Server for ?3
Switched to Client for ?3
Switched to Server for ?4
Switched to Client for ?4
Switched to Server for ?5
Switched to Client for ?5
Switched to Server for ?6
Switched to Client for ?6
Switched to Server for ?7
Switched to Client for ?7
Switched to Server for ?8
Switched to Client for ?8
Switched to Server for ?9
Switched to Client for ?9
?
Simple Telnet Servers(常見工作場景不需要自己去搭建這樣的Telnet服務器的)
# On Unix: Access with ``$ nc 127.0.0.1 5000`` # On Window: Access with ``$ telnet 127.0.0.1 5000``from gevent.server import StreamServerdef handle(socket, address):socket.send("Hello from a telnet!\n")for i in range(5):socket.send(str(i) + '\n')socket.close()server = StreamServer(('127.0.0.1', 5000), handle) server.serve_forever()WSGI Servers
Gevent provides two WSGI servers for serving content over HTTP. Henceforth called?wsgi?and?pywsgi:
Gevent提供2個WSGI服務器用于HTTP.稱為wsgi和pywsgi:
- gevent.wsgi.WSGIServer
- gevent.pywsgi.WSGIServer
In earlier versions of gevent before 1.0.x, gevent used libevent instead of libev. Libevent included a fast HTTP server which was used by gevent's?wsgi?server.
在1.0x前的gevent版本,gevent用libevent代替libev.Libevent包含一個快的HTTP服務用于gevent的wsgi服務器.
In gevent 1.0.x there is no http server included. Instead?gevent.wsgi?it is now an alias for the pure Python server in?gevent.pywsgi.
在gevent1.0.x這里沒有http服務器包含,gevent.wsgi現在已經是純Python服務器gevent.pywsgi的別名.
Streaming Servers 流式服務器
If you are using gevent 1.0.x, this section does not apply
如果你用的是gevent1.0.x,這部分是不適用的.
For those familiar with streaming HTTP services, the core idea is that in the headers we do not specify a length of the content. We instead hold the connection open and flush chunks down the pipe, prefixing each with a hex digit indicating the length of the chunk. The stream is closed when a size zero chunk is sent.
和那些流式HTTP服務器相似,核心意見是在headers中,我們不指定內容的長度.我們用保持連接打到管道接收緩沖塊,在每一塊的前綴用十六進制表明塊的長度,當塊的長度是0發送的時候,流就會關閉.
實驗結果:
HTTP/1.1 200 OK
Content-Type: text/plain
Transfer-Encoding: chunked
8
<p>Hello
9
World</p>
0
?
The above HTTP connection could not be created in wsgi because streaming is not supported. It would instead have to buffered.
以上的HTTP連接,不能用于創建wsgi,因為流是不支持的,我們用緩沖的方法.
from gevent.wsgi import WSGIServerdef application(environ, start_response):status = '200 OK'body = '<p>Hello World</p>'headers = [('Content-Type', 'text/html')]start_response(status, headers)return [body]WSGIServer(('', 8000), application).serve_forever()Using pywsgi we can however write our handler as a generator and yield the result chunk by chunk.
使用pywsgi我們把處理當作一個發生器,一塊接一塊的返回結果.
from gevent.pywsgi import WSGIServerdef application(environ, start_response):status = '200 OK'headers = [('Content-Type', 'text/html')]start_response(status, headers)yield "<p>Hello"yield "World</p>"WSGIServer(('', 8000), application).serve_forever()But regardless, performance on Gevent servers is phenomenal compared to other Python servers. libev is a very vetted technology and its derivative servers are known to perform well at scale.
To benchmark, try Apache Benchmark?ab?or see this?Benchmark of Python WSGI Servers?for comparison with other servers.
但是不管怎樣,gevent跟其他python服務器對比是非凡的.libev是一個非常vetted的技術,和它派生出來的服務器已被認可是表現良好的.
用基準問題測試,嘗試 Apache Benchmark?ab?看這個?Benchmark of Python WSGI Servers?和其他服務的比較.
$ ab -n 10000 -c 100 http://127.0.0.1:8000/Long Polling(下面這個代碼在現實場景需要拆分成兩份分別運行)
import gevent from gevent.queue import Queue, Empty from gevent.pywsgi import WSGIServer import simplejson as jsondata_source = Queue()def producer():while True:data_source.put_nowait('Hello World')gevent.sleep(1)def ajax_endpoint(environ, start_response):status = '200 OK'headers = [('Content-Type', 'application/json')]try:datum = data_source.get(timeout=5)except Empty:datum = []start_response(status, headers)return json.dumps(datum)gevent.spawn(producer)WSGIServer(('', 8000), ajax_endpoint).serve_forever()Websockets(下面這個實驗可以用flask-socketio復現)
Websocket example which requires?gevent-websocket.(pip install gevent-websocket)
Websocket例子需要?gevent-websocket.
# Simple gevent-websocket server import json import randomfrom gevent import pywsgi, sleep from geventwebsocket.handler import WebSocketHandlerclass WebSocketApp(object):'''Send random data to the websocket'''def __call__(self, environ, start_response):ws = environ['wsgi.websocket']x = 0while True:data = json.dumps({'x': x, 'y': random.randint(1, 5)})ws.send(data)x += 1sleep(0.5)server = pywsgi.WSGIServer(("", 10000), WebSocketApp(),handler_class=WebSocketHandler) server.serve_forever()HTML Page:
<html><head><title>Minimal websocket application</title><script type="text/javascript" src="jquery.min.js"></script><script type="text/javascript">$(function() {// Open up a connection to our servervar ws = new WebSocket("ws://localhost:10000/");// What do we do when we get a message?ws.onmessage = function(evt) {$("#placeholder").append('<p>' + evt.data + '</p>')}// Just update our conn_status field with the connection statusws.onopen = function(evt) {$('#conn_status').html('<b>Connected</b>');}ws.onerror = function(evt) {$('#conn_status').html('<b>Error</b>');}ws.onclose = function(evt) {$('#conn_status').html('<b>Closed</b>');}});</script></head><body><h1>WebSocket Example</h1><div id="conn_status">Not Connected</div><div id="placeholder" style="width:600px;height:300px;"></div></body> </html>Chat Server
The final motivating example, a realtime chat room. This example requires?Flask?( but not neccesarily so, you could use Django, Pyramid, etc ). The corresponding Javascript and HTML files can be found?here.
最后一個激勵的例子,一個實時的聊天室.這個例子需要Flask(但不是必須的,你可以用Django, Pyramid等等).相應的Javascript 和 HTML 文件可以在這里找到.
?
自己註釋和解讀後的代碼在這裏:
https://github.com/appleyuchi/minichat
# Micro gevent chatroom. # ----------------------from flask import Flask, render_template, requestfrom gevent import queue from gevent.pywsgi import WSGIServerimport simplejson as jsonapp = Flask(__name__) app.debug = Truerooms = {'topic1': Room(),'topic2': Room(), }users = {}class Room(object):def __init__(self):self.users = set()self.messages = []def backlog(self, size=25):return self.messages[-size:]def subscribe(self, user):self.users.add(user)def add(self, message):for user in self.users:print useruser.queue.put_nowait(message)self.messages.append(message)class User(object):def __init__(self):self.queue = queue.Queue()@app.route('/') def choose_name():return render_template('choose.html')@app.route('/<uid>') def main(uid):return render_template('main.html',uid=uid,rooms=rooms.keys())@app.route('/<room>/<uid>') def join(room, uid):user = users.get(uid, None)if not user:users[uid] = user = User()active_room = rooms[room]active_room.subscribe(user)print 'subscribe', active_room, usermessages = active_room.backlog()return render_template('room.html',room=room, uid=uid, messages=messages)@app.route("/put/<room>/<uid>", methods=["POST"]) def put(room, uid):user = users[uid]room = rooms[room]message = request.form['message']room.add(':'.join([uid, message]))return ''@app.route("/poll/<uid>", methods=["POST"]) def poll(uid):try:msg = users[uid].queue.get(timeout=10)except queue.Empty:msg = []return json.dumps(msg)if __name__ == "__main__":http = WSGIServer(('', 5000), app)http.serve_forever()?
Reference:
[1]http://sdiehl.github.com/gevent-tutorial/
[2]https://www.cnblogs.com/bjdxy/archive/2012/11/27/2790854.html
[3]https://www.jianshu.com/p/b1934ff22b06
[4]https://blog.51cto.com/rfyiamcool/1538367
[5]https://blog.csdn.net/Croyance_M/article/details/100605415
[6]https://yq.aliyun.com/articles/402343
總結
以上是生活随笔為你收集整理的gevent-tutorial翻译和解读的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 《关于gevent的几点思考》阅读笔记
- 下一篇: ubuntu 18.10无法locate