Flink 优化 (一) --------- 资源配置调优

目录

  • 一、内存设置
    • 1. TaskManager 内存模型
    • 2. 生产资源配置示例
  • 二、合理利用 cpu 资源
    • 1. 使用 DefaultResourceCalculator 策略
    • 2. 使用 DominantResourceCalculator 策略
    • 3 使用 DominantResourceCalculator 策略并指定容器 vcore 数
  • 三、并行度设置
    • 1. 全局并行度计算
    • 2. Source 端并行度的配置
    • 3. Transform 端并行度的配置
    • 4. Sink 端并行度的配置


Flink 性能调优的第一步,就是为任务分配合适的资源,在一定范围内,增加资源的分配与性能的提升是成正比的,实现了最优的资源配置后,在此基础上再考虑进行后面论述的性能调优策略。

提交方式主要是 yarn-per-job,资源的分配在使用脚本提交 Flink 任务时进行指定。

➢ 标准的 Flink 任务提交脚本 (Generic CLI 模式)

从 1.11 开始,增加了通用客户端模式,参数使用 -D <property=value> 指定

bin/flink run \
-t yarn-per-job \
-d \
-p 5 \ 指定并行度
-Dyarn.application.queue=test \ 指定 yarn 队列
-Djobmanager.memory.process.size=1024mb \ 指定 JM 的总进程大小
-Dtaskmanager.memory.process.size=1024mb \ 指定每个 TM 的总进程大小
-Dtaskmanager.numberOfTaskSlots=2 \ 指定每个 TM 的 slot 数
-c com.fancy.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

参数列表:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/deployment/config.html


一、内存设置

1. TaskManager 内存模型

在这里插入图片描述

① 内存模型详解

  • JVM 特定内存:JVM 本身使用的内存,包含 JVM 的 metaspace 和 over-head
    1)JVM metaspace:JVM 元空间
    taskmanager.memory.jvm-metaspace.size,默认 256mb
    2)JVM over-head 执行开销:JVM 执行时自身所需要的内容,包括线程堆栈、IO、编译缓存等所使用的内存。
    taskmanager.memory.jvm-overhead.fraction,默认 0.1
    taskmanager.memory.jvm-overhead.min,默认 192mb
    taskmanager.memory.jvm-overhead.max,默认 1gb
    总进程内存*fraction,如果小于配置的 min(或大于配置的 max)大小,则使用 min/max 大小

  • 框架内存:Flink 框架,即 TaskManager 本身所占用的内存,不计入 Slot 的资源中。
    堆内:taskmanager.memory.framework.heap.size,默认 128MB
    堆外:taskmanager.memory.framework.off-heap.size,默认 128MB

  • Task 内存:Task 执行用户代码时所使用的内存
    堆内:taskmanager.memory.task.heap.size,默认 none,由 Flink 内存扣除掉其他部分的内存得到。
    堆外:taskmanager.memory.task.off-heap.size,默认 0,表示不使用堆外内存

  • 网络内存:网络数据交换所使用的堆外内存大小,如网络数据交换缓冲区
    堆外:taskmanager.memory.network.fraction,默认 0.1
    taskmanager.memory.network.min,默认 64mb
    taskmanager.memory.network.max,默认 1gb
    Flink 内存*fraction,如果小于配置的 min(或大于配置的 max)大小,则使用 min/max大小

  • 托管内存:用于 RocksDB State Backend 的本地内存和批的排序、哈希表、缓存中间结果
    堆外:taskmanager.memory.managed.fraction,默认 0.4
    taskmanager.memory.managed.size,默认 none
    如果 size 没指定,则等于 Flink 内存*fraction

② 案例分析
基于Yarn模式,一般参数指定的是总进程内存,taskmanager.memory.process.size,比如指定为 4G,每一块内存得到大小如下:

(1) 计算 Flink 内存
JVM 元空间 256mJVM 执行开销: 4g*0.1=409.6m,在[192m,1g]之间,最终结果 409.6m
Flink 内存=4g-256m-409.6m=3430.4m

(2) 网络内存=3430.4m*0.1=343.04m,在[64m,1g]之间,最终结果 343.04m

(3) 托管内存=3430.4m*0.4=1372.16m

(4) 框架内存,堆内和堆外都是 128m

(5) Task 堆内内存=3430.4m-128m-128m-343.04m-1372.16m=1459.2m

在这里插入图片描述
在这里插入图片描述
所以进程内存给多大,每一部分内存需不需要调整,可以看内存的使用率来调整。

