MIT 6.5840 Lab 3A: Raft - leader election (2025)

Posted on Apr 7, 2025

Preface

This year's Raft lab tests added a new feature: when a test fails, they can emit an HTML file like the one below, which helps you see what went wrong and under which circumstances.

Of course, this alone won't help you debug the problems in your code; on how to build better debugging tools, see this article.

The assignment

Implement Raft leader election and heartbeats (AppendEntries RPCs with no log entries). The goal for Part 3A is for a single leader to be elected, for the leader to remain the leader if there are no failures, and for a new leader to take over if the old leader fails or if packets to/from the old leader are lost. Run go test -run 3A to test your 3A code.

Lab 3A only requires implementing leader election: a leader should be elected within a reasonable time and maintain its leadership, and when the leader fails, a new leader should be elected promptly.

Election timeout

One common source of headaches is managing the election timeout, so let's start there. The election timeout has two meanings: the time a follower waits before becoming a candidate and starting an election, and the time a candidate waits before starting the next round of election.

Waiting for timeouts and resetting the timer

The first question is when we need to wait for a timeout and when we need to reset the timer. From the relevant parts of Figure 2:

All Servers:

  • If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower (§5.1)

Followers (§5.2):

  • If election timeout elapses without receiving AppendEntries RPC from current leader or granting vote to candidate: convert to candidate

Candidates (§5.2):

  • On conversion to candidate, start election:
    • Reset election timer
  • If AppendEntries RPC received from new leader: convert to follower
  • If election timeout elapses: start new election

Although Figure 2 only explicitly mentions resetting the timer when a follower converts to candidate and starts an election, a follower converts to candidate only if the election timeout elapses without receiving the specified messages (an AppendEntries RPC from the current leader, or granting a vote to a candidate). This implies that receiving those messages suppresses the conversion to candidate, so they must reset the timer as well. In addition, any server that encounters a term higher than its own in an RPC must convert to the follower state (including servers that are already followers), which also implicitly requires resetting the timer.

From the above we can conclude:

  • A follower that times out becomes a candidate and starts an election; a candidate whose timeout elapses without becoming leader and without reverting to follower starts the next round of election.
  • Reset the timer when: a follower becomes a candidate and starts an election; a follower grants its vote to a candidate; a follower or candidate receives an AppendEntries RPC from the leader; or any server encounters a higher term and converts to follower.

Management of the election timeout is spread across different goroutines, so it must be protected with locks or channels. My first idea was to use the time.Timer type for the timeout handling and timer resets, but the lab page advises against it, so I initially dropped that direction.

Hint: You’ll need to write code that takes actions periodically or after delays in time. The easiest way to do this is to create a goroutine with a loop that calls time.Sleep(); see the ticker() goroutine that Make() creates for this purpose. Don’t use Go’s time.Timer or time.Ticker, which are difficult to use correctly.

A link on the lab guidance page offers one solution:

The management of the election timeout is a common source of headaches. Perhaps the simplest plan is to maintain a variable in the Raft struct containing the last time at which the peer heard from the leader, and to have the election timeout goroutine periodically check to see whether the time since then is greater than the timeout period. It’s easiest to use time.Sleep() with a small constant argument to drive the periodic checks. Don’t use time.Ticker and time.Timer; they are tricky to use correctly.

After implementing both approaches, I found each has its strengths. What follows is based on the approach above; at the end of this post I give a time.Timer-based implementation.

Choosing the timeout values

Unreasonable timeout values hurt the system's availability. If the election timeout is smaller than the heartbeat interval, the leader will receive RequestVote RPCs from other candidates and revert to follower before it even has a chance to send heartbeats; such a system does nothing but elect leaders over and over and is completely unavailable. The election timeout can't be too large either, or a new leader won't be elected promptly when the current one fails, which also hurts availability. The formula in Section 5.6 of the paper is a useful reference for choosing the timeouts. $$ broadcastTime \ll electionTimeout \ll MTBF $$

