一种基于动态水位值的Flink调度优化算法(flink1.5以前),等同于实现flink的Credit-based反压原理

优化flink反压

  • 说明
  • 1 flink反压介绍
    • 1.1 介绍
    • 1.2 大数据系统反压现状
    • 1.4 flink task与task之间的反压
    • 1.5 netty水位机制作用分析
  • 2 反压优化算法
  • 3 重点! 但是 可但是 flink1.5以后的反压过程。
  • 4 flink反压问题的查找瓶颈办法

说明

首先说明,偶然看了个论文,发现 flink优化原来比我想象中的更简单,得到了一些启发,所以写下这篇帖子,供大家共同学习。
看到的论文是《计算机科学与应用》21年11月的一篇 名字就叫做 : 一种基于动态水位值的Flink调度优化算法。感兴趣的小伙伴可以自己看一下 ,很短没多少字。
但是 很离谱的是 这篇论文中的方法在flink1.5以后的反压机制中早就实现了,我不知道 这篇论文为什么能发表在21年11月的期刊上,也可能是我看论文看的不对?反正就当学习个人实现 flink1.5的反压机制更新了

1 flink反压介绍

1.1 介绍

首先说下flink反压,其实就是flink流式处理中一种动态反馈机制,一般是在实时数据处理的过程中,上游节点的生产速度大于下游节点的消费速度,提出反馈来提醒上游,下游消费不过来了。

如垃圾回收不及时或者停顿可能使得流入系统的数据快速堆积、大促或秒杀活动时出现的流量陡增等都会造成反压。如不对反压及时处理,将会使系统资源耗尽甚至导致系统崩溃。

1.2 大数据系统反压现状

现有大数据实时处理系统处理反压问题方面,Storm 是通过监控Bolt中的接收队列负载情况,如果超过高水位值就会将反压信息写到Zookeeper,Zookeeper上的watch会通知该拓扑的所有Worker都进入反压状态,最后Spout停止发送tuple。J Storm采用逐级降速的方式来处理反压,使用Topology Master替代Zookeeper来协调拓扑进入反压状态,效果较Storm更为稳定。Spark Streaming根据批处理时间(Batch Processing Time)和批次间隔(Batch Interval,即Batch Duration)的信息来动态调整系统的摄入速率,从而完成其反压工作。

在Flink优化方面,根据论文文献目前有几种:

  1. 使用Flink执行一种传统堆序优化后的算法Heap Optimize,增加了Flink的吞吐量 。
  2. 针对Flink默认的先来先服务的任务调度策略,通过资源感知,将待执行任务分配到最佳节点进行计算,优化了Flink的负载均衡。
  3. 根据任务间数据流的大小确定拓扑边的权重,以生成关键路径,大幅缩减了Flink节点间的通信开销。
  4. 把Flink从原来的CPU迁移扩展到异构的CPU-GPU集群,在并行计算、内存管理及通信策略方面极大地提高了Flink的计算能力。

目前诸多的研究当中没有Flink反压方面的问题,当Flink面临远端传输问题时,其所依托的Netty所采用的是一种静态的水位机制,这使得Flink在面临颠簸状态数据的远程传输问题时,容易出现反复反压的情况,极大地影响了Flink传输数据的效率,故而本文将针对此问题展开研究。

例如 从我的工作经历来说,flink做简单的etl操作 ,接受kafka 数据 插入es中,数据量 大小时大时小时,实际使用来看 spark streaming 性能远远好于flink 就是因为 flink的反压

1.4 flink task与task之间的反压

Flink的反压原理如下图所示,假如Flink的一个Job分为Task A、B、C,其中Task A是Source Task、Task B处理数据、Task C为Sink Task。假如Task C由于各种原因吞吐量降低,会将负载信息反馈给Task B,Task B会降低向Task C发送数据的速率,此时若Task B还保持从Task A读取数据,数据会把Task B的Send Buffer和Receive Buffer撑爆,导致OOM或者丢失数据。所以,当Task B的Send Buffer和Receive Buffer被用完后,Task B会用同样的原理将负载信息反馈给Task A,Task A收到Task B的负载信息后,会降低给Task B发送数据的速率,以此类推。

图1

Flink反压存在Task内与跨Task两种情况,本文已在图1中标注,本文主要是针对Flink跨Task传输进行反压优化,故下文主要对Flink跨Task传输进行介绍:图2展示了Flink网络传输时的数据流向,可以看到Task Manager A给TaskManager B发送数据,Task Manager A做为Producer,Task Manager B做为Consumer。Producer端的Operator实例会产生数据,最后通过网络发送给Consumer端的Operator实例。Producer端Operator实例生产的数据首先缓存到Task Manager内部的Net Work Buffer。Net Work依赖Netty来做通信,Producer端的Netty内部有Channel Outbound Buffer,Consumer端的Netty内部有Channel Inbound Buffer。Netty最终还是要通过Socket发送网络请求,Socket这一层也会有Buffer,Producer端有Send Buffer,Consumer端有Receive Buffer。

