「实验记录」MIT 6.824 Raft Lab2B Log Replication

#Lab2B - Log Replication

  • I. Source
  • II. My Code
  • III. Motivation
  • IV. Solution
    • S1 - leader上任即初始化
    • S2 - leader发送AppendEntries
    • S3 - follower接收AppendEntries
    • S4 - leader收到AppendEntries 回信
    • S5 - candidate选举限制
    • S6 - defs.go约定俗成和实现Start()
  • V. Result

I. Source

  1. MIT-6.824 2020 课程官网
  2. Lab2: Raft 实验主页
  3. simviso 精品付费翻译 MIT 6.824 课程
  4. Paper - Raft extended version

II. My Code

  1. source code 的 Gitee 地址
  2. Lab2B: Log Replication 的 Gitee 地址

课程官网提供的 Lab 代码下载地址,我没有访问成功,于是我从 Github 其他用户那里 clone 到干净的源码,有需要可以访问我的 Gitee 获取

III. Motivation

提出 Raft 的主要目的,是为了解决容错问题,即使集群中有一些机器发生了故障,也不影响整体的运作(对外提供的服务)

我用一个 demo 来说明,假设我们的需求一直都是自己的 PC 能够顺利访问云端的资源(HTTP 或数据库)服务器。在服务器稳定在线的情况下,我们去访问它,一点问题都没有

但是,如果那唯一的一台服务器掉线了,那么我们将无法再访问,即对外的服务到此停止。这是我们无法忍受的,我们希望提供服务的一方能够保持稳定,时时刻刻为我提供访问服务。这就是我们的需求

好,现在问题摆在眼前,提供服务的一方怎样保证稳定性?让唯一的那台服务器永远维持稳定的状态,不允许宕机?这非常地不现实,就好比让一个人练成金刚不坏之身

所以,我们只能琢磨是否可以通过添加服务器的数量来确保对外服务的稳定。更近一步,即是现在服务器不再只有一台,扩充到 3 台,这 3 台中有一台是 primary 服务器,也主要由它对外提供服务;其他 2 台是 secondary 服务器(后备力量),拥有和 primary 服务器相同的数据内容

在 primary 服务器出现故障的时候,secondary 服务器顶上去,替代它的位置。这样就可以保持稳定的对外服务了

这就是我们应对资源服务器崩溃的最常用最有效的法子,但是想实现这个想法,首先要解决数据同步的问题,即如何确保 secondary 服务器拥有和 primary 服务器同样的内容?

这个同步问题,在学术上被称为共识算法,最经典的共识算法是 Paxos,但是它太难理解了。于是,斯坦福那帮人想出了更为简便的共识算法,即 Raft

通过 Raft 算法就可以同步集群中服务器的内容。要实现该算法,分三步走,5 - The Raft consensus algorithm 章节中的 Leader Election、Log Replication 和 Safety

本文主要针对第二步,Lab2B: Log Replication 展开讲解,如有 Lab2A: Leader Election 的需要,请移步

IV. Solution

在选举出 leader 之后,集群就可以正式开始对外提供服务了。不论什么类型的服务(数据库的 CRUD…),归根结底都可以变成读写操作 Write/Read。在系统软件级别都是用 log 来记录操作,将服务分解成一条条指令,然后交给自己的状态机执行。状态机之间只要确保 log 已相同顺序写入,那么就可以确保彼此的状态是相同的

log 的作用我可能一言两语概括不完,好多东西都需要自己悟的,可以多去看看 2 - Replicated state machines 中关于状态机和 log 的描述

好,回归正题。Lab2B: Log Replication 就是解决日志同步的问题,即如何确保集群中服务器拥有同样的数据

首先,让我们看一下 Raft 节点的工作流程

S1 - leader上任即初始化

在 Lab2A: Leader Election 中我们有详细讲解到 Raft 节点的 S1 - 角色转换 的规则,在本文中将不再赘述。当 candidate 获得过半选票后成为 leader,它第一件事就是要向集群中广播 AppendEntries 心跳包,即告诉所有人,它是 leader

在此之前还需要进行一些初始化的工作,尤其是 nextIdxs[]matchIdxs[] ,前者记录了下一次应该从日志的何处开始向第 i i i 位 follower 发送 AppendEntries ;而后者是 leader 自己的私密小抄本,用来记录第 i i i 位 follower 已经复制日志到何处了。且看一下 raft.go:run() 中新加的部分,

func (rf *Raft) run() {
	for !rf.killed() {
		switch rf.role {
		case Follower:
			select {
			case <-rf.grantVoteCh:
			case <-rf.heartBeatCh:
			case <-time.After(randElectionTimeOut()):
				rf.role = Candidate
			}
			break
		case Candidate:
			rf.mu.Lock()
			rf.curTerm++
			rf.votedFor = rf.me
			rf.voteCount = 1
			rf.mu.Unlock()

			go rf.boatcastRV()
			select {
			case <-time.After(randElectionTimeOut()):
			case id := <-rf.heartBeatCh:
				/* 一定是收到了来自集群中同期 OR 任期比它大的 leader 的心跳包 */
				rf.mu.Lock()
				rf.role = Follower
				rf.votedFor = id.int /* 被动回滚至 follower */
				rf.mu.Unlock()
			case <-rf.leaderCh:
				/* 赢得了选举 */
				rf.mu.Lock()
				rf.role = Leader
				/*------------Lab2B Log Replication----------------*/
				rf.nextIdxs = make([]int, len(rf.peers))
				rf.matchIdxs = make([]int, len(rf.peers))
				for i := range rf.peers {
					rf.nextIdxs[i] = rf.lastLogIdx() + 1
					rf.matchIdxs[i] = 0
				}
				rf.mu.Unlock()
			}
			break
		case Leader:
			rf.boatcastAE()
			time.Sleep(fixedHeartBeatTimeOut())
			break
		}
		time.Sleep(10 * time.Millisecond)
	}
}

