分布式公布订阅消息系统 Kafka 架构设计
我們為什么要搭建該系統
Kafka是一個消息系統,原本開發自LinkedIn,用作LinkedIn的活動流(activity stream)和運營數據處理管道(pipeline)的基礎。
如今它已為多家不同類型的公司作為多種類型的數據管道(data
 pipeline)和消息系統使用。
活動流數據是全部站點在對其站點使用情況做報表時要用到的數據中最常規的部分。活動數據包含頁面訪問量(page view)、被查看內容方面的信息以及搜索情況等內容。這樣的數據通常的處理方式是先把各種活動以日志的形式寫入某種文件。然后周期性地對這些文件進行統計分析。運營數據指的是server的性能數據(CPU、IO使用率、請求時間、服務日志等等數據)。運營數據的統計方法種類繁多。
近年來,活動和運營數據處理已經成為了站點軟件產品特性中一個至關重要的組成部分。這就須要一套略微更加復雜的基礎設施對其提供支持。
活動流和運營數據的若干用例
"動態匯總(News feed)"功能。將你朋友的各種活動信息廣播給你相關性以及排序。通過使用計數評級(count rating)、投票(votes)或者點擊率( click-through)判定一組給定的條目中那一項是最相關的.安全:站點須要屏蔽行為不端的網絡爬蟲(crawler),對API的使用進行速率限制,探測出擴散垃圾信息的企圖。并支撐其他的行為探測和預防體系,以切斷站點的某些不正常活動。
運營監控:大多數站點都須要某種形式的實時且隨機應變的方式,對站點執行效率進行監控并在有問題出現的情況下能觸發警告。
報表和批處理: 將數據裝載到數據倉庫或者Hadoop系統中進行離線分析。然后針對業務行為做出對應的報表,這樣的做法非常普遍。
活動流數據的特點
這樣的由不可變(immutable)的活動數據組成的高吞吐量數據流代表了對計算能力的一種真正的挑戰,因其數據量非常easy就可能會比站點中位于第二位的數據源的數據量大10到100倍。
傳統的日志文件統計分析對報表和批處理這樣的離線處理的情況來說,是一種非常不錯且非常有伸縮性的方法;可是這樣的方法對于實時處理來說其時延太大,并且還具有較高的運營復雜度。
還有一方面?,F有的消息隊列系統(messaging and queuing system)卻非常適合于在實時或近實時(near-real-time)的情況下使用,但它們對非常長的未被處理的消息隊列的處理非常不給力,往往并不將數據持久化作為首要的事情考慮。這樣就會造成一種情況,就是當把大量數據傳送給Hadoop這樣的離線系統后。 這些離線系統每一個小時或每天僅能處理掉部分源數據。
Kafka的目的就是要成為一個隊列平臺,只使用它就行既支持離線又支持在線使用這兩種情況。
Kafka支持很通用的消息語義(messaging semantics)。雖然我們這篇文章主要是想把它用于活動處理。但并沒有不論什么限制性條件使得它只適用于此目的。
部署
以下的示意圖所看到的是在LinkedIn中部署后各系統形成的拓撲結構。
要注意的是,一個單個的Kafka集群系統用于處理來自各種不同來源的全部活動數據。它同一時候為在線和離線的數據使用者提供了一個單個的數據管道,在線活動和異步處理之間形成了一個緩沖區層。我們還使用kafka。把全部數據復制(replicate)到另外一個不同的數據中心去做離線處理。
我們并不想讓一個單個的Kafka集群系統跨越多個數據中心,而是想讓Kafka支持多數據中心的數據流拓撲結構。
這是通過在集群之間進行鏡像或“同步”實現的。這個功能很easy,鏡像集群僅僅是作為源集群的數據使用者的角色執行。這意味著,一個單個的集群就行將來自多個數據中心的數據集中到一個位置。
以下所看到的是可用于支持批量裝載(batch loads)的多數據中心拓撲結構的一個樣例:
請注意。在圖中上面部分的兩個集群之間不存在通信連接,兩者可能大小不同,具有不同數量的節點。以下部分中的這個單個的集群能夠鏡像隨意數量的源集群。
要了解鏡像功能使用方面的很多其它細節,請訪問這里.
基本的設計元素
Kafka之所以和其他絕大多數信息系統不同,是由于以下這幾個為數不多的比較重要的設計決策:
Kafka在設計之時為就將持久化消息作為通常的使用情況進行了考慮。基本的設計約束是吞吐量而不是功能。有關哪些數據已經被使用了的狀態信息保存為數據使用者(consumer)的一部分,而不是保存在server之上。Kafka是一種顯式的分布式系統。它如果,數據生產者(producer)、代理(brokers)和數據使用者(consumer)分散于多臺機器之上。
以上這些設計決策將在下文中進行逐條詳述。
基礎知識
首先來看一些主要的術語和概念。
消息指的是通信的基本單位。由消息生產者(producer)公布關于某話題(topic)的消息,這句話的意思是。消息以一種物理方式被發送給了作為代理(broker)的server(可能是另外一臺機器)。若干的消息使用者(consumer)訂閱(subscribe)某個話題,然后生產者所公布的每條消息都會被發送給全部的使用者。
Kafka是一個顯式的分布式系統 —— 生產者、使用者和代理都能夠執行在作為一個邏輯單位的、進行相互協作的集群中不同的機器上。對于代理和生產者。這么做很自然,但使用者卻須要一些特殊的支持。
每一個使用者進程都屬于一個使用者小組(consumer group)。
準確地講,每條消息都僅僅會發送給每一個使用者小組中的一個進程。因此,使用者小組使得很多進程或多臺機器在邏輯上作為一個單個的使用者出現。使用者小組這個概念很強大。能夠用來支持JMS中隊列(queue)或者話題(topic)這兩種語義。
為了支持隊列語義,我們能夠將全部的使用者組成一個單個的使用者小組。在這樣的情況下,每條消息都會發送給一個單個的使用者。為了支持話題語義。能夠將每一個使用者分到它自己的使用者小組中,隨后全部的使用者將接收到每一條消息。在我們的使用其中,一種更常見的情況是,我們依照邏輯劃分出多個使用者小組。每一個小組都是有作為一個邏輯總體的多臺使用者計算機組成的集群。在大數據的情況下,Kafka有個額外的長處,對于一個話題而言。不管有多少使用者訂閱了它,一條條消息都僅僅會存儲一次。
消息持久化(Message Persistence)及其緩存
不要害怕文件系統!
在對消息進行存儲和緩存時,Kafka嚴重地依賴于文件系統。 大家普遍覺得“磁盤非常慢”,因而人們都對持久化結(persistent structure)構能夠提供說得過去的性能抱有懷疑態度。
實際上。同人們的期望值相比。磁盤能夠說是既非常慢又非??欤@取決于磁盤的使用方式。
設計的非常好的磁盤結構往往能夠和網絡一樣快。
磁盤性能方面最關鍵的一個事實是,在過去的十幾年中,硬盤的吞吐量正在變得和磁盤尋道時間嚴重不一致了。結果,在一個由6個7200rpm的SATA硬盤組成的RAID-5磁盤陣列上,線性寫入(linear write)的速度大約是300MB/秒,但隨即寫入卻僅僅有50k/秒,當中的區別接近10000倍。
線性讀取和寫入是全部使用模式中最具可估計性的一種方式。因而操作系統採用預讀(read-ahead)和后寫(write-behind)技術對磁盤讀寫進行探測并優化后效果也不錯。預讀就是提前將一個比較大的磁盤塊中內容讀入內存,后寫是將一些較小的邏輯寫入操作合并起來組成比較大的物理寫入操作。
關于這個問題更深入的討論請參考這篇文章ACM
 Queue article;實際上他們發現,在某些情況下。順序磁盤訪問可以比隨即內存訪問還要快。
