kafka系统设计开篇
戳藍(lán)字“CSDN云計(jì)算”關(guān)注我們哦!
來(lái)源 |?靳剛同學(xué)
MQ(消息隊(duì)列)是跨進(jìn)程通信的方式之一,可理解為異步rpc,上游系統(tǒng)對(duì)調(diào)用結(jié)果的態(tài)度往往是重要不緊急。使用消息隊(duì)列有以下好處:業(yè)務(wù)解耦、流量削峰、靈活擴(kuò)展。接下來(lái)介紹消息中間件Kafka。
Kafka是什么?
Kafka是一個(gè)分布式的消息引擎。具有以下特征
能夠發(fā)布和訂閱消息流(類似于消息隊(duì)列)
以容錯(cuò)的、持久的方式存儲(chǔ)消息流
多分區(qū)概念,提高了并行能力
Kafka架構(gòu)總覽
Topic
消息的主題、隊(duì)列,每一個(gè)消息都有它的topic,Kafka通過(guò)topic對(duì)消息進(jìn)行歸類。Kafka中可以將Topic從物理上劃分成一個(gè)或多個(gè)分區(qū)(Partition),每個(gè)分區(qū)在物理上對(duì)應(yīng)一個(gè)文件夾,以”topicName_partitionIndex”的命名方式命名,該dir包含了這個(gè)分區(qū)的所有消息(.log)和索引文件(.index),這使得Kafka的吞吐率可以水平擴(kuò)展。
Partition
每個(gè)分區(qū)都是一個(gè) 順序的、不可變的消息隊(duì)列, 并且可以持續(xù)的添加;分區(qū)中的消息都被分了一個(gè)序列號(hào),稱之為偏移量(offset),在每個(gè)分區(qū)中此偏移量都是唯一的。
producer在發(fā)布消息的時(shí)候,可以為每條消息指定Key,這樣消息被發(fā)送到broker時(shí),會(huì)根據(jù)分區(qū)算法把消息存儲(chǔ)到對(duì)應(yīng)的分區(qū)中(一個(gè)分區(qū)存儲(chǔ)多個(gè)消息),如果分區(qū)規(guī)則設(shè)置的合理,那么所有的消息將會(huì)被均勻的分布到不同的分區(qū)中,這樣就實(shí)現(xiàn)了負(fù)載均衡。
Broker
Kafka server,用來(lái)存儲(chǔ)消息,Kafka集群中的每一個(gè)服務(wù)器都是一個(gè)Broker,消費(fèi)者將從broker拉取訂閱的消息
Producer
向Kafka發(fā)送消息,生產(chǎn)者會(huì)根據(jù)topic分發(fā)消息。生產(chǎn)者也負(fù)責(zé)把消息關(guān)聯(lián)到Topic上的哪一個(gè)分區(qū)。最簡(jiǎn)單的方式從分區(qū)列表中輪流選擇。也可以根據(jù)某種算法依照權(quán)重選擇分區(qū)。算法可由開(kāi)發(fā)者定義。
Cousumer
Consermer實(shí)例可以是獨(dú)立的進(jìn)程,負(fù)責(zé)訂閱和消費(fèi)消息。消費(fèi)者用consumerGroup來(lái)標(biāo)識(shí)自己。同一個(gè)消費(fèi)組可以并發(fā)地消費(fèi)多個(gè)分區(qū)的消息,同一個(gè)partition也可以由多個(gè)consumerGroup并發(fā)消費(fèi),但是在consumerGroup中一個(gè)partition只能由一個(gè)consumer消費(fèi)
CousumerGroup
Consumer Group:同一個(gè)Consumer Group中的Consumers,Kafka將相應(yīng)Topic中的每個(gè)消息只發(fā)送給其中一個(gè)Consumer
Kafka producer 設(shè)計(jì)原理
一、發(fā)送消息的流程
1.序列化消息&&.計(jì)算partition
根據(jù)key和value的配置對(duì)消息進(jìn)行序列化,然后計(jì)算partition:
ProducerRecord對(duì)象中如果指定了partition,就使用這個(gè)partition。否則根據(jù)key和topic的partition數(shù)目取余,如果key也沒(méi)有的話就隨機(jī)生成一個(gè)counter,使用這個(gè)counter來(lái)和partition數(shù)目取余。這個(gè)counter每次使用的時(shí)候遞增。
2發(fā)送到batch&&喚醒Sender 線程
根據(jù)topic-partition獲取對(duì)應(yīng)的batchs(Dueue<ProducerBatch>),然后將消息append到batch中.如果有batch滿了則喚醒Sender 線程。隊(duì)列的操作是加鎖執(zhí)行,所以batch內(nèi)消息時(shí)有序的。后續(xù)的Sender操作當(dāng)前方法異步操作。
3.Sender把消息有序發(fā)到?broker(tp replia leader)
3.1?確定tp relica?leader 所在的broker?
Kafka中 每臺(tái)broker都保存了kafka集群的metadata信息,metadata信息里包括了每個(gè)topic的所有partition的信息: leader, leader_epoch, controller_epoch, isr, replicas等;Kafka客戶端從任一broker都可以獲取到需要的metadata信息;sender線程通過(guò)metadata信息可以知道tp leader的brokerId
producer也保存了metada信息,同時(shí)根據(jù)metadata更新策略(定期更新metadata.max.age.ms、失效檢測(cè),強(qiáng)制更新:檢查到metadata失效以后,調(diào)用metadata.requestUpdate()強(qiáng)制更新
3.2 冪等性發(fā)送
為實(shí)現(xiàn)Producer的冪等性,Kafka增加了pid和seq。Producer中每個(gè)RecordBatch都有一個(gè)單調(diào)遞增的seq; Broker上每個(gè)tp也會(huì)維護(hù)pid-seq的映射,并且每Commit都會(huì)更新lastSeq。這樣recordBatch到來(lái)時(shí),broker會(huì)先檢查RecordBatch再保存數(shù)據(jù):如果batch中 baseSeq(第一條消息的seq)比Broker維護(hù)的序號(hào)(lastSeq)大1,則保存數(shù)據(jù),否則不保存(inSequence方法)。
4.?Sender處理broker發(fā)來(lái)的produce response
一旦broker處理完Sender的produce請(qǐng)求,就會(huì)發(fā)送produce response給Sender,此時(shí)producer將執(zhí)行我們?yōu)閟end()設(shè)置的回調(diào)函數(shù)。至此producer的send執(zhí)行完畢。
二、吞吐性&&延時(shí)
?buffer.memory:buffer設(shè)置大了有助于提升吞吐性,但是batch太大會(huì)增大延遲,可搭配linger_ms參數(shù)使用
linger_ms:如果batch太大,或者producer qps不高,batch添加的會(huì)很慢,我們可以強(qiáng)制在linger_ms時(shí)間后發(fā)送batch數(shù)據(jù)
ack:producer收到多少broker的答復(fù)才算真的發(fā)送成功
0表示producer無(wú)需等待leader的確認(rèn)(吞吐最高、數(shù)據(jù)可靠性最差)
1代表需要leader確認(rèn)寫(xiě)入它的本地log并立即確認(rèn)
-1/all 代表所有的ISR都完成后確認(rèn)(吞吐最低、數(shù)據(jù)可靠性最高)
三、Sender線程和長(zhǎng)連接
每初始化一個(gè)producer實(shí)例,都會(huì)初始化一個(gè)Sender實(shí)例,新增到broker的長(zhǎng)連接。
代碼角度:每初始化一次KafkaProducer,都賦一個(gè)空的client
public KafkaProducer(final Map<String, Object> configs) { this(configs, null, null, null, null, null, Time.SYSTEM); }? ?
終端查看TCP連接數(shù):lsof -p portNum -np | grep TCP
由此可見(jiàn),適當(dāng)增加producer可提升系統(tǒng)吞吐型
Consumer設(shè)計(jì)原理
一、poll消息
消費(fèi)者通過(guò)fetch線程拉消息(單線程)
消費(fèi)者通過(guò)心跳線程來(lái)與broker發(fā)送心跳。超時(shí)會(huì)認(rèn)為掛掉
每個(gè)consumer group在broker上都有一個(gè)coordnator來(lái)管理,消費(fèi)者加入和退出,以及消費(fèi)消息的位移都由coordnator處理。
二、位移管理
consumer的消息位移代表了當(dāng)前group對(duì)topic-partition的消費(fèi)進(jìn)度,consumer宕機(jī)重啟后可以繼續(xù)從該offset開(kāi)始消費(fèi)。
在kafka0.8之前,位移信息存放在zookeeper上,由于zookeeper不適合高并發(fā)的讀寫(xiě),新版本Kafka把位移信息當(dāng)成消息,發(fā)往__consumers_offsets 這個(gè)topic所在的broker,__consumers_offsets默認(rèn)有50個(gè)分區(qū)。
消息的key 是groupId+topic_partition,value 是offset.
Kafka?Group?狀態(tài)
Empty:初始狀態(tài),Group 沒(méi)有任何成員,如果所有的 offsets 都過(guò)期的話就會(huì)變成 Dead
PreparingRebalance:Group 正在準(zhǔn)備進(jìn)行 Rebalance
AwaitingSync:Group 正在等待來(lái) group leader 的 分配方案
Stable:穩(wěn)定的狀態(tài)(Group is stable);
Dead:Group 內(nèi)已經(jīng)沒(méi)有成員,并且它的 Metadata 已經(jīng)被移除注意
三、重平衡reblance
當(dāng)一些原因?qū)е耤onsumer對(duì)partition消費(fèi)不再均勻時(shí),kafka會(huì)自動(dòng)執(zhí)行reblance,使得consumer對(duì)partition的消費(fèi)再次平衡。
什么時(shí)候發(fā)生rebalance?:
組訂閱topic數(shù)變更
topic partition數(shù)變更
consumer成員變更
consumer 加入群組或者離開(kāi)群組的時(shí)候
consumer被檢測(cè)為崩潰的時(shí)候
reblance過(guò)程
舉例1?consumer被檢測(cè)為崩潰引起的reblance
比如心跳線程在timeout時(shí)間內(nèi)沒(méi)和broker發(fā)送心跳,此時(shí)coordnator認(rèn)為該group應(yīng)該進(jìn)行reblance。接下來(lái)其他consumer發(fā)來(lái)fetch請(qǐng)求后,coordnator將回復(fù)他們進(jìn)行reblance通知。當(dāng)consumer成員收到請(qǐng)求后,只有l(wèi)eader會(huì)根據(jù)分配策略進(jìn)行分配,然后把各自的分配結(jié)果返回給coordnator。這個(gè)時(shí)候只有consumer leader返回的是實(shí)質(zhì)數(shù)據(jù),其他返回的都為空。收到分配方法后,consumer將會(huì)把分配策略同步給各consumer
舉例2 consumer加入引起的reblance
使用join協(xié)議,表示有consumer 要加入到group中
使用sync?協(xié)議,根據(jù)分配規(guī)則進(jìn)行分配
(上圖摘自網(wǎng)絡(luò))
引申:以上reblance機(jī)制存在的問(wèn)題
在大型系統(tǒng)中,一個(gè)topic可能對(duì)應(yīng)數(shù)百個(gè)consumer實(shí)例。這些consumer陸續(xù)加入到一個(gè)空消費(fèi)組將導(dǎo)致多次的rebalance;此外consumer 實(shí)例啟動(dòng)的時(shí)間不可控,很有可能超出coordinator確定的rebalance timeout(即max.poll.interval.ms),將會(huì)再次觸發(fā)rebalance,而每次rebalance的代價(jià)又相當(dāng)?shù)卮?#xff0c;因?yàn)楹芏酄顟B(tài)都需要在rebalance前被持久化,而在rebalance后被重新初始化。
新版本改進(jìn)
通過(guò)延遲進(jìn)入PreparingRebalance狀態(tài)減少reblance次數(shù)
新版本新增了group.initial.rebalance.delay.ms參數(shù)。空消費(fèi)組接受到成員加入請(qǐng)求時(shí),不立即轉(zhuǎn)化到PreparingRebalance狀態(tài)來(lái)開(kāi)啟reblance。當(dāng)時(shí)間超過(guò)group.initial.rebalance.delay.ms后,再把group狀態(tài)改為PreparingRebalance(開(kāi)啟reblance)。實(shí)現(xiàn)機(jī)制是在coordinator底層新增一個(gè)group狀態(tài):InitialReblance。假設(shè)此時(shí)有多個(gè)consumer陸續(xù)啟動(dòng),那么group狀態(tài)先轉(zhuǎn)化為InitialReblance,待group.initial.rebalance.delay.ms時(shí)間后,再轉(zhuǎn)換為PreparingRebalance(開(kāi)啟reblance)
Broker設(shè)計(jì)原理
Broker 是Kafka 集群中的節(jié)點(diǎn)。負(fù)責(zé)處理生產(chǎn)者發(fā)送過(guò)來(lái)的消息,消費(fèi)者消費(fèi)的請(qǐng)求。以及集群節(jié)點(diǎn)的管理等。由于涉及內(nèi)容較多,先簡(jiǎn)單介紹,后續(xù)專門抽出一篇文章分享?
一、broker zk注冊(cè)
二、broker消息存儲(chǔ)
Kafka的消息以二進(jìn)制的方式緊湊地存儲(chǔ),節(jié)省了很大空間
此外消息存在ByteBuffer而不是堆,這樣broker進(jìn)程掛掉時(shí),數(shù)據(jù)不會(huì)丟失,同時(shí)避免了gc問(wèn)題
通過(guò)零拷貝和順序?qū)ぶ?#xff0c;讓消息存儲(chǔ)和讀取速度都非常快
處理fetch請(qǐng)求的時(shí)候通過(guò)zero-copy?加快速度
三、broker狀態(tài)數(shù)據(jù)
broker設(shè)計(jì)中,每臺(tái)機(jī)器都保存了相同的狀態(tài)數(shù)據(jù)。主要包括以下:
controller所在的broker ID,即保存了當(dāng)前集群中controller是哪臺(tái)broker
集群中所有broker的信息:比如每臺(tái)broker的ID、機(jī)架信息以及配置的若干組連接信息
集群中所有節(jié)點(diǎn)的信息:嚴(yán)格來(lái)說(shuō),它和上一個(gè)有些重復(fù),不過(guò)此項(xiàng)是按照broker ID和監(jiān)聽(tīng)器類型進(jìn)行分組的。對(duì)于超大集群來(lái)說(shuō),使用這一項(xiàng)緩存可以快速地定位和查找給定節(jié)點(diǎn)信息,而無(wú)需遍歷上一項(xiàng)中的內(nèi)容,算是一個(gè)優(yōu)化吧
集群中所有分區(qū)的信息:所謂分區(qū)信息指的是分區(qū)的leader、ISR和AR信息以及當(dāng)前處于offline狀態(tài)的副本集合。這部分?jǐn)?shù)據(jù)按照topic-partitionID進(jìn)行分組,可以快速地查找到每個(gè)分區(qū)的當(dāng)前狀態(tài)。(注:AR表示assigned replicas,即創(chuàng)建topic時(shí)為該分區(qū)分配的副本集合)
四、broker負(fù)載均衡
分區(qū)數(shù)量負(fù)載:各臺(tái)broker的partition數(shù)量應(yīng)該均勻
partition Replica分配算法如下:
將所有Broker(假設(shè)共n個(gè)Broker)和待分配的Partition排序
將第i個(gè)Partition分配到第(i mod n)個(gè)Broker上
將第i個(gè)Partition的第j個(gè)Replica分配到第((i + j) mod n)個(gè)Broker上
存在的問(wèn)題
在kafka1.1之前,Kafka能夠保證各臺(tái)broker上partition數(shù)量均勻,但由于每個(gè)partition內(nèi)的消息數(shù)不同,可能存在不同硬盤(pán)之間內(nèi)存占用差異大的情況。
新版本改進(jìn)
增加容量大小負(fù)載:每臺(tái)broker的硬盤(pán)占用大小應(yīng)該均勻
Kafka1.1中增加了副本跨路徑遷移功能kafka-reassign-partitions.sh,我們可以結(jié)合它跟監(jiān)控系統(tǒng),實(shí)現(xiàn)自動(dòng)化的負(fù)載均衡
Kafka高可用
在介紹kafka高可用之前先介紹幾個(gè)概念
同步復(fù)制:要求所有能工作的Follower都復(fù)制完,這條消息才會(huì)被認(rèn)為commit,這種復(fù)制方式極大的影響了吞吐率
異步復(fù)制:Follower異步的從Leader pull數(shù)據(jù),data只要被Leader寫(xiě)入log認(rèn)為已經(jīng)commit,這種情況下如果Follower落后于Leader的比較多,如果Leader突然宕機(jī),會(huì)丟失數(shù)據(jù)
Isr
Kafka結(jié)合同步復(fù)制和異步復(fù)制,使用ISR(與Partition Leader保持同步的Replica列表)的方式在確保數(shù)據(jù)不丟失和吞吐率之間做了平衡。Producer只需把消息發(fā)送到Partition Leader,Leader將消息寫(xiě)入本地Log。Follower則從Leader pull數(shù)據(jù)。Follower在收到該消息向Leader發(fā)送ACK。一旦Leader收到了ISR中所有Replica的ACK,該消息就被認(rèn)為已經(jīng)commit了,Leader將增加HW并且向Producer發(fā)送ACK。這樣如果leader掛了,只要Isr中有一個(gè)replica存活,就不會(huì)丟數(shù)據(jù)。
Isr動(dòng)態(tài)更新
Leader會(huì)跟蹤ISR,如果ISR中一個(gè)Follower宕機(jī),或者落后太多,Leader將把它從ISR中移除。這里所描述的“落后太多”指Follower復(fù)制的消息落后于Leader后的條數(shù)超過(guò)預(yù)定值(replica.lag.max.messages)或者Follower超過(guò)一定時(shí)間(replica.lag.time.max.ms)未向Leader發(fā)送fetch請(qǐng)求。
broker Nodes In Zookeeper?
/brokers/topics/[topic]/partitions/[partition]/state 保存了topic-partition的leader和Isr等信息
Controller負(fù)責(zé)broker故障檢查&&故障轉(zhuǎn)移(fail/recover)
?1. Controller在Zookeeper上注冊(cè)Watch,一旦有Broker宕機(jī),其在Zookeeper對(duì)應(yīng)的znode會(huì)自動(dòng)被刪除,Zookeeper會(huì)觸發(fā) Controller注冊(cè)的watch,Controller讀取最新的Broker信息
?2. Controller確定set_p,該集合包含了宕機(jī)的所有Broker上的所有Partition
?3. 對(duì)set_p中的每一個(gè)Partition,選舉出新的leader、Isr,并更新結(jié)果。
?3.1 從/brokers/topics/[topic]/partitions/[partition]/state讀取該P(yáng)arti? ? ? ?tion?當(dāng)前的ISR??
3.2 決定該P(yáng)artition的新Leader和Isr。如果當(dāng)前ISR中有至少一個(gè)Replica還幸存,則選擇其中一個(gè)作為新Leader,新的ISR則包含當(dāng)前ISR中所有幸存的Replica。否則選擇該P(yáng)artition中任意一個(gè)幸存的Replica作為新的Leader以及ISR(該場(chǎng)景下可能會(huì)有潛在的數(shù)據(jù)丟失)??
? ??
3.3更新Leader、ISR、leader_epoch、controller_epoch:寫(xiě)入/brokers/topics/[topic]/partitions/[partition]/state
4. 直接通過(guò)RPC向set_p相關(guān)的Broker發(fā)送LeaderAndISRRequest命令。Controller可以在一個(gè)RPC操作中發(fā)送多個(gè)命令從而提高效率。
Controller掛掉
每個(gè) broker 都會(huì)在 zookeeper 的臨時(shí)節(jié)點(diǎn)?"/controller" 注冊(cè) watcher,當(dāng) controller 宕機(jī)時(shí)?"/controller"?會(huì)消失,觸發(fā)broker的watch,每個(gè) broker 都嘗試創(chuàng)建新的 controller path,只有一個(gè)競(jìng)選成功并當(dāng)選為 controller。
使用Kafka如何保證冪等性
不丟消息
首先kafka保證了對(duì)已提交消息的at least保證
Sender有重試機(jī)制
producer業(yè)務(wù)方在使用producer發(fā)送消息時(shí),注冊(cè)回調(diào)函數(shù)。在onError方法中重發(fā)消息
consumer?拉取到消息后,處理完畢再commit,保證commit的消息一定被處理完畢
不重復(fù)
consumer拉取到消息先保存,commit成功后刪除緩存數(shù)據(jù)
Kafka高性能
Kafka本身高具有的性能
partition提升了并發(fā)
zero-copy
順序?qū)懭?/p>
消息聚集batch
頁(yè)緩存
業(yè)務(wù)方可對(duì) Kafka producer的優(yōu)化
增大producer數(shù)量
ack配置
batch
福利
掃描添加小編微信,備注“姓名+公司職位”,加入【云計(jì)算學(xué)習(xí)交流群】,和志同道合的朋友們共同打卡學(xué)習(xí)!
推薦閱讀:
做了中臺(tái)就不會(huì)死嗎?每年至少40%開(kāi)發(fā)資源是被浪費(fèi)的!
美女主播變大媽:在bug翻車現(xiàn)場(chǎng)說(shuō)測(cè)試策略
漫畫(huà)高手、小說(shuō)家、滑板專家……解鎖程序員的另一面!
手把手教你如何用Python模擬登錄淘寶
鴻蒙霸榜 GitHub,從最初的 Plan B 到“取代 Android”?
每天超50億推廣流量、3億商品展現(xiàn),阿里媽媽的推薦技術(shù)有多牛?
真香,朕在看了!
總結(jié)
以上是生活随笔為你收集整理的kafka系统设计开篇的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: boost::container_has
- 下一篇: 手机开通网银的步骤 银行官网自助开通方法