常用 MQTT 客户端库简介
前言
MQTT 是一個(gè)輕量的發(fā)布訂閱模式消息傳輸協(xié)議,專門針對(duì)低帶寬和不穩(wěn)定網(wǎng)絡(luò)環(huán)境的物聯(lián)網(wǎng)應(yīng)用設(shè)計(jì)。MQTT 基于發(fā)布/訂閱范式,工作在 TCP/IP協(xié)議族上,MQTT 協(xié)議輕量、簡(jiǎn)單、開放并易于實(shí)現(xiàn),這些特點(diǎn)使它適用范圍非常廣泛。
MQTT 基于客戶端-服務(wù)器通信模式,MQTT 服務(wù)端稱為 MQTT Broker,目前行業(yè)內(nèi)可選的 MQTT Broker 較多,其優(yōu)劣與功能差別比較本文不再贅述。本文以開源社區(qū)中最流行的 MQTT 消息服務(wù)器 EMQ X 為例,使用 EMQ 提供的公共 Broker broker.emqx.io ,通過一個(gè)簡(jiǎn)單客戶端連接 Broker 并發(fā)布、處理消息的例子,整理總結(jié)不同編程語(yǔ)言、平臺(tái)下 MQTT 客戶端庫(kù)的使用方式與樣例。
入選客戶端庫(kù)如下:
Eclipse Paho C 與 Eclipse Paho Embedded C
Eclipse Paho Java Client
Eclipse Paho MQTT Go client
emqtt : EMQ 提供的 Erlang MQTT 客戶端庫(kù)
MQTT.js Web 端 & Node.js 平臺(tái) MQTT 客戶端
Eclipse Paho Python
MQTT 社區(qū)收錄了許多 MQTT 客戶端庫(kù),讀者可以在此處查看。
樣例應(yīng)用介紹
MQTT 客戶端整個(gè)生命周期的行為可以概括為:建立連接、訂閱主題、接收消息并處理、向指定主題發(fā)布消息、取消訂閱、斷開連接。
標(biāo)準(zhǔn)的客戶端庫(kù)在每個(gè)環(huán)節(jié)都暴露出相應(yīng)的方法,不同庫(kù)在相同環(huán)節(jié)所需方法參數(shù)含義大致相同,具體選用哪些參數(shù)、啟用哪些功能特性需要用戶深入了解 MQTT 協(xié)議特性并結(jié)合實(shí)際應(yīng)用場(chǎng)景而定。
本文以一個(gè)客戶端連接并發(fā)布、處理消息為例,給出每個(gè)環(huán)節(jié)大致需要使用的參數(shù):
建立連接:
指定 MQTT Broker 基本信息接入地址與端口
指定傳輸類型是 TCP 還是 MQTT over WebSocket
如果啟用 TLS 需要選擇協(xié)議版本并攜帶相應(yīng)的的證書
Broker 啟用了認(rèn)證鑒權(quán)則客戶端需要攜帶相應(yīng)的 MQTT Username Password 信息
配置客戶端參數(shù)如 keepalive 時(shí)長(zhǎng)、clean session 回話保留標(biāo)志位、MQTT 協(xié)議版本、遺囑消息(LWT)等
訂閱主題:連接建立成功后可以訂閱主題,需要指定主題信息
指定主題過濾器 Topic,訂閱的時(shí)候支持主題通配符 + 與 # 的使用
指定 QoS,根據(jù)客戶端庫(kù)和 Broker 的實(shí)現(xiàn)可選 Qos 0 1 2,注意部分 Broker 與云服務(wù)提供商不支持部分 QoS 級(jí)別,如 AWS IoT 、阿里云 IoT 套件、Azure IoT Hub 均不支持 QoS 2 級(jí)別消息
訂閱主題可能因?yàn)榫W(wǎng)絡(luò)問題、Broker 端 ACL 規(guī)則限制而失敗
接收消息并處理:
一般是在連接時(shí)指定處理函數(shù),依據(jù)客戶端庫(kù)與平臺(tái)的網(wǎng)絡(luò)編程模型不同此部分處理方式略有不同
發(fā)布消息:向指定主題發(fā)布消息
指定目標(biāo)主題,注意該主題不能包含通配符 + 或 #,若主題中包含通配符可能會(huì)導(dǎo)致消息發(fā)布失敗、客戶端斷開等情況(視 Broker 與客戶端庫(kù)實(shí)現(xiàn)方式)
指定消息 QoS 級(jí)別,同樣存在不同 Broker 與平臺(tái)支持的 QoS 級(jí)別不同,如 Azure IoT Hub 發(fā)布 QoS 2 的消息將斷開客戶端連接
指定消息體內(nèi)容,消息體內(nèi)容大小不能超出 Broker 設(shè)置最大消息大小
指定消息 Retain 保留消息標(biāo)志位
取消訂閱:
指定目標(biāo)主題即可
斷開連接:
主動(dòng)斷開連接,將發(fā)布遺囑消息(LWT)
Eclipse Paho C 與 Eclipse Paho Embedded C
Eclipse Paho C 與 Eclipse Paho Embedded C 均為 Eclipse Paho 項(xiàng)目下的客戶端庫(kù),均為使用 ANSI C 編寫的功能齊全的 MQTT 客戶端,Eclipse Paho Embedded C 可以在桌面操作系統(tǒng)上使用,但主要針對(duì) mbed,Arduino和 FreeRTOS 等嵌入式環(huán)境。
該客戶端有同步/異步兩種 API ,分別以 MQTTClient 和 MQTTAsync 開頭:
同步 API 旨在更簡(jiǎn)單,更有用,某些調(diào)用將阻塞直到操作完成為止,使用編程上更加容易;
異步 API 中只有一個(gè)調(diào)用塊 API-waitForCompletion ,通過回調(diào)進(jìn)行結(jié)果通知,更適用于非主線程的環(huán)境。
兩個(gè)庫(kù)的下載、使用詳細(xì)說明請(qǐng)移步至項(xiàng)目主頁(yè)查看,本文使用 Eclipse Paho C,直接提供樣例代碼如下:
#include "stdio.h"
#include "stdlib.h"
#include "string.h"
#include "MQTTClient.h"
#define ADDRESS "tcp://broker.emqx.io:1883"
#define CLIENTID "emqx_test"
#define TOPIC "testtopic/1"
#define PAYLOAD "Hello World!"
#define QOS 1
#define TIMEOUT 10000L
int main(int argc, char* argv[])
{
MQTTClient client;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
MQTTClient_message pubmsg = MQTTClient_message_initializer;
MQTTClient_deliveryToken token;
int rc;
MQTTClient_create(&client, ADDRESS, CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE, NULL);
// Connection parameters
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to connect, return code %d
", rc);
exit(-1);
}
// Publish message
pubmsg.payload = PAYLOAD;
pubmsg.payloadlen = strlen(PAYLOAD);
pubmsg.qos = QOS;
pubmsg.retained = 0;
MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
printf("Waiting for up to %d seconds for publication of %s
"
"on topic %s for client with ClientID: %s
",
(int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID);
rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
printf("Message with delivery token %d delivered
", token);
// Disconnect
MQTTClient_disconnect(client, 10000);
MQTTClient_destroy(&client);
return rc;
}
Eclipse Paho Java Client
Eclipse Paho Java Client 是用 Java 編寫的 MQTT 客戶端庫(kù),可用于 JVM 或其他 Java 兼容平臺(tái)(例如Android)。
Eclipse Paho Java Client 提供了MqttAsyncClient 和 MqttClient 異步和同步 API。
通過 Maven 安裝:
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
連接樣例代碼如下:
App.java
package io.emqx;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class App {
public static void main(String[] args) {
String subTopic = "testtopic/#";
String pubTopic = "testtopic/1";
String content = "Hello World";
int qos = 2;
String broker = "tcp://broker.emqx.io:1883";
String clientId = "emqx_test";
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient client = new MqttClient(broker, clientId, persistence);
// Connection options
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName("emqx_test");
connOpts.setPassword("emqx_test_password".toCharArray());
// Retain connection
connOpts.setCleanSession(true);
// Set callback
client.setCallback(new PushCallback());
// Setup connection
System.out.println("Connecting to broker: " + broker);
client.connect(connOpts);
System.out.println("Connected");
System.out.println("Publishing message: " + content);
// Publish
client.subscribe(subTopic);
// Required parameters for publishing message
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
client.publish(pubTopic, message);
System.out.println("Message published");
client.disconnect();
System.out.println("Disconnected");
client.close();
System.exit(0);
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
}
回調(diào)消息處理類 OnMessageCallback.java
package io.emqx;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class OnMessageCallback implements MqttCallback {
public void connectionLost(Throwable cause) {
// Reconnect after lost connection.
System.out.println("Connection lost, and re-connect here.");
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
// Message handler after receiving message
System.out.println("Topic:" + topic);
System.out.println("QoS:" + message.getQos());
System.out.println("Payload:" + new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
}
Eclipse Paho MQTT Go client
Eclipse Paho MQTT Go Client 為 Eclipse Paho 項(xiàng)目下的 Go 語(yǔ)言版客戶端庫(kù),該庫(kù)能夠連接到 MQTT Broker 以發(fā)布消息,訂閱主題并接收已發(fā)布的消息,支持完全異步的操作模式。
客戶端依賴于 Google 的 proxy 和 websockets 軟件包,通過以下命令完成安裝:
go get github.com/eclipse/paho.mqtt.golang
連接樣例代碼如下:
package main
import (
"fmt"
"log"
"os"
"time"
"github.com/eclipse/paho.mqtt.golang"
)
var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("TOPIC: %s
", msg.Topic())
fmt.Printf("MSG: %s
", msg.Payload())
}
func main() {
mqtt.DEBUG = log.New(os.Stdout, "", 0)
mqtt.ERROR = log.New(os.Stdout, "", 0)
opts := mqtt.NewClientOptions().AddBroker("tcp://broker.emqx.io:1883").SetClientID("emqx_test_client")
opts.SetKeepAlive(60 * time.Second)
// Message callback handler
opts.SetDefaultPublishHandler(f)
opts.SetPingTimeout(1 * time.Second)
c := mqtt.NewClient(opts)
if token := c.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
// Subscription
if token := c.Subscribe("testtopic/#", 0, nil); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}
// Publish message
token := c.Publish("testtopic/1", 0, false, "Hello World")
token.Wait()
time.Sleep(6 * time.Second)
// Cancel subscription
if token := c.Unsubscribe("testtopic/#"); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}
// Disconnect
c.Disconnect(250)
time.Sleep(1 * time.Second)
}
emqtt : EMQ 提供的 Erlang MQTT 客戶端庫(kù)
emqtt 是開源 MQTT Broker EMQ X 官方 EMQ 提供的客戶端庫(kù),適用于 Erlang 語(yǔ)言。
Erlang 生態(tài)有多個(gè) MQTT Broker 實(shí)現(xiàn),如通過插件支持 MQTT 的 RabbitMQ ,VerenMQ、EMQ X 等。但是 MQTT 客戶端庫(kù)幾乎沒有選擇的余地,MQTT 社區(qū)收錄的 Erlang 客戶端庫(kù)中 emqtt 是最佳選擇。
emqtt 完全由 Erlang 實(shí)現(xiàn),完成支持 MQTT v3.1.1 和 MQTT v5.0 協(xié)議版本,支持 SSL 單雙向認(rèn)證與 WebSocket 連接。另一款 MQTT 基準(zhǔn)測(cè)試工具 emqtt_bench 就基于該客戶端庫(kù)構(gòu)建。
emqtt 使用方式如下:
ClientId = <<"test">>.
{ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]).
{ok, _Props} = emqtt:connect(ConnPid).
Topic = <<"guide/#">>.
QoS = 1.
{ok, _Props, _ReasonCodes} = emqtt:subscribe(ConnPid, {Topic, QoS}).
{ok, _PktId} = emqtt:publish(ConnPid, <<"guide/1">>, <<"Hello World!">>, QoS).
%% If the qos of publish packet is 0, `publish` function would not return packetid.
ok = emqtt:publish(ConnPid, <<"guide/2">>, <<"Hello World!">>, 0).
%% Recursively get messages from mail box.
Y = fun (Proc) -> ((fun (F) -> F(F) end)((fun(ProcGen) -> Proc(fun() -> (ProcGen(ProcGen))() end) end))) end.
Rec = fun(Receive) -> fun()-> receive {publish, Msg} -> io:format("Msg: ~p~n", [Msg]), Receive(); _Other -> Receive() after 5 -> ok end end end.
(Y(Rec))().
%% If you don't like y combinator, you can also try named function to recursively get messages in erlang shell.
Receive = fun Rec() -> receive {publish, Msg} -> io:format("Msg: ~p~n", [Msg]), Rec(); _Other -> Rec() after 5 -> ok end end.
Receive().
{ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, <<"guide/#">>).
ok = emqtt:disconnect(ConnPid).
MQTT.js Web 端 & Node.js 平臺(tái) MQTT 客戶端
MQTT.js 是 JavaScript 編寫的,實(shí)現(xiàn)了 MQTT 協(xié)議客戶端功能的模塊,可以在 Node.js 或?yàn)g覽器環(huán)境中使用。在 Node.js 中使用時(shí),即可以 -g 全局安裝以命令行的形式使用,又可以將其集成到項(xiàng)目中調(diào)用。
由于 JavaScript 單線程特性,MQTT.js 是全異步 MQTT 客戶端,MQTT.js 支持 MQTT 與 MQTT over WebSocket,在不同運(yùn)行環(huán)境支持程度如下:
瀏覽器環(huán)境:MQTT over WebSocket(包括微信小程序、支付寶小程序等定制瀏覽器環(huán)境)
Node.js 環(huán)境:MQTT、MQTT over WebSocket
不同環(huán)境里除了少部分連接參數(shù)不同,其他 API 均是相同的。
使用 npm 安裝:
npm i mqtt
使用 CDN 安裝(瀏覽器):
<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
<script>
// Initialize a global mqtt variable
console.log(mqtt)
</script>
樣例代碼:
// const mqtt = require('mqtt')
import mqtt from 'mqtt'
// Connection option
const options = {
clean: true, // Retain connection
connectTimeout: 4000, // Timeout
// Authtication
clientId: 'emqx_test',
username: 'emqx_test',
password: 'emqx_test',
}
// Connection string
// ws: unsecured WebSocket
// wss: secured WebSocket connection
// mqtt: unsecured TCP connection
// mqtts: secured TCP connection
const connectUrl = 'wss://broker.emqx.io:8084/mqtt'
const client = mqtt.connect(connectUrl, options)
client.on('reconnect', (error) => {
console.log('reconnect:', error)
})
client.on('reconnect', (error) => {
console.log('reconnect:', error)
})
client.on('message', (topic, message) => {
console.log('message:', topic, message.toString())
})
Eclipse Paho Python
Eclipse Paho Python 為 Eclipse Paho 項(xiàng)目下的 Python 語(yǔ)言版客戶端庫(kù),該庫(kù)能夠連接到 MQTT Broker 以發(fā)布消息,訂閱主題并接收已發(fā)布的消息。
使用 PyPi 包管理工具安裝:
pip install paho-mqtt
代碼樣例:
import paho.mqtt.client as mqtt
# Successful Connection Callback
def on_connect(client, userdata, flags, rc):
print('Connected with result code '+str(rc))
client.subscribe('testtopic/#')
# Message delivery callback
def on_message(client, userdata, msg):
print(msg.topic+" "+str(msg.payload))
client = mqtt.Client()
# Set callback handler
client.on_connect = on_connect
client.on_message = on_message
# Set up connection
client.connect('broker.emqx.io', 1883, 60)
# Publish message
client.publish('emqtt',payload='Hello World',qos=0)
client.loop_forever()
總結(jié)
關(guān)于 MQTT 協(xié)議、MQTT 客戶端庫(kù)使用流程、常用 MQTT 客戶端的簡(jiǎn)介就到這里,歡迎讀者通過 EMQ X 進(jìn)行MQTT 學(xué)習(xí)、項(xiàng)目開發(fā)使用。
總結(jié)
以上是生活随笔為你收集整理的常用 MQTT 客户端库简介的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Konva入门教程
- 下一篇: 茅台酒厂家有姓钱的吗?