為了抵消這樣的性能上的波動,現代操作系變得越來越積極地將主內存用作磁盤緩存。全部現代的操作系統都會樂于將全部空暇內存轉做磁盤緩存。即時在須要回收這些內存的情況下會付出一些性能方面的代價。
全部的磁盤讀寫操作都須要經過這個統一的緩存。想要舍棄這個特性都不太easy。除非使用直接I/O。因此,對于一個進程而言。即使它在進程內的緩存中保存了一份數據,這份數據也可能在OS的頁面緩存(pagecache)中有反復的一份。結構就成了一份數據保存了兩次。
更進一步講,我們是在JVM的基礎之上開發的系統,僅僅要是了解過一些Java中內存用法的人都知道這兩點:
Java對象的內存開銷(overhead)很大。往往是對象中存儲的數據所占內存的兩倍(或更糟)。Java中的內存垃圾回收會隨著堆內數據不斷增長而變得越來越不明白,回收所花費的代價也會越來越大。
由于這些因素。使用文件系統并依賴于頁面緩存要優于自己在內存中維護一個緩存或者什么別的結構 —— 通過對全部空暇內存自己主動擁有訪問權,我們至少將可用的緩存大小翻了一倍,然后通過保存壓縮后的字節結構而非單個對象,緩存可用大小接著可能又翻了一倍。這么做下來。在GC性能不受損失的情況下,我們可在一臺擁有32G內存的機器上獲得高達28到30G的緩存。并且,這樣的緩存即使在服務重新啟動之后會仍然保持有效。而不象進程內緩存。進程重新啟動后還須要在內存中進行緩存重建(10G的緩存重建時間可能須要10分鐘),否則就須要以一個全空的緩存開始執行(這么做它的初始性能會很糟糕)。
這還大大簡化了代碼,由于對緩存和文件系統之間的一致性進行維護的全部邏輯如今都是在OS中實現的。這事OS做起來要比我們在進程中做那種一次性的緩存更加高效。準確性也更高。假設你使用磁盤的方式更傾向于線性讀取操作,那么隨著每次磁盤讀取操作,預讀就能很高效使用隨后準能用得著的數據填充緩存。
這就讓人聯想到一個很easy的設計方案:不是要在內存中保存盡可能多的數據并在須要時將這些數據刷新(flush)到文件系統。而是我們要做全然相反的事情。全部數據都要馬上寫入文件系統中持久化的日志中但不進行刷新數據的不論什么調用。實際中這么做意味著。數據被傳輸到OS內核的頁面緩存中了。OS隨后會將這些數據刷新到磁盤的。
此外我們加入了一條基于配置的刷新策略。同意用戶對把數據刷新到物理磁盤的頻率進行控制(每當接收到N條消息或者每過M秒)。從而能夠為系統硬件崩潰時“處于危急之中”的數據在量上加個上限。
這樣的以頁面緩存為中心的設計風格在一篇解說Varnish的設計思想的文章中有具體的描寫敘述(文風略帶有助于身心健康的傲氣)。
常量時長足矣
消息系統元數據的持久化數據結構往往採用BTree。 BTree是眼下最通用的數據結構,在消息系統中它能夠用來廣泛支持多種不同的事務性或非事務性語義。 它的確也帶來了一個很高的處理開銷,Btree運算的時間復雜度為O(log N)。
一般O(log N)被覺得基本上等于常量時長,但對于磁盤操作來講,情況就不同了。磁盤尋道時間一次要花10ms的時間,并且每一個磁盤同一時候僅僅能進行一個尋道操作。因而其并行程度很有限。因此,即使少量的磁盤尋道操作也會造成很大的時間開銷。由于存儲系統混合了快速緩存操作和真正的物理磁盤操作,所以樹型結構(tree
 structure)可觀察到的性能往往是超線性的(superlinear)。
更進一步講,BTrees須要一種很復雜的頁面級或行級鎖定機制才干避免在每次操作時鎖定一整顆樹。實現這樣的機制就要為行級鎖定付出很高昂的代價,否則就必須對全部的讀取操作進行串行化(serialize)。由于對磁盤尋道操作的高度依賴。就不太可能高效地從驅動器密度(drive density)的提高中獲得改善。因而就不得不使用容量較小(< 100GB)轉速較高的SAS驅動去,以維持一種比較合理的數據與尋道容量之比。
直覺上講,持久化隊列能夠依照通常的日志解決方式的樣子構建。僅僅是簡單的文件讀取和簡單地向文件里加入內容。盡管這樣的結果必定無法支持BTree實現中的豐富語義。但有個優勢之處在于其全部的操作的復雜度都是O(1),讀取操作并不須要阻止寫入操作,并且反之亦然。這樣做顯然有性能優勢,由于性能全然同數據大小之間脫離了關系
 —— 一個server如今就能利用大量的便宜、低轉速、容量超過1TB的SATA驅動器。
