基于Yarn搭建Flink
1. 概述
1.1 Yarn 简介
Apache Hadoop YARN是一个资源提供程序,受到许多数据处理框架的欢迎。Flink服务被提交给 YARN 的 ResourceManager,后者再由 YARN NodeManager 管理的机器上生成容器。Flink 将其 JobManager 和 TaskManager 实例部署到此类容器中。
Flink 可以根据在 JobManager 上运行的作业所需的处理槽数量动态分配和取消分配任务管理器资源。
1.2 Flink的重要角色
-
JobManager:类似spark中master,负责资源申请,任务分发,任务调度执行,checkpoint的协调执行;可以搭建HA,双master。
-
TaskManager:类似spark中的worker,负责任务的执行,基于dataflow(spark中DAG)划分出的task;与jobmanager保持心跳,汇报任务状态。
2. Yarn环境准备
本环境基于Hadoop-3.1.4搭建,可以参考《Hadoop3.1.4分布式搭建》
-
通过运行 yarn top 来确保您的 YARN 集群已准备好接受 Flink 应用程序。它应该不显示任何错误消息。
yarn top
-
从下载页面下载最近的 Flink 发行版并解压缩。
wget https://archive.apache.org/dist/flink/flink-1.15.4/flink-1.15.4-bin-scala_2.12.tgz
-
重要事项 请确保设置了HADOOP_CLASSPATH环境变量(可以通过运行 echo $HADOOP_CLASSPATH 进行检查)。如果没有,请配置使用。
for i in {133..135} 151 157; do echo -e "\n********************************** Config ubuntu@10.10.10.$i **********************************\n" scp ~/Downloads/flink-1.15.4-bin-scala_2.12.tgz ubuntu@10.10.10.$i:~/; ssh ubuntu@10.10.10.$i 'sudo mkdir -p /opt/software/; \ #sudo rm -f /etc/profile.d/Z99-wntime-env-config.sh; \ sudo touch /etc/profile.d/Z99-wntime-env-config.sh; \ sudo tar -zxf ~/flink-1.15.4-bin-scala_2.12.tgz -C /opt/software/;'; # config env rm -rf /tmp/"10.10.10.$i"/; mkdir -p /tmp/"10.10.10.$i"/; scp ubuntu@10.10.10.$i:/etc/profile.d/Z99-wntime-env-config.sh /tmp/"10.10.10.$i"/Z99-wntime-env-config.sh; sudo cat>>/tmp/"10.10.10.$i"/Z99-wntime-env-config.sh<<EOF # HADOOP_CLASSPATH export HADOOP_CLASSPATH=\`hadoop classpath\` export FLINK_HOME=/opt/software/flink-1.15.4 export PATH=\$PATH:\$FLINK_HOME/bin EOF cat /tmp/10.10.10.$i/Z99-wntime-env-config.sh; scp /tmp/10.10.10.$i/Z99-wntime-env-config.sh ubuntu@10.10.10.$i:~/Z99-wntime-env-config.sh; ssh ubuntu@10.10.10.$i 'sudo mv ~/Z99-wntime-env-config.sh /etc/profile.d/Z99-wntime-env-config.sh; \ sudo chmod +x /etc/profile.d/Z99-wntime-env-config.sh; \ source /etc/profile; \ echo $HADOOP_CLASSPATH;' done;
配置conf及权限
for i in {133..135} 151 157; do echo -e "\n********************************** Config ubuntu@10.10.10.$i **********************************\n" ssh ubuntu@10.10.10.$i 'sudo chown -R ubuntu:ubuntu /opt/software/; \ source /etc/profile; \ flink -v;'; done;
for i in {133..135} 151 157; do echo -e "\n********************************** Config ubuntu@10.10.10.$i **********************************\n" # 指定jobmanager,yarn模式会被覆盖不起作用; ssh ubuntu@10.10.10.$i "sed -i 's/jobmanager.rpc.address: localhost/jobmanager.rpc.address: k8s-m134/g' /opt/software/flink-1.15.4/conf/flink-conf.yaml"; ssh ubuntu@10.10.10.$i "sed -i 's/host: localhost/host: 0.0.0.0/g' /opt/software/flink-1.15.4/conf/flink-conf.yaml"; ssh ubuntu@10.10.10.$i "sed -i 's/address: localhost/address: 0.0.0.0/g' /opt/software/flink-1.15.4/conf/flink-conf.yaml"; # yarn模式仍旧起作用; ssh ubuntu@10.10.10.$i "sed -i 's/#web.submit.enable: false/web.submit.enable: true/g' /opt/software/flink-1.15.4/conf/flink-conf.yaml"; ssh ubuntu@10.10.10.$i "sed -i 's/#web.cancel.enable: false/web.cancel.enable: true/g' /opt/software/flink-1.15.4/conf/flink-conf.yaml"; ssh ubuntu@10.10.10.$i "sed -i 's/# io.tmp.dirs: \/tmp/io.tmp.dirs: \/opt\/software\/flink-1.15.4\/data/g' /opt/software/flink-1.15.4/conf/flink-conf.yaml"; ssh ubuntu@10.10.10.$i "sed -i 's/#rest.port: 8081/#rest.port: 8081\nrest.flamegraph.enabled: true/g' /opt/software/flink-1.15.4/conf/flink-conf.yaml"; # classloader.check-leaked-classloader ssh ubuntu@10.10.10.$i "sed -i 's/# classloader.resolve-order: child-first/# classloader.resolve-order: child-first\nclassloader.check-leaked-classloader: false/g' /opt/software/flink-1.15.4/conf/flink-conf.yaml"; done;
3. FLINK 在 YARN 上支持的部署模式
对于生产使用,官方建议在 Application Mode下部署Flink应用程序,因为它可以在应用程序之间提供更好的隔离。
3.1 Session Mode
此部署模式,共享jobmanager和taskmanager,所有的job都在一个runtime中运行。
- 优点:
- 启动集群只有jobmanager,提交job才去yarn申请资源启动taskmanager,任务完成自动释放taskmanager,资源伸缩性好。
- 资源利用率高。
- 缺点:
- 资源隔离性差
开启flink yarn-session集群
使用bin/yarn-session.sh --help 查看可用参数
会话模式有两种操作模式:
-
attached mode(默认):yarn-session.sh 客户端将 Flink 集群提交到 YARN,但客户端继续运行,跟踪集群的状态。如果群集失败,客户端将显示错误。如果客户端被终止,它也会向群集发出关闭信号。
-
detached mode(-d 或 --detached):yarn-session.sh 客户端将 Flink 集群提交到 YARN,然后客户端返回。需要再次调用客户端或 YARN 工具来停止 Flink 集群。
# 启动
./bin/yarn-session.sh -d
# 会话模式将在 /tmp/中创建一个隐藏的 YARN 属性文件/tmp/.yarn-properties-<username>,提交作业时,命令行界面将选取该文件以进行群集发现。
# 输出
2023-05-30 13:33:24,304 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Inter
JobManager Web Interface: http://k8s-m135:38027
2023-05-30 13:33:24,541 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - The Flink YARN
$ echo "stop" | ./bin/yarn-session.sh -id application_1685349449822_0003
If this should not be possible, then you can also kill Flink via YARN's web interface or via:
$ yarn application -kill application_1685349449822_0003
Note that killing Flink might not clean up all job artifacts and temporary files.
此时 Flink JobManager 运行在 k8s-m135节点,WebUI访问地址 http://k8s-m135:38027
也可以通过Yarn WebUI访问
提交任务
- 方式一,指定flink集群,application_1685349449822_0006是flink集群yarn app-id
./bin/flink run -t yarn-session -c org.apache.flink.streaming.examples.windowing.TopSpeedWindowing -Dyarn.application.id=application_1685349449822_0007 ./examples/streaming/TopSpeedWindowing.jar
注意:
若未指定 -c fullclassname时,报以下错误:
若未指定 -Dyarn.application.id时,可能报与上面同样错误,或No ExecutorFactory found to execute the application
测试最终需要同时指定 -c 及 -Dyarn.application.id,任务才能正常启动
/tmp/.yarn-properties-ubuntu已发现,但不知什么原因未生效。
- 方式二,在flink web界面上传jar包提交
停止flink yarn-session集群
- 进入客户端交互界面
./bin/yarn-session.sh -id application_1685349449822_0003
优雅地停止任务
echo "stop" | ./bin/yarn-session.sh -id application_1685349449822_0003
- 强制杀掉任务
yarn application -kill application_1685349449822_0003
NOTE: 启动集群只有jobmanager,提交job才去yarn申请资源启动taskmanager,任务完成自动释放taskmanager
3.2 Per-Job Mode (deprecated)
独享jobmanager和taskmanager,为每一个job独胆启动一个runtime
- 优点:
- 资源充分隔离
- 劣势:
- 资源相对比较浪费
Per job Cluster模式将在YARN上启动Flink集群,然后在本地运行提供的应用程序jar,最后将JobGraph提交给YARN上的JobManager。如果传递–separated参数,那么一旦提交被接受,客户端就会停止。
一旦作业停止,YARN集群将停止。
./bin/flink run -t yarn-per-job -d ./examples/streaming/TopSpeedWindowing.jar
若提示以下错误,需要在conf/flink-conf.yml配置classloader.check-leaked-classloader: false
配置后无报错
一旦部署了“Per-Job Cluster”,就可以与它交互以执行、取消或获取保存点等操作。
# List running job on the cluster
./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
# Cancel running job
./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>
3.3 Application Mode
application的main()运行在jobmanager上,而不是在客户端。每一个application对应一个runtime,application中可以含有多个job。
- 优点:
- job graph在flink集群内部生成,可以减轻客户端压力
- application实现了更合理的资源隔离策略
- 缺点:
- 新功能,尚未经过生产大规模验证
开启flink yarn-application集群
./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar
# List running job on the cluster
./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
停止flink yarn-application集群
./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
注意:
当同时运行多个任务时,会出现有的任务一直处于ACCEPTED
解决这个问题,只需要更改Hadoop的配置文件:…/etc/hadoop/capacity-scheduler.xml
选项:yarn.scheduler.capacity.maximum-am-resource-percent从0.1改成更大,最大为1;
意思是集群中可用于运行应用程序主机的最大资源百分比 - 控制并发活动应用程序的数量。每个队列的限制与其队列容量和用户限制成正比。官方文档默认是每个队列最大使用10%的资源,即0.1
yarn.scheduler.capacity.maximum-am-resource-percent
for i in {133..135} 151 157;
do
echo -e "\n********************************** Config ubuntu@10.10.10.$i **********************************\n"
ssh ubuntu@10.10.10.$i 'cat /opt/software/hadoop-3.1.4/etc/hadoop/capacity-scheduler.xml|grep "percent";';
# 这个配置文件中只有这个是percent,所以直接修改了
ssh ubuntu@10.10.10.$i "sed -i 's/<value>0.1<\/value>/<value>0.5<\/value>/' /opt/software/hadoop-3.1.4/etc/hadoop/capacity-scheduler.xml;";
done;