第 32 行之后就是初始化的相关工作,nextIdxs[] 赋值为最后一条目的下一个位置,为什么是下一个位置呢?因为待会 client 会调用 Start() 向日志中追加一新条目;matchIdxs[] 就默认为 0 即可,因为之后的每次 AppendEntries 都会一并更新 nextIdxs[]matchIdxs[] ,而且 matchIdxs[] 更多涉及的是 commit 日志相关的流程,与发送 AppendEntries 关系不大

另外,第 47 行通过休眠 10 ms 来暂缓了 Raft 节点的事件循环,我一开始并没有意识到,后来看到了 Lab2: Raft 实验主页 的提示才添加了这句话,原话是这样的,

Your code may have loops that repeatedly check for certain events. Don’t have these loops execute continuously without pausing, since that will slow your implementation enough that it fails tests. Use Go’s condition variables, or insert a time.Sleep(10 * time.Millisecond) in each loop iteration.

我翻译一下,即是不要让事件循环一直连续执行,因为这会减慢实现速度,导致测试失败。使用 Go 的条件变量 OR 选择在每个循环迭代中休眠 10 ms

这里要吐槽一下,虽然 golang 的并发模型很好用,但是还是没有达到特别精准的程度,其中还是有很多坑的

Raft 节点也添加了一些字段,其中的 commitCh 用来感知自己是否应该提交日志条目了,appliedIdxcommitIdx 是一对,可以理解为日志的固定窗口,

//
// A Go object implementing a single Raft peer.
//
type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	// Your data here (2A, 2B, 2C).
	// Look at the paper's Figure 2 for a description of what
	// state a Raft server must maintain.
	/*------------Lab2A Leader Election----------------*/
	role      Role /* 三个角色 */
	voteCount int  /* 选票数 */
	curTerm   int  /* 任期号 */
	votedFor  int  /* 投给谁了 */

	grantVoteCh chan struct{}      /* 是否收到了拉票请求 */
	leaderCh    chan struct{}      /* 是否赢得了选举 */
	heartBeatCh chan struct{ int } /* 感知 leader 发来的心跳包 */

	/*------------Lab2B Log Replication----------------*/
	log        []LogEntry    /* 日志序列 */
	commitIdx  int           /* 最新一次提交的日志条目编号 */
	appliedIdx int           /* 最新一次回应 client 的日志条目编号 */
	nextIdxs   []int         /* 明确下一次应该发送给 followers 哪条日志条目 */
	matchIdxs  []int         /* 统计已提交该条目的 followers 个数 */
	commitCh   chan struct{} /* 感知是否应该提交日志条目 */
	applyCh    chan ApplyMsg /* 应用于状态机后需要告知 clients */
}

前者的下一个位置是窗口的起始位置,后者是窗口的末尾;applyCh 是对外的接口,Raft 节点将已经应用于状态机的条目返回给 client。需要注意,applyCh 是同步 channel,MIT 6.824 Lab2 QA 中提醒不可以在对其写入的过程中上锁,这样有可能导致死锁。讲到这里,就可以扯出 commit 的相关操作了,且看代码,

func (rf *Raft) commit() {
	for !rf.killed() {
		select {
		case <-rf.commitCh:
			rf.mu.Lock()
			for i := rf.appliedIdx + 1; i <= rf.commitIdx; i++ {
				msg := ApplyMsg{CommandIndex: i, CommandValid: true, Command: rf.log[i].Cmd}
				rf.mu.Unlock()
				rf.applyCh <- msg
				rf.mu.Lock()
				rf.appliedIdx = i
			}
			rf.mu.Unlock()
		}
	}
}

raft.go:Make() 中通过 go rf.commit() 开启提交协程,该协程一直在线,一直盯着 commitCh ,感知自己是否有新的日志条目可以提交。第 6 行即是固定窗口的手法,在第 8 行放锁确保写入 applyCh 不会阻塞

S2 - leader发送AppendEntries

leader 在 raft.go:run() 中会调用 raft.go:boatcastAE() 向集群广播 AppendEntries 心跳包,且看具体实现,

func (rf *Raft) boatcastAE() {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	/* 所有 peers 应该收到相同的 AE 包 */
	for i, _ := range rf.peers {
		if i != rf.me && rf.role == Leader {
			go func(id int) {
				args := AppendEntriesArgs{
					Term:     rf.curTerm,
					LeaderId: rf.me,
				}
	
				/*------------Lab2B Log Replication----------------*/
				args.LeaderCommit = rf.commitIdx
				if rf.nextIdxs[id]-1 <= rf.lastLogIdx() { /* backup test 不加以限制可能会越界 */
					args.PrevLogIdx = rf.nextIdxs[id] - 1
				} else {
					args.PrevLogIdx = rf.lastLogIdx() /* 意味着发送不含日志条目的心跳包 */
				}
				args.PrevLogTerm = rf.log[args.PrevLogIdx].Term
	
				if rf.nextIdxs[id] <= rf.lastLogIdx() { /* rejoin test 不加以限制可能会越界 */
					args.Entries = make([]LogEntry, len(rf.log[rf.nextIdxs[id]:]))
					copy(args.Entries, rf.log[rf.nextIdxs[id]:])
				}
	
				reply := AppendEntriesReply{}
				rf.sendAppendEntries(id, &args, &reply)
			}(i)
		}
	}
}

