epoll源码分析
epoll源碼分析
最近在使用libev過程中遇到一個場景:一個fd從一個ev_loop遷移到另一個ev_loop,會出現(xiàn)這個fd同時存在兩個epoll的瞬間。
不禁要問了,一個fd同時被兩個epoll監(jiān)視的行為是怎樣的,epoll嵌套使用是怎樣實(shí)現(xiàn)的?為此,整理了以前讀的epoll源碼。
概述
epoll的擴(kuò)展性和性能關(guān)鍵在于兩個數(shù)據(jù)結(jié)構(gòu): 0) 一個rbtree; 1) 一個ready list.
epoll是有狀態(tài)的, 內(nèi)核中維護(hù)了一個數(shù)據(jù)結(jié)構(gòu)用來管理所要監(jiān)視的fd,這個數(shù)據(jù)結(jié)構(gòu)是eventpoll.
在eventpoll中有一顆紅黑樹, 用來快速的查找和修改要監(jiān)視的fd,每個節(jié)點(diǎn)被封裝成epitem結(jié)構(gòu).
在eventpoll中有一個列表, 用來收集已經(jīng)發(fā)生事件的epitem, 這個list叫ready list.
epoll系統(tǒng)的初始化
eventpoll_init() {eventpoll_mnt = kern_mount(&eventpoll_fs_type);epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem),0, SLAB_HWCACHE_ALIGN|EPI_SLAB_DEBUG|SLAB_PANIC,NULL, NULL);pwq_cache = kmem_cache_create("eventpoll_pwq",sizeof(struct eppoll_entry), 0,EPI_SLAB_DEBUG|SLAB_PANIC, NULL, NULL);error = register_filesystem(&eventpoll_fs_type);eventpoll_mnt = kern_mount(&eventpoll_fs_type); }init初始化代碼很簡單:
1. 申請epitem的緩沖;
2. 申請eppoll_entry的緩沖;
3. 把epoll和文件系統(tǒng)關(guān)聯(lián)起來.
下圖是 epoll和VFS的關(guān)聯(lián):
epoll創(chuàng)建 - epoll_create
SYSCALL_DEFINE1(epoll_create1, int, flags) {int error, fd;struct eventpoll *ep = NULL;struct file *file;error = ep_alloc(&ep);fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep, (flags & O_CLOEXEC));fd_install(fd, file);ep->file = file;return fd; }epoll添加事件 - epoll_ctl
asmlinkage long sys_epoll_ctl(int epfd, int op, int fd, struct epoll_event __user *event) {struct file *file, *tfile;struct eventpoll *ep;struct epitem *epi;struct epoll_event epds;tfile = fget(fd);// 判斷epfd是否是一個epollif (file == tfile || !is_file_epoll(file))goto eexit_3;// 從private_data中取出eventpoll指針// 并且上鎖, 因此一個epoll_ctl是線程安全的ep = file->private_data;down_write(&ep->sem);// 嘗試著從紅黑樹ep->rbr上找到tfile對應(yīng)的一個epitemepi = ep_find(ep, tfile, fd);error = -EINVAL;switch (op) {case EPOLL_CTL_ADD:if (!epi) { // 如果是ADD操作,并且這個fd不在eventpoll里,則執(zhí)行插入操作,注意:內(nèi)核會主動加上POLLERR和POLLHUP事件epds.events |= POLLERR | POLLHUP;error = ep_insert(ep, &epds, tfile, fd);} else // 否則設(shè)置errorerror = -EEXIST;clear_tfile_check_list();break;case EPOLL_CTL_DEL:if (epi)error = ep_remove(ep, epi);elseerror = -ENOENT;break;case EPOLL_CTL_MOD:if (epi) {epds.events |= POLLERR | POLLHUP;error = ep_modify(ep, epi, &epds);} elseerror = -ENOENT;break;}} }ep_insert插入事件
下圖是epoll的數(shù)據(jù)結(jié)構(gòu)。root指向紅黑樹的樹根;rdlist指向待收割事件的列表ready list:
在插入一個fd到epoll中會顯示調(diào)用一次poll, 對于tcp來說是tcp_poll.
來看看poll是如何初始化和被調(diào)用的.
tcp_poll
下圖是網(wǎng)卡硬件中斷觸發(fā)epoll_wait返回的調(diào)用路徑:
tcp_poll的邏輯
static unsigned int sock_poll(struct file *file, poll_table * wait) {struct socket *sock;sock = file->private_data;return sock->ops->poll(file, sock, wait); }// 0) 注冊事件到tcp中; // 1) 返回此時已經(jīng)發(fā)生的事件. unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait) {unsigned int mask;struct sock *sk = sock->sk;struct tcp_sock *tp = tcp_sk(sk);// 注冊一個回調(diào)到sk->sk_sleep中// 注意, wait為空時忽略注冊動作poll_wait(file, sk->sk_sleep, wait);// 如果是監(jiān)聽套接字,則inet_csk_listen_pollif (sk->sk_state == TCP_LISTEN)return inet_csk_listen_poll(sk);mask = 0;if (sk->sk_err)mask = POLLERR;// copied_seq 和 rcv_nxt 不相等,則說明有未讀數(shù)據(jù)出現(xiàn)了if ((1 << sk->sk_state) & ~(TCPF_SYN_SENT | TCPF_SYN_RECV)) {if ((tp->rcv_nxt != tp->copied_seq) && (tp->urg_seq != tp->copied_seq || tp->rcv_nxt != tp->copied_seq + 1 || sock_flag(sk, SOCK_URGINLINE) || !tp->urg_data))mask |= POLLIN | POLLRDNORM;if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {mask |= POLLOUT | POLLWRNORM;}} }看看如何注冊回調(diào)到tcp socket中
// 反向調(diào)用poll_table->qproc,注冊一個poll_callback static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p) {if (p && wait_address)p->qproc(filp, wait_address, p); }// 注冊poll_callback到sock->sk_sleep上 // 0) file是sock對應(yīng)的file句柄; // 1) whead是sock->sk_sleep static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt) {struct epitem *epi = ep_item_from_epqueue(pt);struct eppoll_entry *pwq;if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, SLAB_KERNEL))) {init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);pwq->whead = whead;pwq->base = epi;add_wait_queue(whead, &pwq->wait);list_add_tail(&pwq->llink, &epi->pwqlist);epi->nwait++;} else {/* We have to signal that an error occurred */epi->nwait = -1;} }// 只要socket上有事件發(fā)生就會回調(diào)上面注冊的回調(diào)
poll_callback的回調(diào)
數(shù)據(jù)包到達(dá):
PKT Arrive INT
--> Driver
--> 0) alloc_skb; 1) netif_rx
--> RX_SOFTIRQ
--> net_rx_action軟中斷處理函數(shù) (dev->poll)
--> process_backlog
--> netif_receive_skb
--> tcp_v4_rcv()
--> tcp_v4_do_rcv
--> tcp_rcv_state_process
--> sock_def_wakeup
--> ep_poll_callback
回調(diào)
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key) {int pwake = 0;unsigned long flags;// 通過wait找到epoll_entry// 通過epoll_entry->base找到epitemstruct epitem *epi = ep_item_from_wait(wait);struct eventpoll *ep = epi->ep;spin_lock_irqsave(&ep->lock, flags);if (!(epi->event.events & ~EP_PRIVATE_BITS))goto out_unlock;if (key && !((unsigned long) key & epi->event.events))goto out_unlock;// 把當(dāng)前epitem添加到ready list中,等待收割if (!ep_is_linked(&epi->rdllink))list_add_tail(&epi->rdllink, &ep->rdllist);// 在收到數(shù)據(jù)包的回調(diào)中喚醒等待在epll上的進(jìn)程 if (waitqueue_active(&ep->wq))wake_up_locked(&ep->wq);// 喚醒嵌套epoll的進(jìn)程if (waitqueue_active(&ep->poll_wait))pwake++;pin_unlock_irqrestore(&ep->lock, flags);if (pwake)ep_poll_safewake(&ep->poll_wait);return 1; }下面看看用戶態(tài)如何收割事件.
epoll事件收割 - epoll_wait
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events, int, maxevents, int, timeout) {error = ep_poll(ep, events, maxevents, timeout); }static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout) {int res = 0, eavail, timed_out = 0;if (timeout > 0) {struct timespec end_time = ep_set_mstimeout(timeout);slack = select_estimate_accuracy(&end_time);to = &expires;*to = timespec_to_ktime(end_time);} else if (timeout == 0) {timed_out = 1;spin_lock_irqsave(&ep->lock, flags);goto check_events;}fetch_events:spin_lock_irqsave(&ep->lock, flags);// 如果ready list為空if (!ep_events_available(ep)) {init_waitqueue_entry(&wait, current);wait.flags |= WQ_FLAG_EXCLUSIVE;// 把當(dāng)前進(jìn)程添加到等待隊(duì)列中__add_wait_queue(&ep->wq, &wait);for (;;) {// 設(shè)置進(jìn)程的狀態(tài)為TASK_INTERRUPTIBLE,以便在ep_poll_callback將其喚醒set_current_state(TASK_INTERRUPTIBLE);// ready list非空if (ep_events_available(ep) || timed_out)break;// 有信號返回EINTR if (signal_pending(current)) {res = -EINTR;break;}// 解鎖準(zhǔn)備調(diào)度出去spin_unlock_irqrestore(&ep->lock, flags);if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))timed_out = 1;// 再次運(yùn)行后,第一件事就是獲取鎖 spin_lock_irqsave(&ep->lock, flags);}__remove_wait_queue(&ep->wq, &wait);set_current_state(TASK_RUNNING);}eavail = ep_events_available(ep);// 開始收割事件ep_send_events(ep, events, maxevents); }static int ep_send_events(struct eventpoll *ep, struct epoll_event __user *events, int maxevents) {struct ep_send_events_data esed;esed.maxevents = maxevents;esed.events = events;return ep_scan_ready_list(ep, ep_send_events_proc, &esed); }static int ep_scan_ready_list(struct eventpoll *ep, int (*sproc)(struct eventpoll *, struct list_head *, void *), void *priv) {int error, pwake = 0;unsigned long flags;struct epitem *epi, *nepi;LIST_HEAD(txlist);// 上鎖,和epoll_ctl, epoll_wait互斥mutex_lock(&ep->mtx);// 原子的置換readlist 到 txlist中// 并且開啟ovflist, 使得在sproc執(zhí)行過程中產(chǎn)生的事件存入其中, 是一個事件的臨時停靠點(diǎn)spin_lock_irqsave(&ep->lock, flags);list_splice_init(&ep->rdllist, &txlist);ep->ovflist = NULL;spin_unlock_irqrestore(&ep->lock, flags);// 開始調(diào)用sproc組織事件到用戶空間的數(shù)組中error = (*sproc)(ep, &txlist, priv);spin_lock_irqsave(&ep->lock, flags);for (nepi = ep->ovflist; (epi = nepi) != NULL;nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {// 把sproc執(zhí)行期間產(chǎn)生的事件加入到ready list中, 但是有可能這些新誕生的事件到目前為止還在txlist中// 也就是, 有可能sproc并沒有消耗完本次的ready list,那么剩下的事件要等到下次epoll_wait來收割// 所以,// 0) 需要去重, 這是通過ep_is_linked(&epi->rdllink)來做到的, 因?yàn)槿绻@個epi在txlist中, 它的rdllikn非空;// 1) 需要把還沒有被收割到用戶空間的事件再次的放入ready list中, 并且要保證這些事件在新誕生的事件的前面, 這是通過list_splice做到的.if (!ep_is_linked(&epi->rdllink))list_add_tail(&epi->rdllink, &ep->rdllist);}// 關(guān)閉ovflist ep->ovflist = EP_UNACTIVE_PTR;list_splice(&txlist, &ep->rdllist);// 喚醒if (!list_empty(&ep->rdllist)) {if (waitqueue_active(&ep->wq))wake_up_locked(&ep->wq);if (waitqueue_active(&ep->poll_wait))pwake++;}spin_unlock_irqrestore(&ep->lock, flags);mutex_unlock(&ep->mtx);if (pwake)ep_poll_safewake(&ep->poll_wait);return error; }事件是怎么被收割到用戶空間的
static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head, void *priv) {struct ep_send_events_data *esed = priv;int eventcnt;unsigned int revents;struct epitem *epi;struct epoll_event __user *uevent;// 這個函數(shù)不需要再上鎖了// 收割事件的個數(shù)上限是esed->maxeventsfor (eventcnt = 0, uevent = esed->events; !list_empty(head) && eventcnt < esed->maxevents;) {epi = list_first_entry(head, struct epitem, rdllink);// 已經(jīng)被收割的事件要從txlist中移除掉, 很重要.// 因?yàn)?#xff0c;并不是txlist上的所有的事件都會被收割到用戶空間// 剩下的未收割的事件要再次的放回到ready listlist_del_init(&epi->rdllink);// 顯示的tcp_poll一次事件, 看看這個fd上發(fā)生了什么事情, 并和自己關(guān)心的事件做交集revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) & epi->event.events;if (revents) {// 回傳到用戶空間 if (__put_user(revents, &uevent->events) ||__put_user(epi->event.data, &uevent->data)) {list_add(&epi->rdllink, head);return eventcnt ? eventcnt : -EFAULT;}eventcnt++;uevent++;if (epi->event.events & EPOLLONESHOT)epi->event.events &= EP_PRIVATE_BITS;else if (!(epi->event.events & EPOLLET)) {// 如果是LT模式要再次放入到ready list中// 難道這個事件就一直在ready list中了? 用戶態(tài)的epoll_wait豈不是每次都會收割到事件?什么時候會被剔除掉?// 非也(以讀事件為例):// 0) 如果用戶態(tài)在epoll_wait中獲取到了一個epi事件, 并沒有處理, 那么這個事件是一直存在在fd上的(舉個例子: 可讀狀態(tài)會一直處于可讀, rcv_nxt>copied_seq)// 1) 用戶態(tài)代碼不讀取數(shù)據(jù)或僅僅讀取了部分?jǐn)?shù)據(jù), 為了保證LT語義, 下次epoll_wait時候能夠再次獲取到改epi, 這個epi必須要保存到ready list中;// 2) 用戶態(tài)代碼一直讀取這個fd上的數(shù)據(jù)直到EGAIN, 下次epoll_wait的時候任然會從ready list中碰到這個事件, 但此時tcp_poll不會返回可讀事件了, 所以此后會從ready list中剔除掉.// 3) 也就是, epoll事件的剔除是發(fā)生在下一次epoll_wait中l(wèi)ist_add_tail(&epi->rdllink, &ep->rdllist);}}}return eventcnt; }自問自答
問: 一個fd加入到多個epoll行為如何?
答: 統(tǒng)一個fd通過epoll_ctl添加到兩個epoll中,在epoll_ctl流程中會通過tcp_poll調(diào)用在fd->sock->sk_sleep中插入一個回調(diào)。也就是說,兩次epoll_ctl就會往同一個fd的sk_sleep中插入兩個回調(diào)。在有事件到來時會遍歷sk_sleep上所有的回調(diào)。所以,會觸發(fā)兩次epoll_wait返回。
問:epoll的LT和ET如何實(shí)現(xiàn)的?和具體的poll()有關(guān)嗎?
答:具體的poll()函數(shù)是無感知LT和ET的。tcp_poll在state change時候會回調(diào)sk_sleep上的回調(diào)。epoll在收割事件的時候會判斷是ET還是LT,如果是ET則把epi從ready list移除掉,并且加入到用戶態(tài)的events數(shù)組中,所以下次epoll_wait就不會收割到這個事件了,除非state change又發(fā)生了變化觸發(fā)了回調(diào);如果是LT除了把epi加入到用戶態(tài)的events數(shù)組中,還會再次加入到ready list之后,下次epoll_wait會再次返回,但是并不會始終返回。
轉(zhuǎn)載于:https://www.cnblogs.com/diegodu/p/9377535.html
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎總結(jié)
- 上一篇: HttpSession 和 HttpSe
- 下一篇: Yii2 使用 RESTful 写API