朴素、Select、Poll和Epoll网络编程模型实现和分析——朴素模型
? ? ? ? 做Linux網絡開發,一般繞不開標題中幾種網絡編程模型。網上已有很多寫的不錯的分析文章,它們的基本論點是差不多的。但是我覺得他們講的還不夠詳細,在一些關鍵論點上缺乏數據支持。所以我決定好好研究這幾個模型。(轉載請指明出于breaksoftware的csdn博客)
? ? ? ? 在研究這些模型前,我決定按如下步驟去做:
- 實現樸素模型
 - 實現發請求的測試程序
 - 實現Select模型,測試其效率
 - 實現Poll模型,測試其效率
 - 實現Epoll模型,測試其效率
 - 分析各模型性能,分析和對比其源碼
 - 針對各模型特點,修改上述程序進行測試和分析
 
? ? ? ? 樸素模型是我們編程時可以使用的最簡單的一種模型。因為沒有一個確切的名字可以稱呼,我索性叫它樸素模型。我選擇先實現它,一是為了由易而難,二是為了遵循模型發展的過程、體會技術發展的歷程。在實現完樸素模型之后,我們要去實現一個用于發送請求的測試程序,它將幫助我們發送大量的請求,以便于之后我們對各個模型進行可用性測試。之后我們再去實現Select、Poll和Epoll網絡模型。這個順序也是技術發展的順序,我們可以在實現前一個模型時分析其優缺點,然后在后一個模型分析中,看到其對這些缺點的改進方案,體會技術進步的過程。
? ? ? ? 為了便于之后各個模型的對比,我會盡可能的重用代碼,即各個模型功能相同的模塊將使用相同的函數去實現,如果實在不可以重用,則使用參數進行區分,但是區分的代碼片段將足夠的小。所以,我們將在本文看到大部分重要的代碼實現片段。
? ? ? ? 為了比較直觀的觀察各個模型的執行,我們將在各個模型執行前,啟動一個打印統計信息的線程
         err = init_print_thread();if (err < 0) {perror("create print thread error");exit(EXIT_FAILURE);} 
? ? ? ? init_print_thread函數將被各個模型使用,wait_print_thread是用于等待該打印結果的線程退出。由于我并不準備讓這個線程退出,所以wait_print_thread往往用來阻塞主線程。
 pthread_t g_print_thread;intinit_print_thread() {return pthread_create(&g_print_thread, NULL, print_count, NULL);}voidwait_print_thread() {pthread_join(g_print_thread, NULL);} 
? ? ? ? print_count函數是用于線程執行的實體,它每隔一秒鐘打印一條記錄
static int g_request_count = 0;static int g_server_suc = 0;
static int g_client_suc = 0;
static int g_read_suc = 0;
static int g_write_suc = 0;static int g_server_fai = 0;
static int g_client_fai = 0;
static int g_read_fai = 0;
static int g_write_fai = 0;void* print_count(void* arg) {struct timeval cur_time;int index = 0;fprintf(stderr, "index\tseconds_micro_seconds\tac\tst\tsr\tsw\tft\tfr\tfw\n");while (1) {sleep(1);gettimeofday(&cur_time, NULL);fprintf(stderr, "%d\t%ld\t%d\t%d\t%d\t%d\t%d\t%d\t%d\n",index,cur_time.tv_sec * 1000000 + cur_time.tv_usec,g_request_count,g_server_suc > g_client_suc ? g_server_suc : g_client_suc,g_read_suc,g_write_suc,g_server_fai > g_client_fai ? g_server_fai : g_client_fai,g_read_fai,g_write_fai);index++;}
} 
? ? ? ? 上述各數據的定義如下: ? ? ? ?
- g_request_count用于記錄總請求數;
 - g_server_suc是用于記錄服務行為成功數,其場景為:讀取客戶端成功且發送回包成功
 - g_server_fai是記錄服務其行為失敗數,其場景為:1 讀取客戶端失敗;2 讀取客戶端成功但是發送回包失敗;
 - g_client_suc用于記錄客戶端行為成功數,其場景為:發送包成功且讀取服務器回包成功;
 - g_client_fai用于記錄客戶端行為失敗數,其場景為:1 發送包失敗; 2 發送包成功但是接收服務器回包失敗;
 - g_read_suc用于記錄讀取行為成功數,其場景為: 1 服務器讀取客戶端請求包成功; 2 客戶端讀取服務器回包成功;
 - g_read_fai用于記錄讀取行為失敗數,其場景為: 1 服務器讀取客戶端請求包失敗; 2 客戶端讀取服務器回包失敗;
 - g_write_suc用于記錄發送行為成功數,其場景為: 1 客戶端向服務器發送請求包成功; 2 服務器向客戶端回包成功;
 - g_write_fai用于記錄發送行為失敗數,其場景為: 1?客戶端向服務器發送請求包失敗; 2?服務器向客戶端回包失敗;
 
