【MIT6.824】lab3 Fault-tolerant Key/Value Service 实现笔记

引言

lab3A的实验要求如下:

Your first task is to implement a solution that works when there are no dropped messages, and no failed servers.

You’ll need to add RPC-sending code to the Clerk Put/Append/Get methods in client.go, and implement PutAppend() and Get() RPC handlers in server.go. These handlers should enter an Op in the Raft log using Start(); you should fill in the Op struct definition in server.go so that it describes a Put/Append/Get operation. Each server should execute Op commands as Raft commits them, i.e. as they appear on the applyCh. An RPC handler should notice when Raft commits its Op, and then reply to the RPC.

You have completed this task when you reliably pass the first test in the test suite: “One client”.

Add code to handle failures, and to cope with duplicate Clerk requests, including situations where the Clerk sends a request to a kvserver leader in one term, times out waiting for a reply, and re-sends the request to a new leader in another term. The request should execute just once. These notes include guidance on duplicate detection. Your code should pass the go test -run 3A tests.

lab3B的实验要求如下:

Modify your kvserver so that it detects when the persisted Raft state grows too large, and then hands a snapshot to Raft. When a kvserver server restarts, it should read the snapshot from persister and restore its state from the snapshot.

总体而言,我们需要在lab2所实现的raft系统上构建一个简单的key-value存储系统,这个系统需要支持客户端的Put/Append/Get操作,同时需要支持Raft的持久化和快照功能。本系统的要求是线性一致的,即每个动作都能被当做是在一个唯一的时刻进行原子执行的,具体一致性相关的内容,可查看之前的文章:分布式系统中的线性一致性。
代码可以在https://github.com/slipegg/MIT6.824中得到。所有代码均通过了1千次的测试。

lab3A 实现

lab3A不涉及到Raft的快照功能,主要是要完成整个系统功能的构建。在实验时测试3A时,测试代码将会不断调用客户端的Put/Append/Get操作,然后检查是否所有的操作都被正确执行。

首先通过一个map来存储key-value,如下中的KVMachine所示:

type KVMachine struct {
	KV map[string]string
}

func (kv *KVMachine) Get(key string) (string, Err) {
	value, ok := kv.KV[key]
	if !ok {
		return "", ErrNoKey
	}
	return value, OK
}

func (kv *KVMachine) Put(key string, value string) Err {
	kv.KV[key] = value
	return OK
}

func (kv *KVMachine) Append(key string, value string) Err {
	oldValue, ok := kv.KV[key]
	if !ok {
		kv.KV[key] = value
		return OK
	}
	kv.KV[key] = oldValue + value
	return OK
}

func newKVMachine() *KVMachine {
	return &KVMachine{make(map[string]string)}
}

然后是Client端的实现,首先Client在初始化时会随机生成一个数字当做自己的id,同时它也专门维护每个请求的唯一id。Client的Put/Append/Get操作都是通过RPC调用Server端的Put/Append/Get操作来实现的,如果Server端返回了错误,告诉当前Server不是leader,那么Client就会重新发送请求到下一个Server去,直到找到leader并执行请求成功了为止。Client端的PutAppend/Get操作的实现如下,Get也是类似,就是错误处理稍微不同,不再赘述:

func (ck *Clerk) PutAppend(key string, value string, op string) {
	DPrintf("{Clinetn-%d} try to %s {'%v': '%v'}\n", ck.clientId, op, key, value)
	args := PutAppendArgs{Key: key, Value: value, Op: op, ClientId: ck.clientId, RequestId: ck.requestId}
	for {
		var reply PutAppendReply
		if ck.servers[ck.leaderId].Call("KVServer.PutAppend", &args, &reply) && reply.Err == OK {
			DPrintf("{Clinetn-%d} %s {'%v': '%v'} success\n", ck.clientId, op, key, value)
			ck.requestId++
			break
		} else {
			ck.leaderId = (ck.leaderId + 1) % int64(len(ck.servers))
			time.Sleep(100 * time.Millisecond)
		}
	}
}

每个Server端都会维护一个KVMachine,并且也连接到一个专门的raft节点,它的主要作用就是将客户端的请求转化为raft节点的日志,然后等待raft节点将日志提交后接收到raft节点的信息,将日志应用到自己的KVMachine中,然后返回给客户端。

将客户端请求转化为日志传递给raft部分的代码如下,Get请求也是类似的。注意这里对于重复执行过的Put、Append会直接进行返回,因为运行结果只会是OK,所以直接返回OK即可,而Get请求不需要判断是否重复执行,因为Get请求需要获取的实最新的数据,来一次就执行一次即可。

func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
	// Your code here.
	defer DPrintf("{KVServer-%d} finishes %s {%s: %s}, the reply is %v\n", kv.me, args.Op, args.Key, args.Value, reply)
	kv.mu.RLock()
	if kv.isDuplicate(args.ClientId, args.RequestId) {
		kv.mu.RUnlock()
		reply.Err = OK
		return
	}
	kv.mu.RUnlock()

	logId, _, isLeader := kv.rf.Start(Op{PutAppendArgs: args})

	if !isLeader {
		reply.Err = ErrWrongLeader
		return
	}

	DPrintf("{KVServer-%d} try to %s {%s: %s} with logId: %d\n", kv.me, args.Op, args.Key, args.Value, logId)

	kv.mu.Lock()
	ch_putAppend := kv.getNotifyCh_PutAppend(logId)
	kv.mu.Unlock()

	select {
	case result := <-ch_putAppend:
		reply.Err = result.Err
	case <-time.After(MaxWaitTime):
		reply.Err = ErrTimeout
	}

	go func() {
		kv.mu.Lock()
		delete(kv.notifyChs_PutAppend, logId)
		kv.mu.Unlock()
	}()
}

当raft节点将日志分发给了大部分的节点后,就可以将日志提交,然后提醒Server端将日志应用到自己的KVMachine中。代码如下所示。注意对于Get请求,需要判断这时候节点是不是leader,Term是否还相同,以防止由于applyCh传递时间过长,这时候节点已经不是leader,没有最新的数据了。对于Put、Append操作需要判断是否已经是重复执行过的操作,如果是,直接标记为OK即可,不需要再次执行,同样也需要判断当前还是不是leader,如果是才有权限返回给客户端执行结果。

func (kv *KVServer) applier() {
	for !kv.killed() {
		select {
		case msg := <-kv.applyCh:
			if msg.CommandValid {
				kv.mu.Lock()
				if msg.CommandIndex <= kv.lastApplied {
					DPrintf("{KVServer-%d} reveives applied log{%v}", kv.me, msg)
					kv.mu.Unlock()
					continue
				}
				kv.lastApplied = msg.CommandIndex

				op := msg.Command.(Op)
				if op.GetArgs != nil {
					DPrintf("{KVServer-%d} apply get %v.", kv.me, op.GetArgs.Key)
					value, err := kv.kvMachine.Get(op.GetArgs.Key)
					reply := GetReply{Err: err, Value: value}

					if currentTerm, isLeader := kv.rf.GetState(); isLeader && currentTerm == msg.CommandTerm {
						if ch, ok := kv.notifyChs_Get[msg.CommandIndex]; ok {
							ch <- reply
						}
					}
				} else if op.PutAppendArgs != nil {
					var reply PutAppendReply
					if kv.isDuplicate(op.PutAppendArgs.ClientId, op.PutAppendArgs.RequestId) {
						DPrintf("{KVServer-%d} receives duplicated request{%v}\n", kv.me, msg)
						reply.Err = OK
					} else {
						DPrintf("{KVServer-%d} apply %s {%s: %s}.\n", kv.me, op.PutAppendArgs.Op, op.PutAppendArgs.Key, op.PutAppendArgs.Value)
						if op.PutAppendArgs.Op == "Put" {
							reply.Err = kv.kvMachine.Put(op.PutAppendArgs.Key, op.PutAppendArgs.Value)
						} else if op.PutAppendArgs.Op == "Append" {
							reply.Err = kv.kvMachine.Append(op.PutAppendArgs.Key, op.PutAppendArgs.Value)
						}
						kv.lastPutAppendId[op.PutAppendArgs.ClientId] = op.PutAppendArgs.RequestId
					}

					if _, isLeader := kv.rf.GetState(); isLeader {
						if ch, ok := kv.notifyChs_PutAppend[msg.CommandIndex]; ok {
							ch <- reply
						}
					}
				} else {
					DPrintf("{KVServer-%d} receives unknown command{%v}", kv.me, msg)
				}

				if kv.isNeedSnapshot() {
					DPrintf("{KVServer-%d} needs snapshot\n", kv.me)
					kv.snapshot(msg.CommandIndex)
				}
				kv.mu.Unlock()
			} 
        }
    }
}

