ZooKeeper Source Code Reading (2)
Continuing from ZooKeeper Source Code Reading (1).
Starting a ZooKeeper server can be broken down into roughly five steps:
1. Parse the configuration file.
2. Initialize the data manager.
3. Initialize the network I/O manager.
4. Recover local data.
5. Serve client requests.
2.1 Standalone server startup
The standalone server startup flow can be divided into a pre-startup phase and an initialization phase.
1. Pre-startup
1. QuorumPeerMain is the single startup class. Whether in standalone or cluster mode, both zkServer.cmd and zkServer.sh configure QuorumPeerMain as the entry class.
2. Parse the configuration file zoo.cfg. zoo.cfg holds the basic runtime parameters such as tickTime, dataDir, and clientPort.
3. Create and start the history file cleaner DatadirCleanupManager, which periodically purges old transaction log and snapshot files.
4. Determine whether this is a cluster or a standalone startup. In standalone mode, startup is delegated to ZooKeeperServerMain.
5. Parse the zoo.cfg configuration file again.
6. Create the server instance ZooKeeperServer. ZooKeeper first creates the server instance and then initializes it, including the connection factory, the in-memory database, the request processors, and the other components.
2. Initialization
1. Create the server statistics collector ServerStats, which gathers runtime statistics for the ZooKeeper server.
2. Create the ZooKeeper data manager FileTxnSnapLog. FileTxnSnapLog is the adapter layer between the upper-level server and the underlying data storage; it provides a set of interfaces for operating on the data files, namely the transaction log files and the snapshot files. ZooKeeper creates it from the snapshot directory dataDir and the transaction log directory dataLogDir parsed out of zoo.cfg.
3. Set the server tickTime and the session timeout limits.
4. Create the ServerCnxnFactory. The system property zookeeper.serverCnxnFactory selects whether ZooKeeper's own NIO implementation or the Netty framework is used as the server's network connection factory.
5. Initialize the ServerCnxnFactory. ZooKeeper initializes a Thread as the main thread of the ServerCnxnFactory and then initializes the NIO server.
6. Start the ServerCnxnFactory main thread, entering the Thread's run method. At this point the server still cannot handle client requests.
7. Recover local data. At startup, data has to be recovered from the local snapshot files and transaction log files.
8. Create and start the session manager. ZooKeeper creates the SessionTracker to manage sessions.
9. Initialize ZooKeeper's request processing chain. Request processing in ZooKeeper is implemented as a chain of responsibility: several request processors handle each client request in turn, and at server startup these processors are linked into a processing chain.
10. Register the JMX services. ZooKeeper exposes some of the server's runtime information via JMX.
11. Register the ZooKeeper server instance with the ServerCnxnFactory, after which ZooKeeper can serve requests.
At this point, the standalone ZooKeeper server has finished starting.
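For orientation, the sketch below wires the pieces from the initialization list together roughly the way ZooKeeperServerMain does in the 3.4.x line; the paths, port, and tick time are illustrative values, not taken from this article.

import java.io.File;
import java.net.InetSocketAddress;

import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;

public class StandaloneStartupSketch {
    public static void main(String[] args) throws Exception {
        // data manager: bridges the server and the on-disk txn log / snapshot files
        FileTxnSnapLog txnLog = new FileTxnSnapLog(
                new File("/tmp/zk/log"), new File("/tmp/zk/data"));   // illustrative paths

        // server instance; tickTime and session limits normally come from zoo.cfg
        ZooKeeperServer zkServer = new ZooKeeperServer();
        zkServer.setTxnLogFactory(txnLog);
        zkServer.setTickTime(2000);

        // network I/O manager: NIO by default, Netty if zookeeper.serverCnxnFactory says so
        ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
        cnxnFactory.configure(new InetSocketAddress(2181), 60);

        // startup(): restores local data, creates the SessionTracker, builds the request
        // processor chain (PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor)
        // and registers the server with the factory so it can start serving clients
        cnxnFactory.startup(zkServer);
        cnxnFactory.join();
    }
}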
2.2 Cluster server startup
Standalone and cluster server startup are the same in many places. The cluster startup process can be divided into pre-startup, initialization, Leader election, the startup-time interaction between the Leader and the Followers, and finally the Leader and Follower startup.
1. Pre-startup
1. QuorumPeerMain is the single startup class.
2. Parse the configuration file zoo.cfg.
3. Create and start the history file cleaner DatadirCleanupManager.
4. Determine whether this is a cluster or a standalone startup. In cluster mode, several server addresses are configured in zoo.cfg, so cluster startup is chosen; a zoo.cfg along these lines is sketched below.
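A typical three-node zoo.cfg exercising the keys discussed in this post might look like the following; host names and paths are illustrative. In addition, each server's dataDir must contain a myid file holding that server's id.

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
server.1=zk1.example.com:2888:3888
server.2=zk2.example.com:2888:3888
server.3=zk3.example.com:2888:3888

electionAlg defaults to 3 (FastLeaderElection), in which case every participant also needs the second, election port after the quorum port.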
2. Initialization
1. Create the ServerCnxnFactory.
2. Initialize the ServerCnxnFactory.
3. Create the ZooKeeper data manager FileTxnSnapLog.
4. Create the QuorumPeer instance. QuorumPeer is specific to cluster mode: it hosts the ZooKeeper server instance (ZooKeeperServer) and represents one machine in the cluster. While running, it continuously monitors the state of the local server instance and initiates Leader election when necessary.
5. Create the in-memory database ZKDatabase. ZKDatabase manages all of ZooKeeper's session records as well as the DataTree and the transaction log storage.
6. Initialize the QuorumPeer. Core components such as the FileTxnSnapLog, ServerCnxnFactory, and ZKDatabase are registered with the QuorumPeer, and its parameters are configured: the server address list, the Leader election algorithm, the session timeout limits, and so on.
7. Recover local data.
8. Start the ServerCnxnFactory main thread.
The standalone entry point is ZooKeeperServerMain; the cluster entry point is QuorumPeerMain.
---------------------------------------------------------------------------------------------------------------------------------------------------
Let's go straight to the classes related to the election algorithm in ZooKeeper.
The election classes are AuthFastLeaderElection, FastLeaderElection, and LeaderElection; the most important election logic lives in lookForLeader(). All three implement the Election interface.
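For reference, the Election interface that all three classes implement is tiny; in the 3.4.x source it is essentially:

public interface Election {
    public Vote lookForLeader() throws InterruptedException;
    public void shutdown();
}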
The code of FastLeaderElection.lookForLeader:
/**
 * Starts a new round of leader election. Whenever our QuorumPeer
 * changes its state to LOOKING, this method is invoked, and it
 * sends notifications to all other peers.
 */
public Vote lookForLeader() throws InterruptedException {
    try {
        self.jmxLeaderElectionBean = new LeaderElectionBean();
        MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        self.jmxLeaderElectionBean = null;
    }
    if (self.start_fle == 0) {
        self.start_fle = System.currentTimeMillis();
    }
    try {
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
        int notTimeout = finalizeWait;
        synchronized (this) {
            logicalclock++;
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }
        LOG.info("New election. My id = " + self.getId() +
                ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        sendNotifications();
        /*
         * Loop in which we exchange notifications until we find a leader
         */
        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            /*
             * Remove next notification from queue, times out after 2 times
             * the termination time
             */
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
            /*
             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            if (n == null) {
                if (manager.haveDelivered()) {
                    sendNotifications();
                } else {
                    manager.connectAll();
                }
                /*
                 * Exponential backoff
                 */
                int tmpTimeOut = notTimeout * 2;
                notTimeout = (tmpTimeOut < maxNotificationInterval ? tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            } else if (self.getVotingView().containsKey(n.sid)) {
                /*
                 * Only proceed if the vote comes from a replica in the
                 * voting view.
                 */
                switch (n.state) {
                case LOOKING:
                    // If notification > current, replace and send messages out
                    if (n.electionEpoch > logicalclock) {
                        logicalclock = n.electionEpoch;
                        recvset.clear();
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                        }
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                    + Long.toHexString(n.electionEpoch)
                                    + ", logicalclock=0x" + Long.toHexString(logicalclock));
                        }
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Adding vote: from=" + n.sid +
                                ", proposed leader=" + n.leader +
                                ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                    }
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                    if (termPredicate(recvset,
                            new Vote(proposedLeader, proposedZxid, logicalclock, proposedEpoch))) {
                        // Verify if there is any change in the proposed leader
                        while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }
                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                        if (n == null) {
                            self.setPeerState((proposedLeader == self.getId()) ?
                                    ServerState.LEADING : learningState());
                            Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock, proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    LOG.debug("Notification from observer: " + n.sid);
                    break;
                case FOLLOWING:
                case LEADING:
                    /*
                     * Consider all notifications from the same epoch
                     * together.
                     */
                    if (n.electionEpoch == logicalclock) {
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                        if (ooePredicate(recvset, outofelection, n)) {
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING : learningState());
                            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    /*
                     * Before joining an established ensemble, verify
                     * a majority is following the same leader.
                     */
                    outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid,
                            n.electionEpoch, n.peerEpoch, n.state));
                    if (ooePredicate(outofelection, outofelection, n)) {
                        synchronized (this) {
                            logicalclock = n.electionEpoch;
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING : learningState());
                        }
                        Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
                            n.state, n.sid);
                    break;
                }
            } else {
                LOG.warn("Ignoring notification from non-cluster member " + n.sid);
            }
        }
        return null;
    } finally {
        try {
            if (self.jmxLeaderElectionBean != null) {
                MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
        LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
    }
}

This method filters candidate votes with totalOrderPredicate; its goal is to keep the vote with the larger zxid (more precisely: higher epoch first, then zxid, then server id) as the proposed leader.
/**
 * Check if a pair (server id, zxid) succeeds our current vote.
 *
 * @param id Server identifier
 * @param zxid Last zxid observed by the issuer of this vote
 */
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
        long curId, long curZxid, long curEpoch) {
    LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
            Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
    if (self.getQuorumVerifier().getWeight(newId) == 0) {
        return false;
    }
    /*
     * We return true if one of the following three cases hold:
     * 1- New epoch is higher
     * 2- New epoch is the same as current epoch, but new zxid is higher
     * 3- New epoch is the same as current epoch, new zxid is the same
     *    as current zxid, but server id is higher.
     */
    return ((newEpoch > curEpoch) ||
            ((newEpoch == curEpoch) &&
             ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
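To make the ordering concrete, here is a small self-contained toy (not the ZooKeeper class) that restates the comparison; the votes and ids are made up.

public class VoteOrderToy {
    // zxid layout: the high 32 bits are the epoch, the low 32 bits are a counter
    // (this is what ZxidUtils.getEpochFromZxid extracts in loadDataBase later in this post)
    static long epochOf(long zxid) { return zxid >> 32L; }

    // same tie-breaking order as totalOrderPredicate: epoch, then zxid, then server id
    static boolean supersedes(long newEpoch, long newZxid, long newId,
                              long curEpoch, long curZxid, long curId) {
        return (newEpoch > curEpoch)
                || (newEpoch == curEpoch
                    && (newZxid > curZxid || (newZxid == curZxid && newId > curId)));
    }

    public static void main(String[] args) {
        long a = 0x100000005L, b = 0x100000007L;           // both zxids are in epoch 1
        System.out.println(epochOf(a));                    // 1
        System.out.println(supersedes(1, b, 2, 1, a, 1));  // true: higher zxid wins
        System.out.println(supersedes(1, b, 3, 1, b, 2));  // true: same zxid, higher sid wins
        System.out.println(supersedes(1, a, 9, 2, b, 1));  // false: lower epoch always loses
    }
}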
This method is ultimately invoked through QuorumPeer. First, look at the ZooKeeper startup scripts zkServer.sh / zkServer.cmd:
@echo off
REM Licensed to the Apache Software Foundation (ASF) under one or more
REM contributor license agreements.  See the NOTICE file distributed with
REM this work for additional information regarding copyright ownership.
REM The ASF licenses this file to You under the Apache License, Version 2.0
REM (the "License"); you may not use this file except in compliance with
REM the License.  You may obtain a copy of the License at
REM
REM     http://www.apache.org/licenses/LICENSE-2.0
REM
REM Unless required by applicable law or agreed to in writing, software
REM distributed under the License is distributed on an "AS IS" BASIS,
REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
REM See the License for the specific language governing permissions and
REM limitations under the License.

setlocal
call "%~dp0zkEnv.cmd"

set ZOOMAIN=org.apache.zookeeper.server.quorum.QuorumPeerMain
echo on
call %JAVA% "-Dzookeeper.log.dir=%ZOO_LOG_DIR%" "-Dzookeeper.root.logger=%ZOO_LOG4J_PROP%" -cp "%CLASSPATH%" %ZOOMAIN% "%ZOOCFG%" %*

endlocal

Now step through the cluster-mode source code:
/*** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/ package org.apache.zookeeper.server.quorum;import java.io.File; import java.io.IOException;import javax.management.JMException;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.zookeeper.jmx.ManagedUtil; import org.apache.zookeeper.server.ServerCnxnFactory; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.DatadirCleanupManager; import org.apache.zookeeper.server.ZooKeeperServerMain; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;/**** <h2>Configuration file</h2>** When the main() method of this class is used to start the program, the first* argument is used as a path to the config file, which will be used to obtain* configuration information. This file is a Properties file, so keys and* values are separated by equals (=) and the key/value pairs are separated* by new lines. The following is a general summary of keys used in the* configuration file. For full details on this see the documentation in* docs/index.html* <ol>* <li>dataDir - The directory where the ZooKeeper data is stored.</li>* <li>dataLogDir - The directory where the ZooKeeper transaction log is stored.</li>* <li>clientPort - The port used to communicate with clients.</li>* <li>tickTime - The duration of a tick in milliseconds. This is the basic* unit of time in ZooKeeper.</li>* <li>initLimit - The maximum number of ticks that a follower will wait to* initially synchronize with a leader.</li>* <li>syncLimit - The maximum number of ticks that a follower will wait for a* message (including heartbeats) from the leader.</li>* <li>server.<i>id</i> - This is the host:port[:port] that the server with the* given id will use for the quorum protocol.</li>* </ol>* In addition to the config file. 
There is a file in the data directory called* "myid" that contains the server id as an ASCII decimal value.**/ public class QuorumPeerMain {private static final Logger LOG = LoggerFactory.getLogger(QuorumPeerMain.class);private static final String USAGE = "Usage: QuorumPeerMain configfile";protected QuorumPeer quorumPeer;/*** To start the replicated server specify the configuration file name on* the command line.* @param args path to the configfile*/public static void main(String[] args) {QuorumPeerMain main = new QuorumPeerMain();try {main.initializeAndRun(args);} catch (IllegalArgumentException e) {LOG.error("Invalid arguments, exiting abnormally", e);LOG.info(USAGE);System.err.println(USAGE);System.exit(2);} catch (ConfigException e) {LOG.error("Invalid config, exiting abnormally", e);System.err.println("Invalid config, exiting abnormally");System.exit(2);} catch (Exception e) {LOG.error("Unexpected exception, exiting abnormally", e);System.exit(1);}LOG.info("Exiting normally");System.exit(0);}protected void initializeAndRun(String[] args)throws ConfigException, IOException{QuorumPeerConfig config = new QuorumPeerConfig();if (args.length == 1) {config.parse(args[0]);}// Start and schedule the the purge taskDatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());purgeMgr.start();if (args.length == 1 && config.servers.size() > 0) {runFromConfig(config);} else {LOG.warn("Either no config or no quorum defined in config, running "+ " in standalone mode");// there is only server in the quorum -- run as standaloneZooKeeperServerMain.main(args);}}public void runFromConfig(QuorumPeerConfig config) throws IOException {try {ManagedUtil.registerLog4jMBeans();} catch (JMException e) {LOG.warn("Unable to register log4j JMX control", e);}LOG.info("Starting quorum peer");try {ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();cnxnFactory.configure(config.getClientPortAddress(),config.getMaxClientCnxns());quorumPeer = new QuorumPeer(config.getServers(),new File(config.getDataDir()),new File(config.getDataLogDir()),config.getElectionAlg(),config.getServerId(),config.getTickTime(),config.getInitLimit(),config.getSyncLimit(),config.getQuorumListenOnAllIPs(),cnxnFactory,config.getQuorumVerifier());quorumPeer.setClientPortAddress(config.getClientPortAddress());quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));quorumPeer.setLearnerType(config.getPeerType());quorumPeer.setSyncEnabled(config.getSyncEnabled());// sets quorum sasl authentication configurationsquorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);if(quorumPeer.isQuorumSaslAuthEnabled()){quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);}quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);quorumPeer.initialize();quorumPeer.start();quorumPeer.join();} catch (InterruptedException e) {// warn, but generally this is okLOG.warn("Quorum Peer interrupted", e);}} }解析zkcfg以及委托zookeeperMain在方法:
protected void initializeAndRun(String[] args) throws ConfigException, IOException {
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    }
    // Start and schedule the the purge task
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(),
            config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();
    if (args.length == 1 && config.servers.size() > 0) {
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running " + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
}

zoo.cfg is parsed by QuorumPeerConfig.parse; the argument passed in is the path to zoo.cfg:
public void parse(String path) throws ConfigException {File configFile = new File(path);LOG.info("Reading configuration from: " + configFile);try {if (!configFile.exists()) {throw new IllegalArgumentException(configFile.toString()+ " file is missing");}Properties cfg = new Properties();FileInputStream in = new FileInputStream(configFile);try {cfg.load(in);} finally {in.close();}parseProperties(cfg);} catch (IOException e) {throw new ConfigException("Error processing " + path, e);} catch (IllegalArgumentException e) {throw new ConfigException("Error processing " + path, e);}} public void parseProperties(Properties zkProp)throws IOException, ConfigException {int clientPort = 0;String clientPortAddress = null;for (Entry<Object, Object> entry : zkProp.entrySet()) {String key = entry.getKey().toString().trim();String value = entry.getValue().toString().trim();if (key.equals("dataDir")) {dataDir = value;} else if (key.equals("dataLogDir")) {dataLogDir = value;} else if (key.equals("clientPort")) {clientPort = Integer.parseInt(value);} else if (key.equals("clientPortAddress")) {clientPortAddress = value.trim();} else if (key.equals("tickTime")) {tickTime = Integer.parseInt(value);} else if (key.equals("maxClientCnxns")) {maxClientCnxns = Integer.parseInt(value);} else if (key.equals("minSessionTimeout")) {minSessionTimeout = Integer.parseInt(value);} else if (key.equals("maxSessionTimeout")) {maxSessionTimeout = Integer.parseInt(value);} else if (key.equals("initLimit")) {initLimit = Integer.parseInt(value);} else if (key.equals("syncLimit")) {syncLimit = Integer.parseInt(value);} else if (key.equals("electionAlg")) {electionAlg = Integer.parseInt(value);} else if (key.equals("quorumListenOnAllIPs")) {quorumListenOnAllIPs = Boolean.parseBoolean(value);} else if (key.equals("peerType")) {if (value.toLowerCase().equals("observer")) {peerType = LearnerType.OBSERVER;} else if (value.toLowerCase().equals("participant")) {peerType = LearnerType.PARTICIPANT;} else{throw new ConfigException("Unrecognised peertype: " + value);}} else if (key.equals( "syncEnabled" )) {syncEnabled = Boolean.parseBoolean(value);} else if (key.equals("autopurge.snapRetainCount")) {snapRetainCount = Integer.parseInt(value);} else if (key.equals("autopurge.purgeInterval")) {purgeInterval = Integer.parseInt(value);} else if (key.startsWith("server.")) {int dot = key.indexOf('.');long sid = Long.parseLong(key.substring(dot + 1));String parts[] = splitWithLeadingHostname(value);if ((parts.length != 2) && (parts.length != 3) && (parts.length !=4)) {LOG.error(value+ " does not have the form host:port or host:port:port " +" or host:port:port:type");}LearnerType type = null;String hostname = parts[0];Integer port = Integer.parseInt(parts[1]);Integer electionPort = null;if (parts.length > 2){electionPort=Integer.parseInt(parts[2]);}if (parts.length > 3){if (parts[3].toLowerCase().equals("observer")) {type = LearnerType.OBSERVER;} else if (parts[3].toLowerCase().equals("participant")) {type = LearnerType.PARTICIPANT;} else {throw new ConfigException("Unrecognised peertype: " + value);}}if (type == LearnerType.OBSERVER){observers.put(Long.valueOf(sid), new QuorumServer(sid, hostname, port, electionPort, type));} else {servers.put(Long.valueOf(sid), new QuorumServer(sid, hostname, port, electionPort, type));}} else if (key.startsWith("group")) {int dot = key.indexOf('.');long gid = Long.parseLong(key.substring(dot + 1));numGroups++;String parts[] = value.split(":");for(String s : parts){long sid = 
Long.parseLong(s);if(serverGroup.containsKey(sid))throw new ConfigException("Server " + sid + "is in multiple groups");elseserverGroup.put(sid, gid);}} else if(key.startsWith("weight")) {int dot = key.indexOf('.');long sid = Long.parseLong(key.substring(dot + 1));serverWeight.put(sid, Long.parseLong(value));} else if (key.equals(QuorumAuth.QUORUM_SASL_AUTH_ENABLED)) {quorumEnableSasl = Boolean.parseBoolean(value);} else if (key.equals(QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED)) {quorumServerRequireSasl = Boolean.parseBoolean(value);} else if (key.equals(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED)) {quorumLearnerRequireSasl = Boolean.parseBoolean(value);} else if (key.equals(QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT)) {quorumLearnerLoginContext = value;} else if (key.equals(QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT)) {quorumServerLoginContext = value;} else if (key.equals(QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL)) {quorumServicePrincipal = value;} else if (key.equals("quorum.cnxn.threads.size")) {quorumCnxnThreadsSize = Integer.parseInt(value);} else {System.setProperty("zookeeper." + key, value);}}if (!quorumEnableSasl && quorumServerRequireSasl) {throw new IllegalArgumentException(QuorumAuth.QUORUM_SASL_AUTH_ENABLED+ " is disabled, so cannot enable "+ QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED);}if (!quorumEnableSasl && quorumLearnerRequireSasl) {throw new IllegalArgumentException(QuorumAuth.QUORUM_SASL_AUTH_ENABLED+ " is disabled, so cannot enable "+ QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED);}// If quorumpeer learner is not auth enabled then self won't be able to// join quorum. So this condition is ensuring that the quorumpeer learner// is also auth enabled while enabling quorum server require sasl.if (!quorumLearnerRequireSasl && quorumServerRequireSasl) {throw new IllegalArgumentException(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED+ " is disabled, so cannot enable "+ QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED);}// Reset to MIN_SNAP_RETAIN_COUNT if invalid (less than 3)// PurgeTxnLog.purge(File, File, int) will not allow to purge less// than 3.if (snapRetainCount < MIN_SNAP_RETAIN_COUNT) {LOG.warn("Invalid autopurge.snapRetainCount: " + snapRetainCount+ ". 
Defaulting to " + MIN_SNAP_RETAIN_COUNT);snapRetainCount = MIN_SNAP_RETAIN_COUNT;}if (dataDir == null) {throw new IllegalArgumentException("dataDir is not set");}if (dataLogDir == null) {dataLogDir = dataDir;}if (clientPort == 0) {throw new IllegalArgumentException("clientPort is not set");}if (clientPortAddress != null) {this.clientPortAddress = new InetSocketAddress(InetAddress.getByName(clientPortAddress), clientPort);} else {this.clientPortAddress = new InetSocketAddress(clientPort);}if (tickTime == 0) {throw new IllegalArgumentException("tickTime is not set");}if (minSessionTimeout > maxSessionTimeout) {throw new IllegalArgumentException("minSessionTimeout must not be larger than maxSessionTimeout");}if (servers.size() == 0) {if (observers.size() > 0) {throw new IllegalArgumentException("Observers w/o participants is an invalid configuration");}// Not a quorum configuration so return immediately - not an error// case (for b/w compatibility), server will default to standalone// mode.return;} else if (servers.size() == 1) {if (observers.size() > 0) {throw new IllegalArgumentException("Observers w/o quorum is an invalid configuration");}// HBase currently adds a single server line to the config, for// b/w compatibility reasons we need to keep this here.LOG.error("Invalid configuration, only one server specified (ignoring)");servers.clear();} else if (servers.size() > 1) {if (servers.size() == 2) {LOG.warn("No server failure will be tolerated. " +"You need at least 3 servers.");} else if (servers.size() % 2 == 0) {LOG.warn("Non-optimial configuration, consider an odd number of servers.");}if (initLimit == 0) {throw new IllegalArgumentException("initLimit is not set");}if (syncLimit == 0) {throw new IllegalArgumentException("syncLimit is not set");}/** If using FLE, then every server requires a separate election* port.*/if (electionAlg != 0) {for (QuorumServer s : servers.values()) {if (s.electionAddr == null)throw new IllegalArgumentException("Missing election port for server: " + s.id);}}/** Default of quorum config is majority*/if(serverGroup.size() > 0){if(servers.size() != serverGroup.size())throw new ConfigException("Every server must be in exactly one group");/** The deafult weight of a server is 1*/for(QuorumServer s : servers.values()){if(!serverWeight.containsKey(s.id))serverWeight.put(s.id, (long) 1);}/** Set the quorumVerifier to be QuorumHierarchical*/quorumVerifier = new QuorumHierarchical(numGroups,serverWeight, serverGroup);} else {/** The default QuorumVerifier is QuorumMaj*/LOG.info("Defaulting to majority quorums");quorumVerifier = new QuorumMaj(servers.size());}// Now add observers to servers, once the quorums have been// figured outservers.putAll(observers);File myIdFile = new File(dataDir, "myid");if (!myIdFile.exists()) {throw new IllegalArgumentException(myIdFile.toString()+ " file is missing");}BufferedReader br = new BufferedReader(new FileReader(myIdFile));String myIdString;try {myIdString = br.readLine();} finally {br.close();}try {serverId = Long.parseLong(myIdString);MDC.put("myid", myIdString);} catch (NumberFormatException e) {throw new IllegalArgumentException("serverid " + myIdString+ " is not a number");}// Warn about inconsistent peer typeLearnerType roleByServersList = observers.containsKey(serverId) ? LearnerType.OBSERVER: LearnerType.PARTICIPANT;if (roleByServersList != peerType) {LOG.warn("Peer type from servers list (" + roleByServersList+ ") doesn't match peerType (" + peerType+ "). Defaulting to servers list.");peerType = roleByServersList;}}}解析完之后
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());purgeMgr.start(); /*** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.zookeeper.server;import java.io.File; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.TimeUnit;import org.slf4j.Logger; import org.slf4j.LoggerFactory;/*** This class manages the cleanup of snapshots and corresponding transaction* logs by scheduling the auto purge task with the specified* 'autopurge.purgeInterval'. It keeps the most recent* 'autopurge.snapRetainCount' number of snapshots and corresponding transaction* logs.*/ public class DatadirCleanupManager {private static final Logger LOG = LoggerFactory.getLogger(DatadirCleanupManager.class);/*** Status of the dataDir purge task*/public enum PurgeTaskStatus {NOT_STARTED, STARTED, COMPLETED;}private PurgeTaskStatus purgeTaskStatus = PurgeTaskStatus.NOT_STARTED;private final String snapDir;private final String dataLogDir;private final int snapRetainCount;private final int purgeInterval;private Timer timer;/*** Constructor of DatadirCleanupManager. It takes the parameters to schedule* the purge task.* * @param snapDir* snapshot directory* @param dataLogDir* transaction log directory* @param snapRetainCount* number of snapshots to be retained after purge* @param purgeInterval* purge interval in hours*/public DatadirCleanupManager(String snapDir, String dataLogDir, int snapRetainCount,int purgeInterval) {this.snapDir = snapDir;this.dataLogDir = dataLogDir;this.snapRetainCount = snapRetainCount;this.purgeInterval = purgeInterval;LOG.info("autopurge.snapRetainCount set to " + snapRetainCount);LOG.info("autopurge.purgeInterval set to " + purgeInterval);}/*** Validates the purge configuration and schedules the purge task. 
Purge* task keeps the most recent <code>snapRetainCount</code> number of* snapshots and deletes the remaining for every <code>purgeInterval</code>* hour(s).* <p>* <code>purgeInterval</code> of <code>0</code> or* <code>negative integer</code> will not schedule the purge task.* </p>* * @see PurgeTxnLog#purge(File, File, int)*/public void start() {if (PurgeTaskStatus.STARTED == purgeTaskStatus) {LOG.warn("Purge task is already running.");return;}// Don't schedule the purge task with zero or negative purge interval.if (purgeInterval <= 0) {LOG.info("Purge task is not scheduled.");return;}timer = new Timer("PurgeTask", true);TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));purgeTaskStatus = PurgeTaskStatus.STARTED;}/*** Shutdown the purge task.*/public void shutdown() {if (PurgeTaskStatus.STARTED == purgeTaskStatus) {LOG.info("Shutting down purge task.");timer.cancel();purgeTaskStatus = PurgeTaskStatus.COMPLETED;} else {LOG.warn("Purge task not started. Ignoring shutdown!");}}static class PurgeTask extends TimerTask {private String logsDir;private String snapsDir;private int snapRetainCount;public PurgeTask(String dataDir, String snapDir, int count) {logsDir = dataDir;snapsDir = snapDir;snapRetainCount = count;}@Overridepublic void run() {LOG.info("Purge task started.");try {PurgeTxnLog.purge(new File(logsDir), new File(snapsDir), snapRetainCount);} catch (Exception e) {LOG.error("Error occurred while purging.", e);}LOG.info("Purge task completed.");}}/*** Returns the status of the purge task.* * @return the status of the purge task*/public PurgeTaskStatus getPurgeTaskStatus() {return purgeTaskStatus;}/*** Returns the snapshot directory.* * @return the snapshot directory.*/public String getSnapDir() {return snapDir;}/*** Returns transaction log directory.* * @return the transaction log directory.*/public String getDataLogDir() {return dataLogDir;}/*** Returns purge interval in hours.* * @return the purge interval in hours.*/public int getPurgeInterval() {return purgeInterval;}/*** Returns the number of snapshots to be retained after purge.* * @return the number of snapshots to be retained after purge.*/public int getSnapRetainCount() {return snapRetainCount;} }里面啟動task 清理工作空間和log 的整理工作,老的日志文件整理為年月日文件。
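The scheduled PurgeTask keeps the snapRetainCount most recent snapshots, together with the transaction logs needed to replay from them, and deletes the rest. The same purge can be triggered by hand; a sketch with assumed directories:

import java.io.File;

import org.apache.zookeeper.server.PurgeTxnLog;

public class ManualPurge {
    public static void main(String[] args) throws Exception {
        // identical to what PurgeTask.run() does every purgeInterval hours;
        // first argument is the txn log dir, second the snapshot dir (illustrative paths)
        PurgeTxnLog.purge(new File("/var/lib/zookeeper/log"),
                          new File("/var/lib/zookeeper/snap"), 3);
    }
}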
If a quorum is configured (cluster mode), runFromConfig is executed:
public void runFromConfig(QuorumPeerConfig config) throws IOException {
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }
    LOG.info("Starting quorum peer");
    try {
        ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
        cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());
        quorumPeer = new QuorumPeer(config.getServers(),
                new File(config.getDataDir()),
                new File(config.getDataLogDir()),
                config.getElectionAlg(),
                config.getServerId(),
                config.getTickTime(),
                config.getInitLimit(),
                config.getSyncLimit(),
                config.getQuorumListenOnAllIPs(),
                cnxnFactory,
                config.getQuorumVerifier());
        quorumPeer.setClientPortAddress(config.getClientPortAddress());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());

        // sets quorum sasl authentication configurations
        quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
        if (quorumPeer.isQuorumSaslAuthEnabled()) {
            quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
            quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
            quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
            quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
            quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
        }

        quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
        quorumPeer.initialize();

        quorumPeer.start();
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
    }
}

This calls QuorumPeer.start():
@Override
public synchronized void start() {
    loadDataBase();
    cnxnFactory.start();
    startLeaderElection();
    super.start();
}

loadDataBase():
private void loadDataBase() {
    File updating = new File(getTxnFactory().getSnapDir(), UPDATING_EPOCH_FILENAME);
    try {
        zkDb.loadDataBase();

        // load the epochs
        long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
        long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
        try {
            currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
            if (epochOfZxid > currentEpoch && updating.exists()) {
                LOG.info("{} found. The server was terminated after " +
                        "taking a snapshot but before updating current " +
                        "epoch. Setting current epoch to {}.",
                        UPDATING_EPOCH_FILENAME, epochOfZxid);
                setCurrentEpoch(epochOfZxid);
                if (!updating.delete()) {
                    throw new IOException("Failed to delete " + updating.toString());
                }
            }
        } catch (FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            currentEpoch = epochOfZxid;
            LOG.info(CURRENT_EPOCH_FILENAME
                    + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                    currentEpoch);
            writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
        }
        if (epochOfZxid > currentEpoch) {
            throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch)
                    + ", is older than the last zxid, " + lastProcessedZxid);
        }
        try {
            acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
        } catch (FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            acceptedEpoch = epochOfZxid;
            LOG.info(ACCEPTED_EPOCH_FILENAME
                    + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                    acceptedEpoch);
            writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
        }
        if (acceptedEpoch < currentEpoch) {
            throw new IOException("The accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch)
                    + " is less than the current epoch, " + ZxidUtils.zxidToString(currentEpoch));
        }
    } catch (IOException ie) {
        LOG.error("Unable to load database on disk", ie);
        throw new RuntimeException("Unable to run quorum server ", ie);
    }
}

cnxnFactory.start() goes through the ServerCnxnFactory obtained via the factory method:
/*** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.zookeeper.server;import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.HashSet; import java.util.Map; import java.util.concurrent.ConcurrentHashMap;import javax.security.auth.login.Configuration; import javax.security.auth.login.LoginException; import javax.security.auth.login.AppConfigurationEntry;import javax.management.JMException;import org.apache.zookeeper.Login; import org.apache.zookeeper.Environment; import org.apache.zookeeper.jmx.MBeanRegistry; import org.apache.zookeeper.server.auth.SaslServerCallbackHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory;public abstract class ServerCnxnFactory {public static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory";public interface PacketProcessor {public void processPacket(ByteBuffer packet, ServerCnxn src);}Logger LOG = LoggerFactory.getLogger(ServerCnxnFactory.class);/*** The buffer will cause the connection to be close when we do a send.*/static final ByteBuffer closeConn = ByteBuffer.allocate(0);public abstract int getLocalPort();public abstract Iterable<ServerCnxn> getConnections();public int getNumAliveConnections() {synchronized(cnxns) {return cnxns.size();}}ZooKeeperServer getZooKeeperServer() {return zkServer;}public abstract void closeSession(long sessionId);public abstract void configure(InetSocketAddress addr,int maxClientCnxns) throws IOException;protected SaslServerCallbackHandler saslServerCallbackHandler;public Login login;/** Maximum number of connections allowed from particular host (ip) */public abstract int getMaxClientCnxnsPerHost();/** Maximum number of connections allowed from particular host (ip) */public abstract void setMaxClientCnxnsPerHost(int max);public abstract void startup(ZooKeeperServer zkServer)throws IOException, InterruptedException;public abstract void join() throws InterruptedException;public abstract void shutdown();public abstract void start();protected ZooKeeperServer zkServer;final public void setZooKeeperServer(ZooKeeperServer zk) {this.zkServer = zk;if (zk != null) {zk.setServerCnxnFactory(this);}}public abstract void closeAll();static public ServerCnxnFactory createFactory() throws IOException {String serverCnxnFactoryName =System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);if (serverCnxnFactoryName == null) {serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();}try {return (ServerCnxnFactory) Class.forName(serverCnxnFactoryName).newInstance();} catch (Exception e) {IOException ioe = new IOException("Couldn't instantiate "+ serverCnxnFactoryName);ioe.initCause(e);throw ioe;}}static public ServerCnxnFactory createFactory(int clientPort,int maxClientCnxns) throws IOException{return createFactory(new 
InetSocketAddress(clientPort), maxClientCnxns);}static public ServerCnxnFactory createFactory(InetSocketAddress addr,int maxClientCnxns) throws IOException{ServerCnxnFactory factory = createFactory();factory.configure(addr, maxClientCnxns);return factory;}public abstract InetSocketAddress getLocalAddress();private final Map<ServerCnxn, ConnectionBean> connectionBeans= new ConcurrentHashMap<ServerCnxn, ConnectionBean>();protected final HashSet<ServerCnxn> cnxns = new HashSet<ServerCnxn>();public void unregisterConnection(ServerCnxn serverCnxn) {ConnectionBean jmxConnectionBean = connectionBeans.remove(serverCnxn);if (jmxConnectionBean != null){MBeanRegistry.getInstance().unregister(jmxConnectionBean);}}public void registerConnection(ServerCnxn serverCnxn) {if (zkServer != null) {ConnectionBean jmxConnectionBean = new ConnectionBean(serverCnxn, zkServer);try {MBeanRegistry.getInstance().register(jmxConnectionBean, zkServer.jmxServerBean);connectionBeans.put(serverCnxn, jmxConnectionBean);} catch (JMException e) {LOG.warn("Could not register connection", e);}}}/*** Initialize the server SASL if specified.** If the user has specified a "ZooKeeperServer.LOGIN_CONTEXT_NAME_KEY"* or a jaas.conf using "java.security.auth.login.config"* the authentication is required and an exception is raised.* Otherwise no authentication is configured and no exception is raised.** @throws IOException if jaas.conf is missing or there's an error in it.*/protected void configureSaslLogin() throws IOException {String serverSection = System.getProperty(ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY,ZooKeeperSaslServer.DEFAULT_LOGIN_CONTEXT_NAME);// Note that 'Configuration' here refers to javax.security.auth.login.Configuration.AppConfigurationEntry entries[] = null;SecurityException securityException = null;try {entries = Configuration.getConfiguration().getAppConfigurationEntry(serverSection);} catch (SecurityException e) {// handle below: might be harmless if the user doesn't intend to use JAAS authentication.securityException = e;}// No entries in jaas.conf// If there's a configuration exception fetching the jaas section and// the user has required sasl by specifying a LOGIN_CONTEXT_NAME_KEY or a jaas file// we throw an exception otherwise we continue without authentication.if (entries == null) {String jaasFile = System.getProperty(Environment.JAAS_CONF_KEY);String loginContextName = System.getProperty(ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY);if (securityException != null && (loginContextName != null || jaasFile != null)) {String errorMessage = "No JAAS configuration section named '" + serverSection + "' was found";if (jaasFile != null) {errorMessage += "in '" + jaasFile + "'.";}if (loginContextName != null) {errorMessage += " But " + ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY + " was set.";}LOG.error(errorMessage);throw new IOException(errorMessage);}return;}// jaas.conf entry availabletry {saslServerCallbackHandler = new SaslServerCallbackHandler(Configuration.getConfiguration());login = new Login(serverSection, saslServerCallbackHandler);login.startThreadIfNeeded();} catch (LoginException e) {throw new IOException("Could not configure server because SASL configuration did not allow the "+ " ZooKeeper server to authenticate itself properly: " + e);}} }這塊后面再深入分析,先分析后面的流程。
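As createFactory() above shows, the concrete factory is chosen through the zookeeper.serverCnxnFactory system property and defaults to NIOServerCnxnFactory. Switching to the Netty-based implementation is only a matter of setting that property (a sketch; the Netty factory additionally needs the Netty jars on the classpath):

import java.net.InetSocketAddress;

import org.apache.zookeeper.server.ServerCnxnFactory;

public class CnxnFactorySelectionSketch {
    public static void main(String[] args) throws Exception {
        // unset -> NIOServerCnxnFactory; set -> the named class is instantiated reflectively
        System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY,
                "org.apache.zookeeper.server.NettyServerCnxnFactory");
        ServerCnxnFactory factory = ServerCnxnFactory.createFactory();
        factory.configure(new InetSocketAddress(2181), 60);  // port / maxClientCnxns: example values
        System.out.println(factory.getClass().getName());
    }
}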
-------------------------------------------------------------------------------------------------------------------------------------------------
Next, startLeaderElection() is called to kick off the election:
synchronized public void startLeaderElection() {
    try {
        currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    for (QuorumServer p : getView().values()) {
        if (p.id == myid) {
            myQuorumAddr = p.addr;
            break;
        }
    }
    if (myQuorumAddr == null) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    if (electionType == 0) {
        try {
            udpSocket = new DatagramSocket(myQuorumAddr.getPort());
            responder = new ResponderThread();
            responder.start();
        } catch (SocketException e) {
            throw new RuntimeException(e);
        }
    }
    this.electionAlg = createElectionAlgorithm(electionType);
}

The quorumPeer executing this is the instance created with new in runFromConfig (the snippet shown earlier).
The corresponding constructor:
public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir,
        File dataLogDir, int electionType,
        long myid, int tickTime, int initLimit, int syncLimit,
        boolean quorumListenOnAllIPs,
        ServerCnxnFactory cnxnFactory, QuorumVerifier quorumConfig) throws IOException {
    this();
    this.cnxnFactory = cnxnFactory;
    this.quorumPeers = quorumPeers;
    this.electionType = electionType;
    this.myid = myid;
    this.tickTime = tickTime;
    this.initLimit = initLimit;
    this.syncLimit = syncLimit;
    this.quorumListenOnAllIPs = quorumListenOnAllIPs;
    this.logFactory = new FileTxnSnapLog(dataLogDir, dataDir);
    this.zkDb = new ZKDatabase(this.logFactory);
    if (quorumConfig == null)
        this.quorumConfig = new QuorumMaj(countParticipants(quorumPeers));
    else
        this.quorumConfig = quorumConfig;
}

The field private int electionType defaults to 0 and is set in this constructor to the election type configured in zoo.cfg. It determines which election algorithm is chosen in this.electionAlg = createElectionAlgorithm(electionType):
protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;

    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 0:
        le = new LeaderElection(this);
        break;
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        qcm = createCnxnManager();
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            listener.start();
            le = new FastLeaderElection(this, qcm);
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}
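Once start() has returned, the QuorumPeer thread itself (QuorumPeer is a Thread subclass, so super.start() launches its run method) drives the server through its states. Stripped of logging, JMX, and error handling, the run() loop is roughly:

// simplified paraphrase of QuorumPeer.run(), not the verbatim source
while (running) {
    switch (getPeerState()) {
    case LOOKING:
        // this is where FastLeaderElection.lookForLeader() (shown above) gets called
        setCurrentVote(makeLEStrategy().lookForLeader());
        break;
    case OBSERVING:
        observer = makeObserver(logFactory);
        observer.observeLeader();
        break;
    case FOLLOWING:
        follower = makeFollower(logFactory);
        follower.followLeader();
        break;
    case LEADING:
        leader = makeLeader(logFactory);
        leader.lead();
        break;
    }
}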
Summary
This post traced the ZooKeeper startup entry points (ZooKeeperServerMain for standalone mode, QuorumPeerMain for a cluster), the parsing of zoo.cfg, local data recovery in loadDataBase, and how QuorumPeer creates and starts the election algorithm, FastLeaderElection by default, whose lookForLeader and totalOrderPredicate decide the Leader.