Zookeeper_实际应用讲解
生活随笔
收集整理的這篇文章主要介紹了
Zookeeper_实际应用讲解
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
我告訴你zookeeper一般是做什么事的,這里會有一個管理工具,到底怎么去用呢,eclipse和工具去集成
輸入zookeeperBrower這個名字,輸入http://www.massedynamic.org/eclipse/updates/網址,無非就是把這個插件裝上,Help里面有一個eclipsemarket,肯定要求輸入名字和地址,安裝這個插件,安裝上了就可以用這個zookeeper explorer,這個名字其實你可以隨便起的,但是地址你不能隨便弄,你如果想在eclipse上去觀察的話你可以用這個,我也會發給你第三方的工具,咱們看一下cluster,這是隨便模擬的一個例子,他并不是做一個很真實的場景
如果有一個實際的場景的話,我希望對分布式系統配置,進行集中地管理,我可能是集群,我有好多的服務,比如開始連的是同樣的數據庫,都要連zookeeper,一下子變化了,zookeeper地址一下子變化了,怎么辦,原先zookeeper在1.1上,1.1,1.2,1.3,現在整體變成1.4,1.5,1.6了,3臺無所謂,如果是30臺,300臺呢,你可能一個機器一個機器的去改太麻煩了,這個時候就可以使用watcher來做這個事情,去觀察,動態的去修改,這個后期再去講實際的案例吧,我現在有一個非常簡單的DEMO,首先我有連個client端,client1和client2,你可以把這兩個哥們當成兩臺機器,這兩臺機器在工作中怎么去用呢,項目啟動的時候,我都要watcher啟動起來,然后我就一直在這里等著,就是把項目啟動了,我就直接運行,第一臺機器連上了就是運行的,第二臺機器也是這樣的,首先每一個機器都是watcher,分別有兩臺機器去watcher了,是什么節點呢,你可以去看一下,這兩個client都同時實例化一個類,以后在工作中你可以把watcher寫在一個common里面,也可以寫在每一個項目中,這都是行的
package com.learn.zookeeper.cluster;import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;/*** 這是好久之前模擬的* 代碼寫的也比較low* 主要是讓你看到這個效果* 以后用到一些Curator框架的時候* 那才是最好的* 自己剛學會CopyOnWriteArrayList這種集合* 像這種節點頻繁變更的* 不應該采用這種list* 你寧可采用ArrayList* 或者并發的List* 所以你千萬別使用這個* 你就當看一個邏輯* 而不是看代碼怎么去實現的* 我要你知道子節點變更的時候是有多麻煩* 我要知道子節點到底發生了什么樣的變化* 原生的API寫的時候是比較麻煩的* 你看一下process就知道了* process的時候我做了一堆事* 我自己都不知道怎么去寫的* 以后我們就用Curator會是更好的方式* 主要講的是一個場景* 我們在真正的工作中有好幾臺服務器* 都要公用一份config文件* 那么我就可以用zookeeper去做這個事情* * * * @author Leon.Sun**/
public class ZKWatcher implements Watcher {/** zk變量 */private ZooKeeper zk = null;/** 父節點path *//*** 有一個/super節點* super節點它是key* 默認的時候是先去get /super* 然后把IP地址獲得* 獲取是其他的服務器* 咱們舉個例子* 一旦有super節點發生update的時候* 誰watcher了我就給誰發update* Node Change這個事件* 直接把新的value給更新上了* 更新上了就做一個集體的切換* 很多時候工作就是做這個事* 這是最簡單的一種實現* * */static final String PARENT_PATH = "/super";/** 信號量設置,用于等待zookeeper連接建立之后 通知阻塞程序繼續向下執行 */private CountDownLatch connectedSemaphore = new CountDownLatch(1);private List<String> cowaList = new CopyOnWriteArrayList<String>();/** zookeeper服務器地址 */
// public static final String CONNECTION_ADDR = "192.168.80.88:2181,192.168.80.87:2181,192.168.80.86:2181";public static final String CONNECTION_ADDR = "59.110.138.145:2181";/** 定義session失效時間 */public static final int SESSION_TIMEOUT = 30000;public ZKWatcher() throws Exception{zk = new ZooKeeper(CONNECTION_ADDR, SESSION_TIMEOUT, this);System.out.println("開始連接ZK服務器");connectedSemaphore.await();}@Overridepublic void process(WatchedEvent event) {// 連接狀態KeeperState keeperState = event.getState();// 事件類型EventType eventType = event.getType();// 受影響的pathString path = event.getPath();System.out.println("受影響的path : " + path);if (KeeperState.SyncConnected == keeperState) {// 成功連接上ZK服務器if (EventType.None == eventType) {System.out.println("成功連接上ZK服務器");connectedSemaphore.countDown();try {if(this.zk.exists(PARENT_PATH, false) == null){this.zk.create(PARENT_PATH, "root".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); }List<String> paths = this.zk.getChildren(PARENT_PATH, true);for (String p : paths) {System.out.println(p);this.zk.exists(PARENT_PATH + "/" + p, true);}} catch (KeeperException | InterruptedException e) {e.printStackTrace();} } //創建節點else if (EventType.NodeCreated == eventType) {System.out.println("節點創建");try {this.zk.exists(path, true);} catch (KeeperException | InterruptedException e) {e.printStackTrace();}} //更新節點else if (EventType.NodeDataChanged == eventType) {System.out.println("節點數據更新");try {//update nodes call functionthis.zk.exists(path, true);} catch (KeeperException | InterruptedException e) {e.printStackTrace();}} //更新子節點else if (EventType.NodeChildrenChanged == eventType) {System.out.println("子節點 ... 變更");try {List<String> paths = this.zk.getChildren(path, true);if(paths.size() >= cowaList.size()){paths.removeAll(cowaList);for(String p : paths){this.zk.exists(path + "/" + p, true);//this.zk.getChildren(path + "/" + p, true);System.out.println("這個是新增的子節點 : " + path + "/" + p);//add new nodes call function}cowaList.addAll(paths);} else {cowaList = paths;}System.out.println("cowaList: " + cowaList.toString());System.out.println("paths: " + paths.toString());} catch (KeeperException | InterruptedException e) {e.printStackTrace();}} //刪除節點else if (EventType.NodeDeleted == eventType) {System.out.println("節點 " + path + " 被刪除");try {//delete nodes call functionthis.zk.exists(path, true);} catch (KeeperException | InterruptedException e) {e.printStackTrace();}}else ;} else if (KeeperState.Disconnected == keeperState) {System.out.println("與ZK服務器斷開連接");} else if (KeeperState.AuthFailed == keeperState) {System.out.println("權限檢查失敗");} else if (KeeperState.Expired == keeperState) {System.out.println("會話失效");}else ;System.out.println("--------------------------------------------");}}
package com.learn.zookeeper.cluster;public class Client1 {public static void main(String[] args) throws Exception{ZKWatcher myWatcher = new ZKWatcher();Thread.sleep(100000000);}
}
package com.learn.zookeeper.cluster;public class Client2 {public static void main(String[] args) throws Exception{ZKWatcher myWatcher = new ZKWatcher();Thread.sleep(100000000);}
}
package com.learn.zookeeper.cluster;import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;/*** 機器1和機器2是一個集群的方式* 以后要連同樣的一份代碼* 要連redis集群* 你再開發一套應用* 這個應用就是用來管理zookeeper上的節點的* 我這塊添加了一個節點了* 這兩臺機器應該馬上感知到* 我再添加刪除* 然后這里面的業務邏輯變更* 其實就是這么一個道理* 我把test運行一下吧* 運行完了之后Test就停止了* * * @author Leon.Sun**/
public class Test {/** zookeeper地址 */
// static final String CONNECT_ADDR = "192.168.1.106:2181,192.168.1.107:2181,192.168.1.108:2181";/*** 這個Test無論做什么事* 這兩個client都能感覺到的* 比如我這個Test做什么事* * */static final String CONNECT_ADDR = "59.110.138.145:2181";/** session超時時間 */static final int SESSION_OUTTIME = 2000;//ms /** 信號量,阻塞程序執行,用于等待zookeeper連接成功,發送成功信號 */static final CountDownLatch connectedSemaphore = new CountDownLatch(1);public static void main(String[] args) throws Exception{/*** new了一個zookeeper* 這個Watcher就做了一個很簡單的操作* 連接上了就OK了* */ZooKeeper zk = new ZooKeeper(CONNECT_ADDR, SESSION_OUTTIME, new Watcher(){@Overridepublic void process(WatchedEvent event) {//獲取事件的狀態KeeperState keeperState = event.getState();EventType eventType = event.getType();//如果是建立連接if(KeeperState.SyncConnected == keeperState){if(EventType.None == eventType){//如果建立連接成功,則發送信號量,讓后續阻塞程序向下執行connectedSemaphore.countDown();System.out.println("zk 建立連接");}}}});//進行阻塞connectedSemaphore.await();// //創建子節點/*** 創建了4個節點* c1,c2,c3,c4*/zk.create("/super/c1", "c1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);//創建子節點zk.create("/super/c2", "c2".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);//創建子節點zk.create("/super/c3", "c3".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);//創建子節點zk.create("/super/c4", "c4".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);// zk.create("/super/c4/c44", "c44".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);//獲取節點信息
// byte[] data = zk.getData("/testRoot", false, null);
// System.out.println(new String(data));
// System.out.println(zk.getChildren("/testRoot", false));//修改節點的值/*** 修改了c1和c2*/zk.setData("/super/c1", "modify c1".getBytes(), -1);zk.setData("/super/c2", "modify c2".getBytes(), -1);byte[] data = zk.getData("/super/c2", false, null);System.out.println(new String(data)); // //判斷節點是否存在
// System.out.println(zk.exists("/super/c3", false));
// //刪除節點/*** 刪除了c3* 其實就是這么簡單的操作* 刪除c3了* 就只剩下c1,c2,c4了* 我做完這個操作之后* client1和client2都應該能感知到才行* 都是觸發了這些東西* client1是這樣* client2也是這樣* 沒有任何變化* 這兩個哥們C1和c2收到的信息是一樣的* 可以這么去理解* 這兩個都監聽到了* 就是這個意思* 理解這個事就行了* 很多塊時候都需要做這個事情* 降低重復工作* * * */zk.delete("/super/c3", -1);zk.close();}
}
?
總結
以上是生活随笔為你收集整理的Zookeeper_实际应用讲解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Zookeeper_安全认证讲解
- 下一篇: Zookeeper_zkClientAP