要确保发送给每位 follower 的心跳包其背后依据的日志是一样,只有日志相同,才能保证数据的一致性,即心跳包的内容是同等程度新的。这就需要通过上锁来保证

第 16 行和第 23 行增加了越界判断,因为 nextIdx 有时候可能会超出日志的范围,我们需要对其加以限制,不然在访问条目任期和内容时会出现错误。另外,如果越界了,就将 PrevLogIdx 拉回日志的最后一条目,并向该 follower 发送不含日志条目的心跳包

同样在 appendEntries.go:appendEntries() 中也有对应的应对手段,即是检查发过来的 AE 包是否是空包,将在 S3 - follower接收AppendEntries 中展开叙述

PrevLogIdx 设置为下一次发送的条目的前一个位置,即 rf.nextIdxs[id]-1PrevLogTerm 是对应位置上的任期;特别需要注意,不能直接将 rf.log[rf.nextIdxs[id]:] 赋值给 args.Entries ,这样做是浅拷贝,在并发的环境下会带来致命的错误。这里需要重新创建副本,调用 make()copy() 进行日志条目的深拷贝

S3 - follower接收AppendEntries

当收到 AppendEntries 之后,会做一些常规的判断,例如发送方的任期是否较新,对应论文图 2 中 AppendEntries RPC 的 Receiver 的第一条,

没我新,则直接拒绝。且看一下 appendEntries.go:AppendEntries() 代码,有点长,

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	reply.Success = false
	reply.Term = rf.curTerm
	/*----backup 未优化的手段----*/
	//reply.NextIdx = rf.lastLogIdx() + 1

	if args.Term < rf.curTerm {
		return
	}

	/* 心跳包只对 follower 和 candidate 管用,leader 是不会响应它的 */
	rf.heartBeatCh <- struct{}{}
	/* 主要为了让旧 leader 收到了新 leader 的心跳包后而被迫退位 */
	if args.Term > rf.curTerm {
		rf.curTerm = args.Term
		rf.role = Follower
		rf.votedFor = NoBody
	}

	/*------------Lab2B Log Replication----------------*/
  	/* 已提交的日志条目不允许覆盖 */
	if args.PrevLogIdx < rf.commitIdx {
		reply.XTerm = XTermCommitted
		reply.XIdx = rf.commitIdx + 1
		return
	}
  
	if rf.lastLogIdx() < args.PrevLogIdx { /* 违法下标,越界了 */
		reply.XTerm = -1
		reply.XLen = len(rf.log)
		return
	}

	/* 相同 index 但 term 不同 */
	if rf.log[args.PrevLogIdx].Term != args.PrevLogTerm {
		/*----backup 未优化的手段----*/
		/* 删掉冲突点及其之后的所有条目 */
		//rf.log = rf.log[:args.PrevLogIdx]
		//reply.NextIdx = rf.lastLogIdx() + 1

		reply.XTerm = rf.log[args.PrevLogIdx].Term
		reply.XIdx = args.PrevLogIdx
		for i := args.PrevLogIdx; i >= 0; i-- { /* 从当前位置开始向前扫 */
			/* 定位到 XTerm 的第一个日志条目 */
			if rf.log[i].Term != reply.XTerm {
				reply.XIdx = i + 1
				break
			}
		}
		return
	}

	reply.Success = true

  	if args.Entries == nil {
		goto COMMITL
	}
  
  	/* leader 发来的 AE 包中含有日志条目 */
	//for i, entry := range args.Entries { /* 寻找重叠部分的冲突点 */
	//	index := args.PrevLogIdx + i + 1
	//	if index > rf.lastLogIdx() {
	//		rf.log = append(rf.log, entry)
	//	} else {
	//		/* 冲突点,相同 index 不同 term */
	//		if rf.log[index].Term != entry.Term {
	//			rf.log = rf.log[:index]
	//			rf.log = append(rf.log, entry)
	//		}
	//		/* 相同 index 处的日志条目的 term 相同,说明存储的命令是相同的 AND 之前的所有日志也是相同的 */
	//	}
	//}

	/* 找到了结合点 BUT 只能在其后 Append-Only 日志条目 */
	rf.log = rf.log[:args.PrevLogIdx+1]
	rf.log = append(rf.log, args.Entries...)

COMMITL:
	/* follower 通过 LeaderCommit 来感知日志条目已提交到何处 AND 始终应该比 leader 慢一点 */
	if args.LeaderCommit > rf.commitIdx { /* 对应论文 Figure 2 中的 AppendEntries Receiver 的第 5 条 */
		rf.commitIdx = args.LeaderCommit
		if args.LeaderCommit > rf.lastLogIdx() {
			rf.commitIdx = rf.lastLogIdx()
		}
		rf.commitCh <- struct{}{} /* 告知主线程,现在可以提交了 因为 leader 都提交了,follower 跟着交就行了 */
	}
}

起初和 Lab2A: Leader Election 一样,主要为了让旧 leader 收到了新 leader 的心跳包后而被迫退位,这个部分并没有做修整;之后就都是新增内容了,这部分主要就是首先寻找冲突点,然后,删除其后的条目,接着再强制复制来自 leader 的条目,对应论文图 2 中 AppendEntries RPC 的 Receiver 的第三条和第四条。在代码中对应,