The paper uses an election timeout of 150 to 300 milliseconds, which is reasonable when heartbeats are sent every 10 milliseconds or so. But the lab caps heartbeats at ten per second, so the election timeout must be larger; it still can't be too large, though, because a leader must be elected within five seconds.

Hint: The paper’s Section 5.2 mentions election timeouts in the range of 150 to 300 milliseconds. Such a range only makes sense if the leader sends heartbeats considerably more often than once per 150 milliseconds (e.g., once per 10 milliseconds). Because the tester limits you to tens of heartbeats per second, you will have to use an election timeout larger than the paper’s 150 to 300 milliseconds, but not too large, because then you may fail to elect a leader within five seconds.

Because the election timeout is randomized, in extreme cases different candidates may repeatedly draw similar values, triggering endless new elections and preventing a leader from being elected within five seconds. Keeping the upper bound low leaves room for a few more rounds of election, and making the gap between the bounds large reduces the chance of drawing similar values.

const (
	// Because the tester limits you to tens of heartbeats per second, you
	// will have to use an election timeout larger than the paper's 150 to
	// 300 milliseconds, but not too large, because then you may fail to
	// elect a leader within five seconds.
	ELECTIONTIMEOUTLEAST = 700 * time.Millisecond
	ELECTIONTIMEOUTMOST  = 1200 * time.Millisecond
)

For convenience, we define a function that generates a random duration within the configured range.

func getElectionTimeout() time.Duration {
	return ELECTIONTIMEOUTLEAST + (time.Duration(rand.Int63()) % (ELECTIONTIMEOUTMOST - ELECTIONTIMEOUTLEAST))
}

Implementing the timer

Add a field to the Raft struct that records the time at which the timer was started.

type Raft struct {
	...
	startTime time.Time
	...
}

func Make(peers []*labrpc.ClientEnd, me int,
	persister *tester.Persister, applyCh chan raftapi.ApplyMsg) raftapi.Raft {
	rf := &Raft{}
	...
	rf.startTime = time.Now()
	...
	return rf
}

Checking for a timeout means checking whether the difference between the current time and this field exceeds the election timeout.

electionTimeout := getElectionTimeout()
rf.mu.Lock()
shouldBeginElection := time.Since(rf.startTime) > electionTimeout
rf.mu.Unlock()

Resetting the timer just means setting the field to the current time again.

rf.mu.Lock()
rf.startTime = time.Now()
rf.mu.Unlock()

Server lifecycle

After startup a server runs as a follower by default; until it fails, it keeps transitioning among the three states follower, candidate, and leader, being in exactly one of them at any moment. The whole logic can be expressed in the ticker method.

Starting from the follower state: on detecting an election timeout, become a candidate and start an election. If the election fails, either revert to follower (another server became leader) or stay candidate and wait for the next election (no leader emerged this round). If the election succeeds, become leader and send heartbeats to maintain leadership; if the leadership fails, revert to follower and wait for the next election.

func Make(peers []*labrpc.ClientEnd, me int,
	persister *tester.Persister, applyCh chan raftapi.ApplyMsg) raftapi.Raft {
	rf := &Raft{}
	...
	rf.state = FOLLOWER
	...

	// start ticker goroutine to start elections
	go rf.ticker()

	return rf
}
func (rf *Raft) ticker() {
	for rf.killed() == false {

		// Your code here (3A)
		// Check if a leader election should be started.
		electionTimeout := getElectionTimeout()
		rf.mu.Lock()
		shouldBeginElection := time.Since(rf.startTime) > electionTimeout
		rf.mu.Unlock()

		if shouldBeginElection {
			isLeader := rf.beginElection()
			if isLeader {
				rf.sendHeartbeats()
			}
		}

		// pause for a random amount of time between 50 and 350
		// milliseconds.
		ms := 50 + (rand.Int63() % 300)
		time.Sleep(time.Duration(ms) * time.Millisecond)
	}
}

Note that with this sleep-and-poll approach, the effective election timeout is slightly longer than the configured value, but the impact is negligible.

Handling of failed elections and of leader failures is contained in the rf.beginElection() and rf.sendHeartbeats() methods, respectively.

Raft data structures and initialization

