Kafka:Kafka核心概念
1 消息系統(tǒng)簡(jiǎn)介
1.1 為什么要用消息系統(tǒng) ?
解耦 各位系統(tǒng)之間通過消息系統(tǒng)這個(gè)統(tǒng)一的接口交換數(shù)據(jù),無須了解彼此的存在;
冗余 部分消息系統(tǒng)具有消息持久化能力,可規(guī)避消息處理前丟失的風(fēng)險(xiǎn);
靈活性和消除峰值 在訪問量劇增的情況下,應(yīng)用仍然需要繼續(xù)發(fā)揮作用,使用消息隊(duì)列能夠使關(guān)鍵組件頂住突發(fā)的訪問壓力,而不會(huì)因?yàn)橥话l(fā)的超負(fù)荷的請(qǐng)求而完全崩潰;(節(jié)省資源)
可恢復(fù)性 系統(tǒng)中部分組件失效并不會(huì)影響整個(gè)系統(tǒng),它恢復(fù)后仍然可從消息系統(tǒng)中獲取并處理數(shù)據(jù);
順序保障 在大多使用場(chǎng)景下,數(shù)據(jù)處理的順序都很重要。大部分消息隊(duì)列本來就是排序的,并且能保證數(shù)據(jù)會(huì)按照特定的順序來處理。Kafka保證一個(gè)Partition內(nèi)的消息的有序性;
異步通信 在不需要立即處理請(qǐng)求的場(chǎng)景下,可以將請(qǐng)求放入消息系統(tǒng),合適的時(shí)候再處理。
1.2 有哪些消息系統(tǒng) ?
RabbitMQ Erlang編寫,支持多協(xié)議 AMQP,XMPP,SMTP,STOMP。支持負(fù)載均衡、數(shù)據(jù)持久化。同時(shí)支持Peer-to-Peer和發(fā)布/訂閱模式;
Redis 基于Key-Value對(duì)的NoSQL數(shù)據(jù)庫(kù),同時(shí)支持MQ功能,可做輕量級(jí)隊(duì)列服務(wù)使用。就入隊(duì)操作而言, Redis對(duì)短消息(小于10KB)的性能比RabbitMQ好,長(zhǎng)消息的性能比RabbitMQ差;
ZeroMQ 輕量級(jí),不需要單獨(dú)的消息服務(wù)器或中間件,應(yīng)用程序本身扮演該角色,Peer-to-Peer。它實(shí)質(zhì)上是 一個(gè)庫(kù),需要開發(fā)人員自己組合多種技術(shù),使用復(fù)雜度高;
ActiveMQ JMS實(shí)現(xiàn),Peer-to-Peer,支持持久化、XA事務(wù);
MetaQ/RocketMQ 純Java實(shí)現(xiàn),發(fā)布/訂閱消息系統(tǒng),支持本地事務(wù)和XA分布式事務(wù);
Kafka 高性能跨語言的分布式發(fā)布/訂閱消息系統(tǒng),數(shù)據(jù)持久化,全分布式,同時(shí)支持實(shí)時(shí)在線處理和離線數(shù)據(jù)處理。Apache Kafka相對(duì)于ActiveMQ是一個(gè)非常輕量級(jí)的消息系統(tǒng),除了性能非常好之外,還是一個(gè)工作良好的分布式系統(tǒng)。
1.3 Kafka設(shè)計(jì)目標(biāo)是什么?
高吞吐率 在廉價(jià)的商用機(jī)器上單機(jī)可支持每秒100萬條消息的讀寫;
消息持久化 所有消息均被持久化到磁盤,無消息丟失,支持消息重放;
完全分布式 Producer,Broker,Consumer均支持水平擴(kuò)展,同時(shí)適應(yīng)在線流處理和離線批處理。
2 Kafka簡(jiǎn)介和架構(gòu)
2.1 kafka架構(gòu)
kafka是生產(chǎn)者生產(chǎn)消息、kafka集群、消費(fèi)者獲取消息這樣一種架構(gòu),如下圖:
注意,還有zookeeper圖中未畫出。
2.2 kafka核心概念
(1)消息
消息是kafka中最基本的數(shù)據(jù)單元,其ProducerRecord如下所示:
public class ProducerRecord<K, V> {private final String topic;private final Integer partition;private final Headers headers;private final K key;private final V value;private final Long timestamp;//略... }其中key/value是producer發(fā)送數(shù)據(jù)時(shí)指定,key的主要作用是根據(jù)一定的策略,將此消息路由到指定的Partition中,這樣可以保證同一key的消息全部寫入同一分區(qū)中(key可以為null)。
(2)Topic & 分區(qū) & Log
Topic:存儲(chǔ)消息的邏輯概念,可以看作是一個(gè)消息集合。每個(gè)Topic可以有多個(gè)生產(chǎn)者向其中push消息,也可以任意多個(gè)消費(fèi)者消費(fèi)消息。
Partition:每個(gè)Topic可以劃分成多個(gè)分區(qū),同一Topic下的不同分區(qū)包含的消息是不同的。一個(gè)消息被添加到Topic時(shí),會(huì)分配唯一的一個(gè)offset,Kafka通過offset保證消息在分區(qū)內(nèi)時(shí)順序的。即:Kafka保證一個(gè)分區(qū)內(nèi)的消息是有序的;同一Topic的多個(gè)分區(qū)的消息,Kafka并不保證其順序性,如下圖:
注:同一Topic的不同分區(qū)會(huì)分配在不同Broker上,分區(qū)是Kafka水平擴(kuò)展性的基礎(chǔ)。
Log:分區(qū)在邏輯上對(duì)應(yīng)一個(gè)Log,當(dāng)生產(chǎn)者將消息寫入分區(qū)時(shí),實(shí)際就是寫入到對(duì)應(yīng)的Log中。
Log是邏輯概念,對(duì)應(yīng)到磁盤中的一個(gè)文件夾。Log是由多個(gè)Segment組成的,每個(gè)Segment對(duì)應(yīng)一個(gè)日志文件和索引文件,注意Segment的大小是由限制的,當(dāng)超過限制后會(huì)產(chǎn)生新的Segment。注意的是:Kafka采取的是順序磁盤IO,所以只允許向最新的Segment追加數(shù)據(jù)。索引文件采用稀疏索引的方式,運(yùn)行時(shí)會(huì)將其映射到內(nèi)存,提高索引速度。
(3)Broker
一個(gè)單獨(dú)的Kafka server就是一個(gè)Broker,主要工作是接收生產(chǎn)者發(fā)送的消息,分配offset,之后保存到磁盤中;同時(shí),接收消費(fèi)者、其他Broker的請(qǐng)求,根據(jù)請(qǐng)求類型進(jìn)行相應(yīng)處理并返回響應(yīng)。
(4)Producer
主要工作是生產(chǎn)消息,將消息按照一定的規(guī)則推送到Topic的分區(qū)中。如:根據(jù)消息key的Hash值選擇分區(qū)、或者按序輪詢?nèi)糠謪^(qū)。
(5)Consumer
主要工作是從Topic拉取消息進(jìn)行消費(fèi)。某個(gè)消費(fèi)者消費(fèi)Partition的哪個(gè)位置(offset)是由Consumer自己維護(hù)的。
這么設(shè)計(jì)的目的:
- 避免KafkaServer端維護(hù)消費(fèi)者消費(fèi)位置的開銷;
- 防止KafkaSever端出現(xiàn)延遲或者消費(fèi)狀態(tài)丟失時(shí),影響大量的Consumer;
- 提高了Consumer的靈活性,Consumer可以修改消費(fèi)位置對(duì)某些消息反復(fù)消費(fèi)。
(6)Consumer Group
Kafka中可以讓多個(gè)Consumer組成一個(gè) Consumer Group(下面簡(jiǎn)稱CG),一個(gè)Consumer只能屬于一個(gè)CG。Kafka保證其訂閱的Topic的每個(gè)Partition只會(huì)分配給此CG的一個(gè)消費(fèi)者進(jìn)行處理。如果不同CG訂閱了同一個(gè)Topic,CG之間是不會(huì)互相影響的。
CG可以實(shí)現(xiàn)**“獨(dú)占”和“廣播”**模式的消息處理。
“獨(dú)占”:即實(shí)現(xiàn)一個(gè)消息只被一個(gè)消費(fèi)者消費(fèi)的效果,則將每個(gè)Consumer單獨(dú)放入一個(gè)CG中。
“廣播”:即實(shí)現(xiàn)一個(gè)消息被多個(gè)消費(fèi)者消費(fèi)的效果,則將所有消費(fèi)者放在一個(gè)CG中。
Kafka還通過CG實(shí)現(xiàn)了Consumer的水平擴(kuò)展和故障轉(zhuǎn)移。
“水平擴(kuò)展”:如上圖,當(dāng)Consumer3能力不足以處理兩個(gè)分區(qū)時(shí),可以向CG添加一個(gè)Consumer4,并觸發(fā)Rebalance重新分配分區(qū)與消費(fèi)者的對(duì)應(yīng)關(guān)系,實(shí)現(xiàn)水平擴(kuò)展,是Consumer4對(duì)Partition3進(jìn)行消費(fèi)。
“故障轉(zhuǎn)移”:若此時(shí),Consumer4宕機(jī)了,CG又會(huì)重新分配分區(qū),Consumer3將會(huì)接管Consumer4的分區(qū)。
注意:CG中的消費(fèi)者數(shù)量不是越多越好,當(dāng) CG消費(fèi)者數(shù)量 > 分區(qū)數(shù)量 時(shí),將會(huì)造成消費(fèi)者的浪費(fèi)。
(7)副本
Kafka對(duì)消息進(jìn)行了冗余備份,每個(gè)分區(qū)可以有多個(gè)副本,每個(gè)副本包含的消息是一樣的。(同一時(shí)刻,副本之間其實(shí)并不完全一樣)
每個(gè)分區(qū)的副本集合有兩種角色:一個(gè)leader副本、多個(gè)follower副本。kafka在不同的場(chǎng)景下會(huì)采用不同的選舉策略。所有的讀寫請(qǐng)求都由選舉出的leader提供服務(wù),其他都作為follower副本,如下圖所示:
follower副本僅僅只是把leader副本數(shù)據(jù)拉取到本地后,同步更新到自己的Log中。
一般情況下,同一分區(qū)的多個(gè)副本是被分到不同Broker上的,這樣當(dāng)leader所在的Broker宕機(jī)后,可以重新選舉新的leader繼續(xù)對(duì)外提供服務(wù)。
(8)保留策略 & 日志壓縮
無論消費(fèi)者是否消費(fèi)過消息,Kafka為了保證磁盤不被占滿,會(huì)配置相應(yīng)的“保留策略”,以實(shí)現(xiàn)周期性地刪除陳舊的消息。
kafka有2種保留策略:
kafka會(huì)啟動(dòng)一個(gè)后臺(tái)線程,定期檢查是否有可以刪除的消息。“保留策略”可以有全局配置,也可以針對(duì)某個(gè)Topic覆蓋全局配置。
“日志壓縮”:有些場(chǎng)景下,用戶只關(guān)心key對(duì)應(yīng)的最新value值,這是就可以開啟其日志壓縮功能,會(huì)啟動(dòng)一個(gè)線程,定期將相同key的消息合并,只保留最新的value。如下所示:
(9)Cluster & Controller
多個(gè)Broker構(gòu)成一個(gè)Cluster(集群)對(duì)外提供服務(wù),每個(gè)集群會(huì)選取一個(gè)Broker來擔(dān)任Controller。
Controller職責(zé):管理分區(qū)的狀態(tài)、管理每個(gè)分區(qū)的副本狀態(tài)、監(jiān)聽Zookeeper中數(shù)據(jù)的變化等工作。
其他Broker:監(jiān)聽Controller Leader的狀態(tài)。
當(dāng)Controller出現(xiàn)故障時(shí)會(huì)重新選取Controller Leader。
(10)ISR集合
ISR是In-Sync Replica的縮寫,ISR集合表示的是 **目前“可用”(alive)**且 消息量與Leader相差不多的副本集合。ISR集合中的副本必須滿足下面兩個(gè)條件:
每個(gè)分區(qū)的leader副本會(huì)維護(hù)此分區(qū)的ISR集合,會(huì)將違反上面兩個(gè)條件的副本踢出ISR集合外。
(11)HW & LEO
HW(HightWatermark,水位線)標(biāo)記了一個(gè)特殊的offset,消費(fèi)者處理消息的時(shí)候,HW之后的消息對(duì)于消費(fèi)者是不可見的。HW也是由leader副本管理的。
Kafka官網(wǎng)將HW之前的消息狀態(tài)稱為“commit”,此時(shí)就算leader副本損壞了,也不會(huì)造成HW之前的數(shù)據(jù)丟失。當(dāng)ISR集合中全部的Follower副本都拉取HW指定消息進(jìn)行同步后,Leader副本會(huì)遞增HW。
LEO(Log End Offset)是所有副本都會(huì)有的一個(gè)offset標(biāo)記,它指向當(dāng)前副本的最后一個(gè)消息的offset。
現(xiàn)在考慮kafka為什么要這樣設(shè)計(jì)?
在分布式存儲(chǔ)中,冗余備份一般有兩種方案:同步復(fù)制 和 異步復(fù)制。
**同步復(fù)制:**要求所有Follower副本全部復(fù)制完,這條消息才會(huì)被認(rèn)為提交成功。此時(shí)若有一個(gè)副本出現(xiàn)故障,會(huì)導(dǎo)致HW無法完成遞增,消息無法提交,故障的Follower副本就會(huì)拖慢系統(tǒng)性能,甚至造成不可用。
**異步復(fù)制:**Leader副本收到生產(chǎn)者推送的消息,就會(huì)認(rèn)為消息提交成功。Follower副本異步地從Leader副本中同步消息,這可能會(huì)造成Follower副本的消息量總是遠(yuǎn)遠(yuǎn)落后于Leader副本。
**Kafka怎么解決的?**kafka權(quán)衡了上述兩種策略,引入了ISR集合的概念,當(dāng)Follower副本延遲過高時(shí),Follower副本被踢出ISR集合,使得消息依然能快速被提交。
- 可以通過從ISR集合中踢出高延遲的Follower副本,避免高延遲副本影響集群性能;
- 當(dāng)Leader副本宕機(jī)時(shí),kafka會(huì)優(yōu)先將ISR集合中的Follower副本選舉為L(zhǎng)eader副本,新副本包含了HW之前的全部消息,從而避免消息丟失。
注意:Follower副本更新消息時(shí)采用的是批量寫磁盤,加速了磁盤IO,極大減少了Follower與Leader的差距。
2.3 zookeeper在kafka的作用
其在Kafka的作用有:
-
Broker注冊(cè)
Zookeeper上會(huì)有一個(gè)專門用來進(jìn)行Broker服務(wù)器列表記錄的節(jié)點(diǎn):/brokers/ids。每個(gè)Broker在啟動(dòng)時(shí),都會(huì)到Zookeeper上進(jìn)行注冊(cè),即到/brokers/ids下創(chuàng)建屬于自己的節(jié)點(diǎn),如/brokers/ids/[0…N]。不同的Broker必須使用不同的Broker ID進(jìn)行注冊(cè),創(chuàng)建完節(jié)點(diǎn)后,每個(gè)Broker就會(huì)將自己的IP地址和端口信息記錄到該節(jié)點(diǎn)中去。其中,Broker創(chuàng)建的節(jié)點(diǎn)類型是臨時(shí)節(jié)點(diǎn),一旦Broker宕機(jī),則對(duì)應(yīng)的臨時(shí)節(jié)點(diǎn)也會(huì)被自動(dòng)刪除。
-
Topic注冊(cè)
在Kafka中,同一個(gè)Topic的消息會(huì)被分成多個(gè)分區(qū)并將其分布在多個(gè)Broker上,這些分區(qū)信息及與Broker的對(duì)應(yīng)關(guān)系也都是由Zookeeper在維護(hù),由專門的節(jié)點(diǎn)來記錄,如:/borkers/topics。Broker服務(wù)器啟動(dòng)后,會(huì)到對(duì)應(yīng)Topic節(jié)點(diǎn)(/brokers/topics)上注冊(cè)自己的Broker ID并寫入針對(duì)該Topic的分區(qū)總數(shù),如/brokers/topics/login/3->2,這個(gè)節(jié)點(diǎn)表示Broker ID為3的一個(gè)Broker服務(wù)器,對(duì)于"login"這個(gè)Topic的消息,提供了2個(gè)分區(qū)進(jìn)行消息存儲(chǔ),同樣,這個(gè)分區(qū)節(jié)點(diǎn)也是臨時(shí)節(jié)點(diǎn)。
-
生產(chǎn)者負(fù)載均衡
由于同一個(gè)Topic消息會(huì)被分區(qū)并將其分布在多個(gè)Broker上,因此,生產(chǎn)者需要將消息合理地發(fā)送到這些分布式的Broker上,那么如何實(shí)現(xiàn)生產(chǎn)者的負(fù)載均衡,Kafka支持傳統(tǒng)的四層負(fù)載均衡,也支持Zookeeper方式實(shí)現(xiàn)負(fù)載均衡。
(1) 四層負(fù)載均衡,根據(jù)生產(chǎn)者的IP地址和端口來為其確定一個(gè)相關(guān)聯(lián)的Broker。通常,一個(gè)生產(chǎn)者只會(huì)對(duì)應(yīng)單個(gè)Broker,然后該生產(chǎn)者產(chǎn)生的消息都發(fā)往該Broker。這種方式邏輯簡(jiǎn)單,每個(gè)生產(chǎn)者不需要同其他系統(tǒng)建立額外的TCP連接,只需要和Broker維護(hù)單個(gè)TCP連接即可。但是,其無法做到真正的負(fù)載均衡,因?yàn)閷?shí)際系統(tǒng)中的每個(gè)生產(chǎn)者產(chǎn)生的消息量及每個(gè)Broker的消息存儲(chǔ)量都是不一樣的,如果有些生產(chǎn)者產(chǎn)生的消息遠(yuǎn)多于其他生產(chǎn)者的話,那么會(huì)導(dǎo)致不同的Broker接收到的消息總數(shù)差異巨大,同時(shí),生產(chǎn)者也無法實(shí)時(shí)感知到Broker的新增和刪除。
(2) 使用Zookeeper進(jìn)行負(fù)載均衡,由于每個(gè)Broker啟動(dòng)時(shí),都會(huì)完成Broker注冊(cè)過程,生產(chǎn)者會(huì)通過該節(jié)點(diǎn)的變化來動(dòng)態(tài)地感知到Broker服務(wù)器列表的變更,這樣就可以實(shí)現(xiàn)動(dòng)態(tài)的負(fù)載均衡機(jī)制。
-
消費(fèi)者負(fù)載均衡
與生產(chǎn)者類似,Kafka中的消費(fèi)者同樣需要進(jìn)行負(fù)載均衡來實(shí)現(xiàn)多個(gè)消費(fèi)者合理地從對(duì)應(yīng)的Broker服務(wù)器上接收消息,每個(gè)消費(fèi)者分組包含若干消費(fèi)者,每條消息都只會(huì)發(fā)送給分組中的一個(gè)消費(fèi)者,不同的消費(fèi)者分組消費(fèi)自己特定的Topic下面的消息,互不干擾。
-
記錄 分區(qū) 與 消費(fèi)者組 的關(guān)系
在Kafka中,規(guī)定了每個(gè)消息分區(qū) 只能被同組的一個(gè)消費(fèi)者進(jìn)行消費(fèi),因此,需要在 Zookeeper 上記錄 消息分區(qū) 與 Consumer 之間的關(guān)系,每個(gè)消費(fèi)者一旦確定了對(duì)一個(gè)消息分區(qū)的消費(fèi)權(quán)力,需要將其Consumer ID 寫入到 Zookeeper 對(duì)應(yīng)消息分區(qū)的臨時(shí)節(jié)點(diǎn)上,例如:
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
其中,[broker_id-partition_id]就是一個(gè) 消息分區(qū) 的標(biāo)識(shí),節(jié)點(diǎn)內(nèi)容就是該 消息分區(qū) 上 消費(fèi)者的Consumer ID。
-
offset的記錄
在消費(fèi)者對(duì)指定消息分區(qū)進(jìn)行消息消費(fèi)的過程中,需要定時(shí)地將分區(qū)消息的消費(fèi)進(jìn)度Offset記錄到Zookeeper上,以便在該消費(fèi)者進(jìn)行重啟或者其他消費(fèi)者重新接管該消息分區(qū)的消息消費(fèi)后,能夠從之前的進(jìn)度開始繼續(xù)進(jìn)行消息消費(fèi)。Offset在Zookeeper中由一個(gè)專門節(jié)點(diǎn)進(jìn)行記錄,其節(jié)點(diǎn)路徑為:
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
節(jié)點(diǎn)內(nèi)容就是Offset的值。
-
消費(fèi)者注冊(cè)
消費(fèi)者服務(wù)器在初始化啟動(dòng)時(shí)加入消費(fèi)者分組的步驟如下
注冊(cè)到消費(fèi)者分組。每個(gè)消費(fèi)者服務(wù)器啟動(dòng)時(shí),都會(huì)到Zookeeper的指定節(jié)點(diǎn)下創(chuàng)建一個(gè)屬于自己的消費(fèi)者節(jié)點(diǎn),例如/consumers/[group_id]/ids/[consumer_id],完成節(jié)點(diǎn)創(chuàng)建后,消費(fèi)者就會(huì)將自己訂閱的Topic信息寫入該臨時(shí)節(jié)點(diǎn)。
對(duì) 消費(fèi)者分組 中的 消費(fèi)者 的變化注冊(cè)監(jiān)聽。每個(gè) 消費(fèi)者 都需要關(guān)注所屬 消費(fèi)者分組 中其他消費(fèi)者服務(wù)器的變化情況,即對(duì)/consumers/[group_id]/ids節(jié)點(diǎn)注冊(cè)子節(jié)點(diǎn)變化的Watcher監(jiān)聽,一旦發(fā)現(xiàn)消費(fèi)者新增或減少,就觸發(fā)消費(fèi)者的負(fù)載均衡。
對(duì)Broker服務(wù)器變化注冊(cè)監(jiān)聽。消費(fèi)者需要對(duì)/broker/ids/[0-N]中的節(jié)點(diǎn)進(jìn)行監(jiān)聽,如果發(fā)現(xiàn)Broker服務(wù)器列表發(fā)生變化,那么就根據(jù)具體情況來決定是否需要進(jìn)行消費(fèi)者負(fù)載均衡。
進(jìn)行消費(fèi)者負(fù)載均衡。為了讓同一個(gè)Topic下不同分區(qū)的消息盡量均衡地被多個(gè) 消費(fèi)者 消費(fèi)而進(jìn)行 消費(fèi)者 與 消息 分區(qū)分配的過程,通常,對(duì)于一個(gè)消費(fèi)者分組,如果組內(nèi)的消費(fèi)者服務(wù)器發(fā)生變更或Broker服務(wù)器發(fā)生變更,會(huì)發(fā)出消費(fèi)者負(fù)載均衡。
Kafka的zookeeper存儲(chǔ)結(jié)構(gòu)如下:
[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機(jī)制,建議將圖片保存下來直接上傳(img-tYv7k0ac-1578199803523)(assets/zookeeper在kafka的作用-1534754742260.png)]
2.4 kafka高性能的原因
(1)高效使用磁盤
-
Kafka的整個(gè)設(shè)計(jì)中,Partition相當(dāng)于一個(gè)非常長(zhǎng)的數(shù)組,而Broker接收到的所有消息順序?qū)懭脒@個(gè)大數(shù)組中。同時(shí)Consumer通過Offset順序消費(fèi)這些數(shù)據(jù),并且不刪除已經(jīng)消費(fèi)的數(shù)據(jù),從而避免了隨機(jī)寫磁盤的過程。
-
Kafka順序存寫數(shù)據(jù),故刪除時(shí)刪除對(duì)應(yīng)的Segment(物理文件,disk),避免對(duì)文件的隨機(jī)寫操作。
-
充分利用了頁緩存PageCache。
-
支持多DIsk Drive。Broker的log.dirs配置項(xiàng),允許配置多個(gè)文件夾。如果機(jī)器上有多個(gè)Disk Drive,可將不同的Disk掛載到不同的目錄,然后將這些目錄都配置到log.dirs里。Kafka會(huì)盡可能將不同的Partition分配到不同的目錄,也即不同的Disk上,從而充分利用了多Disk的優(yōu)勢(shì)。
(2)零拷貝技術(shù)
Kafka中存在大量的網(wǎng)絡(luò)數(shù)據(jù)持久化到磁盤(Producer到Broker)和磁盤文件通過網(wǎng)絡(luò)發(fā)送(Broker到Consumer)的過程。這一過程的性能直接影響Kafka的整體吞吐量。對(duì)比傳統(tǒng)模式的拷貝來看看kafka如何實(shí)現(xiàn)零拷貝
傳統(tǒng)模式下的四次拷貝與四次上下文切換
以將磁盤文件通過網(wǎng)絡(luò)發(fā)送為例。傳統(tǒng)模式下,一般使用如下偽代碼所示的方法先將文件數(shù)據(jù)讀入內(nèi)存,然后通過Socket將內(nèi)存中的數(shù)據(jù)發(fā)送出去。
buffer = File.read Socket.send(buffer)這一過程實(shí)際上發(fā)生了四次數(shù)據(jù)拷貝。首先通過系統(tǒng)調(diào)用將文件數(shù)據(jù)讀入到內(nèi)核態(tài)Buffer(DMA拷貝),然后應(yīng)用程序將內(nèi)存態(tài)Buffer數(shù)據(jù)讀入到用戶態(tài)Buffer(CPU拷貝),接著用戶程序通過Socket發(fā)送數(shù)據(jù)時(shí)將用戶態(tài)Buffer數(shù)據(jù)拷貝到內(nèi)核態(tài)Buffer(CPU拷貝),最后通過DMA拷貝將數(shù)據(jù)拷貝到NIC Buffer(網(wǎng)卡緩沖)。同時(shí),還伴隨著四次上下文切換,如下圖所示。
[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機(jī)制,建議將圖片保存下來直接上傳(img-TDx7TZNY-1578199803524)(assets/BIO 四次拷貝 四次上下文切換.png)]
sendfile和transferTo實(shí)現(xiàn)零拷貝
Linux 2.4+內(nèi)核通過sendfile系統(tǒng)調(diào)用,提供了零拷貝。數(shù)據(jù)通過DMA拷貝到內(nèi)核態(tài)Buffer后,直接通過DMA(Direct Memory Access,直接內(nèi)存存取)拷貝到NIC Buffer,無需CPU拷貝。這也是零拷貝這一說法的來源。除了減少數(shù)據(jù)拷貝外,因?yàn)檎麄€(gè)讀文件-網(wǎng)絡(luò)發(fā)送由一個(gè)sendfile調(diào)用完成,整個(gè)過程只有兩次上下文切換,因此大大提高了性能。零拷貝過程如下圖所示。
[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機(jī)制,建議將圖片保存下來直接上傳(img-iXtFuD1p-1578199803524)(assets/BIO 零拷貝 兩次上下文切換.png)]
從具體實(shí)現(xiàn)來看,Kafka的數(shù)據(jù)傳輸通過TransportLayer來完成,其子類PlaintextTransportLayer通過Java NIO的FileChannel的transferTo和transferFrom方法實(shí)現(xiàn)零拷貝,如下所示。
@Override public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {return fileChannel.transferTo(position, count, socketChannel); }注: transferTo和transferFrom并不保證一定能使用零拷貝。實(shí)際上是否能使用零拷貝與操作系統(tǒng)相關(guān),如果操作系統(tǒng)提供sendfile這樣的零拷貝系統(tǒng)調(diào)用,則這兩個(gè)方法會(huì)通過這樣的系統(tǒng)調(diào)用充分利用零拷貝的優(yōu)勢(shì),否則并不能通過這兩個(gè)方法本身實(shí)現(xiàn)零拷貝。
(3)減少網(wǎng)絡(luò)開銷
批處理
批處理是一種常用的用于提高I/O性能的方式。對(duì)Kafka而言,批處理既減少了網(wǎng)絡(luò)傳輸?shù)腛verhead,又提高了寫磁盤的效率。
Kafka 0.8.1及以前的Producer區(qū)分同步Producer和異步Producer。同步Producer的send方法主要分兩種形式。一種是接受一個(gè)KeyedMessage作為參數(shù),一次發(fā)送一條消息。另一種是接受一批KeyedMessage作為參數(shù),一次性發(fā)送多條消息。而對(duì)于異步發(fā)送而言,無論是使用哪個(gè)send方法,實(shí)現(xiàn)上都不會(huì)立即將消息發(fā)送給Broker,而是先存到內(nèi)部的隊(duì)列中,直到消息條數(shù)達(dá)到閾值或者達(dá)到指定的Timeout才真正的將消息發(fā)送出去,從而實(shí)現(xiàn)了消息的批量發(fā)送。
Kafka 0.8.2開始支持新的Producer API,將同步Producer和異步Producer結(jié)合。雖然從send接口來看,一次只能發(fā)送一個(gè)ProducerRecord,而不能像之前版本的send方法一樣接受消息列表,但是send方法并非立即將消息發(fā)送出去,而是通過batch.size和linger.ms控制實(shí)際發(fā)送頻率,從而實(shí)現(xiàn)批量發(fā)送。
由于每次網(wǎng)絡(luò)傳輸,除了傳輸消息本身以外,還要傳輸非常多的網(wǎng)絡(luò)協(xié)議本身的一些內(nèi)容(稱為Overhead),所以將多條消息合并到一起傳輸,可有效減少網(wǎng)絡(luò)傳輸?shù)腛verhead,進(jìn)而提高了傳輸效率。
數(shù)據(jù)壓縮降低網(wǎng)絡(luò)負(fù)載
Kafka從0.7開始,即支持將數(shù)據(jù)壓縮后再傳輸給Broker。除了可以將每條消息單獨(dú)壓縮然后傳輸外,Kafka還支持在批量發(fā)送時(shí),將整個(gè)Batch的消息一起壓縮后傳輸。數(shù)據(jù)壓縮的一個(gè)基本原理是,重復(fù)數(shù)據(jù)越多壓縮效果越好。因此將整個(gè)Batch的數(shù)據(jù)一起壓縮能更大幅度減小數(shù)據(jù)量,從而更大程度提高網(wǎng)絡(luò)傳輸效率。
Broker接收消息后,并不直接解壓縮,而是直接將消息以壓縮后的形式持久化到磁盤。Consumer Fetch到數(shù)據(jù)后再解壓縮。因此Kafka的壓縮不僅減少了Producer到Broker的網(wǎng)絡(luò)傳輸負(fù)載,同時(shí)也降低了Broker磁盤操作的負(fù)載,也降低了Consumer與Broker間的網(wǎng)絡(luò)傳輸量,從而極大得提高了傳輸效率,提高了吞吐量。
(4)高效的序列化方式
Kafka消息的Key和Payload(或者說Value)的類型可自定義,只需同時(shí)提供相應(yīng)的序列化器和反序列化器即可。因此用戶可以通過使用快速且緊湊的序列化-反序列化方式(如Avro,Protocal Buffer)來減少實(shí)際網(wǎng)絡(luò)傳輸和磁盤存儲(chǔ)的數(shù)據(jù)規(guī)模,從而提高吞吐率。這里要注意,如果使用的序列化方法太慢,即使壓縮比非常高,最終的效率也不一定高。
參考:
https://www.jianshu.com/p/a036405f989c
https://www.jianshu.com/p/eb75372df00a
總結(jié)
以上是生活随笔為你收集整理的Kafka:Kafka核心概念的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Kafka:分布式消息队列的抽象模型
- 下一篇: Kafka:Zero-Copy零拷贝