故Flink网络传输时的整个反压过程为:首先Producer Operator从自己的上游或者外部数据源读取到数据后,对一条条的数据进行处理,处理完的数据首先输出到Producer Operator对应的Net Work Buffer中。Buffer写满或者超时后,就会触发将Net Work Buffer中的数据拷贝到Producer端Netty的Channel Outbound Buffer,之后又把数据拷贝到Socket的Send Buffer中,这里有一个从用户态拷贝到内核态的过程,最后通过Socket发送网络请求,把Send Buffer中的数据发送到Consumer端的Receive Buffer。数据到达Consumer端后,再依次从Socket的Receive Buffer拷贝到Netty的Channel Inbound Buffer,再拷贝到Consumer Operator的Net Work Buffer,最后Consumer Operator就可以读到数据进行处理了,这就是两个Task Manager之间的数据传输过程。

图2

1.5 netty水位机制作用分析

分析源码可知,Netty水位机制是一种静态的机制,Netty默认其水位线的高度为定值,这使得Flink系统在面临瞬时流量不稳定的场景(即系统的数据流量值在特别高与特别低的值之间不断跳动时)时,会出现下述两种问题:

  1. 水位值较下游可用缓存区数偏低:如图3 (左图)所示,图中以“圆圈”表示数据,以“方框”表示缓存区的大小,下同。假设当上游A点来临的数据量是9 (Flink中以buffer为数据单位,每个buffer大小为32 k,为便于表述,下文块描述),而此时下游B点的可用缓存区是10,N代表代表数据通道(其作用类似于水坝,水位值的大小决定了其单位时间通过的数据量大小),此处设水位值高度为4,则Flink传输本批次的数据需要3个单位时间(上游共9块数据,每个单位时间只能通过4块的数据,需要3个单位的时间来处理这批数据)。而若此时的水位值为9或者10的话,则只需要一个单位时间,Flink便可以处理本批次的数据。

  2. 水位值较下游可用缓存区数偏高:如图3 (右图),假设当上游A点来临的数据量是4,而此时下游B点的可用缓存区为2,水位值高度为4。由于数据量不于水位值高度,Flink会误以为可以在一单位时间内接受这批数据,如图中可以看出,只有2块的缓存区,直接接收了4块的数据量,会直接导致内存溢出(OOM)甚至引起系统阻塞。综上,由于不合理的静态水位线的设置,使得Flink传输数据时间延长,或者出现非正常的阻塞,进而影响整个Flink的数据传输情况。

图3
综上,由于不合理的静态水位线的设置,使得Flink传输数据时间延长,或者出现非正常的阻塞,进而影响整个Flink的数据传输情况。

2 反压优化算法

虽然可以在数据处理前对Netty所默认的两个buffer高度进行参数调整,但这种默认的定值始终是一种静态的机制。这种相对静态的机制使得Flink在面临远程传输问题时,容易出现上文所述的两种问题。本节将针对Flink反压传输所存在的缺点,提出一种基于动态水位值Flink调度优化算法,并给出例子进行说明。Flink-N算法的核心思想是:把Flink中Netty下游可用buffer数Bt实时写入Redis中,根据Redis中前后时刻buffer数(即Bt值)的大小变化,对水位值Wt进行动态调整,算法流程如图4所示,其具体步骤如下:

第一步,设置访问函数,并创建接口,使得Flink启动的同时运行访问函数。其中,访问函数的作用是,每间隔一段时间访问Netty下游缓存区可用buffer (图中B点位置)的数量,并将其记录到Redis中;

第二步,获得下游可用buffer数Bt;

第三步,取0.8倍的B0值的整数部分(向下取整)作为Netty的高水位值,即令W0H = ⌊0.8|B0|⌋;

第四步,将Bt值反馈到Redis中并记录;

第五步,根据Bt值调整水位值Wt,具体方法为:若Bt大于Bt与Bt−1的平均值,则Wt取Bt大于Bt与Bt−1的平均值;反之,当Bt ≤ Bt−1时,则令Wt = Bt。

第六步,重复第二步、第四步与第五步。

本文选择Redis是因为Flink处理时延是ms级别的,而Redis数据读取速度可达110,000次/s,写数据的速度可达81,000次/s,选择Redis相较于其他数据库而言,不会对Flink的时效性产生负增益。

图4
经过实验室测试,发现吞吐量有明显上升
在这里插入图片描述
cpu利用率也是

在这里插入图片描述
以及延时
在这里插入图片描述

3 重点! 但是 可但是 flink1.5以后的反压过程。

经过跟论文比较发现,实际上 flink1.5已经实现了等同于论文中设想的功能并实现。

