Spark 配置项

news2025/7/20 17:20:55

Spark 配置项

  • 硬件资源类
    • CPU
    • 内存
      • 堆外内
      • User Memory/Spark 可用内存
      • Execution/Storage Memory
    • 磁盘
  • Shuffle
  • Spark SQL
    • Join 策略调整
    • 自动分区合并
    • 自动倾斜处理

配置项分为 3 类:

  • 硬件资源类 : 与 CPU、内存、磁盘有关的配置项
  • Shuffle 类 : Shuffle 计算过程的配置项
  • Spark SQL : Spark SQL 优化配置项

读取配置项顺序 :SparkConf 对象 -> 命令行参数 -> 配置文件

硬件资源类

资源类别配置项含义
CPUspark.cores.max集群满 CPU 核
spark.executor.cores每个 Executors 可用的 CPU Cores
spark.default.parallelism默认并行度
spark.sql.shuffle.partitionsReduce 的默认并行度
spark.task.cpus每个任务可用的 CPU 核
spark.executor.instances集群内 Executors 的个数
内存spark.executor.memory单个 Executor 的堆内内存总大小
spark.memory.offHeap.enabled是否启动堆外内存
spark.memory.offHeap.size单个 Executorp 的堆外内存总大小
spark.memory.fraction除 User Memory 外的内存空间占比
spark.memory.storageFraction缓存RDD的内存占比,执行内存占比= 1 - spark.memory.storageFraction
spark.rdd.compressRDD缓存是否压缩,默认不压缩
磁盘spark.local.dir存储 Shuffle 中间文件/RDD Cache 的磁盘目录

CPU

配置项:

spark.cores.max集群满 CPU 核
spark.executor.cores每个 Executors 可用的 CPU 核
spark.task.cpus每个任务可用的 CPU 核
spark.executor.instances集群内 Executors 的个数

并行度 : 定义分布式数据集划分的份数/粒度,决定了分布式任务的计算负载。并行度越高,数据的粒度越细,数据分片越多,数据越分散

并行度的配置项 :

spark.default.parallelism默认并行度
spark.sql.shuffle.partitionsReduce 的默认并行度

并行计算任务:在任一时刻整个集群能够同时计算的任务数量

  • 整个集群的并行计算任务数 = spark.executor.instances * spark.executor.cores

达到 CPU、内存、数据之间的平衡的约定 :

  • spark.executor.cores 指定 CPU Cores ,记为 c
  • Execution Memory 内存大小 ,记为 m
  • 分布式数据集的大小记为 D ,并行度记为 P,D/P = 每个数据分片大小
  • 一个数据分片对应着一个 Task(分布式任务),而一个 Task 又对应着一个 CPU Core

公式量化 :

# D/P = 数据分片大小,m/c = 每个 Task 分到的可用内存
D/P ~ m/c

内存

内存配置项 :

spark.executor.memory单个 Executor 的堆内内存总大小
spark.memory.offHeap.size单个 Executorp 的堆外内存总大小
(spark.memory.offHeap.enabled=true)
spark.memory.fraction堆内内存中,用于缓存RDD和执行计算的内存比例
spark.memory.storageFraction缓存RDD的内存占比,执行内存占比= 1 - spark.memory.storageFraction
spark.rdd.compressRDD缓存是否压缩,默认不压缩

在这里插入图片描述

  • Reserved Memory 大小固定为 300MB
  • M 指定了 Executor 进程的 JVM Heap 大小 ( Executor Memory )
  • Execution Memory 的组成: Execution Memory、Storage Memory 、UserMemory
  • User Memory : 存储用户自定义的数据结构,如 : RDD 的各类实例化对象或集合类型(如: 数组、列表等)
  • Spark 1.6 后,推出了动态内存管理模式,Execution Memory/Storage Memory 能互相抢占

堆外内

堆外存储:

  • int 的用户 ID、String 的姓名、int的年龄、Char 的性别

在这里插入图片描述

处理数据集:

  • 数据模式比较扁平,而且字段多是定长数据类型,就更多使用堆外内存
  • 数据模式很复杂,嵌套结构/变长字段很多,就更多使用 JVM 堆内内存

User Memory/Spark 可用内存

User Memory :存储开发者自定义的数据结构,这些数据结构需要协助分布式数据集的处理

