管道与系统调用pipe
管道機制的主體是系統調用pipe,但是由pipe所建立的管道的兩端都在同一進程中,所以必須在fork的配合下,才能在父子進程之間或兩個子進程之間建立起進程間的通信管道。
我們先來看系統調用pipe。
由于管道兩端都是以(已打開)文件的形式出現在相關的進程中,在具體實現上也是作為匿名文件來實現的,所以pipe的代碼與文件系統密切相關。
先看系統調用的入口sys_pipe,其代碼如下:
/** sys_pipe() is the normal C calling standard for creating* a pipe. It's not the way Unix traditionally does this, though.*/ asmlinkage int sys_pipe(unsigned long * fildes) {int fd[2];int error;error = do_pipe(fd);if (!error) {if (copy_to_user(fildes, fd, 2*sizeof(int)))error = -EFAULT;}return error; }這里由do_pipe建立起一個管道,通過作為調用參數的數組fd返回帶抱著管道兩端的兩個已打開文件號,再由copy_to_user將數組fd復制到用戶空間。顯然,do_pipe是這個系統調用的主體,其代碼在fs/pipe.c中,我們分段閱讀:
sys_pipe=>do_pipe
int do_pipe(int *fd) {struct qstr this;char name[32];struct dentry *dentry;struct inode * inode;struct file *f1, *f2;int error;int i,j;error = -ENFILE;f1 = get_empty_filp();if (!f1)goto no_files;f2 = get_empty_filp();if (!f2)goto close_f1;inode = get_pipe_inode();if (!inode)goto close_f12;error = get_unused_fd();if (error < 0)goto close_f12_inode;i = error;error = get_unused_fd();if (error < 0)goto close_f12_inode_i;j = error;在文件系統系列中讀者已經看到,進程對每個已打開文件的操作都是通過一個file數據結構進行的,只有在由同一進程按相同模式重復打開同一文件時才共享同一個數據結構。一個管道實際上就是一個無形(只存在于內存中)的文件,對這個文件的操作要通過兩個已打開文件進行,分別代表該管道的兩端。雖然最初創建時一個管道的兩端都在同一進程中,但是在實際使用時卻總是分別在兩個不同的進程(通常是父子進程)中,所以,管道的兩端不能共享同一個file數據結構,而要為之各分配一個file數據結構。代碼匯總520行和524行調用get_empty_filp的目的就是為管道的兩端f1和f2各分配一個file數據結構。get_empty_filp的代碼以及file結構的定義可參看文件系統系列,這里不再重復。只是要指出,這個數據結構只是代表這個一個特定進程對某個文件操作的現狀,而并不代表這個文件本身的狀態。例如,結構中的成分f_pos就表示該進程在此文件中即將進行讀寫的起始位置,當不同的進程分別打開同一文件進行讀寫時,最初此位置都是0,以后就可能各不相同了。
同時,每個文件都是由一個inode數據結構代表的。雖然一個管道實際上是一個無形的文件,它也得要有一個inode數據結構。由于這個文件在創建管道之前并不存在,所以需要在創建管道時臨時創建其一個inode結構,這就是代碼中528行調用get_pipe_inode的目的。實際上,創建一個管道的過程主要就是創建這么一個文件的過程,函數get_pipe_inode的代碼如下:
sys_pipe=>do_pipe=>get_pipe_inode
static struct inode * get_pipe_inode(void) {struct inode *inode = get_empty_inode();if (!inode)goto fail_inode;if(!pipe_new(inode))goto fail_iput;PIPE_READERS(*inode) = PIPE_WRITERS(*inode) = 1;inode->i_fop = &rdwr_pipe_fops;inode->i_sb = pipe_mnt->mnt_sb;/** Mark the inode dirty from the very beginning,* that way it will never be moved to the dirty* list because "mark_inode_dirty()" will think* that it already _is_ on the dirty list.*/inode->i_state = I_DIRTY;inode->i_mode = S_IFIFO | S_IRUSR | S_IWUSR;inode->i_uid = current->fsuid;inode->i_gid = current->fsgid;inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME;inode->i_blksize = PAGE_SIZE;return inode;fail_iput:iput(inode); fail_inode:return NULL; }先在478行分配一個空閑的inode數據結構。這是一個比較復雜的數據結構,我們已在文件系統中加以詳細介紹。對于管道的創建和使用,我們關系的只是其中少數幾個成分。第一個成分i_pipe是指向一個pipe_inode_info數據結構的指針,只有當由一個inode數據結構所代表的文件時用來實現一個管道的時候才使用它,否則就把這個指針設為NULL。pipe_inode_info的數據結構定義如下:
struct pipe_inode_info {wait_queue_head_t wait;char *base;unsigned int start;unsigned int readers;unsigned int writers;unsigned int waiting_readers;unsigned int waiting_writers;unsigned int r_counter;unsigned int w_counter; };同一文件中還定義了一些宏操作,下面我們就要用這些宏定義。
/* Differs from PIPE_BUF in that PIPE_SIZE is the length of the actualmemory allocation, whereas PIPE_BUF makes atomicity guarantees. */ #define PIPE_SIZE PAGE_SIZE#define PIPE_SEM(inode) (&(inode).i_sem) #define PIPE_WAIT(inode) (&(inode).i_pipe->wait) #define PIPE_BASE(inode) ((inode).i_pipe->base) #define PIPE_START(inode) ((inode).i_pipe->start) #define PIPE_LEN(inode) ((inode).i_size) #define PIPE_READERS(inode) ((inode).i_pipe->readers) #define PIPE_WRITERS(inode) ((inode).i_pipe->writers) #define PIPE_WAITING_READERS(inode) ((inode).i_pipe->waiting_readers) #define PIPE_WAITING_WRITERS(inode) ((inode).i_pipe->waiting_writers) #define PIPE_RCOUNTER(inode) ((inode).i_pipe->r_counter) #define PIPE_WCOUNTER(inode) ((inode).i_pipe->w_counter)#define PIPE_EMPTY(inode) (PIPE_LEN(inode) == 0) #define PIPE_FULL(inode) (PIPE_LEN(inode) == PIPE_SIZE) #define PIPE_FREE(inode) (PIPE_SIZE - PIPE_LEN(inode)) #define PIPE_END(inode) ((PIPE_START(inode) + PIPE_LEN(inode)) & (PIPE_SIZE-1)) #define PIPE_MAX_RCHUNK(inode) (PIPE_SIZE - PIPE_START(inode)) #define PIPE_MAX_WCHUNK(inode) (PIPE_SIZE - PIPE_END(inode))前面get_pipe_inode的代碼中就引用了PIPE_READERS和PIPE_WRITERS。分配了inode數據結構以后,483行又通過pipe_new分配所需的緩沖區。這個函數的代碼定義如下:
sys_pipe=>do_pipe=>get_pipe_inode=>pipe_new
struct inode* pipe_new(struct inode* inode) {unsigned long page;page = __get_free_page(GFP_USER);if (!page)return NULL;inode->i_pipe = kmalloc(sizeof(struct pipe_inode_info), GFP_KERNEL);if (!inode->i_pipe)goto fail_page;init_waitqueue_head(PIPE_WAIT(*inode));PIPE_BASE(*inode) = (char*) page;PIPE_START(*inode) = PIPE_LEN(*inode) = 0;PIPE_READERS(*inode) = PIPE_WRITERS(*inode) = 0;PIPE_WAITING_READERS(*inode) = PIPE_WAITING_WRITERS(*inode) = 0;PIPE_RCOUNTER(*inode) = PIPE_WCOUNTER(*inode) = 1;return inode; fail_page:free_page(page);return NULL; }這里先分配一個內存頁面用作管道的緩沖區,再分配一個緩沖區用作pipe_inode_info數據結構。前面講過,用來實現管道的文件是無形的,它并不出現在磁盤或其他的文件系統存儲介質上,而只存在于內存空間,其他進程也無從打開或訪問這個文件。所以,這個所謂文件實質上只是一個用作緩沖區的內存頁面,只是把它納入了文件系統的機制,借用了文件系統的各種數據結構和操作加以管理而已。
在前一章中已經講過,inode數據結構中有個重要的成分i_fop,是指向一個file_operations數據結構的指針。在這個數據結構中給出了用于該文件的每種操作的函數指針。對于管道(見上面get_pipe_inode中的486行),這個數據結構是rdwr_pipe_fops,也是在pipe.c中定義的:
struct file_operations rdwr_pipe_fops = {llseek: pipe_lseek,read: pipe_read,write: pipe_write,poll: pipe_poll,ioctl: pipe_ioctl,open: pipe_rdwr_open,release: pipe_rdwr_release, };結合上列的宏定義,讀者應該不難理解get_pipe_inode中自分配了inode結構以后的一些初始化操作,我們就不多講了。值得注意的是,代碼匯總并沒有設置inode結構中的inode_operations結構指針i_op,所以該指針為0。可見,對于用來實現管道的inode并不允許對這里的inode進行常規操作,只有當inode代表著有形的文件時才使用。
從get_pipe_inode返回到do_pipe中時,必須的數據結構都已經齊全了。但是,還要為代表著管道兩端的兩個已打開文件分別分配打開文件號,這是通過調用get_unused_fd完成的。
讓我們在do_pipe中繼續往下看:
sys_pipe=>do_pipe
error = -ENOMEM;sprintf(name, "[%lu]", inode->i_ino);this.name = name;this.len = strlen(name);this.hash = inode->i_ino; /* will go */dentry = d_alloc(pipe_mnt->mnt_sb->s_root, &this);if (!dentry)goto close_f12_inode_i_j;dentry->d_op = &pipefs_dentry_operations;d_add(dentry, inode);f1->f_vfsmnt = f2->f_vfsmnt = mntget(mntget(pipe_mnt));f1->f_dentry = f2->f_dentry = dget(dentry);/* read file */f1->f_pos = f2->f_pos = 0;f1->f_flags = O_RDONLY;f1->f_op = &read_pipe_fops;f1->f_mode = 1;f1->f_version = 0;/* write file */f2->f_flags = O_WRONLY;f2->f_op = &write_pipe_fops;f2->f_mode = 2;f2->f_version = 0;fd_install(i, f1);fd_install(j, f2);fd[0] = i;fd[1] = j;return 0;在正常的情況下,每個文件都至少有一個目錄項,代表這個文件的一個路徑名;而每個目錄項則只描述一個文件,在dentry數據結構中有個指針指向相應的inode結構。因此,在file數據結構中有個指針f_dentry指向所打開文件的目錄項dentry數據結構,這樣,從file結構開始就可以一路通到文件的inode結構。對于管道來說,由于文件是無形的,本來并不非得有個目錄項不可。可是,在file數據結構中并沒有直接指向相應inode結構的指針,一定要經過一個目錄項中轉一下才行。而inode結構又是各種文件操作的樞紐。這么一來,對于管道就也得有一個目錄項了。所以代碼中的547行調用d_alloc分配,然后通過d_add使已經分配的inode結構與這個目錄項互相掛上鉤,并且讓兩個已打開文件結構中的f_dentry指針都指向這個目錄項。
對目錄項的操作時通過一個dentry_operations數據結構定義的。具體到管道文件的目錄項,這個數據結構是pipefs_dentry_operations,這是在550行中設置的,定義如下:
static struct dentry_operations pipefs_dentry_operations = {d_delete: pipefs_delete_dentry, };就是說,對于管道的目錄項只允許一種操作,那就是pipefs_delete_dentry,即把它刪除。
對于管道的兩端來說,管道是單向的,所以其f1一端設置成只讀(O_RDONLY)。同時,兩端的文件操作也分別設置成read_pipe_fops和write_pipe_fops,定義如下:
struct file_operations read_pipe_fops = {llseek: pipe_lseek,read: pipe_read,write: bad_pipe_w,poll: pipe_poll,ioctl: pipe_ioctl,open: pipe_read_open,release: pipe_read_release, };struct file_operations write_pipe_fops = {llseek: pipe_lseek,read: bad_pipe_r,write: pipe_write,poll: pipe_poll,ioctl: pipe_ioctl,open: pipe_write_open,release: pipe_write_release, };比較一下,就可以發現,在read_pipe_fops中的寫操作函數為bad_pipe_w,而在write_pipe_fops中的讀操作函數為bad_pipe_r,分別用來返回一個出錯代碼。讀者可能會問,前面在管道的inode數據結構中將指針i_fop設置成指向rdwr_pipe_fops,那顯然是雙向的,這里不是有矛盾嗎?其實不然。對于代表著管道兩端的兩個已打開文件來說,一個只能寫而另一個只能讀,這是事情的一個方面。可是,另一方面,這兩個邏輯上的已打開文件都通向同一個inode、同一個物理上存在的文件,即用作管道的緩沖區;顯然這個緩沖區應該既支持寫又支持讀,這才能使數據流通。讀者在文件系統系列博客中看到,file結構中的指針f_op一般都來自inode結構中的指針i_fop,都指向同一個file_operations結構。而這里,對于管道這么一種特殊的文件,則使管道兩端的file結構各自指向不同的file_operations結構,以此來確保一端只能讀而另一端只能寫。
管道時一種特殊的文件,它并不屬于某種特定的文件系統(如ext2),而是自己構成一種獨立的文件系統,也有自身的數據結構pipe_fs_type:
static DECLARE_FSTYPE(pipe_fs_type, "pipefs", pipefs_read_super,FS_NOMOUNT|FS_SINGLE);系統在初始化時通過kern_mount安裝這個特殊的文件系統,并讓一個指針pipe_mnt指向安裝時的vfsmount數據結構:
static struct vfsmount *pipe_mnt;現在,代表著管道兩端的兩個文件既然都屬于這個文件系統,它們各自的file結構中的指針f_vfsmnt就要指向安裝該文件系統的vfsmount數據結構,而這個數據結構也就多了兩個使用者,所以要調用mntget兩次(見522行),使其使用計數加2。
最后,do_pipe中的568行和569行兩個已打開文件結構跟分配的打開文件號掛上鉤來(注意,打開文件號只在一個進程的范圍內有效);并且將兩個打開文件號填入數組fd中,使得fd[0]為管道讀出端的打開文件號,而fd[1]為寫入端的打開文件號。這個數組隨后在sys_pipe中被復制到當前進程的用戶空間。
顯然,管道的兩端在創建之初都在同一進程中,這樣是起不到進程間通信的作用的。那么,怎樣才能將管道用于進程間通信呢?下面就是一個典型的過程。
下面我們看一個實例:
#include<stdio.h> int main() {int child_B, child_C;int pipefds[2];/*pipefds[0] for read, pipefds[1] for write*/char * args1[] = {"/bin/wc", NULL};char * args2[] = {"/usr/bin/ls", "-l", NULL};/*process A*/pipe(pipefds);if (!(child_B = fork())) /*fork process B*/{/* Process B*/close(pipefds[1]);/* close the write end*//* redirect stdin*/close(0);dup2(pipefds[0], 0);close(pipefds[0]);/*exec the target*/execve("/usr/bin/wc", args1, NULL); /*no return if success*/printf("pid %d: I am back, something is wrong!\n", getpid());}/* process A continue*/close(pipefds[0]);/*close the read end */if (!(child_C = fork())) /*fork process C*/{/*process C*//*redirect stdout*/close(1);dup2(pipefds[1], 1);close(pipefds[1]);/*exec the target*/execve("/bin/ls", args2, NULL); /* no return if success*/printf("pid :%d: I am back, something is wrong!\n", getpid());}/*process A continue*/close(pipefds[1]);/*close the write end */wait4(child_B, NULL, 0, NULL); /* wait for process B to finish*/printf("Done!\n");return 0; }結果如下(ubuntu中運行):
程序中調用的dup2是個系統調用,它將一個已打開文件的file結構指針復制到由指定的打開文件號所對應的通道。在進程B中,先把打開文件號0(即標準輸入)關閉,然后把管道的讀端復制到文件號0所對應的通道,這就完成了對標準輸入的重定向。但是原先的管道讀端既然已經復制就沒用用處了,所以也將其關閉。進程C的重定向與此相似,只不過所重定向的是標準輸出、除此之蛙,就與前面所述的過程完全一樣了。
從進程間通信的角度來說,這種標準輸入和標準輸出的重定向并非必須,直接使用管道原先的寫端和讀端也能在進程之間傳遞數據。但是,應該承認這種將標準輸入輸出重定向與管道結合使用的辦法非常巧妙的,不這樣就難以達到將可執行程序在啟動執行時動態地加以組合的靈活性。另一方面,一旦將標準輸入和標注輸出分別重定向到管道的讀端和寫端以后,兩個進程就都像對普通文件一樣地讀寫。事實上,它們并不知道自己在讀寫的到底是一個文件、一個設備、還是一個管道?
但是,我們知道當讀一個文件到達末尾的時候會碰到EOF,從而知道已經讀完了,可是當從管道中讀時應該讀到什么時候為止呢?下面讀者將會看到,向管道寫入的進程在完成其使命以后就會關閉管道的寫端,一旦管道沒有了寫端就相當于達到了文件的末尾。
從管道所傳遞數據的角度看,兩端的兩個進程之間是一種典型的生產者消費者關系。一旦生產者停止了生產并關閉了管道的寫入端,消費者就沒有東西可消費了,這時候就到了文件(管道)的末尾。在典型的情況下,生產者總是在完成了使命,調用exit之前關閉其所有的已打開文件,包括管道,而消費者則總是在得知已經到達了輸入文件末尾時才調用exit,所以一般總是生產者調用exit之前,消費者調用exit在后。但是,在特殊的條件下,也會有消費者exit在前的情況發生(里偶然消費者進程發生了非法越界訪問),而使得管道的讀端關閉在前。在這種情況下內核會向生產者進程發出一個SIGPIPE信號,表示管道已經斷裂,而生產者進程在接收到這種信號時通常就會調用exit。
下面,我們進一步看看對管道的關閉以及讀、寫操作的源代碼,以加深對管道機制的了解和理解。
先看管道的關閉。當一個進程通過系統調用close關閉代表著管道一段的已打開文件時,在內核中經由如下的執行路線而達到fput:
sys_close=>filp_close=>fput
關于這條執行路線的詳情可參閱文件系統系列博客。每當對一個已打開文件執行關閉操作時,在fput中將相應file數據結構中的共享計數減1。如果減1以后該共享計數變成了0,就進而通過具體文件系統提供的release操作,釋放對dentry數據結構的使用,并釋放file數據結構。
在最初打開一個文件時,內核為之分配一個file數據結構,并將共享計數設置成1.那么,在什么情況下這個共享計數會變成大于1,從而使得在一次調用fput后共享計數不變成0呢?
第一種情況是在fork一個進程的時候,讀者在前面的博客中看到過do_fork中要調用一個函數copy_files;里面有個for循環,對所有已打開文件的file結構調用get_file遞增其共享計數。所有,在fork出來一個子進程以后,若父進程先關閉一個已打開文件,便只會使其共享計數減1,而并不會使計數達到0,因而也就不會最終地關文件。
第二種情況是通過系統調用dup或dup2復制一個打開文件號。這與將同一個文件再打開一次是不同的,它只是使一個已經存在的file數據結構和本進程的另一個打開文件號建立聯系而已。因此,前面所舉的例子中將標準輸入重定向到一個管道時,先是dup2然后close,并不會使其file結構中的共享計數變成0。
也就是說,只有在一個file結構的最后一個用戶通向該結構的最后一條通路也被關閉時,才會調用具體文件系統提供的release操作并最終釋放file數據結構。
函數fput所處理的對象是所關閉的文件相聯系的dentry等數據結構。在do_pipe的代碼中,我們已經看到管道兩端的文件操作結構(file_operations結構)被分別設置成read_pipe_fops和write_pipe_ops。兩個數據結構中對應于release的函數指針分別為pipe_read_release和pipe_write_release。所以,在fput采用這些指針來調用相應的函數時就會執行pipe_read_release或pipe_write_release、這兩個函數都是通向pipe_release的中轉站,或者說是pipe_release的外層,繼續沿著pipe.c往下看:
sys_close=>filp_close=>fput=>pipe_read_release
static int pipe_read_release(struct inode *inode, struct file *filp) {return pipe_release(inode, 1, 0); }static int pipe_write_release(struct inode *inode, struct file *filp) {return pipe_release(inode, 0, 1); }其主體為函數pipe_release,源代碼在pipe.c中。
結合這兩個函數以及前面所列的pipe_fs_i.h中的一些宏定義,pipe_release的代碼就不難讀懂了。
sys_close=>filp_close=>fput=>pipe_read_release=>pipe_release
static int pipe_release(struct inode *inode, int decr, int decw) {down(PIPE_SEM(*inode));PIPE_READERS(*inode) -= decr;PIPE_WRITERS(*inode) -= decw;if (!PIPE_READERS(*inode) && !PIPE_WRITERS(*inode)) {struct pipe_inode_info *info = inode->i_pipe;inode->i_pipe = NULL;free_page((unsigned long) info->base);kfree(info);} else {wake_up_interruptible(PIPE_WAIT(*inode));}up(PIPE_SEM(*inode));return 0; }就像在file結構中有個共享計數一樣,在由inode->i_pipe所指向的pipe_inode_info結構中也有共享計數,而且有兩個,一個是readership,一個是writers。這兩個共享計數在創建管道時在get_pipe_inode中都被設置成1(見pipe.c:get_pipe_inode中的485行)。然后,當關閉管道的讀端是,pipe_read_release調用pipe_release,使共享計數readers減1;而關閉管道的寫端時則使writers減1.當二者都減到了0時,整個管道就完成了使命,此時應將用作緩沖區的存儲頁面以及pipe_inode_info數據結構釋放。在常規的文件操作中,文件的inode存在于磁盤(或其他介質)上,所以在最后關閉時要將內存中的inode數據結構寫回到磁盤上。但是,管道并不是常規意義上的文件,磁盤上并沒有相應的索引節點,所以最后只是將分配的inode數據結構(以及dentry結構)釋放了事,而并沒有什么磁盤操作。這一點從用于管道的inode數據結構中的inode_operations結構指針為0可以看出。
再看管道的讀操作。在典型的應用中,管道的讀端總是處于一個循環中,通過系統調用read從管道中讀,讀了就處理,處理完又讀。對管道的讀操作,在內核中經過sys_read和數據結構read_pipe_fops中的函數指針而到達pipe_read。這個函數的代碼在pipe.c中,讓我我們逐段地往下看:
sys_read=>pipe_read
static ssize_t pipe_read(struct file *filp, char *buf, size_t count, loff_t *ppos) {struct inode *inode = filp->f_dentry->d_inode;ssize_t size, read, ret;/* Seeks are not allowed on pipes. */ret = -ESPIPE;read = 0;if (ppos != &filp->f_pos)goto out_nolock;這里44行的注釋解說seek操作在管道上是不允許的,這當然是對的,事實上函數pipe_lseek只是返回一個出錯代碼。注意47行的檢驗所針對的只是參數ppos,那是個指針,必須指向file->f_pos本身。沿著pipe.c再往下看:
sys_read=>pipe_read
/* Always return 0 on null read. */ret = 0;if (count == 0)goto out_nolock;/* Get the pipe semaphore */ret = -ERESTARTSYS;if (down_interruptible(PIPE_SEM(*inode)))goto out_nolock;if (PIPE_EMPTY(*inode)) { do_more_read:ret = 0;if (!PIPE_WRITERS(*inode))goto out;ret = -EAGAIN;if (filp->f_flags & O_NONBLOCK)goto out;for (;;) {PIPE_WAITING_READERS(*inode)++;pipe_wait(inode);PIPE_WAITING_READERS(*inode)--;ret = -ERESTARTSYS;if (signal_pending(current))goto out;ret = 0;if (!PIPE_EMPTY(*inode))break;if (!PIPE_WRITERS(*inode))goto out;}}如果讀的時候管道里已經有數據在緩沖區中,這一段程序就會被跳過了。可是,如果管道緩沖區中沒有數據,那一版就要睡眠等待了,但是有兩種例外的情況。第一種情況是管道的writers計數已經為0,也就是說已經沒有生產者會向管道中寫了,這時候當然不能再等待。第二種情況是管道創建時設置了標志位O_NONBLOCK,表示在讀不到東西時,當前進程不應該被阻塞(也就是在睡眠中等待),而要立即返回。只要不屬于這兩種特殊情況,那就要通過pipe_wait在睡眠中等待了。函數pipe_wait的代碼也在同一文件中:
sys_read=>pipe_read=>pipe_wait
/** We use a start+len construction, which provides full use of the * allocated memory.* -- Florian Coosmann (FGC)* * Reads with count = 0 should always return 0.* -- Julian Bradfield 1999-06-07.*//* Drop the inode semaphore and wait for a pipe event, atomically */ void pipe_wait(struct inode * inode) {DECLARE_WAITQUEUE(wait, current);current->state = TASK_INTERRUPTIBLE;add_wait_queue(PIPE_WAIT(*inode), &wait);up(PIPE_SEM(*inode));schedule();remove_wait_queue(PIPE_WAIT(*inode), &wait);current->state = TASK_RUNNING;down(PIPE_SEM(*inode)); }注意,與這里的up操作配對的是down_interruptible,是在pipe_read代碼中的57行,一個在for循環外面,一個在for循環里面。實際上,pipe_read中的臨界區時從57行至127行(見下面的代碼),但是睡眠時必須要退出臨界區,而到被喚醒后再進入臨界區。為什么要把pipe_wait放在一個循環中呢?這是因為睡眠中的進程被喚醒的原因不一定就是有進程往管道中寫,也可能是收到了信號。而且,即使是因為有進程往管道中寫而喚醒,也不能保證每個被喚醒的進程都能讀到數據,因為等待著從管道中讀數據的進程可能不止一個。因此,要將睡眠等待的過程放在一個循環中,并且在喚醒以后還要再檢驗所等待的條件是否得到滿足,以及是否發生了例外的情況。對于在生產者、消費者模型中消費者一方的等待過程,這是一種典型的設計。在正常的情況下,這個循環一般都是因為管道中有了數據而結束(見78和79行),于是具體從管道中讀取數據的操作就開始了:
sys_read=>pipe_read
/* Read what data is available. */ret = -EFAULT;while (count > 0 && (size = PIPE_LEN(*inode))) {char *pipebuf = PIPE_BASE(*inode) + PIPE_START(*inode);ssize_t chars = PIPE_MAX_RCHUNK(*inode);if (chars > count)chars = count;if (chars > size)chars = size;if (copy_to_user(buf, pipebuf, chars))goto out;read += chars;PIPE_START(*inode) += chars;PIPE_START(*inode) &= (PIPE_SIZE - 1);PIPE_LEN(*inode) -= chars;count -= chars;buf += chars;}/* Cache behaviour optimization */if (!PIPE_LEN(*inode))PIPE_START(*inode) = 0;if (count && PIPE_WAITING_WRITERS(*inode) && !(filp->f_flags & O_NONBLOCK)) {/** We know that we are going to sleep: signal* writers synchronously that there is more* room.*/wake_up_interruptible_sync(PIPE_WAIT(*inode));if (!PIPE_EMPTY(*inode))BUG();goto do_more_read;}/* Signal writers asynchronously that there is more room. */wake_up_interruptible(PIPE_WAIT(*inode));ret = read; out:up(PIPE_SEM(*inode)); out_nolock:if (read)ret = read;return ret; }每個管道只有一個頁面用作緩沖區,該頁面時按喚醒緩沖區的方式來使用的。就是說,每當讀寫到了頁面的末端就又要回到頁面的始端(見下圖),這樣,管道中的數據就有可能要分兩段讀出,所以要由一個循環來完成。
?結合本博客前面所列的宏定義,這段代碼應該是不難理解的。循環結束以后的情況有以下幾種可能:
在前兩種情況下,管道中的數據都已讀完,但指示著下一次讀寫的起始點start,在不同的條件下有可能在頁面中的任何位置上。可是,既然管道中已經空了,那就不如把起始點start設置到頁面的開頭,這樣可以減少下一次讀寫必須分成兩段進行的可能性,這就是108行和109行所做優化的目的。
由于管道的換乘區只限于一個頁面,當生產者進程有大量數據要寫時,每當寫滿了一個頁面(分一段或兩段)就得停下來睡眠等待,等到消費者進程從管道中讀走了一些數據而騰出一些空間時才能繼續。所以,消費者進程在讀出了一些數據以后要喚醒可能正在睡眠中的生產者進程。最后,只要讀出的長度不為0,就要更新inode的受訪問時間印記。
所以這些操作,包括從管道中讀出,復制到用戶空間,更新inode的受訪問時間印記等等,都是不能容許其他進程打擾的,所以都是放在臨界區中進行。而57行處的down_interruptible和127行處的up正是界定了這樣一個臨界區。
與讀操作相似,對管道的寫操作也是在sys_write中通過file結構中的指針f_op找到file_operations數據結構write_pipe_fops,再通過其函數指針write調用pipe_write。這個函數也是在pipe.c中定義的,我們還是逐段來解讀:
sys_write->pipe_write
static ssize_t pipe_write(struct file *filp, const char *buf, size_t count, loff_t *ppos) {struct inode *inode = filp->f_dentry->d_inode;ssize_t free, written, ret;/* Seeks are not allowed on pipes. */ret = -ESPIPE;written = 0;if (ppos != &filp->f_pos)goto out_nolock;/* Null write succeeds. */ret = 0;if (count == 0)goto out_nolock;顯然,這一段與pipe_read的開頭一段完全相同。繼續往下讀:
sys_write->pipe_write
ret = -ERESTARTSYS;if (down_interruptible(PIPE_SEM(*inode)))goto out_nolock;/* No readers yields SIGPIPE. */if (!PIPE_READERS(*inode))goto sigpipe;/* If count <= PIPE_BUF, we have to make it atomic. */free = (count <= PIPE_BUF ? count : 1);/* Wait, or check for, available space. */if (filp->f_flags & O_NONBLOCK) {ret = -EAGAIN;if (PIPE_FREE(*inode) < free)goto out;} else {while (PIPE_FREE(*inode) < free) {PIPE_WAITING_WRITERS(*inode)++;pipe_wait(inode);PIPE_WAITING_WRITERS(*inode)--;ret = -ERESTARTSYS;if (signal_pending(current))goto out;if (!PIPE_READERS(*inode))goto sigpipe;}}如果管道的讀端已經全部關閉,那就表示已經沒有消費者了。既然沒有了消費者,那么生產者的存在以及繼續生產就失去意義了;所以此時轉到標號sigpipe處,向當前進程發送一個SIGPIPE信號,表示管道已經斷裂:
sys_write->pipe_write
sigpipe:if (written)goto out;up(PIPE_SEM(*inode));send_sig(SIGPIPE, current, 0);return -EPIPE; }一般而言,進程在接收到SIGPIPE信號時會調用do_exit來結束其生命。讀者也許注意到,這里其實是當前進程自己向自己發SIGPIPE信號。man么為何不直接調用do_exit呢?這里有兩方面的考慮:一方面是使程序的結構更好,更整齊劃一;另一方面也為進程通過信號機制來改變其在接收到SIGPIPE信號時的行為提供了更多的靈活性的可能性。
160行中的常數PIPE_BUF定義為4096.當要求寫入的長度不超過這個數值時,內核保證寫入操作的原子性,也就是說,一定要到管道緩沖區中足夠容納這塊數據時才開始寫。如果超過整個數值,就不保證其原子性了,這時候有多大空間就寫多少字節,有一個字節的空間就寫一個字節,余下的等消費者獨奏一些字節以后以后再繼續寫。這就是第160行將變量free設置成count或者1的意義。注意變量free表示開始寫入前緩沖區中至少要有這么多個空閑的字節,否則就要睡眠等待;所以只是在決定等待與否時使用,而一旦開始寫入就不再使用了。讀者可以對照前面pipe_read中的代碼自行閱讀這里的162-179行,應該不會有困難。
一旦生產者進程在等待中被消費者進程喚醒,并且緩沖區中有了足夠的空間,或者一開始時緩沖區中就有足夠的空間,具體的寫入操作就開始了:
sys_write->pipe_write
/* Copy into available space. */ret = -EFAULT;while (count > 0) {int space;char *pipebuf = PIPE_BASE(*inode) + PIPE_END(*inode);ssize_t chars = PIPE_MAX_WCHUNK(*inode);if ((space = PIPE_FREE(*inode)) != 0) {if (chars > count)chars = count;if (chars > space)chars = space;if (copy_from_user(pipebuf, buf, chars))goto out;written += chars;PIPE_LEN(*inode) += chars;count -= chars;buf += chars;space = PIPE_FREE(*inode);continue;}ret = written;if (filp->f_flags & O_NONBLOCK)break;do {/** Synchronous wake-up: it knows that this process* is going to give up this CPU, so it doesnt have* to do idle reschedules.*/wake_up_interruptible_sync(PIPE_WAIT(*inode));PIPE_WAITING_WRITERS(*inode)++;pipe_wait(inode);PIPE_WAITING_WRITERS(*inode)--;if (signal_pending(current))goto out;if (!PIPE_READERS(*inode))goto sigpipe;} while (!PIPE_FREE(*inode));ret = -EFAULT;}/* Signal readers asynchronously that there is more data. */wake_up_interruptible(PIPE_WAIT(*inode));inode->i_ctime = inode->i_mtime = CURRENT_TIME;mark_inode_dirty(inode);out:up(PIPE_SEM(*inode)); out_nolock:if (written)ret = written;return ret;...... }首先,對照pipe_read中分兩段讀的情況,即使要求寫入的長度小于PIPE_BUF時,也可能會要分兩段來寫,所以整個寫入的過程也放在一個while循環中。另外,要求寫入的長度大于PIPE_BUF時,還要分成幾次來寫,也就是寫寫入若干字節,然后睡眠等待消費者從緩沖區中讀走一些字節而創造出一些空間,再繼續寫。這就是為什么要有209-223行的do-while循環的原因。這個循環與前面的睡眠等待循環略有不同,這就是當前進程被喚醒時,只檢查緩沖區中是否有空間,而不問空間多大。為什么呢?因為此時的宗旨是有一個字節的空間就寫一個字節,而既然消費者進程已經讀走了若干字節,那么至少已經有一個字節的空間,可以進入while循環體的下一次循環了。對照pipe_read的代碼,讀者應該可以讀懂上面這段代碼而不會有太大的困難,我們把它留給讀者作為練習。建議讀者假設幾種不同的數據長度來走過這段程序,并且在紙上記下幾種不同情況下的執行路線。閱讀時要注意202行的continue語句,當要求寫入的數據長度不大于PIPE_BUF但需要分兩段(不是兩次)寫入時,它使執行路線跳過后面的do-while循環。同時,還要注意185行中的宏定義PIPE_END,它使寫入的位置pipe_buf回到頁面的起點。
這樣,在典型的情景下,生產者和消費者之間互相等待,互相喚醒,協調地向前發展,也就是說:
- 對生產者而言,緩沖區有空間就往里寫,并且喚醒可能正在等待著要從緩沖區中讀數據的小費制;沒有空間就要睡眠,等待消費者從緩沖區讀走數據而騰出空間。
 - 對小輔助而言,緩沖區中有數據就讀出,然后喚醒可能正在等待著要往緩沖區寫的生產者。如果沒有數據就睡眠,等待生產者往緩沖區中寫數據。
 
一句話,管道兩端的進程通過管道所形成的是典型的生產者消費者關系和運行模式。
總結
以上是生活随笔為你收集整理的管道与系统调用pipe的全部內容,希望文章能夠幫你解決所遇到的問題。
                            
                        - 上一篇: 苹果六电池_苹果新产品发布,这次加量不加
 - 下一篇: 时光轴一之listView实现时光轴效果