背景
目前公司在从spark 2.4.x升级到3.1.1的时候,遇到了一类SQL极慢的情况,该SQL的如下(只列举了关键的):
 
 select device_personas.* 
 from
 (select
        device_id, ads_id, 
        from_json(regexp_replace(device_personas, '(?<=(\\{|,))"device_', '"user_device_'), ${device_schema}) as device_personas
        from input )
其${device_schema} 有几百个字段
 
在没有调优之前 在360core 720GB内存的情况下,需要运行43分钟:
 
调优之后,资源不变的情况下,只需要运行6分钟:
 
结论
先说结论:
 主要的原因是 Spark 3.1.x 引入的 org.apache.spark.sql.catalyst.optimizer.OptimizeJsonExprs 新规则,该规则对于该SQL作用是裁剪了不必要的列:
 导致 regexp_replace 会被调用很多次,具体的原因如该规则的解释:
if JsonToStructs(json) is shared among all fields of CreateNamedStruct. prunedSchema contains all accessed fields in original CreateNamedStruct.
 
所以设置 spark.sql.optimizer.enableJsonExpressionOptimization 为 false,或者设置
spark.sql.adaptive.optimizer.excludedRules	    org.apache.spark.sql.catalyst.optimizer.OptimizeJsonExprs
spark.sql.optimizer.excludedRules	              org.apache.spark.sql.catalyst.optimizer.OptimizeJsonExprs
 
跳过该规则。
分析
该SQL的物理计划如下:
 
