理大并發(fā)之五 使用libevent利器bufferevent
???????? 首先來翻譯一段文章
???????? 你可能注意到隨著我們代碼變得越來越高效,程序也變得更加復雜。當我們產(chǎn)生一個進程的時候,我們沒有必要為每一個鏈接管理一個buffer,我們只需要每個處理獨立棧分配緩沖區(qū)就可以了。在讀和寫的時候,我們不必明確的跟蹤每一個socket,這在我們的代碼里是一個暗示,我們沒有必要定義一個結構體去跟蹤每一個操作什么時候完成,我們只需要使用循環(huán)棧變量就可以了。
???????? 此外,如果你在windows網(wǎng)絡編程方面有著豐富的經(jīng)驗,當你在使用上一篇博客中的例子時,你可能認識到libevent可能達不到最理想的性能。在windows上,你做的快速異步IO不是用的select,它使用的IOCP API。和其他的快速網(wǎng)絡API不同,當你的程序執(zhí)行完成,sock準備完成,IOCP不會通知你的程序,取而代之的是,程序告訴windows網(wǎng)絡棧開啟一個網(wǎng)絡操作,并且當操作執(zhí)行完成時,IOCP會告訴程序。
???????? 幸運的是,libevent2 的bufferevents接口解決了上面的這些沖突,它使得程序更加容易寫,并且為windows和unix提供了有效的接口。
分析:
libevent的bufferevent在event的基礎上自己維護了一個buffer,這樣的話,就不需要再自己管理一個buffer了,上一篇博客是自己維護一個buffer,維護過程復雜,且過程難以理解,既然libevent自己提供了bufferevent這個神器,且有API,何必自己維護呢?
先看看struct bufferevent這個結構體
[cpp] view plaincopy print?
struct?bufferevent?{?????????????????struct?event_base?*ev_base;??????????const?struct?bufferevent_ops?*be_ops;??????????struct?event?ev_read;??????????struct?event?ev_write;??????????struct?evbuffer?*input;??????????struct?evbuffer?*output;??????????……??????????bufferevent_data_cb?readcb;??????????bufferevent_data_cb?writecb;??????????bufferevent_event_cb?errorcb;??????????……??}??
struct bufferevent {struct event_base *ev_base;const struct bufferevent_ops *be_ops;struct event ev_read;struct event ev_write;struct evbuffer *input;struct evbuffer *output;……bufferevent_data_cb readcb;bufferevent_data_cb writecb;bufferevent_event_cb errorcb;……
}
可以看出struct bufferevent內(nèi)置了兩個event(讀/寫)和對應的緩沖區(qū)。當有數(shù)據(jù)被讀入(input)的時候,readcb被調(diào)用,當output被輸出完成的時候,writecb被調(diào)用,當網(wǎng)絡I/O出現(xiàn)錯誤,如鏈接中斷,超時或其他錯誤時,errorcb被調(diào)用。
使用bufferevent的過程:
1. 設置sock為非阻塞的
[cpp] view plaincopy print?
eg:??evutil_make_socket_nonblocking(fd);??
eg: evutil_make_socket_nonblocking(fd);
2. 使用bufferevent_socket_new創(chuàng)建一個structbufferevent *bev,關聯(lián)該sockfd,托管給event_base
函數(shù)原型為:
[cpp] view plaincopy print?
struct?bufferevent?*?bufferevent_socket_new(struct?event_base?*base,?evutil_socket_t?fd,??int?options)??eg:??struct?bufferevent?*bev;??bev?=?bufferevent_socket_new(base,?fd,?BEV_OPT_CLOSE_ON_FREE);??
struct bufferevent * bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, int options)
eg: struct bufferevent *bev;
bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
3. 設置讀寫對應的回調(diào)函數(shù)
函數(shù)原型為:
[cpp] view plaincopy print?
void?bufferevent_setcb(struct?bufferevent?*bufev,??????bufferevent_data_cb?readcb,?bufferevent_data_cb?writecb,??????bufferevent_event_cb?eventcb,?void?*cbarg)??eg.??bufferevent_setcb(bev,?readcb,?NULL,?errorcb,?NULL);??
void bufferevent_setcb(struct bufferevent *bufev,bufferevent_data_cb readcb, bufferevent_data_cb writecb,bufferevent_event_cb eventcb, void *cbarg)
eg. bufferevent_setcb(bev, readcb, NULL, errorcb, NULL);
4. 啟用讀寫事件,其實是調(diào)用了event_add將相應讀寫事件加入事件監(jiān)聽隊列poll。正如文檔所說,如果相應事件不置為true,bufferevent是不會讀寫數(shù)據(jù)的
函數(shù)原型:
[cpp] view plaincopy print?
int?bufferevent_enable(struct?bufferevent?*bufev,?short?event)??eg.??bufferevent_enable(bev,?EV_READ|EV_WRITE);??
int bufferevent_enable(struct bufferevent *bufev, short event)
eg. bufferevent_enable(bev, EV_READ|EV_WRITE);
5. 進入bufferevent_setcb回調(diào)函數(shù):
在readcb里面從input中讀取數(shù)據(jù),處理完畢后填充到output中;
writecb對于服務端程序,只需要readcb就可以了,可以置為NULL;
errorcb用于處理一些錯誤信息。
針對這些使用過程進入源碼進行分析:
1. bufferevent_socket_new
(1)在bufferevent_init_common中調(diào)用evbuffer_new()初始化input和output
(2)在event_assign中初始化bufferevent中的ev_read和ev_write事件。
(3)在evbuffer_add_cb中給output添加了一個callback bufferevent_socket_outbuf_cb
2. bufferevent_setcb
該函數(shù)的作用主要是賦值,把該函數(shù)后面的參數(shù),賦值給第一個參數(shù)struct bufferevent *bufev定義的變量
3. bufferevent_enable
調(diào)用event_add將讀寫事件加入到事件監(jiān)聽隊列中。
?
對bufferevent常用的幾個函數(shù)進行分析:
[cpp] view plaincopy print?
char?*evbuffer_readln(struct?evbuffer*buffer,?size_t?*n_read_out,enum?evbuffer_eol_style?eol_style);??
char *evbuffer_readln(struct evbuffer*buffer, size_t *n_read_out,enum evbuffer_eol_style eol_style);
含義:Read a single line from an evbuffer.
返回值:讀到的一行內(nèi)容
[cpp] view plaincopy print?
int?evbuffer_add(struct?evbuffer?*buf,const?void?*data,?size_t?datlen);??
int evbuffer_add(struct evbuffer *buf,const void *data, size_t datlen);
含義:將數(shù)據(jù)添加到evbuffer的結尾
返回值:成功返回0,失敗返回-1
[cpp] view plaincopy print?
int?evbuffer_remove(struct?evbuffer*buf,?void?*data,?size_t?datlen);??
int evbuffer_remove(struct evbuffer*buf, void *data, size_t datlen);
含義:從evbuffer讀取數(shù)據(jù)到data
返回值:成功返回0,失敗返回-1
[cpp] view plaincopy print?
size_t?evbuffer_get_length(const?structevbuffer?*buf);??
size_t evbuffer_get_length(const structevbuffer *buf);
含義:返回evbuffer中存儲的字節(jié)長度
暫時先分析到這里,下面是代碼,客戶端發(fā)送消息:HTTP/1.0, Client 0 send Message:
Request: Hello Server! over,服務端一條消息收完成后,會回復:Response ok! Hello Client!
服務端從bufferevent中取出消息是按行取的。代碼可能有不完善的地方,由于才疏學淺,研究時間短(3天),希望高手提出寶貴意見。
libevent_eventbuffer_server.c
[cpp] view plaincopy print?
#include?<netinet/in.h>??#include?<sys/socket.h>??#include?<fcntl.h>????#include?<event2/event.h>??#include?<event2/buffer.h>??#include?<event2/bufferevent.h>????#include?<assert.h>??#include?<unistd.h>??#include?<string.h>??#include?<stdlib.h>??#include?<stdio.h>??#include?<errno.h>????void?do_read(evutil_socket_t?fd,?short?events,?void?*arg);??????緩沖區(qū)(或者直接操作bufferevent)????void?readcb(struct?bufferevent?*bev,?void?*ctx)??{??????printf("called?readcb!\n");??????struct?evbuffer?*input,?*output;??????char?*request_line;??????size_t?len;??????input?=?bufferevent_get_input(bev);??????output?=?bufferevent_get_output(bev);????????size_t?input_len?=?evbuffer_get_length(input);??????printf("input_len:?%d\n",?input_len);??????size_t?output_len?=?evbuffer_get_length(output);??????printf("output_len:?%d\n",?output_len);????????while(1)??????{??????????request_line?=?evbuffer_readln(input,?&len,?EVBUFFER_EOL_CRLF);??的字符串返回這一行,EVBUFFER_EOL_CRLF表示行尾是一個可選的回車,后隨一個換行符??????????if(NULL?==?request_line)??????????{??????????????printf("The?first?line?has?not?arrived?yet.\n");??????????????free(request_line);??????????????break;??????????}??????????else??<span?style="white-space:?pre;">????</span>{??????????????printf("Get?one?line?date:?%s\n",?request_line);??????????????if(strstr(request_line,?"over")?!=?NULL)??????????????{??????????????????char?*response?=?"Response?ok!?Hello?Client!\r\n";??????????????????evbuffer_add(output,?response,?strlen(response));??????????????????printf("服務端接收一條數(shù)據(jù)完成,回復客戶端一條消息:?%s\n",?response);??????????????????free(request_line);??????????????????break;??????????????}??????????}??????????free(request_line);??????}????????size_t?input_len1?=?evbuffer_get_length(input);??????printf("input_len1:?%d\n",?input_len1);??????size_t?output_len1?=?evbuffer_get_length(output);??????printf("output_len1:?%d\n\n",?output_len1);??}????void?errorcb(struct?bufferevent?*bev,?short?error,?void?*ctx)??{??????if?(error?&?BEV_EVENT_EOF)??????{????????????????????printf("connection?closed\n");??????}??????else?if?(error?&?BEV_EVENT_ERROR)??????{????????????????????printf("some?other?error\n");??????}??????else?if?(error?&?BEV_EVENT_TIMEOUT)??????{????????????????????printf("Timed?out\n");??????}??????bufferevent_free(bev);??}????void?do_accept(evutil_socket_t?listener,?short?event,?void?*arg)??{??????struct?event_base?*base?=?arg;??????struct?sockaddr_storage?ss;??????socklen_t?slen?=?sizeof(ss);??????int?fd?=?accept(listener,?(struct?sockaddr*)&ss,?&slen);??????if?(fd?<?0)??????{??????????perror("accept");??????}??????else?if?(fd?>?FD_SETSIZE)??????{??????????close(fd);??????}??????else??????{??????????struct?bufferevent?*bev;??????????evutil_make_socket_nonblocking(fd);????????????????????????????????bev?=?bufferevent_socket_new(base,?fd,?BEV_OPT_CLOSE_ON_FREE);??????????????????????bufferevent_setcb(bev,?readcb,?NULL,?errorcb,?NULL);????????????????ferevent是不會讀寫數(shù)據(jù)的??????????bufferevent_enable(bev,?EV_READ|EV_WRITE);??????}??}????void?run(void)??{??????evutil_socket_t?listener;??????struct?sockaddr_in?sin;??????struct?event_base?*base;??????struct?event?*listener_event;????????base?=?event_base_new();??????if?(!base)??????????return;?????????sin.sin_family?=?AF_INET;??????sin.sin_addr.s_addr?=?0;??????sin.sin_port?=?htons(8000);????????listener?=?socket(AF_INET,?SOCK_STREAM,?0);??????evutil_make_socket_nonblocking(listener);????#ifndef?WIN32??????{??????????int?one?=?1;??????????setsockopt(listener,?SOL_SOCKET,?SO_REUSEADDR,?&one,?sizeof(one));??????}??#endif????????if?(bind(listener,?(struct?sockaddr*)&sin,?sizeof(sin))?<?0)??????{??????????perror("bind");??????????return;??????}??if?(listen(listener,?16)<0)??????{??????????perror("listen");??????????return;??????}????????listener_event?=?event_new(base,?listener,?EV_READ|EV_PERSIST,?do_accept,?(void*)base);????????????event_add(listener_event,?NULL);????????event_base_dispatch(base);??}????int?main(int?argc,?char?**argv)??{??????setvbuf(stdout,?NULL,?_IONBF,?0);????????run();??????return?0;??}??
#include <netinet/in.h>
#include <sys/socket.h>
#include <fcntl.h>#include <event2/event.h>
#include <event2/buffer.h>
#include <event2/bufferevent.h>#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>void do_read(evutil_socket_t fd, short events, void *arg);//struct bufferevent內(nèi)建了兩個event(read/write)和對應的緩沖區(qū)(struct evbuffer *input, *output),并提供相應的函數(shù)用來操作>
緩沖區(qū)(或者直接操作bufferevent)
//接收到數(shù)據(jù)后,判斷是不一樣一條消息的結束,結束標志為"over"字符串
void readcb(struct bufferevent *bev, void *ctx)
{printf("called readcb!\n");struct evbuffer *input, *output;char *request_line;size_t len;input = bufferevent_get_input(bev);//其實就是取出bufferevent中的inputoutput = bufferevent_get_output(bev);//其實就是取出bufferevent中的outputsize_t input_len = evbuffer_get_length(input);printf("input_len: %d\n", input_len);size_t output_len = evbuffer_get_length(output);printf("output_len: %d\n", output_len);while(1){request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF);//從evbuffer前面取出一行,用一個新分配的空字符結束
的字符串返回這一行,EVBUFFER_EOL_CRLF表示行尾是一個可選的回車,后隨一個換行符if(NULL == request_line){printf("The first line has not arrived yet.\n");free(request_line);//之所以要進行free是因為 line = mm_malloc(n_to_copy+1)),在這里進行了mallocbreak;}else
{printf("Get one line date: %s\n", request_line);if(strstr(request_line, "over") != NULL)//用于判斷是不是一條消息的結束{char *response = "Response ok! Hello Client!\r\n";evbuffer_add(output, response, strlen(response));//Adds data to an event bufferprintf("服務端接收一條數(shù)據(jù)完成,回復客戶端一條消息: %s\n", response);free(request_line);break;}}free(request_line);}size_t input_len1 = evbuffer_get_length(input);printf("input_len1: %d\n", input_len1);size_t output_len1 = evbuffer_get_length(output);printf("output_len1: %d\n\n", output_len1);
}void errorcb(struct bufferevent *bev, short error, void *ctx)
{if (error & BEV_EVENT_EOF){/* connection has been closed, do any clean up here */printf("connection closed\n");}else if (error & BEV_EVENT_ERROR){/* check errno to see what error occurred */printf("some other error\n");}else if (error & BEV_EVENT_TIMEOUT){/* must be a timeout event handle, handle it */printf("Timed out\n");}bufferevent_free(bev);
}void do_accept(evutil_socket_t listener, short event, void *arg)
{struct event_base *base = arg;struct sockaddr_storage ss;socklen_t slen = sizeof(ss);int fd = accept(listener, (struct sockaddr*)&ss, &slen);if (fd < 0){perror("accept");}else if (fd > FD_SETSIZE){close(fd);}else{struct bufferevent *bev;evutil_make_socket_nonblocking(fd);//使用bufferevent_socket_new創(chuàng)建一個struct bufferevent *bev,關聯(lián)該sockfd,托管給event_baseBEV_OPT_CLOSE_ON_FREE表示釋放bufferevent時關閉底層傳輸端口。這將關閉底層套接字,釋放底層bufferevent等。bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);//設置讀寫對應的回調(diào)函數(shù)bufferevent_setcb(bev, readcb, NULL, errorcb, NULL);
// bufferevent_setwatermark(bev, EV_READ, 0, MAX_LINE);//啟用讀寫事件,其實是調(diào)用了event_add將相應讀寫事件加入事件監(jiān)聽隊列poll。正如文檔所說,如果相應事件不置為true,buf
ferevent是不會讀寫數(shù)據(jù)的bufferevent_enable(bev, EV_READ|EV_WRITE);}
}void run(void)
{evutil_socket_t listener;struct sockaddr_in sin;struct event_base *base;struct event *listener_event;base = event_base_new();if (!base)return; /*XXXerr*/sin.sin_family = AF_INET;sin.sin_addr.s_addr = 0;sin.sin_port = htons(8000);listener = socket(AF_INET, SOCK_STREAM, 0);evutil_make_socket_nonblocking(listener);#ifndef WIN32{int one = 1;setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));}
#endifif (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0){perror("bind");return;}
if (listen(listener, 16)<0){perror("listen");return;}listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);/*XXX check it */event_add(listener_event, NULL);event_base_dispatch(base);
}int main(int argc, char **argv)
{setvbuf(stdout, NULL, _IONBF, 0);run();return 0;
}
編譯:gcc -I/usr/include-o test libevent_eventbuffer_server.c -L/usr/local/lib –levent
運行:
服務端:
客戶端:
今晚博客暫時寫完了,時間比較倉促,錯誤估計不會少,關于bufferevent很多API都還不是很熟悉,還有l(wèi)ibevent 添加事件event_add是非線程安全的,如果使用多線程,需要保證event_add不能出現(xiàn)在多個線程中,以后有時間慢慢研究。
體會:關于源碼,還需要好好研究,其實今晚挺郁悶的,弄了半天,沒有什么進展,現(xiàn)在自己還有很多疑問,主要是自己太急了,不過3天時間做到基本了解,自己還算滿意,下一步有時間多研究下吧。晚安,北京
如是轉(zhuǎn)載,請指明原出處:
http://blog.csdn.net/feitianxuxue
,謝謝合作!
新人創(chuàng)作打卡挑戰(zhàn)賽發(fā)博客就能抽獎!定制產(chǎn)品紅包拿不停!
總結
以上是生活随笔為你收集整理的处理大并发之五 使用libevent利器bufferevent的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。