Next let's define the fields Lab 3A needs, to make the code that follows easier to write.

const (
	FOLLOWER  = "FOLLOW" // strings are easier to read in debug output; you could also just use iota
	CANDIDATE = "CANDID"
	LEADER    = "LEADER"
)

type LogEntry struct {
	Command interface{}
	Term    int
}

type Raft struct {
	...
	// Persistent state on all servers
	currentTerm int         // latest term server has seen
	votedFor    int         // candidateId that received vote in current term (or -1 if none)
	log         []*LogEntry // log entries

	// Volatile state on all servers
	state           string             // the server's state
	startTime       time.Time          // the time an election timeout starts
	cancelElection  context.CancelFunc // can be used to cancel the election immediately
	cancelHeartbeat context.CancelFunc // can be used to cancel the heartbeat immediately
	...
}

func Make(peers []*labrpc.ClientEnd, me int,
	persister *tester.Persister, applyCh chan raftapi.ApplyMsg) raftapi.Raft {
	rf := &Raft{}
	rf.peers = peers
	rf.persister = persister
	rf.me = me

	// Your initialization code here (3A, 3B, 3C).
	rf.votedFor = -1
	rf.state = FOLLOWER
	rf.startTime = time.Now()

	// initialize from state persisted before a crash
	rf.readPersist(persister.ReadRaftState())

	// start ticker goroutine to start elections
	go rf.ticker()

	return rf
}

A votedFor of -1 means no vote has been cast in the current term. cancelElection and cancelHeartbeat are used to immediately cancel a candidate's election and a leader's heartbeats, respectively, for example when a higher term is discovered and the server must convert to follower right away.

With the definitions above, the GetState method just reads the corresponding fields.

// return currentTerm and whether this server
// believes it is the leader.
func (rf *Raft) GetState() (int, bool) {

	var term int
	var isleader bool
	// Your code here (3A).
	rf.mu.Lock()
	defer rf.mu.Unlock()

	term = rf.currentTerm
	isleader = (rf.state == LEADER)

	return term, isleader
}

RPC

The RequestVote and AppendEntries handlers implement what must be done when other servers call us over RPC; sendRequestVote and sendAppendEntries invoke those RPCs on other servers.

RequestVote

The arguments and results follow Figure 2 directly.

Arguments:

  • term: candidate’s term
  • candidateId: candidate requesting vote
  • lastLogIndex: index of candidate’s last log entry (§5.4)
  • lastLogTerm: term of candidate’s last log entry (§5.4)

Results:

  • term: currentTerm, for candidate to update itself
  • voteGranted: true means candidate received vote

// example RequestVote RPC arguments structure.
// field names must start with capital letters!
type RequestVoteArgs struct {
	// Your data here (3A, 3B).
	Term         int // candidate's term
	CandidateId  int // the candidate who is requesting vote
	LastLogIndex int // index of candidate's last log entry
	LastLogTerm  int // term of candidate's last log entry
}

// example RequestVote RPC reply structure.
// field names must start with capital letters!
type RequestVoteReply struct {
	// Your data here (3A).
	Term        int  // the receiver's currentTerm, for candidate to update itself
	VoteGranted bool // true means candidate received vote
}

Note that the RPC library requires the struct field names of arguments and replies to start with capital letters.

As for implementing the RequestVote method, Figure 2 is fairly terse:

Receiver implementation:

  1. Reply false if term < currentTerm (§5.1)
  2. If votedFor is null or candidateId, and candidate’s log is at least as up-to-date as receiver’s log, grant vote (§5.2, §5.4)
  In other words:
  1. If the candidate's term is smaller than the receiving server's currentTerm, reply false immediately.
  2. If this server has not yet voted in the current term or has already voted for this candidate, and the candidate's log is at least as up-to-date as this server's, grant the vote.

The reason rule 2 grants the vote again to a candidate we already voted for: the server may have crashed right after voting, so the candidate never received the RPC reply and retried the call once the server came back online.

There are some details to watch in practice. For example, when the term in a received RPC is larger than our currentTerm, we must set currentTerm to the RPC's term and switch to the follower state, and we must not forget to reset the timer.

