MIT6.5840-2023-Lab1: MapReduce

前置知识

MapReduce:Master 将一个 Map 任务或 Reduce 任务分配给一个空闲的 worker。
Map阶段:被分配了 map 任务的 worker 程序读取相关的输入数据片段,生成并输出中间 k/v 对,并缓存在内存中。
Reduce阶段:所有 map 任务结束,reduce 程序使用 RPC 从 map worker 所在主机的磁盘上读取缓存数据,通过对 key 进行排序后使得具有相同 key 值的数据聚合在一起,reduce 进行操作后输出为文件。
image.png

实验内容

实现一个分布式 MapReduce,由两个程序(coordinator 和 worker)组成。只有一个 coordinator 和一个或多个并行执行的 worker 。在真实系统中, worker 会运行在多台不同的机器上,但在本 lab 中将在一台机器上运行所有 worker 。 worker 将通过 RPC 与 coordinator 通话。每个 worker 进程都会向 coordinator 请求 task,从一个或多个文件中读取 task 输入,执行 task,并将 task 输出写入一个或多个文件。 coordinator 应该注意到,如果某个 worker 在合理的时间内(本 lab 使用 10 秒)没有完成任务,就会将相同的 task 交给另一个 worker。
rpc举例:https://pdos.csail.mit.edu/6.824/notes/kv.go
lab内容:https://pdos.csail.mit.edu/6.824/labs/lab-mr.html
Impl:mr/coordinator.go、mr/worker.go、mr/rpc.go
总结一下
对于 Coordiantor:

  • map 任务初始化;
  • rpc handler:回应worker分配任务请求、回应worker任务完成通知;
  • 自身状态控制,处理 map/reduce 阶段,还是已经全部完成;
  • 任务超时重新分配;

对于 Worker:

  • 给 coordinator 发送 rpc 请求分配任务;
  • 给 coordinator 发送 rpc 通知任务完成;
  • 自身状态控制,准确来说是 coordinator 不需要 worker 工作时,通知 worker 结束运行;

具体code见:https://github.com/BeGifted/MIT6.5840-2023

实验环境

OS:WSL-Ubuntu-18.04
golang:go1.17.6 linux/amd64

概要设计

所设计的Coordinator、Task、rpc消息格式:

type WorkerArgs struct {
	WorkerId    int
	WorkerState int // init\done\fail
	Task        *Task
}

type WorkerReply struct {
	WorkerId    int
	WorkerState int // init\done\fail
	Task        *Task
}

type Coordinator struct {
	// Your definitions here.
	MapTaskChan    chan *Task
	ReduceTaskChan chan *Task
	NumReduce      int // reduce num
	NumMap         int // map num
	NumDoneReduce  int // reduce done num
	NumDoneMap     int // map done num
	State          int // map\reduce\done
	mu             sync.Mutex
	Timeout        time.Duration
	MapTasks       map[int]*Task
	ReduceTasks    map[int]*Task
}

type Task struct {
	TaskId    int
	TaskType  int // map\reduce
	TaskState int // int\run\done
	NReduce   int // nReduce
	StartTime time.Time

	Input []string
}

const (
	StateMap    = 0
	StateReduce = 1
	StateDone   = 2
)

const (
	TaskStateInit = 0
	TaskStateRun  = 1
	TaskStateDone = 2
)

const (
	TaskTypeMap    = 0
	TaskTypeReduce = 1
)

const (
	WorkerStateInit = 0
	WorkerStateDone = 1
	WorkerStateFail = 2
)

(TODO)WorkerState 出现 fail 的原因主要是在文件无法打开或读取上,如果是在处理 map 任务时出现 fail,那只有可能是原文件丢失了;如果是 reduce 任务时出现 fail,表示中间文件丢失,需要运行某个特定的 map 任务重新生成,然后再重新开始该 reduce 任务。当然,不实现这个也不会影响 test。

主要流程

创建 Coordinator

创建 Coordinator 并且初始化,将需要处理的数据片段放入 MapTaskChan 信道。这里将单个文件视作一个数据片段进行处理,也就是说有 len(files) 个 map 任务。

func MakeCoordinator(files []string, nReduce int) *Coordinator {
    c := Coordinator{}

    // Your code here.
    c.NumMap = len(files)
    c.NumReduce = nReduce
    c.MapTaskChan = make(chan *Task, len(files))
    c.ReduceTaskChan = make(chan *Task, nReduce)
    c.MapTasks = make(map[int]*Task)
    c.ReduceTasks = make(map[int]*Task)
    c.NumDoneMap = 0
    c.NumDoneReduce = 0
    c.State = StateMap
    c.Timeout = time.Duration(time.Second * 10)
    for i, file := range files {
        input := []string{file}
        task := Task{
            TaskId:    i,
            TaskType:  TaskTypeMap,
            TaskState: TaskStateInit,
            Input:     input,
            NReduce:   nReduce,
            StartTime: time.Now(),
        }
        c.MapTaskChan <- &task
        c.MapTasks[i] = &task
    }

    c.server()
    return &c
}