spark.memory.fraction : 明确 Spark 可支配内存占比,即 :User Memory 堆内占比 = 1 - spark.memory.fraction

  • spark.memory.fraction :系数越大,Spark 可支配的内存越多,User Memory 占比越小
  • spark.memory.fraction 默认值是 0.6,JVM 堆内的 60% 给 Spark支配,40% 给 User Memory

调整内存相对占比:

  • 自定义数据结构多,spark.memory.fraction 调低,用于分布式计算和缓存分布式数据集
  • 自定义数据结构少,spark.memory.fraction 调高,用于分布式计算和缓存分布式数据集

Execution/Storage Memory

sf 的设置情况:

  • ETL :RDD Cache 使用少。就能将 sf 设低点,让 Execution Memory 大点
  • 缓存密集型 :机器学习:RDD Cache 使用较多,就能把 sf 设高点,让 Storage Memory 大点
  • 过多的缓存会引发 GC(Garbage Collection,垃圾回收)

JVM 把 Heap 堆内内存分为:

  • 年轻代:存储生命周期较短、引用次数较低的对象,会引发 Young GC
  • 老年代:存储生命周期较长、引用次数高的对象,会引发 Full GC
  • RDDcache 会存在老年代

Full GC时,会引发 STW:

  • 抢占应用程序执行线程,把所有 CPU 线程都做垃圾回收,应用程序的暂时不执行(Stop the world)
  • 等 Full GC 完事后,才把 CPU 线程释放,应用程序才能继续执行
  • Full GC 弊端远大于 Young GC

为了 RDD cache 访问效率,用 RDD/DataFrame/Dataset.cache ,以对象值形式缓存到内存 (避免序列化消耗)

  • 用对象值形式缓存数据,每条数据都要构成一个对象 (自定义Case class, Row 对象)
  • 当大量的 RDD cache 时,会引发 Full GC
  • 当应用是缓存密集型,需要大量缓存,为了执行效率,可以改用序列化

spark.rdd.compress :RDD 缓存默认不压缩

  • 启用压缩后,能节省缓存内存的占用,把更多的内存空间留给分布式任务执行
  • 启用压缩后,会引入额外的计算开销、牺牲 CPU

磁盘

磁盘的配置项:

  • spark.local.dir :任意的本地文件系统目录,默认值是 /tmp 。 用于存储各种各样的临时数据,如: Shuffle 中间文件、RDD Cache。

有条件可以设置个大而性能好的文件系统,如:空间足够大的 SSD 文件系统目录

Shuffle

spark.shuffle.file.bufferMap 输出端的写缓冲区的大小
spark.reducer.maxSizeInFlightReduce 输入端的读缓冲区的大小
spark.shuffle.sort.bypassMergeThresholdMap 阶段不进行排序的分区阈值

Shuffle 的计算的两个阶段:

  • Map 阶段:执行映射逻辑,并按 Reducer 的分区规则,将中间数据写入到本地磁盘
  • Reduce 阶段:从各个节点下载数据分片,并根据需要实现聚合计算
  • Map 阶段的计算结果(中间文件),会存储到写缓冲区(Write Buffer),满后再写入到磁盘文件系统
  • Reduce 阶段,通过网络从不同节点的磁盘中拉取中间文件,以数据块暂存到计算节点的读缓冲区(Read Buffer),满后再写入到磁盘文件系统

自 Spark 1.6 后,全用 Sort shuffle manager 管理 Shuffle

  • Sort shuffle manager 会把 Map/Reduce 都引入排序

repartition、groupBy 就没有排序的需求,引入的排序就是额外的计算开销

  • 不需要聚合/排序时,调整 spark.shuffle.sort.bypassMergeThreshold 改变 Reduce 端的并行度(默认值 200)。当 Reduce 的分区数 < 该值时,Shuffle 就不会引入排序

Spark SQL