If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower (§5.1)

Using an early-return style, if we have already voted for a server other than this candidate in the current term, return immediately.

We set lastLogIndex and lastLogTerm to -1 to indicate that the log has no entries yet. Comparing whose log is more up-to-date must follow the paper's description exactly:

Raft determines which of two logs is more up-to-date by comparing the index and term of the last entries in the logs. If the logs have last entries with different terms, then the log with the later term is more up-to-date. If the logs end with the same term, then whichever log is longer is more up-to-date.

After granting a vote, the timer must be reset as well.

Before the RPC returns, the state must also be persisted; for now, calling the provided persist method is enough.

// example RequestVote RPC handler.
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	// Your code here (3A, 3B).
	rf.mu.Lock()
	defer rf.mu.Unlock()

	// "If a server receives a request with a stale term number, it rejects the
	// request."
	if rf.currentTerm > args.Term {
		reply.Term = rf.currentTerm
		reply.VoteGranted = false
		return
	}

	// "If one server's current term is smaller than the other's, then it updates
	// its current term to the larger value. If a candidate or leader discovers
	// that its term is out of date, it immediately reverts to follower state."
	if rf.currentTerm < args.Term {
		rf.currentTerm = args.Term
		rf.votedFor = -1
		rf.state = FOLLOWER
		// reset election timer if you discover that your term is out of date
		rf.startTime = time.Now()

		if rf.cancelElection != nil {
			rf.cancelElection()
		}

		if rf.cancelHeartbeat != nil {
			rf.cancelHeartbeat()
		}
	}

	// "Each server will vote for at most one candidate in a given term, on a
	// first-come-first-served basis."
	if rf.votedFor != -1 && rf.votedFor != args.CandidateId {
		reply.Term = rf.currentTerm
		reply.VoteGranted = false
		return
	}

	// "The RPC includes information about the candidate's log, and the voter
	// denies its vote if its own log is more up-to-date than that of the candidate."
	lastLogIndex := len(rf.log) - 1
	var lastLogTerm int
	if lastLogIndex == -1 {
		lastLogTerm = -1
	} else {
		lastLogTerm = rf.log[lastLogIndex].Term
	}
	if lastLogTerm > args.LastLogTerm ||
		(lastLogTerm == args.LastLogTerm && lastLogIndex > args.LastLogIndex) {
		reply.Term = rf.currentTerm
		reply.VoteGranted = false
		return
	}

	reply.Term = rf.currentTerm
	reply.VoteGranted = true
	rf.votedFor = args.CandidateId
	// reset election timer if you grant a vote to another peer
	rf.startTime = time.Now()

	// "Updated on stable storage before responding to RPCs."
	rf.persist()
}

AppendEntries

Since AppendEntries is only used for heartbeats here, the arguments and reply are kept as simple as possible:

type AppendEntriesArgs struct {
	Term         int // leader's term
	LeaderId     int // so follower can redirect clients
	PrevLogIndex int // index of log entry immediately preceding new ones
	PrevLogTerm  int // term of PrevLogIndex entry
}

type AppendEntriesReply struct {
	Term    int  // currentTerm, for leader to update itself
	Success bool // true means the follower accepted the heartbeat
}

The first part of the AppendEntries method is mostly the same as RequestVote; at the end it likewise resets the timer and persists state.

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	// "If a server receives a request with a stale term number, it rejects the
	// request."
	if rf.currentTerm > args.Term {
		reply.Term = rf.currentTerm
		reply.Success = false
		return
	}

	// "If one server's current term is smaller than the other's, then it updates
	// its current term to the larger value. If a candidate or leader discovers
	// that its term is out of date, it immediately reverts to follower state."
	if rf.currentTerm < args.Term {
		rf.currentTerm = args.Term
		rf.votedFor = -1
		rf.state = FOLLOWER
		// reset election timer if you discover that your term is out of date
		rf.startTime = time.Now()
	}

	reply.Term = rf.currentTerm
	reply.Success = true

    // reset election timer if you get an AppendEntries RPC from the current leader
	rf.startTime = time.Now()

	if rf.cancelElection != nil {
		rf.cancelElection()
	}

	if rf.cancelHeartbeat != nil {
		rf.cancelHeartbeat()
	}

	// "Updated on stable storage before responding to RPCs."
	rf.persist()
}

