一文理清RocketMQ顺序消费、重复消费、消息丢失问题
前言
在使用消息隊列時不可避免的會遇到順序消費、重復消費、消息丟失三個問題。在一次面試字節的時候,面試官問到如何保證順序消費,當時回答不太準確,特意此文回顧如何解決順序消費、重復消費、消息丟失三個問題。
重復消費
解決重復消費的關鍵在于消費方的冪等
冪等(idempotent、idempotence)是一個數學與計算機學概念,常見于抽象代數中。在編程中一個冪等操作的特點是其任意多次執行所產生的影響均與一次執行的影響相同。
冪等函數,或冪等方法,是指可以使用相同參數重復執行,并能獲得相同結果的函數。這些函數不會影響系統狀態,也不用擔心重復執行會對系統造成改變。
冪等操作根據場景的不同可以分為:
(1)強校驗
場景:如與金錢相關的支付等關鍵消息,必須強校驗。
基于數據庫的唯一鍵來保證重復數據不會被插入多條。建立一個已消費消息的表,每次消費之前檢查消費表中當前消費的消息是否已經存在,若存在表示消息已經被消費過直接返回。
(2)弱校驗
場景:可以有小概率出現重復消費的非關鍵消息
基于Redis的實現:
順序消費
RocketMQ提供兩種順序消息模式:
- 普通順序消息
普通順序消費模式下,消費者通過同一個消費隊列收到的消息是有順序的,不同消息隊列收到的消息則可能是無順序的。普通順序消息在 Broker 重啟情況下不會保證消息順序性 (短暫時間) 。 - 嚴格順序消息
嚴格順序消息模式下,對于指定的一個 Topic,所有消息按照嚴格的先入先出(FIFO)的順序進行發布和消費。嚴格順序消息 即使在異常情況下也會保證消息的順序性 。
嚴格順序雖然能更好的保證消息有序,但實現它可會付出巨大的代價。如果你使用嚴格順序模式,Broker 集群中只要有一臺機器不可用,則整個集群都不可用。
一般而言,我們的 MQ 都是能容忍短暫的亂序,所以推薦使用普通順序模式。
順序消費的實現
在MQ的模型中,順序需要由3個階段去保障:
消息被發送時保持順序
使用嚴格順序模式
嚴格順序消息模式下,對于指定的一個 Topic,所有消息按照嚴格的先入先出(FIFO)的順序進行發布和消費。因此只要保證消息同步發送(發完一條后再發下一條)即可保證消息發送時保持順序。
使用普通順序模式
普通順序模式下,只有同一個隊列的消息能保證有序。Producer 生產消息的時候會進行輪詢(根據設定的負載均衡策略)來向同一主題的不同消息隊列發送消息。那么如果此時有幾個消息分別是同一個訂單的創建、支付、發貨,在輪詢的策略下這 三個消息會被發送到不同隊列 ,因為在不同的隊列此時就無法使用 RocketMQ 帶來的隊列有序特性來保證消息有序性了。
因此使用普通順序時,在同步發送的基礎上,還需要將消息發送到相同的隊列。
在RocketMQ中,通過MessageQueueSelector來實現隊列的選擇。通過對訂單的唯一標識符取hash,將同一個訂單的消息發送到相同的隊列。
消息被消費時保持和存儲的順序一致
在分布式的情況下,即使消息隊列有序的將消息發送給消費者,也可能因為網絡等原因,導致消費者接收到的消息無序。如:按順序發送消息a、b給消費者。雖然a先發送,但因為網絡原因,消息a在網絡中滯留一段時間,導致消費者收到的消息順序為b、a。同時,若同一個隊列的消息由不同消費者消費也可能出現以上情況。
消費者順序消費消息的實現
基于以上分析,要保證消息順序的被消費者消費,必須滿足下列條件:
(1)同一個消費者消費
類似于通過訂單id的hash選擇相同的隊列,可以通過訂單的hash選擇同一個消費者同步消費(消費完一條后再拉取下一條,單線程消費),保證同一個訂單的順序消費
(2)通過 consumer 內部用內存隊列做排隊,然后分發給底層不同的 worker 實現(實現復雜)
若消費者是多線程,此時在消費者內部建立內存隊列。先將消息拉取到內存隊列后,在分發給不同的線程
消息丟失
消息的可靠性需要由3個階段去保障:
發送端消息可靠性
消息發送一般有以下幾種方式:同步發送、異步發送以及單向發送,業務具體選擇哪種方式進行消息發送,需要根據情況進行判斷,下面具體介紹不同的發送方式實現的消息可靠性保證。
(1)同步發送
同步發送是指發送端在發送消息時,阻塞線程進行等待,直到服務器返回發送的結果。發送端如果需要保證消息的可靠性,防止消息發送失敗,可以采用同步阻塞式的發送,然后同步檢查Brocker返回的狀態來判斷消息是否持久化成功。如果發送超時或者失敗,則會默認重試2次,RocketMQ選擇至少傳輸成功一次的消息模型,但是因為網絡傳輸是不可靠的,有可能發生重復投遞。
(2)異步發送
異步發送是指發送端在發送消息時,傳入回調接口實現類,調用該發送接口后不會阻塞,發送方法會立即返回,回調任務會在另一個線程中執行,消息發送結果會回傳給相應的回調函數。具體的業務實現可以根據發送的結果信息來判斷是否需要重試來保證消息的可靠性。
(3)單向發送
單向發送是指發送端發送完成之后,調用該發送接口后立刻返回,并不返回發送的結果,業務方無法根據發送的狀態來判斷消息是否發送成功,單向發送相對前兩種發送方式來說是一種不可靠的消息發送方式,因此要保證消息發送的可靠性,不推薦采用這種方式來發送消息。
存儲端消息可靠性
存儲端的可靠性依靠持久化策略、備份(主從復制)保證
RocketMQ刷盤機制
同步刷盤
消息寫入內存的 PageCache后,立刻通知刷盤線程刷盤,然后等待刷盤完成,刷盤線程執行完成后喚醒等待的線程,返回消息寫成功的狀態。這種方式可以保證數據絕對安全,但是吞吐量不大。
異步刷盤
消息寫入到內存的 PageCache中,就立刻給客戶端返回寫操作成功,當 PageCache中的消息積累到一定的量時,觸發一次寫操作,或者定時等策略將 PageCache中的消息寫入到磁盤中。這種方式吞吐量大,性能高,但是 PageCache中的數據可能丟失,不能保證數據絕對的安全。
消費端消息可靠性
(1)消費重試
消費者從RocketMQ拉取到消息之后,需要返回消費成功來表示業務方正常消費完成。因此只有返回CONSUME_SUCCESS才算消費完成,如果返回CONSUME_LATER則會按照不同的messageDelayLevel時間進行再次消費,時間分級從秒到小時,最長時間為2個小時后再次進行消費重試,如果消費滿16次之后還是未能消費成功,則不再重試,會將消息發送到死信隊列,從而保證消息存儲的可靠性。
(2)死信隊列
未能成功消費的消息,消息隊列并不會立刻將消息丟棄,而是將消息發送到死信隊列,其名稱是在原隊列名稱前加%DLQ%,如果消息最終進入了死信隊列,則可以通過RocketMQ提供的相關接口從死信隊列獲取到相應的消息,保證了消息消費的可靠性。
(3)消息回溯
回溯消費是指Consumer已經消費成功的消息,或者之前消費業務邏輯有問題,現在需要重新消費。要支持此功能,則Broker存儲端在向Consumer消費端投遞成功消息后,消息仍然需要保留。重新消費一般是按照時間維度,例如由于Consumer系統故障,恢復后需要重新消費1小時前的數據。RocketMQ Broker提供了一種機制,可以按照時間維度來回退消費進度,這樣就可以保證只要發送成功的消息,只要消息沒有過期,消息始終是可以消費到的。
參考
- 👉冒著期末掛科的風險也要給你看的消息隊列和RocketMQ入門總結(全面、易懂)
- 聊一聊順序消息(RocketMQ順序消息的實現機制)(分析透徹好文)
- rocketmq-常見問題總結(消息的順序、重復、消費模式)(分析透徹好文)
- 字節跳動面試官這樣問消息隊列:分布式事務、重復消費、順序消費,我整理了一下(比較全面)
- Rocket官網
- 面試官再問我如何保證 RocketMQ 不丟失消息,這回我笑了!
- RocketMQ如何保證消息的可靠性?
- 分布式消息隊列:如何保證消息的順序性
- 消息隊列之如何保證消息的順序性?
總結
以上是生活随笔為你收集整理的一文理清RocketMQ顺序消费、重复消费、消息丢失问题的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 有序数组中查找第一个比target大的数
- 下一篇: 怎么申请qq企业邮箱(qq企业邮箱怎么注