Twisted——基于事件驱动的Python网络框架
對于追求服務器程序性能的應用有什么適用的Python框架嗎?那就是今天和大家分享的Twisted框架,它支持許多常見的傳輸及應用層協議,包括TCP、UDP、SSL/TLS、HTTP、FTP等,這也意味著能為客戶端和服務器端提供自定義開發工具。那為什么就說它能保證高效能通信呢?
Twisted在不同的操作系統平臺上利用了不同的底層技術:在Windows中,基于IO完成端口技術保證了底層高效地將I/O事件通知給框架及應用程序;在Linux中采用epoll技術,它能顯著提高在大量并發連接中只有少量活躍的情況下CPU利用率。Twisted框架采用Reactor設計模式,它的核心是Reactor的事件循環,監聽網絡、文件系統以及定時器等事件,并提供統一處理接口,使得事件能被快速響應。
在上一篇事件驅動中介紹過:對于不需要同步處理的多任務,我們可以使用事件驅動。那么在Twisted中使得程序設計可以采用事件驅動機制得益于Deferred(延遲)對象,它是一個管理回調函數的對象,我們可以向該對象添加需要回調的函數,同時可以指定該組回調函數何時被調用。
from twisted.internet import deferfrom twisted.python import failureimport sys?d = defer.Deferred() # 定義defer實例def printSquare(d): # 正常處理函數 print("Square of %d is %d" % (d, d * d))?def processError(f): # 錯誤處理函數 print("Error with process ")d.addCallback(printSquare) # 添加正常處理的回調函數d.addErrback(processError) # 添加錯誤處理回調函數# 開始調用deferif len(sys.argv) > 1 and sys.argv[1] == 'call_error': f = failure.Failure(Exception('my Exception')) d.errback(f) # 調用錯誤處理函數processErrorelse: d.callback(4) # 調用正常處理函數printSquare(4)代碼圍繞twisted.internet.defer.Deffered對象展開。
Defer中可以管理兩種回調函數:Deffered.addCallback()正常處理函數和Deffered.addErrback錯誤處理函數。兩種回調函數可以通過Deffered.callback()和Deffered.errback()進行調用。
另外可以給一個Deffer對象賦予多個正常或錯誤處理的回調函數,這樣在Defer對象內部形成正常處理函數鏈和錯誤處理函數鏈,示例代碼如下。
from twisted.internet import defer?d?=?defer.Deferred()?def printSquare(d): print("Square of %d is %d", (d, d * d)) return d?def processError(f): print("error with process")?def printTwice(d): print("Twice of %d is %d", (2 * d)) return d?d.addCallback(printSquare)d.addErrback(processError)d.addCallback(printTwice)?d.callback(5)# Square of %d is %d (5, 25)#?Twice?of?%d?is?%d?10Deffered的主要成員函數包括:
| addCallback(self, callback, *args, **kwargs) | 給Defer對象添加正常處理回調函數,需要至少有一個輸入參數 |
| addErrback(self,?errback, *args, **kwargs) | 給Defer對象添加錯誤處理回調函數,errback為錯誤處理函數名,需要至少有一個輸入參數 |
| addBoth(self, callback, *args, **kwargs) | 回調函數同時作為正常和錯誤處理回調函數添加到Defer對象中 |
| chainDeffered(self, d) | 將另一個Defer對象的正常和錯誤處理回調函數添加到本Defer對象中。本函數是單向的 |
| callback(self, result) | 調用正常處理函數鏈,result是傳遞給第一個正常處理回調函數的參數 |
| errback(self, fail=None) | 調用錯誤處理函數鏈,result是傳遞給第一個錯誤處理回調函數的參數。 |
| pause(self)和unpause(self) | pause(self)和unpause(self)?用來暫停和繼續調用鏈 |
Defer為什么要分別管理兩條回調函數調用鏈?因為調用鏈函數之間除了簡單的順序調用關系,還存在交叉調用關系,兩條為了對回調過程提供更好的可控性,調用流程圖如下:
其中實線為回調函數正常返回時的繼續調用路徑,虛線為處理函數中產生異常時的后續調用路徑。
我們再將Deffer對象和reactor的延時調用機制結合在一起,來實現異步調用函數。
makeDefer函數內定義了調用鏈執行的邏輯關系,其中 reactor.callLater(2, d.callback, 5)表示在reactor.run()運行后的2后,twisted框架才去調用callback對應的兩個函數(printSquare,printTwice)。
callLater()函數原型如下
def?callLater(delay,?callable,?*args,?**kw):??passdelay定義延時調用秒數,如果為0則是立即調用;callable為被調用的函數名及其參數。
通過reactor.callLater(4, reactor.stop)定義4秒后調用函數reactor.stop(),還可以實現定時退出Twisted消息循環。
下面我們通過一個實時通信的廣播系統模型介紹下用Twisted框架開發基于TCP的網絡應用的方法:
首先Twisted提供了基本的通信編程封裝,這里先介紹下Transports。它代表網絡中兩個通信結點之間的連接。Transports負責描述連接的細節,比如連接是面向流式的還是面向數據報的,流控以及可靠性,比如TCP、UDP和Unix套接字。對應方法如下:
| write | 以非阻塞的方式按順序依次將數據寫到物理連接上 |
| writeSequence | 將一個字符串列表寫到物理連接上 |
| loseConnection | 將所有掛起的數據寫入,然后關閉連接 |
| getPeer | 取得連接中對端的地址信息 |
| getHost | 取得連接中本端的地址信息 |
Protocols描述了如何以異步的方式處理網絡中的事件。HTTP、DNS以及IMAP是應用層協議中的例子。Protocols實現了IProtocol接口,它包含如下的方法:
| makeConnection | 在transport對象和服務器之間建立一條連接 |
| connectionMade | ?連接建立起來后調用 |
| dataReceived | 接收數據時調用 |
| connectionLost | 關閉連接時調用 |
廣播系統服務器
針對Twisted的Protocol、Factory等類進行編程,定義它們的子類并重寫connectionMade和dataReceived進行事件化處理。
from twisted.internet.protocol import Protocolfrom twisted.internet.protocol import Factoryfrom twisted.internet.endpoints import TCP4ServerEndpointfrom twisted.internet import reactor?clients = [] # 保存所有客戶端連接# Protocol的子類class Spreader(Protocol): def __init__(self, factory): self.factory = factory? def connectionMade(self): self.factory.numProtocols = self.factory.numProtocols + 1 # 對連接的客戶端進行計數 self.transport.write( (u"歡迎來到Spread Site,您是第%d個客戶端用戶!\n" % (self.factory.numProtocols,).encode('utf8'))) print(f"new connect {self.factory.numProtocols}") clients.append(self) # 將self保存到clients列表中? def connectionLost(self, reason): clients.remove() print(f"lost connect: {self.connect_id}")? def dataReceived(self, data): if data == "close": self.transport.loseConnection() print(f"{self.connect_id} closed") else: print(f"spreading message from {self.connect_id, self.data}") for client in clients: if client != self: # 將收到的數據通過Protocol.transport.write()函數分發給除自己以外的所有客戶端 client.transport.write(data)# Factory的子類class SpreadFactory(Factory): def __init__(self): self.numProtocols = 0 # 將客戶端計數器置0? def buildProtocol(self, addr):????????return?Spreader(self)??#?建立Protocol子類的實例?endpoint = TCP4ServerEndpoint(reactor, 8007) # 定義服務器監聽端口endpoint.listen(SpreadFactory()) # 指定子類實例reactor.run() # 掛起運行廣播客戶端
Twisted同樣提供了基于Protocol類的TCP客戶端編程方法。
from twisted.internet.protocol import Protocol, ClientFactoryfrom twisted.internet import reactorimport sysimport datetime?class Echo(Protocol): def connectionMade(self): print("connect to server!")? def dataReceived(self, data): print("got message: ", data.decode('utf8')) reactor.callLater(5, self.say_hello)? def connectionLost(self, reason): print("Disconnected from server!")? def say_hello(self): if self.transport.connected: self.transport.write( (u"hello, I'm %s %s" % (sys.argv[1], datetime.datetime.now())).encode('utf-8'))?class EchoClientFactory(ClientFactory): def __init__(self): self.protocol = None? def startedConnecting(self, connector): print("Start to connect.")? def buildProtocol(self, addr): self.protocol = Echo() return self.protocol? def clientConnectionLost(self, connector, reason): print("Lost Connection. Reason:", reason)? def clientConnectionFailed(self, connector, reason): print("Connection failed. Reason:", reason)?host = "127.0.0.1"port =8007factory = EchoClientFactory()reactor.connectTCP(host, port, factory)reactor.run()執行順序如下:
-
建立連接
ClientFactory.startedConnecting()
Protocol.connectionMade()
-
已連接
用Protocol.dataReceived()接受消息
用Protocol.transport.write()發送消息
-
連接斷開
Protocol.connectionLost()
ClientFactory.?clientConnectionLost()
即建立連接時先執行ClientFactory中回調,然后執行Protocol中回調,連接斷開時正好相反。?? ???
歡迎你來我的公眾號“才淺的每日python”和我一起討論,也歡迎你來催更扯淡。日拱一卒,下一篇文不見不散。
總結
以上是生活随笔為你收集整理的 Twisted——基于事件驱动的Python网络框架的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 中国电信IT研发中心 2019校园招聘
- 下一篇: Microsoft Office Vis