高并发的epoll+线程池,epoll在线程池内
生活随笔
收集整理的這篇文章主要介紹了
高并发的epoll+线程池,epoll在线程池内
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
epoll是linux下高並發服務器的完美方案,因為是基於事件觸發的,所以比select快的不只是一個數量級。 單線程epoll,觸發量可達到15000,但是加上業務後,因為大多數業務都與數據庫打交道,所以就會存在阻塞的情況,這個時候就必須用多線程來提速。 epoll在線程池內,測試結果2000個/s。 增加了網絡斷線後的無效socket檢測。 測試工具:stressmark。 因為加了適用於ab的代碼,所以也可以使用ab進行壓力測試。
char buf[1000] = {0};
sprintf(buf,"HTTP/1.0 200 OK\r\nContent-type: text/plain\r\n\r\n%s","Hello world!\n");
send(socketfd,buf, strlen(buf),0);
#include <stdio.h>
#include <errno.h>
#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <arpa/inet.h>
#include <fcntl.h>
#include <netdb.h>
#include <netinet/in.h>
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
//stl head
#include <ext/hash_map>//包含hash_map 的頭文件
//#include //stl的map
using namespace std ; //std 命名空間
using namespace __gnu_cxx ; //而hash_map是在__gnu_cxx的命名空間里的
int init_thread_pool ( int threadNum ) ;
void *epoll_loop ( void * para ) ;
void *check_connect_timeout ( void * para ) ;
/* Per-connection bookkeeping stored in sock_map, keyed by socket descriptor. */
struct sockStruct
{
    time_t time;            /* timestamp of the most recent receive on the connection */
    unsigned int *recvBuf;  /* heap buffer holding the received bytes; released with free() */
};
/* Maps a client socket descriptor to its receive state. */
hash_map<int, sockStruct> sock_map;

/* Size in bytes requested for SO_RCVBUF on client sockets. */
#define MAXRECVBUF 4096
/* Capacity of the receive buffer; parenthesised so the macro composes
 * correctly inside larger expressions (e.g. MAXBUF * 2). */
#define MAXBUF (MAXRECVBUF + 10)
/*
 * fd_Setnonblocking - switch a descriptor to non-blocking mode.
 *
 * fd: descriptor to modify.
 *
 * Returns the file status flags the descriptor had BEFORE the change, or -1
 * if the flags could not be read or written (errno is left as set by fcntl).
 */
int fd_Setnonblocking(int fd)
{
    int op = fcntl(fd, F_GETFL, 0);

    /* Never feed a failed F_GETFL result (-1, i.e. all bits set) into F_SETFL. */
    if (op == -1)
        return -1;

    if (fcntl(fd, F_SETFL, op | O_NONBLOCK) == -1)
        return -1;

    return op;
}

/*
 * on_sigint - SIGINT handler: terminate the process with status 0.
 *
 * Uses _exit() rather than exit(): exit() runs atexit handlers and flushes
 * stdio, neither of which is async-signal-safe, and could deadlock if the
 * signal interrupts a thread that is inside printf().
 */
void on_sigint(int signal)
{
    (void)signal;
    _exit(0);
}

