前言
今天是Flink学习的第二天,我的心情异常的复杂哈哈哈(苦笑),学习上还是比较顺利的,感情上我并不擅长,所以心情波动大在所难免。害,至少还有学习让我不被各种糟糕琐碎的日常生活里的人和事所影响。
不管是学习还是生活,保持积极的心态很重要,不要好高骛远,不要想着遥远的目标不敢去努力。
今天摘录《解忧杂货店》的一句话:
如果自己不想积极认真地生活,不管得到什么样的回答都没用。
Flink 提交 Job 的方式
1、Web 端提交 Job
Web UI 的方式就不做介绍了,完全傻瓜式的点击。无非就是需要添加4个参数:
- 入口类的全限定名。
- 并行度(不能超过可用资源,我们的并行度也就是我们 Flink 集群中任务槽的个数Task Slots,默认一个TaskManager 一个任务槽Task Slot)。
- 运行时的参数。
- 检查点的目录(这两里的检查点的概念类似于我们 Spark 中的 checkPoint ,他们都是用来解决预防实时任务失败或关闭重启造成的数据丢失,所以设置一个检查点来对之前的数据进行恢复,那必然是需要额外的资源开销的,比如磁盘开销)。
至于取消任务,UI 端有一个大大的 Cancel。
2、命令行提交 Job
$ bin/flink run -m hadoop102:8081 -c com.study.lyh.wc.StreamWordCount ./FlinkStudy-1.0-SNAPSHOT.jar
如果资源不足,再提交一个任务会使用默认最低的并行度,但要是最低的并行度也不能满足,就直接任务失败。除非上一个任务结束释放资源。
$ bin/flink cancel jobId
部署模式
Flink 一共有三种部署模式:会话模式、单作业模式和应用模式。
1、会话模式
会话模式是先启动集群,然后客户端将用户的程序代码转换成数据流图(Dataflow Graph),并最终生成作业图(JobGraph),然后一并发送给JobManager;作业提交后,JobManager 为作业分配相应的资源,作业完成就释放资源,而集群并不因为没有作业就关闭。
当然缺点也是显而易见的:因为资源是共享的,所以资源不够了,提交新的作业就会失败。另外,同一个 TaskManager 上可能运行了很多作业,如果其中一个发生故障导致 TaskManager 宕机,那么所有作业都会受到影响(因为当作业很多的时候,我们的资源TaskSlot被占满了,所以当有TaskManager节点宕机时,就无法保证容错,因为已经没有空闲资源供使用了,而且后面可能还有一堆作业等着处理呢)。
2、单作业模式
单作业模式也很好理解,就是严格的一对一,集群只为这个作业而生。同样由客户端运行应用程序,然后启动集群,作业被提交给 JobManager,进而分发给 TaskManager 执行。作业完成后,集群就会关闭,所有资源也会释放。这样一来,每个作业都有它自己的 JobManager管理,占用独享的资源,即使发生故障,它的 TaskManager 宕机也不会影响其他作业。
这些特性使得单作业模式在生产环境运行更加稳定,所以是实际应用的首选模式。需要注意的是,Flink 本身无法直接这样运行,所以单作业模式一般需要借助一些资源管理框架来启动集群,比如 YARN、Kubernetes。
3、应用模式
应用模式与单作业模式,都是提交作业之后才创建集群;单作业模式是通过客户端来提交的,客户端解析出的每一个作业对应一个集群;而应用模式下,是直接由 JobManager 执行应用程序的,并且即使应用包含了多个作业,也只创建一个集群。
总结
- 在会话模式下,集群的生命周期独立于集群上运行的任何作业的生命周期,并且提交的所有作业共享资源。
- 而单作业模式为每个提交的作业创建一个集群,带来了更好的资源隔离,这时集群的生命周期与作业的生命周期绑定。
- 最后,应用模式为每个应用程序创建一个会话集群,在 JobManager 上直接调用应用程序的 main()方法。
Yarn
Flink 独立(Standalone)模式由 Flink 自身提供资源,无需其他框架,这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但是比如独立模式中的单作业模式不依赖外部资源管理框架就无法实现。而我们刚开始提到的提交Job的方式默认就是Flink独立模式下的会话模式。
我们知道,Flink只是一个流式计算框架,它并不擅长资源的管理,所以我们这里使用YARN,而且即使 Flink 的独立模式本就支持会话模式,我们还是使用YARN来管理,毕竟 YARN 是专业的资源调度框架嘛。
Yarn 部署的流程:
客户端把 Flink 应用提交给 Yarn 的 ResourceManager, Yarn 的 ResourceManager 会向 Yarn 的 NodeManager 申请容器。在这些容器上,Flink 会部署JobManager 和 TaskManager 的实例,从而启动集群。Flink 会根据运行在 JobManger 上的作业所需要的 Slot 数量动态分配 TaskManager 资源。
1、Yarn 会话模式
1.1、启动 yarn-session
bin/yarn-session.sh -nm test -d
-nm :指定应用名称
-d :意思是挂载到后台,我们的hadoop102可以 ctrl+C 停止当前会话而并不会关闭 yarn session。
执行该命令后返回的信息中,需要记住两条信息,一个是给我们的 Web UI 端口,一个是关闭该会话的命令。
1.2、提交 Job
注意:这里提交 Job 的用户需要和创建 YARN-Session 的用户保持一致,否则会报错:
The main method caused an error: Failed to execute job ‘Flink Streaming Job’...拒绝连接
bin/flink run -c com.lyh.wc.UnBoundedStreamWordCount FlinkStudy-1.0-SNAPSHOT.jar
1.3、关闭 yarn-session 会话
echo "stop" | ./bin/yarn-session.sh -id application_1699415564762_0001
2、Yarn 单作业模式
2.1、提交作业
在 YARN 环境中,由于有了外部平台做资源调度,所以我们也可以直接向 YARN 提交一个单独的作业,从而启动一个 Flink 集群。
$ bin/flink run -d -t yarn-per-job -c com.atguigu.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar