故事里能毁坏的只有风景
谁也摧毁不了我们的梦境
弦月旁的流星划过了天际
我许下 的愿望 该向谁 去说明
——我是如此相信
完整代码见: https://github.com/SnowLegend-star/6.824
还是那句话,尽量只是参考思路而不是照抄
先阅读几遍实验说明的Introduction部分,Lab的核心就在下面这段话:
客户端可以向键值服务器发送三种不同的RPC:Put(key, value)、Append(key, arg)和Get(key)。服务器维护一个内存中的键值对映射。键和值都是字符串。Put(key, value)为特定键安装或替换值,Append(key, arg)将arg追加到键的值上并返回旧值,Get(key)获取键的当前值。对于不存在的键,Get应该返回一个空字符串。对于不存在的键,Append应该表现得像现有值是零长度字符串一样。每个客户端通过Clerk与服务器进行Put/Append/Get方法的RPC交互。Clerk管理与服务器的RPC交互。
Client有三个操作请求,而Server负责接收这三个操作请求并返回。那么该如何着手呢?先给KVServer声明一个map[string]string类型的映射来维护映射。有了操作对象,接下来就可以有的放矢了。
再看看什么是线性化:
操作的线性化(Linearizability)是分布式系统中一致性模型的一种,它确保操作看起来像是一次一个地立即执行的。具体来说,线性化具有以下特性:
- 实时性:每个操作在某个瞬间发生,这个瞬间称为线性化点(linearization point)。
- 顺序性:如果一个操作在另一个操作之前完成,那么第一个操作的线性化点必须在第二个操作的线性化点之前。
简单来说,线性化保证了系统操作的可见顺序与实际执行顺序一致,这对开发和调试分布式系统非常重要,因为它使系统行为更具可预测性。
线性化的具体要求
- 实时顺序:假设有两个操作A和B。如果操作A在操作B之前完成(即,A的调用返回时,B还未开始),那么A的效果对B应该是可见的。换句话说,操作B应该能够观察到操作A的结果。
- 原子性:操作要么完全发生,要么完全不发生。任何一个操作都不能部分完成。对于键值存储来说,这意味着读和写操作都必须是原子的。
例子
假设我们有一个键值存储系统,系统中有两个客户端,Client 1 和 Client 2。以下是操作序列:
Client 1 向键 "x" 写入值 "A"(Put("x", "A"))。
Client 2 从键 "x" 读取值(Get("x"))。
Client 1 将键 "x" 的值改为 "B"(Put("x", "B"))。
Client 2 再次从键 "x" 读取值(Get("x"))。
线性化要求:
如果 Client 2 在操作3之前进行操作2(即读取操作),它应该读取到的是值 "A"。
如果 Client 2 在操作3之后进行操作2(即读取操作),它应该读取到的是值 "B"。
如果 Client 2 在操作1之后进行操作2(即读取操作),它不能读取到操作1之前的任何值(即不能读取到空值或其他旧值)。
这样,任何并发操作的结果都必须与某个串行化执行顺序一致,即每个操作看起来都是在某个时刻瞬间完成的。
为什么线性化很重要?
线性化使得系统行为对开发人员和用户来说更加直观和可预测。它避免了许多由于并发和网络延迟导致的一致性问题,是构建可靠分布式系统的基础。例如,在银行系统中,如果转账操作不是线性化的,用户可能会看到账户余额不一致的情况,这会导致严重的问题。
总之,线性化确保了系统中的所有操作都可以按某种顺序一一排列,使得分布式系统在面对并发操作时仍然具有一致的行为。
了解了上述两点,可以正式开始阅读part01了。
Key/value server with no network failures (easy)
您的第一个任务是在没有消息丢失的情况下实现一个解决方案。
即不用考虑Server的回复丢失导致Client需要重新发送请求的情况。
您需要在client.go中向Clerk的Put/Append/Get方法添加RPC发送代码,并在server.go中实现Put、Append()和Get() RPC处理程序。
按部就班就行。注意一点——Server中所有涉及并发的方法应当加锁操作。
当您通过测试套件中的前两个测试时,您就完成了这个任务:“one client”和“many clients”。
假如part01没有出现眼花的情况应该是很快就可以完成的,但不巧的是我经典看岔代码导致出现了个bug。
- 要严格注意字符串拼接的顺序。
kv.kvStorage[args.Key] = args.Value + oldValue
我这里直接把顺序拼接反了,导致最后的输出和要求输出完美形成对称~
Key/value server with dropped messages (easy)
基本上通过了part01后,就只会有 “Test: unreliable net, many clients ...”这一个fail的测试点了。所以我们的注意力就只要集中在处理这一点上。
下面来看实验说明:
现在,您应该修改解决方案以在消息丢失的情况下继续运行(例如,RPC请求和RPC回复)。如果消息丢失,那么客户端的ck.server.Call()将返回false(更准确地说,Call()等待回复消息的时间间隔,如果在该时间内没有收到回复,则返回false)。您将面临的问题之一是Clerk可能需要多次发送RPC直到成功。然而,每次调用Clerk.Put()或Clerk.Append()都应该只执行一次,因此您必须确保重发不会导致服务器执行请求两次。向Clerk添加代码以在未收到回复时重试,并向server.go添加代码以在需要时过滤重复。以下是处理重复检测的指导。
开始我还以为要自己设置时间间隔来处理Call()出现消息丢失的情况。后来才发现Call()是自动处理信息丢失的情况从而返回false,我属实是多虑了。
Hints:
1、您需要唯一标识客户端操作,以确保键值服务器只执行每个操作一次。
每个客户端都有唯一的Clerk。每个操作都有唯一的id。在Server维护一个操作请求处理情况的映射即可。
是不是Server端又要添加一个处理Client端请求操作成功从而返回消息的handler?可要可不要。
2、您需要仔细考虑服务器必须维护的状态以处理重复的Get()、Put()和Append()请求,如果有的话。
对这个hint的理解很重要。所谓重复处理,就是Server会再次调用资源去处理已经处理好的operation。
我们发现Get()这个operation就是从Server中获取一个对应的Value即可。无论是否重复处理都只要一步操作,所以在这里进行过滤Get()重复与否其实结果都是一致的。但是,如果Client需要通过中转才会访问到Server,我们倒是可以在中转处设置一个cahce来减少对Server的重复访问。
Put()的重复请求过滤与否也无所谓。毕竟它也只有一步操作哈哈哈。
重点是Append()。如果是第一次处理Append(),那Server需要获得对应Key的oldValue,并且修改给定Key对应的Value。但是对于重复的Append(),我们只需要返回oldValue即可,不用重复修改Key对应的Value了。
3、您的重复检测方案应该快速释放服务器内存,例如每个RPC隐含着客户端已经看到了其上一个RPC的回复 。可以假设一个客户端一次只会调用一个Clerk。
上述要求的意思是,在设计重复检测机制时,服务器需要高效地管理和释放内存,避免长时间保留无用的记录。具体来说,每个RPC请求隐含地表示客户端已经成功处理了之前的RPC请求的回复。因此,服务器可以安全地删除那些已经被客户端确认接收的RPC请求记录。
这意味着服务器只需要保留每个客户端最近处理的RPC请求记录,不需要保存所有的历史记录。
一言蔽之,part02要完成的工作就两点:
- Server维护每个operation的完成情况opComplete
- Server维护每个operation得到的处理结构opReply
- Server收到Client发来的operation已完成的消息后,删除这个operation的完成情况和处理结果,即删除opComplete和opReply对应的表项。
最后贴一张Lab通过的截图~
具体代码
这里贴份代码吧,具体代码可以去github看。
server.go
package kvsrv
import (
"log"
"sync"
)
const Debug = true
func DPrintf(format string, a ...interface{}) (n int, err error) {
if Debug {
log.Printf(format, a...)
}
return
}
type KVServer struct {
mu sync.Mutex
// Your definitions here.
kvStorage map[string]string //存储键值对映射
opReply map[int64]string //维护每个请求应该收到的回复
opComplete map[int64]int //维护每个请求是否完成
}
func (kv *KVServer) TaskComplete_Handeler(args *TaskCompleteArgs, reply *TaskCompleteReply) {
kv.mu.Lock()
defer kv.mu.Unlock()
delete(kv.opComplete, args.Identifier) //处理完这个请求后直接删除记录
delete(kv.opReply, args.Identifier) //及时删除记录
}
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
// Your code here.
kv.mu.Lock()
defer kv.mu.Unlock()
//只有未完成的操作才可以被执行
reply.Value = kv.kvStorage[args.Key]
}
func (kv *KVServer) Put(args *PutAppendArgs, reply *PutAppendReply) { //Put方法不需要返回值
// Your code here.
kv.mu.Lock()
defer kv.mu.Unlock()
if kv.opComplete[args.Identifier] == 0 {
kv.opComplete[args.Identifier] = 1
kv.kvStorage[args.Key] = args.Value
}
}
func (kv *KVServer) Append(args *PutAppendArgs, reply *PutAppendReply) {
// Your code here.
kv.mu.Lock()
defer kv.mu.Unlock()
if kv.opComplete[args.Identifier] == 0 {
kv.opComplete[args.Identifier] = 1
oldValue := kv.kvStorage[args.Key]
kv.kvStorage[args.Key] = oldValue + args.Value
reply.Value = oldValue
kv.opReply[args.Identifier] = oldValue //保存这个operation处理的结果
} else {
reply.Value = kv.opReply[args.Identifier]
}
}
func StartKVServer() *KVServer {
kv := new(KVServer)
// You may need initialization code here.
kv.kvStorage = make(map[string]string)
kv.opReply = make(map[int64]string)
kv.opComplete = make(map[int64]int)
return kv
}
client.go
package kvsrv
import (
"crypto/rand"
"math/big"
"6.5840/labrpc"
)
type Clerk struct {
server *labrpc.ClientEnd
// You will have to modify this struct.
identifier []int64 //每个请求的标识符集合
}
func nrand() int64 {
max := big.NewInt(int64(1) << 62)
bigx, _ := rand.Int(rand.Reader, max)
x := bigx.Int64()
return x
}
func MakeClerk(server *labrpc.ClientEnd) *Clerk {
ck := new(Clerk)
ck.server = server
// You'll have to add code here.
ck.identifier = make([]int64, 0)
return ck
}
// fetch the current value for a key.
// returns "" if the key does not exist.
// keeps trying forever in the face of all other errors.
//
// you can send an RPC with code like this:
// ok := ck.server.Call("KVServer.Get", &args, &reply)
//
// the types of args and reply (including whether they are pointers)
// must match the declared types of the RPC handler function's
// arguments. and reply must be passed as a pointer.
func (ck *Clerk) Get(key string) string {
// You will have to modify this function.
id := nrand()
args := GetArgs{
Key: key,
Identifier: id,
}
reply := GetReply{}
ok := ck.server.Call("KVServer.Get", &args, &reply)
for ok == false {
// fmt.Println("Get操作失败,自动重试")
ok = ck.server.Call("KVServer.Get", &args, &reply)
}
argsTaskComplete := TaskCompleteArgs{
Identifier: id,
}
replyTaskComplete := TaskCompleteReply{}
ok = ck.server.Call("KVServer.TaskComplete_Handeler", &argsTaskComplete, &replyTaskComplete)
for ok == false {
// fmt.Println("Server没有收到Get操作完成的通知")
ok = ck.server.Call("KVServer.TaskComplete_Handeler", &argsTaskComplete, &replyTaskComplete)
}
return reply.Value
}
// shared by Put and Append.
//
// you can send an RPC with code like this:
// ok := ck.server.Call("KVServer."+op, &args, &reply)
//
// the types of args and reply (including whether they are pointers)
// must match the declared types of the RPC handler function's
// arguments. and reply must be passed as a pointer.
func (ck *Clerk) PutAppend(key string, value string, op string) string {
// You will have to modify this function.
id := nrand()
args := PutAppendArgs{
Key: key,
Value: value,
Identifier: id,
}
reply := PutAppendReply{}
ret := ""
ok := ck.server.Call("KVServer."+op, &args, &reply)
for ok == false {
// fmt.Printf("%v操作失败\n", op)
ok = ck.server.Call("KVServer."+op, &args, &reply)
}
ret = reply.Value
argsTaskComplete := TaskCompleteArgs{
Identifier: id,
}
replyTaskComplete := TaskCompleteReply{}
ok = ck.server.Call("KVServer.TaskComplete_Handeler", &argsTaskComplete, &replyTaskComplete)
for ok == false {
// fmt.Println("Server没有收到PutAppend操作完成的通知")
ok = ck.server.Call("KVServer.TaskComplete_Handeler", &argsTaskComplete, &replyTaskComplete)
}
return ret
}
func (ck *Clerk) Put(key string, value string) {
ck.PutAppend(key, value, "Put")
}
// Append value to key's value and return that value
func (ck *Clerk) Append(key string, value string) string {
return ck.PutAppend(key, value, "Append")
}