让事件飞——Linux eventfd 原理
讓事件飛——Linux eventfd 原理
 讓事件飛 ——Linux eventfd 原理與實踐
 原文作者:楊陽
目前越來越多的應用程序采用事件驅動的方式實現功能,如何高效地利用系統資源實現通知的管理和送達就愈發變得重要起來。在Linux系統中,eventfd是一個用來通知事件的文件描述符,timerfd是的定時器事件的文件描述符。二者都是內核向用戶空間的應用發送通知的機制,可以有效地被用來實現用戶空間的事件/通知驅動的應用程序。
簡而言之,就是eventfd用來觸發事件通知,timerfd用來觸發將來的事件通知。
開發者使用eventfd相關的系統調用,需要包含頭文件;對于timerfd,則是。
系統調用eventfd/timerfd自linux 2.6.22版本加入內核,由Davide Libenzi最初實現和維護。
eventfd
對于eventfd,只有一個系統調用接口
1int eventfd(unsigned int initval, int flags);
 創建一個eventfd對象,或者說打開一個eventfd的文件,類似普通文件的open操作。
該對象是一個內核維護的無符號的64位整型計數器。初始化為initval的值。
flags可以以下三個標志位的OR結果:
EFD_CLOEXEC:FD_CLOEXEC,簡單說就是fork子進程時不繼承,對于多線程的程序設上這個值不會有錯的。
 EFD_NONBLOCK:文件會被設置成O_NONBLOCK,一般要設置。
 EFD_SEMAPHORE:(2.6.30以后支持)支持semophore語義的read,簡單說就值遞減1。
 這個新建的fd的操作很簡單:
read(): 讀操作就是將counter值置0,如果是semophore就減1。
write(): 設置counter的值。
注意,還支持epoll/poll/select操作,當然,以及每種fd都都必須實現的close。
timerfd
 對于timerfd,有三個涉及的系統調用接口
1int timerfd_create(int clockid, int flags);int timerfd_settime(int fd, int flags,
 2 const struct itimerspec *new_value,
 3 struct itimerspec *old_value);int timerfd_gettime(int fd, struct itimerspec *curr_value);
 timerfd_create就是用來創建新的timerfd對象,clockid可以指定時鐘的種類,比較常用的有兩種:CLOCK_REALTIME(實時時鐘)或 CLOCK_MONOTONIC(單調遞增時鐘)。實時時鐘是指系統的時鐘,它可以被手工修改。而后者單調遞增時鐘則是不會被系統時鐘的人為設置的不連續所影響的。通常選擇后者。而flags的選擇,TFD_CLOEXEC和TFD_NONBLOCK的意義就比較直接了。
timerfd_settime函數用來設置定時器的過期時間expiration。itmerspec結構定義如下:
1struct timespec {
 2 time_t tv_sec; /* Seconds /
 3 long tv_nsec; / Nanoseconds /};struct itimerspec {
 4 struct timespec it_interval; / Interval for periodic timer /
 5 struct timespec it_value; / Initial expiration */};
 該結構包含兩個時間間隔:it_value是指第一次過期時間,it_interval是指第一次到期之后的周期性觸發到期的間隔時間,(設為0的話就是到期第一次)。
