6.584-Lab1:MapReduce

前置知识/概念

Raft

是一个基于“Leader”的协议,能够保证分布式网路的一致性。

RPC(Remote Producer Call)

参考链接1
参考链接2

Golang中regexp正则表达式的用法

https://gukaifeng.cn/posts/golang-zheng-ze-biao-da-shi-regexp-de-ji-ben-yong-fa/index.html

Golang中自定义类型Sort

在提供的排序方法sort.Ints、sort.Floats、sort.Strings的底层都分别实现了三个函数Len()Less()Swap(),所以在我们实现自定义类型排序的时候要实现上述三个函数。

实现

参考链接

概览

在这里插入图片描述
如图,Lab1要实现两个部分分别是Map(映射)和Reduce(规约),Master负责调度,分配任务在代码中是Coordinator,而Worker则负责具体的map task和reduce task。
Map Task具体是统计文件中的单词生成对应的键值对[key, value],通过一个哈希函数Ihash将单词映射为key,存在中间文件,例如File1中有单词a、b对应键值对为[a, 1],[b, 1],分别存放在mr-out-1-ihash(a/b)%NReduce的中间文件中,其中NReduce是Reduce Task的个数,代码中为10。
Reduce Task具体是将中间文件mr-out-*-taskid中的键值对放入最终的文件mr-out-taskid中。

代码

rpc.go

Coordinator和Worker通过RPC进行通信,在文件rpc.go中需要设计相应的数据结构让其进行通信。分析可能的具体行为:Worker需要向Coordinator申请任务、Coordinator需要向Worker分配任务(map task & reduce task)、Worker向Coordinator回复任务的执行情况(成功、失败)、没有闲置任务分配时Coordinator告诉Worker等待(Wait)、所有任务完成告诉Worker结束(Shutdown)。
根据上述分析设计如下数据类型:

// 用不同数字表示不同信息的类别
type MsgType int

const (
	AskForTask    MsgType = iota // 表示worker向coordinator申请任务
	MapSucceed                   // 表示worker向coordinator传递Map Task完成
	MapFailed                    // 表示worker向coordinator传递Map Task失败
	ReduceSucceed                // 表示worker向coordinator传递Reduce Task完成
	ReduceFailed                 // 表示worker向coordinator传递Reduce Task失败
	MapAlloc                     // 表示coordinator向worker分配Map Task
	ReduceAlloc                  // 表示coordinator向worker分配Reduce Task
	Wait                         // 表示coordinator让worker休眠
	Shutdown                     // 表示coordinator让worker终止
)

从Worker视角出发,设计发送给Coordinator的信息结构体MsgSend,需要包含任务id以及任务执行情况:

type MsgSend struct {
	MsgType MsgType
	TaskId  int
}

从Coordinator视角出发,设计发送给Worker的信息的结构体MsgSend,需要包含任务的id即要处理的第几个文件用于中间文件的命名、任务类型、要处理的filename、NRduce:

type MsgReply struct {
	MsgType  MsgType
	NReduce  int
	TaskId   int    // 当worker发送MsgSend申请任务、coordinator回复任务的ID
	TaskName string //
}

worker.go

上面分析可知Worker的行为有:申请任务、执行任务、汇报任务执行情况。

// worker的任务就是不断请求任务、执行任务、报告执行状态
// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	// Your worker implementation here.
	for {
		// 不断请求
		replyMsg := CallForTask()
		switch replyMsg.MsgType {
		case MapAlloc: // coordinator分配了map task
			err := HandleMapTask(replyMsg, mapf)
			if err != nil { // Map Task任务完成
				_ = CallForReportStatus(MapFailed, replyMsg.TaskId)
			} else { // Map Task 任务失败
				_ = CallForReportStatus(MapSucceed, replyMsg.TaskId)
			}
		case ReduceAlloc:
			err := HandleReduceTask(replyMsg, reducef)
			if err != nil { // Map Task任务完成
				_ = CallForReportStatus(ReduceFailed, replyMsg.TaskId)
			} else { // Map Task 任务失败
				_ = CallForReportStatus(ReduceSucceed, replyMsg.TaskId)
			}
		case Wait:
			time.Sleep(time.Second * 10)
		case Shutdown:
			os.Exit(0)
		}
		time.Sleep(time.Second)
	}
}

