深入理解RocketMQ:Consumer消费消息原理
前言
RocketMQ版本:4.8
Consumer類型:DefaultMQPushConsumer
原理解析
consumer 啟動時做了哪些事情?
定時從NameSrv獲取最新的Topic+Queue+Broker路由信息,獲取頻率默認30秒,可以通過參數 pollNameServerInterval 進行設置
創建類型為LinkedBlockingQueue的隊列pullRequestQueue,用于存放PullRequest請求對象。
創建線程RebalanceService,主要用于執行負載均衡。 默認根據平均分配原則,為當前consumer分配對應數量的queue。隨后遍歷queue分別創建PullRequest,放入隊列pullRequestQueue。
PS:這項工作默認20秒執行一次,可以通過System.setProperty(“rocketmq.client.rebalance.waitInterval”) 修改
Queue分配完畢后,如果線程是第一次運行,會向Broker發送請求獲取當前Consumer對應每個Queue的消費進度。取值結果受Consumer消費策略 consumeFromWhere 影響,默認是獲取上次消費的進度。
創建線程PullMessageService,不斷地從pullRequestQueue拉取數據。取到PullRequest就通過Netty發送類型為RemotingCommand的消息,code=RequestCode.PULL_MESSAGE
總結
以上是生活随笔為你收集整理的深入理解RocketMQ:Consumer消费消息原理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RocketMQ源码解析:Produce
- 下一篇: Canal实时同步数据到RocketMQ