javascript
SpringBoot整合MQTT服务器实现消息的发送与订阅(推送消息与接收推送)
場(chǎng)景
Windows上Mqtt服務(wù)器搭建與使用客戶端工具M(jìn)qttBox進(jìn)行測(cè)試:
https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/112305328
在上面搭建好了MQTT服務(wù)器以及客戶端工具M(jìn)qttBox之后,怎樣在SpringBoot中實(shí)現(xiàn)訂閱主題接收消息和發(fā)布主題推送消息的功能。
注:
博客:
https://blog.csdn.net/badao_liumang_qizhi
關(guān)注公眾號(hào)
霸道的程序猿
獲取編程相關(guān)電子書、教程推送與免費(fèi)下載。
實(shí)現(xiàn)
首先搭建起一個(gè)SpringBoot項(xiàng)目,引入最基本的Web依賴,這里可以使用快速搭建框架進(jìn)行搭建
若依前后端分離版手把手教你本地搭建環(huán)境并運(yùn)行項(xiàng)目:
https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/108465662
然后搭建好SpringBoot項(xiàng)目后,首先在項(xiàng)目中引入mqtt的相關(guān)依賴
??????? <!--mqtt--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency>然后連接MQTT服務(wù)器時(shí)需要配置一些參數(shù),比如服務(wù)器地址、用戶名、密碼等。所以需要將這些配置放在配置文件中
?
然后在Spring節(jié)點(diǎn)下添加如下配置
# Spring配置 spring:# MQTTmqtt:# 服務(wù)器連接地址,如果有多個(gè),用逗號(hào)隔開(kāi)host: tcp://你自己的MQTT服務(wù)器的地址:1883# 連接服務(wù)器默認(rèn)客戶端IDclientId: mqtt_client_id_001# 默認(rèn)的消息推送主題,實(shí)際可在調(diào)用接口時(shí)指定topic: mqtt_topic_001,mqtt_topic_002,mqtt_topic_003# 用戶名username: admin# 密碼password: admin# 連接超時(shí)timeout: 30# 心跳keepalive: 30添加位置
?
這里的服務(wù)器連接地址就是上面博客搭建的MQTT服務(wù)器的地址,其端口號(hào)是mqtt協(xié)議連接的端口號(hào),默認(rèn)是1883,不是mqtt服務(wù)端后臺(tái)登錄的端口號(hào)。
默認(rèn)的客戶端id用來(lái)作為在MQTT服務(wù)端的唯一標(biāo)識(shí),然后下面的默認(rèn)消息推送的主題會(huì)在項(xiàng)目啟動(dòng)后先發(fā)布這些主題,實(shí)際使用時(shí)需要在接口調(diào)用時(shí)指定。
然后用戶名密碼就是上面MQTT中的用戶名和密碼
在配置文件中添加了配置項(xiàng)之后需要在代碼中獲取到這些配置項(xiàng),根據(jù)自己的項(xiàng)目的規(guī)范去決定是通過(guò)注解還是其他方式來(lái)獲取配置項(xiàng)
這里使用如下方式
創(chuàng)建一個(gè)PropertiesUtil類用來(lái)加載配置項(xiàng)的的內(nèi)容
import java.io.IOException; import java.io.InputStream; import java.util.Properties;/*** 獲取配置信息**/ public class PropertiesUtil {public static String MQTT_HOST;public static String MQTT_CLIENT_ID;public static String MQTT_USER_NAME;public static String MQTT_PASSWORD;public static String MQTT_TOPIC;public static Integer MQTT_TIMEOUT;public static Integer MQTT_KEEP_ALIVE;/***? mqtt配置*/static {Properties properties = loadMqttProperties();MQTT_HOST = properties.getProperty("host");MQTT_CLIENT_ID = properties.getProperty("clientId");MQTT_USER_NAME = properties.getProperty("username");MQTT_PASSWORD = properties.getProperty("password");MQTT_TOPIC = properties.getProperty("topic");MQTT_TIMEOUT = Integer.valueOf(properties.getProperty("timeout"));MQTT_KEEP_ALIVE = Integer.valueOf(properties.getProperty("keepalive"));}private static Properties loadMqttProperties() {InputStream inputstream = PropertiesUtil.class.getResourceAsStream("/application.yml");Properties properties = new Properties();try {properties.load(inputstream);return properties;} catch (IOException e) {throw new RuntimeException(e);} finally {try {if (inputstream != null) {inputstream.close();}} catch (IOException e) {throw new RuntimeException(e);}}} }然后就是需要在SpringBoot項(xiàng)目啟動(dòng)后連接到mqtt服務(wù)器并創(chuàng)建客戶端,然后在具體的業(yè)務(wù)需求中比如在Controller中進(jìn)行訂閱和發(fā)布主題。
所以首先創(chuàng)建一個(gè)MqttConsumer類,并使其實(shí)現(xiàn)ApplicationRunne接口的run方法以實(shí)現(xiàn)在項(xiàng)目啟動(dòng)后執(zhí)行所需要的操作
import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component;@Component public class MqttConsumer implements ApplicationRunner {private static MqttClient client;@Overridepublic void run(ApplicationArguments args) {System.out.println("初始化并啟動(dòng)mqtt......");this.connect();}/*** 連接mqtt服務(wù)器*/private void connect() {try {// 1 創(chuàng)建客戶端getClient();// 2 設(shè)置配置MqttConnectOptions options = getOptions();String[] topic = PropertiesUtil.MQTT_TOPIC.split(",");// 3 消息發(fā)布質(zhì)量int[] qos = getQos(topic.length);// 4 最后設(shè)置create(options, topic, qos);} catch (Exception e) {System.out.println("mqtt連接異常:" + e);}}/***? 創(chuàng)建客戶端? --- 1 ---*/public void getClient() {try {if (null == client) {client = new MqttClient(PropertiesUtil.MQTT_HOST, PropertiesUtil.MQTT_CLIENT_ID, new MemoryPersistence());}System.out.println("創(chuàng)建mqtt客戶端:" );} catch (Exception e) {System.out.println("創(chuàng)建mqtt客戶端異常:\" + e:" );}}/***? 生成配置對(duì)象,用戶名,密碼等? --- 2 ---*/public MqttConnectOptions getOptions() {MqttConnectOptions options = new MqttConnectOptions();options.setUserName(PropertiesUtil.MQTT_USER_NAME);options.setPassword(PropertiesUtil.MQTT_PASSWORD.toCharArray());// 設(shè)置超時(shí)時(shí)間options.setConnectionTimeout(PropertiesUtil.MQTT_TIMEOUT);// 設(shè)置會(huì)話心跳時(shí)間options.setKeepAliveInterval(PropertiesUtil.MQTT_KEEP_ALIVE);// 是否清除sessionoptions.setCleanSession(false);System.out.println("--生成mqtt配置對(duì)象");return options;}/***? qos?? --- 3 ---*/public int[] getQos(int length) {int[] qos = new int[length];for (int i = 0; i < length; i++) {/***? MQTT協(xié)議中有三種消息發(fā)布服務(wù)質(zhì)量:** QOS0: “至多一次”,消息發(fā)布完全依賴底層 TCP/IP 網(wǎng)絡(luò)。會(huì)發(fā)生消息丟失或重復(fù)。這一級(jí)別可用于如下情況,環(huán)境傳感器數(shù)據(jù),丟失一次讀記錄無(wú)所謂,因?yàn)椴痪煤筮€會(huì)有第二次發(fā)送。* QOS1: “至少一次”,確保消息到達(dá),但消息重復(fù)可能會(huì)發(fā)生。* QOS2: “只有一次”,確保消息到達(dá)一次。這一級(jí)別可用于如下情況,在計(jì)費(fèi)系統(tǒng)中,消息重復(fù)或丟失會(huì)導(dǎo)致不正確的結(jié)果,資源開(kāi)銷大*/qos[i] = 1;}System.out.println("--設(shè)置消息發(fā)布質(zhì)量");return qos;}/***? 裝載各種實(shí)例和訂閱主題? --- 4 ---*/public void create(MqttConnectOptions options, String[] topic, int[] qos) {try {client.setCallback(new MqttConsumerCallback(client, options, topic, qos));System.out.println("--添加回調(diào)處理類");client.connect(options);} catch (Exception e) {System.out.println("裝載實(shí)例或訂閱主題異常:" + e);}}/*** 訂閱某個(gè)主題** @param topic* @param qos*/public static void subscribe(String topic, int qos) {try {System.out.println("topic:" + topic);client.subscribe(topic, qos);} catch (MqttException e) {e.printStackTrace();}}/*** 發(fā)布,非持久化**? qos根據(jù)文檔設(shè)置為1** @param topic* @param msg*/public static void publish(String topic, String msg) {publish(1, false, topic, msg);}/*** 發(fā)布*/public static void publish(int qos, boolean retained, String topic, String pushMessage) {MqttMessage message = new MqttMessage();message.setQos(qos);message.setRetained(retained);message.setPayload(pushMessage.getBytes());MqttTopic mTopic = client.getTopic(topic);if (null == mTopic) {System.out.println("topic:" + topic + " 不存在");}MqttDeliveryToken token;try {token = mTopic.publish(message);token.waitForCompletion();if (!token.isComplete()) {System.out.println("消息發(fā)送成功");}} catch (MqttPersistenceException e) {e.printStackTrace();} catch (MqttException e) {e.printStackTrace();}} }在上面的啟動(dòng)類中的run方法中會(huì)首先連接mqtt服務(wù)器并創(chuàng)建客戶端,然后加載配置文件中配置的默認(rèn)主題并調(diào)用create
進(jìn)行訂閱。
這其中也提供了單個(gè)的用于訂閱主題和發(fā)布消息的方法。
其中在訂閱主題后接收消息時(shí)需要一個(gè)回調(diào)方法。
?
所以需要新建一個(gè)實(shí)現(xiàn)了MqttCallbackExtended接口的相關(guān)方法的回調(diào)處理類MqttConsumerCallback
import org.eclipse.paho.client.mqttv3.*;import java.util.Arrays;/*** mqtt回調(diào)處理類*/public class MqttConsumerCallback implements MqttCallbackExtended {private MqttClient client;private MqttConnectOptions options;private String[] topic;private int[] qos;public MqttConsumerCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) {this.client = client;this.options = options;this.topic = topic;this.qos = qos;}/*** 斷開(kāi)重連*/@Overridepublic void connectionLost(Throwable cause) {System.out.println("MQTT連接斷開(kāi),發(fā)起重連......");try {if (null != client && !client.isConnected()) {client.reconnect();System.out.println("嘗試重新連接");} else {client.connect(options);System.out.println("嘗試建立新連接");}} catch (Exception e) {e.printStackTrace();}}/*** 接收到消息調(diào)用令牌中調(diào)用*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------" + Arrays.toString(topic));}/*** 消息處理*/@Overridepublic void messageArrived(String topic, MqttMessage message) {try {String msg = new String(message.getPayload());System.out.println("收到topic:" + topic + " 消息:" + msg);System.out.println("收到消息后執(zhí)行具體的業(yè)務(wù)邏輯操作,比如將消息存儲(chǔ)進(jìn)數(shù)據(jù)庫(kù)");} catch (Exception e) {System.out.println("處理mqtt消息異常:" + e);}}/*** mqtt連接后訂閱主題*/@Overridepublic void connectComplete(boolean b, String s) {try {if (null != topic && null != qos) {if (client.isConnected()) {client.subscribe(topic, qos);System.out.println("mqtt連接成功,客戶端ID:" + PropertiesUtil.MQTT_CLIENT_ID);System.out.println("--訂閱主題::" + Arrays.toString(topic));} else {System.out.println("mqtt連接失敗,客戶端ID:" + PropertiesUtil.MQTT_CLIENT_ID);}}} catch (Exception e) {System.out.println("mqtt訂閱主題異常:" + e);}} }以上三個(gè)類建立成功之后就可以進(jìn)行訂閱主題和發(fā)布消息的測(cè)試了。
發(fā)布指定主題的消息
新建一個(gè)測(cè)試用的Controller接口用來(lái)測(cè)試推送消息
import com.ruoyi.web.mqtt.MqttConsumer; import org.springframework.data.repository.query.Param; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController;@RestController @RequestMapping("/testmqtt") public class MqttTest {/*** 測(cè)試推送消息*/@ResponseBody@GetMapping(value = "/push")public Object push(@Param("topic") String topic,@Param("msg") String msg) {MqttConsumer.publish(topic, msg);return "測(cè)試發(fā)布主題成功";} }并且把這個(gè)接口url放開(kāi)權(quán)限驗(yàn)證可以直接在postman中進(jìn)行接口調(diào)用
這里直接在接口方法中調(diào)用了上面的推送消息的方法,第一個(gè)參數(shù)是指定的主題,第二個(gè)參數(shù)是消息的內(nèi)容
那么此時(shí)SpringBoot就充當(dāng)了發(fā)布者的角色。
在測(cè)試推送消息前需要使用MqttBox連接到同一個(gè)MQTT服務(wù)器并訂閱同一個(gè)主題,這里訂閱badao主題
我們調(diào)用一下此接口
?
然后此時(shí)查看下MqttBox中已經(jīng)收到消息
?
SpringBoot訂閱主題并接收消息
在上面的接口中再添加一個(gè)接口用來(lái)訂閱某個(gè)指定主題
??? /*** 測(cè)試接收消息*/@ResponseBody@GetMapping(value = "/subscribe")public Object subscribe(@Param("topic") String topic,@Param("qus") int qus) {MqttConsumer.subscribe(topic, qus);return "訂閱主題"+topic+"成功";}這里訂閱主題的方法第一個(gè)參數(shù)是主題,第二個(gè)是消息質(zhì)量
然后再調(diào)用下此接口
?
可以看到訂閱主題成功,然后我們使用MqttBox去發(fā)布一個(gè)同樣主題的消息,那么SpringBoot這邊的回調(diào)方法就可以接收到發(fā)送的消息并進(jìn)行后續(xù)的業(yè)務(wù)操作,
比如將消息存儲(chǔ)到數(shù)據(jù)庫(kù)等。
?
總結(jié)
以上是生活随笔為你收集整理的SpringBoot整合MQTT服务器实现消息的发送与订阅(推送消息与接收推送)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 信息系统项目管理师-知识、变更、战略管理
- 下一篇: 信息系统项目管理师-组织级、流程管理核心