引言
lab2C的实验要求如下
Complete the functions persist() and readPersist() in raft.go by adding code to save and restore persistent state. You will need to encode (or “serialize”) the state as an array of bytes in order to pass it to the Persister. Use the labgob encoder; see the comments in persist() and readPersist(). labgob is like Go’s gob encoder but prints error messages if you try to encode structures with lower-case field names. For now, pass nil as the second argument to persister.Save(). Insert calls to persist() at the points where your implementation changes persistent state. Once you’ve done this, and if the rest of your implementation is correct, you should pass all of the 2C tests.
lab2D的实验要求如下
Implement Snapshot() and the InstallSnapshot RPC, as well as the changes to Raft to support these (e.g, operation with a trimmed log). Your solution is complete when it passes the 2D tests (and all the previous Lab 2 tests).
总体而言, lab2C需要我们实现关键数据的持久化,lab2D需要我们通过快照实现日志的压缩。代码可以在https://github.com/slipegg/MIT6.824中得到。所有代码均通过了1千次的测试。
lab2C 实现
在实验时测试2C时,测试代码将会尝试将某些节点从网络中断开,然后一段时间后再依据这些断开的节点的持久化的信息重新生成一个新的节点并加入到网络中,测试代码将会检测加入这个节点后是否与预期相同。
在初始化节点的时候,会传入一个Persister对象,这个对象充当一个硬盘的角色,用于持久化数据,后续在测试重新生成节点时,就需要传入旧节点的Persister对象,以便新节点能够从硬盘中读取旧节点的数据进行复原。
参考raft论文,我们需要持久化的数据有:
- currentTerm
- votedFor
- log entries
在raft.go中,我们需要实现persist和readPersist函数,用于持久化和读取数据。
// persist saves Raft's persistent state to stable storage,
func (rf *Raft) persist() {
rf.persister.Save(rf.encodeState(), rf.persister.snapshot)
}
func (rf *Raft) encodeState() []byte {
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(rf.currentTerm)
e.Encode(rf.votedFor)
e.Encode(rf.logs)
return w.Bytes()
}
// readPersist restores previously persisted state.
func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 { // bootstrap without any state?
return
}
var currentTerm int
var votedFor int
var logs []LogEntry
r := bytes.NewBuffer(data)
d := labgob.NewDecoder(r)
if d.Decode(¤tTerm) != nil ||
d.Decode(&votedFor) != nil ||
d.Decode(&logs) != nil {
Debug(dError, "S%v failed to read persist", rf.me)
} else {
Debug(dInfo, "S%v read persist successfully", rf.me)
rf.currentTerm = currentTerm
rf.votedFor = votedFor
rf.logs = logs
rf.lastApplied = rf.getFirstIndex()
rf.commitIndex = rf.getFirstIndex()
}
}
然后我们需要在每次修改了持久化数据的地方调用persist函数,然后在初始化节点时调用readPersist函数来读取持久化数据,整体难度不大。
lab2D 实现
在实验时测试2D时,测试代码在接收到apply的命令id为9结尾时,就会调用节点的Snapshot函数进行快照,将日志压缩。代码需要做到在压缩日志后,仍然能够准确地运行。
首先需要完成快照生成的函数,如下所示,每次会传入需要快照到的日志index,以及当这个节点为止的状态机的快照数据,系统保证传入的日志index一定是已经apply过的。由于已经将状态机的内容放入到了snapshot中,所以其实包括index在内的前面的所有日志都可以删除了,但是由于在同步日志信息时,需要上一个日志的term信息,所以我们会单独保留id为index的日志的id和term信息,放在logs的第一位。
func (rf *Raft) Snapshot(index int, snapshot []byte) {
rf.mu.Lock()
defer rf.mu.Unlock()
if index <= rf.getFirstIndex() {
Debug(dSnap, "S%v ignores the snapshot request with end index %v, because the index is not bigger than the first index %v", rf.me, index, rf.getFirstIndex())
return
}
rf.logs = append([]LogEntry{{index, rf.logs[index-rf.getFirstIndex()].Term, nil}}, rf.logs[index-rf.getFirstIndex()+1:]...)
rf.persister.Save(rf.encodeState(), snapshot)
Debug(dSnap, "S%v applies the snapshot with end index %v, now the len(logs)=%v", rf.me, index, len(rf.logs))
}
由于快照的引入,现在logs中的第一个日志可能不再是0了,所以之前代码中所有从logs中依据日志index获取日志的代码都要修改为:rf.logs[index-rf.getFirstIndex()]
。
同时快照的引入还会导致在leader与follower进行日志同步时,需要的同步的日志可能已经没有了,所以这时候需要直接将整个日志发送给对方。
需要发送的快照请求如下:
func (rf *Raft) genInstallSnapshotRequest() *InstallSnapshotRequest {
return &InstallSnapshotRequest{
Term: rf.currentTerm,
LeaderId: rf.me,
LastIncludeIndex: rf.getFirstIndex(),
LastIncludeTerm: rf.logs[0].Term,
Data: rf.persister.ReadSnapshot(),
}
}
follower接收到快照请求后,需要进行如下处理,主要就是检查这个快照有没有过期,是不是真的比自己当前commit的日志还要新,如果是的话,就将自己的日志全部删除,只保留快照中给的最后一个日志,作为logs中的第一个日志,然后再唤起applyCond进行快照的apply。
func (rf *Raft) InstallSnapshot(request *InstallSnapshotRequest, reply *InstallSnapshotReply) {
rf.mu.Lock()
Debug(dSnap, "S%v {term: %v, commitIndex: %v}, received from S%v with InstallSnapshotRequest {%v} ", rf.me, rf.currentTerm, rf.commitIndex, request.LeaderId, request)
defer rf.mu.Unlock()
reply.Term = rf.currentTerm
if request.Term < rf.currentTerm {
return
}
if request.Term > rf.currentTerm {
rf.currentTerm = request.Term
rf.votedFor = -1
rf.persist()
}
rf.changeState(Follower)
if request.LastIncludeIndex <= rf.commitIndex {
return
}
rf.persister.Save(rf.encodeState(), request.Data)
rf.commitIndex = request.LastIncludeIndex
rf.logs = []LogEntry{{request.LastIncludeIndex, request.LastIncludeTerm, nil}} //2D遇到的bug所在
Debug(dSnap, "S%v installs snapshot from S%v, now the commitIndex is %v", rf.me, request.LeaderId, rf.commitIndex)
rf.waitApplySnapshotRequest = *request
rf.applyCond.Signal()
}
如果leader接收到回复表示快照已经更新成功了,那么就更新这个节点的nextIndex和matchIndex。
func (rf *Raft) handleInstallSnapshotReply(peer int, request *InstallSnapshotRequest, reply *InstallSnapshotReply) {
if reply.Term > rf.currentTerm {
rf.changeState(Follower)
rf.currentTerm = reply.Term
rf.votedFor = -1
rf.persist()
Debug(dWarn, "S%v found higher term %v in InstallSnapshotReply %v from S%v, changes to follower", rf.me, reply.Term, reply, peer)
} else {
rf.nextIndex[peer] = request.LastIncludeIndex + 1
rf.matchIndex[peer] = request.LastIncludeIndex
Debug(dLog, "S%v has installed snapshot to S%v, now the S%v's nextIndex is %v", rf.me, peer, peer, rf.nextIndex[peer])
rf.updateCommitIndexForLeader()
}
}
注意为了能够有序地进行快照的apply,对原本的applier函数进行了修改,同时增加了waitApplySnapshotRequest来记录最新需要apply的快照请求。
其主要思想是每次唤起applyCond时,先检查是否有新的快照请求,即waitApplySnapshotRequest的Term是否为-1,如果不为-1,那么就进行快照的apply,快照apply了之后再把waitApplySnapshotRequest的Term设置为-1。如果没有新的快照请求,那么就进行日志的apply。
func (rf *Raft) applier() {
for !rf.killed() {
rf.mu.Lock()
for rf.lastApplied >= rf.commitIndex {
rf.applyCond.Wait()
}
if rf.waitApplySnapshotRequest.Term != -1 {
if rf.lastApplied < rf.waitApplySnapshotRequest.LastIncludeIndex {
rf.mu.Unlock()
rf.applyCh <- ApplyMsg{ //Question: two applyCh update way, how to update orderly?
SnapshotValid: true,
Snapshot: rf.waitApplySnapshotRequest.Data,
SnapshotTerm: rf.waitApplySnapshotRequest.LastIncludeTerm,
SnapshotIndex: rf.waitApplySnapshotRequest.LastIncludeIndex,
}
rf.mu.Lock()
rf.lastApplied = rf.waitApplySnapshotRequest.LastIncludeIndex
Debug(dSnap, "S%v applies snapshot from S%v, now the lastApplied is %v", rf.me, rf.waitApplySnapshotRequest.LeaderId, rf.lastApplied)
}
rf.waitApplySnapshotRequest = InstallSnapshotRequest{Term: -1}
rf.mu.Unlock()
} else {
commitIndex, lastApplied := rf.commitIndex, rf.lastApplied
if rf.getFirstIndex() != 0 && lastApplied+1-rf.getFirstIndex() <= 0 {
Debug(dWarn, "S%v has no log to apply, because lastApplied %v < firstIndex %v", rf.me, lastApplied, rf.getFirstIndex())
rf.mu.Unlock()
continue
}
entries := make([]LogEntry, commitIndex-lastApplied)
Debug(dInfo, "S%v pre to apply log entries. LastApplied: %v, FirstIndex: %v, commitIndex: %v)",
rf.me, lastApplied, rf.getFirstIndex(), commitIndex)
copy(entries, rf.logs[lastApplied+1-rf.getFirstIndex():commitIndex+1-rf.getFirstIndex()])
rf.mu.Unlock()
for _, entry := range entries {
rf.applyCh <- ApplyMsg{
CommandValid: true,
Command: entry.Command,
CommandIndex: entry.Index,
CommandTerm: entry.Term,
}
}
rf.mu.Lock()
Debug(dInfo, "S%v finishes applying log entries(startId: %v, length: %v), now rf.lastApplied = %v",
rf.me, lastApplied+1, len(entries), rf.lastApplied)
rf.lastApplied = commitIndex
rf.mu.Unlock()
}
}
}
问题记录
当时写的时候也感觉不是特别复杂,但是后面测试的时候发现这里还是有很多需要注意的点,容易导致错误。快照的引入导致的一个重要的问题是我们现在有两种方式来更新状态机的数据,一种是通过日志的apply,一种是通过快照的apply。
一开始的写法是在接收到快照请求进行InstallSnapshot的处理的时候新起了一个go协程来直接对快照进行apply,但是这会导致一系列的问题。
一开始我们对这两者的并发做什么限制,那么这就有可能出现下面这种情况:
- follower节点接受到快照同步请求,并且开启一个协程开始进行快照的apply
- 在快照的apply之前,follower节点接收到下一个日志的同步的请求,开始进行日志的apply
这两个apply的顺序其实是不确定的,很有可能就会出现先进行日志的apply,然后再进行快照的apply,这样就会导致状态机的数据不一致,所以需要控制在快照进行apply的时候,不允许进行日志的apply。
然后我采用的方法是控制节点的lastApplied值,即在开启协程进行快照的apply前将lastApplied值设置为-1,然后在快照的apply结束后再将lastApplied设置为快照的index值,然后在日志进行apply的时候,对lastApplied进行判断,如果lastApplied值为-1,那么就进行锁等待,直到lastApplied值不为-1,然后再进行日志的apply。但是这种方法在测试的时候会发现,进行1000次测试大约会有0~3次的可能出现错误,错误的原因是在进行日志的apply的时候,需要apply的日志已经在logs中没有了,导致了取值的错误,也就是并发控制没有成功,在进行了快照的apply后,日志的apply依旧在进行。
经过debug发现这是由于出现了如下这种情况:
- followe节点接收到日志同步的请求,开启一个协程进行日志的apply
- leader节点已经进行了快照,然后由于超时又给该follower节点发送了日志同步的请求
- follower节点接收到快照同步的请求,设置lastApplied为-1,然后开启一个协程进行快照的apply
- follower节点结束了日志的apply,将lastApplied设置为日志的index,然后follower节点继续检查,发现lastApplied不为-1,且lastApplied小于commitIndex,所以继续进行日志的apply,然后在logs中取日志时发现该日志已经没有了,导致错误。
所以通过lastApplied进行并发控制并不可行,最后采用的方法是添加了snapApplyCount变量,每次在进行快照的apply时,将snapApplyCount加1,快照的apply结束后将snapApplyCount减1,然后在进行日志的apply时,如果snapApplyCount不为0,那么就进入锁等待。
注意在完成快照的apply后,有可能节点已经接收到了leader同步来的其他日志,所以需要在结束后检查是否有新的日志需要apply,如果需要就唤起日志的apply。最后处理快照同步请求的代码如上述的InstallSnapshot所示,日志apply的代码如下:
func (rf *Raft) applier() {
for !rf.killed() {
rf.mu.Lock()
for rf.snapApplyCount != 0 || rf.lastApplied >= rf.commitIndex {
rf.applyCond.Wait()
}
commitIndex, lastApplied := rf.commitIndex, rf.lastApplied
if rf.getFirstIndex() != 0 && lastApplied+1-rf.getFirstIndex() <= 0 {
rf.mu.Unlock()
continue
}
entries := make([]LogEntry, commitIndex-lastApplied)
Debug(dInfo, "S%v pre to apply log entries. LastApplied: %v, FirstIndex: %v, commitIndex: %v)",
rf.me, lastApplied, rf.getFirstIndex(), commitIndex)
copy(entries, rf.logs[lastApplied+1-rf.getFirstIndex():commitIndex+1-rf.getFirstIndex()])
rf.mu.Unlock()
for _, entry := range entries {
rf.applyCh <- ApplyMsg{
CommandValid: true,
Command: entry.Command,
CommandIndex: entry.Index,
CommandTerm: entry.Term,
}
}
rf.mu.Lock()
Debug(dInfo, "S%v finishes applying log entries(startId: %v, length: %v), now rf.lastApplied = %v",
rf.me, lastApplied+1, len(entries), rf.lastApplied)
rf.lastApplied = commitIndex
rf.mu.Unlock()
}
}
但是上述方法后面经过测试发现也还是有少量的bug,bug的主要原因在于如下这种情况:
- follower节点接收到最后日志为x的快照同步请求,开启一个协程进行快照的apply
- follower节点又接收到最后日志为x+10的快照同步请求,开启一个协程进行快照的apply
- follower先完成了x+10的快照的apply,然后才完成了x的快照的apply,但是这时候它会将lastApplied设置为x,同时apply的顺序也出现了错误。
纵观上面的问题的一大根源在于我们出现了多个apply的协程,而没有对协程进行很好的并发控制,所以最后采取了上述的发型,将所有的apply都放在一个协程中进行,优先进行快照的apply,进测试可以准确地通过。
实验结果
最终对lab2中所有的测试进行了1000次的测试,全部通过。
总结
整个lab2中感觉难度最大的还是lab2B,因为需要实现的功能比较多,需要多多参考raft论文中的论文,最为印象深刻的就是lab2D中的并发问题了,这种问题确实在一开始实现的时候比较难想到,需要通过实验发现,而这种1000次测试才出现一两次错误的问题就更加难发现了,需要有全面的日志记录和多次重复实验的系统才行,后面有机会也分享一下有关日志记录和重复实验相关的内容。