Understanding How Paxos Works Through PhxPaxos
Prepare phase
Prepare
    // src/algorithm/proposer.cpp
    void Proposer :: Prepare(const bool bNeedNewBallot)
    {
        PLGHead("START Now.InstanceID %lu MyNodeID %lu State.ProposalID %lu State.ValueLen %zu",
                GetInstanceID(), m_poConfig->GetMyNodeID(), m_oProposerState.GetProposalID(),
                m_oProposerState.GetValue().size());

        BP->GetProposerBP()->Prepare();
        m_oTimeStat.Point();

        ExitAccept();
        // The Proposer is now in the Prepare phase.
        m_bIsPreparing = true;
        // The Prepare phase cannot be skipped.
        m_bCanSkipPrepare = false;
        // No Acceptor has rejected this round yet.
        m_bWasRejectBySomeone = false;

        m_oProposerState.ResetHighestOtherPreAcceptBallot();
        // If a new ballot is needed, NewPrepare() generates a new ProposalID:
        // the largest ProposalID known so far, plus 1.
        if (bNeedNewBallot)
        {
            m_oProposerState.NewPrepare();
        }

        PaxosMsg oPaxosMsg;
        // Fill in the fields of the Prepare message.
        oPaxosMsg.set_msgtype(MsgType_PaxosPrepare);
        oPaxosMsg.set_instanceid(GetInstanceID());
        oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
        oPaxosMsg.set_proposalid(m_oProposerState.GetProposalID());

        // MsgCounter tallies the votes; its result decides whether
        // the Prepare phase or the Accept phase has passed.
        m_oMsgCounter.StartNewRound();

        // Timer that fires if the Prepare phase times out.
        AddPrepareTimer();

        PLGHead("END OK");

        // Send the Prepare message to every node.
        BroadcastMessage(oPaxosMsg);
    }

In the Prepare phase the Proposer mainly does the following:
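Reset the state flags to indicate that the Proposer is now in the Prepare phase.
Obtain the proposal number, ProposalID. When bNeedNewBallot is true, a new ProposalID is generated (the largest ProposalID known so far, plus 1); otherwise the previous ProposalID is reused. bNeedNewBallot is the m_bWasRejectBySomeone value that NewValue passes in when it calls Prepare. In other words, if no Acceptor has rejected the Proposer so far (so no larger ProposalID is known to exist), there is no need for a new ProposalID. The matching scenario is a Prepare timeout: the Proposer did not receive promises from more than half of the Acceptors within the timeout, so it must run the Prepare phase again, and it can simply reuse the original ProposalID.
Send the Prepare request. The request, PaxosMsg, is a message defined with Protocol Buffers and contains fields such as MsgType, InstanceID, NodeID and ProposalID. BroadcastMessage(oPaxosMsg) serializes oPaxosMsg before sending it.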
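The ProposalID rule in the second step can be sketched as follows. This is a minimal illustration, not the PhxPaxos class: ProposerStateSketch and all of its members are hypothetical names, and the only behaviour taken from the text is "reuse the old ProposalID unless a new ballot is needed, in which case use the largest known ProposalID plus 1".

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the proposer's ballot bookkeeping (not the PhxPaxos class).
struct ProposerStateSketch {
    uint64_t proposalId = 0;              // ProposalID of the current attempt
    uint64_t highestOtherProposalId = 0;  // largest ProposalID reported by a rejecting Acceptor

    // Remember the ProposalID carried in a rejection.
    void SetOtherProposalId(uint64_t otherId) {
        highestOtherProposalId = std::max(highestOtherProposalId, otherId);
    }

    // Pick a ProposalID larger than every ProposalID known so far.
    void NewPrepare() {
        proposalId = std::max(proposalId, highestOtherProposalId) + 1;
        assert(proposalId > highestOtherProposalId);
    }

    // Mirrors the decision made at the top of Prepare(bNeedNewBallot).
    uint64_t ProposalIdForPrepare(bool needNewBallot) {
        if (needNewBallot) {
            NewPrepare();
        }
        return proposalId;  // unchanged when a timed-out Prepare is simply retried
    }
};
```

A retry after a plain timeout calls ProposalIdForPrepare(false) and keeps its number; a retry after a rejection calls ProposalIdForPrepare(true) and jumps past the number that caused the rejection.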
PaxosMsg is defined as follows. Every message exchanged between Proposer and Acceptor in the Prepare and Accept phases is a PaxosMsg:
    // src/comm/paxos_msg.proto
    message PaxosMsg
    {
        required int32 MsgType = 1;
        optional uint64 InstanceID = 2;
        optional uint64 NodeID = 3;
        optional uint64 ProposalID = 4;
        optional uint64 ProposalNodeID = 5;
        optional bytes Value = 6;
        optional uint64 PreAcceptID = 7;
        optional uint64 PreAcceptNodeID = 8;
        optional uint64 RejectByPromiseID = 9;
        optional uint64 NowInstanceID = 10;
        optional uint64 MinChosenInstanceID = 11;
        optional uint32 LastChecksum = 12;
        optional uint32 Flag = 13;
        optional bytes SystemVariables = 14;
        optional bytes MasterVariables = 15;
    };

OnPrepareReply
After sending the Prepare request, the Proposer waits for the Acceptors to reply. When the Proposer's node receives a PaxosPrepareReply message, it calls the Proposer's OnPrepareReply(oPaxosMsg), where oPaxosMsg is the Acceptor's reply.
    // src/algorithm/proposer.cpp
    void Proposer :: OnPrepareReply(const PaxosMsg & oPaxosMsg)
    {
        PLGHead("START Msg.ProposalID %lu State.ProposalID %lu Msg.from_nodeid %lu RejectByPromiseID %lu",
                oPaxosMsg.proposalid(), m_oProposerState.GetProposalID(),
                oPaxosMsg.nodeid(), oPaxosMsg.rejectbypromiseid());

        BP->GetProposerBP()->OnPrepareReply();

        // Ignore the message if the Proposer is not in the Prepare phase.
        if (!m_bIsPreparing)
        {
            BP->GetProposerBP()->OnPrepareReplyButNotPreparing();
            //PLGErr("Not preparing, skip this msg");
            return;
        }

        // Also ignore it if the ProposalID does not match.
        if (oPaxosMsg.proposalid() != m_oProposerState.GetProposalID())
        {
            BP->GetProposerBP()->OnPrepareReplyNotSameProposalIDMsg();
            //PLGErr("ProposalID not same, skip this msg");
            return;
        }

        // Register the received message with MsgCounter.
        m_oMsgCounter.AddReceive(oPaxosMsg.nodeid());

        // Not a rejection: the Acceptor agreed to this Prepare request.
        if (oPaxosMsg.rejectbypromiseid() == 0)
        {
            BallotNumber oBallot(oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid());
            PLGDebug("[Promise] PreAcceptedID %lu PreAcceptedNodeID %lu ValueSize %zu",
                    oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid(), oPaxosMsg.value().size());
            // Count the vote in MsgCounter.
            m_oMsgCounter.AddPromiseOrAccept(oPaxosMsg.nodeid());
            // Record the highest-numbered proposal this Acceptor has accepted (if any);
            // it determines the Value used in the Accept phase.
            m_oProposerState.AddPreAcceptValue(oBallot, oPaxosMsg.value());
        }
        // The Acceptor rejected the Prepare request.
        else
        {
            PLGDebug("[Reject] RejectByPromiseID %lu", oPaxosMsg.rejectbypromiseid());
            // Rejections are counted in MsgCounter as well.
            m_oMsgCounter.AddReject(oPaxosMsg.nodeid());
            // Remember the rejection: if the Prepare phase is restarted,
            // a larger ProposalID will be needed.
            m_bWasRejectBySomeone = true;
            // Record the larger ProposalID issued by another Proposer, so that the next
            // Prepare request knows how large its ProposalID has to be.
            m_oProposerState.SetOtherProposalID(oPaxosMsg.rejectbypromiseid());
        }

        // The Prepare request passed: more than half of the Acceptors agreed.
        if (m_oMsgCounter.IsPassedOnThisRound())
        {
            int iUseTimeMs = m_oTimeStat.Point();
            BP->GetProposerBP()->PreparePass(iUseTimeMs);
            PLGImp("[Pass] start accept, usetime %dms", iUseTimeMs);
            m_bCanSkipPrepare = true;
            // Enter the Accept phase.
            Accept();
        }
        // The Prepare request did not pass.
        else if (m_oMsgCounter.IsRejectedOnThisRound()
                || m_oMsgCounter.IsAllReceiveOnThisRound())
        {
            BP->GetProposerBP()->PrepareNotPass();
            PLGImp("[Not Pass] wait 30ms and restart prepare");
            // Wait a random interval, then send the Prepare request again.
            AddPrepareTimer(OtherUtils::FastRand() % 30 + 10);
        }

        PLGHead("END");
    }

In this step the Proposer mainly does the following:
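Check that the message is valid: the ProposalID must match and the Proposer must still be in the Prepare phase. The network is unreliable, so a message may be delayed for a long time and arrive when it is no longer needed, which is why these checks are required.
Add the received message to MsgCounter for counting.
Update its own state from the message, including the ProposalID the Acceptor has promised and the highest-numbered proposal the Acceptor has accepted.
Use the Acceptor votes tallied by MsgCounter to decide whether to enter the Accept phase or to send the Prepare request again. A retry does not start immediately: the Proposer first waits a random interval (10 to 39 ms according to the expression in the code above). This reduces collisions between different Proposers, much as Raft restarts a leader election after a randomized election timeout when an election is contested.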
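The counting steps above can be illustrated with a small vote counter. This is a sketch under assumptions: MsgCounterSketch is a hypothetical class, the method names are borrowed from how the listing uses m_oMsgCounter, and a majority is assumed to mean floor(n/2) + 1 of the n nodes. Node IDs are kept in sets so a duplicated reply from one node is counted once.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <set>

// Hypothetical vote counter modelled on how the listing uses m_oMsgCounter.
class MsgCounterSketch {
public:
    explicit MsgCounterSketch(std::size_t totalNodes) : totalNodes_(totalNodes) {
        assert(totalNodes > 0);
    }

    // Forget everything from the previous round.
    void StartNewRound() {
        received_.clear();
        accepted_.clear();
        rejected_.clear();
    }

    void AddReceive(uint64_t nodeId) { received_.insert(nodeId); }
    void AddPromiseOrAccept(uint64_t nodeId) { accepted_.insert(nodeId); }
    void AddReject(uint64_t nodeId) { rejected_.insert(nodeId); }

    // Assumed quorum size: strictly more than half of the nodes.
    std::size_t MajorityCount() const { return totalNodes_ / 2 + 1; }

    bool IsPassedOnThisRound() const { return accepted_.size() >= MajorityCount(); }
    bool IsRejectedOnThisRound() const { return rejected_.size() >= MajorityCount(); }
    bool IsAllReceiveOnThisRound() const { return received_.size() == totalNodes_; }

private:
    std::size_t totalNodes_;
    std::set<uint64_t> received_;  // nodes that replied at all
    std::set<uint64_t> accepted_;  // nodes that promised or accepted
    std::set<uint64_t> rejected_;  // nodes that rejected
};
```

With three nodes, two promises are enough for IsPassedOnThisRound() to return true, while a round with one promise, one rejection and one further rejection ends through IsRejectedOnThisRound().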
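Note: what corresponds to the proposal number of the Paxos algorithm here is not ProposalID but BallotNumber. A BallotNumber consists of a ProposalID and a NodeID, and it overloads the comparison operators. The BallotNumber (that is, the proposal number) with the larger ProposalID is the larger one; when the ProposalIDs are equal, the one with the larger NodeID is larger.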
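The ordering described in the note is a lexicographic comparison on the pair (ProposalID, NodeID). A minimal sketch, assuming a hypothetical BallotSketch type rather than the real BallotNumber class:

```cpp
#include <cassert>
#include <cstdint>
#include <tuple>

// Hypothetical ballot type: ProposalID first, NodeID as the tie-breaker.
struct BallotSketch {
    uint64_t proposalId;
    uint64_t nodeId;
};

inline bool operator<(const BallotSketch& a, const BallotSketch& b) {
    // std::tie compares proposalId first and falls back to nodeId on a tie.
    return std::tie(a.proposalId, a.nodeId) < std::tie(b.proposalId, b.nodeId);
}
inline bool operator>(const BallotSketch& a, const BallotSketch& b) { return b < a; }
inline bool operator>=(const BallotSketch& a, const BallotSketch& b) { return !(a < b); }
inline bool operator==(const BallotSketch& a, const BallotSketch& b) {
    return a.proposalId == b.proposalId && a.nodeId == b.nodeId;
}

// Usage example: the checks below restate the two rules from the note.
inline void BallotOrderingExamples() {
    assert((BallotSketch{2, 1} > BallotSketch{1, 9}));  // larger ProposalID wins
    assert((BallotSketch{2, 3} > BallotSketch{2, 1}));  // same ProposalID: larger NodeID wins
}
```

Because the NodeID breaks ties, two Proposers that happen to pick the same ProposalID still produce distinct, totally ordered ballots.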
Accept phase
Next, the Proposer enters the Accept phase:
Accept
    // src/algorithm/proposer.cpp
    void Proposer :: Accept()
    {
        PLGHead("START ProposalID %lu ValueSize %zu ValueLen %zu",
                m_oProposerState.GetProposalID(), m_oProposerState.GetValue().size(),
                m_oProposerState.GetValue().size());

        BP->GetProposerBP()->Accept();
        m_oTimeStat.Point();

        ExitPrepare();
        m_bIsAccepting = true;

        // Fill in the Accept request.
        PaxosMsg oPaxosMsg;
        oPaxosMsg.set_msgtype(MsgType_PaxosAccept);
        oPaxosMsg.set_instanceid(GetInstanceID());
        oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
        oPaxosMsg.set_proposalid(m_oProposerState.GetProposalID());
        oPaxosMsg.set_value(m_oProposerState.GetValue());
        oPaxosMsg.set_lastchecksum(GetLastChecksum());

        m_oMsgCounter.StartNewRound();

        AddAcceptTimer();

        PLGHead("END");

        // Send it to every node.
        BroadcastMessage(oPaxosMsg, BroadcastMessage_Type_RunSelf_Final);
    }

The Value carried in the Accept request's PaxosMsg is determined as follows: if any Acceptor reply in the Prepare phase carried a proposal value, the Value is the value of the highest-numbered proposal among all the Acceptor replies. Otherwise it is the value that was passed in when the Proposer originally called NewValue.
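The value-selection rule can be sketched like this. ValueChooserSketch and PreAcceptBallot are hypothetical names, and a ProposalID of 0 is assumed to mean "this Acceptor has not accepted anything", in line with the "> 0" check the Acceptor applies before attaching a value to its reply.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>

// Hypothetical ballot of a previously accepted proposal.
struct PreAcceptBallot {
    uint64_t proposalId;
    uint64_t nodeId;
};

// True when a is a strictly higher ballot than b.
inline bool IsHigher(const PreAcceptBallot& a, const PreAcceptBallot& b) {
    return a.proposalId != b.proposalId ? a.proposalId > b.proposalId
                                        : a.nodeId > b.nodeId;
}

// Hypothetical helper that tracks which value the Accept request must carry.
class ValueChooserSketch {
public:
    explicit ValueChooserSketch(std::string ownValue)
        : value_(std::move(ownValue)), highest_{0, 0} {}

    // Called once for every promise received in the Prepare phase.
    void AddPreAcceptValue(const PreAcceptBallot& accepted, const std::string& acceptedValue) {
        if (accepted.proposalId == 0) {
            return;  // assumption: 0 means the Acceptor had accepted nothing
        }
        if (IsHigher(accepted, highest_)) {
            highest_ = accepted;
            value_ = acceptedValue;  // adopt the value of the highest-numbered proposal
        }
    }

    const std::string& ValueForAccept() const { return value_; }

private:
    std::string value_;        // starts as the Proposer's own value
    PreAcceptBallot highest_;  // highest accepted ballot seen so far
};
```

If no promise carries an accepted proposal, ValueForAccept() still returns the value the chooser was constructed with, which plays the role of the value passed to NewValue.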
OnAcceptReply
    // src/algorithm/proposer.cpp
    void Proposer :: OnAcceptReply(const PaxosMsg & oPaxosMsg)
    {
        PLGHead("START Msg.ProposalID %lu State.ProposalID %lu Msg.from_nodeid %lu RejectByPromiseID %lu",
                oPaxosMsg.proposalid(), m_oProposerState.GetProposalID(),
                oPaxosMsg.nodeid(), oPaxosMsg.rejectbypromiseid());

        BP->GetProposerBP()->OnAcceptReply();

        if (!m_bIsAccepting)
        {
            //PLGErr("Not proposing, skip this msg");
            BP->GetProposerBP()->OnAcceptReplyButNotAccepting();
            return;
        }

        if (oPaxosMsg.proposalid() != m_oProposerState.GetProposalID())
        {
            //PLGErr("ProposalID not same, skip this msg");
            BP->GetProposerBP()->OnAcceptReplyNotSameProposalIDMsg();
            return;
        }

        m_oMsgCounter.AddReceive(oPaxosMsg.nodeid());

        if (oPaxosMsg.rejectbypromiseid() == 0)
        {
            PLGDebug("[Accept]");
            m_oMsgCounter.AddPromiseOrAccept(oPaxosMsg.nodeid());
        }
        else
        {
            PLGDebug("[Reject]");
            m_oMsgCounter.AddReject(oPaxosMsg.nodeid());
            m_bWasRejectBySomeone = true;
            m_oProposerState.SetOtherProposalID(oPaxosMsg.rejectbypromiseid());
        }

        if (m_oMsgCounter.IsPassedOnThisRound())
        {
            int iUseTimeMs = m_oTimeStat.Point();
            BP->GetProposerBP()->AcceptPass(iUseTimeMs);
            PLGImp("[Pass] Start send learn, usetime %dms", iUseTimeMs);
            ExitAccept();
            // Let the Learners learn the chosen value.
            m_poLearner->ProposerSendSuccess(GetInstanceID(), m_oProposerState.GetProposalID());
        }
        else if (m_oMsgCounter.IsRejectedOnThisRound()
                || m_oMsgCounter.IsAllReceiveOnThisRound())
        {
            BP->GetProposerBP()->AcceptNotPass();
            PLGImp("[Not pass] wait 30ms and Restart prepare");
            AddAcceptTimer(OtherUtils::FastRand() % 30 + 10);
        }

        PLGHead("END");
    }

The flow here is essentially the same as in OnPrepareReply. The main difference comes at the end: if more than half of the Acceptors accepted the Accept request, the Value has been chosen, so the Proposer sends a message that lets the Learner on every node learn that Value.
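Both reply handlers compute their retry delay as FastRand() % 30 + 10, which yields 10 to 39 ms even though the log message says "wait 30ms". The sketch below reproduces that range with the standard library. RetryDelayMs is a hypothetical helper, and std::mt19937 is swapped in for OtherUtils::FastRand(), whose implementation is not shown in this article.

```cpp
#include <cassert>
#include <cstdint>
#include <random>

// Hypothetical helper: a base delay plus a random jitter in [0, spreadMs).
// The defaults match the "% 30 + 10" expression in the listings.
inline uint32_t RetryDelayMs(std::mt19937& rng, uint32_t baseMs = 10, uint32_t spreadMs = 30) {
    assert(spreadMs > 0);
    std::uniform_int_distribution<uint32_t> jitter(0, spreadMs - 1);
    return baseMs + jitter(rng);
}
```

Seeding each node's generator differently (for example from std::random_device) matters here: if two Proposers drew identical delays they would collide again on every retry.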
Acceptor
OnPrepare
OnPrepare handles an incoming Prepare request. Its logic is as follows:
    int Acceptor :: OnPrepare(const PaxosMsg & oPaxosMsg)
    {
        PLGHead("START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu",
                oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid());

        BP->GetAcceptorBP()->OnPrepare();

        PaxosMsg oReplyPaxosMsg;
        oReplyPaxosMsg.set_instanceid(GetInstanceID());
        oReplyPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
        oReplyPaxosMsg.set_proposalid(oPaxosMsg.proposalid());
        oReplyPaxosMsg.set_msgtype(MsgType_PaxosPrepareReply);

        // Build the ballot carried by the incoming Prepare request.
        BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());

        // The ballot is not less than the promised ballot.
        if (oBallot >= m_oAcceptorState.GetPromiseBallot())
        {
            PLGDebug("[Promise] State.PromiseID %lu State.PromiseNodeID %lu "
                    "State.PreAcceptedID %lu State.PreAcceptedNodeID %lu",
                    m_oAcceptorState.GetPromiseBallot().m_llProposalID,
                    m_oAcceptorState.GetPromiseBallot().m_llNodeID,
                    m_oAcceptorState.GetAcceptedBallot().m_llProposalID,
                    m_oAcceptorState.GetAcceptedBallot().m_llNodeID);

            // Return the ballot of the previously accepted proposal.
            oReplyPaxosMsg.set_preacceptid(m_oAcceptorState.GetAcceptedBallot().m_llProposalID);
            oReplyPaxosMsg.set_preacceptnodeid(m_oAcceptorState.GetAcceptedBallot().m_llNodeID);

            // If the accepted ProposalID is greater than 0 (<= 0 means nothing has been
            // accepted), attach the Value of the accepted proposal.
            if (m_oAcceptorState.GetAcceptedBallot().m_llProposalID > 0)
            {
                oReplyPaxosMsg.set_value(m_oAcceptorState.GetAcceptedValue());
            }

            // Update the promised ballot to the new ballot.
            m_oAcceptorState.SetPromiseBallot(oBallot);

            // Persist the state.
            int ret = m_oAcceptorState.Persist(GetInstanceID(), GetLastChecksum());
            if (ret != 0)
            {
                BP->GetAcceptorBP()->OnPreparePersistFail();
                PLGErr("Persist fail, Now.InstanceID %lu ret %d",
                        GetInstanceID(), ret);
                return -1;
            }

            BP->GetAcceptorBP()->OnPreparePass();
        }
        // The ballot is less than the promised ballot, so the request must be rejected.
        else
        {
            BP->GetAcceptorBP()->OnPrepareReject();
            PLGDebug("[Reject] State.PromiseID %lu State.PromiseNodeID %lu",
                    m_oAcceptorState.GetPromiseBallot().m_llProposalID,
                    m_oAcceptorState.GetPromiseBallot().m_llNodeID);

            // Reject the Prepare request and return the promised ProposalID.
            oReplyPaxosMsg.set_rejectbypromiseid(m_oAcceptorState.GetPromiseBallot().m_llProposalID);
        }

        nodeid_t iReplyNodeID = oPaxosMsg.nodeid();

        PLGHead("END Now.InstanceID %lu ReplyNodeID %lu",
                GetInstanceID(), oPaxosMsg.nodeid());

        // Reply to the Proposer that sent the Prepare request.
        SendMessage(iReplyNodeID, oReplyPaxosMsg);

        return 0;
    }

OnAccept
Now let's look at OnAccept:
    void Acceptor :: OnAccept(const PaxosMsg & oPaxosMsg)
    {
        PLGHead("START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu Msg.ValueLen %zu",
                oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid(),
                oPaxosMsg.value().size());

        BP->GetAcceptorBP()->OnAccept();

        PaxosMsg oReplyPaxosMsg;
        oReplyPaxosMsg.set_instanceid(GetInstanceID());
        oReplyPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
        oReplyPaxosMsg.set_proposalid(oPaxosMsg.proposalid());
        oReplyPaxosMsg.set_msgtype(MsgType_PaxosAcceptReply);

        BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());

        // The ballot is not less than the promised ballot, so the proposal is accepted.
        // Note the ">=": an Accept request carries the same ballot that this Acceptor
        // promised in the Prepare phase, so an equal ballot must be allowed.
        if (oBallot >= m_oAcceptorState.GetPromiseBallot())
        {
            PLGDebug("[Promise] State.PromiseID %lu State.PromiseNodeID %lu "
                    "State.PreAcceptedID %lu State.PreAcceptedNodeID %lu",
                    m_oAcceptorState.GetPromiseBallot().m_llProposalID,
                    m_oAcceptorState.GetPromiseBallot().m_llNodeID,
                    m_oAcceptorState.GetAcceptedBallot().m_llProposalID,
                    m_oAcceptorState.GetAcceptedBallot().m_llNodeID);

            // Update the promised ballot, the accepted ballot and the accepted value.
            m_oAcceptorState.SetPromiseBallot(oBallot);
            m_oAcceptorState.SetAcceptedBallot(oBallot);
            m_oAcceptorState.SetAcceptedValue(oPaxosMsg.value());

            // Persist the state.
            int ret = m_oAcceptorState.Persist(GetInstanceID(), GetLastChecksum());
            if (ret != 0)
            {
                BP->GetAcceptorBP()->OnAcceptPersistFail();
                PLGErr("Persist fail, Now.InstanceID %lu ret %d",
                        GetInstanceID(), ret);
                return;
            }

            BP->GetAcceptorBP()->OnAcceptPass();
        }
        // The proposal must be rejected.
        else
        {
            BP->GetAcceptorBP()->OnAcceptReject();
            PLGDebug("[Reject] State.PromiseID %lu State.PromiseNodeID %lu",
                    m_oAcceptorState.GetPromiseBallot().m_llProposalID,
                    m_oAcceptorState.GetPromiseBallot().m_llNodeID);

            // Attach the promised ProposalID to the rejection.
            oReplyPaxosMsg.set_rejectbypromiseid(m_oAcceptorState.GetPromiseBallot().m_llProposalID);
        }

        nodeid_t iReplyNodeID = oPaxosMsg.nodeid();

        PLGHead("END Now.InstanceID %lu ReplyNodeID %lu",
                GetInstanceID(), oPaxosMsg.nodeid());

        // Send the response to the Proposer.
        SendMessage(iReplyNodeID, oReplyPaxosMsg);
    }
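Stripped of logging, persistence and networking, the two Acceptor handlers reduce to a small state machine. The following is a rough in-memory sketch, not the PhxPaxos Acceptor: AcceptorSketch, AcceptorBallot and AcceptorReply are hypothetical types, replies are returned instead of sent, and the Persist call that the real code makes before replying is only marked by a comment.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <tuple>

struct AcceptorBallot {
    uint64_t proposalId;
    uint64_t nodeId;
};

// True when a >= b, comparing ProposalID first and NodeID second.
inline bool NotLess(const AcceptorBallot& a, const AcceptorBallot& b) {
    return std::tie(a.proposalId, a.nodeId) >= std::tie(b.proposalId, b.nodeId);
}

struct AcceptorReply {
    uint64_t rejectByPromiseId;  // 0 means the request was promised or accepted
    AcceptorBallot preAccepted;  // ballot of the previously accepted proposal, if any
    std::string value;           // value of that proposal, if any
};

class AcceptorSketch {
public:
    AcceptorReply OnPrepare(const AcceptorBallot& ballot) {
        AcceptorReply reply{0, AcceptorBallot{0, 0}, ""};
        if (NotLess(ballot, promised_)) {
            reply.preAccepted = accepted_;
            if (accepted_.proposalId > 0) {
                reply.value = acceptedValue_;
            }
            promised_ = ballot;  // the real code persists state at this point
        } else {
            reply.rejectByPromiseId = promised_.proposalId;
        }
        return reply;
    }

    AcceptorReply OnAccept(const AcceptorBallot& ballot, const std::string& value) {
        AcceptorReply reply{0, AcceptorBallot{0, 0}, ""};
        if (NotLess(ballot, promised_)) {
            promised_ = ballot;
            accepted_ = ballot;
            acceptedValue_ = value;  // the real code persists state at this point
        } else {
            reply.rejectByPromiseId = promised_.proposalId;
        }
        return reply;
    }

private:
    AcceptorBallot promised_{0, 0};  // highest ballot promised so far
    AcceptorBallot accepted_{0, 0};  // ballot of the accepted proposal (0 = none)
    std::string acceptedValue_;
};
```

For instance, after OnPrepare({1, 1}) and OnAccept({1, 1}, "x"), a later OnPrepare({2, 2}) is promised and reports "x" as the previously accepted value, while a stale OnAccept({1, 1}, "y") is rejected with RejectByPromiseID 2.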