Spark3 新特性之AQE

news2025/7/28 2:00:07

文章目录

    • Spark3 AQE
      • 一、 背景
      • 二、 Spark 为什么需要AQE? (Why)
      • 三、 AQE 到底是什么?(What)
      • 四、AQE怎么用?(How)
        • 4.1 自动分区合并
        • 4.2 自动数据倾斜处理
        • 4.3 Join 策略调整
      • 五、对比验证
        • 5.1 执行耗时
        • 5.2 自动分区合并
        • 5.3 自动数据倾斜处理
      • 六、结论

Spark3 AQE

一、 背景

Spark 2.x 在遇到有数据倾斜的任务时,需要人为地去优化任务,比较费时费力;如果任务在Reduce阶段,Reduce Task 数据分布参差不齐,会造成各个excutor节点资源利用率不均衡,影响任务的执行效率;Spark 3新特性AQE极大地优化了以上任务的执行效率。

二、 Spark 为什么需要AQE? (Why)

RBO(Rule Based Optimization,基于规则的优化),它往往基于一些规则和策略实现,如谓词下推、列剪枝,这些规则和策略来源于数据库领域已有的应用经验。RBO实际上算是一种经验主义。

经验主义的弊端就是对待相似的问题和场景都使用同一类套路。Spark 社区正是因为意识到了 RBO 的局限性,因此在 2.x 版本中推出了CBO(Cost Based Optimization,基于成本的优化)。

CBO 是基于数据表的统计信息(如表大小、数据列分布)来选择优化策略。CBO 支持的统计信息很丰富,比如数据表的行数、每列的基数(Cardinality)、空值数、最大值、最小值等。因为有统计数据做支持,所以 CBO 选择的优化策略往往优于 RBO 选择的优化规则。

但是,CBO 也有三个方面的不足:

  • 适用面太窄,CBO 仅支持注册到 Hive Metastore 的数据表,但在其他的应用场景中,数据源往往是存储在分布式文件系统的各类文件,如 Parquet、ORC、CSV 等。

  • 统计信息的搜集效率比较低。对于注册到 Hive Metastore 的数据表,开发者需要调用 ANALYZE TABLE COMPUTE STATISTICS 语句收集统计信息,而各类信息的收集会消耗大量时间。

  • 静态优化,RBO、CBO执行计划一旦制定完成,就会按照该计划坚定不移地执行;如果在运行时数据分布发生动态变化,先前制定的执行计划并不会跟着调整、适配。

基于CBO的执行计划
在这里插入图片描述

  • Spark parses the query and creates the Unresolved Logical Plan 创建Unresolved Logical Plan
    • Validates the syntax of the query. 验证语法
    • Doesn’t validate the semantics meaning column name existence, data types. 不验证语义、字段是否存在、数据类型
  • Analysis: Using the Catalyst, it converts the Unresolved Logical Plan to Resolved Logical Plan a.k.a Logical Plan. 转换为Logical Plan
    • The catalog contains the column names and data types, during this step, it validates the columns mentioned in a query with catalog.
  • Optimization: Converts Logical Plan into Optimized Logical Plan. 转换为 Optimized Logical Plan
  • Planner: Now it creates One or More Physical Plans from an optimized Logical plan. 创建一个或多个Physical Plans
  • Cost Model: In this phase, calculates the cost for each Physical plan and select the Best Physical Plan. CBO择优
  • RDD Generation: RDD’s are generated, this is the final phase of query optimization which generates RDD in Java bytecode.

三、 AQE 到底是什么?(What)

考虑到 RBO 和 CBO 的种种限制,Spark 在 3.0 版本推出了 AQE(Adaptive Query Execution,自适应查询执行)。用一句话来概括,AQE 是 Spark SQL 的一种动态优化机制,在运行时,每当 Shuffle Map 阶段执行完毕,AQE 都会结合这个阶段的统计信息,基于既定的规则动态地调整、修正尚未执行的逻辑计划和物理计划,来完成对原始查询语句的运行时优化

