震惊!原来这才是Kafka的“真面目”!
出處:https://www.jianshu.com/p/d3e963ff8b70
Kafka 是一個分布式消息隊列,具有高性能、持久化、多副本備份、橫向擴展能力。生產者往隊列里寫消息,消費者從隊列里取消息進行業務邏輯。一般在架構設計中起到解耦、削峰、異步處理的作用。
Kafka 對外使用 Topic 的概念,生產者往 Topic 里寫消息,消費者從中讀消息。
為了做到水平擴展,一個 Topic 實際是由多個 Partition 組成的,遇到瓶頸時,可以通過增加 Partition 的數量來進行橫向擴容。單個 Parition 內是保證消息有序。
每新寫一條消息,Kafka 就是在對應的文件 append 寫,所以性能非常高。
Kafka 的總體數據流是這樣的:
大概用法就是,Producers 往 Brokers 里面的指定 Topic 中寫消息,Consumers 從 Brokers 里面拉取指定 Topic 的消息,然后進行業務處理。
圖中有兩個 Topic,Topic0 有兩個 Partition,Topic1 有一個 Partition,三副本備份。
可以看到 Consumer Gourp1 中的 Consumer2 沒有分到 Partition 處理,這是有可能出現的,下面會講到。
關于 Broker、Topics、Partitions 的一些元信息用 ZK 來存,監控和路由啥的也都會用到 ZK。
生產
基本流程是這樣的:
創建一條記錄,記錄中一個要指定對應的 Topic 和 Value,Key 和 Partition 可選。?
先序列化,然后按照 Topic 和 Partition,放進對應的發送隊列中。Kafka Produce 都是批量請求,會積攢一批,然后一起發送,不是調 send() 就立刻進行網絡發包。
如果 Partition 沒填,那么情況會是這樣的:
-
Key 有填。按照 Key 進行哈希,相同 Key 去一個 Partition。(如果擴展了 Partition 的數量那么就不能保證了)
-
Key 沒填。Round-Robin 來選 Partition。
這些要發往同一個 Partition 的請求按照配置,攢一波,然后由一個單獨的線程一次性發過去。
API
有 High Level API,替我們把很多事情都干了,Offset,路由啥都替我們干了,用起來很簡單。
還有 Simple API,Offset 啥的都是要我們自己記錄。(注:消息消費的時候,首先要知道去哪消費,這就是路由,消費完之后,要記錄消費單哪,就是 Offset)
Partition
當存在多副本的情況下,會盡量把多個副本,分配到不同的 Broker 上。
Kafka 會為 Partition 選出一個 Leader,之后所有該 Partition 的請求,實際操作的都是 Leader,然后再同步到其他的 Follower。
當一個 Broker 歇菜后,所有 Leader 在該 Broker 上的 Partition 都會重新選舉,選出一個 Leader。(這里不像分布式文件存儲系統那樣會自動進行復制保持副本數)
然后這里就涉及兩個細節:
-
怎么分配 Partition
-
怎么選 Leader
關于 Partition 的分配,還有 Leader 的選舉,總得有個執行者。在 Kafka 中,這個執行者就叫 Controller。
Kafka 使用 ZK 在 Broker 中選出一個 Controller,用于 Partition 分配和 Leader 選舉。
Partition 的分配:
-
將所有 Broker(假設共 n 個 Broker)和待分配的 Partition 排序。
-
將第 i 個 Partition 分配到第(i mod n)個 Broker 上 (這個就是 Leader)。
-
將第 i 個 Partition 的第 j 個 Replica 分配到第((i + j) mode n)個 Broker 上。
Leader 容災
Controller 會在 ZK 的 /brokers/ids 節點上注冊 Watch,一旦有 Broker 宕機,它就能知道。
當 Broker 宕機后,Controller 就會給受到影響的 Partition 選出新 Leader。
Controller 從?ZK?的 /brokers/topics/[topic]/partitions/[partition]/state 中,讀取對應 Partition 的 ISR(in-sync replica 已同步的副本)列表,選一個出來做 Leader。
選出 Leader 后,更新 ZK,然后發送 LeaderAndISRRequest 給受影響的 Broker,讓它們知道改變這事。
為什么這里不是使用 ZK 通知,而是直接給 Broker 發送 RPC 請求,我的理解可能是這樣做 ZK 有性能問題吧。
如果 ISR 列表是空,那么會根據配置,隨便選一個 Replica 做 Leader,或者干脆這個 Partition 就是歇菜。
如果 ISR 列表的有機器,但是也歇菜了,那么還可以等 ISR 的機器活過來。
多副本同步
這里的策略,服務端這邊的處理是 Follower 從 Leader 批量拉取數據來同步。但是具體的可靠性,是由生產者來決定的。
生產者生產消息的時候,通過 request.required.acks 參數來設置數據的可靠性。
在 Acks=-1 的時候,如果 ISR 少于 min.insync.replicas 指定的數目,那么就會返回不可用。
這里 ISR 列表中的機器是會變化的,根據配置 replica.lag.time.max.ms,多久沒同步,就會從 ISR 列表中剔除。
以前還有根據落后多少條消息就踢出 ISR,在 1.0 版本后就去掉了,因為這個值很難取,在高峰的時候很容易出現節點不斷的進出 ISR 列表。
從 ISA 中選出 Leader 后,Follower 會把自己日志中上一個高水位后面的記錄去掉,然后去和 Leader 拿新的數據。
因為新的 Leader 選出來后,Follower 上面的數據,可能比新 Leader 多,所以要截取。
這里高水位的意思,對于 Partition 和 Leader,就是所有 ISR 中都有的最新一條記錄。消費者最多只能讀到高水位。
從 Leader 的角度來說高水位的更新會延遲一輪,例如寫入了一條新消息,ISR 中的 Broker 都 Fetch 到了,但是 ISR 中的 Broker 只有在下一輪的 Fetch 中才能告訴 Leader。
也正是由于這個高水位延遲一輪,在一些情況下,Kafka 會出現丟數據和主備數據不一致的情況,0.11 開始,使用 Leader Epoch 來代替高水位。
思考:當 Acks=-1 時
-
是 Follwers 都來 Fetch 就返回成功,還是等 Follwers 第二輪 Fetch?
-
Leader 已經寫入本地,但是 ISR 中有些機器失敗,那么怎么處理呢?
消費
訂閱 Topic 是以一個消費組來訂閱的,一個消費組里面可以有多個消費者。同一個消費組中的兩個消費者,不會同時消費一個 Partition。
換句話來說,就是一個 Partition,只能被消費組里的一個消費者消費,但是可以同時被多個消費組消費。
因此,如果消費組內的消費者如果比 Partition 多的話,那么就會有個別消費者一直空閑。
API
?
訂閱 Topic 時,可以用正則表達式,如果有新 Topic 匹配上,那能自動訂閱上。
Offset 的保存
一個消費組消費 Partition,需要保存 Offset 記錄消費到哪,以前保存在 ZK 中,由于 ZK 的寫性能不好,以前的解決方法都是 Consumer 每隔一分鐘上報一次。
這里 ZK 的性能嚴重影響了消費的速度,而且很容易出現重復消費。在 0.10 版本后,Kafka 把這個 Offset 的保存,從 ZK 總剝離,保存在一個名叫 consumeroffsets topic 的 Topic 中。
寫進消息的 Key 由 Groupid、Topic、Partition 組成,Value 是偏移量 Offset。Topic 配置的清理策略是 Compact。總是保留最新的 Key,其余刪掉。
一般情況下,每個 Key 的 Offset 都是緩存在內存中,查詢的時候不用遍歷 Partition,如果沒有緩存,第一次就會遍歷 Partition 建立緩存,然后查詢返回。
確定 Consumer Group 位移信息寫入 consumers_offsets 的哪個 Partition,具體計算公式:
__consumers_offsets?partition?=Math.abs(groupId.hashCode()?%?groupMetadataTopicPartitionCount)??? //groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默認是50個分區。思考:如果正在跑的服務,修改了 offsets.topic.num.partitions,那么 Offset 的保存是不是就亂套了?
分配 Partition—Reblance
生產過程中 Broker 要分配 Partition,消費過程這里,也要分配 Partition 給消費者。
類似 Broker 中選了一個 Controller 出來,消費也要從 Broker 中選一個 Coordinator,用于分配 Partition。
下面從頂向下,分別闡述一下:
-
怎么選 Coordinator
-
交互流程
-
Reblance 的流程
①選 Coordinator:看 Offset 保存在那個 Partition;該 Partition Leader 所在的 Broker 就是被選定的 Coordinator。
這里我們可以看到,Consumer Group 的 Coordinator,和保存 Consumer Group Offset 的 Partition Leader 是同一臺機器。
②交互流程:把 Coordinator 選出來之后,就是要分配了。
整個流程是這樣的:
-
Consumer 啟動、或者 Coordinator 宕機了,Consumer 會任意請求一個 Broker,發送 ConsumerMetadataRequest 請求。
Broker 會按照上面說的方法,選出這個 Consumer 對應 Coordinator 的地址。
-
Consumer 發送 Heartbeat 請求給 Coordinator,返回 IllegalGeneration 的話,就說明 Consumer 的信息是舊的了,需要重新加入進來,進行 Reblance。
返回成功,那么 Consumer 就從上次分配的 Partition 中繼續執行。
③Reblance 流程:
-
Consumer 給 Coordinator 發送 JoinGroupRequest 請求。
-
這時其他 Consumer 發 Heartbeat 請求過來時,Coordinator 會告訴他們,要 Reblance 了。
-
其他 Consumer 發送 JoinGroupRequest 請求。
-
所有記錄在冊的 Consumer 都發了 JoinGroupRequest 請求之后,Coordinator 就會在這里 Consumer 中隨便選一個 Leader。
然后回 JoinGroupRespone,這會告訴 Consumer 你是 Follower 還是 Leader,對于 Leader,還會把 Follower 的信息帶給它,讓它根據這些信息去分配 Partition。
-
Consumer 向 Coordinator 發送 SyncGroupRequest,其中 Leader 的 SyncGroupRequest 會包含分配的情況。
-
Coordinator 回包,把分配的情況告訴 Consumer,包括 Leader。
當 Partition 或者消費者的數量發生變化時,都得進行 Reblance。
列舉一下會 Reblance 的情況:
-
增加 Partition
-
增加消費者
-
消費者主動關閉
-
消費者宕機了
-
Coordinator 自己也宕機了
消息投遞語義
Kafka 支持 3 種消息投遞語義:
-
At most once:最多一次,消息可能會丟失,但不會重復。
-
At least once:最少一次,消息不會丟失,可能會重復。
-
Exactly once:只且一次,消息不丟失不重復,只且消費一次(0.11 中實現,僅限于下游也是 Kafka)
在業務中,常常都是使用 At least once 的模型,如果需要可重入的話,往往是業務自己實現。
At least once
先獲取數據,再進行業務處理,業務處理成功后 Commit Offset:
-
生產者生產消息異常,消息是否成功寫入不確定,重做,可能寫入重復的消息。
-
消費者處理消息,業務處理成功后,更新 Offset 失敗,消費者重啟的話,會重復消費。
At most once
先獲取數據,再 Commit Offset,最后進行業務處理:
-
生產者生產消息異常,不管,生產下一個消息,消息就丟了。
-
消費者處理消息,先更新 Offset,再做業務處理,做業務處理失敗,消費者重啟,消息就丟了。
Exactly once
思路是這樣的,首先要保證消息不丟,再去保證不重復。所以盯著 At least once 的原因來搞。
首先想出來的:
-
生產者重做導致重復寫入消息:生產保證冪等性。
-
消費者重復消費:消滅重復消費,或者業務接口保證冪等性重復消費也沒問題。
由于業務接口是否冪等,不是 Kafka 能保證的,所以 Kafka 這里提供的 Exactly once 是有限制的,消費者的下游也必須是 Kafka。
所以以下討論的,沒特殊說明,消費者的下游系統都是 Kafka(注:使用 Kafka Conector,它對部分系統做了適配,實現了 Exactly once)。生產者冪等性好做,沒啥問題。
解決重復消費有兩個方法:
-
下游系統保證冪等性,重復消費也不會導致多條記錄。
-
把 Commit Offset 和業務處理綁定成一個事務。
本來 Exactly once 實現第 1 點就 OK 了。但是在一些使用場景下,我們的數據源可能是多個 Topic,處理后輸出到多個 Topic,這時我們會希望輸出時要么全部成功,要么全部失敗。這就需要實現事務性。
既然要做事務,那么干脆把重復消費的問題從根源上解決,把 Commit Offset 和輸出到其他 Topic 綁定成一個事務。
生產冪等性
思路是這樣的,為每個 Producer 分配一個 Pid,作為該 Producer 的唯一標識。
Producer 會為每一個維護一個單調遞增的 Seq。類似的,Broker 也會為每個記錄下最新的 Seq。
當 req_seq == broker_seq+1 時,Broker 才會接受該消息,因為:
-
消息的 Seq 比 Broker 的 Seq 大超過時,說明中間有數據還沒寫入,即亂序了。
-
消息的 Seq 不比 Broker 的 Seq 小,那么說明該消息已被保存。
事務性/原子性廣播
場景是這樣的:
-
先從多個源 Topic 中獲取數據。
-
做業務處理,寫到下游的多個目的 Topic。
-
更新多個源 Topic 的 Offset。
其中第 2、3 點作為一個事務,要么全成功,要么全失敗。這里得益于?Offset 實際上是用特殊的 Topic 去保存,這兩點都歸一為寫多個 Topic 的事務性處理。
基本思路是這樣的:
-
引入 Tid(transaction id),和 Pid 不同,這個 ID 是應用程序提供的,用于標識事務,和 Producer 是誰并沒關系。
就是任何 Producer 都可以使用這個 Tid 去做事務,這樣進行到一半就死掉的事務,可以由另一個 Producer 去恢復。
-
同時為了記錄事務的狀態,類似對 Offset 的處理,引入 Transaction Coordinator 用于記錄 Transaction Log。
在集群中會有多個 Transaction Coordinator,每個 Tid 對應唯一一個 Transaction Coordinator。
注:Transaction Log 刪除策略是 Compact,已完成的事務會標記成 Null,Compact 后不保留。
做事務時,先標記開啟事務,寫入數據,全部成功就在 Transaction Log 中記錄為 Prepare Commit 狀態,否則寫入 Prepare Abort 的狀態。
之后再去給每個相關的 Partition 寫入一條 Marker(Commit 或者 Abort)消息,標記這個事務的 Message 可以被讀取或已經廢棄。成功后在 Transaction Log記錄下 Commit/Abort 狀態,至此事務結束。
數據流:
-
首先使用 Tid 請求任意一個 Broker(代碼中寫的是負載最小的 Broker),找到對應的 Transaction Coordinator。
-
請求 Transaction Coordinator 獲取到對應的 Pid,和 Pid 對應的 Epoch,這個 Epoch 用于防止僵死進程復活導致消息錯亂。
當消息的 Epoch 比當前維護的 Epoch 小時,拒絕掉。Tid 和 Pid 有一一對應的關系,這樣對于同一個 Tid 會返回相同的 Pid。
-
Client 先請求 Transaction Coordinator 記錄的事務狀態,初始狀態是 Begin,如果是該事務中第一個到達的,同時會對事務進行計時。
Client 輸出數據到相關的 Partition 中;Client 再請求 Transaction Coordinator 記錄 Offset 的事務狀態;Client 發送 Offset Commit 到對應 Offset Partition。
-
Client 發送 Commit 請求,Transaction Coordinator 記錄 Prepare Commit/Abort,然后發送 Marker 給相關的 Partition。
全部成功后,記錄 Commit/Abort 的狀態,最后這個記錄不需要等待其他 Replica 的 ACK,因為 Prepare 不丟就能保證最終的正確性了。
這里 Prepare 的狀態主要是用于事務恢復,例如給相關的 Partition 發送控制消息,沒發完就宕機了,備機起來后,Producer 發送請求獲取 Pid 時,會把未完成的事務接著完成。
當 Partition 中寫入 Commit 的 Marker 后,相關的消息就可被讀取。所以 Kafka 事務在 Prepare Commit 到 Commit 這個時間段內,消息是逐漸可見的,而不是同一時刻可見。
消費事務
前面都是從生產的角度看待事務。還需要從消費的角度去考慮一些問題。
消費時,Partition 中會存在一些消息處于未 Commit 狀態,即業務方應該看不到的消息,需要過濾這些消息不讓業務看到,Kafka 選擇在消費者進程中進行過來,而不是在 Broker 中過濾,主要考慮的還是性能。
Kafka 高性能的一個關鍵點是 Zero Copy,如果需要在 Broker 中過濾,那么勢必需要讀取消息內容到內存,就會失去 Zero Copy 的特性。
文件組織
Kafka 的數據,實際上是以文件的形式存儲在文件系統的。Topic 下有 Partition,Partition 下有 Segment,Segment 是實際的一個個文件,Topic 和 Partition 都是抽象概念。
在目錄 /partitionid}/ 下,存儲著實際的 Log 文件(即 Segment),還有對應的索引文件。
每個 Segment 文件大小相等,文件名以這個 Segment 中最小的 Offset 命名,文件擴展名是 .log。Segment 對應的索引的文件名字一樣,擴展名是 .index。
有兩個 Index 文件:
-
一個是 Offset Index 用于按 Offset 去查 Message。
-
一個是 Time Index 用于按照時間去查,其實這里可以優化合到一起,下面只說 Offset Index。
總體的組織是這樣的:
為了減少索引文件的大小,降低空間使用,方便直接加載進內存中,這里的索引使用稀疏矩陣,不會每一個 Message 都記錄下具體位置,而是每隔一定的字節數,再建立一條索引。?
索引包含兩部分:
-
BaseOffset:意思是這條索引對應 Segment 文件中的第幾條 Message。這樣做方便使用數值壓縮算法來節省空間。例如 Kafka 使用的是 Varint。
-
Position:在 Segment 中的絕對位置。
查找 Offset 對應的記錄時,會先用二分法,找出對應的 Offset 在哪個 Segment 中,然后使用索引,在定位出 Offset 在 Segment 中的大概位置,再遍歷查找 Message。
常用配置項
Broker 配置
Topic 配置
關于日志清理,默認當前正在寫的日志,是怎么也不會清理掉的。
還有 0.10 之前的版本,時間看的是日志文件的 Mtime,但這個值是不準確的,有可能文件被 Touch 一下,Mtime 就變了。因此從?0.10 版本開始,改為使用該文件最新一條消息的時間來判斷。
按大小清理這里也要注意,Kafka 在定時任務中嘗試比較當前日志量總大小是否超過閾值至少一個日志段的大小。如果超過但是沒超過一個日志段,那么就不會刪除。
總結
以上是生活随笔為你收集整理的震惊!原来这才是Kafka的“真面目”!的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 来,聊聊程序员的爱情
- 下一篇: 一个Java对象到底占用多大内存?