public Stat setData(String path,byte data[],int version,long zxid,long time)throws KeeperException.NoNodeException {Stat s =newStat();DataNode n = nodes.get(path);if(n == null){thrownewKeeperException.NoNodeException();}byte lastdata[]= null;synchronized(n){lastdata = n.data;n.data = data;n.stat.setMtime(time);n.stat.setMzxid(zxid);n.stat.setVersion(version);n.copyStat(s);}// now update if the path is in a quota subtree.String lastPrefix;if((lastPrefix =getMaxPrefixWithQuota(path))!= null){this.updateBytes(lastPrefix,(data == null ?0: data.length)-(lastdata == null ?0: lastdata.length));}dataWatches.triggerWatch(path, EventType.NodeDataChanged);return s;}
以上setData方法流程就兩個步驟:
1. 利用Path從存儲節點的ConcurrentHashMap中獲取節點信息,並修改節點信息;
2. 調用WatchManager的triggerWatch方法。
可以看到以上代碼是通過調用WatchManager的triggerWatch方法來觸發相關事件
public Set<Watcher>triggerWatch(String path, EventType type, Set<Watcher> supress){WatchedEvent e =newWatchedEvent(type,KeeperState.SyncConnected, path);HashSet<Watcher> watchers;synchronized(this){watchers = watchTable.remove(path);......for(Watcher w : watchers){HashSet<String> paths = watch2Paths.get(w);if(paths != null){paths.remove(path);}}}for(Watcher w : watchers){if(supress != null && supress.contains(w)){continue;}w.process(e);}return watchers;}
publicabstractvoidprocess(WatchedEvent event);synchronizedpublicvoidprocess(WatchedEvent event){ReplyHeader h =newReplyHeader(-1,-1L,0);if(LOG.isTraceEnabled()){ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,"Deliver event "+ event +" to 0x"+ Long.toHexString(this.sessionId)+" through "+this);}// Convert WatchedEvent to a type that can be sent over the wireWatcherEvent e = event.getWrapper();sendResponse(h, e,"notification");}
classSendThreadextendsZooKeeperThread{privatelong lastPingSentNs;privatefinal ClientCnxnSocket clientCnxnSocket;private Random r =newRandom(System.nanoTime());privateboolean isFirstConnect =true;voidreadResponse(ByteBuffer incomingBuffer)throws IOException {ByteBufferInputStream bbis =newByteBufferInputStream(incomingBuffer);BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);ReplyHeader replyHdr =newReplyHeader();replyHdr.deserialize(bbia,"header");......if(replyHdr.getXid()==-1){// -1 means notificationif(LOG.isDebugEnabled()){LOG.debug("Got notification sessionid:0x"+ Long.toHexString(sessionId));}WatcherEvent event =newWatcherEvent();event.deserialize(bbia,"response");// convert from a server path to a client pathif(chrootPath != null){String serverPath = event.getPath();if(serverPath.compareTo(chrootPath)==0)event.setPath("/");elseif(serverPath.length()> chrootPath.length())event.setPath(serverPath.substring(chrootPath.length()));else{LOG.warn("Got server path "+ event.getPath()+" which is too short for chroot path "+ chrootPath);}}WatchedEvent we =newWatchedEvent(event);if(LOG.isDebugEnabled()){LOG.debug("Got "+ we +" for sessionid 0x"+ Long.toHexString(sessionId));}eventThread.queueEvent( we );return;}......}
publicvoidqueueEvent(WatchedEvent event){if(event.getType()== EventType.None&& sessionState == event.getState()){return;}sessionState = event.getState();// materialize the watchers based on the eventWatcherSetEventPair pair =newWatcherSetEventPair(watcher.materialize(event.getState(), event.getType(),event.getPath()),event);// queue the pair (watch set & event) for later processingwaitingEvents.add(pair);}