盡管這些驅動器尋道操作的性能非常低。但這些驅動器在大量數據讀寫的情況下性能還湊和,而僅僅需1/3的價格就能獲得3倍的容量。
可以存取到差點兒無限大的磁盤空間而無須付出性能代價意味著。我們可以提供一些消息系統中并不常見的功能。
例如。在Kafka中,消息在使用完后并沒有馬上刪除,而是會將這些消息保存相當長的一段時間(例如說一周)。
效率最大化
我們的如果是。系統里消息的量很之大。實際消息量是站點頁面瀏覽總數的數倍之多(由于每一個頁面瀏覽就是我們要處理的當中一個活動)。并且我們如果公布的每條消息都會被至少讀取一次(往往是多次),因而我們要為消息使用而不是消息的產生進行系統優化。
導致低效率的原因常見的有兩個:過多的網絡請求和大量的字節拷貝操作。
為了提高效率,API是環繞這“消息集”(message set)抽象機制進行設計的,消息集將消息進行自然分組。這么做能讓網絡請求把消息合成一個小組,分攤網絡往返(roundtrip)所帶來的開銷。而不是每次只發送一個單個消息。
MessageSet實現(implementation)本身是對字節數組或文件進行一次包裝后形成的一薄層API。
因而,里面并不存在消息處理所需的單獨的序列化(serialization)或逆序列化(deserialization)的步驟。消息中的字段(field)是按需進行逆序列化的(或者說。在不須要時就不進行逆序列化)。
由代理維護的消息日志本身只是是那些已寫入磁盤的消息集的文件夾。
按此進行抽象處理后,就能夠讓代理和消息使用者共用一個單個字節的格式(從某種程度上說,消息生產者也能夠用它。消息生產者的消息要求其校驗和(checksum)并在驗證后才會加入到日志中)
使用共通的格式后就能對最重要的操作進行優化了:持久化后日志塊(chuck)的網絡傳輸。為了將數據從頁面緩存直接傳送給socket?,F代的Unix操作系統提供了一個高度優化的代碼路徑(code path)。在Linux中這是通過sendfile這個系統調用實現的。通過Java中的API,FileChannel.transferTo,由它來簡潔的調用上述的系統調用。
為了理解sendfile所帶來的效果,重要的是要理解將數據從文件傳輸到socket的數據路徑:
操作系統將數據從磁盤中讀取到內核空間里的頁面緩存應用程序將數據從內核空間讀入到用戶空間的緩沖區應用程序將讀到的數據寫回內核空間并放入socke的緩沖區操作系統將數據從socket的緩沖區復制到NIC(網絡借口卡,即網卡)的緩沖區。自此數據才干通過網絡發送出去
這樣效率顯然非常低,由于里面涉及4次拷貝,2次系統調用。使用sendfile就能夠避免這些反復的拷貝操作,讓OS直接將數據從頁面緩存發送到網絡中,當中僅僅需最后一步中的將數據復制到NIC的緩沖區。
我們預期的一種常見的用例是一個話題擁有多個消息使用者。採用前文所述的零拷貝優化方案。數據僅僅需復制到頁面緩存中一次,然后每次發送給使用者時都對它進行反復使用就可以,而無須先保存到內存中,然后在閱讀該消息時每次都須要將其復制到內核空間中。如此一來。消息使用的速度就能接近網絡連接的極限。
要得到Java中對send'file和零拷貝的支持方面的很多其它背景知識,請參考IBM developerworks上的這篇文章。
端到端的批量壓縮
多數情況下系統的瓶頸是網絡而不是CPU。 這一點對于須要將消息在個數據中心間進行傳輸的數據管道來說,尤其如此。
當然。無需來自Kafka的支持,用戶總是能夠自行將消息壓縮后進行傳輸。但這么做的壓縮率會非常低,由于不同的消息里都有非常多反復性的內容(比方JSON里的字段名、web日志中的用戶代理或者經常使用的字符串)。
高效壓縮須要將多條消息一起進行壓縮而不是分別壓縮每條消息。
理想情況下。以端到端的方式這么做是行得通的 —— 也即,數據在消息生產者發送之前先壓縮一下,然后在server上一直保存壓縮狀態,僅僅有到終于的消息使用者那里才須要將其解壓縮。
通過執行遞歸消息集,Kafka對這樣的壓縮方式提供了支持。 一批消息能夠打包到一起進行壓縮。然后以這樣的形式發送給server。這批消息都會被發送給同一個消息使用者。并會在到達使用者那里之前一直保持為被壓縮的形式。
Kafka支持GZIP和Snappy壓縮協議。
關于壓縮的很多其它更具體的信息,請參見這里。
客戶狀態
追蹤(客戶)消費了什么是一個消息系統必須提供的一個關鍵功能之中的一個。
它并不直觀,可是記錄這個狀態是該系統的關鍵性能之中的一個。狀態追蹤要求(不斷)更新一個有持久性的實體的和一些潛在會發生的隨機訪問。
因此它更可能受到存儲系統的查詢時間的制約而不是帶寬(正如上面所描寫敘述的)。
大部分消息系統保留著關于代理者使用(消費)的消息的元數據。也就是說,當消息被交到客戶手上時,代理者自己記錄了整個過程。這是一個相當直觀的選擇,并且確實對于一個單機server來說。它(數據)能去(放在)哪里是不清晰的。又由于很多消息系統存儲使用的數據結構規模小,所以這也是個有用的選擇--由于代理者知道什么被消費了使得它能夠立馬刪除它(數據),保持數據大小只是大。
或許不顯然的是,讓代理和使用者這兩者對消息的使用情況做到一致表述絕不是一件輕而易舉的事情。
假設代理每次都是在將消息發送到網絡中后就將該消息記錄為已使用的話,一旦使用者沒能真正處理到該消息(比方說,由于它宕機或這請求超時了抑或別的什么原因)。就會出現消息丟失的情況。為了解決此問題,很多消息系新加了一個確認功能,當消息發出后僅把它標示為已發送而不是已使用。然后代理須要等到來自使用者的特定的確認信息后才將消息記錄為已使用。這樣的策略的確攻克了丟失消息的問題。但由此產生了新問題。首先。假設使用者已經處理了該消息但卻未能發送出確認信息,那么就會讓這一條消息被處理兩次。
第二個問題是關于性能的,這樣的策略中的代理必須為每條單個的消息維護多個狀態(首先為了防止反復發送就要將消息鎖定。然后,然后還要將消息標示為已使用后才干刪除該消息)。另外另一些棘手的問題須要處理。比方,對于那些以發出卻未得到確認的消息該怎樣處理?
消息傳遞語義(Message delivery semantics)
系統能夠提供的幾種可能的消息傳遞保障例如以下所看到的:
最多一次—這樣的用于處理前段文字所述的第一種情況。
消息在發出后馬上標示為已使用,因此消息不會被發出去兩次,但這在很多故障中都會導致消息丟失。
至少一次—這樣的用于處理前文所述的另外一種情況,系統保證每條消息至少會發送一次,但在有故障的情況下可能會導致反復發送。只一次—這樣的是人們實際想要的,每條消息只會并且僅會發送一次。
這個問題已得到廣泛的研究,屬于“事務提交”問題的一個變種。
提供只一次語義的算法已經有了。兩階段或者三階段提交法以及Paxos算法的一些變種就是當中的一些樣例,但它們都有與生俱來的的缺陷。
這些算法往往須要多個網絡往返(round trip),可能也無法非常好的保證其活性(liveness)(它們可能會導致無限期停機)。FLP結果給出了這些算法的一些主要的局限。
Kafka對元數據做了兩件非常不平常的事情。一件是。代理將數據流劃分為一組互相獨立的分區。
這些分區的語義由生產者定義,由生產者來指定每條消息屬于哪個分區。
一個分區內的消息以到達代理的時間為準進行排序,將來按此順序將消息發送給使用者。
這么一來,就用不著為每一天消息保存一條元數據(比方說,將消息標示為已使用)了,我們僅僅需為使用者、話題和分區的每種組合記錄一個“最高水位標記”(high
 water mark)就可以。因此,標示使用者狀態所需的元數據總量實際上特別小。在Kafka中,我們將該最高水位標記稱為“偏移量”(offset)。這么叫的原因將在實現細節部分解說。
