MapReduce的原理分析

1.概述

MapReduce的思想核心是“分而治之,先分再合”,适用于大量复杂任务处理场景(大规模数据处理场景)。
MapReduce分两个阶段:

  • map阶段(分):如果任何可以拆分并且没有依赖,那么就把复杂的任务拆分成小任务,拆分成小任务之后,可以并行计算,提高处理效率。
  • reduce阶段(合):把map阶段的各个局部结果进行全局汇总,得到最终的结果

生活中的MapReduce案例:统计图书馆的书籍总数

“Map”:你数1号书架,我数2号书架。我们人越多,数书就更快。

“Reduce”:我们到一起,把所有人的统计数加在一起。

2.MapReduce架构

和HDFS一样,MapReduce也是采用Master/Slave的架构,其架构图如下所示 :
在这里插入图片描述

  • 客户端(Client)

    每一个job都会在用户通过Client类将应用程序以及配置参数 Configuration 打包成 JAR 文件存储在HDFS,并把路径提交到 JobTracker 的 master 服务,然后由 master 创建每一个 Task (即MapTask和 ReduceTask) 将他们分发到各个 TaskTracker 服务中去执行。

  • JobTracker

    JobTracker 负责资源监控和作业调度。JobTracker 监控所有TaskTracker 与 job 的健康状况,一旦发现失败,就将相应的任务转移到其他节点;同时,JobTracker 会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器,而调度器会在资源出现空闲时,选择合适的任务使用这些资源。在Hadoop中,任务调度器是一个可插拔的模块,用户可以根据自己的需要设计相应的调度器。

  • TaskTracker

    TaskTracker 会周期性地通过Heartbeat 将本节点上资源的使用情况和任务的运行进度汇报给JobTracker,同时接收JobTracker 发送过来的命令并执行相应的操作(如启动新任务、杀死任务等)。TaskTracker 使用"slot"等量划分本节点上的资源量。"slot"代表计算资源(CPU、内存等)。一个Task 获取到一个slot 后才有机会运行,而Hadoop 调度器的作用就是将各个TaskTracker 上的空闲slot分配给Task 使用。slot分为Map slot 和Reduce slot 两种,分别供Map Task 和Reduce Task 使用。TaskTracker 通过slot 数目(可配置参数)限定Task 的并发度。

  • Task

    Task 分为Map Task 和Reduce Task 两种,均由TaskTracker 启动。HDFS 以固定大小的block 为基本单位存储数据,而对于MapReduce 而言,其处理单位是split。split 是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。但需要注意的是,split 的多少决定了Map Task 的数目,因为每个split 只会交给一个Map Task 处理。Split 和 Block的关系如下图所示 :
    在这里插入图片描述

3.MapReduce运行流程

一个mapreduce作业的执行流程是:作业提交->作业初始化->任务分配->任务执行->更新任务执行进度和状态->作业完成。
在这里插入图片描述

  • 1、提交作业

    • JobClient使用runjob方法创建一个JobClient实例,调用submitJob()方法进行作业的提交 。
  • 2、作业初始化

    • 当JobTracker收到Job提交的请求后,将Job保存在一个内部队列,并让Job Scheduler(作业调度器)处理并初始化。
    • 初始化涉及到创建一个封装了其tasks的job对象,并保持对task的状态和进度的跟踪(步骤5)。
    • 当创建要运行的一系列task对象后,Job Scheduler首先开始从文件系统中获取由JobClient计算的input splits(步骤6),然后再为每个split创建map task。
  • 3、任务分配

    • TaskTracker 和 JobTracker之间的通信和任务分配是通过心跳机制完成的。
    • TaskTracker作为一个单独的 JVM,它执行一个简单的循环,主要实现每隔一段时间向JobTracker发送心跳,告诉JobTracker此TaskTracker是否存活,是否准备执行新的任务。如果有待分配的任务,它就会为TaskTracker分配一个任务。
  • 4、任务的执行

    • TaskTracker申请到新的任务之后,就要在本地运行了。首先将任务本地化(包括运行任务所需的数据、配置信息、代码等),即从HDFS复制到本地,调用localizeJob()完成的。
    • 对于使用Streaming和Pipes创建Map或者Reduce程序的任务,Java会把key/value传递给外部进程,然后通过用户自定义的Map或者Reduce进行处理,然后把key/value传回到java中。其中就好像是TaskTracker的子进程在处理Map和Reduce代码一样。
  • 5、更新任务的执行进度和状态

    • 任务的进度和状态是通过heartbeat(心跳机制)来更新和维护的。
    • 对于Map Task,进度就是已处理数据和所有输入数据的比例。
    • 对于Reduce Task,情况就有点复杂,包括3部分,拷贝中间结果文件、排序、reduce调用,每部分占1/3。
  • 6、作业完成

    • 当 Job 完成后,JobTracker会收一个Job Complete的通知,并将当前的Job状态更新为successful。同时JobClient也会轮询获知提交的Job已经完成,将信息显示给用户。最后,JobTracker会清理和回收该Job的相关资源,并通知TaskTracker进行相同的操作(比如删除中间结果文件)