old_value如果不為NULL,將會用調用時間來更新old_value所指的itimerspec結構對象。
timerfd_gettime():返回當前timerfd對象的設置值到curr_value指針所指的對象。
read():讀操作的語義是:如果定時器到期了,返回到期的次數,結果存在一個8字節的整數(uint64_6);如果沒有到期,則阻塞至到期,或返回EAGAIN(取決于是否設置了NONBLOCK)。
另外,支持epoll,同eventfd。
生產者-消費者設計模式是常見的后臺架構模式。本實例將實現多個生產者和多個消費者的事件通知框架,用以闡釋eventfd/timerfd在線程通信中作為通知實現的典型場景。
本實例采用以下設計:生產者創建eventfd/timerfd并在事件循環中注冊事件;消費者線程池中的線程共用一個epoll對象,每個消費者線程并行地進行針對eventfd或timerfd觸發的事件循環的輪詢(epoll_wait)。
eventfd對應實現
 1typedef struct thread_info {
 2 pthread_t thread_id;
 3 int rank;
 4 int epfd;} thread_info_t;static void *consumer_routine(void *data) {
 5 struct thread_info *c = (struct thread_info *)data;
 6 struct epoll_event *events;
 7 int epfd = c->epfd;
 8 int nfds = -1;
 9 int i = -1;
 10 uint64_t result;
 11
 12 log(“Greetings from [consumer-%d]”, c->rank);
 13 events = calloc(MAX_EVENTS_SIZE, sizeof(struct epoll_event));
 14 if (events == NULL) handle_error(“calloc epoll events\n”);
 15
 16 for (;😉 {
 17 nfds = epoll_wait(epfd, events, MAX_EVENTS_SIZE, 1000); // poll every second
 18 for (i = 0; i < nfds; i++) {
 19 if (events[i].events & EPOLLIN) {
 20 log("[consumer-%d] got event from fd-%d", c->rank, events[i].data.fd);
 21 // consume events (reset eventfd)
 22 read(events[i].data.fd, &result, sizeof(uint64_t));
 23 close(events[i].data.fd); // NOTE: need to close here
 24 }
 25 }
 26 }}static void *producer_routine(void *data) {
 27 struct thread_info *p = (struct thread_info *)data;
 28 struct epoll_event event;
 29 int epfd = p->epfd;
 30 int efd = -1;
 31 int ret = -1;
 32
 33 log(“Greetings from [producer-%d]”, p->rank);
 34 while (1) {
 35 sleep(1);
 36 // create eventfd (no reuse, create new every time)
 37 efd = eventfd(1, EFD_CLOEXEC|EFD_NONBLOCK);
 38 if (efd == -1) handle_error(“eventfd create: %s”, strerror(errno));
 39 // register to poller
 40 event.data.fd = efd;
 41 event.events = EPOLLIN | EPOLLET; // Edge-Triggered
 42 ret = epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &event);
 43 if (ret != 0) handle_error(“epoll_ctl”);
 44 // trigger (repeatedly)
 45 write(efd, (void *)0xffffffff, sizeof(uint64_t));
 46 }}int main(int argc, char *argv[]) {
 47 struct thread_info *p_list = NULL, *c_list = NULL;
 48 int epfd = -1;
 49 int ret = -1, i = -1;
 50 // create epoll fd
 51 epfd = epoll_create1(EPOLL_CLOEXEC);
 52 if (epfd == -1) handle_error(“epoll_create1: %s”, strerror(errno));
 53 // producers
 54 p_list = calloc(NUM_PRODUCERS, sizeof(struct thread_info));
 55 if (!p_list) handle_error(“calloc”);
 56 for (i = 0; i < NUM_PRODUCERS; i++) {
 57 p_list[i].rank = i;
 58 p_list[i].epfd = epfd;
 59 ret = pthread_create(&p_list[i].thread_id, NULL, producer_routine, &p_list[i]);
 60 if (ret != 0) handle_error(“pthread_create”);
 61 }
 62 // consumers
 63 c_list = calloc(NUM_CONSUMERS, sizeof(struct thread_info));
 64 if (!c_list) handle_error(“calloc”);
 65 for (i = 0; i < NUM_CONSUMERS; i++) {
 66 c_list[i].rank = i;
 67 c_list[i].epfd = epfd;
 68 ret = pthread_create(&c_list[i].thread_id, NULL, consumer_routine, &c_list[i]);
 69 if (ret != 0) handle_error(“pthread_create”);
 70 }
 71 // join and exit
 72 for (i = 0; i < NUM_PRODUCERS; i++) {
 73 ret = pthread_join(p_list[i].thread_id, NULL);
 74 if (ret != 0) handle_error(“pthread_join”);
 75 }
 76 for (i = 0; i < NUM_CONSUMERS; i++) {
 77 ret = pthread_join(c_list[i].thread_id, NULL);
 78 if (ret != 0) handle_error(“pthread_join”);
 79 }
 80 free(p_list);
 81 free(c_list);
 82 return EXIT_SUCCESS;}
 執行過程(2個生產者,4個消費者):
1[1532099804] Greetings from [producer-0]
 2[1532099804] Greetings from [producer-1]
 3[1532099804] Greetings from [consumer-0]
 4[1532099804] Greetings from [consumer-1]
 5[1532099804] Greetings from [consumer-2]
 6[1532099804] Greetings from [consumer-3]
 7[1532099805] [consumer-3] got event from fd-4
 8[1532099805] [consumer-3] got event from fd-5
 9[1532099806] [consumer-0] got event from fd-4
 10[1532099806] [consumer-0] got event from fd-4
 11[1532099807] [consumer-1] got event from fd-4
 12[1532099807] [consumer-1] got event from fd-5
 13[1532099808] [consumer-3] got event from fd-4
 14[1532099808] [consumer-3] got event from fd-5
 15^C
 結果符合預期(附:源碼鏈接)
