你的消息队列如何保证消息不丢失,且只被消费一次,这篇就教会你
我們將消息隊列這個組件加入到了我們的商城系統里,并且通過秒殺這個實際的案例進行了實際演練,知道了它對高并發寫流量做削峰填谷,對非關鍵業務邏輯做異步處理,對不同的業務系統做解耦合。
場景:
現在我們的電商系統中上了一個新產品發紅包的功能,即當用戶在我們商城消費了一定的額度之后,我們系統就給用戶發送一個現金紅包,用來答謝用戶并且促進用戶消費。
前面我們說到過,由于這個發紅包的動作并不屬于當前下單的主流程,所以我們就使用消息隊列來異步處理。這個時候,就會有個隱藏問題:
我們在投遞消息的過程中消息可能會丟失,那我們的用戶就來打客服電話投訴我們說沒有得到紅包,甚至于有關部門投訴我們。
另一個問題,就是如果我們將消息重復發送了,那么用戶就會得到兩個紅包,這樣會造成我們公司的損失。
所以,現在我們要確保,系統生產的消息一定要被消費到,并且只能被消費一次,這個到底該怎么做呢?接下來,我們就來深入學習下。
01 為何消息會丟失?
要想保證消息只被消費一次,那么首先就得要保證消息不丟失。我們先來看看,消息從被寫入消息隊列,到被消費完成,這整個鏈路上會有哪些地方可能會導致消息丟失?我們不難看出,其實主要有三個地方:
消息從生產者到消息隊列的過程。
消息在消息隊列存儲的過程。
消息在被消費的過程。
如上,我們分析了共有 3 個消息可能丟失的地方,接下來,我們就具體來分析下每一種情況。
1. 消息在寫到消息隊列的過程中丟失
消息生產者一般就是業務系統,消息隊列是單獨部署了在獨立的服務器上的,所以業務服務器和消息隊列服務器可能會出現網絡抖動,當出現了網絡抖動,消息就會丟失。
一般這種情況,我們可以采用消息重傳的方案,即當我們發現發送的消息超時后,我們就重新發送一次,但是不能一直無限制的重傳消息。按照經驗來說,如果不是消息隊列本身故障,或者是網絡斷開了,一般重試個 2 到 3 次就行了。
但是,這種方案就有可能造成消息的重復,這樣就會導致消費者消費到重復的消息。
例如,消息發送到消息隊列中,但是由于消息隊列處理消息較慢或者網絡抖動,這個時候,其實消息是寫入成功的,但是對于生產端就認為超時了,那么生產者就會重傳當前消息,則會出現消息重復。對于我們上面案例中,就是用戶會收到兩個紅包。
2. 消息在消息隊列中丟失
即使消息發送到了消息隊列,消息也不會萬無一失,還是會面臨丟失的風險。
我們以 Kafka 為例,消息在Kafka 中是存儲在本地磁盤上的, 為了減少消息存儲對磁盤的隨機 I/O,一般我們會將消息寫入到操作系統的 Page Cache 中,然后在合適的時間將消息刷新到磁盤上。
例如,Kafka 可以配置當達到某一時間間隔,或者累積一定的消息數量的時候再刷盤,也就是所謂的異步刷盤。
不過,如果發生機器掉電或者機器異常重啟,那么 Page Cache 中還沒有來得及刷盤的消息就會丟失了。那么怎么解決呢?你可能會把刷盤的間隔設置很短,或者設置累積一條消息就就刷盤。
但這樣頻繁刷盤會對性能有比較大的影響,而且從經驗來看,出現機器宕機或者掉電的幾率也不高,所以我不建議你這樣做。
如果你的電商系統對消息丟失的容忍度很低,那么你可以考慮以集群方式部署 Kafka 服務,通過部署多個副本備份數據,保證消息盡量不丟失。
那么它是怎么實現的呢?Kafka 集群中有一個 Leader 負責消息的寫入和消費,可以有多個 Follower 負責數據的備份。Follower 中有一個特殊的集合叫做 ISR(in-sync replicas),當 Leader 故障時,新選舉出來的 Leader 會從 ISR 中選擇,默認 Leader 的數據會異步地復制給 Follower,這樣在 Leader 發生掉電或者宕機時,Kafka 會從 Follower 中消費消息,減少消息丟失的可能。
由于默認消息是異步地從 Leader 復制到 Follower 的,所以一旦 Leader 宕機,那些還沒有來得及復制到 Follower 的消息還是會丟失。
為了解決這個問題,Kafka 為生產者提供一個選項叫做“acks”,當這個選項被設置為“all”時,生產者發送的每一條消息除了發給 Leader 外還會發給所有的 ISR,并且必須得到 Leader 和所有 ISR 的確認后才被認為發送成功。這樣,只有 Leader 和所有的 ISR 都掛了,消息才會丟失。
從上面這張圖來看,當設置“acks=all”時,需要同步執行 1,3,4 三個步驟,對于消息生產的性能來說也是有比較大的影響的,所以你在實際應用中需要仔細地權衡考量。這里建議是:
如果你需要確保消息一條都不能丟失,那么建議不要開啟消息隊列的同步刷盤,而是需要使用集群的方式來解決,可以配置當所有 ISR Follower 都接收到消息才返回成功。
如果對消息的丟失有一定的容忍度,那么建議不部署集群,即使以集群方式部署,也建議配置只發送給一個 Follower 就可以返回成功了。
我們的業務系統一般對于消息的丟失有一定的容忍度,比如說以上面的紅包系統為例,如果紅包消息丟失了,我們只要后續給沒有發送紅包的用戶補發紅包就好了。
3. 在消費的過程中存在消息丟失的可能
還是以 Kafka 為例來說明。一個消費者消費消息的進度是記錄在消息隊列集群中的,而消費的過程分為三步:接收消息、處理消息、更新消費進度。
這里面接收消息和處理消息的過程都可能會發生異?;蛘呤?#xff0c;比如說,消息接收時網絡發生抖動,導致消息并沒有被正確的接收到;處理消息時可能發生一些業務的異常導致處理流程未執行完成,這時如果更新消費進度,那么這條失敗的消息就永遠不會被處理了,也可以認為是丟失了。
所以,在這里你需要注意的是,一定要等到消息接收和處理完成后才能更新消費進度,但是這也會造成消息重復的問題,比方說某一條消息在處理之后,消費者恰好宕機了,那么因為沒有更新消費進度,所以當這個消費者重啟之后,還會重復地消費這條消息。
02 如何保證消息只被消費一次
從上面的分析中,你能發現,為了避免消息丟失,我們需要付出兩方面的代價:一方面是性能的損耗;一方面可能造成消息重復消費。
性能的損耗我們還可以接受,因為一般業務系統只有在寫請求時才會有發送消息隊列的操作,而一般系統的寫請求的量級并不高,但是消息一旦被重復消費,就會造成業務邏輯處理的錯誤。那么我們要如何避免消息的重復呢?
想要完全的避免消息重復的發生是很難做到的,因為網絡的抖動、機器的宕機和處理的異常都是比較難以避免的,在工業上并沒有成熟的方法,因此我們會把要求放寬,只要保證即使消費到了重復的消息,從消費的最終結果來看和只消費一次是等同的就好了,也就是保證在消息的生產和消費的過程是“冪等”的。
1. 什么是冪等
冪等是一個數學上的概念,它的含義是多次執行同一個操作和執行一次操作,最終得到的結果是相同的,說起來可能有些抽象,我給你舉個例子:
比如,男生和女生吵架,女生抓住一個點不放,傳遞“你不在乎我了嗎?”(生產消息)的信息。那么當多次埋怨“你不在乎我了嗎?”的時候(多次生產相同消息),她不知道的是,男生的耳朵(消息處理)會自動把 N 多次的信息屏蔽,就像只聽到一次一樣,這就是冪等性。
如果我們消費一條消息的時候,要給現有的庫存數量減 1,那么如果消費兩條相同的消息就會給庫存數量減 2,這就不是冪等的。而如果消費一條消息后,處理邏輯是將庫存的數量設置為 0,或者是如果當前庫存數量是 10 時則減 1,這樣在消費多條消息時,所得到的結果就是相同的,這就是冪等的。
說白了,你可以這么理解“冪等”:一件事兒無論做多少次都和做一次產生的結果是一樣的,那么這件事兒就具有冪等性。
2. 在生產、消費過程中增加消息冪等性的保證
消息在生產和消費的過程中都可能會產生重復,所以你要做的是,在生產過程和消費過程中增加消息冪等性的保證,這樣就可以認為從“最終結果上來看”,消息實際上是只被消費了一次的。
在消息生產過程中,在 Kafka0.11 版本和 Pulsar 中都支持“producer idempotency”的特性,翻譯過來就是生產過程的冪等性,這種特性保證消息雖然可能在生產端產生重復,但是最終在消息隊列存儲時只會存儲一份。
它的做法是給每一個生產者一個唯一的 ID,并且為生產的每一條消息賦予一個唯一 ID,消息隊列的服務端會存儲 < 生產者 ID,最后一條消息 ID> 的映射。當某一個生產者產生新的消息時,消息隊列服務端會比對消息 ID 是否與存儲的最后一條 ID 一致,如果一致,就認為是重復的消息,服務端會自動丟棄。
而在消費端,冪等性的保證會稍微復雜一些,你可以從通用層和業務層兩個層面來考慮。
你可以看到,無論是生產端的冪等性保證方式,還是消費端通用的冪等性保證方式,它們的共同特點都是為每一個消息生成一個唯一的 ID,然后在使用這個消息的時候,先比對這個 ID 是否已經存在,如果存在,則認為消息已經被使用過。
所以這種方式是一種標準的實現冪等的方式,你在項目之中可以拿來直接使用,它在邏輯上的偽代碼就像下面這樣:
boolean isIDExisted = selectByID(ID); // 判斷ID是否存在 if(isIDExisted) { return; //存在則直接返回 } else { process(message); //不存在,則處理消息 saveID(ID); //存儲ID }不過這樣會有一個問題:如果消息在處理之后,還沒有來得及寫入數據庫,消費者宕機了重啟之后發現數據庫中并沒有這條消息,還是會重復執行兩次消費邏輯。
這時你就需要引入事務機制,保證消息處理和寫入數據庫必須同時成功或者同時失敗,但是這樣消息處理的成本就更高了,所以,如果對于消息重復沒有特別嚴格的要求,可以直接使用這種通用的方案,而不考慮引入事務。
在業務層面怎么處理呢?這里有很多種處理方式,其中有一種是增加樂觀鎖的方式。比如,你的消息處理程序需要給一個人的賬號加錢,那么你可以通過樂觀鎖的方式來解決。
具體的操作方式是這樣的:你給每個人的賬號數據中增加一個版本號的字段,在生產消息時先查詢這個賬戶的版本號,并且將版本號連同消息一起發送給消息隊列。消費端在拿到消息和版本號后,在執行更新賬戶金額 SQL 的時候帶上版本號,類似于執行:
update user set amount = amount + 20, version=version+1 where userId=1 and version=1;你看,我們在更新數據時給數據加了樂觀鎖,這樣在消費第一條消息時,version 值為 1,SQL 可以執行成功,并且同時把 version 值改為了 2;在執行第二條相同的消息時,由于 version 值不再是 1,所以這條 SQL 不能執行成功,也就保證了消息的冪等性。
總結,今天我們主要學習了在消息隊列中,消息可能會發生丟失的場景,和我們的應對方法,以及在消息重復的場景下,我們要如何保證,盡量不影響消息最終的處理結果。
有道無術,術可成;有術無道,止于術
歡迎大家關注Java之道公眾號
好文章,我在看??
總結
以上是生活随笔為你收集整理的你的消息队列如何保证消息不丢失,且只被消费一次,这篇就教会你的全部內容,希望文章能夠幫你解決所遇到的問題。