运行 Worker

Worker 主要处理两类任务:map 和 reduce。这两类任务通过 rpc 与 Coordinator 通信获取。
map 任务处理:

if task.TaskType == TaskTypeMap {
    filename := task.Input[0]
    intermediate := []KeyValue{}
    file, err := os.Open(filename)
    if err != nil {
        log.Fatalf("cannot open %v", filename)
        continue
    }
    content, err := ioutil.ReadAll(file)
    if err != nil {
        log.Fatalf("cannot read %v", filename)
        continue
    }
    file.Close()
    // log.Println("mapf")
    // log.Println(task.TaskId)
    kva := mapf(filename, string(content))
    intermediate = append(intermediate, kva...)

    // sort.Sort(ByKey(intermediate))

    ReduceSplit := make(map[int][]KeyValue)
    for _, kv := range intermediate {
        ReduceSplit[ihash(kv.Key)%task.NReduce] = append(ReduceSplit[ihash(kv.Key)%task.NReduce], kv)
    }

    for i := 0; i < task.NReduce; i++ {
        oname := fmt.Sprintf("mr-%d-%d.tmp", task.TaskId, i)
        ofile, _ := os.Create(oname)
        enc := json.NewEncoder(ofile)
        for _, kv := range ReduceSplit[i] {
            err := enc.Encode(&kv)
            if err != nil {
                log.Fatalf("cannot encode %v", kv)
                break
            }
        }
        ofile.Close()
    }

    // Task Done
    args.Task = task
    TaskDone(&args)
}

reduce 任务处理:

if task.TaskType == TaskTypeReduce {
    var kva ByKey
    for _, filename := range task.Input {
        file, err := os.Open(filename)
        if err != nil {
            log.Fatalf("cannot open %v", filename)
            file.Close()
            continue
        }

        dec := json.NewDecoder(file)
        for {
            var kv KeyValue
            if err := dec.Decode(&kv); err != nil {
                break
            }
            kva = append(kva, kv)
        }
        file.Close()
    }

    sort.Sort(kva)

    i := 0
    oname := fmt.Sprintf("mr-out-%d", task.TaskId)
    ofile, _ := os.Create(oname)
    for i < len(kva) {
        j := i + 1
        for j < len(kva) && kva[j].Key == kva[i].Key {
            j++
        }
        values := []string{}
        for k := i; k < j; k++ {
            values = append(values, kva[k].Value)
        }
        output := reducef(kva[i].Key, values)

        fmt.Fprintf(ofile, "%v %v\n", kva[i].Key, output)
        i = j
    }

    // Task Done
    args.Task = task
    TaskDone(&args)
}

WorkerHandler

给 worker 分配 map/reduce 任务,取决于阶段任务是否全部完成,当阶段任务全部完成,coordinator 的状态也需要更新。这个过程全局加锁。
处理 map 阶段:

if c.State == StateMap {
    select {
    case reply.Task = <-c.MapTaskChan:
        reply.Task.StartTime = time.Now()
        reply.Task.TaskState = TaskStateRun
    default:
        for _, mapTask := range c.MapTasks {
            if mapTask.TaskState == TaskStateRun && time.Since(mapTask.StartTime) > c.Timeout {
                mapTask.StartTime = time.Now()
                reply.Task = mapTask
                return nil
            }
        }
    }
}

处理 reduce 阶段:

if c.State == StateReduce {
    select {
    case reply.Task = <-c.ReduceTaskChan:
        reply.Task.StartTime = time.Now()
        reply.Task.TaskState = TaskStateRun
    default:
        for _, reduceTask := range c.ReduceTasks {
            if reduceTask.TaskState == TaskStateRun && time.Since(reduceTask.StartTime) > c.Timeout {
                reduceTask.StartTime = time.Now()
                reply.Task = reduceTask
                return nil
            }
        }
    }
}

需要注意的是,除了这两个阶段外还有 StateDone 阶段,即 reduce 任务都执行完毕了,coordinator 还没完全回收,此时 worker 还在请求分配任务,这时候就应该通知 worker 停止。

DoneHandler

coordinator 处理任务完成的通知。全程加锁。在这里更新 task/coordinator 状态。

