《Flink学习笔记》——第三章 Flink的部署模式

不同的应用场景,有时候对集群资源的分配和占用有不同的需求。所以Flink为各种场景提供了不同的部署模式。

3.1 部署模式(作业角度/通用分类)

根据集群的生命周期资源的分配方式main方法到底在哪里执行——客户端还是Client还是JobManager、资源管理 将Flink的部署模式分为四类:

  • 会话模式

  • 单作业模式

  • 应用模式

3.1.1 会话模式(Session Mode)

会话模式首先要启动一个集群,这个集群的资源提前配置好。总共就一个集群,所有资源确定,所有作业/应用竞争这一个集群中的资源。所有job共享相同的JobManager和一定数量的TaskManager。

JobManager/TaskManager的生命周期不受job影响,在job提交之前集群预先将JobManager和TaskManager创建好,并且job执行结束之后不会被销毁。

在Client中生成JobGraph,然后提交给JobManager

image-20230609105719202

优点:

  • 资源共享,提升资源利用率
  • 运维简单,不需要重复创建、销毁JobManager和TaskManager(job结束则释放资源,集群继续正常运行,集群的生命周期是超越于作业之上的)

缺点:

  • 资源伸缩性差,由于资源提前分配好了,如果资源不够了,后续提交作业则会失败
  • 资源隔离性差,由于一个集群运行着多个作业,如果其中一个作业发生故障则可能导致JobManager宕机
  • Client生成JobGraph会消耗大量CPU,并且生成之后发送给JobManager消耗网络带宽,如果频繁的提交任务,Client的压力会非常大。并且Client是顺序提交任务的,一旦某一个任务执行时间过长就会阻塞后面提价的任务。

适合场景:

​ 单个规模小、执行时间短的大量作业

3.1.2 单作业模式(Per-Job-Mode)

会话模式由于多个作业运行在同一个集群,容易导致集群宕机,资源隔离性差。所以考虑为每个作业启动一个集群,这就是单作业模式。每个job都有单独的JobManager和TaskManager,随着job的提交而创建,job结束而销毁。

image-20230609110845233

优点:

  • 资源隔离,某个作业即使发生故障导致运行它的集群的JobManager宕机,也不会影响其它作业
  • 根据不同的job可以申请不同大小的资源,slot数量可以不一样

缺点:

  • 资源浪费,重复创建和销毁JobManager和TaskManager

  • JobManager和TaskManager的生命周期全部交给CLusterManagement管理,管理复杂

  • Client生成JobGraph会消耗大量CPU,并且生成之后发送给JobManager消耗网络带宽,如果频繁提交任务,Client的压力会非常大。并且Client是顺序提交任务的,一旦某一个任务执行时间过长就会阻塞后面提交的任务

注:这个模式由于需要对各个job的资源是分开的,各个job的JobManager、TaskManager创建和销毁等管理和资源的分配是比会话模式要复杂的。所以Flink本身暂时不支持这种模式,需要结合外部的资源管理平台来完成。

3.1.3 应用模式(Application Mode)

1.11版本之后支持应用模式,Client将jar包等发送给JobManager,在JobManager中生成JobGraph,也就是说main方法在JobManager上执行而不是客户端上执行。以应用为单位(一个应用可能多个作业),一个应用创建一个集群。这里可以看成是创建仅在特定应用程序的Job之间共享的session集群。因为一个应用可能有多个job,而这多个job是在一个集群中的。Application 模式允许提交由多个Job组成的应用程序。Job执行的顺序不受部署模式的影响,但受启动Job的调用的影响。使用阻塞的 execute()方法,将是一个顺序执行的效果,结果就是"下一个"Job的执行被推迟到“该”Job完成为止。相反,一旦提交当前作业,非阻塞executeAsync()方法将立即继续提交“下一个”Job。

image-20230609113945178

优点:

  • 降低客户端和网络压力
  • Application之间实现资源隔离,而Application里面的job实现资源共享

缺点:

  • 仅支持Yarn和kubernetes等

这里我们所讲到的部署模式,相对是比较抽象的概念。实际应用时,一般需要和资源管理平台结合起来,选择特定的模式来分配资源、部署应用。

3.1.4 Native Mode

