1. 什么是Yarn模式部署Flink
独立(Standalone)模式由 Flink 自身提供资源,无需其他框架,这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但我们知道,Flink 是大数据计算框架,不是资源调度框架
,这并不是它的强项;所以还是应该让专业的框架做专业的事,和其他资源调度框架集成更靠谱。
而在目前大数据生态中,国内应用最为广泛的资源管理平台就是 YARN 了。本文主要介绍在强大的 YARN 平台上 Flink 是如何集成部署的。
利用Yarn可以实现Flink的三种集群模式,即:会话模式、单作业模式、应用模式,比较常用的就是利用Yarn部署Flink的会话模式,由于会话模式也已经实现了动态分配,所以没有了普通会话模式的局限了,因此会话模式是最常见的。
整体来说,YARN 上部署的过程是:
- 客户端把 Flink 应用提交给 Yarn 的 ResourceManager,
- Yarn 的 ResourceManager 会向 Yarn 的 NodeManager 申请容器。
- 在这些容器上,Flink 会部署 JobManager 和 TaskManager 的实例,从而启动集群。
- Flink 会根据运行在 JobManger 上的作业所需要的 Slot 数量动态分配 TaskManager 资源。
2. 环境准备
- 在 Flink 1.11.0 版本之后,增加了很多重要新特性,其中就包括增加了对Hadoop3.0.0以及更高版本Hadoop的支持,不再提供“flink-shaded-hadoop-*”jar 包,而是通过配置环境变量完成与 YARN 集群的对接。
Flink 1.8.0的注意事项
- 在
Flink1.8.0
之前的版本,想要以 YARN 模式部署 Flink 任务时,需要 Flink 是有 Hadoop 支持的。 - 从 Flink 1.8 版本开始,不再提供基于 Hadoop 编译的安装包,若需要 Hadoop 的环境支持,需要自行在官网下载 Hadoop 相关版本的组件
flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
,并将该组件上传至Flink 的 lib
目录下。
在将 Flink 任务部署至 YARN 集群之前,需要确认集群是否安装有 Hadoop,保证 Hadoop 版本至少在 2.2 以上,并且集群中安装有 HDFS 服务。
具体配置步骤如下:
(1)配置环境变量,增加环境变量配置如下:
vim /etc/profile.d/my_env.sh
# 添加以下内容
HADOOP_HOME=/opt/module/hadoop-2.7.5
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`
# 刷新环境变量使其生效
source /etc/profile.d/my_env.sh
这里必须保证设置了环境变量 HADOOP_CLASSPATH。
(2)启动 Hadoop 集群,包括 HDFS 和 YARN。
start-dfs.sh
start-yarn.sh
(3)修改 flink的配置文件,若在提交命令中不特定指明,这些配置将作为默认配置。
vim /app/apps/flink-1.13.0/conf/flink-conf.yaml
#修改以下配置
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 8
parallelism.default: 1
3. Yarn部署Flink会话模式
YARN
的会话模式与独立集群
略有不同,需要首先申请一个 YARN 会话(YARN session)来启动 Flink 集群。具体步骤如下:
3.1 启动集群
(1)启动 hadoop 集群(HDFS, YARN)。
(2)执行脚本命令向 YARN 集群申请资源,开启一个 YARN 会话,启动 Flink 集群。
/app/apps/flink-1.13.0/bin/yarn-session.sh -d -nm test
可用参数解读:
-d
:分离模式,后台运行YARN session。
-jm
:(–jobManagerMemory):配置 JobManager 所需内存,默认单位 MB。
-nm
:(–name):配置在 YARN UI 界面上显示的任务名。
-qu
:(–queue):指定 YARN 队列名。
-tm
:(–taskManager):配置每个 TaskManager 所使用内存。
注意:Flink1.11.0
版本不再使用-n 参数和-s 参数分别指定 TaskManager 数量和 slot 数量,YARN 会按照需求动态分配
TaskManager 和 slot。所以从这个意义上讲,YARN 的会话模式也不会把集群资源固定,同样是动态分配的。
YARN Session 启动之后会给出一个 web UI 地址以及一个 YARN application ID,如下所示,用户可以通过 web UI 或者命令行两种方式提交作业。
3.2 提交作业
(1)通过 Web UI 提交作业,这种方式比较简单,与上文所述 Standalone 部署模式基本相同。
(2)通过命令行提交作业:
- 将 Standalone 模式讲解中打包好的任务运行 JAR 包上传至集群
- 执行以下命令将该任务提交到已经开启的 Yarn-Session 中运行。
$ bin/flink run -c com.atguigu.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar
客户端可以自行确定 JobManager 的地址,也可以通过-m 或者-jobmanager 参数指定JobManager 的地址,JobManager 的地址在 YARN Session 的启动页面中可以找到。
- 任务提交成功后,可在 YARN 的 Web UI 界面查看运行情况。