Kafka Design
Official documentation: https://kafka.apache.org/0100/documentation.html
Chinese translation of the design doc: http://www.oschina.net/translate/kafka-design
Kafka: a Distributed Messaging System for Log Processing
1. Introduction
We have built a novel messaging system for log processing called Kafka [18] that combines the benefits of traditional log aggregators and messaging systems.
On the one hand, Kafka is distributed and scalable, and offers high throughput.
On the other hand, Kafka provides an API similar to a messaging system and allows applications to consume log events in real time.
You can think of it as a distributed producer-consumer architecture.
2. Related Work
Given that so many log aggregators and messaging systems already existed, why build Kafka?
Compared with traditional messaging systems:
1. MQ and JMS systems provide strong delivery guarantees, which log aggregation does not need; losing the odd log message is acceptable, while those guarantees add a lot of system complexity.
2. They predate the big-data era and never treated throughput as the primary design constraint; for example, they do not support batching of sends.
3. They are weak in distributed support.
4. They are a poor fit for real-time analysis: they assume consumption is nearly immediate, so consumers must be very fast or an ever-growing backlog degrades performance.
Traditional messaging systems tend not to be a good fit for log processing.
First, there is a mismatch in features offered by enterprise systems.
For example, IBM Websphere MQ [7] has transactional supports that allow an application to insert messages into multiple queues atomically. The JMS [14] specification allows each individual message to be acknowledged after consumption, potentially out of order.
Second, many systems do not focus as strongly on throughput as their primary design constraint.
Third, those systems are weak in distributed support.
Finally, many messaging systems assume near immediate consumption of messages, so the queue of unconsumed messages is always fairly small.
Comparison with existing log aggregators: the pull model
A number of specialized log aggregators have been built over the last few years.
Facebook uses a system called Scribe. Each frontend machine can send log data to a set of Scribe machines over sockets. Each Scribe machine aggregates the log entries and periodically dumps them to HDFS [9] or an NFS device.
Yahoo’s data highway project has a similar dataflow. A set of machines aggregate events from the clients and roll out “minute” files, which are then added to HDFS.
Flume is a relatively new log aggregator developed by Cloudera. It supports extensible “pipes” and “sinks”, and makes streaming log data very flexible. It also has more integrated distributed support. However, most of those systems are built for consuming the log data offline, and often expose implementation details unnecessarily (e.g. “minute files”) to the consumer.
Most of them use a “push” model in which the broker forwards data to consumers. At LinkedIn, we find the “pull” model more suitable for our applications since each consumer can retrieve the messages at the maximum rate it can sustain and avoid being flooded by messages pushed faster than it can handle.
Why pull rather than push? Only the consumer knows how full or hungry it is, so having consumers fetch at their own pace is more sensible than the broker force-feeding them.
Did earlier systems simply fail to see this? No; it is not hard to see. The difference is that earlier systems assumed offline consumers that write data straight to HDFS rather than analyzing it online, so consumers were in little danger of being flooded, and under that assumption push is the simpler choice. A sketch of the pull idea follows.
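A minimal sketch of the pull loop (the fetch() stub below is hypothetical, standing in for a real broker fetch request): the consumer asks for the next batch only when it has finished the previous one, so it can never be flooded.

    import java.util.ArrayList;
    import java.util.List;

    public class PullLoopSketch {
        // Hypothetical stub standing in for a fetch request to a broker;
        // it serves at most 30 demo messages.
        static List<String> fetch(long offset, int maxMessages) {
            List<String> batch = new ArrayList<>();
            for (int i = 0; i < maxMessages && offset + i < 30; i++) {
                batch.add("msg-" + (offset + i));
            }
            return batch;
        }

        public static void main(String[] args) throws InterruptedException {
            long offset = 0;
            List<String> batch;
            while (!(batch = fetch(offset, 10)).isEmpty()) {
                for (String m : batch) { /* process m at our own pace */ }
                offset += batch.size();  // the consumer, not the broker, tracks position
                Thread.sleep(5);         // pull again only when ready
            }
            System.out.println("consumed up to offset " + offset);
        }
    }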
3. Kafka Architecture and Design Principles
We first introduce the basic concepts in Kafka.
A stream of messages of a particular type is defined by a topic.
A producer can publish messages to a topic.
The published messages are then stored at a set of servers called brokers.
A consumer can subscribe to one or more topics from the brokers, and consume the subscribed messages by pulling data from the brokers.
To balance load, a topic is divided into multiple partitions and each broker stores one or more of those partitions.
Load balancing is achieved by splitting each topic into partitions.
This partitioning scheme is fairly sensible: topics differ in popularity, so simply placing different topics on different brokers would easily skew the load, whereas spreading each topic's partitions across brokers keeps it even.
By default messages are assigned to partitions at random; a more deliberate partitioning strategy can be plugged in, as the sketch below illustrates.
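As a sketch of what such a pluggable strategy might look like (the method names are made up for illustration, not Kafka's actual partitioner API): random assignment spreads load blindly, while a key hash keeps related messages together in one partition.

    import java.util.concurrent.ThreadLocalRandom;

    public class PartitionerSketch {
        // Default-style behavior: pick a partition at random to spread load.
        static int randomPartition(int numPartitions) {
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }

        // Custom strategy: hash a semantic key so that, e.g., all events of one
        // user land in the same partition (and are therefore ordered).
        static int keyedPartition(String key, int numPartitions) {
            return Math.abs(key.hashCode() % numPartitions);
        }

        public static void main(String[] args) {
            System.out.println(randomPartition(8));            // varies per call
            System.out.println(keyedPartition("user-42", 8));  // always the same
        }
    }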
3.1 Efficiency on a Single Partition
Simple storage
Kafka's storage structure is very simple:
1. The unit of storage is the partition, and each partition is just a set of segment files; using a set of files (rather than one) keeps any single file from growing too large.
Logically, then, you can treat a partition as one log file, with new messages appended at its tail.
As with any file system, a message becomes visible to consumers only after it has been flushed.
2. Logical offsets are used instead of message ids, reducing storage overhead.
Kafka has a very simple storage layout.
1. Each partition of a topic corresponds to a logical log.
Physically, a log is implemented as a set of segment files of approximately the same size (e.g., 1GB).
Every time a producer publishes a message to a partition, the broker simply appends the message to the last segment file.
A message is only exposed to the consumers after it is flushed.
2. A message stored in Kafka doesn’t have an explicit message id. Instead, each message is addressed by its logical offset in the log. This avoids the overhead of maintaining auxiliary, seek-intensive random-access index structures that map the message ids to the actual message locations.
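A toy sketch of the addressing scheme (in-memory lists instead of real segment files): a message's id is simply its byte position in the log, and the next id is the current offset plus the current message's length, so no id-to-location index is ever needed.

    import java.util.ArrayList;
    import java.util.List;

    public class LogicalOffsetSketch {
        // One partition's log, kept in memory here; real Kafka appends to segment files.
        static List<byte[]> messages = new ArrayList<>();
        static List<Long> offsets = new ArrayList<>();
        static long nextOffset = 0;

        static long append(byte[] message) {
            long offset = nextOffset;        // the message's address IS its byte position
            messages.add(message);
            offsets.add(offset);
            nextOffset += message.length;    // ids are increasing but not consecutive
            return offset;
        }

        public static void main(String[] args) {
            long a = append("hello".getBytes());    // offset 0
            long b = append("world!!".getBytes());  // offset 5 (= 0 + 5 bytes of "hello")
            System.out.println(a + ", " + b + ", next = " + nextOffset); // 0, 5, 12
        }
    }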
Efficient transfer
1. Messages are sent in batches, improving throughput.
2. The file system page cache is used instead of an application-level memory cache.
3. sendfile is used to bypass application-layer buffers and move data from file to socket directly (which works because the application logic genuinely does not inspect what it sends).
We are very careful about transferring data in and out of Kafka.
1. Producer can submit a set of messages in a single send request. Consumer also retrieves multiple messages up to a certain size, typically hundreds of kilobytes.
2. Another unconventional choice that we made is to avoid explicitly caching messages in memory at the Kafka layer. Instead, we rely on the underlying file system page cache.
What are the advantages of relying on the file system page cache? (See the Kafka design doc.)
First, using the page cache directly is simple and efficient; nothing special needs to be done, and no dedicated memory buffer has to be built or managed.
Second, the page cache belongs to the OS rather than the broker process, so it stays warm across a broker restart or crash.
Finally, and most importantly, it suits the workload: Kafka's reads and writes are sequential.
This has the main benefit of avoiding double buffering---messages are only cached in the page cache.
This has the additional benefit of retaining warm cache even when a broker process is restarted.
Since both the producer and the consumer access the segment files sequentially, with the consumer often lagging the producer by a small amount, normal operating system caching heuristics are very effective.
3. We optimize the network access for consumers.
On Linux and other Unix operating systems, there exists a sendfile API [5] that can directly transfer bytes from a file channel to a socket channel.
sendfile skips two of the four steps of a conventional send: (2) copying data from the page cache into an application buffer, and (3) copying the application buffer into another kernel buffer.
Since Kafka never wanted an application-level memory buffer in the first place, this is a natural fit: data is copied straight from the page cache to the kernel socket buffer.
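In Java this is exposed as FileChannel.transferTo, which on Linux is typically backed by sendfile(2). A minimal sketch (the segment file name and port below are made up for illustration):

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.FileChannel;
    import java.nio.channels.SocketChannel;

    public class SendfileSketch {
        static void sendSegment(String path, SocketChannel socket) throws IOException {
            try (FileChannel file = new FileInputStream(path).getChannel()) {
                long pos = 0, size = file.size();
                while (pos < size) {
                    // Bytes flow from the page cache to the socket inside the
                    // kernel; user space never touches the data.
                    pos += file.transferTo(pos, size - pos, socket);
                }
            }
        }

        public static void main(String[] args) throws IOException {
            // Hypothetical segment file and consumer endpoint, for illustration only.
            try (SocketChannel sc = SocketChannel.open(new InetSocketAddress("localhost", 9092))) {
                sendSegment("00000000000000000000.log", sc);
            }
        }
    }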
Stateless broker
Unlike most other messaging systems, in Kafka, the information about how much each consumer has consumed is not maintained by the broker, but by the consumer itself.
This falls out of the pull model: the broker has no need to know how far each consumer has read. (With push, it would have to.)
The resulting problem is that the broker does not know when consumers will pull, so when can it delete a message? Kafka's answer is a very simple time-based SLA: a message is deleted after a retention period, e.g., 7 days.
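A sketch of what such a time-based sweep amounts to (the directory layout and the 7-day window are illustrative assumptions, not Kafka's actual log cleaner):

    import java.io.File;
    import java.util.concurrent.TimeUnit;

    public class RetentionSweepSketch {
        // Delete any segment file older than the retention window, whether or
        // not every consumer has read it: the broker keeps no consumer state.
        static void sweep(File partitionDir, long retentionMs) {
            long cutoff = System.currentTimeMillis() - retentionMs;
            File[] segments = partitionDir.listFiles((dir, name) -> name.endsWith(".log"));
            if (segments == null) return;
            for (File segment : segments) {
                if (segment.lastModified() < cutoff) {
                    System.out.println("deleting " + segment.getName());
                    segment.delete();
                }
            }
        }

        public static void main(String[] args) {
            // Hypothetical partition directory, for illustration only.
            sweep(new File("/tmp/kafka-logs/topic-0"), TimeUnit.DAYS.toMillis(7));
        }
    }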
There is an important side benefit of this design. A consumer can deliberately rewind back to an old offset and re-consume data.
This is a very handy property, e.g., for testing; I know the pain well. With an ordinary queue, a message is gone once read, so re-running tests against the same data is a nuisance, whereas with Kafka you just rewind the offset.
It also helps when a consumer crashes before its output is written durably: no harm done, re-read from the last recorded offset and the data is still there.
The catch is that Kafka itself offers no convenient interface for manipulating offsets, so in practice this is less pleasant than it sounds.
3.2 Distributed Coordination
We now describe how the producers and the consumers behave in a distributed setting.
Each producer can publish a message to either a randomly selected partition or a partition semantically determined by a partitioning key and a partitioning function. We will focus on how the consumers interact with the brokers.
For producers it is simple: a message goes either to a random partition or to one chosen by some hash of a key.
Consumers are more involved: a topic has many partitions, and for throughput you need multiple consumers consuming it, so the question is how to coordinate among the consumers.
Kafka has the concept of consumer groups. Each consumer group consists of one or more consumers that jointly consume a set of subscribed topics, i.e., each message is delivered to only one of the consumers within the group.
You can think of a consumer group as one logical consumer: each message needs to be consumed exactly once, and the group exists only to parallelize that work.
Different groups are completely independent: a message is consumed once by every group, so no coordination is needed across groups.
Within a group, however, the consumers must coordinate to ensure each message is consumed only once, and the goal is to keep the overhead of that coordination as low as possible.
1. To simplify the design, there is no parallelism within a partition, only across partitions.
Our first decision is to make a partition within a topic the smallest unit of parallelism. This means that at any given time, all messages from one partition are consumed only by a single consumer within each consumer group.
A partition has at most one consumer per group, which avoids the locking and state-maintenance overhead of concurrent readers.
Could every partition get its own dedicated consumer? It could, but that is wasteful: there are usually far more partitions than consumers.
So each consumer covers several partitions, which raises a question: when the set of partitions or consumers changes, a rebalance must redistribute the assignments. Only at that moment do consumers need to coordinate, so the coordination overhead is low.
The biggest problem with this design is that a single slow or oversized partition drags down overall progress: a partition can have only one consumer, and the other consumers cannot help even when idle.
So you must keep production and consumption roughly matched on every partition, or trouble follows.
For example, the partition count has to be chosen carefully: if it is not divisible by the consumer count, the assignment comes out uneven.
Personally I do not think this part of the design is worth emulating; there should be better options...
2. Zookeeper instead of a central master
The second decision that we made is to not have a central “master” node, but instead let consumers coordinate among themselves in a decentralized fashion.
Kafka uses Zookeeper for the following tasks:
(1) detecting the addition and the removal of brokers and consumers,
(2) triggering a rebalance process in each consumer when the above events happen, and
(3) maintaining the consumption relationship and keeping track of the consumed offset of each partition.
Specifically, when each broker or consumer starts up, it stores its information in a broker or consumer registry in Zookeeper.
The broker registry (ephemeral) contains the broker’s host name and port, and the set of topics and partitions stored on it.
The consumer registry (ephemeral) includes the consumer group to which a consumer belongs and the set of topics that it subscribes to.
The ownership registry (ephemeral) has one path for every subscribed partition and the path value is the id of the consumer currently consuming from this partition (we use the terminology that the consumer owns this partition).
The offset registry (persistent) stores, for each subscribed partition and consumer group, the offset of the last consumed message in the partition.
When brokers or consumers join or leave, the corresponding ephemeral registries update automatically; that part is simple.
At the same time a rebalance event is triggered in the consumers, and the ownership registry is modified according to the rebalance outcome.
Only the offset registry is persistent: however the consumers change, as long as each group's offset in every partition is recorded, coordination within the group is preserved.
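Put together, the Zookeeper layout looks roughly like this (a plausible reconstruction in the style early Kafka versions used, not an exact transcript of the real paths):

    /brokers/ids/[broker_id]                       -> host:port              (ephemeral)
    /brokers/topics/[topic]/[broker_id]            -> number of partitions   (ephemeral)
    /consumers/[group]/ids/[consumer_id]           -> subscribed topics      (ephemeral)
    /consumers/[group]/owners/[topic]/[partition]  -> owning consumer_id     (ephemeral)
    /consumers/[group]/offsets/[topic]/[partition] -> last consumed offset   (persistent)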
3. Consumer Rebalance
Algorithm 1: rebalance process for consumer Ci in group G
For each topic T that Ci subscribes to {                              # runs topic by topic; partition counts differ per topic
    remove partitions owned by Ci from the ownership registry         # clear existing ownership first
    read the broker and the consumer registries from Zookeeper
    compute PT = partitions available in all brokers under topic T    # partition list for T
    compute CT = all consumers in G that subscribe to topic T         # consumer list for T
    sort PT and CT                                                    # every consumer sorts identically
    let j be the index position of Ci in CT and let N = |PT|/|CT|     # j = Ci's rank in the consumer list
    assign partitions from j*N to (j+1)*N - 1 in PT to consumer Ci
    for each assigned partition p {
        set the owner of p to Ci in the ownership registry            # update ownership
        let Op = the offset of partition p stored in the offset registry  # read the committed offset
        invoke a thread to pull data in partition p from offset Op    # one thread per owned partition
    }
}
The key to the algorithm is the formula: assign partitions j*N through (j+1)*N - 1.
It is really simple: with 10 partitions and 2 consumers, how many partitions should each consumer handle? N = 10/2 = 5.
Which five each one gets is decided by j, the consumer's position in the sorted consumer list.
This is how Kafka balances load automatically: partitions are always spread evenly over the live consumers, and if one consumer dies, a rebalance lets the others take over its partitions. (A runnable sketch of the computation follows.)
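The same computation as a runnable sketch (names are illustrative). Every consumer evaluates it independently and deterministically, which is what lets them all agree without talking to one another:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class RangeAssignSketch {
        // Returns the partitions consumer `me` should own for one topic.
        static List<Integer> assign(List<Integer> pt, List<String> ct, String me) {
            Collections.sort(pt);              // every consumer sorts the same way,
            Collections.sort(ct);              // so every consumer sees the same order
            int j = ct.indexOf(me);
            int n = pt.size() / ct.size();     // N = |PT| / |CT|
            return new ArrayList<>(pt.subList(j * n, (j + 1) * n)); // j*N .. (j+1)*N - 1
        }

        public static void main(String[] args) {
            List<Integer> pt = new ArrayList<>(List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9));
            List<String> ct = new ArrayList<>(List.of("consumer-1", "consumer-2"));
            System.out.println(assign(pt, ct, "consumer-1")); // [0, 1, 2, 3, 4]
            System.out.println(assign(pt, ct, "consumer-2")); // [5, 6, 7, 8, 9]
            // Note: taken literally, the formula leaves a remainder unassigned when
            // |PT| is not divisible by |CT|; Kafka's real range assignor hands the
            // leftovers to the first few consumers, which is still uneven.
        }
    }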
However, Kafka's policy of "make a partition within a topic the smallest unit of parallelism", while it simplifies things, also coarsens the balancing granularity: it cannot handle the case where one partition carries far more data than the rest, because a partition gets at most one consumer. The producer therefore has to keep the partitions balanced when publishing.
The key enabler of the design: because each group's read offset is recorded per partition, the consumer doing the reading can be switched at any moment, so a rebalance is a straightforward reassignment with nothing else to worry about.
Rebalancing can, however, cause some data to be read twice.
The reason: to tolerate consumer crashes, a consumer commits its offset only after it has finished processing the data, so a crash loses nothing;
but when a rebalance hands a partition to another consumer, that consumer may re-read data the previous owner had processed but not yet committed.
Contention over partition ownership, caused by notification timing
When there are multiple consumers within a group, each of them will be notified of a broker or a consumer change.
However, the notification may come at slightly different times at the consumers.
So, it is possible that one consumer tries to take ownership of a partition still owned by another consumer. When this happens, the first consumer simply releases all the partitions that it currently owns, waits a bit and retries the rebalance process. In practice, the rebalance process often stabilizes after only a few retries.
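A sketch of that retry loop (tryClaimAll() is a hypothetical stand-in for attempting to create the ephemeral owner nodes in Zookeeper; the wait interval is illustrative, as the paper only says "waits a bit"):

    public class OwnershipRetrySketch {
        interface Claim { boolean tryClaimAll(); } // hypothetical: true once no node is contested

        static int rebalanceWithRetry(Claim claim) throws InterruptedException {
            int attempts = 1;
            while (!claim.tryClaimAll()) {
                // Release everything we hold, wait a bit, then redo the whole rebalance.
                Thread.sleep(100);
                attempts++;
            }
            return attempts;
        }

        public static void main(String[] args) throws InterruptedException {
            int[] calls = {0};
            int attempts = rebalanceWithRetry(() -> ++calls[0] >= 3); // contested twice
            System.out.println("stabilized after " + attempts + " attempts");
        }
    }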
3.3 Delivery Guarantees
In general, Kafka only guarantees at-least-once delivery. Exactly once delivery typically requires two-phase commits and is not necessary for our applications.
This is not an area Kafka dwells on; commit mechanisms like two-phase commit are considered unnecessary. In normal operation delivery is effectively exactly-once, but if a consumer crashes after reading data and before updating Zookeeper, the consumer that takes over may read some of the same data again.
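If duplicates matter, the paper's suggestion amounts to application-side de-duplication, e.g. by offset; a minimal sketch of that idea:

    import java.util.HashMap;
    import java.util.Map;

    public class DedupSketch {
        // Highest offset already processed, per partition; anything at or below
        // it was seen before the rebalance or crash and can be skipped.
        static Map<Integer, Long> lastProcessed = new HashMap<>();

        static boolean isNew(int partition, long offset) {
            if (offset <= lastProcessed.getOrDefault(partition, -1L)) return false;
            lastProcessed.put(partition, offset);
            return true;
        }

        public static void main(String[] args) {
            System.out.println(isNew(0, 0));  // true
            System.out.println(isNew(0, 5));  // true
            System.out.println(isNew(0, 5));  // false: redelivered duplicate
        }
    }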
Kafka guarantees that messages from a single partition are delivered to a consumer in order. However, there is no guarantee on the ordering of messages coming from different partitions.
To avoid log corruption, Kafka stores a CRC for each message in the log. The CRC guards against network errors and corrupted data.
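The check itself is the standard CRC32 recipe; a sketch using java.util.zip.CRC32:

    import java.nio.charset.StandardCharsets;
    import java.util.zip.CRC32;

    public class CrcSketch {
        static long crcOf(byte[] payload) {
            CRC32 crc = new CRC32();
            crc.update(payload);
            return crc.getValue();   // stored alongside the message in the log
        }

        public static void main(String[] args) {
            byte[] payload = "some log event".getBytes(StandardCharsets.UTF_8);
            long stored = crcOf(payload);
            // On read, recompute and compare; a mismatch means corruption.
            System.out.println("intact: " + (stored == crcOf(payload)));
        }
    }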
If a broker goes down, any message stored on it not yet consumed becomes unavailable. If the storage system on a broker is permanently damaged, any unconsumed message is lost forever.
So a broker crash can mean data loss.
In the future, we plan to add built-in replication in Kafka to redundantly store each message on multiple brokers.
4. Kafka Usage at LinkedIn
We have one Kafka cluster co-located with each datacenter where our user-facing services run.
That is, a Kafka cluster runs inside every datacenter that hosts services, collecting their log data.
The frontend services generate various kinds of log data and publish it to the local Kafka brokers in batches.
We rely on a hardware load-balancer to distribute the publish requests to the set of Kafka brokers evenly.
This matters: the publish requests must be distributed evenly across the brokers, because an imbalance introduced here cannot be corrected downstream.
The online consumers of Kafka run in services within the same datacenter.
On this cluster the consumers are online, analyzing the data in real time.
We also deploy a cluster of Kafka in a separate datacenter for offline analysis, located geographically close to our Hadoop cluster and other data warehouse infrastructure.
A separate Kafka cluster dedicated to offline analysis is set up near the Hadoop cluster and the data-warehouse infrastructure.
This instance of Kafka runs a set of embedded consumers to pull data from the Kafka instances in the live datacenters.
The consumer can itself be another Kafka cluster; a rather creative usage...
We then run data load jobs to pull data from this replica cluster of Kafka into Hadoop and our data warehouse, where we run various reporting jobs and analytical process on the data.