AQE 赖以优化的统计信息与 CBO 不同,这些统计信息并不是关于某张表或是哪个列,而是 Shuffle Map 阶段输出的中间文件。

基于AQE的执行计划
在这里插入图片描述

四、AQE怎么用?(How)

AQE三大特性:自动分区合并 、自动数据倾斜处理、Join 策略调整

4.1 自动分区合并

在 Shuffle 过后,Reduce Task 数据分布参差不齐,AQE 将自动合并过小的数据分区。

那么AQE是如何确定多小的分区需要合并,以及分区合并到多大时停止合并?

对于所有的数据分区,无论大小,AQE 按照分区编号从左到右进行扫描,边扫描边记录分区尺寸,当相邻分区的尺寸之和大于 “推荐尺寸”时,AQE 就把这些扫描过的分区进行合并。然后,继续向右扫描,并采用同样的算法,按照目标尺寸合并剩余分区,直到所有分区都处理完毕。

“推荐尺寸”是由spark.sql.adaptive.advisoryPartitionSizeInBytes设置

配置说明
spark.sql.adaptive.coalescePartitions.enabledWhen true and ‘spark.sql.adaptive.enabled’ is true, Spark will coalesce contiguous shuffle partitions according to the target size (specified by ‘spark.sql.adaptive.advisoryPartitionSizeInBytes’), to avoid too many small tasks.
开启分区自动合并,默认开启
spark.sql.adaptive.advisoryPartitionSizeInBytesThe advisory size in bytes of the shuffle partition during adaptive optimization (when spark.sql.adaptive.enabled is true). It takes effect when Spark coalesces small shuffle partitions or splits skewed shuffle partition.
分区合并后的推荐尺寸,默认为64M
spark.sql.adaptive.coalescePartitions.minPartitionSizeThe minimum size of shuffle partitions after coalescing. This is useful when the adaptively calculated target size is too small during partition coalescing.
可合并分区尺寸大小,默认为1M

假设推荐尺寸为100M,shuffle后每一个分区的大小为70M、30M、80M、90M、10M、20M

按照正常情况(顺序处理),会启动4个reduce task:

第一个处理:70M、30M

第二个处理:80M

第三个处理:90M

第四个处理:10M、20M

spark3.0版本按照上述情况合并之后,各分区数据还是出现了不均衡,从而导致后续计算出现小的数据倾斜

查看spark 3.2官网新增了 spark.sql.adaptive.coalescePartitions.minPartitionSize(合并分区后,最小分区尺寸),如果把该参数设置为和推荐尺寸一致,那是不是只会启动3个 reduce task,3个都处理100M的数据?(个人猜想,不是按照顺序合并,而是会先遍历分区大小,保证合并后的分区大小相近)

You do not need to set a proper shuffle partition number to fit your dataset. Spark can pick the proper shuffle partition number at runtime once you set a large enough initial number of shuffle partitions via spark.sql.adaptive.coalescePartitions.initialPartitionNum configuration. 官方建议:分区数不用设置,spark会自动设置合适的分区。

4.2 自动数据倾斜处理

AQE 自动拆分 Reduce 阶段过大的数据分区,降低单个Reduce Task 的工作负载。

AQE 如何判定数据分区是否倾斜呢?它又是怎么把大分区拆分成多个小分区的?

配置说明
spark.sql.adaptive.skewJoin.enabledWhen true and ‘spark.sql.adaptive.enabled’ is true, Spark dynamically handles skew in shuffled join (sort-merge and shuffled hash) by splitting (and replicating if needed) skewed partitions.
开启AQE自动数据倾斜处理
spark.sql.adaptive.skewJoin.skewedPartitionFactorA partition is considered as skewed if its size is larger than this factor multiplying the median partition size and also larger than ‘spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes’
判定数据分区是否倾斜,默认值为5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytesA partition is considered as skewed if its size in bytes is larger than this threshold and also larger than ‘spark.sql.adaptive.skewJoin.skewedPartitionFactor’ multiplying the median partition size. Ideally this config should be set larger than ‘spark.sql.adaptive.advisoryPartitionSizeInBytes’.
判定数据分区是否倾斜,默认值为256M