其中申请任务、回复任务执行情况的函数均利用了rpc中的Call来实现向Coordinator进行通信。
在这里插入图片描述
在这里插入图片描述

其中执行Map Task 的执行函数HandleMapTask

func HandleMapTask(reply *MessageReply, mapf func(string, string) []KeyValue) error {
	file, err := os.Open(reply.TaskName)
	if err != nil {
		return err
	}
	defer file.Close()

	content, err := io.ReadAll(file)
	if err != nil {
		return err
	}

	kva := mapf(reply.TaskName, string(content))
	sort.Sort(ByKey(kva))

	tempFiles := make([]*os.File, reply.NReduce)
	encoders := make([]*json.Encoder, reply.NReduce)

	for _, kv := range kva {
		redId := ihash(kv.Key) % reply.NReduce
		if encoders[redId] == nil {
			tempFile, err := ioutil.TempFile("", fmt.Sprintf("mr-map-tmp-%d", redId))
			if err != nil {
				return err
			}
			defer tempFile.Close()
			tempFiles[redId] = tempFile
			encoders[redId] = json.NewEncoder(tempFile)
		}
		err := encoders[redId].Encode(&kv)
		if err != nil {
			return err
		}
	}

	for i, file := range tempFiles {
		if file != nil {
			fileName := file.Name()
			file.Close()
			newName := fmt.Sprintf("mr-out-%d-%d", reply.TaskID, i)
			if err := os.Rename(fileName, newName); err != nil {
				return err
			}
		}
	}
	return nil
}

在这里插入图片描述
由函数TemFile的描述可知,该函数在创建时会按照pattern+随机字符串作为临时文件名字,即是不同程序同时调用该函数会创建不同的临时文件所以不存在资源竞争是并发安全的。
先将Map Task映射的键值对存入临时文件中,等全部放入临时文件后再将临时文件重命名为需要的中间文件的名字,因为重命名操作时原子性的。
如果不采用临时文件直接存入目标中间文件的话,会出现存入中间文件之前中间文件中含有其他数据即脏数据,可能是上个Worker执行到一半因为某些原因而退出之前存入的,所以存入之前需要将中间文件清空一下,这样就比较浪费时间。

执行Reduce Task 的执行函数HandleReduceTask也是同样的问题,虽然从中间件读取的时候没有写入操作但写入最终文件时也同样需要像上面一样保证原子性,这里贴出没有使用临时文件的代码:

// 处理分配的 Reduce 任务,处理每个MapTask产生的mr-out-*-key_id
func HandleReduceTask(reply *MsgReply, reducef func(string, []string) string) error {
	key_id := reply.TaskId
	// todo:这里的key_id要 % NReduce吗 --答:传入的TaskId一定是小于NReduce的,规约的数量就是NRduce也即Reduce Task的数量
	files, err := ReadSpecificFile(key_id, "./")
	if err != nil {
		return err
	}
	// 从所有匹配的文件中读出Json格式的键值对[k1-value]/[k2-value],key哈希的值可以不一样但这里ihash(k1) % NReduce  = ihash(k2) % NReduce
	k_vs := map[string][]string{}
	for _, file := range files {
		dec := json.NewDecoder(file)
		for { // 循环读JSON数据流
			var kv KeyValue // 将读出的JSON数据解码放入kv
			if err := dec.Decode(&kv); err != nil {
				break
			}
			k_vs[kv.Key] = append(k_vs[kv.Key], kv.Value)
		}
		file.Close()
	}

	keys := []string{} // 将map中的keys拿出排序,按照keys的字典序依次写入文件
	for k, _ := range k_vs {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	oname := "mr-out-" + strconv.Itoa(reply.TaskId) // 将Reduce后的结果放入 mr-out-TaskId
	ofile, err := os.Create(oname)
	if err != nil {
		return err
	}
	defer ofile.Close()
	for _, key := range keys {
		output := reducef(key, k_vs[key])
		_, err := fmt.Fprintf(ofile, "%s %s\n", key, output) // 格式化写入文件
		if err != nil {
			return err
		}
	}

	CleanFileByReduceId(reply.TaskId, "./")
	return nil
}

利用临时文件保证原子性参考上面HandleMapTask函数的实现。其中需要注意的是最终写入文件的时候Key要按照字典升序排列,而map存储的Key不是有序的,所以把Key拿出来排序再放入文件。

coordinator.go

coordinator中要实现worker申请任务的函数AskForTask以及对worker报告任务执行情况后对相应任务状态的更新函数NoticeResult
任务的状态有闲置idle、成功finished、失败failed、超时、正在运行running。每次worker申请任务时都轮询一下所有任务,委派闲置、失败、超时的任务,而超时状态的判断则通过为每个任务打上运行开始的时间戳,若轮询到running的任务时判断当前时间与开始的时间戳比较若大于10s则可以判定为超时,可以再次委派给worker。

TaskInfo结构
type TaskStatue int

// 定义类别
const (
	idle     TaskStatue = iota // 闲置
	finished                   // 完成
	running                    // 运行
	failed                     // 失败
)

type MapTaskInfo struct {
	statue    TaskStatue // 任务状态
	TaskId    int        // 任务编号
	startTime int64      // 分配时间,当前时间-分配时间>10s表示超时
}
type ReduceTaskInfo struct {
	statue TaskStatue
	//TaskId reduce task的编号用数组下标表示
	startTime int64
}
type Coordinator struct {
	NReduce     int // reduce tasks可用的数量
	MapTasks    map[string]*MapTaskInfo
	mu          sync.Mutex // 互斥锁
	ReduceTasks []*ReduceTaskInfo
}

初始化函数:

// 初始化函数
func (c *Coordinator) Init(files []string) {
	for idx, filename := range files {
		c.MapTasks[filename] = &MapTaskInfo{
			statue: idle, // 初始为闲置
			TaskId: idx,
		}
	}
	for idx := 0; idx < c.NReduce; idx++ {
		c.ReduceTasks[idx] = &ReduceTaskInfo{
			statue: idle,
		}
	}
}

// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{
		NReduce:     nReduce,
		MapTasks:    make(map[string]*MapTaskInfo),
		ReduceTasks: make([]*ReduceTaskInfo, nReduce),
	}
	c.Init(files)

	c.server()
	return &c
}
rpc相应函数AskForTask的实现

AskForTask:实现相对复杂,大致流程为:
1.每个任务初始化为闲置。
2.每当有worker申请任务的话就轮询所有任务。
3.若存在有idle、failed、超时的任务就可以分配。
4.若没有可分配的任务,判断完成任务的个数是否等于所有任务个数:
4.1 若相等,则表明所有任务完成,告知workershutdown
4.2 若不相等,则表明有任务还在进行且没有可分配的任务,告知workerWait

期间应保证共享资源的互斥,每个worker请求任务的同时要上锁。

