rocketmq 消息指定_详解RocketMQ不同类型的消费者
原標題:詳解RocketMQ不同類型的消費者
云棲君導讀:本文節選自云棲社區系列叢書《RocketMQ原理與實戰解析》,作者:阿里巴巴數據專家楊開元。本節將重點講解RocketMQ不同類型的消費者。
根據使用者對讀取操作的控制情況,分為兩種類型。一個是DefaultMQPushConsumer,由系統控制讀取操作,收到消息后自動調用傳入的處理方法來處理;另一個是DefaultMQPullConsumer,讀取操作中的大部分功能由使用者自主控制。
1.1.1 DefaultMQPushConsumer的使用
使用DefaultMQPushConsumer主要是設置好各種參數和傳入處理消息的函數。系統收到消息后自動調用處理函數來處理消息,自動保存Offset,而且加入新的DefaultMQPushConsumer后會自動做負載均衡。下面結合org.apache.rocketmq.example.quickstart包中的源碼來介紹。
代碼清單1-1 DefaultMQPushConsumer示例
DefaultMQPushConsumer需要設置三個參數:一是這個Consumer的GroupName,二是NameServer的地址和端口號,三是Topic的名稱,下面詳細介紹。
Consumer的GroupName用于把多個Consumer組織到一起,提高并發處理能力,GroupName需要和消息模式(MessageModel)配合使用。
RocketMQ支持兩種消息模式:Clustering 和 Broadcasting。
在Clustering 模式下,同一個ConsumerGroup(GroupName相同)里的每個Consumer只消費所訂閱消息的一部分內容,同一個ConsumerGroup里所有的Consumer消費的內容合起來才是所訂閱Topic內容的整體,從而達到負載均衡的目的。
在Broadcasting模式下,同一個ConsumerGroup里的每個Consumer都能消費到所訂閱Topic的全部消息,也就是一個消息會被多次分發,被多個Consumer消費。
NameServer的地址和端口號,可以填寫多個,用分號隔開,達到消除單點故障的目的,比如 “ip1:port;ip2:port;ip3:port”。
Topic名稱用來標識消息類型,需要提前創建。如果不需要消費某個Topic下的所有消息,可以通過指定消息的Tag進行消息過濾,比如:Consumer.subscribe("TopicTest", "tag1 || tag2 || tag3"),表示這個Consumer要消費“TopicTest”下帶有tag1或tag2或tag3的消息(Tag是在發送消息時設置的標簽)。在填寫Tag參數的位置,用null或者“*”表示要消費這個Topic的所有消息。
1.1.2 DefaultMQPushConsumer的處理流程
本節通過分析源碼來說明DefaultMQPushConsumer的處理流程。
DefaultMQPushConsumer主要功能實現在DefaultMQPushConsumerImpl類中,消息的處理邏輯是在pullMessage這個函數里的PullCallBack中。在PullCallBack函數里有個switch語句,根據從Broker返回的消息類型做相應的處理,具體處理邏輯可以查看源碼。
代碼清單1-2 DefaultMQPushConsuer的處理邏輯
DefaultMQPushConsuer的源碼中有很多PullRequest語句,比如DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest),為什么“PushConsumer”中使用“PullRequest”呢?這是通過“長輪詢”方式達到Push效果的方法,長輪詢方式既有Pull的優點,又兼具Push方式的實時性。
Push方式是Server端接收到消息后,主動把消息推送給Client端,實時性高。對于一個提供隊列服務的Server來說,用Push方式主動推送有很多弊端;首先是加大Server端的工作量,進而影響Server的性能,其次Client的處理能力各不相同,Client的狀態不受Server控制,如果Client不能及時處理Server推送過來的消息,會造成各種潛在問題。
Pull方式是Client端循環地從Server端拉取消息,主動權在Client手里,自己拉取到一定量消息后,處理妥當了再接著取。Pull方式的問題是循環拉取消息的間隔不好設定,間隔太短就處在一個“忙等”的狀態,浪費資源;每個Pull的時間間隔太長,Server端有消息到來有可能沒有被及時處理。
“長輪詢”方式是通過Client端和Server端的配合,既擁有Pull的優點,又能達到保證實時性的目的。我們結合源碼來分析:
代碼清單1- 3 發送Pull消息代碼片段
源碼中有這一行設置語句
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis),設置Broker最長阻塞時間,默認設置是15秒,注意是Broker在沒有新消息的時候才阻塞,有消息會立刻返回。
從Broker的源碼中可以看出,服務端接到新消息請求后,如果隊列里沒有新消息,并不急于返回,通過一個循環不斷查看狀態,每次 waitForRunning一段時候(默認是5秒),然后后再Check。默認情況下當Broker一直沒有新消息,第三次Check的時候,等待時間超過Request里面的 BrokerSuspendMaxTimeMillis,就返回空結果。在等待的過程中,Broker收到了新的消息后會直接調用notifyMessageArriving函數返回請求結果。“長輪詢”的核心是,Broker端HOLD住客戶端過來的請求一小段時間,在這個時間內有新消息到達,就利用現有的連接立刻返回消息給Consumer。“長輪詢”的主動權還是掌握在Consumer手中,Broker即使有大量消息積壓,也不會主動推送給Consumer。
長輪詢方式的局限性,是在HOLD住Consumer請求的時候需要占用資源,它適合用在消息隊列這種客戶端連接數可控的場景中。
1.1.3 DefaultMQPullConsumer
使用DefaultMQPullConsumer像使用DefaultMQPushConsumer一樣需要設置各種參數,寫處理消息的函數,同時還需要做額外的事情。接下來結合org.apache.rocketmq.example.simple包中的例子源碼來介紹。
示例代碼的處理邏輯是逐個讀取某Topic下所有Message Queue的內容,讀完一遍后退出,主要處理額外的三件事情:
(1) 獲取Message Queue并遍歷
一個Topic包括多個Message Queue,如果這個Consumer需要獲取Topic下所有的消息,就要遍歷多有的Message Queue。如果有特殊情況,也可以選擇某些特定的Message Queue來讀取消息。
(2) 維護Offsetstore
從一個Message Queue里拉取消息的時候,要傳入Offset參數(long類型的值),隨著不斷讀取消息,Offset會不斷增長。這個時候由用戶負責把Offset存儲下來,根據具體情況可以存到內存里、寫到磁盤或者數據庫里等。
(3) 根據不同的消息狀態做不同的處理
拉取消息的請求發出后,會返回:FOUND,NO_MATCHED_MSG,NO_NEW_MSG,OFFSET_ILLEGAL四種狀態,要根據每個狀態做不同的處理。比較重要的兩個狀態是FOUNT和NO_NEW_MSG,分別表示獲取到消息和沒有新的消息
實際情況中可以把while(true)放到外層,達到無限循環的目的。因為PullConsumer需要用戶自己處理遍歷Message Queue、保存Offset,所以PullConsumer有更多的自主性和靈活性。
----------------
責任編輯:
總結
以上是生活随笔為你收集整理的rocketmq 消息指定_详解RocketMQ不同类型的消费者的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: redis 用中文做key_推荐一款Re
- 下一篇: 图卷积网络进行骨骼识别代码_【骨骼行为识