How to use epoll: a complete example
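Network servers are traditionally implemented with a separate process or thread per connection. For high-performance applications that need to handle a very large number of clients simultaneously, this approach does not work well, because factors such as resource usage and context-switching time limit how many clients can be served at once. An alternative is to perform non-blocking I/O in a single thread, together with a readiness notification method that tells you when more data can be read from or written to a socket.
This article is an introduction to Linux's epoll(7) facility, which is the best readiness notification facility available on Linux. We will write sample code in C for a complete TCP server. I assume you have some C programming experience, know how to compile and run programs on Linux, and can read the manual pages of the various C functions that are used.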
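epoll was introduced in Linux 2.6 and is not available on other UNIX-like operating systems. It provides functionality similar to select(2) and poll(2):
- select(2) can monitor at most FD_SETSIZE descriptors at a time, and FD_SETSIZE is typically a small number fixed when libc is compiled.
- poll(2) has no fixed limit on the number of descriptors it can monitor at a time, but, other issues aside, after every call we have to linearly scan all the descriptors we passed in to check for readiness notifications, which is O(n) and slow.
epoll has no such fixed limits and does not perform any linear scans, so it can handle a large number of events more efficiently.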
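An epoll instance is created by epoll_create(2) or epoll_create1(2) (they take different arguments), which return a file descriptor referring to the epoll instance. epoll_ctl(2) is used to add descriptors to, or remove them from, the set watched by the epoll instance. epoll_wait(2) waits for events on the watched descriptors, blocking until events are available. See the respective manual pages for more information.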
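The three calls can be sketched in a few lines. This is only a minimal illustration, not part of the article's server: the helper name wait_readable() is hypothetical, and it creates a throwaway epoll instance per call purely to keep the example short.

```c
#include <assert.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Hypothetical helper (not from the article): returns 1 if `fd` becomes
   readable within `timeout_ms` milliseconds, 0 on timeout, -1 on error. */
static int
wait_readable (int fd, int timeout_ms)
{
  struct epoll_event ev, out;
  int efd, n;

  assert (fd >= 0);

  /* 1. Create the epoll instance; the result is itself a descriptor. */
  efd = epoll_create1 (0);
  if (efd == -1)
    return -1;

  /* 2. Register interest in "readable" events on fd. */
  memset (&ev, 0, sizeof ev);
  ev.events = EPOLLIN;
  ev.data.fd = fd;
  if (epoll_ctl (efd, EPOLL_CTL_ADD, fd, &ev) == -1)
    {
      close (efd);
      return -1;
    }

  /* 3. Wait for at most one event, or until the timeout expires. */
  n = epoll_wait (efd, &out, 1, timeout_ms);

  close (efd);
  return n;
}
```

A real server creates the epoll instance once and reuses it for every descriptor, as the main() function later in this article does.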
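When descriptors are added to an epoll instance, they can be added in two modes: level-triggered and edge-triggered (the terms are borrowed from electronics). In level-triggered mode, epoll_wait(2) returns a ready event whenever data is available to read. If you do not read all of the data and call epoll_wait(2) on the epoll instance again, it returns the ready event again, because data is still available. In edge-triggered mode, you get the ready notification only once. If you do not read all of the data and call epoll_wait(2) again, it blocks until new data arrives, because the ready event has already been delivered.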
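The difference between the two modes can be observed with a pipe. The sketch below is an illustration under assumptions, not code from the article: second_wait_result() is a hypothetical helper that writes one byte, deliberately leaves it unread, and polls twice with a zero timeout.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Hypothetical helper: registers the read end of a fresh pipe with
   `extra_flags` (0 for level-triggered, EPOLLET for edge-triggered),
   writes one byte, then polls twice WITHOUT reading the byte.
   Returns the number of events reported by the second poll,
   or -1 if any step fails. */
static int
second_wait_result (uint32_t extra_flags)
{
  struct epoll_event ev, out;
  int p[2], efd, second = -1;

  assert (extra_flags == 0 || extra_flags == EPOLLET);

  if (pipe (p) == -1)
    return -1;

  efd = epoll_create1 (0);
  if (efd == -1)
    {
      close (p[0]);
      close (p[1]);
      return -1;
    }

  memset (&ev, 0, sizeof ev);
  ev.events = EPOLLIN | extra_flags;
  ev.data.fd = p[0];

  if (epoll_ctl (efd, EPOLL_CTL_ADD, p[0], &ev) == 0
      && write (p[1], "x", 1) == 1
      && epoll_wait (efd, &out, 1, 0) == 1)
    {
      /* The byte is still unread at this point. */
      second = epoll_wait (efd, &out, 1, 0);
    }

  close (efd);
  close (p[0]);
  close (p[1]);
  return second;
}
```

With these assumptions, second_wait_result (0) reports the event again because the data is still there, while second_wait_result (EPOLLET) reports nothing on the second poll, which is why an edge-triggered reader has to drain the descriptor before waiting again.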
The epoll event structure passed to epoll_ctl(2) is shown below. With every descriptor being watched, you can associate either an integer or a pointer as user data.
```c
typedef union epoll_data
{
  void        *ptr;
  int          fd;
  __uint32_t   u32;
  __uint64_t   u64;
} epoll_data_t;

struct epoll_event
{
  __uint32_t   events; /* Epoll events */
  epoll_data_t data;   /* User data variable */
};
```
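The article's server stores the descriptor itself in data.fd. As a sketch of the other option, the code below stores a pointer to per-connection state in data.ptr instead. The struct conn type and the helpers watch_conn(), next_ready_conn(), and drop_conn() are hypothetical names introduced only for this illustration.

```c
#include <assert.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Hypothetical per-connection state; not part of the article's program. */
struct conn
{
  int fd;
  unsigned long bytes_seen;
};

/* Register c->fd for input and stash the struct pointer in data.ptr.
   epoll_data is a union, so set either ptr or fd, never both. */
static int
watch_conn (int efd, struct conn *c)
{
  struct epoll_event ev;

  assert (c != NULL);
  memset (&ev, 0, sizeof ev);
  ev.events = EPOLLIN;
  ev.data.ptr = c;
  return epoll_ctl (efd, EPOLL_CTL_ADD, c->fd, &ev);
}

/* Wait for one event and return the struct conn stored for it,
   or NULL on timeout or error. */
static struct conn *
next_ready_conn (int efd, int timeout_ms)
{
  struct epoll_event out;

  if (epoll_wait (efd, &out, 1, timeout_ms) != 1)
    return NULL;
  return out.data.ptr;
}

/* Stop watching the connection and close its descriptor. */
static void
drop_conn (int efd, struct conn *c)
{
  assert (c != NULL);
  epoll_ctl (efd, EPOLL_CTL_DEL, c->fd, NULL);
  close (c->fd);
}
```

Storing a pointer avoids a separate lookup from descriptor to connection state when an event arrives; the cost is that the program must keep the pointed-to object alive for as long as the descriptor is registered.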
Let's write some code now. We will implement a tiny TCP server that prints everything sent to the socket on standard output. We begin by writing a function, create_and_bind(), which creates and binds a TCP socket:
```c
/* Headers needed by the complete program */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <errno.h>

static int
create_and_bind (char *port)
{
  struct addrinfo hints;
  struct addrinfo *result, *rp;
  int s, sfd;

  memset (&hints, 0, sizeof (struct addrinfo));
  hints.ai_family = AF_UNSPEC;     /* Return IPv4 and IPv6 choices */
  hints.ai_socktype = SOCK_STREAM; /* We want a TCP socket */
  hints.ai_flags = AI_PASSIVE;     /* All interfaces */

  s = getaddrinfo (NULL, port, &hints, &result);
  if (s != 0)
    {
      fprintf (stderr, "getaddrinfo: %s\n", gai_strerror (s));
      return -1;
    }

  for (rp = result; rp != NULL; rp = rp->ai_next)
    {
      sfd = socket (rp->ai_family, rp->ai_socktype, rp->ai_protocol);
      if (sfd == -1)
        continue;

      s = bind (sfd, rp->ai_addr, rp->ai_addrlen);
      if (s == 0)
        {
          /* We managed to bind successfully! */
          break;
        }

      close (sfd);
    }

  if (rp == NULL)
    {
      fprintf (stderr, "Could not bind\n");
      freeaddrinfo (result);
      return -1;
    }

  freeaddrinfo (result);

  return sfd;
}
```
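create_and_bind() contains a standard code block for getting an IPv4 or IPv6 socket in a portable way. It takes the port as a string argument, which can be passed in from argv[1]. getaddrinfo(3) returns a list of addrinfo structures in result, all of which are compatible with the hints passed in the hints argument. The addrinfo struct looks like this: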
```c
struct addrinfo
{
  int              ai_flags;
  int              ai_family;
  int              ai_socktype;
  int              ai_protocol;
  size_t           ai_addrlen;
  struct sockaddr *ai_addr;
  char            *ai_canonname;
  struct addrinfo *ai_next;
};
```
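We walk through the structures one by one and try creating sockets with them until we manage to both create and bind a socket. If this succeeds, create_and_bind() returns the socket descriptor. If it fails, it returns -1.
Next, let's write a function that makes a socket non-blocking. make_socket_non_blocking() sets the O_NONBLOCK flag on the descriptor passed in the sfd argument: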
```c
static int
make_socket_non_blocking (int sfd)
{
  int flags, s;

  flags = fcntl (sfd, F_GETFL, 0);
  if (flags == -1)
    {
      perror ("fcntl");
      return -1;
    }

  flags |= O_NONBLOCK;
  s = fcntl (sfd, F_SETFL, flags);
  if (s == -1)
    {
      perror ("fcntl");
      return -1;
    }

  return 0;
}
```
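To see what the flag changes, the following sketch checks whether a read would block. It is only an illustration: set_nonblocking() is a condensed, hypothetical stand-in for make_socket_non_blocking(), and read_would_block() is a hypothetical helper that is only meaningful on a descriptor that is already non-blocking.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Condensed stand-in for make_socket_non_blocking():
   returns 0 on success, -1 on error. */
static int
set_nonblocking (int fd)
{
  int flags = fcntl (fd, F_GETFL, 0);

  if (flags == -1)
    return -1;
  return fcntl (fd, F_SETFL, flags | O_NONBLOCK);
}

/* Hypothetical helper: tries to read one byte from a non-blocking fd.
   Returns 1 if the read would have blocked (no data yet),
           0 if it returned a byte or end-of-file,
          -1 on any other error.
   Note that it consumes the byte when one is available. */
static int
read_would_block (int fd)
{
  char c;
  ssize_t n;

  assert (fd >= 0);

  n = read (fd, &c, 1);
  if (n == -1)
    return (errno == EAGAIN || errno == EWOULDBLOCK) ? 1 : -1;
  return 0;
}
```

On a blocking descriptor the same read(2) would simply put the thread to sleep, which a single-threaded event loop cannot afford; with O_NONBLOCK set, the call returns immediately with EAGAIN and the loop can move on to other descriptors.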
Now, on to the main() function of the program, which contains the event loop. This is the bulk of the program:
```c
#define MAXEVENTS 64

int
main (int argc, char *argv[])
{
  int sfd, s;
  int efd;
  struct epoll_event event;
  struct epoll_event *events;

  if (argc != 2)
    {
      fprintf (stderr, "Usage: %s [port]\n", argv[0]);
      exit (EXIT_FAILURE);
    }

  sfd = create_and_bind (argv[1]);
  if (sfd == -1)
    abort ();

  s = make_socket_non_blocking (sfd);
  if (s == -1)
    abort ();

  s = listen (sfd, SOMAXCONN);
  if (s == -1)
    {
      perror ("listen");
      abort ();
    }

  efd = epoll_create1 (0);
  if (efd == -1)
    {
      perror ("epoll_create");
      abort ();
    }

  event.data.fd = sfd;
  event.events = EPOLLIN | EPOLLET;
  s = epoll_ctl (efd, EPOLL_CTL_ADD, sfd, &event);
  if (s == -1)
    {
      perror ("epoll_ctl");
      abort ();
    }

  /* Buffer where events are returned */
  events = calloc (MAXEVENTS, sizeof event);

  /* The event loop */
  while (1)
    {
      int n, i;

      n = epoll_wait (efd, events, MAXEVENTS, -1);
      for (i = 0; i < n; i++)
        {
          if ((events[i].events & EPOLLERR) ||
              (events[i].events & EPOLLHUP) ||
              (!(events[i].events & EPOLLIN)))
            {
              /* An error has occurred on this fd, or the socket is not
                 ready for reading (why were we notified then?) */
              fprintf (stderr, "epoll error\n");
              close (events[i].data.fd);
              continue;
            }

          else if (sfd == events[i].data.fd)
            {
              /* We have a notification on the listening socket, which
                 means one or more incoming connections. */
              while (1)
                {
                  /* sockaddr_storage is large enough for both IPv4 and
                     IPv6 peer addresses. */
                  struct sockaddr_storage in_addr;
                  socklen_t in_len;
                  int infd;
                  char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];

                  in_len = sizeof in_addr;
                  infd = accept (sfd, (struct sockaddr *) &in_addr, &in_len);
                  if (infd == -1)
                    {
                      if ((errno == EAGAIN) ||
                          (errno == EWOULDBLOCK))
                        {
                          /* We have processed all incoming
                             connections. */
                          break;
                        }
                      else
                        {
                          perror ("accept");
                          break;
                        }
                    }

                  s = getnameinfo ((struct sockaddr *) &in_addr, in_len,
                                   hbuf, sizeof hbuf,
                                   sbuf, sizeof sbuf,
                                   NI_NUMERICHOST | NI_NUMERICSERV);
                  if (s == 0)
                    {
                      printf ("Accepted connection on descriptor %d "
                              "(host=%s, port=%s)\n", infd, hbuf, sbuf);
                    }

                  /* Make the incoming socket non-blocking and add it to the
                     list of fds to monitor. */
                  s = make_socket_non_blocking (infd);
                  if (s == -1)
                    abort ();

                  event.data.fd = infd;
                  event.events = EPOLLIN | EPOLLET;
                  s = epoll_ctl (efd, EPOLL_CTL_ADD, infd, &event);
                  if (s == -1)
                    {
                      perror ("epoll_ctl");
                      abort ();
                    }
                }
              continue;
            }
          else
            {
              /* We have data on the fd waiting to be read. Read and
                 display it. We must read whatever data is available
                 completely, as we are running in edge-triggered mode
                 and won't get a notification again for the same
                 data. */
              int done = 0;

              while (1)
                {
                  ssize_t count;
                  char buf[512];

                  count = read (events[i].data.fd, buf, sizeof buf);
                  if (count == -1)
                    {
                      /* If errno == EAGAIN, that means we have read all
                         data. So go back to the main loop. */
                      if (errno != EAGAIN)
                        {
                          perror ("read");
                          done = 1;
                        }
                      break;
                    }
                  else if (count == 0)
                    {
                      /* End of file. The remote has closed the
                         connection. */
                      done = 1;
                      break;
                    }

                  /* Write the buffer to standard output */
                  s = write (1, buf, count);
                  if (s == -1)
                    {
                      perror ("write");
                      abort ();
                    }
                }

              if (done)
                {
                  printf ("Closed connection on descriptor %d\n",
                          events[i].data.fd);

                  /* Closing the descriptor will make epoll remove it
                     from the set of descriptors which are monitored. */
                  close (events[i].data.fd);
                }
            }
        }
    }

  free (events);

  close (sfd);

  return EXIT_SUCCESS;
}
```
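main() first calls create_and_bind(), which sets up the socket. It then makes the socket non-blocking and calls listen(2). Next it creates an epoll instance in efd and adds the listening socket sfd to it, to watch for input events in edge-triggered mode.
The outer while loop is the main event loop. It calls epoll_wait(2), where the thread stays blocked waiting for events. When events are ready, epoll_wait(2) returns them in the events argument, which is an array of epoll_event structures.
The epoll instance in efd is continuously updated in the event loop as we add new incoming connections to watch and remove existing connections when they die.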
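When events are available, they can be of three types:
- Errors: when an error condition occurs, or the event is not a notification that data is available for reading, we simply close the associated descriptor. Closing the descriptor automatically removes it from the watch list of the epoll instance efd.
- New connections: when the listening descriptor sfd is ready for reading, it means one or more new connections have arrived. For each new connection, we accept(2) it, print a message about it, make the incoming socket non-blocking, and add it to the watch list of the epoll instance efd.
- Client data: when data is available for reading on any of the client descriptors, we use read(2) to read it in pieces of 512 bytes in an inner while loop. We have to read all the data that is available right now, because the descriptor is in edge-triggered mode and we will not get another event for the same data. The data read is written to standard output (fd=1) using write(2). If read(2) returns 0, it means EOF and we can close the client's connection. If it returns -1 with errno set to EAGAIN, all the data for this event has been read and we can go back to the main loop.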
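The client-data case can be isolated into a small function. The sketch below is one possible way to factor out the inner read loop, not the article's own code: drain_fd() is a hypothetical name, it counts bytes instead of printing them, and it additionally retries on EINTR, which the article's loop does not do.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Hypothetical helper: read from a non-blocking `fd` until nothing is left.
   Adds the number of bytes read to *total.
   Returns 0 if the fd is drained but still open (read hit EAGAIN),
           1 if the peer closed the connection (EOF),
          -1 on any other read error. */
static int
drain_fd (int fd, size_t *total)
{
  char buf[512];
  int flags = fcntl (fd, F_GETFL, 0);

  /* On a blocking fd the final read would hang instead of
     returning EAGAIN, so insist on O_NONBLOCK. */
  assert (flags != -1 && (flags & O_NONBLOCK));
  assert (total != NULL);

  for (;;)
    {
      ssize_t count = read (fd, buf, sizeof buf);

      if (count > 0)
        {
          *total += (size_t) count;
          continue;
        }
      if (count == 0)
        return 1;               /* EOF: caller should close fd */
      if (errno == EINTR)
        continue;               /* interrupted by a signal: retry */
      if (errno == EAGAIN || errno == EWOULDBLOCK)
        return 0;               /* drained: go back to epoll_wait */
      return -1;
    }
}
```

In an event loop shaped like the one above, a return value of 1 or -1 would correspond to setting done and closing the descriptor, and 0 to going straight back to epoll_wait(2).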
That's it. The program goes around and around in a loop, adding descriptors to the watch list and removing them from it.
Download the epoll-example.c code.
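Update 1: The definitions of level-triggered and edge-triggered were mistakenly reversed (although the code was correct). This was noticed by Reddit user bodski. The article has now been corrected. I should have proofread it before posting. Apologies, and thank you for pointing out the mistake. :)
Update 2: The code has been modified to keep calling accept(2) until it reports that it would block, so that if multiple connections have arrived, we accept all of them. This was suggested by Reddit user pitchford. Thank you for your comments. :)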