Linux网络编程 | 多路复用I/O :select、poll、epoll、水平触发与边缘触发、惊群问题
文章目錄
- 多路復用IO
- 多路復用IO的概念
- 多路復用IO與多線程/多進程的并發
- 多路復用IO模型進行服務器并發處理
- 多線程/多進程進行服務器并發處理
- select
- 工作原理
- 接口
- 優缺點
- select的封裝
- select模型實現TCP服務器
- poll
- 工作原理
- 接口
- 優缺點
- poll模型實現TCP服務器
- epoll
- 工作原理
- 接口
- 優缺點
- epoll的封裝
- epoll的工作模式
- LT模式(水平觸發)
- ET模式(邊緣觸發)
- LT水平觸發與ET邊緣觸發
- epoll LT模式實現TCP服務器
- epoll ET模式實現TCP服務器
- 驚群問題
- 多線程環境下驚群問題的解決方法
- 多進程環境下驚群問題的解決方法
多路復用IO
多路復用IO的概念
多路復用IO用于對大量描述符進行IO就緒事件監控,能夠讓用戶只針對就緒了指定事件的描述符進行操作。
IO的就緒事件分為可讀、可寫、異常
- 可讀事件:一個描述符對應的緩沖區中有數據可讀
- 可寫事件:一個描述符對應的緩沖區中有剩余空間可以寫入數據
- 異常事件:一個描述符發生了特定的異常信息
相比較于其他IO方式,多路復用IO 避免了對沒有就緒的描述符進行操作而帶來的阻塞,同時只針對已就緒的描述符進行操作,提高了效率
在Linux下,操作系統提供了三種模型:select模型、poll模型、epoll模型。
多路復用IO與多線程/多進程的并發
多路復用IO模型進行服務器并發處理
即在單執行流中進行輪詢處理就緒的描述符。如果就緒的描述符較多時,很難做到負載均衡(最后一個描述符要等待很長時間,前邊的描述符處理完了才能處理它)。
解決這一問題的方法就是在用戶態實現負載均衡,規定每個描述符只能讀取指定數量的數據,讀取了就進行下一個描述符。
多路復用IO模型適用于有大量描述符需要監控,但是同一時間只有少量活躍的場景
多線程/多進程進行服務器并發處理
即操作系統通過輪詢調度執行流實現每個執行流中描述符的處理
由于其在內核態實現了負載均衡,所以不需要用戶態做過多操作
多路復用適合于IO密集型服務,多進程或線程適合于CPU密集型服務,它們各有各的優勢,并不存在誰取代誰的傾向。基于兩者的特點,通??梢詫⒍嗦窂陀肐O和多線程/多進程搭配一起使用。
使用多路復用IO監控大量的描述符,哪個描述符有事件到來,就創建執行流去處理。這樣做的好處是防止直接創建執行流而描述符還未就緒,浪費資源。
select
工作原理
定義指定監控事件的描述符集合(即位圖),初始化集合后,將需要監控指定事件的描述符添加到指定事件(可讀、可寫、異常)的描述符集合中
將描述符集合拷貝到內核當中,對集合中所有描述符進行輪詢判斷,當描述符就緒或者等待超時后就調用返回,返回后的集合中只剩下已就緒的描述符(未就緒會在位圖中置為0)
通過遍歷描述符,判斷哪些描述符還在集合中,就可以知道哪些描述符已經就緒了,開始處理對應的IO時間。
接口
//清空集合 void FD_ZERO(fd_set *set);//向集合中添加描述符fd void FD_SET(int fd, fd_set *set);//從集合中刪除描述符fd void FD_CLR(int fd, fd_set *set);//判斷描述符是否還在集合中 int FD_ISSET(int fd, fd_set *set);//發起調用將集合拷貝到內核中并進行監控 int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);/*fd:文件描述符set:描述符位圖nfds:集合中最大描述符數值+1readfds:可讀事件集合writefds:可寫事件集合exceptfds:異常事件集合timeout:超時等待時間timeval結構體有兩個成員struct timeval {long tv_sec; 毫秒long tv_usec; 微秒}; */優缺點
缺點:
優點:
select的封裝
為了能讓select使用更加便利,對其進行一層封裝。
#ifndef __SELECT_H_ #define __SELECT_H_#include<iostream> #include<vector> #include<sys/socket.h> #include"TcpSocket.hpp"class Select {public:Select() : _maxfd(-1){//將集合初始化清空FD_ZERO(&_rfds);} //向集合中添加描述符bool Add(const TcpSocket& socket){int fd = socket.GetFd();FD_SET(fd, &_rfds);//如果新增描述符比最大描述符大,則更新if(fd > _maxfd){_maxfd = fd;}return true;}//從集合中刪除描述符bool Del(const TcpSocket& socket) {int fd = socket.GetFd();FD_CLR(fd, &_rfds);//如果被刪除的描述符是最大的,則從后往前再找一個if(fd == _maxfd){for(int i = _maxfd; i >= 0; i--){//如果這個描述符在集合中,則更新最大值if(FD_ISSET(i, &_rfds)){_maxfd = i;break;}}}return true;}//從集合中找到所有就緒的描述符bool Wait(std::vector<TcpSocket>& vec, int outlime = 3) {struct timeval tv;//以毫秒為單位tv.tv_sec = outlime;//計算剩余的微秒tv.tv_usec = 0;//因為select會去掉集合中沒就緒的描述符,所以不能直接操作集合,只能操作集合的拷貝fd_set set = _rfds;int ret = select(_maxfd + 1, &set, NULL, NULL, &tv);if(ret < 0){std::cerr << "select error" << std::endl;return false;}else if(ret == 0){std::cerr << "wait timeout" << std::endl;return true;}for(int i = 0; i < _maxfd + 1; i++){//將就緒描述符放入數組中if(FD_ISSET(i, &set)){TcpSocket socket;socket.SetFd(i);vec.push_back(socket);}}return true;} private://需要監控的描述符,因為select會修改集合,所以每次進行操作的都是它的拷貝fd_set _rfds;//最大的描述符,因為fd_set是位圖,所以保存最大的描述符可以減少遍歷的次數。int _maxfd; };#endifselect模型實現TCP服務器
#include<iostream> #include<string> #include<unistd.h> #include<sys/socket.h> #include<arpa/inet.h> #include<netinet/in.h> #include"TcpSocket.hpp" #include"select.hpp"using namespace std;int main(int argc, char* argv[]) {if(argc != 3){ cerr << "正確輸入方式: ./select_srv.cc ip port\n" << endl;return -1; } string srv_ip = argv[1];uint16_t srv_port = stoi(argv[2]);TcpSocket lst_socket;//創建監聽套接字CheckSafe(lst_socket.Socket());//綁定地址信息CheckSafe(lst_socket.Bind(srv_ip, srv_port));//開始監聽CheckSafe(lst_socket.Listen());Select s;s.Add(lst_socket);while(1){vector<TcpSocket> vec; //去掉未就緒描述符bool ret = s.Wait(vec);if(ret == false){continue;}//取出就緒描述符進行處理for(auto socket : vec){//如果就緒的是監聽套接字,則代表有新連接if(socket.GetFd() == lst_socket.GetFd()){TcpSocket new_socket;ret = lst_socket.Accept(&new_socket);if(ret == false){continue;}//新建套接字加入集合中s.Add(new_socket);}//新數據到來else{string data;//接收數據ret = socket.Recv(data);//斷開連接,移除監控if(ret == false){s.Del(socket);socket.Close();continue;}cout << "cli send message: " << data << endl;data.clear();if(ret == false){s.Del(socket);socket.Close();continue;}}}}//關閉監聽套接字lst_socket.Close();return 0; }poll
工作原理
接口
struct pollfd {int fd; //需要監控的文件描述符short events; //需要監控的事件short revents; //實際就緒的事件 }; /*操作相對簡單,如果某個描述符不需要繼續監控時,直接將對應結構體中的fd置為-1即可。 *///發起監控 int poll(struct pollfd *fds, nfds_t nfds, int timeout); /*fds:pollfd數組nfds:數組的大小timeout:超時等待時間,單位為毫秒 */優缺點
缺點:
優點:
poll模型實現TCP服務器
#include<poll.h> #include<vector> #include <sys/socket.h> #include"TcpSocket.hpp" #define MAX_SIZE 10using namespace std;int main(int argc, char* argv[]) {if(argc != 3){ cerr << "正確輸入方式: ./select_srv.cc ip port\n" << endl;return -1; } string srv_ip = argv[1];uint16_t srv_port = stoi(argv[2]);TcpSocket lst_socket;//創建監聽套接字CheckSafe(lst_socket.Socket());//綁定地址信息CheckSafe(lst_socket.Bind(srv_ip, srv_port));//開始監聽CheckSafe(lst_socket.Listen());struct pollfd poll_fd[MAX_SIZE];poll_fd[0].fd = lst_socket.GetFd();poll_fd[0].events = POLLIN;int i = 0, maxi = 0;for(i = 1; i < MAX_SIZE; i++){poll_fd[i].fd = -1;}while(1){int ret = poll(poll_fd, maxi + 1, 2000);if(ret < 0){cerr << "not ready" << endl;continue;}else if(ret == 0){cerr << "wait timeout" << endl;continue;}//監聽套接字就緒則增加新連接if(poll_fd[0].revents & (POLLIN | POLLERR)){struct sockaddr_in addr;socklen_t len = sizeof(sockaddr_in);//創建一個新的套接字與客戶端建立連接int new_fd = accept(lst_socket.GetFd(), (sockaddr*)&addr, &len);for(i = 1; i < MAX_SIZE; i++){if(poll_fd[i].fd == -1){poll_fd[i].fd = new_fd;poll_fd[i].events = POLLIN;break;}}if(i > maxi){maxi = i;}if(--ret <= 0){continue;}}for(i = 1; i <= maxi; i++){ if(poll_fd[i].fd == -1){continue;}if(poll_fd[i].revents & (POLLIN | POLLERR)){//新數據到來char buff[4096] = { 0 };int ret = recv(poll_fd[i].fd, buff, 4096, 0); if(ret == 0){ std::cerr << "connect error" << std::endl;close(poll_fd[i].fd);poll_fd[i].fd = -1;} else if(ret < 0){ std::cerr << "recv error" << std::endl;close(poll_fd[i].fd);poll_fd[i].fd = -1;} else{cout << "cli send message: " << buff << endl;}if(--ret <= 0){break;}}}}lst_socket.Close();return 0; }epoll
工作原理
struct eventpoll{ .... /*紅黑樹的根節點,這顆樹中存儲著所有添加到epoll中的需要監控的事件*/ struct rb_root rbr; /*雙鏈表中則存放著將要通過epoll_wait返回給用戶的滿足條件的事件*/ struct list_head rdlist; .... };接口
//在內核中創建eventpoll結構體,返回操作句柄(size為監控的最大數量,但是在linux2.6.8后忽略上限,只需要給一個大于0的數字即可) int epoll_create(int size);//組織描述符事件結構體 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); /*epfd:eventpoll結構體的操作句柄op:操作的選項,EPOLL_CTL_ADD/EPOLL_CTL_MOD/EPOLL_CTL_DELfd:描述符event:監控描述符對應的事件信息結構體struct epoll_event {uint32_t events; // 要監控的事件,以及調用返回后實際就緒的事件 epoll_data_t data; // 聯合體,用來存放各種類型的描述符 };typedef union epoll_data {void *ptr;int fd;uint32_t u32;uint64_t u64;} epoll_data_t; *///開始監控,當有描述符就緒或者等待超時后調用返回 int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout); /*maxevents:events數組的結點數量timeout:超時等待時間返回值為就緒的描述符個數 */優缺點
epoll是Linux下性能最高的多路復用IO模型,幾乎具備了一切所需的優點
缺點:
優點:
4. 底層用的是紅黑樹存儲,監控的描述符數量沒有上限
5. 所有的描述符事件信息只需要向內核中拷貝一次
6. 監控采用異步阻塞,性能不會隨著描述符增多而下降
7. 直接返回就緒描述符事件信息,可以直接對就緒描述符進行操作,不需要像select和poll一樣遍歷判斷。
epoll的封裝
#ifndef __EPOLL_H_ #define __EPOLL_H_ #include<iostream> #include<vector> #include<sys/epoll.h> #include<unistd.h> #include"TcpSocket.hpp"const int EPOLL_SIZE = 1000;class Epoll {public:Epoll(){//現版本已經忽略size,隨便給一個大于0的數字即可_epfd = epoll_create(1);if(_epfd < 0){std::cerr << "epoll create error" << std::endl;exit(0);}}~Epoll(){close(_epfd);}//增加新的監控事件bool Add(const TcpSocket& socket, bool epoll_et = false, uint32_t events = EPOLLIN) const{int fd = socket.GetFd();//組織監控事件結構體struct epoll_event ev;ev.data.fd = fd;//設置需要監控的描述符if(epoll_et == true){ev.events = events | EPOLLET;}else{ ev.events = events;}int ret = epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &ev);if(ret < 0){std::cerr << "epoll ctl add error " << std::endl;return false;}return true;}//刪除監控事件bool Del(const TcpSocket& socket) const {int fd = socket.GetFd();int ret = epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL);if(ret < 0){std::cerr << "epoll ctl del error" << std::endl;return false;}return true;}//開始監控bool Wait(std::vector<TcpSocket>& vec, int timeout = 3000) const {vec.clear();struct epoll_event evs[EPOLL_SIZE];//開始監控,返回值為就緒描述符數量int ret = epoll_wait(_epfd, evs, EPOLL_SIZE, timeout);//當前沒有描述符就緒if(ret < 0){std::cerr << "epoll not ready" << std::endl;return false;}//等待超時else if(ret == 0){std::cerr << "epoll wait timeout" << std::endl;return false;}for(int i = 0; i < ret; i++){//將所有就緒描述符放進數組中TcpSocket new_socket;new_socket.SetFd(evs[i].data.fd);vec.push_back(new_socket);}return true;}private://epoll的操作句柄int _epfd; };#endifepoll的工作模式
epoll有兩種工作模式,LT模式(水平觸發模式)和ET模式(邊緣觸發模式)。
LT模式(水平觸發)
LT模式也就是水平觸發模式,是epoll的默認觸發模式(select和poll只有這種模式)
觸發條件
可讀事件:接受緩沖區中的數據大小高于低水位標記,則會觸發事件
可寫事件:發送緩沖區中的剩余空間大小大于低水位標記,則會觸發事件
低水位標記:一個基準值,默認是1
所以簡單點說,水平觸發模式就是只要緩沖區中還有數據,就會一直觸發事件
- 當epoll檢測到socket上事件就緒的時候, 可以不立刻進行處理. 或者只處理一部分.
- 如上面的例子, 由于只讀了1K數據, 緩沖區中還剩1K數據, 在第二次調用 epoll_wait 時, epoll_wait 仍然會立刻返回并通知socket讀事件就緒.
- 直到緩沖區上所有的數據都被處理完, epoll_wait 才不會立刻返回.
- 支持阻塞讀寫和非阻塞讀寫
ET模式(邊緣觸發)
ET模式也就是邊緣觸發模式,如果我們在第1步將socket添加到epoll_event描述符的時候使用了EPOLLET標志, epoll就會進入ET工作模式
觸發條件
可讀事件:(不關心接受緩沖區是否有數據)每當有新數據到來時,才會觸發事件。
可寫事件:剩余空間從無到有的時候才會觸發事件(即從不可寫到可寫)
簡單點說,ET模式下只有在新數據到來的情況下才會觸發事件。這也就要求我們在新數據到來的時候最好能夠一次性將所有數據取出,否則不會觸發第二次事件,只有等到下次再有新數據到來才會觸發。而我們也不知道具體有多少數據,所以就需要循環處理,直到緩沖區為空,但是recv是一個阻塞讀取,如果沒有數據時就會阻塞等待,這時候就需要將描述符的屬性設置為非阻塞,才能解決這個問題
void SetNoBlock(int fd) {int flag = fcntl(fd, F_GETFL);flag |= O_NONBLOCK;fcntl(fd, F_SETFL, flag); }- 當epoll檢測到socket上事件就緒時, 必須立刻處理.
- 如上面的例子, 雖然只讀了1K的數據, 緩沖區還剩1K的數據, 在第二次調用 epoll_wait 的時候, epoll_wait 不會再返回了.
- 也就是說, ET模式下, 文件描述符上的事件就緒后, 只有一次處理機會.
- ET的性能比LT性能更高( epoll_wait 返回的次數少了很多). Nginx默認采用ET模式使用epoll.
- 只支持非阻塞的讀寫
LT水平觸發與ET邊緣觸發
所以簡單點說,LT就是只要緩沖區中還有數據,就會一直觸發事件,而ET模式下只有在新數據到來的情況下才會觸發事件。
LT模式的優點主要在于其簡單且穩定,不容易出現問題,傳統的select和poll都是使用這個模式。但是他也有缺點,就是因為事件觸發過多導致效率降低
ET最大的優點就是減少了epoll的觸發次數,但是這也帶來了巨大的代價,就是要求必須一次性將所有的數據處理完,雖然效率得到了提高,但是代碼的復雜程度大大的增加了。Nginx就是默認采用ET模式
還有一種場景適合ET模式使用,如果我們需要接受一條數據,但是這條數據因為某種問題導致其發送不完整,需要分批發送。所以此時的緩沖區中數據只有部分,如果此時將其取出,則會增加維護數據的開銷,正確的做法應該是等待后續數據到達后將其補全,再一次性取出。但是如果此時使用的是LT模式,就會因為緩沖區不為空而一直觸發事件,所以這種情況下使用ET會比較好。
epoll LT模式實現TCP服務器
#include<poll.h> #include<vector> #include <sys/socket.h> #include"TcpSocket.hpp" #include"epoll.hpp"using namespace std;int main(int argc, char* argv[]) {if(argc != 3){ cerr << "正確輸入方式: ./epoll_lt_srv ip port\n" << endl;return -1; } string srv_ip = argv[1];uint16_t srv_port = stoi(argv[2]);TcpSocket lst_socket;//創建監聽套接字CheckSafe(lst_socket.Socket());//綁定地址信息CheckSafe(lst_socket.Bind(srv_ip, srv_port));//開始監聽CheckSafe(lst_socket.Listen());Epoll epoll;epoll.Add(lst_socket);while(1){vector<TcpSocket> vec;int ret = epoll.Wait(vec);if(ret <= 0){continue;}for(auto& socket : vec){//如果就緒的是監聽套接字,則說明有新連接到來if(socket.GetFd() == lst_socket.GetFd()){TcpSocket new_socket;lst_socket.Accept(&new_socket);epoll.Add(new_socket);}//如果不是,則說明已連接的套接字有新數據到來else{ string data;//接收數據ret = socket.Recv(data);//斷開連接,移除監控if(ret == false){ epoll.Del(socket);socket.Close();continue;} cout << "cli send message: " << data << endl;data.clear();if(ret == false){ epoll.Del(socket);socket.Close();continue;} }}}lst_socket.Close();return 0; }epoll ET模式實現TCP服務器
因為ET模式只支持非阻塞的讀寫,所以需要新增非阻塞讀以及非阻塞寫的接口,同時要對加入epoll的套接字加上EPOLLET的選項
//非阻塞發送數據,因為ET模式對于讀寫的響應只處理一次,所以需要通過輪詢的將緩沖區一次性讀取完 bool SendNoBlock(const std::string& data) {ssize_t pos = 0;ssize_t left_size = data.size();while (1){ssize_t ret = send(_socket_fd, data.data() + pos, left_size, 0);if (ret < 0){//嘗試重新寫入if (errno == EAGAIN || errno == EWOULDBLOCK){continue;}return false;}pos += ret;left_size -= ret;//如果數據發送完畢if (left_size <= 0){break;}}return true; }//非阻塞接收數據 bool RecvNoBlock(std::string& data) {data.clear();char buff[4096] = { 0 };while (1){ssize_t ret = recv(_socket_fd, buff, 4096, 0);//沒有內容if (ret < 0){//嘗試重新寫入if (errno == EAGAIN || errno == EWOULDBLOCK){continue;}return false;}//對端關閉else if (ret == 0){return false;}buff[ret] = '\0';data += buff;//如果當前接受數據小于緩沖區長度,則說明數據全部接收完畢,反之則說明還需要多次輪詢接收if (ret < 4096){break;}}return true; } #include<poll.h> #include<vector> #include <sys/socket.h> #include"TcpSocket.hpp" #include"epoll.hpp"using namespace std;int main(int argc, char* argv[]) {if(argc != 3){ cerr << "正確輸入方式: ./epoll_et_srv ip port\n" << endl;return -1; } string srv_ip = argv[1];uint16_t srv_port = stoi(argv[2]);TcpSocket lst_socket;//創建監聽套接字CheckSafe(lst_socket.Socket());//綁定地址信息CheckSafe(lst_socket.Bind(srv_ip, srv_port));//開始監聽CheckSafe(lst_socket.Listen());lst_socket.SetNoBlock();Epoll epoll;epoll.Add(lst_socket);while(1){vector<TcpSocket> vec;int ret = epoll.Wait(vec);if(ret <= 0){continue;}for(auto& socket : vec){//如果就緒的是監聽套接字,則說明有新連接到來if(socket.GetFd() == lst_socket.GetFd()){TcpSocket new_socket;lst_socket.Accept(&new_socket);new_socket.SetNoBlock();epoll.Add(new_socket, true);}//如果不是,則說明已連接的套接字有新數據到來else{ string data;//接收數據bool ret = socket.RecvNoBlock(data);//斷開連接,移除監控if(!ret){ epoll.Del(socket);socket.Close();continue;} cout << "cli send message: " << data << endl;data.clear();if(ret == false){ epoll.Del(socket);socket.Close();continue;} }}}lst_socket.Close();return 0; }驚群問題
在一個執行流中,如果添加了特別多的描述符進行監控,則輪詢處理就會比較慢。
因此就會采取多執行流的解決方法,在多個執行流中創建epoll,每個epoll監控一部分描述符,使壓力分攤。但是可能因為無法確定哪些描述符即將就緒,所以就會讓每個執行流都監控所有描述符,誰先搶到事件則誰去處理。
所以當多個執行流同時在等待就緒事件時,如果某個描述符就緒,他就會喚醒全部執行流中的epoll進行爭搶,但是此時就只會有一個執行流搶到并執行,而此時其他的執行流都會因為爭搶失敗而報錯,錯誤碼EAGAIN。這就是驚群問題。
驚群問題帶來了什么壞處呢?
多線程環境下驚群問題的解決方法
這種方法其實也就是本篇博客開頭提到的一種做法。只使用一個線程進行事件的監控,每當有就緒事件到來時,就將這些事件轉交給其他線程去處理,這樣就避免了因為多執行流同時使用epoll監控而帶來的驚群問題。
多進程環境下驚群問題的解決方法
這里主要借鑒的是lighttpd和nginx的解決方法。
lighttpd的解決思路很簡單粗暴,就是直接無視這個問題,事件到來后依舊能夠喚醒多個進程來爭搶,并且只有一個能成功,其他進程爭搶失敗后的報錯EAGAIN會被捕獲,捕獲后不會處理這個錯誤,而是直接無視,就當做沒有發生。
nginx的解決思路是其實就是加鎖與負載均衡。使用一個全局的互斥鎖,每當有描述符就緒,就會讓每個進程都去競爭這把鎖(如果某個進程當前連接數達到了最大連接數的7/8,也就是其負載均衡點,此時這個進程就不會再去爭搶所資源,而是將負載均衡到其他進程上),如果成功競爭到了鎖,則將描述符加入進自己的wait集合中,而對于沒有競爭到鎖的進程,則將其從自己的wait集合中移除,這樣就保證了不會讓多個進程同一時間進行監控,而是讓每個進程都通過競爭鎖的方式輪流進行監控,這樣保證了同一時間只會有一個進程進行監控,所以驚群問題也得到了解決。
參考資料
高并發網絡編程之epoll詳解
epoll詳解
Linux驚群效應詳解
epoll的驚群效應
Apache與Nginx網絡模型
[框架]高并發中的驚群效應
Nginx如何解決“驚群”現象
總結
以上是生活随笔為你收集整理的Linux网络编程 | 多路复用I/O :select、poll、epoll、水平触发与边缘触发、惊群问题的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: C++ 类型转换 :C语言的类型转换、C
- 下一篇: C++ STL : SGI-STL空间配