Apache Kafka API AdminClient Scram账户的创建与删除
前言
由于Apache官方一直沒有提供AdminClient中對賬戶這一塊的操作,因此這部分大多數時候都是用命令行去操作的,但是命令行畢竟不是很方便。為了解決這部分問題,筆者去讀了Kafka Scala的源碼,從中梳理出來這部分內容供給大家參考。重要:如果你的版本升級到2.7.0及其以上,請參考【Apache Kafka API AdminClient Scram賬戶的操作(增刪改查)】。更多內容請點擊【Apache Kafka API AdminClient 目錄】。
Scala版本
為了操作Scala源碼,必須有相應版本的包,怎么看你的Scala版本呢?這個就是在Kafka核心包的<artifactId>鍵值對里面,如下kafka_2.13后面對應的2.13就是Scala的版本,這個2.13版本同樣也是Kafka官方推薦使用的版本,因此我們也就以這個版本為例子去操作賬戶的創建與刪除。要提醒的是如果你使用的版本是Scala 2.12大概率會報錯。
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.13</artifactId> <!--此處對應的就是Scala版本--><version>2.7.0</version> </dependency>獲取JAAS認證文件
在開始之前首先得知道什么是Scram賬戶認證,如果不清楚建議參考【Kafka 如何給集群配置Scram賬戶認證】,這篇帖子里對Scram認證以及配置有很詳細的介紹,這里就不多說廢話了,我們已經有了一個kafka-broker-jaas.conf文件用來登陸Zookeeper。
Client{org.apache.zookeeper.server.auth.DigestLoginModule requiredusername="kafka"password="kafka1234"; };為什么要登陸Zookeeper呢?因為Kafka把認證機制做到了Zookeeper里,而要操作這些需要一個Zookeeper的登陸認證。在這里Zookeeper相當于一個分布式注冊中心,隨者Kafka的不斷升級,Kafka官方也在不斷地減少對Zookeeper的依賴。截止到2.7.0版本,當使用命令行創建賬戶的時候就會收到提示說未來版本可能會下線--zookeeper參數,轉而使用參數--broker.server,但是目前還是兼容的。而且Kafka API的2.7.0版本,似乎也引入了操作Scram的API,但是之前的版本還是需要操作Zookeeper,因此我們還是需要這樣進行賬號的操作。希望2.7.0版本的Scram API會比較好用吧,等筆者弄明白了再分享出來。言歸正傳,我們可以通過下面幾行代碼先行把登陸Zookeeper的認證文件kafka-broker-jaas.conf加載到系統中來。
static {//獲取文件路徑,這里筆者使用的是項目路徑,也可以用絕對路徑,目的是訪問到文件,什么方法自由選擇String path = KafkaCreateUser.class.getClass().getResource("/").getPath();//拿到文件對象File f = new File(path+"kafka-broker-jaas.conf");//存儲到系統參數對象中,以備后續使用System.setProperty("java.security.auth.login.config", f.getAbsolutePath()); }使用Scala方法創建賬戶
獲取了文件對象以后,就可以調用Scala中的方法了,我們主要調用的方法是AdminZkClient.changeConfigs(entityType: String, entityName: String, configs: Properties)方法,這個語法是Scala中的語法,有點類似Java,我們直接用就可以了,Sample如下。
public void createAccount() throws NoSuchAlgorithmException {//獲取ZookeeperClient對象,這里的/kafka是筆者建了一個zk上的目錄,如果直接用192.168.33.101:2181ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");//通過ZookeeperClient對象和JDK自帶的JaasUtils加載Sasl認證規則KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);//獲取Kafka Scala AdminZkClient對象AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);//構造PropertiesProperties properties=new Properties();//構造Scram認證機制ScramMechanism為SCRAM_SHA_512ScramMechanism scramMechanism=ScramMechanism.valueOf("SCRAM_SHA_512");//構造Scram證書credential,這里"password_1234"就是真實的密碼ScramCredential credential = new ScramFormatter(scramMechanism).generateCredential("password_1234", scramMechanism.minIterations());//轉化為認證串String credentialString=ScramCredentialUtils.credentialToString(credential);//添加到properties中備用properties.put(scramMechanism.mechanismName(),credentialString);//創建名為kaf_aaa的賬戶,并且把properties傳遞進去,那么kaf_aaa賬戶的密碼就是上面設置的password_1234字符串adminZkClient.changeConfigs(ConfigType.User(),"kaf_aaa",properties); }使用Scala方法查詢存在的賬戶
和創建一樣也需要使用Scala中的方法,這次使用的是依然是AdminZkClient類中的方法,一個是fetchEntityConfig(rootEntityType: String, sanitizedEntityName: String)去查找指定的賬戶信息,其次是用fetchAllEntityConfigs(entityType: String)查找Kafka服務器中所有的賬戶信息,Sample如下。
public void findAccount() {//獲取ZookeeperClient對象ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");//通過ZookeeperClient對象和JDK自帶的JaasUtils加載Sasl認證規則KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);//獲取Kafka Scala AdminZkClient對象AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);//為了查看結果,構造一個Properties對象用來承接返回值Properties properties=new Properties();//指定賬號查詢properties=adminZkClient.fetchEntityConfig(ConfigType.User(),"kaf_aaa");//構建一個接收參數Map<String, Properties> propertiesAll=new HashMap<>();//查詢所有信息propertiesAll =adminZkClient.fetchAllEntityConfigs(ConfigType.User()); }使用Scala方法刪除賬戶
說完創建和查找,那就剩刪除了。刪除對象其實分為兩步,第一步清空Kafka集群上保存的信息,第二部刪除Zookeeper上對應的節點。清空信息用的還是changeConfigs()方法,刪除節點用的則是Zookeeper包里的delete(final String path, int version)方法,Sample如下。
public void deleteAccount() throws InterruptedException, KeeperException {//獲取ZookeeperClient對象ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");//通過ZookeeperClient對象和JDK自帶的JaasUtils加載Sasl認證規則KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);//獲取Kafka Scala AdminZkClient對象AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);adminZkClient.changeConfigs(ConfigType.User(),"kaf_aaa",new Properties()); //獲取Zookeeper對象ZooKeeper zooKeeper=kafkaZkClient.currentZooKeeper();//調用delete方法把對應的節點刪除,版本設置為-1是不檢測版本號//注意"/config/users/"這個路徑是固定的,Zk里面就是這樣存的,"kaf_aaa"是自己拼的賬號名字zooKeeper.delete("/config/users/" +"kaf_aaa", -1); }總結
到此Kafka Scram賬戶相關的操作告一段落,如果想要刪除賬戶同時清除賬號下的權限,可以參考【Apache Kafka API AdminClient 賬號對Topic權限賦予與移除】,自己做一個循環刪除即可。
附:完整的Sample和注釋
public class KafkaUserOperation {//加載zookeeper sasl機制授權登陸的配置文件static {String path = KafkaUserOperation.class.getClass().getResource("/").getPath();File f = new File(path+"zk-client-jaas.conf");System.setProperty("java.security.auth.login.config", f.getAbsolutePath());}public void createAccount() throws NoSuchAlgorithmException {//獲取ZookeeperClient對象ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");//通過ZookeeperClient對象和JDK自帶的JaasUtils加載Sasl認證規則KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);//獲取Kafka Scala AdminZkClient對象AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);//構造PropertiesProperties properties=new Properties();//構造Scram認證機制ScramMechanism為SCRAM_SHA_512ScramMechanism scramMechanism=ScramMechanism.valueOf("SCRAM_SHA_512");//構造Scram證書credential,這里"password_1234"就是真實的密碼ScramCredential credential = new ScramFormatter(scramMechanism).generateCredential("password_1234", scramMechanism.minIterations());//轉化為認證串String credentialString=ScramCredentialUtils.credentialToString(credential);//添加到properties中備用properties.put(scramMechanism.mechanismName(),credentialString);//創建名為kaf_aaa的賬戶,并且把properties傳遞進去,那么kaf_aaa賬戶的密碼就是上面設置的password_1234字符串adminZkClient.changeConfigs(ConfigType.User(),"kaf_aaa",properties);}public void findAccount() {//獲取ZookeeperClient對象ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");//通過ZookeeperClient對象和JDK自帶的JaasUtils加載Sasl認證規則KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);//獲取Kafka Scala AdminZkClient對象AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);//為了查看是否成功,構造一個Properties對象用來承接返回值Properties properties=new Properties();//指定賬號查詢properties=adminZkClient.fetchEntityConfig(ConfigType.User(),"kaf_aaa");Map<String, Properties> proAll=new HashMap<>();//查詢所有信息proAll=adminZkClient.fetchAllEntityConfigs(ConfigType.User());}public void deleteAccount() throws InterruptedException, KeeperException {//獲取ZookeeperClient對象ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");//通過ZookeeperClient對象和JDK自帶的JaasUtils加載Sasl認證規則KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);//獲取Kafka Scala AdminZkClient對象AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);adminZkClient.changeConfigs(ConfigType.User(),"kaf_aaa",new Properties()); //獲取Zookeeper對象ZooKeeper zooKeeper=kafkaZkClient.currentZooKeeper();//調用delete方法把對應的節點刪除,版本設置為-1是不檢測版本號zooKeeper.delete("/config/users/"+"kaf_aaa", -1);} }總結
以上是生活随笔為你收集整理的Apache Kafka API AdminClient Scram账户的创建与删除的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 计算机连接游戏手柄,无线游戏手柄怎么连接
- 下一篇: 【游戏开发实战】下载原神模型,PMX转F