Contents
- YARN Overview
- YARN Architecture
- Schedulers in YARN
- Hands-on: configuring and using multiple YARN resource queues
YARN Overview
- Enables resource sharing across a Hadoop cluster
- Runs not only MapReduce but also other compute engines such as Spark and Flink
YARN Architecture
- YARN is responsible for cluster resource management and scheduling. It uses a master/slave architecture: there can be two master nodes (for high availability) and many slave nodes
- ResourceManager: the master process; manages and allocates the cluster's resources
- NodeManager: the slave process; manages the resources of its own machine
- When a NodeManager starts, it registers with the ResourceManager; the registration includes the total CPU and memory the node can allocate (see the snippet below)
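For reference, the CPU and memory totals a NodeManager registers are controlled by two properties in yarn-site.xml. A minimal snippet with illustrative values (not taken from this article's cluster):

```xml
<!-- yarn-site.xml: the resources this NodeManager reports to the ResourceManager -->
<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>8192</value> <!-- total memory (MB) available for containers -->
</property>
<property>
  <name>yarn.nodemanager.resource.cpu-vcores</name>
  <value>8</value> <!-- total virtual cores available for containers -->
</property>
```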
Schedulers in YARN
- FIFO Scheduler: first-in, first-out scheduling
- Capacity Scheduler: a multi-queue version of the FIFO Scheduler
- Fair Scheduler: multiple queues, with resources shared fairly among multiple users
The cluster uses the Capacity Scheduler by default, which you can confirm on the YARN web UI. It is also the scheduler most widely used in practice.
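Besides checking the web UI, the active scheduler is determined by yarn.resourcemanager.scheduler.class in yarn-site.xml. The snippet below spells out the Apache Hadoop default explicitly; since it is the default, it normally does not need to be set:

```xml
<property>
  <name>yarn.resourcemanager.scheduler.class</name>
  <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
</property>
```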
Hands-on: configuring and using multiple YARN resource queues
- Add an online queue (for real-time, long-running jobs) and an offline queue (for batch jobs)
- Submit a job to the offline queue
Modify the configuration:

```shell
[root@hadoop01 hadoop-3.2.0]# vim etc/hadoop/capacity-scheduler.xml
```
```xml
<configuration>
<property>
<name>yarn.scheduler.capacity.maximum-applications</name>
<value>10000</value>
<description>
Maximum number of applications that can be pending and running.
</description>
</property>
<property>
<name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
<value>0.1</value>
<description>
Maximum percent of resources in the cluster which can be used to run
application masters i.e. controls number of concurrent running
applications.
</description>
</property>
<property>
<name>yarn.scheduler.capacity.resource-calculator</name>
<value>org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator</value>
<description>
The ResourceCalculator implementation to be used to compare
Resources in the scheduler.
The default i.e. DefaultResourceCalculator only uses Memory while
DominantResourceCalculator uses dominant-resource to compare
multi-dimensional resources such as Memory, CPU etc.
</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.queues</name>
<value>default,online,offline</value>
<description>
The queues at the this level (root is the root queue).
</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.capacity</name>
<value>70</value>
<description>Default queue target capacity.</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.online.capacity</name>
<value>10</value>
<description>online queue target capacity.</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.offline.capacity</name>
<value>20</value>
<description>offline queue target capacity.</description>
</property>
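<!-- Note: the capacities of root's child queues must sum to 100
     (here: default 70 + online 10 + offline 20 = 100). -->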
<property>
<name>yarn.scheduler.capacity.root.default.user-limit-factor</name>
<value>1</value>
<description>
Default queue user limit a percentage from 0.0 to 1.0.
</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
<value>70</value>
<description>
The maximum capacity of the default queue.
</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.online.maximum-capacity</name>
<value>10</value>
<description>
The maximum capacity of the online queue.
</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.offline.maximum-capacity</name>
<value>20</value>
<description>
The maximum capacity of the offline queue.
</description>
</property>
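<!-- Note: maximum-capacity caps how far a queue can grow beyond its
     configured capacity when other queues are idle. Setting it equal
     to capacity, as above, disables that elasticity for the queue. -->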
<property>
<name>yarn.scheduler.capacity.root.default.state</name>
<value>RUNNING</value>
<description>
The state of the default queue. State can be one of RUNNING or STOPPED.
</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.acl_submit_applications</name>
<value>*</value>
<description>
The ACL of who can submit jobs to the default queue.
</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.acl_administer_queue</name>
<value>*</value>
<description>
The ACL of who can administer jobs on the default queue.
</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.acl_application_max_priority</name>
<value>*</value>
<description>
The ACL of who can submit applications with configured priority.
For e.g, [user={name} group={name} max_priority={priority} default_priority={priority}]
</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.maximum-application-lifetime
</name>
<value>-1</value>
<description>
Maximum lifetime of an application which is submitted to a queue
in seconds. Any value less than or equal to zero will be considered as
disabled.
This will be a hard time limit for all applications in this
queue. If positive value is configured then any application submitted
to this queue will be killed after exceeds the configured lifetime.
User can also specify lifetime per application basis in
application submission context. But user lifetime will be
overridden if it exceeds queue maximum lifetime. It is point-in-time
configuration.
Note : Configuring too low value will result in killing application
sooner. This feature is applicable only for leaf queue.
</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.default-application-lifetime
</name>
<value>-1</value>
<description>
Default lifetime of an application which is submitted to a queue
in seconds. Any value less than or equal to zero will be considered as
disabled.
If the user has not submitted application with lifetime value then this
value will be taken. It is point-in-time configuration.
Note : Default lifetime can't exceed maximum lifetime. This feature is
applicable only for leaf queue.
</description>
</property>
<property>
<name>yarn.scheduler.capacity.node-locality-delay</name>
<value>40</value>
<description>
Number of missed scheduling opportunities after which the CapacityScheduler
attempts to schedule rack-local containers.
When setting this parameter, the size of the cluster should be taken into account.
We use 40 as the default value, which is approximately the number of nodes in one rack.
Note, if this value is -1, the locality constraint in the container request
will be ignored, which disables the delay scheduling.
</description>
</property>
<property>
<name>yarn.scheduler.capacity.rack-locality-additional-delay</name>
<value>-1</value>
<description>
Number of additional missed scheduling opportunities over the node-locality-delay
ones, after which the CapacityScheduler attempts to schedule off-switch containers,
instead of rack-local ones.
Example: with node-locality-delay=40 and rack-locality-delay=20, the scheduler will
attempt rack-local assignments after 40 missed opportunities, and off-switch assignments
after 40+20=60 missed opportunities.
When setting this parameter, the size of the cluster should be taken into account.
We use -1 as the default value, which disables this feature. In this case, the number
of missed opportunities for assigning off-switch containers is calculated based on
the number of containers and unique locations specified in the resource request,
as well as the size of the cluster.
</description>
</property>
<property>
<name>yarn.scheduler.capacity.queue-mappings</name>
<value></value>
<description>
A list of mappings that will be used to assign jobs to queues
The syntax for this list is [u|g]:[name]:[queue_name][,next mapping]*
Typically this list will be used to map users to queues,
for example, u:%user:%user maps all users to queues with the same name
as the user.
</description>
</property>
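<!-- Illustrative only (not set here): a mapping such as u:someuser:offline
     would route that (hypothetical) user's jobs to the offline queue. -->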
<property>
<name>yarn.scheduler.capacity.queue-mappings-override.enable</name>
<value>false</value>
<description>
If a queue mapping is present, will it override the value specified
by the user? This can be used by administrators to place jobs in queues
that are different than the one specified by the user.
The default is false.
</description>
</property>
<property>
<name>yarn.scheduler.capacity.per-node-heartbeat.maximum-offswitch-assignments</name>
<value>1</value>
<description>
Controls the number of OFF_SWITCH assignments allowed
during a node's heartbeat. Increasing this value can improve
scheduling rate for OFF_SWITCH containers. Lower values reduce
"clumping" of applications on particular nodes. The default is 1.
Legal values are 1-MAX_INT. This config is refreshable.
</description>
</property>
<property>
<name>yarn.scheduler.capacity.application.fail-fast</name>
<value>false</value>
<description>
Whether RM should fail during recovery if previous applications'
queue is no longer valid.
</description>
</property>
</configuration>
```
Copy the file to the other two machines:

```shell
[root@hadoop01 hadoop]# scp -rq capacity-scheduler.xml hadoop02:/home/soft/hadoop-3.2.0/etc/hadoop/
[root@hadoop01 hadoop]# scp -rq capacity-scheduler.xml hadoop03:/home/soft/hadoop-3.2.0/etc/hadoop/
```
Restart the cluster:

```shell
[root@hadoop01 hadoop-3.2.0]# sbin/start-all.sh
Starting namenodes on [hadoop01]
Last login: Thu Mar 7 12:25:50 CST 2024 on pts/1
Starting datanodes
Last login: Thu Mar 7 12:28:23 CST 2024 on pts/1
Starting secondary namenodes [hadoop01]
Last login: Thu Mar 7 12:28:26 CST 2024 on pts/1
Starting resourcemanager
Last login: Thu Mar 7 12:28:31 CST 2024 on pts/1
Starting nodemanagers
Last login: Thu Mar 7 12:28:38 CST 2024 on pts/1
You have new mail in /var/spool/mail/root
[root@hadoop01 hadoop-3.2.0]# jps
37651 SecondaryNameNode
37380 NameNode
37896 ResourceManager
34569 JobHistoryServer
38221 Jps
```
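For a queue-only change, a full restart is usually not required. As a lighter-weight alternative (a sketch using the standard YARN admin CLI), the ResourceManager can reload capacity-scheduler.xml at runtime, and the new queue can then be inspected from the command line:

```shell
# Reload capacity-scheduler.xml on the running ResourceManager
[root@hadoop01 hadoop-3.2.0]# bin/yarn rmadmin -refreshQueues
# Confirm the new queue exists and is RUNNING
[root@hadoop01 hadoop-3.2.0]# bin/yarn queue -status offline
```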
Next, submit a job to the offline queue. The WordCount job below parses generic options with GenericOptionsParser, so the target queue can be passed on the command line via -Dmapreduce.job.queuename:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCountJob {

    /**
     * Map phase
     */
    public static class MyMapProcess extends Mapper<LongWritable, Text, Text, LongWritable> {
        /**
         * The map function
         * @param k1 byte offset of the current line within the file
         * @param v1 content of the current line
         */
        @Override
        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
            // Split each line into words
            String[] words = v1.toString().split(" ");
            for (String word : words) {
                // Emit each word as a <k2, v2> pair: <word, 1>
                context.write(new Text(word), new LongWritable(1L));
            }
        }
    }

    /**
     * Reduce phase: sums the values of each <k2, {v2...}> group and emits <k3, v3>
     */
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {
            long sum = 0L;
            for (LongWritable v2 : v2s) {
                sum += v2.get();
            }
            context.write(k2, new LongWritable(sum));
        }
    }

    /**
     * Assemble the job: map + reduce
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // GenericOptionsParser consumes generic options such as
        // -Dmapreduce.job.queuename=offline, leaving only the application arguments
        String[] remainingArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (remainingArgs.length < 2) {
            System.out.println("Usage: WordCountJob <input path> <output path>");
            return;
        }
        Job job = Job.getInstance(conf);
        // Required so the cluster can locate the jar containing this class
        job.setJarByClass(WordCountJob.class);
        // Input path: may be a file or a directory
        FileInputFormat.setInputPaths(job, new Path(remainingArgs[0]));
        // Output path: a single directory that must not exist yet
        FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
        // Configure the map phase
        job.setMapperClass(MyMapProcess.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Configure the reduce phase
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // Submit the job and wait for it to finish
        job.waitForCompletion(true);
    }
}
```
Package the program into a jar and submit it, specifying the offline queue on the command line:

```shell
[root@hadoop01 hadoop-3.2.0]# bin/hadoop jar demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.example.hadoop.demo.mapreduce.WordCountJob -Dmapreduce.job.queuename=offline /test/hello.txt /pointqueue
```
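After submission, the YARN web UI should show the application running in the offline queue rather than default; the same can be checked from the CLI:

```shell
# The Queue column should read "offline" for this application
[root@hadoop01 hadoop-3.2.0]# bin/yarn application -list
```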