用 Go 语言实现 Raft 选主
用 Go 語言實現(xiàn) Raft 選主
選主模塊主要包括三大功能:
- candidate狀態(tài)下的選主功能
- leader狀態(tài)下的心跳廣播功能
- follower狀態(tài)下的確認(rèn)功能
candidate狀態(tài)下的選主功能
candidate狀態(tài)下的選主功能需要關(guān)注兩個方面:
- 何時進(jìn)入candidate狀態(tài),進(jìn)行選主?
- 選主的邏輯是怎樣的?
首先,來討論何時進(jìn)入candidate狀態(tài),進(jìn)行選主。
在一定時間內(nèi)沒有收到來自leader或者其他candidate的有效RPC時,將會觸發(fā)選主。這里需要關(guān)注的是有效兩個字,要么是leader發(fā)的有效的心跳信息,要么是candidate發(fā)的是有效的選主信息,即server本身確認(rèn)這些信息是有效的后,才會重新更新超時時間,超時時間根據(jù)raft論文中推薦設(shè)置為[150ms,300ms],并且每次是隨機(jī)生成的值。
其次,來討論選主的邏輯。
server首先會進(jìn)行選主的初始化操作,即server會增加其term,把狀態(tài)改成candidate,然后選舉自己為主,并把選主的RPC并行地發(fā)送給集群中其他的server,根據(jù)返回的RPC的情況的不同,做不同的處理:
- 該server被選為leader
- 其他的server選為leader
- 一段時間后,沒有server被選為leader
針對情況一,該server被選為leader,當(dāng)前僅當(dāng)在大多數(shù)的server投票給該server時。當(dāng)其被選為主時,會立馬發(fā)送心跳消息給其他的server,來表明其已經(jīng)是leader,防止發(fā)生新的選舉。
針對情況二,其他的server被選為leader,它會收到leader發(fā)送的心跳信息,此時,該server應(yīng)該轉(zhuǎn)為follower,然后退出選舉。
針對情況三,一段時間后,沒有server被選為leader,這種情況發(fā)生在沒有server獲得了大多數(shù)的server的投票情況下,此時,應(yīng)該發(fā)起新一輪的選舉。
leader狀態(tài)下的心跳廣播功能
當(dāng)某個server被選為leader后,需要廣播心跳信息,表明其是leader,主要在以下兩個場景觸發(fā):
- server剛當(dāng)選為leader
- server周期性的發(fā)送心跳消息,防止其他的server進(jìn)入candidate選舉狀態(tài)
leader廣播心跳的邏輯為,如果廣播的心跳信息得到了大多數(shù)的server的確認(rèn),那么更新leader自身的選舉超時時間,防止發(fā)生重新選舉。
follower狀態(tài)下的確認(rèn)功能
主要包括對candidate發(fā)的選舉RPC以及l(fā)eader發(fā)來的心跳RPC的確認(rèn)功能。
對于選舉RPC,假設(shè)candidate c發(fā)送選舉RPC到該follower,由于follower每個term只能選舉一個server,因此,只有當(dāng)一個follower沒有選舉其他server的時候,并且選舉RPC中的candidate c的term大于或等于follower的term時,才會返回選舉當(dāng)前candidate c為主,否則,則返回拒絕選舉當(dāng)前candidate c為主。
對于leader的心跳RPC,如果leader的心跳的term大于或等于follower的term,則認(rèn)可該leader的心跳,否則,不認(rèn)可該leader的心跳。
代碼實現(xiàn)
candidate狀態(tài)下的選主功能
根據(jù)前面描述,主要的邏輯為
- 等待選舉超時
- 增加term,置狀態(tài)為follower,并且選舉自己為leader
- 向其他的server并行地發(fā)送選舉RPC,直到碰到上述描述的三種情況退出
首先等待選舉超時,超時后,會進(jìn)入真正的選舉邏輯election_one_round()
首先,進(jìn)入candidate狀態(tài),增加其term,然后,選舉自己。
func (rf *Raft) becomeCandidate() { rf.state = 1 rf.setTerm(rf.currentTerm + 1)rf.votedFor = rf.merf.currentLeader = -1 }接著,向除自己外的server發(fā)送選舉RPC,等待server的回復(fù)
fmt.Printf("candidate=%d start electing leader\n", rf.me)for {for i := 0; i < len(rf.peers); i++ {if i != rf.me {var args RequestVoteArgsserver := iargs.Term = rf.currentTermargs.CandidateId = rf.mevar reply RequestVoteReplyprintTime()fmt.Printf("candidate=%d send request vote to server=%d\n", rf.me, i)go rf.sendRequestVoteAndTrigger(server, args, &reply, rpcTimeout)}}done = 0triggerHeartbeat = falsefor i := 0; i < len(rf.peers)-1; i++ {printTime()fmt.Printf("candidate=%d waiting for select for i=%d\n", rf.me, i)select {case ok := <-rf.electCh:if ok {done++success = done >= len(rf.peers)/2 || rf.currentLeader > -1success = success && rf.votedFor == rf.meif success && !triggerHeartbeat {triggerHeartbeat = truerf.mu.Lock()rf.becomeLeader()rf.mu.Unlock()rf.heartbeat <- trueprintTime()fmt.Printf("candidate=%d becomes leader\n", rf.currentLeader)}}}printTime()fmt.Printf("candidate=%d complete for select for i=%d\n", rf.me, i)}if (timeout+last < milliseconds()) || (done >= len(rf.peers)/2 || rf.currentLeader > -1) {break} else {select {case <-time.After(time.Duration(10) * time.Millisecond):}}}printTime()fmt.Printf("candidate=%d receive votes status=%t\n", rf.me, success)return success當(dāng)成功返回數(shù)目到多數(shù)派時(包含自己在內(nèi)),則宣布自己稱為leader,即becomeLeader(),如下
func (rf *Raft) becomeLeader() {rf.state = 2rf.currentLeader = rf.me }即,修改自身狀態(tài)為leader。然后,給發(fā)送心跳的線程發(fā)送 rf.heartbeat <-true,通知心跳線程開始發(fā)心跳包。
leader狀態(tài)下的廣播心跳功能
首先,來看觸發(fā)心跳的邏輯
func (rf *Raft) sendLeaderHeartBeat() {timeout := 20for { select {case <-rf.heartbeat:rf.sendAppendEntriesImpl()case <-time.After(time.Duration(timeout) * time.Millisecond):rf.sendAppendEntriesImpl()} } }分為兩個方面:
第一個為剛當(dāng)選為leader后,需要馬上發(fā)送心跳信息,防止新的選舉發(fā)生
第二個是leader周期性的發(fā)送心跳信息,來宣布自己為主
真正的廣播心跳的邏輯如下:
func (rf *Raft) sendAppendEntriesImpl() {if rf.currentLeader == rf.me {var args AppendEntriesArgsvar success_count inttimeout := 20args.LeaderId = rf.meargs.Term = rf.currentTermprintTime()fmt.Printf("broadcast heartbeat start\n")for i := 0; i < len(rf.peers); i++ {if i != rf.me {var reply AppendEntriesReplyprintTime()fmt.Printf("Leader=%d send heartbeat to server=%d\n", rf.me, i)go rf.sendHeartBeat(i, args, &reply, timeout)}}for i := 0; i < len(rf.peers)-1; i++ {select {case ok := <-rf.heartbeatRe:if ok {success_count++if success_count >= len(rf.peers)/2 {rf.mu.Lock()rf.setMessageTime(milliseconds())rf.mu.Unlock()}}}}printTime()fmt.Printf("broadcast heartbeat end\n")if success_count < len(rf.peers)/2 {rf.mu.Lock()rf.currentLeader = -1rf.mu.Unlock()}} }先是向集群中所有的其他server廣播心跳,分為兩種結(jié)果:
收到了大多數(shù)server的確認(rèn),則更新leader的超時時間,防止重新進(jìn)入選舉狀態(tài)
未收到大多數(shù)server的確認(rèn),則會退出發(fā)送心跳的邏輯,即置currentLeader = -1,此后,自然會有選舉超時的server重新發(fā)起選舉
follower狀態(tài)下的確認(rèn)功能
包括對選舉RPC的確認(rèn)已經(jīng)對心跳RPC的確認(rèn)。
選舉RPC的確認(rèn)邏輯如下
func (rf *Raft) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) {// Your code here.currentTerm, _ := rf.GetState()if args.Term < currentTerm {reply.Term = currentTermreply.VoteGranted = falseprintTime() fmt.Printf("candidate=%d term = %d smaller than server = %d, currentTerm = %d\n", args.CandidateId, args.Term, rf.me, rf.currentTerm)return } if rf.votedFor != -1 && args.Term <= rf.currentTerm {reply.VoteGranted = falserf.mu.Lock()rf.setTerm(max(args.Term, currentTerm))reply.Term = rf.currentTermrf.mu.Unlock()printTime() fmt.Printf("rejected candidate=%d term = %d server = %d, currentTerm = %d, has_voted_for = %d\n", args.CandidateId, args.Term, rf.me, rf.currentTerm, rf.votedFor)} else { rf.mu.Lock()rf.becomeFollower(max(args.Term, currentTerm), args.CandidateId)rf.mu.Unlock()reply.VoteGranted = truefmt.Printf("accepted server = %d voted_for candidate = %d\n", rf.me, args.CandidateId)} }如果當(dāng)前server的term大于candidate的term,或者當(dāng)前server已經(jīng)選舉過其他server為leader了,那么返回拒絕的RPC,否則,則返回成功的RPC,并置自身狀態(tài)為follower。
心跳的RPC的邏輯如下
func (rf *Raft) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) {if args.Term < rf.currentTerm {reply.Success = falsereply.Term = rf.currentTerm} else { reply.Success = truereply.Term = rf.currentTermrf.mu.Lock() rf.currentLeader = args.LeaderIdrf.votedFor = args.LeaderIdrf.state = 0 rf.setMessageTime(milliseconds())printTime() fmt.Printf("server = %d learned that leader = %d\n", rf.me, rf.currentLeader)rf.mu.Unlock()} }如果follower的term大于leader的term,則返回拒絕的RPC,否則,返回成功的RPC。
總結(jié)
以上是生活随笔為你收集整理的用 Go 语言实现 Raft 选主的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Raft 集群成员变更、日志压缩、客户端
- 下一篇: 一致性算法- Paxos