1 Shuffle机制
对于排序而言分为两个阶段,MapTask后和ReduceTask前。
2 MapTask工作机制
MapTask并行度由切片个数决定;切片个数由切片大小(切片大小取决于块大小、maxsize(Long的最大值)和minsize(默认为1))以及数据读取方式决定。
(1)Read阶段:
job的提交流程:待读写的源数据由客户端进行切片划分,划分完成之后提交(切片信息、jar包、xml配置文件)给yarn,yarn开启MrAppMaster。
MrAppMaster启动后正式开启MapTask,由InputFormat读取数据(默认使用TextInputFormat)调用RecorderReader的reader()读取数据,数据格式:(k,v)=(偏移量,数据的一行内容)。
读取之后将数据返回给Mapper,进入Map阶段。
(2)Map阶段:
主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。
(3)Collect收集阶段:
在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。
在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。
缓冲区内部的数据全部都是按照分区的方式进行存储,且一侧存数据,一侧存索引,当数据达到80%时进行反向溢写。溢写之前需要对分区中的数据进行排序。
(4)Spill溢写阶段:
当环形缓冲区满后,产生大量的溢写文件,MapReduce会将数据写到本地磁盘上,生成一个临时文件。
将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
(5)Merge阶段:
当所有数据处理完成后(即溢写完成后),MapTask对所有临时文件(溢写文件)进行一次归并排序,以确保最终只会生成一个数据文件。
MapTask后的阶段:
MapTask把处理结果暂时放到环形缓冲区,当环形缓冲区的使用率达到一定阈值(80%)时,对其进行一次快速排序,然后将有序数据写到磁盘上。
当数据处理完后,磁盘上的所有文件再进行一次快速排序。
3 ReduceTask工作机制
前提:MapTask将数据处理完毕且持久化在磁盘上,等待ReduceTask端拉取数据。
(1)Copy阶段:
ReduceTask从各个MapTask上远程拷贝一片数据(即拉取指定分区的数据),并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
(2)Sort阶段:
对拉取的文件进行归并排序。
在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。
(3)Reduce阶段:
对于相同的key的数据进入到同一个reduce()处理函数,将计算**结果通过OutputFormat(输出)**写到HDFS上。
ReduceTask前的阶段:
ReduceTask从每个MapTask上拉取数据存储在内存上,如果文件太大则溢出写道磁盘。
如果磁盘上的文件数目达到一定阈值时,则进行一次归并排序,生成更大的文件。
如果内存中文件大小或者数目达到一定阈值时,也进行一次合并排序后写到磁盘上。
当所有数据拉取完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
ReduceTask并行度决定机制
ReduceTask并行度一般通过实验得出,实验结果的总时间是服从正态分布的,具体多少个ReduceTask需要根据集群性能而定。
(1)ReduceTask=0,表示没有Reduce阶段,输出文件个数=Map个数。
(2)ReduceTask默认值=1,即输出文件为1个。
(3)当数据分布不均时,可能在Reduce阶段产生数据倾斜。
(4)ReduceTask个数需要根据业务逻辑需求设定。如计算全局汇总只能设置1个ReduceTask。
(5)当分区数不等于1,ReduceTask=1时不执行分区过程。原因:在MapTask源码中,执行分区前需要判断ReduceTask是否大于1,只有大于1时才执行分区。