func (c *Coordinator) AskForTask(req *MsgSend, reply *MsgReply) error {
	if req.MsgType != AskForTask { // 传入的不是“申请任务”类型的信息
		return NoMathMsgType
	}

	// 加锁,保证每个worker申请任务时互斥
	c.mu.Lock()
	defer c.mu.Unlock()
	// 选择一个失败or闲置or超时的任务分配给worker
	MapSuccessNum := 0 // Map task 完成个数
	for filename, maptaskinfo := range c.MapTasks {
		alloc := false
		if maptaskinfo.statue == idle || maptaskinfo.statue == failed { // 该任务闲置或失败则可以分配
			alloc = true
		} else if maptaskinfo.statue == running { // 判断该任务是否超时,若超时则再分配
			if time.Now().Unix()-maptaskinfo.startTime > 10 {
				maptaskinfo.startTime = time.Now().Unix() // 再分配更新开始时间
				alloc = true
			}
		} else { // 该任务是已完成任务
			MapSuccessNum++
		}

		// 当前任务可以分配
		if alloc {
			reply.TaskId = maptaskinfo.TaskId
			reply.TaskName = filename
			reply.NReduce = c.NReduce
			reply.MsgType = MapAlloc

			maptaskinfo.statue = running
			maptaskinfo.startTime = time.Now().Unix()
			return nil
		}
	}

	// 没有任务可以分配但所有任务没有完成
	if MapSuccessNum < len(c.MapTasks) {
		reply.MsgType = Wait
		return nil
	}

	// 运行到这里表明所有的Map任务都已经完成
	ReduceSuccessNum := 0
	for idex, reducetaskinfo := range c.ReduceTasks {
		alloc := false
		if reducetaskinfo.statue == idle || reducetaskinfo.statue == failed {
			alloc = true
		} else if reducetaskinfo.statue == running {
			if time.Now().Unix()-reducetaskinfo.startTime > 10 {
				reducetaskinfo.startTime = time.Now().Unix()
				alloc = true
			}
		} else {
			ReduceSuccessNum++
		}

		if alloc {
			reply.TaskId = idex
			reply.NReduce = c.NReduce
			reply.MsgType = ReduceAlloc

			reducetaskinfo.statue = running
			reducetaskinfo.startTime = time.Now().Unix()
			return nil
		}
	}
	if ReduceSuccessNum < len(c.ReduceTasks) {
		reply.MsgType = Wait
		return nil
	}

	// 运行到这里表明所有的任务都已完成
	reply.MsgType = Shutdown

	return nil
}
rpc相应函数NoticeResult的实现

只需要将worker传递过来的任务完成状态更新到coordinator的TaskInfo即可。

func (c *Coordinator) NoticeResult(req *MsgSend, reply *MsgReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if req.MsgType == MapSucceed {
		for _, taskinfo := range c.MapTasks {
			if taskinfo.TaskId == req.TaskId {
				taskinfo.statue = finished
			}
		}
	} else if req.MsgType == ReduceSucceed {
		c.ReduceTasks[req.TaskId].statue = finished
	} else if req.MsgType == MapFailed {
		for _, taskinfo := range c.MapTasks {
			if taskinfo.TaskId == req.TaskId {
				taskinfo.statue = failed
			}
		}
	} else if req.MsgType == ReduceFailed {
		c.ReduceTasks[req.TaskId].statue = failed
	}
	return nil
}
所有任务是否完成-Done函数

只需要轮询一遍任务数组,判断是否所有任务均完成。