使用者的狀態
在Kafka中。由使用者負責維護反映哪些消息已被使用的狀態信息(偏移量)。典型情況下。Kafka使用者的library會把狀態數據保存到Zookeeper之中。
然而。讓使用者將狀態信息保存到保存它們的消息處理結果的那個數據存儲(datastore)中或許會更佳。
比如。使用者或許就是要把一些統計值存儲到集中式事物OLTP數據庫中。在這樣的情況下,使用者能夠在進行那個數據庫數據更改的同一個事務中將消息使用狀態信息存儲起來。
這樣就消除了分布式的部分,從而攻克了分布式中的一致性問題!這在非事務性系統中也有類似的技巧可用。搜索系統可用將使用者狀態信息同它的索引段(index
 segment)存儲到一起。雖然這么做可能無法保證數據的持久性(durability)。但卻可用讓索引同使用者狀態信息保存同步:假設因為宕機造成有一些沒有刷新到磁盤的索引段信息丟了。我們總是可用從上次建立檢查點(checkpoint)的偏移量處繼續對索引進行處理。與此類似,Hadoop的載入作業(load job)從Kafka中并行載入,也有同樣的技巧可用。每一個Mapper在map任務結束前,將它使用的最后一個消息的偏移量存入HDFS。
這個決策還帶來一個額外的優點。使用者可用有益回退(rewind)到曾經的偏移量處。再次使用一遍曾經使用過的數據。盡管這么做違背了隊列的一般協約(contract)。但對非常多使用者來講卻是個非常主要的功能。
舉個樣例,假設使用者的代碼里有個Bug,并且是在它處理完一些消息之后才被發現的,那么當把Bug改正后,使用者還有機會又一次處理一遍那些消息。
Push和Pull
相關問題另一個,就是究竟是應該讓使用者從代理那里吧數據Pull(拉)回來還是應該讓代理把數據Push(推)給使用者。和大部分消息系統一樣,Kafka在這方面遵循了一種更加傳統的設計思路:由生產者將數據Push給代理,然后由使用者將數據代理那里Pull回來。
近來有些系統。比方scribe和flume,更著重于日志統計功能,遵循了一種很不同的基于Push的設計思路,當中每一個節點都能夠作為代理。數據一直都是向下游Push的。
上述兩種方法都各有優缺點。
然而。由于基于Push的系統中代理控制著數據的傳輸速率,因此它難以應付大量不同種類的使用者。
我們的設計目標是。讓使用者能以它最大的速率使用數據。不幸的是,在Push系統中當數據的使用速率低于產生的速率時。使用者往往會處于超載狀態(這實際上就是一種拒絕服務攻擊)。
基于Pull的系統在使用者的處理速度稍稍落后的情況下會表現更佳,并且還能夠讓使用者在有能力的時候往往前趕趕。讓使用者採用某種退避協議(backoff
 protocol)向代理表明自己處于超載狀態,能夠解決部分問題,可是,將傳輸速率調整到正好能夠全然利用(但從不能過度利用)使用者的處理能力可比初看上去難多了。曾經我們嘗試過多次,想按這樣的方式構建系統,得到的經驗教訓使得我們選擇了更加常規的Pull模型。