/*
handle_message - 處理每個(gè) socket 上的消息收發(fā)
*/
int handle_message ( int new_fd )
{
???? char buf [MAXBUF + 1 ] ;
???? char sendbuf [MAXBUF +1 ] ;
???? int len ;
???? /* 開始處理每個(gè)新連接上的數(shù)據(jù)收發(fā) */
????bzero (buf , MAXBUF + 1 ) ;
???? /* 接收客戶端的消息 */
???? //len = recv(new_fd, buf, MAXBUF, 0);
???? int nRecvBuf = MAXRECVBUF ; //設(shè)置為32K
???? setsockopt (new_fd , SOL_SOCKET , SO_RCVBUF , ( const char * ) &nRecvBuf , sizeof ( int ) ) ;
????len = recv (new_fd , &buf , MAXBUF ,0 ) ;
???? //--------------------------------------------------------------------------------------------
???? //這塊為了使用ab測(cè)試
???? char bufSend [1000 ] = {0 } ;
???? sprintf (bufSend , "HTTP/1.0 200 OK\r\nContent-type: text/plain\r\n\r\n%s" , "Hello world!\n" ) ;
???? send (new_fd ,bufSend , strlen (buf ) ,0 ) ;
???? //--------------------------------------------------------------------------------------------
???? if (len > 0 ) {
???????? //printf ("%d接收消息成功:'%s',共%d個(gè)字節(jié)的數(shù)據(jù)\n", new_fd, buf, len);
???????? //hash-map
????????
????????hash_map < int , sockStruct > : : iterator it_find ;
????????it_find = sock_map . find (new_fd ) ;
???????? if (it_find = = sock_map .end ( ) ) {
???????????? //新的網(wǎng)絡(luò)連接,申請(qǐng)新的接收緩沖區(qū),并放入map中
???????????? //printf("new socket %d\n", new_fd);
????????????sockStruct newSockStruct ;
????????????newSockStruct . time = time ( ( time_t * )0 ) ;
????????????newSockStruct .recvBuf = ( unsigned int * ) malloc (1000 ) ;
???????????? memset (newSockStruct .recvBuf , 0 , 1000 ) ;
???????????? strcat ( ( char * )newSockStruct .recvBuf , buf ) ;
????????????sock_map .insert ( pair < int ,sockStruct > (new_fd , newSockStruct ) ) ;
???????? } else {
???????????? //網(wǎng)絡(luò)連接已經(jīng)存在,找到對(duì)應(yīng)的數(shù)據(jù)緩沖區(qū),將接收到的數(shù)據(jù)拼接到數(shù)據(jù)緩沖區(qū)中
???????????? //printf("socket %d exist!\n", it_find->first);
???????????? (it_find - >second ) . time = time ( ( time_t * )0 ) ;???????????????? //時(shí)間更改
???????????? char * bufSockMap = ( char * ) (it_find - >second ) .recvBuf ;???? //數(shù)據(jù)存儲(chǔ)
???????????? strcat (bufSockMap , buf ) ;
???????????? //printf("bufSockMap:%s\n", bufSockMap);
???????? }
???? }
???? else {
???????? if (len < 0 )
???????????? printf ( "消息接收失敗!錯(cuò)誤代碼是%d,錯(cuò)誤信息是'%s'\n" ,
???????????? errno , strerror ( errno ) ) ;
???????? else {
???????????? //將socket從map中移除
???????????? /*
????????????hash_map::iterator it_find;
????????????it_find = sock_map.find(new_fd);
????????????sock_map.erase(it_find);
????????????*/
???????????? printf ( "client %d quit!\n" ,new_fd ) ;
???????? }
???????? //close(new_fd);
???????? return -1 ;
???? }
???? /* 處理每個(gè)新連接上的數(shù)據(jù)收發(fā)結(jié)束 */
???? //關(guān)閉socket的時(shí)候,要釋放接收緩沖區(qū)。
????hash_map < int , sockStruct > : : iterator it_find ;
????it_find = sock_map . find (new_fd ) ;
???? free ( (it_find - >second ) .recvBuf ) ;
????sock_map .erase (it_find ) ;
???? close (new_fd ) ;
???? return len ;
}
/* Process-wide server state. */
int listenfd;                       /* listening TCP socket, created in main() */
int sock_op = 1;                    /* option value handed to setsockopt(SO_REUSEADDR) */
struct sockaddr_in address;         /* local address the server binds to */
struct epoll_event event;           /* scratch event used when registering descriptors */
struct epoll_event events[1024];    /* array of 1024 ready-event slots */
int epfd;                           /* epoll instance descriptor, created in main() */
/* General scratch variables with external linkage; kept unchanged. */
int n;
int i;
char buf[512];
int off;
int result;
char *p;
int main ( int argc , char * argv [ ] )
{
????init_thread_pool (1 ) ;
???? signal (SIGPIPE , SIG_IGN ) ;
???? signal (SIGCHLD , SIG_IGN ) ;
???? signal ( SIGINT , &on_sigint ) ;
????listenfd = socket ( AF_INET , SOCK_STREAM ,0 ) ;
???? setsockopt (listenfd ,SOL_SOCKET ,SO_REUSEADDR , &sock_op , sizeof (sock_op ) ) ;
?
???? memset ( &address ,0 , sizeof (address ) ) ;
????address .sin_addr .s_addr = htonl ( INADDR_ANY ) ;
????address .sin_port = htons (8006 ) ;
???? bind (listenfd , ( struct sockaddr * ) &address , sizeof (address ) ) ;
???? listen (listenfd ,1024 ) ;
????fd_Setnonblocking (listenfd ) ;
?
????epfd =epoll_create (65535 ) ;
???? memset ( &event ,0 , sizeof (event ) ) ;
????event .data .fd =listenfd ;
????event .events =EPOLLIN |EPOLLET ;
????epoll_ctl (epfd ,EPOLL_CTL_ADD ,listenfd , &event ) ;
???? while (1 ) {
???????? sleep (1000 ) ;
???? }
???? return 0 ;
}
/*************************************************
* Function: * init_thread_pool
* Description: * 初始化線程
* Input: * threadNum:用于處理epoll的線程數(shù)
* Output: *
* Others: * 此函數(shù)為靜態(tài)static函數(shù),
*************************************************/
int init_thread_pool ( int threadNum )
{
???? int i ,ret ;
???? pthread_t threadId ;
???? //初始化epoll線程池
???? for ( i = 0 ; i < threadNum ; i + + )
???? {
????????ret = pthread_create ( &threadId , 0 , epoll_loop , ( void * )0 ) ;
???????? if (ret ! = 0 )
???????? {
???????????? printf ( "pthread create failed!\n" ) ;
???????????? return ( -1 ) ;
???????? }
???? }
????ret = pthread_create ( &threadId , 0 , check_connect_timeout , ( void * )0 ) ;
???? return (0 ) ;
}
/*************************************************
* Function: * epoll_loop
* Description: * epoll檢測(cè)循環(huán)
* Input: *
* Output: *
* Others: *
*************************************************/
static int count111 = 0 ;
static time_t oldtime = 0 , nowtime = 0 ;
void *epoll_loop ( void * para )
{
???????? while (1 )
???? {
????????n =epoll_wait (epfd ,events ,4096 , -1 ) ;
???????? //printf("n = %d\n", n);
???????? if (n >0 )
???????? {
???????????? for (i =0 ;i <n ; + +i )
???????????? {
???????????????? if (events [i ] .data .fd = =listenfd )
???????????????? {
???????????????????? while (1 )
???????????????????? {
????????????????????????event .data .fd = accept (listenfd , NULL , NULL ) ;
???????????????????????? if (event .data .fd >0 )
???????????????????????? {
????????????????????????????fd_Setnonblocking (event .data .fd ) ;
????????????????????????????event .events =EPOLLIN |EPOLLET ;
????????????????????????????epoll_ctl (epfd ,EPOLL_CTL_ADD ,event .data .fd , &event ) ;
???????????????????????? }
???????????????????????? else
???????????????????????? {
???????????????????????????? if ( errno = =EAGAIN )
???????????????????????????? break ;
???????????????????????? }
???????????????????? }
???????????????? }
???????????????? else
???????????????? {
???????????????????? if (events [i ] .events &EPOLLIN )
???????????????????? {
???????????????????????? //handle_message(events[i].data.fd);
???????????????????????? char recvBuf [1024 ] = {0 } ;
???????????????????????? int ret = 999 ;
???????????????????????? int rs = 1 ;
???????????????????????? while (rs )
???????????????????????? {
????????????????????????????ret = recv (events [n ] .data .fd ,recvBuf ,1024 ,0 ) ; // 接受客戶端消息
???????????????????????????? if (ret < 0 )
???????????????????????????? {
???????????????????????????????? //由于是非阻塞的模式,所以當(dāng)errno為EAGAIN時(shí),表示當(dāng)前緩沖區(qū)已無數(shù)據(jù)可//讀在這里就當(dāng)作是該次事件已處理過。
???????????????????????????????? if ( errno = = EAGAIN )
???????????????????????????????? {
???????????????????????????????????? printf ( "EAGAIN\n" ) ;
???????????????????????????????????? break ;
???????????????????????????????? }
???????????????????????????????? else {
???????????????????????????????????? printf ( "recv error!\n" ) ;
????????????????????????????????????epoll_ctl (epfd , EPOLL_CTL_DEL , events [i ] .data .fd , &event ) ;
???????????????????????????????????? close (events [i ] .data .fd ) ;
???????????????????????????????????? break ;
???????????????????????????????? }
???????????????????????????? }
???????????????????????????? else if (ret = = 0 )
???????????????????????????? {
???????????????????????????????? // 這里表示對(duì)端的socket已正常關(guān)閉.
????????????????????????????????rs = 0 ;
???????????????????????????? }
???????????????????????????? if (ret = = sizeof (recvBuf ) )
????????????????????????????????rs = 1 ; // 需要再次讀取
???????????????????????????? else
????????????????????????????????rs = 0 ;
???????????????????????? }
???????????????????????? if (ret >0 ) {
????????????????????????????count111 + + ;
???????????????????????????? struct tm *today ;
???????????????????????????? time_t ltime ;
???????????????????????????? time ( &nowtime ) ;
???????????????????????????? if (nowtime ! = oldtime ) {
???????????????????????????????? printf ( "%d\n" , count111 ) ;
????????????????????????????????oldtime = nowtime ;
????????????????????????????????count111 = 0 ;
???????????????????????????? }
???????????????????????????? char buf [1000 ] = {0 } ;
???????????????????????????? sprintf (buf , "HTTP/1.0 200 OK\r\nContent-type: text/plain\r\n\r\n%s" , "Hello world!\n" ) ;
???????????????????????????? send (events [i ] .data .fd ,buf , strlen (buf ) ,0 ) ;
???????????????????????????? //????CGelsServer Gelsserver;
???????????????????????????? //????Gelsserver.handle_message(events[i].data.fd);
???????????????????????? }
????????????????????????epoll_ctl (epfd , EPOLL_CTL_DEL , events [i ] .data .fd , &event ) ;
???????????????????????? close (events [i ] .data .fd ) ;
???????????????????? }
???????????????????? else if (events [i ] .events &EPOLLOUT )
???????????????????? {
???????????????????????? sprintf (buf , "HTTP/1.0 200 OK\r\nContent-type: text/plain\r\n\r\n%s" , "Hello world!\n" ) ;
???????????????????????? send (events [i ] .data .fd ,buf , strlen (buf ) ,0 ) ;
???????????????????????? /*
????????????????????????if(p!=NULL)
????????????????????????{
????????????????????????????free(p);
????????????????????????????p=NULL;
????????????????????????}
????????????????????????*/
???????????????????????? close (events [i ] .data .fd ) ;
???????????????????? }
???????????????????? else
???????????????????? {
???????????????????????? close (events [i ] .data .fd ) ;
???????????????????? }
???????????????? }
???????????? }
???????? }
???? }
}
/*************************************************
* Function: * check_connect_timeout
* Description: * 檢測(cè)長(zhǎng)時(shí)間沒反應(yīng)的網(wǎng)絡(luò)連接,并關(guān)閉刪除
* Input: *
* Output: *
* Others: *
*************************************************/
void *check_connect_timeout ( void * para )
{
????hash_map < int , sockStruct > : : iterator it_find ;
???? for (it_find = sock_map .begin ( ) ; it_find!=sock_map .end ( ) ; + +it_find ) {
???????? if ( time ( ( time_t * )0 ) - (it_find - >second ) . time > 120 ) {???????????????? //時(shí)間更改
???????????? free ( (it_find - >second ) .recvBuf ) ;
????????????sock_map .erase (it_find ) ;
???????????? close (it_find - >first ) ;
???????? }
???? }
}
sprintf(buf,"HTTP/1.0 200 OK\r\nContent-type: text/plain\r\n\r\n%s","Hello world!\n");
send(socketfd,buf, strlen(buf),0);
#include <stdio.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/types.h>
#include <signal.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <errno.h>
#include <stdlib.h>
//stl head
#include <ext/hash_map>//包含hash_map 的頭文件
//#include //stl的map
using namespace std ; //std 命名空間
using namespace __gnu_cxx ; //而hash_map是在__gnu_cxx的命名空間里的
int init_thread_pool ( int threadNum ) ;
void *epoll_loop ( void * para ) ;
void *check_connect_timeout ( void * para ) ;
/* Per-connection bookkeeping stored in sock_map, keyed by socket descriptor. */
struct sockStruct
{
    time_t time;            /* timestamp of the most recent receive on the connection */
    unsigned int *recvBuf;  /* heap buffer holding the received bytes; released with free() */
};
/* Maps a client socket descriptor to its receive state. */
hash_map<int, sockStruct> sock_map;

