Apache ZooKeeper - 使用原生的API操作ZK
文章目錄
- 概述
- maven依賴
- 驗證
- 測試基類
- ZK構造函數參數
- connectString:ZooKeeper服務器列表
- sessionTimeout:會話的超時時間, “毫秒”為單位
- watcher:事件通知處理器
- canBeReadOnly: 用于標識當前會話是否支持“read-only(只讀)”模式。
- sessionId和 sessionPasswd:會話ID和會話秘鑰
- CRUD
- 同步創建節點
- 修改數據
- 查詢數據(不帶watcher)
- 查詢數據(帶watcher)
- 刪除數據
- 異步創建節點
概述
前面幾篇系列博文我們熟悉了如何通過命令來操作ZK節點數據,下面我們來看下如何使用API來操作
主要兩種方式
今天我們來看下如何使用原生的API操作ZK
maven依賴
和 服務端的版本保持一致
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.8</version></dependency>驗證
接下來我們使用單元測試來驗證下原生API的對ZK 數據的增刪改查
測試基類
我們來寫下測試基類
package com.artisan.zk.originalClient;import lombok.extern.slf4j.Slf4j; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before;import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit;@Slf4j public abstract class StandAloneBaseTest {private static final String ZK_ADDRESS = "192.168.126.131:2181";private static final int SESSION_TIMEOUT = 30_000;private static CountDownLatch countDownLatch = new CountDownLatch(1);public static ZooKeeper getZooKeeper() {return zooKeeper;}private static ZooKeeper zooKeeper ;private static Watcher watcher = event -> {if (event.getState() == Watcher.Event.KeeperState.SyncConnected && event.getType() == Watcher.Event.EventType.None){log.info("ZK Connected");countDownLatch.countDown();}};@Beforepublic void init() throws IOException, InterruptedException {log.info("start to connect zk server: {}" , ZK_ADDRESS);zooKeeper = new ZooKeeper(ZK_ADDRESS, SESSION_TIMEOUT, watcher);log.info("connecting to....{}", ZK_ADDRESS);countDownLatch.await();}@Afterpublic void test(){try {TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {e.printStackTrace();}}}為了方便測試,直接在init初始化方法中創建zookeeper實例 ,不要關閉~
ZK構造函數參數
connectString:ZooKeeper服務器列表
由英文逗號分開的host:port字符串組成,每一個都代表一臺ZooKeeper機器,如 host1:port1,host2:port2,host3:port3
另外,也可以在connectString中設置客戶端連接上ZooKeeper后的根目錄,方法是在host:port字符串之后添加上這個根目錄。
例如,host1:port1,host2:port2,host3:port3/app/a,這樣就指定了該客戶端連接上ZooKeeper服務器之后,所有對ZooKeeper的操作,都會基于這個根目錄。
例如,客戶端對/foo/bar 的操作,最終創建/app/a/foo/bar, 這個目錄也叫Chroot,即客戶端隔離命名空間。
sessionTimeout:會話的超時時間, “毫秒”為單位
在ZooKeeper中有會話的概念,在一個會話周期內,ZooKeeper客戶端和服務器之間會通過心跳檢測機制來維持會話的有效性.
一旦在sessionTimeout時間內沒有進行有效的心跳檢測,會話就會失效。
watcher:事件通知處理器
ZooKeeper允許客戶端在構造方法中傳入一個接口 watcher (org.apache. zookeeper.Watcher)的實現類對象來作為默認的 Watcher事件通知處理器。
當然,該參數可以設置為null 以表明不需要設置默認的 Watcher處理器。
canBeReadOnly: 用于標識當前會話是否支持“read-only(只讀)”模式。
boolean類型的參數
默認情況下,在ZooKeeper集群中,一個機器如果和集群中過半及以上機器失去了網絡連接,那么這個機器將不再處理客戶端請求(包括讀寫請
求)。
但是在某些使用場景下,當ZooKeeper服務器發生此類故障的時候,我們還是希望ZooKeeper服務器能夠提供讀服務(當然寫服務肯定無法提供),這就是 ZooKeeper的“read-only”模式。
sessionId和 sessionPasswd:會話ID和會話秘鑰
這兩個參數能夠唯一確定一個會話,同時客戶端使用這兩個參數可以實現客戶端會話復用,從而達到恢復會話的效果。
具體使用方法是,第一次連接上ZooKeeper服務器時,通過調用ZooKeeper對象實例的以下兩個接口,即可獲得當前會話的ID和秘鑰:
荻取到這兩個參數值之后,就可以在下次創建ZooKeeper對象實例的時候傳入構造方法了
CRUD
同步創建節點
package com.artisan.zk.originalClient;import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.junit.Test;@Slf4j public class BaseOperationStandAloneModeTest extends StandAloneBaseTest{private static final String NODE_NAME = "/artisan-node";@Testpublic void testCreate(){try{ZooKeeper zooKeeper = getZooKeeper();String s = zooKeeper.create(NODE_NAME,"artisan-node-value".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);log.info("create persistent node {} , result {}" , NODE_NAME, s );}catch (Exception e){log.error("create Exception {}", e.getMessage());}} }修改數據
@SneakyThrows@Testpublic void testSetData() {// 修改前數據Stat stat = new Stat();byte[] data = getZooKeeper().getData(NODE_NAME, null, stat);log.info("data before change: " + new String(data));int version = stat.getVersion();log.info("data version {} " , version);// 修改數據Stat newStat = getZooKeeper().setData(NODE_NAME, "ARTISAN - NEW-SET-DATA".getBytes(), version);log.info("new stat version info {} " , newStat.getVersion());log.info("data after change: {} " , new String(getZooKeeper().getData(NODE_NAME, null, newStat)));}
查詢數據(不帶watcher)
@SneakyThrows@Testpublic void testGetWithOutWatch(){byte[] data = getZooKeeper().getData(NODE_NAME, null, null);log.info("data {}" , new String(data));}查詢數據(帶watcher)
@SneakyThrows@Testpublic void testGetWithWatch(){Watcher watcher = new Watcher() {@Overridepublic void process(WatchedEvent event) {// 監聽NodeDataChanged事件if (event.getPath() != null && event.getPath().equals(NODE_NAME)&& event.getType() == Watcher.Event.EventType.NodeDataChanged){log.info("path {} changed watched " , NODE_NAME);// 監聽一旦觸發就會失效,因此需要重新監聽try {byte[] data = getZooKeeper().getData(NODE_NAME, this, null);log.info("監聽觸發后的操作-- data: {}",new String(data));} catch (Exception e) {log.info("getData Error {} " , e.getMessage());}}}};// 獲取節點數據byte[] data = getZooKeeper().getData(NODE_NAME, watcher, null);log.info("data {}" , new String(data));}因為監聽的是NodeDataChanged事件,因此我們再去調用修改數據的方法,或者在客戶端手動修改數據
觀察testGetWithWatch的日志
zk里查看數據
刪除數據
@SneakyThrows@Testpublic void testDelete(){// if the given version is -1, it matches any node's versions// -1 代表匹配所有版本,直接刪除// 任意大于 -1 的代表可以指定數據版本刪除getZooKeeper().delete(NODE_NAME,-1);}查看客戶端,已經刪除
異步創建節點
@SneakyThrows@Testpublic void testCreateAsyn(){getZooKeeper().create(NODE_NAME, "DATA_VALUE".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT,(rc, path, ctx, name) -> {String currentThreadName = Thread.currentThread().getName();log.info("currentThreadName {} , rc {} , path {} , ctx {} , name {} " , currentThreadName, rc , path ,ctx ,name );}, "ARTISAN");byte[] data = getZooKeeper().getData(NODE_NAME, null, null);log.info("data {}" , new String(data));}EventThread創建的節點 ,而非當前線程
行了 基本操作就這些,下篇繼續
總結
以上是生活随笔為你收集整理的Apache ZooKeeper - 使用原生的API操作ZK的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: NFS - MIPS架构下构建NFS共享
- 下一篇: Apache ZooKeeper - 构