kafka控制器,复制与存储小结
【README】
- 1,本文主要總結(jié)kafka復(fù)制,存儲(chǔ)細(xì)節(jié);
- 2,本文的kafka集群版本是3.0.0, 有3個(gè)broker,分別是 centos201, centos202, centos203 對(duì)應(yīng)的brokerid為 1, 2, 3 ;
【1】kafka內(nèi)部原理
【1.1】broker-消息中心點(diǎn)
1)broker:一個(gè)獨(dú)立的kafka服務(wù)器節(jié)點(diǎn);也稱為發(fā)送消息的中心點(diǎn);
- kafka使用zk維護(hù)集群成員關(guān)系;
- 每個(gè)broker都有自己的id存儲(chǔ)在zk;broker啟動(dòng)時(shí),創(chuàng)建zk節(jié)點(diǎn)把自己id注冊(cè)到zk;
2)zk存儲(chǔ)的kafka集群信息的節(jié)點(diǎn)列表
# zk存儲(chǔ)的kafka集群信息的節(jié)點(diǎn) [zk: localhost:2181(CONNECTED) 1] ls / [cluster,controller_epoch,controller,brokers,zookeeper,feature,admin,isr_change_notification,consumers,log_dir_event_notification,latest_producer_id_block,config]查看zk中的 broker id
# 查看kafka brokerid 和 topic [zk: localhost:2181(CONNECTED) 2] ls /brokers [ids, topics, seqid] [zk: localhost:2181(CONNECTED) 3] ls /brokers/ids [1, 2, 3] [zk: localhost:2181(CONNECTED) 4] ls /brokers/topics [hello04, hello05, hello02, hello03, hello01, hello10, __consumer_offsets]【1.2】控制器
1)控制器定義:集群里第一個(gè)啟動(dòng)的broker通過(guò)在zk創(chuàng)建臨時(shí)節(jié)點(diǎn) /controller 讓自己成為控制器;
其他broker也嘗試創(chuàng)建 controller 節(jié)點(diǎn),若已存在,則報(bào)錯(cuò);其他 broker 會(huì)在控制器節(jié)點(diǎn)上創(chuàng)建 zk watch 對(duì)象,這樣非控制器節(jié)點(diǎn)可以收到控制器節(jié)點(diǎn)狀態(tài)變更的通知;(干貨——這種方式可以確保一個(gè)集群只能有一個(gè)控制器存在,防止腦裂問(wèn)題)
2)控制器選舉策略:一旦控制器被關(guān)閉或與zk斷開,其他broker通過(guò)watch對(duì)象就會(huì)收到控制器消失的通知,這些 非控制器broker 會(huì)競(jìng)爭(zhēng)在 zk 上創(chuàng)建 controller節(jié)點(diǎn),誰(shuí)最先創(chuàng)建成功,誰(shuí)就是集群控制器; 然后其他broker在控制器節(jié)點(diǎn)上創(chuàng)建 zk watch對(duì)象;
- 2.1)每次控制器選舉后: 控制器紀(jì)元值(時(shí)代值)controller_epoch? 都會(huì)遞增;其他broker若收到控制器發(fā)出的包含舊 epoch 的消息,就會(huì)忽略;
3)控制器實(shí)驗(yàn)
step1) 查看 控制器和控制器紀(jì)元
[zk: localhost:2181(CONNECTED) 5] get /controller_epoch 6[zk: localhost:2181(CONNECTED) 6] get /controller {"version":1,"brokerid":1,"timestamp":"1638692039821"}顯然, epoch是6,控制器是broker1;
step2)停止掉 broker1;?
這個(gè)時(shí)候,broker2,3 會(huì)競(jìng)爭(zhēng)選舉為控制器;我們?cè)俅尾榭纯刂破?#xff0c;發(fā)現(xiàn)控制器現(xiàn)在是broker2了;且 epoch自增為7;?
[zk: localhost:2181(CONNECTED) 7] get /controller_epoch 7[zk: localhost:2181(CONNECTED) 8] get /controller {"version":1,"brokerid":2,"timestamp":"1638733315396"}4)控制器作用
- 控制器負(fù)責(zé)在broker加入或離開時(shí)進(jìn)行分區(qū)首領(lǐng)選舉;
- 控制器使用 epoch 避免腦裂問(wèn)題;
【補(bǔ)充】腦裂指兩個(gè)節(jié)點(diǎn)同時(shí)認(rèn)為自己是集群控制器;?
5)zk的作用
【1.3】復(fù)制
復(fù)制功能是kafka架構(gòu)的核心;在kafka 文檔里,kafka把自己描述為 一個(gè)分布式的,可分區(qū)的,可復(fù)制的提交日志服務(wù);(kakfa的日志就是數(shù)據(jù)或消息);
【1.3.1】副本
1)數(shù)據(jù)存儲(chǔ)
kafka使用主題來(lái)組織數(shù)據(jù)(邏輯);使用分區(qū)為單位來(lái)讀寫數(shù)據(jù)(物理);
為什么說(shuō)kakfa以分區(qū)為單位讀寫? 是因?yàn)槲覀儎?chuàng)建帶有分區(qū)數(shù)和副本數(shù)的主題后, kakfa會(huì)創(chuàng)建以這個(gè)分區(qū)命名的文件夾,分區(qū)文件夾下存儲(chǔ)消息內(nèi)容,索引文件等;
?2)主題,分區(qū),副本關(guān)系
- 1個(gè)主題對(duì)應(yīng)多個(gè)分區(qū);
- 1個(gè)分區(qū)對(duì)應(yīng)多個(gè)副本;
- 1個(gè)副本對(duì)應(yīng)多個(gè)分段文件;(分段存儲(chǔ))?
3)副本類型?
- 3.1)首領(lǐng)副本:每個(gè)分區(qū)都有一個(gè)首領(lǐng)副本,消息讀寫首先會(huì)操作首領(lǐng)副本;
- 3.2)跟隨者副本:首領(lǐng)副本以外的副本;它們不處理讀寫請(qǐng)求,唯一任務(wù)是從首領(lǐng)副本復(fù)制消息,與首領(lǐng)保持?jǐn)?shù)據(jù)同步;如果首領(lǐng)發(fā)生崩潰,其中一個(gè)同步的跟隨者副本被提升為首領(lǐng)副本;
補(bǔ)充1:跟隨者副本在成為不同步副本前的時(shí)間是通過(guò) replica.lag.time.max.ms 來(lái)配置;
補(bǔ)充2:跟隨者從首領(lǐng)副本復(fù)制消息時(shí)的請(qǐng)求,與消費(fèi)者從首領(lǐng)副本消費(fèi)消息時(shí)發(fā)出的請(qǐng)求是一樣的;
【1.4】處理請(qǐng)求
1)broker處理請(qǐng)求過(guò)程
- step1)broker會(huì)在監(jiān)聽端口上運(yùn)行一個(gè) Acceptor線程(可以理解為服務(wù)器套接字 ServerSocket),這個(gè)線程會(huì)創(chuàng)建一個(gè)連接(類似ServerSocket.accept() 方法),把請(qǐng)求交給 Processor線程(網(wǎng)絡(luò)線程)去處理;
- step2)Processor線程從客戶端獲取請(qǐng)求消息,把它放進(jìn)請(qǐng)求隊(duì)列,然后從響應(yīng)隊(duì)列獲取響應(yīng)結(jié)果并發(fā)送給客戶端;
- step3) 在請(qǐng)求被放入請(qǐng)求隊(duì)列后, IO線程會(huì)處理它們,并把處理結(jié)果放入 響應(yīng)隊(duì)列;
?2)常見請(qǐng)求類型
- 生產(chǎn)請(qǐng)求:生產(chǎn)者發(fā)送的請(qǐng)求,包含要寫入的消息;
- 獲取請(qǐng)求:消費(fèi)者或跟隨者副本所在broker需要從首領(lǐng)副本所在broker獲取消息而發(fā)送的請(qǐng)求;
【注意】
3)客戶端怎么知道請(qǐng)求發(fā)送到哪里呢?
3.1)客戶端在發(fā)送請(qǐng)求前,先發(fā)送元數(shù)據(jù)請(qǐng)求;
- 這種請(qǐng)求的響應(yīng)結(jié)果包括 主題,主題分區(qū),分區(qū)副本以及首領(lǐng)副本;
3.2)客戶端會(huì)緩存這些元數(shù)據(jù)信息;
- 獲取元數(shù)據(jù)信息后,會(huì)直接往對(duì)應(yīng)的 broker發(fā)送請(qǐng)求和獲取請(qǐng)求;
- 當(dāng)然,客戶端需要定時(shí)刷新元數(shù)據(jù)緩存; 刷新時(shí)間間隔通過(guò)? metadata.max.age.ms 來(lái)配置;?
【1.4.1】生產(chǎn)請(qǐng)求
1)生產(chǎn)者acks有3個(gè)值;
- acks=0 ; 生產(chǎn)者在發(fā)送消息后,默認(rèn)發(fā)送成功;而不會(huì)等待服務(wù)器響應(yīng);
- acks=1 ; 只要集群的首領(lǐng)節(jié)點(diǎn)收到消息,生產(chǎn)者就會(huì)收到發(fā)送成功的響應(yīng);而不管副本節(jié)點(diǎn)是否收到消息;
- acks=all; 需要集群的首領(lǐng)節(jié)點(diǎn)和跟隨節(jié)點(diǎn)(副本節(jié)點(diǎn))都收到消息后,生產(chǎn)者才會(huì)收到發(fā)送成功的響應(yīng);
2)首領(lǐng)副本所在broker收到生產(chǎn)請(qǐng)求后,會(huì)對(duì)請(qǐng)求做一些驗(yàn)證:
- 發(fā)送數(shù)據(jù)的用戶是否有寫入權(quán)限;
- acks的值是否合法; (只允許出現(xiàn) 0, 1, all);
- 根據(jù)acks的值,進(jìn)行副本復(fù)制策略;
【1.4.2】獲取請(qǐng)求
1)首領(lǐng)副本所在broker收到獲取請(qǐng)求后,會(huì)根據(jù)客戶端指定的請(qǐng)求偏移量從分區(qū)里讀取消息;
2)kafka使用 零復(fù)制技術(shù) 向客戶端發(fā)送消息,即kafka直接把消息從文件發(fā)送到網(wǎng)絡(luò)通道,而不經(jīng)過(guò)任何中間緩沖區(qū);(干貨——這是kakfa與大部分?jǐn)?shù)據(jù)庫(kù)不一樣的地方,其他數(shù)據(jù)庫(kù)在把數(shù)據(jù)發(fā)送到客戶端前,會(huì)把數(shù)據(jù)保存到本地緩存)
- 零復(fù)制技術(shù)優(yōu)點(diǎn):避免了字節(jié)復(fù)制,也不需要管理內(nèi)存緩沖區(qū),從而獲取更好性能;?
3)消費(fèi)者客戶端只能讀取已經(jīng)被寫入所有同步副本的消息,而不是所有消息
- 因?yàn)檫€沒有被足夠多副本復(fù)制的消息被認(rèn)為是不安全的;如果首領(lǐng)副本所在broker發(fā)送崩潰,另一副本成為新首領(lǐng),那這些不安全的消息就會(huì)丟失;
4)擴(kuò)展 ISR, HW高水位
小結(jié): 消費(fèi)者只能看到已經(jīng)復(fù)制所有副本的消息;
5)在 Kafka 中,高水位的作用主要有 2 個(gè)。
6)下面這張圖展示了多個(gè)與高水位相關(guān)的 Kafka 術(shù)語(yǔ) 。
我們假設(shè)這是某個(gè)分區(qū) Leader副本的高水位圖。
1)首先,請(qǐng)你注意圖中的“已提交消息”和“未提交消息”。在分區(qū)高水位以下的消息被認(rèn)為是已提交消息,反之就是未提交消息。消費(fèi)者只能消費(fèi)已提交消息,即圖中位移小于 8 的所有消息。另外,需要關(guān)注的是,位移值等于高水位的消息也屬于未提交消息。也就是說(shuō),高水位上的消息是不能被消費(fèi)者消費(fèi)的。
2)圖中還有一個(gè)日志末端位移的概念,即 Log End Offset,簡(jiǎn)寫是 LEO。
它表示副本寫入下一條消息的位移值。注意,數(shù)字 15 所在的方框是虛線,這就說(shuō)明,這個(gè)副本當(dāng)前只有 15 條消息,位移值是從 0 到 14,下一條新消息的位移是 15。顯然,介于高水位和 LEO 之間的消息就屬于未提交消息。這也從側(cè)面告訴了我們一個(gè)重要的事實(shí),那就是:同一個(gè)副本對(duì)象,其高水位值不會(huì)大于 LEO 值。
【高水位小結(jié)】高水位和 LEO 是副本對(duì)象的兩個(gè)重要屬性
?【1.4.3】其他請(qǐng)求
- OffsetCommitRequest, 偏移量提交請(qǐng)求;
- OffsetFetchRequest;
- ListOffsetsRequest;
【1.5】物理存儲(chǔ)
1)kafka的基本存儲(chǔ)單元是分區(qū); 分區(qū)會(huì)在所屬broker上的kafka數(shù)據(jù)根目錄下新建名為分區(qū)名的文件夾,如 hello04-2(主題為hello04的2號(hào)分區(qū)文件夾),kafka數(shù)據(jù)根目錄由 server.properties 中的 log.dirs 來(lái)指定;
2)主題,分區(qū),副本關(guān)系
- 1個(gè)主題對(duì)應(yīng)多個(gè)分區(qū);
- 1個(gè)分區(qū)對(duì)應(yīng)多個(gè)副本;
- 1個(gè)副本對(duì)應(yīng)多個(gè)分段文件;(分段存儲(chǔ))?
【1.5.1】分區(qū)分配
1)創(chuàng)建指定分區(qū)和副本數(shù)的topic來(lái)做實(shí)驗(yàn)
# 創(chuàng)建分區(qū)數(shù)3副本數(shù)2的主題 kafka-topics.sh --bootstrap-server centos201:9092 --create --topic hello11 --partitions 3 --replication-factor 2 # 副本數(shù)量必須小于等于broker數(shù)量,但分區(qū)數(shù)沒有這個(gè)限制;查看分區(qū)詳情
[root@centos201 hello04-1]# kafka-topics.sh --bootstrap-server centos201:9092 \ --describe --topic hello11 Topic: hello11 TopicId: IliU_BDeS8ycreLufxCMMw PartitionCount: 3 ReplicationFactor: 2 Configs: segment.bytes=1024Topic: hello11 Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3Topic: hello11 Partition: 1 Leader: 3 Replicas: 3,1 Isr: 3,1Topic: hello11 Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2查看具體存儲(chǔ)數(shù)據(jù)的文件夾,以broker1為例;?
根據(jù)topic詳情,我們知道 broker1 存儲(chǔ)了topic hello11的1號(hào)和2號(hào)分區(qū); 且它是2號(hào)分區(qū)首領(lǐng)所在的broker ;
進(jìn)入 broker1的kafka數(shù)據(jù)根目錄,
?
?進(jìn)入其中一個(gè)分區(qū)文件夾查看? hello11-1 ,如下:
再查看分區(qū)文件夾前,我們先寫入10條消息; 指定topic hello11, 1號(hào)分區(qū);
for (int i = 0; i < 10; i++) {Future<RecordMetadata> future = producer.send( new ProducerRecord<String, String>("hello11", 1,"", String.format("[%s] ", order++) + now + " > " + DataFactory.INSTANCE.genOneHundred()));try {System.out.println("[生產(chǎn)者] " + future.get().partition() + "-" + future.get().offset());} catch (Exception e) {e.printStackTrace();} }查看分區(qū)文件夾下的文件 ;
2) kafka的分段存儲(chǔ);
因?yàn)樵谝粋€(gè)大文件里查找和刪除消息很耗時(shí);所以把一個(gè)分區(qū)分成若干片段進(jìn)行存儲(chǔ);默認(rèn)情況下,一個(gè)片段存儲(chǔ)1g數(shù)據(jù),為了實(shí)驗(yàn),這里我修改為 1k,可以在 server.properties文件中設(shè)置 log.segment.bytes=1024 來(lái)實(shí)現(xiàn);
3)kafka的稀疏索引
- kafka并沒有對(duì)每條消息建立索引,那樣太大了,而是采用稀疏索引(稀疏存儲(chǔ))的方式,即一條索引記錄指向一個(gè)消息范圍;
例如: 索引值 1~100 指向 數(shù)據(jù)文件1.log中的消息1到消息100的消息范圍的起始地址;
refer2 Apache Kafka ;
當(dāng)消費(fèi)者指定消費(fèi)某個(gè)offset記錄時(shí), kafka集群通過(guò)二分查找從索引文件找出包含offset的索引值,通過(guò)索引值找到對(duì)應(yīng)數(shù)據(jù)文件的起始地址,然后從起始地址開始順序讀取對(duì)應(yīng)offset的消息;
【1.5.2】文件格式
1)kafka 使用零復(fù)制技術(shù)給消費(fèi)者發(fā)送消息,避免了對(duì)生產(chǎn)者已經(jīng)壓縮過(guò)的消息進(jìn)行解壓和再壓縮;?
2)普通消息與壓縮消息格式 ?
?可以看出,多個(gè)壓縮消息共用同一個(gè)消息頭,從而減少消息大小;
【References】
總結(jié)
以上是生活随笔為你收集整理的kafka控制器,复制与存储小结的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 落地页网址怎么生成(落地页网址怎么生成的
- 下一篇: 转:运维监控系统-监控项及指标的梳理