注:文章参考:
数据治理实践 | 网易某业务线的计算资源治理从计算资源治理实践出发,带大家清楚认识计算资源治理到底该如何进行,并如何应用到其他项目中https://mp.weixin.qq.com/s/w6d5zhDaaavNhW_DMEkPsQ
目录
一、计算资源治理的背景
二、计算资源问题复盘
三、计算资源治理前的思考与行动
3.1 治理前的思考
3.2 治理行动
四、治理效果
五、小结
前言
业务成熟期,数仓经常会收到集群资源满载、任务产出延时等消息/邮件,有时候下游数分及其他周边部门也会询问任务运行慢的原因。遇到这类问题,第一想到的是加资源解决,但有些情景,任务运行慢的原因不一定是缺少资源,而是需要优化当前的问题任务。本文章阐述计算资源治理的实施内容。
一、计算资源治理的背景
- 运行时长过长、资源消耗大
问题代码运行时产生了数据倾斜,大量相同key的数据会被发往同一个reduce,进而导致该reduce所需的时间远超过其他reduce,成为整个任务的瓶颈。
- 任务调度时间规划不合理,部分时间段资源消耗较高任务(集群资源打满)
大量的任务堆积在凌晨xx点执行,核心任务和非核心任务没有做资源级别划分,数据产出延迟被业务方投诉。
- 存在空跑任务/未引用模型对应任务/无效监控任务
二、计算资源问题复盘
在做计算治理之前,团队内部盘点了当前计算资源存在的问题(重灾区):
(1)30+高消耗任务(任务执行时间过长):由于前中期业务扩张,数仓要覆盖下游大量的场景应用,为快速响应需求,开发代码后未经审核就上线,故数仓存在较多问题代码,这些代码运行时可能会发生数据倾斜,消耗大量的资源,最终产出时间很久。
(2)任务调度安排的不合理:多数任务堆积在凌晨2-5点执行,该区间CPU满载(内存空闲,但是CPU爆满),导致该时间段的资源消耗变成了重灾区,所有核心/非核心任务都在争抢资源,部分核心任务不能按时产出,一直处于等待阶段;
(3)线上无效DQC&监控配置的资源过少:存在空跑无意义的DQC浪费资源的情况、有些DQC分配的资源过少,导致任务运行时间过长;
ps:数据质量监控DQC:线上任务执行后会触发对应的DQC规则,DQC分为基础DQC(ods -> dwd -> dws ->ads每一层都要配置,对核心表/字段进行监控)和业务dqc(ads层配置)
(4)存在模型未被引用的任务/无效监控任务:早期为快速需求,数仓开发了很多烟囱式数据模型、因为业务迭代或业务下线等,部分模型不再被引用,无用的烟囱模型会带来额外的计算资源开销。
(5)任务缺少调优参数& 计算引擎为mr/spark2:任务缺少调优参数导致资源无法弹性伸缩,以适配任务进行动态调整。有些线上任务仍然使用mr/spark2计算引擎,导致运行效率很低,产出延迟。
三、计算资源治理前的思考与行动
3.1 治理前的思考
经过与团队多次沟通,对当前计算资源治理的优先级、改动成本、治理难度、对下游的影响等因素综合评估后,得出治理的顺序可以是:参数调优&任务引擎切换 --> DQC治理-->高消耗任务治理--> 调度安排 --> 下线无用模型 --> 沉淀公共指标
3.2 治理行动
(1)添加调优参数&计算引擎切换至Spark3
参数层面例如: set spark.sql.adaptive.enabled=true; 该参数代表:是否开启调整partition功能,如果开启,spark.sql.shuffle.partitions设置的partition可能会被合并到一个reducer里运行,该参数能更好利用的单个executor的性能,还能缓解小文件问题。平台默认是开启的。
任务执行引擎统一从mr/spark2切换至Spark3进行加速。
-
(2)DQC治理
无效DQC下线:查看DQC任务是否与线上任务一一匹配,将无效DQC任务下线。
DQC分配的计算资源调整:由于之前DQC配置资源为集群默认参数,效率极低导致所有DQC运行时长均超过10min。之后调整Driver内存为2048M,Executor个数为2,Executor内存为4096M。
DQC数量缩减:线上的基础DQC只需要对核心字段进行配置就OK,无关的DQC在不影响下游报表的前提下直接下线处理。
(3)高消耗任务调优(怎么去优化任务?数据倾斜该怎么调优?)
高消耗任务:例如:资源消耗连续7天排行前10。调优存在两个难点:1.优化效果不可控;2.高消耗任务调整到何种程度算合适(调优上限不可知)。针对这些难点,我们取所有核心任务的耗时均值,保障单个任务消耗小于平均消耗。此外,我们针对当前高消耗任务列举出以下的优化方案:
- 参数层面
set hive.auto.convert.join = true; -->是否自动转化成Map Join
set hive.groupby.skewindata=true; --> 当数据出现倾斜时,如果该变量设置为true,hive会自动进行负载均衡
set hive.map.aggr=true; -->是否在map端进行聚合,默认为true;
- map阶段
1.剪裁列和剪裁行
剪裁列:select * from tableA 转换成 select column1 , column2 from tableA;
剪裁行:分区限制,where时间及条件限制,减少非必要的数据输入
select column1 , column2 from tableA; 转换成
select column1 , column2 from tableA where ds ='${lst1date}' and xxx > yyy;
2.distribute by rand():
代码结尾添加distribute by rand(),控制map输出结果的分发,即map端如何拆分数据给reduce端;
当distribute by 后边定义的列是rand()时,默认采用hash算法,根据reduce个数进行数据分发,保障每
个分区的数据基本一致。
- reduce阶段
1.笛卡尔积优化
多对多关联发生数据膨胀,很可能出现笛卡尔积,根据实际业务,尽量避免
2.sql语句中distinct切换成group by
3.map join :大表join小表
map join 会把小表全部加载到内存中,在map阶段直接拿另外一个表的数据和内存中表的数据做匹配关联,由于在map阶段已经进行了join操作,省去reduce阶段,任务整体运行的效率会提升很多。
4.大key重组计算(key字段出现空值或单个reduce中的key很多,热点key)
4.1 column1存在空值
select
column1,
sum(column2) as column2_amt
from tableA
where column1 is not null
group by column1;
-----------------------------------------
select
ifnull(column1,name) as column1_name,
sum(column2) as column2_amt
from tableA
group by column1;
------------------------------------------
4.2 column1 数据很多(column1是热点key)
方式一:分段,分而治之
select
column1,
sum(column2) as column2_amt
from tableA
where ds = '${lst1date}' and column1 <> 'c'
group by column1;
union all
select
column1,
sum(column2) as column2_amt
from tableA
where ds = '${lst1date}'and column1 = 'c'
group by column1;
方式二:对热点key加盐打散后再分组聚合
select
split_part(column1_rn,'_',1) as column1,
sum(column2_amt) as column2_amt
from (select column1_ct,
sum(column2) as column2_amt
from(select
concat(column1,'_',floor(rand()*N)) as column1_rn,
sum(column2) as column2_amt
from tableA
where ds = '${lst1date}') t1
group by column1_rn)t2
group by split_part(column1_rn,'_',1)
-- rand()*N 生成0~N的随机数,floor下向取整
5.过滤逻辑尽可能在子查询中就处理好(t0大表 join 中表t1)
SELECT t0.column1,
tmp.column2_amt
from t0
left join (select
column1,
sum(column2_amt) as column2_amt
from t1
where ds = '${lst1date}'
group by column1) tmp
on t0.column1= tmp.column1
- 其他层面
spark aqe特性
1.aqe是spark sql的一种动态优化机制,是针对查询执行计划的优化;
2.aqe工作原理:当查询任务提交后,shuffle过程会将任务划分为多个查询阶段。在执行过程中,上一个查询执行完之后,系统会将查询结果保存下来,下一个查询就可以基于上一个查询的结果继续进行计算了;
3.引入aqe机制后,spark可以在任务运行过程中实时统计任务的执行情况,并通过自适应计划将统计结果反馈给优化器,从而对任务再次进行优化,这种边执行、边优化的方式极大提高了sQL的执行效率。
数据倾斜:通常是指参与计算的数据分布不均,即某个key或者某些key的数据量远超其他key,导致在shuffle阶段,大量相同的key的数据会被发往同一个reduce,进而导致该reduce所需的时间远超过其他reduce,成为整个任务的瓶颈。
(5)任务调度时间合理规划
将堆积在凌晨2-5点的600+任务进行梳理,由于任务间错综复杂的依赖关系,修改后可能会级联影响下游报表,因此按照以下步骤逐步实施:
- 找到所有表的输出输入点,即起始ODS,末尾ADS;
- 划分核心表/非核心表,整理对应的任务开始时间与结束时间;
- 划分核心任务/非核心任务,设置任务执行的优先级,将非核心的任务调度延后;
- 把非核心任务穿插在集群资源的低峰时期运行(2点前,5点后),把核心任务调度提前,保障CDM层(dwd明细层、dws汇总数据层)任务及时产出;
- 梳理部分时间段资源消耗较高的任务,提前/延后该任务的调度时间,保障资源合理分配;
(6)烟囱任务下沉&无用任务下线
对于烟囱任务,可以将公共指标下沉到到DWS以提高复用性;无用任务/无效监控项任务及时下线(这里建议能拿到报表层级的数据血缘,防止任务下线后影响下游报表的数据呈现),降低资源损耗。
四、治理效果
计算资源治理后的可量化指标如下(举例):
(1)将Hive与Spark2任务升级至Spark3,总计升级任务xxx个,升级后任务执行效率提升xx%,cpu资源消耗降低xx%,内存资源消耗降低xx%。
(2)下线无效DQC任务总计50+,调整DQC运行资源,治理后时长由10min优化至3min内。
(3)完成线上x+任务优化,x+任务下线,x+表的指标下沉,节省任务耗时xxx分钟。
(4)任务调度重新分配后,凌晨2-5点的资源使用率由90%降低至50%+,日用资源的趋势图没有大幅度的波动。
(5)整体治理后为部门减少1/3费用,由原来的xx万元降低至xx万元。
五、小结
计算资源治理的核心在于降本增效,用有限资源去运行更多任务。计算资源治理是一项长期工程,不能等到集群资源紧张才去治理优化,而是将计算资源治理的意识贯彻到日常开发中。可通过周/月的资源扫描内容及时推送给数仓部门,让每个任务都有源可循,有方案可优化。
待补充~