MIT 6.824 Lab2A (raft) -- Leader Election
文章目錄
- 實驗要求
- Leader Election流程 及詳細實現介紹
- 基本角色
- 關鍵超時變量
- 關鍵的兩個RPC實現
- RequestVote RPC
- AppendEntries RPC
- Go并發編程實現leader election調度
本節記錄的是完成MIT6.824 raft lab的leader Election部分實驗。
代碼: https://github.com/BaronStack/MIT-6.824-lab, clone之后git checkout lab2-2A
實驗要求
這里是raft lab2 的2A部分,也是這個lab的一個基礎部分。主要完成的raft功能是 leader election 和 heartbeat 心跳。即 集群選舉從初始化狀態選舉出一個leader,且在集群沒有異常的情況下這個leader會通過heartbeat心跳一直保持自己的leader狀態。
詳細的功能可以 通過test_test.go的兩個測試看看2A這里主要的功能是什么?
func TestInitialElection2A(t *testing.T) {servers := 3 //初始化三個peercfg := make_config(t, servers, false) // 完成初使選舉defer cfg.cleanup()cfg.begin("Test (2A): initial election")// is a leader elected?cfg.checkOneLeader() // 檢查leader是否選舉出來且只有一個// sleep a bit to avoid racing with followers learning of the// election, then check that all peers agree on the term.time.Sleep(50 * time.Millisecond)// 完成leader選舉之后,當前leader任期內的term 大于等于初始化的term// 且后續沒有網絡異常的情況下這個term不會發生變化term1 := cfg.checkTerms()if term1 < 1 {t.Fatalf("term is %v, but should be at least 1", term1)}// does the leader+term stay the same if there is no network failure?time.Sleep(2 * RaftElectionTimeout)// 過了一段時間,確保term不會發生變化term2 := cfg.checkTerms()if term1 != term2 {fmt.Printf("warning: term changed even though there were no failures")}// there should still be a leader.// 仍然只有一個leadercfg.checkOneLeader()cfg.end()
}
后面的一個測試是針對leader election過程中的其他異常情況進行的,詳細代碼可以看看test_test.go 的 TestReElection2A函數的測試內容:
- 三個peer選舉出一個leader
- 一個peer異常,leader能夠正常選出來
- 兩個peer異常,leader選舉不出來,因為已經超過大多數異常了
- 恢復了一個peer之后有兩個peer,能夠選舉出來一個leader
- 再加入一個peer之后不影響之前正常的leader
整體來看就是一個完整的leader election的實現。
Leader Election流程 及詳細實現介紹
基本角色
這里的角色在實際raft相關的應用中是以服務進程的形式存在的。
follower,所有角色開始時的狀態,等待接受leader心跳RPCs,如果收不到則會變成CandidateCandidate,候選人。是變成Leader的上一個角色,候選人會向其他所有節點發送RequestVote RPCs,如果收到集群大多數的回復,則會將自己角色變更為Leader,并發送AppendEntries RPCs。Leader,集群的皇帝/主人…,raft能夠保證每一個集群僅有一個leader。負責和客戶端進行通信,并將客戶端請求轉發給集群其他成員。
代碼中定義了三種常量表示peer不同的state:
const (STATE_FOLLOWER = iota // 0STATE_CANDIDATESTATE_LEADERHBINTERVAL = 50 * time.Millisecond // 50ms 心跳間隔
)
關鍵超時變量
-
Election Timeout選舉超時時間。即Cadidate 向集群其他節點發送vote請求時,如果在Election Timeout時間內沒有收到大多數的回復,則會重新發送vote rpc。以上將實際
RequestVote簡寫為vote ,就是請求投票的rpc一般這個超時時間是在
150-300ms的隨機時間,為了防止集群出現頻繁的 split vote 影響leader選舉效率的情況,將這個超時時間取在155-300ms范圍內的隨機時間。當然,這個數值也是經過測試的,超時時間設置在150-300ms 之間能夠保證raft集群 leader的穩定性,也可以將超時時間設置的比較低(12-24ms),但是存在的網絡延遲則會導致一些不必要的leader選舉。隨機超時時間的設定實現如下,因為看到有很多完成6.824的伙伴有說這里超市時間是個坑,測試數百上千次可能無法保證每次都能在超市時間內選舉出leader,目前還沒有遇到:
time.After(time.Duration(rand.Int63() % 333 + 550) * time.Millisecond) //這里設置的是550-880ms之間關于splite vote的情況可以看如下圖,圖片來自raft可視化官網:
兩個節點收到對方的vote請求之前變成了candidate,發送了各自的request vote。 -
Heartbeats Timeout心跳超時時間。follower接受來自leader的心跳,如果在heartbeats timeout這個時間段內follower沒有收到來自leader的AppendEntries RPCs,則follower會重新觸發選舉。收到了,則重置follower 本地的 heartbeats timeout。 -
TermLeader選舉過程中除了之前提到的基本變量,還會有一個Term 的概念。
,每一個term的變更不一定表示Leader一定會被選舉出來了。
上圖中的 term3 則完全沒有選出leader,這種情況的出現就是上文中描述的splite vote的情況,這個時候Term也會增加,當時并沒有leader 被選出來,在ceph/zookeeper中 其實就類比于Epoch。
關鍵的兩個RPC實現
在講實際的RequestVoterpc和SendRequestVote實現之前我們先來看看什么是RPC(remote procedure call)遠程進程調用。
我們知道raft維護的是一個集群多臺機器之間的共識狀態,那需要這個集群內的機器之間頻繁得進行數據傳輸。而我們希望實際發送過去得不僅僅是數據流,還有可以執行產生數據流的函數,這樣能夠高效得完成一些邏輯上的數據處理。比如,raft中我們將RequestVote封裝成一個函數,將本地的peer狀態作為參數和整個函數一起發送到遠端的機器,遠端的機器能夠根據發送過來的peer狀態通過RequestVote內部邏輯來決定自己本地的行為。這個過程如果純粹得通過網絡發送數據包,顯然需要大量的數據傳輸,所以RPC也就應運而生了。
實現RPC的話 不像我們本地服務器進程之間通信或者進程內部的函數調用這么簡單方便,因為是跨服務器的,之間的信息交流只能通過網絡。我們想要讓本地的函數在遠端也能夠執行,需要實現如下幾個機制:
- Call ID映射。保證本地和遠端服務器都能夠通過這個映射找到唯一的函數指針執行
- 序列化和反序列化。需要將函數參數進行序列化成字節流 通過網絡傳輸到遠端,遠端服務器再進行反序列化解析得到參數。
- 網絡傳輸。需要通過網絡協議將Call ID、序列化和反序列化數據 發送到遠端。這里的協議并不會有太多的限制,TCP/UDP/HTTP等都可以。
輕量級得RPC的實現感興趣的同學可以看看labrpc.go,對于RPC過程中需要處理的網絡異常或者流量控制這樣的需求 學習gRPC或者bRPC等C++實現也是很經典的。
RequestVote RPC
這個RPC存在的目的是為了選舉leader,即集群中有peer變成了candidate狀態時就會發送RequestVote rpc。
-
RequestVote RPCs以上兩個超時過程也說了,投票是通過rpc請求實現的,且當有RequestVote 出現時,說明發送的peer本省的state已經是處于Candidate了實際的RPC-args和reply結構體如下:
type RequestVoteArgs struct {// Your data here (2A, 2B).Term int // current candidate's termCandidateId int // candidate's id requesting voteLastLogIndex int // index of current candidate's last log entryLastLogTerm int // term of current candidate's last log entry }// // example RequestVote RPC reply structure. // field names must start with capital letters! // type RequestVoteReply struct {// Your data here (2A).Term int // current term, for candidate to update itselfVoteGranted bool // true means candidate received vote }我們的raft中
RequestVote的實現中,如果想要接收這個rpc的peer為發送的rpc即RequestVoteArgs投票,則需要滿足以下幾個條件:-
Receive-peer 的 term > send-peer 的term,則receiver-peer保留自己本身的狀態,畢竟Term都比請求投票的peer term新
-
為了保證一致性,當send-peer的term 滿足大于等于receive-peer的term的時候需要比較上一個term是否比receive-peer的上一個term新,如果是相等,還需要確認上一個log index是否更新。這一些都滿足之后才能更新receive-peer的狀態為follower 以及 投票的id。即receive-peer認可了send-peer是leader。
除了最開始的term的比較之外,后續的last-term以及last-log-index 都是為了保證選舉出來的leader能夠擁有最更新的日志。
代碼實現如下:
func (rf *Raft) RequestVote(args RequestVoteArgs,reply *RequestVoteReply) {// Your code here (2A, 2B).rf.mu.Lock()defer rf.mu.Unlock()reply.VoteGranted = falseif args.Term < rf.currentTerm { // 判斷term,rf.currentTerm是receiver-peer的term// send-peer的term沒有receiver-peer的term新,直接返回reply.Term = rf.currentTermreturn}// send-peer的term更新,則更新receive-peer的state和本地term// 如果兩者相等, 則需要繼續后續的last-term和last-index的判斷if args.Term > rf.currentTerm { rf.currentTerm = args.Termrf.state = STATE_FOLLOWERrf.voteFor = -1}reply.Term = rf.currentTermlast_term := rf.GetLastTerm()last_index := rf.GetLastIndex()update := false// only the leader have the newer term and log-index than current peer// then we could vote for the peerif args.LastLogTerm > last_term {update = true}if args.LastLogTerm == last_term && args.LastLogIndex >= last_index {update = true}// 都滿足send-peer擁有更全的日志,receive-peer才會選擇去更新本地相關狀態和跟進termif (rf.voteFor == -1 || rf.voteFor == args.CandidateId) && update {rf.chanGrantVote <- truerf.state = STATE_FOLLOWERreply.VoteGranted = truerf.voteFor = args.CandidateId // 投票給send-peer的peer id} } -
-
sendAppendEntries的實現 大體是在send-peer端在發送完rpc接收到reply之后的處理邏輯
- 在收到
RequestVote之后,檢查發現當前state的狀態已經發生變化了,則保持這個狀態直接返回(在此期間可能收到了AppendEntries RPC ,則會直接變更為follower) - term 發生了變化,則認為當前peer在收到自己發送的rpc回復之前收到別人的rpc且為別人投了票,也就是狀態也發生了變化
- 自己還是保持的發送之前的state和term,只是收到回復的term比自己的term大,那將自己狀態變更為follower
- 收到的回復中發現別人投給自己一票,那就準備將自己變更為leader
func (rf *Raft) sendRequestVote(server int, args RequestVoteArgs, reply *RequestVoteReply) bool {ok := rf.peers[server].Call("Raft.RequestVote", args, reply)rf.mu.Lock()defer rf.mu.Unlock()if ok {// find that the current peer's state changed , return okif rf.state != STATE_CANDIDATE {return ok}// keep the current peer's state, our state have been changedterm := rf.currentTermif args.Term != term {return ok}if reply.Term > term {rf.currentTerm = reply.Termrf.state = STATE_FOLLOWERrf.voteFor = -1}// 別人的回復認可了自己,投了自己一票if reply.VoteGranted {rf.voteCount ++// 確認自己的投票總數超過半數,則通過channel 標記自己成為leaderif rf.state == STATE_CANDIDATE && rf.voteCount > len(rf.peers)/2 {rf.state = STATE_FOLLOWERrf.chanLeader <- true}}}return ok
}
關于Term在candidate 投票過程中發生的變化 如下圖。
AppendEntries RPC
這個rpc是leader維護自己狀態的,每隔一段時間像其他的follower發送AppendEntries,這段時間的集群term不會發生變化。并且AppendEntries也會攜帶著log-entry 更新log index。
-
AppendEntries RPCsleader 同步數據時的rpc請求。其發送和接收回復的結構體形態如下:
// AppendEntries RPC args type AppendEntriesArgs struct {Term int // leader 的termLeaderId int // leader 所在peer的idPrevLogIndex int // leader上一個log indexPrevLogTerm int // leader 上一個log的termEntries []LogEntry // leader 存放的logLeaderCommit int // leader 已經commit的index }// AppendEntries RPC reply type AppendEntriesReply struct {Term int // 當前peer回復給leader的term,leader用來判斷是否需要變更自身的狀態Success bool // 當前peer是否仍然認可leaderNextIndex int // 下一個log entry的index內容 }AppendEntries 中主要做的事情如下(L表示leader,P表示收到RPC的peer):
- 發現L-term < P-term,這個時候認為集群發生了異常,返回success為false表示當前peer不認可leader的任期了
- 檢查L-prevLogIndex和P-LogIndex是否匹配,如過發現L-prevLogIndex更 新,則認為follower的log不全,需要從leader補充,那需要找到和P-LogIndex 匹配的index,將找到的index+1返回給leader。這個過程其實就是leader補全和follower之間的日志差異,需要向前找到leader和follower所處的同一個term的同一個index才能返回。
當然,第二點其實是lab 2B要做的事情,這個無關于leader election。
看一下實現
func (rf *Raft) AppendEntries(args AppendEntriesArgs,reply *AppendEntriesReply) {rf.mu.Lock()defer rf.mu.Unlock()// 發現L-term < P-term,認為集群發生了異常,將當前peer的term返回回去reply.Success = falseif args.Term < rf.currentTerm {reply.Term = rf.currentTermreply.NextIndex = rf.GetLastIndex() + 1return}// 如果是Term正常的,那就直接填充channel,告訴leader當前peer仍然是followerrf.chanHeartbeat <- trueif args.Term > rf.currentTerm {rf.currentTerm = args.Termrf.state = STATE_FOLLOWERrf.voteFor = -1}reply.Term = args.Termif args.PrevLogIndex > rf.GetLastIndex() {reply.NextIndex = rf.GetLastIndex() + 1return}baseIndex := rf.log[0].LogIndex// 對PrevLogIndex的檢查,確保follower的entry是和leader的log entry同步的if args.PrevLogIndex > baseIndex {term := rf.log[args.PrevLogIndex-baseIndex].LogTermif args.PrevLogTerm != term {for i := args.PrevLogIndex - 1 ; i >= baseIndex; i-- {if rf.log[i-baseIndex].LogTerm != term {reply.NextIndex = i + 1break}}return}}if args.PrevLogIndex < baseIndex {} else {rf.log = rf.log[: args.PrevLogIndex+1-baseIndex]rf.log = append(rf.log, args.Entries...)reply.Success = truereply.NextIndex = rf.GetLastIndex() + 1}return } -
sendAppendEntries是leader 發送完AppendEntriesRPC之后的一些處理邏輯func (rf *Raft) sendAppendEntries(server int, args AppendEntriesArgs, reply *AppendEntriesReply) bool {ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)rf.mu.Lock()defer rf.mu.Unlock()if ok {// 發送RPC之后發現當前leader的狀態和term發生了變化,直接返回吧// 可能在此期間收到其他peer的rpc擁有更高的term,也會讓自身的狀態和term發生變化if rf.state != STATE_LEADER {return ok}if args.Term != rf.currentTerm {return ok}// 如果leader自身沒有變化,但是發現收到回復的term比自己的term新// 只能認為自己的follower了if reply.Term > rf.currentTerm {rf.currentTerm = reply.Termrf.state = STATE_FOLLOWERrf.voteFor = -1return ok}// 更新logindexif reply.Success {if len(args.Entries) > 0 {rf.nextIndex[server] = args.Entries[len(args.Entries) - 1].LogIndex + 1rf.matchInex[server] = rf.nextIndex[server] - 1}} else {rf.nextIndex[server] = reply.NextIndex}}return ok }
從上面的AppendEntries和RequestVote兩種RPC我們大體清楚了在leader Election過程中的節點狀態變化的情況。
總體來說就是當Follower長時間沒有收到心跳的時候就會變成 Candidate,Candidate通過RequestVote邏輯對一些term新舊的判斷或者logIndex新舊的判斷進行投票從而選擇term最新且log最全的peer作為leader,不認為自己能夠當選leader 的peer同時也會將自己的狀態變更為follower;leader會不斷得向follower發送AppendEntries 來維持自己的leader狀態。當集群發生異常(宕機的舊leader重新啟動,收到了新Leader的狀態信息)則會將自己標記為Follower。
如下圖:
Go并發編程實現leader election調度
我們在RPC中已經將大多數的核心實現已經描述清楚了, 接下來就是在外部構造集群需要的多個peer ,每個peer通過接收發送RPC來維護自己的外部狀態機的行為,從而更好得在三種狀態之間變遷。
根據Lab 2A的要求,會檢查集群從最開始沒有leader的狀態進行選舉,完成leader選舉;到集群正常運行時模擬節點網絡異常進行leader選舉。
這里面需要用到GO并發的一些知識 多路選擇 + 超時控制 + CSP(communicating sequential processes),能夠體會到GO語言在并發編程下的強大。
調度這里我們要做的事情就是:
-
初始化幾個peer
-
每一個peer維護一個狀態機,每一個狀態下去調度各自狀態的邏輯。
-
Follower
a. 對candidate和leader的rpc進行回復
b. 如果超市時間內沒有收到AppendEntries rpc 或者 收到candidate的投票,會講自己的狀態轉為candidate -
Candidates
投票過程中會做的事情:
a. 增加當前peer的term
b. 為自己投票
c. 重置選舉超市時間
d. 發送RequestVote RPC 發送給其他peer如果發送的rpc收到的回復大多數都認可自己,那就變成leader
如果收到了AppendEntries RPC, 那就變成follower,說明有其他人在選舉
如果超時時間過期了,那就開啟一個新的term -
Leader
選舉過程中的leader主要是發送AppendEntries RPC來維護自己的term
-
看看具體的實現(僅僅是leader選舉的部分,并沒有處理持久化的log信息):
這個Make的調用會在測試代碼通過make_config --> Start1 --> Make初始化三個peer
func Make(peers []*labrpc.ClientEnd, me int,persister *Persister, applyCh chan ApplyMsg) *Raft {rf := &Raft{}rf.peers = peersrf.persister = persisterrf.me = me// Your initialization code here (2A, 2B, 2C).// 初始化當前peerrf.state = STATE_FOLLOWERrf.voteFor = -1rf.voteCount = 0rf.log = append(rf.log, LogEntry{LogTerm: 0})// 這里維護了幾個channel,在后續變更peer狀態的時候會從channel中取數據// heartbeat和requestVote 的兩個計時器也都是依賴channel來實現的// channel填充的話則是在我們前面實現的RPC之中rf.chanLeader = make(chan bool, 100) // 變更為leaderrf.chanHeartbeat = make(chan bool, 100) // 接收到heartbeat心跳rf.chanGrantVote = make(chan bool, 100) // 投票完成rf.chanApply = applyCh// 啟動一個go routine,來維護當前peer的狀態機go func() {for {switch rf.state {case STATE_FOLLOWER:select {// 有一段時間接受不到心跳,或者收到Candidate的投票// 則將當前follower狀態變更為candidate,準備進行leader electioncase <- rf.chanHeartbeat:case <- rf.chanGrantVote:case <-time.After(time.Duration(rand.Int63() % 333 + 550) * time.Millisecond): // 計時器rf.state = STATE_CANDIDATE} case STATE_LEADER:rf.broadCastAppendEntries() //leader 廣播AppendEntries,有log的話會攜帶著log-indextime.Sleep(HBINTERVAL)case STATE_CANDIDATE:rf.mu.Lock()rf.currentTerm ++ // 增加當前term,表示開啟了一個新一輪的leader任期rf.voteFor = rf.me // 每個candidate先為自己投票 rf.voteCount = 1 // 投票計數自增,后續通過這個計數判斷是否能夠成為leaderrf.mu.Unlock()go rf.broadCastReqeustVote() // candidate 向除自己之外的其他peer廣播RequestVoteselect {case <-time.After(time.Duration(rand.Int63() % 333 + 550) * time.Millisecond):case <-rf.chanHeartbeat: // chanHeartbeat為真,收到了AppendEntries,則變更為follower(已經有leader了)rf.state = STATE_FOLLOWERcase <-rf.chanLeader: // 在處理RequestVote返回的邏輯中發現自己能夠成為leader,變更為leaderrf.mu.Lock()rf.state = STATE_LEADER// 調整后續要發送的rf.nextIndex = make([]int,len(rf.peers))rf.matchInex = make([]int,len(rf.peers))for i := range rf.peers {rf.nextIndex[i] = rf.GetLastIndex() + 1rf.matchInex[i] = 0}rf.mu.Unlock()}}}}()
}
需要注意的是go的channel機制如果不初始化buffer,則會是阻塞的,一個channel 會一直阻塞在這段超時時間內 直到拿到了值。
ch := make(chan bool, 10) //設置大小為10的buffer,如果不設置buffer大小,則后續取值的時候會阻塞ch <- true // 向ch中填值
ret := <- ch // 從ch取值
所以在raft的go實現中 針對channel變量的設置 都會有 buffer,從而防止其他routine獲取channel 值時阻塞。
總結
以上是生活随笔為你收集整理的MIT 6.824 Lab2A (raft) -- Leader Election的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 100个单位的溶解酶能溶解多少玻尿酸
- 下一篇: 为什么都做试管婴儿