lab3B 实现

这里主要需要实现Server的持久化和快照功能,每个Server有一个自己的persister,其结构如下:

type Persister struct {
	mu        sync.Mutex
	raftstate []byte
	snapshot  []byte
}

其中raftstate部分是raft节点存储自身持久化状态用的,而snapshot节点是用来给Server存储自身状态用的,包括了Server的KVMachine状态以及lastPutAppendId。在Server启动时,会从persister中读取raftstate和snapshot,然后根据raftstate来初始化raft节点,根据snapshot来初始化KVMachine和lastPutAppendId。代码如下所示:

func (kv *KVServer) reloadBySnapshot(snapshot []byte) {
	if snapshot == nil || len(snapshot) < 1 {
		return
	}

	var kvMachine KVMachine
	var lastPutAppendId map[int64]int64

	r := bytes.NewBuffer(snapshot)
	d := labgob.NewDecoder(r)
	if d.Decode(&kvMachine) != nil ||
		d.Decode(&lastPutAppendId) != nil {
		DPrintf("{KVServer-%d} reloadBySnapshot failed\n", kv.me)
	}

	DPrintf("{KVServer-%d} reloadBySnapshot succeeded\n", kv.me)
	kv.lastPutAppendId = lastPutAppendId
	kv.kvMachine = kvMachine
}

当Server在apply节点时,按照要求,如果raft的日志信息过大,就触发快照功能,将Server的状态保存到snapshot中,同时让raft节点生成快照。如下所示:

func (kv *KVServer) snapshot(lastAppliedLogId int) {
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	if mr, lr := e.Encode(kv.kvMachine), e.Encode(kv.lastPutAppendId); mr != nil ||
		lr != nil {
		DPrintf("{KVServer-%d} snapshot failed. kvMachine length: %v, result: {%v}, lastPutAppendId: {%v}, result: {%v},",
			kv.me, len(kv.kvMachine.KV), mr, kv.lastPutAppendId, lr)
		return
	}

	data := w.Bytes()
	kv.rf.Snapshot(lastAppliedLogId, data)
	DPrintf("{KVServer-%d} snapshot succeeded\n", kv.me)
}

由于快照的引入,Server也可能需要apply快照,即对上述的applier函数再多加一个msg类型的判断,如下所示:

else if msg.SnapshotValid {
				kv.mu.Lock()
				kv.reloadBySnapshot(msg.Snapshot)
				kv.lastApplied = msg.CommandIndex
				kv.mu.Unlock()
			}

相关问题

为什么Get操作不能直接读leader的本地数据?

在Raft系统中,当面临网络分区情况时,原本的leader如果位于一个小分区,那么他就不知道其实大分区中已经有了一个新leader了,这样如果client还是连接的原本的leader,并且是直接读取该leader的本地数据,那么就会面临读取到过时数据的问题,导致系统线性不一致。

所以解决这个问题的关键在于确定节点真的是leader,这里采取的是一个简单的方法,即将这个Get操作作为一个log日志放入raft系统中,直到raft系统将这个log日志提交后,才返回。实际上还有优化的空间,一个方法是在raft接受到了一个Get操作后,立刻执行心跳,如果接收到了过半的节点的心跳回复,那么就证明了这个节点是真的leader,这样就可以直接返回数据了,这就避免了将Get操作放入raft系统中的开销。还有一种方法是叫做Lease Read,它的吞吐更大,详情可参考深入浅出etcd/raft —— 0x06 只读请求优化。

applier中是否有机会出现重复执行的put、append操作?

有机会出现。例如当客户端发送后,Server将其提交给了Raft,但是Raft没有在规定时间内返回,那么就会返回超时,然后客户端再去循环提交一轮,再一次提交给这个节点的时候,节点此时可能还是没有收到Raft的返回,所以会再次提交给Raft,这样就会出现重复提交的情况。而在applier中就会只执行第一次提交的操作,后续的提交都会被忽略。

