kafka动态配置topic
?之前使用@org.springframework.kafka.annotation.KafkaListener這個注解的時候,是在yml文件中配置,然后使用@KafkaListener(topics = {"${kafka.topic.a2b.name}"}),這樣去單獨監聽某一個topic,生產者也固定在代碼里定義變量讀取配置文件。昨天改了個需求,希望以后通過配置文件去動態配置生產者和消費者的topic(不知道個數和topic名字),而不需要改代碼。
一、踩坑
?剛開始的時候,由于考慮不充分(沒有考慮到topic個數未知),想到@KafkaListener注解中的topics本身就是個字符串數組,于是想通過傳入變量的形式。產生了以下兩種方法:
1.傳入變量方法一
? 使用@Value注解提取配置文件中相關配置,@KafkaListener中傳入變量
public static String[] topicArr;@Value("${kafka.bootstrap.servers}")public void setTopicArr(String[] value){String topicArr = value;}@KafkaListener(topics= topicArr)emmmm。。。結果可想而知,不行。
2.傳入變量方法二
?還是傳入變量,不過這次寫了個動態配置的代碼
注解里這么寫@KafkaListener(topics = "${topicName1}","${topicName2}","${topicName3}")提前將yml文件里添加topics: topicName1,topicName2,topicName3然后加載進來@Value("${kafka.topics}")public void setTopics(String value){topics = value;}動態配置代碼:@Configurationpublic class KafkaTopicConfiguration implements InitializingBean {@Autowiredprivate KafkaConfig kafkaconfig;@Overridepublic void afterPropertiesSet() throws Exception {String[] topicArr = kafkaconfig.split(",");int i = 1;for(String topic : topicArr){String topicName = "topicName"+i;System.setProperty(topicName, topic);}}}相比方法一,可行。但是未知topic數量呢。GG。
3.不用注解
?百度找到幾個老哥的動態獲取并創建topic的方法
https://www.cnblogs.com/gaoyawei/p/7723974.html https://www.cnblogs.com/huxi2b/p/7040617.html https://blog.csdn.net/qq_27232757/article/details/78970830寫了幾版,各種各樣的問題,還是我太菜。就想再看看有沒有別的簡單點的解決辦法,沒有了再回來搞這個。
4.正則匹配topic
?這期間又找到一個使用正則匹配topic的。直接貼鏈接。
@KafkaListener(topicPattern = "showcase.*") 這里使用正則匹配topic,其中【*】之前得加上【.】才能匹配到。中間模仿寫了一版使用正則匹配的,其實也可以糊弄實現需求,除了topic取名的時候一定得規范以外,還得考慮到如果不想用某個topic了又得想怎么去避免他。
 這種方法不太嚴謹,繼續踩坑吧。
二、問題解決
?用蹩腳的英語google了一下,嗯?好多老哥們也是用的以上差不多的方法。然而最后在某個老哥github的issues中看到了解決辦法。老哥的需求跟我差不多,感謝大佬,貼上最終問題解決方案。
1.kafka消費者監聽配置
還是注解的形式 @KafkaListener(topics = "#{'${kafka.listener_topics}'.split(',')}")讀取yml文件中kafka.listener_topics的參數,然后根據“,”去split,得到一個topics數組。
 這么做就可以根據配置文件動態的去監聽topic。
2.yml配置文件
只列出topic相關部分(mqTypes是我用來判斷使用哪個topic發送的)kafka:listener_topics: kafka-topic-a2b,kafka-topic-c2bconsume:topic:- name: kafka-topic-a2bpartitions: 12replication_factor: 2- name: kafka-topic-c2bpartitions: 12replication_factor: 2product:topic:- name: kafka-topic-b2apartitions: 12replication_factor: 2mqTypes: type1- name: kafka-topic-b2cpartitions: 12replication_factor: 2mqTypes: type13.yml參數解析
這里我將kafka的topic相關加載到bean中處理。
 創建KafkaConsumerBean和KafkaProducerBean分別用來存儲yml中生產者和消費者的topic相關參數
4.創建topic
List<Map<String,String>> producerTopicList = kafkaProducerBean.getTopic();for (Map<String,String> topicProperty : producerTopicList){KafkaClient.createTopic(topicProperty.get("name"),Integer.parseInt(topicProperty.get("partitions")),Integer.parseInt(topicProperty.get("replication_factor")));}List<Map<String,String>> consumerTopicList = kafkaConsumerBean.getTopic();for (Map<String,String> topicProperty : consumerTopicList){KafkaClient.createTopic(topicProperty.get("name"),Integer.parseInt(topicProperty.get("partitions")),Integer.parseInt(topicProperty.get("replication_factor")));}三、總結
?上面解決問題的方法關鍵在于
@KafkaListener(topics = "#{'${kafka.listener_topics}'.split(',')}")@KafkaListener這個注解會去讀取spring的yml配置文件中
kafka:listener_topics: kafka-topic-a2b,kafka-topic-c2b這塊listener_topics配置信息,然后通過’,'分割成topic數組,KafkaListener注解中的 topics 參數,本身就是個數組,如下
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by Fernflower decompiler) //package org.springframework.kafka.annotation;import java.lang.annotation.Documented; import java.lang.annotation.ElementType; import java.lang.annotation.Repeatable; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; import org.springframework.messaging.handler.annotation.MessageMapping;@Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) @MessageMapping @Documented @Repeatable(KafkaListeners.class) public @interface KafkaListener {String id() default "";String containerFactory() default "";String[] topics() default {};String topicPattern() default "";TopicPartition[] topicPartitions() default {};String group() default ""; }?結合我之前的kafka文章,應該是可以拼出一套成型的。
總結
以上是生活随笔為你收集整理的kafka动态配置topic的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: 简单易懂的英特尔E系列超频电压设…
- 下一篇: 微信禁止打卡,裂变营销时代即将终结?