? ? ? ? 通過數據的打印,我們將知道服務器和客戶端執行執行的過程,以及出問題的環節,還有服務器的丟包情況。
? ? ? ? 下一步,我們需要創建一個供客戶端連接的Socket。
	listen_sock = make_socket(0); 
? ? ? ? 我們對make_socket傳入了參數0,是因為我們不要求創建的監聽Socket具有異步屬性。
int
make_socket(int asyn) {int listen_sock = -1;int rc = -1;int on = 1;struct sockaddr_in name;listen_sock = socket(AF_INET, SOCK_STREAM, 0);if (listen_sock < 0) {perror("create socket error");exit(EXIT_FAILURE);}rc = setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof(on));if (rc < 0) {perror("setsockopt error");exit(EXIT_FAILURE);}if (asyn) {rc = ioctl(listen_sock, FIONBIO, (char*)&on);if (rc < 0) {perror("ioctl failed");exit(EXIT_FAILURE);}}name.sin_family = AF_INET;name.sin_port = htons(PORT);name.sin_addr.s_addr = htonl(INADDR_ANY);if (bind(listen_sock, (struct sockaddr*)&name, sizeof(name)) < 0) {perror("bind error");exit(EXIT_FAILURE);}return listen_sock;
} 
? ? ? ? 這個函數中我們使用了socket函數創建了一個TCP的Socket。并使用bind函數將該socket綁定到本機特定的端口上。
? ? ? ? 在樸素模型中,我們讓服務器是一個同步處理過程。于是不要求之后的連接具有異步屬性,所以我們創建該Socket時傳了參數0——讓監聽Socket不具有異步特性。在之后介紹的Select、Poll和Epoll模型中,我們需要客戶端接入的連接是異步的,于是我們就傳遞了參數1,讓監聽Socket具有異步特性,這樣通過它接入的連接也是異步的。
? ? ? ? Socket綁定之后,服務器就要開始監聽客戶端的接入
	if (listen(listen_sock, SOMAXCONN) < 0) {perror("listen error");exit(EXIT_FAILURE);} 