4.MapReduce处理过程分析

以单词计数为例,分析MR的整个计算过程
在这里插入图片描述

Q1 在MapReduce任务中,需要启动多少个MapTask任务去处理数据?

例如:/wordcount/input 1.txt 200M 2.txt 100M,FileInputFormat会进行逻辑切片。逐个遍历待处理目录针对待处理目录下的文件以及切片大小对文件形成规划。

split size=block size=128M 1个切片对应一个maptask

split-1 1.txt --> 0- 128M

split-2 1.txt --> 128- 200M

split-3 2.txt --> 0- 100M

Q2:数据何时写入缓冲区?

context.write(k2,v2)往外写数据,数据写入到磁盘里(disk),此时要考虑性能问题。如果读取一行数据,就调用map方法,就写入一次;如果有很多行(假设100行),程序就会执行很多次(100次),对应很多次的IO操作(100次)。此时就需要内存缓冲区(3个byte数组)称为环形缓冲区。默认缓冲区的大小为100M,当内存缓冲区的大小为80M时,就开始向磁盘写入数据;内存写到磁盘称为spill(溢出/溢写)。假设文件大小不足128M,至少会发生一次溢出,且在发生溢出行为时,会发生排序(字典序),溢出的次数=文件个数。

4.1 Mapper任务执行过程详解

  • 1、输入目录下的文件按照一定的标准逐个逻辑切片,形成规划;默认:split size = block size(默认128M);每一个切片由一个MapTask处理。
  • 2、默认读取数据组件TextInputFormat对切片中的数据(每一行文本内容)按照一定规则解析成一个<k,v>对。k:光标起始偏移量,v:此行内容。
  • 3、每一行内容解析出的每一个<k,v>对调用一次map方法,每次调用就会输出零个或多个键值对。(多少行调用多少次map方法)数据在缓冲。
  • 4、按照一定的规则对阶段3的键值对进行分区,默认只有一个区。当 reducetask=2 时,就会涉及数据分区;决定map输出的kv去到哪一个reducetask。分区的数量=reducetask的个数(正常情况) 异常情况:分区个数多 直接报错 非法分区 分区个数少 产生空文件。
  • 5、对每个分区中的键值对进行排序(溢写的同时–k的字典序);对溢出的临时文件合并为最终局部结果文件(分区且排序的文件)。
  • 6、数据局部聚合处理(combiner处理),键值相等的键值对会调用一次reduce方法。(本阶段默认是没有的)。

4.2 Reducer任务执行过程详解

  • 1、Reducer任务会主动从Mapper任务复制其输出的键值对。(开启线程(fetcher)主动拉取Map阶段输出的键值对)
  • 2、 1.**合并(merge)**复制到Reducer本地数据;2.排序(sort):对合并后的数据进行排序;3.分组(grouping) 对合并后的数进行分组(k,Iterable([1,1,1]))。
  • 3、键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,将键值对写入到HDFS中。

5.MapTask工作机制

Map阶段流程大体如下图所示 :
在这里插入图片描述

1).input File通过split被逻辑切分为多个split文件,通过Record按行读取内容给map(用户自己实现的)进行处理。

2).数据被map处理结束之后交给OutputCollector收集器,对其结果key进行分区(默认使用hash分区)。

3).写入buffer,每个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘。

4).当整个map task结束后再对磁盘中这个map task产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reduce task来拉数据。

详细步骤:

1. 读取数据组件InputFormat(默认TextInputFormat)会通过getSplits方法对输入目录中文件进行逻辑切片规划得到splits,有多少个split就对应启动多少个MapTask。split与block的对应关系默认是一对一。

