【RocketMQ】MQ消息发送总结
RocketMQ是通過DefaultMQProducer進行消息發送的,它實現了MQProducer接口,MQProducer接口中定義了消息發送的方法,方法主要分為三大類:
- send同步進行消息發送,向Broker發送消息之后等待響應結果;
- send異步進行消息發送,向Broker發送消息之后立刻返回,當消息發送成功/失敗之后觸發回調函數;
- sendOneway單向發送,也是異步消息發送,向Broker發送消息之后立刻返回,但是沒有回調函數;
public interface MQProducer extends MQAdmin {
    // 同步發送消息
    SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,
        InterruptedException;
    // 異步發送消息,SendCallback為回調函數
    void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
        RemotingException, InterruptedException;
    // 異步發送消息,沒有回調函數
    void sendOneway(final Message msg) throws MQClientException, RemotingException,
        InterruptedException;
}
接下來以同步發送為例,看下生產者發送消息的過程。
生產者發送消息
Topic主題
一般會為同一業務類型設定一個Topic,將不同的業務類型的數據放到不同的Topic中管理,不過主題只是一個邏輯概念,并不是實際的消息容器。
主題內部由多個消息隊列(MessageQueue)組成,消息隊列是消息存儲的實際容器,消息隊列與Kafka的分區(Partition)類似。
獲取主題的發布信息(TopicPublishInfo)
Broker在啟動時會向NameServer發送注冊信息并定時向NameServer發送心跳包,Broker向NameServer注冊信息中包括了該Broker的IP、Name以及Topic的配置信息,
生產者和消費者默認每30s從NameServer更新一次路由信息,可以知道消息所在的Topic分布在哪些Broker上。
生產者有一個主題路由信息表topicPublishInfoTable,緩存從NameServer拉取到的路由信息,它是ConcurrentMap類型的,KEY為topic主題名稱, value為該Topic的發布信息,是TopicPublishInfo類型。
當生產者向Broker發送消息之前,首先需要知道消息所屬的Topic的路由信息,有了Topic的路由信息才能知道Topic分布在哪個Broker上,生產者往哪個Broker上發,而topicPublishInfoTable中記錄了每個主題的相關信息,可以從topicPublishInfoTable中查找Topic的路由信息。
如果從topicPublishInfoTable中查找成功,就可以繼續后續的步驟,如果查找失敗,此時生產者需要從NameServer中查詢該Topic的路由信息:
- 如果查詢成功,會判斷路由信息是否發生了變化,如果發生變化,生產者會更新本地緩存的該Topic的路由信息;
- 如果依舊未查詢到,它會有一個默認的主題,會使用這個默認的主題進行消息發送;
選取消息隊列
前面知道,一個Topic一般由多個消息隊列組成,所以主題的發布信息數據TopicPublishInfo獲取到之后,需要從中選取一個消息隊列,然后獲取此消息隊列所屬的Broker,與Broker通信將消息投遞到對應的消息隊列中。
未啟用故障延遲機制
在每個Topic內部,設置了一個計數器sendWhichQueue用于輪詢從消息隊列集合中選取隊列。
在未啟用故障延遲機制的時候,如果上一次選擇的BrokerName為空,也就是首次發送消息時,處理邏輯如下:
- 對計數器增一;
- 根據計數器的值對消息隊列列表的長度取余得到下標值pos,從隊列列表中獲取pos位置的元素,以此達到輪詢從消息隊列列表中選擇消息隊列的目的;
- 返回第2步中獲取到的消息隊列;
- 在調用獲取消息隊列的地方,會記錄本次選擇消息隊列所在的BrokerName;
如果上一次選擇的BrokerName不為空,表示上次發送消息時就發送給了此Broker,此時的處理邏輯與上面的不同點在第3步,通過從隊列列表中獲取pos位置的元素之后,并沒有直接把選取到的消息隊列返回,而是再增加一個判斷,判斷當前選取到的Broker是否與上次選擇的Broker名稱一致,如果一致會繼續循環,輪詢選擇下一個消息隊列,如果不一致則直接返回:
- 對計數器增一;
- 根據計數器的值對消息隊列列表的長度取余得到下標值pos,從隊列列表中獲取pos位置的元素;
- 對第2步獲取到的消息隊列進行判斷:
- 如果本次選取到的隊列與上次發送消息的Broker一致,回到第1步繼續選擇下一個隊列,如果一直未選出滿足要求的消息隊列,則不作判斷,使用上面的方式輪詢選擇一個隊列返回;
- 如果本次選取到的隊列與上次發送消息的Broker不一致,返回當前的隊列;
 