/* leader 发来的 AE 包中含有日志条目 */
for i, entry := range args.Entries { /* 寻找重叠部分的冲突点 */
  	index := args.PrevLogIdx + i + 1
	if index > rf.lastLogIdx() {
    	rf.log = append(rf.log, entry)
 	} else {
	    /* 冲突点,相同 index 不同 term */
	    if rf.log[index].Term != entry.Term {
	      rf.log = rf.log[:index]
	      rf.log = append(rf.log, entry)
	    }
	    /* 相同 index 处的日志条目的 term 相同,说明存储的命令是相同的 AND 之前的所有日志也是相同的 */
  	}
}

通过遍历 args.Entries 与 Raft 节点的日志进行比对,如果相同位置处的条目任期不同,则认为该点为冲突点;解决冲突点问题,即是将冲突点之后的条目通通删除,然后再将 args.Entries 中冲突点之后的条目复制到 Raft 节点日志中

另外,如果 leader 发来的是空包,即只有心跳,可以直接跳转到 commit 环节,

if args.Entries == nil {
	goto COMMITL
}

这里,我采用 goto 跳转是因为我不想将寻找冲突点的代码嵌入在 if 里,我认为 if 里面套着 if 或 for 的代码是非常丑陋的,除非万不得已,所以请别这样做

其中,需要记住两条原则性的常识,

  1. 如果两条日志的相同索引位置存储的条目的任期是相同的,那么可以认定它们存储的命令也是相同的;
  2. 如果两条日志的相同索引位置存储的条目的任期是相同的,那么在此之前的所有条目也都是相同的

论文中说是这么说的,但是它说得很隐晦,很绕,并没有教我们怎么做。我这样写,也是参考大多数人的做法。但是,我自己的想法是与其一个一个的比较,不如不找冲突点了,在 PrevLogTermrf.log[PrevLogIdx].Term 对上的情况下,直接将所有 args.Entries 复制到日志中,

/* 找到了结合点 BUT 只能在其后 Append-Only 日志条目 */
rf.log = rf.log[:args.PrevLogIdx+1]
rf.log = append(rf.log, args.Entries...)

替换冲突点解决方案,我认为是可行的,而且测试结果也是相同的

还需要注意的地方,即是 follower 中已经被提交的日志条目不能被 leader 覆盖,体现在代码中即是,

/* 已提交的日志条目不允许覆盖 */
if args.PrevLogIdx < rf.commitIdx {
  	reply.XTerm = XTermCommitted
  	reply.XIdx = rf.commitIdx + 1
  	return
}

最后,进行 commit,对应论文图 2 中 AppendEntries RPC 的 Receiver 的第五条,选择较小的值更新自己的 commitIdx 。在这套流程中,follower 需要及时提交已复制的条目,具体是通过 leader 发来的下一次的心跳包来判断自己是否应该提交上一次复制的条目。对应在代码中,

if args.LeaderCommit > rf.commitIdx { /* 对应论文 Figure 2 中的 AppendEntries Receiver 的第 5 条 */
	rf.commitIdx = args.LeaderCommit
	if args.LeaderCommit > rf.lastLogIdx() {
		rf.commitIdx = rf.lastLogIdx()
	}
	rf.commitCh <- struct{}{} /* 告知主线程,现在可以提交了 因为 leader 都提交了,follower 跟着交就行了 */
}

查看 leader 的提交进度,如果 leader 已经回应了 client(提交条目并应用到状态机),那么 follower 要赶紧提交上一次复制的条目。总结下来,follower 的提交进度应该比 leader 慢一拍

其中,用来判断 leader 发来的 PrevLogIdx 是否越界了,如果超出了日志的范围,那么就直接返回同步失败的讯息,

if rf.lastLogIdx() < args.PrevLogIdx {
	return
}

告诉 leader,将下次再发送前一条日志试一试,看看是否能匹配上,对应在 appendEntries.go:sendAppendEntries()rf.nextIdxs[i]--

这里我采用了 backup 优化策略,稍后再提及。目前,我们先按照论文中所介绍的方法,逐一递减 nextIdx

如果遇到相同位置但任期不同的情况,可将该点视为冲突点,采用对待冲突的方法,即是将冲突点之后的条目通通删掉,然后再复制上传过来的条目,

if rf.log[args.PrevLogIdx].Term != args.PrevLogTerm {
	/*----backup 未优化的手段----*/
	/* 删掉冲突点及其之后的所有条目 */
	rf.log = rf.log[:args.PrevLogIdx]
	reply.NextIdx = rf.lastLogIdx() + 1
	return
}

这样,一整套将 follower 接收到 AppendEntries 的流程跑了下来。从理论上是对的,但是从实践的角度出发,达成共识(日志复制)的速度还是不够快,尤其是在 backup 测试中

于是,我们想办法是否能将冲突点 nextIdx 递减的速度加快,让 leader 选择下一次发送时更为准确的条目,而不是通过递减的手段进行一一试错来判断该条目是否合适

课程中也提到了优化的手法,先看一下 AppendEntries RPC 的结构体定义,

type AppendEntriesArgs struct {
	Term         int
	LeaderId     int
	PrevLogTerm  int
	PrevLogIdx   int
	Entries      []LogEntry
	LeaderCommit int
}

type AppendEntriesReply struct {
	Term    int
	Success bool
	//NextIdx int
	XTerm int /* 冲突点条目的任期 */
	XIdx  int /* XTerm 期间的第一条目位置 */
	XLen  int /* follower 所拥有的日志的长度 */
}