2. 生产资源配置示例

bin/flink run \
-t yarn-per-job \
-d \
-p 5 \ 指定并行度
-Dyarn.application.queue=test \ 指定 yarn 队列
-Djobmanager.memory.process.size=2048mb \ JM2~4G 足够
-Dtaskmanager.memory.process.size=4096mb \ 单个 TM2~8G 足够
-Dtaskmanager.numberOfTaskSlots=2 \ 与容器核数 1core:1slot 或 2core:1slot
-c com.fancy.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

Flink 是实时流处理,关键在于资源情况能不能抗住高峰时期每秒的数据量,通常用QPS/TPS 来描述数据情况。

二、合理利用 cpu 资源

Yarn 的容量调度器默认情况下是使用“DefaultResourceCalculator”分配策略,只根据内存调度资源,所以在 Yarn 的资源管理页面上看到每个容器的 vcore 个数还是 1。

可以修改策略为 DominantResourceCalculator,该资源计算器在计算资源的时候会综合考虑 cpu 和内存的情况。在 capacity-scheduler.xml 中修改属性:

<property>
	<name>yarn.scheduler.capacity.resource-calculator</name>
	<!-- <value>org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator</value>-->
	<value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
</property>

1. 使用 DefaultResourceCalculator 策略

bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=4096mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.fancy.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

可以看到一个容器只有一个 vcore:

在这里插入图片描述

2. 使用 DominantResourceCalculator 策略

修改后 yarn 配置后,分发配置并重启 yarn,再次提交 flink 作业:

bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=4096mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.fancy.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

看到容器的 vcore 数变了:

JobManager 1 个,占用 1 个容器,vcore=1
TaskManager 3 个,占用 3 个容器,每个容器 vcore=2,总 vcore=2*3=6,因为默认单个容器的 vcore 数=单 TM 的 slot 数

3 使用 DominantResourceCalculator 策略并指定容器 vcore 数

指定 yarn 容器的 vcore 数,提交:

bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Dyarn.containers.vcores=3 \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=4096mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.fancy.flink.tuning.UvDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

在这里插入图片描述

JobManager1 个,占用 1 个容器,vcore=1
TaskManager3 个,占用 3 个容器,每个容器 vcore =3,总 vcore=3*3=9

三、并行度设置

1. 全局并行度计算

开发完成后,先进行压测。任务并行度给 10 以下,测试单个并行度的处理上限。然后总 QPS/单并行度的处理能力 = 并行度。开发完 Flink 作业,压测的方式很简单,先在 kafka 中积压数据,之后开启 Flink 任务,出现反压,就是处理瓶颈。相当于水库先积水,一下子泄洪。

不能只从 QPS 去得出并行度,因为有些字段少、逻辑简单的任务,单并行度一秒处理几万条数据。而有些数据字段多,处理逻辑复杂,单并行度一秒只能处理 1000 条数据。

最好根据高峰期的 QPS 压测,并行度*1.2 倍,富余一些资源。

2. Source 端并行度的配置

数据源端是 Kafka,Source 的并行度设置为 Kafka 对应 Topic 的分区数。如果已经等于 Kafka 的分区数,消费速度仍跟不上数据生产速度,考虑下 Kafka 要扩大分区,同时调大并行度等于分区数。

Flink 的一个并行度可以处理一至多个分区的数据,如果并行度多于 Kafka 的分区数,那么就会造成有的并行度空闲,浪费资源。

3. Transform 端并行度的配置

➢ Keyby 之前的算子
一般不会做太重的操作,都是比如 map、filter、flatmap 等处理较快的算子,并行度可以和 source 保持一致。

➢ Keyby 之后的算子
如果并发较大,建议设置并行度为 2 的整数次幂,例如:128、256、512;小并发任务的并行度不一定需要设置成 2 的整数次幂;大并发任务如果没有 KeyBy,并行度也无需设置为 2 的整数次幂;

4. Sink 端并行度的配置

Sink 端是数据流向下游的地方,可以根据 Sink 端的数据量及下游的服务抗压能力进行评估。如果 Sink 端是 Kafka,可以设为 Kafka 对应 Topic 的分区数。Sink 端的数据量小,比较常见的就是监控告警的场景,并行度可以设置的小一些。Source 端的数据量是最小的,拿到 Source 端流过来的数据后做了细粒度的拆分,数据量不断的增加,到 Sink 端的数据量就非常大。那么在 Sink 到下游的存储中间件的时候就需要提高并行度。