作用配置项含义
AQEspark.sql.adaptive.enabled是否启用 AQE
Join 策略spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin非空分区比例 < 该值,调整Join策略
spark.sql.autoBroadcastJoinThreshold基表 < 该值, 触发Broadcast Join
自动分区合并spark.sql.adaptive.coalescePartitions.enabled是否启用合并分区
spark.sql.adaptive.advisoryPartitionSizelnBytes合并后的目标分区大小
spark.sql.adaptive.coalescePartitions.minPartitionNum分区合并后,并行度 > 该值
自动倾斜处理spark.sql.adaptive.skewJoin.enabled是否自动处理数据倾斜
spark.sql.adaptive.skewJoin.skewedPartitionFactor倾斜分区的比例系数
spark.sql.adaptive.skewJoin.skewedPartitionThresholdlnBytes倾斜分区的最低阀值
spark.sql.adaptive.advisoryPartitionSizeInBytes拆分倾斜分区粒度 (字节)

Spark 3.0 推出 AQE (Adaptive Query Execution, 自适应查询执行) 的 3 个动态优化特性: Join 策略调整、自动分区合并、自动倾斜处理

# 启用 AQE
spark.sql.adaptive.enabled true

Join 策略调整

Join 策略调整 : Spark SQL 在运行时动态调整为 Broadcast Join

  • 每当 DAG 中的 Map 阶段执行完毕,会结合 Shuffle 中间文件的统计信息,重新计算 Reduce 数据表的存储大小。当基表 < autoBroadcastJoinThreshold时,下个阶段就可能变为 Broadcast Join

动态 Join 策略的条件二 :大表过滤后,非空分区比例 < nonEmptyPartitionRatioForBroadcastJoin,才能成功触发 Broadcast Join 降级

  • 例子 :大表有 100 个分区,过滤后只有 15 个分区有数据
  • 非空分区比例 : 15 / 100 = 15% < 20% , 就触发 Broadcast Join 降级

配置项:

# AQE前,基表 < 该值,就会触发 Broadcast Join
spark.sql.autoBroadcastJoinThreshold 10m

# AQE后,非空分区比例 < 该值,就调整动态 Join 策略
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin 0.5

Spark SQL 的广播阈值对比的两种情况:

  • 基表来自文件系统,用基表在磁盘的存储大小与广播阈值对比
  • 基表来自 DAG 的中间文件,用 DataFrame 执行计划中的统计值与广播阈值对比

DataFrame 执行计划中的统计值 :

val df: DataFrame = _
// 先对分布式数据集加Cache
df.cache.count

// 获取执行计划
val plan = df.queryExecution.logical

// 获取执行计划对于数据集大小的精确预估
val estimated: BigInt = spark
  .sessionState
  .executePlan(plan)
  .optimizedPlan
  .stats
  .sizeInBytes

自动分区合并

自动分区合并 :解决 Reduce 过小的分区,而导致的数据的不均衡问题

分区合并示意图 :

  • 依序扫描数据分区,当相邻分区的尺寸之和 > 实际大小时,就把扫描过的分区做一次合并

在这里插入图片描述

# 是否启用自动分区合并,默认启用
spark.sql.adaptive.coalescePartitions.enabled true

# 合并后的目标分区大小
spark.sql.adaptive.advisoryPartitionSizelnBytes 256MB

# 分区合并后,并行度 > 该值
spark.sql.adaptive.coalescePartitions.minPartitionNum 1
  • 每个分区的平均大小 = 数据集大小/最低并行度
  • 实际大小 = min(advisoryPartitionSizeInBytes , 分区的平均大小)

例子 :Shuffle 中间文件 = 20GB,minPartitionNum = 200,

  • 每个分区的尺寸= 20GB / 200 =100MB
  • 设 advisoryPartitionSizeInBytes = 200MB,最终分区 = min(100MB,200MB) = 100MB

自动倾斜处理

自动倾斜处理:把倾斜的数据分区拆分成小分区

  • 对所有数据分区按大小做排序,取中位数。将 中位数 * skewedPartitionFactor ,得到判定阈值。凡是 > 阈值的数据分区,就可能认为倾斜分区
  • 当可能倾斜分区 > skewedPartitionThresholdInBytes,就会判定为倾斜分区

配置项 :

# 开启自动倾斜处理
spark.sql.adaptive.skewJoin.enabled true

# 判断大分区,倾斜分区的比例系数
spark.sql.adaptive.skewJoin.skewedPartitionFactor 5

# 判断大分区,倾斜分区的最低阔值
spark.sql.adaptive.skewJoin.skewedPartitionThresholdinBytes 256MB

# 拆分大分区 , 倾斜分区的拆分单位
spark.sql.adaptive.advisoryPartitionSizelnBytes 256MB

