kafka详解 转载
轉自https://blog.csdn.net/lingbo229/article/details/80761778
?
Kafka Kafka是最初由Linkedin公司開發,是一個分布式、支持分區的(partition)、多副本的(replica),基于zookeeper協調的分布式消息系統,它的最大的特性就是可以實時的處理大量數據以滿足各種需求場景:比如基于hadoop的批處理系統、低延遲的實時系統、storm/Spark流式處理引擎,web/nginx日志、訪問日志,消息服務等等,用scala語言編寫,Linkedin于2010年貢獻給了Apache基金會并成為頂級開源 項目。 1.前言 消息隊列的性能好壞,其文件存儲機制設計是衡量一個消息隊列服務技術水平和最關鍵指標之一。下面將從Kafka文件存儲機制和物理結構角度,分析Kafka是如何實現高效文件存儲,及實際應用效果。 1.1 ?Kafka的特性: - 高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒,每個topic可以分多個partition, consumer group 對partition進行consume操作。 - 可擴展性:kafka集群支持熱擴展 - 持久性、可靠性:消息被持久化到本地磁盤,并且支持數據備份防止數據丟失 - 容錯性:允許集群中節點失敗(若副本數量為n,則允許n-1個節點失敗) - 高并發:支持數千個客戶端同時讀寫 1.2 ? Kafka的使用場景: - 日志收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統一接口服務的方式開放給各種consumer,例如hadoop、Hbase、Solr等。 - 消息系統:解耦和生產者和消費者、緩存消息等。 - 用戶活動跟蹤:Kafka經常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發布到kafka的topic中,然后訂閱者通過訂閱這些topic來做實時的監控分析,或者裝載到hadoop、數據倉庫中做離線分析和挖掘。 - 運營指標:Kafka也經常用來記錄運營監控數據。包括收集各種分布式應用的數據,生產各種操作的集中反饋,比如報警和報告。 - 流式處理:比如spark streaming和storm - 事件源 1.3 ?Kakfa的設計思想 -?Kakfa Broker Leader的選舉:Kakfa Broker集群受Zookeeper管理。所有的Kafka Broker節點一起去Zookeeper上注冊一個臨時節點,因為只有一個Kafka Broker會注冊成功,其他的都會失敗,所以這個成功在Zookeeper上注冊臨時節點的這個Kafka Broker會成為Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。(這個過程叫Controller在ZooKeeper注冊Watch)。這個Controller會監聽其他的Kafka Broker的所有信息,如果這個kafka broker controller宕機了,在zookeeper上面的那個臨時節點就會消失,此時所有的kafka broker又會一起去Zookeeper上注冊一個臨時節點,因為只有一個Kafka Broker會注冊成功,其他的都會失敗,所以這個成功在Zookeeper上注冊臨時節點的這個Kafka Broker會成為Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。例如:一旦有一個broker宕機了,這個kafka broker controller會讀取該宕機broker上所有的partition在zookeeper上的狀態,并選取ISR列表中的一個replica作為partition leader(如果ISR列表中的replica全掛,選一個幸存的replica作為leader; 如果該partition的所有的replica都宕機了,則將新的leader設置為-1,等待恢復,等待ISR中的任一個Replica“活”過來,并且選它作為Leader;或選擇第一個“活”過來的Replica(不一定是ISR中的)作為Leader),這個broker宕機的事情,kafka controller也會通知zookeeper,zookeeper就會通知其他的kafka broker。 這里曾經發生過一個bug,TalkingData使用Kafka0.8.1的時候,kafka controller在Zookeeper上注冊成功后,它和Zookeeper通信的timeout時間是6s,也就是如果kafka controller如果有6s中沒有和Zookeeper做心跳,那么Zookeeper就認為這個kafka controller已經死了,就會在Zookeeper上把這個臨時節點刪掉,那么其他Kafka就會認為controller已經沒了,就會再次搶著注冊臨時節點,注冊成功的那個kafka broker成為controller,然后,之前的那個kafka controller就需要各種shut down去關閉各種節點和事件的監聽。但是當kafka的讀寫流量都非常巨大的時候,TalkingData的一個bug是,由于網絡等原因,kafka controller和Zookeeper有6s中沒有通信,于是重新選舉出了一個新的kafka controller,但是原來的controller在shut down的時候總是不成功,這個時候producer進來的message由于Kafka集群中存在兩個kafka controller而無法落地。導致數據淤積。 這里曾經還有一個bug,TalkingData使用Kafka0.8.1的時候,當ack=0的時候,表示producer發送出去message,只要對應的kafka broker topic partition leader接收到的這條message,producer就返回成功,不管partition leader 是否真的成功把message真正存到kafka。當ack=1的時候,表示producer發送出去message,同步的把message存到對應topic的partition的leader上,然后producer就返回成功,partition leader異步的把message同步到其他partition replica上。當ack=all或-1,表示producer發送出去message,同步的把message存到對應topic的partition的leader和對應的replica上之后,才返回成功。但是如果某個kafka controller?切換的時候,會導致partition leader的切換(老的?kafka controller上面的partition leader會選舉到其他的kafka broker上),但是這樣就會導致丟數據。 -??Consumergroup:各個consumer(consumer 線程)可以組成一個組(Consumer group?),partition中的每個message只能被組(Consumer group?)中的一個consumer(consumer 線程)消費,如果一個message可以被多個consumer(consumer 線程)消費的話,那么這些consumer必須在不同的組。Kafka不支持一個partition中的message由兩個或兩個以上的同一個consumer group下的consumer thread來處理,除非再啟動一個新的consumer group。所以如果想同時對一個topic做消費的話,啟動多個consumer group就可以了,但是要注意的是,這里的多個consumer的消費都必須是順序讀取partition里面的message,新啟動的consumer默認從partition隊列最頭端最新的地方開始阻塞的讀message。它不能像AMQ那樣可以多個BET作為consumer去互斥的(for update悲觀鎖)并發處理message,這是因為多個BET去消費一個Queue中的數據的時候,由于要保證不能多個線程拿同一條message,所以就需要行級別悲觀所(for update),這就導致了consume的性能下降,吞吐量不夠。而kafka為了保證吞吐量,只允許同一個consumer group下的一個consumer線程去訪問一個partition。如果覺得效率不高的時候,可以加partition的數量來橫向擴展,那么再加新的consumer thread去消費。如果想多個不同的業務都需要這個topic的數據,起多個consumer group就好了,大家都是順序的讀取message,offsite的值互不影響。這樣沒有鎖競爭,充分發揮了橫向的擴展性,吞吐量極高。這也就形成了分布式消費的概念。 當啟動一個consumer group去消費一個topic的時候,無論topic里面有多個少個partition,無論我們consumer group里面配置了多少個consumer thread,這個consumer group下面的所有consumer thread一定會消費全部的partition;即便這個consumer group下只有一個consumer thread,那么這個consumer thread也會去消費所有的partition。因此,最優的設計就是,consumer group下的consumer thread的數量等于partition數量,這樣效率是最高的。 同一partition的一條message只能被同一個Consumer Group內的一個Consumer消費。不能夠一個consumer group的多個consumer同時消費一個partition。 一個consumer group下,無論有多少個consumer,這個consumer group一定回去把這個topic下所有的partition都消費了。當consumer group里面的consumer數量小于這個topic下的partition數量的時候,如下圖groupA,groupB,就會出現一個conusmer thread消費多個partition的情況,總之是這個topic下的partition都會被消費。如果consumer group里面的consumer數量等于這個topic下的partition數量的時候,如下圖groupC,此時效率是最高的,每個partition都有一個consumer thread去消費。當consumer group里面的consumer數量大于這個topic下的partition數量的時候,如下圖GroupD,就會有一個consumer thread空閑。因此,我們在設定consumer group的時候,只需要指明里面有幾個consumer數量即可,無需指定對應的消費partition序號,consumer會自動進行rebalance。 多個Consumer Group下的consumer可以消費同一條message,但是這種消費也是以o(1)的方式順序的讀取message去消費,,所以一定會重復消費這批message的,不能向AMQ那樣多個BET作為consumer消費(對message加鎖,消費的時候不能重復消費message) -?Consumer Rebalance的觸發條件:(1)Consumer增加或刪除會觸發 Consumer Group的Rebalance(2)Broker的增加或者減少都會觸發 Consumer Rebalance -?Consumer:?Consumer處理partition里面的message的時候是o(1)順序讀取的。所以必須維護著上一次讀到哪里的offsite信息。high level API,offset存于Zookeeper中,low level API的offset由自己維護。一般來說都是使用high level api的。Consumer的delivery gurarantee,默認是讀完message先commmit再處理message,autocommit默認是true,這時候先commit就會更新offsite+1,一旦處理失敗,offsite已經+1,這個時候就會丟message;也可以配置成讀完消息處理再commit,這種情況下consumer端的響應就會比較慢的,需要等處理完才行。 一般情況下,一定是一個consumer group處理一個topic的message。Best Practice是這個consumer group里面consumer的數量等于topic里面partition的數量,這樣效率是最高的,一個consumer thread處理一個partition。如果這個consumer group里面consumer的數量小于topic里面partition的數量,就會有consumer thread同時處理多個partition(這個是kafka自動的機制,我們不用指定),但是總之這個topic里面的所有partition都會被處理到的。。如果這個consumer group里面consumer的數量大于topic里面partition的數量,多出的consumer thread就會閑著啥也不干,剩下的是一個consumer thread處理一個partition,這就造成了資源的浪費,因為一個partition不可能被兩個consumer thread去處理。所以我們線上的分布式多個service服務,每個service里面的kafka consumer數量都小于對應的topic的partition數量,但是所有服務的consumer數量只和等于partition的數量,這是因為分布式service服務的所有consumer都來自一個consumer group,如果來自不同的consumer group就會處理重復的message了(同一個consumer group下的consumer不能處理同一個partition,不同的consumer group可以處理同一個topic,那么都是順序處理message,一定會處理重復的。一般這種情況都是兩個不同的業務邏輯,才會啟動兩個consumer group來處理一個topic)。 如果producer的流量增大,當前的topic的parition數量=consumer數量,這時候的應對方式就是很想擴展:增加topic下的partition,同時增加這個consumer group下的consumer。 ? ? ? ? ? ? ?? -?Delivery Mode :?Kafka producer 發送message不用維護message的offsite信息,因為這個時候,offsite就相當于一個自增id,producer就盡管發送message就好了。而且Kafka與AMQ不同,AMQ大都用在處理業務邏輯上,而Kafka大都是日志,所以Kafka的producer一般都是大批量的batch發送message,向這個topic一次性發送一大批message,load balance到一個partition上,一起插進去,offsite作為自增id自己增加就好。但是Consumer端是需要維護這個partition當前消費到哪個message的offsite信息的,這個offsite信息,high level api是維護在Zookeeper上,low level api是自己的程序維護。(Kafka管理界面上只能顯示high level api的consumer部分,因為low level api的partition offsite信息是程序自己維護,kafka是不知道的,無法在管理界面上展示 )當使用high level api的時候,先拿message處理,再定時自動commit offsite+1(也可以改成手動), 并且kakfa處理message是沒有鎖操作的。因此如果處理message失敗,此時還沒有commit offsite+1,當consumer thread重啟后會重復消費這個message。但是作為高吞吐量高并發的實時處理系統,at least once的情況下,至少一次會被處理到,是可以容忍的。如果無法容忍,就得使用low level api來自己程序維護這個offsite信息,那么想什么時候commit offsite+1就自己搞定了。 -?Topic & Partition:Topic相當于傳統消息系統MQ中的一個隊列queue,producer端發送的message必須指定是發送到哪個topic,但是不需要指定topic下的哪個partition,因為kafka會把收到的message進行load balance,均勻的分布在這個topic下的不同的partition上( hash(message) % [broker數量] ?)。物理上存儲上,這個topic會分成一個或多個partition,每個partiton相當于是一個子queue。在物理結構上,每個partition對應一個物理的目錄(文件夾),文件夾命名是[topicname]_[partition]_[序號],一個topic可以有無數多的partition,根據業務需求和數據量來設置。在kafka配置文件中可隨時更高num.partitions參數來配置更改topic的partition數量,在創建Topic時通過參數指定parittion數量。Topic創建之后通過Kafka提供的工具也可以修改partiton數量。 一般來說,(1)一個Topic的Partition數量大于等于Broker的數量,可以提高吞吐率。(2)同一個Partition的Replica盡量分散到不同的機器,高可用。 當add a new partition的時候,partition里面的message不會重新進行分配,原來的partition里面的message數據不會變,新加的這個partition剛開始是空的,隨后進入這個topic的message就會重新參與所有partition的load balance -?Partition Replica:每個partition可以在其他的kafka broker節點上存副本,以便某個kafka broker節點宕機不會影響這個kafka集群。存replica副本的方式是按照kafka broker的順序存。例如有5個kafka broker節點,某個topic有3個partition,每個partition存2個副本,那么partition1存broker1,broker2,partition2存broker2,broker3。。。以此類推(replica副本數目不能大于kafka broker節點的數目,否則報錯。這里的replica數其實就是partition的副本總數,其中包括一個leader,其他的就是copy副本)。這樣如果某個broker宕機,其實整個kafka內數據依然是完整的。但是,replica副本數越高,系統雖然越穩定,但是回來帶資源和性能上的下降;replica副本少的話,也會造成系統丟數據的風險。 (1)怎樣傳送消息:producer先把message發送到partition leader,再由leader發送給其他partition follower。(如果讓producer發送給每個replica那就太慢了) (2)在向Producer發送ACK前需要保證有多少個Replica已經收到該消息:根據ack配的個數而定 (3)怎樣處理某個Replica不工作的情況:如果這個部工作的partition replica不在ack列表中,就是producer在發送消息到partition leader上,partition leader向partition follower發送message沒有響應而已,這個不會影響整個系統,也不會有什么問題。如果這個不工作的partition replica在ack列表中的話,producer發送的message的時候會等待這個不工作的partition replca寫message成功,但是會等到time out,然后返回失敗因為某個ack列表中的partition replica沒有響應,此時kafka會自動的把這個部工作的partition replica從ack列表中移除,以后的producer發送message的時候就不會有這個ack列表下的這個部工作的partition replica了。? (4)怎樣處理Failed Replica恢復回來的情況:如果這個partition replica之前不在ack列表中,那么啟動后重新受Zookeeper管理即可,之后producer發送message的時候,partition leader會繼續發送message到這個partition follower上。如果這個partition replica之前在ack列表中,此時重啟后,需要把這個partition replica再手動加到ack列表中。(ack列表是手動添加的,出現某個部工作的partition replica的時候自動從ack列表中移除的) -?Partition leader與follower:partition也有leader和follower之分。leader是主partition,producer寫kafka的時候先寫partition leader,再由partition leader push給其他的partition follower。partition leader與follower的信息受Zookeeper控制,一旦partition leader所在的broker節點宕機,zookeeper會沖其他的broker的partition follower上選擇follower變為parition leader。 -?Topic分配partition和partition replica的算法:(1)將Broker(size=n)和待分配的Partition排序。(2)將第i個Partition分配到第(i%n)個Broker上。(3)將第i個Partition的第j個Replica分配到第((i + j) % n)個Broker上 -?消息投遞可靠性 一個消息如何算投遞成功,Kafka提供了三種模式: - 第一種是啥都不管,發送出去就當作成功,這種情況當然不能保證消息成功投遞到broker; - 第二種是Master-Slave模型,只有當Master和所有Slave都接收到消息時,才算投遞成功,這種模型提供了最高的投遞可靠性,但是損傷了性能; - 第三種模型,即只要Master確認收到消息就算投遞成功;實際使用時,根據應用特性選擇,絕大多數情況下都會中和可靠性和性能選擇第三種模型 消息在broker上的可靠性,因為消息會持久化到磁盤上,所以如果正常stop一個broker,其上的數據不會丟失;但是如果不正常stop,可能會使存在頁面緩存來不及寫入磁盤的消息丟失,這可以通過配置flush頁面緩存的周期、閾值緩解,但是同樣會頻繁的寫磁盤會影響性能,又是一個選擇題,根據實際情況配置。 消息消費的可靠性,Kafka提供的是“At least once”模型,因為消息的讀取進度由offset提供,offset可以由消費者自己維護也可以維護在zookeeper里,但是當消息消費后consumer掛掉,offset沒有即時寫回,就有可能發生重復讀的情況,這種情況同樣可以通過調整commit offset周期、閾值緩解,甚至消費者自己把消費和commit offset做成一個事務解決,但是如果你的應用不在乎重復消費,那就干脆不要解決,以換取最大的性能。 -?Partition ack:當ack=1,表示producer寫partition leader成功后,broker就返回成功,無論其他的partition follower是否寫成功。當ack=2,表示producer寫partition leader和其他一個follower成功的時候,broker就返回成功,無論其他的partition follower是否寫成功。當ack=-1[parition的數量]的時候,表示只有producer全部寫成功的時候,才算成功,kafka broker才返回成功信息。這里需要注意的是,如果ack=1的時候,一旦有個broker宕機導致partition的follower和leader切換,會導致丟數據。 -?message狀態:在Kafka中,消息的狀態被保存在consumer中,broker不會關心哪個消息被消費了被誰消費了,只記錄一個offset值(指向partition中下一個要被消費的消息位置),這就意味著如果consumer處理不好的話,broker上的一個消息可能會被消費多次。 -?message持久化:Kafka中會把消息持久化到本地文件系統中,并且保持o(1)極高的效率。我們眾所周知IO讀取是非常耗資源的性能也是最慢的,這就是為了數據庫的瓶頸經常在IO上,需要換SSD硬盤的原因。但是Kafka作為吞吐量極高的MQ,卻可以非常高效的message持久化到文件。這是因為Kafka是順序寫入o(1)的時間復雜度,速度非常快。也是高吞吐量的原因。由于message的寫入持久化是順序寫入的,因此message在被消費的時候也是按順序被消費的,保證partition的message是順序消費的。一般的機器,單機每秒100k條數據。 -?message有效期:Kafka會長久保留其中的消息,以便consumer可以多次消費,當然其中很多細節是可配置的。 -?Produer :?Producer向Topic發送message,不需要指定partition,直接發送就好了。kafka通過partition ack來控制是否發送成功并把信息返回給producer,producer可以有任意多的thread,這些kafka服務器端是不care的。Producer端的delivery guarantee默認是At least once的。也可以設置Producer異步發送實現At most once。Producer可以用主鍵冪等性實現Exactly once -?Kafka高吞吐量: Kafka的高吞吐量體現在讀寫上,分布式并發的讀和寫都非常快,寫的性能體現在以o(1)的時間復雜度進行順序寫入。讀的性能體現在以o(1)的時間復雜度進行順序讀取,?對topic進行partition分區,consume group中的consume線程可以以很高能性能進行順序讀。 - Kafka delivery guarantee(message傳送保證):(1)At most once消息可能會丟,絕對不會重復傳輸;(2)At least once 消息絕對不會丟,但是可能會重復傳輸;(3)Exactly once每條信息肯定會被傳輸一次且僅傳輸一次,這是用戶想要的。 -?批量發送:Kafka支持以消息集合為單位進行批量發送,以提高push效率。 -?push-and-pull?: Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管從broker pull消息,兩者對消息的生產和消費是異步的。 -?Kafka集群中broker之間的關系:不是主從關系,各個broker在集群中地位一樣,我們可以隨意的增加或刪除任何一個broker節點。 -?負載均衡方面: Kafka提供了一個 metadata API來管理broker之間的負載(對Kafka0.8.x而言,對于0.7.x主要靠zookeeper來實現負載均衡)。 -?同步異步:Producer采用異步push方式,極大提高Kafka系統的吞吐率(可以通過參數控制是采用同步還是異步方式)。 -?分區機制partition:Kafka的broker端支持消息分區partition,Producer可以決定把消息發到哪個partition,在一個partition?中message的順序就是Producer發送消息的順序,一個topic中可以有多個partition,具體partition的數量是可配置的。partition的概念使得kafka作為MQ可以橫向擴展,吞吐量巨大。partition可以設置replica副本,replica副本存在不同的kafka broker節點上,第一個partition是leader,其他的是follower,message先寫到partition leader上,再由partition leader push到parition follower上。所以說kafka可以水平擴展,也就是擴展partition。 -?離線數據裝載:Kafka由于對可拓展的數據持久化的支持,它也非常適合向Hadoop或者數據倉庫中進行數據裝載。 -?實時數據與離線數據:kafka既支持離線數據也支持實時數據,因為kafka的message持久化到文件,并可以設置有效期,因此可以把kafka作為一個高效的存儲來使用,可以作為離線數據供后面的分析。當然作為分布式實時消息系統,大多數情況下還是用于實時的數據處理的,但是當cosumer消費能力下降的時候可以通過message的持久化在淤積數據在kafka。 -?插件支持:現在不少活躍的社區已經開發出不少插件來拓展Kafka的功能,如用來配合Storm、Hadoop、flume相關的插件。 -?解耦: ?相當于一個MQ,使得Producer和Consumer之間異步的操作,系統之間解耦 -?冗余: ?replica有多個副本,保證一個broker node宕機后不會影響整個服務 -?擴展性: ?broker節點可以水平擴展,partition也可以水平增加,partition replica也可以水平增加 -?峰值: ?在訪問量劇增的情況下,kafka水平擴展, 應用仍然需要繼續發揮作用 -?可恢復性: ?系統的一部分組件失效時,由于有partition的replica副本,不會影響到整個系統。 -?順序保證性:由于kafka的producer的寫message與consumer去讀message都是順序的讀寫,保證了高效的性能。 -?緩沖:由于producer那面可能業務很簡單,而后端consumer業務會很復雜并有數據庫的操作,因此肯定是producer會比consumer處理速度快,如果沒有kafka,producer直接調用consumer,那么就會造成整個系統的處理速度慢,加一層kafka作為MQ,可以起到緩沖的作用。 -?異步通信:作為MQ,Producer與Consumer異步通信2.Kafka文件存儲機制
2.1 Kafka部分名詞解釋如下: Kafka中發布訂閱的對象是topic。我們可以為每類數據創建一個topic,把向topic發布消息的客戶端稱作producer,從topic訂閱消息的客戶端稱作consumer。Producers和consumers可以同時從多個topic讀寫數據。一個kafka集群由一個或多個broker服務器組成,它負責持久化和備份具體的kafka消息。- Broker:Kafka節點,一個Kafka節點就是一個broker,多個broker可以組成一個Kafka集群。
- Topic:一類消息,消息存放的目錄即主題,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能夠同時負責多個topic的分發。
- Partition:topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的隊列
- Segment:partition物理上由多個segment組成,每個Segment存著message信息
- Producer?: 生產message發送到topic
- Consumer?: 訂閱topic消費message, consumer作為一個線程來消費
- Consumer Group:一個Consumer Group包含多個consumer, 這個是預先在配置文件中配置好的。各個consumer(consumer 線程)可以組成一個組(Consumer group ),partition中的每個message只能被組(Consumer group ) 中的一個consumer(consumer 線程 )消費,如果一個message可以被多個consumer(consumer 線程 ) 消費的話,那么這些consumer必須在不同的組。Kafka不支持一個partition中的message由兩個或兩個以上的consumer thread來處理,即便是來自不同的consumer group的也不行。它不能像AMQ那樣可以多個BET作為consumer去處理message,這是因為多個BET去消費一個Queue中的數據的時候,由于要保證不能多個線程拿同一條message,所以就需要行級別悲觀所(for update),這就導致了consume的性能下降,吞吐量不夠。而kafka為了保證吞吐量,只允許一個consumer線程去訪問一個partition。如果覺得效率不高的時候,可以加partition的數量來橫向擴展,那么再加新的consumer thread去消費。這樣沒有鎖競爭,充分發揮了橫向的擴展性,吞吐量極高。這也就形成了分布式消費的概念。
- 2.2 kafka一些原理概念
- 2.3 ?kafka拓撲結構
分析過程分為以下4個步驟:
- topic中partition存儲分布
- partiton中文件存儲方式 (partition在linux服務器上就是一個目錄(文件夾))
- partiton中segment文件存儲結構
- 在partition中如何通過offset查找message
通過上述4過程詳細分析,我們就可以清楚認識到kafka文件存儲機制的奧秘。
2.3 topic中partition存儲分布假設實驗環境中Kafka集群只有一個broker,xxx/message-folder為數據文件存儲根目錄,在Kafka broker中server.properties文件配置(參數log.dirs=xxx/message-folder),例如創建2個topic名 稱分別為report_push、launch_info, partitions數量都為partitions=4
存儲路徑和目錄規則為:
xxx/message-folder
|--report_push-0? |--report_push-1
? |--report_push-2
? |--report_push-3
? |--launch_info-0
? |--launch_info-1
? |--launch_info-2
? |--launch_info-3 在Kafka文件存儲中,同一個topic下有多個不同partition,每個partition為一個目錄,partiton命名規則為topic名稱+有序序號,第一個partiton序號從0開始,序號最大值為partitions數量減1。 消息發送時都被發送到一個topic,其本質就是一個目錄,而topic由是由一些Partition組成,其組織結構如下圖所示: 我們可以看到,Partition是一個Queue的結構,每個Partition中的消息都是有序的,生產的消息被不斷追加到Partition上,其中的每一個消息都被賦予了一個唯一的offset值。 Kafka集群會保存所有的消息,不管消息有沒有被消費;我們可以設定消息的過期時間,只有過期的數據才會被自動清除以釋放磁盤空間。比如我們設置消息過期時間為2天,那么這2天內的所有消息都會被保存到集群中,數據只有超過了兩天才會被清除。 Kafka只維護在Partition中的offset值,因為這個offsite標識著這個partition的message消費到哪條了。Consumer每消費一個消息,offset就會加1。其實消息的狀態完全是由Consumer控制的,Consumer可以跟蹤和重設這個offset值,這樣的話Consumer就可以讀取任意位置的消息。 把消息日志以Partition的形式存放有多重考慮,第一,方便在集群中擴展,每個Partition可以通過調整以適應它所在的機器,而一個topic又可以有多個Partition組成,因此整個集群就可以適應任意大小的數據了;第二就是可以提高并發,因為可以以Partition為單位讀寫了。 通過上面介紹的我們可以知道,kafka中的數據是持久化的并且能夠容錯的。Kafka允許用戶為每個topic設置副本數量,副本數量決定了有幾個broker來存放寫入的數據。如果你的副本數量設置為3,那么一份數據就會被存放在3臺不同的機器上,那么就允許有2個機器失敗。一般推薦副本數量至少為2,這樣就可以保證增減、重啟機器時不會影響到數據消費。如果對數據持久化有更高的要求,可以把副本數量設置為3或者更多。 Kafka中的topic是以partition的形式存放的,每一個topic都可以設置它的partition數量,Partition的數量決定了組成topic的message的數量。Producer在生產數據時,會按照一定規則(這個規則是可以自定義的)把消息發布到topic的各個partition中。上面將的副本都是以partition為單位的,不過只有一個partition的副本會被選舉成leader作為讀寫用。 關于如何設置partition值需要考慮的因素。一個partition只能被一個消費者消費(一個消費者可以同時消費多個partition),因此,如果設置的partition的數量小于consumer的數量,就會有消費者消費不到數據。所以,推薦partition的數量一定要大于同時運行的consumer的數量。另外一方面,建議partition的數量大于集群broker的數量,這樣leader partition就可以均勻的分布在各個broker中,最終使得集群負載均衡。在Cloudera,每個topic都有上百個partition。需要注意的是,kafka需要為每個partition分配一些內存來緩存消息數據,如果partition數量越大,就要為kafka分配更大的heap space。 2.4 partiton中文件存儲方式
- 每個partion(目錄)相當于一個巨型文件被平均分配到多個大小相等segment(段)數據文件中。但每個段segment file消息數量不一定相等,這種特性方便old segment file快速被刪除。
- 每個partiton只需要支持順序讀寫就行了,segment文件生命周期由服務端配置參數決定。
這樣做的好處就是能快速刪除無用文件,有效提高磁盤利用率。
2.5 partiton中segment文件存儲結構 producer發message到某個topic,message會被均勻的分布到多個partition上(隨機或根據用戶指定的回調函數進行分布),kafka broker收到message往對應partition的最后一個segment上添加該消息,當某個segment上的消息條數達到配置值或消息發布時間超過閾值時,segment上的消息會被flush到磁盤,只有flush到磁盤上的消息consumer才能消費,segment達到一定的大小后將不會再往該segment寫數據,broker會創建新的segment。 每個part在內存中對應一個index,記錄每個segment中的第一條消息偏移。- segment file組成:由2大部分組成,分別為index file和data file,此2個文件一一對應,成對出現,后綴".index"和“.log”分別表示為segment索引文件、數據文件.
- segment文件命名規則:partion全局的第一個segment從0開始,后續每個segment文件名為上一個全局partion的最大offset(偏移message數)。數值最大為64位long大小,19位數字字符長度,沒有數字用0填充。
下面文件列表是筆者在Kafka broker上做的一個實驗,創建一個topicXXX包含1 partition,設置每個segment大小為500MB,并啟動producer向Kafka broker寫入大量數據,如下圖2所示segment文件列表形象說明了上述2個規則:
以上述圖2中一對segment file文件為例,說明segment中index<—->data file對應關系物理結構如下:
上述圖3中索引文件存儲大量元數據,數據文件存儲大量消息,索引文件中元數據指向對應數據文件中message的物理偏移地址。其中以索引文件中 元數據3,497為例,依次在數據文件中表示第3個message(在全局partiton表示第368772個message)、以及該消息的物理偏移 地址為497。
從上述圖3了解到segment data file由許多message組成,下面詳細說明message物理結構如下:
參數說明:
| 8 byte offset | 在parition(分區)內的每條消息都有一個有序的id號,這個id號被稱為偏移(offset),它可以唯一確定每條消息在parition(分區)內的位置。即offset表示partiion的第多少message |
| 4 byte message size | message大小 |
| 4 byte CRC32 | 用crc32校驗message |
| 1 byte “magic" | 表示本次發布Kafka服務程序協議版本號 |
| 1 byte “attributes" | 表示為獨立版本、或標識壓縮類型、或編碼類型。 |
| 4 byte key length | 表示key的長度,當key為-1時,K byte key字段不填 |
| K byte key | 可選 |
| value bytes payload | 表示實際消息數據。 |
例如讀取offset=368776的message,需要通過下面2個步驟查找。
-
第一步查找segment file
上述圖2為例,其中00000000000000000000.index表示最開始的文件,起始偏移量(offset)為0.第二個文件 00000000000000368769.index的消息量起始偏移量為368770 = 368769 + 1.同樣,第三個文件00000000000000737337.index的起始偏移量為737338=737337 + 1,其他后續文件依次類推,以起始偏移量命名并排序這些文件,只要根據offset **二分查找**文件列表,就可以快速定位到具體文件。
當offset=368776時定位到00000000000000368769.index|log
-
第二步通過segment file查找message通過第一步定位到segment file,當offset=368776時,依次定位到00000000000000368769.index的元數據物理位置和 00000000000000368769.log的物理偏移地址,然后再通過00000000000000368769.log順序查找直到 offset=368776為止。
Kafka高效文件存儲設計特點
- Kafka把topic中一個parition大文件分成多個小文件段,通過多個小文件段,就容易定期清除或刪除已經消費完文件,減少磁盤占用。
- 通過索引信息可以快速定位message和確定response的最大大小。
- 通過index元數據全部映射到memory,可以避免segment file的IO磁盤操作。
- 通過索引文件稀疏存儲,可以大幅降低index文件元數據占用空間大小。
1. Kafka集群partition?replication默認自動分配分析
下面以一個Kafka集群中4個Broker舉例,創建1個topic包含4個Partition,2 Replication;數據Producer流動如圖所示:
(1)
?
?
(2)當集群中新增2節點,Partition增加到6個時分布情況如下:
?
副本分配邏輯規則如下:
- 在Kafka集群中,每個Broker都有均等分配Partition的Leader機會。
- 上述圖Broker Partition中,箭頭指向為副本,以Partition-0為例:broker1中parition-0為Leader,Broker2中Partition-0為副本。
- 上述圖種每個Broker(按照BrokerId有序)依次分配主Partition,下一個Broker為副本,如此循環迭代分配,多副本都遵循此規則。
- 將所有N Broker和待分配的i個Partition排序.
- 將第i個Partition分配到第(i mod n)個Broker上.
- 將第i個Partition的第j個副本分配到第((i + j) mod n)個Broker上.
生產者客戶端應用程序產生消息:
消費者客戶端應用程序消費消息:
2.??不創建單獨的cache,使用系統的page cache。發布者順序發布,訂閱者通常比發布者滯后一點點,直接使用Linux的page cache效果也比較后,同時減少了cache管理及垃圾收集的開銷。
3.??使用sendfile優化網絡傳輸,減少一次內存拷貝。 6.Kafka 與 Zookeeper 6.1 Zookeeper?協調控制 1.?管理broker與consumer的動態加入與離開。(Producer不需要管理,隨便一臺計算機都可以作為Producer向Kakfa Broker發消息) 2.?觸發負載均衡,當broker或consumer加入或離開時會觸發負載均衡算法,使得一 個consumer group內的多個consumer的消費負載平衡。(因為一個comsumer消費一個或多個partition,一個partition只能被一個consumer消費)3.??維護消費關系及每個partition的消費信息。
?
6.2 Zookeeper上的細節:
1.?每個broker啟動后會在zookeeper上注冊一個臨時的broker registry,包含broker的ip地址和端口號,所存儲的topics和partitions信息。
2.?每個consumer啟動后會在zookeeper上注冊一個臨時的consumer registry:包含consumer所屬的consumer group以及訂閱的topics。
3.?每個consumer group關聯一個臨時的owner registry和一個持久的offset registry。對于被訂閱的每個partition包含一個owner registry,內容為訂閱這個partition的consumer id;同時包含一個offset registry,內容為上一次訂閱的offset。
?
轉載于:https://www.cnblogs.com/Rubick7/p/10916791.html
總結
以上是生活随笔為你收集整理的kafka详解 转载的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Winserver-Exception
- 下一篇: cdn缓存的研究