默认只启动jobmanager,之后根据job任务提交情况,动态的申请、启动taskmanager计算资源。这个资源的动态分配,也就是资源管理是交由第三方资源管理器如Yarn、K8s等

3.2 部署模式(资源调度管理器角度)

Flink自己可以进行资源管理,但是没有那么灵活、好用。同时也可以结合第三方的资源管理器进行资源管理,flink专注业务逻辑的实现而资源的管理交由第三方资源管理器。

3.2.1 Standalone运行模式(了解)

独立模式是独立运行的,不依赖任何外部的资源管理平台;当然独立也是有代价的:如果资源不足,或者出现故障,没有自动扩展或重分配资源的保证,必须手动处理。所以独立模式一般只用在开发测试或作业非常少的场景下。

1、会话模式部署

1)先启动一个集群

./start-cluster.sh

2)通过Client提交作业(参考第二章)

方式一:通过Web UI提交

方式二:通过命令

flink run -m hadoop102:8081 -c com.zlin.wc.StreamWordCount ./chapter2-1.0-SNAPSHOT.jar
2、单作业模式部署

Flink的Standalone集群并不支持单作业模式部署。因为单作业模式需要借助一些资源管理平台。

3、应用模式部署

应用模式下不会提前创建集群,所以不能调用start-cluster.sh脚本。我们可以使用同样在bin目录下的standalone-job.sh来创建一个JobManager

image-20230618171349536

具体步骤如下:

(1)将jar包放到lib/目录下

[root@hadoop102 lib]# cp /opt/jars/chapter2-1.0-SNAPSHOT.jar /opt/module/flink-1.17.1/lib

(2)启动JobManager

[root@hadoop102 bin]# /opt/module/flink-1.17.1/bin/standalone-job.sh start --job-classname com.zlin.wc.StreamWordCount

这里不用再指定jar包,因为脚本会到lib/目录下扫描

image-20230618172142897

image-20230618172331041

我们可以看到,虽然提交了作业,但是申请不到资源,一直处于CREATED状态。需要启动TaskManager。

(3)启动TaskManager

[root@hadoop102 bin]# /opt/module/flink-1.17.1/bin/taskmanager.sh start

image-20230618180642329

(4) 如果希望停掉集群,可以使用

/opt/module/flink-1.16.0/bin/standalone-job.sh stop
/opt/module/flink-1.16.0/bin/taskmanager.sh stop

3.2.2 Native Mode

Native模式是对Session和Per-Job的优化,把生成多少TaskManager交给资源调度器。支持的资源调度器有Yarn,Mesos,Kubernetes等。

前期准备

  • 需要安装hadoop(Yarn、HDFS)

  • 配置环境变量

    [root@hadoop102 ~]# vim ~/.bashrc
    
    #SET HADOOP_HOME
    export HADOOP_HOME=/opt/module/hadoop-2.7.7
    export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
    export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
    export HADOOP_CLASSPATH=`hadoop classpath`
    
    
  • 启动hadoop(Yarn、HDFS服务)

    [root@hadoop102 sbin]# /opt/module/hadoop-2.7.7/sbin/start-all.sh
    
1、Flink on Yarn(重点)
(1)会话模式部署

1)启动集群

第一步:首先要启动hadoop相关服务(yarn、hdfs)

第二步:执行脚本命令向YARN集群申请资源,开启一个YARN会话,启动Flink集群

bin/yarn-session.sh -nm test -d

在YARN的ResourceManager界面查看执行情况

Yarn WEB UI: http://hadoop102:8088/

image-20230108021135694

可用参数:

-d:分离模式,即yarn session后台运行

-jm:JobManager内存,默认单位MB

-nm:任务名

-qu:指定Yarn队列名

-tm:每个TaskManager的内存

2)提交作业

两种提交方式:

  • Web UI提交(略)

  • 命令行提交

    [root@hadoop102 bin]# flink run -c com.zlin.wc.StreamWordCount /opt/jars/chapter2-1.0-SNAPSHOT.jar
    

    从Yarn Web UI -> 点击对应提交的任务的application ID -> 点击Tracking URL:ApplicationMaster -> 进入到Flink web ui 可查看到提交的任务