分發
Kafka通常情況下是執行在集群中的server上。
沒有中央的“主”節點。代理彼此之間是對等的。不須要不論什么手動配置就可以可隨時加入和刪除。
相同。生產者和消費者能夠在不論什么時候開啟。
每一個代理都能夠在Zookeeper(分布式協調系統)中注冊的一些元數據(比如,可用的主題)。生產者和消費者能夠使用Zookeeper發現主題和相互協調。關于生產者和消費者的細節將在以下描寫敘述。
生產者
生產者自己主動負載均衡
對于生產者,Kafka支持client負載均衡,也能夠使用一個專用的負載均衡器對TCP連接進行負載均衡調整。專用的第四層負載均衡器在Kafka代理之上對TCP連接進行負載均衡。在這樣的配置的情況。一個給定的生產者所發送的消息都會發送給一個單個的代理。使用第四層負載均衡器的優點是。每一個生產者僅需一個單個的TCP連接而無須同Zookeeper建立不論什么連接。
不好的地方在于全部均衡工作都是在TCP連接的層次完畢的,因而均衡效果可能并不佳(假設有些生產者產生的消息遠多于其他生產者,按每一個代理對TCP連接進行平均分配可能會導致每一個代理接收到的消息總數并不平均)。
採用client基于zookeeper的負載均衡能夠解決部分問題。假設這么做就能讓生產者動態地發現新的代理,并按請求數量進行負載均衡。
類似的,它還能讓生產者依照某些鍵值(key)對數據進行分區(partition)而不是隨機亂分,因而能夠保存同使用者的關聯關系(比如,依照用戶id對數據使用進行分區)。這樣的分法叫做“語義分區”(semantic partitioning)。下文再討論其細節。
以下解說基于zookeeper的負載均衡的工作原理。
在發生下列事件時要對zookeeper的監視器(watcher)進行注冊:
增加了新的代理有一個代理下線了注冊了新的話題代理注冊了已有話題。
生產者在其內部為每個代理維護了一個彈性的連接(同代理建立的連接)池。通過使用zookeeper監視器的回調函數(callback),該連接池在建立/保持同全部在線代理的連接時都要進行更新。當生產者要求進入某特定話題時,由分區者(partitioner)選擇一個代理分區(參加語義分區小結)。從連接池中找出可用的生產者連接,并通過它將數據發送到剛才所選的代理分區。
異步發送
對于可伸縮的消息系統而言。異步非堵塞式操作是不可或缺的。在Kafka中,生產者有個選項(producer.type=async)可用指定使用異步分發出產請求(produce request)。這樣就同意用一個內存隊列(in-memory queue)把生產請求放入緩沖區。然后再以某個時間間隔或者事先配置好的批量大小將數據批量發送出去。由于一般來說數據會從一組以不同的數據速度生產數據的異構的機器中公布出。所以對于代理而言。這樣的異步緩沖的方式有助于產生均勻一致的流量,因而會有更佳的網絡利用率和更高的吞吐量。
語義分區
以下看看一個想要為每一個成員統計一個個人空間訪客總數的程序該怎么做。應該把一個成員的全部個人空間訪問事件發送給某特定分區,因此就能夠把對一個成員的全部更新都放在同一個使用者線程中的同一個事件流中。生產者具有從語義上將消息映射到有效的Kafka節點和分區之上的能力。
這樣就能夠用一個語義分區函數將消息流依照消息中的某個鍵值進行分區。并將不同分區發送給各自對應的代理。
通過實現kafak.producer.Partitioner接口。能夠對分區函數進行定制。在缺省情況下使用的是隨即分區函數。上例中,那個鍵值應該是member_id。分區函數能夠是hash(member_id)%num_partitions。
對Hadoop以及其他批量數據裝載的支持
具有伸縮性的持久化方案使得Kafka可支持批量數據裝載,可以周期性將快照數據加載進行批量處理的離線系統。
我們利用這個功能將數據加載我們的數據倉庫(data warehouse)和Hadoop集群。
批量處理始于數據加載階段。然后進入非循環圖(acyclic graph)處理過程以及輸出階段(支持情況在這里)。
支持這樣的處理模型的一個重要特性是。要有又一次裝載從某個時間點開始的數據的能力(以防處理中有不論什么發生錯誤)。
對于Hadoop,我們通過在單個的map任務之上切割裝載任務對數據的裝載進行了并行化處理,切割時,全部節點/話題/分區的每種組合都要分出一個來。Hadoop提供了任務管理,失敗的任務能夠重頭再來,不存在數據被反復的危急。
實施細則
以下給出了一些在上一節所描寫敘述的低層相關的實現系統的某些部分的細節的簡要說明。
API 設計
生產者 APIs
生產者 API 是給兩個底層生產者的再封裝 -kafka.producer.SyncProducerandkafka.producer.async.AsyncProducer.
class Producer {
	
  /* Sends the data, partitioned by key to the topic using either the */
  /* synchronous or the asynchronous producer */
  public void send(kafka.javaapi.producer.ProducerData producerData);
  /* Sends a list of data, partitioned by key to the topic using either */
  /* the synchronous or the asynchronous producer */
  public void send(java.util.List< kafka.javaapi.producer.ProducerData> producerData);
  /* Closes the producer and cleans up */	
  public void close();
} 
該API的目的是將生產者的全部功能通過一個單個的API公開給其使用者(client)。
新建的生產者能夠:
對多個生產者請求進行排隊/緩沖并異步發送批量數據 —— kafka.producer.Producer提供了在將多個生產請求序列化并發送給適當的Kafka代理分區之前,對這些生產請求進行批量處理的能力(producer.type=async)。批量的大小能夠通過一些配置參數進行控制。
當事件進入隊列時會先放入隊列進行緩沖。直到時間到了queue.time或者批量大小到達batch.size為止,后臺線程(kafka.producer.async.ProducerSendThread)會將這批數據從隊列中取出,交給kafka.producer.EventHandler進行序列化并發送給適當的kafka代理分區。通過event.handler這個配置參數。能夠在系統中插入一個自己定義的事件處理器。
在該生產者隊列管道中的各個不同階段,為了插入自己定義的日志/跟蹤代碼或者自己定義的監視邏輯,如能注入回調函數會很實用。通過實現kafka.producer.asyn.CallbackHandler接口并將配置參數callback.handler設置為實現類就能夠實現注入。使用用戶指定的Encoder處理數據的序列化(serialization)
interfaceEncoder<T> {
  publicMessage toMessage(T data);
}
Encoder的缺省值是一個什么活都不干的kafka.serializer.DefaultEncoder。提供基于zookeeper的代理自己主動發現功能 —— 通過使用zk.connect配置參數指定zookeeper的連接url,就行使用基于zookeeper的代理發現和負載均衡功能。
在有些應用場合,可能不太適合于依賴zookeeper。
在這樣的情況下。生產者可以從broker.list這個配置參數中獲得一個代理的靜態列表,每一個生產請求會被隨即的分配給各代理分區。假設對應的代理宕機。那么生產請求就會失敗。通過使用一個可選性的、由用戶指定的Partitioner,提供由軟件實現的負載均衡功能 —— 數據發送路徑選擇決策受kafka.producer.Partitioner的影響。
interfacePartitioner<T> {
   intpartition(T key, intnumPartitions);
}
分區API依據相關的鍵值以及系統中具有的代理分區的數量返回一個分區id。將該id用作索引,在broker_id和partition組成的經過排序的列表中為對應的生產者請求找出一個代理分區。缺省的分區策略是hash(key)%numPartitions。
假設key為null,那就進行隨機選擇。使用partitioner.class這個配置參數也能夠插入自己定義的分區策略。
使用者API
我們有兩個層次的使用者API。
底層比較簡單的API維護了一個同單個代理建立的連接,全然同發送給server的網絡請求相吻合。該API全然是無狀態的,每一個請求都帶有一個偏移量作為參數,從而同意用戶以自己選擇的隨意方式維護該元數據。
高層API對使用者隱藏了代理的詳細細節,讓使用者可執行于集群中的機器之上而無需關心底層的拓撲結構。它還維護著數據使用的狀態。高層API還提供了訂閱同一個過濾表達式(比如,白名單或黑名單的正則表達式)相匹配的多個話題的能力。
底層API
class SimpleConsumer {
	
