ZooKeeper Java Example(官方例子)
為了向您介紹ZooKeeper Java API,我們?cè)谶@里開發(fā)了一個(gè)非常簡單的觀看客戶端。該ZooKeeper客戶端通過啟動(dòng)或停止程序來觀察ZooKeeper節(jié)點(diǎn)的更改并進(jìn)行響應(yīng)。
要求
?有四個(gè)要求:
? ? 1.它作為參數(shù):
????????ZooKeeper服務(wù)的地址
????????那么znode的名字就是被觀看的
????????具有參數(shù)的可執(zhí)行文件
? ? 2.它獲取與znode相關(guān)聯(lián)的數(shù)據(jù),并啟動(dòng)可執(zhí)行文件。
? ? 3.如果znode更改,客戶端將重新啟動(dòng)內(nèi)容并重新啟動(dòng)可執(zhí)行文件。
? ? 4.如果znode消失,客戶端將殺死可執(zhí)行文件。
程序設(shè)計(jì)
通常,ZooKeeper應(yīng)用程序分為兩個(gè)單元,一個(gè)維護(hù)連接,另一個(gè)用于監(jiān)視數(shù)據(jù)。在此應(yīng)用程序中,名為Executor的類維護(hù)ZooKeeper連接,并且名為DataMonitor的類監(jiān)視ZooKeeper樹中的數(shù)據(jù)。
此外,Executor包含主線程并包含執(zhí)行邏輯。它負(fù)責(zé)什么樣的用戶交互,以及與您作為參數(shù)傳遞的可執(zhí)行程序的交互以及根據(jù)znode的狀態(tài)關(guān)閉和重新啟動(dòng)示例。
1.?Executor.java
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 | package?com.hellojd.cloud; import?org.apache.zookeeper.KeeperException; import?org.apache.zookeeper.WatchedEvent; import?org.apache.zookeeper.Watcher; import?org.apache.zookeeper.ZooKeeper; import?java.io.FileOutputStream; import?java.io.IOException; import?java.io.InputStream; import?java.io.OutputStream; /** ?*?A?simple?example?program?to?use?DataMonitor?to?start?and ?*?stop?executables?based?on?a?znode.?The?program?watches?the ?*?specified?znode?and?saves?the?data?that?corresponds?to?the ?*?znode?in?the?filesystem.?It?also?starts?the?specified?program ?*?with?the?specified?arguments?when?the?znode?exists?and?kills ?*?the?program?if?the?znode?goes?away. ?*/ public?class?Executor ????????implements?Watcher,?Runnable,?DataMonitor.DataMonitorListener { ????DataMonitor?dm; ????ZooKeeper?zk; ????String?filename; ????String?exec[]; ????Process?child; ????public?Executor(String?hostPort,?String?znode,?String?filename, ????????????????????String?exec[])?throws?KeeperException,?IOException?{ ????????this.filename?=?filename; ????????this.exec?=?exec; ????????zk?=?new?ZooKeeper(hostPort,?3000,?this); ????????dm?=?new?DataMonitor(zk,?znode,?null,?this); ????} ????/** ?????*?@param?args ?????*/ ????public?static?void?main(String[]?args)?{ ????????if?(args.length?<?4)?{ ????????????System.err ????????????????????.println("USAGE:?Executor?hostPort?znode?filename?program?[args?...]"); ????????????System.exit(2); ????????} ????????String?hostPort?=?args[0]; ????????String?znode?=?args[1]; ????????String?filename?=?args[2]; ????????String?exec[]?=?new?String[args.length?-?3]; ????????System.arraycopy(args,?3,?exec,?0,?exec.length); ????????try?{ ????????????new?Executor(hostPort,?znode,?filename,?exec).run(); ????????}?catch?(Exception?e)?{ ????????????e.printStackTrace(); ????????} ????} ????/*************************************************************************** ?????*?We?do?process?any?events?ourselves,?we?just?need?to?forward?them?on. ?????* ?????*?@see?org.apache.zookeeper.Watcher#process(org.apache.zookeeper.proto.WatcherEvent) ?????*/ ????public?void?process(WatchedEvent?event)?{ ????????System.out.println("Watcher?process"); ????????dm.process(event); ????} ????public?void?run()?{ ????????try?{ ????????????synchronized?(this)?{ ????????????????while?(!dm.dead)?{ ????????????????????wait(); ????????????????} ????????????} ????????}?catch?(InterruptedException?e)?{ ????????} ????} ????//以響應(yīng)ZooKeeper連接永久消失。 ????public?void?closing(int?rc)?{ ????????synchronized?(this)?{ ????????????notifyAll(); ????????} ????} ????static?class?StreamWriter?extends?Thread?{ ????????OutputStream?os; ????????InputStream?is; ????????StreamWriter(InputStream?is,?OutputStream?os)?{ ????????????this.is?=?is; ????????????this.os?=?os; ????????????start(); ????????} ????????public?void?run()?{ ????????????byte?b[]?=?new?byte[80]; ????????????int?rc; ????????????try?{ ????????????????while?((rc?=?is.read(b))?>?0)?{ ????????????????????os.write(b,?0,?rc); ????????????????} ????????????}?catch?(IOException?e)?{ ????????????} ????????} ????} ????public?void?exists(byte[]?data)?{ ????????if?(data?==?null)?{ ????????????if?(child?!=?null)?{ ????????????????System.out.println("Killing?process"); ????????????????child.destroy(); ????????????????try?{ ????????????????????child.waitFor(); ????????????????}?catch?(InterruptedException?e)?{ ????????????????} ????????????} ????????????child?=?null; ????????}?else?{ ????????????if?(child?!=?null)?{ ????????????????System.out.println("Stopping?child"); ????????????????child.destroy(); ????????????????try?{ ????????????????????child.waitFor(); ????????????????}?catch?(InterruptedException?e)?{ ????????????????????e.printStackTrace(); ????????????????} ????????????} ????????????//保存znode數(shù)據(jù)至文件 ????????????try?{ ????????????????FileOutputStream?fos?=?new?FileOutputStream(filename); ????????????????fos.write(data); ????????????????fos.close(); ????????????}?catch?(IOException?e)?{ ????????????????e.printStackTrace(); ????????????} ????????????try?{ ????????????????System.out.println("Starting?child"); ????????????????child?=?Runtime.getRuntime().exec(exec); ????????????????new?StreamWriter(child.getInputStream(),?System.out); ????????????????new?StreamWriter(child.getErrorStream(),?System.err); ????????????}?catch?(IOException?e)?{ ????????????????e.printStackTrace(); ????????????} ????????} ????} } |
2.?DataMonitor.java
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 | /** ?*?A?simple?class?that?monitors?the?data?and?existence?of?a?ZooKeeper ?*?node.?It?uses?asynchronous?ZooKeeper?APIs. ?*/ package?com.hellojd.cloud; import?java.util.Arrays; import?org.apache.zookeeper.KeeperException; import?org.apache.zookeeper.WatchedEvent; import?org.apache.zookeeper.Watcher; import?org.apache.zookeeper.ZooKeeper; import?org.apache.zookeeper.AsyncCallback.StatCallback; import?org.apache.zookeeper.KeeperException.Code; import?org.apache.zookeeper.data.Stat; /** ?*?另一方面,DataMonitorListener接口不是ZooKeeper?API的一部分。?它是一個(gè)完全定制的界面,專為此示例應(yīng)用程序而設(shè)計(jì)。 ?*?DataMonitor對(duì)象使用它來回傳給它的容器,它也是Executor對(duì)象。 ?*/ public?class?DataMonitor?implements??StatCallback?{ ????//Executor或一些類似Executor的對(duì)象“擁有”ZooKeeper連接,但可以將事件委托給其他事件到其他對(duì)象。 ????ZooKeeper?zk; ????String?znode; ????Watcher?chainedWatcher; ????boolean?dead; ????//簡單地將這些事件轉(zhuǎn)發(fā)到DataMonitor來決定如何處理它們 ????DataMonitorListener?listener; ????byte?prevData[]; ????//?它主要是異步和事件驅(qū)動(dòng) ????public?DataMonitor(ZooKeeper?zk,?String?znode,?Watcher?chainedWatcher, ???????????????????????DataMonitorListener?listener)?{ ????????this.zk?=?zk; ????????this.znode?=?znode; ????????this.chainedWatcher?=?chainedWatcher; ????????this.listener?=?listener; ????????//?Get?things?started?by?checking?if?the?node?exists.?We?are?going ????????//?to?be?completely?event?driven ????????zk.exists(znode,?true,?this,?null); ????} ????/** ?????*?該接口在DataMonitor類中定義,并在Executor類中實(shí)現(xiàn)。?當(dāng)調(diào)用Executor.exists()時(shí),執(zhí)行器根據(jù)要求決定是啟動(dòng)還是關(guān)閉。 ?????*?當(dāng)znode不再存在時(shí),需要說的是殺死可執(zhí)行文件。 ?????*/ ????public?interface?DataMonitorListener?{ ????????/** ?????????*?The?existence?status?of?the?node?has?changed. ?????????*/ ????????void?exists(byte?data[]); ????????/** ?????????*?The?ZooKeeper?session?is?no?longer?valid. ?????????* ?????????*?@param?rc ?????????*????????????????the?ZooKeeper?reason?code ?????????*/ ????????void?closing(int?rc); ????} ????//響應(yīng)ZooKeeper狀態(tài)的更改 ????public?void?process(WatchedEvent?event)?{ ????????String?path?=?event.getPath(); ????????if?(event.getType()?==?Watcher.Event.EventType.None)?{ ????????????//?We?are?are?being?told?that?the?state?of?the ????????????//?connection?has?changed ????????????switch?(event.getState())?{ ????????????????case?SyncConnected: ????????????????????//?In?this?particular?example?we?don't?need?to?do?anything ????????????????????//?here?-?watches?are?automatically?re-registered?with ????????????????????//?server?and?any?watches?triggered?while?the?client?was ????????????????????//?disconnected?will?be?delivered?(in?order?of?course) ????????????????????break; ????????????????case?Expired: ????????????????????//?It's?all?over ????????????????????dead?=?true; ????????????????????listener.closing(KeeperException.Code.SessionExpired); ????????????????????break; ????????????} ????????}?else?{ ????????????if?(path?!=?null?&&?path.equals(znode))?{ ????????????????//?Something?has?changed?on?the?node,?let's?find?out ????????????????zk.exists(znode,?true,?this,?null); ????????????} ????????} ????????if?(chainedWatcher?!=?null)?{ ????????????chainedWatcher.process(event); ????????} ????} ????public?void?processResult(int?rc,?String?path,?Object?ctx,?Stat?stat)?{ /** ?*?首先檢查znode存在,致命錯(cuò)誤和可恢復(fù)錯(cuò)誤的錯(cuò)誤代碼。 ?*?如果文件(或znode)存在,它將從znode獲取數(shù)據(jù),然后調(diào)用Executor的exists()回調(diào), ?*?如果狀態(tài)已更改。?注意,它不必對(duì)getData調(diào)用執(zhí)行異常處理,因?yàn)樗哂袙炱鸬娜魏慰赡軐?dǎo)致錯(cuò)誤的監(jiān)視器: ?*?如果節(jié)點(diǎn)在調(diào)用ZooKeeper.getData()之前被刪除,則由ZooKeeper設(shè)置的監(jiān)視事件?.exists()觸發(fā)回調(diào); ?*如果發(fā)生通信錯(cuò)誤,連接回顯將觸發(fā)連接監(jiān)視事件。 ?*/ ????????boolean?exists; ????????switch?(rc)?{ ????????????case?Code.Ok: ????????????????exists?=?true; ????????????????break; ????????????case?Code.NoNode: ????????????????exists?=?false; ????????????????break; ????????????case?Code.SessionExpired: ????????????case?Code.NoAuth: ????????????????dead?=?true; ????????????????listener.closing(rc); ????????????????return; ????????????default: ????????????????//?Retry?errors ????????????????zk.exists(znode,?true,?this,?null); ????????????????return; ????????} ????????//文件(或znode)存在 ????????byte?b[]?=?null; ????????if?(exists)?{ ????????????try?{ ????????????????b?=?zk.getData(znode,?false,?null); ????????????}?catch?(KeeperException?e)?{ ????????????????//?We?don't?need?to?worry?about?recovering?now.?The?watch ????????????????//?callbacks?will?kick?off?any?exception?handling ????????????????e.printStackTrace(); ????????????}?catch?(InterruptedException?e)?{ ????????????????return; ????????????} ????????} ????????if?((b?==?null?&&?b?!=?prevData) ????????????????||?(b?!=?null?&&?!Arrays.equals(prevData,?b)))?{ ????????????listener.exists(b); ????????????prevData?=?b; ????????} ????} } |
調(diào)試:
參數(shù)列表:192.168.0.10:2181 /hellojd_node filename calc
192.168.0.10:2181:ZK地址
/hellojd_node :監(jiān)視node?
filename :備份數(shù)據(jù)文件
calc:命令
本文轉(zhuǎn)自 randy_shandong 51CTO博客,原文鏈接:http://blog.51cto.com/dba10g/1975090,如需轉(zhuǎn)載請(qǐng)自行聯(lián)系原作者
總結(jié)
以上是生活随笔為你收集整理的ZooKeeper Java Example(官方例子)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: hdfs web_ui深入讲解、服务启动
- 下一篇: nginx反向代理配置