func (c *Coordinator) DoneHandler(args *WorkerArgs, reply *WorkerReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	task := args.Task
	if task.TaskType == TaskTypeMap {
		if task.TaskState == TaskStateRun {
			task.TaskState = TaskStateDone
			c.MapTasks[task.TaskId].TaskState = TaskStateDone
			c.NumDoneMap++
		}
	} else if task.TaskType == TaskTypeReduce {
		if task.TaskState == TaskStateRun {
			task.TaskState = TaskStateDone
			c.ReduceTasks[task.TaskId].TaskState = TaskStateDone
			c.NumDoneReduce++
		}
	}

	if c.State == StateMap {
		if c.NumDoneMap == c.NumMap {
			c.State = StateReduce
			for i := 0; i < c.NumReduce; i++ {
				input := []string{}
				for j := 0; j < c.NumMap; j++ {
					input = append(input, fmt.Sprintf("mr-%d-%d.tmp", j, i))
				}
				task := Task{
					TaskId:    i,
					TaskType:  TaskTypeReduce,
					TaskState: TaskStateInit,
					NReduce:   c.NumReduce,
					StartTime: time.Now(),
					Input:     input,
				}
				c.ReduceTaskChan <- &task
				c.ReduceTasks[i] = &task
			}
		}
	} else if c.State == StateReduce {
		if c.NumDoneReduce == c.NumReduce {
			c.State = StateDone
		}
	}
	return nil
}

实验结果

bash test-mr-many.sh 10

image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/224534.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Linux socket编程(12):Unix套接字之socketpair、sendmsg和recvmsg详解

在上一篇文章Unix套接字编程及通信例子中&#xff0c;我们对Unix套接字编程有一个基本的了解。但在Unix套接字编程的领域中&#xff0c;有一组特殊而强大的工具&#xff1a;socketpair、sendmsg 和 recvmsg&#xff0c;它们为实现本地进程间通信提供了便捷的方式。 文章目录 1 …

python二维数组创建赋值问题:更改单个值却更改了所有项的值

test_list [] dic1 {} test_list [dic1 for _ in range(3)] ll [1, 2, 3]for i in range(3):test_list[i][value] ll[i]print(test_list)运行结果&#xff1a;每次赋值都更改了所有项 原因&#xff1a;python的二位数据创建方式就是这样&#xff0c;官方文档中有描述Wha…

大话数据结构-查找-线性索引查找

注&#xff1a;本文同步发布于稀土掘金。 4 线性索引查找 4.1 概述 索引就是把一个关键字与它对应的记录相关联的过程&#xff0c;一个索引由若干个索引项构成&#xff0c;每个索引项至少应包含关键字和其对应的记录在存储器中的位置等信息。 索引按照结构可分为线性索引、树…

【SpringBoot】在SpringBoot中配置序列化的Redis

文章目录 前言展示包结构在SpringBoot中配置Redis测试总结 前言 在使用Java操作Redis时&#xff0c;如果不对Redis进行序列化操作&#xff0c;可能会导致存储的key和value与原来的数据不一致的问题 本文也借此机会来详细讲解一下SpringBoot中配置序列化Redis的步骤 展示包结构 …

AI助力智慧农业,基于YOLOv7【tiny/yolov7/yolov7x】开发构建不同参数量级农田场景下庄稼作物、杂草智能检测识别系统

智慧农业随着数字化信息化浪潮的演变有了新的定义&#xff0c;在前面的系列博文中&#xff0c;我们从一些现实世界里面的所见所想所感进行了很多对应的实践&#xff0c;感兴趣的话可以自行移步阅读即可&#xff1a; 《自建数据集&#xff0c;基于YOLOv7开发构建农田场景下杂草…

绘图 Seaborn 10个示例

绘图 Seaborn 是什么安装使用显示中文及负号散点图箱线图小提琴图堆叠柱状图分面绘图分类散点图热力图成对关系图线图直方图 是什么 Seaborn 是一个Python数据可视化库&#xff0c;它基于Matplotlib。Seaborn提供了高级的绘图接口&#xff0c;可以用来绘制各种统计图形&#xf…

nodejs+vue+微信小程序+python+PHP新闻发布系统的设计与实现-计算机毕业设计推荐

根据现实需要&#xff0c;此系统我们设计出一下功能&#xff0c;主要有以下功能模板。 &#xff08;1&#xff09;新闻发布系统前台&#xff1a;首页、时事新闻、公告资讯、个人中心。 &#xff08;2&#xff09;管理员功能&#xff1a;首页、个人中心、用户管理、新闻分类管理…

文本编辑软件:Ulysses mac介绍说明

Ulysses mac是面向 Mac、iPhone 和 iPad 的一站式写作环境。Ulysses 提供令人愉悦、专注的写作体验&#xff0c;加上高效文稿管理、无缝同步以及灵活导出。markdown 可以直接对于文本进行不同类型的分类、编辑&#xff0c;比如标题、注解、评论之类的内容。 Ulysses让注意力专…

