简单理解 Kafka 的消息可靠性策略
作者:hymanzhang,騰訊 IEG 運營開發工程師
背景
部門的開發同學最近在開發一個活動的過程中,需要關注大量的應用后臺邏輯,捕捉各種事件的觸發。在設計時打算采用 kafka 消息隊列進行業務邏輯的解耦,這樣活動開發和后臺開發同學的工作就分離開了。但是使用的同學不是很熟悉其原理,擔心以下幾個問題:
我什么業務場景下使用消息隊列
我發消息的時候,需要等 ack 嘛?
我發了消息之后,消費者一定會收到嘛?
申請騰訊云的 kafka 實例后,各種參數怎么設置呀?
遇到各種故障時,我的消息會不會丟?
消費者側會收到多條消息嘛?消費者 svr 重啟后消息會丟失嘛?
這些問題都很正常,在開始接觸和使用時總會有這樣或那樣的問題。一般情況下,不做了解,使用各種默認的推薦值,也是可以 work 的。但是我們要優雅的提升自己的姿(知)勢(識)。學習其背后的原理,至少在遇到一般的問題時,能夠分析和處理問題,做到心中有數。
什么時候使用消息隊列?
簡單來說,3 個關鍵詞, 異步/消峰/解耦,可以理解為:
我做完了后面的我不管了
工作太多了,先放一放我慢慢處理
怎么產生的我不管/怎么處理我不管
以下圖為例:
用戶提交評論中, 寫入數據庫后,存在需要捕捉評論事件的多個邏輯步驟。如果在接口處理過程中,順序的處理不同的步驟,非常繁瑣。我們可以批量的通知各個步驟(異步),無需返回直接處理當次的支付其他邏輯(解耦)。看起來就清爽多了,另外,消息隊列也可以作為緩存暫存發出的消息,不再需要考慮調用各個步驟時時延邏輯的異常場景。
本文以講解 kafka 中的可靠性設計為例,其它消息隊列的選型暫不涉及。
Kafka 基本概念
在回答文章前面的問題之前,需要簡單介紹一下各種概念。Kafka 從拓撲上分有如下角色:
Consumer: 消費者,一般以 API 形式存在于各個業務 svr 中
Producer: 生產者,一般以 API 形式存在于各個業務 svr 中
Kafka broker: kafka 集群中的服務器,topic 里的消息數據存在上面
Producer 采用發送 push 的方式將消息發到 broker 上,broker 存儲后。由 consumer 采用 pull 模式訂閱并消費消息。
如圖所示,Kafka 從存儲結構上,有如下角色:
Topic:kafka 處理的消息的邏輯大類集合,可以理解為表。寫入不同的 topic 即寫入不同的表。
Partition: Topic 下的物理分組,1 個 topic 可以分為多個 partition, 每個 partition 是一個有序的隊列(大文件)。Partition 中每一條消息都有一個有序的 offset。
Msg: 消息,通信的基本單位。每個 msg 在 topic 下的不同 partiton 僅有一份,在 partition 中有一個唯一的 offset 用于定位。
Replica: 副本,partition 的數據冗余備份,用于實現分布式的數據可靠性,但引入了不同副本間的數據一致性問題,帶來了一定的復雜度。
Leader/follower: replica 的角色,leader replica 用來提供該 partition 的讀寫服務。Follower 不停的從 leader 側同步寫入的消息。它們之間的消息狀態采用一致性策略來解決。
Kakfa 的存儲格式
為了方便后文更好的理解 broker 上的消息狀態一致性策略,需要簡單介紹一下消息的存儲格式。當 Producer 發送一條消息到 broker 中, 會根據分配 partition 規則選擇被存儲到哪一個 partition, 如果 partition 規則設置的合理,消息會均勻的分布到不同的 partition 里,這樣就實現了水平擴展。
Pruducer 可以認為 partition 是一個大的串行文件,msg 存儲時被分配一個唯一的 offset。Offset 是一個邏輯意義上的偏移,用于區分每一條消息。
而 partition 本身作為文件,可以有多個多個副本 replica(leader/follower)。多個 replica 分布在在不同的 broker 上。如果要回答如何在 broker 之間保證存儲的消息和狀態不會丟失,就要回答 broker 之間的各個 replica 的消息狀態一致性如何解決,包括 producer 已經提交了哪些消息,哪些消息已經落地,哪些消息在節點故障后不會丟失。
異步發送時的消息可靠性保證
回到文章開頭提到的幾個問題,在使用 kafka 消息隊列做異步發送時,如何保證消息的可靠性?如何回答開頭的幾個問題?這里要分為 3 個部分講解可靠性保證。
生產者的可靠性保證
回答生產者的可靠性保證,即回答:
發消息之后有么有 ack
發消息收到 ack 后,是不是消息就不會丟失了
而 Kafka 通過配置來指定 producer 生產者在發送消息時的 ack 策略:
如果想實現 kafka 配置為 CP(Consistency & Partition tolerance) 系統, 配置需要如下:
request.required.acks=-1 min.insync.replicas?=?${N/2?+?1} unclean.leader.election.enable?=?false 如圖所示,在 acks=-1 的情況下,新消息只有被 ISR 中的所有 follower(f1 和 f2, f3) 都從 leader 復制過去才會回 ack, ack 后,無論那種機器故障情況(全部或部分), 寫入的 msg4,都不會丟失, 消息狀態滿足一致性 C 要求。正常情況下,所有 follower 復制完成后,leader 回 producer ack。
異常情況下,如果當數據發送到 leader 后部分副本(f1 和 f2 同步), leader 掛了?此時任何 follower 都有可能變成新的 leader, producer 端會得到返回異常,producer 端會重新發送數據,但這樣數據可能會重復(但不會丟失), 暫不考慮數據重復的情況。
min.insync.replicas 參數用于保證當前集群中處于正常同步狀態的副本 follower 數量,當實際值小于配置值時,集群停止服務。如果配置為 N/2+1, 即多一半的數量,則在滿足此條件下,通過算法保證強一致性。當不滿足配置數時,犧牲可用性即停服。
異常情況下,leader 掛掉,此時需要重新從 follower 選舉 leader。可以為 f2 或者 f3。
如果選舉 f3 為新 leader, 則可能會發生消息截斷,因為 f3 還未同步 msg4 的數據。Kafka 的通 unclean.leader.election.enable 來控制在這種情況下,是否可以選舉 f3 為 leader。舊版本中默認為 true,在某個版本下已默認為 false,避免這種情況下消息截斷的出現。
通過 ack 和 min.insync.replicas 和 unclean.leader.election.enable 的配合,保證在 kafka 配置為 CP 系統時,要么不工作,要么得到 ack 后,消息不會丟失且消息狀態一致。
min.insync.replicas 參數默認值為 1,即滿足高可用性,只要有 1 臺能工作即可。但此時可工作的 broker 狀態不一定正確(可以想象為啥)
如果想實現 kafka 配置為 AP(Availability & Partition tolerance)系統:
request.required.acks=1 min.insync.replicas?=?1 unclean.leader.election.enable?=?false當配置為 acks=1 時,即 leader 接收消息后回 ack,這時會出現消息丟失的問題:如果 leader 接受到了 第 4 條消息,此時還沒有同步到 follower 中,leader 機器掛了,其中一個 follower 被選為 leader, 則 第 4 條消息丟失了。當然這個也需要 unclean.leader.election.enable 參數配置為 false 來配合。但是 leader 回 ack 的情況下,follower 未同步的概率會大大提升。
通過 producer 策略的配置和 kafka 集群通用參數的配置,可以針對自己的業務系統特點來進行合理的參數配置,在通訊性能和消息可靠性下尋得某種平衡。
Broker 的可靠性保證
消息通過 producer 發送到 broker 之后,還會遇到很多問題:
Partition leader 寫入成功, follower 什么時候同步?
Leader 寫入成功,消費者什么時候能讀到這條消息?
Leader 寫入成功后,leader 重啟,重啟后消息狀態還正常嘛?
Leader 重啟,如何選舉新的 leader?
這些問題集中在, 消息落到 broker 后,集群通過何種機制來保證不同副本建的消息狀態一致性。
Kafka 消息備份和同步
Kafka 通過分區的多副本策略來解決消息的備份問題。通過 HW 和 LEO 的標識,來對應 ISR 和 OSR 的概念,用于類比共識性算法解決數據同步一致性的問題。
分區多副本即前文提到的 Partition 的 replica(副本) 分布在跟 partition 不相同的機器上, 通過數據冗余保證故障自動轉移。而不同副本的狀態形成了 ISR 和 OSR 的概念。
ISR : leader 副本保持一定同步的 follower 副本, 包括 leader 副本自己,叫 In Sync Replica
AR: 所有副本 (replicas) 統稱為 assigned replicas, 即 AR
OSR: follower 同 leader 同步數據有一些延遲的節點
ISR 是 kafka 的同步策略中獨有的概念,區別于 raft 等共識性算法。Raft 要求集群中要求 N/2+1 臺正常,其在這種條件下通過復雜的算法保證選舉出的新 leader 符合一致性狀態。而 kafka 的 ISR 同步策略,通過 ISR 列表的可伸縮性和 HW&LEO 更新,一定程度上解決了消息一致性和吞吐性能之間的平衡。
ISR 通過 HW 和 LEO 的概念表示消息的同步狀態:
HW: Highwater, 俗稱高水位,它表示了一個特定的消息偏移量(offset), 在一個 parttion 中 consumer 只能拉取這個 offset 之前的消息(此 offset 跟 consumer offset 不是一個概念) ;
LEO: LogEndOffset, 日志末端偏移量, 用來表示當前日志文件中下一條寫入消息的 offset;
leader HW: 該 Partititon 所有副本的 LEO 最小值;
follower HW: min(follower 自身 LEO 和 leader HW);
Leader HW = 所有副本 LEO 最小值;
Follower HW = min(follower 自身 LEO 和 leader HW)。
Leader 不僅保存了自己的 HW &LEO 還保存了遠端副本的 HW &LEO
簡單來說,每個副本都有 HW 和 LEO 的存儲,而 leader 不但保存自己的 HW 和 LEO, 還保存了每個遠端副本的 LEO。用于在自身的 HW 更新時計算值。可以看出由于 LEO 遠端存儲的特性,其實會導致副本真實的 LEO 和 leader 存儲的 LEO 有短暫的數值差異,者會帶來一些問題,后面再講。
HW 和 LEO 的更新策略如下:
一次完整的寫請求的 HW / LEO 更新流程:
1. 初始狀態
Leader 所有的 HW&LEO 都為 0, follower 與 leader 建立連接,follower fetch leader, follower 所有 HW&LEO 都為 0
2. Follower 第一次 fetch:
Producer 發來一條消息到 leader, 此時 leader 的 LEO=1, follower 帶著自己的 HW&LEO(都為 0) 開始 fetch, leader 的 HW=min(all follower LEO)=0, leader 記錄 follower 的 LEO=0;follower 拉取到一條消息,帶著消息和 leader 的 HW(0)&LEO(1)返回自身更新自己的 LEO=1, 更新自己的 HW=min(follower 自身 LEO(1) 和 leader HW(0))=0
3. Follower 第二次 fetch:
Follower 帶著自己的 HW(0)&LEO(1) 去請求 leader .此時 leader 的 HW 更新為 1,leader 保存的 follower 的 LEO 更新為 1,帶著 leader 的 HW(1)&LEO(1)返回自身,更新自己的 HW&LEO
此時回到剛才提到的問題,這種 HW 和 LEO 更新策略有個很明顯的問題,即 follower 的 HW 更新需要 follower 的 2 輪 fetch 中的 leader 返回才能更新, 而 Leader 的 HW 已更新。這之間,如果 follower 和 leader 的節點發生故障,則 follower 的 HW 和 leader 的 HW 會處于不一致狀態,帶來比較多的一致性問題。比如如下場景:
Leader 更新完分區 HW 后,follower HW 還未更新,此時 follower 重啟
Follower 重啟后,LEO 設置為之前的 follower HW 值(0), 此時發生消息截斷(臨時狀態)
Follower 重新同步 leader, 此時 leader 宕機,則不選舉則不可用
Follower 被選舉為 leader, 則 msg 1 永久丟失了
在 kafka 配置為 AP 系統的情況下,由于 min.insync.replicas 為 1, 這種重啟后 follower 發生截斷發生的概率會大大提升, 而在多個副本存在的情況下,情況可能還會更加糟糕。而 kafka 新版本為了解決這個 HW&LEO 的同步機制更新缺陷,引入了 Epoch 的概念。
Leader epoch 分兩部分組成:
Epoch : 版本號。每當副本領導權發生變更時,都會增加該版本號。小版本號的 Leader 被認為是過期 Leader,不能再行使 Leader 權力。
起始位移(Start Offset)。Leader 副本在該 Epoch 值上寫入的首條消息的位移。
Leader epoch(1, 120) 說明這個 leader 的版本號為 1,版本的起始位置是 第 120 條消息開始的。Kafka Broker 會在內存中為每個分區都緩存 Leader Epoch 數據,同時它還會定期地將這些信息持久化到一個 checkpoint 文件中。當 Leader 副本寫入消息到磁盤時,Broker 會嘗試更新這部分緩存。如果該 Leader 是首次寫入消息,那么 Broker 會向緩存中增加一個 Leader Epoch 條目,否則就不做更新。這樣,每次有 Leader 變更時,新的 Leader 副本會查詢這部分緩存,取出對應的 Leader Epoch 的起始位移,以避免數據丟失和不一致的情況。
示圖如下:
Kafka 通過 ISR 的同步機制及優化策略,用 HW & LEO 的方式很好的確保了數據不丟失以及吞吐率。而 ISR 的管理最終都會反饋到 Zookeeper 上,其實現和 leader 的選舉策略不再贅述。
Consumer 的可靠性策略
Consumer 的可靠性策略集中在 consumer 的投遞語義上,即:
何時消費,消費到什么?
按消費是否會丟?
消費是否會重復?
這些語義場景,可以通過 kafka 消費者的而部分參數進行配置,簡單來說有以下 3 中場景:
1. AutoCommit(at most once, commit 后掛,實際會丟)
enable.auto.commit = true
auto.commit.interval.ms
配置如上的 consumer 收到消息就返回正確給 brocker, 但是如果業務邏輯沒有走完中斷了,實際上這個消息沒有消費成功。這種場景適用于可靠性要求不高的業務。其中 auto.commit.interval.ms 代表了自動提交的間隔。比如設置為 1s 提交 1 次,那么在 1s 內的故障重啟,會從當前消費 offset 進行重新消費時,1s 內未提交但是已經消費的 msg, 會被重新消費到。
2. 手動 Commit(at least once, commit 前掛,就會重復, 重啟還會丟)
enable.auto.commit = false
配置為手動提交的場景下,業務開發者需要在消費消息到消息業務邏輯處理整個流程完成后進行手動提交。如果在流程未處理結束時發生重啟,則之前消費到未提交的消息會重新消費到,即消息顯然會投遞多次。此處應用與業務邏輯明顯實現了冪等的場景下使用。
特別應關注到在 golang 中 sarama 庫的幾個參數的配置:
sarama.offset.initial?(oldest,?newest) offsets.retention.minutesintitial = oldest 代表消費可以訪問到的 topic 里的最早的消息,大于 commit 的位置,但是小于 HW。同時也受到 broker 上消息保留時間的影響和位移保留時間的影響。不能保證一定能消費到 topic 起始位置的消息。
如果設置為 newest 則代表訪問 commit 位置的下一條消息。如果發生 consumer 重啟且 autocommit 沒有設置為 false, 則之前的消息會發生丟失,再也消費不到了。在業務環境特別不穩定或非持久化 consumer 實例的場景下,應特別注意。
一般情況下, offsets.retention.minutes 為 1440s。
3. Exactly once, 很難,需要 msg 持久化和 commit 是原子的
消息投遞且僅投遞一次的語義是很難實現的。首先要消費消息并且提交保證不會重復投遞,其次提交前要完成整體的業務邏輯關于消息的處理。在 kafka 本身沒有提供此場景語義接口的情況下,這幾乎是不可能有效實現的。一般的解決方案,也是進行原子性的消息存儲,業務邏輯異步慢慢的從存儲中取出消息進行處理。
總結
以上是生活随笔為你收集整理的简单理解 Kafka 的消息可靠性策略的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 用万字长文聊一聊 Embedding 技
- 下一篇: 巧用 Protobuf 反射来优化代码,