kafka读写速度快的原因
KAFKA是分布式發(fā)布-訂閱消息系統(tǒng),是一個(gè)分布式的,可劃分的,冗余備份的持久性的日志服務(wù)。它主要用于處理活躍的流式數(shù)據(jù)。
?
現(xiàn)在被廣泛地應(yīng)用于構(gòu)建實(shí)時(shí)數(shù)據(jù)管道和流應(yīng)用的場景中,具有橫向擴(kuò)展,容錯(cuò),快等優(yōu)點(diǎn),并已經(jīng)運(yùn)行在眾多大中型公司的生產(chǎn)環(huán)境中,成功應(yīng)用于大數(shù)據(jù)領(lǐng)域,本文分享一下我所了解的KAFKA。
?
【KAFKA高吞吐率性能揭秘】
KAFKA的第一個(gè)突出特定就是“快”,而且是那種變態(tài)的“快”,在普通廉價(jià)的虛擬機(jī)器上,比如一般SAS盤做的虛擬機(jī)上,據(jù)LINDEDIN統(tǒng)計(jì),最新的數(shù)據(jù)是每天利用KAFKA處理的消息超過1萬億條,在峰值時(shí)每秒鐘會(huì)發(fā)布超過百萬條消息,就算是在內(nèi)存和CPU都不高的情況下,Kafka的速度最高可以達(dá)到每秒十萬條數(shù)據(jù),并且還能持久化存儲(chǔ)。
?
作為消息隊(duì)列,要承接讀跟寫兩塊的功能,首先是寫,就是消息日志寫入KAFKA,那么,KAFKA在“寫”上是怎么做到寫變態(tài)快呢?
?
?
<KAFKA讓代碼飛起來之寫得快>
首先,可以使用KAFKA提供的生產(chǎn)端API發(fā)布消息到1個(gè)或多個(gè)Topic(主題)的一個(gè)(保證數(shù)據(jù)的順序)或者多個(gè)分區(qū)(并行處理,但不一定保證數(shù)據(jù)順序)。Topic可以簡單理解成一個(gè)數(shù)據(jù)類別,是用來區(qū)分不同數(shù)據(jù)的。
?
KAFKA維護(hù)一個(gè)Topic中的分區(qū)log,以順序追加的方式向各個(gè)分區(qū)中寫入消息,每個(gè)分區(qū)都是不可變的消息隊(duì)列。分區(qū)中的消息都是以k-v形式存在。
??k表示offset,稱之為偏移量,一個(gè)64位整型的唯一標(biāo)識(shí),offset代表了Topic分區(qū)中所有消息流中該消息的起始字節(jié)位置。
??v就是實(shí)際的消息內(nèi)容,每個(gè)分區(qū)中的每個(gè)offset都是唯一存在的,所有分區(qū)的消息都是一次寫入,在消息未過期之前都可以調(diào)整offset來實(shí)現(xiàn)多次讀取。
?
以上提到KAFKA“快”的第一個(gè)因素:消息順序?qū)懭氪疟P。
?
我們知道現(xiàn)在的磁盤大多數(shù)都還是機(jī)械結(jié)構(gòu)(SSD不在討論的范圍內(nèi)),如果將消息以隨機(jī)寫的方式存入磁盤,就會(huì)按柱面、磁頭、扇區(qū)的方式進(jìn)行(尋址過程),緩慢的機(jī)械運(yùn)動(dòng)(相對(duì)內(nèi)存)會(huì)消耗大量時(shí)間,導(dǎo)致磁盤的寫入速度只能達(dá)到內(nèi)存寫入速度的幾百萬分之一,為了規(guī)避隨機(jī)寫帶來的時(shí)間消耗,KAFKA采取順序?qū)懙姆绞酱鎯?chǔ)數(shù)據(jù),如下圖所示:
新來的消息只能追加到已有消息的末尾,并且已經(jīng)生產(chǎn)的消息不支持隨機(jī)刪除以及隨機(jī)訪問,但是消費(fèi)者可以通過重置offset的方式來訪問已經(jīng)消費(fèi)過的數(shù)據(jù)。
即使順序讀寫,過于頻繁的大量小I/O操作一樣會(huì)造成磁盤的瓶頸,所以KAFKA在此處的處理是把這些消息集合在一起批量發(fā)送,這樣減少對(duì)磁盤IO的過度讀寫,而不是一次發(fā)送單個(gè)消息。
另一個(gè)是無效率的字節(jié)復(fù)制,尤其是在負(fù)載比較高的情況下影響是顯著的。為了避免這種情況,KAFKA采用由Producer,broker和consumer共享的標(biāo)準(zhǔn)化二進(jìn)制消息格式,這樣數(shù)據(jù)塊就可以在它們之間自由傳輸,無需轉(zhuǎn)換,降低了字節(jié)復(fù)制的成本開銷。
同時(shí),KAFKA采用了MMAP(Memory Mapped Files,內(nèi)存映射文件)技術(shù)。很多現(xiàn)代操作系統(tǒng)都大量使用主存做磁盤緩存,一個(gè)現(xiàn)代操作系統(tǒng)可以將內(nèi)存中的所有剩余空間用作磁盤緩存,而當(dāng)內(nèi)存回收的時(shí)候幾乎沒有性能損失。
由于KAFKA是基于JVM的,并且任何與Java內(nèi)存使用打過交道的人都知道兩件事:
? 對(duì)象的內(nèi)存開銷非常高,通常是實(shí)際要存儲(chǔ)數(shù)據(jù)大小的兩倍;
? 隨著數(shù)據(jù)的增加,java的垃圾收集也會(huì)越來越頻繁并且緩慢。
基于此,使用文件系統(tǒng),同時(shí)依賴頁面緩存就比使用其他數(shù)據(jù)結(jié)構(gòu)和維護(hù)內(nèi)存緩存更有吸引力:
? 不使用進(jìn)程內(nèi)緩存,就騰出了內(nèi)存空間,可以用來存放頁面緩存的空間幾乎可以翻倍。
? 如果KAFKA重啟,進(jìn)行內(nèi)緩存就會(huì)丟失,但是使用操作系統(tǒng)的頁面緩存依然可以繼續(xù)使用。
可能有人會(huì)問KAFKA如此頻繁利用頁面緩存,如果內(nèi)存大小不夠了怎么辦?
KAFKA會(huì)將數(shù)據(jù)寫入到持久化日志中而不是刷新到磁盤。實(shí)際上它只是轉(zhuǎn)移到了內(nèi)核的頁面緩存。
利用文件系統(tǒng)并且依靠頁緩存比維護(hù)一個(gè)內(nèi)存緩存或者其他結(jié)構(gòu)要好,它可以直接利用操作系統(tǒng)的頁緩存來實(shí)現(xiàn)文件到物理內(nèi)存的直接映射。完成映射之后對(duì)物理內(nèi)存的操作在適當(dāng)時(shí)候會(huì)被同步到硬盤上。
<KAFKA讓代碼飛起來之讀得快>
KAFKA除了接收數(shù)據(jù)時(shí)寫得快,另外一個(gè)特點(diǎn)就是推送數(shù)據(jù)時(shí)發(fā)得快。
KAFKA這種消息隊(duì)列在生產(chǎn)端和消費(fèi)端分別采取的push和pull的方式,也就是你生產(chǎn)端可以認(rèn)為KAFKA是個(gè)無底洞,有多少數(shù)據(jù)可以使勁往里面推送,消費(fèi)端則是根據(jù)自己的消費(fèi)能力,需要多少數(shù)據(jù),你自己過來KAFKA這里拉取,KAFKA能保證只要這里有數(shù)據(jù),消費(fèi)端需要多少,都盡可以自己過來拿。
▲零拷貝
具體到消息的落地保存,broker維護(hù)的消息日志本身就是文件的目錄,每個(gè)文件都是二進(jìn)制保存,生產(chǎn)者和消費(fèi)者使用相同的格式來處理。維護(hù)這個(gè)公共的格式并允許優(yōu)化最重要的操作:網(wǎng)絡(luò)傳輸持久性日志塊。 現(xiàn)代的unix操作系統(tǒng)提供一個(gè)優(yōu)化的代碼路徑,用于將數(shù)據(jù)從頁緩存?zhèn)鬏數(shù)絪ocket;在Linux中,是通過sendfile系統(tǒng)調(diào)用來完成的。Java提供了訪問這個(gè)系統(tǒng)調(diào)用的方法:FileChannel.transferTo API。
要理解senfile的影響,重要的是要了解將數(shù)據(jù)從文件傳輸?shù)絪ocket的公共數(shù)據(jù)路徑,如下圖所示,數(shù)據(jù)從磁盤傳輸?shù)絪ocket要經(jīng)過以下幾個(gè)步驟:
? 操作系統(tǒng)將數(shù)據(jù)從磁盤讀入到內(nèi)核空間的頁緩存
? 應(yīng)用程序?qū)?shù)據(jù)從內(nèi)核空間讀入到用戶空間緩存中
? 應(yīng)用程序?qū)?shù)據(jù)寫回到內(nèi)核空間到socket緩存中
? 操作系統(tǒng)將數(shù)據(jù)從socket緩沖區(qū)復(fù)制到網(wǎng)卡緩沖區(qū),以便將數(shù)據(jù)經(jīng)網(wǎng)絡(luò)發(fā)出
這里有四次拷貝,兩次系統(tǒng)調(diào)用,這是非常低效的做法。如果使用sendfile,只需要一次拷貝就行:允許操作系統(tǒng)將數(shù)據(jù)直接從頁緩存發(fā)送到網(wǎng)絡(luò)上。所以在這個(gè)優(yōu)化的路徑中,只有最后一步將數(shù)據(jù)拷貝到網(wǎng)卡緩存中是需要的。
常規(guī)文件傳輸和zeroCopy方式的性能對(duì)比:
假設(shè)一個(gè)Topic有多個(gè)消費(fèi)者的情況, 并使用上面的零拷貝優(yōu)化,數(shù)據(jù)被復(fù)制到頁緩存中一次,并在每個(gè)消費(fèi)上重復(fù)使用,而不是存儲(chǔ)在存儲(chǔ)器中,也不在每次讀取時(shí)復(fù)制到用戶空間。 這使得以接近網(wǎng)絡(luò)連接限制的速度消費(fèi)消息。
這種頁緩存和sendfile組合,意味著KAFKA集群的消費(fèi)者大多數(shù)都完全從緩存消費(fèi)消息,而磁盤沒有任何讀取活動(dòng)。
▲批量壓縮
在很多情況下,系統(tǒng)的瓶頸不是CPU或磁盤,而是網(wǎng)絡(luò)帶寬,對(duì)于需要在廣域網(wǎng)上的數(shù)據(jù)中心之間發(fā)送消息的數(shù)據(jù)流水線尤其如此。所以數(shù)據(jù)壓縮就很重要。可以每個(gè)消息都?jí)嚎s,但是壓縮率相對(duì)很低。所以KAFKA使用了批量壓縮,即將多個(gè)消息一起壓縮而不是單個(gè)消息壓縮。
KAFKA允許使用遞歸的消息集合,批量的消息可以通過壓縮的形式傳輸并且在日志中也可以保持壓縮格式,直到被消費(fèi)者解壓縮。
KAFKA支持Gzip和Snappy壓縮協(xié)議。
?
【KAFKA數(shù)據(jù)可靠性深度解讀】
?
KAFKA的消息保存在Topic中,Topic可分為多個(gè)分區(qū),為保證數(shù)據(jù)的安全性,每個(gè)分區(qū)又有多個(gè)Replia。
??多分區(qū)的設(shè)計(jì)的特點(diǎn):
1.為了并發(fā)讀寫,加快讀寫速度;
2.是利用多分區(qū)的存儲(chǔ),利于數(shù)據(jù)的均衡;
3.是為了加快數(shù)據(jù)的恢復(fù)速率,一但某臺(tái)機(jī)器掛了,整個(gè)集群只需要恢復(fù)一部分?jǐn)?shù)據(jù),可加快故障恢復(fù)的時(shí)間。
每個(gè)Partition分為多個(gè)Segment,每個(gè)Segment有.log和.index 兩個(gè)文件,每個(gè)log文件承載具體的數(shù)據(jù),每條消息都有一個(gè)遞增的offset,Index文件是對(duì)log文件的索引,Consumer查找offset時(shí)使用的是二分法根據(jù)文件名去定位到哪個(gè)Segment,然后解析msg,匹配到對(duì)應(yīng)的offset的msg。
<Partition recovery過程>
每個(gè)Partition會(huì)在磁盤記錄一個(gè)RecoveryPoint,,記錄已經(jīng)flush到磁盤的最大offset。當(dāng)broker 失敗重啟時(shí),會(huì)進(jìn)行l(wèi)oadLogs。首先會(huì)讀取該P(yáng)artition的RecoveryPoint,找到包含RecoveryPoint的segment及以后的segment, 這些segment就是可能沒有完全flush到磁盤segments。然后調(diào)用segment的recover,重新讀取各個(gè)segment的msg,并重建索引。每次重啟KAFKA的broker時(shí),都可以在輸出的日志看到重建各個(gè)索引的過程。
< 數(shù)據(jù)同步>
Producer和Consumer都只與Leader交互,每個(gè)Follower從Leader拉取數(shù)據(jù)進(jìn)行同步。
如上圖所示,ISR是所有不落后的replica集合,不落后有兩層含義:距離上次FetchRequest的時(shí)間不大于某一個(gè)值或落后的消息數(shù)不大于某一個(gè)值,Leader失敗后會(huì)從ISR中隨機(jī)選取一個(gè)Follower做Leader,該過程對(duì)用戶是透明的。
當(dāng)Producer向Broker發(fā)送數(shù)據(jù)時(shí),可以通過request.required.acks參數(shù)設(shè)置數(shù)據(jù)可靠性的級(jí)別。
此配置是表明當(dāng)一次Producer請(qǐng)求被認(rèn)為完成時(shí)的確認(rèn)值。特別是,多少個(gè)其他brokers必須已經(jīng)提交了數(shù)據(jù)到它們的log并且向它們的Leader確認(rèn)了這些信息。
?
?典型的值:
0: 表示Producer從來不等待來自broker的確認(rèn)信息。這個(gè)選擇提供了最小的時(shí)延但同時(shí)風(fēng)險(xiǎn)最大(因?yàn)楫?dāng)server宕機(jī)時(shí),數(shù)據(jù)將會(huì)丟失)。
1:表示獲得Leader replica已經(jīng)接收了數(shù)據(jù)的確認(rèn)信息。這個(gè)選擇時(shí)延較小同時(shí)確保了server確認(rèn)接收成功。
-1:Producer會(huì)獲得所有同步replicas都收到數(shù)據(jù)的確認(rèn)。同時(shí)時(shí)延最大,然而,這種方式并沒有完全消除丟失消息的風(fēng)險(xiǎn),因?yàn)橥絩eplicas的數(shù)量可能是1。如果你想確保某些replicas接收到數(shù)據(jù),那么你應(yīng)該在Topic-level設(shè)置中選項(xiàng)min.insync.replicas設(shè)置一下。
僅設(shè)置 acks= -1 也不能保證數(shù)據(jù)不丟失,當(dāng)ISR列表中只有Leader時(shí),同樣有可能造成數(shù)據(jù)丟失。要保證數(shù)據(jù)不丟除了設(shè)置acks=-1,還要保證ISR的大小大于等于2。
?
?具體參數(shù)設(shè)置:
request.required.acks:設(shè)置為-1 等待所有ISR列表中的Replica接收到消息后采算寫成功。
min.insync.replicas: 設(shè)置為>=2,保證ISR中至少兩個(gè)Replica。
Producer:要在吞吐率和數(shù)據(jù)可靠性之間做一個(gè)權(quán)衡。
KAFKA作為現(xiàn)代消息中間件中的佼佼者,以其速度和高可靠性贏得了廣大市場和用戶青睞,其中的很多設(shè)計(jì)理念都是非常值得我們學(xué)習(xí)的,本文所介紹的也只是冰山一角,希望能夠?qū)Υ蠹伊私釱AFKA有一定的作用。
轉(zhuǎn)載自:http://rdcqii.hundsun.com/portal/article/709.html
總結(jié)
以上是生活随笔為你收集整理的kafka读写速度快的原因的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: XGBoost在携程搜索排序中的应用
- 下一篇: 复方蟾酥膏_功效作用注意事项用药禁忌用法