其中的 Args 是致敬原文图 2 中的结构,并未做删改添加;之后的 Reply 添加了 XTermXIdxXLen 三个字段,分别记录 follower 日志中冲突点的任期、该任期内的第一条目的位置以及 follower 此时的日志长度。三者的作用且许我慢慢道来

原先如果 PrevLogIdx 越界了,我们会告诉 leader:下一次递减一位再发来;现在优化过后,

if rf.lastLogIdx() < args.PrevLogIdx { /* 违法下标,越界了 */
  reply.XTerm = XTermOutRange
  reply.XLen = len(rf.log)
  return
}

告诉 leader:我当前日志的长度,即可;原先如果 PrevLogIdx 处有冲突,则采取直接删除的策略,现在优化成,

/* 相同 index 但 term 不同 */
if rf.log[args.PrevLogIdx].Term != args.PrevLogTerm {
  /*----backup 未优化的手段----*/
  /* 删掉冲突点及其之后的所有条目 */
  //rf.log = rf.log[:args.PrevLogIdx]
  //reply.NextIdx = rf.lastLogIdx() + 1

  reply.XTerm = rf.log[args.PrevLogIdx].Term
  reply.XIdx = args.PrevLogIdx
  for i := args.PrevLogIdx; i >= 0; i-- { /* 从当前位置开始向前扫 */
    /* 定位到 XTerm 的第一个日志条目 */
    if rf.log[i].Term != reply.XTerm {
      reply.XIdx = i + 1
      break
    }
  }
  return
}

不采用一股脑的删除策略了,而是去 follower 日志中寻找冲突点任期内的第一条目并记下其下标。我采用较为暴力的法子,即是从当前位置开始向前扫,直到找到一条不为当前任期的条目。这里,也可以采用二分查找法,我懒得去实现了,读者可以试一下。为什么可以采用二分查找法?因为日志是有顺序的,任期从小到大,符合二分查找的条件

Raft 算法采用多数选票机制来确保安全性,candidate 必须拿到过半选票才能当选为 leader,而拿到过半选票的前提条件是 candidate 的日志要足够新;虽然 leader 可以强制覆盖 follower 的日志,但是新当选的 leader 一定是集群中还在线的机器中最新的那一个。所以就不存在已提交的日志条目被后来的 leader 覆盖的情况

以上就是 follower 在接收到 AppendEntries RPC 后该有的反应

S4 - leader收到AppendEntries 回信

在代码中对应的是 appendEntries.go:sendAppendEntries() ,我们先看一下代码吧,

func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
	ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)

	rf.mu.Lock()
	defer rf.mu.Unlock()

	if !ok {
		return ok
	}

	term := rf.curTerm
	/* 自身过期的情况下,不需要在维护 nextIdx 了 */
	if rf.role != Leader || args.Term != term {
		return ok
	}

	/* 仅仅是被动退位,不涉及到需要投票给谁 */
	if reply.Term > term {
		rf.curTerm = reply.Term
		rf.role = Follower /* 主动回滚至 follower */
		rf.votedFor = NoBody
		return ok
	}

	/*------------Lab2B Log Replication----------------*/
	if reply.Success {
		//rf.nextIdxs[server] += len(args.Entries)
		rf.nextIdxs[server] = args.PrevLogIdx + len(args.Entries) + 1
		/* 目前第 i 号 follower 的日志已复制到 nextIdx-1 处  */
		rf.matchIdxs[server] = rf.nextIdxs[server] - 1

		/* 对应论文 Figure 2 的 Rules for Servers 的 Leaders 最后一条 */
		N := rf.commitIdx
		for i := N + 1; i <= rf.lastLogIdx(); i++ {
			count := 1
			for id := range rf.peers {
				/* leader 只会统计自己任期内所追加的日志条目,这样设计划分了新旧 leader 之间的职责,即每一任的 leader 顾好自己任期内的事 */
				if id != rf.me && rf.matchIdxs[id] >= i && rf.log[i].Term == rf.curTerm {
					count++
				}
			}

			if count > len(rf.peers)/2 { /* 大多数 followers 已经复制了第 i 号之前(包括)的日志条目 */
				N = i
				break
			}
		}

		if N > rf.commitIdx {
			rf.commitIdx = N
			/* 在发送 AE 包之前,要将上次那些已经被大多数 followers 复制的条目提交上去 */
			rf.commitCh <- struct{}{}
		}
	} else {
		/*----backup 未优化的手段----*/
		//rf.nextIdxs[server] = reply.NextIdx

		if reply.XTerm == XTermOutRange { /* follower 告知上次发来的 AE 包内的日志条目已超范围 */
			rf.nextIdxs[server] = reply.XLen
		} else if reply.XTerm == XTermCommitted { /* 上次发来的 AE 包欲覆盖已提交的日志条目 */
			rf.nextIdxs[server] = reply.XIdx
		} else {
			termNotExist := true

			for i := rf.lastLogIdx(); i >= 1; i-- { /* 从后往前扫,寻找 XTerm 的最后一个条目 */
				if rf.log[i].Term == reply.XTerm {
					termNotExist = false
					rf.nextIdxs[server] = i
					break
				}

				if rf.log[i].Term < reply.XTerm {
					break
				}
			}

			if termNotExist {
				rf.nextIdxs[server] = reply.XIdx
			}
		}
	}

	return ok
}

