MQTT再学习 -- 安装MQTT客户端及测试
上一篇文章我們已經(jīng)講了 MQTT 服務(wù)器的搭建,參看:MQTT再學(xué)習(xí) -- 搭建MQTT服務(wù)器及測(cè)試
接下來我們看一下 MQTT 客戶端。
一、客戶端下載
首先,客戶端也有多種,我們需要面臨選擇了。
參看:基于mqtt的消息推送(三)客戶端實(shí)現(xiàn)
現(xiàn)有客戶端sdk分析,基本分為兩大類:一類移植自C類庫(kù),如Mosquitto,一類是用objc或者swift原生實(shí)現(xiàn)。
各種sdk對(duì)比如下,我選用的是MQTT-Client,使用swift的同學(xué)可以使用CocoaMQTT,這個(gè)sdk的作者同時(shí)也是服務(wù)端實(shí)現(xiàn)emqtt的作者。
| Paho | Original | C | Open-Source.?Eclipse project |
| IBM | Original | C | Close Source.?IBM SDK |
| Mosquitto | Original | C | Open-Source.?Eclipse project |
| MQTTKit | Wrapper (Mosquitto) | Objective-C | Open-Source.?Github |
| Marquette | Wrapper (Mosquitto) | Objective-C | Open-Source.?Github |
| Moscapsule | Wrapper (Mosquitto) | Swift | Open-Source.?Github |
| Musqueteer | Wrapper (Mosquitto) | Objective-C | |
| MQTT-Client | Native | Objective-C | Open-Source.?Github |
| MQTTSDK | Native | Objective-C | |
| CocoaMQTT | Native | Swift | Open-Source.?Github |
我們上一篇就用到了 Mosquitto 就是它了。
如果你下載的是上面這個(gè)鏈接里的,比如說 ?org.eclipse.mosquitto-1.4.8.tar.gz?
安裝的時(shí)候和上一篇文章是一樣的。
不過遇到出現(xiàn)兩個(gè)問題:
參看:mosquitto 使用時(shí)出現(xiàn)的一些問題及其解決辦法
xsltproc:命令未找到
解決方法:sudo apt-get install xsltproc
make[1]: 正在進(jìn)入目錄 `/home/tarena/project/MQTT/org.eclipse.mosquitto-1.4.8/man'
xsltproc mosquitto.8.xml
warning: failed to load external entity "/usr/share/xml/docbook/stylesheet/docbook-xsl/manpages/docbook.xsl"
compilation error: file manpage.xsl line 3 element import
xsl:import : unable to load /usr/share/xml/docbook/stylesheet/docbook-xsl/manpages/docbook.xsl
compilation error: file mosquitto.8.xml line 4 element refentry
xsltParseStylesheetProcess : document is not a stylesheet
make[1]: *** [mosquitto.8] 錯(cuò)誤 5
make[1]:正在離開目錄 `/home/tarena/project/MQTT/org.eclipse.mosquitto-1.4.8/man'
make: *** [docs] 錯(cuò)誤 2
解決方法:
sudo apt-get install docbook-xsl
二、自寫客戶端
看完客戶端源碼(其實(shí)先寫的這個(gè),源碼還沒看...),那么接下來我們自己來寫一個(gè)客戶端。 首先講一下會(huì)用到的一些 C/UINX 的知識(shí)點(diǎn),當(dāng)然你要是基礎(chǔ)很扎實(shí)再好不過。 這里用到了 消息隊(duì)列、線程、fgets、共享庫(kù)等一些知識(shí)點(diǎn)。看到這里,不由得慶幸一下,看來我用了一年時(shí)間總結(jié)基礎(chǔ)知識(shí)是沒有白費(fèi)的,雖然很多已經(jīng)忘的差不多了。但是回過頭來,再看就明白了。特么,我都佩服我自己,一篇文章寫這么多,我都快看不下去了。 參看:UNIX再學(xué)習(xí) -- XSI IPC通信方式 參看:UNIX再學(xué)習(xí) -- 線程 參看:C語言再學(xué)習(xí) -- 輸入/輸出 參看:UNIX再學(xué)習(xí) -- 靜態(tài)庫(kù)與共享庫(kù)(1)自寫源碼
言歸正傳。先將我寫的源碼貼出。當(dāng)然是閹割版的,其中的 PM2.5 上一篇文章也提到了采集器不同協(xié)議也不一樣。并且我手頭沒有,我木有什么辦法呀啊。所以只能發(fā)送接收一個(gè) hello world ,向世界問聲好了。 #include <sys/types.h> #include <sys/msg.h> #include <sys/time.h> #include <fcntl.h> #include <unistd.h> #include <stdlib.h> #include <stdio.h> #include <stdint.h> #include <stdbool.h> #include <string.h> #include <pthread.h> #include "mosquitto.h" #include "net_zslf.h" #include <string.h> #include <pthread.h> #include <sys/wait.h> #include <sys/types.h>//const char *mqtt_broker_address = "192.168.x.xx"; /* mqtt_broker ip address */ int mqtt_broker_port = 1883; /* mqtt_broker port number */ long msgtype = 10; /* pm sensor message type */ int msgsize = 100; /* pm sensor message size */int main(int argc, char** argv) {int gflags;int msgid;key_t key;pthread_t thread1, thread2;int ret;/* struct msqid_ds msg_ginfo, msg_sinfo; */char *msgpath = "home/tarena/project/MQTT/test/a.txt";//消息隊(duì)列的 鍵key = ftok(msgpath, 'a');gflags = IPC_CREAT;//創(chuàng)建消息隊(duì)列msgid = msgget(key, gflags | 00666);if(msgid == -1){DUG_PRINTF("msg create error\n");return -1;}/* msg_stat(msgid,msg_ginfo); *///創(chuàng)建消息隊(duì)列發(fā)送線程ret = pthread_create(&thread1, NULL, &start_thread_msgsend, (void *)&msgid);if (ret != 0) {perror("pthread msgsend create error\n");return -1;}//創(chuàng)建消息隊(duì)列接收線程ret = pthread_create(&thread2, NULL, &start_thread_msgrcv, (void *)&msgid);if (ret != 0){perror("pthread msgrcv create error\n");return -1;}//線程等待 pthread_join(thread1, NULL);pthread_join(thread2, NULL);return 0; }//消息隊(duì)列接收線程 void *start_thread_msgrcv(void *arg) {int rflags = 0;int ret;int msgid = *(int *)(arg);MSG_data_buf msg_rbuf; //消息隊(duì)列類型struct mosquitto *mosq; //保存一個(gè)MQTT客戶端連接的所有信息//下面的代碼是從xml文件中讀取FILE *fp; char szFileBuff[1024] = {0};char serverADDR[16] = {0},devID[15] = {0},devName[15] = {0};char Longitude[15] = {0},Latitude[15] = {0},frequency[3] = {0};char *lFirst, *lEnd; char devInfo[70];FILE *fp_re;char buffer_re[4];//打開xml文件 fp = fopen("/home/tarena/project/MQTT/test/deviceCfg.xml","r"); if (fp==NULL) { DUG_PRINTF("read XML file error!\n"); } //你只要知道while里面是獲取xml信息的,至于這種操作有點(diǎn)6 while(fgets(szFileBuff, 1023, fp)) { if ((lFirst = strstr(szFileBuff, "<serverADDR>")) != NULL) { lEnd = strstr(lFirst + 1, "</serverADDR>"); memcpy(serverADDR, lFirst + 12, lEnd - lFirst - 12); } if ((lFirst = strstr(szFileBuff, "<devID>")) != NULL) { lEnd = strstr(lFirst + 1, "</devID>"); memcpy(devID, lFirst + 7, lEnd - lFirst - 7); } if ((lFirst = strstr(szFileBuff, "<devName>")) != NULL) { lEnd = strstr(lFirst + 1, "</devName>"); memcpy(devName, lFirst + 9, lEnd - lFirst - 9); } if ((lFirst = strstr(szFileBuff, "<Longitude>")) != NULL) { lEnd = strstr(lFirst + 1, "</Longitude>"); memcpy(Longitude, lFirst + 11, lEnd - lFirst - 11); } if ((lFirst = strstr(szFileBuff, "<Latitude>")) != NULL) { lEnd = strstr(lFirst + 1, "</Latitude>"); memcpy(Latitude, lFirst + 10, lEnd - lFirst - 10); }//下面這個(gè)語句是用于分頻率傳送數(shù)據(jù)的if ((lFirst = strstr(szFileBuff, "<frequency>")) != NULL) { lEnd = strstr(lFirst + 1, "</frequency>"); memcpy(frequency, lFirst + 11, lEnd - lFirst - 11); } if ((lFirst = strstr(szFileBuff, "</display>")) != NULL) { sprintf(devInfo, "&%s&%s&%s&%s&\n",devID,devName,Longitude,Latitude);} }fclose(fp);//這里是關(guān)鍵了,MQTT協(xié)議 要上演了 這部分為 pub 發(fā)布內(nèi)容//MQTT 庫(kù)初始化mosquitto_lib_init();//新建mosq = mosquitto_new(devID, true, NULL);//連接回調(diào)設(shè)置mosquitto_connect_callback_set(mosq, my_connect_callback);//斷開回調(diào)設(shè)置mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);//發(fā)布回調(diào)設(shè)置mosquitto_publish_callback_set(mosq, my_publish_callback);//MQTT連接 if(mosquitto_connect(mosq, serverADDR, mqtt_broker_port, 600) != MOSQ_ERR_SUCCESS){DUG_PRINTF("mosquitto connection error\n");//銷毀mosquitto_destroy(mosq);//清空mosquitto_lib_cleanup();}//這里開始 消息隊(duì)列接收消息while (1){//接收消息ret=msgrcv(msgid, &msg_rbuf, msgsize, msgtype, rflags);sleep(1);if (ret == -1){DUG_PRINTF("read msg error\n");}DUG_PRINTF("%s\n", msg_rbuf.mtext);//將接收到的信息發(fā)布mosquitto_publish(mosq, NULL, "pmsensor", ret, (void *)msg_rbuf.mtext, 0, 0);sleep(5);}/* should never run below *///銷毀mosquitto_destroy(mosq);//清空mosquitto_lib_cleanup();return NULL; }void *start_thread_msgsend(void *arg) {/* 這里寫 PM.25 采集 */int retval; MSG_data_buf msg_sbuf; //消息隊(duì)列類型int msgid = *(int *)(arg);int sflags=IPC_NOWAIT; msg_sbuf.mtype = msgtype;strcpy(msg_sbuf.mtext,"hello world");//消息隊(duì)列發(fā)送retval = msgsnd(msgid, &msg_sbuf,msgsize, sflags);if(retval == -1) {DUG_PRINTF("message send error\n"); } }(2)源碼分析
簡(jiǎn)單來看一下上面的代碼首先創(chuàng)建了一個(gè)消息隊(duì)列
//消息隊(duì)列的 鍵key = ftok(msgpath, 'a');gflags = IPC_CREAT;//創(chuàng)建消息隊(duì)列msgid = msgget(key, gflags | 00666);if(msgid == -1){DUG_PRINTF("msg create error\n");return -1;}然后創(chuàng)建了兩個(gè)線程
//創(chuàng)建消息隊(duì)列發(fā)送線程ret = pthread_create(&thread1, NULL, &start_thread_msgsend, (void *)&msgid);if (ret != 0) {perror("pthread msgsend create error\n");return -1;}//創(chuàng)建消息隊(duì)列接收線程ret = pthread_create(&thread2, NULL, &start_thread_msgrcv, (void *)&msgid);if (ret != 0){perror("pthread msgrcv create error\n");return -1;}先看一下發(fā)送消息隊(duì)列線程
strcpy(msg_sbuf.mtext,"hello world"); //消息隊(duì)列發(fā)送 retval = msgsnd(msgid, &msg_sbuf,msgsize, sflags);無非將 hello world 替換成采集到的PM2.5數(shù)據(jù)。通過消息隊(duì)列函數(shù) msgsnd 發(fā)送。再看一下接收消息隊(duì)列線程
打開xml文件,從 xml 文件中獲取信息fp = fopen("/home/tarena/project/MQTT/test/deviceCfg.xml","r"); if (fp==NULL) { DUG_PRINTF("read XML file error!\n"); } 然后 while循環(huán)里的獲取信息,居然還有這種操作... ? 然后就是關(guān)鍵了,MQTT 協(xié)議要上演了 //MQTT 庫(kù)初始化 mosquitto_lib_init(); //新建 mosq = mosquitto_new(devID, true, NULL); //連接回調(diào)設(shè)置 mosquitto_connect_callback_set(mosq, my_connect_callback); //斷開回調(diào)設(shè)置 mosquitto_disconnect_callback_set(mosq, my_disconnect_callback); //發(fā)布回調(diào)設(shè)置 mosquitto_publish_callback_set(mosq, my_publish_callback);接收消息隊(duì)列、發(fā)布內(nèi)容 //接收消息 ret=msgrcv(msgid, &msg_rbuf, msgsize, msgtype, rflags); //將接收到的信息發(fā)布 mosquitto_publish(mosq, NULL, "pmsensor", ret, (void *)msg_rbuf.mtext, 0, 0);最后銷毀清空 MQTT //銷毀 mosquitto_destroy(mosq); //清空 mosquitto_lib_cleanup();(3)項(xiàng)目下載
將 deviceCfg.xml 里的 IP 地址換成你自己的,服務(wù)器IP還是不能暴露滴。下載:MQTT 客戶端
(4)演示
總結(jié)
以上是生活随笔為你收集整理的MQTT再学习 -- 安装MQTT客户端及测试的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Linux wc命令用于计算字数。
- 下一篇: springcloud hystrix入