Kafka Java consumer动态修改topic订阅
前段時間在Kafka QQ群中有人問及此事——關于Java consumer如何動態修改topic訂閱的問題。仔細一想才發現這的確是個好問題,因為如果簡單地在另一個線程中直接持有consumer實例然后調用subscribe進行修改,consumer端必然會拋出異常ConcurrentModificationException:KafkaConsumer is not safe for multi-threaded access
和KafkaProducer不同的是,KafkaConsumer不是線程安全的,所以我們不能直接在沒有同步保護的機制下直接啟用另一個線程調用consumer的任何方法(除了wakeup)。因此,實現這個需求有兩種途徑:
- 使用重量級的synchorinzed機制來實現線程安全
- 借助Java類庫已有的線程安全數據結構來實現
如果是第一種方式,則無論哪個線程訪問consumer都必須要配備必要的同步保護機制,代價相當大且極易出錯。本文選取第二種方式,我們可以借助Java提供的ConcurrentLinkedQueue來幫助我們實現。具體的步驟為:
完整樣例代碼如下:
public class ConsumerTest {public static void main(String[] args) {final ConcurrentLinkedQueue<String> subscribedTopics = new ConcurrentLinkedQueue<>();// 創建另一個測試線程,啟動后首先暫停10秒然后變更topic訂閱Runnable runnable = new Runnable() {@Overridepublic void run() {try {Thread.sleep(10000);} catch (InterruptedException e) {// swallow it.}// 變更為訂閱topic: btopic, ctopicsubscribedTopics.addAll(Arrays.asList("btopic", "ctopic"));}};new Thread(runnable).start();Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "my-group1");props.put("auto.offset.reset", "earliest");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 最開始的訂閱列表:atopic、btopicconsumer.subscribe(Arrays.asList("atopic", "btopic"));while (true) {consumer.poll(2000); //表示每2秒consumer就有機會去輪詢一下訂閱狀態是否需要變更// 本例不關注消息消費,因此每次只是打印訂閱結果!System.out.println(consumer.subscription());if (!subscribedTopics.isEmpty()) {Iterator<String> iter = subscribedTopics.iterator();List<String> topics = new ArrayList<>();while (iter.hasNext()) {topics.add(iter.next());}subscribedTopics.clear();consumer.subscribe(topics); // 重新訂閱topic}}// 本例只是測試之用,使用了while(true),所以這里沒有顯式關閉consumer // consumer.close();} }
輸出如下:?
[atopic, btopic]
[atopic, btopic]
[atopic, btopic]
[ctopic, btopic]
[ctopic, btopic]
由此可見,本consumer在沒有關閉的情況下動態進行了topic的訂閱變更。另外需要說一下,動態變更時最好不要直接調用subscribe(topics),而是要顯式地定義ConsumerRebalanceListener以避免位移提交的混亂。
轉載于:https://www.cnblogs.com/huxi2b/p/7040617.html
總結
以上是生活随笔為你收集整理的Kafka Java consumer动态修改topic订阅的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 04_数据库升级onUpgradeond
- 下一篇: 简陋版:基于python的自动化测试框架