只用lastPutAppendId记录最后一次的Put、Append操作的id是否可行?

可行。因为系统中Put、Append操作的结果只会是ok,所以不需要记录每次的Put、Append操作的id,同时由于raft系统中一旦apply了就是永久apply了,并且前面的操作也都apply了,不存在回退的情况,所以如果当前操作的id小于最新一次Put、Append操作的id,那么就说明是重复执行了,直接返回ok即可。

运行结果

代码通过了1k次的测试,如下图所示。

请添加图片描述

参考资料

  • 深入浅出etcd/raft —— 0x06 只读请求优化

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/555854.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

HiveSql中的函数家族(二)

一、窗口函数 1、什么是窗口函数 在 SQL 中&#xff0c;窗口函数&#xff08;Window Functions&#xff09;是一种特殊的函数&#xff0c;它允许在查询结果集的特定窗口&#xff08;通常是一组行&#xff09;上执行聚合、分析和计算操作&#xff0c;而无需聚合整个结果集。窗口…

使用Python工具库SnowNLP对评论数据标注(二)

这一次用pandas处理csv文件 comments.csv import pandas as pd from snownlp import SnowNLPdf pd.read_csv("C:\\Users\\zhour\\Documents\\comments.csv")#{a: [1, 2, 3], b: [4, 5, 6], c: [7, 8, 9]}是个字典 emotions[] for txt in df[sentence]:s SnowNLP(…

接收区块链的CCF会议--ICSOC 2024 截止7.24

ICSOC是CCF B类会议&#xff08;软件工程/系统软件/程序设计语言&#xff09; 2023年长文短文录用率22% Focus Area 4: Emerging Technologies Quantum Service Computing Digital Twins 3D Printing/additive Manufacturing Techniques Blockchain Robotic Process Autom…

【QT+OpenCV】车牌号检测 学习记录 遇到的问题

【QTOpenCV】车牌号检测 学习记录 首先在QT里面配置好OpenCV .pro文件中加入&#xff1a; INCLUDEPATH G:/opencv/build/include LIBS -L"G:/opencv/build/x64/vc14/lib"\-lopencv_core \-lopencv_imgproc \-lopencv_highgui \-lopencv_ml \-lopencv_video \-lo.c…

Meta Llama 3强势来袭:迄今最强开源大模型,性能媲美GPT-4

前言 Meta的最新语言模型Llama 3已经发布&#xff0c;标志着在大型语言模型&#xff08;LLM&#xff09;领域的一次重大突破&#xff0c;其性能在行业内与GPT-4相媲美。此次更新不仅提升了模型的处理能力和精确性&#xff0c;还将开源模型的性能推向了一个新的高度。 Huggingf…

Docker八股总结

1. 容器和虚拟机的区别 传统虚拟机技术是虚拟出一套硬件后&#xff0c;在其上运行一个完整操作系统&#xff0c;在该系统上再运行所需应用进程&#xff1b;而容器内的应用进程直接运行于宿主的内核&#xff0c;容器内没有自己的内核&#xff0c;而且也没有进行硬件虚拟。因此容…

2021年全国大学生电子设计竞赛D题——基于互联网的摄像测量系统(二)

09 电路设计 前面介绍了系统的硬件框图如下&#xff1a; 硬件基本分为三块&#xff0c;两个摄像节点&#xff0c;一个终端节点。 1. 摄像节点硬件 摄像节点由一个DE10-Nano开发板和一个D8M摄像头实现&#xff0c;DE10-Nano开发板的HDMI接口外接HDMI显示器来显示拍摄到的视频。…

Flask + Bootstrap vs Flask + React/Vue:初学者指南

在这篇博客文章中&#xff0c;我们将比较 Flask Bootstrap 和 Flask React/Vue 这两种技术栈&#xff0c;以帮助初学者了解哪种组合更适合他们的项目需求。我们将从学习曲线、易用性、依赖管理、构建部署和路由定义等方面进行比较。 学习曲线 Flask 是一个基于 Python 的轻…

信息系统项目管理师0055:优化和持续改进(4信息系统管理—4.1管理方法—4.1.5优化和持续改进)

