使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)--转载...
原文:http://blog.csdn.net/changong28/article/details/39325079
使用Kafka的同學都知道,我們每次創建Kafka主題(Topic)的時候可以指定分區數和副本數等信息,如果將這些屬性配置到server.properties文件中,以后調用Java API生成的主題將使用默認值,先改變需要使用命令bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config max.message.bytes=128000顯示的修改,我們也希望將此過程在Producer調用之前通過API的方式進行設定,無需在之前或之后使用腳本進行操作,所以才了這篇文章。查看源碼發現,其實內部所有的實現都是通過TopicCommand的main方法,在此記錄兩種方式:
1、創建主題(Topic)
【命令方式】:bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y
【JAVA API方式】:
2、查看所有主題
?
【命令方式】:bin/kafka-topics.sh --list --zookeeper localhost:2181
【JAVA API方式】:
3、查看指定主題:
?
【命令方式】:bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
【JAVA API方式】:?
4、修改主題:
?
【命令方式】:bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --deleteConfig x
【JAVA API方式】:
5、刪除出題:
? ?【命令方式】:無
? ?【JAVA API方式】:
另:下文kafka刪除topic的方法(出自 “菜光光的博客” 博客,出處http://caiguangguang.blog.51cto.com/1652935/1548069)
0.8的官方文檔提供了一個刪除topic的命令:
kafka-topics.sh --delete 但是在運行時會報錯找不到這個方法。
kafka-topics.sh最終是運行了kafka.admin.TopicCommand這個類,在0.8的源碼中這個類中沒有找到有delete topic相關的代碼。
在kafka的admin包下,提供了一個DeleteTopicCommand的類,可以實現刪除topic的功能。?
kafka.admin.DeleteTopicCommand?
其中刪除topic的具體實現代碼如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | import?org.I0Itec.zkclient.ZkClient import?kafka.utils.{Utils,?ZKStringSerializer,?ZkUtils} ....... ????val?topic?=?options.valueOf(topicOpt) ????val?zkConnect?=?options.valueOf(zkConnectOpt) ????var?zkClient:?ZkClient?=?null ????try?{ ??????zkClient?=?new?ZkClient(zkConnect,?30000,?30000,?ZKStringSerializer) ??????zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))??//其實最終還是通過刪除zk里面對應的路徑來實現刪除topic的功能 ??????println("deletion?succeeded!") ????} ????catch?{ ??????case?e:?Throwable?=> ????????println("delection?failed?because?of?"?+?e.getMessage) ????????println(Utils.stackTrace(e)) ????} ????finally?{ ??????if?(zkClient?!=?null) ????????zkClient.close() ????} |
因為這個命令只會刪除zk里面的信息,真實的數據還是沒有刪除,所以需要登錄各個broker,把對應的topic的分區數據目錄刪除,也可能正因為這一點,delete命令才沒有集成到kafka.admin.TopicCommand這個類。
?
轉載于:https://www.cnblogs.com/davidwang456/p/4313784.html
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)--转载...的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: How to Analyze Java
- 下一篇: springMVC项目在jboss7中配