在 Flink 层面实现反压机制,就是每一次 ResultSubPartition 向 InputChannel 发送消息的时候都会发送一个 backlog size 告诉下游准备发送多少消息,下游就会去计算有多少的 Buffer 去接收消息,算完之后如果有充足的 Buffer 就会返还给上游一个 Credit 告知他可以发送消息(图上两个 ResultSubPartition 和 InputChannel 之间是虚线是因为最终还是要通过 Netty 和 Socket 去通信)
图2

  • 1.5之前是tcp反压 链路太长 ,会涉及多个task 如果在同一个taskmanager会造成重复使用Socket 阻塞
  • 新的网路栈:TaskManager 传输数据时,不同的 TaskManager 上的两个 Subtask 间通 常根据 key 的数量有多个 Channel,这些 Channel 会复用同一个 TaskManager 级别的 TCP 链接,并且共享接收端 Subtask 级别的 Buffer Pool。
  • 在接收端,每个 Channel 在初始阶段会被分配固定数量的 Exclusive Buffer, 这些 Buffer 会被用于存储接受到的数据,交给 Operator 使用后再次被释放。 Channel 接收端空闲的 Buffer 数量称为 Credit,Credit 会被定时同步给发送端被后 者用于决定发送多少个 Buffer 的数据。
  • 在流量较大时,Channel 的 Exclusive Buffer 可能会被写满,此时 Flink 会向 Buffer Pool 申请剩余的 Floating Buffer。这些 Floating Buffer 属于备用 Buffer,哪 个 Channel 需要就去哪里。而在 Channel 发送端,一个 Subtask 所有的 Channel 会共享同一个 Buffer Pool,这边就没有区分 Exclusive Buffer 和 Floating Buffer。

看图可以知道 ,底层的通信肯定是socket,但实际用的组件是netty。是netty在互相通信,来实现反压信息的传递

在这里插入图片描述

4 flink反压问题的查找瓶颈办法

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

一般情况使用Metrics都能够定位到问题,是cpu 内存,然后再定位算子,查看设计是否有缺陷,定位节点,加Metrics
我们在监控反压时会用到的 Metrics 主要和 Channel 接受端的 Buffer 使用率有关,最为有用的是以下几个:
Metrics: Metris描述
outPoolUsage发送端 Buffer 的使用率
inPoolUsage接收端 Buffer 的使用率
floatingBuffersUsage(1.9 以上)接收端 Floating Buffer 的使用率
exclusiveBuffersUsage (1.9 以上)接收端 Exclusive Buffer 的使用率

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/400995.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

iMazing2024Windows和Mac的iOS设备管理软件(可以替代iTunes进行数据备份和管理)

iMazing2024是一款兼容 Windows 和 Mac 的 iOS 设备管理软件,可以替代 iTunes 进行数据备份和管理。以下是一些 iMazing 的主要功能和优点: 数据备份和恢复:iMazing 提供了强大的数据备份和恢复功能,可以备份 iOS 设备上的各种数据…

Unity xLua开发环境搭建与基础进阶

Unity是一款非常流行的游戏开发引擎,而xLua是一个为Unity开发者提供的Lua框架,可以让开发者使用Lua语言来进行游戏开发。在本文中,我们将介绍如何搭建Unity xLua开发环境,并进行基础进阶的学习。 环境搭建 首先,我们需…

MATLAB练习题:拉马努金的恒等式

​讲解视频:可以在bilibili搜索《MATLAB教程新手入门篇——数学建模清风主讲》。​ MATLAB教程新手入门篇(数学建模清风主讲,适合零基础同学观看)_哔哩哔哩_bilibili 拉马努金是印度历史上最著名的数学家之一,他没有接…

【电子书】产品经理

资料 wx:1945423050 个人整理了一些互联网电子书 产品经理 破茧成蝶2——以产品为中心的设计革命.epub产品经理的20堂必修课.epub信息无障碍_提升用户体验的另一种视角.epub引爆用户增长.epub在你身边,为你设计 腾讯的用户体验设计之道.epub以匠心&…

新版Java面试专题视频教程——多线程篇①

新版Java面试专题视频教程——多线程篇① Java多线程相关面试题 0. 问题汇总0.1 线程的基础知识0.2 线程中并发安全 1.线程的基础知识1.1 线程和进程的区别?1.2 并行和并发有什么区别?1.3 创建线程的四种方式1.4 runnabl…

Global Gamers Challenge | 与 Flutter 一起保护地球

作者 / Kelvin Boateng 我们知道 Flutter 开发者热爱挑战,因此我们很高兴地宣布,新一轮的 Flutter 挑战赛来了! 挑战https://flutter.cn/events/puzzle-hack Global Gamers Challenge 是一项为期 8 周的比赛,参赛者需要设计、构建…