? ? ? ??SOMAXCONN是可以同時處理的最大連接數,它是一個系統宏。在我系統上它的值是128。
? ? ? ? 最后,我們在一個死循環中接收并處理客戶端的請求
	while (1) {int new_sock;new_sock = accept(listen_sock, NULL, NULL);if (new_sock < 0) {perror("accept error");exit(EXIT_FAILURE);} 
? ? ? ? 通過accept我們將獲得接入的socket。如果socket值合法,我們則需要讓接受的請求數自增1
		request_add(1); 
? ? ? ? request_add函數將在之后不同模型以及測試程序中被調用,而且會是在不同的線程中調用。于是這兒就引入一個多線程的問題。我并不打算使用鎖等方法,而是利用簡單的原子操作來實現。
void 
request_add(int count) {__sync_fetch_and_add(&g_request_count, count);
} 
? ? ? ? 由于我們設計的樸素模式是一個同步過程,所以接入的socket不是異步的。當一些特殊情況發生時,之后的讀取socket內容的行為或者往socket中寫入內容的行為可能會卡住。這樣將導致整個服務都卡住,這是我們不希望看到的。于是我們需要對該同步socket設置操作超時屬性。
		set_block_filedes_timeout(new_sock); 
void
set_block_filedes_timeout(int filedes) {struct timeval tv_out, tv_in;tv_in.tv_sec = READ_TIMEOUT_S;tv_in.tv_usec = READ_TIMEOUT_US;if (setsockopt(filedes, SOL_SOCKET, SO_RCVTIMEO, &tv_in, sizeof(tv_in)) < 0) {perror("set rcv timeout error");exit(EXIT_FAILURE);}tv_out.tv_sec = WRITE_TIMEOUT_S;tv_out.tv_usec = WRITE_TIMEOUT_US;if (setsockopt(filedes, SOL_SOCKET, SO_SNDTIMEO, &tv_out, sizeof(tv_out)) < 0) {perror("set rcv timeout error");exit(EXIT_FAILURE);}
} 
? ? ? ? 這兒要說明下,我在網上看過很多人提問說通過上述方法設置超時屬性無效。其實是他們犯了一個錯誤,就是將socket設置為異步屬性。如果socket既設置為異步屬性,又設置了超時,socket當然是按異步特點去執行的,超時設置也就無效了。
? ? ? ? 還有一個問題,就是有些同學在自己設計服務器和客戶端時發生了“死鎖”問題(非嚴格定義意義上的死鎖)。那是因為設計的服務器和客戶端都是同步的,而且socket都沒有設置超時。這樣在客戶端調用完write之后進入read時,服務器此時也是read狀態,導致了“死鎖”。但是這個問題并不是經常發生,因為大部分同學實現read時給了一個很大的緩存,并認為讀取的內容一次性可以讀完。而沒有考慮到一次read操作可能讀不完全部數據的情況,比如下面的實現
	while (nbytes > 0) {nbytes = recv(filedes, buffer, sizeof(buffer) - 1, 0);if (nbytes > 0) {total_length_recv += nbytes;}//buffer[nbytes] = 0;//fprintf(stderr, "%s", buffer);} 
? ? ? ? 這段服務器read操作考慮到了一次性可能讀不完全部數據的問題。但是如果客戶端發送完數據后,服務器第一次recv可以把全部數據讀取出來了。由于讀取的數據大于0,于是再次進入讀取操作,這個時候,客戶端已經處于讀取服務器返回的階段。由于socket是同步的,且未設置超時,導致服務器一直卡在再次讀取的操作中,這樣就發生了“死鎖”。其實這個過程非常有意思,當我們對一段不健壯的代碼進行加固時,往往會掉到另外一個坑里。但是只要我們努力的從坑里跳出來,就會豁然開朗且認識到很多別人忽視的問題。
? ? ? ? 我們再回到正題,我們設置好socket超時屬性后,就開始讓服務器讀取客戶端的輸入內容,如果輸入內容讀取成功,則往客戶端回包。最后服務器將該次連接關閉
		if (0 == server_read(new_sock)) {server_write(new_sock);}close(new_sock); 
? ? ? ? server_read方法在底層調用了read_data方法,read_data方法是我們整個代碼的兩個關鍵行為之一
int
is_nonblock(int fd) {int flags = fcntl(fd, F_GETFL);if (flags == -1) {perror("get fd flags error");exit(EXIT_FAILURE);}return (flags & O_NONBLOCK) ? 1 : 0;
}int
read_data(int filedes, int from_server) {char buffer[MAXMSG];int nbytes;int total_len_recv;int wait_count = 0;int rec_suc = 0;total_len_recv = 0;while (1) {nbytes = recv(filedes, buffer, sizeof(buffer) - 1, 0);if (nbytes < 0) {if (is_nonblock(filedes)) {if (EAGAIN == errno || EWOULDBLOCK == errno || EINTR == errno) {if (wait_count < WAIT_COUNT_MAX) {wait_count++;usleep(wait_count);continue;}}}break;}if (nbytes == 0) {//fprintf(stderr, "read end\n");break;}else if (nbytes > 0) {total_len_recv += nbytes;//buffer[nbytes] = 0;//fprintf(stderr, "%s", buffer);}if ((from_server && is_server_recv_finish(total_len_recv))|| (!from_server && is_client_recv_finish(total_len_recv))) {rec_suc = 1;break;}} 
? ? ? ? read_data行為分為客戶端和服務器兩個版本實現,其基本邏輯是一樣的。我們考慮到讀取操作可能一次性讀不完,所以我們使用while循環持續嘗試讀取。如果是一個異步的socket,我們則考慮recv函數返回小于0時各種錯誤值的場景,并使用漸長等待的方式進行多次嘗試。如果是同步的socket,一旦recv返回值小于0,則退出讀取操作。total_len_recv函數用于統計一共讀取的長度,之后通過這個長度結合是否是服務器還是客戶端的標識,判斷讀取操作是否完成。
? ? ? ? 當讀取操作結束后,我們要統計讀取操作的行為及其標識的整個過程的行為。
	if (from_server) {if (rec_suc) {__sync_fetch_and_add(&g_read_suc, 1);return 0;} else {__sync_fetch_and_add(&g_read_fai, 1);__sync_fetch_and_add(&g_server_fai, 1);return -1;}} else {if (rec_suc) {__sync_fetch_and_add(&g_read_suc, 1);__sync_fetch_and_add(&g_client_suc, 1);return 0;} else {__sync_fetch_and_add(&g_read_fai, 1);__sync_fetch_and_add(&g_client_fai, 1);return -1;}}} 
? ? ? ? 如果讀取操作成功,則進行發送操作。server_write方法在底層調用了write_data方法
int
write_data(int filedes, int from_server) {int nbytes;int total_len_send;int wait_count = 0;int index;int send_suc = 0;total_len_send = 0;index = 0;while (1) {if (from_server) {nbytes = send(filedes, get_server_send_ptr(index), get_server_send_len(index), 0);}else {nbytes = send(filedes, get_client_send_ptr(index), get_client_send_len(index), 0);}if (nbytes < 0) {if (is_nonblock(filedes)) {if (EAGAIN == errno || EWOULDBLOCK == errno || EINTR == errno) {if (wait_count < WAIT_COUNT_MAX) {wait_count++;usleep(wait_count);continue;}}}break;}else if (nbytes == 0) {break;}else if (nbytes > 0) {total_len_send += nbytes;}if ((from_server && is_server_send_finish(total_len_send)) ||(!from_server && is_client_send_finish(total_len_send))){send_suc = 1;break;}} 
? ? ? ? 其實現和read_data思路一致,也考慮到一次性寫不完的情況和同步異步socket問題。寫入操作完成后再去統計相關行為
	if (from_server) {if (send_suc) {__sync_fetch_and_add(&g_write_suc, 1);__sync_fetch_and_add(&g_server_suc, 1);return 0;} else {__sync_fetch_and_add(&g_write_fai, 1);__sync_fetch_and_add(&g_server_fai, 1);return -1;}} else {if (send_suc) {__sync_fetch_and_add(&g_write_suc, 1);return 0;} else {__sync_fetch_and_add(&g_write_fai, 1);__sync_fetch_and_add(&g_client_fai, 1);return -1;}}
} 
? ? ? ? 最后我們講下測試程序的實現。為了便于測試,我要求測試程序可以接受至少2個參數,第一個參數是用于標識啟動多少個線程發送請求;第二個參數用于指定線程中等待多少毫秒發送一次請求;第三個參數是可選的,標識一共發送多少次請求。這樣我們可以通過這些參數控制測試程序的行為
#define MAXREQUESTCOUNT 100000static int g_total = 0;
static int g_max_total = 0;void* send_data(void* arg) {int wait_time;int client_sock;wait_time = *(int*)arg;while (__sync_fetch_and_add(&g_total, 1) < g_max_total) {usleep(wait_time);client_sock = make_client_socket();connect_server(client_sock);request_add(1);set_block_filedes_timeout(client_sock);if (0 == client_write(client_sock)) {client_read(client_sock);}close(client_sock);client_sock = 0;}
}int 
main(int argc, char* argv[]) {int thread_count;int index;int err;int wait_time;pthread_t thread_id;if (argc < 3) {fprintf(stderr, "error! example: client 10 50\n");return 0;}err = init_print_thread();if (err < 0) {perror("create print thread error");exit(EXIT_FAILURE);}thread_count = atoi(argv[1]);wait_time = atoi(argv[2]);g_max_total = MAXREQUESTCOUNT;if (argc > 3) {g_max_total = atoi(argv[3]);}	for (index = 0; index < thread_count; index++) {err = pthread_create(&thread_id, NULL, send_data, &wait_time);if (err != 0) {perror("can't create send thread");exit(EXIT_FAILURE);}}wait_print_thread();return 0;
} 
? ? ? ? 線程中,首先通過make_client_socket創建socket并綁定到本地端口上
int
make_client_socket() {int client_sock = -1;struct sockaddr_in client_addr;client_sock = socket(AF_INET, SOCK_STREAM, 0);if (client_sock < 0) {perror("create socket error");exit(EXIT_FAILURE);}bzero(&client_addr, sizeof(client_addr));client_addr.sin_family = AF_INET;client_addr.sin_addr.s_addr = htons(INADDR_ANY);client_addr.sin_port = htons(0);if (bind(client_sock, (struct sockaddr*)&client_addr, sizeof(client_addr)) < 0) {perror("bind error");exit(EXIT_FAILURE);}return client_sock;
} 
? ? ? ? 然后通過connect_server連接服務器
void 
connect_server(int client_sock) {struct sockaddr_in server_addr;bzero(&server_addr, sizeof(server_addr));server_addr.sin_family = AF_INET;if (inet_aton("127.0.0.1", &server_addr.sin_addr) == 0) {perror("set server ip error");exit(EXIT_FAILURE);}server_addr.sin_port = htons(PORT);if (connect(client_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {perror("client connect server error");exit(EXIT_FAILURE);}
} 
? ? ? ? 最后通過client_write和client_read和服務器通信。這兩個函數都是調用上面介紹的write_data和read_data,所以沒什么好講的。
int
client_read(int filedes) {return read_data(filedes, 0);
}int
client_write(int filedes) {return write_data(filedes, 0);
} 
? ? ? ? 我們啟動一千個線程,發送30萬次請求。看看樸素模型的處理能力。
? ? ? ? 首先我們看看服務器的結果打印
? ? ? ? 可以發現穩定的處理能力大概在每秒14000~15000左右。
? ? ? ? 我們再看看客戶端的打印
 ? ? ? ? 我們發現其發送頻率差不多也是14000~16000。這兒要說明下,因為客戶端是同步模型,服務器也是同步模型,所以這個速率是服務器處理的峰值。否則按照設置的1微秒的等待時間,1000個線程一秒鐘發送的請求數肯定不止15000。我使用過兩個測試進程同時去壓,也驗證了其最大的處理能力也就是在14000~15000左右(在我的配置環境下)。
? ? ? ? 我們發現,使用樸素模型實現網絡通信是非常方便的。但是這個模型有個明顯的缺點,就是一次只能處理一個請求——即接收請求、讀socket、寫socket是串行執行的。除非使用線程池去優化這個流程,否則在單線程的情況下,似乎就不能解決這個問題了。科技總是進步的,我們將在下一節講解Select模型,它就可以解決這個問題。
總結
以上是生活随笔為你收集整理的朴素、Select、Poll和Epoll网络编程模型实现和分析——朴素模型的全部內容,希望文章能夠幫你解決所遇到的問題。
                            
                        - 上一篇: Google Mock(Gmock)简单
 - 下一篇: 朴素、Select、Poll和Epoll