注意,推薦在eventfd在打開時設置NON_BLOCKING,并在注冊至epoll監聽對象時設為EPOLLET(盡管一次8字節的read就可以讀完整個計數器到用戶空間),因為畢竟,只有采用了非阻塞IO和邊沿觸發,epoll的并發能力才能完全發揮極致。
另外,本實例中的eventfd消費地非常高效,fd號幾乎不會超過5(前四個分別為stdin/stdout/stderr/eventpoll),但實際應用中往往在close前會執行一些事務,隨著消費者線程的增加,eventfd打開的文件也會增加(這個數值得上限由系統的ulimit -n決定)。然而,eventfd打開、讀寫和關閉都效非常高,因為它本質并不是文件,而是kernel在內核空間(內存中)維護的一個64位計數器而已。
timerfd對應實現
 main函數和consumer線程實現幾乎一致,而producer線程創建timerfd,并注冊到事件循環中。
timer的it_value設為1秒,即第一次觸發為1秒以后;it_interval設為3秒,即后續每3秒再次觸發一次。
注意,timerfd_settime函數的位置與之前eventfd的write的相同,二者達到了類似的設置事件的作用,只不過這次是定時器事件。
1static void *producer_routine(void *data) {
 2 struct thread_info *p = (struct thread_info *)data;
 3 struct epoll_event event;
 4 int epfd = p->epfd;
 5 int tfd = -1;
 6 int ret = -1;
 7 struct itimerspec its;
 8 its.it_value.tv_sec = 1; // initial expiration
 9 its.it_value.tv_nsec = 0;
 10 its.it_interval.tv_sec = 3; // interval
 11 its.it_interval.tv_nsec = 0;
 12
 13 log(“Greetings from [producer-%d]”, p->rank);
 14 // create timerfd
 15 tfd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC|TFD_NONBLOCK);
 16 if (tfd == -1) handle_error(“timerfd create: %s”, strerror(errno));
 17 // register to poller
 18 event.data.fd = tfd;
 19 event.events = EPOLLIN | EPOLLET; // Edge-Triggered
 20 ret = epoll_ctl(epfd, EPOLL_CTL_ADD, tfd, &event);
 21 if (ret != 0) handle_error(“epoll_ctl”);
 22 // register timer expired in future
 23 ret = timerfd_settime(tfd, 0, &its, NULL);
 24 if (ret != 0) handle_error(“timerfd settime”);
 25 return (void *)0;}
 執行過程(2個生產者,4個消費者):
1[1532099143] Greetings from [producer-1]
 2[1532099143] Greetings from [consumer-1]
 3[1532099143] Greetings from [consumer-2]
 4[1532099143] Greetings from [consumer-3]
 5[1532099143] Greetings from [consumer-0]
 6[1532099143] Greetings from [producer-0]
 7[1532099144] [consumer-3] got event from fd-4
 8[1532099144] [consumer-3] got event from fd-5
 9[1532099147] [consumer-3] got event from fd-4
 10[1532099147] [consumer-3] got event from fd-5
 11[1532099150] [consumer-0] got event from fd-4
 12[1532099150] [consumer-0] got event from fd-5
 13[1532099153] [consumer-1] got event from fd-4
 14[1532099153] [consumer-1] got event from fd-5
 15^C
從上圖可以看出,運行時打開的fd-4和fd-5兩個文件描述符即是timerfd。
結果符合預期(附:源碼鏈接)
引用eventfs的Manual中NOTE段落的第一句話:
Applications can use an eventfd file descriptor instead of a pipe in all cases where a pipe is used simply to signal events.
在信號通知的場景下,相比pipe有非常大的資源和性能優勢。其根本在于counter(計數器)和channel(數據信道)的區別。
第一,是打開文件數量的巨大差別。由于pipe是半雙工的傳統IPC方式,所以兩個線程通信需要兩個pipe文件,而用eventfd只要打開一個文件。眾所周知,文件描述符可是系統中非常寶貴的資源,linux的默認值也只有1024而已。那開發者可能會說,1相比2也只節省了一半嘛。要知道pipe只能在兩個進程/線程間使用,并且是面向連接(類似TCP socket)的,即需要之前準備好兩個pipe;而eventfd是廣播式的通知,可以多對多的。如上面的NxM的生產者-消費者例子,如果需要完成全雙工的通信,需要NxMx2個的pipe,而且需要提前建立并保持打開,作為通知信號實在太奢侈了,但如果用eventfd,只需要在發通知的時候瞬時創建、觸發并關閉一個即可。
 第二,是內存使用的差別。eventfd是一個計數器,內核維護幾乎成本忽略不計,大概是自旋鎖+喚醒隊列(后續詳細介紹),8個字節的傳輸成本也微乎其微。但pipe可就完全不是了,一來一回數據在用戶空間和內核空間有多達4次的復制,而且更糟糕的是,內核還要為每個pipe分配至少4K的虛擬內存頁,哪怕傳輸的數據長度為0。
 第三,對于timerfd,還有精準度和實現復雜度的巨大差異。由內核管理的timerfd底層是內核中的hrtimer(高精度時鐘定時器),可以精確至納秒(1e-9秒)級,完全勝任實時任務。而用戶態要想實現一個傳統的定時器,通常是基于優先隊列/二叉堆,不僅實現復雜維護成本高,而且運行時效率低,通常只能到達毫秒級。
 所以,第一個最佳實踐法則:當pipe只用來發送通知(傳輸控制信息而不是實際數據),放棄pipe,放心地用eventfd/timerfd,“in all cases”。