// if the entire job has finished.
func (c *Coordinator) Done() bool {

	// Your code here.
	// 遍历所有任务,全部完成则返回true,否则返回false
	for _, taskinfo := range c.MapTasks {
		if taskinfo.statue != finished {
			return false
		}
	}

	for _, taskinfo := range c.ReduceTasks {
		if taskinfo.statue != finished {
			return false
		}
	}
	return true
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/914957.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Docker在微服务架构中的最佳实践

&#x1f493; 博客主页&#xff1a;瑕疵的CSDN主页 &#x1f4dd; Gitee主页&#xff1a;瑕疵的gitee主页 ⏩ 文章专栏&#xff1a;《热点资讯》 Docker在微服务架构中的最佳实践 Docker在微服务架构中的最佳实践 Docker在微服务架构中的最佳实践 引言 Docker 概述 定义与原理…

大数据新视界 -- 大数据大厂之 Impala 性能优化:基于数据特征的存储格式选择(上)(19/30)

&#x1f496;&#x1f496;&#x1f496;亲爱的朋友们&#xff0c;热烈欢迎你们来到 青云交的博客&#xff01;能与你们在此邂逅&#xff0c;我满心欢喜&#xff0c;深感无比荣幸。在这个瞬息万变的时代&#xff0c;我们每个人都在苦苦追寻一处能让心灵安然栖息的港湾。而 我的…

【C++】用红黑树封装set和map

在C标准库中&#xff0c;set容器和map容器的底层都是红黑树&#xff0c;它们的各种接口都是基于红黑树来实现的&#xff0c;我们在这篇文章中已经模拟实现了红黑树 ->【C】红黑树&#xff0c;接下来我们在此红黑树的基础上来看看如何封装set和map。 一、共用一颗红黑树 我…

Leetcode3345. 最小可整除数位乘积 I

Every day a Leetcode 题目来源&#xff1a;3345. 最小可整除数位乘积 I 解法1&#xff1a;枚举 至多循环 10 次&#xff0c;一定会遇到个位数为 0 的数字&#xff0c;数位乘积是 0&#xff0c;一定是 t 的倍数。 所以暴力枚举即可。 代码&#xff1a; /** lc appleetcod…

通过scrapy和Django登录、爬取和持久化数据

使用 Scrapy 和 Django 实现登录、爬取和持久化数据的完整流程&#xff0c;可以通过以下步骤完成&#xff1a; 创建 Django 项目和数据库模型&#xff1a;定义一个存储爬取数据的数据库模型。创建 Scrapy 项目&#xff1a;实现登录并抓取目标页面的数据。整合 Scrapy 和 Djang…

SpringMVC全面复习

Javaweb SpringMVC Spring MVC是Spring框架的一个模块&#xff0c;专门用于构建Web应用程序的模型-视图-控制器&#xff08;MVC&#xff09;架构。它通过清晰的分离关注点&#xff0c;简化了Web应用各部分的开发。Spring MVC提供了强大的绑定机制&#xff0c;能够将请求参数绑定…

【再谈设计模式】抽象工厂模式~对象创建的统筹者

一、引言 在软件开发的世界里&#xff0c;高效、灵活且易于维护的代码结构是每个开发者追求的目标。设计模式就像是建筑蓝图中的经典方案&#xff0c;为我们提供了应对各种常见问题的有效策略。其中&#xff0c;抽象工厂模式在对象创建方面扮演着重要的角色&#xff0c;它如同一…

【Linux】ELF可执行程序和动态库加载

&#x1f525; 个人主页&#xff1a;大耳朵土土垚 &#x1f525; 所属专栏&#xff1a;Linux系统编程 这里将会不定期更新有关Linux的内容&#xff0c;欢迎大家点赞&#xff0c;收藏&#xff0c;评论&#x1f973;&#x1f973;&#x1f389;&#x1f389;&#x1f389; 文章目…

SpringBootCloud 服务注册中心Nacos对服务进行管理

介绍 Nacos&#xff08;Naming and Configuration Service&#xff09;是一个开源的、动态的服务发现、配置管理和服务管理平台&#xff0c;特别适用于云原生应用和微服务架构。它可以作为服务注册中心&#xff0c;用于微服务的注册、发现、配置管理等。在微服务架构中&#x…

八款局域网监控软件优选|2024最新排行榜(企业老板收藏篇)

在当今数字化办公的时代&#xff0c;企业和组织对于局域网电脑监控的需求日益增长。无论是为了保障信息安全、提高员工工作效率&#xff0c;还是为了规范网络行为&#xff0c;一款优秀的局域网电脑监控软件都能发挥重要作用。市面上的监控软件种类繁多&#xff0c;功能各异&…

限价订单簿中的高频交易

数量技术宅团队在CSDN学院推出了量化投资系列课程 欢迎有兴趣系统学习量化投资的同学&#xff0c;点击下方链接报名&#xff1a; 量化投资速成营&#xff08;入门课程&#xff09; Python股票量化投资 Python期货量化投资 Python数字货币量化投资 C语言CTP期货交易系统开…

丹摩征文活动|CogVideoX-2b:从0到1,轻松完成安装与部署!

丹摩征文活动 | CogVideoX-2b&#xff1a;从0到1&#xff0c;轻松完成安装与部署&#xff01; CogVideoX 介绍 CogVideoX的问世&#xff0c;标志着视频制作技术迈入了一个全新的时代。它不仅打破了传统视频制作在效率与质量之间的平衡难题&#xff0c;还通过其先进的3D变分自…

Creo 9.0 中文版软件下载安装教程

[软件名称]&#xff1a;Creo 9.0 [软件语言]&#xff1a;简体中文 [软件大小]&#xff1a;5.2G [安装环境]&#xff1a;Win11/Win10/ [硬件要求]&#xff1a;内存8G及以上 下载方法&#xff1a;电脑打开浏览器&#xff0c;复制下载链接&#xff0c;粘贴至浏览器网址栏&…

RT-DETR融合CVPR[2024]无膨胀多尺度卷积PKI模块及相关改进思路

RT-DETR使用教程&#xff1a; RT-DETR使用教程 RT-DETR改进汇总贴&#xff1a;RT-DETR更新汇总贴 《Poly Kernel Inception Network for Remote Sensing Detection》 一、 模块介绍 论文链接&#xff1a;https://arxiv.org/abs/2403.06258 代码链接&#xff1a;https://github…

ubuntu-desktop-24.04上手指南(更新阿里源、安装ssh、安装chrome、设置固定IP、安装搜狗输入法)

ubuntu-desktop-24.04上手指南(更新阿里源、安装ssh、安装chrome、设置固定IP、安装搜狗输入法) 一、更新并安装基础软件 #切换root用户 sudo su -#更新 apt update #升级 apt upgrade#install vim apt install vim#install net-tools apt install net-tools二、安装ssh并设置…

[CKS] K8S ServiceAccount Set Up

最近准备花一周的时间准备CKS考试&#xff0c;在准备考试中发现有一个题目关于Rolebinding的题目。 ​ 专栏其他文章: [CKS] Create/Read/Mount a Secret in K8S-CSDN博客[CKS] Audit Log Policy-CSDN博客 -[CKS] 利用falco进行容器日志捕捉和安全监控-CSDN博客[CKS] K8S Netwo…

介绍和安装及数据类型

1、介绍和安装 1.1、简介 ClickHouse是俄罗斯的Yandex于2016年开源的列式存储数据库&#xff08;DBMS&#xff09;&#xff0c;使用C语言编写&#xff0c;主要用于在线分析处理查询&#xff08;OLAP&#xff09;&#xff0c;能够使用SQL查询实时生成分析数据报告。 OLAP&…

算法魅力-二分查找实战

目录 前言 算法定义 朴素二分模版 二分查找 二分的边界查找 在排序数组中查找元素的第一个和最后一个位置&#xff08;medium&#xff09; 暴力算法 二分查找 边界查找分析 山峰数组的峰顶 暴力枚举 二分查找 搜索旋转排序数组中的最小值&#xff08;medium&#xf…

Linux第四讲:Git gdb

Linux第四讲&#xff1a;Git && gdb 1.版本控制器Git1.1理解版本控制1.2理解协作开发1.3Git的历史1.4Git的操作1.4.1仓库创建解释、仓库克隆操作1.4.2本地文件操作三板斧1.4.3文件推送详细问题 2.调试器 -- gdb/cgdb使用2.1调试的本质是什么2.2watch命令2.3set var命令…

海底捞点单

单点锅底推荐&#xff1a; 番茄锅底通31 牛油麻辣通44 清汤麻辣备44 菌汤锅底通31 小吃&主食&#xff1a; 捞派捞面一黄金小馒头一茴香小油条 红糖枇杷一小酥肉 DIY锅底推荐&#xff1a; 1.寿喜锅&#xff1a;海鲜味酱4勺陈醋1勺蚝油2勺盐适量白糖7勺 芹菜1勺 2.麻辣锅底…