尝试1:可以再提交一个任务吗?
可以,而且由yarn自己去动态分配,且提交到同一个application中
image-20230108024556818

尝试2:再创建一个Yarn session?

2023-06-18 18:36:55,836 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster

会卡住,无法创建成功,不知道是不是资源问题还是说会话模式本来就只有一个集群不让创建。

(2)单作业模式部署
[root@hadoop102 bin]# flink run -t yarn-per-job -c com.zlin.wc.StreamWordCount /opt/jars/chapter2-1.0-SNAPSHOT.jar

image-20230618184423787

查看或者取消作业:

[atguigu@hadoop102 flink-1.17.0]$ bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
[atguigu@hadoop102 flink-1.17.0]$ bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>

这里的application_XXXX_YY是当前应用的ID,是作业的ID。注意如果取消作业,整个Flink集群也会停掉。

(3)应用模式部署
  • 命令行提交

    1)提交

    [root@hadoop102 bin]# flink run-application -t yarn-application -c com.zlin.wc.StreamWordCount /opt/jars/chapter2-1.0-SNAPSHOT.jar
    

    2)查看和取消作业

    flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
    flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
    
  • 上传HDFS提交

    1)上传jar包到hdfs

    我们知道在客户端生成JobGraph一旦提交任务数过多,会造成很大压力,而应用模式部署在JM上生成JobGraph所以可以解决这个问题。但是我们提交时还是需要经过网络传输将jar包传送到JM,提交速度会受到影响。我们可以将jar包提前上传到hdfs。

    可以通过yarn.provided.lib.dirs配置选项指定位置,将jar上传到hdfs。这种方式下 jar 可以预先上传到 HDFS,而不需要单独发送到集群,这就使得作业提交更加轻量了。

    [root@hadoop102 flink-1.17.1]# hadoop fs -mkdir /flink-jars
    [root@hadoop102 flink-1.17.1]# hadoop fs -put /opt/jars/chapter2-1.0-SNAPSHOT.jar /flink-jars
    

    2)上传flink的lib和plugins到HDFS上

    [root@hadoop102 flink-1.17.1]# hadoop fs -mkdir /flink-dist
    [root@hadoop102 flink-1.17.1]# hadoop fs -put lib/ /flink-dist
    [root@hadoop102 flink-1.17.1]# hadoop fs -put plugins/ /flink-dist
    

    2)提交作业

    [root@hadoop102 flink-1.17.1]# bin/flink run-application -t yarn-application	-Dyarn.provided.lib.dirs="hdfs://hadoop102:8020/flink-dist"	-c com.zlin.wc.StreamWordCount hdfs://hadoop102:8020/flink-jars/chapter2-1.0-SNAPSHOT.jar
    
2、Flink on k8s(了解)

容器化部署是如今业界流行的一项技术,基于Docker镜像运行能够让用户更加方便地对应用进行管理和运维。容器管理工具中最为流行的就是Kubernetes(k8s),而Flink也在最近的版本中支持了k8s部署模式。后续补充。。。

附:Flink可以结合常见的资源管理器,各种资源调度器中支持的运行模式如下图:

image-20230609101718724

3.3 历史服务器

运行 Flink job 的集群一旦停止,只能去 yarn 或本地磁盘上查看日志,不再可以查看作业挂掉之前的运行的 Web UI,很难清楚知道作业在挂的那一刻到底发生了什么。如果我们还没有 Metrics 监控的话,那么完全就只能通过日志去分析和定位问题了,所以如果能还原之前的 Web UI,我们可以通过 UI 发现和定位一些问题。

Flink提供了历史服务器,用来在相应的 Flink 集群关闭后查询已完成作业的统计信息。我们都知道只有当作业处于运行中的状态,才能够查看到相关的WebUI统计信息。通过 History Server 我们才能查询这些已完成作业的统计信息,无论是正常退出还是异常退出。

此外,它对外提供了 REST API,它接受 HTTP 请求并使用 JSON 数据进行响应。Flink 任务停止后,JobManager 会将已经完成任务的统计信息进行存档,History Server 进程则在任务停止后可以对任务统计信息进行查询。比如:最后一次的 Checkpoint、任务运行时的相关配置。

