python io多路复用_Python之IO多路复用
一、IO模型介紹
? 同步(synchronous) IO和異步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO分別是什么,到底有什么區(qū)別?這個(gè)問題其實(shí)不同的人給出的答案都可能不同,比如wiki,就認(rèn)為asynchronous IO和non-blocking IO是一個(gè)東西。這其實(shí)是因?yàn)椴煌娜说闹R背景不同,并且在討論這個(gè)問題的時(shí)候上下文(context)也不相同。所以,為了更好的回答這個(gè)問題,我先限定一下本文的上下文。
本文討論的背景是Linux環(huán)境下的network IO。本文最重要的參考文獻(xiàn)是Richard Stevens的“UNIX? Network Programming Volume 1, Third Edition: The Sockets Networking ”,6.2節(jié)“I/O Models ”,Stevens在這節(jié)中詳細(xì)說明了各種IO的特點(diǎn)和區(qū)別,如果英文夠好的話,推薦直接閱讀。Stevens的文風(fēng)是有名的深入淺出,所以不用擔(dān)心看不懂。本文中的流程圖也是截取自參考文獻(xiàn)。
Stevens在文章中一共比較了五種IO Model:
* blocking IO 阻塞IO
* nonblocking IO 非阻塞IO
* IO multiplexing IO多路復(fù)用
* signal driven IO 信號驅(qū)動IO(不常見,不講)
* asynchronous IO 異步IO
由signal driven IO(信號驅(qū)動IO)在實(shí)際中并不常用,所以主要介紹其余四種IO Model。
? 再說一下IO發(fā)生時(shí)涉及的對象和步驟。對于一個(gè)network IO (這里我們以read、recv舉例),它會涉及到兩個(gè)系統(tǒng)對象,一個(gè)是調(diào)用這個(gè)IO的process (or thread),另一個(gè)就是系統(tǒng)內(nèi)核(kernel)。當(dāng)一個(gè)read/recv讀數(shù)據(jù)的操作發(fā)生時(shí),該操作會經(jīng)歷兩個(gè)階段:
1)等待數(shù)據(jù)準(zhǔn)備 (Waiting for the data to be ready)
2)將數(shù)據(jù)從內(nèi)核拷貝到進(jìn)程中(Copying the data from the kernel to the process)
記住這兩點(diǎn)很重要,因?yàn)檫@些IO模型的區(qū)別就是在兩個(gè)階段上各有不同的情況。
補(bǔ)充:
#1、輸入操作:read、readv、recv、recvfrom、recvmsg共5個(gè)函數(shù),如果會阻塞狀態(tài),則會經(jīng)理wait data和copy data兩個(gè)階段,如果設(shè)置為非阻塞則在wait 不到data時(shí)拋出異常
#2、輸出操作:write、writev、send、sendto、sendmsg共5個(gè)函數(shù),在發(fā)送緩沖區(qū)滿了會阻塞在原地,如果設(shè)置為非阻塞,則會拋出異常
#3、接收外來鏈接:accept,與輸入操作類似
#4、發(fā)起外出鏈接:connect,與輸出操作類似
二、阻塞IO(Blocking IO)
? 在linux中,默認(rèn)情況下所有的socket都是blocking,一個(gè)典型的讀操作流程大概是這樣:(recvfrom和tcp里面的recv在這些IO模型里面是一樣的)。
上面的圖形分析:兩個(gè)階段的阻塞
? 當(dāng)用戶進(jìn)程調(diào)用了recvfrom這個(gè)系統(tǒng)調(diào)用,kernel就開始了IO的第一個(gè)階段:準(zhǔn)備數(shù)據(jù)。對于network io來說,很多時(shí)候數(shù)據(jù)在一開始還沒有到達(dá)(比如,還沒有收到一個(gè)完整的UDP包),這個(gè)時(shí)候kernel就要等待足夠的數(shù)據(jù)到來。
而在用戶進(jìn)程這邊,整個(gè)進(jìn)程會被阻塞。當(dāng)kernel一直等到數(shù)據(jù)準(zhǔn)備好了,它就會將數(shù)據(jù)從kernel中拷貝到用戶內(nèi)存,然后kernel返回結(jié)果,用戶進(jìn)程才解除block的狀態(tài),重新運(yùn)行起來。
? 所以,blocking IO的特點(diǎn)就是在IO執(zhí)行的兩個(gè)階段(等待數(shù)據(jù)和拷貝數(shù)據(jù)兩個(gè)階段)都被block了。
這里我們回顧一下同步/異步/阻塞/非阻塞:
同步:提交一個(gè)任務(wù)之后要等待這個(gè)任務(wù)執(zhí)行完畢
異步:只管提交任務(wù),不等待這個(gè)任務(wù)執(zhí)行完畢就可以去做其他的事情
阻塞:recv、recvfrom、accept,線程階段 運(yùn)行狀態(tài)-->阻塞狀態(tài)-->就緒
非阻塞:沒有阻塞狀態(tài)
在一個(gè)線程的IO模型中,我們r(jià)ecv的地方阻塞,我們就開啟多線程,但是不管你開啟多少個(gè)線程,這個(gè)recv的時(shí)間是不是沒有被規(guī)避掉,不管是多線程還是多進(jìn)程都沒有規(guī)避掉這個(gè)IO時(shí)間。
? 幾乎所有的程序員第一次接觸到的網(wǎng)絡(luò)編程都是從listen()、send()、recv() 等接口開始的,使用這些接口可以很方便的構(gòu)建服務(wù)器/客戶機(jī)的模型。然而大部分的socket接口都是阻塞型的。如下圖
ps:所謂阻塞型接口是指系統(tǒng)調(diào)用(一般是IO接口)不返回調(diào)用結(jié)果并讓當(dāng)前線程一直阻塞,只有當(dāng)該系統(tǒng)調(diào)用獲得結(jié)果或者超時(shí)出錯(cuò)時(shí)才返回。
實(shí)際上,除非特別指定,幾乎所有的IO接口 ( 包括socket接口 ) 都是阻塞型的。這給網(wǎng)絡(luò)編程帶來了一個(gè)很大的問題,如在調(diào)用recv(1024)的同時(shí),線程將被阻塞,在此期間,線程將無法執(zhí)行任何運(yùn)算或響應(yīng)任何的網(wǎng)絡(luò)請求。
一個(gè)簡單的解決方案:
在服務(wù)器端使用多線程(或多進(jìn)程)。多線程(或多進(jìn)程)的目的是讓每個(gè)連接都擁有獨(dú)立的線程(或進(jìn)程),這樣任何一個(gè)連接的阻塞都不會影響其他的連接。
該方案的問題是:
開啟多進(jìn)程或都線程的方式,在遇到要同時(shí)響應(yīng)成百上千路的連接請求,則無論多線程還是多進(jìn)程都會嚴(yán)重占據(jù)系統(tǒng)資源,降低系統(tǒng)對外界響應(yīng)效率,而且線程與進(jìn)程本身也更容易進(jìn)入假死狀態(tài)。
改進(jìn)方案:
很多程序員可能會考慮使用“線程池”或“連接池”。“線程池”旨在減少創(chuàng)建和銷毀線程的頻率,其維持一定合理數(shù)量的線程,并讓空閑的線程重新承擔(dān)新的執(zhí)行任務(wù)。“連接池”維持連接的緩存池,盡量重用已有的連接、減少創(chuàng)建和關(guān)閉連接的頻率。這兩種技術(shù)都可以很好的降低系統(tǒng)開銷,都被廣泛應(yīng)用很多大型系統(tǒng),如websphere、tomcat和各種數(shù)據(jù)庫等。
改進(jìn)后方案其實(shí)也存在著問題:
“線程池”和“連接池”技術(shù)也只是在一定程度上緩解了頻繁調(diào)用IO接口帶來的資源占用。而且,所謂“池”始終有其上限,當(dāng)請求大大超過上限時(shí),“池”構(gòu)成的系統(tǒng)對外界的響應(yīng)并不比沒有池的時(shí)候效果好多少。所以使用“池”必須考慮其面臨的響應(yīng)規(guī)模,并根據(jù)響應(yīng)規(guī)模調(diào)整“池”的大小。
對應(yīng)上例中的所面臨的可能同時(shí)出現(xiàn)的上千甚至上萬次的客戶端請求,“線程池”或“連接池”或許可以緩解部分壓力,但是不能解決所有問題。總之,多線程模型可以方便高效的解決小規(guī)模的服務(wù)請求,但面對大規(guī)模的服務(wù)請求,多線程模型也會遇到瓶頸,可以用非阻塞接口來嘗試解決這個(gè)問題。
三、非阻塞IO(non-Blocking IO)
Linux下,可以通過設(shè)置socket使其變?yōu)閚on-blocking。當(dāng)對一個(gè)non-blocking socket執(zhí)行讀操作時(shí),流程是這個(gè)樣子:
? 從圖中可以看出,當(dāng)用戶進(jìn)程發(fā)出read操作時(shí),如果kernel中的數(shù)據(jù)還沒有準(zhǔn)備好,那么它并不會block用戶進(jìn)程,而是立刻返回一個(gè)error。從用戶進(jìn)程角度講 ,它發(fā)起一個(gè)read操作后,并不需要等待,而是馬上就得到了一個(gè)結(jié)果。用戶進(jìn)程判斷結(jié)果是一個(gè)error時(shí),它就知道數(shù)據(jù)還沒有準(zhǔn)備好,于是用戶就可以在本次到下次再發(fā)起read詢問的時(shí)間間隔內(nèi)做其他事情,或者直接再次發(fā)送read操作。一旦kernel中的數(shù)據(jù)準(zhǔn)備好了,并且又再次收到了用戶進(jìn)程的system call,那么它馬上就將數(shù)據(jù)拷貝到了用戶內(nèi)存(這一階段仍然是阻塞的),然后返回。
也就是說非阻塞的recvform系統(tǒng)調(diào)用調(diào)用之后,進(jìn)程并沒有被阻塞,內(nèi)核馬上返回給進(jìn)程,如果數(shù)據(jù)還沒準(zhǔn)備好,此時(shí)會返回一個(gè)error。進(jìn)程在返回之后,可以干點(diǎn)別的事情,然后再發(fā)起recvform系統(tǒng)調(diào)用。重復(fù)上面的過程,循環(huán)往復(fù)的進(jìn)行recvform系統(tǒng)調(diào)用。這個(gè)過程通常被稱之為輪詢。輪詢檢查內(nèi)核數(shù)據(jù),直到數(shù)據(jù)準(zhǔn)備好,再拷貝數(shù)據(jù)到進(jìn)程,進(jìn)行數(shù)據(jù)處理。需要注意,拷貝數(shù)據(jù)整個(gè)過程,進(jìn)程仍然是屬于阻塞的狀態(tài)。
所以,在非阻塞式IO中,用戶進(jìn)程其實(shí)是需要不斷的主動詢問kernel數(shù)據(jù)準(zhǔn)備好了沒有。
非阻塞IO示例:
服務(wù)端
# 服務(wù)端
import socket
import time
server=socket.socket()
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
server.bind(('127.0.0.1',8083))
server.listen(5)
server.setblocking(False) #設(shè)置不阻塞
r_list=[] #用來存儲所有來請求server端的conn連接
w_list={} #用來存儲所有已經(jīng)有了請求數(shù)據(jù)的conn的請求數(shù)據(jù)
while 1:
try:
conn,addr=server.accept() #不阻塞,會報(bào)錯(cuò)
r_list.append(conn) #為了將連接保存起來,不然下次循環(huán)的時(shí)候,上一次的連接就沒有了
except BlockingIOError:
# 強(qiáng)調(diào)強(qiáng)調(diào)強(qiáng)調(diào):!!!非阻塞IO的精髓在于完全沒有阻塞!!!
# time.sleep(0.5) # 打開該行注釋純屬為了方便查看效果
print('在做其他的事情')
print('rlist: ',len(r_list))
print('wlist: ',len(w_list))
# 遍歷讀列表,依次取出套接字讀取內(nèi)容
del_rlist=[] #用來存儲刪除的conn連接
for conn in r_list:
try:
data=conn.recv(1024) #不阻塞,會報(bào)錯(cuò)
if not data: #當(dāng)一個(gè)客戶端暴力關(guān)閉的時(shí)候,會一直接收b'',別忘了判斷一下數(shù)據(jù)
conn.close()
del_rlist.append(conn)
continue
w_list[conn]=data.upper()
except BlockingIOError: # 沒有收成功,則繼續(xù)檢索下一個(gè)套接字的接收
continue
except ConnectionResetError: # 當(dāng)前套接字出異常,則關(guān)閉,然后加入刪除列表,等待被清除
conn.close()
del_rlist.append(conn)
# 遍歷寫列表,依次取出套接字發(fā)送內(nèi)容
del_wlist=[]
for conn,data in w_list.items():
try:
conn.send(data)
del_wlist.append(conn)
except BlockingIOError:
continue
# 清理無用的套接字,無需再監(jiān)聽它們的IO操作
for conn in del_rlist:
r_list.remove(conn)
#del_rlist.clear() #清空列表中保存的已經(jīng)刪除的內(nèi)容
for conn in del_wlist:
w_list.pop(conn)
#del_wlist.clear()
客戶端
#客戶端
import socket
import os
import time
import threading
client=socket.socket()
client.connect(('127.0.0.1',8083))
while 1:
res=('%s hello' %os.getpid()).encode('utf-8')
client.send(res)
data=client.recv(1024)
print(data.decode('utf-8'))
##多線程的客戶端請求版本
# def func():
# sk = socket.socket()
# sk.connect(('127.0.0.1',9000))
# sk.send(b'hello')
# time.sleep(1)
# print(sk.recv(1024))
# sk.close()
#
# for i in range(20):
# threading.Thread(target=func).start()
雖然我們上面的代碼通過設(shè)置非阻塞,規(guī)避了IO操作,但是非阻塞IO模型絕不被推薦。
我們不能否定其優(yōu)點(diǎn):能夠在等待任務(wù)完成的時(shí)間里干其他活了(包括提交其他任務(wù),也就是 “后臺” 可以有多個(gè)任務(wù)在“”同時(shí)“”執(zhí)行)。
但是也難掩其缺點(diǎn):
#1. 循環(huán)調(diào)用recv()將大幅度推高CPU占用率;這也是我們在代碼中留一句time.sleep(2)的原因,否則在低配主機(jī)下極容易出現(xiàn)卡機(jī)情況
#2. 任務(wù)完成的響應(yīng)延遲增大了,因?yàn)槊窟^一段時(shí)間才去輪詢一次read操作,而任務(wù)可能在兩次輪詢之間的任意時(shí)間完成。這會導(dǎo)致整體數(shù)據(jù)吞吐量的降低。
此外,在這個(gè)方案中recv()更多的是起到檢測“操作是否完成”的作用,實(shí)際操作系統(tǒng)提供了更為高效的檢測“操作是否完成“作用的接口,例如select()多路復(fù)用模式,可以一次檢測多個(gè)連接是否活躍。
四、多路復(fù)用IO(IO multiplexing)(重點(diǎn))
? IO multiplexing這個(gè)詞可能有點(diǎn)陌生,但是如果我說select/epoll,大概就都能明白了。有些地方也稱這種IO方式為事件驅(qū)動IO(event driven IO)。我們都知道,select/epoll的好處就在于單個(gè)process就可以同時(shí)處理多個(gè)網(wǎng)絡(luò)連接的IO。它的基本原理就是select/epoll這個(gè)function會不斷的輪詢所負(fù)責(zé)的所有socket,當(dāng)某個(gè)socket有數(shù)據(jù)到達(dá)了,就通知用戶進(jìn)程。它的流程如圖:
先看解釋圖,里面的select就像個(gè)代理。
? 用戶進(jìn)程調(diào)用了select,那么整個(gè)進(jìn)程會被block,而同時(shí),kernel會“監(jiān)視”所有select負(fù)責(zé)的socket,當(dāng)任何一個(gè)socket中的數(shù)據(jù)準(zhǔn)備好了,select就會返回。這個(gè)時(shí)候用戶進(jìn)程再調(diào)用read操作,將數(shù)據(jù)從kernel拷貝到用戶進(jìn)程。
這個(gè)圖和blocking IO的圖其實(shí)并沒有太大的不同,事實(shí)上還更差一些。因?yàn)樗粌H阻塞了還多需要使用兩個(gè)系統(tǒng)調(diào)用(select和recvfrom),而blocking IO只調(diào)用了一個(gè)系統(tǒng)調(diào)用(recvfrom),當(dāng)只有一個(gè)連接請求的時(shí)候,這個(gè)模型還不如阻塞IO效率高。但是,用select的優(yōu)勢在于它可以同時(shí)處理多個(gè)connection,而阻塞IO那里不能,我不管阻塞不阻塞,你所有的連接包括recv等操作,我都幫你監(jiān)聽著(以什么形式監(jiān)聽的呢?先不要考慮,下面會說),其中任何一個(gè)有變動(有鏈接,有數(shù)據(jù)),我就告訴你用戶,那么你就可以去調(diào)用這個(gè)數(shù)據(jù)了,這就是他的NB之處。這個(gè)IO多路復(fù)用模型機(jī)制是操作系統(tǒng)幫我們提供的,在windows上有這么個(gè)機(jī)制叫做select,那么如果我們想通過自己寫代碼來控制這個(gè)機(jī)制或者自己寫這么個(gè)機(jī)制,我們可以使用python中的select模塊來完成上面這一系列代理的行為。在一切皆文件的unix下,這些可以接收數(shù)據(jù)的對象或者連接,都叫做文件描述符fd。
強(qiáng)調(diào):
1. 如果處理的連接數(shù)不是很高的話,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll的優(yōu)勢并不是對于單個(gè)連接能處理得更快,而是在于能處理更多的連接。
2. 在多路復(fù)用模型中,對于每一個(gè)socket,一般都設(shè)置成為non-blocking,但是,如上圖所示,整個(gè)用戶的process其實(shí)是一直被block的。只不過process是被select這個(gè)函數(shù)block,而不是被socket IO給block。
1.Python中的select模塊
import select
fd_r_list, fd_w_list, fd_e_list = select.select(rlist, wlist, xlist, [timeout])
參數(shù):可接受四個(gè)參數(shù)(前三個(gè)必須)
rlist: wait until ready for reading #等待讀的對象,你需要監(jiān)聽的需要獲取數(shù)據(jù)的對象列表
wlist: wait until ready for writing #等待寫的對象,你需要寫一些內(nèi)容的時(shí)候,input等等,也就是說我會循環(huán)他看看是否有需要發(fā)送的消息,如果有我取出這個(gè)對象的消息并發(fā)送出去,一般用不到,這里我們也給一個(gè)[]。
xlist: wait for an “exceptional condition” #等待異常的對象,一些額外的情況,一般用不到,但是必須傳,那么我們就給他一個(gè)[]。
timeout: 超時(shí)時(shí)間
當(dāng)超時(shí)時(shí)間 = n(正整數(shù))時(shí),那么如果監(jiān)聽的句柄均無任何變化,則select會阻塞n秒,之后返回三個(gè)空列表,如果監(jiān)聽的句柄有變化,則直接執(zhí)行。
返回值:三個(gè)列表與上面的三個(gè)參數(shù)列表是對應(yīng)的
? select方法用來監(jiān)視文件描述符(當(dāng)文件描述符條件不滿足時(shí),select會阻塞),當(dāng)某個(gè)文件描述符狀態(tài)改變后,會返回三個(gè)列表
1、當(dāng)參數(shù)1 序列中的fd滿足“可讀”條件時(shí),則獲取發(fā)生變化的fd并添加到fd_r_list中
2、當(dāng)參數(shù)2 序列中含有fd時(shí),則將該序列中所有的fd添加到 fd_w_list中
3、當(dāng)參數(shù)3 序列中的fd發(fā)生錯(cuò)誤時(shí),則將該發(fā)生錯(cuò)誤的fd添加到 fd_e_list中
4、當(dāng)超時(shí)時(shí)間為空,則select會一直阻塞,直到監(jiān)聽的句柄發(fā)生變化
結(jié)論: select的優(yōu)勢在于可以處理多個(gè)連接,不適用于單個(gè)連接
2.select網(wǎng)絡(luò)IO示例
服務(wù)端:
#服務(wù)端
from socket import *
import select
server = socket(AF_INET, SOCK_STREAM)
server.bind(('127.0.0.1',8093))
server.listen(5)
# 設(shè)置為非阻塞
server.setblocking(False)
# 初始化將服務(wù)端socket對象加入監(jiān)聽列表,后面還要?jiǎng)討B(tài)添加一些conn連接對象,當(dāng)accept的時(shí)候sk就有感應(yīng),當(dāng)recv的時(shí)候conn就有動靜
rlist=[server,]
rdata = {} #存放客戶端發(fā)送過來的消息
wlist=[] #等待寫對象
wdata={} #存放要返回給客戶端的消息
print('預(yù)備!監(jiān)聽!!!')
count = 0 #寫著計(jì)數(shù)用的,為了看實(shí)驗(yàn)效果用的,沒用
while True:
# 開始 select 監(jiān)聽,對rlist中的服務(wù)端server進(jìn)行監(jiān)聽,select函數(shù)阻塞進(jìn)程,直到rlist中的套接字被觸發(fā)(在此例中,套接字接收到客戶端發(fā)來的握手信號,從而變得可讀,滿足select函數(shù)的“可讀”條件),被觸發(fā)的(有動靜的)套接字(服務(wù)器套接字)返回給了rl這個(gè)返回值里面;
rl,wl,xl=select.select(rlist,wlist,[],0.5)
print('%s 次數(shù)>>'%(count),wl)
count = count + 1
# 對rl進(jìn)行循環(huán)判斷是否有客戶端連接進(jìn)來,當(dāng)有客戶端連接進(jìn)來時(shí)select將觸發(fā)
for sock in rl:
# 判斷當(dāng)前觸發(fā)的是不是socket對象, 當(dāng)觸發(fā)的對象是socket對象時(shí),說明有新客戶端accept連接進(jìn)來了
if sock == server:
# 接收客戶端的連接, 獲取客戶端對象和客戶端地址信息
conn,addr=sock.accept()
#把新的客戶端連接加入到監(jiān)聽列表中,當(dāng)客戶端的連接有接收消息的時(shí)候,select將被觸發(fā),會知道這個(gè)連接有動靜,有消息,那么返回給rl這個(gè)返回值列表里面。
rlist.append(conn)
else:
# 由于客戶端連接進(jìn)來時(shí)socket接收客戶端連接請求,將客戶端連接加入到了監(jiān)聽列表中(rlist),客戶端發(fā)送消息的時(shí)候這個(gè)連接將觸發(fā)
# 所以判斷是否是客戶端連接對象觸發(fā)
try:
data=sock.recv(1024)
#沒有數(shù)據(jù)的時(shí)候,我們將這個(gè)連接關(guān)閉掉,并從監(jiān)聽列表中移除
if not data:
sock.close()
rlist.remove(sock)
continue
print("received {0} from client {1}".format(data.decode(), sock))
#將接受到的客戶端的消息保存下來
rdata[sock] = data.decode()
#將客戶端連接對象和這個(gè)對象接收到的消息加工成返回消息,并添加到wdata這個(gè)字典里面
wdata[sock]=data.upper()
#需要給這個(gè)客戶端回復(fù)消息的時(shí)候,我們將這個(gè)連接添加到wlist寫監(jiān)聽列表中
wlist.append(sock)
#如果這個(gè)連接出錯(cuò)了,客戶端暴力斷開了(注意,我還沒有接收他的消息,或者接收他的消息的過程中出錯(cuò)了)
except Exception:
#關(guān)閉這個(gè)連接
sock.close()
#在監(jiān)聽列表中將他移除,因?yàn)椴还苁裁丛?#xff0c;它畢竟是斷開了,沒必要再監(jiān)聽它了
rlist.remove(sock)
# 如果現(xiàn)在沒有客戶端請求連接,也沒有客戶端發(fā)送消息時(shí),開始對發(fā)送消息列表進(jìn)行處理,是否需要發(fā)送消息
for sock in wl:
sock.send(wdata[sock])
wlist.remove(sock)
wdata.pop(sock)
# #將一次select監(jiān)聽列表中有接收數(shù)據(jù)的conn對象所接收到的消息打印一下
# for k,v in rdata.items():
# print(k,'發(fā)來的消息是:',v)
# #清空接收到的消息
# rdata.clear()
客戶端:
#客戶端
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8093))
while True:
msg=input('>>: ').strip()
if not msg:continue
client.send(msg.encode('utf-8'))
data=client.recv(1024)
print(data.decode('utf-8'))
client.close()
select監(jiān)聽fd變化的過程分析:
#用戶進(jìn)程創(chuàng)建socket對象,拷貝監(jiān)聽的fd到內(nèi)核空間,每一個(gè)fd會對應(yīng)一張系統(tǒng)文件表,內(nèi)核空間的fd響應(yīng)到數(shù)據(jù)后,就會發(fā)送信號給用戶進(jìn)程數(shù)據(jù)已到;
#用戶進(jìn)程再發(fā)送系統(tǒng)調(diào)用,比如(accept)將內(nèi)核空間的數(shù)據(jù)copy到用戶空間,同時(shí)作為接受數(shù)據(jù)端內(nèi)核空間的數(shù)據(jù)清除,這樣重新監(jiān)聽時(shí)fd再有新的數(shù)據(jù)又可以響應(yīng)到了(發(fā)送端因?yàn)榛赥CP協(xié)議所以需要收到應(yīng)答后才會清除)。
該模型的優(yōu)點(diǎn):
#相比其他模型,使用select() 的事件驅(qū)動模型只用單線程(進(jìn)程)執(zhí)行,占用資源少,不消耗太多 CPU,同時(shí)能夠?yàn)槎嗫蛻舳颂峁┓?wù)。如果試圖建立一個(gè)簡單的事件驅(qū)動的服務(wù)器程序,這個(gè)模型有一定的參考價(jià)值。
該模型的缺點(diǎn):
#首先select()接口并不是實(shí)現(xiàn)“事件驅(qū)動”的最好選擇。因?yàn)楫?dāng)需要探測的句柄值較大時(shí),select()接口本身需要消耗大量時(shí)間去輪詢各個(gè)句柄。很多操作系統(tǒng)提供了更為高效的接口,如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll,…。如果需要實(shí)現(xiàn)更高效的服務(wù)器程序,類似epoll這樣的接口更被推薦。遺憾的是不同的操作系統(tǒng)特供的epoll接口有很大差異,所以使用類似于epoll的接口實(shí)現(xiàn)具有較好跨平臺能力的服務(wù)器會比較困難。
#其次,該模型將事件探測和事件響應(yīng)夾雜在一起,一旦事件響應(yīng)的執(zhí)行體龐大,則對整個(gè)模型是災(zāi)難性的。
select做得事情和第二階段的阻塞沒有關(guān)系,就是從內(nèi)核態(tài)將數(shù)據(jù)拷貝到用戶態(tài)的阻塞,始終幫你做得監(jiān)聽的工作,幫你節(jié)省了一些第一階段阻塞的時(shí)間。
IO多路復(fù)用的機(jī)制:
select機(jī)制: Windows、Linux
poll機(jī)制 : Linux #和lselect監(jiān)聽機(jī)制一樣,但是對監(jiān)聽列表里面的數(shù)量沒有限制,select默認(rèn)限制是1024個(gè),但是他們兩個(gè)都是操作系統(tǒng)輪詢每一個(gè)被監(jiān)聽的文件描述符(如果數(shù)量很大,其實(shí)效率不太好),看是否有可讀操作。
epoll機(jī)制 : Linux #它的監(jiān)聽機(jī)制和上面兩個(gè)不同,他給每一個(gè)監(jiān)聽的對象綁定了一個(gè)回調(diào)函數(shù),你這個(gè)對象有消息,那么觸發(fā)回調(diào)函數(shù)給用戶,用戶就進(jìn)行系統(tǒng)調(diào)用來拷貝數(shù)據(jù),并不是輪詢監(jiān)聽所有的被監(jiān)聽對象,這樣的效率高很多。
五、異步IO(Asynchronous IO)
Linux下的asynchronous IO其實(shí)用得不多,從內(nèi)核2.6版本才開始引入。先看一下它的流程:
用戶進(jìn)程發(fā)起read操作之后,立刻就可以開始去做其它的事。而另一方面,從kernel的角度,當(dāng)它受到一個(gè)asynchronous read之后,首先它會立刻返回,所以不會對用戶進(jìn)程產(chǎn)生任何block。然后,kernel操作系統(tǒng)會等待數(shù)據(jù)(阻塞)準(zhǔn)備完成,然后將數(shù)據(jù)拷貝到用戶內(nèi)存,當(dāng)這一切都完成之后,kernel會給用戶進(jìn)程發(fā)送一個(gè)signal,告訴它read操作完成了。
貌似異步IO這個(gè)模型很牛~~但是你發(fā)現(xiàn)沒有,這不是我們自己代碼控制的,都是操作系統(tǒng)完成的,而python在copy數(shù)據(jù)這個(gè)階段沒有提供操縱操作系統(tǒng)的接口,所以用python沒法實(shí)現(xiàn)這套異步IO機(jī)制,其他幾個(gè)IO模型都沒有解決第二階段的阻塞(用戶態(tài)和內(nèi)核態(tài)之間copy數(shù)據(jù)),但是C語言是可以實(shí)現(xiàn)的,因?yàn)榇蠹叶贾繡語言是最接近底層的,雖然我們用python實(shí)現(xiàn)不了,但是python仍然有異步的模塊和框架(tornado、twstied,高并發(fā)需求的時(shí)候用),這些模塊和框架很多都是用底層的C語言實(shí)現(xiàn)的,它幫我們實(shí)現(xiàn)了異步,你只要使用就可以了,但是你要知道這個(gè)異步是不是很好呀,不需要你自己等待了,操作系統(tǒng)幫你做了所有的事情,你就直接收數(shù)據(jù)就行了,就像你有一張銀行卡,銀行定期給你打錢一樣。
六、IO模型比較分析
? 到目前為止,已經(jīng)將四個(gè)IO Model都介紹完了。現(xiàn)在回過頭來回答最初的那幾個(gè)問題:blocking和non-blocking的區(qū)別在哪,synchronous IO和asynchronous IO的區(qū)別在哪。
先回答最簡單的這個(gè):blocking vs non-blocking。前面的介紹中其實(shí)已經(jīng)很明確的說明了這兩者的區(qū)別。調(diào)用blocking IO會一直block住對應(yīng)的進(jìn)程直到操作完成,而non-blocking IO在kernel還準(zhǔn)備數(shù)據(jù)的情況下會立刻返回。
? 再說明synchronous IO和asynchronous IO的區(qū)別之前,需要先給出兩者的定義。Stevens給出的定義(其實(shí)是POSIX的定義)是這樣子的:
A synchronous I/O operation causes the requesting process to be blocked until that I/O operationcompletes;
An asynchronous I/O operation does not cause the requesting process to be blocked;
兩者的區(qū)別就在于synchronous IO做”IO operation”的時(shí)候會將process阻塞。按照這個(gè)定義,四個(gè)IO模型可以分為兩大類,之前所述的blocking IO,non-blocking IO,IO multiplexing都屬于synchronous IO這一類,而 asynchronous I/O后一類 。
有人可能會說,non-blocking IO并沒有被block啊。這里有個(gè)非常“狡猾”的地方,定義中所指的”IO operation”是指真實(shí)的IO操作,就是例子中的recvfrom這個(gè)system call。non-blocking IO在執(zhí)行recvfrom這個(gè)system call的時(shí)候,如果kernel的數(shù)據(jù)沒有準(zhǔn)備好,這時(shí)候不會block進(jìn)程。但是,當(dāng)kernel中數(shù)據(jù)準(zhǔn)備好的時(shí)候,recvfrom會將數(shù)據(jù)從kernel拷貝到用戶內(nèi)存中,這個(gè)時(shí)候進(jìn)程是被block了,在這段時(shí)間內(nèi),進(jìn)程是被block的。而asynchronous IO則不一樣,當(dāng)進(jìn)程發(fā)起IO 操作之后,就直接返回再也不理睬了,直到kernel發(fā)送一個(gè)信號,告訴進(jìn)程說IO完成。在這整個(gè)過程中,進(jìn)程完全沒有被block。
各個(gè)IO Model的比較如圖所示:
? 經(jīng)過上面的介紹,會發(fā)現(xiàn)non-blocking IO和asynchronous IO的區(qū)別還是很明顯的。在non-blocking IO中,雖然進(jìn)程大部分時(shí)間都不會被block,但是它仍然要求進(jìn)程去主動的check,并且當(dāng)數(shù)據(jù)準(zhǔn)備完成以后,也需要進(jìn)程主動的再次調(diào)用recvfrom來將數(shù)據(jù)拷貝到用戶內(nèi)存。而asynchronous IO則完全不同。它就像是用戶進(jìn)程將整個(gè)IO操作交給了他人(kernel)完成,然后他人做完后發(fā)信號通知。在此期間,用戶進(jìn)程不需要去檢查IO操作的狀態(tài),也不需要主動的去拷貝數(shù)據(jù)。
七、selectors模塊
IO復(fù)用:為了解釋這個(gè)名詞,首先來理解下復(fù)用這個(gè)概念,復(fù)用也就是共用的意思,這樣理解還是有些抽象,為此,咱們來理解下復(fù)用在通信領(lǐng)域的使用,在通信領(lǐng)域中為了充分利用網(wǎng)絡(luò)連接的物理介質(zhì),往往在同一條網(wǎng)絡(luò)鏈路上采用時(shí)分復(fù)用或頻分復(fù)用的技術(shù)使其在同一鏈路上傳輸多路信號,到這里我們就基本上理解了復(fù)用的含義,即公用某個(gè)“介質(zhì)”來盡可能多的做同一類(性質(zhì))的事,那IO復(fù)用的“介質(zhì)”是什么呢?為此我們首先來看看服務(wù)器編程的模型,客戶端發(fā)來的請求服務(wù)端會產(chǎn)生一個(gè)進(jìn)程來對其進(jìn)行服務(wù),每當(dāng)來一個(gè)客戶請求就產(chǎn)生一個(gè)進(jìn)程來服務(wù),然而進(jìn)程不可能無限制的產(chǎn)生,因此為了解決大量客戶端訪問的問題,引入了IO復(fù)用技術(shù),即:一個(gè)進(jìn)程可以同時(shí)對多個(gè)客戶請求進(jìn)行服務(wù)。也就是說IO復(fù)用的“介質(zhì)”是進(jìn)程(準(zhǔn)確的說復(fù)用的是select和poll,因?yàn)檫M(jìn)程也是靠調(diào)用select和poll來實(shí)現(xiàn)的),復(fù)用一個(gè)進(jìn)程(select和poll)來對多個(gè)IO進(jìn)行服務(wù),雖然客戶端發(fā)來的IO是并發(fā)的但是IO所需的讀寫數(shù)據(jù)多數(shù)情況下是沒有準(zhǔn)備好的,因此就可以利用一個(gè)函數(shù)(select和poll)來監(jiān)聽IO所需的這些數(shù)據(jù)的狀態(tài),一旦IO有數(shù)據(jù)可以進(jìn)行讀寫了,進(jìn)程就來對這樣的IO進(jìn)行服務(wù)。
理解完IO復(fù)用后,我們在來看下實(shí)現(xiàn)IO復(fù)用中的三個(gè)API(select、poll和epoll)的區(qū)別和聯(lián)系:
select,poll,epoll都是IO多路復(fù)用的機(jī)制,I/O多路復(fù)用就是通過一種機(jī)制,可以監(jiān)視多個(gè)描述符,一旦某個(gè)描述符就緒(一般是讀就緒或者寫就緒),能夠通知應(yīng)用程序進(jìn)行相應(yīng)的讀寫操作。但select,poll,epoll本質(zhì)上都是同步I/O,因?yàn)樗麄兌夹枰谧x寫事件就緒后自己負(fù)責(zé)進(jìn)行讀寫,也就是說這個(gè)讀寫過程是阻塞的,而異步I/O則無需自己負(fù)責(zé)進(jìn)行讀寫,異步I/O的實(shí)現(xiàn)會負(fù)責(zé)把數(shù)據(jù)從內(nèi)核拷貝到用戶空間。
1.select
select的原型:
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
select的第一個(gè)參數(shù)nfds為fdset集合中最大描述符值加1,fdset是一個(gè)位數(shù)組,其大小限制為__FD_SETSIZE(1024),位數(shù)組的每一位代表其對應(yīng)的描述符是否需要被檢查。第二三四參數(shù)表示需要關(guān)注讀、寫、錯(cuò)誤事件的文件描述符位數(shù)組,這些參數(shù)既是輸入?yún)?shù)也是輸出參數(shù),可能會被內(nèi)核修改用于標(biāo)示哪些描述符上發(fā)生了關(guān)注的事件,所以每次調(diào)用select前都需要重新初始化fdset。timeout參數(shù)為超時(shí)時(shí)間,該結(jié)構(gòu)會被內(nèi)核修改,其值為超時(shí)剩余的時(shí)間。
select的調(diào)用步驟:
(1)使用copy_from_user從用戶空間拷貝fdset到內(nèi)核空間
(2)注冊回調(diào)函數(shù)__pollwait
(3)遍歷所有fd,調(diào)用其對應(yīng)的poll方法(對于socket,這個(gè)poll方法是sock_poll,sock_poll根據(jù)情況會調(diào)用到tcp_poll,udp_poll或者datagram_poll)
(4)以tcp_poll為例,其核心實(shí)現(xiàn)就是__pollwait,也就是上面注冊的回調(diào)函數(shù)。
(5)__pollwait的主要工作就是把current(當(dāng)前進(jìn)程)掛到設(shè)備的等待隊(duì)列中,不同的設(shè)備有不同的等待隊(duì)列,對于tcp_poll 來說,其等待隊(duì)列是sk->sk_sleep(注意把進(jìn)程掛到等待隊(duì)列中并不代表進(jìn)程已經(jīng)睡眠了)。在設(shè)備收到一條消息(網(wǎng)絡(luò)設(shè)備)或填寫完文件數(shù) 據(jù)(磁盤設(shè)備)后,會喚醒設(shè)備等待隊(duì)列上睡眠的進(jìn)程,這時(shí)current便被喚醒了。
(6)poll方法返回時(shí)會返回一個(gè)描述讀寫操作是否就緒的mask掩碼,根據(jù)這個(gè)mask掩碼給fd_set賦值。
(7)如果遍歷完所有的fd,還沒有返回一個(gè)可讀寫的mask掩碼,則會調(diào)用schedule_timeout是調(diào)用select的進(jìn)程(也就是 current)進(jìn)入睡眠。當(dāng)設(shè)備驅(qū)動發(fā)生自身資源可讀寫后,會喚醒其等待隊(duì)列上睡眠的進(jìn)程。如果超過一定的超時(shí)時(shí)間(schedule_timeout 指定),還是沒人喚醒,則調(diào)用select的進(jìn)程會重新被喚醒獲得CPU,進(jìn)而重新遍歷fd,判斷有沒有就緒的fd。
(8)把fd_set從內(nèi)核空間拷貝到用戶空間。
select的幾個(gè)缺點(diǎn):
(1)每次調(diào)用select,都需要把fd集合從用戶態(tài)拷貝到內(nèi)核態(tài),這個(gè)開銷在fd很多時(shí)會很大
(2)同時(shí)每次調(diào)用select都需要在內(nèi)核遍歷傳遞進(jìn)來的所有fd,這個(gè)開銷在fd很多時(shí)也很大
(3)select支持的文件描述符數(shù)量太小了,默認(rèn)是1024
2.poll
poll的原型:
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
? poll與select不同,通過一個(gè)pollfd數(shù)組向內(nèi)核傳遞需要關(guān)注的事件,故沒有描述符個(gè)數(shù)的限制,pollfd中的events字段和revents分別用于標(biāo)示關(guān)注的事件和發(fā)生的事件,故pollfd數(shù)組只需要被初始化一次。
? poll的實(shí)現(xiàn)機(jī)制與select類似,其對應(yīng)內(nèi)核中的sys_poll,只不過poll向內(nèi)核傳遞pollfd數(shù)組,然后對pollfd中的每個(gè)描述符進(jìn)行poll,相比處理fdset來說,poll效率更高。poll返回后,需要對pollfd中的每個(gè)元素檢查其revents值,來得指事件是否發(fā)生。
3.epoll
? 直到Linux2.6才出現(xiàn)了由內(nèi)核直接支持的實(shí)現(xiàn)方法,那就是epoll,被公認(rèn)為Linux2.6下性能最好的多路I/O就緒通知方法。epoll可以同時(shí)支持水平觸發(fā)和邊緣觸發(fā)(Edge Triggered,只告訴進(jìn)程哪些文件描述符剛剛變?yōu)榫途w狀態(tài),它只說一遍,如果我們沒有采取行動,那么它將不會再次告知,這種方式稱為邊緣觸發(fā)),理論上邊緣觸發(fā)的性能要更高一些,但是代碼實(shí)現(xiàn)相當(dāng)復(fù)雜。epoll同樣只告知那些就緒的文件描述符,而且當(dāng)我們調(diào)用epoll_wait()獲得就緒文件描述符時(shí),返回的不是實(shí)際的描述符,而是一個(gè)代表就緒描述符數(shù)量的值,你只需要去epoll指定的一個(gè)數(shù)組中依次取得相應(yīng)數(shù)量的文件描述符即可,這里也使用了內(nèi)存映射(mmap)技術(shù),這樣便徹底省掉了這些文件描述符在系統(tǒng)調(diào)用時(shí)復(fù)制的開銷。另一個(gè)本質(zhì)的改進(jìn)在于epoll采用基于事件的就緒通知方式。在select/poll中,進(jìn)程只有在調(diào)用一定的方法后,內(nèi)核才對所有監(jiān)視的文件描述符進(jìn)行掃描,而epoll事先通過epoll_ctl()來注冊一個(gè)文件描述符,一旦基于某個(gè)文件描述符就緒時(shí),內(nèi)核會采用類似callback的回調(diào)機(jī)制,迅速激活這個(gè)文件描述符,當(dāng)進(jìn)程調(diào)用epoll_wait()時(shí)便得到通知。
epoll的原型:
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
? epoll既然是對select和poll的改進(jìn),就應(yīng)該能避免上述的三個(gè)缺點(diǎn)。那epoll都是怎么解決的呢?在此之前,我們先看一下epoll 和select和poll的調(diào)用接口上的不同,select和poll都只提供了一個(gè)函數(shù)——select或者poll函數(shù)。而epoll提供了三個(gè)函 數(shù),epoll_create,epoll_ctl和epoll_wait,epoll_create是創(chuàng)建一個(gè)epoll句柄;epoll_ctl是注 冊要監(jiān)聽的事件類型;epoll_wait則是等待事件的產(chǎn)生。
對于第一個(gè)缺點(diǎn),epoll的解決方案在epoll_ctl函數(shù)中。每次注冊新的事件到epoll句柄中時(shí)(在epoll_ctl中指定 EPOLL_CTL_ADD),會把所有的fd拷貝進(jìn)內(nèi)核,而不是在epoll_wait的時(shí)候重復(fù)拷貝。epoll保證了每個(gè)fd在整個(gè)過程中只會拷貝 一次。
對于第二個(gè)缺點(diǎn),epoll的解決方案不像select或poll一樣每次都把current輪流加入fd對應(yīng)的設(shè)備等待隊(duì)列中,而只在 epoll_ctl時(shí)把current掛一遍(這一遍必不可少)并為每個(gè)fd指定一個(gè)回調(diào)函數(shù),當(dāng)設(shè)備就緒,喚醒等待隊(duì)列上的等待者時(shí),就會調(diào)用這個(gè)回調(diào) 函數(shù),而這個(gè)回調(diào)函數(shù)會把就緒的fd加入一個(gè)就緒鏈表)。epoll_wait的工作實(shí)際上就是在這個(gè)就緒鏈表中查看有沒有就緒的fd(利用 schedule_timeout()實(shí)現(xiàn)睡一會,判斷一會的效果,和select實(shí)現(xiàn)中的第7步是類似的)。
對于第三個(gè)缺點(diǎn),epoll沒有這個(gè)限制,它所支持的FD上限是最大可以打開文件的數(shù)目,這個(gè)數(shù)字一般遠(yuǎn)大于2048,舉個(gè)例子, 在1GB內(nèi)存的機(jī)器上大約是10萬左右,具體數(shù)目可以cat /proc/sys/fs/file-max察看,一般來說這個(gè)數(shù)目和系統(tǒng)內(nèi)存關(guān)系很大。
epoll實(shí)現(xiàn)代碼示例:(了解即可)
#!/usr/bin/env python
import select
import socket
response = b''
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.bind(('0.0.0.0', 8080))
serversocket.listen(1)
# 因?yàn)閟ocket默認(rèn)是阻塞的,所以需要使用非阻塞(異步)模式。
serversocket.setblocking(0)
# 創(chuàng)建一個(gè)epoll對象
epoll = select.epoll()
# 在服務(wù)端socket上面注冊對讀event的關(guān)注。一個(gè)讀event隨時(shí)會觸發(fā)服務(wù)端socket去接收一個(gè)socket連接
epoll.register(serversocket.fileno(), select.EPOLLIN)
try:
# 字典connections映射文件描述符(整數(shù))到其相應(yīng)的網(wǎng)絡(luò)連接對象
connections = {}
requests = {}
responses = {}
while True:
# 查詢epoll對象,看是否有任何關(guān)注的event被觸發(fā)。參數(shù)“1”表示,我們會等待1秒來看是否有event發(fā)生。
# 如果有任何我們感興趣的event發(fā)生在這次查詢之前,這個(gè)查詢就會帶著這些event的列表立即返回
events = epoll.poll(1)
# event作為一個(gè)序列(fileno,event code)的元組返回。fileno是文件描述符的代名詞,始終是一個(gè)整數(shù)。
for fileno, event in events:
# 如果是服務(wù)端產(chǎn)生event,表示有一個(gè)新的連接進(jìn)來
if fileno == serversocket.fileno():
connection, address = serversocket.accept()
print('client connected:', address)
# 設(shè)置新的socket為非阻塞模式
connection.setblocking(0)
# 為新的socket注冊對讀(EPOLLIN)event的關(guān)注
epoll.register(connection.fileno(), select.EPOLLIN)
connections[connection.fileno()] = connection
# 初始化接收的數(shù)據(jù)
requests[connection.fileno()] = b''
# 如果發(fā)生一個(gè)讀event,就讀取從客戶端發(fā)送過來的新數(shù)據(jù)
elif event & select.EPOLLIN:
print("------recvdata---------")
# 接收客戶端發(fā)送過來的數(shù)據(jù)
requests[fileno] += connections[fileno].recv(1024)
# 如果客戶端退出,關(guān)閉客戶端連接,取消所有的讀和寫監(jiān)聽
if not requests[fileno]:
connections[fileno].close()
# 刪除connections字典中的監(jiān)聽對象
del connections[fileno]
# 刪除接收數(shù)據(jù)字典對應(yīng)的句柄對象
del requests[connections[fileno]]
print(connections, requests)
epoll.modify(fileno, 0)
else:
# 一旦完成請求已收到,就注銷對讀event的關(guān)注,注冊對寫(EPOLLOUT)event的關(guān)注。寫event發(fā)生的時(shí)候,會回復(fù)數(shù)據(jù)給客戶端
epoll.modify(fileno, select.EPOLLOUT)
# 打印完整的請求,證明雖然與客戶端的通信是交錯(cuò)進(jìn)行的,但數(shù)據(jù)可以作為一個(gè)整體來組裝和處理
print('-' * 40 + '\n' + requests[fileno].decode())
# 如果一個(gè)寫event在一個(gè)客戶端socket上面發(fā)生,它會接受新的數(shù)據(jù)以便發(fā)送到客戶端
elif event & select.EPOLLOUT:
print("-------send data---------")
# 每次發(fā)送一部分響應(yīng)數(shù)據(jù),直到完整的響應(yīng)數(shù)據(jù)都已經(jīng)發(fā)送給操作系統(tǒng)等待傳輸給客戶端
byteswritten = connections[fileno].send(requests[fileno])
requests[fileno] = requests[fileno][byteswritten:]
if len(requests[fileno]) == 0:
# 一旦完整的響應(yīng)數(shù)據(jù)發(fā)送完成,就不再關(guān)注寫event
epoll.modify(fileno, select.EPOLLIN)
# HUP(掛起)event表明客戶端socket已經(jīng)斷開(即關(guān)閉),所以服務(wù)端也需要關(guān)閉。
# 沒有必要注冊對HUP event的關(guān)注。在socket上面,它們總是會被epoll對象注冊
elif event & select.EPOLLHUP:
print("end hup------")
# 注銷對此socket連接的關(guān)注
epoll.unregister(fileno)
# 關(guān)閉socket連接
connections[fileno].close()
del connections[fileno]
finally:
# 打開的socket連接不需要關(guān)閉,因?yàn)镻ython會在程序結(jié)束的時(shí)候關(guān)閉。這里顯式關(guān)閉是一個(gè)好的代碼習(xí)慣
epoll.unregister(serversocket.fileno())
epoll.close()
serversocket.close()
---------------------
本文來自 richard1ybb 的CSDN 博客 ,全文地址請點(diǎn)擊:https://blog.csdn.net/richard1ybb/article/details/74573200?utm_source=copy
總結(jié)
(1)select,poll實(shí)現(xiàn)需要自己不斷輪詢所有fd集合,直到設(shè)備就緒,期間可能要睡眠和喚醒多次交替。而epoll其實(shí)也需要調(diào)用 epoll_wait不斷輪詢就緒鏈表,期間也可能多次睡眠和喚醒交替,但是它是設(shè)備就緒時(shí),調(diào)用回調(diào)函數(shù),把就緒fd放入就緒鏈表中,并喚醒在 epoll_wait中進(jìn)入睡眠的進(jìn)程。雖然都要睡眠和交替,但是select和poll在“醒著”的時(shí)候要遍歷整個(gè)fd集合,而epoll在“醒著”的 時(shí)候只要判斷一下就緒鏈表是否為空就行了,這節(jié)省了大量的CPU時(shí)間,這就是回調(diào)機(jī)制帶來的性能提升。
(2)select,poll每次調(diào)用都要把fd集合從用戶態(tài)往內(nèi)核態(tài)拷貝一次,并且要把current往設(shè)備等待隊(duì)列中掛一次,而epoll只要 一次拷貝,而且把current往等待隊(duì)列上掛也只掛一次(在epoll_wait的開始,注意這里的等待隊(duì)列并不是設(shè)備等待隊(duì)列,只是一個(gè)epoll內(nèi) 部定義的等待隊(duì)列),這也能節(jié)省不少的開銷。
4.selector
? 這三種IO多路復(fù)用模型在不同的平臺有著不同的支持,而epoll在windows下就不支持,好在我們有selectors模塊,幫我們默認(rèn)選擇當(dāng)前平臺下最合適的,我們只需要寫監(jiān)聽誰,然后怎么發(fā)送消息接收消息,但是具體怎么監(jiān)聽的,選擇的是select還是poll還是epoll,這是selector幫我們自動選擇的
selector代碼示例:
服務(wù)端
#服務(wù)端
from socket import *
import selectors
sel=selectors.DefaultSelector()
def accept(server_fileobj,mask):
conn,addr=server_fileobj.accept()
sel.register(conn,selectors.EVENT_READ,read)
def read(conn,mask):
try:
data=conn.recv(1024)
if not data:
print('closing',conn)
sel.unregister(conn)
conn.close()
return
conn.send(data.upper()+b'_SB')
except Exception:
print('closing', conn)
sel.unregister(conn)
conn.close()
server_fileobj=socket(AF_INET,SOCK_STREAM)
server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server_fileobj.bind(('127.0.0.1',8088))
server_fileobj.listen(5)
server_fileobj.setblocking(False) #設(shè)置socket的接口為非阻塞
sel.register(server_fileobj,selectors.EVENT_READ,accept) #相當(dāng)于網(wǎng)select的讀列表里append了一個(gè)文件句柄server_fileobj,并且綁定了一個(gè)回調(diào)函數(shù)accept
while True:
events=sel.select() #檢測所有的fileobj,是否有完成wait data的
for sel_obj,mask in events:
callback=sel_obj.data #callback=accpet
callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1)
客戶端
#客戶端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8088))
while True:
msg=input('>>: ')
if not msg:continue
c.send(msg.encode('utf-8'))
data=c.recv(1024)
print(data.decode('utf-8'))
小練習(xí):基于selectors模塊實(shí)現(xiàn)并發(fā)的FTP
總結(jié)
以上是生活随笔為你收集整理的python io多路复用_Python之IO多路复用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: excel流程图分叉 合并_Excel和
- 下一篇: final类是否可以被代理_设计模式——