Pre-Vote在Etcd中的实现

摘要:Raft利用Pre-Vote解决了server rejoin过程时造成的Disruptive(不必要的重新选举)问题,本文探究了该算法在Etcd中是如何实现的。

利用Pre-Vote优化Leader Election

Diego Ongaro在其博士论文”Consensus: Bridging Theory And Practice”中描述了以下规则:

server收到term > currentTerm时,设置currentTerm = term并转为follower。

当系统曾出现分区,并在分区消失后,某个被split的Follower的Term数值很大(每次electionTimeout后成为Candidate都会递增Term),该规则能够令其他server更新自己的Term并引发重新选举,从而携带更大的Term以“接受”这个Follower的加入(如果其余server不更新Term,这个被split的Follower由于Term太大拒绝所有消息,也许永远不能被成功加入系统)。然而,被split的Follower很大概率在log上是落后的,那么在重新选举后一定不会成为Leader,由此引入的重新选举反而导致临时的延迟与可用性问题。

解决方式是在Vote阶段前增加一个尝试阶段:pre-Vote阶段,同时加入新状态preCandidate,并在该状态下限制Term自增,只有当server通过pre-Vote流程后(说明有很大可能成为Leader)转为Candidate,才能自增。这样,当candidate在一个electionTimeout后还没有收到heartbeat,就尝试发送preVote请求(请求中模拟下一次递增过的Term,但本身的Term并不递增),如果有多数集合愿意接受请求(接收server不因preVote改变自身状态):

  1. Term > currentTerm
  2. PreCandidate的log是否up-to-date

满足以上条件才能说明当前server可竞争Leader,这才从PreCandidate状态转为Candidate,递增Term并正式参与选举。对于重新加入、并且log落后的server来说,preVote请求得不到多数回复,其Term并不会增加,当前Leader也不会因为他的重新加入带来不必要的重新选举,其自身在收到heartbeat后能够加入系统并catch up Leader。

Pre-Vote实现

Etcd的事件处理方式

Cluster中每个server的Raft状态机都是由node.run()通过for...select方式驱动的,一旦从node内的propc和recvc通道读出消息,就能将该消息传递通过raft.Step(msg)传递给Raft状态机。raft.Step()既处理请求消息,也处理回复消息,是整个Raft状态机的一个主要入口。raft.Step()实现了Rules for All Servers的逻辑,会在处理消息的过程中改变Raft状态机的状态(ie. Leader/{Pre}Candidate/Follower)并根据Raft状态路由到stepLeader()/stepFollower()/stepCandidate()。

另一方面,Timeout的实现,基本思路是利用周期更短的tick信号进行时间采样,达到每次累加elapsed时间戳,超过timeout时间戳即认为timeout。Raft的tick触发是由外部通过node.tickc触发raft.tick()函数实现的(比如外部可能以100ms的周期往node.tickc中写数据),raft.tick是一个函数引用,根据当前Raft的状态,调用不同的函数,对于Follower和(Pre)Candidate,tick指向raft.tickElection(),对于Leader,tick指向raft.tickHeartbeat()

处理流程

当nodeA作为Follower的eletctionTimeout事件触发后,调用Step(Term:0, Type: MsgHup),按照Step状态机流程处理MsgHup事件,调用campaign(campaign{Pre}Election)参与选举(改变Raft状态为{Pre}Candidate,并给所有peers发送{Pre}Vote)。peers的node.recvc收到由nodeA发送的{Pre}Vote消息后调用raft.Step(m),在处理{Pre}Vote的分支中检查请求是否符合要求、Raft Log是否足够新,接着回复Msg{Pre}VoteResp。nodeA的nodec.recv通道收到来自某个peer的Msg{Pre}VoteResp后,通过node.run调用raft.Step(m),Step状态机流程通过调用raft.step处理Msg{Pre}VoteResp,此时nodeA作为{Pre}Candidate调用的是stepCandidate,在myVoteRespType分支中计算当前{Pre}Vote是否达成Quorum,如果PreVote达成Quorum,则调用campaign(campaignElection)正式发送Vote,如果Vote达成Quorum,则选举成功,转为Leader身份。

代码详解

Etcd将Candidate的状态分为两种,PreCandidate和Candidate,区别在于前者发送PreVote后者发送Vote,并且,PreVote不会自增Term,而Vote需要自增Term。

raft.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (r *raft) becomeCandidate() {  
r.step = stepCandidate
r.reset(r.Term + 1) // term递增
r.tick = r.tickElection
r.Vote = r.id // 选举自身
r.state = StateCandidate
}

func (r *raft) becomePreCandidate() {
// 注意:preVote不递增term,但是会在请求中自增term去尝试
r.step = stepCandidate
r.votes = make(map[uint64]bool) // 初始化votes,用来收集回复是否达到Quorum
r.tick = r.tickElection
r.lead = None // 不是正式选举,不设置Vote
r.state = StatePreCandidate
}

Follower/{Pre}Candidate在满足一个ElectionTimeout后,给Raft状态机发了MsgHup消息,准备通过raft.Step()参与{Pre}Vote选举。

raft.go
1
2
3
4
5
6
7
8
func (r *raft) tickElection() {
r.electionElapsed++
// 只要当前节点可以被promote为leader,并且选举时间到期
if r.promotable() && r.pastElectionTimeout() {
r.electionElapsed = 0
r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
}
}

