如何设计Kafka?
商業轉載請聯系作者獲得授權,非商業轉載請注明出處。
作者:Sugar Su
鏈接:http://zhuanlan.zhihu.com/ms15213/20545422
來源:知乎
此文稿來源于太閣實驗室每周三晚上的weekly tech show,活動注冊鏈接:https://attendee.gotowebinar.com/register/1150708559813755649
下期活動為:Spark內核架構設計與實現原理
Kafka是一個分布式的消息系統,以可水平擴展和高吞吐量和準實時性而被廣泛使用。我們從三個方面來引出和介紹Kafka。
1. How to design Google Analytics?
2. What is Kafka?
3. How to scale and survive?
1. How to design Google Analytics?
1) what is Google Analytics?
Google Analytics是統計網站數據的服務。如下圖:
from: Top 20 Google Analytics Blogs in 2015
2) can we understand it with an easier case?
網站的統計服務較為復雜,我們先看一下如何統計一個短鏈接的訪問數據?
如用戶地區分布、用戶瀏覽器類型等等,如下圖。
3) how to calculate the distribution of countries in the last hour?
統計的基礎是log。web server的log是訪問短鏈接的用戶的信息。
4) where is data?
一個典型的Kafka集群包含若干Producer,若干broker,若干Consumer Group,以及一個Zookeeper集群。其中producer負責收集log信息(如web服務器的PageView,服務器CPU狀態日志等),并且發送給broker,broker將log發送到相關的consumers。
這是經典的生產者消費者模型
5) how to calculate the last minute/hour/day/month/year?
Producer產生每個log,傳給broker,broker可以分給多個consumer,它們分別計算過去每分鐘,小時,天的數據。
在設計一個Consumer的處理邏輯的時候,可以使用buckets的方案。比如要存每分鐘的數據,可以使用兩個buckets A和B。一個分鐘使用bucker A,一分鐘使用bucket B。將這個方案擴展,如果我們需要的是過去兩小時內每分鐘的數據,那么針對這個consumer,我們需要121個bucket(120mins),我們可以使用環形鏈表或者queue的數據結構來存儲。我們每分鐘輪換一個bucket來存儲信息。。
6) how to do it in real time?
假設每秒有一百萬的日志需要處理,就輪到Kafka登場了
2. What is Kafka?
1) what is Kafka?
Kafka是一種分布式的,基于發布/訂閱的消息系統。它的大體框架由producer,broker和consumer構成,producer傳message給broker,broker傳message給consumer。
2) push or pull?
在producer和broker之間,可以選擇push或者pull兩種模式。如果broker每次都從producer pull數據的話,往往需要在producer的本地維持一個較大的log緩存,保存broker pull之前需要存儲的數據,這會增加很多復雜度。如果是從producer push的話,broker的設計會相對簡單。因此Kafka使用了push模式。
push也會引入阻塞的問題。如果producer用的流量太多,會阻礙其他producer的數據傳輸。因此在最新版的Kafka中引入了限流。
Kafka的設計包括兩部分:
監控:使用30個buckets,每個bucket記錄1秒的流量
限制。當broker監控到一個producer超過限值的時候,延遲response消息的發送時間來減緩producer的流量
由于Kafka的設計基于良性環境(假設所有P都是好人),所以不會出現producer故意忽略response的事情。
我們這里留下一個思考題?在broker和consumer之間,是push還是pull呢?
3) is message the same as log?
log更強調行為記錄,message強調消息傳輸,在使用的時候這兩個詞常常互換。
4) what is the format of messages?
message一定要有ID嗎?不一定。message核心是binary content,broker不需要知道它是什么。
message的屬性主要有length,CRC,state數據(記錄是否為特殊message,比如0指無特殊意義的message,1,2,3...對應不同的特殊message)。
CRC是校驗和,用來驗證數據是否出現錯誤。,它性能好、校驗率高,廣泛應用于各類文件系統中,如Google File System。如果CRC自己傳錯了怎么辦?CRC錯了就認為整個包是錯的,不需要區分具體是誰錯的,不用overthinking。
5) how to identify the message to be read now?
broker里存了很多message,consumer用offset就可以知道讀到哪個消息了。
Kafka最早的做法是將所有consumer的offset存在zookeeper里。此時zookeeper類似于一個分布式的database。每個用戶讀之前,可以從database里找到對應的讀取位置。這樣即使consumer失敗了,offset數據也不會丟失
當consumer很多的時候,zookeeper會成為性能瓶頸。因此最新版的Kafka中會專門將offset存儲于一個特定的compacted topic里面。這個topic有50個partition,每個partition的Leader Broker會做為一些consumer group的coordinator。因此這些coordinator會取代zookeeper來協調consumer group。
當然,我們也可以將offset存儲于consumer處,進行進一步的定制。
6) do we need index for log?
不需要。因為message是自然排序。如果它們在內存里,只需要二叉查找。Kafka不關心message發送時的順序,而是使用接收到的時間來排序。但是message里面可以有timestamp等信息,如LogAppendTime。
3. How to scale and survive
1) how to deal with the failure of a consumer?
當consumer失敗時,可能會造成broker overflow。因為短時間內message依舊源源不斷從producer傳給broker。
在Kafka里log是持久化的,也就是把message寫入disk中。因此能夠應對大多數message overflow的問題。
2) how to save message into disk?
因為message是序列寫在disk,用時相對較短。雖然memory速度至少是disk的十倍以上,但是因為Kafka可以通過一些優化讓硬盤達到和內存相匹敵的速度。例如使用磁盤陣列,同時有5個列,每個陣列都是4塊硬盤。把一個數據拆成不部分,比如C1, C2, C3和Cp。不同部分可以同時寫入陣列中,也就是讓幾十塊硬盤同時運轉,從而大大縮短了速度。
此外,Kafka也復用了OS的Page Cache,因此很多寫也是首先在內存中完成的。
在消息存儲上,Kafka會構建虛擬地址,每個消息記錄它的offset,并且將offset做為該消息的ID,從而通過ID能夠快速找到硬盤的相應位置。
進一步來說,對于一個topic的整個消息,Kafka會存放在硬盤上的多個文件中,每個文件的文件名就是存于該文件的第一個message的offset。Kafka同時會在內存中構建所有文件的offset的列表,因此能夠基于一個message的offset,在內存中快速定位到相應的文件,并且進一步得知在該文件內的偏移量。
為什么disk上不能存成一個文件?因為在disk上無法開出一片足夠大的連續區域。即使開出足夠大的連續區域,也會造成空間的浪費。
3) how to transfer message from disk into consumer?
傳統的方法是將數據首先拷貝到kernel的空間,然后拷貝到用戶空間,之后再拷貝回kernel空間,最后再送到網卡。在這個過程中需要四步拷貝,浪費了大量的時間和CPU資源。
因此Kafka往往借用OS提供的Zero Copy來一步將數據從硬盤拷貝到網卡。
4) how to deal with a huge amount of messages?
如果producer個數很多,可以使用多個broker。producer可以向多個broker里寫數據。
如何決定某個producer寫向哪個broker呢?最簡單的方法就是round robin。還有種方法是利用key的hash來寫入對應的broker。
一個topic也可以分配為多個不同的partition,在partition內部的消息是有序的,但是在partition之間的消息是無序的。當然我們也可以通過在message中加入timestamp的方式來維持消息的序列。
在Kafka中,topic指的是消息的一個集合。我們通過在發送消息和接收消息的時候指定topic來讓每個consumer只是接收和處理屬于自己的消息子集。
(參考:Kafka剖析(一):Kafka背景及架構介紹)
5) should we sync among partitions?
為了防止broker崩潰時引起的消息丟失,我們引入了primary(P)和slave(S)的概念。。P收到的每一條message都會同步給所有的S。因此在Kafka中,Primary被稱為Leader,Slave被稱為Follower。
為了提高性能,我們可能會動態添加新的broker。這是還需要手動的啟動Partition Reassignment來遷移數據。
6) how to select the new leader after a failures?
如果之前的Leader失敗,在剩下的followers中,誰的數據最全,就選這個broker做為新的P。
7) how to log the logs?
如果想統計在Kafka中,過去一分鐘內收到了多少message。可以啟動一個consumer接受所有的消息,并且每分鐘將統計結果重新放回到Kafka的broker中。另外再啟動一個consumer收集這些統計信息,從而給出報表。
8) how to compact the log?
對于key, value型log,可以刪除歷史信息,對log去冗余。比如log信息為K1=3, K1=7, K2=9, K1=5,前兩個log都可以刪掉,因為K1的終值是5。
時間關系無法將所有技術細節都涵蓋,這里提供一些進一步學習的參考資料:
Kafka
https://www.youtube.com/watch?v=7dkSze52i-o
Apache Kafka
轉載于:https://www.cnblogs.com/tianhangzhang/p/5185936.html
總結
以上是生活随笔為你收集整理的如何设计Kafka?的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2016第一季度目标
- 下一篇: 在Linux 5/6上使用UDEV SC