- 在調用獲取消息隊列的地方,會記錄本次選擇消息隊列所在的BrokerName;
總結
在未啟用故障延遲機制時,從該消息所屬的Topic下的所有消息隊列集合中,輪詢選擇消息隊列進行發送,如果上一次選擇了某個Broker發送消息,本次將不會再選擇這個Broker,當然如果最后仍未找到滿足要求的消息隊列,將會跳過這個判斷,直接從隊列中輪詢獲取消息隊列返回。
開啟故障延遲機制
在生產者進行發送消息的時候,無論消息是否發送成功與否都會記錄向每個Broker的發送消息的條目信息FaultItem,有一個失敗條目表faultItemTable,faultItemTable記錄了每個Broker對應的失敗條目FaultItem,FaultItem中主要有以下信息:
- name:Broker的名稱;
- currentLatency:延遲時間,可以理解為是本次向該Broker發送消息耗時時間:發送消息結束時間 - 消息發送開始時間;
- startTimestamp:規避故障時間,一般為當前時間 + 不可用的持續時間,不可用的持續時間有兩種情況,分別為30000ms或者使用currentLatency延遲時間(也就是上次發送消息所用的時間),一般在出現異常的時候,會將不可用的持續時間設置為30000ms,消息正常發送的時候使用currentLatency延遲時間。
設置規避故障時間主要是為了在某個時間段內規避某個Broker,假設向某個Broker發送失敗/或者向此Broker發生消息的耗時比較長,生產者認為此Broker可能暫時處于異常狀態/或者該時間段內此Broker的性能不高,在下次發送消息時盡量規避這個Broker,避免向此Broker上投遞消息。
每次消息發送之后會更新該Broker的失敗條目的處理邏輯如下:
- 根據Broker名稱從faultItemTable獲取對應的FaultItem對象;
- 如果上一步獲取為空,說明之前沒有記錄過該Broker的信息,需要新建對應FaultItem對象,此時需要設置name、currentLatency延遲時間、startTimestamp規避故障時間;
- 如果第1步中獲取到該Broker對應的FaultItem對象,直接更新里面的currentLatency延遲時間、startTimestamp規避故障時間即可;
接下來看如何使用FaultItem中記錄的信息,來實現故障規避。
使用故障規避,需要啟用故障延遲機制,此時從隊列集合中選擇消息隊列的處理邏輯如下:
- 對計數器增一; 
- 根據計數器的值對消息隊列列表的長度取余得到下標值 - pos,從隊列列表中獲取- pos位置的元素,依舊輪詢從消息隊列列表中選擇消息隊列,這兩步與未開啟故障時邏輯一致;
- 選擇出消息隊列之后,會獲取該隊列所在的Broker名稱,上面說到,生產者每次與Broker通信發送消息時,會記錄消息發發送情況,此時可以根據Broker的名稱,從失敗條目表 - faultItemTable中獲取該Broker的- FaultItem,用來判斷當前選擇的消息隊列是否可用,- FaultItem中有一個規避故障時間,來看兩種情況:- 情況一:上次向此Broker發送消息失敗,那么這個時間的值為發送消息失敗時的時間 + 30000ms,判斷當前時間有沒有超過故障規避設置的時間,如果超過了當前選擇的消息隊列可用,那么就會返回當前選擇的這個消息隊列,如果未超過表示該Broker暫時不可用所以不能使用當前選擇的消息隊列,需要回到第1步繼續選擇下一個隊列;
- 情況二:上次向此Broker發送消息成功,那么這個時間的值為發送消息失敗時的時間 + 上次發送消息的耗時時間,判斷當前時間有沒有超過故障規避設置的時間,這個依賴于上次發送消息的耗時時間的長短,如果耗時比較長,可能還未超過規避時間,本次就不能選擇向此Broker發送消息同樣需要回到第1步選擇下一個隊列,如果耗時比較短,可能現在已經過了規避時間,那么就可以選擇當前的消息隊列返回;
 