rpm安装gitlab

1.rpm包下载 https://mirrors.tuna.tsinghua.edu.cn/gitlab-ce/yum/el7/ 2.进行安装 rpm -ivh gitlab-ce-15.9.7-ce.0.el7.x86_64.rpm --nodeps --force 3.配置访问地址 vim /etc/gitlab/gitlab.rb 4.重新加载配置以及重启服务 gitlab-ctl reconfiguregitlab-ctl resta…

Ubuntur编译ROS报错:error PCL requires C++14 or above

ubuntu20.04 编译ROS包 报错&#xff1a; error&#xff1a; PCL requires C14 or above&#xff1a; 修改Cmakelists.txt文件&#xff1a; set&#xff08;CMAKE_CXX_STANDARD 14&#xff09; 再次编译成功.

2023 IoTDB 用户大会成功举办,深入洞察工业互联网数据价值

2023 年 12 月 3 日&#xff0c;中国通信学会作为指导单位&#xff0c;Apache IoTDB Community、清华大学软件学院、中国通信学会开源技术委员会联合主办&#xff0c;“科创中国”开源产业科技服务团和天谋科技&#xff08;北京&#xff09;有限公司承办的 2023 IoTDB 用户大会…

AI 绘画 | Stable Diffusion 动漫人物真人化

前言 如何让一张动漫人物变成真实系列人物?Stable Diffusion WebUI五步即可实现。快来使用AI绘画打开异世界的大门吧!!! 动漫真人化 首先在图生图里上传一张二次元动漫人物图片,然后选择一个真实系人物画风的大模型,最后点击DeepBooru 反推,自动填充提示词,调整重绘…

【MySQL】:数据库基本认识

数据库基础 一.什么是数据库1.mysql是什么2.为什么要有数据库3.服务器&#xff0c;数据库&#xff0c;表关系4.Mysql架构5.SQL语句分类 二.存储引擎 一.什么是数据库 1.mysql是什么 1.mysql是数据库服务的客户端。 2.mysqld是数据库服务的服务器端。 3.mysql本质&#xff1a;基…

【Python】logging模块函数详解和示例

在Python中&#xff0c;LOGGER通常是指一个用于记录日志的模块或对象。它可以帮助你在程序中跟踪和记录事件&#xff0c;以便于调试、错误跟踪和日志分析。Python的标准库中包含了一个名为logging的模块&#xff0c;它提供了一个灵活且功能强大的日志记录系统。本文对相应的函数…

unity 2d 入门 飞翔小鸟 下坠功能且碰到地面要停止 刚体 胶囊碰撞器 (四)

1、实现对象要受重力 在对应的图层添加刚体 改成持续 2、设置胶囊碰撞器并设置水平方向 3、地面添加盒状碰撞器 运行则能看到小鸟下坠并落到地面上

【南京站-EI会议征稿中】第三届网络安全、人工智能与数字经济国际学术会议(CSAIDE 2024)

第三届网络安全、人工智能与数字经济国际学术会议&#xff08;CSAIDE 2024&#xff09; 2024 3rd International Conference on Cyber Security, Artificial Intelligence and Digital Economy 第三届网络安全、人工智能与数字经济国际学术会议&#xff08;CSAIDE 2024&…

基于高德API实现网络geoJSON功能(突出省份)

代码实现&#xff1a; <script>// 3、初始化一个高德图层const gaode new ol.layer.Tile({title: "高德地图",source: new ol.source.XYZ({url: http://wprd0{1-4}.is.autonavi.com/appmaptile?langzh_cn&size1&style7&x{x}&y{y}&z{z},w…

SpringBoot 启动加载器解析

计时器介绍 启动加载器实战 实现方式1 实现CommandLineRunner接口重写run方法通过Order进行排序 示例: Component Order(1) public class FirstCommandlineRunner implements CommandLineRunner {Overridepublic void run(String... args) throws Exception {System.out.pr…

Windows server 部署iSCSI共享磁盘搭建故障转移群集

在域环境下&#xff0c;在域控制器中配置iSCSI服务&#xff0c;配置共享网络磁盘&#xff0c;在节点服务器使用共享磁盘&#xff0c;并在节点服务器中搭建故障转移群集&#xff0c;实现故障转移 环境准备 准备3台服务器&#xff0c;配置都是8g2核&#xff0c;50g硬盘&#xf…

前端实现检索文本高亮实现

文章目录 一、前言二、实现三、最后 一、前言 使用搜索引擎时的搜索结果高亮&#xff0c;搜索文本在查询出来的结果内高亮显示&#xff0c;这种在全文检索应该很常见 二、实现 看了下百度检索的实现&#xff0c;是给内容加上了em标签&#xff0c;然后给em标签设置颜色&#x…