ZooKeeper实战(三):ZooKeeper实现分布式配置中心、分布式锁、Reactive响应式模型
引入
ZooKeeper是做分布式協(xié)調(diào)的,那它協(xié)調(diào)啥?
 配置寫在哪里?難道運(yùn)維人員要登錄到每一臺(tái)機(jī)器一臺(tái)一臺(tái)地改嗎?
 可以將配置文件放在一個(gè)共享的位置中,比如redis,比如數(shù)據(jù)庫,比如zk,任何一個(gè)地方。
 zk具有回調(diào)機(jī)制,就不需要輪詢了。
 分布式鎖的實(shí)現(xiàn),這個(gè)zk也可以做,面試常問,雖然可能用不到,但是這道題可以帶出很多知識(shí)點(diǎn)。
使用ZooKeeper實(shí)現(xiàn)分布式配置中心
思路:我們將所有的配置數(shù)據(jù)用data配置到zk中去,在客戶端我們既可以get它,也可以watch它。
 一旦外界對(duì)這個(gè)數(shù)據(jù)進(jìn)行了修改,這個(gè)修改就會(huì)引發(fā)watch的回調(diào)。
1、四臺(tái)機(jī)器配好 Zookeeper 集群
使用zkServer.sh start啟動(dòng)四臺(tái)Zookeeper
 
2、IDEA新建項(xiàng)目,目錄結(jié)構(gòu):
 pom.xml 里面配好 zookeeper 依賴
