Apache ZooKeeper - 使用Apache Curator操作ZK
文章目錄
- 原生ZK API VS Curator
- Curator 概述
- Maven依賴
- 會(huì)話創(chuàng)建
- 靜態(tài)工廠方式創(chuàng)建會(huì)話
- 使用 fluent 風(fēng)格創(chuàng)建會(huì)話
- 創(chuàng)建節(jié)點(diǎn)
- protection 模式 ,規(guī)避僵尸節(jié)點(diǎn)
- 獲取數(shù)據(jù)
- 修改數(shù)據(jù)
- 刪除數(shù)據(jù) guaranteed()
- 獲取子節(jié)點(diǎn)
- 異步線程池
原生ZK API VS Curator
Apache ZooKeeper - 使用原生的API操作ZK
ZooKeeper原生Java API的不足之處:
- 連接zk超時(shí)時(shí),不支持自動(dòng)重連,需要手動(dòng)操作
- Watch注冊(cè)一次就會(huì)失效,需手工反復(fù)注冊(cè)
- 不支持遞歸創(chuàng)建節(jié)點(diǎn)
- 異步支持,沒(méi)有線程池
- …
Apache curator:
- 解決Watch注冊(cè)一次就會(huì)失效的問(wèn)題
- API 更加簡(jiǎn)單易用、封裝了常用的ZooKeeper工具類
- 使用Curator實(shí)現(xiàn)比如分布式鎖等需求更簡(jiǎn)單
- 異步執(zhí)行,支持自定義線程池
- …
Curator是netflix公司開源的一套zookeeper客戶端,Apache的頂級(jí)項(xiàng)目
與Zookeeper提供的原生客戶端相比,Curator的抽象層次更高,簡(jiǎn)化了Zookeeper客戶端的開發(fā)量
Curator解決了很多zookeeper客戶端非常底層的細(xì)節(jié)開發(fā)工作,包括連接重連、反復(fù)注冊(cè)wathcer和NodeExistsException 異常等
Curator 概述
Apache Curator : https://curator.apache.org/
看看模塊
-
curator-framework:對(duì)zookeeper的底層api的一些封裝
-
curator-client:提供一些客戶端的操作,例如重試策略等
-
curator-recipes:封裝了一些高級(jí)特性,如:Cache事件監(jiān)聽(tīng)、選舉、分布式鎖、分布式計(jì)數(shù)器、分布式Barrier等
Maven依賴
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.0.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-x-discovery</artifactId><version>5.0.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions></dependency>會(huì)話創(chuàng)建
和客戶端/ 服務(wù)器交互,第一步就要?jiǎng)?chuàng)建會(huì)話
Curator 提供了多種方式創(chuàng)建會(huì)話
靜態(tài)工廠方式創(chuàng)建會(huì)話
RetryPolicy retryPolicy = new ExponentialBackoffRetry(5000, 30);CuratorFramework client = CuratorFrameworkFactory.newClient(getConnectStr(), retryPolicy);client .start();使用 fluent 風(fēng)格創(chuàng)建會(huì)話
RetryPolicy retryPolicy = new ExponentialBackoffRetry(5000, 30);curatorFramework = CuratorFrameworkFactory.builder().connectString(getConnectStr()).retryPolicy(retryPolicy).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).canBeReadOnly(true).build();curatorFramework.start();上述代碼采用了流式方式,最核心的類是 CuratorFramework 類,該類的作用是定義一個(gè) ZooKeeper 客戶端對(duì)象,并在之后的上下文中使用。
在定義 CuratorFramework 對(duì)象實(shí)例的時(shí)候, 使用了 CuratorFrameworkFactory 工廠方法,并指定了 connectionString服務(wù)器地址列表、retryPolicy 重試策略 、sessionTimeoutMs 會(huì)話超時(shí)時(shí)間、connectionTimeoutMs 會(huì)話創(chuàng)建超時(shí)時(shí)間。
- connectionString:服務(wù)器地址列表,在指定服務(wù)器地址列表的時(shí)候可以是一個(gè)地址,也可以是多個(gè)地址。如果是多個(gè)地址,那么每個(gè)服務(wù)器地址列表用逗號(hào)分隔, 如 host1:port1,host2:port2,host3:port3
- retryPolicy:重試策略,當(dāng)客戶端異常退出或者與服務(wù)端失去連接的時(shí)候,可以通過(guò)設(shè)置客戶端重新連接 ZooKeeper 服務(wù)端。而 Curator 提供了 一次重試、多次重試等不同種類的實(shí)現(xiàn)方式。在 Curator 內(nèi)部,可以通過(guò)判斷服務(wù)器返回的 keeperException 的狀態(tài)代碼來(lái)判斷是否進(jìn)行重試處理,如果返回的是 OK 表示一切操作都沒(méi)有問(wèn)題,而 SYSTEMERROR 表示系統(tǒng)或服務(wù)端錯(cuò)誤
| ExponentialBackoffRetry | 重試一組次數(shù),重試之間的睡眠時(shí)間增加 |
| RetryNTimes | 重試最大次數(shù) |
| RetryOneTime | 只重試一次 |
| RetryUntilElapsed | 在給定的時(shí)間結(jié)束之前重試 |
-
sessionTimeoutMs 超時(shí)時(shí)間:Curator 客戶端創(chuàng)建過(guò)程中,有兩個(gè)超時(shí)時(shí)間的設(shè)置。一個(gè)是 sessionTimeoutMs會(huì)話超時(shí)時(shí)間,用來(lái)設(shè)置該條會(huì)話在 ZooKeeper 服務(wù)端的失效時(shí)間。
-
另一個(gè)是 connectionTimeoutMs 客戶端創(chuàng)建會(huì)話的超時(shí)時(shí)間,用來(lái)限制客戶端發(fā)起一個(gè)會(huì)話連接到接收Z(yǔ)ooKeeper 服務(wù)端應(yīng)答的時(shí)間。sessionTimeoutMs 作用在服務(wù)端,而connectionTimeoutMs 作用在客戶端。
創(chuàng)建節(jié)點(diǎn)
/*** 遞歸創(chuàng)建子節(jié)點(diǎn)*/@SneakyThrows@Testpublic void testCreateWithParent() {CuratorFramework curatorFramework = getCuratorFramework();String pathWithParent = "/artisan-node/artisan-node-sub1/artisan-node-sub1-1";String path = curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);log.info("curator create node :{} successfully.", path);}使用 create 函數(shù)創(chuàng)建數(shù)據(jù)節(jié)點(diǎn),并通過(guò) withMode 函數(shù)指定節(jié)點(diǎn)類型持久化節(jié)點(diǎn),臨時(shí)節(jié)點(diǎn),順序節(jié)點(diǎn),臨時(shí)順序節(jié)點(diǎn),持久化順序節(jié)點(diǎn)等),默認(rèn)是持久化節(jié)點(diǎn),之后調(diào)用?forPath?函數(shù)來(lái)指定節(jié)點(diǎn)的路徑和數(shù)據(jù)信息
protection 模式 ,規(guī)避僵尸節(jié)點(diǎn)
/*** protection 模式,防止由于異常原因,導(dǎo)致僵尸節(jié)點(diǎn)* @throws Exception*/@SneakyThrows@Testpublic void testCreate() {CuratorFramework curatorFramework = getCuratorFramework();String forPath = curatorFramework.create().withProtection() // 防止僵尸節(jié)點(diǎn).withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/curator-node", "data".getBytes());log.info("curator create node :{} successfully.", forPath);}看下zk中的數(shù)據(jù)
實(shí)現(xiàn)原理后面單獨(dú)開篇解讀,總體思想就是 隨機(jī)生成一個(gè)UUID, 再創(chuàng)建之前客戶端根據(jù)這個(gè)緩存的UUID去看ZK Server是否存在,存在則認(rèn)為是成功的,否則就繼續(xù)創(chuàng)建。
獲取數(shù)據(jù)
@Testpublic void testGetData() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();byte[] bytes = curatorFramework.getData().forPath("/curator-node");log.info("get data from node :{} successfully.", new String(bytes));}通過(guò)客戶端實(shí)例的getData() 方法更新 ZooKeeper 服務(wù)上的數(shù)據(jù)節(jié)點(diǎn),在getData 方法的后邊,通過(guò) forPath 函數(shù)來(lái)指定查詢的節(jié)點(diǎn)名稱
修改數(shù)據(jù)
@Testpublic void testSetData() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();curatorFramework.setData().forPath("/curator-node", "changed!".getBytes());byte[] bytes = curatorFramework.getData().forPath("/curator-node");log.info("get data from node /curator-node :{} successfully.", new String(bytes));}通過(guò)客戶端實(shí)例的 setData() 方法更新 ZooKeeper 服務(wù)上的數(shù)據(jù)節(jié)點(diǎn),在setData 方法的后邊,通過(guò) forPath 函數(shù)來(lái)指定更新的數(shù)據(jù)節(jié)點(diǎn)路徑以及要更新的數(shù)據(jù)。
刪除數(shù)據(jù) guaranteed()
@Testpublic void testDelete() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();String pathWithParent = "/node-parent";curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);}guaranteed:主要起到一個(gè)保障刪除成功的作用, 只要該客戶端的會(huì)話有效,就會(huì)在后臺(tái)持續(xù)發(fā)起刪除請(qǐng)求,直到該數(shù)據(jù)節(jié)點(diǎn)在ZooKeeper 服務(wù)端被刪除。
deletingChildrenIfNeeded:指定了該函數(shù)后,系統(tǒng)在刪除該數(shù)據(jù)節(jié)點(diǎn)的時(shí)候會(huì)以遞歸的方式直接刪除其子節(jié)點(diǎn),以及子節(jié)點(diǎn)的子節(jié)點(diǎn)。
獲取子節(jié)點(diǎn)
@Testpublic void testListChildren() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();String pathWithParent = "/artisan-node";List<String> list = curatorFramework.getChildren().forPath(pathWithParent);list.forEach(System.out::println); }通過(guò)客戶端實(shí)例的 getChildren() 方法更新 ZooKeeper 服務(wù)上的數(shù)據(jù)節(jié)點(diǎn),在getChildren方法的后邊,通過(guò) forPath 函數(shù)來(lái)指定節(jié)點(diǎn)下的一級(jí)子節(jié)點(diǎn)的名稱
異步線程池
Curator 使用BackgroundCallback 接口處理服務(wù)器端返回來(lái)的信息。
如果在異步線程中調(diào)用,默認(rèn)在 EventThread 線程中調(diào)用,支持自定義線程池
/*** 使用默認(rèn)的 EventThread異步線程處理* @throws Exception*/@Testpublic void testThreadPoolByDefaultEventThread() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();String ZK_NODE="/artisan-node";curatorFramework.getData().inBackground((client, event) -> {log.info(" background: {}", new String(event.getData()));}).forPath(ZK_NODE);;} /*** 使用自定義線程池* @throws Exception*/@Testpublic void testThreadPoolByCustomThreadPool() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();ExecutorService executorService = Executors.newSingleThreadExecutor();String ZK_NODE="/artisan-node";curatorFramework.getData().inBackground((client, event) -> {log.info(" background: {}", new String(event.getData()));},executorService).forPath(ZK_NODE);}總結(jié)
以上是生活随笔為你收集整理的Apache ZooKeeper - 使用Apache Curator操作ZK的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Apache ZooKeeper - 使
- 下一篇: Apache ZooKeeper - 高