另外一個重要優勢就是eventfd/timerfd被設計成與epoll完美結合,比如支持非阻塞的讀取等。事實上,二者就是為epoll而生的(但是pipe就不是,它在Unix的史前時代就有了,那時不僅沒有epoll連Linux都還沒誕生)。應用程序可以在用epoll監控其他文件描述符的狀態的同時,可以“順便“”一起監控實現了eventfd的內核通知機制,何樂而不為呢?
所以,第二個最佳實踐法則:eventfd配上epoll才更搭哦。
eventfd在內核源碼中,作為syscall實現在內核源碼的 fs/eventfd.c下。從Linux 2.6.22版本引入內核,在2.6.27版本以后加入對flag的支持。以下分析參考Linux 2.6.27源碼。
內核中的數據結構:eventfd_ctx
該結構除了包括之前所介紹的一個64位的計數器,還包括了等待隊列頭節點(較新的kernel中還加上了一個kref)。
定義和初始化過程核心代碼如下,比較直接:內核malloc,設置count值,創建eventfd的anon_inode。
1struct eventfd_ctx {
 2 wait_queue_head_t wqh;
 3 __u64 count;};
 以下為創建eventfd的函數的片段,比較直接。
1SYSCALL_DEFINE2(eventfd2, unsigned int, count, int, flags) {
 2 // …
 3 ctx = kmalloc(sizeof(*ctx), GFP_KERNEL);
 4 if (!ctx)
 5 return -ENOMEM;
 6 init_waitqueue_head(&ctx->wqh);
 7 ctx->count = count;
 8 fd = anon_inode_getfd("[eventfd]", &eventfd_fops, ctx,
 9 flags & (O_CLOEXEC | O_NONBLOCK));
 10 // …}
 稍提一下,等待隊列是內核中的重要數據結構,在進程調度、異步通知等多種場景都有很多的應用。其節點結構并不復雜,即自帶自旋鎖的雙向循環鏈表的節點,如下:
1struct __wait_queue_head {
 2 spinlock_t lock;
 3 struct list_head task_list;};typedef struct __wait_queue_head wait_queue_head_t;
 等待隊列中存放的是task(內存中對線程的抽象)的結構。
操作等待隊列的函數主要是和調度相關的函數,如:wake_up和schedule,它們位于sched.c中,前者即喚醒當前等待隊列中的task,后者為當前task主動讓出CPU時間給等待隊列中的其他task。這樣,便通過等待隊列實現了多個task在運行中(TASK_RUNNING)和IO等待(TASK_INTERRUPTABLE)中的狀態切換。
讓我們一起復習下,系統中進程的狀態轉換:
TASK_RUNNING: 正在在CPU上運行,或者在執行隊列(run queue)等待被調度執行。
 TASK_INTERRUPTIBLE: 睡眠中等待默寫事件出現,task可以被信號打斷,一旦接收到信號或顯示調用了wake-up,轉為TASK_RUNNING狀態。常見于IO等待中。
 清楚了task的兩種狀態以及run queue / wait queue原理,read函數就不難理解了。
以下是read函數的實現:
1static ssize_t eventfd_read(struct file *file, char __user *buf, size_t count,
 2 loff_t *ppos){
 3 struct eventfd_ctx *ctx = file->private_data;
 4 ssize_t res;
 5 __u64 ucnt;
 6 DECLARE_WAITQUEUE(wait, current);
 7
 8 if (count < sizeof(ucnt))
 9 return -EINVAL;
 10 spin_lock_irq(&ctx->wqh.lock);
 11 res = -EAGAIN;
 12 ucnt = ctx->count;
 13 if (ucnt > 0)
 14 res = sizeof(ucnt);
 15 else if (!(file->f_flags & O_NONBLOCK)) {
 16 __add_wait_queue(&ctx->wqh, &wait);
 17 for (res = 0;😉 {
 18 set_current_state(TASK_INTERRUPTIBLE);
 19 if (ctx->count > 0) {
 20 ucnt = ctx->count;
 21 res = sizeof(ucnt);
 22 break;
 23 }
 24 if (signal_pending(current)) {
 25 res = -ERESTARTSYS;
 26 break;
 27 }
 28 spin_unlock_irq(&ctx->wqh.lock);
 29 schedule();
 30 spin_lock_irq(&ctx->wqh.lock);
 31 }
 32 __remove_wait_queue(&ctx->wqh, &wait);
 33 __set_current_state(TASK_RUNNING);
 34 }
 35 if (res > 0) {
 36 ctx->count = 0;
 37 if (waitqueue_active(&ctx->wqh))
 38 wake_up_locked(&ctx->wqh);
 39 }
 40 spin_unlock_irq(&ctx->wqh.lock);
 41 if (res > 0 && put_user(ucnt, (__u64 __user *) buf))
 42 return -EFAULT;
 43
 44 return res;}
 read操作目的是要將count值返回用戶空間并清零。ctx中的count值是共享數據,通過加irq自旋鎖實現對其的獨占安全訪問,spin_lock_irq函數可以禁止本地中斷和搶占,在SMP體系中也是安全的。從源碼可以看出,如果是對于(通常的epoll中的,也是上面實例中的)非阻塞讀,count大于0則直接返回并清零,count等于0則直接返回EAGAIN。
對于阻塞讀,如果count值為0則加入等待隊列并阻塞,直到值不為0時(被其他線程更新)返回。阻塞是如何實現的呢?是通過TASK_INTERRUPTABLE狀態下的循環加schedule。注意,schedule前釋放了自旋鎖,意味著允許其他線程更新值,只要值被更新大于0且又再次獲得cpu時間,那么就可以跳出循環繼續執行而返回了。
考慮一個情景,兩個線程幾乎同時read請求,那么:兩個都會被加入到等待隊列中,當第一個搶到自旋鎖,返回了大于1的res并重置了count為0,此時它會(在倒數第二個if那里) 第一時間喚醒等待隊列中的其他線程,此時第二個線程被調度到,于是開始了自己的循環等待。即實現了:事件只會通知到第一個接收到的線程。
那么問題來了:我們知道在其他線程write后,阻塞的read線程是馬上返回的。那么如何能在count置一旦不為0時,等待的調度的阻塞讀線程可以盡快地再次獲得cpu時間,從而繼續執行呢?關鍵在于write函數也有當確認可以成功返回時,主動調用wakeup_locked的過程,這樣就能實現write后立即向等待隊列通知的效果了。
write操作與read操作過程非常相似,不在此展開。
關于poll操作的核心代碼如下:
1// …
 2 spin_lock_irqsave(&ctx->wqh.lock, flags);
 3 if (ctx->count > 0)
 4 events |= POLLIN;
 5 if (ctx->count == ULLONG_MAX)
 6 events |= POLLERR;
 7 if (ULLONG_MAX - 1 > ctx->count)
 8 events |= POLLOUT;
 9 spin_unlock_irqrestore(&ctx->wqh.lock, flags);
 在count值大于0時,返回了設置POLLIN標志的事件,使得用戶層的應用可以通過epoll監控 eventfd的可讀事件狀態。
本篇小結
 通過對eventfd/timerfd的接口和實現的了解,可以看出其不僅功能實用,而且調用方式簡單。另外,其實現是非常精巧高效的,構建于內核眾多系統基礎核心功能之上,為用戶態的應用封裝了十分高效簡單的事件通知機制。
參考資料
 Linux 內核源碼 https://elixir.bootlin.com/linux/latest/source/fs/eventfd.c
Linux Programmer’s Manual eventfd(2) - Linux manual page
總結
以上是生活随笔為你收集整理的让事件飞——Linux eventfd 原理的全部內容,希望文章能夠幫你解決所遇到的問題。
                            
                        - 上一篇: 摄像模组中光学相关知识(四)
 - 下一篇: **JAVA实习周记(第一周):任何的浮