基于CNN-GRU-Attention的时间序列回归预测matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 4.1 CNN(卷积神经网络)部分 4.2 GRU(门控循环单元)部分 4.3 Attention机制部分 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版…

详解 IT/OT 融合的五层架构(从PLC/SCADA到MES/ERP)

作为一个电气自动化的从业者,有必要搞懂下面术语的意思。 IT:Information Technology的缩写,指信息技术; OT:Operational Technology的缩写,指操作层面的技术,比如运营技术;CT&…

智慧城市环卫车辆监控管理方案

二.方案设计 智慧城市环卫系统主要包括以下几个方面: 1、通过 RFID 实时自动采集功能,自动统计了解各处垃圾桶每天清理情况; 2、GPS 与 DTU 透传相结合,实时掌握保洁及垃圾车辆的工作状态, 行驶路线以及任…

Vue路由缓存问题

路由缓存问题的产生 VueRouter允许用户在页面中创建多个视图(多级路由),并根据路由参数来动态的切换视图。使用带参数的路由时,相同的组件实例将被重复使用。因为两个路由都渲染同一个组件,比起销毁再创建,…

OpenAI Sora 技术报告: Video generation models as world simulators

Paper name OpenAI Sora 技术报告:Video generation models as world simulators Paper Reading Note 官网:https://openai.com/sora 技术报告:https://openai.com/research/video-generation-models-as-world-simulators TL;DR 2024 Op…

高维数据的中介效应【中介分析】《R包:HIMA》

允许基于高级中介筛选和惩罚回归技术来估计和测试高维中介效应 Hima包浏览 高维中介示意图 图1. 在暴露和结果之间有高维中介的情况 本包的作用 在确定独立筛选和极小极大凹惩罚技术的基础上,采用联合显著性检验方法对调解效果进行检验。使用蒙特卡罗模拟研究来展…

鸿蒙应用/元服务开发实战-Serverless云存储没法创建处理方式

新账户,Serverless云存储没法创建 ,没法进行下一步。 解决方式 请按照这个方式修改一下就能正常创建了,浏览器中打开控制台输入 window.top.cfpConfig.cloudStorageSwitch‘off’ 后再创建桶

二叉树基础知识总结

目录 二叉树基础知识 概念 : 根节点的五个形态 : 特殊的二叉树 满二叉树 : 完全二叉树 : 二叉搜索树 : 平衡二叉搜索树 : 二叉树的性质 : 二叉树的存储结构 二叉树的顺序存储结构 二叉树的链式存储结构 二叉树的遍历方式 : 基础概念 前中后遍历 层序遍历 :…

堆的结构实现与应用

目录 前言: 1.认识堆 a.如何认识堆? b.大根堆与小根堆 c.堆应用的简单认识 2.堆的结构与要实现的功能 3.向上调整算法 4.向下调整算法 5.向堆插入数据并建堆 6.堆的大小 7.堆的判空 8.取堆顶数据 9.删除堆顶数据 10.向上调整时间复杂度 11.向下调整时…

(十二)【Jmeter】线程(Threads(Users))之tearDown 线程组

简述 操作路径如下: 作用:在正式测试结束后执行清理操作,如关闭连接、释放资源等。配置:设置清理操作的采样器、执行顺序等参数。使用场景:确保在测试结束后应用程序恢复到正常状态,避免资源泄漏或对其他测试的影响。优点:提供清理操作,确保测试环境的整洁和可重复性…

【前端】夯实基础 css/html/js 50个练手项目(持续更新)

文章目录 前言Day 1 expanding-cardsDay 2 progress-steps 前言 发现一个没有用前端框架的练手项目,很适合我这种纯后端开发夯实基础,内含50个mini project,学习一下,做做笔记。 项目地址:https://github.com/bradtr…

集合可视化:rainbow box与欧拉图

论文:A new diagram for amino acids: User study comparing rainbow boxes to Venn/Euler diagram 最近偶然看到了这篇论文,觉得很有意思,针对的任务是集合数据的可视化。 我们用示例来说明,比如图二的欧拉图,展示的…

爬取链家二手房房价数据存入mongodb并进行分析

实验目的 1.使用python将爬虫数据存入mongodb; 2.使用python读取mongodb数据并进行可视化分析。 实验原理 MongoDB是文档数据库,采用BSON的结构来存储数据。在文档中可嵌套其他文档类型,使得MongoDB具有很强的数据描述能力。本节案例使用的…

【深度学习】微调Qwen1.8B

1.前言 使用地址数据微调Qwen1.8B。Qwen提供了预构建的Docker镜像,在使用时获取镜像只需安装驱动、下载模型文件即可启动Demo、部署OpenAI API以及进行微调。 github地址:GitHub - QwenLM/Qwen: The official repo of Qwen (通义千问) chat & pretr…