Memcached Source Code Analysis — Starting from the main Function
(A quick plug: I have recently been writing a data server built on the LevelDB storage engine. It is written in C, uses Libevent to handle network events, and serves client connections concurrently with a pool of background threads; in theory a single machine should handle several thousand to tens of thousands of client connections (not yet benchmarked). The framework is basically in place, and the working name is LLDB (Libevent-based and Leveldb-backended DataBase). Once the code matures I plan to open-source it and hope some readers will give it a try.)
Memcached is a distributed, in-memory cache server originally developed by a team led by Brad Fitzpatrick at Danga Interactive, the company behind LiveJournal. It is extremely fast and has become a key ingredient in scaling the web applications behind services such as mixi, hatena, Facebook, Vox, and LiveJournal (see Wikipedia or Baidu Baike for more background). Below is a rough walk-through of the Memcached startup flow (based on memcached-1.4.14); only the skeleton of the code is shown.
```c
int main (int argc, char **argv) {
    int c;
    bool lock_memory = false;
    bool do_daemonize = false;
    bool preallocate = false;
    int maxcore = 0;
    char *username = NULL;
    char *pid_file = NULL;
    struct passwd *pw;
    struct rlimit rlim;
    char unit = '\0';
    int size_max = 0;
    int retval = EXIT_SUCCESS;
    /* listening sockets */
    static int *l_socket = NULL;

    /* ... more option variables omitted ... */

    /* sanity check */
    if (!sanitycheck()) {
        return EX_OSERR;
    }

    /* register the signal handler */
    signal(SIGINT, sig_handler);

    /* initialize the default settings */
    settings_init();

    /* parse command-line arguments and fill in the memcached settings (code omitted) */

    /* if -S was given, initialize the SASL module */
    if (settings.sasl) {
        init_sasl();
    }

    /* run memcached as a daemon? */
    /* if we want to ensure our ability to dump core, don't chdir to / */
    if (do_daemonize) {
        if (sigignore(SIGHUP) == -1) {
            perror("Failed to ignore SIGHUP");
        }
        if (daemonize(maxcore, settings.verbose) == -1) {
            fprintf(stderr, "failed to daemon() in order to daemonize\n");
            exit(EXIT_FAILURE);
        }
    }

    /* initialize the main thread's libevent instance */
    main_base = event_init();

    /* initialize the other modules */
    stats_init();
    assoc_init(settings.hashpower_init);
    conn_init();
    slabs_init(settings.maxbytes, settings.factor, preallocate);

    /*
     * ignore SIGPIPE; if we ever need it we can check errno == EPIPE instead
     */
    if (sigignore(SIGPIPE) == -1) {
        perror("failed to ignore SIGPIPE; sigaction");
        exit(EX_OSERR);
    }

    /* start the worker threads (multi-threaded mode) */
    thread_init(settings.num_threads, main_base);

    /* start the hash table (assoc) maintenance thread */
    if (start_assoc_maintenance_thread() == -1) {
        exit(EXIT_FAILURE);
    }

    /* start the slab maintenance thread */
    if (settings.slab_reassign &&
        start_slab_maintenance_thread() == -1) {
        exit(EXIT_FAILURE);
    }

    /* initialise the clock event */
    clock_handler(0, 0, 0);

    /* create unix-domain sockets after dropping privileges */
    if (settings.socketpath != NULL) {
        errno = 0;
        if (server_socket_unix(settings.socketpath, settings.access)) {
            vperror("failed to listen on UNIX socket: %s", settings.socketpath);
            exit(EX_OSERR);
        }
    }

    /* create the TCP listening socket, bind it, and do the related setup */
    if (settings.socketpath == NULL) {
        const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
        char temp_portnumber_filename[PATH_MAX];
        FILE *portnumber_file = NULL;

        if (portnumber_filename != NULL) {
            snprintf(temp_portnumber_filename,
                     sizeof(temp_portnumber_filename),
                     "%s.lck", portnumber_filename);

            portnumber_file = fopen(temp_portnumber_filename, "a");
            if (portnumber_file == NULL) {
                fprintf(stderr, "Failed to open \"%s\": %s\n",
                        temp_portnumber_filename, strerror(errno));
            }
        }

        errno = 0;
        if (settings.port && server_sockets(settings.port, tcp_transport,
                                            portnumber_file)) {
            vperror("failed to listen on TCP port %d", settings.port);
            exit(EX_OSERR);
        }

        /*
         * initialization order: first create the listening sockets
         * (low ports may require root), then drop root privileges,
         * daemonise if requested, then initialize libevent
         */

        /* create and bind the UDP listening socket */
        errno = 0;
        if (settings.udpport && server_sockets(settings.udpport, udp_transport,
                                               portnumber_file)) {
            vperror("failed to listen on UDP port %d", settings.udpport);
            exit(EX_OSERR);
        }

        if (portnumber_file) {
            fclose(portnumber_file);
            rename(temp_portnumber_filename, portnumber_filename);
        }
    }

    if (pid_file != NULL) {
        save_pid(pid_file);
    }

    /* drop privileges */
    drop_privileges();

    /* enter the event loop */
    if (event_base_loop(main_base, 0) != 0) {
        retval = EXIT_FAILURE;
    }

    stop_assoc_maintenance_thread();

    /* remove the PID file if we are running as a daemon */
    if (do_daemonize)
        remove_pidfile(pid_file);
    /* Clean up strdup() call for bind() address */
    if (settings.inter)
        free(settings.inter);
    if (l_socket)
        free(l_socket);
    if (u_socket)
        free(u_socket);

    return retval;
}
```

The calls in main() worth a closer look are:
- conn_init();
- thread_init(settings.num_threads, main_base);
- clock_handler(0, 0, 0);
- server_socket_unix(settings.socketpath, settings.access);
- server_sockets(settings.port, tcp_transport, portnumber_file);
- event_base_loop(main_base, 0);
Before analyzing these functions, let's look at the definitions of a few important variables and structures:
- Declarations of the important variables
- Definition of struct conn
- Definitions of LIBEVENT_THREAD and LIBEVENT_DISPATCHER_THREAD (a rough sketch is given below)
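For reference, the two thread descriptors look roughly like this (paraphrased from memcached.h of the 1.4.x series; the exact field set in 1.4.14 may differ slightly, and struct conn is far too large to reproduce here):

```c
/* Paraphrased from memcached.h (1.4.x); the field set is approximate. */
typedef struct {
    pthread_t thread_id;               /* unique ID of this thread           */
    struct event_base *base;           /* libevent handle this thread uses   */
    struct event notify_event;         /* listen event for the notify pipe   */
    int notify_receive_fd;             /* receiving end of the notify pipe   */
    int notify_send_fd;                /* sending end of the notify pipe     */
    struct thread_stats stats;         /* stats generated by this thread     */
    struct conn_queue *new_conn_queue; /* queue of new connections to handle */
    cache_t *suffix_cache;             /* suffix cache                       */
} LIBEVENT_THREAD;

typedef struct {
    pthread_t thread_id;               /* unique ID of this thread           */
    struct event_base *base;           /* libevent handle this thread uses   */
} LIBEVENT_DISPATCHER_THREAD;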
Let's start with the conn_init() function:
```c
static void conn_init(void) {
    freetotal = 200;
    freecurr = 0;
    if ((freeconns = calloc(freetotal, sizeof(conn *))) == NULL) {
        fprintf(stderr, "Failed to allocate connection structures\n");
    }
    return;
}
```

Essentially it just allocates room for freetotal conn pointers, a free list of recycled connection structures. Very simple.
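For context, conn_new() later draws from this pool rather than always allocating a fresh conn. A simplified sketch of the free-list pattern (the names mirror memcached's conn_from_freelist()/conn_add_to_freelist(), but this is an illustrative sketch rather than the verbatim 1.4.14 code, which also guards the list with a lock since worker threads share it):

```c
/* Illustrative sketch of how the pool filled by conn_init() is used;
 * not the verbatim memcached-1.4.14 code (the real version also locks). */
static conn **freeconns;   /* array of recycled conn pointers  */
static int freetotal;      /* capacity of freeconns            */
static int freecurr;       /* number of entries currently held */

/* Take a conn from the free list, or return NULL so the caller allocates one. */
static conn *conn_from_freelist(void) {
    conn *c = NULL;
    if (freecurr > 0) {
        c = freeconns[--freecurr];
    }
    return c;
}

/* Put a conn back on the free list; if the list is full, the caller frees it. */
static bool conn_add_to_freelist(conn *c) {
    if (freecurr < freetotal) {
        freeconns[freecurr++] = c;
        return true;
    }
    return false;
}
```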
Next comes another important call: thread_init().
```c
/*
 * Initializes the thread subsystem, creating the worker threads.
 *
 * nthreads  is the number of worker event-handler threads to spawn
 * main_base is the main thread's event base
 */
void thread_init(int nthreads, struct event_base *main_base) {
    int i;
    int power;

    /* initialize the locks */
    pthread_mutex_init(&cache_lock, NULL);
    pthread_mutex_init(&stats_lock, NULL);

    pthread_mutex_init(&init_lock, NULL);
    pthread_cond_init(&init_cond, NULL);

    pthread_mutex_init(&cqi_freelist_lock, NULL);
    cqi_freelist = NULL;

    /* Want a wide lock table, but don't waste memory */
    if (nthreads < 3) {
        power = 10;
    } else if (nthreads < 4) {
        power = 11;
    } else if (nthreads < 5) {
        power = 12;
    } else {
        /* 8192 buckets, and central locks don't scale much past 5 threads */
        power = 13;
    }

    item_lock_count = ((unsigned long int)1 << (power));
    item_lock_mask  = item_lock_count - 1;

    item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
    if (! item_locks) {
        perror("Can't allocate item locks");
        exit(1);
    }
    for (i = 0; i < item_lock_count; i++) {
        pthread_mutex_init(&item_locks[i], NULL);
    }

    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    /* set up the dispatcher_thread (i.e. the main thread) */
    dispatcher_thread.base = main_base;
    dispatcher_thread.thread_id = pthread_self();

    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }

        /* Trick: a worker learns that the main thread has accepted a
         * connection by reading one byte from notify_receive_fd. */
        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];

        setup_thread(&threads[i]);
        /* Reserve three fds for the libevent base, and two for the notify pipe */
        stats.reserved_fds += 5;
    }

    /* Create the worker threads after we've done all the libevent setup. */
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }

    /* Wait for all the threads to set themselves up before returning. */
    pthread_mutex_lock(&init_lock);
    while (init_count < nthreads) {
        pthread_cond_wait(&init_cond, &init_lock);
    }
    pthread_mutex_unlock(&init_lock);
}

static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;

/*
 * Each libevent instance has a wakeup pipe; other threads write to it
 * to signal that they have put a new connection on that thread's queue.
 */
static LIBEVENT_THREAD *threads;
```
Inside thread_init(), setup_thread() is called to set up each worker thread's per-thread state:
```c
static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();
    if (! me->base) {
        fprintf(stderr, "Can't allocate event base\n");
        exit(1);
    }

    /* Listen for notifications from other threads */
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);

    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }

    me->new_conn_queue = malloc(sizeof(struct conn_queue));
    if (me->new_conn_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    cq_init(me->new_conn_queue);

    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
        perror("Failed to initialize mutex");
        exit(EXIT_FAILURE);
    }

    me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
                                    NULL, NULL);
    if (me->suffix_cache == NULL) {
        fprintf(stderr, "Failed to create suffix cache\n");
        exit(EXIT_FAILURE);
    }
}
```

setup_thread() registers thread_libevent_process() as the callback for the worker's notification pipe:
```c
/*
 * Called on a worker thread when its wakeup pipe is written to,
 * i.e. when a new connection has been handed to this thread.
 */
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];

    if (read(fd, buf, 1) != 1)
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");

    item = cq_pop(me->new_conn_queue);

    if (NULL != item) {
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base);
        if (c == NULL) {
            if (IS_UDP(item->transport)) {
                fprintf(stderr, "Can't listen for events on UDP socket\n");
                exit(1);
            } else {
                if (settings.verbose > 0) {
                    fprintf(stderr, "Can't listen for events on fd %d\n",
                            item->sfd);
                }
                close(item->sfd);
            }
        } else {
            c->thread = me;
        }
        cqi_free(item);
    }
}
```

thread_init() also calls create_worker() to spawn the worker threads, each of which runs worker_libevent() as its thread function:
```c
/*
 * Worker thread: event loop
 */
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* thread_init() blocks until all of the threads have finished initializing */
    pthread_mutex_lock(&init_lock);
    init_count++;
    pthread_cond_signal(&init_cond);
    pthread_mutex_unlock(&init_lock);

    /* the worker thread enters its event loop */
    event_base_loop(me->base, 0);
    return NULL;
}
```

Once every worker has checked in, thread_init() returns.
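For completeness, create_worker() is just a thin wrapper around pthread_create(); roughly (paraphrased from thread.c, not the verbatim 1.4.14 listing):

```c
/* Paraphrase of create_worker(): start a worker thread running func(arg). */
static void create_worker(void *(*func)(void *), void *arg) {
    pthread_t       thread;
    pthread_attr_t  attr;
    int             ret;

    pthread_attr_init(&attr);

    if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
        fprintf(stderr, "Can't create thread: %s\n", strerror(ret));
        exit(1);
    }
}
```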
The next important call is server_sockets(). server_sockets() calls server_socket(), which in turn calls conn_new(), and conn_new() registers event_handler() as the event callback:
```c
void event_handler(const int fd, const short which, void *arg) {
    conn *c;

    c = (conn *)arg;
    assert(c != NULL);

    c->which = which;

    /* sanity */
    if (fd != c->sfd) {
        if (settings.verbose > 0)
            fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
        conn_close(c);
        return;
    }

    drive_machine(c);

    /* wait for next event */
    return;
}
```

drive_machine() is essentially one big state machine, and it is a long function:
```c
static void drive_machine(conn *c) {
    bool stop = false;
    int sfd, flags = 1;
    socklen_t addrlen;
    struct sockaddr_storage addr;
    int nreqs = settings.reqs_per_event;
    int res;
    const char *str;

    assert(c != NULL);

    while (!stop) {

        switch(c->state) {
        case conn_listening:
            addrlen = sizeof(addr);
            if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    /* these are transient, so don't log anything */
                    stop = true;
                } else if (errno == EMFILE) {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Too many open connections\n");
                    accept_new_conns(false);
                    stop = true;
                } else {
                    perror("accept()");
                    stop = true;
                }
                break;
            }
            if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
                fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
                perror("setting O_NONBLOCK");
                close(sfd);
                break;
            }

            if (settings.maxconns_fast &&
                stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {
                str = "ERROR Too many open connections\r\n";
                res = write(sfd, str, strlen(str));
                close(sfd);
                STATS_LOCK();
                stats.rejected_conns++;
                STATS_UNLOCK();
            } else {
                dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                                  DATA_BUFFER_SIZE, tcp_transport);
            }

            stop = true;
            break;

        case conn_waiting:
            if (!update_event(c, EV_READ | EV_PERSIST)) {
                if (settings.verbose > 0)
                    fprintf(stderr, "Couldn't update event\n");
                conn_set_state(c, conn_closing);
                break;
            }

            conn_set_state(c, conn_read);
            stop = true;
            break;

        case conn_read:
            res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);

            switch (res) {
            case READ_NO_DATA_RECEIVED:
                conn_set_state(c, conn_waiting);
                break;
            case READ_DATA_RECEIVED:
                conn_set_state(c, conn_parse_cmd);
                break;
            case READ_ERROR:
                conn_set_state(c, conn_closing);
                break;
            case READ_MEMORY_ERROR: /* Failed to allocate more memory */
                /* State already set by try_read_network */
                break;
            }
            break;

        case conn_parse_cmd :
            if (try_read_command(c) == 0) {
                /* we need more data! */
                conn_set_state(c, conn_waiting);
            }

            break;

        case conn_new_cmd:
            /* Only process nreqs at a time to avoid starving other
               connections */
            --nreqs;
            if (nreqs >= 0) {
                reset_cmd_handler(c);
            } else {
                pthread_mutex_lock(&c->thread->stats.mutex);
                c->thread->stats.conn_yields++;
                pthread_mutex_unlock(&c->thread->stats.mutex);
                if (c->rbytes > 0) {
                    /* We have already read in data into the input buffer,
                       so libevent will most likely not signal read events
                       on the socket (unless more data is available. As a
                       hack we should just put in a request to write data,
                       because that should be possible ;-) */
                    if (!update_event(c, EV_WRITE | EV_PERSIST)) {
                        if (settings.verbose > 0)
                            fprintf(stderr, "Couldn't update event\n");
                        conn_set_state(c, conn_closing);
                    }
                }
                stop = true;
            }
            break;

        case conn_nread:
            if (c->rlbytes == 0) {
                complete_nread(c);
                break;
            }
            /* first check if we have leftovers in the conn_read buffer */
            if (c->rbytes > 0) {
                int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
                if (c->ritem != c->rcurr) {
                    memmove(c->ritem, c->rcurr, tocopy);
                }
                c->ritem += tocopy;
                c->rlbytes -= tocopy;
                c->rcurr += tocopy;
                c->rbytes -= tocopy;
                if (c->rlbytes == 0) {
                    break;
                }
            }

            /* now try reading from the socket */
            res = read(c->sfd, c->ritem, c->rlbytes);
            if (res > 0) {
                pthread_mutex_lock(&c->thread->stats.mutex);
                c->thread->stats.bytes_read += res;
                pthread_mutex_unlock(&c->thread->stats.mutex);
                if (c->rcurr == c->ritem) {
                    c->rcurr += res;
                }
                c->ritem += res;
                c->rlbytes -= res;
                break;
            }
            if (res == 0) { /* end of stream */
                conn_set_state(c, conn_closing);
                break;
            }
            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
                if (!update_event(c, EV_READ | EV_PERSIST)) {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Couldn't update event\n");
                    conn_set_state(c, conn_closing);
                    break;
                }
                stop = true;
                break;
            }
            /* otherwise we have a real error, on which we close the connection */
            if (settings.verbose > 0) {
                fprintf(stderr, "Failed to read, and not due to blocking:\n"
                        "errno: %d %s \n"
                        "rcurr=%lx ritem=%lx rbuf=%lx rlbytes=%d rsize=%d\n",
                        errno, strerror(errno),
                        (long)c->rcurr, (long)c->ritem, (long)c->rbuf,
                        (int)c->rlbytes, (int)c->rsize);
            }
            conn_set_state(c, conn_closing);
            break;

        case conn_swallow:
            /* we are reading sbytes and throwing them away */
            if (c->sbytes == 0) {
                conn_set_state(c, conn_new_cmd);
                break;
            }

            /* first check if we have leftovers in the conn_read buffer */
            if (c->rbytes > 0) {
                int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
                c->sbytes -= tocopy;
                c->rcurr += tocopy;
                c->rbytes -= tocopy;
                break;
            }

            /* now try reading from the socket */
            res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
            if (res > 0) {
                pthread_mutex_lock(&c->thread->stats.mutex);
                c->thread->stats.bytes_read += res;
                pthread_mutex_unlock(&c->thread->stats.mutex);
                c->sbytes -= res;
                break;
            }
            if (res == 0) { /* end of stream */
                conn_set_state(c, conn_closing);
                break;
            }
            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
                if (!update_event(c, EV_READ | EV_PERSIST)) {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Couldn't update event\n");
                    conn_set_state(c, conn_closing);
                    break;
                }
                stop = true;
                break;
            }
            /* otherwise we have a real error, on which we close the connection */
            if (settings.verbose > 0)
                fprintf(stderr, "Failed to read, and not due to blocking\n");
            conn_set_state(c, conn_closing);
            break;

        case conn_write:
            /*
             * We want to write out a simple response. If we haven't already,
             * assemble it into a msgbuf list (this will be a single-entry
             * list for TCP or a two-entry list for UDP).
             */
            if (c->iovused == 0 || (IS_UDP(c->transport) && c->iovused == 1)) {
                if (add_iov(c, c->wcurr, c->wbytes) != 0) {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Couldn't build response\n");
                    conn_set_state(c, conn_closing);
                    break;
                }
            }

            /* fall through... */

        case conn_mwrite:
            if (IS_UDP(c->transport) && c->msgcurr == 0 && build_udp_headers(c) != 0) {
                if (settings.verbose > 0)
                    fprintf(stderr, "Failed to build UDP headers\n");
                conn_set_state(c, conn_closing);
                break;
            }
            switch (transmit(c)) {
            case TRANSMIT_COMPLETE:
                if (c->state == conn_mwrite) {
                    while (c->ileft > 0) {
                        item *it = *(c->icurr);
                        assert((it->it_flags & ITEM_SLABBED) == 0);
                        item_remove(it);
                        c->icurr++;
                        c->ileft--;
                    }
                    while (c->suffixleft > 0) {
                        char *suffix = *(c->suffixcurr);
                        cache_free(c->thread->suffix_cache, suffix);
                        c->suffixcurr++;
                        c->suffixleft--;
                    }
                    /* XXX: I don't know why this wasn't the general case */
                    if(c->protocol == binary_prot) {
                        conn_set_state(c, c->write_and_go);
                    } else {
                        conn_set_state(c, conn_new_cmd);
                    }
                } else if (c->state == conn_write) {
                    if (c->write_and_free) {
                        free(c->write_and_free);
                        c->write_and_free = 0;
                    }
                    conn_set_state(c, c->write_and_go);
                } else {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Unexpected state %d\n", c->state);
                    conn_set_state(c, conn_closing);
                }
                break;

            case TRANSMIT_INCOMPLETE:
            case TRANSMIT_HARD_ERROR:
                break;                   /* Continue in state machine. */

            case TRANSMIT_SOFT_ERROR:
                stop = true;
                break;
            }
            break;

        case conn_closing:
            if (IS_UDP(c->transport))
                conn_cleanup(c);
            else
                conn_close(c);
            stop = true;
            break;

        case conn_max_state:
            assert(false);
            break;
        }
    }

    return;
}
```

It is fair to say that all of memcached revolves around this state machine. The possible states are:
```c
enum conn_states {
    conn_listening,  /**< the socket which listens for connections */
    conn_new_cmd,    /**< prepare the connection for the next command */
    conn_waiting,    /**< waiting for a readable socket */
    conn_read,       /**< reading in a command line */
    conn_parse_cmd,  /**< try to parse a command from the input buffer */
    conn_write,      /**< writing out a simple response */
    conn_nread,      /**< reading in a fixed number of bytes */
    conn_swallow,    /**< swallowing unnecessary bytes without storing them */
    conn_closing,    /**< closing this connection */
    conn_mwrite,     /**< writing out many items sequentially */
    conn_max_state   /**< max state value (used for assertions) */
};
```

In the conn_listening state, a new client connection is accepted and then handed off via dispatch_conn_new():
```c
/*
 * Dispatches a new connection to another thread. This is only ever
 * called from the main thread, either during initialization (for UDP)
 * or because of an incoming connection.
 */
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
    CQ_ITEM *item = cqi_new();
    int tid = (last_thread + 1) % settings.num_threads;

    LIBEVENT_THREAD *thread = threads + tid;

    last_thread = tid;

    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;

    cq_push(thread->new_conn_queue, item);

    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    if (write(thread->notify_send_fd, "", 1) != 1) {
        perror("Writing to thread notify pipe");
    }
}
```

At this point, most of the logic of both the main thread and the worker threads has been covered; each now sits in its own event loop handling its share of the work.
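One supporting detail: the connection queue that dispatch_conn_new() pushes onto and thread_libevent_process() pops from is just a mutex-protected singly linked list. A simplified sketch (modeled on thread.c; the names and fields are approximate, not the verbatim 1.4.14 code):

```c
/* Simplified sketch of the per-worker connection queue; modeled on
 * thread.c but not the verbatim memcached-1.4.14 code. */
#include <pthread.h>
#include <stddef.h>

typedef struct conn_queue_item {
    int sfd;                           /* accepted socket */
    /* init_state, event_flags, read_buffer_size, transport also live here */
    struct conn_queue_item *next;
} CQ_ITEM;

typedef struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;
} CQ;

/* Append an item to the tail of the queue (called by the main thread). */
static void cq_push(CQ *cq, CQ_ITEM *item) {
    item->next = NULL;
    pthread_mutex_lock(&cq->lock);
    if (cq->tail == NULL)
        cq->head = item;
    else
        cq->tail->next = item;
    cq->tail = item;
    pthread_mutex_unlock(&cq->lock);
}

/* Pop an item from the head of the queue (called by a worker thread). */
static CQ_ITEM *cq_pop(CQ *cq) {
    CQ_ITEM *item;
    pthread_mutex_lock(&cq->lock);
    item = cq->head;
    if (item != NULL) {
        cq->head = item->next;
        if (cq->head == NULL)
            cq->tail = NULL;
    }
    pthread_mutex_unlock(&cq->lock);
    return item;
}
```

Because only a single byte travels over the notify pipe per dispatch, the queue is what carries all of the actual connection parameters to the worker.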
讀后語:memcached 代碼簡介易讀,基于 Libevent 處理網絡事件,并采用了多線程機制,大大利用了多核的計算能力,提高了系統接受客戶端請求并發數量。
The way the main thread notifies the worker threads about new connections is also interesting. Each worker shares a pipe with the main thread: whenever the main thread accepts a new connection, it dispatches the connection onto one worker's queue and writes a single byte into that worker's end of the pipe; because the worker monitors the other end of the pipe for events, it notices that a new connection has arrived, pops it off the queue, and handles it from then on.
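To make the pattern concrete outside of memcached, here is a minimal, self-contained sketch of the same wakeup-pipe idea using the legacy libevent API (my own illustration, not memcached code; build with something like `gcc demo.c -levent -lpthread`):

```c
/* Minimal illustration of the wakeup-pipe pattern: a "dispatcher" (main)
 * writes one byte into a pipe; a worker thread running its own libevent
 * loop wakes up on the read end. Sketch only, not memcached code. */
#include <event.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

struct worker {
    struct event_base *base;
    struct event notify_event;
    int notify_receive_fd;
    int notify_send_fd;
};

static void on_notify(int fd, short which, void *arg) {
    struct worker *w = arg;
    char buf[1];

    if (read(fd, buf, 1) != 1) {        /* consume the wakeup byte */
        fprintf(stderr, "can't read from notify pipe\n");
        return;
    }
    printf("worker woke up: a new 'connection' was dispatched to me\n");
    event_base_loopbreak(w->base);      /* stop the demo after one wakeup */
}

static void *worker_main(void *arg) {
    struct worker *w = arg;
    event_base_loop(w->base, 0);        /* block until loopbreak */
    return NULL;
}

int main(void) {
    struct worker w;
    int fds[2];
    pthread_t tid;

    if (pipe(fds)) { perror("pipe"); exit(1); }
    w.notify_receive_fd = fds[0];
    w.notify_send_fd = fds[1];

    w.base = event_base_new();          /* the worker's own event base */
    event_set(&w.notify_event, w.notify_receive_fd,
              EV_READ | EV_PERSIST, on_notify, &w);
    event_base_set(w.base, &w.notify_event);
    event_add(&w.notify_event, NULL);

    pthread_create(&tid, NULL, worker_main, &w);

    /* "dispatch" one connection: in memcached, the CQ_ITEM is pushed onto
     * the worker's queue right before this one-byte write */
    if (write(w.notify_send_fd, "", 1) != 1)
        perror("write to notify pipe");

    pthread_join(tid, NULL);
    event_base_free(w.base);
    return 0;
}
```

The appeal of this design is that each worker's event base stays effectively single-threaded: the only cross-thread communication is the queue push plus the one-byte pipe write, which libevent turns into an ordinary read event on the worker side.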