raft.Step()处理Msg{Pre}Vote、Msg{Pre}VoteResp等相关请求与回复的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
func (r *raft) Step(m pb.Message) error {
// 根据消息携带的term变化status状态
switch {
case m.Term == 0:
// local message
case m.Term > r.Term:
...
switch {
case m.Type == pb.MsgPreVote: // preVote请求不需要改状态
case m.Type == pb.MsgPreVoteResp && !m.Reject: // preVote接受回复,在后面判断,如果回复达成Quorum,就可以增加term,正式提出Vote请求
default:
// 根据$3.3 转为follower
if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
r.becomeFollower(m.Term, m.From)
} else {
r.becomeFollower(m.Term, None)
}
}
case m.Term < r.Term:
if (r.checkQuorum || r.preVote) && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {
...
} else if m.Type == pb.MsgPreVote {
// 根据pre-Vote算法,收到term更小的preVote,直接拒绝
r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgPreVoteResp, Reject: true})
} else {
// ignore
}
return nil // term < currentTerm,要么忽略要么拒绝,处理完就退出
}

switch m.Type {
case pb.MsgHup:
if r.state != StateLeader {
ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
...
// 如果有已commit但未apply的ConfChange entry,不能选举leader
if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
return nil
}
if r.preVote {
r.campaign(campaignPreElection) // preVote Option On,发送PreVote参与选举
} else {
r.campaign(campaignElection) // 否则发送Vote参与选举
}
}
case pb.MsgVote, pb.MsgPreVote:
if r.isLearner { return nil } // learner不参与选举
// canVote把Vote和preVote的条件放在了一起,但还不包括update-to-date检验
canVote := r.Vote == m.From || (r.Vote == None && r.lead == None) || // $3.4,对RequestVote的要求
(m.Type == pb.MsgPreVote && m.Term > r.Term) // term大的preVote才能grant
// update to date检验,通过才能grant
if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)}) // grant
// Vote请求成功要更新
if m.Type == pb.MsgVote {
r.electionElapsed = 0
r.Vote = m.From
}
} else {
r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
}
default: // MsgPreVoteResp会走这里
err := r.step(r, m)
if err != nil {
return err
}
}
return nil
}

投票过程campaign实现了向所有peers发送preVote或Vote请求的逻辑,注意投票类型决定了当前Candidate的类型是PreCandidate还是Candidate,前者会发送preVote后者发送Vote,。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// campaign包含了preVote和Vote,根据t的类型选择  
func (r *raft) campaign(t CampaignType) {
var term uint64
var voteMsg pb.MessageType
if t == campaignPreElection {
r.becomePreCandidate() // 不会递增term
voteMsg = pb.MsgPreVote
term = r.Term + 1 // 请求的时候需要模拟递增过的term
} else {
r.becomeCandidate() // 已经递增了term
voteMsg = pb.MsgVote
term = r.Term
}
// 自己给自己投票,r.votes[r.id] = v,并统计r.votes中为true的个数,如果达到Quorum,说明是single-node cluster,直接返回
if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {
if t == campaignPreElection {
r.campaign(campaignElection) // 如果preVote成功,进行Vote
} else {
r.becomeLeader() // Vote成功,转为leader
}
return
}
// 非single-node cluster,给所有的peer发送(pre)Vote
for id := range r.prs {
if id == r.id {
continue // 排除自身
}
// 论文3.10,follower收到old leader发出的TimeoutNow消息后,会调用campaign(campaignTransfer)参与选举(leader transfer)
// (pre)Vote中携带campaignTransfer为了告诉对方自己是target leader
var ctx []byte
if t == campaignTransfer {
ctx = []byte(t)
}
r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
}
}

stepLeader和stepFollower都不会处理MsgPreVoteResp,只有stepCandidate会处理,这里{Pre}Candidate检查当前{Pre}Vote的granted回复是否达成Qurom,达成即可参与正式Vote或直接选举成功。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// candidate有两种状态,普通的Candidate和preCandidate,区别在于前者响应Vote,后者响应preVote 
// invariant: m.Term > r.Term
func stepCandidate(r *raft, m pb.Message) error {
// 根据状态决定Vote的响应类型
var myVoteRespType pb.MessageType
if r.state == StatePreCandidate {
myVoteRespType = pb.MsgPreVoteResp
} else {
myVoteRespType = pb.MsgVoteResp
}
switch m.Type {
case pb.MsgProp:
...
case pb.MsgApp:
...
case pb.MsgHeartbeat:
...
case pb.MsgSnap:
...
case myVoteRespType:
// m.From加入r.votes,并统计目前接受到的vote个数
gr := r.poll(m.From, m.Type, !m.Reject)
switch r.quorum() {
case gr: // 达成Quorum
// 如果是preVote granted,就进行Vote流程
if r.state == StatePreCandidate {
r.campaign(campaignElection)
} else {
// Vote granted,转为leader
r.becomeLeader()
r.bcastAppend()
}
case len(r.votes) - gr: // 收到多数拒绝,直接转follower
r.becomeFollower(r.Term, None) // Term不变,根据后面Candidate的状态会递增Term
}
case pb.MsgTimeoutNow:
...
}
return nil
}

参考资料:
[1] Diego Ongaro. 2014. Consensus: Bridging Theory And Practice.
[2] Etcd source code.