reply.success 部分是 Lab2B: Log Replication 新增的内容,如果 follower 成功复制了发过去的日志条目,则更新 nextIdxmatchIdx ,S1 - leader 上任初始化 中已经讲述了两者的作用

之后就是 commit 环节,要严格遵照论文图 2 中 Rules for Servers 的 Leaders 最后一条,其大意是统计一下有多少 followers 已成功复制了当前任期内发送的日志条目

如果过半 followers 已成功复制,leader 则顺理成章 commit 条目,应用状态机以及回应 client,这一套一气呵成

另外,在论文 5.4.2 - Committing entries from previous terms 提到 leader 应该只统计自己任期内所追加的日志条目。这样设计划分了新旧 leader 之间的职责,即每一任的 leader 顾好自己任期内的事。当前任期内的日志条目成功 commit,从而间接保证了之前任期的条目也顺利 commit 了

这个道理需要慢慢想,我也是琢磨了很久,现在也不是很透彻。但是它就是这么规定的,我们在学习中会经常遇到一些自己当时不太能理解的问题,其实与其去刨根问底 OR 纠缠不清,不如记住这个道理,使用即可,不必知道为什么。待自己境界到了,自然也就可以悟出其中的道理了

有个非常重要的规则,即是已 commit 的日志条目不可以被覆盖!!!已 commit 意味着 leader 已将该条目应用至状态机,并积极回应了 client。试想,如果被覆盖了,那么 client 可能会在同一个位置看到多份不同的条目,这是彻头彻尾的错误

接下来,要重点讲讲同步失败的情况,S3 - follower 接收 AppendEntries 提到论文中是采用 nextIdx 递减的策略来一一试错,速度很慢

我们采用了 XTerm 的方法记录冲突点的相关讯息,leader 收到回信后,如果发现 follower 的日志较短,远远没到达 PrevLogIdx 所在的位置,那么就会将 nextIdx 置为 follower 日志的长度,即日志的最后一条目的下一个位置。对应代码中的,

if reply.XTerm == XTermOutRange { /* follower 告知上次发来的 AE 包内的日志条目已超范围 */
	rf.nextIdxs[server] = reply.XLen
}

再者,如果上次发送的日志条目欲覆盖 follower 已有的内容,那么需要下一次发送的条目需要从 follower 返回的 commitIdx 之后选取,

} else if reply.XTerm == XTermCommitted { /* 上次发来的 AE 包欲覆盖已提交的日志条目 */
	rf.nextIdxs[server] = reply.XIdx
}

正常情况下,确认回信中提到的冲突点的任期在 leader 日志中是否存在。如果存在,则将 nextIdx 置为该任期内最后一条目的位置;反之,置为回信的 XIdx

这里也可以采用二分查找法,我为了省事选择了暴力法,读者可以进行调整

至此,差不多讲完了 leader 收到 AppendEntries 回信后的反应

S5 - candidate选举限制

在 5.4.1 - Election restriction 中提及到日志何为最新,有两个评判标准,

  1. 如果两条日志的最新条目任期不同,那么任期号大的日志是最新的;
  2. 如果日志以相同的任期结束,那么哪个日志越长,哪个日志就越最新

体现在 requestVote.go:RequestVote() 中即是 up2date 布尔值,

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	// Your code here (2A, 2B).
	rf.mu.Lock()
	defer rf.mu.Unlock()

	/* 默认不投票 */
	reply.VoteGranted = false
	reply.Term = rf.curTerm

	if args.Term < rf.curTerm {
		return
	}

	if args.Term > rf.curTerm { /* 也可能是为了镇压任期较旧的 leader */
		rf.curTerm = args.Term
		rf.role = Follower
		rf.votedFor = NoBody /* 为臣服做准备 */
	}

	idx := rf.lastLogIdx()
	term := rf.lastLogTerm()
	up2date := false

	if args.LastLogTerm > term { /* 日志任期大的新 */
		up2date = true
	}

	if args.LastLogTerm == term && args.LastLogIdx >= idx { /* 任期相同,日志长的新 */
		up2date = true
	}

	/* 对应论文中 5.4.1 - Election restriction */
	if (rf.votedFor == NoBody || rf.votedFor == args.CandidateId) && up2date {
		rf.role = Follower
		rf.votedFor = args.CandidateId /* 臣服于 leader */
		reply.VoteGranted = true
		rf.grantVoteCh <- struct{}{} /* 如果投票给他人,那么就需要重置自己的 ElectionTimeOut */
	}
}

这部分比较简单,需要修改的地方也不多,重点即在第 24~30 行,读者可自行品鉴

S6 - defs.go约定俗成和实现Start()

defs.go 中定义了两个常用的函数 lastLogIdx()lastLogTerm() ,以及一些常用的枚举变量,

const (
	NoBody           = -1
	Follower         = 0
	Candidate        = 1
	Leader           = 2
	ChanCap          = 100
	ElectionTimeOut  = 250 * time.Millisecond /* 要远大于论文中的 150-300 ms 才有意义,当然也要保证在 5 秒之内完成测试 */
	HeartBeatTimeOut = 100 * time.Millisecond /* 心跳 1 秒不超过 10 次 */
	XTermOutRange    = -1
	XTermCommitted   = -2
)

/* Raft 节点最后一条日志的编号 */
func (rf *Raft) lastLogIdx() int {
	return rf.log[len(rf.log)-1].Idx
}

/* Raft 节点最后一条日志的任期号 */
func (rf *Raft) lastLogTerm() int {
	return rf.log[len(rf.log)-1].Term
}

