3高并发服务器:多路IO之epoll
1 epoll
epoll是Linux下多路復用IO接口select/poll的增強版本,它能顯著提高程序在大量并、發連接中只有少量活躍的情況下的系統CPU利用率,因為它會復用文件描述符集合來傳遞結果而不用迫使開發者每次等待事件之前都必須重新準備要被偵聽的文件描述符集合,另一點原因就是獲取事件的時候,它無須遍歷整個被偵聽的描述符集,只要遍歷那些被內核IO事件異步喚醒而加入Ready隊列的描述符集合就行了。
目前epell是linux大規模并發網絡程序中的熱門首選模型。
epoll除了提供select/ poll那種IO事件的電平觸發(Level Triggered)外,還提供了邊沿觸發(Edge Triggered),這就使得用戶空間程序有可能緩存IO狀態,減少epoll_wait/epoll_pwait的調用,提高應用程序效率。
一個進程打開大數目的socket描述符
cat/proc/sys/fs/file-max
設置最大打開文件描述符限制
sudo vi /etc/security/limits.conf
??? epollAPI
1.創建一個epoll句柄,參數size用來告訴內核監聽的文件描述符個數,跟內存大小有關。
依賴的頭文件
#include <sys/epoll.h>
函數聲明
int epoll_create(int size);
函數說明:
size:告訴內核監聽的數目
?
2 控制某個epoll監聽的文件描述符上的事件:注冊、修改、刪除
依賴的頭文件
#include <sys/epoll.h>
函數聲明
int epoll_ctl(int epfd,int op,int fd,structepoll_event);
函數說明:
epfd:為epoll_create的句柄
op:表示動作,用3個宏來表示
EPOLL_CTL_ADD(注冊新的fd到epfd)
EPOLL_CTL_MOD(修改已經注冊的fd的監聽事件)
EPOLL_CTL_DEL(從epfd刪除一個fd)
event:告訴內核需要監聽的事件
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
?
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
?
EPOLLIN :表示對應的文件描述符可以讀(包括對端SOCKET正常關閉)
EPOLLOUT:表示對應的文件描述符可以寫
EPOLLPRI:表示對應的文件描述符有緊急的數據可讀(這里應該表示有帶外數據到來)
EPOLLERR:表示對應的文件描述符發生錯誤
EPOLLHUP:表示對應的文件描述符被掛斷;
EPOLLET:將EPOLL設為邊緣觸發(Edge Triggered)模式,這是相對于水平觸發(Level Triggered)來說的
EPOLLONESHOT:只監聽一次事件,當監聽完這次事件之后,如果還需要繼續監聽這個socket的話,需要再次把這個socket加入到EPOLL隊列里?
3 等待所監控文件描述符上有事件的產生,類似select()調用。
依賴的頭文件
#include <sys/epoll.h>
函數聲明:
int epoll_wait(int epfd, struct epoll_event*events, int maxevents, int timeout);
參數介紹:
events:用來從內核得到事件的集合。
maxevents:告之內核這個events有多大,這個maxevents的值不能大于創建epoll_create()時的size,timeout:是超時時間
-1:阻塞
0:立即返回,非阻塞
??? >0:指定微秒
返回值:成功返回有多少文件描述符就緒,時間到時返回0,出錯返回-1
案例說明:
server.c
| #include<stdio.h> #include<stdlib.h> #include<string.h> #include<netinet/in.h> #include<arpa/inet.h> #include<sys/epoll.h> #include<errno.h> #include<unistd.h> #include<ctype.h> #include"wrap.h" ? #define MAXLINE 80 #define SERV_PORT 8000 #define OPEN_MAX 1024 ? int main(void) { ??? int i,j,maxi,listenfd,connfd,sockfd; ??? int nready,efd,res; ??? ssize_t n; ??? char buf[MAXLINE],str[INET_ADDRSTRLEN]; ??? socklen_t clilen; ??? int client[OPEN_MAX]; struct sockaddr_in cliaddr,servaddr; //ep[OPEN_MAX]保存就緒的文件描述符 ??? struct epoll_event tep,ep[OPEN_MAX]; ? ??? listenfd = Socket(AF_INET,SOCK_STREAM,0); ? ??? bzero(&servaddr,sizeof(servaddr)); ??? servaddr.sin_family = AF_INET; ??? servaddr.sin_addr.s_addr = htonl(INADDR_ANY); ??? servaddr.sin_port = htons(SERV_PORT); ? ??? Bind(listenfd,(struct sockaddr *)&servaddr,sizeof(servaddr)); ??? Listen(listenfd,20); ? ??? for(i = 0;i < OPEN_MAX;i++) { ??????? client[i] = -1; ??? } ??? maxi = -1; ? ??? efd = epoll_create(OPEN_MAX); ? ??? if(efd == -1) ??????? perr_exit("epoll_create"); ?? ??? //監聽讀屬性 tep.events = EPOLLIN; //data里面保存了就緒的文件的文件描述符。 ??? tep.data.fd = listenfd; ??? res = epoll_ctl(efd,EPOLL_CTL_ADD,listenfd,&tep); ??? if(res == -1) ??????? perr_exit("epoll_ctl"); ? ??? for(;;) { ??????? /*阻塞監聽*/ ??????? nready = epoll_wait(efd,ep,OPEN_MAX,-1); ??????? if(nready == -1) { ??????????? perr_exit("epoll_wait"); ??????? } ??????? for(i = 0;i< nready;i++) { ??????????? if(!ep[i].events & EPOLLIN) ??????????????? continue; ??????????? if(ep[i].data.fd == listenfd) { ?????????????? ?clilen = sizeof(cliaddr); ??????????????? connfd = Accept(listenfd,(struct sockaddr *)&cliaddr,&clilen); ??????????????? printf("received from %s at PORT %d\n", ??????????????????????? inet_ntop(AF_INET,&cliaddr.sin_addr,str,sizeof(str)), ??????????????? ????????ntohs(cliaddr.sin_port)); ? ??????????????? for(j = 0; j < OPEN_MAX; j++) { ??????????????????? if(client[j] < 0) { ??????????????????????? client[j] = connfd;? /*save descriptor*/ ??????????????????????? break; ??????????????????? } ?????????????? ?} ? ??????????????? if(j==OPEN_MAX) ??????????????????? perr_exit("too many clients"); ? ??????????????? if(j > maxi) ??????????????????? maxi = j;? /*max index in client[] array*/ ??????????????? tep.events = EPOLLIN; ??????????????? tep.data.fd = connfd; ??????????????? res = epoll_ctl(efd,EPOLL_CTL_ADD,connfd,&tep); ??????????????? if(res == -1) { ??????????????????? perr_exit("epoll_ctl"); ??????????????? } else { ??????????????????? sockfd = ep[i].data.fd; ??????????????????? n = Read(sockfd,buf,MAXLINE); ??????????????????? if(n==0) { ??????????????????????? for(j = 0; j <= maxi;j++) { ??????????????????????????? if(client[j] == sockfd) { ??????????????????????????????? client[j] = -1; ??????????????????????????????? break; ??????????????????????????? } ??????????????????????? } ??????????????????????? res = epoll_ctl(efd,EPOLL_CTL_DEL,sockfd,NULL); ??????????????????????? if(res == -1) { ??????????????????????????? perr_exit("epoll_ctl"); ??????????????????????? } ??????????????????????? Close(sockfd); ??????????????????????? printf("client[%d] closed connection",j); ??????????????????? } else { ??????????????????????? for(j = 0;j < n;j++) ??????????????????????????? buf[j] = toupper(buf[j]); ??????????????????????? Writen(sockfd,buf,n); ?????????????? ?????} ??????????????? } ??????????? } ??????? } ??? } ??? Close(listenfd); ??? Close(efd); ??? return 0; } |
client.c
| #include<stdio.h> #include<string.h> #include<unistd.h> #include<arpa/inet.h> #include<netinet/in.h> #include"wrap.h" ? #define MAXLINE 80 #define SERV_PORT 8000 ? int main(void) { ??? struct sockaddr_in servaddr; ??? char buf[MAXLINE]; ??? int sockfd,n; ? ??? sockfd = Socket(AF_INET,SOCK_STREAM,0); ??? ??? bzero(&servaddr,sizeof(servaddr)); ??? servaddr.sin_family = AF_INET; ??? inet_pton(AF_INET,"127.0.0.1",&servaddr.sin_addr); ??? servaddr.sin_port = htons(SERV_PORT); ? ??? Connect(sockfd,(struct sockaddr *) &servaddr,sizeof(servaddr)); ? ??? while(fgets(buf,MAXLINE,stdin) != NULL) { ??????? Write(sockfd,buf,strlen(buf)); ??????? n = Read(sockfd,buf,MAXLINE); ??????? if(n == 0) ??????????? printf("the other side has been closed.\n"); ??????? else ??????????? Write(STDOUT_FILENO,buf,n); ??? } ? ??? Close(sockfd); ??? return 0; } |
wrap.h
| #ifndef __WRAP_H_ #define __WRAP_H_ ? void perr_exit(const char *s); int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr); void Bind(int fd, const struct sockaddr *sa, socklen_t salen); void Connect(int fd, const struct sockaddr *sa, socklen_t salen); void Listen(int fd, int backlog); int Socket(int family, int type, int protocol); ssize_t Read(int fd, void *ptr, size_t nbytes); ssize_t Write(int fd, const void *ptr, size_t nbytes); void Close(int fd); ssize_t Readn(int fd, void *vptr, size_t n); ssize_t Writen(int fd, const void *vptr, size_t n); static ssize_t my_read(int fd, char *ptr); ssize_t Readline(int fd, void *vptr, size_t maxlen); ? #endif |
wrap.c
| #include <stdlib.h> #include <stdio.h> #include <errno.h> #include <unistd.h> #include <sys/socket.h> ? void perr_exit(const char *s) { ???????? perror(s); ???????? exit(1); } ? int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr) { ???????? int n; ? again: ???????? if ( (n = accept(fd, sa, salenptr)) < 0) { ?????????????????? if ((errno == ECONNABORTED) || (errno == EINTR)) ??????????????????????????? goto again; ?????????????????? else ??????????????????????????? perr_exit("accept error"); ???????? } ???????? return n; } ? void Bind(int fd, const struct sockaddr *sa, socklen_t salen) { ???????? if (bind(fd, sa, salen) < 0) ?????????????????? perr_exit("bind error"); } ? void Connect(int fd, const struct sockaddr *sa, socklen_t salen) { ???????? if (connect(fd, sa, salen) < 0) ?????????????????? perr_exit("connect error"); } ? void Listen(int fd, int backlog) { ???????? if (listen(fd, backlog) < 0) ?????????????????? perr_exit("listen error"); } ? int Socket(int family, int type, int protocol) { ???????? int n; ? ???????? if ( (n = socket(family, type, protocol)) < 0) ?????????????????? perr_exit("socket error"); ???????? return n; } ? ssize_t Read(int fd, void *ptr, size_t nbytes) { ???????? ssize_t n; ? again: ???????? if ( (n = read(fd, ptr, nbytes)) == -1) { ?????????????????? if (errno == EINTR) ??????????????????????????? goto again; ?????????????????? else ??????????????????????????? return -1; ???????? } ???????? return n; } ? ssize_t Write(int fd, const void *ptr, size_t nbytes) { ???????? ssize_t n; ? again: ???????? if ( (n = write(fd, ptr, nbytes)) == -1) { ?????????????????? if (errno == EINTR) ??????????????????????????? goto again; ?????????????????? else ??????????????????????????? return -1; ???????? } ???????? return n; } ? void Close(int fd) { ???????? if (close(fd) == -1) ?????????????????? perr_exit("close error"); } ssize_t Readn(int fd, void *vptr, size_t n) { ???????? size_t? nleft; ???????? ssize_t nread; ???????? char?? *ptr; ? ???????? ptr = vptr; ???????? nleft = n; ???????? while (nleft > 0) { ?????????????????? if ( (nread = read(fd, ptr, nleft)) < 0) { ??????????????????????????? if (errno == EINTR) ???????????????????????????????????? nread = 0; ??????????????????????????? else ???????????????????????????????????? return -1; ?????????????????? } else if (nread == 0) ??????????????????????????? break; ? ?????????????????? nleft -= nread; ?????????????????? ptr += nread; ???????? } ???????? return n - nleft; } ? ssize_t Writen(int fd,const void *vptr, size_t n) { ???????? size_t nleft; ???????? ssize_t nwritten; ???????? const char *ptr; ? ???????? ptr = vptr; ???????? nleft = n; ???????? while (nleft > 0) { ?????????????????? if ( (nwritten = write(fd, ptr, nleft)) <= 0) { ??????????????????????????? if (nwritten < 0 && errno == EINTR) ???????????????????????????????????? nwritten = 0; ??????????????????????????? else ???????????????????????????????????? return -1; ?????????????????? } ? ?????????????????? nleft -= nwritten; ?????????????????? ptr += nwritten; ???????? } ???????? return n; } static ssize_t my_read(int fd, char *ptr) { ???????? static int read_cnt; ???????? static char *read_ptr; ???????? static char read_buf[100]; ? ???????? if (read_cnt <= 0) { again: ?????????????????? if ( (read_cnt = read(fd, read_buf, sizeof(read_buf))) < 0) { ??????????????????????????? if (errno == EINTR) ???????????????????????????????????? goto again; ??????????????????????????? return -1; ?????????????????? } else if (read_cnt == 0) ??????????????????????????? return 0; ?????????????????? read_ptr = read_buf; ???????? } ???????? read_cnt--; ???????? *ptr = *read_ptr++; ???????? return 1; } ? ssize_t Readline(int fd, void *vptr, size_t maxlen) { ???????? ssize_t n, rc; ???????? char??? c, *ptr; ? ???????? ptr = vptr; ???????? for (n = 1; n < maxlen; n++) { ?????????????????? if ( (rc = my_read(fd, &c)) == 1) { ??????????????????????????? *ptr++ = c; ??????????????????????????? if (c? == '\n') ???????????????????????????????????? break; ?????????????????? } else if (rc == 0) { ??????????????????????????? *ptr = 0; ??????????????????????????? return n - 1; ?????????????????? } else ??????????????????????????? return -1; ???????? } ???????? *ptr? = 0; ???????? return n; } |
總結
以上是生活随笔為你收集整理的3高并发服务器:多路IO之epoll的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 奥德赛为什么喷载客七人?
- 下一篇: cvt是什么?