判定倾斜分区:大于 median partition size * spark.sql.adaptive.skewJoin.skewedPartitionFactor 且 大于 spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

假设数据表 A 有 3 个分区,分区大小分别是80MB、100MB 和 512MB。这些分区按大小个排序后的中位数是 100MB,因为

skewedPartitionFactor 的默认值是 5 ,skewedPartitionThresholdInBytes默认值是256M,512M大于 100MB * 5 = 500MB且大于256M 的分区被判定为倾斜分区。

拆分倾斜分区:上述例子512M分区会被拆分 512/256 =2个分区

4.3 Join 策略调整

如果某张表在过滤之后,尺寸小于广播变量阈值,这张表参与的数据关联就会从 Shuffle Sort Merge Join 降级(Demote)为执行效率更高的 Broadcast Hash Join。

Join 策略调整指的就是 Spark SQL在运行时动态地将原本的 Shuffle Join 策略,调整为执行更加高效的 Broadcast Join。

具体来说,每当 DAG 中的 Map 阶段执行完毕,Spark SQL 就会结合 Shuffle 中间文件的统计信息,重新计算 Reduce 阶段数据表的存储大小。如果发现基表尺寸小于广播阈值, 那么 Spark SQL 就把下一阶段的 Shuffle Join 调整为 Broadcast Join。

Broadcast Join 以广播的方式将小表的全量数据分发到集群中所有的 Executors,大表的数据就可以与小表数据在Process local级别进行关联操作。本地性级别有 4 种:Process local < Node local < Rack local < Any。

配置说明
spark.sql.adaptive.autoBroadcastJoinThresholdConfigures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. The default value is same with spark.sql.autoBroadcastJoinThreshold. Note that, this config is used only in adaptive framework.
可广播的表尺寸阈值,默认10M

五、对比验证

spark3.2 开启AQE

http://szzb-bg-uat-etl-16:18080/history/application_1665940579703_5390/

spark3.2 关闭AQE

http://szzb-bg-uat-etl-16:18080/history/application_1665940579703_5812/

spark2.2 无AQE

http://szzb-bg-uat-etl-11:18080/history/application_1665940579703_9257/

5.1 执行耗时

使用资源: --driver-memory 6g --executor-memory 6g --executor-cores 6

耗时
spark3.2 开启AQE4.3 min
spark3.2 关闭AQE6.7 min
spark2.2 无AQE> 9.6min (OOM)

5.2 自动分区合并

在这里插入图片描述

5.3 自动数据倾斜处理

查看各stage执行时间

  • spark3.2 开启AQE 每个stage执行时间相差不大 (需要看每个stage的tasks )

  • spark2.2 无AQE 每个stage执行时间相差较大 (需要看每个stage的tasks )
    在这里插入图片描述

查看执行时间最长的stage数据分布

  • spark3.2 开启AQE Shuffle Read/Write 数据较均衡

  • spark2.2 无AQE Shuffle Read/Write 数据不均衡

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GB5NWCsF-1677203632031)(C:\Users\602946\AppData\Roaming\Typora\typora-user-images\image-20221019110500813.png)]

六、结论

spark3.2.2 开启AQE(默认开启),当Reduce Task 数据分布参差不齐时,能够自动合并过小的数据分区;且在 Reduce 阶段存在数据倾斜的情况下,能够拆分大分区;通过对比执行时间,AQE能极大的提升任务的执行效率。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/368102.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

电脑录屏怎么操作,操作步骤详解,2023新版

在日常的学习、生活和工作中&#xff0c;当小伙伴想要分享一段游戏视频或者教学视频时&#xff0c;电脑录屏就显得尤为重要了。但是小伙伴你是否知道电脑录屏怎么操作&#xff1f;今天小编就分享电脑录屏操作步骤的详细教程&#xff0c;一起来看看吧。 电脑录屏怎么操作1&#…