  /* Send fetch request to a broker and get back a set of messages. */ 
  public ByteBufferMessageSet fetch(FetchRequest request);
  /* Send a list of fetch requests to a broker and get back a response set. */ 
  public MultiFetchResponse multifetch(List<FetchRequest> fetches);
  /**
   * Get a list of valid offsets (up to maxSize) before the given time.
   * The result is a list of offsets, in descending order.
   * @param time: time in millisecs,
   *              if set to OffsetRequest$.MODULE$.LATIEST_TIME(), get from the latest offset available.
   *              if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest offset available.
   */
  public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
}
底層API不但用于實現高層API,并且還直接用于我們的離線使用者(比方Hadoop這個使用者),這些使用者還對狀態的維護有比較特定的需求。
高層API
/* create a connection to the cluster */ 
ConsumerConnector connector = Consumer.create(consumerConfig);
interface ConsumerConnector {
	
  /**
   * This method is used to get a list of KafkaStreams, which are iterators over
   * MessageAndMetadata objects from which you can obtain messages and their
   * associated metadata (currently only topic).
   *  Input: a map of <topic, #streams>
   *  Output: a map of <topic, list of message streams>
   */
  public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap); 
  /**
   * You can also obtain a list of KafkaStreams, that iterate over messages
   * from topics that match a TopicFilter. (A TopicFilter encapsulates a
   * whitelist or a blacklist which is a standard Java regex.)
   */
  public List<KafkaStream> createMessageStreamsByFilter(
      TopicFilter topicFilter, int numStreams);
  /* Commit the offsets of all messages consumed so far. */
  public commitOffsets()
  
  /* Shut down the connector */
  public shutdown()
}
該API的中心是一個由KafkaStream這個類實現的迭代器(iterator)。每一個KafkaStream都代表著一個從一個或多個分區到一個或多個server的消息流。每一個流都是使用單個線程進行處理的。所以,該API的使用者在該API的創建調用中能夠提供所需的隨意個數的流。這樣,一個流可能會代表多個server分區的合并(同處理線程的數目同樣)。但每一個分區僅僅會把數據發送給一個流中。
createMessageStreams方法為使用者注冊到對應的話題之上,這將導致須要對使用者/代理的分配情況進行又一次平衡。為了將又一次平衡操作降低到最小。
該API鼓舞在一次調用中就創建多個話題流。createMessageStreamsByFilter方法為發現同其過濾條件想匹配的話題(額外地)注冊了多個監視器(watchers)。
應該注意。createMessageStreamsByFilter方法所返回的每一個流都可能會對多個話題進行迭代(比方,在滿足過濾條件的話題有多個的情況下)。
網絡層
網絡層就是一個特別直截了當的NIO服務器。在此就不進行過于仔細的討論了。sendfile是通過給MessageSet接口加入了一個writeTo方法實現的。
這樣就能夠讓基于文件的消息更加高效地利用transferTo實現,而不是使用線程內緩沖區讀寫方式。
線程模型用的是一個單個的接收器(acceptor)線程和每一個能夠處理固定數量網絡連接的N個處理器線程。這樣的設計方案在別處已經經過了很徹底的檢驗,發現事實上現起來簡單、執行起來很快。當中使用的協議一直都很easy,將來還能夠用其他語言實現其client。
消息
消息由一個固定大小的消息頭和一個變長不透明字節數字的有效載荷構成(opaque byte array payload)。消息頭包括格式的版本號信息和一個用于探測出壞數據和不完整數據的CRC32校驗。讓有效載荷保持不透明是個很正確的決策:在用于序列化的代碼庫方面如今正在取得很大的進展,不論什么特定的選擇都不可能適用于全部的使用情況。
都不用說。在Kafka的某特定應用中很有可能在它的使用中須要採用某種特殊的序列化類型。MessageSet接口就是一個使用特殊的方法對NIOChannel進行大宗數據讀寫(bulk
 reading and writing to an NIOChannel)的消息迭代器。
消息的格式
/** * A message. The format of an N byte message is the following: * * If magic byte is 0 * * 1. 1 byte "magic" identifier to allow format changes * * 2. 4 byte CRC32 of the payload * * 3. N - 5 byte payload * * If magic byte is 1 * * 1. 1 byte "magic" identifier to allow format changes * * 2. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used) * * 3. 4 byte CRC32 of the payload * * 4. N - 6 byte payload * */
日志
具有兩個分區的、名稱為"my_topic"的話題的日志由兩個文件夾組成(即:my_topic_0和my_topic_1),文件夾中存儲的是內容為該話題的消息的數據文件。日志的文件格式是一系列的“日志項”;每條日志項包括一個表示消息長度的4字節整數N。其后接著保存的是N字節的消息。
每條消息用一個64位的整數偏移量進行唯一性標示。該偏移量表示了該消息在那個分區中的那個話題下發送的全部消息組成的消息流中所處的字節位置。每條消息在磁盤上的格式例如以下文所看到的。
每一個日志文件的以它所包括的第一條消息的偏移量來命名。因此,第一個創建出來的文件的名字將為00000000000.kafka,隨后每一個后加的文件的名字將是前一個文件的文件名稱大約再加S個字節所得的整數,當中,S是配置文件里指定的最大日志文件的大小。
消息的確切的二進制格式都有版本號。它保持為一個標準的接口,讓消息集能夠依據須要在生產者、代理、和使用者直接進行自由傳輸而無須又一次拷貝或轉換。其格式例如以下所看到的:
On-disk format of a message message length : 4 bytes (value: 1+4+n) "magic" value : 1 byte crc : 4 bytes payload : n bytes
將消息的偏移量作為消息的可不常見。
我們原先的想法是使用由生產者產生的GUID作為消息id,然后在每一個代理上作一個從GUID到偏移量的映射??墒?,既然使用者必須為每一個server維護一個ID,那么GUID所具有的全局唯一性就失去了價值。更有甚者,維護將從一個隨機數到偏移量的映射關系帶來的復雜性,使得我們必須使用一種重量級的索引結構,并且這樣的結構還必須與磁盤保持同步,這樣我們還就必須使用一種全然持久化的、需隨機訪問的數據結構。如此一來,為了簡化查詢結構,我們就決定使用一個簡單的依分區的原子計數器(atomic counter),這個計數器能夠同分區id以及節點id結合起來唯一的指定一條消息;這樣的方法使得查詢結構簡化不少,雖然每次在處理使用者請求時仍有可能會涉及多次磁盤尋道操作。然而,一旦我們決定使用計數器。跳向直接使用偏移量作為id就很自然了,畢竟兩者都是分區內具有唯一性的、單調添加的整數。既然偏移量是在使用者API中并不會體現出來,所以這個決策終于還是屬于一個實現細節,進而我們就選擇了這樣的更加高效的方式。
寫操作
日志能夠順序加入,加入的內容總是保存到最后一個文件。當大小超過配置中指定的大?。ū确秸f1G)后,該文件就會換成另外一個新文件。有關日志的配置參數有兩個,一個是M,用于指出寫入多少條消息之后就要強制OS將文件刷新到磁盤。還有一個是S,用來指定過多少秒就要強制進行一次刷新。這樣就能夠保證一旦發生系統崩潰,最多會有M條消息丟失,或者最長會有S秒的數據丟失.
讀操作
能夠通過給出消息的64位邏輯偏移量和S字節的數據塊最大的字節數對日志文件進行讀取。
讀取操作返回的是這S個字節中包括的消息的迭代器。S應該要比最長的單條消息的字節數大,但在出現特別長的消息情況下,能夠反復進行多次讀取,每次的緩沖區大小都加倍,直到能成功讀取出這樣長的一條消息。也能夠指定一個最大的消息和緩沖區大小并讓server拒絕接收比這個大小大一些的消息,這樣也能給client一個能夠讀取一條完整消息所需緩沖區的大小的上限。非常有可能會出現讀取緩沖區以一個不完整的消息結尾的情況,這個情況用大小界定(size
 delimiting)非常easy就能探知。
