Multi-Threaded Consumption with the Kafka Consumer
- Overview
- The OrdinaryConsumer class
- ConsumerWorker.java
- MultiThreadedConsumer.java
- MultiThreadedRebalanceListener.java
- Test.java
The previous post, 《Kafka Consumer多線程實例續(xù)篇》 (a sequel on multi-threaded consumer examples), fixed how offsets are committed from multiple threads, but data loss was still possible: several threads could receive records from the same partition, and the order in which they finished processing could diverge from the records' order within the partition, corrupting the committed offsets. This time I use the KafkaConsumer pause and resume methods to prevent that situation. I also write a test class to verify that, for the same number of messages, a single-threaded consumer is far slower than a multi-threaded one.
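Before diving in, here is a minimal sketch of the mechanism, assuming an already-subscribed consumer (the helper class is hypothetical, not one of the five files below). While a partition is paused, poll() keeps the group membership alive but returns no records from it, so no other worker thread can receive messages from a partition whose previous batch is still being processed:

```java
package huxihx.mtc;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Set;

/** Hypothetical sketch of the pause/resume pattern; not part of the five files below. */
public class PauseResumeSketch {
    public static void pollOnce(Consumer<String, String> consumer) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        // Pause exactly the partitions whose records are now in flight.
        Set<TopicPartition> inFlight = records.partitions();
        consumer.pause(inFlight);
        // ... hand records to worker threads; after they finish and the
        // offsets are committed, make the partitions fetchable again:
        consumer.resume(inFlight);
    }
}
```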
Overview
This time I wrote five Java files:
- OrdinaryConsumer.java: an ordinary single-threaded consumer, used as the baseline for the performance comparison.
- ConsumerWorker.java: the multi-threaded message-processing class, essentially a runnable task submitted to a thread pool to do the actual message processing.
- MultiThreadedConsumer.java: the main controller of the multi-threaded consumer; it dispatches messages to different ConsumerWorker instances and manages offset commits.
- MultiThreadedRebalanceListener.java: the rebalance listener serving the multi-threaded consumer.
- Test.java: measures single-threaded versus multi-threaded performance.
The OrdinaryConsumer class
The single-threaded consumer is the simplest, so I'll start with its code:
```java
package huxihx.mtc;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Single-threaded consumer.
 */
public class OrdinaryConsumer {

    private final Consumer<String, String> consumer;
    private final int expectedCount; // number of messages to consume in the test

    public OrdinaryConsumer(String brokerId, String topic, String groupID, int expectedCount) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerId);
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupID);
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topic));
        this.expectedCount = expectedCount;
    }

    public void run() {
        try {
            int alreadyConsumed = 0;
            while (alreadyConsumed < expectedCount) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                alreadyConsumed += records.count();
                records.forEach(this::handleRecord);
            }
        } finally {
            consumer.close();
        }
    }

    private void handleRecord(ConsumerRecord<String, String> record) {
        try {
            // Simulate up to 10 ms of processing time per record
            Thread.sleep(ThreadLocalRandom.current().nextInt(10));
        } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
        }
        System.out.println(Thread.currentThread().getName() + " finished message processing. Record offset = " + record.offset());
    }
}
```
The code is straightforward. The only thing worth noting is that the consumer simulates up to 10 ms of processing per message; the multi-threaded consumer below uses the same standard.
ConsumerWorker.java
Next is the runnable class that processes messages: ConsumerWorker. The biggest difference from the previous post is that each worker now handles records from a single partition only, rather than records mixed from several partitions. The benefit is that once a partition's records are handed to a worker, I can pause consumption of that partition until the worker has processed them all; with records from multiple partitions mixed together, this would be hard to implement. The ConsumerWorker code follows:
```java
package huxihx.mtc;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

public class ConsumerWorker<K, V> {

    private final List<ConsumerRecord<K, V>> recordsOfSamePartition;
    private volatile boolean started = false;
    private volatile boolean stopped = false;
    private final ReentrantLock lock = new ReentrantLock();

    private final long INVALID_COMMITTED_OFFSET = -1L;
    private final AtomicLong latestProcessedOffset = new AtomicLong(INVALID_COMMITTED_OFFSET);
    private final CompletableFuture<Long> future = new CompletableFuture<>();

    public ConsumerWorker(List<ConsumerRecord<K, V>> recordsOfSamePartition) {
        this.recordsOfSamePartition = recordsOfSamePartition;
    }

    public boolean run() {
        lock.lock();
        try {
            if (stopped)
                return false;
            started = true;
        } finally {
            lock.unlock(); // always release the lock, even on the early return
        }
        for (ConsumerRecord<K, V> record : recordsOfSamePartition) {
            if (stopped)
                break;
            handleRecord(record);
            // Track offset + 1: the next position to commit
            if (latestProcessedOffset.get() < record.offset() + 1)
                latestProcessedOffset.set(record.offset() + 1);
        }
        return future.complete(latestProcessedOffset.get());
    }

    public long getLatestProcessedOffset() {
        return latestProcessedOffset.get();
    }

    private void handleRecord(ConsumerRecord<K, V> record) {
        try {
            // Simulate up to 10 ms of processing time per record
            Thread.sleep(ThreadLocalRandom.current().nextInt(10));
        } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
        }
        System.out.println(Thread.currentThread().getName() + " finished message processing. Record offset = " + record.offset());
    }

    public void close() {
        lock.lock();
        try {
            this.stopped = true;
            if (!started) {
                future.complete(latestProcessedOffset.get());
            }
        } finally {
            lock.unlock();
        }
    }

    public boolean isFinished() {
        return future.isDone();
    }

    public long waitForCompletion(long timeout, TimeUnit timeUnit) {
        try {
            return future.get(timeout, timeUnit);
        } catch (Exception e) {
            if (e instanceof InterruptedException)
                Thread.currentThread().interrupt();
            return INVALID_COMMITTED_OFFSET;
        }
    }
}
```
A few points worth noting:
- latestProcessedOffset: holds the latest offset this worker has processed, stored as record offset + 1, i.e., the next position to commit.
- future: a CompletableFuture that carries the offset the worker hands back for committing.
- The worker signals success or failure by whether this future is completed with the latestProcessedOffset value; a usage sketch follows this list.
- handleRecord matches the single-threaded consumer, simulating up to 10 ms of processing per message.
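To make the hand-off concrete, here is a hypothetical usage sketch (not one of the five files; the topic name and record are made up). The caller submits run() to an executor and then blocks on the future via waitForCompletion, which returns the next offset to commit:

```java
package huxihx.mtc;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical usage sketch for ConsumerWorker. */
public class ConsumerWorkerDemo {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // Fake a one-record batch from partition 0 at offset 0.
        List<ConsumerRecord<String, String>> batch =
                Collections.singletonList(new ConsumerRecord<>("test", 0, 0L, "k", "v"));
        ConsumerWorker<String, String> worker = new ConsumerWorker<>(batch);
        CompletableFuture.supplyAsync(worker::run, pool);
        // Blocks for up to 5 seconds; returns offset + 1 on success, -1 on timeout.
        System.out.println("Offset to commit: " + worker.waitForCompletion(5, TimeUnit.SECONDS));
        pool.shutdown();
    }
}
```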
MultiThreadedConsumer.java
With ConsumerWorker in place, the next step is the main controller class of the multi-threaded consumer. It creates the consumer, then loops through: 1) fetching records from the subscribed partitions; 2) grouping the records by partition and dispatching each group to a separate thread; 3) pausing those partitions while waiting for the workers to finish; 4) committing those partitions' offsets; 5) resuming consumption of those partitions.
The complete MultiThreadedConsumer code follows:
```java
package huxihx.mtc;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class MultiThreadedConsumer {

    private final Map<TopicPartition, ConsumerWorker<String, String>> outstandingWorkers = new HashMap<>();
    private final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
    private long lastCommitTime = System.currentTimeMillis();
    private final Consumer<String, String> consumer;
    private static final int DEFAULT_COMMIT_INTERVAL = 3000;
    private final Map<TopicPartition, Long> currentConsumedOffsets = new HashMap<>();
    private final long expectedCount;

    private final static Executor executor = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors() * 10, r -> {
                Thread t = new Thread(r);
                t.setDaemon(true);
                return t;
            });

    public MultiThreadedConsumer(String brokerId, String topic, String groupID, long expectedCount) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerId);
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupID);
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topic), new MultiThreadedRebalanceListener(consumer, outstandingWorkers, offsetsToCommit));
        this.expectedCount = expectedCount;
    }

    public void run() {
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                distributeRecords(records);
                checkOutstandingWorkers();
                commitOffsets();
                if (currentConsumedOffsets.values().stream().mapToLong(Long::longValue).sum() >= expectedCount) {
                    break;
                }
            }
        } finally {
            consumer.close();
        }
    }

    /**
     * Resume partitions whose records have been fully processed and whose offsets are staged for commit.
     */
    private void checkOutstandingWorkers() {
        Set<TopicPartition> completedPartitions = new HashSet<>();
        outstandingWorkers.forEach((tp, worker) -> {
            if (worker.isFinished()) {
                completedPartitions.add(tp);
            }
            long offset = worker.getLatestProcessedOffset();
            currentConsumedOffsets.put(tp, offset);
            if (offset > 0L) {
                offsetsToCommit.put(tp, new OffsetAndMetadata(offset));
            }
        });
        completedPartitions.forEach(outstandingWorkers::remove);
        consumer.resume(completedPartitions);
    }

    /**
     * Commit staged offsets at most once per commit interval.
     */
    private void commitOffsets() {
        try {
            long currentTime = System.currentTimeMillis();
            if (currentTime - lastCommitTime > DEFAULT_COMMIT_INTERVAL && !offsetsToCommit.isEmpty()) {
                consumer.commitSync(offsetsToCommit);
                offsetsToCommit.clear();
                lastCommitTime = currentTime; // reset the clock only after an actual commit,
                                              // otherwise the interval never elapses
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Dispatch each partition's records to its own worker thread, then pause those partitions.
     */
    private void distributeRecords(ConsumerRecords<String, String> records) {
        if (records.isEmpty())
            return;
        Set<TopicPartition> pausedPartitions = new HashSet<>();
        records.partitions().forEach(tp -> {
            List<ConsumerRecord<String, String>> partitionedRecords = records.records(tp);
            pausedPartitions.add(tp);
            final ConsumerWorker<String, String> worker = new ConsumerWorker<>(partitionedRecords);
            CompletableFuture.supplyAsync(worker::run, executor);
            outstandingWorkers.put(tp, worker);
        });
        consumer.pause(pausedPartitions);
    }
}
```
Points worth noting in this class:
- executor: I create a thread pool with ten times as many threads as CPU cores. The right number depends on your workload; if the handler logic is I/O bound (say, writing to an external system), a larger pool usually pays off, though I'd personally cap it at the total number of partitions assigned to the consumer (see the sizing sketch after this list).
- Automatic offset commits must be disabled (enable.auto.commit=false); committing offsets manually is a key part of the multi-threaded consumer's design.
- The rebalance listener is set to MultiThreadedRebalanceListener; how it responds to partition revocation and assignment is discussed shortly.
- The run method follows the flow outlined above: fetch, dispatch, check progress, commit offsets.
- expectedCount: the total number of messages to consume, used later for the performance comparison.
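For the first bullet, a minimal sizing sketch; the 10x multiplier and the partition-count cap are rules of thumb from this article, not Kafka requirements:

```java
package huxihx.mtc;

/** Hypothetical sizing helper reflecting the rule of thumb above. */
public class WorkerPoolSizing {
    // 10 threads per core suits I/O-bound handlers; more threads than
    // assigned partitions would sit idle, since each worker owns one partition.
    public static int workerPoolSize(int assignedPartitionCount) {
        int ioBoundSize = Runtime.getRuntime().availableProcessors() * 10;
        return Math.min(ioBoundSize, assignedPartitionCount);
    }
}
```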
MultiThreadedRebalanceListener.java
A multi-threaded consumer needs careful handling once a rebalance begins. The main thread's poll call runs in parallel with the worker threads processing messages. If a rebalance happens at that moment, some partitions get assigned to other consumers while workers here may still be processing them. That opens the door to two consumers processing the same partitions' messages at once, which breaks the consumer-group design. To prevent it, we must ensure that processing of the partitions about to be revoked completes before they are reassigned.
In short, before its partitions are revoked, the multi-threaded consumer must: stop the workers handling those partitions, wait (with a timeout) for them to finish, and commit the latest processed offsets of those partitions.
Once partitions are assigned, things are simpler: we call resume to make them consumable again. If a partition was already consumable, resume has no effect, so it's a harmless operation either way. The complete MultiThreadedRebalanceListener code follows:
```java
package huxihx.mtc;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class MultiThreadedRebalanceListener implements ConsumerRebalanceListener {

    private final Consumer<String, String> consumer;
    private final Map<TopicPartition, ConsumerWorker<String, String>> outstandingWorkers;
    private final Map<TopicPartition, OffsetAndMetadata> offsets;

    public MultiThreadedRebalanceListener(Consumer<String, String> consumer,
                                          Map<TopicPartition, ConsumerWorker<String, String>> outstandingWorkers,
                                          Map<TopicPartition, OffsetAndMetadata> offsets) {
        this.consumer = consumer;
        this.outstandingWorkers = outstandingWorkers;
        this.offsets = offsets;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Step 1: stop the workers that own the revoked partitions
        Map<TopicPartition, ConsumerWorker<String, String>> stoppedWorkers = new HashMap<>();
        for (TopicPartition tp : partitions) {
            ConsumerWorker<String, String> worker = outstandingWorkers.remove(tp);
            if (worker != null) {
                worker.close();
                stoppedWorkers.put(tp, worker);
            }
        }

        // Step 2: wait briefly for each stopped worker and stage its latest offset
        stoppedWorkers.forEach((tp, worker) -> {
            long offset = worker.waitForCompletion(1, TimeUnit.SECONDS);
            if (offset > 0L) {
                offsets.put(tp, new OffsetAndMetadata(offset));
            }
        });

        // Step 3: commit the staged offsets of the revoked partitions
        Map<TopicPartition, OffsetAndMetadata> revokedOffsets = new HashMap<>();
        partitions.forEach(tp -> {
            OffsetAndMetadata offset = offsets.remove(tp);
            if (offset != null) {
                revokedOffsets.put(tp, offset);
            }
        });

        try {
            consumer.commitSync(revokedOffsets);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        consumer.resume(partitions);
    }
}
```
Points worth noting:
- Any rebalance listener must implement the ConsumerRebalanceListener interface.
- The class holds three fields: the consumer instance, the worker threads to stop, and the offsets to commit.
- The main logic lives in onPartitionsRevoked: the first step stops the worker threads; the second commits their offsets manually.
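One detail worth spelling out: Kafka expects a committed offset to be the position of the next record to read, which is why ConsumerWorker tracks record offset + 1 and both committers pass that value to OffsetAndMetadata unchanged. A minimal illustration (the partition and offset values are made up):

```java
package huxihx.mtc;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;

/** Hypothetical illustration of the "commit the next offset" convention. */
public class CommitConvention {
    public static Map<TopicPartition, OffsetAndMetadata> toCommit(long lastProcessedOffset) {
        // If the last record processed had offset 41, commit 42.
        return Collections.singletonMap(
                new TopicPartition("test", 0),
                new OffsetAndMetadata(lastProcessedOffset + 1));
    }
}
```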
Test.java
With the four classes above covered, let's write a test class to compare single-threaded and multi-threaded consumer performance. First create a topic with 50 partitions and a single replica, then use the kafka-producer-perf-test tool to produce 50,000 messages, 1,000 per partition. (A hypothetical Java alternative for generating the data follows.)
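If you'd rather not use the perf-test tool, a hypothetical producer that targets each partition explicitly guarantees the even 1,000-per-partition spread (the topic name and message payload are this test's assumptions):

```java
package huxihx.mtc;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/** Hypothetical test-data generator: 1,000 messages into each of the 50 partitions. */
public class TestDataProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int partition = 0; partition < 50; partition++) {
                for (int i = 0; i < 1000; i++) {
                    // Target the partition explicitly to guarantee an even spread.
                    producer.send(new ProducerRecord<>("test", partition, null, "message-" + i));
                }
            }
        }
    }
}
```

With the data in place, the following test class times each consumer: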
```java
package huxihx.mtc;

public class Test {
    public static void main(String[] args) throws InterruptedException {
        int expectedCount = 50 * 900; // 45,000 messages in total (900 per partition)
        String brokerId = "localhost:9092";
        String groupId = "test-group";
        String topic = "test";

        OrdinaryConsumer consumer = new OrdinaryConsumer(brokerId, topic, groupId + "-single", expectedCount);
        long start = System.currentTimeMillis();
        consumer.run();
        System.out.println("Single-threaded consumer costs " + (System.currentTimeMillis() - start));

        Thread.sleep(1L);

        MultiThreadedConsumer multiThreadedConsumer =
                new MultiThreadedConsumer(brokerId, topic, groupId + "-multi", expectedCount);
        start = System.currentTimeMillis();
        multiThreadedConsumer.run();
        System.out.println("Multi-threaded consumer costs " + (System.currentTimeMillis() - start));
    }
}
```
The final numbers: the single-threaded consumer took about 232 seconds to consume 45,000 messages, while the multi-threaded consumer took about 6.2 seconds:
Single-threaded consumer costs 232336
Multi-threaded consumer costs 6246
The multi-threaded consumer is therefore roughly 37 times faster here. That lines up with a rough estimate: 45,000 messages at an average of about 5 ms of simulated processing each is around 225 seconds of serial work, and processing 50 partitions in parallel divides that by up to 50. The actual gain depends on your environment, but the conclusion holds: with plenty of CPU cores and I/O-bound message handling, the multi-threaded consumer clearly outperforms the single-threaded one.
Summary
Pausing partitions while their records are in flight and resuming them only after the offsets are committed keeps per-partition ordering intact, handles rebalances safely, and, as the test shows, delivers a large throughput win over single-threaded consumption.