配置历史服务器:

1)创建存储目录

hadoop fs -mkdir -p /logs/flink-job

2)在flink配置问价flink-conf.yaml进行配置

jobmanager.archive.fs.dir: hdfs://hadoop102:9000/logs/flink-job
historyserver.web.address: hadoop102
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://hadoop102:9000/logs/flink-job
historyserver.archive.fs.refresh-interval: 5000

注意:这里hdfs://hadoop102:9000/logs/flink-job的端口需要根据hadoop版本来定。hadoop1.x是8020,hadoop2.x是9000

3)启动历史服务器

[root@hadoop102 bin]# ./historyserver.sh start
Starting historyserver daemon on host hadoop102.

# 启动成功会有一个HistoryServer的进程
HistoryServer

4)停止历史服务器

[root@hadoop102 bin]# ./historyserver.sh stop

5)在浏览器地址栏输入:**[http://hadoop102:8082]**查看已经停止的job的统计信息

image-20230708170026146

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/96022.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Pillow:Python的图像处理库(安装与使用教程)

在Python中&#xff0c;Pillow库是一个非常强大的图像处理库。它提供了广泛的图像处理功能&#xff0c;让我们可以轻松地操作图像&#xff0c;实现图像的转换、裁剪、缩放、旋转等操作。此外&#xff0c;Pillow还支持多种图像格式的读取和保存&#xff0c;包括JPEG、PNG、BMP、…

白嫖idea

白嫖idea 地址 https://www.jetbrains.com/toolbox-app/

基于java Swing 和 mysql实现的购物管理系统(源码+数据库+说明文档+运行指导视频)

一、项目简介 本项目是一套基于java Swing 和 mysql实现的购物管理系统&#xff0c;主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的Java学习者。 包含&#xff1a;项目源码、项目文档、数据库脚本等&#xff0c;该项目附带全部源码可作为毕设使用。 项目都经过…

使用WSL修改docker文件存储位置

按照以下说明将其重新定位到其他驱动器/目录&#xff0c;并保留所有现有的Docker数据。 首先&#xff0c;右键单击Docker Desktop图标关闭Docker桌面&#xff0c;然后选择退出Docker桌面&#xff0c;然后&#xff0c;打开命令提示符&#xff1a; wsl --list -v您应该能够看到&a…

Linux之Shell(一)

Linux之Shell Shell概述Linux提供的Shell解析器bash和sh的关系Centos默认的解析器是bash Shell脚本入门脚本格式第一个脚本脚本常用的执行方式 变量系统预定义变量自定义变量特殊变量$n$#\$*、\$$? 运算符条件判断流程控制(▲)if判断case语句for循环while循环 read读取控制台输…

如何判断一个java对象还活着

引用计数算法 引用计数器的算法是这样的&#xff1a;在对象中添加一个引用计数器&#xff0c;每当有一个地方引用它时&#xff0c;计数器值就加一&#xff1b;当引用失效时&#xff0c;计数器值就减一&#xff1b;任何时刻计数器为零的对象就是不可能再被使用的。 缺点&#x…

el-select范围选择框

1、html <el-select v-model"searchForm.hour" :class"searchForm.hour?.length>1?edit-tag-hour:keep-tag-hour" filterable multiple clearable :multiple-limit"2" remove-tag"removeChange" change"hourChange"…

Vscode画流程图

1.下载插件 Draw.id Integration 2.桌面新建文件&#xff0c;后缀名改为XXX.drawio 在vscode打开此文件 &#xff0c;就可以进行绘制流程图啦

音频基本知识

声音传播方式: 1)声音的传播需要介质,在真空中不能传播; 2)声波属于纵波,即如下图传播方向与振动方向一致; 声音速度: 1)常温常压下,一般空气速度为340m/s; 2)温度越高,声速越大; 3)液体、固体的传播速度比空气快; 人耳可接收到的频域范围: 1)通常范围…

Spring boot中调用C/C++(dll)

添加JNA依赖 <dependency><groupId>net.java.dev.jna</groupId><artifactId>jna</artifactId><version>5.5.0</version> </dependency>准备C代码/C代码 如下是C代码&#xff0c;文件名&#xff1a;xizi.c #include <std…

