FastLeaderElection
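FastLeaderElection is ZooKeeper's default leader election algorithm. When a peer is in the ServerState.LOOKING state, it runs FastLeaderElection.lookForLeader to elect a leader.
Important data structures:
1. HashMap<Long, Vote> recvset: the votes received in the current election round from peers whose ServerState is LOOKING. Used to decide whether the election has finished.
2. HashMap<Long, Vote> outofelection: the votes sent by peers outside the election, that is, peers whose ServerState is FOLLOWING or LEADING. Such a message indicates that an election has already finished. Also used to decide whether the election has finished.
Important functions:
totalOrderPredicate: compares two votes and decides whether the new one wins. Comparison order: epoch -> zxid -> server id.
termPredicate: decides whether the election has finished by checking whether the votes for the proposed leader in recvset form a quorum (more than half of the voting peers under the default majority quorum).
ooePredicate: decides whether the election has finished using both recvset and outofelection.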
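To make the comparison order and the majority check concrete, here is a minimal Java sketch. It is not ZooKeeper's code: the class ElectionPredicates, the method names beats and hasMajority, and the Map<Long, Long> representation of votes (voter id -> leader id) are assumptions made for illustration, and the sketch ignores details such as weighted or hierarchical quorums.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-ins for the predicates described above. */
final class ElectionPredicates {

    /**
     * Mirrors the idea of totalOrderPredicate: returns true if the new vote
     * beats the current one, comparing epoch, then zxid, then server id.
     */
    static boolean beats(long newId, long newZxid, long newEpoch,
                         long curId, long curZxid, long curEpoch) {
        if (newEpoch != curEpoch) {
            return newEpoch > curEpoch;   // 1. higher epoch wins
        }
        if (newZxid != curZxid) {
            return newZxid > curZxid;     // 2. then higher zxid
        }
        return newId > curId;             // 3. then higher server id
    }

    /**
     * Mirrors the idea of termPredicate under a simple majority quorum:
     * true if strictly more than half of the ensemble voted for the leader.
     * votes maps voter id -> the leader id that voter proposed (assumed shape).
     */
    static boolean hasMajority(Map<Long, Long> votes, long leader, int ensembleSize) {
        int count = 0;
        for (long votedFor : votes.values()) {
            if (votedFor == leader) {
                count++;
            }
        }
        return count > ensembleSize / 2;
    }

    public static void main(String[] args) {
        // A higher epoch wins even against a higher zxid and server id.
        System.out.println(beats(1, 5, 2, 3, 9, 1)); // true
        // With equal epoch and zxid, the higher server id wins.
        System.out.println(beats(3, 5, 1, 1, 5, 1)); // true

        Map<Long, Long> votes = new HashMap<>();
        votes.put(1L, 3L);
        votes.put(2L, 3L);
        System.out.println(hasMajority(votes, 3L, 5)); // false: 2 of 5
        votes.put(3L, 3L);
        System.out.println(hasMajority(votes, 3L, 5)); // true: 3 of 5
    }
}
```

Note that the majority is counted against the size of the whole voting ensemble, not against the number of votes received so far.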
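The main election logic is as follows:
1. Increment the logical clock by 1 and send the other peers a proposal that votes for itself.
2. Loop, processing notifications from the other peers:
  1) A notification from a LOOKING peer: if the candidate recommended in the notification is a better choice than the peer's current proposal, update the proposal and send it to the other peers; otherwise ignore it. Then record the vote in recvset and check whether the election has finished, that is, whether the currently proposed leader holds more than half of the votes.
  2) A notification from a LEADING or FOLLOWING peer: receiving either of these means an election has already finished, which is verified with the outofelection set.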
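The LOOKING branch of the steps above can be sketched as a toy model. Everything here is an assumption made for illustration: the class LookingPeerSketch, its nested Vote type, and the method onLookingNotification are hypothetical names, all notifications are taken to belong to the same election round, votes are counted by leader id only, and networking and timeouts are left out.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical toy model of a LOOKING peer; not ZooKeeper's implementation. */
final class LookingPeerSketch {

    /** Simplified vote holding only the fields used for ordering. */
    static final class Vote {
        final long leader;
        final long zxid;
        final long peerEpoch;

        Vote(long leader, long zxid, long peerEpoch) {
            this.leader = leader;
            this.zxid = zxid;
            this.peerEpoch = peerEpoch;
        }

        /** Ordering: epoch, then zxid, then server id. */
        boolean betterThan(Vote other) {
            if (peerEpoch != other.peerEpoch) return peerEpoch > other.peerEpoch;
            if (zxid != other.zxid) return zxid > other.zxid;
            return leader > other.leader;
        }
    }

    private final long myId;
    private final int ensembleSize;
    private final Map<Long, Vote> recvset = new HashMap<>();
    private Vote proposal;

    LookingPeerSketch(long myId, long myZxid, long myEpoch, int ensembleSize) {
        this.myId = myId;
        this.ensembleSize = ensembleSize;
        this.proposal = new Vote(myId, myZxid, myEpoch); // step 1: vote for itself
        recvset.put(myId, proposal);                      // the sketch records its own vote directly
    }

    long proposedLeader() {
        return proposal.leader;
    }

    /**
     * Step 2.1: handle a vote from another LOOKING peer.
     * Returns true once the proposed leader is backed by more than half of the ensemble.
     */
    boolean onLookingNotification(long sid, Vote vote) {
        if (vote.betterThan(proposal)) {
            proposal = vote;             // adopt the better candidate (a real peer would re-broadcast)
            recvset.put(myId, proposal);
        }
        recvset.put(sid, vote);          // record the sender's vote
        int count = 0;
        for (Vote v : recvset.values()) {
            if (v.leader == proposal.leader) count++;
        }
        return count > ensembleSize / 2;
    }

    public static void main(String[] args) {
        LookingPeerSketch peer = new LookingPeerSketch(1, 10, 1, 5);
        System.out.println(peer.onLookingNotification(2, new Vote(2, 10, 1))); // false
        System.out.println(peer.onLookingNotification(3, new Vote(3, 10, 1))); // false
        System.out.println(peer.onLookingNotification(2, new Vote(3, 10, 1))); // true
        System.out.println(peer.proposedLeader());                             // 3
    }
}
```

In the usage example, peer 1 first adopts peer 2 and then peer 3 as its proposal because each has a higher server id at equal epoch and zxid; the election only ends once three of the five voters back peer 3.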
public Vote lookForLeader() throws InterruptedException {
    ......
    try {
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
        int notTimeout = finalizeWait;

        synchronized(this){
            logicalclock++;
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id = " + self.getId() +
                ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        sendNotifications();

        /*
         * Loop in which we exchange notifications until we find a leader
         */
        while ((self.getPeerState() == ServerState.LOOKING) &&
                (!stop)){
            /*
             * Remove next notification from queue, times out after 2 times
             * the termination time
             */
            Notification n = recvqueue.poll(notTimeout,
                    TimeUnit.MILLISECONDS);

            /*
             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            if(n == null){
                if(manager.haveDelivered()){
                    sendNotifications();
                } else {
                    manager.connectAll();
                }

                /*
                 * Exponential backoff
                 */
                int tmpTimeOut = notTimeout*2;
                notTimeout = (tmpTimeOut < maxNotificationInterval?
                        tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            }
            else if(self.getVotingView().containsKey(n.sid)) {
                /*
                 * Only proceed if the vote comes from a replica in the
                 * voting view.
                 */
                // Notification handling logic
                switch (n.state) {
                case LOOKING:
                    // If notification > current, replace and send messages out
                    if (n.electionEpoch > logicalclock) {
                        logicalclock = n.electionEpoch;
                        recvset.clear();
                        if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(),
                                    getInitLastLoggedZxid(),
                                    getPeerEpoch());
                        }
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock) {
                        if(LOG.isDebugEnabled()){
                            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                    + Long.toHexString(n.electionEpoch)
                                    + ", logicalclock=0x" + Long.toHexString(logicalclock));
                        }
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    if(LOG.isDebugEnabled()){
                        LOG.debug("Adding vote: from=" + n.sid +
                                ", proposed leader=" + n.leader +
                                ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                    }

                    // Update recvset
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    if (termPredicate(recvset,
                            new Vote(proposedLeader, proposedZxid,
                                    logicalclock, proposedEpoch))) {

                        // Verify if there is any change in the proposed leader
                        while((n = recvqueue.poll(finalizeWait,
                                TimeUnit.MILLISECONDS)) != null){
                            // An unexpected, better candidate shows up at the last moment
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)){
                                recvqueue.put(n);
                                break;
                            }
                        }

                        // If n is not null, a peer better suited to be leader
                        // than the currently proposed one has appeared
                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                        if (n == null) {
                            self.setPeerState((proposedLeader == self.getId()) ?
                                    ServerState.LEADING: learningState());

                            Vote endVote = new Vote(proposedLeader,
                                    proposedZxid,
                                    logicalclock,
                                    proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    LOG.debug("Notification from observer: " + n.sid);
                    break;
                case FOLLOWING:
                case LEADING:
                    /*
                     * Consider all notifications from the same epoch
                     * together.
                     */
                    // An equal logical clock means the sender is in the same election
                    // round, so update recvset first and then check
                    if(n.electionEpoch == logicalclock){
                        recvset.put(n.sid, new Vote(n.leader,
                                n.zxid,
                                n.electionEpoch,
                                n.peerEpoch));

                        if(ooePredicate(recvset, outofelection, n)) {
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING: learningState());

                            Vote endVote = new Vote(n.leader,
                                    n.zxid,
                                    n.electionEpoch,
                                    n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    // When a new peer joins the cluster, it must check whether a majority
                    // of peers already follow this leader. Its recvset is necessarily empty,
                    // so outofelection is updated and used to decide whether the election
                    // has finished
                    /*
                     * Before joining an established ensemble, verify
                     * a majority is following the same leader.
                     */
                    outofelection.put(n.sid, new Vote(n.version,
                            n.leader,
                            n.zxid,
                            n.electionEpoch,
                            n.peerEpoch,
                            n.state));

                    if(ooePredicate(outofelection, outofelection, n)) {
                        synchronized(this){
                            logicalclock = n.electionEpoch;
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING: learningState());
                        }
                        Vote endVote = new Vote(n.leader,
                                n.zxid,
                                n.electionEpoch,
                                n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
                            n.state, n.sid);
                    break;
                }
            } else {
                LOG.warn("Ignoring notification from non-cluster member " + n.sid);
            }
        }
        return null;
    } finally {
        try {
            if(self.jmxLeaderElectionBean != null){
                MBeanRegistry.getInstance().unregister(
                        self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
    }
}
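The "Exponential backoff" step in the listing doubles the poll timeout each time no notification arrives, capped at maxNotificationInterval. The sketch below isolates just that arithmetic. The class BackoffSketch and the method nextTimeout are hypothetical names, and the starting value of 200 ms and the cap of 60000 ms are assumed values chosen for illustration.

```java
/** Hypothetical helper that isolates the timeout-doubling rule from the listing. */
final class BackoffSketch {

    /** Doubles the current timeout, but never returns more than max. */
    static int nextTimeout(int current, int max) {
        int doubled = current * 2;
        return doubled < max ? doubled : max;
    }

    public static void main(String[] args) {
        int timeout = 200;         // assumed initial wait in milliseconds
        final int max = 60000;     // assumed cap in milliseconds
        for (int i = 0; i < 10; i++) {
            System.out.println(timeout);
            timeout = nextTimeout(timeout, max);
        }
        // Prints 200, 400, 800, 1600, 3200, 6400, 12800, 25600, 51200, 60000
    }
}
```

Capping the interval keeps a peer that is temporarily isolated from waiting arbitrarily long between retries once the network recovers.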
Reposted from: https://www.cnblogs.com/ironroot/p/7403846.html