【剑指Offer】重建二叉树(递归+迭代)

重建二叉树一、递归法二、迭代法题目链接 题目描述&#xff1a; 输入某二叉树的前序遍历和中序遍历的结果&#xff0c;请构建该二叉树并返回其根节点。 假设输入的前序遍历和中序遍历的结果中都不含重复的数字。 示例 1: Input: preorder [3,9,20,15,7], inorder [9,3,15,…

C进阶:7.程序环境和预处理

目录 1.程序的翻译环境和执行环境 2.详解编译 链接 2.1翻译环境 2.2编译本身也分为几个阶段&#xff1a; 2.3运行环境 3.预处理详解 3.1预定义符号 3.2 #define 3.2.1 #define 定义标识符 3.2.2 #define 定义宏 3.2.3 #define 替换规则 3.2.4 # 和 ## 3.2.5带副…

大规模食品图像识别:T-PAMI 2023论文解读

美团基础研发平台视觉智能部与中科院计算所展开科研课题合作&#xff0c;共同构建大规模数据集Food2K&#xff0c;并提出渐进式区域增强网络用于食品图像识别&#xff0c;相关研究成果已发表于T-PAMI 2023。本文主要介绍了数据集特点、方法设计、性能对比&#xff0c;以及基于该…

Unreal Engine 虚幻引擎,性能分析,优化(二)

一、CPU 性能分析 如渲染线程中出现 CPU 受限&#xff0c;原因可能是绘制调用过多。这是一个常见问题&#xff0c;美术师通常会将绘制调用进行组合&#xff0c;从而减少消耗&#xff08;如&#xff1a;将多个墙壁组合为一个网格体&#xff09;。实际消耗存在于多个区域中&…

Ubuntu 22.04.2 发布,可升级至 Linux Kernel 5.19

Ubuntu 22.04 LTS (Jammy Jellyfish) Ubuntu 22.04.2 发布&#xff0c;可升级至 Linux Kernel 5.19 请访问原文链接&#xff1a;Ubuntu 22.04 LTS (Jammy Jellyfish)&#xff0c;查看最新版。原创作品&#xff0c;转载请保留出处。 作者主页&#xff1a;www.sysin.org 发行说…

file_get_contents 打开本地文件报错: failed to open stream: No such file or directory