Invoking the RPC on other servers just mimics sendRequestVote:

func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
	ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
	return ok
}

Election and heartbeats

The most complex parts of the Lab 3A implementation are the beginElection and sendHeartbeats methods called from ticker, and since the two share some structure, that structure is worth discussing first.

Both methods launch a number of goroutines and act on their results. Sometimes we can return before all goroutines finish (a majority of votes received, or the heartbeat acknowledged by a majority); sometimes a single goroutine's result forces an immediate return (a higher term discovered); sometimes the RPCs are too slow and we must bail out on a timeout (the election timeout and the heartbeat timeout). We need a structure that handles all of these cases.

The defining requirement is that the main goroutine can tell the child goroutines to exit, and vice versa. I chose Context to implement this, with select multiplexing timeouts, early exits, and result aggregation. The key code:

ctx, cancel := context.WithCancel(context.Background())
countCh := make(chan bool)
for i := range rf.peers {
	if i != rf.me {
		go func(peer int) {
			...
			doneCh := make(chan bool, 1)
			go func() {
				ok := rf.sendRPC(peer, &args, &reply)
				if ok {
					if reply.xxx {
						doneCh <- true
					} else {
						doneCh <- false
					}
				} else {
					doneCh <- false
				}
			}()

			select {
			case <-ctx.Done():
				return
			case result := <-doneCh:
				countCh <- result
				return
			}
		}(i)
	}
}

timeout := time.NewTimer(xxx)
for {
	select {
	case <-timeout.C:
		return
	case <-ctx.Done():
		return
	case result := <-countCh:
		...
	}
}

Note that the code uses a channel with a buffer of 1, and a dedicated goroutine that performs the RPC call and sends the result to that channel. Without the dedicated goroutine, we would have to watch the Context while sending the RPC result, which means performing a select at every send site.

ok := rf.sendRPC(peer, &args, &reply)
if ok {
	select {
	case <-ctx.Done():
	case countCh <- true:
	}
} else {
	select {
	case <-ctx.Done():
	case countCh <- false:
	}
}

The dedicated-goroutine version reads more cleanly, but after the Context is canceled, the dedicated goroutine may keep running for a while even though the outer goroutine on the receiving side of doneCh has already exited; that is why a buffered channel is needed to avoid a goroutine leak.

Election

Besides the early-return conditions above, there is one more during an election: receiving a heartbeat from the leader.

While waiting for votes, a candidate may receive an AppendEntries RPC from another server claiming to be leader. If the leader’s term (included in its RPC) is at least as large as the candidate’s current term, then the candidate recognizes the leader as legitimate and returns to follower state.

This can also be implemented with the Context mentioned above, but to call the cancel function from other goroutines we need an extra field in Raft:

type Raft struct {
	...
	cancelElection context.CancelFunc // can be used to cancel the election immediately
}

Then initialize this field in beginElection:

func (rf *Raft) beginElection() bool {
	rf.mu.Lock()
	...
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	rf.cancelElection = cancel
	...
	rf.mu.Unlock()
	...
}

After receiving a heartbeat, rf.cancelElection() can be called to cancel an in-progress election:

if rf.cancelElection != nil {
	rf.cancelElection()
}

The complete code:

