详述 Kafka 基本原理
文章目錄
- 1 簡介
- 2 Kafka 架構
- 3 Kafka 存儲策略
- 4 Kafka 刪除策略
- 5 Kafka broker
- 6 Kafka 官方文檔
- 7 代碼示例
1 簡介
Apache Kafka 是分布式發布-訂閱消息系統。它最初由 LinkedIn 公司開發,之后成為 Apache 項目的一部分。Kafka 是一種快速、可擴展的、設計內在就是分布式的,分區的和可復制的提交日志服務。
2 Kafka 架構
它的架構包括以下組件:
- 話題(Topic):是特定類型的消息流。消息是字節的有效負載(Payload),話題是消息的分類名或種子(Feed)名。
- 生產者(Producer):是能夠發布消息到話題的任何對象。
- 服務代理(Broker):已發布的消息保存在一組服務器中,它們被稱為代理(Broker)或 Kafka 集群。
- 消費者(Consumer):可以訂閱一個或多個話題,并從Broker拉數據,從而消費這些已發布的消息。
3 Kafka 存儲策略
- Kafka 以topic來進行消息管理,每個topic包含多個partition,每個partition對應一個邏輯log,有多個segment組成。
- 每個segment中存儲多條消息(見下圖),消息id由其邏輯位置決定,即從消息id可直接定位到消息的存儲位置,避免id到位置的額外映射。
- 每個part在內存中對應一個index,記錄每個segment中的第一條消息偏移。
- 發布者發到某topic的消息會被均勻的分布到多個partition上(或根據用戶指定的路由規則進行分布),broker收到發布消息往對應partition的最后一個segment上添加該消息,當某個segment上的消息條數達到配置值或消息發布時間超過閾值時,segment上的消息會被flush到磁盤,只有flush到磁盤上的消息訂閱者才能訂閱到,segment達到一定的大小后將不會再往該segment寫數據,broker會創建新的segment。
4 Kafka 刪除策略
- N天前的刪除。
- 保留最近的MGB數據。
5 Kafka broker
與其它消息系統不同,Kafka broker是無狀態的。這意味著消費者必須維護已消費的狀態信息。這些信息由消費者自己維護,broker完全不管(由offset managerbroker管理)。
從代理刪除消息變得很棘手,因為代理并不知道消費者是否已經使用了該消息。Kafka 創新性地解決了這個問題,它將一個簡單的基于時間的 SLA 應用于保留策略。當消息在代理中超過一定時間后,將會被自動刪除。
這種創新設計有很大的好處,消費者可以故意倒回到老的偏移量再次消費數據。這違反了隊列的常見約定,但被證明是許多消費者的基本特征。
6 Kafka 官方文檔
Kafka Design
- 目標
- 高吞吐量來支持高容量的事件流處理
- 支持從離線系統加載數據
- 低延遲的消息系統
- 持久化
- 依賴文件系統,持久化到本地
- 數據持久化到log
- 效率
- 解決small IO problem:
- 使用message set組合消息。
- server使用chunks of messages寫到log
- consumer一次獲取大的消息塊。
- 解決byte copying:
- 在producer、broker和consumer之間使用統一的binary message format
- 使用系統pagecache
- 使用sendfile傳輸log,避免拷貝
- 解決small IO problem:
端到端的批量壓縮(End-to-end Batch Compression),Kafka 支持 GZIP 和 Snappy 壓縮協議。
The Producer
- 負載均衡
- producer可以自定義發送到哪個partition的路由規則。默認路由規則:hash(key) % numPartitions,如果key為null則隨機選擇一個partition。
- 自定義路由:如果key是一個user id,可以把同一個user的消息發送到同一個partition,這時consume就可以從同一個partition讀取同一個user的消息。
- 異步批量發送
- 批量發送:配置不多于固定消息數目一起發送并且等待時間小于一個固定延遲的數據。
The Consumer
consumer控制消息的讀取。
Push vs Pull:
- producer推(push)數據到broker,consumer從broker拉(pull)數據
- consumer拉的優點:consumer自己控制消息的讀取速度和數量
- consumer拉的缺點:如果broker沒有數據,則可能要pull多次忙等待,Kafka 可以配置consumer long pull一直等到有數據
Consumer Position:
- 大部分消息系統由broker記錄哪些消息被消費了,但 Kafka 不是
- Kafka 由consumer控制消息的消費,consumer甚至可以回到一個old offset的位置再次消費消息
Message Delivery Semantics:
- 至多一次(At most once ),消息可能丟失,但不會重復
- 至少一次(At least once),消息不會丟失,但可能重復
- 恰好一次(Exactly once),這正是我們想要的,消息僅被發送一次
Producer:有個acks配置可以控制接收的leader的在什么情況下就回應producer消息寫入成功。
Consumer:
- 讀取消息,寫log,處理消息。如果處理消息失敗,log已經寫入,則無法再次處理失敗的消息,對應At most once。
- 讀取消息,處理消息,寫log。如果消息處理成功,寫log失敗,則消息會被處理兩次,對應At least once。
- 讀取消息,同時處理消息并把result和log同時寫入,這樣保證result和log同時更新或同時失敗,對應Exactly once。
Kafka 默認保證at-least-once delivery,容許用戶實現at-most-once語義,exactly-once的實現取決于目的存儲系統,Kafka 提供了讀取offset,實現也沒有問題。
復制(Replication)
- 一個partition的復制個數(replication factor)包括這個partition的leader本身。
- 所有對partition的讀和寫都通過leader。
- Followers通過pull獲取leader上log(message和offset)
- 如果一個follower掛掉、卡住或者同步太慢,leader會把這個follower從in sync replicas(ISR)列表中刪除。
- 當所有的in sync replicas的follower把一個消息寫入到自己的log中時,這個消息才被認為是committed的。
- 如果針對某個partition的所有復制節點都掛了,Kafka 選擇最先復活的那個節點作為leader(這個節點不一定在ISR里)。
日志壓縮(Log Compaction)
- 針對一個topic的partition,壓縮使得 Kafka 至少知道每個key對應的最后一個值。
- 壓縮不會重排序消息。
- 消息的offset是不會變的。
- 消息的offset是順序的。
Distribution
-
Consumer Offset Tracking
-
High-level consumer 記錄每個 partition 所消費的 maximum offset,并定期 commit 到 offset manager(broker)。
-
Simple consumer 需要手動管理 offset。現在的 Simple consumer Java API 只支持 commit offset 到 zookeeper。
-
Consumers and Consumer Groups
-
consumer 注冊到 zookeeper
-
屬于同一個 group 的 consumer(group id 一樣)平均分配 partition,每個 partition 只會被一個 consumer 消費。
-
當 broker 或同一個 group 的其他 consumer 的狀態發生變化的時候,consumer rebalance 就會發生。
Zookeeper 協調控制
- 管理broker與consumer的動態加入與離開。
- 觸發負載均衡,當broker或consumer加入或離開時會觸發負載均衡算法,使得一個 consumer組內的多個consumer的訂閱負載平衡。
- 維護消費關系及每個partition的消費信息。
7 代碼示例
生產者代碼示例:
import java.util.*;import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig;public class TestProducer {public static void main(String[] args) {long events = Long.parseLong(args[0]);Random rnd = new Random();Properties props = new Properties();props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");props.put("serializer.class", "kafka.serializer.StringEncoder");props.put("partitioner.class", "example.producer.SimplePartitioner");props.put("request.required.acks", "1");ProducerConfig config = new ProducerConfig(props);Producer<String, String> producer = new Producer<String, String>(config);for (long nEvents = 0; nEvents < events; nEvents++) { long runtime = new Date().getTime(); String ip = “192.168.2.” + rnd.nextInt(255); String msg = runtime + “,www.example.com,” + ip; KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);producer.send(data);}producer.close();} }Partitioning Code:
import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties;public class SimplePartitioner implements Partitioner {public SimplePartitioner (VerifiableProperties props) {}public int partition(Object key, int a_numPartitions) {int partition = 0;String stringKey = (String) key;int offset = stringKey.lastIndexOf('.');if (offset > 0) {partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions;}return partition;} }消費者代碼示例:
import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector;import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;public class ConsumerGroupExample {private final ConsumerConnector consumer;private final String topic;private ExecutorService executor;public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId));this.topic = a_topic;}public void shutdown() {if (consumer != null) consumer.shutdown();if (executor != null) executor.shutdown();try {if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");}} catch (InterruptedException e) {System.out.println("Interrupted during shutdown, exiting uncleanly");}}public void run(int a_numThreads) {Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(topic, new Integer(a_numThreads));Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);// now launch all the threads//executor = Executors.newFixedThreadPool(a_numThreads);// now create an object to consume the messages//int threadNumber = 0;for (final KafkaStream stream : streams) {executor.submit(new ConsumerTest(stream, threadNumber));threadNumber++;}}private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {Properties props = new Properties();props.put("zookeeper.connect", a_zookeeper);props.put("group.id", a_groupId);props.put("zookeeper.session.timeout.ms", "400");props.put("zookeeper.sync.time.ms", "200");props.put("auto.commit.interval.ms", "1000");return new ConsumerConfig(props);}public static void main(String[] args) {String zooKeeper = args[0];String groupId = args[1];String topic = args[2];int threads = Integer.parseInt(args[3]);ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);example.run(threads);try {Thread.sleep(10000);} catch (InterruptedException ie) {}example.shutdown();} }ConsumerTest:
import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream;public class ConsumerTest implements Runnable {private KafkaStream m_stream;private int m_threadNumber;public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {m_threadNumber = a_threadNumber;m_stream = a_stream;}public void run() {ConsumerIterator<byte[], byte[]> it = m_stream.iterator();while (it.hasNext())System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));System.out.println("Shutting down Thread: " + m_threadNumber);} }轉載聲明:本文轉自博客園「阿凡盧」,Kafka基本原理。
總結
以上是生活随笔為你收集整理的详述 Kafka 基本原理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Influx Sql系列教程三:meas
- 下一篇: 易到用车服务器修复了提现,易到用车提现最