Apache Kafka API AdminClient Scram账户的操作(增删改查)
前言
很久沒有更新Kafka API相關的文檔了,因為筆者工作變動Kafka這部分內容在工作中接觸的就相對于之前少了一些。但架不住kafka官方還是一如既往的勤奮,官方操作Scram賬戶的創建與刪除這部分已經更新了好久了,這次也算是填坑吧,主要就是針對alterUserScramCredentials方法做一個功能說明和demo。給網上少之又少的Kafka API中文使用教程做個增補,本次基于Kafka API 2.8.0,同時適用于2.7.0版本,但是該版本API并不是很完善,創建的賬戶可能會有無法讀寫Topic的問題。如果你使用的Kafka還沒有升級到這么高的版本,請參考【Apache Kafka API AdminClient Scram賬戶的創建與刪除】這篇博客,其是針對2.7.0及其以下的Scram賬戶操作。更多內容請點擊【Apache Kafka API AdminClient 目錄】。
操作Scram賬戶的方法
首先我們先看官方文檔中對于操作Scram賬戶是怎么說的,查詢官方的文檔只提供了一個方法:
| default AlterUserScramCredentialsResult | alterUserScramCredentials(List< UserScramCredentialAlteration> alterations) | Alter SASL/SCRAM credentials for the given users. | 
| AlterUserScramCredentialsResult | alterUserScramCredentials(List< UserScramCredentialAlteration> alterations, AlterUserScramCredentialsOptions options) | Alter SASL/SCRAM credentials. | 
根據官方給的文檔描述,上表中第二個方法是對第一個方法的擴展,所以這里不做討論。我們主要集中于alterUserScramCredentials()方法的使用上。根據描述,該方法只接受一個泛型為UserScramCredentialAlteration 的List參數,接著看下UserScramCredentialAlteration是什么樣子。
Field Summary
| protected | String user | 
Constructor Summary
| protected | UserScramCredentialAlteration(String user) | 
Method Summary
| String | user() | 
根據官方文檔描述里面只有一個String類型的user字段,顯然不可能只根據這一個字段構造一個能夠使用的賬戶。其實UserScramCredentialAlteration是一個抽象的類,它還有兩個子類UserScramCredentialDeletion和UserScramCredentialUpsertion。從名字就可以看出來,創建大概是UserScramCredentialUpsertion,刪除則應該是UserScramCredentialDeletion。所以我們傳入的List中的泛型,也就是這兩個類的具體實現了。
創建一個Scram賬戶
UserScramCredentialUpsertion
官方文檔對UserScramCredentialUpsertion類的解釋是:
A request to update/insert a SASL/SCRAM credential for a user.
看來這個類不僅僅能做創建,而且也能做更新,大方向是沒問題了。首先我們還是看下創建UserScramCredentialUpsertion這個類要怎么構造:
| UserScramCredentialUpsertion(String user, ScramCredentialInfo credentialInfo, byte[] password) | Constructor that generates a random salt | 
| UserScramCredentialUpsertion(String user, ScramCredentialInfo credentialInfo, byte[] password, byte[] salt) | Constructor that accepts an explicit salt | 
| UserScramCredentialUpsertion(String user, ScramCredentialInfo credentialInfo, String password) | Constructor that generates a random salt | 
從上述表格中可以看到這個類有三個構造方法,構建這個類除了需要父類提供的user名字以外,還需要另外兩個參數ScramCredentialInfo和password。可以看到其實最后一個構造方法password是String,這個可以說是最方便的了。第二個中為什么還要有一個salt參數,筆者深入代碼只發現了一句話new Field("salt", Type.COMPACT_BYTES, "A random salt generated by the client.")。可能是給用戶自定義什么功能用的,但是kafka自己也會生成這個東西,由于官方文檔和源碼都沒有找到更多的解釋,我們暫且忽略,如果哪位筆者知道這點,也行評論區不吝賜教。
ScramCredentialInfo
那么我們就轉向ScramCredentialInfo這個類的構造:
| ScramCredentialInfo(ScramMechanism mechanism, int iterations) | – | 
可以看到這個類的構建就比較簡單明了,兩個參數一個是ScramMechanism也就是Scram認證機制,這是一個enum類型內置SCRAM_SHA_256、SCRAM_SHA_512、UNKNOWN三個類型,UNKNOWN類型會在創建時報異常。另外一個iterations是循環次數the number of iterations used when creating the credential,代表你要創建的賬戶個數,也就是List.size()吧。
示例代碼
public void createAccount(String name, String pwd, String salt) throws ExecutionException, InterruptedException {//創建User列表List<UserScramCredentialAlteration> alterations = new ArrayList<>();//構造Scram認證機制信息,這里筆者選擇了SCRAM_SHA_512,大家也可以選擇 ScramMechanism.SCRAM_SHA_256//alterations.size()此時為0,或許會報錯,可以試下傳入數字構造,比如下面添加了一個認證信息,那么這里傳入數字1。// ScramCredentialInfo info=new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, alterations.size()); //這里時間久遠,忘記當時寫例子的場景了ScramCredentialInfo info=new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 10000);//三個UserScramCredentialAlteration構造方法,三選一筆者選了一個最簡單的//UserScramCredentialAlteration userScramCredentialAdd=new UserScramCredentialUpsertion(name,info,pwd.getBytes());//UserScramCredentialAlteration userScramCredentialAdd=new UserScramCredentialUpsertion(name,info,pwd.getBytes(),salt.getBytes());UserScramCredentialAlteration userScramCredentialAdd=new UserScramCredentialUpsertion(name,info,pwd);//添加認證信息到列表alterations.add(userScramCredentialAdd);//執行方法,并拿到返回結果AlterUserScramCredentialsResult result = adminClient.alterUserScramCredentials(alterations);//阻塞等待結果完成result.all().get(); }通過官網給出的解釋,我們知道對賬戶的更新操作也是使用UserScramCredentialUpsertion類進行,因此只要保證user name一致即可對相應的user中的內容進行更新,這點大家自己驗證吧。
刪除一個Scram賬戶
刪除賬戶用的則是子類UserScramCredentialDeletion,一樣我們先看它的構造方法:
| UserScramCredentialDeletion(String user, ScramMechanism mechanism) | 
果然刪除就不會像增加或者修改一樣墨跡,只有一個構造方法可用,參數上面也都說過了,直接上示例代碼:
public void deleteAccount(String name) throws ExecutionException, InterruptedException {//創建刪除列表List<UserScramCredentialAlteration> alterations = new ArrayList<>();//構建刪除用的UserScramCredentialAlterationUserScramCredentialAlteration userScramCredentialDel=new UserScramCredentialDeletion(name,ScramMechanism.SCRAM_SHA_512);//添加認證信息到列表alterations.add(userScramCredentialDel);//執行方法,并拿到返回結果AlterUserScramCredentialsResult result = adminClient.alterUserScramCredentials(alterations);//阻塞等待結果完成result.all().get(); }查詢Scram賬戶信息
說完對Scram賬戶的增刪改以后,剩下的自然就剩下查詢了,官方也提供了一個方法用于查詢:
| default DescribeUserScramCredentialsResult | describeUserScramCredentials() | Describe all SASL/SCRAM credentials. | 
| default DescribeUserScramCredentialsResult | describeUserScramCredentials(List< String> users) | Describe SASL/SCRAM credentials for the given users. | 
| DescribeUserScramCredentialsResult | describeUserScramCredentials(List< String> users, DescribeUserScramCredentialsOptions options) | Describe SASL/SCRAM credentials. | 
可以看到大體上也是只有一個方法describeUserScramCredentials(),只不過對這個方法做了一個重載,用于指定查詢某些用戶的信息,參數很簡單不用多說,直接上示例代碼:
public void describeAccount() throws ExecutionException, InterruptedException {//***************************************查詢所有用戶信息*****************************************************//查詢所有的賬戶,這也是默認方法DescribeUserScramCredentialsResult result = adminClient.describeUserScramCredentials();//執行方法,并拿到返回結果Map<String, UserScramCredentialsDescription> future = result.all().get();//輸出future.forEach((name,info)-> System.out.println("[ScramUserName:"+name+"]:[ScramUserInfo:"+info.toString()+"]"));//***************************************這里是分割線*****************************************************//***************************************查詢指定的用戶信息*****************************************************//構造指定的用戶列表List<String> userScramList=new ArrayList<>();//添加兩個用戶userScramList.add("user1");userScramList.add("user2");//傳入特定用戶列表執行方法,并拿到返回結果DescribeUserScramCredentialsResult targetResult = adminClient.describeUserScramCredentials(userScramList);//執行方法,并拿到返回結果Map<String, UserScramCredentialsDescription> targetFuture = targetResult.all().get();//輸出targetFuture.forEach((name,info)-> System.out.println("[ScramUserName:"+name+"]:[ScramUserInfo:"+info.toString()+"]")); }結語
這一篇終于把之前的坑填上了,自從筆者不再需要對Kafka平臺進行維護以后,Kafka的版本也發生了好多巨大的更新,一個最明顯的變化就是Kafka的去zookeeper操作。筆者之前操縱2.7.0的時候,Kafka官方還是警告2.8版本最好不要用于生產環境,而今天官方的態度已經有了一個模糊的轉變,可見Kafka已經大體上完成了對Zookeeper的解耦工作。由于筆者工作的變動不再管理Kafka相關的平臺內容,所以這部分內容就會更新的比較慢,畢竟懶癌晚期的筆者能補上這個坑已經算是祖墳冒煙了。不過好在筆者已經夠把大多數關鍵的Kafka管理操作的API使用方法更新完畢,希望這些內容對網上寥寥無幾的Kafka文檔有所補充,也希望這些博客能夠幫助到那些正在管理Kafka平臺的小伙伴們,希望以后還有機會對更新的版本進行更深一步的探究吧。
總結
以上是生活随笔為你收集整理的Apache Kafka API AdminClient Scram账户的操作(增删改查)的全部內容,希望文章能夠幫你解決所遇到的問題。
                            
                        - 上一篇: 思岚科技机器人底盘价格揭秘
 - 下一篇: 山东大学网络考试的计算机试题及答案,专科