2. 将输入文件切分为splits之后,由RecordReader对象(默认LineRecordReader)进行读取,以\n作为分隔符,读取一行数据,返回<key,value>。key:每行首字符偏移值,value:此行文本内容

3.读取split返回<key,value>,进入用户自己继承的Mapper类中,执行用户重写的map函数。RecordReader每读取一行内容就调用一次map方法。

4. map执行完之后,将map的每条结果通过context.write进行collect数据收集。在collect中,会先对其进行分区处理,默认使用HashPartitioner。

MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认为:key.hash % reduce task数量。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job.setPartitionerClass上。

5.将数据写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组。

注:环形缓冲区其实是一个数组,数组中存放着key、value的序列化数据和key、value的元数据信息,包括partition、key的起始位置、value的起始位置以及value的长度。环形结构是一个抽象概念。

缓冲区是有大小限制,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写。这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响。

6.当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为,这里的排序也是对序列化的字节做的排序

如果job设置过Combiner(局部聚合),那么现在就是使用Combiner的时候了。将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用。

那哪些场景才能使用Combiner呢?从这里分析,Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果。

7. 每次溢写会在磁盘上生成一个临时文件(写之前判断是否有combiner),如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个临时文件存在。当整个数据处理结束之后开始对磁盘中的临时文件进行merge合并,因为最终的文件只有一个,写入磁盘,并且为这个文件提供了一个索引文件,以记录每个reduce对应数据的偏移量。

6.Shuffle机制

map阶段处理的数据如何传递给reduce阶段,是MapReduce框架中最关键的一个流程,这个流程就叫shuffle

shuffle: 洗牌、发牌——(核心机制:数据分区,排序,合并)。

在这里插入图片描述

shuffle是Mapreduce的核心,它分布在Mapreduce的map阶段和reduce阶段,描述了两个阶段之间数据处理的特性。

一般把从Map产生输出开始到Reduce取得数据作为输入之前的过程称作shuffle

shuffle过程:

Map阶段:

  • Collect阶段:将MapTask的结果输出到默认大小为100M的环形缓冲区,保存的是key/value,Partition分区信息等。
  • Spill阶段:当内存中的数据量达到一定的阀值(spill percent=0.8)的时候,就会将数据写入本地磁盘; 数据写入磁盘之前需要对数据进行一次排序的操作; 如果配置了combiner(默认是没有的),maptask的结果进行局部聚合。
  • Merge阶段:把所有溢出的临时文件进行一次合并操作,以确保一个MapTask最终只产生一个中间数据文件。

Reduce阶段:

  • Copy阶段 :ReduceTask启动Fetcher线程到已经完成MapTask的节点上复制一份属于自己的数据 数据默认会保存在内存的缓冲区中,当内存的缓冲区达到一定的阀值的时候,就会将数据写到磁盘之上。
  • Merge阶段 :在ReduceTask远程复制数据的同时,会在后台开启两个线程对内存到本地的数据文件进行合并操作。
  • Sort阶段 :在对数据进行合并的同时,会进行排序操作;MapTask阶段已经对数据进行了局部的排序,ReduceTask只需保证Copy的数据的最终整体有效性即可。

shuffle是Mapreduce的诟病所在;原因是在shuffle过程中涉及到了大量的内存到磁盘 磁盘到内存 内存再到磁盘的过程,执行效率大大降低。

当涉及多个mr程序之间的串联执行的时候,shuffle的弊端 就会被无限放大,故mr程序不是合做迭代计算。

Shuffle中的缓冲区大小会影响到mapreduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。

7.ReduceTask工作机制

Reduce阶段流程大体如下图所示 :
在这里插入图片描述
Reduce大致分为copy、sort、reduce三个阶段,重点在前两个阶段。copy阶段包含一个eventFetcher来获取已完成的map列表,由Fetcher线程去copy数据,在此过程中会启动两个merge线程,分别为inMemoryMerger和onDiskMerger,分别将内存中的数据merge到磁盘和将磁盘中的数据进行merge。待数据copy完成之后,copy阶段就完成了,开始进行sort阶段,sort阶段主要是执行finalMerge操作,纯粹的sort阶段,完成之后就是reduce阶段,调用用户定义的reduce函数进行处理。

详细步骤:

1.Copy阶段,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求maptask获取属于自己的文件。