Python实战之数据表提取和下载自动化

在网络爬虫领域&#xff0c;动态渲染类型页面的数据提取和下载自动化是一个常见的挑战。本文将介绍如何利用Pyppeteer库完成这一任务&#xff0c;帮助您轻松地提取动态渲染页面中的数据表并实现下载自动化。 一、环境准备 首先&#xff0c;确保您已经安装了Python环境。接下来…

阔别线下三年的BIRTV影视盛会:有哪些变革式创新应用?

2023年8月26日&#xff0c;以“融合创新 面向未来”为主题的第三十届北京国际广播电影电视展览会&#xff08;BIRTV 2023&#xff09;收官。这是一场阔别线下三年的行业顶尖盛会&#xff0c;展馆处处人潮涌动。 接下来盘点一下&#xff0c;本次BIRTV的一些特色应用&#xff1a…

《vue3实战》通过indexOf方法实现电影评价系统的模糊查询功能

目录 前言 一、indexOf是什么&#xff1f;indexOf有什么作用&#xff1f; 含义&#xff1a; 作用&#xff1a; 二、功能实现 这段是查询过程中过滤筛选功能的代码部分: 分析&#xff1a; 这段是查询用户和性别功能的代码部分&#xff1a; 分析&#xff1a; 三、最终效…

Python-pyqt不同窗口数据传输【使用静态函数】

文章目录 前言程序1&#xff1a;caogao1.py输入数据界面程序2&#xff1a;caogao2.py接收数据界面 程序3 &#xff1a;将输入数据界面和接收数据界面组合成一个总界面讲解 总结 前言 在编写pyqt 页面时有时候需要不同页面进行数据传输。本文讲解静态函数方法。直接看示例。 程…

pandas数据分析之数据绘图

一图胜千言&#xff0c;将信息可视化&#xff08;绘图&#xff09;是数据分析中最重要的工作之一。它除了让人们对数据更加直观以外&#xff0c;还可以帮助我们找出异常值、必要的数据转换、得出有关模型的想法等等。pandas 在数据分析、数据可视化方面有着较为广泛的应用。本文…

Python爬虫(十六)_JSON模块与JsonPath

数据提取之JSON与JsonPATH JSON(JavaScript Object Notation)是一种轻量级的数据交换格式&#xff0c;它是的人们很容易的进行阅读和编写。同时也方便了机器进行解析和生成。适用于进行数据交互的场景&#xff0c;比如网站前台与后台之间的数据交互。 JSON和XML的比较可谓不相…

完善区域企业监测预警机制,助推区域产业可持续发展

“五度易链”产业大数据解决方案由产业经济、智慧招商、企业服务、数据服务四大应用解决方案组成&#xff0c;囊括了产业经济监测、产业诊断分析、企业监测预警、企业综合评估、大数据精准招商、招商智能管理、企业管理、企业培育、企业市场服务、企业金融服务、产业数据开放服…

【C++进阶(三)】STL大法--vector迭代器失效深浅拷贝问题剖析

&#x1f493;博主CSDN主页:杭电码农-NEO&#x1f493;   ⏩专栏分类:C从入门到精通⏪   &#x1f69a;代码仓库:NEO的学习日记&#x1f69a;   &#x1f339;关注我&#x1faf5;带你学习C   &#x1f51d;&#x1f51d; vector-下 1. 前言2. 什么是迭代器失效?3. 迭代…

Flink CDC学习笔记

第一章 CDC简介 1.1 什么是CDC ​ CDC (Change Data Capture 变更数据获取&#xff09;的简称。核心思想就是&#xff0c;检测并获取数据库的变动&#xff08;增删查改&#xff09;&#xff0c;将这些变更按发生的顺序记录下来&#xff0c;写入到消息中间件以供其它服务进行订…

前端基础---HTML笔记汇总一

HTML定义 HTML超文本标记语言——HyperText Markup Language。 超文本是什么&#xff1f; 链接标记是什么&#xff1f; 标记也叫标签&#xff0c;带尖括号的文本 标签分类 单标签:只有开始标签&#xff0c;没有结束标签(<br>换行 <hr>水平线 <img> 图像标…