- 情況一:上次向此Broker發送消息失敗,那么這個時間的值為
- 如果進行到這一步,以上步驟沒有選擇到可用的消息隊列,此時需要通過以下方式再次選擇消息隊列: 
 (1)遍歷- faultItemTable失敗條目表,將每一個Broker對應的FaultItem加入一個- LinkedList鏈表;
 (2)對鏈表進行排序,- FaultItem實現- Comparable就是為了在這里進行排序,值小的排在鏈表前面,值的大小判斷規則如下:- 對比是否有超過規避時間的Broker(調用isAvailable可以判斷),如果有表示值比較小,會排在前面,之后被優先選擇,如果所有的Broker都為超過規避時間,進入下一個對比條件;
- 對比currentLatency的值,值越小排序的時候越靠前,也就是盡量選擇發送消息耗時短的那個Broker,如果值相等進入下一個對比條件;
- 對比startTimestamp的值,同樣值越小排序的時候越靠前,盡量選擇規避時間較短的那個Broker;
 - (3)經過以上的規則進行排序后,會根據鏈表的總大小,計算一個中間值: - 如果half值小于等于0,取鏈表中的第一個元素;
- 如果half值大于0,從前half個元素中輪詢選擇元素;
 - (4)在鏈表中越靠前的元素,表示發送消息的延遲越低,在選擇時優先級就越高,如果half值小于等于0的時候,取鏈表中的第一個元素,half值大于0的時候,處于鏈表前half個的Broker,延遲都是相對較低的,此時輪詢從前haft個Broker中選擇一個Broker,總之經過這么多處理就是為了選擇一個延遲相對較低的Broker; - (5)獲取上一步選取到的那個Broker,獲取Broker可寫的隊列數量: - 如果數量小于0表示該Broker不可用,需要移除然后進入下一步;
- 如果數量大于0,表示該Broker可用,然后重新輪詢從消息隊列列表中選取一個隊列,將本次選取到的消息隊列所屬的Broker設置為第(4)步中選取到的那個Broker,也就是將這個消息隊列及Topic重置到新的Broker中(認為原本所屬的Broker不可用,需要設置一個新的Broker),然后返回當前選取的消息隊列;
 
- 對比是否有超過規避時間的Broker(調用
- 如果經過第4步依舊未選出可用的消息隊列,那么就跳過故障延遲機制,直接從該Topic的所有隊列中輪詢選擇一個返回; 
總結
故障延遲機制指的是在發送消息時記錄每個Broker的耗時時間,如果某個Broker發生故障,但是生產者還未感知(NameServer 30s檢測一次心跳,有可能Broker已經發生故障但未到檢測時間,所以會有一定的延遲),用耗時時間做為一個故障規避時間(也可以是30000ms),此時消息會發送失敗,在重試或者下次選擇消息隊列的時候,如果在規避時間內,可以避免再次選擇到此Broker,以此達到故障規避的目的。
如果某個Topic所在的所有Broker都處于不可用狀態,此時盡量選擇延遲時間最短、規避時間最短(排序后的失敗條目中靠前的元素)的Broker作為此次發送消息消息的Broker。
對應的相關源碼可參考:
參考
總結
以上是生活随笔為你收集整理的【RocketMQ】MQ消息发送总结的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: AT_agc064_a题解
- 下一篇: 一幅图像为f=[1 4 7;2 5 8;