没有跳过该规则的情况下:
该主要的物理计划为:
(6) Project
Output [10]: [device_id#62, ads_id#63, from_json(StructField(user_device_adv_age_year,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_adv_age_year AS user_device_adv_age_year#292, from_json(StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_child_age AS user_device_child_age#293, from_json(StructField(ads_material_text_tag,StringType,true), ads_personas#70, Some(Asia/Shanghai)).ads_material_text_tag AS ads_material_text_tag#294, from_json(StructField(ads_ad_pic_resolution,StringType,true), ads_personas#70, Some(Asia/Shanghai)).ads_ad_pic_resolution AS ads_ad_pic_resolution#295, from_json(StructField(ctx_sound_patch_scene,StringType,true), ctx_personas#73, Some(Asia/Shanghai)).ctx_sound_patch_scene AS ctx_sound_patch_scene#296, from_json(StructField(ctx_position,StringType,true), ctx_personas#73, Some(Asia/Shanghai)).ctx_position AS ctx_position#297, from_json(StructField(album_category_id,StringType,true), album_personas#72, Some(Asia/Shanghai)).album_category_id AS album_category_id#298, from_json(StructField(album_nlp_labels_app,StringType,true), album_personas#72, Some(Asia/Shanghai)).album_nlp_labels_app AS album_nlp_labels_app#299]
Input [6]: [device_id#62, ads_id#63, device_personas#69, ads_personas#70, album_personas#72, ctx_personas#73]
 
经过该规则的处理计划转换如下(以两个字段为例):
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.OptimizeJsonExprs ===
 InsertIntoHadoopFsRelationCommand oss://xima-bd-data3.cn-shanghai.oss-dls.aliyuncs.com/reslib/droplet/generate/data/ai-ad/102041271/1723411818435/xqldata/.staging_1691066243227, false, Parquet, Map(coalesceNum -> 500, path -> oss://xima-bd-data3.cn-shanghai.oss-dls.aliyuncs.com/reslib/droplet/generate/data/ai-ad/102041271/1723411818435/xqldata/.staging_1691066243227), Overwrite, [device_id, ads_id, user_device_adv_age_year, user_device_child_age, ads_material_text_tag, ads_ad_pic_resolution, ctx_sound_patch_scene, ctx_position, album_category_id, album_nlp_labels_app]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        InsertIntoHadoopFsRelationCommand oss://xima-bd-data3.cn-shanghai.oss-dls.aliyuncs.com/reslib/droplet/generate/data/ai-ad/102041271/1723411818435/xqldata/.staging_1691066243227, false, Parquet, Map(coalesceNum -> 500, path -> oss://xima-bd-data3.cn-shanghai.oss-dls.aliyuncs.com/reslib/droplet/generate/data/ai-ad/102041271/1723411818435/xqldata/.staging_1691066243227), Overwrite, [device_id, ads_id, user_device_adv_age_year, user_device_child_age, ads_material_text_tag, ads_ad_pic_resolution, ctx_sound_patch_scene, ctx_position, album_category_id, album_nlp_labels_app]
 +- Repartition 500, true                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              +- Repartition 500, true
!   +- Project [device_id#62, ads_id#63, from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_adv_age_year AS user_device_adv_age_year#292, from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_child_age AS user_device_child_age#293, from_json(StructField(ads_material_text_tag,StringType,true), StructField(ads_ad_pic_resolution,StringType,true), ads_personas#70, Some(Asia/Shanghai)).ads_material_text_tag AS ads_material_text_tag#294, from_json(StructField(ads_material_text_tag,StringType,true), StructField(ads_ad_pic_resolution,StringType,true), ads_personas#70, Some(Asia/Shanghai)).ads_ad_pic_resolution AS ads_ad_pic_resolution#295, from_json(StructField(ctx_sound_patch_scene,StringType,true), StructField(ctx_position,StringType,true), ctx_personas#73, Some(Asia/Shanghai)).ctx_sound_patch_scene AS ctx_sound_patch_scene#296, from_json(StructField(ctx_sound_patch_scene,StringType,true), StructField(ctx_position,StringType,true), ctx_personas#73, Some(Asia/Shanghai)).ctx_position AS ctx_position#297, from_json(StructField(album_category_id,StringType,true), StructField(album_nlp_labels_app,StringType,true), album_personas#72, Some(Asia/Shanghai)).album_category_id AS album_category_id#298, from_json(StructField(album_category_id,StringType,true), StructField(album_nlp_labels_app,StringType,true), album_personas#72, Some(Asia/Shanghai)).album_nlp_labels_app AS album_nlp_labels_app#299]      +- Project [device_id#62, ads_id#63, from_json(StructField(user_device_adv_age_year,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_adv_age_year AS user_device_adv_age_year#292, from_json(StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_child_age AS user_device_child_age#293, from_json(StructField(ads_material_text_tag,StringType,true), ads_personas#70, Some(Asia/Shanghai)).ads_material_text_tag AS ads_material_text_tag#294, from_json(StructField(ads_ad_pic_resolution,StringType,true), ads_personas#70, Some(Asia/Shanghai)).ads_ad_pic_resolution AS ads_ad_pic_resolution#295, from_json(StructField(ctx_sound_patch_scene,StringType,true), ctx_personas#73, Some(Asia/Shanghai)).ctx_sound_patch_scene AS ctx_sound_patch_scene#296, from_json(StructField(ctx_position,StringType,true), ctx_personas#73, Some(Asia/Shanghai)).ctx_position AS ctx_position#297, from_json(StructField(album_category_id,StringType,true), album_personas#72, Some(Asia/Shanghai)).album_category_id AS album_category_id#298, from_json(StructField(album_nlp_labels_app,StringType,true), album_personas#72, Some(Asia/Shanghai)).album_nlp_labels_app AS album_nlp_labels_app#299]
       +- Filter (if ((label_click#84 = 0)) (rand(7794855199306151884) >= 0.95) else true AND (NOT (isnull(device_personas#69) AND isnull(ads_personas#70)) OR NOT isnull(ctx_personas#73)))                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 +- Filter (if ((label_click#84 = 0)) (rand(7794855199306151884) >= 0.95) else true AND (NOT (isnull(device_personas#69) AND isnull(ads_personas#70)) OR NOT isnull(ctx_personas#73)))
          +- Filter ((((dt#82 >= 20230710) AND (dt#82 <= 20230712)) AND NOT coalesce(appshadow#76, ) IN (2,3)) AND ((NOT (position_name#75 = sound_agg) AND isnotnull(get_json_object(ads_personas#70, $.ads_first_trade))) AND NOT coalesce(get_json_object(ads_personas#70, $.ads_business_type), -11111) IN (1,2,3)))                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        +- Filter ((((dt#82 >= 20230710) AND (dt#82 <= 20230712)) AND NOT coalesce(appshadow#76, ) IN (2,3)) AND ((NOT (position_name#75 = sound_agg) AND isnotnull(get_json_object(ads_personas#70, $.ads_first_trade))) AND NOT coalesce(get_json_object(ads_personas#70, $.ads_business_type), -11111) IN (1,2,3)))
             +- Relation[device_id#62,ads_id#63,response_id#64,track_id#65,album_id#66,imp_ts#67,click_ts#68,device_personas#69,ads_personas#70,track_personas#71,album_personas#72,ctx_personas#73,label_conv#74,position_name#75,appshadow#76,play_num#77,sub_num#78,leave_num#79,pay_num#80,live_num#81,dt#82,hour#83,label_click#84] parquet                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   +- Relation[device_id#62,ads_id#63,response_id#64,track_id#65,album_id#66,imp_ts#67,click_ts#68,device_personas#69,ads_personas#70,track_personas#71,album_personas#72,ctx_personas#73,label_conv#74,position_name#75,appshadow#76,play_num#77,sub_num#78,leave_num#79,pay_num#80,live_num#81,dt#82,hour#83,label_click#84] parquet
 
可以看到最主要的转换为:
from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_adv_age_year AS user_device_adv_age_year#292, from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_child_age AS user_device_child_age#293
              ||
              \/
from_json(StructField(user_device_adv_age_year,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_adv_age_year AS user_device_adv_age_year#292, from_json(StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_child_age AS user_device_child_age#293
 
from_json 中的 schema 由 StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true)分开成了
 StructField(user_device_adv_age_year,StringType,true)
 StructField(user_device_child_age,StringType,true)单独的两个schema
那为什么会变慢呢?是因为JsonToStructs中的处理逻辑:
case class JsonToStructs(
    schema: DataType,
    options: Map[String, String],
    child: Expression,
    timeZoneId: Option[String] = None)
  extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes
    with NullIntolerant {
    ...
    @transient lazy val parser = {
    val parsedOptions = new JSONOptions(options, timeZoneId.get, nameOfCorruptRecord)
    val mode = parsedOptions.parseMode
    if (mode != PermissiveMode && mode != FailFastMode) {
      throw new IllegalArgumentException(s"from_json() doesn't support the ${mode.name} mode. " +
        s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}.")
    }
    val (parserSchema, actualSchema) = nullableSchema match {
      case s: StructType =>
        ExprUtils.verifyColumnNameOfCorruptRecord(s, parsedOptions.columnNameOfCorruptRecord)
        (s, StructType(s.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)))
      case other =>
        (StructType(StructField("value", other) :: Nil), other)
    }
    val rawParser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = false)
    val createParser = CreateJacksonParser.utf8String _
    new FailureSafeParser[UTF8String](
      input => rawParser.parse(input, createParser, identity[UTF8String]),
      mode,
      parserSchema,
      parsedOptions.columnNameOfCorruptRecord)
  }
  ...
  override def nullSafeEval(json: Any): Any = {
    converter(parser.parse(json.asInstanceOf[UTF8String]))
  }
 
最主要关心的是 parser这个变量,因为由于上述规则的原因,两个schema单独在不同的parser中,而这里的 Child是由regexp_replace表达式组成的,所以该正则表达式会计算两次,
 而由于该字段会有10多个,所以该正则表达式会被重复计算100多次(正则表达式的是比较消耗时间的)。
跳过该规则的情况下
该主要的物理计划为:
(6) Project
Output [10]: [device_id#62, ads_id#63, from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_adv_age_year AS user_device_adv_age_year#292, from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_child_age AS user_device_child_age#293, from_json(StructField(ads_material_text_tag,StringType,true), StructField(ads_ad_pic_resolution,StringType,true), ads_personas#70, Some(Asia/Shanghai)).ads_material_text_tag AS ads_material_text_tag#294, from_json(StructField(ads_material_text_tag,StringType,true), StructField(ads_ad_pic_resolution,StringType,true), ads_personas#70, Some(Asia/Shanghai)).ads_ad_pic_resolution AS ads_ad_pic_resolution#295, from_json(StructField(ctx_sound_patch_scene,StringType,true), StructField(ctx_position,StringType,true), ctx_personas#73, Some(Asia/Shanghai)).ctx_sound_patch_scene AS ctx_sound_patch_scene#296, from_json(StructField(ctx_sound_patch_scene,StringType,true), StructField(ctx_position,StringType,true), ctx_personas#73, Some(Asia/Shanghai)).ctx_position AS ctx_position#297, from_json(StructField(album_category_id,StringType,true), StructField(album_nlp_labels_app,StringType,true), album_personas#72, Some(Asia/Shanghai)).album_category_id AS album_category_id#298, from_json(StructField(album_category_id,StringType,true), StructField(album_nlp_labels_app,StringType,true), album_personas#72, Some(Asia/Shanghai)).album_nlp_labels_app AS album_nlp_labels_app#299]
Input [6]: [device_id#62, ads_id#63, device_personas#69, ads_personas#70, album_personas#72, ctx_personas#73]
 
如果跳过该规则的话,那么该规则不会被应用,还是以两个字段为例,所以from_json的Schema不会变:
from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_adv_age_year AS user_device_adv_age_year#292, from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_child_age AS user_device_child_age#293
 
其实从物理计划我们看到:其实在regexp_replace这个表达式还是会出现多次,难道不会被调用多次么?当然不会被调用多次,直接看物理计划ProjectExec:
ProjectExec
  protected override def doExecute(): RDD[InternalRow] = {
    child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
      val project = UnsafeProjection.create(projectList, child.output)
      project.initialize(index)
      iter.map(project)
    }
  }
 
该方法的调用链如下:
UnsafeProjection.create
              ||
              \/
InterpretedUnsafeProjection.createProjection/GenerateUnsafeProjection.generate
              ||
              \/
             create
              ||
              \/
createCode(ctx, expressions, subexpressionEliminationEnabled)
              ||
              \/
ctx.generateExpressions(expressions, useSubexprElimination)
              ||
              \/
subexpressionElimination
 
subexpressionElimination 这里主要是提取公共表达式,也就是说后续的公共表达式的计算只会被计算一次
 那对应到我们的表达式为:
 Alias(GetStructField(attribute.get, i), f.name)()
 其中 attribute.get 为 JsonToStructs(StructType(StructField(user_device_adv_age_year,StringType,true),StructField(user_device_child_age,StringType,true)), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai))
 
这里的刚好能和Spark UI上显示的计划能对上:
from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_adv_age_year AS user_device_adv_age_year#292, from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_child_age AS user_device_child_age#293
 
(主要就是调用JsonToStructs.toString的方法)
其他
- Alias 的toString方法为:
 
s"$child AS $name#${exprId.id}$typeSuffix$delaySuffix" 
 
- GetStructField 的toString方法为:
 
val fieldName = if (resolved) childSchema(ordinal).name else s"_$ordinal"
s"$child.${name.getOrElse(fieldName)}" 
 
-  
UnresolvedStar这个类里有对 SELECT record. from (SELECT struct(a,b,c) as record …)*的解释
 -  
ResolveReferences 规则中的方法buildExpandedProjectList 进行 UnresolvedStar 的expand方法的调用
这里就会解析为 Alias(GetStructField(attribute.get, i), f.name)() -  
具体的优化规则见Optimize Json expression chain
 







![[Docker实现测试部署CI/CD----自由风格和流水线的CD操作(6)]](https://img-blog.csdnimg.cn/ce509b478a244be2a468ad862e9c9a9a.png)