点击查看专栏目录 文章目录 4.1.5优化和持续改进1.定义阶段2.度量阶段3.分析阶段4.改进/设计阶段5.控制/验证阶段4.1.5优化和持续改进 优化和持续改进是信息系统管理活动中的一个环节,良好的优化和持续改进管理活动能够有效保障信息系统的性能和可用性等,延长整体系统的有效使…

偏微分方程算法之一阶双曲差分法

目录 一、研究目标 二、理论推导 2.1 引言 2.2 迎风格式 2.3 完全不稳定差分格式 2.4 蛙跳格式&#xff08;Leapfrog&#xff09; 2.5 Lax-Friedrichs格式 2.6 Lax-Wendroff格式 2.7 Beam-Warming格式 2.8 隐格式 2.9 Courant-Friedrichs-Lewy条件&#xff08;CFL条…

一文学会时序约束

主时钟约束命令/生成时钟约束命令IO输入输出延迟约束命令及效果最大最小延迟命令及作用多周期路径怎么约束什么情况设置伪路径时钟组设置的三个选项 如果不了解时序分析可以先看下下面这篇文章&#xff1a; 数字IC/FPGA——时序分析 目录 1.时钟约束&#xff08;1&#xff09;…

线性代数---行列式的性质

1. 行列式的行与列(按原顺序)互换

redis的数据结构报错

文章目录 redis的数据结构报错Redis使用LocalDateTime报错问题 redis的数据结构报错 Redis使用LocalDateTime报错问题 SpringBoot整合Redis时&#xff0c;使用LocalDate以下报错 org.springframework.data.redis.serializer.SerializationException: Could not read JSON: C…

数字时代安全风险防范与保密科技创新

文章目录 前言一、新技术应用带来的保密挑战1.1 通过技术手段获取国家秘密和重要情报日益普遍1.2 新型信息技术存在的风险不容忽视 二、加强保密科技创新的必要性2.1 提高定密准确性2.2 及时变更密级或解密2.3 对失泄密事故案件进行自动高效的预警和初步处理 三、保密科技创新中…

Jenkins机器已经安装了ansible, 运行的时候却报错ansible: command not found

操作系统&#xff1a;MacOS Jenkins log提示 ansible: command not found 直接在Jenkins 机器中&#xff0c;进入一样的目录执行ansible --version OK 原因&#xff1a; Jenkins 默认使用的环境是 /usr/bin, 而我的ansible 安装配置在conda3 下面&#xff0c;所以需要在Jenkin…

OpenCV从入门到精通实战(四)——答题卡识别判卷系统

基于OpenCV的答题卡识别系统&#xff0c;其主要功能是自动读取并评分答题卡上的选择题答案。系统通过图像处理和计算机视觉技术&#xff0c;自动化地完成了从读取图像到输出成绩的整个流程。下面是该系统的主要步骤和实现细节的概述&#xff1a; 1. 导入必要的库 系统首先导入…

修改npm全局安装模式的路径

修改npm全局安装模式的路径 由于之前安装过nodejs&#xff0c;并且配置环境变量以及cache 、prefix 的信息&#xff1b; 由于项目需求安装最新版本的Nodejs&#xff0c;把环境变量的path相关目录进行调整&#xff0c;然后使用一下命令进行安装cnpm命令&#xff1b; npm insta…

本地启用并操作Redis

本篇文章将向各位讲解redis的基础用法&#xff0c;废话不多说我们直接开始吧&#xff01; 首先需要下载redis到你本地&#xff0c;我这儿是下载到以下文件夹中&#xff1a; 双击redis-server.exe文件运行redis&#xff1a; 然后我们另外启用一个命令窗口&#xff08;需要进入你…

决策树分类器(保姆级教学) 定义+特性+原理及公式+鸢尾花分类经典问题示例(完整Python代码带详细注释、保姆级分部代码解释及结果说明、决策树可视化及解释)

文章目录 引言定义特性基本原理和公式理解信息增益&#xff08;ID3算法&#xff09;熵的定义条件熵信息增益的计算 基尼不纯度&#xff08;CART算法&#xff09;基尼不纯度的定义基尼不纯度的计算例子 实现步骤解决鸢尾花分类问题&#xff08;机器学习入门中的经典案例Python代…

在Linux上用最原始的方式查看内存情况

2024年4月18日&#xff0c;周四上午 cat /proc/meminfo