2. Merge阶段。 这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。

merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用

当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件。

3.把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。

4. 对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/460204.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【云原生-kubernetes系列】--kubernetes日志收集

1、ELK架构 1.1、部署ES集群 https://mirrors.tuna.tsinghua.edu.cn/elasticstack/apt/7.x/pool/main/e/elasticsearch/ 1、下载软件包 rootes-server1:~# wget https://mirrors.tuna.tsinghua.edu.cn/elasticstack/apt/7.x/pool/main/e/elasticsearch/elasticsearch-7.12.0-…

QMI8658芯片I2C驱动开发指南

这个芯片纯国产挺好用的&#xff0c;电路很好设计&#xff0c;我这垃圾焊功&#xff0c;纯手焊&#xff0c;&#xff0c;居然能用。 第一部分 硬件连接 画的很简陋&#xff0c;看看就可以了&#xff0c;这里I2C总线需要接10K上拉没有画出来&#xff0c;这个需要注意一下。 …

【XR806开发板试用】基于WEBSOCKET实现人机交互(控制开关灯)以及开发问题记录

一、开发板编译、功能介绍 根据官方文档编译烧录成功后&#xff0c;我们修改下官方例子&#xff0c;进行开发来实现websocket。 整体流程&#xff1a;开发板先自动寻找指定的wifi并且连接&#xff0c;连接成功后&#xff0c;通过websocket来与服务端连接&#xff0c;连接成功后…

idea项目mapper.xml中的SQL语句黄色下划线去除

问题描述 当我们使用idea开发java项目时&#xff0c;经常会与数据库打交道&#xff0c;一般在使用mybatis的时候需要写一大堆的mapper.xml以及SQL语句&#xff0c;每当写完SQL语句的时候总是有黄色下划线&#xff0c;看着很不舒服。 解决方案&#xff1a; 修改idea的配置 Edi…

实验01 ASP.NET网站的建立及运行

【实验目的】 &#xff08;1&#xff09;能熟悉ASP.NET的开发环境Visual Studio Community 2019&#xff08;VSC 2019&#xff09;。 &#xff08;2&#xff09;能通过解决方案管理网站&#xff0c;会在解决方案中创建网站。 &#xff08;3&#xff09;会设置IIS 10中的网站…

Node.js(1)

跨平台的node.js运行环境&#xff0c;使开发者可以搭建服务器端的js应用程序 它可以编写服务器端程序&#xff1b; 编写数据接口&#xff1b;提供网页资源浏览功能 前端工程化&#xff1a;开发集成的所有工具和技术 与浏览器环境的区别 node.js环境中没有DOM和BOM fs模块-读…

Linux下安装多个nodejs并映射Jenkins

背景 需要Jenkins中切换多个Node&#xff0c;比如nodejs16和nodesjs18,所以在宿主机按照好这两个版本&#xff0c;然后再映射到Jenkins容器中 步骤 1.下载地址 https://nodejs.org/dist/ 放到 cd /opt/soft/2.解压 tar -xzvf node-v16.20.0-linux-x64.tar.gz tar -xzvf n…

SSM SpringBoot vue智能手机参数分析平台

SSM SpringBoot vue智能手机参数分析平台 系统功能 首页 图片轮播 新闻资讯 手机信息 手机百科 登录注册 个人中心 后台管理 登录注册 个人中心 手机百科管理 用户管理 手机对比管理 配置管理 新闻资讯管理 手机信息管理 对比信息管理 我的收藏管理 开发环境和技术 开发语言…

Kafka配置SASL_PLAINTEXT权限。常用操作命令,创建用户,topic授权

查看已经创建的topic ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list 创建topic 创建分区和副本数为1的topic ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic acltest --partitions 1 --replication-factor 1 创建kafka用户 …

迷宫寻路[天梯赛 -- 栈]

文章目录 题目描述思路AC代码 题目描述 输入样例 8 8 0 0 1 0 0 0 1 0 0 0 1 0 0 0 1 0 0 0 0 0 1 1 0 0 0 1 1 1 0 0 0 0 0 0 0 1 0 0 0 0 0 1 0 0 0 1 0 0 0 1 1 1 0 1 1 0 1 0 0 0 0 0 0 0 4 4 0 0 1 0 0 0 0 0 0 0 1 1 0 1 0 0 -1 -1输出样例 1,1 2,1 3,1 4,1 5,1 5,2 5…