/* Size in bytes requested for SO_RCVBUF on client sockets. */
#define MAXRECVBUF 4096
/* Capacity of the receive buffer; parenthesised so the macro composes
 * correctly inside larger expressions (e.g. MAXBUF * 2). */
#define MAXBUF (MAXRECVBUF + 10)
/*
 * fd_Setnonblocking - switch a descriptor to non-blocking mode.
 *
 * fd: descriptor to modify.
 *
 * Returns the file status flags the descriptor had BEFORE the change, or -1
 * if the flags could not be read or written (errno is left as set by fcntl).
 */
int fd_Setnonblocking(int fd)
{
    int op = fcntl(fd, F_GETFL, 0);

    /* Never feed a failed F_GETFL result (-1, i.e. all bits set) into F_SETFL. */
    if (op == -1)
        return -1;

    if (fcntl(fd, F_SETFL, op | O_NONBLOCK) == -1)
        return -1;

    return op;
}

/*
 * on_sigint - SIGINT handler: terminate the process with status 0.
 *
 * Uses _exit() rather than exit(): exit() runs atexit handlers and flushes
 * stdio, neither of which is async-signal-safe, and could deadlock if the
 * signal interrupts a thread that is inside printf().
 */
void on_sigint(int signal)
{
    (void)signal;
    _exit(0);
}

