mica-mqtt 1.0.2 发布,完善 stater 和 example
一、簡介
mica mqtt
mica-mqtt基于t-io實現的簡單、低延遲、高性能的 mqtt 物聯網開源組件。使用詳見 mica-mqtt gitee 源碼mica-mqtt-example模塊。
mica-mqtt: 基于 t-io 實現的低延遲、高性能的 mqtt 組件。 記得右上角點個star 關注更新!
二、功能
支持 MQTT v3.1、v3.1.1 以及 v5.0 協議。
支持 MQTT client 客戶端。
支持 MQTT server 服務端。
支持 MQTT 遺囑消息。
支持 MQTT 保留消息。
支持自定義消息(mq)處理轉發實現集群。
MQTT 客戶端 阿里云 mqtt 連接 demo。
支持 GraalVM 編譯成本機可執行程序。
支持 Spring boot 項目快速接入(mica-mqtt-spring-boot-starter)。
mica-mqtt-spring-boot-starter 支持對接 Prometheus + Grafana。
三、待辦
添加 websocket 支持(已預研成功)。
優化處理 mqtt session,以及支持 v5.0
四、更新記錄
文檔添加集群處理步驟說明,添加遺囑消息、保留消息的使用場景。
✨ 去除演示中的 qos2 參數,性能損耗大避免誤用。
✨ 遺囑、保留消息內部消息轉發抽象。
✨ 添加 mica-mqtt-spring-boot-example 。感謝 wsq( @冷月宮主 )pr。
✨ mica-mqtt-spring-boot-starter 支持客戶端接入和服務端優化。感謝 wsq( @冷月宮主 )pr。
✨ mica-mqtt-spring-boot-starter 服務端支持指標收集。可對接 Prometheus + Grafana 監控。
✨ mqtt server 接受連接時,先判斷該 clientId 是否存在其它連接,有則解綁并關閉其他連接。
⬆️ 升級 mica-auto 到 2.1.3 修復 ide 多模塊增量編譯問題。
五、Spring boot 快速接入
5.1 添加依賴
<dependency>
<groupId>net.dreamlu</groupId>
<artifactId>mica-mqtt-spring-boot-starter</artifactId>
<version>1.0.2</version>
</dependency>
5.2 服務端 yml 配置
mqtt:
server:
enabled: true # 是否開啟,默認:true
ip: 127.0.0.1 # 服務端 ip 默認:127.0.0.1
port: 5883 # 端口,默認:1883
name: Mica-Mqtt-Server # 名稱,默認:Mica-Mqtt-Server
buffer-allocator: HEAP # 堆內存和堆外內存,默認:堆內存
heartbeat-timeout: 120000 # 心跳超時,單位毫秒,默認: 1000 * 120
read-buffer-size: 8092 # 接收數據的 buffer size,默認:8092
max-bytes-in-message: 8092 # 消息解析最大 bytes 長度,默認:8092
debug: true # 如果開啟 prometheus 指標收集建議關閉
5.3 服務端可實現接口(注冊成 Spring Bean 即可)
|
接口 |
是否必須 |
說明 |
|
IMqttServerAuthHandler |
是 |
用于客戶端認證 |
|
IMqttMessageListener |
是 |
消息監聽 |
|
IMqttConnectStatusListener |
是 |
連接狀態監聽 |
|
IMqttSessionManager |
否 |
session 管理 |
|
IMqttMessageStore |
集群是,單機否 |
遺囑和保留消息存儲 |
|
AbstractMqttMessageDispatcher |
集群是,單機否 |
消息轉發,(遺囑、保留消息轉發) |
|
IpStatListener |
否 |
t-io ip 狀態監聽 |
5.4 服務端自定義配置(可選)
@Configuration(proxyBeanMethods = false)
public class MqttServerCustomizerConfiguration {
@Bean
public MqttServerCustomizer activeRecordPluginCustomizer() {
return new MqttServerCustomizer() {
@Override
public void customize(MqttServerCreator creator) {
// 此處可自定義配置 creator,會覆蓋 yml 中的配置
System.out.println("----------------MqttServerCustomizer-----------------");
}
};
}
}
5.5 MqttServerTemplate 使用示例
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.ByteBuffer;
/**
* @author wsq
*/
@Service
public class ServerService {
@Autowired
private MqttServerTemplate server;
public boolean publish(String body) {
server.publishAll("/test/123", ByteBuffer.wrap(body.getBytes()));
return true;
}
}
5.6 基于 mq 消息廣播集群處理
實現IMqttConnectStatusListener處理設備狀態存儲。
實現IMqttMessageListener將消息轉發到 mq,業務按需處理 mq 消息。
實現IMqttMessageStore存儲遺囑和保留消息。
實現AbstractMqttMessageDispatcher將消息發往 mq,mq 再廣播回 mqtt 集群,mqtt 將消息發送到設備。
業務消息發送到 mq,mq 廣播到 mqtt 集群,mqtt 將消息發送到設備。
5.7 Prometheus + Grafana 監控對接
得益于t-io良好的設計,監控指標直接對接的t-iostat,目前支持下列指標,后期會不斷完善。
|
支持得指標 |
說明 |
|
mqtt_connections_accepted |
共接受過連接數 |
|
mqtt_connections_closed |
關閉過的連接數 |
|
mqtt_connections_size |
當前連接數 |
|
mqtt_messages_handled_packets |
已處理消息數 |
|
mqtt_messages_handled_bytes |
已處理消息字節數 |
|
mqtt_messages_received_packets |
已接收消息數 |
|
mqtt_messages_received_bytes |
已處理消息字節數 |
|
mqtt_messages_send_packets |
已發送消息數 |
|
mqtt_messages_send_bytes |
已發送消息字節數 |
關于
mica-mqtt-spring-boot-starter客戶端等更多使用方式請查看 gitee
mica-mqtt-spring-boot-starter 模塊 readme 文檔。
六、普通 java 項目接入
6.1 maven 依賴
<dependency>
<groupId>net.dreamlu</groupId>
<artifactId>mica-mqtt-core</artifactId>
<version>1.0.2</version>
</dependency>
6.2 mica-mqtt 客戶端
// 初始化 mqtt 客戶端
MqttClient client = MqttClient.create()
.ip("127.0.0.1")
.port(1883) // 默認:1883
.username("admin")
.password("123456")
.version(MqttVersion.MQTT_5) // 默認:3_1_1
.clientId("xxxxxx") // 默認:MICA-MQTT- 前綴和 36進制的納秒數
.connect(); // 連接
// 消息訂閱,同類方法 subxxx
client.subQos0("/test/#", (topic, payload) -> {
logger.info(topic + ' ' + ByteBufferUtil.toString(payload));
});
// 取消訂閱
client.unSubscribe("/test/#");
// 發送消息
client.publish("/test/client", ByteBuffer.wrap("mica最牛皮".getBytes(StandardCharsets.UTF_8)));
// 斷開連接
client.disconnect();
// 重連
client.reconnect();
// 停止
client.stop();
6.3 mica-mqtt 服務端
// 注意:為了能接受更多鏈接(降低內存),請添加 jvm 參數 -Xss129k
MqttServer mqttServer = MqttServer.create()
// 默認:127.0.0.1
.ip("127.0.0.1")
// 默認:1883
.port(1883)
// 默認為: 8092(mqtt 默認最大消息大小),為了降低內存可以減小小此參數,如果消息過大 t-io 會嘗試解析多次(建議根據實際業務情況而定)
.readBufferSize(512)
// 自定義認證
.authHandler((clientId, userName, password) -> true)
// 消息監聽
.messageListener((clientId, topic, mqttQoS, payload) -> {
logger.info("clientId:{} topic:{} mqttQoS:{} message:{}", clientId, topic, mqttQoS, ByteBufferUtil.toString(payload));
})
// ssl 配置
.useSsl("", "", "")
// 自定義客戶端上下線監聽
.connectStatusListener(new IMqttConnectStatusListener() {
@Override
public void online(String clientId) {
}
@Override
public void offline(String clientId) {
}
})
// 自定義消息轉發,可用 mq 廣播實現集群化處理
.messageDispatcher(new IMqttMessageDispatcher() {
@Override
public void config(MqttServer mqttServer) {
}
@Override
public boolean send(Message message) {
return false;
}
@Override
public boolean send(String clientId, Message message) {
return false;
}
})
.debug() // 開啟 t-io debug 信息日志
.start();
// 發送給某個客戶端
mqttServer.publish("clientId","/test/123", ByteBuffer.wrap("mica最牛皮".getBytes()));
// 發送給所有在線監聽這個 topic 的客戶端
mqttServer.publishAll("/test/123", ByteBuffer.wrap("mica最牛皮".getBytes()));
// 停止服務
mqttServer.stop();
七、效果演示
mica-mqtt-example 效果演示
JUST DO IT!
總結
以上是生活随笔為你收集整理的mica-mqtt 1.0.2 发布,完善 stater 和 example的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 纯CSS3实现Material Desi
- 下一篇: form-validation-engi