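Background
While upgrading from Spark 2.4.x to 3.1.1, we ran into a class of SQL queries that became extremely slow. The SQL looks like this (only the key part is shown):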
select device_personas.*
from
(select
device_id, ads_id,
from_json(regexp_replace(device_personas, '(?<=(\\{|,))"device_', '"user_device_'), ${device_schema}) as device_personas
from input )
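where ${device_schema} contains several hundred fields.
Before tuning, with 360 cores and 720 GB of memory, the job took 43 minutes.
After tuning, with the same resources, it took only 6 minutes.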
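Conclusion
The conclusion first:
The main cause is the new rule org.apache.spark.sql.catalyst.optimizer.OptimizeJsonExprs introduced in Spark 3.1.x. For this SQL, the rule prunes the schema fields that are not needed, and as a side effect regexp_replace ends up being called many times. The reason is given in the rule's own comment: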
if JsonToStructs(json) is shared among all fields of CreateNamedStruct. prunedSchema contains all accessed fields in original CreateNamedStruct.
So either set spark.sql.optimizer.enableJsonExpressionOptimization to false, or skip the rule by setting
spark.sql.adaptive.optimizer.excludedRules org.apache.spark.sql.catalyst.optimizer.OptimizeJsonExprs
spark.sql.optimizer.excludedRules org.apache.spark.sql.catalyst.optimizer.OptimizeJsonExprs
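A minimal sketch of applying these settings from Scala (for example in spark-shell). The application name is a placeholder, and only one of the two options is needed:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("json-expr-optimization-off") // placeholder name, not from the original job
  .getOrCreate()

// Option 1: turn off the JSON expression optimization altogether.
spark.conf.set("spark.sql.optimizer.enableJsonExpressionOptimization", "false")

// Option 2: leave the flag alone and exclude just this optimizer rule.
spark.conf.set(
  "spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.OptimizeJsonExprs")
```

The same keys can also be passed as --conf key=value on spark-submit.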
Analysis
The physical plans of this SQL are as follows.
When the rule is not skipped
The main part of the physical plan is:
(6) Project
Output [10]: [device_id#62, ads_id#63, from_json(StructField(user_device_adv_age_year,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_adv_age_year AS user_device_adv_age_year#292, from_json(StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_child_age AS user_device_child_age#293, from_json(StructField(ads_material_text_tag,StringType,true), ads_personas#70, Some(Asia/Shanghai)).ads_material_text_tag AS ads_material_text_tag#294, from_json(StructField(ads_ad_pic_resolution,StringType,true), ads_personas#70, Some(Asia/Shanghai)).ads_ad_pic_resolution AS ads_ad_pic_resolution#295, from_json(StructField(ctx_sound_patch_scene,StringType,true), ctx_personas#73, Some(Asia/Shanghai)).ctx_sound_patch_scene AS ctx_sound_patch_scene#296, from_json(StructField(ctx_position,StringType,true), ctx_personas#73, Some(Asia/Shanghai)).ctx_position AS ctx_position#297, from_json(StructField(album_category_id,StringType,true), album_personas#72, Some(Asia/Shanghai)).album_category_id AS album_category_id#298, from_json(StructField(album_nlp_labels_app,StringType,true), album_personas#72, Some(Asia/Shanghai)).album_nlp_labels_app AS album_nlp_labels_app#299]
Input [6]: [device_id#62, ads_id#63, device_personas#69, ads_personas#70, album_personas#72, ctx_personas#73]
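The plan transformation performed by the rule looks like this (using two fields per schema as an example; the plan before the rule is on the left, the plan after it on the right):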
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.OptimizeJsonExprs ===
InsertIntoHadoopFsRelationCommand oss://xima-bd-data3.cn-shanghai.oss-dls.aliyuncs.com/reslib/droplet/generate/data/ai-ad/102041271/1723411818435/xqldata/.staging_1691066243227, false, Parquet, Map(coalesceNum -> 500, path -> oss://xima-bd-data3.cn-shanghai.oss-dls.aliyuncs.com/reslib/droplet/generate/data/ai-ad/102041271/1723411818435/xqldata/.staging_1691066243227), Overwrite, [device_id, ads_id, user_device_adv_age_year, user_device_child_age, ads_material_text_tag, ads_ad_pic_resolution, ctx_sound_patch_scene, ctx_position, album_category_id, album_nlp_labels_app] InsertIntoHadoopFsRelationCommand oss://xima-bd-data3.cn-shanghai.oss-dls.aliyuncs.com/reslib/droplet/generate/data/ai-ad/102041271/1723411818435/xqldata/.staging_1691066243227, false, Parquet, Map(coalesceNum -> 500, path -> oss://xima-bd-data3.cn-shanghai.oss-dls.aliyuncs.com/reslib/droplet/generate/data/ai-ad/102041271/1723411818435/xqldata/.staging_1691066243227), Overwrite, [device_id, ads_id, user_device_adv_age_year, user_device_child_age, ads_material_text_tag, ads_ad_pic_resolution, ctx_sound_patch_scene, ctx_position, album_category_id, album_nlp_labels_app]
+- Repartition 500, true +- Repartition 500, true
! +- Project [device_id#62, ads_id#63, from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_adv_age_year AS user_device_adv_age_year#292, from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_child_age AS user_device_child_age#293, from_json(StructField(ads_material_text_tag,StringType,true), StructField(ads_ad_pic_resolution,StringType,true), ads_personas#70, Some(Asia/Shanghai)).ads_material_text_tag AS ads_material_text_tag#294, from_json(StructField(ads_material_text_tag,StringType,true), StructField(ads_ad_pic_resolution,StringType,true), ads_personas#70, Some(Asia/Shanghai)).ads_ad_pic_resolution AS ads_ad_pic_resolution#295, from_json(StructField(ctx_sound_patch_scene,StringType,true), StructField(ctx_position,StringType,true), ctx_personas#73, Some(Asia/Shanghai)).ctx_sound_patch_scene AS ctx_sound_patch_scene#296, from_json(StructField(ctx_sound_patch_scene,StringType,true), StructField(ctx_position,StringType,true), ctx_personas#73, Some(Asia/Shanghai)).ctx_position AS ctx_position#297, from_json(StructField(album_category_id,StringType,true), StructField(album_nlp_labels_app,StringType,true), album_personas#72, Some(Asia/Shanghai)).album_category_id AS album_category_id#298, from_json(StructField(album_category_id,StringType,true), StructField(album_nlp_labels_app,StringType,true), album_personas#72, Some(Asia/Shanghai)).album_nlp_labels_app AS album_nlp_labels_app#299] +- Project [device_id#62, ads_id#63, from_json(StructField(user_device_adv_age_year,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_adv_age_year AS 
user_device_adv_age_year#292, from_json(StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_child_age AS user_device_child_age#293, from_json(StructField(ads_material_text_tag,StringType,true), ads_personas#70, Some(Asia/Shanghai)).ads_material_text_tag AS ads_material_text_tag#294, from_json(StructField(ads_ad_pic_resolution,StringType,true), ads_personas#70, Some(Asia/Shanghai)).ads_ad_pic_resolution AS ads_ad_pic_resolution#295, from_json(StructField(ctx_sound_patch_scene,StringType,true), ctx_personas#73, Some(Asia/Shanghai)).ctx_sound_patch_scene AS ctx_sound_patch_scene#296, from_json(StructField(ctx_position,StringType,true), ctx_personas#73, Some(Asia/Shanghai)).ctx_position AS ctx_position#297, from_json(StructField(album_category_id,StringType,true), album_personas#72, Some(Asia/Shanghai)).album_category_id AS album_category_id#298, from_json(StructField(album_nlp_labels_app,StringType,true), album_personas#72, Some(Asia/Shanghai)).album_nlp_labels_app AS album_nlp_labels_app#299]
+- Filter (if ((label_click#84 = 0)) (rand(7794855199306151884) >= 0.95) else true AND (NOT (isnull(device_personas#69) AND isnull(ads_personas#70)) OR NOT isnull(ctx_personas#73))) +- Filter (if ((label_click#84 = 0)) (rand(7794855199306151884) >= 0.95) else true AND (NOT (isnull(device_personas#69) AND isnull(ads_personas#70)) OR NOT isnull(ctx_personas#73)))
+- Filter ((((dt#82 >= 20230710) AND (dt#82 <= 20230712)) AND NOT coalesce(appshadow#76, ) IN (2,3)) AND ((NOT (position_name#75 = sound_agg) AND isnotnull(get_json_object(ads_personas#70, $.ads_first_trade))) AND NOT coalesce(get_json_object(ads_personas#70, $.ads_business_type), -11111) IN (1,2,3))) +- Filter ((((dt#82 >= 20230710) AND (dt#82 <= 20230712)) AND NOT coalesce(appshadow#76, ) IN (2,3)) AND ((NOT (position_name#75 = sound_agg) AND isnotnull(get_json_object(ads_personas#70, $.ads_first_trade))) AND NOT coalesce(get_json_object(ads_personas#70, $.ads_business_type), -11111) IN (1,2,3)))
+- Relation[device_id#62,ads_id#63,response_id#64,track_id#65,album_id#66,imp_ts#67,click_ts#68,device_personas#69,ads_personas#70,track_personas#71,album_personas#72,ctx_personas#73,label_conv#74,position_name#75,appshadow#76,play_num#77,sub_num#78,leave_num#79,pay_num#80,live_num#81,dt#82,hour#83,label_click#84] parquet +- Relation[device_id#62,ads_id#63,response_id#64,track_id#65,album_id#66,imp_ts#67,click_ts#68,device_personas#69,ads_personas#70,track_personas#71,album_personas#72,ctx_personas#73,label_conv#74,position_name#75,appshadow#76,play_num#77,sub_num#78,leave_num#79,pay_num#80,live_num#81,dt#82,hour#83,label_click#84] parquet
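Rule-by-rule output of this kind can be obtained from the plan change log. The sketch below assumes the Spark 3.1 configuration names spark.sql.planChangeLog.level and spark.sql.planChangeLog.rules; treat the key names as an assumption and check them against the SQLConf of the version you run:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Log plan changes at WARN so they are visible with a default log4j setup.
spark.conf.set("spark.sql.planChangeLog.level", "WARN")

// Only log changes made by the rule under investigation.
spark.conf.set(
  "spark.sql.planChangeLog.rules",
  "org.apache.spark.sql.catalyst.optimizer.OptimizeJsonExprs")
```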
The key transformation is:
from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_adv_age_year AS user_device_adv_age_year#292, from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_child_age AS user_device_child_age#293
||
\/
from_json(StructField(user_device_adv_age_year,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_adv_age_year AS user_device_adv_age_year#292, from_json(StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_child_age AS user_device_child_age#293
The schema in from_json, StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), is split into two separate single-field schemas:
StructField(user_device_adv_age_year,StringType,true)
StructField(user_device_child_age,StringType,true)
So why does this make the query slower? The answer lies in the evaluation logic of JsonToStructs:
case class JsonToStructs(
    schema: DataType,
    options: Map[String, String],
    child: Expression,
    timeZoneId: Option[String] = None)
  extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes
    with NullIntolerant {
  ...
  @transient lazy val parser = {
    val parsedOptions = new JSONOptions(options, timeZoneId.get, nameOfCorruptRecord)
    val mode = parsedOptions.parseMode
    if (mode != PermissiveMode && mode != FailFastMode) {
      throw new IllegalArgumentException(s"from_json() doesn't support the ${mode.name} mode. " +
        s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}.")
    }
    val (parserSchema, actualSchema) = nullableSchema match {
      case s: StructType =>
        ExprUtils.verifyColumnNameOfCorruptRecord(s, parsedOptions.columnNameOfCorruptRecord)
        (s, StructType(s.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)))
      case other =>
        (StructType(StructField("value", other) :: Nil), other)
    }
    val rawParser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = false)
    val createParser = CreateJacksonParser.utf8String _
    new FailureSafeParser[UTF8String](
      input => rawParser.parse(input, createParser, identity[UTF8String]),
      mode,
      parserSchema,
      parsedOptions.columnNameOfCorruptRecord)
  }
  ...
  override def nullSafeEval(json: Any): Any = {
    converter(parser.parse(json.asInstanceOf[UTF8String]))
  }
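The part to focus on is the parser variable. Because of the rule above, the two schemas end up in two separate JsonToStructs expressions, each with its own parser. The child of each one is the regexp_replace expression, so the regular expression is evaluated twice per row.
The real query extracts far more than two such fields, so the regular expression is recomputed once per extracted field, more than 100 times per row, and regular expression evaluation is comparatively expensive.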
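The effect can be illustrated with a small stand-alone sketch that does not use Spark at all. RegexCallCount, rewrite and countCalls are hypothetical names: rewrite stands in for regexp_replace, and each simulated from_json evaluates its own child before reading a field.

```scala
import java.util.regex.Pattern

object RegexCallCount {
  // Same pattern as in the query: rename "device_ keys to "user_device_.
  private val pattern = Pattern.compile("(?<=(\\{|,))\"device_")
  private var calls = 0

  // Stand-in for regexp_replace; counts how often it runs.
  def rewrite(json: String): String = {
    calls += 1
    pattern.matcher(json).replaceAll("\"user_device_")
  }

  // Returns the number of rewrite() calls made while running `body`.
  def countCalls(body: => Unit): Int = {
    calls = 0
    body
    calls
  }

  def main(args: Array[String]): Unit = {
    val json = """{"device_a":"1","device_b":"2","device_c":"3"}"""
    val fields = Seq("user_device_a", "user_device_b", "user_device_c")

    // Rule applied: one from_json per field, each evaluating its own child.
    val pruned = countCalls {
      fields.foreach(f => rewrite(json).contains(f))
    }
    // Rule skipped: one shared from_json, its child evaluated once.
    val shared = countCalls {
      val rewritten = rewrite(json)
      fields.foreach(f => rewritten.contains(f))
    }
    println(s"pruned=$pruned shared=$shared") // pruned=3 shared=1
  }
}
```

With N extracted fields the first variant runs the regular expression N times per row and the second runs it once, which is the difference the two plans above make.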
When the rule is skipped
The main part of the physical plan is:
(6) Project
Output [10]: [device_id#62, ads_id#63, from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_adv_age_year AS user_device_adv_age_year#292, from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_child_age AS user_device_child_age#293, from_json(StructField(ads_material_text_tag,StringType,true), StructField(ads_ad_pic_resolution,StringType,true), ads_personas#70, Some(Asia/Shanghai)).ads_material_text_tag AS ads_material_text_tag#294, from_json(StructField(ads_material_text_tag,StringType,true), StructField(ads_ad_pic_resolution,StringType,true), ads_personas#70, Some(Asia/Shanghai)).ads_ad_pic_resolution AS ads_ad_pic_resolution#295, from_json(StructField(ctx_sound_patch_scene,StringType,true), StructField(ctx_position,StringType,true), ctx_personas#73, Some(Asia/Shanghai)).ctx_sound_patch_scene AS ctx_sound_patch_scene#296, from_json(StructField(ctx_sound_patch_scene,StringType,true), StructField(ctx_position,StringType,true), ctx_personas#73, Some(Asia/Shanghai)).ctx_position AS ctx_position#297, from_json(StructField(album_category_id,StringType,true), StructField(album_nlp_labels_app,StringType,true), album_personas#72, Some(Asia/Shanghai)).album_category_id AS album_category_id#298, from_json(StructField(album_category_id,StringType,true), StructField(album_nlp_labels_app,StringType,true), album_personas#72, Some(Asia/Shanghai)).album_nlp_labels_app AS album_nlp_labels_app#299]
Input [6]: [device_id#62, ads_id#63, device_personas#69, ads_personas#70, album_personas#72, ctx_personas#73]
When the rule is skipped it is never applied, so (again with two fields as an example) the schema of from_json stays unchanged:
from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_adv_age_year AS user_device_adv_age_year#292, from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_child_age AS user_device_child_age#293
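Looking at the physical plan, the regexp_replace expression still appears multiple times. Does that mean it is called multiple times? No, it is not. Look at the physical operator ProjectExec: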
ProjectExec
protected override def doExecute(): RDD[InternalRow] = {
  child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
    val project = UnsafeProjection.create(projectList, child.output)
    project.initialize(index)
    iter.map(project)
  }
}
The call chain of this method is:
UnsafeProjection.create
||
\/
InterpretedUnsafeProjection.createProjection/GenerateUnsafeProjection.generate
||
\/
create
||
\/
createCode(ctx, expressions, subexpressionEliminationEnabled)
||
\/
ctx.generateExpressions(expressions, useSubexprElimination)
||
\/
subexpressionElimination
subexpressionElimination extracts common subexpressions, so each common subexpression is evaluated only once.
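As a toy model of this idea (not Spark's actual implementation; every name below is hypothetical), structurally equal expressions can be treated as one shared computation. Case-class equality plays the role of the equivalence check:

```scala
object SubexprSketch {
  sealed trait Expr
  case class Column(name: String) extends Expr
  case class RegexpReplace(child: Expr) extends Expr
  case class FromJson(schema: Seq[String], child: Expr) extends Expr
  case class GetField(child: Expr, field: String) extends Expr

  // Collects every FromJson node reachable from an expression.
  def collect(e: Expr): Seq[FromJson] = e match {
    case f: FromJson      => Seq(f)
    case GetField(c, _)   => collect(c)
    case RegexpReplace(c) => collect(c)
    case _: Column        => Nil
  }

  // Number of FromJson evaluations per row if equal subexpressions are shared.
  def distinctFromJson(projection: Seq[Expr]): Int =
    projection.flatMap(collect).distinct.size

  def main(args: Array[String]): Unit = {
    val child = RegexpReplace(Column("device_personas"))
    val fields = Seq("user_device_adv_age_year", "user_device_child_age")

    // Rule skipped: every field reads from the same full-schema FromJson.
    val shared = fields.map(f => GetField(FromJson(fields, child), f))
    // Rule applied: each field gets its own single-field FromJson.
    val pruned = fields.map(f => GetField(FromJson(Seq(f), child), f))

    println(distinctFromJson(shared)) // 1
    println(distinctFromJson(pruned)) // 2
  }
}
```

Once the schemas differ, the from_json expressions are no longer equal, so nothing can be shared between them.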
In our case the expression is:
Alias(GetStructField(attribute.get, i), f.name)()
where attribute.get is JsonToStructs(StructType(StructField(user_device_adv_age_year,StringType,true),StructField(user_device_child_age,StringType,true)), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai))
This matches the plan shown in the Spark UI exactly:
from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_adv_age_year AS user_device_adv_age_year#292, from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_child_age AS user_device_child_age#293
(this is mainly the output of JsonToStructs.toString)
Miscellaneous
- The toString method of Alias is:
s"$child AS $name#${exprId.id}$typeSuffix$delaySuffix"
- The toString method of GetStructField is:
val fieldName = if (resolved) childSchema(ordinal).name else s"_$ordinal"
s"$child.${name.getOrElse(fieldName)}"
- The UnresolvedStar class contains an explanation of SELECT record.* from (SELECT struct(a,b,c) as record …)
- The buildExpandedProjectList method in the ResolveReferences rule calls the expand method of UnresolvedStar, which is where the star is resolved into Alias(GetStructField(attribute.get, i), f.name)()
- For the optimization rule itself, see Optimize Json expression chain