例子:数据表有 3 个分区:90MB、100MB 、512MB。中位数是 100MB

  • 判定阈值 = 中位数 * skewedPartitionFactor = 100MB * 5 = 500MB
  • 512MB 为候选分区
  • 512MB > skewedPartitionThresholdInBytes(256MB) ,就认为该分区是倾斜分区
  • 512MB < skewedPartitionThresholdInBytes(1GB) ,就不是倾斜分区
  • 再根据 advisoryPartitionSizeInBytes(256MB) , 对大分区进行拆分
  • 512MB 被拆成两个小分区(512MB / 2 = 256MB)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/395336.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL 中的 distinct 和 group by 的性能比较

1 先说大致的结论&#xff08;完整结论在文末&#xff09;在语义相同&#xff0c;有索引的情况下&#xff1a;group by和 distinct 都能使用索引&#xff0c;效率相同。在语义相同&#xff0c;无索引的情况下&#xff1a;distinct 效率高于group by。原因是 distinct 和 group …

【软件开发】基于PyQt5开发的标注软件

这里是基于PyQt5写的面向目标检测的各类标注PC端软件系统。目前现有的labelme软件和labelImg开源软件无法满足特殊数据集的标注要求&#xff0c;而且没有标注顺序的报错提示。当然我设计的软件就会不具有适用性了&#xff08;毕竟从下面开发的软件可以明显看出来我做的基本上是…

spark-submit报错

spark-submit --class ${main_class} \ --master yarn \ --deploy-mode client \ --driver-memory 8g \ --executor-memory 6g \ --num-executors 1 \ --executor-cores 34 \ ${HOME_PATH}/obs_finance-1.0-SNAPSHOT-jar-with-dependencies.jar ${year_month} 注意这个34超过…

【教学典型案例】28.单表的11个Update接口--MyBatis

目录一&#xff1a;背景介绍二&#xff1a;前期准备引入pom依赖MyBatis配置文件数据库连接文件MyBatis配置类三&#xff1a;代码编写Mapper编写接口通用mapper实体pojojunit测试编写测试结果四&#xff1a;总结一&#xff1a;背景介绍 在进行项目开发编写更新接口时&#xff0…

顶级动漫IP加持之下,3A策略游戏Mechaverse如何改变GameFi

2021年是元宇宙发展的元年&#xff0c;元宇宙与GameFi创造了一波又一波市场热点。在经历第一波热潮之后&#xff0c;元宇宙的到来让不少人看到了加密市场的潜力&#xff0c;同时大家也意识到这将是未来的重要方向。如何将元宇宙推向更广阔的市场&#xff0c;让更多人能够轻松进…

项目管理工具DHTMLX Gantt灯箱元素配置教程:显示任务内容

DHTMLX Gantt是用于跨浏览器和跨平台应用程序的功能齐全的Gantt图表。可满足项目管理应用程序的大部分开发需求&#xff0c;具备完善的甘特图图表库&#xff0c;功能强大&#xff0c;价格便宜&#xff0c;提供丰富而灵活的JavaScript API接口&#xff0c;与各种服务器端技术&am…

神经网络(容易被忽视的基础知识)

主要内容&#xff1a; 基本神经元作为线性分类器的单个神经元为什么要是深度神经网而不是”肥胖“神经网络&#xff1f;为什么在人工神经网络中的神经元需要激活函数&#xff1f;一些主要的激活函数神经网络中的偏置有什么意义&#xff1f;初始化神经网络的参数可以全为0吗&am…

【java】java基本类型和包装类型的区别

文章目录简介1.包装类型可以为 null&#xff0c;而基本类型不可以2.包装类型可用于泛型&#xff0c;而基本类型不可以3.基本类型比包装类型更高效4.自动装箱和自动拆箱简介 Java 的每个基本类型都对应了一个包装类型&#xff0c;比如说 int 的包装类型为 Integer&#xff0c;d…

ECOLOY直接更换流程表单后导致历史流程中数据为空白的解决方案

用户反馈流历史流程打开是空白了没有内容。 一、问题调查分析&#xff1a; 工作流“XX0204 员工培训协议审批流程”workflowId37166产生的7个具体流程中&#xff0c;创建日期为2021年的4个具体流程原先引用的数据库表单应该是“劳动合同签订审批表”(formtable_main_190)&…