3、代碼
TestConfig.java,主程序測(cè)試入口
package com.msb.zookeeper.config;import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before; import org.junit.Test;public class TestConfig {ZooKeeper zk;@Beforepublic void conn() {zk = ZKUtils.getZK();}@Afterpublic void close() {try {zk.close();} catch (InterruptedException e) {e.printStackTrace();}}@Testpublic void getConf() {WatchCallBack watchCallBack = new WatchCallBack();watchCallBack.setZk(zk);MyConf myConf = new MyConf();watchCallBack.setMyConf(myConf);watchCallBack.awaitExists();//1,節(jié)點(diǎn)不存在//2,節(jié)點(diǎn)存在while (true) {if (myConf.getConf().equals("")) {System.out.println("conf diu le ......");watchCallBack.awaitExists();} else {System.out.println("In getConf Test, myConf is:" + myConf.getConf());}try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}} }Myconf.java,用來存放配置。
 真正的開發(fā)中,配置中的內(nèi)容可能是一個(gè)復(fù)雜的json,xml等等。
 此處用一個(gè)String conf字符串代替。
ZKUtils.java,用來配置Zookeeper的Server端服務(wù)器地址,以及創(chuàng)建并返回Zookeeper實(shí)例
package com.msb.zookeeper.config;import org.apache.zookeeper.ZooKeeper;import java.util.concurrent.CountDownLatch;public class ZKUtils {private static ZooKeeper zk;// 需要在zk中設(shè)置 create /testLock create /testLock/AppConf set /testLock/AppConf "hello,conf"// ip:port 后面的 /testLock 是指定的根目錄,后續(xù)所有的操作都在以 /testLock 為根目錄的基礎(chǔ)上進(jìn)行private static String address = "10.0.0.131:2181,10.0.0.132:2181,10.0.0.133:2181,10.0.0.134:2181/testLock";private static DefaultWatch watch = new DefaultWatch();private static CountDownLatch init = new CountDownLatch(1);public static ZooKeeper getZK() {try {zk = new ZooKeeper(address, 1000, watch);//new ZooKeeper 對(duì)象的時(shí)候需要使用到 DefaultWatchwatch.setLatch(init);init.await();} catch (Exception e) {e.printStackTrace();}return zk;} }DefaultWatch.java,在 new ZooKeeper() 對(duì)象的時(shí)候需要用到,作為參數(shù)傳入。
package com.msb.zookeeper.config;import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher;import java.util.concurrent.CountDownLatch;public class DefaultWatch implements Watcher {CountDownLatch latch;public void setLatch(CountDownLatch latch) {this.latch = latch;}@Overridepublic void process(WatchedEvent event) {System.out.println(event.toString());switch (event.getState()) {case Unknown:break;case Disconnected:break;case NoSyncConnected:break;case SyncConnected:System.out.println("In DefaultWatch, SyncConnected,連接成功.");latch.countDown();break;case AuthFailed:break;case ConnectedReadOnly:break;case SaslAuthenticated:break;case Expired:break;}} }WatchCallBack.java,實(shí)現(xiàn)多個(gè)接口,既是Watcher,又是Callback。重寫了各個(gè)回調(diào)函數(shù),是整個(gè)Reactor模型的核心
package com.msb.zookeeper.config;import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat;import java.util.concurrent.CountDownLatch;public class WatchCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {ZooKeeper zk;MyConf myConf;CountDownLatch latch = new CountDownLatch(1);public MyConf getMyConf() {return myConf;}public ZooKeeper getZk() {return zk;}public void setMyConf(MyConf myConf) {this.myConf = myConf;}public void setZk(ZooKeeper zk) {this.zk = zk;}public void awaitExists() {System.out.println("awaitExists...");zk.exists("/AppConf", this, this, "ABC");try {latch.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("awaitExists finish...");}@Override/*** DataCallback接口實(shí)現(xiàn)*/public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {if (data != null) {String str = new String(data);System.out.println("In WatchCallBack, data is: " + str);myConf.setConf(str);latch.countDown();}}@Override/*** StatCallback 接口實(shí)現(xiàn)*/public void processResult(int rc, String path, Object ctx, Stat stat) {if (stat != null) {zk.getData("/AppConf", this, this, "sdfs");}}@Override/*** Watcher 接口實(shí)現(xiàn)*/public void process(WatchedEvent event) {switch (event.getType()) {case None:break;case NodeCreated://節(jié)點(diǎn)被創(chuàng)建事件zk.getData("/AppConf", this, this, "sdfs");break;case NodeDeleted://節(jié)點(diǎn)被刪除事件,這要根據(jù)業(yè)務(wù)的容忍性,寫處理方式System.out.println("In WatchCallBack, 節(jié)點(diǎn)被刪除了");myConf.setConf("");latch = new CountDownLatch(1);//阻塞break;case NodeDataChanged://數(shù)據(jù)被更改zk.getData("/AppConf", this, this, "sdfs");break;case NodeChildrenChanged:break;}} }4、運(yùn)行 & 測(cè)試
運(yùn)行 Test
 
 控制臺(tái)輸出,可以看到由于zk.exists("/AppConf", this, this, "ABC");后面加了latch.await();正在阻塞地等待Zookeeper中對(duì)應(yīng)節(jié)點(diǎn)的創(chuàng)建。
 
用客戶端連接Zookeeper
 
創(chuàng)建指定節(jié)點(diǎn)create /testLock/AppConf "foo"
檢測(cè)到配置文件節(jié)點(diǎn)的產(chǎn)生
 
修改配置文件的節(jié)點(diǎn)數(shù)據(jù)set /testLock/AppConf "bar"
檢測(cè)到節(jié)點(diǎn)數(shù)據(jù)的修改
 刪除配置文件節(jié)點(diǎn)delete /testLock/AppConf
 
輸出“conf 丟了”,重新進(jìn)入阻塞等待狀態(tài)…等待節(jié)點(diǎn)的創(chuàng)建
 
附:Zookeeper客戶端常用命令:
 ls /列出根目錄所有節(jié)點(diǎn)名稱
 create /testLock/AppConf "foo"創(chuàng)建節(jié)點(diǎn)并設(shè)置節(jié)點(diǎn)內(nèi)容
 set /testLock/AppConf "bar"更新節(jié)點(diǎn)內(nèi)容
 delete /testLock/AppConf刪除節(jié)點(diǎn)
使用Zookeeper實(shí)現(xiàn)分布式鎖
1、和前面一樣,配好4臺(tái)Zookeeper集群
zkServer.sh start 四臺(tái)zk都啟動(dòng)一下
 
2、目錄結(jié)構(gòu)
3、代碼
用到了前面的兩個(gè):
- ZKUtils.java,獲取zk實(shí)例,配置server ip
- DefaultWatch.java,默認(rèn)的Watcher,在 new ZooKeeper 對(duì)象的時(shí)候會(huì)用到
另外,增加了:
TestLock.java,測(cè)試分布式鎖的程序入口
package com.msb.zookeeper.lock;import com.msb.zookeeper.config.ZKUtils; import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before; import org.junit.Test;public class TestLock {ZooKeeper zk;@Beforepublic void conn() {zk = ZKUtils.getZK();}@Afterpublic void close() {try {zk.close();} catch (InterruptedException e) {e.printStackTrace();}}@Testpublic void lock() {for (int i = 0; i < 10; i++) {//模擬10個(gè)線程分布在10臺(tái)機(jī)器上new Thread() {@Overridepublic void run() {WatchCallBack watchCallBack = new WatchCallBack();watchCallBack.setZk(zk);String threadName = Thread.currentThread().getName();watchCallBack.setThreadName(threadName);//每一個(gè)線程://搶鎖watchCallBack.tryLock();//干活System.out.println(threadName + " working...");try {Thread.sleep(100);//模擬干活需要的時(shí)間,如果這里不sleep,會(huì)在第一個(gè)已經(jīng)釋放了,后面的還沒開始watch,看不到第一個(gè)釋放鎖的消息,就斷層了} catch (InterruptedException e) {e.printStackTrace();}//釋放鎖watchCallBack.unLock();}}.start();}while (true) {}} }WatchCallBack.java,分布式鎖的Watcher,里面維護(hù)了一個(gè)CountDownLatch。同時(shí),繼承了接口實(shí)現(xiàn)各種回調(diào)方法,所以既是Watcher,也是Callback,在傳參的時(shí)候直接傳this即可。
package com.msb.zookeeper.lock;import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat;import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch;public class WatchCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback {ZooKeeper zk;String threadName;CountDownLatch cc = new CountDownLatch(1);String pathName;public String getPathName() {return pathName;}public void setPathName(String pathName) {this.pathName = pathName;}public String getThreadName() {return threadName;}public void setThreadName(String threadName) {this.threadName = threadName;}public ZooKeeper getZk() {return zk;}public void setZk(ZooKeeper zk) {this.zk = zk;}/*** 通過zk創(chuàng)建順序節(jié)點(diǎn)的方式,獲取鎖*/public void tryLock() {try {System.out.println(threadName + " create....");zk.create("/lock", threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, "str");cc.await();//等待自己是第一個(gè)的時(shí)候,才能返回} catch (InterruptedException e) {e.printStackTrace();}}/*** 釋放鎖*/public void unLock() {try {zk.delete(pathName, -1);System.out.println(threadName + ": finish work....");} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}@Override//process in interface Watcherpublic void process(WatchedEvent event) {//如果第一臺(tái)機(jī)器拿到的鎖釋放了,只有第二臺(tái)機(jī)器收到了回調(diào)事件//如果,不是第一臺(tái)機(jī)器,而是其他的某一臺(tái)機(jī)器掛了,也能造成他后邊的那臺(tái)機(jī)器收到這個(gè)通知,從而讓他后邊那臺(tái)機(jī)器跟去watch掛掉這個(gè)哥們前邊的機(jī)器switch (event.getType()) {case None:break;case NodeCreated:break;case NodeDeleted:zk.getChildren("/", false, this, "str");//去調(diào)用getChildren的回調(diào)方法break;case NodeDataChanged:break;case NodeChildrenChanged:break;}}@Override//StringCallbackpublic void processResult(int rc, String path, Object ctx, String name) {if (name != null) {//接收zk幫你創(chuàng)建的順序節(jié)點(diǎn)的名稱System.out.println(threadName + " create node : " + name);pathName = name;zk.getChildren("/", false, this, "str");}}@Override//zk.getChildren 的 callbackpublic void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {//能看到自己前邊的所有節(jié)點(diǎn) List<String> children,對(duì)前面所有的節(jié)點(diǎn)名稱排序Collections.sort(children);int index = children.indexOf(pathName.substring(1));//看自己的位置//判斷自己是否為第一個(gè)if (index == 0) {//如果自己是第一個(gè),執(zhí)行countdownSystem.out.println(threadName + ": I am first....");try {zk.setData("/", threadName.getBytes(), -1);//把自己的鎖信息寫進(jìn)根目錄的node里面去,你可以用get看到,這個(gè)步驟為了拖慢一下速度cc.countDown();} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}} else {//自己不是第一個(gè),則watch前一個(gè)zk.exists("/" + children.get(index - 1), this, this, "sdf");}}@Override// StatCallbackpublic void processResult(int rc, String path, Object ctx, Stat stat) {//偷懶,這里省略沒寫} }4、運(yùn)行 & 測(cè)試
輸出結(jié)果如下,可以看到10個(gè)線程在創(chuàng)建之后,都拿到了自己的自增目錄名稱,然后按照自己的目錄名稱順序依次進(jìn)行:拿到鎖,干活,釋放鎖。
WatchedEvent state:SyncConnected type:None path:null In DefaultWatch, SyncConnected,連接成功. Thread-3 create.... Thread-9 create.... Thread-1 create.... Thread-7 create.... Thread-0 create.... Thread-5 create.... Thread-8 create.... Thread-4 create.... Thread-2 create.... Thread-6 create.... Thread-3 create node : /lock0000000071 Thread-9 create node : /lock0000000072 Thread-6 create node : /lock0000000073 Thread-0 create node : /lock0000000074 Thread-5 create node : /lock0000000075 Thread-7 create node : /lock0000000076 Thread-1 create node : /lock0000000077 Thread-4 create node : /lock0000000078 Thread-8 create node : /lock0000000079 Thread-2 create node : /lock0000000080 Thread-3: I am first.... Thread-3 working... Thread-3: finish work.... Thread-9: I am first.... Thread-9 working... Thread-9: finish work.... Thread-6: I am first.... Thread-6 working... Thread-6: finish work.... Thread-0: I am first.... Thread-0 working... Thread-0: finish work.... Thread-5: I am first.... Thread-5 working... Thread-5: finish work.... Thread-7: I am first.... Thread-7 working... Thread-7: finish work.... Thread-1: I am first.... Thread-1 working... Thread-1: finish work.... Thread-4: I am first.... Thread-4 working... Thread-4: finish work.... Thread-8: I am first.... Thread-8 working... Thread-8: finish work.... Thread-2: I am first.... Thread-2 working... Thread-2: finish work....在程序執(zhí)行的同時(shí),從Zookeeper的Client端不斷獲取目錄內(nèi)容,看到整個(gè)過程大致是這樣的:
當(dāng)一個(gè)線程中的任務(wù)執(zhí)行完之后,顯示調(diào)用了zk.delete(pathName, -1);將節(jié)點(diǎn)刪除。這樣就能觸發(fā)下一個(gè)正在watch它的節(jié)點(diǎn)所在的線程,去判斷自己是不是在排名的第一個(gè)。
- 如果是第一個(gè),開始干活
- 如果不是第一個(gè),watch它的前一個(gè)(這種情況出現(xiàn)在前一個(gè)節(jié)點(diǎn)突然掛了的情況)
總結(jié)
以上是生活随笔為你收集整理的ZooKeeper实战(三):ZooKeeper实现分布式配置中心、分布式锁、Reactive响应式模型的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: leetcode 121. 买卖股票的最
- 下一篇: P8-DevOps中的CI/CD环境搭建