php 使用file_get_contents时报错 failed to open stream: No such file or directory (打开流失败&#xff0c;没有这样的文件或目录) 1. 首先确保文件路径没问题 最好是直接复制一下文件的路径 2. windows电脑可以右键该文件 → 属性→安全 →对象名称 选中后复制一下 3. 然后…

【数据存储】浮点型在内存中的存储

目录 一、存储现象 二、IEEE标准规范 1.存储 2.读取 三、举例验证 1.存储 2.读取 浮点型存储的标准是IEEE&#xff08;电气电子工程师学会&#xff09;754制定的。 一、存储现象 浮点数由于其有小数点的特殊性&#xff0c;有很多浮点数是不能精确存储的&#xff0c;如&#…

【unittest学习】unittest框架主要功能

1.认识unittest在 Python 中有诸多单元测试框架&#xff0c;如 doctest、unittest、pytest、nose 等&#xff0c;Python 2.1 及其以后的版本已经将 unittest 作为一个标准模块放入 Python 开发包中。2.认识单元测试不用单元测试框架能写单元测试吗&#xff1f;答案是肯定的。单…

UNIAPP实战项目笔记58 注册成功信息存储到数据库

UNIAPP实战项目笔记58 注册成功信息存储到数据库 注册时候验证手机验证码 验证码通过后讲用户信息存入数据库 实际案例图片 后端接口文件 index.js var express require(express); var router express.Router(); var connection require(../db/sql.js); var user require(…

爱奇艺“资产重定价”:首次全年运营盈利是拐点,底层逻辑大改善

长视频行业历时一年有余的降本增效、去肥增瘦&#xff0c;迎来首个全周期圆满收官的玩家。 北京时间2月22日美股盘前&#xff0c;爱奇艺发布2022年Q4及全年财报&#xff0c;Q4 Non-GAAP净利润明显超越预期&#xff0c;且首次实现全年运营盈利。受业绩提振&#xff0c;爱奇艺盘…

SpringSecurity源码分析(二) SpringBoot集成SpringSecurity即Spring安全框架的执行过程

在上一篇文章中我们描述了SpringSecurity的执行过程。我们我们了解到了以下内容 在SpringSecurity框架中有三个非常核心的类和接口&#xff0c;分别是 1.SecurityFilterChain接口 2.FilterChainProxy类 3.DelegatingFilterProxy类 springboot项目中&#xff0c;客户端向Tomcat …

day51【代码随想录】动态规划之回文子串、最长回文子序列

文章目录前言一、回文子串&#xff08;力扣647&#xff09;二、最长回文子序列&#xff08;力扣516&#xff09;前言 1、回文子串 2、最长回文子序列 一、回文子串&#xff08;力扣647&#xff09; 给你一个字符串 s &#xff0c;请你统计并返回这个字符串中 回文子串 的数目…

从网易到支付宝,3年外包生涯做完,我这人生算是彻底废了......

我为什么一直做外包呢&#xff0c;原因是薪资和技术方面。 在华三做了一年外包&#xff0c;薪资5k&#xff0c;功能测试&#xff0c;接触Linux和网络&#xff0c;但是说实在的技术很难沉淀&#xff0c;就像雾里看花一样&#xff0c;过年之后&#xff0c;想走的人都走了&#x…

Word处理控件Aspose.Words功能演示:使用 C# VB.NET 将 DOCX 转换为 DOC 或将 DOC 转换为 DOCX

Aspose.Words 是一种高级Word文档处理API&#xff0c;用于执行各种文档管理和操作任务。API支持生成&#xff0c;修改&#xff0c;转换&#xff0c;呈现和打印文档&#xff0c;而无需在跨平台应用程序中直接使用Microsoft Word。此外&#xff0c; Aspose API支持流行文件格式处…

深度学习编译器CINN(1):框架概览和编译安装

目录 框架概览 编译安装 参考 框架概览 CINN是一种在不改变模型代码的条件下加速飞桨模型运行速度的深度学习编译器。CINN致力于创造训推一体自动调优、分布式编译加速等特色功能&#xff0c;对深度学习模型提供全自动、极致的性能优化&#xff0c;并在科研界和工业界建立影…

【布隆过滤器(Bloom Filter)基本概念与原理、Bloom Filter优点与缺点、以及应用场景】

布隆过滤器&#xff08;Bloom Filter&#xff09;基本概念与原理、Bloom Filter优点与缺点、以及应用场景 Bloom Filter 基本概念 布隆过滤器是1970年由一个叫布隆的小伙子提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在…

DualCor: Event Causality Extraction with Event Argument Correlations论文解读

Event Causality Extraction with Event Argument Correlations(带有事件论元相关性的事件因果关系抽取) 论文&#xff1a;2301.11621.pdf (arxiv.org) 代码&#xff1a;cuishiyao96/ECE: Dataset and code for COLING2022 paper Event Causality Extraction with Event Argum…

【vue 10 第一个vue-cli程序】

1.什么是vue-cli vue-cli官方提供的一个脚手架&#xff0c;用于快速生成一个vue的项目模版&#xff1b; 预先定义好的目录结构及基础代码&#xff0c;就好比咱们在创建Maven项目时可以选择创建一个骨架项目&#xff0c;这个骨架项目就是脚手架&#xff0c;我们的开发更加的快速…

FC总线知识点小结

FC总线 &#xff08;一&#xff09;特点&#xff1a; FC具备通道和网络双重优势&#xff0c;具备高带宽、高可靠性、高稳定性&#xff0c;抵抗电磁干扰等优点&#xff0c;能够提供非常稳定可靠的光纤连接&#xff0c;容易构建大型的数据传输和通信网络&#xff0c;目前支持1x…