修复ElementUI中el-select与el-option无法通过v-model实现数据双向绑定的问题

1. 问题描述 需求&#xff1a;在使用ElementUI时&#xff0c;通过el-select和el-option标签实现下拉列表功能&#xff0c;当el-option中的选项被选中时&#xff0c;被选中的选项可以正确回显到已选择的列表中。 对于上面的下拉列表&#xff0c;当我们选中“超级管理员”的选项…

Tomcat的使用

1. Tomcat 1.1 Tomcat 是什么 Tomcat 就是基于 Java 实现的一个开源免费, 也是被广泛使用的 HTTP 服务器 1.2 下载安装 Tomcat官网选择其中的 zip 压缩包, 下载后解压缩即可&#xff0c;解压缩的目录最好不要带 “中文” 或者 特殊符号 进入 webapps 目录,每个文件夹都对应…

vue3项目随笔1

1,Eslint Prettier 报错情况&#xff1a; 解决办法&#xff1a; &#xff08;1&#xff09;下载Prettier - code formatter &#xff08;2&#xff09;配置setting.json文件 文件 -> 首选项 -> 设置 -> 用户 -> Eslint "editor.defaultFormatter":…

【Hadoop】Hadoop概述与核心组件

目录 Hadoop概述Hadoop 发展历史Hadoop 三大发行版本1.Apache Hadoop&#xff08;常用&#xff09;2.Cloudera Hadoop3.Hortonworks Hadoop优势优势总结——4高&#xff08;高可靠、高扩展、高效、高容错&#xff09; Hadoop组成1.HDFS管理者&#xff1a;NameNode&#xff08;n…

【计算机网络_传输层】UDP和TCP协议

文章目录 1. 重新理解端口号端口号划分netstat指令pidof 2. UDP协议2.1 UDP协议端格式2.2 UDP的特点2.3 UDP的注意事项2.4 基于UDP的应用层协议 3. TCP协议&#xff08;传输控制协议&#xff09;3.1 TCP协议的格式和报头字段3.2 如何解包和分用3.3 理解TCP协议报头3.4 TCP协议的…

解决electron打包vue-element-admin项目页面无法跳转的问题

解决electron打包vue-element-admin项目页面无法跳转的问题 说明之前通过这个教程已经打包成功&#xff0c;但是发现进行账号密码登录后页面无法跳转的问题。现在已经解决&#xff0c;所以记录一下。 1、检查路由模式是否为hash模式&#xff0c;如果不是改成hash模式。 new Ro…

【DL经典回顾】激活函数大汇总(十五)(LogSoftmax附代码和详细公式)

激活函数大汇总&#xff08;十五&#xff09;&#xff08;LogSoftmax附代码和详细公式&#xff09; 更多激活函数见激活函数大汇总列表 一、引言 欢迎来到我们深入探索神经网络核心组成部分——激活函数的系列博客。在人工智能的世界里&#xff0c;激活函数扮演着不可或缺的…

【矩阵】54. 螺旋矩阵【中等】

螺旋矩阵 给你一个 m 行 n 列的矩阵 matrix &#xff0c;请按照 顺时针螺旋顺序 &#xff0c;返回矩阵中的所有元素。 示例 1&#xff1a; 输入&#xff1a;matrix [[1,2,3],[4,5,6],[7,8,9]] 输出&#xff1a;[1,2,3,6,9,8,7,4,5] 解题思路 1、模拟顺时针螺旋顺序遍历矩阵…

R语言语法基础(说人话版)

在Rstudio中使用ctrl回车来执行某一行的代码 在R语言中&#xff0c;通常不需要像C语言一样在每条语句的结尾添加分号来表示语句结束。R语言是一种脚本语言&#xff0c;它使用换行符来分隔语句&#xff0c;因此分号通常是可选的&#xff0c;除非你想在同一行上写多个语句。在R中…

Selenium 自动化 —— 使用WebDriverManager自动下载驱动

上一篇文章 入门和 Hello World 实例 中&#xff0c;我们提供了一个最简单的 Selenium 上手的例子。 但是某一天&#xff0c;突然发现相同的代码居然运行报错了。这是怎么回事呢&#xff1f; 日志排查 日志中其实提示的很明显了&#xff1a;Chrome浏览器和Chrome WebDriver的…