前言:一个好产品,功能应该尽量包装在服务内部;对于Flink而言,无疑是做到了这一点。但是用户在使用Flink的时候,依然可以从版本的选择、代码逻辑、资源参数、业务的数据情况等方面做任务级的定制化优化;用最合理的资源使用,保障实时性、稳定性和最佳Tps的处理能力。
一、Flink任务优化分析
1.1 参考Spark的优化方式
对于任何的技术发展,后来的的技术架构都可以参考之前优秀竞品的设计思想或缺陷,然后加以改进和优化。大数据分布式计算领域,Flink在设计自己的功能时,都做了哪些努力,让其在实现自己特性的同时,依旧保障了高性能。参考Spark的性能优化:Saprk可以从开发调优(RDD使用、算子区别、序列化方式、对象等)、资源参数调优(Driver、Executor数量和大小等)、数据负载调优(数据倾斜处理)、shuffle调优(shuffle方式和局部内存配置),四个方面做优化。
1.2 任务开发分析
Flink在开发阶段:Flink有四个模块的SDK提供数据操作API:DateStream、DataSet、FlinkTbale、FlinkSQL;
其他有Checkpoint配置、RocksDB选择存储,如同RDD类似;所以在开发层面,设置Checkpoint配置、判断是否选择RocksDB作为状态存储,基本可以把精力聚焦在逻辑层面;当然在sink阶段,可以对写入中间件的客户端做组件级的优化,减轻写入背压。
1.3 资源参数分析
资源参数分析:Flink任务可以动态调节并发数(slot),分配CPU资源;内存主要有JobManager和TaskManager两个进程,TaskManager中有多个内存级线程任务;所以可以从slot数、两个进程资源的大小、以及TaskManager进程内部的多个局部内存的使用情况,精细化调节、优化任务;
1.4 数据负载分析
Flink数据负载分析:Flink也是并发处理,所以存在数据在某个阶段的单个任务高负载运行,这种时候要排查出高聚合数据,然后做特殊处理;实时处理不存在阶段影响、理想存在阶段计算结果落磁盘,数据负载在离线场景上比实时场景上,对任务性能影响更大。
1.5 Shuffle分析
spark的计算架构是阶段式计算,每处理完上一个阶段的计算,会把内存溢出的结果写入磁盘,下个阶段从内存和磁盘读取上阶段结果再做计算;spark的主要性能就消耗在每个阶段之间的shuffle上;Spark用单独的进程管理shuffle阶段的操作,有多种不同的shuffle模式选择(如:HashShuffle、SortShuffle等),涉及多个shuffleManager进程的内存配置优化;
Flink1.16版本之前,实时计算和离线计算使用不同的shuffle模式;实时计算使用Pipelined Shuffle计算Task任务,离线计算使用 Blocking Shuffle计算任务,Flink1.16之后,新加入了Hybrid Shuffle处理方式计算。
二、Flink任务优化操作
总结第一节四个方面的内容:Flink可以从代码开发、资源参数配置、数据负载三个方面做任何版本的任务级优化;可以通过对FLink版本的选择,做到离线计算shuffle级的优化。
2.1 Flink开发调优
2.1.1 开发SDK选择
对于没有边界数据流的实时处理和有边界数据流的离线处理,可以选择不同的SDK开发,
关系如下:
不涉及状态、窗口、时间的操作,可以用SQL表达(数据分析和数据管道);细粒度的操作就需要底层核心API做数据处理了,比如调用外部接口,中间数据层操作等;提供多种方式,按照需求选择,正常表达业务逻辑就行。
2.1.2 其他代码层优化操作
CheckPoint的配置选择:在Flink中,CheckPoint、State与Backend存储方式,共同保障实时任务运行时的容错(离线没有);CheckPoint有多种配置,能从运行模式、检查间隔、State快照大小和数量等方面做配置调优,StateBackend可以选择多种方式,保障CheakPoint生成状态的存储。
可以根据业务量大小、业务实时效果、业务的重要程度,调节Checkpoint和StateBackend的如下参数,观测各种任务指标的各种情况优化:
CheckPoint与StateBackend代码配置如下:
注:Checkpoint配置类属性,可检测CheckpointConfig配置类,里面有所有的配置项,我就不在这里截图了。
除此之外:
Flink在sink得时候,包装的是中间件得客户端,如Kafka、ES、Redis、Mysql等;
每个中间件客户端都有自己的优化配置:
比如Kafka客户端优化:单条size(默认1M)、bufferSize大小写、条数写、连接超时、连接数等;
比如ES客户端优化:批次写设置、间隔写设置、连接超时设置等;
比如Redis客户端优化:连接数,连接时常等设置;
比如Mysql客户端优化:客户端连接数等配置;
2.2 Flink资源参数配置优化
Flink在资源参数调优方面:可以对slots并发数、TaskManager内存大小、JobManager内存大小做Flink进程级调参优化;
在TaskManager内部,有多个内存概念,分别用来存储不同阶段、不同状态的数据,比如:托管内存、网络内存等;
除此之外,当使用Java开发Flink任务时,还可以对Java的Gc做选择,比如:-Denv.java.opts="-Dfile.encoding=UTF-8";
yarn启动方式样例参考:
TaskManager局部组件内存级参数配置如下:
TaskManager进程,内存架构图:
2.3 数据负载优化
如何观察数据负载现象:在Flink的运行的时候,taskManagers的task会记录数据情况;
如下图:是一个生产任务,并发任务之间就有五倍的负载差距;
出现负载了要如何做呢?
这里就和前面算子计算reBalance有关;
比如实时计算时:source是kafka:就和kafka的分区数据负载有关,可以从Kafka的数据生产者配置分区轮询;
如果是算子间,对于大负载的task,对读出数据的算子做key均匀打散,初次计算完后,再还原key做二次聚合;
离线计算时:对于分组计算,task负载操作类似;
Flink本身的运行机制,保证数据计算会快速释放资源,所以这个优化只是让资源释放更快,加大业务的单点TPS。
2.4 Shuffle优化
Flink社区多个版本整体上存在三种shuffle:Pipelined Shuffle、Blocking Shuffle、Hybrid Shuffle
这三者有什么区别呢?
Pipelined Shuffle方式可以全在内存中实时计算task,通过网络内存(netWork)做缓冲,且没有阶段计算等待限制; Blocking Shuffle方式每个阶段都要全部task计算完,落地到磁盘,下一个阶段再从磁盘读取开始计算;Hybrid Shuffle结合 Pipelined Shuffle实时快速计算的特性和Blocking Shuffle写磁盘稳定性特性,让离线计算不用每个阶段task算完和必须落地到磁盘的操作,当内存资源足够的时候基本可以不落地到磁盘;利用了Flink实时流快速计算的特性,写磁盘做为安全保障,保障离线计算的处理能力。
Flink没有ShuffleManager管理进程,它使用对内存资源队列感知和背压的方式,巧妙地解决了shuffle过程的定制化管理;可以说对于Flink的shuffle阶段,Flink基本在逻辑内部就自动实现了,没有spark那样需要专门做shuffleManager做内存管理。
三、总结
Flink任务分离线和实时两种计算场景:
从版本性能上:对于实时处理,各版本的设计架构基本一致;Flink1.15和Flink1.16对离线处理,任务管理架构,做了比较大的调整,相比Spark的计算管理架构和Flink之前的Blocking Shuffle方式,有了较大的性能提升;
从代码逻辑上:Flink实时处理通过对Checkpoint的配置、状态后端存储方式选择,离线对每个批处理的数量配置等,做特殊配置优化处理;以及开发SDK模块的选择,做到尽量只关心开发逻辑的的优化;
从资源参数配置上:离线和实时均可以从>的并发数量,JobManager、taskManager进程资源大小,以及taskManager内部,托管内存,网络内存等局部内存上优化运行任务。
从数据负载上:实时任务由于其快速的资源使用与释放,数据负载影响不大,但是对实时数据负载的优化,可以提升最大TPS的数据处理能力(数百万TPS);离线任务在早期的版本架构中,因为Blocking Shuffle阶段结果数据要整体落到磁盘,数据负载会直接导致局部资源等待,所以数据负载对离线处理的性能影响很大,1.15版本后,优化离线处理方式之后,数据负载的影响在服务内部技术架构上优化掉了。