用emqx做mqtt客户端
最近項目中有一個需求,要用mqtt協議接收路側設備的數據到云平臺上,所以,研究了一下mqtt客戶端的制作方法。
mqtt協議是一個發布訂閱模式的協議。
這篇文章主要記錄下我搭建mqttbroker和寫mqtt客戶端的過程,是記錄,不是教程,無意教程。
一、下載安裝emqx
emqx是一個mqtt的broker軟件,這個軟件是比較好用的一個broker軟件,以前用過mosquitto軟件做mqtt的broker,但是mosquitto沒有emqx易用,所以就放棄了。
從emqx的官網上下載得到emqx-4.4.1-otp24.1.5-3-el7-amd64.zip,unzip解壓出來一個emqx的文件夾。
進入到emqx/bin下執行 emqx start,就將emqx啟動起來了。
[root@localhost bin]# ./emqx start WARNING: There seem to be missing dynamic libs from the OS. Using libs from /root/emqx/dynlibs EMQ X Broker 4.4.1 is started successfully! [root@localhost bin]#查看emqx的運行狀態用bin下的emqx_ctl命令。
[root@localhost bin]# ./emqx_ctl status Node 'emqx@127.0.0.1' 4.4.1 is started [root@localhost bin]#經過以上的下載、解壓、啟動、查看狀態四個步驟,就將emqx軟件正常運行起來了。
二、mqtt客戶端的編寫
編寫mqtt客戶端,是需要eclips的paho庫的,這個庫是一個mqtt客戶端的編程接口庫。
在基于maven的java項目中,要增加一個dependency。
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.2</version> </dependency>接下來就是寫client的代碼了。
下面的代碼中有發送消息的代碼,也有接收消息的代碼,接收消息的代碼就是那個OnMessageCallback,有了這個Callback類就可以接收mqtt消息了。
import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import java.nio.charset.StandardCharsets;public class App {public static void main(String[] args) {String subTopic = "testtopic/#";String pubTopic = "testtopic/1";String content = "哈哈哈,我又來了。";int qos = 2;String broker = "tcp://123.56.182.37:1883"; //這里是broker的地址String clientId = "emqx_test";MemoryPersistence persistence = new MemoryPersistence();try {MqttClient client = new MqttClient(broker, clientId, persistence);// MQTT 連接選項MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setUserName("emqx_test");connOpts.setPassword("emqx_test_password".toCharArray());// 保留會話connOpts.setCleanSession(true);// 設置回調,這個回調類是用來接收mqtt消息的,如果只是想要發布mqtt消息,那么就不需要這個回調了。// 當然,如果只是想接收mqtt消息,不想發送,那么就只設置這個回調,下面的發送代碼也就不需要了client.setCallback(new OnMessageCallback());// 建立連接System.out.println("Connecting to broker: " + broker);client.connect(connOpts);System.out.println("Connected");System.out.println("Publishing message: " + content);// 訂閱client.subscribe(subTopic);// 消息發布所需參數long i = 0;while (i < 10e10) {MqttMessage message = new MqttMessage(content.getBytes(StandardCharsets.UTF_8));message.setQos(qos);client.publish(pubTopic, message);Thread.sleep(10000);i++;System.out.println("Message published, i = " + i);}client.disconnect();System.out.println("Disconnected");client.close();System.exit(0);} catch (MqttException me) {System.out.println("reason " + me.getReasonCode());System.out.println("msg " + me.getMessage());System.out.println("loc " + me.getLocalizedMessage());System.out.println("cause " + me.getCause());System.out.println("excep " + me);me.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}} } import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage;public class OnMessageCallback implements MqttCallback {public void connectionLost(Throwable cause) {// 連接丟失后,一般在這里面進行重連System.out.println("連接斷開,可以做重連");}public void messageArrived(String topic, MqttMessage message) throws Exception {// subscribe后得到的消息會執行到這里面System.out.println("接收消息主題:" + topic);System.out.println("接收消息Qos:" + message.getQos());System.out.println("接收消息內容:" + new String(message.getPayload(), "utf-8"));}public void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------" + token.isComplete());} }以上兩步就可以完成一個具備基本功能的mqtt協議客戶端了,這個客戶端既可以發布消息,也可以接收消息。當然,如果只想要一個方面的功能,也可以適當裁剪以適應自己的需要。
參考資料:
1、https://www.emqx.io/docs/zh/v4.4/getting-started/install.html
總結
以上是生活随笔為你收集整理的用emqx做mqtt客户端的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 前端03——css
- 下一篇: acc--›Android无障碍开发手势