[Curator] Path Cache 的使用与分析
為什么80%的碼農都做不了架構師?>>> ??
Path Cache
Path Cache其實就是用于對zk節點的監聽。不論是子節點的新增、更新或者移除的時候,Path Cache都能對子節點集合的狀態和數據變化做出響應。
1. 關鍵 API
org.apache.curator.framework.recipes.cache.PathChildrenCache
org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent
org.apache.curator.framework.recipes.cache.PathChildrenCacheListener
org.apache.curator.framework.recipes.cache.ChildData
2. 機制說明
PathChildrenCache內部使用一個命令模式來封裝各種操作:
- 操作接口:org.apache.curator.framework.recipes.cache.Operation
- 刷新操作:org.apache.curator.framework.recipes.cache.RefreshOperation
- 觸發事件操作:org.apache.curator.framework.recipes.cache.EventOperation
- 獲取數據操作:org.apache.curator.framework.recipes.cache.GetDataOperation
而這些操作對象,都在構造器中接受PathChildrenCache引用,這樣可以在操作中,處理cache(回調):
EventOperation(PathChildrenCache cache, PathChildrenCacheEvent event) {this.cache = cache;this.event = event; } GetDataOperation(PathChildrenCache cache, String fullPath) {this.cache = cache;this.fullPath = PathUtils.validatePath(fullPath); } RefreshOperation(PathChildrenCache cache, PathChildrenCache.RefreshMode mode) {this.cache = cache;this.mode = mode; }而這些操作,還使用了一個單線程的線程池來調用,從而形成了異步調用。
- 使用了一個private final Set<Operation> operationsQuantizer = Sets.newSetFromMap(Maps.<Operation, Boolean>newConcurrentMap());來作為線程池的任務接收隊列
- 使用set,避免了并發情況下重復操作
- 由于單線程,使得各種操作都是按序執行的
- 所以為了避免curator的監聽機制阻塞
- 在childrenWatcher以及dataWatcher中,都使用異步執行命令的方式
觸發操作:
void offerOperation(final Operation operation) {if ( operationsQuantizer.add(operation) ){submitToExecutor(new Runnable(){@Overridepublic void run(){try{operationsQuantizer.remove(operation);operation.invoke();}catch ( InterruptedException e ){//We expect to get interrupted during shutdown,//so just ignore these eventsif ( state.get() != State.CLOSED ){handleException(e);}Thread.currentThread().interrupt();}catch ( Exception e ){ThreadUtils.checkInterrupted(e);handleException(e);}}});} } private synchronized void submitToExecutor(final Runnable command) {if ( state.get() == State.STARTED ){executorService.submit(command);} }- 考慮到了各種操作的中斷
- 考慮到了狀態
- 統一操作的異常處理
- 投遞方法submitToExecutor使用了synchronized
- 因為可能監聽器觸發,所以需要對狀態進行檢查
- 如先關閉,然后再被某個監聽器回掉,導致不必要的操作
- 而檢查動作不是原子的,所以需要同步鎖
- 因為可能監聽器觸發,所以需要對狀態進行檢查
3. 用法
3.1 創建
public PathChildrenCache(CuratorFramework client,String path,boolean cacheData)- cacheData
- 如果設置true,是否需要緩存數據
3.2 使用
- Cache必須在使用前調用start()方法
- 有兩個start()方法
- void start()
- 無參
- void start(PathChildrenCache.StartMode mode)
- 可以通過參數,選擇如何初始化
- StartMode
- NORMAL
- BUILD_INITIAL_CACHE
- POST_INITIALIZED_EVENT
- public void addListener(PathChildrenCacheListener listener)
4. 錯誤處理
PathChildrenCache實例會通過ConnectionStateListener監聽鏈接狀態。 如果鏈接狀態發生變化,緩存會被重置(PathChildrenCacheListener會受到一個RESET事件)
5. 源碼分析
5.1 類定義
public class PathChildrenCache implements Closeable{}- 實現了java.io.Closeable接口
5.2 成員變量
public class PathChildrenCache implements Closeable {private final Logger log = LoggerFactory.getLogger(getClass());private final CuratorFramework client;private final String path;private final CloseableExecutorService executorService;private final boolean cacheData;private final boolean dataIsCompressed;private final ListenerContainer<PathChildrenCacheListener> listeners = new ListenerContainer<PathChildrenCacheListener>();private final ConcurrentMap<String, ChildData> currentData = Maps.newConcurrentMap();private final AtomicReference<Map<String, ChildData>> initialSet = new AtomicReference<Map<String, ChildData>>();private final Set<Operation> operationsQuantizer = Sets.newSetFromMap(Maps.<Operation, Boolean>newConcurrentMap());private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);private final EnsureContainers ensureContainers;private enum State{LATENT,STARTED,CLOSED}private static final ChildData NULL_CHILD_DATA = new ChildData("/", null, null);private static final boolean USE_EXISTS = Boolean.getBoolean("curator-path-children-cache-use-exists");private volatile Watcher childrenWatcher = new Watcher(){@Overridepublic void process(WatchedEvent event){offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.STANDARD));}};private volatile Watcher dataWatcher = new Watcher(){@Overridepublic void process(WatchedEvent event){try{if ( event.getType() == Event.EventType.NodeDeleted ){remove(event.getPath());}else if ( event.getType() == Event.EventType.NodeDataChanged ){offerOperation(new GetDataOperation(PathChildrenCache.this, event.getPath()));}}catch ( Exception e ){ThreadUtils.checkInterrupted(e);handleException(e);}}};@VisibleForTestingvolatile Exchanger<Object> rebuildTestExchanger;private volatile ConnectionStateListener connectionStateListener = new ConnectionStateListener(){@Overridepublic void stateChanged(CuratorFramework client, ConnectionState newState){handleStateChange(newState);}};private static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("PathChildrenCache"); }- log
- client
- path
- 緩存對應的zk節點路徑
- executorService
- org.apache.curator.utils.CloseableExecutorService
- 線程池
- 用以執行各種操作
- 參見第2章節
- cacheData
- 是否需要緩存數據
- dataIsCompressed
- 數據是否已壓縮
- listeners
- org.apache.curator.framework.listen.ListenerContainer
- 監聽器容器(管理多個監聽器)
- 業務監聽器
- 可以添加自己的監聽器
- currentData
- java.util.concurrent.ConcurrentMap
- 當前數據
- <String, ChildData>
- 存放著多個org.apache.curator.framework.recipes.cache.ChildData
- initialSet
- AtomicReference
- 初始化集合
- 放置節點,以此來跟蹤各個節點是否初始化
- 如果全部節點都初始化完成,則會觸發PathChildrenCacheEvent.Type.INITIALIZED事件
- operationsQuantizer
- 相當于線程池的任務接收隊列
- state
- 狀態
- AtomicReference
- ensureContainers
- org.apache.curator.framework.EnsureContainers
- 可以線程安全的創建path節點
- State
- 內部枚舉
- LATENT
- STARTED
- CLOSED
- 內部枚舉
- NULL_CHILD_DATA
- 私有常量
- 空數據節點
- USE_EXISTS
- 私有常量
- 使用系統配置中curator-path-children-cache-use-exists的值
- childrenWatcher
- volatile
- 子節點變動的監聽器
- dataWatcher
- volatile
- 數據變動監聽器
- rebuildTestExchanger
- java.util.concurrent.Exchanger
- 用于并發線程間傳值
- 在重建緩存時通過此對象傳遞一個信號對象
- 用于測試
- connectionStateListener
- 鏈接狀態監聽器
- defaultThreadFactory
- 線程工廠
5.3 構造器
public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode) {this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true)); }public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode, ThreadFactory threadFactory) {this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true)); }public PathChildrenCache(CuratorFramework client, String path, boolean cacheData) {this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true)); }public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory) {this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true)); }public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory) {this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true)); }public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService) {this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(executorService)); }public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService) {this.client = client;this.path = PathUtils.validatePath(path);this.cacheData = cacheData;this.dataIsCompressed = dataIsCompressed;this.executorService = executorService;ensureContainers = new EnsureContainers(client, path); }有7個構造器,最終都是調用最后一個。不過從中也可以看出:
- 默認使用newSingleThreadExecutor單線程線程池
- 默認不對數據進行壓縮處理
5.4 啟動
緩存在使用前需要調用start()
public enum StartMode{NORMAL,BUILD_INITIAL_CACHE,POST_INITIALIZED_EVENT}public void start() throws Exception {start(StartMode.NORMAL); }@Deprecated public void start(boolean buildInitial) throws Exception {start(buildInitial ? StartMode.BUILD_INITIAL_CACHE : StartMode.NORMAL); }public void start(StartMode mode) throws Exception {Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "already started");mode = Preconditions.checkNotNull(mode, "mode cannot be null");client.getConnectionStateListenable().addListener(connectionStateListener);switch ( mode ){case NORMAL:{offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));break;}case BUILD_INITIAL_CACHE:{rebuild();break;}case POST_INITIALIZED_EVENT:{initialSet.set(Maps.<String, ChildData>newConcurrentMap());offerOperation(new RefreshOperation(this, RefreshMode.POST_INITIALIZED));break;}} }private void processChildren(List<String> children, RefreshMode mode) throws Exception {Set<String> removedNodes = Sets.newHashSet(currentData.keySet());for ( String child : children ) {removedNodes.remove(ZKPaths.makePath(path, child));}for ( String fullPath : removedNodes ){remove(fullPath);}for ( String name : children ){String fullPath = ZKPaths.makePath(path, name);if ( (mode == RefreshMode.FORCE_GET_DATA_AND_STAT) || !currentData.containsKey(fullPath) ){getDataAndStat(fullPath);}updateInitialSet(name, NULL_CHILD_DATA);}maybeOfferInitializedEvent(initialSet.get()); }- 無參的start()
- 默認使用StartMode.NORMAL策略
- 不建議使用的start(boolean buildInitial)
- true
- 使用StartMode.BUILD_INITIAL_CACHE策略
- false
- 使用StartMode.NORMAL策略
- true
- 啟動時添加了鏈接狀態的監聽器
可以看到啟動過程有三種策略:
- 使用RefreshMode.STANDARD刷新模式
- 調用org.apache.curator.framework.recipes.cache.PathChildrenCache#refresh方法
- 調用org.apache.curator.framework.EnsureContainers#ensure創建節點
- 在節點上添加childrenWatcher監聽器
- 回調觸發org.apache.curator.framework.recipes.cache.PathChildrenCache#processChildren進行刷新
- 清理掉已緩存在本地的數據中的其他節點
- 篩選出不是本cache的數據節點
- 從本地初始集合中清理掉
- PathChildrenCacheEvent.Type.CHILD_ADDED事件
- PathChildrenCacheEvent.Type.CHILD_UPDATED事件
- NORMAL模式下,這里為空
- 可以參見POST_INITIALIZED_EVENT模式
- 重新查詢所有需要的數據
- 不會觸發任何事件
- 逐個讀取節點數據和狀態
- 構建ChildData放入currentData
- 參見NORMAL模式,但不同的是
- 更新initialSet時
- 如果initialSet的Map不為空
- POST_INITIALIZED_EVENT模式下,這里已經初始化了Map
- 如果initialSet中的數據都已經同步完成(都不等于NULL_CHILD_DATA)
- 將initialSet制空
- 觸發PathChildrenCacheEvent.Type.INITIALIZED事件
5.5 節點發生變化
在啟動start()已經給path上增加了一個監聽器childrenWatcher
private volatile Watcher childrenWatcher = new Watcher() {@Overridepublic void process(WatchedEvent event){offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.STANDARD));} };- 以RefreshMode.STANDARD模式刷新緩存
- 會對本地的緩存數據和zk節點做比較
- 只是處理新的緩存數據
- 注意操作的參數PathChildrenCache.this
- this不同了
5.6 數據發生變化
在每次獲取緩存數據時(getDataAndStat方法),在每個緩存上添加了監聽器dataWatcher:
private volatile Watcher dataWatcher = new Watcher() {@Overridepublic void process(WatchedEvent event){try{if ( event.getType() == Event.EventType.NodeDeleted ){remove(event.getPath());}else if ( event.getType() == Event.EventType.NodeDataChanged ){offerOperation(new GetDataOperation(PathChildrenCache.this, event.getPath()));}}catch ( Exception e ){ThreadUtils.checkInterrupted(e);handleException(e);}} };- 節點刪除時
- 清理緩存
- 觸發PathChildrenCacheEvent.Type.CHILD_REMOVED事件
- 數據發生變化時
- 執行GetDataOperation操作
- 也就是再次執行getDataAndStat方法
- 執行GetDataOperation操作
- 注意操作的參數PathChildrenCache.this
- this不同了
5.7 獲取當前數據
public List<ChildData> getCurrentData() {return ImmutableList.copyOf(Sets.<ChildData>newTreeSet(currentData.values())); }public ChildData getCurrentData(String fullPath) {return currentData.get(fullPath); }都是從本地數據中獲取
5.8 清理
5.8.1 清理緩存
public void clear() {currentData.clear(); } public void clearAndRefresh() throws Exception {currentData.clear();offerOperation(new RefreshOperation(this, RefreshMode.STANDARD)); }清空本地數據
如果需要則使用RefreshMode.STANDARD模式,刷新
5.8.2 清理緩存數據
public void clearDataBytes(String fullPath) {clearDataBytes(fullPath, -1); } public boolean clearDataBytes(String fullPath, int ifVersion) {ChildData data = currentData.get(fullPath);if ( data != null ){if ( (ifVersion < 0) || (ifVersion == data.getStat().getVersion()) ){if ( data.getData() != null ){currentData.replace(fullPath, data, new ChildData(data.getPath(), data.getStat(), null));}return true;}}return false; }保留緩存信息,但是數據部分制空
5.9 鏈接狀態變化
在啟動時(start())中為鏈接添加了connectionStateListener監聽器:
private volatile ConnectionStateListener connectionStateListener = new ConnectionStateListener() {@Overridepublic void stateChanged(CuratorFramework client, ConnectionState newState){handleStateChange(newState);} };private void handleStateChange(ConnectionState newState) {switch ( newState ){case SUSPENDED:{offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED, null)));break;}case LOST:{offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_LOST, null)));break;}case CONNECTED:case RECONNECTED:{try{offerOperation(new RefreshOperation(this, RefreshMode.FORCE_GET_DATA_AND_STAT));offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED, null)));}catch ( Exception e ){ThreadUtils.checkInterrupted(e);handleException(e);}break;}} }主要都是根據鏈接狀態,觸發不同的操作,以及觸發業務監聽器來執行。
- 由于數據都是緩存,所以在鏈接丟失,中斷時,僅僅時觸發事件,并沒有將數據置為不可用
- 當鏈接建立CONNECTED,以及恢復時RECONNECTED都觸發了一次RefreshMode.FORCE_GET_DATA_AND_STAT模式的刷新操作。
5.10 關閉
在使用完之后,需要調用close()方法:
public void close() throws IOException {if ( state.compareAndSet(State.STARTED, State.CLOSED) ){client.getConnectionStateListenable().removeListener(connectionStateListener);listeners.clear();executorService.close();client.clearWatcherReferences(childrenWatcher);client.clearWatcherReferences(dataWatcher);// TODO// This seems to enable even more GC - I'm not sure why yet - it// has something to do with Guava's cache and circular referencesconnectionStateListener = null;childrenWatcher = null;dataWatcher = null;} }- 原子操作,將狀態更新為CLOSED
- 移除鏈接狀態監聽器
- 清空業務監聽器
- 關閉線程池
- 清空節點監聽器
- 清空數據監聽器
6. 小結
PathChildrenCache雖然名字帶有Cache。 但其實并不是一個完整的緩存。
應該說,它僅僅是對path下諸多節點進行統一的管理。 當這些節點發生變動,或者數據發生變化時,都可以被PathChildrenCache發現,并同步到本地Map中。以此來達到一個緩存的概念。
從API中也能發現,它只能獲取數據。至于放置緩存,則需要另外實現。
- 其實也簡單,直接向path下新建節點并寫入數據就行
可以通過getListenable().addListener(listener);添加自定義監聽器,從而實現對緩存進行更細致的控制。
7. 示例
這里可以參考官方的示例
轉載于:https://my.oschina.net/roccn/blog/918209
總結
以上是生活随笔為你收集整理的[Curator] Path Cache 的使用与分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: (转)Arcgis for Js之Gra
- 下一篇: 在SpringMVC中使用Jackson