JavaScript-缓存

参考资料彻底解决让用户清一下浏览器缓存浏览器缓存彻底理解浏览器的缓存机制彻底弄懂前端缓存浅解强缓存和协商缓存浏览器缓存策略(强缓存和协商缓存)一文搞定Http缓存-强制缓存与协商缓存前端浏览器缓存知识梳理ASP.NET Core 中使用缓存IIS中设置Cache-Control是什么当我们第…

[Gin]框架底层实现理解(一)

前言&#xff1a;路由原理———压缩字典 这边简单讲一下gin非常重要的一个基点&#xff0c;也就是他作为go web框架的一个亮点 也就是Trie树和压缩字典算法 gin 通过树来存储路由&#xff0c;讲路由的字符拆解为一个个的结点&#xff0c;在获取handler函数时&#xff0c;会…

项目管理系统软件有哪些?这10款最好用的项目管理软件值得推荐

项目管理系统软件有哪些&#xff1f;这10款好用的项目管理系统软件值得推荐 如今企业规模不断扩大&#xff0c;业务逐渐复杂化&#xff0c;项目管理系统已经成为现代企业管理中不可或缺的一环&#xff1b; 而项目管理系统软件不仅可以保证项目按时完成&#xff0c;提高团队的…

【Java基础 下】 029 -- 多线程

目录 一、为什么要有多线程&#xff1f; 1、线程与进程 2、多线程的应用场景 3、小结 二、多线程中的两个概念&#xff08;并发和并行&#xff09; 1、并发 2、并行 3、小结 三、多线程的三种实现方式 1、继承Thread类的方式进行实现 2、实现Runnable接口的方式进行实现 3、利用…

MySQL简述

一、什么是数据库 数据库&#xff08;DB&#xff0c;DataBase&#xff09; 概念&#xff1a;数据仓库&#xff0c;相当于一款软件&#xff0c;安装在操作系统&#xff08;Windows&#xff0c;Linux&#xff0c;mac&#xff0c;…&#xff09;之上&#xff0c;可以存储大量的数…

nvm安装及使用(win)

一、安装nvm 下载地址&#xff1a;下载之后安装包安装 安装路径不要有中文或者空格 安装时尽量不要选择系统盘&#xff08;安装在系统盘可能会涉及到权限问题&#xff09; 安装前一定要把 node 的环境变量和 npm 的环境变量删除&#xff0c;否则在切换 node 版本时会报错 二、…

SCL_PFENET跑通填坑

1.数据准备&#xff1a;VOC2012数据集&#xff0c;initmodel文件夹&#xff08;预训练模型&#xff09;&#xff0c;SegmentationClassAug数据2.训练部分&#xff1a;训练部分没什么需要改动的&#xff0c;也就改一下选择的配置文件。在config文件夹里有关于coco和voc数据的配置…

备战蓝桥杯Day3python——迭代器的使用

python封装的迭代器 我们接下来介绍用于返回笛卡尔积的Permutations函数 Permutations(iterator, r) r:表示进行迭代后返回的长度 实例&#xff1a; >>> from itertools import permutations >>> a abc >>> for i in permutations(a,3):print …

Java深拷贝,浅拷贝

一、浅拷贝&#xff1a; &#xff08;1&#xff09; 对于数据类型是基本数据类型的成员变量&#xff0c;浅拷贝会直接进行值传递&#xff0c;也就是将该属性复制一份给新的对象。因为是两份不同的数据&#xff0c;所以对其中一的对象的成员变量值进行修改&#xff0c;不会影响另…

大考在即 百度版ChatGPT会翻车吗?

文心一言的发布会定档3月16日&#xff0c;不出意外&#xff0c;百度创始人李彦宏、CTO王海峰将出现在北京总部的发布会现场。这是百度版ChatGPT最新的官方消息&#xff0c;2月7日&#xff0c;文心一言首次官宣&#xff0c;当时称&#xff0c;产品“3月见”。 3月如期而至&…

C语言再学习 -- __attribute__详解

一、attribute 介绍 __attribute__是一个编译属性&#xff0c;用于向编译器描述特殊的标识、错误检查或高级优化。它是GNU C特色之一&#xff0c;系统中有许多地方使用到。__attribute__可以设置函数属性&#xff08;Function Attribute&#xff09;、变量属性&#xff08;Var…