/*
handle_message - 處理每個(gè) socket 上的消息收發(fā)
*/
int handle_message ( int new_fd )
{
???? char buf [MAXBUF + 1 ] ;
???? char sendbuf [MAXBUF +1 ] ;
???? int len ;
???? /* 開始處理每個(gè)新連接上的數(shù)據(jù)收發(fā) */
????bzero (buf , MAXBUF + 1 ) ;
???? /* 接收客戶端的消息 */
???? //len = recv(new_fd, buf, MAXBUF, 0);
???? int nRecvBuf = MAXRECVBUF ; //設(shè)置為32K
???? setsockopt (new_fd , SOL_SOCKET , SO_RCVBUF , ( const char * ) &nRecvBuf , sizeof ( int ) ) ;
????len = recv (new_fd , &buf , MAXBUF ,0 ) ;
???? //--------------------------------------------------------------------------------------------
???? //這塊為了使用ab測(cè)試
???? char bufSend [1000 ] = {0 } ;
???? sprintf (bufSend , "HTTP/1.0 200 OK\r\nContent-type: text/plain\r\n\r\n%s" , "Hello world!\n" ) ;
???? send (new_fd ,bufSend , strlen (buf ) ,0 ) ;
???? //--------------------------------------------------------------------------------------------
???? if (len > 0 ) {
???????? //printf ("%d接收消息成功:'%s',共%d個(gè)字節(jié)的數(shù)據(jù)\n", new_fd, buf, len);
???????? //hash-map
????????
????????hash_map < int , sockStruct > : : iterator it_find ;
????????it_find = sock_map . find (new_fd ) ;
???????? if (it_find = = sock_map .end ( ) ) {
???????????? //新的網(wǎng)絡(luò)連接,申請(qǐng)新的接收緩沖區(qū),并放入map中
???????????? //printf("new socket %d\n", new_fd);
????????????sockStruct newSockStruct ;
????????????newSockStruct . time = time ( ( time_t * )0 ) ;
????????????newSockStruct .recvBuf = ( unsigned int * ) malloc (1000 ) ;
???????????? memset (newSockStruct .recvBuf , 0 , 1000 ) ;
???????????? strcat ( ( char * )newSockStruct .recvBuf , buf ) ;
????????????sock_map .insert ( pair < int ,sockStruct > (new_fd , newSockStruct ) ) ;
???????? } else {
???????????? //網(wǎng)絡(luò)連接已經(jīng)存在,找到對(duì)應(yīng)的數(shù)據(jù)緩沖區(qū),將接收到的數(shù)據(jù)拼接到數(shù)據(jù)緩沖區(qū)中
???????????? //printf("socket %d exist!\n", it_find->first);
???????????? (it_find - >second ) . time = time ( ( time_t * )0 ) ;???????????????? //時(shí)間更改
???????????? char * bufSockMap = ( char * ) (it_find - >second ) .recvBuf ;???? //數(shù)據(jù)存儲(chǔ)
???????????? strcat (bufSockMap , buf ) ;
???????????? //printf("bufSockMap:%s\n", bufSockMap);
???????? }
???? }
???? else {
???????? if (len < 0 )
???????????? printf ( "消息接收失敗!錯(cuò)誤代碼是%d,錯(cuò)誤信息是'%s'\n" ,
???????????? errno , strerror ( errno ) ) ;
???????? else {
???????????? //將socket從map中移除
???????????? /*
????????????hash_map::iterator it_find;
????????????it_find = sock_map.find(new_fd);
????????????sock_map.erase(it_find);
????????????*/
???????????? printf ( "client %d quit!\n" ,new_fd ) ;
???????? }
???????? //close(new_fd);
???????? return -1 ;
???? }
???? /* 處理每個(gè)新連接上的數(shù)據(jù)收發(fā)結(jié)束 */
???? //關(guān)閉socket的時(shí)候,要釋放接收緩沖區(qū)。
????hash_map < int , sockStruct > : : iterator it_find ;
????it_find = sock_map . find (new_fd ) ;
???? free ( (it_find - >second ) .recvBuf ) ;
????sock_map .erase (it_find ) ;
???? close (new_fd ) ;
???? return len ;
}
/* Process-wide server state. */
int listenfd;                       /* listening TCP socket, created in main() */
int sock_op = 1;                    /* option value handed to setsockopt(SO_REUSEADDR) */
struct sockaddr_in address;         /* local address the server binds to */
struct epoll_event event;           /* scratch event used when registering descriptors */
struct epoll_event events[1024];    /* array of 1024 ready-event slots */
int epfd;                           /* epoll instance descriptor, created in main() */
/* General scratch variables with external linkage; kept unchanged. */
int n;
int i;
char buf[512];
int off;
int result;
char *p;
int main ( int argc , char * argv [ ] )
{
????init_thread_pool (1 ) ;
???? signal (SIGPIPE , SIG_IGN ) ;
???? signal (SIGCHLD , SIG_IGN ) ;
???? signal ( SIGINT , &on_sigint ) ;
????listenfd = socket ( AF_INET , SOCK_STREAM ,0 ) ;
???? setsockopt (listenfd ,SOL_SOCKET ,SO_REUSEADDR , &sock_op , sizeof (sock_op ) ) ;
?
???? memset ( &address ,0 , sizeof (address ) ) ;
????address .sin_addr .s_addr = htonl ( INADDR_ANY ) ;
????address .sin_port = htons (8006 ) ;
???? bind (listenfd , ( struct sockaddr * ) &address , sizeof (address ) ) ;
???? listen (listenfd ,1024 ) ;
????fd_Setnonblocking (listenfd ) ;
?
????epfd =epoll_create (65535 ) ;
???? memset ( &event ,0 , sizeof (event ) ) ;
????event .data .fd =listenfd ;
????event .events =EPOLLIN |EPOLLET ;
????epoll_ctl (epfd ,EPOLL_CTL_ADD ,listenfd , &event ) ;
???? while (1 ) {
???????? sleep (1000 ) ;
???? }
???? return 0 ;
}
/*************************************************
* Function: * init_thread_pool
* Description: * 初始化線程
* Input: * threadNum:用于處理epoll的線程數(shù)
* Output: *
* Others: * 此函數(shù)為靜態(tài)static函數(shù),
*************************************************/
int init_thread_pool ( int threadNum )
{
???? int i ,ret ;
???? pthread_t threadId ;
???? //初始化epoll線程池
???? for ( i = 0 ; i < threadNum ; i + + )
???? {
????????ret = pthread_create ( &threadId , 0 , epoll_loop , ( void * )0 ) ;
???????? if (ret ! = 0 )
???????? {
???????????? printf ( "pthread create failed!\n" ) ;
???????????? return ( -1 ) ;
???????? }
???? }
????ret = pthread_create ( &threadId , 0 , check_connect_timeout , ( void * )0 ) ;
???? return (0 ) ;
}
/*************************************************
* Function: * epoll_loop
* Description: * epoll檢測(cè)循環(huán)
* Input: *
* Output: *
* Others: *
*************************************************/
static int count111 = 0 ;
static time_t oldtime = 0 , nowtime = 0 ;
void *epoll_loop ( void * para )
{
???????? while (1 )
???? {
????????n =epoll_wait (epfd ,events ,4096 , -1 ) ;
???????? //printf("n = %d\n", n);
???????? if (n >0 )
???????? {
???????????? for (i =0 ;i <n ; + +i )
???????????? {
???????????????? if (events [i ] .data .fd = =listenfd )
???????????????? {
???????????????????? while (1 )
???????????????????? {
????????????????????????event .data .fd = accept (listenfd , NULL , NULL ) ;
???????????????????????? if (event .data .fd >0 )
???????????????????????? {
????????????????????????????fd_Setnonblocking (event .data .fd ) ;
????????????????????????????event .events =EPOLLIN |EPOLLET ;
????????????????????????????epoll_ctl (epfd ,EPOLL_CTL_ADD ,event .data .fd , &event ) ;
???????????????????????? }
???????????????????????? else
???????????????????????? {
???????????????????????????? if ( errno = =EAGAIN )
???????????????????????????? break ;
???????????????????????? }
???????????????????? }
???????????????? }
???????????????? else
???????????????? {
???????????????????? if (events [i ] .events &EPOLLIN )
???????????????????? {
???????????????????????? //handle_message(events[i].data.fd);
???????????????????????? char recvBuf [1024 ] = {0 } ;
???????????????????????? int ret = 999 ;
???????????????????????? int rs = 1 ;
???????????????????????? while (rs )
???????????????????????? {
????????????????????????????ret = recv (events [n ] .data .fd ,recvBuf ,1024 ,0 ) ; // 接受客戶端消息
???????????????????????????? if (ret < 0 )
???????????????????????????? {
???????????????????????????????? //由于是非阻塞的模式,所以當(dāng)errno為EAGAIN時(shí),表示當(dāng)前緩沖區(qū)已無數(shù)據(jù)可//讀在這里就當(dāng)作是該次事件已處理過。
???????????????????????????????? if ( errno = = EAGAIN )
???????????????????????????????? {
???????????????????????????????????? printf ( "EAGAIN\n" ) ;
???????????????????????????????????? break ;
???????????????????????????????? }
???????????????????????????????? else {
???????????????????????????????????? printf ( "recv error!\n" ) ;
????????????????????????????????????epoll_ctl (epfd , EPOLL_CTL_DEL , events [i ] .data .fd , &event ) ;
???????????????????????????????????? close (events [i ] .data .fd ) ;
???????????????????????????????????? break ;
???????????????????????????????? }
???????????????????????????? }
???????????????????????????? else if (ret = = 0 )
???????????????????????????? {
???????????????????????????????? // 這里表示對(duì)端的socket已正常關(guān)閉.
????????????????????????????????rs = 0 ;
???????????????????????????? }
???????????????????????????? if (ret = = sizeof (recvBuf ) )
????????????????????????????????rs = 1 ; // 需要再次讀取
???????????????????????????? else
????????????????????????????????rs = 0 ;
???????????????????????? }
???????????????????????? if (ret >0 ) {
????????????????????????????count111 + + ;
???????????????????????????? struct tm *today ;
???????????????????????????? time_t ltime ;
???????????????????????????? time ( &nowtime ) ;
???????????????????????????? if (nowtime ! = oldtime ) {
???????????????????????????????? printf ( "%d\n" , count111 ) ;
????????????????????????????????oldtime = nowtime ;
????????????????????????????????count111 = 0 ;
???????????????????????????? }
???????????????????????????? char buf [1000 ] = {0 } ;
???????????????????????????? sprintf (buf , "HTTP/1.0 200 OK\r\nContent-type: text/plain\r\n\r\n%s" , "Hello world!\n" ) ;
???????????????????????????? send (events [i ] .data .fd ,buf , strlen (buf ) ,0 ) ;
???????????????????????????? //????CGelsServer Gelsserver;
???????????????????????????? //????Gelsserver.handle_message(events[i].data.fd);
???????????????????????? }
????????????????????????epoll_ctl (epfd , EPOLL_CTL_DEL , events [i ] .data .fd , &event ) ;
???????????????????????? close (events [i ] .data .fd ) ;
???????????????????? }
???????????????????? else if (events [i ] .events &EPOLLOUT )
???????????????????? {
???????????????????????? sprintf (buf , "HTTP/1.0 200 OK\r\nContent-type: text/plain\r\n\r\n%s" , "Hello world!\n" ) ;
???????????????????????? send (events [i ] .data .fd ,buf , strlen (buf ) ,0 ) ;
???????????????????????? /*
????????????????????????if(p!=NULL)
????????????????????????{
????????????????????????????free(p);
????????????????????????????p=NULL;
????????????????????????}
????????????????????????*/
???????????????????????? close (events [i ] .data .fd ) ;
???????????????????? }
???????????????????? else
???????????????????? {
???????????????????????? close (events [i ] .data .fd ) ;
???????????????????? }
???????????????? }
???????????? }
???????? }
???? }
}
/*************************************************
* Function: * check_connect_timeout
* Description: * 檢測(cè)長(zhǎng)時(shí)間沒反應(yīng)的網(wǎng)絡(luò)連接,并關(guān)閉刪除
* Input: *
* Output: *
* Others: *
*************************************************/
void *check_connect_timeout ( void * para )
{
????hash_map < int , sockStruct > : : iterator it_find ;
???? for (it_find = sock_map .begin ( ) ; it_find!=sock_map .end ( ) ; + +it_find ) {
???????? if ( time ( ( time_t * )0 ) - (it_find - >second ) . time > 120 ) {???????????????? //時(shí)間更改
???????????? free ( (it_find - >second ) .recvBuf ) ;
????????????sock_map .erase (it_find ) ;
???????????? close (it_find - >first ) ;
???????? }
???? }
}
總結
以上是生活随笔為你收集整理的高并发的epoll+线程池,epoll在线程池内的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: TCP漏洞:半连接
- 下一篇: Linux内核目录结构(2.6版本以上的