func (rf *Raft) beginElection() bool {
	rf.mu.Lock()
	rf.state = CANDIDATE
	rf.currentTerm++
	rf.votedFor = rf.me
	rf.startTime = time.Now() // reset election timer
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	rf.cancelElection = cancel
	lastLogIndex := len(rf.log) - 1
	var lastLogTerm int
	if lastLogIndex == -1 {
		lastLogTerm = -1
	} else {
		lastLogTerm = rf.log[lastLogIndex].Term
	}
	currentTerm := rf.currentTerm
	rf.mu.Unlock()

	ticket := 1
	count := 0
	voteCh := make(chan bool, len(rf.peers)-1) // buffered so straggler goroutines never block on a send after we return

	for i := range rf.peers {
		if i != rf.me {
			go func(peer int) {
				args := RequestVoteArgs{
					Term:         currentTerm,
					CandidateId:  rf.me,
					LastLogIndex: lastLogIndex,
					LastLogTerm:  lastLogTerm,
				}
				var reply RequestVoteReply

				doneCh := make(chan bool, 1) // use buffered channel to avoid goroutine leak
				go func() {
					ok := rf.sendRequestVote(peer, &args, &reply)
					if ok {
						if reply.Term > args.Term {
							rf.mu.Lock()
							rf.currentTerm = reply.Term
							rf.votedFor = -1 // entering a new term clears the old term's vote
							rf.mu.Unlock()
							cancel()
							return
						}
						if reply.VoteGranted {
							doneCh <- true
						} else {
							doneCh <- false
						}
					} else {
						doneCh <- false
					}
				}()

				select {
				case <-ctx.Done():
					return
				case result := <-doneCh:
					voteCh <- result
					return
				}
			}(i)
		}
	}

	electionTimeout := getElectionTimeout()

	for {
		select {
		case <-ctx.Done():
			rf.mu.Lock()
			rf.state = FOLLOWER
			rf.mu.Unlock()
			return false // find a larger term or receive heartbeats from leader
		case vote := <-voteCh:
			count++
			if vote {
				ticket++
			}
			if ticket > len(rf.peers)/2 {
				rf.mu.Lock()
				rf.state = LEADER
				rf.mu.Unlock()
				return true // win the election
			}
			if count == len(rf.peers)-1 {
				return false // lose the election
			}
		default:
			rf.mu.Lock()
			shouldBeginAnotherElection := time.Since(rf.startTime) > electionTimeout
			rf.mu.Unlock()
			if shouldBeginAnotherElection {
				return false // use `return rf.beginElection()` instead?
			}
			time.Sleep(10 * time.Millisecond) // sleep briefly so the default case doesn't busy-wait
		}
	}
}

Sharing one Context across multiple cases simplifies the code, but it also makes debugging harder, because when case <-ctx.Done() fires you cannot tell whether a larger term was discovered or a heartbeat arrived from the leader. If that matters to you, consider not sharing the Context and using a dedicated channel to notify the main goroutine when a larger term is found.

As for detecting the timeout: because it works by comparing time differences, the logic has to live in the default case; with the time.Timer approach this part would be simpler.

Also, after detecting a timeout the next round of election must start. Here we simply return false; control goes back to the ticker method, which sleeps briefly and then starts the next election. But doesn't recursion feel like the more intuitive choice?

The third possible outcome is that a candidate neither wins nor loses the election: if many followers become candidates at the same time, votes could be split so that no candidate obtains a majority. When this happens, each candidate will time out and start a new election by incrementing its term and initiating another round of RequestVote RPCs.

Heartbeats

heartbeat timeout

It is easy to overlook that heartbeats need a timeout too. An RPC may take a long time from call to return, possibly longer than the election timeout; if the leader just waits for the RPC to return, other servers may have started elections or even elected a new leader while this one is still waiting. So the heartbeat timeout must be smaller than the election timeout, but as large as possible relative to the broadcastTime mentioned earlier, or the elected leader will keep timing out and falling back to the follower state. When RPCs behave normally, we repeat the round at a fixed interval; to satisfy the at-most-ten-heartbeats-per-second requirement, that interval cannot be less than 100 milliseconds (in practice it could be slightly less, since even the fastest RPC call takes some time).

const (
	// Heartbeat RPCs should finish within this time. It cannot be too small,
	// because the leader would then revert to the follower state too easily.
	// It cannot be too large either, because the leader may not convert to
	// follower in time when a network partition leaves it disconnected from
	// the other servers.
	HEARTBEATTIMEOUT = 500 * time.Millisecond
	// The tester requires that the leader send heartbeat RPCs no more than
	// ten times per second.
	HEARTBEATINTERVAL = 100 * time.Millisecond
)

