rockemq 发送延迟消息_RockeMQ通过代码监控消费者状态
==背景==
物聯網場景,在設備端寫了一個小的API服務程序,這個程序包括:
1、向平臺上報設備數據
2、創建消費者客戶端,用來監聽平臺的下行命令
==問題==
平臺層需要知道設備的狀態:在線? or? 離線。我能想到的解決辦法
1、設備上報心跳數據,平臺通過心跳來判斷設備是否在線。
2、rocketmq應該有可以監控消費者狀態的命令,是否可以通過這個命令實現。
方案1肯定是沒有問題的,不過缺點就是需要在平臺上寫狀態管理的代碼,麻煩不說,可能還有延遲。
于是想嘗試方法2是否可行。
==踐行過程==
首先,我觀察了rocketmq-console(RocketMQ的Web界面,需要獨立部署),發現可以通過Web界面查看消費者狀態,結果如圖:
通過瀏覽器的控制臺日志,可以看到調用的是consumerConnection.query接口。
很好,我是否可以借鑒一下這個思路,去監聽消費者狀態呢。
按照這個思路走,去github上找了源碼:https://github.com/apache/rocketmq-externals
通過查看他們的源碼,才知道RocketMQ已經提供了供查看消費者鏈接信息的API。
==API示例==
需要引入新的pom文件rocketmq-tools、rocketmq-common,增加只有,所有的pom為
org.apache.rocketmq
rocketmq-store
4.5.0
org.apache.rocketmq
rocketmq-client
4.5.0
org.apache.rocketmq
rocketmq-acl
4.5.0
org.apache.rocketmq
rocketmq-tools
4.5.0
org.apache.rocketmq
rocketmq-common
4.5.0
Java代碼示例
packageadmin;importorg.apache.rocketmq.client.exception.MQBrokerException;importorg.apache.rocketmq.client.exception.MQClientException;importorg.apache.rocketmq.common.protocol.body.ConsumerConnection;importorg.apache.rocketmq.remoting.exception.RemotingException;importorg.apache.rocketmq.tools.admin.DefaultMQAdminExt;public classAdminExtSample {public static voidmain(String[] args)throwsMQClientException, InterruptedException, MQBrokerException, RemotingException {
DefaultMQAdminExt defaultMQAdminExt= newDefaultMQAdminExt();
defaultMQAdminExt.setNamesrvAddr("101.132.242.90:9876;47.116.50.192:9876");
defaultMQAdminExt.start();
ConsumerConnection cc= defaultMQAdminExt.examineConsumerConnectionInfo("device_cg_notice_down");
System.out.println(cc.toString());
defaultMQAdminExt.shutdown();
}
}
這樣就可以獲取上面web頁面中的所有信息了。
--END--
RockeMQ通過代碼監控消費者狀態
總結
以上是生活随笔為你收集整理的rockemq 发送延迟消息_RockeMQ通过代码监控消费者状态的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python导入pandas出错_构建d
- 下一篇: hive分区用2个字段有何限制_关于Hi