從某偏移量開始進行日志讀取的實際過程須要先找出存儲所需數據的日志段文件,從全局偏移量計算出文件內偏移量,然后再從該文件偏移量處開始讀取。搜索過程通過對每一個文件保存在內存中的范圍值進行一種變化后的二分查找完畢。
日志提供了獲取最新寫入的消息的功能,從而同意從“當下”開始消息訂閱。
這個功能在使用者在SLA規定的天數內沒能正常使用數據的情況下也非常實用。當使用者企圖從一個并不存在的偏移量開始使用數據時就會出現這樣的情況,此時使用者會得到一個OutOfRangeException異常,它能夠依據詳細的使用情況對自己進行重新啟動或者只失敗而退出。
下面是發送給數據使用者(consumer)的結果的格式。
MessageSetSend (fetch result) total length : 4 bytes error code : 2 bytes message 1 : x bytes ... message n : x bytes
MultiMessageSetSend (multiFetch result) total length : 4 bytes error code : 2 bytes messageSetSend 1 ... messageSetSend n
刪除
一次僅僅能刪除一個日志段的數據。 日志管理器同意通過可載入的刪除策略設定刪除的文件。 當前策略刪除改動事件超過N天以上的文件,也能夠選擇保留最后NGB 的數據。 為了避免刪除時的讀取鎖定沖突。我們能夠使用副本寫入模式,以便在進行刪除的同一時候對日志段的一個不變的靜態快照進行二進制搜索。
數據正確性保證
日志功能里有一個配置參數M,可對在強制進行磁盤刷新之前可寫入的消息的最大條目數進行控制。在系統啟動時會執行一個日志恢復過程,對最新的日志段內全部消息進行迭代。以對每條消息項的有效性進行驗證。
一條消息項是合法的,僅當其大小加偏移量小于文件的大小而且該消息中有效載荷的CRC32值同該消息中存儲的CRC值相等。在探測出有數據損壞的情況下,就要將文件依照最后一個有效的偏移量進行截斷。
要注意。這里有兩種必需處理的數據損壞情況:因為系統崩潰造成的未被正常寫入的數據塊(block)因而須要截斷的情況以及因為文件里被增加了毫無意義的數據塊而造成的數據損壞情況。造成數據損壞的原因是,一般來說OS并不能保證文件索引節點(inode)和實際數據塊這兩者的寫入順序,因此。除了可能會丟失未刷新的已寫入數據之外,在索引節點已經用新的文件大小更新了但在將數據塊寫入磁盤塊之前發生了系統崩潰的情況下。文件就可能會獲得一些毫無意義的數據。CRC值就是用于這樣的極端情況,避免由此造成整個日志文件的損壞(雖然未得到保存的消息當然是真的找不回來了)。
分發
Zookeeper文件夾
接下來討論zookeeper用于在使用者和代理直接進行協調的結構和算法。
記法
當一個路徑中的元素是用[xyz]這樣的形式表示的時,其意思是, xyz的值并不固定并且實際上xyz的每種可能的值都有一個zookpeer z節點(znode)。比如。/topics/[topic]表示了一個名為/topics的文件夾。當中包括的子文件夾同話題相應。一個話題一個文件夾并且文件夾名即為話題的名稱。也能夠給出數字范圍。比如[0...5],表示的是子文件夾0、1、2、3、4。箭頭->用于給出z節點的內容。比如/hello -> world表示的是一個名稱為/hello的z節點,包括的值為"world"。
代理節點的注冊
/brokers/ids/[0...N] --> host:port (ephemeral node)
上面是全部出現的代理節點的列表,列表中每一項都提供了一個具有唯一性的邏輯代理id。用于讓使用者可以識別代理的身份(這個必須在配置中給出)。在啟動時,代理節點就要用/brokers/ids下列出的邏輯代理id創建一個z節點,并在自己注冊到系統中。
使用邏輯代理id的目的是,可以讓我們在不影響數據使用者的情況下就能把一個代理搬到還有一臺不同的物理機器上。試圖用已在使用中的代理id(比方說,兩個server配置成了同一個代理id)進行注冊會導致錯誤發生。
由于代理是以非長久性z節點的方式注冊的,所以這個注冊過程是動態的,當代理關閉或宕機后注冊信息就會消失(至此要數據使用者。該代理不再有效)。
代理話題的注冊
/brokers/topics/[topic]/[0...N] --> nPartions (ephemeral node)
每一個代理會都要注冊在某話題之下。注冊后它會維護并保存該話題的分區總數。
使用者和使用者小組
為了對數據的使用進行負載均衡并記錄使用者使用的每一個代理上的每一個分區上的偏移量,全部話題的使用者都要在Zookeeper中進行注冊。
多個使用者能夠組成一個小組共同使用一個單個的話題。同一小組內的每一個使用者共享同一個給定的group_id。比方說,假設某個使用者負責用三臺機器進行某某處理過程。你就能夠為這組使用者分配一個叫做“某某”的id。
這個小組id是在使用者的配置文件里指定的,而且這就是你告訴使用者它究竟屬于哪個組的方法。
小組內的使用者要盡量公正地劃分出分區,每一個分區僅為小組內的一個使用者所使用。
使用者ID的注冊
除了小組內的全部使用者都要共享一個group_id之外。每一個使用者為了要同其他使用者差別開來,還要有一個非永久性的、具有唯一性的consumer_id(採用hostname:uuid的形式)。consumer_id要在下面的文件夾中進行注冊。
/consumers/[group_id]/ids/[consumer_id] --> {"topic1": #streams, ..., "topicN": #streams} (ephemeral node)
小組內的每一個使用者都要在它所屬的小組中進行注冊并採用consumer_id創建一個z節點。
z節點的值包括了一個<topic, #streams>的map。consumer_id僅僅是用來識別小組內活躍的每一個使用者。使用者建立的z節點是個暫時性的節點,因此假設這個使用者進程終止了,注冊信息也將隨之消失。
數據使用者偏移追蹤
數據使用者跟蹤他們在每一個分區中耗用的最大偏移量。
這個值被存儲在一個Zookeeper(分布式協調系統)文件夾中。
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value ((persistent node)
分區擁有者注冊表
每一個代理分區都被分配給了指定使用者小組中的單個數據使用者。數據使用者必須在耗用給定分區前確立對其的全部權。要確立其全部權,數據使用者須要將其 id 寫入到特定代理分區中的一個暫時節點(ephemeral node)中。
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id] --> consumer_node_id (ephemeral node)
代理節點的注冊
代理節點之間基本上都是相互獨立的,因此它們僅僅須要公布它們擁有的信息。當有新的代理增加進來時,它會將自己注冊到代理節點注冊文件夾中,寫下它的主機名和port。代理還要將已有話題的列表和它們的邏輯分區注冊到代理話題注冊表中。在代理上生成新話題時,須要動態的對話題進行注冊。
使用者注冊算法
當使用者啟動時。它要做下面這些事情:
將自己注冊到它屬小組下的使用者id注冊表。注冊一個監視使用者id列的表變化情況(有新的使用者增加或者不論什么現有使用者的離開)的變化監視器。(每一個變化都會觸發一次對發生變化的使用者所屬的小組內的全部使用者進行負載均衡。)主次一個監視代理id注冊表的變化情況(有新的代理增加或者不論什么現有的代理的離開)的變化監視器。(每一個變化都會觸發一次對全部小組內的全部使用者負載均衡。)假設使用者使用某話題過濾器創建了一個消息流,它還要注冊一個監視代理話題變化情況(增加了新話題)的變化監視器。(每一個變化都會觸發一次對全部可用話題的評估,以找出話題過濾器過濾出哪些話題。新過濾出來的話題將觸發一次對該使用者所在的小組內全部的使用者負載均衡。)迫使自己在小組內進行又一次負載均衡。
使用者又一次負載均衡的算法
使用者又一次復雜均衡的算法可用讓小組內的全部使用者對哪個使用者使用哪些分區達成一致意見。使用者又一次負載均衡的動作每次加入或移除代理以及同一小組內的使用者時被觸發。對于一個給定的話題和一個給定的使用者小組,代理分區是在小組內的全部使用者中進行平均劃分的。一個分區總是由一個單個的使用者使用。
這樣的設計方案簡化了實施過程。如果我們執行多個使用者以并發的方式同一時候使用同一個分區,那么在該分區上就會形成爭用(contention)的情況。這樣一來就須要某種形式的鎖定機制。
如果使用者的個數比分區多,就會出現有寫使用者根本得不到數據的情況。在又一次進行負載均衡的過程中,我們依照盡量降低每一個使用者須要連接的代理的個數的方式,嘗嘗試著將分區分配給使用者。
每一個使用者在又一次進行負載均衡時須要做下列的事情:
   1.   針對Ci所訂閱的每一個話題T
   2.   將PT設為生產話題T的全部分區
   3.   將CG設為小組內同Ci 一樣使用話題T的全部使用者
   4.   對PT進行排序(讓同一個代理上的各分區挨在一起)
   5.   對CG進行排序 
   6.   將i設為Ci在CG中的索引值并讓N = size(PT)/size(CG)
   7.   將從i*N到(i+1)*N - 1的分區分配給使用者Ci
   8.   將Ci當前所擁有的分區從分區擁有者注冊表中刪除
   9.   將新分配的分區增加到分區擁有者注冊表中
        (我們可能須要多次嘗試才干讓原先的分區擁有者釋放其擁有權)
       
在觸發了一個使用者要又一次進行負載均衡時,同一小組內的其他使用者也會差點兒在同一時候被觸發又一次進行負載均衡。
參考文獻:
英文原文:Kafka Architecture Design
網址:http://kafka.apache.org/documentation.html#design
參與翻譯(4人):
fbm,CodingKu,Khiyuan,nesteaa
各自的Blog例如以下:
fbm: http://my.oschina.net/fbm
CodingKu: http://my.oschina.net/muchuanwazi
Khiyuan: http://my.oschina.net/Khiyuan
nesteaa: http://my.oschina.net/u/947878
其它apache
 kafka技術分享系列(文件夾索引)
文件夾索引:
1)apache
 kafka消息服務
