Apache ZooKeeper - Watch 机制的底层原理
文章目錄
- Watch 機制
- API 使用
- Watch 機制的底層原理
- 客戶端 Watch 注冊實現過程 ZKWatchManager
- 服務端 Watch 注冊實現過程 WatchManager
- 服務端 Watch 事件的觸發過程
- 客戶端回調的處理過程
- 小結
- 實現一個分布式的發布訂閱功能
Watch 機制
ZooKeeper 又一關鍵技術——Watch 監控機制 。
API 使用
ZooKeeper 的客戶端可以通過 Watch 機制來訂閱當服務器上某一節點的數據或狀態發生變化時收到相應的通知,我們可以通過向 ZooKeeper 客戶端的構造方法中傳遞 Watcher 參數的方式實現
new ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)-
connectString 服務端地址
-
sessionTimeout:超時時間
-
Watcher:監控事件
Watcher 將作為整個 ZooKeeper 會話期間的上下文 ,一直被保存在客戶端 ZKWatchManager 的 defaultWatcher 中 .
除此之外,ZooKeeper 客戶端也可以通過 getData、exists 和 getChildren 三個接口來向 ZooKeeper 服務器注冊 Watcher,從而方便地在不同的情況下添加 Watch 事件
getData(String path, Watcher watcher, Stat stat)知道了 ZooKeeper 添加服務器監控事件的方式,下面我們來講解一下觸發通知的條件
上圖中列出了客戶端在不同會話狀態下,相應的在服務器節點所能支持的事件類型。
例如在客戶端連接服務端的時候,可以對數據節點的創建、刪除、數據變更、子節點的更新等操作進行監控。
Watch 機制的底層原理
其結構很像設計模式中的”觀察者模式“,一個對象或者數據節點可能會被多個客戶端監控,當對應事件被觸發時,會通知這些對象或客戶端。
我們可以將 Watch 機制理解為是分布式環境下的觀察者模式。
所以接下來就以觀察者模式的角度點來看看 ZooKeeper 底層 Watch 是如何實現的。
通常我們在實現觀察者模式時,最核心或者說關鍵的代碼就是創建一個列表來存放觀察者。
而在 ZooKeeper 中則是在客戶端和服務器端分別實現兩個存放觀察者列表,即:ZKWatchManager 和 WatchManager。
其核心操作就是圍繞著這兩個展開的。
客戶端 Watch 注冊實現過程 ZKWatchManager
在發送一個 Watch 監控事件的會話請求時,ZooKeeper 客戶端主要做了兩個工作:
1. 標記該會話是一個帶有 Watch 事件的請求
2. 將 Watch 事件存儲到 ZKWatchManager
我們以 getData 接口為例子
當發送一個帶有 Watch 事件的請求時,客戶端首先會把該會話標記為帶有 Watch 監控的事件請求,之后通過 DataWatchRegistration 類來保存 watcher 事件和節點的對應關系:
public byte[] getData(final String path, Watcher watcher, Stat stat){...WatchRegistration wcb = null;if (watcher != null) {wcb = new DataWatchRegistration(watcher, clientPath);}RequestHeader h = new RequestHeader();request.setWatch(watcher != null);...GetDataResponse response = new GetDataResponse();ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);}之后客戶端向服務器發送請求時,是將請求封裝成一個 Packet 對象,并添加到一個等待發送隊列 outgoingQueue 中:
public Packet queuePacket(RequestHeader h, ReplyHeader r,...) {Packet packet = null;...packet = new Packet(h, r, request, response, watchRegistration);...outgoingQueue.add(packet); ...return packet;}最后,ZooKeeper 客戶端就會向服務器端發送這個請求,完成請求發送后。調用負責處理服務器響應的 SendThread 線程類中的 readResponse 方法接收服務端的回調,并在最后執行 finishPacket()方法將 Watch 注冊到 ZKWatchManager 中:
private void finishPacket(Packet p) {int err = p.replyHeader.getErr();if (p.watchRegistration != null) {p.watchRegistration.register(err);}...}服務端 Watch 注冊實現過程 WatchManager
下面我們來看一下服務端是如何處理一個 Watch 事件。
Zookeeper 服務端處理 Watch 事件基本有 2 個過程:
1. 解析收到的請求是否帶有 Watch 注冊事件
2. 將對應的 Watch 事件存儲到 WatchManager
當 ZooKeeper 服務器接收到一個客戶端請求后,首先會對請求進行解析,判斷該請求是否包含 Watch 事件.
ZooKeeper 底層是通過 FinalRequestProcessor 類中的 processRequest 函數實現的。當 getDataRequest.getWatch() 值為 True 時,表明該請求需要進行 Watch 監控注冊。并通過 zks.getZKDatabase().getData 函數將 Watch 事件注冊到服務端的 WatchManager 中
public void processRequest(Request request) {...byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,getDataRequest.getWatch() ? cnxn : null);rsp = new GetDataResponse(b, stat);..}服務端 Watch 事件的觸發過程
在客戶端和服務端都對 watch 注冊完成后,我們接下來看一下在 ZooKeeper 中觸發一個 Watch 事件的底層實現過程:
以 setData 接口即“節點數據內容發生變更”事件為例。在 DataTree#setData 方法內部執行完對節點數據的變更后,會調用 WatchManager.triggerWatch 方法觸發數據變更事件。
public Stat setData(String path, byte data[], ...){Stat s = new Stat();DataNode n = nodes.get(path);...dataWatches.triggerWatch(path, EventType.NodeDataChanged);return s;}那看下 triggerWatch
首先,封裝了一個具有會話狀態、事件類型、數據節點 3 種屬性的 WatchedEvent 對象。之后查詢該節點注冊的 Watch 事件,如果為空說明該節點沒有注冊過 Watch 事件。如果存在 Watch 事件則添加到定義的 Wathcers 集合中,并在 WatchManager 管理中刪除。最后,通過調用 process 方法向客戶端發送通知。
Set<Watcher> triggerWatch(String path, EventType type...) {WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);Set<Watcher> watchers;synchronized (this) {watchers = watchTable.remove(path);...for (Watcher w : watchers) {Set<String> paths = watch2Paths.get(w);if (paths != null) {paths.remove(path);}}}for (Watcher w : watchers) {if (supress != null && supress.contains(w)) {continue;}w.process(e);}return watchers;}客戶端回調的處理過程
知道了服務器端 Watch 事件的觸發過程后,我們來看一下客戶端接收到通知后如何進行操作的。
客戶端使用 SendThread.readResponse() 方法來統一處理服務端的相應。
首先反序列化服務器發送請求頭信息 replyHdr.deserialize(bbia, “header”),并判斷相屬性字段 xid 的值為 -1,表示該請求響應為通知類型。在處理通知類型時,首先將己收到的字節流反序列化轉換成 WatcherEvent 對象。
接著判斷客戶端是否配置了 chrootPath 屬性,如果為 True 說明客戶端配置了 chrootPath 屬性。需要對接收到的節點路徑進行 chrootPath 處理。
最后調用 eventThread.queueEvent( )方法將接收到的事件交給 EventThread 線程進行處理
if (replyHdr.getXid() == -1) {...WatcherEvent event = new WatcherEvent();event.deserialize(bbia, "response");...if (chrootPath != null) {String serverPath = event.getPath();if(serverPath.compareTo(chrootPath)==0)event.setPath("/");...event.setPath(serverPath.substring(chrootPath.length()));...}WatchedEvent we = new WatchedEvent(event);...eventThread.queueEvent( we );}接下來我們來看一下 EventThread.queueEvent() 方法內部的執行邏輯。
其主要工作分為 2 點:
第 1 步按照通知的事件類型,從 ZKWatchManager 中查詢注冊過的客戶端 Watch 信息。客戶端在查詢到對應的 Watch 信息后,會將其從 ZKWatchManager 的管理中刪除。因此這里也請你多注意,客戶端的 Watcher 機制是一次性的,觸發后就會被刪除。
完成了第 1 步工作獲取到對應的 Watcher 信息后,將查詢到的 Watcher 存儲到 waitingEvents 隊列中,調用 EventThread 類中的 run 方法會循環取出在 waitingEvents 隊列中等待的 Watcher 事件進行處理。
public void run() {try {isRunning = true;while (true) {Object event = waitingEvents.take();if (event == eventOfDeath) {wasKilled = true;} else {processEvent(event);}if (wasKilled)synchronized (waitingEvents) {if (waitingEvents.isEmpty()) {isRunning = false;break;}}}...}最后調用 processEvent(event) 方法來最終執行實現了 Watcher 接口的 process()方法。
private void processEvent(Object event) {...if (event instanceof WatcherSetEventPair) {WatcherSetEventPair pair = (WatcherSetEventPair) event;for (Watcher watcher : pair.watchers) {try {watcher.process(pair.event);} catch (Throwable t) {LOG.error("Error while calling watcher ", t);}}}}小結
ZooKeeper 中 Watch 機制的,大體上ZooKeeper 實現的方式是通過客服端和服務端分別創建有觀察者的信息列表。客戶端調用 getData、exist 等接口時,首先將對應的 Watch 事件放到本地的 ZKWatchManager 中進行管理。服務端在接收到客戶端的請求后根據請求類型判斷是否含有 Watch 事件,并將對應事件放到 WatchManager 中進行管理。
在事件觸發的時候服務端通過節點的路徑信息查詢相應的 Watch 事件通知給客戶端,客戶端在接收到通知后,首先查詢本地的 ZKWatchManager 獲得對應的 Watch 信息處理回調操作。
這種設計不但實現了一個分布式環境下的觀察者模式,而且通過將客戶端和服務端各自處理 Watch 事件所需要的額外信息分別保存在兩端,減少彼此通信的內容,提升了服務的處理性能。
實現一個分布式的發布訂閱功能
來搞個實際應用來加深我們對 ZooKeeper 中 Watch 機制的理解。
提到 ZooKeeper 的應用場景,可能第一時間會想到最為典型的發布訂閱功能。
發布訂閱功能可以看作是一個一對多的關系,即一個服務或數據的發布者可以被多個不同的消費者調用。一般一個發布訂閱模式的數據交互可以分為消費者主動請求生產者信息的拉取模式,和生產者數據變更時主動推送給消費者的推送模式。ZooKeeper 采用了兩種模式結合的方式實現訂閱發布功能。
下面我們來分析一個具體案例:
在系統開發的過程中會用到各種各樣的配置信息,如數據庫配置項、第三方接口、服務地址等,我們可以用配置管理功能自動完成服務器配置信息的維護,利用ZooKeeper 的發布訂閱功能就能解決這個問題。
可以把諸如數據庫配置項這樣的信息存儲在 ZooKeeper 數據節點中。比如下圖中的 /confs/data_item1。
服務器集群客戶端對該節點添加 Watch 事件監控,當集群中的服務啟動時,會讀取該節點數據獲取數據配置信息。而當該節點數據發生變化時,ZooKeeper 服務器會發送 Watch 事件給各個客戶端,集群中的客戶端在接收到該通知后,重新讀取節點的數據庫配置信息。
我們使用 Watch 機制實現了一個分布式環境下的配置管理功能,通過對 ZooKeeper 服務器節點添加數據變更事件,實現當數據庫配置項信息變更后,集群中的各個客戶端能接收到該變更事件的通知,并獲取最新的配置信息。要注意一點是,我們提到 Watch 具有一次性,所以當我們獲得服務器通知后要再次添加 Watch 事件。
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的Apache ZooKeeper - Watch 机制的底层原理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Apache ZooKeeper - Z
- 下一篇: Apache ZooKeeper - 使