javascript
SpringBoot +RabbitMQ 做智能家居,居然如此简单!
前一段有幸參與到一個智能家居項目的開發,由于之前都沒有過這方面的開發經驗,所以對智能硬件的開發模式和技術棧都頗為好奇。
智能可燃氣體報警器產品是一款可燃氣體報警器,如果家中燃氣泄露濃度到達一定閾值,報警器檢測到并上傳氣體濃度值給后臺,后臺以電話、短信、微信等方式,提醒用戶家中可能有氣體泄漏。
用戶還可能向報警器發一些關閉報警、調整音量的指令等。整體功能還是比較簡單的,大致的邏輯如下圖所示:
但當我真正的參與其中開發時,其實有一點小小的失望,因為在整個研發過程中,并沒用到什么新的技術,還是常規的幾種中間件,只不過換個用法而已。
技術選型用rabbitmq?來做核心的組件,主要考慮到運維成本低,組內成員使用的熟練度比較高。
下面和小伙伴分享一下如何用?springboot?+?rabbitmq?搭建物聯網(IOT)平臺,其實智能硬件也沒想象的那么高不可攀!
很多小伙伴可能有點懵?rabbitmq?不是消息隊列嗎?怎么又能做智能硬件了?
其實rabbitmq有兩種協議,我們平時接觸的消息隊列是用的AMQP協議,而用在智能硬件中的是MQTT協議。
一、什么是 MQTT協議?
MQTT?全稱(Message Queue Telemetry Transport):一種基于發布/訂閱(publish/subscribe)模式的輕量級通訊協議,通過訂閱相應的主題來獲取消息,是物聯網(Internet of Thing)中的一個標準傳輸協議。
該協議將消息的發布者(publisher)與訂閱者(subscriber)進行分離,因此可以在不可靠的網絡環境中,為遠程連接的設備提供可靠的消息服務,使用方式與傳統的MQ有點類似。
TCP協議位于傳輸層,MQTT?協議位于應用層,MQTT?協議構建于TCP/IP協議上,也就是說只要支持TCP/IP協議棧的地方,都可以使用MQTT協議。
二、為什么要用 MQTT協議?
MQTT協議為什么在物聯網(IOT)中如此受偏愛?而不是其它協議,比如我們更為熟悉的?HTTP協議呢?
-
首先HTTP協議它是一種同步協議,客戶端請求后需要等待服務器的響應。而在物聯網(IOT)環境中,設備會很受制于環境的影響,比如帶寬低、網絡延遲高、網絡通信不穩定等,顯然異步消息協議更為適合IOT應用程序。
-
HTTP是單向的,如果要獲取消息客戶端必須發起連接,而在物聯網(IOT)應用程序中,設備或傳感器往往都是客戶端,這意味著它們無法被動地接收來自網絡的命令。
-
通常需要將一條命令或者消息,發送到網絡上的所有設備上。HTTP要實現這樣的功能不但很困難,而且成本極高。
三、MQTT協議介紹
前邊說過MQTT是一種輕量級的協議,它只專注于發消息, 所以此協議的結構也非常簡單。
MQTT數據包
在MQTT協議中,一個MQTT數據包由:固定頭(Fixed header)、?可變頭(Variable header)、?消息體(payload)三部分構成。
-
固定頭(Fixed header),所有數據包中都有固定頭,包含數據包類型及數據包的分組標識。
-
可變頭(Variable header),部分數據包類型中有可變頭。
-
內容消息體(Payload),存在于部分數據包類,是客戶端收到的具體消息內容。
1、固定頭
固定頭部,使用兩個字節,共16位:
(4-7)位表示消息類型,使用4位二進制表示,可代表如下的16種消息類型,不過 0 和 15位置屬于保留待用,所以共14種消息事件類型。
DUP Flag(重試標識)
DUP Flag:保證消息可靠傳輸,消息是否已送達的標識。默認為0,只占用一個字節,表示第一次發送,當值為1時,表示當前消息先前已經被傳送過。
QoS Level(消息質量等級)
QoS Level:消息的質量等級,后邊會詳細介紹
RETAIN(持久化)
-
值為1:表示發送的消息需要一直持久保存,而且不受服務器重啟影響,不但要發送給當前的訂閱者,且以后新加入的客戶端訂閱了此Topic,訂閱者也會馬上得到推送。注意:新加入的訂閱者,只會取出最新的一個RETAIN flag = 1的消息推送。
-
值為0:僅為當前訂閱者推送此消息。
Remaining Length(剩余長度)
在當前消息中剩余的byte(字節)數,包含可變頭部和消息體payload。
2、可變頭
固定頭部僅定義了消息類型和一些標志位,一些消息的元數據需要放入可變頭部中。可變頭部內容字節長度 + 消息體payload = 剩余長度。
可變頭部居于固定頭部和payload中間,包含了協議名稱,版本號,連接標志,用戶授權,心跳時間等內容。
可變頭存在于這些類型的消息:PUBLISH (QoS > 0)、PUBACK、PUBREC、PUBREL、PUBCOMP、SUBSCRIBE、SUBACK、UNSUBSCRIBE、UNSUBACK。
3、消息體payload
消息體payload只存在于CONNECT、PUBLISH、SUBSCRIBE、SUBACK、UNSUBSCRIBE這幾種類型的消息:
-
CONNECT:包含客戶端的ClientId、訂閱的Topic、Message以及用戶名和密碼。
-
PUBLISH:向對應主題發送消息。
-
SUBSCRIBE:要訂閱的主題以及QoS。
-
SUBACK:服務器對于SUBSCRIBE所申請的主題及QoS進行確認和回復。
-
UNSUBSCRIBE:取消要訂閱的主題。
消息質量(QoS )
消息質量(Quality of Service),即消息的發送質量,發布者(publisher)和訂閱者(subscriber)都可以指定qos等級,有QoS 0、QoS 1、QoS 2三個等級。
下邊分別說明一下這三個等級的區別。
1、Qos 0
Qos 0:At most once(至多一次)只發送一次消息,不保證消息是否成功送達,沒有確認機制,消息可能會丟失或重復。
2、Qos 1
Qos 1:At least once(至少一次),相對于QoS 0而言Qos 1增加了ack確認機制,發送者(publisher)推送消息到MQTT代理(broker)時,兩者自身都會先持久化消息,只有當publisher?或者?Broker分別收到?PUBACK確認時,才會刪除自身持久化的消息,否則就會重發。
但有個問題,盡管我們可以通過確認來保證一定收到客戶端 或 服務器的message,可我們卻不能保證僅收到一次message,也就是當客戶端publisher沒收到Broker的puback或者?Broker沒有收到subscriber的puback,那么就會一直重發。
publisher -> broker 大致流程:
publisher store msg -> publish ->broker (傳遞message)
broker -> puback -> publisher delete msg (確認傳遞成功)
3、Qos 2
Qos 2:Exactly once(只有一次),相對于QoS 1,QoS 2升級實現了僅接受一次message,publisher?和?broker?同樣對消息進行持久化,其中?publisher?緩存了message和 對應的msgID,而?broker?緩存了?msgID,可以保證消息不重復,由于又增加了一個confirm?機制,整個流程變得復雜很多。
publisher -> broker 大致流程:
publisher store msg -> publish ->broker -> broker store
msgID(傳遞message) broker -> puberc (確認傳遞成功)
publisher -> pubrel ->broker delete msgID (告訴broker刪除msgID)
broker -> pubcomp -> publisher delete msg (告訴publisher刪除msg)
LWT(最后遺囑)
LWT?全稱為?Last Will and Testament,其實遺囑是一個由客戶端預先定義好的主題和對應消息,附加在CONNECT的數據包中,包括遺愿主題、遺愿 QoS、遺愿消息等。
當MQTT代理?Broker?檢測到有客戶端client非正常斷開連接時,再由服務器主動發布此消息,然后相關的訂閱者會收到消息。
舉個栗子:聊天室中所有人都訂閱一個叫talk的主題 ,但小富由于網絡抖動突然斷開了鏈接,這時聊天室中所有訂閱主題?talk的客戶端都會收到一個 “小富離開聊天室” 的遺愿消息。
遺囑的相關參數:
-
Will Flag:是否使用 LWT,1 開啟
-
Will Topic:遺愿主題名,不可使用通配符
-
Will Qos:發布遺愿消息時使用的 QoS
-
Will Retain:遺愿消息的 Retain 標識
-
Will Message:遺愿消息內容
那客戶端Client?有哪些場景是非正常斷開連接呢?
-
Broker?檢測到底層的 I/O 異常;
-
客戶端 未能在心跳?Keep Alive?的間隔內和?Broker?進行消息交互;
-
客戶端 在關閉底層?TCP?連接前沒有發送?DISCONNECT?數據包;
-
客戶端 發送錯誤格式的數據包到?Broker,導致關閉和客戶端的連接等。
注意:當客戶端通過發布?DISCONNECT?數據包斷開連接時,屬于正常斷開連接,并不會觸發?LWT?的機制,與此同時Broker?還會丟棄掉當前客戶端在連接時指定的相關?LWT?參數。
四、MQTT協議應用場景
MQTT協議廣泛應用于物聯網、移動互聯網、智能硬件、車聯網、電力能源等領域。使用的場景也是非常非常多,下邊列舉一些:
-
物聯網M2M通信,物聯網大數據采集
-
Android消息推送,WEB消息推送
-
移動即時消息,例如Facebook Messenger
-
智能硬件、智能家具、智能電器
-
車聯網通信,電動車站樁采集
-
智慧城市、遠程醫療、遠程教育
-
電力、石油與能源等行業市場
五、代碼實現
具體?rabbitmq?的環境搭建就不贅述了,網上教程比較多,有條件的用服務器,沒條件的像我搞個Windows版的也很快樂嘛。
1、啟用 rabbitmq的mqtt協議
我們先開啟?rabbitmq?的?mqtt協議,因為默認安裝下是關閉的,命令如下:
rabbitmq-plugins?enable?rabbitmq_mqtt2、mqtt 客戶端依賴包
上一步中安裝rabbitmq環境并開啟?mqtt協議后,實際上mqtt?消息代理服務就搭建好了,接下來要做的就是實現客戶端消息的推送和訂閱。
這里使用spring-integration-mqtt、org.eclipse.paho.client.mqttv3兩個工具包實現。
<!--mqtt依賴包--> <dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId> </dependency> <dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.0</version> </dependency>3、消息發送者
消息的發送比較簡單,主要是應用到@ServiceActivator注解,需要注意messageHandler.setAsync屬性,如果設置成false,關閉異步模式發送消息時可能會阻塞。
@Configuration public?class?IotMqttProducerConfig?{@Autowiredprivate?MqttConfig?mqttConfig;@Beanpublic?MqttPahoClientFactory?mqttClientFactory()?{DefaultMqttPahoClientFactory?factory?=?new?DefaultMqttPahoClientFactory();factory.setServerURIs(mqttConfig.getServers());return?factory;}@Beanpublic?MessageChannel?mqttOutboundChannel()?{return?new?DirectChannel();}@Bean@ServiceActivator(inputChannel?=?"iotMqttInputChannel")public?MessageHandler?mqttOutbound()?{MqttPahoMessageHandler?messageHandler?=?new?MqttPahoMessageHandler(mqttConfig.getServerClientId(),?mqttClientFactory());messageHandler.setAsync(false);messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());return?messageHandler;} }MQTT?對外提供發送消息的API時,需要使用@MessagingGateway?注解,去提供一個消息網關代理,參數defaultRequestChannel?指定發送消息綁定的channel。
可以實現三種API接口,payload?為發送的消息,topic?發送消息的主題,qos?消息質量。
@MessagingGateway(defaultRequestChannel?=?"iotMqttInputChannel") public?interface?IotMqttGateway?{//?向默認的?topic?發送消息void?sendMessage2Mqtt(String?payload);//?向指定的?topic?發送消息void?sendMessage2Mqtt(String?payload,@Header(MqttHeaders.TOPIC)?String?topic);//?向指定的?topic?發送消息,并指定服務質量參數void?sendMessage2Mqtt(@Header(MqttHeaders.TOPIC)?String?topic,?@Header(MqttHeaders.QOS)?int?qos,?String?payload); }4、消息訂閱
消息訂閱和我們平時用的MQ消息監聽實現思路基本相似,@ServiceActivator注解表明當前方法用于處理MQTT消息,inputChannel?參數指定了用于接收消息的channel。
/***?@Author:?xiaofu*?@Description:?消息訂閱配置*?@date?2020/6/8?18:24*/ @Configuration public?class?IotMqttSubscriberConfig?{@Autowiredprivate?MqttConfig?mqttConfig;@Beanpublic?MqttPahoClientFactory?mqttClientFactory()?{DefaultMqttPahoClientFactory?factory?=?new?DefaultMqttPahoClientFactory();factory.setServerURIs(mqttConfig.getServers());return?factory;}@Beanpublic?MessageChannel?iotMqttInputChannel()?{return?new?DirectChannel();}@Beanpublic?MessageProducer?inbound()?{MqttPahoMessageDrivenChannelAdapter?adapter?=?new?MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(),?mqttClientFactory(),?mqttConfig.getDefaultTopic());adapter.setCompletionTimeout(5000);adapter.setConverter(new?DefaultPahoMessageConverter());adapter.setQos(1);adapter.setOutputChannel(iotMqttInputChannel());return?adapter;}/***?@author?xiaofu*?@description?消息訂閱*?@date?2020/6/8?18:20*/@Bean@ServiceActivator(inputChannel?=?"iotMqttInputChannel")public?MessageHandler?handlerTest()?{return?message?->?{try?{String?string?=?message.getPayload().toString();System.out.println("接收到消息:"?+?string);}?catch?(MessagingException?ex)?{//logger.info(ex.getMessage());}};} }六、測試消息
額~ 由于本渣渣對硬件一竅不通,為了模擬硬件的發送消息,只能借助一下工具,其實硬件端實現MQTT協議,跟我們前邊的基本沒什么區別,只不過換種語言嵌入到硬件中而已。
這里選的測試工具為mqttbox,下載地址:http://workswithweb.com/mqttbox.html
1、測試消息發送
我們用先用mqttbox模擬向主題mqtt_test_topic發送消息,看后臺是否能成功接收到。
看到后臺成功拿到了向主題mqtt_test_topic發送的消息。
2、測試消息訂閱
用mqttbox模擬訂閱主題mqtt_test_topic,在后臺向主題mqtt_test_topic發送一條消息,這里我簡單的寫了個controller調用API發送消息。
http://127.0.0.1:8080/fun/testMqtt?topic=mqtt_test_topic&message=我是后臺向主題 mqtt_test_topic 發送的消息我們看mqttbox的訂閱消息,已經成功的接收到了后臺的消息,到此我們的MQTT通信環境就算搭建成功了。如果把mqttbox工具換成具體硬件設備,整個流程就是我們常說的智能家居了,其實真的沒那么難。
七、應用注意事項
在我們實際的生產環境中遇到過的問題,這里分享一下讓大家少踩坑。
clientId 要唯一
在客戶端connect連接的時,會有一個clientId?參數,需要每個客戶端都保持唯一的。但我們在開發測試階段clientId直接在代碼中寫死了,而且服務都是單實例部署,并沒有暴露出什么問題。
MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(),?mqttClientFactory(),?mqttConfig.getDefaultTopic());然而在生產環境內側的時候,由于服務是多實例集群部署,結果出現了下邊的奇怪問題。同一時間內只能有一個客戶端能拿到消息,其他客戶端不但不能消費消息,而且還在不斷的掉線重連:Lost connection: 已斷開連接; retrying...。
這就是由于clientId相同導致客戶端間相互競爭消費,最后將clientId獲取方式換成從發號器中拿,問題就好了,所以這個地方是需要特別注意的。
平時程序在開發環境沒問題,可偏偏到了生產環境就一大堆問題,很多都是因為服務部署方式不同導致的。所以多學習分布式還是很有必要的。
八、其他中間件
MQTT它只是一種協議,支持MQTT協議的消息中間件產品非常多,下邊的也只是其中的一部分
-
Mosquitto
-
Eclipse Paho
-
RabbitMQ
-
Apache ActiveMQ
-
HiveMQ
-
JoramMQ
-
ThingMQ
-
VerneMQ
-
Apache Apollo
-
emqttd Xively
-
IBM Websphere .....
總結
以上是生活随笔為你收集整理的SpringBoot +RabbitMQ 做智能家居,居然如此简单!的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 牛逼!Redis 的字符串是这样实现的…
- 下一篇: JVM 运行时数据区详解,写得非常好!