2)kafka在zookeeper中存儲結構
3)kafka
 log4j配置
4)kafka replication設計機制
5)apache kafka監控系列-監控指標
6)kafka.common.ConsumerRebalanceFailedException異常解決的方法
7)kafak安裝與使用
8)apache kafka中server.properties配置文件參數說明
9)apache kafka的consumer初始化時獲取不到消息
10)Kafka Producer處理邏輯
11)apache kafka源碼project環境搭建(IDEA)
12)apache kafka監控系列-KafkaOffsetMonitor
13)Kafka Controller設計機制
14)Kafka性能測試報告(虛擬機版)
15)apache kafka監控系列-kafka-web-console
16)apache kafka遷移與擴容工具使用方法
17)kafka LeaderNotAvailableException
18)apache kafka jmx監控指標參數
19)apache kafka性能測試命令使用和構建kafka-perf
20)apache kafka源代碼構建打包
21)Apache kafkaclient開發-java
22)kafka broker內部架構
23)apache kafka源代碼分析走讀-kafka總體結構分析
24)apache kafka源代碼分析走讀-Producer分析
25)apache kafka性能優化架構分析
26)apache kafka源代碼分析走讀-server端網絡架構分析
27)apache kafka源代碼分析走讀-ZookeeperConsumerConnector分析
28)kafka的ZkUtils類的java版本號部分代碼
29)kafka & mafka client開發與實踐
30) kafka文件系統設計那些事
31)kafka的ZookeeperConsumer實現
參考資料:http://blog.csdn.net/lizhitao/article/details/39499283
總結
以上是生活随笔為你收集整理的分布式公布订阅消息系统 Kafka 架构设计的全部內容,希望文章能夠幫你解決所遇到的問題。
                            
                        - 上一篇: Angular开发模式下的setNgRe
 - 下一篇: 医保怎么网上缴费