消息中间件—Kafka 的设计思想
1.動(dòng)機(jī)
設(shè)計(jì) kafka 初衷,作為統(tǒng)一平臺(tái)處理大公司的實(shí)時(shí)數(shù)據(jù)。所以 必須具有如下特性:
- 支持海量數(shù)據(jù)
- 高吞吐量
- 低延遲(實(shí)時(shí)性)
- 支持分區(qū),分布式
- 容錯(cuò)
2.持久化
kafka 高度依賴(lài) 文件系統(tǒng) 存儲(chǔ)和緩存消息。通過(guò)對(duì)磁盤(pán)的順序讀寫(xiě),并借助 OS 層面的 頁(yè)緩存(page cache),保證優(yōu)于緩存在內(nèi)存中或其他結(jié)構(gòu)中。
為何使用磁盤(pán)效率仍然很高:
利用磁盤(pán)的順序讀寫(xiě),操作一個(gè)文件,將數(shù)據(jù)追加到文件的末尾。相比于隨機(jī)讀寫(xiě),效率很高。 利用 OS 層面的頁(yè)緩存(page cache),順序讀文件可以預(yù)讀數(shù)據(jù)到 page cache。通過(guò)自動(dòng)訪問(wèn)所有可用內(nèi)存 以及 存儲(chǔ)緊湊型字節(jié)結(jié)構(gòu)而非單個(gè)對(duì)象提高內(nèi)存使用率。OS緩存相對(duì)于進(jìn)程內(nèi)的緩存,重啟后仍然可用,不需要重建。 所有的操作時(shí)間復(fù)雜度都是 常量時(shí)間O(1),與數(shù)據(jù)大小無(wú)關(guān),讀 和 寫(xiě) 不會(huì)互相阻塞。
3.效率
使用磁盤(pán)效率低下主要有兩個(gè)原因:
過(guò)多的小 I/O 操作:發(fā)生在客戶(hù)端和服務(wù)端之間,以及 服務(wù)端自己的持久化操作中 過(guò)多的字節(jié)復(fù)制 針對(duì) 小 I/O 操作,kafka 根據(jù) "message set" 抽象構(gòu)建了一個(gè)協(xié)議,該 抽象 自然地將消息分組在一起。該協(xié)議允許網(wǎng)絡(luò)請(qǐng)求將消息分組在一起,并分?jǐn)偩W(wǎng)絡(luò)往返的開(kāi)銷(xiāo),而不是一次發(fā)送一條消息。服務(wù)器依次將消息塊一次附加到其日志中,而消費(fèi)者一次獲取大型線性塊。
針對(duì)過(guò)多的字節(jié)復(fù)制,使用了由生產(chǎn)者、代理 和 消費(fèi)者共享的標(biāo)準(zhǔn)化二進(jìn)制消息格式(這樣,數(shù)據(jù)塊就可以在它們之間不進(jìn)行修改的情況下進(jìn)行傳輸)。服務(wù)器所持有的消息日志 本身是一個(gè)文件目錄,每個(gè)文件都由一系列 "message set" 填充。這些消息集以生產(chǎn)者和消費(fèi)者使用的相同格式寫(xiě)入磁盤(pán)。維護(hù)這種通用格式可以?xún)?yōu)化 持久化日志塊的 網(wǎng)絡(luò)傳輸。
存儲(chǔ)在文件中的信息通過(guò)網(wǎng)絡(luò)發(fā)送給客戶(hù),經(jīng)歷的幾個(gè)路徑:
- 操作系統(tǒng)在內(nèi)核空間將數(shù)據(jù)從磁盤(pán)讀取到 page cache 中。
- 應(yīng)用程序從內(nèi)核空間讀取到 用戶(hù)空間緩沖區(qū)。
- 應(yīng)用程序?qū)?shù)據(jù)寫(xiě)回到內(nèi)核空間的套接字緩沖區(qū)。
- 操作系統(tǒng)將數(shù)據(jù)從套接字緩沖區(qū)復(fù)制到 NIC 緩沖區(qū)(NIC:網(wǎng)絡(luò)接口控制器)。
- 以上產(chǎn)生了四個(gè)副本拷貝,2個(gè)系統(tǒng)調(diào)用開(kāi)銷(xiāo),效率低下。
大致流程
上下文切換開(kāi)銷(xiāo)基于 零拷貝技術(shù):消息數(shù)據(jù)直接從 page cache 發(fā)送到網(wǎng)絡(luò)。linux 中使用 sendfile 完成零拷貝技術(shù)。java 中 java.nio.channels.FileChannel 的 transferTo() 方法也使用了零拷貝技術(shù)。
減少開(kāi)銷(xiāo)
kafka 通過(guò) page cache 和 sendfile 的組合,將看不到磁盤(pán)上的任何讀取活動(dòng),因?yàn)樗鼈儗⑼耆珡木彺嬷刑峁?shù)據(jù)。
端到端的批量壓縮 Kafka通過(guò)遞歸消息集來(lái)支持同時(shí)壓縮多個(gè)消息而減少相同消息的冗余。 一批消息可以一起壓縮并以此形式發(fā)送到服務(wù)器。 這批消息將以壓縮形式寫(xiě)入,并將在日志中保持壓縮,并且只能由消費(fèi)者解壓縮。Kafka支持GZIP和Snappy壓縮協(xié)議。
4.生產(chǎn)者
4.1負(fù)載均衡
生產(chǎn)者將數(shù)據(jù)直接發(fā)送給分區(qū)對(duì)應(yīng)的 leader。為了實(shí)現(xiàn)這一點(diǎn),所有的 kafka 節(jié)點(diǎn)要能夠在 任何時(shí)候應(yīng)答 哪個(gè)服務(wù)器還活著以及 topic分區(qū)的leader在哪里的 元數(shù)據(jù)請(qǐng)求。
客戶(hù)端自己控制 消息發(fā)送到哪個(gè)分區(qū),這可以隨機(jī)完成,實(shí)現(xiàn)一種隨機(jī)的負(fù)載平衡,也可以通過(guò)一些語(yǔ)義分區(qū)函數(shù)完成。
4.2異步發(fā)送
啟用 kafka 生產(chǎn)者 的批處理,kafka 將在內(nèi)存中累積數(shù)據(jù)然后一次性批量發(fā)送。可以配置 累計(jì)不超過(guò)固定數(shù)量的消息(bach.size),等待不超過(guò)固定延遲時(shí)間(linger.ms)。
5.消費(fèi)者
5.1拉 VS 推送
消費(fèi)者主動(dòng)拉取消息缺點(diǎn):如果 broker 沒(méi)有數(shù)據(jù),消費(fèi)者會(huì)輪詢(xún),忙等待直到數(shù)據(jù)到達(dá)。kafka 可以在拉請(qǐng)求中設(shè)置一些參數(shù),允許使用者請(qǐng)求在“長(zhǎng)輪詢(xún)”中阻塞,等待數(shù)據(jù)到達(dá)(也可以選擇等待,直到給定的字節(jié)數(shù)可用,以確保傳輸大小很大)
消費(fèi)者被動(dòng)推送消息缺點(diǎn):很難適應(yīng)消費(fèi)速率不同的消費(fèi)者,消息發(fā)送速率是由 broker 決定的,broker 是盡可能快的將消息發(fā)送出去,這樣會(huì)造成消費(fèi)者來(lái)不及處理消息,典型的表現(xiàn)就是 網(wǎng)絡(luò)阻塞 和 拒絕服務(wù)。
5.2消費(fèi)者的定位
topic 被分為一組有序的分區(qū),每個(gè)分區(qū)在任何給定的時(shí)間都由每個(gè)訂閱消費(fèi)者組中的一個(gè)消費(fèi)者消費(fèi)。這意味著消費(fèi)者在每個(gè)分區(qū)中的位置只是一個(gè)整數(shù),這個(gè)整數(shù)代表了即將要消費(fèi)的消息的偏移量。這樣做的好處是可以返回到舊的偏移量進(jìn)行消費(fèi)。
5.3離線數(shù)據(jù)加載
可伸縮持久性允許消費(fèi)者只定期使用,例如批量數(shù)據(jù)加載,定期將數(shù)據(jù)批量加載到離線系統(tǒng)(如Hadoop或關(guān)系數(shù)據(jù)倉(cāng)庫(kù))中。
6.消息傳遞語(yǔ)義
很明顯,消息傳遞保證能夠提供多種可能:
- 最多一次:消息可能丟失,但是絕不會(huì)重發(fā)
- 至少一次:消息絕不會(huì)丟失,但是可能會(huì)重發(fā)
- 正好一次:每條消息被傳遞一次 kafka 的消息傳遞語(yǔ)義:
一旦發(fā)布的消息已提交到日志,只要副本分區(qū)寫(xiě)入了此消息的一個(gè)broker仍然"活著”,它就不會(huì)丟失。
0.11.0.0 版本之前,如果一個(gè)生產(chǎn)者沒(méi)有收到消息提交的響應(yīng),那么生產(chǎn)者只能重新發(fā)送該消息。這就保證了至少一次的傳遞語(yǔ)義。如果上一次的請(qǐng)求實(shí)際上是成功的,那么消息就會(huì)再次寫(xiě)到日志中,造成重復(fù)消費(fèi)。
0.11.0.0 版本之后,kafka 生產(chǎn)者支持冪等傳遞,保證重新發(fā)送不會(huì)導(dǎo)致日志中有重復(fù)記錄。為了實(shí)現(xiàn)這一點(diǎn),broker 為 每一個(gè)生產(chǎn)者 分配一個(gè) ID,使用生產(chǎn)者隨每條消息一起發(fā)送的序列號(hào)來(lái)消除重復(fù)的消息。
同時(shí)也是從 0.11.0.0 版本之后,生產(chǎn)者支持使用事務(wù)類(lèi)語(yǔ)義將消息發(fā)送到多個(gè) topic 分區(qū)的能力:即,要么所有消息都已成功寫(xiě)入,要么都未成功寫(xiě)入。這方面的主要用例是在Kafka topic 之間進(jìn)行一次處理。
當(dāng)然,不是所有的使用場(chǎng)景都需要如此嚴(yán)謹(jǐn)?shù)谋U?#xff0c;對(duì)于延遲敏感的,我們?cè)试S生產(chǎn)者指定它想要的耐用性水平。如生產(chǎn)者可以指定它獲取需等待10毫秒量級(jí)上的響應(yīng)。生產(chǎn)者也可以指定異步發(fā)送,或只等待leader(不需要副本的響應(yīng))有響應(yīng)。
從消費(fèi)者的角度描述語(yǔ)義:
- 讀取到消息,在日志中保存位置,最后處理消息。這種順序 如果消費(fèi)者在保存位置之后,處理消息之前崩潰,數(shù)據(jù)會(huì)丟失,屬于 最多一次的語(yǔ)義。
- 讀取消息,處理消息,在日志中保存位置。這種順序,如果消費(fèi)者在處理消息之后,日志中保存位置之前崩潰,數(shù)據(jù)會(huì)被多次處理,屬于至少一次的語(yǔ)義。在多數(shù)情況下,消息都有一個(gè)主鍵,所以更新是冪等的(一次執(zhí)行和多次執(zhí)行的影響相同)。 kafka 默認(rèn)是保證“至少一次”傳遞,并允許用戶(hù)通過(guò)禁止生產(chǎn)者重試和處理一批消息前提交它的偏移量來(lái)實(shí)現(xiàn) “最多一次”傳遞。而“正好一次”傳遞需要與目標(biāo)存儲(chǔ)系統(tǒng)合作,但kafka提供了偏移量,所以實(shí)現(xiàn)這個(gè)很簡(jiǎn)單。
7.副本
kafka 在各個(gè)服務(wù)器上備份 每個(gè) topic 的 partition (通過(guò) replication factor 設(shè)置副本數(shù))。當(dāng)集群中的某個(gè)服務(wù)器發(fā)生故障時(shí),自動(dòng)轉(zhuǎn)移到這些副本,以便在故障時(shí),消息仍然可用。
kafka 的默認(rèn) 副本因子為 1,即不創(chuàng)建副本。副本因子是指副本的總數(shù),包括 leader 。
副本以 topic 的 partition 為單位。在非故障的情況下,kafka 中的每個(gè) partition 都有一個(gè) leader,0 個(gè)或者多個(gè) follower。所有的讀 和寫(xiě)都指向 分區(qū)的 leader。通常,分區(qū)數(shù) 多于 broker 的數(shù)量,leader 均勻的分布在 broker 上。follower 的日志與 leader 的日志相同,即相同的 偏移量 offset 和 消息順序 。(當(dāng)然,有可能在某個(gè)時(shí)間點(diǎn),leader 上比 follower 多幾條還未同步的消息)。
kafka 節(jié)點(diǎn)存活的2個(gè)條件:
- 一個(gè)節(jié)點(diǎn)必須能維持與 zookeeper 的會(huì)話(通過(guò) zookeeper 的心跳機(jī)制)。
- 如果該節(jié)點(diǎn)是 slave,它必須復(fù)制 leader 的寫(xiě)數(shù)據(jù),并且不能落后太多。 如果節(jié)點(diǎn) 死掉,卡主,或者落后太多,leader 將 從 同步副本 ISR (In Sync Replicas)中移除該節(jié)點(diǎn)。落后多少是由 replica.lag.max.messages 控制,卡主多久算卡主是由 replica.lag.time.max.ms 控制。
kafka 動(dòng)態(tài)維護(hù)一組同步 leader 數(shù)據(jù)的副本(ISR),只有這個(gè)組中的成員才有資格當(dāng)選 leader。在所有同步副本都收到寫(xiě)操作之前,不會(huì)認(rèn)為已提交對(duì)Kafka分區(qū)的寫(xiě)操作。這組 ISR 保存在 zookeeper 中,正因?yàn)槿绱?#xff0c;在ISR中的任何副本都有資格當(dāng)選leader。對(duì)于 f+1 個(gè) 副本的 kafka, topic 可以容忍f失敗而不會(huì)丟失已提交的消息。
如果所有的節(jié)點(diǎn)都死掉,有兩種可以實(shí)現(xiàn)的方式:
- 等待 ISR 列表中的節(jié)點(diǎn)活過(guò)來(lái),并且選擇該節(jié)點(diǎn)作為 leader.
- 選擇第一個(gè)活過(guò)來(lái)的節(jié)點(diǎn)(不管它在 ISR 列表中)作為 leader. 從 0.11.0.0 開(kāi)始 kafka 默認(rèn)選擇第一種策略,等待一致性的副本;可以通過(guò)配置 unclean.leader.election.enable 為 true 來(lái)選用第二種策略。這兩種策略是 可用性 和一致性的權(quán)衡,需要根據(jù)實(shí)際業(yè)務(wù)來(lái)決定。
可用性 和 耐久性保證
當(dāng)寫(xiě)消息到 kafka 時(shí),生產(chǎn)者可以 配置 需要 leader 收到的確認(rèn)數(shù) 來(lái)確定是否完成請(qǐng)求,通過(guò) 配置 acks 滿(mǎn)足多種情況:
- acks = 0 :生產(chǎn)者不會(huì)等待服務(wù)器的任何確認(rèn),消息記錄將被立刻添加到 socket 緩沖區(qū)并視為已發(fā)送。這種情況無(wú)法確保服務(wù)器已經(jīng)接收到消息記錄,重試的配置也不會(huì)生效。每個(gè)記錄返回的偏移量始終被設(shè)置為 1.
- acks = 1 :服務(wù)器端的 leader 寫(xiě)入消息到本地日志就立即響應(yīng)生產(chǎn)者,而不等待 follower 應(yīng)答。這種情況,如果在服務(wù)器響應(yīng)生產(chǎn)者之后,復(fù)制到 follower 之前掛掉 就會(huì)丟失數(shù)據(jù)。
- acks = all(-1):服務(wù)器端的 leader 會(huì)等待 ISR 中所有副本同步響應(yīng)來(lái)確認(rèn)消息記錄。這保證了只要 ISR 中還有一個(gè)副本存活就不會(huì)丟失記錄,也可以設(shè)置為 -1; 提供兩種 topic 級(jí)別的配置 來(lái)確保 持久性 而非 可用性。
unclean.leader.election.enable 設(shè)為 false,(默認(rèn)即為 false)即 所有的副本都不可用時(shí),分區(qū)才不可用。只有當(dāng) ISR 中的節(jié)點(diǎn) 活過(guò)來(lái) 分區(qū)才能可用。 指定 一個(gè)最小的 ISR 數(shù)量值,通過(guò) min.insync.replicas 來(lái)配置,只有當(dāng) ISR 中的數(shù)量 超過(guò)最小值,分區(qū)才會(huì)接受寫(xiě)入操作,以此來(lái)防止僅寫(xiě)入單個(gè)副本而后副本不可用而導(dǎo)致的消息的丟失。該設(shè)置僅在 acks = all 并保證至少有這么多同步副本確認(rèn)消息時(shí)生效。 副本管理
上面關(guān)于復(fù)制日志的討論實(shí)際上只涉及了一個(gè)日志,例如 一個(gè) topic 的partition,然而,kafka 集群管理著成百上千個(gè)這樣的分區(qū)。通過(guò) round-robin 的方式平衡 集群中的分區(qū),避免 大部分的分區(qū)分布在少量的及誒單上,同樣,平衡 leader,使在分區(qū)份額上的每個(gè)節(jié)點(diǎn)都是 leader。
kafka 選擇 其中一個(gè) broker 作為 controller(到 zookeeper 上注冊(cè),先到先得)。該 controller 檢測(cè) broker 級(jí)別的故障,并負(fù)責(zé)更改 故障 broker 上受影響的 分區(qū)的 leader。這樣就可以批量處理 leader 的變更。如果 controller 故障,其他存活的 broker 將會(huì)成為新的 controller(同樣需要到 zookeeper 上注冊(cè))。
歡迎關(guān)注 編程那點(diǎn)事兒,隨時(shí)隨地想學(xué)就學(xué)~
轉(zhuǎn)載于:https://juejin.im/post/5c90538a5188252dac6d2881
總結(jié)
以上是生活随笔為你收集整理的消息中间件—Kafka 的设计思想的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 记录console的使用
- 下一篇: Reactor和Proactor对比以及