Linux的进程间通信-消息队列
生活随笔
收集整理的這篇文章主要介紹了
Linux的进程间通信-消息队列
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
Linux的進程間通信-消息隊列
微博ID:orroz
微信公眾號:Linux系統技術
前言
Linux系統給我們提供了一種可以發送格式化數據流的通信手段,這就是消息隊列。使用消息隊列無疑在某些場景的應用下可以大大減少工作量,相同的工作如果使用共享內存,除了需要自己手工構造一個可能不夠高效的隊列外,我們還要自己處理競爭條件和臨界區代碼。而內核給我們提供的消息隊列,無疑大大方便了我們的工作。
Linux環境提供了XSI和POSIX兩套消息隊列,本文將幫助您掌握以下內容:
如何使用XSI消息隊列。
如何使用POSIX消息隊列。
它們的底層實現分別是什么樣子的?
它們分別有什么特點?以及相關資源限制。
請任意打賞,多謝多謝!
XSI消息隊列
系統提供了四個方法來操作XSI消息隊列,它們分別是:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgget(key_t key, int msgflg);
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
我們可以使用msgget去創建或訪問一個消息隊列,與其他XSI IPC一樣,msgget使用一個key作為創建消息隊列的標識。這個key可以通過ftok生成或者指定為IPC_PRIVATE。指定為IPC_PRIVATE時,此隊列會新建出來,而且內核會保證新建的隊列key不會與已經存在的隊列沖突,所以此時后面的msgflag應指定為IPC_CREAT。當msgflag指定為IPC_CREAT時,msgget會去試圖創建一個新的消息隊列,除非指定key的消息隊列已經存在??梢允褂肙_CREAT | O_EXCL在指定key已經存在的情況下報錯,而不是訪問這個消息隊列。我們來看創建一個消息隊列的例子:
[zorro@zorro-pc mqueue]$ cat msg_create.c
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
#include <stdio.h>
#define FILEPATH "/etc/passwd"
#define PROJID 1234
int main()
{
? ? int msgid;
? ? key_t key;
? ? struct msqid_ds msg_buf;
? ? key = ftok(FILEPATH, PROJID);
? ? if (key == -1) {
? ? ? ? perror("ftok()");
? ? ? ? exit(1);
? ? }
? ? msgid = msgget(key, IPC_CREAT|IPC_EXCL|0600);
? ? if (msgid == -1) {
? ? ? ? perror("msgget()");
? ? ? ? exit(1);
? ? }
? ? if (msgctl(msgid, IPC_STAT, &msg_buf) == -1) {
? ? ? ? perror("msgctl()");
? ? ? ? exit(1);
? ? }
? ? printf("msgid: %d\n", msgid);
? ? printf("msg_perm.uid: %d\n", msg_buf.msg_perm.uid);
? ? printf("msg_perm.gid: %d\n", msg_buf.msg_perm.gid);
? ? printf("msg_stime: %d\n", msg_buf.msg_stime);
? ? printf("msg_rtime: %d\n", msg_buf.msg_rtime);
? ? printf("msg_qnum: %d\n", msg_buf.msg_qnum);
? ? printf("msg_qbytes: %d\n", msg_buf.msg_qbytes);
}
這個程序可以創建并查看一個消息隊列的相關狀態,執行結果:
[zorro@zorro-pc mqueue]$ ./msg_create?
msgid: 0
msg_perm.uid: 1000
msg_perm.gid: 1000
msg_stime: 0
msg_rtime: 0
msg_qnum: 0
msg_qbytes: 16384
如果我們在次執行這個程序,就會報錯,因為key沒有變化,我們使用了IPC_CREAT|IPC_EXCL,所以相關隊列已經存在了就會報錯:
[zorro@zorro-pc mqueue]$ ./msg_create?
msgget(): File exists
順便看一下msgctl方法,我們可以用它來取一個消息隊列的相關狀態。更詳細的信息可以man 2 msgctl查看。除了查看隊列狀態以外,還可以使用msgctl設置相關隊列狀態以及刪除指定隊列。另外我們還可以使用ipcs -q命令查看系統中XSI消息隊列的相關狀態。其他相關參數請參考man ipcs。
使用msgsnd和msgrcv向隊列發送和從隊列接收消息。我們先來看看如何訪問一個已經存在的消息隊列和向其發送消息:
[zorro@zorro-pc mqueue]$ cat msg_send.c
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#define FILEPATH "/etc/passwd"
#define PROJID 1234
#define MSG "hello world!"
struct msgbuf {
? ? long mtype;
? ? char mtext[BUFSIZ];
};
int main()
{
? ? int msgid;
? ? key_t key;
? ? struct msgbuf buf;
? ? key = ftok(FILEPATH, PROJID);
? ? if (key == -1) {
? ? ? ? perror("ftok()");
? ? ? ? exit(1);
? ? }
? ? msgid = msgget(key, 0);
? ? if (msgid == -1) {
? ? ? ? perror("msgget()");
? ? ? ? exit(1);
? ? }
? ? buf.mtype = 1;
? ? strncpy(buf.mtext, MSG, strlen(MSG));
? ? if (msgsnd(msgid, &buf, strlen(buf.mtext), 0) == -1) {
? ? ? ? perror("msgsnd()");
? ? ? ? exit(1);
? ? }
}
使用msgget訪問一個已經存在的消息隊列時,msgflag指定為0即可。使用msgsnd發送消息時主要需要注意的是它的第二個和第三個參數。第二個參數用來指定要發送的消息,它實際上應該是一個指向某個特殊結構的指針,這個結構可以定義如下:
struct msgbuf {
? ? long mtype;
? ? char mtext[BUFSIZ];
};
這個結構的mtype實際上是用來指定消息類型的,可以指定的數字必需是個正整數。我們可以把這個概念理解為XSI消息隊列對消息優先級的實現方法,即:需要傳送的消息體的第一個long長度是用來指定類型的參數,而非消息本身,后面的內容才是消息。在我們實現的消息中,這個結構題可以傳送的最大消息長度為BUFSIZE的字節數。當然,如果你的消息并不是一個字符串,也可以將mtype后面的信息實現成各種需要的格式,比如想要發送一個人的名字和他的數學語文成績的話,可以這樣實現:
struct msgbuf {
? ? long mtype;
? ? char name[NAMESIZE];
? ? int math, chinese;
};
這實際上就是讓使用者自己去設計一個通訊協議,然后發送端和接收端使用約定好的協議進行通訊。msgsnd的第三個參數應該是這個消息結構體除了mtype以外的真實消息的長度,而不是這個結構題的總長度,這點是要注意的。所以,如果你定義了一個很復雜的消息協議的話,建議的長度寫法是這樣:
sizeof(buf)-sizeof(long)
msgsnd的最后一個參數可以用來指定IPC_NOWAIT。在消息隊列滿的情況下,默認的發送行為會阻塞等待,如果加了這個參數,則不會阻塞,而是立即返回,并且errno設置為EAGAIN。然后我們來看接收消息和刪除消息隊列的例子:
[zorro@zorro-pc mqueue]$ cat msg_receive.c
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#define FILEPATH "/etc/passwd"
#define PROJID 1234
struct msgbuf {
? ? long mtype;
? ? char mtext[BUFSIZ];
};
int main()
{
? ? int msgid;
? ? key_t key;
? ? struct msgbuf buf;
? ? key = ftok(FILEPATH, PROJID);
? ? if (key == -1) {
? ? ? ? perror("ftok()");
? ? ? ? exit(1);
? ? }
? ? msgid = msgget(key, 0);
? ? if (msgid == -1) {
? ? ? ? perror("msgget()");
? ? ? ? exit(1);
? ? }
? ? if (msgrcv(msgid, &buf, BUFSIZ, 1, 0) == -1) {
? ? ? ? perror("msgrcv()");
? ? ? ? exit(1);
? ? }
? ? printf("mtype: %d\n", buf.mtype);
? ? printf("mtype: %s\n", buf.mtext);
? ? if (msgctl(msgid, IPC_RMID, NULL) == -1) {
? ? ? ? perror("msgctl()");
? ? ? ? exit(1);
? ? }
? ? exit(0);
}
msgrcv會將消息從指定隊列中刪除,并將其內容填到其第二個參數指定的buf地址所在的內存中。第三個參數指定承接消息的buf長度,如果消息內容長度大于指定的長度,那么這個函數的行為將取決于最后一個參數msgflag是否設置了MSG_NOERROR,如果這個標志被設定,那消息將被截短,消息剩余部分將會丟失。如果沒設置這個標志,msgrcv會失敗返回,并且errno被設定為E2BIG。
第四個參數用來指定從消息隊列中要取的消息類型msgtyp,如果設置為0,則無論什么類型,取隊列中的第一個消息。如果值大于0,則讀取符合這個類型的第一個消息,當最后一個參數msgflag設置為MSG_EXCEPT的時候,是對消息類型取邏輯非。即,不等于這個消息類型的第一個消息會被讀取。如果指定一個小于0的值,那么將讀取消息類型比這個負數的絕對值小的類型的所有消息中的第一個。
最后一個參數msgflag還可以設置為:
IPC_NOWAIT:非阻塞方式讀取。當隊列為空的時候,msgrcv會阻塞等待。加這個標志后將直接返回,errno被設置為ENOMSG。
MSG_COPY:從Linux 3.8之后開始支持以消息位置的方式讀取消息。如果標志為置為MSG_COPY則表示啟用這個功能,此時msgtyp的含義將從類型變為位置偏移量,第一個消息的起始值為0。如果指定位置的消息不存在,則返回并設置errno為ENOMSG。并且MSG_COPY和MSG_EXCEPT不能同時設置。另外還要注意這個功能需要內核配置打開CONFIG_CHECKPOINT_RESTORE選項。這個選項默認應該是不開的。
使用msgctl刪除消息隊列的方法比較簡單,不在復述。另外關于msgctl的其他使用,請大家參考msgctl的手冊。這部分內容的另外一個權威參考資料就是《UNIX環境高級編程》。我們在這里補充一下Linux系統對XSI消息隊列的限制相關參數介紹:
/proc/sys/kernel/msgmax:這個文件限制了系統中單個消息最大的字節數。
/proc/sys/kernel/msgmni:這個文件限制了系統中可創建的最大消息隊列個數。
/proc/sys/kernel/msgmnb:這個文件用來限制單個消息隊列中可以存放的最大消息字節數。
以上文件都可以使用echo或者sysctl命令進行修改。
POSIX消息隊列
POSIX消息隊列是獨立于XSI消息隊列的一套新的消息隊列API,讓進程可以用消息的方式進行數據交換。這套消息隊列在Linux 2.6.6版本之后開始支持,還需要你的glibc版本必須高于2.3.4。它們使用如下方法進行操作和控制:
#include <fcntl.h> ? ? ? ? ? /* For O_* constants */
#include <sys/stat.h> ? ? ? ?/* For mode constants */
#include <mqueue.h>
mqd_t mq_open(const char *name, int oflag);
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
類似對文件的open,我們可以用mq_open來打開一個已經創建的消息隊列或者創建一個消息隊列。這個函數返回一個叫做mqd_t類型的返回值,其本質上還是一個文件描述符,只是在這這里被叫做消息隊列描述符(message queue descriptor),在進程里使用這個描述符對消息隊列進程操作。所有被創建出來的消息隊列在系統中都有一個文件與之對應,這個文件名是通過name參數指定的,這里需要注意的是:name必須是一個以”/“開頭的字符串,比如我想讓消息隊列的名字叫”message”,那么name應該給的是”/message”。消息隊列創建完畢后,會在/dev/mqueue目錄下產生一個以name命名的文件,我們還可以通過cat這個文件來看這個消息隊列的一些狀態信息。其它進程在消息隊列已經存在的情況下就可以通過mp_open打開名為name的消息隊列來訪問它。
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio);
int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio, const struct timespec *abs_timeout);
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);
ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio, const struct timespec *abs_timeout);
在一個消息隊列創建完畢之后,我們可以使用mq_send來對消息隊列發送消息,mq_receive來對消息隊列接收消息。正常的發送消息一般不會阻塞,除非消息隊列處在某種異常狀態或者消息隊列已滿的時候,而消息隊列在空的時候,如果使用mq_receive去試圖接受消息的行為也會被阻塞,所以就有必要為兩個方法提供一個帶超時時間的版本。這里要注意的是msg_prio這個參數,是用來指定消息優先級的。每個消息都有一個優先級,取值范圍是0到sysconf(_SC_MQ_PRIO_MAX) - 1的大小。在Linux上,這個值為32768。默認情況下,消息隊列會先按照優先級進行排序,就是msg_prio這個值越大的越先出隊列。同一個優先級的消息按照fifo原則處理。在mq_receive方法中的msg_prio是一個指向int的地址,它并不是用來指定取的消息是哪個優先級的,而是會將相關消息的優先級取出來放到相關變量中,以便用戶自己處理優先級。
int mq_close(mqd_t mqdes);
我們可以使用mq_close來關閉一個消息隊列,這里的關閉并非刪除了相關文件,關閉之后消息隊列在系統中依然存在,我們依然可以繼續打開它使用。這跟文件的close和unlink的概念是類似的。
int mq_unlink(const char *name);
使用mq_unlink真正刪除一個消息隊列。另外,我們還可以使用mq_getattr和mq_setattr來查看和設置消息隊列的屬性,其函數原型為:
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr, struct mq_attr *oldattr);
mq_attr結構體是這樣的結構:
struct mq_attr {
? ? long mq_flags; ? ? ? /* 只可以通過此參數將消息隊列設置為是否非阻塞O_NONBLOCK */
? ? long mq_maxmsg; ? ? ?/* 消息隊列的消息數上限 */
? ? long mq_msgsize; ? ? /* 消息最大長度 */
? ? long mq_curmsgs; ? ? /* 消息隊列的當前消息個數 */
};
消息隊列描述符河文件描述符一樣,當進程通過fork打開一個子進程后,子進程中將從父進程繼承相關描述符。此時父子進程中的描述符引用的是同一個消息隊列,并且它們的mq_flags參數也將共享。下面我們使用幾個簡單的例子來看看他們的操作方法:
創建并向消息隊列發送消息:
[zorro@zorro-pc mqueue]$ cat send.c
#include <fcntl.h>
#include <sys/stat.h> ? ? ? ?/* For mode constants */
#include <mqueue.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#define MQNAME "/mqtest"
int main(int argc, char *argv[])
{
? ? mqd_t mqd;
? ? int ret;
? ? if (argc != 3) {
? ? ? ? fprintf(stderr, "Argument error!\n");
? ? ? ? exit(1);
? ? }
? ? mqd = mq_open(MQNAME, O_RDWR|O_CREAT, 0600, NULL);
? ? if (mqd == -1) {
? ? ? ? perror("mq_open()");
? ? ? ? exit(1);
? ? }
? ? ret = mq_send(mqd, argv[1], strlen(argv[1]), atoi(argv[2]));
? ? if (ret == -1) {
? ? ? ? perror("mq_send()");
? ? ? ? exit(1);
? ? }
? ? exit(0);
}
注意相關方法在編譯的時候需要鏈接一些庫,所以我們可以創建Makefile來解決這個問題:
[zorro@zorro-pc mqueue]$ cat Makefile?
CFLAGS+=-lrt -lpthread
我們添加了rt和pthread庫,為以后的例子最好準備。當然大家也可以直接使用gcc -lrt -lpthread來解決這個問題,然后我們對程序編譯并測試:
[zorro@zorro-pc mqueue]$ rm send
[zorro@zorro-pc mqueue]$ make send
cc -lrt -lpthread ? ?send.c ? -o send
[zorro@zorro-pc mqueue]$ ./send zorro 1
[zorro@zorro-pc mqueue]$ ./send shrek 2
[zorro@zorro-pc mqueue]$ ./send jerry 3
[zorro@zorro-pc mqueue]$ ./send zzzzz 1
[zorro@zorro-pc mqueue]$ ./send ssssss 2
[zorro@zorro-pc mqueue]$ ./send jjjjj 3
我們以不同優先級給消息隊列添加了幾條消息。然后我們可以通過文件來查看相關消息隊列的狀態:
[zorro@zorro-pc mqueue]$ cat /dev/mqueue/mqtest?
QSIZE:31 ? ? ? ? NOTIFY:0 ? ? SIGNO:0 ? ? NOTIFY_PID:0?
然后我們來看如何接收消息:
[zorro@zorro-pc mqueue]$ cat recv.c
#include <fcntl.h>
#include <sys/stat.h> ? ? ? ?/* For mode constants */
#include <mqueue.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#define MQNAME "/mqtest"
int main()
{
? ? mqd_t mqd;
? ? int ret;
? ? int val;
? ? char buf[BUFSIZ];
? ? mqd = mq_open(MQNAME, O_RDWR);
? ? if (mqd == -1) {
? ? ? ? perror("mq_open()");
? ? ? ? exit(1);
? ? }
? ? ret = mq_receive(mqd, buf, BUFSIZ, &val);
? ? if (ret == -1) {
? ? ? ? perror("mq_send()");
? ? ? ? exit(1);
? ? }
? ? ret = mq_close(mqd);
? ? if (ret == -1) {
? ? ? ? perror("mp_close()");
? ? ? ? exit(1);
? ? }
? ? printf("msq: %s, prio: %d\n", buf, val);
? ? exit(0);
}
直接編譯執行:
[zorro@zorro-pc mqueue]$ ./recv?
msq: jerry, prio: 3
[zorro@zorro-pc mqueue]$ ./recv?
msq: jjjjj, prio: 3
[zorro@zorro-pc mqueue]$ ./recv?
msq: shrek, prio: 2
[zorro@zorro-pc mqueue]$ ./recv?
msq: ssssss, prio: 2
[zorro@zorro-pc mqueue]$ ./recv?
msq: zorro, prio: 1
[zorro@zorro-pc mqueue]$ ./recv?
msq: zzzzz, prio: 1
可以看到優先級對消息隊列內部排序的影響。然后是刪除這個消息隊列:
[zorro@zorro-pc mqueue]$ cat rmmq.c?
#include <fcntl.h>
#include <sys/stat.h> ? ? ? ?/* For mode constants */
#include <mqueue.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#define MQNAME "/mqtest"
int main()
{
? ? int ret;
? ? ret = mq_unlink(MQNAME);
? ? if (ret == -1) {
? ? ? ? perror("mp_unlink()");
? ? ? ? exit(1);
? ? }
? ? exit(0);
}
大家在從消息隊列接收消息的時候會發現,當消息隊列為空的時候,mq_receive會阻塞,直到有人給隊列發送了消息才能返回并繼續執行。在很多應用場景下,這種同步處理的方式會給程序本身帶來性能瓶頸。為此,POSI消息隊列使用mq_notify為處理過程增加了一個異步通知機制。使用這個機制,我們就可以讓隊列在由空變成不空的時候觸發一個異步事件,通知調用進程,以便讓進程可以在隊列為空的時候不用阻塞等待。這個方法的原型為:
int mq_notify(mqd_t mqdes, const struct sigevent *sevp);
其中sevp用來想內核注冊具體的通知行為,可以man 7 sigevent查看相關幫助。這里我們不展開講解,詳細內容將在信號相關內容中詳細說明。簡單來說,我們可以使用nq_notify方法注冊3種行為:SIGEV_NONE,SIGEV_SIGNAL和SIGEV_THREAD。它們分別的含義如下:
SIGEV_NONE:一個“空”提醒。其實就是不提醒。
SIGEV_SIGNAL:當隊列中有了消息后給調用進程發送一個信號??梢允褂胹truct sigevent結構體中的sigev_signo指定信號編號,信號的si_code字段將設置為SI_MESGQ以標示這是消息隊列的信號。還可以通過si_pid和si_uid來指定信號來自什么pid和什么uid。
SIGEV_THREAD:當隊列中有了消息后觸發產生一個線程。當設置為線程時,可以使用struct sigevent結構體中的sigev_notify_function指定具體觸發什么線程,使用sigev_notify_attributes設置線程屬性,使用sigev_value.sival_ptr傳遞一個任何東西的指針。
我們先來看使用信號的簡單例子:
[zorro@zorro-pc mqueue]$ cat notify_sig.c
#include <pthread.h>
#include <mqueue.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
static mqd_t mqdes;
void mq_notify_proc(int sig_num)
{
? ? /* mq_notify_proc()是信號處理函數,
? ? 當隊列從空變成非空時,會給本進程發送信號,
? ? 觸發本函數執行。 */
? ? struct mq_attr attr;
? ? void *buf;
? ? ssize_t size;
? ? int prio;
? ? struct sigevent sev;
? ? /* 我們約定使用SIGUSR1信號進行處理,
? ? 在此判斷發來的信號是不是SIGUSR1。 */
? ? if (sig_num != SIGUSR1) {
? ? ? ? return;
? ? }
? ? /* 取出當前隊列的消息長度上限作為緩存空間大小。 */
? ? if (mq_getattr(mqdes, &attr) < 0) {
? ? ? ? perror("mq_getattr()");
? ? ? ? exit(1);
? ? }
? ? buf = malloc(attr.mq_msgsize);
? ? if (buf == NULL) {
? ? ? ? perror("malloc()");
? ? ? ? exit(1);
? ? }
? ? /* 從消息隊列中接收消息。 */
? ? size = mq_receive(mqdes, buf, attr.mq_msgsize, &prio);
? ? if (size == -1) {
? ? ? ? perror("mq_receive()");
? ? ? ? exit(1);
? ? }
? ? /* 打印消息和其優先級。 */
? ? printf("msq: %s, prio: %d\n", buf, prio);
? ? free(buf);
? ? /* 重新注冊mq_notify,以便下次可以出觸發。 */
? ? sev.sigev_notify = SIGEV_SIGNAL;
? ? sev.sigev_signo = SIGUSR1;
? ? if (mq_notify(mqdes, &sev) == -1) {
? ? ? ? perror("mq_notify()");
? ? ? ? exit(1);
? ? }
? ? return;
}
int main(int argc, char *argv[])
{
? ? struct sigevent sev;
? ? if (argc != 2) {
? ? ? ? fprintf(stderr, "Argument error!\n");
? ? ? ? exit(1);
? ? }
? ? /* 注冊信號處理函數。 */
? ? if (signal(SIGUSR1, mq_notify_proc) == SIG_ERR) {
? ? ? ? perror("signal()");
? ? ? ? exit(1);
? ? }
? ? /* 打開消息隊列,注意此隊列需要先創建。 */
? ? mqdes = mq_open(argv[1], O_RDONLY);
? ? if (mqdes == -1) {
? ? ? ? perror("mq_open()");
? ? ? ? exit(1);
? ? }
? ? /* 注冊mq_notify。 */
? ? sev.sigev_notify = SIGEV_SIGNAL;
? ? sev.sigev_signo = SIGUSR1;
? ? if (mq_notify(mqdes, &sev) == -1) {
? ? ? ? perror("mq_notify()");
? ? ? ? exit(1);
? ? }
? ? /* 主進程每秒打印一行x,等著從消息隊列發來異步信號觸發收消息。 */
? ? while (1) {
? ? ? ? printf("x\n");
? ? ? ? sleep(1);
? ? }
}
我們編譯這個程序并執行:
[zorro@zorro-pc mqueue]$ ./notify_sig /mqtest
x
x
...
會一直打印x,等著隊列變為非空,我們此時在別的終端給隊列發送一個消息:
[zorro@zorro-pc mqueue]$ ./send hello 1
進程接收到信號,并且現實消息相關內容:
...
x
x
msq: hello, prio: 1
x
...
再發一個試試:
[zorro@zorro-pc mqueue]$ ./send zorro 3
顯示:
...
x
msq: zorro, prio: 3
x
...
在mq_notify的man手冊中,有一個觸發線程進行異步處理的例子,我們在此就不再額外寫一遍了,在此引用并注釋一下,以方便大家理解:
字數限制,本段代碼請參考原文。
大家可以自行編譯執行此程序進行測試。請注意mq_notify的行為:
一個消息隊列智能通過mq_notify注冊一個進程進行異步處理。
異步通知只會在消息隊列從空變成非空的時候產生,其它隊列的變動不會觸發異步通知。
如果有其他進程使用mq_receive等待隊列的消息時,消息到來不會觸發已注冊mq_notify的程序產生異步通知。隊列的消息會遞送給在使用mq_receive等待的進程。
一次mq_notify注冊只會觸發一次異步事件,此后如果隊列再次由空變為非空也不會觸發異步通知。如果需要一直可以觸發,請處理異步通知之后再次注冊mq_notify。
如果sevp指定為NULL,表示取消注冊異步通知。
POSIX消息隊列相對XSI消息隊列的一大優勢是,我們又一個類似文件描述符的mqd的描述符可以進行操作,所以很自然的我們就會聯想到是否可以使用多路IO轉接機制對消息隊列進程處理?在Linux上,答案是肯定的,我們可以使用select、poll和epoll對隊列描述符進行處理,我們在此僅使用epoll舉個簡單的例子:
字數限制,本段代碼請參考原文。
這就是POSIX消息隊列比XSI更有趣的地方,XSI的消息隊列并未遵守“一切皆文件”的原則。當然,使用select和poll這里就不再舉例了,有興趣的可以自己實現一下作為練習。
以上例子中,我們也分別演示了如何使用mq_setattr和mq_getattr,此處我們應該知道,在所有可以顯示的屬性中,O_NONBLOCK是mq_setattr唯一可以更改的參數設置,其他參數對于這個方法都是只讀的,不能修改。系統提供了其他手段可以對這些限制進行修改:
/proc/sys/fs/mqueue/msg_default:在mq_open的attr參數設置為NULL的時候,這個文件中的數字限定了mq_maxmsg的值,就是隊列的消息個數限制。默認為10個,當消息數達到上限之后,再使用mq_send發送消息會阻塞。
/proc/sys/fs/mqueue/msg_max:可以通過mq_open的attr參數設定的mq_maxmsg的數字上限。這個值默認也是10。
/proc/sys/fs/mqueue/msgsize_default:在mq_open的attr參數設置為NULL的時候,這個文件中的數字限定了mq_msgsize的值,就是隊列的字節數數限制。
/proc/sys/fs/mqueue/msgsize_max:可以通過mq_open的attr參數設定的mq_msgsize的數字上限。
/proc/sys/fs/mqueue/queues_max:系統可以創建的消息隊列個數上限。
最后
希望這些內容對大家進一步深入了解Linux的消息隊列有幫助。如果有相關問題,可以在我的微博、微信或者博客上聯系我。
微博ID:orroz
微信公眾號:Linux系統技術
前言
Linux系統給我們提供了一種可以發送格式化數據流的通信手段,這就是消息隊列。使用消息隊列無疑在某些場景的應用下可以大大減少工作量,相同的工作如果使用共享內存,除了需要自己手工構造一個可能不夠高效的隊列外,我們還要自己處理競爭條件和臨界區代碼。而內核給我們提供的消息隊列,無疑大大方便了我們的工作。
Linux環境提供了XSI和POSIX兩套消息隊列,本文將幫助您掌握以下內容:
如何使用XSI消息隊列。
如何使用POSIX消息隊列。
它們的底層實現分別是什么樣子的?
它們分別有什么特點?以及相關資源限制。
請任意打賞,多謝多謝!
XSI消息隊列
系統提供了四個方法來操作XSI消息隊列,它們分別是:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgget(key_t key, int msgflg);
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
我們可以使用msgget去創建或訪問一個消息隊列,與其他XSI IPC一樣,msgget使用一個key作為創建消息隊列的標識。這個key可以通過ftok生成或者指定為IPC_PRIVATE。指定為IPC_PRIVATE時,此隊列會新建出來,而且內核會保證新建的隊列key不會與已經存在的隊列沖突,所以此時后面的msgflag應指定為IPC_CREAT。當msgflag指定為IPC_CREAT時,msgget會去試圖創建一個新的消息隊列,除非指定key的消息隊列已經存在??梢允褂肙_CREAT | O_EXCL在指定key已經存在的情況下報錯,而不是訪問這個消息隊列。我們來看創建一個消息隊列的例子:
[zorro@zorro-pc mqueue]$ cat msg_create.c
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
#include <stdio.h>
#define FILEPATH "/etc/passwd"
#define PROJID 1234
int main()
{
? ? int msgid;
? ? key_t key;
? ? struct msqid_ds msg_buf;
? ? key = ftok(FILEPATH, PROJID);
? ? if (key == -1) {
? ? ? ? perror("ftok()");
? ? ? ? exit(1);
? ? }
? ? msgid = msgget(key, IPC_CREAT|IPC_EXCL|0600);
? ? if (msgid == -1) {
? ? ? ? perror("msgget()");
? ? ? ? exit(1);
? ? }
? ? if (msgctl(msgid, IPC_STAT, &msg_buf) == -1) {
? ? ? ? perror("msgctl()");
? ? ? ? exit(1);
? ? }
? ? printf("msgid: %d\n", msgid);
? ? printf("msg_perm.uid: %d\n", msg_buf.msg_perm.uid);
? ? printf("msg_perm.gid: %d\n", msg_buf.msg_perm.gid);
? ? printf("msg_stime: %d\n", msg_buf.msg_stime);
? ? printf("msg_rtime: %d\n", msg_buf.msg_rtime);
? ? printf("msg_qnum: %d\n", msg_buf.msg_qnum);
? ? printf("msg_qbytes: %d\n", msg_buf.msg_qbytes);
}
這個程序可以創建并查看一個消息隊列的相關狀態,執行結果:
[zorro@zorro-pc mqueue]$ ./msg_create?
msgid: 0
msg_perm.uid: 1000
msg_perm.gid: 1000
msg_stime: 0
msg_rtime: 0
msg_qnum: 0
msg_qbytes: 16384
如果我們在次執行這個程序,就會報錯,因為key沒有變化,我們使用了IPC_CREAT|IPC_EXCL,所以相關隊列已經存在了就會報錯:
[zorro@zorro-pc mqueue]$ ./msg_create?
msgget(): File exists
順便看一下msgctl方法,我們可以用它來取一個消息隊列的相關狀態。更詳細的信息可以man 2 msgctl查看。除了查看隊列狀態以外,還可以使用msgctl設置相關隊列狀態以及刪除指定隊列。另外我們還可以使用ipcs -q命令查看系統中XSI消息隊列的相關狀態。其他相關參數請參考man ipcs。
使用msgsnd和msgrcv向隊列發送和從隊列接收消息。我們先來看看如何訪問一個已經存在的消息隊列和向其發送消息:
[zorro@zorro-pc mqueue]$ cat msg_send.c
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#define FILEPATH "/etc/passwd"
#define PROJID 1234
#define MSG "hello world!"
struct msgbuf {
? ? long mtype;
? ? char mtext[BUFSIZ];
};
int main()
{
? ? int msgid;
? ? key_t key;
? ? struct msgbuf buf;
? ? key = ftok(FILEPATH, PROJID);
? ? if (key == -1) {
? ? ? ? perror("ftok()");
? ? ? ? exit(1);
? ? }
? ? msgid = msgget(key, 0);
? ? if (msgid == -1) {
? ? ? ? perror("msgget()");
? ? ? ? exit(1);
? ? }
? ? buf.mtype = 1;
? ? strncpy(buf.mtext, MSG, strlen(MSG));
? ? if (msgsnd(msgid, &buf, strlen(buf.mtext), 0) == -1) {
? ? ? ? perror("msgsnd()");
? ? ? ? exit(1);
? ? }
}
使用msgget訪問一個已經存在的消息隊列時,msgflag指定為0即可。使用msgsnd發送消息時主要需要注意的是它的第二個和第三個參數。第二個參數用來指定要發送的消息,它實際上應該是一個指向某個特殊結構的指針,這個結構可以定義如下:
struct msgbuf {
? ? long mtype;
? ? char mtext[BUFSIZ];
};
這個結構的mtype實際上是用來指定消息類型的,可以指定的數字必需是個正整數。我們可以把這個概念理解為XSI消息隊列對消息優先級的實現方法,即:需要傳送的消息體的第一個long長度是用來指定類型的參數,而非消息本身,后面的內容才是消息。在我們實現的消息中,這個結構題可以傳送的最大消息長度為BUFSIZE的字節數。當然,如果你的消息并不是一個字符串,也可以將mtype后面的信息實現成各種需要的格式,比如想要發送一個人的名字和他的數學語文成績的話,可以這樣實現:
struct msgbuf {
? ? long mtype;
? ? char name[NAMESIZE];
? ? int math, chinese;
};
這實際上就是讓使用者自己去設計一個通訊協議,然后發送端和接收端使用約定好的協議進行通訊。msgsnd的第三個參數應該是這個消息結構體除了mtype以外的真實消息的長度,而不是這個結構題的總長度,這點是要注意的。所以,如果你定義了一個很復雜的消息協議的話,建議的長度寫法是這樣:
sizeof(buf)-sizeof(long)
msgsnd的最后一個參數可以用來指定IPC_NOWAIT。在消息隊列滿的情況下,默認的發送行為會阻塞等待,如果加了這個參數,則不會阻塞,而是立即返回,并且errno設置為EAGAIN。然后我們來看接收消息和刪除消息隊列的例子:
[zorro@zorro-pc mqueue]$ cat msg_receive.c
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#define FILEPATH "/etc/passwd"
#define PROJID 1234
struct msgbuf {
? ? long mtype;
? ? char mtext[BUFSIZ];
};
int main()
{
? ? int msgid;
? ? key_t key;
? ? struct msgbuf buf;
? ? key = ftok(FILEPATH, PROJID);
? ? if (key == -1) {
? ? ? ? perror("ftok()");
? ? ? ? exit(1);
? ? }
? ? msgid = msgget(key, 0);
? ? if (msgid == -1) {
? ? ? ? perror("msgget()");
? ? ? ? exit(1);
? ? }
? ? if (msgrcv(msgid, &buf, BUFSIZ, 1, 0) == -1) {
? ? ? ? perror("msgrcv()");
? ? ? ? exit(1);
? ? }
? ? printf("mtype: %d\n", buf.mtype);
? ? printf("mtype: %s\n", buf.mtext);
? ? if (msgctl(msgid, IPC_RMID, NULL) == -1) {
? ? ? ? perror("msgctl()");
? ? ? ? exit(1);
? ? }
? ? exit(0);
}
msgrcv會將消息從指定隊列中刪除,并將其內容填到其第二個參數指定的buf地址所在的內存中。第三個參數指定承接消息的buf長度,如果消息內容長度大于指定的長度,那么這個函數的行為將取決于最后一個參數msgflag是否設置了MSG_NOERROR,如果這個標志被設定,那消息將被截短,消息剩余部分將會丟失。如果沒設置這個標志,msgrcv會失敗返回,并且errno被設定為E2BIG。
第四個參數用來指定從消息隊列中要取的消息類型msgtyp,如果設置為0,則無論什么類型,取隊列中的第一個消息。如果值大于0,則讀取符合這個類型的第一個消息,當最后一個參數msgflag設置為MSG_EXCEPT的時候,是對消息類型取邏輯非。即,不等于這個消息類型的第一個消息會被讀取。如果指定一個小于0的值,那么將讀取消息類型比這個負數的絕對值小的類型的所有消息中的第一個。
最后一個參數msgflag還可以設置為:
IPC_NOWAIT:非阻塞方式讀取。當隊列為空的時候,msgrcv會阻塞等待。加這個標志后將直接返回,errno被設置為ENOMSG。
MSG_COPY:從Linux 3.8之后開始支持以消息位置的方式讀取消息。如果標志為置為MSG_COPY則表示啟用這個功能,此時msgtyp的含義將從類型變為位置偏移量,第一個消息的起始值為0。如果指定位置的消息不存在,則返回并設置errno為ENOMSG。并且MSG_COPY和MSG_EXCEPT不能同時設置。另外還要注意這個功能需要內核配置打開CONFIG_CHECKPOINT_RESTORE選項。這個選項默認應該是不開的。
使用msgctl刪除消息隊列的方法比較簡單,不在復述。另外關于msgctl的其他使用,請大家參考msgctl的手冊。這部分內容的另外一個權威參考資料就是《UNIX環境高級編程》。我們在這里補充一下Linux系統對XSI消息隊列的限制相關參數介紹:
/proc/sys/kernel/msgmax:這個文件限制了系統中單個消息最大的字節數。
/proc/sys/kernel/msgmni:這個文件限制了系統中可創建的最大消息隊列個數。
/proc/sys/kernel/msgmnb:這個文件用來限制單個消息隊列中可以存放的最大消息字節數。
以上文件都可以使用echo或者sysctl命令進行修改。
POSIX消息隊列
POSIX消息隊列是獨立于XSI消息隊列的一套新的消息隊列API,讓進程可以用消息的方式進行數據交換。這套消息隊列在Linux 2.6.6版本之后開始支持,還需要你的glibc版本必須高于2.3.4。它們使用如下方法進行操作和控制:
#include <fcntl.h> ? ? ? ? ? /* For O_* constants */
#include <sys/stat.h> ? ? ? ?/* For mode constants */
#include <mqueue.h>
mqd_t mq_open(const char *name, int oflag);
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
類似對文件的open,我們可以用mq_open來打開一個已經創建的消息隊列或者創建一個消息隊列。這個函數返回一個叫做mqd_t類型的返回值,其本質上還是一個文件描述符,只是在這這里被叫做消息隊列描述符(message queue descriptor),在進程里使用這個描述符對消息隊列進程操作。所有被創建出來的消息隊列在系統中都有一個文件與之對應,這個文件名是通過name參數指定的,這里需要注意的是:name必須是一個以”/“開頭的字符串,比如我想讓消息隊列的名字叫”message”,那么name應該給的是”/message”。消息隊列創建完畢后,會在/dev/mqueue目錄下產生一個以name命名的文件,我們還可以通過cat這個文件來看這個消息隊列的一些狀態信息。其它進程在消息隊列已經存在的情況下就可以通過mp_open打開名為name的消息隊列來訪問它。
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio);
int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio, const struct timespec *abs_timeout);
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);
ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio, const struct timespec *abs_timeout);
在一個消息隊列創建完畢之后,我們可以使用mq_send來對消息隊列發送消息,mq_receive來對消息隊列接收消息。正常的發送消息一般不會阻塞,除非消息隊列處在某種異常狀態或者消息隊列已滿的時候,而消息隊列在空的時候,如果使用mq_receive去試圖接受消息的行為也會被阻塞,所以就有必要為兩個方法提供一個帶超時時間的版本。這里要注意的是msg_prio這個參數,是用來指定消息優先級的。每個消息都有一個優先級,取值范圍是0到sysconf(_SC_MQ_PRIO_MAX) - 1的大小。在Linux上,這個值為32768。默認情況下,消息隊列會先按照優先級進行排序,就是msg_prio這個值越大的越先出隊列。同一個優先級的消息按照fifo原則處理。在mq_receive方法中的msg_prio是一個指向int的地址,它并不是用來指定取的消息是哪個優先級的,而是會將相關消息的優先級取出來放到相關變量中,以便用戶自己處理優先級。
int mq_close(mqd_t mqdes);
我們可以使用mq_close來關閉一個消息隊列,這里的關閉并非刪除了相關文件,關閉之后消息隊列在系統中依然存在,我們依然可以繼續打開它使用。這跟文件的close和unlink的概念是類似的。
int mq_unlink(const char *name);
使用mq_unlink真正刪除一個消息隊列。另外,我們還可以使用mq_getattr和mq_setattr來查看和設置消息隊列的屬性,其函數原型為:
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr, struct mq_attr *oldattr);
mq_attr結構體是這樣的結構:
struct mq_attr {
? ? long mq_flags; ? ? ? /* 只可以通過此參數將消息隊列設置為是否非阻塞O_NONBLOCK */
? ? long mq_maxmsg; ? ? ?/* 消息隊列的消息數上限 */
? ? long mq_msgsize; ? ? /* 消息最大長度 */
? ? long mq_curmsgs; ? ? /* 消息隊列的當前消息個數 */
};
消息隊列描述符河文件描述符一樣,當進程通過fork打開一個子進程后,子進程中將從父進程繼承相關描述符。此時父子進程中的描述符引用的是同一個消息隊列,并且它們的mq_flags參數也將共享。下面我們使用幾個簡單的例子來看看他們的操作方法:
創建并向消息隊列發送消息:
[zorro@zorro-pc mqueue]$ cat send.c
#include <fcntl.h>
#include <sys/stat.h> ? ? ? ?/* For mode constants */
#include <mqueue.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#define MQNAME "/mqtest"
int main(int argc, char *argv[])
{
? ? mqd_t mqd;
? ? int ret;
? ? if (argc != 3) {
? ? ? ? fprintf(stderr, "Argument error!\n");
? ? ? ? exit(1);
? ? }
? ? mqd = mq_open(MQNAME, O_RDWR|O_CREAT, 0600, NULL);
? ? if (mqd == -1) {
? ? ? ? perror("mq_open()");
? ? ? ? exit(1);
? ? }
? ? ret = mq_send(mqd, argv[1], strlen(argv[1]), atoi(argv[2]));
? ? if (ret == -1) {
? ? ? ? perror("mq_send()");
? ? ? ? exit(1);
? ? }
? ? exit(0);
}
注意相關方法在編譯的時候需要鏈接一些庫,所以我們可以創建Makefile來解決這個問題:
[zorro@zorro-pc mqueue]$ cat Makefile?
CFLAGS+=-lrt -lpthread
我們添加了rt和pthread庫,為以后的例子最好準備。當然大家也可以直接使用gcc -lrt -lpthread來解決這個問題,然后我們對程序編譯并測試:
[zorro@zorro-pc mqueue]$ rm send
[zorro@zorro-pc mqueue]$ make send
cc -lrt -lpthread ? ?send.c ? -o send
[zorro@zorro-pc mqueue]$ ./send zorro 1
[zorro@zorro-pc mqueue]$ ./send shrek 2
[zorro@zorro-pc mqueue]$ ./send jerry 3
[zorro@zorro-pc mqueue]$ ./send zzzzz 1
[zorro@zorro-pc mqueue]$ ./send ssssss 2
[zorro@zorro-pc mqueue]$ ./send jjjjj 3
我們以不同優先級給消息隊列添加了幾條消息。然后我們可以通過文件來查看相關消息隊列的狀態:
[zorro@zorro-pc mqueue]$ cat /dev/mqueue/mqtest?
QSIZE:31 ? ? ? ? NOTIFY:0 ? ? SIGNO:0 ? ? NOTIFY_PID:0?
然后我們來看如何接收消息:
[zorro@zorro-pc mqueue]$ cat recv.c
#include <fcntl.h>
#include <sys/stat.h> ? ? ? ?/* For mode constants */
#include <mqueue.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#define MQNAME "/mqtest"
int main()
{
? ? mqd_t mqd;
? ? int ret;
? ? int val;
? ? char buf[BUFSIZ];
? ? mqd = mq_open(MQNAME, O_RDWR);
? ? if (mqd == -1) {
? ? ? ? perror("mq_open()");
? ? ? ? exit(1);
? ? }
? ? ret = mq_receive(mqd, buf, BUFSIZ, &val);
? ? if (ret == -1) {
? ? ? ? perror("mq_send()");
? ? ? ? exit(1);
? ? }
? ? ret = mq_close(mqd);
? ? if (ret == -1) {
? ? ? ? perror("mp_close()");
? ? ? ? exit(1);
? ? }
? ? printf("msq: %s, prio: %d\n", buf, val);
? ? exit(0);
}
直接編譯執行:
[zorro@zorro-pc mqueue]$ ./recv?
msq: jerry, prio: 3
[zorro@zorro-pc mqueue]$ ./recv?
msq: jjjjj, prio: 3
[zorro@zorro-pc mqueue]$ ./recv?
msq: shrek, prio: 2
[zorro@zorro-pc mqueue]$ ./recv?
msq: ssssss, prio: 2
[zorro@zorro-pc mqueue]$ ./recv?
msq: zorro, prio: 1
[zorro@zorro-pc mqueue]$ ./recv?
msq: zzzzz, prio: 1
可以看到優先級對消息隊列內部排序的影響。然后是刪除這個消息隊列:
[zorro@zorro-pc mqueue]$ cat rmmq.c?
#include <fcntl.h>
#include <sys/stat.h> ? ? ? ?/* For mode constants */
#include <mqueue.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#define MQNAME "/mqtest"
int main()
{
? ? int ret;
? ? ret = mq_unlink(MQNAME);
? ? if (ret == -1) {
? ? ? ? perror("mp_unlink()");
? ? ? ? exit(1);
? ? }
? ? exit(0);
}
大家在從消息隊列接收消息的時候會發現,當消息隊列為空的時候,mq_receive會阻塞,直到有人給隊列發送了消息才能返回并繼續執行。在很多應用場景下,這種同步處理的方式會給程序本身帶來性能瓶頸。為此,POSI消息隊列使用mq_notify為處理過程增加了一個異步通知機制。使用這個機制,我們就可以讓隊列在由空變成不空的時候觸發一個異步事件,通知調用進程,以便讓進程可以在隊列為空的時候不用阻塞等待。這個方法的原型為:
int mq_notify(mqd_t mqdes, const struct sigevent *sevp);
其中sevp用來想內核注冊具體的通知行為,可以man 7 sigevent查看相關幫助。這里我們不展開講解,詳細內容將在信號相關內容中詳細說明。簡單來說,我們可以使用nq_notify方法注冊3種行為:SIGEV_NONE,SIGEV_SIGNAL和SIGEV_THREAD。它們分別的含義如下:
SIGEV_NONE:一個“空”提醒。其實就是不提醒。
SIGEV_SIGNAL:當隊列中有了消息后給調用進程發送一個信號??梢允褂胹truct sigevent結構體中的sigev_signo指定信號編號,信號的si_code字段將設置為SI_MESGQ以標示這是消息隊列的信號。還可以通過si_pid和si_uid來指定信號來自什么pid和什么uid。
SIGEV_THREAD:當隊列中有了消息后觸發產生一個線程。當設置為線程時,可以使用struct sigevent結構體中的sigev_notify_function指定具體觸發什么線程,使用sigev_notify_attributes設置線程屬性,使用sigev_value.sival_ptr傳遞一個任何東西的指針。
我們先來看使用信號的簡單例子:
[zorro@zorro-pc mqueue]$ cat notify_sig.c
#include <pthread.h>
#include <mqueue.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
static mqd_t mqdes;
void mq_notify_proc(int sig_num)
{
? ? /* mq_notify_proc()是信號處理函數,
? ? 當隊列從空變成非空時,會給本進程發送信號,
? ? 觸發本函數執行。 */
? ? struct mq_attr attr;
? ? void *buf;
? ? ssize_t size;
? ? int prio;
? ? struct sigevent sev;
? ? /* 我們約定使用SIGUSR1信號進行處理,
? ? 在此判斷發來的信號是不是SIGUSR1。 */
? ? if (sig_num != SIGUSR1) {
? ? ? ? return;
? ? }
? ? /* 取出當前隊列的消息長度上限作為緩存空間大小。 */
? ? if (mq_getattr(mqdes, &attr) < 0) {
? ? ? ? perror("mq_getattr()");
? ? ? ? exit(1);
? ? }
? ? buf = malloc(attr.mq_msgsize);
? ? if (buf == NULL) {
? ? ? ? perror("malloc()");
? ? ? ? exit(1);
? ? }
? ? /* 從消息隊列中接收消息。 */
? ? size = mq_receive(mqdes, buf, attr.mq_msgsize, &prio);
? ? if (size == -1) {
? ? ? ? perror("mq_receive()");
? ? ? ? exit(1);
? ? }
? ? /* 打印消息和其優先級。 */
? ? printf("msq: %s, prio: %d\n", buf, prio);
? ? free(buf);
? ? /* 重新注冊mq_notify,以便下次可以出觸發。 */
? ? sev.sigev_notify = SIGEV_SIGNAL;
? ? sev.sigev_signo = SIGUSR1;
? ? if (mq_notify(mqdes, &sev) == -1) {
? ? ? ? perror("mq_notify()");
? ? ? ? exit(1);
? ? }
? ? return;
}
int main(int argc, char *argv[])
{
? ? struct sigevent sev;
? ? if (argc != 2) {
? ? ? ? fprintf(stderr, "Argument error!\n");
? ? ? ? exit(1);
? ? }
? ? /* 注冊信號處理函數。 */
? ? if (signal(SIGUSR1, mq_notify_proc) == SIG_ERR) {
? ? ? ? perror("signal()");
? ? ? ? exit(1);
? ? }
? ? /* 打開消息隊列,注意此隊列需要先創建。 */
? ? mqdes = mq_open(argv[1], O_RDONLY);
? ? if (mqdes == -1) {
? ? ? ? perror("mq_open()");
? ? ? ? exit(1);
? ? }
? ? /* 注冊mq_notify。 */
? ? sev.sigev_notify = SIGEV_SIGNAL;
? ? sev.sigev_signo = SIGUSR1;
? ? if (mq_notify(mqdes, &sev) == -1) {
? ? ? ? perror("mq_notify()");
? ? ? ? exit(1);
? ? }
? ? /* 主進程每秒打印一行x,等著從消息隊列發來異步信號觸發收消息。 */
? ? while (1) {
? ? ? ? printf("x\n");
? ? ? ? sleep(1);
? ? }
}
我們編譯這個程序并執行:
[zorro@zorro-pc mqueue]$ ./notify_sig /mqtest
x
x
...
會一直打印x,等著隊列變為非空,我們此時在別的終端給隊列發送一個消息:
[zorro@zorro-pc mqueue]$ ./send hello 1
進程接收到信號,并且現實消息相關內容:
...
x
x
msq: hello, prio: 1
x
...
再發一個試試:
[zorro@zorro-pc mqueue]$ ./send zorro 3
顯示:
...
x
msq: zorro, prio: 3
x
...
在mq_notify的man手冊中,有一個觸發線程進行異步處理的例子,我們在此就不再額外寫一遍了,在此引用并注釋一下,以方便大家理解:
字數限制,本段代碼請參考原文。
大家可以自行編譯執行此程序進行測試。請注意mq_notify的行為:
一個消息隊列智能通過mq_notify注冊一個進程進行異步處理。
異步通知只會在消息隊列從空變成非空的時候產生,其它隊列的變動不會觸發異步通知。
如果有其他進程使用mq_receive等待隊列的消息時,消息到來不會觸發已注冊mq_notify的程序產生異步通知。隊列的消息會遞送給在使用mq_receive等待的進程。
一次mq_notify注冊只會觸發一次異步事件,此后如果隊列再次由空變為非空也不會觸發異步通知。如果需要一直可以觸發,請處理異步通知之后再次注冊mq_notify。
如果sevp指定為NULL,表示取消注冊異步通知。
POSIX消息隊列相對XSI消息隊列的一大優勢是,我們又一個類似文件描述符的mqd的描述符可以進行操作,所以很自然的我們就會聯想到是否可以使用多路IO轉接機制對消息隊列進程處理?在Linux上,答案是肯定的,我們可以使用select、poll和epoll對隊列描述符進行處理,我們在此僅使用epoll舉個簡單的例子:
字數限制,本段代碼請參考原文。
這就是POSIX消息隊列比XSI更有趣的地方,XSI的消息隊列并未遵守“一切皆文件”的原則。當然,使用select和poll這里就不再舉例了,有興趣的可以自己實現一下作為練習。
以上例子中,我們也分別演示了如何使用mq_setattr和mq_getattr,此處我們應該知道,在所有可以顯示的屬性中,O_NONBLOCK是mq_setattr唯一可以更改的參數設置,其他參數對于這個方法都是只讀的,不能修改。系統提供了其他手段可以對這些限制進行修改:
/proc/sys/fs/mqueue/msg_default:在mq_open的attr參數設置為NULL的時候,這個文件中的數字限定了mq_maxmsg的值,就是隊列的消息個數限制。默認為10個,當消息數達到上限之后,再使用mq_send發送消息會阻塞。
/proc/sys/fs/mqueue/msg_max:可以通過mq_open的attr參數設定的mq_maxmsg的數字上限。這個值默認也是10。
/proc/sys/fs/mqueue/msgsize_default:在mq_open的attr參數設置為NULL的時候,這個文件中的數字限定了mq_msgsize的值,就是隊列的字節數數限制。
/proc/sys/fs/mqueue/msgsize_max:可以通過mq_open的attr參數設定的mq_msgsize的數字上限。
/proc/sys/fs/mqueue/queues_max:系統可以創建的消息隊列個數上限。
最后
希望這些內容對大家進一步深入了解Linux的消息隊列有幫助。如果有相關問題,可以在我的微博、微信或者博客上聯系我。
總結
以上是生活随笔為你收集整理的Linux的进程间通信-消息队列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 车载360度全景监视系统
- 下一篇: LINUX 下构建OpenGL ES 3