Whenever heartbeats fail and the method returns, the server must convert to follower; we implement this with a defer.

Structurally, the biggest difference from the code above is the outer for loop, since under normal operation heartbeats repeat indefinitely. Another difference is that the timeout detection is implemented with time.Timer.

Finally, because of the select, we need a label to break out of the for loop that wraps it.

func (rf *Raft) sendHeartbeats() {
	defer func() {
		rf.mu.Lock()
		rf.state = FOLLOWER
		rf.mu.Unlock()
	}()

	rf.mu.Lock()
	currentTerm := rf.currentTerm
	rf.mu.Unlock()

	for {
		// buffered so that straggler goroutines never block on a send after
		// this round has finished
		countCh := make(chan bool, len(rf.peers)-1)
		ctx, cancel := context.WithCancel(context.Background())

		for i := range rf.peers {
			if i != rf.me {
				go func(peer int) {
					args := AppendEntriesArgs{
						Term:     currentTerm,
						LeaderId: rf.me,
					}
					var reply AppendEntriesReply

					doneCh := make(chan bool, 1)
					go func() {
						// this may take a long time to finish
						ok := rf.sendAppendEntries(peer, &args, &reply)
						if ok {
							if args.Term < reply.Term {
								rf.mu.Lock()
								rf.currentTerm = reply.Term
								rf.votedFor = -1 // entering a new term clears the old term's vote
								rf.mu.Unlock()
								cancel()
							} else {
								doneCh <- true
							}
						} else {
							doneCh <- false
						}
					}()

					select {
					case <-ctx.Done():
						return
					case result := <-doneCh:
						countCh <- result
					}
				}(i)
			}
		}

		success := 0
		count := 0
		// each round of heartbeats should finish within this amount of time
		timeout := time.NewTimer(HEARTBEATTIMEOUT)
	OuterLoop:
		for {
			select {
			case <-timeout.C:
				cancel()
				return
			case <-ctx.Done():
				timeout.Stop()
				return
			case result := <-countCh:
				count++
				if result {
					success++
				}
				if count == len(rf.peers)-1 {
					if success < len(rf.peers)/2 {
						cancel()
						timeout.Stop()
						return
					}
					break OuterLoop
				}
			}
		}
		// release this round's resources explicitly rather than with defer:
		// defers inside the loop would pile up for as long as we stay leader
		cancel()
		timeout.Stop()
		time.Sleep(HEARTBEATINTERVAL)
	}
}

Postscript

The time.Timer-based code is structured roughly like this:

const (
	ELECTIONTIMEOUTLEAST = 500 * time.Millisecond
	ELECTIONTIMEOUTMOST  = 1000 * time.Millisecond
)

type Raft struct {
	...
	electionTimeout *time.Timer
}

func getElectionTimeout() time.Duration {
	return ELECTIONTIMEOUTLEAST + (time.Duration(rand.Int63()) % (ELECTIONTIMEOUTMOST - ELECTIONTIMEOUTLEAST))
}

func Make(peers []*labrpc.ClientEnd, me int,
	persister *tester.Persister, applyCh chan raftapi.ApplyMsg) raftapi.Raft {
	rf := &Raft{}
	...
	rf.electionTimeout = time.NewTimer(getElectionTimeout())
	...
	go rf.ticker()
	return rf
}

func (rf *Raft) ticker() {
	for rf.killed() == false {
		<-rf.electionTimeout.C
		isLeader := rf.beginElection()
		if isLeader {
			rf.sendHeartbeats()
		}
	}
}

func (rf *Raft) resetElectionTimeout() {
	rf.electionTimeout.Reset(getElectionTimeout())
}

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	...
	rf.resetElectionTimeout()
	...
}

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	...
	rf.resetElectionTimeout()
	...
}

func (rf *Raft) beginElection() bool {
	...
	rf.resetElectionTimeout()
	...
	select {
	case <-rf.electionTimeout.C:
		rf.electionTimeout.Reset(0) // important: otherwise <-rf.electionTimeout.C in ticker would block forever
	return false
	...
	}
}