支持断线重连、永久watcher、递归操作 ZooKeeper 客户端
項目介紹
ZooKeeper本質上是一個分布式的小文件存儲系統。原本是Apache Hadoop的一個組件,現在被拆分為一個Hadoop的獨立子項目。
Zookeeper 作為一個分布式的服務框架,主要用來解決分布式集群中應用系統的一致性問題,它能提供基于類似于文件系統的目錄節點樹方式的數據存儲,但是 Zookeeper 并不是用來專門存儲數據的,它的作用主要是用來維護和監控你存儲的數據的狀態變化。通過監控這些數據狀態的變化,從而可以達到基于數據的集群管理。
Zookeeper 在Windows安裝和使用,可參考http://www.cnblogs.com/shanyou/p/3221990.html
ZookeeperClient是在https://github.com/shayhatsor/zookeeper基礎上的再次封裝,使開發者更方便使用ZooKeeper相關的功能。
ZookeeperClient實現了斷線重連,會話過期重連,永久監聽,子節點數據變化的監聽。并且加入了常用功能,例如分布式鎖,Leader選舉,分布式隊列
,項目地址https://github.com/milanyangbo/ZooKeeper.Net
支持的平臺
.NET 4及以上,目前尚不支持.NET Core, .NET Core推薦用另外一個項目?支持斷線重連、永久watcher、遞歸操作并且能跨平臺(.NET Core)的ZooKeeper異步客戶端?
使用說明
下面列一下常用的使用方法,不僅限于此哦!
一、創建ZKClient對象
創建ZKClient對象 有兩種方式可以方便的創建ZKClient對象
使用構造函數創建
?string address = "localhost:2181";
? ?ZKClient zkClient1 = new ZKClient(address); ? ?
? ?ZKClient zkClient2 = new ZKClient(address, TimeSpan.FromMilliseconds(10000)); ?
? ?ZKClient zkClient3 = new ZKClient(address, TimeSpan.FromMilliseconds(10000), TimeSpan.FromMilliseconds(10000)); ?
? ?ZKClient zkClient4 = new ZKClient(address, TimeSpan.FromMilliseconds(30000), TimeSpan.FromMilliseconds(10000), new SerializableSerializer()); ? ? ?
? ?ZKClient zkClient5 = new ZKClient(address, TimeSpan.FromMilliseconds(30000), TimeSpan.FromMilliseconds(10000), new SerializableSerializer(), TimeSpan.FromMilliseconds(60000));?
使用輔助類創建
string address = "localhost:2181"; ?
? ?ZKClient zkClient = ZKClientBuilder.NewZKClient(address) ?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .SessionTimeout(30000)//可選 ?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .Serializer(new SerializableSerializer())//可選 ?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .RetryTimeout(60000)//可選 ?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .ConnectionTimeout(10000)//可選 ?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .Build(); //創建實例
二、節點的新增、更新、刪除和獲取
新增節點
常規新增節點
父節點不存在會拋出異常
遞歸新增節點(新增節點及其父節點)
如果父節點不存在會被一并創建。
對于PERSISTENT類型的節點,遞歸創建,父節點和子節點都創建為PERSISTENT。
對于EPHEMERAL類型的節點,遞歸創建,父節點都是PERSISTENT類型,而最后一級節點才是EPHEMERAL類型。(因為EPHEMERAL不能擁有子節點)
注意:第二個參數為節點的值,指的的最后一級節點的值。
?string path = "/test8/1/2/3"; ? ? //遞歸創建節點及父節點
? ? await zkClient.CreateRecursiveAsync(path, "abc", CreateMode.PERSISTENT); ? ? await zkClient.CreateRecursiveAsync(path, "123", ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
特殊的EPHEMERAL類型節點
特殊類型的EPHEMERAL節點,該節點在會話失效被刪除后,重新連接會被自動創建。
更新節點數據
string path = "/test"; await zkClient.SetDataAsync(path, "456"); //帶期望版本號的更新,如果真實的版本號與期望版本號不一致會更新失敗,拋出異常await zkClient.SetDataAsync(path, "123", 2);刪除節點
常規刪除
遞歸刪除(刪除節點及子節點)
獲取節點數據
string path = "/test"; await zkClient.GetDataAsync<string>(path); //如果節點不存在拋出異常await zkClient.GetDataAsync<string>(path, true); //如果節點不存在返回nullStat stat = (await zkClient.GetZKDataAsync<string>(path)).stat; //獲得數據以及stat信息等待節點創建
string path = "/test"; //等待直到超時或者節點創建成功。await zkClient.WaitUntilExistsAsync(path, TimeSpan.FromMilliseconds(5000));三、權限管理
ZooKeeper的權限管理亦即ACL控制功能通過Server、Client兩端協調完成:
Server端:
一個ZooKeeper的節點(znode)存儲兩部分內容:數據和狀態,狀態中包含ACL信息。創建一個znode會產生一個ACL列表,列表中每個ACL包括:
驗證模式(scheme)
具體內容(Id)(當scheme=“digest”時,Id為用戶名密碼,例如“root:J0sTy9BCUKubtK1y8pkbL7qoxSw=”)
權限(perms)
ZooKeeper提供了如下幾種驗證模式(scheme):
digest:Client端由用戶名和密碼驗證,譬如user:password,digest的密碼生成方式是Sha1摘要的base64形式
auth:不使用任何id,代表任何已確認用戶。
ip:Client端由IP地址驗證,譬如172.2.0.0/24
world:固定用戶為anyone,為所有Client端開放權限
super:在這種scheme情況下,對應的id擁有超級權限,可以做任何事情(cdrwa)
注意的是,exists操作和getAcl操作并不受ACL許可控制,因此任何客戶端可以查詢節點的狀態和節點的ACL。
節點的權限(perms)主要有以下幾種:
Create 允許對子節點Create操作
Read 允許對本節點GetChildren和GetData操作
Write 允許對本節點SetData操作
Delete 允許對子節點Delete操作
Admin 允許對本節點setAcl操作
Znode ACL權限用一個int型數字perms表示,perms的5個二進制位分別表示setacl、delete、create、write、read。比如0x1f=adcwr,0x1=----r,0x15=a-c-r。
四、監聽相關
注意:對于斷開連接時間過長造成的會話過期,由于服務器端在會話過期后會刪除客戶端設置的監聽。
即便客戶端在會話過期后自動連接成功,但是在會話過期到會話重建這段時間客戶端監聽的節點仍可能發生了改變,
而具體哪些變了或是沒變,客戶端是無法感知到的。
為了避免丟掉任何數據改變的事件,所有的監聽器的都有一個回調方法(SessionExpiredHandler),用來處理會話過期這種特殊情況。
SessionExpiredHandler = async (path) =>{ await Task.Run(() =>{ Console.WriteLine(path);});};訂閱節點的信息改變(創建節點,刪除節點,添加子節點)
IZKChildListener childListener = new ZKChildListener(); //子節點內容變化childListener.ChildChangeHandler = async (parentPath, currentChilds) =>{ await Task.Run(() =>{ Console.WriteLine(parentPath); Console.WriteLine(string.Join(".", currentChilds));});}; //子節點數量變化childListener.ChildCountChangedHandler = async (parentPath, currentChilds) =>{ await Task.Run(() =>{ Console.WriteLine(parentPath); Console.WriteLine(string.Join(".", currentChilds));});}; //"/testUserNode" 監聽的節點,可以是現在存在的也可以是不存在的 zkClient.SubscribeChildChanges("/testUserNode3", childListener);訂閱節點的數據內容的變化
IZKDataListener dataListener = new ZKDataListener(); // 節點創建和節點內容變化dataListener.DataCreatedOrChangeHandler = async (dataPath, data) =>{ await Task.Run(() =>{ Console.WriteLine(dataPath + ":" + Convert.ToString(data));});}; // 節點刪除dataListener.DataDeletedHandler = async (dataPath) =>{ await Task.Run(() =>{ Console.WriteLine(dataPath);});}; // 節點創建dataListener.DataCreatedHandler = async (dataPath, data) =>{ await Task.Run(() =>{ Console.WriteLine(dataPath + ":" + Convert.ToString(data));});}; // 節點內容變化dataListener.DataChangeHandler = async (dataPath, data) =>{ await Task.Run(() =>{ Console.WriteLine(dataPath);});}; zkClient.SubscribeDataChanges("/testUserNode", dataListener);客戶端狀態監聽
IZKStateListener stateListener = new ZKStateListener(); //狀態改變stateListener.StateChangedHandler = async (state) =>{ await Task.Run(() =>{ Console.WriteLine(state.ToString());});}; //會話失效stateListener.SessionExpiredHandler = async (path) =>{ await Task.Run(() =>{ Console.WriteLine(path);});}; //創建會話stateListener.NewSessionHandler = async () =>{ await Task.Run(() =>{});}; //會話失敗stateListener.SessionEstablishmentErrorHandler = async (ex) =>{ await Task.Run(() =>{ Console.WriteLine(ex.Message);});}; zkClient.SubscribeStateChanges(stateListener);五、擴展功能
分布式鎖
using (var zkClient = new ZKClient(TestUtil.zkServers)){ await zkClient.CreateRecursiveAsync("/zk/lock", null, CreateMode.Persistent); //創建分布式鎖, 非線程安全類,每個線程請創建單獨實例。var _lock = new ZKDistributedLock(zkClient, "/zk/lock"); await _lock.LockAsync(); //獲得鎖//do sometingawait _lock.UnLockAsync();//釋放鎖}Leader選舉
using (var zkClient = new ZKClient(TestUtil.zkServers)){ await zkClient.CreateRecursiveAsync("/zk/leader", null, CreateMode.Persistent); var listener = new ZKLeaderSelectorListener();listener.takeLeadership = async (client, selector) =>{ Console.WriteLine("I am the leader-" + await selector.GetLeaderAsync()); selector.Close();}; var selector = new ZKLeaderSelector("id", true, zkClient, "/zk/leader", listener); //啟動并參與Leader選舉selector.Start(); //獲得當前主服務的IDawait selector.GetLeaderAsync(); //如果要退出Leader選舉selector.Close(); }
分布式隊列
using (var zkClient = new ZKClient(TestUtil.zkServers)){? ? ? ?await zkClient.CreateRecursiveAsync("/zk/queue", null, CreateMode.PERSISTENT); var queue = new ZKDistributedQueue<long>(new ZKClient(TestUtil.zkServers), "/zk/queue") await queue.OfferAsync("123");//放入元素var value = await queue.PollAsync();//刪除并獲取頂部元素var value = await queue.PeekAsync(); //獲取頂部元素,不會刪除 ? ?}
總結
以上是生活随笔為你收集整理的支持断线重连、永久watcher、递归操作 ZooKeeper 客户端的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 测试——《微服务设计》读书笔记
- 下一篇: 通过 Transifex 中文化开源软件