另外 Sink 端要与下游的服务进行交互,并行度还得根据下游的服务抗压能力来设置,如果在 Flink Sink 这端的数据量过大的话,且 Sink 处并行度也设置的很大,但下游的服务完全撑不住这么大的并发写入,可能会造成下游服务直接被写挂,所以最终还是要在 Sink处的并行度做一定的权衡。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/11783.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

谷歌云服务器centos9的docker部署chat-web,实现自己的ChatGPT

谷歌云服务器centos9的docker部署chat-web&#xff0c;实现自己的ChatGPT 前提条件&#xff1a;准备一个境外服务器和chatgpt的key。&#xff08;网上教程很多&#xff09; 1.更新yum yum update2.下载docker-ce的repo curl https://download.docker.com/linux/centos/dock…

【iOS开发-响应者链Responder Chain】

文章目录0.0 前言1 响应者链&#xff08;Responder Chain1.1 响应者1.2 响应链事件1.3 响应者对象1.3.1 常见的响应者对象1.3.3 UIResponder1.3 UITouch1.3.1 UITouch的属性1.3.2 UITouch的方法1.4 UIEvent1.4.2 获取touch1.5 完整的响应者链1.5.1寻找响应者的hitTest方法1.5.2…

excel在文本的固定位置插入字符、进行日期和时间的合并

1.excel在文本的固定位置插入字符 如上图&#xff0c;现在想要将其转化为日期格式&#xff08;比如2017/1/1&#xff09;&#xff0c;但是当设置单元格格式为日期时却显示出很多&#xff03;。我们可以通过在20170101中添加两个斜杠“/”来将其转化为2017/1/1。可以用replace函…

【分享】如何移除PDF密码?

相信不少小伙伴在工作的时候&#xff0c;经常会为了保证PDF文档的安全和私密而给文件设置“打开密码”&#xff0c;但如果后续需要频繁使用该文件&#xff0c;每次打开都要查找输入密码&#xff0c;就会使得工作效率大大降低耽误工作时间。那后续不需要设置保护了&#xff0c;要…

如何用ChatGPT写文章?只需要这3步,10倍提升写作效率

随着技术的不断进步和创新&#xff0c;我们的生活方式和工作方式也在不断变化。在日常工作中&#xff0c;越来越多的人使用人工智能和机器学习等技术提高效率减少时间成本。最近ChatGPT火出圈了&#xff0c;很多人通过使用ChatGPT提高了工作效率。那么&#xff0c;在写作领域&a…

【C++ 三】一维数组、二维数组

数组概述、一维数组、二维数组 文章目录数组概述、一维数组、二维数组前言1 数组1.1 概述2 一维数组2.1 一维数组定义方式2.2 一维数组数组名2.3 冒泡排序3 二维数组3.1 二维数组定义方式3.2 二维数组数组名总结前言 本文包含数组概述、一维数组、二维数组。 1 数组 1.1 概述…

Jina AI 创始人肖涵博士:揭秘 Auto-GPT 喧嚣背后的残酷真相

Auto-GPT 究竟是一个开创性的项目&#xff0c;还是一个被过度炒作的 AI 实验&#xff1f;本文为我们揭开了喧嚣背后的真相&#xff0c;并揭示了 Auto-GPT 不适合实际应用的生产局限性。 背景介绍 这两天&#xff0c;Auto-GPT&#xff0c;一款让最强语言模型 GPT-4 能够自主完成…

口令暴力破解--Ftp协议暴力破解与Ssh协议暴力破解

Ftp协议暴力破解 FTP服务检测 FTP服务 FTP是一种文件传输协议&#xff0c; FTP服务默认端口为21。利用FTP服务器可以在本地主机和远程主机间进行文件传输。当FTP没有配置好安全控制&#xff0c;如对登录的源地址及密码尝试次数做限制&#xff0c;那么就会存在暴力破解可能。…

计算机网络

文章目录数据传输过程及接收过程应用层传输层TCP/IP4层网络模型 1.应用层 2.传输层 3.网络层 4.数据链路层 数据传输过程及接收过程 用户A在聊天软件上输入hello world按下发送,发送给B 一: 传输 1.应用层:构建一个应用层的数据报文交给传输层 ** 2.传输层: 根据刚才传过来的…

java虚拟机反射机制

