Hadoop八股
- Hadoop
- MapReduce考点
- MR on Yarn 分布式工作原理
- shuffle:MapTask 和 ReduceTask的处理流程
- MR中的MapTask 和 ReduceTask 的数量决定
- MR和Spark两者Shuffle的区别
- 简单讲一下map- reduce 原理
- **MapReduce 的核心概念**
- **MapReduce 的工作流程**
- **MapReduce 的优势**
- **MapReduce 的示例**
- 任务:统计文本中每个单词的出现次数
- **MapReduce 的实现**
- 1. MapReduce 介绍
- 1.1 MapReduce 设计构思
- 2. MapReduce 编程规范
- 4. WordCount示例编写
- **1. 环境准备**
- **2. Java 代码实现**
- **WordCount.java**
- **3. 编译和运行**
- **步骤 1:编译代码**
- **步骤 2:打包为 JAR**
- **步骤 3:准备输入数据**
- **步骤 4:运行 MapReduce 作业**
- **步骤 5:查看结果**
- **4. 输出结果**
- **5. 代码说明**
- 6. MapReduce的运行机制详解
- 6.1 MapTask 工作机制
- 6.2 ReduceTask 工作机制
- 6.3 Shuffle 过程
- HDFS
- HDFS读写流程
- HDFS副本存放策略
- 1. HDFS概述
- 2. HDFS架构
- 3. HDFS的特性
- 4. HDFS 的命令行使用
- 5. hdfs的高级使用命令
- 5.2 HDFS 的安全模式
- 7. HDFS 文件写入过程(非常重要)
- 7.2 机架感知(副本节点选择)
- 8.HDFS 文件读取过程(非常重要)
- 9. NameNode 工作机制以及元数据管理(重要)
- 9.1 namenode 与 datanode 启动
- 9.2 FSImage与edits详解
- 9.3 FSimage文件当中的文件信息查看
- 9.4 edits当中的文件信息查看
- 9.5 secondarynameNode如何辅助管理FSImage与Edits文件
- 9.6 namenode元数据信息多目录配置
- 9.7 namenode故障恢复
- 10. datanode工作机制以及数据存储
- YARN
- YARN的3种调度
- 1. yarn的架构和原理
- 1.1 yarn的基本介绍和产生背景
- 1.2 hadoop 1.0 和 hadoop 2.0 的区别
- 1.3 yarn 集群的架构和工作原理
- 1.4 yarn 的任务提交流程
- 2. RM和NM的功能介绍
- 2.1 resourceManager基本介绍
- 2.2 nodeManager功能介绍
- 3. yarn的applicationMaster介绍
- 3.1 applicationMaster的职能
- 3.2 报告活跃
- 3.3 资源需求
- 3.4 调度任务
- 3.5 启动container
- 3.6 完成的container
- 3.7 AM的失败和恢复
- 3.8 applicationMaster启动过程
- 4. yarn的资源调度
- 1.资源调度器的职能
- 2.资源调度器的分类
- 3.基本架构
- 4.1 资源调度三种模型介绍
- 4.1.1 层级队列管理机制FIFO调度策略
- 4.1.2 Capacity Scheduler
- 4.1.3 Fair Scheduler
Hadoop
1.hadoop基本组成
2.hdfs数据上传流程
3.mapreduce怎么工作的
4.mapreduce shuffle机制
5.yarn是什么?
MapReduce考点
MR on Yarn 分布式工作原理
1.客户端启动,向RM(资源管理器)请求运行APP,RM返回job id 和 HDFS 资源提交路径
2.客户端向HDFS提交资源(任务切片job.split,程序jar包job.jar,程序参数job.xml)
3.客户端向RM申请创建1个容器启动MRAppMaster,MRAppMaster完成作业初始化,MRAppMaster从HDFS中获取 输入的切片,决定MapTask 和 ReduceTask个数。
4.MRAppMaster向RM请求创建n个容器
5.MRAppMaster指派任务启动 yranchild 运行 MapTask 和 ReduceTask.
6.Task运行结束后,释放容器资源,MRAppMaster销毁。
shuffle:MapTask 和 ReduceTask的处理流程
MapTask:
1.YarnChild 的main方法,调runtask() 方法调Maptask对象。Maptask对象调run方法,调mapper对象的run方法。
YarnChild (runtask() 方法) -》 Maptask对象(run方法) -》 mapper对象(run方法)
2.map方法产生kv数据,接着调用Partition分区,再将数据输出到缓冲区。分区数据在元数据中。
3.-缓冲区到80%是会溢出到磁盘,溢出前对key的索引进行 快速排序 链接: 快速排序_912. 排序数组(10中排序算法) ,该过程边向缓冲区写入、边溢出磁盘。
4.多次溢出的小文件,通过Merger 归并排序 链接: 快速排序_912. 排序数组(10中排序算法) 合并成一个文件,最终输出的文件部分是分区 且有序的
ReduceTask:
1.YarnChild 的main方法,调runtask() 方法调Reducetask 对象。
2.Fetcher的fetch方法下载对应分区数据,到容器磁盘。
3.多个数据文件经过Meger归并排序 链接: 快速排序_912. 排序数组(10中排序算法) ,合成一个文件
4.reduce()方法循环通一个key中的value用迭代器去聚合,GroupingComparator分组比较器去判断相邻两个key是否属于同一个组。
5.write() 通过OutputFormat 向外写出数据,具体实现是 拿到LineRecordWrite()直接写出到文件系统。
MR中的MapTask 和 ReduceTask 的数量决定
影响map个数的因素有:
文件大小
文件个数
splitSize的大小: 分片是按照splitSize的大小进行分割,一个splitSize大小默认=hdfs block的大小。
影响reduce个数的因素,取决于三个参数:
mapred.reduce.task: 指定reduce的任务数量。
hive.exec.reduce.bytes.per.reduce: 每个reduce任务处理的数据量。
hive.exec.reducers.max: 每个任务最大的reduce数,默认999
计算reduce数公式:N = min(最大reduce数,总数据量 / 每个reduce处理的数据量)
只有一个reduce的场景:
没有group by的汇总
Order by (不进行分区 和 使用 order by 进行全局排序只有一个 reduce)
笛卡尔积
MR和Spark两者Shuffle的区别
简单讲一下map- reduce 原理
MapReduce
是一种用于处理大规模数据集的编程模型和框架,最初由 Google 提出,用于分布式计算。它的核心思想是将复杂的数据处理任务分解为两个主要阶段:Map 和 Reduce,并通过并行化处理来提高效率。
MapReduce 的核心概念
-
Map 阶段
• 输入数据被分割成多个独立的块(chunks),每个块由Map
函数处理。
•Map
函数对每个输入数据项进行转换,生成一组中间键值对(key-value pairs)。
• 例如:统计单词频率时,Map
函数会将每个单词映射为(word, 1)
。 -
Shuffle 和 Sort 阶段
• 系统会将Map
阶段输出的键值对按照键(key)进行分组和排序。
• 这个过程确保相同键的值被发送到同一个Reduce
任务。 -
Reduce 阶段
•Reduce
函数对每个键的所有值进行聚合或计算,生成最终结果。
• 例如:在单词统计中,Reduce
函数会将相同单词的计数相加,生成(word, total_count)
。
MapReduce 的工作流程
- 输入数据:将大规模数据集分割成多个小块。
- Map:对每个数据块应用
Map
函数,生成中间键值对。 - Shuffle 和 Sort:将中间结果按键分组并排序。
- Reduce:对每个键的值进行聚合,生成最终结果。
- 输出数据:将结果存储或输出。
MapReduce 的优势
- 并行处理:
Map
和Reduce
任务可以并行执行,适合分布式计算。 - 容错性:如果一个节点失败,系统可以将任务重新分配到其他节点。
- 扩展性:可以轻松扩展到数千台机器,处理 PB 级别的数据。
MapReduce 的示例
任务:统计文本中每个单词的出现次数
-
Map 阶段
输入:"hello world hello"
输出:[("hello", 1), ("world", 1), ("hello", 1)]
-
Shuffle 和 Sort
中间结果:[("hello", [1, 1]), ("world", [1])]
-
Reduce 阶段
输出:[("hello", 2), ("world", 1)]
MapReduce 的实现
• Hadoop:最著名的开源 MapReduce 实现。
• Spark:更高效的分布式计算框架,支持类似 MapReduce 的操作。
1. MapReduce 介绍
MapReduce思想在生活中处处可见。或多或少都曾接触过这种思想。MapReduce的思想核心是“分而治之”,适用于大量复杂的任务处理场景(大规模数据处理场景)。即使是发布过论文实现分布式计算的谷歌也只是实现了这种思想,而不是自己原创。
Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。
Reduce负责“合”,即对map阶段的结果进行全局汇总。
MapReduce运行在yarn集群
—1.ResourceManager
—2.NodeManager
这两个阶段合起来正是MapReduce思想的体现。
还有一个比较形象的语言解释MapReduce:
我们要数图书馆中的所有书。你数1号书架,我数2号书架。这就是“Map”。我们人越多,数书就更快。
现在我们到一起,把所有人的统计数加在一起。这就是“Reduce”。
1.1 MapReduce 设计构思
MapReduce是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在Hadoop集群上。
既然是做计算的框架,那么表现形式就是有个输入(input),MapReduce操作这个输入(input),通过本身定义好的计算模型,得到一个输出(output)。
对许多开发者来说,自己完完全全实现一个并行计算程序难度太大,而MapReduce就是一种简化并行计算的编程模型,降低了开发并行应用的入门门槛。
Hadoop MapReduce构思体现在如下的三个方面:
1.如何对付大数据处理:分而治之
对相互间不具有计算依赖关系的大数据,实现并行最自然的办法就是采取分而治之的策略。并行计算的第一个重要问题是如何划分计算任务或者计算数据以便对划分的子任务或数据块同时进行计算。不可分拆的计算任务或相互间有依赖关系的数据无法进行并行计算!
2.构建抽象模型:Map和Reduce
MapReduce借鉴了函数式语言中的思想,用Map和Reduce两个函数提供了高层的并行编程抽象模型。
Map: 对一组数据元素进行某种重复式的处理;
Reduce: 对Map的中间结果进行某种进一步的结果整理。
MapReduce中定义了如下的Map和Reduce两个抽象的编程接口,由用户去编程实现:
map: (k1; v1) → [(k2; v2)]
reduce: (k2; [v2]) → [(k3; v3)]
Map和Reduce为程序员提供了一个清晰的操作接口抽象描述。通过以上两个编程接口,大家可以看出MapReduce处理的数据类型是<key,value>键值对。
3.MapReduce框架结构
一个完整的mapreduce程序在分布式运行时有三类实例进程:
MR AppMaster:负责整个程序的过程调度及状态协调;
MapTask:负责map阶段的整个数据处理流程;
ReduceTask:负责reduce阶段的整个数据处理流程。
2. MapReduce 编程规范
MapReduce 的开发一共有八个步骤, 其中 Map 阶段分为 2 个步骤,Shuffle 阶段 4 个步骤,Reduce 阶段分为 2 个步骤
1.Map 阶段 2 个步骤:
1.1 设置 InputFormat 类, 将数据切分为 Key-Value**(K1和V1)** 对, 输入到第二步
1.2 自定义 Map 逻辑, 将第一步的结果转换成另外的 Key-Value(K2和V2) 对, 输出结果
2.Shuffle 阶段 4 个步骤:
2.1 对输出的 Key-Value 对进行分区
2.2 对不同分区的数据按照相同的 Key 排序
2.3 (可选) 对分组过的数据初步规约, 降低数据的网络拷贝
2.4 对数据进行分组, 相同 Key 的 Value 放入一个集合中
3.Reduce 阶段 2 个步骤:
3.1 对多个 Map 任务的结果进行排序以及合并, 编写 Reduce 函数实现自己的逻辑, 对输入的 Key-Value 进行处理, 转为新的 Key-Value(K3和V3)输出
3.2 设置 OutputFormat 处理并保存 Reduce 输出的 Key-Value 数据
4. WordCount示例编写
以下是使用 Java 实现的 WordCount 示例,基于 Hadoop MapReduce 框架。这个程序会统计输入文本中每个单词的出现次数。
1. 环境准备
• 安装 Hadoop(单机或集群模式)。
• 确保 Hadoop 的环境变量已配置(如 HADOOP_HOME
)。
• 使用 Maven 或直接编译 Java 文件。
2. Java 代码实现
WordCount.java
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
// Mapper 类
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
// Reducer 类
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
// 主函数
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
3. 编译和运行
步骤 1:编译代码
将代码保存为 WordCount.java
,然后使用以下命令编译:
javac -classpath $(hadoop classpath) -d . WordCount.java
步骤 2:打包为 JAR
将编译后的类文件打包为 JAR:
jar -cvf WordCount.jar -C . .
步骤 3:准备输入数据
创建一个输入文件(如 input.txt
),内容如下:
hello world
hello hadoop
hello mapreduce
将文件上传到 HDFS:
hadoop fs -mkdir /input
hadoop fs -put input.txt /input
步骤 4:运行 MapReduce 作业
使用以下命令运行 WordCount:
hadoop jar WordCount.jar WordCount /input /output
步骤 5:查看结果
查看输出结果:
hadoop fs -cat /output/part-r-00000
4. 输出结果
输出文件 part-r-00000
的内容如下:
hadoop 1
hello 3
mapreduce 1
world 1
5. 代码说明
• Mapper:将每行文本拆分为单词,并输出 (word, 1)
键值对。
• Reducer:对相同单词的计数进行累加,输出 (word, total_count)
。
• Combiner:在 Mapper 和 Reducer 之间进行局部聚合,减少数据传输量。
通过以上步骤,你可以成功运行一个基于 Hadoop MapReduce 的 WordCount 程序!
6. MapReduce的运行机制详解
6.1 MapTask 工作机制
整个Map阶段流程大体如上图所示。
简单概述:inputFile通过split被逻辑切分为多个split文件,通过Record按行读取内容给map(用户自己实现的)进行处理,数据被map处理结束之后交给OutputCollector收集器,对其结果key进行分区(默认使用hash分区),然后写入buffer,每个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reduce task来拉数据。
详细步骤
1.读取数据组件 InputFormat (默认 TextInputFormat) 会通过 getSplits 方法对输入目录中文件进行逻辑切片规划得到 block, 有多少个 block就对应启动多少个 MapTask
2.将输入文件切分为 block 之后, 由 RecordReader 对象 (默认是LineRecordReader) 进行读取, 以 \n 作为分隔符, 读取一行数据, 返回 <key,value>. Key 表示每行首字符偏移值, Value 表示这一行文本内容
3.读取 block 返回 <key,value>, 进入用户自己继承的 Mapper 类中,执行用户重写的 map 函数, RecordReader 读取一行这里调用一次
4.Mapper 逻辑结束之后, 将 Mapper 的每条结果通过 context.write 进行collect数据收集. 在 collect 中, 会先对其进行分区处理,默认使用 HashPartitioner
MapReduce 提供 Partitioner 接口, 它的作用就是根据 Key 或 Value 及 Reducer 的数量来决定当前的这对输出数据最终应该交由哪个 Reduce task 处理, 默认对 Key Hash 后再以 Reducer 数量取模. 默认的取模方式只是为了平均 Reducer 的处理能力, 如果用户自己对 Partitioner 有需求, 可以订制并设置到 Job 上
5.接下来, 会将数据写入内存, 内存中这片区域叫做环形缓冲区, 缓冲区的作用是批量收集 Mapper 结果, 减少磁盘 IO 的影响. 我们的 Key/Value 对以及 Partition 的结果都会被写入缓冲区. 当然, 写入之前,Key 与 Value 值都会被序列化成字节数组
环形缓冲区其实是一个数组, 数组中存放着 Key, Value 的序列化数据和 Key, Value 的元数据信息, 包括 Partition, Key 的起始位置, Value 的起始位置以及 Value 的长度. 环形结构是一个抽象概念。
缓冲区是有大小限制, 默认是 100MB. 当 Mapper 的输出结果很多时, 就可能会撑爆内存, 所以需要在一定条件下将缓冲区中的数据临时写入磁盘, 然后重新利用这块缓冲区. 这个从内存往磁盘写数据的过程被称为 Spill, 中文可译为溢写. 这个溢写是由单独线程来完成, 不影响往缓冲区写 Mapper 结果的线程. 溢写线程启动时不应该阻止 Mapper 的结果输出, 所以整个缓冲区有个溢写的比例 spill.percent. 这个比例默认是 0.8, 也就是当缓冲区的数据已经达到阈值 buffer size * spill percent = 100MB * 0.8 = 80MB, 溢写线程启动, 锁定这 80MB 的内存, 执行溢写过程. Mapper 的输出结果还可以往剩下的 20MB 内存中写, 互不影响
6.当溢写线程启动后, 需要对这 80MB 空间内的 Key 做排序 (Sort). 排序是 MapReduce 模型默认的行为, 这里的排序也是对序列化的字节做的排序
如果 Job 设置过 Combiner, 那么现在就是使用 Combiner 的时候了. 将有相同 Key 的 Key/Value 对的 Value 合并在起来, 减少溢写到磁盘的数据量. Combiner 会优化 MapReduce 的中间结果, 所以它在整个模型中会多次使用 \ 那哪些场景才能使用 Combiner 呢? 从这里分析, Combiner 的输出是 Reducer 的输入, Combiner 绝不能改变最终的计算结果. Combiner 只应该用于那种 Reduce 的输入 Key/Value 与输出 Key/Value 类型完全一致, 且不影响最终结果的场景. 比如累加, 最大值等. Combiner 的使用一定得慎重, 如果用好, 它对 Job 执行效率有帮助, 反之会影响 Reducer 的最终结果
7.合并溢写文件, 每次溢写会在磁盘上生成一个临时文件 (写之前判断是否有 Combiner), 如果 Mapper 的输出结果真的很大, 有多次这样的溢写发生, 磁盘上相应的就会有多个临时文件存在. 当整个数据处理结束之后开始对磁盘中的临时文件进行 Merge 合并, 因为最终的文件只有一个, 写入磁盘, 并且为这个文件提供了一个索引文件, 以记录每个reduce对应数据的偏移量
【mapTask的一些基础设置配置】
配置 默认值 解释
mapreduce.task.io.sort.mb 100 设置环型缓冲区的内存值大小
mapreduce.map.sort.spill.percent 0.8 设置溢写的比例
mapreduce.cluster.local.dir ${hadoop.tmp.dir}/mapred/local 溢写数据目录
mapreduce.task.io.sort.factor 10 设置一次合并多少个溢写文件
6.2 ReduceTask 工作机制
Reduce 大致分为 copy、sort、reduce 三个阶段,重点在前两个阶段。copy 阶段包含一个 eventFetcher 来获取已完成的 map 列表,由 Fetcher 线程去 copy 数据,在此过程中会启动两个 merge 线程,分别为 inMemoryMerger 和 onDiskMerger,分别将内存中的数据 merge 到磁盘和将磁盘中的数据进行 merge。待数据 copy 完成之后,copy 阶段就完成了,开始进行 sort 阶段,sort 阶段主要是执行 finalMerge 操作,纯粹的 sort 阶段,完成之后就是 reduce 阶段,调用用户定义的 reduce 函数进行处理
详细步骤:
1.Copy阶段,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求maptask获取属于自己的文件。
2.Merge阶段。这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件。
3.合并排序。把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。
4.对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。
6.3 Shuffle 过程
map 阶段处理的数据如何传递给 reduce 阶段,是 MapReduce 框架中最关键的一个流程,这个流程就叫 shuffle
shuffle: 洗牌、发牌 ——(核心机制:数据分区,排序,分组,规约,合并等过程)
shuffle 是 Mapreduce 的核心,它分布在 Mapreduce 的 map 阶段和 reduce 阶段。一般把从 Map 产生输出开始到 Reduce 取得数据作为输入之前的过程称作 shuffle。
1.Collect阶段:将 MapTask 的结果输出到默认大小为 100M 的环形缓冲区,保存的是 key/value,Partition 分区信息等。
2.Spill阶段:当内存中的数据量达到一定的阀值的时候,就会将数据写入本地磁盘,在将数据写入磁盘之前需要对数据进行一次排序的操作,如果配置了 combiner,还会将有相同分区号和 key 的数据进行排序。
3.Merge阶段:把所有溢出的临时文件进行一次合并操作,以确保一个 MapTask 最终只产生一个中间数据文件。
4.Copy阶段:ReduceTask 启动 Fetcher 线程到已经完成 MapTask 的节点上复制一份属于自己的数据,这些数据默认会保存在内存的缓冲区中,当内存的缓冲区达到一定的阀值的时候,就会将数据写到磁盘之上。
5.Merge阶段:在 ReduceTask 远程复制数据的同时,会在后台开启两个线程对内存到本地的数据文件进行合并操作。
6.Sort阶段:在对数据进行合并的同时,会进行排序操作,由于 MapTask 阶段已经对数据进行了局部的排序,ReduceTask 只需保证 Copy 的数据的最终整体有效性即可。
Shuffle 中的缓冲区大小会影响到 mapreduce 程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快
缓冲区的大小可以通过参数调整, 参数:mapreduce.task.io.sort.mb 默认100M
HDFS
HDFS读写流程
写
1.客户端向namenode请求上传文件,namenode检查目标文件是否已存在,父目录是否存在
2.namenode响应 可以上传文件
3.客户端请求第一个Block上传到那几个DataNode服务器上
4.namenode返回3个DataNode节点,用于存储数据
5.客户端请求建立block传输通道,请求dn1上传数据,dn1收到请求会调用dn2,然后dn2调用dn3,传输通道建立完成
6.dn1,dn2,dn3 逐级应答客户端
7.客户端开始往dn上传第一个block(先从磁盘读取数据到本地缓存),以packet为单位,dn1收到一个packet回传给dn2,dn2传给dn3;dn1每传一个packet会存入一个应答队列等待应答。
8.当一个block传输完成后,客户端再次请求namenode上传block的服务器,循环往复。
读
1.客户端请求下载文件,namenode查询元数据,找到文件所在的DataNode
2.挑选一台DataNode服务器(就近原则,然后随机),请求读取数据
3.DataNode开始传输数据给客户端(从磁盘里读取数据输入流,以packet为单位做校验)
4.客户端以packet为单位接收,先写在本地缓存,然后写入目标文件。
HDFS副本存放策略
机架感知机机制
第一个副本在 client 所处的节点上。如果客户端在集群外,随机选一个。
第二个副本在另一个机架的随机一个节点。
第三个副本在第二个副本所在机架的随机节点。
1. HDFS概述
在现代的企业环境中,单机容量往往无法存储大量数据,需要跨机器存储。统一管理分布在集群上的文件系统称为分布式文件系统。
HDFS(Hadoop Distributed File System)是 Hadoop 项目的一个子项目。是 Hadoop 的核心组件之一, Hadoop 非常适于存储大型数据 (比如 TB 和 PB),其就是使用 HDFS 作为存储系统. HDFS 使用多台计算机存储文件,并且提供统一的访问接口,像是访问一个普通文件系统一样使用分布式文件系统。
HDFS文件系统
2. HDFS架构
HDFS架构
HDFS是一个主/从(Mater/Slave)体系结构,由三部分组成: NameNode 和 DataNode 以及 SecondaryNamenode:
NameNode 负责管理整个文件系统的元数据,以及每一个路径(文件)所对应的数据块信息。
DataNode 负责管理用户的文件数据块,每一个数据块都可以在多个 DataNode 上存储多个副本,默认为3个。
Secondary NameNode 用来监控 HDFS 状态的辅助后台程序,每隔一段时间获取 HDFS 元数据的快照。最主要作用是辅助 NameNode 管理元数据信息。
3. HDFS的特性
首先,它是一个文件系统,用于存储文件,通过统一的命名空间目录树来定位文件;
其次,它是分布式的,由很多服务器联合起来实现其功能,集群中的服务器有各自的角色。
- master/slave 架构(主从架构)
HDFS 采用 master/slave 架构。一般一个 HDFS 集群是有一个 Namenode 和一定数目的 Datanode 组成。Namenode 是 HDFS 集群主节点,Datanode 是 HDFS 集群从节点,两种角色各司其职,共同协调完成分布式的文件存储服务。 - 分块存储
HDFS 中的文件在物理上是分块存储(block)的,块的大小可以通过配置参数来规定,默认大小在 hadoop2.x 版本中是 128M。 - 名字空间(NameSpace)
HDFS 支持传统的层次型文件组织结构。用户或者应用程序可以创建目录,然后将文件保存在这些目录里。文件系统名字空间的层次结构和大多数现有的文件系统类似:用户可以创建、删除、移动或重命名文件。
Namenode 负责维护文件系统的名字空间,任何对文件系统名字空间或属性的修改都将被 Namenode 记录下来。
HDFS 会给客户端提供一个统一的抽象目录树,客户端通过路径来访问文件,形如:hdfs://namenode:port/dir-a/dir-b/dir-c/file.data。 - NameNode 元数据管理
我们把目录结构及文件分块位置信息叫做元数据。NameNode 负责维护整个 HDFS 文件系统的目录树结构,以及每一个文件所对应的 block 块信息(block 的 id,及所在的 DataNode 服务器)。 - DataNode 数据存储
文件的各个 block 的具体存储管理由 DataNode 节点承担。每一个 block 都可以在多个 DataNode 上。DataNode 需要定时向 NameNode 汇报自己持有的 block 信息。 存储多个副本(副本数量也可以通过参数设置 dfs.replication,默认是 3) - 副本机制
为了容错,文件的所有 block 都会有副本。每个文件的 block 大小和副本系数都是可配置的。应用程序可以指定某个文件的副本数目。副本系数可以在文件创建的时候指定,也可以在之后改变。 - 一次写入,多次读出
HDFS 是设计成适应一次写入,多次读出的场景,且不支持文件的修改。 正因为如此,HDFS 适合用来做大数据分析的底层存储服务,并不适合用来做网盘等应用,因为修改不方便,延迟大,网络开销大,成本太高。
4. HDFS 的命令行使用
如果没有配置 hadoop 的环境变量,则在 hadoop 的安装目录下的bin目录中执行以下命令,如已配置 hadoop 环境变量,则可在任意目录下执行
help
格式: hdfs dfs -help 操作命令
作用: 查看某一个操作命令的参数信息
ls
格式:hdfs dfs -ls URI
作用:类似于Linux的ls命令,显示文件列表
lsr
格式 : hdfs dfs -lsr URI
作用 : 在整个目录下递归执行ls, 与UNIX中的ls-R类似
mkdir
格式 : hdfs dfs -mkdir [-p] <paths>
作用 : 以<paths>中的URI作为参数,创建目录。使用-p参数可以递归创建目录
put
格式 : hdfs dfs -put <localsrc > ... <dst>
作用 : 将单个的源文件src或者多个源文件srcs从本地文件系统拷贝到目标文件系统中(<dst>对应的路径)。也可以从标准输入中读取输入,写入目标文件系统中
hdfs dfs -put /rooot/bigdata.txt /dir1
moveFromLocal
格式: hdfs dfs -moveFromLocal <localsrc> <dst>
作用: 和put命令类似,但是源文件localsrc拷贝之后自身被删除
hdfs dfs -moveFromLocal /root/bigdata.txt /
copyFromLocal
格式: hdfs dfs -copyFromLocal <localsrc> ... <dst>
作用: 从本地文件系统中拷贝文件到hdfs路径去
appendToFile
格式: hdfs dfs -appendToFile <localsrc> ... <dst>
作用: 追加一个或者多个文件到hdfs指定文件中.也可以从命令行读取输入.
hdfs dfs -appendToFile a.xml b.xml /big.xml
moveToLocal
在 hadoop 2.6.4 版本测试还未未实现此方法
格式:hadoop dfs -moveToLocal [-crc] <src> <dst>
作用:将本地文件剪切到 HDFS
get
格式 hdfs dfs -get [-ignorecrc ] [-crc] <src> <localdst>
作用:将文件拷贝到本地文件系统。 CRC 校验失败的文件通过-ignorecrc选项拷贝。 文件和CRC校验可以通过-CRC选项拷贝
hdfs dfs -get /bigdata.txt /export/servers
getmerge
格式: hdfs dfs -getmerge <src> <localdst>
作用: 合并下载多个文件,比如hdfs的目录 /aaa/下有多个文件:log.1, log.2,log.3,...
copyToLocal
格式: hdfs dfs -copyToLocal <src> ... <localdst>
作用: 从hdfs拷贝到本地
mv
格式 : hdfs dfs -mv URI <dest>
作用: 将hdfs上的文件从原路径移动到目标路径(移动之后文件删除),该命令不能跨文件系统
hdfs dfs -mv /dir1/bigdata.txt /dir2
rm
格式: hdfs dfs -rm [-r] 【-skipTrash】 URI 【URI 。。。】
作用: 删除参数指定的文件,参数可以有多个。 此命令只删除文件和非空目录。
如果指定-skipTrash选项,那么在回收站可用的情况下,该选项将跳过回收站而直接删除文件;
否则,在回收站可用时,在HDFS Shell 中执行此命令,会将文件暂时放到回收站中。
hdfs dfs -rm -r /dir1
cp
格式: hdfs dfs -cp URI [URI ...] <dest>
作用: 将文件拷贝到目标路径中。如果<dest> 为目录的话,可以将多个文件拷贝到该目录下。
-f
选项将覆盖目标,如果它已经存在。
-p
选项将保留文件属性(时间戳、所有权、许可、ACL、XAttr)。
hdfs dfs -cp /dir1/a.txt /dir2/bigdata.txt
cat
hdfs dfs -cat URI [uri ...]
作用:将参数所指示的文件内容输出到stdout
hdfs dfs -cat /bigdata.txt
tail
格式: hdfs dfs -tail path
作用: 显示一个文件的末尾
text
格式:hdfs dfs -text path
作用: 以字符形式打印一个文件的内容
chmod
格式:hdfs dfs -chmod [-R] URI[URI ...]
作用:改变文件权限。如果使用 -R 选项,则对整个目录有效递归执行。使用这一命令的用户必须是文件的所属用户,或者超级用户。
hdfs dfs -chmod -R 777 /bigdata.txt
chown
格式: hdfs dfs -chmod [-R] URI[URI ...]
作用: 改变文件的所属用户和用户组。如果使用 -R 选项,则对整个目录有效递归执行。使用这一命令的用户必须是文件的所属用户,或者超级用户。
hdfs dfs -chown -R hadoop:hadoop /bigdata.txt
df
格式: hdfs dfs -df -h path
作用: 统计文件系统的可用空间信息
du
格式: hdfs dfs -du -s -h path
作用: 统计文件夹的大小信息
count
格式: hdfs dfs -count path
作用: 统计一个指定目录下的文件节点数量
setrep
格式: hdfs dfs -setrep num filePath
作用: 设置hdfs中文件的副本数量
注意: 即使设置的超过了datanode的数量,副本的数量也最多只能和datanode的数量是一致的
expunge (慎用)
格式: hdfs dfs -expunge
作用: 清空hdfs垃圾桶
5. hdfs的高级使用命令
5.1 HDFS文件限额配置
在多人共用HDFS的环境下,配置设置非常重要。特别是在 Hadoop 处理大量资料的环境,如果没有配额管理,很容易把所有的空间用完造成别人无法存取。HDFS 的配额设定是针对目录而不是针对账号,可以让每个账号仅操作某一个目录,然后对目录设置配置。
HDFS 文件的限额配置允许我们以文件个数,或者文件大小来限制我们在某个目录下上传的文件数量或者文件内容总量,以便达到我们类似百度网盘网盘等限制每个用户允许上传的最大的文件的量。
hdfs dfs -count -q -h /user/root/dir1 #查看配额信息
结果:
5.1.1 数量限额
hdfs dfs -mkdir -p /user/root/dir #创建hdfs文件夹
hdfs dfsadmin -setQuota 2 dir # 给该文件夹下面设置最多上传两个文件,发现只能上传一个文件
hdfs dfsadmin -clrQuota /user/root/dir # 清除文件数量限制
5.1.2 空间大小限额
在设置空间配额时,设置的空间至少是 block_size * 3 大小
hdfs dfsadmin -setSpaceQuota 4k /user/root/dir # 限制空间大小4KB
hdfs dfs -put /root/a.txt /user/root/dir
生成任意大小文件的命令:
dd if=/dev/zero of=1.txt bs=1M count=2 #生成2M的文件
清除空间配额限制
hdfs dfsadmin -clrSpaceQuota /user/root/dir
5.2 HDFS 的安全模式
安全模式是hadoop的一种保护机制,用于保证集群中的数据块的安全性。当集群启动的时候,会首先进入安全模式。当系统处于安全模式时会检查数据块的完整性。
假设我们设置的副本数(即参数dfs.replication)是3,那么在datanode上就应该有3个副本存在,假设只存在2个副本,那么比例就是2/3=0.666。hdfs默认的副本率0.999。我们的副本率0.666明显小于0.999,因此系统会自动的复制副本到其他dataNode,使得副本率不小于0.999。如果系统中有5个副本,超过我们设定的3个副本,那么系统也会删除多于的2个副本。
在安全模式状态下,文件系统只接受读数据请求,而不接受删除、修改等变更请求。在,当整个系统达到安全标准时,HDFS自动离开安全模式。30s
安全模式操作命令
hdfs dfsadmin -safemode get #查看安全模式状态
hdfs dfsadmin -safemode enter #进入安全模式
hdfs dfsadmin -safemode leave #离开安全模式
7. HDFS 文件写入过程(非常重要)
HDFS 文件写入过程
1.Client 发起文件上传请求,通过 RPC 与 NameNode 建立通讯, NameNode 检查目标文件是否已存在,父目录是否存在,返回是否可以上传;
2.Client 请求第一个 block 该传输到哪些 DataNode 服务器上;
3.NameNode 根据配置文件中指定的备份数量及机架感知原理进行文件分配, 返回可用的 DataNode 的地址如:A, B, C;
Hadoop 在设计时考虑到数据的安全与高效, 数据文件默认在 HDFS 上存放三份, 存储策略为本地一份,同机架内其它某一节点上一份,不同机架的某一节点上一份。
4.Client 请求 3 台 DataNode 中的一台 A 上传数据(本质上是一个 RPC 调用,建立 pipeline ),A 收到请求会继续调用 B,然后 B 调用 C,将整个 pipeline 建立完成, 后逐级返回 client;
5.Client 开始往 A 上传第一个 block(先从磁盘读取数据放到一个本地内存缓存),以 packet 为单位(默认64K),A 收到一个 packet 就会传给 B,B 传给 C。A 每传一个 packet 会放入一个应答队列等待应答;
6.数据被分割成一个个 packet 数据包在 pipeline 上依次传输,在 pipeline 反方向上, 逐个发送 ack(命令正确应答),最终由 pipeline 中第一个 DataNode 节点 A 将 pipelineack 发送给 Client;
7.当一个 block 传输完成之后,Client 再次请求 NameNode 上传第二个 block,重复步骤 2;
7.2 机架感知(副本节点选择)
1.低版本Hadoop副本节点选择
第一个副本在client所处的节点上。如果客户端在集群外,随机选一个。
第二个副本和第一个副本位于不相同机架的随机节点上。
第三个副本和第二个副本位于相同机架,节点随机。
机架感知
8.HDFS 文件读取过程(非常重要)
HDFS 文件读取过程
1.Client向NameNode发起RPC请求,来确定请求文件block所在的位置;
2.NameNode会视情况返回文件的部分或者全部block列表,对于每个block,NameNode 都会返回含有该 block 副本的 DataNode 地址; 这些返回的 DN 地址,会按照集群拓扑结构得出 DataNode 与客户端的距离,然后进行排序,排序两个规则:网络拓扑结构中距离 Client 近的排靠前;心跳机制中超时汇报的 DN 状态为 STALE,这样的排靠后;
3.Client 选取排序靠前的 DataNode 来读取 block,如果客户端本身就是DataNode,那么将从本地直接获取数据(短路读取特性);
4.底层上本质是建立 Socket Stream(FSDataInputStream),重复的调用父类 DataInputStream 的 read 方法,直到这个块上的数据读取完毕;
5.当读完列表的 block 后,若文件读取还没有结束,客户端会继续向NameNode 获取下一批的 block 列表;
6.读取完一个 block 都会进行 checksum 验证,如果读取 DataNode 时出现错误,客户端会通知 NameNode,然后再从下一个拥有该 block 副本的DataNode 继续读。
7.read 方法是并行的读取 block 信息,不是一块一块的读取;NameNode 只是返回Client请求包含块的DataNode地址,并不是返回请求块的数据;
8.最终读取来所有的 block 会合并成一个完整的最终文件。
从 HDFS 文件读写过程中,可以看出,HDFS 文件写入时是串行写入的,数据包先发送给节点A,然后节点A发送给B,B在给C;而HDFS文件读取是并行的, 客户端 Client 直接并行读取block所在的节点。
以下是 HDFS 文件读取过程 的简化记忆版,用 关键词 + 流程 的形式帮助你快速掌握:
HDFS 文件读取流程(好记版)
-
问路
• Client 问 NameNode:“文件在哪?”
• NameNode 回答:“文件被切成 block,地址是这些 DataNode。” -
排序
• DataNode 地址按规则排序:
◦ 近的优先(离 Client 近的靠前)。
◦ 健康的优先(心跳异常的靠后)。 -
取货
• Client 找最近的 DataNode 拿 block。
• 如果 Client 自己是 DataNode,直接从本地拿(短路读取)。 -
开箱验货
• 每读一个 block,检查 checksum,确保数据没问题。 -
换地方
• 如果某个 DataNode 坏了,Client 换下一个有副本的 DataNode 继续读。 -
并行干活
• Client 同时从多个 DataNode 拿 block,速度快。 -
拼图
• 所有 block 读完,拼成一个完整的文件。
记忆口诀
- 问路 → Client 问 NameNode。
- 排序 → 近的、健康的优先。
- 取货 → 找最近的 DataNode 拿数据。
- 验货 → 检查 checksum。
- 换地方 → 坏了就换下一个。
- 并行干 → 同时拿多个 block。
- 拼图 → 拼成完整文件。
核心关键词
• NameNode:文件地图。
• DataNode:数据仓库。
• 并行读取:多线程干活。
• 容错机制:坏了就换。
通过这种简洁的方式,你可以轻松记住 HDFS 文件读取的整个过程!
9. NameNode 工作机制以及元数据管理(重要)
NameNode 工作机制
9.1 namenode 与 datanode 启动
-namenode工作机制
1.第一次启动namenode格式化后,创建fsimage和edits文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。
2.客户端对元数据进行增删改的请求。
3.namenode记录操作日志,更新滚动日志。
4.namenode在内存中对数据进行增删改查。
-secondary namenode
1.secondary namenode询问 namenode 是否需要 checkpoint。直接带回 namenode 是否检查结果。
2.secondary namenode 请求执行 checkpoint。
3.namenode 滚动正在写的edits日志。
4.将滚动前的编辑日志和镜像文件拷贝到 secondary namenode。
5.secondary namenode 加载编辑日志和镜像文件到内存,并合并。
6.生成新的镜像文件 fsimage.chkpoint。
7.拷贝 fsimage.chkpoint 到 namenode。
8.namenode将 fsimage.chkpoint 重新命名成fsimage。
9.2 FSImage与edits详解
所有的元数据信息都保存在了FsImage与Eidts文件当中,这两个文件就记录了所有的数据的元数据信息,元数据信息的保存目录配置在了 hdfs-site.xml 当中
<!--fsimage文件存储的路径-->
<property>
<name>dfs.namenode.name.dir</name>
<value>file:///opt/hadoop-2.6.0-cdh5.14.0/hadoopDatas/namenodeDatas</value>
</property>
<!-- edits文件存储的路径 -->
<property>
<name>dfs.namenode.edits.dir</name>
<value>file:///opt/hadoop-2.6.0-cdh5.14.0/hadoopDatas/dfs/nn/edits</value>
</property>
客户端对hdfs进行写文件时会首先被记录在edits文件中。
edits修改时元数据也会更新。
每次hdfs更新时edits先更新后客户端才会看到最新信息。
fsimage:是namenode中关于元数据的镜像,一般称为检查点。
一般开始时对namenode的操作都放在edits中,为什么不放在fsimage中呢?
因为fsimage是namenode的完整的镜像,内容很大,如果每次都加载到内存的话生成树状拓扑结构,这是非常耗内存和CPU。
fsimage内容包含了namenode管理下的所有datanode中文件及文件block及block所在的datanode的元数据信息。随着edits内容增大,就需要在一定时间点和fsimage合并。
9.3 FSimage文件当中的文件信息查看
使用命令 hdfs oiv
cd /opt/hadoop-2.6.0-cdh5.14.0/hadoopDatas/namenodeDatas/current
hdfs oiv -i fsimage_0000000000000000112 -p XML -o hello.xml
9.4 edits当中的文件信息查看
查看命令 hdfs oev
cd /opt/hadoop-2.6.0-cdh5.14.0/hadoopDatas/dfs/nn/edits
hdfs oev -i edits_0000000000000000112-0000000000000000113 -o myedit.xml -p XML
9.5 secondarynameNode如何辅助管理FSImage与Edits文件
1.secnonaryNN通知NameNode切换editlog。
2.secondaryNN从NameNode中获得FSImage和editlog(通过http方式)。
3.secondaryNN将FSImage载入内存,然后开始合并editlog,合并之后成为新的fsimage。
4.secondaryNN将新的fsimage发回给NameNode。
5.NameNode用新的fsimage替换旧的fsimage。
完成合并的是 secondarynamenode,会请求namenode停止使用edits,暂时将新写操作放入一个新的文件中(edits.new)。
secondarynamenode从namenode中通过http get获得edits,因为要和fsimage合并,所以也是通过http get 的方式把fsimage加载到内存,然后逐一执行具体对文件系统的操作,与fsimage合并,生成新的fsimage,然后把fsimage发送给namenode,通过http post的方式。
namenode从secondarynamenode获得了fsimage后会把原有的fsimage替换为新的fsimage,把edits.new变成edits。同时会更新fsimage。
hadoop进入安全模式时需要管理员使用dfsadmin的save namespace来创建新的检查点。
secondarynamenode在合并edits和fsimage时需要消耗的内存和namenode差不多,所以一般把namenode和secondarynamenode放在不同的机器上。
fsimage与edits的合并时机取决于两个参数,第一个参数是默认1小时fsimage与edits合并一次。
第一个参数:时间达到一个小时fsimage与edits就会进行合并
dfs.namenode.checkpoint.period 3600
第二个参数:hdfs操作达到1000000次也会进行合并
dfs.namenode.checkpoint.txns 1000000
第三个参数:每隔多长时间检查一次hdfs的操作次数
dfs.namenode.checkpoint.check.period 60
9.6 namenode元数据信息多目录配置
为了保证元数据的安全性,我们一般都是先确定好我们的磁盘挂载目录,将元数据的磁盘做RAID1
namenode的本地目录可以配置成多个,且每个目录存放内容相同,增加了可靠性。
具体配置方案:
hdfs-site.xml
<property>
<name>dfs.namenode.name.dir</name>
<value>file:///export/servers/hadoop-2.6.0-cdh5.14.0/hadoopDatas/namenodeDatas</value>
</property>
9.7 namenode故障恢复
在我们的secondaryNamenode对namenode当中的fsimage和edits进行合并的时候,每次都会先将namenode的fsimage与edits文件拷贝一份过来,所以fsimage与edits文件在secondarNamendoe当中也会保存有一份,如果namenode的fsimage与edits文件损坏,那么我们可以将secondaryNamenode当中的fsimage与edits拷贝过去给namenode继续使用,只不过有可能会丢失一部分数据。这里涉及到几个配置选项
namenode保存fsimage的配置路径
<!-- namenode元数据存储路径,实际工作当中一般使用SSD固态硬盘,并使用多个固态硬盘隔开,冗余元数据 -->
<property>
<name>dfs.namenode.name.dir</name>
<value>file:///export/servers/hadoop-2.6.0-cdh5.14.0/hadoopDatas/namenodeDatas</value>
</property>
namenode保存edits文件的配置路径
<property>
<name>dfs.namenode.edits.dir</name>
<value>file:///export/servers/hadoop-2.6.0-cdh5.14.0/hadoopDatas/dfs/nn/edits</value>
</property>
secondaryNamenode保存fsimage文件的配置路径
<property>
<name>dfs.namenode.checkpoint.dir</name>
<value>file:///export/servers/hadoop-2.6.0-cdh5.14.0/hadoopDatas/dfs/snn/name</value>
</property>
secondaryNamenode保存edits文件的配置路径
<property>
<name>dfs.namenode.checkpoint.edits.dir</name>
<value>file:///export/servers/hadoop-2.6.0-cdh5.14.0/hadoopDatas/dfs/nn/snn/edits</value>
</property>
接下来我们来模拟namenode的故障恢复功能
1.杀死namenode进程: 使用jps查看namenode的进程号 , kill -9 直接杀死。
2.删除namenode的fsimage文件和edits文件。
根据上述配置, 找到namenode放置fsimage和edits路径. 直接全部rm -rf 删除。
3.拷贝secondaryNamenode的fsimage与edits文件到namenode的fsimage与edits文件夹下面去。
根据上述配置, 找到secondaryNamenode的fsimage和edits路径, 将内容 使用cp -r 全部复制到namenode对应的目录下即可。
4.重新启动namenode, 观察数据是否存在。
10. datanode工作机制以及数据存储
datanode工作机制
1.一个数据块在datanode上以文件形式存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳。
2.DataNode启动后向namenode注册,通过后,周期性(1小时)的向namenode上报所有的块信息。(dfs.blockreport.intervalMsec)。
3.心跳是每3秒一次,心跳返回结果带有namenode给该datanode的命令如复制块数据到另一台机器,或删除某个数据块。如果超过10分钟没有收到某个datanode的心跳,则认为该节点不可用。
4.集群运行中可以安全加入和退出一些机器。
数据完整性
1.当DataNode读取block的时候,它会计算checksum。
2.如果计算后的checksum,与block创建时值不一样,说明block已经损坏。
3.client读取其他DataNode上的block。
4.datanode在其文件创建后周期验证checksum。
以下是 DataNode 工作机制 的简化记忆版,用 关键词 + 流程 的形式帮助你快速掌握:
DataNode 工作机制(好记版)
-
存数据
• 数据块在 DataNode 上存为两个文件:
◦ 数据文件:存储实际数据。
◦ 元数据文件:存储块长度、校验和、时间戳等信息。 -
报到
• DataNode 启动后向 NameNode 注册,注册成功后:
◦ 周期性上报:每 1 小时上报一次所有块信息(dfs.blockreport.intervalMsec
)。
◦ 心跳机制:每 3 秒发一次心跳,NameNode 通过心跳返回指令(如复制块、删除块)。
◦ 超时判定:如果 10 分钟没收到心跳,认为该 DataNode 不可用。 -
动态调整
• 集群运行中,可以安全地加入或退出 DataNode,不影响整体服务。
记忆口诀
- 存数据 → 数据块存两份:数据 + 元数据。
- 报到 → 注册后,1 小时报一次,3 秒心跳一次。
- 听指令 → 心跳带回 NameNode 的命令。
- 超时挂 → 10 分钟没心跳,判定不可用。
- 动态调 → 随时加机器,随时退机器。
核心关键词
• 数据块存储:数据文件 + 元数据文件。
• 注册与上报:启动注册,周期性上报。
• 心跳机制:3 秒一次,带回指令。
• 超时判定:10 分钟没心跳,节点不可用。
• 动态调整:集群可动态增减 DataNode。
通过这种简洁的方式,你可以轻松记住 DataNode 的工作机制!
YARN
YARN的3种调度
先进先出器
单队列,根据提交作业的先后顺序,先来先服务。
优点是简单易懂;缺点是不支持多队列,生产环境很少使用。
容器调度器
多队列:每个队列可配置一定的资源,每个队列采用FIFO调度策略。
容量保证:可为每个队列设置资源最低保证和资源使用上限。
灵活性:如果一个队列中的资源有剩余,可以暂时共享给需要资源的队列,而一旦该队列有新的应用提交,则其它队列借调的资源需要归还。
多用户:支持多用户共享集群:为了防止同一用户作业独占队列资源,容量调度器会对同一用户提交作业占用资源进行限制。
公平调度器(常用)
具有与容量调度器相同的四个特征:多队列、容量保证、灵活性、多用户
动态调整Job的资源分配,大作业运行时,如果有小作业提交,大作业可以释放部分占有的资源给小作业。
公平调度器可以短的作业在合理的时间完成,不必一直等待长作业的完成。
1. yarn的架构和原理
1.1 yarn的基本介绍和产生背景
YARN是Hadoop2引入的通用的资源管理和任务调度的平台,可以在YARN上运行MapReduce、Tez、Spark等多种计算框架,只要计算框架实现了YARN所定义的接口,都可以运行在这套通用的Hadoop资源管理和任务调度平台上。
Hadoop 1.0是由HDFS和MapReduce V1组成的,YARN出现之前是MapReduce V1来负责资源管理和任务调度,MapReduce V1由JobTracker和TaskTracker两部分组成。
MapReduce V1有如下缺点:
1.扩展性差:
在MapReduce V1中,JobTracker同时负责资源管理和任务调度,而JobTracker只有一个节点,所以JobTracker成为了制约系统性能的一个瓶颈,制约了Hadoop平台的扩展性。
2.可靠性低:
MapReduce V1中JobTracker存在单点故障问题,所以可靠性低。
3.资源利用率低:
MapReduce V1采用了基于槽位的资源分配模型,槽位是一种粗粒度的资源划分单位。
一是通常情况下为一个job分配的槽位不会被全部利用。
二是一个MapReduce任务的Map阶段和Reduce阶段会划分了固定的槽位,并且不可以共用,很多时候一种类型的槽位资源很紧张而另外一种类型的槽位很空闲,导致资源利用率低。
4.不支持多种计算框架
MapReduce V1这种资源管理和任务调度方式只适合MapReduce这种计算框架,而MapReduce这种离线计算框架很多时候不能满足应用需求。
yarn的优点:
1.支持多种计算框架
YARN是通用的资源管理和任务调度平台,只要实现了YARN的接口的计算框架都可以运行在YARN上。
2.资源利用率高
多种计算框架可以共用一套集群资源,让资源充分利用起来,提高了利用率。
3.运维成本低
避免一个框架一个集群的模式,YARN降低了集群的运维成本。
4.数据可共享
共享集群模式可以让多种框架共享数据和硬件资源,减少数据移动带来的成本。
1.2 hadoop 1.0 和 hadoop 2.0 的区别
1.组成部分
Hadoop1.0由HDFS和MapReduce组成,Hadoop2.0由HDFS和YARN组成。
2.HDFS可扩展性
Hadoop1.0中的HDFS只有一个NameNode,制约着集群文件个数的增长,Hadoop2.0增加了HDFS联盟的架构,可以将NameNode所管理的NameSpace水平划分,增加了HDFS的可扩展性。
3.HDFS的可靠性
Hadoop1.0中的HDFS只有一个NameNode,存在着单点故障的问题,Hadoop2.0提供了HA的架构,可以实现NameNode的热备份和热故障转移,提高了HDFS的可靠性。
4.可支持的计算框架
Hadoop1.0中只支持MapReduce一种计算框架,Hadoop2.0因为引入的YARN这个通用的资源管理与任务调度平台,可以支持很多计算框架了。
5.资源管理和任务调度
Hadoop1.0中资源管理和任务调度依赖于MapReduce中的JobTracker,JobTracker工作很繁重,很多时候会制约集群的性能。
Hadoop2.0中将资源管理任务分给了YARN的ResourceManage,将任务调度分给了YARN的ApplicationMaster。
1.3 yarn 集群的架构和工作原理
YARN的基本设计思想是将MapReduce V1中的JobTracker拆分为两个独立的服务:ResourceManager和ApplicationMaster。ResourceManager负责整个系统的资源管理和分配,ApplicationMaster负责单个应用程序的的管理。
1.ResourceManager
RM是一个全局的资源管理器,负责整个系统的资源管理和分配,它主要由两个部分组成:调度器(Scheduler)和应用程序管理器(Application Manager)。
调度器根据容量、队列等限制条件,将系统中的资源分配给正在运行的应用程序,在保证容量、公平性和服务等级的前提下,优化集群资源利用率,让所有的资源都被充分利用 。
应用程序管理器负责管理整个系统中的所有的应用程序,包括应用程序的提交、与调度器协商资源以启动ApplicationMaster、监控ApplicationMaster运行状态并在失败时重启它。
2.ApplicationMaster
用户提交的一个应用程序会对应于一个ApplicationMaster,它的主要功能有:
与RM调度器协商以获得资源,资源以Container表示。
将得到的任务进一步分配给内部的任务。
与NM通信以启动/停止任务。
监控所有的内部任务状态,并在任务运行失败的时候重新为任务申请资源以重启任务。
3.nodeManager
NodeManager是每个节点上的资源和任务管理器,一方面,它会定期地向RM汇报本节点上的资源使用情况和各个Container的运行状态;另一方面,他接收并处理来自AM的Container启动和停止请求。
4.container
Container是YARN中的资源抽象,封装了各种资源。一个应用程序会分配一个Container,这个应用程序只能使用这个Container中描述的资源。
不同于MapReduceV1中槽位slot的资源封装,Container是一个动态资源的划分单位,更能充分利用资源。
1.4 yarn 的任务提交流程
当jobclient向YARN提交一个应用程序后,YARN将分两个阶段运行这个应用程序:一是启动ApplicationMaster;第二个阶段是由ApplicationMaster创建应用程序,为它申请资源,监控运行直到结束。
具体步骤如下:
1.用户向YARN提交一个应用程序,并指定ApplicationMaster程序、启动ApplicationMaster的命令、用户程序。
2.RM为这个应用程序分配第一个Container,并与之对应的NM通讯,要求它在这个Container中启动应用程序ApplicationMaster。
3.ApplicationMaster向RM注册,然后拆分为内部各个子任务,为各个内部任务申请资源,并监控这些任务的运行,直到结束。
4.AM采用轮询的方式向RM申请和领取资源。
5.RM为AM分配资源,以Container形式返回
6.AM申请到资源后,便与之对应的NM通讯,要求NM启动任务。
7.NodeManager为任务设置好运行环境,将任务启动命令写到一个脚本中,并通过运行这个脚本启动任务
8.各个任务向AM汇报自己的状态和进度,以便当任务失败时可以重启任务。
9.应用程序完成后,ApplicationMaster向ResourceManager注销并关闭自己
2. RM和NM的功能介绍
2.1 resourceManager基本介绍
ResourceManager负责集群中所有资源的统一管理和分配,它接收来自各个NodeManager的资源汇报信息,并把这些信息按照一定的策略分配给各个ApplicationMaster。
2.1.1 RM的职能
1.与客户端交互,处理客户端的请求。
2.启动和管理AM,并在它运行失败时候重新启动它。
3.管理NM,接收来自于NM的资源汇报信息,并向NM下达管理指令。
4.资源管理和调度,接收来自于AM的资源请求,并为它分配资源。
2.1.2 RM 的内部结构
用户交互模块:
1.clientRMService : 为普通用户服务,处理请求,如:提交应用程序、终止程序、获取程序状态
2.adminService : 给管理员提供的服务。普通用户交互模块是ClientRMService,管理员交互模块是AdminService,之所以要将两个模块分开,用不同的通信通道发送给ResourceManager,是因为要避免普通用户的请求过多导致管理员请求被阻塞
3.WebApp : 更友好的展示集群资源和程序运行状态
NM管理模块:
1.NMLivelinessMonitor : 监控NM是否活着,如果指定时间内未收到心跳,就从集群中移除。RM会通过心跳告诉AM某个NM上的Container失效,如果Am判断需要重新执行,则AM重新向RM申请资源。
2.NodesListManager : 维护inlude(正常)和exlude(异常)的NM节点列表。默认情况下,两个列表都为空,可以由管理员添加节点。exlude列表里的NM不允许与RM进行通信。
3.ResourceTrackerService : 处理来自NM的请求,包括注册和心跳。注册是NM启动时的操作,包括节点ID和可用资源上线等。心跳包括各个Container运行状态,运行Application列表、节点健康状态
AM管理模块:
1.AMLivelinessMonitor : 监控AM是否还活着,如果指定时间内没有接受到心跳,则将正在运行的Container置为失败状态,而AM会被重新分配到另一个节点上
2.ApplicationMasterLauncher: 要求某一个NM启动ApplicationMaster,它处理创建AM的请求和kill AM的请求
3.ApplicationMasterService : 处理来自AM的请求,包括注册、心跳、清理。注册是在AM启动时发送给ApplicationMasterService的;心跳是周期性的,包括请求资源的类型、待释放的Container列表;清理是程序结束后发送给RM,以回收资源清理内存空间;
Application管理模块:
1.ApplicationACLLsManager : 管理应用程序的访问权限,分为查看权限和修改权限。
2.RMAppManager : 管理应用程序的启动和关闭
3.ContainerAllocationExpirer : RM分配Container给AM后,不允许AM长时间不对Container使用,因为会降低集群的利用率,如果超时(时间可以设置)还没有在NM上启动Container,RM就强制回收Container。
状态机管理模块:
1.RMApp : RMApp维护一个应用程序的的整个运行周期,一个应用程序可能有多个实例,RMApp维护的是所有实例的
2.RMAppAttempt : RMAppAttempt维护一个应用程序实例的一次尝试的整个生命周期
3.RMContainer : RMContainer维护一个Container的整个运行周期(可能和任务的周期不一致)
4.RMNode : RMNode维护一个NodeManager的生命周期,包括启动到运行结束的整个过程。
安全模块:
RM自带了全面的权限管理机制。主要由ClientToAMSecretManager、ContainerTokenSecretManager、ApplicationTokenSecretManager等模块组成。
资源分配模块:
ResourceScheduler:ResourceScheduler是资源调度器,他按照一定的约束条件将资源分配给各个应用程序。RM自带了一个批处理资源调度器(FIFO)和两个多用户调度器Fair Scheduler 和Capacity Scheduler
2.1.3 启动ApplicationMaster
1.客户端提交一个任务给RM,ClientRMService负责处理客户端请求
2.ClentRMService通知RMAppManager。
3.RMAppManager为应用程序创建一个RMApp对象来维护任务的状态。
4.RMApp启动任务,创建RMAppAttempt对象。
5.RMAppAttempt进行一些初始化工作,然后通知ResourceScheduler申请资源。
6.ResourceScheduler为任务分配资源后,创建一个RMContainer维护Container状态
7.并通知RMAppAttempt,已经分配资源。
8.RMAppAttempt通知ApplicationMasterLauncher在资源上启动AM。
9.在NodeManager的已分配资源上启动AM
10.AM启动后向ApplicationMasterService注册。
2.1.4 申请和分配container
AM向RM请求资源和RM为AM分配资源是两个阶段的循环过程:
阶段一:AM请求资源请求并领取资源的过程,这个过程是AM发送请求、RM记录请求。
阶段二:NM向RM汇报各个Container运行状态,如果RM发现它上面有空闲的资源就分配给等待的AM。
具体过程如下:
阶段一:
1.AM通过RPC函数向RM发送资源需求信息,包括新的资源需求描述、待释放的Container列表、请求加入黑名单的节点列表、请求移除黑名单的节点列表等
2.RM的ApplicationMasterService负责处理AM的请求。一旦收到请求,就通知RMAppAttempt,更新应用程序执行进度,在AMLivenessMonitor中记录更新时间。
3.ApplicationMasterService调用ResourceScheduler,将AM的资源需求汇报给ResourceScheduler。
4.ResouceScheduler首先读取待释放的Container列表,通知RMContainer更改状态,杀死要释放的Container,然后将新的资源需求记录,如果资源足够就记录已经分配好资源。
阶段二:
1.NM通过RPC向RM汇报各自的各个Container的运行情况
2.RM的ResourceTrackerService负责处理来自NM的汇报,收到汇报后,就通知RMNode更改Container状态,并通知ResourceScheduler。
3.ResourceScheduler收到通知后,如果有可分配的空闲资源,就将资源分配给等待资源的AM,等待AM下次心跳将资源领取走。
2.1.5 杀死 application
杀死Application流程:
Kill Job通常是客户端发起的,RM的ClientRMService负责处理请求,接收到请求后,先检查权限,确保用户有权限Kill Job,然后通知维护这个Application的RMApp对象,根据Application当前状态调用相应的函数来处理。
这个时候分为两种情况:Application没有在运行、Application正在运行。
1.Application没有在运行
向已经运行过的NodeManger节点对应的状态维护对象RMNode发送通知,进行清理;向RMAppManager发送通知,将Application设置为已完成状态。
2.Application正在运行
如果正在运行,也首先像情况一处理一遍,回收运行过的NodeManager资源,将Application设置为已完成。另外RMApp还要通知维护任务状态的RMAppAttempt对象,将已经申请和占用的资源回收,但是真正的回收是由资源调度器ResourceScheduler异步完成的。
异步完成的步骤是先由ApplicationMasterLauncher杀死AM,并回收它占用的资源,再由各个已经启动的RMContainer杀死Container并回收资源。
2.1.6 Container超时
YARN里有两种Container:运行AM的Container和运行普通任务的Container。
1.RM为要启动的AM分配Container后,会监控Container的状态,如果指定时间内AM还没有在Container上启动的话,Container就会被回收,AM Container超时会导致Application执行失败。
2.普通Container超时会进行资源回收,但是YARN不会自动在其他资源上重试,而是通知AM,由AM决定是否重试。
2.1.7 安全管理
Hadoop的安全管理是为了更好地让多用户在共享Hadoop集群环境下安全高效地使用集群资源。系统安全机制由认证和授权两大部分构成,Hadoop2.0中的认证机制采用Kerberos和Token两种方案,而授权则是通过引入访问控制表(Access Control List,ACL)实现的。
1.术语
Kerberos是一种基于第三方服务的认证协议,非常安全。特点是用户只需要输入一次身份验证信息就可以凭借此验证获得的票据访问多个服务。
Token是一种基于共享密钥的双方身份认证机制。
Principal是指集群中被认证或授权的主体,主要包括用户、Hadoop服务、Container、Application、Localizer、Shuffle Data等。
2.Hadoop认证机制
Hadoop同时采用了Kerberos和Token两种技术,服务和服务之间的认证采用了Kerberos,用户和NameNode及用户和ResourceManager首次通讯也采用Kerberos认证,用户和服务之间一旦建立连接后,用户就可以从服务端获取一个Token,之后就可以使用Token认证通讯了。因为Token认证要比Kerberos要高效。
Hadoop里Kerberos认证默认是关闭的,可以通过参数hadoop.security.authentication设置为kerberos,这个配置模式是simple。
3.Hadoop授权机制
Hadoop授权是通过访问控制列表(ACL)实现的,Hadoop的访问控制机制与UNIX的POSIX风格的访问控制机制是一致的,将权限授予对象分为:用户、同组用户、其他用户。默认情况下,Hadoop公用UNIX/Linux下的用户和用户组。
队列访问控制列表
应用程序访问控制列表
服务访问控制列表
2.2 nodeManager功能介绍
NM是单个节点上的代理,功能包括与ResourceManager保持通讯、管理Container的生命周期、监控Container的资源使用、追踪节点健康状态、管理日志。
2.2.1 基本内部构造
2.2.2 状态机管理
NodeManager维护着三类状态机,分别是Application、Container、LocalizedResource。
1.Application状态机
RM上有一个整个集群上Application信息列表,而一个NM上也有一个处在它自己节点的Application的信息列表,NodeManager上的Application状态机维护着NodeManager上Application的状态。
这有利于对一个NM节点上的同一个Application所有的Container进行统一管理。
2.Container状态机
Container状态机维护NodeManager上所有Container的生命周期。
3.LocalizedResource状态机
LocalizedResource状态是NodeManager上用于维护一个资源生命周期的数据结构。资源包括文件、JAR包等。
2.2.3 container生命周期的管理
NodeManager中的ContainerManager负责接收AM发来的请求以启动Container,Container的启动过程分三个阶段:资源本地化、启动并运行Container、资源清理。
1.资源本地化
资源本地化主要是进行分布是缓存工作,分为应用程序初始化和Container本地化。
2.运行Container
Container运行是由ContainerLauncher服务完成启动后,调用ContainerExecutor来进行的。主要流程为:将待运行的Container所需要的环境变量和运行命令写到Shell脚本launch_container.sh中,并将启动该脚本的命令写入default_container_executor.sh中,然后通过运行该脚本启动container。
3.资源清理
container清理是资源本地化的逆过程,是指当container运行完成后,NodeManager来回收资源。
3. yarn的applicationMaster介绍
ApplicationMaster实际上是特定计算框架的一个实例,每种计算框架都有自己独特的ApplicationMaster,负责与ResourceManager协商资源,并和NodeManager协同来执行和监控Container。MapReduce只是可以运行在YARN上一种计算框架。
3.1 applicationMaster的职能
Application启动后,将负责以下任务:
1.初始化向ResourceManager报告自己的活跃信息的进程 (注册)
2.计算应用程序的的资源需求。
3.将需求转换为YARN调度器可以理解的ResourceRequest。
4.与调度器协商申请资源
5.与NodeManager协同合作使用分配的Container。
6.跟踪正在运行的Container状态,监控它的运行。
7.对Container或者节点失败的情况进行处理,在必要的情况下重新申请资源。
3.2 报告活跃
1.注册
ApplicationMaster执行的第一个操作就是向ResourceManager注册,注册时AM告诉RM它的IPC的地址和网页的URL。
IPC地址是面向客户端的服务地址;网页URL是AM的一个Web服务的地址,客户端可以通过Http获取应用程序的状态和信息。
注册后,RM返回AM可以使用的信息,包括:YARN接受的资源的大小范围、应用程序的ACL信息。
2.心跳
注册成功后,AM需要周期性地发送心跳到RM确认他还活着。参数yarn.am.liveness-monitor.expiry配置AM心跳最大周期,如果RM发现超过这个时间还没有收到AM的心跳,那么就判断AM已经死掉。
3.3 资源需求
AM所需要的资源分为静态资源和动态资源。
1.静态资源
在任务提交时就能确定,并且在AM运行时不再变化的资源是静态资源,比如MapReduce程序中的Map的数量。
2.动态资源
AM在运行时确定要请求数量的资源是动态资源。
3.4 调度任务
当AM的资源请求数量达到一定数量或者到了心跳时,AM才会发送心跳到RM,请求资源,心跳是以ResourceRequest形式发送的,包括的信息有:resourceAsks、ContainerID、containersToBeReleased。
RM响应的信息包括:新分配的Container列表、已经完成了的Container状态、集群可用的资源上限。
3.5 启动container
1.AM从RM那里得到了Container后就可以启动Container了。
2.AM首先构造ContainerLaunchContext对象,包括分配资源的大小、安全令牌、启动Container执行的命令、进程环境、必要的文件等
3.AM与NM通讯,发送StartContainerRequest请求,逐一或者批量启动Container。
4.NM通过StartContainerResponse回应请求,包括:成功启动的Container列表、失败的Container信信息等。
5.整个过程中,AM没有跟RM进行通信。
6.AM也可以发送StopContainerRequest请求来停止Container。
3.6 完成的container
当Container执行结束时,由RM通知AM Container的状态,AM解释Container状态并决定如何继续操作。所以YARN平台只是负责为计算框架提供Container信息。
3.7 AM的失败和恢复
当AM失效后,YARN只负责重新启动一个AM,任务恢复到失效前的状态是由AM自己完成的。AM为了能实现恢复任务的目标,可以采用以下方案:将任务的状态持久化到外部存储中。比如:MapReduce框架的ApplicationMaster会将已完成的任务持久化,失效后的恢复时可以将已完成的任务恢复,重新运行未完成的任务。
3.8 applicationMaster启动过程
4. yarn的资源调度
1.资源调度器的职能
资源调度器是YARN最核心的组件之一,是一个插拔式的服务组件,负责整个集群资源的管理和分配。YARN提供了三种可用的资源调度器:FIFO、Capacity Scheduler、Fair Scheduler。
2.资源调度器的分类
不同的任务类型对资源有着不同的负责质量要求,有的任务对时间要求不是很高(如Hive),有的任务要求及时返还结果(如HBase),有的任务是CPU密集型的,有的是I/O密集型的,所以简单的一种调度器并不能完全符合所有的任务类型。
有两种调度器的设计思路:
一是在一个物理Hadoop集群上虚拟多个Hadoop集群,这些集群各自有自己全套的Hadoop服务,典型的代表是HOD(Hadoop On Demand)调度器,Hadoop2.0中已经过时。
另一种是扩展YARN调度器。典型的是Capacity Scheduler、Fair Scheduler。
3.基本架构
插拔式组件
YARN里的资源调度器是可插拔的,ResourceManager在初始化时根据配置创建一个调度器,可以通过参数yarn.resourcemanager.scheduler.class参数来设置调度器的主类是哪个,默认是CapacityScheduler,配置值为:org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler。
所有的资源调度器都要实现接口org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler。
事件处理器
YARN的资源管理器实际上是一个事件处理器,它处理6个SchedulerEventType类型的事件。
事件说明:
Node_Removed 集群中移除一个计算节点,资源调度器需要收到该事件后从可分配的资源总量中移除相应的资源量。
Node_Added 集群增加一个节点
Application_added RM收到一个新的Application。
Application_Remove 表示一个Application运行结束
Container_expired 当一个Container分配给AM后,如果在一段时间内AM没有启动Container,就触发这个事件。调度器会对该Container进行回收。
Node_Update RM收到NM的心跳后,就会触发Node_Update事件。
4.1 资源调度三种模型介绍
究竟使用哪种调度模型,取决于这个配置项,apache版本的hadoop默认使用的是capacity scheduler调度方式。CDH版本的默认使用的是fair scheduler调度方式 : yarn-site.xml
yarn.resourcemanager.scheduler.class
1.双层资源调度模型
YARN使用了双层资源调度模型。
第一层:ResourceManager中的调度器将资源分配给各个ApplicationMaster。这一层调度由YARN的资源调度器来实现。
第二层:ApplicationMaster再进一步将资源分配给它内部的各个任务。这一层的调度由用户程序这个计算框架来实现。
YARN的资源分配过程是异步的,YARN的调度器分配给AM资源后,先将资源存入一个缓冲区内,当AM下次心跳时来领取资源。
资源分配过程如下7个步骤:
步骤1:NodeManager通过周期性的心跳汇报节点信息 : 告诉resourceManager当前剩余的资源信息
步骤2:RM为NM返回一个应答,包括要释放的Container列表。
步骤3:RM收到NM汇报的信息后,会出发资源调度器的Node_Update事件。
步骤4:资源调度器收到Node_Update事件后,会按照一定的策略将该节点上资源分配给各个应用程序,并将分配结果存入一个内存数据结构中。
步骤5:应用程序的ApplicationMaster周期性地向RM发送心跳,以领取最新分配的Container。
步骤6:RM收到AM的心跳后,将分配给它的Container以心跳应答的方式返回给ApplicationMaster
步骤7:AM收到新分配的Container后,会将这些Container进一步分配给他的内部子任务。
2.资源保证机制
YARN采用增量资源分配机制来保证资源的分配。
增量资源分配机制是指当YARN暂时不能满足应用程序的资源要求时,将现有的一个节点上的资源预留,等到这个节点上累计释放的资源满足了要求,再分配给ApplicationMaster。
这种增量资源分配机制虽然会造成资源的浪费,但是能保证AM肯定会得到资源,不会被饿死。
3.资源分配算法
YARN的资源调度器采用了主资源公平调度算法(DRF)来支持多维度资源调度。
4.资源抢占模型
资源调度器中,每个队列可以设置一个最小资源量和最大资源量。为了提高集群使用效率,资源调度器会将负载较轻的队列资源分配给负载较重的队列使用,当负载较轻的队列突然接到了新的任务时,调度器才会将本属于该队列的资源分配给它,但是此时资源有可能正被其他队列使用,因此调度器必须等待其他队列释放资源,如果一段时间后发现资源还未得到释放,则进行资源抢占。
关于资源抢占的实现,涉及到一下两个问题:
如何决定是否抢占某个队列的资源
如何使得资源抢占代价最小
资源抢占是通过杀死正在使用的Container实现的,由于Container已经处于运行状态,直接杀死Container会造成已经完成的计算白白浪费,为了尽可能地避免资源浪费,YARN优先选择优先级低的Container做为资源抢占的对象,并且不会立刻杀死Container,而是将释放资源的任务留给ApplicationMaster中的应用程序,以期望他能采取一定的措施来执行释放这些Container,比如保存一些状态后退出,如果一段时间后,ApplicationMaster仍未主动杀死Container,则RM再强制杀死这些Container。
4.1.1 层级队列管理机制FIFO调度策略
Hadoop1.0中使用了平级队列的组织方式,而后来采用了层级队列的组织方式。
层级队列的特点:
子队列
队列可以嵌套,每个队列都可以包含子队列;用户只能将应用程序提交到叶子队列中。
最小容量
每个子队列均有一个最小容量比属性,表示可以使用的父队列容量的百分比。
调度器总是优先选择当前资源使用率最低的队列,并为之分配资源。
指定了最小容量,但是不会保证会保持最小容量,同样会被分配给其他队列。
最大容量
队列指定了最大容量,任何时候队列使用的资源都不会超过最大容量。
默认情况下队列的最大容量是无限大。
用户权限管理
管理员可以配置每个叶子节点队列对应的操作系统的用户和用户组。
系统资源管理
管理员设置了每个队列的容量,每个用户可以用资源的量,调度器根据这些配置来进行资源调度
队列命名规则:
为了防止队列名称的冲突和便于识别队列,YARN采用了自顶向下的路径命名规则,父队列和子队列名称采用.拼接。
4.1.2 Capacity Scheduler
Capacity Scheduler是Yahoo!开发的多用户调度器。主要有以下几个特点:
容量保证
管理员可以为队列设置最低保证和资源使用上限,同一个队列里的应用程序可以共享使用队列资源。
灵活性:
一个队列里的资源有剩余,可以暂时共享给其他队列,一旦该队列有的新的任务,其他队列会归还资源,这样尽量地提高了集群的利用率。
多重租赁
支持多用户共享集群和多应用程序同时运行
安全保证
每个队列有严格的ACL列表,限制了用户的权限
动态更新配置文件
管理员对参数的配置是动态的。
配置方案:
Capacity Scheduler的所有配置都在capactiy-scheduler.xml里,管理员修改后,要通过命令来刷写队列:yarn mradmin –refreshQueues
Capacity Scheduler不允许管理员动态地减少队列数目,且更新的配置参数值应该是合法值。
以下以队列tongyong为例来说明参数配置:
4.1.3 Fair Scheduler
基本特点:
1.资源公平共享
默认是Fair策略分配资源,Fair 策略是一种基于最大最小公平算法实现的,所有应用程序平分资源。
2.支持资源抢占
某个队列中有剩余资源时,调度器会将这些资源共享给其他队列,当该队列有了新的应用程序提交过来后,调度器会回收资源,调度器采用先等待再强制回收的策略。
3.负载均衡
Fair Scheduler提供了一个基于任务数目的负载均衡机制,尽可能将系统中的任务均匀分布到各个节点上。
4.调度策略配置灵活
可以每个队列选用不同的调度策略:FIFO、Fair、DRF
5.提高小应用程序的响应时间
小作业也可以分配大资源,可以快速地运行完成