Zookeeper 客户端API调用示例(基本使用,增删改查znode数据,监听znode,其它案例,其它网络参考资料)
9.1 基本使用
org.apache.zookeeper.Zookeeper是客戶端入口主類,負責建立與server的會話
它提供以下幾類主要方法? :
| 功能 | 描述 |
| create | 在本地目錄樹中創建一個節點 |
| delete | 刪除一個節點 |
| exists | 測試本地是否存在目標節點 |
| get/set data | 從目標節點上讀取?/?寫數據 |
| get/set ACL | 獲取?/?設置目標節點訪問控制列表信息 |
| get children | 檢索一個子節點上的列表 |
| sync | 等待要被傳送的數據 |
?
?
?
?
?
?
?
?
添加pom的maven依賴:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.test</groupId><artifactId>zookeeper-demo</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.6.1</version></dependency></dependencies></project>表 1 : ZooKeeper API 描述
9.2 增刪改查znode數據
| package cn.com.toto.zk; ? import java.io.IOException; ? import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; ? public class SimpleDemo { ??? //回話超時時間,設置為與系統默認時間一致 ??? private static final int SESSION_TIMEOUT = 30000; ??? //創建ZooKeeper實例 ??? ZooKeeper zk; ??? //創建Watcher實例 ??? Watcher wh = new Watcher() { ??? ?????? @Override ?????? public void process(WatchedEvent event) { ?????????? System.out.println(event.toString()); ?????? } ??? }; ??? ??? //初始化ZooKeeper實例 ??? private void createZKInstance() throws IOException { ?????? zk = new ZooKeeper("hadoop:2181,hadoop2:2181,hadoop3:2181",SimpleDemo.SESSION_TIMEOUT,this.wh); ??? } ??? ??? private void ZKOperations() throws KeeperException, InterruptedException { ?????? System.out.println("/n1. 創建 ZooKeeper 節點 (znode : zoo2, 數據: myData2 ,權限: OPEN_ACL_UNSAFE ,節點類型: Persistent"); ?????? zk.create("/zoo2", "myData2".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); ?????? System.out.println("/n2.查看是否創建成功:"); ?????? System.out.println(new String(zk.getData("/zoo2", false, null))); ?????? System.out.println("/n3.修改節點數據"); ?????? zk.setData("/zoo2", "toto".getBytes(), -1); ?????? System.out.println("/n4.查看是否修改成功:"); ?????? System.out.println(new String(zk.getData("/zoo2", false, null))); ?????? System.out.println("/n5.刪除節點"); ?????? zk.delete("/zoo2", -1); ?????? System.out.println("/n6.查看節點是否被刪除:"); ?????? System.out.println("節點狀態:[" + zk.exists("/zoo2", false) + "]"); ??? } ??? ??? private void ZKClose() throws InterruptedException { ?????? zk.close(); ??? } ??? ??? public static void main(String[] args) throws KeeperException, InterruptedException, IOException { ?????? SimpleDemo dm = new SimpleDemo(); ?????? dm.createZKInstance(); ?????? dm.ZKOperations(); ?????? dm.ZKClose(); ??? } } |
| 運行結果: /n1. 創建 ZooKeeper 節點 (znode : zoo2, 數據: myData2 ,權限: OPEN_ACL_UNSAFE ,節點類型: Persistent 一月 10, 2017 12:48:26 上午 org.apache.zookeeper.ClientCnxn$SendThread primeConnection 信息: Socket connection established to hadoop3/192.168.106.82:2181, initiating session 一月 10, 2017 12:48:26 上午 org.apache.zookeeper.ClientCnxn$SendThread onConnected 信息: Session establishment complete on server hadoop3/192.168.106.82:2181, sessionid = 0x35983c177d00007, negotiated timeout = 30000 WatchedEvent state:SyncConnected type:None path:null /n2.查看是否創建成功: myData2 /n3.修改節點數據 /n4.查看是否修改成功: toto /n5.刪除節點 /n6.查看節點是否被刪除: 節點狀態:[null] |
?
9.3 監聽znode
Zookeeper的監聽器工作機制
?
監聽器是一個接口,我們的代碼中可以實現Wather這個接口,實現其中的process方法,方法中即我們自己的業務邏輯
?
?
?
?
監聽器的注冊是在獲取數據的操作中實現:
getData(path,watch?)監聽的事件是:節點數據變化事件
getChildren(path,watch?)監聽的事件是:節點下的子節點增減變化事件
?
9.4其它案例
所需jar包:
?
圖1 項目包結構
| package cn.com.toto.zk; ? import java.util.List; import java.util.concurrent.CountDownLatch; ? import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.Test; ? public class SimpleZkClient { ??? ??? private static final String connectString = "192.168.106.80:2181,192.168.106.81:2181,192.168.106.82:2181"; ??? private static final int sessionTimeout = 2000; ??? ??? // latch就相當于一個對象鎖,當latch.await()方法執行時,方法所在的線程會等待 ??? //當latch的count減為0時,將會喚醒等待的線程 ??? CountDownLatch latch = new CountDownLatch(1); ??? ZooKeeper zkClient = null; ??? ??? @Before ??? public void init() throws Exception { ?????? zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() { ?????????? ?????????? //事件監聽回調方法 ?????????? @Override ?????????? public void process(WatchedEvent event) { ????????????? if (latch.getCount() > 0 && event.getState() == KeeperState.SyncConnected) { ????????????????? System.out.println("countdown"); ????????????????? latch.countDown(); ????????????? } ????????????? ????????????? //收到事件通知后的回調函數(應該是我們自己的事件處理邏輯) ????????????? System.out.println(event.getType() + "---" + event.getPath()); ????????????? System.out.println(event.getState()); ?????????? } ?????? }); ?????? latch.await(); ??? } ??? ??? //創建數據節點到zk中 ??? @Test ??? public void testCreate() throws KeeperException, InterruptedException { ?????? //參數1:要創建的節點的路徑? 參數2:節點大數據參數3:節點的權限? 參數4:節點的類型 ?????? String nodeCreated = zkClient.create("/eclipse", "hellozk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); ??? ??? //上傳的數據可以是任何類型,但都要轉成byte ?????? zkClient.close(); ??? } ??? ??? //判斷znode是否存在 ??? @Test ??? public void testExist() throws KeeperException, InterruptedException { ?????? Stat stat = zkClient.exists("/eclipse", false); ?????? System.out.println(stat == null ? "not exist" : "exist"); ??? } ??? ??? //獲取znode下的孩子節點 ??? @Test ??? public void getChildren() throws KeeperException, InterruptedException { ?????? ?List<String> children = zkClient.getChildren("/", true); ?????? ?for(String child : children) { ?????????? ?System.out.println(child); ?????? ?} ?????? ?Thread.sleep(Long.MAX_VALUE); ??? } ? ??? //獲取參數 ??? @Test ??? public void getData() throws KeeperException, InterruptedException { ?????? byte[] data = zkClient.getData("/eclipse", true, null); ?????? System.out.println(new String(data)); ?????? Thread.sleep(Long.MAX_VALUE); ??? } ??? ??? //刪除znode ??? @Test ??? public void deleteZnode() throws InterruptedException, KeeperException { ??? ??? //參數2:指定要刪除的版本,-1表示刪除所有版本 ?????? zkClient.delete("/eclipse", -1); ??? } ??? ??? //設置參數 ??? @Test ??? public void setData() throws Exception { ?????? //要注意,這里的/zookeeper 要在zookeeper中的節點中有 ?????? zkClient.setData("/zookeeper", "imissyou angelababy".getBytes(), -1); ? ?????? byte[] data = zkClient.getData("/zookeeper", false, null); ?????? System.out.println(new String(data)); ??? } } |
案例二
| package cn.com.toto.zk; ? import java.util.List; import java.util.concurrent.CountDownLatch; ? import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; ? import com.sun.org.apache.bcel.internal.generic.NEW; ? public class TestZKclient { ??? static ZooKeeper zk = null; ??? ??? public static void main(String[] args) throws Exception { ??? ??? final CountDownLatch countDownLatch = new CountDownLatch(1); ??? ??? zk = new ZooKeeper("hadoop:2181",2000,new Watcher() { ?????????? ?????????? @Override ?????????? public void process(WatchedEvent event) { ????????????? if (event.getState() == KeeperState.SyncConnected) { ????????????????? countDownLatch.countDown(); ????????????? } ????????????? System.out.println(event.getPath()); ????????????? System.out.println(event.getType()); ????????????? try { ????????????????? zk.getChildren("/zookeeper", true); ????????????? } catch (Exception e) { ????????????????? e.printStackTrace(); ????????????? } ?????????? } ?????? }); ??? ??? ??? ??? countDownLatch.await(); ??? ??? ??? ??? /** ??? ??? zk.create("/myboys", "丑陋型".getBytes("UTF-8"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); ??? ??? zk.close(); ??? ??? **/ ??? ??? ??? ??? /** ??? ??? byte[] data = zk.getData("/myboys", true, null); ??? ??? System.out.println(new String(data,"UTF-8")); ??? ??? Thread.sleep(Long.MAX_VALUE); ??? ??? **/ ??? ??? ??? ??? /** ??? ??? zk.create("/myboys/wangkai", "測試型".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); ??? ??? zk.close(); ??? ??? **/ ??? ??? ??? ??? /** ??? ??? List<String> children = zk.getChildren("/myboys", true); ??? ??? for(String child : children) { ??? ?????? System.out.println(child); ??? ??? } ??? ??? **/ ??? ??? ??? ??? /**zk.delete("/myboys/wangkai", -1);**/ ??? ??? ??? ??? /**zk.setData("/myboys", "fasdfasdf".getBytes(), -1);**/ ??? ??? /** ??? ??? byte[] data = zk.getData("/myboys", true, null); ??? ??? System.out.println(new String(data,"UTF-8")); ??? ??? **/ ??? ??? ??? ??? Stat stat = zk.exists("/mywives", true); ??? ??? System.out.println(stat == null ? "確實不存在" : "存在"); ??? ??? zk.close(); ??? } } |
?
9.5 其它網絡參考資料
準備工作拷貝ZooKeeper安裝目錄下的zookeeper.x.x.x.jar文件到項目的classpath路徑下. 創建連接和回調接口首先需要創建ZooKeeper對象, 后續的一切操作都是基于該對象進行的. 1.? ZooKeeper(String?connectString,?int?sessionTimeout,?Watcher?watcher)?throws?IOException?? 以下為各個參數的詳細說明: ·???????? connectString. zookeeper server列表, 以逗號隔開. ZooKeeper對象初始化后, 將從server列表中選擇一個server, 并嘗試與其建立連接. 如果連接建立失敗, 則會從列表的剩余項中選擇一個server, 并再次嘗試建立連接. ·???????? sessionTimeout. 指定連接的超時時間. ·???????? watcher. 事件回調接口. 注意, 創建ZooKeeper對象時, 只要對象完成初始化便立刻返回. 建立連接是以異步的形式進行的, 當連接成功建立后, 會回調watcher的process方法. 如果想要同步建立與server的連接, 需要自己進一步封裝. 1.? public?class?ZKConnection?{?? 2.? ????/**? 3.? ?????*?server列表,?以逗號分割? 4.? ?????*/?? 5.? ????protected?String?hosts?=?"localhost:4180,localhost:4181,localhost:4182";?? 6.? ????/**? 7.? ?????*?連接的超時時間,?毫秒? 8.? ?????*/?? 9.? ????private?static?final?int?SESSION_TIMEOUT?=?5000;?? 10. ????private?CountDownLatch?connectedSignal?=?new?CountDownLatch(1);?? 11. ????protected?ZooKeeper?zk;?? 12. ?? 13. ????/**? 14. ?????*?連接zookeeper?server? 15. ?????*/?? 16. ????public?void?connect()?throws?Exception?{?? 17. ????????zk?=?new?ZooKeeper(hosts,?SESSION_TIMEOUT,?new?ConnWatcher());?? 18. ????????//?等待連接完成?? 19. ????????connectedSignal.await();?? 20. ????}?? 21. ?? 22. ????public?class?ConnWatcher?implements?Watcher?{?? 23. ????????public?void?process(WatchedEvent?event)?{?? 24. ????????????//?連接建立,?回調process接口時,?其event.getState()為KeeperState.SyncConnected?? 25. ????????????if?(event.getState()?==?KeeperState.SyncConnected)?{?? 26. ????????????????//?放開閘門,?wait在connect方法上的線程將被喚醒?? 27. ????????????????connectedSignal.countDown();?? 28. ????????????}?? 29. ????????}?? 30. ????}?? 31. }?? ? 創建znodeZooKeeper對象的create方法用于創建znode. 1.? String?create(String?path,?byte[]?data,?List?acl,?CreateMode?createMode);?? 以下為各個參數的詳細說明: ·???????? path. znode的路徑. ·???????? data. 與znode關聯的數據. ·???????? acl. 指定權限信息, 如果不想指定權限, 可以傳入Ids.OPEN_ACL_UNSAFE. ·???????? 指定znode類型. CreateMode是一個枚舉類, 從中選擇一個成員傳入即可. 關于znode類型的詳細說明, 可參考本人的上一篇博文. 1.? /**? 2.? ?*?創建臨時節點? 3.? ?*/?? 4.? public?void?create(String?nodePath,?byte[]?data)?throws?Exception?{?? 5.? ????zk.create(nodePath,?data,?Ids.OPEN_ACL_UNSAFE,?CreateMode.EPHEMERAL);?? 6.? }?? ? 獲取子node列表ZooKeeper對象的getChildren方法用于獲取子node列表. 1.? List?getChildren(String?path,?boolean?watch);?? watch參數用于指定是否監聽path node的子node的增加和刪除事件, 以及path node本身的刪除事件. 判斷znode是否存在ZooKeeper對象的exists方法用于判斷指定znode是否存在. 1.? Stat?exists(String?path,?boolean?watch);?? watch參數用于指定是否監聽path node的創建, 刪除事件, 以及數據更新事件. 如果該node存在, 則返回該node的狀態信息, 否則返回null. 獲取node中關聯的數據ZooKeeper對象的getData方法用于獲取node關聯的數據. 1.? byte[]?getData(String?path,?boolean?watch,?Stat?stat);?? watch參數用于指定是否監聽path node的刪除事件, 以及數據更新事件, 注意, 不監聽path node的創建事件, 因為如果path node不存在, 該方法將拋出KeeperException.NoNodeException異常. 更新node中關聯的數據ZooKeeper對象的setData方法用于更新node關聯的數據. 1.? Stat?setData(final?String?path,?byte?data[],?int?version);?? data為待更新的數據. 刪除znodeZooKeeper對象的delete方法用于刪除znode. 1.? void?delete(final?String?path,?int?version);?? version參數的作用同setData方法. 其余接口請查看ZooKeeper對象的API文檔. 需要注意的幾個地方·???????? znode中關聯的數據不能超過1M. zookeeper的使命是分布式協作, 而不是數據存儲. ·???????? getChildren, getData, exists方法可指定是否監聽相應的事件. 而create, delete, setData方法則會觸發相應的事件的發生. ·???????? 以上介紹的幾個方法大多存在其異步的重載方法, 具體請查看API說明. |
?
總結
以上是生活随笔為你收集整理的Zookeeper 客户端API调用示例(基本使用,增删改查znode数据,监听znode,其它案例,其它网络参考资料)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 军队考生的准考证号丢失咋办?
- 下一篇: 如何让帝国时代2之征服者尸体不消失