FROM: http://www.superwu.cn/2014/11/26/1461
本節本來是要介紹 ZooKeeper 的實現原理,但是 ZooKeeper 的原理比較復雜,它涉及到了 paxos 算法、 Zab 協議、通信協議等相關知識,理解起來比較抽象所以還需要借助一些應用場景,來幫我們理解。由于內容比較多,一口氣吃不成胖子,得慢慢來一步一個腳印,因此我對后期 ZooKeeper 的學習規劃如下:
第一階段:
|--- 理解 ZooKeeper 的應用
????|--- ZooKeeper 是什么
????|--- ZooKeeper 能干什么
????|--- ZooKeeper 怎么使用
第二階段:
|--- 理解 ZooKeeper 原理準備
????|--- 了解 paxos
????|--- 理解 zab 原理
????|--- 理解選舉 / 同步流程
第三階段:
????|--- 深入 ZooKeeper 原理
????????|--- 分析源碼
????????|--- 嘗試開發分布式應用
由于內容較多,而且理解較為復雜,所以每個階段分開來學習和介紹,那么本文主要介紹的的是第一階段 ,該階段一般應該放在前面介紹,但感覺像一些 ZooKeeper 應用案例,如果沒有一定的 ZooKeeper 基礎,理解起來也比較抽象, 所以放在這介紹。大家可以對比一下前面的應用程序,來對比理解一下前面的那些應用到底用到 ZooKeeper 的那些功能,來進一步理解 ZooKeeper 的實現理念,由于網上關于這方面的介紹比較多,如果一些可愛的博友對該內容已經比較了解,那么您可以不用往下看了,繼續下一步學習。
一、 ZooKeeper 產生背景
1.1 分布式的發展
分布式 這個概念我想大家并不陌生,但真正實戰開始還要從google 說起,很早以前在實驗室中分布式被人提出,可是說是計算機內入行較為復雜學習較為困難的技術,并且市場也并不成熟,因此大規模的商業應用一直未成出現,但從 Google 發布了 MapReduce? 和 DFS? 以及 Bigtable 的論文之后,分布式在計算機界的格局就發生了變化,從架構上實現了分布式的難題,并且成熟的應用在了海量數據存儲和計算上,其集群的規模也是當前世界上最為龐大的。
以 DFS 為基礎的分布式計算框架和 key 、 value? 數據高效的解決運算的瓶頸 ,而且開發人員不用再寫復雜的分布式程序,只要底層框架完備開發人員只要用較少的代碼就可以完成分布式程序的開發,這使得開發人員只需要關注業務邏輯的即可。 Google 在業界技術上的領軍地位,讓業界望塵莫及的技術實力, IT 因此也是對 Google 所退出的技術十分推崇。在最近幾年中分布式則是成為了海量數據存儲以及計算、高并發、高可靠性、高可用性的解決方案。
1.2 ZooKeeper 的產生
眾所周知通常分布式架構都是中心化的設計,就是一個主控機連接多個處理節點。問題可以從這里考慮,當主控機失效時,整個系統則就無法訪問了,所以保證系統的高可用性是非常關鍵之處,也就是要保證主控機的高可用性。分布式鎖就是一個解決該問題的較好方案,多主控機搶一把鎖。在這里我們就涉及到了我們的重點 Zookeeper 。
ZooKeeper 是什么, chubby 我想大家都不會陌生的, chubby 是實現 Google 的一個分布式鎖的實現,運用到了 paxos 算法解決的一個分布式事務管理的系統。 Zookeeper 就是雅虎模仿強大的 Google chubby 實現的一套分布式鎖管理系統。同時, Zookeeper 分布式服務框架是 Apache Hadoop 的一個子項目,它是一個針對大型分布式系統的可靠協調系統,它主要是用來解決分布式應用中經常遇到的一些數據管理問題,可以高可靠的維護元數據。提供的功能 包括:配置維護、名字服務、分布式同步、組服務等。 ZooKeeper 的設計目標就是封裝 好復雜易出錯的關鍵服務 ,將簡單易用的接口和性能高效、功能穩定的系統提供給用戶。
1.3 ZooKeeper 的使用
Zookeeper 作為一個分布式的服務框架,主要用來解決分布式集群中應用系統的一致性問題,它能提供基于類似于文件系統的目錄節點樹方式的數據存儲,但是 Zookeeper 并不是用來專門存儲數據的,它的作用主要是用來維護和監控你存儲的數據的狀態變化 。通過監控這些數據狀態的變化,從而可以達到基于數據的集群管理 ,后面將 會詳細介紹 Zookeeper 能夠解決的一些典型問題。
注意一下 這里的" 數據 " 是有限制的:
(1)? 從數據大小來看 :我們知道ZooKeeper 的數據存儲在一個叫 ReplicatedDataBase? 的數據庫中,該數據是一個內存數據庫,既然是在內存當中,我就應該知道該數據量就應該不會太大,這一點上就與 hadoop 的 HDFS 有了很大的區別, HDFS 的數據主要存儲在磁盤上,因此數據存儲主要是 HDFS 的事,而 ZooKeeper 主要是協調功能,并不是用來存儲數據的。
(2) 從數據類型來看: 正如前面所說的,ZooKeeper 的數據在內存中,由于內存空間的限制,那么我們就不能在上面隨心所欲的存儲數據,所以 ZooKeeper 存儲的數據都是我們所關心的數據而且數據量還不能太大,而且還會根據我們要以實現的功能來選擇相應的數據。簡單來說,干什么事存什么數據, ZooKeeper 所實現的一切功能,都是由 ZK 節點的性質和該節點所關聯的數據實現的,至于關聯什么數據那就要看你干什么事了。
例如:
① 集群管理 :利用臨時節點特性,節點關聯的是機器的主機名、IP 地址等相關信息,集群單點故障也屬于該范疇。 ?
? ? ② 統一命名: 主要利用節點的唯一性和目錄節點樹結構。
③ 配置管理: 節點關聯的是配置信息。
④ 分布式鎖: 節點關聯的是要競爭的資源。
二、 ZooKeeper 應用場景
ZooKeeper 是一個高可用的分布式數據管理與系統協調框架。基于對 Paxos 算法的實現,使該框架保證了分布式環境中數據的強一致性,也正是基于這樣的特性,使得 zookeeper 能夠應用于很多場景。需要注意的是, ZK 并不是生來就為這些場景設計,都是后來眾多開發者根據框架的特性,摸索出來的典型使用方法。因此,我們也可以根據自己的需要來設計相應的場景實現。正如前文所提到的, ZooKeeper 實現的任何功能都離不開 ZooKeeper 的數據結構,任何功能的實現都是利用 "Znode 結構特性 + 節點關聯的數據 " 來實現的,好吧那么我們就看一下 ZooKeeper 數據結構有哪些特性。 ZooKeeper 數據結構如下圖所示:
圖2.1 ZooKeeper數據結構
Zookeeper 這種數據結構有如下這些特點:
① ? 每個子目錄項如 NameService 都被稱作為 znode ,這個 znode 是被它所在的路徑唯一標識,如 Server1 這個 znode 的標識為 ?/NameService/Server1 ;
② ?znode 可以有子節點目錄,并且每個 znode 可以存儲數據,注意 ?EPHEMERAL? 類型的目錄節點不能有子節點目錄;
③ ?znode 是有版本的,每個 znode 中存儲的數據可以有多個版本,也就是一個訪問路徑中可以存儲多份數據;
④ ?znode 可以是臨時節點,一旦創建這個 znode 的客戶端與服務器失去聯系,這個 znode 也將自動刪除, Zookeeper 的客戶端和服務器通信采用長連接方式,每個客戶端和服務器通過心跳來保持連接,這個連接狀態稱為 session ,如果 znode 是臨時節點,這個 session 失效, znode 也就刪除了;
⑤ ? znode 的目錄名可以自動編號,如 App1 已經存在,再創建的話,將會自動命名為 App2 ;
⑥ ? znode 可以被監控,包括這個目錄節點中存儲的數據的修改,子節點目錄的變化等,一旦變化可以通知設置監控的客戶端,這個是 Zookeeper 的核心特性, Zookeeper 的很多功能都是基于這個特性實現的。
2.1 數據發布與訂閱
(1) 典型場景描述
發布與訂閱即所謂的配置管理,顧名思義就是將數據發布到 ZK 節點上,供訂閱者動態獲取數據,實現配置信息的集中式管理和動態更新。例如全局的配置信息 ,地址列表 等就非常適合使用。集中式的配置管理在應用集群中是非常常見的,一般商業公司內部都會實現一套集中的配置管理中心,應對不同的應用集群對于共享各自配置的需求,并且在配置變更時能夠通知到集群中的每一個機器。
(2) 應用
① ? 索引信息和集群中機器節點狀態存放在 ZK 的一些指定節點,供各個客戶端訂閱使用。
② ? 系統日志(經過處理后的)存儲,這些日志通常 2-3 天后被清除。
③ ? 應用中用到的一些配置信息集中管理,在應用啟動的時候主動來獲取一次,并且在節點上注冊一個 Watcher ,以后每次配置有更新,實時通知到應用,獲取最新配置信息。
④ ? 業務邏輯中需要用到的一些全局變量,比如一些消息中間件的消息隊列通常有個 offset ,這個 offset 存放在 zk 上,這樣集群中每個發送者都能知道當前的發送進度。
⑤ ? 系統中有些信息需要動態獲取,并且還會存在人工手動去修改這個信息。以前通常是暴露出接口,例如 JMX 接口,有了 ZK 后,只要將這些信息存放到 ZK 節點上即可。
(3) 應用舉例
例如 :同一個應用系統需要多臺 PC Server 運行,但是它們運行的應用系統的某些配置項是相同的,如果要修改這些相同的配置項,那么就必須同時修改每臺運行這個應用系統的 PC Server ,這樣非常麻煩而且容易出錯。將配置信息保存在 Zookeeper 的某個目錄節點中,然后將所有需要修改的應用機器監控配置信息的狀態,一旦配置信息發生變化,每臺應用機器就會收到 Zookeeper 的通知,然后從 Zookeeper 獲取新的配置信息應用到系統中。 ZooKeeper 配置管理服務如下圖所示:
圖2.2 配置管理結構圖
Zookeeper 很容易實現這種集中式的配置管理,比如將所需要的配置信息放到 /Configuration ?節點上,集群中所有機器一啟動就會通過 Client 對 /Configuration 這個節點進行監控【 zk.exist("/Configuration″,true) 】,并且實現 Watcher 回調方法 process() ,那么在 zookeeper 上 /Configuration 節點下數據發生變化的時候,每個機器都會收到通知, Watcher 回調方法將會被執行,那么應用再取下數據即可【 zk.getData("/Configuration″,false,null) 】。
2.2 統一命名服務( Name Service )
(1) 場景描述
分布式應用中,通常需要有一套完整的命名規則,既能夠產生唯一的名稱又便于人識別和記住,通常情況下用樹形的名稱結構是一個理想的選擇,樹形的名稱結構是一個有層次的目錄結構,既對人友好又不會重復。說到這里你可能想到了 JNDI ,沒錯 Zookeeper 的 Name Service 與 JNDI 能夠完成的功能是差不多的,它們都是將有層次的目錄結構關聯到一定資源上 ,但是 Zookeeper 的 Name Service 更加是廣泛意義上的關聯,也許你并不需要將名稱關聯到特定資源上,你可能只需要一個不會重復名稱,就像數據庫中產生一個唯一的數字主鍵一樣。
(2) 應用
在分布式系統中,通過使用命名服務,客戶端應用能夠根據指定的名字來獲取資源服務的地址,提供者等信息。被命名的實體通常可以是集群中的機器,提供的服務地址,進程對象等等,這些我們都可以統稱他們為名字( Name )。其中較為常見的就是一些分布式服務框架中的服務地址列表。通過調用 ZK 提供的創建節點的 API ,能夠很容易創建一個全局唯一的 path ,這個 path 就可以作為一個名稱。 Name Service 已經是 Zookeeper 內置的功能,你只要調用 Zookeeper 的 API 就能實現。如調用 create 接口就可以很容易創建一個目錄節點。
(3) 應用舉例
阿里開源的分布式服務框架 Dubbo 中使用 ZooKeeper 來作為其命名服務,維護全局的服務地址列表。在 Dubbo 實現中: ? 服務提供者 在啟動的時候,向ZK 上的指定節點 /dubbo/${serviceName}/providers 目錄下寫入自己的 URL 地址,這個操作就完成了服務的發布。 ? 服務消費者 啟動的時候,訂閱/dubbo/ serviceName / providers 目錄下的提供者 URL 地址,并向 / dubbo / {serviceName} /consumers 目錄下寫入自己的 URL 地址。 注意,所有向 ZK 上注冊的地址都是臨時節點,這樣就能夠保證服務提供者和消費者能夠自動感應資源的變化。 另外, Dubbo 還有針對服務粒度的監控,方法是訂閱 /dubbo/${serviceName} 目錄下所有提供者和消費者的信息。
2.3 分布通知 / 協調( Distribution of notification/coordination )
(1) 典型場景描述
ZooKeeper 中特有 watcher 注冊與異步通知機制,能夠很好的實現分布式環境下不同系統之間的通知與協調,實現對數據變更的實時處理。使用方法通常是不同系統都對 ZK 上同一個 znode 進行注冊,監聽 znode 的變化(包括 znode 本身內容及子節點的),其中一個系統 update 了 znode ,那么另一個系統能夠收到通知,并作出相應處理。
(2) 應用
① ? 另一種心跳檢測機制 :檢測系統和被檢測系統之間并不直接關聯起來,而是通過 ZK 上某個節點關聯,大大減少系統耦合。
② ? 另一種系統調度模式 :某系統由控制臺和推送系統兩部分組成,控制臺的職責是控制推送系統進行相應的推送工作。管理人員在控制臺作的一些操作,實際上是修改了 ZK 上某些節點的狀態,而 ZK 就把這些變化通知給他們注冊 Watcher 的客戶端,即推送系統,于是,作出相應的推送任務。
③ ? 另一種工作匯報模式 :一些類似于任務分發系統,子任務啟動后,到 ZK 來注冊一個臨時節點,并且定時將自己的進度進行匯報(將進度寫回這個臨時節點),這樣任務管理者就能夠實時知道任務進度。
總之,使用 zookeeper 來進行分布式通知和協調能夠大大降低系統之間的耦合。
2.4 分布式鎖( Distribute Lock )
(1) 場景描述
分布式鎖 ,這個主要得益于ZooKeeper 為我們保證了數據的強一致性 ,即用戶只要完全相信每時每刻, zk 集群中任意節點(一個 zk server )上的相同 znode 的數據是一定是相同的。鎖服務可以分為兩類,一個是保持獨占,另一個是控制時序。
保持獨占,就是所有試圖來獲取這個鎖的客戶端,最終只有一個可以成功獲得這把鎖。通常的做法是把 ZK 上的一個 znode 看作是一把鎖,通過 create znode 的方式來實現。所有客戶端都去創建 /distribute_lock 節點,最終成功創建的那個客戶端也即擁有了這把鎖。
控制時序,就是所有試圖來獲取這個鎖的客戶端,最終都是會被安排執行,只是有個全局時序了。做法和上面基本類似,只是這里 /distribute_lock 已經預先存在,客戶端在它下面創建臨時有序節點。 Zk 的父節點( /distribute_lock )維持一份 sequence, 保證子節點創建的時序性,從而也形成了每個客戶端的全局時序。
(2) 應用
共享鎖在同一個進程中很容易實現,但是在跨進程或者在不同 Server 之間就不好實現了。 Zookeeper 卻很容易實現這個功能,實現方式也是需要獲得鎖的 Server 創建一個 ?EPHEMERAL_SEQUENTIAL? 目錄節點,然后調用 ?getChildren 方法獲取當前的目錄節點列表中最小的目錄節點是不是就是自己創建的目錄節點,如果正是自己創建的,那么它就獲得了這個鎖,如果不是那么它就調用 ?exists(String path, boolean watch) 方法并監控 Zookeeper 上目錄節點列表的變化,一直到自己創建的節點是列表中最小編號的目錄節點,從而獲得鎖,釋放鎖很簡單,只要刪除前面它自己所創建的目錄節點就行了。
圖 2.3 ZooKeeper實現Locks的流程圖
代碼清單1 TestMainClient 代碼
package org.zk.leader.election; import?org.apache.log4j.xml.DOMConfigurator; import?org.apache.zookeeper.WatchedEvent; import?org.apache.zookeeper.Watcher; import?org.apache.zookeeper.ZooKeeper; import?java.io.IOException; /** ?* TestMainClient ?* <p/> ?* Author By: sunddenly 工作室 ?* Created Date: 2014-11-13 ?*/ public?class?TestMainClient implements Watcher { ????protected?static?ZooKeeper zk =?null; ????protected?static?Integer mutex; ????int?sessionTimeout = 10000; ????protected?String root; ????public?TestMainClient(String connectString) { ????????if(zk ==?null){ ????????????try?{ ????????????????String configFile =?this.getClass().getResource("/").getPath()+"org/zk/leader/election/log4j.xml"; ????????????????DOMConfigurator.configure(configFile); ????????????????System.out.println(" 創建一個新的連接 :"); ????????????????zk =?new?ZooKeeper(connectString, sessionTimeout,?this); ????????????????mutex =?new?Integer(-1); ????????????}?catch?(IOException e) { ????????????????zk =?null; ????????????} ????????} ????} ???synchronized?public?void?process(WatchedEvent?event) { ????????synchronized (mutex) { ????????????mutex.notify(); ????????} ????} }
清單 2 Locks 代碼
package?org.zk.locks; import?org.apache.log4j.Logger; import?org.apache.zookeeper.CreateMode; import?org.apache.zookeeper.KeeperException; import?org.apache.zookeeper.WatchedEvent; import?org.apache.zookeeper.ZooDefs; import?org.apache.zookeeper.data.Stat; import?org.zk.leader.election.TestMainClient; import?java.util.Arrays; import?java.util.List; /** ?* locks ?* <p/> ?* Author By: sunddenly 工作室 ?* Created Date: 2014-11-13 16:49:40 ?*/ public?class?Locks?extends?TestMainClient { ????public?static?final?Logger logger = Logger.getLogger(Locks.class); ????String myZnode; ????public?Locks(String connectString, String root) { ????????super(connectString); ????????this.root = root; ????????if?(zk !=?null) { ????????????try?{ ????????????????Stat s = zk.exists(root,?false); ????????????????if?(s ==?null) { ????????????????????zk.create(root,?new?byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); ????????????????} ????????????}?catch?(KeeperException e) { ????????????????logger.error(e); ????????????}?catch?(InterruptedException e) { ????????????????logger.error(e); ????????????} ????????} ????} ????void?getLock()?throws?KeeperException, InterruptedException{ ????????List<String> list = zk.getChildren(root,?false); ????????String[] nodes = list.toArray(new?String[list.size()]); ????????Arrays.sort(nodes); ????????if(myZnode.equals(root+"/"+nodes[0])){ ????????????doAction(); ????????} ????????else{ ????????????waitForLock(nodes[0]); ????????} ????} ????void?check()?throws?InterruptedException, KeeperException { ????????myZnode = zk.create(root + "/lock_" ,?new?byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL); ????????getLock(); ????} ????void?waitForLock(String lower)?throws?InterruptedException, KeeperException { ????????Stat stat = zk.exists(root + "/" + lower,true); ????????if(stat !=?null){ ????????????mutex.wait(); ????????} ????????else{ ????????????getLock(); ????????} ????} ????@Override ????public?void?process(WatchedEvent event) { ????????if(event.getType() == Event.EventType.NodeDeleted){ ????????????System.out.println(" 得到通知 "); ????????????super.process(event); ????????????doAction(); ????????} ????} ????/** ?????*? 執行其他任務 ?????*/ ????private?void?doAction(){ ????????System.out.println(" 同步隊列已經得到同步,可以開始執行后面的任務了 "); ????} ????public?static?void?main(String[] args) { ????????String connectString = "localhost:2181"; ????????Locks lk =?new?Locks(connectString, "/locks"); ????????try?{ ????????????lk.check(); ????????}?catch?(InterruptedException e) { ????????????logger.error(e); ????????}?catch?(KeeperException e) { ????????????logger.error(e); ????????} ????} }
2.5 集群管理( Cluster Management )
(1) 典型場景描述
集群機器監控 :
這通常用于那種對集群中機器狀態,機器在線率有較高要求的場景,能夠快速對集群中機器變化作出響應。這樣的場景中,往往有一個監控系統,實時檢測集群機器是否存活。過去的做法通常是:監控系統通過某種手段(比如 ping )定時檢測每個機器,或者每個機器自己定時向監控系統匯報 " 我還活著 " 。 這種做法可行,但是存在兩個比較明顯的問題:
① ? 集群中機器有變動的時候,牽連修改的東西比較多。
② ? 有一定的延時。
利用 ZooKeeper 中兩個特性,就可以實施另一種集群機器存活性監控系統:
① ? 客戶端在節點 x 上注冊一個 Watcher ,那么如果 x 的子節點變化了,會通知該客戶端。
② ? 創建 EPHEMERAL 類型的節點,一旦客戶端和服務器的會話結束或過期,那么該節點就會消失。
Master 選舉:
Master 選舉則是 zookeeper 中最為經典的使用場景了,在分布式環境中,相同的業務應用分布在不同的機器上,有些業務邏輯,例如 一些耗時的計算,網絡 I/O 處,往往只需要讓整個集群中的某一臺機器進行執行,其余機器可以共享這個結果,這樣可以大大減少重復勞動,提高性能,于是這個 master 選舉便是這種場景下的碰到的主要問題。
利用 ZooKeeper 中兩個特性,就可以實施另一種集群中 Master 選舉:
① ? 利用 ZooKeeper 的強一致性,能夠保證在分布式高并發情況下節點創建的全局唯一性,即:同時有多個客戶端請求創建 /Master 節點,最終一定只有一個客戶端請求能夠創建成功。利用這個特性,就能很輕易的在分布式環境中進行集群選舉了。
② 另外,這種場景演化一下,就是動態 Master 選舉。這就要用到 ?EPHEMERAL_SEQUENTIAL 類型節點的特性了,這樣每個節點會自動被編號。允許所有請求都能夠創建成功,但是得有個創建順序,每次選取序列號最小的那個機器作為 Master 。
(2) 應用
在搜索系統中,如果集群中每個機器都生成一份全量索引,不僅耗時,而且不能保證彼此間索引數據一致。因此讓集群中的 Master 來迚行全量索引的生成,然后同步到集群中其它機器。另外, Master 選丼的容災措施是,可以隨時迚行手動挃定 master ,就是說應用在 zk 在無法獲取 master 信息時,可以通過比如 http 方式,向一個地方獲取 master 。 l 在 Hbase 中,也是使用 ZooKeeper 來實現動態 HMaster 的選舉。在 Hbase 實現中,會在 ZK 上存儲一些 ROOT 表的地址和 HMaster 的地址, HRegionServer 也會把自己以臨時節點( Ephemeral )的方式注冊到 Zookeeper 中,使得 HMaster 可以隨時感知到各個 HRegionServer 的存活狀態,同時,一旦 HMaster 出現問題,會重新選丼出一個 HMaster 來運行,從而避免了 HMaster 的單點問題的存活狀態,同時,一旦 HMaster 出現問題,會重新選丼出一個 HMaster 來運行,從而避免了 HMaster 的單點問題。
(3) 應用舉例
集群監控:
應用集群中,我們常常需要讓每一個機器知道集群中或依賴的其他某一個集群中哪些機器是活著的,并且在集群機器因為宕機,網絡斷鏈等原因能夠不在人工介入的情況下迅速通知到每一個機器, Zookeeper 能夠很容易的實現集群管理的功能,如有多臺 Server 組成一個服務集群,那么必須要一個 " 總管 " 知道當前集群中每臺機器的服務狀態,一旦有機器不能提供服務,集群中其它集群必須知道,從而做出調整重新分配服務策略。同樣當增加集群的服務能力時,就會增加一臺或多臺 Server ,同樣也必須讓 " 總管 " 知道,這就是 ZooKeeper 的集群監控功能。
圖2.4 集群管理結構圖
比如我在 zookeeper 服務器端有一個 znode 叫 /Configuration ,那么集群中每一個機器啟動的時候都去這個節點下創建一個 EPHEMERAL 類型的節點,比如 server1 創建 /Configuration ?/Server1, server2 創建 /Configuration ?/Server1,然后 Server1 和 Server2 都 watch /Configuration 這個父節點,那么也就是這個父節點下數據或者子節點變化都會通知對該節點進行 watch 的客戶端。因為 EPHEMERAL 類型節點有一個很重要的特性,就是客戶端和服務器端連接斷掉或者 session 過期就會使節點消失,那么在某一個機器掛掉或者斷鏈的時候,其對應的節點就會消 失,然后集群中所有對 /Configuration 進行 watch 的客戶端都會收到通知,然后取得最新列表即可。
Master 選舉:
Zookeeper 不僅能夠維護當前的集群中機器的服務狀態,而且能夠選出一個 " 總管 " ,讓這個總管來管理集群,這就是 Zookeeper 的另一個功能 Leader Election 。 Zookeeper 如何實現 Leader Election ,也就是選出一個 Master Server 。和前面的一樣每臺 Server 創建一個 EPHEMERAL 目錄節點,不同的是它還是一個 SEQUENTIAL 目錄節點,所以它是個 EPHEMERAL_SEQUENTIAL 目錄節點。之所以它是 EPHEMERAL_SEQUENTIAL 目錄節點,是因為我們可以給每臺 Server 編號,我們可以選擇當前是最小編號的 Server 為 Master ,假如這個最小編號的 Server 死去,由于是 EPHEMERAL 節點,死去的 Server 對應的節點也被刪除,所以當前的節點列表中又出現一個最小編號的節點,我們就選擇這個節點為當前 Master 。這樣就實現了動態選擇 Master ,避免了傳統意義上單 Master 容易出現單點故障的問題。
清單 3 Leader Election 代碼
package?org.zk.leader.election; import?org.apache.log4j.Logger; import?org.apache.zookeeper.CreateMode; import?org.apache.zookeeper.KeeperException; import?org.apache.zookeeper.WatchedEvent; import?org.apache.zookeeper.ZooDefs; import?org.apache.zookeeper.data.Stat; import?java.net.InetAddress; import?java.net.UnknownHostException; /** ?* LeaderElection ?* <p/> ?* Author By: sunddenly 工作室 ?* Created Date: 2014-11-13 ?*/ public?class?LeaderElection?extends?TestMainClient { ????public?static?final?Logger logger = Logger.getLogger(LeaderElection.class); ????public?LeaderElection(String connectString, String root) { ????????super(connectString); ????????this.root = root; ????????if?(zk !=?null) { ????????????try?{ ????????????????Stat s = zk.exists(root,?false); ????????????????if?(s ==?null) { ????????????????????zk.create(root,?new?byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); ????????????????} ????????????}?catch?(KeeperException e) { ????????????????logger.error(e); ????????????}?catch?(InterruptedException e) { ????????????????logger.error(e); ????????????} ????????} ????} ????void?findLeader()?throws?InterruptedException, UnknownHostException, KeeperException { ????????byte[] leader =?null; ????????try?{ ????????????leader = zk.getData(root + "/leader",?true,?null); ????????}?catch?(KeeperException e) { ????????????if?(e?instanceof?KeeperException.NoNodeException) { ????????????????logger.error(e); ????????????}?else?{ ????????????????throw?e; ????????????} ????????} ????????if?(leader !=?null) { ????????????following(); ????????}?else?{ ????????????String newLeader =?null; ????????????byte[] localhost = InetAddress.getLocalHost().getAddress(); ????????????try?{ ????????????????newLeader = zk.create(root + "/leader", localhost, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); ????????????}?catch?(KeeperException e) { ????????????????if?(e?instanceof?KeeperException.NodeExistsException) { ????????????????????logger.error(e); ????????????????}?else?{ ????????????????????throw?e; ????????????????} ????????????} ????????????if?(newLeader !=?null) { ????????????????leading(); ????????????}?else?{ ????????????????mutex.wait(); ????????????} ????????} ????} ????@Override ????public?void?process(WatchedEvent event) { ????????if?(event.getPath().equals(root + "/leader") && event.getType() == Event.EventType.NodeCreated) { ????????????System.out.println(" 得到通知 "); ????????????super.process(event); ????????????following(); ????????} ????} ????void?leading() { ????????System.out.println(" 成為領導者 "); ????} ????void?following() { ????????System.out.println(" 成為組成員 "); ????} ????public?static?void?main(String[] args) { ????????String connectString = "localhost:2181"; ????????LeaderElection le =?new?LeaderElection(connectString, "/GroupMembers"); ????????try?{ ????????????le.findLeader(); ????????}?catch?(Exception e) { ????????????logger.error(e); ????????} ????} }
2.6 隊列管理
Zookeeper 可以處理兩種類型的隊列:
① ? 當一個隊列的成員都聚齊時,這個隊列才可用,否則一直等待所有成員到達,這種是同步隊列。
② ? 隊列按照 FIFO 方式進行入隊和出隊操作,例如實現生產者和消費者模型。
(1)? 同步隊列用 Zookeeper 實現的實現思路如下:
創建一個父目錄 /synchronizing ,每個成員都監控標志( Set Watch )位目錄 /synchronizing/start 是否存在,然后每個成員都加入這個隊列,加入隊列的方式就是創建 /synchronizing/member_i 的臨時目錄節點,然后每個成員獲取 / synchronizing 目錄的所有目錄節點,也就是 member_i 。判斷 i 的值是否已經是成員的個數,如果小于成員個數等待 /synchronizing/start 的出現,如果已經相等就創建 /synchronizing/start 。
用下面的流程圖更容易理解:
圖 2.5 同步隊列流程圖
清單 4 Synchronizing 代碼
package?org.zk.queue; import?java.net.InetAddress; import?java.net.UnknownHostException; import?java.util.List; import?org.apache.log4j.Logger; import?org.apache.zookeeper.CreateMode; import?org.apache.zookeeper.KeeperException; import?org.apache.zookeeper.WatchedEvent; import?org.apache.zookeeper.Watcher; import?org.apache.zookeeper.ZooKeeper; import?org.apache.zookeeper.ZooDefs.Ids; import?org.apache.zookeeper.data.Stat; import?org.zk.leader.election.TestMainClient; /** ?* Synchronizing ?* <p/> ?* Author By: sunddenly 工作室 ?* Created Date: 2014-11-13 ?*/ public?class?Synchronizing?extends?TestMainClient { ????int?size; ????String name; ????public?static?final?Logger logger = Logger.getLogger(Synchronizing.class); ????/** ?????*? 構造函數 ?????* ?????* @param connectString? 服務器連接 ?????* @param root? 根目錄 ?????* @param size? 隊列大小 ?????*/ ????Synchronizing(String connectString, String root,?int?size) { ????????super(connectString); ????????this.root = root; ????????this.size = size; ????????if?(zk !=?null) { ????????????try?{ ????????????????Stat s = zk.exists(root,?false); ????????????????if?(s ==?null) { ????????????????????zk.create(root,?new?byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); ????????????????} ????????????}?catch?(KeeperException e) { ????????????????logger.error(e); ????????????}?catch?(InterruptedException e) { ????????????????logger.error(e); ????????????} ????????} ????????try?{ ????????????name =?new?String(InetAddress.getLocalHost().getCanonicalHostName().toString()); ????????}?catch?(UnknownHostException e) { ????????????logger.error(e); ????????} ????} ????/** ?????*? 加入隊列 ?????* ?????* @return ?????* @throws KeeperException ?????* @throws InterruptedException ?????*/ ????void?addQueue()?throws?KeeperException, InterruptedException{ ????????zk.exists(root + "/start",true); ????????zk.create(root + "/" + name,?new?byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL); ????????synchronized?(mutex) { ????????????List<String> list = zk.getChildren(root,?false); ????????????if?(list.size() < size) { ????????????????mutex.wait(); ????????????}?else?{ ????????????????zk.create(root + "/start",?new?byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); ????????????} ????????} ????} ????@Override ????public?void?process(WatchedEvent event) { ????????if(event.getPath().equals(root + "/start") && event.getType() == Event.EventType.NodeCreated){ ????????????System.out.println(" 得到通知 "); ????????????super.process(event); ????????????doAction(); ????????} ????} ????/** ?????*? 執行其他任務 ?????*/ ????private?void?doAction(){ ????????System.out.println(" 同步隊列已經得到同步,可以開始執行后面的任務了 "); ????} ????public?static?void?main(String args[]) { ????????// 啟動 Server ????????String connectString = "localhost:2181"; ????????int?size = 1; ????????Synchronizing b =?new?Synchronizing(connectString, "/synchronizing", size); ????????try{ ????????????b.addQueue(); ????????}?catch?(KeeperException e){ ????????????logger.error(e); ????????}?catch?(InterruptedException e){ ????????????logger.error(e); ????????} ????} }
(2)? FIFO 隊列用 Zookeeper 實現思路如下:
實現的思路也非常簡單,就是在特定的目錄下創建 SEQUENTIAL 類型的子目錄 /queue_i ,這樣就能保證所有成員加入隊列時都是有編號的,出隊列時通過 getChildren( ) 方法可以返回當前所有的隊列中的元素,然后消費其中最小的一個,這樣就能保證 FIFO 。
下面是生產者和消費者這種隊列形式的示例代碼
清單 5 FIFOQueue 代碼
import?org.apache.log4j.Logger; import?org.apache.zookeeper.CreateMode; import?org.apache.zookeeper.KeeperException; import?org.apache.zookeeper.WatchedEvent; import?org.apache.zookeeper.ZooDefs; import?org.apache.zookeeper.data.Stat; import?java.nio.ByteBuffer; import?java.util.List; /** ?* FIFOQueue ?* <p/> ?* Author By: sunddenly 工作室 ?* Created Date: 2014-11-13 ?*/ public?class?FIFOQueue?extends?TestMainClient{ ????public?static?final?Logger logger = Logger.getLogger(FIFOQueue.class); ????/** ?????* Constructor ?????* ?????* @param connectString ?????* @param root ?????*/ ????FIFOQueue(String connectString, String root) { ????????super(connectString); ????????this.root = root; ????????if?(zk !=?null) { ????????????try?{ ????????????????Stat s = zk.exists(root,?false); ????????????????if?(s ==?null) { ????????????????????zk.create(root,?new?byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); ????????????????} ????????????}?catch?(KeeperException e) { ????????????????logger.error(e); ????????????}?catch?(InterruptedException e) { ????????????????logger.error(e); ????????????} ????????} ????} ????/** ?????*? 生產者 ?????* ?????* @param i ?????* @return ?????*/ ????boolean?produce(int?i)?throws?KeeperException, InterruptedException{ ????????ByteBuffer b = ByteBuffer.allocate(4); ????????byte[] value; ????????b.putInt(i); ????????value = b.array(); ????????zk.create(root + "/element", value, ZooDefs.Ids.OPEN_ACL_UNSAFE, ????????????????????CreateMode.PERSISTENT_SEQUENTIAL); ????????return?true; ????} ????/** ?????*? 消費者 ?????* ?????* @return ?????* @throws KeeperException ?????* @throws InterruptedException ?????*/ ????int?consume()?throws?KeeperException, InterruptedException{ ????????int?retvalue = -1; ????????Stat stat =?null; ????????while?(true) { ????????????synchronized?(mutex) { ????????????????List<String> list = zk.getChildren(root,?true); ????????????????if?(list.size() == 0) { ????????????????????mutex.wait(); ????????????????}?else?{ ????????????????????Integer min =?new?Integer(list.get(0).substring(7)); ????????????????????for(String s : list){ ????????????????????????Integer tempValue =?new?Integer(s.substring(7)); ????????????????????????if(tempValue < min) min = tempValue; ????????????????????} ????????????????????byte[] b = zk.getData(root + "/element" + min,false, stat); ????????????????????zk.delete(root + "/element" + min, 0); ????????????????????ByteBuffer buffer = ByteBuffer.wrap(b); ????????????????????retvalue = buffer.getInt(); ????????????????????return?retvalue; ????????????????} ????????????} ????????} ????} ????@Override ????public?void?process(WatchedEvent event) { ????????super.process(event); ????} ????public?static?void?main(String args[]) { ????????// 啟動 Server ????????TestMainServer.start(); ????????String connectString = "localhost:"+TestMainServer.CLIENT_PORT; ????????FIFOQueue q =?new?FIFOQueue(connectString, "/app1"); ????????int?i; ????????Integer max =?new?Integer(5); ????????System.out.println("Producer"); ????????for?(i = 0; i < max; i++) ????????????try{ ????????????????q.produce(10 + i); ????????????}?catch?(KeeperException e){ ????????????????logger.error(e); ????????????}?catch?(InterruptedException e){ ????????????????logger.error(e); ????????????} ????????for?(i = 0; i < max; i++) { ????????????try{ ????????????????int?r = q.consume(); ????????????????System.out.println("Item:?" + r); ????????????}?catch?(KeeperException e){ ????????????????i--; ????????????????logger.error(e); ????????????}?catch?(InterruptedException e){ ????????????????logger.error(e); ????????????} ????????} ????} }
三、 ZooKeeper 實際應用
假設我們的集群有:
(1)?20 個搜索引擎的服務器 :每個負責總索引中的一部分的搜索任務。
① ? 搜索引擎的服務器中的 15 個服務器現在提供搜索服務。
② ?5 個服務器正在生成索引。
這 20 個搜索引擎的服務器,經常要讓正在提供搜索服務的服務器停止提供服務開始生成索引 , 或生成索引的服務器已經把索引生成完成可以搜索提供服務了。
(2)? 一個總服務器 :負責向這20 個搜索引擎的服務器發出搜索請求并合并結果集。
(3)? 一個備用的總服務器 :負責當總服務器宕機時替換總服務器。
(4)? 一個 web 的 cgi :向總服務器發出搜索請求。
使用 Zookeeper 可以保證:
(1)? 總服務器: 自動感知有多少提供搜索引擎的服務器,并向這些服務器發出搜索請求。
(2)? 備用的總服務器: 宕機時自動啟用備用的總服務器。
(3)?web 的 cgi : 能夠自動地獲知總服務器的網絡地址變化。
(4) 實現如下:
① ? 提供搜索 引擎的服務器都在Zookeeper 中創建 znode , zk.create( "/search/nodes/node1",? "hostname".getBytes(),? Ids.OPEN_ACL_UNSAFE, CreateFlags.EPHEMERAL) ;
② 總服務器 可以從Zookeeper 中獲取一個 znode 的子節點的列表, zk.getChildren("/search/nodes", true);
③ ? 總服務器 遍歷這些子節點,并獲取子節點的數據生成提供搜索引擎的服務器列表;
④ ? 當總服務器 接收到子節點改變的事件信息, 重新返回第二步;
⑤ ? 總服務器 在Zookeeper 中創建節點, zk.create ("/search/master", "hostname".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateFlags.EPHEMERAL);
⑥ 備用的總服務器 監控Zookeeper 中的 "/search/master" 節點。當這個 znode 的節點數據改變時,把自己啟動變成總服務器,并把自己的網絡地址數據放進這個節點。
⑦ ?web 的 cgi 從 Zookeeper 中 "/search/master" 節點獲取總服務器的網絡地址數據,并向其發送搜索請求。
⑧ ?web 的 cgi 監控 Zookeeper 中的 "/search/master" 節點,當這個 znode 的節點數據改變時,從這個節點獲取總服務器的網絡地址數據 , 并改變當前的總服務器的網絡地址。
《新程序員》:云原生和全面數字化實踐 50位技術專家共同創作,文字、視頻、音頻交互閱讀
總結
以上是生活随笔 為你收集整理的分布式服务框架 Zookeeper — 管理分布式环境中的数据 的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。