通过前者可以获取日志最后一条目的下标;而后者返回最后一条目的任期

raft.go:Start() 是对外的接口,与 client 进行交互。client 向 leader 发送一条命令请求,leader 需要将其追加其日志中,并将其同步至集群。代码如下,

func (rf *Raft) Start(command interface{}) (int, int, bool) {
	// Your code here (2B).
	rf.mu.Lock()
	defer rf.mu.Unlock()

	index := -1
	term := rf.curTerm
	isLeader := rf.role == Leader

	if isLeader {
		rf.log = append(rf.log, LogEntry{Idx: rf.lastLogIdx() + 1, Term: term, Cmd: command})
		index = rf.lastLogIdx()
	}

	return index, term, isLeader
}

至此,已然讲明白了 Lab2B: Log Replication 整个一套流程

V. Result

golang 比较麻烦,它有 GOPATH 模式,也有 GOMODULE 模式,6.824-golabs-2020 采用的是 GOPATH,所以在运行之前,需要将 golang 默认的 GOMODULE 关掉,

$ export GO111MODULE="off"

随后,就可以进入 src/raft 中开始运行测试程序,

$ go test -run 2B

仅此一次的测试远远不够,可以通过 shell 循环,让测试跑个千把次,

$ for i in {1..1000}; go test -run 2B    

这样,如果还没错误,那应该是真的通过了。分布式的很多 bug 需要通过反复模拟才能复现出来的,它不像单线程程序那样,永远是幂等的情况。也可以用我写的脚本 test_2b.py,

import os

ntests = 100
nfails = 0
noks = 0

if __name__ == "__main__":
  for i in range(ntests):
    print("*************ROUND " + str(i+1) + "/" + str(ntests) + "*************")

    filename = "out" + str(i+1)
    os.system("go test -run 2B | tee " + filename)
    with open(filename) as f:
      if 'FAIL' in f.read():
        nfails += 1
        print("✖️fails, " + str(nfails) + "/" + str(ntests))
        continue
      else:
        noks += 1
        print("✔️ok, " + str(noks) + "/" + str(ntests))
        os.system("rm " + filename)

我已经跑过千次,无一 FAIL。之后的 Lab3: Fault-tolerant Key/Value Service 和 Lab4: Sharded Key/Value Service 都是基于 Lab2: Raft 的,要确保你实现的 Raft 算法没有 bug,不然 Labs 越做到后面越难受

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/22102.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode 栈和队列OJ题目分享

目录 有效的括号&#xff08;括号匹配&#xff09;用栈实现队列用队列实现栈设计循环队列 有效的括号&#xff08;括号匹配&#xff09; 链接: link 题目描述&#xff1a; 题目思路&#xff1a; 1、如果是左括号“&#xff08; { [ ”就入栈 2、如果是右括号“&#xff09; }…

程序员:面试造火箭,入职拧螺丝?太难了···

刚开始工作的时候&#xff0c;我也想不通这个问题&#xff0c;甚至很鄙视这种现象。后面当了面试官&#xff0c;做到了公司中层管理&#xff0c;也会站在公司以及行业角度去重新思考这个问题。 为什么这种现象会越来越普遍呢&#xff1f;尤其在 IT 行业愈加明显。 面试看的是…

Packet Tracer – VLAN 实施故障排除场景 2

Packet Tracer – VLAN 实施故障排除场景 2 拓扑图 地址分配表 设备 接口 IPv4 地址 子网掩码 默认网关 S1 VLAN 56 192.168.56.11 255.255.255.0 不适用 S2 VLAN 56 192.168.56.12 255.255.255.0 不适用 S3 VLAN 56 192.168.56.13 255.255.255.0 不适用 P…

头歌计算机组成原理实验—运算器设计(11)第11关:MIPS运算器设计

第11关&#xff1a;MIPS运算器设计 实验目的 学生理解算术逻辑运算单元&#xff08;ALU&#xff09;的基本构成&#xff0c;掌握 Logisim 中各种运算组件的使用方法&#xff0c;熟悉多路选择器的使用&#xff0c;能利用前述实验完成的32位加法器、 Logisim 中的运算组件构造指…

图神经网络:(处理点云)PointNet++的实现

文章说明&#xff1a; 1)参考资料&#xff1a;PYG官方文档。超链。 2)博主水平不高&#xff0c;如有错误还望批评指正。 3)我在百度网盘上传了这篇文章的jupyter notebook和有关文献。超链。提取码8848。 文章目录 简单前置工作学习文献阅读PointNet的实现模型问题 简单前置工作…

智慧井盖监测终端,智能井盖-以科技解决智慧城市“顽疾”,守护城市生命线

平升电子智慧井盖监测终端,智能井盖-以科技解决智慧城市“顽疾”,守护城市生命线-智慧井盖&#xff0c;实现对井下设备和井盖状态的监测及预警&#xff0c;是各类智慧管网管理系统中不可或缺的重要设备&#xff0c;解决了井下监测环境潮湿易水淹、电力供应困难、通讯不畅等难题…

XDP入门--BPF程序如何转发报文到其它网卡

本文目录 1、测试环境&#xff1a;2、实现的功能&#xff0c;使用bpf_redirect直接转发收到的报文到另外一张网卡3、测试步骤与测试结果 1、测试环境&#xff1a; 参照把树莓派改造成无线网卡(3)-----共享无线网络&#xff0c;无线网络转换成有线网络&#xff0c;让有线网络设…

插入排序、选择排序、冒泡排序小结(45)