&#xff08;1&#xff09;Java虚拟机反射机制的定义&#xff1f; Java反射机制是在运行状态中&#xff0c;对于任意一个类&#xff0c;都能够知道这个类的所有属性和方法;对于任意一个对象&#xff0c;都能够调用它的任意方法和属性;这种动态获取信息以及动态调用对象方法的功…

MySQL事务隔离级别

一、概念说明 脏读&#xff1a;指的是读到了其他事务未提交的数据&#xff0c;未提交意味着这些数据可能会回滚&#xff0c;也就是可能最终不会存到数据库中&#xff0c;也就是不存在的数据。读到了并不一定最终存在的数据&#xff0c;这就是脏读。 可重复读&#xff1a;在一个…

【GPT4】微软 GPT-4 测试报告(3)GPT4 的编程能力

欢迎关注【youcans的GPT学习笔记】原创作品&#xff0c;火热更新中 微软 GPT-4 测试报告&#xff08;1&#xff09;总体介绍 微软 GPT-4 测试报告&#xff08;2&#xff09;多模态与跨学科能力 微软 GPT-4 测试报告&#xff08;3&#xff09;GPT4 的编程能力 【GPT4】微软 GPT-…

ChatGPT和GPT-4带你选笔记本电脑

大家好,我是herosunly。985院校硕士毕业,现担任算法研究员一职,热衷于机器学习算法研究与应用。曾获得阿里云天池比赛第一名,CCF比赛第二名,科大讯飞比赛第三名。拥有多项发明专利。对机器学习和深度学习拥有自己独到的见解。曾经辅导过若干个非计算机专业的学生进入到算法…

(只需三步)如何用chatgpt自动生成思维导图

目录 chatgpt是可以生成思维导图的&#xff01;只需三步&#xff0c;非常简单&#xff01; 第一步&#xff1a;打开chatgpt&#xff0c;告诉它主题 第二步&#xff0c;完善思维导图 第三步&#xff1a;查看思维导图的效果 chatgpt是可以生成思维导图的&#xff01;只需三步&am…

vue2路由(下)

编程式路由导航 通过点击按钮实现push和replace俩种模式的跳转 实现&#xff1a;就是通过$router原型里面的方法 也能实现路由的跳转和后退&#xff0c;分别采用的是$router里面的black和forward方法 感觉就是BOM对象中的history对象里面的方法 正是前进&#xff0c;后是后…

C/C++|物联网开发入门+项目实战|嵌入式C语言高级|C语言常用关键字及运算符操作-学习笔记(8)

文章目录2-2: C语言常用关键字及运算符操作关键字参考&#xff1a; 麦子学院-嵌入式C语言高级2-2: C语言常用关键字及运算符操作 [重点] 掌握C语言的常用关键宇及其应用场景&#xff0c;使用技巧 关键字 编译器&#xff1a;预先定义了一定意义的字符串&#xff0c;32个。 s…

通讯录的实现(初级版本)

思维导图&#xff1a; 目录 思维导图&#xff1a; ​编辑 一&#xff0c;实现通讯录的第一步——实现逻辑 二&#xff0c;头文件内实现的内容 三&#xff0c;contect.c文件内的函数定义 3.1初始化通讯录:void InitContect(cotect*pc) 3.2:实现通讯录内容的增加&#xff…

Python的基础

这是我自己学习Python的三个星期的小总结&#xff0c;内容包含了规范、数据类型、函数、类和捕捉异常&#xff0c;做了一个简单的梳理&#xff0c;希望可以帮助到和我一样开始学习Python的小伙伴&#xff0c;也希望多多支持&#xff0c;相互进步&#xff0c;下面步入正题。 基…

【Python】Python中的列表,元组,字典

文章目录列表创建列表获取元素修改元素添加元素查找元素删除元素列表拼接遍历列表切片操作元组创建元组元组中的操作字典创建字典添加/修改元素删除元素查找字典的遍历合法的key类型列表 列表是一种批量保存数据的方式&#xff0c;列表使用[]表示 创建列表 创建两个空列表 …

Java对日开发成趋势?网友:找工作打开了新思路

近两年行业环境起起伏伏&#xff0c;企业降本增效&#xff0c;提高人才招聘的门槛&#xff0c;导致大家找工作时觉得越来越难&#xff0c;尤其是Java开发&#xff0c;主打的就是一个“卷”&#xff01; 不过行业变革&#xff0c;挑战与机遇并存。Java作为编程语言排行榜的常年第…