Giraph Source Code Analysis (1): Starting the ZooKeeper Service
Author | Bai Song (白松)
Introduction to Giraph:
Apache Giraph is an iterative graph processing system built for high scalability. For example, it is currently used at Facebook to analyze the social graph formed by users and their connections. Giraph originated as the open-source counterpart to Pregel, the graph processing architecture developed at Google and described in a 2010 paper. Both systems are inspired by the Bulk Synchronous Parallel model of distributed computation introduced by Leslie Valiant. Giraph adds several features beyond the basic Pregel model, including master computation, sharded aggregators, edge-oriented input, out-of-core computation, and more. With a steady development cycle and a growing community of users worldwide, Giraph is a natural choice for unleashing the potential of structured datasets at a massive scale.
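How it works:
Giraph is built on Hadoop. It wraps the MapReduce Mapper and does not use a reducer. The Mapper runs multiple iterations, and each iteration is equivalent to one superstep in the BSP model. One Hadoop job is equivalent to one BSP job. The basic architecture consists of three parts: ZooKeeper, Master, and Worker.
The role of each part is as follows: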
1. ZooKeeper: responsible for computation state
–partition/worker mapping
–global state: #superstep
–checkpoint paths, aggregator values, statistics
2. Master: responsible for coordination
–assigns partitions to workers
–coordinates synchronization
–requests checkpoints
–aggregates aggregator values
–collects health statuses
3. Worker: responsible for vertices
–invokes the compute() function of active vertices
–sends, receives and assigns messages
–computes local aggregation values
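The iteration scheme described above (repeated supersteps inside one Mapper, with active vertices running compute() and exchanging messages) can be illustrated with a minimal single-process sketch. This is not Giraph code: the class BspSketch, its method names, and the tiny sample graph are assumptions made purely for illustration, and shortest paths is used only because the test program in this article is SSSP.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Single-process illustration of BSP supersteps; not Giraph's implementation. */
final class BspSketch {
  /** Hypothetical weighted edge: target vertex id and edge length. */
  static final class Edge {
    final int target;
    final double weight;
    Edge(int target, double weight) {
      this.target = target;
      this.weight = weight;
    }
  }

  /** Supersteps executed by the most recent call to shortestPaths(). */
  static int superstepsRun = 0;

  /** Small example graph; every vertex id must appear as a key. */
  static Map<Integer, List<Edge>> sampleGraph() {
    Map<Integer, List<Edge>> g = new HashMap<>();
    g.put(0, Arrays.asList(new Edge(1, 1.0), new Edge(2, 4.0)));
    g.put(1, Arrays.asList(new Edge(2, 2.0)));
    g.put(2, new ArrayList<Edge>());
    return g;
  }

  /** Messages sent in superstep s only become visible in superstep s + 1. */
  static Map<Integer, Double> shortestPaths(Map<Integer, List<Edge>> graph, int source) {
    Map<Integer, Double> dist = new HashMap<>();
    for (Integer v : graph.keySet()) {
      dist.put(v, Double.POSITIVE_INFINITY);
    }
    // Inbox of the current superstep: vertex id -> candidate distances.
    Map<Integer, List<Double>> inbox = new HashMap<>();
    inbox.computeIfAbsent(source, k -> new ArrayList<>()).add(0.0);
    superstepsRun = 0;
    // A vertex is active only if it received messages; stop when none did.
    while (!inbox.isEmpty()) {
      Map<Integer, List<Double>> outbox = new HashMap<>();
      for (Map.Entry<Integer, List<Double>> entry : inbox.entrySet()) {
        int v = entry.getKey();
        double best = Double.POSITIVE_INFINITY;
        for (double message : entry.getValue()) {
          best = Math.min(best, message);
        }
        if (best < dist.get(v)) {
          dist.put(v, best);
          for (Edge edge : graph.get(v)) {
            outbox.computeIfAbsent(edge.target, k -> new ArrayList<>())
                .add(best + edge.weight);
          }
        }
      }
      inbox = outbox; // plays the role of the synchronization barrier
      superstepsRun++;
    }
    return dist;
  }

  public static void main(String[] args) {
    System.out.println(shortestPaths(sampleGraph(), 0));
    System.out.println("supersteps: " + superstepsRun);
  }
}
```

In Giraph itself the vertices are partitioned across workers and the barrier is coordinated through the master and ZooKeeper; the sketch collapses all of that into a single loop.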
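Notes
(1) Test environment
Three servers: test165, test62, and test63. test165 serves as both the JobTracker and a TaskTracker.
Test example: the SSSP program that ships with the official distribution; the input data was generated synthetically by the author.
Command: hadoop jar giraph-examples-1.0.0-for-hadoop-0.20.203.0-jar-with-dependencies.jar org.apache.giraph.GiraphRunner org.apache.giraph.examples.SimpleShortestPathsVertex -vif org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat -vip /user/giraph/SSSP -of org.apache.giraph.io.formats.IdWithValueTextOutputFormat -op /user/giraph/output-sssp-debug-7 -w 5
(2) To save space, only core code fragments are discussed below.
(3) In core-site.xml, hadoop.tmp.dir is set to /home/hadoop/hadooptmp
(4) This article was written over several debugging sessions, so the JobIDs in the text differ from one another; readers can treat them as the same JobID.
(5) Later articles in this series follow the same conventions.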
The org.apache.giraph.graph.GraphMapper class
Giraph defines its own org.apache.giraph.graph.GraphMapper class, which extends Hadoop's org.apache.hadoop.mapreduce.Mapper class and overrides the setup(), map(), cleanup(), and run() methods. The class is documented as follows:
“This mapper that will execute the BSP graph tasks alloted to this worker. All tasks will be performed by calling the GraphTaskManager object managed by this GraphMapper wrapper classs. Since this mapper will not be passing data by key-value pairs through the MR framework, the Mapper parameter types are irrelevant, and set to Object type.”
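The BSP computation logic is encapsulated in the GraphMapper class, which holds a GraphTaskManager object that manages the job's tasks. Each GraphMapper object corresponds to one compute node in BSP.
In GraphMapper's setup() method, a GraphTaskManager object is created and its setup() method is called to perform some initialization.
The map() method is empty, because all operations are encapsulated in the GraphTaskManager class. The run() method calls the execute() method of the GraphTaskManager object to carry out the BSP iterations.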
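The delegation pattern described above can be sketched without any Hadoop dependency. The two classes below are hypothetical stand-ins (SketchGraphMapper for GraphMapper, SketchTaskManager for GraphTaskManager), and the call order in run() simply mirrors the usual Mapper lifecycle of setup, work, cleanup; it is an assumption, not a copy of Giraph's source.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in for GraphTaskManager; it only records which phase ran. */
final class SketchTaskManager {
  final List<String> log = new ArrayList<>();

  void setup() {
    log.add("setup");
  }

  void execute() {
    log.add("execute");
  }

  void cleanup() {
    log.add("cleanup");
  }
}

/** Stand-in for a Mapper subclass that delegates all work to the task manager. */
final class SketchGraphMapper {
  private SketchTaskManager taskManager;

  void setup() {
    taskManager = new SketchTaskManager();
    taskManager.setup();
  }

  /** Intentionally empty: no key-value pairs flow through the MR framework. */
  void map(Object key, Object value) {
  }

  void cleanup() {
    taskManager.cleanup();
  }

  /** Drives the whole task and returns the recorded phases. */
  List<String> run() {
    setup();
    taskManager.execute(); // the BSP iterations would happen here
    cleanup();
    return taskManager.log;
  }

  public static void main(String[] args) {
    System.out.println(new SketchGraphMapper().run());
  }
}
```

Keeping the mapper this thin means the BSP logic does not depend on the MapReduce API, which matches the intent stated in the class documentation quoted above.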
The org.apache.giraph.graph.GraphTaskManager class
Purpose: The Giraph-specific business logic for a single BSP compute node in whatever underlying type of cluster our Giraph job will run on. Owning object will provide the glue into the underlying cluster framework and will call this object to perform Giraph work.
Its setup() method is described next.
The methods it calls are covered in turn:
1. locateZookeeperClasspath(zkPathList)
Locates the local copy of the ZK jar, which is used to start the ZooKeeper service. Its path is: /home/hadoop/hadooptmp/mapred/local/taskTracker/root/jobcache/job_201403270456_0001/jars/job.jar
2. startZooKeeperManager(): initializes and configures the ZooKeeperManager.
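3. The org.apache.giraph.zk.ZooKeeperManager class
Purpose: Manages the election of ZooKeeper servers, starting/stopping the services, etc.
The setup() method of the ZooKeeperManager class does its work through createCandidateStamp() and getZooKeeperServerList(), described below.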
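The createCandidateStamp() method creates one file per task under the HDFS directory _bsp/_defaultZkManagerDir/job_201403301409_0006/_task. Each file is empty, and its name is the local hostname followed by the taskPartition (for example, test162 0).
The job was run with 5 workers (-w 5), plus one master, so there are 6 tasks in total.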
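The stamp-file idea can be tried on a local filesystem. The sketch below uses java.nio.file instead of HDFS, and the class and method names (CandidateStampSketch, newTempDir, countStamps) are hypothetical; only the naming scheme, hostname plus a space plus taskPartition, is taken from the file names shown in this article.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

/** Local-filesystem illustration of per-task stamp files; not Giraph code. */
final class CandidateStampSketch {
  static Path newTempDir() {
    try {
      return Files.createTempDirectory("candidate-stamp-sketch");
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Creates the empty marker file "<hostname> <taskPartition>" under taskDir. */
  static Path createCandidateStamp(Path taskDir, String hostname, int taskPartition) {
    try {
      Files.createDirectories(taskDir);
      Path stamp = taskDir.resolve(hostname + " " + taskPartition);
      if (!Files.exists(stamp)) {
        Files.createFile(stamp);
      }
      return stamp;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Counts the stamp files currently present in taskDir. */
  static long countStamps(Path taskDir) {
    try (Stream<Path> files = Files.list(taskDir)) {
      return files.count();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    Path taskDir = newTempDir().resolve("_task");
    // Five workers plus one master, as in the run described in this article.
    for (int partition = 0; partition < 6; partition++) {
      createCandidateStamp(taskDir, "test162", partition);
    }
    System.out.println("stamp files: " + countStamps(taskDir));
  }
}
```

Because all information is carried by the file name, other tasks can discover candidates with a plain directory listing and never need to open a file.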
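In getZooKeeperServerList(), the task whose taskPartition is 0 calls createZooKeeperServerList() to create the ZooKeeper server list. This is again an empty file whose name describes the ZooKeeper servers.
createZooKeeperServerList() first lists the files under taskDirectory (_bsp/_defaultZkManagerDir/job_201403301409_0006/_task). For every file found, the hostname and taskPartition encoded in the file name are stored in hostNameTaskMap. After a scan of taskDirectory, if the size of hostNameTaskMap has reached serverCount (the ZOOKEEPER_SERVER_COUNT value in GiraphConstants.java, defined as 1), the outer loop stops. The outer loop exists because the task files under taskDirectory are created by multiple tasks running in a distributed setting, so when task 0 builds the server list some other tasks may not have created their task files yet. By default Giraph starts one ZooKeeper service per job, which means only one task starts the ZooKeeper service.
In repeated tests, task 0 was always chosen as the ZooKeeper server. Task 0 scans taskDirectory in the same process that has just created its own task file, so at that moment only its own file is present (the other tasks' files have not been created yet). It leaves the for loop, finds that the size of hostNameTaskMap equals 1, and exits the while loop immediately. In this run the choice was therefore test162 0.
Finally, the file _bsp/_defaultZkManagerDir/job_201403301409_0006/zkServerList_test162 0 is created.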
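The election steps above (scan the task directory, collect hostname and partition from the file names, stop once enough candidates exist, then publish the choice as an empty file) can be sketched locally as follows. Everything here is an illustrative assumption: the class ServerListSketch, the polling parameters, the use of a sorted map to make the choice deterministic, and the way several servers would be joined in the file name. Only the zkServerList_ prefix and the "hostname partition" naming come from this article.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Stream;

/** Local-filesystem illustration of the server-list election; not Giraph code. */
final class ServerListSketch {
  static Path newTempDir() {
    try {
      return Files.createTempDirectory("zk-election-sketch");
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Creates an empty file (and its parent directories) if it does not exist. */
  static Path touch(Path file) {
    try {
      Files.createDirectories(file.getParent());
      return Files.exists(file) ? file : Files.createFile(file);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Parses "<hostname> <taskPartition>" file names into hostname -> partition. */
  static Map<String, Integer> scanTaskDirectory(Path taskDir) {
    Map<String, Integer> hostNameTaskMap = new TreeMap<>();
    try (Stream<Path> files = Files.list(taskDir)) {
      files.forEach(f -> {
        String[] parts = f.getFileName().toString().split(" ");
        if (parts.length == 2) {
          hostNameTaskMap.putIfAbsent(parts[0], Integer.valueOf(parts[1]));
        }
      });
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return hostNameTaskMap;
  }

  /** Rescans until serverCount hosts are seen, then writes the marker file. */
  static Path createServerList(Path baseDir, Path taskDir, int serverCount,
                               int maxPolls, long pollMillis) {
    Map<String, Integer> candidates = scanTaskDirectory(taskDir);
    for (int poll = 1; candidates.size() < serverCount && poll < maxPolls; poll++) {
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted while waiting", e);
      }
      candidates = scanTaskDirectory(taskDir);
    }
    if (candidates.size() < serverCount) {
      throw new IllegalStateException("not enough candidate tasks");
    }
    StringBuilder name = new StringBuilder("zkServerList_");
    int chosen = 0;
    for (Map.Entry<String, Integer> e : candidates.entrySet()) {
      if (chosen == serverCount) {
        break;
      }
      if (chosen > 0) {
        name.append(' ');
      }
      name.append(e.getKey()).append(' ').append(e.getValue());
      chosen++;
    }
    return touch(baseDir.resolve(name.toString()));
  }

  public static void main(String[] args) {
    Path base = newTempDir();
    Path tasks = base.resolve("_task");
    touch(tasks.resolve("test162 0")); // only task 0 has stamped so far
    System.out.println(createServerList(base, tasks, 1, 5, 100L).getFileName());
  }
}
```

With serverCount set to 1, the first scan already satisfies the stop condition as soon as task 0's own stamp exists, which is consistent with the observation that task 0 is always selected.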
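onlineZooKeeperServers(): based on the zkServerList_test162 0 file, task 0 first generates the zoo.cfg configuration file and uses ProcessBuilder to create the ZooKeeper server process. Task 0 then connects to that process over a socket, and finally creates the file _bsp/_defaultZkManagerDir/job_201403301409_0006/_zkServer/test162 0 to signal that the master's part is done. Meanwhile, each worker keeps looping to check whether the master has created _bsp/_defaultZkManagerDir/job_201403301409_0006/_zkServer/test162 0; in other words, the workers wait until the ZooKeeper service on the master has finished starting.
The command that starts the ZooKeeper service runs Java with the job.jar located in step 1 on its classpath.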
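To make the startup sequence more concrete, here is a hedged sketch of its three ingredients: writing a minimal zoo.cfg, assembling a launch command for ProcessBuilder, and probing the client port over a socket. It is not the code Giraph uses. The class ZooKeeperLaunchSketch, the paths, the port, and the tickTime value are assumptions for illustration; tickTime, dataDir, and clientPort are standard ZooKeeper settings, and org.apache.zookeeper.server.quorum.QuorumPeerMain is ZooKeeper's server entry point, which takes the configuration file path as its argument.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

/** Illustration of preparing a ZooKeeper launch; not Giraph's implementation. */
final class ZooKeeperLaunchSketch {
  /** Builds the text of a minimal standalone zoo.cfg. */
  static String buildZooCfg(String dataDir, int clientPort) {
    StringBuilder cfg = new StringBuilder();
    cfg.append("tickTime=2000\n"); // illustrative value
    cfg.append("dataDir=").append(dataDir).append('\n');
    cfg.append("clientPort=").append(clientPort).append('\n');
    return cfg.toString();
  }

  /** Assembles: java -cp <jar> QuorumPeerMain <zoo.cfg>. */
  static List<String> buildCommand(String javaHome, String jarPath, String cfgPath) {
    List<String> command = new ArrayList<>();
    command.add(javaHome + "/bin/java");
    command.add("-cp");
    command.add(jarPath);
    command.add("org.apache.zookeeper.server.quorum.QuorumPeerMain");
    command.add(cfgPath);
    return command;
  }

  /** Returns true if a TCP connection to host:port succeeds within the timeout. */
  static boolean canConnect(String host, int port, int timeoutMillis) {
    try (Socket socket = new Socket()) {
      socket.connect(new InetSocketAddress(host, port), timeoutMillis);
      return true;
    } catch (IOException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    // All paths and the port below are hypothetical.
    System.out.print(buildZooCfg("/tmp/zk-sketch/data", 22181));
    List<String> command = buildCommand(
        System.getProperty("java.home"), "/tmp/zk-sketch/job.jar", "/tmp/zk-sketch/zoo.cfg");
    ProcessBuilder builder = new ProcessBuilder(command);
    builder.redirectErrorStream(true);
    // builder.start() is deliberately not called: the jar does not exist here.
    System.out.println(builder.command());
  }
}
```

After starting the process, the launching task would call something like canConnect() in a retry loop before creating the marker file that the workers are polling for.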
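4. determineGraphFunctions()
The GraphTaskManager class holds a CentralizedServiceMaster object and a CentralizedServiceWorker object, corresponding to the master and the worker respectively. The role played by each BSP compute node is decided as follows: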
a) If not split master, everyone does the everything and/or running ZooKeeper.
b) If split master/worker, masters also run ZooKeeper
c) If split master/worker == true and giraph.zkList is set, the master will not instantiate a ZK instance, but will assume a quorum is already active on the cluster for Giraph to use.
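This decision is implemented in the static method determineGraphFunctions() of the GraphTaskManager class.
By default, Giraph separates the master from the workers. The ZooKeeper service is started on the master and not on the workers. Task 0 is therefore master + ZooKeeper, and the other tasks are workers.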
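Rules a) to c) can be written down as a small decision function. The sketch below is an interpretation of those rules, not the Giraph source: the enum Role, its constant names, and the parameters are hypothetical, and the way a master is picked when an external ZooKeeper list is configured (taskPartition below masterCount) is an assumption.

```java
/** Illustration of the role decision described by rules a) to c). */
final class GraphFunctionsSketch {
  /** Hypothetical role names for one BSP compute node. */
  enum Role {
    ALL,                   // master + worker + ZooKeeper
    ALL_EXCEPT_ZOOKEEPER,  // master + worker
    MASTER_ZOOKEEPER_ONLY, // master that also runs ZooKeeper
    MASTER_ONLY,           // master relying on an external ZooKeeper quorum
    WORKER_ONLY
  }

  /**
   * @param splitMasterWorker whether master and worker roles are separated
   * @param zkListProvided    whether giraph.zkList points at an existing quorum
   * @param runsZooKeeper     whether this task was elected to run ZooKeeper
   * @param taskPartition     this task's partition number
   * @param masterCount       number of masters (assumed selection rule)
   */
  static Role determine(boolean splitMasterWorker, boolean zkListProvided,
                        boolean runsZooKeeper, int taskPartition, int masterCount) {
    if (!splitMasterWorker) {
      // Rule a): everyone does everything, with or without ZooKeeper.
      return runsZooKeeper ? Role.ALL : Role.ALL_EXCEPT_ZOOKEEPER;
    }
    if (zkListProvided) {
      // Rule c): no task starts ZooKeeper; an active quorum is assumed.
      return taskPartition < masterCount ? Role.MASTER_ONLY : Role.WORKER_ONLY;
    }
    // Rule b): the task that runs ZooKeeper is also the master.
    return runsZooKeeper ? Role.MASTER_ZOOKEEPER_ONLY : Role.WORKER_ONLY;
  }

  public static void main(String[] args) {
    // Default setup from this article: split roles, no external ZooKeeper list.
    for (int task = 0; task < 6; task++) {
      System.out.println("task " + task + ": " + determine(true, false, task == 0, task, 1));
    }
  }
}
```

Under the default settings described in this article, this yields one master that also runs ZooKeeper (task 0) and five workers.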