小朋友们好&#xff0c;大朋友们好&#xff01; 我是猫妹&#xff0c;一名爱上Python编程的小学生。 和猫妹学Python&#xff0c;一起趣味学编程。 今日主题 插入排序、选择排序、冒泡排序有什么区别&#xff1f; 原理不同 插入排序是将未排序的元素逐个插入到已排序序列中…

Unity之ASE从入门到精通 目录

前言 Amplify Shader Editor (ASE) 是受行业领先软件启发的基于节点着色器创建工具。它是一个开放且紧密集成的解决方案,提供了熟悉和连贯的开发环境,使 Unity 的 UI 约定和着色器的使用无缝地融合一起 目录 这里是ASE从入门到精通专栏的目录,不停更新中,有问题随时留…

入门JavaScript编程:上手实践四个常见操作和一个轮播图案例

部分数据来源&#xff1a;ChatGPT 简介 JavaScript是一门广泛应用于Web开发的脚本语言&#xff0c;它主要用于实现动态效果和客户端交互。下面我们将介绍几个例子&#xff0c;涵盖了JavaScript中一些常见的操作&#xff0c;包括&#xff1a;字符串、数组、对象、事件等。 例子…

rk3568 适配rk809音频

rk3568 适配rk809音频 RK809是一款集成了多种功能的电源管理芯片&#xff0c;主要用于笔记本电脑、平板电脑、工控机等设备的电源管理。以下是RK809的详细功能介绍&#xff1a; 电源管理&#xff1a;控制电源的开关、电压、电流等参数&#xff0c;保证设备的稳定运行。音频管…

Unity之使用Photon PUN开发多人游戏教程

前言 Photon是一个网络引擎和多人游戏平台,可以处理其服务器上的所有请求,我们可以在 Unity(或其他游戏引擎)中使用它,并快速把游戏接入Photon的网络中,而我们就可以专注于在项目中添加逻辑,专注于游戏玩法和功能了。 PUN(Photon Unity Networking)是一种开箱即用的解…

什么是DevOps?如何理解DevOps思想?

博文参考总结自&#xff1a;https://www.kuangstudy.com/course/play/1573900157572333569 仅供学习使用&#xff0c;若侵权&#xff0c;请联系我删除&#xff01; 1、什么是DevOps? DevOps是一种思想或方法论&#xff0c;它涵盖开发、测试、运维的整个过程。DevOps强调软件开…

Maven方式构建Spring Boot项目

文章目录 一&#xff0c;创建Maven项目二&#xff0c;添加依赖三&#xff0c;创建入口类四&#xff0c;创建控制器五&#xff0c;运行入口类六&#xff0c;访问Web页面七&#xff0c;修改访问映射路径八&#xff0c;定制启动标语1、创建标语文件2、生成标语字符串3、编辑标语文…

DNDC模型在土地利用变化、未来气候变化下的建模方法及温室气体时空动态模拟实践技术

DNDC模型讲解 1.1 碳循环模型简介 1.2 DNDC模型原理 1.3 DNDC下载与安装 1.4 DNDC注意事项 ​ DNDC初步操作 2.1 DNDC界面介绍 2.2 DNDC数据及格式 2.3 DNDC点尺度模拟 2.4 DNDC区域尺度模拟 2.5 DNDC结果分析 ​ DNDC气象数据制备 3.1 数据制备中的遥感和GIS技术 3…

Vue3 + TypeScript + Uniapp 开发小程序【医疗小程序完整案例·一篇文章精通系列】

当今的移动应用市场已经成为了一个日趋竞争激烈的领域&#xff0c;而开发一个既能在多个平台上运行&#xff0c;又能够高效、可维护的应用则成为了一个急需解决的问题。 在这个领域中&#xff0c;Vue3 TypeScript Uniapp 的组合已经成为了一种受欢迎的选择&#xff0c;特别…

ODB 2.4.0 使用延迟指针 lazy_shared_ptr 时遇到的问题

最近在学习使用C下的ORM库——ODB&#xff0c;来抽象对数据库的CURD&#xff0c;由于C的ORM实在是太冷门了&#xff0c;ODB除了官方英语文档&#xff0c;几乎找不到其他好用的资料&#xff0c;所以在使用过程中也是遇到很多疑惑&#xff0c;也解决很多问题。近期遇到的一个源码…

推荐系统系列之推荐系统概览(下)

在推荐系统概览的第一讲中&#xff0c;我们介绍了推荐系统的常见概念&#xff0c;常用的评价指标以及首页推荐场景的通用召回策略。本文我们将继续介绍推荐系统概览的其余内容&#xff0c;包括详情页推荐场景中的通用召回策略&#xff0c;排序阶段常用的排序模型&#xff0c;推…

Keil Debug 逻辑分析仪使用

Keil Debug 逻辑分析仪使用 基础配置 更改对应的bebug窗口参数 两边的 Dialog DLL 更改为&#xff1a;DARMSTM.DLL两边的 Parameter &#xff08;这里的根据单片机型号更改&#xff09;更改为&#xff1a;-pSTM32F103VE 选择左边的 Use Simulator 选项。 打开Debug和其中的逻…

数据全生命周期管理

数据存储 时代"海纳百川&#xff0c;有容乃大"意味结构化、半结构和非结构化多样化的海量的 &#xff0c;也意味着批数据和流数据多种数据形式的存储和计算。面对不同数据结构、数据形式、时效性与性能要求和存储与计算成本等因素考虑&#xff0c;应该使用适合的存储…