Multi-threaded Kafka Consumer Examples
Starting with version 0.9, Kafka ships a Java consumer with a redesigned coordinator and no more dependency on ZooKeeper. The community has also recently been discussing a plan to officially replace the Scala consumer with this consumer API. Since material on the subject is still scarce, this article walks through multi-threaded consumer examples built on KafkaConsumer, which I hope you will find useful.
The key entry point of this API is KafkaConsumer (o.a.k.clients.consumer.KafkaConsumer). Plain single-threaded usage is already covered by the official API docs, so I will not repeat it here. Instead, let's get straight to the point: how to use KafkaConsumer from multiple threads. KafkaConsumer differs from KafkaProducer in this respect. The producer is thread-safe, so users are encouraged to share a single KafkaProducer instance across threads, which is usually more efficient than maintaining one instance per thread. The consumer, however, is not thread-safe, which leaves two common ways to implement multi-threaded consumption:
1. Maintain one KafkaConsumer per thread
2. Maintain one or more KafkaConsumers, plus a pool of event-processing threads (worker threads)
Of course, the second method admits further variants: for example, each worker thread can own its own processing queue, with the consumer routing messages into the queues by some rule or logic. The underlying idea stays the same, so I will not expand on every variant here; a sketch of the per-worker-queue variant appears after the method 2 code below.
The table below summarizes the pros and cons of the two methods:

| | Pros | Cons |
| --- | --- | --- |
| Method 1 (one KafkaConsumer per thread) | Easy to implement<br>Fast, since no inter-thread interaction is needed<br>Per-partition message ordering is easy to maintain | More TCP connection overhead (every thread maintains several connections)<br>The number of consumers is capped by the number of topic partitions, so scalability is poor<br>Frequent requests can lower throughput<br>Processing messages in the consumer thread itself may cause timeouts and trigger a rebalance |
| Method 2 (one or more consumers, multiple worker threads) | Consumer count and worker count can be scaled independently, giving good elasticity | Harder to implement<br>Usually hard to maintain per-partition message ordering<br>The longer processing chain makes it hard to guarantee correct offset-commit semantics |
Below we implement both methods in turn. Note that the code shows only the bare essentials and glosses over many practical programming details, such as error handling.
Method 1
The ConsumerRunnable class
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerRunnable implements Runnable {

    // each thread maintains its own private KafkaConsumer instance
    private final KafkaConsumer<String, String> consumer;

    public ConsumerRunnable(String brokerList, String groupId, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");        // this example uses automatic offset commits
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));       // this example relies on automatic partition assignment
    }

    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(200);   // this example uses a 200 ms poll timeout
            for (ConsumerRecord<String, String> record : records) {
                // message-processing logic goes here; this example just prints the message
                System.out.println(Thread.currentThread().getName() + " consumed " + record.partition() +
                        "th message with offset: " + record.offset());
            }
        }
    }
}
```

The ConsumerGroup class
```java
package com.my.kafka.test;

import java.util.ArrayList;
import java.util.List;

public class ConsumerGroup {

    private List<ConsumerRunnable> consumers;

    public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList) {
        consumers = new ArrayList<>(consumerNum);
        for (int i = 0; i < consumerNum; ++i) {
            ConsumerRunnable consumerThread = new ConsumerRunnable(brokerList, groupId, topic);
            consumers.add(consumerThread);
        }
    }

    public void execute() {
        for (ConsumerRunnable task : consumers) {
            new Thread(task).start();
        }
    }
}
```

The ConsumerMain class
```java
public class ConsumerMain {

    public static void main(String[] args) {
        String brokerList = "localhost:9092";
        String groupId = "testGroup1";
        String topic = "test-topic";
        int consumerNum = 3;

        ConsumerGroup consumerGroup = new ConsumerGroup(consumerNum, groupId, topic, brokerList);
        consumerGroup.execute();
    }
}
```
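The runnable above loops forever, so its threads die only when the process does. A common refinement, sketched below as a variant of ConsumerRunnable (the shutdown method name is my own, not part of the original example), is to use KafkaConsumer.wakeup(), the one consumer method documented as safe to call from another thread: it makes a blocked poll() throw WakeupException so the loop can exit and close the consumer cleanly.

```java
// Sketch only: a shutdown-aware variant of ConsumerRunnable's run() method.
// Requires: import org.apache.kafka.common.errors.WakeupException;
@Override
public void run() {
    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(200);
            for (ConsumerRecord<String, String> record : records) {
                // message-processing logic goes here
            }
        }
    } catch (WakeupException e) {
        // thrown by poll() after shutdown() is called; fall through to close
    } finally {
        consumer.close();   // release TCP connections and leave the group cleanly
    }
}

// Safe to call from any other thread.
public void shutdown() {
    consumer.wakeup();
}
```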
Method 2
The Worker class
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class Worker implements Runnable {

    private ConsumerRecord<String, String> consumerRecord;

    public Worker(ConsumerRecord<String, String> record) {
        this.consumerRecord = record;
    }

    @Override
    public void run() {
        // message-processing logic goes here; this example just prints the message
        System.out.println(Thread.currentThread().getName() + " consumed " + consumerRecord.partition()
                + "th message with offset: " + consumerRecord.offset());
    }
}
```

The ConsumerHandler class
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ConsumerHandler {

    // this example uses a single consumer to feed the back-end queue; you could of course use
    // multiple instances, as in method 1, each feeding the queue according to some rule
    private final KafkaConsumer<String, String> consumer;
    private ExecutorService executors;

    public ConsumerHandler(String brokerList, String groupId, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
    }

    public void execute(int workerNum) {
        executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(200);
            for (final ConsumerRecord<String, String> record : records) {
                executors.submit(new Worker(record));
            }
        }
    }

    public void shutdown() {
        if (consumer != null) {
            consumer.close();
        }
        if (executors != null) {
            executors.shutdown();
            try {
                if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
                    System.out.println("Timeout... ignored for this example");
                }
            } catch (InterruptedException ignored) {
                System.out.println("Shutdown interrupted by another thread, ignored for this example.");
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

The Main class
```java
public class Main {

    public static void main(String[] args) {
        String brokerList = "localhost:9092,localhost:9093,localhost:9094";
        String groupId = "group2";
        String topic = "test-topic";
        int workerNum = 5;

        ConsumerHandler consumers = new ConsumerHandler(brokerList, groupId, topic);
        consumers.execute(workerNum);
        try {
            Thread.sleep(1000000);
        } catch (InterruptedException ignored) {}
        consumers.shutdown();
    }
}
```
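Before summing up, here is a minimal sketch of the variant mentioned near the beginning: one consumer dispatching records into per-worker queues. The QueueDispatcher class and its routing rule are my own illustration, not part of the original example; routing by partition number is one simple rule that restores per-partition ordering, which submitting records to a shared pool (as above) gives up.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueDispatcher {

    private final List<BlockingQueue<ConsumerRecord<String, String>>> queues;

    public QueueDispatcher(int workerNum) {
        queues = new ArrayList<>(workerNum);
        for (int i = 0; i < workerNum; i++) {
            queues.add(new ArrayBlockingQueue<ConsumerRecord<String, String>>(1000));
            final BlockingQueue<ConsumerRecord<String, String>> q = queues.get(i);
            // each worker drains only its own queue
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        while (true) {
                            ConsumerRecord<String, String> record = q.take();
                            // message-processing logic goes here
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }).start();
        }
    }

    // route by partition so one worker sees a given partition's records in order
    public void dispatch(ConsumerRecords<String, String> records) throws InterruptedException {
        for (ConsumerRecord<String, String> record : records) {
            queues.get(record.partition() % queues.size()).put(record);
        }
    }
}
```

You would call dispatch(records) from the consumer's poll loop in place of executors.submit(...).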
To sum up, each of the two methods, or models, has its own strengths and weaknesses; pick the one that matches your actual workload. Personally I favor the second method and the idea behind it: do not put heavy processing logic inside the consumer code. Many of the problems Kafka consumer users run into, such as rebalance timeouts, coordinator re-elections, and lost heartbeats, stem from doing exactly that.
The second implementation above uses automatic commits, which avoids the hassle of committing offsets across multiple threads. Many people have asked what a manual-commit version looks like. Because KafkaConsumer is not thread-safe, we cannot simply call consumer.commitSync from multiple threads to commit offsets. The rest of this article gives a concrete example that simulates multi-threaded consumption with manual offset commits.
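To see why, consider the naive approach: have each worker commit on the shared consumer once it finishes processing. KafkaConsumer guards against exactly this; it tracks which thread is operating on it, and concurrent access from another thread fails with ConcurrentModificationException ("KafkaConsumer is not safe for multi-threaded access"). A sketch of the anti-pattern, where `consumer` is the shared instance and `handle` is a hypothetical processing method:

```java
// Anti-pattern fragment: do NOT do this.
for (final ConsumerRecord<String, String> record : records) {
    executors.submit(new Runnable() {
        @Override
        public void run() {
            handle(record);          // processing finishes on the worker thread...
            consumer.commitSync();   // ...but committing here typically throws
                                     // ConcurrentModificationException
        }
    });
}
```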
This example consists of three classes:
- ConsumerThreadHandler: the management class for the multi-threaded consumer; it creates the thread pool and assigns work to each thread, and consumer offsets are also committed here
- ConsumerWorker: essentially a Runnable that runs the actual consuming logic and reports offset information back to ConsumerThreadHandler
- Main: the test driver class
Test code
The ConsumerWorker class
```java
package huxi.test.consumer.multithreaded;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.List;
import java.util.Map;

public class ConsumerWorker<K, V> implements Runnable {

    private final ConsumerRecords<K, V> records;
    private final Map<TopicPartition, OffsetAndMetadata> offsets;

    public ConsumerWorker(ConsumerRecords<K, V> records, Map<TopicPartition, OffsetAndMetadata> offsets) {
        this.records = records;
        this.offsets = offsets;
    }

    @Override
    public void run() {
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<K, V>> partitionRecords = records.records(partition);
            for (ConsumerRecord<K, V> record : partitionRecords) {
                // message-processing logic goes here; this example just prints the message
                System.out.println(String.format("topic=%s, partition=%d, offset=%d",
                        record.topic(), record.partition(), record.offset()));
            }

            // report the offset to commit for this partition (last processed offset + 1)
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            synchronized (offsets) {
                if (!offsets.containsKey(partition)) {
                    offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
                } else {
                    long curr = offsets.get(partition).offset();
                    if (curr <= lastOffset + 1) {
                        offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
                    }
                }
            }
        }
    }
}
```
The ConsumerThreadHandler class
```java
package huxi.test.consumer.multithreaded;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ConsumerThreadHandler<K, V> {

    private final KafkaConsumer<K, V> consumer;
    private ExecutorService executors;
    private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

    public ConsumerThreadHandler(String brokerList, String groupId, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                consumer.commitSync(offsets);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                offsets.clear();
            }
        });
    }

    /**
     * Main consumption loop.
     * @param threadNumber  number of threads in the pool
     */
    public void consume(int threadNumber) {
        executors = new ThreadPoolExecutor(
                threadNumber,
                threadNumber,
                0L,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1000),
                new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            while (true) {
                ConsumerRecords<K, V> records = consumer.poll(1000L);
                if (!records.isEmpty()) {
                    executors.submit(new ConsumerWorker<>(records, offsets));
                }
                commitOffsets();
            }
        } catch (WakeupException e) {
            // swallow this exception
        } finally {
            commitOffsets();
            consumer.close();
        }
    }

    private void commitOffsets() {
        // keep the synchronized block that locks offsets as short as possible
        Map<TopicPartition, OffsetAndMetadata> unmodifiedMap;
        synchronized (offsets) {
            if (offsets.isEmpty()) {
                return;
            }
            unmodifiedMap = Collections.unmodifiableMap(new HashMap<>(offsets));
            offsets.clear();
        }
        consumer.commitSync(unmodifiedMap);
    }

    public void close() {
        consumer.wakeup();
        executors.shutdown();
    }
}
```
The Main class
```java
package huxi.test.consumer.multithreaded;

public class Main {

    public static void main(String[] args) {
        String brokerList = "localhost:9092";
        String topic = "test-topic";
        String groupID = "test-group";
        final ConsumerThreadHandler<byte[], byte[]> handler = new ConsumerThreadHandler<>(brokerList, groupID, topic);
        final int cpuCount = Runtime.getRuntime().availableProcessors();

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                handler.consume(cpuCount);
            }
        };
        new Thread(runnable).start();

        try {
            // stop the test program automatically after 20 seconds
            Thread.sleep(20000L);
        } catch (InterruptedException e) {
            // swallow this exception
        }
        System.out.println("Starting to close the consumer...");
        handler.close();
    }
}
```
Test procedure
1. Create a test topic named test-topic with 10 partitions, and produce 500,000 messages into it with the kafka-producer-perf-test.sh script.
2. Run Main, with group.id set to test-group.
3. Open a new terminal and repeatedly run the following script to monitor the consumer group's consumption progress:

```
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group
```
Test results
A LAG column of 0 for every partition means the consumer group's offset commits are working correctly. It is worth noting that you can control how frequently ConsumerThreadHandler commits offsets by tuning the consumer.poll timeout.
Thanks to a QQ-group member for pointing out that this approach leaves a window in which data can be lost. Suppose thread T1 consumes message M1 (partition 0, offset 100) at time t0, while thread T2 consumes message M2 (partition 0, offset 101) at time t1. Now suppose that at t3 T2 finishes first and reports offset 101 to the handler while T1 is still processing. At t4 the handler commits offset 101; if T1 then hits an error and throws, the message at offset 100 was never successfully consumed, but since the commit has already advanced to 101, that message is lost.
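One way to close this window — my own sketch, not part of the original example — is to commit only offsets whose every predecessor has finished processing. Workers report each completed offset, and the polling thread advances the commit position past the longest contiguous run of completed offsets. The PartitionProgress class below is hypothetical; you would keep one instance per assigned partition and feed safeCommitPosition() into commitSync:

```java
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical helper, one instance per partition: ensures the committed
// offset never runs ahead of an unfinished record.
public class PartitionProgress {

    private final SortedSet<Long> completed = new TreeSet<>();
    private long nextToCommit;   // smallest offset not yet known to be processed

    public PartitionProgress(long firstOffset) {
        this.nextToCommit = firstOffset;
    }

    // worker threads call this after fully processing one record
    public synchronized void markDone(long offset) {
        completed.add(offset);
    }

    // the polling thread calls this before commitSync(): it advances past the
    // longest contiguous run of completed offsets and returns the position to
    // commit (i.e. the offset of the next record to consume)
    public synchronized long safeCommitPosition() {
        while (completed.remove(nextToCommit)) {
            nextToCommit++;
        }
        return nextToCommit;
    }
}
```

In the scenario above, T2's markDone(101) alone would not advance the position past 100, so the handler would keep committing 100 until T1 also finishes.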